This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 998a4b1677b73c24de1af23931c02badbd9b57a0 Author: Bäm <[email protected]> AuthorDate: Fri Feb 6 12:15:52 2026 +0100 [feat][io] implement pip-297 for jdbc sinks (#25195) (cherry picked from commit 6f4ac21eeed7bf9a8d4306c17468ce55b74f84a1) --- .../apache/pulsar/io/jdbc/JdbcAbstractSink.java | 25 +++++++- .../apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java | 74 +++++++++++++++++++++- 2 files changed, 97 insertions(+), 2 deletions(-) diff --git a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java index ca33b3cfdab..73ba6b712f0 100644 --- a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java +++ b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java @@ -35,6 +35,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.stream.Collectors; import lombok.AllArgsConstructor; @@ -50,6 +51,11 @@ import org.apache.pulsar.io.core.SinkContext; */ @Slf4j public abstract class JdbcAbstractSink<T> implements Sink<T> { + + private enum State { + OPEN, FAILED, CLOSED + } + // ----- Runtime fields protected JdbcSinkConfig jdbcSinkConfig; @Getter @@ -73,9 +79,12 @@ public abstract class JdbcAbstractSink<T> implements Sink<T> { private AtomicBoolean isFlushing; private int batchSize; private ScheduledExecutorService flushExecutor; + private SinkContext sinkContext; + private final AtomicReference<State> state = new AtomicReference<>(State.OPEN); @Override public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception { + this.sinkContext = sinkContext; jdbcSinkConfig = JdbcSinkConfig.load(config, sinkContext); jdbcSinkConfig.validate(); @@ -148,6 +157,7 @@ public abstract class JdbcAbstractSink<T> implements Sink<T> { @Override public void close() throws Exception { + state.set(State.CLOSED); if (flushExecutor != null) { int timeoutMs = jdbcSinkConfig.getTimeoutMs() * 2; flushExecutor.shutdown(); @@ -310,8 +320,9 @@ public abstract class JdbcAbstractSink<T> implements Sink<T> { connection.rollback(); } } catch (Exception ex) { - throw new RuntimeException(ex); + log.error("Failed to rollback transaction", ex); } + fatal(e); } isFlushing.set(false); @@ -385,4 +396,16 @@ public abstract class JdbcAbstractSink<T> implements Sink<T> { return true; } + /** + * Signal a fatal exception to the framework. + * This will cause the function instance to terminate properly. + * + * @param e the fatal exception + */ + private void fatal(Exception e) { + if (sinkContext != null && state.compareAndSet(State.OPEN, State.FAILED)) { + sinkContext.fatal(e); + } + } + } diff --git a/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java b/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java index 901ac9f1e39..7cab33df309 100644 --- a/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java +++ b/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java @@ -18,11 +18,16 @@ */ package org.apache.pulsar.io.jdbc; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; +import java.sql.PreparedStatement; +import java.sql.SQLException; import java.util.Arrays; import java.util.HashMap; import java.util.List; @@ -56,6 +61,7 @@ import org.apache.pulsar.common.schema.KeyValueEncodingType; import org.apache.pulsar.common.schema.SchemaType; import org.apache.pulsar.functions.api.Record; import org.apache.pulsar.functions.source.PulsarRecord; +import org.apache.pulsar.io.core.SinkContext; import org.awaitility.Awaitility; import org.testng.Assert; import org.testng.annotations.AfterMethod; @@ -133,7 +139,9 @@ public class SqliteJdbcSinkTest { @AfterMethod(alwaysRun = true) public void tearDown() throws Exception { - jdbcSink.close(); + if (jdbcSink != null) { + jdbcSink.close(); + } sqliteUtils.tearDown(); } @@ -860,6 +868,70 @@ public class SqliteJdbcSinkTest { } } + /** + * Test that fatal() is called when an unrecoverable exception occurs during flush. + * This verifies the PIP-297 implementation for proper termination of the sink. + * + * The test works by: + * 1. Opening the sink with a valid table (so open() succeeds) + * 2. Using reflection to replace the insertStatement with a mock that throws SQLException + * 3. Writing a record to trigger flush + * 4. Verifying that fatal() was called with the exception + */ + @Test + public void testFatalCalledOnFlushException() throws Exception { + jdbcSink.close(); + jdbcSink = null; + + String jdbcUrl = sqliteUtils.sqliteUri(); + Map<String, Object> conf = Maps.newHashMap(); + conf.put("jdbcUrl", jdbcUrl); + conf.put("tableName", tableName); // Use valid table so open() succeeds + conf.put("key", "field3"); + conf.put("nonKey", "field1,field2"); + conf.put("batchSize", 1); + + SinkContext mockSinkContext = mock(SinkContext.class); + AtomicReference<Throwable> fatalException = new AtomicReference<>(); + doAnswer(invocation -> { + fatalException.set(invocation.getArgument(0)); + return null; + }).when(mockSinkContext).fatal(any(Throwable.class)); + + SqliteJdbcAutoSchemaSink sinkWithContext = new SqliteJdbcAutoSchemaSink(); + try { + sinkWithContext.open(conf, mockSinkContext); + + // Create a mock PreparedStatement that throws SQLException on execute() + PreparedStatement mockStatement = mock(PreparedStatement.class); + SQLException simulatedException = new SQLException("Simulated database connection failure"); + doThrow(simulatedException).when(mockStatement).execute(); + doThrow(simulatedException).when(mockStatement).executeBatch(); + + // Use reflection to replace the insertStatement with our mock + FieldUtils.writeField(sinkWithContext, "insertStatement", mockStatement, true); + + Foo insertObj = new Foo("f1", "f2", 1); + Map<String, String> props = Maps.newHashMap(); + props.put("ACTION", "INSERT"); + CompletableFuture<Boolean> future = new CompletableFuture<>(); + sinkWithContext.write(createMockFooRecord(insertObj, props, future)); + + // Wait for the flush to complete and fail + Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> { + verify(mockSinkContext).fatal(any(Throwable.class)); + Assert.assertNotNull(fatalException.get()); + Assert.assertTrue(fatalException.get() instanceof SQLException); + Assert.assertEquals(fatalException.get().getMessage(), "Simulated database connection failure"); + }); + + // Verify the record was failed (not acked) + Assert.assertFalse(future.get(1, TimeUnit.SECONDS)); + } finally { + sinkWithContext.close(); + } + } + @SuppressWarnings("unchecked") private Record<GenericObject> createMockFooRecord(Foo record, Map<String, String> actionProperties, CompletableFuture<Boolean> future) {
