Mark,

Any user-defined (dynamic) property on the processor will be passed to the Kafka client as a client property.
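Here is an illustration using metadata.max.age.ms. A dynamic property named exactly like the Kafka key ends up in the consumer's configuration. Because it is a client setting, it applies per consumer, not per topic. The minimal Java sketch below shows that merge. The class and method names are hypothetical, and this is not NiFi's actual code. The rule that processor-managed keys win on a conflict is an assumption. The 300000 ms fallback is Kafka's documented default (5 minutes).

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of merging dynamic properties into a Kafka client config.
// Not NiFi's implementation.
public class DynamicPropertySketch {

    // Kafka's documented default for metadata.max.age.ms (5 minutes).
    static final String DEFAULT_METADATA_MAX_AGE_MS = "300000";

    /**
     * Starts from the settings the processor manages itself, then adds any
     * user-defined properties. Assumption: processor-managed keys win on a
     * conflict, so user entries are added with putIfAbsent.
     */
    public static Map<String, String> buildClientConfig(Map<String, String> processorManaged,
                                                        Map<String, String> userDefined) {
        Map<String, String> config = new HashMap<>(processorManaged);
        for (Map.Entry<String, String> e : userDefined.entrySet()) {
            config.putIfAbsent(e.getKey(), e.getValue());
        }
        return config;
    }

    /** The metadata.max.age.ms the consumer would effectively use. */
    public static long effectiveMetadataMaxAgeMs(Map<String, String> config) {
        return Long.parseLong(config.getOrDefault("metadata.max.age.ms", DEFAULT_METADATA_MAX_AGE_MS));
    }

    public static void main(String[] args) {
        Map<String, String> managed = Map.of(
                "bootstrap.servers", "kafka1:12091,kafka2:12092",
                "group.id", "2");
        // Dynamic property whose name is the Kafka config key itself.
        Map<String, String> user = Map.of("metadata.max.age.ms", "30000");

        Map<String, String> config = buildClientConfig(managed, user);
        System.out.println(effectiveMetadataMaxAgeMs(config));  // 30000
        System.out.println(effectiveMetadataMaxAgeMs(managed)); // 300000 (default)
    }
}
```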
Thanks,
Mark

> On May 27, 2022, at 9:27 AM, Mark Bean <mark.o.b...@gmail.com> wrote:
>
> I do not see this property (metadata.max.age.ms) anywhere in the
> ConsumeKafka processor. If I want to adjust this value to something less
> than 5 minutes, that will mean modifying the processor. Or is there a
> mechanism already built into the processor that I can use? And, just to be
> clear, this property is per-consumer, not inherent within the topic itself,
> correct?
>
> Also, does the metadata refresh pick up new topics that were recently
> created when using the pattern method? Example: ConsumeKafka connects and
> begins consuming from a topic matching the pattern. Then, a new topic is
> created on the Kafka server which also matches the pattern. Will
> ConsumeKafka begin consuming from this new topic (after the default
> 5-minute sync time)?
>
> Thanks,
> Mark
>
>> On Thu, May 26, 2022 at 11:26 AM Knowles Atchison Jr <katchiso...@gmail.com>
>> wrote:
>>
>> Hi Mark,
>>
>> When a consumer first connects to a Kafka cluster, there is a metadata
>> handshake that transmits things like consumer group assignments and
>> offsets. In application code, I find the first poll establishes this
>> information and does not actually retrieve messages, depending on the
>> timeout used. Consumers have a configuration setting "auto.offset.reset"
>> that specifies what to do when there is no committed offset or the current
>> offset no longer exists on the server. Kafka will then rewind to the
>> beginning with 'earliest' or tail the end with 'latest' (the default).
>>
>> With the default 'latest' setting, after that first poll the consumer may
>> sit waiting if new messages are not constantly coming in. This may be
>> further compounded by how long the processor waits between calls to
>> poll() itself.
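The auto.offset.reset behavior described above can be modeled as a small decision function for a single partition. This is a simplified Java sketch, not the Kafka client's code. The class and method names are hypothetical. 'none' (or any unrecognized value) is modeled as an IllegalStateException rather than Kafka's own exception types.

```java
import java.util.OptionalLong;

// Simplified model of auto.offset.reset; not the Kafka client's implementation.
public class OffsetResetSketch {

    /**
     * Returns the offset a consumer would start fetching from for one partition.
     * committed: the group's committed offset, if any.
     * logStart: earliest offset still on the broker.
     * logEnd: offset the next produced record will get.
     */
    public static long startingOffset(OptionalLong committed, long logStart, long logEnd, String policy) {
        // A committed offset that still exists on the broker is used as-is.
        if (committed.isPresent()) {
            long c = committed.getAsLong();
            if (c >= logStart && c <= logEnd) {
                return c;
            }
        }
        // No usable committed offset: the reset policy decides.
        switch (policy) {
            case "earliest":
                return logStart; // rewind to the beginning
            case "latest":
                return logEnd;   // tail the end: only records produced from now on
            default:
                // "none" (or an unknown value) surfaces an error to the application
                throw new IllegalStateException("No valid offset and auto.offset.reset=" + policy);
        }
    }

    public static void main(String[] args) {
        System.out.println(startingOffset(OptionalLong.empty(), 0, 18, "earliest")); // 0
        System.out.println(startingOffset(OptionalLong.empty(), 0, 18, "latest"));   // 18
        System.out.println(startingOffset(OptionalLong.of(13), 0, 18, "latest"));    // 13
    }
}
```

With 'latest' and no committed offset, this model places the position at the log end. That is why a brand-new group can sit idle until new records arrive.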
>>
>> I usually write something like this in my client applications, to get the
>> consumer group assigned and rewound as quickly as possible:
>>
>>     consumer.subscribe(Collections.singletonList(testTopic));
>>     // "fake" poll to get assigned to the consumer group; the auto offset
>>     // reset will kick in (to earliest) here
>>     consumer.poll(Duration.ofMillis(0));
>>     ConsumerRecords<String, String> records =
>>         consumer.poll(Duration.ofMillis(5000));
>>     for (ConsumerRecord<String, String> record : records) {
>>         logger.info("Message received: " + record.toString());
>>     }
>>
>> That said, when a pattern/regex is used, the consumer does not receive
>> metadata immediately, since at connection time it is unknown which topics,
>> if any, match the pattern. The subscription state on the consumer is
>> updated later, so there is a delay between a matching topic being
>> discovered and the consumer group being rebalanced.
>>
>> https://kafka.apache.org/documentation/#consumerconfigs_metadata.max.age.ms
>>
>>     * Subscribe to all topics matching specified pattern to get
>>     * dynamically assigned partitions.
>>     * The pattern matching will be done periodically against all topics
>>     * existing at the time of check.
>>     * This can be controlled through the {@code metadata.max.age.ms}
>>     * configuration: by lowering the max metadata age, the consumer will
>>     * refresh metadata more often and check for matching topics.
>>
>> I would wager you're running into this lag because the pattern is only
>> checked every five minutes by default.
>>
>> Knowles
>>
>>> On Wed, May 25, 2022 at 10:14 PM Mark Bean <mark.o.b...@gmail.com> wrote:
>>>
>>> I have two ConsumeKafka_2_6 processors (NiFi 1.16.1). The first uses Topic
>>> Name Format of "names". The second uses Topic Name Format of "pattern". The
>>> names format is able to sync with Kafka relatively quickly and begins
>>> receiving messages within just a couple of seconds.
>>> However, the pattern
>>> format takes significantly longer to start receiving messages.
>>>
>>> Diving into the logs, it appears the issue is that the consumer does not yet
>>> have the proper offset, so it cannot begin pulling messages. Eventually, I
>>> saw this in the log:
>>>
>>> 2022-05-25 21:35:06,163 INFO [Timer-Driven Process Thread-6]
>>> o.a.k.c.c.internals.ConsumerCoordinator [Consumer clientId=consumer-2-11,
>>> groupId=2] Setting offset for partition test.topic.1-0 to the committed
>>> offset FetchPosition{offset=18, offsetEpoch=Optional.empty,
>>> currentLeader=LeaderAndEpoch{leader=Optional[kafka1:12091 (id: 1 rack:
>>> r1)], epoch=0}}
>>> 2022-05-25 21:35:06,163 INFO [Timer-Driven Process Thread-6]
>>> o.a.k.c.c.internals.ConsumerCoordinator [Consumer clientId=consumer-2-11,
>>> groupId=2] Setting offset for partition test.topic.1-1 to the committed
>>> offset FetchPosition{offset=13, offsetEpoch=Optional.empty,
>>> currentLeader=LeaderAndEpoch{leader=Optional[kafka2:12092 (id: 2 rack:
>>> r2)], epoch=0}}
>>>
>>> Very soon after, messages started arriving.
>>>
>>> Is this lag an issue with the Kafka server? Or can the server be
>>> queried/forced to establish the offset more quickly in cases when it is not
>>> yet known?
>>>
>>> I did notice that reducing the yield time seemed to speed the process up a
>>> bit. The implication is that the offset is established in relation to the
>>> number of polls attempted. But, since I've seen a wide range of lag times,
>>> I'm not positive that there is a direct relationship.
>>>
>>> Thanks,
>>> Mark
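This sketch looks at the pattern lag Knowles describes. The periodic check can be modeled as matching the subscription regex against the topic list at each metadata refresh. It is a rough Java sketch under stated assumptions. Refreshes are assumed to happen exactly every metadata.max.age.ms from consumer start, although the real client can refresh earlier. The rebalance that follows is ignored, so the result only approximates when a newly created matching topic would be noticed. The class name, method names, regex, and timings are all hypothetical.

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Simplified model of pattern subscription; not the Kafka client's implementation.
public class PatternRefreshSketch {

    /** Topics that match the subscription pattern at the time of a metadata check. */
    public static List<String> matchingTopics(Pattern pattern, List<String> topicsAtCheck) {
        return topicsAtCheck.stream()
                .filter(t -> pattern.matcher(t).matches())
                .collect(Collectors.toList());
    }

    /**
     * Time at which a topic created at createdAtMs is first noticed, assuming
     * metadata refreshes happen exactly every metadataMaxAgeMs starting at 0.
     */
    public static long firstSeenAtMs(long createdAtMs, long metadataMaxAgeMs) {
        // Ceiling division: index of the first refresh at or after creation.
        long refreshIndex = (createdAtMs + metadataMaxAgeMs - 1) / metadataMaxAgeMs;
        return refreshIndex * metadataMaxAgeMs;
    }

    public static void main(String[] args) {
        Pattern p = Pattern.compile("test\\.topic\\..*");
        System.out.println(matchingTopics(p, List.of("test.topic.1", "test.topic.2", "other")));
        // [test.topic.1, test.topic.2]

        // A matching topic created 10 s after the consumer starts:
        System.out.println(firstSeenAtMs(10_000, 300_000)); // 300000 with the 5-minute default
        System.out.println(firstSeenAtMs(10_000, 30_000));  // 30000 with a 30 s max age
    }
}
```

In this model, lowering metadata.max.age.ms shortens the wait roughly in proportion. The trade-off is that the consumer sends metadata requests more often.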