This is an automated email from the ASF dual-hosted git repository.

viktor pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e3800e02f3f KAFKA-18843: MirrorMaker2 unique workerId (#18994)
e3800e02f3f is described below

commit e3800e02f3f55fb1d021c9b18fcc6270f5984387
Author: Kondrat Bertalan <[email protected]>
AuthorDate: Fri Mar 14 09:49:57 2025 +0100

    KAFKA-18843: MirrorMaker2 unique workerId (#18994)
    
    Change the worker ID to the combination of the host name, flow and a random
    UUID to make it unique.
    
    Reviewers: Viktor Somogyi-Vass <[email protected]>
---
 .../apache/kafka/connect/mirror/MirrorMaker.java   | 26 +++++++++++++++++-----
 1 file changed, 21 insertions(+), 5 deletions(-)

diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java
index b0cc368a5fc..9c9513e6472 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java
@@ -53,7 +53,10 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.UnsupportedEncodingException;
+import java.net.InetAddress;
+import java.net.URI;
 import java.net.URLEncoder;
+import java.net.UnknownHostException;
 import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.Collections;
@@ -63,6 +66,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -109,7 +113,7 @@ public class MirrorMaker {
     private CountDownLatch stopLatch;
     private final AtomicBoolean shutdown = new AtomicBoolean(false);
     private final ShutdownHook shutdownHook;
-    private final String advertisedUrl;
+    private final URI advertisedUrl;
     private final Time time;
     private final MirrorMakerConfig config;
     private final Set<String> clusters;
@@ -130,11 +134,11 @@ public class MirrorMaker {
             this.restClient = new RestClient(config);
             internalServer = new MirrorRestServer(config.originals(), 
restClient);
             internalServer.initializeServer();
-            this.advertisedUrl = internalServer.advertisedUrl().toString();
+            this.advertisedUrl = internalServer.advertisedUrl();
         } else {
             internalServer = null;
             restClient = null;
-            this.advertisedUrl = "NOTUSED";
+            this.advertisedUrl = URI.create("NOTUSED");
         }
         this.config = config;
         if (clusters != null && !clusters.isEmpty()) {
@@ -240,7 +244,7 @@ public class MirrorMaker {
         } catch (UnsupportedEncodingException e) {
             throw new RuntimeException("Unable to create encoded URL paths for 
source and target using UTF-8", e);
         }
-        String workerId = sourceAndTarget.toString();
+        String workerId = generateWorkerId(sourceAndTarget);
         Plugins plugins = new Plugins(workerProps);
         plugins.compareAndSwapWithDelegatingLoader();
         DistributedConfig distributedConfig = new 
DistributedConfig(workerProps);
@@ -273,7 +277,7 @@ public class MirrorMaker {
         // tracking the various shared admin objects in this class.
         Herder herder = new MirrorHerder(config, sourceAndTarget, 
distributedConfig, time, worker,
                 kafkaClusterId, statusBackingStore, configBackingStore,
-                advertisedUrl, restClient, clientConfigOverridePolicy,
+                advertisedUrl.toString(), restClient, 
clientConfigOverridePolicy,
                 restNamespace, sharedAdmin);
         herders.put(sourceAndTarget, herder);
     }
@@ -291,6 +295,18 @@ public class MirrorMaker {
                 .replaceAll("\\+", "%20");
     }
 
+    private String generateWorkerId(SourceAndTarget sourceAndTarget) {
+        if (config.enableInternalRest()) {
+            return advertisedUrl.getHost() + ":" + advertisedUrl.getPort() + 
"/" + sourceAndTarget.toString();
+        }
+        try {
+            //UUID to make sure it is unique even if multiple workers running 
on the same host
+            return InetAddress.getLocalHost().getCanonicalHostName() + "/" + 
sourceAndTarget.toString() + "/" + UUID.randomUUID();
+        } catch (UnknownHostException e) {
+            return sourceAndTarget.toString() + "/" + UUID.randomUUID();
+        }
+    }
+
     private class ShutdownHook extends Thread {
         @Override
         public void run() {

Reply via email to