This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch branch-4.1.2
in repository https://gitbox.apache.org/repos/asf/impala.git
commit 794eb1ba4a6d459379dee91c4274be3f40bd16ac
Author: zhangyifan27 <[email protected]>
AuthorDate: Fri Feb 3 16:43:08 2023 +0800

    IMPALA-11081: Fix incorrect results in partition key scan

    This patch fixes incorrect results caused by short-circuit partition key
    scans when a Parquet/ORC file spans multiple blocks. IMPALA-8834
    introduced an optimization that generates only one scan range per file,
    corresponding to the first block. For file-metadata-only queries,
    backends only issue footer ranges for Parquet/ORC files (see
    HdfsScanner::IssueFooterRanges()), which leads to incorrect results if
    the first block doesn't include the file footer. The bug is fixed by
    returning a scan range corresponding to the last block for Parquet/ORC
    files, which is guaranteed to contain the file footer.

    Testing:
    - Added e2e tests to verify the fix.

    Backport Notes:
    - Trivial conflicts in HdfsScanNode.java and test_partition_metadata.py

    Change-Id: I17331ed6c26a747e0509dcbaf427cd52808943b1
    Reviewed-on: http://gerrit.cloudera.org:8080/19471
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 .../org/apache/impala/planner/HdfsScanNode.java | 10 ++++-
 tests/common/test_dimensions.py                 | 15 ++++++++
 tests/metadata/test_partition_metadata.py       |  9 +----
 tests/query_test/test_queries.py                | 45 +++++++++++++++++++++-
 4 files changed, 70 insertions(+), 9 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
index 78577769d..265ed00a8 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
@@ -1324,7 +1324,15 @@ public class HdfsScanNode extends ScanNode {
     Preconditions.checkArgument(fileDesc.getNumFileBlocks() > 0);
     boolean fileDescMissingDiskIds = false;
     long fileMaxScanRangeBytes = 0;
-    for (int i = 0; i < fileDesc.getNumFileBlocks(); ++i) {
+    int i = 0;
+    if (isPartitionKeyScan_ && (partition.getFileFormat().isParquetBased()
+        || partition.getFileFormat() == HdfsFileFormat.ORC)) {
+      // IMPALA-8834 introduced the optimization for partition key scans by generating
+      // one scan range for each HDFS file. With Parquet and ORC, we start with the
+      // last block to get a scan range that contains the file footer for short-circuiting.
+      i = fileDesc.getNumFileBlocks() - 1;
+    }
+    for (; i < fileDesc.getNumFileBlocks(); ++i) {
       FbFileBlock block = fileDesc.getFbFileBlock(i);
       int replicaHostCount = FileBlock.getNumReplicaHosts(block);
       if (replicaHostCount == 0) {
diff --git a/tests/common/test_dimensions.py b/tests/common/test_dimensions.py
index 5a5788aac..48af54227 100644
--- a/tests/common/test_dimensions.py
+++ b/tests/common/test_dimensions.py
@@ -27,6 +27,21 @@ from tests.util.filesystem_utils import (
 
 WORKLOAD_DIR = os.environ['IMPALA_WORKLOAD_DIR']
 
+# Map from the test dimension file_format string to the SQL "STORED AS" or "STORED BY"
+# argument.
+FILE_FORMAT_TO_STORED_AS_MAP = {
+  'text': 'TEXTFILE',
+  'seq': 'SEQUENCEFILE',
+  'rc': 'RCFILE',
+  'orc': 'ORC',
+  'parquet': 'PARQUET',
+  'hudiparquet': 'HUDIPARQUET',
+  'avro': 'AVRO',
+  'hbase': "'org.apache.hadoop.hive.hbase.HBaseStorageHandler'",
+  'kudu': "KUDU",
+  'iceberg': "ICEBERG"
+}
+
 # Describes the configuration used to execute a single tests. Contains both the details
 # of what specific table format to target along with the exec options (num_nodes, etc)
 # to use when running the query.
diff --git a/tests/metadata/test_partition_metadata.py b/tests/metadata/test_partition_metadata.py
index 363014761..e86751084 100644
--- a/tests/metadata/test_partition_metadata.py
+++ b/tests/metadata/test_partition_metadata.py
@@ -20,14 +20,9 @@ from tests.common.impala_test_suite import ImpalaTestSuite
 from tests.common.skip import (SkipIfS3, SkipIfABFS, SkipIfADLS, SkipIfIsilon,
     SkipIfGCS, SkipIfCOS, SkipIfLocal)
 from tests.common.test_dimensions import (create_single_exec_option_dimension,
-    create_uncompressed_text_dimension)
+    create_uncompressed_text_dimension, FILE_FORMAT_TO_STORED_AS_MAP)
 from tests.util.filesystem_utils import get_fs_path, WAREHOUSE, FILESYSTEM_PREFIX
 
-# Map from the test dimension file_format string to the SQL "STORED AS"
-# argument.
-STORED_AS_ARGS = { 'text': 'textfile', 'parquet': 'parquet', 'avro': 'avro',
-  'seq': 'sequencefile' }
-
 # Tests specific to partition metadata.
 # TODO: Split up the DDL tests and move some of the partition-specific tests
 # here.
@@ -60,7 +55,7 @@ class TestPartitionMetadata(ImpalaTestSuite):
     # Create the table
     self.client.execute(
         "create table %s (i int) partitioned by(j int) stored as %s location '%s'"
-        % (FQ_TBL_NAME, STORED_AS_ARGS[file_format], TBL_LOCATION))
+        % (FQ_TBL_NAME, FILE_FORMAT_TO_STORED_AS_MAP[file_format], TBL_LOCATION))
 
     # Point both partitions to the same location.
     self.client.execute("alter table %s add partition (j=1) location '%s/p'"
diff --git a/tests/query_test/test_queries.py b/tests/query_test/test_queries.py
index afd7354e9..0a907046f 100644
--- a/tests/query_test/test_queries.py
+++ b/tests/query_test/test_queries.py
@@ -26,7 +26,9 @@ from tests.common.skip import SkipIfEC, SkipIfCatalogV2, SkipIfNotHdfsMinicluste
 from tests.common.test_dimensions import (
     create_uncompressed_text_dimension, create_exec_option_dimension_from_dict,
     create_client_protocol_dimension, hs2_parquet_constraint,
-    extend_exec_option_dimension)
+    extend_exec_option_dimension, FILE_FORMAT_TO_STORED_AS_MAP)
+from tests.util.filesystem_utils import get_fs_path
+from subprocess import check_call
 
 
 class TestQueries(ImpalaTestSuite):
@@ -329,6 +331,47 @@ class TestPartitionKeyScans(ImpalaTestSuite):
   def test_partition_key_scans_with_joins(self, vector):
     self.run_test_case('QueryTest/partition-key-scans-with-joins', vector)
 
+
+class TestPartitionKeyScansWithMultipleBlocks(ImpalaTestSuite):
+  """Tests for queries that exercise partition key scan optimisation with data files
+  that contain multiple blocks."""
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestPartitionKeyScansWithMultipleBlocks, cls).add_test_dimensions()
+    cls.ImpalaTestMatrix.add_constraint(lambda v:
+        v.get_value('table_format').file_format not in ('kudu', 'hbase'))
+
+  @classmethod
+  def get_workload(cls):
+    return 'functional-query'
+
+  def _build_alltypes_multiblocks_table(self, vector, unique_database):
+    file_format = vector.get_value('table_format').file_format
+    db_suffix = vector.get_value('table_format').db_suffix()
+    src_tbl_name = 'functional' + db_suffix + '.alltypes'
+    src_tbl_loc = self._get_table_location(src_tbl_name, vector)
+    source_file = src_tbl_loc + '/year=2010/month=12/*'
+    tbl_loc = get_fs_path("/test-warehouse/%s.db/alltypes_multiblocks"
+        % (unique_database))
+    file_path = tbl_loc + "/year=2010/month=12"
+
+    check_call(['hdfs', 'dfs', '-mkdir', '-p', file_path])
+    self.client.execute("""create table if not exists %s.alltypes_multiblocks
+        like functional.alltypes stored as %s location '%s';"""
+        % (unique_database, FILE_FORMAT_TO_STORED_AS_MAP[file_format], tbl_loc))
+
+    # set block size to 1024 so the target file occupies multiple blocks
+    check_call(['hdfs', 'dfs', '-Ddfs.block.size=1024', '-cp', '-f', '-d',
+        source_file, file_path])
+    self.client.execute("alter table %s.alltypes_multiblocks recover partitions"
+        % (unique_database))
+
+  def test_partition_key_scans_with_multiple_blocks_table(self, vector, unique_database):
+    self._build_alltypes_multiblocks_table(vector, unique_database)
+    result = self.execute_query_expect_success(self.client,
+        "SELECT max(year) FROM %s.alltypes_multiblocks" % (unique_database))
+    assert int(result.get_data()) == 2010
+
 class TestTopNReclaimQuery(ImpalaTestSuite):
   """Test class to validate that TopN periodically reclaims tuple pool memory
   and runs with a lower memory footprint."""
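
For readers following the planner change without applying the patch, the following is a
minimal, self-contained Java sketch of the idea behind the fix. It is not Impala code:
the Block record and the splitIntoBlocks()/footerBlockIndex() helpers are hypothetical
stand-ins for Impala's FbFileBlock/HdfsScanNode machinery. The point it illustrates is
that the Parquet/ORC footer lives at the end of the file, so a metadata-only scan range
built from the first block can miss it, while one built from the last block always
covers it.

import java.util.ArrayList;
import java.util.List;

// Sketch only (not Impala code): why a footer-only scan range for a Parquet/ORC
// file must be built from the last HDFS block rather than the first one.
public class FooterBlockSketch {
  // A block is just an [offset, offset + length) byte range of the file.
  record Block(long offset, long length) {}

  // Split a file of 'fileLen' bytes into HDFS-style blocks of 'blockSize' bytes.
  static List<Block> splitIntoBlocks(long fileLen, long blockSize) {
    List<Block> blocks = new ArrayList<>();
    for (long off = 0; off < fileLen; off += blockSize) {
      blocks.add(new Block(off, Math.min(blockSize, fileLen - off)));
    }
    return blocks;
  }

  // Index of the block containing the footer, i.e. the last bytes of the file.
  static int footerBlockIndex(List<Block> blocks, long fileLen) {
    long lastByte = fileLen - 1;  // the footer always ends at the last byte
    for (int i = 0; i < blocks.size(); ++i) {
      Block b = blocks.get(i);
      if (lastByte >= b.offset() && lastByte < b.offset() + b.length()) return i;
    }
    throw new IllegalStateException("file has no blocks");
  }

  public static void main(String[] args) {
    long fileLen = 10_000;   // a small data file
    long blockSize = 1024;   // tiny block size, like the new e2e test uses
    List<Block> blocks = splitIntoBlocks(fileLen, blockSize);
    int idx = footerBlockIndex(blocks, fileLen);
    // With 10 blocks, the footer lives in block 9; a scan range built from
    // block 0 would never see it, which is exactly the bug being fixed.
    System.out.println("blocks=" + blocks.size() + " footerBlock=" + idx
        + " lastBlock=" + (blocks.size() - 1));
  }
}

This mirrors why the patched loop in HdfsScanNode.java starts at
fileDesc.getNumFileBlocks() - 1 for Parquet/ORC partition key scans: only a scan range
derived from the last block is guaranteed to include the footer that
HdfsScanner::IssueFooterRanges() relies on.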
