This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 9fc8dfb55d1aef850217824e620fc246c360736f
Author: Harangozó Péter <harangozo...@gmail.com>
AuthorDate: Thu Sep 11 23:18:12 2025 +0200

    [fix][io] Improve Kafka Connect source offset flushing logic (#24654)

    (cherry picked from commit a824f04a36e168e415284d599fbe2fb94caa58c8)
---
 pulsar-io/kafka-connect-adaptor/pom.xml            |   6 +
 .../kafka/connect/AbstractKafkaConnectSource.java  | 141 +++++++++-------
 .../io/kafka/connect/KafkaConnectSourceTest.java   | 186 ++++++++++++++++++++-
 3 files changed, 270 insertions(+), 63 deletions(-)

diff --git a/pulsar-io/kafka-connect-adaptor/pom.xml b/pulsar-io/kafka-connect-adaptor/pom.xml
index 1801d850767..de89da85267 100644
--- a/pulsar-io/kafka-connect-adaptor/pom.xml
+++ b/pulsar-io/kafka-connect-adaptor/pom.xml
@@ -185,6 +185,12 @@
       <scope>test</scope>
     </dependency>
 
+    <dependency>
+      <groupId>org.awaitility</groupId>
+      <artifactId>awaitility</artifactId>
+      <scope>test</scope>
+    </dependency>
+
     <dependency>
       <groupId>${project.groupId}</groupId>
       <artifactId>pulsar-broker</artifactId>
diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java
index 270decd8ab5..e048ddf7244 100644
--- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java
@@ -31,11 +31,13 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.runtime.TaskConfig;
 import org.apache.kafka.connect.source.SourceConnector;
 import org.apache.kafka.connect.source.SourceRecord;
@@ -68,7 +70,6 @@ public abstract class AbstractKafkaConnectSource<T> implements Source<T> {
 
     // pulsar io related variables
     private Iterator<SourceRecord> currentBatch = null;
-    private CompletableFuture<Void> flushFuture;
     private OffsetBackingStore offsetStore;
     private OffsetStorageReader offsetReader;
     private String topicNamespace;
@@ -76,6 +77,8 @@ public abstract class AbstractKafkaConnectSource<T> implements Source<T> {
     public OffsetStorageWriter offsetWriter;
     // number of outstandingRecords that have been polled but not been acked
     private final AtomicInteger outstandingRecords = new AtomicInteger(0);
+    private final AtomicBoolean flushing = new AtomicBoolean(false);
+    private final AtomicReference<CompletableFuture<Void>> flushFutureRef = new AtomicReference<>();
 
     public static final String CONNECTOR_CLASS = "kafkaConnectorSourceClass";
 
@@ -162,38 +165,103 @@ public abstract class AbstractKafkaConnectSource<T> implements Source<T> {
         sourceTask.initialize(sourceTaskContext);
         sourceTask.start(taskConfig);
     }
 
+    private void onOffsetsFlushed(Throwable error, CompletableFuture<Void> snapshotFlushFuture) {
+        if (error != null) {
+            log.error("Failed to flush offsets to storage: ", error);
+            offsetWriter.cancelFlush();
+            snapshotFlushFuture.completeExceptionally(new Exception("No Offsets Added Error", error));
+            return;
+        }
+        try {
+            sourceTask.commit();
+            if (log.isDebugEnabled()) {
+                log.debug("Finished flushing offsets to storage");
+            }
+            snapshotFlushFuture.complete(null);
+        } catch (InterruptedException ie) {
+            Thread.currentThread().interrupt();
+            log.warn("Flush interrupted, cancelling", ie);
+            offsetWriter.cancelFlush();
+            snapshotFlushFuture.completeExceptionally(new Exception("Failed to commit offsets", ie));
+        } catch (Throwable t) {
+            log.warn("Flush failed, cancelling", t);
+            offsetWriter.cancelFlush();
+            snapshotFlushFuture.completeExceptionally(new Exception("Failed to commit offsets", t));
+        }
+    }
+
+    private void triggerOffsetsFlushIfNeeded() {
+        final CompletableFuture<Void> snapshotFlushFuture = flushFutureRef.get();
+        // Only flush when we have a batch in flight, nothing outstanding, and a pending future
+        if (snapshotFlushFuture == null || snapshotFlushFuture.isDone() || outstandingRecords.get() != 0) {
+            return;
+        }
+        if (!flushing.compareAndSet(false, true)) {
+            return; // someone else is flushing
+        }
+        try {
+            if (offsetWriter.beginFlush()) {
+                offsetWriter.doFlush((error, ignored) -> {
+                    try {
+                        onOffsetsFlushed(error, snapshotFlushFuture);
+                    } finally {
+                        flushing.set(false);
+                    }
+                });
+            } else {
+                try {
+                    onOffsetsFlushed(null, snapshotFlushFuture);
+                } finally {
+                    flushing.set(false);
+                }
+            }
+        } catch (ConnectException alreadyFlushing) {
+            // Another thread initiated the flush; let their callback complete the future.
+            // Keep 'flushing' = true until read() finalizes the batch.
+        } catch (Exception t) {
+            try {
+                onOffsetsFlushed(t, snapshotFlushFuture);
+            } finally {
+                flushing.set(false);
+            }
+        }
+    }
+
     @Override
     public synchronized Record<T> read() throws Exception {
         while (true) {
             if (currentBatch == null) {
-                flushFuture = new CompletableFuture<>();
                 List<SourceRecord> recordList = sourceTask.poll();
                 if (recordList == null || recordList.isEmpty()) {
                     continue;
                 }
                 outstandingRecords.addAndGet(recordList.size());
                 currentBatch = recordList.iterator();
+
+                final CompletableFuture<Void> newFuture = new CompletableFuture<>();
+                flushFutureRef.set(newFuture);
             }
             if (currentBatch.hasNext()) {
                 AbstractKafkaSourceRecord<T> processRecord = processSourceRecord(currentBatch.next());
                 if (processRecord == null || processRecord.isEmpty()) {
                     outstandingRecords.decrementAndGet();
-                    continue;
+                    triggerOffsetsFlushIfNeeded();
                 } else {
                     return processRecord;
                 }
             } else {
-                // there is no records any more, then waiting for the batch to complete writing
-                // to sink and the offsets are committed as well, then do next round read.
+                final CompletableFuture<Void> snapshotFlushFuture = flushFutureRef.get();
                 try {
-                    flushFuture.get();
+                    if (snapshotFlushFuture != null) {
+                        snapshotFlushFuture.get();
+                    }
                 } catch (ExecutionException ex) {
-                    // log the error, continue execution
                     log.error("execution exception while get flushFuture", ex);
                     throw new Exception("Flush failed", ex.getCause());
                 } finally {
-                    flushFuture = null;
+                    flushing.set(false);
+                    // Clear only if this is still the current batch future
+                    flushFutureRef.compareAndSet(snapshotFlushFuture, null);
                     currentBatch = null;
                 }
             }
@@ -272,62 +340,19 @@ public abstract class AbstractKafkaConnectSource<T> implements Source<T> {
             return this.value == null;
         }
 
-        private void completedFlushOffset(Throwable error, Void result) {
-            if (error != null) {
-                log.error("Failed to flush offsets to storage: ", error);
-                currentBatch = null;
-                offsetWriter.cancelFlush();
-                flushFuture.completeExceptionally(new Exception("No Offsets Added Error"));
-            } else {
-                try {
-                    sourceTask.commit();
-
-                    log.info("Finished flushing offsets to storage");
-                    currentBatch = null;
-                    flushFuture.complete(null);
-                } catch (InterruptedException exception) {
-                    log.warn("Flush of {} offsets interrupted, cancelling", this);
-                    Thread.currentThread().interrupt();
-                    offsetWriter.cancelFlush();
-                    flushFuture.completeExceptionally(new Exception("Failed to commit offsets", exception));
-                } catch (Throwable t) {
-                    // SourceTask can throw unchecked ConnectException/KafkaException.
-                    // Make sure the future is cancelled in that case
-                    log.warn("Flush of {} offsets failed, cancelling", this);
-                    offsetWriter.cancelFlush();
-                    flushFuture.completeExceptionally(new Exception("Failed to commit offsets", t));
-                }
-            }
-        }
-
         @Override
         public void ack() {
-            // TODO: should flush for each batch. not wait for a time for acked all.
-            // How to handle order between each batch. QueueList<pair<batch, automicInt>>. check if head is all acked.
-            boolean canFlush = (outstandingRecords.decrementAndGet() == 0);
-
-            // consumed all the records, flush the offsets
-            if (canFlush && flushFuture != null) {
-                if (!offsetWriter.beginFlush()) {
-                    log.error("When beginFlush, No offsets to commit!");
-                    flushFuture.completeExceptionally(new Exception("No Offsets Added Error when beginFlush"));
-                    return;
-                }
-
-                Future<Void> doFlush = offsetWriter.doFlush(this::completedFlushOffset);
-                if (doFlush == null) {
-                    // Offsets added in processSourceRecord, But here no offsets to commit
-                    log.error("No offsets to commit!");
-                    flushFuture.completeExceptionally(new Exception("No Offsets Added Error"));
-                    return;
-                }
+            // Decrement and let the centralized flusher decide if we should flush now
+            if (outstandingRecords.decrementAndGet() == 0) {
+                triggerOffsetsFlushIfNeeded();
             }
         }
 
         @Override
         public void fail() {
-            if (flushFuture != null) {
-                flushFuture.completeExceptionally(new Exception("Sink Error"));
+            final CompletableFuture<Void> snapshotFlushFuture = flushFutureRef.get();
+            if (snapshotFlushFuture != null) {
+                snapshotFlushFuture.completeExceptionally(new Exception("Sink Error"));
             }
         }
     }
diff --git a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java
index dfa3031abb2..5d414b6373c 100644
--- a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java
+++ b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java
@@ -19,6 +19,8 @@ package org.apache.pulsar.io.kafka.connect;
 
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
@@ -28,19 +30,25 @@ import static org.testng.Assert.assertTrue;
 import java.io.File;
 import java.io.OutputStream;
 import java.nio.file.Files;
+import java.time.Duration;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.kafka.connect.file.FileStreamSourceConnector;
 import org.apache.kafka.connect.runtime.TaskConfig;
 import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.storage.OffsetStorageWriter;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.common.schema.KeyValue;
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.io.core.SourceContext;
 import org.apache.pulsar.io.kafka.connect.KafkaConnectSource.KafkaSourceRecord;
+import org.awaitility.Awaitility;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mockito;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
@@ -49,7 +57,7 @@ import org.testng.annotations.Test;
  * Test the implementation of {@link KafkaConnectSource}.
  */
 @Slf4j
-public class KafkaConnectSourceTest extends ProducerConsumerBase {
+public class KafkaConnectSourceTest extends ProducerConsumerBase {
 
     private String offsetTopicName;
     // The topic to publish data to, for kafkaSource
@@ -96,6 +104,174 @@ public class KafkaConnectSourceTest extends ProducerConsumerBase {
         testOpenAndReadTask(config);
     }
 
+    private OffsetStorageWriter wrapOffsetWriterWithSpy(KafkaConnectSource source) throws IllegalAccessException {
+        OffsetStorageWriter original = source.getOffsetWriter();
+        OffsetStorageWriter spy = org.mockito.Mockito.spy(original);
+        FieldUtils.writeField(source, "offsetWriter", spy, true);
+        return spy;
+    }
+
+
+    @Test(timeOut = 30000)
+    public void testFlushWhenAllMessagesFilteredWithoutBlocking() throws Exception {
+
+        Map<String, Object> config = getConfig();
+        config.put(TaskConfig.TASK_CLASS_CONFIG, "org.apache.kafka.connect.file.FileStreamSourceTask");
+
+        config.put("transforms", "Filter");
+        config.put("transforms.Filter.type", "org.apache.kafka.connect.transforms.Filter");
+        config.put("transforms.Filter.predicate", "DropMeTopic");
+
+        config.put("predicates", "DropMeTopic");
+        config.put("predicates.DropMeTopic.type", "org.apache.kafka.connect.transforms.predicates.TopicNameMatches");
+        config.put("predicates.DropMeTopic.pattern", ".*my-property/my-ns/kafka-connect-source.*");
+
+        kafkaConnectSource = new KafkaConnectSource();
+        kafkaConnectSource.open(config, context);
+
+        OffsetStorageWriter spyWriter = wrapOffsetWriterWithSpy(kafkaConnectSource);
+
+        try (OutputStream os = Files.newOutputStream(tempFile.toPath())) {
+            os.write("first\n".getBytes());
+            os.flush();
+            os.write("second\n".getBytes());
+            os.flush();
+        }
+
+        Thread t = new Thread(() -> {
+            try {
+                kafkaConnectSource.read();
+            } catch (Exception e) {
+                // Ignore, it just needs the loop running through a batch where every message is filtered out
+                log.error("Exception in read thread", e);
+            }
+        });
+        t.setDaemon(true);
+        t.start();
+
+        try {
+            // First ensure offsets are being written for filtered records.
+            Awaitility.await()
+                    .atMost(Duration.ofSeconds(10))
+                    .untilAsserted(() -> {
+                        Mockito.verify(spyWriter, Mockito.atLeastOnce())
+                                .offset(ArgumentMatchers.any(), ArgumentMatchers.any());
+                    });
+
+            // Then ensure a flush cycle is triggered without waiting for any acks (since all are filtered).
+            Awaitility.await()
+                    .atMost(java.time.Duration.ofSeconds(10))
+                    .untilAsserted(() -> {
+                        Mockito.verify(spyWriter, Mockito.atLeastOnce())
+                                .beginFlush();
+                        Mockito.verify(spyWriter, Mockito.atLeastOnce())
+                                .doFlush(ArgumentMatchers.any());
+                    });
+        } finally {
+            kafkaConnectSource.close();
+            t.interrupt();
+        }
+    }
+
+    @Test(timeOut = 40000)
+    public void testNoFlushUntilAck() throws Exception {
+        Map<String, Object> config = getConfig();
+        config.put(TaskConfig.TASK_CLASS_CONFIG, "org.apache.kafka.connect.file.FileStreamSourceTask");
+
+        kafkaConnectSource = new KafkaConnectSource();
+        kafkaConnectSource.open(config, context);
+
+        OffsetStorageWriter spyWriter = wrapOffsetWriterWithSpy(kafkaConnectSource);
+
+        try (OutputStream os = Files.newOutputStream(tempFile.toPath())) {
+            os.write("one\n".getBytes());
+            os.flush();
+        }
+
+        Record<KeyValue<byte[], byte[]>> readRecord = kafkaConnectSource.read();
+
+        // Verify no flush happens while there is one outstanding record (no ack yet)
+        verify(spyWriter, timeout(10000).times(0)).beginFlush();
+        verify(spyWriter, timeout(10000).times(0))
+                .doFlush(ArgumentMatchers.any());
+
+        readRecord.ack();
+
+        // Verify flush happens after ack
+        verify(spyWriter, timeout(10000).atLeastOnce()).beginFlush();
+        verify(spyWriter, timeout(10000).atLeastOnce())
+                .doFlush(ArgumentMatchers.any());
+
+        kafkaConnectSource.close();
+    }
+
+    @Test(timeOut = 40000)
+    public void testPartialAckNoFlush_ThenFlushOnAllAck() throws Exception {
+        Map<String, Object> config = getConfig();
+        config.put(TaskConfig.TASK_CLASS_CONFIG, "org.apache.kafka.connect.file.FileStreamSourceTask");
+
+        kafkaConnectSource = new KafkaConnectSource();
+        kafkaConnectSource.open(config, context);
+
+        OffsetStorageWriter spyWriter = wrapOffsetWriterWithSpy(kafkaConnectSource);
+
+        try (OutputStream os = Files.newOutputStream(tempFile.toPath())) {
+            os.write("first\n".getBytes());
+            os.flush();
+            os.write("second\n".getBytes());
+            os.flush();
+        }
+
+        Record<KeyValue<byte[], byte[]>> r1 = kafkaConnectSource.read();
+        Record<KeyValue<byte[], byte[]>> r2 = kafkaConnectSource.read();
+
+        // Ack only the first; one outstanding remains -> no flush should happen
+        r1.ack();
+        verify(spyWriter, timeout(10000).times(0)).beginFlush();
+        verify(spyWriter, timeout(10000).times(0))
+                .doFlush(ArgumentMatchers.any());
+
+        // Ack the second; outstanding reaches zero -> flush should be triggered
+        r2.ack();
+        verify(spyWriter, timeout(10000).atLeastOnce()).beginFlush();
+        verify(spyWriter, timeout(10000).atLeastOnce())
+                .doFlush(ArgumentMatchers.any());
+
+        kafkaConnectSource.close();
+    }
+
+    @Test(timeOut = 20000)
+    public void testAckFlush() throws Exception {
+        Map<String, Object> config = getConfig();
+        config.put(TaskConfig.TASK_CLASS_CONFIG, "org.apache.kafka.connect.file.FileStreamSourceTask");
+
+        kafkaConnectSource = new KafkaConnectSource();
+        kafkaConnectSource.open(config, context);
+
+        OffsetStorageWriter spyWriter = wrapOffsetWriterWithSpy(kafkaConnectSource);
+
+        try (OutputStream os = Files.newOutputStream(tempFile.toPath())) {
+            os.write("first\n".getBytes());
+            os.flush();
+            os.write("second\n".getBytes());
+            os.flush();
+        }
+
+        Record<KeyValue<byte[], byte[]>> r1 = kafkaConnectSource.read();
+        Record<KeyValue<byte[], byte[]>> r2 = kafkaConnectSource.read();
+
+        // Ack both, no outstanding remains -> flush should happen
+        r1.ack();
+        r2.ack();
+
+        verify(spyWriter, timeout(10000).atLeastOnce()).beginFlush();
+        verify(spyWriter, timeout(10000).atLeastOnce())
+                .doFlush(ArgumentMatchers.any());
+
+        kafkaConnectSource.close();
+    }
+
+
     @Test
     public void testOpenAndReadTaskDirect() throws Exception {
         Map<String, Object> config = getConfig();
@@ -157,8 +333,8 @@ public class KafkaConnectSourceTest extends ProducerConsumerBase {
         sourceOffset.put("test", 0);
         value.put("myField", "42");
         SourceRecord srcRecord = new SourceRecord(
-                sourcePartition, sourceOffset, topicName, null,
-                null, null, null, value
+                sourcePartition, sourceOffset, topicName, null,
+                null, null, null, value
         );
 
         KafkaSourceRecord record = kafkaConnectSource.processSourceRecord(srcRecord);
@@ -197,8 +373,8 @@ public class KafkaConnectSourceTest extends ProducerConsumerBase {
         Map<String, Object> value = new HashMap<>();
         value.put("myField", "42");
         SourceRecord record = new SourceRecord(
-                null, null, "test-topic", null,
-                null, null, null, value
+                null, null, "test-topic", null,
+                null, null, null, value
         );
 
         SourceRecord transformed = kafkaConnectSource.applyTransforms(record);
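
The heart of this change is the flush-gating pattern visible in the diff above: every record of a polled batch, whether it is eventually acked downstream or dropped by a transform, decrements the same outstanding counter, and whichever path brings the counter to zero triggers the offset flush, guarded by an AtomicBoolean so only one flush runs at a time. The standalone sketch below illustrates that pattern in isolation; the class and method names (FlushGateSketch, startBatch, onRecordAcked, onRecordFiltered, maybeFlush) are illustrative only, and the stubbed flush stands in for the real offsetWriter.beginFlush()/doFlush() and sourceTask.commit() calls in AbstractKafkaConnectSource.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

public class FlushGateSketch {
    private final AtomicInteger outstanding = new AtomicInteger(0);
    private final AtomicBoolean flushing = new AtomicBoolean(false);
    private final AtomicReference<CompletableFuture<Void>> flushFutureRef = new AtomicReference<>();

    /** Called when a new batch of records is polled. */
    public CompletableFuture<Void> startBatch(int batchSize) {
        outstanding.addAndGet(batchSize);
        CompletableFuture<Void> batchFuture = new CompletableFuture<>();
        flushFutureRef.set(batchFuture);
        return batchFuture;
    }

    /** Called when a record is acked downstream. */
    public void onRecordAcked() {
        if (outstanding.decrementAndGet() == 0) {
            maybeFlush();
        }
    }

    /** Called when a record is filtered out before it ever reaches a consumer. */
    public void onRecordFiltered() {
        if (outstanding.decrementAndGet() == 0) {
            maybeFlush();
        }
    }

    private void maybeFlush() {
        CompletableFuture<Void> batchFuture = flushFutureRef.get();
        if (batchFuture == null || batchFuture.isDone() || outstanding.get() != 0) {
            return; // nothing pending for this batch
        }
        if (!flushing.compareAndSet(false, true)) {
            return; // another thread is already flushing
        }
        try {
            // In the real connector this is offsetWriter.beginFlush()/doFlush() plus sourceTask.commit().
            batchFuture.complete(null);
        } finally {
            flushing.set(false);
        }
    }

    public static void main(String[] args) {
        FlushGateSketch gate = new FlushGateSketch();
        CompletableFuture<Void> batch = gate.startBatch(2);
        gate.onRecordFiltered();              // one record dropped by a transform
        System.out.println(batch.isDone());   // false: one record still outstanding
        gate.onRecordAcked();                 // last record acked
        System.out.println(batch.isDone());   // true: flush ran once the count hit zero
    }
}

Because read() also waits on the per-batch future once the batch iterator is exhausted, a batch whose records are all filtered out no longer leaves the read loop blocked waiting for acks that will never arrive, which is what testFlushWhenAllMessagesFilteredWithoutBlocking exercises.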