Mark,

That configuration looks fine, but I cannot recreate this problem. I spun
up a kafka broker listening on PLAINTEXT and created two ConsumeKafka_2_6
(NiFi 1.16.2) processors with identical configuration except for group id:
one uses group1, the other group2. Both are listening for test2.* via pattern.

No topics matching this pattern existed when the processors started. I
then used the console producer to auto-create the topic "test2.sample1" on
the first message write. When the metadata was refreshed, I saw the
consumers get their partition assignments on the "newly discovered" topic
as expected:

2022-05-29 10:34:15,650 INFO [Timer-Driven Process Thread-5]
o.a.k.c.c.internals.ConsumerCoordinator [Consumer
clientId=consumer-group1-1, groupId=group1] Notifying assignor about the
new Assignment(partitions=[test2.sample1-0])
2022-05-29 10:34:15,652 INFO [Timer-Driven Process Thread-9]
o.a.k.c.c.internals.ConsumerCoordinator [Consumer
clientId=consumer-group2-2, groupId=group2] Notifying assignor about the
new Assignment(partitions=[test2.sample1-0])
2022-05-29 10:34:15,653 INFO [Timer-Driven Process Thread-9]
o.a.k.c.c.internals.ConsumerCoordinator [Consumer
clientId=consumer-group2-2, groupId=group2] Adding newly assigned
partitions: test2.sample1-0
2022-05-29 10:34:15,653 INFO [Timer-Driven Process Thread-5]
o.a.k.c.c.internals.ConsumerCoordinator [Consumer
clientId=consumer-group1-1, groupId=group1] Adding newly assigned
partitions: test2.sample1-0

I repeated this twice more with "test2.sample2" and "test2.sample3" topics.
Processors received partition assignments, rewound to offset 0 and consumed
messages.
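
For reference, each topic was auto-created by just writing to it with the
console producer (localhost:9092 here is my local test broker; on older
clients the flag is --broker-list instead of --bootstrap-server):

./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test2.sample1

Auto-creation only happens if auto.create.topics.enable=true on the
broker, which is the default.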

Do you have access to the kafka broker logs or server configuration?
Identical consumer configuration except for group id would suggest an ACL
problem, but PLAINTEXT doesn't have a concept of a principal to match an
ACL on...
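
If you want to rule ACLs out anyway, and the broker does have an
authorizer configured, you can list whatever ACLs exist (adjust the
bootstrap address for your environment):

./kafka-acls.sh --bootstrap-server localhost:9092 --list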

The kafka broker could tell you some more information about consumer groups:

./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --all-groups --list
group2
group1

./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --all-groups --describe

GROUP    TOPIC          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID                                             HOST        CLIENT-ID
group1   test2.sample2  0          5               5               0    consumer-group1-3-553d3eeb-7f82-4bf2-977d-b9247cfdc890  /127.0.0.1  consumer-group1-3
group1   test2.sample1  0          10              10              0    consumer-group1-3-553d3eeb-7f82-4bf2-977d-b9247cfdc890  /127.0.0.1  consumer-group1-3
group1   test2.sample3  0          5               5               0    consumer-group1-3-553d3eeb-7f82-4bf2-977d-b9247cfdc890  /127.0.0.1  consumer-group1-3

GROUP    TOPIC          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID                                             HOST        CLIENT-ID
group2   test2.sample2  0          5               5               0    consumer-group2-4-8bee3f6c-a8ed-484a-87b5-dcb5644ce1dc  /127.0.0.1  consumer-group2-4
group2   test2.sample1  0          10              10              0    consumer-group2-4-8bee3f6c-a8ed-484a-87b5-dcb5644ce1dc  /127.0.0.1  consumer-group2-4
group2   test2.sample3  0          5               5               0    consumer-group2-4-8bee3f6c-a8ed-484a-87b5-dcb5644ce1dc  /127.0.0.1  consumer-group2-4

If Group7 doesn't show up here, there's something wrong with this
consumer's connectivity to the kafka broker/cluster.
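
If it does show up, you could also point the same tool at Group7 directly
(using one of the broker addresses from your processor config) to see its
offsets and, with --members --verbose, its client id and host:

./kafka-consumer-groups.sh --bootstrap-server kafka1:12091 --group Group7 --describe
./kafka-consumer-groups.sh --bootstrap-server kafka1:12091 --group Group7 --describe --members --verbose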

Knowles


On Fri, May 27, 2022, 2:04 PM Mark Bean <mark.o.b...@gmail.com> wrote:

> I started this thread hoping it was just a configuration issue. But I may
> have to open a JIRA on this. If someone sees an issue with the following
> configuration and can explain why messages are not being consumed, please
> let me know. This processor has received no messages from Kafka in over an
> hour - while other processors are consuming from the same pattern-matched
> topics (using different Group ID values).
>
> ConsumeKafka_2_6 (NiFi 1.16.2)
> Kafka Brokers: kafka1:12091, kafka2:12091
> Topic Name(s): test2.*
> Topic Name Format: pattern
> Group ID: Group7
> Commit Offsets: true
> Max Uncommitted Time: 1 secs
> Honor Transactions: true
> Separate By Key: false
> Security Protocol: PLAINTEXT
> SASL Mechanism: GSSAPI
> Token Auth: false
> Key Attribute Encoding: UTF-8 Encoded
> Offset Reset: earliest
> Message Header Encoding: UTF-8
> Max Poll Records: 10000
> Communications Timeout: 60 secs
>
> Thanks,
> Mark
>
>
> On Fri, May 27, 2022 at 12:41 PM Mark Bean <mark.o.b...@gmail.com> wrote:
>
> > I added 'metadata.max.age.ms = 1000' to the ConsumeKafka_2_6 processor.
> > It started receiving faster than the other ConsumeKafka_2_6 (which has
> > not received any messages after about 15 minutes). Here are my
> > observations (all with different ConsumeKafka_2_6 processors configured
> > with a unique Group ID):
> >
> > - Topic name method 'names' starts receiving messages almost
> > immediately - maybe a few seconds lag
> > - With method 'pattern' and 'metadata.max.age.ms = 1000', it took about
> > 10 minutes to start receiving messages
> > - With method 'pattern', no 'metadata.max.age.ms' override setting, but
> > a yield time of 10 ms, it starts receiving almost immediately
> >
> > This leads me to believe it has something to do with the frequency or
> > number of polls. This would be odd behavior, though.
> >
> >
> >
> > On Fri, May 27, 2022 at 9:47 AM Knowles Atchison Jr <katchiso...@gmail.com>
> > wrote:
> >
> >> 1. Correct, this setting is per consumer. You can add dynamic
> >> configuration items to the ConsumeKafka processor and they will get
> >> passed down to the underlying Kafka Consumer.
> >>
> >> 2. Yes, when a new topic is created that matches the pattern the
> >> ConsumeKafka has been configured with, it will get picked up the next
> >> time the sync runs.
> >>
> >> Knowles
> >>
> >> On Fri, May 27, 2022 at 9:27 AM Mark Bean <mark.o.b...@gmail.com>
> >> wrote:
> >>
> >> > I do not see this property (metadata.max.age.ms) anywhere in the
> >> > ConsumeKafka processor. If I want to adjust this value to something
> >> > less than 5 minutes, that will mean modifying the processor. Or is
> >> > there a mechanism already built-in to the processor that I can use?
> >> > And, just to be clear, this property is per-consumer, not inherent
> >> > within the topic itself, correct?
> >> >
> >> > Also, does the metadata refresh pick up new topics that were recently
> >> > created when using the pattern method? Example: ConsumeKafka connects
> >> > and begins consuming from a topic matching the pattern. Then, a new
> >> > topic is created on the Kafka server which also matches the pattern.
> >> > Will ConsumeKafka begin consuming from this new topic (after the
> >> > default 5 minute sync time)?
> >> >
> >> > Thanks,
> >> > Mark
> >> >
> >> > On Thu, May 26, 2022 at 11:26 AM Knowles Atchison Jr
> >> > <katchiso...@gmail.com> wrote:
> >> >
> >> > > Hi Mark,
> >> > >
> >> > > When a consumer first connects to a kafka cluster, there is a
> >> > > metadata handshake that transmits things like consumer group
> >> > > assignments and offsets. In application code, I find the first poll
> >> > > establishes this information and does not actually retrieve
> >> > > messages, depending on the timeout used. Consumers have a
> >> > > configuration setting "auto.offset.reset" that specifies what to do
> >> > > in the event there is no offset or the current offset does not
> >> > > exist. Kafka will then rewind to the beginning for 'earliest' or
> >> > > tail the end with 'latest' (the default).
> >> > >
> >> > > With the default 'latest' setting, after that first poll, the
> >> > > consumer may be waiting if new messages are not constantly coming
> >> > > in. This may also be compounded with how long the processor waits
> >> > > between calls to poll() itself.
> >> > >
> >> > > I usually write something like this in my client applications, to
> >> > > get the consumer group assigned and rewound as quickly as possible:
> >> > >
> >> > > consumer.subscribe(Collections.singletonList(testTopic));
> >> > > // "fake" poll to get assigned to consumer group, auto offset will
> >> > > // kick in to earliest
> >> > > consumer.poll(Duration.ofMillis(0));
> >> > > ConsumerRecords<String, String> records =
> >> > >         consumer.poll(Duration.ofMillis(5000));
> >> > > for (ConsumerRecord<String, String> record : records) {
> >> > >     logger.info("Message received: " + record.toString());
> >> > > }
> >> > >
> >> > >
> >> > > Having that said, when the pattern/regex is used, the consumer
> >> > > does not receive metadata immediately since it is unknown at the
> >> > > time of connection what topics, if any, match the pattern. The
> >> > > subscription state on the consumer is updated. There is a delay
> >> > > between the time a new topic matching the pattern is discovered
> >> > > and when the consumer groups are rebalanced.
> >> > >
> >> > > https://kafka.apache.org/documentation/#consumerconfigs_metadata.max.age.ms
> >> > >
> >> > > * Subscribe to all topics matching specified pattern to get
> >> > > * dynamically assigned partitions.
> >> > > * The pattern matching will be done periodically against all topics
> >> > > * existing at the time of check.
> >> > > * This can be controlled through the {@code metadata.max.age.ms}
> >> > > * configuration: by lowering the max metadata age, the consumer will
> >> > > * refresh metadata more often and check for matching topics.
> >> > >
> >> > >
> >> > > I would wager you're running into this lag above due to the
> >> > > pattern only checking every five minutes by default.
> >> > >
> >> > > Knowles
> >> > >
> >> > > On Wed, May 25, 2022 at 10:14 PM Mark Bean <mark.o.b...@gmail.com>
> >> > > wrote:
> >> > >
> >> > > > I have two ConsumeKafka_2_6 processors (NiFi 1.16.1). The first
> >> > > > uses Topic Name Format of "names". The second uses Topic Name
> >> > > > Format of "pattern". The names format is able to sync with Kafka
> >> > > > relatively quickly and begins receiving messages within just a
> >> > > > couple seconds. However, the pattern format takes significantly
> >> > > > longer to start receiving messages.
> >> > > >
> >> > > > Diving into the logs, it appears the issue is that the consumer
> >> > > > does not yet have the proper offset, so it cannot begin pulling
> >> > > > messages. Eventually, I saw this in the log:
> >> > > >
> >> > > > 2022-05-25 21:35:06,163 INFO [Timer-Driven Process Thread-6]
> >> > > > o.a.k.c.c.internals.ConsumerCoordinator [Consumer
> >> > > > clientId=consumer-2-11, groupId=2] Setting offset for partition
> >> > > > test.topic.1-0 to the committed offset FetchPosition{offset=18,
> >> > > > offsetEpoch=Optional.empty,
> >> > > > currentLeader=LeaderAndEpoch{leader=Optional[kafka1:12091 (id: 1
> >> > > > rack: r1)], epoch=0}}
> >> > > > 2022-05-25 21:35:06,163 INFO [Timer-Driven Process Thread-6]
> >> > > > o.a.k.c.c.internals.ConsumerCoordinator [Consumer
> >> > > > clientId=consumer-2-11, groupId=2] Setting offset for partition
> >> > > > test.topic.1-1 to the committed offset FetchPosition{offset=13,
> >> > > > offsetEpoch=Optional.empty,
> >> > > > currentLeader=LeaderAndEpoch{leader=Optional[kafka2:12092 (id: 2
> >> > > > rack: r2)], epoch=0}}
> >> > > >
> >> > > > Very soon after, messages started arriving.
> >> > > >
> >> > > > Is this lag an issue with the Kafka server? Or can the server be
> >> > > > queried/forced to establish the offset more quickly in cases when
> >> > > > it is not yet known?
> >> > > >
> >> > > > I did notice that reducing the yield time seemed to speed the
> >> > > > process up a bit. The implication is the offset is established in
> >> > > > relation to the number of polls attempted. But, since I've seen a
> >> > > > wide range of lag time, I'm not positive that there is a direct
> >> > > > relationship.
> >> > > >
> >> > > > Thanks,
> >> > > > Mark
> >> > > >
> >> > >
> >> >
> >>
> >
>
>
