This is an automated email from the ASF dual-hosted git repository.

turcsanyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit 2f08d1f466b9f6f0b0b8a7b5893341a0d1433a4e
Author: Mark Payne <[email protected]>
AuthorDate: Tue Mar 23 13:53:13 2021 -0400

    NIFI-8357: Updated Kafka 2.0 processors to automatically handle recreating
    Consumer Lease objects when an existing one is poisoned, even if using
    statically assigned partitions
    
    This closes #4926.
    
    Signed-off-by: Peter Turcsanyi <[email protected]>
---
 .../processors/kafka/pubsub/ConsumerLease.java     |   4 +
 .../nifi/processors/kafka/pubsub/ConsumerPool.java | 124 ++++++++++++++-------
 .../processors/kafka/pubsub/ConsumerPoolTest.java  |  86 ++++++++++++++
 3 files changed, 175 insertions(+), 39 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
index 729c801..d1f47e4 100644
--- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
@@ -164,6 +164,10 @@ public abstract class ConsumerLease implements Closeable, 
ConsumerRebalanceListe
         logger.debug("Rebalance Alert: Partitions '{}' assigned for lease '{}' 
with consumer '{}'", new Object[]{partitions, this, kafkaConsumer});
     }
 
+    public List<TopicPartition> getAssignedPartitions() {
+        return null;
+    }
+
     /**
      * Executes a poll on the underlying Kafka Consumer and creates any new
      * flowfiles necessary or appends to existing ones if in demarcation mode.
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
index 60f301b..e8603ff 100644
--- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
@@ -34,6 +34,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Queue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicLong;
@@ -66,6 +67,7 @@ public class ConsumerPool implements Closeable {
     private final AtomicLong consumerCreatedCountRef = new AtomicLong();
     private final AtomicLong consumerClosedCountRef = new AtomicLong();
     private final AtomicLong leasesObtainedCountRef = new AtomicLong();
+    private final Queue<List<TopicPartition>> availableTopicPartitions = new 
LinkedBlockingQueue<>();
 
     /**
      * Creates a pool of KafkaConsumer objects that will grow up to the maximum
@@ -119,7 +121,7 @@ public class ConsumerPool implements Closeable {
         this.headerNamePattern = headerNamePattern;
         this.separateByKey = separateByKey;
         this.partitionsToConsume = partitionsToConsume;
-        enqueueLeases(partitionsToConsume);
+        enqueueAssignedPartitions(partitionsToConsume);
     }
 
     public ConsumerPool(
@@ -154,7 +156,7 @@ public class ConsumerPool implements Closeable {
         this.headerNamePattern = headerNamePattern;
         this.separateByKey = separateByKey;
         this.partitionsToConsume = partitionsToConsume;
-        enqueueLeases(partitionsToConsume);
+        enqueueAssignedPartitions(partitionsToConsume);
     }
 
     public ConsumerPool(
@@ -190,7 +192,7 @@ public class ConsumerPool implements Closeable {
         this.separateByKey = separateByKey;
         this.keyEncoding = keyEncoding;
         this.partitionsToConsume = partitionsToConsume;
-        enqueueLeases(partitionsToConsume);
+        enqueueAssignedPartitions(partitionsToConsume);
     }
 
     public ConsumerPool(
@@ -226,7 +228,7 @@ public class ConsumerPool implements Closeable {
         this.separateByKey = separateByKey;
         this.keyEncoding = keyEncoding;
         this.partitionsToConsume = partitionsToConsume;
-        enqueueLeases(partitionsToConsume);
+        enqueueAssignedPartitions(partitionsToConsume);
     }
 
     public int getPartitionCount() {
@@ -262,67 +264,97 @@ public class ConsumerPool implements Closeable {
      * @return consumer to use or null if not available or necessary
      */
     public ConsumerLease obtainConsumer(final ProcessSession session, final 
ProcessContext processContext) {
+        // If there are any partition assignments that do not have leases in 
our pool, create the leases and add them to the pool.
+        // This is not necessary for us to handle if using automatic 
subscriptions because the Kafka protocol will ensure that each consumer
+        // has the appropriate partitions. However, if we are using explicit 
assignment, it's important to create these leases and add them
+        // to our pool in order to avoid starvation. E.g., if we have only a 
single concurrent task and 5 partitions assigned, we cannot simply
+        // wait until pooledLeases.poll() returns null to create a new 
ConsumerLease, as doing so may result in constantly pulling from only a
+        // single partition (since we'd get a Lease for Partition 1, then use 
it, and put it back in the pool).
+        recreateAssignedConsumers();
+
         SimpleConsumerLease lease = pooledLeases.poll();
         if (lease == null) {
-            final Consumer<byte[], byte[]> consumer = createKafkaConsumer();
-            consumerCreatedCountRef.incrementAndGet();
-
-            /*
-             * For now return a new consumer lease. But we could later elect to
-             * have this return null if we determine the broker indicates that
-             * the lag time on all topics being monitored is sufficiently low.
-             * For now we should encourage conservative use of threads because
-             * having too many means we'll have at best useless threads sitting
-             * around doing frequent network calls and at worst having 
consumers
-             * sitting idle which could prompt excessive rebalances.
-             */
-            lease = new SimpleConsumerLease(consumer);
-
-            if (partitionsToConsume == null) {
-                // This subscription tightly couples the lease to the given
-                // consumer. They cannot be separated from then on.
-                if (topics != null) {
-                    consumer.subscribe(topics, lease);
-                } else {
-                    consumer.subscribe(topicPattern, lease);
-                }
-            } else {
-                logger.debug("Cannot obtain lease to communicate with Kafka. 
Since partitions are explicitly assigned, cannot create a new lease.");
+            lease = createConsumerLease();
+            if (lease == null) {
                 return null;
             }
         }
+
         lease.setProcessSession(session, processContext);
 
         leasesObtainedCountRef.incrementAndGet();
         return lease;
     }
 
-    private SimpleConsumerLease createConsumerLease(final int partition) {
-        final List<TopicPartition> topicPartitions = new ArrayList<>();
-        for (final String topic : topics) {
-            final TopicPartition topicPartition = new TopicPartition(topic, 
partition);
-            topicPartitions.add(topicPartition);
+    private void recreateAssignedConsumers() {
+        List<TopicPartition> topicPartitions;
+        while ((topicPartitions = availableTopicPartitions.poll()) != null) {
+            final SimpleConsumerLease simpleConsumerLease = 
createConsumerLease(topicPartitions);
+            pooledLeases.add(simpleConsumerLease);
+        }
+    }
+
+    private SimpleConsumerLease createConsumerLease() {
+        if (partitionsToConsume != null) {
+            logger.debug("Cannot obtain lease to communicate with Kafka. Since 
partitions are explicitly assigned, cannot create a new lease.");
+            return null;
         }
 
         final Consumer<byte[], byte[]> consumer = createKafkaConsumer();
         consumerCreatedCountRef.incrementAndGet();
+
+        /*
+         * For now return a new consumer lease. But we could later elect to
+         * have this return null if we determine the broker indicates that
+         * the lag time on all topics being monitored is sufficiently low.
+         * For now we should encourage conservative use of threads because
+         * having too many means we'll have at best useless threads sitting
+         * around doing frequent network calls and at worst having consumers
+         * sitting idle which could prompt excessive rebalances.
+         */
+        final SimpleConsumerLease lease = new SimpleConsumerLease(consumer, 
null);
+
+        // This subscription tightly couples the lease to the given
+        // consumer. They cannot be separated from then on.
+        if (topics == null) {
+            consumer.subscribe(topicPattern, lease);
+        } else {
+            consumer.subscribe(topics, lease);
+        }
+
+        return lease;
+    }
+
+    private SimpleConsumerLease createConsumerLease(final List<TopicPartition> 
topicPartitions) {
+        final Consumer<byte[], byte[]> consumer = createKafkaConsumer();
+        consumerCreatedCountRef.incrementAndGet();
         consumer.assign(topicPartitions);
 
-        final SimpleConsumerLease lease = new SimpleConsumerLease(consumer);
+        final SimpleConsumerLease lease = new SimpleConsumerLease(consumer, 
topicPartitions);
         return lease;
     }
 
-    private void enqueueLeases(final int[] partitionsToConsume) {
+    private void enqueueAssignedPartitions(final int[] partitionsToConsume) {
         if (partitionsToConsume == null) {
             return;
         }
 
         for (final int partition : partitionsToConsume) {
-            final SimpleConsumerLease lease = createConsumerLease(partition);
-            pooledLeases.add(lease);
+            final List<TopicPartition> topicPartitions = 
createTopicPartitions(partition);
+            availableTopicPartitions.offer(topicPartitions);
         }
     }
 
+    private List<TopicPartition> createTopicPartitions(final int partition) {
+        final List<TopicPartition> topicPartitions = new ArrayList<>();
+        for (final String topic : topics) {
+            final TopicPartition topicPartition = new TopicPartition(topic, 
partition);
+            topicPartitions.add(topicPartition);
+        }
+
+        return topicPartitions;
+    }
+
     /**
      * Exposed as protected method for easier unit testing
      *
@@ -372,16 +404,17 @@ public class ConsumerPool implements Closeable {
     }
 
     private class SimpleConsumerLease extends ConsumerLease {
-
         private final Consumer<byte[], byte[]> consumer;
+        private final List<TopicPartition> assignedPartitions;
         private volatile ProcessSession session;
         private volatile ProcessContext processContext;
         private volatile boolean closedConsumer;
 
-        private SimpleConsumerLease(final Consumer<byte[], byte[]> consumer) {
+        private SimpleConsumerLease(final Consumer<byte[], byte[]> consumer, 
final List<TopicPartition> assignedPartitions) {
             super(maxWaitMillis, consumer, demarcatorBytes, keyEncoding, 
securityProtocol, bootstrapServers,
                 readerFactory, writerFactory, logger, headerCharacterSet, 
headerNamePattern, separateByKey);
             this.consumer = consumer;
+            this.assignedPartitions = assignedPartitions;
         }
 
         void setProcessSession(final ProcessSession session, final 
ProcessContext context) {
@@ -390,6 +423,11 @@ public class ConsumerPool implements Closeable {
         }
 
         @Override
+        public List<TopicPartition> getAssignedPartitions() {
+            return assignedPartitions;
+        }
+
+        @Override
         public void yield() {
             if (processContext != null) {
                 processContext.yield();
@@ -411,14 +449,22 @@ public class ConsumerPool implements Closeable {
             if (closedConsumer) {
                 return;
             }
+
             super.close();
             if (session != null) {
                 session.rollback();
                 setProcessSession(null, null);
             }
+
             if (forceClose || isPoisoned() || !pooledLeases.offer(this)) {
                 closedConsumer = true;
                 closeConsumer(consumer);
+
+                // If explicit topic/partition assignment is used, make the 
assignments for this Lease available again.
+                if (assignedPartitions != null) {
+                    logger.debug("Adding partitions {} back to the pool", 
assignedPartitions);
+                    availableTopicPartitions.offer(assignedPartitions);
+                }
             }
         }
     }
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java
index 195d2cb..347bf02 100644
--- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java
@@ -41,6 +41,9 @@ import java.util.UUID;
 import java.util.regex.Pattern;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.fail;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
@@ -158,6 +161,89 @@ public class ConsumerPoolTest {
     }
 
     @Test
+    public void testConsumerCreatedOnDemand() {
+        try (final ConsumerLease lease = testPool.obtainConsumer(mockSession, 
mockContext)) {
+            final List<ConsumerLease> created = new ArrayList<>();
+            try {
+                for (int i = 0; i < 3; i++) {
+                    final ConsumerLease newLease = 
testPool.obtainConsumer(mockSession, mockContext);
+                    created.add(newLease);
+                    assertNotSame(lease, newLease);
+                }
+            } finally {
+                created.forEach(ConsumerLease::close);
+            }
+        }
+    }
+
+    @Test
+    public void testConsumerNotCreatedOnDemandWhenUsingStaticAssignment() {
+        final ConsumerPool staticAssignmentPool = new ConsumerPool(
+            1,
+            null,
+            false,
+            Collections.emptyMap(),
+            Collections.singletonList("nifi"),
+            100L,
+            "utf-8",
+            "ssl",
+            "localhost",
+            logger,
+            true,
+            StandardCharsets.UTF_8,
+            null,
+            new int[] {1, 2, 3}) {
+            @Override
+            protected Consumer<byte[], byte[]> createKafkaConsumer() {
+                return consumer;
+            }
+        };
+
+        try (final ConsumerLease lease = 
staticAssignmentPool.obtainConsumer(mockSession, mockContext)) {
+            ConsumerLease partition2Lease = null;
+            ConsumerLease partition3Lease = null;
+
+            try {
+                partition2Lease = 
staticAssignmentPool.obtainConsumer(mockSession, mockContext);
+                assertNotSame(lease, partition2Lease);
+                assertEquals(1, 
partition2Lease.getAssignedPartitions().size());
+                assertEquals(2, 
partition2Lease.getAssignedPartitions().get(0).partition());
+
+                partition3Lease = 
staticAssignmentPool.obtainConsumer(mockSession, mockContext);
+                assertNotSame(lease, partition3Lease);
+                assertNotSame(partition2Lease, partition3Lease);
+                assertEquals(1, 
partition3Lease.getAssignedPartitions().size());
+                assertEquals(3, 
partition3Lease.getAssignedPartitions().get(0).partition());
+
+                final ConsumerLease nullLease = 
staticAssignmentPool.obtainConsumer(mockSession, mockContext);
+                assertNull(nullLease);
+
+                // Close the lease for Partition 2. We should now be able to 
get another Lease for Partition 2.
+                partition2Lease.close();
+
+                partition2Lease = 
staticAssignmentPool.obtainConsumer(mockSession, mockContext);
+                assertNotNull(partition2Lease);
+
+                assertEquals(1, 
partition2Lease.getAssignedPartitions().size());
+                assertEquals(2, 
partition2Lease.getAssignedPartitions().get(0).partition());
+
+                assertNull(staticAssignmentPool.obtainConsumer(mockSession, 
mockContext));
+            } finally {
+                closeLeases(partition2Lease, partition3Lease);
+            }
+        }
+    }
+
+    private void closeLeases(final ConsumerLease... leases) {
+        for (final ConsumerLease lease : leases) {
+            if (lease != null) {
+                lease.close();
+            }
+        }
+    }
+
+
+    @Test
     public void validatePoolSimpleBatchCreateClose() throws Exception {
         
when(consumer.poll(any(Duration.class))).thenReturn(createConsumerRecords("nifi",
 0, 0L, new byte[][]{}));
         for (int i = 0; i < 100; i++) {

Reply via email to