This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git


The following commit(s) were added to refs/heads/main by this push:
     new 2b0f6b8214 add jspecify nullable annotations on streams Java DSL 
(#2617)
2b0f6b8214 is described below

commit 2b0f6b82143abf62557a2ad0e362b495f2b11830
Author: PJ Fanning <[email protected]>
AuthorDate: Wed Apr 8 16:19:31 2026 +0200

    add jspecify nullable annotations on streams Java DSL (#2617)
    
    * add jspecify nullable annotations on streams Java DSL
    
    * Update package-info.java
    
    * stream-typed
    
    * testkit
---
 build.sbt                                          |  1 +
 project/Dependencies.scala                         |  7 +++++-
 .../pekko/stream/testkit/javadsl/package-info.java | 25 ++++++++++++++++++++++
 .../pekko/stream/typed/javadsl/package-info.java   | 25 ++++++++++++++++++++++
 .../apache/pekko/stream/javadsl/package-info.java  | 25 ++++++++++++++++++++++
 .../org/apache/pekko/stream/javadsl/Flow.scala     | 14 ++++++------
 .../pekko/stream/javadsl/FlowWithContext.scala     | 10 +++++----
 .../org/apache/pekko/stream/javadsl/Sink.scala     |  5 +++--
 .../org/apache/pekko/stream/javadsl/Source.scala   | 23 ++++++++++----------
 .../pekko/stream/javadsl/SourceWithContext.scala   | 11 ++++++----
 .../org/apache/pekko/stream/javadsl/SubFlow.scala  | 14 ++++++------
 .../apache/pekko/stream/javadsl/SubSource.scala    | 14 ++++++------
 12 files changed, 134 insertions(+), 40 deletions(-)

diff --git a/build.sbt b/build.sbt
index 90a5a58ecf..581ad37e06 100644
--- a/build.sbt
+++ b/build.sbt
@@ -573,6 +573,7 @@ lazy val streamTyped = pekkoModule("stream-typed")
     streamTestkit % "test->test",
     actorTestkitTyped % "test->test",
     actorTypedTests % "test->test")
+  .settings(Dependencies.streamTyped)
   .settings(AutomaticModuleName.settings("pekko.stream.typed"))
   .enablePlugins(ScaladocNoVerificationOfDiagrams)
 
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index e2cc78ddee..bfe379ce50 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -109,6 +109,8 @@ object Dependencies {
 
     val logback = "ch.qos.logback" % "logback-classic" % logbackVersion
 
+    val jspecify = "org.jspecify" % "jspecify" % "1.0.0" % Optional
+
     object Docs {
       val sprayJson = "io.spray" %% "spray-json" % "1.3.6" % Test
       val gson = "com.google.code.gson" % "gson" % "2.13.2" % Test
@@ -362,9 +364,12 @@ object Dependencies {
 
   // pekko stream
 
-  lazy val stream = l ++= Seq[sbt.ModuleID](reactiveStreams, 
TestDependencies.scalatest)
+  lazy val stream = l ++= Seq[sbt.ModuleID](reactiveStreams, jspecify, 
TestDependencies.scalatest)
+
+  lazy val streamTyped = l ++= Seq[sbt.ModuleID](jspecify)
 
   lazy val streamTestkit = l ++= Seq(
+    jspecify,
     TestDependencies.scalatest,
     TestDependencies.scalatestScalaCheck,
     TestDependencies.junit)
diff --git 
a/stream-testkit/src/main/java/org/apache/pekko/stream/testkit/javadsl/package-info.java
 
b/stream-testkit/src/main/java/org/apache/pekko/stream/testkit/javadsl/package-info.java
new file mode 100644
index 0000000000..0694b223ab
--- /dev/null
+++ 
b/stream-testkit/src/main/java/org/apache/pekko/stream/testkit/javadsl/package-info.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Java API for Pekko Streams TestKit.
+ *
+ * <p>This package contains the Java DSL for Pekko Streams TestKit. For the 
Scala DSL see
+ * [[org.apache.pekko.stream.testkit.scaladsl]].
+ */
[email protected]
+package org.apache.pekko.stream.testkit.javadsl;
diff --git 
a/stream-typed/src/main/java/org/apache/pekko/stream/typed/javadsl/package-info.java
 
b/stream-typed/src/main/java/org/apache/pekko/stream/typed/javadsl/package-info.java
new file mode 100644
index 0000000000..019aa063c8
--- /dev/null
+++ 
b/stream-typed/src/main/java/org/apache/pekko/stream/typed/javadsl/package-info.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Java API for Pekko Streams Typed.
+ *
+ * <p>This package contains the Java DSL for Pekko Streams Typed. For the 
Scala DSL see
+ * [[org.apache.pekko.stream.typed.scaladsl]].
+ */
[email protected]
+package org.apache.pekko.stream.typed.javadsl;
diff --git 
a/stream/src/main/java/org/apache/pekko/stream/javadsl/package-info.java 
b/stream/src/main/java/org/apache/pekko/stream/javadsl/package-info.java
new file mode 100644
index 0000000000..e2aeae2c42
--- /dev/null
+++ b/stream/src/main/java/org/apache/pekko/stream/javadsl/package-info.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Java API for Pekko Streams.
+ *
+ * <p>This package contains the Java DSL for Pekko Streams. For the Scala DSL 
see
+ * [[org.apache.pekko.stream.scaladsl]].
+ */
[email protected]
+package org.apache.pekko.stream.javadsl;
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala 
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
index 1622dad4c6..eb5d4c9f34 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
@@ -43,6 +43,7 @@ import pekko.stream.impl.fusing.{ StatefulMapConcat, 
ZipWithIndexJava }
 import pekko.util.ConstantFun
 import pekko.util.Timeout
 
+import org.jspecify.annotations.Nullable
 import org.reactivestreams.Processor
 
 object Flow {
@@ -3496,7 +3497,7 @@ final class Flow[In, Out, Mat](delegate: 
scaladsl.Flow[In, Out, Mat]) extends Gr
    * '''Cancels when''' downstream cancels
    */
   def interleaveAll(
-      those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]],
+      @Nullable those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]],
       segmentSize: Int,
       eagerClose: Boolean): javadsl.Flow[In, Out, Mat] = {
     import pekko.util.Collections._
@@ -3580,7 +3581,7 @@ final class Flow[In, Out, Mat](delegate: 
scaladsl.Flow[In, Out, Mat]) extends Gr
    * '''Cancels when''' downstream cancels
    */
   def mergeAll(
-      those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]],
+      @Nullable those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]],
       eagerComplete: Boolean): javadsl.Flow[In, Out, Mat] = {
     import pekko.util.Collections._
     val seq = if (those ne null) those.collectToImmutableSeq {
@@ -4322,7 +4323,8 @@ final class Flow[In, Out, Mat](delegate: 
scaladsl.Flow[In, Out, Mat]) extends Gr
    *
    * '''Cancels when''' downstream cancels
    */
-  def log(name: String, extract: function.Function[Out, Any], log: 
LoggingAdapter): javadsl.Flow[In, Out, Mat] =
+  def log(name: String, extract: function.Function[Out, Any], @Nullable log: 
LoggingAdapter)
+      : javadsl.Flow[In, Out, Mat] =
     new Flow(delegate.log(name, e => extract.apply(e))(log))
 
   /**
@@ -4363,7 +4365,7 @@ final class Flow[In, Out, Mat](delegate: 
scaladsl.Flow[In, Out, Mat]) extends Gr
    *
    * '''Cancels when''' downstream cancels
    */
-  def log(name: String, log: LoggingAdapter): javadsl.Flow[In, Out, Mat] =
+  def log(name: String, @Nullable log: LoggingAdapter): javadsl.Flow[In, Out, 
Mat] =
     this.log(name, ConstantFun.javaIdentityFunction[Out], log)
 
   /**
@@ -4410,7 +4412,7 @@ final class Flow[In, Out, Mat](delegate: 
scaladsl.Flow[In, Out, Mat]) extends Gr
       name: String,
       marker: function.Function[Out, LogMarker],
       extract: function.Function[Out, Any],
-      log: MarkerLoggingAdapter): javadsl.Flow[In, Out, Mat] =
+      @Nullable log: MarkerLoggingAdapter): javadsl.Flow[In, Out, Mat] =
     new Flow(delegate.logWithMarker(name, e => marker.apply(e), e => 
extract.apply(e))(log))
 
   /**
@@ -4457,7 +4459,7 @@ final class Flow[In, Out, Mat](delegate: 
scaladsl.Flow[In, Out, Mat]) extends Gr
   def logWithMarker(
       name: String,
       marker: function.Function[Out, LogMarker],
-      log: MarkerLoggingAdapter): javadsl.Flow[In, Out, Mat] =
+      @Nullable log: MarkerLoggingAdapter): javadsl.Flow[In, Out, Mat] =
     this.logWithMarker(name, marker, ConstantFun.javaIdentityFunction[Out], 
log)
 
   /**
diff --git 
a/stream/src/main/scala/org/apache/pekko/stream/javadsl/FlowWithContext.scala 
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/FlowWithContext.scala
index 2aeb298b16..7d3f74595a 100644
--- 
a/stream/src/main/scala/org/apache/pekko/stream/javadsl/FlowWithContext.scala
+++ 
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/FlowWithContext.scala
@@ -29,6 +29,8 @@ import pekko.japi.{ function, Pair }
 import pekko.stream._
 import pekko.util.ConstantFun
 
+import org.jspecify.annotations.Nullable
+
 object FlowWithContext {
 
   def create[In, Ctx](): FlowWithContext[In, Ctx, In, Ctx, pekko.NotUsed] =
@@ -304,7 +306,7 @@ final class FlowWithContext[In, CtxIn, Out, CtxOut, +Mat](
   def log(
       name: String,
       extract: function.Function[Out, Any],
-      log: LoggingAdapter): FlowWithContext[In, CtxIn, Out, CtxOut, Mat] =
+      @Nullable log: LoggingAdapter): FlowWithContext[In, CtxIn, Out, CtxOut, 
Mat] =
     viaScala(_.log(name, e => extract.apply(e))(log))
 
   /**
@@ -320,7 +322,7 @@ final class FlowWithContext[In, CtxIn, Out, CtxOut, +Mat](
    *
    * @see [[pekko.stream.javadsl.Flow.log]]
    */
-  def log(name: String, log: LoggingAdapter): FlowWithContext[In, CtxIn, Out, 
CtxOut, Mat] =
+  def log(name: String, @Nullable log: LoggingAdapter): FlowWithContext[In, 
CtxIn, Out, CtxOut, Mat] =
     this.log(name, ConstantFun.javaIdentityFunction[Out], log)
 
   /**
@@ -340,7 +342,7 @@ final class FlowWithContext[In, CtxIn, Out, CtxOut, +Mat](
       name: String,
       marker: function.Function2[Out, CtxOut, LogMarker],
       extract: function.Function[Out, Any],
-      log: MarkerLoggingAdapter): FlowWithContext[In, CtxIn, Out, CtxOut, Mat] 
=
+      @Nullable log: MarkerLoggingAdapter): FlowWithContext[In, CtxIn, Out, 
CtxOut, Mat] =
     viaScala(_.logWithMarker(name, (e, c) => marker.apply(e, c), e => 
extract.apply(e))(log))
 
   /**
@@ -362,7 +364,7 @@ final class FlowWithContext[In, CtxIn, Out, CtxOut, +Mat](
   def logWithMarker(
       name: String,
       marker: function.Function2[Out, CtxOut, LogMarker],
-      log: MarkerLoggingAdapter): FlowWithContext[In, CtxIn, Out, CtxOut, Mat] 
=
+      @Nullable log: MarkerLoggingAdapter): FlowWithContext[In, CtxIn, Out, 
CtxOut, Mat] =
     this.logWithMarker(name, marker, ConstantFun.javaIdentityFunction[Out], 
log)
 
   /**
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala 
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala
index 2b23421f8c..4530ca26e7 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala
@@ -34,6 +34,7 @@ import pekko.stream.impl.LinearTraversalBuilder
 import pekko.stream.scaladsl.SinkToCompletionStage
 import pekko.util.ConstantFun.scalaAnyToUnit
 
+import org.jspecify.annotations.Nullable
 import org.reactivestreams.{ Publisher, Subscriber }
 
 /** Java API */
@@ -435,7 +436,7 @@ object Sink {
   def combine[T, U](
       output1: Sink[U, _],
       output2: Sink[U, _],
-      rest: java.util.List[Sink[U, _]],
+      @Nullable rest: java.util.List[Sink[U, _]],
       fanOutStrategy: function.Function[java.lang.Integer, 
Graph[UniformFanOutShape[T, U], NotUsed]])
       : Sink[T, NotUsed] = {
     import scala.jdk.CollectionConverters._
@@ -462,7 +463,7 @@ object Sink {
    * @since 1.1.0
    */
   def combine[T, U, M](
-      sinks: java.util.List[_ <: Graph[SinkShape[U], M]],
+      @Nullable sinks: java.util.List[_ <: Graph[SinkShape[U], M]],
       fanOutStrategy: function.Function[java.lang.Integer, 
Graph[UniformFanOutShape[T, U], NotUsed]])
       : Sink[T, java.util.List[M]] = {
     import pekko.util.Collections._
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala 
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
index fe5b1f5ef0..c15979af8e 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
@@ -41,6 +41,7 @@ import pekko.stream.impl.Stages.DefaultAttributes
 import pekko.stream.impl.fusing.{ StatefulMapConcat, ZipWithIndexJava }
 import pekko.util._
 
+import org.jspecify.annotations.Nullable
 import org.reactivestreams.{ Publisher, Subscriber }
 
 /** Java API */
@@ -537,7 +538,7 @@ object Source {
   def combine[T, U](
       first: Source[T, _ <: Any],
       second: Source[T, _ <: Any],
-      rest: java.util.List[Source[T, _ <: Any]],
+      @Nullable rest: java.util.List[Source[T, _ <: Any]],
       fanInStrategy: function.Function[java.lang.Integer, _ <: 
Graph[UniformFanInShape[T, U], NotUsed]])
       : Source[U, NotUsed] = {
     import scala.jdk.CollectionConverters._
@@ -563,7 +564,7 @@ object Source {
    * @since 1.1.0
    */
   def combine[T, U, M](
-      sources: java.util.List[_ <: Graph[SourceShape[T], M]],
+      @Nullable sources: java.util.List[_ <: Graph[SourceShape[T], M]],
       fanInStrategy: function.Function[java.lang.Integer, 
Graph[UniformFanInShape[T, U], NotUsed]])
       : Source[U, java.util.List[M]] = {
     import pekko.util.Collections._
@@ -578,7 +579,7 @@ object Source {
   /**
    * Combine the elements of multiple streams into a stream of lists.
    */
-  def zipN[T](sources: java.util.List[Source[T, _ <: Any]]): 
Source[java.util.List[T], NotUsed] = {
+  def zipN[T](@Nullable sources: java.util.List[Source[T, _ <: Any]]): 
Source[java.util.List[T], NotUsed] = {
     import scala.jdk.CollectionConverters._
     val seq = if (sources ne null) sources.asScala.map(_.asScala).toSeq else 
immutable.Seq()
     new Source(scaladsl.Source.zipN(seq).map(_.asJava))
@@ -589,7 +590,7 @@ object Source {
    */
   def zipWithN[T, O](
       zipper: function.Function[java.util.List[T], O],
-      sources: java.util.List[Source[T, _ <: Any]]): Source[O, NotUsed] = {
+      @Nullable sources: java.util.List[Source[T, _ <: Any]]): Source[O, 
NotUsed] = {
     import scala.jdk.CollectionConverters._
     val seq = if (sources ne null) sources.asScala.map(_.asScala).toSeq else 
immutable.Seq()
     new Source(scaladsl.Source.zipWithN[T, O](seq => 
zipper.apply(seq.asJava))(seq))
@@ -837,7 +838,7 @@ object Source {
    * '''Cancels when''' downstream cancels
    */
   def mergePrioritizedN[T](
-      sourcesAndPriorities: java.util.List[Pair[Source[T, _ <: Any], 
java.lang.Integer]],
+      @Nullable sourcesAndPriorities: java.util.List[Pair[Source[T, _ <: Any], 
java.lang.Integer]],
       eagerComplete: Boolean): javadsl.Source[T, NotUsed] = {
     import scala.jdk.CollectionConverters._
     val seq =
@@ -1618,7 +1619,7 @@ final class Source[Out, Mat](delegate: 
scaladsl.Source[Out, Mat]) extends Graph[
    * '''Cancels when''' downstream cancels
    */
   def interleaveAll(
-      those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]],
+      @Nullable those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]],
       segmentSize: Int,
       eagerClose: Boolean): javadsl.Source[Out, Mat] = {
     import pekko.util.Collections._
@@ -1700,7 +1701,7 @@ final class Source[Out, Mat](delegate: 
scaladsl.Source[Out, Mat]) extends Graph[
    * '''Cancels when''' downstream cancels
    */
   def mergeAll(
-      those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]],
+      @Nullable those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]],
       eagerComplete: Boolean): javadsl.Source[Out, Mat] = {
     import pekko.util.Collections._
     val seq = if (those ne null) those.collectToImmutableSeq {
@@ -4805,7 +4806,7 @@ final class Source[Out, Mat](delegate: 
scaladsl.Source[Out, Mat]) extends Graph[
    *
    * '''Cancels when''' downstream cancels
    */
-  def log(name: String, extract: function.Function[Out, Any], log: 
LoggingAdapter): javadsl.Source[Out, Mat] =
+  def log(name: String, extract: function.Function[Out, Any], @Nullable log: 
LoggingAdapter): javadsl.Source[Out, Mat] =
     new Source(delegate.log(name, e => extract.apply(e))(log))
 
   /**
@@ -4846,7 +4847,7 @@ final class Source[Out, Mat](delegate: 
scaladsl.Source[Out, Mat]) extends Graph[
    *
    * '''Cancels when''' downstream cancels
    */
-  def log(name: String, log: LoggingAdapter): javadsl.Source[Out, Mat] =
+  def log(name: String, @Nullable log: LoggingAdapter): javadsl.Source[Out, 
Mat] =
     this.log(name, ConstantFun.javaIdentityFunction[Out], log)
 
   /**
@@ -4893,7 +4894,7 @@ final class Source[Out, Mat](delegate: 
scaladsl.Source[Out, Mat]) extends Graph[
       name: String,
       marker: function.Function[Out, LogMarker],
       extract: function.Function[Out, Any],
-      log: MarkerLoggingAdapter): javadsl.Source[Out, Mat] =
+      @Nullable log: MarkerLoggingAdapter): javadsl.Source[Out, Mat] =
     new Source(delegate.logWithMarker(name, e => marker.apply(e), e => 
extract.apply(e))(log))
 
   /**
@@ -4940,7 +4941,7 @@ final class Source[Out, Mat](delegate: 
scaladsl.Source[Out, Mat]) extends Graph[
   def logWithMarker(
       name: String,
       marker: function.Function[Out, LogMarker],
-      log: MarkerLoggingAdapter): javadsl.Source[Out, Mat] =
+      @Nullable log: MarkerLoggingAdapter): javadsl.Source[Out, Mat] =
     this.logWithMarker(name, marker, ConstantFun.javaIdentityFunction[Out], 
log)
 
   /**
diff --git 
a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SourceWithContext.scala 
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SourceWithContext.scala
index 2dca6c6efc..e46e591a3e 100644
--- 
a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SourceWithContext.scala
+++ 
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SourceWithContext.scala
@@ -31,6 +31,8 @@ import pekko.japi.function
 import pekko.stream._
 import pekko.util.ConstantFun
 
+import org.jspecify.annotations.Nullable
+
 object SourceWithContext {
 
   /**
@@ -290,7 +292,8 @@ final class SourceWithContext[+Out, +Ctx, +Mat](delegate: 
scaladsl.SourceWithCon
    *
    * @see [[pekko.stream.javadsl.Source.log]]
    */
-  def log(name: String, extract: function.Function[Out, Any], log: 
LoggingAdapter): SourceWithContext[Out, Ctx, Mat] =
+  def log(name: String, extract: function.Function[Out, Any], @Nullable log: 
LoggingAdapter)
+      : SourceWithContext[Out, Ctx, Mat] =
     viaScala(_.log(name, e => extract.apply(e))(log))
 
   /**
@@ -306,7 +309,7 @@ final class SourceWithContext[+Out, +Ctx, +Mat](delegate: 
scaladsl.SourceWithCon
    *
    * @see [[pekko.stream.javadsl.Flow.log]]
    */
-  def log(name: String, log: LoggingAdapter): SourceWithContext[Out, Ctx, Mat] 
=
+  def log(name: String, @Nullable log: LoggingAdapter): SourceWithContext[Out, 
Ctx, Mat] =
     this.log(name, ConstantFun.javaIdentityFunction[Out], log)
 
   /**
@@ -326,7 +329,7 @@ final class SourceWithContext[+Out, +Ctx, +Mat](delegate: 
scaladsl.SourceWithCon
       name: String,
       marker: function.Function2[Out, Ctx, LogMarker],
       extract: function.Function[Out, Any],
-      log: MarkerLoggingAdapter): SourceWithContext[Out, Ctx, Mat] =
+      @Nullable log: MarkerLoggingAdapter): SourceWithContext[Out, Ctx, Mat] =
     viaScala(_.logWithMarker(name, (e, c) => marker.apply(e, c), e => 
extract.apply(e))(log))
 
   /**
@@ -348,7 +351,7 @@ final class SourceWithContext[+Out, +Ctx, +Mat](delegate: 
scaladsl.SourceWithCon
   def logWithMarker(
       name: String,
       marker: function.Function2[Out, Ctx, LogMarker],
-      log: MarkerLoggingAdapter): SourceWithContext[Out, Ctx, Mat] =
+      @Nullable log: MarkerLoggingAdapter): SourceWithContext[Out, Ctx, Mat] =
     this.logWithMarker(name, marker, ConstantFun.javaIdentityFunction[Out], 
log)
 
   /**
diff --git 
a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala 
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
index 14f525bb70..61a32a81fd 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
@@ -35,6 +35,8 @@ import pekko.stream.impl.Stages.DefaultAttributes
 import pekko.stream.impl.fusing.{ StatefulMapConcat, ZipWithIndexJava }
 import pekko.util.ConstantFun
 
+import org.jspecify.annotations.Nullable
+
 object SubFlow {
 
   /**
@@ -2368,7 +2370,7 @@ final class SubFlow[In, Out, Mat](
    * '''Cancels when''' downstream cancels
    */
   def mergeAll(
-      those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]],
+      @Nullable those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]],
       eagerComplete: Boolean): SubFlow[In, Out, Mat] = {
     import pekko.util.Collections._
     val seq = if (those ne null) those.collectToImmutableSeq {
@@ -2426,7 +2428,7 @@ final class SubFlow[In, Out, Mat](
    * '''Cancels when''' downstream cancels
    */
   def interleaveAll(
-      those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]],
+      @Nullable those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]],
       segmentSize: Int,
       eagerClose: Boolean): SubFlow[In, Out, Mat] = {
     import pekko.util.Collections._
@@ -2918,7 +2920,7 @@ final class SubFlow[In, Out, Mat](
    *
    * '''Cancels when''' downstream cancels
    */
-  def log(name: String, extract: function.Function[Out, Any], log: 
LoggingAdapter): SubFlow[In, Out, Mat] =
+  def log(name: String, extract: function.Function[Out, Any], @Nullable log: 
LoggingAdapter): SubFlow[In, Out, Mat] =
     new SubFlow(delegate.log(name, e => extract.apply(e))(log))
 
   /**
@@ -2959,7 +2961,7 @@ final class SubFlow[In, Out, Mat](
    *
    * '''Cancels when''' downstream cancels
    */
-  def log(name: String, log: LoggingAdapter): SubFlow[In, Out, Mat] =
+  def log(name: String, @Nullable log: LoggingAdapter): SubFlow[In, Out, Mat] =
     this.log(name, ConstantFun.javaIdentityFunction[Out], log)
 
   /**
@@ -3006,7 +3008,7 @@ final class SubFlow[In, Out, Mat](
       name: String,
       marker: function.Function[Out, LogMarker],
       extract: function.Function[Out, Any],
-      log: MarkerLoggingAdapter): SubFlow[In, Out, Mat] =
+      @Nullable log: MarkerLoggingAdapter): SubFlow[In, Out, Mat] =
     new SubFlow(delegate.logWithMarker(name, e => marker.apply(e), e => 
extract.apply(e))(log))
 
   /**
@@ -3053,7 +3055,7 @@ final class SubFlow[In, Out, Mat](
   def logWithMarker(
       name: String,
       marker: function.Function[Out, LogMarker],
-      log: MarkerLoggingAdapter): SubFlow[In, Out, Mat] =
+      @Nullable log: MarkerLoggingAdapter): SubFlow[In, Out, Mat] =
     this.logWithMarker(name, marker, ConstantFun.javaIdentityFunction[Out], 
log)
 
   /**
diff --git 
a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala 
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
index 7edb90afd9..0466e999e4 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
@@ -35,6 +35,8 @@ import pekko.stream.impl.Stages.DefaultAttributes
 import pekko.stream.impl.fusing.{ StatefulMapConcat, ZipWithIndexJava }
 import pekko.util.ConstantFun
 
+import org.jspecify.annotations.Nullable
+
 /**
  * * Upcast a stream of elements to a stream of supertypes of that element. 
Useful in combination with
  * fan-in operators where you do not want to pay the cost of casting each 
element in a `map`.
@@ -2334,7 +2336,7 @@ final class SubSource[Out, Mat](
    * '''Cancels when''' downstream cancels
    */
   def mergeAll(
-      those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]],
+      @Nullable those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]],
       eagerComplete: Boolean): SubSource[Out, Mat] = {
     import pekko.util.Collections._
     val seq = if (those ne null) those.collectToImmutableSeq {
@@ -2393,7 +2395,7 @@ final class SubSource[Out, Mat](
    * '''Cancels when''' downstream cancels
    */
   def interleaveAll(
-      those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]],
+      @Nullable those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]],
       segmentSize: Int,
       eagerClose: Boolean): SubSource[Out, Mat] = {
     import pekko.util.Collections._
@@ -2885,7 +2887,7 @@ final class SubSource[Out, Mat](
    *
    * '''Cancels when''' downstream cancels
    */
-  def log(name: String, extract: function.Function[Out, Any], log: 
LoggingAdapter): SubSource[Out, Mat] =
+  def log(name: String, extract: function.Function[Out, Any], @Nullable log: 
LoggingAdapter): SubSource[Out, Mat] =
     new SubSource(delegate.log(name, e => extract.apply(e))(log))
 
   /**
@@ -2926,7 +2928,7 @@ final class SubSource[Out, Mat](
    *
    * '''Cancels when''' downstream cancels
    */
-  def log(name: String, log: LoggingAdapter): SubSource[Out, Mat] =
+  def log(name: String, @Nullable log: LoggingAdapter): SubSource[Out, Mat] =
     this.log(name, ConstantFun.javaIdentityFunction[Out], log)
 
   /**
@@ -2973,7 +2975,7 @@ final class SubSource[Out, Mat](
       name: String,
       marker: function.Function[Out, LogMarker],
       extract: function.Function[Out, Any],
-      log: MarkerLoggingAdapter): SubSource[Out, Mat] =
+      @Nullable log: MarkerLoggingAdapter): SubSource[Out, Mat] =
     new SubSource(delegate.logWithMarker(name, e => marker.apply(e), e => 
extract.apply(e))(log))
 
   /**
@@ -3020,7 +3022,7 @@ final class SubSource[Out, Mat](
   def logWithMarker(
       name: String,
       marker: function.Function[Out, LogMarker],
-      log: MarkerLoggingAdapter): SubSource[Out, Mat] =
+      @Nullable log: MarkerLoggingAdapter): SubSource[Out, Mat] =
     this.logWithMarker(name, marker, ConstantFun.javaIdentityFunction[Out], 
log)
 
   /**


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to