This is an automated email from the ASF dual-hosted git repository.

engelen pushed a commit to branch update/kafka-clients-4.0.0
in repository https://gitbox.apache.org/repos/asf/pekko-connectors-kafka.git
commit 56ca1631b86181f0b3ecedf294649ce30bcc1444 Author: PJ Fanning <[email protected]> AuthorDate: Wed Mar 26 14:35:59 2025 +0100 test compile issues --- .../internal/CommittingProducerSinkSpec.scala | 34 +++++++++++----------- .../pekko/kafka/internal/ConsumerDummy.scala | 11 +++++-- .../apache/pekko/kafka/internal/ProducerSpec.scala | 2 +- 3 files changed, 26 insertions(+), 21 deletions(-) diff --git a/tests/src/test/scala/org/apache/pekko/kafka/internal/CommittingProducerSinkSpec.scala b/tests/src/test/scala/org/apache/pekko/kafka/internal/CommittingProducerSinkSpec.scala index 098de532..eebb1ac0 100644 --- a/tests/src/test/scala/org/apache/pekko/kafka/internal/CommittingProducerSinkSpec.scala +++ b/tests/src/test/scala/org/apache/pekko/kafka/internal/CommittingProducerSinkSpec.scala @@ -81,7 +81,7 @@ class CommittingProducerSinkSpec(_system: ActorSystem) consumer.message(partition, "value 1"), consumer.message(partition, "value 2")) - val producer = new MockProducer[String, String](true, new StringSerializer, new StringSerializer) + val producer = new MockProducer[String, String](true, None.orNull, new StringSerializer, new StringSerializer) val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer) .withProducer(producer) val commitInterval = 200.millis @@ -117,7 +117,7 @@ class CommittingProducerSinkSpec(_system: ActorSystem) consumer.message(partition, "skip"), consumer.message(partition, "send")) - val producer = new MockProducer[String, String](true, new StringSerializer, new StringSerializer) + val producer = new MockProducer[String, String](true, None.orNull, new StringSerializer, new StringSerializer) val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer) .withProducer(producer) val commitInterval = 200.millis @@ -157,7 +157,7 @@ class CommittingProducerSinkSpec(_system: ActorSystem) consumer.message(partition, "value 1"), consumer.message(partition, "value 2")) - val producer = new 
MockProducer[String, String](true, new StringSerializer, new StringSerializer) + val producer = new MockProducer[String, String](true, None.orNull, new StringSerializer, new StringSerializer) val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer) .withProducer(producer) val committerSettings = CommitterSettings(system).withMaxBatch(2L).withMaxInterval(10.seconds) @@ -191,7 +191,7 @@ class CommittingProducerSinkSpec(_system: ActorSystem) consumer.message(partition, "value 1"), consumer.message(partition, "value 2")) - val producer = new MockProducer[String, String](true, new StringSerializer, new StringSerializer) + val producer = new MockProducer[String, String](true, None.orNull, new StringSerializer, new StringSerializer) val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer) .withProducer(producer) val committerSettings = CommitterSettings(system).withMaxBatch(2L).withMaxInterval(10.seconds) @@ -226,7 +226,7 @@ class CommittingProducerSinkSpec(_system: ActorSystem) val producerRecordsPerInput = 2 val totalProducerRecords = elements.size * producerRecordsPerInput - val producer = new MockProducer[String, String](true, new StringSerializer, new StringSerializer) + val producer = new MockProducer[String, String](true, None.orNull, new StringSerializer, new StringSerializer) val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer) .withProducer(producer) val committerSettings = CommitterSettings(system).withMaxBatch(elements.size.longValue()) @@ -259,7 +259,7 @@ class CommittingProducerSinkSpec(_system: ActorSystem) val consumer = FakeConsumer(groupId, topic, startOffset = 1616L) val message = consumer.message(partition, "increment the offset") - val producer = new MockProducer[String, String](true, new StringSerializer, new StringSerializer) + val producer = new MockProducer[String, String](true, None.orNull, new StringSerializer, new StringSerializer) val 
producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer) .withProducer(producer) val committerSettings = CommitterSettings(system).withMaxBatch(1) @@ -292,7 +292,7 @@ class CommittingProducerSinkSpec(_system: ActorSystem) consumer.message(partition, "value 1"), consumer.message(partition, "value 2")) - val producer = new MockProducer[String, String](true, new StringSerializer, new StringSerializer) + val producer = new MockProducer[String, String](true, None.orNull, new StringSerializer, new StringSerializer) val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer) .withProducer(producer) val committerSettings = CommitterSettings(system) @@ -328,7 +328,7 @@ class CommittingProducerSinkSpec(_system: ActorSystem) consumer.message(partition, "value 1"), consumer.message(partition, "value 2")) - val producer = new MockProducer[String, String](true, new StringSerializer, new StringSerializer) + val producer = new MockProducer[String, String](true, None.orNull, new StringSerializer, new StringSerializer) val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer) .withProducer(producer) // choose a large commit interval so that completion happens before @@ -364,7 +364,7 @@ class CommittingProducerSinkSpec(_system: ActorSystem) consumer.message(partition, "value 1"), consumer.message(partition, "value 2")) - val producer = new MockProducer[String, String](true, new StringSerializer, new StringSerializer) + val producer = new MockProducer[String, String](true, None.orNull, new StringSerializer, new StringSerializer) val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer) .withProducer(producer) val commitInterval = 5.seconds @@ -404,7 +404,7 @@ class CommittingProducerSinkSpec(_system: ActorSystem) consumer.message(partition, "value 1"), consumer.message(partition, "value 2")) - val producer = new MockProducer[String, String](true, new 
StringSerializer, new StringSerializer) + val producer = new MockProducer[String, String](true, None.orNull, new StringSerializer, new StringSerializer) val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer) .withProducer(producer) val commitInterval = 5.seconds @@ -441,7 +441,7 @@ class CommittingProducerSinkSpec(_system: ActorSystem) consumer.message(partition, "value 2")) // this producer does not auto complete messages - val producer = new MockProducer[String, String](false, new StringSerializer, new StringSerializer) + val producer = new MockProducer[String, String](false, None.orNull, new StringSerializer, new StringSerializer) val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer) .withProducer(producer) val committerSettings = CommitterSettings(system).withMaxBatch(1L) @@ -480,7 +480,7 @@ class CommittingProducerSinkSpec(_system: ActorSystem) consumer.message(partition, "value 1"), consumer.message(partition, "value 2")) - val producer = new MockProducer[String, String](false, new StringSerializer, new StringSerializer) + val producer = new MockProducer[String, String](false, None.orNull, new StringSerializer, new StringSerializer) val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer) .withProducer(producer) val committerSettings = CommitterSettings(system).withMaxBatch(1L) @@ -523,7 +523,7 @@ class CommittingProducerSinkSpec(_system: ActorSystem) consumer.message(partition, "value 1"), consumer.message(partition, "value 2")) - val producer = new MockProducer[String, String](false, new StringSerializer, new StringSerializer) + val producer = new MockProducer[String, String](false, None.orNull, new StringSerializer, new StringSerializer) val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer) .withProducer(producer) // choose a large commit interval so that completion happens before @@ -569,7 +569,7 @@ class 
CommittingProducerSinkSpec(_system: ActorSystem) consumer.message(partition, "value 1"), consumer.message(partition, "value 2")) - val producer = new MockProducer[String, String](true, new StringSerializer, new StringSerializer) + val producer = new MockProducer[String, String](true, None.orNull, new StringSerializer, new StringSerializer) val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer) .withProducer(producer) val committerSettings = CommitterSettings(system).withMaxBatch(2L) @@ -602,7 +602,7 @@ class CommittingProducerSinkSpec(_system: ActorSystem) consumer.message(partition, "value 1"), consumer.message(partition, "value 2")) - val producer = new MockProducer[String, String](true, new StringSerializer, new StringSerializer) + val producer = new MockProducer[String, String](true, None.orNull, new StringSerializer, new StringSerializer) val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer) .withProducer(producer) val committerSettings = CommitterSettings(system).withMaxBatch(2L) @@ -640,7 +640,7 @@ class CommittingProducerSinkSpec(_system: ActorSystem) consumer.message(partition, "value 1"), consumer.message(partition, "value 2")) - val producer = new MockProducer[String, String](true, new StringSerializer, new StringSerializer) + val producer = new MockProducer[String, String](true, None.orNull, new StringSerializer, new StringSerializer) val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer) .withProducer(producer) val committerSettings = CommitterSettings(system) @@ -674,7 +674,7 @@ class CommittingProducerSinkSpec(_system: ActorSystem) } it should "shut down without elements" in assertAllStagesStopped { - val producer = new MockProducer[String, String](true, new StringSerializer, new StringSerializer) + val producer = new MockProducer[String, String](true, None.orNull, new StringSerializer, new StringSerializer) val producerSettings = 
ProducerSettings(system, new StringSerializer, new StringSerializer) .withProducer(producer) val committerSettings = CommitterSettings(system).withMaxInterval(1.second) diff --git a/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerDummy.scala b/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerDummy.scala index 93073673..bf59b57d 100644 --- a/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerDummy.scala +++ b/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerDummy.scala @@ -21,6 +21,7 @@ import java.util.concurrent.atomic.AtomicInteger import org.apache.pekko.Done import org.apache.kafka.clients.consumer._ import org.apache.kafka.common.{ Metric, MetricName, PartitionInfo, TopicPartition } +import org.apache.kafka.common.metrics.KafkaMetric import org.slf4j.{ Logger, LoggerFactory } import scala.concurrent.Promise @@ -50,7 +51,6 @@ abstract class ConsumerDummy[K, V] extends Consumer[K, V] { override def subscribe(pattern: java.util.regex.Pattern, callback: ConsumerRebalanceListener): Unit = ??? override def subscribe(pattern: java.util.regex.Pattern): Unit = ??? override def unsubscribe(): Unit = ??? - override def poll(timeout: Long): ConsumerRecords[K, V] = ??? override def commitSync(): Unit = ??? override def commitSync(offsets: java.util.Map[TopicPartition, OffsetAndMetadata]): Unit = ??? override def commitAsync(): Unit = ??? @@ -63,7 +63,6 @@ abstract class ConsumerDummy[K, V] extends Consumer[K, V] { override def seekToEnd(partitions: java.util.Collection[TopicPartition]): Unit = ??? override def position(partition: TopicPartition): Long = ??? override def position(partition: TopicPartition, timeout: java.time.Duration): Long = ??? - override def committed(partition: TopicPartition): OffsetAndMetadata = ??? override def metrics(): java.util.Map[MetricName, _ <: Metric] = ??? override def partitionsFor(topic: String): java.util.List[PartitionInfo] = ??? 
override def listTopics(): java.util.Map[String, java.util.List[PartitionInfo]] = ??? @@ -87,7 +86,6 @@ abstract class ConsumerDummy[K, V] extends Consumer[K, V] { override def commitSync(timeout: java.time.Duration): Unit = ??? override def commitSync(offsets: java.util.Map[TopicPartition, OffsetAndMetadata], timeout: java.time.Duration): Unit = ??? - override def committed(partition: TopicPartition, timeout: java.time.Duration): OffsetAndMetadata = ??? override def committed(partitions: util.Set[TopicPartition]): util.Map[TopicPartition, OffsetAndMetadata] = ??? override def committed(partitions: util.Set[TopicPartition], timeout: Duration): util.Map[TopicPartition, OffsetAndMetadata] = ??? @@ -102,4 +100,11 @@ abstract class ConsumerDummy[K, V] extends Consumer[K, V] { override def groupMetadata(): ConsumerGroupMetadata = ??? override def enforceRebalance(): Unit = ??? override def currentLag(partition: TopicPartition): java.util.OptionalLong = ??? + + override def subscribe(sp: SubscriptionPattern): Unit = ??? + override def subscribe(sp: SubscriptionPattern, listener: ConsumerRebalanceListener): Unit = ??? + + override def registerMetricForSubscription(metric: KafkaMetric): Unit = ??? + override def unregisterMetricFromSubscription(metric: KafkaMetric): Unit = ??? 
+ } diff --git a/tests/src/test/scala/org/apache/pekko/kafka/internal/ProducerSpec.scala b/tests/src/test/scala/org/apache/pekko/kafka/internal/ProducerSpec.scala index 5dd0df32..c88ef97d 100644 --- a/tests/src/test/scala/org/apache/pekko/kafka/internal/ProducerSpec.scala +++ b/tests/src/test/scala/org/apache/pekko/kafka/internal/ProducerSpec.scala @@ -158,7 +158,7 @@ class ProducerSpec(_system: ActorSystem) assertAllStagesStopped { val input = (1 to 10).map { recordAndMetadata(_)._1 } - val mockProducer = new MockProducer[String, String](true, new StringSerializer, new StringSerializer) + val mockProducer = new MockProducer[String, String](true, None.orNull, new StringSerializer, new StringSerializer) val fut: Future[Done] = Source(input).runWith(Producer.plainSink(settings.withProducer(mockProducer))) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
