IMPALA-6461 : Micro-optimizations to DataStreamSender::Send

While analyzing the performance of the partition exchange operator,
I noticed that there is a data dependency and a function call per row in the hot path.

Optimizations in this change are:
1) Remove the data dependency between computing the hash and adding the row to the channel
2) Inline DataStreamSender::Channel::AddRow
3) Cache partition_exprs_.size() in a local variable to save a couple of instructions

This translates to a 10% improvement in CPI for DataStreamSender::Send.

Change-Id: I642a9dad531a29d4838a3537ab0e04320a69960d
Reviewed-on: http://gerrit.cloudera.org:8080/9221
Reviewed-by: Mostafa Mokhtar <mmokh...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/bf426527
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/bf426527
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/bf426527

Branch: refs/heads/2.x
Commit: bf42652748f2dad2fd343574ba205eb88089549c
Parents: 9d7b210
Author: mmokhtar <mmokh...@cloudera.com>
Authored: Mon Feb 5 17:05:02 2018 -0800
Committer: Impala Public Jenkins <impala-public-jenk...@gerrit.cloudera.org>
Committed: Thu Feb 22 02:52:03 2018 +0000

----------------------------------------------------------------------
 be/src/runtime/data-stream-sender.cc      | 77 ++++++++++++++++++--------
 be/src/runtime/krpc-data-stream-sender.cc | 77 ++++++++++++++++++--------
 be/src/runtime/row-batch.h                |  4 ++
 3 files changed, 110 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/bf426527/be/src/runtime/data-stream-sender.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-sender.cc 
b/be/src/runtime/data-stream-sender.cc
index c572467..30bf5b6 100644
--- a/be/src/runtime/data-stream-sender.cc
+++ b/be/src/runtime/data-stream-sender.cc
@@ -87,7 +87,7 @@ class DataStreamSender::Channel : public CacheLineAligned {
   // Copies a single row into this channel's output buffer and flushes buffer
   // if it reaches capacity.
   // Returns error status if any of the preceding rpcs failed, OK otherwise.
-  Status AddRow(TupleRow* row) WARN_UNUSED_RESULT;
+  Status ALWAYS_INLINE AddRow(TupleRow* row) WARN_UNUSED_RESULT;
 
   // Asynchronously sends a row batch.
   // Returns the status of the most recently finished TransmitData
@@ -243,7 +243,7 @@ void DataStreamSender::Channel::WaitForRpc() {
   }
 }
 
-Status DataStreamSender::Channel::AddRow(TupleRow* row) {
+inline Status DataStreamSender::Channel::AddRow(TupleRow* row) {
   if (batch_->AtCapacity()) {
     // batch_ is full, let's send it; but first wait for an ongoing
     // transmission to finish before modifying thrift_batch_
@@ -443,16 +443,29 @@ Status DataStreamSender::Send(RuntimeState* state, 
RowBatch* batch) {
   } else if (partition_type_ == TPartitionType::KUDU) {
     DCHECK_EQ(partition_expr_evals_.size(), 1);
     int num_channels = channels_.size();
-    for (int i = 0; i < batch->num_rows(); ++i) {
-      TupleRow* row = batch->GetRow(i);
-      int32_t partition =
-          *reinterpret_cast<int32_t*>(partition_expr_evals_[0]->GetValue(row));
-      if (partition < 0) {
-        // This row doesn't coorespond to a partition, e.g. it's outside the 
given ranges.
-        partition = next_unknown_partition_;
-        ++next_unknown_partition_;
+    const int num_rows = batch->num_rows();
+    const int hash_batch_size = RowBatch::HASH_BATCH_SIZE;
+    int channel_ids[hash_batch_size];
+
+    for (int batch_start = 0; batch_start < num_rows; batch_start += 
hash_batch_size) {
+      const int batch_window_size = min(num_rows - batch_start, 
hash_batch_size);
+      for (int i = 0; i < batch_window_size; ++i) {
+        TupleRow* row = batch->GetRow(i + batch_start);
+        int32_t partition =
+            
*reinterpret_cast<int32_t*>(partition_expr_evals_[0]->GetValue(row));
+        if (partition < 0) {
+          // This row doesn't correspond to a partition,
+          //  e.g. it's outside the given ranges.
+          partition = next_unknown_partition_;
+          ++next_unknown_partition_;
+        }
+        channel_ids[i] = partition % num_channels;
+      }
+
+      for (int i = 0; i < batch_window_size; ++i) {
+        TupleRow* row = batch->GetRow(i + batch_start);
+        RETURN_IF_ERROR(channels_[channel_ids[i]]->AddRow(row));
       }
-      RETURN_IF_ERROR(channels_[partition % num_channels]->AddRow(row));
     }
   } else {
     DCHECK(partition_type_ == TPartitionType::HASH_PARTITIONED);
@@ -460,20 +473,36 @@ Status DataStreamSender::Send(RuntimeState* state, 
RowBatch* batch) {
     // TODO: encapsulate this in an Expr as we've done for Kudu above and 
remove this case
     // once we have codegen here.
     int num_channels = channels_.size();
-    for (int i = 0; i < batch->num_rows(); ++i) {
-      TupleRow* row = batch->GetRow(i);
-      uint64_t hash_val = EXCHANGE_HASH_SEED;
-      for (int j = 0; j < partition_exprs_.size(); ++j) {
-        ScalarExprEvaluator* eval = partition_expr_evals_[j];
-        void* partition_val = eval->GetValue(row);
-        // We can't use the crc hash function here because it does not result 
in
-        // uncorrelated hashes with different seeds. Instead we use FastHash.
-        // TODO: fix crc hash/GetHashValue()
-        DCHECK(&(eval->root()) == partition_exprs_[j]);
-        hash_val = RawValue::GetHashValueFastHash(
-            partition_val, partition_exprs_[j]->type(), hash_val);
+    const int num_partition_exprs = partition_exprs_.size();
+    const int num_rows = batch->num_rows();
+    const int hash_batch_size = RowBatch::HASH_BATCH_SIZE;
+    int channel_ids[hash_batch_size];
+
+    // Break the loop into two parts to break the data dependency between 
computing
+    // the hash and calling AddRow().
+    // To keep the stack allocation small, RowBatch::HASH_BATCH_SIZE rows are used.
+    for (int batch_start = 0; batch_start < num_rows; batch_start += 
hash_batch_size) {
+      int batch_window_size = min(num_rows - batch_start, hash_batch_size);
+      for (int i = 0; i < batch_window_size; ++i) {
+        TupleRow* row = batch->GetRow(i + batch_start);
+        uint64_t hash_val = EXCHANGE_HASH_SEED;
+        for (int j = 0; j < num_partition_exprs; ++j) {
+          ScalarExprEvaluator* eval = partition_expr_evals_[j];
+          void* partition_val = eval->GetValue(row);
+          // We can't use the crc hash function here because it does not 
result in
+          // uncorrelated hashes with different seeds. Instead we use FastHash.
+          // TODO: fix crc hash/GetHashValue()
+          DCHECK(&(eval->root()) == partition_exprs_[j]);
+          hash_val = RawValue::GetHashValueFastHash(
+              partition_val, partition_exprs_[j]->type(), hash_val);
+        }
+        channel_ids[i] = hash_val % num_channels;
+      }
+
+      for (int i = 0; i < batch_window_size; ++i) {
+        TupleRow* row = batch->GetRow(i + batch_start);
+        RETURN_IF_ERROR(channels_[channel_ids[i]]->AddRow(row));
       }
-      RETURN_IF_ERROR(channels_[hash_val % num_channels]->AddRow(row));
     }
   }
   COUNTER_ADD(total_sent_rows_counter_, batch->num_rows());

http://git-wip-us.apache.org/repos/asf/impala/blob/bf426527/be/src/runtime/krpc-data-stream-sender.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/krpc-data-stream-sender.cc 
b/be/src/runtime/krpc-data-stream-sender.cc
index 4866e4e..6c0ad01 100644
--- a/be/src/runtime/krpc-data-stream-sender.cc
+++ b/be/src/runtime/krpc-data-stream-sender.cc
@@ -129,7 +129,7 @@ class KrpcDataStreamSender::Channel : public 
CacheLineAligned {
   // it reaches capacity. This call may block if the row batch's capacity is 
reached
   // and the preceding RPC is still in progress. Returns error status if 
serialization
   // failed or if the preceding RPC failed. Return OK otherwise.
-  Status AddRow(TupleRow* row);
+  Status ALWAYS_INLINE AddRow(TupleRow* row);
 
   // Shutdowns the channel and frees the row batch allocation. Any in-flight 
RPC will
   // be cancelled. It's expected that clients normally call FlushAndSendEos() 
before
@@ -474,7 +474,7 @@ Status KrpcDataStreamSender::Channel::SendCurrentBatch() {
   return Status::OK();
 }
 
-Status KrpcDataStreamSender::Channel::AddRow(TupleRow* row) {
+inline Status KrpcDataStreamSender::Channel::AddRow(TupleRow* row) {
   if (batch_->AtCapacity()) {
     // batch_ is full, let's send it.
     RETURN_IF_ERROR(SendCurrentBatch());
@@ -660,16 +660,29 @@ Status KrpcDataStreamSender::Send(RuntimeState* state, 
RowBatch* batch) {
   } else if (partition_type_ == TPartitionType::KUDU) {
     DCHECK_EQ(partition_expr_evals_.size(), 1);
     int num_channels = channels_.size();
-    for (int i = 0; i < batch->num_rows(); ++i) {
-      TupleRow* row = batch->GetRow(i);
-      int32_t partition =
-          *reinterpret_cast<int32_t*>(partition_expr_evals_[0]->GetValue(row));
-      if (partition < 0) {
-        // This row doesn't correspond to a partition, e.g. it's outside the 
given ranges.
-        partition = next_unknown_partition_;
-        ++next_unknown_partition_;
+    const int num_rows = batch->num_rows();
+    const int hash_batch_size = RowBatch::HASH_BATCH_SIZE;
+    int channel_ids[hash_batch_size];
+
+    for (int batch_start = 0; batch_start < num_rows; batch_start += 
hash_batch_size) {
+      int batch_window_size = min(num_rows - batch_start, hash_batch_size);
+      for (int i = 0; i < batch_window_size; ++i) {
+        TupleRow* row = batch->GetRow(i + batch_start);
+        int32_t partition =
+            
*reinterpret_cast<int32_t*>(partition_expr_evals_[0]->GetValue(row));
+        if (partition < 0) {
+          // This row doesn't correspond to a partition,
+          // e.g. it's outside the given ranges.
+          partition = next_unknown_partition_;
+          ++next_unknown_partition_;
+        }
+        channel_ids[i] = partition % num_channels;
+      }
+
+      for (int i = 0; i < batch_window_size; ++i) {
+        TupleRow* row = batch->GetRow(i + batch_start);
+        RETURN_IF_ERROR(channels_[channel_ids[i]]->AddRow(row));
       }
-      RETURN_IF_ERROR(channels_[partition % num_channels]->AddRow(row));
     }
   } else {
     DCHECK_EQ(partition_type_, TPartitionType::HASH_PARTITIONED);
@@ -677,20 +690,36 @@ Status KrpcDataStreamSender::Send(RuntimeState* state, 
RowBatch* batch) {
     // TODO: encapsulate this in an Expr as we've done for Kudu above and 
remove this case
     // once we have codegen here.
     int num_channels = channels_.size();
-    for (int i = 0; i < batch->num_rows(); ++i) {
-      TupleRow* row = batch->GetRow(i);
-      uint64_t hash_val = EXCHANGE_HASH_SEED;
-      for (int j = 0; j < partition_exprs_.size(); ++j) {
-        ScalarExprEvaluator* eval = partition_expr_evals_[j];
-        void* partition_val = eval->GetValue(row);
-        // We can't use the crc hash function here because it does not result 
in
-        // uncorrelated hashes with different seeds. Instead we use FastHash.
-        // TODO: fix crc hash/GetHashValue()
-        DCHECK(&(eval->root()) == partition_exprs_[j]);
-        hash_val = RawValue::GetHashValueFastHash(
-            partition_val, partition_exprs_[j]->type(), hash_val);
+    const int num_partition_exprs = partition_exprs_.size();
+    const int num_rows = batch->num_rows();
+    const int hash_batch_size = RowBatch::HASH_BATCH_SIZE;
+    int channel_ids[hash_batch_size];
+
+    // Break the loop into two parts to break the data dependency between 
computing
+    // the hash and calling AddRow().
+    // To keep the stack allocation small, RowBatch::HASH_BATCH_SIZE rows are used.
+    for (int batch_start = 0; batch_start < num_rows; batch_start += 
hash_batch_size) {
+      int batch_window_size = min(num_rows - batch_start, hash_batch_size);
+      for (int i = 0; i < batch_window_size; ++i) {
+        TupleRow* row = batch->GetRow(i + batch_start);
+        uint64_t hash_val = EXCHANGE_HASH_SEED;
+        for (int j = 0; j < num_partition_exprs; ++j) {
+          ScalarExprEvaluator* eval = partition_expr_evals_[j];
+          void* partition_val = eval->GetValue(row);
+          // We can't use the crc hash function here because it does not 
result in
+          // uncorrelated hashes with different seeds. Instead we use FastHash.
+          // TODO: fix crc hash/GetHashValue()
+          DCHECK(&(eval->root()) == partition_exprs_[j]);
+          hash_val = RawValue::GetHashValueFastHash(
+              partition_val, partition_exprs_[j]->type(), hash_val);
+        }
+        channel_ids[i] = hash_val % num_channels;
+      }
+
+      for (int i = 0; i < batch_window_size; ++i) {
+        TupleRow* row = batch->GetRow(i + batch_start);
+        RETURN_IF_ERROR(channels_[channel_ids[i]]->AddRow(row));
       }
-      RETURN_IF_ERROR(channels_[hash_val % num_channels]->AddRow(row));
     }
   }
   COUNTER_ADD(total_sent_rows_counter_, batch->num_rows());

http://git-wip-us.apache.org/repos/asf/impala/blob/bf426527/be/src/runtime/row-batch.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/row-batch.h b/be/src/runtime/row-batch.h
index aad5ebe..3bde4d1 100644
--- a/be/src/runtime/row-batch.h
+++ b/be/src/runtime/row-batch.h
@@ -385,6 +385,10 @@ class RowBatch {
   // in order to leave room for variable-length data.
   static const int FIXED_LEN_BUFFER_LIMIT = AT_CAPACITY_MEM_USAGE / 2;
 
+  // Batch size to compute hash, keep it small to avoid large stack 
allocations.
+  // 16 provided the same speedup compared to operating over a full batch.
+  static const int HASH_BATCH_SIZE = 16;
+
   /// Allocates a buffer large enough for the fixed-length portion of 
'capacity_' rows in
   /// this batch from 'tuple_data_pool_'. 'capacity_' is reduced if the 
allocation would
   /// exceed FIXED_LEN_BUFFER_LIMIT. Always returns enough space for at least 
one row.

Reply via email to