singhpk234 commented on code in PR #13824:
URL: https://github.com/apache/iceberg/pull/13824#discussion_r2296227662


##########
spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java:
##########
@@ -216,6 +217,87 @@ public void testReadStreamWithCompositeReadLimit() throws Exception {
         List.of(1L, 2L, 1L, 1L, 1L, 1L));
   }
 
+  @TestTemplate
+  public void testAvailableNowReadStreamWithMaxFiles2() throws Exception {
+    appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);
+    assertMicroBatchRecordSizes(
+        ImmutableMap.of(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "2"),
+        List.of(3L, 2L, 2L),
+        Trigger.AvailableNow());
+  }
+
+  @TestTemplate
+  public void testAvailableNowReadStreamWithMaxRows4() throws Exception {
+    appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);
+    assertMicroBatchRecordSizes(
+        ImmutableMap.of(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "4"),
+        List.of(4L, 3L),
+        Trigger.AvailableNow());
+  }
+
+  @TestTemplate
+  public void testAvailableNowReadStreamWithCompositeReadLimit() throws Exception {
+    appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);
+    assertMicroBatchRecordSizes(
+        ImmutableMap.of(
+            SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "1",
+            SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "2"),
+        List.of(1L, 2L, 1L, 1L, 1L, 1L),
+        Trigger.AvailableNow());
+  }

Review Comment:
   Can we club these with the tests above? The only difference is the `trigger`.
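   
   A rough sketch of what that could look like, assuming `assertMicroBatchRecordSizes` starts a fresh stream for each call (the loop and the `Trigger.ProcessingTime(0)` default here are illustrative, not part of this PR):
   
   ```java
   @TestTemplate
   public void testReadStreamWithMaxFiles2() throws Exception {
     appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);
     // exercise both the default micro-batch trigger and AvailableNow in one test
     for (Trigger trigger : List.of(Trigger.ProcessingTime(0), Trigger.AvailableNow())) {
       assertMicroBatchRecordSizes(
           ImmutableMap.of(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "2"),
           List.of(3L, 2L, 2L),
           trigger);
     }
   }
   ```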



##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java:
##########
@@ -523,6 +523,11 @@ public ReadLimit getDefaultReadLimit() {
     }
   }
 
+  @Override
+  public void prepareForTriggerAvailableNow() {

Review Comment:
   Shouldn't we be figuring out the target offset up to which we want to process here? Or are we relying on `latestOffset(_, _)` to do the right thing?
   https://github.com/apache/spark/blob/master/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/SupportsTriggerAvailableNow.java#L38
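   
   Something along these lines is what I had in mind; purely a sketch on top of the class's existing `table` and `LOG` fields, with the extra field and the position handling simplified for illustration:
   
   ```java
   // hypothetical field capturing the cap for this AvailableNow run
   private StreamingOffset availableNowEndOffset;

   @Override
   public void prepareForTriggerAvailableNow() {
     table.refresh();
     Snapshot latest = table.currentSnapshot();
     // record the current end of the table so latestOffset() never goes past it
     this.availableNowEndOffset =
         latest == null
             ? StreamingOffset.START_OFFSET
             : new StreamingOffset(latest.snapshotId(), 0, false);
     LOG.info("Trigger.AvailableNow run will stop at offset {}", availableNowEndOffset);
   }
   ```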



##########
spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java:
##########
@@ -216,6 +217,87 @@ public void testReadStreamWithCompositeReadLimit() throws Exception {
         List.of(1L, 2L, 1L, 1L, 1L, 1L));
   }
 
+  @TestTemplate
+  public void testAvailableNowReadStreamWithMaxFiles2() throws Exception {
+    appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);
+    assertMicroBatchRecordSizes(
+        ImmutableMap.of(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "2"),
+        List.of(3L, 2L, 2L),
+        Trigger.AvailableNow());
+  }
+
+  @TestTemplate
+  public void testAvailableNowReadStreamWithMaxRows4() throws Exception {
+    appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);
+    assertMicroBatchRecordSizes(
+        ImmutableMap.of(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "4"),
+        List.of(4L, 3L),
+        Trigger.AvailableNow());
+  }
+
+  @TestTemplate
+  public void testAvailableNowReadStreamWithCompositeReadLimit() throws Exception {
+    appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);
+    assertMicroBatchRecordSizes(
+        ImmutableMap.of(
+            SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "1",
+            SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "2"),
+        List.of(1L, 2L, 1L, 1L, 1L, 1L),
+        Trigger.AvailableNow());
+  }
+
+  @TestTemplate
+  public void testAvailableNowStreamReadShouldNotHangOrReprocessData() throws Exception {
+    File writerCheckpointFolder = temp.resolve("writer-checkpoint-folder").toFile();
+    File writerCheckpoint = new File(writerCheckpointFolder, "writer-checkpoint");
+    File output = temp.resolve("junit").toFile();
+
+    DataStreamWriter querySource =
+        spark
+            .readStream()
+            .format("iceberg")
+            .load(tableName)
+            .writeStream()
+            .option("checkpointLocation", writerCheckpoint.toString())
+            .format("parquet")
+            .trigger(Trigger.AvailableNow())
+            .option("path", output.getPath());
+
+    List<SimpleRecord> expected = Lists.newArrayList();
+    for (List<List<SimpleRecord>> expectedCheckpoint :
+        TEST_DATA_MULTIPLE_WRITES_MULTIPLE_SNAPSHOTS) {
+
+      // New data was added while the stream was down

Review Comment:
   down? Or not started?



##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java:
##########
@@ -523,6 +523,11 @@ public ReadLimit getDefaultReadLimit() {
     }
   }
 
+  @Override
+  public void prepareForTriggerAvailableNow() {
+    LOG.info("The streaming query reports to use Trigger.AvailableNow");

Review Comment:
   IMHO it would be better if we log more info about the data we are going to process.
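   
   For example, just a sketch reusing the class's existing `table` field (exact wording up to you):
   
   ```java
   Snapshot latest = table.currentSnapshot();
   LOG.info(
       "Streaming query uses Trigger.AvailableNow; table {} is currently at snapshot {}",
       table.name(),
       latest == null ? "none" : latest.snapshotId());
   ```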


