This is an automated email from the ASF dual-hosted git repository.

singhpk234 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new d731489b43 Spark 3.5: Support Trigger AvailableNow in Structured Streaming (#13824)
d731489b43 is described below

commit d731489b436179b38fc322e89bb42a24b31851ac
Author: Alex Prosak <[email protected]>
AuthorDate: Mon Sep 8 13:59:23 2025 -0700

    Spark 3.5: Support Trigger AvailableNow in Structured Streaming (#13824)
---
 .../spark/source/SparkMicroBatchStream.java        |  25 +++-
 .../spark/source/TestStructuredStreamingRead3.java | 148 +++++++++++++++++++++
 2 files changed, 169 insertions(+), 4 deletions(-)

diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
index c6f7e1faca..4b79da8133 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
@@ -64,11 +64,11 @@ import org.apache.spark.sql.connector.read.streaming.Offset;
 import org.apache.spark.sql.connector.read.streaming.ReadLimit;
 import org.apache.spark.sql.connector.read.streaming.ReadMaxFiles;
 import org.apache.spark.sql.connector.read.streaming.ReadMaxRows;
-import org.apache.spark.sql.connector.read.streaming.SupportsAdmissionControl;
+import 
org.apache.spark.sql.connector.read.streaming.SupportsTriggerAvailableNow;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class SparkMicroBatchStream implements MicroBatchStream, 
SupportsAdmissionControl {
+public class SparkMicroBatchStream implements MicroBatchStream, 
SupportsTriggerAvailableNow {
   private static final Joiner SLASH = Joiner.on("/");
   private static final Logger LOG = 
LoggerFactory.getLogger(SparkMicroBatchStream.class);
   private static final Types.StructType EMPTY_GROUPING_KEY_TYPE = 
Types.StructType.of();
@@ -89,6 +89,7 @@ public class SparkMicroBatchStream implements MicroBatchStream, SupportsAdmissio
   private final int maxFilesPerMicroBatch;
   private final int maxRecordsPerMicroBatch;
   private final boolean cacheDeleteFilesOnExecutors;
+  private StreamingOffset lastOffsetForTriggerAvailableNow;
 
   SparkMicroBatchStream(
       JavaSparkContext sparkContext,
@@ -384,6 +385,12 @@ public class SparkMicroBatchStream implements MicroBatchStream, SupportsAdmissio
     Snapshot curSnapshot = table.snapshot(startingOffset.snapshotId());
     validateCurrentSnapshotExists(curSnapshot, startingOffset);
 
+    // Use the pre-computed snapshotId when Trigger.AvailableNow is enabled.
+    long latestSnapshotId =
+        lastOffsetForTriggerAvailableNow != null
+            ? lastOffsetForTriggerAvailableNow.snapshotId()
+            : table.currentSnapshot().snapshotId();
+
     int startPosOfSnapOffset = (int) startingOffset.position();
 
     boolean scanAllFiles = startingOffset.shouldScanAllFiles();
@@ -439,8 +446,8 @@ public class SparkMicroBatchStream implements MicroBatchStream, SupportsAdmissio
           LOG.warn("Failed to close task iterable", ioe);
         }
       }
-      // if the currentSnapShot was also the mostRecentSnapshot then break
-      if (curSnapshot.snapshotId() == table.currentSnapshot().snapshotId()) {
+      // if the currentSnapShot was also the latestSnapshot then break
+      if (curSnapshot.snapshotId() == latestSnapshotId) {
         break;
       }
 
@@ -524,6 +531,16 @@ public class SparkMicroBatchStream implements MicroBatchStream, SupportsAdmissio
     }
   }
 
+  @Override
+  public void prepareForTriggerAvailableNow() {
+    LOG.info("The streaming query reports to use Trigger.AvailableNow");
+
+    lastOffsetForTriggerAvailableNow =
+        (StreamingOffset) latestOffset(initialOffset, 
ReadLimit.allAvailable());
+
+    LOG.info("lastOffset for Trigger.AvailableNow is {}", 
lastOffsetForTriggerAvailableNow.json());
+  }
+
   private static class InitialOffsetStore {
     private final Table table;
     private final FileIO io;
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
index c197832948..5a68b7a414 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
@@ -62,6 +62,7 @@ import org.apache.spark.sql.internal.SQLConf;
 import org.apache.spark.sql.streaming.DataStreamWriter;
 import org.apache.spark.sql.streaming.OutputMode;
 import org.apache.spark.sql.streaming.StreamingQuery;
+import org.apache.spark.sql.streaming.Trigger;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
@@ -159,6 +160,11 @@ public final class TestStructuredStreamingRead3 extends CatalogTestBase {
     assertMicroBatchRecordSizes(
         ImmutableMap.of(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, 
"1"),
         List.of(1L, 2L, 1L, 1L, 1L, 1L));
+
+    assertMicroBatchRecordSizes(
+        ImmutableMap.of(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, 
"1"),
+        List.of(1L, 2L, 1L, 1L, 1L, 1L),
+        Trigger.AvailableNow());
   }
 
   @TestTemplate
@@ -167,6 +173,11 @@ public final class TestStructuredStreamingRead3 extends CatalogTestBase {
     assertMicroBatchRecordSizes(
         ImmutableMap.of(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, 
"2"),
         List.of(3L, 2L, 2L));
+
+    assertMicroBatchRecordSizes(
+        ImmutableMap.of(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, 
"2"),
+        List.of(3L, 2L, 2L),
+        Trigger.AvailableNow());
   }
 
   @TestTemplate
@@ -176,6 +187,11 @@ public final class TestStructuredStreamingRead3 extends CatalogTestBase {
         ImmutableMap.of(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, 
"1"),
         List.of(1L, 2L, 1L, 1L, 1L, 1L));
 
+    assertMicroBatchRecordSizes(
+        ImmutableMap.of(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, 
"1"),
+        List.of(1L, 2L, 1L, 1L, 1L, 1L),
+        Trigger.AvailableNow());
+
     // soft limit of 1 is being enforced, the stream is not blocked.
     StreamingQuery query = 
startStream(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "1");
 
@@ -191,6 +207,11 @@ public final class TestStructuredStreamingRead3 extends CatalogTestBase {
         ImmutableMap.of(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, 
"2"),
         List.of(3L, 2L, 2L));
 
+    assertMicroBatchRecordSizes(
+        ImmutableMap.of(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, 
"2"),
+        List.of(3L, 2L, 2L),
+        Trigger.AvailableNow());
+
     StreamingQuery query =
         
startStream(ImmutableMap.of(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH,
 "2"));
 
@@ -204,6 +225,11 @@ public final class TestStructuredStreamingRead3 extends CatalogTestBase {
     appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);
     assertMicroBatchRecordSizes(
         ImmutableMap.of(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, 
"4"), List.of(4L, 3L));
+
+    assertMicroBatchRecordSizes(
+        ImmutableMap.of(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, 
"4"),
+        List.of(4L, 3L),
+        Trigger.AvailableNow());
   }
 
   @TestTemplate
@@ -214,6 +240,121 @@ public final class TestStructuredStreamingRead3 extends CatalogTestBase {
             SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "1",
             SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "2"),
         List.of(1L, 2L, 1L, 1L, 1L, 1L));
+
+    assertMicroBatchRecordSizes(
+        ImmutableMap.of(
+            SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "1",
+            SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "2"),
+        List.of(1L, 2L, 1L, 1L, 1L, 1L),
+        Trigger.AvailableNow());
+  }
+
+  @TestTemplate
+  public void testAvailableNowStreamReadShouldNotHangOrReprocessData() throws 
Exception {
+    File writerCheckpointFolder = 
temp.resolve("writer-checkpoint-folder").toFile();
+    File writerCheckpoint = new File(writerCheckpointFolder, 
"writer-checkpoint");
+    File output = temp.resolve("junit").toFile();
+
+    DataStreamWriter querySource =
+        spark
+            .readStream()
+            .format("iceberg")
+            .load(tableName)
+            .writeStream()
+            .option("checkpointLocation", writerCheckpoint.toString())
+            .format("parquet")
+            .trigger(Trigger.AvailableNow())
+            .option("path", output.getPath());
+
+    List<SimpleRecord> expected = Lists.newArrayList();
+    for (List<List<SimpleRecord>> expectedCheckpoint :
+        TEST_DATA_MULTIPLE_WRITES_MULTIPLE_SNAPSHOTS) {
+
+      // New data was added while the stream was not running
+      appendDataAsMultipleSnapshots(expectedCheckpoint);
+      
expected.addAll(Lists.newArrayList(Iterables.concat(Iterables.concat(expectedCheckpoint))));
+
+      try {
+        StreamingQuery query = querySource.start();
+
+        // Query should terminate on its own after processing all available 
data
+        assertThat(query.awaitTermination(60000)).isTrue();
+
+        // Check output
+        List<SimpleRecord> actual =
+            spark
+                .read()
+                .load(output.getPath())
+                .as(Encoders.bean(SimpleRecord.class))
+                .collectAsList();
+        
assertThat(actual).containsExactlyInAnyOrderElementsOf(Iterables.concat(expected));
+
+        // Restarting immediately should not reprocess data
+        query = querySource.start();
+        assertThat(query.awaitTermination(60000)).isTrue();
+        assertThat(query.recentProgress().length).isEqualTo(1);
+        assertThat(query.recentProgress()[0].sources()[0].startOffset())
+            .isEqualTo(query.recentProgress()[0].sources()[0].endOffset());
+      } finally {
+        stopStreams();
+      }
+    }
+  }
+
+  @TestTemplate
+  public void testTriggerAvailableNowDoesNotProcessNewDataWhileRunning() 
throws Exception {
+    List<List<SimpleRecord>> expectedData = TEST_DATA_MULTIPLE_SNAPSHOTS;
+    appendDataAsMultipleSnapshots(expectedData);
+
+    long expectedRecordCount = 
expectedData.stream().mapToLong(List::size).sum();
+
+    table.refresh();
+    long expectedSnapshotId = table.currentSnapshot().snapshotId();
+
+    String sinkTable = "availablenow_sink";
+    StreamingQuery query =
+        spark
+            .readStream()
+            .option(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "1")
+            .format("iceberg")
+            .load(tableName)
+            .writeStream()
+            .format("memory")
+            .queryName(sinkTable)
+            .trigger(Trigger.AvailableNow())
+            .start();
+
+    assertThat(query.isActive()).isTrue();
+
+    // Add new data while the stream is running
+    List<SimpleRecord> newDataDuringStreamSnap1 =
+        Lists.newArrayList(
+            new SimpleRecord(100, "hundred"),
+            new SimpleRecord(101, "hundred-one"),
+            new SimpleRecord(102, "hundred-two"));
+    List<SimpleRecord> newDataDuringStreamSnap2 =
+        Lists.newArrayList(
+            new SimpleRecord(200, "two-hundred"), new SimpleRecord(201, 
"two-hundred-one"));
+    appendData(newDataDuringStreamSnap1);
+    appendData(newDataDuringStreamSnap2);
+
+    // Query should terminate on its own after processing all available data 
till expectedSnapshotId
+    assertThat(query.awaitTermination(60000)).isTrue();
+
+    List<SimpleRecord> actualResults =
+        spark
+            .sql("SELECT * FROM " + sinkTable)
+            .as(Encoders.bean(SimpleRecord.class))
+            .collectAsList();
+    long endOffsetSnapshotId =
+        
StreamingOffset.fromJson(query.lastProgress().sources()[0].endOffset()).snapshotId();
+
+    // Verify the stream processed only up to the snapshot present when started
+    assertThat(expectedSnapshotId).isEqualTo(endOffsetSnapshotId);
+
+    // Verify only the initial data was processed
+    assertThat(actualResults.size()).isEqualTo(expectedRecordCount);
+    
assertThat(actualResults).containsExactlyInAnyOrderElementsOf(Iterables.concat(expectedData));
   }
 
   @TestTemplate
@@ -754,11 +895,18 @@ public final class TestStructuredStreamingRead3 extends CatalogTestBase {
   private void assertMicroBatchRecordSizes(
       Map<String, String> options, List<Long> expectedMicroBatchRecordSize)
       throws TimeoutException {
+    assertMicroBatchRecordSizes(options, expectedMicroBatchRecordSize, 
Trigger.ProcessingTime(0L));
+  }
+
+  private void assertMicroBatchRecordSizes(
+      Map<String, String> options, List<Long> expectedMicroBatchRecordSize, 
Trigger trigger)
+      throws TimeoutException {
     Dataset<Row> ds = 
spark.readStream().options(options).format("iceberg").load(tableName);
 
     List<Long> syncList = Collections.synchronizedList(Lists.newArrayList());
     ds.writeStream()
         .options(options)
+        .trigger(trigger)
         .foreachBatch(
             (VoidFunction2<Dataset<Row>, Long>)
                 (dataset, batchId) -> {

Reply via email to