This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 4597e954d91b05b64ff694cc4c2022713c266eba
Author: Chris Egerton <[email protected]>
AuthorDate: Mon Feb 6 04:53:58 2023 -0500

    KAFKA-14610: Publish Mirror Maker 2 offset syncs in task commit() method (#13181)
    
    Reviewers: Mickael Maison <[email protected]>, Greg Harris <[email protected]>
---
 .../kafka/connect/mirror/MirrorSourceTask.java     | 96 ++++++++++++++--------
 .../kafka/connect/mirror/MirrorSourceTaskTest.java | 31 +++++--
 2 files changed, 88 insertions(+), 39 deletions(-)

diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java
index 4e714a6bf46..b9dd470d013 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java
@@ -36,6 +36,8 @@ import org.apache.kafka.common.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.HashMap;
 import java.util.List;
@@ -62,6 +64,7 @@ public class MirrorSourceTask extends SourceTask {
     private ReplicationPolicy replicationPolicy;
     private MirrorMetrics metrics;
     private boolean stopping = false;
+    private final Map<TopicPartition, OffsetSync> pendingOffsetSyncs = new 
LinkedHashMap<>();
     private Semaphore outstandingOffsetSyncs;
     private Semaphore consumerAccess;
 
@@ -87,6 +90,7 @@ public class MirrorSourceTask extends SourceTask {
     @Override
     public void start(Map<String, String> props) {
         MirrorTaskConfig config = new MirrorTaskConfig(props);
+        pendingOffsetSyncs.clear();
         outstandingOffsetSyncs = new Semaphore(MAX_OUTSTANDING_OFFSET_SYNCS);
         consumerAccess = new Semaphore(1);  // let one thread at a time access 
the consumer
         sourceClusterAlias = config.sourceClusterAlias();
@@ -111,7 +115,9 @@ public class MirrorSourceTask extends SourceTask {
 
     @Override
     public void commit() {
-        // nop
+        // Publish any offset syncs that we've queued up, but have not yet 
been able to publish
+        // (likely because we previously reached our limit for number of 
outstanding syncs)
+        firePendingOffsetSyncs();
     }
 
     @Override
@@ -176,59 +182,81 @@ public class MirrorSourceTask extends SourceTask {
  
     @Override
     public void commitRecord(SourceRecord record, RecordMetadata metadata) {
-        try {
-            if (stopping) {
-                return;
-            }
-            if (!metadata.hasOffset()) {
-                log.error("RecordMetadata has no offset -- can't sync offsets 
for {}.", record.topic());
-                return;
-            }
-            TopicPartition topicPartition = new TopicPartition(record.topic(), 
record.kafkaPartition());
-            long latency = System.currentTimeMillis() - record.timestamp();
-            metrics.countRecord(topicPartition);
-            metrics.replicationLatency(topicPartition, latency);
-            TopicPartition sourceTopicPartition = 
MirrorUtils.unwrapPartition(record.sourcePartition());
-            long upstreamOffset = 
MirrorUtils.unwrapOffset(record.sourceOffset());
-            long downstreamOffset = metadata.offset();
-            maybeSyncOffsets(sourceTopicPartition, upstreamOffset, 
downstreamOffset);
-        } catch (Throwable e) {
-            log.warn("Failure committing record.", e);
+        if (stopping) {
+            return;
+        }
+        if (metadata == null) {
+            log.debug("No RecordMetadata (source record was probably filtered 
out during transformation) -- can't sync offsets for {}.", record.topic());
+            return;
         }
+        if (!metadata.hasOffset()) {
+            log.error("RecordMetadata has no offset -- can't sync offsets for 
{}.", record.topic());
+            return;
+        }
+        TopicPartition topicPartition = new TopicPartition(record.topic(), 
record.kafkaPartition());
+        long latency = System.currentTimeMillis() - record.timestamp();
+        metrics.countRecord(topicPartition);
+        metrics.replicationLatency(topicPartition, latency);
+        TopicPartition sourceTopicPartition = 
MirrorUtils.unwrapPartition(record.sourcePartition());
+        long upstreamOffset = MirrorUtils.unwrapOffset(record.sourceOffset());
+        long downstreamOffset = metadata.offset();
+        maybeQueueOffsetSyncs(sourceTopicPartition, upstreamOffset, 
downstreamOffset);
+        // We may be able to immediately publish an offset sync that we've 
queued up here
+        firePendingOffsetSyncs();
     }
 
-    // updates partition state and sends OffsetSync if necessary
-    private void maybeSyncOffsets(TopicPartition topicPartition, long 
upstreamOffset,
-            long downstreamOffset) {
+    // updates partition state and queues up OffsetSync if necessary
+    private void maybeQueueOffsetSyncs(TopicPartition topicPartition, long 
upstreamOffset,
+                                       long downstreamOffset) {
         PartitionState partitionState =
             partitionStates.computeIfAbsent(topicPartition, x -> new 
PartitionState(maxOffsetLag));
         if (partitionState.update(upstreamOffset, downstreamOffset)) {
-            if (sendOffsetSync(topicPartition, upstreamOffset, 
downstreamOffset)) {
-                partitionState.reset();
+            OffsetSync offsetSync = new OffsetSync(topicPartition, 
upstreamOffset, downstreamOffset);
+            synchronized (this) {
+                pendingOffsetSyncs.put(topicPartition, offsetSync);
             }
+            partitionState.reset();
         }
     }
 
-    // sends OffsetSync record upstream to internal offsets topic
-    private boolean sendOffsetSync(TopicPartition topicPartition, long 
upstreamOffset,
-            long downstreamOffset) {
-        if (!outstandingOffsetSyncs.tryAcquire()) {
-            // Too many outstanding offset syncs.
-            return false;
+    private void firePendingOffsetSyncs() {
+        while (true) {
+            OffsetSync pendingOffsetSync;
+            synchronized (this) {
+                Iterator<OffsetSync> syncIterator = 
pendingOffsetSyncs.values().iterator();
+                if (!syncIterator.hasNext()) {
+                    // Nothing to sync
+                    log.trace("No more pending offset syncs");
+                    return;
+                }
+                pendingOffsetSync = syncIterator.next();
+                if (!outstandingOffsetSyncs.tryAcquire()) {
+                    // Too many outstanding syncs
+                    log.trace("Too many in-flight offset syncs; will try to 
send remaining offset syncs later");
+                    return;
+                }
+                syncIterator.remove();
+            }
+            // Publish offset sync outside of synchronized block; we may have 
to
+            // wait for producer metadata to update before Producer::send 
returns
+            sendOffsetSync(pendingOffsetSync);
+            log.trace("Dispatched offset sync for {}", 
pendingOffsetSync.topicPartition());
         }
-        OffsetSync offsetSync = new OffsetSync(topicPartition, upstreamOffset, 
downstreamOffset);
+    }
+
+    // sends OffsetSync record to internal offsets topic
+    private void sendOffsetSync(OffsetSync offsetSync) {
         ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(offsetSyncsTopic, 0,
                 offsetSync.recordKey(), offsetSync.recordValue());
         offsetProducer.send(record, (x, e) -> {
             if (e != null) {
                 log.error("Failure sending offset sync.", e);
             } else {
-                log.trace("Sync'd offsets for {}: {}=={}", topicPartition,
-                    upstreamOffset, downstreamOffset);
+                log.trace("Sync'd offsets for {}: {}=={}", 
offsetSync.topicPartition(),
+                    offsetSync.upstreamOffset(), 
offsetSync.downstreamOffset());
             }
             outstandingOffsetSyncs.release();
         });
-        return true;
     }
  
     private Map<TopicPartition, Long> loadOffsets(Set<TopicPartition> 
topicPartitions) {
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java
index bbd9ec3aff5..300a40087c6 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java
@@ -283,7 +283,11 @@ public class MirrorSourceTaskTest {
         });
 
         mirrorSourceTask.commitRecord(sourceRecord, recordMetadata);
+        // We should have dispatched this sync to the producer
+        verify(producer, times(1)).send(any(), any());
 
+        mirrorSourceTask.commit();
+        // No more syncs should take place; we've been able to publish all of 
them so far
         verify(producer, times(1)).send(any(), any());
 
         recordOffset = 2;
@@ -297,7 +301,11 @@ public class MirrorSourceTaskTest {
         doReturn(null).when(producer).send(any(), producerCallback.capture());
 
         mirrorSourceTask.commitRecord(sourceRecord, recordMetadata);
+        // We should have dispatched this sync to the producer
+        verify(producer, times(2)).send(any(), any());
 
+        mirrorSourceTask.commit();
+        // No more syncs should take place; we've been able to publish all of 
them so far
         verify(producer, times(2)).send(any(), any());
 
         // Do not send sync event
@@ -309,22 +317,35 @@ public class MirrorSourceTaskTest {
                 recordValue.length, recordKey, recordValue, headers, 
Optional.empty()));
 
         mirrorSourceTask.commitRecord(sourceRecord, recordMetadata);
+        mirrorSourceTask.commit();
 
+        // We should not have dispatched any more syncs to the producer; there 
were too many already in flight
         verify(producer, times(2)).send(any(), any());
 
+        // Now the in-flight sync has been ack'd
+        producerCallback.getValue().onCompletion(null, null);
+        mirrorSourceTask.commit();
+        // We should dispatch the offset sync that was queued but previously 
not sent to the producer now
+        verify(producer, times(3)).send(any(), any());
+
+        // Ack the latest sync immediately
+        producerCallback.getValue().onCompletion(null, null);
+
         // Should send sync event
-        recordOffset = 5;
-        metadataOffset = 150;
+        recordOffset = 6;
+        metadataOffset = 106;
         recordMetadata = new RecordMetadata(sourceTopicPartition, 
metadataOffset, 0, 0, 0, recordPartition);
         sourceRecord = mirrorSourceTask.convertRecord(new 
ConsumerRecord<>(topicName, recordPartition,
                 recordOffset, System.currentTimeMillis(), 
TimestampType.CREATE_TIME, recordKey.length,
                 recordValue.length, recordKey, recordValue, headers, 
Optional.empty()));
 
-        producerCallback.getValue().onCompletion(null, null);
-
         mirrorSourceTask.commitRecord(sourceRecord, recordMetadata);
+        // We should have dispatched this sync to the producer
+        verify(producer, times(4)).send(any(), any());
 
-        verify(producer, times(3)).send(any(), any());
+        mirrorSourceTask.commit();
+        // No more syncs should take place; we've been able to publish all of 
them so far
+        verify(producer, times(4)).send(any(), any());
     }
 
     private void compareHeaders(List<Header> expectedHeaders, 
List<org.apache.kafka.connect.header.Header> taskHeaders) {

Reply via email to