This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch 3.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.1 by this push:
     new a7e0eb0  KAFKA-13469: Block for in-flight record delivery before end-of-life source task offset commit (#11524)
a7e0eb0 is described below

commit a7e0eb06734ade0d5932e496d352e208a41d3664
Author: Chris Egerton <chr...@confluent.io>
AuthorDate: Tue Nov 30 11:35:50 2021 -0500

    KAFKA-13469: Block for in-flight record delivery before end-of-life source task offset commit (#11524)

    Although committing source task offsets without blocking on the delivery of all
    in-flight records is beneficial most of the time, it can lead to duplicate record
    delivery if there are in-flight records at the time of the task's end-of-life
    offset commit. A best-effort attempt is made here to wait for any such in-flight
    records to be delivered before proceeding with the end-of-life offset commit for
    source tasks. Connect will block for up to offset.flush.timeout.ms milliseconds
    before calculating the latest committable offsets for the task and flushing those
    to the persistent offset store.

    Author: Chris Egerton <chr...@confluent.io>
    Reviewer: Randall Hauch <rha...@gmail.com>
---
 .../kafka/connect/runtime/SubmittedRecords.java | 59 ++++++++++++--
 .../kafka/connect/runtime/WorkerSourceTask.java |  6 +-
 .../connect/runtime/SubmittedRecordsTest.java   | 94 +++++++++++++++++++++-
 3 files changed, 149 insertions(+), 10 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java
index 472a266..6cdd2c1 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java
@@ -26,6 +26,9 @@ import java.util.Deque;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * Used to track source records that have been (or are about to be) dispatched to a producer and their accompanying
@@ -41,10 +44,11 @@ class SubmittedRecords {
 
     private static final Logger log = LoggerFactory.getLogger(SubmittedRecords.class);
 
     // Visible for testing
-    final Map<Map<String, Object>, Deque<SubmittedRecord>> records;
+    final Map<Map<String, Object>, Deque<SubmittedRecord>> records = new HashMap<>();
+    private int numUnackedMessages = 0;
+    private CountDownLatch messageDrainLatch;
 
     public SubmittedRecords() {
-        this.records = new HashMap<>();
     }
 
     /**
@@ -68,6 +72,9 @@ class SubmittedRecords {
         SubmittedRecord result = new SubmittedRecord(partition, offset);
         records.computeIfAbsent(result.partition(), p -> new LinkedList<>())
                 .add(result);
+        synchronized (this) {
+            numUnackedMessages++;
+        }
         return result;
     }
 
     /**
@@ -89,7 +96,9 @@ class SubmittedRecords {
         if (deque.isEmpty()) {
             records.remove(record.partition());
         }
-        if (!result) {
+        if (result) {
+            messageAcked();
+        } else {
             log.warn("Attempted to remove record from submitted queue for partition {}, but the record has not been submitted or has already been removed", record.partition());
         }
         return result;
     }
 
@@ -132,6 +141,28 @@ class SubmittedRecords {
         return new CommittableOffsets(offsets, totalCommittableMessages, totalUncommittableMessages, records.size(), largestDequeSize, largestDequePartition);
     }
 
+    /**
+     * Wait for all currently in-flight messages to be acknowledged, up to the requested timeout.
+     * This method is expected to be called from the same thread that calls {@link #committableOffsets()}.
+     * @param timeout the maximum time to wait
+     * @param timeUnit the time unit of the timeout argument
+     * @return whether all in-flight messages were acknowledged before the timeout elapsed
+     */
+    public boolean awaitAllMessages(long timeout, TimeUnit timeUnit) {
+        // Create a new message drain latch as a local variable to avoid SpotBugs warnings about inconsistent synchronization
+        // on an instance variable when invoking CountDownLatch::await outside a synchronized block
+        CountDownLatch messageDrainLatch;
+        synchronized (this) {
+            messageDrainLatch = new CountDownLatch(numUnackedMessages);
+            this.messageDrainLatch = messageDrainLatch;
+        }
+        try {
+            return messageDrainLatch.await(timeout, timeUnit);
+        } catch (InterruptedException e) {
+            return false;
+        }
+    }
+
     // Note that this will return null if either there are no committable offsets for the given deque, or the latest
     // committable offset is itself null. The caller is responsible for distinguishing between the two cases.
     private Map<String, Object> committableOffset(Deque<SubmittedRecord> queuedRecords) {
@@ -146,15 +177,25 @@ class SubmittedRecords {
         return queuedRecords.peek() != null && queuedRecords.peek().acked();
     }
 
-    static class SubmittedRecord {
+    // Synchronize in order to ensure that the number of unacknowledged messages isn't modified in the middle of a call
+    // to awaitAllMessages (which might cause us to decrement first, then create a new message drain latch, then count down
+    // that latch here, effectively double-acking the message)
+    private synchronized void messageAcked() {
+        numUnackedMessages--;
+        if (messageDrainLatch != null) {
+            messageDrainLatch.countDown();
+        }
+    }
+
+    class SubmittedRecord {
         private final Map<String, Object> partition;
         private final Map<String, Object> offset;
-        private volatile boolean acked;
+        private final AtomicBoolean acked;
 
         public SubmittedRecord(Map<String, Object> partition, Map<String, Object> offset) {
             this.partition = partition;
             this.offset = offset;
-            this.acked = false;
+            this.acked = new AtomicBoolean(false);
         }
 
         /**
@@ -162,11 +203,13 @@ class SubmittedRecords {
          * This is safe to be called from a different thread than what called {@link SubmittedRecords#submit(SourceRecord)}.
          */
         public void ack() {
-            this.acked = true;
+            if (this.acked.compareAndSet(false, true)) {
+                messageAcked();
+            }
         }
 
         private boolean acked() {
-            return acked;
+            return acked.get();
         }
 
         private Map<String, Object> partition() {
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index a33821a..ed36676 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -36,6 +36,7 @@ import org.apache.kafka.connect.errors.RetriableException;
 import org.apache.kafka.connect.header.Header;
 import org.apache.kafka.connect.header.Headers;
 import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
+import org.apache.kafka.connect.runtime.SubmittedRecords.SubmittedRecord;
 import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
 import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
 import org.apache.kafka.connect.runtime.errors.Stage;
@@ -65,7 +66,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
 
-import static org.apache.kafka.connect.runtime.SubmittedRecords.SubmittedRecord;
 import static org.apache.kafka.connect.runtime.SubmittedRecords.CommittableOffsets;
 import static org.apache.kafka.connect.runtime.WorkerConfig.TOPIC_TRACKING_ENABLE_CONFIG;
 
@@ -260,6 +260,10 @@ class WorkerSourceTask extends WorkerTask {
         } catch (InterruptedException e) {
             // Ignore and allow to exit.
         } finally {
+            submittedRecords.awaitAllMessages(
+                    workerConfig.getLong(WorkerConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG),
+                    TimeUnit.MILLISECONDS
+            );
             // It should still be safe to commit offsets since any exception would have
             // simply resulted in not getting more records but all the existing records should be ok to flush
             // and commit offsets. Worst case, task.flush() will also throw an exception causing the offset commit
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SubmittedRecordsTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SubmittedRecordsTest.java
index fd339fc..4028249 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SubmittedRecordsTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SubmittedRecordsTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.connect.runtime;
 
+import org.apache.kafka.connect.runtime.SubmittedRecords.SubmittedRecord;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -23,9 +24,11 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import static org.apache.kafka.connect.runtime.SubmittedRecords.SubmittedRecord;
 import static org.apache.kafka.connect.runtime.SubmittedRecords.CommittableOffsets;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -244,6 +247,95 @@ public class SubmittedRecordsTest {
         assertNoEmptyDeques();
     }
 
+    @Test
+    public void testAwaitMessagesNoneSubmitted() {
+        assertTrue(submittedRecords.awaitAllMessages(0, TimeUnit.MILLISECONDS));
+    }
+
+    @Test
+    public void testAwaitMessagesAfterAllAcknowledged() {
+        SubmittedRecord recordToAck = submittedRecords.submit(PARTITION1, newOffset());
+        assertFalse(submittedRecords.awaitAllMessages(0, TimeUnit.MILLISECONDS));
+        recordToAck.ack();
+        assertTrue(submittedRecords.awaitAllMessages(0, TimeUnit.MILLISECONDS));
+    }
+
+    @Test
+    public void testAwaitMessagesAfterAllRemoved() {
+        SubmittedRecord recordToRemove1 = submittedRecords.submit(PARTITION1, newOffset());
+        SubmittedRecord recordToRemove2 = submittedRecords.submit(PARTITION1, newOffset());
+        assertFalse(
+                "Await should fail since neither of the in-flight records has been removed so far",
+                submittedRecords.awaitAllMessages(0, TimeUnit.MILLISECONDS)
+        );
+
+        submittedRecords.removeLastOccurrence(recordToRemove1);
+        assertFalse(
+                "Await should fail since only one of the two submitted records has been removed so far",
+                submittedRecords.awaitAllMessages(0, TimeUnit.MILLISECONDS)
+        );
+
+        submittedRecords.removeLastOccurrence(recordToRemove1);
+        assertFalse(
+                "Await should fail since only one of the two submitted records has been removed so far, "
+                        + "even though that record has been removed twice",
+                submittedRecords.awaitAllMessages(0, TimeUnit.MILLISECONDS)
+        );
+
+        submittedRecords.removeLastOccurrence(recordToRemove2);
+        assertTrue(
+                "Await should succeed since both submitted records have now been removed",
+                submittedRecords.awaitAllMessages(0, TimeUnit.MILLISECONDS)
+        );
+    }
+
+    @Test
+    public void testAwaitMessagesTimesOut() {
+        submittedRecords.submit(PARTITION1, newOffset());
+        assertFalse(submittedRecords.awaitAllMessages(10, TimeUnit.MILLISECONDS));
+    }
+
+    @Test
+    public void testAwaitMessagesReturnsAfterAsynchronousAck() throws Exception {
+        SubmittedRecord inFlightRecord1 = submittedRecords.submit(PARTITION1, newOffset());
+        SubmittedRecord inFlightRecord2 = submittedRecords.submit(PARTITION2, newOffset());
+
+        AtomicBoolean awaitResult = new AtomicBoolean();
+        CountDownLatch awaitComplete = new CountDownLatch(1);
+        new Thread(() -> {
+            awaitResult.set(submittedRecords.awaitAllMessages(5, TimeUnit.SECONDS));
+            awaitComplete.countDown();
+        }).start();
+
+        assertTrue(
+                "Should not have finished awaiting message delivery before either in-flight record was acknowledged",
+                awaitComplete.getCount() > 0
+        );
+
+        inFlightRecord1.ack();
+        assertTrue(
+                "Should not have finished awaiting message delivery before one in-flight record was acknowledged",
+                awaitComplete.getCount() > 0
+        );
+
+        inFlightRecord1.ack();
+        assertTrue(
+                "Should not have finished awaiting message delivery before one in-flight record was acknowledged, "
+                        + "even though the other record has been acknowledged twice",
+                awaitComplete.getCount() > 0
+        );
+
+        inFlightRecord2.ack();
+        assertTrue(
+                "Should have finished awaiting message delivery after both in-flight records were acknowledged",
+                awaitComplete.await(1, TimeUnit.SECONDS)
+        );
+        assertTrue(
+                "Await of in-flight messages should have succeeded",
+                awaitResult.get()
+        );
+    }
+
     private void assertNoRemainingDeques() {
         assertEquals("Internal records map should be completely empty", Collections.emptyMap(), submittedRecords.records);
     }
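For readers following the change without applying the patch, the sketch below is a minimal,
self-contained illustration of the drain-latch pattern that the new
SubmittedRecords#awaitAllMessages method and the worker's end-of-life offset commit rely on.
It is not Connect worker code; the class and method names (DrainLatchSketch, submitted, acked,
awaitAll) are invented for this example, under the assumption that a plain CountDownLatch
snapshot of the unacked count is an adequate stand-in for the real bookkeeping.

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Track the number of unacknowledged messages; when a drain is requested, snapshot that
// count into a CountDownLatch that asynchronous acks count down.
public class DrainLatchSketch {
    private int unacked = 0;           // guarded by "this"
    private CountDownLatch drainLatch; // written under "this", awaited outside it

    public synchronized void submitted() {
        unacked++;
    }

    public synchronized void acked() {
        unacked--;
        if (drainLatch != null) {
            drainLatch.countDown();
        }
    }

    // Wait, up to the timeout, for every message submitted so far to be acked.
    public boolean awaitAll(long timeout, TimeUnit unit) throws InterruptedException {
        CountDownLatch latch;
        synchronized (this) {
            latch = new CountDownLatch(unacked);
            this.drainLatch = latch;
        }
        return latch.await(timeout, unit);
    }

    public static void main(String[] args) throws InterruptedException {
        DrainLatchSketch tracker = new DrainLatchSketch();
        tracker.submitted();
        // Simulate a producer callback acknowledging the in-flight message shortly after.
        new Thread(() -> {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            tracker.acked();
        }).start();
        // Mirrors the worker's end-of-life behavior: block for up to the offset commit
        // timeout, then proceed with the final offset commit either way.
        System.out.println("all acked in time? " + tracker.awaitAll(5, TimeUnit.SECONDS));
    }
}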