This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.6 by this push:
     new 9aa261a  KAFKA-10332: Update MM2 refreshTopicPartitions() logic (#9343)
9aa261a is described below

commit 9aa261a6422fdda319fefb7183adfb17d17b6e1f
Author: Mickael Maison <mimai...@users.noreply.github.com>
AuthorDate: Mon Oct 19 17:51:44 2020 +0200

    KAFKA-10332: Update MM2 refreshTopicPartitions() logic (#9343)
    
    Trigger task reconfiguration when:
    - topic-partitions are created or deleted on source cluster
    - topic-partitions are missing on target cluster
    
    Authors: Mickael Maison <mickael.mai...@gmail.com>, Edoardo Comar <eco...@uk.ibm.com>
    Reviewer: Randall Hauch <rha...@gmail.com>
---
 .../connect/mirror/MirrorSourceConnector.java      | 54 +++++++++++++++-------
 .../connect/mirror/MirrorSourceConnectorTest.java  | 31 ++++++++++++-
 2 files changed, 67 insertions(+), 18 deletions(-)

diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java
index 041814a..0f6eb46 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java
@@ -96,7 +96,7 @@ public class MirrorSourceConnector extends SourceConnector {
         this.replicationPolicy = replicationPolicy;
         this.topicFilter = topicFilter;
         this.configPropertyFilter = configPropertyFilter;
-    } 
+    }
 
     @Override
     public void start(Map<String, String> props) {
@@ -202,6 +202,7 @@ public class MirrorSourceConnector extends SourceConnector {
             throws InterruptedException, ExecutionException {
         Set<String> topics = listTopics(targetAdminClient).stream()
             .filter(t -> 
sourceAndTarget.source().equals(replicationPolicy.topicSource(t)))
+            .filter(t -> !t.equals(config.checkpointsTopic()))
             .collect(Collectors.toSet());
         return describeTopics(targetAdminClient, topics).stream()
                 .flatMap(MirrorSourceConnector::expandTopicDescription)
@@ -211,23 +212,44 @@ public class MirrorSourceConnector extends SourceConnector {
     // visible for testing
     void refreshTopicPartitions()
             throws InterruptedException, ExecutionException {
-        knownSourceTopicPartitions = findSourceTopicPartitions();
-        knownTargetTopicPartitions = findTargetTopicPartitions();
-        List<TopicPartition> upstreamTargetTopicPartitions = 
knownTargetTopicPartitions.stream()
+
+        List<TopicPartition> sourceTopicPartitions = 
findSourceTopicPartitions();
+        List<TopicPartition> targetTopicPartitions = 
findTargetTopicPartitions();
+
+        Set<TopicPartition> sourceTopicPartitionsSet = new 
HashSet<>(sourceTopicPartitions);
+        Set<TopicPartition> knownSourceTopicPartitionsSet = new 
HashSet<>(knownSourceTopicPartitions);
+
+        Set<TopicPartition> upstreamTargetTopicPartitions = 
targetTopicPartitions.stream()
                 .map(x -> new 
TopicPartition(replicationPolicy.upstreamTopic(x.topic()), x.partition()))
-                .collect(Collectors.toList());
+                .collect(Collectors.toSet());
+
+        Set<TopicPartition> missingInTarget = new 
HashSet<>(sourceTopicPartitions);
+        missingInTarget.removeAll(upstreamTargetTopicPartitions);
+
+        knownTargetTopicPartitions = targetTopicPartitions;
+
+        // Detect if topic-partitions were added or deleted from the source 
cluster
+        // or if topic-partitions are missing from the target cluster
+        if (!knownSourceTopicPartitionsSet.equals(sourceTopicPartitionsSet) || 
!missingInTarget.isEmpty()) {
+
+            Set<TopicPartition> newTopicPartitions = sourceTopicPartitionsSet;
+            newTopicPartitions.removeAll(knownSourceTopicPartitions);
+
+            Set<TopicPartition> deletedTopicPartitions = 
knownSourceTopicPartitionsSet;
+            deletedTopicPartitions.removeAll(sourceTopicPartitions);
+
+            log.info("Found {} new topic-partitions on {}. " +
+                     "Found {} deleted topic-partitions on {}. " +
+                     "Found {} topic-partitions missing on {}.",
+                     newTopicPartitions.size(), sourceAndTarget.source(),
+                     deletedTopicPartitions.size(), sourceAndTarget.source(),
+                     missingInTarget.size(), sourceAndTarget.target());
+
+            log.trace("Found new topic-partitions on {}: {}", 
sourceAndTarget.source(), newTopicPartitions);
+            log.trace("Found deleted topic-partitions on {}: {}", 
sourceAndTarget.source(), deletedTopicPartitions);
+            log.trace("Found missing topic-partitions on {}: {}", 
sourceAndTarget.target(), missingInTarget);
 
-        Set<TopicPartition> newTopicPartitions = new HashSet<>();
-        newTopicPartitions.addAll(knownSourceTopicPartitions);
-        newTopicPartitions.removeAll(upstreamTargetTopicPartitions);
-        Set<TopicPartition> deadTopicPartitions = new HashSet<>();
-        deadTopicPartitions.addAll(upstreamTargetTopicPartitions);
-        deadTopicPartitions.removeAll(knownSourceTopicPartitions);
-        if (!newTopicPartitions.isEmpty() || !deadTopicPartitions.isEmpty()) {
-            log.info("Found {} topic-partitions on {}. {} are new. {} were 
removed. Previously had {}.",
-                    knownSourceTopicPartitions.size(), 
sourceAndTarget.source(), newTopicPartitions.size(),
-                    deadTopicPartitions.size(), 
knownSourceTopicPartitions.size());
-            log.trace("Found new topic-partitions: {}", newTopicPartitions);
+            knownSourceTopicPartitions = sourceTopicPartitions;
             computeAndCreateTopicPartitions();
             context.requestTaskReconfiguration();
         }
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java
index e86d21e..c915845 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java
@@ -186,7 +186,7 @@ public class MirrorSourceConnectorTest {
         connector.initialize(mock(ConnectorContext.class));
         connector = spy(connector);
 
-        List<TopicPartition> sourceTopicPartitions = Arrays.asList(new 
TopicPartition("topic", 0));
+        List<TopicPartition> sourceTopicPartitions = 
Collections.singletonList(new TopicPartition("topic", 0));
         
doReturn(sourceTopicPartitions).when(connector).findSourceTopicPartitions();
         
doReturn(Collections.emptyList()).when(connector).findTargetTopicPartitions();
         doNothing().when(connector).createTopicPartitions(any(), any(), any());
@@ -205,11 +205,38 @@ public class MirrorSourceConnectorTest {
                 eq(expectedNewTopics),
                 eq(Collections.emptyMap()));
 
-        List<TopicPartition> targetTopicPartitions = Arrays.asList(new 
TopicPartition("source.topic", 0));
+        List<TopicPartition> targetTopicPartitions = 
Collections.singletonList(new TopicPartition("source.topic", 0));
         
doReturn(targetTopicPartitions).when(connector).findTargetTopicPartitions();
         connector.refreshTopicPartitions();
 
         // once target topic is created, refreshTopicPartitions() will NOT 
call computeAndCreateTopicPartitions() again
         verify(connector, times(2)).computeAndCreateTopicPartitions();
     }
+
+    @Test
+    public void testRefreshTopicPartitionsTopicOnTargetFirst() throws 
Exception {
+        MirrorSourceConnector connector = new MirrorSourceConnector(new 
SourceAndTarget("source", "target"),
+                new DefaultReplicationPolicy(), new DefaultTopicFilter(), new 
DefaultConfigPropertyFilter());
+        connector.initialize(mock(ConnectorContext.class));
+        connector = spy(connector);
+
+        List<TopicPartition> sourceTopicPartitions = Collections.emptyList();
+        List<TopicPartition> targetTopicPartitions = 
Collections.singletonList(new TopicPartition("source.topic", 0));
+        
doReturn(sourceTopicPartitions).when(connector).findSourceTopicPartitions();
+        
doReturn(targetTopicPartitions).when(connector).findTargetTopicPartitions();
+        doNothing().when(connector).createTopicPartitions(any(), any(), any());
+
+        // partitions appearing on the target cluster should not cause 
reconfiguration
+        connector.refreshTopicPartitions();
+        connector.refreshTopicPartitions();
+        verify(connector, times(0)).computeAndCreateTopicPartitions();
+
+        sourceTopicPartitions = Collections.singletonList(new 
TopicPartition("topic", 0));
+        
doReturn(sourceTopicPartitions).when(connector).findSourceTopicPartitions();
+
+        // when partitions are added to the source cluster, reconfiguration is 
triggered
+        connector.refreshTopicPartitions();
+        verify(connector, times(1)).computeAndCreateTopicPartitions();
+
+    }
 }

Reply via email to