This is an automated email from the ASF dual-hosted git repository.

gharris pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.0 by this push:
     new c2792047f24 KAFKA-18211: Override class loaders for class graph 
scanning in connect. (#18403)
c2792047f24 is described below

commit c2792047f24ae281a36d22a3cc5a3ce38df9de08
Author: snehashisp <snehashisp1...@gmail.com>
AuthorDate: Tue Jan 14 23:15:53 2025 +0530

    KAFKA-18211: Override class loaders for class graph scanning in connect. 
(#18403)
    
    Reviewers: Greg Harris <greg.har...@aiven.io>
---
 .../runtime/isolation/ReflectionScanner.java       | 19 ++++-
 .../runtime/isolation/PluginScannerTest.java       | 16 ++++
 .../connect/runtime/isolation/TestPlugins.java     |  9 +-
 .../org.apache.kafka.connect.storage.Converter     | 17 ++++
 .../connect/converters/ByteArrayConverter.java     | 96 ++++++++++++++++++++++
 5 files changed, 154 insertions(+), 3 deletions(-)

diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/ReflectionScanner.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/ReflectionScanner.java
index 5b91c150c86..85f514d5f1e 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/ReflectionScanner.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/ReflectionScanner.java
@@ -29,7 +29,9 @@ import 
org.apache.kafka.connect.transforms.predicates.Predicate;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
 import java.util.ServiceLoader;
 import java.util.SortedSet;
 import java.util.TreeSet;
@@ -77,7 +79,7 @@ public class ReflectionScanner extends PluginScanner {
     @Override
     protected PluginScanResult scanPlugins(PluginSource source) {
         ClassGraph classGraphBuilder = new ClassGraph()
-                .addClassLoader(source.loader())
+                .overrideClassLoaders(classLoaderOrder(source))
                 .enableExternalClasses()
                 .enableClassInfo();
         try (ScanResult classGraph = classGraphBuilder.scan()) {
@@ -105,6 +107,21 @@ public class ReflectionScanner extends PluginScanner {
         return (SortedSet<PluginDesc<Transformation<?>>>) (SortedSet<?>) 
getPluginDesc(classGraph, PluginType.TRANSFORMATION, source);
     }
 
+    private ClassLoader[] classLoaderOrder(PluginSource source) {
+        // Classgraph will first scan all the class URLs from the provided 
classloader chain and use said chain during classloading.
+        // We compute and provide the classloader chain starting from the 
isolated PluginClassLoader to ensure that it adheres
+        // to the child first delegation model used in connect. In addition, 
classgraph can fail to find URLs from the
+        // application classloader as it uses an illegal reflections access. 
Providing the entire chain of classloaders
+        // which included the application classloader forces classpath URLs to 
be scanned separately.
+        List<ClassLoader> classLoaderOrder = new ArrayList<>();
+        ClassLoader cl = source.loader();
+        while (cl != null) {
+            classLoaderOrder.add(cl);
+            cl = cl.getParent();
+        }
+        return classLoaderOrder.toArray(new ClassLoader[0]);
+    }
+
     @SuppressWarnings({"unchecked"})
     private <T> SortedSet<PluginDesc<T>> getPluginDesc(
             ScanResult classGraph,
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginScannerTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginScannerTest.java
index d083f980349..ca099976444 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginScannerTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginScannerTest.java
@@ -17,6 +17,8 @@
 
 package org.apache.kafka.connect.runtime.isolation;
 
+import org.apache.kafka.connect.storage.Converter;
+
 import org.junit.jupiter.api.io.TempDir;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.MethodSource;
@@ -26,11 +28,13 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Stream;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 
@@ -137,6 +141,18 @@ public class PluginScannerTest {
         versionedPluginResult.forEach(pluginDesc -> assertEquals("1.0.0", 
pluginDesc.version()));
     }
 
+    @ParameterizedTest
+    @MethodSource("parameters")
+    public void testClasspathPluginIsAlsoLoadedInIsolation(PluginScanner 
scanner) {
+        Set<Path> isolatedClassPathPlugin = 
TestPlugins.pluginPath(TestPlugins.TestPlugin.CLASSPATH_CONVERTER);
+        PluginScanResult result = scan(scanner, isolatedClassPathPlugin);
+        Optional<PluginDesc<Converter>> pluginDesc = 
result.converters().stream()
+            .filter(desc -> 
desc.className().equals(TestPlugins.TestPlugin.CLASSPATH_CONVERTER.className()))
+            .findFirst();
+        assertTrue(pluginDesc.isPresent());
+        assertInstanceOf(PluginClassLoader.class, pluginDesc.get().loader());
+    }
+
     private PluginScanResult scan(PluginScanner scanner, Set<Path> 
pluginLocations) {
         ClassLoaderFactory factory = new ClassLoaderFactory();
         Set<PluginSource> pluginSources = 
PluginUtils.pluginSources(pluginLocations, 
PluginScannerTest.class.getClassLoader(), factory);
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java
index c9d8892da91..adb2c2418d5 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java
@@ -85,7 +85,8 @@ public class TestPlugins {
         SAMPLING_CONVERTER("sampling-converter"),
         SAMPLING_HEADER_CONVERTER("sampling-header-converter"),
         SERVICE_LOADER("service-loader"),
-        SUBCLASS_OF_CLASSPATH("subclass-of-classpath");
+        SUBCLASS_OF_CLASSPATH("subclass-of-classpath"),
+        CLASSPATH_CONVERTER("classpath-converter");
 
         private final String resourceDir;
         private final Predicate<String> removeRuntimeClasses;
@@ -251,7 +252,11 @@ public class TestPlugins {
         /**
          * A ServiceLoader discovered plugin which subclasses another plugin 
which is present on the classpath
          */
-        
SUBCLASS_OF_CLASSPATH_OVERRIDE_POLICY(TestPackage.SUBCLASS_OF_CLASSPATH, 
"test.plugins.SubclassOfClasspathOverridePolicy");
+        
SUBCLASS_OF_CLASSPATH_OVERRIDE_POLICY(TestPackage.SUBCLASS_OF_CLASSPATH, 
"test.plugins.SubclassOfClasspathOverridePolicy"),
+        /**
+         * A plugin which is part of the classpath by default. This packages 
it as a separate jar which is used to test plugin isolation from the classpath 
plugin.
+         */
+        CLASSPATH_CONVERTER(TestPackage.CLASSPATH_CONVERTER, 
"org.apache.kafka.connect.converters.ByteArrayConverter", false);
 
         private final TestPackage testPackage;
         private final String className;
diff --git 
a/connect/runtime/src/test/resources/test-plugins/classpath-converter/META-INF/services/org.apache.kafka.connect.storage.Converter
 
b/connect/runtime/src/test/resources/test-plugins/classpath-converter/META-INF/services/org.apache.kafka.connect.storage.Converter
new file mode 100644
index 00000000000..ae9c2a58203
--- /dev/null
+++ 
b/connect/runtime/src/test/resources/test-plugins/classpath-converter/META-INF/services/org.apache.kafka.connect.storage.Converter
@@ -0,0 +1,17 @@
+ # Licensed to the Apache Software Foundation (ASF) under one or more
+ # contributor license agreements. See the NOTICE file distributed with
+ # this work for additional information regarding copyright ownership.
+ # The ASF licenses this file to You under the Apache License, Version 2.0
+ # (the "License"); you may not use this file except in compliance with
+ # the License. You may obtain a copy of the License at
+ #
+ #    http://www.apache.org/licenses/LICENSE-2.0
+ #
+ # Unless required by applicable law or agreed to in writing, software
+ # distributed under the License is distributed on an "AS IS" BASIS,
+ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ # See the License for the specific language governing permissions and
+ # limitations under the License.
+
+org.apache.kafka.connect.converters.ByteArrayConverter
+
diff --git 
a/connect/runtime/src/test/resources/test-plugins/classpath-converter/org/apache/kafka/connect/converters/ByteArrayConverter.java
 
b/connect/runtime/src/test/resources/test-plugins/classpath-converter/org/apache/kafka/connect/converters/ByteArrayConverter.java
new file mode 100644
index 00000000000..699d71635a0
--- /dev/null
+++ 
b/connect/runtime/src/test/resources/test-plugins/classpath-converter/org/apache/kafka/connect/converters/ByteArrayConverter.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.converters;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.connect.components.Versioned;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.errors.DataException;
+import org.apache.kafka.connect.sink.SinkConnector;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.ConverterConfig;
+import org.apache.kafka.connect.storage.HeaderConverter;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+
+public class ByteArrayConverter implements Converter, HeaderConverter, 
Versioned {
+
+    private static final ConfigDef CONFIG_DEF = ConverterConfig.newConfigDef();
+    @Override
+    public String version() {
+        return AppInfoParser.getVersion();
+    }
+    @Override
+    public ConfigDef config() {
+        return CONFIG_DEF;
+    }
+
+    @Override
+    public void configure(Map<String, ?> configs) {
+    }
+
+    @Override
+    public void configure(Map<String, ?> configs, boolean isKey) {
+    }
+
+    @Override
+    public byte[] fromConnectData(String topic, Schema schema, Object value) {
+        if (schema != null && schema.type() != Schema.Type.BYTES)
+            throw new DataException("Invalid schema type for 
ByteArrayConverter: " + schema.type().toString());
+
+        if (value != null && !(value instanceof byte[]) && !(value instanceof 
ByteBuffer))
+            throw new DataException("ByteArrayConverter is not compatible with 
objects of type " + value.getClass());
+
+        return value instanceof ByteBuffer ? 
getBytesFromByteBuffer((ByteBuffer) value) : (byte[]) value;
+    }
+
+    @Override
+    public SchemaAndValue toConnectData(String topic, byte[] value) {
+        return new SchemaAndValue(Schema.OPTIONAL_BYTES_SCHEMA, value);
+    }
+
+    @Override
+    public byte[] fromConnectHeader(String topic, String headerKey, Schema 
schema, Object value) {
+        return fromConnectData(topic, schema, value);
+    }
+
+    @Override
+    public SchemaAndValue toConnectHeader(String topic, String headerKey, 
byte[] value) {
+        return toConnectData(topic, value);
+    }
+
+    @Override
+    public void close() {
+        // do nothing
+    }
+
+    private byte[] getBytesFromByteBuffer(ByteBuffer byteBuffer) {
+        if (byteBuffer == null) {
+            return null;
+        }
+
+        byteBuffer.rewind();
+        byte[] bytes = new byte[byteBuffer.remaining()];
+        byteBuffer.get(bytes);
+        return bytes;
+    }
+}

Reply via email to