This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch 4.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.2 by this push:
     new f1d61f7ff6d MINOR: Revert timing change for creating connect config 
(#20891)
f1d61f7ff6d is described below

commit f1d61f7ff6df9cd368a38d7be5c4ff81fa60f40e
Author: majialong <[email protected]>
AuthorDate: Sat Nov 22 07:14:14 2025 +0800

    MINOR: Revert timing change for creating connect config (#20891)
    
    [This PR](https://github.com/apache/kafka/pull/20612) adjusted the
    creation order of Connect configurations to ensure that the
    `plugin.path` config was validated before use.
    
    However, this change caused issues loading classes that only exist in
    the `plugin.path` path. This PR reverts the previous changes and adds
    additional unit tests to prevent this issue from recurring.
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../apache/kafka/connect/mirror/MirrorMaker.java   |   4 +-
 .../kafka/connect/cli/AbstractConnectCli.java      |   8 +-
 .../connect/runtime/isolation/PluginUtils.java     |   3 +
 .../kafka/connect/cli/AbstractConnectCliTest.java  | 173 +++++++++++++++++++++
 4 files changed, 184 insertions(+), 4 deletions(-)

diff --git 
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java 
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java
index 6a412112c3f..01e38340b93 100644
--- 
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java
+++ 
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java
@@ -233,13 +233,15 @@ public class MirrorMaker {
     private void addHerder(SourceAndTarget sourceAndTarget) {
         log.info("creating herder for {}", sourceAndTarget.toString());
         Map<String, String> workerProps = config.workerConfig(sourceAndTarget);
-        DistributedConfig distributedConfig = new 
DistributedConfig(workerProps);
         String encodedSource = encodePath(sourceAndTarget.source());
         String encodedTarget = encodePath(sourceAndTarget.target());
         List<String> restNamespace = List.of(encodedSource, encodedTarget);
         String workerId = generateWorkerId(sourceAndTarget);
         Plugins plugins = new Plugins(workerProps);
         plugins.compareAndSwapWithDelegatingLoader();
+        // create DistributedConfig only after 
compareAndSwapWithDelegatingLoader to
+        // ensure plugin classes on plugin.path are loadable
+        DistributedConfig distributedConfig = new 
DistributedConfig(workerProps);
         String kafkaClusterId = distributedConfig.kafkaClusterId();
         String clientIdBase = ConnectUtils.clientIdBase(distributedConfig);
         // Create the admin client to be shared by all backing stores for this 
herder
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/AbstractConnectCli.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/AbstractConnectCli.java
index 5a8bc5e08ae..4cd43d7c9ef 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/AbstractConnectCli.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/AbstractConnectCli.java
@@ -114,9 +114,6 @@ public abstract class AbstractConnectCli<H extends Herder, 
T extends WorkerConfi
         log.info("Kafka Connect worker initializing ...");
         long initStart = time.hiResClockMs();
 
-        T config = createConfig(workerProps);
-        log.debug("Kafka cluster ID: {}", config.kafkaClusterId());
-
         WorkerInfo initInfo = new WorkerInfo();
         initInfo.logAll();
 
@@ -124,6 +121,11 @@ public abstract class AbstractConnectCli<H extends Herder, 
T extends WorkerConfi
         Plugins plugins = new Plugins(workerProps);
         plugins.compareAndSwapWithDelegatingLoader();
 
+        // must call createConfig after 
plugins.compareAndSwapWithDelegatingLoader()
+        // because WorkerConfig may instantiate classes only available on 
plugin.path.
+        T config = createConfig(workerProps);
+        log.debug("Kafka cluster ID: {}", config.kafkaClusterId());
+
         RestClient restClient = new RestClient(config);
 
         ConnectRestServer restServer = new 
ConnectRestServer(config.rebalanceTimeout(), restClient, config.originals());
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
index ac7deaa4b35..729074d508e 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
@@ -209,6 +209,9 @@ public class PluginUtils {
         for (String path : pluginPathElements) {
             try {
                 Path pluginPathElement = Paths.get(path).toAbsolutePath();
+                if (pluginPath.isEmpty()) {
+                    log.warn("Plugin path element is empty, evaluating to 
{}.", pluginPathElement);
+                }
                 if (!Files.exists(pluginPathElement)) {
                     throw new 
FileNotFoundException(pluginPathElement.toString());
                 }
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/cli/AbstractConnectCliTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/cli/AbstractConnectCliTest.java
new file mode 100644
index 00000000000..5e5bcaf095c
--- /dev/null
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/cli/AbstractConnectCliTest.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.cli;
+
+import org.apache.kafka.common.config.ConfigException;
+import 
org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
+import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
+import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.runtime.isolation.TestPlugins;
+import org.apache.kafka.connect.runtime.rest.ConnectRestServer;
+import org.apache.kafka.connect.runtime.rest.RestClient;
+import org.apache.kafka.connect.runtime.rest.RestServer;
+
+import org.junit.jupiter.api.Test;
+import org.mockito.MockedConstruction;
+
+import java.net.URI;
+import java.nio.file.Path;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mockConstruction;
+import static org.mockito.Mockito.spy;
+
+public class AbstractConnectCliTest {
+
+    /**
+     * Verifies that createConfig is called after 
compareAndSwapWithDelegatingLoader in startConnect.
+     * If the order is wrong, ConfigProvider classes in plugin.path cannot be 
loaded.
+     */
+    @Test
+    public void testStartConnectEnforcesCorrectOrder() {
+        ClassLoader originalTCCL = 
Thread.currentThread().getContextClassLoader();
+
+        try {
+            // Create worker props with ConfigProvider that's only in 
plugin.path
+            Set<Path> pluginPaths = 
TestPlugins.pluginPath(TestPlugins.TestPlugin.SAMPLING_CONFIG_PROVIDER);
+            String pluginPath = String.join(",", 
pluginPaths.stream().map(Path::toString).toList());
+
+            Map<String, String> workerProps = 
createBaseWorkerProps(pluginPath);
+            workerProps.put(WorkerConfig.CONFIG_PROVIDERS_CONFIG, 
"testProvider");
+            String providerClassName = 
TestPlugins.TestPlugin.SAMPLING_CONFIG_PROVIDER.className();
+            workerProps.put(WorkerConfig.CONFIG_PROVIDERS_CONFIG + 
".testProvider.class", providerClassName);
+
+            // Use a restricted classloader that cannot find the 
ConfigProvider class
+            ClassLoader restrictedClassLoader = new 
RestrictedClassLoader(providerClassName);
+            
Thread.currentThread().setContextClassLoader(restrictedClassLoader);
+
+            // Verify the restricted classloader cannot load the 
ConfigProvider class
+            assertThrows(ClassNotFoundException.class, () ->
+                restrictedClassLoader.loadClass(providerClassName));
+
+            // Config creation should fail when ConfigProvider class cannot be 
loaded
+            assertThrows(ConfigException.class, () -> new 
DistributedConfig(workerProps));
+
+            // Call startConnect and verify the order is correct
+            TestConnectCli testConnectCli = new TestConnectCli();
+
+            // Mock ConnectRestServer to avoid actual server initialization
+            try (MockedConstruction<ConnectRestServer> mockRestServer = 
mockConstruction(
+                ConnectRestServer.class,
+                (mock, context) -> {
+                    
doReturn(URI.create("http://localhost:8083";)).when(mock).advertisedUrl();
+                    doNothing().when(mock).initializeServer();
+                })) {
+
+                // If order is correct, createConfig succeeds and we reach 
createHerder which throws ExpectedException
+                assertThrows(ExpectedException.class, () -> 
testConnectCli.startConnect(workerProps));
+            }
+        } finally {
+            Thread.currentThread().setContextClassLoader(originalTCCL);
+        }
+    }
+
+    private Map<String, String> createBaseWorkerProps(String pluginPath) {
+        Map<String, String> props = new HashMap<>();
+        props.put(WorkerConfig.PLUGIN_PATH_CONFIG, pluginPath);
+        props.put("bootstrap.servers", "localhost:9092");
+        props.put(DistributedConfig.GROUP_ID_CONFIG, "test-connect-cluster");
+        props.put(DistributedConfig.CONFIG_TOPIC_CONFIG, "connect-configs");
+        props.put(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, 
"connect-offsets");
+        props.put(DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG, 
"connect-status");
+        props.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, 
"org.apache.kafka.connect.json.JsonConverter");
+        props.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, 
"org.apache.kafka.connect.json.JsonConverter");
+        return props;
+    }
+
+    /**
+     * Test implementation that calls the parent's startConnect to verify 
correct order.
+     */
+    private static class TestConnectCli extends AbstractConnectCli<Herder, 
DistributedConfig> {
+        TestConnectCli() {
+            super();
+        }
+
+        @Override
+        protected String usage() {
+            return "test";
+        }
+
+        @Override
+        protected Herder createHerder(DistributedConfig config, String 
workerId, Plugins plugins,
+                                      ConnectorClientConfigOverridePolicy 
connectorClientConfigOverridePolicy,
+                                      RestServer restServer,
+                                      RestClient restClient) {
+            // Reaching createHerder means createConfig succeeded, indicating 
correct order was maintained
+            throw new ExpectedException();
+        }
+
+        @Override
+        protected DistributedConfig createConfig(Map<String, String> 
workerProps) {
+            DistributedConfig config = new DistributedConfig(workerProps);
+            // Mock kafkaClusterId() to avoid connecting to Kafka broker
+            DistributedConfig spyConfig = spy(config);
+            doReturn("test-cluster-id").when(spyConfig).kafkaClusterId();
+            return spyConfig;
+        }
+    }
+
+    /**
+     * ClassLoader that cannot load a specific class, simulating plugin 
classes only in plugin.path.
+     */
+    private static class RestrictedClassLoader extends ClassLoader {
+        private final String restrictedClassName;
+        private final ClassLoader systemLoader;
+
+        RestrictedClassLoader(String restrictedClassName) {
+            super(null); // No parent to prevent delegation
+            this.restrictedClassName = restrictedClassName;
+            this.systemLoader = ClassLoader.getSystemClassLoader();
+        }
+
+        @Override
+        protected Class<?> loadClass(String name, boolean resolve) throws 
ClassNotFoundException {
+            // Block the restricted class to simulate it being only in 
plugin.path, not classpath.
+            if (name.equals(restrictedClassName)) {
+                throw new ClassNotFoundException("Class " + name + " not found 
(restricted for testing)");
+            }
+            // For other classes, delegate to system classloader
+            return systemLoader.loadClass(name);
+        }
+    }
+
+    /**
+     * Exception thrown by createHerder to indicate that createConfig 
succeeded and correct order was maintained.
+     * If this exception is thrown, it means 
compareAndSwapWithDelegatingLoader was called before createConfig.
+     */
+    private static class ExpectedException extends RuntimeException {
+        ExpectedException() {
+            super("Expected exception, createConfig succeeded");
+        }
+    }
+}

Reply via email to