This is an automated email from the ASF dual-hosted git repository. rhauch pushed a commit to branch 2.6 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.6 by this push: new 9aa261a KAFKA-10332: Update MM2 refreshTopicPartitions() logic (#9343) 9aa261a is described below commit 9aa261a6422fdda319fefb7183adfb17d17b6e1f Author: Mickael Maison <mimai...@users.noreply.github.com> AuthorDate: Mon Oct 19 17:51:44 2020 +0200 KAFKA-10332: Update MM2 refreshTopicPartitions() logic (#9343) Trigger task reconfiguration when: - topic-partitions are created or deleted on source cluster - topic-partitions are missing on target cluster Authors: Mickael Maison <mickael.mai...@gmail.com>, Edoardo Comar <eco...@uk.ibm.com> Reviewer: Randall Hauch <rha...@gmail.com> --- .../connect/mirror/MirrorSourceConnector.java | 54 +++++++++++++++------- .../connect/mirror/MirrorSourceConnectorTest.java | 31 ++++++++++++- 2 files changed, 67 insertions(+), 18 deletions(-) diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java index 041814a..0f6eb46 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java @@ -96,7 +96,7 @@ public class MirrorSourceConnector extends SourceConnector { this.replicationPolicy = replicationPolicy; this.topicFilter = topicFilter; this.configPropertyFilter = configPropertyFilter; - } + } @Override public void start(Map<String, String> props) { @@ -202,6 +202,7 @@ public class MirrorSourceConnector extends SourceConnector { throws InterruptedException, ExecutionException { Set<String> topics = listTopics(targetAdminClient).stream() .filter(t -> sourceAndTarget.source().equals(replicationPolicy.topicSource(t))) + .filter(t -> !t.equals(config.checkpointsTopic())) .collect(Collectors.toSet()); return describeTopics(targetAdminClient, topics).stream() .flatMap(MirrorSourceConnector::expandTopicDescription) @@ -211,23 +212,44 @@ public class MirrorSourceConnector extends SourceConnector { // visible for testing void refreshTopicPartitions() throws InterruptedException, ExecutionException { - knownSourceTopicPartitions = findSourceTopicPartitions(); - knownTargetTopicPartitions = findTargetTopicPartitions(); - List<TopicPartition> upstreamTargetTopicPartitions = knownTargetTopicPartitions.stream() + + List<TopicPartition> sourceTopicPartitions = findSourceTopicPartitions(); + List<TopicPartition> targetTopicPartitions = findTargetTopicPartitions(); + + Set<TopicPartition> sourceTopicPartitionsSet = new HashSet<>(sourceTopicPartitions); + Set<TopicPartition> knownSourceTopicPartitionsSet = new HashSet<>(knownSourceTopicPartitions); + + Set<TopicPartition> upstreamTargetTopicPartitions = targetTopicPartitions.stream() .map(x -> new TopicPartition(replicationPolicy.upstreamTopic(x.topic()), x.partition())) - .collect(Collectors.toList()); + .collect(Collectors.toSet()); + + Set<TopicPartition> missingInTarget = new HashSet<>(sourceTopicPartitions); + missingInTarget.removeAll(upstreamTargetTopicPartitions); + + knownTargetTopicPartitions = targetTopicPartitions; + + // Detect if topic-partitions were added or deleted from the source cluster + // or if topic-partitions are missing from the target cluster + if (!knownSourceTopicPartitionsSet.equals(sourceTopicPartitionsSet) || !missingInTarget.isEmpty()) { + + Set<TopicPartition> newTopicPartitions = sourceTopicPartitionsSet; + newTopicPartitions.removeAll(knownSourceTopicPartitions); + + Set<TopicPartition> deletedTopicPartitions = knownSourceTopicPartitionsSet; + deletedTopicPartitions.removeAll(sourceTopicPartitions); + + log.info("Found {} new topic-partitions on {}. " + + "Found {} deleted topic-partitions on {}. " + + "Found {} topic-partitions missing on {}.", + newTopicPartitions.size(), sourceAndTarget.source(), + deletedTopicPartitions.size(), sourceAndTarget.source(), + missingInTarget.size(), sourceAndTarget.target()); + + log.trace("Found new topic-partitions on {}: {}", sourceAndTarget.source(), newTopicPartitions); + log.trace("Found deleted topic-partitions on {}: {}", sourceAndTarget.source(), deletedTopicPartitions); + log.trace("Found missing topic-partitions on {}: {}", sourceAndTarget.target(), missingInTarget); - Set<TopicPartition> newTopicPartitions = new HashSet<>(); - newTopicPartitions.addAll(knownSourceTopicPartitions); - newTopicPartitions.removeAll(upstreamTargetTopicPartitions); - Set<TopicPartition> deadTopicPartitions = new HashSet<>(); - deadTopicPartitions.addAll(upstreamTargetTopicPartitions); - deadTopicPartitions.removeAll(knownSourceTopicPartitions); - if (!newTopicPartitions.isEmpty() || !deadTopicPartitions.isEmpty()) { - log.info("Found {} topic-partitions on {}. {} are new. {} were removed. Previously had {}.", - knownSourceTopicPartitions.size(), sourceAndTarget.source(), newTopicPartitions.size(), - deadTopicPartitions.size(), knownSourceTopicPartitions.size()); - log.trace("Found new topic-partitions: {}", newTopicPartitions); + knownSourceTopicPartitions = sourceTopicPartitions; computeAndCreateTopicPartitions(); context.requestTaskReconfiguration(); } diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java index e86d21e..c915845 100644 --- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java +++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java @@ -186,7 +186,7 @@ public class MirrorSourceConnectorTest { connector.initialize(mock(ConnectorContext.class)); connector = spy(connector); - List<TopicPartition> sourceTopicPartitions = Arrays.asList(new TopicPartition("topic", 0)); + List<TopicPartition> sourceTopicPartitions = Collections.singletonList(new TopicPartition("topic", 0)); doReturn(sourceTopicPartitions).when(connector).findSourceTopicPartitions(); doReturn(Collections.emptyList()).when(connector).findTargetTopicPartitions(); doNothing().when(connector).createTopicPartitions(any(), any(), any()); @@ -205,11 +205,38 @@ public class MirrorSourceConnectorTest { eq(expectedNewTopics), eq(Collections.emptyMap())); - List<TopicPartition> targetTopicPartitions = Arrays.asList(new TopicPartition("source.topic", 0)); + List<TopicPartition> targetTopicPartitions = Collections.singletonList(new TopicPartition("source.topic", 0)); doReturn(targetTopicPartitions).when(connector).findTargetTopicPartitions(); connector.refreshTopicPartitions(); // once target topic is created, refreshTopicPartitions() will NOT call computeAndCreateTopicPartitions() again verify(connector, times(2)).computeAndCreateTopicPartitions(); } + + @Test + public void testRefreshTopicPartitionsTopicOnTargetFirst() throws Exception { + MirrorSourceConnector connector = new MirrorSourceConnector(new SourceAndTarget("source", "target"), + new DefaultReplicationPolicy(), new DefaultTopicFilter(), new DefaultConfigPropertyFilter()); + connector.initialize(mock(ConnectorContext.class)); + connector = spy(connector); + + List<TopicPartition> sourceTopicPartitions = Collections.emptyList(); + List<TopicPartition> targetTopicPartitions = Collections.singletonList(new TopicPartition("source.topic", 0)); + doReturn(sourceTopicPartitions).when(connector).findSourceTopicPartitions(); + doReturn(targetTopicPartitions).when(connector).findTargetTopicPartitions(); + doNothing().when(connector).createTopicPartitions(any(), any(), any()); + + // partitions appearing on the target cluster should not cause reconfiguration + connector.refreshTopicPartitions(); + connector.refreshTopicPartitions(); + verify(connector, times(0)).computeAndCreateTopicPartitions(); + + sourceTopicPartitions = Collections.singletonList(new TopicPartition("topic", 0)); + doReturn(sourceTopicPartitions).when(connector).findSourceTopicPartitions(); + + // when partitions are added to the source cluster, reconfiguration is triggered + connector.refreshTopicPartitions(); + verify(connector, times(1)).computeAndCreateTopicPartitions(); + + } }