Repository: nifi
Updated Branches:
  refs/heads/master f6ba92229 -> 18f415001


NIFI-2515 This closes #814. fixed Kafka serialization/deserialization settings


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/18f41500
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/18f41500
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/18f41500

Branch: refs/heads/master
Commit: 18f415001589a7298f3d59c3f844d8865d719050
Parents: f6ba922
Author: Oleg Zhurakousky <[email protected]>
Authored: Mon Aug 8 17:03:12 2016 -0400
Committer: joewitt <[email protected]>
Committed: Tue Aug 9 14:28:51 2016 -0400

----------------------------------------------------------------------
 .../kafka/pubsub/AbstractKafkaProcessor.java    | 36 +++++++++++++++++---
 .../processors/kafka/pubsub/ConsumeKafka.java   |  8 ++---
 .../processors/kafka/pubsub/PublishKafka.java   |  8 ++---
 .../kafka/pubsub/ConsumeKafkaTest.java          | 16 +++++++++
 .../kafka/pubsub/PublishKafkaTest.java          | 15 ++++++++
 5 files changed, 67 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/18f41500/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/AbstractKafkaProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/AbstractKafkaProcessor.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/AbstractKafkaProcessor.java
index c2c2321..4677e33 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/AbstractKafkaProcessor.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/AbstractKafkaProcessor.java
@@ -27,8 +27,11 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.regex.Pattern;
 
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.nifi.annotation.lifecycle.OnStopped;
 import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
@@ -117,11 +120,11 @@ abstract class AbstractKafkaProcessor<T extends Closeable> extends AbstractSessi
             .build();
 
     static final PropertyDescriptor SSL_CONTEXT_SERVICE = new 
PropertyDescriptor.Builder()
-        .name("ssl.context.service")
-        .displayName("SSL Context Service")
-        .description("Specifies the SSL Context Service to use for 
communicating with Kafka.")
+            .name("ssl.context.service")
+            .displayName("SSL Context Service")
+            .description("Specifies the SSL Context Service to use for 
communicating with Kafka.")
             .required(false)
-        .identifiesControllerService(SSLContextService.class)
+            .identifiesControllerService(SSLContextService.class)
             .build();
 
     static final Builder MESSAGE_DEMARCATOR_BUILDER = new 
PropertyDescriptor.Builder()
@@ -306,6 +309,31 @@ abstract class AbstractKafkaProcessor<T extends Closeable> extends AbstractSessi
             }
         }
 
+        String keySerializer = validationContext.getProperty(new 
PropertyDescriptor.Builder().name(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).build())
+                .getValue();
+        if (keySerializer != null && 
!ByteArraySerializer.class.getName().equals(keySerializer)) {
+            results.add(new 
ValidationResult.Builder().subject(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)
+                    .explanation("Key Serializer must be " + 
ByteArraySerializer.class.getName() + "' was '" + keySerializer + "'").build());
+        }
+        String valueSerializer = validationContext.getProperty(new 
PropertyDescriptor.Builder().name(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).build())
+                .getValue();
+        if (valueSerializer != null && 
!ByteArraySerializer.class.getName().equals(valueSerializer)) {
+            results.add(new 
ValidationResult.Builder().subject(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)
+                    .explanation("Value Serializer must be " + 
ByteArraySerializer.class.getName() + "' was '" + valueSerializer + 
"'").build());
+        }
+        String keyDeSerializer = validationContext.getProperty(new 
PropertyDescriptor.Builder().name(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG).build())
+                .getValue();
+        if (keyDeSerializer != null && 
!ByteArrayDeserializer.class.getName().equals(keyDeSerializer)) {
+            results.add(new 
ValidationResult.Builder().subject(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)
+                    .explanation("Key De-Serializer must be '" + 
ByteArrayDeserializer.class.getName() + "' was '" + keyDeSerializer + 
"'").build());
+        }
+        String valueDeSerializer = validationContext.getProperty(new 
PropertyDescriptor.Builder().name(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG).build())
+                .getValue();
+        if (valueDeSerializer != null && 
!ByteArrayDeserializer.class.getName().equals(valueDeSerializer)) {
+            results.add(new 
ValidationResult.Builder().subject(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)
+                    .explanation("Value De-Serializer must be " + 
ByteArrayDeserializer.class.getName() + "' was '" + valueDeSerializer + 
"'").build());
+        }
+
         return results;
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/18f41500/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java
index 05fa0e3..ac5b4c5 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java
@@ -242,12 +242,8 @@ public class ConsumeKafka extends AbstractKafkaProcessor<Consumer<byte[], byte[]
             this.checkIfInitialConnectionPossible();
         }
 
-        if 
(!kafkaProperties.containsKey(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)) {
-            kafkaProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class.getName());
-        }
-        if 
(!kafkaProperties.containsKey(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)) {
-            
kafkaProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class.getName());
-        }
+        kafkaProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class.getName());
+        kafkaProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class.getName());
 
         KafkaConsumer<byte[], byte[]> consumer = new 
KafkaConsumer<>(kafkaProperties);
         consumer.subscribe(Collections.singletonList(this.topic));

http://git-wip-us.apache.org/repos/asf/nifi/blob/18f41500/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka.java
index 059c5f3..6703c04 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka.java
@@ -225,12 +225,8 @@ public class PublishKafka extends AbstractKafkaProcessor<KafkaPublisher> {
     @Override
     protected KafkaPublisher buildKafkaResource(ProcessContext context, 
ProcessSession session) {
         Properties kafkaProperties = this.buildKafkaProperties(context);
-        if 
(!kafkaProperties.containsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)) {
-            kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
ByteArraySerializer.class.getName());
-        }
-        if 
(!kafkaProperties.containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) {
-            kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
ByteArraySerializer.class.getName());
-        }
+        kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
ByteArraySerializer.class.getName());
+        kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
ByteArraySerializer.class.getName());
         this.brokers = 
context.getProperty(BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue();
         KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, 
this.getLogger());
         return publisher;

http://git-wip-us.apache.org/repos/asf/nifi/blob/18f41500/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
index 7daac98..8e17a21 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.fail;
 import java.nio.charset.StandardCharsets;
 import java.util.List;
 
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
@@ -33,6 +34,21 @@ import org.junit.Test;
 public class ConsumeKafkaTest {
 
     @Test
+    public void validateCustomSerilaizerDeserializerSettings() throws 
Exception {
+        ConsumeKafka consumeKafka = new ConsumeKafka();
+        TestRunner runner = TestRunners.newTestRunner(consumeKafka);
+        runner.setProperty(ConsumeKafka.BOOTSTRAP_SERVERS, "okeydokey:1234");
+        runner.setProperty(ConsumeKafka.TOPIC, "foo");
+        runner.setProperty(ConsumeKafka.CLIENT_ID, "foo");
+        runner.setProperty(ConsumeKafka.GROUP_ID, "foo");
+        runner.setProperty(ConsumeKafka.AUTO_OFFSET_RESET, 
ConsumeKafka.OFFSET_EARLIEST);
+        runner.setProperty("key.deserializer", 
ByteArrayDeserializer.class.getName());
+        runner.assertValid();
+        runner.setProperty("key.deserializer", "Foo");
+        runner.assertNotValid();
+    }
+
+    @Test
     public void validatePropertiesValidation() throws Exception {
         ConsumeKafka consumeKafka = new ConsumeKafka();
         TestRunner runner = TestRunners.newTestRunner(consumeKafka);

http://git-wip-us.apache.org/repos/asf/nifi/blob/18f41500/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java
index be97578..01c5fdd 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java
@@ -27,6 +27,7 @@ import java.nio.charset.StandardCharsets;
 
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
@@ -38,6 +39,20 @@ import org.mockito.Mockito;
 public class PublishKafkaTest {
 
     @Test
+    public void validateCustomSerilaizerDeserializerSettings() throws 
Exception {
+        PublishKafka publishKafka = new PublishKafka();
+        TestRunner runner = TestRunners.newTestRunner(publishKafka);
+        runner.setProperty(PublishKafka.BOOTSTRAP_SERVERS, "okeydokey:1234");
+        runner.setProperty(PublishKafka.TOPIC, "foo");
+        runner.setProperty(PublishKafka.CLIENT_ID, "foo");
+        runner.setProperty(PublishKafka.META_WAIT_TIME, "3 sec");
+        runner.setProperty("key.serializer", 
ByteArraySerializer.class.getName());
+        runner.assertValid();
+        runner.setProperty("key.serializer", "Foo");
+        runner.assertNotValid();
+    }
+
+    @Test
     public void validatePropertiesValidation() throws Exception {
         PublishKafka publishKafka = new PublishKafka();
         TestRunner runner = TestRunners.newTestRunner(publishKafka);

Reply via email to