Repository: kafka Updated Branches: refs/heads/trunk a1e0b2240 -> 313aad423
KAFKA-3680; Enable Kafka clients to run in any classloader env Configure default classes using class objects instead of class names, enable configurable lists of classes to be specified as class objects, add tests for different classloader configurations. Author: Rajini Sivaram <[email protected]> Reviewers: Sriharsha Chintalapani <[email protected]>, Ismael Juma <[email protected]> Closes #1421 from rajinisivaram/KAFKA-3680 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/313aad42 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/313aad42 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/313aad42 Branch: refs/heads/trunk Commit: 313aad423cb19618b8693af6f04b68138059c585 Parents: a1e0b22 Author: Rajini Sivaram <[email protected]> Authored: Tue Aug 23 10:31:29 2016 +0100 Committer: Ismael Juma <[email protected]> Committed: Tue Aug 23 10:31:29 2016 +0100 ---------------------------------------------------------------------- .../kafka/clients/consumer/ConsumerConfig.java | 3 +- .../kafka/clients/producer/ProducerConfig.java | 2 +- .../kafka/common/config/AbstractConfig.java | 17 ++- .../kafka/common/config/AbstractConfigTest.java | 125 ++++++++++++++++++- 4 files changed, 133 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/313aad42/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java index 509c3a1..4ce908e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java @@ -19,6 +19,7 @@ import org.apache.kafka.common.config.ConfigDef.Importance; import org.apache.kafka.common.config.ConfigDef.Type; import org.apache.kafka.common.serialization.Deserializer; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Properties; @@ -215,7 +216,7 @@ public class ConsumerConfig extends AbstractConfig { HEARTBEAT_INTERVAL_MS_DOC) .define(PARTITION_ASSIGNMENT_STRATEGY_CONFIG, Type.LIST, - RangeAssignor.class.getName(), + Collections.singletonList(RangeAssignor.class), Importance.MEDIUM, PARTITION_ASSIGNMENT_STRATEGY_DOC) .define(METADATA_MAX_AGE_CONFIG, http://git-wip-us.apache.org/repos/asf/kafka/blob/313aad42/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java index dbbde06..2ca2183 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java @@ -293,7 +293,7 @@ public class ProducerConfig extends AbstractConfig { CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_DOC) .define(PARTITIONER_CLASS_CONFIG, Type.CLASS, - DefaultPartitioner.class.getName(), + DefaultPartitioner.class, Importance.MEDIUM, PARTITIONER_CLASS_DOC) .define(INTERCEPTOR_CLASSES_CONFIG, Type.LIST, http://git-wip-us.apache.org/repos/asf/kafka/blob/313aad42/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java index 9f3cba4..096047f 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java +++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java @@ -221,13 +221,18 @@ public class AbstractConfig { List<T> objects = new ArrayList<T>(); if (klasses == null) return objects; - for (String klass : klasses) { + for (Object klass : klasses) { Object o; - try { - o = Utils.newInstance(klass, t); - } catch (ClassNotFoundException e) { - throw new KafkaException(klass + " ClassNotFoundException exception occured", e); - } + if (klass instanceof String) { + try { + o = Utils.newInstance((String) klass, t); + } catch (ClassNotFoundException e) { + throw new KafkaException(klass + " ClassNotFoundException exception occured", e); + } + } else if (klass instanceof Class<?>) { + o = Utils.newInstance((Class<?>) klass); + } else + throw new KafkaException("List contains element of type " + klass.getClass() + ", expected String or Class"); if (!t.isInstance(o)) throw new KafkaException(klass + " is not an instance of " + t.getName()); if (o instanceof Configurable) http://git-wip-us.apache.org/repos/asf/kafka/blob/313aad42/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java b/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java index d9404c2..d483ef0 100644 --- a/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java +++ b/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java @@ -16,10 +16,13 @@ import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.config.ConfigDef.Importance; import org.apache.kafka.common.config.ConfigDef.Type; import org.apache.kafka.common.metrics.FakeMetricsReporter; +import org.apache.kafka.common.metrics.JmxReporter; import org.apache.kafka.common.metrics.MetricsReporter; import org.junit.Test; +import java.util.Arrays; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Properties; @@ -96,6 +99,120 @@ public class AbstractConfigTest { } } + @Test + public void testClassConfigs() { + class RestrictedClassLoader extends ClassLoader { + public RestrictedClassLoader() { + super(null); + } + @Override + protected Class<?> findClass(String name) throws ClassNotFoundException { + if (name.equals(ClassTestConfig.DEFAULT_CLASS.getName()) || name.equals(ClassTestConfig.RESTRICTED_CLASS.getName())) + return null; + else + return ClassTestConfig.class.getClassLoader().loadClass(name); + } + } + + ClassLoader restrictedClassLoader = new RestrictedClassLoader(); + ClassLoader defaultClassLoader = AbstractConfig.class.getClassLoader(); + + // Test default classloading where all classes are visible to thread context classloader + Thread.currentThread().setContextClassLoader(defaultClassLoader); + ClassTestConfig testConfig = new ClassTestConfig(); + testConfig.checkInstances(ClassTestConfig.DEFAULT_CLASS, ClassTestConfig.DEFAULT_CLASS); + + // Test default classloading where default classes are not visible to thread context classloader + // Static classloading is used for default classes, so instance creation should succeed. + Thread.currentThread().setContextClassLoader(restrictedClassLoader); + testConfig = new ClassTestConfig(); + testConfig.checkInstances(ClassTestConfig.DEFAULT_CLASS, ClassTestConfig.DEFAULT_CLASS); + + // Test class overrides with names or classes where all classes are visible to thread context classloader + Thread.currentThread().setContextClassLoader(defaultClassLoader); + ClassTestConfig.testOverrides(); + + // Test class overrides with names or classes where all classes are visible to Kafka classloader, context classloader is null + Thread.currentThread().setContextClassLoader(null); + ClassTestConfig.testOverrides(); + + // Test class overrides where some classes are not visible to thread context classloader + Thread.currentThread().setContextClassLoader(restrictedClassLoader); + // Properties specified as classes should succeed + testConfig = new ClassTestConfig(ClassTestConfig.RESTRICTED_CLASS, Arrays.asList(ClassTestConfig.RESTRICTED_CLASS)); + testConfig.checkInstances(ClassTestConfig.RESTRICTED_CLASS, ClassTestConfig.RESTRICTED_CLASS); + testConfig = new ClassTestConfig(ClassTestConfig.RESTRICTED_CLASS, Arrays.asList(ClassTestConfig.VISIBLE_CLASS, ClassTestConfig.RESTRICTED_CLASS)); + testConfig.checkInstances(ClassTestConfig.RESTRICTED_CLASS, ClassTestConfig.VISIBLE_CLASS, ClassTestConfig.RESTRICTED_CLASS); + // Properties specified as classNames should fail to load classes + try { + new ClassTestConfig(ClassTestConfig.RESTRICTED_CLASS.getName(), null); + fail("Config created with class property that cannot be loaded"); + } catch (ConfigException e) { + // Expected Exception + } + try { + testConfig = new ClassTestConfig(null, Arrays.asList(ClassTestConfig.VISIBLE_CLASS.getName(), ClassTestConfig.RESTRICTED_CLASS.getName())); + testConfig.getConfiguredInstances("list.prop", MetricsReporter.class); + fail("Should have failed to load class"); + } catch (KafkaException e) { + // Expected Exception + } + try { + testConfig = new ClassTestConfig(null, ClassTestConfig.VISIBLE_CLASS.getName() + "," + ClassTestConfig.RESTRICTED_CLASS.getName()); + testConfig.getConfiguredInstances("list.prop", MetricsReporter.class); + fail("Should have failed to load class"); + } catch (KafkaException e) { + // Expected Exception + } + } + + private static class ClassTestConfig extends AbstractConfig { + static final Class<?> DEFAULT_CLASS = FakeMetricsReporter.class; + static final Class<?> VISIBLE_CLASS = JmxReporter.class; + static final Class<?> RESTRICTED_CLASS = ConfiguredFakeMetricsReporter.class; + + private static final ConfigDef CONFIG; + static { + CONFIG = new ConfigDef().define("class.prop", Type.CLASS, DEFAULT_CLASS, Importance.HIGH, "docs") + .define("list.prop", Type.LIST, Arrays.asList(DEFAULT_CLASS), Importance.HIGH, "docs"); + } + + public ClassTestConfig() { + super(CONFIG, new Properties()); + } + + public ClassTestConfig(Object classPropOverride, Object listPropOverride) { + super(CONFIG, overrideProps(classPropOverride, listPropOverride)); + } + + void checkInstances(Class<?> expectedClassPropClass, Class<?>... expectedListPropClasses) { + assertEquals(expectedClassPropClass, getConfiguredInstance("class.prop", MetricsReporter.class).getClass()); + List<?> list = getConfiguredInstances("list.prop", MetricsReporter.class); + for (int i = 0; i < list.size(); i++) + assertEquals(expectedListPropClasses[i], list.get(i).getClass()); + } + + static void testOverrides() { + ClassTestConfig testConfig1 = new ClassTestConfig(RESTRICTED_CLASS, Arrays.asList(VISIBLE_CLASS, RESTRICTED_CLASS)); + testConfig1.checkInstances(RESTRICTED_CLASS, VISIBLE_CLASS, RESTRICTED_CLASS); + + ClassTestConfig testConfig2 = new ClassTestConfig(RESTRICTED_CLASS.getName(), Arrays.asList(VISIBLE_CLASS.getName(), RESTRICTED_CLASS.getName())); + testConfig2.checkInstances(RESTRICTED_CLASS, VISIBLE_CLASS, RESTRICTED_CLASS); + + ClassTestConfig testConfig3 = new ClassTestConfig(RESTRICTED_CLASS.getName(), VISIBLE_CLASS.getName() + "," + RESTRICTED_CLASS.getName()); + testConfig3.checkInstances(RESTRICTED_CLASS, VISIBLE_CLASS, RESTRICTED_CLASS); + } + + private static Map<String, Object> overrideProps(Object classProp, Object listProp) { + Map<String, Object> props = new HashMap<>(); + if (classProp != null) + props.put("class.prop", classProp); + if (listProp != null) + props.put("list.prop", listProp); + return props; + } + } + private static class TestConfig extends AbstractConfig { private static final ConfigDef CONFIG; @@ -127,20 +244,16 @@ public class AbstractConfigTest { } public static class FakeMetricsReporterConfig extends AbstractConfig { - private static final ConfigDef CONFIG; public static final String EXTRA_CONFIG = "metric.extra_config"; private static final String EXTRA_CONFIG_DOC = "An extraneous configuration string."; - - static { - CONFIG = new ConfigDef().define( + private static final ConfigDef CONFIG = new ConfigDef().define( EXTRA_CONFIG, ConfigDef.Type.STRING, "", ConfigDef.Importance.LOW, EXTRA_CONFIG_DOC); - } + public FakeMetricsReporterConfig(Map<?, ?> props) { super(CONFIG, props); } } - }
