IMPALA-3567: Part 1: groundwork to make Join build sides DataSinks

Refactor the DataSink interface to be more generic. We need more flexibility
in setting up MemTrackers, so that memory is accounted against the right
ExecNode. Also remove some redundancy between DataSink subclasses in setting
up RuntimeProfiles, etc.
Remove the redundancy in the DataSink interface between passing eos to Send()
and calling FlushFinal(). This simplifies HdfsTableSink quite a bit and makes
handling empty batches simpler.

Partially refactor the join nodes so that the control flow between
BlockingJoinNode::Open() and its subclasses is easier to follow.
BlockingJoinNode now calls only one virtual function on its subclasses:
ProcessBuildInput(). Once all join nodes are converted to the DataSink
interface, that method can be removed too.

As a minor optimisation, avoid updating a timer that is ignored for non-async
builds.

As a proof of concept, this patch separates out the build side of
NestedLoopJoinNode into a class that implements the DataSink interface. The
main challenge is that NestedLoopJoinNode recycles row batches to avoid
reallocations and copies of row batches in subplans. The solution is to:
- retain the special-case optimisation for SingularRowSrc, and
- use the row batch cache and RowBatch::AcquireState() to copy the state of
  row batches passed to Send(), while recycling the RowBatch objects.

Refactoring the partitioned hash join is left for Part 2.

Testing: Ran exhaustive, core ASAN, and exhaustive non-partitioned agg/join
builds. Also ran a local stress test.

Performance: Ran TPC-H nested locally. The results show that the change is
perf-neutral.
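The refactored lifecycle can be sketched roughly as follows. This is a
minimal, self-contained illustration with stand-in types (RuntimeState,
RowBatch, CountingSink, and DriveSink are simplified placeholders, not
Impala's actual classes); it shows how a client drives a sink now that eos is
signalled by FlushFinal() rather than by a flag on Send():

```cpp
#include <cassert>
#include <vector>

// Stand-ins for Impala's RuntimeState and RowBatch; illustrative only.
struct RuntimeState {};
struct RowBatch { std::vector<int> rows; };

// Simplified sink mirroring the refactored lifecycle:
// Prepare() -> Open() -> Send() per batch -> FlushFinal() -> Close().
// Send() no longer takes an eos flag; FlushFinal() marks end of input.
class DataSinkSketch {
 public:
  virtual ~DataSinkSketch() {}
  virtual bool Prepare(RuntimeState* state) { return true; }
  virtual bool Open(RuntimeState* state) = 0;
  virtual bool Send(RuntimeState* state, RowBatch* batch) = 0;
  virtual bool FlushFinal(RuntimeState* state) = 0;
  virtual void Close(RuntimeState* state) {}
};

// Toy sink that counts rows, standing in for a join build side.
class CountingSink : public DataSinkSketch {
 public:
  bool Open(RuntimeState*) override { return true; }
  bool Send(RuntimeState*, RowBatch* batch) override {
    num_rows_ += static_cast<int>(batch->rows.size());
    return true;
  }
  bool FlushFinal(RuntimeState*) override { flushed_ = true; return true; }
  int num_rows() const { return num_rows_; }
  bool flushed() const { return flushed_; }
 private:
  int num_rows_ = 0;
  bool flushed_ = false;
};

// Drives the sink the way the build side is driven: one Send() per input
// batch, then FlushFinal() exactly once, then Close().
void DriveSink(DataSinkSketch* sink, std::vector<RowBatch>* input) {
  RuntimeState state;
  sink->Prepare(&state);
  sink->Open(&state);
  for (RowBatch& batch : *input) sink->Send(&state, &batch);
  sink->FlushFinal(&state);
  sink->Close(&state);
}
```

Because empty batches carry no special meaning any more, the driving loop can
pass them to Send() without a special case, which is what simplifies
HdfsTableSink.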
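The batch-recycling idea above can be sketched as follows. This is a toy
model under stated assumptions (RowBatch, RowBatchCache, and NljBuilder here
are simplified stand-ins, not the real Impala classes): the sink takes over
the contents of each batch passed to Send() via an AcquireState()-style
transfer, so the caller's RowBatch object is left empty and can be reused on
the next iteration instead of being reallocated:

```cpp
#include <cassert>
#include <memory>
#include <utility>
#include <vector>

// Toy stand-in for RowBatch: AcquireState() transfers the contents of 'src'
// into this batch and resets 'src', mimicking how the real
// RowBatch::AcquireState() steals resources without copying rows.
struct RowBatch {
  std::vector<int> rows;
  void AcquireState(RowBatch* src) {
    rows = std::move(src->rows);
    src->rows.clear();
  }
};

// Toy row batch cache: hands out RowBatch objects and retains ownership so
// they can be reused across subplan iterations.
class RowBatchCache {
 public:
  RowBatch* GetNextBatch() {
    batches_.push_back(std::make_unique<RowBatch>());
    return batches_.back().get();
  }
 private:
  std::vector<std::unique_ptr<RowBatch>> batches_;
};

// Sketch of the build sink's Send(): move the caller's batch contents into a
// cached batch, leaving the caller's RowBatch object free for reuse.
class NljBuilder {
 public:
  void Send(RowBatch* batch) {
    RowBatch* copy = cache_.GetNextBatch();
    copy->AcquireState(batch);
    build_batches_.push_back(copy);
  }
  int TotalRows() const {
    int n = 0;
    for (const RowBatch* b : build_batches_) {
      n += static_cast<int>(b->rows.size());
    }
    return n;
  }
 private:
  RowBatchCache cache_;
  std::vector<RowBatch*> build_batches_;
};
```

The design point is that only batch *state* is copied into sink-owned
batches; the sender keeps reusing its own RowBatch object, which preserves
the no-reallocation behaviour the nested loop join relied on in subplans.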
+------------------+-----------------------+---------+------------+------------+----------------+
| Workload         | File Format           | Avg (s) | Delta(Avg) | GeoMean(s) | Delta(GeoMean) |
+------------------+-----------------------+---------+------------+------------+----------------+
| TPCH_NESTED(_20) | parquet / none / none | 7.75    | +0.19%     | 5.64       | -0.34%         |
+------------------+-----------------------+---------+------------+------------+----------------+

+------------------+----------+-----------------------+--------+-------------+------------+-----------+----------------+-------------+-------+
| Workload         | Query    | File Format           | Avg(s) | Base Avg(s) | Delta(Avg) | StdDev(%) | Base StdDev(%) | Num Clients | Iters |
+------------------+----------+-----------------------+--------+-------------+------------+-----------+----------------+-------------+-------+
| TPCH_NESTED(_20) | TPCH-Q17 | parquet / none / none | 18.96  | 17.95       | +5.61%     | 4.97%     | 0.71%          | 1           | 10    |
| TPCH_NESTED(_20) | TPCH-Q14 | parquet / none / none | 3.61   | 3.56        | +1.25%     | 0.97%     | 1.19%          | 1           | 10    |
| TPCH_NESTED(_20) | TPCH-Q8  | parquet / none / none | 6.25   | 6.23        | +0.44%     | 0.44%     | 0.90%          | 1           | 10    |
| TPCH_NESTED(_20) | TPCH-Q10 | parquet / none / none | 5.84   | 5.83        | +0.30%     | 1.21%     | 1.82%          | 1           | 10    |
| TPCH_NESTED(_20) | TPCH-Q5  | parquet / none / none | 4.91   | 4.90        | +0.11%     | 0.18%     | 0.78%          | 1           | 10    |
| TPCH_NESTED(_20) | TPCH-Q21 | parquet / none / none | 17.82  | 17.81       | +0.07%     | 0.66%     | 0.58%          | 1           | 10    |
| TPCH_NESTED(_20) | TPCH-Q4  | parquet / none / none | 5.12   | 5.12        | -0.02%     | 0.97%     | 1.23%          | 1           | 10    |
| TPCH_NESTED(_20) | TPCH-Q9  | parquet / none / none | 23.85  | 23.88       | -0.15%     | 0.72%     | 0.22%          | 1           | 10    |
| TPCH_NESTED(_20) | TPCH-Q12 | parquet / none / none | 6.15   | 6.16        | -0.16%     | 1.60%     | 1.72%          | 1           | 10    |
| TPCH_NESTED(_20) | TPCH-Q3  | parquet / none / none | 5.46   | 5.47        | -0.23%     | 1.28%     | 0.90%          | 1           | 10    |
| TPCH_NESTED(_20) | TPCH-Q16 | parquet / none / none | 3.61   | 3.62        | -0.26%     | 1.00%     | 1.36%          | 1           | 10    |
| TPCH_NESTED(_20) | TPCH-Q19 | parquet / none / none | 20.19  | 20.31       | -0.58%     | 1.67%     | 0.65%          | 1           | 10    |
| TPCH_NESTED(_20) | TPCH-Q7  | parquet / none / none | 9.42   | 9.48        | -0.68%     | 0.87%     | 0.71%          | 1           | 10    |
| TPCH_NESTED(_20) | TPCH-Q18 | parquet / none / none | 12.94  | 13.06       | -0.90%     | 0.59%     | 0.48%          | 1           | 10    |
| TPCH_NESTED(_20) | TPCH-Q22 | parquet / none / none | 1.09   | 1.10        | -0.92%     | 2.26%     | 2.22%          | 1           | 10    |
| TPCH_NESTED(_20) | TPCH-Q13 | parquet / none / none | 3.75   | 3.78        | -0.94%     | 2.04%     | 2.86%          | 1           | 10    |
| TPCH_NESTED(_20) | TPCH-Q20 | parquet / none / none | 4.33   | 4.37        | -1.10%     | 3.00%     | 2.43%          | 1           | 10    |
| TPCH_NESTED(_20) | TPCH-Q2  | parquet / none / none | 2.39   | 2.42        | -1.38%     | 1.54%     | 1.30%          | 1           | 10    |
| TPCH_NESTED(_20) | TPCH-Q11 | parquet / none / none | 1.43   | 1.46        | -1.78%     | 2.05%     | 2.77%          | 1           | 10    |
| TPCH_NESTED(_20) | TPCH-Q6  | parquet / none / none | 2.29   | 2.33        | -1.79%     | 0.56%     | 1.23%          | 1           | 10    |
| TPCH_NESTED(_20) | TPCH-Q15 | parquet / none / none | 5.04   | 5.13        | -1.84%     | 0.61%     | 2.01%          | 1           | 10    |
| TPCH_NESTED(_20) | TPCH-Q1  | parquet / none / none | 5.98   | 6.12        | -2.30%     | 1.84%     | 3.19%          | 1           | 10    |
+------------------+----------+-----------------------+--------+-------------+------------+-----------+----------------+-------------+-------+

Change-Id: I9d7608181eeacfe706a09c1e153d0a3e1ee9b475
Reviewed-on: http://gerrit.cloudera.org:8080/3842
Reviewed-by: Tim Armstrong <[email protected]>
Reviewed-by: Marcel Kornacker <[email protected]>
Tested-by: Internal Jenkins

Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/3e2411f3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/3e2411f3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/3e2411f3

Branch: refs/heads/master
Commit: 3e2411f3078314c03bbe9c9b225770aa1580fdc4
Parents: ffa7829
Author: Tim Armstrong <[email protected]>
Authored: Wed May 18 15:15:02 2016 -0700
Committer: Internal Jenkins <[email protected]>
Committed: Tue Aug 9 19:26:22 2016 +0000

----------------------------------------------------------------------
 be/src/exec/CMakeLists.txt                |   1 +
 be/src/exec/blocking-join-node.cc         | 110 ++++++++++++----
 be/src/exec/blocking-join-node.h          |  57 +++++---
 be/src/exec/data-sink.cc                  |  15 ++-
 be/src/exec/data-sink.h                   |  54 +++++---
 be/src/exec/hash-join-node.cc             |  25 +++-
 be/src/exec/hash-join-node.h              |  12 +-
 be/src/exec/hbase-table-sink.cc           |  20 ++-
 be/src/exec/hbase-table-sink.h            |  12 +-
 be/src/exec/hdfs-table-sink.cc            |  90 ++++++-------
 be/src/exec/hdfs-table-sink.h             |  22 +---
 be/src/exec/kudu-table-sink-test.cc       |   7 +-
 be/src/exec/kudu-table-sink.cc            |  36 +++--
 be/src/exec/kudu-table-sink.h             |  13 +-
 be/src/exec/nested-loop-join-builder.cc   | 105 +++++++++++++++
 be/src/exec/nested-loop-join-builder.h    | 100 ++++++++++++++
 be/src/exec/nested-loop-join-node.cc      | 175 ++++++++++---------------
 be/src/exec/nested-loop-join-node.h       |  48 +++----
 be/src/exec/partitioned-hash-join-node.cc |  70 +++++-----
 be/src/exec/partitioned-hash-join-node.h  |   7 +-
 be/src/exec/row-batch-cache.h             |  26 +++-
 be/src/runtime/data-stream-sender.cc      |  38 +++---
 be/src/runtime/data-stream-sender.h       |  11 +-
 be/src/runtime/data-stream-test.cc        |   4 +-
 be/src/runtime/plan-fragment-executor.cc  |  84 +++++------
 be/src/runtime/plan-fragment-executor.h   |   6 +-
 be/src/runtime/row-batch.cc               |   9 ++
 be/src/runtime/row-batch.h                |   4 +
 be/src/util/stopwatch.h                   |  16 ++-
 29 files changed, 717 insertions(+), 460 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3e2411f3/be/src/exec/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt
index 14db1ae..afb1a59 100644
--- a/be/src/exec/CMakeLists.txt
+++ b/be/src/exec/CMakeLists.txt
@@ -64,6 +64,7 @@ add_library(Exec
   hbase-scan-node.cc
hbase-table-scanner.cc incr-stats-util.cc + nested-loop-join-builder.cc nested-loop-join-node.cc parquet-column-readers.cc parquet-metadata-utils.cc http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3e2411f3/be/src/exec/blocking-join-node.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/blocking-join-node.cc b/be/src/exec/blocking-join-node.cc index 788cf15..ea541c0 100644 --- a/be/src/exec/blocking-join-node.cc +++ b/be/src/exec/blocking-join-node.cc @@ -19,7 +19,9 @@ #include <sstream> +#include "exec/data-sink.h" #include "exprs/expr.h" +#include "runtime/mem-tracker.h" #include "runtime/row-batch.h" #include "runtime/runtime-state.h" #include "runtime/tuple-row.h" @@ -72,7 +74,6 @@ Status BlockingJoinNode::Prepare(RuntimeState* state) { SCOPED_TIMER(runtime_profile_->total_time_counter()); RETURN_IF_ERROR(ExecNode::Prepare(state)); - build_pool_.reset(new MemPool(mem_tracker())); build_timer_ = ADD_TIMER(runtime_profile(), "BuildTime"); probe_timer_ = ADD_TIMER(runtime_profile(), "ProbeTime"); build_row_counter_ = ADD_COUNTER(runtime_profile(), "BuildRows", TUnit::UNIT); @@ -128,6 +129,8 @@ Status BlockingJoinNode::Prepare(RuntimeState* state) { new char[probe_tuple_row_size_ + build_tuple_row_size_]); } + build_batch_.reset( + new RowBatch(child(1)->row_desc(), state->batch_size(), mem_tracker())); probe_batch_.reset( new RowBatch(child(0)->row_desc(), state->batch_size(), mem_tracker())); return Status::OK(); @@ -135,17 +138,22 @@ Status BlockingJoinNode::Prepare(RuntimeState* state) { void BlockingJoinNode::Close(RuntimeState* state) { if (is_closed()) return; - if (build_pool_.get() != NULL) build_pool_->FreeAll(); + build_batch_.reset(); probe_batch_.reset(); if (semi_join_staging_row_ != NULL) delete[] semi_join_staging_row_; ExecNode::Close(state); } -void BlockingJoinNode::BuildSideThread(RuntimeState* state, Promise<Status>* status) { +void 
BlockingJoinNode::ProcessBuildInputAsync(RuntimeState* state, DataSink* build_sink, + Promise<Status>* status) { Status s; { SCOPED_TIMER(state->total_cpu_timer()); - s = ConstructBuildSide(state); + if (build_sink == NULL){ + s = ProcessBuildInput(state); + } else { + s = SendBuildInputToSink<true>(state, build_sink); + } } // IMPALA-1863: If the build-side thread failed, then we need to close the right // (build-side) child to avoid a potential deadlock between fragment instances. This @@ -161,19 +169,21 @@ void BlockingJoinNode::BuildSideThread(RuntimeState* state, Promise<Status>* sta } Status BlockingJoinNode::Open(RuntimeState* state) { - SCOPED_TIMER(runtime_profile_->total_time_counter()); RETURN_IF_ERROR(ExecNode::Open(state)); - RETURN_IF_CANCELLED(state); - RETURN_IF_ERROR(QueryMaintenance(state)); eos_ = false; probe_side_eos_ = false; + return Status::OK(); +} +Status BlockingJoinNode::ConstructBuildAndOpenProbe(RuntimeState* state, + DataSink* build_sink) { // If this node is not inside a subplan and can get a thread token, initiate the // construction of the build-side table in a separate thread, so that the left child // can do any initialisation in parallel. Otherwise, do this in the main thread. // Inside a subplan we expect Open() to be called a number of times proportional to the - // input data of the SubplanNode, so we prefer doing the join build in the main thread, - // assuming that thread creation is expensive relative to a single subplan iteration. + // input data of the SubplanNode, so we prefer doing processing the build input in the + // main thread, assuming that thread creation is expensive relative to a single subplan + // iteration. // // In this block, we also compute the 'overlap' time for the left and right child. This // is the time (i.e. 
clock reads) when the right child stops overlapping with the left @@ -184,7 +194,8 @@ Status BlockingJoinNode::Open(RuntimeState* state) { Promise<Status> build_side_status; AddRuntimeExecOption("Join Build-Side Prepared Asynchronously"); Thread build_thread(node_name_, "build thread", - bind(&BlockingJoinNode::BuildSideThread, this, state, &build_side_status)); + bind(&BlockingJoinNode::ProcessBuildInputAsync, this, state, build_sink, + &build_side_status)); if (!state->cgroup().empty()) { Status status = state->exec_env()->cgroups_mgr()->AssignThreadToCgroup( build_thread, state->cgroup()); @@ -200,7 +211,7 @@ Status BlockingJoinNode::Open(RuntimeState* state) { // The left/right child overlap stops here. built_probe_overlap_stop_watch_.SetTimeCeiling(); - // Blocks until ConstructBuildSide has returned, after which the build side structures + // Blocks until ProcessBuildInput has returned, after which the build side structures // are fully constructed. RETURN_IF_ERROR(build_side_status.Get()); RETURN_IF_ERROR(open_status); @@ -212,34 +223,79 @@ Status BlockingJoinNode::Open(RuntimeState* state) { // TODO: Remove this special-case behavior for subplans once we have proper // projection. See UnnestNode for details on the current projection implementation. RETURN_IF_ERROR(child(0)->Open(state)); - RETURN_IF_ERROR(ConstructBuildSide(state)); + if (build_sink == NULL) { + RETURN_IF_ERROR(ProcessBuildInput(state)); + } else { + RETURN_IF_ERROR(SendBuildInputToSink<false>(state, build_sink)); + } } else { // The left/right child never overlap. The overlap stops here. built_probe_overlap_stop_watch_.SetTimeCeiling(); - RETURN_IF_ERROR(ConstructBuildSide(state)); + if (build_sink == NULL) { + RETURN_IF_ERROR(ProcessBuildInput(state)); + } else { + RETURN_IF_ERROR(SendBuildInputToSink<false>(state, build_sink)); + } RETURN_IF_ERROR(child(0)->Open(state)); } + return Status::OK(); +} - // Seed left child in preparation for GetNext(). 
+Status BlockingJoinNode::GetFirstProbeRow(RuntimeState* state) { + DCHECK(!probe_side_eos_); + DCHECK_EQ(probe_batch_->num_rows(), 0); while (true) { RETURN_IF_ERROR(child(0)->GetNext(state, probe_batch_.get(), &probe_side_eos_)); COUNTER_ADD(probe_row_counter_, probe_batch_->num_rows()); probe_batch_pos_ = 0; - if (probe_batch_->num_rows() == 0) { - if (probe_side_eos_) { - RETURN_IF_ERROR(InitGetNext(NULL /* eos */)); - // If the probe side is exhausted, set the eos_ to true for only those - // join modes that don't need to process unmatched build rows. - eos_ = !NeedToProcessUnmatchedBuildRows(); - break; - } - probe_batch_->Reset(); - continue; - } else { + if (probe_batch_->num_rows() > 0) { current_probe_row_ = probe_batch_->GetRow(probe_batch_pos_++); - RETURN_IF_ERROR(InitGetNext(current_probe_row_)); - break; + return Status::OK(); + } else if (probe_side_eos_) { + // If the probe side is exhausted, set the eos_ to true for only those + // join modes that don't need to process unmatched build rows. 
+ eos_ = !NeedToProcessUnmatchedBuildRows(); + return Status::OK(); } + probe_batch_->Reset(); + } +} + +template <bool ASYNC_BUILD> +Status BlockingJoinNode::SendBuildInputToSink(RuntimeState* state, + DataSink* build_sink) { + { + CONDITIONAL_SCOPED_STOP_WATCH(&built_probe_overlap_stop_watch_, ASYNC_BUILD); + RETURN_IF_ERROR(child(1)->Open(state)); + } + + { + SCOPED_TIMER(build_timer_); + RETURN_IF_ERROR(build_sink->Open(state)); + } + + DCHECK_EQ(build_batch_->num_rows(), 0); + bool eos = false; + do { + RETURN_IF_CANCELLED(state); + RETURN_IF_ERROR(QueryMaintenance(state)); + + { + CONDITIONAL_SCOPED_STOP_WATCH(&built_probe_overlap_stop_watch_, ASYNC_BUILD); + RETURN_IF_ERROR(child(1)->GetNext(state, build_batch_.get(), &eos)); + } + COUNTER_ADD(build_row_counter_, build_batch_->num_rows()); + + { + SCOPED_TIMER(build_timer_); + RETURN_IF_ERROR(build_sink->Send(state, build_batch_.get())); + } + build_batch_->Reset(); + } while (!eos); + + { + SCOPED_TIMER(build_timer_); + RETURN_IF_ERROR(build_sink->FlushFinal(state)); } return Status::OK(); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3e2411f3/be/src/exec/blocking-join-node.h ---------------------------------------------------------------------- diff --git a/be/src/exec/blocking-join-node.h b/be/src/exec/blocking-join-node.h index c4f2579..b3d25c1 100644 --- a/be/src/exec/blocking-join-node.h +++ b/be/src/exec/blocking-join-node.h @@ -31,6 +31,7 @@ namespace impala { class MemPool; +class MemTracker; class RowBatch; class TupleRow; @@ -52,9 +53,7 @@ class BlockingJoinNode : public ExecNode { /// Prepare() work, e.g. codegen. virtual Status Prepare(RuntimeState* state); - /// Open prepares the build side structures (subclasses should implement - /// ConstructBuildSide()) and then prepares for GetNext with the first left child row - /// (subclasses should implement InitGetNext()). + /// Calls ExecNode::Open() and initializes 'eos_' and 'probe_side_eos_'. 
virtual Status Open(RuntimeState* state); /// Subclasses should close any other structures and then call @@ -67,7 +66,8 @@ class BlockingJoinNode : public ExecNode { const std::string node_name_; TJoinOp::type join_op_; - boost::scoped_ptr<MemPool> build_pool_; // holds everything referenced from build side + /// Store in node to avoid reallocating. Cleared after build completes. + boost::scoped_ptr<RowBatch> build_batch_; /// probe_batch_ must be cleared before calling GetNext(). The child node /// does not initialize all tuple ptrs in the row, only the ones that it @@ -77,8 +77,6 @@ class BlockingJoinNode : public ExecNode { bool eos_; // if true, nothing left to return in GetNext() bool probe_side_eos_; // if true, left child has no more rows to process - /// TODO: These variables should move to a join control block struct, which is local to - /// each probing thread. int probe_batch_pos_; // current scan pos in probe_batch_ TupleRow* current_probe_row_; // The row currently being probed bool matched_probe_; // if true, the current probe row is matched @@ -103,15 +101,36 @@ class BlockingJoinNode : public ExecNode { /// with the probe child Open(). MonotonicStopWatch built_probe_overlap_stop_watch_; - /// Init the build-side state for a new left child row (e.g. hash table iterator or list - /// iterator) given the first row. Used in Open() to prepare for GetNext(). - /// A NULL ptr for first_left_child_row indicates the left child eos. - virtual Status InitGetNext(TupleRow* first_left_child_row) = 0; - - /// We parallelize building the build-side with Open'ing the left child. If, for example, - /// the left child is another join node, it can start to build its own build-side at the - /// same time. - virtual Status ConstructBuildSide(RuntimeState* state) = 0; + /// Processes the build-side input. + /// Called from ConstructBuildAndOpenProbe() if the subclass does not provide a + /// DataSink to consume the build input. 
+ /// Note that this can be called concurrently with Open'ing the left child to + /// increase parallelism. If, for example, the left child is another join node, + /// it can start its own build at the same time. + /// TODO: move all subclasses to use the DataSink interface and remove this method. + virtual Status ProcessBuildInput(RuntimeState* state) = 0; + + /// Processes the build-side input and opens the probe side. Will do both concurrently + /// if the plan shape and thread token availability permit it. + /// If 'build_sink' is non-NULL, sends the build-side input to 'build_sink'. Otherwise + /// calls ProcessBuildInput on the subclass. + Status ConstructBuildAndOpenProbe(RuntimeState* state, DataSink* build_sink); + + /// Helper function to process the build input by sending it to a DataSink. + /// ASYNC_BUILD enables timers that impose some overhead but are required if the build + /// is processed concurrently with the Open() of the left child. + template <bool ASYNC_BUILD> + Status SendBuildInputToSink(RuntimeState* state, DataSink* build_sink); + + /// Set up 'current_probe_row_' to point to the first input row from the left child + /// (probe side). Fills 'probe_batch_' with rows from the left child and updates + /// 'probe_batch_pos_' to the index of the row in 'probe_batch_' after + /// 'current_probe_row_'. 'probe_side_eos_' is set to true if 'probe_batch_' is the + /// last batch to be returned from the child. + /// If eos of the left child is reached and no rows are returned, 'current_probe_row_' + /// is set to NULL and 'eos_' is set to true for join modes where unmatched rows from + /// the build side do not need to be returned. + Status GetFirstProbeRow(RuntimeState* state); /// Gives subclasses an opportunity to add debug output to the debug string printed by /// DebugString(). 
@@ -180,9 +199,11 @@ class BlockingJoinNode : public ExecNode { const MonotonicStopWatch* child_overlap_timer); private: - /// Supervises ConstructBuildSide in a separate thread, and returns its status in the - /// promise parameter. - void BuildSideThread(RuntimeState* state, Promise<Status>* status); + /// The main function for the thread that processes the build input asynchronously. + /// Its status is returned in the 'status' promise. If 'build_sink' is non-NULL, it + /// is used for the build. Otherwise, ProcessBuildInput() is called on the subclass. + void ProcessBuildInputAsync(RuntimeState* state, DataSink* build_sink, + Promise<Status>* status); }; } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3e2411f3/be/src/exec/data-sink.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/data-sink.cc b/be/src/exec/data-sink.cc index 44de219..a6c8fcd 100644 --- a/be/src/exec/data-sink.cc +++ b/be/src/exec/data-sink.cc @@ -37,6 +37,11 @@ namespace impala { +DataSink::DataSink(const RowDescriptor& row_desc) : + closed_(false), row_desc_(row_desc), mem_tracker_(NULL) {} + +DataSink::~DataSink() {} + Status DataSink::CreateDataSink(ObjectPool* pool, const TDataSink& thrift_sink, const vector<TExpr>& output_exprs, const TPlanFragmentInstanceCtx& fragment_instance_ctx, @@ -144,14 +149,16 @@ string DataSink::OutputInsertStats(const PartitionStatusMap& stats, return ss.str(); } -Status DataSink::Prepare(RuntimeState* state) { - expr_mem_tracker_.reset( - new MemTracker(-1, -1, "Data sink expr", state->instance_mem_tracker(), false)); +Status DataSink::Prepare(RuntimeState* state, MemTracker* mem_tracker) { + DCHECK(mem_tracker != NULL); + profile_ = state->obj_pool()->Add(new RuntimeProfile(state->obj_pool(), GetName())); + mem_tracker_ = mem_tracker; + expr_mem_tracker_.reset(new MemTracker(-1, -1, "Exprs", mem_tracker, false)); return Status::OK(); } void DataSink::Close(RuntimeState* state) { - if 
(expr_mem_tracker_.get() != NULL) { + if (expr_mem_tracker_ != NULL) { expr_mem_tracker_->UnregisterFromParent(); expr_mem_tracker_.reset(); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3e2411f3/be/src/exec/data-sink.h ---------------------------------------------------------------------- diff --git a/be/src/exec/data-sink.h b/be/src/exec/data-sink.h index a72f0ac..95acf16 100644 --- a/be/src/exec/data-sink.h +++ b/be/src/exec/data-sink.h @@ -24,11 +24,13 @@ #include "common/status.h" #include "runtime/runtime-state.h" +#include "util/runtime-profile.h" #include "gen-cpp/DataSinks_types.h" #include "gen-cpp/Exprs_types.h" namespace impala { +class MemTracker; class ObjectPool; class RowBatch; class RuntimeProfile; @@ -38,31 +40,42 @@ class TPlanExecParams; class TPlanFragmentInstanceCtx; class RowDescriptor; -/// Superclass of all data sinks. +/// A data sink is an abstract interface for data sinks that consume RowBatches. E.g. +/// a sink may write a HDFS table, send data across the network, or build hash tables +/// for a join. +// +/// Clients of the DataSink interface drive the data sink by repeatedly calling Send() +/// with batches. Before Send() is called, the sink must be initialized by calling +/// Prepare() during the prepare phase of the query fragment, then Open(). After the last +/// batch has been sent, FlushFinal() should be called to complete any processing. +/// Close() is called to release any resources before destroying the sink. class DataSink { public: - DataSink() : closed_(false) { } - virtual ~DataSink() {} + DataSink(const RowDescriptor& row_desc); + virtual ~DataSink(); + + /// Return the name to use in profiles, etc. + virtual std::string GetName() = 0; - /// Setup. Call before Send(), Open(), or Close(). - /// Subclasses must call DataSink::Prepare(). - virtual Status Prepare(RuntimeState* state); + /// Setup. Call before Send(), Open(), or Close() during the prepare phase of the query + /// fragment. 
Any memory allocated will be tracked against the caller-provided + /// 'mem_tracker'. Subclasses must call DataSink::Prepare(). + virtual Status Prepare(RuntimeState* state, MemTracker* mem_tracker); - /// Call before Send() or Close(). + /// Call before Send() to open the sink. virtual Status Open(RuntimeState* state) = 0; - /// Send a row batch into this sink. - /// eos should be true when the last batch is passed to Send() - virtual Status Send(RuntimeState* state, RowBatch* batch, bool eos) = 0; + /// Send a row batch into this sink. Send() may modify 'batch' by acquiring its state. + virtual Status Send(RuntimeState* state, RowBatch* batch) = 0; /// Flushes any remaining buffered state. /// Further Send() calls are illegal after FlushFinal(). This is to be called only /// before calling Close(). virtual Status FlushFinal(RuntimeState* state) = 0; - /// Releases all resources that were allocated in Prepare()/Send(). + /// Releases all resources that were allocated in Open()/Send(). /// Further Send() calls or FlushFinal() calls are illegal after calling Close(). - /// It must be okay to call this multiple times. Subsequent calls should be ignored. + /// Must be idempotent. virtual void Close(RuntimeState* state); /// Creates a new data sink from thrift_sink. A pointer to the @@ -72,9 +85,6 @@ class DataSink { const TPlanFragmentInstanceCtx& fragment_instance_ctx, const RowDescriptor& row_desc, boost::scoped_ptr<DataSink>* sink); - /// Returns the runtime profile for the sink. - virtual RuntimeProfile* profile() = 0; - /// Merges one update to the insert stats for a partition. dst_stats will have the /// combined stats of src_stats and dst_stats after this method returns. 
static void MergeInsertStats(const TInsertStats& src_stats, @@ -85,11 +95,25 @@ class DataSink { static std::string OutputInsertStats(const PartitionStatusMap& stats, const std::string& prefix = ""); + MemTracker* mem_tracker() const { return mem_tracker_; } + RuntimeProfile* profile() const { return profile_; } + protected: /// Set to true after Close() has been called. Subclasses should check and set this in /// Close(). bool closed_; + /// The row descriptor for the rows consumed by the sink. Not owned. + const RowDescriptor& row_desc_; + + /// The runtime profile for this DataSink. Initialized in Prepare(). Not owned. + RuntimeProfile* profile_; + + /// The MemTracker for all allocations made by the DataSink. Initialized in Prepare(). + /// Not owned. + MemTracker* mem_tracker_; + + /// A child of 'mem_tracker_' that tracks expr allocations. Initialized in Prepare(). boost::scoped_ptr<MemTracker> expr_mem_tracker_; }; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3e2411f3/be/src/exec/hash-join-node.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hash-join-node.cc b/be/src/exec/hash-join-node.cc index 9eb9276..7a3ca87 100644 --- a/be/src/exec/hash-join-node.cc +++ b/be/src/exec/hash-join-node.cc @@ -25,6 +25,7 @@ #include "exec/old-hash-table.inline.h" #include "exprs/expr.h" #include "gutil/strings/substitute.h" +#include "runtime/mem-tracker.h" #include "runtime/row-batch.h" #include "runtime/runtime-filter.h" #include "runtime/runtime-filter-bank.h" @@ -142,6 +143,7 @@ Status HashJoinNode::Prepare(RuntimeState* state) { filter_expr_ctxs_, child(1)->row_desc().tuple_descriptors().size(), stores_nulls, is_not_distinct_from_, state->fragment_hash_seed(), mem_tracker(), filters_)); + build_pool_.reset(new MemPool(mem_tracker())); bool build_codegen_enabled = false; bool probe_codegen_enabled = false; @@ -184,6 +186,7 @@ Status HashJoinNode::Reset(RuntimeState* state) { void 
HashJoinNode::Close(RuntimeState* state) { if (is_closed()) return; if (hash_tbl_.get() != NULL) hash_tbl_->Close(); + if (build_pool_.get() != NULL) build_pool_->FreeAll(); Expr::Close(build_expr_ctxs_, state); Expr::Close(probe_expr_ctxs_, state); Expr::Close(filter_expr_ctxs_, state); @@ -191,12 +194,25 @@ void HashJoinNode::Close(RuntimeState* state) { BlockingJoinNode::Close(state); } -Status HashJoinNode::ConstructBuildSide(RuntimeState* state) { +Status HashJoinNode::Open(RuntimeState* state) { + SCOPED_TIMER(runtime_profile_->total_time_counter()); + RETURN_IF_ERROR(BlockingJoinNode::Open(state)); RETURN_IF_ERROR(Expr::Open(build_expr_ctxs_, state)); RETURN_IF_ERROR(Expr::Open(probe_expr_ctxs_, state)); RETURN_IF_ERROR(Expr::Open(filter_expr_ctxs_, state)); RETURN_IF_ERROR(Expr::Open(other_join_conjunct_ctxs_, state)); + // Check for errors and free local allocations before opening children. + RETURN_IF_CANCELLED(state); + RETURN_IF_ERROR(QueryMaintenance(state)); + + RETURN_IF_ERROR(BlockingJoinNode::ConstructBuildAndOpenProbe(state, NULL)); + RETURN_IF_ERROR(BlockingJoinNode::GetFirstProbeRow(state)); + InitGetNext(); + return Status::OK(); +} + +Status HashJoinNode::ProcessBuildInput(RuntimeState* state) { // Do a full scan of child(1) and store everything in hash_tbl_ // The hash join node needs to keep in memory all build tuples, including the tuple // row ptrs. 
The row ptrs are copied into the hash table's internal structure so they @@ -251,14 +267,13 @@ Status HashJoinNode::ConstructBuildSide(RuntimeState* state) { return Status::OK(); } -Status HashJoinNode::InitGetNext(TupleRow* first_probe_row) { - if (first_probe_row == NULL) { +void HashJoinNode::InitGetNext() { + if (current_probe_row_ == NULL) { hash_tbl_iterator_ = hash_tbl_->Begin(); } else { matched_probe_ = false; - hash_tbl_iterator_ = hash_tbl_->Find(first_probe_row); + hash_tbl_iterator_ = hash_tbl_->Find(current_probe_row_); } - return Status::OK(); } Status HashJoinNode::GetNext(RuntimeState* state, RowBatch* out_batch, bool* eos) { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3e2411f3/be/src/exec/hash-join-node.h ---------------------------------------------------------------------- diff --git a/be/src/exec/hash-join-node.h b/be/src/exec/hash-join-node.h index 1c22a42..2e5ca6e 100644 --- a/be/src/exec/hash-join-node.h +++ b/be/src/exec/hash-join-node.h @@ -54,7 +54,7 @@ class HashJoinNode : public BlockingJoinNode { virtual Status Init(const TPlanNode& tnode, RuntimeState* state); virtual Status Prepare(RuntimeState* state); - // Open() implemented in BlockingJoinNode + virtual Status Open(RuntimeState* state); virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos); virtual Status Reset(RuntimeState* state); virtual void Close(RuntimeState* state); @@ -63,13 +63,16 @@ class HashJoinNode : public BlockingJoinNode { protected: virtual void AddToDebugString(int indentation_level, std::stringstream* out) const; - virtual Status InitGetNext(TupleRow* first_probe_row); - virtual Status ConstructBuildSide(RuntimeState* state); + + virtual Status ProcessBuildInput(RuntimeState* state); private: boost::scoped_ptr<OldHashTable> hash_tbl_; OldHashTable::Iterator hash_tbl_iterator_; + /// holds everything referenced from build side + boost::scoped_ptr<MemPool> build_pool_; + /// our equi-join predicates "<lhs> = <rhs>" are 
separated into /// build_exprs_ (over child(1)) and probe_exprs_ (over child(0)) std::vector<ExprContext*> probe_expr_ctxs_; @@ -116,6 +119,9 @@ class HashJoinNode : public BlockingJoinNode { RuntimeProfile::Counter* build_buckets_counter_; // num buckets in hash table RuntimeProfile::Counter* hash_tbl_load_factor_counter_; + /// Prepares for the first call to GetNext(). Must be called after GetFirstProbeRow(). + void InitGetNext(); + /// GetNext helper function for the common join cases: Inner join, left semi and left /// outer Status LeftJoinGetNext(RuntimeState* state, RowBatch* row_batch, bool* eos); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3e2411f3/be/src/exec/hbase-table-sink.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hbase-table-sink.cc b/be/src/exec/hbase-table-sink.cc index f154cd1..e6052cc 100644 --- a/be/src/exec/hbase-table-sink.cc +++ b/be/src/exec/hbase-table-sink.cc @@ -36,10 +36,10 @@ const static string& ROOT_PARTITION_KEY = HBaseTableSink::HBaseTableSink(const RowDescriptor& row_desc, const vector<TExpr>& select_list_texprs, const TDataSink& tsink) - : table_id_(tsink.table_sink.target_table_id), + : DataSink(row_desc), + table_id_(tsink.table_sink.target_table_id), table_desc_(NULL), hbase_table_writer_(NULL), - row_desc_(row_desc), select_list_texprs_(select_list_texprs) { } @@ -53,11 +53,9 @@ Status HBaseTableSink::PrepareExprs(RuntimeState* state) { return Status::OK(); } -Status HBaseTableSink::Prepare(RuntimeState* state) { - RETURN_IF_ERROR(DataSink::Prepare(state)); - runtime_profile_ = state->obj_pool()->Add( - new RuntimeProfile(state->obj_pool(), "HbaseTableSink")); - SCOPED_TIMER(runtime_profile_->total_time_counter()); +Status HBaseTableSink::Prepare(RuntimeState* state, MemTracker* mem_tracker) { + RETURN_IF_ERROR(DataSink::Prepare(state, mem_tracker)); + SCOPED_TIMER(profile()->total_time_counter()); // Get the hbase table descriptor. 
The table name will be used. table_desc_ = static_cast<HBaseTableDescriptor*>( @@ -66,7 +64,7 @@ Status HBaseTableSink::Prepare(RuntimeState* state) { RETURN_IF_ERROR(PrepareExprs(state)); // Now that expressions are ready to materialize tuples, create the writer. hbase_table_writer_.reset( - new HBaseTableWriter(table_desc_, output_expr_ctxs_, runtime_profile_)); + new HBaseTableWriter(table_desc_, output_expr_ctxs_, profile())); // Try and init the table writer. This can create connections to HBase and // to zookeeper. @@ -86,8 +84,8 @@ Status HBaseTableSink::Open(RuntimeState* state) { return Expr::Open(output_expr_ctxs_, state); } -Status HBaseTableSink::Send(RuntimeState* state, RowBatch* batch, bool eos) { - SCOPED_TIMER(runtime_profile_->total_time_counter()); +Status HBaseTableSink::Send(RuntimeState* state, RowBatch* batch) { + SCOPED_TIMER(profile()->total_time_counter()); ExprContext::FreeLocalAllocations(output_expr_ctxs_); RETURN_IF_ERROR(state->CheckQueryState()); // Since everything is set up just forward everything to the writer. 
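The Send() signature change above (dropping the `bool eos` parameter) moves all end-of-stream finalization into FlushFinal(). A minimal sketch of the resulting sink lifecycle, using hypothetical `MockSink`/`DriveSink` stand-ins rather than the real Impala classes, to show the call sequence only:

```cpp
#include <cassert>
#include <vector>

// Hypothetical stand-in for a RowBatch; only the row count matters here.
struct RowBatch { int num_rows; };

// Hypothetical stand-in for a DataSink after the refactor: Send() no longer
// takes an eos flag, so end-of-stream work has a single home in FlushFinal().
class MockSink {
 public:
  void Send(const RowBatch& batch) { rows_sent_ += batch.num_rows; }
  void FlushFinal() { flushed_ = true; }  // Close files, flush buffers, etc.
  void Close() { closed_ = true; }

  int rows_sent() const { return rows_sent_; }
  bool flushed() const { return flushed_; }
  bool closed() const { return closed_; }

 private:
  int rows_sent_ = 0;
  bool flushed_ = false;
  bool closed_ = false;
};

// The driver no longer threads an eos flag through every Send() call, and
// empty batches can be skipped outright instead of being special-cased at eos.
int DriveSink(MockSink* sink, const std::vector<RowBatch>& batches) {
  for (const RowBatch& b : batches) {
    if (b.num_rows == 0) continue;
    sink->Send(b);
  }
  sink->FlushFinal();  // Exactly one finalization point, even for empty input.
  sink->Close();
  return sink->rows_sent();
}
```

This is the simplification the commit message describes for HdfsTableSink: handling empty batches no longer interacts with end-of-stream handling.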
@@ -104,7 +102,7 @@ Status HBaseTableSink::FlushFinal(RuntimeState* state) { void HBaseTableSink::Close(RuntimeState* state) { if (closed_) return; - SCOPED_TIMER(runtime_profile_->total_time_counter()); + SCOPED_TIMER(profile()->total_time_counter()); if (hbase_table_writer_.get() != NULL) { hbase_table_writer_->Close(state); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3e2411f3/be/src/exec/hbase-table-sink.h ---------------------------------------------------------------------- diff --git a/be/src/exec/hbase-table-sink.h b/be/src/exec/hbase-table-sink.h index 517355a..7e40033 100644 --- a/be/src/exec/hbase-table-sink.h +++ b/be/src/exec/hbase-table-sink.h @@ -38,12 +38,12 @@ class HBaseTableSink : public DataSink { HBaseTableSink(const RowDescriptor& row_desc, const std::vector<TExpr>& select_list_texprs, const TDataSink& tsink); - virtual Status Prepare(RuntimeState* state); + virtual std::string GetName() { return "HBaseTableSink"; } + virtual Status Prepare(RuntimeState* state, MemTracker* mem_tracker); virtual Status Open(RuntimeState* state); - virtual Status Send(RuntimeState* state, RowBatch* batch, bool eos); + virtual Status Send(RuntimeState* state, RowBatch* batch); virtual Status FlushFinal(RuntimeState* state); virtual void Close(RuntimeState* state); - virtual RuntimeProfile* profile() { return runtime_profile_; } private: /// Turn thrift TExpr into Expr and prepare them to run @@ -61,14 +61,8 @@ class HBaseTableSink : public DataSink { boost::scoped_ptr<HBaseTableWriter> hbase_table_writer_; /// Owned by the RuntimeState. - const RowDescriptor& row_desc_; - - /// Owned by the RuntimeState. const std::vector<TExpr>& select_list_texprs_; std::vector<ExprContext*> output_expr_ctxs_; - - /// Allocated from runtime state's pool. 
- RuntimeProfile* runtime_profile_; }; } // namespace impala http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3e2411f3/be/src/exec/hdfs-table-sink.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-table-sink.cc b/be/src/exec/hdfs-table-sink.cc index 85f180a..0626bd4 100644 --- a/be/src/exec/hdfs-table-sink.cc +++ b/be/src/exec/hdfs-table-sink.cc @@ -58,7 +58,7 @@ const static string& ROOT_PARTITION_KEY = HdfsTableSink::HdfsTableSink(const RowDescriptor& row_desc, const vector<TExpr>& select_list_texprs, const TDataSink& tsink) - : row_desc_(row_desc), + : DataSink(row_desc), table_id_(tsink.table_sink.target_table_id), skip_header_line_count_( tsink.table_sink.hdfs_table_sink.__isset.skip_header_line_count @@ -108,12 +108,10 @@ Status HdfsTableSink::PrepareExprs(RuntimeState* state) { return Status::OK(); } -Status HdfsTableSink::Prepare(RuntimeState* state) { - RETURN_IF_ERROR(DataSink::Prepare(state)); +Status HdfsTableSink::Prepare(RuntimeState* state, MemTracker* mem_tracker) { + RETURN_IF_ERROR(DataSink::Prepare(state, mem_tracker)); unique_id_str_ = PrintId(state->fragment_instance_id(), "-"); - runtime_profile_ = state->obj_pool()->Add( - new RuntimeProfile(state->obj_pool(), "HdfsTableSink")); - SCOPED_TIMER(runtime_profile_->total_time_counter()); + SCOPED_TIMER(profile()->total_time_counter()); // TODO: Consider a system-wide random number generator, initialised in a single place. 
ptime now = microsec_clock::local_time(); @@ -141,8 +139,6 @@ Status HdfsTableSink::Prepare(RuntimeState* state) { PrintId(state->query_id(), "_")); RETURN_IF_ERROR(PrepareExprs(state)); - mem_tracker_.reset(new MemTracker(profile(), -1, -1, profile()->name(), - state->instance_mem_tracker())); partitions_created_counter_ = ADD_COUNTER(profile(), "PartitionsCreated", TUnit::UNIT); @@ -534,39 +530,33 @@ inline Status HdfsTableSink::GetOutputPartition(RuntimeState* state, return Status::OK(); } -Status HdfsTableSink::Send(RuntimeState* state, RowBatch* batch, bool eos) { - SCOPED_TIMER(runtime_profile_->total_time_counter()); +Status HdfsTableSink::Send(RuntimeState* state, RowBatch* batch) { + SCOPED_TIMER(profile()->total_time_counter()); ExprContext::FreeLocalAllocations(output_expr_ctxs_); ExprContext::FreeLocalAllocations(partition_key_expr_ctxs_); RETURN_IF_ERROR(state->CheckQueryState()); - bool empty_input_batch = batch->num_rows() == 0; - // We don't do any work for an empty batch aside from end of stream finalization. - if (empty_input_batch && !eos) return Status::OK(); + // We don't do any work for an empty batch. + if (batch->num_rows() == 0) return Status::OK(); // If there are no partition keys then just pass the whole batch to one partition. if (dynamic_partition_key_expr_ctxs_.empty()) { // If there are no dynamic keys just use an empty key. PartitionPair* partition_pair; - // Populate the partition_pair even if the input is empty because we need it to - // delete the existing data for 'insert overwrite'. We need to handle empty input - // batches carefully so that empty partitions are correctly created at eos. - RETURN_IF_ERROR(GetOutputPartition(state, ROOT_PARTITION_KEY, &partition_pair, - empty_input_batch)); - if (!empty_input_batch) { - // Pass the row batch to the writer. If new_file is returned true then the current - // file is finalized and a new file is opened. 
- // The writer tracks where it is in the batch when it returns with new_file set. - OutputPartition* output_partition = partition_pair->first; - bool new_file; - do { - RETURN_IF_ERROR(output_partition->writer->AppendRowBatch( - batch, partition_pair->second, &new_file)); - if (new_file) { - RETURN_IF_ERROR(FinalizePartitionFile(state, output_partition)); - RETURN_IF_ERROR(CreateNewTmpFile(state, output_partition)); - } - } while (new_file); - } + RETURN_IF_ERROR( + GetOutputPartition(state, ROOT_PARTITION_KEY, &partition_pair, false)); + // Pass the row batch to the writer. If new_file is returned true then the current + // file is finalized and a new file is opened. + // The writer tracks where it is in the batch when it returns with new_file set. + OutputPartition* output_partition = partition_pair->first; + bool new_file; + do { + RETURN_IF_ERROR(output_partition->writer->AppendRowBatch( + batch, partition_pair->second, &new_file)); + if (new_file) { + RETURN_IF_ERROR(FinalizePartitionFile(state, output_partition)); + RETURN_IF_ERROR(CreateNewTmpFile(state, output_partition)); + } + } while (new_file); } else { for (int i = 0; i < batch->num_rows(); ++i) { current_row_ = batch->GetRow(i); @@ -594,16 +584,6 @@ Status HdfsTableSink::Send(RuntimeState* state, RowBatch* batch, bool eos) { partition->second.second.clear(); } } - - if (eos) { - // Close Hdfs files, and update stats in runtime state. - for (PartitionMap::iterator cur_partition = - partition_keys_to_output_partitions_.begin(); - cur_partition != partition_keys_to_output_partitions_.end(); - ++cur_partition) { - RETURN_IF_ERROR(FinalizePartitionFile(state, cur_partition->second.first)); - } - } return Status::OK(); } @@ -646,7 +626,23 @@ void HdfsTableSink::ClosePartitionFile(RuntimeState* state, OutputPartition* par } Status HdfsTableSink::FlushFinal(RuntimeState* state) { - // Currently a no-op function. 
+ SCOPED_TIMER(profile()->total_time_counter()); + + if (dynamic_partition_key_expr_ctxs_.empty()) { + // Make sure we create an output partition even if the input is empty because we need + // it to delete the existing data for 'insert overwrite'. + PartitionPair* dummy; + RETURN_IF_ERROR(GetOutputPartition(state, ROOT_PARTITION_KEY, &dummy, true)); + } + + // Close Hdfs files, and update stats in runtime state. + for (PartitionMap::iterator cur_partition = + partition_keys_to_output_partitions_.begin(); + cur_partition != partition_keys_to_output_partitions_.end(); + ++cur_partition) { + RETURN_IF_ERROR(FinalizePartitionFile(state, cur_partition->second.first)); + } + // TODO: Move call to ClosePartitionFile() here so that the error status can be // propagated. If closing the file fails, the query should fail. return Status::OK(); @@ -654,7 +650,7 @@ Status HdfsTableSink::FlushFinal(RuntimeState* state) { void HdfsTableSink::Close(RuntimeState* state) { if (closed_) return; - SCOPED_TIMER(runtime_profile_->total_time_counter()); + SCOPED_TIMER(profile()->total_time_counter()); for (PartitionMap::iterator cur_partition = partition_keys_to_output_partitions_.begin(); cur_partition != partition_keys_to_output_partitions_.end(); @@ -674,10 +670,6 @@ void HdfsTableSink::Close(RuntimeState* state) { } Expr::Close(output_expr_ctxs_, state); Expr::Close(partition_key_expr_ctxs_, state); - if (mem_tracker_.get() != NULL) { - mem_tracker_->UnregisterFromParent(); - mem_tracker_.reset(); - } DataSink::Close(state); closed_ = true; } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3e2411f3/be/src/exec/hdfs-table-sink.h ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-table-sink.h b/be/src/exec/hdfs-table-sink.h index b2543ff..fc32f4d 100644 --- a/be/src/exec/hdfs-table-sink.h +++ b/be/src/exec/hdfs-table-sink.h @@ -27,7 +27,6 @@ #include "common/object-pool.h" #include "exec/data-sink.h" #include 
"runtime/descriptors.h" -#include "util/runtime-profile.h" namespace impala { @@ -131,18 +130,20 @@ class HdfsTableSink : public DataSink { HdfsTableSink(const RowDescriptor& row_desc, const std::vector<TExpr>& select_list_texprs, const TDataSink& tsink); + virtual std::string GetName() { return "HdfsTableSink"; } + /// Prepares output_exprs and partition_key_exprs, and connects to HDFS. - virtual Status Prepare(RuntimeState* state); + virtual Status Prepare(RuntimeState* state, MemTracker* mem_tracker); /// Opens output_exprs and partition_key_exprs, prepares the single output partition for /// static inserts, and populates partition_descriptor_map_. virtual Status Open(RuntimeState* state); /// Append all rows in batch to the temporary Hdfs files corresponding to partitions. - virtual Status Send(RuntimeState* state, RowBatch* batch, bool eos); + virtual Status Send(RuntimeState* state, RowBatch* batch); - /// Currently a no-op function. - /// TODO: Move calls to functions that can fail in Close() to FlushFinal() + /// Finalize any open files. + /// TODO: IMPALA-2988: Move calls to functions that can fail in Close() to FlushFinal() virtual Status FlushFinal(RuntimeState* state); /// Move temporary Hdfs files to final locations. @@ -152,9 +153,7 @@ class HdfsTableSink : public DataSink { int skip_header_line_count() const { return skip_header_line_count_; } - virtual RuntimeProfile* profile() { return runtime_profile_; } const HdfsTableDescriptor& TableDesc() { return *table_desc_; } - MemTracker* mem_tracker() { return mem_tracker_.get(); } RuntimeProfile::Counter* rows_inserted_counter() { return rows_inserted_counter_; } RuntimeProfile::Counter* bytes_written_counter() { return bytes_written_counter_; } @@ -195,7 +194,7 @@ class HdfsTableSink : public DataSink { /// the partition_keys_to_output_partitions_. /// no_more_rows indicates that no more rows will be added to the partition. 
Status GetOutputPartition(RuntimeState* state, const std::string& key, - PartitionPair** partition_pair, bool no_more_rows); + PartitionPair** partition_pair, bool no_more_rows); /// Initialise and prepare select and partition key expressions Status PrepareExprs(RuntimeState* state); @@ -234,9 +233,6 @@ class HdfsTableSink : public DataSink { /// Current row from the current RowBatch to output TupleRow* current_row_; - /// Row descriptor of row batches passed in Send(). Set in c'tor. - const RowDescriptor& row_desc_; - /// Table id resolved in Prepare() to set tuple_desc_; TableId table_id_; @@ -286,10 +282,6 @@ class HdfsTableSink : public DataSink { PartitionDescriptorMap; PartitionDescriptorMap partition_descriptor_map_; - boost::scoped_ptr<MemTracker> mem_tracker_; - - /// Allocated from runtime state's pool. - RuntimeProfile* runtime_profile_; RuntimeProfile::Counter* partitions_created_counter_; RuntimeProfile::Counter* files_created_counter_; RuntimeProfile::Counter* rows_inserted_counter_; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3e2411f3/be/src/exec/kudu-table-sink-test.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/kudu-table-sink-test.cc b/be/src/exec/kudu-table-sink-test.cc index ef6a9c5..96aec55 100644 --- a/be/src/exec/kudu-table-sink-test.cc +++ b/be/src/exec/kudu-table-sink-test.cc @@ -224,14 +224,15 @@ class KuduTableSinkTest : public testing::Test { vector<TExpr> exprs; CreateTExpr(schema_cols, &exprs); KuduTableSink sink(*row_desc_, exprs, data_sink_); - ASSERT_OK(sink.Prepare(&runtime_state_)); + ASSERT_OK(sink.Prepare(&runtime_state_, &mem_tracker_)); ASSERT_OK(sink.Open(&runtime_state_)); vector<RowBatch*> row_batches; row_batches.push_back(CreateRowBatch(0, kNumRowsPerBatch, factor, val, skip_val)); - ASSERT_OK(sink.Send(&runtime_state_, row_batches.front(), false)); + ASSERT_OK(sink.Send(&runtime_state_, row_batches.front())); 
row_batches.push_back(CreateRowBatch(kNumRowsPerBatch, kNumRowsPerBatch, factor, val, skip_val)); - ASSERT_OK(sink.Send(&runtime_state_,row_batches.back(), true)); + ASSERT_OK(sink.Send(&runtime_state_,row_batches.back())); + ASSERT_OK(sink.FlushFinal(&runtime_state_)); STLDeleteElements(&row_batches); sink.Close(&runtime_state_); Verify(num_columns, 2 * kNumRowsPerBatch, factor, val, skip_val); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3e2411f3/be/src/exec/kudu-table-sink.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/kudu-table-sink.cc b/be/src/exec/kudu-table-sink.cc index d42cc7e..79a6a83 100644 --- a/be/src/exec/kudu-table-sink.cc +++ b/be/src/exec/kudu-table-sink.cc @@ -43,8 +43,6 @@ using kudu::client::KuduInsert; using kudu::client::KuduUpdate; using kudu::client::KuduError; -static const char* KUDU_SINK_NAME = "KuduTableSink"; - namespace impala { const static string& ROOT_PARTITION_KEY = @@ -53,8 +51,8 @@ const static string& ROOT_PARTITION_KEY = KuduTableSink::KuduTableSink(const RowDescriptor& row_desc, const vector<TExpr>& select_list_texprs, const TDataSink& tsink) - : table_id_(tsink.table_sink.target_table_id), - row_desc_(row_desc), + : DataSink(row_desc), + table_id_(tsink.table_sink.target_table_id), select_list_texprs_(select_list_texprs), sink_action_(tsink.table_sink.action), kudu_table_sink_(tsink.table_sink.kudu_table_sink), @@ -76,11 +74,9 @@ Status KuduTableSink::PrepareExprs(RuntimeState* state) { return Status::OK(); } -Status KuduTableSink::Prepare(RuntimeState* state) { - RETURN_IF_ERROR(DataSink::Prepare(state)); - runtime_profile_ = state->obj_pool()->Add( - new RuntimeProfile(state->obj_pool(), KUDU_SINK_NAME)); - SCOPED_TIMER(runtime_profile_->total_time_counter()); +Status KuduTableSink::Prepare(RuntimeState* state, MemTracker* mem_tracker) { + RETURN_IF_ERROR(DataSink::Prepare(state, mem_tracker)); + SCOPED_TIMER(profile()->total_time_counter()); 
RETURN_IF_ERROR(PrepareExprs(state)); // Get the kudu table descriptor. @@ -101,17 +97,14 @@ Status KuduTableSink::Prepare(RuntimeState* state) { state->per_partition_status()->insert(make_pair(ROOT_PARTITION_KEY, root_status)); // Add counters - kudu_flush_counter_ = - ADD_COUNTER(runtime_profile_, "TotalKuduFlushOperations", TUnit::UNIT); - kudu_error_counter_ = - ADD_COUNTER(runtime_profile_, "TotalKuduFlushErrors", TUnit::UNIT); - kudu_flush_timer_ = ADD_TIMER(runtime_profile_, "KuduFlushTimer"); - rows_written_ = - ADD_COUNTER(runtime_profile_, "RowsWritten", TUnit::UNIT); - rows_written_rate_ = runtime_profile_->AddDerivedCounter( + kudu_flush_counter_ = ADD_COUNTER(profile(), "TotalKuduFlushOperations", TUnit::UNIT); + kudu_error_counter_ = ADD_COUNTER(profile(), "TotalKuduFlushErrors", TUnit::UNIT); + kudu_flush_timer_ = ADD_TIMER(profile(), "KuduFlushTimer"); + rows_written_ = ADD_COUNTER(profile(), "RowsWritten", TUnit::UNIT); + rows_written_rate_ = profile()->AddDerivedCounter( "RowsWrittenRate", TUnit::UNIT_PER_SECOND, bind<int64_t>(&RuntimeProfile::UnitsPerSecond, rows_written_, - runtime_profile_->total_time_counter())); + profile()->total_time_counter())); return Status::OK(); } @@ -147,8 +140,8 @@ kudu::client::KuduWriteOperation* KuduTableSink::NewWriteOp() { } } -Status KuduTableSink::Send(RuntimeState* state, RowBatch* batch, bool eos) { - SCOPED_TIMER(runtime_profile_->total_time_counter()); +Status KuduTableSink::Send(RuntimeState* state, RowBatch* batch) { + SCOPED_TIMER(profile()->total_time_counter()); ExprContext::FreeLocalAllocations(output_expr_ctxs_); RETURN_IF_ERROR(state->CheckQueryState()); @@ -294,8 +287,9 @@ Status KuduTableSink::FlushFinal(RuntimeState* state) { void KuduTableSink::Close(RuntimeState* state) { if (closed_) return; - SCOPED_TIMER(runtime_profile_->total_time_counter()); + SCOPED_TIMER(profile()->total_time_counter()); Expr::Close(output_expr_ctxs_, state); + DataSink::Close(state); closed_ = true; } 
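The new `Prepare(RuntimeState*, MemTracker*)` signature seen throughout these sinks lets the caller choose which node's tracker the sink charges, instead of each sink building its own tracker from the instance tracker. A sketch of that idea with a toy tracker hierarchy; `MemTracker` and `MockBuildSink` here are simplified stand-ins, not the real Impala types:

```cpp
#include <cassert>

// Toy stand-in for Impala's MemTracker: consumption propagates to the parent.
class MemTracker {
 public:
  explicit MemTracker(MemTracker* parent = nullptr) : parent_(parent) {}
  void Consume(long bytes) {
    consumption_ += bytes;
    if (parent_ != nullptr) parent_->Consume(bytes);
  }
  long consumption() const { return consumption_; }

 private:
  MemTracker* parent_;
  long consumption_ = 0;
};

// After the refactor the parent passes a tracker into Prepare(), so memory a
// build-side sink consumes is accounted against the right ExecNode.
class MockBuildSink {
 public:
  void Prepare(MemTracker* mem_tracker) { mem_tracker_ = mem_tracker; }
  void Send(long batch_bytes) { mem_tracker_->Consume(batch_bytes); }

 private:
  MemTracker* mem_tracker_ = nullptr;
};
```

With this shape, a join node can hand its own tracker to the builder, and the build side's consumption rolls up through the node to the fragment instance.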
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3e2411f3/be/src/exec/kudu-table-sink.h ---------------------------------------------------------------------- diff --git a/be/src/exec/kudu-table-sink.h b/be/src/exec/kudu-table-sink.h index 5373624..2eeb721 100644 --- a/be/src/exec/kudu-table-sink.h +++ b/be/src/exec/kudu-table-sink.h @@ -46,16 +46,18 @@ class KuduTableSink : public DataSink { KuduTableSink(const RowDescriptor& row_desc, const std::vector<TExpr>& select_list_texprs, const TDataSink& tsink); + virtual std::string GetName() { return "KuduTableSink"; } + /// Prepares the expressions to be applied and creates a KuduSchema based on the /// expressions and KuduTableDescriptor. - virtual Status Prepare(RuntimeState* state); + virtual Status Prepare(RuntimeState* state, MemTracker* mem_tracker); /// Connects to Kudu and creates the KuduSession to be used for the writes. virtual Status Open(RuntimeState* state); /// Transforms 'batch' into Kudu writes and sends them to Kudu. /// The KuduSession is flushed on each row batch. - virtual Status Send(RuntimeState* state, RowBatch* batch, bool eos); + virtual Status Send(RuntimeState* state, RowBatch* batch); /// Does nothing. We currently flush on each Send() call. virtual Status FlushFinal(RuntimeState* state); @@ -63,8 +65,6 @@ class KuduTableSink : public DataSink { /// Closes the KuduSession and the expressions. virtual void Close(RuntimeState* state); - virtual RuntimeProfile* profile() { return runtime_profile_; } - private: /// Turn thrift TExpr into Expr and prepare them to run Status PrepareExprs(RuntimeState* state); @@ -82,8 +82,6 @@ class KuduTableSink : public DataSink { /// Used to get the KuduTableDescriptor from the RuntimeState TableId table_id_; - /// Row descriptor of row batches passed in Send(). Set in c'tor. - const RowDescriptor& row_desc_; /// The descriptor of the KuduTable being written to. Set on Prepare(). 
const KuduTableDescriptor* table_desc_; @@ -98,9 +96,6 @@ class KuduTableSink : public DataSink { std::tr1::shared_ptr<kudu::client::KuduTable> table_; std::tr1::shared_ptr<kudu::client::KuduSession> session_; - /// Allocated from runtime state's pool. - RuntimeProfile* runtime_profile_; - /// Used to specify the type of write operation (INSERT/UPDATE/DELETE). TSinkAction::type sink_action_; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3e2411f3/be/src/exec/nested-loop-join-builder.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/nested-loop-join-builder.cc b/be/src/exec/nested-loop-join-builder.cc new file mode 100644 index 0000000..4315b93 --- /dev/null +++ b/be/src/exec/nested-loop-join-builder.cc @@ -0,0 +1,105 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "exec/nested-loop-join-builder.h" + +#include <utility> + +#include "runtime/mem-tracker.h" +#include "runtime/row-batch.h" +#include "runtime/runtime-state.h" + +#include "common/names.h" + +using namespace impala; + +NljBuilder::NljBuilder(const RowDescriptor& row_desc, RuntimeState* state, + MemTracker* mem_tracker) + : DataSink(row_desc), build_batch_cache_(row_desc, state->batch_size(), + mem_tracker) {} + +Status NljBuilder::Prepare(RuntimeState* state, + MemTracker* mem_tracker) { + RETURN_IF_ERROR(DataSink::Prepare(state, mem_tracker)); + return Status::OK(); +} + +Status NljBuilder::Open(RuntimeState* state) { + return Status::OK(); +} + +Status NljBuilder::Send(RuntimeState* state, RowBatch* batch) { + // Swap the contents of the batch into a batch owned by the builder. + RowBatch* build_batch = GetNextEmptyBatch(); + build_batch->AcquireState(batch); + + AddBuildBatch(build_batch); + if (build_batch->need_to_return()) { + // This batch and earlier batches may refer to resources passed from the child + // that aren't owned by the row batch itself. Deep copying ensures that the row + // batches are backed by memory owned by this node that is safe to hold on to. + RETURN_IF_ERROR(DeepCopyBuildBatches(state)); + } + return Status::OK(); +} + +Status NljBuilder::FlushFinal(RuntimeState* state) { + if (copied_build_batches_.total_num_rows() > 0) { + // To simplify things, we only want to process one list, so we need to copy + // the remaining input batches. 
+ RETURN_IF_ERROR(DeepCopyBuildBatches(state)); + } + + DCHECK(copied_build_batches_.total_num_rows() == 0 || + input_build_batches_.total_num_rows() == 0); + return Status::OK(); +} + +void NljBuilder::Reset() { + build_batch_cache_.Reset(); + input_build_batches_.Reset(); + copied_build_batches_.Reset(); +} + +void NljBuilder::Close(RuntimeState* state) { + if (closed_) return; + build_batch_cache_.Clear(); + input_build_batches_.Reset(); + copied_build_batches_.Reset(); + DataSink::Close(state); + closed_ = true; +} + +Status NljBuilder::DeepCopyBuildBatches(RuntimeState* state) { + for (RowBatchList::BatchIterator it = input_build_batches_.BatchesBegin(); + it != input_build_batches_.BatchesEnd(); ++it) { + RowBatch* input_batch = *it; + // TODO: it would be more efficient to do the deep copy within the same batch, rather + // than to a new batch. + RowBatch* copied_batch = build_batch_cache_.GetNextBatch(); + input_batch->DeepCopyTo(copied_batch); + copied_build_batches_.AddRowBatch(copied_batch); + // Reset input batches as we go to free up memory if possible. + input_batch->Reset(); + + RETURN_IF_CANCELLED(state); + RETURN_IF_ERROR(state->CheckQueryState()); + } + input_build_batches_.Reset(); + return Status::OK(); +} + http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3e2411f3/be/src/exec/nested-loop-join-builder.h ---------------------------------------------------------------------- diff --git a/be/src/exec/nested-loop-join-builder.h b/be/src/exec/nested-loop-join-builder.h new file mode 100644 index 0000000..a64315a --- /dev/null +++ b/be/src/exec/nested-loop-join-builder.h @@ -0,0 +1,100 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef IMPALA_EXEC_NESTED_LOOP_JOIN_BUILDER_H +#define IMPALA_EXEC_NESTED_LOOP_JOIN_BUILDER_H + +#include "exec/blocking-join-node.h" +#include "exec/data-sink.h" +#include "exec/row-batch-cache.h" +#include "exec/row-batch-list.h" +#include "runtime/descriptors.h" + +namespace impala { + +/// Builder for the NestedLoopJoinNode that accumulates the build-side rows for the join. +/// Implements the DataSink interface but also exposes some methods for direct use by +/// NestedLoopJoinNode. +/// +/// The builder will operate in one of two modes depending on the memory ownership of +/// row batches pulled from the child node on the build side. If the row batches own all +/// tuple memory, the non-copying mode is used and row batches are simply accumulated in +/// the builder. If the batches reference tuple data they do not own, the copying mode +/// is used and all data is deep copied into memory owned by the builder. +class NljBuilder : public DataSink { + public: + NljBuilder(const RowDescriptor& row_desc, RuntimeState* state, MemTracker* mem_tracker); + + /// Implementations of DataSink methods. 
+ virtual std::string GetName() { return "Nested Loop Join Build"; } + virtual Status Prepare(RuntimeState* state, MemTracker* mem_tracker); + virtual Status Open(RuntimeState* state); + virtual Status Send(RuntimeState* state, RowBatch* batch); + virtual Status FlushFinal(RuntimeState* state); + virtual void Close(RuntimeState* state); + + /// Reset the builder to the same state as it was in after calling Open(). + void Reset(); + + /// Returns the next build batch that should be filled and passed to AddBuildBatch(). + /// Exposed so that NestedLoopJoinNode can bypass the DataSink interface for efficiency. + inline RowBatch* GetNextEmptyBatch() { return build_batch_cache_.GetNextBatch(); } + + /// Add a batch to the build side. Does not copy the data, so either resources must + /// be owned by the batch (or a later batch), or DeepCopyBuildBatches() must be called + /// before the referenced resources are released. + /// Exposed so that NestedLoopJoinNode can bypass the DataSink interface for efficiency. + inline void AddBuildBatch(RowBatch* batch) { input_build_batches_.AddRowBatch(batch); } + + /// Return a pointer to the final list of build batches. + /// Only valid to call after FlushFinal() has been called. + RowBatchList* GetFinalBuildBatches() { + if (copied_build_batches_.total_num_rows() > 0) { + DCHECK_EQ(input_build_batches_.total_num_rows(), 0); + return &copied_build_batches_; + } else { + return &input_build_batches_; + } + } + + inline RowBatchList* input_build_batches() { return &input_build_batches_; } + inline RowBatchList* copied_build_batches() { return &copied_build_batches_; } + + private: + /// Deep copy all build batches in 'input_build_batches_' to 'copied_build_batches_'. + /// Resets all the source batches and clears 'input_build_batches_'. 
+ /// If the memory limit is exceeded while copying batches, returns a + /// MEM_LIMIT_EXCEEDED status, sets the query status to MEM_LIMIT_EXCEEDED and leaves + /// the row batches to be cleaned up later when the node is closed. + Status DeepCopyBuildBatches(RuntimeState* state); + + /// Creates and caches RowBatches for the build side. The RowBatch objects are owned + /// by this cache. The cache helps to avoid creating new RowBatches after a Reset(). + RowBatchCache build_batch_cache_; + + /// List of the input build batches we obtained from the child, which may reference + /// memory that is owned by the child node. + RowBatchList input_build_batches_; + + /// List of build batches that were deep copied from 'input_build_batches_' and are + /// backed by each row batch's pool. + RowBatchList copied_build_batches_; +}; + +} + +#endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3e2411f3/be/src/exec/nested-loop-join-node.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/nested-loop-join-node.cc b/be/src/exec/nested-loop-join-node.cc index b94cef1..87fbf62 100644 --- a/be/src/exec/nested-loop-join-node.cc +++ b/be/src/exec/nested-loop-join-node.cc @@ -20,7 +20,6 @@ #include <sstream> #include <gutil/strings/substitute.h> -#include "exec/row-batch-cache.h" #include "exprs/expr.h" #include "runtime/mem-pool.h" #include "runtime/mem-tracker.h" @@ -51,9 +50,8 @@ NestedLoopJoinNode::~NestedLoopJoinNode() { Status NestedLoopJoinNode::Init(const TPlanNode& tnode, RuntimeState* state) { RETURN_IF_ERROR(BlockingJoinNode::Init(tnode, state)); DCHECK(tnode.__isset.nested_loop_join_node); - RETURN_IF_ERROR( - Expr::CreateExprTrees(pool_, tnode.nested_loop_join_node.join_conjuncts, - &join_conjunct_ctxs_)); + RETURN_IF_ERROR(Expr::CreateExprTrees(pool_, tnode.nested_loop_join_node.join_conjuncts, + &join_conjunct_ctxs_)); DCHECK(tnode.nested_loop_join_node.join_op != TJoinOp::CROSS_JOIN || 
join_conjunct_ctxs_.size() == 0) << "Join conjuncts in a cross join"; @@ -61,8 +59,36 @@ Status NestedLoopJoinNode::Init(const TPlanNode& tnode, RuntimeState* state) { } Status NestedLoopJoinNode::Open(RuntimeState* state) { - RETURN_IF_ERROR(Expr::Open(join_conjunct_ctxs_, state)); + SCOPED_TIMER(runtime_profile_->total_time_counter()); RETURN_IF_ERROR(BlockingJoinNode::Open(state)); + RETURN_IF_ERROR(Expr::Open(join_conjunct_ctxs_, state)); + + // Check for errors and free local allocations before opening children. + RETURN_IF_CANCELLED(state); + RETURN_IF_ERROR(QueryMaintenance(state)); + + if (child(1)->type() == TPlanNodeType::type::SINGULAR_ROW_SRC_NODE) { + DCHECK(IsInSubplan()); + // When inside a subplan, open the first child before doing the build such that + // UnnestNodes on the probe side are opened and project their unnested collection + // slots. Otherwise, the build might unnecessarily deep-copy those collection slots, + // and this node would return them in GetNext(). + // TODO: Remove this special-case behavior for subplans once we have proper + // projection. See UnnestNode for details on the current projection implementation. 
+ RETURN_IF_ERROR(child(0)->Open(state)); + RETURN_IF_ERROR(ConstructSingularBuildSide(state)); + DCHECK_EQ(builder_->copied_build_batches()->total_num_rows(), 0); + build_batches_ = builder_->input_build_batches(); + } else { + RETURN_IF_ERROR( + BlockingJoinNode::ConstructBuildAndOpenProbe(state, builder_.get())); + build_batches_ = builder_->GetFinalBuildBatches(); + if (matching_build_rows_ != NULL) { + RETURN_IF_ERROR(ResetMatchingBuildRows(state, build_batches_->total_num_rows())); + } + } + RETURN_IF_ERROR(BlockingJoinNode::GetFirstProbeRow(state)); + ResetForProbe(); return Status::OK(); } @@ -75,8 +101,10 @@ Status NestedLoopJoinNode::Prepare(RuntimeState* state) { RowDescriptor full_row_desc(child(0)->row_desc(), child(1)->row_desc()); RETURN_IF_ERROR(Expr::Prepare( join_conjunct_ctxs_, state, full_row_desc, expr_mem_tracker())); - build_batch_cache_.reset(new RowBatchCache( - child(1)->row_desc(), state->batch_size(), mem_tracker())); + + builder_.reset( + new NljBuilder(child(1)->row_desc(), state, mem_tracker())); + RETURN_IF_ERROR(builder_->Prepare(state, mem_tracker())); // For some join modes we need to record the build rows with matches in a bitmap. 
   if (join_op_ == TJoinOp::RIGHT_ANTI_JOIN ||
       join_op_ == TJoinOp::RIGHT_SEMI_JOIN ||
@@ -99,135 +127,76 @@ Status NestedLoopJoinNode::Prepare(RuntimeState* state) {
 }
 
 Status NestedLoopJoinNode::Reset(RuntimeState* state) {
-  raw_build_batches_.Reset();
-  copied_build_batches_.Reset();
+  builder_->Reset();
   build_batches_ = NULL;
   matched_probe_ = false;
   current_probe_row_ = NULL;
   probe_batch_pos_ = 0;
   process_unmatched_build_rows_ = false;
-  build_batch_cache_->Reset();
   return BlockingJoinNode::Reset(state);
 }
 
 void NestedLoopJoinNode::Close(RuntimeState* state) {
   if (is_closed()) return;
-  raw_build_batches_.Reset();
-  copied_build_batches_.Reset();
-  build_batches_ = NULL;
   Expr::Close(join_conjunct_ctxs_, state);
+  if (builder_ != NULL) {
+    builder_->Close(state);
+    builder_.reset();
+  }
+  build_batches_ = NULL;
   if (matching_build_rows_ != NULL) {
     mem_tracker()->Release(matching_build_rows_->MemUsage());
     matching_build_rows_.reset();
   }
-  build_batch_cache_.reset();
   BlockingJoinNode::Close(state);
 }
 
-Status NestedLoopJoinNode::ConstructBuildSide(RuntimeState* state) {
-  if (child(1)->type() == TPlanNodeType::type::SINGULAR_ROW_SRC_NODE) {
-    // Optimized path for a common subplan shape with a singular row src
-    // node on the build side. This specialized build construction is
-    // faster mostly because it avoids the expensive timers below.
-    DCHECK(IsInSubplan());
-    RowBatch* batch = build_batch_cache_->GetNextBatch();
-    bool eos;
-    RETURN_IF_ERROR(child(1)->GetNext(state, batch, &eos));
-    DCHECK_EQ(batch->num_rows(), 1);
-    DCHECK(eos);
-    DCHECK(!batch->need_to_return());
-    raw_build_batches_.AddRowBatch(batch);
-    build_batches_ = &raw_build_batches_;
-    if (matching_build_rows_ != NULL) {
-      DCHECK_EQ(matching_build_rows_->num_bits(), 1);
-      matching_build_rows_->SetAllBits(false);
-    }
-    return Status::OK();
-  }
-
-  {
-    SCOPED_STOP_WATCH(&built_probe_overlap_stop_watch_);
-    RETURN_IF_ERROR(child(1)->Open(state));
+Status NestedLoopJoinNode::ConstructSingularBuildSide(RuntimeState* state) {
+  // Optimized path for a common subplan shape with a singular row src node on the build
+  // side that avoids expensive timers, virtual function calls, and other overhead.
+  DCHECK_EQ(child(1)->type(), TPlanNodeType::type::SINGULAR_ROW_SRC_NODE);
+  DCHECK(IsInSubplan());
+  RowBatch* batch = builder_->GetNextEmptyBatch();
+  bool eos;
+  RETURN_IF_ERROR(child(1)->GetNext(state, batch, &eos));
+  DCHECK_EQ(batch->num_rows(), 1);
+  DCHECK(eos);
+  DCHECK(!batch->need_to_return());
+  builder_->AddBuildBatch(batch);
+  if (matching_build_rows_ != NULL) {
+    DCHECK_EQ(matching_build_rows_->num_bits(), 1);
+    matching_build_rows_->SetAllBits(false);
   }
-  bool eos = false;
-  do {
-    RowBatch* batch = build_batch_cache_->GetNextBatch();
-    {
-      SCOPED_STOP_WATCH(&built_probe_overlap_stop_watch_);
-      RETURN_IF_ERROR(child(1)->GetNext(state, batch, &eos));
-    }
-    SCOPED_TIMER(build_timer_);
-    raw_build_batches_.AddRowBatch(batch);
-    if (batch->need_to_return()) {
-      // This batch and earlier batches may refer to resources passed from the child
-      // that aren't owned by the row batch itself. Deep copying ensures that the row
-      // batches are backed by memory owned by this node that is safe to hold on to.
-      RETURN_IF_ERROR(DeepCopyBuildBatches(state));
-    }
+  return Status::OK();
+}
 
-    VLOG_ROW << Substitute("raw_build_batches_: BuildList($0)",
-        raw_build_batches_.DebugString(child(1)->row_desc()));
-    VLOG_ROW << Substitute("copied_build_batches_: BuildList($0)",
-        copied_build_batches_.DebugString(child(1)->row_desc()));
-    COUNTER_SET(build_row_counter_, raw_build_batches_.total_num_rows() +
-        copied_build_batches_.total_num_rows());
-  } while (!eos);
-
-  SCOPED_TIMER(build_timer_);
-  if (copied_build_batches_.total_num_rows() > 0) {
-    // To simplify things, we only want to process one list, so we need to copy
-    // the remaining raw batches.
-    DeepCopyBuildBatches(state);
-    build_batches_ = &copied_build_batches_;
+Status NestedLoopJoinNode::ResetMatchingBuildRows(RuntimeState* state, int64_t num_bits) {
+  // Reuse existing bitmap, expanding it if needed.
+  if (matching_build_rows_->num_bits() >= num_bits) {
+    matching_build_rows_->SetAllBits(false);
   } else {
-    // We didn't need to copy anything so we can just use raw batches.
-    build_batches_ = &raw_build_batches_;
-  }
-
-  if (matching_build_rows_ != NULL) {
-    int64_t num_bits = build_batches_->total_num_rows();
-    // Reuse existing bitmap, expanding it if needed.
-    if (matching_build_rows_->num_bits() >= num_bits) {
-      matching_build_rows_->SetAllBits(false);
-    } else {
-      // Account for the additional memory used by the bitmap.
+    // Account for the additional memory used by the bitmap.
+    int64_t bitmap_size_increase =
+        Bitmap::MemUsage(num_bits) - matching_build_rows_->MemUsage();
+    if (!mem_tracker()->TryConsume(bitmap_size_increase)) {
+      return mem_tracker()->MemLimitExceeded(state,
+          "Could not expand bitmap in nested loop join", bitmap_size_increase);
    }
+    matching_build_rows_->Reset(num_bits);
  }
  return Status::OK();
}
 
-Status NestedLoopJoinNode::DeepCopyBuildBatches(RuntimeState* state) {
-  for (RowBatchList::BatchIterator it = raw_build_batches_.BatchesBegin();
-      it != raw_build_batches_.BatchesEnd(); ++it) {
-    RowBatch* raw_batch = *it;
-    // TODO: it would be more efficient to do the deep copy within the same batch, rather
-    // than to a new batch.
-    RowBatch* copied_batch = build_batch_cache_->GetNextBatch();
-    raw_batch->DeepCopyTo(copied_batch);
-    copied_build_batches_.AddRowBatch(copied_batch);
-    // Reset raw batches as we go to free up memory if possible.
-    raw_batch->Reset();
-
-    RETURN_IF_CANCELLED(state);
-    RETURN_IF_ERROR(QueryMaintenance(state));
-  }
-  raw_build_batches_.Reset();
+Status NestedLoopJoinNode::ProcessBuildInput(RuntimeState* state) {
+  DCHECK(false) << "Should not be called, NLJ uses the BuildSink API";
   return Status::OK();
 }
 
-Status NestedLoopJoinNode::InitGetNext(TupleRow* first_left_row) {
+void NestedLoopJoinNode::ResetForProbe() {
   DCHECK(build_batches_ != NULL);
   build_row_iterator_ = build_batches_->Iterator();
   current_build_row_idx_ = 0;
   matched_probe_ = false;
-  return Status::OK();
 }
 
 Status NestedLoopJoinNode::GetNext(RuntimeState* state, RowBatch* output_batch,

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3e2411f3/be/src/exec/nested-loop-join-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/nested-loop-join-node.h b/be/src/exec/nested-loop-join-node.h
index 6f3cdb7..1d3d2da 100644
--- a/be/src/exec/nested-loop-join-node.h
+++ b/be/src/exec/nested-loop-join-node.h
@@ -23,9 +23,7 @@
 
 #include "exec/exec-node.h"
 #include "exec/blocking-join-node.h"
-#include "exec/row-batch-list.h"
-#include "runtime/descriptors.h"  // for TupleDescriptor
-#include "runtime/mem-pool.h"
+#include "exec/nested-loop-join-builder.h"
 #include "gen-cpp/PlanNodes_types.h"
 
@@ -34,16 +32,10 @@ namespace impala {
 class Bitmap;
 class RowBatch;
 class TupleRow;
-class RowBatchCache;
 
-/// Operator to perform nested-loop join.
+/// Operator to perform nested-loop join. The build side is implemented by NljBuilder.
 /// This operator does not support spill to disk. Supports all join modes except
 /// null-aware left anti-join.
-/// This operator will operate in one of two modes depending on the memory ownership of
-/// row batches pulled from the child node on the build side. If the row batches own all
-/// tuple memory, the non-copying mode is used and row batches are simply accumulated in
-/// this node. If the batches reference tuple data they do not own, the copying mode is
-/// used and all data is deep copied into memory owned by this node.
 ///
 /// TODO: Add support for null-aware left-anti join.
 class NestedLoopJoinNode : public BlockingJoinNode {
@@ -60,27 +52,17 @@ class NestedLoopJoinNode : public BlockingJoinNode {
   virtual void Close(RuntimeState* state);
 
  protected:
-  virtual Status InitGetNext(TupleRow* first_left_row);
-  virtual Status ConstructBuildSide(RuntimeState* state);
+  virtual Status ProcessBuildInput(RuntimeState* state);
 
  private:
  /////////////////////////////////////////
  /// BEGIN: Members that must be Reset()
 
-  /// Creates and caches RowBatches for the build side. The RowBatch objects are owned by
-  /// this cache, but the tuple data is always transferred to the output batch in
-  /// GetNext() when eos_ is set to true. The cache helps to avoid creating new
-  /// RowBatches after a Reset().
-  boost::scoped_ptr<RowBatchCache> build_batch_cache_;
+  /// The build side rows of the join.
+  boost::scoped_ptr<NljBuilder> builder_;
 
-  /// List of build batches from child.
-  RowBatchList raw_build_batches_;
-
-  /// List of build batches that were deep copied and are backed by each row batch's pool.
-  RowBatchList copied_build_batches_;
-
-  /// Pointer to either raw_build_batches_ or copied_build_batches_ that contains the
-  /// batches to use during the probe phase.
+  /// Pointer to the RowBatchList (owned by 'builder_') that contains the batches to
+  /// use during the probe phase.
   RowBatchList* build_batches_;
 
   RowBatchList::TupleRowIterator build_row_iterator_;
@@ -103,6 +85,16 @@ class NestedLoopJoinNode : public BlockingJoinNode {
   /// Join conjuncts
   std::vector<ExprContext*> join_conjunct_ctxs_;
 
+  /// Optimized build for the case where the right child is a SingularRowSrcNode.
+  Status ConstructSingularBuildSide(RuntimeState* state);
+
+  /// Expand 'matching_build_rows_' to hold at least 'num_bits' bits and set
+  /// all its bits to zero.
+  Status ResetMatchingBuildRows(RuntimeState* state, int64_t num_bits);
+
+  /// Prepares for probing the first batch.
+  void ResetForProbe();
+
   Status GetNextInnerJoin(RuntimeState* state, RowBatch* output_batch);
   Status GetNextLeftOuterJoin(RuntimeState* state, RowBatch* output_batch);
   Status GetNextRightOuterJoin(RuntimeState* state, RowBatch* output_batch);
@@ -149,12 +141,6 @@ class NestedLoopJoinNode : public BlockingJoinNode {
     return current_probe_row_ != NULL;
   }
 
-  /// Deep copy all build batches in raw_build_batches_ to copied_build_batches_.
-  /// Resets all the source batches and clears raw_build_batches_.
-  /// If the memory limit is exceeded while copying batches, returns a MEM_LIMIT_EXCEEDED
-  /// status, sets the query status to MEM_LIMIT_EXCEEDED and leave the row batches to
-  /// be cleaned up later when the node is closed.
-  Status DeepCopyBuildBatches(RuntimeState* state);
 };
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3e2411f3/be/src/exec/partitioned-hash-join-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-node.cc b/be/src/exec/partitioned-hash-join-node.cc
index e8d2964..a2b5001 100644
--- a/be/src/exec/partitioned-hash-join-node.cc
+++ b/be/src/exec/partitioned-hash-join-node.cc
@@ -145,7 +145,7 @@ Status PartitionedHashJoinNode::Prepare(RuntimeState* state) {
     AddExprCtxToFree(ctx.expr);
   }
 
-  // Although ConstructBuildSide() maybe be run in a separate thread, it is safe to free
+  // Although ProcessBuildInput() may be run in a separate thread, it is safe to free
   // local allocations in QueryMaintenance() since the build thread is not run
   // concurrently with other expr evaluation in this join node.
   // Probe side expr is not included in QueryMaintenance(). We cache the probe expression
@@ -242,22 +242,39 @@ Status PartitionedHashJoinNode::Prepare(RuntimeState* state) {
 }
 
 Status PartitionedHashJoinNode::Open(RuntimeState* state) {
+  SCOPED_TIMER(runtime_profile_->total_time_counter());
+  RETURN_IF_ERROR(BlockingJoinNode::Open(state));
+  RETURN_IF_ERROR(Expr::Open(build_expr_ctxs_, state));
+  RETURN_IF_ERROR(Expr::Open(probe_expr_ctxs_, state));
+  RETURN_IF_ERROR(Expr::Open(other_join_conjunct_ctxs_, state));
+  for (const FilterContext& filter: filters_) RETURN_IF_ERROR(filter.expr->Open(state));
+  AllocateRuntimeFilters(state);
+
   if (join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
     null_aware_partition_ = partition_pool_->Add(new Partition(state, this, 0));
     RETURN_IF_ERROR(
         null_aware_partition_->build_rows()->Init(id(), runtime_profile(), false));
     RETURN_IF_ERROR(
         null_aware_partition_->probe_rows()->Init(id(), runtime_profile(), false));
-
     null_probe_rows_ = new BufferedTupleStream(
         state, child(0)->row_desc(), state->block_mgr(), block_mgr_client_,
         true /* use_initial_small_buffers */, false /* read_write */);
     RETURN_IF_ERROR(null_probe_rows_->Init(id(), runtime_profile(), false));
   }
 
-  RETURN_IF_ERROR(BlockingJoinNode::Open(state));
-  DCHECK(null_aware_partition_ == NULL || join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN);
+  // Check for errors and free local allocations before opening children.
+  RETURN_IF_CANCELLED(state);
+  RETURN_IF_ERROR(QueryMaintenance(state));
+  // The prepare functions of probe expressions may have done local allocations implicitly
+  // (e.g. calling UdfBuiltins::Lower()). The probe expressions' local allocations need to
+  // be freed now as they don't get freed again till probing. Other exprs' local allocations
+  // are freed in ExecNode::FreeLocalAllocations() in ProcessBuildInput().
+  ExprContext::FreeLocalAllocations(probe_expr_ctxs_);
+
+  RETURN_IF_ERROR(BlockingJoinNode::ConstructBuildAndOpenProbe(state, NULL));
+  RETURN_IF_ERROR(BlockingJoinNode::GetFirstProbeRow(state));
+  ResetForProbe();
+  DCHECK(null_aware_partition_ == NULL || join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN);
   return Status::OK();
 }
 
@@ -509,7 +526,7 @@ not_built:
   return Status::OK();
 }
 
-bool PartitionedHashJoinNode::AllocateRuntimeFilters(RuntimeState* state) {
+void PartitionedHashJoinNode::AllocateRuntimeFilters(RuntimeState* state) {
   DCHECK(join_op_ != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN || filters_.size() == 0)
       << "Runtime filters not supported with NULL_AWARE_LEFT_ANTI_JOIN";
   DCHECK(ht_ctx_.get() != NULL);
@@ -517,7 +534,6 @@ bool PartitionedHashJoinNode::AllocateRuntimeFilters(RuntimeState* state) {
     filters_[i].local_bloom_filter =
         state->filter_bank()->AllocateScratchBloomFilter(filters_[i].filter->id());
   }
-  return true;
 }
 
 void PartitionedHashJoinNode::PublishRuntimeFilters(RuntimeState* state,
@@ -617,21 +633,7 @@ Status PartitionedHashJoinNode::SpillPartition(Partition** spilled_partition) {
   return Status::OK();
 }
 
-Status PartitionedHashJoinNode::ConstructBuildSide(RuntimeState* state) {
-  RETURN_IF_ERROR(Expr::Open(build_expr_ctxs_, state));
-  RETURN_IF_ERROR(Expr::Open(probe_expr_ctxs_, state));
-  RETURN_IF_ERROR(Expr::Open(other_join_conjunct_ctxs_, state));
-  for (const FilterContext& filter: filters_) {
-    RETURN_IF_ERROR(filter.expr->Open(state));
-  }
-  AllocateRuntimeFilters(state);
-
-  // The prepare functions of probe expressions may have done local allocations implicitly
-  // (e.g. calling UdfBuiltins::Lower()). The probe expressions' local allocations need to
-  // be freed now as they don't get freed again till probing. Other exprs' local allocations
-  // are freed in ExecNode::FreeLocalAllocations() in ProcessBuildInput().
-  ExprContext::FreeLocalAllocations(probe_expr_ctxs_);
-
+Status PartitionedHashJoinNode::ProcessBuildInput(RuntimeState* state) {
   // Do a full scan of child(1) and partition the rows.
   {
     SCOPED_STOP_WATCH(&built_probe_overlap_stop_watch_);
@@ -680,7 +682,7 @@ Status PartitionedHashJoinNode::ProcessBuildInput(RuntimeState* state, int level
   COUNTER_ADD(partitions_created_, PARTITION_FANOUT);
   COUNTER_SET(max_partition_level_, level);
 
-  RowBatch build_batch(child(1)->row_desc(), state->batch_size(), mem_tracker());
+  DCHECK_EQ(build_batch_->num_rows(), 0);
   bool eos = false;
   int64_t total_build_rows = 0;
   while (!eos) {
@@ -693,30 +695,30 @@ Status PartitionedHashJoinNode::ProcessBuildInput(RuntimeState* state, int level
       // If we are still consuming batches from the build side.
       {
         SCOPED_STOP_WATCH(&built_probe_overlap_stop_watch_);
-        RETURN_IF_ERROR(child(1)->GetNext(state, &build_batch, &eos));
+        RETURN_IF_ERROR(child(1)->GetNext(state, build_batch_.get(), &eos));
       }
-      COUNTER_ADD(build_row_counter_, build_batch.num_rows());
+      COUNTER_ADD(build_row_counter_, build_batch_->num_rows());
     } else {
       // If we are consuming batches that have already been partitioned.
-      RETURN_IF_ERROR(input_partition_->build_rows()->GetNext(&build_batch, &eos));
+      RETURN_IF_ERROR(input_partition_->build_rows()->GetNext(build_batch_.get(), &eos));
     }
-    total_build_rows += build_batch.num_rows();
+    total_build_rows += build_batch_->num_rows();
 
     SCOPED_TIMER(partition_build_timer_);
     if (process_build_batch_fn_ == NULL) {
       bool build_filters = ht_ctx_->level() == 0;
-      RETURN_IF_ERROR(ProcessBuildBatch(&build_batch, build_filters));
+      RETURN_IF_ERROR(ProcessBuildBatch(build_batch_.get(), build_filters));
     } else {
       DCHECK(process_build_batch_fn_level0_ != NULL);
       if (ht_ctx_->level() == 0) {
         RETURN_IF_ERROR(
-            process_build_batch_fn_level0_(this, &build_batch, true));
+            process_build_batch_fn_level0_(this, build_batch_.get(), true));
       } else {
-        RETURN_IF_ERROR(process_build_batch_fn_(this, &build_batch, false));
+        RETURN_IF_ERROR(process_build_batch_fn_(this, build_batch_.get(), false));
       }
     }
-    build_batch.Reset();
-    DCHECK(!build_batch.AtCapacity());
+    build_batch_->Reset();
+    DCHECK(!build_batch_->AtCapacity());
   }
   if (ht_ctx_->level() == 0) PublishRuntimeFilters(state, total_build_rows);
@@ -747,12 +749,6 @@ Status PartitionedHashJoinNode::ProcessBuildInput(RuntimeState* state, int level
   return Status::OK();
 }
 
-Status PartitionedHashJoinNode::InitGetNext(TupleRow* first_probe_row) {
-  // TODO: Move this reset to blocking-join. Not yet though because of hash-join.
-  ResetForProbe();
-  return Status::OK();
-}
-
 Status PartitionedHashJoinNode::NextProbeRowBatch(
     RuntimeState* state, RowBatch* out_batch) {
   DCHECK(probe_batch_pos_ == probe_batch_->num_rows() || probe_batch_pos_ == -1);
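The core idea of this patch — a build side that receives every input batch through Send() and learns about end-of-stream exactly once through FlushFinal(), rather than having an eos flag threaded through both calls — can be sketched in isolation. The snippet below is a minimal, hypothetical illustration in plain C++: BuildSink, the int-valued RowBatch, and the trivial Status type are stand-ins, not Impala's real DataSink, RowBatch, or Status classes. The batch-recycling in Send() mimics (in a much simplified form) what the commit message describes NljBuilder doing with the row batch cache and RowBatch::AcquireState().

```cpp
#include <cassert>
#include <vector>

// Trivial stand-in for Impala's Status; always OK here.
struct Status {
  static Status OK() { return Status(); }
};

// Stand-in for RowBatch: a batch of rows, reduced to ints for the sketch.
using RowBatch = std::vector<int>;

// A DataSink-style build side: Send() per batch, FlushFinal() once at the end.
// There is no 'eos' parameter on Send() -- end-of-stream is signalled solely by
// the single FlushFinal() call, which is the redundancy this patch removes.
class BuildSink {
 public:
  // Accumulate one batch. The caller is free to recycle 'batch' afterwards,
  // so the sink takes over its contents (the rough analogue of
  // RowBatch::AcquireState() transferring state out of a reused RowBatch).
  Status Send(RowBatch* batch) {
    build_rows_.insert(build_rows_.end(), batch->begin(), batch->end());
    batch->clear();  // hand the emptied batch object back for reuse
    return Status::OK();
  }

  // Called exactly once after the last Send(); finalizes the build side.
  Status FlushFinal() {
    flushed_ = true;
    return Status::OK();
  }

  bool flushed() const { return flushed_; }
  const std::vector<int>& build_rows() const { return build_rows_; }

 private:
  std::vector<int> build_rows_;
  bool flushed_ = false;
};
```

Because Send() empties the batch it is given, a driver loop can keep handing the sink the same RowBatch object for every iteration, which is the point of the recycling scheme: no per-batch allocation, yet the accumulated build rows live in memory the sink owns.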
