Mark,

Any user defined property on the processor will be passed to the Kafka client 
as a client property.

Thanks
Mark


> On May 27, 2022, at 9:27 AM, Mark Bean <mark.o.b...@gmail.com> wrote:
> 
> I do not see this property (metadata.max.age.ms) anywhere in the
> ConsumeKafka processor. If I want to adjust this value to something less
> than 5 minutes, that will mean modifying the processor. Or is there a
> mechanism already built-in to the processor that I can use? And, just to be
> clear, this property is per-consumer, not inherent within the topic itself,
> correct?
> 
> Also, does the metadata refresh pick up new topics that were recently
> created when using the pattern method? Example: ConsumeKafka connects and
> begins consuming from a topic matching the pattern. Then, a new topic is
> created on the Kafka server which also matches the pattern. Will
> ConsumeKafka begin consuming from this new topic (after the default 5
> minute sync time)?
> 
> Thanks,
> Mark
> 
>> On Thu, May 26, 2022 at 11:26 AM Knowles Atchison Jr <katchiso...@gmail.com>
>> wrote:
>> 
>> Hi Mark,
>> 
>> When a consumer first connects to a kafka cluster, there is a metadata
>> handshake that transmits things like consumer group assignments and
>> offsets. In application code, I find the first poll establishes this
>> information and does not actually retrieve messages, depending on timeout
>> used. Consumers have a configuration setting "auto.offset.reset" that
>> specifies what to do in the event there is no offset or the current offset
>> does not exist. Kafka will then rewind to the beginning for 'earliest' to
>> tail the end with 'latest' (the default).
>> 
>> With the default latest setting, after that first poll, the consumer may be
>> waiting if new messages are not constantly coming in. This may also be
>> compounded with how long the processor is waiting between calling poll()
>> itself.
>> 
>> I usually write something like this in my client applications, to get the
>> consumer group assigned and rewound as quickly as possible:
>> 
>> consumer.subscribe(Collections.singletonList(testTopic));
>> // "fake" poll to get assigned to consumer group, auto offset will
>> kick in to earliest
>> consumer.poll(Duration.ofMillis(0));
>> ConsumerRecords<String, String> records =
>> consumer.poll(Duration.ofMillis(5000));
>> for (ConsumerRecord<String, String> record : records) {
>>   logger.info("Message received: " + record.toString());
>> }
>> 
>> 
>> Having that said, when the pattern/regex is used, the consumer does not
>> receive metadata immediately since it is unknown at the time of connection
>> what topics if any match the pattern. The subscription state on the
>> consumer is updated. There is a delay from the time a new topic is
>> discovered that matches the pattern and consumer groups are rebalanced.
>> 
>> https://kafka.apache.org/documentation/#consumerconfigs_metadata.max.age.ms
>> 
>> * Subscribe to all topics matching specified pattern to get
>> dynamically assigned partitions.
>> * The pattern matching will be done periodically against all topics
>> existing at the time of check.
>> * This can be controlled through the {@code metadata.max.age.ms}
>> configuration: by lowering
>> * the max metadata age, the consumer will refresh metadata more often
>> and check for matching topics.
>> 
>> 
>> I would wager you're running into this lag above due to the pattern only
>> checking every five minutes by default.
>> 
>> Knowles
>> 
>>> On Wed, May 25, 2022 at 10:14 PM Mark Bean <mark.o.b...@gmail.com> wrote:
>>> 
>>> I have two ConsumeKafka_2_6 processors (NiFi 1.16.1). The first uses
>> Topic
>>> Name Format of "names". The second uses Topic Name Format of "pattern".
>> The
>>> names format is able to sync with Kafka relatively quickly and begins
>>> receiving messages within just a couple seconds. However, the pattern
>>> format takes significantly longer to start receiving messages.
>>> 
>>> Diving in the logs, it appears the issue is that the consumer does not
>> yet
>>> have the proper offset, so it cannot begin pulling messages. Eventually,
>> I
>>> saw this in the log:
>>> 
>>> 2022-05-25 21:35:06,163 INFO [Timer-Driven Process Thread-6]
>>> o.a.k.c.c.internals.ConsumerCoordinator [Consumer clientId=consumer-2-11,
>>> groupId=2] Setting offset for partition test.topic.1-0 to the committed
>>> offset FetchPosition{offset=18, offsetEpoch=Optional.empty,
>>> currentLeader=LeaderAndEpoch{leader=Optional[kafka1:12091 (id: 1 rack:
>>> r1)], epoch=0}}
>>> 2022-05-25 21:35:06,163 INFO [Timer-Driven Process Thread-6]
>>> o.a.k.c.c.internals.ConsumerCoordinator [Consumer clientId=consumer-2-11,
>>> groupId=2] Setting offset for partition test.topic.1-1 to the committed
>>> offset FetchPosition{offset=13, offsetEpoch=Optional.empty,
>>> currentLeader=LeaderAndEpoch{leader=Optional[kafka2:12092 (id: 2 rack:
>>> r2)], epoch=0}}
>>> 
>>> Very soon after, messages started arriving.
>>> 
>>> Is this lag an issue with the Kafka server? Or can the server be
>>> queried/forced to establish the offset more quickly in cases when it is
>> not
>>> yet known?
>>> 
>>> I did notice that reducing the yield time seemed to speed the process up
>> a
>>> bit. The implication is the offset is established in relation to the
>> number
>>> of polls attempted. But, since I've seen a wide range of lag time, I'm
>> not
>>> positive that there is a direct relationship.
>>> 
>>> Thanks,
>>> Mark
>>> 
>> 

Reply via email to