This is an automated email from the ASF dual-hosted git repository.

viktor pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6d650ff9e5c KAFKA-17534: Add configuration to disable the heartbeats 
topic replication (#17413)
6d650ff9e5c is described below

commit 6d650ff9e5c88b707fdf0107beb9a0f1da1a2f8e
Author: Dániel Urbán <[email protected]>
AuthorDate: Thu Oct 10 14:21:36 2024 +0200

    KAFKA-17534: Add configuration to disable the heartbeats topic replication 
(#17413)
    
    Introducing heartbeats.replication.enabled to explicitly disable the 
default heartbeats topic replication.
    This change implements KIP-1089.
    
    Reviewers: Viktor Somogyi-Vass <[email protected]>
---
 .../apache/kafka/connect/mirror/MirrorSourceConfig.java | 17 +++++++++++++++++
 .../kafka/connect/mirror/MirrorSourceConnector.java     | 12 +++++++++++-
 .../kafka/connect/mirror/MirrorSourceConnectorTest.java | 16 ++++++++++++++++
 3 files changed, 44 insertions(+), 1 deletion(-)

diff --git 
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConfig.java
 
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConfig.java
index 04bfd68c62c..dc0da538233 100644
--- 
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConfig.java
+++ 
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConfig.java
@@ -88,6 +88,12 @@ public class MirrorSourceConfig extends 
MirrorConnectorConfig {
     private static final String OFFSET_LAG_MAX_DOC = "How out-of-sync a remote 
partition can be before it is resynced.";
     public static final long OFFSET_LAG_MAX_DEFAULT = 100L;
 
+    public static final String HEARTBEATS_REPLICATION_ENABLED = 
"heartbeats.replication" + ENABLED_SUFFIX;
+    private static final String HEARTBEATS_REPLICATION_ENABLED_DOC = "Whether 
to replicate the heartbeats topics even when the topic filter does not include 
them." +
+            " If set to true, heartbeats topics identified by the replication 
policy will always be replicated, regardless of the topic filter 
configuration." +
+            " If set to false, heartbeats topics will only be replicated if 
the topic filter allows.";
+    public static final boolean HEARTBEATS_REPLICATION_ENABLED_DEFAULT = true;
+
     public static final String OFFSET_SYNCS_SOURCE_PRODUCER_ROLE = 
OFFSET_SYNCS_CLIENT_ROLE_PREFIX + "source-producer";
     public static final String OFFSET_SYNCS_TARGET_PRODUCER_ROLE = 
OFFSET_SYNCS_CLIENT_ROLE_PREFIX + "target-producer";
     public static final String OFFSET_SYNCS_SOURCE_ADMIN_ROLE = 
OFFSET_SYNCS_CLIENT_ROLE_PREFIX + "source-admin";
@@ -193,6 +199,10 @@ public class MirrorSourceConfig extends 
MirrorConnectorConfig {
         return getBoolean(EMIT_OFFSET_SYNCS_ENABLED);
     }
 
+    boolean heartbeatsReplicationEnabled() {
+        return getBoolean(HEARTBEATS_REPLICATION_ENABLED);
+    }
+
     private static ConfigDef defineSourceConfig(ConfigDef baseConfig) {
         return baseConfig
                 .define(
@@ -298,6 +308,13 @@ public class MirrorSourceConfig extends 
MirrorConnectorConfig {
                         EMIT_OFFSET_SYNCS_ENABLED_DEFAULT,
                         ConfigDef.Importance.LOW,
                         EMIT_OFFSET_SYNCS_ENABLED_DOC
+                )
+                .define(
+                        HEARTBEATS_REPLICATION_ENABLED,
+                        ConfigDef.Type.BOOLEAN,
+                        HEARTBEATS_REPLICATION_ENABLED_DEFAULT,
+                        ConfigDef.Importance.LOW,
+                        HEARTBEATS_REPLICATION_ENABLED_DOC
                 );
     }
 
diff --git 
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java
 
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java
index 14929bd1750..f65899dac6e 100644
--- 
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java
+++ 
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java
@@ -105,6 +105,7 @@ public class MirrorSourceConnector extends SourceConnector {
     private int replicationFactor;
     private Admin sourceAdminClient;
     private Admin targetAdminClient;
+    private boolean heartbeatsReplicationEnabled;
 
     public MirrorSourceConnector() {
         // nop
@@ -119,10 +120,17 @@ public class MirrorSourceConnector extends 
SourceConnector {
     // visible for testing
     MirrorSourceConnector(SourceAndTarget sourceAndTarget, ReplicationPolicy 
replicationPolicy,
             TopicFilter topicFilter, ConfigPropertyFilter 
configPropertyFilter) {
+        this(sourceAndTarget, replicationPolicy, topicFilter, 
configPropertyFilter, true);
+    }
+
+    // visible for testing
+    MirrorSourceConnector(SourceAndTarget sourceAndTarget, ReplicationPolicy 
replicationPolicy,
+            TopicFilter topicFilter, ConfigPropertyFilter 
configPropertyFilter, boolean heartbeatsReplicationEnabled) {
         this.sourceAndTarget = sourceAndTarget;
         this.replicationPolicy = replicationPolicy;
         this.topicFilter = topicFilter;
         this.configPropertyFilter = configPropertyFilter;
+        this.heartbeatsReplicationEnabled = heartbeatsReplicationEnabled;
     }
 
     // visible for testing
@@ -147,6 +155,7 @@ public class MirrorSourceConnector extends SourceConnector {
         replicationFactor = config.replicationFactor();
         sourceAdminClient = 
config.forwardingAdmin(config.sourceAdminConfig("replication-source-admin"));
         targetAdminClient = 
config.forwardingAdmin(config.targetAdminConfig("replication-target-admin"));
+        heartbeatsReplicationEnabled = config.heartbeatsReplicationEnabled();
 
         scheduler = new Scheduler(getClass(), config.entityLabel(), 
config.adminTimeout());
         scheduler.execute(this::createOffsetSyncsTopic, "creating upstream 
offset-syncs topic");
@@ -687,7 +696,8 @@ public class MirrorSourceConnector extends SourceConnector {
     }
 
     boolean shouldReplicateTopic(String topic) {
-        return (topicFilter.shouldReplicateTopic(topic) || 
replicationPolicy.isHeartbeatsTopic(topic))
+        return (topicFilter.shouldReplicateTopic(topic)
+                || (heartbeatsReplicationEnabled && 
replicationPolicy.isHeartbeatsTopic(topic)))
             && !replicationPolicy.isInternalTopic(topic) && !isCycle(topic);
     }
 
diff --git 
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java
 
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java
index fae73092a7f..a410adde944 100644
--- 
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java
+++ 
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java
@@ -112,6 +112,22 @@ public class MirrorSourceConnectorTest {
         assertFalse(connector.shouldReplicateTopic("us-west.heartbeats"), 
"should not consider this topic as a heartbeats topic");
     }
 
+    @Test
+    public void testDoesNotReplicateHeartbeatsWhenDisabled() {
+        MirrorSourceConnector connector = new MirrorSourceConnector(new 
SourceAndTarget("source", "target"),
+                new DefaultReplicationPolicy(), new DefaultTopicFilter(), new 
DefaultConfigPropertyFilter(), false);
+        assertFalse(connector.shouldReplicateTopic("heartbeats"), "should not 
replicate heartbeats");
+        assertFalse(connector.shouldReplicateTopic("us-west.heartbeats"), 
"should not replicate upstream heartbeats");
+    }
+
+    @Test
+    public void testReplicatesHeartbeatsWhenDisabledButFilterAllows() {
+        MirrorSourceConnector connector = new MirrorSourceConnector(new 
SourceAndTarget("source", "target"),
+                new DefaultReplicationPolicy(), x -> true, new 
DefaultConfigPropertyFilter(), false);
+        assertTrue(connector.shouldReplicateTopic("heartbeats"), "should 
replicate heartbeats");
+        assertTrue(connector.shouldReplicateTopic("us-west.heartbeats"), 
"should replicate upstream heartbeats");
+    }
+
     @Test
     public void testNoCycles() {
         MirrorSourceConnector connector = new MirrorSourceConnector(new 
SourceAndTarget("source", "target"),

Reply via email to