Guosmilesmile commented on code in PR #16324:
URL: https://github.com/apache/iceberg/pull/16324#discussion_r3239422967


##########
flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestListMetadataFiles.java:
##########
@@ -87,4 +87,33 @@ void testMetadataFilesWithEmptyTable() throws Exception {
       
assertThat(testHarness.getSideOutput(TaskResultAggregator.ERROR_STREAM)).isNull();
     }
   }
+
+  @Test
+  void testMetadataFilesIncludesSnapshotsAddedAfterOpen() throws Exception {
+    Table table = createTable();
+    insert(table, 1, "a");
+
+    try (OneInputStreamOperatorTestHarness<Trigger, String> testHarness =
+        ProcessFunctionTestHarnesses.forProcessFunction(
+            new ListMetadataFiles(OperatorTestBase.DUMMY_TABLE_NAME, 0, 
tableLoader()))) {
+      testHarness.open();
+
+      // Add more snapshots AFTER the operator has been opened
+      insert(table, 2, "b");
+      insert(table, 3, "c");
+
+      OperatorTestBase.trigger(testHarness);
+
+      List<String> tableMetadataFiles = testHarness.extractOutputValues();
+
+      // Verify that manifest lists from ALL 3 snapshots are present, not just 
the first one.
+      // Without table.refresh() in processElement, only snapshot 1's files 
would be emitted.

Review Comment:
   Can we verify this based on numbers rather than comments?
   
   



##########
flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestListMetadataFiles.java:
##########
@@ -87,4 +87,33 @@ void testMetadataFilesWithEmptyTable() throws Exception {
       
assertThat(testHarness.getSideOutput(TaskResultAggregator.ERROR_STREAM)).isNull();
     }
   }
+
+  @Test
+  void testMetadataFilesIncludesSnapshotsAddedAfterOpen() throws Exception {
+    Table table = createTable();
+    insert(table, 1, "a");
+
+    try (OneInputStreamOperatorTestHarness<Trigger, String> testHarness =
+        ProcessFunctionTestHarnesses.forProcessFunction(
+            new ListMetadataFiles(OperatorTestBase.DUMMY_TABLE_NAME, 0, 
tableLoader()))) {
+      testHarness.open();
+
+      // Add more snapshots AFTER the operator has been opened
+      insert(table, 2, "b");
+      insert(table, 3, "c");
+
+      OperatorTestBase.trigger(testHarness);
+
+      List<String> tableMetadataFiles = testHarness.extractOutputValues();
+
+      // Verify that manifest lists from ALL 3 snapshots are present, not just 
the first one.
+      // Without table.refresh() in processElement, only snapshot 1's files 
would be emitted.
+      table.refresh();
+      for (org.apache.iceberg.Snapshot snapshot : table.snapshots()) {

Review Comment:
   nit: import



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to