This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git

The following commit(s) were added to refs/heads/2.3 by this push:
     new 04cf8fb  KAFKA-8550: Fix plugin loading of aliased converters in Connect (#6959)
04cf8fb is described below

commit 04cf8fb2c116812abaa1b348d5e30536b4a1061c
Author: Chris Egerton <chr...@confluent.io>
AuthorDate: Sun Aug 11 07:56:05 2019 -0700

    KAFKA-8550: Fix plugin loading of aliased converters in Connect (#6959)

    Connector validation fails if an alias is used for the converter since the validation for that is done via `ConfigDef.validateAll(...)`, which in turn invokes `Class.forName(...)` on the alias. Even though the class is successfully loaded by the DelegatingClassLoader, some Java implementations will refuse to return a class from `Class.forName(...)` whose name differs from the argument provided.

    This commit alters `ConfigDef.parseType(...)` to first invoke `ClassLoader.loadClass(...)` on the class using our class loader in order to get a handle on the actual class object to be loaded, then invoke `Class.forName(...)` with the fully-qualified class name of the to-be-loaded class and return the result. The invocation of `Class.forName(...)` is necessary in order to allow static initialization to take place; simply calling `ClassLoader.loadClass(...)` is insufficient.

    Also corrected a unit test that relied upon the old behavior.
Author: Chris Egerton <chr...@confluent.io> Reviewers: Robert Yokota <rayok...@gmail.com>, Randall Hauch <rha...@gmail.com> --- .../org/apache/kafka/common/config/ConfigDef.java | 13 +++++++++--- .../kafka/common/config/AbstractConfigTest.java | 2 +- .../apache/kafka/common/config/ConfigDefTest.java | 23 ++++++++++++++++++++++ 3 files changed, 34 insertions(+), 4 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java index 362b1a6..fa99931 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java +++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java @@ -707,9 +707,16 @@ public class ConfigDef { case CLASS: if (value instanceof Class) return value; - else if (value instanceof String) - return Class.forName(trimmed, true, Utils.getContextOrKafkaClassLoader()); - else + else if (value instanceof String) { + ClassLoader contextOrKafkaClassLoader = Utils.getContextOrKafkaClassLoader(); + // Use loadClass here instead of Class.forName because the name we use here may be an alias + // and not match the name of the class that gets loaded. If that happens, Class.forName can + // throw an exception. + Class<?> klass = contextOrKafkaClassLoader.loadClass(trimmed); + // Invoke forName here with the true name of the requested class to cause class + // initialization to take place. 
+ return Class.forName(klass.getName(), true, contextOrKafkaClassLoader); + } else throw new ConfigException(name, value, "Expected a Class instance or class name."); default: throw new IllegalStateException("Unknown type."); diff --git a/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java b/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java index f686475..b5fff6f 100644 --- a/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java +++ b/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java @@ -265,7 +265,7 @@ public class AbstractConfigTest { @Override protected Class<?> findClass(String name) throws ClassNotFoundException { if (name.equals(ClassTestConfig.DEFAULT_CLASS.getName()) || name.equals(ClassTestConfig.RESTRICTED_CLASS.getName())) - return null; + throw new ClassNotFoundException(); else return ClassTestConfig.class.getClassLoader().loadClass(name); } diff --git a/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java b/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java index 974d39f..701e9f4 100644 --- a/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java +++ b/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java @@ -639,6 +639,29 @@ public class ConfigDefTest { assertEquals(NestedClass.class, Class.forName(actual)); } + @Test + public void testClassWithAlias() { + final String alias = "PluginAlias"; + ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader(); + try { + // Could try to use the Plugins class from Connect here, but this should simulate enough + // of the aliasing logic to suffice for this test. 
+ Thread.currentThread().setContextClassLoader(new ClassLoader(originalClassLoader) { + @Override + public Class loadClass(String name, boolean resolve) throws ClassNotFoundException { + if (alias.equals(name)) { + return NestedClass.class; + } else { + return super.loadClass(name, resolve); + } + } + }); + ConfigDef.parseType("Test config", alias, Type.CLASS); + } finally { + Thread.currentThread().setContextClassLoader(originalClassLoader); + } + } + private class NestedClass { } }