Repository: camel Updated Branches: refs/heads/master b8f5da747 -> ccef28fbc
[CAMEL-10069] Update to use ClassResolver to help search for the partitioner and serializers Also pull search for Partitioner out into separate try block to allow for use with 0.8 kafka client (which doesn't have partitioner) Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/ccef28fb Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/ccef28fb Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/ccef28fb Branch: refs/heads/master Commit: ccef28fbc003c541eb9ebd3b584092d0ea6c0f5d Parents: b8f5da7 Author: Daniel Kulp <[email protected]> Authored: Wed Jun 22 15:30:14 2016 -0400 Committer: Daniel Kulp <[email protected]> Committed: Wed Jun 22 15:30:14 2016 -0400 ---------------------------------------------------------------------- .../camel/component/kafka/KafkaConsumer.java | 2 - .../camel/component/kafka/KafkaProducer.java | 73 ++++++++++++-------- .../component/kafka/KafkaProducerFullTest.java | 2 +- 3 files changed, 47 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/ccef28fb/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java index a317b54..8b995ff 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java @@ -16,7 +16,6 @@ */ package org.apache.camel.component.kafka; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -30,7 +29,6 @@ import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.OffsetAndMetadata; -import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.InterruptException; import org.slf4j.Logger; http://git-wip-us.apache.org/repos/asf/camel/blob/ccef28fb/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java index ae2f2a4..a81ec15 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java @@ -16,6 +16,7 @@ */ package org.apache.camel.component.kafka; +import java.lang.reflect.Field; import java.util.Collections; import java.util.Iterator; import java.util.LinkedList; @@ -30,15 +31,20 @@ import org.apache.camel.CamelException; import org.apache.camel.CamelExchangeException; import org.apache.camel.Exchange; import org.apache.camel.impl.DefaultAsyncProducer; +import org.apache.camel.spi.ClassResolver; +import org.apache.camel.util.CastUtils; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.Partitioner; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.serialization.Serializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class KafkaProducer extends DefaultAsyncProducer { - + private static final Logger LOG = LoggerFactory.getLogger(KafkaProducer.class); + private org.apache.kafka.clients.producer.KafkaProducer kafkaProducer; private final KafkaEndpoint endpoint; private ExecutorService workerPool; @@ -49,36 +55,22 @@ public class KafkaProducer extends DefaultAsyncProducer { this.endpoint = endpoint; } - - Class<?> loadClass(Object o, ClassLoader loader) { + <T> Class<T> loadClass(Object o, ClassResolver resolver, Class<T> type) { if (o == null || o instanceof Class) { - return (Class<?>)o; + return CastUtils.cast((Class<?>)o); } String name = o.toString(); - Class<?> c; - try { - c = Class.forName(name, true, loader); - } catch (ClassNotFoundException e) { - c = null; - } + Class<T> c = resolver.resolveClass(name, type); if (c == null) { - try { - c = Class.forName(name, true, getClass().getClassLoader()); - } catch (ClassNotFoundException e) { - c = null; - } + c = resolver.resolveClass(name, type, getClass().getClassLoader()); } if (c == null) { - try { - c = Class.forName(name, true, org.apache.kafka.clients.producer.KafkaProducer.class.getClassLoader()); - } catch (ClassNotFoundException e) { - c = null; - } + c = resolver.resolveClass(name, type, org.apache.kafka.clients.producer.KafkaProducer.class.getClassLoader()); } return c; } - void replaceWithClass(Properties props, String key, ClassLoader loader, Class<?> type) { - Class<?> c = loadClass(props.get(key), loader); + void replaceWithClass(Properties props, String key, ClassResolver resolver, Class<?> type) { + Class<?> c = loadClass(props.get(key), resolver, type); if (c != null) { props.put(key, c); } @@ -86,11 +78,28 @@ public class KafkaProducer extends DefaultAsyncProducer { Properties getProps() { Properties props = endpoint.getConfiguration().createProducerProperties(); - if (endpoint.getCamelContext() != null) { - ClassLoader loader = endpoint.getCamelContext().getApplicationContextClassLoader(); - replaceWithClass(props, "key.serializer", loader, Serializer.class); - replaceWithClass(props, "value.serializer", loader, Serializer.class); - replaceWithClass(props, "partitioner.class", loader, Partitioner.class); + try { + if (endpoint.getCamelContext() != null) { + ClassResolver resolver = endpoint.getCamelContext().getClassResolver(); + replaceWithClass(props, "key.serializer", resolver, Serializer.class); + replaceWithClass(props, "value.serializer", resolver, Serializer.class); + + try { + //doesn't exist in old version of Kafka client so detect and only call the method if + //the field/config actually exists + Field f = ProducerConfig.class.getDeclaredField("PARTITIONER_CLASS_CONFIG"); + if (f != null) { + loadParitionerClass(resolver, props); + } + } catch (NoSuchFieldException e) { + //ignore + } catch (SecurityException e) { + //ignore + } + } + } catch (Throwable t) { + //can ignore and Kafka itself might be able to handle it, if not, it will throw an exception + LOG.debug("Problem loading classes for Serializers", t); } if (endpoint.getBrokers() != null) { props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, endpoint.getBrokers()); @@ -98,6 +107,11 @@ public class KafkaProducer extends DefaultAsyncProducer { return props; } + private void loadParitionerClass(ClassResolver resolver, Properties props) { + replaceWithClass(props, "partitioner.class", resolver, Partitioner.class); + } + + public org.apache.kafka.clients.producer.KafkaProducer getKafkaProducer() { return kafkaProducer; } @@ -184,6 +198,11 @@ public class KafkaProducer extends DefaultAsyncProducer { } return new ProducerRecord(msgTopic, msgList.next()); } + + @Override + public void remove() { + msgList.remove(); + } }; } ProducerRecord record; http://git-wip-us.apache.org/repos/asf/camel/blob/ccef28fb/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java index d5b65fa..30f2b13 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java @@ -99,7 +99,7 @@ public class KafkaProducerFullTest extends BaseEmbeddedKafkaTest { } @Override - protected RoutesBuilder createRouteBuilder() throws Exception { + protected RouteBuilder createRouteBuilder() throws Exception { return new RouteBuilder() { @Override public void configure() throws Exception {
