IMPALA-7704: Revert "IMPALA-7644: Hide Parquet page index writing with feature flag"
The fix for IMPALA-7644 introduced ASAN issues detailed in IMPALA-7704. Reverting for now.

This reverts commit 843683ed6c2ef41c7c25e9fa4af68801dbdd1a78.

Change-Id: Icf0a64d6ec747275e3ecd6e801e054f81095591a
Reviewed-on: http://gerrit.cloudera.org:8080/11671
Tested-by: Impala Public Jenkins <[email protected]>
Reviewed-by: Michael Ho <[email protected]>

Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/af76186e
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/af76186e
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/af76186e

Branch: refs/heads/master
Commit: af76186e013607cb64baf151c039e4f6aaab4350
Parents: 97f0282
Author: Joe McDonnell <[email protected]>
Authored: Fri Oct 12 11:15:07 2018 -0700
Committer: Joe McDonnell <[email protected]>
Committed: Sat Oct 13 03:26:03 2018 +0000

----------------------------------------------------------------------
 be/src/common/global-flags.cc                   |   6 -
 be/src/exec/hdfs-parquet-table-writer.cc        | 100 ++---
 .../queries/QueryTest/stats-extrapolation.test  |  14 +-
 tests/custom_cluster/test_parquet_page_index.py | 371 ------------------
 tests/query_test/test_parquet_page_index.py     | 372 +++++++++++++++++++
 5 files changed, 417 insertions(+), 446 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/impala/blob/af76186e/be/src/common/global-flags.cc
----------------------------------------------------------------------
diff --git a/be/src/common/global-flags.cc b/be/src/common/global-flags.cc
index ac76b53..2ea1ca5 100644
--- a/be/src/common/global-flags.cc
+++ b/be/src/common/global-flags.cc
@@ -239,12 +239,6 @@ DEFINE_double_hidden(invalidate_tables_fraction_on_memory_pressure, 0.1,
    "The fraction of tables to invalidate when CatalogdTableInvalidator considers the "
    "old GC generation to be almost full.");
-DEFINE_bool_hidden(enable_parquet_page_index_writing_debug_only, false, "If true, Impala "
    "will write the Parquet page index. It is not advised to use it in a production "
    "environment, only for testing and development. This flag is meant to be temporary. "
    "We plan to remove this flag once Impala is able to read the page index and has "
    "better test coverage around it.");
-
// ++========================++
// || Startup flag graveyard ||
// ++========================++

http://git-wip-us.apache.org/repos/asf/impala/blob/af76186e/be/src/exec/hdfs-parquet-table-writer.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-table-writer.cc b/be/src/exec/hdfs-parquet-table-writer.cc
index 13137e5..8aa4f7a 100644
--- a/be/src/exec/hdfs-parquet-table-writer.cc
+++ b/be/src/exec/hdfs-parquet-table-writer.cc
@@ -83,8 +83,6 @@ using namespace apache::thrift;
// the columns and run that function over row batches.
// TODO: we need to pass in the compression from the FE/metadata

-DECLARE_bool(enable_parquet_page_index_writing_debug_only);
-
namespace impala {

// Base class for column writers.
This contains most of the logic except for @@ -207,58 +205,6 @@ class HdfsParquetTableWriter::BaseColumnWriter { protected: friend class HdfsParquetTableWriter; - Status AddMemoryConsumptionForPageIndex(int64_t new_memory_allocation) { - if (UNLIKELY(!table_sink_mem_tracker_->TryConsume(new_memory_allocation))) { - return table_sink_mem_tracker_->MemLimitExceeded(parent_->state_, - "Failed to allocate memory for Parquet page index.", new_memory_allocation); - } - page_index_memory_consumption_ += new_memory_allocation; - return Status::OK(); - } - - Status ReserveOffsetIndex(int64_t capacity) { - if (!FLAGS_enable_parquet_page_index_writing_debug_only) return Status::OK(); - RETURN_IF_ERROR( - AddMemoryConsumptionForPageIndex(capacity * sizeof(parquet::PageLocation))); - offset_index_.page_locations.reserve(capacity); - return Status::OK(); - } - - void AddLocationToOffsetIndex(const parquet::PageLocation& location) { - if (!FLAGS_enable_parquet_page_index_writing_debug_only) return; - offset_index_.page_locations.push_back(location); - } - - Status AddPageStatsToColumnIndex() { - if (!FLAGS_enable_parquet_page_index_writing_debug_only) return Status::OK(); - parquet::Statistics page_stats; - page_stats_base_->EncodeToThrift(&page_stats); - // If pages_stats contains min_value and max_value, then append them to min_values_ - // and max_values_ and also mark the page as not null. In case min and max values are - // not set, push empty strings to maintain the consistency of the index and mark the - // page as null. Always push the null_count. - string min_val; - string max_val; - if ((page_stats.__isset.min_value) && (page_stats.__isset.max_value)) { - Status s_min = TruncateDown(page_stats.min_value, PAGE_INDEX_MAX_STRING_LENGTH, - &min_val); - Status s_max = TruncateUp(page_stats.max_value, PAGE_INDEX_MAX_STRING_LENGTH, - &max_val); - if (!s_min.ok() || !s_max.ok()) valid_column_index_ = false; - column_index_.null_pages.push_back(false); - } else { - DCHECK(!page_stats.__isset.min_value && !page_stats.__isset.max_value); - column_index_.null_pages.push_back(true); - DCHECK_EQ(page_stats.null_count, num_values_); - } - RETURN_IF_ERROR( - AddMemoryConsumptionForPageIndex(min_val.capacity() + max_val.capacity())); - column_index_.min_values.emplace_back(std::move(min_val)); - column_index_.max_values.emplace_back(std::move(max_val)); - column_index_.null_counts.push_back(page_stats.null_count); - return Status::OK(); - } - // Encodes value into the current page output buffer and updates the column statistics // aggregates. Returns true if the value was appended successfully to the current page. 
// Returns false if the value was not appended to the current page and the caller can @@ -699,10 +645,11 @@ Status HdfsParquetTableWriter::BaseColumnWriter::Flush(int64_t* file_pos, *first_data_page = *file_pos; int64_t current_row_group_index = 0; - RETURN_IF_ERROR(ReserveOffsetIndex(num_data_pages_)); + offset_index_.page_locations.resize(num_data_pages_); // Write data pages - for (const DataPage& page : pages_) { + for (int i = 0; i < num_data_pages_; ++i) { + DataPage& page = pages_[i]; parquet::PageLocation location; if (page.header.data_page_header.num_values == 0) { @@ -710,7 +657,7 @@ Status HdfsParquetTableWriter::BaseColumnWriter::Flush(int64_t* file_pos, location.offset = -1; location.compressed_page_size = 0; location.first_row_index = -1; - AddLocationToOffsetIndex(location); + offset_index_.page_locations[i] = location; continue; } @@ -730,7 +677,7 @@ Status HdfsParquetTableWriter::BaseColumnWriter::Flush(int64_t* file_pos, // its name suggests. On the other hand, parquet::PageLocation::compressed_page_size // also includes the size of the page header. location.compressed_page_size = page.header.compressed_page_size + len; - AddLocationToOffsetIndex(location); + offset_index_.page_locations[i] = location; // Write the page data RETURN_IF_ERROR(parent_->Write(page.data, page.header.compressed_page_size)); @@ -807,7 +754,37 @@ Status HdfsParquetTableWriter::BaseColumnWriter::FinalizeCurrentPage() { } DCHECK(page_stats_base_ != nullptr); - RETURN_IF_ERROR(AddPageStatsToColumnIndex()); + parquet::Statistics page_stats; + page_stats_base_->EncodeToThrift(&page_stats); + { + // If pages_stats contains min_value and max_value, then append them to min_values_ + // and max_values_ and also mark the page as not null. In case min and max values are + // not set, push empty strings to maintain the consistency of the index and mark the + // page as null. Always push the null_count. + string min_val; + string max_val; + if ((page_stats.__isset.min_value) && (page_stats.__isset.max_value)) { + Status s_min = TruncateDown(page_stats.min_value, PAGE_INDEX_MAX_STRING_LENGTH, + &min_val); + Status s_max = TruncateUp(page_stats.max_value, PAGE_INDEX_MAX_STRING_LENGTH, + &max_val); + if (!s_min.ok() || !s_max.ok()) valid_column_index_ = false; + column_index_.null_pages.push_back(false); + } else { + DCHECK(!page_stats.__isset.min_value && !page_stats.__isset.max_value); + column_index_.null_pages.push_back(true); + DCHECK_EQ(page_stats.null_count, num_values_); + } + int64_t new_memory_allocation = min_val.capacity() + max_val.capacity(); + if (UNLIKELY(!table_sink_mem_tracker_->TryConsume(new_memory_allocation))) { + return table_sink_mem_tracker_->MemLimitExceeded(parent_->state_, + "Failed to allocate memory for Parquet page index.", new_memory_allocation); + } + page_index_memory_consumption_ += new_memory_allocation; + column_index_.min_values.emplace_back(std::move(min_val)); + column_index_.max_values.emplace_back(std::move(max_val)); + column_index_.null_counts.push_back(page_stats.null_count); + } // Update row group statistics from page statistics. 
DCHECK(row_group_stats_base_ != nullptr); @@ -1160,7 +1137,6 @@ Status HdfsParquetTableWriter::Finalize() { RETURN_IF_ERROR(FlushCurrentRowGroup()); RETURN_IF_ERROR(WritePageIndex()); - for (auto& column : columns_) column->Reset(); RETURN_IF_ERROR(WriteFileFooter()); stats_.__set_parquet_stats(parquet_insert_stats_); COUNTER_ADD(parent_->rows_inserted_counter(), row_count_); @@ -1273,8 +1249,6 @@ Status HdfsParquetTableWriter::FlushCurrentRowGroup() { } Status HdfsParquetTableWriter::WritePageIndex() { - if (!FLAGS_enable_parquet_page_index_writing_debug_only) return Status::OK(); - // Currently Impala only write Parquet files with a single row group. The current // page index logic depends on this behavior as it only keeps one row group's // statistics in memory. @@ -1310,6 +1284,8 @@ Status HdfsParquetTableWriter::WritePageIndex() { row_group->columns[i].__set_offset_index_length(len); file_pos_ += len; } + // Reset column writers. + for (auto& column : columns_) column->Reset(); return Status::OK(); } http://git-wip-us.apache.org/repos/asf/impala/blob/af76186e/testdata/workloads/functional-query/queries/QueryTest/stats-extrapolation.test ---------------------------------------------------------------------- diff --git a/testdata/workloads/functional-query/queries/QueryTest/stats-extrapolation.test b/testdata/workloads/functional-query/queries/QueryTest/stats-extrapolation.test index 3b25427..8e95168 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/stats-extrapolation.test +++ b/testdata/workloads/functional-query/queries/QueryTest/stats-extrapolation.test @@ -33,17 +33,17 @@ show table stats alltypes YEAR, MONTH, #ROWS, EXTRAP #ROWS, #FILES, SIZE, BYTES CACHED, CACHE REPLICATION, FORMAT, INCREMENTAL STATS, LOCATION ---- RESULTS '2009','1',-1,308,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=1' -'2009','2',-1,288,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=2' -'2009','3',-1,308,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=3' +'2009','2',-1,289,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=2' +'2009','3',-1,307,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=3' '2009','4',-1,302,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=4' -'2009','5',-1,308,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=5' +'2009','5',-1,307,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=5' '2009','6',-1,302,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=6' -'2009','7',-1,308,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=7' -'2009','8',-1,308,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=8' +'2009','7',-1,307,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=7' +'2009','8',-1,307,1,regex:.*B,'NOT CACHED','NOT 
CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=8' '2009','9',-1,302,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=9' -'2009','10',-1,308,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=10' +'2009','10',-1,307,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=10' '2009','11',-1,302,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=11' -'2009','12',-1,308,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=12' +'2009','12',-1,307,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=12' 'Total','',3650,3650,12,regex:.*B,'0B','','','','' ---- TYPES STRING,STRING,BIGINT,BIGINT,BIGINT,STRING,STRING,STRING,STRING,STRING,STRING http://git-wip-us.apache.org/repos/asf/impala/blob/af76186e/tests/custom_cluster/test_parquet_page_index.py ---------------------------------------------------------------------- diff --git a/tests/custom_cluster/test_parquet_page_index.py b/tests/custom_cluster/test_parquet_page_index.py deleted file mode 100644 index 0d2a750..0000000 --- a/tests/custom_cluster/test_parquet_page_index.py +++ /dev/null @@ -1,371 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -# Targeted Impala insert tests - -import os - -from collections import namedtuple -from subprocess import check_call -from parquet.ttypes import BoundaryOrder, ColumnIndex, OffsetIndex, PageHeader, PageType - -from tests.common.custom_cluster_test_suite import CustomClusterTestSuite -from tests.common.skip import SkipIfLocal -from tests.util.filesystem_utils import get_fs_path -from tests.util.get_parquet_metadata import ( - decode_stats_value, - get_parquet_metadata, - read_serialized_object -) - -PAGE_INDEX_MAX_STRING_LENGTH = 64 - - [email protected]_file_size -class TestHdfsParquetTableIndexWriter(CustomClusterTestSuite): - """Since PARQUET-922 page statistics can be written before the footer. - The tests in this class checks if Impala writes the page indices correctly. - It is temporarily a custom cluster test suite because we need to set the - enable_parquet_page_index_writing command-line flag for the Impala daemon - in order to make it write the page index. - TODO: IMPALA-5843 Once Impala is able to read the page index and also write it by - default, this test suite should be moved back to query tests. 
- """ - @classmethod - def get_workload(cls): - return 'functional-query' - - @classmethod - def add_test_dimensions(cls): - super(CustomClusterTestSuite, cls).add_test_dimensions() - cls.ImpalaTestMatrix.add_constraint( - lambda v: v.get_value('table_format').file_format == 'parquet') - - def _get_row_group_from_file(self, parquet_file): - """Returns namedtuples that contain the schema, stats, offset_index, column_index, - and page_headers for each column in the first row group in file 'parquet_file'. Fails - if the file contains multiple row groups. - """ - ColumnInfo = namedtuple('ColumnInfo', ['schema', 'stats', 'offset_index', - 'column_index', 'page_headers']) - - file_meta_data = get_parquet_metadata(parquet_file) - assert len(file_meta_data.row_groups) == 1 - # We only support flat schemas, the additional element is the root element. - schemas = file_meta_data.schema[1:] - row_group = file_meta_data.row_groups[0] - assert len(schemas) == len(row_group.columns) - row_group_index = [] - with open(parquet_file) as file_handle: - for column, schema in zip(row_group.columns, schemas): - column_index_offset = column.column_index_offset - column_index_length = column.column_index_length - column_index = None - if column_index_offset and column_index_length: - column_index = read_serialized_object(ColumnIndex, file_handle, - column_index_offset, column_index_length) - column_meta_data = column.meta_data - stats = None - if column_meta_data: - stats = column_meta_data.statistics - - offset_index_offset = column.offset_index_offset - offset_index_length = column.offset_index_length - offset_index = None - page_headers = [] - if offset_index_offset and offset_index_length: - offset_index = read_serialized_object(OffsetIndex, file_handle, - offset_index_offset, offset_index_length) - for page_loc in offset_index.page_locations: - page_header = read_serialized_object(PageHeader, file_handle, page_loc.offset, - page_loc.compressed_page_size) - page_headers.append(page_header) - - column_info = ColumnInfo(schema, stats, offset_index, column_index, page_headers) - row_group_index.append(column_info) - return row_group_index - - def _get_row_groups_from_hdfs_folder(self, hdfs_path, tmpdir): - """Returns a list of column infos (containing the schema, stats, offset_index, - column_index, and page_headers) for the first row group in all parquet files in - 'hdfs_path'. 
- """ - row_group_indexes = [] - check_call(['hdfs', 'dfs', '-get', hdfs_path, tmpdir.strpath]) - for root, subdirs, files in os.walk(tmpdir.strpath): - for f in files: - parquet_file = os.path.join(root, str(f)) - row_group_indexes.append(self._get_row_group_from_file(parquet_file)) - return row_group_indexes - - def _validate_page_locations(self, page_locations): - """Validate that the page locations are in order.""" - for previous_loc, current_loc in zip(page_locations[:-1], page_locations[1:]): - assert previous_loc.offset < current_loc.offset - assert previous_loc.first_row_index < current_loc.first_row_index - - def _validate_null_stats(self, index_size, column_info): - """Validates the statistics stored in null_pages and null_counts.""" - column_index = column_info.column_index - column_stats = column_info.stats - assert column_index.null_pages is not None - assert len(column_index.null_pages) == index_size - assert column_index.null_counts is not None - assert len(column_index.null_counts) == index_size - - for page_is_null, null_count, page_header in zip(column_index.null_pages, - column_index.null_counts, column_info.page_headers): - assert page_header.type == PageType.DATA_PAGE - num_values = page_header.data_page_header.num_values - assert not page_is_null or null_count == num_values - - if column_stats: - assert column_stats.null_count == sum(column_index.null_counts) - - def _validate_min_max_values(self, index_size, column_info): - """Validate min/max values of the pages in a column chunk.""" - column_index = column_info.column_index - min_values = column_info.column_index.min_values - assert len(min_values) == index_size - max_values = column_info.column_index.max_values - assert len(max_values) == index_size - - if not column_info.stats: - return - - column_min_value_str = column_info.stats.min_value - column_max_value_str = column_info.stats.max_value - if column_min_value_str is None or column_max_value_str is None: - # If either is None, then both need to be None. - assert column_min_value_str is None and column_max_value_str is None - # No min and max value, all pages need to be null - for idx, null_page in enumerate(column_index.null_pages): - assert null_page, "Page {} of column {} is not null, \ - but doesn't have min and max values!".format(idx, column_index.schema.name) - # Everything is None, no further checks needed. - return - - column_min_value = decode_stats_value(column_info.schema, column_min_value_str) - for null_page, page_min_str in zip(column_index.null_pages, min_values): - if not null_page: - page_min_value = decode_stats_value(column_info.schema, page_min_str) - # If type is str, page_min_value might have been truncated. - if isinstance(page_min_value, basestring): - assert page_min_value >= column_min_value[:len(page_min_value)] - else: - assert page_min_value >= column_min_value - - column_max_value = decode_stats_value(column_info.schema, column_max_value_str) - for null_page, page_max_str in zip(column_index.null_pages, max_values): - if not null_page: - page_max_value = decode_stats_value(column_info.schema, page_max_str) - # If type is str, page_max_value might have been truncated and incremented. 
- if (isinstance(page_max_value, basestring) and - len(page_max_value) == PAGE_INDEX_MAX_STRING_LENGTH): - max_val_prefix = page_max_value.rstrip('\0') - assert max_val_prefix[:-1] <= column_max_value - else: - assert page_max_value <= column_max_value - - def _validate_ordering(self, ordering, schema, null_pages, min_values, max_values): - """Check if the ordering of the values reflects the value of 'ordering'.""" - - def is_sorted(l, reverse=False): - if not reverse: - return all(a <= b for a, b in zip(l, l[1:])) - else: - return all(a >= b for a, b in zip(l, l[1:])) - - # Filter out null pages and decode the actual min/max values. - actual_min_values = [decode_stats_value(schema, min_val) - for min_val, is_null in zip(min_values, null_pages) - if not is_null] - actual_max_values = [decode_stats_value(schema, max_val) - for max_val, is_null in zip(max_values, null_pages) - if not is_null] - - # For ASCENDING and DESCENDING, both min and max values need to be sorted. - if ordering == BoundaryOrder.ASCENDING: - assert is_sorted(actual_min_values) - assert is_sorted(actual_max_values) - elif ordering == BoundaryOrder.DESCENDING: - assert is_sorted(actual_min_values, reverse=True) - assert is_sorted(actual_max_values, reverse=True) - else: - assert ordering == BoundaryOrder.UNORDERED - # For UNORDERED, min and max values cannot be both sorted. - assert not is_sorted(actual_min_values) or not is_sorted(actual_max_values) - assert (not is_sorted(actual_min_values, reverse=True) or - not is_sorted(actual_max_values, reverse=True)) - - def _validate_boundary_order(self, column_info): - """Validate that min/max values are really in the order specified by - boundary order. - """ - column_index = column_info.column_index - self._validate_ordering(column_index.boundary_order, column_info.schema, - column_index.null_pages, column_index.min_values, column_index.max_values) - - def _validate_parquet_page_index(self, hdfs_path, tmpdir): - """Validates that 'hdfs_path' contains exactly one parquet file and that the rowgroup - index in that file is in the valid format. - """ - row_group_indexes = self._get_row_groups_from_hdfs_folder(hdfs_path, tmpdir) - for columns in row_group_indexes: - for column_info in columns: - try: - index_size = len(column_info.offset_index.page_locations) - assert index_size > 0 - self._validate_page_locations(column_info.offset_index.page_locations) - # IMPALA-7304: Impala doesn't write column index for floating-point columns - # until PARQUET-1222 is resolved. - if column_info.schema.type in [4, 5]: - assert column_info.column_index is None - continue - self._validate_null_stats(index_size, column_info) - self._validate_min_max_values(index_size, column_info) - self._validate_boundary_order(column_info) - except AssertionError as e: - e.args += ("Validation failed on column {}.".format(column_info.schema.name),) - raise - - def _ctas_table_and_verify_index(self, vector, unique_database, source_table, - tmpdir, sorting_column=None): - """Copies 'source_table' into a parquet table and makes sure that the index - in the resulting parquet file is valid. - """ - table_name = "test_hdfs_parquet_table_writer" - qualified_table_name = "{0}.{1}".format(unique_database, table_name) - hdfs_path = get_fs_path('/test-warehouse/{0}.db/{1}/'.format(unique_database, - table_name)) - # Setting num_nodes = 1 ensures that the query is executed on the coordinator, - # resulting in a single parquet file being written. 
- vector.get_value('exec_option')['num_nodes'] = 1 - self.execute_query("drop table if exists {0}".format(qualified_table_name)) - if sorting_column is None: - query = ("create table {0} stored as parquet as select * from {1}").format( - qualified_table_name, source_table) - else: - query = ("create table {0} sort by({1}) stored as parquet as select * from {2}" - ).format(qualified_table_name, sorting_column, source_table) - self.execute_query(query, vector.get_value('exec_option')) - self._validate_parquet_page_index(hdfs_path, tmpdir.join(source_table)) - - def _create_string_table_with_values(self, vector, unique_database, table_name, - values_sql): - """Creates a parquet table that has a single string column, then invokes an insert - statement on it with the 'values_sql' parameter. E.g. 'values_sql' is "('asdf')". - It returns the HDFS path for the table. - """ - qualified_table_name = "{0}.{1}".format(unique_database, table_name) - self.execute_query("drop table if exists {0}".format(qualified_table_name)) - vector.get_value('exec_option')['num_nodes'] = 1 - query = ("create table {0} (str string) stored as parquet").format( - qualified_table_name) - self.execute_query(query, vector.get_value('exec_option')) - self.execute_query("insert into {0} values {1}".format(qualified_table_name, - values_sql), vector.get_value('exec_option')) - return get_fs_path('/test-warehouse/{0}.db/{1}/'.format(unique_database, - table_name)) - - @CustomClusterTestSuite.with_args("--enable_parquet_page_index_writing_debug_only") - def test_ctas_tables(self, vector, unique_database, tmpdir): - """Test different Parquet files created via CTAS statements.""" - - # Test that writing a parquet file populates the rowgroup indexes with the correct - # values. - self._ctas_table_and_verify_index(vector, unique_database, "functional.alltypes", - tmpdir) - - # Test that writing a parquet file populates the rowgroup indexes with the correct - # values, using decimal types. - self._ctas_table_and_verify_index(vector, unique_database, "functional.decimal_tbl", - tmpdir) - - # Test that writing a parquet file populates the rowgroup indexes with the correct - # values, using char types. - self._ctas_table_and_verify_index(vector, unique_database, "functional.chars_formats", - tmpdir) - - # Test that we don't write min/max values in the index for null columns. - # Ensure null_count is set for columns with null values. - self._ctas_table_and_verify_index(vector, unique_database, "functional.nulltable", - tmpdir) - - # Test that when a ColumnChunk is written across multiple pages, the index is - # valid. - self._ctas_table_and_verify_index(vector, unique_database, "tpch.customer", - tmpdir) - self._ctas_table_and_verify_index(vector, unique_database, "tpch.orders", - tmpdir) - - # Test that when the schema has a sorting column, the index is valid. - self._ctas_table_and_verify_index(vector, unique_database, - "functional_parquet.zipcode_incomes", tmpdir, "id") - - # Test table with wide row. - self._ctas_table_and_verify_index(vector, unique_database, - "functional_parquet.widerow", tmpdir) - - # Test tables with wide rows and many columns. 
- self._ctas_table_and_verify_index(vector, unique_database, - "functional_parquet.widetable_250_cols", tmpdir) - self._ctas_table_and_verify_index(vector, unique_database, - "functional_parquet.widetable_500_cols", tmpdir) - self._ctas_table_and_verify_index(vector, unique_database, - "functional_parquet.widetable_1000_cols", tmpdir) - - @CustomClusterTestSuite.with_args("--enable_parquet_page_index_writing_debug_only") - def test_max_string_values(self, vector, unique_database, tmpdir): - """Test string values that are all 0xFFs or end with 0xFFs.""" - - # String value is all of 0xFFs but its length is less than PAGE_INDEX_TRUNCATE_LENGTH. - short_tbl = "short_tbl" - short_hdfs_path = self._create_string_table_with_values(vector, unique_database, - short_tbl, "(rpad('', {0}, chr(255)))".format(PAGE_INDEX_MAX_STRING_LENGTH - 1)) - self._validate_parquet_page_index(short_hdfs_path, tmpdir.join(short_tbl)) - - # String value is all of 0xFFs and its length is PAGE_INDEX_TRUNCATE_LENGTH. - fit_tbl = "fit_tbl" - fit_hdfs_path = self._create_string_table_with_values(vector, unique_database, - fit_tbl, "(rpad('', {0}, chr(255)))".format(PAGE_INDEX_MAX_STRING_LENGTH)) - self._validate_parquet_page_index(fit_hdfs_path, tmpdir.join(fit_tbl)) - - # All bytes are 0xFFs and the string is longer then PAGE_INDEX_TRUNCATE_LENGTH, so we - # should not write page statistics. - too_long_tbl = "too_long_tbl" - too_long_hdfs_path = self._create_string_table_with_values(vector, unique_database, - too_long_tbl, "(rpad('', {0}, chr(255)))".format( - PAGE_INDEX_MAX_STRING_LENGTH + 1)) - row_group_indexes = self._get_row_groups_from_hdfs_folder(too_long_hdfs_path, - tmpdir.join(too_long_tbl)) - column = row_group_indexes[0][0] - assert column.column_index is None - # We always write the offset index - assert column.offset_index is not None - - # Test string with value that starts with 'aaa' following with 0xFFs and its length is - # greater than PAGE_INDEX_TRUNCATE_LENGTH. Max value should be 'aab'. - aaa_tbl = "aaa_tbl" - aaa_hdfs_path = self._create_string_table_with_values(vector, unique_database, - aaa_tbl, "(rpad('aaa', {0}, chr(255)))".format(PAGE_INDEX_MAX_STRING_LENGTH + 1)) - row_group_indexes = self._get_row_groups_from_hdfs_folder(aaa_hdfs_path, - tmpdir.join(aaa_tbl)) - column = row_group_indexes[0][0] - assert len(column.column_index.max_values) == 1 - max_value = column.column_index.max_values[0] - assert max_value == 'aab' http://git-wip-us.apache.org/repos/asf/impala/blob/af76186e/tests/query_test/test_parquet_page_index.py ---------------------------------------------------------------------- diff --git a/tests/query_test/test_parquet_page_index.py b/tests/query_test/test_parquet_page_index.py new file mode 100644 index 0000000..6235819 --- /dev/null +++ b/tests/query_test/test_parquet_page_index.py @@ -0,0 +1,372 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. 
See the License for the +# specific language governing permissions and limitations +# under the License. + +# Targeted Impala insert tests + +import os + +from collections import namedtuple +from subprocess import check_call +from parquet.ttypes import BoundaryOrder, ColumnIndex, OffsetIndex, PageHeader, PageType + +from tests.common.impala_test_suite import ImpalaTestSuite +from tests.common.skip import SkipIfLocal +from tests.util.filesystem_utils import get_fs_path +from tests.util.get_parquet_metadata import ( + decode_stats_value, + get_parquet_metadata, + read_serialized_object +) + +PAGE_INDEX_MAX_STRING_LENGTH = 64 + + [email protected]_file_size +class TestHdfsParquetTableIndexWriter(ImpalaTestSuite): + """Since PARQUET-922 page statistics can be written before the footer. + The tests in this class checks if Impala writes the page indices correctly. + """ + @classmethod + def get_workload(cls): + return 'functional-query' + + @classmethod + def add_test_dimensions(cls): + super(TestHdfsParquetTableIndexWriter, cls).add_test_dimensions() + cls.ImpalaTestMatrix.add_constraint( + lambda v: v.get_value('table_format').file_format == 'parquet') + + def _get_row_group_from_file(self, parquet_file): + """Returns namedtuples that contain the schema, stats, offset_index, column_index, + and page_headers for each column in the first row group in file 'parquet_file'. Fails + if the file contains multiple row groups. + """ + ColumnInfo = namedtuple('ColumnInfo', ['schema', 'stats', 'offset_index', + 'column_index', 'page_headers']) + + file_meta_data = get_parquet_metadata(parquet_file) + assert len(file_meta_data.row_groups) == 1 + # We only support flat schemas, the additional element is the root element. + schemas = file_meta_data.schema[1:] + row_group = file_meta_data.row_groups[0] + assert len(schemas) == len(row_group.columns) + row_group_index = [] + with open(parquet_file) as file_handle: + for column, schema in zip(row_group.columns, schemas): + column_index_offset = column.column_index_offset + column_index_length = column.column_index_length + column_index = None + if column_index_offset and column_index_length: + column_index = read_serialized_object(ColumnIndex, file_handle, + column_index_offset, column_index_length) + column_meta_data = column.meta_data + stats = None + if column_meta_data: + stats = column_meta_data.statistics + + offset_index_offset = column.offset_index_offset + offset_index_length = column.offset_index_length + offset_index = None + page_headers = [] + if offset_index_offset and offset_index_length: + offset_index = read_serialized_object(OffsetIndex, file_handle, + offset_index_offset, offset_index_length) + for page_loc in offset_index.page_locations: + page_header = read_serialized_object(PageHeader, file_handle, page_loc.offset, + page_loc.compressed_page_size) + page_headers.append(page_header) + + column_info = ColumnInfo(schema, stats, offset_index, column_index, page_headers) + row_group_index.append(column_info) + return row_group_index + + def _get_row_groups_from_hdfs_folder(self, hdfs_path, tmpdir): + """Returns a list of column infos (containing the schema, stats, offset_index, + column_index, and page_headers) for the first row group in all parquet files in + 'hdfs_path'. 
+ """ + row_group_indexes = [] + check_call(['hdfs', 'dfs', '-get', hdfs_path, tmpdir.strpath]) + for root, subdirs, files in os.walk(tmpdir.strpath): + for f in files: + parquet_file = os.path.join(root, str(f)) + row_group_indexes.append(self._get_row_group_from_file(parquet_file)) + return row_group_indexes + + def _validate_page_locations(self, page_locations): + """Validate that the page locations are in order.""" + for previous_loc, current_loc in zip(page_locations[:-1], page_locations[1:]): + assert previous_loc.offset < current_loc.offset + assert previous_loc.first_row_index < current_loc.first_row_index + + def _validate_null_stats(self, index_size, column_info): + """Validates the statistics stored in null_pages and null_counts.""" + column_index = column_info.column_index + column_stats = column_info.stats + assert column_index.null_pages is not None + assert len(column_index.null_pages) == index_size + assert column_index.null_counts is not None + assert len(column_index.null_counts) == index_size + + for page_is_null, null_count, page_header in zip(column_index.null_pages, + column_index.null_counts, column_info.page_headers): + assert page_header.type == PageType.DATA_PAGE + num_values = page_header.data_page_header.num_values + assert not page_is_null or null_count == num_values + + if column_stats: + assert column_stats.null_count == sum(column_index.null_counts) + + def _validate_min_max_values(self, index_size, column_info): + """Validate min/max values of the pages in a column chunk.""" + column_index = column_info.column_index + min_values = column_info.column_index.min_values + assert len(min_values) == index_size + max_values = column_info.column_index.max_values + assert len(max_values) == index_size + + if not column_info.stats: + return + + column_min_value_str = column_info.stats.min_value + column_max_value_str = column_info.stats.max_value + if column_min_value_str is None or column_max_value_str is None: + # If either is None, then both need to be None. + assert column_min_value_str is None and column_max_value_str is None + # No min and max value, all pages need to be null + for idx, null_page in enumerate(column_index.null_pages): + assert null_page, "Page {} of column {} is not null, \ + but doesn't have min and max values!".format(idx, column_index.schema.name) + # Everything is None, no further checks needed. + return + + column_min_value = decode_stats_value(column_info.schema, column_min_value_str) + for null_page, page_min_str in zip(column_index.null_pages, min_values): + if not null_page: + page_min_value = decode_stats_value(column_info.schema, page_min_str) + # If type is str, page_min_value might have been truncated. + if isinstance(page_min_value, basestring): + assert page_min_value >= column_min_value[:len(page_min_value)] + else: + assert page_min_value >= column_min_value + + column_max_value = decode_stats_value(column_info.schema, column_max_value_str) + for null_page, page_max_str in zip(column_index.null_pages, max_values): + if not null_page: + page_max_value = decode_stats_value(column_info.schema, page_max_str) + # If type is str, page_max_value might have been truncated and incremented. 
+ if (isinstance(page_max_value, basestring) and + len(page_max_value) == PAGE_INDEX_MAX_STRING_LENGTH): + max_val_prefix = page_max_value.rstrip('\0') + assert max_val_prefix[:-1] <= column_max_value + else: + assert page_max_value <= column_max_value + + def _validate_ordering(self, ordering, schema, null_pages, min_values, max_values): + """Check if the ordering of the values reflects the value of 'ordering'.""" + + def is_sorted(l, reverse=False): + if not reverse: + return all(a <= b for a, b in zip(l, l[1:])) + else: + return all(a >= b for a, b in zip(l, l[1:])) + + # Filter out null pages and decode the actual min/max values. + actual_min_values = [decode_stats_value(schema, min_val) + for min_val, is_null in zip(min_values, null_pages) + if not is_null] + actual_max_values = [decode_stats_value(schema, max_val) + for max_val, is_null in zip(max_values, null_pages) + if not is_null] + + # For ASCENDING and DESCENDING, both min and max values need to be sorted. + if ordering == BoundaryOrder.ASCENDING: + assert is_sorted(actual_min_values) + assert is_sorted(actual_max_values) + elif ordering == BoundaryOrder.DESCENDING: + assert is_sorted(actual_min_values, reverse=True) + assert is_sorted(actual_max_values, reverse=True) + else: + assert ordering == BoundaryOrder.UNORDERED + # For UNORDERED, min and max values cannot be both sorted. + assert not is_sorted(actual_min_values) or not is_sorted(actual_max_values) + assert (not is_sorted(actual_min_values, reverse=True) or + not is_sorted(actual_max_values, reverse=True)) + + def _validate_boundary_order(self, column_info): + """Validate that min/max values are really in the order specified by + boundary order. + """ + column_index = column_info.column_index + self._validate_ordering(column_index.boundary_order, column_info.schema, + column_index.null_pages, column_index.min_values, column_index.max_values) + + def _validate_parquet_page_index(self, hdfs_path, tmpdir): + """Validates that 'hdfs_path' contains exactly one parquet file and that the rowgroup + index in that file is in the valid format. + """ + row_group_indexes = self._get_row_groups_from_hdfs_folder(hdfs_path, tmpdir) + for columns in row_group_indexes: + for column_info in columns: + try: + index_size = len(column_info.offset_index.page_locations) + assert index_size > 0 + self._validate_page_locations(column_info.offset_index.page_locations) + # IMPALA-7304: Impala doesn't write column index for floating-point columns + # until PARQUET-1222 is resolved. + if column_info.schema.type in [4, 5]: + assert column_info.column_index is None + continue + self._validate_null_stats(index_size, column_info) + self._validate_min_max_values(index_size, column_info) + self._validate_boundary_order(column_info) + except AssertionError as e: + e.args += ("Validation failed on column {}.".format(column_info.schema.name),) + raise + + def _ctas_table_and_verify_index(self, vector, unique_database, source_table, + tmpdir, sorting_column=None): + """Copies 'source_table' into a parquet table and makes sure that the index + in the resulting parquet file is valid. + """ + table_name = "test_hdfs_parquet_table_writer" + qualified_table_name = "{0}.{1}".format(unique_database, table_name) + hdfs_path = get_fs_path('/test-warehouse/{0}.db/{1}/'.format(unique_database, + table_name)) + # Setting num_nodes = 1 ensures that the query is executed on the coordinator, + # resulting in a single parquet file being written. 
+ vector.get_value('exec_option')['num_nodes'] = 1 + self.execute_query("drop table if exists {0}".format(qualified_table_name)) + if sorting_column is None: + query = ("create table {0} stored as parquet as select * from {1}").format( + qualified_table_name, source_table) + else: + query = ("create table {0} sort by({1}) stored as parquet as select * from {2}" + ).format(qualified_table_name, sorting_column, source_table) + self.execute_query(query, vector.get_value('exec_option')) + self._validate_parquet_page_index(hdfs_path, tmpdir.join(source_table)) + + def _create_string_table_with_values(self, vector, unique_database, table_name, + values_sql): + """Creates a parquet table that has a single string column, then invokes an insert + statement on it with the 'values_sql' parameter. E.g. 'values_sql' is "('asdf')". + It returns the HDFS path for the table. + """ + qualified_table_name = "{0}.{1}".format(unique_database, table_name) + self.execute_query("drop table if exists {0}".format(qualified_table_name)) + vector.get_value('exec_option')['num_nodes'] = 1 + query = ("create table {0} (str string) stored as parquet").format(qualified_table_name) + self.execute_query(query, vector.get_value('exec_option')) + self.execute_query("insert into {0} values {1}".format(qualified_table_name, + values_sql), vector.get_value('exec_option')) + return get_fs_path('/test-warehouse/{0}.db/{1}/'.format(unique_database, + table_name)) + + def test_write_index_alltypes(self, vector, unique_database, tmpdir): + """Test that writing a parquet file populates the rowgroup indexes with the correct + values. + """ + self._ctas_table_and_verify_index(vector, unique_database, "functional.alltypes", + tmpdir) + + def test_write_index_decimals(self, vector, unique_database, tmpdir): + """Test that writing a parquet file populates the rowgroup indexes with the correct + values, using decimal types. + """ + self._ctas_table_and_verify_index(vector, unique_database, "functional.decimal_tbl", + tmpdir) + + def test_write_index_chars(self, vector, unique_database, tmpdir): + """Test that writing a parquet file populates the rowgroup indexes with the correct + values, using char types. + """ + self._ctas_table_and_verify_index(vector, unique_database, "functional.chars_formats", + tmpdir) + + def test_write_index_null(self, vector, unique_database, tmpdir): + """Test that we don't write min/max values in the index for null columns. + Ensure null_count is set for columns with null values. + """ + self._ctas_table_and_verify_index(vector, unique_database, "functional.nulltable", + tmpdir) + + def test_write_index_multi_page(self, vector, unique_database, tmpdir): + """Test that when a ColumnChunk is written across multiple pages, the index is + valid. 
+ """ + self._ctas_table_and_verify_index(vector, unique_database, "tpch.customer", + tmpdir) + self._ctas_table_and_verify_index(vector, unique_database, "tpch.orders", + tmpdir) + + def test_write_index_sorting_column(self, vector, unique_database, tmpdir): + """Test that when the schema has a sorting column, the index is valid.""" + self._ctas_table_and_verify_index(vector, unique_database, + "functional_parquet.zipcode_incomes", tmpdir, "id") + + def test_write_index_wide_table(self, vector, unique_database, tmpdir): + """Test table with wide row.""" + self._ctas_table_and_verify_index(vector, unique_database, + "functional_parquet.widerow", tmpdir) + + def test_write_index_many_columns_tables(self, vector, unique_database, tmpdir): + """Test tables with wide rows and many columns.""" + self._ctas_table_and_verify_index(vector, unique_database, + "functional_parquet.widetable_250_cols", tmpdir) + self._ctas_table_and_verify_index(vector, unique_database, + "functional_parquet.widetable_500_cols", tmpdir) + self._ctas_table_and_verify_index(vector, unique_database, + "functional_parquet.widetable_1000_cols", tmpdir) + + def test_max_string_values(self, vector, unique_database, tmpdir): + """Test string values that are all 0xFFs or end with 0xFFs.""" + + # String value is all of 0xFFs but its length is less than PAGE_INDEX_TRUNCATE_LENGTH. + short_tbl = "short_tbl" + short_hdfs_path = self._create_string_table_with_values(vector, unique_database, + short_tbl, "(rpad('', {0}, chr(255)))".format(PAGE_INDEX_MAX_STRING_LENGTH - 1)) + self._validate_parquet_page_index(short_hdfs_path, tmpdir.join(short_tbl)) + + # String value is all of 0xFFs and its length is PAGE_INDEX_TRUNCATE_LENGTH. + fit_tbl = "fit_tbl" + fit_hdfs_path = self._create_string_table_with_values(vector, unique_database, + fit_tbl, "(rpad('', {0}, chr(255)))".format(PAGE_INDEX_MAX_STRING_LENGTH)) + self._validate_parquet_page_index(fit_hdfs_path, tmpdir.join(fit_tbl)) + + # All bytes are 0xFFs and the string is longer then PAGE_INDEX_TRUNCATE_LENGTH, so we + # should not write page statistics. + too_long_tbl = "too_long_tbl" + too_long_hdfs_path = self._create_string_table_with_values(vector, unique_database, + too_long_tbl, "(rpad('', {0}, chr(255)))".format(PAGE_INDEX_MAX_STRING_LENGTH + 1)) + row_group_indexes = self._get_row_groups_from_hdfs_folder(too_long_hdfs_path, + tmpdir.join(too_long_tbl)) + column = row_group_indexes[0][0] + assert column.column_index is None + # We always write the offset index + assert column.offset_index is not None + + # Test string with value that starts with 'aaa' following with 0xFFs and its length is + # greater than PAGE_INDEX_TRUNCATE_LENGTH. Max value should be 'aab'. + aaa_tbl = "aaa_tbl" + aaa_hdfs_path = self._create_string_table_with_values(vector, unique_database, + aaa_tbl, "(rpad('aaa', {0}, chr(255)))".format(PAGE_INDEX_MAX_STRING_LENGTH + 1)) + row_group_indexes = self._get_row_groups_from_hdfs_folder(aaa_hdfs_path, + tmpdir.join(aaa_tbl)) + column = row_group_indexes[0][0] + assert len(column.column_index.max_values) == 1 + max_value = column.column_index.max_values[0] + assert max_value == 'aab'
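A note for readers tracing the expectations in test_max_string_values: the sketch below is a rough Python model of the upper-bound truncation the writer applies to page-level max values (the TruncateUp() call visible in the hdfs-parquet-table-writer.cc hunks above). It is an illustration under assumptions, not the C++ implementation; only the behaviour asserted by the test is taken from the source.

PAGE_INDEX_MAX_STRING_LENGTH = 64

def truncate_up(value, max_len=PAGE_INDEX_MAX_STRING_LENGTH):
  """Returns a string >= 'value' that is at most 'max_len' bytes, or None when no
  such bound exists (every byte of the truncated prefix is already 0xFF)."""
  if len(value) <= max_len:
    return value
  prefix = value[:max_len]
  # Walk backwards to the last byte that can still be incremented, bump it, and
  # drop everything after it so the result remains an upper bound of 'value'.
  for i in reversed(range(len(prefix))):
    if prefix[i] != '\xff':
      return prefix[:i] + chr(ord(prefix[i]) + 1)
  return None

# Mirrors the test's assertions:
assert truncate_up('aaa' + '\xff' * 62) == 'aab'  # aaa_tbl: max value becomes 'aab'
assert truncate_up('\xff' * 65) is None           # too_long_tbl: no valid upper bound

When truncating the maximum fails (the all-0xFF case), the writer marks the column index invalid, which is why too_long_tbl ends up with an offset index but no column index.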

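One further illustrative note, on the stats-extrapolation.test changes above: the expected EXTRAP #ROWS values move by one row for several partitions (308 to 307, and 288 to 289 for the smallest month) because writing the page index makes each Parquet file slightly larger, and the extrapolated row counts scale with file size. A minimal sketch of that proportionality, assuming the estimator simply multiplies a partition's on-disk size by the rows-per-byte ratio recorded by COMPUTE STATS (the real planner logic is more involved):

def extrapolate_rows(partition_bytes, stats_row_count, stats_total_bytes):
  """Hypothetical helper: scales the row count from COMPUTE STATS by the
  partition's current size relative to the total size recorded with the stats."""
  rows_per_byte = float(stats_row_count) / stats_total_bytes
  return int(round(partition_bytes * rows_per_byte))

Because every file grows by a roughly similar number of index bytes, small partitions gain relatively more weight (their estimate ticks up) while large ones lose a little (theirs ticks down), which matches the pattern of one-row changes in the expected results.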