Repository: kafka
Updated Branches:
  refs/heads/trunk a1e0b2240 -> 313aad423


KAFKA-3680; Enable Kafka clients to run in any classloader env

Configure default classes using class objects instead of class names, enable 
configurable lists of classes to be specified as class objects, add tests for 
different classloader configurations.

Author: Rajini Sivaram <[email protected]>

Reviewers: Sriharsha Chintalapani <[email protected]>, Ismael Juma <[email protected]>

Closes #1421 from rajinisivaram/KAFKA-3680


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/313aad42
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/313aad42
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/313aad42

Branch: refs/heads/trunk
Commit: 313aad423cb19618b8693af6f04b68138059c585
Parents: a1e0b22
Author: Rajini Sivaram <[email protected]>
Authored: Tue Aug 23 10:31:29 2016 +0100
Committer: Ismael Juma <[email protected]>
Committed: Tue Aug 23 10:31:29 2016 +0100

----------------------------------------------------------------------
 .../kafka/clients/consumer/ConsumerConfig.java  |   3 +-
 .../kafka/clients/producer/ProducerConfig.java  |   2 +-
 .../kafka/common/config/AbstractConfig.java     |  17 ++-
 .../kafka/common/config/AbstractConfigTest.java | 125 ++++++++++++++++++-
 4 files changed, 133 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/313aad42/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index 509c3a1..4ce908e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -19,6 +19,7 @@ import org.apache.kafka.common.config.ConfigDef.Importance;
 import org.apache.kafka.common.config.ConfigDef.Type;
 import org.apache.kafka.common.serialization.Deserializer;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
@@ -215,7 +216,7 @@ public class ConsumerConfig extends AbstractConfig {
                                         HEARTBEAT_INTERVAL_MS_DOC)
                                 .define(PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                                         Type.LIST,
-                                        RangeAssignor.class.getName(),
+                                        Collections.singletonList(RangeAssignor.class),
                                         Importance.MEDIUM,
                                         PARTITION_ASSIGNMENT_STRATEGY_DOC)
                                 .define(METADATA_MAX_AGE_CONFIG,

http://git-wip-us.apache.org/repos/asf/kafka/blob/313aad42/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index dbbde06..2ca2183 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -293,7 +293,7 @@ public class ProducerConfig extends AbstractConfig {
                                         CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_DOC)
                                 .define(PARTITIONER_CLASS_CONFIG,
                                         Type.CLASS,
-                                        DefaultPartitioner.class.getName(),
+                                        DefaultPartitioner.class,
                                         Importance.MEDIUM, PARTITIONER_CLASS_DOC)
                                 .define(INTERCEPTOR_CLASSES_CONFIG,
                                         Type.LIST,

http://git-wip-us.apache.org/repos/asf/kafka/blob/313aad42/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
index 9f3cba4..096047f 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
@@ -221,13 +221,18 @@ public class AbstractConfig {
         List<T> objects = new ArrayList<T>();
         if (klasses == null)
             return objects;
-        for (String klass : klasses) {
+        for (Object klass : klasses) {
             Object o;
-            try {
-                o = Utils.newInstance(klass, t);
-            } catch (ClassNotFoundException e) {
-                throw new KafkaException(klass + " ClassNotFoundException exception occured", e);
-            }
+            if (klass instanceof String) {
+                try {
+                    o = Utils.newInstance((String) klass, t);
+                } catch (ClassNotFoundException e) {
+                    throw new KafkaException(klass + " ClassNotFoundException exception occured", e);
+                }
+            } else if (klass instanceof Class<?>) {
+                o = Utils.newInstance((Class<?>) klass);
+            } else
+                throw new KafkaException("List contains element of type " + klass.getClass() + ", expected String or Class");
             if (!t.isInstance(o))
                 throw new KafkaException(klass + " is not an instance of " + t.getName());
             if (o instanceof Configurable)

http://git-wip-us.apache.org/repos/asf/kafka/blob/313aad42/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java b/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
index d9404c2..d483ef0 100644
--- a/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
@@ -16,10 +16,13 @@ import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.config.ConfigDef.Importance;
 import org.apache.kafka.common.config.ConfigDef.Type;
 import org.apache.kafka.common.metrics.FakeMetricsReporter;
+import org.apache.kafka.common.metrics.JmxReporter;
 import org.apache.kafka.common.metrics.MetricsReporter;
 import org.junit.Test;
 
+import java.util.Arrays;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
@@ -96,6 +99,120 @@ public class AbstractConfigTest {
         }
     }
 
+    @Test
+    public void testClassConfigs() {
+        class RestrictedClassLoader extends ClassLoader {
+            public RestrictedClassLoader() {
+                super(null);
+            }
+            @Override
+            protected Class<?> findClass(String name) throws ClassNotFoundException {
+                if (name.equals(ClassTestConfig.DEFAULT_CLASS.getName()) || name.equals(ClassTestConfig.RESTRICTED_CLASS.getName()))
+                    return null;
+                else
+                    return ClassTestConfig.class.getClassLoader().loadClass(name);
+            }
+        }
+
+        ClassLoader restrictedClassLoader = new RestrictedClassLoader();
+        ClassLoader defaultClassLoader = AbstractConfig.class.getClassLoader();
+
+        // Test default classloading where all classes are visible to thread context classloader
+        Thread.currentThread().setContextClassLoader(defaultClassLoader);
+        ClassTestConfig testConfig = new ClassTestConfig();
+        testConfig.checkInstances(ClassTestConfig.DEFAULT_CLASS, ClassTestConfig.DEFAULT_CLASS);
+
+        // Test default classloading where default classes are not visible to thread context classloader
+        // Static classloading is used for default classes, so instance creation should succeed.
+        Thread.currentThread().setContextClassLoader(restrictedClassLoader);
+        testConfig = new ClassTestConfig();
+        testConfig.checkInstances(ClassTestConfig.DEFAULT_CLASS, ClassTestConfig.DEFAULT_CLASS);
+
+        // Test class overrides with names or classes where all classes are visible to thread context classloader
+        Thread.currentThread().setContextClassLoader(defaultClassLoader);
+        ClassTestConfig.testOverrides();
+
+        // Test class overrides with names or classes where all classes are visible to Kafka classloader, context classloader is null
+        Thread.currentThread().setContextClassLoader(null);
+        ClassTestConfig.testOverrides();
+
+        // Test class overrides where some classes are not visible to thread context classloader
+        Thread.currentThread().setContextClassLoader(restrictedClassLoader);
+        // Properties specified as classes should succeed
+        testConfig = new ClassTestConfig(ClassTestConfig.RESTRICTED_CLASS, Arrays.asList(ClassTestConfig.RESTRICTED_CLASS));
+        testConfig.checkInstances(ClassTestConfig.RESTRICTED_CLASS, ClassTestConfig.RESTRICTED_CLASS);
+        testConfig = new ClassTestConfig(ClassTestConfig.RESTRICTED_CLASS, Arrays.asList(ClassTestConfig.VISIBLE_CLASS, ClassTestConfig.RESTRICTED_CLASS));
+        testConfig.checkInstances(ClassTestConfig.RESTRICTED_CLASS, ClassTestConfig.VISIBLE_CLASS, ClassTestConfig.RESTRICTED_CLASS);
+        // Properties specified as classNames should fail to load classes
+        try {
+            new ClassTestConfig(ClassTestConfig.RESTRICTED_CLASS.getName(), null);
+            fail("Config created with class property that cannot be loaded");
+        } catch (ConfigException e) {
+            // Expected Exception
+        }
+        try {
+            testConfig = new ClassTestConfig(null, Arrays.asList(ClassTestConfig.VISIBLE_CLASS.getName(), ClassTestConfig.RESTRICTED_CLASS.getName()));
+            testConfig.getConfiguredInstances("list.prop", MetricsReporter.class);
+            fail("Should have failed to load class");
+        } catch (KafkaException e) {
+            // Expected Exception
+        }
+        try {
+            testConfig = new ClassTestConfig(null, ClassTestConfig.VISIBLE_CLASS.getName() + "," + ClassTestConfig.RESTRICTED_CLASS.getName());
+            testConfig.getConfiguredInstances("list.prop", MetricsReporter.class);
+            fail("Should have failed to load class");
+        } catch (KafkaException e) {
+            // Expected Exception
+        }
+    }
+
+    private static class ClassTestConfig extends AbstractConfig {
+        static final Class<?> DEFAULT_CLASS = FakeMetricsReporter.class;
+        static final Class<?> VISIBLE_CLASS = JmxReporter.class;
+        static final Class<?> RESTRICTED_CLASS = ConfiguredFakeMetricsReporter.class;
+
+        private static final ConfigDef CONFIG;
+        static {
+            CONFIG = new ConfigDef().define("class.prop", Type.CLASS, DEFAULT_CLASS, Importance.HIGH, "docs")
+                                    .define("list.prop", Type.LIST, Arrays.asList(DEFAULT_CLASS), Importance.HIGH, "docs");
+        }
+
+        public ClassTestConfig() {
+            super(CONFIG, new Properties());
+        }
+
+        public ClassTestConfig(Object classPropOverride, Object listPropOverride) {
+            super(CONFIG, overrideProps(classPropOverride, listPropOverride));
+        }
+
+        void checkInstances(Class<?> expectedClassPropClass, Class<?>... expectedListPropClasses) {
+            assertEquals(expectedClassPropClass, getConfiguredInstance("class.prop", MetricsReporter.class).getClass());
+            List<?> list = getConfiguredInstances("list.prop", MetricsReporter.class);
+            for (int i = 0; i < list.size(); i++)
+                assertEquals(expectedListPropClasses[i], list.get(i).getClass());
+        }
+
+        static void testOverrides() {
+            ClassTestConfig testConfig1 = new ClassTestConfig(RESTRICTED_CLASS, Arrays.asList(VISIBLE_CLASS, RESTRICTED_CLASS));
+            testConfig1.checkInstances(RESTRICTED_CLASS, VISIBLE_CLASS, RESTRICTED_CLASS);
+
+            ClassTestConfig testConfig2 = new ClassTestConfig(RESTRICTED_CLASS.getName(), Arrays.asList(VISIBLE_CLASS.getName(), RESTRICTED_CLASS.getName()));
+            testConfig2.checkInstances(RESTRICTED_CLASS, VISIBLE_CLASS, RESTRICTED_CLASS);
+
+            ClassTestConfig testConfig3 = new ClassTestConfig(RESTRICTED_CLASS.getName(), VISIBLE_CLASS.getName() + "," + RESTRICTED_CLASS.getName());
+            testConfig3.checkInstances(RESTRICTED_CLASS, VISIBLE_CLASS, RESTRICTED_CLASS);
+        }
+
+        private static Map<String, Object> overrideProps(Object classProp, Object listProp) {
+            Map<String, Object> props = new HashMap<>();
+            if (classProp != null)
+                props.put("class.prop", classProp);
+            if (listProp != null)
+                props.put("list.prop", listProp);
+            return props;
+        }
+    }
+
     private static class TestConfig extends AbstractConfig {
 
         private static final ConfigDef CONFIG;
@@ -127,20 +244,16 @@ public class AbstractConfigTest {
     }
 
     public static class FakeMetricsReporterConfig extends AbstractConfig {
-        private static final ConfigDef CONFIG;
 
         public static final String EXTRA_CONFIG = "metric.extra_config";
         private static final String EXTRA_CONFIG_DOC = "An extraneous configuration string.";
-
-        static {
-            CONFIG = new ConfigDef().define(
+        private static final ConfigDef CONFIG = new ConfigDef().define(
                 EXTRA_CONFIG, ConfigDef.Type.STRING, "",
                 ConfigDef.Importance.LOW, EXTRA_CONFIG_DOC);
-        }
+
 
         public FakeMetricsReporterConfig(Map<?, ?> props) {
             super(CONFIG, props);
         }
     }
-
 }

Reply via email to