This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-connectors-kafka.git


The following commit(s) were added to refs/heads/main by this push:
     new 1ea4a731 build with 2.0.0-M1 (#393)
1ea4a731 is described below

commit 1ea4a731da45682bb5bb0f6b68ad875c5a808348
Author: PJ Fanning <[email protected]>
AuthorDate: Tue Dec 9 12:08:47 2025 +0100

    build with 2.0.0-M1 (#393)
    
    * build with 2.0.0-M1
    
    * deprecated code in TestSource/TestSink
    
    * Update PartitionedSourceSpec.scala
    
    * Remove redundant resolver configuration
---
 project/PekkoCoreDependency.scala                  |  2 +-
 .../pekko/kafka/testkit/scaladsl/KafkaSpec.scala   |  2 +-
 .../scaladsl/SchemaRegistrySerializationSpec.scala | 10 ++--
 .../org/apache/pekko/kafka/TransactionsOps.scala   |  2 +-
 .../kafka/internal/CommitCollectorStageSpec.scala  |  5 +-
 .../kafka/internal/CommittingWithMockSpec.scala    | 22 ++++----
 .../apache/pekko/kafka/internal/ConsumerSpec.scala | 22 ++++----
 .../kafka/internal/PartitionedSourceSpec.scala     | 54 +++++++++---------
 .../apache/pekko/kafka/internal/ProducerSpec.scala | 66 +++++++++-------------
 .../pekko/kafka/scaladsl/CommittingSpec.scala      | 20 +++----
 .../kafka/scaladsl/ConnectionCheckerSpec.scala     |  2 +-
 .../pekko/kafka/scaladsl/IntegrationSpec.scala     | 10 ++--
 .../kafka/scaladsl/PartitionedSourcesSpec.scala    |  8 +--
 .../pekko/kafka/scaladsl/RebalanceSpec.scala       | 10 ++--
 .../pekko/kafka/scaladsl/RetentionPeriodSpec.scala | 12 ++--
 .../pekko/kafka/scaladsl/TimestampSpec.scala       |  6 +-
 16 files changed, 120 insertions(+), 133 deletions(-)

diff --git a/project/PekkoCoreDependency.scala b/project/PekkoCoreDependency.scala
index 992921fe..c127cb61 100644
--- a/project/PekkoCoreDependency.scala
+++ b/project/PekkoCoreDependency.scala
@@ -20,5 +20,5 @@ import com.github.pjfanning.pekkobuild.PekkoDependency
 object PekkoCoreDependency extends PekkoDependency {
   override val checkProject: String = "pekko-cluster-sharding-typed"
   override val module: Option[String] = None
-  override val currentVersion: String = "1.1.5"
+  override val currentVersion: String = "2.0.0-M1"
 }
diff --git a/testkit/src/main/scala/org/apache/pekko/kafka/testkit/scaladsl/KafkaSpec.scala b/testkit/src/main/scala/org/apache/pekko/kafka/testkit/scaladsl/KafkaSpec.scala
index 1f77271b..97a52767 100644
--- a/testkit/src/main/scala/org/apache/pekko/kafka/testkit/scaladsl/KafkaSpec.scala
+++ b/testkit/src/main/scala/org/apache/pekko/kafka/testkit/scaladsl/KafkaSpec.scala
@@ -206,7 +206,7 @@ abstract class KafkaSpec(_kafkaPort: Int, val zooKeeperPort: Int, actorSystem: A
     Consumer
       .plainSource(consumerSettings, Subscriptions.topics(topic.toSet))
       .map(_.value)
-      .toMat(TestSink.probe)(Keep.both)
+      .toMat(TestSink())(Keep.both)
       .run()
 
 }
diff --git a/tests/src/test/scala/docs/scaladsl/SchemaRegistrySerializationSpec.scala b/tests/src/test/scala/docs/scaladsl/SchemaRegistrySerializationSpec.scala
index 5f643fb1..bccd1d65 100644
--- a/tests/src/test/scala/docs/scaladsl/SchemaRegistrySerializationSpec.scala
+++ b/tests/src/test/scala/docs/scaladsl/SchemaRegistrySerializationSpec.scala
@@ -147,21 +147,21 @@ class SchemaRegistrySerializationSpec extends DocsSpecBase with TestcontainersKa
       Consumer
         .plainExternalSource[String, SpecificRecord](consumerActor,
           Subscriptions.assignment(new TopicPartition(topic, 0)))
-        .toMat(TestSink.probe)(Keep.both)
+        .toMat(TestSink())(Keep.both)
         .run()
 
     val (control2, probe2) =
       Consumer
         .plainExternalSource[String, SpecificRecord](consumerActor,
           Subscriptions.assignment(new TopicPartition(topic, 1)))
-        .toMat(TestSink.probe)(Keep.both)
+        .toMat(TestSink())(Keep.both)
         .run()
 
     val (thisStreamStaysAlive, probe3) =
       Consumer
         .plainExternalSource[String, SpecificRecord](consumerActor,
           Subscriptions.assignment(new TopicPartition(topic, 2)))
-        .toMat(TestSink.probe)(Keep.both)
+        .toMat(TestSink())(Keep.both)
         .run()
 
     // request from 2 streams
@@ -194,12 +194,12 @@ class SchemaRegistrySerializationSpec extends DocsSpecBase with TestcontainersKa
     val (control1, partitionedProbe) =
       Consumer
         .plainPartitionedSource(specificRecordConsumerSettings(group), 
Subscriptions.topics(topic))
-        .toMat(TestSink.probe)(Keep.both)
+        .toMat(TestSink())(Keep.both)
         .run()
 
     partitionedProbe.request(1L)
     val (_, subSource) = partitionedProbe.expectNext()
-    val subStream = subSource.runWith(TestSink.probe)
+    val subStream = subSource.runWith(TestSink())
 
     subStream.request(1L)
 
diff --git a/tests/src/test/scala/org/apache/pekko/kafka/TransactionsOps.scala b/tests/src/test/scala/org/apache/pekko/kafka/TransactionsOps.scala
index 17d592b9..d2c35fd4 100644
--- a/tests/src/test/scala/org/apache/pekko/kafka/TransactionsOps.scala
+++ b/tests/src/test/scala/org/apache/pekko/kafka/TransactionsOps.scala
@@ -172,7 +172,7 @@ trait TransactionsOps extends TestSuite with Matchers {
       topic: String)(implicit actorSystem: ActorSystem, mat: Materializer): 
TestSubscriber.Probe[String] =
     offsetValueSource(settings, topic)
       .map(_._2)
-      .runWith(TestSink.probe)
+      .runWith(TestSink())
 
   def offsetValueSource(settings: ConsumerSettings[String, String],
       topic: String): Source[(Long, String), Consumer.Control] =
diff --git a/tests/src/test/scala/org/apache/pekko/kafka/internal/CommitCollectorStageSpec.scala b/tests/src/test/scala/org/apache/pekko/kafka/internal/CommitCollectorStageSpec.scala
index e8e575e2..ec7fc6fd 100644
--- a/tests/src/test/scala/org/apache/pekko/kafka/internal/CommitCollectorStageSpec.scala
+++ b/tests/src/test/scala/org/apache/pekko/kafka/internal/CommitCollectorStageSpec.scala
@@ -407,11 +407,10 @@ class CommitCollectorStageSpec(_system: ActorSystem)
 
     val flow = Committer.batchFlow(committerSettings)
 
-    val ((source, control), sink) = TestSource
-      .probe[Committable]
+    val ((source, control), sink) = TestSource[Committable]()
       .viaMat(ConsumerControlFactory.controlFlow())(Keep.both)
       .via(flow)
-      .toMat(TestSink.probe)(Keep.both)
+      .toMat(TestSink())(Keep.both)
       .run()
 
     (source, control, sink)
diff --git a/tests/src/test/scala/org/apache/pekko/kafka/internal/CommittingWithMockSpec.scala b/tests/src/test/scala/org/apache/pekko/kafka/internal/CommittingWithMockSpec.scala
index fd283bb1..43ce86b3 100644
--- a/tests/src/test/scala/org/apache/pekko/kafka/internal/CommittingWithMockSpec.scala
+++ b/tests/src/test/scala/org/apache/pekko/kafka/internal/CommittingWithMockSpec.scala
@@ -118,7 +118,7 @@ class CommittingWithMockSpec(_system: ActorSystem)
     val mock = new ConsumerMock[K, V](commitLog)
 
     val (control, probe) = createSourceWithMetadata(mock.mock, (rec: 
ConsumerRecord[K, V]) => rec.offset.toString)
-      .toMat(TestSink.probe)(Keep.both)
+      .toMat(TestSink())(Keep.both)
       .run()
 
     val msg = createMessage(1)
@@ -149,7 +149,7 @@ class CommittingWithMockSpec(_system: ActorSystem)
     val commitLog = new ConsumerMock.LogHandler()
     val mock = new ConsumerMock[K, V](commitLog)
     val (control, probe) = createCommittableSource(mock.mock)
-      .toMat(TestSink.probe)(Keep.both)
+      .toMat(TestSink())(Keep.both)
       .run()
 
     val msg = createMessage(1)
@@ -179,7 +179,7 @@ class CommittingWithMockSpec(_system: ActorSystem)
     val commitLog = new ConsumerMock.LogHandler(onCompleteFailure)
     val mock = new ConsumerMock[K, V](commitLog)
     val (control, probe) = createCommittableSource(mock.mock)
-      .toMat(TestSink.probe)(Keep.both)
+      .toMat(TestSink())(Keep.both)
       .run()
 
     val msg = createMessage(1)
@@ -214,7 +214,7 @@ class CommittingWithMockSpec(_system: ActorSystem)
       val commitLog = new ConsumerMock.LogHandler(onCompleteFailure)
       val mock = new ConsumerMock[K, V](commitLog)
       val (control, probe) = createCommittableSource(mock.mock)
-        .toMat(TestSink.probe)(Keep.both)
+        .toMat(TestSink())(Keep.both)
         .run()
 
       val msg = createMessage(1)
@@ -244,7 +244,7 @@ class CommittingWithMockSpec(_system: ActorSystem)
     val commitLog = new ConsumerMock.LogHandler()
     val mock = new ConsumerMock[K, V](commitLog)
     val (control, probe) = createCommittableSource(mock.mock)
-      .toMat(TestSink.probe)(Keep.both)
+      .toMat(TestSink())(Keep.both)
       .run()
 
     val count = 100
@@ -274,7 +274,7 @@ class CommittingWithMockSpec(_system: ActorSystem)
     val commitLog = new ConsumerMock.LogHandler()
     val mock = new ConsumerMock[K, V](commitLog)
     val (control, probe) = createCommittableSource(mock.mock, topics = 
Set("topic1", "topic2"))
-      .toMat(TestSink.probe)(Keep.both)
+      .toMat(TestSink())(Keep.both)
       .run()
 
     val msgsTopic1 = (1 to 3).map(createMessage(_, "topic1"))
@@ -311,7 +311,7 @@ class CommittingWithMockSpec(_system: ActorSystem)
     val (control, probe) = createSourceWithMetadata(mock.mock,
       (rec: ConsumerRecord[K, V]) => rec.offset.toString,
       topics = Set("topic1", "topic2"))
-      .toMat(TestSink.probe)(Keep.both)
+      .toMat(TestSink())(Keep.both)
       .run()
 
     val msgsTopic1 = (1 to 3).map(createMessage(_, "topic1"))
@@ -350,7 +350,7 @@ class CommittingWithMockSpec(_system: ActorSystem)
     val (control, probe) = createSourceWithMetadata(mock.mock,
       (rec: ConsumerRecord[K, V]) => rec.offset.toString,
       topics = Set("topic1", "topic2"))
-      .toMat(TestSink.probe)(Keep.both)
+      .toMat(TestSink())(Keep.both)
       .run()
 
     val msgsTopic1 = (1 to 3).map(createMessage(_, "topic1"))
@@ -392,10 +392,10 @@ class CommittingWithMockSpec(_system: ActorSystem)
     val mock1 = new ConsumerMock[K, V](commitLog1)
     val mock2 = new ConsumerMock[K, V](commitLog2)
     val (control1, probe1) = createCommittableSource(mock1.mock, "group1", 
Set("topic1", "topic2"))
-      .toMat(TestSink.probe)(Keep.both)
+      .toMat(TestSink())(Keep.both)
       .run()
     val (control2, probe2) = createCommittableSource(mock2.mock, "group2", 
Set("topic1", "topic3"))
-      .toMat(TestSink.probe)(Keep.both)
+      .toMat(TestSink())(Keep.both)
       .run()
 
     val msgs1a = (1 to 3).map(createMessage(_, "topic1", "group1"))
@@ -452,7 +452,7 @@ class CommittingWithMockSpec(_system: ActorSystem)
     val (control, probe) = createSourceWithMetadata(mock.mock,
       (rec: ConsumerRecord[K, V]) => rec.offset.toString,
       topics = Set("topic1", "topic2"))
-      .toMat(TestSink.probe)(Keep.both)
+      .toMat(TestSink())(Keep.both)
       .run()
 
     val msgsTopic1 = (1 to 3).map(createMessage(_, "topic1"))
diff --git a/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerSpec.scala b/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerSpec.scala
index d92f55ed..b4c762c2 100644
--- a/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerSpec.scala
+++ b/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerSpec.scala
@@ -86,7 +86,7 @@ class ConsumerSpec(_system: ActorSystem)
   def checkMessagesReceiving(msgss: Seq[Seq[CommittableMessage[K, V]]]): Unit 
= {
     val mock = new ConsumerMock[K, V]()
     val (control, probe) = createCommittableSource(mock.mock)
-      .toMat(TestSink.probe)(Keep.both)
+      .toMat(TestSink())(Keep.both)
       .run()
 
     probe.request(msgss.map(_.size).sum.toLong)
@@ -125,7 +125,7 @@ class ConsumerSpec(_system: ActorSystem)
     val mock = new FailingConsumerMock[K, V](new Exception("Fatal Kafka 
error"), failOnCallNumber = 1)
 
     val probe = createCommittableSource(mock.mock)
-      .toMat(TestSink.probe)(Keep.right)
+      .toMat(TestSink())(Keep.right)
       .run()
 
     probe
@@ -136,7 +136,7 @@ class ConsumerSpec(_system: ActorSystem)
   it should "complete stage when stream control.stop called" in 
assertAllStagesStopped {
     val mock = new ConsumerMock[K, V]()
     val (control, probe) = createCommittableSource(mock.mock)
-      .toMat(TestSink.probe)(Keep.both)
+      .toMat(TestSink())(Keep.both)
       .run()
 
     probe.request(100)
@@ -149,7 +149,7 @@ class ConsumerSpec(_system: ActorSystem)
   it should "complete stage when processing flow canceled" in 
assertAllStagesStopped {
     val mock = new ConsumerMock[K, V]()
     val (control, probe) = createCommittableSource(mock.mock)
-      .toMat(TestSink.probe)(Keep.both)
+      .toMat(TestSink())(Keep.both)
       .run()
 
     probe.request(100)
@@ -184,7 +184,7 @@ class ConsumerSpec(_system: ActorSystem)
     val commitLog = new ConsumerMock.LogHandler()
     val mock = new ConsumerMock[K, V](commitLog)
     val (control, probe) = createCommittableSource(mock.mock)
-      .toMat(TestSink.probe)(Keep.both)
+      .toMat(TestSink())(Keep.both)
       .run()
 
     mock.enqueue((1 to 10).map(createMessage).map(toRecord))
@@ -204,7 +204,7 @@ class ConsumerSpec(_system: ActorSystem)
     val commitLog = new ConsumerMock.LogHandler()
     val mock = new ConsumerMock[K, V](commitLog)
     val (control, probe) = createCommittableSource(mock.mock)
-      .toMat(TestSink.probe)(Keep.both)
+      .toMat(TestSink())(Keep.both)
       .run()
 
     probe.request(1)
@@ -219,7 +219,7 @@ class ConsumerSpec(_system: ActorSystem)
     val commitLog = new ConsumerMock.LogHandler()
     val mock = new ConsumerMock[K, V](commitLog)
     val (control, probe) = createCommittableSource(mock.mock)
-      .toMat(TestSink.probe)(Keep.both)
+      .toMat(TestSink())(Keep.both)
       .run()
 
     probe.cancel()
@@ -231,7 +231,7 @@ class ConsumerSpec(_system: ActorSystem)
     val commitLog = new ConsumerMock.LogHandler()
     val mock = new ConsumerMock[K, V](commitLog)
     val (control, probe) = createCommittableSource(mock.mock)
-      .toMat(TestSink.probe)(Keep.both)
+      .toMat(TestSink())(Keep.both)
       .run()
 
     mock.enqueue((1 to 10).map(createMessage).map(toRecord))
@@ -249,7 +249,7 @@ class ConsumerSpec(_system: ActorSystem)
     val commitLog = new ConsumerMock.LogHandler()
     val mock = new ConsumerMock[K, V](commitLog)
     val (control, probe) = createCommittableSource(mock.mock)
-      .toMat(TestSink.probe)(Keep.both)
+      .toMat(TestSink())(Keep.both)
       .run()
 
     val msgs = (1 to 10).map(createMessage)
@@ -285,7 +285,7 @@ class ConsumerSpec(_system: ActorSystem)
     val commitLog = new ConsumerMock.LogHandler()
     val mock = new ConsumerMock[K, V](commitLog)
     val (control, probe) = createCommittableSource(mock.mock)
-      .toMat(TestSink.probe)(Keep.both)
+      .toMat(TestSink())(Keep.both)
       .run()
 
     val msg = createMessage(1)
@@ -308,7 +308,7 @@ class ConsumerSpec(_system: ActorSystem)
     val commitLog = new ConsumerMock.LogHandler()
     val mock = new ConsumerMock[K, V](commitLog)
     val (control, probe) = createCommittableSource(mock.mock)
-      .toMat(TestSink.probe)(Keep.both)
+      .toMat(TestSink())(Keep.both)
       .run()
 
     val msgs = (1 to 10).map(createMessage)
diff --git a/tests/src/test/scala/org/apache/pekko/kafka/internal/PartitionedSourceSpec.scala b/tests/src/test/scala/org/apache/pekko/kafka/internal/PartitionedSourceSpec.scala
index 24d014f9..c7b3a244 100644
--- a/tests/src/test/scala/org/apache/pekko/kafka/internal/PartitionedSourceSpec.scala
+++ b/tests/src/test/scala/org/apache/pekko/kafka/internal/PartitionedSourceSpec.scala
@@ -80,7 +80,7 @@ class PartitionedSourceSpec(_system: ActorSystem)
     val sink = Consumer
       .committablePartitionedSource(consumerSettings(dummy), 
Subscriptions.topics(topic))
       .flatMapMerge(breadth = 10, _._2)
-      .runWith(TestSink.probe)
+      .runWith(TestSink())
 
     dummy.started.futureValue should be(Done)
 
@@ -113,7 +113,7 @@ class PartitionedSourceSpec(_system: ActorSystem)
 
     val sink = Consumer
       .committablePartitionedSource(consumerSettings(dummy), 
Subscriptions.topics(topic))
-      .runWith(TestSink.probe)
+      .runWith(TestSink())
 
     dummy.started.futureValue should be(Done)
 
@@ -139,7 +139,7 @@ class PartitionedSourceSpec(_system: ActorSystem)
 
     val sink = Consumer
       .committablePartitionedSource(consumerSettings(dummy), 
Subscriptions.topics(topic))
-      .runWith(TestSink.probe)
+      .runWith(TestSink())
 
     dummy.started.futureValue should be(Done)
 
@@ -157,7 +157,7 @@ class PartitionedSourceSpec(_system: ActorSystem)
     // No demand on sub-sources => paused
     dummy.tpsPaused should contain only tp0
 
-    val probeTp0 = 
subSources(tp0).runWith(TestSink.probe[CommittableMessage[K, V]])
+    val probeTp0 = subSources(tp0).runWith(TestSink[CommittableMessage[K, 
V]]())
     dummy.setNextPollData(tp0 -> singleRecord)
 
     // demand a value
@@ -175,7 +175,7 @@ class PartitionedSourceSpec(_system: ActorSystem)
 
     val sink = Consumer
       .committablePartitionedSource(consumerSettings(dummy), 
Subscriptions.topics(topic))
-      .runWith(TestSink.probe)
+      .runWith(TestSink())
 
     dummy.started.futureValue should be(Done)
 
@@ -187,7 +187,7 @@ class PartitionedSourceSpec(_system: ActorSystem)
     // No demand on sub-sources => paused
     dummy.tpsPaused should contain.allOf(tp0, tp1)
 
-    val probeTp0 = 
subSources(tp0).runWith(TestSink.probe[CommittableMessage[K, V]])
+    val probeTp0 = subSources(tp0).runWith(TestSink[CommittableMessage[K, 
V]]())
     dummy.setNextPollData(tp0 -> singleRecord)
 
     // demand a value
@@ -205,7 +205,7 @@ class PartitionedSourceSpec(_system: ActorSystem)
 
     val sink = Consumer
       .committablePartitionedSource(consumerSettings(dummy), 
Subscriptions.topics(topic))
-      .runWith(TestSink.probe)
+      .runWith(TestSink())
 
     dummy.started.futureValue should be(Done)
 
@@ -229,7 +229,7 @@ class PartitionedSourceSpec(_system: ActorSystem)
 
     val sink = Consumer
       .committablePartitionedSource(consumerSettings(dummy), 
Subscriptions.topics(topic))
-      .runWith(TestSink.probe)
+      .runWith(TestSink())
 
     dummy.started.futureValue should be(Done)
 
@@ -270,7 +270,7 @@ class PartitionedSourceSpec(_system: ActorSystem)
 
     val sink = Consumer
       .plainPartitionedManualOffsetSource(consumerSettings(dummy), 
Subscriptions.topics(topic), getOffsetsOnAssign)
-      .runWith(TestSink.probe)
+      .runWith(TestSink())
 
     dummy.started.futureValue should be(Done)
 
@@ -302,7 +302,7 @@ class PartitionedSourceSpec(_system: ActorSystem)
 
     val sink = Consumer
       .plainPartitionedManualOffsetSource(consumerSettings(dummy), 
Subscriptions.topics(topic), getOffsetsOnAssign)
-      .runWith(TestSink.probe)
+      .runWith(TestSink())
 
     dummy.started.futureValue should be(Done)
 
@@ -341,7 +341,7 @@ class PartitionedSourceSpec(_system: ActorSystem)
 
     val sink = Consumer
       .plainPartitionedManualOffsetSource(consumerSettings(dummy), 
Subscriptions.topics(topic), getOffsetsOnAssign)
-      .runWith(TestSink.probe)
+      .runWith(TestSink())
 
     dummy.started.futureValue should be(Done)
 
@@ -379,7 +379,7 @@ class PartitionedSourceSpec(_system: ActorSystem)
         onRevoke = { tp =>
           revoked = revoked ++ tp
         })
-      .runWith(TestSink.probe)
+      .runWith(TestSink())
 
     dummy.started.futureValue should be(Done)
 
@@ -413,7 +413,7 @@ class PartitionedSourceSpec(_system: ActorSystem)
 
     val sink = Consumer
       .plainPartitionedManualOffsetSource(consumerSettings(dummy), 
Subscriptions.topics(topic), getOffsetsOnAssign)
-      .runWith(TestSink.probe)
+      .runWith(TestSink())
 
     dummy.started.futureValue should be(Done)
 
@@ -453,7 +453,7 @@ class PartitionedSourceSpec(_system: ActorSystem)
 
     val sink = Consumer
       .plainPartitionedManualOffsetSource(consumerSettings(dummy), 
Subscriptions.topics(topic), getOffsetsOnAssign)
-      .runWith(TestSink.probe)
+      .runWith(TestSink())
 
     dummy.started.futureValue should be(Done)
 
@@ -483,7 +483,7 @@ class PartitionedSourceSpec(_system: ActorSystem)
       .committablePartitionedManualOffsetSource(consumerSettings(dummy),
         Subscriptions.topics(topic),
         getOffsetsOnAssign)
-      .runWith(TestSink.probe)
+      .runWith(TestSink())
 
     dummy.started.futureValue should be(Done)
 
@@ -517,7 +517,7 @@ class PartitionedSourceSpec(_system: ActorSystem)
       .committablePartitionedManualOffsetSource(consumerSettings(dummy),
         Subscriptions.topics(topic),
         getOffsetsOnAssign)
-      .runWith(TestSink.probe)
+      .runWith(TestSink())
 
     dummy.started.futureValue should be(Done)
 
@@ -558,7 +558,7 @@ class PartitionedSourceSpec(_system: ActorSystem)
       .committablePartitionedManualOffsetSource(consumerSettings(dummy),
         Subscriptions.topics(topic),
         getOffsetsOnAssign)
-      .runWith(TestSink.probe)
+      .runWith(TestSink())
 
     dummy.started.futureValue should be(Done)
 
@@ -596,7 +596,7 @@ class PartitionedSourceSpec(_system: ActorSystem)
         onRevoke = { tp =>
           revoked = revoked ++ tp
         })
-      .runWith(TestSink.probe)
+      .runWith(TestSink())
 
     dummy.started.futureValue should be(Done)
 
@@ -620,7 +620,7 @@ class PartitionedSourceSpec(_system: ActorSystem)
 
     val sink = Consumer
       .plainPartitionedSource(consumerSettings(dummy), 
Subscriptions.topics(topic))
-      .runWith(TestSink.probe)
+      .runWith(TestSink())
 
     dummy.started.futureValue should be(Done)
 
@@ -644,7 +644,7 @@ class PartitionedSourceSpec(_system: ActorSystem)
 
     val sink1 = Consumer
       .plainPartitionedSource(consumerSettings(dummy), 
Subscriptions.topics(topic))
-      .runWith(TestSink.probe)
+      .runWith(TestSink())
 
     dummy.started.futureValue should be(Done)
 
@@ -656,7 +656,7 @@ class PartitionedSourceSpec(_system: ActorSystem)
     // simulate partition re-balance
     val sink2 = Consumer
       .plainPartitionedSource(consumerSettings(dummy2), 
Subscriptions.topics(topic))
-      .runWith(TestSink.probe)
+      .runWith(TestSink())
 
     dummy.assignWithCallback(tp0)
     subSources1(tp1).runWith(Sink.ignore).futureValue should be(Done)
@@ -675,7 +675,7 @@ class PartitionedSourceSpec(_system: ActorSystem)
 
     val sink1 = Consumer
       .plainPartitionedSource(consumerSettings(dummy), 
Subscriptions.topics(topic))
-      .runWith(TestSink.probe)
+      .runWith(TestSink())
 
     dummy.started.futureValue should be(Done)
 
@@ -684,8 +684,8 @@ class PartitionedSourceSpec(_system: ActorSystem)
     val subSources1 = Map(sink1.requestNext(), sink1.requestNext())
     subSources1.keys should contain.allOf(tp0, tp1)
 
-    val probeTp0 = subSources1(tp0).runWith(TestSink.probe[ConsumerRecord[K, 
V]])
-    val probeTp1 = subSources1(tp1).runWith(TestSink.probe[ConsumerRecord[K, 
V]])
+    val probeTp0 = subSources1(tp0).runWith(TestSink[ConsumerRecord[K, V]]())
+    val probeTp1 = subSources1(tp1).runWith(TestSink[ConsumerRecord[K, V]]())
 
     // trigger demand
     probeTp0.request(1L)
@@ -710,7 +710,7 @@ class PartitionedSourceSpec(_system: ActorSystem)
 
     val sink1 = Consumer
       .plainPartitionedSource(consumerSettings(dummy), 
Subscriptions.topics(topic))
-      .runWith(TestSink.probe)
+      .runWith(TestSink())
 
     dummy.started.futureValue should be(Done)
 
@@ -719,8 +719,8 @@ class PartitionedSourceSpec(_system: ActorSystem)
     val subSources1 = Map(sink1.requestNext(), sink1.requestNext())
     subSources1.keys should contain.allOf(tp0, tp1)
 
-    val probeTp0 = subSources1(tp0).runWith(TestSink.probe[ConsumerRecord[K, 
V]])
-    val probeTp1 = subSources1(tp1).runWith(TestSink.probe[ConsumerRecord[K, 
V]])
+    val probeTp0 = subSources1(tp0).runWith(TestSink[ConsumerRecord[K, V]]())
+    val probeTp1 = subSources1(tp1).runWith(TestSink[ConsumerRecord[K, V]]())
 
     // trigger demand
     probeTp0.request(1L)
diff --git a/tests/src/test/scala/org/apache/pekko/kafka/internal/ProducerSpec.scala b/tests/src/test/scala/org/apache/pekko/kafka/internal/ProducerSpec.scala
index 12e93360..114d024e 100644
--- a/tests/src/test/scala/org/apache/pekko/kafka/internal/ProducerSpec.scala
+++ b/tests/src/test/scala/org/apache/pekko/kafka/internal/ProducerSpec.scala
@@ -133,10 +133,9 @@ class ProducerSpec(_system: ActorSystem)
       }
       val committer = new CommittedMarkerMock
 
-      val (source, sink) = TestSource
-        .probe[TxMsg]
+      val (source, sink) = TestSource[TxMsg]()
         .via(testProducerFlow(client))
-        .toMat(TestSink.probe)(Keep.both)
+        .toMat(TestSink())(Keep.both)
         .run()
 
       val txMsg = toTxMessage(input, committer.mock)
@@ -178,7 +177,7 @@ class ProducerSpec(_system: ActorSystem)
       }
       val probe = Source(input.map(toMessage))
         .via(testProducerFlow(client))
-        .runWith(TestSink.probe)
+        .runWith(TestSink())
 
       probe
         .request(10)
@@ -198,10 +197,9 @@ class ProducerSpec(_system: ActorSystem)
       val inputMap = input.toMap
       new ProducerMock[K, V](ProducerMock.handlers.delayedMap(100.millis)(x => 
Try { inputMap(x) }))
     }
-    val (source, sink) = TestSource
-      .probe[Msg]
+    val (source, sink) = TestSource[Msg]()
       .via(testProducerFlow(client))
-      .toMat(TestSink.probe)(Keep.both)
+      .toMat(TestSink())(Keep.both)
       .run()
 
     input.map(toMessage).foreach(source.sendNext)
@@ -230,10 +228,9 @@ class ProducerSpec(_system: ActorSystem)
           else Success(inputMap(msg))
         })
       }
-      val (source, sink) = TestSource
-        .probe[Msg]
+      val (source, sink) = TestSource[Msg]()
         .via(testProducerFlow(client))
-        .toMat(TestSink.probe)(Keep.both)
+        .toMat(TestSink())(Keep.both)
         .run()
 
       sink.request(100)
@@ -259,10 +256,9 @@ class ProducerSpec(_system: ActorSystem)
           else Success(inputMap(msg))
         })
       }
-      val (source, sink) = TestSource
-        .probe[Msg]
+      val (source, sink) = TestSource[Msg]()
         .via(testProducerFlow(client))
-        .toMat(TestSink.probe)(Keep.both)
+        .toMat(TestSink())(Keep.both)
         .run()
 
       input.map(toMessage).foreach(source.sendNext)
@@ -292,11 +288,10 @@ class ProducerSpec(_system: ActorSystem)
           else Success(inputMap(msg))
         })
       }
-      val (source, sink) = TestSource
-        .probe[Msg]
+      val (source, sink) = TestSource[Msg]()
         .via(
           
testProducerFlow(client).withAttributes(ActorAttributes.withSupervisionStrategy(Supervision.resumingDecider)))
-        .toMat(TestSink.probe)(Keep.both)
+        .toMat(TestSink())(Keep.both)
         .run()
 
       input.map(toMessage).foreach(source.sendNext)
@@ -320,7 +315,7 @@ class ProducerSpec(_system: ActorSystem)
       val client = new ProducerMock[K, V](ProducerMock.handlers.fail)
       val probe = Source(input.map(toMessage))
         .via(testProducerFlow(client))
-        .runWith(TestSink.probe)
+        .runWith(TestSink())
 
       probe
         .request(10)
@@ -340,10 +335,9 @@ class ProducerSpec(_system: ActorSystem)
         val inputMap = input.toMap
         new ProducerMock[K, V](ProducerMock.handlers.delayedMap(5.seconds)(x 
=> Try { inputMap(x) }))
       }
-      val (source, sink) = TestSource
-        .probe[Msg]
+      val (source, sink) = TestSource[Msg]()
         .via(testProducerFlow(client))
-        .toMat(TestSink.probe)(Keep.both)
+        .toMat(TestSink())(Keep.both)
         .run()
 
       sink.request(10)
@@ -366,7 +360,7 @@ class ProducerSpec(_system: ActorSystem)
       }
       val probe = Source(input.map(toMessage))
         .via(testProducerFlow(client, closeOnStop = false))
-        .runWith(TestSink.probe)
+        .runWith(TestSink())
 
       probe
         .request(10)
@@ -387,10 +381,9 @@ class ProducerSpec(_system: ActorSystem)
         Failure(error)
       })
 
-      val (source, sink) = TestSource
-        .probe[Msg]
+      val (source, sink) = TestSource[Msg]()
         .via(testProducerFlow(client, closeOnStop = false))
-        .toMat(TestSink.probe)(Keep.both)
+        .toMat(TestSink())(Keep.both)
         .run()
 
       sink.request(100)
@@ -410,7 +403,7 @@ class ProducerSpec(_system: ActorSystem)
       val probe = Source
         .empty[Msg]
         .via(testTransactionProducerFlow(client))
-        .runWith(TestSink.probe)
+        .runWith(TestSink())
 
       probe
         .request(1)
@@ -430,10 +423,9 @@ class ProducerSpec(_system: ActorSystem)
       }
       val committer = new CommittedMarkerMock
 
-      val (source, sink) = TestSource
-        .probe[TxMsg]
+      val (source, sink) = TestSource[TxMsg]()
         .via(testTransactionProducerFlow(client))
-        .toMat(TestSink.probe)(Keep.both)
+        .toMat(TestSink())(Keep.both)
         .run()
 
       val txMsg = toTxMessage(input, committer.mock)
@@ -457,10 +449,9 @@ class ProducerSpec(_system: ActorSystem)
       }
       val committer = new CommittedMarkerMock
 
-      val (source, sink) = TestSource
-        .probe[TxMsg]
+      val (source, sink) = TestSource[TxMsg]()
         .via(testTransactionProducerFlow(client))
-        .toMat(TestSink.probe)(Keep.both)
+        .toMat(TestSink())(Keep.both)
         .run()
 
       val txMsg = toTxMessage(input, committer.mock)
@@ -485,11 +476,10 @@ class ProducerSpec(_system: ActorSystem)
       }
       val committer = new CommittedMarkerMock
 
-      val (source, sink) = TestSource
-        .probe[TxMsg]
+      val (source, sink) = TestSource[TxMsg]()
         .map(msg => ProducerMessage.passThrough[K, V, 
PartitionOffset](msg.passThrough))
         .via(testTransactionProducerFlow(client))
-        .toMat(TestSink.probe)(Keep.both)
+        .toMat(TestSink())(Keep.both)
         .run()
 
       val txMsg = toTxMessage(input, committer.mock)
@@ -513,10 +503,9 @@ class ProducerSpec(_system: ActorSystem)
     }
     val committedMarker = new CommittedMarkerMock
 
-    val (source, sink) = TestSource
-      .probe[TxMsg]
+    val (source, sink) = TestSource[TxMsg]()
       .via(testTransactionProducerFlow(client))
-      .toMat(TestSink.probe)(Keep.both)
+      .toMat(TestSink())(Keep.both)
       .run()
 
     val txMsg: TxMsg = toTxMessage(input, committedMarker.mock)
@@ -542,8 +531,7 @@ class ProducerSpec(_system: ActorSystem)
     }
     val committedMarker = new CommittedMarkerMock
 
-    val (source, sink) = TestSource
-      .probe[TxMsg]
+    val (source, sink) = TestSource[TxMsg]()
       .via(testTransactionProducerFlow(client))
       .toMat(Sink.lastOption)(Keep.both)
       .run()
diff --git a/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/CommittingSpec.scala b/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/CommittingSpec.scala
index 44efbb53..722bbe89 100644
--- a/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/CommittingSpec.scala
+++ b/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/CommittingSpec.scala
@@ -69,7 +69,7 @@ class CommittingSpec extends SpecBase with TestcontainersKafkaLike with Inside {
             elem.record.value
           }
         }
-        .toMat(TestSink.probe)(Keep.both)
+        .toMat(TestSink())(Keep.both)
         .run()
 
       probe1
@@ -82,7 +82,7 @@ class CommittingSpec extends SpecBase with TestcontainersKafkaLike with Inside {
       val probe2 = Consumer
         .committableSource(consumerSettings, Subscriptions.topics(topic1))
         .map(_.record.value)
-        .runWith(TestSink.probe)
+        .runWith(TestSink())
 
       // Note that due to buffers and mapAsync(10) the committed offset is more
       // than 26, and that is not wrong
@@ -100,7 +100,7 @@ class CommittingSpec extends SpecBase with 
TestcontainersKafkaLike with Inside {
       val probe3 = Consumer
         .committableSource(consumerSettings.withGroupId(group2), 
Subscriptions.topics(topic1))
         .map(_.record.value)
-        .runWith(TestSink.probe)
+        .runWith(TestSink())
 
       probe3
         .request(100)
@@ -131,7 +131,7 @@ class CommittingSpec extends SpecBase with 
TestcontainersKafkaLike with Inside {
       val subscription1 = 
Subscriptions.topics(topic1).withRebalanceListener(rebalanceActor1.ref)
       val (control1, probe1) = Consumer
         .committableSource(consumerSettings, subscription1)
-        .toMat(TestSink.probe)(Keep.both)
+        .toMat(TestSink())(Keep.both)
         .run()
 
       // Await initial partition assignment
@@ -149,7 +149,7 @@ class CommittingSpec extends SpecBase with 
TestcontainersKafkaLike with Inside {
       val subscription2 = 
Subscriptions.topics(topic1).withRebalanceListener(rebalanceActor2.ref)
       val (control2, probe2) = Consumer
         .committableSource(consumerSettings, subscription2)
-        .toMat(TestSink.probe)(Keep.both)
+        .toMat(TestSink())(Keep.both)
         .run()
 
       // Await an assignment to the new rebalance listener
@@ -211,7 +211,7 @@ class CommittingSpec extends SpecBase with 
TestcontainersKafkaLike with Inside {
       val subscription1 = 
Subscriptions.topics(topic1).withRebalanceListener(rebalanceActor1.ref)
       val (control1, probe1) = Consumer
         .committableSource(consumerSettings, subscription1)
-        .toMat(TestSink.probe)(Keep.both)
+        .toMat(TestSink())(Keep.both)
         .run()
 
       // Await initial partition assignment
@@ -229,7 +229,7 @@ class CommittingSpec extends SpecBase with 
TestcontainersKafkaLike with Inside {
       val subscription2 = 
Subscriptions.topics(topic1).withRebalanceListener(rebalanceActor2.ref)
       val (control2, probe2) = Consumer
         .committableSource(consumerSettings, subscription2)
-        .toMat(TestSink.probe)(Keep.both)
+        .toMat(TestSink())(Keep.both)
         .run()
 
       // Rebalance happens
@@ -275,7 +275,7 @@ class CommittingSpec extends SpecBase with 
TestcontainersKafkaLike with Inside {
 
       val (control, probe1) = Consumer
         .committableSource(consumerDefaults.withGroupId(group), 
Subscriptions.topics(topic))
-        .toMat(TestSink.probe)(Keep.both)
+        .toMat(TestSink())(Keep.both)
         .run()
 
       // request one, only
@@ -315,7 +315,7 @@ class CommittingSpec extends SpecBase with 
TestcontainersKafkaLike with Inside {
           .map(_.committableOffset)
           .batch(max = 10, CommittableOffsetBatch.apply)(_.updated(_))
           .mapAsync(1)(_.commitInternal())
-          .toMat(TestSink.probe)(Keep.both)
+          .toMat(TestSink())(Keep.both)
           .run()
 
       val (control, probe) = consumeAndBatchCommit(topic)
@@ -561,7 +561,7 @@ class CommittingSpec extends SpecBase with 
TestcontainersKafkaLike with Inside {
           .via(
             Committer
               
.batchFlow(committerDefaults.withDelivery(CommitDelivery.SendAndForget).withMaxBatch(commitBatchSize)))
-          .toMat(TestSink.probe)(Keep.both)
+          .toMat(TestSink())(Keep.both)
           .run()
 
       val (control, probe) = consumeAndBatchCommit(topic)
diff --git a/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/ConnectionCheckerSpec.scala b/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/ConnectionCheckerSpec.scala
index 45b73939..4725d418 100644
--- a/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/ConnectionCheckerSpec.scala
+++ b/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/ConnectionCheckerSpec.scala
@@ -83,7 +83,7 @@ class ConnectionCheckerSpec extends SpecBase with 
TestcontainersKafkaPerClassLik
       val consumerSettings = 
noBrokerConsumerSettings.withBootstrapServers(bootstrapServers)
 
       val (control, probe) =
-        Consumer.plainSource(consumerSettings, 
Subscriptions.topics(topic)).toMat(TestSink.probe)(Keep.both).run()
+        Consumer.plainSource(consumerSettings, 
Subscriptions.topics(topic)).toMat(TestSink())(Keep.both).run()
 
       probe.ensureSubscription().requestNext().value() shouldBe msg
 
diff --git a/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/IntegrationSpec.scala b/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/IntegrationSpec.scala
index 1058face..6325d63e 100644
--- a/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/IntegrationSpec.scala
+++ b/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/IntegrationSpec.scala
@@ -180,7 +180,7 @@ class IntegrationSpec extends SpecBase with 
TestcontainersKafkaLike with Inside
           .batch(max = 10, CommittableOffsetBatch.apply)(_.updated(_))
           .mapAsync(producerDefaults.parallelism)(_.commitInternal())
 
-        val probe = source.runWith(TestSink.probe)
+        val probe = source.runWith(TestSink())
 
         probe.request(1).expectNext()
 
@@ -302,7 +302,7 @@ class IntegrationSpec extends SpecBase with 
TestcontainersKafkaLike with Inside
         val probe = Consumer
           .plainExternalSource[Array[Byte], String](consumer, 
Subscriptions.assignment(partition0))
           .map(_.value())
-          .runWith(TestSink.probe)
+          .runWith(TestSink())
 
         probe
           .request(100)
@@ -325,7 +325,7 @@ class IntegrationSpec extends SpecBase with 
TestcontainersKafkaLike with Inside
       val (control, probe) = Consumer
         .plainSource(consumerDefaults.withGroupId(group), 
Subscriptions.topics(topic))
         .map(_.value())
-        .toMat(TestSink.probe)(Keep.both)
+        .toMat(TestSink())(Keep.both)
         .run()
 
       probe
@@ -353,7 +353,7 @@ class IntegrationSpec extends SpecBase with 
TestcontainersKafkaLike with Inside
           Subscriptions.topics(topic))
         .flatMapMerge(1, _._2)
         .map(_.value())
-        .toMat(TestSink.probe)(Keep.both)
+        .toMat(TestSink())(Keep.both)
         .run()
 
       probe
@@ -376,7 +376,7 @@ class IntegrationSpec extends SpecBase with 
TestcontainersKafkaLike with Inside
       val control = Consumer
         .plainSource(consumerDefaults.withGroupId(group), 
Subscriptions.topics(topic))
         .map(_.value())
-        .to(TestSink.probe)
+        .to(TestSink())
         .run()
 
       // Wait a tiny bit to avoid a race on "not yet initialized: only 
setHandler is allowed in GraphStageLogic constructor"
diff --git a/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/PartitionedSourcesSpec.scala b/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/PartitionedSourcesSpec.scala
index 3b31a1e1..d9492b3f 100644
--- a/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/PartitionedSourcesSpec.scala
+++ b/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/PartitionedSourcesSpec.scala
@@ -55,7 +55,7 @@ class PartitionedSourcesSpec extends SpecBase with 
TestcontainersKafkaLike with
           _ => Future.successful(Map.empty))
         .flatMapMerge(1, _._2)
         .map(_.value())
-        .runWith(TestSink.probe)
+        .runWith(TestSink())
 
       probe
         .request(100)
@@ -76,7 +76,7 @@ class PartitionedSourcesSpec extends SpecBase with 
TestcontainersKafkaLike with
           tp => Future.successful(tp.map(_ -> 51L).toMap))
         .flatMapMerge(1, _._2)
         .map(_.value())
-        .runWith(TestSink.probe)
+        .runWith(TestSink())
 
       probe
         .request(50)
@@ -337,13 +337,13 @@ class PartitionedSourcesSpec extends SpecBase with 
TestcontainersKafkaLike with
         .flatMapMerge(1, _._2)
         .map(_.value())
 
-      val (control1, firstConsumer) = 
source.toMat(TestSink.probe)(Keep.both).run()
+      val (control1, firstConsumer) = source.toMat(TestSink())(Keep.both).run()
 
       eventually {
         assert(partitionsAssigned, "first consumer should get asked for 
offsets")
       }
 
-      val secondConsumer = source.runWith(TestSink.probe)
+      val secondConsumer = source.runWith(TestSink())
 
       eventually {
         revoked.value should have size partitions / 2L
diff --git a/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/RebalanceSpec.scala b/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/RebalanceSpec.scala
index cd2471c1..21fca2b2 100644
--- a/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/RebalanceSpec.scala
+++ b/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/RebalanceSpec.scala
@@ -74,7 +74,7 @@ class RebalanceSpec extends SpecBase with 
TestcontainersKafkaLike with Inside {
       val probe1subscription = 
Subscriptions.topics(topic1).withRebalanceListener(probe1rebalanceActor.ref)
       val (control1, probe1) = Consumer
         .plainSource(consumerSettings.withClientId(consumerClientId1), 
probe1subscription)
-        .toMat(TestSink.probe)(Keep.both)
+        .toMat(TestSink())(Keep.both)
         .run()
 
       log.debug("Await initial partition assignment")
@@ -95,7 +95,7 @@ class RebalanceSpec extends SpecBase with 
TestcontainersKafkaLike with Inside {
       val probe2subscription = 
Subscriptions.topics(topic1).withRebalanceListener(probe2rebalanceActor.ref)
       val (control2, probe2) = Consumer
         .plainSource(consumerSettings.withClientId(consumerClientId2), 
probe2subscription)
-        .toMat(TestSink.probe)(Keep.both)
+        .toMat(TestSink())(Keep.both)
         .run()
 
       log.debug("Await a revoke to consumer 1")
@@ -136,7 +136,7 @@ class RebalanceSpec extends SpecBase with 
TestcontainersKafkaLike with Inside {
           .expectNextN(partitions.toLong)
           .map {
             case (tp, subSource) =>
-              (tp, subSource.toMat(TestSink.probe)(Keep.right).run())
+              (tp, subSource.toMat(TestSink())(Keep.right).run())
           }
 
       def runForSubSource(
@@ -170,7 +170,7 @@ class RebalanceSpec extends SpecBase with 
TestcontainersKafkaLike with Inside {
       val probe1subscription = 
Subscriptions.topics(topic1).withRebalanceListener(probe1rebalanceActor.ref)
       val (control1, probe1) = Consumer
         
.plainPartitionedSource(consumerSettings.withClientId(consumerClientId1), 
probe1subscription)
-        .toMat(TestSink.probe)(Keep.both)
+        .toMat(TestSink())(Keep.both)
         .run()
 
       log.debug("Await initial partition assignment")
@@ -197,7 +197,7 @@ class RebalanceSpec extends SpecBase with 
TestcontainersKafkaLike with Inside {
       val probe2subscription = 
Subscriptions.topics(topic1).withRebalanceListener(probe2rebalanceActor.ref)
       val (control2, probe2) = Consumer
         
.plainPartitionedSource(consumerSettings.withClientId(consumerClientId2), 
probe2subscription)
-        .toMat(TestSink.probe)(Keep.both)
+        .toMat(TestSink())(Keep.both)
         .run()
 
       probe2.request(1)
diff --git a/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/RetentionPeriodSpec.scala b/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/RetentionPeriodSpec.scala
index 870a120f..2dfe0de7 100644
--- a/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/RetentionPeriodSpec.scala
+++ b/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/RetentionPeriodSpec.scala
@@ -76,7 +76,7 @@ class RetentionPeriodSpec extends SpecBase with 
TestcontainersKafkaPerClassLike
 //      val probe1subscription = 
Subscriptions.topics(topic1).withRebalanceListener(probe1rebalanceActor.ref)
 //      val (control1, probe1) = Consumer
 //        .committableSource(consumerSettings.withClientId(consumerClientId1), 
probe1subscription)
-//        .toMat(TestSink.probe)(Keep.both)
+//        .toMat(TestSink())(Keep.both)
 //        .run()
 //
 //      log.debug("Await initial partition assignment")
@@ -95,7 +95,7 @@ class RetentionPeriodSpec extends SpecBase with 
TestcontainersKafkaPerClassLike
 //      val probe2subscription = 
Subscriptions.topics(topic1).withRebalanceListener(probe2rebalanceActor.ref)
 //      val (control2, probe2) = Consumer
 //        .committableSource(consumerSettings.withClientId(consumerClientId2), 
probe2subscription)
-//        .toMat(TestSink.probe)(Keep.both)
+//        .toMat(TestSink())(Keep.both)
 //        .run()
 //
 //      log.debug("Await a revoke to consumer 1")
@@ -128,7 +128,7 @@ class RetentionPeriodSpec extends SpecBase with 
TestcontainersKafkaPerClassLike
 //      val probe3subscription = Subscriptions.topics("__consumer_offsets")
 //      val (control3, probe3) = Consumer
 //        .plainSource(group2consumerSettings.withClientId(consumerClientId3), 
probe3subscription)
-//        .toMat(TestSink.probe)(Keep.both)
+//        .toMat(TestSink())(Keep.both)
 //        .run()
 //      val commits: Seq[ConsumerRecord[Array[Byte], Array[Byte]]] = 
probe3.request(100).expectNextN(10)
 //
@@ -198,7 +198,7 @@ class RetentionPeriodSpec extends SpecBase with 
TestcontainersKafkaPerClassLike
             Done
           }
         }
-        .toMat(TestSink.probe)(Keep.both)
+        .toMat(TestSink())(Keep.both)
         .run()
 
       probe1
@@ -215,7 +215,7 @@ class RetentionPeriodSpec extends SpecBase with 
TestcontainersKafkaPerClassLike
       val probe2 = Consumer
         .committableSource(consumerSettings, Subscriptions.topics(topic1))
         .map(_.record.value)
-        .runWith(TestSink.probe)
+        .runWith(TestSink())
 
       // Note that due to buffers and mapAsync(10) the committed offset is more
       // than 26, and that is not wrong
@@ -235,7 +235,7 @@ class RetentionPeriodSpec extends SpecBase with 
TestcontainersKafkaPerClassLike
       val probe3 = Consumer
         .committableSource(consumerSettings, Subscriptions.topics(topic1))
         .map(_.record.value)
-        .runWith(TestSink.probe)
+        .runWith(TestSink())
 
       probe3
         .request(100)
diff --git a/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/TimestampSpec.scala b/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/TimestampSpec.scala
index 75267da4..a917604a 100644
--- a/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/TimestampSpec.scala
+++ b/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/TimestampSpec.scala
@@ -48,7 +48,7 @@ class TimestampSpec extends SpecBase with 
TestcontainersKafkaLike with Inside wi
         val probe = Consumer
           .plainSource(consumerSettings, topicsAndTs)
           .map(_.value())
-          .runWith(TestSink.probe)
+          .runWith(TestSink())
 
         probe
           .request(50)
@@ -74,7 +74,7 @@ class TimestampSpec extends SpecBase with 
TestcontainersKafkaLike with Inside wi
 
         val probe = Consumer
           .plainSource(consumerSettings, topicsAndTs)
-          .runWith(TestSink.probe)
+          .runWith(TestSink())
 
         probe.ensureSubscription()
         probe.expectNoMessage(200.millis)
@@ -93,7 +93,7 @@ class TimestampSpec extends SpecBase with 
TestcontainersKafkaLike with Inside wi
 
         val probe = Consumer
           .plainSource(consumerSettings, topicsAndTs)
-          .runWith(TestSink.probe)
+          .runWith(TestSink())
 
         probe.ensureSubscription()
         probe.expectNoMessage(200.millis)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to