This is an automated email from the ASF dual-hosted git repository.

hepin pushed a commit to branch testkitWarning
in repository https://gitbox.apache.org/repos/asf/pekko.git
commit c2f7490b8aa4279ed780243a166ce4209c97237d Author: He-Pin(�邦) <[email protected]> AuthorDate: Fri Mar 13 18:57:46 2026 +0800 Fix Java compiler warnings for unchecked calls in TestSink/TestSource (#2625) Add Class<T> overloads to TestSink.create() and TestSource.create() in javadsl to help Java's type inference, and add explicit type parameters at all call sites to eliminate 'unchecked call' compiler warnings. - Add create[T](clazz: Class[T], system) overloads to TestSink and TestSource - Fix ScalaDoc links to point to TestSubscriber.Probe / TestPublisher.Probe - Add explicit type witnesses at ~70 call sites across 11 Java test files - Retain @SuppressWarnings where needed for unrelated varargs warnings --- .../java/jdocs/stream/StreamTestKitDocTest.java | 2 +- .../javadsl/cookbook/RecipeAdhocSourceTest.java | 10 ++--- .../javadsl/cookbook/RecipeGlobalRateLimit.java | 2 +- .../jdocs/stream/javadsl/cookbook/RecipeHold.java | 8 ++-- .../javadsl/cookbook/RecipeManualTrigger.java | 8 ++-- .../stream/javadsl/cookbook/RecipeMissedTicks.java | 4 +- .../stream/javadsl/cookbook/RecipeSimpleDrop.java | 2 +- .../pekko/stream/testkit/javadsl/TestSink.scala | 16 +++++++- .../pekko/stream/testkit/javadsl/TestSource.scala | 16 +++++++- .../org/apache/pekko/stream/javadsl/FlowTest.java | 46 ++++++++++----------- .../apache/pekko/stream/javadsl/RetryFlowTest.java | 8 ++-- .../org/apache/pekko/stream/javadsl/SinkTest.java | 4 +- .../apache/pekko/stream/javadsl/SourceTest.java | 48 +++++++++++----------- 13 files changed, 101 insertions(+), 73 deletions(-) diff --git a/docs/src/test/java/jdocs/stream/StreamTestKitDocTest.java b/docs/src/test/java/jdocs/stream/StreamTestKitDocTest.java index 929e226530..3d23438215 100644 --- a/docs/src/test/java/jdocs/stream/StreamTestKitDocTest.java +++ b/docs/src/test/java/jdocs/stream/StreamTestKitDocTest.java @@ -171,7 +171,7 @@ public class StreamTestKitDocTest extends AbstractJavaTest { Source.from(Arrays.asList(1, 2, 3, 4)).filter(elem -> 
elem % 2 == 0).map(elem -> elem * 2); sourceUnderTest - .runWith(TestSink.create(system), system) + .runWith(TestSink.<Integer>create(system), system) .request(2) .expectNext(4, 8) .expectComplete(); diff --git a/docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeAdhocSourceTest.java b/docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeAdhocSourceTest.java index 24423e2bb1..c0c0f5d73b 100644 --- a/docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeAdhocSourceTest.java +++ b/docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeAdhocSourceTest.java @@ -98,7 +98,7 @@ public class RecipeAdhocSourceTest extends RecipeTest { { TestSubscriber.Probe<String> probe = adhocSource(Source.repeat("a"), duration200mills, 3) - .toMat(TestSink.create(system), Keep.right()) + .toMat(TestSink.<String>create(system), Keep.right()) .run(system); probe.requestNext("a"); } @@ -118,7 +118,7 @@ public class RecipeAdhocSourceTest extends RecipeTest { (a, term) -> term.thenRun(() -> shutdown.complete(Done.getInstance()))), duration200mills, 3) - .toMat(TestSink.create(system), Keep.right()) + .toMat(TestSink.<String>create(system), Keep.right()) .run(system); probe.requestNext("a"); @@ -141,7 +141,7 @@ public class RecipeAdhocSourceTest extends RecipeTest { (a, term) -> term.thenRun(() -> shutdown.complete(Done.getInstance()))), duration200mills, 3) - .toMat(TestSink.create(system), Keep.right()) + .toMat(TestSink.<String>create(system), Keep.right()) .run(system); probe.requestNext("a"); @@ -179,7 +179,7 @@ public class RecipeAdhocSourceTest extends RecipeTest { (a, term) -> term.thenRun(() -> shutdown.complete(Done.getInstance()))), duration200mills, 3) - .toMat(TestSink.create(system), Keep.right()) + .toMat(TestSink.<String>create(system), Keep.right()) .run(system); probe.requestNext("a"); @@ -209,7 +209,7 @@ public class RecipeAdhocSourceTest extends RecipeTest { (a, term) -> term.thenRun(() -> shutdown.complete(Done.getInstance()))), duration200mills, 3) - 
.toMat(TestSink.create(system), Keep.right()) + .toMat(TestSink.<String>create(system), Keep.right()) .run(system); probe.requestNext("a"); diff --git a/docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeGlobalRateLimit.java b/docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeGlobalRateLimit.java index b884b58a12..84fc9acfbb 100644 --- a/docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeGlobalRateLimit.java +++ b/docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeGlobalRateLimit.java @@ -211,7 +211,7 @@ public class RecipeGlobalRateLimit extends RecipeTest { final Duration twoSeconds = dilated(Duration.ofSeconds(2)); - final Sink<String, TestSubscriber.Probe<String>> sink = TestSink.create(system); + final Sink<String, TestSubscriber.Probe<String>> sink = TestSink.<String>create(system); final TestSubscriber.Probe<String> probe = RunnableGraph.<TestSubscriber.Probe<String>>fromGraph( GraphDSL.create( diff --git a/docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeHold.java b/docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeHold.java index f9a5dfa4b9..c7d0eb4409 100644 --- a/docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeHold.java +++ b/docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeHold.java @@ -158,8 +158,8 @@ public class RecipeHold extends RecipeTest { public void workForVersion1() throws Exception { new TestKit(system) { { - final Source<Integer, TestPublisher.Probe<Integer>> source = TestSource.create(system); - final Sink<Integer, TestSubscriber.Probe<Integer>> sink = TestSink.create(system); + final Source<Integer, TestPublisher.Probe<Integer>> source = TestSource.<Integer>create(system); + final Sink<Integer, TestSubscriber.Probe<Integer>> sink = TestSink.<Integer>create(system); Pair<TestPublisher.Probe<Integer>, TestSubscriber.Probe<Integer>> pubSub = source.via(new HoldWithInitial<>(0)).toMat(sink, Keep.both()).run(system); @@ -186,8 +186,8 @@ public class RecipeHold extends RecipeTest { public void 
workForVersion2() throws Exception { new TestKit(system) { { - final Source<Integer, TestPublisher.Probe<Integer>> source = TestSource.create(system); - final Sink<Integer, TestSubscriber.Probe<Integer>> sink = TestSink.create(system); + final Source<Integer, TestPublisher.Probe<Integer>> source = TestSource.<Integer>create(system); + final Sink<Integer, TestSubscriber.Probe<Integer>> sink = TestSink.<Integer>create(system); Pair<TestPublisher.Probe<Integer>, TestSubscriber.Probe<Integer>> pubSub = source.via(new HoldWithWait<>()).toMat(sink, Keep.both()).run(system); diff --git a/docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeManualTrigger.java b/docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeManualTrigger.java index 4cbe536ef2..edd9098d2d 100644 --- a/docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeManualTrigger.java +++ b/docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeManualTrigger.java @@ -54,8 +54,8 @@ public class RecipeManualTrigger extends RecipeTest { new TestKit(system) { { final Source<Trigger, TestPublisher.Probe<Trigger>> triggerSource = - TestSource.create(system); - final Sink<Message, TestSubscriber.Probe<Message>> messageSink = TestSink.create(system); + TestSource.<Trigger>create(system); + final Sink<Message, TestSubscriber.Probe<Message>> messageSink = TestSink.<Message>create(system); // #manually-triggered-stream final RunnableGraph<Pair<TestPublisher.Probe<Trigger>, TestSubscriber.Probe<Message>>> g = @@ -112,8 +112,8 @@ public class RecipeManualTrigger extends RecipeTest { new TestKit(system) { { final Source<Trigger, TestPublisher.Probe<Trigger>> triggerSource = - TestSource.create(system); - final Sink<Message, TestSubscriber.Probe<Message>> messageSink = TestSink.create(system); + TestSource.<Trigger>create(system); + final Sink<Message, TestSubscriber.Probe<Message>> messageSink = TestSink.<Message>create(system); // #manually-triggered-stream-zipwith final RunnableGraph<Pair<TestPublisher.Probe<Trigger>, 
TestSubscriber.Probe<Message>>> g = diff --git a/docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeMissedTicks.java b/docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeMissedTicks.java index 4ba895bd26..223b02a2ce 100644 --- a/docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeMissedTicks.java +++ b/docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeMissedTicks.java @@ -55,8 +55,8 @@ public class RecipeMissedTicks extends RecipeTest { final Tick Tick = new Tick(); { - final Source<Tick, TestPublisher.Probe<Tick>> tickStream = TestSource.create(system); - final Sink<Integer, TestSubscriber.Probe<Integer>> sink = TestSink.create(system); + final Source<Tick, TestPublisher.Probe<Tick>> tickStream = TestSource.<Tick>create(system); + final Sink<Integer, TestSubscriber.Probe<Integer>> sink = TestSink.<Integer>create(system); @SuppressWarnings("unused") // #missed-ticks diff --git a/docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeSimpleDrop.java b/docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeSimpleDrop.java index 84bc9d85e8..5dee919eec 100644 --- a/docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeSimpleDrop.java +++ b/docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeSimpleDrop.java @@ -65,7 +65,7 @@ public class RecipeSimpleDrop extends RecipeTest { final Pair<TestPublisher.Probe<Message>, TestSubscriber.Probe<Message>> pubSub = TestSource.<Message>create(system) .via(realDroppyStream) - .toMat(TestSink.create(system), (pub, sub) -> new Pair<>(pub, sub)) + .toMat(TestSink.<Message>create(system), (pub, sub) -> new Pair<>(pub, sub)) .run(system); final TestPublisher.Probe<Message> pub = pubSub.first(); final TestSubscriber.Probe<Message> sub = pubSub.second(); diff --git a/stream-testkit/src/main/scala/org/apache/pekko/stream/testkit/javadsl/TestSink.scala b/stream-testkit/src/main/scala/org/apache/pekko/stream/testkit/javadsl/TestSink.scala index d6ab2db74c..bd22d55401 100644 --- 
a/stream-testkit/src/main/scala/org/apache/pekko/stream/testkit/javadsl/TestSink.scala +++ b/stream-testkit/src/main/scala/org/apache/pekko/stream/testkit/javadsl/TestSink.scala @@ -13,6 +13,8 @@ package org.apache.pekko.stream.testkit.javadsl +import scala.annotation.nowarn + import org.apache.pekko import pekko.actor.ClassicActorSystemProvider import pekko.stream.javadsl.Sink @@ -22,9 +24,21 @@ import pekko.stream.testkit._ object TestSink { /** - * A Sink that materialized to a [[pekko.stream.testkit.TestSubscriber]]. + * A Sink that materialized to a [[pekko.stream.testkit.TestSubscriber.Probe]]. */ def create[T](system: ClassicActorSystemProvider): Sink[T, TestSubscriber.Probe[T]] = new Sink(scaladsl.TestSink[T]()(system)) + /** + * A Sink that materialized to a [[pekko.stream.testkit.TestSubscriber.Probe]]. + * + * This overload accepts a `Class[T]` parameter to help Java's type inference, + * avoiding "unchecked call" compiler warnings when chaining probe methods. + * + * @since 2.0.0 + */ + @nowarn("cat=unused-params") + def create[T](clazz: Class[T], system: ClassicActorSystemProvider): Sink[T, TestSubscriber.Probe[T]] = + new Sink(scaladsl.TestSink[T]()(system)) + } diff --git a/stream-testkit/src/main/scala/org/apache/pekko/stream/testkit/javadsl/TestSource.scala b/stream-testkit/src/main/scala/org/apache/pekko/stream/testkit/javadsl/TestSource.scala index 968c3f3e49..ac7a87df2e 100644 --- a/stream-testkit/src/main/scala/org/apache/pekko/stream/testkit/javadsl/TestSource.scala +++ b/stream-testkit/src/main/scala/org/apache/pekko/stream/testkit/javadsl/TestSource.scala @@ -13,6 +13,8 @@ package org.apache.pekko.stream.testkit.javadsl +import scala.annotation.nowarn + import org.apache.pekko import pekko.actor.ClassicActorSystemProvider import pekko.stream.javadsl.Source @@ -22,9 +24,21 @@ import pekko.stream.testkit._ object TestSource { /** - * A Source that materializes to a [[pekko.stream.testkit.TestPublisher]]. 
+ * A Source that materializes to a [[pekko.stream.testkit.TestPublisher.Probe]]. */ def create[T](system: ClassicActorSystemProvider): Source[T, TestPublisher.Probe[T]] = new Source(scaladsl.TestSource[T]()(system)) + /** + * A Source that materializes to a [[pekko.stream.testkit.TestPublisher.Probe]]. + * + * This overload accepts a `Class[T]` parameter to help Java's type inference, + * avoiding "unchecked call" compiler warnings when chaining probe methods. + * + * @since 2.0.0 + */ + @nowarn("cat=unused-params") + def create[T](clazz: Class[T], system: ClassicActorSystemProvider): Source[T, TestPublisher.Probe[T]] = + new Source(scaladsl.TestSource[T]()(system)) + } diff --git a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java index 6956b23739..e3ac338bfd 100644 --- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java +++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java @@ -172,7 +172,7 @@ public class FlowTest extends StreamTest { public void mustBeAbleToUseGroupedAdjacentBy() { Source.from(Arrays.asList("Hello", "Hi", "Greetings", "Hey")) .groupedAdjacentBy(str -> str.charAt(0)) - .runWith(TestSink.create(system), system) + .runWith(TestSink.<List<String>>create(system), system) .request(4) .expectNext(Lists.newArrayList("Hello", "Hi")) .expectNext(Lists.newArrayList("Greetings")) @@ -184,7 +184,7 @@ public class FlowTest extends StreamTest { public void mustBeAbleToUseGroupedAdjacentByWeighted() { Source.from(Arrays.asList("Hello", "HiHi", "Hi", "Hi", "Greetings", "Hey")) .groupedAdjacentByWeighted(str -> str.charAt(0), 4, str -> (long) str.length()) - .runWith(TestSink.create(system), system) + .runWith(TestSink.<List<String>>create(system), system) .request(6) .expectNext(Lists.newArrayList("Hello")) .expectNext(Lists.newArrayList("HiHi")) @@ -200,7 +200,7 @@ public class FlowTest extends StreamTest { final 
Flow<Integer, String, NotUsed> flow = Flow.fromFunction(String::valueOf); source .via(flow.contramap(Integer::valueOf)) - .runWith(TestSink.create(system), system) + .runWith(TestSink.<String>create(system), system) .request(3) .expectNext("1") .expectNext("2") @@ -214,7 +214,7 @@ public class FlowTest extends StreamTest { final Flow<Integer, Integer, NotUsed> flow = Flow.<Integer>create().map(elem -> elem * 2); source .via(flow.dimap(Integer::valueOf, String::valueOf)) - .runWith(TestSink.create(system), system) + .runWith(TestSink.<String>create(system), system) .request(3) .expectNext("2") .expectNext("4") @@ -303,7 +303,7 @@ public class FlowTest extends StreamTest { gate.set(false); return Optional.of("end"); })) - .runWith(TestSink.create(system), system) + .runWith(TestSink.<String>create(system), system) .request(4) .expectNext("1", "2", "3", "end") .expectComplete(); @@ -1100,7 +1100,7 @@ public class FlowTest extends StreamTest { PFBuilder.<Integer, Integer>create() .match(Integer.class, elem -> elem % 2 != 0, elem -> elem) .build()) - .runWith(TestSink.create(system), system) + .runWith(TestSink.<Integer>create(system), system) .ensureSubscription() .request(5) .expectNext(1, 3, 5) @@ -1114,7 +1114,7 @@ public class FlowTest extends StreamTest { PFBuilder.<Integer, Integer>create() .match(Integer.class, elem -> elem % 2 != 0, elem -> elem) .build()) - .runWith(TestSink.create(system), system) + .runWith(TestSink.<Integer>create(system), system) .ensureSubscription() .request(5) .expectNextN(Arrays.asList(1, 3, 5)) @@ -1133,7 +1133,7 @@ public class FlowTest extends StreamTest { elem -> elem.isPresent() && (Integer) elem.get() % 2 == 0, elem -> (Integer) elem.get()) .build()) - .runWith(TestSink.create(system), system) + .runWith(TestSink.<Integer>create(system), system) .ensureSubscription() .request(4) .expectNext(2) @@ -1383,7 +1383,7 @@ public class FlowTest extends StreamTest { } }) .via(Flow.of(Integer.class).onErrorComplete()) - 
.runWith(TestSink.create(system), system) + .runWith(TestSink.<Integer>create(system), system) .request(2) .expectNext(1) .expectComplete(); @@ -1403,7 +1403,7 @@ public class FlowTest extends StreamTest { } }) .onErrorContinue(error -> logger().error(error, "Error occurred"))) - .runWith(TestSink.create(system), system) + .runWith(TestSink.<Integer>create(system), system) .request(2) .expectNext(1) .expectComplete(); @@ -1421,7 +1421,7 @@ public class FlowTest extends StreamTest { } }) .via(Flow.of(Integer.class).onErrorResume(e -> Source.single(0))) - .runWith(TestSink.create(system), system) + .runWith(TestSink.<Integer>create(system), system) .request(2) .expectNext(1) .expectNext(0) @@ -1440,7 +1440,7 @@ public class FlowTest extends StreamTest { } }) .via(Flow.of(Integer.class).onErrorComplete(IllegalArgumentException.class)) - .runWith(TestSink.create(system), system) + .runWith(TestSink.<Integer>create(system), system) .request(2) .expectNext(1) .expectComplete(); @@ -1462,7 +1462,7 @@ public class FlowTest extends StreamTest { .onErrorContinue( IllegalArgumentException.class, error -> logger().error(error, "Error occurred"))) - .runWith(TestSink.create(system), system) + .runWith(TestSink.<Integer>create(system), system) .request(2) .expectNext(1) .expectComplete(); @@ -1482,7 +1482,7 @@ public class FlowTest extends StreamTest { .via( Flow.of(Integer.class) .onErrorResume(IllegalArgumentException.class, e -> Source.single(0))) - .runWith(TestSink.create(system), system) + .runWith(TestSink.<Integer>create(system), system) .request(2) .expectNext(1) .expectNext(0) @@ -1502,7 +1502,7 @@ public class FlowTest extends StreamTest { } }) .via(Flow.of(Integer.class).onErrorComplete(TimeoutException.class)) - .runWith(TestSink.create(system), system) + .runWith(TestSink.<Integer>create(system), system) .request(2) .expectNext(1) .expectError(ex); @@ -1524,7 +1524,7 @@ public class FlowTest extends StreamTest { Flow.of(Integer.class) .onErrorContinue( 
TimeoutException.class, error -> logger().error(error, "Error occurred"))) - .runWith(TestSink.create(system), system) + .runWith(TestSink.<Integer>create(system), system) .request(2) .expectNext(1) .expectError(ex); @@ -1543,7 +1543,7 @@ public class FlowTest extends StreamTest { } }) .via(Flow.of(Integer.class).onErrorResume(TimeoutException.class, e -> Source.single(0))) - .runWith(TestSink.create(system), system) + .runWith(TestSink.<Integer>create(system), system) .request(2) .expectNext(1) .expectError(ex); @@ -1561,7 +1561,7 @@ public class FlowTest extends StreamTest { } }) .via(Flow.of(Integer.class).onErrorComplete(ex -> ex.getMessage().contains("Boom"))) - .runWith(TestSink.create(system), system) + .runWith(TestSink.<Integer>create(system), system) .request(2) .expectNext(1) .expectComplete(); @@ -1583,7 +1583,7 @@ public class FlowTest extends StreamTest { .onErrorContinue( ex -> ex.getMessage().contains("Boom"), error -> logger().error(error, "Error occurred"))) - .runWith(TestSink.create(system), system) + .runWith(TestSink.<Integer>create(system), system) .request(2) .expectNext(1) .expectComplete(); @@ -1603,7 +1603,7 @@ public class FlowTest extends StreamTest { .via( Flow.of(Integer.class) .onErrorResume(ex -> ex.getMessage().contains("Boom"), e -> Source.single(0))) - .runWith(TestSink.create(system), system) + .runWith(TestSink.<Integer>create(system), system) .request(2) .expectNext(1) .expectNext(0) @@ -1622,7 +1622,7 @@ public class FlowTest extends StreamTest { source .via(flow) - .runWith(TestSink.create(system), system) + .runWith(TestSink.<String>create(system), system) .request(2) .expectNext(head) .expectError(boom); @@ -1636,7 +1636,7 @@ public class FlowTest extends StreamTest { .mapError(NoSuchElementException.class, IllegalArgumentException::new); final Throwable actual = - source.via(flow).runWith(TestSink.create(system), system).request(1).expectError(); + source.via(flow).runWith(TestSink.<Character>create(system), 
system).request(1).expectError(); org.junit.Assert.assertTrue(actual instanceof IndexOutOfBoundsException); } @@ -1652,7 +1652,7 @@ public class FlowTest extends StreamTest { source .via(flow) - .runWith(TestSink.create(system), system) + .runWith(TestSink.<String>create(system), system) .request(2) .expectNext(head) .expectError(boom); diff --git a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/RetryFlowTest.java b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/RetryFlowTest.java index 711971353d..061919d14e 100644 --- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/RetryFlowTest.java +++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/RetryFlowTest.java @@ -104,7 +104,7 @@ public class RetryFlowTest extends StreamTest { final Pair<TestPublisher.Probe<Integer>, TestSubscriber.Probe<Integer>> probes = TestSource.<Integer>create(system) .via(retryFlow) - .toMat(TestSink.create(system), Keep.both()) + .toMat(TestSink.<Integer>create(system), Keep.both()) .run(system); final TestPublisher.Probe<Integer> source = probes.first(); @@ -164,7 +164,7 @@ public class RetryFlowTest extends StreamTest { TestSource.<Integer>create(system) .map(i -> Pair.create(i, new SomeContext())) .via(retryFlow) - .toMat(TestSink.create(system), Keep.both()) + .toMat(TestSink.<Pair<Integer, SomeContext>>create(system), Keep.both()) .run(system); final TestPublisher.Probe<Integer> source = probes.first(); @@ -215,7 +215,7 @@ public class RetryFlowTest extends StreamTest { TestSource.<Integer>create(system) .map(i -> Pair.create(i, i)) .via(retryFlow) - .toMat(TestSink.create(system), Keep.both()) + .toMat(TestSink.<Pair<Try<Integer>, Integer>>create(system), Keep.both()) .run(system); final TestPublisher.Probe<Integer> source = probes.first(); @@ -266,7 +266,7 @@ public class RetryFlowTest extends StreamTest { } return Optional.empty(); })) - .toMat(TestSink.create(system), Keep.both()) + .toMat(TestSink.<Pair<Try<Integer>, 
Integer>>create(system), Keep.both()) .run(system); final TestPublisher.Probe<Integer> source = probes.first(); diff --git a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SinkTest.java b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SinkTest.java index c912e734aa..764831304f 100644 --- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SinkTest.java +++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SinkTest.java @@ -144,8 +144,8 @@ public class SinkTest extends StreamTest { @Test public void mustBeAbleToUseCombineMat() { - final Sink<Integer, TestSubscriber.Probe<Integer>> sink1 = TestSink.create(system); - final Sink<Integer, TestSubscriber.Probe<Integer>> sink2 = TestSink.create(system); + final Sink<Integer, TestSubscriber.Probe<Integer>> sink1 = TestSink.<Integer>create(system); + final Sink<Integer, TestSubscriber.Probe<Integer>> sink2 = TestSink.<Integer>create(system); final Sink<Integer, Pair<TestSubscriber.Probe<Integer>, TestSubscriber.Probe<Integer>>> sink = Sink.combineMat(sink1, sink2, Broadcast::create, Keep.both()); diff --git a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java index dda5bb4bb7..fe829b5dad 100644 --- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java +++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java @@ -124,7 +124,7 @@ public class SourceTest extends StreamTest { @Test public void mustBeAbleToCompleteWhenArrayIsEmpty() { Source.fromArray(new String[] {}) - .runWith(TestSink.create(system), system) + .runWith(TestSink.<Object>create(system), system) .ensureSubscription() .expectComplete(); } @@ -132,7 +132,7 @@ public class SourceTest extends StreamTest { @Test public void mustBeAbleToEmitEveryArrayElementSequentially() { Source.fromArray(new String[] {"a", "b", "c"}) - .runWith(TestSink.create(system), system) + 
.runWith(TestSink.<Object>create(system), system) .ensureSubscription() .request(3) .expectNext("a") @@ -144,7 +144,7 @@ public class SourceTest extends StreamTest { @Test public void mustBeAbleToUseItems() { Source.items("a", "b", "c") - .runWith(TestSink.create(system), system) + .runWith(TestSink.<String>create(system), system) .ensureSubscription() .request(3) .expectNext("a") @@ -156,7 +156,7 @@ public class SourceTest extends StreamTest { @Test public void mustBeAbleToUseItemsWhenEmpty() { Source.<String>items() - .runWith(TestSink.create(system), system) + .runWith(TestSink.<String>create(system), system) .ensureSubscription() .request(1) .expectComplete(); @@ -348,7 +348,7 @@ public class SourceTest extends StreamTest { public void mustBeAbleToConcatEmptySource() { Source.from(Arrays.asList("A", "B", "C")) .concat(Source.empty()) - .runWith(TestSink.create(system), system) + .runWith(TestSink.<String>create(system), system) .ensureSubscription() .request(3) .expectNext("A", "B", "C") @@ -361,7 +361,7 @@ public class SourceTest extends StreamTest { final Source<Integer, NotUsed> sourceB = Source.from(Arrays.asList(4, 5, 6)); final Source<Integer, NotUsed> sourceC = Source.from(Arrays.asList(7, 8, 9)); final TestSubscriber.Probe<Integer> sub = - sourceA.concatAllLazy(sourceB, sourceC).runWith(TestSink.create(system), system); + sourceA.concatAllLazy(sourceB, sourceC).runWith(TestSink.<Integer>create(system), system); sub.expectSubscription().request(9); sub.expectNext(1, 2, 3, 4, 5, 6, 7, 8, 9).expectComplete(); } @@ -862,7 +862,7 @@ public class SourceTest extends StreamTest { } @Test - @SuppressWarnings("unchecked") + @SuppressWarnings("unchecked") // generic array creation for varargs expectNext(List<Integer>...) 
public void mustBeAbleToUseAggregateWithBoundary() { final java.lang.Iterable<Integer> input = Arrays.asList(1, 1, 2, 3, 3, 4, 5, 5, 6); // used to implement grouped(2) @@ -880,7 +880,7 @@ public class SourceTest extends StreamTest { }, Function.identity(), Optional.empty()) - .runWith(TestSink.create(system), system) + .runWith(TestSink.<List<Integer>>create(system), system) .ensureSubscription() .request(6) .expectNext( @@ -928,7 +928,7 @@ public class SourceTest extends StreamTest { gate.set(false); return Optional.of("end"); }) - .runWith(TestSink.create(system), system) + .runWith(TestSink.<String>create(system), system) .request(4) .expectNext("1", "2", "3", "end") .expectComplete(); @@ -1013,7 +1013,7 @@ public class SourceTest extends StreamTest { final TestSubscriber.Probe<Integer> sub = sourceA .interleaveAll(Arrays.asList(sourceB, sourceC), 2, false) - .runWith(TestSink.create(system), system); + .runWith(TestSink.<Integer>create(system), system); sub.expectSubscription().request(9); sub.expectNext(1, 2, 3, 4, 5, 6, 7, 8, 9).expectComplete(); } @@ -1361,7 +1361,7 @@ public class SourceTest extends StreamTest { final TestSubscriber.Probe<Integer> sub = sourceA .mergeAll(Arrays.asList(sourceB, sourceC), false) - .runWith(TestSink.create(system), system); + .runWith(TestSink.<Integer>create(system), system); sub.expectSubscription().request(9); sub.expectNextUnorderedN(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9)).expectComplete(); } @@ -1677,7 +1677,7 @@ public class SourceTest extends StreamTest { queue.offer("Message2"); queue.complete(); }) - .runWith(TestSink.create(system), system) + .runWith(TestSink.<String>create(system), system) .ensureSubscription() .request(2) .expectNext("Message1", "Message2") @@ -1696,7 +1696,7 @@ public class SourceTest extends StreamTest { } }) .onErrorComplete() - .runWith(TestSink.create(system), system) + .runWith(TestSink.<Integer>create(system), system) .request(2) .expectNext(1) .expectComplete(); @@ -1714,7 +1714,7 @@ 
public class SourceTest extends StreamTest { } }) .onErrorContinue(e -> logger().error(e, "Error encountered")) - .runWith(TestSink.create(system), system) + .runWith(TestSink.<Integer>create(system), system) .request(2) .expectNext(1) .expectComplete(); @@ -1732,7 +1732,7 @@ public class SourceTest extends StreamTest { } }) .onErrorResume(e -> Source.single(0)) - .runWith(TestSink.create(system), system) + .runWith(TestSink.<Integer>create(system), system) .request(2) .expectNext(1) .expectNext(0) @@ -1751,7 +1751,7 @@ public class SourceTest extends StreamTest { } }) .onErrorComplete(IllegalArgumentException.class) - .runWith(TestSink.create(system), system) + .runWith(TestSink.<Integer>create(system), system) .request(2) .expectNext(1) .expectComplete(); @@ -1770,7 +1770,7 @@ public class SourceTest extends StreamTest { }) .onErrorContinue( IllegalArgumentException.class, e -> logger().error(e, "Error encountered")) - .runWith(TestSink.create(system), system) + .runWith(TestSink.<Integer>create(system), system) .request(2) .expectNext(1) .expectComplete(); @@ -1788,7 +1788,7 @@ public class SourceTest extends StreamTest { } }) .onErrorResume(IllegalArgumentException.class, e -> Source.single(0)) - .runWith(TestSink.create(system), system) + .runWith(TestSink.<Integer>create(system), system) .request(2) .expectNext(1) .expectNext(0) @@ -1808,7 +1808,7 @@ public class SourceTest extends StreamTest { } }) .onErrorComplete(TimeoutException.class) - .runWith(TestSink.create(system), system) + .runWith(TestSink.<Integer>create(system), system) .request(2) .expectNext(1) .expectError(ex); @@ -1827,7 +1827,7 @@ public class SourceTest extends StreamTest { } }) .onErrorContinue(TimeoutException.class, e -> logger().error(e, "Error encountered")) - .runWith(TestSink.create(system), system) + .runWith(TestSink.<Integer>create(system), system) .request(2) .expectNext(1) .expectError(ex); @@ -1846,7 +1846,7 @@ public class SourceTest extends StreamTest { } }) 
.onErrorResume(TimeoutException.class, e -> Source.single(0)) - .runWith(TestSink.create(system), system) + .runWith(TestSink.<Integer>create(system), system) .request(2) .expectNext(1) .expectError(ex); @@ -1864,7 +1864,7 @@ public class SourceTest extends StreamTest { } }) .onErrorComplete(ex -> ex.getMessage().contains("Boom")) - .runWith(TestSink.create(system), system) + .runWith(TestSink.<Integer>create(system), system) .request(2) .expectNext(1) .expectComplete(); @@ -1883,7 +1883,7 @@ public class SourceTest extends StreamTest { }) .onErrorContinue( ex -> ex.getMessage().contains("Boom"), e -> logger().error(e, "Error encountered")) - .runWith(TestSink.create(system), system) + .runWith(TestSink.<Integer>create(system), system) .request(2) .expectNext(1) .expectComplete(); @@ -1901,7 +1901,7 @@ public class SourceTest extends StreamTest { } }) .onErrorResume(ex -> ex.getMessage().contains("Boom"), e -> Source.single(0)) - .runWith(TestSink.create(system), system) + .runWith(TestSink.<Integer>create(system), system) .request(2) .expectNext(1) .expectNext(0) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
