This is an automated email from the ASF dual-hosted git repository. gharris pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new b9a45546a79 KAFKA-15244: Remove PluginType.from(Class) (#14089) b9a45546a79 is described below commit b9a45546a7918799b6fb3c0fe63b56f47d8fcba9 Author: Greg Harris <greg.har...@aiven.io> AuthorDate: Tue Aug 1 10:05:46 2023 -0700 KAFKA-15244: Remove PluginType.from(Class) (#14089) Reviewers: Chris Egerton <chr...@aiven.io> --- .../kafka/connect/runtime/AbstractHerder.java | 41 +++---- .../runtime/isolation/PluginClassLoader.java | 14 +-- .../connect/runtime/isolation/PluginDesc.java | 26 +++-- .../runtime/isolation/PluginScanResult.java | 2 +- .../connect/runtime/isolation/PluginScanner.java | 25 ++-- .../connect/runtime/isolation/PluginType.java | 16 +-- .../connect/runtime/isolation/PluginUtils.java | 11 +- .../kafka/connect/runtime/isolation/Plugins.java | 2 +- .../runtime/isolation/ReflectionScanner.java | 33 +++--- .../runtime/isolation/ServiceLoaderScanner.java | 18 +-- .../kafka/connect/runtime/AbstractHerderTest.java | 5 +- .../isolation/DelegatingClassLoaderTest.java | 3 +- .../connect/runtime/isolation/PluginDescTest.java | 128 +++++++++++++++++---- .../runtime/isolation/PluginScannerTest.java | 14 +-- .../connect/runtime/isolation/PluginUtilsTest.java | 18 +-- .../connect/runtime/isolation/TestPlugins.java | 7 +- .../resources/ConnectorPluginsResourceTest.java | 93 +++++---------- 17 files changed, 246 insertions(+), 210 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java index b1a19fd46ae..7f04914c8d1 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java @@ -33,7 +33,6 @@ import org.apache.kafka.connect.connector.policy.ConnectorClientConfigRequest; import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.errors.NotFoundException; import org.apache.kafka.connect.runtime.isolation.LoaderSwap; -import org.apache.kafka.connect.runtime.isolation.PluginType; import org.apache.kafka.connect.runtime.isolation.Plugins; import org.apache.kafka.connect.runtime.rest.entities.ActiveTopicsInfo; import org.apache.kafka.connect.runtime.rest.entities.ConfigInfo; @@ -842,34 +841,26 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con try (LoaderSwap loaderSwap = p.withClassLoader(pluginClass.getClassLoader())) { Object plugin = p.newPlugin(pluginName); - PluginType pluginType = PluginType.from(plugin.getClass()); // Contains definitions coming from Connect framework ConfigDef baseConfigDefs = null; // Contains definitions specifically declared on the plugin ConfigDef pluginConfigDefs; - switch (pluginType) { - case SINK: - baseConfigDefs = SinkConnectorConfig.configDef(); - pluginConfigDefs = ((SinkConnector) plugin).config(); - break; - case SOURCE: - baseConfigDefs = SourceConnectorConfig.configDef(); - pluginConfigDefs = ((SourceConnector) plugin).config(); - break; - case CONVERTER: - pluginConfigDefs = ((Converter) plugin).config(); - break; - case HEADER_CONVERTER: - pluginConfigDefs = ((HeaderConverter) plugin).config(); - break; - case TRANSFORMATION: - pluginConfigDefs = ((Transformation<?>) plugin).config(); - break; - case PREDICATE: - pluginConfigDefs = ((Predicate<?>) plugin).config(); - break; - default: - throw new BadRequestException("Invalid plugin type " + pluginType + ". Valid types are sink, source, converter, header_converter, transformation, predicate."); + if (plugin instanceof SinkConnector) { + baseConfigDefs = SinkConnectorConfig.configDef(); + pluginConfigDefs = ((SinkConnector) plugin).config(); + } else if (plugin instanceof SourceConnector) { + baseConfigDefs = SourceConnectorConfig.configDef(); + pluginConfigDefs = ((SourceConnector) plugin).config(); + } else if (plugin instanceof Converter) { + pluginConfigDefs = ((Converter) plugin).config(); + } else if (plugin instanceof HeaderConverter) { + pluginConfigDefs = ((HeaderConverter) plugin).config(); + } else if (plugin instanceof Transformation) { + pluginConfigDefs = ((Transformation<?>) plugin).config(); + } else if (plugin instanceof Predicate) { + pluginConfigDefs = ((Predicate<?>) plugin).config(); + } else { + throw new BadRequestException("Invalid plugin class " + pluginName + ". Valid types are sink, source, converter, header_converter, transformation, predicate."); } // Track config properties by name and, if the same property is defined in multiple places, diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginClassLoader.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginClassLoader.java index da05966e13a..825d1e6d114 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginClassLoader.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginClassLoader.java @@ -55,19 +55,7 @@ public class PluginClassLoader extends URLClassLoader { */ public PluginClassLoader(URL pluginLocation, URL[] urls, ClassLoader parent) { super(urls, parent); - this.pluginLocation = pluginLocation; - } - - /** - * Constructor that defines the system classloader as parent of this plugin classloader. - * - * @param pluginLocation the top-level location of the plugin to be loaded in isolation by this - * classloader. - * @param urls the list of urls from which to load classes and resources for this plugin. - */ - public PluginClassLoader(URL pluginLocation, URL[] urls) { - super(urls); - this.pluginLocation = pluginLocation; + this.pluginLocation = Objects.requireNonNull(pluginLocation, "Plugin location must be non-null"); } /** diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginDesc.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginDesc.java index e001f104ee3..395dc92d7b5 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginDesc.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginDesc.java @@ -21,7 +21,7 @@ import org.apache.maven.artifact.versioning.DefaultArtifactVersion; import java.util.Objects; -public class PluginDesc<T> implements Comparable<PluginDesc<T>> { +public class PluginDesc<T> implements Comparable<PluginDesc<?>> { public static final String UNDEFINED_VERSION = "undefined"; private final Class<? extends T> klass; private final String name; @@ -32,15 +32,16 @@ public class PluginDesc<T> implements Comparable<PluginDesc<T>> { private final String location; private final ClassLoader loader; - public PluginDesc(Class<? extends T> klass, String version, ClassLoader loader) { - this.klass = klass; - this.name = klass.getName(); + public PluginDesc(Class<? extends T> klass, String version, PluginType type, ClassLoader loader) { + this.klass = Objects.requireNonNull(klass, "Plugin class must be non-null"); + this.name = this.klass.getName(); this.version = version != null ? version : "null"; this.encodedVersion = new DefaultArtifactVersion(this.version); - this.type = PluginType.from(klass); - this.typeName = type.toString(); + this.type = Objects.requireNonNull(type, "Plugin type must be non-null"); + this.typeName = this.type.toString(); + Objects.requireNonNull(loader, "Plugin classloader must be non-null"); this.location = loader instanceof PluginClassLoader - ? ((PluginClassLoader) loader).location() + ? Objects.requireNonNull(((PluginClassLoader) loader).location(), "Plugin location must be non-null") : "classpath"; this.loader = loader; } @@ -110,11 +111,18 @@ public class PluginDesc<T> implements Comparable<PluginDesc<T>> { } @Override - public int compareTo(PluginDesc<T> other) { + public int compareTo(PluginDesc<?> other) { int nameComp = name.compareTo(other.name); int versionComp = encodedVersion.compareTo(other.encodedVersion); // isolated plugins appear after classpath plugins when they have identical versions. int isolatedComp = Boolean.compare(other.loader instanceof PluginClassLoader, loader instanceof PluginClassLoader); - return nameComp != 0 ? nameComp : (versionComp != 0 ? versionComp : isolatedComp); + // choose an arbitrary order between different locations and types + int loaderComp = location.compareTo(other.location); + int typeComp = type.compareTo(other.type); + return nameComp != 0 ? nameComp : + versionComp != 0 ? versionComp : + isolatedComp != 0 ? isolatedComp : + loaderComp != 0 ? loaderComp : + typeComp; } } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanResult.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanResult.java index ae015ed3507..b452beb9b63 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanResult.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanResult.java @@ -88,7 +88,7 @@ public class PluginScanResult { ); } - private static <R extends Comparable<R>> SortedSet<R> merge(List<PluginScanResult> results, Function<PluginScanResult, SortedSet<R>> accessor) { + private static <R extends Comparable<?>> SortedSet<R> merge(List<PluginScanResult> results, Function<PluginScanResult, SortedSet<R>> accessor) { SortedSet<R> merged = new TreeSet<>(); for (PluginScanResult element : results) { merged.addAll(accessor.apply(element)); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanner.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanner.java index 474a92f5cf5..acb5b668cf3 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanner.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanner.java @@ -120,32 +120,32 @@ public abstract class PluginScanner { } @SuppressWarnings({"rawtypes", "unchecked"}) - protected <T> PluginDesc<T> pluginDesc(Class<? extends T> plugin, String version, PluginSource source) { - return new PluginDesc(plugin, version, source.loader()); + protected <T> PluginDesc<T> pluginDesc(Class<? extends T> plugin, String version, PluginType type, PluginSource source) { + return new PluginDesc(plugin, version, type, source.loader()); } @SuppressWarnings("unchecked") - protected <T> SortedSet<PluginDesc<T>> getServiceLoaderPluginDesc(Class<T> klass, PluginSource source) { + protected <T> SortedSet<PluginDesc<T>> getServiceLoaderPluginDesc(PluginType type, PluginSource source) { SortedSet<PluginDesc<T>> result = new TreeSet<>(); - ServiceLoader<T> serviceLoader = ServiceLoader.load(klass, source.loader()); + ServiceLoader<T> serviceLoader = ServiceLoader.load((Class<T>) type.superClass(), source.loader()); Iterator<T> iterator = serviceLoader.iterator(); - while (handleLinkageError(klass, source, iterator::hasNext)) { + while (handleLinkageError(type, source, iterator::hasNext)) { try (LoaderSwap loaderSwap = withClassLoader(source.loader())) { T pluginImpl; try { - pluginImpl = handleLinkageError(klass, source, iterator::next); + pluginImpl = handleLinkageError(type, source, iterator::next); } catch (ServiceConfigurationError t) { log.error("Failed to discover {} in {}{}", - klass.getSimpleName(), source.location(), reflectiveErrorDescription(t.getCause()), t); + type.simpleName(), source.location(), reflectiveErrorDescription(t.getCause()), t); continue; } Class<? extends T> pluginKlass = (Class<? extends T>) pluginImpl.getClass(); if (pluginKlass.getClassLoader() != source.loader()) { log.debug("{} from other classloader {} is visible from {}, excluding to prevent isolated loading", - pluginKlass.getSimpleName(), pluginKlass.getClassLoader(), source.location()); + type.simpleName(), pluginKlass.getClassLoader(), source.location()); continue; } - result.add(pluginDesc(pluginKlass, versionFor(pluginImpl), source)); + result.add(pluginDesc(pluginKlass, versionFor(pluginImpl), type, source)); } } return result; @@ -154,14 +154,13 @@ public abstract class PluginScanner { /** * Helper to evaluate a {@link ServiceLoader} operation while handling {@link LinkageError}s. * - * @param klass The plugin superclass which is being loaded + * @param type The plugin type which is being loaded * @param function A function on a {@link ServiceLoader}'s {@link Iterator} which may throw {@link LinkageError} * @return the return value of function * @throws Error errors thrown by the passed-in function - * @param <T> Type being iterated over by the ServiceLoader * @param <U> Return value of the passed-in function */ - private <T, U> U handleLinkageError(Class<T> klass, PluginSource source, Supplier<U> function) { + private <U> U handleLinkageError(PluginType type, PluginSource source, Supplier<U> function) { // It's difficult to know for sure if the iterator was able to advance past the first broken // plugin class, or if it will continue to fail on that broken class for any subsequent calls // to Iterator::hasNext or Iterator::next @@ -182,7 +181,7 @@ public abstract class PluginScanner { || !Objects.equals(lastError.getClass(), t.getClass()) || !Objects.equals(lastError.getMessage(), t.getMessage())) { log.error("Failed to discover {} in {}{}", - klass.getSimpleName(), source.location(), reflectiveErrorDescription(t.getCause()), t); + type.simpleName(), source.location(), reflectiveErrorDescription(t.getCause()), t); } lastError = t; } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginType.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginType.java index 696e14ba8cd..0f26e84a0bb 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginType.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginType.java @@ -37,8 +37,7 @@ public enum PluginType { PREDICATE(Predicate.class), CONFIGPROVIDER(ConfigProvider.class), REST_EXTENSION(ConnectRestExtension.class), - CONNECTOR_CLIENT_CONFIG_OVERRIDE_POLICY(ConnectorClientConfigOverridePolicy.class), - UNKNOWN(Object.class); + CONNECTOR_CLIENT_CONFIG_OVERRIDE_POLICY(ConnectorClientConfigOverridePolicy.class); private final Class<?> klass; @@ -46,19 +45,14 @@ public enum PluginType { this.klass = klass; } - public static PluginType from(Class<?> klass) { - for (PluginType type : PluginType.values()) { - if (type.klass.isAssignableFrom(klass)) { - return type; - } - } - return UNKNOWN; - } - public String simpleName() { return klass.getSimpleName(); } + public Class<?> superClass() { + return klass; + } + @Override public String toString() { return super.toString().toLowerCase(Locale.ROOT); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java index 88b1fc0484a..a673d6e54fd 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java @@ -35,6 +35,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; +import java.util.LinkedHashSet; import java.util.LinkedList; import java.util.List; import java.util.Locale; @@ -196,12 +197,12 @@ public class PluginUtils { return path.toString().toLowerCase(Locale.ROOT).endsWith(".class"); } - public static List<Path> pluginLocations(String pluginPath) { + public static Set<Path> pluginLocations(String pluginPath) { if (pluginPath == null) { - return Collections.emptyList(); + return Collections.emptySet(); } String[] pluginPathElements = COMMA_WITH_WHITESPACE.split(pluginPath.trim(), -1); - List<Path> pluginLocations = new ArrayList<>(); + Set<Path> pluginLocations = new LinkedHashSet<>(); for (String path : pluginPathElements) { try { Path pluginPathElement = Paths.get(path).toAbsolutePath(); @@ -328,8 +329,8 @@ public class PluginUtils { return Arrays.asList(archives.toArray(new Path[0])); } - public static Set<PluginSource> pluginSources(List<Path> pluginLocations, ClassLoader classLoader, PluginClassLoaderFactory factory) { - Set<PluginSource> pluginSources = new HashSet<>(); + public static Set<PluginSource> pluginSources(Set<Path> pluginLocations, ClassLoader classLoader, PluginClassLoaderFactory factory) { + Set<PluginSource> pluginSources = new LinkedHashSet<>(); for (Path pluginLocation : pluginLocations) { try { diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java index 83dec38a6fb..36e20270abc 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java @@ -63,7 +63,7 @@ public class Plugins { // VisibleForTesting Plugins(Map<String, String> props, ClassLoader parent, ClassLoaderFactory factory) { String pluginPath = WorkerConfig.pluginPath(props); - List<Path> pluginLocations = PluginUtils.pluginLocations(pluginPath); + Set<Path> pluginLocations = PluginUtils.pluginLocations(pluginPath); delegatingLoader = factory.newDelegatingClassLoader(parent); Set<PluginSource> pluginSources = PluginUtils.pluginSources(pluginLocations, delegatingLoader, factory); scanResult = initLoaders(pluginSources); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/ReflectionScanner.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/ReflectionScanner.java index e38fefc78c9..ad5c00c42ec 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/ReflectionScanner.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/ReflectionScanner.java @@ -69,7 +69,7 @@ public class ReflectionScanner extends PluginScanner { private static final Logger log = LoggerFactory.getLogger(ReflectionScanner.class); - public static <T> String versionFor(Class<? extends T> pluginKlass) throws ReflectiveOperationException { + private static <T> String versionFor(Class<? extends T> pluginKlass) throws ReflectiveOperationException { T pluginImpl = pluginKlass.getDeclaredConstructor().newInstance(); return versionFor(pluginImpl); } @@ -84,39 +84,40 @@ public class ReflectionScanner extends PluginScanner { Reflections reflections = new Reflections(builder); return new PluginScanResult( - getPluginDesc(reflections, SinkConnector.class, source), - getPluginDesc(reflections, SourceConnector.class, source), - getPluginDesc(reflections, Converter.class, source), - getPluginDesc(reflections, HeaderConverter.class, source), + getPluginDesc(reflections, PluginType.SINK, source), + getPluginDesc(reflections, PluginType.SOURCE, source), + getPluginDesc(reflections, PluginType.CONVERTER, source), + getPluginDesc(reflections, PluginType.HEADER_CONVERTER, source), getTransformationPluginDesc(source, reflections), getPredicatePluginDesc(source, reflections), - getServiceLoaderPluginDesc(ConfigProvider.class, source), - getServiceLoaderPluginDesc(ConnectRestExtension.class, source), - getServiceLoaderPluginDesc(ConnectorClientConfigOverridePolicy.class, source) + getServiceLoaderPluginDesc(PluginType.CONFIGPROVIDER, source), + getServiceLoaderPluginDesc(PluginType.REST_EXTENSION, source), + getServiceLoaderPluginDesc(PluginType.CONNECTOR_CLIENT_CONFIG_OVERRIDE_POLICY, source) ); } @SuppressWarnings({"unchecked"}) private SortedSet<PluginDesc<Predicate<?>>> getPredicatePluginDesc(PluginSource source, Reflections reflections) { - return (SortedSet<PluginDesc<Predicate<?>>>) (SortedSet<?>) getPluginDesc(reflections, Predicate.class, source); + return (SortedSet<PluginDesc<Predicate<?>>>) (SortedSet<?>) getPluginDesc(reflections, PluginType.PREDICATE, source); } @SuppressWarnings({"unchecked"}) private SortedSet<PluginDesc<Transformation<?>>> getTransformationPluginDesc(PluginSource source, Reflections reflections) { - return (SortedSet<PluginDesc<Transformation<?>>>) (SortedSet<?>) getPluginDesc(reflections, Transformation.class, source); + return (SortedSet<PluginDesc<Transformation<?>>>) (SortedSet<?>) getPluginDesc(reflections, PluginType.TRANSFORMATION, source); } + @SuppressWarnings({"unchecked"}) private <T> SortedSet<PluginDesc<T>> getPluginDesc( Reflections reflections, - Class<T> klass, + PluginType type, PluginSource source ) { Set<Class<? extends T>> plugins; try { - plugins = reflections.getSubTypesOf(klass); + plugins = reflections.getSubTypesOf((Class<T>) type.superClass()); } catch (ReflectionsException e) { log.debug("Reflections scanner could not find any {} in {} for URLs: {}", - klass, source.location(), source.urls(), e); + type, source.location(), source.urls(), e); return Collections.emptySortedSet(); } @@ -128,14 +129,14 @@ public class ReflectionScanner extends PluginScanner { } if (pluginKlass.getClassLoader() != source.loader()) { log.debug("{} from other classloader {} is visible from {}, excluding to prevent isolated loading", - pluginKlass.getSimpleName(), pluginKlass.getClassLoader(), source.location()); + pluginKlass, pluginKlass.getClassLoader(), source.location()); continue; } try (LoaderSwap loaderSwap = withClassLoader(source.loader())) { - result.add(pluginDesc(pluginKlass, versionFor(pluginKlass), source)); + result.add(pluginDesc(pluginKlass, versionFor(pluginKlass), type, source)); } catch (ReflectiveOperationException | LinkageError e) { log.error("Failed to discover {} in {}: Unable to instantiate {}{}", - klass.getSimpleName(), source.location(), pluginKlass.getSimpleName(), + type.simpleName(), source.location(), pluginKlass.getSimpleName(), reflectiveErrorDescription(e), e); } } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/ServiceLoaderScanner.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/ServiceLoaderScanner.java index 727a737ff3f..9f36a5ad8e0 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/ServiceLoaderScanner.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/ServiceLoaderScanner.java @@ -56,25 +56,25 @@ public class ServiceLoaderScanner extends PluginScanner { @Override protected PluginScanResult scanPlugins(PluginSource source) { return new PluginScanResult( - getServiceLoaderPluginDesc(SinkConnector.class, source), - getServiceLoaderPluginDesc(SourceConnector.class, source), - getServiceLoaderPluginDesc(Converter.class, source), - getServiceLoaderPluginDesc(HeaderConverter.class, source), + getServiceLoaderPluginDesc(PluginType.SINK, source), + getServiceLoaderPluginDesc(PluginType.SOURCE, source), + getServiceLoaderPluginDesc(PluginType.CONVERTER, source), + getServiceLoaderPluginDesc(PluginType.HEADER_CONVERTER, source), getTransformationPluginDesc(source), getPredicatePluginDesc(source), - getServiceLoaderPluginDesc(ConfigProvider.class, source), - getServiceLoaderPluginDesc(ConnectRestExtension.class, source), - getServiceLoaderPluginDesc(ConnectorClientConfigOverridePolicy.class, source) + getServiceLoaderPluginDesc(PluginType.CONFIGPROVIDER, source), + getServiceLoaderPluginDesc(PluginType.REST_EXTENSION, source), + getServiceLoaderPluginDesc(PluginType.CONNECTOR_CLIENT_CONFIG_OVERRIDE_POLICY, source) ); } @SuppressWarnings({"unchecked"}) private SortedSet<PluginDesc<Predicate<?>>> getPredicatePluginDesc(PluginSource source) { - return (SortedSet<PluginDesc<Predicate<?>>>) (SortedSet<?>) getServiceLoaderPluginDesc(Predicate.class, source); + return (SortedSet<PluginDesc<Predicate<?>>>) (SortedSet<?>) getServiceLoaderPluginDesc(PluginType.PREDICATE, source); } @SuppressWarnings({"unchecked"}) private SortedSet<PluginDesc<Transformation<?>>> getTransformationPluginDesc(PluginSource source) { - return (SortedSet<PluginDesc<Transformation<?>>>) (SortedSet<?>) getServiceLoaderPluginDesc(Transformation.class, source); + return (SortedSet<PluginDesc<Transformation<?>>>) (SortedSet<?>) getServiceLoaderPluginDesc(PluginType.TRANSFORMATION, source); } } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java index f1993a7e7f1..68ee2ce1561 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java @@ -35,6 +35,7 @@ import org.apache.kafka.connect.errors.NotFoundException; import org.apache.kafka.connect.runtime.distributed.SampleConnectorClientConfigOverridePolicy; import org.apache.kafka.connect.runtime.isolation.LoaderSwap; import org.apache.kafka.connect.runtime.isolation.PluginDesc; +import org.apache.kafka.connect.runtime.isolation.PluginType; import org.apache.kafka.connect.runtime.isolation.Plugins; import org.apache.kafka.connect.runtime.rest.entities.ConfigInfo; import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos; @@ -667,12 +668,12 @@ public class AbstractHerderTest { @SuppressWarnings({"rawtypes", "unchecked"}) private PluginDesc<Predicate<?>> predicatePluginDesc() { - return new PluginDesc(SamplePredicate.class, "1.0", classLoader); + return new PluginDesc(SamplePredicate.class, "1.0", PluginType.PREDICATE, classLoader); } @SuppressWarnings({"rawtypes", "unchecked"}) private PluginDesc<Transformation<?>> transformationPluginDesc() { - return new PluginDesc(SampleTransformation.class, "1.0", classLoader); + return new PluginDesc(SampleTransformation.class, "1.0", PluginType.TRANSFORMATION, classLoader); } @Test diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoaderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoaderTest.java index 4f683e3b355..a3f2ab142dc 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoaderTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoaderTest.java @@ -61,7 +61,8 @@ public class DelegatingClassLoaderTest { SortedSet<PluginDesc<SinkConnector>> sinkConnectors = new TreeSet<>(); // Lie to the DCL that this arbitrary class is a connector, since all real connector classes we have access to // are forced to be non-isolated by PluginUtils.shouldLoadInIsolation. - pluginDesc = new PluginDesc<>((Class<? extends SinkConnector>) ARBITRARY_CLASS, null, pluginLoader); + when(pluginLoader.location()).thenReturn("some-location"); + pluginDesc = new PluginDesc<>((Class<? extends SinkConnector>) ARBITRARY_CLASS, null, PluginType.SINK, pluginLoader); assertTrue(PluginUtils.shouldLoadInIsolation(pluginDesc.className())); sinkConnectors.add(pluginDesc); scanResult = new PluginScanResult( diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginDescTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginDescTest.java index f18537d0c32..171fbde6997 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginDescTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginDescTest.java @@ -17,10 +17,13 @@ package org.apache.kafka.connect.runtime.isolation; -import org.apache.kafka.connect.connector.Connector; +import org.apache.kafka.common.config.provider.ConfigProvider; +import org.apache.kafka.common.config.provider.FileConfigProvider; +import org.apache.kafka.connect.json.JsonConverter; import org.apache.kafka.connect.sink.SinkConnector; import org.apache.kafka.connect.source.SourceConnector; import org.apache.kafka.connect.storage.Converter; +import org.apache.kafka.connect.storage.HeaderConverter; import org.apache.kafka.connect.transforms.Transformation; import org.apache.kafka.connect.transforms.predicates.Predicate; import org.junit.Before; @@ -31,7 +34,10 @@ import java.nio.file.Paths; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class PluginDescTest { private final ClassLoader systemLoader = ClassLoader.getSystemClassLoader(); @@ -40,41 +46,47 @@ public class PluginDescTest { private final String snapshotVersion = "1.0.0-SNAPSHOT"; private final String noVersion = "undefined"; private PluginClassLoader pluginLoader; + private PluginClassLoader otherPluginLoader; @Before public void setUp() throws Exception { // Fairly simple use case, thus no need to create a random directory here yet. URL location = Paths.get("/tmp").toUri().toURL(); + URL otherLocation = Paths.get("/tmp-other").toUri().toURL(); // Normally parent will be a DelegatingClassLoader. pluginLoader = new PluginClassLoader(location, new URL[0], systemLoader); + otherPluginLoader = new PluginClassLoader(otherLocation, new URL[0], systemLoader); } @SuppressWarnings("rawtypes") @Test public void testRegularPluginDesc() { - PluginDesc<Connector> connectorDesc = new PluginDesc<>( - Connector.class, + PluginDesc<SinkConnector> connectorDesc = new PluginDesc<>( + SinkConnector.class, regularVersion, + PluginType.SINK, pluginLoader ); - assertPluginDesc(connectorDesc, Connector.class, regularVersion, pluginLoader.location()); + assertPluginDesc(connectorDesc, SinkConnector.class, regularVersion, PluginType.SINK, pluginLoader.location()); PluginDesc<Converter> converterDesc = new PluginDesc<>( Converter.class, snapshotVersion, + PluginType.CONVERTER, pluginLoader ); - assertPluginDesc(converterDesc, Converter.class, snapshotVersion, pluginLoader.location()); + assertPluginDesc(converterDesc, Converter.class, snapshotVersion, PluginType.CONVERTER, pluginLoader.location()); PluginDesc<Transformation> transformDesc = new PluginDesc<>( Transformation.class, noVersion, + PluginType.TRANSFORMATION, pluginLoader ); - assertPluginDesc(transformDesc, Transformation.class, noVersion, pluginLoader.location()); + assertPluginDesc(transformDesc, Transformation.class, noVersion, PluginType.TRANSFORMATION, pluginLoader.location()); } @SuppressWarnings("rawtypes") @@ -84,26 +96,29 @@ public class PluginDescTest { PluginDesc<SinkConnector> connectorDesc = new PluginDesc<>( SinkConnector.class, regularVersion, + PluginType.SINK, systemLoader ); - assertPluginDesc(connectorDesc, SinkConnector.class, regularVersion, location); + assertPluginDesc(connectorDesc, SinkConnector.class, regularVersion, PluginType.SINK, location); PluginDesc<Converter> converterDesc = new PluginDesc<>( Converter.class, snapshotVersion, + PluginType.CONVERTER, systemLoader ); - assertPluginDesc(converterDesc, Converter.class, snapshotVersion, location); + assertPluginDesc(converterDesc, Converter.class, snapshotVersion, PluginType.CONVERTER, location); PluginDesc<Transformation> transformDesc = new PluginDesc<>( Transformation.class, noVersion, + PluginType.TRANSFORMATION, systemLoader ); - assertPluginDesc(transformDesc, Transformation.class, noVersion, location); + assertPluginDesc(transformDesc, Transformation.class, noVersion, PluginType.TRANSFORMATION, location); } @Test @@ -112,6 +127,7 @@ public class PluginDescTest { PluginDesc<SourceConnector> connectorDesc = new PluginDesc<>( SourceConnector.class, null, + PluginType.SOURCE, pluginLoader ); @@ -119,6 +135,7 @@ public class PluginDescTest { connectorDesc, SourceConnector.class, nullVersion, + PluginType.SOURCE, pluginLoader.location() ); @@ -126,24 +143,27 @@ public class PluginDescTest { PluginDesc<Converter> converterDesc = new PluginDesc<>( Converter.class, null, + PluginType.CONVERTER, systemLoader ); - assertPluginDesc(converterDesc, Converter.class, nullVersion, location); + assertPluginDesc(converterDesc, Converter.class, nullVersion, PluginType.CONVERTER, location); } @SuppressWarnings("rawtypes") @Test public void testPluginDescEquality() { - PluginDesc<Connector> connectorDescPluginPath = new PluginDesc<>( - Connector.class, + PluginDesc<SinkConnector> connectorDescPluginPath = new PluginDesc<>( + SinkConnector.class, snapshotVersion, + PluginType.SINK, pluginLoader ); - PluginDesc<Connector> connectorDescClasspath = new PluginDesc<>( - Connector.class, + PluginDesc<SinkConnector> connectorDescClasspath = new PluginDesc<>( + SinkConnector.class, snapshotVersion, + PluginType.SINK, systemLoader ); @@ -153,12 +173,14 @@ public class PluginDescTest { PluginDesc<Converter> converterDescPluginPath = new PluginDesc<>( Converter.class, noVersion, + PluginType.CONVERTER, pluginLoader ); PluginDesc<Converter> converterDescClasspath = new PluginDesc<>( Converter.class, noVersion, + PluginType.CONVERTER, systemLoader ); @@ -168,30 +190,34 @@ public class PluginDescTest { PluginDesc<Transformation> transformDescPluginPath = new PluginDesc<>( Transformation.class, null, + PluginType.TRANSFORMATION, pluginLoader ); PluginDesc<Transformation> transformDescClasspath = new PluginDesc<>( Transformation.class, noVersion, + PluginType.TRANSFORMATION, pluginLoader ); assertNotEquals(transformDescPluginPath, transformDescClasspath); } - @SuppressWarnings("rawtypes") + @SuppressWarnings({"rawtypes", "unchecked"}) @Test public void testPluginDescComparison() { - PluginDesc<Connector> connectorDescPluginPath = new PluginDesc<>( - Connector.class, + PluginDesc<SinkConnector> connectorDescPluginPath = new PluginDesc<>( + SinkConnector.class, regularVersion, + PluginType.SINK, pluginLoader ); - PluginDesc<Connector> connectorDescClasspath = new PluginDesc<>( - Connector.class, + PluginDesc<SinkConnector> connectorDescClasspath = new PluginDesc<>( + SinkConnector.class, newerVersion, + PluginType.SINK, systemLoader ); @@ -200,12 +226,14 @@ public class PluginDescTest { PluginDesc<Converter> converterDescPluginPath = new PluginDesc<>( Converter.class, noVersion, + PluginType.CONVERTER, pluginLoader ); PluginDesc<Converter> converterDescClasspath = new PluginDesc<>( Converter.class, snapshotVersion, + PluginType.CONVERTER, systemLoader ); @@ -214,12 +242,14 @@ public class PluginDescTest { PluginDesc<Transformation> transformDescPluginPath = new PluginDesc<>( Transformation.class, null, + PluginType.TRANSFORMATION, pluginLoader ); PluginDesc<Transformation> transformDescClasspath = new PluginDesc<>( Transformation.class, regularVersion, + PluginType.TRANSFORMATION, systemLoader ); @@ -228,33 +258,87 @@ public class PluginDescTest { PluginDesc<Predicate> predicateDescPluginPath = new PluginDesc<>( Predicate.class, regularVersion, + PluginType.PREDICATE, pluginLoader ); PluginDesc<Predicate> predicateDescClasspath = new PluginDesc<>( Predicate.class, regularVersion, + PluginType.PREDICATE, systemLoader ); assertNewer(predicateDescPluginPath, predicateDescClasspath); + + PluginDesc<ConfigProvider> configProviderDescPluginPath = new PluginDesc<>( + FileConfigProvider.class, + regularVersion, + PluginType.CONFIGPROVIDER, + pluginLoader + ); + + PluginDesc<ConfigProvider> configProviderDescOtherPluginLoader = new PluginDesc<>( + FileConfigProvider.class, + regularVersion, + PluginType.CONFIGPROVIDER, + otherPluginLoader + ); + + assertTrue("Different plugin loaders should have an ordering", + configProviderDescPluginPath.compareTo(configProviderDescOtherPluginLoader) != 0); + + + PluginDesc<Converter> jsonConverterPlugin = new PluginDesc<>( + JsonConverter.class, + regularVersion, + PluginType.CONVERTER, + systemLoader + ); + + PluginDesc<HeaderConverter> jsonHeaderConverterPlugin = new PluginDesc<>( + JsonConverter.class, + regularVersion, + PluginType.HEADER_CONVERTER, + systemLoader + ); + + assertNewer(jsonConverterPlugin, jsonHeaderConverterPlugin); + } + + @Test + public void testNullArguments() { + // Null version is acceptable + PluginDesc<SinkConnector> sink = new PluginDesc<>(SinkConnector.class, null, PluginType.SINK, systemLoader); + assertEquals("null", sink.version()); + + // Direct nulls are not acceptable for other arguments + assertThrows(NullPointerException.class, () -> new PluginDesc<>(null, regularVersion, PluginType.SINK, systemLoader)); + assertThrows(NullPointerException.class, () -> new PluginDesc<>(SinkConnector.class, regularVersion, null, systemLoader)); + assertThrows(NullPointerException.class, () -> new PluginDesc<>(SinkConnector.class, regularVersion, PluginType.SINK, null)); + + // PluginClassLoaders must have non-null locations + PluginClassLoader nullLocationLoader = mock(PluginClassLoader.class); + when(nullLocationLoader.location()).thenReturn(null); + assertThrows(NullPointerException.class, () -> new PluginDesc<>(SinkConnector.class, regularVersion, PluginType.SINK, nullLocationLoader)); } private static <T> void assertPluginDesc( PluginDesc<T> desc, Class<? extends T> klass, String version, + PluginType type, String location ) { assertEquals(desc.pluginClass(), klass); assertEquals(desc.className(), klass.getName()); assertEquals(desc.version(), version); - assertEquals(desc.type(), PluginType.from(klass)); - assertEquals(desc.typeName(), PluginType.from(klass).toString()); + assertEquals(desc.type(), type); + assertEquals(desc.typeName(), type.toString()); assertEquals(desc.location(), location); } - private static <T> void assertNewer(PluginDesc<T> older, PluginDesc<T> newer) { + private static void assertNewer(PluginDesc<?> older, PluginDesc<?> newer) { assertTrue(newer + " should be newer than " + older, older.compareTo(newer) < 0); } } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginScannerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginScannerTest.java index b9b80fa5110..d6a85b294cf 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginScannerTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginScannerTest.java @@ -70,7 +70,7 @@ public class PluginScannerTest { @Test public void testScanningEmptyPluginPath() { PluginScanResult result = scan( - Collections.emptyList() + Collections.emptySet() ); assertTrue(result.isEmpty()); } @@ -91,7 +91,7 @@ public class PluginScannerTest { pluginDir.newFile("invalid.jar"); PluginScanResult result = scan( - Collections.singletonList(pluginDir.getRoot().toPath().toAbsolutePath()) + Collections.singleton(pluginDir.getRoot().toPath().toAbsolutePath()) ); assertTrue(result.isEmpty()); } @@ -102,7 +102,7 @@ public class PluginScannerTest { pluginDir.newFile("my-plugin/invalid.jar"); PluginScanResult result = scan( - Collections.singletonList(pluginDir.getRoot().toPath().toAbsolutePath()) + Collections.singleton(pluginDir.getRoot().toPath().toAbsolutePath()) ); assertTrue(result.isEmpty()); } @@ -110,7 +110,7 @@ public class PluginScannerTest { @Test public void testScanningNoPlugins() { PluginScanResult result = scan( - Collections.singletonList(pluginDir.getRoot().toPath().toAbsolutePath()) + Collections.singleton(pluginDir.getRoot().toPath().toAbsolutePath()) ); assertTrue(result.isEmpty()); } @@ -120,7 +120,7 @@ public class PluginScannerTest { pluginDir.newFolder("my-plugin"); PluginScanResult result = scan( - Collections.singletonList(pluginDir.getRoot().toPath().toAbsolutePath()) + Collections.singleton(pluginDir.getRoot().toPath().toAbsolutePath()) ); assertTrue(result.isEmpty()); } @@ -137,7 +137,7 @@ public class PluginScannerTest { } PluginScanResult result = scan( - Collections.singletonList(pluginDir.getRoot().toPath().toAbsolutePath()) + Collections.singleton(pluginDir.getRoot().toPath().toAbsolutePath()) ); Set<String> classes = new HashSet<>(); result.forEach(pluginDesc -> classes.add(pluginDesc.className())); @@ -145,7 +145,7 @@ public class PluginScannerTest { assertEquals(expectedClasses, classes); } - private PluginScanResult scan(List<Path> pluginLocations) { + private PluginScanResult scan(Set<Path> pluginLocations) { ClassLoaderFactory factory = new ClassLoaderFactory(); Set<PluginSource> pluginSources = PluginUtils.pluginSources(pluginLocations, PluginScannerTest.class.getClassLoader(), factory); return scanner.discoverPlugins(pluginSources); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java index 0f830c8a8aa..4dd168a3faf 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java @@ -508,11 +508,11 @@ public class PluginUtilsTest { @Test public void testNonCollidingAliases() { SortedSet<PluginDesc<SinkConnector>> sinkConnectors = new TreeSet<>(); - sinkConnectors.add(new PluginDesc<>(MockSinkConnector.class, null, MockSinkConnector.class.getClassLoader())); + sinkConnectors.add(new PluginDesc<>(MockSinkConnector.class, null, PluginType.SINK, MockSinkConnector.class.getClassLoader())); SortedSet<PluginDesc<SourceConnector>> sourceConnectors = new TreeSet<>(); - sourceConnectors.add(new PluginDesc<>(MockSourceConnector.class, null, MockSourceConnector.class.getClassLoader())); + sourceConnectors.add(new PluginDesc<>(MockSourceConnector.class, null, PluginType.SOURCE, MockSourceConnector.class.getClassLoader())); SortedSet<PluginDesc<Converter>> converters = new TreeSet<>(); - converters.add(new PluginDesc<>(CollidingConverter.class, null, CollidingConverter.class.getClassLoader())); + converters.add(new PluginDesc<>(CollidingConverter.class, null, PluginType.CONVERTER, CollidingConverter.class.getClassLoader())); PluginScanResult result = new PluginScanResult( sinkConnectors, sourceConnectors, @@ -540,8 +540,8 @@ public class PluginUtilsTest { public void testMultiVersionAlias() { SortedSet<PluginDesc<SinkConnector>> sinkConnectors = new TreeSet<>(); // distinct versions don't cause an alias collision (the class name is the same) - sinkConnectors.add(new PluginDesc<>(MockSinkConnector.class, null, MockSinkConnector.class.getClassLoader())); - sinkConnectors.add(new PluginDesc<>(MockSinkConnector.class, "1.0", MockSinkConnector.class.getClassLoader())); + sinkConnectors.add(new PluginDesc<>(MockSinkConnector.class, null, PluginType.SINK, MockSinkConnector.class.getClassLoader())); + sinkConnectors.add(new PluginDesc<>(MockSinkConnector.class, "1.0", PluginType.SINK, MockSinkConnector.class.getClassLoader())); assertEquals(2, sinkConnectors.size()); PluginScanResult result = new PluginScanResult( sinkConnectors, @@ -564,9 +564,9 @@ public class PluginUtilsTest { @Test public void testCollidingPrunedAlias() { SortedSet<PluginDesc<Converter>> converters = new TreeSet<>(); - converters.add(new PluginDesc<>(CollidingConverter.class, null, CollidingConverter.class.getClassLoader())); + converters.add(new PluginDesc<>(CollidingConverter.class, null, PluginType.CONVERTER, CollidingConverter.class.getClassLoader())); SortedSet<PluginDesc<HeaderConverter>> headerConverters = new TreeSet<>(); - headerConverters.add(new PluginDesc<>(CollidingHeaderConverter.class, null, CollidingHeaderConverter.class.getClassLoader())); + headerConverters.add(new PluginDesc<>(CollidingHeaderConverter.class, null, PluginType.HEADER_CONVERTER, CollidingHeaderConverter.class.getClassLoader())); PluginScanResult result = new PluginScanResult( Collections.emptySortedSet(), Collections.emptySortedSet(), @@ -589,9 +589,9 @@ public class PluginUtilsTest { @Test public void testCollidingSimpleAlias() { SortedSet<PluginDesc<Converter>> converters = new TreeSet<>(); - converters.add(new PluginDesc<>(CollidingConverter.class, null, CollidingConverter.class.getClassLoader())); + converters.add(new PluginDesc<>(CollidingConverter.class, null, PluginType.CONVERTER, CollidingConverter.class.getClassLoader())); SortedSet<PluginDesc<Transformation<?>>> transformations = new TreeSet<>(); - transformations.add(new PluginDesc<>((Class<? extends Transformation<?>>) (Class<?>) Colliding.class, null, Colliding.class.getClassLoader())); + transformations.add(new PluginDesc<>((Class<? extends Transformation<?>>) (Class<?>) Colliding.class, null, PluginType.TRANSFORMATION, Colliding.class.getClassLoader())); PluginScanResult result = new PluginScanResult( Collections.emptySortedSet(), Collections.emptySortedSet(), diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java index bd87dd41516..d93fef181aa 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java @@ -34,6 +34,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Set; import java.util.function.Predicate; import java.util.jar.Attributes; import java.util.jar.JarEntry; @@ -248,7 +249,7 @@ public class TestPlugins { * @return A list of plugin jar filenames * @throws AssertionError if any plugin failed to load, or no plugins were loaded. */ - public static List<Path> pluginPath() { + public static Set<Path> pluginPath() { return pluginPath(defaultPlugins()); } @@ -262,14 +263,14 @@ public class TestPlugins { * @return A list of plugin jar filenames containing the specified test plugins * @throws AssertionError if any plugin failed to load, or no plugins were loaded. */ - public static List<Path> pluginPath(TestPlugin... plugins) { + public static Set<Path> pluginPath(TestPlugin... plugins) { assertAvailable(); return Arrays.stream(plugins) .filter(Objects::nonNull) .map(TestPlugin::resourceDir) .distinct() .map(PLUGIN_JARS::get) - .collect(Collectors.toList()); + .collect(Collectors.toSet()); } public static String pluginPathJoined(TestPlugin... plugins) { diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java index 57149ca498d..9e5dc55743a 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java @@ -24,6 +24,7 @@ import org.apache.kafka.common.config.ConfigDef.Recommender; import org.apache.kafka.common.config.ConfigDef.Type; import org.apache.kafka.common.config.ConfigDef.Width; import org.apache.kafka.common.config.ConfigValue; +import org.apache.kafka.common.utils.AppInfoParser; import org.apache.kafka.connect.connector.Connector; import org.apache.kafka.connect.connector.Task; import org.apache.kafka.connect.converters.LongConverter; @@ -33,11 +34,9 @@ import org.apache.kafka.connect.runtime.Herder; import org.apache.kafka.connect.runtime.SampleSinkConnector; import org.apache.kafka.connect.runtime.SampleSourceConnector; import org.apache.kafka.connect.runtime.distributed.DistributedHerder; -import org.apache.kafka.connect.runtime.isolation.PluginClassLoader; import org.apache.kafka.connect.runtime.isolation.PluginDesc; import org.apache.kafka.connect.runtime.isolation.PluginType; import org.apache.kafka.connect.runtime.isolation.Plugins; -import org.apache.kafka.connect.runtime.isolation.ReflectionScanner; import org.apache.kafka.connect.runtime.rest.entities.ConfigInfo; import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos; import org.apache.kafka.connect.runtime.rest.entities.ConfigKeyInfo; @@ -61,7 +60,6 @@ import org.junit.Test; import org.mockito.ArgumentCaptor; import javax.ws.rs.BadRequestException; -import java.net.URL; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -109,35 +107,38 @@ public class ConnectorPluginsResourceTest { private static final ConfigInfos PARTIAL_CONFIG_INFOS; private static final int ERROR_COUNT = 0; private static final int PARTIAL_CONFIG_ERROR_COUNT = 1; - private static final Set<MockConnectorPluginDesc<?>> SINK_CONNECTOR_PLUGINS = new TreeSet<>(); - private static final Set<MockConnectorPluginDesc<?>> SOURCE_CONNECTOR_PLUGINS = new TreeSet<>(); - private static final Set<MockConnectorPluginDesc<?>> CONVERTER_PLUGINS = new TreeSet<>(); - private static final Set<MockConnectorPluginDesc<?>> HEADER_CONVERTER_PLUGINS = new TreeSet<>(); - private static final Set<MockConnectorPluginDesc<?>> TRANSFORMATION_PLUGINS = new TreeSet<>(); - private static final Set<MockConnectorPluginDesc<?>> PREDICATE_PLUGINS = new TreeSet<>(); + private static final Set<PluginDesc<?>> SINK_CONNECTOR_PLUGINS = new TreeSet<>(); + private static final Set<PluginDesc<?>> SOURCE_CONNECTOR_PLUGINS = new TreeSet<>(); + private static final Set<PluginDesc<?>> CONVERTER_PLUGINS = new TreeSet<>(); + private static final Set<PluginDesc<?>> HEADER_CONVERTER_PLUGINS = new TreeSet<>(); + private static final Set<PluginDesc<?>> TRANSFORMATION_PLUGINS = new TreeSet<>(); + private static final Set<PluginDesc<?>> PREDICATE_PLUGINS = new TreeSet<>(); static { try { - SINK_CONNECTOR_PLUGINS.add(new MockConnectorPluginDesc<>(VerifiableSinkConnector.class)); - SINK_CONNECTOR_PLUGINS.add(new MockConnectorPluginDesc<>(MockSinkConnector.class)); + ClassLoader classLoader = ConnectorPluginsResourceTest.class.getClassLoader(); + String appVersion = AppInfoParser.getVersion(); + SINK_CONNECTOR_PLUGINS.add(new PluginDesc<>(VerifiableSinkConnector.class, appVersion, PluginType.SINK, classLoader)); + SINK_CONNECTOR_PLUGINS.add(new PluginDesc<>(MockSinkConnector.class, appVersion, PluginType.SINK, classLoader)); - SOURCE_CONNECTOR_PLUGINS.add(new MockConnectorPluginDesc<>(VerifiableSourceConnector.class)); - SOURCE_CONNECTOR_PLUGINS.add(new MockConnectorPluginDesc<>(MockSourceConnector.class)); - SOURCE_CONNECTOR_PLUGINS.add(new MockConnectorPluginDesc<>(SchemaSourceConnector.class)); - SOURCE_CONNECTOR_PLUGINS.add(new MockConnectorPluginDesc<>(ConnectorPluginsResourceTestConnector.class)); + SOURCE_CONNECTOR_PLUGINS.add(new PluginDesc<>(VerifiableSourceConnector.class, appVersion, PluginType.SOURCE, classLoader)); + SOURCE_CONNECTOR_PLUGINS.add(new PluginDesc<>(MockSourceConnector.class, appVersion, PluginType.SOURCE, classLoader)); + SOURCE_CONNECTOR_PLUGINS.add(new PluginDesc<>(SchemaSourceConnector.class, appVersion, PluginType.SOURCE, classLoader)); + SOURCE_CONNECTOR_PLUGINS.add(new PluginDesc<>(ConnectorPluginsResourceTestConnector.class, appVersion, PluginType.SOURCE, classLoader)); - CONVERTER_PLUGINS.add(new MockConnectorPluginDesc<>(StringConverter.class)); - CONVERTER_PLUGINS.add(new MockConnectorPluginDesc<>(LongConverter.class)); + CONVERTER_PLUGINS.add(new PluginDesc<>(StringConverter.class, PluginDesc.UNDEFINED_VERSION, PluginType.CONVERTER, classLoader)); + CONVERTER_PLUGINS.add(new PluginDesc<>(LongConverter.class, PluginDesc.UNDEFINED_VERSION, PluginType.CONVERTER, classLoader)); - HEADER_CONVERTER_PLUGINS.add(new MockConnectorPluginDesc<>(StringConverter.class)); - HEADER_CONVERTER_PLUGINS.add(new MockConnectorPluginDesc<>(LongConverter.class)); + HEADER_CONVERTER_PLUGINS.add(new PluginDesc<>(StringConverter.class, PluginDesc.UNDEFINED_VERSION, PluginType.HEADER_CONVERTER, classLoader)); + HEADER_CONVERTER_PLUGINS.add(new PluginDesc<>(LongConverter.class, PluginDesc.UNDEFINED_VERSION, PluginType.HEADER_CONVERTER, classLoader)); - TRANSFORMATION_PLUGINS.add(new MockConnectorPluginDesc<>(RegexRouter.class)); - TRANSFORMATION_PLUGINS.add(new MockConnectorPluginDesc<>(TimestampConverter.Key.class)); + TRANSFORMATION_PLUGINS.add(new PluginDesc<>(RegexRouter.class, PluginDesc.UNDEFINED_VERSION, PluginType.TRANSFORMATION, classLoader)); + TRANSFORMATION_PLUGINS.add(new PluginDesc<>(TimestampConverter.Key.class, PluginDesc.UNDEFINED_VERSION, PluginType.TRANSFORMATION, classLoader)); - PREDICATE_PLUGINS.add(new MockConnectorPluginDesc<>(HasHeaderKey.class)); - PREDICATE_PLUGINS.add(new MockConnectorPluginDesc<>(RecordIsTombstone.class)); + PREDICATE_PLUGINS.add(new PluginDesc<>(HasHeaderKey.class, PluginDesc.UNDEFINED_VERSION, PluginType.PREDICATE, classLoader)); + PREDICATE_PLUGINS.add(new PluginDesc<>(RecordIsTombstone.class, PluginDesc.UNDEFINED_VERSION, PluginType.PREDICATE, classLoader)); } catch (Exception e) { + e.printStackTrace(); fail("Failed setting up plugins"); } } @@ -344,7 +345,7 @@ public class ConnectorPluginsResourceTest { Set<PluginInfo> expectedConnectorPlugins = Stream.of(SINK_CONNECTOR_PLUGINS, SOURCE_CONNECTOR_PLUGINS) .flatMap(Collection::stream) .filter(p -> !excludes.contains(p.pluginClass())) - .map(ConnectorPluginsResourceTest::newInfo) + .map(PluginInfo::new) .collect(Collectors.toSet()); Set<PluginInfo> actualConnectorPlugins = new HashSet<>(connectorPluginsResource.listConnectorPlugins(true)); assertEquals(expectedConnectorPlugins, actualConnectorPlugins); @@ -353,8 +354,9 @@ public class ConnectorPluginsResourceTest { @Test public void testConnectorPluginsIncludesClassTypeAndVersionInformation() throws Exception { - PluginInfo sinkInfo = newInfo(SampleSinkConnector.class); - PluginInfo sourceInfo = newInfo(SampleSourceConnector.class); + ClassLoader classLoader = ConnectorPluginsResourceTest.class.getClassLoader(); + PluginInfo sinkInfo = new PluginInfo(new PluginDesc<>(SampleSinkConnector.class, SampleSinkConnector.VERSION, PluginType.SINK, classLoader)); + PluginInfo sourceInfo = new PluginInfo(new PluginDesc<>(SampleSourceConnector.class, SampleSourceConnector.VERSION, PluginType.SOURCE, classLoader)); assertEquals(PluginType.SINK.toString(), sinkInfo.type()); assertEquals(PluginType.SOURCE.toString(), sourceInfo.type()); assertEquals(SampleSinkConnector.VERSION, sinkInfo.version()); @@ -399,7 +401,7 @@ public class ConnectorPluginsResourceTest { PREDICATE_PLUGINS ).flatMap(Collection::stream) .filter(p -> !excludes.contains(p.pluginClass())) - .map(ConnectorPluginsResourceTest::newInfo) + .map(PluginInfo::new) .collect(Collectors.toSet()); Set<PluginInfo> actualConnectorPlugins = new HashSet<>(connectorPluginsResource.listConnectorPlugins(false)); assertEquals(expectedConnectorPlugins, actualConnectorPlugins); @@ -424,41 +426,6 @@ public class ConnectorPluginsResourceTest { } } - protected static PluginInfo newInfo(PluginDesc<?> pluginDesc) { - return new PluginInfo(new MockConnectorPluginDesc<>(pluginDesc.pluginClass(), pluginDesc.version())); - } - - protected static PluginInfo newInfo(Class<?> klass) - throws Exception { - return new PluginInfo(new MockConnectorPluginDesc<>(klass)); - } - - public static class MockPluginClassLoader extends PluginClassLoader { - - public MockPluginClassLoader(URL pluginLocation, URL[] urls) { - super(pluginLocation, urls); - } - - @Override - public String location() { - return "/tmp/mockpath"; - } - } - - public static class MockConnectorPluginDesc<T> extends PluginDesc<T> { - public MockConnectorPluginDesc(Class<T> klass, String version) { - super(klass, version, new MockPluginClassLoader(null, new URL[0])); - } - - public MockConnectorPluginDesc(Class<T> klass) throws Exception { - super( - klass, - ReflectionScanner.versionFor(klass), - new MockPluginClassLoader(null, new URL[0]) - ); - } - } - /* Name here needs to be unique as we are testing the aliasing mechanism */ public static class ConnectorPluginsResourceTestConnector extends SourceConnector { @@ -476,7 +443,7 @@ public class ConnectorPluginsResourceTest { @Override public String version() { - return "1.0"; + return AppInfoParser.getVersion(); } @Override