IMPALA-3311: fix string data coming out of aggs in subplans

The problem: varlen data (e.g. strings) produced by aggregations is freed by FreeLocalAllocations() after the output batch has been passed up the plan tree. This works for streaming operators or blocking operators that copy their input, but results in memory corruption when the output reaches non-copying blocking operators, e.g. SubplanNode and NestedLoopJoinNode.
The fix: this patch makes the PartitionedAggregationNode copy out produced string data if the node is in a subplan. Otherwise it calls MarkNeedToReturn() on the output batch. Marking the batch would work in the subplan case as well, but would likely be less efficient since it would result in many small batches coming out of the subplan. The patch includes a test case. However, this test only exposes the problem with an ASAN build and the --disable_mem_pools flag, which we don't currently have automated testing for. Change-Id: Iada891504c261ba54f4eb8c9d7e4e5223668d7b9 Reviewed-on: http://gerrit.cloudera.org:8080/2929 Reviewed-by: Dan Hecht <[email protected]> Tested-by: Internal Jenkins Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/7767d300 Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/7767d300 Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/7767d300 Branch: refs/heads/master Commit: 7767d300a3f018c8c8b32fa72abe5c126900a2be Parents: cb37774 Author: Skye Wanderman-Milne <[email protected]> Authored: Thu May 12 17:03:12 2016 -0700 Committer: Tim Armstrong <[email protected]> Committed: Thu May 12 23:06:36 2016 -0700 ---------------------------------------------------------------------- be/src/exec/partitioned-aggregation-node.cc | 55 ++++++++++++++++++++ be/src/exec/partitioned-aggregation-node.h | 14 +++++ be/src/exprs/agg-fn-evaluator.h | 1 + .../queries/QueryTest/nested-types-runtime.test | 16 ++++++ .../queries/subplan_aggregation.test | 11 ++++ 5 files changed, 97 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7767d300/be/src/exec/partitioned-aggregation-node.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/partitioned-aggregation-node.cc b/be/src/exec/partitioned-aggregation-node.cc 
index 0bf51a9..b7dca61 100644 --- a/be/src/exec/partitioned-aggregation-node.cc +++ b/be/src/exec/partitioned-aggregation-node.cc @@ -360,6 +360,61 @@ Status PartitionedAggregationNode::Open(RuntimeState* state) { Status PartitionedAggregationNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) { + int first_row_idx = row_batch->num_rows(); + RETURN_IF_ERROR(GetNextInternal(state, row_batch, eos)); + RETURN_IF_ERROR(HandleOutputStrings(row_batch, first_row_idx)); + return Status::OK(); +} + +Status PartitionedAggregationNode::HandleOutputStrings(RowBatch* row_batch, + int first_row_idx) { + if (!needs_finalize_ && !needs_serialize_) return Status::OK(); + // String data returned by Serialize() or Finalize() is from local expr allocations in + // the agg function contexts, and will be freed on the next GetNext() call by + // FreeLocalAllocations(). The data either needs to be copied out or sent up the plan + // tree via MarkNeedToReturn(). (See IMPALA-3311) + for (int i = 0; i < aggregate_evaluators_.size(); ++i) { + const SlotDescriptor* slot_desc = aggregate_evaluators_[i]->output_slot_desc(); + DCHECK(!slot_desc->type().IsCollectionType()) << "producing collections NYI"; + if (!slot_desc->type().IsVarLenStringType()) continue; + if (IsInSubplan()) { + // Copy string data to the row batch's pool. This is more efficient than + // MarkNeedToReturn() in a subplan since we are likely producing many small batches. 
+ RETURN_IF_ERROR(CopyStringData(slot_desc, row_batch, first_row_idx, + row_batch->tuple_data_pool())); + } else { + row_batch->MarkNeedToReturn(); + break; + } + } + return Status::OK(); +} + +Status PartitionedAggregationNode::CopyStringData(const SlotDescriptor* slot_desc, + RowBatch* row_batch, int first_row_idx, MemPool* pool) { + DCHECK(slot_desc->type().IsVarLenStringType()); + DCHECK_EQ(row_batch->row_desc().tuple_descriptors().size(), 1); + FOREACH_ROW(row_batch, first_row_idx, batch_iter) { + Tuple* tuple = batch_iter.Get()->GetTuple(0); + StringValue* sv = reinterpret_cast<StringValue*>( + tuple->GetSlot(slot_desc->tuple_offset())); + if (sv == NULL || sv->len == 0) continue; + char* new_ptr = reinterpret_cast<char*>(pool->TryAllocate(sv->len)); + if (new_ptr == NULL) { + Status s = Status::MemLimitExceeded(); + s.AddDetail(Substitute("Cannot perform aggregation at node with id $0." + " Failed to allocate $1 output bytes.", id_, sv->len)); + state_->SetMemLimitExceeded(); + return s; + } + memcpy(new_ptr, sv->ptr, sv->len); + sv->ptr = new_ptr; + } + return Status::OK(); +} + +Status PartitionedAggregationNode::GetNextInternal(RuntimeState* state, + RowBatch* row_batch, bool* eos) { SCOPED_TIMER(runtime_profile_->total_time_counter()); RETURN_IF_ERROR(ExecDebugAction(TExecNodePhase::GETNEXT, state)); RETURN_IF_CANCELLED(state); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7767d300/be/src/exec/partitioned-aggregation-node.h ---------------------------------------------------------------------- diff --git a/be/src/exec/partitioned-aggregation-node.h b/be/src/exec/partitioned-aggregation-node.h index 0b94511..ab560c5 100644 --- a/be/src/exec/partitioned-aggregation-node.h +++ b/be/src/exec/partitioned-aggregation-node.h @@ -390,6 +390,20 @@ class PartitionedAggregationNode : public ExecNode { /// a temporary buffer. 
boost::scoped_ptr<BufferedTupleStream> serialize_stream_; + /// Materializes 'row_batch' in either grouping or non-grouping case. + Status GetNextInternal(RuntimeState* state, RowBatch* row_batch, bool* eos); + + /// Helper function called by GetNextInternal() to ensure that string data referenced in + /// 'row_batch' will live as long as 'row_batch's tuples. 'first_row_idx' indexes the + /// first row that should be processed in 'row_batch'. + Status HandleOutputStrings(RowBatch* row_batch, int first_row_idx); + + /// Copies string data from the specified slot into 'pool', and sets the StringValues' + /// ptrs to the copied data. Copies data from all tuples in 'row_batch' from + /// 'first_row_idx' onwards. 'slot_desc' must have a var-len string type. + Status CopyStringData(const SlotDescriptor* slot_desc, RowBatch* row_batch, + int first_row_idx, MemPool* pool); + /// Constructs singleton output tuple, allocating memory from pool. Tuple* ConstructSingletonOutputTuple( const std::vector<impala_udf::FunctionContext*>& agg_fn_ctxs, MemPool* pool); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7767d300/be/src/exprs/agg-fn-evaluator.h ---------------------------------------------------------------------- diff --git a/be/src/exprs/agg-fn-evaluator.h b/be/src/exprs/agg-fn-evaluator.h index 98fb4a1..e9598ea 100644 --- a/be/src/exprs/agg-fn-evaluator.h +++ b/be/src/exprs/agg-fn-evaluator.h @@ -118,6 +118,7 @@ class AggFnEvaluator { const std::string& fn_name() const { return fn_.name.function_name; } const std::string& update_symbol() const { return fn_.aggregate_fn.update_fn_symbol; } const std::string& merge_symbol() const { return fn_.aggregate_fn.merge_fn_symbol; } + const SlotDescriptor* output_slot_desc() const { return output_slot_desc_; } static std::string DebugString(const std::vector<AggFnEvaluator*>& exprs); std::string DebugString() const; 
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7767d300/testdata/workloads/functional-query/queries/QueryTest/nested-types-runtime.test ---------------------------------------------------------------------- diff --git a/testdata/workloads/functional-query/queries/QueryTest/nested-types-runtime.test b/testdata/workloads/functional-query/queries/QueryTest/nested-types-runtime.test index 2e38d1d..35e27c5 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/nested-types-runtime.test +++ b/testdata/workloads/functional-query/queries/QueryTest/nested-types-runtime.test @@ -450,3 +450,19 @@ inner join t2.int_array a ---- TYPES bigint ==== +---- QUERY +# IMPALA-3311: test string data coming out of an agg in a subplan +select id, m from complextypestbl t, +(select min(cast(item as string)) m from t.int_array) v +---- RESULTS +1,'1' +2,'1' +3,'NULL' +4,'NULL' +5,'NULL' +6,'NULL' +7,'NULL' +8,'-1' +---- TYPES +BIGINT,STRING +==== http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7767d300/testdata/workloads/perf-regression/queries/subplan_aggregation.test ---------------------------------------------------------------------- diff --git a/testdata/workloads/perf-regression/queries/subplan_aggregation.test b/testdata/workloads/perf-regression/queries/subplan_aggregation.test new file mode 100644 index 0000000..b9ea894 --- /dev/null +++ b/testdata/workloads/perf-regression/queries/subplan_aggregation.test @@ -0,0 +1,11 @@ +==== +---- QUERY: subplan_aggregation +-- Description: Agg in subplan produces string output that's fed to non-trivial parent +-- plan +-- Target test case: Regression test for IMPALA-3311 +select c_custkey, max(m) from customer c, +(select max(o_orderstatus) m from c.c_orders) v +group by c_custkey order by 1 limit 1 +---- RESULTS +---- TYPES +====
