This is an automated email from the ASF dual-hosted git repository. cegerton pushed a commit to branch 3.3 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 7c78f3f68e83eb9cef3b147951f9108c4bba8b80 Author: emilnkrastev <[email protected]> AuthorDate: Tue Jan 10 16:46:25 2023 +0200 KAFKA-12558: Do not prematurely mutate internal partition state in Mirror Maker 2 (#11818) Reviewers: Greg Harris <[email protected]>, Chris Egerton <[email protected]> --- .../kafka/connect/mirror/MirrorSourceTask.java | 23 ++- .../kafka/connect/mirror/MirrorSourceTaskTest.java | 173 ++++++++++++++++++++- .../MirrorConnectorsIntegrationBaseTest.java | 6 +- 3 files changed, 192 insertions(+), 10 deletions(-) diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java index ec1e15cda7e..4e714a6bf46 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java @@ -69,13 +69,19 @@ public class MirrorSourceTask extends SourceTask { // for testing MirrorSourceTask(KafkaConsumer<byte[], byte[]> consumer, MirrorMetrics metrics, String sourceClusterAlias, - ReplicationPolicy replicationPolicy, long maxOffsetLag) { + ReplicationPolicy replicationPolicy, long maxOffsetLag, KafkaProducer<byte[], byte[]> producer, + Semaphore outstandingOffsetSyncs, Map<TopicPartition, PartitionState> partitionStates, + String offsetSyncsTopic) { this.consumer = consumer; this.metrics = metrics; this.sourceClusterAlias = sourceClusterAlias; this.replicationPolicy = replicationPolicy; this.maxOffsetLag = maxOffsetLag; consumerAccess = new Semaphore(1); + this.offsetProducer = producer; + this.outstandingOffsetSyncs = outstandingOffsetSyncs; + this.partitionStates = partitionStates; + this.offsetSyncsTopic = offsetSyncsTopic; } @Override @@ -197,16 +203,18 @@ public class MirrorSourceTask extends SourceTask { PartitionState partitionState = partitionStates.computeIfAbsent(topicPartition, x -> new PartitionState(maxOffsetLag)); if (partitionState.update(upstreamOffset, downstreamOffset)) { - sendOffsetSync(topicPartition, upstreamOffset, downstreamOffset); + if (sendOffsetSync(topicPartition, upstreamOffset, downstreamOffset)) { + partitionState.reset(); + } } } // sends OffsetSync record upstream to internal offsets topic - private void sendOffsetSync(TopicPartition topicPartition, long upstreamOffset, + private boolean sendOffsetSync(TopicPartition topicPartition, long upstreamOffset, long downstreamOffset) { if (!outstandingOffsetSyncs.tryAcquire()) { // Too many outstanding offset syncs. - return; + return false; } OffsetSync offsetSync = new OffsetSync(topicPartition, upstreamOffset, downstreamOffset); ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(offsetSyncsTopic, 0, @@ -220,6 +228,7 @@ public class MirrorSourceTask extends SourceTask { } outstandingOffsetSyncs.release(); }); + return true; } private Map<TopicPartition, Long> loadOffsets(Set<TopicPartition> topicPartitions) { @@ -271,6 +280,7 @@ public class MirrorSourceTask extends SourceTask { long lastSyncUpstreamOffset = -1L; long lastSyncDownstreamOffset = -1L; long maxOffsetLag; + boolean shouldSyncOffsets; PartitionState(long maxOffsetLag) { this.maxOffsetLag = maxOffsetLag; @@ -278,7 +288,6 @@ public class MirrorSourceTask extends SourceTask { // true if we should emit an offset sync boolean update(long upstreamOffset, long downstreamOffset) { - boolean shouldSyncOffsets = false; long upstreamStep = upstreamOffset - lastSyncUpstreamOffset; long downstreamTargetOffset = lastSyncDownstreamOffset + upstreamStep; if (lastSyncDownstreamOffset == -1L @@ -293,5 +302,9 @@ public class MirrorSourceTask extends SourceTask { previousDownstreamOffset = downstreamOffset; return shouldSyncOffsets; } + + void reset() { + shouldSyncOffsets = false; + } } } diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java index feb2f7fb6ba..bbd9ec3aff5 100644 --- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java +++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java @@ -19,25 +19,38 @@ package org.apache.kafka.connect.mirror; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.Callback; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.internals.RecordHeader; import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.header.internals.RecordHeaders; +import org.apache.kafka.connect.mirror.MirrorSourceTask.PartitionState; import org.apache.kafka.connect.source.SourceRecord; import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; + import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Optional; +import java.util.concurrent.Semaphore; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; public class MirrorSourceTaskTest { @@ -51,8 +64,10 @@ public class MirrorSourceTaskTest { headers.add("header2", new byte[]{'p', 'q', 'r', 's', 't'}); ConsumerRecord<byte[], byte[]> consumerRecord = new ConsumerRecord<>("topic1", 2, 3L, 4L, TimestampType.CREATE_TIME, 5, 6, key, value, headers, Optional.empty()); + @SuppressWarnings("unchecked") + KafkaProducer<byte[], byte[]> producer = mock(KafkaProducer.class); MirrorSourceTask mirrorSourceTask = new MirrorSourceTask(null, null, "cluster7", - new DefaultReplicationPolicy(), 50); + new DefaultReplicationPolicy(), 50, producer, null, null, null); SourceRecord sourceRecord = mirrorSourceTask.convertRecord(consumerRecord); assertEquals("cluster7.topic1", sourceRecord.topic(), "Failure on cluster7.topic1 consumerRecord serde"); @@ -77,15 +92,33 @@ public class MirrorSourceTaskTest { MirrorSourceTask.PartitionState partitionState = new MirrorSourceTask.PartitionState(50); assertTrue(partitionState.update(0, 100), "always emit offset sync on first update"); + assertTrue(partitionState.shouldSyncOffsets, "should sync offsets"); + partitionState.reset(); + assertFalse(partitionState.shouldSyncOffsets, "should sync offsets to false"); assertTrue(partitionState.update(2, 102), "upstream offset skipped -> resync"); + partitionState.reset(); assertFalse(partitionState.update(3, 152), "no sync"); + partitionState.reset(); assertFalse(partitionState.update(4, 153), "no sync"); + partitionState.reset(); assertFalse(partitionState.update(5, 154), "no sync"); + partitionState.reset(); assertTrue(partitionState.update(6, 205), "one past target offset"); + partitionState.reset(); assertTrue(partitionState.update(2, 206), "upstream reset"); + partitionState.reset(); assertFalse(partitionState.update(3, 207), "no sync"); + partitionState.reset(); assertTrue(partitionState.update(4, 3), "downstream reset"); + partitionState.reset(); assertFalse(partitionState.update(5, 4), "no sync"); + assertTrue(partitionState.update(7, 6), "sync"); + assertTrue(partitionState.update(7, 6), "sync"); + assertTrue(partitionState.update(8, 7), "sync"); + assertTrue(partitionState.update(10, 57), "sync"); + partitionState.reset(); + assertFalse(partitionState.update(11, 58), "sync"); + assertFalse(partitionState.shouldSyncOffsets, "should sync offsets to false"); } @Test @@ -94,15 +127,32 @@ public class MirrorSourceTaskTest { // if max offset lag is zero, should always emit offset syncs assertTrue(partitionState.update(0, 100), "zeroOffsetSync downStreamOffset 100 is incorrect"); + assertTrue(partitionState.shouldSyncOffsets, "should sync offsets"); + partitionState.reset(); + assertFalse(partitionState.shouldSyncOffsets, "should sync offsets to false"); assertTrue(partitionState.update(2, 102), "zeroOffsetSync downStreamOffset 102 is incorrect"); + partitionState.reset(); assertTrue(partitionState.update(3, 153), "zeroOffsetSync downStreamOffset 153 is incorrect"); + partitionState.reset(); assertTrue(partitionState.update(4, 154), "zeroOffsetSync downStreamOffset 154 is incorrect"); + partitionState.reset(); assertTrue(partitionState.update(5, 155), "zeroOffsetSync downStreamOffset 155 is incorrect"); + partitionState.reset(); assertTrue(partitionState.update(6, 207), "zeroOffsetSync downStreamOffset 207 is incorrect"); + partitionState.reset(); assertTrue(partitionState.update(2, 208), "zeroOffsetSync downStreamOffset 208 is incorrect"); + partitionState.reset(); assertTrue(partitionState.update(3, 209), "zeroOffsetSync downStreamOffset 209 is incorrect"); + partitionState.reset(); assertTrue(partitionState.update(4, 3), "zeroOffsetSync downStreamOffset 3 is incorrect"); + partitionState.reset(); assertTrue(partitionState.update(5, 4), "zeroOffsetSync downStreamOffset 4 is incorrect"); + assertTrue(partitionState.update(7, 6), "zeroOffsetSync downStreamOffset 6 is incorrect"); + assertTrue(partitionState.update(7, 6), "zeroOffsetSync downStreamOffset 6 is incorrect"); + assertTrue(partitionState.update(8, 7), "zeroOffsetSync downStreamOffset 7 is incorrect"); + assertTrue(partitionState.update(10, 57), "zeroOffsetSync downStreamOffset 57 is incorrect"); + partitionState.reset(); + assertTrue(partitionState.update(11, 58), "zeroOffsetSync downStreamOffset 58 is incorrect"); } @Test @@ -127,6 +177,8 @@ public class MirrorSourceTaskTest { @SuppressWarnings("unchecked") KafkaConsumer<byte[], byte[]> consumer = mock(KafkaConsumer.class); + @SuppressWarnings("unchecked") + KafkaProducer<byte[], byte[]> producer = mock(KafkaProducer.class); when(consumer.poll(any())).thenReturn(consumerRecords); MirrorMetrics metrics = mock(MirrorMetrics.class); @@ -134,7 +186,7 @@ public class MirrorSourceTaskTest { String sourceClusterName = "cluster1"; ReplicationPolicy replicationPolicy = new DefaultReplicationPolicy(); MirrorSourceTask mirrorSourceTask = new MirrorSourceTask(consumer, metrics, sourceClusterName, - replicationPolicy, 50); + replicationPolicy, 50, producer, null, null, null); List<SourceRecord> sourceRecords = mirrorSourceTask.poll(); assertEquals(2, sourceRecords.size()); @@ -160,6 +212,121 @@ public class MirrorSourceTaskTest { } } + @Test + public void testCommitRecordWithNullMetadata() { + // Create a consumer mock + byte[] key1 = "abc".getBytes(); + byte[] value1 = "fgh".getBytes(); + String topicName = "test"; + String headerKey = "key"; + RecordHeaders headers = new RecordHeaders(new Header[] { + new RecordHeader(headerKey, "value".getBytes()), + }); + + @SuppressWarnings("unchecked") + KafkaConsumer<byte[], byte[]> consumer = mock(KafkaConsumer.class); + @SuppressWarnings("unchecked") + KafkaProducer<byte[], byte[]> producer = mock(KafkaProducer.class); + MirrorMetrics metrics = mock(MirrorMetrics.class); + + String sourceClusterName = "cluster1"; + ReplicationPolicy replicationPolicy = new DefaultReplicationPolicy(); + MirrorSourceTask mirrorSourceTask = new MirrorSourceTask(consumer, metrics, sourceClusterName, + replicationPolicy, 50, producer, null, null, null); + + SourceRecord sourceRecord = mirrorSourceTask.convertRecord(new ConsumerRecord<>(topicName, 0, 0, System.currentTimeMillis(), + TimestampType.CREATE_TIME, key1.length, value1.length, key1, value1, headers, Optional.empty())); + + // Expect that commitRecord will not throw an exception + mirrorSourceTask.commitRecord(sourceRecord, null); + verifyNoInteractions(producer); + } + + @Test + public void testSendSyncEvent() { + byte[] recordKey = "key".getBytes(); + byte[] recordValue = "value".getBytes(); + int maxOffsetLag = 50; + int recordPartition = 0; + int recordOffset = 0; + int metadataOffset = 100; + String topicName = "topic"; + String sourceClusterName = "sourceCluster"; + + RecordHeaders headers = new RecordHeaders(); + ReplicationPolicy replicationPolicy = new DefaultReplicationPolicy(); + + @SuppressWarnings("unchecked") + KafkaConsumer<byte[], byte[]> consumer = mock(KafkaConsumer.class); + @SuppressWarnings("unchecked") + KafkaProducer<byte[], byte[]> producer = mock(KafkaProducer.class); + MirrorMetrics metrics = mock(MirrorMetrics.class); + Semaphore outstandingOffsetSyncs = new Semaphore(1); + PartitionState partitionState = new PartitionState(maxOffsetLag); + Map<TopicPartition, PartitionState> partitionStates = new HashMap<>(); + + MirrorSourceTask mirrorSourceTask = new MirrorSourceTask(consumer, metrics, sourceClusterName, + replicationPolicy, maxOffsetLag, producer, outstandingOffsetSyncs, partitionStates, topicName); + + SourceRecord sourceRecord = mirrorSourceTask.convertRecord(new ConsumerRecord<>(topicName, recordPartition, + recordOffset, System.currentTimeMillis(), TimestampType.CREATE_TIME, recordKey.length, + recordValue.length, recordKey, recordValue, headers, Optional.empty())); + + TopicPartition sourceTopicPartition = MirrorUtils.unwrapPartition(sourceRecord.sourcePartition()); + partitionStates.put(sourceTopicPartition, partitionState); + RecordMetadata recordMetadata = new RecordMetadata(sourceTopicPartition, metadataOffset, 0, 0, 0, recordPartition); + + ArgumentCaptor<Callback> producerCallback = ArgumentCaptor.forClass(Callback.class); + when(producer.send(any(), producerCallback.capture())).thenAnswer(mockInvocation -> { + producerCallback.getValue().onCompletion(null, null); + return null; + }); + + mirrorSourceTask.commitRecord(sourceRecord, recordMetadata); + + verify(producer, times(1)).send(any(), any()); + + recordOffset = 2; + metadataOffset = 102; + recordMetadata = new RecordMetadata(sourceTopicPartition, metadataOffset, 0, 0, 0, recordPartition); + sourceRecord = mirrorSourceTask.convertRecord(new ConsumerRecord<>(topicName, recordPartition, + recordOffset, System.currentTimeMillis(), TimestampType.CREATE_TIME, recordKey.length, + recordValue.length, recordKey, recordValue, headers, Optional.empty())); + + // Do not release outstanding sync semaphore + doReturn(null).when(producer).send(any(), producerCallback.capture()); + + mirrorSourceTask.commitRecord(sourceRecord, recordMetadata); + + verify(producer, times(2)).send(any(), any()); + + // Do not send sync event + recordOffset = 4; + metadataOffset = 104; + recordMetadata = new RecordMetadata(sourceTopicPartition, metadataOffset, 0, 0, 0, recordPartition); + sourceRecord = mirrorSourceTask.convertRecord(new ConsumerRecord<>(topicName, recordPartition, + recordOffset, System.currentTimeMillis(), TimestampType.CREATE_TIME, recordKey.length, + recordValue.length, recordKey, recordValue, headers, Optional.empty())); + + mirrorSourceTask.commitRecord(sourceRecord, recordMetadata); + + verify(producer, times(2)).send(any(), any()); + + // Should send sync event + recordOffset = 5; + metadataOffset = 150; + recordMetadata = new RecordMetadata(sourceTopicPartition, metadataOffset, 0, 0, 0, recordPartition); + sourceRecord = mirrorSourceTask.convertRecord(new ConsumerRecord<>(topicName, recordPartition, + recordOffset, System.currentTimeMillis(), TimestampType.CREATE_TIME, recordKey.length, + recordValue.length, recordKey, recordValue, headers, Optional.empty())); + + producerCallback.getValue().onCompletion(null, null); + + mirrorSourceTask.commitRecord(sourceRecord, recordMetadata); + + verify(producer, times(3)).send(any(), any()); + } + private void compareHeaders(List<Header> expectedHeaders, List<org.apache.kafka.connect.header.Header> taskHeaders) { assertEquals(expectedHeaders.size(), taskHeaders.size()); for (int i = 0; i < expectedHeaders.size(); i++) { diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java index dfafdcbd8c6..3d448e9c150 100644 --- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java +++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java @@ -299,8 +299,10 @@ public class MirrorConnectorsIntegrationBaseTest { Map<TopicPartition, OffsetAndMetadata> backupOffsets = backupClient.remoteConsumerOffsets(consumerGroupName, PRIMARY_CLUSTER_ALIAS, Duration.ofMillis(CHECKPOINT_DURATION_MS)); - assertTrue(backupOffsets.containsKey( - new TopicPartition("primary.test-topic-1", 0)), "Offsets not translated downstream to backup cluster. Found: " + backupOffsets); + for (int i = 0; i < NUM_PARTITIONS; i++) { + assertTrue(backupOffsets.containsKey(new TopicPartition("primary.test-topic-1", i)), + "Offsets not translated downstream to backup cluster. Found: " + backupOffsets); + } // Failover consumer group to backup cluster. try (Consumer<byte[], byte[]> primaryConsumer = backup.kafka().createConsumer(Collections.singletonMap("group.id", consumerGroupName))) {
