Repository: incubator-impala Updated Branches: refs/heads/master ffa7829b7 -> 3e2411f30
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3e2411f3/be/src/exec/partitioned-hash-join-node.h ---------------------------------------------------------------------- diff --git a/be/src/exec/partitioned-hash-join-node.h b/be/src/exec/partitioned-hash-join-node.h index c493c1c..36dfce6 100644 --- a/be/src/exec/partitioned-hash-join-node.h +++ b/be/src/exec/partitioned-hash-join-node.h @@ -75,8 +75,7 @@ class PartitionedHashJoinNode : public BlockingJoinNode { protected: virtual void AddToDebugString(int indentation_level, std::stringstream* out) const; - virtual Status InitGetNext(TupleRow* first_probe_row); - virtual Status ConstructBuildSide(RuntimeState* state); + virtual Status ProcessBuildInput(RuntimeState* state); private: class Partition; @@ -354,8 +353,8 @@ class PartitionedHashJoinNode : public BlockingJoinNode { /// For each filter in filters_, allocate a bloom_filter from the fragment-local /// RuntimeFilterBank and store it in runtime_filters_ to populate during the build - /// phase. Returns false if filter construction is disabled. - bool AllocateRuntimeFilters(RuntimeState* state); + /// phase. + void AllocateRuntimeFilters(RuntimeState* state); /// Publish the runtime filters to the fragment-local /// RuntimeFilterBank. 'total_build_rows' is used to determine whether the computed http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3e2411f3/be/src/exec/row-batch-cache.h ---------------------------------------------------------------------- diff --git a/be/src/exec/row-batch-cache.h b/be/src/exec/row-batch-cache.h index 53345f9..81ae25c 100644 --- a/be/src/exec/row-batch-cache.h +++ b/be/src/exec/row-batch-cache.h @@ -19,6 +19,7 @@ #ifndef IMPALA_EXEC_ROW_BATCH_CACHE_H #define IMPALA_EXEC_ROW_BATCH_CACHE_H +#include <memory> #include <vector> #include "runtime/row-batch.h" @@ -40,16 +41,21 @@ class RowBatchCache { next_row_batch_idx_(0) { } + ~RowBatchCache() { + DCHECK_EQ(0, row_batches_.size()); + } + /// Returns the next batch from the cache. Expands the cache if necessary. RowBatch* GetNextBatch() { if (next_row_batch_idx_ >= row_batches_.size()) { // Expand the cache with a new row batch. - row_batches_.push_back(new RowBatch(row_desc_, batch_size_, mem_tracker_)); + row_batches_.push_back( + std::make_unique<RowBatch>(row_desc_, batch_size_, mem_tracker_)); } else { // Reset batch from the cache before returning it. row_batches_[next_row_batch_idx_]->Reset(); } - return row_batches_[next_row_batch_idx_++]; + return row_batches_[next_row_batch_idx_++].get(); } /// Resets the cache such that subsequent calls to GetNextBatch() return batches from @@ -57,10 +63,16 @@ class RowBatchCache { /// are invalid. void Reset() { next_row_batch_idx_ = 0; } - ~RowBatchCache() { - std::vector<RowBatch*>::iterator it; - for (it = row_batches_.begin(); it != row_batches_.end(); ++it) delete *it; - row_batches_.clear(); + /// Delete and free resources associated with all cached batch objects. Must be called + /// before the RowBatchCache is destroyed. + void Clear() { + row_batches_.clear(); // unique_ptr automatically calls all destructors. + } + + /// Return the last batch returned from GetNextBatch() since Reset(), or NULL if + /// GetNextBatch() has not yet been called. + RowBatch* GetLastBatchReturned() { + return next_row_batch_idx_ == 0 ? NULL : row_batches_[next_row_batch_idx_ - 1].get(); } private: @@ -70,7 +82,7 @@ class RowBatchCache { MemTracker* mem_tracker_; // not owned /// List of cached row-batch objects. The row-batch objects are owned by this cache. - std::vector<RowBatch*> row_batches_; + std::vector<std::unique_ptr<RowBatch>> row_batches_; /// Index of next row batch to return in GetRowBatch(). int next_row_batch_idx_; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3e2411f3/be/src/runtime/data-stream-sender.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/data-stream-sender.cc b/be/src/runtime/data-stream-sender.cc index b210a75..33dc7a0 100644 --- a/be/src/runtime/data-stream-sender.cc +++ b/be/src/runtime/data-stream-sender.cc @@ -23,6 +23,7 @@ #include "common/logging.h" #include "exprs/expr.h" #include "exprs/expr-context.h" +#include "gutil/strings/substitute.h" #include "runtime/descriptors.h" #include "runtime/tuple-row.h" #include "runtime/row-batch.h" @@ -47,6 +48,7 @@ using boost::condition_variable; using namespace apache::thrift; using namespace apache::thrift::protocol; using namespace apache::thrift::transport; +using strings::Substitute; namespace impala { @@ -158,7 +160,7 @@ Status DataStreamSender::Channel::Init(RuntimeState* state) { runtime_state_ = state; // TODO: figure out how to size batch_ int capacity = max(1, buffer_size_ / max(row_desc_.GetRowSize(), 1)); - batch_.reset(new RowBatch(row_desc_, capacity, parent_->mem_tracker_.get())); + batch_.reset(new RowBatch(row_desc_, capacity, parent_->mem_tracker())); return Status::OK(); } @@ -309,9 +311,9 @@ Status DataStreamSender::Channel::FlushAndSendEos(RuntimeState* state) { VLOG_RPC << "calling TransmitData(eos=true) to terminate channel."; rpc_status_ = DoTransmitDataRpc(&client, params, &res); if (!rpc_status_.ok()) { - stringstream msg; - msg << "TransmitData(eos=true) to " << address_ << " failed:\n" << rpc_status_.msg().msg(); - return Status(rpc_status_.code(), msg.str()); + return Status(rpc_status_.code(), + Substitute("TransmitData(eos=true) to $0 failed:\n $1", + TNetworkAddressToString(address_), rpc_status_.msg().msg())); } return Status(res.status); } @@ -327,14 +329,13 @@ DataStreamSender::DataStreamSender(ObjectPool* pool, int sender_id, const RowDescriptor& row_desc, const TDataStreamSink& sink, const vector<TPlanFragmentDestination>& destinations, int per_channel_buffer_size) - : sender_id_(sender_id), + : DataSink(row_desc), + sender_id_(sender_id), pool_(pool), - row_desc_(row_desc), current_channel_idx_(0), flushed_(false), closed_(false), current_thrift_batch_(&thrift_batch1_), - profile_(NULL), serialize_batch_timer_(NULL), thrift_transmit_timer_(NULL), bytes_sent_counter_(NULL), @@ -369,6 +370,10 @@ DataStreamSender::DataStreamSender(ObjectPool* pool, int sender_id, } } +string DataStreamSender::GetName() { + return Substitute("DataStreamSender (dst_id=$0)", dest_node_id_); +} + DataStreamSender::~DataStreamSender() { // TODO: check that sender was either already closed() or there was an error // on some channel @@ -377,18 +382,12 @@ DataStreamSender::~DataStreamSender() { } } -Status DataStreamSender::Prepare(RuntimeState* state) { - DCHECK(state != NULL); +Status DataStreamSender::Prepare(RuntimeState* state, MemTracker* mem_tracker) { + RETURN_IF_ERROR(DataSink::Prepare(state, mem_tracker)); state_ = state; - stringstream title; - title << "DataStreamSender (dst_id=" << dest_node_id_ << ")"; - profile_ = pool_->Add(new RuntimeProfile(pool_, title.str())); SCOPED_TIMER(profile_->total_time_counter()); - mem_tracker_.reset(new MemTracker(profile(), -1, -1, "DataStreamSender", - state->instance_mem_tracker())); - RETURN_IF_ERROR( - Expr::Prepare(partition_expr_ctxs_, state, row_desc_, mem_tracker_.get())); + RETURN_IF_ERROR(Expr::Prepare(partition_expr_ctxs_, state, row_desc_, mem_tracker)); bytes_sent_counter_ = ADD_COUNTER(profile(), "BytesSent", TUnit::BYTES); @@ -418,7 +417,7 @@ Status DataStreamSender::Open(RuntimeState* state) { return Expr::Open(partition_expr_ctxs_, state); } -Status DataStreamSender::Send(RuntimeState* state, RowBatch* batch, bool eos) { +Status DataStreamSender::Send(RuntimeState* state, RowBatch* batch) { DCHECK(!closed_); DCHECK(!flushed_); @@ -486,10 +485,7 @@ void DataStreamSender::Close(RuntimeState* state) { channels_[i]->Teardown(state); } Expr::Close(partition_expr_ctxs_, state); - if (mem_tracker_.get() != NULL) { - mem_tracker_->UnregisterFromParent(); - mem_tracker_.reset(); - } + DataSink::Close(state); closed_ = true; } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3e2411f3/be/src/runtime/data-stream-sender.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/data-stream-sender.h b/be/src/runtime/data-stream-sender.h index 4a7b74e..9b9e7d0 100644 --- a/be/src/runtime/data-stream-sender.h +++ b/be/src/runtime/data-stream-sender.h @@ -63,9 +63,11 @@ class DataStreamSender : public DataSink { int per_channel_buffer_size); virtual ~DataStreamSender(); + virtual std::string GetName(); + /// Must be called before other API calls, and before the codegen'd IR module is /// compiled (i.e. in an ExecNode's Prepare() function). - virtual Status Prepare(RuntimeState* state); + virtual Status Prepare(RuntimeState* state, MemTracker* mem_tracker); /// Must be called before Send() or Close(), and after the codegen'd IR module is /// compiled (i.e. in an ExecNode's Open() function). @@ -81,7 +83,7 @@ class DataStreamSender : public DataSink { /// Blocks until all rows in batch are placed in their appropriate outgoing /// buffers (ie, blocks if there are still in-flight rpcs from the last /// Send() call). - virtual Status Send(RuntimeState* state, RowBatch* batch, bool eos); + virtual Status Send(RuntimeState* state, RowBatch* batch); /// Shutdown all existing channels to destination hosts. Further FlushFinal() calls are /// illegal after calling Close(). @@ -96,8 +98,6 @@ class DataStreamSender : public DataSink { /// broadcast to multiple receivers, they are counted once per receiver. int64_t GetNumDataBytesSent() const; - virtual RuntimeProfile* profile() { return profile_; } - private: class Channel; @@ -105,7 +105,6 @@ class DataStreamSender : public DataSink { int sender_id_; RuntimeState* state_; ObjectPool* pool_; - const RowDescriptor& row_desc_; bool broadcast_; // if true, send all rows on all channels bool random_; // if true, round-robins row batches among channels int current_channel_idx_; // index of current channel to send to if random_ == true @@ -126,14 +125,12 @@ class DataStreamSender : public DataSink { std::vector<ExprContext*> partition_expr_ctxs_; // compute per-row partition values std::vector<Channel*> channels_; - RuntimeProfile* profile_; // Allocated from pool_ RuntimeProfile::Counter* serialize_batch_timer_; /// The concurrent wall time spent sending data over the network. RuntimeProfile::ConcurrentTimerCounter* thrift_transmit_timer_; RuntimeProfile::Counter* bytes_sent_counter_; RuntimeProfile::Counter* uncompressed_bytes_counter_; RuntimeProfile::Counter* total_sent_rows_counter_; - boost::scoped_ptr<MemTracker> mem_tracker_; /// Throughput per time spent in TransmitData RuntimeProfile::Counter* network_throughput_; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3e2411f3/be/src/runtime/data-stream-test.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/data-stream-test.cc b/be/src/runtime/data-stream-test.cc index a0b292f..dae4724 100644 --- a/be/src/runtime/data-stream-test.cc +++ b/be/src/runtime/data-stream-test.cc @@ -489,7 +489,7 @@ class DataStreamTest : public testing::Test { const TDataStreamSink& sink = GetSink(partition_type); DataStreamSender sender( &obj_pool_, sender_num, *row_desc_, sink, dest_, channel_buffer_size); - EXPECT_OK(sender.Prepare(&state)); + EXPECT_OK(sender.Prepare(&state, &tracker_)); EXPECT_OK(sender.Open(&state)); scoped_ptr<RowBatch> batch(CreateRowBatch()); SenderInfo& info = sender_info_[sender_num]; @@ -497,7 +497,7 @@ class DataStreamTest : public testing::Test { for (int i = 0; i < NUM_BATCHES; ++i) { GetNextBatch(batch.get(), &next_val); VLOG_QUERY << "sender " << sender_num << ": #rows=" << batch->num_rows(); - info.status = sender.Send(&state, batch.get(), false); + info.status = sender.Send(&state, batch.get()); if (!info.status.ok()) break; } VLOG_QUERY << "closing sender" << sender_num; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3e2411f3/be/src/runtime/plan-fragment-executor.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/plan-fragment-executor.cc b/be/src/runtime/plan-fragment-executor.cc index a54db7a..35ac757 100644 --- a/be/src/runtime/plan-fragment-executor.cc +++ b/be/src/runtime/plan-fragment-executor.cc @@ -265,7 +265,9 @@ Status PlanFragmentExecutor::Prepare(const TExecPlanFragmentParams& request) { obj_pool(), request.fragment_ctx.fragment.output_sink, request.fragment_ctx.fragment.output_exprs, fragment_instance_ctx, row_desc(), &sink_)); - RETURN_IF_ERROR(sink_->Prepare(runtime_state())); + sink_mem_tracker_.reset(new MemTracker(-1, -1, sink_->GetName(), + runtime_state_->instance_mem_tracker(), true)); + RETURN_IF_ERROR(sink_->Prepare(runtime_state(), sink_mem_tracker_.get())); RuntimeProfile* sink_profile = sink_->profile(); if (sink_profile != NULL) { @@ -364,35 +366,24 @@ Status PlanFragmentExecutor::Open() { } Status PlanFragmentExecutor::OpenInternal() { - { - SCOPED_TIMER(profile()->total_time_counter()); - RETURN_IF_ERROR(plan_->Open(runtime_state_.get())); - } + SCOPED_TIMER(profile()->total_time_counter()); + RETURN_IF_ERROR(plan_->Open(runtime_state_.get())); if (sink_.get() == NULL) return Status::OK(); - RETURN_IF_ERROR(sink_->Open(runtime_state_.get())); // If there is a sink, do all the work of driving it here, so that // when this returns the query has actually finished + RETURN_IF_ERROR(sink_->Open(runtime_state_.get())); while (!done_) { - RowBatch* batch; - RETURN_IF_ERROR(GetNextInternal(&batch)); - if (batch == NULL) break; - - if (VLOG_ROW_IS_ON) { - VLOG_ROW << "OpenInternal: #rows=" << batch->num_rows(); - for (int i = 0; i < batch->num_rows(); ++i) { - VLOG_ROW << PrintRow(batch->GetRow(i), row_desc()); - } - } - SCOPED_TIMER(profile()->total_time_counter()); - RETURN_IF_ERROR(sink_->Send(runtime_state(), batch, done_)); + row_batch_->Reset(); + RETURN_IF_ERROR(plan_->GetNext(runtime_state_.get(), row_batch_.get(), &done_)); + if (VLOG_ROW_IS_ON) row_batch_->VLogRows("PlanFragmentExecutor::OpenInternal()"); + COUNTER_ADD(rows_produced_counter_, row_batch_->num_rows()); + RETURN_IF_ERROR(sink_->Send(runtime_state(), row_batch_.get())); } // Flush the sink *before* stopping the report thread. Flush may need to add some // important information to the last report that gets sent. (e.g. table sinks record the // files they have written to in this method) - // - SCOPED_TIMER(profile()->total_time_counter()); RETURN_IF_ERROR(sink_->FlushFinal(runtime_state())); return Status::OK(); } @@ -473,47 +464,36 @@ void PlanFragmentExecutor::StopReportThread() { report_thread_->Join(); } -// TODO: why can't we just put the total_time_counter() at the -// beginning of Open() and GetNext(). This seems to really mess -// the timer here, presumably because the data stream sender is -// multithreaded and the timer we use gets confused. Status PlanFragmentExecutor::GetNext(RowBatch** batch) { - VLOG_FILE << "GetNext(): instance_id=" - << runtime_state_->fragment_instance_id(); - Status status = GetNextInternal(batch); + SCOPED_TIMER(profile()->total_time_counter()); + VLOG_FILE << "GetNext(): instance_id=" << runtime_state_->fragment_instance_id(); + + Status status = Status::OK(); + row_batch_->Reset(); + // Loop until we've got a non-empty batch, hit an error or exhausted the input. + while (!done_) { + status = plan_->GetNext(runtime_state_.get(), row_batch_.get(), &done_); + if (VLOG_ROW_IS_ON) row_batch_->VLogRows("PlanFragmentExecutor::GetNext()"); + if (!status.ok()) break; + if (row_batch_->num_rows() > 0) break; + row_batch_->Reset(); + } UpdateStatus(status); + if (done_) { VLOG_QUERY << "Finished executing fragment query_id=" << PrintId(query_id_) << " instance_id=" << PrintId(runtime_state_->fragment_instance_id()); FragmentComplete(); - // GetNext() uses *batch = NULL to signal the end. - if (*batch != NULL && (*batch)->num_rows() == 0) *batch = NULL; + // Once all rows are returned, signal that we're done with an empty batch. + *batch = row_batch_->num_rows() == 0 ? NULL : row_batch_.get(); + return status; } + *batch = row_batch_.get(); + COUNTER_ADD(rows_produced_counter_, row_batch_->num_rows()); return status; } -Status PlanFragmentExecutor::GetNextInternal(RowBatch** batch) { - if (done_) { - *batch = NULL; - return Status::OK(); - } - - while (!done_) { - row_batch_->Reset(); - SCOPED_TIMER(profile()->total_time_counter()); - RETURN_IF_ERROR( - plan_->GetNext(runtime_state_.get(), row_batch_.get(), &done_)); - *batch = row_batch_.get(); - if (row_batch_->num_rows() > 0) { - COUNTER_ADD(rows_produced_counter_, row_batch_->num_rows()); - break; - } - } - - return Status::OK(); -} - void PlanFragmentExecutor::FragmentComplete() { // Check the atomic flag. If it is set, then a fragment complete report has already // been sent. @@ -596,6 +576,10 @@ void PlanFragmentExecutor::ReleaseThreadToken() { void PlanFragmentExecutor::Close() { if (closed_) return; row_batch_.reset(); + if (sink_mem_tracker_ != NULL) { + sink_mem_tracker_->UnregisterFromParent(); + sink_mem_tracker_.reset(); + } // Prepare may not have been called, which sets runtime_state_ if (runtime_state_.get() != NULL) { if (runtime_state_->query_resource_mgr() != NULL) { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3e2411f3/be/src/runtime/plan-fragment-executor.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/plan-fragment-executor.h b/be/src/runtime/plan-fragment-executor.h index 0369b4c..f4355ea 100644 --- a/be/src/runtime/plan-fragment-executor.h +++ b/be/src/runtime/plan-fragment-executor.h @@ -194,6 +194,8 @@ class PlanFragmentExecutor { /// returned via GetNext's row batch /// Created in Prepare (if required), owned by this object. boost::scoped_ptr<DataSink> sink_; + boost::scoped_ptr<MemTracker> sink_mem_tracker_; + boost::scoped_ptr<RowBatch> row_batch_; boost::scoped_ptr<TRowBatch> thrift_batch_; @@ -280,10 +282,6 @@ class PlanFragmentExecutor { /// have been stopped. sink_ will be set to NULL after successful execution. Status OpenInternal(); - /// Executes GetNext() logic and returns resulting status. - /// sets done_ to true if the last row batch was returned. - Status GetNextInternal(RowBatch** batch); - /// Stops report thread, if one is running. Blocks until report thread terminates. /// Idempotent. void StopReportThread(); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3e2411f3/be/src/runtime/row-batch.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/row-batch.cc b/be/src/runtime/row-batch.cc index 22e1a70..e602293 100644 --- a/be/src/runtime/row-batch.cc +++ b/be/src/runtime/row-batch.cc @@ -26,6 +26,7 @@ #include "runtime/tuple-row.h" #include "util/compress.h" #include "util/decompress.h" +#include "util/debug-util.h" #include "util/fixed-size-hash-table.h" #include "gen-cpp/Results_types.h" @@ -454,4 +455,12 @@ Status RowBatch::ResizeAndAllocateTupleBuffer(RuntimeState* state, return Status::OK(); } +void RowBatch::VLogRows(const string& context) { + if (!VLOG_ROW_IS_ON) return; + VLOG_ROW << context << ": #rows=" << num_rows_; + for (int i = 0; i < num_rows_; ++i) { + VLOG_ROW << PrintRow(GetRow(i), row_desc_); + } +} + } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3e2411f3/be/src/runtime/row-batch.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/row-batch.h b/be/src/runtime/row-batch.h index af1896e..f529c3d 100644 --- a/be/src/runtime/row-batch.h +++ b/be/src/runtime/row-batch.h @@ -299,6 +299,10 @@ class RowBatch { Status ResizeAndAllocateTupleBuffer(RuntimeState* state, int64_t* buffer_size, uint8_t** buffer); + /// Helper function to log the batch's rows if VLOG_ROW is enabled. 'context' is a + /// string to prepend to the log message. + void VLogRows(const std::string& context); + private: friend class RowBatchSerializeBaseline; friend class RowBatchSerializeBenchmark; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3e2411f3/be/src/util/stopwatch.h ---------------------------------------------------------------------- diff --git a/be/src/util/stopwatch.h b/be/src/util/stopwatch.h index c42b523..0e73b6a 100644 --- a/be/src/util/stopwatch.h +++ b/be/src/util/stopwatch.h @@ -32,6 +32,10 @@ namespace impala { ScopedStopWatch<MonotonicStopWatch> \ MACRO_CONCAT(STOP_WATCH, __COUNTER__)(c) +#define CONDITIONAL_SCOPED_STOP_WATCH(c, enabled) \ + ScopedStopWatch<MonotonicStopWatch> \ + MACRO_CONCAT(STOP_WATCH, __COUNTER__)(c, enabled) + #define SCOPED_CONCURRENT_STOP_WATCH(c) \ ScopedStopWatch<ConcurrentStopWatch> \ MACRO_CONCAT(CONCURRENT_STOP_WATCH, __COUNTER__)(c) @@ -231,23 +235,25 @@ class ConcurrentStopWatch { }; /// Utility class that starts the stop watch in the constructor and stops the watch when -/// the object goes out of scope. +/// the object goes out of scope. If the optional argument 'enabled' is false, the +/// stopwatch is not updated. /// 'T' must implement the StopWatch interface "Start", "Stop". template<class T> class ScopedStopWatch { public: - ScopedStopWatch(T* sw) : - sw_(sw) { + ScopedStopWatch(T* sw, bool enabled = true) : + sw_(sw), enabled_(enabled) { DCHECK(sw != NULL); - sw_->Start(); + if (enabled_) sw_->Start(); } ~ScopedStopWatch() { - sw_->Stop(); + if (enabled_) sw_->Stop(); } private: T* sw_; + bool enabled_; }; }
