This is an automated email from the ASF dual-hosted git repository.
boroknagyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new bf7c2088d IMPALA-11986: (part 1) Optimize partition key scans for
Iceberg tables
bf7c2088d is described below
commit bf7c2088dd5495a763ff9a381970f99e6101cd4b
Author: Zoltan Borok-Nagy <[email protected]>
AuthorDate: Tue Feb 17 18:24:59 2026 +0100
IMPALA-11986: (part 1) Optimize partition key scans for Iceberg tables
This patch optimizes queries that only scan IDENTITY-partitioned
columns. The optimization only applies if:
* All materialized aggregate expressions have distinct semantics
(e.g. MIN, MAX, NDV). In other words, this optimization will work
for COUNT(DISTINCT c) but not COUNT(c).
* All materialized columns are IDENTITY-partitioned in all partition
specs (this can be relaxed later)
If the above conditions are met, then each data file (without deletes)
only produces a single record. The rest of the table (data files with
deletes and delete files) are scanned normally.
Testing:
* added e2e tests
Change-Id: I32f78ee60ac4a410e91cf0e858199dd39d2e9afe
Reviewed-on: http://gerrit.cloudera.org:8080/23985
Reviewed-by: Impala Public Jenkins <[email protected]>
Tested-by: Impala Public Jenkins <[email protected]>
---
.../org/apache/impala/planner/HdfsScanNode.java | 2 +-
.../org/apache/impala/planner/IcebergScanNode.java | 11 +-
.../apache/impala/planner/IcebergScanPlanner.java | 62 +++++++--
.../QueryTest/iceberg-partition-key-scans.test | 140 +++++++++++++++++++++
tests/query_test/test_iceberg.py | 3 +
5 files changed, 206 insertions(+), 12 deletions(-)
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
index f2e598c06..256c36d1f 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
@@ -268,7 +268,7 @@ public class HdfsScanNode extends ScanNode {
// True if this is a scan that only returns partition keys and is only
required to
// return at least one of each of the distinct values of the partition keys.
- private final boolean isPartitionKeyScan_;
+ protected final boolean isPartitionKeyScan_;
// Conjuncts that can be evaluated while materializing the items (tuples) of
// collection-typed slots. Maps from tuple descriptor to the conjuncts bound
by that
diff --git a/fe/src/main/java/org/apache/impala/planner/IcebergScanNode.java
b/fe/src/main/java/org/apache/impala/planner/IcebergScanNode.java
index ed8e5808c..d6a67a0bd 100644
--- a/fe/src/main/java/org/apache/impala/planner/IcebergScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/IcebergScanNode.java
@@ -89,19 +89,20 @@ public class IcebergScanNode extends HdfsScanNode {
public IcebergScanNode(PlanNodeId id, TableRef tblRef, List<Expr> conjuncts,
MultiAggregateInfo aggInfo, List<IcebergFileDescriptor> fileDescs,
int numPartitions,
- List<Expr> nonIdentityConjuncts, List<Expr> skippedConjuncts, long
snapshotId) {
+ List<Expr> nonIdentityConjuncts, List<Expr> skippedConjuncts, long
snapshotId,
+ boolean isPartitionKeyScan) {
this(id, tblRef, conjuncts, aggInfo, fileDescs, numPartitions,
nonIdentityConjuncts,
- skippedConjuncts, null, snapshotId);
+ skippedConjuncts, null, snapshotId, isPartitionKeyScan);
}
public IcebergScanNode(PlanNodeId id, TableRef tblRef, List<Expr> conjuncts,
MultiAggregateInfo aggInfo, List<IcebergFileDescriptor> fileDescs,
int numPartitions,
List<Expr> nonIdentityConjuncts, List<Expr> skippedConjuncts, PlanNodeId
deleteId,
- long snapshotId) {
+ long snapshotId, boolean isPartitionKeyScan) {
super(id, tblRef.getDesc(), conjuncts,
getIcebergPartition(((FeIcebergTable)tblRef.getTable()).getFeFsTable()), tblRef,
- aggInfo, null, false);
+ aggInfo, null, isPartitionKeyScan);
// Hdfs table transformed from iceberg table only has one partition
Preconditions.checkState(partitions_.size() == 1);
@@ -143,6 +144,8 @@ public class IcebergScanNode extends HdfsScanNode {
cardinality_,
iceFd.getFbFileMetadata().icebergMetadata().recordCount());
}
}
+ } else if (isPartitionKeyScan_) {
+ cardinality_ = fileDescs_.size();
} else {
for (IcebergFileDescriptor fd : fileDescs_) {
cardinality_ = MathUtil.addCardinalities(
diff --git a/fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java
b/fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java
index 2d0bbd87f..c470029ef 100644
--- a/fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java
+++ b/fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java
@@ -52,6 +52,7 @@ import org.apache.impala.analysis.BinaryPredicate;
import org.apache.impala.analysis.BinaryPredicate.Operator;
import org.apache.impala.analysis.Expr;
import org.apache.impala.analysis.IcebergExpressionCollector;
+import org.apache.impala.analysis.IcebergPartitionSpec;
import org.apache.impala.analysis.JoinOperator;
import org.apache.impala.analysis.MultiAggregateInfo;
import org.apache.impala.analysis.SlotDescriptor;
@@ -247,6 +248,7 @@ public class IcebergScanPlanner {
}
private PlanNode createIcebergScanPlanImpl() throws ImpalaException {
+ boolean isPartitionKeyScan = IsPartitionKeyScan();
if (noDeleteFiles()) {
Preconditions.checkState(!tblRef_.optimizeCountStarForIcebergV2());
// If there are no delete files we can just create a single SCAN node.
@@ -255,7 +257,7 @@ public class IcebergScanPlanner {
aggInfo_, dataFilesWithoutDeletes_,
getIceTable().getContentFileStore().getNumPartitions(),
nonIdentityConjuncts_,
- getSkippedConjuncts(), snapshotId_);
+ getSkippedConjuncts(), snapshotId_, isPartitionKeyScan);
ret.init(analyzer_);
return ret;
}
@@ -280,7 +282,7 @@ public class IcebergScanPlanner {
IcebergScanNode dataScanNode = new IcebergScanNode(
ctx_.getNextNodeId(), tblRef_, conjuncts_, aggInfo_,
dataFilesWithoutDeletes_,
getIceTable().getContentFileStore().getNumPartitions(),
- nonIdentityConjuncts_, getSkippedConjuncts(), snapshotId_);
+ nonIdentityConjuncts_, getSkippedConjuncts(), snapshotId_,
isPartitionKeyScan);
dataScanNode.init(analyzer_);
List<Expr> outputExprs = tblRef_.getDesc().getSlots().stream().map(
SlotRef::new).collect(Collectors.toList());
@@ -299,6 +301,48 @@ public class IcebergScanPlanner {
return unionNode;
}
+ private boolean IsPartitionKeyScan() {
+ if (tblRef_.optimizeCountStarForIcebergV2()) return false;
+ boolean allAggsDistinct = aggInfo_ != null && aggInfo_.hasAllDistinctAgg();
+ if (!allAggsDistinct) return false;
+ TupleDescriptor tDesc = tblRef_.getDesc();
+ if (!tDesc.hasMaterializedSlots()) return true;
+
+ FeIcebergTable iceTable = getIceTable();
+ for (SlotDescriptor slotDesc: tDesc.getSlots()) {
+ if (!slotDesc.isMaterialized()) continue;
+ IcebergColumn column = (IcebergColumn) slotDesc.getColumn();
+ if (column == null) continue;
+ // We check all partition specs here. We are a bit stricter than
necessary,
+ // because old partition specs might no longer have any data.
+ // TODO: later we could group data files (without deletes) into
categories:
+ // - files eligible for partition key scan
+ // - files non-eligible for partition key scans
+ // Then we could do the following plan:
+ // UNION ALL
+ // / | \
+ // / | \
+ // / | \
+ // PARTITION SCAN ICEBERG
+ // KEY WITHOUT DELETE
+ // SCAN DELETES NODE
+ // / \
+ // / \
+ // SCAN SCAN
+ // data delete
+ // files files
+ // Later PARTITION KEY SCAN could be a UNION NODE that produces the
partition keys,
+ // see SingleNodePlanner.createOptimizedPartitionUnionNode().
+ for (IcebergPartitionSpec spec : iceTable.getPartitionSpecs()) {
+ if (IcebergUtil.getPartitionTransformType(column, spec) !=
+ TIcebergPartitionTransformType.IDENTITY) {
+ return false;
+ }
+ }
+ }
+ return true;
+ }
+
private PlanNode createPositionJoinNode() throws ImpalaException {
Preconditions.checkState(positionDeletesRecordCount_ != 0);
Preconditions.checkState(dataFilesWithDeletesSumPaths_ != 0);
@@ -320,7 +364,8 @@ public class IcebergScanPlanner {
IcebergScanNode dataScanNode = new IcebergScanNode(
dataScanNodeId, tblRef_, conjuncts_, aggInfo_, dataFilesWithDeletes_,
getIceTable().getContentFileStore().getNumPartitions(),
- nonIdentityConjuncts_, getSkippedConjuncts(), deleteScanNodeId,
snapshotId_);
+ nonIdentityConjuncts_, getSkippedConjuncts(), deleteScanNodeId,
snapshotId_,
+ false /*isPartitionKeyScan*/);
dataScanNode.init(analyzer_);
IcebergScanNode deleteScanNode = new IcebergScanNode(
deleteScanNodeId,
@@ -331,7 +376,8 @@ public class IcebergScanPlanner {
getIceTable().getContentFileStore().getNumPartitions(),
Collections.emptyList(), /*nonIdentityConjuncts*/
Collections.emptyList(), /*skippedConjuncts*/
- snapshotId_);
+ snapshotId_,
+ false /*isPartitionKeyScan*/);
deleteScanNode.init(analyzer_);
// Now let's create the JOIN node
@@ -533,7 +579,8 @@ public class IcebergScanPlanner {
IcebergScanNode dataScanNode = new IcebergScanNode(
dataScanNodeId, tblRef_, conjuncts_, aggInfo_, dataFilesWithDeletes_,
getIceTable().getContentFileStore().getNumPartitions(),
- nonIdentityConjuncts_, getSkippedConjuncts(), snapshotId_);
+ nonIdentityConjuncts_, getSkippedConjuncts(), snapshotId_,
+ false /*isPartitionKeyScan*/);
addAllSlotsForEqualityDeletes(tblRef_);
dataScanNode.init(analyzer_);
@@ -573,8 +620,9 @@ public class IcebergScanPlanner {
Lists.newArrayList(equalityDeleteFiles),
getIceTable().getContentFileStore().getNumPartitions(),
Collections.emptyList(), /*nonIdentityConjuncts*/
- Collections.emptyList(),
- snapshotId_); /*skippedConjuncts*/
+ Collections.emptyList(), /*skippedConjuncts*/
+ snapshotId_,
+ false /*isPartitionKeyScan*/);
deleteScanNode.init(analyzer_);
Pair<List<BinaryPredicate>, List<Expr>> equalityJoinConjuncts =
diff --git
a/testdata/workloads/functional-query/queries/QueryTest/iceberg-partition-key-scans.test
b/testdata/workloads/functional-query/queries/QueryTest/iceberg-partition-key-scans.test
new file mode 100644
index 000000000..6bbff42b0
--- /dev/null
+++
b/testdata/workloads/functional-query/queries/QueryTest/iceberg-partition-key-scans.test
@@ -0,0 +1,140 @@
+====
+---- QUERY
+CREATE TABLE ice_store_sales PARTITIONED BY SPEC (ss_store_sk)
+STORED BY ICEBERG
+AS SELECT * FROM tpcds_parquet.store_sales;
+====
+---- QUERY
+select distinct ss_store_sk
+from ice_store_sales;
+---- RESULTS
+1
+2
+8
+4
+10
+7
+NULL
+---- TYPES
+INT
+---- RUNTIME_PROFILE
+ partition key scan
+ tuple-ids=0 row-size=4B cardinality=7
+aggregation(SUM, NumPages): 7
+====
+---- QUERY
+select count(distinct ss_store_sk)
+from ice_store_sales;
+---- RESULTS
+6
+---- TYPES
+BIGINT
+---- RUNTIME_PROFILE
+ partition key scan
+ tuple-ids=0 row-size=4B cardinality=7
+aggregation(SUM, NumPages): 7
+====
+---- QUERY
+select min(ss_store_sk), max(ss_store_sk)
+from ice_store_sales;
+---- RESULTS
+1,10
+---- TYPES
+INT,INT
+---- RUNTIME_PROFILE
+ partition key scan
+ tuple-ids=0 row-size=4B cardinality=7
+aggregation(SUM, NumPages): 7
+====
+---- QUERY
+# Partition key scan optimization cannot be applied with non-partition column
in select list.
+select min(ss_store_sk), max(ss_store_sk), max(ss_sold_date_sk)
+from ice_store_sales;
+---- RESULTS
+1,10,2452642
+---- TYPES
+INT,INT,INT
+---- RUNTIME_PROFILE
+ tuple-ids=0 row-size=8B cardinality=2.88M
+====
+---- QUERY
+select distinct typeof(ss_store_sk)
+from ice_store_sales;
+---- RESULTS
+'INT'
+---- TYPES
+STRING
+---- RUNTIME_PROFILE
+ partition key scan
+ tuple-ids=0 row-size=4B cardinality=7
+aggregation(SUM, NumPages): 7
+====
+---- QUERY
+select distinct ss_store_sk
+from ice_store_sales
+where ss_store_sk % 2 = 0;
+---- RESULTS
+2
+8
+4
+10
+---- TYPES
+INT
+---- RUNTIME_PROFILE
+ partition key scan
+ tuple-ids=0 row-size=4B cardinality=1
+====
+---- QUERY
+select count(*) from (select distinct ss_store_sk from ice_store_sales limit
3) v;
+---- RESULTS
+3
+---- TYPES
+BIGINT
+---- RUNTIME_PROFILE
+ partition key scan
+ tuple-ids=0 row-size=4B cardinality=7
+aggregation(SUM, NumPages): 7
+====
+---- QUERY
+DELETE FROM ice_store_sales WHERE ss_store_sk = 1
+AND ss_sold_date_sk % 199 = 0;
+====
+---- QUERY
+select distinct ss_store_sk
+from ice_store_sales;
+---- RESULTS
+1
+2
+8
+4
+10
+7
+NULL
+---- TYPES
+INT
+---- RUNTIME_PROFILE
+ partition key scan
+ tuple-ids=0 row-size=24B cardinality=6
+ | tuple-ids=0 row-size=24B cardinality=456.90K
+====
+---- QUERY
+ALTER TABLE ice_store_sales SET PARTITION SPEC (ss_sold_date_sk);
+====
+---- QUERY
+# Now partition key scan optimization cannot be applied on 'ss_store_sk'.
+select distinct ss_store_sk
+from ice_store_sales;
+---- RESULTS
+1
+2
+8
+4
+10
+7
+NULL
+---- TYPES
+INT
+---- RUNTIME_PROFILE
+ tuple-ids=0 row-size=24B cardinality=2.42M
+ | tuple-ids=0 row-size=24B cardinality=456.90K
+====
\ No newline at end of file
diff --git a/tests/query_test/test_iceberg.py b/tests/query_test/test_iceberg.py
index ba56794a4..c3ab38f58 100644
--- a/tests/query_test/test_iceberg.py
+++ b/tests/query_test/test_iceberg.py
@@ -2264,6 +2264,9 @@ class TestIcebergV2Table(IcebergTestSuite):
tbl_name, second_snapshot.get_snapshot_id()))
assert "partitions=2/unknown" in selective_time_travel_data.runtime_profile
+ def test_partition_key_scans(self, vector, unique_database):
+ self.run_test_case('QueryTest/iceberg-partition-key-scans', vector,
unique_database)
+
def test_table_repair(self, unique_database):
tbl_name = 'tbl_with_removed_files'
db_tbl = unique_database + "." + tbl_name