I do not see this property (metadata.max.age.ms) anywhere in the ConsumeKafka processor. If I want to set this value to something less than 5 minutes, that would mean modifying the processor. Or is there a mechanism already built into the processor that I can use? And, just to be clear, this property is per-consumer, not a property of the topic itself, correct?
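For the metadata.max.age.ms question, here is a minimal, stdlib-only Java sketch under stated assumptions. It treats the setting as an ordinary consumer configuration key, which is why it belongs to each consumer rather than to a topic. It also models the periodic pattern check that the quoted javadoc below describes. The bootstrap address, group id, topic names, and the 30-second value are illustrative assumptions. The timing model assumes refreshes happen exactly every metadata.max.age.ms, whereas a real client can refresh earlier, and a newly matched topic still has to go through a rebalance after it is discovered. On the NiFi side, ConsumeKafka processors accept user-defined dynamic properties that are added to the Kafka client configuration. That may allow setting this value without modifying the processor, but it is worth confirming in the processor documentation for your version.

```java
import java.util.List;
import java.util.Properties;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Sketch only: metadata.max.age.ms is a Kafka consumer setting, so it is
// configured per consumer client, not on the topic. The bootstrap address,
// group id, and topic names below are illustrative assumptions.
class PatternRefreshSketch {

    static Properties consumerProps(String bootstrap, String groupId, long metadataMaxAgeMs) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrap);
        props.setProperty("group.id", groupId);
        // Kafka's default is 300000 ms (5 minutes).
        props.setProperty("metadata.max.age.ms", Long.toString(metadataMaxAgeMs));
        // With kafka-clients on the classpath, these (plus deserializers)
        // would be handed to new KafkaConsumer<String, String>(props).
        return props;
    }

    // What one periodic check does in this sketch: match the pattern against
    // the topics that exist at that moment (full-string match).
    static List<String> matchingTopics(Pattern pattern, List<String> existingTopics) {
        return existingTopics.stream()
                .filter(t -> pattern.matcher(t).matches())
                .collect(Collectors.toList());
    }

    // Simplified timing model: refreshes happen exactly at 0, age, 2*age, ...
    // Returns how long a topic created at createdAtMs waits to be noticed.
    static long discoveryDelayMs(long createdAtMs, long metadataMaxAgeMs) {
        long nextRefresh = ((createdAtMs + metadataMaxAgeMs - 1) / metadataMaxAgeMs) * metadataMaxAgeMs;
        return nextRefresh - createdAtMs;
    }

    public static void main(String[] args) {
        Properties props = consumerProps("kafka1:12091", "2", 30_000L);
        System.out.println("metadata.max.age.ms=" + props.getProperty("metadata.max.age.ms"));

        Pattern p = Pattern.compile("test\\.topic\\..*");
        System.out.println(matchingTopics(p, List.of("test.topic.1", "test.topic.2", "other")));

        // A topic created 10 s after a refresh: default age vs. a 30 s age.
        System.out.println(discoveryDelayMs(10_000, 300_000)); // 290000
        System.out.println(discoveryDelayMs(10_000, 30_000));  // 20000
    }
}
```

Under this model, the worst-case wait for a newly created matching topic is roughly one full metadata.max.age.ms interval. Lowering the value shortens that wait at the cost of more frequent metadata requests.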
Also, does the metadata refresh pick up new topics that were recently created when using the pattern method? Example: ConsumeKafka connects and begins consuming from a topic matching the pattern. Then, a new topic is created on the Kafka server which also matches the pattern. Will ConsumeKafka begin consuming from this new topic (after the default 5-minute sync time)?

Thanks,
Mark

On Thu, May 26, 2022 at 11:26 AM Knowles Atchison Jr <katchiso...@gmail.com> wrote:

> Hi Mark,
>
> When a consumer first connects to a Kafka cluster, there is a metadata
> handshake that transmits things like consumer group assignments and
> offsets. In application code, I find the first poll establishes this
> information and does not actually retrieve messages, depending on the
> timeout used. Consumers have a configuration setting "auto.offset.reset"
> that specifies what to do in the event there is no offset or the current
> offset does not exist. Kafka will then rewind to the beginning for
> 'earliest' or tail the end with 'latest' (the default).
>
> With the default latest setting, after that first poll, the consumer may be
> waiting if new messages are not constantly coming in. This may also be
> compounded by how long the processor waits between calls to poll()
> itself.
>
> I usually write something like this in my client applications, to get the
> consumer group assigned and rewound as quickly as possible:
>
>     consumer.subscribe(Collections.singletonList(testTopic));
>     // "fake" poll to get assigned to consumer group, auto offset will kick in to earliest
>     consumer.poll(Duration.ofMillis(0));
>     ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(5000));
>     for (ConsumerRecord<String, String> record : records) {
>         logger.info("Message received: " + record.toString());
>     }
>
> That said, when the pattern/regex is used, the consumer does not
> receive metadata immediately, since at connection time it is unknown
> which topics, if any, match the pattern. The subscription state on the
> consumer is updated periodically, so there is a delay between the time a
> new topic matching the pattern appears and the time it is discovered and
> the consumer group is rebalanced.
>
> https://kafka.apache.org/documentation/#consumerconfigs_metadata.max.age.ms
>
>  * Subscribe to all topics matching specified pattern to get dynamically assigned partitions.
>  * The pattern matching will be done periodically against all topics existing at the time of check.
>  * This can be controlled through the {@code metadata.max.age.ms} configuration: by lowering
>  * the max metadata age, the consumer will refresh metadata more often and check for matching topics.
>
> I would wager you're running into the lag described above because the
> pattern is only checked every five minutes by default.
>
> Knowles
>
> On Wed, May 25, 2022 at 10:14 PM Mark Bean <mark.o.b...@gmail.com> wrote:
>
> > I have two ConsumeKafka_2_6 processors (NiFi 1.16.1). The first uses Topic
> > Name Format of "names". The second uses Topic Name Format of "pattern". The
> > names format is able to sync with Kafka relatively quickly and begins
> > receiving messages within just a couple of seconds. However, the pattern
> > format takes significantly longer to start receiving messages.
> >
> > Digging into the logs, it appears the issue is that the consumer does not
> > yet have the proper offset, so it cannot begin pulling messages.
> > Eventually, I saw this in the log:
> >
> > 2022-05-25 21:35:06,163 INFO [Timer-Driven Process Thread-6] o.a.k.c.c.internals.ConsumerCoordinator [Consumer clientId=consumer-2-11, groupId=2] Setting offset for partition test.topic.1-0 to the committed offset FetchPosition{offset=18, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[kafka1:12091 (id: 1 rack: r1)], epoch=0}}
> > 2022-05-25 21:35:06,163 INFO [Timer-Driven Process Thread-6] o.a.k.c.c.internals.ConsumerCoordinator [Consumer clientId=consumer-2-11, groupId=2] Setting offset for partition test.topic.1-1 to the committed offset FetchPosition{offset=13, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[kafka2:12092 (id: 2 rack: r2)], epoch=0}}
> >
> > Very soon after, messages started arriving.
> >
> > Is this lag an issue with the Kafka server? Or can the server be
> > queried/forced to establish the offset more quickly in cases when it is
> > not yet known?
> >
> > I did notice that reducing the yield time seemed to speed the process up
> > a bit. The implication is that the offset is established in relation to
> > the number of polls attempted. But, since I've seen a wide range of lag
> > times, I'm not positive that there is a direct relationship.
> >
> > Thanks,
> > Mark
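The log lines show the coordinator restoring committed offsets (18 and 13) for the two partitions of test.topic.1 once they were assigned. As a rough, hedged illustration of the decision described in the quoted reply, the Java sketch below models how a consumer might pick its starting position for a partition. It uses the committed offset if one exists and is still inside the log, and otherwise falls back to auto.offset.reset. This is a simplified model, not Kafka's implementation. The log start/end offsets (0 and 25) are made-up values, and the class and method names are hypothetical.

```java
import java.util.OptionalLong;

// Simplified model of how a consumer chooses where to start reading a
// partition. Not Kafka's implementation: it ignores offset epochs, leader
// changes, and the fetch round-trips that detect an out-of-range offset.
class StartingOffsetSketch {

    static long startingOffset(OptionalLong committed, long logStartOffset,
                               long logEndOffset, String autoOffsetReset) {
        // A committed offset still inside the log is used as-is; this is the
        // "Setting offset for partition ... to the committed offset" case.
        if (committed.isPresent()) {
            long c = committed.getAsLong();
            if (c >= logStartOffset && c <= logEndOffset) {
                return c;
            }
        }
        // No usable committed offset: auto.offset.reset decides.
        switch (autoOffsetReset) {
            case "earliest":
                return logStartOffset; // rewind to the beginning
            case "latest":
                return logEndOffset;   // tail the end (Kafka's default)
            default:
                // "none": the real consumer raises an exception instead.
                throw new IllegalStateException(
                        "no valid offset and auto.offset.reset=" + autoOffsetReset);
        }
    }

    public static void main(String[] args) {
        // Committed offsets 18 and 13 come from the log lines; the log
        // start/end offsets (0 and 25) are made-up values.
        System.out.println(startingOffset(OptionalLong.of(18), 0, 25, "latest"));     // 18
        System.out.println(startingOffset(OptionalLong.of(13), 0, 25, "latest"));     // 13
        // A brand-new group with no committed offset:
        System.out.println(startingOffset(OptionalLong.empty(), 0, 25, "earliest"));  // 0
        System.out.println(startingOffset(OptionalLong.empty(), 0, 25, "latest"));    // 25
    }
}
```

In this model, the starting position can only be chosen after the partition is assigned. For a pattern subscription, assignment in turn waits on the metadata check that finds the matching topic, which is consistent with the offsets appearing in the log only some time after startup.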