This is an automated email from the ASF dual-hosted git repository.

engelen pushed a commit to branch update/kafka-clients-4.0.0
in repository https://gitbox.apache.org/repos/asf/pekko-connectors-kafka.git

commit 56ca1631b86181f0b3ecedf294649ce30bcc1444
Author: PJ Fanning <[email protected]>
AuthorDate: Wed Mar 26 14:35:59 2025 +0100

    test compile issues
---
 .../internal/CommittingProducerSinkSpec.scala      | 34 +++++++++++-----------
 .../pekko/kafka/internal/ConsumerDummy.scala       | 11 +++++--
 .../apache/pekko/kafka/internal/ProducerSpec.scala |  2 +-
 3 files changed, 26 insertions(+), 21 deletions(-)

diff --git a/tests/src/test/scala/org/apache/pekko/kafka/internal/CommittingProducerSinkSpec.scala b/tests/src/test/scala/org/apache/pekko/kafka/internal/CommittingProducerSinkSpec.scala
index 098de532..eebb1ac0 100644
--- a/tests/src/test/scala/org/apache/pekko/kafka/internal/CommittingProducerSinkSpec.scala
+++ b/tests/src/test/scala/org/apache/pekko/kafka/internal/CommittingProducerSinkSpec.scala
@@ -81,7 +81,7 @@ class CommittingProducerSinkSpec(_system: ActorSystem)
       consumer.message(partition, "value 1"),
       consumer.message(partition, "value 2"))
 
-    val producer = new MockProducer[String, String](true, new StringSerializer, new StringSerializer)
+    val producer = new MockProducer[String, String](true, None.orNull, new StringSerializer, new StringSerializer)
     val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer)
       .withProducer(producer)
     val commitInterval = 200.millis
@@ -117,7 +117,7 @@ class CommittingProducerSinkSpec(_system: ActorSystem)
       consumer.message(partition, "skip"),
       consumer.message(partition, "send"))
 
-    val producer = new MockProducer[String, String](true, new StringSerializer, new StringSerializer)
+    val producer = new MockProducer[String, String](true, None.orNull, new StringSerializer, new StringSerializer)
     val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer)
       .withProducer(producer)
     val commitInterval = 200.millis
@@ -157,7 +157,7 @@ class CommittingProducerSinkSpec(_system: ActorSystem)
       consumer.message(partition, "value 1"),
       consumer.message(partition, "value 2"))
 
-    val producer = new MockProducer[String, String](true, new StringSerializer, new StringSerializer)
+    val producer = new MockProducer[String, String](true, None.orNull, new StringSerializer, new StringSerializer)
     val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer)
       .withProducer(producer)
     val committerSettings = CommitterSettings(system).withMaxBatch(2L).withMaxInterval(10.seconds)
@@ -191,7 +191,7 @@ class CommittingProducerSinkSpec(_system: ActorSystem)
       consumer.message(partition, "value 1"),
       consumer.message(partition, "value 2"))
 
-    val producer = new MockProducer[String, String](true, new StringSerializer, new StringSerializer)
+    val producer = new MockProducer[String, String](true, None.orNull, new StringSerializer, new StringSerializer)
     val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer)
       .withProducer(producer)
     val committerSettings = CommitterSettings(system).withMaxBatch(2L).withMaxInterval(10.seconds)
@@ -226,7 +226,7 @@ class CommittingProducerSinkSpec(_system: ActorSystem)
     val producerRecordsPerInput = 2
     val totalProducerRecords = elements.size * producerRecordsPerInput
 
-    val producer = new MockProducer[String, String](true, new StringSerializer, new StringSerializer)
+    val producer = new MockProducer[String, String](true, None.orNull, new StringSerializer, new StringSerializer)
     val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer)
       .withProducer(producer)
     val committerSettings = CommitterSettings(system).withMaxBatch(elements.size.longValue())
@@ -259,7 +259,7 @@ class CommittingProducerSinkSpec(_system: ActorSystem)
     val consumer = FakeConsumer(groupId, topic, startOffset = 1616L)
     val message = consumer.message(partition, "increment the offset")
 
-    val producer = new MockProducer[String, String](true, new StringSerializer, new StringSerializer)
+    val producer = new MockProducer[String, String](true, None.orNull, new StringSerializer, new StringSerializer)
     val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer)
       .withProducer(producer)
     val committerSettings = CommitterSettings(system).withMaxBatch(1)
@@ -292,7 +292,7 @@ class CommittingProducerSinkSpec(_system: ActorSystem)
       consumer.message(partition, "value 1"),
       consumer.message(partition, "value 2"))
 
-    val producer = new MockProducer[String, String](true, new StringSerializer, new StringSerializer)
+    val producer = new MockProducer[String, String](true, None.orNull, new StringSerializer, new StringSerializer)
     val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer)
       .withProducer(producer)
     val committerSettings = CommitterSettings(system)
@@ -328,7 +328,7 @@ class CommittingProducerSinkSpec(_system: ActorSystem)
       consumer.message(partition, "value 1"),
       consumer.message(partition, "value 2"))
 
-    val producer = new MockProducer[String, String](true, new StringSerializer, new StringSerializer)
+    val producer = new MockProducer[String, String](true, None.orNull, new StringSerializer, new StringSerializer)
     val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer)
       .withProducer(producer)
     // choose a large commit interval so that completion happens before
@@ -364,7 +364,7 @@ class CommittingProducerSinkSpec(_system: ActorSystem)
       consumer.message(partition, "value 1"),
       consumer.message(partition, "value 2"))
 
-    val producer = new MockProducer[String, String](true, new StringSerializer, new StringSerializer)
+    val producer = new MockProducer[String, String](true, None.orNull, new StringSerializer, new StringSerializer)
     val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer)
       .withProducer(producer)
     val commitInterval = 5.seconds
@@ -404,7 +404,7 @@ class CommittingProducerSinkSpec(_system: ActorSystem)
       consumer.message(partition, "value 1"),
       consumer.message(partition, "value 2"))
 
-    val producer = new MockProducer[String, String](true, new StringSerializer, new StringSerializer)
+    val producer = new MockProducer[String, String](true, None.orNull, new StringSerializer, new StringSerializer)
     val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer)
       .withProducer(producer)
     val commitInterval = 5.seconds
@@ -441,7 +441,7 @@ class CommittingProducerSinkSpec(_system: ActorSystem)
       consumer.message(partition, "value 2"))
 
     // this producer does not auto complete messages
-    val producer = new MockProducer[String, String](false, new StringSerializer, new StringSerializer)
+    val producer = new MockProducer[String, String](false, None.orNull, new StringSerializer, new StringSerializer)
     val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer)
       .withProducer(producer)
     val committerSettings = CommitterSettings(system).withMaxBatch(1L)
@@ -480,7 +480,7 @@ class CommittingProducerSinkSpec(_system: ActorSystem)
       consumer.message(partition, "value 1"),
       consumer.message(partition, "value 2"))
 
-    val producer = new MockProducer[String, String](false, new StringSerializer, new StringSerializer)
+    val producer = new MockProducer[String, String](false, None.orNull, new StringSerializer, new StringSerializer)
     val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer)
       .withProducer(producer)
     val committerSettings = CommitterSettings(system).withMaxBatch(1L)
@@ -523,7 +523,7 @@ class CommittingProducerSinkSpec(_system: ActorSystem)
       consumer.message(partition, "value 1"),
       consumer.message(partition, "value 2"))
 
-    val producer = new MockProducer[String, String](false, new StringSerializer, new StringSerializer)
+    val producer = new MockProducer[String, String](false, None.orNull, new StringSerializer, new StringSerializer)
     val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer)
       .withProducer(producer)
     // choose a large commit interval so that completion happens before
@@ -569,7 +569,7 @@ class CommittingProducerSinkSpec(_system: ActorSystem)
       consumer.message(partition, "value 1"),
       consumer.message(partition, "value 2"))
 
-    val producer = new MockProducer[String, String](true, new StringSerializer, new StringSerializer)
+    val producer = new MockProducer[String, String](true, None.orNull, new StringSerializer, new StringSerializer)
     val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer)
       .withProducer(producer)
     val committerSettings = CommitterSettings(system).withMaxBatch(2L)
@@ -602,7 +602,7 @@ class CommittingProducerSinkSpec(_system: ActorSystem)
       consumer.message(partition, "value 1"),
       consumer.message(partition, "value 2"))
 
-    val producer = new MockProducer[String, String](true, new StringSerializer, new StringSerializer)
+    val producer = new MockProducer[String, String](true, None.orNull, new StringSerializer, new StringSerializer)
     val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer)
       .withProducer(producer)
     val committerSettings = CommitterSettings(system).withMaxBatch(2L)
@@ -640,7 +640,7 @@ class CommittingProducerSinkSpec(_system: ActorSystem)
       consumer.message(partition, "value 1"),
       consumer.message(partition, "value 2"))
 
-    val producer = new MockProducer[String, String](true, new StringSerializer, new StringSerializer)
+    val producer = new MockProducer[String, String](true, None.orNull, new StringSerializer, new StringSerializer)
     val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer)
       .withProducer(producer)
     val committerSettings = CommitterSettings(system)
@@ -674,7 +674,7 @@ class CommittingProducerSinkSpec(_system: ActorSystem)
   }
 
   it should "shut down without elements" in assertAllStagesStopped {
-    val producer = new MockProducer[String, String](true, new StringSerializer, new StringSerializer)
+    val producer = new MockProducer[String, String](true, None.orNull, new StringSerializer, new StringSerializer)
     val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer)
       .withProducer(producer)
     val committerSettings = CommitterSettings(system).withMaxInterval(1.second)
diff --git a/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerDummy.scala b/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerDummy.scala
index 93073673..bf59b57d 100644
--- a/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerDummy.scala
+++ b/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerDummy.scala
@@ -21,6 +21,7 @@ import java.util.concurrent.atomic.AtomicInteger
 import org.apache.pekko.Done
 import org.apache.kafka.clients.consumer._
 import org.apache.kafka.common.{ Metric, MetricName, PartitionInfo, TopicPartition }
+import org.apache.kafka.common.metrics.KafkaMetric
 import org.slf4j.{ Logger, LoggerFactory }
 
 import scala.concurrent.Promise
@@ -50,7 +51,6 @@ abstract class ConsumerDummy[K, V] extends Consumer[K, V] {
   override def subscribe(pattern: java.util.regex.Pattern, callback: ConsumerRebalanceListener): Unit = ???
   override def subscribe(pattern: java.util.regex.Pattern): Unit = ???
   override def unsubscribe(): Unit = ???
-  override def poll(timeout: Long): ConsumerRecords[K, V] = ???
   override def commitSync(): Unit = ???
   override def commitSync(offsets: java.util.Map[TopicPartition, OffsetAndMetadata]): Unit = ???
   override def commitAsync(): Unit = ???
@@ -63,7 +63,6 @@ abstract class ConsumerDummy[K, V] extends Consumer[K, V] {
   override def seekToEnd(partitions: java.util.Collection[TopicPartition]): Unit = ???
   override def position(partition: TopicPartition): Long = ???
   override def position(partition: TopicPartition, timeout: java.time.Duration): Long = ???
-  override def committed(partition: TopicPartition): OffsetAndMetadata = ???
   override def metrics(): java.util.Map[MetricName, _ <: Metric] = ???
   override def partitionsFor(topic: String): java.util.List[PartitionInfo] = ???
   override def listTopics(): java.util.Map[String, java.util.List[PartitionInfo]] = ???
@@ -87,7 +86,6 @@ abstract class ConsumerDummy[K, V] extends Consumer[K, V] {
   override def commitSync(timeout: java.time.Duration): Unit = ???
   override def commitSync(offsets: java.util.Map[TopicPartition, OffsetAndMetadata],
       timeout: java.time.Duration): Unit = ???
-  override def committed(partition: TopicPartition, timeout: java.time.Duration): OffsetAndMetadata = ???
   override def committed(partitions: util.Set[TopicPartition]): util.Map[TopicPartition, OffsetAndMetadata] = ???
   override def committed(partitions: util.Set[TopicPartition],
       timeout: Duration): util.Map[TopicPartition, OffsetAndMetadata] = ???
@@ -102,4 +100,11 @@ abstract class ConsumerDummy[K, V] extends Consumer[K, V] {
   override def groupMetadata(): ConsumerGroupMetadata = ???
   override def enforceRebalance(): Unit = ???
   override def currentLag(partition: TopicPartition): java.util.OptionalLong = ???
+
+  override def subscribe(sp: SubscriptionPattern): Unit = ???
+  override def subscribe(sp: SubscriptionPattern, listener: ConsumerRebalanceListener): Unit = ???
+
+  override def registerMetricForSubscription(metric: KafkaMetric): Unit = ???
+  override def unregisterMetricFromSubscription(metric: KafkaMetric): Unit = ???
+
 }
diff --git a/tests/src/test/scala/org/apache/pekko/kafka/internal/ProducerSpec.scala b/tests/src/test/scala/org/apache/pekko/kafka/internal/ProducerSpec.scala
index 5dd0df32..c88ef97d 100644
--- a/tests/src/test/scala/org/apache/pekko/kafka/internal/ProducerSpec.scala
+++ b/tests/src/test/scala/org/apache/pekko/kafka/internal/ProducerSpec.scala
@@ -158,7 +158,7 @@ class ProducerSpec(_system: ActorSystem)
     assertAllStagesStopped {
       val input = (1 to 10).map { recordAndMetadata(_)._1 }
 
-      val mockProducer = new MockProducer[String, String](true, new StringSerializer, new StringSerializer)
+      val mockProducer = new MockProducer[String, String](true, None.orNull, new StringSerializer, new StringSerializer)
 
       val fut: Future[Done] = Source(input).runWith(Producer.plainSink(settings.withProducer(mockProducer)))
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to