exceptionfactory commented on code in PR #10538:
URL: https://github.com/apache/nifi/pull/10538#discussion_r2558557142


##########
nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/kafka/processors/consumer/TestConsumerPartitionsUtil.java:
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.kafka.processors.consumer;
+
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.logging.ComponentLog;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+
+public class TestConsumerPartitionsUtil {
+    private final ComponentLog logger = mock();
+    private String hostname;
+
+    @BeforeEach
+    public void setup() throws UnknownHostException {
+        hostname = InetAddress.getLocalHost().getHostName();
+    }
+
+    @Test
+    public void testNoPartitionAssignments() throws UnknownHostException {
+        final Map<String, String> properties = Collections.singletonMap("key", 
"value");

Review Comment:
   ```suggestion
           final Map<String, String> properties = Map.of("key", "value");
   ```



##########
nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/consumer/Subscription.java:
##########
@@ -31,19 +31,22 @@ public class Subscription {
 
     private final String groupId;
     private final Collection<String> topics;
+    private final Integer partition;
     private final Pattern topicPattern;
     private final AutoOffsetReset autoOffsetReset;
 
-    public Subscription(final String groupId, final Collection<String> topics, 
final AutoOffsetReset autoOffsetReset) {
+    public Subscription(final String groupId, final Integer partition, final 
Collection<String> topics, final AutoOffsetReset autoOffsetReset) {

Review Comment:
   Very minor, but I recommend placing `partition` after `topics` to align with 
the general parameter hierarchy:
   
   ```suggestion
       public Subscription(final String groupId, final Collection<String> 
topics, final Integer partition, final AutoOffsetReset autoOffsetReset) {
   ```



##########
nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/consumer/ConsumerPartitionsUtil.java:
##########
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.kafka.processors.consumer;
+
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.logging.ComponentLog;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class ConsumerPartitionsUtil {
+    public static final String PARTITION_PROPERTY_NAME_PREFIX = "partitions.";
+
+    public static int[] getPartitionsForHost(final Map<String, String> 
properties, final ComponentLog logger) throws UnknownHostException {
+        final Map<String, String> hostnameToPartitionString = 
mapHostnamesToPartitionStrings(properties);
+        final Map<String, int[]> partitionsByHost = 
mapPartitionValueToIntArrays(hostnameToPartitionString);
+
+        if (partitionsByHost.isEmpty()) {
+            // Explicit partitioning is not enabled.
+            logger.debug("No explicit Consumer Partitions have been 
declared.");
+            return null;
+        }
+
+        logger.info("Found the following mapping of hosts to partitions: {}", 
hostnameToPartitionString);

Review Comment:
   This seems more appropriate as a debug level message.



##########
nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/kafka/processors/consumer/TestConsumerPartitionsUtil.java:
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.kafka.processors.consumer;
+
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.logging.ComponentLog;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+
+public class TestConsumerPartitionsUtil {

Review Comment:
   Minor note: the `public` modifiers at the class and method levels are not 
necessary for JUnit 5. Since this is a new test class, I recommend removing them.



##########
nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java:
##########
@@ -513,43 +609,95 @@ public List<ConfigVerificationResult> verify(final 
ProcessContext context, final
 
         final KafkaConnectionService connectionService = 
context.getProperty(CONNECTION_SERVICE).asControllerService(KafkaConnectionService.class);
         final PollingContext pollingContext = createPollingContext(context);
-        final KafkaConsumerService consumerService = 
connectionService.getConsumerService(pollingContext);
 
-        final ConfigVerificationResult.Builder verificationPartitions = new 
ConfigVerificationResult.Builder()
-                .verificationStepName("Verify Topic Partitions");
-
-        try {
-            final List<PartitionState> partitionStates = 
consumerService.getPartitionStates();
-            verificationPartitions
-                    .outcome(ConfigVerificationResult.Outcome.SUCCESSFUL)
-                    .explanation(String.format("Partitions [%d] found for 
Topics %s", partitionStates.size(), pollingContext.getTopics()));
-        } catch (final Exception e) {
-            getLogger().error("Topics {} Partition verification failed", 
pollingContext.getTopics(), e);
-            verificationPartitions
-                    .outcome(ConfigVerificationResult.Outcome.FAILED)
-                    .explanation(String.format("Topics %s Partition access 
failed: %s", pollingContext.getTopics(), e));
+        final Collection<String> topics = pollingContext.getTopics();
+        if (!topics.isEmpty()) {
+            try (final KafkaConsumerService consumerService = 
connectionService.getConsumerService(pollingContext)) {
+                final ConfigVerificationResult.Builder verificationPartitions 
= new ConfigVerificationResult.Builder()
+                        .verificationStepName("Verify Topic Partitions");
+
+                try {
+                    final List<PartitionState> partitionStates = 
consumerService.getPartitionStatesByTopic().values().stream().findFirst().orElse(Collections.emptyList());
+                    verificationPartitions
+                            
.outcome(ConfigVerificationResult.Outcome.SUCCESSFUL)
+                            .explanation(String.format("Partitions [%d] found 
for Topics %s", partitionStates.size(), pollingContext.getTopics()));
+                } catch (final Exception e) {
+                    getLogger().error("Topics {} Partition verification 
failed", pollingContext.getTopics(), e);
+                    verificationPartitions
+                            .outcome(ConfigVerificationResult.Outcome.FAILED)
+                            .explanation(String.format("Topics %s Partition 
access failed: %s", pollingContext.getTopics(), e));
+                }
+                verificationResults.add(verificationPartitions.build());
+            } catch (IOException e) {
+                getLogger().warn("Couldn't close KafkaConsumerService after 
verification.", e);
+            }
         }
-        verificationResults.add(verificationPartitions.build());
 
         return verificationResults;
     }
 
     private KafkaConsumerService getConsumerService(final ProcessContext 
context) {
+        recreatePartitionedConsumerServices();
+
         final KafkaConsumerService consumerService = consumerServices.poll();
         if (consumerService != null) {
             return consumerService;
         }
 
-        final int activeCount = activeConsumerCount.incrementAndGet();
-        if (activeCount > getMaxConsumerCount()) {
-            getLogger().trace("No Kafka Consumer Service available; have 
already reached max count of {} so will not create a new one", 
getMaxConsumerCount());
-            activeConsumerCount.decrementAndGet();
+        final boolean isExplicitPartitionMapping = 
ConsumerPartitionsUtil.isPartitionAssignmentExplicit(context.getAllProperties());
+
+        if (isExplicitPartitionMapping) {
+            getLogger().trace("No Partitioned Kafka Consumer Service 
available, all specified partitions are being consumed from.");
             return null;
+        } else {
+            final int activeCount = activeConsumerCount.incrementAndGet();
+            if (activeCount > getMaxConsumerCount()) {
+                getLogger().trace("No Kafka Consumer Service available; have 
already reached max count of {} so will not create a new one", 
getMaxConsumerCount());
+                activeConsumerCount.decrementAndGet();
+                return null;
+            }
+
+            getLogger().info("No Kafka Consumer Service available; creating a 
new one. Active count: {}", activeCount);
+            return connectionService.getConsumerService(pollingContext);
         }
+    }
 
-        getLogger().info("No Kafka Consumer Service available; creating a new 
one. Active count: {}", activeCount);
-        final KafkaConnectionService connectionService = 
context.getProperty(CONNECTION_SERVICE).asControllerService(KafkaConnectionService.class);
-        return connectionService.getConsumerService(pollingContext);
+    private int getPartitionCount(final KafkaConnectionService 
connectionService) {
+        Collection<String> topics = this.pollingContext.getTopics();
+
+        if (topics.isEmpty()) {
+            return -1;
+        }
+
+        int partitionsEachTopic = 0;
+        try (KafkaConsumerService kafkaConsumerService = 
connectionService.getConsumerService(this.pollingContext)) {
+            Map<String, List<PartitionState>> topicToPartitionStates = 
kafkaConsumerService.getPartitionStatesByTopic();
+            for (List<PartitionState> partitionStatesForTopic : 
topicToPartitionStates.values()) {
+                final int partitionsThisTopic = partitionStatesForTopic.size();
+                if (partitionsEachTopic != 0 && partitionsThisTopic != 
partitionsEachTopic) {
+                    throw new IllegalStateException("The specific topic names 
do not have the same number of partitions");

Review Comment:
   The topic names should be included in the message for troubleshooting.



##########
nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/kafka/processors/consumer/TestConsumerPartitionsUtil.java:
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.kafka.processors.consumer;
+
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.logging.ComponentLog;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+
+public class TestConsumerPartitionsUtil {
+    private final ComponentLog logger = mock();
+    private String hostname;
+
+    @BeforeEach
+    public void setup() throws UnknownHostException {
+        hostname = InetAddress.getLocalHost().getHostName();
+    }
+
+    @Test
+    public void testNoPartitionAssignments() throws UnknownHostException {

Review Comment:
   `UnknownHostException` does not appear to be thrown in this method, or in the 
other test methods that declare it.



##########
nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java:
##########
@@ -389,12 +474,18 @@ public void onStopped() {
         while ((service = consumerServices.poll()) != null) {
             close(service, "Processor stopped");
         }
+
+        availablePartitionedPollingContexts.clear();
+        consumerServiceToPartitionedPollingContext.clear();
     }
 
 
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession 
session) {
         final KafkaConsumerService consumerService = 
getConsumerService(context);
+        final PollingContext pollingContext = 
Optional.ofNullable(consumerServiceToPartitionedPollingContext.get(consumerService))

Review Comment:
   Creating an `Optional` wrapper is not necessary, and does not add enough 
value to justify the extra allocation on every invocation of `onTrigger`, so I 
recommend replacing it with a standard conditional.



##########
nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java:
##########
@@ -513,43 +609,95 @@ public List<ConfigVerificationResult> verify(final 
ProcessContext context, final
 
         final KafkaConnectionService connectionService = 
context.getProperty(CONNECTION_SERVICE).asControllerService(KafkaConnectionService.class);
         final PollingContext pollingContext = createPollingContext(context);
-        final KafkaConsumerService consumerService = 
connectionService.getConsumerService(pollingContext);
 
-        final ConfigVerificationResult.Builder verificationPartitions = new 
ConfigVerificationResult.Builder()
-                .verificationStepName("Verify Topic Partitions");
-
-        try {
-            final List<PartitionState> partitionStates = 
consumerService.getPartitionStates();
-            verificationPartitions
-                    .outcome(ConfigVerificationResult.Outcome.SUCCESSFUL)
-                    .explanation(String.format("Partitions [%d] found for 
Topics %s", partitionStates.size(), pollingContext.getTopics()));
-        } catch (final Exception e) {
-            getLogger().error("Topics {} Partition verification failed", 
pollingContext.getTopics(), e);
-            verificationPartitions
-                    .outcome(ConfigVerificationResult.Outcome.FAILED)
-                    .explanation(String.format("Topics %s Partition access 
failed: %s", pollingContext.getTopics(), e));
+        final Collection<String> topics = pollingContext.getTopics();
+        if (!topics.isEmpty()) {
+            try (final KafkaConsumerService consumerService = 
connectionService.getConsumerService(pollingContext)) {
+                final ConfigVerificationResult.Builder verificationPartitions 
= new ConfigVerificationResult.Builder()
+                        .verificationStepName("Verify Topic Partitions");
+
+                try {
+                    final List<PartitionState> partitionStates = 
consumerService.getPartitionStatesByTopic().values().stream().findFirst().orElse(Collections.emptyList());
+                    verificationPartitions
+                            
.outcome(ConfigVerificationResult.Outcome.SUCCESSFUL)
+                            .explanation(String.format("Partitions [%d] found 
for Topics %s", partitionStates.size(), pollingContext.getTopics()));
+                } catch (final Exception e) {
+                    getLogger().error("Topics {} Partition verification 
failed", pollingContext.getTopics(), e);

Review Comment:
   This is carried over from the previous implementation, but on review, the 
Verification Logger instance should be used instead of `getLogger()`.



##########
nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java:
##########
@@ -379,6 +437,33 @@ public void onScheduled(final ProcessContext context) {
         if (maxUncommittedSizeConfigured) {
             maxUncommittedSize = 
maxUncommittedSizeProperty.asDataSize(DataUnit.B).longValue();
         }
+
+        if 
(ConsumerPartitionsUtil.isPartitionAssignmentExplicit(context.getAllProperties()))
 {
+            final int numAssignedPartitions = 
ConsumerPartitionsUtil.getPartitionAssignmentCount(context.getAllProperties());
+            final int partitionCount = getPartitionCount(connectionService);
+
+            if (partitionCount != numAssignedPartitions) {
+                context.yield();
+
+                throw new ProcessException("Illegal Partition Assignment: 
There are "
+                        + numAssignedPartitions + " partitions statically 
assigned using the " + PARTITIONS_PROPERTY_PREFIX + ".* property names,"
+                        + " but the Kafka topic(s) have " + partitionCount + " 
partitions");
+            }
+
+            final int[] assignedPartitions;
+            try {
+                assignedPartitions = 
ConsumerPartitionsUtil.getPartitionsForHost(context.getAllProperties(), 
getLogger());
+            } catch (final UnknownHostException uhe) {
+                throw new ProcessException("Could not determine localhost's 
hostname", uhe);

Review Comment:
   ```suggestion
                   throw new ProcessException("Failed to resolve local host 
address", uhe);
   ```



##########
nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java:
##########
@@ -513,43 +609,95 @@ public List<ConfigVerificationResult> verify(final 
ProcessContext context, final
 
         final KafkaConnectionService connectionService = 
context.getProperty(CONNECTION_SERVICE).asControllerService(KafkaConnectionService.class);
         final PollingContext pollingContext = createPollingContext(context);
-        final KafkaConsumerService consumerService = 
connectionService.getConsumerService(pollingContext);
 
-        final ConfigVerificationResult.Builder verificationPartitions = new 
ConfigVerificationResult.Builder()
-                .verificationStepName("Verify Topic Partitions");
-
-        try {
-            final List<PartitionState> partitionStates = 
consumerService.getPartitionStates();
-            verificationPartitions
-                    .outcome(ConfigVerificationResult.Outcome.SUCCESSFUL)
-                    .explanation(String.format("Partitions [%d] found for 
Topics %s", partitionStates.size(), pollingContext.getTopics()));
-        } catch (final Exception e) {
-            getLogger().error("Topics {} Partition verification failed", 
pollingContext.getTopics(), e);
-            verificationPartitions
-                    .outcome(ConfigVerificationResult.Outcome.FAILED)
-                    .explanation(String.format("Topics %s Partition access 
failed: %s", pollingContext.getTopics(), e));
+        final Collection<String> topics = pollingContext.getTopics();
+        if (!topics.isEmpty()) {
+            try (final KafkaConsumerService consumerService = 
connectionService.getConsumerService(pollingContext)) {
+                final ConfigVerificationResult.Builder verificationPartitions 
= new ConfigVerificationResult.Builder()
+                        .verificationStepName("Verify Topic Partitions");
+
+                try {
+                    final List<PartitionState> partitionStates = 
consumerService.getPartitionStatesByTopic().values().stream().findFirst().orElse(Collections.emptyList());
+                    verificationPartitions
+                            
.outcome(ConfigVerificationResult.Outcome.SUCCESSFUL)
+                            .explanation(String.format("Partitions [%d] found 
for Topics %s", partitionStates.size(), pollingContext.getTopics()));
+                } catch (final Exception e) {
+                    getLogger().error("Topics {} Partition verification 
failed", pollingContext.getTopics(), e);
+                    verificationPartitions
+                            .outcome(ConfigVerificationResult.Outcome.FAILED)
+                            .explanation(String.format("Topics %s Partition 
access failed: %s", pollingContext.getTopics(), e));
+                }
+                verificationResults.add(verificationPartitions.build());
+            } catch (IOException e) {
+                getLogger().warn("Couldn't close KafkaConsumerService after 
verification.", e);
+            }
         }
-        verificationResults.add(verificationPartitions.build());
 
         return verificationResults;
     }
 
     private KafkaConsumerService getConsumerService(final ProcessContext 
context) {
+        recreatePartitionedConsumerServices();
+
         final KafkaConsumerService consumerService = consumerServices.poll();
         if (consumerService != null) {
             return consumerService;
         }
 
-        final int activeCount = activeConsumerCount.incrementAndGet();
-        if (activeCount > getMaxConsumerCount()) {
-            getLogger().trace("No Kafka Consumer Service available; have 
already reached max count of {} so will not create a new one", 
getMaxConsumerCount());
-            activeConsumerCount.decrementAndGet();
+        final boolean isExplicitPartitionMapping = 
ConsumerPartitionsUtil.isPartitionAssignmentExplicit(context.getAllProperties());
+
+        if (isExplicitPartitionMapping) {
+            getLogger().trace("No Partitioned Kafka Consumer Service 
available, all specified partitions are being consumed from.");
             return null;
+        } else {
+            final int activeCount = activeConsumerCount.incrementAndGet();
+            if (activeCount > getMaxConsumerCount()) {
+                getLogger().trace("No Kafka Consumer Service available; have 
already reached max count of {} so will not create a new one", 
getMaxConsumerCount());
+                activeConsumerCount.decrementAndGet();
+                return null;
+            }
+
+            getLogger().info("No Kafka Consumer Service available; creating a 
new one. Active count: {}", activeCount);
+            return connectionService.getConsumerService(pollingContext);
         }
+    }
 
-        getLogger().info("No Kafka Consumer Service available; creating a new 
one. Active count: {}", activeCount);
-        final KafkaConnectionService connectionService = 
context.getProperty(CONNECTION_SERVICE).asControllerService(KafkaConnectionService.class);
-        return connectionService.getConsumerService(pollingContext);
+    private int getPartitionCount(final KafkaConnectionService 
connectionService) {
+        Collection<String> topics = this.pollingContext.getTopics();
+
+        if (topics.isEmpty()) {
+            return -1;
+        }
+
+        int partitionsEachTopic = 0;
+        try (KafkaConsumerService kafkaConsumerService = 
connectionService.getConsumerService(this.pollingContext)) {
+            Map<String, List<PartitionState>> topicToPartitionStates = 
kafkaConsumerService.getPartitionStatesByTopic();
+            for (List<PartitionState> partitionStatesForTopic : 
topicToPartitionStates.values()) {
+                final int partitionsThisTopic = partitionStatesForTopic.size();
+                if (partitionsEachTopic != 0 && partitionsThisTopic != 
partitionsEachTopic) {
+                    throw new IllegalStateException("The specific topic names 
do not have the same number of partitions");
+                }
+
+                partitionsEachTopic = partitionsThisTopic;
+            }
+        } catch (IOException e) {
+            getLogger().warn("Couldn't close KafkaConsumerService after 
partition assignment check.", e);

Review Comment:
   It is best to avoid contractions in log messages, and to avoid ending them 
with a `.` character:
   ```suggestion
               getLogger().warn("Failed to close KafkaConsumerService after 
partition assignment check", e);
   ```



##########
nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java:
##########
@@ -513,43 +609,95 @@ public List<ConfigVerificationResult> verify(final 
ProcessContext context, final
 
         final KafkaConnectionService connectionService = 
context.getProperty(CONNECTION_SERVICE).asControllerService(KafkaConnectionService.class);
         final PollingContext pollingContext = createPollingContext(context);
-        final KafkaConsumerService consumerService = 
connectionService.getConsumerService(pollingContext);
 
-        final ConfigVerificationResult.Builder verificationPartitions = new 
ConfigVerificationResult.Builder()
-                .verificationStepName("Verify Topic Partitions");
-
-        try {
-            final List<PartitionState> partitionStates = 
consumerService.getPartitionStates();
-            verificationPartitions
-                    .outcome(ConfigVerificationResult.Outcome.SUCCESSFUL)
-                    .explanation(String.format("Partitions [%d] found for 
Topics %s", partitionStates.size(), pollingContext.getTopics()));
-        } catch (final Exception e) {
-            getLogger().error("Topics {} Partition verification failed", 
pollingContext.getTopics(), e);
-            verificationPartitions
-                    .outcome(ConfigVerificationResult.Outcome.FAILED)
-                    .explanation(String.format("Topics %s Partition access 
failed: %s", pollingContext.getTopics(), e));
+        final Collection<String> topics = pollingContext.getTopics();
+        if (!topics.isEmpty()) {
+            try (final KafkaConsumerService consumerService = 
connectionService.getConsumerService(pollingContext)) {
+                final ConfigVerificationResult.Builder verificationPartitions 
= new ConfigVerificationResult.Builder()
+                        .verificationStepName("Verify Topic Partitions");
+
+                try {
+                    final List<PartitionState> partitionStates = 
consumerService.getPartitionStatesByTopic().values().stream().findFirst().orElse(Collections.emptyList());
+                    verificationPartitions
+                            
.outcome(ConfigVerificationResult.Outcome.SUCCESSFUL)
+                            .explanation(String.format("Partitions [%d] found 
for Topics %s", partitionStates.size(), pollingContext.getTopics()));
+                } catch (final Exception e) {
+                    getLogger().error("Topics {} Partition verification 
failed", pollingContext.getTopics(), e);
+                    verificationPartitions
+                            .outcome(ConfigVerificationResult.Outcome.FAILED)
+                            .explanation(String.format("Topics %s Partition 
access failed: %s", pollingContext.getTopics(), e));
+                }
+                verificationResults.add(verificationPartitions.build());
+            } catch (IOException e) {
+                getLogger().warn("Couldn't close KafkaConsumerService after 
verification.", e);

Review Comment:
   ```suggestion
                   getLogger().warn("Failed to close KafkaConsumerService after 
verification", e);
   ```



##########
nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java:
##########
@@ -513,43 +609,95 @@ public List<ConfigVerificationResult> verify(final 
ProcessContext context, final
 
         final KafkaConnectionService connectionService = 
context.getProperty(CONNECTION_SERVICE).asControllerService(KafkaConnectionService.class);
         final PollingContext pollingContext = createPollingContext(context);
-        final KafkaConsumerService consumerService = 
connectionService.getConsumerService(pollingContext);
 
-        final ConfigVerificationResult.Builder verificationPartitions = new 
ConfigVerificationResult.Builder()
-                .verificationStepName("Verify Topic Partitions");
-
-        try {
-            final List<PartitionState> partitionStates = 
consumerService.getPartitionStates();
-            verificationPartitions
-                    .outcome(ConfigVerificationResult.Outcome.SUCCESSFUL)
-                    .explanation(String.format("Partitions [%d] found for 
Topics %s", partitionStates.size(), pollingContext.getTopics()));
-        } catch (final Exception e) {
-            getLogger().error("Topics {} Partition verification failed", 
pollingContext.getTopics(), e);
-            verificationPartitions
-                    .outcome(ConfigVerificationResult.Outcome.FAILED)
-                    .explanation(String.format("Topics %s Partition access 
failed: %s", pollingContext.getTopics(), e));
+        final Collection<String> topics = pollingContext.getTopics();
+        if (!topics.isEmpty()) {
+            try (final KafkaConsumerService consumerService = 
connectionService.getConsumerService(pollingContext)) {
+                final ConfigVerificationResult.Builder verificationPartitions 
= new ConfigVerificationResult.Builder()
+                        .verificationStepName("Verify Topic Partitions");
+
+                try {
+                    final List<PartitionState> partitionStates = 
consumerService.getPartitionStatesByTopic().values().stream().findFirst().orElse(Collections.emptyList());
+                    verificationPartitions
+                            
.outcome(ConfigVerificationResult.Outcome.SUCCESSFUL)
+                            .explanation(String.format("Partitions [%d] found 
for Topics %s", partitionStates.size(), pollingContext.getTopics()));
+                } catch (final Exception e) {
+                    getLogger().error("Topics {} Partition verification 
failed", pollingContext.getTopics(), e);
+                    verificationPartitions
+                            .outcome(ConfigVerificationResult.Outcome.FAILED)
+                            .explanation(String.format("Topics %s Partition 
access failed: %s", pollingContext.getTopics(), e));
+                }
+                verificationResults.add(verificationPartitions.build());
+            } catch (IOException e) {
+                getLogger().warn("Couldn't close KafkaConsumerService after 
verification.", e);
+            }
         }
-        verificationResults.add(verificationPartitions.build());
 
         return verificationResults;
     }
 
     private KafkaConsumerService getConsumerService(final ProcessContext 
context) {
+        recreatePartitionedConsumerServices();
+
         final KafkaConsumerService consumerService = consumerServices.poll();
         if (consumerService != null) {
             return consumerService;
         }
 
-        final int activeCount = activeConsumerCount.incrementAndGet();
-        if (activeCount > getMaxConsumerCount()) {
-            getLogger().trace("No Kafka Consumer Service available; have 
already reached max count of {} so will not create a new one", 
getMaxConsumerCount());
-            activeConsumerCount.decrementAndGet();
+        final boolean isExplicitPartitionMapping = 
ConsumerPartitionsUtil.isPartitionAssignmentExplicit(context.getAllProperties());
+
+        if (isExplicitPartitionMapping) {
+            getLogger().trace("No Partitioned Kafka Consumer Service 
available, all specified partitions are being consumed from.");
             return null;
+        } else {
+            final int activeCount = activeConsumerCount.incrementAndGet();
+            if (activeCount > getMaxConsumerCount()) {
+                getLogger().trace("No Kafka Consumer Service available; have 
already reached max count of {} so will not create a new one", 
getMaxConsumerCount());
+                activeConsumerCount.decrementAndGet();
+                return null;
+            }
+
+            getLogger().info("No Kafka Consumer Service available; creating a 
new one. Active count: {}", activeCount);
+            return connectionService.getConsumerService(pollingContext);
         }
+    }
 
-        getLogger().info("No Kafka Consumer Service available; creating a new 
one. Active count: {}", activeCount);
-        final KafkaConnectionService connectionService = 
context.getProperty(CONNECTION_SERVICE).asControllerService(KafkaConnectionService.class);
-        return connectionService.getConsumerService(pollingContext);
+    private int getPartitionCount(final KafkaConnectionService 
connectionService) {
+        Collection<String> topics = this.pollingContext.getTopics();
+
+        if (topics.isEmpty()) {
+            return -1;
+        }
+
+        int partitionsEachTopic = 0;
+        try (KafkaConsumerService kafkaConsumerService = 
connectionService.getConsumerService(this.pollingContext)) {
+            Map<String, List<PartitionState>> topicToPartitionStates = 
kafkaConsumerService.getPartitionStatesByTopic();
+            for (List<PartitionState> partitionStatesForTopic : 
topicToPartitionStates.values()) {
+                final int partitionsThisTopic = partitionStatesForTopic.size();
+                if (partitionsEachTopic != 0 && partitionsThisTopic != 
partitionsEachTopic) {
+                    throw new IllegalStateException("The specific topic names 
do not have the same number of partitions");
+                }
+
+                partitionsEachTopic = partitionsThisTopic;
+            }
+        } catch (IOException e) {
+            getLogger().warn("Couldn't close KafkaConsumerService after 
partition assignment check.", e);
+        }
+
+        return partitionsEachTopic;
+    }
+
+    private void recreatePartitionedConsumerServices() {
+        PollingContext partitionedPollingContext;
+        while ((partitionedPollingContext = 
availablePartitionedPollingContexts.poll()) != null) {
+            getLogger().info("Creating new Partitioned Kafka Consumer 
Service.");

Review Comment:
   Some identifying information, such as the Partition, should be included in 
the log message.



##########
nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java:
##########
@@ -379,6 +437,33 @@ public void onScheduled(final ProcessContext context) {
         if (maxUncommittedSizeConfigured) {
             maxUncommittedSize = 
maxUncommittedSizeProperty.asDataSize(DataUnit.B).longValue();
         }
+
+        if 
(ConsumerPartitionsUtil.isPartitionAssignmentExplicit(context.getAllProperties()))
 {
+            final int numAssignedPartitions = 
ConsumerPartitionsUtil.getPartitionAssignmentCount(context.getAllProperties());
+            final int partitionCount = getPartitionCount(connectionService);
+
+            if (partitionCount != numAssignedPartitions) {
+                context.yield();
+
+                throw new ProcessException("Illegal Partition Assignment: 
There are "
+                        + numAssignedPartitions + " partitions statically 
assigned using the " + PARTITIONS_PROPERTY_PREFIX + ".* property names,"
+                        + " but the Kafka topic(s) have " + partitionCount + " 
partitions");
+            }

Review Comment:
   Recommend using formatted strings instead of concatenation for this message.



##########
nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/consumer/ConsumerPartitionsUtil.java:
##########
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.kafka.processors.consumer;
+
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.logging.ComponentLog;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class ConsumerPartitionsUtil {
+    public static final String PARTITION_PROPERTY_NAME_PREFIX = "partitions.";
+
+    public static int[] getPartitionsForHost(final Map<String, String> 
properties, final ComponentLog logger) throws UnknownHostException {
+        final Map<String, String> hostnameToPartitionString = 
mapHostnamesToPartitionStrings(properties);
+        final Map<String, int[]> partitionsByHost = 
mapPartitionValueToIntArrays(hostnameToPartitionString);
+
+        if (partitionsByHost.isEmpty()) {
+            // Explicit partitioning is not enabled.
+            logger.debug("No explicit Consumer Partitions have been 
declared.");
+            return null;
+        }
+
+        logger.info("Found the following mapping of hosts to partitions: {}", 
hostnameToPartitionString);
+
+        // Determine the partitions based on hostname/IP.
+        int[] partitionsForThisHost = 
getPartitionsForThisHost(partitionsByHost);
+        if (partitionsForThisHost == null) {
+            throw new IllegalArgumentException("Could not find a partition 
mapping for host " + InetAddress.getLocalHost().getCanonicalHostName());
+        }
+
+        return partitionsForThisHost;
+    }
+
+    private static Map<String, int[]> mapPartitionValueToIntArrays(final 
Map<String, String> partitionValues) {
+        final Map<String, int[]> partitionsByHost = new HashMap<>();
+        for (final Map.Entry<String, String> entry : 
partitionValues.entrySet()) {
+            final String host = entry.getKey();
+            final int[] partitions = parsePartitions(host, entry.getValue());
+            partitionsByHost.put(host, partitions);
+        }
+
+        return partitionsByHost;
+    }
+
+    private static int[] getPartitionsForThisHost(final Map<String, int[]> 
partitionsByHost) throws UnknownHostException {
+        // Determine the partitions based on hostname/IP.
+        final InetAddress localhost = InetAddress.getLocalHost();
+        int[] partitionsForThisHost = 
partitionsByHost.get(localhost.getCanonicalHostName());
+        if (partitionsForThisHost != null) {
+            return partitionsForThisHost;
+        }
+
+        partitionsForThisHost = partitionsByHost.get(localhost.getHostName());
+        if (partitionsForThisHost != null) {
+            return partitionsForThisHost;
+        }
+
+        return partitionsByHost.get(localhost.getHostAddress());
+    }
+
+    private static Map<String, String> mapHostnamesToPartitionStrings(final 
Map<String, String> properties) {
+        final Map<String, String> hostnameToPartitionString = new HashMap<>();
+        for (final Map.Entry<String, String> entry : properties.entrySet()) {
+            final String propertyName = entry.getKey();
+            if (!propertyName.startsWith(PARTITION_PROPERTY_NAME_PREFIX)) {
+                continue;
+            }
+
+            if (propertyName.length() <= 
PARTITION_PROPERTY_NAME_PREFIX.length()) {
+                continue;
+            }
+
+            final String propertyNameAfterPrefix = 
propertyName.substring(PARTITION_PROPERTY_NAME_PREFIX.length());
+            hostnameToPartitionString.put(propertyNameAfterPrefix, 
entry.getValue());
+        }
+
+        return hostnameToPartitionString;
+    }
+
+    private static int[] parsePartitions(final String hostname, final String 
propertyValue) {
+        final String[] splits = propertyValue.split(",");
+        final List<Integer> partitionList = new ArrayList<>();
+        for (final String split : splits) {
+            if (split.isBlank()) {
+                continue;
+            }
+
+            try {
+                final int partition = Integer.parseInt(split.trim());
+                if (partition < 0) {
+                    throw new IllegalArgumentException("Found invalid value 
for the partitions for hostname " + hostname + ": " + split + " is negative");
+                }
+
+                partitionList.add(partition);
+            } catch (final NumberFormatException nfe) {
+                throw new IllegalArgumentException("Found invalid value for 
the partitions for hostname " + hostname + ": " + split + " is not an integer");
+            }
+        }
+
+        // Map out List<Integer> to int[]
+        return partitionList.stream().mapToInt(Integer::intValue).toArray();
+    }
+
    /**
     * Validates the explicit partition assignment declared via "partitions.&lt;hostname&gt;"
     * properties: every partition index must be mapped to exactly one node, and this host must
     * have an assignment. Returns a valid result when no explicit mapping is declared at all.
     */
    public static ValidationResult validateConsumePartitions(final Map<String, String> properties) {
        final Map<String, String> hostnameToPartitionMapping = mapHostnamesToPartitionStrings(properties);
        if (hostnameToPartitionMapping.isEmpty()) {
            // Partitions are not being explicitly assigned.
            return new ValidationResult.Builder().valid(true).build();
        }

        // Collect every claimed partition index, recording any claimed by more than one host.
        final Set<Integer> partitionsClaimed = new HashSet<>();
        final Set<Integer> duplicatePartitions = new HashSet<>();
        for (final Map.Entry<String, String> entry : hostnameToPartitionMapping.entrySet()) {
            final int[] partitions = parsePartitions(entry.getKey(), entry.getValue());
            for (final int partition : partitions) {
                final boolean added = partitionsClaimed.add(partition);
                if (!added) {
                    duplicatePartitions.add(partition);
                }
            }
        }

        // The claimed indices should form a contiguous range starting at 0; any index below the
        // claimed count that is absent was never mapped to a node.
        final List<Integer> partitionsMissing = new ArrayList<>();
        for (int i = 0; i < partitionsClaimed.size(); i++) {
            if (!partitionsClaimed.contains(i)) {
                partitionsMissing.add(i);
            }
        }

        if (!partitionsMissing.isEmpty()) {
            return new ValidationResult.Builder()
                .subject("Partitions")
                .input(partitionsClaimed.toString())
                .valid(false)
                .explanation("The following partitions were not mapped to any node: " + partitionsMissing.toString())
                .build();
        }

        if (!duplicatePartitions.isEmpty()) {
            return new ValidationResult.Builder()
                .subject("Partitions")
                .input(partitionsClaimed.toString())
                .valid(false)
                .explanation("The following partitions were mapped to multiple nodes: " + duplicatePartitions.toString())
                .build();
        }

        // Finally, confirm that this host (by canonical name, short hostname, or IP address)
        // appears in the declared assignments.
        final Map<String, int[]> partitionsByHost = mapPartitionValueToIntArrays(hostnameToPartitionMapping);
        final int[] partitionsForThisHost;
        try {
            partitionsForThisHost = getPartitionsForThisHost(partitionsByHost);
        } catch (UnknownHostException e) {
            return new ValidationResult.Builder()
                .valid(false)
                .subject("Partition Assignment")
                .explanation("Unable to determine hostname of localhost")
                .build();
        }

        if (partitionsForThisHost == null) {
            return new ValidationResult.Builder()
                .subject("Partition Assignment")
                .valid(false)
                .explanation("No assignment was given for this host")
                .build();
        }

        return new ValidationResult.Builder().valid(true).build();
    }
+
+    public static boolean isPartitionAssignmentExplicit(final Map<String, 
String> properties) {
+        final Map<String, String> hostnameToPartitionMapping = 
mapHostnamesToPartitionStrings(properties);
+        return !hostnameToPartitionMapping.isEmpty();
+    }
+
+    public static int getPartitionAssignmentCount(final Map<String, String> 
properties) {

Review Comment:
   Recommend moving all public methods before all private methods in this class.



##########
nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java:
##########
@@ -617,6 +765,10 @@ private void processInputFlowFile(final ProcessSession 
session, final OffsetTrac
     }
 
    // Convenience overload: builds a PollingContext without an explicit partition assignment.
    private PollingContext createPollingContext(final ProcessContext context) {
        return createPollingContext(context, null);
    }
+
+    private PollingContext createPollingContext(final ProcessContext context, 
Integer partition) {

Review Comment:
   ```suggestion
       private PollingContext createPollingContext(final ProcessContext 
context, final Integer partition) {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to