I do not see this property (metadata.max.age.ms) anywhere in the
ConsumeKafka processor. If I want to lower this value to something less
than 5 minutes, does that mean modifying the processor? Or is there a
mechanism already built into the processor that I can use? And, just to be
clear, this property is set per consumer, not on the topic itself,
correct?
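
To make the question concrete, here is a minimal sketch of what I mean by
lowering the value on the client side. It uses only java.util.Properties, so
it runs without the Kafka client on the classpath; in a real application I
would expect to pass the same Properties object to the KafkaConsumer
constructor. The class name, the helper name, and the 30-second value are
placeholders of mine, and nothing here reflects how NiFi builds its consumer.

```java
import java.util.Properties;

// Hypothetical helper: builds consumer settings with a shorter metadata refresh interval.
class ConsumerSettings {

    // Kafka's documented default for metadata.max.age.ms is 300000 ms (5 minutes).
    static final long DEFAULT_METADATA_MAX_AGE_MS = 300_000L;

    static Properties withMetadataMaxAge(String bootstrapServers, String groupId, long maxAgeMs) {
        if (maxAgeMs < 0) {
            throw new IllegalArgumentException("metadata.max.age.ms must be >= 0");
        }
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Per-consumer client setting; it is not stored on the topic or the broker.
        props.setProperty("metadata.max.age.ms", Long.toString(maxAgeMs));
        return props;
    }

    public static void main(String[] args) {
        // Broker address and group id are the ones that appear in my log below.
        Properties props = withMetadataMaxAge("kafka1:12091", "2", 30_000L);
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

What I cannot tell is whether ConsumeKafka gives me a way to hand it a
setting like this.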

Also, does the metadata refresh pick up new topics that were recently
created when using the pattern method? Example: ConsumeKafka connects and
begins consuming from a topic matching the pattern. Then, a new topic is
created on the Kafka server which also matches the pattern. Will
ConsumeKafka begin consuming from this new topic (after the default 5
minute sync time)?
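
The behavior I am assuming, based on the subscribe(Pattern) Javadoc quoted
below, is roughly the following: on each metadata refresh the pattern is
re-evaluated against whatever topics exist at that moment. This is only a
stdlib simulation of that idea, not Kafka client code. The class name, the
pattern, and test.topic.2 are made up for illustration (test.topic.1 is the
topic from my log), and the use of a full match is my assumption.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Pattern;

// Simulation only: mimics re-checking a subscription pattern on each metadata refresh.
class PatternRefreshSketch {

    // Returns the topics, out of those existing at refresh time, that fully match the pattern.
    static Set<String> matchingTopics(Pattern pattern, List<String> topicsAtRefresh) {
        Set<String> matched = new TreeSet<>();
        for (String topic : topicsAtRefresh) {
            if (pattern.matcher(topic).matches()) {
                matched.add(topic);
            }
        }
        return matched;
    }

    public static void main(String[] args) {
        Pattern pattern = Pattern.compile("test\\.topic\\..*");

        // Topics that exist when the consumer first refreshes its metadata.
        List<String> firstRefresh = Arrays.asList("test.topic.1", "other.topic");
        // A new matching topic has been created before the next refresh.
        List<String> nextRefresh = Arrays.asList("test.topic.1", "other.topic", "test.topic.2");

        System.out.println("first refresh: " + matchingTopics(pattern, firstRefresh));
        System.out.println("next refresh:  " + matchingTopics(pattern, nextRefresh));
    }
}
```

If the real consumer works this way, test.topic.2 would only show up in the
subscription at the first refresh after it was created.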

Thanks,
Mark

On Thu, May 26, 2022 at 11:26 AM Knowles Atchison Jr <katchiso...@gmail.com>
wrote:

> Hi Mark,
>
> When a consumer first connects to a Kafka cluster, there is a metadata
> handshake that transmits things like consumer group assignments and
> offsets. In application code, I find the first poll establishes this
> information and does not actually retrieve messages, depending on the
> timeout used. Consumers have a configuration setting "auto.offset.reset"
> that specifies what to do when there is no committed offset or the current
> offset no longer exists. Kafka will then either rewind to the beginning
> ('earliest') or tail the end ('latest', the default).
>
> With the default 'latest' setting, after that first poll, the consumer may
> sit waiting if new messages are not constantly coming in. This may also be
> compounded by how long the processor waits between calls to poll()
> itself.
>
> I usually write something like this in my client applications, to get the
> consumer group assigned and rewound as quickly as possible:
>
> consumer.subscribe(Collections.singletonList(testTopic));
> // "Fake" poll to get assigned to the consumer group; auto offset reset
> // will kick in to earliest.
> consumer.poll(Duration.ofMillis(0));
> ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(5000));
> for (ConsumerRecord<String, String> record : records) {
>     logger.info("Message received: " + record.toString());
> }
>
>
> That said, when the pattern/regex is used, the consumer does not
> receive metadata immediately, since it is unknown at the time of connection
> which topics, if any, match the pattern. Only the subscription state on the
> consumer is updated. There is then a delay between the time a matching topic
> is discovered and the time the consumer group is rebalanced.
>
> https://kafka.apache.org/documentation/#consumerconfigs_metadata.max.age.ms
>
> * Subscribe to all topics matching specified pattern to get dynamically
> * assigned partitions.
> * The pattern matching will be done periodically against all topics
> * existing at the time of check.
> * This can be controlled through the {@code metadata.max.age.ms}
> * configuration: by lowering the max metadata age, the consumer will
> * refresh metadata more often and check for matching topics.
>
>
> I would wager the lag you describe is due to the pattern only being
> checked every five minutes by default.
>
> Knowles
>
> On Wed, May 25, 2022 at 10:14 PM Mark Bean <mark.o.b...@gmail.com> wrote:
>
> > I have two ConsumeKafka_2_6 processors (NiFi 1.16.1). The first uses Topic
> > Name Format of "names". The second uses Topic Name Format of "pattern". The
> > names format is able to sync with Kafka relatively quickly and begins
> > receiving messages within just a couple seconds. However, the pattern
> > format takes significantly longer to start receiving messages.
> >
> > Diving into the logs, it appears the issue is that the consumer does not yet
> > have the proper offset, so it cannot begin pulling messages. Eventually, I
> > saw this in the log:
> >
> > 2022-05-25 21:35:06,163 INFO [Timer-Driven Process Thread-6]
> > o.a.k.c.c.internals.ConsumerCoordinator [Consumer clientId=consumer-2-11,
> > groupId=2] Setting offset for partition test.topic.1-0 to the committed
> > offset FetchPosition{offset=18, offsetEpoch=Optional.empty,
> > currentLeader=LeaderAndEpoch{leader=Optional[kafka1:12091 (id: 1 rack:
> > r1)], epoch=0}}
> > 2022-05-25 21:35:06,163 INFO [Timer-Driven Process Thread-6]
> > o.a.k.c.c.internals.ConsumerCoordinator [Consumer clientId=consumer-2-11,
> > groupId=2] Setting offset for partition test.topic.1-1 to the committed
> > offset FetchPosition{offset=13, offsetEpoch=Optional.empty,
> > currentLeader=LeaderAndEpoch{leader=Optional[kafka2:12092 (id: 2 rack:
> > r2)], epoch=0}}
> >
> > Very soon after, messages started arriving.
> >
> > Is this lag an issue with the Kafka server? Or can the server be
> > queried/forced to establish the offset more quickly in cases when it is not
> > yet known?
> >
> > I did notice that reducing the yield time seemed to speed the process up a
> > bit. The implication is the offset is established in relation to the number
> > of polls attempted. But, since I've seen a wide range of lag time, I'm not
> > positive that there is a direct relationship.
> >
> > Thanks,
> > Mark
> >
>
