This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.3 by this push:
     new 04cf8fb  KAFKA-8550: Fix plugin loading of aliased converters in 
Connect (#6959)
04cf8fb is described below

commit 04cf8fb2c116812abaa1b348d5e30536b4a1061c
Author: Chris Egerton <chr...@confluent.io>
AuthorDate: Sun Aug 11 07:56:05 2019 -0700

    KAFKA-8550: Fix plugin loading of aliased converters in Connect (#6959)
    
    Connector validation fails if an alias is used for the converter since the 
validation for that is done via `ConfigDef.validateAll(...)`, which in turn 
invokes `Class.forName(...)` on the alias. Even though the class is 
successfully loaded by the DelegatingClassLoader, some Java implementations 
will refuse to return a class from `Class.forName(...)` whose name differs from 
the argument provided.
    
    This commit alters `ConfigDef.parseType(...)` to first invoke 
`ClassLoader.loadClass(...)` on the class using our class loader in order to 
get a handle on the actual class object to be loaded, then invoke 
`Class.forName(...)` with the fully-qualified class name of the to-be-loaded 
class and return the result. The invocation of `Class.forName(...)` is 
necessary in order to allow static initialization to take place; simply calling 
`ClassLoader.loadClass(...)` is insufficient.
    
    Also corrected a unit test that relied upon the old behavior.
    
    Author: Chris Egerton <chr...@confluent.io>
    Reviewers: Robert Yokota <rayok...@gmail.com>, Randall Hauch 
<rha...@gmail.com>
---
 .../org/apache/kafka/common/config/ConfigDef.java  | 13 +++++++++---
 .../kafka/common/config/AbstractConfigTest.java    |  2 +-
 .../apache/kafka/common/config/ConfigDefTest.java  | 23 ++++++++++++++++++++++
 3 files changed, 34 insertions(+), 4 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java 
b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
index 362b1a6..fa99931 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
@@ -707,9 +707,16 @@ public class ConfigDef {
                 case CLASS:
                     if (value instanceof Class)
                         return value;
-                    else if (value instanceof String)
-                        return Class.forName(trimmed, true, 
Utils.getContextOrKafkaClassLoader());
-                    else
+                    else if (value instanceof String) {
+                        ClassLoader contextOrKafkaClassLoader = 
Utils.getContextOrKafkaClassLoader();
+                        // Use loadClass here instead of Class.forName because 
the name we use here may be an alias
+                        // and not match the name of the class that gets 
loaded. If that happens, Class.forName can
+                        // throw an exception.
+                        Class<?> klass = 
contextOrKafkaClassLoader.loadClass(trimmed);
+                        // Invoke forName here with the true name of the 
requested class to cause class
+                        // initialization to take place.
+                        return Class.forName(klass.getName(), true, 
contextOrKafkaClassLoader);
+                    } else
                         throw new ConfigException(name, value, "Expected a 
Class instance or class name.");
                 default:
                     throw new IllegalStateException("Unknown type.");
diff --git 
a/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java 
b/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
index f686475..b5fff6f 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
@@ -265,7 +265,7 @@ public class AbstractConfigTest {
             @Override
             protected Class<?> findClass(String name) throws 
ClassNotFoundException {
                 if (name.equals(ClassTestConfig.DEFAULT_CLASS.getName()) || 
name.equals(ClassTestConfig.RESTRICTED_CLASS.getName()))
-                    return null;
+                    throw new ClassNotFoundException();
                 else
                     return 
ClassTestConfig.class.getClassLoader().loadClass(name);
             }
diff --git 
a/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java 
b/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java
index 974d39f..701e9f4 100644
--- a/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java
@@ -639,6 +639,29 @@ public class ConfigDefTest {
         assertEquals(NestedClass.class, Class.forName(actual));
     }
 
+    @Test
+    public void testClassWithAlias() {
+        final String alias = "PluginAlias";
+        ClassLoader originalClassLoader = 
Thread.currentThread().getContextClassLoader();
+        try {
+            // Could try to use the Plugins class from Connect here, but this 
should simulate enough
+            // of the aliasing logic to suffice for this test.
+            Thread.currentThread().setContextClassLoader(new 
ClassLoader(originalClassLoader) {
+                @Override
+                public Class loadClass(String name, boolean resolve) throws 
ClassNotFoundException {
+                    if (alias.equals(name)) {
+                        return NestedClass.class;
+                    } else {
+                        return super.loadClass(name, resolve);
+                    }
+                }
+            });
+            ConfigDef.parseType("Test config", alias, Type.CLASS);
+        } finally {
+            Thread.currentThread().setContextClassLoader(originalClassLoader);
+        }
+    }
+
     private class NestedClass {
     }
 }

Reply via email to