singhpk234 commented on code in PR #14510:
URL: https://github.com/apache/iceberg/pull/14510#discussion_r2505953261


##########
kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestCoordinator.java:
##########
@@ -229,4 +230,45 @@ public void testCoordinatorRunning() {
     sourceConsumer.rebalance(ImmutableList.of(tp1));
     assertThat(mockIcebergSinkTask.isCoordinatorRunning()).isFalse();
   }
+
+  @Test
+  public void testCoordinatorCommittedOffsetValidation() {
+    // This test demonstrates that the Coordinator's validateAndCommit method
+    // prevents commits when another independent commit has updated the offsets
+    // during the commit process
+
+    // Set the initial offsets
+    table
+        .newAppend()
+        .appendFile(EventTestUtil.createDataFile())
+        .set(OFFSETS_SNAPSHOT_PROP, "{\"0\":1}")
+        .commit();
+
+    Table frozenTable = catalog.loadTable(TABLE_IDENTIFIER);
+
+    // return the original table state on the first load, so that the update 
will happen
+    // during the commit refresh
+    
when(catalog.loadTable(TABLE_IDENTIFIER)).thenReturn(frozenTable).thenCallRealMethod();
+
+    // Independently update the offsets
+    table
+        .newAppend()
+        .appendFile(EventTestUtil.createDataFile())
+        .set(OFFSETS_SNAPSHOT_PROP, "{\"0\":7}")
+        .commit();
+
+    table.refresh();
+    assertThat(table.snapshots()).hasSize(2);
+    Snapshot firstSnapshot = table.currentSnapshot();
+    assertThat(firstSnapshot.summary()).containsEntry(OFFSETS_SNAPSHOT_PROP, 
"{\"0\":7}");
+
+    // Trigger commit to the table
+    coordinatorTest(
+        ImmutableList.of(EventTestUtil.createDataFile()), ImmutableList.of(), 
EventTestUtil.now());
+
+    // Assert that the table was not updated and offsets remain
+    table.refresh();
+    assertThat(table.snapshots()).hasSize(2);

Review Comment:
   I see, I was recommending asserting actual snapshot-ids instead of count to 
make sure nothing committed, mostly thinking of a snapshot might have been 
overwritten ? (now that i think its very unlikely) :) 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to