This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 9fc8dfb55d1aef850217824e620fc246c360736f
Author: Harangozó Péter <harangozo...@gmail.com>
AuthorDate: Thu Sep 11 23:18:12 2025 +0200

    [fix][io] Improve Kafka Connect source offset flushing logic (#24654)
    
    (cherry picked from commit a824f04a36e168e415284d599fbe2fb94caa58c8)
---
 pulsar-io/kafka-connect-adaptor/pom.xml            |   6 +
 .../kafka/connect/AbstractKafkaConnectSource.java  | 141 +++++++++-------
 .../io/kafka/connect/KafkaConnectSourceTest.java   | 186 ++++++++++++++++++++-
 3 files changed, 270 insertions(+), 63 deletions(-)
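
For review context: the core of this change replaces the single mutable flushFuture field in AbstractKafkaConnectSource with a per-batch future held in an AtomicReference, guarded by an AtomicBoolean, so that one offset flush is triggered once every record of a batch has been acked or filtered out. The fragment below is a minimal standalone sketch of that coordination pattern, not code from the patch; the class and method names (BatchFlushCoordinator, startBatch, recordDone, flushOffsets) are hypothetical stand-ins for the real logic in read(), ack() and triggerOffsetsFlushIfNeeded().

// Minimal sketch of the flush coordination introduced by this patch.
// Hypothetical standalone class; the real implementation lives in
// AbstractKafkaConnectSource and flushes through OffsetStorageWriter.
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

class BatchFlushCoordinator {
    private final AtomicInteger outstanding = new AtomicInteger(0);
    private final AtomicBoolean flushing = new AtomicBoolean(false);
    private final AtomicReference<CompletableFuture<Void>> flushFutureRef = new AtomicReference<>();

    // Called when a new batch of records is polled; the reader waits on the returned future.
    CompletableFuture<Void> startBatch(int size) {
        outstanding.addAndGet(size);
        CompletableFuture<Void> future = new CompletableFuture<>();
        flushFutureRef.set(future);
        return future;
    }

    // Called on every ack, and also when a record is filtered out before being emitted.
    void recordDone() {
        if (outstanding.decrementAndGet() == 0) {
            triggerFlushIfNeeded();
        }
    }

    private void triggerFlushIfNeeded() {
        CompletableFuture<Void> snapshot = flushFutureRef.get();
        // Only flush when a batch is in flight, nothing is outstanding, and the future is pending.
        if (snapshot == null || snapshot.isDone() || outstanding.get() != 0) {
            return;
        }
        if (!flushing.compareAndSet(false, true)) {
            return; // another thread is already flushing
        }
        try {
            flushOffsets();            // stand-in for offsetWriter.beginFlush()/doFlush()
            snapshot.complete(null);
        } catch (Exception e) {
            snapshot.completeExceptionally(e);
        } finally {
            flushing.set(false);
        }
    }

    private void flushOffsets() { /* write accumulated offsets to the backing store */ }
}

Snapshotting the batch future before checking the outstanding count, and guarding the flush with compareAndSet, lets acks arriving from other threads race with read() without losing a flush or starting two at once; this is the behaviour the new tests in KafkaConnectSourceTest pin down.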

diff --git a/pulsar-io/kafka-connect-adaptor/pom.xml b/pulsar-io/kafka-connect-adaptor/pom.xml
index 1801d850767..de89da85267 100644
--- a/pulsar-io/kafka-connect-adaptor/pom.xml
+++ b/pulsar-io/kafka-connect-adaptor/pom.xml
@@ -185,6 +185,12 @@
       <scope>test</scope>
     </dependency>
 
+    <dependency>
+      <groupId>org.awaitility</groupId>
+      <artifactId>awaitility</artifactId>
+      <scope>test</scope>
+    </dependency>
+
     <dependency>
       <groupId>${project.groupId}</groupId>
       <artifactId>pulsar-broker</artifactId>
diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java
index 270decd8ab5..e048ddf7244 100644
--- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java
@@ -31,11 +31,13 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.runtime.TaskConfig;
 import org.apache.kafka.connect.source.SourceConnector;
 import org.apache.kafka.connect.source.SourceRecord;
@@ -68,7 +70,6 @@ public abstract class AbstractKafkaConnectSource<T> implements Source<T> {
 
     // pulsar io related variables
     private Iterator<SourceRecord> currentBatch = null;
-    private CompletableFuture<Void> flushFuture;
     private OffsetBackingStore offsetStore;
     private OffsetStorageReader offsetReader;
     private String topicNamespace;
@@ -76,6 +77,8 @@ public abstract class AbstractKafkaConnectSource<T> implements Source<T> {
     public OffsetStorageWriter offsetWriter;
     // number of outstandingRecords that have been polled but not been acked
     private final AtomicInteger outstandingRecords = new AtomicInteger(0);
+    private final AtomicBoolean flushing = new AtomicBoolean(false);
+    private final AtomicReference<CompletableFuture<Void>> flushFutureRef = new AtomicReference<>();
 
     public static final String CONNECTOR_CLASS = "kafkaConnectorSourceClass";
 
@@ -162,38 +165,103 @@ public abstract class AbstractKafkaConnectSource<T> implements Source<T> {
         sourceTask.initialize(sourceTaskContext);
         sourceTask.start(taskConfig);
     }
+    private void onOffsetsFlushed(Throwable error, CompletableFuture<Void> snapshotFlushFuture) {
+        if (error != null) {
+            log.error("Failed to flush offsets to storage: ", error);
+            offsetWriter.cancelFlush();
+            snapshotFlushFuture.completeExceptionally(new Exception("No Offsets Added Error", error));
+            return;
+        }
+        try {
+            sourceTask.commit();
+            if (log.isDebugEnabled()) {
+                log.debug("Finished flushing offsets to storage");
+            }
+            snapshotFlushFuture.complete(null);
+        } catch (InterruptedException ie) {
+            Thread.currentThread().interrupt();
+            log.warn("Flush interrupted, cancelling", ie);
+            offsetWriter.cancelFlush();
+            snapshotFlushFuture.completeExceptionally(new Exception("Failed to commit offsets", ie));
+        } catch (Throwable t) {
+            log.warn("Flush failed, cancelling", t);
+            offsetWriter.cancelFlush();
+            snapshotFlushFuture.completeExceptionally(new Exception("Failed to commit offsets", t));
+        }
+    }
+
+    private void triggerOffsetsFlushIfNeeded() {
+        final CompletableFuture<Void> snapshotFlushFuture = flushFutureRef.get();
+        // Only flush when we have a batch in flight, nothing outstanding, and a pending future
+        if (snapshotFlushFuture == null || snapshotFlushFuture.isDone() || outstandingRecords.get() != 0) {
+            return;
+        }
+        if (!flushing.compareAndSet(false, true)) {
+            return; // someone else is flushing
+        }
+        try {
+            if (offsetWriter.beginFlush()) {
+                offsetWriter.doFlush((error, ignored) -> {
+                    try {
+                        onOffsetsFlushed(error, snapshotFlushFuture);
+                    } finally {
+                        flushing.set(false);
+                    }
+                });
+            } else {
+                try {
+                    onOffsetsFlushed(null, snapshotFlushFuture);
+                } finally {
+                    flushing.set(false);
+                }
+            }
+        } catch (ConnectException alreadyFlushing) {
+            // Another thread initiated the flush; let their callback complete the future.
+            // Keep 'flushing' = true until read() finalizes the batch.
+        } catch (Exception t) {
+            try {
+                onOffsetsFlushed(t, snapshotFlushFuture);
+            } finally {
+                flushing.set(false);
+            }
+        }
+    }
 
     @Override
     public synchronized Record<T> read() throws Exception {
         while (true) {
             if (currentBatch == null) {
-                flushFuture = new CompletableFuture<>();
                 List<SourceRecord> recordList = sourceTask.poll();
                 if (recordList == null || recordList.isEmpty()) {
                     continue;
                 }
                 outstandingRecords.addAndGet(recordList.size());
                 currentBatch = recordList.iterator();
+
+                final CompletableFuture<Void> newFuture = new CompletableFuture<>();
+                flushFutureRef.set(newFuture);
             }
             if (currentBatch.hasNext()) {
                 AbstractKafkaSourceRecord<T> processRecord = processSourceRecord(currentBatch.next());
                 if (processRecord == null || processRecord.isEmpty()) {
                     outstandingRecords.decrementAndGet();
-                    continue;
+                    triggerOffsetsFlushIfNeeded();
                 } else {
                     return processRecord;
                 }
             } else {
-                // there is no records any more, then waiting for the batch to complete writing
-                // to sink and the offsets are committed as well, then do next round read.
+                final CompletableFuture<Void> snapshotFlushFuture = flushFutureRef.get();
                 try {
-                    flushFuture.get();
+                    if (snapshotFlushFuture != null) {
+                        snapshotFlushFuture.get();
+                    }
                 } catch (ExecutionException ex) {
-                    // log the error, continue execution
                     log.error("execution exception while get flushFuture", ex);
                     throw new Exception("Flush failed", ex.getCause());
                 } finally {
-                    flushFuture = null;
+                    flushing.set(false);
+                    // Clear only if this is still the current batch future
+                    flushFutureRef.compareAndSet(snapshotFlushFuture, null);
                     currentBatch = null;
                 }
             }
@@ -272,62 +340,19 @@ public abstract class AbstractKafkaConnectSource<T> implements Source<T> {
             return this.value == null;
         }
 
-        private void completedFlushOffset(Throwable error, Void result) {
-            if (error != null) {
-                log.error("Failed to flush offsets to storage: ", error);
-                currentBatch = null;
-                offsetWriter.cancelFlush();
-                flushFuture.completeExceptionally(new Exception("No Offsets 
Added Error"));
-            } else {
-                try {
-                    sourceTask.commit();
-
-                    log.info("Finished flushing offsets to storage");
-                    currentBatch = null;
-                    flushFuture.complete(null);
-                } catch (InterruptedException exception) {
-                    log.warn("Flush of {} offsets interrupted, cancelling", 
this);
-                    Thread.currentThread().interrupt();
-                    offsetWriter.cancelFlush();
-                    flushFuture.completeExceptionally(new Exception("Failed to 
commit offsets", exception));
-                } catch (Throwable t) {
-                    // SourceTask can throw unchecked ConnectException/KafkaException.
-                    // Make sure the future is cancelled in that case
-                    log.warn("Flush of {} offsets failed, cancelling", this);
-                    offsetWriter.cancelFlush();
-                    flushFuture.completeExceptionally(new Exception("Failed to commit offsets", t));
-                }
-            }
-        }
-
         @Override
         public void ack() {
-            // TODO: should flush for each batch. not wait for a time for acked all.
-            // How to handle order between each batch. QueueList<pair<batch, automicInt>>. check if head is all acked.
-            boolean canFlush = (outstandingRecords.decrementAndGet() == 0);
-
-            // consumed all the records, flush the offsets
-            if (canFlush && flushFuture != null) {
-                if (!offsetWriter.beginFlush()) {
-                    log.error("When beginFlush, No offsets to commit!");
-                    flushFuture.completeExceptionally(new Exception("No 
Offsets Added Error when beginFlush"));
-                    return;
-                }
-
-                Future<Void> doFlush = offsetWriter.doFlush(this::completedFlushOffset);
-                if (doFlush == null) {
-                    // Offsets added in processSourceRecord, But here no offsets to commit
-                    log.error("No offsets to commit!");
-                    flushFuture.completeExceptionally(new Exception("No Offsets Added Error"));
-                    return;
-                }
+            // Decrement and let the centralized flusher decide if we should flush now
+            if (outstandingRecords.decrementAndGet() == 0) {
+                triggerOffsetsFlushIfNeeded();
             }
         }
 
         @Override
         public void fail() {
-            if (flushFuture != null) {
-                flushFuture.completeExceptionally(new Exception("Sink Error"));
+            final CompletableFuture<Void> snapshotFlushFuture = flushFutureRef.get();
+            if (snapshotFlushFuture != null) {
+                snapshotFlushFuture.completeExceptionally(new Exception("Sink Error"));
             }
         }
     }
diff --git a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java
index dfa3031abb2..5d414b6373c 100644
--- a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java
+++ b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java
@@ -19,6 +19,8 @@
 package org.apache.pulsar.io.kafka.connect;
 
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
@@ -28,19 +30,25 @@ import static org.testng.Assert.assertTrue;
 import java.io.File;
 import java.io.OutputStream;
 import java.nio.file.Files;
+import java.time.Duration;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.kafka.connect.file.FileStreamSourceConnector;
 import org.apache.kafka.connect.runtime.TaskConfig;
 import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.storage.OffsetStorageWriter;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.common.schema.KeyValue;
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.io.core.SourceContext;
 import org.apache.pulsar.io.kafka.connect.KafkaConnectSource.KafkaSourceRecord;
+import org.awaitility.Awaitility;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mockito;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
@@ -49,7 +57,7 @@ import org.testng.annotations.Test;
  * Test the implementation of {@link KafkaConnectSource}.
  */
 @Slf4j
-public class KafkaConnectSourceTest extends ProducerConsumerBase  {
+public class KafkaConnectSourceTest extends ProducerConsumerBase {
 
     private String offsetTopicName;
     // The topic to publish data to, for kafkaSource
@@ -96,6 +104,174 @@ public class KafkaConnectSourceTest extends ProducerConsumerBase  {
         testOpenAndReadTask(config);
     }
 
+    private OffsetStorageWriter wrapOffsetWriterWithSpy(KafkaConnectSource source) throws IllegalAccessException {
+        OffsetStorageWriter original = source.getOffsetWriter();
+        OffsetStorageWriter spy = org.mockito.Mockito.spy(original);
+        FieldUtils.writeField(source, "offsetWriter", spy, true);
+        return spy;
+    }
+
+
+    @Test(timeOut = 30000)
+    public void testFlushWhenAllMessagesFilteredWithoutBlocking() throws Exception {
+
+        Map<String, Object> config = getConfig();
+        config.put(TaskConfig.TASK_CLASS_CONFIG, "org.apache.kafka.connect.file.FileStreamSourceTask");
+
+        config.put("transforms", "Filter");
+        config.put("transforms.Filter.type", 
"org.apache.kafka.connect.transforms.Filter");
+        config.put("transforms.Filter.predicate", "DropMeTopic");
+
+        config.put("predicates", "DropMeTopic");
+        config.put("predicates.DropMeTopic.type", 
"org.apache.kafka.connect.transforms.predicates.TopicNameMatches");
+        config.put("predicates.DropMeTopic.pattern", 
".*my-property/my-ns/kafka-connect-source.*");
+
+        kafkaConnectSource = new KafkaConnectSource();
+        kafkaConnectSource.open(config, context);
+
+        OffsetStorageWriter spyWriter = wrapOffsetWriterWithSpy(kafkaConnectSource);
+
+        try (OutputStream os = Files.newOutputStream(tempFile.toPath())) {
+            os.write("first\n".getBytes());
+            os.flush();
+            os.write("second\n".getBytes());
+            os.flush();
+        }
+
+        Thread t = new Thread(() -> {
+            try {
+                kafkaConnectSource.read();
+            } catch (Exception e) {
+                // Ignore, it just needs the loop running through a batch where every message is filtered out
+                log.error("Exception in read thread", e);
+            }
+        });
+        t.setDaemon(true);
+        t.start();
+
+        try {
+            // First ensure offsets are being written for filtered records.
+            Awaitility.await()
+                    .atMost(Duration.ofSeconds(10))
+                    .untilAsserted(() -> {
+                        Mockito.verify(spyWriter, Mockito.atLeastOnce())
+                                .offset(ArgumentMatchers.any(), ArgumentMatchers.any());
+                    });
+
+            // Then ensure a flush cycle is triggered without waiting for any acks (since all are filtered).
+            Awaitility.await()
+                    .atMost(java.time.Duration.ofSeconds(10))
+                    .untilAsserted(() -> {
+                        Mockito.verify(spyWriter, Mockito.atLeastOnce())
+                                .beginFlush();
+                        Mockito.verify(spyWriter, Mockito.atLeastOnce())
+                                .doFlush(ArgumentMatchers.any());
+                    });
+        } finally {
+            kafkaConnectSource.close();
+            t.interrupt();
+        }
+    }
+
+    @Test(timeOut = 40000)
+    public void testNoFlushUntilAck() throws Exception {
+        Map<String, Object> config = getConfig();
+        config.put(TaskConfig.TASK_CLASS_CONFIG, "org.apache.kafka.connect.file.FileStreamSourceTask");
+
+        kafkaConnectSource = new KafkaConnectSource();
+        kafkaConnectSource.open(config, context);
+
+        OffsetStorageWriter spyWriter = wrapOffsetWriterWithSpy(kafkaConnectSource);
+
+        try (OutputStream os = Files.newOutputStream(tempFile.toPath())) {
+            os.write("one\n".getBytes());
+            os.flush();
+        }
+
+        Record<KeyValue<byte[], byte[]>> readRecord = kafkaConnectSource.read();
+
+        // Verify no flush happens while there is one outstanding record (no ack yet)
+        verify(spyWriter, timeout(10000).times(0)).beginFlush();
+        verify(spyWriter, timeout(10000).times(0))
+                .doFlush(ArgumentMatchers.any());
+
+        readRecord.ack();
+
+        // Verify flush happens after ack
+        verify(spyWriter, timeout(10000).atLeastOnce()).beginFlush();
+        verify(spyWriter, timeout(10000).atLeastOnce())
+                .doFlush(ArgumentMatchers.any());
+
+        kafkaConnectSource.close();
+    }
+
+    @Test(timeOut = 40000)
+    public void testPartialAckNoFlush_ThenFlushOnAllAck() throws Exception {
+        Map<String, Object> config = getConfig();
+        config.put(TaskConfig.TASK_CLASS_CONFIG, "org.apache.kafka.connect.file.FileStreamSourceTask");
+
+        kafkaConnectSource = new KafkaConnectSource();
+        kafkaConnectSource.open(config, context);
+
+        OffsetStorageWriter spyWriter = wrapOffsetWriterWithSpy(kafkaConnectSource);
+
+        try (OutputStream os = Files.newOutputStream(tempFile.toPath())) {
+            os.write("first\n".getBytes());
+            os.flush();
+            os.write("second\n".getBytes());
+            os.flush();
+        }
+
+        Record<KeyValue<byte[], byte[]>> r1 = kafkaConnectSource.read();
+        Record<KeyValue<byte[], byte[]>> r2 = kafkaConnectSource.read();
+
+        // Ack only the first; one outstanding remains -> no flush should happen
+        r1.ack();
+        verify(spyWriter, timeout(10000).times(0)).beginFlush();
+        verify(spyWriter, timeout(10000).times(0))
+                .doFlush(ArgumentMatchers.any());
+
+        // Ack the second; outstanding reaches zero -> flush should be triggered
+        r2.ack();
+        verify(spyWriter, timeout(10000).atLeastOnce()).beginFlush();
+        verify(spyWriter, timeout(10000).atLeastOnce())
+                .doFlush(ArgumentMatchers.any());
+
+        kafkaConnectSource.close();
+    }
+
+    @Test(timeOut = 20000)
+    public void testAckFlush() throws Exception {
+        Map<String, Object> config = getConfig();
+        config.put(TaskConfig.TASK_CLASS_CONFIG, "org.apache.kafka.connect.file.FileStreamSourceTask");
+
+        kafkaConnectSource = new KafkaConnectSource();
+        kafkaConnectSource.open(config, context);
+
+        OffsetStorageWriter spyWriter = wrapOffsetWriterWithSpy(kafkaConnectSource);
+
+        try (OutputStream os = Files.newOutputStream(tempFile.toPath())) {
+            os.write("first\n".getBytes());
+            os.flush();
+            os.write("second\n".getBytes());
+            os.flush();
+        }
+
+        Record<KeyValue<byte[], byte[]>> r1 = kafkaConnectSource.read();
+        Record<KeyValue<byte[], byte[]>> r2 = kafkaConnectSource.read();
+
+        // Ack both, no outstanding remains -> flush should happen
+        r1.ack();
+        r2.ack();
+
+        verify(spyWriter, timeout(10000).atLeastOnce()).beginFlush();
+        verify(spyWriter, timeout(10000).atLeastOnce())
+                .doFlush(ArgumentMatchers.any());
+
+        kafkaConnectSource.close();
+    }
+
+
     @Test
     public void testOpenAndReadTaskDirect() throws Exception {
         Map<String, Object> config = getConfig();
@@ -157,8 +333,8 @@ public class KafkaConnectSourceTest extends ProducerConsumerBase  {
         sourceOffset.put("test", 0);
         value.put("myField", "42");
         SourceRecord srcRecord = new SourceRecord(
-            sourcePartition, sourceOffset, topicName, null,
-            null, null, null, value
+                sourcePartition, sourceOffset, topicName, null,
+                null, null, null, value
         );
 
         KafkaSourceRecord record = kafkaConnectSource.processSourceRecord(srcRecord);
@@ -197,8 +373,8 @@ public class KafkaConnectSourceTest extends ProducerConsumerBase  {
         Map<String, Object> value = new HashMap<>();
         value.put("myField", "42");
         SourceRecord record = new SourceRecord(
-            null, null, "test-topic", null,
-            null, null, null, value
+                null, null, "test-topic", null,
+                null, null, null, value
         );
 
         SourceRecord transformed = kafkaConnectSource.applyTransforms(record);
