This is an automated email from the ASF dual-hosted git repository.

viktor pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 0bbed823e81 KAFKA-17200: Allow the replication of user internal topics 
(#17815)
0bbed823e81 is described below

commit 0bbed823e818a920fbaa2d54b50f4fcd81a8a759
Author: Patrik Marton <[email protected]>
AuthorDate: Fri Dec 6 15:23:58 2024 +0100

    KAFKA-17200: Allow the replication of user internal topics (#17815)
    
    Reviewers: Viktor Somogyi-Vass <[email protected]>
---
 .../org/apache/kafka/connect/mirror/DefaultReplicationPolicy.java | 2 +-
 .../java/org/apache/kafka/connect/mirror/ReplicationPolicy.java   | 5 ++---
 .../org/apache/kafka/connect/mirror/ReplicationPolicyTest.java    | 8 +++++---
 .../java/org/apache/kafka/connect/mirror/DefaultTopicFilter.java  | 2 +-
 docs/upgrade.html                                                 | 5 +++++
 5 files changed, 14 insertions(+), 8 deletions(-)

diff --git 
a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.java
 
b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.java
index 7733ccf3fd5..ae273c36849 100644
--- 
a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.java
+++ 
b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.java
@@ -115,6 +115,6 @@ public class DefaultReplicationPolicy implements 
ReplicationPolicy, Configurable
 
     @Override
     public boolean isMM2InternalTopic(String topic) {
-        return  topic.endsWith(internalSuffix());
+        return  topic.startsWith("mm2") && topic.endsWith(internalSuffix()) || 
isCheckpointsTopic(topic);
     }
 }
diff --git 
a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/ReplicationPolicy.java
 
b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/ReplicationPolicy.java
index fbd8725eb64..34d33445097 100644
--- 
a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/ReplicationPolicy.java
+++ 
b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/ReplicationPolicy.java
@@ -98,7 +98,7 @@ public interface ReplicationPolicy {
      * This is used to make sure the topic doesn't need to be replicated.
      */
     default boolean isMM2InternalTopic(String topic) {
-        return  topic.endsWith(".internal");
+        return  topic.startsWith("mm2") && topic.endsWith(".internal") || 
isCheckpointsTopic(topic);
     }
 
     /**
@@ -106,7 +106,6 @@ public interface ReplicationPolicy {
      */
     default boolean isInternalTopic(String topic) {
         boolean isKafkaInternalTopic = topic.startsWith("__") || 
topic.startsWith(".");
-        boolean isDefaultConnectTopic =  topic.endsWith("-internal") ||  
topic.endsWith(".internal");
-        return isMM2InternalTopic(topic) || isKafkaInternalTopic || 
isDefaultConnectTopic;
+        return isMM2InternalTopic(topic) || isKafkaInternalTopic;
     }
 }
diff --git 
a/connect/mirror-client/src/test/java/org/apache/kafka/connect/mirror/ReplicationPolicyTest.java
 
b/connect/mirror-client/src/test/java/org/apache/kafka/connect/mirror/ReplicationPolicyTest.java
index 802d0b606c2..86aaf8ffd0e 100644
--- 
a/connect/mirror-client/src/test/java/org/apache/kafka/connect/mirror/ReplicationPolicyTest.java
+++ 
b/connect/mirror-client/src/test/java/org/apache/kafka/connect/mirror/ReplicationPolicyTest.java
@@ -38,15 +38,17 @@ public class ReplicationPolicyTest {
 
     @Test
     public void testInternalTopic() {
+        Map<String, Object> config =  new HashMap<>();
+        config.put(MirrorClientConfig.REPLICATION_POLICY_SEPARATOR, ".");
+        DEFAULT_REPLICATION_POLICY.configure(config);
+
         // starts with '__'
         
assertTrue(DEFAULT_REPLICATION_POLICY.isInternalTopic("__consumer_offsets"));
         // starts with '.'
         assertTrue(DEFAULT_REPLICATION_POLICY.isInternalTopic(".hiddentopic"));
 
-        // ends with '.internal': default 
DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG in standalone mode.
+        // starts with 'mm2' and ends with '.internal': default 
DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG in standalone mode.
         
assertTrue(DEFAULT_REPLICATION_POLICY.isInternalTopic("mm2-offsets.CLUSTER.internal"));
-        // ends with '-internal'
-        
assertTrue(DEFAULT_REPLICATION_POLICY.isInternalTopic("mm2-offsets-CLUSTER-internal"));
         // non-internal topic.
         
assertFalse(DEFAULT_REPLICATION_POLICY.isInternalTopic("mm2-offsets_CLUSTER_internal"));
     }
diff --git 
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/DefaultTopicFilter.java
 
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/DefaultTopicFilter.java
index 35da6132c9f..95414685ba7 100644
--- 
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/DefaultTopicFilter.java
+++ 
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/DefaultTopicFilter.java
@@ -33,7 +33,7 @@ public class DefaultTopicFilter implements TopicFilter {
 
     public static final String TOPICS_EXCLUDE_CONFIG = "topics.exclude";
     private static final String TOPICS_EXCLUDE_DOC = "List of topics and/or 
regexes that should not be replicated.";
-    public static final String TOPICS_EXCLUDE_DEFAULT = ".*[\\-\\.]internal, 
.*\\.replica, __.*";
+    public static final String TOPICS_EXCLUDE_DEFAULT = "mm2.*\\.internal, 
.*\\.replica, __.*";
 
     private Pattern includePattern;
     private Pattern excludePattern;
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 84b2b36ebc1..5967aeb9cee 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -204,6 +204,11 @@
                     index. This API is used when the consumers are enabled 
with isolation level as READ_COMMITTED.
                     See <a 
href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-1058:+Txn+consumer+exerts+pressure+on+remote+storage+when+collecting+aborted+transactions">KIP-1058</a>
 for more details.
                 </li>
+                <li>
+                    The criteria for identifying internal topics in 
ReplicationPolicy and DefaultReplicationPolicy have
+                    been updated to enable the replication of topics that 
appear to be internal but aren't truly internal to Kafka and Mirror Maker 2.
+                    See <a 
href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-1074%3A+Allow+the+replication+of+user+internal+topics">KIP-1074</a>
 for more details.
+                </li>
             </ul>
         </li>
     </ul>

Reply via email to