[ 
https://issues.apache.org/jira/browse/BEAM-12995?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17426264#comment-17426264
 ] 

Arkadiusz Gasinski commented on BEAM-12995:
-------------------------------------------

We observed it on both the Flink and Direct runners. 

The reason, as Hubert mentioned above, is that the 
[KafkaUnboundedReader|https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L85]
 calls the 
[assign|https://kafka.apache.org/30/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#assign(java.util.Collection)]
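 method on each KafkaConsumer it creates, covering every partition of the 
configured topic. Manual assignment through assign does not use the consumer 
group's partition balancing, so sharing a group id does not split the 
partitions between consumers.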

[This dzone 
article|https://dzone.com/articles/dont-use-apache-kafka-consumer-groups-the-wrong-wa]
 explains the reason for this behavior.

We were able to replicate the behavior with plain KafkaConsumers belonging to 
the same group, but running on different machines and consuming messages via 
the assign method. Each consumer received its own copy of every message even 
though they all belonged to the same group.
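
To make the difference concrete, here is a toy model in plain Java. It is not 
Kafka client code and not our actual reproduction: the class 
AssignVsSubscribeModel and its two methods are hypothetical names, and the 
round-robin split only stands in for whatever assignor the group coordinator 
would really use. It only sketches why group-managed subscription divides 
partitions among group members while manual assignment gives every consumer 
whatever partitions it asked for.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model only: no Kafka classes are used, all names are hypothetical.
class AssignVsSubscribeModel {

    // Models group-managed consumption (what subscribe() relies on):
    // the partitions are divided among the members of the group.
    // Round-robin is an assumption made for this sketch.
    static Map<String, List<Integer>> subscribe(List<String> members, int partitions) {
        Map<String, List<Integer>> owned = new LinkedHashMap<>();
        for (String member : members) {
            owned.put(member, new ArrayList<>());
        }
        for (int p = 0; p < partitions; p++) {
            owned.get(members.get(p % members.size())).add(p);
        }
        return owned;
    }

    // Models manual assignment (assign()): there is no coordination, so a
    // consumer that asks for every partition gets every partition, no matter
    // which group id it carries.
    static Map<String, List<Integer>> assignAll(List<String> members, int partitions) {
        Map<String, List<Integer>> owned = new LinkedHashMap<>();
        for (String member : members) {
            List<Integer> all = new ArrayList<>();
            for (int p = 0; p < partitions; p++) {
                all.add(p);
            }
            owned.put(member, all);
        }
        return owned;
    }

    public static void main(String[] args) {
        List<String> pipelines = List.of("pipeline-A", "pipeline-B");
        // Each partition has exactly one owner.
        System.out.println("subscribe: " + subscribe(pipelines, 4));
        // Every partition is read by both pipelines.
        System.out.println("assign:    " + assignAll(pipelines, 4));
    }
}
```

In the second case both pipelines own partitions 0 to 3, which matches the 
duplicate consumption described in this issue.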

Please let us know if additional information is required.

> Consumer group with random prefix
> ---------------------------------
>
>                 Key: BEAM-12995
>                 URL: https://issues.apache.org/jira/browse/BEAM-12995
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-kafka
>    Affects Versions: 2.32.0
>            Reporter: Hubert S
>            Priority: P1
>
> I'm running two Beam pipelines on Apache Flink which read messages from the 
> same Kafka topic by using KafkaIO. Both of them are in the same consumer 
> group, so as I understand it each message should be consumed by one pipeline 
> or the other (but not both). However, I observed that the same set of 
> messages is consumed by each pipeline, meaning each message is processed at 
> least twice. 
> I can see in the log that an additional consumer group with the prefix 
> "Reader-1_offset_consumer_<random_number>_" was created and also consumes 
> messages. 
> Could you explain what I should do so that each message is consumed only 
> once within the consumer group, and why that additional group was created?
> Similar issue: BEAM-12880



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
