This is an automated email from the ASF dual-hosted git repository.
pjfanning pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-connectors-kafka.git
The following commit(s) were added to refs/heads/main by this push:
new 52fa5ba7 Update kafka-clients to 4.3.0 (#532)
52fa5ba7 is described below
commit 52fa5ba705efc12f2226468800339195d33cc42d
Author: Scala Steward <[email protected]>
AuthorDate: Mon May 25 20:36:17 2026 +0200
Update kafka-clients to 4.3.0 (#532)
* Update kafka-clients to 4.3.0
* Update TransactionalProducerStage.scala
* deprecations
* Update ProducerSpec.scala
* compile issues
* Update ProducerSpec.scala
* Update ProducerSpec.scala
* Update ConsumerSettings.scala
* Update ProducerSettings.scala
---------
Co-authored-by: PJ Fanning <[email protected]>
---
.../pekko/kafka/benchmarks/KafkaTransactionBenchmarks.scala | 7 ++++---
.../src/main/scala/org/apache/pekko/kafka/ConsumerSettings.scala | 9 +++++++--
.../src/main/scala/org/apache/pekko/kafka/ProducerSettings.scala | 9 +++++++--
.../apache/pekko/kafka/internal/TransactionalProducerStage.scala | 3 +++
project/Versions.scala | 2 +-
.../scala/org/apache/pekko/kafka/internal/ProducerSpec.scala | 6 +++++-
6 files changed, 27 insertions(+), 9 deletions(-)
diff --git
a/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/KafkaTransactionBenchmarks.scala
b/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/KafkaTransactionBenchmarks.scala
index 78be5908..e34c9fec 100644
---
a/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/KafkaTransactionBenchmarks.scala
+++
b/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/KafkaTransactionBenchmarks.scala
@@ -14,15 +14,15 @@
package org.apache.pekko.kafka.benchmarks
-import org.apache.pekko
-import pekko.kafka.benchmarks.KafkaConsumerBenchmarks.pollTimeoutMs
import com.codahale.metrics.Meter
import com.typesafe.scalalogging.LazyLogging
+import org.apache.pekko
+import pekko.kafka.benchmarks.KafkaConsumerBenchmarks.pollTimeoutMs
import org.apache.kafka.clients.consumer._
import org.apache.kafka.clients.producer.{ Callback, ProducerRecord,
RecordMetadata }
import org.apache.kafka.common.TopicPartition
-import scala.annotation.tailrec
+import scala.annotation.{ nowarn, tailrec }
import scala.concurrent.duration.FiniteDuration
import scala.jdk.CollectionConverters._
@@ -45,6 +45,7 @@ object KafkaTransactionBenchmarks extends LazyLogging {
var accumulatedMsgCount = 0L
var lastCommit = 0L
+ @nowarn("msg=deprecated")
def doCommit(): Unit = {
accumulatedMsgCount = 0
val offsetMap = Map(new TopicPartition(fixture.sourceTopic, 0) -> new
OffsetAndMetadata(lastProcessedOffset))
diff --git a/core/src/main/scala/org/apache/pekko/kafka/ConsumerSettings.scala
b/core/src/main/scala/org/apache/pekko/kafka/ConsumerSettings.scala
index e2989c08..0d4e952a 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/ConsumerSettings.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/ConsumerSettings.scala
@@ -660,9 +660,14 @@ class ConsumerSettings[K, V] @InternalApi private[kafka] (
val propertiesWithMandatoryKeys = properties ++ Map(
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG ->
keyDeserializerOpt.map(_.getClass).orNull,
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG ->
valueDeserializerOpt.map(_.getClass).orNull)
-
+ val myProperties = if (propertiesWithMandatoryKeys.contains(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)) {
+ propertiesWithMandatoryKeys
+ } else {
+ // If bootstrap servers are not included, add a placeholder to avoid confusion in the logs
+ propertiesWithMandatoryKeys + (ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "undefined")
+ }
val kafkaClients =
- ConfigSettings.serializeAndMaskKafkaProperties(propertiesWithMandatoryKeys,
+ ConfigSettings.serializeAndMaskKafkaProperties(myProperties,
new org.apache.kafka.clients.consumer.ConsumerConfig(_))
"org.apache.pekko.kafka.ConsumerSettings(" +
s"properties=$kafkaClients," +
diff --git a/core/src/main/scala/org/apache/pekko/kafka/ProducerSettings.scala
b/core/src/main/scala/org/apache/pekko/kafka/ProducerSettings.scala
index 936da91e..2f63d4da 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/ProducerSettings.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/ProducerSettings.scala
@@ -391,9 +391,14 @@ class ProducerSettings[K, V] @InternalApi private[kafka] (
val propertiesWithMandatoryKeys = properties ++ Map(
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG ->
keySerializerOpt.map(_.getClass).orNull,
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG ->
valueSerializerOpt.map(_.getClass).orNull)
-
+ val myProperties = if (propertiesWithMandatoryKeys.contains(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)) {
+ propertiesWithMandatoryKeys
+ } else {
+ // If bootstrap servers are not included, add a placeholder to avoid confusion in the logs
+ propertiesWithMandatoryKeys + (ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> "undefined")
+ }
val kafkaClients =
- ConfigSettings.serializeAndMaskKafkaProperties(propertiesWithMandatoryKeys,
+ ConfigSettings.serializeAndMaskKafkaProperties(myProperties,
new org.apache.kafka.clients.producer.ProducerConfig(_))
"org.apache.pekko.kafka.ProducerSettings(" +
s"properties=$kafkaClients," +
diff --git
a/core/src/main/scala/org/apache/pekko/kafka/internal/TransactionalProducerStage.scala
b/core/src/main/scala/org/apache/pekko/kafka/internal/TransactionalProducerStage.scala
index 31d8464e..9c4b5d2c 100644
---
a/core/src/main/scala/org/apache/pekko/kafka/internal/TransactionalProducerStage.scala
+++
b/core/src/main/scala/org/apache/pekko/kafka/internal/TransactionalProducerStage.scala
@@ -28,6 +28,7 @@ import org.apache.kafka.clients.consumer.{
ConsumerGroupMetadata, OffsetAndMetad
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.TopicPartition
+import scala.annotation.nowarn
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.jdk.CollectionConverters._
@@ -240,6 +241,7 @@ private final class TransactionalProducerStageLogic[K, V,
P](
super.onCompletionFailure(ex)
}
+ @nowarn("msg=deprecated")
private def commitTransaction(batch: NonemptyTransactionBatch,
beginNewTransaction: Boolean): Unit = {
val group = batch.group
log.debug("Committing transaction for transactional id '{}' consumer group
'{}' with offsets: {}",
@@ -247,6 +249,7 @@ private final class TransactionalProducerStageLogic[K, V,
P](
group,
batch.offsets)
val offsetMap = batch.offsetMap()
+ // ConsumerGroupMetadata constructor is deprecated
producer.sendOffsetsToTransaction(offsetMap.asJava, new
ConsumerGroupMetadata(group))
producer.commitTransaction()
log.debug("Committed transaction for transactional id '{}' consumer group
'{}' with offsets: {}",
diff --git a/project/Versions.scala b/project/Versions.scala
index 6d054791..ca87e09b 100644
--- a/project/Versions.scala
+++ b/project/Versions.scala
@@ -22,7 +22,7 @@ object Versions {
val pekkoConnectorsKafkaVersionForDocs = "current"
val pekkoManagementVersionForDocs = "current"
- val kafkaVersion = "4.1.2"
+ val kafkaVersion = "4.3.0"
val KafkaVersionForDocs = "37"
val mockitoVersion = "5.23.0"
diff --git
a/tests/src/test/scala/org/apache/pekko/kafka/internal/ProducerSpec.scala
b/tests/src/test/scala/org/apache/pekko/kafka/internal/ProducerSpec.scala
index 114d024e..32850718 100644
--- a/tests/src/test/scala/org/apache/pekko/kafka/internal/ProducerSpec.scala
+++ b/tests/src/test/scala/org/apache/pekko/kafka/internal/ProducerSpec.scala
@@ -14,7 +14,6 @@
package org.apache.pekko.kafka.internal
-import java.util.concurrent.CompletableFuture
import org.apache.pekko
import pekko.actor.ActorSystem
import pekko.kafka.ConsumerMessage.{ GroupTopicPartition, PartitionOffset,
PartitionOffsetCommittedMarker }
@@ -43,6 +42,9 @@ import org.scalatest.BeforeAndAfterAll
import org.scalatest.flatspec.AnyFlatSpecLike
import org.scalatest.matchers.should.Matchers
+import java.util.concurrent.CompletableFuture
+
+import scala.annotation.nowarn
import scala.concurrent.duration._
import scala.concurrent.{ Await, ExecutionContext, Future, Promise }
import scala.jdk.CollectionConverters._
@@ -633,6 +635,7 @@ class ProducerMock[K, V](handler: ProducerMock.Handler[K,
V])(implicit ec: Execu
inOrder.verify(mock).beginTransaction()
}
+ @nowarn("msg=deprecated")
def verifyTxCommit(po: ConsumerMessage.PartitionOffset) = {
val inOrder = Mockito.inOrder(mock)
val offsets = Map(new TopicPartition(po.key.topic, po.key.partition) ->
new OffsetAndMetadata(po.offset + 1)).asJava
@@ -641,6 +644,7 @@ class ProducerMock[K, V](handler: ProducerMock.Handler[K,
V])(implicit ec: Execu
inOrder.verify(mock).beginTransaction()
}
+ @nowarn("msg=deprecated")
def verifyTxCommitWhenShutdown(po: ConsumerMessage.PartitionOffset) = {
val inOrder = Mockito.inOrder(mock)
val offsets = Map(new TopicPartition(po.key.topic, po.key.partition) ->
new OffsetAndMetadata(po.offset + 1)).asJava
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]