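cleanup: extract RowBatchQueue into its own file

While looking at IMPALA-7096, I noticed that RowBatchQueue was implemented in a strange place: it was a nested class of ExecNode (exec-node.h / exec-node.cc), even though runtime/ code such as the data stream receivers also needs it. This change moves it into its own files, runtime/row-batch-queue.h and runtime/row-batch-queue.cc.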
Change-Id: I3577c1c6920b8cf858c8d49f8812ccc305d833f6 Reviewed-on: http://gerrit.cloudera.org:8080/10943 Reviewed-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com> Project: http://git-wip-us.apache.org/repos/asf/impala/repo Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/c845aab8 Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/c845aab8 Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/c845aab8 Branch: refs/heads/master Commit: c845aab86ea8ffb27ffb8d2f577f7a71e7e46120 Parents: 3f393bc Author: Tim Armstrong <tarmstr...@cloudera.com> Authored: Thu Jul 12 23:10:38 2018 -0700 Committer: Impala Public Jenkins <impala-public-jenk...@cloudera.com> Committed: Tue Jul 17 22:06:54 2018 +0000 ---------------------------------------------------------------------- be/src/exec/blocking-join-node.h | 1 + be/src/exec/exec-node.cc | 31 ------------- be/src/exec/exec-node.h | 35 --------------- be/src/exec/hdfs-scan-node.cc | 1 + be/src/exec/kudu-scan-node.cc | 1 + be/src/exec/scan-node.cc | 1 + be/src/exec/scan-node.h | 1 + be/src/exec/scanner-context.cc | 1 + be/src/runtime/CMakeLists.txt | 1 + be/src/runtime/data-stream-recvr.cc | 1 + be/src/runtime/krpc-data-stream-recvr.cc | 1 + be/src/runtime/row-batch-queue.cc | 55 +++++++++++++++++++++++ be/src/runtime/row-batch-queue.h | 65 +++++++++++++++++++++++++++ 13 files changed, 129 insertions(+), 66 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/impala/blob/c845aab8/be/src/exec/blocking-join-node.h ---------------------------------------------------------------------- diff --git a/be/src/exec/blocking-join-node.h b/be/src/exec/blocking-join-node.h index b7dd79a..8198ad0 100644 --- a/be/src/exec/blocking-join-node.h +++ b/be/src/exec/blocking-join-node.h @@ -25,6 +25,7 @@ #include "exec/exec-node.h" #include "util/promise.h" +#include 
"util/stopwatch.h" #include "gen-cpp/PlanNodes_types.h" // for TJoinOp http://git-wip-us.apache.org/repos/asf/impala/blob/c845aab8/be/src/exec/exec-node.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/exec-node.cc b/be/src/exec/exec-node.cc index 384d65d..766f421 100644 --- a/be/src/exec/exec-node.cc +++ b/be/src/exec/exec-node.cc @@ -77,37 +77,6 @@ int ExecNode::GetNodeIdFromProfile(RuntimeProfile* p) { return p->metadata(); } -ExecNode::RowBatchQueue::RowBatchQueue(int max_batches) - : BlockingQueue<unique_ptr<RowBatch>>(max_batches) { -} - -ExecNode::RowBatchQueue::~RowBatchQueue() { - DCHECK(cleanup_queue_.empty()); -} - -void ExecNode::RowBatchQueue::AddBatch(unique_ptr<RowBatch> batch) { - if (!BlockingPut(move(batch))) { - lock_guard<SpinLock> l(lock_); - cleanup_queue_.push_back(move(batch)); - } -} - -unique_ptr<RowBatch> ExecNode::RowBatchQueue::GetBatch() { - unique_ptr<RowBatch> result; - if (BlockingGet(&result)) return result; - return unique_ptr<RowBatch>(); -} - -void ExecNode::RowBatchQueue::Cleanup() { - unique_ptr<RowBatch> batch = NULL; - while ((batch = GetBatch()) != NULL) { - batch.reset(); - } - - lock_guard<SpinLock> l(lock_); - cleanup_queue_.clear(); -} - ExecNode::ExecNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) : id_(tnode.node_id), type_(tnode.node_type), http://git-wip-us.apache.org/repos/asf/impala/blob/c845aab8/be/src/exec/exec-node.h ---------------------------------------------------------------------- diff --git a/be/src/exec/exec-node.h b/be/src/exec/exec-node.h index 9a87a56..a62ed6c 100644 --- a/be/src/exec/exec-node.h +++ b/be/src/exec/exec-node.h @@ -30,7 +30,6 @@ #include "runtime/bufferpool/reservation-tracker.h" #include "runtime/descriptors.h" // for RowDescriptor #include "runtime/reservation-manager.h" -#include "util/blocking-queue.h" #include "util/runtime-profile.h" namespace impala { @@ -243,40 +242,6 @@ class ExecNode { return 
reservation_manager_.ReleaseUnusedReservation(); } - /// Extends blocking queue for row batches. Row batches have a property that - /// they must be processed in the order they were produced, even in cancellation - /// paths. Preceding row batches can contain ptrs to memory in subsequent row batches - /// and we need to make sure those ptrs stay valid. - /// Row batches that are added after Shutdown() are queued in another queue, which can - /// be cleaned up during Close(). - /// All functions are thread safe. - class RowBatchQueue : public BlockingQueue<std::unique_ptr<RowBatch>> { - public: - /// max_batches is the maximum number of row batches that can be queued. - /// When the queue is full, producers will block. - RowBatchQueue(int max_batches); - ~RowBatchQueue(); - - /// Adds a batch to the queue. This is blocking if the queue is full. - void AddBatch(std::unique_ptr<RowBatch> batch); - - /// Gets a row batch from the queue. Returns NULL if there are no more. - /// This function blocks. - /// Returns NULL after Shutdown(). - std::unique_ptr<RowBatch> GetBatch(); - - /// Deletes all row batches in cleanup_queue_. Not valid to call AddBatch() - /// after this is called. - void Cleanup(); - - private: - /// Lock protecting cleanup_queue_ - SpinLock lock_; - - /// Queue of orphaned row batches - std::list<std::unique_ptr<RowBatch>> cleanup_queue_; - }; - /// Unique within a single plan tree. 
int id_; TPlanNodeType::type type_; http://git-wip-us.apache.org/repos/asf/impala/blob/c845aab8/be/src/exec/hdfs-scan-node.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-scan-node.cc b/be/src/exec/hdfs-scan-node.cc index 1ceaf2c..5d4f9b0 100644 --- a/be/src/exec/hdfs-scan-node.cc +++ b/be/src/exec/hdfs-scan-node.cc @@ -31,6 +31,7 @@ #include "runtime/runtime-state.h" #include "runtime/mem-tracker.h" #include "runtime/row-batch.h" +#include "runtime/row-batch-queue.h" #include "runtime/thread-resource-mgr.h" #include "util/debug-util.h" #include "util/disk-info.h" http://git-wip-us.apache.org/repos/asf/impala/blob/c845aab8/be/src/exec/kudu-scan-node.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/kudu-scan-node.cc b/be/src/exec/kudu-scan-node.cc index 48816f9..30194f9 100644 --- a/be/src/exec/kudu-scan-node.cc +++ b/be/src/exec/kudu-scan-node.cc @@ -27,6 +27,7 @@ #include "runtime/mem-pool.h" #include "runtime/runtime-state.h" #include "runtime/row-batch.h" +#include "runtime/row-batch-queue.h" #include "runtime/thread-resource-mgr.h" #include "runtime/tuple-row.h" #include "util/runtime-profile-counters.h" http://git-wip-us.apache.org/repos/asf/impala/blob/c845aab8/be/src/exec/scan-node.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/scan-node.cc b/be/src/exec/scan-node.cc index 01aa269..4d59eed 100644 --- a/be/src/exec/scan-node.cc +++ b/be/src/exec/scan-node.cc @@ -22,6 +22,7 @@ #include "exprs/scalar-expr.h" #include "runtime/io/disk-io-mgr.h" #include "runtime/row-batch.h" +#include "runtime/row-batch-queue.h" #include "runtime/runtime-filter.inline.h" #include "runtime/runtime-state.h" #include "util/disk-info.h" http://git-wip-us.apache.org/repos/asf/impala/blob/c845aab8/be/src/exec/scan-node.h ---------------------------------------------------------------------- diff --git a/be/src/exec/scan-node.h 
b/be/src/exec/scan-node.h index 63bb59b..1d0728c 100644 --- a/be/src/exec/scan-node.h +++ b/be/src/exec/scan-node.h @@ -28,6 +28,7 @@ namespace impala { +class RowBatchQueue; class TScanRange; /// Abstract base class of all scan nodes. Subclasses support different storage layers http://git-wip-us.apache.org/repos/asf/impala/blob/c845aab8/be/src/exec/scanner-context.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/scanner-context.cc b/be/src/exec/scanner-context.cc index 75aacee..a9fad6a 100644 --- a/be/src/exec/scanner-context.cc +++ b/be/src/exec/scanner-context.cc @@ -26,6 +26,7 @@ #include "runtime/exec-env.h" #include "runtime/mem-pool.h" #include "runtime/row-batch.h" +#include "runtime/row-batch-queue.h" #include "runtime/runtime-state.h" #include "runtime/string-buffer.h" #include "util/debug-util.h" http://git-wip-us.apache.org/repos/asf/impala/blob/c845aab8/be/src/runtime/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/be/src/runtime/CMakeLists.txt b/be/src/runtime/CMakeLists.txt index 2fea5bd..e09b27c 100644 --- a/be/src/runtime/CMakeLists.txt +++ b/be/src/runtime/CMakeLists.txt @@ -61,6 +61,7 @@ add_library(Runtime reservation-manager.cc row-batch.cc ${ROW_BATCH_PROTO_SRCS} + row-batch-queue.cc runtime-filter.cc runtime-filter-bank.cc runtime-filter-ir.cc http://git-wip-us.apache.org/repos/asf/impala/blob/c845aab8/be/src/runtime/data-stream-recvr.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/data-stream-recvr.cc b/be/src/runtime/data-stream-recvr.cc index 8d9047f..c9a9ab9 100644 --- a/be/src/runtime/data-stream-recvr.cc +++ b/be/src/runtime/data-stream-recvr.cc @@ -22,6 +22,7 @@ #include "runtime/data-stream-mgr.h" #include "runtime/mem-tracker.h" #include "runtime/row-batch.h" +#include "runtime/row-batch-queue.h" #include "runtime/sorted-run-merger.h" #include "util/condition-variable.h" #include 
"util/runtime-profile-counters.h" http://git-wip-us.apache.org/repos/asf/impala/blob/c845aab8/be/src/runtime/krpc-data-stream-recvr.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/krpc-data-stream-recvr.cc b/be/src/runtime/krpc-data-stream-recvr.cc index be51f32..3933e02 100644 --- a/be/src/runtime/krpc-data-stream-recvr.cc +++ b/be/src/runtime/krpc-data-stream-recvr.cc @@ -31,6 +31,7 @@ #include "runtime/krpc-data-stream-mgr.h" #include "runtime/mem-tracker.h" #include "runtime/row-batch.h" +#include "runtime/row-batch-queue.h" #include "runtime/sorted-run-merger.h" #include "service/data-stream-service.h" #include "util/runtime-profile-counters.h" http://git-wip-us.apache.org/repos/asf/impala/blob/c845aab8/be/src/runtime/row-batch-queue.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/row-batch-queue.cc b/be/src/runtime/row-batch-queue.cc new file mode 100644 index 0000000..1fd5555 --- /dev/null +++ b/be/src/runtime/row-batch-queue.cc @@ -0,0 +1,55 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+
+#include "runtime/row-batch-queue.h"
+
+#include "runtime/row-batch.h"
+
+#include "common/names.h"
+
+namespace impala {
+
+RowBatchQueue::RowBatchQueue(int max_batches)
+  : BlockingQueue<unique_ptr<RowBatch>>(max_batches) {}
+
+RowBatchQueue::~RowBatchQueue() { DCHECK(cleanup_queue_.empty()); }
+
+void RowBatchQueue::AddBatch(unique_ptr<RowBatch> batch) {
+  if (BlockingPut(move(batch))) return;
+  // Rejected (queue shut down): BlockingPut() is expected to leave 'batch'
+  // intact, so park it in cleanup_queue_ for Cleanup() to free later.
+  lock_guard<SpinLock> l(lock_);
+  cleanup_queue_.push_back(move(batch));
+}
+
+unique_ptr<RowBatch> RowBatchQueue::GetBatch() {
+  // An empty pointer tells the consumer that no more batches will arrive.
+  unique_ptr<RowBatch> batch;
+  const bool got_batch = BlockingGet(&batch);
+  return got_batch ? move(batch) : unique_ptr<RowBatch>();
+}
+
+void RowBatchQueue::Cleanup() {
+  // Drain the main queue; each batch is freed as soon as the temporary
+  // returned by GetBatch() is destroyed at the end of the loop condition.
+  while (GetBatch() != nullptr) {
+  }
+
+  lock_guard<SpinLock> l(lock_);
+  cleanup_queue_.clear();
+}
+}  // namespace impala
http://git-wip-us.apache.org/repos/asf/impala/blob/c845aab8/be/src/runtime/row-batch-queue.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/row-batch-queue.h b/be/src/runtime/row-batch-queue.h
new file mode 100644
index 0000000..bd2f551
--- /dev/null
+++ b/be/src/runtime/row-batch-queue.h
@@ -0,0 +1,65 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.
See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef IMPALA_RUNTIME_ROW_BATCH_QUEUE_H
+#define IMPALA_RUNTIME_ROW_BATCH_QUEUE_H
+
+#include <list>
+#include <memory>
+
+#include "util/blocking-queue.h"
+#include "util/spinlock.h"
+
+namespace impala {
+
+class RowBatch;
+
+/// Extends blocking queue for row batches. Row batches have a property that
+/// they must be processed in the order they were produced, even in cancellation
+/// paths. Preceding row batches can contain ptrs to memory in subsequent row batches
+/// and we need to make sure those ptrs stay valid.
+/// Row batches that are added after Shutdown() are queued in a separate "cleanup"
+/// queue, which can be cleaned up during Close().
+/// All functions are thread safe.
+class RowBatchQueue : public BlockingQueue<std::unique_ptr<RowBatch>> {
+ public:
+  /// max_batches is the maximum number of row batches that can be queued; when the
+  /// queue is full, producers block. Explicit to forbid implicit int conversion.
+  explicit RowBatchQueue(int max_batches);
+  ~RowBatchQueue();
+
+  /// Adds a batch to the queue. This is blocking if the queue is full.
+  void AddBatch(std::unique_ptr<RowBatch> batch);
+
+  /// Gets a row batch from the queue. Returns nullptr if there are no more.
+  /// This function blocks.
+  /// Returns nullptr after Shutdown().
+  std::unique_ptr<RowBatch> GetBatch();
+
+  /// Deletes all row batches still in the queue and in cleanup_queue_. Not valid
+  /// to call AddBatch() after this is called.
+  void Cleanup();
+
+ private:
+  /// Lock protecting cleanup_queue_
+  SpinLock lock_;
+
+  /// Queue of orphaned row batches
+  std::list<std::unique_ptr<RowBatch>> cleanup_queue_;
+};
+}  // namespace impala
+#endif  // IMPALA_RUNTIME_ROW_BATCH_QUEUE_H