This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch 4.2
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.2 by this push:
new f1d61f7ff6d MINOR: Revert timing change for creating connect config
(#20891)
f1d61f7ff6d is described below
commit f1d61f7ff6df9cd368a38d7be5c4ff81fa60f40e
Author: majialong <[email protected]>
AuthorDate: Sat Nov 22 07:14:14 2025 +0800
MINOR: Revert timing change for creating connect config (#20891)
[PR #20612](https://github.com/apache/kafka/pull/20612) adjusted the
creation order of Connect configurations to ensure that the
`plugin.path` config was validated before use.
However, that change broke loading of classes that exist only on the
`plugin.path`. This PR reverts the reordering and adds unit tests to
prevent the issue from recurring.
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../apache/kafka/connect/mirror/MirrorMaker.java | 4 +-
.../kafka/connect/cli/AbstractConnectCli.java | 8 +-
.../connect/runtime/isolation/PluginUtils.java | 3 +
.../kafka/connect/cli/AbstractConnectCliTest.java | 173 +++++++++++++++++++++
4 files changed, 184 insertions(+), 4 deletions(-)
diff --git
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java
index 6a412112c3f..01e38340b93 100644
---
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java
+++
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java
@@ -233,13 +233,15 @@ public class MirrorMaker {
private void addHerder(SourceAndTarget sourceAndTarget) {
log.info("creating herder for {}", sourceAndTarget.toString());
Map<String, String> workerProps = config.workerConfig(sourceAndTarget);
- DistributedConfig distributedConfig = new
DistributedConfig(workerProps);
String encodedSource = encodePath(sourceAndTarget.source());
String encodedTarget = encodePath(sourceAndTarget.target());
List<String> restNamespace = List.of(encodedSource, encodedTarget);
String workerId = generateWorkerId(sourceAndTarget);
Plugins plugins = new Plugins(workerProps);
plugins.compareAndSwapWithDelegatingLoader();
+ // create DistributedConfig only after
compareAndSwapWithDelegatingLoader to
+ // ensure plugin classes on plugin.path are loadable
+ DistributedConfig distributedConfig = new
DistributedConfig(workerProps);
String kafkaClusterId = distributedConfig.kafkaClusterId();
String clientIdBase = ConnectUtils.clientIdBase(distributedConfig);
// Create the admin client to be shared by all backing stores for this
herder
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/AbstractConnectCli.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/AbstractConnectCli.java
index 5a8bc5e08ae..4cd43d7c9ef 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/AbstractConnectCli.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/AbstractConnectCli.java
@@ -114,9 +114,6 @@ public abstract class AbstractConnectCli<H extends Herder,
T extends WorkerConfi
log.info("Kafka Connect worker initializing ...");
long initStart = time.hiResClockMs();
- T config = createConfig(workerProps);
- log.debug("Kafka cluster ID: {}", config.kafkaClusterId());
-
WorkerInfo initInfo = new WorkerInfo();
initInfo.logAll();
@@ -124,6 +121,11 @@ public abstract class AbstractConnectCli<H extends Herder,
T extends WorkerConfi
Plugins plugins = new Plugins(workerProps);
plugins.compareAndSwapWithDelegatingLoader();
+ // must call createConfig after
plugins.compareAndSwapWithDelegatingLoader()
+ // because WorkerConfig may instantiate classes only available on
plugin.path.
+ T config = createConfig(workerProps);
+ log.debug("Kafka cluster ID: {}", config.kafkaClusterId());
+
RestClient restClient = new RestClient(config);
ConnectRestServer restServer = new
ConnectRestServer(config.rebalanceTimeout(), restClient, config.originals());
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
index ac7deaa4b35..729074d508e 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
@@ -209,6 +209,9 @@ public class PluginUtils {
for (String path : pluginPathElements) {
try {
Path pluginPathElement = Paths.get(path).toAbsolutePath();
+ if (pluginPath.isEmpty()) {
+ log.warn("Plugin path element is empty, evaluating to
{}.", pluginPathElement);
+ }
if (!Files.exists(pluginPathElement)) {
throw new
FileNotFoundException(pluginPathElement.toString());
}
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/cli/AbstractConnectCliTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/cli/AbstractConnectCliTest.java
new file mode 100644
index 00000000000..5e5bcaf095c
--- /dev/null
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/cli/AbstractConnectCliTest.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.cli;
+
+import org.apache.kafka.common.config.ConfigException;
+import
org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
+import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
+import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.runtime.isolation.TestPlugins;
+import org.apache.kafka.connect.runtime.rest.ConnectRestServer;
+import org.apache.kafka.connect.runtime.rest.RestClient;
+import org.apache.kafka.connect.runtime.rest.RestServer;
+
+import org.junit.jupiter.api.Test;
+import org.mockito.MockedConstruction;
+
+import java.net.URI;
+import java.nio.file.Path;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mockConstruction;
+import static org.mockito.Mockito.spy;
+
+public class AbstractConnectCliTest {
+
+ /**
+ * Verifies that createConfig is called after
compareAndSwapWithDelegatingLoader in startConnect.
+ * If the order is wrong, ConfigProvider classes in plugin.path cannot be
loaded.
+ */
+ @Test
+ public void testStartConnectEnforcesCorrectOrder() {
+ ClassLoader originalTCCL =
Thread.currentThread().getContextClassLoader();
+
+ try {
+ // Create worker props with ConfigProvider that's only in
plugin.path
+ Set<Path> pluginPaths =
TestPlugins.pluginPath(TestPlugins.TestPlugin.SAMPLING_CONFIG_PROVIDER);
+ String pluginPath = String.join(",",
pluginPaths.stream().map(Path::toString).toList());
+
+ Map<String, String> workerProps =
createBaseWorkerProps(pluginPath);
+ workerProps.put(WorkerConfig.CONFIG_PROVIDERS_CONFIG,
"testProvider");
+ String providerClassName =
TestPlugins.TestPlugin.SAMPLING_CONFIG_PROVIDER.className();
+ workerProps.put(WorkerConfig.CONFIG_PROVIDERS_CONFIG +
".testProvider.class", providerClassName);
+
+ // Use a restricted classloader that cannot find the
ConfigProvider class
+ ClassLoader restrictedClassLoader = new
RestrictedClassLoader(providerClassName);
+
Thread.currentThread().setContextClassLoader(restrictedClassLoader);
+
+ // Verify the restricted classloader cannot load the
ConfigProvider class
+ assertThrows(ClassNotFoundException.class, () ->
+ restrictedClassLoader.loadClass(providerClassName));
+
+ // Config creation should fail when ConfigProvider class cannot be
loaded
+ assertThrows(ConfigException.class, () -> new
DistributedConfig(workerProps));
+
+ // Call startConnect and verify the order is correct
+ TestConnectCli testConnectCli = new TestConnectCli();
+
+ // Mock ConnectRestServer to avoid actual server initialization
+ try (MockedConstruction<ConnectRestServer> mockRestServer =
mockConstruction(
+ ConnectRestServer.class,
+ (mock, context) -> {
+
doReturn(URI.create("http://localhost:8083")).when(mock).advertisedUrl();
+ doNothing().when(mock).initializeServer();
+ })) {
+
+ // If order is correct, createConfig succeeds and we reach
createHerder which throws ExpectedException
+ assertThrows(ExpectedException.class, () ->
testConnectCli.startConnect(workerProps));
+ }
+ } finally {
+ Thread.currentThread().setContextClassLoader(originalTCCL);
+ }
+ }
+
+ private Map<String, String> createBaseWorkerProps(String pluginPath) {
+ Map<String, String> props = new HashMap<>();
+ props.put(WorkerConfig.PLUGIN_PATH_CONFIG, pluginPath);
+ props.put("bootstrap.servers", "localhost:9092");
+ props.put(DistributedConfig.GROUP_ID_CONFIG, "test-connect-cluster");
+ props.put(DistributedConfig.CONFIG_TOPIC_CONFIG, "connect-configs");
+ props.put(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG,
"connect-offsets");
+ props.put(DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG,
"connect-status");
+ props.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG,
"org.apache.kafka.connect.json.JsonConverter");
+ props.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG,
"org.apache.kafka.connect.json.JsonConverter");
+ return props;
+ }
+
+ /**
+ * Test implementation that calls the parent's startConnect to verify
correct order.
+ */
+ private static class TestConnectCli extends AbstractConnectCli<Herder,
DistributedConfig> {
+ TestConnectCli() {
+ super();
+ }
+
+ @Override
+ protected String usage() {
+ return "test";
+ }
+
+ @Override
+ protected Herder createHerder(DistributedConfig config, String
workerId, Plugins plugins,
+ ConnectorClientConfigOverridePolicy
connectorClientConfigOverridePolicy,
+ RestServer restServer,
+ RestClient restClient) {
+ // Reaching createHerder means createConfig succeeded, indicating
correct order was maintained
+ throw new ExpectedException();
+ }
+
+ @Override
+ protected DistributedConfig createConfig(Map<String, String>
workerProps) {
+ DistributedConfig config = new DistributedConfig(workerProps);
+ // Mock kafkaClusterId() to avoid connecting to Kafka broker
+ DistributedConfig spyConfig = spy(config);
+ doReturn("test-cluster-id").when(spyConfig).kafkaClusterId();
+ return spyConfig;
+ }
+ }
+
+ /**
+ * ClassLoader that cannot load a specific class, simulating plugin
classes only in plugin.path.
+ */
+ private static class RestrictedClassLoader extends ClassLoader {
+ private final String restrictedClassName;
+ private final ClassLoader systemLoader;
+
+ RestrictedClassLoader(String restrictedClassName) {
+ super(null); // No parent to prevent delegation
+ this.restrictedClassName = restrictedClassName;
+ this.systemLoader = ClassLoader.getSystemClassLoader();
+ }
+
+ @Override
+ protected Class<?> loadClass(String name, boolean resolve) throws
ClassNotFoundException {
+ // Block the restricted class to simulate it being only in
plugin.path, not classpath.
+ if (name.equals(restrictedClassName)) {
+ throw new ClassNotFoundException("Class " + name + " not found
(restricted for testing)");
+ }
+ // For other classes, delegate to system classloader
+ return systemLoader.loadClass(name);
+ }
+ }
+
+ /**
+ * Exception thrown by createHerder to indicate that createConfig
succeeded and correct order was maintained.
+ * If this exception is thrown, it means
compareAndSwapWithDelegatingLoader was called before createConfig.
+ */
+ private static class ExpectedException extends RuntimeException {
+ ExpectedException() {
+ super("Expected exception, createConfig succeeded");
+ }
+ }
+}