This is an automated email from the ASF dual-hosted git repository.

engelen pushed a commit to branch update/kafka-clients-4.0.0
in repository https://gitbox.apache.org/repos/asf/pekko-connectors-kafka.git

commit fab3db74122f13d2104db5f91220da0506a022e4
Author: PJ Fanning <[email protected]>
AuthorDate: Wed Mar 26 13:08:41 2025 +0100

    Update KafkaConsumerActor.scala
---
 .../apache/pekko/kafka/internal/KafkaConsumerActor.scala  | 15 +++++++++------
 1 file changed, 9 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/org/apache/pekko/kafka/internal/KafkaConsumerActor.scala b/core/src/main/scala/org/apache/pekko/kafka/internal/KafkaConsumerActor.scala
index 87a56426..f720b3a8 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/internal/KafkaConsumerActor.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/internal/KafkaConsumerActor.scala
@@ -716,13 +716,16 @@ import scala.util.control.NonFatal
         })
 
     case req: Metadata.GetCommittedOffset @nowarn("cat=deprecation") =>
-      @nowarn("cat=deprecation") val resp = Metadata.CommittedOffset(
+      Metadata.CommittedOffsets(
         Try {
-          @nowarn("cat=deprecation") val offset = consumer.committed(req.partition, settings.getMetadataRequestTimeout)
-          offset
-        },
-        req.partition)
-      resp
+          consumer
+            .committed(
+              java.util.Collections.singleton(req.partition),
+              settings.getMetadataRequestTimeout)
+            .asScala
+            .filterNot(_._2 == null)
+            .toMap
+        })
   }
 
   private def stopFromMessage(msg: StopLike) = msg match {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to