This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 6f4ac21eeed [feat][io] implement pip-297 for jdbc sinks (#25195)
6f4ac21eeed is described below
commit 6f4ac21eeed7bf9a8d4306c17468ce55b74f84a1
Author: Bäm <[email protected]>
AuthorDate: Fri Feb 6 12:15:52 2026 +0100
[feat][io] implement pip-297 for jdbc sinks (#25195)
---
.../apache/pulsar/io/jdbc/JdbcAbstractSink.java | 25 +++++++-
.../apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java | 74 +++++++++++++++++++++-
2 files changed, 97 insertions(+), 2 deletions(-)
diff --git a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
index ca33b3cfdab..73ba6b712f0 100644
--- a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
+++ b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
@@ -35,6 +35,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import lombok.AllArgsConstructor;
@@ -50,6 +51,11 @@ import org.apache.pulsar.io.core.SinkContext;
*/
@Slf4j
public abstract class JdbcAbstractSink<T> implements Sink<T> {
+
+ private enum State {
+ OPEN, FAILED, CLOSED
+ }
+
// ----- Runtime fields
protected JdbcSinkConfig jdbcSinkConfig;
@Getter
@@ -73,9 +79,12 @@ public abstract class JdbcAbstractSink<T> implements Sink<T> {
private AtomicBoolean isFlushing;
private int batchSize;
private ScheduledExecutorService flushExecutor;
+ private SinkContext sinkContext;
+ private final AtomicReference<State> state = new
AtomicReference<>(State.OPEN);
@Override
public void open(Map<String, Object> config, SinkContext sinkContext)
throws Exception {
+ this.sinkContext = sinkContext;
jdbcSinkConfig = JdbcSinkConfig.load(config, sinkContext);
jdbcSinkConfig.validate();
@@ -148,6 +157,7 @@ public abstract class JdbcAbstractSink<T> implements Sink<T> {
@Override
public void close() throws Exception {
+ state.set(State.CLOSED);
if (flushExecutor != null) {
int timeoutMs = jdbcSinkConfig.getTimeoutMs() * 2;
flushExecutor.shutdown();
@@ -310,8 +320,9 @@ public abstract class JdbcAbstractSink<T> implements Sink<T> {
connection.rollback();
}
} catch (Exception ex) {
- throw new RuntimeException(ex);
+ log.error("Failed to rollback transaction", ex);
}
+ fatal(e);
}
isFlushing.set(false);
@@ -385,4 +396,16 @@ public abstract class JdbcAbstractSink<T> implements Sink<T> {
return true;
}
+ /**
+ * Signal a fatal exception to the framework.
+ * This will cause the function instance to terminate properly.
+ *
+ * @param e the fatal exception
+ */
+ private void fatal(Exception e) {
+ if (sinkContext != null && state.compareAndSet(State.OPEN,
State.FAILED)) {
+ sinkContext.fatal(e);
+ }
+ }
+
}
diff --git a/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java b/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java
index 901ac9f1e39..7cab33df309 100644
--- a/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java
+++ b/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java
@@ -18,11 +18,16 @@
*/
package org.apache.pulsar.io.jdbc;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
@@ -56,6 +61,7 @@ import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.source.PulsarRecord;
+import org.apache.pulsar.io.core.SinkContext;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
@@ -133,7 +139,9 @@ public class SqliteJdbcSinkTest {
@AfterMethod(alwaysRun = true)
public void tearDown() throws Exception {
- jdbcSink.close();
+ if (jdbcSink != null) {
+ jdbcSink.close();
+ }
sqliteUtils.tearDown();
}
@@ -860,6 +868,70 @@ public class SqliteJdbcSinkTest {
}
}
+ /**
+ * Test that fatal() is called when an unrecoverable exception occurs during flush.
+ * This verifies the PIP-297 implementation for proper termination of the sink.
+ *
+ * The test works by:
+ * 1. Opening the sink with a valid table (so open() succeeds)
+ * 2. Using reflection to replace the insertStatement with a mock that throws SQLException
+ * 3. Writing a record to trigger flush
+ * 4. Verifying that fatal() was called with the exception
+ */
+ @Test
+ public void testFatalCalledOnFlushException() throws Exception {
+ jdbcSink.close();
+ jdbcSink = null;
+
+ String jdbcUrl = sqliteUtils.sqliteUri();
+ Map<String, Object> conf = Maps.newHashMap();
+ conf.put("jdbcUrl", jdbcUrl);
+ conf.put("tableName", tableName); // Use valid table so open() succeeds
+ conf.put("key", "field3");
+ conf.put("nonKey", "field1,field2");
+ conf.put("batchSize", 1);
+
+ SinkContext mockSinkContext = mock(SinkContext.class);
+ AtomicReference<Throwable> fatalException = new AtomicReference<>();
+ doAnswer(invocation -> {
+ fatalException.set(invocation.getArgument(0));
+ return null;
+ }).when(mockSinkContext).fatal(any(Throwable.class));
+
+ SqliteJdbcAutoSchemaSink sinkWithContext = new
SqliteJdbcAutoSchemaSink();
+ try {
+ sinkWithContext.open(conf, mockSinkContext);
+
+ // Create a mock PreparedStatement that throws SQLException on execute()
+ PreparedStatement mockStatement = mock(PreparedStatement.class);
+ SQLException simulatedException = new SQLException("Simulated
database connection failure");
+ doThrow(simulatedException).when(mockStatement).execute();
+ doThrow(simulatedException).when(mockStatement).executeBatch();
+
+ // Use reflection to replace the insertStatement with our mock
+ FieldUtils.writeField(sinkWithContext, "insertStatement",
mockStatement, true);
+
+ Foo insertObj = new Foo("f1", "f2", 1);
+ Map<String, String> props = Maps.newHashMap();
+ props.put("ACTION", "INSERT");
+ CompletableFuture<Boolean> future = new CompletableFuture<>();
+ sinkWithContext.write(createMockFooRecord(insertObj, props,
future));
+
+ // Wait for the flush to complete and fail
+ Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(() ->
{
+ verify(mockSinkContext).fatal(any(Throwable.class));
+ Assert.assertNotNull(fatalException.get());
+ Assert.assertTrue(fatalException.get() instanceof
SQLException);
+ Assert.assertEquals(fatalException.get().getMessage(),
"Simulated database connection failure");
+ });
+
+ // Verify the record was failed (not acked)
+ Assert.assertFalse(future.get(1, TimeUnit.SECONDS));
+ } finally {
+ sinkWithContext.close();
+ }
+ }
+
@SuppressWarnings("unchecked")
private Record<GenericObject> createMockFooRecord(Foo record, Map<String,
String> actionProperties,
CompletableFuture<Boolean> future) {