Repository: nifi
Updated Branches:
  refs/heads/master 4281a51c8 -> e1742c5a0


NIFI-1192 added support for dynamic properties to GetKafka
Because the current component uses artificial names for properties set via the
UI and then maps those properties to the actual names used by Kafka, we cannot
rely on the NiFi UI to display an error if a user attempts to set a dynamic
property that eventually maps to the same Kafka property. So I decided that any
dynamic property will simply override an existing property, with a WARNING
message displayed. This is consistent with how Kafka itself handles it, since
Kafka also displays the overrides in the console. Updated the relevant
annotation description accordingly.
It is also worth mentioning that the current code was using an old property from
Kafka 0.7 (“zk.connectiontimeout.ms”) which is no longer present in Kafka 0.8
(WARN Timer-Driven Process Thread-7 utils.VerifiableProperties:83 - Property
zk.connectiontimeout.ms is not valid); it is now set as
“zookeeper.connection.timeout.ms”. The add/override strategy provides more
flexibility when dealing with Kafka's volatile configuration until things
settle down and we can put some sensible defaults in place.
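
A minimal, self-contained sketch of the add/override strategy described above.
It assumes plain java.util.Properties and a Map of dynamic values in place of
NiFi's ProcessContext, and prints to System.err in place of the processor's
logger; DynamicPropertyMerge is a hypothetical name used only for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical helper illustrating "dynamic properties override static ones".
final class DynamicPropertyMerge {

    /**
     * Copies the static properties, then applies every dynamic property on top.
     * A dynamic value always wins; replacing an existing key prints a warning.
     */
    static Properties merge(Properties staticProps, Map<String, String> dynamicProps) {
        Properties merged = new Properties();
        merged.putAll(staticProps); // leave the caller's Properties untouched
        for (Map.Entry<String, String> entry : dynamicProps.entrySet()) {
            String name = entry.getKey();
            if (merged.containsKey(name)) {
                // Stand-in for getLogger().warn(...) in the processor
                System.err.println("WARN: Overriding existing property '" + name
                        + "' which had value of '" + merged.getProperty(name)
                        + "' with dynamically set value '" + entry.getValue() + "'.");
            }
            merged.setProperty(name, entry.getValue());
        }
        return merged;
    }

    public static void main(String[] args) {
        Properties staticProps = new Properties();
        staticProps.setProperty("zookeeper.connection.timeout.ms", "30000");
        staticProps.setProperty("socket.timeout.ms", "30000");

        Map<String, String> dynamicProps = new LinkedHashMap<>();
        dynamicProps.put("socket.timeout.ms", "60000");          // overrides, warns
        dynamicProps.put("fetch.message.max.bytes", "2097152");  // simply added

        Properties merged = merge(staticProps, dynamicProps);
        System.out.println(merged.getProperty("socket.timeout.ms"));       // 60000
        System.out.println(merged.getProperty("fetch.message.max.bytes")); // 2097152
    }
}
```

Copying into a fresh Properties keeps the merge side-effect free; the processor
code in this commit instead mutates the Properties it is about to hand to Kafka,
which is equivalent there because that object is built fresh on each call.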

While doing this, I also addressed the following issues that were discovered
while making the modifications and testing:
ISSUE: When GetKafka started and there were no messages in the Kafka topic, the
onTrigger(..) method would block because Kafka’s ConsumerIterator.hasNext()
blocks. When an attempt was made to stop GetKafka, it would stop successfully
due to the interrupt. However, the UI would show it as an ERROR because the
InterruptException was not handled.
RESOLUTION: After discussing it with @markap14, the general desire is to let
the task exit as quickly as possible; the whole thread maintenance logic was
there initially only because there was no way to tell the Kafka consumer to
return immediately if there are no events. In this patch we now use Kafka’s
‘consumer.timeout.ms’ property and, unless the user sets it explicitly as a
dynamic property, set its value to 1 millisecond (the default is -1, which
blocks indefinitely). This ensures that tasks that attempt to read an empty
topic exit immediately and are then rescheduled by NiFi based on the user's
configuration.
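
The defaulting of ‘consumer.timeout.ms’ and its effect can be approximated with
the JDK alone. In this sketch a LinkedBlockingQueue stands in for the Kafka
stream and SimulatedConsumerTimeoutException stands in for
kafka.consumer.ConsumerTimeoutException; these, and the class name
ConsumerTimeoutSketch, are hypothetical and not Kafka APIs.

```java
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

final class ConsumerTimeoutSketch {

    // Hypothetical stand-in for kafka.consumer.ConsumerTimeoutException.
    static final class SimulatedConsumerTimeoutException extends RuntimeException {
    }

    /** Applies the 1 ms default only when the user has not set the key. */
    static void applyDefaultConsumerTimeout(Properties props) {
        if (!props.containsKey("consumer.timeout.ms")) {
            props.setProperty("consumer.timeout.ms", "1");
        }
    }

    /**
     * Simulates waiting for the next message: a negative timeout blocks until
     * a message arrives (like Kafka's default of -1); otherwise it waits at
     * most timeoutMs and then throws.
     */
    static <T> T nextOrTimeout(BlockingQueue<T> queue, long timeoutMs) throws InterruptedException {
        if (timeoutMs < 0) {
            return queue.take();
        }
        T message = queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
        if (message == null) {
            throw new SimulatedConsumerTimeoutException();
        }
        return message;
    }

    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        applyDefaultConsumerTimeout(props);
        long timeoutMs = Long.parseLong(props.getProperty("consumer.timeout.ms"));

        BlockingQueue<String> emptyTopic = new LinkedBlockingQueue<>();
        try {
            nextOrTimeout(emptyTopic, timeoutMs);
        } catch (SimulatedConsumerTimeoutException e) {
            System.out.println("No messages; the task exits and is rescheduled");
        }
    }
}
```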

ISSUE: GetKafka would not release a FlowFile with events if it did not have
enough of them to complete the batch, since it would block waiting for more
messages (the blocking issue described above).
RESOLUTION: When no message arrives within ‘consumer.timeout.ms’, the
invocation of hasNext() results in Kafka’s ConsumerTimeoutException, which is
handled in a catch block where the FlowFile with the partial batch is released
to success. I am not sure whether we need to log a WARN message here; in my
opinion we should not, as it may create unnecessary confusion.
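
The partial-batch behavior can be sketched without Kafka. TimingOutIterator is a
hypothetical iterator whose hasNext() throws once its buffered messages run
out, mimicking ConsumerIterator with ‘consumer.timeout.ms’ set; a List stands
in for the FlowFile content. None of these names come from NiFi or Kafka.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

final class PartialBatchSketch {

    // Hypothetical stand-in for kafka.consumer.ConsumerTimeoutException.
    static final class SimulatedConsumerTimeoutException extends RuntimeException {
    }

    /** Iterator whose hasNext() throws when no more messages are buffered. */
    static final class TimingOutIterator implements Iterator<String> {
        private final Iterator<String> delegate;

        TimingOutIterator(List<String> messages) {
            this.delegate = messages.iterator();
        }

        @Override
        public boolean hasNext() {
            if (!delegate.hasNext()) {
                throw new SimulatedConsumerTimeoutException(); // timeout expired
            }
            return true;
        }

        @Override
        public String next() {
            return delegate.next();
        }
    }

    /** Collects up to batchSize messages; a timeout ends the batch early instead of failing. */
    static List<String> consumeBatch(Iterator<String> iterator, int batchSize) {
        List<String> batch = new ArrayList<>();
        try {
            // Short-circuit: hasNext() is not called once the batch is full
            while (batch.size() < batchSize && iterator.hasNext()) {
                batch.add(iterator.next());
            }
        } catch (SimulatedConsumerTimeoutException e) {
            // No more messages right now: release whatever was accumulated.
        }
        return batch; // an empty batch corresponds to removing the FlowFile
    }

    public static void main(String[] args) {
        List<String> partial = consumeBatch(new TimingOutIterator(Arrays.asList("m1", "m2")), 5);
        System.out.println(partial); // [m1, m2]
    }
}
```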

ISSUE: When configuring a consumer for a topic and specifying multiple
concurrent consumers in ‘topicCountMap’ based on
‘context.getMaxConcurrentTasks()’, each consumer would bind to a topic
partition. If you have fewer partitions than the value returned by
‘context.getMaxConcurrentTasks()’, you would essentially allocate Kafka
resources that never get a chance to receive a single message (see more here
https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example).
RESOLUTION: Logic was added to determine the number of partitions for a topic.
If the ‘context.getMaxConcurrentTasks()’ value is greater than the number of
partitions, the partition count is used when creating ‘topicCountMap’ and a
WARNING message is displayed (see code). Unfortunately we cannot do anything
about the actual tasks, but given the current state of the code they will exit
immediately, only to be rescheduled, and the process will repeat. NOTE: That is
not ideal, as it keeps rescheduling tasks that never have a chance to do
anything, but at least it can be fixed on the user side after reading the
warning message.
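
The stream-count decision reduces to taking the smaller of the two values and
warning on any mismatch. A minimal sketch, with StreamCountSketch as a
hypothetical name and System.err standing in for the logger. Here
partitionCount is meant to be the number of partitions of the topic; with the
Kafka 0.8 AdminUtils.fetchTopicMetadataFromZk call used in KafkaUtils, that
would be the size of the returned TopicMetadata's partitionsMetadata(), not the
size of the returned set, which holds one TopicMetadata per requested topic.

```java
final class StreamCountSketch {

    /**
     * Returns how many streams to request in topicCountMap: never more than
     * the topic's partition count, since surplus streams would receive nothing.
     */
    static int streamsToRequest(int maxConcurrentTasks, int partitionCount) {
        if (maxConcurrentTasks < partitionCount) {
            System.err.println("WARN: " + maxConcurrentTasks + " concurrent tasks but "
                    + partitionCount + " partitions; consider making them equal.");
            return maxConcurrentTasks;
        }
        if (maxConcurrentTasks > partitionCount) {
            System.err.println("WARN: " + maxConcurrentTasks + " concurrent tasks but only "
                    + partitionCount + " partitions; using " + partitionCount + " streams.");
            return partitionCount;
        }
        return maxConcurrentTasks;
    }

    public static void main(String[] args) {
        System.out.println(streamsToRequest(8, 3)); // 3
        System.out.println(streamsToRequest(2, 4)); // 2
    }
}
```

The result is simply Math.min(maxConcurrentTasks, partitionCount); the explicit
branches exist only to emit the two different warnings.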

NIFI-1192 added dynamic properties support for PutKafka

NIFI-1192 polishing

NIFI-1192 polished and addressed PR comments


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/d949ee1a
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/d949ee1a
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/d949ee1a

Branch: refs/heads/master
Commit: d949ee1a1ee529fbf0103b4620fef47832132a53
Parents: f1f67f6
Author: Oleg Zhurakousky <[email protected]>
Authored: Mon Nov 23 15:15:03 2015 -0500
Committer: Oleg Zhurakousky <[email protected]>
Committed: Tue Nov 24 12:14:36 2015 -0500

----------------------------------------------------------------------
 .../nifi-kafka-processors/pom.xml               |   5 +
 .../apache/nifi/processors/kafka/GetKafka.java  | 209 +++++++++++--------
 .../nifi/processors/kafka/KafkaUtils.java       |  56 +++++
 .../apache/nifi/processors/kafka/PutKafka.java  |  27 +++
 4 files changed, 213 insertions(+), 84 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/d949ee1a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/pom.xml b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/pom.xml
index 1a8dc9d..7793102 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/pom.xml
@@ -66,6 +66,11 @@
             <scope>test</scope>
         </dependency>
         <dependency>
+            <groupId>commons-io</groupId>
+            <artifactId>commons-io</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-simple</artifactId>
             <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/nifi/blob/d949ee1a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
index 4be6194..8f8d2e9 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
@@ -24,14 +24,15 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.nifi.annotation.behavior.DynamicProperty;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.SupportsBatching;
@@ -39,9 +40,7 @@ import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.annotation.lifecycle.OnStopped;
-import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.Validator;
 import org.apache.nifi.flowfile.FlowFile;
@@ -56,6 +55,7 @@ import org.apache.nifi.processor.util.StandardValidators;
 import kafka.consumer.Consumer;
 import kafka.consumer.ConsumerConfig;
 import kafka.consumer.ConsumerIterator;
+import kafka.consumer.ConsumerTimeoutException;
 import kafka.consumer.KafkaStream;
 import kafka.javaapi.consumer.ConsumerConnector;
 import kafka.message.MessageAndMetadata;
@@ -70,6 +70,11 @@ import kafka.message.MessageAndMetadata;
                 + " the message does not have a key, or if the batch size is greater than 1, this attribute will not be added"),
         @WritesAttribute(attribute = "kafka.partition", description = "The partition of the Kafka Topic from which the message was received. This attribute is added only if the batch size is 1"),
         @WritesAttribute(attribute = "kafka.offset", description = "The offset of the message within the Kafka partition. This attribute is added only if the batch size is 1")})
+@DynamicProperty(name = "The name of a Kafka configuration property.", value = "The value of a given Kafka configuration property.",
+            description = "These properties will be added on the Kafka configuration after loading any provided configuration properties."
+        + " In the event a dynamic property represents a property that was already set as part of the static properties, its value wil be"
+        + " overriden with warning message describing the override."
+        + " For the list of available Kafka properties please refer to: http://kafka.apache.org/documentation.html#configuration.")
 public class GetKafka extends AbstractProcessor {
 
     public static final String SMALLEST = "smallest";
@@ -167,9 +172,7 @@ public class GetKafka extends AbstractProcessor {
     private final BlockingQueue<ConsumerIterator<byte[], byte[]>> streamIterators = new LinkedBlockingQueue<>();
     private volatile ConsumerConnector consumer;
 
-    final Lock interruptionLock = new ReentrantLock();
-    // guarded by interruptionLock
-    private final Set<Thread> interruptableThreads = new HashSet<>();
+    private final AtomicBoolean consumerStreamsReady = new AtomicBoolean();
 
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
@@ -203,26 +206,69 @@ public class GetKafka extends AbstractProcessor {
         return relationships;
     }
 
-    @OnScheduled
     public void createConsumers(final ProcessContext context) {
         final String topic = context.getProperty(TOPIC).getValue();
 
-        final Map<String, Integer> topicCountMap = new HashMap<>(1);
-        topicCountMap.put(topic, context.getMaxConcurrentTasks());
-
         final Properties props = new Properties();
         props.setProperty("zookeeper.connect", context.getProperty(ZOOKEEPER_CONNECTION_STRING).getValue());
         props.setProperty("group.id", context.getProperty(GROUP_ID).getValue());
         props.setProperty("client.id", context.getProperty(CLIENT_NAME).getValue());
         props.setProperty("auto.commit.interval.ms", String.valueOf(context.getProperty(ZOOKEEPER_COMMIT_DELAY).asTimePeriod(TimeUnit.MILLISECONDS)));
-        props.setProperty("auto.commit.enable", "true"); // just be explicit
         props.setProperty("auto.offset.reset", context.getProperty(AUTO_OFFSET_RESET).getValue());
-        props.setProperty("zk.connectiontimeout.ms", context.getProperty(ZOOKEEPER_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).toString());
+        props.setProperty("zookeeper.connection.timeout.ms", context.getProperty(ZOOKEEPER_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).toString());
         props.setProperty("socket.timeout.ms", context.getProperty(KAFKA_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).toString());
 
+        for (final Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) {
+            PropertyDescriptor descriptor = entry.getKey();
+            if (descriptor.isDynamic()) {
+                if (props.containsKey(descriptor.getName())) {
+                    this.getLogger().warn("Overriding existing property '" + descriptor.getName() + "' which had value of '"
+                                    + props.getProperty(descriptor.getName()) + "' with dynamically set value '"
+                                    + entry.getValue() + "'.");
+                }
+                props.setProperty(descriptor.getName(), entry.getValue());
+            }
+        }
+
+        /*
+         * Unless user sets it to some explicit value we are setting it to the
+         * lowest possible value of 1 millisecond to ensure the
+         * consumerStream.hasNext() doesn't block. See
+         * http://kafka.apache.org/documentation.html#configuration) as well as
+         * comment in 'catch ConsumerTimeoutException' in onTrigger() for more
+         * explanation as to the reasoning behind it.
+         */
+        if (!props.containsKey("consumer.timeout.ms")) {
+            this.getLogger().info("Setting 'consumer.timeout.ms' to 1 milliseconds to avoid consumer"
+                            + " block in the event when no events are present in Kafka topic. If you wish to change this value "
+                            + " set it as dynamic property. If you wish to explicitly enable consumer block (at your own risk)"
+                            + " set its value to -1.");
+            props.setProperty("consumer.timeout.ms", "1");
+        }
+
         final ConsumerConfig consumerConfig = new ConsumerConfig(props);
         consumer = Consumer.createJavaConsumerConnector(consumerConfig);
 
+        final Map<String, Integer> topicCountMap = new HashMap<>(1);
+
+        int partitionCount = KafkaUtils.retrievePartitionCountForTopic(
+                context.getProperty(ZOOKEEPER_CONNECTION_STRING).getValue(), context.getProperty(TOPIC).getValue());
+
+        int concurrentTaskToUse = context.getMaxConcurrentTasks();
+        if (context.getMaxConcurrentTasks() < partitionCount){
+            this.getLogger().warn("The amount of concurrent tasks '" + context.getMaxConcurrentTasks() + "' configured for "
+                    + "this processor is less than the amount of partitions '" + partitionCount + "' for topic '" + context.getProperty(TOPIC).getValue() + "'. "
+                            + "Consider making it equal to the amount of partition count for most efficient event consumption.");
+        } else if (context.getMaxConcurrentTasks() > partitionCount){
+            concurrentTaskToUse = partitionCount;
+            this.getLogger().warn("The amount of concurrent tasks '" + context.getMaxConcurrentTasks() + "' configured for "
+                    + "this processor is greater than the amount of partitions '" + partitionCount + "' for topic '" + context.getProperty(TOPIC).getValue() + "'. "
+                            + "Therefore those tasks would never see a message. To avoid that the '" + partitionCount + "'(partition count) will be used to "
+                                    + "consume events");
+        }
+
+        topicCountMap.put(topic, concurrentTaskToUse);
+
         final Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
         final List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
 
@@ -231,10 +277,12 @@ public class GetKafka extends AbstractProcessor {
         for (final KafkaStream<byte[], byte[]> stream : streams) {
             streamIterators.add(stream.iterator());
         }
+        this.consumerStreamsReady.set(true);
     }
 
     @OnStopped
     public void shutdownConsumer() {
+        this.consumerStreamsReady.set(false);
         if (consumer != null) {
             try {
                 consumer.commitOffsets();
@@ -244,75 +292,57 @@ public class GetKafka extends AbstractProcessor {
         }
     }
 
-    @OnUnscheduled
-    public void interruptIterators() {
-        // Kafka doesn't provide a non-blocking API for pulling messages. We can, however,
-        // interrupt the Threads. We do this when the Processor is stopped so that we have the
-        // ability to shutdown the Processor.
-        interruptionLock.lock();
-        try {
-            for (final Thread t : interruptableThreads) {
-                t.interrupt();
-            }
+    @Override
+    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
+        return new PropertyDescriptor.Builder()
+                .description("Specifies the value for '" + propertyDescriptorName + "' Kafka Configuration.")
+                .name(propertyDescriptorName).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).dynamic(true)
+                .build();
+    }
 
-            interruptableThreads.clear();
-        } finally {
-            interruptionLock.unlock();
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        /*
+         * Will ensure that consumer streams are ready upon the first invocation
+         * of onTrigger. Will be reset to 'false' in the event of exception
+         */
+        synchronized (this.consumerStreamsReady) {
+            if (!this.consumerStreamsReady.get()) {
+                this.createConsumers(context);
+            }
+        }
+        ConsumerIterator<byte[], byte[]> iterator = this.getStreamIterator();
+        if (iterator != null) {
+            this.consumeFromKafka(context, session, iterator);
         }
     }
 
     protected ConsumerIterator<byte[], byte[]> getStreamIterator() {
-        return streamIterators.poll();
+        return this.streamIterators.poll();
     }
 
-    @Override
-    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
-        final ConsumerIterator<byte[], byte[]> iterator = getStreamIterator();
-        if (iterator == null) {
-            return;
-        }
+
+    private void consumeFromKafka(final ProcessContext context, final ProcessSession session,
+            ConsumerIterator<byte[], byte[]> iterator) throws ProcessException {
 
         final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
         final String demarcator = context.getProperty(MESSAGE_DEMARCATOR).getValue().replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t");
         final byte[] demarcatorBytes = demarcator.getBytes(StandardCharsets.UTF_8);
         final String topic = context.getProperty(TOPIC).getValue();
 
-        FlowFile flowFile = null;
-        try {
-            // add the current thread to the Set of those to be interrupted if processor stopped.
-            interruptionLock.lock();
-            try {
-                interruptableThreads.add(Thread.currentThread());
-            } finally {
-                interruptionLock.unlock();
-            }
-
-            final long start = System.nanoTime();
-            flowFile = session.create();
-
-            final Map<String, String> attributes = new HashMap<>();
-            attributes.put("kafka.topic", topic);
+        FlowFile flowFile = session.create();
 
-            int numMessages = 0;
-            for (int msgCount = 0; msgCount < batchSize; msgCount++) {
-                // if the processor is stopped, iterator.hasNext() will throw an Exception.
-                // In this case, we just break out of the loop.
-                try {
-                    if (!iterator.hasNext()) {
-                        break;
-                    }
-                } catch (final Exception e) {
-                    break;
-                }
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("kafka.topic", topic);
+        final long start = System.nanoTime();
+        int msgCount = 0;
 
+        try {
+            for (; msgCount < batchSize && iterator.hasNext(); msgCount++) {
                 final MessageAndMetadata<byte[], byte[]> mam = iterator.next();
-                if (mam == null) {
-                    return;
-                }
-
-                final byte[] key = mam.key();
 
                 if (batchSize == 1) {
+                    final byte[] key = mam.key();
                     // the kafka.key, kafka.offset, and kafka.partition attributes are added only
                     // for a batch size of 1.
                     if (key != null) {
@@ -334,33 +364,26 @@ public class GetKafka extends AbstractProcessor {
                         out.write(mam.message());
                     }
                 });
-                numMessages++;
-            }
-
-            // If we received no messages, remove the FlowFile. Otherwise, send to success.
-            if (flowFile.getSize() == 0L) {
-                session.remove(flowFile);
-            } else {
-                flowFile = session.putAllAttributes(flowFile, attributes);
-                final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
-                session.getProvenanceReporter().receive(flowFile, "kafka://" + topic, "Received " + numMessages + " Kafka messages", millis);
-                getLogger().info("Successfully received {} from Kafka with {} messages in {} millis", new Object[]{flowFile, numMessages, millis});
-                session.transfer(flowFile, REL_SUCCESS);
             }
+            this.releaseFlowFile(flowFile, session, attributes, start, topic, msgCount);
+        } catch (ConsumerTimeoutException e) {
+            /*
+             * By default Kafka blocks indefinitely if topic is empty via
+             * stream.hasNext(). If 'consumer.timeout.ms' property is set (see
+             * http://kafka.apache.org/documentation.html#configuration) the
+             * hasNext() will fail with this exception. To this processor it
+             * simply means there are no messages and current task should exit
+             * in non-failure releasing the flow file if it was able to
+             * accumulate any events.
+             */
+            this.releaseFlowFile(flowFile, session, attributes, start, topic, msgCount);
         } catch (final Exception e) {
+            this.shutdownConsumer();
             getLogger().error("Failed to receive FlowFile from Kafka due to {}", new Object[]{e});
             if (flowFile != null) {
                 session.remove(flowFile);
             }
         } finally {
-            // Remove the current thread from the Set of Threads to interrupt.
-            interruptionLock.lock();
-            try {
-                interruptableThreads.remove(Thread.currentThread());
-            } finally {
-                interruptionLock.unlock();
-            }
-
             // Add the iterator back to the queue
             if (iterator != null) {
                 streamIterators.offer(iterator);
@@ -368,4 +391,22 @@ public class GetKafka extends AbstractProcessor {
         }
     }
 
+    /**
+     * Will release flow file. Releasing of the flow file in the context of this
+     * operation implies the following:
+     *
+     * If Empty then remove from session and return
+     * If has something then transfer to REL_SUCCESS
+     */
+    private void releaseFlowFile(FlowFile flowFile, ProcessSession session, Map<String, String> attributes, long start, String topic, int msgCount){
+        if (flowFile.getSize() == 0L) {
+            session.remove(flowFile);
+        } else {
+            flowFile = session.putAllAttributes(flowFile, attributes);
+            final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+            session.getProvenanceReporter().receive(flowFile, "kafka://" + topic, "Received " + msgCount + " Kafka messages", millis);
+            getLogger().info("Successfully received {} from Kafka with {} messages in {} millis", new Object[]{flowFile, msgCount, millis});
+            session.transfer(flowFile, REL_SUCCESS);
+        }
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/d949ee1a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaUtils.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaUtils.java
new file mode 100644
index 0000000..657d88b
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaUtils.java
@@ -0,0 +1,56 @@
+package org.apache.nifi.processors.kafka;
+
+import java.util.Collections;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.exception.ZkMarshallingError;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import org.I0Itec.zkclient.serialize.ZkSerializer;
+
+import kafka.admin.AdminUtils;
+import kafka.api.TopicMetadata;
+import kafka.utils.ZKStringSerializer;
+import scala.collection.JavaConversions;
+
+/**
+ * Utility class to support interruction with Kafka internals.
+ *
+ */
+class KafkaUtils {
+
+    /**
+     * Will retrieve the amount of partitions for a given Kafka topic.
+     */
+    static int retrievePartitionCountForTopic(String zookeeperConnectionString, String topicName) {
+        ZkClient zkClient = new ZkClient(zookeeperConnectionString);
+        zkClient.setZkSerializer(new ZkSerializer() {
+            @Override
+            public byte[] serialize(Object o) throws ZkMarshallingError {
+                return ZKStringSerializer.serialize(o);
+            }
+
+            @Override
+            public Object deserialize(byte[] bytes) throws ZkMarshallingError {
+                return ZKStringSerializer.deserialize(bytes);
+            }
+        });
+        scala.collection.Set<TopicMetadata> topicMetadatas = AdminUtils
+                .fetchTopicMetadataFromZk(JavaConversions.asScalaSet(Collections.singleton(topicName)), zkClient);
+        return topicMetadatas.size();
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/d949ee1a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
index ea7f7bb..b5766e4 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
@@ -27,6 +27,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Properties;
 import java.util.Set;
+import java.util.Map.Entry;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -44,6 +45,7 @@ import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.SupportsBatching;
@@ -81,6 +83,11 @@ import scala.actors.threadpool.Arrays;
 @CapabilityDescription("Sends the contents of a FlowFile as a message to Apache Kafka. The messages to send may be individual FlowFiles or may be delimited, using a "
     + "user-specified delimiter, such as a new-line.")
 @TriggerWhenEmpty // because we have a queue of sessions that are ready to be committed
+@DynamicProperty(name = "The name of a Kafka configuration property.", value = "The value of a given Kafka configuration property.",
+            description = "These properties will be added on the Kafka configuration after loading any provided configuration properties."
+        + " In the event a dynamic property represents a property that was already set as part of the static properties, its value wil be"
+        + " overriden with warning message describing the override."
+        + " For the list of available Kafka properties please refer to: http://kafka.apache.org/documentation.html#configuration.")
 public class PutKafka extends AbstractSessionFactoryProcessor {
 
     private static final String SINGLE_BROKER_REGEX = ".*?\\:\\d{3,5}";
@@ -356,6 +363,18 @@ public class PutKafka extends AbstractSessionFactoryProcessor {
         properties.setProperty("retries", "0");
         properties.setProperty("block.on.buffer.full", "false");
 
+        for (final Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) {
+            PropertyDescriptor descriptor = entry.getKey();
+            if (descriptor.isDynamic()) {
+                if (properties.containsKey(descriptor.getName())) {
+                    this.getLogger().warn("Overriding existing property '" + descriptor.getName() + "' which had value of '"
+                                    + properties.getProperty(descriptor.getName()) + "' with dynamically set value '"
+                                    + entry.getValue() + "'.");
+                }
+                properties.setProperty(descriptor.getName(), entry.getValue());
+            }
+        }
+
         return properties;
     }
 
@@ -398,6 +417,14 @@ public class PutKafka extends AbstractSessionFactoryProcessor {
     }
 
     @Override
+    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
+        return new PropertyDescriptor.Builder()
+                .description("Specifies the value for '" + propertyDescriptorName + "' Kafka Configuration.")
+                .name(propertyDescriptorName).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).dynamic(true)
+                .build();
+    }
+
+    @Override
     public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
         FlowFileMessageBatch batch;
         while ((batch = completeBatches.poll()) != null) {
