This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.7 by this push:
     new 1c7c93b  KAFKA-10710; MM2 - Create herders only if 
source->target.enabled=true and heartbeats are disabled (#9589)
1c7c93b is described below

commit 1c7c93ba37308f4c3c3282b455b5263c0e455f09
Author: Julien Chanaud <[email protected]>
AuthorDate: Thu Jan 28 23:52:51 2021 +0100

    KAFKA-10710; MM2 - Create herders only if source->target.enabled=true and 
heartbeats are disabled (#9589)
    
    By default Mirror Maker 2 creates herders for all the possible combinations 
even if the "links" are not enabled.
    
    This is because the heartbeats are emitted from the "opposite" herder.
    If there is a replication flow from A to B and heartbeats are required, two 
herders are needed:
    
    - A->B for the MirrorSourceConnector
    - B->A for the MirrorHeartbeatConnector
    
    The MirrorHeartbeatConnector on B->A emits beats into topic heartbeats on 
cluster A.
    The MirrorSourceConnector on A->B then replicates whichever topic is 
configured as well as heartbeats.
    
    In cases with multiple clusters (10 and more), this leads to an incredible 
number of unnecessary connections, file descriptors and configuration topics 
being created in every target cluster.
    
    With this code change, we leverage the top-level property 
"emit.heartbeats.enabled", which defaults to "true".
    We skip creating the A->B herder only when both 
A->B.emit.heartbeats.enabled=false (defaults to true) and A->B.enabled=false 
(defaults to false).
    
    Existing users will not see any change, and if they depend on these 
"opposite" herders for their monitoring, it will still work.
    New users with more complex use cases can change this property and fine-tune 
their heartbeat generation.
    
    Reviewers: Ryanne Dolan <[email protected]>,  Sanjana Kaundinya 
<[email protected]>, Jason Gustafson <[email protected]>
---
 .../kafka/connect/mirror/MirrorMakerConfig.java    | 23 ++++++-
 .../connect/mirror/MirrorMakerConfigTest.java      | 75 ++++++++++++++++++++++
 2 files changed, 96 insertions(+), 2 deletions(-)

diff --git 
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
 
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
index 059ab78..b5c361c 100644
--- 
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
+++ 
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
@@ -87,11 +87,30 @@ public class MirrorMakerConfig extends AbstractConfig {
     public List<SourceAndTarget> clusterPairs() {
         List<SourceAndTarget> pairs = new ArrayList<>();
         Set<String> clusters = clusters();
+        Map<String, String> originalStrings = originalsStrings();
+        boolean globalHeartbeatsEnabled = 
MirrorConnectorConfig.EMIT_HEARTBEATS_ENABLED_DEFAULT;
+        if 
(originalStrings.containsKey(MirrorConnectorConfig.EMIT_HEARTBEATS_ENABLED)) {
+            globalHeartbeatsEnabled = 
Boolean.valueOf(originalStrings.get(MirrorConnectorConfig.EMIT_HEARTBEATS_ENABLED));
+        }
+
         for (String source : clusters) {
             for (String target : clusters) {
-                SourceAndTarget sourceAndTarget = new SourceAndTarget(source, 
target);
                 if (!source.equals(target)) {
-                    pairs.add(sourceAndTarget);
+                    String clusterPairConfigPrefix = source + "->" + target + 
".";
+                    boolean clusterPairEnabled = 
Boolean.valueOf(originalStrings.getOrDefault(clusterPairConfigPrefix + 
"enabled", "false"));
+                    boolean clusterPairHeartbeatsEnabled = 
globalHeartbeatsEnabled;
+                    if (originalStrings.containsKey(clusterPairConfigPrefix + 
MirrorConnectorConfig.EMIT_HEARTBEATS_ENABLED)) {
+                        clusterPairHeartbeatsEnabled = 
Boolean.valueOf(originalStrings.get(clusterPairConfigPrefix + 
MirrorConnectorConfig.EMIT_HEARTBEATS_ENABLED));
+                    }
+
+                    // By default, all source->target Herder combinations are 
created even if `x->y.enabled=false`
+                    // Unless `emit.heartbeats.enabled=false` or 
`x->y.emit.heartbeats.enabled=false`
+                    // Reason for this behavior: for a given replication flow 
A->B with heartbeats, 2 herders are required :
+                    // B->A for the MirrorHeartbeatConnector (emits heartbeats 
into A for monitoring replication health)
+                    // A->B for the MirrorSourceConnector (actual replication 
flow)
+                    if (clusterPairEnabled || clusterPairHeartbeatsEnabled) {
+                        pairs.add(new SourceAndTarget(source, target));
+                    }
                 }
             }
         }
diff --git 
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java
 
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java
index 1cba87f..c223acc 100644
--- 
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java
+++ 
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java
@@ -28,6 +28,7 @@ import java.util.Set;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Arrays;
+import java.util.List;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -256,6 +257,80 @@ public class MirrorMakerConfigTest {
             "secret2", bProps.get("producer.ssl.key.password"));
     }
 
+    @Test
+    public void testClusterPairsWithDefaultSettings() {
+        MirrorMakerConfig mirrorConfig = new MirrorMakerConfig(makeProps(
+                "clusters", "a, b, c"));
+        // implicit default configuration for each cluster pair:
+        // a->b.enabled=false
+        // a->b.emit.heartbeats.enabled=true
+        // a->c.enabled=false
+        // a->c.emit.heartbeats.enabled=true
+        // b->a.enabled=false
+        // b->a.emit.heartbeats.enabled=true
+        // b->c.enabled=false
+        // b->c.emit.heartbeats.enabled=true
+        // c->a.enabled=false
+        // c->a.emit.heartbeats.enabled=true
+        // c->b.enabled=false
+        // c->b.emit.heartbeats.enabled=true
+        List<SourceAndTarget> clusterPairs = mirrorConfig.clusterPairs();
+        assertEquals("clusterPairs count should match all combinations count",
+                6, clusterPairs.size());
+    }
+
+    @Test
+    public void testEmptyClusterPairsWithGloballyDisabledHeartbeats() {
+        MirrorMakerConfig mirrorConfig = new MirrorMakerConfig(makeProps(
+                "clusters", "a, b, c",
+                "emit.heartbeats.enabled", "false"));
+        assertEquals("clusterPairs count should be 0",
+                0, mirrorConfig.clusterPairs().size());
+    }
+
+    @Test
+    public void testClusterPairsWithTwoDisabledHeartbeats() {
+        MirrorMakerConfig mirrorConfig = new MirrorMakerConfig(makeProps(
+                "clusters", "a, b, c",
+                "a->b.emit.heartbeats.enabled", "false",
+                "a->c.emit.heartbeats.enabled", "false"));
+        List<SourceAndTarget> clusterPairs = mirrorConfig.clusterPairs();
+        assertEquals("clusterPairs count should match all combinations count 
except x->y.emit.heartbeats.enabled=false",
+                4, clusterPairs.size());
+    }
+
+    @Test
+    public void testClusterPairsWithGloballyDisabledHeartbeats() {
+        MirrorMakerConfig mirrorConfig = new MirrorMakerConfig(makeProps(
+                "clusters", "a, b, c, d, e, f",
+                "emit.heartbeats.enabled", "false",
+                "a->b.enabled", "true",
+                "a->c.enabled", "true",
+                "a->d.enabled", "true",
+                "a->e.enabled", "false",
+                "a->f.enabled", "false"));
+        List<SourceAndTarget> clusterPairs = mirrorConfig.clusterPairs();
+        assertEquals("clusterPairs count should match (x->y.enabled=true or 
x->y.emit.heartbeats.enabled=true) count",
+                3, clusterPairs.size());
+
+        // Link b->a.enabled doesn't exist therefore it must not be in 
clusterPairs
+        SourceAndTarget sourceAndTarget = new SourceAndTarget("b", "a");
+        assertFalse("disabled/unset link x->y should not be in clusterPairs", 
clusterPairs.contains(sourceAndTarget));
+    }
+
+    @Test
+    public void testClusterPairsWithGloballyDisabledHeartbeatsCentralLocal() {
+        MirrorMakerConfig mirrorConfig = new MirrorMakerConfig(makeProps(
+                "clusters", "central, local_one, local_two, beats_emitter",
+                "emit.heartbeats.enabled", "false",
+                "central->local_one.enabled", "true",
+                "central->local_two.enabled", "true",
+                "beats_emitter->central.emit.heartbeats.enabled", "true"));
+
+        assertEquals("clusterPairs count should match (x->y.enabled=true or 
x->y.emit.heartbeats.enabled=true) count",
+                3, mirrorConfig.clusterPairs().size());
+    }
+
     public static class FakeConfigProvider implements ConfigProvider {
 
         Map<String, String> secrets = Collections.singletonMap("password", 
"secret2");

Reply via email to