singhpk234 commented on code in PR #13824:
URL: https://github.com/apache/iceberg/pull/13824#discussion_r2296227662
##########
spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java:
##########
@@ -216,6 +217,87 @@ public void testReadStreamWithCompositeReadLimit() throws Exception {
List.of(1L, 2L, 1L, 1L, 1L, 1L));
}
+ @TestTemplate
+ public void testAvailableNowReadStreamWithMaxFiles2() throws Exception {
+ appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);
+ assertMicroBatchRecordSizes(
+ ImmutableMap.of(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "2"),
+ List.of(3L, 2L, 2L),
+ Trigger.AvailableNow());
+ }
+
+ @TestTemplate
+ public void testAvailableNowReadStreamWithMaxRows4() throws Exception {
+ appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);
+ assertMicroBatchRecordSizes(
+ ImmutableMap.of(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "4"),
+ List.of(4L, 3L),
+ Trigger.AvailableNow());
+ }
+
+ @TestTemplate
+ public void testAvailableNowReadStreamWithCompositeReadLimit() throws Exception {
+ appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);
+ assertMicroBatchRecordSizes(
+ ImmutableMap.of(
+ SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "1",
+ SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "2"),
+ List.of(1L, 2L, 1L, 1L, 1L, 1L),
+ Trigger.AvailableNow());
+ }
Review Comment:
Can we club them with the tests above? The only difference is the `trigger`.
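For illustration, a rough sketch of how they could be clubbed behind a shared helper (the helper name and the trigger used by the existing tests are my assumptions, not what's in this PR):

```java
// Hypothetical shared helper; the two tests differ only in the trigger.
private void checkReadStreamWithMaxFiles2(Trigger trigger) throws Exception {
  appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);
  assertMicroBatchRecordSizes(
      ImmutableMap.of(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "2"),
      List.of(3L, 2L, 2L),
      trigger);
}

@TestTemplate
public void testReadStreamWithMaxFiles2() throws Exception {
  // Assuming the existing test effectively runs with a processing-time trigger.
  checkReadStreamWithMaxFiles2(Trigger.ProcessingTime(0));
}

@TestTemplate
public void testAvailableNowReadStreamWithMaxFiles2() throws Exception {
  checkReadStreamWithMaxFiles2(Trigger.AvailableNow());
}
```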
##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java:
##########
@@ -523,6 +523,11 @@ public ReadLimit getDefaultReadLimit() {
}
}
+ @Override
+ public void prepareForTriggerAvailableNow() {
Review Comment:
Shouldn't we be figuring out here the target offset up to which we want to process? Or are we relying on `latestOffset(_, _)` to do the right thing?
https://github.com/apache/spark/blob/master/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/SupportsTriggerAvailableNow.java#L38
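For reference, the contract there is that the source records the latest available offset once at query start and never returns anything past it from `latestOffset`. A minimal sketch of that shape (the field name, null handling, and use of `addedDataFiles` are my assumptions, not the PR's code):

```java
// Hypothetical: pin the target offset once, at query start.
private volatile StreamingOffset availableNowTargetOffset = null;

@Override
public void prepareForTriggerAvailableNow() {
  Snapshot current = table.currentSnapshot();
  // Record the table head as of query start; latestOffset(start, limit) would
  // then cap each micro-batch at this offset instead of chasing new commits.
  this.availableNowTargetOffset =
      current == null
          ? StreamingOffset.START_OFFSET
          : new StreamingOffset(
              current.snapshotId(),
              Iterables.size(current.addedDataFiles(table.io())),
              false);
}
```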
##########
spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java:
##########
@@ -216,6 +217,87 @@ public void testReadStreamWithCompositeReadLimit() throws Exception {
List.of(1L, 2L, 1L, 1L, 1L, 1L));
}
+ @TestTemplate
+ public void testAvailableNowReadStreamWithMaxFiles2() throws Exception {
+ appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);
+ assertMicroBatchRecordSizes(
+ ImmutableMap.of(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "2"),
+ List.of(3L, 2L, 2L),
+ Trigger.AvailableNow());
+ }
+
+ @TestTemplate
+ public void testAvailableNowReadStreamWithMaxRows4() throws Exception {
+ appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);
+ assertMicroBatchRecordSizes(
+ ImmutableMap.of(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "4"),
+ List.of(4L, 3L),
+ Trigger.AvailableNow());
+ }
+
+ @TestTemplate
+ public void testAvailableNowReadStreamWithCompositeReadLimit() throws Exception {
+ appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);
+ assertMicroBatchRecordSizes(
+ ImmutableMap.of(
+ SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "1",
+ SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "2"),
+ List.of(1L, 2L, 1L, 1L, 1L, 1L),
+ Trigger.AvailableNow());
+ }
+
+ @TestTemplate
+ public void testAvailableNowStreamReadShouldNotHangOrReprocessData() throws Exception {
+ File writerCheckpointFolder = temp.resolve("writer-checkpoint-folder").toFile();
+ File writerCheckpoint = new File(writerCheckpointFolder, "writer-checkpoint");
+ File output = temp.resolve("junit").toFile();
+
+ DataStreamWriter querySource =
+ spark
+ .readStream()
+ .format("iceberg")
+ .load(tableName)
+ .writeStream()
+ .option("checkpointLocation", writerCheckpoint.toString())
+ .format("parquet")
+ .trigger(Trigger.AvailableNow())
+ .option("path", output.getPath());
+
+ List<SimpleRecord> expected = Lists.newArrayList();
+ for (List<List<SimpleRecord>> expectedCheckpoint :
+ TEST_DATA_MULTIPLE_WRITES_MULTIPLE_SNAPSHOTS) {
+
+ // New data was added while the stream was down
Review Comment:
Down? Or not yet started?
##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java:
##########
@@ -523,6 +523,11 @@ public ReadLimit getDefaultReadLimit() {
}
}
+ @Override
+ public void prepareForTriggerAvailableNow() {
+ LOG.info("The streaming query reports to use Trigger.AvailableNow");
Review Comment:
IMHO it would be better if we logged more info about the data we are going to process.
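Something along these lines, for example (just a sketch; I'm assuming `table` and its current snapshot are reachable here):

```java
@Override
public void prepareForTriggerAvailableNow() {
  Snapshot latest = table.currentSnapshot();
  // Say which table and how far this AvailableNow run will read,
  // not just that the trigger is in use.
  LOG.info(
      "Streaming query uses Trigger.AvailableNow on table {}; processing data up to snapshot {}",
      table.name(),
      latest == null ? "none (empty table)" : latest.snapshotId());
}
```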