http://git-wip-us.apache.org/repos/asf/nifi/blob/1745c127/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/AbstractKafkaProcessor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/AbstractKafkaProcessor.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/AbstractKafkaProcessor.java
deleted file mode 100644
index 4677e33..0000000
--- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/AbstractKafkaProcessor.java
+++ /dev/null
@@ -1,400 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.kafka.pubsub;
-
-import java.io.Closeable;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.regex.Pattern;
-
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.serialization.ByteArrayDeserializer;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.apache.nifi.annotation.lifecycle.OnStopped;
-import org.apache.nifi.components.AllowableValue;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.PropertyDescriptor.Builder;
-import org.apache.nifi.components.ValidationContext;
-import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.ProcessSessionFactory;
-import org.apache.nifi.processor.Processor;
-import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.ssl.SSLContextService;
-import org.apache.nifi.util.FormatUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Base class for implementing {@link Processor}s to publish and consume
- * messages to/from Kafka
- *
- * @see PublishKafka
- * @see ConsumeKafka
- */
-abstract class AbstractKafkaProcessor<T extends Closeable> extends 
AbstractSessionFactoryProcessor {
-
-    final Logger logger = LoggerFactory.getLogger(this.getClass());
-
-    private static final String SINGLE_BROKER_REGEX = ".*?\\:\\d{3,5}";
-
-    private static final String BROKER_REGEX = SINGLE_BROKER_REGEX + 
"(?:,\\s*" + SINGLE_BROKER_REGEX + ")*";
-
-
-    static final AllowableValue SEC_PLAINTEXT = new 
AllowableValue("PLAINTEXT", "PLAINTEXT", "PLAINTEXT");
-    static final AllowableValue SEC_SSL = new AllowableValue("SSL", "SSL", 
"SSL");
-    static final AllowableValue SEC_SASL_PLAINTEXT = new 
AllowableValue("SASL_PLAINTEXT", "SASL_PLAINTEXT", "SASL_PLAINTEXT");
-    static final AllowableValue SEC_SASL_SSL = new AllowableValue("SASL_SSL", 
"SASL_SSL", "SASL_SSL");
-
-    static final PropertyDescriptor BOOTSTRAP_SERVERS = new 
PropertyDescriptor.Builder()
-            .name(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)
-            .displayName("Kafka Brokers")
-            .description("A comma-separated list of known Kafka Brokers in the 
format <host>:<port>")
-            .required(true)
-            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
-            
.addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile(BROKER_REGEX)))
-            .expressionLanguageSupported(true)
-            .defaultValue("localhost:9092")
-            .build();
-    static final PropertyDescriptor CLIENT_ID = new 
PropertyDescriptor.Builder()
-            .name(ProducerConfig.CLIENT_ID_CONFIG)
-            .displayName("Client ID")
-            .description("String value uniquely identifying this client 
application. Corresponds to Kafka's 'client.id' property.")
-            .required(true)
-            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
-            .expressionLanguageSupported(true)
-            .build();
-    static final PropertyDescriptor SECURITY_PROTOCOL = new 
PropertyDescriptor.Builder()
-            .name("security.protocol")
-            .displayName("Security Protocol")
-            .description("Protocol used to communicate with brokers. 
Corresponds to Kafka's 'security.protocol' property.")
-            .required(false)
-            .expressionLanguageSupported(false)
-            .allowableValues(SEC_PLAINTEXT, SEC_SSL, SEC_SASL_PLAINTEXT, 
SEC_SASL_SSL)
-            .defaultValue(SEC_PLAINTEXT.getValue())
-            .build();
-    static final PropertyDescriptor KERBEROS_PRINCIPLE = new 
PropertyDescriptor.Builder()
-            .name("sasl.kerberos.service.name")
-            .displayName("Kerberos Service Name")
-            .description("The Kerberos principal name that Kafka runs as. This 
can be defined either in Kafka's JAAS config or in Kafka's config. "
-                    + "Corresponds to Kafka's 'security.protocol' property."
-                    + "It is ignored unless one of the SASL options of the 
<Security Protocol> are selected.")
-            .required(false)
-            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
-            .expressionLanguageSupported(false)
-            .build();
-
-    static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder()
-            .name("topic")
-            .displayName("Topic Name")
-            .description("The name of the Kafka Topic")
-            .required(true)
-            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
-            .expressionLanguageSupported(true)
-            .build();
-
-    static final PropertyDescriptor SSL_CONTEXT_SERVICE = new 
PropertyDescriptor.Builder()
-            .name("ssl.context.service")
-            .displayName("SSL Context Service")
-            .description("Specifies the SSL Context Service to use for 
communicating with Kafka.")
-            .required(false)
-            .identifiesControllerService(SSLContextService.class)
-            .build();
-
-    static final Builder MESSAGE_DEMARCATOR_BUILDER = new 
PropertyDescriptor.Builder()
-            .name("message-demarcator")
-            .displayName("Message Demarcator")
-            .required(false)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .expressionLanguageSupported(true);
-
-    static final Relationship REL_SUCCESS = new Relationship.Builder()
-            .name("success")
-            .description("All FlowFiles that are the are successfully sent to 
or received from Kafka are routed to this relationship")
-            .build();
-
-    static final List<PropertyDescriptor> SHARED_DESCRIPTORS = new 
ArrayList<>();
-
-    static final Set<Relationship> SHARED_RELATIONSHIPS = new HashSet<>();
-
-    private final AtomicInteger taskCounter = new AtomicInteger();
-
-    private volatile boolean acceptTask = true;
-
-    static {
-        SHARED_DESCRIPTORS.add(BOOTSTRAP_SERVERS);
-        SHARED_DESCRIPTORS.add(TOPIC);
-        SHARED_DESCRIPTORS.add(CLIENT_ID);
-        SHARED_DESCRIPTORS.add(SECURITY_PROTOCOL);
-        SHARED_DESCRIPTORS.add(KERBEROS_PRINCIPLE);
-        SHARED_DESCRIPTORS.add(SSL_CONTEXT_SERVICE);
-
-        SHARED_RELATIONSHIPS.add(REL_SUCCESS);
-    }
-
-    /**
-     * Instance of {@link KafkaPublisher} or {@link KafkaConsumer}
-     */
-    volatile T kafkaResource;
-
-    /**
-     * This thread-safe operation will delegate to
-     * {@link #rendezvousWithKafka(ProcessContext, ProcessSession)} after first
-     * checking and creating (if necessary) Kafka resource which could be 
either
-     * {@link KafkaPublisher} or {@link KafkaConsumer}. It will also close and
-     * destroy the underlying Kafka resource upon catching an {@link Exception}
-     * raised by {@link #rendezvousWithKafka(ProcessContext, ProcessSession)}.
-     * After Kafka resource is destroyed it will be re-created upon the next
-     * invocation of this operation essentially providing a self healing 
mechanism
-     * to deal with potentially corrupted resource.
-     * <p>
-     * Keep in mind that upon catching an exception the state of this processor
-     * will be set to no longer accept any more tasks, until Kafka resource is 
reset.
-     * This means that in a multi-threaded situation currently executing tasks 
will
-     * be given a chance to complete while no new tasks will be accepted.
-     */
-    @Override
-    public final void onTrigger(final ProcessContext context, final 
ProcessSessionFactory sessionFactory) throws ProcessException {
-        if (this.acceptTask) { // acts as a circuit breaker to allow existing 
tasks to wind down so 'kafkaResource' can be reset before new tasks are 
accepted.
-            this.taskCounter.incrementAndGet();
-            final ProcessSession session = sessionFactory.createSession();
-            try {
-                /*
-                 * We can't be doing double null check here since as a pattern
-                 * it only works for lazy init but not reset, which is what we
-                 * are doing here. In fact the first null check is dangerous
-                 * since 'kafkaResource' can become null right after its null
-                 * check passed causing subsequent NPE.
-                 */
-                synchronized (this) {
-                    if (this.kafkaResource == null) {
-                        this.kafkaResource = this.buildKafkaResource(context, 
session);
-                    }
-                }
-
-                /*
-                 * The 'processed' boolean flag does not imply any failure or 
success. It simply states that:
-                 * - ConsumeKafka - some messages were received form Kafka and 
1_ FlowFile were generated
-                 * - PublishKafka - some messages were sent to Kafka based on 
existence of the input FlowFile
-                 */
-                boolean processed = this.rendezvousWithKafka(context, session);
-                session.commit();
-                if (processed) {
-                    this.postCommit(context);
-                } else {
-                    context.yield();
-                }
-            } catch (Throwable e) {
-                this.acceptTask = false;
-                session.rollback(true);
-                this.getLogger().error("{} failed to process due to {}; 
rolling back session", new Object[] { this, e });
-            } finally {
-                synchronized (this) {
-                    if (this.taskCounter.decrementAndGet() == 0 && 
!this.acceptTask) {
-                        this.close();
-                        this.acceptTask = true;
-                    }
-                }
-            }
-        } else {
-            this.logger.debug("Task was not accepted due to the processor 
being in 'reset' state. It will be re-submitted upon completion of the reset.");
-            this.getLogger().debug("Task was not accepted due to the processor 
being in 'reset' state. It will be re-submitted upon completion of the reset.");
-            context.yield();
-        }
-    }
-
-    /**
-     * Will call {@link Closeable#close()} on the target resource after which
-     * the target resource will be set to null. Should only be called when 
there
-     * are no more threads being executed on this processor or when it has been
-     * verified that only a single thread remains.
-     *
-     * @see KafkaPublisher
-     * @see KafkaConsumer
-     */
-    @OnStopped
-    public void close() {
-        try {
-            if (this.kafkaResource != null) {
-                try {
-                    this.kafkaResource.close();
-                } catch (Exception e) {
-                    this.getLogger().warn("Failed while closing " + 
this.kafkaResource, e);
-                }
-            }
-        } finally {
-            this.kafkaResource = null;
-        }
-    }
-
-    /**
-     *
-     */
-    @Override
-    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final 
String propertyDescriptorName) {
-        return new PropertyDescriptor.Builder()
-                .description("Specifies the value for '" + 
propertyDescriptorName + "' Kafka Configuration.")
-                
.name(propertyDescriptorName).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).dynamic(true)
-                .build();
-    }
-
-    /**
-     * This operation is called from
-     * {@link #onTrigger(ProcessContext, ProcessSessionFactory)} method and
-     * contains main processing logic for this Processor.
-     */
-    protected abstract boolean rendezvousWithKafka(ProcessContext context, 
ProcessSession session);
-
-    /**
-     * Builds target resource for interacting with Kafka. The target resource
-     * could be one of {@link KafkaPublisher} or {@link KafkaConsumer}
-     */
-    protected abstract T buildKafkaResource(ProcessContext context, 
ProcessSession session);
-
-    /**
-     * This operation will be executed after {@link ProcessSession#commit()} 
has
-     * been called.
-     */
-    protected void postCommit(ProcessContext context) {
-        // no op
-    }
-
-    /**
-     *
-     */
-    @Override
-    protected Collection<ValidationResult> customValidate(ValidationContext 
validationContext) {
-        List<ValidationResult> results = new ArrayList<>();
-
-        String securityProtocol = 
validationContext.getProperty(SECURITY_PROTOCOL).getValue();
-
-        /*
-         * validates that if one of SASL (Kerberos) option is selected for
-         * security protocol, then Kerberos principal is provided as well
-         */
-        if (SEC_SASL_PLAINTEXT.getValue().equals(securityProtocol) || 
SEC_SASL_SSL.getValue().equals(securityProtocol)){
-            String kerberosPrincipal = 
validationContext.getProperty(KERBEROS_PRINCIPLE).getValue();
-            if (kerberosPrincipal == null || kerberosPrincipal.trim().length() 
== 0){
-                results.add(new 
ValidationResult.Builder().subject(KERBEROS_PRINCIPLE.getDisplayName()).valid(false)
-                        .explanation("The <" + 
KERBEROS_PRINCIPLE.getDisplayName() + "> property must be set when <"
-                                + SECURITY_PROTOCOL.getDisplayName() + "> is 
configured as '"
-                                + SEC_SASL_PLAINTEXT.getValue() + "' or '" + 
SEC_SASL_SSL.getValue() + "'.")
-                        .build());
-            }
-        }
-
-        String keySerializer = validationContext.getProperty(new 
PropertyDescriptor.Builder().name(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).build())
-                .getValue();
-        if (keySerializer != null && 
!ByteArraySerializer.class.getName().equals(keySerializer)) {
-            results.add(new 
ValidationResult.Builder().subject(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)
-                    .explanation("Key Serializer must be " + 
ByteArraySerializer.class.getName() + "' was '" + keySerializer + "'").build());
-        }
-        String valueSerializer = validationContext.getProperty(new 
PropertyDescriptor.Builder().name(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).build())
-                .getValue();
-        if (valueSerializer != null && 
!ByteArraySerializer.class.getName().equals(valueSerializer)) {
-            results.add(new 
ValidationResult.Builder().subject(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)
-                    .explanation("Value Serializer must be " + 
ByteArraySerializer.class.getName() + "' was '" + valueSerializer + 
"'").build());
-        }
-        String keyDeSerializer = validationContext.getProperty(new 
PropertyDescriptor.Builder().name(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG).build())
-                .getValue();
-        if (keyDeSerializer != null && 
!ByteArrayDeserializer.class.getName().equals(keyDeSerializer)) {
-            results.add(new 
ValidationResult.Builder().subject(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)
-                    .explanation("Key De-Serializer must be '" + 
ByteArrayDeserializer.class.getName() + "' was '" + keyDeSerializer + 
"'").build());
-        }
-        String valueDeSerializer = validationContext.getProperty(new 
PropertyDescriptor.Builder().name(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG).build())
-                .getValue();
-        if (valueDeSerializer != null && 
!ByteArrayDeserializer.class.getName().equals(valueDeSerializer)) {
-            results.add(new 
ValidationResult.Builder().subject(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)
-                    .explanation("Value De-Serializer must be " + 
ByteArrayDeserializer.class.getName() + "' was '" + valueDeSerializer + 
"'").build());
-        }
-
-        return results;
-    }
-
-    /**
-     * Builds transit URI for provenance event. The transit URI will be in the
-     * form of &lt;security.protocol&gt;://&lt;bootstrap.servers&gt;/topic
-     */
-    String buildTransitURI(String securityProtocol, String brokers, String 
topic) {
-        StringBuilder builder = new StringBuilder();
-        builder.append(securityProtocol);
-        builder.append("://");
-        builder.append(brokers);
-        builder.append("/");
-        builder.append(topic);
-        return builder.toString();
-    }
-
-    /**
-     * Builds Kafka {@link Properties}
-     */
-    Properties buildKafkaProperties(ProcessContext context) {
-        Properties properties = new Properties();
-        for (PropertyDescriptor propertyDescriptor : 
context.getProperties().keySet()) {
-            if (propertyDescriptor.equals(SSL_CONTEXT_SERVICE)) {
-                // Translate SSLContext Service configuration into Kafka 
properties
-                final SSLContextService sslContextService = 
context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
-                buildSSLKafkaProperties(sslContextService, properties);
-                continue;
-            }
-
-            String pName = propertyDescriptor.getName();
-            String pValue = propertyDescriptor.isExpressionLanguageSupported()
-                    ? 
context.getProperty(propertyDescriptor).evaluateAttributeExpressions().getValue()
-                    : context.getProperty(propertyDescriptor).getValue();
-            if (pValue != null) {
-                if (pName.endsWith(".ms")) { // kafka standard time notation
-                    pValue = 
String.valueOf(FormatUtils.getTimeDuration(pValue.trim(), 
TimeUnit.MILLISECONDS));
-                }
-                properties.setProperty(pName, pValue);
-            }
-        }
-        return properties;
-    }
-
-    private void buildSSLKafkaProperties(final SSLContextService 
sslContextService, final Properties properties) {
-        if (sslContextService == null) {
-            return;
-        }
-
-        if (sslContextService.isKeyStoreConfigured()) {
-            properties.setProperty("ssl.keystore.location", 
sslContextService.getKeyStoreFile());
-            properties.setProperty("ssl.keystore.password", 
sslContextService.getKeyStorePassword());
-            final String keyPass = sslContextService.getKeyPassword() == null 
? sslContextService.getKeyStorePassword() : sslContextService.getKeyPassword();
-            properties.setProperty("ssl.key.password", keyPass);
-            properties.setProperty("ssl.keystore.type", 
sslContextService.getKeyStoreType());
-        }
-
-        if (sslContextService.isTrustStoreConfigured()) {
-            properties.setProperty("ssl.truststore.location", 
sslContextService.getTrustStoreFile());
-            properties.setProperty("ssl.truststore.password", 
sslContextService.getTrustStorePassword());
-            properties.setProperty("ssl.truststore.type", 
sslContextService.getTrustStoreType());
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/1745c127/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java
deleted file mode 100644
index ac5b4c5..0000000
--- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java
+++ /dev/null
@@ -1,296 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.kafka.pubsub;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.serialization.ByteArrayDeserializer;
-import org.apache.nifi.annotation.behavior.InputRequirement;
-import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.annotation.lifecycle.OnStopped;
-import org.apache.nifi.components.AllowableValue;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.ProcessSessionFactory;
-import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.io.OutputStreamCallback;
-import org.apache.nifi.processor.util.StandardValidators;
-
-@InputRequirement(Requirement.INPUT_FORBIDDEN)
-@CapabilityDescription("Consumes messages from Apache Kafka")
-@Tags({ "Kafka", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume" })
-public class ConsumeKafka extends AbstractKafkaProcessor<Consumer<byte[], 
byte[]>> {
-
-    static final AllowableValue OFFSET_EARLIEST = new 
AllowableValue("earliest", "earliest", "Automatically reset the offset to the 
earliest offset");
-
-    static final AllowableValue OFFSET_LATEST = new AllowableValue("latest", 
"latest", "Automatically reset the offset to the latest offset");
-
-    static final AllowableValue OFFSET_NONE = new AllowableValue("none", 
"none", "Throw exception to the consumer if no previous offset is found for the 
consumer's group");
-
-    static final PropertyDescriptor GROUP_ID = new PropertyDescriptor.Builder()
-            .name(ConsumerConfig.GROUP_ID_CONFIG)
-            .displayName("Group ID")
-            .description("A Group ID is used to identify consumers that are 
within the same consumer group. Corresponds to Kafka's 'group.id' property.")
-            .required(true)
-            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
-            .expressionLanguageSupported(false)
-            .build();
-    static final PropertyDescriptor AUTO_OFFSET_RESET = new 
PropertyDescriptor.Builder()
-            .name(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)
-            .displayName("Offset Reset")
-            .description("Allows you to manage the condition when there is no 
initial offset in Kafka or if the current offset does not exist any "
-                    + "more on the server (e.g. because that data has been 
deleted). Corresponds to Kafka's 'auto.offset.reset' property.")
-            .required(true)
-            .allowableValues(OFFSET_EARLIEST, OFFSET_LATEST, OFFSET_NONE)
-            .defaultValue(OFFSET_LATEST.getValue())
-            .build();
-    static final PropertyDescriptor MESSAGE_DEMARCATOR = 
MESSAGE_DEMARCATOR_BUILDER
-            .description("Since KafkaConsumer receives messages in batches, 
you have an option to output a single FlowFile which contains "
-                    + "all Kafka messages in a single batch and this property 
allows you to provide a string (interpreted as UTF-8) to use "
-                    + "for demarcating apart multiple Kafka messages. This is 
an optional property and if not provided each Kafka message received "
-                    + "in a batch will result in a single FlowFile which 
essentially means that this processor may output multiple FlowFiles for each "
-                    + "time it is triggered. To enter special character such 
as 'new line' use CTRL+Enter or Shift+Enter depending on the OS")
-            .build();
-
-
-    static final List<PropertyDescriptor> DESCRIPTORS;
-
-    static final Set<Relationship> RELATIONSHIPS;
-
-    private volatile byte[] demarcatorBytes;
-
-    private volatile String topic;
-
-    private volatile String brokers;
-
-    /*
-     * Will ensure that the list of the PropertyDescriptors is build only once,
-     * since all other lifecycle methods are invoked multiple times.
-     */
-    static {
-        List<PropertyDescriptor> _descriptors = new ArrayList<>();
-        _descriptors.addAll(SHARED_DESCRIPTORS);
-        _descriptors.add(GROUP_ID);
-        _descriptors.add(AUTO_OFFSET_RESET);
-        _descriptors.add(MESSAGE_DEMARCATOR);
-        DESCRIPTORS = Collections.unmodifiableList(_descriptors);
-
-        RELATIONSHIPS = Collections.unmodifiableSet(SHARED_RELATIONSHIPS);
-    }
-
-    /**
-     *
-     */
-    @Override
-    public Set<Relationship> getRelationships() {
-        return RELATIONSHIPS;
-    }
-
-    /**
-     * Will unsubscribe form {@link KafkaConsumer} delegating to 'super' to do
-     * the rest.
-     */
-    @Override
-    @OnStopped
-    public void close() {
-        if (this.kafkaResource != null) {
-            try {
-                this.kafkaResource.unsubscribe();
-            } finally { // in the event the above fails
-                super.close();
-            }
-        }
-    }
-
-    /**
-     *
-     */
-    @Override
-    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-        return DESCRIPTORS;
-    }
-
-    /**
-     * Will rendezvous with Kafka by performing the following:
-     * <br>
-     * - poll {@link ConsumerRecords} from {@link KafkaConsumer} in a
-     * non-blocking manner, signaling yield if no records were received from
-     * Kafka
-     * <br>
-     * - if records were received form Kafka, the are written to a newly 
created
-     * {@link FlowFile}'s {@link OutputStream} using a provided demarcator (see
-     * {@link #MESSAGE_DEMARCATOR}
-     */
-    @Override
-    protected boolean rendezvousWithKafka(ProcessContext context, 
ProcessSession processSession) {
-        ConsumerRecords<byte[], byte[]> consumedRecords = 
this.kafkaResource.poll(100);
-        if (consumedRecords != null && !consumedRecords.isEmpty()) {
-            long start = System.nanoTime();
-            FlowFile flowFile = processSession.create();
-            final AtomicInteger messageCounter = new AtomicInteger();
-            final Map<String, String> kafkaAttributes = new HashMap<>();
-
-            final Iterator<ConsumerRecord<byte[], byte[]>> iter = 
consumedRecords.iterator();
-            while (iter.hasNext()){
-                flowFile = processSession.append(flowFile, new 
OutputStreamCallback() {
-                    @Override
-                    public void process(final OutputStream out) throws 
IOException {
-                        ConsumerRecord<byte[], byte[]> consumedRecord = 
iter.next();
-
-                        kafkaAttributes.put("kafka.offset", 
String.valueOf(consumedRecord.offset()));
-                        if (consumedRecord.key() != null) {
-                            kafkaAttributes.put("kafka.key", new 
String(consumedRecord.key(), StandardCharsets.UTF_8));
-                        }
-                        kafkaAttributes.put("kafka.partition", 
String.valueOf(consumedRecord.partition()));
-                        kafkaAttributes.put("kafka.topic", 
consumedRecord.topic());
-
-                        if (messageCounter.getAndIncrement() > 0 && 
ConsumeKafka.this.demarcatorBytes != null) {
-                            out.write(ConsumeKafka.this.demarcatorBytes);
-                        }
-                        out.write(consumedRecord.value());
-                    }
-                });
-
-                flowFile = processSession.putAllAttributes(flowFile, 
kafkaAttributes);
-                /*
-                 * Release FlowFile if there are more messages in the
-                 * ConsumerRecords batch and no demarcator was provided,
-                 * otherwise the FlowFile will be released as soon as this loop
-                 * exits.
-                 */
-                if (iter.hasNext() && ConsumeKafka.this.demarcatorBytes == 
null){
-                    this.releaseFlowFile(flowFile, context, processSession, 
start, messageCounter.get());
-                    flowFile = processSession.create();
-                    messageCounter.set(0);
-                }
-            }
-            this.releaseFlowFile(flowFile, context, processSession, start, 
messageCounter.get());
-        }
-        return consumedRecords != null && !consumedRecords.isEmpty();
-    }
-
    /**
     * This operation is called from
     * {@link #onTrigger(ProcessContext, ProcessSessionFactory)} after the
     * process session is committed, so that the Kafka offset changes can be
     * committed. This can mean, in cases of really bad timing, that we could
     * have data duplication upon recovery but not data loss. We want to commit
     * the flow files in a NiFi sense before we commit them in a Kafka sense.
     */
    @Override
    protected void postCommit(ProcessContext context) {
        // Synchronous offset commit happens only after the NiFi session commit,
        // trading possible duplicates on recovery for guaranteed no data loss.
        this.kafkaResource.commitSync();
    }
-
-    /**
-     * Builds and instance of {@link KafkaConsumer} and subscribes to a 
provided
-     * topic.
-     */
-    @Override
-    protected Consumer<byte[], byte[]> buildKafkaResource(ProcessContext 
context, ProcessSession session) {
-        this.demarcatorBytes = context.getProperty(MESSAGE_DEMARCATOR).isSet()
-                ? 
context.getProperty(MESSAGE_DEMARCATOR).evaluateAttributeExpressions().getValue().getBytes(StandardCharsets.UTF_8)
-                : null;
-        this.topic = 
context.getProperty(TOPIC).evaluateAttributeExpressions().getValue();
-        this.brokers = 
context.getProperty(BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue();
-        Properties kafkaProperties = this.buildKafkaProperties(context);
-
-        /*
-         * Since we are using unconventional way to validate if connectivity to
-         * broker is possible we need a mechanism to be able to disable it.
-         * 'check.connection' property will serve as such mechanism
-         */
-        if (!"false".equals(kafkaProperties.get("check.connection"))) {
-            this.checkIfInitialConnectionPossible();
-        }
-
-        kafkaProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class.getName());
-        kafkaProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class.getName());
-
-        KafkaConsumer<byte[], byte[]> consumer = new 
KafkaConsumer<>(kafkaProperties);
-        consumer.subscribe(Collections.singletonList(this.topic));
-        return consumer;
-    }
-
-    /**
-     * Checks via brute force if it is possible to establish connection to at
-     * least one broker. If not this method will throw {@link 
ProcessException}.
-     */
-    private void checkIfInitialConnectionPossible(){
-        String[] br = this.brokers.split(",");
-        boolean connectionPossible = false;
-        for (int i = 0; i < br.length && !connectionPossible; i++) {
-            String hostPortCombo = br[i];
-            String[] hostPort = hostPortCombo.split(":");
-            Socket client = null;
-            try {
-                client = new Socket();
-                client.connect(new InetSocketAddress(hostPort[0].trim(), 
Integer.parseInt(hostPort[1].trim())), 10000);
-                connectionPossible = true;
-            } catch (Exception e) {
-                this.logger.error("Connection to '" + hostPortCombo + "' is 
not possible", e);
-            } finally {
-                try {
-                    client.close();
-                } catch (IOException e) {
-                    // ignore
-                }
-            }
-        }
-        if (!connectionPossible){
-            throw new ProcessException("Connection to " + this.brokers + " is 
not possible. See logs for more details");
-        }
-    }
    /**
     * Releases the given flow file by transferring it to {@link #REL_SUCCESS}
     * and reporting a 'receive' provenance event against the Kafka transit URI.
     *
     * NOTE(review): the original comment stated "If Empty then remove from
     * session and return", but no emptiness check is performed here - confirm
     * whether callers guarantee a non-empty flow file before invoking this.
     *
     * @param flowFile the flow file to release
     * @param context the process context, used to build the transit URI
     * @param session the session that owns the flow file
     * @param start nanoTime timestamp taken when processing began, used to
     *            compute the reported transfer duration
     * @param msgCount number of Kafka messages written into the flow file
     */
    private void releaseFlowFile(FlowFile flowFile, ProcessContext context, ProcessSession session, long start, int msgCount) {
        long executionDuration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        String transitUri = this.buildTransitURI(context.getProperty(SECURITY_PROTOCOL).getValue(), this.brokers, topic);
        session.getProvenanceReporter().receive(flowFile, transitUri, "Received " + msgCount + " Kafka messages", executionDuration);
        this.getLogger().info("Successfully received {} from Kafka with {} messages in {} millis", new Object[] { flowFile, msgCount, executionDuration });
        session.transfer(flowFile, REL_SUCCESS);
    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/1745c127/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
deleted file mode 100644
index e2cdea2..0000000
--- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.kafka.pubsub;
-
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.nifi.logging.ComponentLog;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.Collections;
-import java.util.Properties;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-
-/**
- * A pool of Kafka Consumers for a given topic. Clients must create the 
ConsumerPool and call initialize() before
- * acquiring consumers. Consumers should be returned by calling 
returnConsumerResource.
- */
-public class ConsumerPool implements Closeable {
-
-    private final int size;
-    private final BlockingQueue<ConsumerResource> consumers;
-    private final String topic;
-    private final Properties kafkaProperties;
-    private final ComponentLog logger;
-    private boolean initialized = false;
-
-    /**
-     * Initializes the pool with the given size, topic, properties, and 
logger, but does not create any consumers until initialize() is called.
-     *
-     * @param size the number of consumers to pool
-     * @param topic the topic to consume from
-     * @param kafkaProperties the properties for each consumer
-     * @param logger the logger to report any errors/warnings
-     */
-    public ConsumerPool(final int size, final String topic, final Properties 
kafkaProperties, final ComponentLog logger) {
-        this.size = size;
-        this.logger = logger;
-        this.topic = topic;
-        this.kafkaProperties = kafkaProperties;
-        this.consumers = new LinkedBlockingQueue<>(size);
-    }
-
-    /**
-     * Creates the consumers and subscribes them to the given topic. This 
method must be called before
-     * acquiring any consumers.
-     */
-    public synchronized void initialize() {
-        if (initialized) {
-            return;
-        }
-
-        for (int i=0; i < size; i++) {
-            ConsumerResource resource = createConsumerResource();
-            consumers.offer(resource);
-        }
-
-        initialized = true;
-    }
-
-    /**
-     * @return a ConsumerResource from the pool, or a newly created 
ConsumerResource if none were available in the pool
-     * @throws IllegalStateException if attempting to get a consumer before 
calling initialize()
-     */
-    public synchronized ConsumerResource getConsumerResource() {
-        if (!initialized) {
-            throw new IllegalStateException("ConsumerPool must be initialized 
before acquiring consumers");
-        }
-
-        ConsumerResource consumerResource = consumers.poll();
-        if (consumerResource == null) {
-            consumerResource = createConsumerResource();
-        }
-        return consumerResource;
-    }
-
-    /**
-     * If the given ConsumerResource has been poisoned then it is closed and 
not returned to the pool,
-     * otherwise it is attempted to be returned to the pool. If the pool is 
already full then it is closed
-     * and not returned.
-     *
-     * @param consumerResource
-     */
-    public synchronized void returnConsumerResource(final ConsumerResource 
consumerResource) {
-        if (consumerResource == null) {
-            return;
-        }
-
-        if (consumerResource.isPoisoned()) {
-            closeConsumer(consumerResource.getConsumer());
-        } else {
-            boolean added = consumers.offer(consumerResource);
-            if (!added) {
-                closeConsumer(consumerResource.getConsumer());
-            }
-        }
-    }
-
-    /**
-     * Closes all ConsumerResources in the pool and resets the initialization 
state of this pool.
-     *
-     * @throws IOException should never throw
-     */
-    @Override
-    public synchronized void close() throws IOException {
-        ConsumerResource consumerResource;
-        while ((consumerResource = consumers.poll()) != null) {
-            closeConsumer(consumerResource.getConsumer());
-        }
-        initialized = false;
-    }
-
-    private ConsumerResource createConsumerResource() {
-        final Consumer<byte[],byte[]> kafkaConsumer = new 
KafkaConsumer<>(kafkaProperties);
-        kafkaConsumer.subscribe(Collections.singletonList(this.topic));
-        return new ConsumerResource(kafkaConsumer, this, logger);
-    }
-
-    private void closeConsumer(Consumer consumer) {
-        try {
-            consumer.unsubscribe();
-        } catch (Exception e) {
-            logger.warn("Failed while unsubscribing " + consumer, e);
-        }
-
-        try {
-            consumer.close();
-        } catch (Exception e) {
-            logger.warn("Failed while closing " + consumer, e);
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/1745c127/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerResource.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerResource.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerResource.java
deleted file mode 100644
index baaf39f..0000000
--- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerResource.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.kafka.pubsub;
-
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.nifi.logging.ComponentLog;
-
-import java.io.Closeable;
-import java.io.IOException;
-
-/**
- * A wrapper for a Kafka Consumer obtained from a ConsumerPool. Client's 
should call poison() to indicate that this
- * consumer should no longer be used by other clients, and should always call 
close(). Calling close() will pass
- * this consumer back to the pool and the pool will determine the appropriate 
handling based on whether the consumer
- * has been poisoned and whether the pool is already full.
- */
-public class ConsumerResource implements Closeable {
-
-    private final ComponentLog logger;
-    private final Consumer<byte[],byte[]> consumer;
-    private final ConsumerPool consumerPool;
-    private boolean poisoned = false;
-
-    /**
-     * @param consumer the Kafka Consumer
-     * @param consumerPool the ConsumerPool this ConsumerResource was obtained 
from
-     * @param logger the logger to report any errors/warnings
-     */
-    public ConsumerResource(Consumer<byte[], byte[]> consumer, ConsumerPool 
consumerPool, ComponentLog logger) {
-        this.logger = logger;
-        this.consumer = consumer;
-        this.consumerPool = consumerPool;
-    }
-
-    /**
-     * @return the Kafka Consumer for this
-     */
-    public Consumer<byte[],byte[]> getConsumer() {
-        return consumer;
-    }
-
-    /**
-     * Sets the poison flag for this consumer to true.
-     */
-    public void poison() {
-        poisoned = true;
-    }
-
-    /**
-     * @return true if this consumer has been poisoned, false otherwise
-     */
-    public boolean isPoisoned() {
-        return poisoned;
-    }
-
-    @Override
-    public void close() throws IOException {
-        consumerPool.returnConsumerResource(this);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/1745c127/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisher.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisher.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisher.java
deleted file mode 100644
index f684bfa..0000000
--- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisher.java
+++ /dev/null
@@ -1,237 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.kafka.pubsub;
-
-import java.io.Closeable;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.stream.io.util.StreamDemarcator;
-
/**
 * Wrapper over {@link KafkaProducer} to assist {@link PublishKafka} processor
 * with sending contents of the {@link FlowFile}s to Kafka.
 */
class KafkaPublisher implements Closeable {
    private final Producer<byte[], byte[]> kafkaProducer;

    // Max time (ms) to wait on each send Future; also reused as the timeout
    // when closing the producer. See setAckWaitTime(long).
    private volatile long ackWaitTime = 30000;

    private final ComponentLog componentLog;

    // After every 'ackCheckSize' tokens sent, the pending send Futures are
    // drained via processAcks(..) so failures are detected incrementally.
    private final int ackCheckSize;

    // Convenience constructor defaulting ackCheckSize to 100.
    KafkaPublisher(Properties kafkaProperties, ComponentLog componentLog) {
        this(kafkaProperties, 100, componentLog);
    }

    /**
     * Creates an instance of this class as well as the instance of the
     * corresponding Kafka {@link KafkaProducer} using provided Kafka
     * configuration properties.
     *
     * @param kafkaProperties
     *            instance of {@link Properties} used to bootstrap
     *            {@link KafkaProducer}
     * @param ackCheckSize
     *            how many tokens to send before draining pending acks
     * @param componentLog
     *            the logger to report ack failures/timeouts
     */
    KafkaPublisher(Properties kafkaProperties, int ackCheckSize, ComponentLog componentLog) {
        this.kafkaProducer = new KafkaProducer<>(kafkaProperties);
        this.ackCheckSize = ackCheckSize;
        this.componentLog = componentLog;
    }

    /**
     * Publishes messages to Kafka topic. It uses {@link StreamDemarcator} to
     * determine how many messages to Kafka will be sent from a provided
     * {@link InputStream} (see {@link PublishingContext#getContentStream()}).
     * It supports two publishing modes:
     * <ul>
     * <li>Sending all messages constructed from
     * {@link StreamDemarcator#nextToken()} operation.</li>
     * <li>Sending only unacknowledged messages constructed from
     * {@link StreamDemarcator#nextToken()} operation.</li>
     * </ul>
     * The unacknowledged messages are determined from the value of
     * {@link PublishingContext#getLastAckedMessageIndex()}.
     * <br>
     * This method assumes content stream affinity where it is expected that the
     * content stream that represents the same Kafka message(s) will remain the
     * same across possible retries. This is required specifically for cases
     * where delimiter is used and a single content stream may represent
     * multiple Kafka messages. The
     * {@link PublishingContext#getLastAckedMessageIndex()} will provide the
     * index of the last ACKed message, so upon retry only messages with the
     * higher index are sent.
     *
     * @param publishingContext
     *            instance of {@link PublishingContext} which hold context
     *            information about the message(s) to be sent.
     * @return result holding the number of messages sent and the index of the
     *            last successfully ACKed message.
     */
    KafkaPublisherResult publish(PublishingContext publishingContext) {
        StreamDemarcator streamTokenizer = new StreamDemarcator(publishingContext.getContentStream(),
                publishingContext.getDelimiterBytes(), publishingContext.getMaxRequestSize());

        int prevLastAckedMessageIndex = publishingContext.getLastAckedMessageIndex();
        List<Future<RecordMetadata>> resultFutures = new ArrayList<>();

        byte[] messageBytes;
        int tokenCounter = 0;
        boolean continueSending = true;
        KafkaPublisherResult result = null;
        for (; continueSending && (messageBytes = streamTokenizer.nextToken()) != null; tokenCounter++) {
            // skip tokens that were already ACKed on a previous attempt
            if (prevLastAckedMessageIndex < tokenCounter) {
                ProducerRecord<byte[], byte[]> message = new ProducerRecord<>(publishingContext.getTopic(), publishingContext.getKeyBytes(), messageBytes);
                resultFutures.add(this.kafkaProducer.send(message));

                // NOTE: 0 % ackCheckSize == 0, so an ack check also runs right
                // after the very first sent token, not only at full batches.
                if (tokenCounter % this.ackCheckSize == 0) {
                    int lastAckedMessageIndex = this.processAcks(resultFutures, prevLastAckedMessageIndex);
                    resultFutures.clear();
                    // a non-multiple index means at least one send in the batch
                    // was not ACKed; stop sending and report partial progress
                    if (lastAckedMessageIndex % this.ackCheckSize != 0) {
                        continueSending = false;
                        result = new KafkaPublisherResult(tokenCounter, lastAckedMessageIndex);
                    }
                    prevLastAckedMessageIndex = lastAckedMessageIndex;
                }
            }
        }

        // drain any remaining futures from the final (partial) batch
        if (result == null) {
            int lastAckedMessageIndex = this.processAcks(resultFutures, prevLastAckedMessageIndex);
            resultFutures.clear();
            result = new KafkaPublisherResult(tokenCounter, lastAckedMessageIndex);
        }
        return result;
    }

    /**
     * Sets the time this publisher will wait for the {@link Future#get()}
     * operation (the Future returned by
     * {@link KafkaProducer#send(ProducerRecord)}) to complete before timing
     * out.
     *
     * This value will also be used as a timeout when closing the underlying
     * {@link KafkaProducer}. See {@link #close()}.
     */
    void setAckWaitTime(long ackWaitTime) {
        this.ackWaitTime = ackWaitTime;
    }

    /**
     * This operation will process ACKs from Kafka in the order in which
     * {@link KafkaProducer#send(ProducerRecord)} invocation were made returning
     * the index of the last ACKed message. Within this operation processing ACK
     * simply means successful invocation of 'get()' operation on the
     * {@link Future} returned by {@link KafkaProducer#send(ProducerRecord)}
     * operation. Upon encountering any type of error while interrogating such
     * {@link Future} the ACK loop will end. Messages that were not ACKed would
     * be considered non-delivered and therefore could be resent at the later
     * time.
     *
     * @param sendFutures
     *            list of {@link Future}s representing results of publishing to
     *            Kafka
     *
     * @param lastAckMessageIndex
     *            the index of the last ACKed message. It is important to
     *            provide the last ACKed message especially while re-trying so
     *            the proper index is maintained.
     * @return the index of the last ACKed message after draining the futures
     */
    private int processAcks(List<Future<RecordMetadata>> sendFutures, int lastAckMessageIndex) {
        boolean exceptionThrown = false;
        for (int segmentCounter = 0; segmentCounter < sendFutures.size() && !exceptionThrown; segmentCounter++) {
            Future<RecordMetadata> future = sendFutures.get(segmentCounter);
            try {
                future.get(this.ackWaitTime, TimeUnit.MILLISECONDS);
                lastAckMessageIndex++;
            } catch (InterruptedException e) {
                exceptionThrown = true;
                // restore the interrupt flag so callers can observe it
                Thread.currentThread().interrupt();
                this.warnOrError("Interrupted while waiting for acks from Kafka", null);
            } catch (ExecutionException e) {
                exceptionThrown = true;
                this.warnOrError("Failed while waiting for acks from Kafka", e);
            } catch (TimeoutException e) {
                exceptionThrown = true;
                this.warnOrError("Timed out while waiting for acks from Kafka", null);
            }
        }

        return lastAckMessageIndex;
    }

    /**
     * Will close the underlying {@link KafkaProducer} waiting if necessary for
     * the same duration as supplied {@link #setAckWaitTime(long)}
     */
    @Override
    public void close() {
        this.kafkaProducer.close(this.ackWaitTime, TimeUnit.MILLISECONDS);
    }

    /**
     * Logs the message as a warning when no cause is available, otherwise as an
     * error with the cause attached.
     */
    private void warnOrError(String message, Exception e) {
        if (e == null) {
            this.componentLog.warn(message);
        } else {
            this.componentLog.error(message, e);
        }
    }

    /**
     * Encapsulates the result received from publishing messages to Kafka
     */
    static class KafkaPublisherResult {
        // total tokens read from the stream during this publish attempt
        private final int messagesSent;
        // index of the last message confirmed by Kafka; -1 means none ACKed
        private final int lastMessageAcked;
        KafkaPublisherResult(int messagesSent, int lastMessageAcked) {
            this.messagesSent = messagesSent;
            this.lastMessageAcked = lastMessageAcked;
        }

        public int getMessagesSent() {
            return this.messagesSent;
        }

        public int getLastMessageAcked() {
            return this.lastMessageAcked;
        }

        public boolean isAllAcked() {
            return this.lastMessageAcked > -1 && this.messagesSent - 1 == this.lastMessageAcked;
        }

        @Override
        public String toString() {
            return "Sent:" + this.messagesSent + "; Last ACK:" + this.lastMessageAcked;
        }
    }
}

http://git-wip-us.apache.org/repos/asf/nifi/blob/1745c127/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/Partitioners.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/Partitioners.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/Partitioners.java
deleted file mode 100644
index 8c948df..0000000
--- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/Partitioners.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.kafka.pubsub;
-
-import java.util.Map;
-
-import org.apache.kafka.clients.producer.Partitioner;
-import org.apache.kafka.common.Cluster;
-
-/**
- * Collection of implementation of common Kafka {@link Partitioner}s.
- */
-final public class Partitioners {
-
-    private Partitioners() {
-    }
-
-    /**
-     * {@link Partitioner} that implements 'round-robin' mechanism which evenly
-     * distributes load between all available partitions.
-     */
-    public static class RoundRobinPartitioner implements Partitioner {
-        private volatile int index;
-
-        @Override
-        public void configure(Map<String, ?> configs) {
-            // noop
-        }
-
-        @Override
-        public int partition(String topic, Object key, byte[] keyBytes, Object 
value, byte[] valueBytes, Cluster cluster) {
-            return 
this.next(cluster.availablePartitionsForTopic(topic).size());
-        }
-
-        @Override
-        public void close() {
-            // noop
-        }
-
-        private synchronized int next(int numberOfPartitions) {
-            if (this.index >= numberOfPartitions) {
-                this.index = 0;
-            }
-            return index++;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/1745c127/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka.java
deleted file mode 100644
index 6703c04..0000000
--- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka.java
+++ /dev/null
@@ -1,360 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.kafka.pubsub;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.apache.nifi.annotation.behavior.DynamicProperty;
-import org.apache.nifi.annotation.behavior.InputRequirement;
-import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.components.AllowableValue;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.io.InputStreamCallback;
-import org.apache.nifi.processor.util.StandardValidators;
-import 
org.apache.nifi.processors.kafka.pubsub.KafkaPublisher.KafkaPublisherResult;
-import 
org.apache.nifi.processors.kafka.pubsub.Partitioners.RoundRobinPartitioner;
-
-@InputRequirement(Requirement.INPUT_REQUIRED)
-@Tags({ "Apache", "Kafka", "Put", "Send", "Message", "PubSub" })
-@CapabilityDescription("Sends the contents of a FlowFile as a message to 
Apache Kafka. The messages to send may be individual FlowFiles or may be 
delimited, using a "
-        + "user-specified delimiter, such as a new-line.")
-@DynamicProperty(name = "The name of a Kafka configuration property.", value = 
"The value of a given Kafka configuration property.",
-                 description = "These properties will be added on the Kafka 
configuration after loading any provided configuration properties."
-        + " In the event a dynamic property represents a property that was 
already set, its value will be ignored and WARN message logged."
-        + " For the list of available Kafka properties please refer to: 
http://kafka.apache.org/documentation.html#configuration.";)
-public class PublishKafka extends AbstractKafkaProcessor<KafkaPublisher> {
-
-    protected static final String FAILED_PROC_ID_ATTR = "failed.proc.id";
-
-    protected static final String FAILED_LAST_ACK_IDX = "failed.last.idx";
-
-    protected static final String FAILED_TOPIC_ATTR = "failed.topic";
-
-    protected static final String FAILED_KEY_ATTR = "failed.key";
-
-    protected static final String FAILED_DELIMITER_ATTR = "failed.delimiter";
-
-    protected static final String MSG_COUNT = "msg.count";
-
-    static final AllowableValue DELIVERY_REPLICATED = new 
AllowableValue("all", "Guarantee Replicated Delivery",
-            "FlowFile will be routed to failure unless the message is 
replicated to the appropriate "
-                    + "number of Kafka Nodes according to the Topic 
configuration");
-    static final AllowableValue DELIVERY_ONE_NODE = new AllowableValue("1", 
"Guarantee Single Node Delivery",
-            "FlowFile will be routed to success if the message is received by 
a single Kafka node, "
-                    + "whether or not it is replicated. This is faster than 
<Guarantee Replicated Delivery> "
-                    + "but can result in data loss if a Kafka node crashes");
-    static final AllowableValue DELIVERY_BEST_EFFORT = new AllowableValue("0", 
"Best Effort",
-            "FlowFile will be routed to success after successfully writing the 
content to a Kafka node, "
-                    + "without waiting for a response. This provides the best 
performance but may result in data loss.");
-
-    static final AllowableValue ROUND_ROBIN_PARTITIONING = new 
AllowableValue(RoundRobinPartitioner.class.getName(),
-            RoundRobinPartitioner.class.getSimpleName(),
-            "Messages will be assigned partitions in a round-robin fashion, 
sending the first message to Partition 1, "
-                    + "the next Partition to Partition 2, and so on, wrapping 
as necessary.");
-    static final AllowableValue RANDOM_PARTITIONING = new 
AllowableValue("org.apache.kafka.clients.producer.internals.DefaultPartitioner",
-            "DefaultPartitioner", "Messages will be assigned to random 
partitions.");
-
-    static final PropertyDescriptor DELIVERY_GUARANTEE = new 
PropertyDescriptor.Builder()
-            .name(ProducerConfig.ACKS_CONFIG)
-            .displayName("Delivery Guarantee")
-            .description("Specifies the requirement for guaranteeing that a 
message is sent to Kafka. Corresponds to Kafka's 'acks' property.")
-            .required(true)
-            .expressionLanguageSupported(false)
-            .allowableValues(DELIVERY_BEST_EFFORT, DELIVERY_ONE_NODE, 
DELIVERY_REPLICATED)
-            .defaultValue(DELIVERY_BEST_EFFORT.getValue())
-            .build();
-    static final PropertyDescriptor META_WAIT_TIME = new 
PropertyDescriptor.Builder()
-            .name(ProducerConfig.MAX_BLOCK_MS_CONFIG)
-            .displayName("Meta Data Wait Time")
-            .description("The amount of time KafkaConsumer will wait to obtain 
metadata during the 'send' call before failing the "
-                            + "entire 'send' call. Corresponds to Kafka's 
'max.block.ms' property")
-            .required(true)
-            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
-            .expressionLanguageSupported(true)
-            .defaultValue("30 sec")
-            .build();
-    static final PropertyDescriptor KEY = new PropertyDescriptor.Builder()
-            .name("kafka-key")
-            .displayName("Kafka Key")
-            .description("The Key to use for the Message")
-            .required(false)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .expressionLanguageSupported(true)
-            .build();
-    static final PropertyDescriptor MESSAGE_DEMARCATOR = 
MESSAGE_DEMARCATOR_BUILDER
-            .description("Specifies the string (interpreted as UTF-8) to use 
for demarcating apart multiple messages within "
-                    + "a single FlowFile. If not specified, the entire content 
of the FlowFile will be used as a single message. If specified, the "
-                            + "contents of the FlowFile will be split on this 
delimiter and each section sent as a separate Kafka message. "
-                            + "To enter special character such as 'new line' 
use CTRL+Enter or Shift+Enter depending on your OS.")
-            .build();
-    static final PropertyDescriptor PARTITION_CLASS = new 
PropertyDescriptor.Builder()
-            .name(ProducerConfig.PARTITIONER_CLASS_CONFIG)
-            .displayName("Partitioner class")
-            .description("Specifies which class to use to compute a partition 
id for a message. Corresponds to Kafka's 'partitioner.class' property.")
-            .allowableValues(ROUND_ROBIN_PARTITIONING, RANDOM_PARTITIONING)
-            .defaultValue(RANDOM_PARTITIONING.getValue())
-            .required(false)
-            .build();
-    static final PropertyDescriptor COMPRESSION_CODEC = new 
PropertyDescriptor.Builder()
-            .name(ProducerConfig.COMPRESSION_TYPE_CONFIG)
-            .displayName("Compression Type")
-            .description("This parameter allows you to specify the compression 
codec for all data generated by this producer.")
-            .required(true)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .allowableValues("none", "gzip", "snappy", "lz4")
-            .defaultValue("none")
-            .build();
-
-    static final Relationship REL_FAILURE = new Relationship.Builder()
-            .name("failure")
-            .description("Any FlowFile that cannot be sent to Kafka will be 
routed to this Relationship")
-            .build();
-
-    static final List<PropertyDescriptor> DESCRIPTORS;
-
-    static final Set<Relationship> RELATIONSHIPS;
-
-    private volatile String brokers;
-
-    /*
-     * Will ensure that list of PropertyDescriptors is build only once, since
-     * all other lifecycle methods are invoked multiple times.
-     */
-    static {
-        List<PropertyDescriptor> _descriptors = new ArrayList<>();
-        _descriptors.addAll(SHARED_DESCRIPTORS);
-        _descriptors.add(DELIVERY_GUARANTEE);
-        _descriptors.add(KEY);
-        _descriptors.add(MESSAGE_DEMARCATOR);
-        _descriptors.add(META_WAIT_TIME);
-        _descriptors.add(PARTITION_CLASS);
-        _descriptors.add(COMPRESSION_CODEC);
-
-        DESCRIPTORS = Collections.unmodifiableList(_descriptors);
-
-        Set<Relationship> _relationships = new HashSet<>();
-        _relationships.addAll(SHARED_RELATIONSHIPS);
-        _relationships.add(REL_FAILURE);
-        RELATIONSHIPS = Collections.unmodifiableSet(_relationships);
-    }
-
-    /**
-     *
-     */
-    @Override
-    public Set<Relationship> getRelationships() {
-        return RELATIONSHIPS;
-    }
-
-    /**
-     *
-     */
-    @Override
-    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-        return DESCRIPTORS;
-    }
-
-    /**
-     * Will rendezvous with Kafka if {@link ProcessSession} contains {@link 
FlowFile}
-     * producing a result {@link FlowFile}.
-     * <br>
-     * The result {@link FlowFile} that is successful is then transfered to 
{@link #REL_SUCCESS}
-     * <br>
-     * The result {@link FlowFile} that is failed is then transfered to {@link 
#REL_FAILURE}
-     *
-     */
-    @Override
-    protected boolean rendezvousWithKafka(ProcessContext context, 
ProcessSession session){
-        FlowFile flowFile = session.get();
-        if (flowFile != null) {
-            long start = System.nanoTime();
-            flowFile = this.doRendezvousWithKafka(flowFile, context, session);
-            Relationship relationship = REL_SUCCESS;
-            if (!this.isFailedFlowFile(flowFile)) {
-                String topic = 
context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue();
-                long executionDuration = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
-                String transitUri = 
this.buildTransitURI(context.getProperty(SECURITY_PROTOCOL).getValue(), 
this.brokers, topic);
-                session.getProvenanceReporter().send(flowFile, transitUri, 
"Sent " + flowFile.getAttribute(MSG_COUNT) + " Kafka messages", 
executionDuration);
-                this.getLogger().info("Successfully sent {} to Kafka as {} 
message(s) in {} millis", new Object[] { flowFile, 
flowFile.getAttribute(MSG_COUNT), executionDuration });
-            } else {
-                relationship = REL_FAILURE;
-                flowFile = session.penalize(flowFile);
-            }
-            session.transfer(flowFile, relationship);
-        }
-        return flowFile != null;
-    }
-
-    /**
-     * Builds and instance of {@link KafkaPublisher}.
-     */
-    @Override
-    protected KafkaPublisher buildKafkaResource(ProcessContext context, 
ProcessSession session) {
-        Properties kafkaProperties = this.buildKafkaProperties(context);
-        kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
ByteArraySerializer.class.getName());
-        kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
ByteArraySerializer.class.getName());
-        this.brokers = 
context.getProperty(BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue();
-        KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, 
this.getLogger());
-        return publisher;
-    }
-
-    /**
-     * Will rendezvous with {@link KafkaPublisher} after building
-     * {@link PublishingContext} and will produce the resulting {@link 
FlowFile}.
-     * The resulting FlowFile contains all required information to determine
-     * if message publishing originated from the provided FlowFile has actually
-     * succeeded fully, partially or failed completely (see
-     * {@link #isFailedFlowFile(FlowFile)}.
-     */
-    private FlowFile doRendezvousWithKafka(final FlowFile flowFile, final 
ProcessContext context, final ProcessSession session) {
-        final AtomicReference<KafkaPublisherResult> publishResultRef = new 
AtomicReference<>();
-        session.read(flowFile, new InputStreamCallback() {
-            @Override
-            public void process(InputStream contentStream) throws IOException {
-                PublishingContext publishingContext = 
PublishKafka.this.buildPublishingContext(flowFile, context, contentStream);
-                KafkaPublisherResult result = 
PublishKafka.this.kafkaResource.publish(publishingContext);
-                publishResultRef.set(result);
-            }
-        });
-
-        FlowFile resultFile = publishResultRef.get().isAllAcked()
-                ? this.cleanUpFlowFileIfNecessary(flowFile, session)
-                : session.putAllAttributes(flowFile, 
this.buildFailedFlowFileAttributes(publishResultRef.get().getLastMessageAcked(),
 flowFile, context));
-
-        if (!this.isFailedFlowFile(resultFile)) {
-            resultFile = session.putAttribute(resultFile, MSG_COUNT,  
String.valueOf(publishResultRef.get().getMessagesSent()));
-        }
-        return resultFile;
-    }
-
-    /**
-     * Builds {@link PublishingContext} for message(s) to be sent to Kafka.
-     * {@link PublishingContext} contains all contextual information required 
by
-     * {@link KafkaPublisher} to publish to Kafka. Such information contains
-     * things like topic name, content stream, delimiter, key and last ACKed
-     * message for cases where provided FlowFile is being retried (failed in 
the
-     * past).
-     * <br>
-     * For the clean FlowFile (file that has been sent for the first time),
-     * PublishingContext will be built form {@link ProcessContext} associated
-     * with this invocation.
-     * <br>
-     * For the failed FlowFile, {@link PublishingContext} will be built from
-     * attributes of that FlowFile which by then will already contain required
-     * information (e.g., topic, key, delimiter etc.). This is required to
-     * ensure the affinity of the retry in the even where processor
-     * configuration has changed. However keep in mind that failed FlowFile is
-     * only considered a failed FlowFile if it is being re-processed by the 
same
-     * processor (determined via {@link #FAILED_PROC_ID_ATTR}, see
-     * {@link #isFailedFlowFile(FlowFile)}). If failed FlowFile is being sent 
to
-     * another PublishKafka processor it is treated as a fresh FlowFile
-     * regardless if it has #FAILED* attributes set.
-     */
-    private PublishingContext buildPublishingContext(FlowFile flowFile, 
ProcessContext context, InputStream contentStream) {
-        String topicName;
-        byte[] keyBytes;
-        byte[] delimiterBytes = null;
-        int lastAckedMessageIndex = -1;
-        if (this.isFailedFlowFile(flowFile)) {
-            lastAckedMessageIndex = 
Integer.valueOf(flowFile.getAttribute(FAILED_LAST_ACK_IDX));
-            topicName = flowFile.getAttribute(FAILED_TOPIC_ATTR);
-            keyBytes = flowFile.getAttribute(FAILED_KEY_ATTR) != null
-                    ? 
flowFile.getAttribute(FAILED_KEY_ATTR).getBytes(StandardCharsets.UTF_8) : null;
-            delimiterBytes = flowFile.getAttribute(FAILED_DELIMITER_ATTR) != 
null
-                    ? 
flowFile.getAttribute(FAILED_DELIMITER_ATTR).getBytes(StandardCharsets.UTF_8) : 
null;
-
-        } else {
-            topicName = 
context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue();
-            String _key = 
context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue();
-            keyBytes = _key == null ? null : 
_key.getBytes(StandardCharsets.UTF_8);
-            delimiterBytes = context.getProperty(MESSAGE_DEMARCATOR).isSet() ? 
context.getProperty(MESSAGE_DEMARCATOR)
-                    
.evaluateAttributeExpressions(flowFile).getValue().getBytes(StandardCharsets.UTF_8)
 : null;
-        }
-
-        PublishingContext publishingContext = new 
PublishingContext(contentStream, topicName, lastAckedMessageIndex);
-        publishingContext.setKeyBytes(keyBytes);
-        publishingContext.setDelimiterBytes(delimiterBytes);
-        return publishingContext;
-    }
-
-    /**
-     * Will remove FAILED_* attributes if FlowFile is no longer considered a
-     * failed FlowFile
-     *
-     * @see #isFailedFlowFile(FlowFile)
-     */
-    private FlowFile cleanUpFlowFileIfNecessary(FlowFile flowFile, 
ProcessSession session) {
-        if (this.isFailedFlowFile(flowFile)) {
-            Set<String> keysToRemove = new HashSet<>();
-            keysToRemove.add(FAILED_DELIMITER_ATTR);
-            keysToRemove.add(FAILED_KEY_ATTR);
-            keysToRemove.add(FAILED_TOPIC_ATTR);
-            keysToRemove.add(FAILED_PROC_ID_ATTR);
-            keysToRemove.add(FAILED_LAST_ACK_IDX);
-            flowFile = session.removeAllAttributes(flowFile, keysToRemove);
-        }
-        return flowFile;
-    }
-
-    /**
-     * Builds a {@link Map} of FAILED_* attributes
-     *
-     * @see #FAILED_PROC_ID_ATTR
-     * @see #FAILED_LAST_ACK_IDX
-     * @see #FAILED_TOPIC_ATTR
-     * @see #FAILED_KEY_ATTR
-     * @see #FAILED_DELIMITER_ATTR
-     */
-    private Map<String, String> buildFailedFlowFileAttributes(int 
lastAckedMessageIndex, FlowFile sourceFlowFile, ProcessContext context) {
-        Map<String, String> attributes = new HashMap<>();
-        attributes.put(FAILED_PROC_ID_ATTR, this.getIdentifier());
-        attributes.put(FAILED_LAST_ACK_IDX, 
String.valueOf(lastAckedMessageIndex));
-        attributes.put(FAILED_TOPIC_ATTR, 
context.getProperty(TOPIC).evaluateAttributeExpressions(sourceFlowFile).getValue());
-        attributes.put(FAILED_KEY_ATTR, 
context.getProperty(KEY).evaluateAttributeExpressions(sourceFlowFile).getValue());
-        attributes.put(FAILED_DELIMITER_ATTR, 
context.getProperty(MESSAGE_DEMARCATOR).isSet()
-                        ? 
context.getProperty(MESSAGE_DEMARCATOR).evaluateAttributeExpressions(sourceFlowFile).getValue()
 : null);
-        return attributes;
-    }
-
-    /**
-     * Returns 'true' if provided FlowFile is a failed FlowFile. A failed
-     * FlowFile contains {@link #FAILED_PROC_ID_ATTR}.
-     */
-    private boolean isFailedFlowFile(FlowFile flowFile) {
-        return 
this.getIdentifier().equals(flowFile.getAttribute(FAILED_PROC_ID_ATTR));
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/1745c127/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishingContext.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishingContext.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishingContext.java
deleted file mode 100644
index bda29e6..0000000
--- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishingContext.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.kafka.pubsub;
-
-import java.io.InputStream;
-import java.nio.charset.StandardCharsets;
-
-/**
- * Holder of context information used by {@link KafkaPublisher} required to
- * publish messages to Kafka.
- */
-class PublishingContext {
-
-    private final InputStream contentStream;
-
-    private final String topic;
-
-    private final int lastAckedMessageIndex;
-
-    /*
-     * We're using the default value from Kafka. We are using it to control the
-     * message size before it goes to to Kafka thus limiting possibility of a
-     * late failures in Kafka client.
-     */
-    private int maxRequestSize = 1048576; // kafka default
-
-    private boolean maxRequestSizeSet;
-
-    private byte[] keyBytes;
-
-    private byte[] delimiterBytes;
-
-    PublishingContext(InputStream contentStream, String topic) {
-        this(contentStream, topic, -1);
-    }
-
-    PublishingContext(InputStream contentStream, String topic, int 
lastAckedMessageIndex) {
-        this.validateInput(contentStream, topic, lastAckedMessageIndex);
-        this.contentStream = contentStream;
-        this.topic = topic;
-        this.lastAckedMessageIndex = lastAckedMessageIndex;
-    }
-
-    @Override
-    public String toString() {
-        return "topic: '" + this.topic + "'; delimiter: '" + new 
String(this.delimiterBytes, StandardCharsets.UTF_8) + "'";
-    }
-
-    int getLastAckedMessageIndex() {
-        return this.lastAckedMessageIndex;
-    }
-
-    int getMaxRequestSize() {
-        return this.maxRequestSize;
-    }
-
-    byte[] getKeyBytes() {
-        return this.keyBytes;
-    }
-
-    byte[] getDelimiterBytes() {
-        return this.delimiterBytes;
-    }
-
-    InputStream getContentStream() {
-        return this.contentStream;
-    }
-
-    String getTopic() {
-        return this.topic;
-    }
-
-    void setKeyBytes(byte[] keyBytes) {
-        if (this.keyBytes == null) {
-            if (keyBytes != null) {
-                this.assertBytesValid(keyBytes);
-                this.keyBytes = keyBytes;
-            }
-        } else {
-            throw new IllegalArgumentException("'keyBytes' can only be set 
once per instance");
-        }
-    }
-
-    void setDelimiterBytes(byte[] delimiterBytes) {
-        if (this.delimiterBytes == null) {
-            if (delimiterBytes != null) {
-                this.assertBytesValid(delimiterBytes);
-                this.delimiterBytes = delimiterBytes;
-            }
-        } else {
-            throw new IllegalArgumentException("'delimiterBytes' can only be 
set once per instance");
-        }
-    }
-
-    void setMaxRequestSize(int maxRequestSize) {
-        if (!this.maxRequestSizeSet) {
-            if (maxRequestSize > 0) {
-                this.maxRequestSize = maxRequestSize;
-                this.maxRequestSizeSet = true;
-            } else {
-                throw new IllegalArgumentException("'maxRequestSize' must be > 
0");
-            }
-        } else {
-            throw new IllegalArgumentException("'maxRequestSize' can only be 
set once per instance");
-        }
-    }
-
-    private void assertBytesValid(byte[] bytes) {
-        if (bytes != null) {
-            if (bytes.length == 0) {
-                throw new IllegalArgumentException("'bytes' must not be 
empty");
-            }
-        }
-    }
-
-    private void validateInput(InputStream contentStream, String topic, int 
lastAckedMessageIndex) {
-        if (contentStream == null) {
-            throw new IllegalArgumentException("'contentStream' must not be 
null");
-        } else if (topic == null || topic.trim().length() == 0) {
-            throw new IllegalArgumentException("'topic' must not be null or 
empty");
-        } else if (lastAckedMessageIndex < -1) {
-            throw new IllegalArgumentException("'lastAckedMessageIndex' must 
be >= -1");
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/1745c127/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
deleted file mode 100644
index 28b8393..0000000
--- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ /dev/null
@@ -1,16 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-org.apache.nifi.processors.kafka.pubsub.PublishKafka
-org.apache.nifi.processors.kafka.pubsub.ConsumeKafka
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/1745c127/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafka/additionalDetails.html
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafka/additionalDetails.html
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafka/additionalDetails.html
deleted file mode 100644
index 7f5cbf7..0000000
--- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafka/additionalDetails.html
+++ /dev/null
@@ -1,33 +0,0 @@
-<!DOCTYPE html>
-<html lang="en">
-    <!--
-      Licensed to the Apache Software Foundation (ASF) under one or more
-      contributor license agreements.  See the NOTICE file distributed with
-      this work for additional information regarding copyright ownership.
-      The ASF licenses this file to You under the Apache License, Version 2.0
-      (the "License"); you may not use this file except in compliance with
-      the License.  You may obtain a copy of the License at
-          http://www.apache.org/licenses/LICENSE-2.0
-      Unless required by applicable law or agreed to in writing, software
-      distributed under the License is distributed on an "AS IS" BASIS,
-      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-      See the License for the specific language governing permissions and
-      limitations under the License.
-    -->
-    <head>
-        <meta charset="utf-8" />
-        <title>ConsumeKafka</title>
-        <link rel="stylesheet" href="../../css/component-usage.css" 
type="text/css" />
-    </head>
-
-    <body>
-        <!-- Processor Documentation 
================================================== -->
-        <h2>Description:</h2>
-        <p>
-            This Processor polls <a href="http://kafka.apache.org/">Apache Kafka</a>
-            for data using KafkaConsumer API available with Kafka 0.9+. When a 
message is received 
-            from Kafka, this Processor emits a FlowFile where the content of 
the FlowFile is the value 
-            of the Kafka message.
-        </p>
-    </body>
-</html>

Reply via email to