This is an automated email from the ASF dual-hosted git repository.
viktor pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 0bbed823e81 KAFKA-17200: Allow the replication of user internal topics
(#17815)
0bbed823e81 is described below
commit 0bbed823e818a920fbaa2d54b50f4fcd81a8a759
Author: Patrik Marton <[email protected]>
AuthorDate: Fri Dec 6 15:23:58 2024 +0100
KAFKA-17200: Allow the replication of user internal topics (#17815)
Reviewers: Viktor Somogyi-Vass <[email protected]>
---
.../org/apache/kafka/connect/mirror/DefaultReplicationPolicy.java | 2 +-
.../java/org/apache/kafka/connect/mirror/ReplicationPolicy.java | 5 ++---
.../org/apache/kafka/connect/mirror/ReplicationPolicyTest.java | 8 +++++---
.../java/org/apache/kafka/connect/mirror/DefaultTopicFilter.java | 2 +-
docs/upgrade.html | 5 +++++
5 files changed, 14 insertions(+), 8 deletions(-)
diff --git
a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.java
b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.java
index 7733ccf3fd5..ae273c36849 100644
---
a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.java
+++
b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.java
@@ -115,6 +115,6 @@ public class DefaultReplicationPolicy implements
ReplicationPolicy, Configurable
@Override
public boolean isMM2InternalTopic(String topic) {
- return topic.endsWith(internalSuffix());
+ return topic.startsWith("mm2") && topic.endsWith(internalSuffix()) ||
isCheckpointsTopic(topic);
}
}
diff --git
a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/ReplicationPolicy.java
b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/ReplicationPolicy.java
index fbd8725eb64..34d33445097 100644
---
a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/ReplicationPolicy.java
+++
b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/ReplicationPolicy.java
@@ -98,7 +98,7 @@ public interface ReplicationPolicy {
* This is used to make sure the topic doesn't need to be replicated.
*/
default boolean isMM2InternalTopic(String topic) {
- return topic.endsWith(".internal");
+ return topic.startsWith("mm2") && topic.endsWith(".internal") ||
isCheckpointsTopic(topic);
}
/**
@@ -106,7 +106,6 @@ public interface ReplicationPolicy {
*/
default boolean isInternalTopic(String topic) {
boolean isKafkaInternalTopic = topic.startsWith("__") ||
topic.startsWith(".");
- boolean isDefaultConnectTopic = topic.endsWith("-internal") ||
topic.endsWith(".internal");
- return isMM2InternalTopic(topic) || isKafkaInternalTopic ||
isDefaultConnectTopic;
+ return isMM2InternalTopic(topic) || isKafkaInternalTopic;
}
}
diff --git
a/connect/mirror-client/src/test/java/org/apache/kafka/connect/mirror/ReplicationPolicyTest.java
b/connect/mirror-client/src/test/java/org/apache/kafka/connect/mirror/ReplicationPolicyTest.java
index 802d0b606c2..86aaf8ffd0e 100644
---
a/connect/mirror-client/src/test/java/org/apache/kafka/connect/mirror/ReplicationPolicyTest.java
+++
b/connect/mirror-client/src/test/java/org/apache/kafka/connect/mirror/ReplicationPolicyTest.java
@@ -38,15 +38,17 @@ public class ReplicationPolicyTest {
@Test
public void testInternalTopic() {
+ Map<String, Object> config = new HashMap<>();
+ config.put(MirrorClientConfig.REPLICATION_POLICY_SEPARATOR, ".");
+ DEFAULT_REPLICATION_POLICY.configure(config);
+
// starts with '__'
assertTrue(DEFAULT_REPLICATION_POLICY.isInternalTopic("__consumer_offsets"));
// starts with '.'
assertTrue(DEFAULT_REPLICATION_POLICY.isInternalTopic(".hiddentopic"));
- // ends with '.internal': default
DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG in standalone mode.
+ // starts with 'mm2' and ends with '.internal': default
DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG in standalone mode.
assertTrue(DEFAULT_REPLICATION_POLICY.isInternalTopic("mm2-offsets.CLUSTER.internal"));
- // ends with '-internal'
-
assertTrue(DEFAULT_REPLICATION_POLICY.isInternalTopic("mm2-offsets-CLUSTER-internal"));
// non-internal topic.
assertFalse(DEFAULT_REPLICATION_POLICY.isInternalTopic("mm2-offsets_CLUSTER_internal"));
}
diff --git
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/DefaultTopicFilter.java
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/DefaultTopicFilter.java
index 35da6132c9f..95414685ba7 100644
---
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/DefaultTopicFilter.java
+++
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/DefaultTopicFilter.java
@@ -33,7 +33,7 @@ public class DefaultTopicFilter implements TopicFilter {
public static final String TOPICS_EXCLUDE_CONFIG = "topics.exclude";
private static final String TOPICS_EXCLUDE_DOC = "List of topics and/or
regexes that should not be replicated.";
- public static final String TOPICS_EXCLUDE_DEFAULT = ".*[\\-\\.]internal,
.*\\.replica, __.*";
+ public static final String TOPICS_EXCLUDE_DEFAULT = "mm2.*\\.internal,
.*\\.replica, __.*";
private Pattern includePattern;
private Pattern excludePattern;
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 84b2b36ebc1..5967aeb9cee 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -204,6 +204,11 @@
index. This API is used when the consumers are enabled
with isolation level as READ_COMMITTED.
See <a
href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-1058:+Txn+consumer+exerts+pressure+on+remote+storage+when+collecting+aborted+transactions">KIP-1058</a>
for more details.
</li>
+ <li>
+ The criteria for identifying internal topics in
ReplicationPolicy and DefaultReplicationPolicy have
+ been updated to enable the replication of topics that
appear to be internal but aren't truly internal to Kafka and Mirror Maker 2.
+ See <a
href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-1074%3A+Allow+the+replication+of+user+internal+topics">KIP-1074</a>
for more details.
+ </li>
</ul>
</li>
</ul>