This is an automated email from the ASF dual-hosted git repository.

laskoviymishka pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 2fe32aa8f7 Kafka Connect: Surface commit failures instead of silently swallowing them (#16237)
2fe32aa8f7 is described below

commit 2fe32aa8f77ebaee5359cabd4592300e1df69e72
Author: Anupam Yadav <[email protected]>
AuthorDate: Mon May 18 13:44:30 2026 -0700

    Kafka Connect: Surface commit failures instead of silently swallowing them (#16237)
    
    * Kafka Connect: Surface commit failures instead of silently swallowing them
    
    Narrow the catch around doCommit() and rethrow on full-commit
    failures. Partial-commit failures (triggered by commit timeout) are
    logged at WARN and swallowed since the coordinator will retry on
    the next cycle.
    
    This ensures commit failures surface to operators by terminating
    the coordinator thread, which transitions the Connect task to FAILED.
    
    Fixes #15878
    
    * Retrigger CI
    
    * Retrigger CI (attempt 2)
---
 .../iceberg/connect/channel/Coordinator.java       | 17 +++++---
 .../iceberg/connect/channel/TestCoordinator.java   | 48 ++++++++++++++++++----
 2 files changed, 51 insertions(+), 14 deletions(-)

diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java
index c986f8afc2..4b46b941f4 100644
--- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java
+++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java
@@ -150,12 +150,17 @@ class Coordinator extends Channel {
   private void commit(boolean partialCommit) {
     try {
       doCommit(partialCommit);
-    } catch (Exception e) {
-      LOG.warn(
-          "Coordinator {} failed to commit for commit {}, will try again next cycle",
-          taskId,
-          commitState.currentCommitId(),
-          e);
+    } catch (RuntimeException e) {
+      if (partialCommit) {
+        LOG.warn(
+            "Partial commit {} failed for task {}, will retry",
+            commitState.currentCommitId(),
+            taskId,
+            e);
+      } else {
+        LOG.error("Commit {} failed for task {}", commitState.currentCommitId(), taskId, e);
+        throw e;
+      }
     } finally {
       commitState.endCurrentCommit();
     }
diff --git a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestCoordinator.java b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestCoordinator.java
index ed370fcdad..0b5553e127 100644
--- a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestCoordinator.java
+++ b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestCoordinator.java
@@ -19,12 +19,16 @@
 package org.apache.iceberg.connect.channel;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
 
 import java.time.OffsetDateTime;
 import java.util.List;
 import java.util.UUID;
+import org.apache.iceberg.AppendFiles;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DataFiles;
 import org.apache.iceberg.DataOperations;
@@ -45,6 +49,8 @@ import org.apache.iceberg.connect.events.PayloadType;
 import org.apache.iceberg.connect.events.StartCommit;
 import org.apache.iceberg.connect.events.TableReference;
 import org.apache.iceberg.connect.events.TopicPartitionOffset;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.types.Types.StructType;
@@ -135,14 +141,35 @@ public class TestCoordinator extends ChannelTestBase {
             .withRecordCount(5)
             .build();
 
-    coordinatorTest(ImmutableList.of(badDataFile), ImmutableList.of(), null);
+    assertThatThrownBy(
+            () -> coordinatorTest(ImmutableList.of(badDataFile), ImmutableList.of(), null))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("Cannot find partition spec");
 
-    // no commit messages sent
     assertThat(producer.history()).hasSize(1);
-
     assertThat(table.snapshots()).isEmpty();
   }
 
+  @Test
+  public void testCommitFailedExceptionPropagates() {
+    Table spiedTable = spy(table);
+    AppendFiles spiedAppend = spy(table.newAppend());
+    doThrow(new CommitFailedException("Glue detected concurrent update"))
+        .when(spiedAppend)
+        .commit();
+    when(spiedTable.newAppend()).thenReturn(spiedAppend);
+    when(catalog.loadTable(TABLE_IDENTIFIER)).thenReturn(spiedTable);
+
+    assertThatThrownBy(
+            () ->
+                coordinatorTest(
+                    ImmutableList.of(EventTestUtil.createDataFile()),
+                    ImmutableList.of(),
+                    EventTestUtil.now()))
+        .isInstanceOf(CommitFailedException.class)
+        .hasMessageContaining("Glue detected concurrent update");
+  }
+
   private void assertCommitTable(int idx, UUID commitId, OffsetDateTime ts) {
     byte[] bytes = producer.history().get(idx).value();
     Event commitTable = AvroUtil.decode(bytes);
@@ -289,13 +316,18 @@ public class TestCoordinator extends ChannelTestBase {
     Snapshot firstSnapshot = table.currentSnapshot();
     assertThat(firstSnapshot.summary()).containsEntry(OFFSETS_SNAPSHOT_PROP, "{\"0\":7}");
 
-    // Trigger commit to the table
-    coordinatorTest(
-        ImmutableList.of(EventTestUtil.createDataFile()), ImmutableList.of(), EventTestUtil.now());
+    // Trigger commit to the table - should throw ValidationException
+    assertThatThrownBy(
+            () ->
+                coordinatorTest(
+                    ImmutableList.of(EventTestUtil.createDataFile()),
+                    ImmutableList.of(),
+                    EventTestUtil.now()))
+        .isInstanceOf(ValidationException.class)
+        .hasMessageContaining("stale offsets");
 
-    // Assert that the table was not updated and offsets remain
     table.refresh();
     assertThat(table.snapshots()).hasSize(2);
-    assertThat(firstSnapshot.summary()).containsEntry(OFFSETS_SNAPSHOT_PROP, "{\"0\":7}");
+    assertThat(table.currentSnapshot().summary()).containsEntry(OFFSETS_SNAPSHOT_PROP, "{\"0\":7}");
   }
 }

Reply via email to