This is an automated email from the ASF dual-hosted git repository.

pjfanning pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-connectors-kafka.git


The following commit(s) were added to refs/heads/main by this push:
     new 52fa5ba7 Update kafka-clients to 4.3.0 (#532)
52fa5ba7 is described below

commit 52fa5ba705efc12f2226468800339195d33cc42d
Author: Scala Steward <[email protected]>
AuthorDate: Mon May 25 20:36:17 2026 +0200

    Update kafka-clients to 4.3.0 (#532)
    
    * Update kafka-clients to 4.3.0
    
    * Update TransactionalProducerStage.scala
    
    * deprecations
    
    * Update ProducerSpec.scala
    
    * compile issues
    
    * Update ProducerSpec.scala
    
    * Update ProducerSpec.scala
    
    * Update ConsumerSettings.scala
    
    * Update ProducerSettings.scala
    
    ---------
    
    Co-authored-by: PJ Fanning <[email protected]>
---
 .../pekko/kafka/benchmarks/KafkaTransactionBenchmarks.scala      | 7 ++++---
 .../src/main/scala/org/apache/pekko/kafka/ConsumerSettings.scala | 9 +++++++--
 .../src/main/scala/org/apache/pekko/kafka/ProducerSettings.scala | 9 +++++++--
 .../apache/pekko/kafka/internal/TransactionalProducerStage.scala | 3 +++
 project/Versions.scala                                           | 2 +-
 .../scala/org/apache/pekko/kafka/internal/ProducerSpec.scala     | 6 +++++-
 6 files changed, 27 insertions(+), 9 deletions(-)

diff --git a/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/KafkaTransactionBenchmarks.scala b/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/KafkaTransactionBenchmarks.scala
index 78be5908..e34c9fec 100644
--- a/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/KafkaTransactionBenchmarks.scala
+++ b/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/KafkaTransactionBenchmarks.scala
@@ -14,15 +14,15 @@
 
 package org.apache.pekko.kafka.benchmarks
 
-import org.apache.pekko
-import pekko.kafka.benchmarks.KafkaConsumerBenchmarks.pollTimeoutMs
 import com.codahale.metrics.Meter
 import com.typesafe.scalalogging.LazyLogging
+import org.apache.pekko
+import pekko.kafka.benchmarks.KafkaConsumerBenchmarks.pollTimeoutMs
 import org.apache.kafka.clients.consumer._
 import org.apache.kafka.clients.producer.{ Callback, ProducerRecord, 
RecordMetadata }
 import org.apache.kafka.common.TopicPartition
 
-import scala.annotation.tailrec
+import scala.annotation.{ nowarn, tailrec }
 import scala.concurrent.duration.FiniteDuration
 import scala.jdk.CollectionConverters._
 
@@ -45,6 +45,7 @@ object KafkaTransactionBenchmarks extends LazyLogging {
     var accumulatedMsgCount = 0L
     var lastCommit = 0L
 
+    @nowarn("msg=deprecated")
     def doCommit(): Unit = {
       accumulatedMsgCount = 0
       val offsetMap = Map(new TopicPartition(fixture.sourceTopic, 0) -> new 
OffsetAndMetadata(lastProcessedOffset))
diff --git a/core/src/main/scala/org/apache/pekko/kafka/ConsumerSettings.scala b/core/src/main/scala/org/apache/pekko/kafka/ConsumerSettings.scala
index e2989c08..0d4e952a 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/ConsumerSettings.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/ConsumerSettings.scala
@@ -660,9 +660,14 @@ class ConsumerSettings[K, V] @InternalApi private[kafka] (
     val propertiesWithMandatoryKeys = properties ++ Map(
       ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> 
keyDeserializerOpt.map(_.getClass).orNull,
       ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> 
valueDeserializerOpt.map(_.getClass).orNull)
-
+    val myProperties = if 
(propertiesWithMandatoryKeys.contains(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)) 
{
+      propertiesWithMandatoryKeys
+    } else {
+      // If bootstrap servers are not included, add a placeholder to avoid 
confusion in the logs
+      propertiesWithMandatoryKeys + (ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG 
-> "undefined")
+    }
     val kafkaClients =
-      
ConfigSettings.serializeAndMaskKafkaProperties(propertiesWithMandatoryKeys,
+      ConfigSettings.serializeAndMaskKafkaProperties(myProperties,
         new org.apache.kafka.clients.consumer.ConsumerConfig(_))
     "org.apache.pekko.kafka.ConsumerSettings(" +
     s"properties=$kafkaClients," +
diff --git a/core/src/main/scala/org/apache/pekko/kafka/ProducerSettings.scala b/core/src/main/scala/org/apache/pekko/kafka/ProducerSettings.scala
index 936da91e..2f63d4da 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/ProducerSettings.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/ProducerSettings.scala
@@ -391,9 +391,14 @@ class ProducerSettings[K, V] @InternalApi private[kafka] (
     val propertiesWithMandatoryKeys = properties ++ Map(
       ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -> 
keySerializerOpt.map(_.getClass).orNull,
       ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> 
valueSerializerOpt.map(_.getClass).orNull)
-
+    val myProperties = if 
(propertiesWithMandatoryKeys.contains(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)) 
{
+      propertiesWithMandatoryKeys
+    } else {
+      // If bootstrap servers are not included, add a placeholder to avoid 
confusion in the logs
+      propertiesWithMandatoryKeys + (ProducerConfig.BOOTSTRAP_SERVERS_CONFIG 
-> "undefined")
+    }
     val kafkaClients =
-      
ConfigSettings.serializeAndMaskKafkaProperties(propertiesWithMandatoryKeys,
+      ConfigSettings.serializeAndMaskKafkaProperties(myProperties,
         new org.apache.kafka.clients.producer.ProducerConfig(_))
     "org.apache.pekko.kafka.ProducerSettings(" +
     s"properties=$kafkaClients," +
diff --git a/core/src/main/scala/org/apache/pekko/kafka/internal/TransactionalProducerStage.scala b/core/src/main/scala/org/apache/pekko/kafka/internal/TransactionalProducerStage.scala
index 31d8464e..9c4b5d2c 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/internal/TransactionalProducerStage.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/internal/TransactionalProducerStage.scala
@@ -28,6 +28,7 @@ import org.apache.kafka.clients.consumer.{ 
ConsumerGroupMetadata, OffsetAndMetad
 import org.apache.kafka.clients.producer.ProducerConfig
 import org.apache.kafka.common.TopicPartition
 
+import scala.annotation.nowarn
 import scala.concurrent.Future
 import scala.concurrent.duration._
 import scala.jdk.CollectionConverters._
@@ -240,6 +241,7 @@ private final class TransactionalProducerStageLogic[K, V, 
P](
     super.onCompletionFailure(ex)
   }
 
+  @nowarn("msg=deprecated")
   private def commitTransaction(batch: NonemptyTransactionBatch, 
beginNewTransaction: Boolean): Unit = {
     val group = batch.group
     log.debug("Committing transaction for transactional id '{}' consumer group 
'{}' with offsets: {}",
@@ -247,6 +249,7 @@ private final class TransactionalProducerStageLogic[K, V, 
P](
       group,
       batch.offsets)
     val offsetMap = batch.offsetMap()
+    // ConsumerGroupMetadata constructor is deprecated
     producer.sendOffsetsToTransaction(offsetMap.asJava, new 
ConsumerGroupMetadata(group))
     producer.commitTransaction()
     log.debug("Committed transaction for transactional id '{}' consumer group 
'{}' with offsets: {}",
diff --git a/project/Versions.scala b/project/Versions.scala
index 6d054791..ca87e09b 100644
--- a/project/Versions.scala
+++ b/project/Versions.scala
@@ -22,7 +22,7 @@ object Versions {
   val pekkoConnectorsKafkaVersionForDocs = "current"
   val pekkoManagementVersionForDocs = "current"
 
-  val kafkaVersion = "4.1.2"
+  val kafkaVersion = "4.3.0"
   val KafkaVersionForDocs = "37"
 
   val mockitoVersion = "5.23.0"
diff --git a/tests/src/test/scala/org/apache/pekko/kafka/internal/ProducerSpec.scala b/tests/src/test/scala/org/apache/pekko/kafka/internal/ProducerSpec.scala
index 114d024e..32850718 100644
--- a/tests/src/test/scala/org/apache/pekko/kafka/internal/ProducerSpec.scala
+++ b/tests/src/test/scala/org/apache/pekko/kafka/internal/ProducerSpec.scala
@@ -14,7 +14,6 @@
 
 package org.apache.pekko.kafka.internal
 
-import java.util.concurrent.CompletableFuture
 import org.apache.pekko
 import pekko.actor.ActorSystem
 import pekko.kafka.ConsumerMessage.{ GroupTopicPartition, PartitionOffset, 
PartitionOffsetCommittedMarker }
@@ -43,6 +42,9 @@ import org.scalatest.BeforeAndAfterAll
 import org.scalatest.flatspec.AnyFlatSpecLike
 import org.scalatest.matchers.should.Matchers
 
+import java.util.concurrent.CompletableFuture
+
+import scala.annotation.nowarn
 import scala.concurrent.duration._
 import scala.concurrent.{ Await, ExecutionContext, Future, Promise }
 import scala.jdk.CollectionConverters._
@@ -633,6 +635,7 @@ class ProducerMock[K, V](handler: ProducerMock.Handler[K, 
V])(implicit ec: Execu
     inOrder.verify(mock).beginTransaction()
   }
 
+  @nowarn("msg=deprecated")
   def verifyTxCommit(po: ConsumerMessage.PartitionOffset) = {
     val inOrder = Mockito.inOrder(mock)
     val offsets = Map(new TopicPartition(po.key.topic, po.key.partition) -> 
new OffsetAndMetadata(po.offset + 1)).asJava
@@ -641,6 +644,7 @@ class ProducerMock[K, V](handler: ProducerMock.Handler[K, 
V])(implicit ec: Execu
     inOrder.verify(mock).beginTransaction()
   }
 
+  @nowarn("msg=deprecated")
   def verifyTxCommitWhenShutdown(po: ConsumerMessage.PartitionOffset) = {
     val inOrder = Mockito.inOrder(mock)
     val offsets = Map(new TopicPartition(po.key.topic, po.key.partition) -> 
new OffsetAndMetadata(po.offset + 1)).asJava


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to