Repository: nifi
Updated Branches:
  refs/heads/master 088125451 -> 7a451935a


http://git-wip-us.apache.org/repos/asf/nifi/blob/7a451935/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
index 3f20b8f..fba8cb5 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
@@ -21,18 +21,15 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.nifi.logging.ComponentLog;
 
 import java.io.Closeable;
-import java.util.ArrayDeque;
 import java.util.ArrayList;
-import java.util.HashMap;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.Queue;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.atomic.AtomicLong;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.TopicPartition;
+import org.apache.nifi.processor.ProcessSession;
 
 /**
  * A pool of Kafka Consumers for a given topic. Consumers can be obtained by
@@ -41,176 +38,118 @@ import org.apache.kafka.common.TopicPartition;
  */
 public class ConsumerPool implements Closeable {
 
-    private final AtomicInteger activeLeaseCount = new AtomicInteger(0);
-    private final int maxLeases;
-    private final Queue<ConsumerLease> consumerLeases;
+    private final BlockingQueue<SimpleConsumerLease> pooledLeases;
     private final List<String> topics;
     private final Map<String, Object> kafkaProperties;
+    private final long maxWaitMillis;
     private final ComponentLog logger;
-
+    private final byte[] demarcatorBytes;
+    private final String keyEncoding;
+    private final String securityProtocol;
+    private final String bootstrapServers;
     private final AtomicLong consumerCreatedCountRef = new AtomicLong();
     private final AtomicLong consumerClosedCountRef = new AtomicLong();
     private final AtomicLong leasesObtainedCountRef = new AtomicLong();
-    private final AtomicLong productivePollCountRef = new AtomicLong();
-    private final AtomicLong unproductivePollCountRef = new AtomicLong();
 
     /**
      * Creates a pool of KafkaConsumer objects that will grow up to the maximum
-     * indicated leases. Consumers are lazily initialized.
+     * indicated threads from the given context. Consumers are lazily
+     * initialized. We may elect to not create up to the maximum number of
+     * configured consumers if the broker reported lag time for all topics is
+     * below a certain threshold.
      *
-     * @param maxLeases maximum number of active leases in the pool
-     * @param topics the topics to consume from
-     * @param kafkaProperties the properties for each consumer
+     * @param maxConcurrentLeases max allowable consumers at once
+     * @param demarcator bytes to use as demarcator between messages; null or
+     * empty means no demarcator
+     * @param kafkaProperties properties to use to initialize kafka consumers
+     * @param topics the topics to subscribe to
+     * @param maxWaitMillis maximum time to wait for a given lease to acquire
+     * data before committing
+     * @param keyEncoding the encoding to use for the key of a kafka message if
+     * found
+     * @param securityProtocol the security protocol used
+     * @param bootstrapServers the bootstrap servers
      * @param logger the logger to report any errors/warnings
      */
-    public ConsumerPool(final int maxLeases, final List<String> topics, final Map<String, String> kafkaProperties, final ComponentLog logger) {
-        this.maxLeases = maxLeases;
-        if (maxLeases <= 0) {
-            throw new IllegalArgumentException("Max leases value must be greather than zero.");
-        }
+    public ConsumerPool(
+            final int maxConcurrentLeases,
+            final byte[] demarcator,
+            final Map<String, String> kafkaProperties,
+            final List<String> topics,
+            final long maxWaitMillis,
+            final String keyEncoding,
+            final String securityProtocol,
+            final String bootstrapServers,
+            final ComponentLog logger) {
+        this.pooledLeases = new ArrayBlockingQueue<>(maxConcurrentLeases);
+        this.maxWaitMillis = maxWaitMillis;
         this.logger = logger;
-        if (topics == null || topics.isEmpty()) {
-            throw new IllegalArgumentException("Must have a list of one or more topics");
-        }
-        this.topics = topics;
-        this.kafkaProperties = new HashMap<>(kafkaProperties);
-        this.consumerLeases = new ArrayDeque<>();
+        this.demarcatorBytes = demarcator;
+        this.keyEncoding = keyEncoding;
+        this.securityProtocol = securityProtocol;
+        this.bootstrapServers = bootstrapServers;
+        this.kafkaProperties = Collections.unmodifiableMap(kafkaProperties);
+        this.topics = Collections.unmodifiableList(topics);
     }
 
     /**
-     * Obtains a consumer from the pool if one is available
+     * Obtains a consumer from the pool if one is available or lazily
+     * initializes a new one if deemed necessary.
      *
-     * @return consumer from the pool
-     * @throws IllegalArgumentException if pool already contains
+     * @param session the session for which the consumer lease will be
+     * associated
+     * @return consumer to use or null if not available or necessary
      */
-    public ConsumerLease obtainConsumer() {
-        final ConsumerLease lease;
-        final int activeLeases;
-        synchronized (this) {
-            lease = consumerLeases.poll();
-            activeLeases = activeLeaseCount.get();
-        }
-        if (lease == null && activeLeases >= maxLeases) {
-            logger.warn("No available consumers and cannot create any as max consumer leases limit reached - verify pool settings");
-            return null;
+    public ConsumerLease obtainConsumer(final ProcessSession session) {
+        SimpleConsumerLease lease = pooledLeases.poll();
+        if (lease == null) {
+            final Consumer<byte[], byte[]> consumer = createKafkaConsumer();
+            consumerCreatedCountRef.incrementAndGet();
+            /**
+             * For now return a new consumer lease. But we could later elect to
+             * have this return null if we determine the broker indicates that
+             * the lag time on all topics being monitored is sufficiently low.
+             * For now we should encourage conservative use of threads because
+             * having too many means we'll have at best useless threads sitting
+             * around doing frequent network calls and at worst having consumers
+             * sitting idle which could prompt excessive rebalances.
+             */
+            lease = new SimpleConsumerLease(consumer);
+            /**
+             * This subscription tightly couples the lease to the given
+             * consumer. They cannot be separated from then on.
+             */
+            consumer.subscribe(topics, lease);
         }
+        lease.setProcessSession(session);
         leasesObtainedCountRef.incrementAndGet();
-        return (lease == null) ? createConsumer() : lease;
+        return lease;
     }
 
+    /**
+     * Exposed as protected method for easier unit testing
+     *
+     * @return consumer
+     * @throws KafkaException if unable to subscribe to the given topics
+     */
     protected Consumer<byte[], byte[]> createKafkaConsumer() {
         return new KafkaConsumer<>(kafkaProperties);
     }
 
-    private ConsumerLease createConsumer() {
-        final Consumer<byte[], byte[]> kafkaConsumer = createKafkaConsumer();
-        consumerCreatedCountRef.incrementAndGet();
-        try {
-            kafkaConsumer.subscribe(topics);
-        } catch (final KafkaException kex) {
-            try {
-                kafkaConsumer.close();
-                consumerClosedCountRef.incrementAndGet();
-            } catch (final Exception ex) {
-                consumerClosedCountRef.incrementAndGet();
-                //ignore
-            }
-            throw kex;
-        }
-
-        final ConsumerLease lease = new ConsumerLease() {
-
-            private volatile boolean poisoned = false;
-            private volatile boolean closed = false;
-
-            @Override
-            public ConsumerRecords<byte[], byte[]> poll() {
-
-                if (poisoned) {
-                    throw new KafkaException("The consumer is poisoned and should no longer be used");
-                }
-
-                try {
-                    final ConsumerRecords<byte[], byte[]> records = kafkaConsumer.poll(50);
-                    if (records.isEmpty()) {
-                        unproductivePollCountRef.incrementAndGet();
-                    } else {
-                        productivePollCountRef.incrementAndGet();
-                    }
-                    return records;
-                } catch (final KafkaException kex) {
-                    logger.warn("Unable to poll from Kafka consumer so will poison and close this " + kafkaConsumer, kex);
-                    poison();
-                    close();
-                    throw kex;
-                }
-            }
-
-            @Override
-            public void commitOffsets(final Map<TopicPartition, OffsetAndMetadata> offsets) {
-
-                if (poisoned) {
-                    throw new KafkaException("The consumer is poisoned and should no longer be used");
-                }
-                try {
-                    kafkaConsumer.commitSync(offsets);
-                } catch (final KafkaException kex) {
-                    logger.warn("Unable to commit kafka consumer offsets so will poison and close this " + kafkaConsumer, kex);
-                    poison();
-                    close();
-                    throw kex;
-                }
-            }
-
-            @Override
-            public void close() {
-                if (closed) {
-                    return;
-                }
-                if (poisoned || activeLeaseCount.get() > maxLeases) {
-                    closeConsumer(kafkaConsumer);
-                    activeLeaseCount.decrementAndGet();
-                    closed = true;
-                } else {
-                    final boolean added;
-                    synchronized (ConsumerPool.this) {
-                        added = consumerLeases.offer(this);
-                    }
-                    if (!added) {
-                        closeConsumer(kafkaConsumer);
-                        activeLeaseCount.decrementAndGet();
-                    }
-                }
-            }
-
-            @Override
-            public void poison() {
-                poisoned = true;
-            }
-        };
-        activeLeaseCount.incrementAndGet();
-        return lease;
-    }
-
     /**
-     * Closes all consumers in the pool. Can be safely recalled.
+     * Closes all consumers in the pool. Can be safely called repeatedly.
      */
     @Override
     public void close() {
-        final List<ConsumerLease> leases = new ArrayList<>();
-        synchronized (this) {
-            ConsumerLease lease = null;
-            while ((lease = consumerLeases.poll()) != null) {
-                leases.add(lease);
-            }
-        }
-        for (final ConsumerLease lease : leases) {
-            lease.poison();
-            lease.close();
-        }
+        final List<SimpleConsumerLease> leases = new ArrayList<>();
+        pooledLeases.drainTo(leases);
+        leases.stream().forEach((lease) -> {
+            lease.close(true);
+        });
     }
 
     private void closeConsumer(final Consumer consumer) {
+        consumerClosedCountRef.incrementAndGet();
         try {
             consumer.unsubscribe();
         } catch (Exception e) {
@@ -219,15 +158,55 @@ public class ConsumerPool implements Closeable {
 
         try {
             consumer.close();
-            consumerClosedCountRef.incrementAndGet();
         } catch (Exception e) {
-            consumerClosedCountRef.incrementAndGet();
             logger.warn("Failed while closing " + consumer, e);
         }
     }
 
     PoolStats getPoolStats() {
-        return new PoolStats(consumerCreatedCountRef.get(), consumerClosedCountRef.get(), leasesObtainedCountRef.get(), productivePollCountRef.get(), unproductivePollCountRef.get());
+        return new PoolStats(consumerCreatedCountRef.get(), consumerClosedCountRef.get(), leasesObtainedCountRef.get());
+    }
+
+    private class SimpleConsumerLease extends ConsumerLease {
+
+        private final Consumer<byte[], byte[]> consumer;
+        private volatile ProcessSession session;
+        private volatile boolean closedConsumer;
+
+        private SimpleConsumerLease(final Consumer<byte[], byte[]> consumer) {
+            super(maxWaitMillis, consumer, demarcatorBytes, keyEncoding, securityProtocol, bootstrapServers, logger);
+            this.consumer = consumer;
+        }
+
+        void setProcessSession(final ProcessSession session) {
+            this.session = session;
+        }
+
+        @Override
+        public ProcessSession getProcessSession() {
+            return session;
+        }
+
+        @Override
+        public void close() {
+            super.close();
+            close(false);
+        }
+
+        public void close(final boolean forceClose) {
+            if (closedConsumer) {
+                return;
+            }
+            super.close();
+            if (session != null) {
+                session.rollback();
+                setProcessSession(null);
+            }
+            if (forceClose || isPoisoned() || !pooledLeases.offer(this)) {
+                closedConsumer = true;
+                closeConsumer(consumer);
+            }
+        }
     }
 
     static final class PoolStats {
@@ -235,30 +214,22 @@ public class ConsumerPool implements Closeable {
         final long consumerCreatedCount;
         final long consumerClosedCount;
         final long leasesObtainedCount;
-        final long productivePollCount;
-        final long unproductivePollCount;
 
         PoolStats(
                 final long consumerCreatedCount,
                 final long consumerClosedCount,
-                final long leasesObtainedCount,
-                final long productivePollCount,
-                final long unproductivePollCount
+                final long leasesObtainedCount
         ) {
             this.consumerCreatedCount = consumerCreatedCount;
             this.consumerClosedCount = consumerClosedCount;
             this.leasesObtainedCount = leasesObtainedCount;
-            this.productivePollCount = productivePollCount;
-            this.unproductivePollCount = unproductivePollCount;
         }
 
         @Override
         public String toString() {
             return "Created Consumers [" + consumerCreatedCount + "]\n"
                     + "Closed Consumers  [" + consumerClosedCount + "]\n"
-                    + "Leases Obtained   [" + leasesObtainedCount + "]\n"
-                    + "Productive Polls  [" + productivePollCount + "]\n"
-                    + "Unproductive Polls  [" + unproductivePollCount + "]\n";
+                    + "Leases Obtained   [" + leasesObtainedCount + "]\n";
         }
 
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/7a451935/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
index c74ad18..707a431 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
@@ -55,6 +55,10 @@ final class KafkaProcessorUtils {
 
     private static final String BROKER_REGEX = SINGLE_BROKER_REGEX + "(?:,\\s*" + SINGLE_BROKER_REGEX + ")*";
 
+    static final AllowableValue UTF8_ENCODING = new AllowableValue("utf-8", "UTF-8 Encoded", "The key is interpreted as a UTF-8 Encoded string.");
+    static final AllowableValue HEX_ENCODING = new AllowableValue("hex", "Hex Encoded",
+            "The key is interpreted as arbitrary binary data and is encoded using hexadecimal characters with uppercase letters");
+
     static final Pattern HEX_KEY_PATTERN = Pattern.compile("(?:[0123456789abcdefABCDEF]{2})+");
 
     static final String KAFKA_KEY = "kafka.key";
@@ -96,7 +100,6 @@ final class KafkaProcessorUtils {
             .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
             .expressionLanguageSupported(false)
             .build();
-
     static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
             .name("ssl.context.service")
             .displayName("SSL Context Service")
@@ -227,7 +230,6 @@ final class KafkaProcessorUtils {
                     mapToPopulate.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, sslContextService.getTrustStoreType());
                 }
             }
-
             String pName = propertyDescriptor.getName();
             String pValue = propertyDescriptor.isExpressionLanguageSupported()
                     ? context.getProperty(propertyDescriptor).evaluateAttributeExpressions().getValue()

http://git-wip-us.apache.org/repos/asf/nifi/blob/7a451935/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
index 7e4b12c..8e3fa3b 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
@@ -16,105 +16,36 @@
  */
 package org.apache.nifi.processors.kafka.pubsub;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import java.util.UUID;
-
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.TopicPartition;
+
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.junit.Test;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import org.junit.Before;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
 
 public class ConsumeKafkaTest {
 
-    static class MockConsumerPool extends ConsumerPool {
-
-        final int actualMaxLeases;
-        final List<String> actualTopics;
-        final Map<String, String> actualKafkaProperties;
-        boolean throwKafkaExceptionOnPoll = false;
-        boolean throwKafkaExceptionOnCommit = false;
-        Queue<ConsumerRecords<byte[], byte[]>> nextPlannedRecordsQueue = new ArrayDeque<>();
-        Map<TopicPartition, OffsetAndMetadata> nextExpectedCommitOffsets = null;
-        Map<TopicPartition, OffsetAndMetadata> actualCommitOffsets = null;
-        boolean wasConsumerLeasePoisoned = false;
-        boolean wasConsumerLeaseClosed = false;
-        boolean wasPoolClosed = false;
-
-        public MockConsumerPool(int maxLeases, List<String> topics, Map<String, String> kafkaProperties, ComponentLog logger) {
-            super(maxLeases, topics, kafkaProperties, null);
-            actualMaxLeases = maxLeases;
-            actualTopics = topics;
-            actualKafkaProperties = kafkaProperties;
-        }
-
-        @Override
-        public ConsumerLease obtainConsumer() {
-            return new ConsumerLease() {
-                @Override
-                public ConsumerRecords<byte[], byte[]> poll() {
-                    if (throwKafkaExceptionOnPoll) {
-                        throw new KafkaException("i planned to fail");
-                    }
-                    final ConsumerRecords<byte[], byte[]> records = nextPlannedRecordsQueue.poll();
-                    return (records == null) ? ConsumerRecords.empty() : records;
-                }
-
-                @Override
-                public void commitOffsets(Map<TopicPartition, OffsetAndMetadata> offsets) {
-                    if (throwKafkaExceptionOnCommit) {
-                        throw new KafkaException("i planned to fail");
-                    }
-                    actualCommitOffsets = offsets;
-                }
-
-                @Override
-                public void poison() {
-                    wasConsumerLeasePoisoned = true;
-                }
-
-                @Override
-                public void close() {
-                    wasConsumerLeaseClosed = true;
-                }
-            };
-        }
-
-        @Override
-        public void close() {
-            wasPoolClosed = true;
-        }
-
-        void resetState() {
-            throwKafkaExceptionOnPoll = false;
-            throwKafkaExceptionOnCommit = false;
-            nextPlannedRecordsQueue = null;
-            nextExpectedCommitOffsets = null;
-            wasConsumerLeasePoisoned = false;
-            wasConsumerLeaseClosed = false;
-            wasPoolClosed = false;
-        }
+    Consumer<byte[], byte[]> mockConsumer = null;
+    ConsumerLease mockLease = null;
+    ConsumerPool mockConsumerPool = null;
 
+    @Before
+    public void setup() {
+        mockConsumer = mock(Consumer.class);
+        mockLease = mock(ConsumerLease.class);
+        mockConsumerPool = mock(ConsumerPool.class);
     }
 
     @Test
@@ -175,365 +106,45 @@ public class ConsumeKafkaTest {
     public void validateGetAllMessages() throws Exception {
         String groupName = "validateGetAllMessages";
 
-        final byte[][] firstPassValues = new byte[][]{
-            "Hello-1".getBytes(StandardCharsets.UTF_8),
-            "Hello-2".getBytes(StandardCharsets.UTF_8),
-            "Hello-3".getBytes(StandardCharsets.UTF_8)
-        };
-        final ConsumerRecords<byte[], byte[]> firstRecs = createConsumerRecords("foo", 1, 1L, firstPassValues);
-
-        final byte[][] secondPassValues = new byte[][]{
-            "Hello-4".getBytes(StandardCharsets.UTF_8),
-            "Hello-5".getBytes(StandardCharsets.UTF_8),
-            "Hello-6".getBytes(StandardCharsets.UTF_8)
-        };
-        final ConsumerRecords<byte[], byte[]> secondRecs = createConsumerRecords("bar", 1, 1L, secondPassValues);
-
-        final List<String> expectedTopics = new ArrayList<>();
-        expectedTopics.add("foo");
-        expectedTopics.add("bar");
-        final MockConsumerPool mockPool = new MockConsumerPool(1, expectedTopics, Collections.EMPTY_MAP, null);
-        mockPool.nextPlannedRecordsQueue.add(firstRecs);
-        mockPool.nextPlannedRecordsQueue.add(secondRecs);
-
-        ConsumeKafka proc = new ConsumeKafka() {
-            @Override
-            protected ConsumerPool createConsumerPool(final int maxLeases, final List<String> topics, final Map<String, String> props, final ComponentLog log) {
-                return mockPool;
-            }
-        };
-        final TestRunner runner = TestRunners.newTestRunner(proc);
-        runner.setValidateExpressionUsage(false);
-        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
-        runner.setProperty(ConsumeKafka.TOPICS, "foo,bar");
-        runner.setProperty(ConsumeKafka.GROUP_ID, groupName);
-        runner.setProperty(ConsumeKafka.AUTO_OFFSET_RESET, ConsumeKafka.OFFSET_EARLIEST);
-
-        runner.run(1, false);
-
-        final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeKafka.REL_SUCCESS);
-
-        assertEquals(expectedTopics, mockPool.actualTopics);
-
-        assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-1")).count());
-        assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-2")).count());
-        assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-3")).count());
-
-        if (mockPool.nextPlannedRecordsQueue.isEmpty()) {
-            assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-4")).count());
-            assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-5")).count());
-            assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-6")).count());
-            assertEquals(2, mockPool.actualCommitOffsets.size());
-            assertEquals(4L, mockPool.actualCommitOffsets.get(new TopicPartition("foo", 1)).offset());
-            assertEquals(4L, mockPool.actualCommitOffsets.get(new TopicPartition("bar", 1)).offset());
-        } else {
-            assertEquals(2, mockPool.actualCommitOffsets.size());
-            assertEquals(4L, mockPool.actualCommitOffsets.get(new TopicPartition("foo", 1)).offset());
-        }
-
-        //asert that all consumers were closed as expected
-        //assert that the consumer pool was properly closed
-        assertFalse(mockPool.wasConsumerLeasePoisoned);
-        assertTrue(mockPool.wasConsumerLeaseClosed);
-        assertFalse(mockPool.wasPoolClosed);
-        runner.run(1, true);
-        assertFalse(mockPool.wasConsumerLeasePoisoned);
-        assertTrue(mockPool.wasConsumerLeaseClosed);
-        assertTrue(mockPool.wasPoolClosed);
-
-    }
-
-    @Test
-    public void validateGetLotsOfMessages() throws Exception {
-        String groupName = "validateGetLotsOfMessages";
-
-        final byte[][] firstPassValues = new byte[10010][1];
-        for (final byte[] value : firstPassValues) {
-            value[0] = 0x12;
-        }
-        final ConsumerRecords<byte[], byte[]> firstRecs = createConsumerRecords("foo", 1, 1L, firstPassValues);
-
-        final byte[][] secondPassValues = new byte[][]{
-            "Hello-4".getBytes(StandardCharsets.UTF_8),
-            "Hello-5".getBytes(StandardCharsets.UTF_8),
-            "Hello-6".getBytes(StandardCharsets.UTF_8)
-        };
-        final ConsumerRecords<byte[], byte[]> secondRecs = createConsumerRecords("bar", 1, 1L, secondPassValues);
-
-        final List<String> expectedTopics = new ArrayList<>();
-        expectedTopics.add("foo");
-        expectedTopics.add("bar");
-        final MockConsumerPool mockPool = new MockConsumerPool(1, expectedTopics, Collections.EMPTY_MAP, null);
-        mockPool.nextPlannedRecordsQueue.add(firstRecs);
-        mockPool.nextPlannedRecordsQueue.add(secondRecs);
-
-        ConsumeKafka proc = new ConsumeKafka() {
-            @Override
-            protected ConsumerPool createConsumerPool(final int maxLeases, final List<String> topics, final Map<String, String> props, final ComponentLog log) {
-                return mockPool;
-            }
-        };
-        final TestRunner runner = TestRunners.newTestRunner(proc);
-        runner.setValidateExpressionUsage(false);
-        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
-        runner.setProperty(ConsumeKafka.TOPICS, "foo,bar");
-        runner.setProperty(ConsumeKafka.GROUP_ID, groupName);
-        runner.setProperty(ConsumeKafka.AUTO_OFFSET_RESET, ConsumeKafka.OFFSET_EARLIEST);
-
-        runner.run(1, false);
-
-        final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeKafka.REL_SUCCESS);
-
-        assertEquals(10010, flowFiles.stream().map(ff -> ff.toByteArray()).filter(content -> content.length == 1 && content[0] == 0x12).count());
-        assertEquals(1, mockPool.nextPlannedRecordsQueue.size());
-
-        assertEquals(1, mockPool.actualCommitOffsets.size());
-        assertEquals(10011L, mockPool.actualCommitOffsets.get(new TopicPartition("foo", 1)).offset());
-
-        //asert that all consumers were closed as expected
-        //assert that the consumer pool was properly closed
-        assertFalse(mockPool.wasConsumerLeasePoisoned);
-        assertTrue(mockPool.wasConsumerLeaseClosed);
-        assertFalse(mockPool.wasPoolClosed);
-        runner.run(1, true);
-        assertFalse(mockPool.wasConsumerLeasePoisoned);
-        assertTrue(mockPool.wasConsumerLeaseClosed);
-        assertTrue(mockPool.wasPoolClosed);
-
-    }
-
-    @SuppressWarnings({"rawtypes", "unchecked"})
-    private ConsumerRecords<byte[], byte[]> createConsumerRecords(final String topic, final int partition, final long startingOffset, final byte[][] rawRecords) {
-        final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> map = new HashMap<>();
-        final TopicPartition tPart = new TopicPartition(topic, partition);
-        final List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>();
-        long offset = startingOffset;
-        for (final byte[] rawRecord : rawRecords) {
-            final ConsumerRecord<byte[], byte[]> rec = new ConsumerRecord(topic, partition, offset++, UUID.randomUUID().toString().getBytes(), rawRecord);
-            records.add(rec);
-        }
-        map.put(tPart, records);
-        return new ConsumerRecords(map);
-    }
-
-    @SuppressWarnings({"rawtypes", "unchecked"})
-    private ConsumerRecords<byte[], byte[]> createConsumerRecords(final String topic, final int partition, final long startingOffset, final Map<byte[], byte[]> rawRecords) {
-        final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> map = new HashMap<>();
-        final TopicPartition tPart = new TopicPartition(topic, partition);
-        final List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>();
-        long offset = startingOffset;
-
-        for (final Map.Entry<byte[], byte[]> entry : rawRecords.entrySet()) {
-            final byte[] key = entry.getKey();
-            final byte[] rawRecord = entry.getValue();
-            final ConsumerRecord<byte[], byte[]> rec = new 
ConsumerRecord(topic, partition, offset++, key, rawRecord);
-            records.add(rec);
-        }
-        map.put(tPart, records);
-        return new ConsumerRecords(map);
-    }
-
-    private ConsumerRecords<byte[], byte[]> mergeRecords(final 
ConsumerRecords<byte[], byte[]>... records) {
-        final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> map = 
new HashMap<>();
-        for (final ConsumerRecords<byte[], byte[]> rec : records) {
-            rec.partitions().stream().forEach((part) -> {
-                final List<ConsumerRecord<byte[], byte[]>> conRecs = 
rec.records(part);
-                if (map.get(part) != null) {
-                    throw new IllegalStateException("already have that 
topic/partition in the record map");
-                }
-                map.put(part, conRecs);
-            });
-        }
-        return new ConsumerRecords<>(map);
-    }
-
-    @Test
-    public void validateGetAllMessagesWithProvidedDemarcator() throws 
Exception {
-        String groupName = "validateGetAllMessagesWithProvidedDemarcator";
-
-        final byte[][] firstPassValues = new byte[][]{
-            "Hello-1".getBytes(StandardCharsets.UTF_8),
-            "Hello-2".getBytes(StandardCharsets.UTF_8),
-            "Hello-3".getBytes(StandardCharsets.UTF_8)
-        };
-
-        final byte[][] secondPassValues = new byte[][]{
-            "Hello-4".getBytes(StandardCharsets.UTF_8),
-            "Hello-5".getBytes(StandardCharsets.UTF_8),
-            "Hello-6".getBytes(StandardCharsets.UTF_8)
-        };
-        final ConsumerRecords<byte[], byte[]> consumerRecs = mergeRecords(
-                createConsumerRecords("foo", 1, 1L, firstPassValues),
-                createConsumerRecords("bar", 1, 1L, secondPassValues)
-        );
-
-        final List<String> expectedTopics = new ArrayList<>();
-        expectedTopics.add("foo");
-        expectedTopics.add("bar");
-        final MockConsumerPool mockPool = new MockConsumerPool(1, 
expectedTopics, Collections.EMPTY_MAP, null);
-        mockPool.nextPlannedRecordsQueue.add(consumerRecs);
+        
when(mockConsumerPool.obtainConsumer(anyObject())).thenReturn(mockLease);
+        when(mockLease.continuePolling()).thenReturn(Boolean.TRUE, 
Boolean.TRUE, Boolean.FALSE);
+        when(mockLease.commit()).thenReturn(Boolean.TRUE);
 
         ConsumeKafka proc = new ConsumeKafka() {
             @Override
-            protected ConsumerPool createConsumerPool(final int maxLeases, 
final List<String> topics, final Map<String, String> props, final ComponentLog 
log) {
-                return mockPool;
+            protected ConsumerPool createConsumerPool(final ProcessContext 
context, final ComponentLog log) {
+                return mockConsumerPool;
             }
         };
-
         final TestRunner runner = TestRunners.newTestRunner(proc);
         runner.setValidateExpressionUsage(false);
         runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, 
"0.0.0.0:1234");
         runner.setProperty(ConsumeKafka.TOPICS, "foo,bar");
         runner.setProperty(ConsumeKafka.GROUP_ID, groupName);
         runner.setProperty(ConsumeKafka.AUTO_OFFSET_RESET, 
ConsumeKafka.OFFSET_EARLIEST);
-        runner.setProperty(ConsumeKafka.MESSAGE_DEMARCATOR, "blah");
-
         runner.run(1, false);
 
-        final List<MockFlowFile> flowFiles = 
runner.getFlowFilesForRelationship(ConsumeKafka.REL_SUCCESS);
-
-        assertEquals(2, flowFiles.size());
-
-        assertEquals(1, flowFiles.stream().map(ff -> new 
String(ff.toByteArray())).filter(content -> 
content.equals("Hello-1blahHello-2blahHello-3")).count());
-        assertEquals(1, flowFiles.stream().map(ff -> new 
String(ff.toByteArray())).filter(content -> 
content.equals("Hello-4blahHello-5blahHello-6")).count());
-
-        //asert that all consumers were closed as expected
-        //assert that the consumer pool was properly closed
-        assertFalse(mockPool.wasConsumerLeasePoisoned);
-        assertTrue(mockPool.wasConsumerLeaseClosed);
-        assertFalse(mockPool.wasPoolClosed);
-        runner.run(1, true);
-        assertFalse(mockPool.wasConsumerLeasePoisoned);
-        assertTrue(mockPool.wasConsumerLeaseClosed);
-        assertTrue(mockPool.wasPoolClosed);
-
-        assertEquals(2, mockPool.actualCommitOffsets.size());
-        assertEquals(4L, mockPool.actualCommitOffsets.get(new 
TopicPartition("foo", 1)).offset());
-        assertEquals(4L, mockPool.actualCommitOffsets.get(new 
TopicPartition("bar", 1)).offset());
-    }
-
-    @Test
-    public void validatePollException() throws Exception {
-        String groupName = "validatePollException";
-
-        final byte[][] firstPassValues = new byte[][]{
-            "Hello-1".getBytes(StandardCharsets.UTF_8),
-            "Hello-2".getBytes(StandardCharsets.UTF_8),
-            "Hello-3".getBytes(StandardCharsets.UTF_8)
-        };
-
-        final ConsumerRecords<byte[], byte[]> consumerRecs = mergeRecords(
-                createConsumerRecords("foo", 1, 1L, firstPassValues)
-        );
-
-        final List<String> expectedTopics = new ArrayList<>();
-        expectedTopics.add("foo");
-        final MockConsumerPool mockPool = new MockConsumerPool(1, 
expectedTopics, Collections.EMPTY_MAP, null);
-        mockPool.nextPlannedRecordsQueue.add(consumerRecs);
-        mockPool.throwKafkaExceptionOnPoll = true;
-
-        ConsumeKafka proc = new ConsumeKafka() {
-            @Override
-            protected ConsumerPool createConsumerPool(final int maxLeases, 
final List<String> topics, final Map<String, String> props, final ComponentLog 
log) {
-                return mockPool;
-            }
-        };
-
-        final TestRunner runner = TestRunners.newTestRunner(proc);
-        runner.setValidateExpressionUsage(false);
-        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, 
"0.0.0.0:1234");
-        runner.setProperty(ConsumeKafka.TOPICS, "foo");
-        runner.setProperty(ConsumeKafka.GROUP_ID, groupName);
-        runner.setProperty(ConsumeKafka.AUTO_OFFSET_RESET, 
ConsumeKafka.OFFSET_EARLIEST);
-        runner.setProperty(ConsumeKafka.MESSAGE_DEMARCATOR, "blah");
-
-        runner.run(1, true);
-
-        final List<MockFlowFile> flowFiles = 
runner.getFlowFilesForRelationship(ConsumeKafka.REL_SUCCESS);
-
-        assertEquals(0, flowFiles.size());
-        assertNull(null, mockPool.actualCommitOffsets);
-
-        //asert that all consumers were closed as expected
-        //assert that the consumer pool was properly closed
-        assertTrue(mockPool.wasConsumerLeasePoisoned);
-        assertTrue(mockPool.wasConsumerLeaseClosed);
-        assertTrue(mockPool.wasPoolClosed);
-    }
-
-    @Test
-    public void validateCommitOffsetException() throws Exception {
-        String groupName = "validateCommitOffsetException";
-
-        final byte[][] firstPassValues = new byte[][]{
-            "Hello-1".getBytes(StandardCharsets.UTF_8),
-            "Hello-2".getBytes(StandardCharsets.UTF_8),
-            "Hello-3".getBytes(StandardCharsets.UTF_8)
-        };
-
-        final ConsumerRecords<byte[], byte[]> consumerRecs = mergeRecords(
-                createConsumerRecords("foo", 1, 1L, firstPassValues)
-        );
-
-        final List<String> expectedTopics = new ArrayList<>();
-        expectedTopics.add("foo");
-        final MockConsumerPool mockPool = new MockConsumerPool(1, 
expectedTopics, Collections.EMPTY_MAP, null);
-        mockPool.nextPlannedRecordsQueue.add(consumerRecs);
-        mockPool.throwKafkaExceptionOnCommit = true;
-
-        ConsumeKafka proc = new ConsumeKafka() {
-            @Override
-            protected ConsumerPool createConsumerPool(final int maxLeases, 
final List<String> topics, final Map<String, String> props, final ComponentLog 
log) {
-                return mockPool;
-            }
-        };
-
-        final TestRunner runner = TestRunners.newTestRunner(proc);
-        runner.setValidateExpressionUsage(false);
-        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, 
"0.0.0.0:1234");
-        runner.setProperty(ConsumeKafka.TOPICS, "foo");
-        runner.setProperty(ConsumeKafka.GROUP_ID, groupName);
-        runner.setProperty(ConsumeKafka.AUTO_OFFSET_RESET, 
ConsumeKafka.OFFSET_EARLIEST);
-        runner.setProperty(ConsumeKafka.MESSAGE_DEMARCATOR, "blah");
-
-        runner.run(1, true);
-
-        final List<MockFlowFile> flowFiles = 
runner.getFlowFilesForRelationship(ConsumeKafka.REL_SUCCESS);
-
-        assertEquals(1, flowFiles.size());
-
-        assertEquals(1, flowFiles.stream().map(ff -> new 
String(ff.toByteArray())).filter(content -> 
content.equals("Hello-1blahHello-2blahHello-3")).count());
-
-        //asert that all consumers were closed as expected
-        //assert that the consumer pool was properly closed
-        assertTrue(mockPool.wasConsumerLeasePoisoned);
-        assertTrue(mockPool.wasConsumerLeaseClosed);
-        assertTrue(mockPool.wasPoolClosed);
-
-        assertNull(null, mockPool.actualCommitOffsets);
+        verify(mockConsumerPool, times(1)).obtainConsumer(anyObject());
+        verify(mockLease, times(3)).continuePolling();
+        verify(mockLease, times(2)).poll();
+        verify(mockLease, times(1)).commit();
+        verify(mockLease, times(1)).close();
+        verifyNoMoreInteractions(mockConsumerPool);
+        verifyNoMoreInteractions(mockLease);
     }
 
     @Test
-    public void validateUtf8Key() {
-        String groupName = "validateGetAllMessages";
-
-        final Map<byte[], byte[]> rawRecords = new HashMap<>();
-        rawRecords.put("key1".getBytes(), "Hello-1".getBytes());
-        rawRecords.put(new byte[0], "Hello-2".getBytes());
-        rawRecords.put(null, "Hello-3".getBytes());
+    public void validateGetErrorMessages() throws Exception {
+        String groupName = "validateGetErrorMessages";
 
-        final ConsumerRecords<byte[], byte[]> firstRecs = 
createConsumerRecords("foo", 1, 1L, rawRecords);
-
-        final List<String> expectedTopics = new ArrayList<>();
-        expectedTopics.add("foo");
-        expectedTopics.add("bar");
-        final MockConsumerPool mockPool = new MockConsumerPool(1, 
expectedTopics, Collections.emptyMap(), null);
-        mockPool.nextPlannedRecordsQueue.add(firstRecs);
+        
when(mockConsumerPool.obtainConsumer(anyObject())).thenReturn(mockLease);
+        when(mockLease.continuePolling()).thenReturn(true, false);
+        when(mockLease.commit()).thenReturn(Boolean.FALSE);
 
         ConsumeKafka proc = new ConsumeKafka() {
             @Override
-            protected ConsumerPool createConsumerPool(final int maxLeases, 
final List<String> topics, final Map<String, String> props, final ComponentLog 
log) {
-                return mockPool;
+            protected ConsumerPool createConsumerPool(final ProcessContext 
context, final ComponentLog log) {
+                return mockConsumerPool;
             }
         };
         final TestRunner runner = TestRunners.newTestRunner(proc);
@@ -542,89 +153,15 @@ public class ConsumeKafkaTest {
         runner.setProperty(ConsumeKafka.TOPICS, "foo,bar");
         runner.setProperty(ConsumeKafka.GROUP_ID, groupName);
         runner.setProperty(ConsumeKafka.AUTO_OFFSET_RESET, 
ConsumeKafka.OFFSET_EARLIEST);
-
         runner.run(1, false);
 
-        final List<MockFlowFile> flowFiles = 
runner.getFlowFilesForRelationship(ConsumeKafka.REL_SUCCESS);
-
-        assertEquals(expectedTopics, mockPool.actualTopics);
-
-        assertEquals(1, flowFiles.stream().map(ff -> new 
String(ff.toByteArray())).filter(content -> content.equals("Hello-1")).count());
-        assertEquals(1, flowFiles.stream().map(ff -> new 
String(ff.toByteArray())).filter(content -> content.equals("Hello-2")).count());
-        assertEquals(1, flowFiles.stream().map(ff -> new 
String(ff.toByteArray())).filter(content -> content.equals("Hello-3")).count());
-
-        assertEquals(1, flowFiles.stream().map(ff -> 
ff.getAttribute(KafkaProcessorUtils.KAFKA_KEY)).filter(key -> 
"key1".equals(key)).count());
-        assertEquals(1, flowFiles.stream().map(ff -> 
ff.getAttribute(KafkaProcessorUtils.KAFKA_KEY)).filter(key -> key == 
null).count());
-        assertEquals(1, flowFiles.stream().map(ff -> 
ff.getAttribute(KafkaProcessorUtils.KAFKA_KEY)).filter(key -> 
"".equals(key)).count());
-
-
-        //asert that all consumers were closed as expected
-        //assert that the consumer pool was properly closed
-        assertFalse(mockPool.wasConsumerLeasePoisoned);
-        assertTrue(mockPool.wasConsumerLeaseClosed);
-        assertFalse(mockPool.wasPoolClosed);
-        runner.run(1, true);
-        assertFalse(mockPool.wasConsumerLeasePoisoned);
-        assertTrue(mockPool.wasConsumerLeaseClosed);
-        assertTrue(mockPool.wasPoolClosed);
+        verify(mockConsumerPool, times(1)).obtainConsumer(anyObject());
+        verify(mockLease, times(2)).continuePolling();
+        verify(mockLease, times(1)).poll();
+        verify(mockLease, times(1)).commit();
+        verify(mockLease, times(1)).close();
+        verifyNoMoreInteractions(mockConsumerPool);
+        verifyNoMoreInteractions(mockLease);
     }
 
-    @Test
-    public void validateHexKey() {
-        String groupName = "validateGetAllMessages";
-
-        final Map<byte[], byte[]> rawRecords = new HashMap<>();
-        rawRecords.put("key1".getBytes(), "Hello-1".getBytes());
-        rawRecords.put(new byte[0], "Hello-2".getBytes());
-        rawRecords.put(null, "Hello-3".getBytes());
-
-        final ConsumerRecords<byte[], byte[]> firstRecs = 
createConsumerRecords("foo", 1, 1L, rawRecords);
-
-        final List<String> expectedTopics = new ArrayList<>();
-        expectedTopics.add("foo");
-        expectedTopics.add("bar");
-        final MockConsumerPool mockPool = new MockConsumerPool(1, 
expectedTopics, Collections.emptyMap(), null);
-        mockPool.nextPlannedRecordsQueue.add(firstRecs);
-
-        ConsumeKafka proc = new ConsumeKafka() {
-            @Override
-            protected ConsumerPool createConsumerPool(final int maxLeases, 
final List<String> topics, final Map<String, String> props, final ComponentLog 
log) {
-                return mockPool;
-            }
-        };
-        final TestRunner runner = TestRunners.newTestRunner(proc);
-        runner.setValidateExpressionUsage(false);
-        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, 
"0.0.0.0:1234");
-        runner.setProperty(ConsumeKafka.TOPICS, "foo,bar");
-        runner.setProperty(ConsumeKafka.GROUP_ID, groupName);
-        runner.setProperty(ConsumeKafka.AUTO_OFFSET_RESET, 
ConsumeKafka.OFFSET_EARLIEST);
-        runner.setProperty(ConsumeKafka.KEY_ATTRIBUTE_ENCODING, 
ConsumeKafka.HEX_ENCODING);
-
-        runner.run(1, false);
-
-        final List<MockFlowFile> flowFiles = 
runner.getFlowFilesForRelationship(ConsumeKafka.REL_SUCCESS);
-
-        assertEquals(expectedTopics, mockPool.actualTopics);
-
-        assertEquals(1, flowFiles.stream().map(ff -> new 
String(ff.toByteArray())).filter(content -> content.equals("Hello-1")).count());
-        assertEquals(1, flowFiles.stream().map(ff -> new 
String(ff.toByteArray())).filter(content -> content.equals("Hello-2")).count());
-        assertEquals(1, flowFiles.stream().map(ff -> new 
String(ff.toByteArray())).filter(content -> content.equals("Hello-3")).count());
-
-        final String expectedHex = (Integer.toHexString('k') + 
Integer.toHexString('e') + Integer.toHexString('y') + 
Integer.toHexString('1')).toUpperCase();
-
-        assertEquals(1, flowFiles.stream().map(ff -> 
ff.getAttribute(KafkaProcessorUtils.KAFKA_KEY)).filter(key -> 
expectedHex.equals(key)).count());
-        assertEquals(1, flowFiles.stream().map(ff -> 
ff.getAttribute(KafkaProcessorUtils.KAFKA_KEY)).filter(key -> key == 
null).count());
-        assertEquals(1, flowFiles.stream().map(ff -> 
ff.getAttribute(KafkaProcessorUtils.KAFKA_KEY)).filter(key -> 
"".equals(key)).count());
-
-
-        //asert that all consumers were closed as expected
-        //assert that the consumer pool was properly closed
-        assertFalse(mockPool.wasConsumerLeasePoisoned);
-        assertTrue(mockPool.wasConsumerLeaseClosed);
-        assertFalse(mockPool.wasPoolClosed);
-        runner.run(1, true);
-        assertFalse(mockPool.wasConsumerLeasePoisoned);
-        assertTrue(mockPool.wasConsumerLeaseClosed);
-        assertTrue(mockPool.wasPoolClosed);
-    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/7a451935/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java
index 7f88ea2..0ebf2b3 100644
--- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java
@@ -16,109 +16,203 @@
  */
 package org.apache.nifi.processors.kafka.pubsub;
 
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
 import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.provenance.ProvenanceReporter;
 import org.apache.nifi.processors.kafka.pubsub.ConsumerPool.PoolStats;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 import org.junit.Before;
 import org.junit.Test;
-import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyLong;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 public class ConsumerPoolTest {
 
     Consumer<byte[], byte[]> consumer = null;
+    ProcessSession mockSession = null;
+    ProvenanceReporter mockReporter = null;
+    ConsumerPool testPool = null;
+    ConsumerPool testDemarcatedPool = null;
     ComponentLog logger = null;
 
     @Before
     public void setup() {
         consumer = mock(Consumer.class);
         logger = mock(ComponentLog.class);
-    }
-
-    @Test
-    public void validatePoolSimpleCreateClose() throws Exception {
-
-        final ConsumerPool testPool = new ConsumerPool(1, 
Collections.singletonList("nifi"), Collections.emptyMap(), logger) {
+        mockSession = mock(ProcessSession.class);
+        mockReporter = mock(ProvenanceReporter.class);
+        when(mockSession.getProvenanceReporter()).thenReturn(mockReporter);
+        testPool = new ConsumerPool(
+                1,
+                null,
+                Collections.emptyMap(),
+                Collections.singletonList("nifi"),
+                100L,
+                "utf-8",
+                "ssl",
+                "localhost",
+                logger) {
             @Override
             protected Consumer<byte[], byte[]> createKafkaConsumer() {
                 return consumer;
             }
         };
+        testDemarcatedPool = new ConsumerPool(
+                1,
+                "--demarcator--".getBytes(StandardCharsets.UTF_8),
+                Collections.emptyMap(),
+                Collections.singletonList("nifi"),
+                100L,
+                "utf-8",
+                "ssl",
+                "localhost",
+                logger) {
+            @Override
+            protected Consumer<byte[], byte[]> createKafkaConsumer() {
+                return consumer;
+            }
+        };
+    }
 
-        when(consumer.poll(anyInt())).thenReturn(ConsumerRecords.empty());
+    @Test
+    public void validatePoolSimpleCreateClose() throws Exception {
 
-        try (final ConsumerLease lease = testPool.obtainConsumer()) {
+        
when(consumer.poll(anyLong())).thenReturn(createConsumerRecords("nifi", 0, 0L, 
new byte[][]{}));
+        try (final ConsumerLease lease = testPool.obtainConsumer(mockSession)) 
{
+            lease.poll();
+        }
+        try (final ConsumerLease lease = testPool.obtainConsumer(mockSession)) 
{
+            lease.poll();
+        }
+        try (final ConsumerLease lease = testPool.obtainConsumer(mockSession)) 
{
+            lease.poll();
+        }
+        try (final ConsumerLease lease = testPool.obtainConsumer(mockSession)) 
{
             lease.poll();
-            lease.commitOffsets(Collections.emptyMap());
         }
         testPool.close();
+        verify(mockSession, times(0)).create();
+        verify(mockSession, times(0)).commit();
         final PoolStats stats = testPool.getPoolStats();
         assertEquals(1, stats.consumerCreatedCount);
         assertEquals(1, stats.consumerClosedCount);
-        assertEquals(1, stats.leasesObtainedCount);
-        assertEquals(1, stats.unproductivePollCount);
-        assertEquals(0, stats.productivePollCount);
+        assertEquals(4, stats.leasesObtainedCount);
     }
 
     @Test
-    public void validatePoolSimpleBatchCreateClose() throws Exception {
-
-        final ConsumerPool testPool = new ConsumerPool(5, 
Collections.singletonList("nifi"), Collections.emptyMap(), logger) {
-            @Override
-            protected Consumer<byte[], byte[]> createKafkaConsumer() {
-                return consumer;
-            }
+    public void validatePoolSimpleCreatePollClose() throws Exception {
+        final byte[][] firstPassValues = new byte[][]{
+            "Hello-1".getBytes(StandardCharsets.UTF_8),
+            "Hello-2".getBytes(StandardCharsets.UTF_8),
+            "Hello-3".getBytes(StandardCharsets.UTF_8)
         };
+        final ConsumerRecords<byte[], byte[]> firstRecs = 
createConsumerRecords("foo", 1, 1L, firstPassValues);
 
-        when(consumer.poll(anyInt())).thenReturn(ConsumerRecords.empty());
+        when(consumer.poll(anyLong())).thenReturn(firstRecs, 
createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
+        try (final ConsumerLease lease = testPool.obtainConsumer(mockSession)) 
{
+            lease.poll();
+            lease.commit();
+        }
+        testPool.close();
+        verify(mockSession, times(3)).create();
+        verify(mockSession, times(1)).commit();
+        final PoolStats stats = testPool.getPoolStats();
+        assertEquals(1, stats.consumerCreatedCount);
+        assertEquals(1, stats.consumerClosedCount);
+        assertEquals(1, stats.leasesObtainedCount);
+    }
 
+    @Test
+    public void validatePoolSimpleBatchCreateClose() throws Exception {
+        
when(consumer.poll(anyLong())).thenReturn(createConsumerRecords("nifi", 0, 0L, 
new byte[][]{}));
         for (int i = 0; i < 100; i++) {
-            try (final ConsumerLease lease = testPool.obtainConsumer()) {
+            try (final ConsumerLease lease = 
testPool.obtainConsumer(mockSession)) {
                 for (int j = 0; j < 100; j++) {
                     lease.poll();
                 }
-                lease.commitOffsets(Collections.emptyMap());
             }
         }
         testPool.close();
+        verify(mockSession, times(0)).create();
+        verify(mockSession, times(0)).commit();
         final PoolStats stats = testPool.getPoolStats();
         assertEquals(1, stats.consumerCreatedCount);
         assertEquals(1, stats.consumerClosedCount);
         assertEquals(100, stats.leasesObtainedCount);
-        assertEquals(10000, stats.unproductivePollCount);
-        assertEquals(0, stats.productivePollCount);
     }
 
     @Test
-    public void validatePoolConsumerFails() throws Exception {
-
-        final ConsumerPool testPool = new ConsumerPool(1, 
Collections.singletonList("nifi"), Collections.emptyMap(), logger) {
-            @Override
-            protected Consumer<byte[], byte[]> createKafkaConsumer() {
-                return consumer;
-            }
+    public void validatePoolBatchCreatePollClose() throws Exception {
+        final byte[][] firstPassValues = new byte[][]{
+            "Hello-1".getBytes(StandardCharsets.UTF_8),
+            "Hello-2".getBytes(StandardCharsets.UTF_8),
+            "Hello-3".getBytes(StandardCharsets.UTF_8)
         };
+        final ConsumerRecords<byte[], byte[]> firstRecs = 
createConsumerRecords("foo", 1, 1L, firstPassValues);
 
-        when(consumer.poll(anyInt())).thenThrow(new KafkaException());
-
-        try (final ConsumerLease lease = testPool.obtainConsumer()) {
+        when(consumer.poll(anyLong())).thenReturn(firstRecs, 
createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
+        try (final ConsumerLease lease = 
testDemarcatedPool.obtainConsumer(mockSession)) {
             lease.poll();
-            fail();
-        } catch (final KafkaException ke) {
+            lease.commit();
+        }
+        testDemarcatedPool.close();
+        verify(mockSession, times(1)).create();
+        verify(mockSession, times(1)).commit();
+        final PoolStats stats = testDemarcatedPool.getPoolStats();
+        assertEquals(1, stats.consumerCreatedCount);
+        assertEquals(1, stats.consumerClosedCount);
+        assertEquals(1, stats.leasesObtainedCount);
+    }
+
+    @Test
+    public void validatePoolConsumerFails() throws Exception {
+
+        when(consumer.poll(anyLong())).thenThrow(new KafkaException("oops"));
+        try (final ConsumerLease lease = testPool.obtainConsumer(mockSession)) 
{
+            try {
+                lease.poll();
+                fail();
+            } catch (final KafkaException ke) {
 
+            }
         }
         testPool.close();
+        verify(mockSession, times(0)).create();
+        verify(mockSession, times(0)).commit();
         final PoolStats stats = testPool.getPoolStats();
         assertEquals(1, stats.consumerCreatedCount);
         assertEquals(1, stats.consumerClosedCount);
         assertEquals(1, stats.leasesObtainedCount);
-        assertEquals(0, stats.unproductivePollCount);
-        assertEquals(0, stats.productivePollCount);
     }
+
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    static ConsumerRecords<byte[], byte[]> createConsumerRecords(final String 
topic, final int partition, final long startingOffset, final byte[][] 
rawRecords) {
+        final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> map = 
new HashMap<>();
+        final TopicPartition tPart = new TopicPartition(topic, partition);
+        final List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>();
+        long offset = startingOffset;
+        for (final byte[] rawRecord : rawRecords) {
+            final ConsumerRecord<byte[], byte[]> rec = new 
ConsumerRecord(topic, partition, offset++, 
UUID.randomUUID().toString().getBytes(), rawRecord);
+            records.add(rec);
+        }
+        map.put(tPart, records);
+        return new ConsumerRecords(map);
+    }
+
 }

Reply via email to