This is an automated email from the ASF dual-hosted git repository.
rhauch pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.6 by this push:
new 9aa261a KAFKA-10332: Update MM2 refreshTopicPartitions() logic (#9343)
9aa261a is described below
commit 9aa261a6422fdda319fefb7183adfb17d17b6e1f
Author: Mickael Maison <[email protected]>
AuthorDate: Mon Oct 19 17:51:44 2020 +0200
KAFKA-10332: Update MM2 refreshTopicPartitions() logic (#9343)
Trigger task reconfiguration when:
- topic-partitions are created or deleted on the source cluster
- topic-partitions are missing on the target cluster
Authors: Mickael Maison <[email protected]>, Edoardo Comar
<[email protected]>
Reviewer: Randall Hauch <[email protected]>
---
.../connect/mirror/MirrorSourceConnector.java | 54 +++++++++++++++-------
.../connect/mirror/MirrorSourceConnectorTest.java | 31 ++++++++++++-
2 files changed, 67 insertions(+), 18 deletions(-)
diff --git
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java
index 041814a..0f6eb46 100644
---
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java
+++
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java
@@ -96,7 +96,7 @@ public class MirrorSourceConnector extends SourceConnector {
this.replicationPolicy = replicationPolicy;
this.topicFilter = topicFilter;
this.configPropertyFilter = configPropertyFilter;
- }
+ }
@Override
public void start(Map<String, String> props) {
@@ -202,6 +202,7 @@ public class MirrorSourceConnector extends SourceConnector {
throws InterruptedException, ExecutionException {
Set<String> topics = listTopics(targetAdminClient).stream()
.filter(t ->
sourceAndTarget.source().equals(replicationPolicy.topicSource(t)))
+ .filter(t -> !t.equals(config.checkpointsTopic()))
.collect(Collectors.toSet());
return describeTopics(targetAdminClient, topics).stream()
.flatMap(MirrorSourceConnector::expandTopicDescription)
@@ -211,23 +212,44 @@ public class MirrorSourceConnector extends
SourceConnector {
// visible for testing
void refreshTopicPartitions()
throws InterruptedException, ExecutionException {
- knownSourceTopicPartitions = findSourceTopicPartitions();
- knownTargetTopicPartitions = findTargetTopicPartitions();
- List<TopicPartition> upstreamTargetTopicPartitions =
knownTargetTopicPartitions.stream()
+
+ List<TopicPartition> sourceTopicPartitions =
findSourceTopicPartitions();
+ List<TopicPartition> targetTopicPartitions =
findTargetTopicPartitions();
+
+ Set<TopicPartition> sourceTopicPartitionsSet = new
HashSet<>(sourceTopicPartitions);
+ Set<TopicPartition> knownSourceTopicPartitionsSet = new
HashSet<>(knownSourceTopicPartitions);
+
+ Set<TopicPartition> upstreamTargetTopicPartitions =
targetTopicPartitions.stream()
.map(x -> new
TopicPartition(replicationPolicy.upstreamTopic(x.topic()), x.partition()))
- .collect(Collectors.toList());
+ .collect(Collectors.toSet());
+
+ Set<TopicPartition> missingInTarget = new
HashSet<>(sourceTopicPartitions);
+ missingInTarget.removeAll(upstreamTargetTopicPartitions);
+
+ knownTargetTopicPartitions = targetTopicPartitions;
+
+ // Detect if topic-partitions were added or deleted from the source
cluster
+ // or if topic-partitions are missing from the target cluster
+ if (!knownSourceTopicPartitionsSet.equals(sourceTopicPartitionsSet) ||
!missingInTarget.isEmpty()) {
+
+ Set<TopicPartition> newTopicPartitions = sourceTopicPartitionsSet;
+ newTopicPartitions.removeAll(knownSourceTopicPartitions);
+
+ Set<TopicPartition> deletedTopicPartitions =
knownSourceTopicPartitionsSet;
+ deletedTopicPartitions.removeAll(sourceTopicPartitions);
+
+ log.info("Found {} new topic-partitions on {}. " +
+ "Found {} deleted topic-partitions on {}. " +
+ "Found {} topic-partitions missing on {}.",
+ newTopicPartitions.size(), sourceAndTarget.source(),
+ deletedTopicPartitions.size(), sourceAndTarget.source(),
+ missingInTarget.size(), sourceAndTarget.target());
+
+ log.trace("Found new topic-partitions on {}: {}",
sourceAndTarget.source(), newTopicPartitions);
+ log.trace("Found deleted topic-partitions on {}: {}",
sourceAndTarget.source(), deletedTopicPartitions);
+ log.trace("Found missing topic-partitions on {}: {}",
sourceAndTarget.target(), missingInTarget);
- Set<TopicPartition> newTopicPartitions = new HashSet<>();
- newTopicPartitions.addAll(knownSourceTopicPartitions);
- newTopicPartitions.removeAll(upstreamTargetTopicPartitions);
- Set<TopicPartition> deadTopicPartitions = new HashSet<>();
- deadTopicPartitions.addAll(upstreamTargetTopicPartitions);
- deadTopicPartitions.removeAll(knownSourceTopicPartitions);
- if (!newTopicPartitions.isEmpty() || !deadTopicPartitions.isEmpty()) {
- log.info("Found {} topic-partitions on {}. {} are new. {} were
removed. Previously had {}.",
- knownSourceTopicPartitions.size(),
sourceAndTarget.source(), newTopicPartitions.size(),
- deadTopicPartitions.size(),
knownSourceTopicPartitions.size());
- log.trace("Found new topic-partitions: {}", newTopicPartitions);
+ knownSourceTopicPartitions = sourceTopicPartitions;
computeAndCreateTopicPartitions();
context.requestTaskReconfiguration();
}
diff --git
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java
index e86d21e..c915845 100644
---
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java
+++
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java
@@ -186,7 +186,7 @@ public class MirrorSourceConnectorTest {
connector.initialize(mock(ConnectorContext.class));
connector = spy(connector);
- List<TopicPartition> sourceTopicPartitions = Arrays.asList(new
TopicPartition("topic", 0));
+ List<TopicPartition> sourceTopicPartitions =
Collections.singletonList(new TopicPartition("topic", 0));
doReturn(sourceTopicPartitions).when(connector).findSourceTopicPartitions();
doReturn(Collections.emptyList()).when(connector).findTargetTopicPartitions();
doNothing().when(connector).createTopicPartitions(any(), any(), any());
@@ -205,11 +205,38 @@ public class MirrorSourceConnectorTest {
eq(expectedNewTopics),
eq(Collections.emptyMap()));
- List<TopicPartition> targetTopicPartitions = Arrays.asList(new
TopicPartition("source.topic", 0));
+ List<TopicPartition> targetTopicPartitions =
Collections.singletonList(new TopicPartition("source.topic", 0));
doReturn(targetTopicPartitions).when(connector).findTargetTopicPartitions();
connector.refreshTopicPartitions();
// once target topic is created, refreshTopicPartitions() will NOT
call computeAndCreateTopicPartitions() again
verify(connector, times(2)).computeAndCreateTopicPartitions();
}
+
+ @Test
+ public void testRefreshTopicPartitionsTopicOnTargetFirst() throws
Exception {
+ MirrorSourceConnector connector = new MirrorSourceConnector(new
SourceAndTarget("source", "target"),
+ new DefaultReplicationPolicy(), new DefaultTopicFilter(), new
DefaultConfigPropertyFilter());
+ connector.initialize(mock(ConnectorContext.class));
+ connector = spy(connector);
+
+ List<TopicPartition> sourceTopicPartitions = Collections.emptyList();
+ List<TopicPartition> targetTopicPartitions =
Collections.singletonList(new TopicPartition("source.topic", 0));
+
doReturn(sourceTopicPartitions).when(connector).findSourceTopicPartitions();
+
doReturn(targetTopicPartitions).when(connector).findTargetTopicPartitions();
+ doNothing().when(connector).createTopicPartitions(any(), any(), any());
+
+ // partitions appearing on the target cluster should not cause
reconfiguration
+ connector.refreshTopicPartitions();
+ connector.refreshTopicPartitions();
+ verify(connector, times(0)).computeAndCreateTopicPartitions();
+
+ sourceTopicPartitions = Collections.singletonList(new
TopicPartition("topic", 0));
+
doReturn(sourceTopicPartitions).when(connector).findSourceTopicPartitions();
+
+ // when partitions are added to the source cluster, reconfiguration is
triggered
+ connector.refreshTopicPartitions();
+ verify(connector, times(1)).computeAndCreateTopicPartitions();
+
+ }
}