CAMEL-10586: make the kafka endpoint a little easier to use. The producer can now automatically convert the message to the configured serializer's type.
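In essence, the commit replaces the producer's two String-only conversion helpers with a single dispatch on the configured serializer class name. A minimal standalone sketch of that dispatch, assuming plain-Java conversions in place of Camel's TypeConverter (the `convertFor` helper and class name are illustrative, not from the commit):

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Sketch of the producer's new behavior: pick a target Java type based on
// the configured Kafka serializer class name, attempt the conversion, and
// fall back to the original payload when no conversion applies.
public class SerializerDispatchSketch {

    static Object convertFor(String serializerClass, Object payload) {
        Object answer = null;
        if ("org.apache.kafka.common.serialization.StringSerializer".equals(serializerClass)) {
            answer = payload == null ? null : payload.toString();
        } else if ("org.apache.kafka.common.serialization.ByteArraySerializer".equals(serializerClass)) {
            answer = payload instanceof String
                    ? ((String) payload).getBytes(StandardCharsets.UTF_8) : null;
        } else if ("org.apache.kafka.common.serialization.ByteBufferSerializer".equals(serializerClass)) {
            answer = payload instanceof String
                    ? ByteBuffer.wrap(((String) payload).getBytes(StandardCharsets.UTF_8)) : null;
        }
        // unknown serializer or failed conversion: hand the payload over unchanged
        return answer != null ? answer : payload;
    }

    public static void main(String[] args) {
        Object v = convertFor("org.apache.kafka.common.serialization.ByteBufferSerializer", "hi");
        System.out.println(v instanceof ByteBuffer); // prints: true
    }
}
```

The real method additionally delegates each conversion to Camel's type converter registry, so any registered converter (not just String-based ones) can satisfy the target type.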
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/cc06080b
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/cc06080b
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/cc06080b

Branch: refs/heads/master
Commit: cc06080b0c5bdf2d970c4a9bcfb407863db883a1
Parents: ec9b418
Author: Claus Ibsen <davscl...@apache.org>
Authored: Sat Mar 4 10:37:00 2017 +0100
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Sat Mar 4 10:59:23 2017 +0100

----------------------------------------------------------------------
 .../src/main/docs/kafka-component.adoc        |  4 +-
 .../component/kafka/KafkaConfiguration.java   | 17 +++-----
 .../camel/component/kafka/KafkaConstants.java |  3 ++
 .../camel/component/kafka/KafkaProducer.java  | 43 +++++++++++---------
 .../component/kafka/KafkaProducerTest.java    |  8 ++++
 5 files changed, 42 insertions(+), 33 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/camel/blob/cc06080b/components/camel-kafka/src/main/docs/kafka-component.adoc
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/docs/kafka-component.adoc b/components/camel-kafka/src/main/docs/kafka-component.adoc
index e4aff38..8c1d938 100644
--- a/components/camel-kafka/src/main/docs/kafka-component.adoc
+++ b/components/camel-kafka/src/main/docs/kafka-component.adoc
@@ -90,7 +90,7 @@ The Kafka component supports 82 endpoint options which are listed below:
 | compressionCodec | producer | none | String | This parameter allows you to specify the compression codec for all data generated by this producer. Valid values are none gzip and snappy.
 | connectionMaxIdleMs | producer | 540000 | Integer | Close idle connections after the number of milliseconds specified by this config.
 | key | producer | | String | The record key (or null if no key is specified). If this option has been configured then it take precedence over header link KafkaConstantsKEY
-| keySerializerClass | producer | | String | The serializer class for keys (defaults to the same as for messages if nothing is given).
+| keySerializerClass | producer | org.apache.kafka.common.serialization.StringSerializer | String | The serializer class for keys (defaults to the same as for messages if nothing is given).
 | lingerMs | producer | 0 | Integer | The producer groups together any records that arrive in between request transmissions into a single batched request. Normally this occurs only under load when records arrive faster than they can be sent out. However in some circumstances the client may want to reduce the number of requests even under moderate load. This setting accomplishes this by adding a small amount of artificial delaythat is rather than immediately sending out a record the producer will wait for up to the given delay to allow other records to be sent so that the sends can be batched together. This can be thought of as analogous to Nagle's algorithm in TCP. This setting gives the upper bound on the delay for batching: once we get batch.size worth of records for a partition it will be sent immediately regardless of this setting however if we have fewer than this many bytes accumulated for this partition we will 'linger' for the specified time waiting for more records to show up. This setting defaults to 0 (i.e. no delay). Setting linger.ms=5 for example would have the effect of reducing the number of requests sent but would add up to 5ms of latency to records sent in the absense of load.
 | maxBlockMs | producer | 60000 | Integer | The configuration controls how long sending to kafka will block. These methods can be blocked for multiple reasons. For e.g: buffer full metadata unavailable.This configuration imposes maximum limit on the total time spent in fetching metadata serialization of key and value partitioning and allocation of buffer memory when doing a send(). In case of partitionsFor() this configuration imposes a maximum time threshold on waiting for metadata
 | maxInFlightRequest | producer | 5 | Integer | The maximum number of unacknowledged requests the client will send on a single connection before blocking. Note that if this setting is set to be greater than 1 and there are failed sends there is a risk of message re-ordering due to retries (i.e. if retries are enabled).
@@ -110,7 +110,7 @@ The Kafka component supports 82 endpoint options which are listed below:
 | retries | producer | 0 | Integer | Setting a value greater than zero will cause the client to resend any record whose send fails with a potentially transient error. Note that this retry is no different than if the client resent the record upon receiving the error. Allowing retries will potentially change the ordering of records because if two records are sent to a single partition and the first fails and is retried but the second succeeds then the second record may appear first.
 | retryBackoffMs | producer | 100 | Integer | Before each retry the producer refreshes the metadata of relevant topics to see if a new leader has been elected. Since leader election takes a bit of time this property specifies the amount of time that the producer waits before refreshing the metadata.
 | sendBufferBytes | producer | 131072 | Integer | Socket write buffer size
-| serializerClass | producer | | String | The serializer class for messages. The default encoder takes a byte and returns the same byte. The default class is kafka.serializer.DefaultEncoder
+| serializerClass | producer | org.apache.kafka.common.serialization.StringSerializer | String | The serializer class for messages.
 | workerPool | producer | | ExecutorService | To use a custom worker pool for continue routing Exchange after kafka server has acknowledge the message that was sent to it from KafkaProducer using asynchronous non-blocking processing.
 | workerPoolCoreSize | producer | 10 | Integer | Number of core threads for the worker pool for continue routing Exchange after kafka server has acknowledge the message that was sent to it from KafkaProducer using asynchronous non-blocking processing.
 | workerPoolMaxSize | producer | 20 | Integer | Maximum number of threads for the worker pool for continue routing Exchange after kafka server has acknowledge the message that was sent to it from KafkaProducer using asynchronous non-blocking processing.

http://git-wip-us.apache.org/repos/asf/camel/blob/cc06080b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
index 31e95db..436287b 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
@@ -131,10 +131,10 @@ public class KafkaConfiguration {
     //Async producer config
     @UriParam(label = "producer", defaultValue = "10000")
     private Integer queueBufferingMaxMessages = 10000;
-    @UriParam(label = "producer")
-    private String serializerClass;
-    @UriParam(label = "producer")
-    private String keySerializerClass;
+    @UriParam(label = "producer", defaultValue = KafkaConstants.KAFKA_DEFAULT_SERIALIZER)
+    private String serializerClass = KafkaConstants.KAFKA_DEFAULT_SERIALIZER;
+    @UriParam(label = "producer", defaultValue = KafkaConstants.KAFKA_DEFAULT_SERIALIZER)
+    private String keySerializerClass = KafkaConstants.KAFKA_DEFAULT_SERIALIZER;
     @UriParam(label = "producer")
     private String key;
 
@@ -684,24 +684,17 @@ public class KafkaConfiguration {
     }
 
     public String getSerializerClass() {
-        if (serializerClass == null) {
-            return KafkaConstants.KAFKA_DEFAULT_SERIALIZER;
-        }
         return serializerClass;
     }
 
     /**
-     * The serializer class for messages. The default encoder takes a byte[] and returns the same byte[].
-     * The default class is kafka.serializer.DefaultEncoder
+     * The serializer class for messages.
      */
     public void setSerializerClass(String serializerClass) {
         this.serializerClass = serializerClass;
     }
 
     public String getKeySerializerClass() {
-        if (keySerializerClass == null) {
-            return KafkaConstants.KAFKA_DEFAULT_SERIALIZER;
-        }
         return keySerializerClass;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/cc06080b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java
index 49371a6..70f98e2 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java
@@ -25,8 +25,11 @@ public final class KafkaConstants {
     public static final String OFFSET = "kafka.OFFSET";
     public static final String LAST_RECORD_BEFORE_COMMIT = "kafka.LAST_RECORD_BEFORE_COMMIT";
 
+    @Deprecated
     public static final String KAFKA_DEFAULT_ENCODER = "kafka.serializer.DefaultEncoder";
+    @Deprecated
     public static final String KAFKA_STRING_ENCODER = "kafka.serializer.StringEncoder";
+    public static final String KAFKA_DEFAULT_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";
     public static final String KAFKA_DEFAULT_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer";
     public static final String KAFKA_DEFAULT_PARTITIONER = "org.apache.kafka.clients.producer.internals.DefaultPartitioner";

http://git-wip-us.apache.org/repos/asf/camel/blob/cc06080b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index f78e369..53410e6 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.component.kafka;
 
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
@@ -35,6 +36,7 @@ import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.utils.Bytes;
 
 public class KafkaProducer extends DefaultAsyncProducer {
 
@@ -134,7 +136,7 @@ public class KafkaProducer extends DefaultAsyncProducer {
         Object key = endpoint.getConfiguration().getKey() != null
                 ? endpoint.getConfiguration().getKey() : exchange.getIn().getHeader(KafkaConstants.KEY);
         final Object messageKey = key != null
-                ? getMessageKey(exchange, key, endpoint.getConfiguration().getKeySerializerClass()) : null;
+                ? tryConvertToSerializedType(exchange, key, endpoint.getConfiguration().getKeySerializerClass()) : null;
         final boolean hasMessageKey = messageKey != null;
 
         Object msg = exchange.getIn().getBody();
@@ -159,7 +161,7 @@ public class KafkaProducer extends DefaultAsyncProducer {
                 public ProducerRecord next() {
                     // must convert each entry of the iterator into the value according to the serializer
                     Object next = msgList.next();
-                    Object value = getMessageValue(exchange, next, endpoint.getConfiguration().getSerializerClass());
+                    Object value = tryConvertToSerializedType(exchange, next, endpoint.getConfiguration().getSerializerClass());
 
                     if (hasPartitionKey && hasMessageKey) {
                         return new ProducerRecord(msgTopic, partitionKey, key, value);
@@ -177,7 +179,7 @@ public class KafkaProducer extends DefaultAsyncProducer {
         }
 
         // must convert each entry of the iterator into the value according to the serializer
-        Object value = getMessageValue(exchange, msg, endpoint.getConfiguration().getSerializerClass());
+        Object value = tryConvertToSerializedType(exchange, msg, endpoint.getConfiguration().getSerializerClass());
 
         ProducerRecord record;
         if (hasPartitionKey && hasMessageKey) {
@@ -234,24 +236,27 @@ public class KafkaProducer extends DefaultAsyncProducer {
         return true;
     }
 
-    protected Object getMessageKey(Exchange exchange, Object key, String keySerializer) {
-        Object answer = key;
-        if (KafkaConstants.KAFKA_DEFAULT_DESERIALIZER.equals(keySerializer)) {
-            // its string based so ensure key is string as well
-            answer = exchange.getContext().getTypeConverter().tryConvertTo(String.class, exchange, key);
+    /**
+     * Attempts to convert the object to the same type as the serialized class specified
+     */
+    protected Object tryConvertToSerializedType(Exchange exchange, Object object, String serializerClass) {
+        Object answer = null;
+
+        if (KafkaConstants.KAFKA_DEFAULT_SERIALIZER.equals(serializerClass)) {
+            answer = exchange.getContext().getTypeConverter().tryConvertTo(String.class, exchange, object);
+        } else if ("org.apache.kafka.common.serialization.ByteArraySerializer".equals(serializerClass)) {
+            answer = exchange.getContext().getTypeConverter().tryConvertTo(byte[].class, exchange, object);
+        } else if ("org.apache.kafka.common.serialization.ByteBufferSerializer".equals(serializerClass)) {
+            answer = exchange.getContext().getTypeConverter().tryConvertTo(ByteBuffer.class, exchange, object);
+        } else if ("org.apache.kafka.common.serialization.BytesSerializer".equals(serializerClass)) {
+            // we need to convert to byte array first
+            byte[] array = exchange.getContext().getTypeConverter().tryConvertTo(byte[].class, exchange, object);
+            if (array != null) {
+                answer = new Bytes(array);
+            }
         }
-        // TODO: other serializers
-        return answer;
-    }
 
-    protected Object getMessageValue(Exchange exchange, Object value, String valueSerializer) {
-        Object answer = value;
-        if (KafkaConstants.KAFKA_DEFAULT_DESERIALIZER.equals(valueSerializer)) {
-            // its string based so ensure value is string as well
-            answer = exchange.getContext().getTypeConverter().tryConvertTo(String.class, exchange, value);
-        }
-        // TODO: other serializers
-        return answer;
+        return answer != null ? answer : object;
     }
 
     private final class KafkaProducerCallBack implements Callback {

http://git-wip-us.apache.org/repos/asf/camel/blob/cc06080b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
index 946e3cd..d30e737 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
@@ -23,9 +23,11 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
 import org.apache.camel.AsyncCallback;
+import org.apache.camel.CamelContext;
 import org.apache.camel.CamelException;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
+import org.apache.camel.TypeConverter;
 import org.apache.camel.impl.DefaultCamelContext;
 import org.apache.camel.impl.DefaultMessage;
 import org.apache.kafka.clients.producer.Callback;
@@ -46,6 +48,8 @@ public class KafkaProducerTest {
     private KafkaProducer producer;
     private KafkaEndpoint endpoint;
 
+    private TypeConverter converter = Mockito.mock(TypeConverter.class);
+    private CamelContext context = Mockito.mock(CamelContext.class);
     private Exchange exchange = Mockito.mock(Exchange.class);
     private Message in = new DefaultMessage();
     private Message out = new DefaultMessage();
@@ -65,6 +69,10 @@ public class KafkaProducerTest {
         org.apache.kafka.clients.producer.KafkaProducer kp = Mockito.mock(org.apache.kafka.clients.producer.KafkaProducer.class);
         Mockito.when(kp.send(Matchers.any(ProducerRecord.class))).thenReturn(future);
 
+        Mockito.when(exchange.getContext()).thenReturn(context);
+        Mockito.when(context.getTypeConverter()).thenReturn(converter);
+        Mockito.when(converter.tryConvertTo(String.class, exchange, null)).thenReturn(null);
+
         producer.setKafkaProducer(kp);
         producer.setWorkerPool(Executors.newFixedThreadPool(1));
     }
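The `BytesSerializer` branch in the new producer method is the one two-step case: the payload is first converted to `byte[]` and only then wrapped. A standalone sketch of that wrapping rule, using a local `Bytes` stand-in because `org.apache.kafka.common.utils.Bytes` is only mimicked here (the `toBytesOrOriginal` helper and class names are illustrative, not from the commit):

```java
import java.nio.charset.StandardCharsets;

// Sketch of the BytesSerializer branch: obtain a byte[] first, then wrap it;
// when no byte[] can be produced, the original payload passes through unchanged.
public class BytesWrapSketch {

    // Stand-in for org.apache.kafka.common.utils.Bytes: an immutable byte[] wrapper.
    static final class Bytes {
        private final byte[] value;
        Bytes(byte[] value) { this.value = value; }
        byte[] get() { return value; }
    }

    static Object toBytesOrOriginal(Object payload) {
        byte[] array = payload instanceof String
                ? ((String) payload).getBytes(StandardCharsets.UTF_8)
                : (payload instanceof byte[] ? (byte[]) payload : null);
        return array != null ? new Bytes(array) : payload;
    }

    public static void main(String[] args) {
        Object wrapped = toBytesOrOriginal("hello");
        System.out.println(wrapped instanceof Bytes); // prints: true
    }
}
```

In the committed code the byte[] is produced by Camel's type converter rather than the instanceof checks above, which is why a failed conversion (null array) simply leaves the payload as-is.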