This is an automated email from the ASF dual-hosted git repository.
fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-connectors-kafka.git
The following commit(s) were added to refs/heads/main by this push:
new 1ea4a731 build with 2.0.0-M1 (#393)
1ea4a731 is described below
commit 1ea4a731da45682bb5bb0f6b68ad875c5a808348
Author: PJ Fanning <[email protected]>
AuthorDate: Tue Dec 9 12:08:47 2025 +0100
build with 2.0.0-M1 (#393)
* build with 2.0.0-M1
* replace deprecated TestSource.probe/TestSink.probe calls with TestSource()/TestSink()
* Update PartitionedSourceSpec.scala
* Remove redundant resolver configuration
---
project/PekkoCoreDependency.scala | 2 +-
.../pekko/kafka/testkit/scaladsl/KafkaSpec.scala | 2 +-
.../scaladsl/SchemaRegistrySerializationSpec.scala | 10 ++--
.../org/apache/pekko/kafka/TransactionsOps.scala | 2 +-
.../kafka/internal/CommitCollectorStageSpec.scala | 5 +-
.../kafka/internal/CommittingWithMockSpec.scala | 22 ++++----
.../apache/pekko/kafka/internal/ConsumerSpec.scala | 22 ++++----
.../kafka/internal/PartitionedSourceSpec.scala | 54 +++++++++---------
.../apache/pekko/kafka/internal/ProducerSpec.scala | 66 +++++++++-------------
.../pekko/kafka/scaladsl/CommittingSpec.scala | 20 +++----
.../kafka/scaladsl/ConnectionCheckerSpec.scala | 2 +-
.../pekko/kafka/scaladsl/IntegrationSpec.scala | 10 ++--
.../kafka/scaladsl/PartitionedSourcesSpec.scala | 8 +--
.../pekko/kafka/scaladsl/RebalanceSpec.scala | 10 ++--
.../pekko/kafka/scaladsl/RetentionPeriodSpec.scala | 12 ++--
.../pekko/kafka/scaladsl/TimestampSpec.scala | 6 +-
16 files changed, 120 insertions(+), 133 deletions(-)
diff --git a/project/PekkoCoreDependency.scala b/project/PekkoCoreDependency.scala
index 992921fe..c127cb61 100644
--- a/project/PekkoCoreDependency.scala
+++ b/project/PekkoCoreDependency.scala
@@ -20,5 +20,5 @@ import com.github.pjfanning.pekkobuild.PekkoDependency
object PekkoCoreDependency extends PekkoDependency {
override val checkProject: String = "pekko-cluster-sharding-typed"
override val module: Option[String] = None
- override val currentVersion: String = "1.1.5"
+ override val currentVersion: String = "2.0.0-M1"
}
diff --git a/testkit/src/main/scala/org/apache/pekko/kafka/testkit/scaladsl/KafkaSpec.scala b/testkit/src/main/scala/org/apache/pekko/kafka/testkit/scaladsl/KafkaSpec.scala
index 1f77271b..97a52767 100644
--- a/testkit/src/main/scala/org/apache/pekko/kafka/testkit/scaladsl/KafkaSpec.scala
+++ b/testkit/src/main/scala/org/apache/pekko/kafka/testkit/scaladsl/KafkaSpec.scala
@@ -206,7 +206,7 @@ abstract class KafkaSpec(_kafkaPort: Int, val zooKeeperPort: Int, actorSystem: A
Consumer
.plainSource(consumerSettings, Subscriptions.topics(topic.toSet))
.map(_.value)
- .toMat(TestSink.probe)(Keep.both)
+ .toMat(TestSink())(Keep.both)
.run()
}
diff --git a/tests/src/test/scala/docs/scaladsl/SchemaRegistrySerializationSpec.scala b/tests/src/test/scala/docs/scaladsl/SchemaRegistrySerializationSpec.scala
index 5f643fb1..bccd1d65 100644
--- a/tests/src/test/scala/docs/scaladsl/SchemaRegistrySerializationSpec.scala
+++ b/tests/src/test/scala/docs/scaladsl/SchemaRegistrySerializationSpec.scala
@@ -147,21 +147,21 @@ class SchemaRegistrySerializationSpec extends
DocsSpecBase with TestcontainersKa
Consumer
.plainExternalSource[String, SpecificRecord](consumerActor,
Subscriptions.assignment(new TopicPartition(topic, 0)))
- .toMat(TestSink.probe)(Keep.both)
+ .toMat(TestSink())(Keep.both)
.run()
val (control2, probe2) =
Consumer
.plainExternalSource[String, SpecificRecord](consumerActor,
Subscriptions.assignment(new TopicPartition(topic, 1)))
- .toMat(TestSink.probe)(Keep.both)
+ .toMat(TestSink())(Keep.both)
.run()
val (thisStreamStaysAlive, probe3) =
Consumer
.plainExternalSource[String, SpecificRecord](consumerActor,
Subscriptions.assignment(new TopicPartition(topic, 2)))
- .toMat(TestSink.probe)(Keep.both)
+ .toMat(TestSink())(Keep.both)
.run()
// request from 2 streams
@@ -194,12 +194,12 @@ class SchemaRegistrySerializationSpec extends
DocsSpecBase with TestcontainersKa
val (control1, partitionedProbe) =
Consumer
.plainPartitionedSource(specificRecordConsumerSettings(group),
Subscriptions.topics(topic))
- .toMat(TestSink.probe)(Keep.both)
+ .toMat(TestSink())(Keep.both)
.run()
partitionedProbe.request(1L)
val (_, subSource) = partitionedProbe.expectNext()
- val subStream = subSource.runWith(TestSink.probe)
+ val subStream = subSource.runWith(TestSink())
subStream.request(1L)
diff --git a/tests/src/test/scala/org/apache/pekko/kafka/TransactionsOps.scala b/tests/src/test/scala/org/apache/pekko/kafka/TransactionsOps.scala
index 17d592b9..d2c35fd4 100644
--- a/tests/src/test/scala/org/apache/pekko/kafka/TransactionsOps.scala
+++ b/tests/src/test/scala/org/apache/pekko/kafka/TransactionsOps.scala
@@ -172,7 +172,7 @@ trait TransactionsOps extends TestSuite with Matchers {
topic: String)(implicit actorSystem: ActorSystem, mat: Materializer):
TestSubscriber.Probe[String] =
offsetValueSource(settings, topic)
.map(_._2)
- .runWith(TestSink.probe)
+ .runWith(TestSink())
def offsetValueSource(settings: ConsumerSettings[String, String],
topic: String): Source[(Long, String), Consumer.Control] =
diff --git a/tests/src/test/scala/org/apache/pekko/kafka/internal/CommitCollectorStageSpec.scala b/tests/src/test/scala/org/apache/pekko/kafka/internal/CommitCollectorStageSpec.scala
index e8e575e2..ec7fc6fd 100644
--- a/tests/src/test/scala/org/apache/pekko/kafka/internal/CommitCollectorStageSpec.scala
+++ b/tests/src/test/scala/org/apache/pekko/kafka/internal/CommitCollectorStageSpec.scala
@@ -407,11 +407,10 @@ class CommitCollectorStageSpec(_system: ActorSystem)
val flow = Committer.batchFlow(committerSettings)
- val ((source, control), sink) = TestSource
- .probe[Committable]
+ val ((source, control), sink) = TestSource[Committable]()
.viaMat(ConsumerControlFactory.controlFlow())(Keep.both)
.via(flow)
- .toMat(TestSink.probe)(Keep.both)
+ .toMat(TestSink())(Keep.both)
.run()
(source, control, sink)
diff --git a/tests/src/test/scala/org/apache/pekko/kafka/internal/CommittingWithMockSpec.scala b/tests/src/test/scala/org/apache/pekko/kafka/internal/CommittingWithMockSpec.scala
index fd283bb1..43ce86b3 100644
--- a/tests/src/test/scala/org/apache/pekko/kafka/internal/CommittingWithMockSpec.scala
+++ b/tests/src/test/scala/org/apache/pekko/kafka/internal/CommittingWithMockSpec.scala
@@ -118,7 +118,7 @@ class CommittingWithMockSpec(_system: ActorSystem)
val mock = new ConsumerMock[K, V](commitLog)
val (control, probe) = createSourceWithMetadata(mock.mock, (rec:
ConsumerRecord[K, V]) => rec.offset.toString)
- .toMat(TestSink.probe)(Keep.both)
+ .toMat(TestSink())(Keep.both)
.run()
val msg = createMessage(1)
@@ -149,7 +149,7 @@ class CommittingWithMockSpec(_system: ActorSystem)
val commitLog = new ConsumerMock.LogHandler()
val mock = new ConsumerMock[K, V](commitLog)
val (control, probe) = createCommittableSource(mock.mock)
- .toMat(TestSink.probe)(Keep.both)
+ .toMat(TestSink())(Keep.both)
.run()
val msg = createMessage(1)
@@ -179,7 +179,7 @@ class CommittingWithMockSpec(_system: ActorSystem)
val commitLog = new ConsumerMock.LogHandler(onCompleteFailure)
val mock = new ConsumerMock[K, V](commitLog)
val (control, probe) = createCommittableSource(mock.mock)
- .toMat(TestSink.probe)(Keep.both)
+ .toMat(TestSink())(Keep.both)
.run()
val msg = createMessage(1)
@@ -214,7 +214,7 @@ class CommittingWithMockSpec(_system: ActorSystem)
val commitLog = new ConsumerMock.LogHandler(onCompleteFailure)
val mock = new ConsumerMock[K, V](commitLog)
val (control, probe) = createCommittableSource(mock.mock)
- .toMat(TestSink.probe)(Keep.both)
+ .toMat(TestSink())(Keep.both)
.run()
val msg = createMessage(1)
@@ -244,7 +244,7 @@ class CommittingWithMockSpec(_system: ActorSystem)
val commitLog = new ConsumerMock.LogHandler()
val mock = new ConsumerMock[K, V](commitLog)
val (control, probe) = createCommittableSource(mock.mock)
- .toMat(TestSink.probe)(Keep.both)
+ .toMat(TestSink())(Keep.both)
.run()
val count = 100
@@ -274,7 +274,7 @@ class CommittingWithMockSpec(_system: ActorSystem)
val commitLog = new ConsumerMock.LogHandler()
val mock = new ConsumerMock[K, V](commitLog)
val (control, probe) = createCommittableSource(mock.mock, topics =
Set("topic1", "topic2"))
- .toMat(TestSink.probe)(Keep.both)
+ .toMat(TestSink())(Keep.both)
.run()
val msgsTopic1 = (1 to 3).map(createMessage(_, "topic1"))
@@ -311,7 +311,7 @@ class CommittingWithMockSpec(_system: ActorSystem)
val (control, probe) = createSourceWithMetadata(mock.mock,
(rec: ConsumerRecord[K, V]) => rec.offset.toString,
topics = Set("topic1", "topic2"))
- .toMat(TestSink.probe)(Keep.both)
+ .toMat(TestSink())(Keep.both)
.run()
val msgsTopic1 = (1 to 3).map(createMessage(_, "topic1"))
@@ -350,7 +350,7 @@ class CommittingWithMockSpec(_system: ActorSystem)
val (control, probe) = createSourceWithMetadata(mock.mock,
(rec: ConsumerRecord[K, V]) => rec.offset.toString,
topics = Set("topic1", "topic2"))
- .toMat(TestSink.probe)(Keep.both)
+ .toMat(TestSink())(Keep.both)
.run()
val msgsTopic1 = (1 to 3).map(createMessage(_, "topic1"))
@@ -392,10 +392,10 @@ class CommittingWithMockSpec(_system: ActorSystem)
val mock1 = new ConsumerMock[K, V](commitLog1)
val mock2 = new ConsumerMock[K, V](commitLog2)
val (control1, probe1) = createCommittableSource(mock1.mock, "group1",
Set("topic1", "topic2"))
- .toMat(TestSink.probe)(Keep.both)
+ .toMat(TestSink())(Keep.both)
.run()
val (control2, probe2) = createCommittableSource(mock2.mock, "group2",
Set("topic1", "topic3"))
- .toMat(TestSink.probe)(Keep.both)
+ .toMat(TestSink())(Keep.both)
.run()
val msgs1a = (1 to 3).map(createMessage(_, "topic1", "group1"))
@@ -452,7 +452,7 @@ class CommittingWithMockSpec(_system: ActorSystem)
val (control, probe) = createSourceWithMetadata(mock.mock,
(rec: ConsumerRecord[K, V]) => rec.offset.toString,
topics = Set("topic1", "topic2"))
- .toMat(TestSink.probe)(Keep.both)
+ .toMat(TestSink())(Keep.both)
.run()
val msgsTopic1 = (1 to 3).map(createMessage(_, "topic1"))
diff --git a/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerSpec.scala b/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerSpec.scala
index d92f55ed..b4c762c2 100644
--- a/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerSpec.scala
+++ b/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerSpec.scala
@@ -86,7 +86,7 @@ class ConsumerSpec(_system: ActorSystem)
def checkMessagesReceiving(msgss: Seq[Seq[CommittableMessage[K, V]]]): Unit
= {
val mock = new ConsumerMock[K, V]()
val (control, probe) = createCommittableSource(mock.mock)
- .toMat(TestSink.probe)(Keep.both)
+ .toMat(TestSink())(Keep.both)
.run()
probe.request(msgss.map(_.size).sum.toLong)
@@ -125,7 +125,7 @@ class ConsumerSpec(_system: ActorSystem)
val mock = new FailingConsumerMock[K, V](new Exception("Fatal Kafka
error"), failOnCallNumber = 1)
val probe = createCommittableSource(mock.mock)
- .toMat(TestSink.probe)(Keep.right)
+ .toMat(TestSink())(Keep.right)
.run()
probe
@@ -136,7 +136,7 @@ class ConsumerSpec(_system: ActorSystem)
it should "complete stage when stream control.stop called" in
assertAllStagesStopped {
val mock = new ConsumerMock[K, V]()
val (control, probe) = createCommittableSource(mock.mock)
- .toMat(TestSink.probe)(Keep.both)
+ .toMat(TestSink())(Keep.both)
.run()
probe.request(100)
@@ -149,7 +149,7 @@ class ConsumerSpec(_system: ActorSystem)
it should "complete stage when processing flow canceled" in
assertAllStagesStopped {
val mock = new ConsumerMock[K, V]()
val (control, probe) = createCommittableSource(mock.mock)
- .toMat(TestSink.probe)(Keep.both)
+ .toMat(TestSink())(Keep.both)
.run()
probe.request(100)
@@ -184,7 +184,7 @@ class ConsumerSpec(_system: ActorSystem)
val commitLog = new ConsumerMock.LogHandler()
val mock = new ConsumerMock[K, V](commitLog)
val (control, probe) = createCommittableSource(mock.mock)
- .toMat(TestSink.probe)(Keep.both)
+ .toMat(TestSink())(Keep.both)
.run()
mock.enqueue((1 to 10).map(createMessage).map(toRecord))
@@ -204,7 +204,7 @@ class ConsumerSpec(_system: ActorSystem)
val commitLog = new ConsumerMock.LogHandler()
val mock = new ConsumerMock[K, V](commitLog)
val (control, probe) = createCommittableSource(mock.mock)
- .toMat(TestSink.probe)(Keep.both)
+ .toMat(TestSink())(Keep.both)
.run()
probe.request(1)
@@ -219,7 +219,7 @@ class ConsumerSpec(_system: ActorSystem)
val commitLog = new ConsumerMock.LogHandler()
val mock = new ConsumerMock[K, V](commitLog)
val (control, probe) = createCommittableSource(mock.mock)
- .toMat(TestSink.probe)(Keep.both)
+ .toMat(TestSink())(Keep.both)
.run()
probe.cancel()
@@ -231,7 +231,7 @@ class ConsumerSpec(_system: ActorSystem)
val commitLog = new ConsumerMock.LogHandler()
val mock = new ConsumerMock[K, V](commitLog)
val (control, probe) = createCommittableSource(mock.mock)
- .toMat(TestSink.probe)(Keep.both)
+ .toMat(TestSink())(Keep.both)
.run()
mock.enqueue((1 to 10).map(createMessage).map(toRecord))
@@ -249,7 +249,7 @@ class ConsumerSpec(_system: ActorSystem)
val commitLog = new ConsumerMock.LogHandler()
val mock = new ConsumerMock[K, V](commitLog)
val (control, probe) = createCommittableSource(mock.mock)
- .toMat(TestSink.probe)(Keep.both)
+ .toMat(TestSink())(Keep.both)
.run()
val msgs = (1 to 10).map(createMessage)
@@ -285,7 +285,7 @@ class ConsumerSpec(_system: ActorSystem)
val commitLog = new ConsumerMock.LogHandler()
val mock = new ConsumerMock[K, V](commitLog)
val (control, probe) = createCommittableSource(mock.mock)
- .toMat(TestSink.probe)(Keep.both)
+ .toMat(TestSink())(Keep.both)
.run()
val msg = createMessage(1)
@@ -308,7 +308,7 @@ class ConsumerSpec(_system: ActorSystem)
val commitLog = new ConsumerMock.LogHandler()
val mock = new ConsumerMock[K, V](commitLog)
val (control, probe) = createCommittableSource(mock.mock)
- .toMat(TestSink.probe)(Keep.both)
+ .toMat(TestSink())(Keep.both)
.run()
val msgs = (1 to 10).map(createMessage)
diff --git a/tests/src/test/scala/org/apache/pekko/kafka/internal/PartitionedSourceSpec.scala b/tests/src/test/scala/org/apache/pekko/kafka/internal/PartitionedSourceSpec.scala
index 24d014f9..c7b3a244 100644
--- a/tests/src/test/scala/org/apache/pekko/kafka/internal/PartitionedSourceSpec.scala
+++ b/tests/src/test/scala/org/apache/pekko/kafka/internal/PartitionedSourceSpec.scala
@@ -80,7 +80,7 @@ class PartitionedSourceSpec(_system: ActorSystem)
val sink = Consumer
.committablePartitionedSource(consumerSettings(dummy),
Subscriptions.topics(topic))
.flatMapMerge(breadth = 10, _._2)
- .runWith(TestSink.probe)
+ .runWith(TestSink())
dummy.started.futureValue should be(Done)
@@ -113,7 +113,7 @@ class PartitionedSourceSpec(_system: ActorSystem)
val sink = Consumer
.committablePartitionedSource(consumerSettings(dummy),
Subscriptions.topics(topic))
- .runWith(TestSink.probe)
+ .runWith(TestSink())
dummy.started.futureValue should be(Done)
@@ -139,7 +139,7 @@ class PartitionedSourceSpec(_system: ActorSystem)
val sink = Consumer
.committablePartitionedSource(consumerSettings(dummy),
Subscriptions.topics(topic))
- .runWith(TestSink.probe)
+ .runWith(TestSink())
dummy.started.futureValue should be(Done)
@@ -157,7 +157,7 @@ class PartitionedSourceSpec(_system: ActorSystem)
// No demand on sub-sources => paused
dummy.tpsPaused should contain only tp0
- val probeTp0 = subSources(tp0).runWith(TestSink.probe[CommittableMessage[K, V]])
+ val probeTp0 = subSources(tp0).runWith(TestSink[CommittableMessage[K, V]]())
dummy.setNextPollData(tp0 -> singleRecord)
// demand a value
@@ -175,7 +175,7 @@ class PartitionedSourceSpec(_system: ActorSystem)
val sink = Consumer
.committablePartitionedSource(consumerSettings(dummy),
Subscriptions.topics(topic))
- .runWith(TestSink.probe)
+ .runWith(TestSink())
dummy.started.futureValue should be(Done)
@@ -187,7 +187,7 @@ class PartitionedSourceSpec(_system: ActorSystem)
// No demand on sub-sources => paused
dummy.tpsPaused should contain.allOf(tp0, tp1)
- val probeTp0 = subSources(tp0).runWith(TestSink.probe[CommittableMessage[K, V]])
+ val probeTp0 = subSources(tp0).runWith(TestSink[CommittableMessage[K, V]]())
dummy.setNextPollData(tp0 -> singleRecord)
// demand a value
@@ -205,7 +205,7 @@ class PartitionedSourceSpec(_system: ActorSystem)
val sink = Consumer
.committablePartitionedSource(consumerSettings(dummy),
Subscriptions.topics(topic))
- .runWith(TestSink.probe)
+ .runWith(TestSink())
dummy.started.futureValue should be(Done)
@@ -229,7 +229,7 @@ class PartitionedSourceSpec(_system: ActorSystem)
val sink = Consumer
.committablePartitionedSource(consumerSettings(dummy),
Subscriptions.topics(topic))
- .runWith(TestSink.probe)
+ .runWith(TestSink())
dummy.started.futureValue should be(Done)
@@ -270,7 +270,7 @@ class PartitionedSourceSpec(_system: ActorSystem)
val sink = Consumer
.plainPartitionedManualOffsetSource(consumerSettings(dummy),
Subscriptions.topics(topic), getOffsetsOnAssign)
- .runWith(TestSink.probe)
+ .runWith(TestSink())
dummy.started.futureValue should be(Done)
@@ -302,7 +302,7 @@ class PartitionedSourceSpec(_system: ActorSystem)
val sink = Consumer
.plainPartitionedManualOffsetSource(consumerSettings(dummy),
Subscriptions.topics(topic), getOffsetsOnAssign)
- .runWith(TestSink.probe)
+ .runWith(TestSink())
dummy.started.futureValue should be(Done)
@@ -341,7 +341,7 @@ class PartitionedSourceSpec(_system: ActorSystem)
val sink = Consumer
.plainPartitionedManualOffsetSource(consumerSettings(dummy),
Subscriptions.topics(topic), getOffsetsOnAssign)
- .runWith(TestSink.probe)
+ .runWith(TestSink())
dummy.started.futureValue should be(Done)
@@ -379,7 +379,7 @@ class PartitionedSourceSpec(_system: ActorSystem)
onRevoke = { tp =>
revoked = revoked ++ tp
})
- .runWith(TestSink.probe)
+ .runWith(TestSink())
dummy.started.futureValue should be(Done)
@@ -413,7 +413,7 @@ class PartitionedSourceSpec(_system: ActorSystem)
val sink = Consumer
.plainPartitionedManualOffsetSource(consumerSettings(dummy),
Subscriptions.topics(topic), getOffsetsOnAssign)
- .runWith(TestSink.probe)
+ .runWith(TestSink())
dummy.started.futureValue should be(Done)
@@ -453,7 +453,7 @@ class PartitionedSourceSpec(_system: ActorSystem)
val sink = Consumer
.plainPartitionedManualOffsetSource(consumerSettings(dummy),
Subscriptions.topics(topic), getOffsetsOnAssign)
- .runWith(TestSink.probe)
+ .runWith(TestSink())
dummy.started.futureValue should be(Done)
@@ -483,7 +483,7 @@ class PartitionedSourceSpec(_system: ActorSystem)
.committablePartitionedManualOffsetSource(consumerSettings(dummy),
Subscriptions.topics(topic),
getOffsetsOnAssign)
- .runWith(TestSink.probe)
+ .runWith(TestSink())
dummy.started.futureValue should be(Done)
@@ -517,7 +517,7 @@ class PartitionedSourceSpec(_system: ActorSystem)
.committablePartitionedManualOffsetSource(consumerSettings(dummy),
Subscriptions.topics(topic),
getOffsetsOnAssign)
- .runWith(TestSink.probe)
+ .runWith(TestSink())
dummy.started.futureValue should be(Done)
@@ -558,7 +558,7 @@ class PartitionedSourceSpec(_system: ActorSystem)
.committablePartitionedManualOffsetSource(consumerSettings(dummy),
Subscriptions.topics(topic),
getOffsetsOnAssign)
- .runWith(TestSink.probe)
+ .runWith(TestSink())
dummy.started.futureValue should be(Done)
@@ -596,7 +596,7 @@ class PartitionedSourceSpec(_system: ActorSystem)
onRevoke = { tp =>
revoked = revoked ++ tp
})
- .runWith(TestSink.probe)
+ .runWith(TestSink())
dummy.started.futureValue should be(Done)
@@ -620,7 +620,7 @@ class PartitionedSourceSpec(_system: ActorSystem)
val sink = Consumer
.plainPartitionedSource(consumerSettings(dummy),
Subscriptions.topics(topic))
- .runWith(TestSink.probe)
+ .runWith(TestSink())
dummy.started.futureValue should be(Done)
@@ -644,7 +644,7 @@ class PartitionedSourceSpec(_system: ActorSystem)
val sink1 = Consumer
.plainPartitionedSource(consumerSettings(dummy),
Subscriptions.topics(topic))
- .runWith(TestSink.probe)
+ .runWith(TestSink())
dummy.started.futureValue should be(Done)
@@ -656,7 +656,7 @@ class PartitionedSourceSpec(_system: ActorSystem)
// simulate partition re-balance
val sink2 = Consumer
.plainPartitionedSource(consumerSettings(dummy2),
Subscriptions.topics(topic))
- .runWith(TestSink.probe)
+ .runWith(TestSink())
dummy.assignWithCallback(tp0)
subSources1(tp1).runWith(Sink.ignore).futureValue should be(Done)
@@ -675,7 +675,7 @@ class PartitionedSourceSpec(_system: ActorSystem)
val sink1 = Consumer
.plainPartitionedSource(consumerSettings(dummy),
Subscriptions.topics(topic))
- .runWith(TestSink.probe)
+ .runWith(TestSink())
dummy.started.futureValue should be(Done)
@@ -684,8 +684,8 @@ class PartitionedSourceSpec(_system: ActorSystem)
val subSources1 = Map(sink1.requestNext(), sink1.requestNext())
subSources1.keys should contain.allOf(tp0, tp1)
- val probeTp0 = subSources1(tp0).runWith(TestSink.probe[ConsumerRecord[K, V]])
- val probeTp1 = subSources1(tp1).runWith(TestSink.probe[ConsumerRecord[K, V]])
+ val probeTp0 = subSources1(tp0).runWith(TestSink[ConsumerRecord[K, V]]())
+ val probeTp1 = subSources1(tp1).runWith(TestSink[ConsumerRecord[K, V]]())
// trigger demand
probeTp0.request(1L)
@@ -710,7 +710,7 @@ class PartitionedSourceSpec(_system: ActorSystem)
val sink1 = Consumer
.plainPartitionedSource(consumerSettings(dummy),
Subscriptions.topics(topic))
- .runWith(TestSink.probe)
+ .runWith(TestSink())
dummy.started.futureValue should be(Done)
@@ -719,8 +719,8 @@ class PartitionedSourceSpec(_system: ActorSystem)
val subSources1 = Map(sink1.requestNext(), sink1.requestNext())
subSources1.keys should contain.allOf(tp0, tp1)
- val probeTp0 = subSources1(tp0).runWith(TestSink.probe[ConsumerRecord[K, V]])
- val probeTp1 = subSources1(tp1).runWith(TestSink.probe[ConsumerRecord[K, V]])
+ val probeTp0 = subSources1(tp0).runWith(TestSink[ConsumerRecord[K, V]]())
+ val probeTp1 = subSources1(tp1).runWith(TestSink[ConsumerRecord[K, V]]())
// trigger demand
probeTp0.request(1L)
diff --git a/tests/src/test/scala/org/apache/pekko/kafka/internal/ProducerSpec.scala b/tests/src/test/scala/org/apache/pekko/kafka/internal/ProducerSpec.scala
index 12e93360..114d024e 100644
--- a/tests/src/test/scala/org/apache/pekko/kafka/internal/ProducerSpec.scala
+++ b/tests/src/test/scala/org/apache/pekko/kafka/internal/ProducerSpec.scala
@@ -133,10 +133,9 @@ class ProducerSpec(_system: ActorSystem)
}
val committer = new CommittedMarkerMock
- val (source, sink) = TestSource
- .probe[TxMsg]
+ val (source, sink) = TestSource[TxMsg]()
.via(testProducerFlow(client))
- .toMat(TestSink.probe)(Keep.both)
+ .toMat(TestSink())(Keep.both)
.run()
val txMsg = toTxMessage(input, committer.mock)
@@ -178,7 +177,7 @@ class ProducerSpec(_system: ActorSystem)
}
val probe = Source(input.map(toMessage))
.via(testProducerFlow(client))
- .runWith(TestSink.probe)
+ .runWith(TestSink())
probe
.request(10)
@@ -198,10 +197,9 @@ class ProducerSpec(_system: ActorSystem)
val inputMap = input.toMap
new ProducerMock[K, V](ProducerMock.handlers.delayedMap(100.millis)(x =>
Try { inputMap(x) }))
}
- val (source, sink) = TestSource
- .probe[Msg]
+ val (source, sink) = TestSource[Msg]()
.via(testProducerFlow(client))
- .toMat(TestSink.probe)(Keep.both)
+ .toMat(TestSink())(Keep.both)
.run()
input.map(toMessage).foreach(source.sendNext)
@@ -230,10 +228,9 @@ class ProducerSpec(_system: ActorSystem)
else Success(inputMap(msg))
})
}
- val (source, sink) = TestSource
- .probe[Msg]
+ val (source, sink) = TestSource[Msg]()
.via(testProducerFlow(client))
- .toMat(TestSink.probe)(Keep.both)
+ .toMat(TestSink())(Keep.both)
.run()
sink.request(100)
@@ -259,10 +256,9 @@ class ProducerSpec(_system: ActorSystem)
else Success(inputMap(msg))
})
}
- val (source, sink) = TestSource
- .probe[Msg]
+ val (source, sink) = TestSource[Msg]()
.via(testProducerFlow(client))
- .toMat(TestSink.probe)(Keep.both)
+ .toMat(TestSink())(Keep.both)
.run()
input.map(toMessage).foreach(source.sendNext)
@@ -292,11 +288,10 @@ class ProducerSpec(_system: ActorSystem)
else Success(inputMap(msg))
})
}
- val (source, sink) = TestSource
- .probe[Msg]
+ val (source, sink) = TestSource[Msg]()
.via(
testProducerFlow(client).withAttributes(ActorAttributes.withSupervisionStrategy(Supervision.resumingDecider)))
- .toMat(TestSink.probe)(Keep.both)
+ .toMat(TestSink())(Keep.both)
.run()
input.map(toMessage).foreach(source.sendNext)
@@ -320,7 +315,7 @@ class ProducerSpec(_system: ActorSystem)
val client = new ProducerMock[K, V](ProducerMock.handlers.fail)
val probe = Source(input.map(toMessage))
.via(testProducerFlow(client))
- .runWith(TestSink.probe)
+ .runWith(TestSink())
probe
.request(10)
@@ -340,10 +335,9 @@ class ProducerSpec(_system: ActorSystem)
val inputMap = input.toMap
new ProducerMock[K, V](ProducerMock.handlers.delayedMap(5.seconds)(x
=> Try { inputMap(x) }))
}
- val (source, sink) = TestSource
- .probe[Msg]
+ val (source, sink) = TestSource[Msg]()
.via(testProducerFlow(client))
- .toMat(TestSink.probe)(Keep.both)
+ .toMat(TestSink())(Keep.both)
.run()
sink.request(10)
@@ -366,7 +360,7 @@ class ProducerSpec(_system: ActorSystem)
}
val probe = Source(input.map(toMessage))
.via(testProducerFlow(client, closeOnStop = false))
- .runWith(TestSink.probe)
+ .runWith(TestSink())
probe
.request(10)
@@ -387,10 +381,9 @@ class ProducerSpec(_system: ActorSystem)
Failure(error)
})
- val (source, sink) = TestSource
- .probe[Msg]
+ val (source, sink) = TestSource[Msg]()
.via(testProducerFlow(client, closeOnStop = false))
- .toMat(TestSink.probe)(Keep.both)
+ .toMat(TestSink())(Keep.both)
.run()
sink.request(100)
@@ -410,7 +403,7 @@ class ProducerSpec(_system: ActorSystem)
val probe = Source
.empty[Msg]
.via(testTransactionProducerFlow(client))
- .runWith(TestSink.probe)
+ .runWith(TestSink())
probe
.request(1)
@@ -430,10 +423,9 @@ class ProducerSpec(_system: ActorSystem)
}
val committer = new CommittedMarkerMock
- val (source, sink) = TestSource
- .probe[TxMsg]
+ val (source, sink) = TestSource[TxMsg]()
.via(testTransactionProducerFlow(client))
- .toMat(TestSink.probe)(Keep.both)
+ .toMat(TestSink())(Keep.both)
.run()
val txMsg = toTxMessage(input, committer.mock)
@@ -457,10 +449,9 @@ class ProducerSpec(_system: ActorSystem)
}
val committer = new CommittedMarkerMock
- val (source, sink) = TestSource
- .probe[TxMsg]
+ val (source, sink) = TestSource[TxMsg]()
.via(testTransactionProducerFlow(client))
- .toMat(TestSink.probe)(Keep.both)
+ .toMat(TestSink())(Keep.both)
.run()
val txMsg = toTxMessage(input, committer.mock)
@@ -485,11 +476,10 @@ class ProducerSpec(_system: ActorSystem)
}
val committer = new CommittedMarkerMock
- val (source, sink) = TestSource
- .probe[TxMsg]
+ val (source, sink) = TestSource[TxMsg]()
.map(msg => ProducerMessage.passThrough[K, V,
PartitionOffset](msg.passThrough))
.via(testTransactionProducerFlow(client))
- .toMat(TestSink.probe)(Keep.both)
+ .toMat(TestSink())(Keep.both)
.run()
val txMsg = toTxMessage(input, committer.mock)
@@ -513,10 +503,9 @@ class ProducerSpec(_system: ActorSystem)
}
val committedMarker = new CommittedMarkerMock
- val (source, sink) = TestSource
- .probe[TxMsg]
+ val (source, sink) = TestSource[TxMsg]()
.via(testTransactionProducerFlow(client))
- .toMat(TestSink.probe)(Keep.both)
+ .toMat(TestSink())(Keep.both)
.run()
val txMsg: TxMsg = toTxMessage(input, committedMarker.mock)
@@ -542,8 +531,7 @@ class ProducerSpec(_system: ActorSystem)
}
val committedMarker = new CommittedMarkerMock
- val (source, sink) = TestSource
- .probe[TxMsg]
+ val (source, sink) = TestSource[TxMsg]()
.via(testTransactionProducerFlow(client))
.toMat(Sink.lastOption)(Keep.both)
.run()
diff --git a/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/CommittingSpec.scala b/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/CommittingSpec.scala
index 44efbb53..722bbe89 100644
--- a/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/CommittingSpec.scala
+++ b/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/CommittingSpec.scala
@@ -69,7 +69,7 @@ class CommittingSpec extends SpecBase with
TestcontainersKafkaLike with Inside {
elem.record.value
}
}
- .toMat(TestSink.probe)(Keep.both)
+ .toMat(TestSink())(Keep.both)
.run()
probe1
@@ -82,7 +82,7 @@ class CommittingSpec extends SpecBase with
TestcontainersKafkaLike with Inside {
val probe2 = Consumer
.committableSource(consumerSettings, Subscriptions.topics(topic1))
.map(_.record.value)
- .runWith(TestSink.probe)
+ .runWith(TestSink())
// Note that due to buffers and mapAsync(10) the committed offset is more
// than 26, and that is not wrong
@@ -100,7 +100,7 @@ class CommittingSpec extends SpecBase with
TestcontainersKafkaLike with Inside {
val probe3 = Consumer
.committableSource(consumerSettings.withGroupId(group2),
Subscriptions.topics(topic1))
.map(_.record.value)
- .runWith(TestSink.probe)
+ .runWith(TestSink())
probe3
.request(100)
@@ -131,7 +131,7 @@ class CommittingSpec extends SpecBase with
TestcontainersKafkaLike with Inside {
val subscription1 =
Subscriptions.topics(topic1).withRebalanceListener(rebalanceActor1.ref)
val (control1, probe1) = Consumer
.committableSource(consumerSettings, subscription1)
- .toMat(TestSink.probe)(Keep.both)
+ .toMat(TestSink())(Keep.both)
.run()
// Await initial partition assignment
@@ -149,7 +149,7 @@ class CommittingSpec extends SpecBase with
TestcontainersKafkaLike with Inside {
val subscription2 =
Subscriptions.topics(topic1).withRebalanceListener(rebalanceActor2.ref)
val (control2, probe2) = Consumer
.committableSource(consumerSettings, subscription2)
- .toMat(TestSink.probe)(Keep.both)
+ .toMat(TestSink())(Keep.both)
.run()
// Await an assignment to the new rebalance listener
@@ -211,7 +211,7 @@ class CommittingSpec extends SpecBase with
TestcontainersKafkaLike with Inside {
val subscription1 =
Subscriptions.topics(topic1).withRebalanceListener(rebalanceActor1.ref)
val (control1, probe1) = Consumer
.committableSource(consumerSettings, subscription1)
- .toMat(TestSink.probe)(Keep.both)
+ .toMat(TestSink())(Keep.both)
.run()
// Await initial partition assignment
@@ -229,7 +229,7 @@ class CommittingSpec extends SpecBase with
TestcontainersKafkaLike with Inside {
val subscription2 =
Subscriptions.topics(topic1).withRebalanceListener(rebalanceActor2.ref)
val (control2, probe2) = Consumer
.committableSource(consumerSettings, subscription2)
- .toMat(TestSink.probe)(Keep.both)
+ .toMat(TestSink())(Keep.both)
.run()
// Rebalance happens
@@ -275,7 +275,7 @@ class CommittingSpec extends SpecBase with
TestcontainersKafkaLike with Inside {
val (control, probe1) = Consumer
.committableSource(consumerDefaults.withGroupId(group),
Subscriptions.topics(topic))
- .toMat(TestSink.probe)(Keep.both)
+ .toMat(TestSink())(Keep.both)
.run()
// request one, only
@@ -315,7 +315,7 @@ class CommittingSpec extends SpecBase with
TestcontainersKafkaLike with Inside {
.map(_.committableOffset)
.batch(max = 10, CommittableOffsetBatch.apply)(_.updated(_))
.mapAsync(1)(_.commitInternal())
- .toMat(TestSink.probe)(Keep.both)
+ .toMat(TestSink())(Keep.both)
.run()
val (control, probe) = consumeAndBatchCommit(topic)
@@ -561,7 +561,7 @@ class CommittingSpec extends SpecBase with
TestcontainersKafkaLike with Inside {
.via(
Committer
.batchFlow(committerDefaults.withDelivery(CommitDelivery.SendAndForget).withMaxBatch(commitBatchSize)))
- .toMat(TestSink.probe)(Keep.both)
+ .toMat(TestSink())(Keep.both)
.run()
val (control, probe) = consumeAndBatchCommit(topic)
diff --git a/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/ConnectionCheckerSpec.scala b/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/ConnectionCheckerSpec.scala
index 45b73939..4725d418 100644
--- a/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/ConnectionCheckerSpec.scala
+++ b/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/ConnectionCheckerSpec.scala
@@ -83,7 +83,7 @@ class ConnectionCheckerSpec extends SpecBase with
TestcontainersKafkaPerClassLik
val consumerSettings =
noBrokerConsumerSettings.withBootstrapServers(bootstrapServers)
val (control, probe) =
- Consumer.plainSource(consumerSettings, Subscriptions.topics(topic)).toMat(TestSink.probe)(Keep.both).run()
+ Consumer.plainSource(consumerSettings, Subscriptions.topics(topic)).toMat(TestSink())(Keep.both).run()
probe.ensureSubscription().requestNext().value() shouldBe msg
diff --git a/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/IntegrationSpec.scala b/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/IntegrationSpec.scala
index 1058face..6325d63e 100644
--- a/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/IntegrationSpec.scala
+++ b/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/IntegrationSpec.scala
@@ -180,7 +180,7 @@ class IntegrationSpec extends SpecBase with
TestcontainersKafkaLike with Inside
.batch(max = 10, CommittableOffsetBatch.apply)(_.updated(_))
.mapAsync(producerDefaults.parallelism)(_.commitInternal())
- val probe = source.runWith(TestSink.probe)
+ val probe = source.runWith(TestSink())
probe.request(1).expectNext()
@@ -302,7 +302,7 @@ class IntegrationSpec extends SpecBase with
TestcontainersKafkaLike with Inside
val probe = Consumer
.plainExternalSource[Array[Byte], String](consumer,
Subscriptions.assignment(partition0))
.map(_.value())
- .runWith(TestSink.probe)
+ .runWith(TestSink())
probe
.request(100)
@@ -325,7 +325,7 @@ class IntegrationSpec extends SpecBase with
TestcontainersKafkaLike with Inside
val (control, probe) = Consumer
.plainSource(consumerDefaults.withGroupId(group),
Subscriptions.topics(topic))
.map(_.value())
- .toMat(TestSink.probe)(Keep.both)
+ .toMat(TestSink())(Keep.both)
.run()
probe
@@ -353,7 +353,7 @@ class IntegrationSpec extends SpecBase with
TestcontainersKafkaLike with Inside
Subscriptions.topics(topic))
.flatMapMerge(1, _._2)
.map(_.value())
- .toMat(TestSink.probe)(Keep.both)
+ .toMat(TestSink())(Keep.both)
.run()
probe
@@ -376,7 +376,7 @@ class IntegrationSpec extends SpecBase with
TestcontainersKafkaLike with Inside
val control = Consumer
.plainSource(consumerDefaults.withGroupId(group),
Subscriptions.topics(topic))
.map(_.value())
- .to(TestSink.probe)
+ .to(TestSink())
.run()
// Wait a tiny bit to avoid a race on "not yet initialized: only setHandler is allowed in GraphStageLogic constructor"
diff --git a/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/PartitionedSourcesSpec.scala b/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/PartitionedSourcesSpec.scala
index 3b31a1e1..d9492b3f 100644
--- a/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/PartitionedSourcesSpec.scala
+++ b/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/PartitionedSourcesSpec.scala
@@ -55,7 +55,7 @@ class PartitionedSourcesSpec extends SpecBase with
TestcontainersKafkaLike with
_ => Future.successful(Map.empty))
.flatMapMerge(1, _._2)
.map(_.value())
- .runWith(TestSink.probe)
+ .runWith(TestSink())
probe
.request(100)
@@ -76,7 +76,7 @@ class PartitionedSourcesSpec extends SpecBase with
TestcontainersKafkaLike with
tp => Future.successful(tp.map(_ -> 51L).toMap))
.flatMapMerge(1, _._2)
.map(_.value())
- .runWith(TestSink.probe)
+ .runWith(TestSink())
probe
.request(50)
@@ -337,13 +337,13 @@ class PartitionedSourcesSpec extends SpecBase with
TestcontainersKafkaLike with
.flatMapMerge(1, _._2)
.map(_.value())
- val (control1, firstConsumer) = source.toMat(TestSink.probe)(Keep.both).run()
+ val (control1, firstConsumer) = source.toMat(TestSink())(Keep.both).run()
eventually {
assert(partitionsAssigned, "first consumer should get asked for offsets")
}
- val secondConsumer = source.runWith(TestSink.probe)
+ val secondConsumer = source.runWith(TestSink())
eventually {
revoked.value should have size partitions / 2L
diff --git a/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/RebalanceSpec.scala b/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/RebalanceSpec.scala
index cd2471c1..21fca2b2 100644
--- a/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/RebalanceSpec.scala
+++ b/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/RebalanceSpec.scala
@@ -74,7 +74,7 @@ class RebalanceSpec extends SpecBase with
TestcontainersKafkaLike with Inside {
val probe1subscription =
Subscriptions.topics(topic1).withRebalanceListener(probe1rebalanceActor.ref)
val (control1, probe1) = Consumer
.plainSource(consumerSettings.withClientId(consumerClientId1),
probe1subscription)
- .toMat(TestSink.probe)(Keep.both)
+ .toMat(TestSink())(Keep.both)
.run()
log.debug("Await initial partition assignment")
@@ -95,7 +95,7 @@ class RebalanceSpec extends SpecBase with
TestcontainersKafkaLike with Inside {
val probe2subscription =
Subscriptions.topics(topic1).withRebalanceListener(probe2rebalanceActor.ref)
val (control2, probe2) = Consumer
.plainSource(consumerSettings.withClientId(consumerClientId2),
probe2subscription)
- .toMat(TestSink.probe)(Keep.both)
+ .toMat(TestSink())(Keep.both)
.run()
log.debug("Await a revoke to consumer 1")
@@ -136,7 +136,7 @@ class RebalanceSpec extends SpecBase with
TestcontainersKafkaLike with Inside {
.expectNextN(partitions.toLong)
.map {
case (tp, subSource) =>
- (tp, subSource.toMat(TestSink.probe)(Keep.right).run())
+ (tp, subSource.toMat(TestSink())(Keep.right).run())
}
def runForSubSource(
@@ -170,7 +170,7 @@ class RebalanceSpec extends SpecBase with
TestcontainersKafkaLike with Inside {
val probe1subscription =
Subscriptions.topics(topic1).withRebalanceListener(probe1rebalanceActor.ref)
val (control1, probe1) = Consumer
.plainPartitionedSource(consumerSettings.withClientId(consumerClientId1),
probe1subscription)
- .toMat(TestSink.probe)(Keep.both)
+ .toMat(TestSink())(Keep.both)
.run()
log.debug("Await initial partition assignment")
@@ -197,7 +197,7 @@ class RebalanceSpec extends SpecBase with
TestcontainersKafkaLike with Inside {
val probe2subscription =
Subscriptions.topics(topic1).withRebalanceListener(probe2rebalanceActor.ref)
val (control2, probe2) = Consumer
.plainPartitionedSource(consumerSettings.withClientId(consumerClientId2),
probe2subscription)
- .toMat(TestSink.probe)(Keep.both)
+ .toMat(TestSink())(Keep.both)
.run()
probe2.request(1)
diff --git a/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/RetentionPeriodSpec.scala b/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/RetentionPeriodSpec.scala
index 870a120f..2dfe0de7 100644
--- a/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/RetentionPeriodSpec.scala
+++ b/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/RetentionPeriodSpec.scala
@@ -76,7 +76,7 @@ class RetentionPeriodSpec extends SpecBase with
TestcontainersKafkaPerClassLike
// val probe1subscription =
Subscriptions.topics(topic1).withRebalanceListener(probe1rebalanceActor.ref)
// val (control1, probe1) = Consumer
// .committableSource(consumerSettings.withClientId(consumerClientId1),
probe1subscription)
-// .toMat(TestSink.probe)(Keep.both)
+// .toMat(TestSink())(Keep.both)
// .run()
//
// log.debug("Await initial partition assignment")
@@ -95,7 +95,7 @@ class RetentionPeriodSpec extends SpecBase with
TestcontainersKafkaPerClassLike
// val probe2subscription =
Subscriptions.topics(topic1).withRebalanceListener(probe2rebalanceActor.ref)
// val (control2, probe2) = Consumer
// .committableSource(consumerSettings.withClientId(consumerClientId2),
probe2subscription)
-// .toMat(TestSink.probe)(Keep.both)
+// .toMat(TestSink())(Keep.both)
// .run()
//
// log.debug("Await a revoke to consumer 1")
@@ -128,7 +128,7 @@ class RetentionPeriodSpec extends SpecBase with
TestcontainersKafkaPerClassLike
// val probe3subscription = Subscriptions.topics("__consumer_offsets")
// val (control3, probe3) = Consumer
// .plainSource(group2consumerSettings.withClientId(consumerClientId3),
probe3subscription)
-// .toMat(TestSink.probe)(Keep.both)
+// .toMat(TestSink())(Keep.both)
// .run()
// val commits: Seq[ConsumerRecord[Array[Byte], Array[Byte]]] =
probe3.request(100).expectNextN(10)
//
@@ -198,7 +198,7 @@ class RetentionPeriodSpec extends SpecBase with
TestcontainersKafkaPerClassLike
Done
}
}
- .toMat(TestSink.probe)(Keep.both)
+ .toMat(TestSink())(Keep.both)
.run()
probe1
@@ -215,7 +215,7 @@ class RetentionPeriodSpec extends SpecBase with
TestcontainersKafkaPerClassLike
val probe2 = Consumer
.committableSource(consumerSettings, Subscriptions.topics(topic1))
.map(_.record.value)
- .runWith(TestSink.probe)
+ .runWith(TestSink())
// Note that due to buffers and mapAsync(10) the committed offset is more
// than 26, and that is not wrong
@@ -235,7 +235,7 @@ class RetentionPeriodSpec extends SpecBase with
TestcontainersKafkaPerClassLike
val probe3 = Consumer
.committableSource(consumerSettings, Subscriptions.topics(topic1))
.map(_.record.value)
- .runWith(TestSink.probe)
+ .runWith(TestSink())
probe3
.request(100)
diff --git a/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/TimestampSpec.scala b/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/TimestampSpec.scala
index 75267da4..a917604a 100644
--- a/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/TimestampSpec.scala
+++ b/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/TimestampSpec.scala
@@ -48,7 +48,7 @@ class TimestampSpec extends SpecBase with
TestcontainersKafkaLike with Inside wi
val probe = Consumer
.plainSource(consumerSettings, topicsAndTs)
.map(_.value())
- .runWith(TestSink.probe)
+ .runWith(TestSink())
probe
.request(50)
@@ -74,7 +74,7 @@ class TimestampSpec extends SpecBase with
TestcontainersKafkaLike with Inside wi
val probe = Consumer
.plainSource(consumerSettings, topicsAndTs)
- .runWith(TestSink.probe)
+ .runWith(TestSink())
probe.ensureSubscription()
probe.expectNoMessage(200.millis)
@@ -93,7 +93,7 @@ class TimestampSpec extends SpecBase with
TestcontainersKafkaLike with Inside wi
val probe = Consumer
.plainSource(consumerSettings, topicsAndTs)
- .runWith(TestSink.probe)
+ .runWith(TestSink())
probe.ensureSubscription()
probe.expectNoMessage(200.millis)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]