This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 9d7ef2e9dd Spark: Disable aggregate pushdown for incremental scan 
(#7626)
9d7ef2e9dd is described below

commit 9d7ef2e9dd27375c2b8bd7a143e4f623118185f8
Author: Huaxin Gao <[email protected]>
AuthorDate: Wed May 17 08:44:51 2023 -0700

    Spark: Disable aggregate pushdown for incremental scan (#7626)
---
 .../java/org/apache/iceberg/spark/source/SparkScanBuilder.java |  5 +++++
 .../org/apache/iceberg/spark/source/TestDataSourceOptions.java | 10 +++++-----
 .../java/org/apache/iceberg/spark/source/SparkScanBuilder.java |  5 +++++
 .../org/apache/iceberg/spark/source/TestDataSourceOptions.java | 10 +++++-----
 4 files changed, 20 insertions(+), 10 deletions(-)

diff --git 
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
 
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
index 23cd8524b3..ddeec9c494 100644
--- 
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
+++ 
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
@@ -271,6 +271,11 @@ public class SparkScanBuilder
       return false;
     }
 
+    if (readConf.startSnapshotId() != null) {
+      LOG.info("Skipping aggregate pushdown: incremental scan is not 
supported");
+      return false;
+    }
+
     // If group by expression is the same as the partition, the statistics 
information can still
     // be used to calculate min/max/count, will enable aggregate push down in 
next phase.
     // TODO: enable aggregate push down for partition col group by expression
diff --git 
a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
 
b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
index 60dd716c63..9f4eab5bb9 100644
--- 
a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
+++ 
b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
@@ -300,17 +300,17 @@ public class TestDataSourceOptions {
     Assert.assertEquals("Records should match", expectedRecords.subList(1, 4), 
result);
 
     // test (2nd snapshot, 3rd snapshot] incremental scan.
-    List<SimpleRecord> result1 =
+    Dataset<Row> resultDf =
         spark
             .read()
             .format("iceberg")
             .option("start-snapshot-id", snapshotIds.get(2).toString())
             .option("end-snapshot-id", snapshotIds.get(1).toString())
-            .load(tableLocation)
-            .orderBy("id")
-            .as(Encoders.bean(SimpleRecord.class))
-            .collectAsList();
+            .load(tableLocation);
+    List<SimpleRecord> result1 =
+        
resultDf.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
     Assert.assertEquals("Records should match", expectedRecords.subList(2, 3), 
result1);
+    Assert.assertEquals("Unprocessed count should match record count", 1, 
resultDf.count());
   }
 
   @Test
diff --git 
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
index 23cd8524b3..ddeec9c494 100644
--- 
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
+++ 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
@@ -271,6 +271,11 @@ public class SparkScanBuilder
       return false;
     }
 
+    if (readConf.startSnapshotId() != null) {
+      LOG.info("Skipping aggregate pushdown: incremental scan is not 
supported");
+      return false;
+    }
+
     // If group by expression is the same as the partition, the statistics 
information can still
     // be used to calculate min/max/count, will enable aggregate push down in 
next phase.
     // TODO: enable aggregate push down for partition col group by expression
diff --git 
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
 
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
index 44400b5ad4..a14e7b500e 100644
--- 
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
+++ 
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
@@ -297,17 +297,17 @@ public class TestDataSourceOptions {
     Assert.assertEquals("Records should match", expectedRecords.subList(1, 4), 
result);
 
     // test (2nd snapshot, 3rd snapshot] incremental scan.
-    List<SimpleRecord> result1 =
+    Dataset<Row> resultDf =
         spark
             .read()
             .format("iceberg")
             .option("start-snapshot-id", snapshotIds.get(2).toString())
             .option("end-snapshot-id", snapshotIds.get(1).toString())
-            .load(tableLocation)
-            .orderBy("id")
-            .as(Encoders.bean(SimpleRecord.class))
-            .collectAsList();
+            .load(tableLocation);
+    List<SimpleRecord> result1 =
+        
resultDf.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
     Assert.assertEquals("Records should match", expectedRecords.subList(2, 3), 
result1);
+    Assert.assertEquals("Unprocessed count should match record count", 1, 
resultDf.count());
   }
 
   @Test

Reply via email to