Hi Mark,

When a consumer first connects to a Kafka cluster, there is a metadata handshake that establishes things like consumer group assignments and committed offsets. In application code, I find the first poll establishes this information and does not actually retrieve messages, depending on the timeout used. Consumers have a configuration setting, "auto.offset.reset", that specifies what to do when there is no committed offset or the current offset no longer exists: Kafka will rewind to the beginning of the partition for 'earliest', or jump to the end for 'latest' (the default).
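For example, to have a consumer group with no committed offset start from the beginning rather than the tail, set that property to 'earliest'. A minimal sketch (the broker address and group id are placeholders):

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");           // placeholder
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// No committed offset for this group (or the offset is out of range)?
// Rewind to the beginning instead of tailing the end:
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);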
With the default 'latest' setting, after that first poll the consumer may just sit waiting if new messages are not constantly coming in. This can be compounded by how long the processor waits between its own calls to poll(). I usually write something like the following in my client applications to get the consumer group assigned and the offset rewound as quickly as possible:

consumer.subscribe(Collections.singletonList(testTopic));
// "fake" poll to get assigned to the consumer group; auto offset reset will kick in (earliest)
consumer.poll(Duration.ofMillis(0));
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(5000));
for (ConsumerRecord<String, String> record : records) {
    logger.info("Message received: " + record.toString());
}

That said, when the pattern/regex format is used, the consumer does not receive its metadata immediately, since it is unknown at connection time which topics, if any, match the pattern. The subscription state on the consumer is updated periodically, so there is a delay between the time a new topic matching the pattern is discovered and the time the consumer group is rebalanced. See:

https://kafka.apache.org/documentation/#consumerconfigs_metadata.max.age.ms

and the KafkaConsumer#subscribe(Pattern) javadoc:

* Subscribe to all topics matching specified pattern to get dynamically assigned partitions.
* The pattern matching will be done periodically against all topics existing at the time of check.
* This can be controlled through the {@code metadata.max.age.ms} configuration: by lowering
* the max metadata age, the consumer will refresh metadata more often and check for matching topics.

I would wager you're running into the lag above because the pattern is only checked every five minutes by default (metadata.max.age.ms defaults to 300000 ms). There is a sketch of lowering that setting below the quoted message.

Knowles

On Wed, May 25, 2022 at 10:14 PM Mark Bean <mark.o.b...@gmail.com> wrote:
> I have two ConsumeKafka_2_6 processors (NiFi 1.16.1). The first uses Topic
> Name Format of "names". The second uses Topic Name Format of "pattern". The
> names format is able to sync with Kafka relatively quickly and begins
> receiving messages within just a couple seconds. However, the pattern
> format takes significantly longer to start receiving messages.
>
> Diving in the logs, it appears the issue is that the consumer does not yet
> have the proper offset, so it cannot begin pulling messages. Eventually, I
> saw this in the log:
>
> 2022-05-25 21:35:06,163 INFO [Timer-Driven Process Thread-6]
> o.a.k.c.c.internals.ConsumerCoordinator [Consumer clientId=consumer-2-11,
> groupId=2] Setting offset for partition test.topic.1-0 to the committed
> offset FetchPosition{offset=18, offsetEpoch=Optional.empty,
> currentLeader=LeaderAndEpoch{leader=Optional[kafka1:12091 (id: 1 rack:
> r1)], epoch=0}}
> 2022-05-25 21:35:06,163 INFO [Timer-Driven Process Thread-6]
> o.a.k.c.c.internals.ConsumerCoordinator [Consumer clientId=consumer-2-11,
> groupId=2] Setting offset for partition test.topic.1-1 to the committed
> offset FetchPosition{offset=13, offsetEpoch=Optional.empty,
> currentLeader=LeaderAndEpoch{leader=Optional[kafka2:12092 (id: 2 rack:
> r2)], epoch=0}}
>
> Very soon after, messages started arriving.
>
> Is this lag an issue with the Kafka server? Or can the server be
> queried/forced to establish the offset more quickly in cases when it is not
> yet known?
>
> I did notice that reducing the yield time seemed to speed the process up a
> bit. The implication is the offset is established in relation to the number
> of polls attempted. But, since I've seen a wide range of lag time, I'm not
> positive that there is a direct relationship.
>
> Thanks,
> Mark
>
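P.S. Here is a minimal sketch of a pattern subscription with the metadata age lowered so that newly created matching topics are picked up sooner. The broker address, group id, 30-second refresh, and topic pattern are all placeholders, not recommendations:

import java.util.Properties;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");           // placeholder
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// Refresh metadata every 30 s instead of the default 300000 ms, so topics
// created after the subscription that match the pattern are discovered
// (and trigger a rebalance) sooner:
props.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "30000");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// Partitions are assigned dynamically as matching topics are discovered:
consumer.subscribe(Pattern.compile("test\\.topic\\..*")); // example pattern

In NiFi, I believe you can set metadata.max.age.ms as a dynamic property on the ConsumeKafka_2_6 processor and it will be passed through to the underlying consumer.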