Repository: nifi Updated Branches: refs/heads/0.x 4927e8c4e -> fcf85ecda
NIFI-2515 fixed Kafka serialization/deserialization settings Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/fcf85ecd Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/fcf85ecd Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/fcf85ecd Branch: refs/heads/0.x Commit: fcf85ecdae114745e92c59d06073b7eb6cb8e4a0 Parents: 4927e8c Author: Oleg Zhurakousky <[email protected]> Authored: Mon Aug 8 17:03:12 2016 -0400 Committer: joewitt <[email protected]> Committed: Tue Aug 9 14:30:15 2016 -0400 ---------------------------------------------------------------------- .../kafka/pubsub/AbstractKafkaProcessor.java | 36 +++++++++++++++++--- .../processors/kafka/pubsub/ConsumeKafka.java | 8 ++--- .../processors/kafka/pubsub/PublishKafka.java | 8 ++--- .../kafka/pubsub/ConsumeKafkaTest.java | 16 +++++++++ .../kafka/pubsub/PublishKafkaTest.java | 15 ++++++++ 5 files changed, 67 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/fcf85ecd/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/AbstractKafkaProcessor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/AbstractKafkaProcessor.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/AbstractKafkaProcessor.java index 092de31..736d93e 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/AbstractKafkaProcessor.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/AbstractKafkaProcessor.java @@ -27,8 +27,11 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.regex.Pattern; +import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; @@ -117,11 +120,11 @@ abstract class AbstractKafkaProcessor<T extends Closeable> extends AbstractSessi .build(); static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder() - .name("ssl.context.service") - .displayName("SSL Context Service") - .description("Specifies the SSL Context Service to use for communicating with Kafka.") + .name("ssl.context.service") + .displayName("SSL Context Service") + .description("Specifies the SSL Context Service to use for communicating with Kafka.") .required(false) - .identifiesControllerService(SSLContextService.class) + .identifiesControllerService(SSLContextService.class) .build(); static final Builder MESSAGE_DEMARCATOR_BUILDER = new PropertyDescriptor.Builder() @@ -306,6 +309,31 @@ abstract class AbstractKafkaProcessor<T extends Closeable> extends AbstractSessi } } + String keySerializer = validationContext.getProperty(new PropertyDescriptor.Builder().name(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).build()) + .getValue(); + if (keySerializer != null && !ByteArraySerializer.class.getName().equals(keySerializer)) { + results.add(new ValidationResult.Builder().subject(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG) + .explanation("Key Serializer must be " + ByteArraySerializer.class.getName() + "' was '" + keySerializer + "'").build()); + } + String valueSerializer = validationContext.getProperty(new PropertyDescriptor.Builder().name(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).build()) + .getValue(); + if (valueSerializer != null && !ByteArraySerializer.class.getName().equals(valueSerializer)) { + results.add(new ValidationResult.Builder().subject(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG) + .explanation("Value Serializer must be " + ByteArraySerializer.class.getName() + "' was '" + valueSerializer + "'").build()); + } + String keyDeSerializer = validationContext.getProperty(new PropertyDescriptor.Builder().name(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG).build()) + .getValue(); + if (keyDeSerializer != null && !ByteArrayDeserializer.class.getName().equals(keyDeSerializer)) { + results.add(new ValidationResult.Builder().subject(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG) + .explanation("Key De-Serializer must be '" + ByteArrayDeserializer.class.getName() + "' was '" + keyDeSerializer + "'").build()); + } + String valueDeSerializer = validationContext.getProperty(new PropertyDescriptor.Builder().name(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG).build()) + .getValue(); + if (valueDeSerializer != null && !ByteArrayDeserializer.class.getName().equals(valueDeSerializer)) { + results.add(new ValidationResult.Builder().subject(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG) + .explanation("Value De-Serializer must be " + ByteArrayDeserializer.class.getName() + "' was '" + valueDeSerializer + "'").build()); + } + return results; } http://git-wip-us.apache.org/repos/asf/nifi/blob/fcf85ecd/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java index 05fa0e3..ac5b4c5 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java @@ -242,12 +242,8 @@ public class ConsumeKafka extends AbstractKafkaProcessor<Consumer<byte[], byte[] this.checkIfInitialConnectionPossible(); } - if (!kafkaProperties.containsKey(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)) { - kafkaProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); - } - if (!kafkaProperties.containsKey(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)) { - kafkaProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); - } + kafkaProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); + kafkaProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(kafkaProperties); consumer.subscribe(Collections.singletonList(this.topic)); http://git-wip-us.apache.org/repos/asf/nifi/blob/fcf85ecd/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka.java index 059c5f3..6703c04 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka.java @@ -225,12 +225,8 @@ public class PublishKafka extends AbstractKafkaProcessor<KafkaPublisher> { @Override protected KafkaPublisher buildKafkaResource(ProcessContext context, ProcessSession session) { Properties kafkaProperties = this.buildKafkaProperties(context); - if (!kafkaProperties.containsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)) { - kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); - } - if (!kafkaProperties.containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) { - kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); - } + kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); + kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); this.brokers = context.getProperty(BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue(); KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, this.getLogger()); return publisher; http://git-wip-us.apache.org/repos/asf/nifi/blob/fcf85ecd/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java index 7daac98..8e17a21 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java @@ -23,6 +23,7 @@ import static org.junit.Assert.fail; import java.nio.charset.StandardCharsets; import java.util.List; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; @@ -33,6 +34,21 @@ import org.junit.Test; public class ConsumeKafkaTest { @Test + public void validateCustomSerilaizerDeserializerSettings() throws Exception { + ConsumeKafka consumeKafka = new ConsumeKafka(); + TestRunner runner = TestRunners.newTestRunner(consumeKafka); + runner.setProperty(ConsumeKafka.BOOTSTRAP_SERVERS, "okeydokey:1234"); + runner.setProperty(ConsumeKafka.TOPIC, "foo"); + runner.setProperty(ConsumeKafka.CLIENT_ID, "foo"); + runner.setProperty(ConsumeKafka.GROUP_ID, "foo"); + runner.setProperty(ConsumeKafka.AUTO_OFFSET_RESET, ConsumeKafka.OFFSET_EARLIEST); + runner.setProperty("key.deserializer", ByteArrayDeserializer.class.getName()); + runner.assertValid(); + runner.setProperty("key.deserializer", "Foo"); + runner.assertNotValid(); + } + + @Test public void validatePropertiesValidation() throws Exception { ConsumeKafka consumeKafka = new ConsumeKafka(); TestRunner runner = TestRunners.newTestRunner(consumeKafka); http://git-wip-us.apache.org/repos/asf/nifi/blob/fcf85ecd/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java index be97578..01c5fdd 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java @@ -27,6 +27,7 @@ import java.nio.charset.StandardCharsets; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; @@ -38,6 +39,20 @@ import org.mockito.Mockito; public class PublishKafkaTest { @Test + public void validateCustomSerilaizerDeserializerSettings() throws Exception { + PublishKafka publishKafka = new PublishKafka(); + TestRunner runner = TestRunners.newTestRunner(publishKafka); + runner.setProperty(PublishKafka.BOOTSTRAP_SERVERS, "okeydokey:1234"); + runner.setProperty(PublishKafka.TOPIC, "foo"); + runner.setProperty(PublishKafka.CLIENT_ID, "foo"); + runner.setProperty(PublishKafka.META_WAIT_TIME, "3 sec"); + runner.setProperty("key.serializer", ByteArraySerializer.class.getName()); + runner.assertValid(); + runner.setProperty("key.serializer", "Foo"); + runner.assertNotValid(); + } + + @Test public void validatePropertiesValidation() throws Exception { PublishKafka publishKafka = new PublishKafka(); TestRunner runner = TestRunners.newTestRunner(publishKafka);
