This is an automated email from the ASF dual-hosted git repository.
gharris pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new c97421c1007 KAFKA-17150: Use Utils.loadClass instead of Class.forName
to resolve aliases correctly (#16608)
c97421c1007 is described below
commit c97421c100715a3ad70024b8ef37a0b433292c5d
Author: Greg Harris <[email protected]>
AuthorDate: Wed Jul 17 16:00:45 2024 -0700
KAFKA-17150: Use Utils.loadClass instead of Class.forName to resolve
aliases correctly (#16608)
Signed-off-by: Greg Harris <[email protected]>
Reviewers: Chris Egerton <[email protected]>, Chia-Ping Tsai
<[email protected]>, Josep Prat <[email protected]>
---
.../clients/consumer/ConsumerPartitionAssignor.java | 2 +-
.../org/apache/kafka/common/config/ConfigDef.java | 9 +--------
.../java/org/apache/kafka/common/utils/Utils.java | 11 +++++++++--
.../kafka/connect/runtime/isolation/PluginsTest.java | 20 ++++++++++++++++++++
.../runtime/isolation/SynchronizationTest.java | 4 +++-
core/src/main/scala/kafka/utils/CoreUtils.scala | 2 +-
6 files changed, 35 insertions(+), 13 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java
index 5c5e843d32e..20f2551ba6b 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java
@@ -298,7 +298,7 @@ public interface ConsumerPartitionAssignor {
// first try to get the class if passed in as a string
if (klass instanceof String) {
try {
- klass = Class.forName((String) klass, true,
Utils.getContextOrKafkaClassLoader());
+ klass = Utils.loadClass((String) klass, Object.class);
} catch (ClassNotFoundException classNotFound) {
throw new KafkaException(klass + " ClassNotFoundException
exception occurred", classNotFound);
}
diff --git
a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
index d82d06fa162..fd40c031ee7 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
@@ -766,14 +766,7 @@ public class ConfigDef {
if (value instanceof Class)
return value;
else if (value instanceof String) {
- ClassLoader contextOrKafkaClassLoader =
Utils.getContextOrKafkaClassLoader();
- // Use loadClass here instead of Class.forName because
the name we use here may be an alias
- // and not match the name of the class that gets
loaded. If that happens, Class.forName can
- // throw an exception.
- Class<?> klass =
contextOrKafkaClassLoader.loadClass(trimmed);
- // Invoke forName here with the true name of the
requested class to cause class
- // initialization to take place.
- return Class.forName(klass.getName(), true,
contextOrKafkaClassLoader);
+ return Utils.loadClass(trimmed, Object.class);
} else
throw new ConfigException(name, value, "Expected a
Class instance or class name.");
default:
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index 4485b1bd66d..f2961a8f282 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -425,7 +425,14 @@ public final class Utils {
* @return the new class
*/
public static <T> Class<? extends T> loadClass(String klass, Class<T>
base) throws ClassNotFoundException {
- return Class.forName(klass, true,
Utils.getContextOrKafkaClassLoader()).asSubclass(base);
+ ClassLoader contextOrKafkaClassLoader =
Utils.getContextOrKafkaClassLoader();
+ // Use loadClass here instead of Class.forName because the name we use
here may be an alias
+ // and not match the name of the class that gets loaded. If that
happens, Class.forName can
+ // throw an exception.
+ Class<?> loadedClass = contextOrKafkaClassLoader.loadClass(klass);
+ // Invoke forName here with the true name of the requested class to
cause class
+ // initialization to take place.
+ return Class.forName(loadedClass.getName(), true,
contextOrKafkaClassLoader).asSubclass(base);
}
/**
@@ -454,7 +461,7 @@ public final class Utils {
Class<?>[] argTypes = new Class<?>[params.length / 2];
Object[] args = new Object[params.length / 2];
try {
- Class<?> c = Class.forName(className, true,
Utils.getContextOrKafkaClassLoader());
+ Class<?> c = Utils.loadClass(className, Object.class);
for (int i = 0; i < params.length / 2; i++) {
argTypes[i] = (Class<?>) params[2 * i];
args[i] = params[(2 * i) + 1];
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
index 3d6230bc631..d1c723852bc 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.provider.ConfigProvider;
import org.apache.kafka.common.utils.LogCaptureAppender;
+import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.Connector;
import
org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy;
@@ -596,6 +597,25 @@ public class PluginsTest {
}
}
+ @Test
+ public void testAliasesInConverters() throws ClassNotFoundException {
+ ClassLoader connectorLoader =
plugins.connectorLoader(TestPlugin.SAMPLING_CONNECTOR.className());
+ try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader))
{
+ String configKey = "config.key";
+ String alias = "SamplingConverter";
+
assertTrue(TestPlugin.SAMPLING_CONVERTER.className().contains(alias));
+ ConfigDef def = new ConfigDef().define(configKey,
ConfigDef.Type.CLASS, ConfigDef.Importance.HIGH, "docstring");
+ AbstractConfig config = new AbstractConfig(def,
Collections.singletonMap(configKey, alias));
+
+ assertNotNull(config.getClass(configKey));
+ assertNotNull(config.getConfiguredInstance(configKey,
Converter.class));
+ assertNotNull(plugins.newConverter(config, configKey,
ClassLoaderUsage.CURRENT_CLASSLOADER));
+ assertNotNull(plugins.newConverter(config, configKey,
ClassLoaderUsage.PLUGINS));
+
+ assertNotNull(Utils.newInstance(alias, Converter.class));
+ }
+ }
+
private void assertClassLoaderReadsVersionFromResource(
TestPlugin parentResource, TestPlugin childResource, String
className, String... expectedVersions) {
URL[] systemPath = TestPlugins.pluginPath(parentResource)
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/SynchronizationTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/SynchronizationTest.java
index a262f2036ee..6d8d7f71768 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/SynchronizationTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/SynchronizationTest.java
@@ -22,7 +22,9 @@ import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.storage.Converter;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@@ -328,7 +330,7 @@ public class SynchronizationTest {
synchronized (externalTestLock) {
try {
progress.await(null);
-
Class.forName(TestPlugins.TestPlugin.SAMPLING_CONVERTER.className(), true,
connectorLoader);
+
Utils.loadClass(TestPlugins.TestPlugin.SAMPLING_CONVERTER.className(),
Converter.class);
} catch (ClassNotFoundException e) {
throw new RuntimeException("Failed to load test plugin",
e);
}
diff --git a/core/src/main/scala/kafka/utils/CoreUtils.scala
b/core/src/main/scala/kafka/utils/CoreUtils.scala
index 8da7a4e7cc1..8403f5245e3 100755
--- a/core/src/main/scala/kafka/utils/CoreUtils.scala
+++ b/core/src/main/scala/kafka/utils/CoreUtils.scala
@@ -114,7 +114,7 @@ object CoreUtils {
* Create an instance of the class with the given class name
*/
def createObject[T <: AnyRef](className: String, args: AnyRef*): T = {
- val klass = Class.forName(className, true,
Utils.getContextOrKafkaClassLoader).asInstanceOf[Class[T]]
+ val klass = Utils.loadClass(className,
classOf[Object]).asInstanceOf[Class[T]]
val constructor = klass.getConstructor(args.map(_.getClass): _*)
constructor.newInstance(args: _*)
}