This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 998a4b1677b73c24de1af23931c02badbd9b57a0
Author: Bäm <[email protected]>
AuthorDate: Fri Feb 6 12:15:52 2026 +0100

    [feat][io] implement pip-297 for jdbc sinks (#25195)
    
    (cherry picked from commit 6f4ac21eeed7bf9a8d4306c17468ce55b74f84a1)
---
 .../apache/pulsar/io/jdbc/JdbcAbstractSink.java    | 25 +++++++-
 .../apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java  | 74 +++++++++++++++++++++-
 2 files changed, 97 insertions(+), 2 deletions(-)

diff --git 
a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
 
b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
index ca33b3cfdab..73ba6b712f0 100644
--- 
a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
+++ 
b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
@@ -35,6 +35,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import lombok.AllArgsConstructor;
@@ -50,6 +51,11 @@ import org.apache.pulsar.io.core.SinkContext;
  */
 @Slf4j
 public abstract class JdbcAbstractSink<T> implements Sink<T> {
+
+    private enum State {
+        OPEN, FAILED, CLOSED
+    }
+
     // ----- Runtime fields
     protected JdbcSinkConfig jdbcSinkConfig;
     @Getter
@@ -73,9 +79,12 @@ public abstract class JdbcAbstractSink<T> implements Sink<T> 
{
     private AtomicBoolean isFlushing;
     private int batchSize;
     private ScheduledExecutorService flushExecutor;
+    private SinkContext sinkContext;
+    private final AtomicReference<State> state = new 
AtomicReference<>(State.OPEN);
 
     @Override
     public void open(Map<String, Object> config, SinkContext sinkContext) 
throws Exception {
+        this.sinkContext = sinkContext;
         jdbcSinkConfig = JdbcSinkConfig.load(config, sinkContext);
         jdbcSinkConfig.validate();
 
@@ -148,6 +157,7 @@ public abstract class JdbcAbstractSink<T> implements 
Sink<T> {
 
     @Override
     public void close() throws Exception {
+        state.set(State.CLOSED);
         if (flushExecutor != null) {
             int timeoutMs = jdbcSinkConfig.getTimeoutMs() * 2;
             flushExecutor.shutdown();
@@ -310,8 +320,9 @@ public abstract class JdbcAbstractSink<T> implements 
Sink<T> {
                         connection.rollback();
                     }
                 } catch (Exception ex) {
-                    throw new RuntimeException(ex);
+                    log.error("Failed to rollback transaction", ex);
                 }
+                fatal(e);
             }
 
             isFlushing.set(false);
@@ -385,4 +396,16 @@ public abstract class JdbcAbstractSink<T> implements 
Sink<T> {
         return true;
     }
 
+    /**
+     * Signal a fatal exception to the framework.
+     * This will cause the function instance to terminate properly.
+     *
+     * @param e the fatal exception
+     */
+    private void fatal(Exception e) {
+        if (sinkContext != null && state.compareAndSet(State.OPEN, 
State.FAILED)) {
+            sinkContext.fatal(e);
+        }
+    }
+
 }
diff --git 
a/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java
 
b/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java
index 901ac9f1e39..7cab33df309 100644
--- 
a/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java
+++ 
b/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java
@@ -18,11 +18,16 @@
  */
 package org.apache.pulsar.io.jdbc;
 
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
@@ -56,6 +61,7 @@ import org.apache.pulsar.common.schema.KeyValueEncodingType;
 import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.functions.source.PulsarRecord;
+import org.apache.pulsar.io.core.SinkContext;
 import org.awaitility.Awaitility;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
@@ -133,7 +139,9 @@ public class SqliteJdbcSinkTest {
 
     @AfterMethod(alwaysRun = true)
     public void tearDown() throws Exception {
-        jdbcSink.close();
+        if (jdbcSink != null) {
+            jdbcSink.close();
+        }
         sqliteUtils.tearDown();
     }
 
@@ -860,6 +868,70 @@ public class SqliteJdbcSinkTest {
         }
     }
 
+    /**
+     * Test that fatal() is called when an unrecoverable exception occurs 
during flush.
+     * This verifies the PIP-297 implementation for proper termination of the 
sink.
+     *
+     * The test works by:
+     * 1. Opening the sink with a valid table (so open() succeeds)
+     * 2. Using reflection to replace the insertStatement with a mock that 
throws SQLException
+     * 3. Writing a record to trigger flush
+     * 4. Verifying that fatal() was called with the exception
+     */
+    @Test
+    public void testFatalCalledOnFlushException() throws Exception {
+        jdbcSink.close();
+        jdbcSink = null;
+
+        String jdbcUrl = sqliteUtils.sqliteUri();
+        Map<String, Object> conf = Maps.newHashMap();
+        conf.put("jdbcUrl", jdbcUrl);
+        conf.put("tableName", tableName);  // Use valid table so open() 
succeeds
+        conf.put("key", "field3");
+        conf.put("nonKey", "field1,field2");
+        conf.put("batchSize", 1);
+
+        SinkContext mockSinkContext = mock(SinkContext.class);
+        AtomicReference<Throwable> fatalException = new AtomicReference<>();
+        doAnswer(invocation -> {
+            fatalException.set(invocation.getArgument(0));
+            return null;
+        }).when(mockSinkContext).fatal(any(Throwable.class));
+
+        SqliteJdbcAutoSchemaSink sinkWithContext = new 
SqliteJdbcAutoSchemaSink();
+        try {
+            sinkWithContext.open(conf, mockSinkContext);
+
+            // Create a mock PreparedStatement that throws SQLException on 
execute()
+            PreparedStatement mockStatement = mock(PreparedStatement.class);
+            SQLException simulatedException = new SQLException("Simulated 
database connection failure");
+            doThrow(simulatedException).when(mockStatement).execute();
+            doThrow(simulatedException).when(mockStatement).executeBatch();
+
+            // Use reflection to replace the insertStatement with our mock
+            FieldUtils.writeField(sinkWithContext, "insertStatement", 
mockStatement, true);
+
+            Foo insertObj = new Foo("f1", "f2", 1);
+            Map<String, String> props = Maps.newHashMap();
+            props.put("ACTION", "INSERT");
+            CompletableFuture<Boolean> future = new CompletableFuture<>();
+            sinkWithContext.write(createMockFooRecord(insertObj, props, 
future));
+
+            // Wait for the flush to complete and fail
+            Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> 
{
+                verify(mockSinkContext).fatal(any(Throwable.class));
+                Assert.assertNotNull(fatalException.get());
+                Assert.assertTrue(fatalException.get() instanceof 
SQLException);
+                Assert.assertEquals(fatalException.get().getMessage(), 
"Simulated database connection failure");
+            });
+
+            // Verify the record was failed (not acked)
+            Assert.assertFalse(future.get(1, TimeUnit.SECONDS));
+        } finally {
+            sinkWithContext.close();
+        }
+    }
+
     @SuppressWarnings("unchecked")
     private Record<GenericObject> createMockFooRecord(Foo record, Map<String, 
String> actionProperties,
                                                         
CompletableFuture<Boolean> future) {

Reply via email to