Repository: incubator-impala Updated Branches: refs/heads/master 03734fe87 -> 924066a4f
IMPALA-5611: KuduPartitionExpr holds onto memory unnecessarily IMPALA-3742 introduced KuduPartitionExpr, which takes a row and passes it to the Kudu client to determine what partition it belongs to. The DataStreamSender never frees the local allocations for the Kudu partition exprs, causing it to hang on to memory longer than it needs to. This patch also fixes two other related issues: - DataStreamSender was dropping the Status from AddRow in the Kudu branch. Adds 'RETURN_IF_ERROR' and 'WARN_UNUSED_RESULT' - Changes the HASH case in DataStreamSender to call FreeLocalAllocations on a per-batch basis, instead of a per-row basis. Testing: - Added an e2e test that runs a large insert with a mem limit that failed with oom previously. Change-Id: Ia661eb8bed114070728a1497ccf7ed6893237e5e Reviewed-on: http://gerrit.cloudera.org:8080/7346 Reviewed-by: Dan Hecht <[email protected]> Reviewed-by: Michael Ho <[email protected]> Tested-by: Impala Public Jenkins Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/4e178390 Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/4e178390 Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/4e178390 Branch: refs/heads/master Commit: 4e17839033f931f98e0c3ec46d99b250b0bb4660 Parents: 03734fe Author: Thomas Tauber-Marshall <[email protected]> Authored: Fri Jun 30 12:00:08 2017 -0700 Committer: Impala Public Jenkins <[email protected]> Committed: Thu Jul 6 21:45:39 2017 +0000 ---------------------------------------------------------------------- be/src/runtime/data-stream-sender.cc | 14 +++++++------- .../queries/QueryTest/kudu_insert.test | 6 ++++++ 2 files changed, 13 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4e178390/be/src/runtime/data-stream-sender.cc 
---------------------------------------------------------------------- diff --git a/be/src/runtime/data-stream-sender.cc b/be/src/runtime/data-stream-sender.cc index aeadb56..16cf87e 100644 --- a/be/src/runtime/data-stream-sender.cc +++ b/be/src/runtime/data-stream-sender.cc @@ -82,21 +82,21 @@ class DataStreamSender::Channel : public CacheLineAligned { // Initialize channel. // Returns OK if successful, error indication otherwise. - Status Init(RuntimeState* state); + Status Init(RuntimeState* state) WARN_UNUSED_RESULT; // Copies a single row into this channel's output buffer and flushes buffer // if it reaches capacity. // Returns error status if any of the preceding rpcs failed, OK otherwise. - Status AddRow(TupleRow* row); + Status AddRow(TupleRow* row) WARN_UNUSED_RESULT; // Asynchronously sends a row batch. // Returns the status of the most recently finished TransmitData // rpc (or OK if there wasn't one that hasn't been reported yet). - Status SendBatch(TRowBatch* batch); + Status SendBatch(TRowBatch* batch) WARN_UNUSED_RESULT; // Return status of last TransmitData rpc (initiated by the most recent call // to either SendBatch() or SendCurrentBatch()). - Status GetSendStatus(); + Status GetSendStatus() WARN_UNUSED_RESULT; // Waits for the rpc thread pool to finish the current rpc. void WaitForRpc(); @@ -105,7 +105,7 @@ class DataStreamSender::Channel : public CacheLineAligned { void Teardown(RuntimeState* state); // Flushes any buffered row batches and sends the EOS RPC to close the channel. 
- Status FlushAndSendEos(RuntimeState* state); + Status FlushAndSendEos(RuntimeState* state) WARN_UNUSED_RESULT; int64_t num_data_bytes_sent() const { return num_data_bytes_sent_; } TRowBatch* thrift_batch() { return &thrift_batch_; } @@ -452,7 +452,7 @@ Status DataStreamSender::Send(RuntimeState* state, RowBatch* batch) { partition = next_unknown_partition_; ++next_unknown_partition_; } - channels_[partition % num_channels]->AddRow(row); + RETURN_IF_ERROR(channels_[partition % num_channels]->AddRow(row)); } } else { DCHECK(partition_type_ == TPartitionType::HASH_PARTITIONED); @@ -474,11 +474,11 @@ Status DataStreamSender::Send(RuntimeState* state, RowBatch* batch) { hash_val = RawValue::GetHashValueFnv( partition_val, partition_exprs_[i]->type(), hash_val); } - ScalarExprEvaluator::FreeLocalAllocations(partition_expr_evals_); RETURN_IF_ERROR(channels_[hash_val % num_channels]->AddRow(row)); } } COUNTER_ADD(total_sent_rows_counter_, batch->num_rows()); + ScalarExprEvaluator::FreeLocalAllocations(partition_expr_evals_); RETURN_IF_ERROR(state->CheckQueryState()); return Status::OK(); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4e178390/testdata/workloads/functional-query/queries/QueryTest/kudu_insert.test ---------------------------------------------------------------------- diff --git a/testdata/workloads/functional-query/queries/QueryTest/kudu_insert.test b/testdata/workloads/functional-query/queries/QueryTest/kudu_insert.test index 7572fe2..71b09fc 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/kudu_insert.test +++ b/testdata/workloads/functional-query/queries/QueryTest/kudu_insert.test @@ -430,3 +430,9 @@ insert into multiple_partition_cols select bigint_col, null, string_col NumModifiedRows: 0 NumRowErrors: 1 ==== +---- QUERY +# IMPALA-5611 - test a large insert with a memory limit +set mem_limit=400m; +create table kudu_test primary key(a, b) partition by hash(a, b) partitions 8 stored as kudu as +select l_orderkey a, 
concat(l_comment, l_comment, l_comment) b from tpch.lineitem +==== \ No newline at end of file
