This is an automated email from the ASF dual-hosted git repository.
rhauch pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.0 by this push:
new 87b3052 KAFKA-13469: Block for in-flight record delivery before
end-of-life source task offset commit (#11524)
87b3052 is described below
commit 87b3052c94696369156ebf6f521ea96c274a89cd
Author: Chris Egerton <[email protected]>
AuthorDate: Tue Nov 30 11:35:50 2021 -0500
KAFKA-13469: Block for in-flight record delivery before end-of-life source
task offset commit (#11524)
Although committing source task offsets without blocking on the delivery of
all in-flight records is beneficial most of the time, it can lead to duplicate
record delivery if there are in-flight records at the time of the task's
end-of-life offset commit.
A best-effort attempt is made here to wait for any such in-flight records
to be delivered before proceeding with the end-of-life offset commit for source
tasks. Connect will block for up to offset.flush.timeout.ms milliseconds before
calculating the latest committable offsets for the task and flushing those to
the persistent offset store.
Author: Chris Egerton <[email protected]>
Reviewer: Randall Hauch <[email protected]>
---
.../kafka/connect/runtime/SubmittedRecords.java | 59 ++++++++++++--
.../kafka/connect/runtime/WorkerSourceTask.java | 6 +-
.../connect/runtime/SubmittedRecordsTest.java | 94 +++++++++++++++++++++-
3 files changed, 149 insertions(+), 10 deletions(-)
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java
index 472a266..6cdd2c1 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java
@@ -26,6 +26,9 @@ import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
/**
* Used to track source records that have been (or are about to be) dispatched
to a producer and their accompanying
@@ -41,10 +44,11 @@ class SubmittedRecords {
private static final Logger log =
LoggerFactory.getLogger(SubmittedRecords.class);
// Visible for testing
- final Map<Map<String, Object>, Deque<SubmittedRecord>> records;
+ final Map<Map<String, Object>, Deque<SubmittedRecord>> records = new
HashMap<>();
+ private int numUnackedMessages = 0;
+ private CountDownLatch messageDrainLatch;
public SubmittedRecords() {
- this.records = new HashMap<>();
}
/**
@@ -68,6 +72,9 @@ class SubmittedRecords {
SubmittedRecord result = new SubmittedRecord(partition, offset);
records.computeIfAbsent(result.partition(), p -> new LinkedList<>())
.add(result);
+ synchronized (this) {
+ numUnackedMessages++;
+ }
return result;
}
@@ -89,7 +96,9 @@ class SubmittedRecords {
if (deque.isEmpty()) {
records.remove(record.partition());
}
- if (!result) {
+ if (result) {
+ messageAcked();
+ } else {
log.warn("Attempted to remove record from submitted queue for
partition {}, but the record has not been submitted or has already been
removed", record.partition());
}
return result;
@@ -132,6 +141,28 @@ class SubmittedRecords {
return new CommittableOffsets(offsets, totalCommittableMessages,
totalUncommittableMessages, records.size(), largestDequeSize,
largestDequePartition);
}
+ /**
+ * Wait for all currently in-flight messages to be acknowledged, up to the
requested timeout.
+ * This method is expected to be called from the same thread that calls
{@link #committableOffsets()}.
+ * @param timeout the maximum time to wait
+ * @param timeUnit the time unit of the timeout argument
+ * @return whether all in-flight messages were acknowledged before the
timeout elapsed
+ */
+ public boolean awaitAllMessages(long timeout, TimeUnit timeUnit) {
+ // Create a new message drain latch as a local variable to avoid
SpotBugs warnings about inconsistent synchronization
+ // on an instance variable when invoking CountDownLatch::await outside
a synchronized block
+ CountDownLatch messageDrainLatch;
+ synchronized (this) {
+ messageDrainLatch = new CountDownLatch(numUnackedMessages);
+ this.messageDrainLatch = messageDrainLatch;
+ }
+ try {
+ return messageDrainLatch.await(timeout, timeUnit);
+ } catch (InterruptedException e) {
+ return false;
+ }
+ }
+
// Note that this will return null if either there are no committable
offsets for the given deque, or the latest
// committable offset is itself null. The caller is responsible for
distinguishing between the two cases.
private Map<String, Object> committableOffset(Deque<SubmittedRecord>
queuedRecords) {
@@ -146,15 +177,25 @@ class SubmittedRecords {
return queuedRecords.peek() != null && queuedRecords.peek().acked();
}
- static class SubmittedRecord {
+ // Synchronize in order to ensure that the number of unacknowledged
messages isn't modified in the middle of a call
+ // to awaitAllMessages (which might cause us to decrement first, then
create a new message drain latch, then count down
+ // that latch here, effectively double-acking the message)
+ private synchronized void messageAcked() {
+ numUnackedMessages--;
+ if (messageDrainLatch != null) {
+ messageDrainLatch.countDown();
+ }
+ }
+
+ class SubmittedRecord {
private final Map<String, Object> partition;
private final Map<String, Object> offset;
- private volatile boolean acked;
+ private final AtomicBoolean acked;
public SubmittedRecord(Map<String, Object> partition, Map<String,
Object> offset) {
this.partition = partition;
this.offset = offset;
- this.acked = false;
+ this.acked = new AtomicBoolean(false);
}
/**
@@ -162,11 +203,13 @@ class SubmittedRecords {
* This is safe to be called from a different thread than what called
{@link SubmittedRecords#submit(SourceRecord)}.
*/
public void ack() {
- this.acked = true;
+ if (this.acked.compareAndSet(false, true)) {
+ messageAcked();
+ }
}
private boolean acked() {
- return acked;
+ return acked.get();
}
private Map<String, Object> partition() {
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index a33821a..ed36676 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -36,6 +36,7 @@ import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.header.Headers;
import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
+import org.apache.kafka.connect.runtime.SubmittedRecords.SubmittedRecord;
import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
import org.apache.kafka.connect.runtime.errors.Stage;
@@ -65,7 +66,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
-import static
org.apache.kafka.connect.runtime.SubmittedRecords.SubmittedRecord;
import static
org.apache.kafka.connect.runtime.SubmittedRecords.CommittableOffsets;
import static
org.apache.kafka.connect.runtime.WorkerConfig.TOPIC_TRACKING_ENABLE_CONFIG;
@@ -260,6 +260,10 @@ class WorkerSourceTask extends WorkerTask {
} catch (InterruptedException e) {
// Ignore and allow to exit.
} finally {
+ submittedRecords.awaitAllMessages(
+
workerConfig.getLong(WorkerConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG),
+ TimeUnit.MILLISECONDS
+ );
// It should still be safe to commit offsets since any exception
would have
// simply resulted in not getting more records but all the
existing records should be ok to flush
// and commit offsets. Worst case, task.flush() will also throw an
exception causing the offset commit
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SubmittedRecordsTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SubmittedRecordsTest.java
index fd339fc..4028249 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SubmittedRecordsTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SubmittedRecordsTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.connect.runtime;
+import org.apache.kafka.connect.runtime.SubmittedRecords.SubmittedRecord;
import org.junit.Before;
import org.junit.Test;
@@ -23,9 +24,11 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
-import static
org.apache.kafka.connect.runtime.SubmittedRecords.SubmittedRecord;
import static
org.apache.kafka.connect.runtime.SubmittedRecords.CommittableOffsets;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -244,6 +247,95 @@ public class SubmittedRecordsTest {
assertNoEmptyDeques();
}
+ @Test
+ public void testAwaitMessagesNoneSubmitted() {
+ assertTrue(submittedRecords.awaitAllMessages(0,
TimeUnit.MILLISECONDS));
+ }
+
+ @Test
+ public void testAwaitMessagesAfterAllAcknowledged() {
+ SubmittedRecord recordToAck = submittedRecords.submit(PARTITION1,
newOffset());
+ assertFalse(submittedRecords.awaitAllMessages(0,
TimeUnit.MILLISECONDS));
+ recordToAck.ack();
+ assertTrue(submittedRecords.awaitAllMessages(0,
TimeUnit.MILLISECONDS));
+ }
+
+ @Test
+ public void testAwaitMessagesAfterAllRemoved() {
+ SubmittedRecord recordToRemove1 = submittedRecords.submit(PARTITION1,
newOffset());
+ SubmittedRecord recordToRemove2 = submittedRecords.submit(PARTITION1,
newOffset());
+ assertFalse(
+ "Await should fail since neither of the in-flight records has
been removed so far",
+ submittedRecords.awaitAllMessages(0, TimeUnit.MILLISECONDS)
+ );
+
+ submittedRecords.removeLastOccurrence(recordToRemove1);
+ assertFalse(
+ "Await should fail since only one of the two submitted records
has been removed so far",
+ submittedRecords.awaitAllMessages(0, TimeUnit.MILLISECONDS)
+ );
+
+ submittedRecords.removeLastOccurrence(recordToRemove1);
+ assertFalse(
+ "Await should fail since only one of the two submitted records
has been removed so far, "
+ + "even though that record has been removed twice",
+ submittedRecords.awaitAllMessages(0, TimeUnit.MILLISECONDS)
+ );
+
+ submittedRecords.removeLastOccurrence(recordToRemove2);
+ assertTrue(
+ "Await should succeed since both submitted records have now
been removed",
+ submittedRecords.awaitAllMessages(0, TimeUnit.MILLISECONDS)
+ );
+ }
+
+ @Test
+ public void testAwaitMessagesTimesOut() {
+ submittedRecords.submit(PARTITION1, newOffset());
+ assertFalse(submittedRecords.awaitAllMessages(10,
TimeUnit.MILLISECONDS));
+ }
+
+ @Test
+ public void testAwaitMessagesReturnsAfterAsynchronousAck() throws
Exception {
+ SubmittedRecord inFlightRecord1 = submittedRecords.submit(PARTITION1,
newOffset());
+ SubmittedRecord inFlightRecord2 = submittedRecords.submit(PARTITION2,
newOffset());
+
+ AtomicBoolean awaitResult = new AtomicBoolean();
+ CountDownLatch awaitComplete = new CountDownLatch(1);
+ new Thread(() -> {
+ awaitResult.set(submittedRecords.awaitAllMessages(5,
TimeUnit.SECONDS));
+ awaitComplete.countDown();
+ }).start();
+
+ assertTrue(
+ "Should not have finished awaiting message delivery before
either in-flight record was acknowledged",
+ awaitComplete.getCount() > 0
+ );
+
+ inFlightRecord1.ack();
+ assertTrue(
+ "Should not have finished awaiting message delivery before one
in-flight record was acknowledged",
+ awaitComplete.getCount() > 0
+ );
+
+ inFlightRecord1.ack();
+ assertTrue(
+ "Should not have finished awaiting message delivery before one
in-flight record was acknowledged, "
+ + "even though the other record has been acknowledged
twice",
+ awaitComplete.getCount() > 0
+ );
+
+ inFlightRecord2.ack();
+ assertTrue(
+ "Should have finished awaiting message delivery after both
in-flight records were acknowledged",
+ awaitComplete.await(1, TimeUnit.SECONDS)
+ );
+ assertTrue(
+ "Await of in-flight messages should have succeeded",
+ awaitResult.get()
+ );
+ }
+
private void assertNoRemainingDeques() {
assertEquals("Internal records map should be completely empty",
Collections.emptyMap(), submittedRecords.records);
}