This is an automated email from the ASF dual-hosted git repository.
singhpk234 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new d731489b43 Spark 3.5: Support Trigger AvailableNow in Structured Streaming (#13824)
d731489b43 is described below
commit d731489b436179b38fc322e89bb42a24b31851ac
Author: Alex Prosak <[email protected]>
AuthorDate: Mon Sep 8 13:59:23 2025 -0700
Spark 3.5: Support Trigger AvailableNow in Structured Streaming (#13824)
---
.../spark/source/SparkMicroBatchStream.java | 25 +++-
.../spark/source/TestStructuredStreamingRead3.java | 148 +++++++++++++++++++++
2 files changed, 169 insertions(+), 4 deletions(-)
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
index c6f7e1faca..4b79da8133 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
@@ -64,11 +64,11 @@ import org.apache.spark.sql.connector.read.streaming.Offset;
import org.apache.spark.sql.connector.read.streaming.ReadLimit;
import org.apache.spark.sql.connector.read.streaming.ReadMaxFiles;
import org.apache.spark.sql.connector.read.streaming.ReadMaxRows;
-import org.apache.spark.sql.connector.read.streaming.SupportsAdmissionControl;
+import org.apache.spark.sql.connector.read.streaming.SupportsTriggerAvailableNow;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class SparkMicroBatchStream implements MicroBatchStream, SupportsAdmissionControl {
+public class SparkMicroBatchStream implements MicroBatchStream, SupportsTriggerAvailableNow {
   private static final Joiner SLASH = Joiner.on("/");
   private static final Logger LOG = LoggerFactory.getLogger(SparkMicroBatchStream.class);
   private static final Types.StructType EMPTY_GROUPING_KEY_TYPE = Types.StructType.of();
@@ -89,6 +89,7 @@ public class SparkMicroBatchStream implements MicroBatchStream, SupportsAdmissio
private final int maxFilesPerMicroBatch;
private final int maxRecordsPerMicroBatch;
private final boolean cacheDeleteFilesOnExecutors;
+ private StreamingOffset lastOffsetForTriggerAvailableNow;
SparkMicroBatchStream(
JavaSparkContext sparkContext,
@@ -384,6 +385,12 @@ public class SparkMicroBatchStream implements MicroBatchStream, SupportsAdmissio
Snapshot curSnapshot = table.snapshot(startingOffset.snapshotId());
validateCurrentSnapshotExists(curSnapshot, startingOffset);
+ // Use the pre-computed snapshotId when Trigger.AvailableNow is enabled.
+ long latestSnapshotId =
+ lastOffsetForTriggerAvailableNow != null
+ ? lastOffsetForTriggerAvailableNow.snapshotId()
+ : table.currentSnapshot().snapshotId();
+
int startPosOfSnapOffset = (int) startingOffset.position();
boolean scanAllFiles = startingOffset.shouldScanAllFiles();
@@ -439,8 +446,8 @@ public class SparkMicroBatchStream implements MicroBatchStream, SupportsAdmissio
LOG.warn("Failed to close task iterable", ioe);
}
}
- // if the currentSnapShot was also the mostRecentSnapshot then break
- if (curSnapshot.snapshotId() == table.currentSnapshot().snapshotId()) {
+ // if the currentSnapShot was also the latestSnapshot then break
+ if (curSnapshot.snapshotId() == latestSnapshotId) {
break;
}
@@ -524,6 +531,16 @@ public class SparkMicroBatchStream implements MicroBatchStream, SupportsAdmissio
}
}
+ @Override
+ public void prepareForTriggerAvailableNow() {
+ LOG.info("The streaming query reports to use Trigger.AvailableNow");
+
+    lastOffsetForTriggerAvailableNow =
+        (StreamingOffset) latestOffset(initialOffset, ReadLimit.allAvailable());
+
+    LOG.info("lastOffset for Trigger.AvailableNow is {}", lastOffsetForTriggerAvailableNow.json());
+  }
+
private static class InitialOffsetStore {
private final Table table;
private final FileIO io;
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
index c197832948..5a68b7a414 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
@@ -62,6 +62,7 @@ import org.apache.spark.sql.internal.SQLConf;
import org.apache.spark.sql.streaming.DataStreamWriter;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.StreamingQuery;
+import org.apache.spark.sql.streaming.Trigger;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
@@ -159,6 +160,11 @@ public final class TestStructuredStreamingRead3 extends CatalogTestBase {
     assertMicroBatchRecordSizes(
         ImmutableMap.of(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "1"),
         List.of(1L, 2L, 1L, 1L, 1L, 1L));
+
+    assertMicroBatchRecordSizes(
+        ImmutableMap.of(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "1"),
+        List.of(1L, 2L, 1L, 1L, 1L, 1L),
+        Trigger.AvailableNow());
}
@TestTemplate
@@ -167,6 +173,11 @@ public final class TestStructuredStreamingRead3 extends CatalogTestBase {
     assertMicroBatchRecordSizes(
         ImmutableMap.of(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "2"),
         List.of(3L, 2L, 2L));
+
+    assertMicroBatchRecordSizes(
+        ImmutableMap.of(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "2"),
+        List.of(3L, 2L, 2L),
+        Trigger.AvailableNow());
}
@TestTemplate
@@ -176,6 +187,11 @@ public final class TestStructuredStreamingRead3 extends CatalogTestBase {
         ImmutableMap.of(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "1"),
         List.of(1L, 2L, 1L, 1L, 1L, 1L));
+    assertMicroBatchRecordSizes(
+        ImmutableMap.of(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "1"),
+        List.of(1L, 2L, 1L, 1L, 1L, 1L),
+        Trigger.AvailableNow());
+
     // soft limit of 1 is being enforced, the stream is not blocked.
     StreamingQuery query = startStream(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "1");
@@ -191,6 +207,11 @@ public final class TestStructuredStreamingRead3 extends CatalogTestBase {
         ImmutableMap.of(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "2"),
         List.of(3L, 2L, 2L));
+    assertMicroBatchRecordSizes(
+        ImmutableMap.of(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "2"),
+        List.of(3L, 2L, 2L),
+        Trigger.AvailableNow());
+
     StreamingQuery query =
         startStream(ImmutableMap.of(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "2"));
@@ -204,6 +225,11 @@ public final class TestStructuredStreamingRead3 extends CatalogTestBase {
     appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);
     assertMicroBatchRecordSizes(
         ImmutableMap.of(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "4"), List.of(4L, 3L));
+
+    assertMicroBatchRecordSizes(
+        ImmutableMap.of(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "4"),
+        List.of(4L, 3L),
+        Trigger.AvailableNow());
}
@TestTemplate
@@ -214,6 +240,121 @@ public final class TestStructuredStreamingRead3 extends CatalogTestBase {
             SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "1",
             SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "2"),
         List.of(1L, 2L, 1L, 1L, 1L, 1L));
+
+    assertMicroBatchRecordSizes(
+        ImmutableMap.of(
+            SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "1",
+            SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "2"),
+        List.of(1L, 2L, 1L, 1L, 1L, 1L),
+        Trigger.AvailableNow());
+  }
+
+  @TestTemplate
+  public void testAvailableNowStreamReadShouldNotHangOrReprocessData() throws Exception {
+    File writerCheckpointFolder = temp.resolve("writer-checkpoint-folder").toFile();
+    File writerCheckpoint = new File(writerCheckpointFolder, "writer-checkpoint");
+    File output = temp.resolve("junit").toFile();
+
+    DataStreamWriter querySource =
+        spark
+            .readStream()
+            .format("iceberg")
+            .load(tableName)
+            .writeStream()
+            .option("checkpointLocation", writerCheckpoint.toString())
+            .format("parquet")
+            .trigger(Trigger.AvailableNow())
+            .option("path", output.getPath());
+
+    List<SimpleRecord> expected = Lists.newArrayList();
+    for (List<List<SimpleRecord>> expectedCheckpoint :
+        TEST_DATA_MULTIPLE_WRITES_MULTIPLE_SNAPSHOTS) {
+
+      // New data was added while the stream was not running
+      appendDataAsMultipleSnapshots(expectedCheckpoint);
+      expected.addAll(Lists.newArrayList(Iterables.concat(Iterables.concat(expectedCheckpoint))));
+
+      try {
+        StreamingQuery query = querySource.start();
+
+        // Query should terminate on its own after processing all available data
+        assertThat(query.awaitTermination(60000)).isTrue();
+
+        // Check output
+        List<SimpleRecord> actual =
+            spark
+                .read()
+                .load(output.getPath())
+                .as(Encoders.bean(SimpleRecord.class))
+                .collectAsList();
+        assertThat(actual).containsExactlyInAnyOrderElementsOf(Iterables.concat(expected));
+
+        // Restarting immediately should not reprocess data
+        query = querySource.start();
+        assertThat(query.awaitTermination(60000)).isTrue();
+        assertThat(query.recentProgress().length).isEqualTo(1);
+        assertThat(query.recentProgress()[0].sources()[0].startOffset())
+            .isEqualTo(query.recentProgress()[0].sources()[0].endOffset());
+      } finally {
+        stopStreams();
+      }
+    }
+  }
+
+  @TestTemplate
+  public void testTriggerAvailableNowDoesNotProcessNewDataWhileRunning() throws Exception {
+    List<List<SimpleRecord>> expectedData = TEST_DATA_MULTIPLE_SNAPSHOTS;
+    appendDataAsMultipleSnapshots(expectedData);
+
+    long expectedRecordCount = expectedData.stream().mapToLong(List::size).sum();
+
+    table.refresh();
+    long expectedSnapshotId = table.currentSnapshot().snapshotId();
+
+ String sinkTable = "availablenow_sink";
+ StreamingQuery query =
+ spark
+ .readStream()
+ .option(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "1")
+ .format("iceberg")
+ .load(tableName)
+ .writeStream()
+ .format("memory")
+ .queryName(sinkTable)
+ .trigger(Trigger.AvailableNow())
+ .start();
+
+ assertThat(query.isActive()).isTrue();
+
+    // Add new data while the stream is running
+    List<SimpleRecord> newDataDuringStreamSnap1 =
+        Lists.newArrayList(
+            new SimpleRecord(100, "hundred"),
+            new SimpleRecord(101, "hundred-one"),
+            new SimpleRecord(102, "hundred-two"));
+    List<SimpleRecord> newDataDuringStreamSnap2 =
+        Lists.newArrayList(
+            new SimpleRecord(200, "two-hundred"), new SimpleRecord(201, "two-hundred-one"));
+    appendData(newDataDuringStreamSnap1);
+    appendData(newDataDuringStreamSnap2);
+
+    // Query should terminate on its own after processing all available data till expectedSnapshotId
+    assertThat(query.awaitTermination(60000)).isTrue();
+
+    List<SimpleRecord> actualResults =
+        spark
+            .sql("SELECT * FROM " + sinkTable)
+            .as(Encoders.bean(SimpleRecord.class))
+            .collectAsList();
+    long endOffsetSnapshotId =
+        StreamingOffset.fromJson(query.lastProgress().sources()[0].endOffset()).snapshotId();
+
+    // Verify the stream processed only up to the snapshot present when started
+    assertThat(expectedSnapshotId).isEqualTo(endOffsetSnapshotId);
+
+    // Verify only the initial data was processed
+    assertThat(actualResults.size()).isEqualTo(expectedRecordCount);
+    assertThat(actualResults).containsExactlyInAnyOrderElementsOf(Iterables.concat(expectedData));
}
@TestTemplate
@@ -754,11 +895,18 @@ public final class TestStructuredStreamingRead3 extends CatalogTestBase {
   private void assertMicroBatchRecordSizes(
       Map<String, String> options, List<Long> expectedMicroBatchRecordSize)
       throws TimeoutException {
+    assertMicroBatchRecordSizes(options, expectedMicroBatchRecordSize, Trigger.ProcessingTime(0L));
+  }
+
+  private void assertMicroBatchRecordSizes(
+      Map<String, String> options, List<Long> expectedMicroBatchRecordSize, Trigger trigger)
+      throws TimeoutException {
     Dataset<Row> ds = spark.readStream().options(options).format("iceberg").load(tableName);
     List<Long> syncList = Collections.synchronizedList(Lists.newArrayList());
     ds.writeStream()
         .options(options)
+        .trigger(trigger)
.foreachBatch(
(VoidFunction2<Dataset<Row>, Long>)
(dataset, batchId) -> {