Repository: incubator-impala
Updated Branches:
  refs/heads/master 3a630a5d6 -> 90a6b3206
IMPALA-3964: Fix crash when a count(*) is performed on a nested collection.

The Bug:
Prior to this patch, a DCHECK verified that the underlying memory pool for
the scratch batch was empty in the count-only case. For IMPALA-3964 (where a
count(*) is performed on a nested collection), if a Parquet column chunk is
compressed, each new data page is decompressed as it is read and the
decompressed data is eventually transferred into the scratch batch's memory
pool, causing that DCHECK to fail. The test suite did not catch this because
the TPCH nested Parquet data was not compressed.

The Fix:
Removed the erroneous DCHECK. Added logic to free any memory left in the
scratch batch (transferred there from the decompressed data pool) once the
scratch batch has been fully consumed. Augmented the load_nested.py script to
Snappy-compress each of the tables within the 'tpch_nested_parquet' database.
This is consistent with how the flat TPCH Parquet data set is stored.

Regarding test coverage, a number of existing tests already perform nested
collection counts against the tables in the 'tpch_nested_parquet' database.
For uncompressed nested Parquet, the 'test_nested_types.py' test suite uses
the 'ComplexTypesTbl' table to provide good coverage.

Change-Id: Id0955c85d18dfba4bd29a35ec95d0355da050607
Reviewed-on: http://gerrit.cloudera.org:8080/3940
Reviewed-by: Michael Ho <[email protected]>
Reviewed-by: Dan Hecht <[email protected]>
Tested-by: Internal Jenkins

Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/90a6b320
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/90a6b320
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/90a6b320

Branch: refs/heads/master
Commit: 90a6b3206e0b522650bfe21c5754c15d009f708c
Parents: 3a630a5
Author: Christopher Channing <[email protected]>
Authored: Wed Aug 10 13:45:26 2016 +0100
Committer: Internal Jenkins <[email protected]>
Committed: Tue Aug 16 11:54:04 2016 +0000

----------------------------------------------------------------------
 be/src/exec/hdfs-parquet-scanner.cc | 6 ++++--
 testdata/bin/load_nested.py         | 3 +++
 2 files changed, 7 insertions(+), 2 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/90a6b320/be/src/exec/hdfs-parquet-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc
index 1431689..613761b 100644
--- a/be/src/exec/hdfs-parquet-scanner.cc
+++ b/be/src/exec/hdfs-parquet-scanner.cc
@@ -578,14 +578,16 @@ int HdfsParquetScanner::TransferScratchTuples(RowBatch* dst_batch) {
   if (tuple_size == 0) {
     // We are materializing a collection with empty tuples. Add a NULL tuple to the
     // output batch per remaining scratch tuple and return. No need to evaluate
-    // filters/conjuncts or transfer memory ownership.
+    // filters/conjuncts.
     DCHECK(!has_filters);
     DCHECK(!has_conjuncts);
-    DCHECK_EQ(scratch_batch_->mem_pool()->total_allocated_bytes(), 0);
     int num_tuples = min(dst_batch->capacity() - dst_batch->num_rows(),
         scratch_batch_->num_tuples - scratch_batch_->tuple_idx);
     memset(output_row, 0, num_tuples * sizeof(Tuple*));
     scratch_batch_->tuple_idx += num_tuples;
+    // If compressed Parquet was read then there may be data left to free from the
+    // scratch batch (originating from the decompressed data pool).
+    if (scratch_batch_->AtEnd()) scratch_batch_->mem_pool()->FreeAll();
     return num_tuples;
   }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/90a6b320/testdata/bin/load_nested.py
----------------------------------------------------------------------
diff --git a/testdata/bin/load_nested.py b/testdata/bin/load_nested.py
index 380f887..f68f8c4 100755
--- a/testdata/bin/load_nested.py
+++ b/testdata/bin/load_nested.py
@@ -257,6 +257,7 @@ def load():
 
         CREATE TABLE customer
         STORED AS PARQUET
+        TBLPROPERTIES('parquet.compression'='SNAPPY')
         AS SELECT * FROM tmp_customer;
 
         DROP TABLE tmp_orders_string;
@@ -265,6 +266,7 @@ def load():
 
         CREATE TABLE region
         STORED AS PARQUET
+        TBLPROPERTIES('parquet.compression'='SNAPPY')
         AS SELECT * FROM tmp_region;
 
         DROP TABLE tmp_region_string;
@@ -272,6 +274,7 @@ def load():
 
         CREATE TABLE supplier
         STORED AS PARQUET
+        TBLPROPERTIES('parquet.compression'='SNAPPY')
         AS SELECT * FROM tmp_supplier;
 
         DROP TABLE tmp_supplier;
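
The behaviour the patch introduces in the zero-width-tuple path can be illustrated with a small standalone model. This is only a sketch under stated assumptions: ToyMemPool, ToyScratchBatch and TransferEmptyTuples are hypothetical stand-ins invented for illustration and are not Impala's MemPool, ScratchTupleBatch or HdfsParquetScanner::TransferScratchTuples. It shows why the pool may legitimately be non-empty while NULL tuples are being emitted, and that it is released only once the scratch batch is drained.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical stand-in for a memory pool: it only tracks how many bytes it owns.
struct ToyMemPool {
  int64_t total_allocated_bytes = 0;
  // Models decompressed page data being handed over to this pool.
  void Acquire(int64_t bytes) { total_allocated_bytes += bytes; }
  void FreeAll() { total_allocated_bytes = 0; }
};

// Hypothetical stand-in for the scanner's scratch batch.
struct ToyScratchBatch {
  int num_tuples = 0;  // tuples produced into the scratch batch
  int tuple_idx = 0;   // next scratch tuple to hand to the output batch
  ToyMemPool pool;
  bool AtEnd() const { return tuple_idx == num_tuples; }
};

// Models the count-only path: append one NULL tuple pointer per remaining
// scratch tuple (bounded by the output capacity), advance the scratch index,
// and free the pool only when the scratch batch has been fully consumed.
// Assumes capacity >= out->size().
int TransferEmptyTuples(ToyScratchBatch* scratch, std::vector<const void*>* out,
                        int capacity) {
  int free_slots = capacity - static_cast<int>(out->size());
  int num_tuples =
      std::min(free_slots, scratch->num_tuples - scratch->tuple_idx);
  for (int i = 0; i < num_tuples; ++i) out->push_back(nullptr);
  scratch->tuple_idx += num_tuples;
  // With compressed input the pool may hold decompressed data, so an
  // "always empty" assertion here would be wrong; free it at the end instead.
  if (scratch->AtEnd()) scratch->pool.FreeAll();
  return num_tuples;
}
```

In this model, a scratch batch holding five tuples and 4096 pooled bytes that is drained through two calls keeps its memory after the first (partial) call and releases it after the second, which mirrors the `AtEnd()` guard in the diff above.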
