This is an automated email from the ASF dual-hosted git repository. joemcdonnell pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 13030f840a23e67c5e9923e8b1abab3b717c106a Author: Riza Suminto <[email protected]> AuthorDate: Thu Feb 8 12:14:50 2024 -0800 IMPALA-12796: Add is_footer_only in TFileSplitGeneratorSpec Several tests in test_scanners.py failed with wrong row counts on the S3 target filesystem after IMPALA-12631. The S3 filesystem does not have blocks, so the planner produces TFileSplitGeneratorSpec instead of TScanRangeLocationList, and IMPALA-12631 missed making the necessary changes in TFileSplitGeneratorSpec. Meanwhile, it already changed the behavior of hdfs-parquet-scanner.cc: for each scan range, the new code loops over file_metadata_.row_groups, while the old code just took one entry of file_metadata_.row_groups after calling NextRowGroup(). This patch addresses the issue by adding an is_footer_only field in TFileSplitGeneratorSpec and scheduling accordingly in scheduler.cc. It also adds a field 'is_footer_scanner_' in hdfs-columnar-scanner.h to check that the optimized count-star path is only applied to the footer range. Testing: - Passed core tests with the S3 target filesystem. 
Change-Id: Iaa6e3c14debe68cf601131c6594774c8c695923e Reviewed-on: http://gerrit.cloudera.org:8080/21021 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- be/src/exec/hdfs-columnar-scanner.cc | 5 ++++ be/src/exec/hdfs-columnar-scanner.h | 4 +++ be/src/exec/parquet/hdfs-parquet-scanner.cc | 1 + be/src/scheduling/scheduler-test-util.cc | 1 + be/src/scheduling/scheduler-test-util.h | 10 +++++-- be/src/scheduling/scheduler-test.cc | 21 +++++++++++++++ be/src/scheduling/scheduler.cc | 13 +++++++-- common/thrift/PlanNodes.thrift | 4 +++ .../org/apache/impala/planner/HdfsScanNode.java | 31 +++++++++++----------- 9 files changed, 71 insertions(+), 19 deletions(-) diff --git a/be/src/exec/hdfs-columnar-scanner.cc b/be/src/exec/hdfs-columnar-scanner.cc index 518091612..7a1ad7744 100644 --- a/be/src/exec/hdfs-columnar-scanner.cc +++ b/be/src/exec/hdfs-columnar-scanner.cc @@ -81,6 +81,11 @@ HdfsColumnarScanner::~HdfsColumnarScanner() {} Status HdfsColumnarScanner::Open(ScannerContext* context) { RETURN_IF_ERROR(HdfsScanner::Open(context)); + // Memorize 'is_footer_scanner_' here since 'stream_' can be released early. + const io::ScanRange* range = stream_->scan_range(); + is_footer_scanner_ = + range->offset() + range->bytes_to_read() >= stream_->file_desc()->file_length; + RuntimeProfile* profile = scan_node_->runtime_profile(); num_cols_counter_ = PROFILE_NumColumns.Instantiate(profile); num_scanners_with_no_reads_counter_ = diff --git a/be/src/exec/hdfs-columnar-scanner.h b/be/src/exec/hdfs-columnar-scanner.h index a1d0b2903..00276d5be 100644 --- a/be/src/exec/hdfs-columnar-scanner.h +++ b/be/src/exec/hdfs-columnar-scanner.h @@ -60,6 +60,10 @@ class HdfsColumnarScanner : public HdfsScanner { /// top-level tuples. See AssembleRows() in the derived classes. boost::scoped_ptr<ScratchTupleBatch> scratch_batch_; + /// Indicate whether this is a footer scanner or not. + /// Assigned in HdfsColumnarScanner::Open(). 
+ bool is_footer_scanner_ = false; + /// Scan range for the metadata. const io::ScanRange* metadata_range_ = nullptr; diff --git a/be/src/exec/parquet/hdfs-parquet-scanner.cc b/be/src/exec/parquet/hdfs-parquet-scanner.cc index d69e985d4..6d41dc147 100644 --- a/be/src/exec/parquet/hdfs-parquet-scanner.cc +++ b/be/src/exec/parquet/hdfs-parquet-scanner.cc @@ -435,6 +435,7 @@ Status HdfsParquetScanner::GetNextInternal(RowBatch* row_batch) { DCHECK(parse_status_.ok()) << parse_status_.GetDetail(); if (scan_node_->optimize_parquet_count_star()) { // Populate the single slot with the Parquet num rows statistic. + DCHECK(is_footer_scanner_); int64_t tuple_buf_size; uint8_t* tuple_buf; int capacity = 1; diff --git a/be/src/scheduling/scheduler-test-util.cc b/be/src/scheduling/scheduler-test-util.cc index 14e7eb620..97c32e724 100644 --- a/be/src/scheduling/scheduler-test-util.cc +++ b/be/src/scheduling/scheduler-test-util.cc @@ -411,6 +411,7 @@ void Plan::BuildScanRangeSpec(const TableName& table_name, thrift_spec->__set_file_desc(thrift_file); thrift_spec->__set_max_block_size(spec.block_size); thrift_spec->__set_is_splittable(spec.is_splittable); + thrift_spec->__set_is_footer_only(spec.is_footer_only); int32_t partition_path_hash = static_cast<int32_t>(HashUtil::Hash(partition_path.data(), partition_path.length(), 0)); thrift_spec->__set_partition_path_hash(partition_path_hash); diff --git a/be/src/scheduling/scheduler-test-util.h b/be/src/scheduling/scheduler-test-util.h index 2b6cab078..7696e2c2b 100644 --- a/be/src/scheduling/scheduler-test-util.h +++ b/be/src/scheduling/scheduler-test-util.h @@ -202,8 +202,12 @@ struct Block { struct FileSplitGeneratorSpec { FileSplitGeneratorSpec() {} - FileSplitGeneratorSpec(int64_t length, int64_t block, bool splittable) - : length(length), block_size(block), is_splittable(splittable) {} + FileSplitGeneratorSpec( + int64_t length, int64_t block, bool splittable, bool is_footer_only = false) + : length(length), + block_size(block), 
+ is_splittable(splittable), + is_footer_only(is_footer_only) {} /// Length of file for which to generate file splits. int64_t length = DEFAULT_FILE_SIZE; @@ -213,6 +217,8 @@ struct FileSplitGeneratorSpec { bool is_splittable = true; + bool is_footer_only = false; + static const int64_t DEFAULT_FILE_SIZE; static const int64_t DEFAULT_BLOCK_SIZE; }; diff --git a/be/src/scheduling/scheduler-test.cc b/be/src/scheduling/scheduler-test.cc index 3a132a784..b1ee871df 100644 --- a/be/src/scheduling/scheduler-test.cc +++ b/be/src/scheduling/scheduler-test.cc @@ -642,6 +642,27 @@ TEST_F(SchedulerTest, TestGeneratedVariableSizeSplit) { EXPECT_EQ(300, result.NumTotalAssignedBytes()); } +TEST_F(SchedulerTest, TestGeneratedVariableSizeSplitFooterOnly) { + Cluster cluster; + + cluster.AddHosts(3, true, true); + + Schema schema(cluster); + schema.AddFileSplitGeneratorSpecs( + "T", {{100, 100, false, false}, {100, 1, true, true}, {100, 10, true, true}}); + + Plan plan(schema); + plan.AddTableScan("T"); + plan.SetRandomReplica(true); + + Result result(plan); + SchedulerWrapper scheduler(plan); + ASSERT_OK(scheduler.Compute(&result)); + + EXPECT_EQ(3, result.NumTotalAssignments()); + EXPECT_EQ(111, result.NumTotalAssignedBytes()); +} + TEST_F(SchedulerTest, TestBlockAndGenerateSplit) { Cluster cluster; diff --git a/be/src/scheduling/scheduler.cc b/be/src/scheduling/scheduler.cc index 7f16a4058..287407389 100644 --- a/be/src/scheduling/scheduler.cc +++ b/be/src/scheduling/scheduler.cc @@ -121,8 +121,17 @@ Status Scheduler::GenerateScanRanges(const vector<TFileSplitGeneratorSpec>& spec long scan_range_offset = 0; long remaining = fb_desc->length(); - long scan_range_length = std::min(spec.max_block_size, fb_desc->length()); - if (!spec.is_splittable) scan_range_length = fb_desc->length(); + long scan_range_length = fb_desc->length(); + + if (spec.is_splittable) { + scan_range_length = std::min(spec.max_block_size, fb_desc->length()); + if (spec.is_footer_only) { + scan_range_offset = 
fb_desc->length() - scan_range_length; + remaining = scan_range_length; + } + } else { + DCHECK(!spec.is_footer_only); + } while (remaining > 0) { THdfsFileSplit hdfs_scan_range; diff --git a/common/thrift/PlanNodes.thrift b/common/thrift/PlanNodes.thrift index c957a5af7..cfee32a5a 100644 --- a/common/thrift/PlanNodes.thrift +++ b/common/thrift/PlanNodes.thrift @@ -271,6 +271,10 @@ struct TFileSplitGeneratorSpec { // Hash of the partition path 5: required i32 partition_path_hash + + // True if only footer range (the last block in file) is needed. + // If True, is_splittable must also be True as well. + 6: required bool is_footer_only } // Specification of an individual data range which is held in its entirety diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java index ed926adef..60e4abc04 100644 --- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java +++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java @@ -1253,23 +1253,24 @@ public class HdfsScanNode extends ScanNode { totalBytesPerFsEC_.merge(fsType, fileDesc.getFileLength(), Long::sum); } + // If parquet count star optimization is enabled, we only need the + // 'RowGroup.num_rows' in file metadata, thus only the scan range that contains + // a file footer is required. + // IMPALA-8834 introduced the optimization for partition key scan by generating + // one scan range for each HDFS file. With Parquet and ORC, we only need to get + // the scan range that contains a file footer for short-circuiting. 
+ boolean isFooterOnly = countStarSlot_ != null + || (isPartitionKeyScan_ + && (partition.getFileFormat().isParquetBased() + || partition.getFileFormat() == HdfsFileFormat.ORC)); + if (!fsHasBlocks) { Preconditions.checkState(fileDesc.getNumFileBlocks() == 0); generateScanRangeSpecs( - partition, partitionLocation, fileDesc, scanRangeBytesLimit); + partition, partitionLocation, fileDesc, scanRangeBytesLimit, isFooterOnly); } else { // Skips files that have no associated blocks. if (fileDesc.getNumFileBlocks() == 0) continue; - // If parquet count star optimization is enabled, we only need the - // 'RowGroup.num_rows' in file metadata, thus only the scan range that contains - // a file footer is required. - // IMPALA-8834 introduced the optimization for partition key scan by generating - // one scan range for each HDFS file. With Parquet and ORC, we only need to get - // the scan range that contains a file footer for short-circuiting. - boolean isFooterOnly = countStarSlot_ != null - || (isPartitionKeyScan_ - && (partition.getFileFormat().isParquetBased() - || partition.getFileFormat() == HdfsFileFormat.ORC)); Pair<Boolean, Long> result = transformBlocksToScanRanges(partition, partitionLocation, fsType, fileDesc, fsHasBlocks, scanRangeBytesLimit, analyzer, isFooterOnly); @@ -1371,17 +1372,17 @@ public class HdfsScanNode extends ScanNode { * FeFsPartition can be expensive. */ private void generateScanRangeSpecs(FeFsPartition partition, String partitionLocation, - FileDescriptor fileDesc, long maxBlockSize) { + FileDescriptor fileDesc, long maxBlockSize, boolean isFooterOnly) { Preconditions.checkArgument(fileDesc.getNumFileBlocks() == 0); Preconditions.checkArgument(maxBlockSize > 0); if (fileDesc.getFileLength() <= 0) return; boolean splittable = partition.getFileFormat().isSplittable( HdfsCompression.fromFileName(fileDesc.getPath())); + isFooterOnly &= splittable; // Hashing must use String.hashCode() for consistency. 
int partitionHash = partitionLocation.hashCode(); - TFileSplitGeneratorSpec splitSpec = new TFileSplitGeneratorSpec( - fileDesc.toThrift(), maxBlockSize, splittable, partition.getId(), - partitionHash); + TFileSplitGeneratorSpec splitSpec = new TFileSplitGeneratorSpec(fileDesc.toThrift(), + maxBlockSize, splittable, partition.getId(), partitionHash, isFooterOnly); scanRangeSpecs_.addToSplit_specs(splitSpec); long scanRangeBytes = Math.min(maxBlockSize, fileDesc.getFileLength()); if (splittable && !isPartitionKeyScan_) {
