This is an automated email from the ASF dual-hosted git repository. michaelsmith pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit d54b75ccf14a42471214926b2ba7e217cf7e3f1f Author: Xuebin Su <[email protected]> AuthorDate: Thu Dec 11 17:18:59 2025 +0800 IMPALA-14619: Reset levels_readahead_ for late materialization Previously, `BaseScalarColumnReader::levels_readahead_` was not reset when the reader did not do page filtering. If a query selected the last row containing a collection value in a row group, `levels_readahead_` would be set and would not be reset when advancing to the next row group without page filtering. As a result, trying to skip collection values at the start of the next row group would cause a check failure. This patch fixes the failure by resetting `levels_readahead_` in `BaseScalarColumnReader::Reset()`, which is always called when advancing to the next row group. `levels_readahead_` is also moved out of the "Members used for page filtering" section as the variable is also used in late materialization. Testing: - Added an E2E test for the fix. Change-Id: Idac138ffe4e1a9260f9080a97a1090b467781d00 Reviewed-on: http://gerrit.cloudera.org:8080/23779 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- be/src/exec/parquet/parquet-column-readers.cc | 1 + be/src/exec/parquet/parquet-column-readers.h | 26 +++++++++++----------- .../parquet-late-materialization-unique-db.test | 13 +++++++++++ .../test_parquet_late_materialization.py | 2 ++ 4 files changed, 29 insertions(+), 13 deletions(-) diff --git a/be/src/exec/parquet/parquet-column-readers.cc b/be/src/exec/parquet/parquet-column-readers.cc index 6f31df188..b131b2f2d 100644 --- a/be/src/exec/parquet/parquet-column-readers.cc +++ b/be/src/exec/parquet/parquet-column-readers.cc @@ -1067,6 +1067,7 @@ Status BaseScalarColumnReader::Reset(const HdfsFileDesc& file_desc, pos_current_value_ = ParquetLevel::INVALID_POS; row_group_first_row_ = row_group_first_row; current_row_ = -1; + levels_readahead_ = false; vector<ScanRange::SubRange> sub_ranges; CreateSubRanges(&sub_ranges); diff --git a/be/src/exec/parquet/parquet-column-readers.h b/be/src/exec/parquet/parquet-column-readers.h index 46c929280..73eb45a0d 100644 --- a/be/src/exec/parquet/parquet-column-readers.h +++ b/be/src/exec/parquet/parquet-column-readers.h @@ -452,6 +452,19 @@ class BaseScalarColumnReader : public ParquetColumnReader { /// processed the first (zeroeth) row. int64_t current_row_ = -1; + /// This flag is needed for the proper tracking of the last processed row. + /// The batched and non-batched interfaces behave differently. E.g. when using the + /// batched interface you don't need to invoke NextLevels() in advance, while you need + /// to do that for the non-batched interface. In fact, the batched interface doesn't + /// call NextLevels() at all. It directly reads the levels then the corresponding value + /// in a loop. On the other hand, the non-batched interface (ReadValue()) expects that + /// the levels for the next value are already read via NextLevels(). And after reading + /// the value it calls NextLevels() to read the levels of the next value. Hence, the + /// levels are always read ahead in this case. + /// Returns true, if we read ahead def and rep levels. In this case 'current_row_' + /// points to the row we'll process next, not to the row we already processed. + bool levels_readahead_ = false; + ///////////////////////////////////////// /// BEGIN: Members used for page filtering /// They are not set when we don't filter out pages at all. @@ -475,19 +488,6 @@ class BaseScalarColumnReader : public ParquetColumnReader { /// rows and increment this field. int current_row_range_ = 0; - /// This flag is needed for the proper tracking of the last processed row. - /// The batched and non-batched interfaces behave differently. E.g. when using the - /// batched interface you don't need to invoke NextLevels() in advance, while you need - /// to do that for the non-batched interface. In fact, the batched interface doesn't - /// call NextLevels() at all. It directly reads the levels then the corresponding value - /// in a loop. On the other hand, the non-batched interface (ReadValue()) expects that - /// the levels for the next value are already read via NextLevels(). And after reading - /// the value it calls NextLevels() to read the levels of the next value. Hence, the - /// levels are always read ahead in this case. - /// Returns true, if we read ahead def and rep levels. In this case 'current_row_' - /// points to the row we'll process next, not to the row we already processed. - bool levels_readahead_ = false; - /// END: Members used for page filtering ///////////////////////////////////////// diff --git a/testdata/workloads/functional-query/queries/QueryTest/parquet-late-materialization-unique-db.test b/testdata/workloads/functional-query/queries/QueryTest/parquet-late-materialization-unique-db.test index 42385efdd..0f459277e 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/parquet-late-materialization-unique-db.test +++ b/testdata/workloads/functional-query/queries/QueryTest/parquet-late-materialization-unique-db.test @@ -40,3 +40,16 @@ select count(unnest(arr)) from nested_decimals n where d_38 = 1; aggregation(SUM, NumPagesSkippedByLateMaterialization): 0 aggregation(SUM, NumTopLevelValuesSkipped): 17 ==== +---- QUERY +# Selects the last row in a row group and then skips the first row in the next +# row group. +select count(o_orderkey) +from customer_nested_multiblock_multipage t +left join t.c_orders +where cast(c_custkey as string) like '100'; +---- RESULTS +20 +---- RUNTIME_PROFILE +aggregation(SUM, NumPagesSkippedByLateMaterialization): 0 +aggregation(SUM, NumTopLevelValuesSkipped): 299 +==== diff --git a/tests/query_test/test_parquet_late_materialization.py b/tests/query_test/test_parquet_late_materialization.py index cac2473e4..d3ae848d4 100644 --- a/tests/query_test/test_parquet_late_materialization.py +++ b/tests/query_test/test_parquet_late_materialization.py @@ -37,5 +37,7 @@ class TestParquetLateMaterialization(ImpalaTestSuite): def test_parquet_late_materialization_unique_db(self, vector, unique_database): create_table_from_parquet(self.client, unique_database, 'decimals_1_10') create_table_from_parquet(self.client, unique_database, 'nested_decimals') + create_table_from_parquet( + self.client, unique_database, 'customer_nested_multiblock_multipage') self.run_test_case('QueryTest/parquet-late-materialization-unique-db', vector, unique_database)
