Repository: incubator-impala
Updated Branches:
  refs/heads/master ffa7829b7 -> 3e2411f30


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3e2411f3/be/src/exec/partitioned-hash-join-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-node.h b/be/src/exec/partitioned-hash-join-node.h
index c493c1c..36dfce6 100644
--- a/be/src/exec/partitioned-hash-join-node.h
+++ b/be/src/exec/partitioned-hash-join-node.h
@@ -75,8 +75,7 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
 
  protected:
   virtual void AddToDebugString(int indentation_level, std::stringstream* out) 
const;
-  virtual Status InitGetNext(TupleRow* first_probe_row);
-  virtual Status ConstructBuildSide(RuntimeState* state);
+  virtual Status ProcessBuildInput(RuntimeState* state);
 
  private:
   class Partition;
@@ -354,8 +353,8 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
 
   /// For each filter in filters_, allocate a bloom_filter from the fragment-local
   /// RuntimeFilterBank and store it in runtime_filters_ to populate during the build
-  /// phase. Returns false if filter construction is disabled.
-  bool AllocateRuntimeFilters(RuntimeState* state);
+  /// phase.
+  void AllocateRuntimeFilters(RuntimeState* state);
 
   /// Publish the runtime filters to the fragment-local
   /// RuntimeFilterBank. 'total_build_rows' is used to determine whether the computed

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3e2411f3/be/src/exec/row-batch-cache.h
----------------------------------------------------------------------
diff --git a/be/src/exec/row-batch-cache.h b/be/src/exec/row-batch-cache.h
index 53345f9..81ae25c 100644
--- a/be/src/exec/row-batch-cache.h
+++ b/be/src/exec/row-batch-cache.h
@@ -19,6 +19,7 @@
 #ifndef IMPALA_EXEC_ROW_BATCH_CACHE_H
 #define IMPALA_EXEC_ROW_BATCH_CACHE_H
 
+#include <memory>
 #include <vector>
 
 #include "runtime/row-batch.h"
@@ -40,16 +41,21 @@ class RowBatchCache {
       next_row_batch_idx_(0) {
   }
 
+  ~RowBatchCache() {
+    DCHECK_EQ(0, row_batches_.size());
+  }
+
   /// Returns the next batch from the cache. Expands the cache if necessary.
   RowBatch* GetNextBatch() {
     if (next_row_batch_idx_ >= row_batches_.size()) {
       // Expand the cache with a new row batch.
-      row_batches_.push_back(new RowBatch(row_desc_, batch_size_, 
mem_tracker_));
+      row_batches_.push_back(
+          std::make_unique<RowBatch>(row_desc_, batch_size_, mem_tracker_));
     } else {
       // Reset batch from the cache before returning it.
       row_batches_[next_row_batch_idx_]->Reset();
     }
-    return row_batches_[next_row_batch_idx_++];
+    return row_batches_[next_row_batch_idx_++].get();
   }
 
   /// Resets the cache such that subsequent calls to GetNextBatch() return batches from
@@ -57,10 +63,16 @@ class RowBatchCache {
   /// are invalid.
   void Reset() { next_row_batch_idx_ = 0; }
 
-  ~RowBatchCache() {
-    std::vector<RowBatch*>::iterator it;
-    for (it = row_batches_.begin(); it != row_batches_.end(); ++it) delete *it;
-    row_batches_.clear();
+  /// Delete and free resources associated with all cached batch objects. Must be called
+  /// before the RowBatchCache is destroyed.
+  void Clear() {
+    row_batches_.clear(); // unique_ptr automatically calls all destructors.
+  }
+
+  /// Return the last batch returned from GetNextBatch() since Reset(), or NULL if
+  /// GetNextBatch() has not yet been called.
+  RowBatch* GetLastBatchReturned() {
+    return next_row_batch_idx_ == 0 ? NULL : row_batches_[next_row_batch_idx_ 
- 1].get();
   }
 
  private:
@@ -70,7 +82,7 @@ class RowBatchCache {
   MemTracker* mem_tracker_; // not owned
 
   /// List of cached row-batch objects. The row-batch objects are owned by this cache.
-  std::vector<RowBatch*> row_batches_;
+  std::vector<std::unique_ptr<RowBatch>> row_batches_;
 
   /// Index of next row batch to return in GetRowBatch().
   int next_row_batch_idx_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3e2411f3/be/src/runtime/data-stream-sender.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-sender.cc b/be/src/runtime/data-stream-sender.cc
index b210a75..33dc7a0 100644
--- a/be/src/runtime/data-stream-sender.cc
+++ b/be/src/runtime/data-stream-sender.cc
@@ -23,6 +23,7 @@
 #include "common/logging.h"
 #include "exprs/expr.h"
 #include "exprs/expr-context.h"
+#include "gutil/strings/substitute.h"
 #include "runtime/descriptors.h"
 #include "runtime/tuple-row.h"
 #include "runtime/row-batch.h"
@@ -47,6 +48,7 @@ using boost::condition_variable;
 using namespace apache::thrift;
 using namespace apache::thrift::protocol;
 using namespace apache::thrift::transport;
+using strings::Substitute;
 
 namespace impala {
 
@@ -158,7 +160,7 @@ Status DataStreamSender::Channel::Init(RuntimeState* state) {
   runtime_state_ = state;
   // TODO: figure out how to size batch_
   int capacity = max(1, buffer_size_ / max(row_desc_.GetRowSize(), 1));
-  batch_.reset(new RowBatch(row_desc_, capacity, parent_->mem_tracker_.get()));
+  batch_.reset(new RowBatch(row_desc_, capacity, parent_->mem_tracker()));
   return Status::OK();
 }
 
@@ -309,9 +311,9 @@ Status DataStreamSender::Channel::FlushAndSendEos(RuntimeState* state) {
   VLOG_RPC << "calling TransmitData(eos=true) to terminate channel.";
   rpc_status_ = DoTransmitDataRpc(&client, params, &res);
   if (!rpc_status_.ok()) {
-    stringstream msg;
-    msg << "TransmitData(eos=true) to " << address_ << " failed:\n" << 
rpc_status_.msg().msg();
-    return Status(rpc_status_.code(), msg.str());
+    return Status(rpc_status_.code(),
+       Substitute("TransmitData(eos=true) to $0 failed:\n $1",
+        TNetworkAddressToString(address_), rpc_status_.msg().msg()));
   }
   return Status(res.status);
 }
@@ -327,14 +329,13 @@ DataStreamSender::DataStreamSender(ObjectPool* pool, int sender_id,
     const RowDescriptor& row_desc, const TDataStreamSink& sink,
     const vector<TPlanFragmentDestination>& destinations,
     int per_channel_buffer_size)
-  : sender_id_(sender_id),
+  : DataSink(row_desc),
+    sender_id_(sender_id),
     pool_(pool),
-    row_desc_(row_desc),
     current_channel_idx_(0),
     flushed_(false),
     closed_(false),
     current_thrift_batch_(&thrift_batch1_),
-    profile_(NULL),
     serialize_batch_timer_(NULL),
     thrift_transmit_timer_(NULL),
     bytes_sent_counter_(NULL),
@@ -369,6 +370,10 @@ DataStreamSender::DataStreamSender(ObjectPool* pool, int sender_id,
   }
 }
 
+string DataStreamSender::GetName() {
+  return Substitute("DataStreamSender (dst_id=$0)", dest_node_id_);
+}
+
 DataStreamSender::~DataStreamSender() {
   // TODO: check that sender was either already closed() or there was an error
   // on some channel
@@ -377,18 +382,12 @@ DataStreamSender::~DataStreamSender() {
   }
 }
 
-Status DataStreamSender::Prepare(RuntimeState* state) {
-  DCHECK(state != NULL);
+Status DataStreamSender::Prepare(RuntimeState* state, MemTracker* mem_tracker) 
{
+  RETURN_IF_ERROR(DataSink::Prepare(state, mem_tracker));
   state_ = state;
-  stringstream title;
-  title << "DataStreamSender (dst_id=" << dest_node_id_ << ")";
-  profile_ = pool_->Add(new RuntimeProfile(pool_, title.str()));
   SCOPED_TIMER(profile_->total_time_counter());
 
-  mem_tracker_.reset(new MemTracker(profile(), -1, -1, "DataStreamSender",
-      state->instance_mem_tracker()));
-  RETURN_IF_ERROR(
-      Expr::Prepare(partition_expr_ctxs_, state, row_desc_, 
mem_tracker_.get()));
+  RETURN_IF_ERROR(Expr::Prepare(partition_expr_ctxs_, state, row_desc_, 
mem_tracker));
 
   bytes_sent_counter_ =
       ADD_COUNTER(profile(), "BytesSent", TUnit::BYTES);
@@ -418,7 +417,7 @@ Status DataStreamSender::Open(RuntimeState* state) {
   return Expr::Open(partition_expr_ctxs_, state);
 }
 
-Status DataStreamSender::Send(RuntimeState* state, RowBatch* batch, bool eos) {
+Status DataStreamSender::Send(RuntimeState* state, RowBatch* batch) {
   DCHECK(!closed_);
   DCHECK(!flushed_);
 
@@ -486,10 +485,7 @@ void DataStreamSender::Close(RuntimeState* state) {
     channels_[i]->Teardown(state);
   }
   Expr::Close(partition_expr_ctxs_, state);
-  if (mem_tracker_.get() != NULL) {
-    mem_tracker_->UnregisterFromParent();
-    mem_tracker_.reset();
-  }
+  DataSink::Close(state);
   closed_ = true;
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3e2411f3/be/src/runtime/data-stream-sender.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-sender.h b/be/src/runtime/data-stream-sender.h
index 4a7b74e..9b9e7d0 100644
--- a/be/src/runtime/data-stream-sender.h
+++ b/be/src/runtime/data-stream-sender.h
@@ -63,9 +63,11 @@ class DataStreamSender : public DataSink {
     int per_channel_buffer_size);
   virtual ~DataStreamSender();
 
+  virtual std::string GetName();
+
   /// Must be called before other API calls, and before the codegen'd IR module is
   /// compiled (i.e. in an ExecNode's Prepare() function).
-  virtual Status Prepare(RuntimeState* state);
+  virtual Status Prepare(RuntimeState* state, MemTracker* mem_tracker);
 
   /// Must be called before Send() or Close(), and after the codegen'd IR module is
   /// compiled (i.e. in an ExecNode's Open() function).
@@ -81,7 +83,7 @@ class DataStreamSender : public DataSink {
   /// Blocks until all rows in batch are placed in their appropriate outgoing
   /// buffers (ie, blocks if there are still in-flight rpcs from the last
   /// Send() call).
-  virtual Status Send(RuntimeState* state, RowBatch* batch, bool eos);
+  virtual Status Send(RuntimeState* state, RowBatch* batch);
 
   /// Shutdown all existing channels to destination hosts. Further FlushFinal() calls are
   /// illegal after calling Close().
@@ -96,8 +98,6 @@ class DataStreamSender : public DataSink {
   /// broadcast to multiple receivers, they are counted once per receiver.
   int64_t GetNumDataBytesSent() const;
 
-  virtual RuntimeProfile* profile() { return profile_; }
-
  private:
   class Channel;
 
@@ -105,7 +105,6 @@ class DataStreamSender : public DataSink {
   int sender_id_;
   RuntimeState* state_;
   ObjectPool* pool_;
-  const RowDescriptor& row_desc_;
   bool broadcast_;  // if true, send all rows on all channels
   bool random_; // if true, round-robins row batches among channels
   int current_channel_idx_; // index of current channel to send to if random_ 
== true
@@ -126,14 +125,12 @@ class DataStreamSender : public DataSink {
   std::vector<ExprContext*> partition_expr_ctxs_;  // compute per-row 
partition values
   std::vector<Channel*> channels_;
 
-  RuntimeProfile* profile_; // Allocated from pool_
   RuntimeProfile::Counter* serialize_batch_timer_;
   /// The concurrent wall time spent sending data over the network.
   RuntimeProfile::ConcurrentTimerCounter* thrift_transmit_timer_;
   RuntimeProfile::Counter* bytes_sent_counter_;
   RuntimeProfile::Counter* uncompressed_bytes_counter_;
   RuntimeProfile::Counter* total_sent_rows_counter_;
-  boost::scoped_ptr<MemTracker> mem_tracker_;
 
   /// Throughput per time spent in TransmitData
   RuntimeProfile::Counter* network_throughput_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3e2411f3/be/src/runtime/data-stream-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-test.cc b/be/src/runtime/data-stream-test.cc
index a0b292f..dae4724 100644
--- a/be/src/runtime/data-stream-test.cc
+++ b/be/src/runtime/data-stream-test.cc
@@ -489,7 +489,7 @@ class DataStreamTest : public testing::Test {
     const TDataStreamSink& sink = GetSink(partition_type);
     DataStreamSender sender(
         &obj_pool_, sender_num, *row_desc_, sink, dest_, channel_buffer_size);
-    EXPECT_OK(sender.Prepare(&state));
+    EXPECT_OK(sender.Prepare(&state, &tracker_));
     EXPECT_OK(sender.Open(&state));
     scoped_ptr<RowBatch> batch(CreateRowBatch());
     SenderInfo& info = sender_info_[sender_num];
@@ -497,7 +497,7 @@ class DataStreamTest : public testing::Test {
     for (int i = 0; i < NUM_BATCHES; ++i) {
       GetNextBatch(batch.get(), &next_val);
       VLOG_QUERY << "sender " << sender_num << ": #rows=" << batch->num_rows();
-      info.status = sender.Send(&state, batch.get(), false);
+      info.status = sender.Send(&state, batch.get());
       if (!info.status.ok()) break;
     }
     VLOG_QUERY << "closing sender" << sender_num;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3e2411f3/be/src/runtime/plan-fragment-executor.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/plan-fragment-executor.cc b/be/src/runtime/plan-fragment-executor.cc
index a54db7a..35ac757 100644
--- a/be/src/runtime/plan-fragment-executor.cc
+++ b/be/src/runtime/plan-fragment-executor.cc
@@ -265,7 +265,9 @@ Status PlanFragmentExecutor::Prepare(const TExecPlanFragmentParams& request) {
         obj_pool(), request.fragment_ctx.fragment.output_sink,
         request.fragment_ctx.fragment.output_exprs,
         fragment_instance_ctx, row_desc(), &sink_));
-    RETURN_IF_ERROR(sink_->Prepare(runtime_state()));
+    sink_mem_tracker_.reset(new MemTracker(-1, -1, sink_->GetName(),
+        runtime_state_->instance_mem_tracker(), true));
+    RETURN_IF_ERROR(sink_->Prepare(runtime_state(), sink_mem_tracker_.get()));
 
     RuntimeProfile* sink_profile = sink_->profile();
     if (sink_profile != NULL) {
@@ -364,35 +366,24 @@ Status PlanFragmentExecutor::Open() {
 }
 
 Status PlanFragmentExecutor::OpenInternal() {
-  {
-    SCOPED_TIMER(profile()->total_time_counter());
-    RETURN_IF_ERROR(plan_->Open(runtime_state_.get()));
-  }
+  SCOPED_TIMER(profile()->total_time_counter());
+  RETURN_IF_ERROR(plan_->Open(runtime_state_.get()));
   if (sink_.get() == NULL) return Status::OK();
 
-  RETURN_IF_ERROR(sink_->Open(runtime_state_.get()));
   // If there is a sink, do all the work of driving it here, so that
   // when this returns the query has actually finished
+  RETURN_IF_ERROR(sink_->Open(runtime_state_.get()));
   while (!done_) {
-    RowBatch* batch;
-    RETURN_IF_ERROR(GetNextInternal(&batch));
-    if (batch == NULL) break;
-
-    if (VLOG_ROW_IS_ON) {
-      VLOG_ROW << "OpenInternal: #rows=" << batch->num_rows();
-      for (int i = 0; i < batch->num_rows(); ++i) {
-        VLOG_ROW << PrintRow(batch->GetRow(i), row_desc());
-      }
-    }
-    SCOPED_TIMER(profile()->total_time_counter());
-    RETURN_IF_ERROR(sink_->Send(runtime_state(), batch, done_));
+    row_batch_->Reset();
+    RETURN_IF_ERROR(plan_->GetNext(runtime_state_.get(), row_batch_.get(), 
&done_));
+    if (VLOG_ROW_IS_ON) 
row_batch_->VLogRows("PlanFragmentExecutor::OpenInternal()");
+    COUNTER_ADD(rows_produced_counter_, row_batch_->num_rows());
+    RETURN_IF_ERROR(sink_->Send(runtime_state(), row_batch_.get()));
   }
 
   // Flush the sink *before* stopping the report thread. Flush may need to add some
   // important information to the last report that gets sent. (e.g. table sinks record the
   // files they have written to in this method)
-  //
-  SCOPED_TIMER(profile()->total_time_counter());
   RETURN_IF_ERROR(sink_->FlushFinal(runtime_state()));
   return Status::OK();
 }
@@ -473,47 +464,36 @@ void PlanFragmentExecutor::StopReportThread() {
   report_thread_->Join();
 }
 
-// TODO: why can't we just put the total_time_counter() at the
-// beginning of Open() and GetNext(). This seems to really mess
-// the timer here, presumably because the data stream sender is
-// multithreaded and the timer we use gets confused.
 Status PlanFragmentExecutor::GetNext(RowBatch** batch) {
-  VLOG_FILE << "GetNext(): instance_id="
-      << runtime_state_->fragment_instance_id();
-  Status status = GetNextInternal(batch);
+  SCOPED_TIMER(profile()->total_time_counter());
+  VLOG_FILE << "GetNext(): instance_id=" << 
runtime_state_->fragment_instance_id();
+
+  Status status = Status::OK();
+  row_batch_->Reset();
+  // Loop until we've got a non-empty batch, hit an error or exhausted the input.
+  while (!done_) {
+    status = plan_->GetNext(runtime_state_.get(), row_batch_.get(), &done_);
+    if (VLOG_ROW_IS_ON) 
row_batch_->VLogRows("PlanFragmentExecutor::GetNext()");
+    if (!status.ok()) break;
+    if (row_batch_->num_rows() > 0) break;
+    row_batch_->Reset();
+  }
   UpdateStatus(status);
+
   if (done_) {
     VLOG_QUERY << "Finished executing fragment query_id=" << PrintId(query_id_)
         << " instance_id=" << PrintId(runtime_state_->fragment_instance_id());
     FragmentComplete();
-    // GetNext() uses *batch = NULL to signal the end.
-    if (*batch != NULL && (*batch)->num_rows() == 0) *batch = NULL;
+    // Once all rows are returned, signal that we're done with an empty batch.
+    *batch = row_batch_->num_rows() == 0 ? NULL : row_batch_.get();
+    return status;
   }
 
+  *batch = row_batch_.get();
+  COUNTER_ADD(rows_produced_counter_, row_batch_->num_rows());
   return status;
 }
 
-Status PlanFragmentExecutor::GetNextInternal(RowBatch** batch) {
-  if (done_) {
-    *batch = NULL;
-    return Status::OK();
-  }
-
-  while (!done_) {
-    row_batch_->Reset();
-    SCOPED_TIMER(profile()->total_time_counter());
-    RETURN_IF_ERROR(
-        plan_->GetNext(runtime_state_.get(), row_batch_.get(), &done_));
-    *batch = row_batch_.get();
-    if (row_batch_->num_rows() > 0) {
-      COUNTER_ADD(rows_produced_counter_, row_batch_->num_rows());
-      break;
-    }
-  }
-
-  return Status::OK();
-}
-
 void PlanFragmentExecutor::FragmentComplete() {
   // Check the atomic flag. If it is set, then a fragment complete report has already
   // been sent.
@@ -596,6 +576,10 @@ void PlanFragmentExecutor::ReleaseThreadToken() {
 void PlanFragmentExecutor::Close() {
   if (closed_) return;
   row_batch_.reset();
+  if (sink_mem_tracker_ != NULL) {
+    sink_mem_tracker_->UnregisterFromParent();
+    sink_mem_tracker_.reset();
+  }
   // Prepare may not have been called, which sets runtime_state_
   if (runtime_state_.get() != NULL) {
     if (runtime_state_->query_resource_mgr() != NULL) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3e2411f3/be/src/runtime/plan-fragment-executor.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/plan-fragment-executor.h b/be/src/runtime/plan-fragment-executor.h
index 0369b4c..f4355ea 100644
--- a/be/src/runtime/plan-fragment-executor.h
+++ b/be/src/runtime/plan-fragment-executor.h
@@ -194,6 +194,8 @@ class PlanFragmentExecutor {
   /// returned via GetNext's row batch
   /// Created in Prepare (if required), owned by this object.
   boost::scoped_ptr<DataSink> sink_;
+  boost::scoped_ptr<MemTracker> sink_mem_tracker_;
+
   boost::scoped_ptr<RowBatch> row_batch_;
   boost::scoped_ptr<TRowBatch> thrift_batch_;
 
@@ -280,10 +282,6 @@ class PlanFragmentExecutor {
   /// have been stopped. sink_ will be set to NULL after successful execution.
   Status OpenInternal();
 
-  /// Executes GetNext() logic and returns resulting status.
-  /// sets done_ to true if the last row batch was returned.
-  Status GetNextInternal(RowBatch** batch);
-
   /// Stops report thread, if one is running. Blocks until report thread terminates.
   /// Idempotent.
   void StopReportThread();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3e2411f3/be/src/runtime/row-batch.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/row-batch.cc b/be/src/runtime/row-batch.cc
index 22e1a70..e602293 100644
--- a/be/src/runtime/row-batch.cc
+++ b/be/src/runtime/row-batch.cc
@@ -26,6 +26,7 @@
 #include "runtime/tuple-row.h"
 #include "util/compress.h"
 #include "util/decompress.h"
+#include "util/debug-util.h"
 #include "util/fixed-size-hash-table.h"
 #include "gen-cpp/Results_types.h"
 
@@ -454,4 +455,12 @@ Status RowBatch::ResizeAndAllocateTupleBuffer(RuntimeState* state,
   return Status::OK();
 }
 
+void RowBatch::VLogRows(const string& context) {
+  if (!VLOG_ROW_IS_ON) return;
+  VLOG_ROW << context << ": #rows=" << num_rows_;
+  for (int i = 0; i < num_rows_; ++i) {
+    VLOG_ROW << PrintRow(GetRow(i), row_desc_);
+  }
+}
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3e2411f3/be/src/runtime/row-batch.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/row-batch.h b/be/src/runtime/row-batch.h
index af1896e..f529c3d 100644
--- a/be/src/runtime/row-batch.h
+++ b/be/src/runtime/row-batch.h
@@ -299,6 +299,10 @@ class RowBatch {
   Status ResizeAndAllocateTupleBuffer(RuntimeState* state, int64_t* 
buffer_size,
        uint8_t** buffer);
 
+  /// Helper function to log the batch's rows if VLOG_ROW is enabled. 'context' is a
+  /// string to prepend to the log message.
+  void VLogRows(const std::string& context);
+
  private:
   friend class RowBatchSerializeBaseline;
   friend class RowBatchSerializeBenchmark;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3e2411f3/be/src/util/stopwatch.h
----------------------------------------------------------------------
diff --git a/be/src/util/stopwatch.h b/be/src/util/stopwatch.h
index c42b523..0e73b6a 100644
--- a/be/src/util/stopwatch.h
+++ b/be/src/util/stopwatch.h
@@ -32,6 +32,10 @@ namespace impala {
   ScopedStopWatch<MonotonicStopWatch> \
   MACRO_CONCAT(STOP_WATCH, __COUNTER__)(c)
 
+#define CONDITIONAL_SCOPED_STOP_WATCH(c, enabled) \
+  ScopedStopWatch<MonotonicStopWatch> \
+  MACRO_CONCAT(STOP_WATCH, __COUNTER__)(c, enabled)
+
 #define SCOPED_CONCURRENT_STOP_WATCH(c) \
   ScopedStopWatch<ConcurrentStopWatch> \
   MACRO_CONCAT(CONCURRENT_STOP_WATCH, __COUNTER__)(c)
@@ -231,23 +235,25 @@ class ConcurrentStopWatch {
 };
 
 /// Utility class that starts the stop watch in the constructor and stops the watch when
-/// the object goes out of scope.
+/// the object goes out of scope. If the optional argument 'enabled' is false, the
+/// stopwatch is not updated.
 /// 'T' must implement the StopWatch interface "Start", "Stop".
 template<class T>
 class ScopedStopWatch {
  public:
-  ScopedStopWatch(T* sw) :
-    sw_(sw) {
+  ScopedStopWatch(T* sw, bool enabled = true) :
+    sw_(sw), enabled_(enabled) {
     DCHECK(sw != NULL);
-    sw_->Start();
+    if (enabled_) sw_->Start();
   }
 
   ~ScopedStopWatch() {
-    sw_->Stop();
+    if (enabled_) sw_->Stop();
   }
 
  private:
   T* sw_;
+  bool enabled_;
 };
 
 }

Reply via email to