[ 
https://issues.apache.org/jira/browse/NIFI-8021?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17248145#comment-17248145
 ] 

Mark Payne commented on NIFI-8021:
----------------------------------

Re-opening this issue because I found a bug. If you have more partitions than 
you have NiFi nodes, in order to pin specific partitions to nodes, you must 
enter an empty string for at least one node, since all partitions are already 
otherwise accounted for.

However, using an empty string results in a lot of ERROR logs. For 
example:
{code:java}
2020-12-11 14:51:21,680 ERROR [Timer-Driven Process Thread-2] o.a.n.p.kafka.pubsub.ConsumeKafka_2_0 ConsumeKafka_2_0[id=535add08-0176-1000-ffff-ffff98b76af7] Exception while processing data from kafka so will close the lease org.apache.nifi.processors.kafka.pubsub.ConsumerPool$SimpleConsumerLease@553f24a1 due to java.lang.IllegalStateException: Consumer is not subscribed to any topics or assigned any partitions: java.lang.IllegalStateException: Consumer is not subscribed to any topics or assigned any partitions
java.lang.IllegalStateException: Consumer is not subscribed to any topics or assigned any partitions
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1163)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)
        at org.apache.nifi.processors.kafka.pubsub.ConsumerLease.poll(ConsumerLease.java:181)
        at org.apache.nifi.processors.kafka.pubsub.ConsumeKafka_2_0.onTrigger(ConsumeKafka_2_0.java:444)
        at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
        at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1173)
        at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:214)
        at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
        at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748){code}
This should be easy to fix, fortunately. In the createConsumerPool methods of 
the Consume* processors, we do something like:
{code:java}
final int[] partitionsToConsume;
try {
    partitionsToConsume = ConsumerPartitionsUtil.getPartitionsForHost(context.getAllProperties(), getLogger());
} catch (final UnknownHostException uhe) {
    throw new ProcessException("Could not determine localhost's hostname", uhe);
} {code}
An easy fix would be to create a `volatile boolean hasPartitions;` member 
variable. In this section of the createConsumerPool method, simply set 
`this.hasPartitions = partitionsToConsume.length > 0;`

Then, in the `onTrigger` method, check this:
{code:java}
if (!this.hasPartitions) {
    getLogger().debug("This host has no partitions so will not poll Kafka.");
    context.yield();
    return;
} {code}
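
To show how the two pieces above fit together, here is a minimal, self-contained sketch of the guard. It is not the actual processor code: the class `PartitionGuardSketch` and its counters are hypothetical stand-ins, with the yield counter standing in for `context.yield()` and the poll counter standing in for polling the consumer lease. It also assumes the partition array is never null; if the real `getPartitionsForHost` can return null (for example, when no static mapping is configured), the check would need to account for that.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a Consume* processor; only the guard logic is modeled.
final class PartitionGuardSketch {

    // Written when the consumer pool is created, read by every trigger thread,
    // hence volatile so the latest value is always visible.
    private volatile boolean hasPartitions;

    // Counters replace the real side effects so the behavior can be observed.
    private final AtomicInteger yieldCount = new AtomicInteger();
    private final AtomicInteger pollCount = new AtomicInteger();

    // Stand-in for the section of createConsumerPool that resolves the
    // partitions pinned to this host. Assumes a non-null array.
    void createConsumerPool(final int[] partitionsToConsume) {
        this.hasPartitions = partitionsToConsume.length > 0;
    }

    // Stand-in for onTrigger: skip polling entirely when this host has
    // nothing assigned, instead of letting the poll throw.
    void onTrigger() {
        if (!this.hasPartitions) {
            yieldCount.incrementAndGet(); // real code: context.yield()
            return;
        }
        pollCount.incrementAndGet();      // real code: poll the consumer lease
    }

    int getYieldCount() {
        return yieldCount.get();
    }

    int getPollCount() {
        return pollCount.get();
    }
}
```

With an empty array passed to `createConsumerPool`, each call to `onTrigger` only yields and never reaches the poll, which is the path that currently produces the IllegalStateException. With a non-empty array, the poll proceeds as before.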

> Allow ConsumeKafka processors to use static partition mapping
> -------------------------------------------------------------
>
>                 Key: NIFI-8021
>                 URL: https://issues.apache.org/jira/browse/NIFI-8021
>             Project: Apache NiFi
>          Issue Type: Improvement
>          Components: Extensions
>            Reporter: Mark Payne
>            Assignee: Mark Payne
>            Priority: Major
>             Fix For: 1.13.0
>
>          Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> With the ConsumeKafka processors, we use the KafkaConsumer.subscribe() 
> method, which is generally what is desirable. However, for a case where 
> strict FIFO order per partition is critical, as may be the case with CDC use 
> cases among others, we should allow users to pin a particular partition to a 
> particular node. This way, even if the node is restarted or stops pulling 
> data for a while, its partitions are not re-assigned, so there's no risk of 
> messages being delivered out of order.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
