This is an automated email from the ASF dual-hosted git repository. turcsanyi pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi.git
commit 74ea3840ac98c8deff1ab83f673cc8fcb7072bcd Author: Mark Payne <[email protected]> AuthorDate: Tue Mar 23 13:53:26 2021 -0400 NIFI-8357: Updated Kafka 2.6 processors to automatically handle recreating Consumer Lease objects when an existing one is poisoned, even if using statically assigned partitions This closes #4926. Signed-off-by: Peter Turcsanyi <[email protected]> --- .../processors/kafka/pubsub/ConsumerLease.java | 4 + .../nifi/processors/kafka/pubsub/ConsumerPool.java | 124 ++++++++++++++------- .../processors/kafka/pubsub/ConsumerPoolTest.java | 85 ++++++++++++++ 3 files changed, 175 insertions(+), 38 deletions(-) diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java index e3e6124..4ba7c8b 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java @@ -164,6 +164,10 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe logger.debug("Rebalance Alert: Partitions '{}' assigned for lease '{}' with consumer '{}'", new Object[]{partitions, this, kafkaConsumer}); } + public List<TopicPartition> getAssignedPartitions() { + return null; + } + /** * Executes a poll on the underlying Kafka Consumer and creates any new * flowfiles necessary or appends to existing ones if in demarcation mode. diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java index 591480a..0895733 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java @@ -34,6 +34,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Queue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicLong; @@ -66,6 +67,7 @@ public class ConsumerPool implements Closeable { private final AtomicLong consumerCreatedCountRef = new AtomicLong(); private final AtomicLong consumerClosedCountRef = new AtomicLong(); private final AtomicLong leasesObtainedCountRef = new AtomicLong(); + private final Queue<List<TopicPartition>> availableTopicPartitions = new LinkedBlockingQueue<>(); /** * Creates a pool of KafkaConsumer objects that will grow up to the maximum @@ -119,7 +121,7 @@ public class ConsumerPool implements Closeable { this.headerNamePattern = headerNamePattern; this.separateByKey = separateByKey; this.partitionsToConsume = partitionsToConsume; - enqueueLeases(partitionsToConsume); + enqueueAssignedPartitions(partitionsToConsume); } public ConsumerPool( @@ -154,7 +156,7 @@ public class ConsumerPool implements Closeable { this.headerNamePattern = headerNamePattern; this.separateByKey = separateByKey; this.partitionsToConsume = partitionsToConsume; - enqueueLeases(partitionsToConsume); + enqueueAssignedPartitions(partitionsToConsume); } public ConsumerPool( @@ -190,7 +192,7 @@ public class ConsumerPool implements Closeable { this.separateByKey = separateByKey; this.keyEncoding = keyEncoding; this.partitionsToConsume = partitionsToConsume; - enqueueLeases(partitionsToConsume); + enqueueAssignedPartitions(partitionsToConsume); } public ConsumerPool( @@ -226,7 +228,7 @@ public class ConsumerPool implements Closeable { this.separateByKey = separateByKey; this.keyEncoding = keyEncoding; this.partitionsToConsume = partitionsToConsume; - enqueueLeases(partitionsToConsume); + enqueueAssignedPartitions(partitionsToConsume); } public int getPartitionCount() { @@ -262,66 +264,97 @@ public class ConsumerPool implements Closeable { * @return consumer to use or null if not available or necessary */ public ConsumerLease obtainConsumer(final ProcessSession session, final ProcessContext processContext) { + // If there are any partition assignments that do not have leases in our pool, create the leases and add them to the pool. + // This is not necessary for us to handle if using automatic subscriptions because the Kafka protocol will ensure that each consumer + // has the appropriate partitions. However, if we are using explicit assignment, it's important to create these leases and add them + // to our pool in order to avoid starvation. E.g., if we have only a single concurrent task and 5 partitions assigned, we cannot simply + // wait until pooledLeases.poll() returns null to create a new ConsumerLease, as doing so may result in constantly pulling from only a + // single partition (since we'd get a Lease for Partition 1, then use it, and put it back in the pool). + recreateAssignedConsumers(); + SimpleConsumerLease lease = pooledLeases.poll(); if (lease == null) { - final Consumer<byte[], byte[]> consumer = createKafkaConsumer(); - consumerCreatedCountRef.incrementAndGet(); - /** - * For now return a new consumer lease. But we could later elect to - * have this return null if we determine the broker indicates that - * the lag time on all topics being monitored is sufficiently low. - * For now we should encourage conservative use of threads because - * having too many means we'll have at best useless threads sitting - * around doing frequent network calls and at worst having consumers - * sitting idle which could prompt excessive rebalances. - */ - lease = new SimpleConsumerLease(consumer); - - if (partitionsToConsume == null) { - // This subscription tightly couples the lease to the given - // consumer. They cannot be separated from then on. - if (topics != null) { - consumer.subscribe(topics, lease); - } else { - consumer.subscribe(topicPattern, lease); - } - } else { - logger.debug("Cannot obtain lease to communicate with Kafka. Since partitions are explicitly assigned, cannot create a new lease."); + lease = createConsumerLease(); + if (lease == null) { return null; } } + lease.setProcessSession(session, processContext); leasesObtainedCountRef.incrementAndGet(); return lease; } - private SimpleConsumerLease createConsumerLease(final int partition) { - final List<TopicPartition> topicPartitions = new ArrayList<>(); - for (final String topic : topics) { - final TopicPartition topicPartition = new TopicPartition(topic, partition); - topicPartitions.add(topicPartition); + private void recreateAssignedConsumers() { + List<TopicPartition> topicPartitions; + while ((topicPartitions = availableTopicPartitions.poll()) != null) { + final SimpleConsumerLease simpleConsumerLease = createConsumerLease(topicPartitions); + pooledLeases.add(simpleConsumerLease); + } + } + + private SimpleConsumerLease createConsumerLease() { + if (partitionsToConsume != null) { + logger.debug("Cannot obtain lease to communicate with Kafka. Since partitions are explicitly assigned, cannot create a new lease."); + return null; } final Consumer<byte[], byte[]> consumer = createKafkaConsumer(); consumerCreatedCountRef.incrementAndGet(); + + /* + * For now return a new consumer lease. But we could later elect to + * have this return null if we determine the broker indicates that + * the lag time on all topics being monitored is sufficiently low. + * For now we should encourage conservative use of threads because + * having too many means we'll have at best useless threads sitting + * around doing frequent network calls and at worst having consumers + * sitting idle which could prompt excessive rebalances. + */ + final SimpleConsumerLease lease = new SimpleConsumerLease(consumer, null); + + // This subscription tightly couples the lease to the given + // consumer. They cannot be separated from then on. + if (topics == null) { + consumer.subscribe(topicPattern, lease); + } else { + consumer.subscribe(topics, lease); + } + + return lease; + } + + private SimpleConsumerLease createConsumerLease(final List<TopicPartition> topicPartitions) { + final Consumer<byte[], byte[]> consumer = createKafkaConsumer(); + consumerCreatedCountRef.incrementAndGet(); consumer.assign(topicPartitions); - final SimpleConsumerLease lease = new SimpleConsumerLease(consumer); + final SimpleConsumerLease lease = new SimpleConsumerLease(consumer, topicPartitions); return lease; } - private void enqueueLeases(final int[] partitionsToConsume) { + private void enqueueAssignedPartitions(final int[] partitionsToConsume) { if (partitionsToConsume == null) { return; } for (final int partition : partitionsToConsume) { - final SimpleConsumerLease lease = createConsumerLease(partition); - pooledLeases.add(lease); + final List<TopicPartition> topicPartitions = createTopicPartitions(partition); + availableTopicPartitions.offer(topicPartitions); } } + private List<TopicPartition> createTopicPartitions(final int partition) { + final List<TopicPartition> topicPartitions = new ArrayList<>(); + for (final String topic : topics) { + final TopicPartition topicPartition = new TopicPartition(topic, partition); + topicPartitions.add(topicPartition); + } + + return topicPartitions; + } + /** * Exposed as protected method for easier unit testing * @@ -371,16 +404,17 @@ public class ConsumerPool implements Closeable { } private class SimpleConsumerLease extends ConsumerLease { - private final Consumer<byte[], byte[]> consumer; + private final List<TopicPartition> assignedPartitions; private volatile ProcessSession session; private volatile ProcessContext processContext; private volatile boolean closedConsumer; - private SimpleConsumerLease(final Consumer<byte[], byte[]> consumer) { + private SimpleConsumerLease(final Consumer<byte[], byte[]> consumer, final List<TopicPartition> assignedPartitions) { super(maxWaitMillis, consumer, demarcatorBytes, keyEncoding, securityProtocol, bootstrapServers, readerFactory, writerFactory, logger, headerCharacterSet, headerNamePattern, separateByKey); this.consumer = consumer; + this.assignedPartitions = assignedPartitions; } void setProcessSession(final ProcessSession session, final ProcessContext context) { @@ -389,6 +423,11 @@ public class ConsumerPool implements Closeable { } @Override + public List<TopicPartition> getAssignedPartitions() { + return assignedPartitions; + } + + @Override public void yield() { if (processContext != null) { processContext.yield(); @@ -410,18 +449,27 @@ public class ConsumerPool implements Closeable { if (closedConsumer) { return; } + super.close(); if (session != null) { session.rollback(); setProcessSession(null, null); } + if (forceClose || isPoisoned() || !pooledLeases.offer(this)) { closedConsumer = true; closeConsumer(consumer); + + // If explicit topic/partition assignment is used, make the assignments for this Lease available again. + if (assignedPartitions != null) { + logger.debug("Adding partitions {} back to the pool", assignedPartitions); + availableTopicPartitions.offer(assignedPartitions); + } } } } + static final class PoolStats { final long consumerCreatedCount; diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java index 195d2cb..218fb25 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java @@ -41,6 +41,9 @@ import java.util.UUID; import java.util.regex.Pattern; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertNull; import static org.junit.Assert.fail; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; @@ -158,6 +161,88 @@ public class ConsumerPoolTest { } @Test + public void testConsumerCreatedOnDemand() { + try (final ConsumerLease lease = testPool.obtainConsumer(mockSession, mockContext)) { + final List<ConsumerLease> created = new ArrayList<>(); + try { + for (int i = 0; i < 3; i++) { + final ConsumerLease newLease = testPool.obtainConsumer(mockSession, mockContext); + created.add(newLease); + assertNotSame(lease, newLease); + } + } finally { + created.forEach(ConsumerLease::close); + } + } + } + + @Test + public void testConsumerNotCreatedOnDemandWhenUsingStaticAssignment() { + final ConsumerPool staticAssignmentPool = new ConsumerPool( + 1, + null, + false, + Collections.emptyMap(), + Collections.singletonList("nifi"), + 100L, + "utf-8", + "ssl", + "localhost", + logger, + true, + StandardCharsets.UTF_8, + null, + new int[] {1, 2, 3}) { + @Override + protected Consumer<byte[], byte[]> createKafkaConsumer() { + return consumer; + } + }; + + try (final ConsumerLease lease = staticAssignmentPool.obtainConsumer(mockSession, mockContext)) { + ConsumerLease partition2Lease = null; + ConsumerLease partition3Lease = null; + + try { + partition2Lease = staticAssignmentPool.obtainConsumer(mockSession, mockContext); + assertNotSame(lease, partition2Lease); + assertEquals(1, partition2Lease.getAssignedPartitions().size()); + assertEquals(2, partition2Lease.getAssignedPartitions().get(0).partition()); + + partition3Lease = staticAssignmentPool.obtainConsumer(mockSession, mockContext); + assertNotSame(lease, partition3Lease); + assertNotSame(partition2Lease, partition3Lease); + assertEquals(1, partition3Lease.getAssignedPartitions().size()); + assertEquals(3, partition3Lease.getAssignedPartitions().get(0).partition()); + + final ConsumerLease nullLease = staticAssignmentPool.obtainConsumer(mockSession, mockContext); + assertNull(nullLease); + + // Close the lease for Partition 2. We should now be able to get another Lease for Partition 2. + partition2Lease.close(); + + partition2Lease = staticAssignmentPool.obtainConsumer(mockSession, mockContext); + assertNotNull(partition2Lease); + + assertEquals(1, partition2Lease.getAssignedPartitions().size()); + assertEquals(2, partition2Lease.getAssignedPartitions().get(0).partition()); + + assertNull(staticAssignmentPool.obtainConsumer(mockSession, mockContext)); + } finally { + closeLeases(partition2Lease, partition3Lease); + } + } + } + + private void closeLeases(final ConsumerLease... leases) { + for (final ConsumerLease lease : leases) { + if (lease != null) { + lease.close(); + } + } + } + + @Test public void validatePoolSimpleBatchCreateClose() throws Exception { when(consumer.poll(any(Duration.class))).thenReturn(createConsumerRecords("nifi", 0, 0L, new byte[][]{})); for (int i = 0; i < 100; i++) {
