http://git-wip-us.apache.org/repos/asf/nifi/blob/640b7021/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-nar/src/main/resources/META-INF/NOTICE
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-nar/src/main/resources/META-INF/NOTICE
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-nar/src/main/resources/META-INF/NOTICE
new file mode 100644
index 0000000..3aa101a
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-nar/src/main/resources/META-INF/NOTICE
@@ -0,0 +1,72 @@
+nifi-kafka-nar
+Copyright 2014-2016 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+******************
+Apache Software License v2
+******************
+
+The following binary components are provided under the Apache Software License 
v2
+
+  (ASLv2) Apache Commons Lang
+    The following NOTICE information applies:
+      Apache Commons Lang
+      Copyright 2001-2014 The Apache Software Foundation
+
+      This product includes software from the Spring Framework,
+      under the Apache License 2.0 (see: StringUtils.containsWhitespace())
+
+  (ASLv2) Apache Kafka
+    The following NOTICE information applies:
+      Apache Kafka
+      Copyright 2012 The Apache Software Foundation.
+
+  (ASLv2) Yammer Metrics
+    The following NOTICE information applies:
+      Metrics
+      Copyright 2010-2012 Coda Hale and Yammer, Inc.
+
+      This product includes software developed by Coda Hale and Yammer, Inc.
+
+      This product includes code derived from the JSR-166 project 
(ThreadLocalRandom), which was released
+      with the following comments:
+
+          Written by Doug Lea with assistance from members of JCP JSR-166
+          Expert Group and released to the public domain, as explained at
+          http://creativecommons.org/publicdomain/zero/1.0/
+
+  (ASLv2) Snappy Java
+    The following NOTICE information applies:
+      This product includes software developed by Google
+       Snappy: http://code.google.com/p/snappy/ (New BSD License)
+
+      This product includes software developed by Apache
+       PureJavaCrc32C from apache-hadoop-common http://hadoop.apache.org/
+       (Apache 2.0 license)
+
+      This library containd statically linked libstdc++. This inclusion is 
allowed by
+      "GCC RUntime Library Exception"
+      http://gcc.gnu.org/onlinedocs/libstdc++/manual/license.html
+
+  (ASLv2) Apache ZooKeeper
+    The following NOTICE information applies:
+      Apache ZooKeeper
+      Copyright 2009-2012 The Apache Software Foundation
+
+************************
+Common Development and Distribution License 1.1
+************************
+
+The following binary components are provided under the Common Development and 
Distribution License 1.1. See project link for details.
+
+    (CDDL 1.1) (GPL2 w/ CPE) JavaMail API (compat) (javax.mail:mail:jar:1.4.7 
- http://kenai.com/projects/javamail/mail)
+
+************************
+Common Development and Distribution License 1.0
+************************
+
+The following binary components are provided under the Common Development and 
Distribution License 1.0.  See project link for details.
+
+    (CDDL 1.0) JavaBeans Activation Framework (JAF) 
(javax.activation:activation:jar:1.1 - 
http://java.sun.com/products/javabeans/jaf/index.jsp)

http://git-wip-us.apache.org/repos/asf/nifi/blob/640b7021/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/pom.xml
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/pom.xml 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/pom.xml
new file mode 100644
index 0000000..17b5d2b
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/pom.xml
@@ -0,0 +1,79 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <!--
+      Licensed to the Apache Software Foundation (ASF) under one or more
+      contributor license agreements.  See the NOTICE file distributed with
+      this work for additional information regarding copyright ownership.
+      The ASF licenses this file to You under the Apache License, Version 2.0
+      (the "License"); you may not use this file except in compliance with
+      the License.  You may obtain a copy of the License at
+          http://www.apache.org/licenses/LICENSE-2.0
+      Unless required by applicable law or agreed to in writing, software
+      distributed under the License is distributed on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+      See the License for the specific language governing permissions and
+      limitations under the License.
+    -->
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-kafka-bundle</artifactId>
+        <version>0.7.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+    <artifactId>nifi-kafka-pubsub-processors</artifactId>
+    <packaging>jar</packaging>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-processor-utils</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-utils</artifactId>
+        </dependency>
+        <dependency>
+                   <groupId>org.apache.kafka</groupId>
+                   <artifactId>kafka-clients</artifactId>
+                   <version>0.9.0.1</version>
+               </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka_2.10</artifactId>
+            <version>0.9.0.1</version>
+            <exclusions>
+                <!-- Transitive dependencies excluded because they are located
+                in a legacy Maven repository, which Maven 3 doesn't support. 
-->
+                <exclusion>
+                    <groupId>javax.jms</groupId>
+                    <artifactId>jms</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.sun.jdmk</groupId>
+                    <artifactId>jmxtools</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.sun.jmx</groupId>
+                    <artifactId>jmxri</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>commons-io</groupId>
+            <artifactId>commons-io</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-simple</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/640b7021/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/AbstractKafkaProcessor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/AbstractKafkaProcessor.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/AbstractKafkaProcessor.java
new file mode 100644
index 0000000..8bae304
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/AbstractKafkaProcessor.java
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kafka.pubsub;
+
+import java.io.Closeable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.regex.Pattern;
+
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyDescriptor.Builder;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.FormatUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Base class for implementing {@link Processor}s to publish and consume
+ * messages to/from Kafka
+ *
+ * @see PublishKafka
+ * @see ConsumeKafka
+ */
+abstract class AbstractKafkaProcessor<T extends Closeable> extends 
AbstractSessionFactoryProcessor {
+
+    final Logger logger = LoggerFactory.getLogger(this.getClass());
+
+    private static final String SINGLE_BROKER_REGEX = ".*?\\:\\d{3,5}";
+
+    private static final String BROKER_REGEX = SINGLE_BROKER_REGEX + 
"(?:,\\s*" + SINGLE_BROKER_REGEX + ")*";
+
+
+    static final AllowableValue SEC_PLAINTEXT = new 
AllowableValue("PLAINTEXT", "PLAINTEXT", "PLAINTEXT");
+    static final AllowableValue SEC_SSL = new AllowableValue("SSL", "SSL", 
"SSL");
+    static final AllowableValue SEC_SASL_PLAINTEXT = new 
AllowableValue("SASL_PLAINTEXT", "SASL_PLAINTEXT", "SASL_PLAINTEXT");
+    static final AllowableValue SEC_SASL_SSL = new AllowableValue("SASL_SSL", 
"SASL_SSL", "SASL_SSL");
+
+    static final PropertyDescriptor BOOTSTRAP_SERVERS = new 
PropertyDescriptor.Builder()
+            .name(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)
+            .displayName("Kafka Brokers")
+            .description("A comma-separated list of known Kafka Brokers in the 
format <host>:<port>")
+            .required(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            
.addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile(BROKER_REGEX)))
+            .expressionLanguageSupported(true)
+            .defaultValue("localhost:9092")
+            .build();
+    static final PropertyDescriptor CLIENT_ID = new 
PropertyDescriptor.Builder()
+            .name(ProducerConfig.CLIENT_ID_CONFIG)
+            .displayName("Client ID")
+            .description("String value uniquely identifying this client 
application. Corresponds to Kafka's 'client.id' property.")
+            .required(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .build();
+    static final PropertyDescriptor SECURITY_PROTOCOL = new 
PropertyDescriptor.Builder()
+            .name("security.protocol")
+            .displayName("Security Protocol")
+            .description("Protocol used to communicate with brokers. 
Corresponds to Kafka's 'security.protocol' property.")
+            .required(false)
+            .expressionLanguageSupported(false)
+            .allowableValues(SEC_PLAINTEXT, SEC_SSL, SEC_SASL_PLAINTEXT, 
SEC_SASL_SSL)
+            .defaultValue(SEC_PLAINTEXT.getValue())
+            .build();
+    static final PropertyDescriptor KERBEROS_PRINCIPLE = new 
PropertyDescriptor.Builder()
+            .name("sasl.kerberos.service.name")
+            .displayName("Kerberos Service Name")
+            .description("The Kerberos principal name that Kafka runs as. This can be defined either in Kafka's JAAS config or in Kafka's config. "
+                    + "Corresponds to Kafka's 'sasl.kerberos.service.name' property. "
+                    + "It is ignored unless one of the SASL options of the <Security Protocol> are selected.")
+            .required(false)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .expressionLanguageSupported(false)
+            .build();
+
+    static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder()
+            .name("topic")
+            .displayName("Topic Name")
+            .description("The name of the Kafka Topic")
+            .required(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .build();
+
+    static final Builder MESSAGE_DEMARCATOR_BUILDER = new 
PropertyDescriptor.Builder()
+            .name("message-demarcator")
+            .displayName("Message Demarcator")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(true);
+
+    static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("All FlowFiles that are successfully sent to or received from Kafka are routed to this relationship")
+            .build();
+
+    static final List<PropertyDescriptor> SHARED_DESCRIPTORS = new 
ArrayList<>();
+
+    static final Set<Relationship> SHARED_RELATIONSHIPS = new HashSet<>();
+
+    private final AtomicInteger taskCounter = new AtomicInteger();
+
+    private volatile boolean acceptTask = true;
+
+    static {
+        SHARED_DESCRIPTORS.add(BOOTSTRAP_SERVERS);
+        SHARED_DESCRIPTORS.add(TOPIC);
+        SHARED_DESCRIPTORS.add(CLIENT_ID);
+        SHARED_DESCRIPTORS.add(SECURITY_PROTOCOL);
+        SHARED_DESCRIPTORS.add(KERBEROS_PRINCIPLE);
+        SHARED_RELATIONSHIPS.add(REL_SUCCESS);
+    }
+
+    /**
+     * Instance of {@link KafkaPublisher} or {@link KafkaConsumer}
+     */
+    volatile T kafkaResource;
+
+    /**
+     * This thread-safe operation will delegate to
+     * {@link #rendezvousWithKafka(ProcessContext, ProcessSession)} after first
+     * checking and creating (if necessary) Kafka resource which could be 
either
+     * {@link KafkaPublisher} or {@link KafkaConsumer}. It will also close and
+     * destroy the underlying Kafka resource upon catching an {@link Exception}
+     * raised by {@link #rendezvousWithKafka(ProcessContext, ProcessSession)}.
+     * After Kafka resource is destroyed it will be re-created upon the next
+     * invocation of this operation essentially providing a self healing 
mechanism
+     * to deal with potentially corrupted resource.
+     * <p>
+     * Keep in mind that upon catching an exception the state of this processor
+     * will be set to no longer accept any more tasks, until Kafka resource is 
reset.
+     * This means that in a multi-threaded situation currently executing tasks 
will
+     * be given a chance to complete while no new tasks will be accepted.
+     */
+    @Override
+    public final void onTrigger(final ProcessContext context, final 
ProcessSessionFactory sessionFactory) throws ProcessException {
+        if (this.acceptTask) { // acts as a circuit breaker to allow existing 
tasks to wind down so 'kafkaResource' can be reset before new tasks are 
accepted.
+            this.taskCounter.incrementAndGet();
+            final ProcessSession session = sessionFactory.createSession();
+            try {
+                /*
+                 * We can't be doing double null check here since as a pattern
+                 * it only works for lazy init but not reset, which is what we
+                 * are doing here. In fact the first null check is dangerous
+                 * since 'kafkaResource' can become null right after its null
+                 * check passed causing subsequent NPE.
+                 */
+                synchronized (this) {
+                    if (this.kafkaResource == null) {
+                        this.kafkaResource = this.buildKafkaResource(context, 
session);
+                    }
+                }
+
+                /*
+                 * The 'processed' boolean flag does not imply any failure or success. It simply states that:
+                 * - ConsumeKafka - some messages were received from Kafka and one or more FlowFiles were generated
+                 * - PublishKafka - some messages were sent to Kafka based on the existence of the input FlowFile
+                 */
+                boolean processed = this.rendezvousWithKafka(context, session);
+                session.commit();
+                if (processed) {
+                    this.postCommit(context);
+                } else {
+                    context.yield();
+                }
+            } catch (Throwable e) {
+                this.acceptTask = false;
+                session.rollback(true);
+                this.getLogger().error("{} failed to process due to {}; 
rolling back session", new Object[] { this, e });
+            } finally {
+                synchronized (this) {
+                    if (this.taskCounter.decrementAndGet() == 0 && 
!this.acceptTask) {
+                        this.close();
+                        this.acceptTask = true;
+                    }
+                }
+            }
+        } else {
+            this.logger.debug("Task was not accepted due to the processor 
being in 'reset' state. It will be re-submitted upon completion of the reset.");
+            this.getLogger().debug("Task was not accepted due to the processor 
being in 'reset' state. It will be re-submitted upon completion of the reset.");
+            context.yield();
+        }
+    }
+
+    /**
+     * Will call {@link Closeable#close()} on the target resource after which
+     * the target resource will be set to null. Should only be called when 
there
+     * are no more threads being executed on this processor or when it has been
+     * verified that only a single thread remains.
+     *
+     * @see KafkaPublisher
+     * @see KafkaConsumer
+     */
+    @OnStopped
+    public void close() {
+        try {
+            if (this.kafkaResource != null) {
+                try {
+                    this.kafkaResource.close();
+                } catch (Exception e) {
+                    this.getLogger().warn("Failed while closing " + 
this.kafkaResource, e);
+                }
+            }
+        } finally {
+            this.kafkaResource = null;
+        }
+    }
+
+    /**
+     *
+     */
+    @Override
+    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final 
String propertyDescriptorName) {
+        return new PropertyDescriptor.Builder()
+                .description("Specifies the value for '" + 
propertyDescriptorName + "' Kafka Configuration.")
+                
.name(propertyDescriptorName).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).dynamic(true)
+                .build();
+    }
+
+    /**
+     * This operation is called from
+     * {@link #onTrigger(ProcessContext, ProcessSessionFactory)} method and
+     * contains main processing logic for this Processor.
+     */
+    protected abstract boolean rendezvousWithKafka(ProcessContext context, 
ProcessSession session);
+
+    /**
+     * Builds target resource for interacting with Kafka. The target resource
+     * could be one of {@link KafkaPublisher} or {@link KafkaConsumer}
+     */
+    protected abstract T buildKafkaResource(ProcessContext context, 
ProcessSession session);
+
+    /**
+     * This operation will be executed after {@link ProcessSession#commit()} 
has
+     * been called.
+     */
+    protected void postCommit(ProcessContext context) {
+        // no op
+    }
+
+    /**
+     *
+     */
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext 
validationContext) {
+        List<ValidationResult> results = new ArrayList<>();
+
+        String securityProtocol = 
validationContext.getProperty(SECURITY_PROTOCOL).getValue();
+
+        /*
+         * validates that if one of the SASL (Kerberos) options is selected for
+         * security protocol, then Kerberos principal is provided as well
+         */
+        if (SEC_SASL_PLAINTEXT.getValue().equals(securityProtocol) || 
SEC_SASL_SSL.getValue().equals(securityProtocol)){
+            String kerberosPrincipal = 
validationContext.getProperty(KERBEROS_PRINCIPLE).getValue();
+            if (kerberosPrincipal == null || kerberosPrincipal.trim().length() 
== 0){
+                results.add(new 
ValidationResult.Builder().subject(KERBEROS_PRINCIPLE.getDisplayName()).valid(false)
+                        .explanation("The <" + 
KERBEROS_PRINCIPLE.getDisplayName() + "> property must be set when <"
+                                + SECURITY_PROTOCOL.getDisplayName() + "> is 
configured as '"
+                                + SEC_SASL_PLAINTEXT.getValue() + "' or '" + 
SEC_SASL_SSL.getValue() + "'.")
+                        .build());
+            }
+        }
+
+        return results;
+    }
+
+    /**
+     * Builds transit URI for provenance event. The transit URI will be in the
+     * form of &lt;security.protocol&gt;://&lt;bootstrap.servers&gt;/topic
+     */
+    String buildTransitURI(String securityProtocol, String brokers, String 
topic) {
+        StringBuilder builder = new StringBuilder();
+        builder.append(securityProtocol);
+        builder.append("://");
+        builder.append(brokers);
+        builder.append("/");
+        builder.append(topic);
+        return builder.toString();
+    }
+
+    /**
+     * Builds Kafka {@link Properties}
+     */
+    Properties buildKafkaProperties(ProcessContext context) {
+        Properties properties = new Properties();
+        for (PropertyDescriptor propertyDescriptor : 
context.getProperties().keySet()) {
+            String pName = propertyDescriptor.getName();
+            String pValue = propertyDescriptor.isExpressionLanguageSupported()
+                    ? 
context.getProperty(propertyDescriptor).evaluateAttributeExpressions().getValue()
+                    : context.getProperty(propertyDescriptor).getValue();
+            if (pValue != null) {
+                if (pName.endsWith(".ms")) { // kafka standard time notation
+                    pValue = 
String.valueOf(FormatUtils.getTimeDuration(pValue.trim(), 
TimeUnit.MILLISECONDS));
+                }
+                properties.setProperty(pName, pValue);
+            }
+        }
+        return properties;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/640b7021/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java
new file mode 100644
index 0000000..5949bf0
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kafka.pubsub;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
+@CapabilityDescription("Consumes messages from Apache Kafka")
+@Tags({ "Kafka", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume" })
+public class ConsumeKafka extends AbstractKafkaProcessor<Consumer<byte[], 
byte[]>> {
+
+    static final AllowableValue OFFSET_EARLIEST = new 
AllowableValue("earliest", "earliest", "Automatically reset the offset to the 
earliest offset");
+
+    static final AllowableValue OFFSET_LATEST = new AllowableValue("latest", 
"latest", "Automatically reset the offset to the latest offset");
+
+    static final AllowableValue OFFSET_NONE = new AllowableValue("none", 
"none", "Throw exception to the consumer if no previous offset is found for the 
consumer's group");
+
+    static final PropertyDescriptor GROUP_ID = new PropertyDescriptor.Builder()
+            .name(ConsumerConfig.GROUP_ID_CONFIG)
+            .displayName("Group ID")
+            .description("A Group ID is used to identify consumers that are 
within the same consumer group. Corresponds to Kafka's 'group.id' property.")
+            .required(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .expressionLanguageSupported(false)
+            .build();
+    static final PropertyDescriptor AUTO_OFFSET_RESET = new 
PropertyDescriptor.Builder()
+            .name(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)
+            .displayName("Offset Reset")
+            .description("Allows you to manage the condition when there is no 
initial offset in Kafka or if the current offset does not exist any "
+                    + "more on the server (e.g. because that data has been 
deleted). Corresponds to Kafka's 'auto.offset.reset' property.")
+            .required(true)
+            .allowableValues(OFFSET_EARLIEST, OFFSET_LATEST, OFFSET_NONE)
+            .defaultValue(OFFSET_LATEST.getValue())
+            .build();
+    static final PropertyDescriptor MESSAGE_DEMARCATOR = 
MESSAGE_DEMARCATOR_BUILDER
+            .description("Since KafkaConsumer receives messages in batches, 
you have an option to output a single FlowFile which contains "
+                    + "all Kafka messages in a single batch and this property 
allows you to provide a string (interpreted as UTF-8) to use "
+                    + "for demarcating apart multiple Kafka messages. This is 
an optional property and if not provided each Kafka message received "
+                    + "in a batch will result in a single FlowFile which 
essentially means that this processor may output multiple FlowFiles for each "
+                    + "time it is triggered. To enter special character such 
as 'new line' use CTRL+Enter or Shift+Enter depending on the OS")
+            .build();
+
+
+    static final List<PropertyDescriptor> DESCRIPTORS;
+
+    static final Set<Relationship> RELATIONSHIPS;
+
+    private volatile byte[] demarcatorBytes;
+
+    private volatile String topic;
+
+    private volatile String brokers;
+
+    /*
+     * Will ensure that the list of the PropertyDescriptors is built only once,
+     * since all other lifecycle methods are invoked multiple times.
+     */
+    static {
+        List<PropertyDescriptor> _descriptors = new ArrayList<>();
+        _descriptors.addAll(SHARED_DESCRIPTORS);
+        _descriptors.add(GROUP_ID);
+        _descriptors.add(AUTO_OFFSET_RESET);
+        _descriptors.add(MESSAGE_DEMARCATOR);
+        DESCRIPTORS = Collections.unmodifiableList(_descriptors);
+
+        RELATIONSHIPS = Collections.unmodifiableSet(SHARED_RELATIONSHIPS);
+    }
+
+    /**
+     *
+     */
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    /**
+     * Will unsubscribe from {@link KafkaConsumer} delegating to 'super' to do
+     * the rest.
+     */
+    @Override
+    @OnStopped
+    public void close() {
+        if (this.kafkaResource != null) {
+            try {
+                this.kafkaResource.unsubscribe();
+            } finally { // in the event the above fails
+                super.close();
+            }
+        }
+    }
+
+    /**
+     *
+     */
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return DESCRIPTORS;
+    }
+
+    /**
+     * Will rendezvous with Kafka by performing the following:
+     * <br>
+     * - poll {@link ConsumerRecords} from {@link KafkaConsumer} in a
+     * non-blocking manner, signaling yield if no records were received from
+     * Kafka
+     * <br>
+     * - if records were received from Kafka, they are written to a newly created
+     * {@link FlowFile}'s {@link OutputStream} using a provided demarcator (see
+     * {@link #MESSAGE_DEMARCATOR})
+     */
+    @Override
+    protected boolean rendezvousWithKafka(ProcessContext context, 
ProcessSession processSession) {
+        ConsumerRecords<byte[], byte[]> consumedRecords = 
this.kafkaResource.poll(100);
+        if (consumedRecords != null && !consumedRecords.isEmpty()) {
+            long start = System.nanoTime();
+            FlowFile flowFile = processSession.create();
+            final AtomicInteger messageCounter = new AtomicInteger();
+
+            final Iterator<ConsumerRecord<byte[], byte[]>> iter = 
consumedRecords.iterator();
+            while (iter.hasNext()){
+                flowFile = processSession.append(flowFile, new 
OutputStreamCallback() {
+                    @Override
+                    public void process(final OutputStream out) throws 
IOException {
+                        ConsumerRecord<byte[], byte[]> consumedRecord = 
iter.next();
+                        if (messageCounter.getAndIncrement() > 0 && 
ConsumeKafka.this.demarcatorBytes != null) {
+                            out.write(ConsumeKafka.this.demarcatorBytes);
+                        }
+                        out.write(consumedRecord.value());
+                    }
+                });
+                /*
+                 * Release FlowFile if there are more messages in the
+                 * ConsumerRecords batch and no demarcator was provided,
+                 * otherwise the FlowFile will be released as soon as this loop
+                 * exits.
+                 */
+                if (iter.hasNext() && ConsumeKafka.this.demarcatorBytes == 
null){
+                    this.releaseFlowFile(flowFile, context, processSession, 
start, messageCounter.get());
+                    flowFile = processSession.create();
+                    messageCounter.set(0);
+                }
+            }
+            this.releaseFlowFile(flowFile, context, processSession, start, 
messageCounter.get());
+        }
+        return consumedRecords != null && !consumedRecords.isEmpty();
+    }
+
    /**
     * This operation is called from
     * {@link #onTrigger(ProcessContext, ProcessSessionFactory)} after the
     * process session is committed, so that the Kafka offset changes are only
     * then committed. In cases of really bad timing this can mean data
     * duplication upon recovery, but never data loss: we want to commit the
     * flow files in a NiFi sense before we commit them in a Kafka sense.
     */
    @Override
    protected void postCommit(ProcessContext context) {
        this.kafkaResource.commitSync();
    }
+
+    /**
+     * Builds and instance of {@link KafkaConsumer} and subscribes to a 
provided
+     * topic.
+     */
+    @Override
+    protected Consumer<byte[], byte[]> buildKafkaResource(ProcessContext 
context, ProcessSession session) {
+        this.demarcatorBytes = context.getProperty(MESSAGE_DEMARCATOR).isSet()
+                ? 
context.getProperty(MESSAGE_DEMARCATOR).evaluateAttributeExpressions().getValue().getBytes(StandardCharsets.UTF_8)
+                : null;
+        this.topic = 
context.getProperty(TOPIC).evaluateAttributeExpressions().getValue();
+        this.brokers = 
context.getProperty(BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue();
+
+        Properties kafkaProperties = this.buildKafkaProperties(context);
+        kafkaProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class.getName());
+        kafkaProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class.getName());
+
+        KafkaConsumer<byte[], byte[]> consumer = new 
KafkaConsumer<>(kafkaProperties);
+        consumer.subscribe(Collections.singletonList(this.topic));
+        return consumer;
+    }
+
+    /**
+     * Will release flow file. Releasing of the flow file in the context of 
this
+     * operation implies the following:
+     *
+     * If Empty then remove from session and return If has something then
+     * transfer to {@link #REL_SUCCESS}
+     */
+    private void releaseFlowFile(FlowFile flowFile, ProcessContext context, 
ProcessSession session, long start, int msgCount) {
+        long executionDuration = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+        String transitUri = 
this.buildTransitURI(context.getProperty(SECURITY_PROTOCOL).getValue(), 
this.brokers, topic);
+        session.getProvenanceReporter().receive(flowFile, transitUri, 
"Received " + msgCount + " Kafka messages", executionDuration);
+        this.getLogger().info("Successfully received {} from Kafka with {} 
messages in {} millis", new Object[] { flowFile, msgCount, executionDuration });
+        session.transfer(flowFile, REL_SUCCESS);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/640b7021/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisher.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisher.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisher.java
new file mode 100644
index 0000000..f42a892
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisher.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kafka.pubsub;
+
+import java.io.Closeable;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ProcessorLog;
+import org.apache.nifi.stream.io.util.StreamDemarcator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
 * Wrapper over {@link KafkaProducer} to assist {@link PublishKafka} processor
 * with sending contents of the {@link FlowFile}s to Kafka.
 */
class KafkaPublisher implements Closeable {

    private static final Logger logger = LoggerFactory.getLogger(KafkaPublisher.class);

    // The single Kafka producer this publisher wraps; created in the constructor.
    private final Producer<byte[], byte[]> kafkaProducer;

    // Milliseconds to wait on each send Future and on producer close; see setAckWaitTime.
    private volatile long ackWaitTime = 30000;

    // Optional NiFi processor log used to surface warnings/errors as bulletins.
    private volatile ProcessorLog processLog;

    /**
     * Creates an instance of this class as well as the instance of the
     * corresponding Kafka {@link KafkaProducer} using provided Kafka
     * configuration properties.
     *
     * @param kafkaProperties
     *            instance of {@link Properties} used to bootstrap
     *            {@link KafkaProducer}
     */
    KafkaPublisher(Properties kafkaProperties) {
        this.kafkaProducer = new KafkaProducer<>(kafkaProperties);
    }

    /**
     * Publishes messages to Kafka topic. It uses {@link StreamDemarcator} to
     * determine how many messages will be sent to Kafka from a provided
     * {@link InputStream} (see {@link PublishingContext#getContentStream()}).
     * It supports two publishing modes:
     * <ul>
     * <li>Sending all messages constructed from
     * {@link StreamDemarcator#nextToken()} operation.</li>
     * <li>Sending only unacknowledged messages constructed from
     * {@link StreamDemarcator#nextToken()} operation.</li>
     * </ul>
     * The unacknowledged messages are determined from the value of
     * {@link PublishingContext#getLastAckedMessageIndex()}.
     * <br>
     * This method assumes content stream affinity where it is expected that
     * the content stream that represents the same Kafka message(s) will remain
     * the same across possible retries. This is required specifically for
     * cases where a delimiter is used and a single content stream may
     * represent multiple Kafka messages. The
     * {@link PublishingContext#getLastAckedMessageIndex()} provides the index
     * of the last ACKed message, so upon retry only messages with a higher
     * index are sent.
     *
     * @param publishingContext
     *            instance of {@link PublishingContext} which holds context
     *            information about the message(s) to be sent.
     * @return result carrying the total token count and the index of the last
     *         successfully ACKed message.
     */
    KafkaPublisherResult publish(PublishingContext publishingContext) {
        StreamDemarcator streamTokenizer = new StreamDemarcator(publishingContext.getContentStream(),
                publishingContext.getDelimiterBytes(), publishingContext.getMaxRequestSize());

        int prevLastAckedMessageIndex = publishingContext.getLastAckedMessageIndex();
        List<Future<RecordMetadata>> resultFutures = new ArrayList<>();

        byte[] messageBytes;
        int tokenCounter = 0;
        for (; (messageBytes = streamTokenizer.nextToken()) != null; tokenCounter++) {
            // skip tokens that were already ACKed during a previous attempt
            if (prevLastAckedMessageIndex < tokenCounter) {
                ProducerRecord<byte[], byte[]> message = new ProducerRecord<>(publishingContext.getTopic(), publishingContext.getKeyBytes(), messageBytes);
                resultFutures.add(this.kafkaProducer.send(message));
            }
        }

        int lastAckedMessageIndex = this.processAcks(resultFutures, prevLastAckedMessageIndex);
        return new KafkaPublisherResult(tokenCounter, lastAckedMessageIndex);
    }

    /**
     * Sets the time this publisher will wait for the {@link Future#get()}
     * operation (the Future returned by
     * {@link KafkaProducer#send(ProducerRecord)}) to complete before timing
     * out.
     *
     * This value will also be used as a timeout when closing the underlying
     * {@link KafkaProducer}. See {@link #close()}.
     */
    void setAckWaitTime(long ackWaitTime) {
        this.ackWaitTime = ackWaitTime;
    }

    /**
     * This operation will process ACKs from Kafka in the order in which
     * {@link KafkaProducer#send(ProducerRecord)} invocations were made,
     * returning the index of the last ACKed message. Within this operation,
     * processing an ACK simply means a successful invocation of the 'get()'
     * operation on the {@link Future} returned by
     * {@link KafkaProducer#send(ProducerRecord)}. Upon encountering any type
     * of error while interrogating such {@link Future}, the ACK loop ends.
     * Messages that were not ACKed are considered non-delivered and may
     * therefore be resent at a later time.
     *
     * @param sendFutures
     *            list of {@link Future}s representing results of publishing to
     *            Kafka
     *
     * @param lastAckMessageIndex
     *            the index of the last ACKed message. It is important to
     *            provide the last ACKed message especially while re-trying so
     *            the proper index is maintained.
     * @return the (possibly advanced) index of the last ACKed message
     */
    private int processAcks(List<Future<RecordMetadata>> sendFutures, int lastAckMessageIndex) {
        boolean exceptionThrown = false;
        // futures are interrogated strictly in send order; stop at first failure
        for (int segmentCounter = 0; segmentCounter < sendFutures.size() && !exceptionThrown; segmentCounter++) {
            Future<RecordMetadata> future = sendFutures.get(segmentCounter);
            try {
                future.get(this.ackWaitTime, TimeUnit.MILLISECONDS);
                lastAckMessageIndex++;
            } catch (InterruptedException e) {
                exceptionThrown = true;
                Thread.currentThread().interrupt(); // restore interrupt status
                this.warnOrError("Interrupted while waiting for acks from Kafka", null);
            } catch (ExecutionException e) {
                exceptionThrown = true;
                this.warnOrError("Failed while waiting for acks from Kafka", e);
            } catch (TimeoutException e) {
                exceptionThrown = true;
                this.warnOrError("Timed out while waiting for acks from Kafka", null);
            }
        }

        return lastAckMessageIndex;
    }

    /**
     * Will close the underlying {@link KafkaProducer} waiting if necessary for
     * the same duration as supplied via {@link #setAckWaitTime(long)}
     */
    @Override
    public void close() {
        this.kafkaProducer.close(this.ackWaitTime, TimeUnit.MILLISECONDS);
    }

    /**
     * Will set {@link ProcessorLog} as an additional logger to forward log
     * messages to NiFi bulletin
     */
    void setProcessLog(ProcessorLog processLog) {
        this.processLog = processLog;
    }

    /**
     * Logs the message as a warning when no exception is supplied, or as an
     * error (with the exception) otherwise; mirrors the message to the
     * {@link ProcessorLog} when one has been set via
     * {@link #setProcessLog(ProcessorLog)}.
     */
    private void warnOrError(String message, Exception e) {
        if (e == null) {
            logger.warn(message);
            if (this.processLog != null) {
                this.processLog.warn(message);
            }
        } else {
            logger.error(message, e);
            if (this.processLog != null) {
                this.processLog.error(message, e);
            }
        }
    }

    /**
     * Encapsulates the result received from publishing messages to Kafka
     */
    static class KafkaPublisherResult {
        private final int messagesSent;     // total tokens produced from the stream
        private final int lastMessageAcked; // index of the last ACKed message

        KafkaPublisherResult(int messagesSent, int lastMessageAcked) {
            this.messagesSent = messagesSent;
            this.lastMessageAcked = lastMessageAcked;
        }

        public int getMessagesSent() {
            return this.messagesSent;
        }

        public int getLastMessageAcked() {
            return this.lastMessageAcked;
        }

        // true when every sent message was ACKed (indices are zero-based)
        public boolean isAllAcked() {
            return this.messagesSent - 1 == this.lastMessageAcked;
        }

        @Override
        public String toString() {
            return "Sent:" + this.messagesSent + "; Last ACK:" + this.lastMessageAcked;
        }
    }
}

http://git-wip-us.apache.org/repos/asf/nifi/blob/640b7021/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/Partitioners.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/Partitioners.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/Partitioners.java
new file mode 100644
index 0000000..8c948df
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/Partitioners.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kafka.pubsub;
+
+import java.util.Map;
+
+import org.apache.kafka.clients.producer.Partitioner;
+import org.apache.kafka.common.Cluster;
+
+/**
+ * Collection of implementation of common Kafka {@link Partitioner}s.
+ */
+final public class Partitioners {
+
+    private Partitioners() {
+    }
+
+    /**
+     * {@link Partitioner} that implements 'round-robin' mechanism which evenly
+     * distributes load between all available partitions.
+     */
+    public static class RoundRobinPartitioner implements Partitioner {
+        private volatile int index;
+
+        @Override
+        public void configure(Map<String, ?> configs) {
+            // noop
+        }
+
+        @Override
+        public int partition(String topic, Object key, byte[] keyBytes, Object 
value, byte[] valueBytes, Cluster cluster) {
+            return 
this.next(cluster.availablePartitionsForTopic(topic).size());
+        }
+
+        @Override
+        public void close() {
+            // noop
+        }
+
+        private synchronized int next(int numberOfPartitions) {
+            if (this.index >= numberOfPartitions) {
+                this.index = 0;
+            }
+            return index++;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/640b7021/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka.java
new file mode 100644
index 0000000..6235f0b
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka.java
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kafka.pubsub;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import 
org.apache.nifi.processors.kafka.pubsub.KafkaPublisher.KafkaPublisherResult;
+import 
org.apache.nifi.processors.kafka.pubsub.Partitioners.RoundRobinPartitioner;
+
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Tags({ "Apache", "Kafka", "Put", "Send", "Message", "PubSub" })
+@CapabilityDescription("Sends the contents of a FlowFile as a message to 
Apache Kafka. The messages to send may be individual FlowFiles or may be 
delimited, using a "
+        + "user-specified delimiter, such as a new-line.")
+@DynamicProperty(name = "The name of a Kafka configuration property.", value = 
"The value of a given Kafka configuration property.",
+                 description = "These properties will be added on the Kafka 
configuration after loading any provided configuration properties."
+        + " In the event a dynamic property represents a property that was 
already set, its value will be ignored and WARN message logged."
+        + " For the list of available Kafka properties please refer to: 
http://kafka.apache.org/documentation.html#configuration.";)
+public class PublishKafka extends AbstractKafkaProcessor<KafkaPublisher> {
+
+    protected static final String FAILED_PROC_ID_ATTR = "failed.proc.id";
+
+    protected static final String FAILED_LAST_ACK_IDX = "failed.last.idx";
+
+    protected static final String FAILED_TOPIC_ATTR = "failed.topic";
+
+    protected static final String FAILED_KEY_ATTR = "failed.key";
+
+    protected static final String FAILED_DELIMITER_ATTR = "failed.delimiter";
+
+    protected static final String MSG_COUNT = "msg.count";
+
+    static final AllowableValue DELIVERY_REPLICATED = new 
AllowableValue("all", "Guarantee Replicated Delivery",
+            "FlowFile will be routed to failure unless the message is 
replicated to the appropriate "
+                    + "number of Kafka Nodes according to the Topic 
configuration");
+    static final AllowableValue DELIVERY_ONE_NODE = new AllowableValue("1", 
"Guarantee Single Node Delivery",
+            "FlowFile will be routed to success if the message is received by 
a single Kafka node, "
+                    + "whether or not it is replicated. This is faster than 
<Guarantee Replicated Delivery> "
+                    + "but can result in data loss if a Kafka node crashes");
+    static final AllowableValue DELIVERY_BEST_EFFORT = new AllowableValue("0", 
"Best Effort",
+            "FlowFile will be routed to success after successfully writing the 
content to a Kafka node, "
+                    + "without waiting for a response. This provides the best 
performance but may result in data loss.");
+
+    static final AllowableValue ROUND_ROBIN_PARTITIONING = new 
AllowableValue(RoundRobinPartitioner.class.getName(),
+            RoundRobinPartitioner.class.getSimpleName(),
+            "Messages will be assigned partitions in a round-robin fashion, 
sending the first message to Partition 1, "
+                    + "the next Partition to Partition 2, and so on, wrapping 
as necessary.");
+    static final AllowableValue RANDOM_PARTITIONING = new 
AllowableValue("org.apache.kafka.clients.producer.internals.DefaultPartitioner",
+            "DefaultPartitioner", "Messages will be assigned to random 
partitions.");
+
+    static final PropertyDescriptor DELIVERY_GUARANTEE = new 
PropertyDescriptor.Builder()
+            .name(ProducerConfig.ACKS_CONFIG)
+            .displayName("Delivery Guarantee")
+            .description("Specifies the requirement for guaranteeing that a 
message is sent to Kafka. Corresponds to Kafka's 'acks' property.")
+            .required(true)
+            .expressionLanguageSupported(false)
+            .allowableValues(DELIVERY_BEST_EFFORT, DELIVERY_ONE_NODE, 
DELIVERY_REPLICATED)
+            .defaultValue(DELIVERY_BEST_EFFORT.getValue())
+            .build();
+    static final PropertyDescriptor META_WAIT_TIME = new 
PropertyDescriptor.Builder()
+            .name(ProducerConfig.MAX_BLOCK_MS_CONFIG)
+            .displayName("Meta Data Wait Time")
+            .description("The amount of time KafkaConsumer will wait to obtain 
metadata during the 'send' call before failing the "
+                            + "entire 'send' call. Corresponds to Kafka's 
'max.block.ms' property")
+            .required(true)
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .defaultValue("30 sec")
+            .build();
+    static final PropertyDescriptor KEY = new PropertyDescriptor.Builder()
+            .name("kafka-key")
+            .displayName("Kafka Key")
+            .description("The Key to use for the Message")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .build();
+    static final PropertyDescriptor MESSAGE_DEMARCATOR = 
MESSAGE_DEMARCATOR_BUILDER
+            .description("Specifies the string (interpreted as UTF-8) to use 
for demarcating apart multiple messages within "
+                    + "a single FlowFile. If not specified, the entire content 
of the FlowFile will be used as a single message. If specified, the "
+                            + "contents of the FlowFile will be split on this 
delimiter and each section sent as a separate Kafka message. "
+                            + "To enter special character such as 'new line' 
use CTRL+Enter or Shift+Enter depending on your OS.")
+            .build();
+    static final PropertyDescriptor PARTITION_CLASS = new 
PropertyDescriptor.Builder()
+            .name(ProducerConfig.PARTITIONER_CLASS_CONFIG)
+            .displayName("Partitioner class")
+            .description("Specifies which class to use to compute a partition 
id for a message. Corresponds to Kafka's 'partitioner.class' property.")
+            .allowableValues(ROUND_ROBIN_PARTITIONING, RANDOM_PARTITIONING)
+            .defaultValue(RANDOM_PARTITIONING.getValue())
+            .required(false)
+            .build();
+    static final PropertyDescriptor COMPRESSION_CODEC = new 
PropertyDescriptor.Builder()
+            .name(ProducerConfig.COMPRESSION_TYPE_CONFIG)
+            .displayName("Compression Type")
+            .description("This parameter allows you to specify the compression 
codec for all data generated by this producer.")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .allowableValues("none", "gzip", "snappy", "lz4")
+            .defaultValue("none")
+            .build();
+
+    static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("Any FlowFile that cannot be sent to Kafka will be 
routed to this Relationship")
+            .build();
+
+    static final List<PropertyDescriptor> DESCRIPTORS;
+
+    static final Set<Relationship> RELATIONSHIPS;
+
+    private volatile String brokers;
+
+    /*
+     * Will ensure that list of PropertyDescriptors is build only once, since
+     * all other lifecycle methods are invoked multiple times.
+     */
+    static {
+        List<PropertyDescriptor> _descriptors = new ArrayList<>();
+        _descriptors.addAll(SHARED_DESCRIPTORS);
+        _descriptors.add(DELIVERY_GUARANTEE);
+        _descriptors.add(KEY);
+        _descriptors.add(MESSAGE_DEMARCATOR);
+        _descriptors.add(META_WAIT_TIME);
+        _descriptors.add(PARTITION_CLASS);
+        _descriptors.add(COMPRESSION_CODEC);
+
+        DESCRIPTORS = Collections.unmodifiableList(_descriptors);
+
+        Set<Relationship> _relationships = new HashSet<>();
+        _relationships.addAll(SHARED_RELATIONSHIPS);
+        _relationships.add(REL_FAILURE);
+        RELATIONSHIPS = Collections.unmodifiableSet(_relationships);
+    }
+
+    /**
+     *
+     */
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    /**
+     *
+     */
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return DESCRIPTORS;
+    }
+
+    /**
+     * Will rendezvous with Kafka if {@link ProcessSession} contains {@link 
FlowFile}
+     * producing a result {@link FlowFile}.
+     * <br>
+     * The result {@link FlowFile} that is successful is then transfered to 
{@link #REL_SUCCESS}
+     * <br>
+     * The result {@link FlowFile} that is failed is then transfered to {@link 
#REL_FAILURE}
+     *
+     */
+    @Override
+    protected boolean rendezvousWithKafka(ProcessContext context, 
ProcessSession session){
+        FlowFile flowFile = session.get();
+        if (flowFile != null) {
+            long start = System.nanoTime();
+            flowFile = this.doRendezvousWithKafka(flowFile, context, session);
+            Relationship relationship = REL_SUCCESS;
+            if (!this.isFailedFlowFile(flowFile)) {
+                String topic = 
context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue();
+                long executionDuration = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+                String transitUri = 
this.buildTransitURI(context.getProperty(SECURITY_PROTOCOL).getValue(), 
this.brokers, topic);
+                session.getProvenanceReporter().send(flowFile, transitUri, 
"Sent " + flowFile.getAttribute(MSG_COUNT) + " Kafka messages", 
executionDuration);
+                this.getLogger().info("Successfully sent {} to Kafka as {} 
message(s) in {} millis", new Object[] { flowFile, 
flowFile.getAttribute(MSG_COUNT), executionDuration });
+            } else {
+                relationship = REL_FAILURE;
+                flowFile = session.penalize(flowFile);
+            }
+            session.transfer(flowFile, relationship);
+        }
+        return flowFile != null;
+    }
+
+    /**
+     * Builds an instance of {@link KafkaPublisher}.
+     */
+    @Override
+    protected KafkaPublisher buildKafkaResource(ProcessContext context, 
ProcessSession session) {
+        Properties kafkaProperties = this.buildKafkaProperties(context);
+        kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
ByteArraySerializer.class.getName());
+        kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
ByteArraySerializer.class.getName());
+        this.brokers = 
context.getProperty(BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue();
+        return new KafkaPublisher(kafkaProperties);
+    }
+
+    /**
+     * Will rendezvous with {@link KafkaPublisher} after building
+     * {@link PublishingContext} and will produce the resulting {@link 
FlowFile}.
+     * The resulting FlowFile contains all required information to determine
+     * if message publishing originating from the provided FlowFile has actually
+     * succeeded fully, partially or failed completely (see
+     * {@link #isFailedFlowFile(FlowFile)}).
+     */
+    private FlowFile doRendezvousWithKafka(final FlowFile flowFile, final 
ProcessContext context, final ProcessSession session) {
+        final AtomicReference<KafkaPublisherResult> publishResultRef = new 
AtomicReference<>();
+        session.read(flowFile, new InputStreamCallback() {
+            @Override
+            public void process(InputStream contentStream) throws IOException {
+                PublishingContext publishingContext = 
PublishKafka.this.buildPublishingContext(flowFile, context, contentStream);
+                KafkaPublisherResult result = 
PublishKafka.this.kafkaResource.publish(publishingContext);
+                publishResultRef.set(result);
+            }
+        });
+
+        FlowFile resultFile = publishResultRef.get().isAllAcked()
+                ? this.cleanUpFlowFileIfNecessary(flowFile, session)
+                : session.putAllAttributes(flowFile, 
this.buildFailedFlowFileAttributes(publishResultRef.get().getLastMessageAcked(),
 flowFile, context));
+
+        if (!this.isFailedFlowFile(resultFile)) {
+            resultFile = session.putAttribute(resultFile, MSG_COUNT,  
String.valueOf(publishResultRef.get().getMessagesSent()));
+        }
+        return resultFile;
+    }
+
+    /**
+     * Builds {@link PublishingContext} for message(s) to be sent to Kafka.
+     * {@link PublishingContext} contains all contextual information required 
by
+     * {@link KafkaPublisher} to publish to Kafka. Such information contains
+     * things like topic name, content stream, delimiter, key and last ACKed
+     * message for cases where provided FlowFile is being retried (failed in 
the
+     * past).
+     * <br>
+     * For the clean FlowFile (file that has been sent for the first time),
+     * PublishingContext will be built from {@link ProcessContext} associated
+     * with this invocation.
+     * <br>
+     * For the failed FlowFile, {@link PublishingContext} will be built from
+     * attributes of that FlowFile which by then will already contain required
+     * information (e.g., topic, key, delimiter etc.). This is required to
+     * ensure the affinity of the retry in the event where processor
+     * configuration has changed. However keep in mind that failed FlowFile is
+     * only considered a failed FlowFile if it is being re-processed by the 
same
+     * processor (determined via {@link #FAILED_PROC_ID_ATTR}, see
+     * {@link #isFailedFlowFile(FlowFile)}). If failed FlowFile is being sent 
to
+     * another PublishKafka processor it is treated as a fresh FlowFile
+     * regardless if it has #FAILED* attributes set.
+     */
+    private PublishingContext buildPublishingContext(FlowFile flowFile, 
ProcessContext context, InputStream contentStream) {
+        String topicName;
+        byte[] keyBytes;
+        byte[] delimiterBytes = null;
+        int lastAckedMessageIndex = -1;
+        if (this.isFailedFlowFile(flowFile)) {
+            lastAckedMessageIndex = 
Integer.valueOf(flowFile.getAttribute(FAILED_LAST_ACK_IDX));
+            topicName = flowFile.getAttribute(FAILED_TOPIC_ATTR);
+            keyBytes = flowFile.getAttribute(FAILED_KEY_ATTR) != null
+                    ? 
flowFile.getAttribute(FAILED_KEY_ATTR).getBytes(StandardCharsets.UTF_8) : null;
+            delimiterBytes = flowFile.getAttribute(FAILED_DELIMITER_ATTR) != 
null
+                    ? 
flowFile.getAttribute(FAILED_DELIMITER_ATTR).getBytes(StandardCharsets.UTF_8) : 
null;
+
+        } else {
+            topicName = 
context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue();
+            String _key = 
context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue();
+            keyBytes = _key == null ? null : 
_key.getBytes(StandardCharsets.UTF_8);
+            delimiterBytes = context.getProperty(MESSAGE_DEMARCATOR).isSet() ? 
context.getProperty(MESSAGE_DEMARCATOR)
+                    
.evaluateAttributeExpressions(flowFile).getValue().getBytes(StandardCharsets.UTF_8)
 : null;
+        }
+
+        PublishingContext publishingContext = new 
PublishingContext(contentStream, topicName, lastAckedMessageIndex);
+        publishingContext.setKeyBytes(keyBytes);
+        publishingContext.setDelimiterBytes(delimiterBytes);
+        return publishingContext;
+    }
+
+    /**
+     * Will remove FAILED_* attributes if FlowFile is no longer considered a
+     * failed FlowFile
+     *
+     * @see #isFailedFlowFile(FlowFile)
+     */
+    private FlowFile cleanUpFlowFileIfNecessary(FlowFile flowFile, 
ProcessSession session) {
+        if (this.isFailedFlowFile(flowFile)) {
+            Set<String> keysToRemove = new HashSet<>();
+            keysToRemove.add(FAILED_DELIMITER_ATTR);
+            keysToRemove.add(FAILED_KEY_ATTR);
+            keysToRemove.add(FAILED_TOPIC_ATTR);
+            keysToRemove.add(FAILED_PROC_ID_ATTR);
+            keysToRemove.add(FAILED_LAST_ACK_IDX);
+            flowFile = session.removeAllAttributes(flowFile, keysToRemove);
+        }
+        return flowFile;
+    }
+
+    /**
+     * Builds a {@link Map} of FAILED_* attributes
+     *
+     * @see #FAILED_PROC_ID_ATTR
+     * @see #FAILED_LAST_ACK_IDX
+     * @see #FAILED_TOPIC_ATTR
+     * @see #FAILED_KEY_ATTR
+     * @see #FAILED_DELIMITER_ATTR
+     */
+    private Map<String, String> buildFailedFlowFileAttributes(int 
lastAckedMessageIndex, FlowFile sourceFlowFile, ProcessContext context) {
+        Map<String, String> attributes = new HashMap<>();
+        attributes.put(FAILED_PROC_ID_ATTR, this.getIdentifier());
+        attributes.put(FAILED_LAST_ACK_IDX, 
String.valueOf(lastAckedMessageIndex));
+        attributes.put(FAILED_TOPIC_ATTR, 
context.getProperty(TOPIC).evaluateAttributeExpressions(sourceFlowFile).getValue());
+        attributes.put(FAILED_KEY_ATTR, 
context.getProperty(KEY).evaluateAttributeExpressions(sourceFlowFile).getValue());
+        attributes.put(FAILED_DELIMITER_ATTR, 
context.getProperty(MESSAGE_DEMARCATOR).isSet()
+                        ? 
context.getProperty(MESSAGE_DEMARCATOR).evaluateAttributeExpressions(sourceFlowFile).getValue()
 : null);
+        return attributes;
+    }
+
+    /**
+     * Returns 'true' if provided FlowFile is a failed FlowFile. A failed
+     * FlowFile contains {@link #FAILED_PROC_ID_ATTR}.
+     */
+    private boolean isFailedFlowFile(FlowFile flowFile) {
+        return 
this.getIdentifier().equals(flowFile.getAttribute(FAILED_PROC_ID_ATTR));
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/640b7021/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishingContext.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishingContext.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishingContext.java
new file mode 100644
index 0000000..bda29e6
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishingContext.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kafka.pubsub;
+
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+
+/**
+ * Holder of context information used by {@link KafkaPublisher} required to
+ * publish messages to Kafka.
+ */
+class PublishingContext {
+
+    private final InputStream contentStream;
+
+    private final String topic;
+
+    private final int lastAckedMessageIndex;
+
+    /*
+     * We're using the default value from Kafka. We are using it to control the
+     * message size before it goes to Kafka, thus limiting the possibility of
+     * late failures in the Kafka client.
+     */
+    private int maxRequestSize = 1048576; // kafka default
+
+    private boolean maxRequestSizeSet;
+
+    private byte[] keyBytes;
+
+    private byte[] delimiterBytes;
+
+    PublishingContext(InputStream contentStream, String topic) {
+        this(contentStream, topic, -1);
+    }
+
+    PublishingContext(InputStream contentStream, String topic, int 
lastAckedMessageIndex) {
+        this.validateInput(contentStream, topic, lastAckedMessageIndex);
+        this.contentStream = contentStream;
+        this.topic = topic;
+        this.lastAckedMessageIndex = lastAckedMessageIndex;
+    }
+
+    @Override
+    public String toString() {
+        return "topic: '" + this.topic + "'; delimiter: '" + new 
String(this.delimiterBytes, StandardCharsets.UTF_8) + "'";
+    }
+
+    int getLastAckedMessageIndex() {
+        return this.lastAckedMessageIndex;
+    }
+
+    int getMaxRequestSize() {
+        return this.maxRequestSize;
+    }
+
+    byte[] getKeyBytes() {
+        return this.keyBytes;
+    }
+
+    byte[] getDelimiterBytes() {
+        return this.delimiterBytes;
+    }
+
+    InputStream getContentStream() {
+        return this.contentStream;
+    }
+
+    String getTopic() {
+        return this.topic;
+    }
+
+    void setKeyBytes(byte[] keyBytes) {
+        if (this.keyBytes == null) {
+            if (keyBytes != null) {
+                this.assertBytesValid(keyBytes);
+                this.keyBytes = keyBytes;
+            }
+        } else {
+            throw new IllegalArgumentException("'keyBytes' can only be set 
once per instance");
+        }
+    }
+
+    void setDelimiterBytes(byte[] delimiterBytes) {
+        if (this.delimiterBytes == null) {
+            if (delimiterBytes != null) {
+                this.assertBytesValid(delimiterBytes);
+                this.delimiterBytes = delimiterBytes;
+            }
+        } else {
+            throw new IllegalArgumentException("'delimiterBytes' can only be 
set once per instance");
+        }
+    }
+
+    void setMaxRequestSize(int maxRequestSize) {
+        if (!this.maxRequestSizeSet) {
+            if (maxRequestSize > 0) {
+                this.maxRequestSize = maxRequestSize;
+                this.maxRequestSizeSet = true;
+            } else {
+                throw new IllegalArgumentException("'maxRequestSize' must be > 
0");
+            }
+        } else {
+            throw new IllegalArgumentException("'maxRequestSize' can only be 
set once per instance");
+        }
+    }
+
+    private void assertBytesValid(byte[] bytes) {
+        if (bytes != null) {
+            if (bytes.length == 0) {
+                throw new IllegalArgumentException("'bytes' must not be 
empty");
+            }
+        }
+    }
+
+    private void validateInput(InputStream contentStream, String topic, int 
lastAckedMessageIndex) {
+        if (contentStream == null) {
+            throw new IllegalArgumentException("'contentStream' must not be 
null");
+        } else if (topic == null || topic.trim().length() == 0) {
+            throw new IllegalArgumentException("'topic' must not be null or 
empty");
+        } else if (lastAckedMessageIndex < -1) {
+            throw new IllegalArgumentException("'lastAckedMessageIndex' must 
be >= -1");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/640b7021/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
new file mode 100644
index 0000000..28b8393
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.nifi.processors.kafka.pubsub.PublishKafka
+org.apache.nifi.processors.kafka.pubsub.ConsumeKafka
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/640b7021/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafka/additionalDetails.html
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafka/additionalDetails.html
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafka/additionalDetails.html
new file mode 100644
index 0000000..0e09f72
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafka/additionalDetails.html
@@ -0,0 +1,33 @@
+<!DOCTYPE html>
+<html lang="en">
+    <!--
+      Licensed to the Apache Software Foundation (ASF) under one or more
+      contributor license agreements.  See the NOTICE file distributed with
+      this work for additional information regarding copyright ownership.
+      The ASF licenses this file to You under the Apache License, Version 2.0
+      (the "License"); you may not use this file except in compliance with
+      the License.  You may obtain a copy of the License at
+          http://www.apache.org/licenses/LICENSE-2.0
+      Unless required by applicable law or agreed to in writing, software
+      distributed under the License is distributed on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+      See the License for the specific language governing permissions and
+      limitations under the License.
+    -->
+    <head>
+        <meta charset="utf-8" />
+        <title>ConsumeKafka</title>
+        <link rel="stylesheet" href="../../css/component-usage.css" 
type="text/css" />
+    </head>
+
+    <body>
+        <!-- Processor Documentation 
================================================== -->
+        <h2>Description:</h2>
+        <p>
+            This Processor polls <a href="http://kafka.apache.org/";>Apache 
Kafka</a>
+            for data using KafkaConsumer API available with Kafka 0.9+. When a 
message is received
+            from Kafka, this Processor emits a FlowFile where the content of 
the FlowFile is the value
+            of the Kafka message.
+        </p>
+    </body>
+</html>

http://git-wip-us.apache.org/repos/asf/nifi/blob/640b7021/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.PublishKafka/additionalDetails.html
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.PublishKafka/additionalDetails.html
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.PublishKafka/additionalDetails.html
new file mode 100644
index 0000000..20ce03c
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.PublishKafka/additionalDetails.html
@@ -0,0 +1,47 @@
+<!DOCTYPE html>
+<html lang="en">
+    <!--
+      Licensed to the Apache Software Foundation (ASF) under one or more
+      contributor license agreements.  See the NOTICE file distributed with
+      this work for additional information regarding copyright ownership.
+      The ASF licenses this file to You under the Apache License, Version 2.0
+      (the "License"); you may not use this file except in compliance with
+      the License.  You may obtain a copy of the License at
+          http://www.apache.org/licenses/LICENSE-2.0
+      Unless required by applicable law or agreed to in writing, software
+      distributed under the License is distributed on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+      See the License for the specific language governing permissions and
+      limitations under the License.
+    -->
+    <head>
+        <meta charset="utf-8" />
+        <title>PublishKafka</title>
+        <link rel="stylesheet" href="../../css/component-usage.css" 
type="text/css" />
+    </head>
+
+    <body>
+        <!-- Processor Documentation 
================================================== -->
+        <h2>Description:</h2>
+        <p>
+            This Processor puts the contents of a FlowFile to a Topic in
+            <a href="http://kafka.apache.org/";>Apache Kafka</a> using 
KafkaProducer API available
+            with Kafka 0.9+ API. The content of a FlowFile becomes the 
contents of a Kafka message.
+            This message is optionally assigned a key by using the &lt;Kafka 
Key&gt; Property.
+        </p>
+
+        <p>
+            The Processor allows the user to configure an optional Message 
Demarcator that
+            can be used to send many messages per FlowFile. For example, a 
<i>\n</i> could be used
+            to indicate that the contents of the FlowFile should be used to 
send one message
+            per line of text. It also supports multi-char demarcators (e.g., 
'my custom demarcator').
+            If the property is not set, the entire contents of the FlowFile
+            will be sent as a single message. When using the demarcator, if 
some messages are
+            successfully sent but other messages fail to send, the resulting 
FlowFile will be
+            considered a failed FlowFile and will have additional attributes 
to that effect.
+            One such attribute is 'failed.last.idx', which indicates the 
index of the last message
+            that was successfully ACKed by Kafka (if no demarcator is used 
the value of this index will be -1).
+            This will allow PublishKafka to only re-send un-ACKed messages on 
the next re-try.
+        </p>
+    </body>
+</html>

Reply via email to