This is an automated email from the ASF dual-hosted git repository. gharris pushed a commit to branch 4.0 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.0 by this push: new c2792047f24 KAFKA-18211: Override class loaders for class graph scanning in connect. (#18403) c2792047f24 is described below commit c2792047f24ae281a36d22a3cc5a3ce38df9de08 Author: snehashisp <snehashisp1...@gmail.com> AuthorDate: Tue Jan 14 23:15:53 2025 +0530 KAFKA-18211: Override class loaders for class graph scanning in connect. (#18403) Reviewers: Greg Harris <greg.har...@aiven.io> --- .../runtime/isolation/ReflectionScanner.java | 19 ++++- .../runtime/isolation/PluginScannerTest.java | 16 ++++ .../connect/runtime/isolation/TestPlugins.java | 9 +- .../org.apache.kafka.connect.storage.Converter | 17 ++++ .../connect/converters/ByteArrayConverter.java | 96 ++++++++++++++++++++++ 5 files changed, 154 insertions(+), 3 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/ReflectionScanner.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/ReflectionScanner.java index 5b91c150c86..85f514d5f1e 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/ReflectionScanner.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/ReflectionScanner.java @@ -29,7 +29,9 @@ import org.apache.kafka.connect.transforms.predicates.Predicate; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; import java.util.Collections; +import java.util.List; import java.util.ServiceLoader; import java.util.SortedSet; import java.util.TreeSet; @@ -77,7 +79,7 @@ public class ReflectionScanner extends PluginScanner { @Override protected PluginScanResult scanPlugins(PluginSource source) { ClassGraph classGraphBuilder = new ClassGraph() - .addClassLoader(source.loader()) + .overrideClassLoaders(classLoaderOrder(source)) .enableExternalClasses() .enableClassInfo(); try (ScanResult classGraph = classGraphBuilder.scan()) { @@ -105,6 +107,21 @@ public class 
ReflectionScanner extends PluginScanner { return (SortedSet<PluginDesc<Transformation<?>>>) (SortedSet<?>) getPluginDesc(classGraph, PluginType.TRANSFORMATION, source); } + private ClassLoader[] classLoaderOrder(PluginSource source) { + // ClassGraph will first scan all the class URLs from the provided classloader chain and use said chain during classloading. + // We compute and provide the classloader chain starting from the isolated PluginClassLoader to ensure that it adheres + // to the child-first delegation model used in Connect. In addition, ClassGraph can fail to find URLs from the + // application classloader as it uses illegal reflective access. Providing the entire chain of classloaders, + // which includes the application classloader, forces classpath URLs to be scanned separately. + List<ClassLoader> classLoaderOrder = new ArrayList<>(); + ClassLoader cl = source.loader(); + while (cl != null) { + classLoaderOrder.add(cl); + cl = cl.getParent(); + } + return classLoaderOrder.toArray(new ClassLoader[0]); + } + @SuppressWarnings({"unchecked"}) private <T> SortedSet<PluginDesc<T>> getPluginDesc( ScanResult classGraph, diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginScannerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginScannerTest.java index d083f980349..ca099976444 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginScannerTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginScannerTest.java @@ -17,6 +17,8 @@ package org.apache.kafka.connect.runtime.isolation; +import org.apache.kafka.connect.storage.Converter; + import org.junit.jupiter.api.io.TempDir; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; @@ -26,11 +28,13 @@ import java.nio.file.Files; import java.nio.file.Path; import java.util.Collections; import java.util.HashSet; 
+import java.util.Optional; import java.util.Set; import java.util.stream.Stream; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -137,6 +141,18 @@ public class PluginScannerTest { versionedPluginResult.forEach(pluginDesc -> assertEquals("1.0.0", pluginDesc.version())); } + @ParameterizedTest + @MethodSource("parameters") + public void testClasspathPluginIsAlsoLoadedInIsolation(PluginScanner scanner) { + Set<Path> isolatedClassPathPlugin = TestPlugins.pluginPath(TestPlugins.TestPlugin.CLASSPATH_CONVERTER); + PluginScanResult result = scan(scanner, isolatedClassPathPlugin); + Optional<PluginDesc<Converter>> pluginDesc = result.converters().stream() + .filter(desc -> desc.className().equals(TestPlugins.TestPlugin.CLASSPATH_CONVERTER.className())) + .findFirst(); + assertTrue(pluginDesc.isPresent()); + assertInstanceOf(PluginClassLoader.class, pluginDesc.get().loader()); + } + private PluginScanResult scan(PluginScanner scanner, Set<Path> pluginLocations) { ClassLoaderFactory factory = new ClassLoaderFactory(); Set<PluginSource> pluginSources = PluginUtils.pluginSources(pluginLocations, PluginScannerTest.class.getClassLoader(), factory); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java index c9d8892da91..adb2c2418d5 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java @@ -85,7 +85,8 @@ public class TestPlugins { SAMPLING_CONVERTER("sampling-converter"), SAMPLING_HEADER_CONVERTER("sampling-header-converter"), SERVICE_LOADER("service-loader"), - 
SUBCLASS_OF_CLASSPATH("subclass-of-classpath"); + SUBCLASS_OF_CLASSPATH("subclass-of-classpath"), + CLASSPATH_CONVERTER("classpath-converter"); private final String resourceDir; private final Predicate<String> removeRuntimeClasses; @@ -251,7 +252,11 @@ public class TestPlugins { /** * A ServiceLoader discovered plugin which subclasses another plugin which is present on the classpath */ - SUBCLASS_OF_CLASSPATH_OVERRIDE_POLICY(TestPackage.SUBCLASS_OF_CLASSPATH, "test.plugins.SubclassOfClasspathOverridePolicy"); + SUBCLASS_OF_CLASSPATH_OVERRIDE_POLICY(TestPackage.SUBCLASS_OF_CLASSPATH, "test.plugins.SubclassOfClasspathOverridePolicy"), + /** + * A plugin which is part of the classpath by default. This packages it as a separate jar which is used to test plugin isolation from the classpath plugin. + */ + CLASSPATH_CONVERTER(TestPackage.CLASSPATH_CONVERTER, "org.apache.kafka.connect.converters.ByteArrayConverter", false); private final TestPackage testPackage; private final String className; diff --git a/connect/runtime/src/test/resources/test-plugins/classpath-converter/META-INF/services/org.apache.kafka.connect.storage.Converter b/connect/runtime/src/test/resources/test-plugins/classpath-converter/META-INF/services/org.apache.kafka.connect.storage.Converter new file mode 100644 index 00000000000..ae9c2a58203 --- /dev/null +++ b/connect/runtime/src/test/resources/test-plugins/classpath-converter/META-INF/services/org.apache.kafka.connect.storage.Converter @@ -0,0 +1,17 @@ + # Licensed to the Apache Software Foundation (ASF) under one or more + # contributor license agreements. See the NOTICE file distributed with + # this work for additional information regarding copyright ownership. + # The ASF licenses this file to You under the Apache License, Version 2.0 + # (the "License"); you may not use this file except in compliance with + # the License. 
You may obtain a copy of the License at + # + # http://www.apache.org/licenses/LICENSE-2.0 + # + # Unless required by applicable law or agreed to in writing, software + # distributed under the License is distributed on an "AS IS" BASIS, + # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + # See the License for the specific language governing permissions and + # limitations under the License. + +org.apache.kafka.connect.converters.ByteArrayConverter + diff --git a/connect/runtime/src/test/resources/test-plugins/classpath-converter/org/apache/kafka/connect/converters/ByteArrayConverter.java b/connect/runtime/src/test/resources/test-plugins/classpath-converter/org/apache/kafka/connect/converters/ByteArrayConverter.java new file mode 100644 index 00000000000..699d71635a0 --- /dev/null +++ b/connect/runtime/src/test/resources/test-plugins/classpath-converter/org/apache/kafka/connect/converters/ByteArrayConverter.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.connect.converters; + +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.utils.AppInfoParser; +import org.apache.kafka.connect.components.Versioned; +import org.apache.kafka.connect.connector.Task; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaAndValue; +import org.apache.kafka.connect.errors.DataException; +import org.apache.kafka.connect.sink.SinkConnector; +import org.apache.kafka.connect.storage.Converter; +import org.apache.kafka.connect.storage.ConverterConfig; +import org.apache.kafka.connect.storage.HeaderConverter; + +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Map; + +public class ByteArrayConverter implements Converter, HeaderConverter, Versioned { + + private static final ConfigDef CONFIG_DEF = ConverterConfig.newConfigDef(); + @Override + public String version() { + return AppInfoParser.getVersion(); + } + @Override + public ConfigDef config() { + return CONFIG_DEF; + } + + @Override + public void configure(Map<String, ?> configs) { + } + + @Override + public void configure(Map<String, ?> configs, boolean isKey) { + } + + @Override + public byte[] fromConnectData(String topic, Schema schema, Object value) { + if (schema != null && schema.type() != Schema.Type.BYTES) + throw new DataException("Invalid schema type for ByteArrayConverter: " + schema.type().toString()); + + if (value != null && !(value instanceof byte[]) && !(value instanceof ByteBuffer)) + throw new DataException("ByteArrayConverter is not compatible with objects of type " + value.getClass()); + + return value instanceof ByteBuffer ? 
getBytesFromByteBuffer((ByteBuffer) value) : (byte[]) value; + } + + @Override + public SchemaAndValue toConnectData(String topic, byte[] value) { + return new SchemaAndValue(Schema.OPTIONAL_BYTES_SCHEMA, value); + } + + @Override + public byte[] fromConnectHeader(String topic, String headerKey, Schema schema, Object value) { + return fromConnectData(topic, schema, value); + } + + @Override + public SchemaAndValue toConnectHeader(String topic, String headerKey, byte[] value) { + return toConnectData(topic, value); + } + + @Override + public void close() { + // do nothing + } + + private byte[] getBytesFromByteBuffer(ByteBuffer byteBuffer) { + if (byteBuffer == null) { + return null; + } + + byteBuffer.rewind(); + byte[] bytes = new byte[byteBuffer.remaining()]; + byteBuffer.get(bytes); + return bytes; + } +}