Hi Mark,

When a consumer first connects to a Kafka cluster, there is a metadata
handshake that transmits things like consumer group assignments and
offsets. In application code, I find the first poll establishes this
information and does not actually retrieve messages, depending on the
timeout used. Consumers have a configuration setting, "auto.offset.reset",
that specifies what to do when there is no committed offset or the current
offset no longer exists: Kafka rewinds to the beginning of the topic for
'earliest', or tails the end for 'latest' (the default).
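For example, to have a new consumer group start from the oldest retained
messages, you can set this in the consumer configuration (a sketch; the
group id here is a placeholder):

```
# hypothetical consumer.properties fragment
group.id=my-consumer-group
auto.offset.reset=earliest
```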

With the default 'latest' setting, after that first poll the consumer may
sit waiting if new messages are not constantly coming in. This can be
compounded by how long the processor waits between its own calls to
poll().

I usually write something like this in my client applications, to get the
consumer group assigned and rewound as quickly as possible:

consumer.subscribe(Collections.singletonList(testTopic));
// "fake" poll to get assigned to the consumer group;
// auto.offset.reset will kick in (earliest)
consumer.poll(Duration.ofMillis(0));
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(5000));
for (ConsumerRecord<String, String> record : records) {
    logger.info("Message received: " + record.toString());
}


That said, when the pattern/regex form is used, the consumer does not
receive metadata immediately, since at connection time it is unknown
which topics, if any, match the pattern; only the subscription state on
the consumer is updated. There is then a delay between the time a new
matching topic is discovered and the time the consumer group is
rebalanced.

https://kafka.apache.org/documentation/#consumerconfigs_metadata.max.age.ms

> Subscribe to all topics matching specified pattern to get dynamically
> assigned partitions. The pattern matching will be done periodically
> against all topics existing at the time of check. This can be
> controlled through the {@code metadata.max.age.ms} configuration: by
> lowering the max metadata age, the consumer will refresh metadata more
> often and check for matching topics.
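To shrink that discovery window, metadata.max.age.ms can be lowered on
the consumer (a sketch; 5000 ms is an arbitrary example value, not a
recommendation):

```
# hypothetical consumer.properties fragment
metadata.max.age.ms=5000
```

The trade-off is more frequent metadata requests to the brokers, so I
would not set it lower than the discovery latency you actually need.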


I would wager you're running into the lag above because the pattern is
only checked every five minutes by default (metadata.max.age.ms defaults
to 300000 ms).

Knowles

On Wed, May 25, 2022 at 10:14 PM Mark Bean <mark.o.b...@gmail.com> wrote:

> I have two ConsumeKafka_2_6 processors (NiFi 1.16.1). The first uses Topic
> Name Format of "names". The second uses Topic Name Format of "pattern". The
> names format is able to sync with Kafka relatively quickly and begins
> receiving messages within just a couple seconds. However, the pattern
> format takes significantly longer to start receiving messages.
>
> Diving in the logs, it appears the issue is that the consumer does not yet
> have the proper offset, so it cannot begin pulling messages. Eventually, I
> saw this in the log:
>
> 2022-05-25 21:35:06,163 INFO [Timer-Driven Process Thread-6]
> o.a.k.c.c.internals.ConsumerCoordinator [Consumer clientId=consumer-2-11,
> groupId=2] Setting offset for partition test.topic.1-0 to the committed
> offset FetchPosition{offset=18, offsetEpoch=Optional.empty,
> currentLeader=LeaderAndEpoch{leader=Optional[kafka1:12091 (id: 1 rack:
> r1)], epoch=0}}
> 2022-05-25 21:35:06,163 INFO [Timer-Driven Process Thread-6]
> o.a.k.c.c.internals.ConsumerCoordinator [Consumer clientId=consumer-2-11,
> groupId=2] Setting offset for partition test.topic.1-1 to the committed
> offset FetchPosition{offset=13, offsetEpoch=Optional.empty,
> currentLeader=LeaderAndEpoch{leader=Optional[kafka2:12092 (id: 2 rack:
> r2)], epoch=0}}
>
> Very soon after, messages started arriving.
>
> Is this lag an issue with the Kafka server? Or can the server be
> queried/forced to establish the offset more quickly in cases when it is not
> yet known?
>
> I did notice that reducing the yield time seemed to speed the process up a
> bit. The implication is the offset is established in relation to the number
> of polls attempted. But, since I've seen a wide range of lag time, I'm not
> positive that there is a direct relationship.
>
> Thanks,
> Mark
>
