This is an automated email from the ASF dual-hosted git repository.

gharris pushed a commit to branch 3.8
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.8 by this push:
     new bd29da9c431 KAFKA-17150: Use Utils.loadClass instead of Class.forName 
to resolve aliases correctly (#16608)
bd29da9c431 is described below

commit bd29da9c43153f7b46e8c71f3cdd26a87ddde9f1
Author: Greg Harris <[email protected]>
AuthorDate: Wed Jul 17 16:00:45 2024 -0700

    KAFKA-17150: Use Utils.loadClass instead of Class.forName to resolve 
aliases correctly (#16608)
    
    Signed-off-by: Greg Harris <[email protected]>
    Reviewers: Chris Egerton <[email protected]>, Chia-Ping Tsai 
<[email protected]>, Josep Prat <[email protected]>
---
 .../clients/consumer/ConsumerPartitionAssignor.java  |  2 +-
 .../org/apache/kafka/common/config/ConfigDef.java    |  9 +--------
 .../java/org/apache/kafka/common/utils/Utils.java    | 11 +++++++++--
 .../kafka/connect/runtime/isolation/PluginsTest.java | 20 ++++++++++++++++++++
 .../runtime/isolation/SynchronizationTest.java       |  4 +++-
 core/src/main/scala/kafka/utils/CoreUtils.scala      |  2 +-
 6 files changed, 35 insertions(+), 13 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java
index ed1871a3d8b..06bd17f4ebd 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java
@@ -297,7 +297,7 @@ public interface ConsumerPartitionAssignor {
             // first try to get the class if passed in as a string
             if (klass instanceof String) {
                 try {
-                    klass = Class.forName((String) klass, true, 
Utils.getContextOrKafkaClassLoader());
+                    klass = Utils.loadClass((String) klass, Object.class);
                 } catch (ClassNotFoundException classNotFound) {
                     throw new KafkaException(klass + " ClassNotFoundException 
exception occurred", classNotFound);
                 }
diff --git 
a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java 
b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
index d82d06fa162..fd40c031ee7 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
@@ -766,14 +766,7 @@ public class ConfigDef {
                     if (value instanceof Class)
                         return value;
                     else if (value instanceof String) {
-                        ClassLoader contextOrKafkaClassLoader = 
Utils.getContextOrKafkaClassLoader();
-                        // Use loadClass here instead of Class.forName because 
the name we use here may be an alias
-                        // and not match the name of the class that gets 
loaded. If that happens, Class.forName can
-                        // throw an exception.
-                        Class<?> klass = 
contextOrKafkaClassLoader.loadClass(trimmed);
-                        // Invoke forName here with the true name of the 
requested class to cause class
-                        // initialization to take place.
-                        return Class.forName(klass.getName(), true, 
contextOrKafkaClassLoader);
+                        return Utils.loadClass(trimmed, Object.class);
                     } else
                         throw new ConfigException(name, value, "Expected a 
Class instance or class name.");
                 default:
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java 
b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index ab9368f0558..8f73c8822ad 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -423,7 +423,14 @@ public final class Utils {
      * @return the new class
      */
     public static <T> Class<? extends T> loadClass(String klass, Class<T> 
base) throws ClassNotFoundException {
-        return Class.forName(klass, true, 
Utils.getContextOrKafkaClassLoader()).asSubclass(base);
+        ClassLoader contextOrKafkaClassLoader = 
Utils.getContextOrKafkaClassLoader();
+        // Use loadClass here instead of Class.forName because the name we use 
here may be an alias
+        // and not match the name of the class that gets loaded. If that 
happens, Class.forName can
+        // throw an exception.
+        Class<?> loadedClass = contextOrKafkaClassLoader.loadClass(klass);
+        // Invoke forName here with the true name of the requested class to 
cause class
+        // initialization to take place.
+        return Class.forName(loadedClass.getName(), true, 
contextOrKafkaClassLoader).asSubclass(base);
     }
 
     /**
@@ -452,7 +459,7 @@ public final class Utils {
         Class<?>[] argTypes = new Class<?>[params.length / 2];
         Object[] args = new Object[params.length / 2];
         try {
-            Class<?> c = Class.forName(className, true, 
Utils.getContextOrKafkaClassLoader());
+            Class<?> c = Utils.loadClass(className, Object.class);
             for (int i = 0; i < params.length / 2; i++) {
                 argTypes[i] = (Class<?>) params[2 * i];
                 args[i] = params[(2 * i) + 1];
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
index 8a637beee5e..a3d08ad036e 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
@@ -32,6 +32,7 @@ import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.config.provider.ConfigProvider;
 import org.apache.kafka.common.utils.LogCaptureAppender;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.components.Versioned;
 import 
org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy;
 import 
org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
@@ -596,6 +597,25 @@ public class PluginsTest {
         }
     }
 
+    @Test
+    public void testAliasesInConverters() throws ClassNotFoundException {
+        ClassLoader connectorLoader = 
plugins.connectorLoader(TestPlugin.SAMPLING_CONNECTOR.className());
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) 
{
+            String configKey = "config.key";
+            String alias = "SamplingConverter";
+            
assertTrue(TestPlugin.SAMPLING_CONVERTER.className().contains(alias));
+            ConfigDef def = new ConfigDef().define(configKey, 
ConfigDef.Type.CLASS, ConfigDef.Importance.HIGH, "docstring");
+            AbstractConfig config = new AbstractConfig(def, 
Collections.singletonMap(configKey, alias));
+
+            assertNotNull(config.getClass(configKey));
+            assertNotNull(config.getConfiguredInstance(configKey, 
Converter.class));
+            assertNotNull(plugins.newConverter(config, configKey, 
ClassLoaderUsage.CURRENT_CLASSLOADER));
+            assertNotNull(plugins.newConverter(config, configKey, 
ClassLoaderUsage.PLUGINS));
+
+            assertNotNull(Utils.newInstance(alias, Converter.class));
+        }
+    }
+
     private void assertClassLoaderReadsVersionFromResource(
             TestPlugin parentResource, TestPlugin childResource, String 
className, String... expectedVersions) {
         URL[] systemPath = TestPlugins.pluginPath(parentResource)
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/SynchronizationTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/SynchronizationTest.java
index e8ba9153bc0..beb1921401d 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/SynchronizationTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/SynchronizationTest.java
@@ -44,7 +44,9 @@ import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigDef.Importance;
 import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.storage.Converter;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
@@ -328,7 +330,7 @@ public class SynchronizationTest {
             synchronized (externalTestLock) {
                 try {
                     progress.await(null);
-                    
Class.forName(TestPlugins.TestPlugin.SAMPLING_CONVERTER.className(), true, 
connectorLoader);
+                    
Utils.loadClass(TestPlugins.TestPlugin.SAMPLING_CONVERTER.className(), 
Converter.class);
                 } catch (ClassNotFoundException e) {
                     throw new RuntimeException("Failed to load test plugin", 
e);
                 }
diff --git a/core/src/main/scala/kafka/utils/CoreUtils.scala 
b/core/src/main/scala/kafka/utils/CoreUtils.scala
index 8da7a4e7cc1..8403f5245e3 100755
--- a/core/src/main/scala/kafka/utils/CoreUtils.scala
+++ b/core/src/main/scala/kafka/utils/CoreUtils.scala
@@ -114,7 +114,7 @@ object CoreUtils {
    * Create an instance of the class with the given class name
    */
   def createObject[T <: AnyRef](className: String, args: AnyRef*): T = {
-    val klass = Class.forName(className, true, 
Utils.getContextOrKafkaClassLoader).asInstanceOf[Class[T]]
+    val klass = Utils.loadClass(className, 
classOf[Object]).asInstanceOf[Class[T]]
     val constructor = klass.getConstructor(args.map(_.getClass): _*)
     constructor.newInstance(args: _*)
   }

Reply via email to