This is an automated email from the ASF dual-hosted git repository.
michaelsmith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new e4fa1b8c8 IMPALA-12847: Expose computeScanRangeLocations and
computeStats
e4fa1b8c8 is described below
commit e4fa1b8c8f4f2935d1c630628e39aefbbfd2a3c3
Author: Riza Suminto <[email protected]>
AuthorDate: Mon Feb 26 09:44:47 2024 -0800
IMPALA-12847: Expose computeScanRangeLocations and computeStats
After IMPALA-12631, HdfsScanNode.computeScanRangeLocations() needs to check
whether countStarSlot_ is null in order to decide whether to schedule only
the footer range. HdfsScanNode.computeScanRangeLocations() is called as part
of the HdfsScanNode.init() call.
An external frontend that has its own count-star slot analysis and
initialization will need to recompute the scan range assignment and stats
after HdfsScanNode.init(). Therefore, computeScanRangeLocations() and
computeStats() should be made idempotent after init() and exposed to
subclasses.
This patch decouples countStarSlot_ initialization from
computeScanRangeLocations() and raises the access level of
computeScanRangeLocations() from private to protected.
Testing:
- Pass core tests.
Change-Id: Ia621309c67455bb599f71bec9efc1f67fc085022
Reviewed-on: http://gerrit.cloudera.org:8080/21077
Reviewed-by: Impala Public Jenkins <[email protected]>
Tested-by: Impala Public Jenkins <[email protected]>
---
.../org/apache/impala/planner/HdfsScanNode.java | 134 +++++++++++++--------
1 file changed, 84 insertions(+), 50 deletions(-)
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
index 6dc5590f9..82aea8319 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
@@ -212,7 +212,13 @@ public class HdfsScanNode extends ScanNode {
private final FeFsTable tbl_;
// List of partitions to be scanned. Partitions have been pruned.
- protected final List<? extends FeFsPartition> partitions_;
+ protected final List<FeFsPartition> partitions_;
+
+ // List of partitions that have been reduced through sampling.
+ // Only initialized in checkSamplingAndCountStar() if table sampling is used.
+ // Accessors must fall back to partitions_ if sampledPartitions_ stays null after
+ // checkSamplingAndCountStar().
+ protected List<FeFsPartition> sampledPartitions_ = null;
// Parameters for table sampling. Null if not sampling.
private final TableSampleClause sampleParams_;
@@ -233,8 +239,10 @@ public class HdfsScanNode extends ScanNode {
private Map<FileSystemUtil.FsType, Long> totalFilesPerFsEC_ = new
TreeMap<>();
private Map<FileSystemUtil.FsType, Long> totalBytesPerFsEC_ = new
TreeMap<>();
- // File formats scanned. Set in computeScanRangeLocations().
- protected Set<HdfsFileFormat> fileFormats_;
+ // File formats scanned. Set in checkSamplingAndCountStar().
+ // HdfsFileFormat.ICEBERG is always excluded from this set.
+ // Populated in checkSamplingAndCountStar().
+ protected Set<HdfsFileFormat> fileFormats_ = new HashSet<>();
// Whether all formats scanned are Parquet. Set in
computeScanRangeLocations().
private boolean allParquet_ = false;
@@ -334,6 +342,7 @@ public class HdfsScanNode extends ScanNode {
protected SlotDescriptor countStarSlot_ = null;
// Sampled file descriptors if table sampling is used. Grouped by partition
id.
+ // Initialized in checkSamplingAndCountStar().
Map<Long, List<FileDescriptor>> sampledFiles_ = null;
// Conjuncts used to trim the set of partitions passed to this node.
@@ -354,7 +363,7 @@ public class HdfsScanNode extends ScanNode {
super(id, desc, createDisplayName(hdfsTblRef.getTable()));
tbl_ = (FeFsTable)desc.getTable();
conjuncts_ = conjuncts;
- partitions_ = partitions;
+ partitions_ = new ArrayList<>(partitions);
partitionConjuncts_ = partConjuncts;
sampleParams_ = hdfsTblRef.getSampleParams();
replicaPreference_ = hdfsTblRef.getReplicaPreference();
@@ -372,7 +381,6 @@ public class HdfsScanNode extends ScanNode {
throw new IllegalStateException(error.toString());
}
isPartitionKeyScan_ = isPartitionKeyScan;
- fileFormats_ = new HashSet<>();
}
/**
@@ -419,12 +427,18 @@ public class HdfsScanNode extends ScanNode {
return canApplyCountStarOptimization(analyzer);
}
+ // Return sampledPartitions_ if not null. Otherwise, return partitions_.
+ private List<FeFsPartition> getSampledOrRawPartitions() {
+ return sampledPartitions_ == null ? partitions_ : sampledPartitions_;
+ }
+
/**
* Populate collectionConjuncts_ and scanRanges_.
*/
@Override
public void init(Analyzer analyzer) throws ImpalaException {
conjuncts_ = orderConjunctsByCost(conjuncts_);
+ checkSamplingAndCountStar(analyzer);
checkForSupportedFileFormats();
assignCollectionConjuncts(analyzer);
@@ -463,6 +477,51 @@ public class HdfsScanNode extends ScanNode {
assignedConjuncts_ = analyzer.getAssignedConjuncts();
}
+ /**
+ * Initialize sampledFiles_, sampledPartitions_, fileFormats_, and countStarSlot_.
+ * @param analyzer Analyzer object used to init this class.
+ */
+ private void checkSamplingAndCountStar(Analyzer analyzer) {
+ if (sampleParams_ != null) {
+ long percentBytes = sampleParams_.getPercentBytes();
+ long randomSeed;
+ if (sampleParams_.hasRandomSeed()) {
+ randomSeed = sampleParams_.getRandomSeed();
+ } else {
+ randomSeed = System.currentTimeMillis();
+ }
+ // Pass a minimum sample size of 0 because users cannot set a minimum sample size
+ // for scans directly. For compute stats, a minimum sample size can be set, and
+ // the sampling percent is adjusted to reflect it.
+ sampledFiles_ = getFilesSample(percentBytes, 0, randomSeed);
+ }
+
+ if (sampledFiles_ != null) {
+ // Initialize sampledPartitions_.
+ sampledPartitions_ = new ArrayList<>();
+ for (FeFsPartition partition : partitions_) {
+ Preconditions.checkState(partition.getId() >= 0);
+ if (sampledFiles_.get(partition.getId()) != null) {
+ sampledPartitions_.add(partition);
+ }
+ }
+ }
+
+ // Populate fileFormats_.
+ for (FeFsPartition partition : getSampledOrRawPartitions()) {
+ if (partition.getFileFormat() != HdfsFileFormat.ICEBERG) {
+ fileFormats_.add(partition.getFileFormat());
+ }
+ }
+
+ // Initialize countStarSlot_.
+ if (canApplyCountStarOptimization(analyzer, fileFormats_)) {
+ Preconditions.checkState(desc_.getPath().destTable() != null);
+ Preconditions.checkState(collectionConjuncts_.isEmpty());
+ countStarSlot_ = applyCountStarOptimization(analyzer);
+ }
+ }
+
/**
* Throws NotImplementedException if we do not support scanning the
partition.
* Specifically:
@@ -1099,48 +1158,14 @@ public class HdfsScanNode extends ScanNode {
/**
* Computes scan ranges (i.e. hdfs splits) plus their storage locations,
including
* volume ids, based on the given maximum number of bytes each scan range
should scan.
- * If 'sampleParams_' is not null, generates a sample and computes the scan
ranges
- * based on the sample.
+ * If 'sampledPartitions_' is not null, computes the scan ranges based on the
+ * sampled partitions.
*
* Initializes members with information about files and scan ranges, e.g.
- * totalFilesPerFs_, fileFormats_, etc.
+ * scanRangeSpecs_, generatedScanRangeCount_, totalFilesPerFs_, etc.
*/
- private void computeScanRangeLocations(Analyzer analyzer)
+ protected void computeScanRangeLocations(Analyzer analyzer)
throws ImpalaRuntimeException {
- if (sampleParams_ != null) {
- long percentBytes = sampleParams_.getPercentBytes();
- long randomSeed;
- if (sampleParams_.hasRandomSeed()) {
- randomSeed = sampleParams_.getRandomSeed();
- } else {
- randomSeed = System.currentTimeMillis();
- }
- // Pass a minimum sample size of 0 because users cannot set a minimum
sample size
- // for scans directly. For compute stats, a minimum sample size can be
set, and
- // the sampling percent is adjusted to reflect it.
- sampledFiles_ = getFilesSample(percentBytes, 0, randomSeed);
- }
-
- // Get partitions in the sample and initialize fileFormats_.
- List<FeFsPartition> partitions = new ArrayList<>();
- for (FeFsPartition partition : partitions_) {
- Preconditions.checkState(partition.getId() >= 0);
- if (sampledFiles_ != null && sampledFiles_.get(partition.getId()) ==
null) {
- // If we are sampling, check whether this partition is included in the
sample.
- continue;
- }
- partitions.add(partition);
- if (partition.getFileFormat() != HdfsFileFormat.ICEBERG) {
- fileFormats_.add(partition.getFileFormat());
- }
- }
-
- if (canApplyCountStarOptimization(analyzer, fileFormats_)) {
- Preconditions.checkState(desc_.getPath().destTable() != null);
- Preconditions.checkState(collectionConjuncts_.isEmpty());
- countStarSlot_ = applyCountStarOptimization(analyzer);
- }
-
long scanRangeBytesLimit =
analyzer.getQueryCtx().client_request.getQuery_options()
.getMax_scan_range_length();
if (RuntimeEnv.INSTANCE.hasTableScanRangeLimit() && desc_.getTableName()
!= null) {
@@ -1151,17 +1176,25 @@ public class HdfsScanNode extends ScanNode {
scanRangeBytesLimit = testLimit;
}
}
- scanRangeSpecs_ = new TScanRangeSpec();
+ // Reset class fields related to scan ranges so repeated calls start fresh.
+ scanRangeSpecs_ = new TScanRangeSpec();
+ generatedScanRangeCount_ = 0;
+ largestScanRangeBytes_ = 0;
+ maxScanRangeNumRows_ = -1;
+ numScanRangesNoDiskIds_ = 0;
+ numFilesNoDiskIds_ = 0;
+ numPartitionsNoDiskIds_ = 0;
numPartitionsPerFs_ = new TreeMap<>();
totalFilesPerFs_ = new TreeMap<>();
totalBytesPerFs_ = new TreeMap<>();
totalFilesPerFsEC_ = new TreeMap<>();
totalBytesPerFsEC_ = new TreeMap<>();
- largestScanRangeBytes_ = 0;
- maxScanRangeNumRows_ = -1;
- boolean allParquet = (partitions_.size() > 0) ? true : false;
- boolean allColumnarFormat = (partitions_.size() > 0) ? true : false;
+
+ Preconditions.checkState((sampleParams_ == null) == (sampledPartitions_ ==
null));
+ int partitionsSize = getSampledOrRawPartitions().size();
+ boolean allParquet = (partitionsSize > 0) ? true : false;
+ boolean allColumnarFormat = (partitionsSize > 0) ? true : false;
long simpleLimitNumRows = 0; // only used for the simple limit case
boolean isSimpleLimit = sampleParams_ == null &&
(analyzer.getQueryCtx().client_request.getQuery_options()
@@ -1178,7 +1211,7 @@ public class HdfsScanNode extends ScanNode {
String lastFsScheme = null;
String lastFsAuthority = null;
FileSystem lastFileSytem = null;
- for (FeFsPartition partition : partitions) {
+ for (FeFsPartition partition : getSampledOrRawPartitions()) {
// Save location to local variable beacuse getLocation() can be slow as
it needs to
// decompress the partition's location.
String partitionLocation = partition.getLocation();
@@ -2176,7 +2209,8 @@ public class HdfsScanNode extends ScanNode {
return;
}
- Preconditions.checkState(0 < numNodes_ && numNodes_ <= scanRangeSize);
+ Preconditions.checkState(0 < numNodes_);
+ Preconditions.checkState(numNodes_ <= scanRangeSize);
Preconditions.checkNotNull(desc_);
Preconditions.checkState(desc_.getTable() instanceof FeFsTable);
List<Long> columnReservations = null;