IMPALA-5885: free runtime filter allocations in Parquet. This fixes the Parquet scanner to free local allocations in runtime filter contexts for every batch.
Testing: Added a regression test that runs out of memory before this fix. Ran core and ASAN builds. Change-Id: Iecdda6af12d5ca578f7d2cb393e9cb9f49438f09 Reviewed-on: http://gerrit.cloudera.org:8080/7931 Reviewed-by: Tim Armstrong <[email protected]> Tested-by: Impala Public Jenkins Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/9ce691af Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/9ce691af Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/9ce691af Branch: refs/heads/master Commit: 9ce691af22d37d9e6914b6cc82dc241b9530e200 Parents: 915d30f Author: Tim Armstrong <[email protected]> Authored: Thu Aug 31 15:44:28 2017 -0700 Committer: Impala Public Jenkins <[email protected]> Committed: Wed Sep 6 00:10:45 2017 +0000 ---------------------------------------------------------------------- be/src/exec/hdfs-parquet-scanner.cc | 11 +++++++++-- be/src/exec/hdfs-parquet-scanner.h | 5 +++++ be/src/exec/hdfs-scan-node.cc | 7 +------ .../queries/QueryTest/runtime_row_filters.test | 18 ++++++++++++++++++ 4 files changed, 33 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9ce691af/be/src/exec/hdfs-parquet-scanner.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc index 03b9c70..f20818a 100644 --- a/be/src/exec/hdfs-parquet-scanner.cc +++ b/be/src/exec/hdfs-parquet-scanner.cc @@ -1029,11 +1029,18 @@ Status HdfsParquetScanner::CommitRows(RowBatch* dst_batch, int num_rows) { // Store UDF error in thread local storage or make UDF return status so it can merge // with parse_status_. 
RETURN_IF_ERROR(state_->GetQueryStatus()); - // Free local expr allocations for this thread + // Free local expr allocations made when evaluating conjuncts for this batch. + FreeLocalAllocationsForConjuncts(); + return Status::OK(); +} + +void HdfsParquetScanner::FreeLocalAllocationsForConjuncts() { for (const auto& kv: conjunct_evals_map_) { ScalarExprEvaluator::FreeLocalAllocations(kv.second); } - return Status::OK(); + for (const FilterContext* filter_ctx : filter_ctxs_) { + filter_ctx->expr_eval->FreeLocalAllocations(); + } } int HdfsParquetScanner::TransferScratchTuples(RowBatch* dst_batch) { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9ce691af/be/src/exec/hdfs-parquet-scanner.h ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-parquet-scanner.h b/be/src/exec/hdfs-parquet-scanner.h index 5f67036..754737d 100644 --- a/be/src/exec/hdfs-parquet-scanner.h +++ b/be/src/exec/hdfs-parquet-scanner.h @@ -650,6 +650,11 @@ class HdfsParquetScanner : public HdfsScanner { /// no values that pass the relevant conjuncts, then the row group can be skipped. Status EvalDictionaryFilters(const parquet::RowGroup& row_group, bool* skip_row_group) WARN_UNUSED_RESULT; + + /// Free local allocations made when evaluating conjuncts over each row. Does not free + /// local allocations made when evaluating conjuncts for row groups, pages, etc. Those + /// should be freed separately after they are evaluated. 
+ void FreeLocalAllocationsForConjuncts(); }; } // namespace impala http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9ce691af/be/src/exec/hdfs-scan-node.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-scan-node.cc b/be/src/exec/hdfs-scan-node.cc index 557d346..2dc8d7b 100644 --- a/be/src/exec/hdfs-scan-node.cc +++ b/be/src/exec/hdfs-scan-node.cc @@ -459,12 +459,7 @@ void HdfsScanNode::ScannerThread() { exit: runtime_state_->resource_pool()->ReleaseThreadToken(false); - if (filter_status.ok()) { - for (auto& ctx: filter_ctxs) { - ctx.expr_eval->FreeLocalAllocations(); - ctx.expr_eval->Close(runtime_state_); - } - } + for (auto& ctx: filter_ctxs) ctx.expr_eval->Close(runtime_state_); filter_mem_pool.FreeAll(); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9ce691af/testdata/workloads/functional-query/queries/QueryTest/runtime_row_filters.test ---------------------------------------------------------------------- diff --git a/testdata/workloads/functional-query/queries/QueryTest/runtime_row_filters.test b/testdata/workloads/functional-query/queries/QueryTest/runtime_row_filters.test index 06414ee..01e1055 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/runtime_row_filters.test +++ b/testdata/workloads/functional-query/queries/QueryTest/runtime_row_filters.test @@ -301,3 +301,21 @@ from alltypes a join [SHUFFLE] alltypessmall c ---- RESULTS 108 ==== + + +---- QUERY +################################################### +# Test case 14: filter with expression that uses local allocations. +# IMPALA-5885: the parquet scanner should free local allocations from upper()/lower(). +# mem_limit is calibrated so that the query fails if allocations are not freed. 
+################################################### + +SET RUNTIME_FILTER_WAIT_TIME_MS=100000; +SET RUNTIME_FILTER_MODE=GLOBAL; +SET MEM_LIMIT=200MB; +select straight_join count(*) +from tpch_parquet.lineitem l1 join tpch_parquet.lineitem l2 + on lower(upper(lower(upper(lower(l1.l_comment))))) = concat(l2.l_comment, 'foo') +---- RESULTS +0 +====
