This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 6550486f0c Spark 3.4: Structured Streaming read limit support follow-up (#13099)
6550486f0c is described below

commit 6550486f0c640e15bb7c4d6c16b087397656213e
Author: Wing Yew Poon <wyp...@cloudera.com>
AuthorDate: Wed May 21 08:27:38 2025 -0700

    Spark 3.4: Structured Streaming read limit support follow-up (#13099)
    
    backports #12260 to Spark 3.4
---
 .../spark/source/SparkMicroBatchStream.java        | 52 +++++++++++++++++++---
 .../spark/source/TestStructuredStreamingRead3.java | 44 ++++++++++++++----
 2 files changed, 82 insertions(+), 14 deletions(-)

diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
index 5e10719c0c..d0da342c1c 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
@@ -58,9 +58,12 @@ import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.broadcast.Broadcast;
 import org.apache.spark.sql.connector.read.InputPartition;
 import org.apache.spark.sql.connector.read.PartitionReaderFactory;
+import org.apache.spark.sql.connector.read.streaming.CompositeReadLimit;
 import org.apache.spark.sql.connector.read.streaming.MicroBatchStream;
 import org.apache.spark.sql.connector.read.streaming.Offset;
 import org.apache.spark.sql.connector.read.streaming.ReadLimit;
+import org.apache.spark.sql.connector.read.streaming.ReadMaxFiles;
+import org.apache.spark.sql.connector.read.streaming.ReadMaxRows;
 import org.apache.spark.sql.connector.read.streaming.SupportsAdmissionControl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -309,6 +312,47 @@ public class SparkMicroBatchStream implements MicroBatchStream, SupportsAdmissio
     }
   }
 
+  private static int getMaxFiles(ReadLimit readLimit) {
+    if (readLimit instanceof ReadMaxFiles) {
+      return ((ReadMaxFiles) readLimit).maxFiles();
+    }
+
+    if (readLimit instanceof CompositeReadLimit) {
+      // We do not expect a CompositeReadLimit to contain a nested CompositeReadLimit.
+      // In fact, it should only be a composite of two or more of ReadMinRows, ReadMaxRows and
+      // ReadMaxFiles, with no more than one of each.
+      ReadLimit[] limits = ((CompositeReadLimit) readLimit).getReadLimits();
+      for (ReadLimit limit : limits) {
+        if (limit instanceof ReadMaxFiles) {
+          return ((ReadMaxFiles) limit).maxFiles();
+        }
+      }
+    }
+
+    // there is no ReadMaxFiles, so return the default
+    return Integer.MAX_VALUE;
+  }
+
+  private static int getMaxRows(ReadLimit readLimit) {
+    if (readLimit instanceof ReadMaxRows) {
+      long maxRows = ((ReadMaxRows) readLimit).maxRows();
+      return Math.toIntExact(maxRows);
+    }
+
+    if (readLimit instanceof CompositeReadLimit) {
+      ReadLimit[] limits = ((CompositeReadLimit) readLimit).getReadLimits();
+      for (ReadLimit limit : limits) {
+        if (limit instanceof ReadMaxRows) {
+          long maxRows = ((ReadMaxRows) limit).maxRows();
+          return Math.toIntExact(maxRows);
+        }
+      }
+    }
+
+    // there is no ReadMaxRows, so return the default
+    return Integer.MAX_VALUE;
+  }
+
   @Override
   @SuppressWarnings("checkstyle:CyclomaticComplexity")
   public Offset latestOffset(Offset startOffset, ReadLimit limit) {
@@ -368,10 +412,8 @@ public class SparkMicroBatchStream implements MicroBatchStream, SupportsAdmissio
           while (taskIter.hasNext()) {
             FileScanTask task = taskIter.next();
             if (curPos >= startPosOfSnapOffset) {
-              // TODO : use readLimit provided in function param, the readLimits are derived from
-              // these 2 properties.
-              if ((curFilesAdded + 1) > maxFilesPerMicroBatch
-                  || (curRecordCount + task.file().recordCount()) > maxRecordsPerMicroBatch) {
+              if ((curFilesAdded + 1) > getMaxFiles(limit)
+                  || (curRecordCount + task.file().recordCount()) > getMaxRows(limit)) {
                 shouldContinueReading = false;
                 break;
               }
@@ -431,7 +473,7 @@ public class SparkMicroBatchStream implements MicroBatchStream, SupportsAdmissio
         && maxRecordsPerMicroBatch != Integer.MAX_VALUE) {
       ReadLimit[] readLimits = new ReadLimit[2];
       readLimits[0] = ReadLimit.maxFiles(maxFilesPerMicroBatch);
-      readLimits[1] = ReadLimit.maxRows(maxFilesPerMicroBatch);
+      readLimits[1] = ReadLimit.maxRows(maxRecordsPerMicroBatch);
       return ReadLimit.compositeLimit(readLimits);
     } else if (maxFilesPerMicroBatch != Integer.MAX_VALUE) {
       return ReadLimit.maxFiles(maxFilesPerMicroBatch);
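
For context, a minimal illustrative sketch (not part of this patch) of how a composite read limit like the one getDefaultReadLimit() builds can be constructed and unwrapped with the same Spark 3.4 connector classes imported above; the 10-file and 500-row bounds are arbitrary placeholder values:

    import org.apache.spark.sql.connector.read.streaming.CompositeReadLimit;
    import org.apache.spark.sql.connector.read.streaming.ReadLimit;
    import org.apache.spark.sql.connector.read.streaming.ReadMaxFiles;
    import org.apache.spark.sql.connector.read.streaming.ReadMaxRows;

    public class ReadLimitSketch {
      public static void main(String[] args) {
        // A composite of a file bound and a row bound, like the one returned by
        // getDefaultReadLimit() when both micro-batch options are set.
        ReadLimit composite =
            ReadLimit.compositeLimit(
                new ReadLimit[] {ReadLimit.maxFiles(10), ReadLimit.maxRows(500L)});

        // Spark passes a single ReadLimit into latestOffset(start, limit); the new
        // getMaxFiles/getMaxRows helpers walk the composite to find each bound.
        for (ReadLimit inner : ((CompositeReadLimit) composite).getReadLimits()) {
          if (inner instanceof ReadMaxFiles) {
            System.out.println("max files per micro-batch: " + ((ReadMaxFiles) inner).maxFiles());
          } else if (inner instanceof ReadMaxRows) {
            System.out.println("max rows per micro-batch: " + ((ReadMaxRows) inner).maxRows());
          }
        }
      }
    }
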
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
index 2544a8f739..c0e47cbafb 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
@@ -151,8 +151,7 @@ public final class TestStructuredStreamingRead3 extends CatalogTestBase {
   }
 
   @TestTemplate
-  public void testReadStreamOnIcebergTableWithMultipleSnapshots_WithNumberOfFiles_1()
-      throws Exception {
+  public void testReadStreamWithMaxFiles1() throws Exception {
     appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);
 
     assertThat(
@@ -162,8 +161,7 @@ public final class TestStructuredStreamingRead3 extends CatalogTestBase {
   }
 
   @TestTemplate
-  public void testReadStreamOnIcebergTableWithMultipleSnapshots_WithNumberOfFiles_2()
-      throws Exception {
+  public void testReadStreamWithMaxFiles2() throws Exception {
     appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);
 
     assertThat(
@@ -173,8 +171,7 @@ public final class TestStructuredStreamingRead3 extends CatalogTestBase {
   }
 
   @TestTemplate
-  public void testReadStreamOnIcebergTableWithMultipleSnapshots_WithNumberOfRows_1()
-      throws Exception {
+  public void testReadStreamWithMaxRows1() throws Exception {
     appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);
 
     // only 1 micro-batch will be formed and we will read data partially
@@ -183,7 +180,8 @@ public final class TestStructuredStreamingRead3 extends CatalogTestBase {
                 ImmutableMap.of(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "1")))
         .isEqualTo(1);
 
-    StreamingQuery query = startStream(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "1");
+    StreamingQuery query =
+        startStream(ImmutableMap.of(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "1"));
 
     // check answer correctness only 1 record read the micro-batch will be stuck
     List<SimpleRecord> actual = rowsAvailable(query);
@@ -193,8 +191,24 @@ public final class TestStructuredStreamingRead3 extends CatalogTestBase {
   }
 
   @TestTemplate
-  public void testReadStreamOnIcebergTableWithMultipleSnapshots_WithNumberOfRows_4()
-      throws Exception {
+  public void testReadStreamWithMaxRows2() throws Exception {
+    appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);
+
+    assertThat(
+            microBatchCount(
+                ImmutableMap.of(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "2")))
+        .isEqualTo(4);
+
+    StreamingQuery query =
+        startStream(ImmutableMap.of(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "2"));
+
+    List<SimpleRecord> actual = rowsAvailable(query);
+    assertThat(actual)
+        .containsExactlyInAnyOrderElementsOf(Iterables.concat(TEST_DATA_MULTIPLE_SNAPSHOTS));
+  }
+
+  @TestTemplate
+  public void testReadStreamWithMaxRows4() throws Exception {
     appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);
 
     assertThat(
@@ -203,6 +217,18 @@ public final class TestStructuredStreamingRead3 extends CatalogTestBase {
         .isEqualTo(2);
   }
 
+  @TestTemplate
+  public void testReadStreamWithCompositeReadLimit() throws Exception {
+    appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);
+
+    assertThat(
+            microBatchCount(
+                ImmutableMap.of(
+                    SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "1",
+                    SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "2")))
+        .isEqualTo(6);
+  }
+
   @TestTemplate
   public void testReadStreamOnIcebergThenAddData() throws Exception {
     List<List<SimpleRecord>> expected = TEST_DATA_MULTIPLE_SNAPSHOTS;
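
For reference, a hedged usage sketch (not part of this patch) of the pair of read options exercised by testReadStreamWithCompositeReadLimit; when both are set, getDefaultReadLimit() hands Spark a composite limit of maxFiles(1) and maxRows(2), which the new helpers unwrap per micro-batch. The SparkSession parameter and the table name db.table are placeholders:

    import org.apache.iceberg.spark.SparkReadOptions;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    public class CompositeLimitReadSketch {
      // Opens a Structured Streaming read on a (placeholder) Iceberg table with
      // both per-micro-batch bounds set, mirroring the new composite-limit test.
      static Dataset<Row> openStream(SparkSession spark) {
        return spark
            .readStream()
            .format("iceberg")
            .option(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "1")
            .option(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "2")
            .load("db.table");
      }
    }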
