This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch 3.4
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 55e69a0db81713f700635f96cce56351abfe8ba3
Author: emilnkrastev <[email protected]>
AuthorDate: Tue Jan 10 16:46:25 2023 +0200

    KAFKA-12558: Do not prematurely mutate internal partition state in Mirror Maker 2 (#11818)
    
    Reviewers: Greg Harris <[email protected]>, Chris Egerton <[email protected]>
---
 .../kafka/connect/mirror/MirrorSourceTask.java     |  22 +++-
 .../kafka/connect/mirror/MirrorSourceTaskTest.java | 139 ++++++++++++++++++++-
 .../MirrorConnectorsIntegrationBaseTest.java       |   6 +-
 3 files changed, 156 insertions(+), 11 deletions(-)

diff --git 
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java
 
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java
index da4697ddf25..5635eb7189d 100644
--- 
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java
+++ 
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java
@@ -69,7 +69,9 @@ public class MirrorSourceTask extends SourceTask {
 
     // for testing
     MirrorSourceTask(KafkaConsumer<byte[], byte[]> consumer, 
MirrorSourceMetrics metrics, String sourceClusterAlias,
-                     ReplicationPolicy replicationPolicy, long maxOffsetLag, 
KafkaProducer<byte[], byte[]> producer) {
+                     ReplicationPolicy replicationPolicy, long maxOffsetLag, 
KafkaProducer<byte[], byte[]> producer,
+                     Semaphore outstandingOffsetSyncs, Map<TopicPartition, 
PartitionState> partitionStates,
+                     String offsetSyncsTopic) {
         this.consumer = consumer;
         this.metrics = metrics;
         this.sourceClusterAlias = sourceClusterAlias;
@@ -77,6 +79,9 @@ public class MirrorSourceTask extends SourceTask {
         this.maxOffsetLag = maxOffsetLag;
         consumerAccess = new Semaphore(1);
         this.offsetProducer = producer;
+        this.outstandingOffsetSyncs = outstandingOffsetSyncs;
+        this.partitionStates = partitionStates;
+        this.offsetSyncsTopic = offsetSyncsTopic;
     }
 
     @Override
@@ -198,16 +203,18 @@ public class MirrorSourceTask extends SourceTask {
         PartitionState partitionState =
             partitionStates.computeIfAbsent(topicPartition, x -> new 
PartitionState(maxOffsetLag));
         if (partitionState.update(upstreamOffset, downstreamOffset)) {
-            sendOffsetSync(topicPartition, upstreamOffset, downstreamOffset);
+            if (sendOffsetSync(topicPartition, upstreamOffset, 
downstreamOffset)) {
+                partitionState.reset();
+            }
         }
     }
 
     // sends OffsetSync record upstream to internal offsets topic
-    private void sendOffsetSync(TopicPartition topicPartition, long 
upstreamOffset,
+    private boolean sendOffsetSync(TopicPartition topicPartition, long 
upstreamOffset,
             long downstreamOffset) {
         if (!outstandingOffsetSyncs.tryAcquire()) {
             // Too many outstanding offset syncs.
-            return;
+            return false;
         }
         OffsetSync offsetSync = new OffsetSync(topicPartition, upstreamOffset, 
downstreamOffset);
         ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(offsetSyncsTopic, 0,
@@ -221,6 +228,7 @@ public class MirrorSourceTask extends SourceTask {
             }
             outstandingOffsetSyncs.release();
         });
+        return true;
     }
  
     private Map<TopicPartition, Long> loadOffsets(Set<TopicPartition> 
topicPartitions) {
@@ -272,6 +280,7 @@ public class MirrorSourceTask extends SourceTask {
         long lastSyncUpstreamOffset = -1L;
         long lastSyncDownstreamOffset = -1L;
         long maxOffsetLag;
+        boolean shouldSyncOffsets;
 
         PartitionState(long maxOffsetLag) {
             this.maxOffsetLag = maxOffsetLag;
@@ -279,7 +288,6 @@ public class MirrorSourceTask extends SourceTask {
 
         // true if we should emit an offset sync
         boolean update(long upstreamOffset, long downstreamOffset) {
-            boolean shouldSyncOffsets = false;
             long upstreamStep = upstreamOffset - lastSyncUpstreamOffset;
             long downstreamTargetOffset = lastSyncDownstreamOffset + 
upstreamStep;
             if (lastSyncDownstreamOffset == -1L
@@ -294,5 +302,9 @@ public class MirrorSourceTask extends SourceTask {
             previousDownstreamOffset = downstreamOffset;
             return shouldSyncOffsets;
         }
+
+        void reset() {
+            shouldSyncOffsets = false;
+        }
     }
 }
diff --git 
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java
 
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java
index 14cf4143c90..9dfcf807ed2 100644
--- 
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java
+++ 
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java
@@ -19,26 +19,37 @@ package org.apache.kafka.connect.mirror;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.header.internals.RecordHeader;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.connect.mirror.MirrorSourceTask.PartitionState;
 import org.apache.kafka.connect.source.SourceRecord;
 
 import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
+import java.util.concurrent.Semaphore;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.mockito.Mockito.verifyNoInteractions;
 
@@ -56,7 +67,7 @@ public class MirrorSourceTaskTest {
         @SuppressWarnings("unchecked")
         KafkaProducer<byte[], byte[]> producer = mock(KafkaProducer.class);
         MirrorSourceTask mirrorSourceTask = new MirrorSourceTask(null, null, 
"cluster7",
-                new DefaultReplicationPolicy(), 50, producer);
+                new DefaultReplicationPolicy(), 50, producer, null, null, 
null);
         SourceRecord sourceRecord = 
mirrorSourceTask.convertRecord(consumerRecord);
         assertEquals("cluster7.topic1", sourceRecord.topic(),
                 "Failure on cluster7.topic1 consumerRecord serde");
@@ -81,15 +92,33 @@ public class MirrorSourceTaskTest {
         MirrorSourceTask.PartitionState partitionState = new 
MirrorSourceTask.PartitionState(50);
 
         assertTrue(partitionState.update(0, 100), "always emit offset sync on 
first update");
+        assertTrue(partitionState.shouldSyncOffsets, "should sync offsets");
+        partitionState.reset();
+        assertFalse(partitionState.shouldSyncOffsets, "should sync offsets to 
false");
         assertTrue(partitionState.update(2, 102), "upstream offset skipped -> 
resync");
+        partitionState.reset();
         assertFalse(partitionState.update(3, 152), "no sync");
+        partitionState.reset();
         assertFalse(partitionState.update(4, 153), "no sync");
+        partitionState.reset();
         assertFalse(partitionState.update(5, 154), "no sync");
+        partitionState.reset();
         assertTrue(partitionState.update(6, 205), "one past target offset");
+        partitionState.reset();
         assertTrue(partitionState.update(2, 206), "upstream reset");
+        partitionState.reset();
         assertFalse(partitionState.update(3, 207), "no sync");
+        partitionState.reset();
         assertTrue(partitionState.update(4, 3), "downstream reset");
+        partitionState.reset();
         assertFalse(partitionState.update(5, 4), "no sync");
+        assertTrue(partitionState.update(7, 6), "sync");
+        assertTrue(partitionState.update(7, 6), "sync");
+        assertTrue(partitionState.update(8, 7), "sync");
+        assertTrue(partitionState.update(10, 57), "sync");
+        partitionState.reset();
+        assertFalse(partitionState.update(11, 58), "sync");
+        assertFalse(partitionState.shouldSyncOffsets, "should sync offsets to 
false");
     }
 
     @Test
@@ -98,15 +127,32 @@ public class MirrorSourceTaskTest {
 
         // if max offset lag is zero, should always emit offset syncs
         assertTrue(partitionState.update(0, 100), "zeroOffsetSync 
downStreamOffset 100 is incorrect");
+        assertTrue(partitionState.shouldSyncOffsets, "should sync offsets");
+        partitionState.reset();
+        assertFalse(partitionState.shouldSyncOffsets, "should sync offsets to 
false");
         assertTrue(partitionState.update(2, 102), "zeroOffsetSync 
downStreamOffset 102 is incorrect");
+        partitionState.reset();
         assertTrue(partitionState.update(3, 153), "zeroOffsetSync 
downStreamOffset 153 is incorrect");
+        partitionState.reset();
         assertTrue(partitionState.update(4, 154), "zeroOffsetSync 
downStreamOffset 154 is incorrect");
+        partitionState.reset();
         assertTrue(partitionState.update(5, 155), "zeroOffsetSync 
downStreamOffset 155 is incorrect");
+        partitionState.reset();
         assertTrue(partitionState.update(6, 207), "zeroOffsetSync 
downStreamOffset 207 is incorrect");
+        partitionState.reset();
         assertTrue(partitionState.update(2, 208), "zeroOffsetSync 
downStreamOffset 208 is incorrect");
+        partitionState.reset();
         assertTrue(partitionState.update(3, 209), "zeroOffsetSync 
downStreamOffset 209 is incorrect");
+        partitionState.reset();
         assertTrue(partitionState.update(4, 3), "zeroOffsetSync 
downStreamOffset 3 is incorrect");
+        partitionState.reset();
         assertTrue(partitionState.update(5, 4), "zeroOffsetSync 
downStreamOffset 4 is incorrect");
+        assertTrue(partitionState.update(7, 6), "zeroOffsetSync 
downStreamOffset 6 is incorrect");
+        assertTrue(partitionState.update(7, 6), "zeroOffsetSync 
downStreamOffset 6 is incorrect");
+        assertTrue(partitionState.update(8, 7), "zeroOffsetSync 
downStreamOffset 7 is incorrect");
+        assertTrue(partitionState.update(10, 57), "zeroOffsetSync 
downStreamOffset 57 is incorrect");
+        partitionState.reset();
+        assertTrue(partitionState.update(11, 58), "zeroOffsetSync 
downStreamOffset 58 is incorrect");
     }
 
     @Test
@@ -140,7 +186,7 @@ public class MirrorSourceTaskTest {
         String sourceClusterName = "cluster1";
         ReplicationPolicy replicationPolicy = new DefaultReplicationPolicy();
         MirrorSourceTask mirrorSourceTask = new MirrorSourceTask(consumer, 
metrics, sourceClusterName,
-                replicationPolicy, 50, producer);
+                replicationPolicy, 50, producer, null, null, null);
         List<SourceRecord> sourceRecords = mirrorSourceTask.poll();
 
         assertEquals(2, sourceRecords.size());
@@ -186,7 +232,7 @@ public class MirrorSourceTaskTest {
         String sourceClusterName = "cluster1";
         ReplicationPolicy replicationPolicy = new DefaultReplicationPolicy();
         MirrorSourceTask mirrorSourceTask = new MirrorSourceTask(consumer, 
metrics, sourceClusterName,
-                replicationPolicy, 50, producer);
+                replicationPolicy, 50, producer, null, null, null);
 
         SourceRecord sourceRecord = mirrorSourceTask.convertRecord(new 
ConsumerRecord<>(topicName, 0, 0, System.currentTimeMillis(),
                 TimestampType.CREATE_TIME, key1.length, value1.length, key1, 
value1, headers, Optional.empty()));
@@ -196,6 +242,91 @@ public class MirrorSourceTaskTest {
         verifyNoInteractions(producer);
     }
 
+    @Test
+    public void testSendSyncEvent() {
+        byte[] recordKey = "key".getBytes();
+        byte[] recordValue = "value".getBytes();
+        int maxOffsetLag = 50;
+        int recordPartition = 0;
+        int recordOffset = 0;
+        int metadataOffset = 100;
+        String topicName = "topic";
+        String sourceClusterName = "sourceCluster";
+
+        RecordHeaders headers = new RecordHeaders();
+        ReplicationPolicy replicationPolicy = new DefaultReplicationPolicy();
+
+        @SuppressWarnings("unchecked")
+        KafkaConsumer<byte[], byte[]> consumer = mock(KafkaConsumer.class);
+        @SuppressWarnings("unchecked")
+        KafkaProducer<byte[], byte[]> producer = mock(KafkaProducer.class);
+        MirrorSourceMetrics metrics = mock(MirrorSourceMetrics.class);
+        Semaphore outstandingOffsetSyncs = new Semaphore(1);
+        PartitionState partitionState = new PartitionState(maxOffsetLag);
+        Map<TopicPartition, PartitionState> partitionStates = new HashMap<>();
+
+        MirrorSourceTask mirrorSourceTask = new MirrorSourceTask(consumer, 
metrics, sourceClusterName,
+                replicationPolicy, maxOffsetLag, producer, 
outstandingOffsetSyncs, partitionStates, topicName);
+
+        SourceRecord sourceRecord = mirrorSourceTask.convertRecord(new 
ConsumerRecord<>(topicName, recordPartition,
+                recordOffset, System.currentTimeMillis(), 
TimestampType.CREATE_TIME, recordKey.length,
+                recordValue.length, recordKey, recordValue, headers, 
Optional.empty()));
+
+        TopicPartition sourceTopicPartition = 
MirrorUtils.unwrapPartition(sourceRecord.sourcePartition());
+        partitionStates.put(sourceTopicPartition, partitionState);
+        RecordMetadata recordMetadata = new 
RecordMetadata(sourceTopicPartition, metadataOffset, 0, 0, 0, recordPartition);
+
+        ArgumentCaptor<Callback> producerCallback = 
ArgumentCaptor.forClass(Callback.class);
+        when(producer.send(any(), 
producerCallback.capture())).thenAnswer(mockInvocation -> {
+            producerCallback.getValue().onCompletion(null, null);
+            return null;
+        });
+
+        mirrorSourceTask.commitRecord(sourceRecord, recordMetadata);
+
+        verify(producer, times(1)).send(any(), any());
+
+        recordOffset = 2;
+        metadataOffset = 102;
+        recordMetadata = new RecordMetadata(sourceTopicPartition, 
metadataOffset, 0, 0, 0, recordPartition);
+        sourceRecord = mirrorSourceTask.convertRecord(new 
ConsumerRecord<>(topicName, recordPartition,
+                recordOffset, System.currentTimeMillis(), 
TimestampType.CREATE_TIME, recordKey.length,
+                recordValue.length, recordKey, recordValue, headers, 
Optional.empty()));
+
+        // Do not release outstanding sync semaphore
+        doReturn(null).when(producer).send(any(), producerCallback.capture());
+
+        mirrorSourceTask.commitRecord(sourceRecord, recordMetadata);
+
+        verify(producer, times(2)).send(any(), any());
+
+        // Do not send sync event
+        recordOffset = 4;
+        metadataOffset = 104;
+        recordMetadata = new RecordMetadata(sourceTopicPartition, 
metadataOffset, 0, 0, 0, recordPartition);
+        sourceRecord = mirrorSourceTask.convertRecord(new 
ConsumerRecord<>(topicName, recordPartition,
+                recordOffset, System.currentTimeMillis(), 
TimestampType.CREATE_TIME, recordKey.length,
+                recordValue.length, recordKey, recordValue, headers, 
Optional.empty()));
+
+        mirrorSourceTask.commitRecord(sourceRecord, recordMetadata);
+
+        verify(producer, times(2)).send(any(), any());
+
+        // Should send sync event
+        recordOffset = 5;
+        metadataOffset = 150;
+        recordMetadata = new RecordMetadata(sourceTopicPartition, 
metadataOffset, 0, 0, 0, recordPartition);
+        sourceRecord = mirrorSourceTask.convertRecord(new 
ConsumerRecord<>(topicName, recordPartition,
+                recordOffset, System.currentTimeMillis(), 
TimestampType.CREATE_TIME, recordKey.length,
+                recordValue.length, recordKey, recordValue, headers, 
Optional.empty()));
+
+        producerCallback.getValue().onCompletion(null, null);
+
+        mirrorSourceTask.commitRecord(sourceRecord, recordMetadata);
+
+        verify(producer, times(3)).send(any(), any());
+    }
+
     private void compareHeaders(List<Header> expectedHeaders, 
List<org.apache.kafka.connect.header.Header> taskHeaders) {
         assertEquals(expectedHeaders.size(), taskHeaders.size());
         for (int i = 0; i < expectedHeaders.size(); i++) {
diff --git 
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
 
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
index 9b744cb3e99..5ca9b110707 100644
--- 
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
+++ 
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
@@ -305,8 +305,10 @@ public class MirrorConnectorsIntegrationBaseTest {
         Map<TopicPartition, OffsetAndMetadata> backupOffsets = 
backupClient.remoteConsumerOffsets(consumerGroupName, PRIMARY_CLUSTER_ALIAS,
             Duration.ofMillis(CHECKPOINT_DURATION_MS));
 
-        assertTrue(backupOffsets.containsKey(
-            new TopicPartition("primary.test-topic-1", 0)), "Offsets not 
translated downstream to backup cluster. Found: " + backupOffsets);
+        for (int i = 0; i < NUM_PARTITIONS; i++) {
+            assertTrue(backupOffsets.containsKey(new 
TopicPartition("primary.test-topic-1", i)),
+                   "Offsets not translated downstream to backup cluster. 
Found: " + backupOffsets);
+        }
 
         // Failover consumer group to backup cluster.
         try (Consumer<byte[], byte[]> primaryConsumer = 
backup.kafka().createConsumer(Collections.singletonMap("group.id", 
consumerGroupName))) {

Reply via email to