This is an automated email from the ASF dual-hosted git repository.
fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-connectors.git
The following commit(s) were added to refs/heads/main by this push:
new 7aa685f92 use pekko 1.3.0 (#1290)
7aa685f92 is described below
commit 7aa685f92a2cf06b8a17d0036c900222b3725e0d
Author: PJ Fanning <[email protected]>
AuthorDate: Thu Nov 20 09:36:11 2025 +0100
use pekko 1.3.0 (#1290)
* use pekko 1.3.0
* try to fix TestSource/TestSink probe issues
* more changes
* compile issues
* compile issues
* compile issues
* Update MqttFrameStageSpec.scala
---
amqp/src/test/java/docs/javadsl/AmqpDocsTest.java | 2 +-
.../amqp/javadsl/AmqpConnectorsTest.java | 2 +-
.../connectors/amqp/javadsl/AmqpFlowTest.java | 6 +++---
.../test/scala/docs/scaladsl/AmqpDocsSpec.scala | 2 +-
.../amqp/scaladsl/AmqpConnectorsSpec.scala | 6 +++---
.../connectors/amqp/scaladsl/AmqpFlowSpec.scala | 23 +++++++++++-----------
.../docs/scaladsl/AvroParquetSourceSpec.scala | 4 ++--
.../eventbridge/EventBridgePublishMockSpec.scala | 10 +++++-----
.../scala/docs/scaladsl/AwsLambdaFlowSpec.scala | 4 ++--
.../cassandra/javadsl/CassandraSessionSpec.scala | 4 ++--
.../test/scala/docs/scaladsl/CsvParsingSpec.scala | 5 ++---
.../impl/ElasticsearchSimpleFlowStageTest.scala | 10 ++++------
.../impl/ElasticsearchSourcStageTest.scala | 2 +-
.../docs/scaladsl/FileTailSourceExtrasSpec.scala | 4 ++--
.../scala/docs/scaladsl/LogRotatorSinkSpec.scala | 12 +++++------
.../file/impl/archive/ZipArchiveFlowTest.scala | 5 ++---
.../stream/connectors/ftp/CommonFtpStageTest.java | 4 ++--
.../stream/connectors/ftp/CommonFtpStageSpec.scala | 22 ++++++++++-----------
.../test/scala/docs/scaladsl/IntegrationSpec.scala | 2 +-
.../connectors/google/util/AnnotateLastSpec.scala | 10 +++++-----
.../scaladsl/JmsBufferedAckConnectorsSpec.scala | 4 ++--
.../jakartams/scaladsl/JmsAckConnectorsSpec.scala | 4 ++--
.../scaladsl/JmsBufferedAckConnectorsSpec.scala | 4 ++--
.../jms/scaladsl/JmsAckConnectorsSpec.scala | 4 ++--
.../connectors/kinesis/KinesisFlowSpec.scala | 10 ++++------
.../kinesis/KinesisSchedulerSourceSpec.scala | 7 +++----
.../connectors/kinesis/KinesisSourceSpec.scala | 12 +++++------
.../kinesisfirehose/KinesisFirehoseFlowSpec.scala | 5 ++---
.../mqtt/streaming/impl/MqttFrameStageSpec.scala | 16 +++++++--------
.../src/test/java/docs/javadsl/MqttSourceTest.java | 2 +-
.../test/scala/docs/scaladsl/MqttSourceSpec.scala | 8 ++++----
.../src/test/java/docs/javadsl/MqttSourceTest.java | 2 +-
.../test/scala/docs/scaladsl/MqttSourceSpec.scala | 8 ++++----
project/PekkoCoreDependency.scala | 2 +-
.../connectors/sns/SnsPublishMockingSpec.scala | 14 ++++++-------
.../sqs/scaladsl/SqsPublishSinkSpec.scala | 18 ++++++++---------
.../sqs/scaladsl/SqsSourceMockSpec.scala | 6 +++---
.../text/scaladsl/CharsetCodingFlowsSpec.scala | 8 +++-----
udp/src/test/java/docs/javadsl/UdpTest.java | 8 ++++----
udp/src/test/scala/docs/scaladsl/UdpSpec.scala | 20 ++++++++-----------
40 files changed, 141 insertions(+), 160 deletions(-)
diff --git a/amqp/src/test/java/docs/javadsl/AmqpDocsTest.java b/amqp/src/test/java/docs/javadsl/AmqpDocsTest.java
index 66a5aa727..06bb08309 100644
--- a/amqp/src/test/java/docs/javadsl/AmqpDocsTest.java
+++ b/amqp/src/test/java/docs/javadsl/AmqpDocsTest.java
@@ -145,7 +145,7 @@ public class AmqpDocsTest {
Source.from(input)
.map(ByteString::fromString)
.viaMat(ampqRpcFlow, Keep.right())
- .toMat(TestSink.probe(system), Keep.both())
+ .toMat(TestSink.create(system), Keep.both())
.run(system);
// #create-rpc-flow
result.first().toCompletableFuture().get(3, TimeUnit.SECONDS);
diff --git a/amqp/src/test/java/org/apache/pekko/stream/connectors/amqp/javadsl/AmqpConnectorsTest.java b/amqp/src/test/java/org/apache/pekko/stream/connectors/amqp/javadsl/AmqpConnectorsTest.java
index 225c9178d..e48526867 100644
---
a/amqp/src/test/java/org/apache/pekko/stream/connectors/amqp/javadsl/AmqpConnectorsTest.java
+++
b/amqp/src/test/java/org/apache/pekko/stream/connectors/amqp/javadsl/AmqpConnectorsTest.java
@@ -149,7 +149,7 @@ public class AmqpConnectorsTest {
.map(WriteMessage::create)
.viaMat(ampqRpcFlow, Keep.right())
.mapAsync(1, cm -> cm.ack().thenApply(unused -> cm.message()))
- .toMat(TestSink.probe(system), Keep.both())
+ .toMat(TestSink.create(system), Keep.both())
.run(system);
result.first().toCompletableFuture().get(5, TimeUnit.SECONDS);
diff --git
a/amqp/src/test/java/org/apache/pekko/stream/connectors/amqp/javadsl/AmqpFlowTest.java
b/amqp/src/test/java/org/apache/pekko/stream/connectors/amqp/javadsl/AmqpFlowTest.java
index 06f5e435f..bb0f9866c 100644
---
a/amqp/src/test/java/org/apache/pekko/stream/connectors/amqp/javadsl/AmqpFlowTest.java
+++
b/amqp/src/test/java/org/apache/pekko/stream/connectors/amqp/javadsl/AmqpFlowTest.java
@@ -108,7 +108,7 @@ public class AmqpFlowTest {
Source.from(input)
.map(s -> WriteMessage.create(ByteString.fromString(s)))
.via(flow)
- .toMat(TestSink.probe(system), Keep.right())
+ .toMat(TestSink.create(system), Keep.right())
.run(system);
result
@@ -142,7 +142,7 @@ public class AmqpFlowTest {
.map(s -> WriteMessage.create(ByteString.fromString(s)))
.via(flowWithContext)
.asSource()
- .toMat(TestSink.probe(system), Keep.right())
+ .toMat(TestSink.create(system), Keep.right())
.run(system);
result
@@ -165,7 +165,7 @@ public class AmqpFlowTest {
Source.from(input)
.map(s ->
Pair.create(WriteMessage.create(ByteString.fromString(s)), s))
.via(flow)
- .toMat(TestSink.probe(system), Keep.right())
+ .toMat(TestSink.create(system), Keep.right())
.run(system);
result
diff --git a/amqp/src/test/scala/docs/scaladsl/AmqpDocsSpec.scala
b/amqp/src/test/scala/docs/scaladsl/AmqpDocsSpec.scala
index dab24b6a3..d0956f465 100644
--- a/amqp/src/test/scala/docs/scaladsl/AmqpDocsSpec.scala
+++ b/amqp/src/test/scala/docs/scaladsl/AmqpDocsSpec.scala
@@ -103,7 +103,7 @@ class AmqpDocsSpec extends AmqpSpec {
val (rpcQueueF: Future[String], probe: TestSubscriber.Probe[ByteString]) = Source(input)
.map(s => ByteString(s))
.viaMat(amqpRpcFlow)(Keep.right)
- .toMat(TestSink.probe)(Keep.both)
+ .toMat(TestSink())(Keep.both)
.run()
// #create-rpc-flow
rpcQueueF.futureValue
diff --git
a/amqp/src/test/scala/org/apache/pekko/stream/connectors/amqp/scaladsl/AmqpConnectorsSpec.scala
b/amqp/src/test/scala/org/apache/pekko/stream/connectors/amqp/scaladsl/AmqpConnectorsSpec.scala
index dad8f3bb7..bb038874a 100644
---
a/amqp/src/test/scala/org/apache/pekko/stream/connectors/amqp/scaladsl/AmqpConnectorsSpec.scala
+++
b/amqp/src/test/scala/org/apache/pekko/stream/connectors/amqp/scaladsl/AmqpConnectorsSpec.scala
@@ -103,7 +103,7 @@ class AmqpConnectorsSpec extends AmqpSpec with ScalaCheckDrivenPropertyChecks {
val input = Vector("one", "two", "three", "four", "five")
val (rpcQueueF, probe) =
- Source(input).map(s => ByteString(s)).viaMat(amqpRpcFlow)(Keep.right).toMat(TestSink.probe)(Keep.both).run()
+ Source(input).map(s => ByteString(s)).viaMat(amqpRpcFlow)(Keep.right).toMat(TestSink())(Keep.both).run()
rpcQueueF.futureValue
val amqpSink = AmqpSink.replyTo(
@@ -134,7 +134,7 @@ class AmqpConnectorsSpec extends AmqpSpec with
ScalaCheckDrivenPropertyChecks {
Source
.empty[ByteString]
.via(AmqpRpcFlow.simple(AmqpWriteSettings(connectionProvider)))
- .runWith(TestSink.probe)
+ .runWith(TestSink())
.ensureSubscription()
.expectComplete()
@@ -358,7 +358,7 @@ class AmqpConnectorsSpec extends AmqpSpec with
ScalaCheckDrivenPropertyChecks {
.map(bytes => WriteMessage(bytes))
.viaMat(amqpRpcFlow)(Keep.right)
.mapAsync(1)(cm => cm.ack().map(_ => cm.message))
- .toMat(TestSink.probe)(Keep.both)
+ .toMat(TestSink())(Keep.both)
.run()
rpcQueueF.futureValue
diff --git
a/amqp/src/test/scala/org/apache/pekko/stream/connectors/amqp/scaladsl/AmqpFlowSpec.scala
b/amqp/src/test/scala/org/apache/pekko/stream/connectors/amqp/scaladsl/AmqpFlowSpec.scala
index 421da76cf..d45ec2f60 100644
---
a/amqp/src/test/scala/org/apache/pekko/stream/connectors/amqp/scaladsl/AmqpFlowSpec.scala
+++
b/amqp/src/test/scala/org/apache/pekko/stream/connectors/amqp/scaladsl/AmqpFlowSpec.scala
@@ -160,7 +160,7 @@ class AmqpFlowSpec extends AmqpSpec with AmqpMocking with
BeforeAndAfterEach {
.map(s => WriteMessage(ByteString(s)))
.viaMat(mockedFlowWithContextAndConfirm)(Keep.right)
.asSource
- .toMat(TestSink.probe)(Keep.both)
+ .toMat(TestSink())(Keep.both)
.run()
probe.request(input.size)
@@ -242,7 +242,7 @@ class AmqpFlowSpec extends AmqpSpec with AmqpMocking with
BeforeAndAfterEach {
.map(s => WriteMessage(ByteString(s)))
.viaMat(mockedUnorderedFlowWithPassThrough)(Keep.right)
.asSource
- .toMat(TestSink.probe)(Keep.both)
+ .toMat(TestSink())(Keep.both)
.run()
probe.request(input.size)
@@ -277,7 +277,7 @@ class AmqpFlowSpec extends AmqpSpec with AmqpMocking with
BeforeAndAfterEach {
Source(input)
.map(s => WriteMessage(ByteString(s)))
.viaMat(flow)(Keep.right)
- .toMat(TestSink.probe)(Keep.both)
+ .toMat(TestSink())(Keep.both)
.run()
val messages = probe.request(input.size).expectNextN(input.size)
@@ -295,7 +295,7 @@ class AmqpFlowSpec extends AmqpSpec with AmqpMocking with
BeforeAndAfterEach {
.asSourceWithContext(identity)
.map(s => WriteMessage(ByteString(s)))
.viaMat(flow)(Keep.right)
- .toMat(TestSink.probe)(Keep.both)
+ .toMat(TestSink())(Keep.both)
.run()
val messages = probe.request(input.size).expectNextN(input.size)
@@ -312,7 +312,7 @@ class AmqpFlowSpec extends AmqpSpec with AmqpMocking with
BeforeAndAfterEach {
Source(input)
.map(s => (WriteMessage(ByteString(s)), s))
.viaMat(flow)(Keep.right)
- .toMat(TestSink.probe)(Keep.both)
+ .toMat(TestSink())(Keep.both)
.run()
val messages = probe.request(input.size).expectNextN(input.size)
@@ -350,7 +350,7 @@ class AmqpFlowSpec extends AmqpSpec with AmqpMocking with
BeforeAndAfterEach {
Source(input)
.map(s => WriteMessage(ByteString(s)))
.viaMat(flow)(Keep.right)
- .toMat(TestSink.probe)(Keep.both)
+ .toMat(TestSink())(Keep.both)
.run()
probe.request(input.size)
@@ -378,7 +378,7 @@ class AmqpFlowSpec extends AmqpSpec with AmqpMocking with
BeforeAndAfterEach {
Source(input)
.map(s => WriteMessage(ByteString(s)))
.viaMat(flow)(Keep.right)
- .toMat(TestSink.probe)(Keep.both)
+ .toMat(TestSink())(Keep.both)
.run()
val messages = probe.request(input.size).expectNextN(input.size)
@@ -397,7 +397,7 @@ class AmqpFlowSpec extends AmqpSpec with AmqpMocking with
BeforeAndAfterEach {
Source(input)
.map(s => WriteMessage(ByteString(s)))
.viaMat(flow)(Keep.right)
- .toMat(TestSink.probe)(Keep.both)
+ .toMat(TestSink())(Keep.both)
.run()
probe.request(input.size)
@@ -435,7 +435,7 @@ class AmqpFlowSpec extends AmqpSpec with AmqpMocking with
BeforeAndAfterEach {
Source(1 to sourceElements)
.map(s => WriteMessage(ByteString(s)))
.viaMat(flow)(Keep.right)
- .toMat(TestSink.probe)(Keep.right)
+ .toMat(TestSink())(Keep.right)
.run()
probe.request(sourceElements)
@@ -451,11 +451,10 @@ class AmqpFlowSpec extends AmqpSpec with AmqpMocking with
BeforeAndAfterEach {
val input = Vector("one", "two")
val (sourceProbe, sinkProbe) =
- TestSource
- .probe[String]
+ TestSource[String]()
.map(s => WriteMessage(ByteString(s)))
.viaMat(flow)(Keep.left)
- .toMat(TestSink.probe)(Keep.both)
+ .toMat(TestSink())(Keep.both)
.run()
sinkProbe.request(input.size)
diff --git
a/avroparquet/src/test/scala/docs/scaladsl/AvroParquetSourceSpec.scala
b/avroparquet/src/test/scala/docs/scaladsl/AvroParquetSourceSpec.scala
index 2fa88db1e..c385ec8e4 100644
--- a/avroparquet/src/test/scala/docs/scaladsl/AvroParquetSourceSpec.scala
+++ b/avroparquet/src/test/scala/docs/scaladsl/AvroParquetSourceSpec.scala
@@ -56,7 +56,7 @@ class AvroParquetSourceSpec
// #init-source
val source: Source[GenericRecord, NotUsed] = AvroParquetSource(reader)
// #init-source
- val sink = source.runWith(TestSink.probe)
+ val sink = source.runWith(TestSink())
// then
val result: Seq[GenericRecord] = sink.toStrict(3.seconds)
@@ -80,7 +80,7 @@ class AvroParquetSourceSpec
// #init-source
val source: Source[GenericRecord, NotUsed] = AvroParquetSource(reader)
// #init-source
- val sink = source.runWith(TestSink.probe)
+ val sink = source.runWith(TestSink())
// then
val result: Seq[GenericRecord] = sink.toStrict(3.seconds)
diff --git
a/aws-event-bridge/src/test/scala/org/apache/pekko/stream/connectors/aws/eventbridge/EventBridgePublishMockSpec.scala
b/aws-event-bridge/src/test/scala/org/apache/pekko/stream/connectors/aws/eventbridge/EventBridgePublishMockSpec.scala
index 35cf55e9e..cf3f89f2c 100644
---
a/aws-event-bridge/src/test/scala/org/apache/pekko/stream/connectors/aws/eventbridge/EventBridgePublishMockSpec.scala
+++
b/aws-event-bridge/src/test/scala/org/apache/pekko/stream/connectors/aws/eventbridge/EventBridgePublishMockSpec.scala
@@ -56,7 +56,7 @@ class EventBridgePublishMockSpec extends AnyFlatSpec with
DefaultTestContext wit
when(eventBridgeClient.putEvents(putRequest)).thenReturn(CompletableFuture.completedFuture(putResult))
val (probe, future) =
- TestSource.probe[PutEventsRequestEntry].via(EventBridgePublisher.flow()).toMat(Sink.seq)(Keep.both).run()
+ TestSource[PutEventsRequestEntry]().via(EventBridgePublisher.flow()).toMat(Sink.seq)(Keep.both).run()
probe.sendNext(putRequestEntry).sendComplete()
@@ -71,7 +71,7 @@ class EventBridgePublishMockSpec extends AnyFlatSpec with
DefaultTestContext wit
when(eventBridgeClient.putEvents(any[PutEventsRequest]())).thenReturn(CompletableFuture.completedFuture(putResult))
val (probe, future) =
-
TestSource.probe[PutEventsRequestEntry].via(EventBridgePublisher.flow()).toMat(Sink.seq)(Keep.both).run()
+
TestSource[PutEventsRequestEntry]().via(EventBridgePublisher.flow()).toMat(Sink.seq)(Keep.both).run()
probe
.sendNext(entryDetail("eb-message-1"))
@@ -97,7 +97,7 @@ class EventBridgePublishMockSpec extends AnyFlatSpec with
DefaultTestContext wit
when(eventBridgeClient.putEvents(any[PutEventsRequest]())).thenReturn(CompletableFuture.completedFuture(putResult))
val (probe, future) =
-
TestSource.probe[Seq[PutEventsRequestEntry]].via(EventBridgePublisher.flowSeq()).toMat(Sink.seq)(Keep.both).run()
+
TestSource[Seq[PutEventsRequestEntry]]().via(EventBridgePublisher.flowSeq()).toMat(Sink.seq)(Keep.both).run()
probe
.sendNext(Seq(entryDetail("eb-message-1"), entryDetail("eb-message-2"),
entryDetail("eb-message-3")))
.sendComplete()
@@ -118,7 +118,7 @@ class EventBridgePublishMockSpec extends AnyFlatSpec with
DefaultTestContext wit
when(eventBridgeClient.putEvents(meq(publishRequest))).thenReturn(promise)
val (probe, future) =
-
TestSource.probe[Seq[PutEventsRequestEntry]].via(EventBridgePublisher.flowSeq()).toMat(Sink.seq)(Keep.both).run()
+
TestSource[Seq[PutEventsRequestEntry]]().via(EventBridgePublisher.flowSeq()).toMat(Sink.seq)(Keep.both).run()
probe.sendNext(Seq(entryDetail("eb-message"))).sendComplete()
a[RuntimeException] should be thrownBy {
@@ -132,7 +132,7 @@ class EventBridgePublishMockSpec extends AnyFlatSpec with
DefaultTestContext wit
case class MyCustomException(message: String) extends Exception(message)
val (probe, future) =
-
TestSource.probe[Seq[PutEventsRequestEntry]].via(EventBridgePublisher.flowSeq()).toMat(Sink.seq)(Keep.both).run()
+
TestSource[Seq[PutEventsRequestEntry]]().via(EventBridgePublisher.flowSeq()).toMat(Sink.seq)(Keep.both).run()
probe.sendError(MyCustomException("upstream failure"))
a[MyCustomException] should be thrownBy {
diff --git a/awslambda/src/test/scala/docs/scaladsl/AwsLambdaFlowSpec.scala
b/awslambda/src/test/scala/docs/scaladsl/AwsLambdaFlowSpec.scala
index 4cad3b954..0b3825596 100644
--- a/awslambda/src/test/scala/docs/scaladsl/AwsLambdaFlowSpec.scala
+++ b/awslambda/src/test/scala/docs/scaladsl/AwsLambdaFlowSpec.scala
@@ -77,7 +77,7 @@ class AwsLambdaFlowSpec
CompletableFuture.completedFuture(invokeResponse)
})
- val (probe, future) = TestSource.probe[InvokeRequest].via(lambdaFlow).toMat(Sink.seq)(Keep.both).run()
+ val (probe, future) = TestSource[InvokeRequest]().via(lambdaFlow).toMat(Sink.seq)(Keep.both).run()
probe.sendNext(invokeRequest)
probe.sendComplete()
@@ -98,7 +98,7 @@ class AwsLambdaFlowSpec
}
})
- val (probe, future) =
TestSource.probe[InvokeRequest].via(lambdaFlow).toMat(Sink.seq)(Keep.both).run()
+ val (probe, future) =
TestSource[InvokeRequest]().via(lambdaFlow).toMat(Sink.seq)(Keep.both).run()
probe.sendNext(invokeFailureRequest)
probe.sendComplete()
diff --git
a/cassandra/src/test/scala/org/apache/pekko/stream/connectors/cassandra/javadsl/CassandraSessionSpec.scala
b/cassandra/src/test/scala/org/apache/pekko/stream/connectors/cassandra/javadsl/CassandraSessionSpec.scala
index c47484118..646647c39 100644
---
a/cassandra/src/test/scala/org/apache/pekko/stream/connectors/cassandra/javadsl/CassandraSessionSpec.scala
+++
b/cassandra/src/test/scala/org/apache/pekko/stream/connectors/cassandra/javadsl/CassandraSessionSpec.scala
@@ -112,7 +112,7 @@ final class CassandraSessionSpec extends
CassandraSpecBase(ActorSystem("Cassandr
val stmt = await(session.prepare(s"SELECT count FROM $dataTable WHERE
partition = ?"))
val bound = stmt.bind("A")
val rows = session.select(bound).asScala
- val probe = rows.map(_.getLong("count")).runWith(TestSink.probe[Long])
+ val probe = rows.map(_.getLong("count")).runWith(TestSink[Long]())
probe.within(10.seconds) {
probe.request(10).expectNextUnordered(1L, 2L, 3L, 4L).expectComplete()
}
@@ -120,7 +120,7 @@ final class CassandraSessionSpec extends
CassandraSpecBase(ActorSystem("Cassandr
"select and bind as Source" in {
val rows = session.select(s"SELECT count FROM $dataTable WHERE partition
= ?", "B").asScala
- val probe = rows.map(_.getLong("count")).runWith(TestSink.probe[Long])
+ val probe = rows.map(_.getLong("count")).runWith(TestSink[Long]())
probe.within(10.seconds) {
probe.request(10).expectNextUnordered(5L, 6L).expectComplete()
}
diff --git a/csv/src/test/scala/docs/scaladsl/CsvParsingSpec.scala
b/csv/src/test/scala/docs/scaladsl/CsvParsingSpec.scala
index 526dda4c1..8894b0071 100644
--- a/csv/src/test/scala/docs/scaladsl/CsvParsingSpec.scala
+++ b/csv/src/test/scala/docs/scaladsl/CsvParsingSpec.scala
@@ -129,11 +129,10 @@ class CsvParsingSpec extends CsvSpec {
}
"emit completion even without new line at end" in assertAllStagesStopped {
- val (source, sink) = TestSource
- .probe[ByteString]
+ val (source, sink) = TestSource[ByteString]()
.via(CsvParsing.lineScanner())
.map(_.map(_.utf8String))
- .toMat(TestSink.probe[List[String]])(Keep.both)
+ .toMat(TestSink[List[String]]())(Keep.both)
.run()
source.sendNext(ByteString("eins,zwei,drei\nuno,dos,tres\n1,2,3"))
sink.request(3)
diff --git
a/elasticsearch/src/test/scala/org/apache/pekko/stream/connectors/elasticsearch/impl/ElasticsearchSimpleFlowStageTest.scala
b/elasticsearch/src/test/scala/org/apache/pekko/stream/connectors/elasticsearch/impl/ElasticsearchSimpleFlowStageTest.scala
index fe5cb0f29..49dbdacc7 100644
---
a/elasticsearch/src/test/scala/org/apache/pekko/stream/connectors/elasticsearch/impl/ElasticsearchSimpleFlowStageTest.scala
+++
b/elasticsearch/src/test/scala/org/apache/pekko/stream/connectors/elasticsearch/impl/ElasticsearchSimpleFlowStageTest.scala
@@ -52,14 +52,13 @@ class ElasticsearchSimpleFlowStageTest
"stream ends" should {
"emit element only when downstream requests" in {
val (upstream, downstream) =
- TestSource
- .probe[(immutable.Seq[WriteMessage[String, NotUsed]],
immutable.Seq[WriteResult[String, NotUsed]])]
+ TestSource[(immutable.Seq[WriteMessage[String, NotUsed]],
immutable.Seq[WriteResult[String, NotUsed]])]()
.via(
new impl.ElasticsearchSimpleFlowStage[String, NotUsed](
ElasticsearchParams.V7("es-simple-flow-index"),
settings,
writer))
- .toMat(TestSink.probe)(Keep.both)
+ .toMat(TestSink())(Keep.both)
.run()
upstream.sendNext(dummyMessages)
@@ -77,14 +76,13 @@ class ElasticsearchSimpleFlowStageTest
"client cannot connect to ES" should {
"stop the stream" in {
val (upstream, downstream) =
- TestSource
- .probe[(immutable.Seq[WriteMessage[String, NotUsed]],
immutable.Seq[WriteResult[String, NotUsed]])]
+ TestSource[(immutable.Seq[WriteMessage[String, NotUsed]],
immutable.Seq[WriteResult[String, NotUsed]])]()
.via(
new impl.ElasticsearchSimpleFlowStage[String, NotUsed](
ElasticsearchParams.V7("es-simple-flow-index"),
settings.withConnection(ElasticsearchConnectionSettings("http://wololo:9202")),
writer))
- .toMat(TestSink.probe)(Keep.both)
+ .toMat(TestSink())(Keep.both)
.run()
upstream.sendNext(dummyMessages)
diff --git
a/elasticsearch/src/test/scala/org/apache/pekko/stream/connectors/elasticsearch/impl/ElasticsearchSourcStageTest.scala
b/elasticsearch/src/test/scala/org/apache/pekko/stream/connectors/elasticsearch/impl/ElasticsearchSourcStageTest.scala
index c212a735b..d39464f47 100644
---
a/elasticsearch/src/test/scala/org/apache/pekko/stream/connectors/elasticsearch/impl/ElasticsearchSourcStageTest.scala
+++
b/elasticsearch/src/test/scala/org/apache/pekko/stream/connectors/elasticsearch/impl/ElasticsearchSourcStageTest.scala
@@ -46,7 +46,7 @@ class ElasticsearchSourcStageTest
Map("query" -> """{ "match_all":{}}"""),
ElasticsearchSourceSettings(ElasticsearchConnectionSettings("http://wololo:9202")),
(json: String) => ScrollResponse(Some(json), None)))
- .toMat(TestSink.probe)(Keep.right)
+ .toMat(TestSink())(Keep.right)
.run()
downstream.request(1)
diff --git a/file/src/test/scala/docs/scaladsl/FileTailSourceExtrasSpec.scala
b/file/src/test/scala/docs/scaladsl/FileTailSourceExtrasSpec.scala
index 92096c4f6..a2a6878ce 100644
--- a/file/src/test/scala/docs/scaladsl/FileTailSourceExtrasSpec.scala
+++ b/file/src/test/scala/docs/scaladsl/FileTailSourceExtrasSpec.scala
@@ -69,7 +69,7 @@ class FileTailSourceExtrasSpec
// #shutdown-on-delete
- val probe = stream.toMat(TestSink.probe)(Keep.right).run()
+ val probe = stream.toMat(TestSink())(Keep.right).run()
val result = probe.requestNext()
result shouldEqual "a"
@@ -96,7 +96,7 @@ class FileTailSourceExtrasSpec
// #shutdown-on-idle-timeout
- val probe = stream.toMat(TestSink.probe)(Keep.right).run()
+ val probe = stream.toMat(TestSink())(Keep.right).run()
val result = probe.requestNext()
result shouldEqual "a"
diff --git a/file/src/test/scala/docs/scaladsl/LogRotatorSinkSpec.scala
b/file/src/test/scala/docs/scaladsl/LogRotatorSinkSpec.scala
index 433acb853..22fbf6a52 100644
--- a/file/src/test/scala/docs/scaladsl/LogRotatorSinkSpec.scala
+++ b/file/src/test/scala/docs/scaladsl/LogRotatorSinkSpec.scala
@@ -296,7 +296,7 @@ class LogRotatorSinkSpec
"upstream fail before first file creation" in assertAllStagesStopped {
val (triggerFunctionCreator, files) = fileLengthTriggerCreator()
val (probe, completion) =
-
TestSource.probe[ByteString].toMat(LogRotatorSink(triggerFunctionCreator))(Keep.both).run()
+
TestSource[ByteString]().toMat(LogRotatorSink(triggerFunctionCreator))(Keep.both).run()
val ex = new Exception("my-exception")
probe.sendError(ex)
@@ -307,7 +307,7 @@ class LogRotatorSinkSpec
"upstream fail after first file creation" in assertAllStagesStopped {
val (triggerFunctionCreator, files) = fileLengthTriggerCreator()
val (probe, completion) =
-
TestSource.probe[ByteString].toMat(LogRotatorSink(triggerFunctionCreator))(Keep.both).run()
+
TestSource[ByteString]().toMat(LogRotatorSink(triggerFunctionCreator))(Keep.both).run()
val ex = new Exception("my-exception")
probe.sendNext(ByteString("test"))
@@ -326,7 +326,7 @@ class LogRotatorSinkSpec
}
}
val (probe, completion) =
-
TestSource.probe[ByteString].toMat(LogRotatorSink(triggerFunctionCreator))(Keep.both).run()
+
TestSource[ByteString]().toMat(LogRotatorSink(triggerFunctionCreator))(Keep.both).run()
probe.sendNext(ByteString("test"))
the[Exception] thrownBy Await.result(completion, 3.seconds) shouldBe ex
}
@@ -340,8 +340,7 @@ class LogRotatorSinkSpec
}
}
val (probe, completion) =
- TestSource
- .probe[ByteString]
+ TestSource[ByteString]()
.toMat(LogRotatorSink(triggerFunctionCreator,
Set(StandardOpenOption.READ)))(Keep.both)
.run()
probe.sendNext(ByteString("test"))
@@ -370,8 +369,7 @@ class LogRotatorSinkSpec
}
}
val (probe, completion) =
- TestSource
- .probe[ByteString]
+ TestSource[ByteString]()
.toMat(
LogRotatorSink.withSinkFactory(
triggerGeneratorCreator = triggerFunctionCreator,
diff --git
a/file/src/test/scala/org/apache/pekko/stream/connectors/file/impl/archive/ZipArchiveFlowTest.scala
b/file/src/test/scala/org/apache/pekko/stream/connectors/file/impl/archive/ZipArchiveFlowTest.scala
index e681001db..f2b8e5e4b 100644
---
a/file/src/test/scala/org/apache/pekko/stream/connectors/file/impl/archive/ZipArchiveFlowTest.scala
+++
b/file/src/test/scala/org/apache/pekko/stream/connectors/file/impl/archive/ZipArchiveFlowTest.scala
@@ -33,10 +33,9 @@ class ZipArchiveFlowTest
"stream ends" should {
"emit element only when downstream requests" in {
val (upstream, downstream) =
- TestSource
- .probe[ByteString]
+ TestSource[ByteString]()
.via(new ZipArchiveFlow())
- .toMat(TestSink.probe)(Keep.both)
+ .toMat(TestSink())(Keep.both)
.run()
upstream.sendNext(FileByteStringSeparators.createStartingByteString("test"))
diff --git
a/ftp/src/test/java/org/apache/pekko/stream/connectors/ftp/CommonFtpStageTest.java
b/ftp/src/test/java/org/apache/pekko/stream/connectors/ftp/CommonFtpStageTest.java
index 331e523be..e122a0d77 100644
---
a/ftp/src/test/java/org/apache/pekko/stream/connectors/ftp/CommonFtpStageTest.java
+++
b/ftp/src/test/java/org/apache/pekko/stream/connectors/ftp/CommonFtpStageTest.java
@@ -68,7 +68,7 @@ interface CommonFtpStageTest extends BaseSupport,
PekkoSupport {
Source<FtpFile, NotUsed> source = getBrowserSource(basePath);
Pair<NotUsed, TestSubscriber.Probe<FtpFile>> pairResult =
- source.toMat(TestSink.probe(system), Keep.both()).run(system);
+ source.toMat(TestSink.create(system), Keep.both()).run(system);
TestSubscriber.Probe<FtpFile> probe = pairResult.second();
probe.request(demand).expectNextN(numFiles);
probe.expectComplete();
@@ -82,7 +82,7 @@ interface CommonFtpStageTest extends BaseSupport,
PekkoSupport {
Source<ByteString, CompletionStage<IOResult>> source =
getIOSource(fileName);
Pair<CompletionStage<IOResult>, TestSubscriber.Probe<ByteString>>
pairResult =
- source.toMat(TestSink.probe(system), Keep.both()).run(system);
+ source.toMat(TestSink.create(system), Keep.both()).run(system);
TestSubscriber.Probe<ByteString> probe = pairResult.second();
probe.request(100).expectNextOrComplete();
diff --git
a/ftp/src/test/scala/org/apache/pekko/stream/connectors/ftp/CommonFtpStageSpec.scala
b/ftp/src/test/scala/org/apache/pekko/stream/connectors/ftp/CommonFtpStageSpec.scala
index 177503d8c..cb34d5561 100644
---
a/ftp/src/test/scala/org/apache/pekko/stream/connectors/ftp/CommonFtpStageSpec.scala
+++
b/ftp/src/test/scala/org/apache/pekko/stream/connectors/ftp/CommonFtpStageSpec.scala
@@ -107,7 +107,7 @@ trait CommonFtpStageSpec extends BaseSpec with Eventually {
val basePath = ""
generateFiles(30, 10, basePath)
val probe =
- listFiles(basePath).toMat(TestSink.probe)(Keep.right).run()
+ listFiles(basePath).toMat(TestSink())(Keep.right).run()
probe.request(40).expectNextN(30)
probe.expectComplete()
}
@@ -116,7 +116,7 @@ trait CommonFtpStageSpec extends BaseSpec with Eventually {
val basePath = "/foo"
generateFiles(30, 10, basePath)
val probe =
- listFiles(basePath).toMat(TestSink.probe)(Keep.right).run()
+ listFiles(basePath).toMat(TestSink())(Keep.right).run()
probe.request(40).expectNextN(30)
probe.expectComplete()
}
@@ -125,7 +125,7 @@ trait CommonFtpStageSpec extends BaseSpec with Eventually {
val basePath = "/foo"
generateFiles(30, 10, basePath)
val probe =
- listFilesWithFilter(basePath, f =>
false).toMat(TestSink.probe)(Keep.right).run()
+ listFilesWithFilter(basePath, f =>
false).toMat(TestSink())(Keep.right).run()
probe.request(40).expectNextN(12) // 9 files, 3 directories
probe.expectComplete()
@@ -136,7 +136,7 @@ trait CommonFtpStageSpec extends BaseSpec with Eventually {
generateFiles(30, 10, basePath)
val probe =
listFilesWithFilter(basePath, f => f.name.contains("1"))
- .toMat(TestSink.probe)(Keep.right)
+ .toMat(TestSink())(Keep.right)
.run()
probe.request(40).expectNextN(21) // 9 files in root, 2 directories, 10
files in dir_1
probe.expectComplete()
@@ -146,7 +146,7 @@ trait CommonFtpStageSpec extends BaseSpec with Eventually {
"list all files in sparse directory tree" in assertAllStagesStopped {
putFileOnFtp("foo/bar/baz/foobar/sample")
val probe =
- listFiles("/").toMat(TestSink.probe)(Keep.right).run()
+ listFiles("/").toMat(TestSink())(Keep.right).run()
probe.request(2).expectNextN(1)
probe.expectComplete()
}
@@ -155,7 +155,7 @@ trait CommonFtpStageSpec extends BaseSpec with Eventually {
putFileOnFtp("foo/bar/baz/foobar/sample")
val probe =
listFilesWithFilter("/", _ => true, emitTraversedDirectories = true)
- .toMat(TestSink.probe)(Keep.right)
+ .toMat(TestSink())(Keep.right)
.run()
probe.request(10).expectNextN(5) // foo, bar, baz, foobar, and sample_1
= 5 files
probe.expectComplete()
@@ -189,7 +189,7 @@ trait CommonFtpStageSpec extends BaseSpec with Eventually {
val fileName = "sample_io_" + Instant.now().getNano
putFileOnFtp(fileName)
val (result, probe) =
- retrieveFromPath(s"/$fileName").toMat(TestSink.probe)(Keep.both).run()
+ retrieveFromPath(s"/$fileName").toMat(TestSink())(Keep.both).run()
probe.request(100).expectNextOrComplete()
val expectedNumOfBytes = getDefaultContent.getBytes().length
@@ -201,7 +201,7 @@ trait CommonFtpStageSpec extends BaseSpec with Eventually {
val offset = 10L
putFileOnFtp(fileName)
val (result, probe) =
- retrieveFromPathWithOffset(s"/$fileName",
offset).toMat(TestSink.probe)(Keep.both).run()
+ retrieveFromPathWithOffset(s"/$fileName",
offset).toMat(TestSink())(Keep.both).run()
probe.request(100).expectNextOrComplete()
val expectedNumOfBytes = getDefaultContent.getBytes().length - offset
@@ -213,7 +213,7 @@ trait CommonFtpStageSpec extends BaseSpec with Eventually {
val fileContents = new Array[Byte](2000020)
Random.nextBytes(fileContents)
putFileOnFtpWithContents(fileName, fileContents)
- val (result, probe) =
retrieveFromPath(s"/$fileName").toMat(TestSink.probe)(Keep.both).run()
+ val (result, probe) =
retrieveFromPath(s"/$fileName").toMat(TestSink())(Keep.both).run()
probe.request(1000).expectNextOrComplete()
val expectedNumOfBytes = fileContents.length
@@ -227,7 +227,7 @@ trait CommonFtpStageSpec extends BaseSpec with Eventually {
Random.nextBytes(fileContents)
putFileOnFtpWithContents(fileName, fileContents)
val (result, probe) =
- retrieveFromPathWithOffset(s"/$fileName",
offset).toMat(TestSink.probe)(Keep.both).run()
+ retrieveFromPathWithOffset(s"/$fileName",
offset).toMat(TestSink())(Keep.both).run()
probe.request(1000).expectNextOrComplete()
val expectedNumOfBytes = fileContents.length - offset
@@ -242,7 +242,7 @@ trait CommonFtpStageSpec extends BaseSpec with Eventually {
generateFiles(numOfFiles, numOfFiles, basePath)
val probe = listFiles(basePath)
.mapAsyncUnordered(1)(file => retrieveFromPath(file.path, fromRoot =
true).to(Sink.ignore).run())
- .toMat(TestSink.probe)(Keep.right)
+ .toMat(TestSink())(Keep.right)
.run()
val result = probe.request(numOfFiles + 1).expectNextN(numOfFiles)
probe.expectComplete()
diff --git
a/google-cloud-pub-sub/src/test/scala/docs/scaladsl/IntegrationSpec.scala
b/google-cloud-pub-sub/src/test/scala/docs/scaladsl/IntegrationSpec.scala
index f7a1d2fab..b563f41c1 100644
--- a/google-cloud-pub-sub/src/test/scala/docs/scaladsl/IntegrationSpec.scala
+++ b/google-cloud-pub-sub/src/test/scala/docs/scaladsl/IntegrationSpec.scala
@@ -126,7 +126,7 @@ class IntegrationSpec
// the acknowledged message should not arrive again
val (stream, result2) = GooglePubSub
.subscribe(topic2subscription, config)
- .toMat(TestSink.probe)(Keep.both)
+ .toMat(TestSink())(Keep.both)
.run()
result2.ensureSubscription()
diff --git
a/google-common/src/test/scala/org/apache/pekko/stream/connectors/google/util/AnnotateLastSpec.scala
b/google-common/src/test/scala/org/apache/pekko/stream/connectors/google/util/AnnotateLastSpec.scala
index d6653269c..535c5148d 100644
---
a/google-common/src/test/scala/org/apache/pekko/stream/connectors/google/util/AnnotateLastSpec.scala
+++
b/google-common/src/test/scala/org/apache/pekko/stream/connectors/google/util/AnnotateLastSpec.scala
@@ -33,7 +33,7 @@ class AnnotateLastSpec
"AnnotateLast" should {
"indicate last element" in {
- val probe = Source(1 to 3).via(AnnotateLast[Int]).runWith(TestSink.probe)
+ val probe = Source(1 to 3).via(AnnotateLast[Int]).runWith(TestSink())
probe.requestNext(NotLast(1))
probe.requestNext(NotLast(2))
probe.requestNext(Last(3))
@@ -41,24 +41,24 @@ class AnnotateLastSpec
}
"indicate first element is last if only one element" in {
- val probe =
Source.single(1).via(AnnotateLast[Int]).runWith(TestSink.probe)
+ val probe = Source.single(1).via(AnnotateLast[Int]).runWith(TestSink())
probe.requestNext(Last(1))
probe.expectComplete()
}
"do nothing when stream is empty" in {
- val probe =
Source.empty[Nothing].via(AnnotateLast[Nothing]).runWith(TestSink.probe)
+ val probe =
Source.empty[Nothing].via(AnnotateLast[Nothing]).runWith(TestSink())
probe.expectSubscriptionAndComplete()
}
"return zero value when stream is empty using zero apply" in {
- val probe =
Source.empty[Null].via(AnnotateLast[Null](null)).runWith(TestSink.probe)
+ val probe =
Source.empty[Null].via(AnnotateLast[Null](null)).runWith(TestSink())
probe.requestNext(Last(null))
probe.expectComplete()
}
"don't return zero value if stream is non empty using zero apply" in {
- val probe =
Source.single(1).via(AnnotateLast[Int](0)).runWith(TestSink.probe)
+ val probe =
Source.single(1).via(AnnotateLast[Int](0)).runWith(TestSink())
probe.requestNext(Last(1))
probe.expectComplete()
}
diff --git
a/jakartams/src/test/scala/docs/scaladsl/JmsBufferedAckConnectorsSpec.scala
b/jakartams/src/test/scala/docs/scaladsl/JmsBufferedAckConnectorsSpec.scala
index 7738f3a89..cdbedf6dc 100644
--- a/jakartams/src/test/scala/docs/scaladsl/JmsBufferedAckConnectorsSpec.scala
+++ b/jakartams/src/test/scala/docs/scaladsl/JmsBufferedAckConnectorsSpec.scala
@@ -386,7 +386,7 @@ class JmsBufferedAckConnectorsSpec extends
JmsSharedServerSpec {
}
}
- .toMat(TestSink.probe)(Keep.both)
+ .toMat(TestSink())(Keep.both)
.run()
probe.requestNext(convertSpanToDuration(patienceConfig.timeout))
shouldBe Some(aMessage)
@@ -399,7 +399,7 @@ class JmsBufferedAckConnectorsSpec extends
JmsSharedServerSpec {
probe.expectComplete()
// Consuming again should give us no elements, as msg was acked and
therefore removed from the broker
- val (emptyConsumerControl, emptySourceProbe) =
source.toMat(TestSink.probe)(Keep.both).run()
+ val (emptyConsumerControl, emptySourceProbe) =
source.toMat(TestSink())(Keep.both).run()
emptySourceProbe.ensureSubscription().expectNoMessage()
emptyConsumerControl.shutdown()
emptySourceProbe.expectComplete()
diff --git
a/jakartams/src/test/scala/org/apache/pekko/stream/connectors/jakartams/scaladsl/JmsAckConnectorsSpec.scala
b/jakartams/src/test/scala/org/apache/pekko/stream/connectors/jakartams/scaladsl/JmsAckConnectorsSpec.scala
index 3795c6d10..e9ef7c491 100644
---
a/jakartams/src/test/scala/org/apache/pekko/stream/connectors/jakartams/scaladsl/JmsAckConnectorsSpec.scala
+++
b/jakartams/src/test/scala/org/apache/pekko/stream/connectors/jakartams/scaladsl/JmsAckConnectorsSpec.scala
@@ -422,7 +422,7 @@ class JmsAckConnectorsSpec extends JmsSpec {
case _ => None
}
}
- .toMat(TestSink.probe)(Keep.both)
+ .toMat(TestSink())(Keep.both)
.run()
probe.requestNext(convertSpanToDuration(patienceConfig.timeout))
shouldBe Some(aMessage)
@@ -435,7 +435,7 @@ class JmsAckConnectorsSpec extends JmsSpec {
probe.expectComplete()
// Consuming again should give us no elements, as msg was acked and
therefore removed from the broker
- val (emptyConsumerControl, emptySourceProbe) =
source.toMat(TestSink.probe)(Keep.both).run()
+ val (emptyConsumerControl, emptySourceProbe) =
source.toMat(TestSink())(Keep.both).run()
emptySourceProbe.ensureSubscription().expectNoMessage()
emptyConsumerControl.shutdown()
emptySourceProbe.expectComplete()
diff --git
a/jms/src/test/scala/docs/scaladsl/JmsBufferedAckConnectorsSpec.scala
b/jms/src/test/scala/docs/scaladsl/JmsBufferedAckConnectorsSpec.scala
index 5c33cf710..bfc78430e 100644
--- a/jms/src/test/scala/docs/scaladsl/JmsBufferedAckConnectorsSpec.scala
+++ b/jms/src/test/scala/docs/scaladsl/JmsBufferedAckConnectorsSpec.scala
@@ -388,7 +388,7 @@ class JmsBufferedAckConnectorsSpec extends
JmsSharedServerSpec {
}
}
- .toMat(TestSink.probe)(Keep.both)
+ .toMat(TestSink())(Keep.both)
.run()
probe.requestNext(convertSpanToDuration(patienceConfig.timeout))
shouldBe Some(aMessage)
@@ -401,7 +401,7 @@ class JmsBufferedAckConnectorsSpec extends
JmsSharedServerSpec {
probe.expectComplete()
// Consuming again should give us no elements, as msg was acked and
therefore removed from the broker
- val (emptyConsumerControl, emptySourceProbe) =
source.toMat(TestSink.probe)(Keep.both).run()
+ val (emptyConsumerControl, emptySourceProbe) =
source.toMat(TestSink())(Keep.both).run()
emptySourceProbe.ensureSubscription().expectNoMessage()
emptyConsumerControl.shutdown()
emptySourceProbe.expectComplete()
diff --git
a/jms/src/test/scala/org/apache/pekko/stream/connectors/jms/scaladsl/JmsAckConnectorsSpec.scala
b/jms/src/test/scala/org/apache/pekko/stream/connectors/jms/scaladsl/JmsAckConnectorsSpec.scala
index 10339609b..0cbdde347 100644
---
a/jms/src/test/scala/org/apache/pekko/stream/connectors/jms/scaladsl/JmsAckConnectorsSpec.scala
+++
b/jms/src/test/scala/org/apache/pekko/stream/connectors/jms/scaladsl/JmsAckConnectorsSpec.scala
@@ -425,7 +425,7 @@ class JmsAckConnectorsSpec extends JmsSpec {
case _ => None
}
}
- .toMat(TestSink.probe)(Keep.both)
+ .toMat(TestSink())(Keep.both)
.run()
probe.requestNext(convertSpanToDuration(patienceConfig.timeout))
shouldBe Some(aMessage)
@@ -438,7 +438,7 @@ class JmsAckConnectorsSpec extends JmsSpec {
probe.expectComplete()
// Consuming again should give us no elements, as msg was acked and
therefore removed from the broker
- val (emptyConsumerControl, emptySourceProbe) =
source.toMat(TestSink.probe)(Keep.both).run()
+ val (emptyConsumerControl, emptySourceProbe) =
source.toMat(TestSink())(Keep.both).run()
emptySourceProbe.ensureSubscription().expectNoMessage()
emptyConsumerControl.shutdown()
emptySourceProbe.expectComplete()
diff --git
a/kinesis/src/test/scala/org/apache/pekko/stream/connectors/kinesis/KinesisFlowSpec.scala
b/kinesis/src/test/scala/org/apache/pekko/stream/connectors/kinesis/KinesisFlowSpec.scala
index 40ca5f206..4eb0f8722 100644
---
a/kinesis/src/test/scala/org/apache/pekko/stream/connectors/kinesis/KinesisFlowSpec.scala
+++
b/kinesis/src/test/scala/org/apache/pekko/stream/connectors/kinesis/KinesisFlowSpec.scala
@@ -105,10 +105,9 @@ class KinesisFlowSpec extends AnyWordSpec with Matchers
with KinesisMock with Lo
.build()
val (sourceProbe, sinkProbe) =
- TestSource
- .probe[PutRecordsRequestEntry]
+ TestSource[PutRecordsRequestEntry]()
.via(KinesisFlow(streamName, settings))
- .toMat(TestSink.probe[PutRecordsResultEntry])(Keep.both)
+ .toMat(TestSink[PutRecordsResultEntry]())(Keep.both)
.run()
}
@@ -129,10 +128,9 @@ class KinesisFlowSpec extends AnyWordSpec with Matchers
with KinesisMock with Lo
.map(i => (PutRecordsResultEntry.builder().build(), i))
val (sourceProbe, sinkProbe) =
- TestSource
- .probe[(PutRecordsRequestEntry, Int)]
+ TestSource[(PutRecordsRequestEntry, Int)]()
.via(KinesisFlow.withContext(streamName, settings))
- .toMat(TestSink.probe)(Keep.both)
+ .toMat(TestSink())(Keep.both)
.run()
}
diff --git
a/kinesis/src/test/scala/org/apache/pekko/stream/connectors/kinesis/KinesisSchedulerSourceSpec.scala
b/kinesis/src/test/scala/org/apache/pekko/stream/connectors/kinesis/KinesisSchedulerSourceSpec.scala
index d87596998..78ada709e 100644
---
a/kinesis/src/test/scala/org/apache/pekko/stream/connectors/kinesis/KinesisSchedulerSourceSpec.scala
+++
b/kinesis/src/test/scala/org/apache/pekko/stream/connectors/kinesis/KinesisSchedulerSourceSpec.scala
@@ -282,7 +282,7 @@ class KinesisSchedulerSourceSpec
.viaMat(Valve(switchMode))(Keep.right)
.viaMat(KillSwitches.single)(Keep.both)
.watchTermination()(Keep.both)
- .toMat(TestSink.probe)(Keep.both)
+ .toMat(TestSink())(Keep.both)
.run()
watch.onComplete(_ => lock.release())
@@ -449,13 +449,12 @@ class KinesisSchedulerSourceSpec
private trait KinesisSchedulerCheckpointContext {
val (sourceProbe, sinkProbe) =
- TestSource
- .probe[CommittableRecord]
+ TestSource[CommittableRecord]()
.via(
KinesisSchedulerSource
.checkpointRecordsFlow(
KinesisSchedulerCheckpointSettings(maxBatchSize = 100,
maxBatchWait = 500.millis)))
- .toMat(TestSink.probe)(Keep.both)
+ .toMat(TestSink())(Keep.both)
.run()
val recordProcessor = new ShardProcessor(_ => ())
}
diff --git
a/kinesis/src/test/scala/org/apache/pekko/stream/connectors/kinesis/KinesisSourceSpec.scala
b/kinesis/src/test/scala/org/apache/pekko/stream/connectors/kinesis/KinesisSourceSpec.scala
index db6b5f4f3..3e56c58b9 100644
---
a/kinesis/src/test/scala/org/apache/pekko/stream/connectors/kinesis/KinesisSourceSpec.scala
+++
b/kinesis/src/test/scala/org/apache/pekko/stream/connectors/kinesis/KinesisSourceSpec.scala
@@ -54,7 +54,7 @@ class KinesisSourceSpec extends AnyWordSpec with Matchers
with KinesisMock with
Record.builder().data(SdkBytes.fromByteBuffer(ByteString("1").toByteBuffer)).build(),
Record.builder().data(SdkBytes.fromByteBuffer(ByteString("2").toByteBuffer)).build())
- val probe = KinesisSource.basic(shardSettings,
amazonKinesisAsync).runWith(TestSink.probe)
+ val probe = KinesisSource.basic(shardSettings,
amazonKinesisAsync).runWith(TestSink())
probe.requestNext().utf8String shouldEqual "1"
probe.requestNext().utf8String shouldEqual "2"
@@ -72,7 +72,7 @@ class KinesisSourceSpec extends AnyWordSpec with Matchers
with KinesisMock with
Record.builder().data(SdkBytes.fromByteBuffer(ByteString("1").toByteBuffer)).build(),
Record.builder().data(SdkBytes.fromByteBuffer(ByteString("2").toByteBuffer)).build())
- val probe = KinesisSource.basic(shardSettings,
amazonKinesisAsync).runWith(TestSink.probe)
+ val probe = KinesisSource.basic(shardSettings,
amazonKinesisAsync).runWith(TestSink())
probe.request(2)
probe.expectNext().utf8String shouldEqual "1"
@@ -94,7 +94,7 @@ class KinesisSourceSpec extends AnyWordSpec with Matchers
with KinesisMock with
Record.builder().data(SdkBytes.fromByteBuffer(ByteString("5").toByteBuffer)).build(),
Record.builder().data(SdkBytes.fromByteBuffer(ByteString("6").toByteBuffer)).build())
- val probe = KinesisSource.basic(shardSettings,
amazonKinesisAsync).runWith(TestSink.probe)
+ val probe = KinesisSource.basic(shardSettings,
amazonKinesisAsync).runWith(TestSink())
probe.request(1)
probe.expectNext().utf8String shouldEqual "1"
@@ -124,7 +124,7 @@ class KinesisSourceSpec extends AnyWordSpec with Matchers
with KinesisMock with
Record.builder().data(SdkBytes.fromByteBuffer(ByteString("3").toByteBuffer)).build())
val probe =
- KinesisSource.basicMerge(mergeSettings,
amazonKinesisAsync).map(_.utf8String).runWith(TestSink.probe)
+ KinesisSource.basicMerge(mergeSettings,
amazonKinesisAsync).map(_.utf8String).runWith(TestSink())
probe.request(6)
probe.expectNextUnordered("1", "1", "2", "2", "3", "3")
@@ -137,7 +137,7 @@ class KinesisSourceSpec extends AnyWordSpec with Matchers
with KinesisMock with
override def records =
util.Arrays.asList(Record.builder().data(SdkBytes.fromByteBuffer(ByteString("1").toByteBuffer)).build())
- val probe = KinesisSource.basic(shardSettings,
amazonKinesisAsync).runWith(TestSink.probe)
+ val probe = KinesisSource.basic(shardSettings,
amazonKinesisAsync).runWith(TestSink())
probe.requestNext().utf8String shouldEqual "1"
nextShardIterator.set(null)
@@ -150,7 +150,7 @@ class KinesisSourceSpec extends AnyWordSpec with Matchers
with KinesisMock with
"fail with error when GetStreamRequest fails" in assertAllStagesStopped {
new KinesisSpecContext with WithGetShardIteratorSuccess with
WithGetRecordsFailure {
- val probe = KinesisSource.basic(shardSettings,
amazonKinesisAsync).runWith(TestSink.probe)
+ val probe = KinesisSource.basic(shardSettings,
amazonKinesisAsync).runWith(TestSink())
probe.request(1)
probe.expectError() shouldBe an[KinesisErrors.GetRecordsError]
probe.cancel()
diff --git
a/kinesis/src/test/scala/org/apache/pekko/stream/connectors/kinesisfirehose/KinesisFirehoseFlowSpec.scala
b/kinesis/src/test/scala/org/apache/pekko/stream/connectors/kinesisfirehose/KinesisFirehoseFlowSpec.scala
index 4f9c8d8b9..81bac5187 100644
---
a/kinesis/src/test/scala/org/apache/pekko/stream/connectors/kinesisfirehose/KinesisFirehoseFlowSpec.scala
+++
b/kinesis/src/test/scala/org/apache/pekko/stream/connectors/kinesisfirehose/KinesisFirehoseFlowSpec.scala
@@ -81,10 +81,9 @@ class KinesisFirehoseFlowSpec extends AnyWordSpec with
Matchers with KinesisFire
val requestError = new RuntimeException("kinesisfirehose-error")
val (sourceProbe, sinkProbe) =
- TestSource
- .probe[Record]
+ TestSource[Record]()
.via(KinesisFirehoseFlow(streamName, settings))
- .toMat(TestSink.probe)(Keep.both)
+ .toMat(TestSink())(Keep.both)
.run()
}
diff --git
a/mqtt-streaming/src/test/scala/org/apache/pekko/stream/connectors/mqtt/streaming/impl/MqttFrameStageSpec.scala
b/mqtt-streaming/src/test/scala/org/apache/pekko/stream/connectors/mqtt/streaming/impl/MqttFrameStageSpec.scala
index a9d47f05a..f2d4ced68 100644
---
a/mqtt-streaming/src/test/scala/org/apache/pekko/stream/connectors/mqtt/streaming/impl/MqttFrameStageSpec.scala
+++
b/mqtt-streaming/src/test/scala/org/apache/pekko/stream/connectors/mqtt/streaming/impl/MqttFrameStageSpec.scala
@@ -18,8 +18,7 @@ import org.apache.pekko
import pekko.actor.ActorSystem
import pekko.stream.connectors.testkit.scaladsl.LogCapturing
import pekko.stream.scaladsl.{ Keep, Source }
-import pekko.stream.testkit.javadsl.TestSink
-import pekko.stream.testkit.scaladsl.TestSource
+import pekko.stream.testkit.scaladsl.{ TestSink, TestSource }
import pekko.testkit.TestKit
import pekko.util.ByteString
import org.scalatest.BeforeAndAfterAll
@@ -41,7 +40,7 @@ class MqttFrameStageSpec
Source
.single(bytes)
.via(new MqttFrameStage(MaxPacketSize))
- .runWith(TestSink.probe(system))
+ .runWith(TestSink()(system))
.request(1)
.expectNext(bytes)
.expectComplete()
@@ -52,7 +51,7 @@ class MqttFrameStageSpec
Source
.single(bytes)
.via(new MqttFrameStage(MaxPacketSize))
- .runWith(TestSink.probe(system))
+ .runWith(TestSink()(system))
.request(1)
.expectNext(bytes)
.expectComplete()
@@ -63,7 +62,7 @@ class MqttFrameStageSpec
Source
.single(bytes ++ bytes)
.via(new MqttFrameStage(MaxPacketSize))
- .runWith(TestSink.probe(system))
+ .runWith(TestSink()(system))
.request(2)
.expectNext(bytes, bytes)
.expectComplete()
@@ -74,10 +73,9 @@ class MqttFrameStageSpec
val bytes1 =
ByteString.newBuilder.putByte(1).putBytes(Array.ofDim(0x80)).result()
val (pub, sub) =
- TestSource
- .probe(system)
+ TestSource()(system)
.via(new MqttFrameStage(MaxPacketSize * 2))
- .toMat(TestSink.probe(system))(Keep.both)
+ .toMat(TestSink()(system))(Keep.both)
.run()
pub.sendNext(bytes0)
@@ -97,7 +95,7 @@ class MqttFrameStageSpec
Source
.single(bytes)
.via(new MqttFrameStage(MaxPacketSize))
- .runWith(TestSink.probe(system))
+ .runWith(TestSink()(system))
.request(1)
.expectError()
ex.getMessage shouldBe s"Max packet size of $MaxPacketSize exceeded with
${MaxPacketSize + 2}"
diff --git a/mqtt/src/test/java/docs/javadsl/MqttSourceTest.java
b/mqtt/src/test/java/docs/javadsl/MqttSourceTest.java
index 9b606ff36..c4d00414f 100644
--- a/mqtt/src/test/java/docs/javadsl/MqttSourceTest.java
+++ b/mqtt/src/test/java/docs/javadsl/MqttSourceTest.java
@@ -314,7 +314,7 @@ public class MqttSourceTest {
MqttSource.atMostOnce(settings1, subscriptions, bufferSize);
Pair<CompletionStage<Done>, TestSubscriber.Probe<MqttMessage>> result2 =
- source1.toMat(TestSink.probe(system), Keep.both()).run(system);
+ source1.toMat(TestSink.create(system), Keep.both()).run(system);
// Ensure that the connection made it all the way to the server by waiting
until it receives a
// message
diff --git a/mqtt/src/test/scala/docs/scaladsl/MqttSourceSpec.scala
b/mqtt/src/test/scala/docs/scaladsl/MqttSourceSpec.scala
index d92e59763..d1fd588b8 100644
--- a/mqtt/src/test/scala/docs/scaladsl/MqttSourceSpec.scala
+++ b/mqtt/src/test/scala/docs/scaladsl/MqttSourceSpec.scala
@@ -321,7 +321,7 @@ class MqttSourceSpec extends MqttSpecBase("MqttSourceSpec")
{
.withBroker(s"tcp://localhost:$proxyPort"),
MqttSubscriptions(topic1, MqttQoS.AtLeastOnce),
8)
- .toMat(TestSink.probe)(Keep.both)
+ .toMat(TestSink())(Keep.both)
.run()
// Ensure that the connection made it all the way to the server by
waiting until it receives a message
@@ -385,7 +385,7 @@ class MqttSourceSpec extends MqttSpecBase("MqttSourceSpec")
{
MqttSubscriptions(topic1, MqttQoS.AtLeastOnce),
8))
- val (subscribed, probe) = source1.toMat(TestSink.probe)(Keep.both).run()
+ val (subscribed, probe) = source1.toMat(TestSink())(Keep.both).run()
// Ensure that the connection made it all the way to the server by
waiting until it receives a message
Await.ready(subscribed, timeout)
@@ -435,7 +435,7 @@ class MqttSourceSpec extends MqttSpecBase("MqttSourceSpec")
{
MqttSubscriptions(topic1, MqttQoS.AtLeastOnce),
8)
.via(sharedKillSwitch.flow)
- .toMat(TestSink.probe)(Keep.both)
+ .toMat(TestSink())(Keep.both)
.run()
Await.ready(killSwitch, timeout)
@@ -471,7 +471,7 @@ class MqttSourceSpec extends MqttSpecBase("MqttSourceSpec")
{
.withOfflinePersistenceSettings(bufferSize = 1234),
MqttSubscriptions(topic1, MqttQoS.AtLeastOnce),
8)
- .toMat(TestSink.probe)(Keep.both)
+ .toMat(TestSink())(Keep.both)
.run()
// Ensure that the connection made it all the way to the server by
waiting until it receives a message
diff --git a/mqttv5/src/test/java/docs/javadsl/MqttSourceTest.java
b/mqttv5/src/test/java/docs/javadsl/MqttSourceTest.java
index 3d769ba31..395a88aed 100644
--- a/mqttv5/src/test/java/docs/javadsl/MqttSourceTest.java
+++ b/mqttv5/src/test/java/docs/javadsl/MqttSourceTest.java
@@ -317,7 +317,7 @@ public class MqttSourceTest {
MqttSource.atMostOnce(settings1, subscriptions, bufferSize);
Pair<CompletionStage<Done>, TestSubscriber.Probe<MqttMessage>> result2
=
- source1.toMat(TestSink.probe(system), Keep.both()).run(system);
+ source1.toMat(TestSink.create(system),
Keep.both()).run(system);
// Ensure that the connection made it all the way to the server by
waiting until it receives a
// message
diff --git a/mqttv5/src/test/scala/docs/scaladsl/MqttSourceSpec.scala
b/mqttv5/src/test/scala/docs/scaladsl/MqttSourceSpec.scala
index 5ae7b1b3a..bedd05ab3 100644
--- a/mqttv5/src/test/scala/docs/scaladsl/MqttSourceSpec.scala
+++ b/mqttv5/src/test/scala/docs/scaladsl/MqttSourceSpec.scala
@@ -331,7 +331,7 @@ class MqttSourceSpec extends MqttSpecBase("MqttSourceSpec")
{
.withBroker(s"tcp://localhost:$proxyPort"),
MqttSubscriptions(topic1, MqttQoS.AtLeastOnce),
8)
- .toMat(TestSink.probe)(Keep.both)
+ .toMat(TestSink())(Keep.both)
.run()
// Ensure that the connection made it all the way to the server by
waiting until it receives a message
@@ -396,7 +396,7 @@ class MqttSourceSpec extends MqttSpecBase("MqttSourceSpec")
{
MqttSubscriptions(topic1, MqttQoS.AtLeastOnce),
8))
- val (subscribed, probe) = source1.toMat(TestSink.probe)(Keep.both).run()
+ val (subscribed, probe) = source1.toMat(TestSink())(Keep.both).run()
// Ensure that the connection made it all the way to the server by
waiting until it receives a message
Await.ready(subscribed, timeout)
@@ -446,7 +446,7 @@ class MqttSourceSpec extends MqttSpecBase("MqttSourceSpec")
{
MqttSubscriptions(topic1, MqttQoS.AtLeastOnce),
8)
.via(sharedKillSwitch.flow)
- .toMat(TestSink.probe)(Keep.both)
+ .toMat(TestSink())(Keep.both)
.run()
Await.ready(killSwitch, timeout)
@@ -482,7 +482,7 @@ class MqttSourceSpec extends MqttSpecBase("MqttSourceSpec")
{
.withOfflinePersistenceSettings(bufferSize = 1234),
MqttSubscriptions(topic1, MqttQoS.AtLeastOnce),
8)
- .toMat(TestSink.probe)(Keep.both)
+ .toMat(TestSink())(Keep.both)
.run()
// Ensure that the connection made it all the way to the server by
waiting until it receives a message
diff --git a/project/PekkoCoreDependency.scala
b/project/PekkoCoreDependency.scala
index 1ad1bac11..61d5bd7d0 100644
--- a/project/PekkoCoreDependency.scala
+++ b/project/PekkoCoreDependency.scala
@@ -20,5 +20,5 @@ import com.github.pjfanning.pekkobuild.PekkoDependency
object PekkoCoreDependency extends PekkoDependency {
override val checkProject: String = "pekko-cluster-sharding-typed"
override val module: Option[String] = None
- override val currentVersion: String = "1.2.1"
+ override val currentVersion: String = "1.3.0"
}
diff --git
a/sns/src/test/scala/org/apache/pekko/stream/connectors/sns/SnsPublishMockingSpec.scala
b/sns/src/test/scala/org/apache/pekko/stream/connectors/sns/SnsPublishMockingSpec.scala
index b4ffcb319..fef0d2b45 100644
---
a/sns/src/test/scala/org/apache/pekko/stream/connectors/sns/SnsPublishMockingSpec.scala
+++
b/sns/src/test/scala/org/apache/pekko/stream/connectors/sns/SnsPublishMockingSpec.scala
@@ -38,7 +38,7 @@ class SnsPublishMockingSpec extends AnyFlatSpec with
DefaultTestContext with Mat
when(snsClient.publish(meq(publishRequest))).thenReturn(CompletableFuture.completedFuture(publishResult))
val (probe, future) =
-
TestSource.probe[PublishRequest].via(SnsPublisher.publishFlow("topic-arn")).toMat(Sink.seq)(Keep.both).run()
+
TestSource[PublishRequest]().via(SnsPublisher.publishFlow("topic-arn")).toMat(Sink.seq)(Keep.both).run()
probe.sendNext(PublishRequest.builder().message("sns-message").build()).sendComplete()
@@ -53,7 +53,7 @@ class SnsPublishMockingSpec extends AnyFlatSpec with
DefaultTestContext with Mat
when(snsClient.publish(any[PublishRequest]())).thenReturn(CompletableFuture.completedFuture(publishResult))
val (probe, future) =
-
TestSource.probe[PublishRequest].via(SnsPublisher.publishFlow("topic-arn")).toMat(Sink.seq)(Keep.both).run()
+
TestSource[PublishRequest]().via(SnsPublisher.publishFlow("topic-arn")).toMat(Sink.seq)(Keep.both).run()
probe
.sendNext(PublishRequest.builder().message("sns-message-1").build())
@@ -79,7 +79,7 @@ class SnsPublishMockingSpec extends AnyFlatSpec with
DefaultTestContext with Mat
when(snsClient.publish(any[PublishRequest]())).thenReturn(CompletableFuture.completedFuture(publishResult))
val (probe, future) =
-
TestSource.probe[PublishRequest].via(SnsPublisher.publishFlow()).toMat(Sink.seq)(Keep.both).run()
+
TestSource[PublishRequest]().via(SnsPublisher.publishFlow()).toMat(Sink.seq)(Keep.both).run()
probe
.sendNext(PublishRequest.builder().message("sns-message-1").topicArn("topic-arn-1").build())
@@ -105,7 +105,7 @@ class SnsPublishMockingSpec extends AnyFlatSpec with
DefaultTestContext with Mat
when(snsClient.publish(meq(publishRequest))).thenReturn(CompletableFuture.completedFuture(publishResult))
- val (probe, future) =
TestSource.probe[String].via(SnsPublisher.flow("topic-arn")).toMat(Sink.seq)(Keep.both).run()
+ val (probe, future) =
TestSource[String]().via(SnsPublisher.flow("topic-arn")).toMat(Sink.seq)(Keep.both).run()
probe.sendNext("sns-message").sendComplete()
Await.result(future, 1.second) mustBe publishResult :: Nil
@@ -117,7 +117,7 @@ class SnsPublishMockingSpec extends AnyFlatSpec with
DefaultTestContext with Mat
when(snsClient.publish(any[PublishRequest]())).thenReturn(CompletableFuture.completedFuture(publishResult))
- val (probe, future) =
TestSource.probe[String].via(SnsPublisher.flow("topic-arn")).toMat(Sink.seq)(Keep.both).run()
+ val (probe, future) =
TestSource[String]().via(SnsPublisher.flow("topic-arn")).toMat(Sink.seq)(Keep.both).run()
probe.sendNext("sns-message-1").sendNext("sns-message-2").sendNext("sns-message-3").sendComplete()
Await.result(future, 1.second) mustBe publishResult :: publishResult ::
publishResult :: Nil
@@ -140,7 +140,7 @@ class SnsPublishMockingSpec extends AnyFlatSpec with
DefaultTestContext with Mat
when(snsClient.publish(meq(publishRequest))).thenReturn(promise)
- val (probe, future) =
TestSource.probe[String].via(SnsPublisher.flow("topic-arn")).toMat(Sink.seq)(Keep.both).run()
+ val (probe, future) =
TestSource[String]().via(SnsPublisher.flow("topic-arn")).toMat(Sink.seq)(Keep.both).run()
probe.sendNext("sns-message").sendComplete()
a[RuntimeException] should be thrownBy {
@@ -153,7 +153,7 @@ class SnsPublishMockingSpec extends AnyFlatSpec with
DefaultTestContext with Mat
it should "fail stage if upstream failure occurs" in {
case class MyCustomException(message: String) extends Exception(message)
- val (probe, future) =
TestSource.probe[String].via(SnsPublisher.flow("topic-arn")).toMat(Sink.seq)(Keep.both).run()
+ val (probe, future) =
TestSource[String]().via(SnsPublisher.flow("topic-arn")).toMat(Sink.seq)(Keep.both).run()
probe.sendError(MyCustomException("upstream failure"))
a[MyCustomException] should be thrownBy {
diff --git
a/sqs/src/test/scala/org/apache/pekko/stream/connectors/sqs/scaladsl/SqsPublishSinkSpec.scala
b/sqs/src/test/scala/org/apache/pekko/stream/connectors/sqs/scaladsl/SqsPublishSinkSpec.scala
index 8bb3f8277..854bcd3bb 100644
---
a/sqs/src/test/scala/org/apache/pekko/stream/connectors/sqs/scaladsl/SqsPublishSinkSpec.scala
+++
b/sqs/src/test/scala/org/apache/pekko/stream/connectors/sqs/scaladsl/SqsPublishSinkSpec.scala
@@ -41,7 +41,7 @@ class SqsPublishSinkSpec extends AnyFlatSpec with Matchers
with DefaultTestConte
when(sqsClient.sendMessage(any[SendMessageRequest]))
.thenReturn(CompletableFuture.completedFuture(SendMessageResponse.builder().build()))
- val (probe, future) =
TestSource.probe[String].toMat(SqsPublishSink("notused"))(Keep.both).run()
+ val (probe, future) =
TestSource[String]().toMat(SqsPublishSink("notused"))(Keep.both).run()
probe.sendNext("notused").sendComplete()
Await.result(future, 1.second) shouldBe Done
@@ -58,7 +58,7 @@ class SqsPublishSinkSpec extends AnyFlatSpec with Matchers
with DefaultTestConte
override def get(): SendMessageResponse = throw new
RuntimeException("Fake client error")
}))
- val (probe, future) =
TestSource.probe[String].toMat(SqsPublishSink("notused"))(Keep.both).run()
+ val (probe, future) =
TestSource[String]().toMat(SqsPublishSink("notused"))(Keep.both).run()
probe.sendNext("notused").sendComplete()
a[RuntimeException] should be thrownBy {
@@ -97,7 +97,7 @@ class SqsPublishSinkSpec extends AnyFlatSpec with Matchers
with DefaultTestConte
it should "failure the promise on upstream failure" in {
implicit val sqsClient: SqsAsyncClient = mock[SqsAsyncClient]
- val (probe, future) =
TestSource.probe[String].toMat(SqsPublishSink("notused"))(Keep.both).run()
+ val (probe, future) =
TestSource[String]().toMat(SqsPublishSink("notused"))(Keep.both).run()
probe.sendError(new RuntimeException("Fake upstream failure"))
@@ -112,7 +112,7 @@ class SqsPublishSinkSpec extends AnyFlatSpec with Matchers
with DefaultTestConte
when(sqsClient.sendMessage(any[SendMessageRequest]))
.thenReturn(CompletableFuture.completedFuture(SendMessageResponse.builder().build()))
- val (probe, future) =
TestSource.probe[String].toMat(SqsPublishSink("notused"))(Keep.both).run()
+ val (probe, future) =
TestSource[String]().toMat(SqsPublishSink("notused"))(Keep.both).run()
probe
.sendNext("test-101")
.sendNext("test-102")
@@ -138,7 +138,7 @@ class SqsPublishSinkSpec extends AnyFlatSpec with Matchers
with DefaultTestConte
SendMessageBatchResultEntry.builder().id("0").messageId(UUID.randomUUID().toString).build())
.build()))
- val (probe, future) =
TestSource.probe[String].toMat(SqsPublishSink.grouped("notused"))(Keep.both).run()
+ val (probe, future) =
TestSource[String]().toMat(SqsPublishSink.grouped("notused"))(Keep.both).run()
probe.sendNext("notused").sendComplete()
Await.result(future, 1.second) shouldBe Done
@@ -164,7 +164,7 @@ class SqsPublishSinkSpec extends AnyFlatSpec with Matchers
with DefaultTestConte
val settings = SqsPublishGroupedSettings.create().withMaxBatchSize(5)
- val (probe, future) =
TestSource.probe[String].toMat(SqsPublishSink.grouped("notused",
settings))(Keep.both).run()
+ val (probe, future) =
TestSource[String]().toMat(SqsPublishSink.grouped("notused",
settings))(Keep.both).run()
probe
.sendNext("notused - 1")
.sendNext("notused - 2")
@@ -200,7 +200,7 @@ class SqsPublishSinkSpec extends AnyFlatSpec with Matchers
with DefaultTestConte
.failed(BatchResultErrorEntry.builder().id("4").message("a very
weird error just happened").build())
.build()))
- val (probe, future) =
TestSource.probe[String].toMat(SqsPublishSink.grouped("notused"))(Keep.both).run()
+ val (probe, future) =
TestSource[String]().toMat(SqsPublishSink.grouped("notused"))(Keep.both).run()
probe
.sendNext("notused - 1")
.sendNext("notused - 2")
@@ -226,7 +226,7 @@ class SqsPublishSinkSpec extends AnyFlatSpec with Matchers
with DefaultTestConte
}))
val settings = SqsPublishGroupedSettings().withMaxBatchSize(5)
- val (probe, future) =
TestSource.probe[String].toMat(SqsPublishSink.grouped("notused",
settings))(Keep.both).run()
+ val (probe, future) =
TestSource[String]().toMat(SqsPublishSink.grouped("notused",
settings))(Keep.both).run()
probe
.sendNext("notused - 1")
.sendNext("notused - 2")
@@ -258,7 +258,7 @@ class SqsPublishSinkSpec extends AnyFlatSpec with Matchers
with DefaultTestConte
SendMessageBatchResultEntry.builder().id("3").messageId(UUID.randomUUID().toString).build())
.build()))
- val (probe, future) =
TestSource.probe[Seq[String]].toMat(SqsPublishSink.batch("notused"))(Keep.both).run()
+ val (probe, future) =
TestSource[Seq[String]]().toMat(SqsPublishSink.batch("notused"))(Keep.both).run()
probe
.sendNext(
Seq(
diff --git
a/sqs/src/test/scala/org/apache/pekko/stream/connectors/sqs/scaladsl/SqsSourceMockSpec.scala
b/sqs/src/test/scala/org/apache/pekko/stream/connectors/sqs/scaladsl/SqsSourceMockSpec.scala
index 0ebfd6f74..6a6d3fdb3 100644
---
a/sqs/src/test/scala/org/apache/pekko/stream/connectors/sqs/scaladsl/SqsSourceMockSpec.scala
+++
b/sqs/src/test/scala/org/apache/pekko/stream/connectors/sqs/scaladsl/SqsSourceMockSpec.scala
@@ -53,7 +53,7 @@ class SqsSourceMockSpec extends AnyFlatSpec with Matchers
with DefaultTestContex
val probe = SqsSource(
"url",
-
SqsSourceSettings.Defaults.withMaxBufferSize(10)).runWith(TestSink.probe[Message])
+
SqsSourceSettings.Defaults.withMaxBufferSize(10)).runWith(TestSink[Message]())
defaultMessages.foreach(probe.requestNext)
@@ -90,7 +90,7 @@ class SqsSourceMockSpec extends AnyFlatSpec with Matchers
with DefaultTestContex
val probe = SqsSource(
"url",
SqsSourceSettings.Defaults.withMaxBufferSize(
- SqsSourceSettings.Defaults.maxBatchSize *
bufferToBatchRatio)).runWith(TestSink.probe[Message])
+ SqsSourceSettings.Defaults.maxBatchSize *
bufferToBatchRatio)).runWith(TestSink[Message]())
Thread.sleep(timeout.toMillis * (bufferToBatchRatio + 1))
@@ -141,7 +141,7 @@ class SqsSourceMockSpec extends AnyFlatSpec with Matchers
with DefaultTestContex
SqsSourceSettings.Defaults
.withMaxBufferSize(10)
.withParallelRequests(10)
- .withWaitTime(timeout)).runWith(TestSink.probe[Message])
+ .withWaitTime(timeout)).runWith(TestSink[Message]())
(1 to firstWithDataCount * 10).foreach(_ => probe.requestNext())
diff --git
a/text/src/test/scala/org/apache/pekko/stream/connectors/text/scaladsl/CharsetCodingFlowsSpec.scala
b/text/src/test/scala/org/apache/pekko/stream/connectors/text/scaladsl/CharsetCodingFlowsSpec.scala
index 336189201..53ac2f215 100644
---
a/text/src/test/scala/org/apache/pekko/stream/connectors/text/scaladsl/CharsetCodingFlowsSpec.scala
+++
b/text/src/test/scala/org/apache/pekko/stream/connectors/text/scaladsl/CharsetCodingFlowsSpec.scala
@@ -135,8 +135,7 @@ class CharsetCodingFlowsSpec
}
def verifyByteSends(charsetIn: Charset, charsetOut: Charset, in: String) =
{
- val (source, sink) = TestSource
- .probe[ByteString]
+ val (source, sink) = TestSource[ByteString]()
.via(TextFlow.transcoding(charsetIn, charsetOut))
.map(_.decodeString(charsetOut))
.toMat(Sink.seq)(Keep.both)
@@ -163,10 +162,9 @@ class CharsetCodingFlowsSpec
}
"complete" in {
- val (source, sink) = TestSource
- .probe[ByteString]
+ val (source, sink) = TestSource[ByteString]()
.via(TextFlow.transcoding(StandardCharsets.UTF_8,
StandardCharsets.UTF_8))
- .toMat(TestSink.probe[ByteString])(Keep.both)
+ .toMat(TestSink[ByteString]())(Keep.both)
.run()
source.sendNext(ByteString("eins,zwei,drei"))
sink.request(3)
diff --git a/udp/src/test/java/docs/javadsl/UdpTest.java
b/udp/src/test/java/docs/javadsl/UdpTest.java
index 346b9d175..90dd503cf 100644
--- a/udp/src/test/java/docs/javadsl/UdpTest.java
+++ b/udp/src/test/java/docs/javadsl/UdpTest.java
@@ -72,9 +72,9 @@ public class UdpTest {
Pair<TestPublisher.Probe<Datagram>,
CompletionStage<InetSocketAddress>>,
TestSubscriber.Probe<Datagram>>
materialized =
- TestSource.<Datagram>probe(system)
+ TestSource.<Datagram>create(system)
.viaMat(bindFlow, Keep.both())
- .toMat(TestSink.probe(system), Keep.both())
+ .toMat(TestSink.create(system), Keep.both())
.run(system);
{
@@ -140,9 +140,9 @@ public class UdpTest {
Pair<TestPublisher.Probe<Datagram>,
CompletionStage<InetSocketAddress>>,
TestSubscriber.Probe<Datagram>>
materialized =
- TestSource.<Datagram>probe(system)
+ TestSource.<Datagram>create(system)
.viaMat(bindFlow, Keep.both())
- .toMat(TestSink.probe(system), Keep.both())
+ .toMat(TestSink.create(system), Keep.both())
.run(system);
{
diff --git a/udp/src/test/scala/docs/scaladsl/UdpSpec.scala
b/udp/src/test/scala/docs/scaladsl/UdpSpec.scala
index 8f58affa6..2f87b71bc 100644
--- a/udp/src/test/scala/docs/scaladsl/UdpSpec.scala
+++ b/udp/src/test/scala/docs/scaladsl/UdpSpec.scala
@@ -62,10 +62,9 @@ class UdpSpec
Udp.bindFlow(bindToLocal)
// #bind-flow
- val ((pub, bound), sub) = TestSource
- .probe[Datagram](system)
+ val ((pub, bound), sub) = TestSource[Datagram]()(system)
.viaMat(bindFlow)(Keep.both)
- .toMat(TestSink.probe)(Keep.both)
+ .toMat(TestSink())(Keep.both)
.run()
val destination = bound.futureValue
@@ -103,10 +102,9 @@ class UdpSpec
val bindFlow: Flow[Datagram, Datagram, Future[InetSocketAddress]] =
Udp.bindFlow(bindToLocal, List(UdpSO.broadcast(true)))
- val ((pub, bound), sub) = TestSource
- .probe[Datagram](system)
+ val ((pub, bound), sub) = TestSource[Datagram]()(system)
.viaMat(bindFlow)(Keep.both)
- .toMat(TestSink.probe)(Keep.both)
+ .toMat(TestSink())(Keep.both)
.run()
val destination = bound.futureValue
@@ -133,16 +131,14 @@ class UdpSpec
}
"ping-pong messages" in {
- val ((pub1, bound1), sub1) = TestSource
- .probe[Datagram](system)
+ val ((pub1, bound1), sub1) = TestSource[Datagram]()(system)
.viaMat(Udp.bindFlow(bindToLocal))(Keep.both)
- .toMat(TestSink.probe)(Keep.both)
+ .toMat(TestSink())(Keep.both)
.run()
- val ((pub2, bound2), sub2) = TestSource
- .probe[Datagram](system)
+ val ((pub2, bound2), sub2) = TestSource[Datagram]()(system)
.viaMat(Udp.bindFlow(bindToLocal))(Keep.both)
- .toMat(TestSink.probe)(Keep.both)
+ .toMat(TestSink())(Keep.both)
.run()
val boundAddress1 = bound1.futureValue
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]