This is an automated email from the ASF dual-hosted git repository.

hepin pushed a commit to branch testkitWarning
in repository https://gitbox.apache.org/repos/asf/pekko.git

commit c2f7490b8aa4279ed780243a166ce4209c97237d
Author: He-Pin(�邦) <[email protected]>
AuthorDate: Fri Mar 13 18:57:46 2026 +0800

    Fix Java compiler warnings for unchecked calls in TestSink/TestSource 
(#2625)
    
    Add Class<T> overloads to TestSink.create() and TestSource.create() in
    javadsl to help Java's type inference, and add explicit type parameters
    at all call sites to eliminate 'unchecked call' compiler warnings.
    
    - Add create[T](clazz: Class[T], system) overloads to TestSink and 
TestSource
    - Fix ScalaDoc links to point to TestSubscriber.Probe / TestPublisher.Probe
    - Add explicit type witnesses at ~70 call sites across 11 Java test files
    - Retain @SuppressWarnings where needed for unrelated varargs warnings
---
 .../java/jdocs/stream/StreamTestKitDocTest.java    |  2 +-
 .../javadsl/cookbook/RecipeAdhocSourceTest.java    | 10 ++---
 .../javadsl/cookbook/RecipeGlobalRateLimit.java    |  2 +-
 .../jdocs/stream/javadsl/cookbook/RecipeHold.java  |  8 ++--
 .../javadsl/cookbook/RecipeManualTrigger.java      |  8 ++--
 .../stream/javadsl/cookbook/RecipeMissedTicks.java |  4 +-
 .../stream/javadsl/cookbook/RecipeSimpleDrop.java  |  2 +-
 .../pekko/stream/testkit/javadsl/TestSink.scala    | 16 +++++++-
 .../pekko/stream/testkit/javadsl/TestSource.scala  | 16 +++++++-
 .../org/apache/pekko/stream/javadsl/FlowTest.java  | 46 ++++++++++-----------
 .../apache/pekko/stream/javadsl/RetryFlowTest.java |  8 ++--
 .../org/apache/pekko/stream/javadsl/SinkTest.java  |  4 +-
 .../apache/pekko/stream/javadsl/SourceTest.java    | 48 +++++++++++-----------
 13 files changed, 101 insertions(+), 73 deletions(-)

diff --git a/docs/src/test/java/jdocs/stream/StreamTestKitDocTest.java 
b/docs/src/test/java/jdocs/stream/StreamTestKitDocTest.java
index 929e226530..3d23438215 100644
--- a/docs/src/test/java/jdocs/stream/StreamTestKitDocTest.java
+++ b/docs/src/test/java/jdocs/stream/StreamTestKitDocTest.java
@@ -171,7 +171,7 @@ public class StreamTestKitDocTest extends AbstractJavaTest {
         Source.from(Arrays.asList(1, 2, 3, 4)).filter(elem -> elem % 2 == 
0).map(elem -> elem * 2);
 
     sourceUnderTest
-        .runWith(TestSink.create(system), system)
+        .runWith(TestSink.<Integer>create(system), system)
         .request(2)
         .expectNext(4, 8)
         .expectComplete();
diff --git 
a/docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeAdhocSourceTest.java 
b/docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeAdhocSourceTest.java
index 24423e2bb1..c0c0f5d73b 100644
--- 
a/docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeAdhocSourceTest.java
+++ 
b/docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeAdhocSourceTest.java
@@ -98,7 +98,7 @@ public class RecipeAdhocSourceTest extends RecipeTest {
       {
         TestSubscriber.Probe<String> probe =
             adhocSource(Source.repeat("a"), duration200mills, 3)
-                .toMat(TestSink.create(system), Keep.right())
+                .toMat(TestSink.<String>create(system), Keep.right())
                 .run(system);
         probe.requestNext("a");
       }
@@ -118,7 +118,7 @@ public class RecipeAdhocSourceTest extends RecipeTest {
                             (a, term) -> term.thenRun(() -> 
shutdown.complete(Done.getInstance()))),
                     duration200mills,
                     3)
-                .toMat(TestSink.create(system), Keep.right())
+                .toMat(TestSink.<String>create(system), Keep.right())
                 .run(system);
 
         probe.requestNext("a");
@@ -141,7 +141,7 @@ public class RecipeAdhocSourceTest extends RecipeTest {
                             (a, term) -> term.thenRun(() -> 
shutdown.complete(Done.getInstance()))),
                     duration200mills,
                     3)
-                .toMat(TestSink.create(system), Keep.right())
+                .toMat(TestSink.<String>create(system), Keep.right())
                 .run(system);
 
         probe.requestNext("a");
@@ -179,7 +179,7 @@ public class RecipeAdhocSourceTest extends RecipeTest {
                         (a, term) -> term.thenRun(() -> 
shutdown.complete(Done.getInstance()))),
                     duration200mills,
                     3)
-                .toMat(TestSink.create(system), Keep.right())
+                .toMat(TestSink.<String>create(system), Keep.right())
                 .run(system);
 
         probe.requestNext("a");
@@ -209,7 +209,7 @@ public class RecipeAdhocSourceTest extends RecipeTest {
                         (a, term) -> term.thenRun(() -> 
shutdown.complete(Done.getInstance()))),
                     duration200mills,
                     3)
-                .toMat(TestSink.create(system), Keep.right())
+                .toMat(TestSink.<String>create(system), Keep.right())
                 .run(system);
 
         probe.requestNext("a");
diff --git 
a/docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeGlobalRateLimit.java 
b/docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeGlobalRateLimit.java
index b884b58a12..84fc9acfbb 100644
--- 
a/docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeGlobalRateLimit.java
+++ 
b/docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeGlobalRateLimit.java
@@ -211,7 +211,7 @@ public class RecipeGlobalRateLimit extends RecipeTest {
 
         final Duration twoSeconds = dilated(Duration.ofSeconds(2));
 
-        final Sink<String, TestSubscriber.Probe<String>> sink = 
TestSink.create(system);
+        final Sink<String, TestSubscriber.Probe<String>> sink = 
TestSink.<String>create(system);
         final TestSubscriber.Probe<String> probe =
             RunnableGraph.<TestSubscriber.Probe<String>>fromGraph(
                     GraphDSL.create(
diff --git a/docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeHold.java 
b/docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeHold.java
index f9a5dfa4b9..c7d0eb4409 100644
--- a/docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeHold.java
+++ b/docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeHold.java
@@ -158,8 +158,8 @@ public class RecipeHold extends RecipeTest {
   public void workForVersion1() throws Exception {
     new TestKit(system) {
       {
-        final Source<Integer, TestPublisher.Probe<Integer>> source = 
TestSource.create(system);
-        final Sink<Integer, TestSubscriber.Probe<Integer>> sink = 
TestSink.create(system);
+        final Source<Integer, TestPublisher.Probe<Integer>> source = 
TestSource.<Integer>create(system);
+        final Sink<Integer, TestSubscriber.Probe<Integer>> sink = 
TestSink.<Integer>create(system);
 
         Pair<TestPublisher.Probe<Integer>, TestSubscriber.Probe<Integer>> 
pubSub =
             source.via(new HoldWithInitial<>(0)).toMat(sink, 
Keep.both()).run(system);
@@ -186,8 +186,8 @@ public class RecipeHold extends RecipeTest {
   public void workForVersion2() throws Exception {
     new TestKit(system) {
       {
-        final Source<Integer, TestPublisher.Probe<Integer>> source = 
TestSource.create(system);
-        final Sink<Integer, TestSubscriber.Probe<Integer>> sink = 
TestSink.create(system);
+        final Source<Integer, TestPublisher.Probe<Integer>> source = 
TestSource.<Integer>create(system);
+        final Sink<Integer, TestSubscriber.Probe<Integer>> sink = 
TestSink.<Integer>create(system);
 
         Pair<TestPublisher.Probe<Integer>, TestSubscriber.Probe<Integer>> 
pubSub =
             source.via(new HoldWithWait<>()).toMat(sink, 
Keep.both()).run(system);
diff --git 
a/docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeManualTrigger.java 
b/docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeManualTrigger.java
index 4cbe536ef2..edd9098d2d 100644
--- a/docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeManualTrigger.java
+++ b/docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeManualTrigger.java
@@ -54,8 +54,8 @@ public class RecipeManualTrigger extends RecipeTest {
     new TestKit(system) {
       {
         final Source<Trigger, TestPublisher.Probe<Trigger>> triggerSource =
-            TestSource.create(system);
-        final Sink<Message, TestSubscriber.Probe<Message>> messageSink = 
TestSink.create(system);
+            TestSource.<Trigger>create(system);
+        final Sink<Message, TestSubscriber.Probe<Message>> messageSink = 
TestSink.<Message>create(system);
 
         // #manually-triggered-stream
         final RunnableGraph<Pair<TestPublisher.Probe<Trigger>, 
TestSubscriber.Probe<Message>>> g =
@@ -112,8 +112,8 @@ public class RecipeManualTrigger extends RecipeTest {
     new TestKit(system) {
       {
         final Source<Trigger, TestPublisher.Probe<Trigger>> triggerSource =
-            TestSource.create(system);
-        final Sink<Message, TestSubscriber.Probe<Message>> messageSink = 
TestSink.create(system);
+            TestSource.<Trigger>create(system);
+        final Sink<Message, TestSubscriber.Probe<Message>> messageSink = 
TestSink.<Message>create(system);
 
         // #manually-triggered-stream-zipwith
         final RunnableGraph<Pair<TestPublisher.Probe<Trigger>, 
TestSubscriber.Probe<Message>>> g =
diff --git 
a/docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeMissedTicks.java 
b/docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeMissedTicks.java
index 4ba895bd26..223b02a2ce 100644
--- a/docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeMissedTicks.java
+++ b/docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeMissedTicks.java
@@ -55,8 +55,8 @@ public class RecipeMissedTicks extends RecipeTest {
       final Tick Tick = new Tick();
 
       {
-        final Source<Tick, TestPublisher.Probe<Tick>> tickStream = 
TestSource.create(system);
-        final Sink<Integer, TestSubscriber.Probe<Integer>> sink = 
TestSink.create(system);
+        final Source<Tick, TestPublisher.Probe<Tick>> tickStream = 
TestSource.<Tick>create(system);
+        final Sink<Integer, TestSubscriber.Probe<Integer>> sink = 
TestSink.<Integer>create(system);
 
         @SuppressWarnings("unused")
         // #missed-ticks
diff --git 
a/docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeSimpleDrop.java 
b/docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeSimpleDrop.java
index 84bc9d85e8..5dee919eec 100644
--- a/docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeSimpleDrop.java
+++ b/docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeSimpleDrop.java
@@ -65,7 +65,7 @@ public class RecipeSimpleDrop extends RecipeTest {
         final Pair<TestPublisher.Probe<Message>, 
TestSubscriber.Probe<Message>> pubSub =
             TestSource.<Message>create(system)
                 .via(realDroppyStream)
-                .toMat(TestSink.create(system), (pub, sub) -> new Pair<>(pub, 
sub))
+                .toMat(TestSink.<Message>create(system), (pub, sub) -> new 
Pair<>(pub, sub))
                 .run(system);
         final TestPublisher.Probe<Message> pub = pubSub.first();
         final TestSubscriber.Probe<Message> sub = pubSub.second();
diff --git 
a/stream-testkit/src/main/scala/org/apache/pekko/stream/testkit/javadsl/TestSink.scala
 
b/stream-testkit/src/main/scala/org/apache/pekko/stream/testkit/javadsl/TestSink.scala
index d6ab2db74c..bd22d55401 100644
--- 
a/stream-testkit/src/main/scala/org/apache/pekko/stream/testkit/javadsl/TestSink.scala
+++ 
b/stream-testkit/src/main/scala/org/apache/pekko/stream/testkit/javadsl/TestSink.scala
@@ -13,6 +13,8 @@
 
 package org.apache.pekko.stream.testkit.javadsl
 
+import scala.annotation.nowarn
+
 import org.apache.pekko
 import pekko.actor.ClassicActorSystemProvider
 import pekko.stream.javadsl.Sink
@@ -22,9 +24,21 @@ import pekko.stream.testkit._
 object TestSink {
 
   /**
-   * A Sink that materialized to a [[pekko.stream.testkit.TestSubscriber]].
+   * A Sink that materialized to a 
[[pekko.stream.testkit.TestSubscriber.Probe]].
    */
   def create[T](system: ClassicActorSystemProvider): Sink[T, 
TestSubscriber.Probe[T]] =
     new Sink(scaladsl.TestSink[T]()(system))
 
+  /**
+   * A Sink that materialized to a 
[[pekko.stream.testkit.TestSubscriber.Probe]].
+   *
+   * This overload accepts a `Class[T]` parameter to help Java's type 
inference,
+   * avoiding "unchecked call" compiler warnings when chaining probe methods.
+   *
+   * @since 2.0.0
+   */
+  @nowarn("cat=unused-params")
+  def create[T](clazz: Class[T], system: ClassicActorSystemProvider): Sink[T, 
TestSubscriber.Probe[T]] =
+    new Sink(scaladsl.TestSink[T]()(system))
+
 }
diff --git 
a/stream-testkit/src/main/scala/org/apache/pekko/stream/testkit/javadsl/TestSource.scala
 
b/stream-testkit/src/main/scala/org/apache/pekko/stream/testkit/javadsl/TestSource.scala
index 968c3f3e49..ac7a87df2e 100644
--- 
a/stream-testkit/src/main/scala/org/apache/pekko/stream/testkit/javadsl/TestSource.scala
+++ 
b/stream-testkit/src/main/scala/org/apache/pekko/stream/testkit/javadsl/TestSource.scala
@@ -13,6 +13,8 @@
 
 package org.apache.pekko.stream.testkit.javadsl
 
+import scala.annotation.nowarn
+
 import org.apache.pekko
 import pekko.actor.ClassicActorSystemProvider
 import pekko.stream.javadsl.Source
@@ -22,9 +24,21 @@ import pekko.stream.testkit._
 object TestSource {
 
   /**
-   * A Source that materializes to a [[pekko.stream.testkit.TestPublisher]].
+   * A Source that materializes to a 
[[pekko.stream.testkit.TestPublisher.Probe]].
    */
   def create[T](system: ClassicActorSystemProvider): Source[T, 
TestPublisher.Probe[T]] =
     new Source(scaladsl.TestSource[T]()(system))
 
+  /**
+   * A Source that materializes to a 
[[pekko.stream.testkit.TestPublisher.Probe]].
+   *
+   * This overload accepts a `Class[T]` parameter to help Java's type 
inference,
+   * avoiding "unchecked call" compiler warnings when chaining probe methods.
+   *
+   * @since 2.0.0
+   */
+  @nowarn("cat=unused-params")
+  def create[T](clazz: Class[T], system: ClassicActorSystemProvider): 
Source[T, TestPublisher.Probe[T]] =
+    new Source(scaladsl.TestSource[T]()(system))
+
 }
diff --git 
a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java 
b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java
index 6956b23739..e3ac338bfd 100644
--- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java
+++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java
@@ -172,7 +172,7 @@ public class FlowTest extends StreamTest {
   public void mustBeAbleToUseGroupedAdjacentBy() {
     Source.from(Arrays.asList("Hello", "Hi", "Greetings", "Hey"))
         .groupedAdjacentBy(str -> str.charAt(0))
-        .runWith(TestSink.create(system), system)
+        .runWith(TestSink.<List<String>>create(system), system)
         .request(4)
         .expectNext(Lists.newArrayList("Hello", "Hi"))
         .expectNext(Lists.newArrayList("Greetings"))
@@ -184,7 +184,7 @@ public class FlowTest extends StreamTest {
   public void mustBeAbleToUseGroupedAdjacentByWeighted() {
     Source.from(Arrays.asList("Hello", "HiHi", "Hi", "Hi", "Greetings", "Hey"))
         .groupedAdjacentByWeighted(str -> str.charAt(0), 4, str -> (long) 
str.length())
-        .runWith(TestSink.create(system), system)
+        .runWith(TestSink.<List<String>>create(system), system)
         .request(6)
         .expectNext(Lists.newArrayList("Hello"))
         .expectNext(Lists.newArrayList("HiHi"))
@@ -200,7 +200,7 @@ public class FlowTest extends StreamTest {
     final Flow<Integer, String, NotUsed> flow = 
Flow.fromFunction(String::valueOf);
     source
         .via(flow.contramap(Integer::valueOf))
-        .runWith(TestSink.create(system), system)
+        .runWith(TestSink.<String>create(system), system)
         .request(3)
         .expectNext("1")
         .expectNext("2")
@@ -214,7 +214,7 @@ public class FlowTest extends StreamTest {
     final Flow<Integer, Integer, NotUsed> flow = 
Flow.<Integer>create().map(elem -> elem * 2);
     source
         .via(flow.dimap(Integer::valueOf, String::valueOf))
-        .runWith(TestSink.create(system), system)
+        .runWith(TestSink.<String>create(system), system)
         .request(3)
         .expectNext("2")
         .expectNext("4")
@@ -303,7 +303,7 @@ public class FlowTest extends StreamTest {
                       gate.set(false);
                       return Optional.of("end");
                     }))
-        .runWith(TestSink.create(system), system)
+        .runWith(TestSink.<String>create(system), system)
         .request(4)
         .expectNext("1", "2", "3", "end")
         .expectComplete();
@@ -1100,7 +1100,7 @@ public class FlowTest extends StreamTest {
             PFBuilder.<Integer, Integer>create()
                 .match(Integer.class, elem -> elem % 2 != 0, elem -> elem)
                 .build())
-        .runWith(TestSink.create(system), system)
+        .runWith(TestSink.<Integer>create(system), system)
         .ensureSubscription()
         .request(5)
         .expectNext(1, 3, 5)
@@ -1114,7 +1114,7 @@ public class FlowTest extends StreamTest {
             PFBuilder.<Integer, Integer>create()
                 .match(Integer.class, elem -> elem % 2 != 0, elem -> elem)
                 .build())
-        .runWith(TestSink.create(system), system)
+        .runWith(TestSink.<Integer>create(system), system)
         .ensureSubscription()
         .request(5)
         .expectNextN(Arrays.asList(1, 3, 5))
@@ -1133,7 +1133,7 @@ public class FlowTest extends StreamTest {
                     elem -> elem.isPresent() && (Integer) elem.get() % 2 == 0,
                     elem -> (Integer) elem.get())
                 .build())
-        .runWith(TestSink.create(system), system)
+        .runWith(TestSink.<Integer>create(system), system)
         .ensureSubscription()
         .request(4)
         .expectNext(2)
@@ -1383,7 +1383,7 @@ public class FlowTest extends StreamTest {
               }
             })
         .via(Flow.of(Integer.class).onErrorComplete())
-        .runWith(TestSink.create(system), system)
+        .runWith(TestSink.<Integer>create(system), system)
         .request(2)
         .expectNext(1)
         .expectComplete();
@@ -1403,7 +1403,7 @@ public class FlowTest extends StreamTest {
                       }
                     })
                 .onErrorContinue(error -> logger().error(error, "Error 
occurred")))
-        .runWith(TestSink.create(system), system)
+        .runWith(TestSink.<Integer>create(system), system)
         .request(2)
         .expectNext(1)
         .expectComplete();
@@ -1421,7 +1421,7 @@ public class FlowTest extends StreamTest {
               }
             })
         .via(Flow.of(Integer.class).onErrorResume(e -> Source.single(0)))
-        .runWith(TestSink.create(system), system)
+        .runWith(TestSink.<Integer>create(system), system)
         .request(2)
         .expectNext(1)
         .expectNext(0)
@@ -1440,7 +1440,7 @@ public class FlowTest extends StreamTest {
               }
             })
         
.via(Flow.of(Integer.class).onErrorComplete(IllegalArgumentException.class))
-        .runWith(TestSink.create(system), system)
+        .runWith(TestSink.<Integer>create(system), system)
         .request(2)
         .expectNext(1)
         .expectComplete();
@@ -1462,7 +1462,7 @@ public class FlowTest extends StreamTest {
                 .onErrorContinue(
                     IllegalArgumentException.class,
                     error -> logger().error(error, "Error occurred")))
-        .runWith(TestSink.create(system), system)
+        .runWith(TestSink.<Integer>create(system), system)
         .request(2)
         .expectNext(1)
         .expectComplete();
@@ -1482,7 +1482,7 @@ public class FlowTest extends StreamTest {
         .via(
             Flow.of(Integer.class)
                 .onErrorResume(IllegalArgumentException.class, e -> 
Source.single(0)))
-        .runWith(TestSink.create(system), system)
+        .runWith(TestSink.<Integer>create(system), system)
         .request(2)
         .expectNext(1)
         .expectNext(0)
@@ -1502,7 +1502,7 @@ public class FlowTest extends StreamTest {
               }
             })
         .via(Flow.of(Integer.class).onErrorComplete(TimeoutException.class))
-        .runWith(TestSink.create(system), system)
+        .runWith(TestSink.<Integer>create(system), system)
         .request(2)
         .expectNext(1)
         .expectError(ex);
@@ -1524,7 +1524,7 @@ public class FlowTest extends StreamTest {
             Flow.of(Integer.class)
                 .onErrorContinue(
                     TimeoutException.class, error -> logger().error(error, 
"Error occurred")))
-        .runWith(TestSink.create(system), system)
+        .runWith(TestSink.<Integer>create(system), system)
         .request(2)
         .expectNext(1)
         .expectError(ex);
@@ -1543,7 +1543,7 @@ public class FlowTest extends StreamTest {
               }
             })
         .via(Flow.of(Integer.class).onErrorResume(TimeoutException.class, e -> 
Source.single(0)))
-        .runWith(TestSink.create(system), system)
+        .runWith(TestSink.<Integer>create(system), system)
         .request(2)
         .expectNext(1)
         .expectError(ex);
@@ -1561,7 +1561,7 @@ public class FlowTest extends StreamTest {
               }
             })
         .via(Flow.of(Integer.class).onErrorComplete(ex -> 
ex.getMessage().contains("Boom")))
-        .runWith(TestSink.create(system), system)
+        .runWith(TestSink.<Integer>create(system), system)
         .request(2)
         .expectNext(1)
         .expectComplete();
@@ -1583,7 +1583,7 @@ public class FlowTest extends StreamTest {
                 .onErrorContinue(
                     ex -> ex.getMessage().contains("Boom"),
                     error -> logger().error(error, "Error occurred")))
-        .runWith(TestSink.create(system), system)
+        .runWith(TestSink.<Integer>create(system), system)
         .request(2)
         .expectNext(1)
         .expectComplete();
@@ -1603,7 +1603,7 @@ public class FlowTest extends StreamTest {
         .via(
             Flow.of(Integer.class)
                 .onErrorResume(ex -> ex.getMessage().contains("Boom"), e -> 
Source.single(0)))
-        .runWith(TestSink.create(system), system)
+        .runWith(TestSink.<Integer>create(system), system)
         .request(2)
         .expectNext(1)
         .expectNext(0)
@@ -1622,7 +1622,7 @@ public class FlowTest extends StreamTest {
 
     source
         .via(flow)
-        .runWith(TestSink.create(system), system)
+        .runWith(TestSink.<String>create(system), system)
         .request(2)
         .expectNext(head)
         .expectError(boom);
@@ -1636,7 +1636,7 @@ public class FlowTest extends StreamTest {
             .mapError(NoSuchElementException.class, 
IllegalArgumentException::new);
 
     final Throwable actual =
-        source.via(flow).runWith(TestSink.create(system), 
system).request(1).expectError();
+        source.via(flow).runWith(TestSink.<Character>create(system), 
system).request(1).expectError();
     org.junit.Assert.assertTrue(actual instanceof IndexOutOfBoundsException);
   }
 
@@ -1652,7 +1652,7 @@ public class FlowTest extends StreamTest {
 
     source
         .via(flow)
-        .runWith(TestSink.create(system), system)
+        .runWith(TestSink.<String>create(system), system)
         .request(2)
         .expectNext(head)
         .expectError(boom);
diff --git 
a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/RetryFlowTest.java 
b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/RetryFlowTest.java
index 711971353d..061919d14e 100644
--- 
a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/RetryFlowTest.java
+++ 
b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/RetryFlowTest.java
@@ -104,7 +104,7 @@ public class RetryFlowTest extends StreamTest {
     final Pair<TestPublisher.Probe<Integer>, TestSubscriber.Probe<Integer>> 
probes =
         TestSource.<Integer>create(system)
             .via(retryFlow)
-            .toMat(TestSink.create(system), Keep.both())
+            .toMat(TestSink.<Integer>create(system), Keep.both())
             .run(system);
 
     final TestPublisher.Probe<Integer> source = probes.first();
@@ -164,7 +164,7 @@ public class RetryFlowTest extends StreamTest {
             TestSource.<Integer>create(system)
                 .map(i -> Pair.create(i, new SomeContext()))
                 .via(retryFlow)
-                .toMat(TestSink.create(system), Keep.both())
+                .toMat(TestSink.<Pair<Integer, SomeContext>>create(system), 
Keep.both())
                 .run(system);
 
     final TestPublisher.Probe<Integer> source = probes.first();
@@ -215,7 +215,7 @@ public class RetryFlowTest extends StreamTest {
             TestSource.<Integer>create(system)
                 .map(i -> Pair.create(i, i))
                 .via(retryFlow)
-                .toMat(TestSink.create(system), Keep.both())
+                .toMat(TestSink.<Pair<Try<Integer>, Integer>>create(system), 
Keep.both())
                 .run(system);
 
     final TestPublisher.Probe<Integer> source = probes.first();
@@ -266,7 +266,7 @@ public class RetryFlowTest extends StreamTest {
                           }
                           return Optional.empty();
                         }))
-                .toMat(TestSink.create(system), Keep.both())
+                .toMat(TestSink.<Pair<Try<Integer>, Integer>>create(system), 
Keep.both())
                 .run(system);
 
     final TestPublisher.Probe<Integer> source = probes.first();
diff --git 
a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SinkTest.java 
b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SinkTest.java
index c912e734aa..764831304f 100644
--- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SinkTest.java
+++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SinkTest.java
@@ -144,8 +144,8 @@ public class SinkTest extends StreamTest {
 
   @Test
   public void mustBeAbleToUseCombineMat() {
-    final Sink<Integer, TestSubscriber.Probe<Integer>> sink1 = 
TestSink.create(system);
-    final Sink<Integer, TestSubscriber.Probe<Integer>> sink2 = 
TestSink.create(system);
+    final Sink<Integer, TestSubscriber.Probe<Integer>> sink1 = 
TestSink.<Integer>create(system);
+    final Sink<Integer, TestSubscriber.Probe<Integer>> sink2 = 
TestSink.<Integer>create(system);
     final Sink<Integer, Pair<TestSubscriber.Probe<Integer>, 
TestSubscriber.Probe<Integer>>> sink =
         Sink.combineMat(sink1, sink2, Broadcast::create, Keep.both());
 
diff --git 
a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java 
b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java
index dda5bb4bb7..fe829b5dad 100644
--- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java
+++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java
@@ -124,7 +124,7 @@ public class SourceTest extends StreamTest {
   @Test
   public void mustBeAbleToCompleteWhenArrayIsEmpty() {
     Source.fromArray(new String[] {})
-        .runWith(TestSink.create(system), system)
+        .runWith(TestSink.<Object>create(system), system)
         .ensureSubscription()
         .expectComplete();
   }
@@ -132,7 +132,7 @@ public class SourceTest extends StreamTest {
   @Test
   public void mustBeAbleToEmitEveryArrayElementSequentially() {
     Source.fromArray(new String[] {"a", "b", "c"})
-        .runWith(TestSink.create(system), system)
+        .runWith(TestSink.<Object>create(system), system)
         .ensureSubscription()
         .request(3)
         .expectNext("a")
@@ -144,7 +144,7 @@ public class SourceTest extends StreamTest {
   @Test
   public void mustBeAbleToUseItems() {
     Source.items("a", "b", "c")
-        .runWith(TestSink.create(system), system)
+        .runWith(TestSink.<String>create(system), system)
         .ensureSubscription()
         .request(3)
         .expectNext("a")
@@ -156,7 +156,7 @@ public class SourceTest extends StreamTest {
   @Test
   public void mustBeAbleToUseItemsWhenEmpty() {
     Source.<String>items()
-        .runWith(TestSink.create(system), system)
+        .runWith(TestSink.<String>create(system), system)
         .ensureSubscription()
         .request(1)
         .expectComplete();
@@ -348,7 +348,7 @@ public class SourceTest extends StreamTest {
   public void mustBeAbleToConcatEmptySource() {
     Source.from(Arrays.asList("A", "B", "C"))
         .concat(Source.empty())
-        .runWith(TestSink.create(system), system)
+        .runWith(TestSink.<String>create(system), system)
         .ensureSubscription()
         .request(3)
         .expectNext("A", "B", "C")
@@ -361,7 +361,7 @@ public class SourceTest extends StreamTest {
     final Source<Integer, NotUsed> sourceB = Source.from(Arrays.asList(4, 5, 
6));
     final Source<Integer, NotUsed> sourceC = Source.from(Arrays.asList(7, 8, 
9));
     final TestSubscriber.Probe<Integer> sub =
-        sourceA.concatAllLazy(sourceB, 
sourceC).runWith(TestSink.create(system), system);
+        sourceA.concatAllLazy(sourceB, 
sourceC).runWith(TestSink.<Integer>create(system), system);
     sub.expectSubscription().request(9);
     sub.expectNext(1, 2, 3, 4, 5, 6, 7, 8, 9).expectComplete();
   }
@@ -862,7 +862,7 @@ public class SourceTest extends StreamTest {
   }
 
   @Test
-  @SuppressWarnings("unchecked")
+  @SuppressWarnings("unchecked") // generic array creation for varargs 
expectNext(List<Integer>...)
   public void mustBeAbleToUseAggregateWithBoundary() {
     final java.lang.Iterable<Integer> input = Arrays.asList(1, 1, 2, 3, 3, 4, 
5, 5, 6);
     // used to implement grouped(2)
@@ -880,7 +880,7 @@ public class SourceTest extends StreamTest {
             },
             Function.identity(),
             Optional.empty())
-        .runWith(TestSink.create(system), system)
+        .runWith(TestSink.<List<Integer>>create(system), system)
         .ensureSubscription()
         .request(6)
         .expectNext(
@@ -928,7 +928,7 @@ public class SourceTest extends StreamTest {
               gate.set(false);
               return Optional.of("end");
             })
-        .runWith(TestSink.create(system), system)
+        .runWith(TestSink.<String>create(system), system)
         .request(4)
         .expectNext("1", "2", "3", "end")
         .expectComplete();
@@ -1013,7 +1013,7 @@ public class SourceTest extends StreamTest {
     final TestSubscriber.Probe<Integer> sub =
         sourceA
             .interleaveAll(Arrays.asList(sourceB, sourceC), 2, false)
-            .runWith(TestSink.create(system), system);
+            .runWith(TestSink.<Integer>create(system), system);
     sub.expectSubscription().request(9);
     sub.expectNext(1, 2, 3, 4, 5, 6, 7, 8, 9).expectComplete();
   }
@@ -1361,7 +1361,7 @@ public class SourceTest extends StreamTest {
     final TestSubscriber.Probe<Integer> sub =
         sourceA
             .mergeAll(Arrays.asList(sourceB, sourceC), false)
-            .runWith(TestSink.create(system), system);
+            .runWith(TestSink.<Integer>create(system), system);
     sub.expectSubscription().request(9);
     sub.expectNextUnorderedN(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 
9)).expectComplete();
   }
@@ -1677,7 +1677,7 @@ public class SourceTest extends StreamTest {
               queue.offer("Message2");
               queue.complete();
             })
-        .runWith(TestSink.create(system), system)
+        .runWith(TestSink.<String>create(system), system)
         .ensureSubscription()
         .request(2)
         .expectNext("Message1", "Message2")
@@ -1696,7 +1696,7 @@ public class SourceTest extends StreamTest {
               }
             })
         .onErrorComplete()
-        .runWith(TestSink.create(system), system)
+        .runWith(TestSink.<Integer>create(system), system)
         .request(2)
         .expectNext(1)
         .expectComplete();
@@ -1714,7 +1714,7 @@ public class SourceTest extends StreamTest {
               }
             })
         .onErrorContinue(e -> logger().error(e, "Error encountered"))
-        .runWith(TestSink.create(system), system)
+        .runWith(TestSink.<Integer>create(system), system)
         .request(2)
         .expectNext(1)
         .expectComplete();
@@ -1732,7 +1732,7 @@ public class SourceTest extends StreamTest {
               }
             })
         .onErrorResume(e -> Source.single(0))
-        .runWith(TestSink.create(system), system)
+        .runWith(TestSink.<Integer>create(system), system)
         .request(2)
         .expectNext(1)
         .expectNext(0)
@@ -1751,7 +1751,7 @@ public class SourceTest extends StreamTest {
               }
             })
         .onErrorComplete(IllegalArgumentException.class)
-        .runWith(TestSink.create(system), system)
+        .runWith(TestSink.<Integer>create(system), system)
         .request(2)
         .expectNext(1)
         .expectComplete();
@@ -1770,7 +1770,7 @@ public class SourceTest extends StreamTest {
             })
         .onErrorContinue(
             IllegalArgumentException.class, e -> logger().error(e, "Error 
encountered"))
-        .runWith(TestSink.create(system), system)
+        .runWith(TestSink.<Integer>create(system), system)
         .request(2)
         .expectNext(1)
         .expectComplete();
@@ -1788,7 +1788,7 @@ public class SourceTest extends StreamTest {
               }
             })
         .onErrorResume(IllegalArgumentException.class, e -> Source.single(0))
-        .runWith(TestSink.create(system), system)
+        .runWith(TestSink.<Integer>create(system), system)
         .request(2)
         .expectNext(1)
         .expectNext(0)
@@ -1808,7 +1808,7 @@ public class SourceTest extends StreamTest {
               }
             })
         .onErrorComplete(TimeoutException.class)
-        .runWith(TestSink.create(system), system)
+        .runWith(TestSink.<Integer>create(system), system)
         .request(2)
         .expectNext(1)
         .expectError(ex);
@@ -1827,7 +1827,7 @@ public class SourceTest extends StreamTest {
               }
             })
         .onErrorContinue(TimeoutException.class, e -> logger().error(e, "Error 
encountered"))
-        .runWith(TestSink.create(system), system)
+        .runWith(TestSink.<Integer>create(system), system)
         .request(2)
         .expectNext(1)
         .expectError(ex);
@@ -1846,7 +1846,7 @@ public class SourceTest extends StreamTest {
               }
             })
         .onErrorResume(TimeoutException.class, e -> Source.single(0))
-        .runWith(TestSink.create(system), system)
+        .runWith(TestSink.<Integer>create(system), system)
         .request(2)
         .expectNext(1)
         .expectError(ex);
@@ -1864,7 +1864,7 @@ public class SourceTest extends StreamTest {
               }
             })
         .onErrorComplete(ex -> ex.getMessage().contains("Boom"))
-        .runWith(TestSink.create(system), system)
+        .runWith(TestSink.<Integer>create(system), system)
         .request(2)
         .expectNext(1)
         .expectComplete();
@@ -1883,7 +1883,7 @@ public class SourceTest extends StreamTest {
             })
         .onErrorContinue(
             ex -> ex.getMessage().contains("Boom"), e -> logger().error(e, 
"Error encountered"))
-        .runWith(TestSink.create(system), system)
+        .runWith(TestSink.<Integer>create(system), system)
         .request(2)
         .expectNext(1)
         .expectComplete();
@@ -1901,7 +1901,7 @@ public class SourceTest extends StreamTest {
               }
             })
         .onErrorResume(ex -> ex.getMessage().contains("Boom"), e -> 
Source.single(0))
-        .runWith(TestSink.create(system), system)
+        .runWith(TestSink.<Integer>create(system), system)
         .request(2)
         .expectNext(1)
         .expectNext(0)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to