This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch 3.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.1 by this push:
     new a7e0eb0  KAFKA-13469: Block for in-flight record delivery before end-of-life source task offset commit (#11524)
a7e0eb0 is described below

commit a7e0eb06734ade0d5932e496d352e208a41d3664
Author: Chris Egerton <chr...@confluent.io>
AuthorDate: Tue Nov 30 11:35:50 2021 -0500

    KAFKA-13469: Block for in-flight record delivery before end-of-life source task offset commit (#11524)
    
    Although committing source task offsets without blocking on the delivery
    of all in-flight records is beneficial most of the time, it can lead to
    duplicate record delivery if there are in-flight records at the time of
    the task's end-of-life offset commit.
    
    A best-effort attempt is made here to wait for any such in-flight records
    to be delivered before proceeding with the end-of-life offset commit for
    source tasks. Connect will block for up to offset.flush.timeout.ms
    milliseconds before calculating the latest committable offsets for the
    task and flushing those to the persistent offset store.
    
    Author: Chris Egerton <chr...@confluent.io>
    Reviewer: Randall Hauch <rha...@gmail.com>
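
    For context: the timeout applied to this end-of-life wait is the worker's
    existing offset.flush.timeout.ms property (default 5000 ms), so operators
    who see duplicate records from slow producers at task shutdown can simply
    lengthen that grace period. An illustrative worker configuration snippet
    follows; the value shown is an example, not one prescribed by this commit:

        # connect-distributed.properties
        offset.flush.timeout.ms=10000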
---
 .../kafka/connect/runtime/SubmittedRecords.java    | 59 ++++++++++++--
 .../kafka/connect/runtime/WorkerSourceTask.java    |  6 +-
 .../connect/runtime/SubmittedRecordsTest.java      | 94 +++++++++++++++++++++-
 3 files changed, 149 insertions(+), 10 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java
index 472a266..6cdd2c1 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java
@@ -26,6 +26,9 @@ import java.util.Deque;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * Used to track source records that have been (or are about to be) dispatched to a producer and their accompanying
@@ -41,10 +44,11 @@ class SubmittedRecords {
     private static final Logger log = LoggerFactory.getLogger(SubmittedRecords.class);
 
     // Visible for testing
-    final Map<Map<String, Object>, Deque<SubmittedRecord>> records;
+    final Map<Map<String, Object>, Deque<SubmittedRecord>> records = new HashMap<>();
+    private int numUnackedMessages = 0;
+    private CountDownLatch messageDrainLatch;
 
     public SubmittedRecords() {
-        this.records = new HashMap<>();
     }
 
     /**
@@ -68,6 +72,9 @@ class SubmittedRecords {
         SubmittedRecord result = new SubmittedRecord(partition, offset);
         records.computeIfAbsent(result.partition(), p -> new LinkedList<>())
                 .add(result);
+        synchronized (this) {
+            numUnackedMessages++;
+        }
         return result;
     }
 
@@ -89,7 +96,9 @@ class SubmittedRecords {
         if (deque.isEmpty()) {
             records.remove(record.partition());
         }
-        if (!result) {
+        if (result) {
+            messageAcked();
+        } else {
             log.warn("Attempted to remove record from submitted queue for partition {}, but the record has not been submitted or has already been removed", record.partition());
         }
         return result;
@@ -132,6 +141,28 @@ class SubmittedRecords {
         return new CommittableOffsets(offsets, totalCommittableMessages, totalUncommittableMessages, records.size(), largestDequeSize, largestDequePartition);
     }
 
+    /**
+     * Wait for all currently in-flight messages to be acknowledged, up to the requested timeout.
+     * This method is expected to be called from the same thread that calls {@link #committableOffsets()}.
+     * @param timeout the maximum time to wait
+     * @param timeUnit the time unit of the timeout argument
+     * @return whether all in-flight messages were acknowledged before the timeout elapsed
+     */
+    public boolean awaitAllMessages(long timeout, TimeUnit timeUnit) {
+        // Create a new message drain latch as a local variable to avoid SpotBugs warnings about inconsistent synchronization
+        // on an instance variable when invoking CountDownLatch::await outside a synchronized block
+        CountDownLatch messageDrainLatch;
+        synchronized (this) {
+            messageDrainLatch = new CountDownLatch(numUnackedMessages);
+            this.messageDrainLatch = messageDrainLatch;
+        }
+        try {
+            return messageDrainLatch.await(timeout, timeUnit);
+        } catch (InterruptedException e) {
+            return false;
+        }
+    }
+
     // Note that this will return null if either there are no committable offsets for the given deque, or the latest
     // committable offset is itself null. The caller is responsible for distinguishing between the two cases.
     private Map<String, Object> committableOffset(Deque<SubmittedRecord> queuedRecords) {
@@ -146,15 +177,25 @@ class SubmittedRecords {
         return queuedRecords.peek() != null && queuedRecords.peek().acked();
     }
 
-    static class SubmittedRecord {
+    // Synchronize in order to ensure that the number of unacknowledged messages isn't modified in the middle of a call
+    // to awaitAllMessages (which might cause us to decrement first, then create a new message drain latch, then count down
+    // that latch here, effectively double-acking the message)
+    private synchronized void messageAcked() {
+        numUnackedMessages--;
+        if (messageDrainLatch != null) {
+            messageDrainLatch.countDown();
+        }
+    }
+
+    class SubmittedRecord {
         private final Map<String, Object> partition;
         private final Map<String, Object> offset;
-        private volatile boolean acked;
+        private final AtomicBoolean acked;
 
         public SubmittedRecord(Map<String, Object> partition, Map<String, Object> offset) {
             this.partition = partition;
             this.offset = offset;
-            this.acked = false;
+            this.acked = new AtomicBoolean(false);
         }
 
         /**
@@ -162,11 +203,13 @@ class SubmittedRecords {
          * This is safe to be called from a different thread than what called {@link SubmittedRecords#submit(SourceRecord)}.
          */
         public void ack() {
-            this.acked = true;
+            if (this.acked.compareAndSet(false, true)) {
+                messageAcked();
+            }
         }
 
         private boolean acked() {
-            return acked;
+            return acked.get();
         }
 
         private Map<String, Object> partition() {
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index a33821a..ed36676 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -36,6 +36,7 @@ import org.apache.kafka.connect.errors.RetriableException;
 import org.apache.kafka.connect.header.Header;
 import org.apache.kafka.connect.header.Headers;
 import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
+import org.apache.kafka.connect.runtime.SubmittedRecords.SubmittedRecord;
 import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
 import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
 import org.apache.kafka.connect.runtime.errors.Stage;
@@ -65,7 +66,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
 
-import static org.apache.kafka.connect.runtime.SubmittedRecords.SubmittedRecord;
 import static org.apache.kafka.connect.runtime.SubmittedRecords.CommittableOffsets;
 import static org.apache.kafka.connect.runtime.WorkerConfig.TOPIC_TRACKING_ENABLE_CONFIG;
 
@@ -260,6 +260,10 @@ class WorkerSourceTask extends WorkerTask {
         } catch (InterruptedException e) {
             // Ignore and allow to exit.
         } finally {
+            submittedRecords.awaitAllMessages(
+                    workerConfig.getLong(WorkerConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG),
+                    TimeUnit.MILLISECONDS
+            );
             // It should still be safe to commit offsets since any exception would have
             // simply resulted in not getting more records but all the existing records should be ok to flush
             // and commit offsets. Worst case, task.flush() will also throw an exception causing the offset commit
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SubmittedRecordsTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SubmittedRecordsTest.java
index fd339fc..4028249 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SubmittedRecordsTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SubmittedRecordsTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.connect.runtime;
 
+import org.apache.kafka.connect.runtime.SubmittedRecords.SubmittedRecord;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -23,9 +24,11 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import static org.apache.kafka.connect.runtime.SubmittedRecords.SubmittedRecord;
 import static org.apache.kafka.connect.runtime.SubmittedRecords.CommittableOffsets;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -244,6 +247,95 @@ public class SubmittedRecordsTest {
         assertNoEmptyDeques();
     }
 
+    @Test
+    public void testAwaitMessagesNoneSubmitted() {
+        assertTrue(submittedRecords.awaitAllMessages(0, TimeUnit.MILLISECONDS));
+    }
+
+    @Test
+    public void testAwaitMessagesAfterAllAcknowledged() {
+        SubmittedRecord recordToAck = submittedRecords.submit(PARTITION1, newOffset());
+        assertFalse(submittedRecords.awaitAllMessages(0, TimeUnit.MILLISECONDS));
+        recordToAck.ack();
+        assertTrue(submittedRecords.awaitAllMessages(0, TimeUnit.MILLISECONDS));
+    }
+
+    @Test
+    public void testAwaitMessagesAfterAllRemoved() {
+        SubmittedRecord recordToRemove1 = submittedRecords.submit(PARTITION1, newOffset());
+        SubmittedRecord recordToRemove2 = submittedRecords.submit(PARTITION1, newOffset());
+        assertFalse(
+                "Await should fail since neither of the in-flight records has been removed so far",
+                submittedRecords.awaitAllMessages(0, TimeUnit.MILLISECONDS)
+        );
+
+        submittedRecords.removeLastOccurrence(recordToRemove1);
+        assertFalse(
+                "Await should fail since only one of the two submitted records has been removed so far",
+                submittedRecords.awaitAllMessages(0, TimeUnit.MILLISECONDS)
+        );
+
+        submittedRecords.removeLastOccurrence(recordToRemove1);
+        assertFalse(
+                "Await should fail since only one of the two submitted records has been removed so far, "
+                        + "even though that record has been removed twice",
+                submittedRecords.awaitAllMessages(0, TimeUnit.MILLISECONDS)
+        );
+
+        submittedRecords.removeLastOccurrence(recordToRemove2);
+        assertTrue(
+                "Await should succeed since both submitted records have now been removed",
+                submittedRecords.awaitAllMessages(0, TimeUnit.MILLISECONDS)
+        );
+    }
+
+    @Test
+    public void testAwaitMessagesTimesOut() {
+        submittedRecords.submit(PARTITION1, newOffset());
+        assertFalse(submittedRecords.awaitAllMessages(10, TimeUnit.MILLISECONDS));
+    }
+
+    @Test
+    public void testAwaitMessagesReturnsAfterAsynchronousAck() throws Exception {
+        SubmittedRecord inFlightRecord1 = submittedRecords.submit(PARTITION1, newOffset());
+        SubmittedRecord inFlightRecord2 = submittedRecords.submit(PARTITION2, newOffset());
+
+        AtomicBoolean awaitResult = new AtomicBoolean();
+        CountDownLatch awaitComplete = new CountDownLatch(1);
+        new Thread(() -> {
+            awaitResult.set(submittedRecords.awaitAllMessages(5, TimeUnit.SECONDS));
+            awaitComplete.countDown();
+        }).start();
+
+        assertTrue(
+                "Should not have finished awaiting message delivery before either in-flight record was acknowledged",
+                awaitComplete.getCount() > 0
+        );
+
+        inFlightRecord1.ack();
+        assertTrue(
+                "Should not have finished awaiting message delivery before one in-flight record was acknowledged",
+                awaitComplete.getCount() > 0
+        );
+
+        inFlightRecord1.ack();
+        assertTrue(
+                "Should not have finished awaiting message delivery before one in-flight record was acknowledged, "
+                        + "even though the other record has been acknowledged twice",
+                awaitComplete.getCount() > 0
+        );
+
+        inFlightRecord2.ack();
+        assertTrue(
+                "Should have finished awaiting message delivery after both in-flight records were acknowledged",
+                awaitComplete.await(1, TimeUnit.SECONDS)
+        );
+        assertTrue(
+                "Await of in-flight messages should have succeeded",
+                awaitResult.get()
+        );
+    }
+
     private void assertNoRemainingDeques() {
         assertEquals("Internal records map should be completely empty", Collections.emptyMap(), submittedRecords.records);
     }
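
For readers skimming the diff, the heart of the change is a small drain
pattern: a count of in-flight records guarded by the object monitor, plus a
CountDownLatch that awaitAllMessages creates under the lock (snapshotting the
current count) and then waits on outside the lock, while each acknowledgment
counts the latch down. Below is a self-contained sketch of that pattern. It
is illustrative only; the class and member names (InFlightTracker, submitted,
acked, awaitAll) are invented here and do not appear in the commit.

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;

    public class InFlightTracker {
        private int inFlight = 0;          // guarded by the object monitor
        private CountDownLatch drainLatch; // guarded by the object monitor

        // Called when a record is handed to the producer.
        public synchronized void submitted() {
            inFlight++;
        }

        // Called from the producer callback when a record is acknowledged.
        // Synchronized so an ack cannot slip in between reading the counter
        // and creating the latch in awaitAll, which would double-count it.
        public synchronized void acked() {
            inFlight--;
            if (drainLatch != null) {
                drainLatch.countDown();
            }
        }

        // Snapshot the in-flight count into a fresh latch under the lock,
        // then wait outside the lock so acks can still count it down.
        public boolean awaitAll(long timeout, TimeUnit unit) throws InterruptedException {
            CountDownLatch latch;
            synchronized (this) {
                latch = new CountDownLatch(inFlight);
                drainLatch = latch;
            }
            return latch.await(timeout, unit);
        }

        public static void main(String[] args) throws InterruptedException {
            InFlightTracker tracker = new InFlightTracker();
            tracker.submitted();
            new Thread(tracker::acked).start(); // simulate an async producer ack
            System.out.println("drained: " + tracker.awaitAll(5, TimeUnit.SECONDS));
        }
    }

Awaiting on a local copy of the latch outside the synchronized block is the
same trick the commit uses to sidestep the SpotBugs warning about
inconsistent synchronization noted in the diff.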
