Repository: incubator-impala
Updated Branches:
  refs/heads/master 03734fe87 -> 924066a4f


IMPALA-5611: KuduPartitionExpr holds onto memory unnecessarily

IMPALA-3742 introduced KuduPartitionExpr, which takes a row and passes
it to the Kudu client to determine what partition it belongs to.

The DataStreamSender never frees the local allocations for the Kudu
partition exprs, causing it to hang on to memory longer than it needs to.

This patch also fixes two other related issues:
- DataStreamSender was dropping the Status from AddRow in the Kudu
  branch. Adds 'RETURN_IF_ERROR' and 'WARN_UNUSED_RESULT'
- Changes the HASH case in DataStreamSender to call FreeLocalAllocations
  on a per-batch basis, instead of a per-row basis.

Testing:
- Added an e2e test that runs a large insert with a mem limit that
  failed with oom previously.

Change-Id: Ia661eb8bed114070728a1497ccf7ed6893237e5e
Reviewed-on: http://gerrit.cloudera.org:8080/7346
Reviewed-by: Dan Hecht <[email protected]>
Reviewed-by: Michael Ho <[email protected]>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/4e178390
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/4e178390
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/4e178390

Branch: refs/heads/master
Commit: 4e17839033f931f98e0c3ec46d99b250b0bb4660
Parents: 03734fe
Author: Thomas Tauber-Marshall <[email protected]>
Authored: Fri Jun 30 12:00:08 2017 -0700
Committer: Impala Public Jenkins <[email protected]>
Committed: Thu Jul 6 21:45:39 2017 +0000

----------------------------------------------------------------------
 be/src/runtime/data-stream-sender.cc                  | 14 +++++++-------
 .../queries/QueryTest/kudu_insert.test                |  6 ++++++
 2 files changed, 13 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4e178390/be/src/runtime/data-stream-sender.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-sender.cc 
b/be/src/runtime/data-stream-sender.cc
index aeadb56..16cf87e 100644
--- a/be/src/runtime/data-stream-sender.cc
+++ b/be/src/runtime/data-stream-sender.cc
@@ -82,21 +82,21 @@ class DataStreamSender::Channel : public CacheLineAligned {
 
   // Initialize channel.
   // Returns OK if successful, error indication otherwise.
-  Status Init(RuntimeState* state);
+  Status Init(RuntimeState* state) WARN_UNUSED_RESULT;
 
   // Copies a single row into this channel's output buffer and flushes buffer
   // if it reaches capacity.
   // Returns error status if any of the preceding rpcs failed, OK otherwise.
-  Status AddRow(TupleRow* row);
+  Status AddRow(TupleRow* row) WARN_UNUSED_RESULT;
 
   // Asynchronously sends a row batch.
   // Returns the status of the most recently finished TransmitData
   // rpc (or OK if there wasn't one that hasn't been reported yet).
-  Status SendBatch(TRowBatch* batch);
+  Status SendBatch(TRowBatch* batch) WARN_UNUSED_RESULT;
 
   // Return status of last TransmitData rpc (initiated by the most recent call
   // to either SendBatch() or SendCurrentBatch()).
-  Status GetSendStatus();
+  Status GetSendStatus() WARN_UNUSED_RESULT;
 
   // Waits for the rpc thread pool to finish the current rpc.
   void WaitForRpc();
@@ -105,7 +105,7 @@ class DataStreamSender::Channel : public CacheLineAligned {
   void Teardown(RuntimeState* state);
 
   // Flushes any buffered row batches and sends the EOS RPC to close the 
channel.
-  Status FlushAndSendEos(RuntimeState* state);
+  Status FlushAndSendEos(RuntimeState* state) WARN_UNUSED_RESULT;
 
   int64_t num_data_bytes_sent() const { return num_data_bytes_sent_; }
   TRowBatch* thrift_batch() { return &thrift_batch_; }
@@ -452,7 +452,7 @@ Status DataStreamSender::Send(RuntimeState* state, 
RowBatch* batch) {
         partition = next_unknown_partition_;
         ++next_unknown_partition_;
       }
-      channels_[partition % num_channels]->AddRow(row);
+      RETURN_IF_ERROR(channels_[partition % num_channels]->AddRow(row));
     }
   } else {
     DCHECK(partition_type_ == TPartitionType::HASH_PARTITIONED);
@@ -474,11 +474,11 @@ Status DataStreamSender::Send(RuntimeState* state, 
RowBatch* batch) {
         hash_val = RawValue::GetHashValueFnv(
             partition_val, partition_exprs_[i]->type(), hash_val);
       }
-      ScalarExprEvaluator::FreeLocalAllocations(partition_expr_evals_);
       RETURN_IF_ERROR(channels_[hash_val % num_channels]->AddRow(row));
     }
   }
   COUNTER_ADD(total_sent_rows_counter_, batch->num_rows());
+  ScalarExprEvaluator::FreeLocalAllocations(partition_expr_evals_);
   RETURN_IF_ERROR(state->CheckQueryState());
   return Status::OK();
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4e178390/testdata/workloads/functional-query/queries/QueryTest/kudu_insert.test
----------------------------------------------------------------------
diff --git 
a/testdata/workloads/functional-query/queries/QueryTest/kudu_insert.test 
b/testdata/workloads/functional-query/queries/QueryTest/kudu_insert.test
index 7572fe2..71b09fc 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/kudu_insert.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/kudu_insert.test
@@ -430,3 +430,9 @@ insert into multiple_partition_cols select bigint_col, 
null, string_col
 NumModifiedRows: 0
 NumRowErrors: 1
 ====
+---- QUERY
+# IMPALA-5611 - test a large insert with a memory limit
+set mem_limit=400m;
+create table kudu_test primary key(a, b) partition by hash(a, b) partitions 8 
stored as kudu as
+select l_orderkey a, concat(l_comment, l_comment, l_comment) b from 
tpch.lineitem
+====
\ No newline at end of file

Reply via email to