We have an Alpakka Kafka consumer that listens to a topic with multiple 
partitions. We have found two issues.

   1. If we use a committable source, as shown below, the consumer starts by 
   listening to only one partition. Once the first batch commit happens it 
   starts reading from another partition. This behaviour continues until all 
   partitions have been read once, and then it round-robins between all 
   partitions until all lags are caught up. *(A single thread consumes from 
   all partitions, so only one partition is read at a time.)* The full 
   pipeline we run for this case is sketched after the second snippet below.
   
   Consumer.committableSource(consumerSettings, 
                              Subscriptions.topics(topicName))
   
   2. When we replace the above with a committable partitioned source, all 
   partitions are assigned when the consumer starts, but it still reads from 
   only one partition at a time until the lag on that partition reaches zero. 
   This is the behaviour even if we use mapAsync with flatMapMerge on the 
   source, as shown below. *(A single thread consumes from all partitions, so 
   only one partition is read at a time.)*

Source<Pair<TopicPartition, Source<CommittableMessage<String, String>, NotUsed>>, Control> source =
    Consumer.committablePartitionedSource(consumerSettings,
                                          Subscriptions.topics(TOPIC_NAME));

    source.flatMapMerge(10, Pair::second)
          .mapAsync(10,
                    msg -> print(msg).thenApply(done -> msg.committableOffset()))
          .toMat(Committer.sink(committerSettings.withMaxBatch(100)),
                 Keep.both())
          .mapMaterializedValue(Consumer::createDrainingControl)
          .run(materializer);
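
For the first case, a minimal sketch of how the whole stream looks on our 
side (the stages downstream of committableSource are a sketch, not the exact 
production code; print, committerSettings and materializer are the same 
helpers as above):

    Consumer.committableSource(consumerSettings, Subscriptions.topics(topicName))
            .mapAsync(10,
                      msg -> print(msg).thenApply(done -> msg.committableOffset()))
            .toMat(Committer.sink(committerSettings.withMaxBatch(100)),
                   Keep.both())
            .mapMaterializedValue(Consumer::createDrainingControl)
            .run(materializer);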

If we don't do a flatMapMerge and instead run one stream per partition 
source, we are able to use multiple threads and process all partitions in 
parallel.
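
A rough sketch of that per-partition variant (maxPartitions is an assumed 
constant here and the inner stages are illustrative; print, committerSettings 
and materializer are the same helpers as above): each inner source emitted by 
the partitioned source is run as its own stream and committed independently.

    Consumer.committablePartitionedSource(consumerSettings,
                                          Subscriptions.topics(TOPIC_NAME))
            // run each partition's inner source as its own stream so partitions progress independently
            .mapAsyncUnordered(maxPartitions, pair ->
                pair.second()
                    .mapAsync(10,
                              msg -> print(msg).thenApply(done -> msg.committableOffset()))
                    .runWith(Committer.sink(committerSettings.withMaxBatch(100)),
                             materializer))
            .toMat(Sink.ignore(), Keep.both())
            .mapMaterializedValue(Consumer::createDrainingControl)
            .run(materializer);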

*Why is the default Alpakka consumer behaviour single-threaded?*

Another interesting behaviour:

   - If there are multiple consumers, each consuming from a different Kafka 
   topic but all using the same group id, then when one of the listeners is 
   reset using manual assignment, all the other consumers also get 
   rebalanced and reset.

*Why do Kafka consumers that share a group id across topics get rebalanced 
when one of those topics is reset?*
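
To make that setup concrete, a minimal sketch (the topic names, group id and 
offset are made-up placeholders): two consumers on different topics share one 
group id, and one of them is later restarted with a manual assignment to an 
explicit offset.

    ConsumerSettings<String, String> sharedGroup =
        consumerSettings.withGroupId("shared-group-id");   // same group id for both consumers

    // consumer 1: normal subscription on its own topic
    Consumer.Control controlA =
        Consumer.plainSource(sharedGroup, Subscriptions.topics("topic-a"))
                .to(Sink.foreach(record -> System.out.println(record.value())))
                .run(materializer);

    // consumer 2: "reset" on its topic via manual assignment to an explicit offset
    Consumer.Control controlB =
        Consumer.plainSource(sharedGroup,
                             Subscriptions.assignmentWithOffset(new TopicPartition("topic-b", 0), 0L))
                .to(Sink.ignore())
                .run(materializer);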

This is similar to the issue below, but I don't think it's the shovel 
between the delete topic and the compacted topic that causes it; it looks 
more like a bug: Alpakka/Kafka - Partitions consumed faster than others 
<https://stackoverflow.com/questions/52209589/alpakka-kafka-partitions-consumed-faster-than-others>
