http://git-wip-us.apache.org/repos/asf/nifi/blob/00b11e82/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
new file mode 100644
index 0000000..a7bd96d
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
@@ -0,0 +1,372 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kafka.pubsub;
+
+import java.io.Closeable;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.regex.Pattern;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.KafkaException;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+
+/**
+ * A pool of Kafka Consumers for a given topic. Consumers can be obtained by
+ * calling 'obtainConsumer'. Once closed the pool is ready to be immediately
+ * used again.
+ */
+public class ConsumerPool implements Closeable {
+
+    private final BlockingQueue<SimpleConsumerLease> pooledLeases;
+    private final List<String> topics;
+    private final Pattern topicPattern;
+    private final Map<String, Object> kafkaProperties;
+    private final long maxWaitMillis;
+    private final ComponentLog logger;
+    private final byte[] demarcatorBytes;
+    private final String keyEncoding;
+    private final String securityProtocol;
+    private final String bootstrapServers;
+    private final boolean honorTransactions;
+    private final RecordReaderFactory readerFactory;
+    private final RecordSetWriterFactory writerFactory;
+    private final Charset headerCharacterSet;
+    private final Pattern headerNamePattern;
+    private final AtomicLong consumerCreatedCountRef = new AtomicLong();
+    private final AtomicLong consumerClosedCountRef = new AtomicLong();
+    private final AtomicLong leasesObtainedCountRef = new AtomicLong();
+
+    /**
+     * Creates a pool of KafkaConsumer objects that will grow up to the maximum
+     * indicated threads from the given context. Consumers are lazily
+     * initialized. We may elect to not create up to the maximum number of
+     * configured consumers if the broker reported lag time for all topics is
+     * below a certain threshold.
+     *
+     * @param maxConcurrentLeases max allowable consumers at once
+     * @param demarcator bytes to use as demarcator between messages; null or
+     * empty means no demarcator
+     * @param kafkaProperties properties to use to initialize kafka consumers
+     * @param topics the topics to subscribe to
+     * @param maxWaitMillis maximum time to wait for a given lease to acquire
+     * data before committing
+     * @param keyEncoding the encoding to use for the key of a kafka message if
+     * found
+     * @param securityProtocol the security protocol used
+     * @param bootstrapServers the bootstrap servers
+     * @param logger the logger to report any errors/warnings
+     */
+    public ConsumerPool(
+            final int maxConcurrentLeases,
+            final byte[] demarcator,
+            final Map<String, Object> kafkaProperties,
+            final List<String> topics,
+            final long maxWaitMillis,
+            final String keyEncoding,
+            final String securityProtocol,
+            final String bootstrapServers,
+            final ComponentLog logger,
+            final boolean honorTransactions,
+            final Charset headerCharacterSet,
+            final Pattern headerNamePattern) {
+        this.pooledLeases = new ArrayBlockingQueue<>(maxConcurrentLeases);
+        this.maxWaitMillis = maxWaitMillis;
+        this.logger = logger;
+        this.demarcatorBytes = demarcator;
+        this.keyEncoding = keyEncoding;
+        this.securityProtocol = securityProtocol;
+        this.bootstrapServers = bootstrapServers;
+        this.kafkaProperties = Collections.unmodifiableMap(kafkaProperties);
+        this.topics = Collections.unmodifiableList(topics);
+        this.topicPattern = null;
+        this.readerFactory = null;
+        this.writerFactory = null;
+        this.honorTransactions = honorTransactions;
+        this.headerCharacterSet = headerCharacterSet;
+        this.headerNamePattern = headerNamePattern;
+    }
+
+    public ConsumerPool(
+            final int maxConcurrentLeases,
+            final byte[] demarcator,
+            final Map<String, Object> kafkaProperties,
+            final Pattern topics,
+            final long maxWaitMillis,
+            final String keyEncoding,
+            final String securityProtocol,
+            final String bootstrapServers,
+            final ComponentLog logger,
+            final boolean honorTransactions,
+            final Charset headerCharacterSet,
+            final Pattern headerNamePattern) {
+        this.pooledLeases = new ArrayBlockingQueue<>(maxConcurrentLeases);
+        this.maxWaitMillis = maxWaitMillis;
+        this.logger = logger;
+        this.demarcatorBytes = demarcator;
+        this.keyEncoding = keyEncoding;
+        this.securityProtocol = securityProtocol;
+        this.bootstrapServers = bootstrapServers;
+        this.kafkaProperties = Collections.unmodifiableMap(kafkaProperties);
+        this.topics = null;
+        this.topicPattern = topics;
+        this.readerFactory = null;
+        this.writerFactory = null;
+        this.honorTransactions = honorTransactions;
+        this.headerCharacterSet = headerCharacterSet;
+        this.headerNamePattern = headerNamePattern;
+    }
+
+    public ConsumerPool(
+            final int maxConcurrentLeases,
+            final RecordReaderFactory readerFactory,
+            final RecordSetWriterFactory writerFactory,
+            final Map<String, Object> kafkaProperties,
+            final Pattern topics,
+            final long maxWaitMillis,
+            final String securityProtocol,
+            final String bootstrapServers,
+            final ComponentLog logger,
+            final boolean honorTransactions,
+            final Charset headerCharacterSet,
+            final Pattern headerNamePattern) {
+        this.pooledLeases = new ArrayBlockingQueue<>(maxConcurrentLeases);
+        this.maxWaitMillis = maxWaitMillis;
+        this.logger = logger;
+        this.demarcatorBytes = null;
+        this.keyEncoding = null;
+        this.readerFactory = readerFactory;
+        this.writerFactory = writerFactory;
+        this.securityProtocol = securityProtocol;
+        this.bootstrapServers = bootstrapServers;
+        this.kafkaProperties = Collections.unmodifiableMap(kafkaProperties);
+        this.topics = null;
+        this.topicPattern = topics;
+        this.honorTransactions = honorTransactions;
+        this.headerCharacterSet = headerCharacterSet;
+        this.headerNamePattern = headerNamePattern;
+    }
+
+    public ConsumerPool(
+            final int maxConcurrentLeases,
+            final RecordReaderFactory readerFactory,
+            final RecordSetWriterFactory writerFactory,
+            final Map<String, Object> kafkaProperties,
+            final List<String> topics,
+            final long maxWaitMillis,
+            final String securityProtocol,
+            final String bootstrapServers,
+            final ComponentLog logger,
+            final boolean honorTransactions,
+            final Charset headerCharacterSet,
+            final Pattern headerNamePattern) {
+        this.pooledLeases = new ArrayBlockingQueue<>(maxConcurrentLeases);
+        this.maxWaitMillis = maxWaitMillis;
+        this.logger = logger;
+        this.demarcatorBytes = null;
+        this.keyEncoding = null;
+        this.readerFactory = readerFactory;
+        this.writerFactory = writerFactory;
+        this.securityProtocol = securityProtocol;
+        this.bootstrapServers = bootstrapServers;
+        this.kafkaProperties = Collections.unmodifiableMap(kafkaProperties);
+        this.topics = topics;
+        this.topicPattern = null;
+        this.honorTransactions = honorTransactions;
+        this.headerCharacterSet = headerCharacterSet;
+        this.headerNamePattern = headerNamePattern;
+    }
+
+    /**
+     * Obtains a consumer from the pool if one is available or lazily
+     * initializes a new one if deemed necessary.
+     *
+     * @param session the session for which the consumer lease will be
+     *            associated
+     * @param processContext the ProcessContext for which the consumer
+     *            lease will be associated
+     * @return consumer to use or null if not available or necessary
+     */
+    public ConsumerLease obtainConsumer(final ProcessSession session, final 
ProcessContext processContext) {
+        SimpleConsumerLease lease = pooledLeases.poll();
+        if (lease == null) {
+            final Consumer<byte[], byte[]> consumer = createKafkaConsumer();
+            consumerCreatedCountRef.incrementAndGet();
+            /**
+             * For now return a new consumer lease. But we could later elect to
+             * have this return null if we determine the broker indicates that
+             * the lag time on all topics being monitored is sufficiently low.
+             * For now we should encourage conservative use of threads because
+             * having too many means we'll have at best useless threads sitting
+             * around doing frequent network calls and at worst having 
consumers
+             * sitting idle which could prompt excessive rebalances.
+             */
+            lease = new SimpleConsumerLease(consumer);
+            /**
+             * This subscription tightly couples the lease to the given
+             * consumer. They cannot be separated from then on.
+             */
+            if (topics != null) {
+              consumer.subscribe(topics, lease);
+            } else {
+              consumer.subscribe(topicPattern, lease);
+            }
+        }
+        lease.setProcessSession(session, processContext);
+
+        leasesObtainedCountRef.incrementAndGet();
+        return lease;
+    }
+
+    /**
+     * Exposed as protected method for easier unit testing
+     *
+     * @return consumer
+     * @throws KafkaException if unable to subscribe to the given topics
+     */
+    protected Consumer<byte[], byte[]> createKafkaConsumer() {
+        final Map<String, Object> properties = new HashMap<>(kafkaProperties);
+        if (honorTransactions) {
+            properties.put("isolation.level", "read_committed");
+        } else {
+            properties.put("isolation.level", "read_uncommitted");
+        }
+        final Consumer<byte[], byte[]> consumer = new 
KafkaConsumer<>(properties);
+        return consumer;
+    }
+
+    /**
+     * Closes all consumers in the pool. Can be safely called repeatedly.
+     */
+    @Override
+    public void close() {
+        final List<SimpleConsumerLease> leases = new ArrayList<>();
+        pooledLeases.drainTo(leases);
+        leases.stream().forEach((lease) -> {
+            lease.close(true);
+        });
+    }
+
+    private void closeConsumer(final Consumer<?, ?> consumer) {
+        consumerClosedCountRef.incrementAndGet();
+        try {
+            consumer.unsubscribe();
+        } catch (Exception e) {
+            logger.warn("Failed while unsubscribing " + consumer, e);
+        }
+
+        try {
+            consumer.close();
+        } catch (Exception e) {
+            logger.warn("Failed while closing " + consumer, e);
+        }
+    }
+
+    PoolStats getPoolStats() {
+        return new PoolStats(consumerCreatedCountRef.get(), 
consumerClosedCountRef.get(), leasesObtainedCountRef.get());
+    }
+
+    private class SimpleConsumerLease extends ConsumerLease {
+
+        private final Consumer<byte[], byte[]> consumer;
+        private volatile ProcessSession session;
+        private volatile ProcessContext processContext;
+        private volatile boolean closedConsumer;
+
+        private SimpleConsumerLease(final Consumer<byte[], byte[]> consumer) {
+            super(maxWaitMillis, consumer, demarcatorBytes, keyEncoding, 
securityProtocol, bootstrapServers,
+                readerFactory, writerFactory, logger, headerCharacterSet, 
headerNamePattern);
+            this.consumer = consumer;
+        }
+
+        void setProcessSession(final ProcessSession session, final 
ProcessContext context) {
+            this.session = session;
+            this.processContext = context;
+        }
+
+        @Override
+        public void yield() {
+            if (processContext != null) {
+                processContext.yield();
+            }
+        }
+
+        @Override
+        public ProcessSession getProcessSession() {
+            return session;
+        }
+
+        @Override
+        public void close() {
+            super.close();
+            close(false);
+        }
+
+        public void close(final boolean forceClose) {
+            if (closedConsumer) {
+                return;
+            }
+            super.close();
+            if (session != null) {
+                session.rollback();
+                setProcessSession(null, null);
+            }
+            if (forceClose || isPoisoned() || !pooledLeases.offer(this)) {
+                closedConsumer = true;
+                closeConsumer(consumer);
+            }
+        }
+    }
+
+    static final class PoolStats {
+
+        final long consumerCreatedCount;
+        final long consumerClosedCount;
+        final long leasesObtainedCount;
+
+        PoolStats(
+                final long consumerCreatedCount,
+                final long consumerClosedCount,
+                final long leasesObtainedCount
+        ) {
+            this.consumerCreatedCount = consumerCreatedCount;
+            this.consumerClosedCount = consumerClosedCount;
+            this.leasesObtainedCount = leasesObtainedCount;
+        }
+
+        @Override
+        public String toString() {
+            return "Created Consumers [" + consumerCreatedCount + "]\n"
+                    + "Closed Consumers  [" + consumerClosedCount + "]\n"
+                    + "Leases Obtained   [" + leasesObtainedCount + "]\n";
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/00b11e82/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java
new file mode 100644
index 0000000..317b274
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.kafka.pubsub;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+
+public class InFlightMessageTracker {
+    private final ConcurrentMap<FlowFile, Counts> messageCountsByFlowFile = 
new ConcurrentHashMap<>();
+    private final ConcurrentMap<FlowFile, Exception> failures = new 
ConcurrentHashMap<>();
+    private final Object progressMutex = new Object();
+    private final ComponentLog logger;
+
+    public InFlightMessageTracker(final ComponentLog logger) {
+        this.logger = logger;
+    }
+
+    public void incrementAcknowledgedCount(final FlowFile flowFile) {
+        final Counts counter = 
messageCountsByFlowFile.computeIfAbsent(flowFile, ff -> new Counts());
+        counter.incrementAcknowledgedCount();
+
+        synchronized (progressMutex) {
+            progressMutex.notify();
+        }
+    }
+
+    public void trackEmpty(final FlowFile flowFile) {
+        messageCountsByFlowFile.putIfAbsent(flowFile, new Counts());
+    }
+
+    public int getAcknowledgedCount(final FlowFile flowFile) {
+        final Counts counter = messageCountsByFlowFile.get(flowFile);
+        return (counter == null) ? 0 : counter.getAcknowledgedCount();
+    }
+
+    public void incrementSentCount(final FlowFile flowFile) {
+        final Counts counter = 
messageCountsByFlowFile.computeIfAbsent(flowFile, ff -> new Counts());
+        counter.incrementSentCount();
+    }
+
+    public int getSentCount(final FlowFile flowFile) {
+        final Counts counter = messageCountsByFlowFile.get(flowFile);
+        return (counter == null) ? 0 : counter.getSentCount();
+    }
+
+    public void fail(final FlowFile flowFile, final Exception exception) {
+        failures.putIfAbsent(flowFile, exception);
+        logger.error("Failed to send " + flowFile + " to Kafka", exception);
+
+        synchronized (progressMutex) {
+            progressMutex.notify();
+        }
+    }
+
+    public Exception getFailure(final FlowFile flowFile) {
+        return failures.get(flowFile);
+    }
+
+    public boolean isFailed(final FlowFile flowFile) {
+        return getFailure(flowFile) != null;
+    }
+
+    public void reset() {
+        messageCountsByFlowFile.clear();
+        failures.clear();
+    }
+
+    public PublishResult failOutstanding(final Exception exception) {
+        messageCountsByFlowFile.keySet().stream()
+            .filter(ff -> !isComplete(ff))
+            .filter(ff -> !failures.containsKey(ff))
+            .forEach(ff -> failures.put(ff, exception));
+
+        return createPublishResult();
+    }
+
+    private boolean isComplete(final FlowFile flowFile) {
+        final Counts counts = messageCountsByFlowFile.get(flowFile);
+        if (counts.getAcknowledgedCount() == counts.getSentCount()) {
+            // all messages received successfully.
+            return true;
+        }
+
+        if (failures.containsKey(flowFile)) {
+            // FlowFile failed so is complete
+            return true;
+        }
+
+        return false;
+    }
+
+    private boolean isComplete() {
+        return messageCountsByFlowFile.keySet().stream()
+            .allMatch(flowFile -> isComplete(flowFile));
+    }
+
+    void awaitCompletion(final long millis) throws InterruptedException, 
TimeoutException {
+        final long startTime = System.nanoTime();
+        final long maxTime = startTime + TimeUnit.MILLISECONDS.toNanos(millis);
+
+        while (System.nanoTime() < maxTime) {
+            synchronized (progressMutex) {
+                if (isComplete()) {
+                    return;
+                }
+
+                progressMutex.wait(millis);
+            }
+        }
+
+        throw new TimeoutException();
+    }
+
+
+    PublishResult createPublishResult() {
+        return new PublishResult() {
+            @Override
+            public boolean isFailure() {
+                return !failures.isEmpty();
+            }
+
+            @Override
+            public int getSuccessfulMessageCount(final FlowFile flowFile) {
+                return getAcknowledgedCount(flowFile);
+            }
+
+            @Override
+            public Exception getReasonForFailure(final FlowFile flowFile) {
+                return getFailure(flowFile);
+            }
+        };
+    }
+
+    public static class Counts {
+        private final AtomicInteger sentCount = new AtomicInteger(0);
+        private final AtomicInteger acknowledgedCount = new AtomicInteger(0);
+
+        public void incrementSentCount() {
+            sentCount.incrementAndGet();
+        }
+
+        public void incrementAcknowledgedCount() {
+            acknowledgedCount.incrementAndGet();
+        }
+
+        public int getAcknowledgedCount() {
+            return acknowledgedCount.get();
+        }
+
+        public int getSentCount() {
+            return sentCount.get();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/00b11e82/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
new file mode 100644
index 0000000..de28995
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kafka.pubsub;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.ssl.SSLContextService;
+import org.apache.nifi.util.FormatUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class KafkaProcessorUtils {
+
+    final Logger logger = LoggerFactory.getLogger(this.getClass());
+
+    static final AllowableValue UTF8_ENCODING = new AllowableValue("utf-8", 
"UTF-8 Encoded", "The key is interpreted as a UTF-8 Encoded string.");
+    static final AllowableValue HEX_ENCODING = new AllowableValue("hex", "Hex 
Encoded",
+            "The key is interpreted as arbitrary binary data and is encoded 
using hexadecimal characters with uppercase letters");
+
+    static final Pattern HEX_KEY_PATTERN = 
Pattern.compile("(?:[0123456789abcdefABCDEF]{2})+");
+
+    static final String KAFKA_KEY = "kafka.key";
+    static final String KAFKA_TOPIC = "kafka.topic";
+    static final String KAFKA_PARTITION = "kafka.partition";
+    static final String KAFKA_OFFSET = "kafka.offset";
+    static final String KAFKA_COUNT = "kafka.count";
+    static final AllowableValue SEC_PLAINTEXT = new 
AllowableValue("PLAINTEXT", "PLAINTEXT", "PLAINTEXT");
+    static final AllowableValue SEC_SSL = new AllowableValue("SSL", "SSL", 
"SSL");
+    static final AllowableValue SEC_SASL_PLAINTEXT = new 
AllowableValue("SASL_PLAINTEXT", "SASL_PLAINTEXT", "SASL_PLAINTEXT");
+    static final AllowableValue SEC_SASL_SSL = new AllowableValue("SASL_SSL", 
"SASL_SSL", "SASL_SSL");
+
+    static final PropertyDescriptor BOOTSTRAP_SERVERS = new 
PropertyDescriptor.Builder()
+            .name(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)
+            .displayName("Kafka Brokers")
+            .description("A comma-separated list of known Kafka Brokers in the 
format <host>:<port>")
+            .required(true)
+            .addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .defaultValue("localhost:9092")
+            .build();
+    static final PropertyDescriptor SECURITY_PROTOCOL = new 
PropertyDescriptor.Builder()
+            .name("security.protocol")
+            .displayName("Security Protocol")
+            .description("Protocol used to communicate with brokers. 
Corresponds to Kafka's 'security.protocol' property.")
+            .required(true)
+            .expressionLanguageSupported(false)
+            .allowableValues(SEC_PLAINTEXT, SEC_SSL, SEC_SASL_PLAINTEXT, 
SEC_SASL_SSL)
+            .defaultValue(SEC_PLAINTEXT.getValue())
+            .build();
+    static final PropertyDescriptor KERBEROS_PRINCIPLE = new 
PropertyDescriptor.Builder()
+            .name("sasl.kerberos.service.name")
+            .displayName("Kerberos Service Name")
+            .description("The Kerberos principal name that Kafka runs as. This 
can be defined either in Kafka's JAAS config or in Kafka's config. "
+                    + "Corresponds to Kafka's 'security.protocol' property."
+                    + "It is ignored unless one of the SASL options of the 
<Security Protocol> are selected.")
+            .required(false)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .expressionLanguageSupported(false)
+            .build();
+    static final PropertyDescriptor USER_PRINCIPAL = new 
PropertyDescriptor.Builder()
+            .name("sasl.kerberos.principal")
+            .displayName("Kerberos Principal")
+            .description("The Kerberos principal that will be used to connect 
to brokers. If not set, it is expected to set a JAAS configuration file "
+                    + "in the JVM properties defined in the bootstrap.conf 
file. This principal will be set into 'sasl.jaas.config' Kafka's property.")
+            .required(false)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .expressionLanguageSupported(false)
+            .build();
+    static final PropertyDescriptor USER_KEYTAB = new 
PropertyDescriptor.Builder()
+            .name("sasl.kerberos.keytab")
+            .displayName("Kerberos Keytab")
+            .description("The Kerberos keytab that will be used to connect to 
brokers. If not set, it is expected to set a JAAS configuration file "
+                    + "in the JVM properties defined in the bootstrap.conf 
file. This principal will be set into 'sasl.jaas.config' Kafka's property.")
+            .required(false)
+            .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
+            .expressionLanguageSupported(false)
+            .build();
+    static final PropertyDescriptor SSL_CONTEXT_SERVICE = new 
PropertyDescriptor.Builder()
+            .name("ssl.context.service")
+            .displayName("SSL Context Service")
+            .description("Specifies the SSL Context Service to use for 
communicating with Kafka.")
+            .required(false)
+            .identifiesControllerService(SSLContextService.class)
+            .build();
+
+    static List<PropertyDescriptor> getCommonPropertyDescriptors() {
+        return Arrays.asList(
+                BOOTSTRAP_SERVERS,
+                SECURITY_PROTOCOL,
+                KERBEROS_PRINCIPLE,
+                USER_PRINCIPAL,
+                USER_KEYTAB,
+                SSL_CONTEXT_SERVICE
+        );
+    }
+
+    static Collection<ValidationResult> validateCommonProperties(final 
ValidationContext validationContext) {
+        List<ValidationResult> results = new ArrayList<>();
+
+        String securityProtocol = 
validationContext.getProperty(SECURITY_PROTOCOL).getValue();
+
+        /*
+         * validates that if one of SASL (Kerberos) option is selected for
+         * security protocol, then Kerberos principal is provided as well
+         */
+        if (SEC_SASL_PLAINTEXT.getValue().equals(securityProtocol) || 
SEC_SASL_SSL.getValue().equals(securityProtocol)) {
+            String kerberosPrincipal = 
validationContext.getProperty(KERBEROS_PRINCIPLE).getValue();
+            if (kerberosPrincipal == null || kerberosPrincipal.trim().length() 
== 0) {
+                results.add(new 
ValidationResult.Builder().subject(KERBEROS_PRINCIPLE.getDisplayName()).valid(false)
+                        .explanation("The <" + 
KERBEROS_PRINCIPLE.getDisplayName() + "> property must be set when <"
+                                + SECURITY_PROTOCOL.getDisplayName() + "> is 
configured as '"
+                                + SEC_SASL_PLAINTEXT.getValue() + "' or '" + 
SEC_SASL_SSL.getValue() + "'.")
+                        .build());
+            }
+
+            String userKeytab = 
validationContext.getProperty(USER_KEYTAB).getValue();
+            String userPrincipal = 
validationContext.getProperty(USER_PRINCIPAL).getValue();
+            if((StringUtils.isBlank(userKeytab) && 
!StringUtils.isBlank(userPrincipal))
+                    || (!StringUtils.isBlank(userKeytab) && 
StringUtils.isBlank(userPrincipal))) {
+                results.add(new 
ValidationResult.Builder().subject(KERBEROS_PRINCIPLE.getDisplayName()).valid(false)
+                        .explanation("Both <" + USER_KEYTAB.getDisplayName()  
+ "> and <" + USER_PRINCIPAL.getDisplayName() + "> "
+                                + "must be set.")
+                        .build());
+            }
+        }
+
+        //If SSL or SASL_SSL then CS must be set.
+        final boolean sslProtocol = 
SEC_SSL.getValue().equals(securityProtocol) || 
SEC_SASL_SSL.getValue().equals(securityProtocol);
+        final boolean csSet = 
validationContext.getProperty(SSL_CONTEXT_SERVICE).isSet();
+        if (csSet && !sslProtocol) {
+            results.add(new 
ValidationResult.Builder().subject(SECURITY_PROTOCOL.getDisplayName()).valid(false)
+                    .explanation("If you set the SSL Controller Service you 
should also choose an SSL based security protocol.").build());
+        }
+        if (!csSet && sslProtocol) {
+            results.add(new 
ValidationResult.Builder().subject(SSL_CONTEXT_SERVICE.getDisplayName()).valid(false)
+                    .explanation("If you set to an SSL based protocol you need 
to set the SSL Controller Service").build());
+        }
+
+        final String enableAutoCommit = validationContext.getProperty(new 
PropertyDescriptor.Builder().name(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG).build()).getValue();
+        if (enableAutoCommit != null && 
!enableAutoCommit.toLowerCase().equals("false")) {
+            results.add(new 
ValidationResult.Builder().subject(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)
+                    .explanation("Enable auto commit must be false.  It is 
managed by the processor.").build());
+        }
+
+        final String keySerializer = validationContext.getProperty(new 
PropertyDescriptor.Builder().name(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).build()).getValue();
+        if (keySerializer != null && 
!ByteArraySerializer.class.getName().equals(keySerializer)) {
+            results.add(new 
ValidationResult.Builder().subject(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)
+                    .explanation("Key Serializer must be " + 
ByteArraySerializer.class.getName() + "' was '" + keySerializer + "'").build());
+        }
+
+        final String valueSerializer = validationContext.getProperty(new 
PropertyDescriptor.Builder().name(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).build()).getValue();
+        if (valueSerializer != null && 
!ByteArraySerializer.class.getName().equals(valueSerializer)) {
+            results.add(new 
ValidationResult.Builder().subject(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)
+                    .explanation("Value Serializer must be " + 
ByteArraySerializer.class.getName() + "' was '" + valueSerializer + 
"'").build());
+        }
+
+        final String keyDeSerializer = validationContext.getProperty(new 
PropertyDescriptor.Builder().name(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG).build()).getValue();
+        if (keyDeSerializer != null && 
!ByteArrayDeserializer.class.getName().equals(keyDeSerializer)) {
+            results.add(new 
ValidationResult.Builder().subject(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)
+                    .explanation("Key De-Serializer must be '" + 
ByteArrayDeserializer.class.getName() + "' was '" + keyDeSerializer + 
"'").build());
+        }
+
+        final String valueDeSerializer = validationContext.getProperty(new 
PropertyDescriptor.Builder().name(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG).build()).getValue();
+        if (valueDeSerializer != null && 
!ByteArrayDeserializer.class.getName().equals(valueDeSerializer)) {
+            results.add(new 
ValidationResult.Builder().subject(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)
+                    .explanation("Value De-Serializer must be " + 
ByteArrayDeserializer.class.getName() + "' was '" + valueDeSerializer + 
"'").build());
+        }
+
+        return results;
+    }
+
+    static final class KafkaConfigValidator implements Validator {
+
+        final Class<?> classType;
+
+        public KafkaConfigValidator(final Class<?> classType) {
+            this.classType = classType;
+        }
+
+        @Override
+        public ValidationResult validate(final String subject, final String 
value, final ValidationContext context) {
+            final boolean knownValue = 
KafkaProcessorUtils.isStaticStringFieldNamePresent(subject, classType, 
CommonClientConfigs.class, SslConfigs.class, SaslConfigs.class);
+            return new 
ValidationResult.Builder().subject(subject).explanation("Must be a known 
configuration parameter for this kafka client").valid(knownValue).build();
+        }
+    };
+
+    /**
+     * Builds transit URI for provenance event. The transit URI will be in the
+     * form of &lt;security.protocol&gt;://&lt;bootstrap.servers&gt;/topic
+     */
+    static String buildTransitURI(String securityProtocol, String brokers, 
String topic) {
+        StringBuilder builder = new StringBuilder();
+        builder.append(securityProtocol);
+        builder.append("://");
+        builder.append(brokers);
+        builder.append("/");
+        builder.append(topic);
+        return builder.toString();
+    }
+
+
+    static void buildCommonKafkaProperties(final ProcessContext context, final 
Class<?> kafkaConfigClass, final Map<String, Object> mapToPopulate) {
+        for (PropertyDescriptor propertyDescriptor : 
context.getProperties().keySet()) {
+            if (propertyDescriptor.equals(SSL_CONTEXT_SERVICE)) {
+                // Translate SSLContext Service configuration into Kafka 
properties
+                final SSLContextService sslContextService = 
context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
+                if (sslContextService != null && 
sslContextService.isKeyStoreConfigured()) {
+                    mapToPopulate.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, 
sslContextService.getKeyStoreFile());
+                    mapToPopulate.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, 
sslContextService.getKeyStorePassword());
+                    final String keyPass = sslContextService.getKeyPassword() 
== null ? sslContextService.getKeyStorePassword() : 
sslContextService.getKeyPassword();
+                    mapToPopulate.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, 
keyPass);
+                    mapToPopulate.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, 
sslContextService.getKeyStoreType());
+                }
+
+                if (sslContextService != null && 
sslContextService.isTrustStoreConfigured()) {
+                    
mapToPopulate.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, 
sslContextService.getTrustStoreFile());
+                    
mapToPopulate.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, 
sslContextService.getTrustStorePassword());
+                    mapToPopulate.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, 
sslContextService.getTrustStoreType());
+                }
+            }
+
+            String propertyName = propertyDescriptor.getName();
+            String propertyValue = 
propertyDescriptor.isExpressionLanguageSupported()
+                    ? 
context.getProperty(propertyDescriptor).evaluateAttributeExpressions().getValue()
+                    : context.getProperty(propertyDescriptor).getValue();
+
+            if (propertyValue != null && 
!propertyName.equals(USER_PRINCIPAL.getName()) && 
!propertyName.equals(USER_KEYTAB.getName())) {
+                // If the property name ends in ".ms" then it is a time 
period. We want to accept either an integer as number of milliseconds
+                // or the standard NiFi time period such as "5 secs"
+                if (propertyName.endsWith(".ms") && 
!StringUtils.isNumeric(propertyValue.trim())) { // kafka standard time notation
+                    propertyValue = 
String.valueOf(FormatUtils.getTimeDuration(propertyValue.trim(), 
TimeUnit.MILLISECONDS));
+                }
+
+                if (isStaticStringFieldNamePresent(propertyName, 
kafkaConfigClass, CommonClientConfigs.class, SslConfigs.class, 
SaslConfigs.class)) {
+                    mapToPopulate.put(propertyName, propertyValue);
+                }
+            }
+        }
+
+        String securityProtocol = 
context.getProperty(SECURITY_PROTOCOL).getValue();
+        if (SEC_SASL_PLAINTEXT.getValue().equals(securityProtocol) || 
SEC_SASL_SSL.getValue().equals(securityProtocol)) {
+            setJaasConfig(mapToPopulate, context);
+        }
+    }
+
+    /**
+     * Method used to configure the 'sasl.jaas.config' property based on 
KAFKA-4259<br />
+     * 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-85%3A+Dynamic+JAAS+configuration+for+Kafka+clients<br
 />
+     * <br />
+     * It expects something with the following format: <br />
+     * <br />
+     * &lt;LoginModuleClass&gt; &lt;ControlFlag&gt; 
*(&lt;OptionName&gt;=&lt;OptionValue&gt;); <br />
+     * ControlFlag = required / requisite / sufficient / optional
+     *
+     * @param mapToPopulate Map of configuration properties
+     * @param context Context
+     */
+    private static void setJaasConfig(Map<String, Object> mapToPopulate, 
ProcessContext context) {
+        String keytab = context.getProperty(USER_KEYTAB).getValue();
+        String principal = context.getProperty(USER_PRINCIPAL).getValue();
+        String serviceName = 
context.getProperty(KERBEROS_PRINCIPLE).getValue();
+        if(StringUtils.isNotBlank(keytab) && StringUtils.isNotBlank(principal) 
&& StringUtils.isNotBlank(serviceName)) {
+            mapToPopulate.put(SaslConfigs.SASL_JAAS_CONFIG, 
"com.sun.security.auth.module.Krb5LoginModule required "
+                    + "useTicketCache=false "
+                    + "renewTicket=true "
+                    + "serviceName=\"" + serviceName + "\" "
+                    + "useKeyTab=true "
+                    + "keyTab=\"" + keytab + "\" "
+                    + "principal=\"" + principal + "\";");
+        }
+    }
+
+    private static boolean isStaticStringFieldNamePresent(final String name, 
final Class<?>... classes) {
+        return 
KafkaProcessorUtils.getPublicStaticStringFieldValues(classes).contains(name);
+    }
+
+    private static Set<String> getPublicStaticStringFieldValues(final 
Class<?>... classes) {
+        final Set<String> strings = new HashSet<>();
+        for (final Class<?> classType : classes) {
+            for (final Field field : classType.getDeclaredFields()) {
+                if (Modifier.isPublic(field.getModifiers()) && 
Modifier.isStatic(field.getModifiers()) && 
field.getType().equals(String.class)) {
+                    try {
+                        strings.add(String.valueOf(field.get(null)));
+                    } catch (IllegalArgumentException | IllegalAccessException 
ex) {
+                        //ignore
+                    }
+                }
+            }
+        }
+        return strings;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/00b11e82/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/Partitioners.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/Partitioners.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/Partitioners.java
new file mode 100644
index 0000000..64ab4ce
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/Partitioners.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kafka.pubsub;
+
+import java.util.Map;
+
+import org.apache.kafka.clients.producer.Partitioner;
+import org.apache.kafka.common.Cluster;
+
+/**
+ * Collection of implementation of common Kafka {@link Partitioner}s.
+ */
+final public class Partitioners {
+
+    private Partitioners() {
+    }
+
+    /**
+     * {@link Partitioner} that implements 'round-robin' mechanism which evenly
+     * distributes load between all available partitions.
+     */
+    public static class RoundRobinPartitioner implements Partitioner {
+
+        private volatile int index;
+
+        @Override
+        public void configure(Map<String, ?> configs) {
+            // noop
+        }
+
+        @Override
+        public int partition(String topic, Object key, byte[] keyBytes, Object 
value, byte[] valueBytes, Cluster cluster) {
+            return 
this.next(cluster.availablePartitionsForTopic(topic).size());
+        }
+
+        @Override
+        public void close() {
+            // noop
+        }
+
+        private synchronized int next(int numberOfPartitions) {
+            if (this.index >= numberOfPartitions) {
+                this.index = 0;
+            }
+            return index++;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/00b11e82/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_1_0.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_1_0.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_1_0.java
new file mode 100644
index 0000000..c125d62
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_1_0.java
@@ -0,0 +1,443 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.kafka.pubsub;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
+
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processor.util.FlowFileFilters;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
+
+@Tags({"Apache", "Kafka", "Record", "csv", "json", "avro", "logs", "Put", 
"Send", "Message", "PubSub", "1.0"})
+@CapabilityDescription("Sends the contents of a FlowFile as individual records 
to Apache Kafka using the Kafka 1.0 Producer API. "
+    + "The contents of the FlowFile are expected to be record-oriented data 
that can be read by the configured Record Reader. "
+    + "The complementary NiFi processor for fetching messages is 
ConsumeKafkaRecord_1_0.")
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@DynamicProperty(name = "The name of a Kafka configuration property.", value = 
"The value of a given Kafka configuration property.",
+    description = "These properties will be added on the Kafka configuration 
after loading any provided configuration properties."
+    + " In the event a dynamic property represents a property that was already 
set, its value will be ignored and WARN message logged."
+    + " For the list of available Kafka properties please refer to: 
http://kafka.apache.org/documentation.html#configuration. ")
+@WritesAttribute(attribute = "msg.count", description = "The number of 
messages that were sent to Kafka for this FlowFile. This attribute is added 
only to "
+    + "FlowFiles that are routed to success.")
+@SeeAlso({PublishKafka_1_0.class, ConsumeKafka_1_0.class, 
ConsumeKafkaRecord_1_0.class})
+public class PublishKafkaRecord_1_0 extends AbstractProcessor {
+    protected static final String MSG_COUNT = "msg.count";
+
+    static final AllowableValue DELIVERY_REPLICATED = new 
AllowableValue("all", "Guarantee Replicated Delivery",
+        "FlowFile will be routed to failure unless the message is replicated 
to the appropriate "
+            + "number of Kafka Nodes according to the Topic configuration");
+    static final AllowableValue DELIVERY_ONE_NODE = new AllowableValue("1", 
"Guarantee Single Node Delivery",
+        "FlowFile will be routed to success if the message is received by a 
single Kafka node, "
+            + "whether or not it is replicated. This is faster than <Guarantee 
Replicated Delivery> "
+            + "but can result in data loss if a Kafka node crashes");
+    static final AllowableValue DELIVERY_BEST_EFFORT = new AllowableValue("0", 
"Best Effort",
+        "FlowFile will be routed to success after successfully writing the 
content to a Kafka node, "
+            + "without waiting for a response. This provides the best 
performance but may result in data loss.");
+
+    static final AllowableValue ROUND_ROBIN_PARTITIONING = new 
AllowableValue(Partitioners.RoundRobinPartitioner.class.getName(),
+        Partitioners.RoundRobinPartitioner.class.getSimpleName(),
+        "Messages will be assigned partitions in a round-robin fashion, 
sending the first message to Partition 1, "
+            + "the next Partition to Partition 2, and so on, wrapping as 
necessary.");
+    static final AllowableValue RANDOM_PARTITIONING = new 
AllowableValue("org.apache.kafka.clients.producer.internals.DefaultPartitioner",
+        "DefaultPartitioner", "Messages will be assigned to random 
partitions.");
+
+    static final AllowableValue UTF8_ENCODING = new AllowableValue("utf-8", 
"UTF-8 Encoded", "The key is interpreted as a UTF-8 Encoded string.");
+    static final AllowableValue HEX_ENCODING = new AllowableValue("hex", "Hex 
Encoded",
+        "The key is interpreted as arbitrary binary data that is encoded using 
hexadecimal characters with uppercase letters.");
+
+    static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder()
+        .name("topic")
+        .displayName("Topic Name")
+        .description("The name of the Kafka Topic to publish to.")
+        .required(true)
+        .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+        .expressionLanguageSupported(true)
+        .build();
+
+    static final PropertyDescriptor RECORD_READER = new 
PropertyDescriptor.Builder()
+        .name("record-reader")
+        .displayName("Record Reader")
+        .description("The Record Reader to use for incoming FlowFiles")
+        .identifiesControllerService(RecordReaderFactory.class)
+        .expressionLanguageSupported(false)
+        .required(true)
+        .build();
+
+    static final PropertyDescriptor RECORD_WRITER = new 
PropertyDescriptor.Builder()
+        .name("record-writer")
+        .displayName("Record Writer")
+        .description("The Record Writer to use in order to serialize the data 
before sending to Kafka")
+        .identifiesControllerService(RecordSetWriterFactory.class)
+        .expressionLanguageSupported(false)
+        .required(true)
+        .build();
+
+    static final PropertyDescriptor MESSAGE_KEY_FIELD = new 
PropertyDescriptor.Builder()
+        .name("message-key-field")
+        .displayName("Message Key Field")
+        .description("The name of a field in the Input Records that should be 
used as the Key for the Kafka message.")
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(true)
+        .required(false)
+        .build();
+
+    static final PropertyDescriptor DELIVERY_GUARANTEE = new 
PropertyDescriptor.Builder()
+        .name("acks")
+        .displayName("Delivery Guarantee")
+        .description("Specifies the requirement for guaranteeing that a 
message is sent to Kafka. Corresponds to Kafka's 'acks' property.")
+        .required(true)
+        .expressionLanguageSupported(false)
+        .allowableValues(DELIVERY_BEST_EFFORT, DELIVERY_ONE_NODE, 
DELIVERY_REPLICATED)
+        .defaultValue(DELIVERY_BEST_EFFORT.getValue())
+        .build();
+
+    static final PropertyDescriptor METADATA_WAIT_TIME = new 
PropertyDescriptor.Builder()
+        .name("max.block.ms")
+        .displayName("Max Metadata Wait Time")
+        .description("The amount of time publisher will wait to obtain 
metadata or wait for the buffer to flush during the 'send' call before failing 
the "
+            + "entire 'send' call. Corresponds to Kafka's 'max.block.ms' 
property")
+        .required(true)
+        .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+        .expressionLanguageSupported(true)
+        .defaultValue("5 sec")
+        .build();
+
+    static final PropertyDescriptor ACK_WAIT_TIME = new 
PropertyDescriptor.Builder()
+        .name("ack.wait.time")
+        .displayName("Acknowledgment Wait Time")
+        .description("After sending a message to Kafka, this indicates the 
amount of time that we are willing to wait for a response from Kafka. "
+            + "If Kafka does not acknowledge the message within this time 
period, the FlowFile will be routed to 'failure'.")
+        .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+        .expressionLanguageSupported(false)
+        .required(true)
+        .defaultValue("5 secs")
+        .build();
+
+    static final PropertyDescriptor MAX_REQUEST_SIZE = new 
PropertyDescriptor.Builder()
+        .name("max.request.size")
+        .displayName("Max Request Size")
+        .description("The maximum size of a request in bytes. Corresponds to 
Kafka's 'max.request.size' property and defaults to 1 MB (1048576).")
+        .required(true)
+        .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+        .defaultValue("1 MB")
+        .build();
+
+    static final PropertyDescriptor PARTITION_CLASS = new 
PropertyDescriptor.Builder()
+        .name("partitioner.class")
+        .displayName("Partitioner class")
+        .description("Specifies which class to use to compute a partition id 
for a message. Corresponds to Kafka's 'partitioner.class' property.")
+        .allowableValues(ROUND_ROBIN_PARTITIONING, RANDOM_PARTITIONING)
+        .defaultValue(RANDOM_PARTITIONING.getValue())
+        .required(false)
+        .build();
+
    // Producer-side compression codec; maps directly to Kafka's 'compression.type'.
    static final PropertyDescriptor COMPRESSION_CODEC = new PropertyDescriptor.Builder()
        .name("compression.type")
        .displayName("Compression Type")
        .description("This parameter allows you to specify the compression codec for all data generated by this producer.")
        .required(true)
        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
        .allowableValues("none", "gzip", "snappy", "lz4")
        .defaultValue("none")
        .build();
+
    // Optional regex selecting which FlowFile attributes become Kafka message
    // headers; when unset, no attributes are sent as headers.
    static final PropertyDescriptor ATTRIBUTE_NAME_REGEX = new PropertyDescriptor.Builder()
        .name("attribute-name-regex")
        .displayName("Attributes to Send as Headers (Regex)")
        .description("A Regular Expression that is matched against all FlowFile attribute names. "
            + "Any attribute whose name matches the regex will be added to the Kafka messages as a Header. "
            + "If not specified, no FlowFile attributes will be added as headers.")
        .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
        .expressionLanguageSupported(false)
        .required(false)
        .build();
    // Whether sends are wrapped in a Kafka transaction. customValidate() enforces
    // the documented coupling with the Delivery Guarantee property.
    static final PropertyDescriptor USE_TRANSACTIONS = new PropertyDescriptor.Builder()
        .name("use-transactions")
        .displayName("Use Transactions")
        .description("Specifies whether or not NiFi should provide Transactional guarantees when communicating with Kafka. If there is a problem sending data to Kafka, "
            + "and this property is set to false, then the messages that have already been sent to Kafka will continue on and be delivered to consumers. "
            + "If this is set to true, then the Kafka transaction will be rolled back so that those messages are not available to consumers. Setting this to true "
            + "requires that the <Delivery Guarantee> property be set to \"Guarantee Replicated Delivery.\"")
        .expressionLanguageSupported(false)
        .allowableValues("true", "false")
        .defaultValue("true")
        .required(true)
        .build();
    // Character set used to serialize attribute values into Kafka header bytes
    // (resolved via Charset.forName in createPublisherPool).
    static final PropertyDescriptor MESSAGE_HEADER_ENCODING = new PropertyDescriptor.Builder()
        .name("message-header-encoding")
        .displayName("Message Header Encoding")
        .description("For any attribute that is added as a message header, as configured via the <Attributes to Send as Headers> property, "
            + "this property indicates the Character Encoding to use for serializing the headers.")
        .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
        .defaultValue("UTF-8")
        .required(false)
        .build();
+
    // FlowFiles whose records were all published successfully.
    static final Relationship REL_SUCCESS = new Relationship.Builder()
        .name("success")
        .description("FlowFiles for which all content was sent to Kafka.")
        .build();

    // FlowFiles that could not be published (send failure or ack timeout).
    static final Relationship REL_FAILURE = new Relationship.Builder()
        .name("failure")
        .description("Any FlowFile that cannot be sent to Kafka will be routed to this Relationship")
        .build();
+
    // Immutable property/relationship sets, populated once by the static initializer below.
    private static final List<PropertyDescriptor> PROPERTIES;
    private static final Set<Relationship> RELATIONSHIPS;

    // Lazily created on first trigger (see getPublisherPool); volatile so that
    // closePool() observes the current instance.
    private volatile PublisherPool publisherPool = null;
+
+    static {
+        final List<PropertyDescriptor> properties = new ArrayList<>();
+        properties.add(KafkaProcessorUtils.BOOTSTRAP_SERVERS);
+        properties.add(TOPIC);
+        properties.add(RECORD_READER);
+        properties.add(RECORD_WRITER);
+        properties.add(USE_TRANSACTIONS);
+        properties.add(DELIVERY_GUARANTEE);
+        properties.add(ATTRIBUTE_NAME_REGEX);
+        properties.add(MESSAGE_HEADER_ENCODING);
+        properties.add(KafkaProcessorUtils.SECURITY_PROTOCOL);
+        properties.add(KafkaProcessorUtils.KERBEROS_PRINCIPLE);
+        properties.add(KafkaProcessorUtils.USER_PRINCIPAL);
+        properties.add(KafkaProcessorUtils.USER_KEYTAB);
+        properties.add(KafkaProcessorUtils.SSL_CONTEXT_SERVICE);
+        properties.add(MESSAGE_KEY_FIELD);
+        properties.add(MAX_REQUEST_SIZE);
+        properties.add(ACK_WAIT_TIME);
+        properties.add(METADATA_WAIT_TIME);
+        properties.add(PARTITION_CLASS);
+        properties.add(COMPRESSION_CODEC);
+
+        PROPERTIES = Collections.unmodifiableList(properties);
+
+        final Set<Relationship> relationships = new HashSet<>();
+        relationships.add(REL_SUCCESS);
+        relationships.add(REL_FAILURE);
+        RELATIONSHIPS = Collections.unmodifiableSet(relationships);
+    }
+
    /** Returns the fixed, unmodifiable set of relationships (success, failure). */
    @Override
    public Set<Relationship> getRelationships() {
        return RELATIONSHIPS;
    }
+
    /** Returns the fixed, unmodifiable list of supported property descriptors. */
    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return PROPERTIES;
    }
+
+    @Override
+    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final 
String propertyDescriptorName) {
+        return new PropertyDescriptor.Builder()
+            .description("Specifies the value for '" + propertyDescriptorName 
+ "' Kafka Configuration.")
+            .name(propertyDescriptorName)
+            .addValidator(new 
KafkaProcessorUtils.KafkaConfigValidator(ProducerConfig.class))
+            .dynamic(true)
+            .build();
+    }
+
+    @Override
+    protected Collection<ValidationResult> customValidate(final 
ValidationContext validationContext) {
+        final List<ValidationResult> results = new ArrayList<>();
+        
results.addAll(KafkaProcessorUtils.validateCommonProperties(validationContext));
+
+        final boolean useTransactions = 
validationContext.getProperty(USE_TRANSACTIONS).asBoolean();
+        if (useTransactions) {
+            final String deliveryGuarantee = 
validationContext.getProperty(DELIVERY_GUARANTEE).getValue();
+            if (!DELIVERY_REPLICATED.getValue().equals(deliveryGuarantee)) {
+                results.add(new ValidationResult.Builder()
+                    .subject("Delivery Guarantee")
+                    .valid(false)
+                    .explanation("In order to use Transactions, the Delivery 
Guarantee must be \"Guarantee Replicated Delivery.\" "
+                        + "Either change the <Use Transactions> property or 
the <Delivery Guarantee> property.")
+                    .build());
+            }
+        }
+
+        return results;
+    }
+
+    private synchronized PublisherPool getPublisherPool(final ProcessContext 
context) {
+        PublisherPool pool = publisherPool;
+        if (pool != null) {
+            return pool;
+        }
+
+        return publisherPool = createPublisherPool(context);
+    }
+
+    protected PublisherPool createPublisherPool(final ProcessContext context) {
+        final int maxMessageSize = 
context.getProperty(MAX_REQUEST_SIZE).asDataSize(DataUnit.B).intValue();
+        final long maxAckWaitMillis = 
context.getProperty(ACK_WAIT_TIME).asTimePeriod(TimeUnit.MILLISECONDS).longValue();
+
+        final String attributeNameRegex = 
context.getProperty(ATTRIBUTE_NAME_REGEX).getValue();
+        final Pattern attributeNamePattern = attributeNameRegex == null ? null 
: Pattern.compile(attributeNameRegex);
+        final boolean useTransactions = 
context.getProperty(USE_TRANSACTIONS).asBoolean();
+
+        final String charsetName = 
context.getProperty(MESSAGE_HEADER_ENCODING).evaluateAttributeExpressions().getValue();
+        final Charset charset = Charset.forName(charsetName);
+
+        final Map<String, Object> kafkaProperties = new HashMap<>();
+        KafkaProcessorUtils.buildCommonKafkaProperties(context, 
ProducerConfig.class, kafkaProperties);
+        kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
ByteArraySerializer.class.getName());
+        kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
ByteArraySerializer.class.getName());
+        kafkaProperties.put("max.request.size", 
String.valueOf(maxMessageSize));
+
+        return new PublisherPool(kafkaProperties, getLogger(), maxMessageSize, 
maxAckWaitMillis, useTransactions, attributeNamePattern, charset);
+    }
+
+    @OnStopped
+    public void closePool() {
+        if (publisherPool != null) {
+            publisherPool.close();
+        }
+
+        publisherPool = null;
+    }
+
    /**
     * Pulls up to 500 FlowFiles (or ~1 MB) from the queue, publishes each one's
     * records to Kafka through a pooled {@link PublisherLease}, then routes
     * FlowFiles to success or failure based on the aggregate publish result.
     * When transactions are enabled, all sends for this batch share a single
     * Kafka transaction that is rolled back on early shutdown.
     */
    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        final List<FlowFile> flowFiles = session.get(FlowFileFilters.newSizeBasedFilter(1, DataUnit.MB, 500));
        if (flowFiles.isEmpty()) {
            return;
        }

        final PublisherPool pool = getPublisherPool(context);
        if (pool == null) {
            // Pool unavailable; back off rather than spin
            context.yield();
            return;
        }

        final String securityProtocol = context.getProperty(KafkaProcessorUtils.SECURITY_PROTOCOL).getValue();
        final String bootstrapServers = context.getProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue();
        final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
        final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
        final boolean useTransactions = context.getProperty(USE_TRANSACTIONS).asBoolean();

        final long startTime = System.nanoTime();
        try (final PublisherLease lease = pool.obtainPublisher()) {
            if (useTransactions) {
                lease.beginTransaction();
            }

            // Send each FlowFile to Kafka asynchronously.
            for (final FlowFile flowFile : flowFiles) {
                if (!isScheduled()) {
                    // If stopped, re-queue FlowFile instead of sending it
                    if (useTransactions) {
                        // Abandon the whole batch: undo session state and the Kafka transaction
                        session.rollback();
                        lease.rollback();
                        return;
                    }

                    // NOTE(review): this FlowFile is transferred back to the incoming
                    // queue here, yet the success/failure loops below iterate over the
                    // full flowFiles list again — confirm this cannot double-transfer.
                    session.transfer(flowFile);
                    continue;
                }

                // Topic and key field may reference per-FlowFile attributes
                final String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue();
                final String messageKeyField = context.getProperty(MESSAGE_KEY_FIELD).evaluateAttributeExpressions(flowFile).getValue();

                try {
                    session.read(flowFile, new InputStreamCallback() {
                        @Override
                        public void process(final InputStream rawIn) throws IOException {
                            try (final InputStream in = new BufferedInputStream(rawIn)) {
                                final RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger());
                                final RecordSet recordSet = reader.createRecordSet();

                                // Write schema may differ from the read schema (e.g. schema registry lookup)
                                final RecordSchema schema = writerFactory.getSchema(flowFile.getAttributes(), recordSet.getSchema());
                                lease.publish(flowFile, recordSet, writerFactory, schema, messageKeyField, topic);
                            } catch (final SchemaNotFoundException | MalformedRecordException e) {
                                throw new ProcessException(e);
                            }
                        }
                    });
                } catch (final Exception e) {
                    // The FlowFile will be obtained and the error logged below, when calling publishResult.getFailedFlowFiles()
                    lease.fail(flowFile, e);
                    continue;
                }
            }

            // Complete the send (flushes/commits; closes the transaction if one was begun)
            final PublishResult publishResult = lease.complete();

            if (publishResult.isFailure()) {
                // All-or-nothing routing: any failure sends the whole batch to 'failure'
                getLogger().info("Failed to send FlowFile to kafka; transferring to failure");
                session.transfer(flowFiles, REL_FAILURE);
                return;
            }

            // Transfer any successful FlowFiles.
            final long transmissionMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime);
            for (FlowFile success : flowFiles) {
                final String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(success).getValue();

                // Record how many messages were produced from this FlowFile
                final int msgCount = publishResult.getSuccessfulMessageCount(success);
                success = session.putAttribute(success, MSG_COUNT, String.valueOf(msgCount));
                session.adjustCounter("Messages Sent", msgCount, true);

                final String transitUri = KafkaProcessorUtils.buildTransitURI(securityProtocol, bootstrapServers, topic);
                session.getProvenanceReporter().send(success, transitUri, "Sent " + msgCount + " messages", transmissionMillis);
                session.transfer(success, REL_SUCCESS);
            }
        }
    }
+}

Reply via email to