This is an automated email from the ASF dual-hosted git repository.
viktor pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new e3800e02f3f KAFKA-18843: MirrorMaker2 unique workerId (#18994)
e3800e02f3f is described below
commit e3800e02f3f55fb1d021c9b18fcc6270f5984387
Author: Kondrat Bertalan <[email protected]>
AuthorDate: Fri Mar 14 09:49:57 2025 +0100
KAFKA-18843: MirrorMaker2 unique workerId (#18994)
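Change the worker ID from the bare flow name to a combination of the host
name, the flow, and a random UUID so that it is unique. When the internal
REST server is enabled, the advertised host and port are used with the flow
instead of a UUID.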
Reviewers: Viktor Somogyi-Vass <[email protected]>
---
.../apache/kafka/connect/mirror/MirrorMaker.java | 26 +++++++++++++++++-----
1 file changed, 21 insertions(+), 5 deletions(-)
diff --git
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java
index b0cc368a5fc..9c9513e6472 100644
---
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java
+++
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java
@@ -53,7 +53,10 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.UnsupportedEncodingException;
+import java.net.InetAddress;
+import java.net.URI;
import java.net.URLEncoder;
+import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
@@ -63,6 +66,7 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -109,7 +113,7 @@ public class MirrorMaker {
private CountDownLatch stopLatch;
private final AtomicBoolean shutdown = new AtomicBoolean(false);
private final ShutdownHook shutdownHook;
- private final String advertisedUrl;
+ private final URI advertisedUrl;
private final Time time;
private final MirrorMakerConfig config;
private final Set<String> clusters;
@@ -130,11 +134,11 @@ public class MirrorMaker {
this.restClient = new RestClient(config);
internalServer = new MirrorRestServer(config.originals(),
restClient);
internalServer.initializeServer();
- this.advertisedUrl = internalServer.advertisedUrl().toString();
+ this.advertisedUrl = internalServer.advertisedUrl();
} else {
internalServer = null;
restClient = null;
- this.advertisedUrl = "NOTUSED";
+ this.advertisedUrl = URI.create("NOTUSED");
}
this.config = config;
if (clusters != null && !clusters.isEmpty()) {
@@ -240,7 +244,7 @@ public class MirrorMaker {
} catch (UnsupportedEncodingException e) {
throw new RuntimeException("Unable to create encoded URL paths for
source and target using UTF-8", e);
}
- String workerId = sourceAndTarget.toString();
+ String workerId = generateWorkerId(sourceAndTarget);
Plugins plugins = new Plugins(workerProps);
plugins.compareAndSwapWithDelegatingLoader();
DistributedConfig distributedConfig = new
DistributedConfig(workerProps);
@@ -273,7 +277,7 @@ public class MirrorMaker {
// tracking the various shared admin objects in this class.
Herder herder = new MirrorHerder(config, sourceAndTarget,
distributedConfig, time, worker,
kafkaClusterId, statusBackingStore, configBackingStore,
- advertisedUrl, restClient, clientConfigOverridePolicy,
+ advertisedUrl.toString(), restClient,
clientConfigOverridePolicy,
restNamespace, sharedAdmin);
herders.put(sourceAndTarget, herder);
}
@@ -291,6 +295,18 @@ public class MirrorMaker {
.replaceAll("\\+", "%20");
}
+ private String generateWorkerId(SourceAndTarget sourceAndTarget) {
+ if (config.enableInternalRest()) {
+ return advertisedUrl.getHost() + ":" + advertisedUrl.getPort() +
"/" + sourceAndTarget.toString();
+ }
+ try {
+ //UUID to make sure it is unique even if multiple workers running
on the same host
+ return InetAddress.getLocalHost().getCanonicalHostName() + "/" +
sourceAndTarget.toString() + "/" + UUID.randomUUID();
+ } catch (UnknownHostException e) {
+ return sourceAndTarget.toString() + "/" + UUID.randomUUID();
+ }
+ }
+
private class ShutdownHook extends Thread {
@Override
public void run() {