I started this thread hoping it was just a configuration issue, but I may have to open a JIRA on this. If someone sees an issue with the following configuration and can explain why messages are not being consumed, please let me know. This processor has received no messages from Kafka in over an hour, while other processors are consuming from the same pattern-matched topics (using different Group ID values).
ConsumeKafka_2_6 (NiFi 1.16.2)
Kafka Brokers: kafka1:12091, kafka2:12091
Topic Name(s): test2.*
Topic Name Format: pattern
Group ID: Group7
Commit Offsets: true
Max Uncommitted Time: 1 secs
Honor Transactions: true
Separate By Key: false
Security Protocol: PLAINTEXT
SASL Mechanism: GSSAPI
Token Auth: false
Key Attribute Encoding: UTF-8 Encoded
Offset Reset: earliest
Message Header Encoding: UTF-8
Max Poll Records: 10000
Communications Timeout: 60 secs

Thanks,
Mark

On Fri, May 27, 2022 at 12:41 PM Mark Bean <mark.o.b...@gmail.com> wrote:

> I added 'metadata.max.age.ms = 1000' to the ConsumeKafka_2_6 processor.
> It started receiving faster than the ConsumeKafka_2_6 without that
> setting (which has not received any messages after about 15 minutes).
> Here are my observations (all with different ConsumeKafka_2_6 processors
> configured with unique Group IDs):
>
> - Topic name method 'names' starts receiving messages almost immediately,
> with maybe a few seconds of lag
> - With method 'pattern' and 'metadata.max.age.ms = 1000', it took about
> 10 minutes to start receiving messages
> - With method 'pattern', no 'metadata.max.age.ms' override setting, but a
> yield time of 10 ms, it starts receiving almost immediately
>
> This leads me to believe it has something to do with the frequency or
> number of polls. That would be odd behavior, though.
>
> On Fri, May 27, 2022 at 9:47 AM Knowles Atchison Jr <katchiso...@gmail.com>
> wrote:
>
>> 1. Correct, this setting is per consumer. You can add dynamic
>> configuration items to the ConsumeKafka processor and they will get
>> passed down to the underlying Kafka Consumer.
>>
>> 2. Yes, when a new topic is created that matches the pattern the
>> ConsumeKafka has been configured with, it will get picked up the next
>> time the sync runs.
>>
>> Knowles
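For reference, the pass-through described in Knowles' reply amounts to roughly the following at the Kafka client level. This is only a sketch using the plain Java client, not the processor's actual code; the class name and deserializers are arbitrary placeholders, while the brokers, group ID, offset reset, max poll records, and the metadata.max.age.ms override are taken from the configuration discussed in this thread.

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;

    public class PassThroughConfigSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            // Settings mirrored from the ConsumeKafka_2_6 processor configuration above
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:12091,kafka2:12091");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "Group7");
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "10000");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
            // The dynamic property added on the processor, passed straight through to
            // the client: refresh topic metadata (and re-run the pattern match) every
            // second instead of the 5-minute default.
            props.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "1000");

            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                // subscribe and poll as in the examples quoted further down the thread
            }
        }
    }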
>> On Fri, May 27, 2022 at 9:27 AM Mark Bean <mark.o.b...@gmail.com> wrote:
>>
>> > I do not see this property (metadata.max.age.ms) anywhere in the
>> > ConsumeKafka processor. If I want to adjust this value to something
>> > less than 5 minutes, will that mean modifying the processor? Or is
>> > there a mechanism already built into the processor that I can use?
>> > And, just to be clear, this property is per-consumer, not inherent
>> > within the topic itself, correct?
>> >
>> > Also, does the metadata refresh pick up new topics that were recently
>> > created when using the pattern method? Example: ConsumeKafka connects
>> > and begins consuming from a topic matching the pattern. Then, a new
>> > topic is created on the Kafka server which also matches the pattern.
>> > Will ConsumeKafka begin consuming from this new topic (after the
>> > default 5 minute sync time)?
>> >
>> > Thanks,
>> > Mark
>> >
>> > On Thu, May 26, 2022 at 11:26 AM Knowles Atchison Jr
>> > <katchiso...@gmail.com> wrote:
>> >
>> > > Hi Mark,
>> > >
>> > > When a consumer first connects to a Kafka cluster, there is a
>> > > metadata handshake that transmits things like consumer group
>> > > assignments and offsets. In application code, I find the first poll
>> > > establishes this information and does not actually retrieve
>> > > messages, depending on the timeout used. Consumers have a
>> > > configuration setting "auto.offset.reset" that specifies what to do
>> > > in the event there is no offset or the current offset does not
>> > > exist. Kafka will then rewind to the beginning with 'earliest' or
>> > > tail the end with 'latest' (the default).
>> > >
>> > > With the default latest setting, after that first poll, the consumer
>> > > may be waiting if new messages are not constantly coming in. This
>> > > may also be compounded by how long the processor waits between
>> > > calling poll() itself.
>> > >
>> > > I usually write something like this in my client applications, to
>> > > get the consumer group assigned and rewound as quickly as possible:
>> > >
>> > > consumer.subscribe(Collections.singletonList(testTopic));
>> > > // "fake" poll to get assigned to the consumer group; auto offset
>> > > // reset will kick in to earliest
>> > > consumer.poll(Duration.ofMillis(0));
>> > > ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(5000));
>> > > for (ConsumerRecord<String, String> record : records) {
>> > >     logger.info("Message received: " + record.toString());
>> > > }
>> > >
>> > > Having said that, when the pattern/regex is used, the consumer does
>> > > not receive metadata immediately, since it is unknown at the time of
>> > > connection what topics, if any, match the pattern. The subscription
>> > > state on the consumer is updated. There is a delay between the time
>> > > a new topic matching the pattern is discovered and the time the
>> > > consumer group is rebalanced.
>> > >
>> > > https://kafka.apache.org/documentation/#consumerconfigs_metadata.max.age.ms
>> > >
>> > > * Subscribe to all topics matching specified pattern to get
>> > >   dynamically assigned partitions.
>> > > * The pattern matching will be done periodically against all topics
>> > >   existing at the time of check.
>> > > * This can be controlled through the {@code metadata.max.age.ms}
>> > >   configuration: by lowering the max metadata age, the consumer will
>> > >   refresh metadata more often and check for matching topics.
>> > >
>> > > I would wager you're running into the lag above because the pattern
>> > > is only checked every five minutes by default.
>> > >
>> > > Knowles
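To make the javadoc excerpt above concrete, here is a small, self-contained sketch (again plain Kafka client code, not NiFi) that subscribes with a pattern, lowers metadata.max.age.ms, and logs when partitions are actually assigned. The brokers and topic pattern follow the values in this thread, while the group ID is a placeholder. With the default metadata age of five minutes, onPartitionsAssigned for a newly created matching topic would typically not fire until the next metadata refresh.

    import java.time.Duration;
    import java.util.Collection;
    import java.util.Properties;
    import java.util.regex.Pattern;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class PatternSubscribeSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:12091,kafka2:12091");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "pattern-test-group");
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            // Re-evaluate the topic pattern every second instead of the 5-minute default.
            props.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "1000");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Pattern.compile("test2.*"), new ConsumerRebalanceListener() {
                    @Override
                    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                        System.out.println("Revoked: " + partitions);
                    }

                    @Override
                    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                        // Fires only after a metadata refresh has matched the pattern
                        // and the group has rebalanced.
                        System.out.println("Assigned: " + partitions);
                    }
                });

                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.println(record.topic() + "-" + record.partition()
                                + "@" + record.offset() + ": " + record.value());
                    }
                }
            }
        }
    }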
>> > > On Wed, May 25, 2022 at 10:14 PM Mark Bean <mark.o.b...@gmail.com>
>> > > wrote:
>> > >
>> > > > I have two ConsumeKafka_2_6 processors (NiFi 1.16.1). The first
>> > > > uses a Topic Name Format of "names". The second uses a Topic Name
>> > > > Format of "pattern". The names format is able to sync with Kafka
>> > > > relatively quickly and begins receiving messages within just a
>> > > > couple of seconds. However, the pattern format takes significantly
>> > > > longer to start receiving messages.
>> > > >
>> > > > Diving into the logs, it appears the issue is that the consumer
>> > > > does not yet have the proper offset, so it cannot begin pulling
>> > > > messages. Eventually, I saw this in the log:
>> > > >
>> > > > 2022-05-25 21:35:06,163 INFO [Timer-Driven Process Thread-6]
>> > > > o.a.k.c.c.internals.ConsumerCoordinator [Consumer clientId=consumer-2-11,
>> > > > groupId=2] Setting offset for partition test.topic.1-0 to the committed
>> > > > offset FetchPosition{offset=18, offsetEpoch=Optional.empty,
>> > > > currentLeader=LeaderAndEpoch{leader=Optional[kafka1:12091 (id: 1 rack:
>> > > > r1)], epoch=0}}
>> > > > 2022-05-25 21:35:06,163 INFO [Timer-Driven Process Thread-6]
>> > > > o.a.k.c.c.internals.ConsumerCoordinator [Consumer clientId=consumer-2-11,
>> > > > groupId=2] Setting offset for partition test.topic.1-1 to the committed
>> > > > offset FetchPosition{offset=13, offsetEpoch=Optional.empty,
>> > > > currentLeader=LeaderAndEpoch{leader=Optional[kafka2:12092 (id: 2 rack:
>> > > > r2)], epoch=0}}
>> > > >
>> > > > Very soon after, messages started arriving.
>> > > >
>> > > > Is this lag an issue with the Kafka server? Or can the server be
>> > > > queried/forced to establish the offset more quickly in cases when
>> > > > it is not yet known?
>> > > >
>> > > > I did notice that reducing the yield time seemed to speed the
>> > > > process up a bit. The implication is that the offset is established
>> > > > in relation to the number of polls attempted. But, since I've seen
>> > > > a wide range of lag times, I'm not positive that there is a direct
>> > > > relationship.
>> > > >
>> > > > Thanks,
>> > > > Mark
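On the question of whether the server can be queried for offset state, one thing that can be checked from outside NiFi is whether the consumer group has committed offsets at all for the pattern-matched topics. A rough sketch with the Kafka AdminClient follows; the group ID here is the Group7 value from the configuration earlier in the thread, and the result is simply empty for a group that has never been assigned, or has never committed, any matching partitions.

    import java.util.Map;
    import java.util.Properties;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    public class GroupOffsetCheckSketch {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:12091,kafka2:12091");

            try (AdminClient admin = AdminClient.create(props)) {
                // Offsets the group has committed so far, keyed by topic-partition.
                Map<TopicPartition, OffsetAndMetadata> committed =
                        admin.listConsumerGroupOffsets("Group7")
                             .partitionsToOffsetAndMetadata()
                             .get();

                committed.forEach((tp, om) ->
                        System.out.println(tp + " -> committed offset " + om.offset()));
            }
        }
    }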