I have two ConsumeKafka_2_6 processors (NiFi 1.16.1). The first uses Topic
Name Format of "names". The second uses Topic Name Format of "pattern". The
names format is able to sync with Kafka relatively quickly and begins
receiving messages within a couple of seconds. However, the pattern
format takes significantly longer to start receiving messages.

Digging into the logs, it appears the consumer does not yet have the
proper offset, so it cannot begin pulling messages. Eventually, I saw
this in the log:

2022-05-25 21:35:06,163 INFO [Timer-Driven Process Thread-6]
o.a.k.c.c.internals.ConsumerCoordinator [Consumer clientId=consumer-2-11,
groupId=2] Setting offset for partition test.topic.1-0 to the committed
offset FetchPosition{offset=18, offsetEpoch=Optional.empty,
currentLeader=LeaderAndEpoch{leader=Optional[kafka1:12091 (id: 1 rack:
r1)], epoch=0}}
2022-05-25 21:35:06,163 INFO [Timer-Driven Process Thread-6]
o.a.k.c.c.internals.ConsumerCoordinator [Consumer clientId=consumer-2-11,
groupId=2] Setting offset for partition test.topic.1-1 to the committed
offset FetchPosition{offset=13, offsetEpoch=Optional.empty,
currentLeader=LeaderAndEpoch{leader=Optional[kafka2:12092 (id: 2 rack:
r2)], epoch=0}}

Very soon after, messages started arriving.

Is this lag an issue with the Kafka server? Or can the server be
queried/forced to establish the offset more quickly in cases when it is not
yet known?

I did notice that reducing the yield time seemed to speed the process up a
bit, which implies that the offset is established in relation to the number
of polls attempted. But since I've seen a wide range of lag times, I'm not
positive that there is a direct relationship.
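
One way to check whether the delay really tracks the number of polls would
be to measure it over several runs with different yield times. Below is a
rough Python sketch, not anything taken from NiFi or Kafka. It pulls the
timestamp, partition and committed offset out of "Setting offset" entries
like the ones quoted above, and reports how long after a given start time
the first one appeared. The function names are made up for illustration,
and the start time is an assumed value that would have to come from when
the processor was actually started.

```python
import re
from datetime import datetime

# Start of a log entry, e.g. "2022-05-25 21:35:06,163"
TIMESTAMP_RE = re.compile(
    r"^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}", re.MULTILINE
)
# The ConsumerCoordinator message that reports the committed offset
SETTING_RE = re.compile(
    r"Setting offset for partition (\S+) to the committed offset "
    r"FetchPosition\{offset=(\d+)"
)


def parse_offset_events(log_text):
    """Return (timestamp, partition, offset) for each 'Setting offset' entry."""
    starts = [m.start() for m in TIMESTAMP_RE.finditer(log_text)]
    events = []
    for begin, end in zip(starts, starts[1:] + [len(log_text)]):
        # Collapse whitespace so line-wrapped copies of an entry still match.
        entry = " ".join(log_text[begin:end].split())
        match = SETTING_RE.search(entry)
        if match is None:
            continue
        when = datetime.strptime(entry[:23], "%Y-%m-%d %H:%M:%S,%f")
        events.append((when, match.group(1), int(match.group(2))))
    return events


def seconds_until_first_offset(started_at, events):
    """Seconds between an (assumed) start time and the first offset entry."""
    first = min(when for when, _, _ in events)
    return (first - started_at).total_seconds()


# The two entries quoted above, wrapped the same way as in this message.
SAMPLE = """\
2022-05-25 21:35:06,163 INFO [Timer-Driven Process Thread-6]
o.a.k.c.c.internals.ConsumerCoordinator [Consumer clientId=consumer-2-11,
groupId=2] Setting offset for partition test.topic.1-0 to the committed
offset FetchPosition{offset=18, offsetEpoch=Optional.empty,
currentLeader=LeaderAndEpoch{leader=Optional[kafka1:12091 (id: 1 rack:
r1)], epoch=0}}
2022-05-25 21:35:06,163 INFO [Timer-Driven Process Thread-6]
o.a.k.c.c.internals.ConsumerCoordinator [Consumer clientId=consumer-2-11,
groupId=2] Setting offset for partition test.topic.1-1 to the committed
offset FetchPosition{offset=13, offsetEpoch=Optional.empty,
currentLeader=LeaderAndEpoch{leader=Optional[kafka2:12092 (id: 2 rack:
r2)], epoch=0}}
"""

if __name__ == "__main__":
    # Hypothetical start time; replace with when the processor was started.
    assumed_start = datetime(2022, 5, 25, 21, 35, 0)
    found = parse_offset_events(SAMPLE)
    for when, partition, offset in found:
        print(when, partition, offset)
    print(seconds_until_first_offset(assumed_start, found))
```

Running this against the app log for each yield-time setting would give one
lag figure per run, which should show whether the relationship is direct.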

Thanks,
Mark
