This is an automated email from the ASF dual-hosted git repository. stigahuang pushed a commit to branch branch-3.4.2 in repository https://gitbox.apache.org/repos/asf/impala.git
commit 82103101826309138d22864d04137da2df15f0c3 Author: Zoltan Borok-Nagy <[email protected]> AuthorDate: Thu Sep 24 15:28:46 2020 +0200 IMPALA-9952: Fix page index filtering for empty pages As IMPALA-4371 and IMPALA-10186 point out, Impala might write empty data pages. It usually does that when it has to write a bigger page than the current page size. Whether we really need to write empty data pages is a different question, but we need to handle them correctly as there are already such files out there. The corresponding Parquet offset index entries to empty data pages are invalid PageLocation objects with 'compressed_page_size' = 0. Before this commit Impala didn't ignore the empty page locations, but generated a warning. Since an invalid page index doesn't fail a scan by default, Impala continued scanning the file with semi-initialized page filtering. This resulted in a 'Top level rows aren't in sync' error, or a crash in DEBUG builds. With this commit Impala ignores empty data pages and is still able to filter the rest of the pages. Also, if the page index is corrupt for some other reason, Impala correctly resets the page filtering logic and falls back to regular scanning. 
Testing: * Added unit test for empty data pages * Added e2e test for empty data pages * Added e2e test for invalid page index Change-Id: I4db493fc7c383ed5ef492da29c9b15eeb3d17bb0 Reviewed-on: http://gerrit.cloudera.org:8080/16503 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- be/src/exec/parquet/hdfs-parquet-scanner.cc | 38 ++++++---- be/src/exec/parquet/hdfs-parquet-scanner.h | 17 +++-- be/src/exec/parquet/parquet-common-test.cc | 10 ++- be/src/exec/parquet/parquet-common.cc | 46 +++++++++--- testdata/data/README | 17 +++++ testdata/data/alltypes_empty_pages.parquet | Bin 0 -> 22929 bytes testdata/data/alltypes_invalid_pages.parquet | Bin 0 -> 25288 bytes .../queries/QueryTest/parquet-page-index.test | 83 +++++++++++++++++++++ tests/query_test/test_parquet_stats.py | 24 +++--- 9 files changed, 195 insertions(+), 40 deletions(-) diff --git a/be/src/exec/parquet/hdfs-parquet-scanner.cc b/be/src/exec/parquet/hdfs-parquet-scanner.cc index 7b3f4d197..dbbe6c583 100644 --- a/be/src/exec/parquet/hdfs-parquet-scanner.cc +++ b/be/src/exec/parquet/hdfs-parquet-scanner.cc @@ -622,12 +622,11 @@ Status HdfsParquetScanner::NextRowGroup() { // Evaluate page index. if (!min_max_conjunct_evals_.empty() && state_->query_options().parquet_read_page_index) { - bool filter_pages; - Status page_index_status = ProcessPageIndex(&filter_pages); + Status page_index_status = ProcessPageIndex(); if (!page_index_status.ok()) { RETURN_IF_ERROR(state_->LogOrReturnError(page_index_status.msg())); } - if (filter_pages && candidate_ranges_.empty()) { + if (filter_pages_ && candidate_ranges_.empty()) { // Page level statistics filtered the whole row group. It can happen when there // is a gap in the data between the pages and the user's predicate hit that gap. // E.g. 
column chunk 'A' has two pages with statistics {min: 0, max: 5}, @@ -704,24 +703,28 @@ bool HdfsParquetScanner::ReadStatFromIndex(const ColumnStatsReader& stats_reader return false; } -Status HdfsParquetScanner::ProcessPageIndex(bool* filter_pages) { - MonotonicStopWatch single_process_page_index_timer; - single_process_page_index_timer.Start(); +void HdfsParquetScanner::ResetPageFiltering() { + filter_pages_ = false; candidate_ranges_.clear(); - *filter_pages = false; for (auto& scalar_reader : scalar_readers_) scalar_reader->ResetPageFiltering(); +} + +Status HdfsParquetScanner::ProcessPageIndex() { + MonotonicStopWatch single_process_page_index_timer; + single_process_page_index_timer.Start(); + ResetPageFiltering(); RETURN_IF_ERROR(page_index_.ReadAll(row_group_idx_)); if (page_index_.IsEmpty()) return Status::OK(); // We can release the raw page index buffer when we exit this function. const auto scope_exit = MakeScopeExitTrigger([this](){page_index_.Release();}); - RETURN_IF_ERROR(EvaluatePageIndex(filter_pages)); - RETURN_IF_ERROR(ComputeCandidatePagesForColumns(filter_pages)); + RETURN_IF_ERROR(EvaluatePageIndex()); + RETURN_IF_ERROR(ComputeCandidatePagesForColumns()); single_process_page_index_timer.Stop(); process_page_index_stats_->UpdateCounter(single_process_page_index_timer.ElapsedTime()); return Status::OK(); } -Status HdfsParquetScanner::EvaluatePageIndex(bool* filter_pages) { +Status HdfsParquetScanner::EvaluatePageIndex() { parquet::RowGroup& row_group = file_metadata_.row_groups[row_group_idx_]; vector<RowRange> skip_ranges; @@ -792,13 +795,16 @@ Status HdfsParquetScanner::EvaluatePageIndex(bool* filter_pages) { } } if (!ComputeCandidateRanges(row_group.num_rows, &skip_ranges, &candidate_ranges_)) { - return Status(Substitute("Invalid offset index in Parquet file $0.", filename())); + ResetPageFiltering(); + return Status(Substitute( + "Invalid offset index in Parquet file $0. 
Page index filtering is disabled.", + filename())); } - *filter_pages = true; + filter_pages_ = true; return Status::OK(); } -Status HdfsParquetScanner::ComputeCandidatePagesForColumns(bool* filter_pages) { +Status HdfsParquetScanner::ComputeCandidatePagesForColumns() { if (candidate_ranges_.empty()) return Status::OK(); parquet::RowGroup& row_group = file_metadata_.row_groups[row_group_idx_]; @@ -806,8 +812,10 @@ Status HdfsParquetScanner::ComputeCandidatePagesForColumns(bool* filter_pages) { const auto& page_locations = scalar_reader->offset_index_.page_locations; if (!ComputeCandidatePages(page_locations, candidate_ranges_, row_group.num_rows, &scalar_reader->candidate_data_pages_)) { - *filter_pages = false; - return Status(Substitute("Invalid offset index in Parquet file $0.", filename())); + ResetPageFiltering(); + return Status(Substitute( + "Invalid offset index in Parquet file $0. Page index filtering is disabled.", + filename())); } } for (BaseScalarColumnReader* scalar_reader : scalar_readers_) { diff --git a/be/src/exec/parquet/hdfs-parquet-scanner.h b/be/src/exec/parquet/hdfs-parquet-scanner.h index 9ed3cd2c7..3d99afdae 100644 --- a/be/src/exec/parquet/hdfs-parquet-scanner.h +++ b/be/src/exec/parquet/hdfs-parquet-scanner.h @@ -413,6 +413,9 @@ class HdfsParquetScanner : public HdfsColumnarScanner { /// pages in a column chunk. boost::scoped_ptr<MemPool> dictionary_pool_; + /// True, if we filter pages based on the Parquet page index. + bool filter_pages_ = false; + /// Contains the leftover ranges after evaluating the page index. /// If all rows were eliminated, then the row group is skipped immediately after /// evaluating the page index. @@ -515,18 +518,22 @@ class HdfsParquetScanner : public HdfsColumnarScanner { /// to be OK as well. Status NextRowGroup() WARN_UNUSED_RESULT; + /// Resets page index filtering state, i.e. clears 'candidate_ranges_' and resets + /// scalar readers' page filtering as well. 
+ void ResetPageFiltering(); + /// High-level function for initializing page filtering for the scalar readers. - /// Sets 'filter_pages' to true if found any page to filter out. - Status ProcessPageIndex(bool* filter_pages); + /// Sets 'filter_pages_' to true if found any page to filter out. + Status ProcessPageIndex(); /// Evaluates 'min_max_conjunct_evals_' against the column index and determines the row /// ranges that might contain data we are looking for. - /// Sets 'filter_pages' to true if found any page to filter out. - Status EvaluatePageIndex(bool* filter_pages); + /// Sets 'filter_pages_' to true if found any page to filter out. + Status EvaluatePageIndex(); /// Based on 'candidate_ranges_' it determines the candidate pages for each /// scalar reader. - Status ComputeCandidatePagesForColumns(bool* filter_pages); + Status ComputeCandidatePagesForColumns(); /// Check that the scalar readers agree on the top-level row being scanned. Status CheckPageFiltering(); diff --git a/be/src/exec/parquet/parquet-common-test.cc b/be/src/exec/parquet/parquet-common-test.cc index b67603a3f..8a266ebc6 100644 --- a/be/src/exec/parquet/parquet-common-test.cc +++ b/be/src/exec/parquet/parquet-common-test.cc @@ -70,6 +70,10 @@ void ValidatePages(const vector<int64_t>& first_row_indexes, const RangeVec& ran for (int64_t first_row_index : first_row_indexes) { parquet::PageLocation page_loc; page_loc.first_row_index = first_row_index; + if (first_row_index != -1) { + // first_row_index == -1 means empty page. 
+ page_loc.compressed_page_size = 42; + } page_locations.push_back(page_loc); } vector<int> candidate_pages; @@ -106,9 +110,13 @@ TEST(ParquetCommon, ComputeCandidatePages) { {{0LL, 9LL + INT_MAX}}, 100LL + INT_MAX, {0}); ValidatePages({0LL, 10LL + UINT_MAX, 20LL + UINT_MAX, 50LL + UINT_MAX, 70LL + UINT_MAX}, {{0LL, 9LL + UINT_MAX}}, 100LL + UINT_MAX, {0}); + // Empty pages are ignored + ValidatePages({0, -1}, {{0, 10}}, 15, {0}); + ValidatePages({-1, 0}, {{0, 10}}, 15, {1}); + ValidatePages({-1, 0, -1, -1, 10, -1}, {{0, 10}}, 15, {1, 4}); // Error cases: // Negative first row index. - ValidatePagesError({-1, 0, 10}, {{0, 10}}, 10, {0}); + ValidatePagesError({-2, 0, 10}, {{0, 10}}, 15, {0}); // First row index greater then number of rows. ValidatePagesError({5, 10, 15}, {{0, 10}}, 10, {0}); // First row indexes are not in order. diff --git a/be/src/exec/parquet/parquet-common.cc b/be/src/exec/parquet/parquet-common.cc index 285ef0d63..c5db93591 100644 --- a/be/src/exec/parquet/parquet-common.cc +++ b/be/src/exec/parquet/parquet-common.cc @@ -118,17 +118,32 @@ bool ComputeCandidateRanges(const int64_t num_rows, vector<RowRange>* skip_range return true; } +inline bool IsValidPageLocation(const parquet::PageLocation& page_loc, + const int64_t num_rows) { + return page_loc.offset >= 0 && + page_loc.first_row_index >= 0 && + page_loc.first_row_index < num_rows; +} + static bool ValidatePageLocations(const vector<parquet::PageLocation>& page_locations, const int64_t num_rows) { + int last_valid_idx = -1; for (int i = 0; i < page_locations.size(); ++i) { auto& page_loc = page_locations[i]; - if (page_loc.first_row_index < 0 || page_loc.first_row_index >= num_rows) { - return false; + if (!IsValidPageLocation(page_loc, num_rows)) { + // Skip page locations for empty pages. 
+ if (page_loc.compressed_page_size == 0) { + continue; + } else { + return false; + } } - if (i + 1 < page_locations.size()) { - auto& next_page_loc = page_locations[i+1]; - if (page_loc.first_row_index >= next_page_loc.first_row_index) return false; + if (last_valid_idx != -1) { + auto& last_valid_page = page_locations[last_valid_idx]; + // 'first_row_index' must have progressed in a non-empty page. + if (page_loc.first_row_index <= last_valid_page.first_row_index) return false; } + last_valid_idx = i; } return true; } @@ -147,11 +162,21 @@ bool ComputeCandidatePages( if (!ValidatePageLocations(page_locations, num_rows)) return false; int range_idx = 0; - for (int i = 0; i < page_locations.size(); ++i) { - auto& page_location = page_locations[i]; + int page_idx = 0; + while (page_idx < page_locations.size()) { + auto& page_location = page_locations[page_idx]; + if (page_location.compressed_page_size == 0) { + ++page_idx; + continue; + } + int next_page_idx = page_idx + 1; + while (next_page_idx < page_locations.size() && + page_locations[next_page_idx].compressed_page_size == 0) { + ++next_page_idx; + } int64_t page_start = page_location.first_row_index; - int64_t page_end = i != page_locations.size() - 1 ? - page_locations[i + 1].first_row_index - 1 : + int64_t page_end = next_page_idx < page_locations.size() ? + page_locations[next_page_idx].first_row_index - 1 : num_rows - 1; while (range_idx < candidate_ranges.size() && candidate_ranges[range_idx].last < page_start) { @@ -159,8 +184,9 @@ bool ComputeCandidatePages( } if (range_idx >= candidate_ranges.size()) break; if (RangesIntersect(candidate_ranges[range_idx], {page_start, page_end})) { - candidate_pages->push_back(i); + candidate_pages->push_back(page_idx); } + page_idx = next_page_idx; } // When there are candidate ranges, then we should have at least one candidate page. 
if (!candidate_ranges.empty() && candidate_pages->empty()) return false; diff --git a/testdata/data/README b/testdata/data/README index 7dca8b0db..ae139c0d8 100644 --- a/testdata/data/README +++ b/testdata/data/README @@ -494,3 +494,20 @@ the same indexing hash but a newer version depends on the time of writing. `ca51fa17-681b-4497-85b7-4f68e7a63ee7-0_1-38-282_20200112194529.parquet` If the impala table was refreshed after this file was written, impala will only query on the file with latest version. + +alltypes_empty_pages.parquet +Parquet file that contains empty data pages. Needed to test IMPALA-9952. +Generated by a modified Impala (git hash e038db44 (between 3.4 and 4.0)). I modified +HdfsParquetTableWriter::ShouldStartNewPage() to randomly start a new page: +int64_t r = random(); if (r % 2 + r % 3 + r % 5 == 0) return true; +Also modified HdfsParquetTableWriter::NewPage() to randomly insert empty pages: +if (r ... ) pages_.push_back(DataPage()); + +alltypes_invalid_pages.parquet +Parquet file that contains invalid data pages. Needed to test IMPALA-9952. +Generated by a modified Impala (git hash e038db44 (between 3.4 and 4.0)). I modified +HdfsParquetTableWriter::ShouldStartNewPage() to randomly start a new page: +int64_t r = random(); if (r % 2 + r % 3 + r % 5 == 0) return true; +Also modified HdfsParquetTableWriter::BaseColumnWriter::Flush to randomly invalidate +the offset index: +if (r ... 
) location.offset = -1; diff --git a/testdata/data/alltypes_empty_pages.parquet b/testdata/data/alltypes_empty_pages.parquet new file mode 100644 index 000000000..289ac1b4d Binary files /dev/null and b/testdata/data/alltypes_empty_pages.parquet differ diff --git a/testdata/data/alltypes_invalid_pages.parquet b/testdata/data/alltypes_invalid_pages.parquet new file mode 100644 index 000000000..8fa8e649d Binary files /dev/null and b/testdata/data/alltypes_invalid_pages.parquet differ diff --git a/testdata/workloads/functional-query/queries/QueryTest/parquet-page-index.test b/testdata/workloads/functional-query/queries/QueryTest/parquet-page-index.test index 47c4eaa1e..7c44e214c 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/parquet-page-index.test +++ b/testdata/workloads/functional-query/queries/QueryTest/parquet-page-index.test @@ -217,3 +217,86 @@ DECIMAL, DECIMAL ---- RUNTIME_PROFILE aggregation(SUM, NumStatsFilteredRowGroups): 1 ==== +---- QUERY +# Query table with empty data pages +select * from alltypes_empty_pages where id = 109 +---- RESULTS +109,false,9,9,9,90,9.899999618530273,90.89999999999999,'01/11/09','9',2009-01-11 01:49:04.860000000,2009,1 +---- TYPES +INT, BOOLEAN, TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE, STRING, STRING, TIMESTAMP, INT, INT +---- RUNTIME_PROFILE +aggregation(SUM, NumStatsFilteredPages): 186 +==== +---- QUERY +# Query table with empty data pages +select * from alltypes_empty_pages where id = 51 +---- RESULTS +51,false,1,1,1,10,1.100000023841858,10.1,'01/06/09','1',2009-01-06 00:51:02.250000000,2009,1 +---- TYPES +INT, BOOLEAN, TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE, STRING, STRING, TIMESTAMP, INT, INT +---- RUNTIME_PROFILE +aggregation(SUM, NumStatsFilteredPages): 186 +==== +---- QUERY +# Query table with empty data pages +select * from alltypes_empty_pages where id = 491 +---- RESULTS +491,false,1,1,1,10,1.100000023841858,10.1,'02/19/09','1',2009-02-19 03:01:08.100000000,2009,2 +---- TYPES +INT, 
BOOLEAN, TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE, STRING, STRING, TIMESTAMP, INT, INT +---- RUNTIME_PROFILE +aggregation(SUM, NumStatsFilteredPages): 203 +==== +---- QUERY +# Query table with invalid offset index. +set abort_on_error=1; +select * from alltypes_invalid_pages where id = 109 +---- CATCH +Invalid offset index in Parquet file +==== +---- QUERY +# Only query columns with valid offset index. +set abort_on_error=1; +select id, bool_col, tinyint_col from alltypes_invalid_pages where id > 51 and id < 55 +---- RESULTS +52,true,2 +53,false,3 +54,true,4 +---- TYPES +INT, BOOLEAN, TINYINT +---- RUNTIME_PROFILE +aggregation(SUM, NumStatsFilteredPages): 60 +==== +---- QUERY +# Query single column with invalid offset index. +set abort_on_error=1; +select sum(smallint_col) from alltypes_invalid_pages where smallint_col = 9; +---- CATCH +Invalid offset index in Parquet file +==== +---- QUERY +# Query table with invalid offset index. +set abort_on_error=0; +select * from alltypes_invalid_pages where id = 109 +---- ERRORS +Invalid offset index in Parquet file __HDFS_FILENAME__ Page index filtering is disabled. +---- RESULTS +109,false,9,9,9,90,9.899999618530273,90.89999999999999,'01/11/09','9',2009-01-11 01:49:04.860000000,2009,1 +---- TYPES +INT, BOOLEAN, TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE, STRING, STRING, TIMESTAMP, INT, INT +---- RUNTIME_PROFILE +aggregation(SUM, NumStatsFilteredPages): 0 +==== +---- QUERY +# Query single column with invalid offset index. +set abort_on_error=0; +select sum(smallint_col) from alltypes_invalid_pages where smallint_col = 9; +---- ERRORS +Invalid offset index in Parquet file __HDFS_FILENAME__ Page index filtering is disabled. 
+---- RESULTS +450 +---- TYPES +BIGINT +---- RUNTIME_PROFILE +aggregation(SUM, NumStatsFilteredPages): 0 +==== diff --git a/tests/query_test/test_parquet_stats.py b/tests/query_test/test_parquet_stats.py index a64cb88a7..5087fa204 100644 --- a/tests/query_test/test_parquet_stats.py +++ b/tests/query_test/test_parquet_stats.py @@ -18,6 +18,7 @@ import os import pytest import shlex +from copy import deepcopy from subprocess import check_call from tests.common.file_utils import ( @@ -78,22 +79,27 @@ class TestParquetStats(ImpalaTestSuite): """Test that using the Parquet page index works well. The various test files contain queries that exercise the page selection and value-skipping logic against columns with different types and encodings.""" + new_vector = deepcopy(vector) + del new_vector.get_value('exec_option')['abort_on_error'] create_table_from_parquet(self.client, unique_database, 'decimals_1_10') create_table_from_parquet(self.client, unique_database, 'nested_decimals') create_table_from_parquet(self.client, unique_database, 'double_nested_decimals') create_table_from_parquet(self.client, unique_database, 'alltypes_tiny_pages') create_table_from_parquet(self.client, unique_database, 'alltypes_tiny_pages_plain') + create_table_from_parquet(self.client, unique_database, 'alltypes_empty_pages') + create_table_from_parquet(self.client, unique_database, 'alltypes_invalid_pages') - for batch_size in [0, 1]: - vector.get_value('exec_option')['batch_size'] = batch_size - self.run_test_case('QueryTest/parquet-page-index', vector, unique_database) - self.run_test_case('QueryTest/nested-types-parquet-page-index', vector, + for batch_size in [1]: + new_vector.get_value('exec_option')['batch_size'] = batch_size + self.run_test_case('QueryTest/parquet-page-index', new_vector, unique_database) + self.run_test_case('QueryTest/nested-types-parquet-page-index', new_vector, unique_database) - self.run_test_case('QueryTest/parquet-page-index-alltypes-tiny-pages', vector, - 
unique_database) - self.run_test_case('QueryTest/parquet-page-index-alltypes-tiny-pages-plain', vector, + self.run_test_case('QueryTest/parquet-page-index-alltypes-tiny-pages', new_vector, unique_database) + self.run_test_case('QueryTest/parquet-page-index-alltypes-tiny-pages-plain', + new_vector, unique_database) for batch_size in [0, 32]: - vector.get_value('exec_option')['batch_size'] = batch_size - self.run_test_case('QueryTest/parquet-page-index-large', vector, unique_database) + new_vector.get_value('exec_option')['batch_size'] = batch_size + self.run_test_case('QueryTest/parquet-page-index-large', new_vector, + unique_database)
