This is an automated email from the ASF dual-hosted git repository.
viktor pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 6d650ff9e5c KAFKA-17534: Add configuration to disable the heartbeats
topic replication (#17413)
6d650ff9e5c is described below
commit 6d650ff9e5c88b707fdf0107beb9a0f1da1a2f8e
Author: Dániel Urbán <[email protected]>
AuthorDate: Thu Oct 10 14:21:36 2024 +0200
KAFKA-17534: Add configuration to disable the heartbeats topic replication
(#17413)
Introducing heartbeats.replication.enabled to explicitly disable the
default heartbeats topic replication.
This change implements KIP-1089.
Reviewers: Viktor Somogyi-Vass <[email protected]>
---
.../apache/kafka/connect/mirror/MirrorSourceConfig.java | 17 +++++++++++++++++
.../kafka/connect/mirror/MirrorSourceConnector.java | 12 +++++++++++-
.../kafka/connect/mirror/MirrorSourceConnectorTest.java | 16 ++++++++++++++++
3 files changed, 44 insertions(+), 1 deletion(-)
diff --git
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConfig.java
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConfig.java
index 04bfd68c62c..dc0da538233 100644
---
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConfig.java
+++
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConfig.java
@@ -88,6 +88,12 @@ public class MirrorSourceConfig extends
MirrorConnectorConfig {
private static final String OFFSET_LAG_MAX_DOC = "How out-of-sync a remote
partition can be before it is resynced.";
public static final long OFFSET_LAG_MAX_DEFAULT = 100L;
+ public static final String HEARTBEATS_REPLICATION_ENABLED =
"heartbeats.replication" + ENABLED_SUFFIX;
+ private static final String HEARTBEATS_REPLICATION_ENABLED_DOC = "Whether
to replicate the heartbeats topics even when the topic filter does not include
them." +
+ " If set to true, heartbeats topics identified by the replication
policy will always be replicated, regardless of the topic filter
configuration." +
+ " If set to false, heartbeats topics will only be replicated if
the topic filter allows.";
+ public static final boolean HEARTBEATS_REPLICATION_ENABLED_DEFAULT = true;
+
public static final String OFFSET_SYNCS_SOURCE_PRODUCER_ROLE =
OFFSET_SYNCS_CLIENT_ROLE_PREFIX + "source-producer";
public static final String OFFSET_SYNCS_TARGET_PRODUCER_ROLE =
OFFSET_SYNCS_CLIENT_ROLE_PREFIX + "target-producer";
public static final String OFFSET_SYNCS_SOURCE_ADMIN_ROLE =
OFFSET_SYNCS_CLIENT_ROLE_PREFIX + "source-admin";
@@ -193,6 +199,10 @@ public class MirrorSourceConfig extends
MirrorConnectorConfig {
return getBoolean(EMIT_OFFSET_SYNCS_ENABLED);
}
+ boolean heartbeatsReplicationEnabled() {
+ return getBoolean(HEARTBEATS_REPLICATION_ENABLED);
+ }
+
private static ConfigDef defineSourceConfig(ConfigDef baseConfig) {
return baseConfig
.define(
@@ -298,6 +308,13 @@ public class MirrorSourceConfig extends
MirrorConnectorConfig {
EMIT_OFFSET_SYNCS_ENABLED_DEFAULT,
ConfigDef.Importance.LOW,
EMIT_OFFSET_SYNCS_ENABLED_DOC
+ )
+ .define(
+ HEARTBEATS_REPLICATION_ENABLED,
+ ConfigDef.Type.BOOLEAN,
+ HEARTBEATS_REPLICATION_ENABLED_DEFAULT,
+ ConfigDef.Importance.LOW,
+ HEARTBEATS_REPLICATION_ENABLED_DOC
);
}
diff --git
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java
index 14929bd1750..f65899dac6e 100644
---
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java
+++
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java
@@ -105,6 +105,7 @@ public class MirrorSourceConnector extends SourceConnector {
private int replicationFactor;
private Admin sourceAdminClient;
private Admin targetAdminClient;
+ private boolean heartbeatsReplicationEnabled;
public MirrorSourceConnector() {
// nop
@@ -119,10 +120,17 @@ public class MirrorSourceConnector extends
SourceConnector {
// visible for testing
MirrorSourceConnector(SourceAndTarget sourceAndTarget, ReplicationPolicy
replicationPolicy,
TopicFilter topicFilter, ConfigPropertyFilter
configPropertyFilter) {
+ this(sourceAndTarget, replicationPolicy, topicFilter,
configPropertyFilter, true);
+ }
+
+ // visible for testing
+ MirrorSourceConnector(SourceAndTarget sourceAndTarget, ReplicationPolicy
replicationPolicy,
+ TopicFilter topicFilter, ConfigPropertyFilter
configPropertyFilter, boolean heartbeatsReplicationEnabled) {
this.sourceAndTarget = sourceAndTarget;
this.replicationPolicy = replicationPolicy;
this.topicFilter = topicFilter;
this.configPropertyFilter = configPropertyFilter;
+ this.heartbeatsReplicationEnabled = heartbeatsReplicationEnabled;
}
// visible for testing
@@ -147,6 +155,7 @@ public class MirrorSourceConnector extends SourceConnector {
replicationFactor = config.replicationFactor();
sourceAdminClient =
config.forwardingAdmin(config.sourceAdminConfig("replication-source-admin"));
targetAdminClient =
config.forwardingAdmin(config.targetAdminConfig("replication-target-admin"));
+ heartbeatsReplicationEnabled = config.heartbeatsReplicationEnabled();
scheduler = new Scheduler(getClass(), config.entityLabel(),
config.adminTimeout());
scheduler.execute(this::createOffsetSyncsTopic, "creating upstream
offset-syncs topic");
@@ -687,7 +696,8 @@ public class MirrorSourceConnector extends SourceConnector {
}
boolean shouldReplicateTopic(String topic) {
- return (topicFilter.shouldReplicateTopic(topic) ||
replicationPolicy.isHeartbeatsTopic(topic))
+ return (topicFilter.shouldReplicateTopic(topic)
+ || (heartbeatsReplicationEnabled &&
replicationPolicy.isHeartbeatsTopic(topic)))
&& !replicationPolicy.isInternalTopic(topic) && !isCycle(topic);
}
diff --git
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java
index fae73092a7f..a410adde944 100644
---
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java
+++
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java
@@ -112,6 +112,22 @@ public class MirrorSourceConnectorTest {
assertFalse(connector.shouldReplicateTopic("us-west.heartbeats"),
"should not consider this topic as a heartbeats topic");
}
+ @Test
+ public void testDoesNotReplicateHeartbeatsWhenDisabled() {
+ MirrorSourceConnector connector = new MirrorSourceConnector(new
SourceAndTarget("source", "target"),
+ new DefaultReplicationPolicy(), new DefaultTopicFilter(), new
DefaultConfigPropertyFilter(), false);
+ assertFalse(connector.shouldReplicateTopic("heartbeats"), "should not
replicate heartbeats");
+ assertFalse(connector.shouldReplicateTopic("us-west.heartbeats"),
"should not replicate upstream heartbeats");
+ }
+
+ @Test
+ public void testReplicatesHeartbeatsWhenDisabledButFilterAllows() {
+ MirrorSourceConnector connector = new MirrorSourceConnector(new
SourceAndTarget("source", "target"),
+ new DefaultReplicationPolicy(), x -> true, new
DefaultConfigPropertyFilter(), false);
+ assertTrue(connector.shouldReplicateTopic("heartbeats"), "should
replicate heartbeats");
+ assertTrue(connector.shouldReplicateTopic("us-west.heartbeats"),
"should replicate upstream heartbeats");
+ }
+
@Test
public void testNoCycles() {
MirrorSourceConnector connector = new MirrorSourceConnector(new
SourceAndTarget("source", "target"),