CAMEL-10586: Make the Kafka endpoint a little easier to use. The producer can 
now automatically convert the message key and value to the type expected by 
the configured serializer.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/cc06080b
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/cc06080b
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/cc06080b

Branch: refs/heads/master
Commit: cc06080b0c5bdf2d970c4a9bcfb407863db883a1
Parents: ec9b418
Author: Claus Ibsen <davscl...@apache.org>
Authored: Sat Mar 4 10:37:00 2017 +0100
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Sat Mar 4 10:59:23 2017 +0100

----------------------------------------------------------------------
 .../src/main/docs/kafka-component.adoc          |  4 +-
 .../component/kafka/KafkaConfiguration.java     | 17 +++-----
 .../camel/component/kafka/KafkaConstants.java   |  3 ++
 .../camel/component/kafka/KafkaProducer.java    | 43 +++++++++++---------
 .../component/kafka/KafkaProducerTest.java      |  8 ++++
 5 files changed, 42 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/cc06080b/components/camel-kafka/src/main/docs/kafka-component.adoc
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/docs/kafka-component.adoc 
b/components/camel-kafka/src/main/docs/kafka-component.adoc
index e4aff38..8c1d938 100644
--- a/components/camel-kafka/src/main/docs/kafka-component.adoc
+++ b/components/camel-kafka/src/main/docs/kafka-component.adoc
@@ -90,7 +90,7 @@ The Kafka component supports 82 endpoint options which are 
listed below:
 | compressionCodec | producer | none | String | This parameter allows you to 
specify the compression codec for all data generated by this producer. Valid 
values are none gzip and snappy.
 | connectionMaxIdleMs | producer | 540000 | Integer | Close idle connections 
after the number of milliseconds specified by this config.
 | key | producer |  | String | The record key (or null if no key is 
specified). If this option has been configured then it take precedence over 
header link KafkaConstantsKEY
-| keySerializerClass | producer |  | String | The serializer class for keys 
(defaults to the same as for messages if nothing is given).
+| keySerializerClass | producer | 
org.apache.kafka.common.serialization.StringSerializer | String | The 
serializer class for keys (defaults to the same as for messages if nothing is 
given).
 | lingerMs | producer | 0 | Integer | The producer groups together any records 
that arrive in between request transmissions into a single batched request. 
Normally this occurs only under load when records arrive faster than they can 
be sent out. However in some circumstances the client may want to reduce the 
number of requests even under moderate load. This setting accomplishes this by 
adding a small amount of artificial delaythat is rather than immediately 
sending out a record the producer will wait for up to the given delay to allow 
other records to be sent so that the sends can be batched together. This can be 
thought of as analogous to Nagle's algorithm in TCP. This setting gives the 
upper bound on the delay for batching: once we get batch.size worth of records 
for a partition it will be sent immediately regardless of this setting however 
if we have fewer than this many bytes accumulated for this partition we will 
'linger' for the specified time waiting for more records to show 
 up. This setting defaults to 0 (i.e. no delay). Setting linger.ms=5 for 
example would have the effect of reducing the number of requests sent but would 
add up to 5ms of latency to records sent in the absense of load.
 | maxBlockMs | producer | 60000 | Integer | The configuration controls how 
long sending to kafka will block. These methods can be blocked for multiple 
reasons. For e.g: buffer full metadata unavailable.This configuration imposes 
maximum limit on the total time spent in fetching metadata serialization of key 
and value partitioning and allocation of buffer memory when doing a send(). In 
case of partitionsFor() this configuration imposes a maximum time threshold on 
waiting for metadata
 | maxInFlightRequest | producer | 5 | Integer | The maximum number of 
unacknowledged requests the client will send on a single connection before 
blocking. Note that if this setting is set to be greater than 1 and there are 
failed sends there is a risk of message re-ordering due to retries (i.e. if 
retries are enabled).
@@ -110,7 +110,7 @@ The Kafka component supports 82 endpoint options which are 
listed below:
 | retries | producer | 0 | Integer | Setting a value greater than zero will 
cause the client to resend any record whose send fails with a potentially 
transient error. Note that this retry is no different than if the client resent 
the record upon receiving the error. Allowing retries will potentially change 
the ordering of records because if two records are sent to a single partition 
and the first fails and is retried but the second succeeds then the second 
record may appear first.
 | retryBackoffMs | producer | 100 | Integer | Before each retry the producer 
refreshes the metadata of relevant topics to see if a new leader has been 
elected. Since leader election takes a bit of time this property specifies the 
amount of time that the producer waits before refreshing the metadata.
 | sendBufferBytes | producer | 131072 | Integer | Socket write buffer size
-| serializerClass | producer |  | String | The serializer class for messages. 
The default encoder takes a byte and returns the same byte. The default class 
is kafka.serializer.DefaultEncoder
+| serializerClass | producer | 
org.apache.kafka.common.serialization.StringSerializer | String | The 
serializer class for messages.
 | workerPool | producer |  | ExecutorService | To use a custom worker pool for 
continue routing Exchange after kafka server has acknowledge the message that 
was sent to it from KafkaProducer using asynchronous non-blocking processing.
 | workerPoolCoreSize | producer | 10 | Integer | Number of core threads for 
the worker pool for continue routing Exchange after kafka server has 
acknowledge the message that was sent to it from KafkaProducer using 
asynchronous non-blocking processing.
 | workerPoolMaxSize | producer | 20 | Integer | Maximum number of threads for 
the worker pool for continue routing Exchange after kafka server has 
acknowledge the message that was sent to it from KafkaProducer using 
asynchronous non-blocking processing.

http://git-wip-us.apache.org/repos/asf/camel/blob/cc06080b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
index 31e95db..436287b 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
@@ -131,10 +131,10 @@ public class KafkaConfiguration {
     //Async producer config
     @UriParam(label = "producer", defaultValue = "10000")
     private Integer queueBufferingMaxMessages = 10000;
-    @UriParam(label = "producer")
-    private String serializerClass;
-    @UriParam(label = "producer")
-    private String keySerializerClass;
+    @UriParam(label = "producer", defaultValue = 
KafkaConstants.KAFKA_DEFAULT_SERIALIZER)
+    private String serializerClass = KafkaConstants.KAFKA_DEFAULT_SERIALIZER;
+    @UriParam(label = "producer", defaultValue = 
KafkaConstants.KAFKA_DEFAULT_SERIALIZER)
+    private String keySerializerClass = 
KafkaConstants.KAFKA_DEFAULT_SERIALIZER;
 
     @UriParam(label = "producer")
     private String key;
@@ -684,24 +684,17 @@ public class KafkaConfiguration {
     }
 
     public String getSerializerClass() {
-        if (serializerClass == null) {
-            return KafkaConstants.KAFKA_DEFAULT_SERIALIZER;
-        }
         return serializerClass;
     }
 
     /**
-     * The serializer class for messages. The default encoder takes a byte[] 
and returns the same byte[].
-     * The default class is kafka.serializer.DefaultEncoder
+     * The serializer class for messages.
      */
     public void setSerializerClass(String serializerClass) {
         this.serializerClass = serializerClass;
     }
 
     public String getKeySerializerClass() {
-        if (keySerializerClass == null) {
-            return KafkaConstants.KAFKA_DEFAULT_SERIALIZER;
-        }
         return keySerializerClass;
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/cc06080b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java
index 49371a6..70f98e2 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java
@@ -25,8 +25,11 @@ public final class KafkaConstants {
     public static final String OFFSET = "kafka.OFFSET";
     public static final String LAST_RECORD_BEFORE_COMMIT = 
"kafka.LAST_RECORD_BEFORE_COMMIT";
 
+    @Deprecated
     public static final String KAFKA_DEFAULT_ENCODER = 
"kafka.serializer.DefaultEncoder";
+    @Deprecated
     public static final String KAFKA_STRING_ENCODER = 
"kafka.serializer.StringEncoder";
+
     public static final String KAFKA_DEFAULT_SERIALIZER  = 
"org.apache.kafka.common.serialization.StringSerializer";
     public static final String KAFKA_DEFAULT_DESERIALIZER  = 
"org.apache.kafka.common.serialization.StringDeserializer";
     public static final String KAFKA_DEFAULT_PARTITIONER = 
"org.apache.kafka.clients.producer.internals.DefaultPartitioner";

http://git-wip-us.apache.org/repos/asf/camel/blob/cc06080b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index f78e369..53410e6 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.component.kafka;
 
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
@@ -35,6 +36,7 @@ import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.utils.Bytes;
 
 public class KafkaProducer extends DefaultAsyncProducer {
 
@@ -134,7 +136,7 @@ public class KafkaProducer extends DefaultAsyncProducer {
         Object key = endpoint.getConfiguration().getKey() != null
             ? endpoint.getConfiguration().getKey() : 
exchange.getIn().getHeader(KafkaConstants.KEY);
         final Object messageKey = key != null
-            ? getMessageKey(exchange, key, 
endpoint.getConfiguration().getKeySerializerClass()) : null;
+            ? tryConvertToSerializedType(exchange, key, 
endpoint.getConfiguration().getKeySerializerClass()) : null;
         final boolean hasMessageKey = messageKey != null;
 
         Object msg = exchange.getIn().getBody();
@@ -159,7 +161,7 @@ public class KafkaProducer extends DefaultAsyncProducer {
                 public ProducerRecord next() {
                     // must convert each entry of the iterator into the value 
according to the serializer
                     Object next = msgList.next();
-                    Object value = getMessageValue(exchange, next, 
endpoint.getConfiguration().getSerializerClass());
+                    Object value = tryConvertToSerializedType(exchange, next, 
endpoint.getConfiguration().getSerializerClass());
 
                     if (hasPartitionKey && hasMessageKey) {
                         return new ProducerRecord(msgTopic, partitionKey, key, 
value);
@@ -177,7 +179,7 @@ public class KafkaProducer extends DefaultAsyncProducer {
         }
 
         // must convert each entry of the iterator into the value according to 
the serializer
-        Object value = getMessageValue(exchange, msg, 
endpoint.getConfiguration().getSerializerClass());
+        Object value = tryConvertToSerializedType(exchange, msg, 
endpoint.getConfiguration().getSerializerClass());
 
         ProducerRecord record;
         if (hasPartitionKey && hasMessageKey) {
@@ -234,24 +236,27 @@ public class KafkaProducer extends DefaultAsyncProducer {
         return true;
     }
 
-    protected Object getMessageKey(Exchange exchange, Object key, String 
keySerializer) {
-        Object answer = key;
-        if (KafkaConstants.KAFKA_DEFAULT_DESERIALIZER.equals(keySerializer)) {
-            // its string based so ensure key is string as well
-            answer = 
exchange.getContext().getTypeConverter().tryConvertTo(String.class, exchange, 
key);
+    /**
+     * Attempts to convert the object to the same type as the serialized class 
specified
+     */
+    protected Object tryConvertToSerializedType(Exchange exchange, Object 
object, String serializerClass) {
+        Object answer = null;
+
+        if (KafkaConstants.KAFKA_DEFAULT_SERIALIZER.equals(serializerClass)) {
+            answer = 
exchange.getContext().getTypeConverter().tryConvertTo(String.class, exchange, 
object);
+        } else if 
("org.apache.kafka.common.serialization.ByteArraySerializer".equals(serializerClass))
 {
+            answer = 
exchange.getContext().getTypeConverter().tryConvertTo(byte[].class, exchange, 
object);
+        } else if 
("org.apache.kafka.common.serialization.ByteBufferSerializer".equals(serializerClass))
 {
+            answer = 
exchange.getContext().getTypeConverter().tryConvertTo(ByteBuffer.class, 
exchange, object);
+        } else if 
("org.apache.kafka.common.serialization.BytesSerializer".equals(serializerClass))
 {
+            // we need to convert to byte array first
+            byte[] array = 
exchange.getContext().getTypeConverter().tryConvertTo(byte[].class, exchange, 
object);
+            if (array != null) {
+                answer = new Bytes(array);
+            }
         }
-        // TODO: other serializers
-        return answer;
-    }
 
-    protected Object getMessageValue(Exchange exchange, Object value, String 
valueSerializer) {
-        Object answer = value;
-        if (KafkaConstants.KAFKA_DEFAULT_DESERIALIZER.equals(valueSerializer)) 
{
-            // its string based so ensure value is string as well
-            answer = 
exchange.getContext().getTypeConverter().tryConvertTo(String.class, exchange, 
value);
-        }
-        // TODO: other serializers
-        return answer;
+        return answer != null ? answer : object;
     }
 
     private final class KafkaProducerCallBack implements Callback {

http://git-wip-us.apache.org/repos/asf/camel/blob/cc06080b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
index 946e3cd..d30e737 100644
--- 
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
+++ 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
@@ -23,9 +23,11 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
 import org.apache.camel.AsyncCallback;
+import org.apache.camel.CamelContext;
 import org.apache.camel.CamelException;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
+import org.apache.camel.TypeConverter;
 import org.apache.camel.impl.DefaultCamelContext;
 import org.apache.camel.impl.DefaultMessage;
 import org.apache.kafka.clients.producer.Callback;
@@ -46,6 +48,8 @@ public class KafkaProducerTest {
     private KafkaProducer producer;
     private KafkaEndpoint endpoint;
 
+    private TypeConverter converter = Mockito.mock(TypeConverter.class);
+    private CamelContext context = Mockito.mock(CamelContext.class);
     private Exchange exchange = Mockito.mock(Exchange.class);
     private Message in = new DefaultMessage();
     private Message out = new DefaultMessage();
@@ -65,6 +69,10 @@ public class KafkaProducerTest {
         org.apache.kafka.clients.producer.KafkaProducer kp = 
Mockito.mock(org.apache.kafka.clients.producer.KafkaProducer.class);
         
Mockito.when(kp.send(Matchers.any(ProducerRecord.class))).thenReturn(future);
 
+        Mockito.when(exchange.getContext()).thenReturn(context);
+        Mockito.when(context.getTypeConverter()).thenReturn(converter);
+        Mockito.when(converter.tryConvertTo(String.class, exchange, 
null)).thenReturn(null);
+
         producer.setKafkaProducer(kp);
         producer.setWorkerPool(Executors.newFixedThreadPool(1));
     }

Reply via email to