This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 9d7ef2e9dd Spark: Disable aggregate pushdown for incremental scan (#7626)
9d7ef2e9dd is described below
commit 9d7ef2e9dd27375c2b8bd7a143e4f623118185f8
Author: Huaxin Gao <[email protected]>
AuthorDate: Wed May 17 08:44:51 2023 -0700
Spark: Disable aggregate pushdown for incremental scan (#7626)
---
.../java/org/apache/iceberg/spark/source/SparkScanBuilder.java | 5 +++++
.../org/apache/iceberg/spark/source/TestDataSourceOptions.java | 10 +++++-----
.../java/org/apache/iceberg/spark/source/SparkScanBuilder.java | 5 +++++
.../org/apache/iceberg/spark/source/TestDataSourceOptions.java | 10 +++++-----
4 files changed, 20 insertions(+), 10 deletions(-)
diff --git
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
index 23cd8524b3..ddeec9c494 100644
---
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
+++
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
@@ -271,6 +271,11 @@ public class SparkScanBuilder
return false;
}
+ if (readConf.startSnapshotId() != null) {
+ LOG.info("Skipping aggregate pushdown: incremental scan is not supported");
+ return false;
+ }
+
// If group by expression is the same as the partition, the statistics information can still
// be used to calculate min/max/count, will enable aggregate push down in next phase.
// TODO: enable aggregate push down for partition col group by expression
diff --git
a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
index 60dd716c63..9f4eab5bb9 100644
---
a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
+++
b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
@@ -300,17 +300,17 @@ public class TestDataSourceOptions {
Assert.assertEquals("Records should match", expectedRecords.subList(1, 4), result);
// test (2nd snapshot, 3rd snapshot] incremental scan.
- List<SimpleRecord> result1 =
+ Dataset<Row> resultDf =
spark
.read()
.format("iceberg")
.option("start-snapshot-id", snapshotIds.get(2).toString())
.option("end-snapshot-id", snapshotIds.get(1).toString())
- .load(tableLocation)
- .orderBy("id")
- .as(Encoders.bean(SimpleRecord.class))
- .collectAsList();
+ .load(tableLocation);
+ List<SimpleRecord> result1 =
+ resultDf.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
Assert.assertEquals("Records should match", expectedRecords.subList(2, 3), result1);
+ Assert.assertEquals("Unprocessed count should match record count", 1, resultDf.count());
}
@Test
diff --git
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
index 23cd8524b3..ddeec9c494 100644
---
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
+++
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
@@ -271,6 +271,11 @@ public class SparkScanBuilder
return false;
}
+ if (readConf.startSnapshotId() != null) {
+ LOG.info("Skipping aggregate pushdown: incremental scan is not supported");
+ return false;
+ }
+
// If group by expression is the same as the partition, the statistics information can still
// be used to calculate min/max/count, will enable aggregate push down in next phase.
// TODO: enable aggregate push down for partition col group by expression
diff --git
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
index 44400b5ad4..a14e7b500e 100644
---
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
+++
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
@@ -297,17 +297,17 @@ public class TestDataSourceOptions {
Assert.assertEquals("Records should match", expectedRecords.subList(1, 4), result);
// test (2nd snapshot, 3rd snapshot] incremental scan.
- List<SimpleRecord> result1 =
+ Dataset<Row> resultDf =
spark
.read()
.format("iceberg")
.option("start-snapshot-id", snapshotIds.get(2).toString())
.option("end-snapshot-id", snapshotIds.get(1).toString())
- .load(tableLocation)
- .orderBy("id")
- .as(Encoders.bean(SimpleRecord.class))
- .collectAsList();
+ .load(tableLocation);
+ List<SimpleRecord> result1 =
+ resultDf.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
Assert.assertEquals("Records should match", expectedRecords.subList(2, 3), result1);
+ Assert.assertEquals("Unprocessed count should match record count", 1, resultDf.count());
}
@Test