This is an automated email from the ASF dual-hosted git repository.
fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to refs/heads/main by this push:
new 2b0f6b8214 add jspecify nullable annotations on streams Java DSL
(#2617)
2b0f6b8214 is described below
commit 2b0f6b82143abf62557a2ad0e362b495f2b11830
Author: PJ Fanning <[email protected]>
AuthorDate: Wed Apr 8 16:19:31 2026 +0200
add jspecify nullable annotations on streams Java DSL (#2617)
* add jspecify nullable annotations on streams Java DSL
* Update package-info.java
* stream-typed
* testkit
---
build.sbt | 1 +
project/Dependencies.scala | 7 +++++-
.../pekko/stream/testkit/javadsl/package-info.java | 25 ++++++++++++++++++++++
.../pekko/stream/typed/javadsl/package-info.java | 25 ++++++++++++++++++++++
.../apache/pekko/stream/javadsl/package-info.java | 25 ++++++++++++++++++++++
.../org/apache/pekko/stream/javadsl/Flow.scala | 14 ++++++------
.../pekko/stream/javadsl/FlowWithContext.scala | 10 +++++----
.../org/apache/pekko/stream/javadsl/Sink.scala | 5 +++--
.../org/apache/pekko/stream/javadsl/Source.scala | 23 ++++++++++----------
.../pekko/stream/javadsl/SourceWithContext.scala | 11 ++++++----
.../org/apache/pekko/stream/javadsl/SubFlow.scala | 14 ++++++------
.../apache/pekko/stream/javadsl/SubSource.scala | 14 ++++++------
12 files changed, 134 insertions(+), 40 deletions(-)
diff --git a/build.sbt b/build.sbt
index 90a5a58ecf..581ad37e06 100644
--- a/build.sbt
+++ b/build.sbt
@@ -573,6 +573,7 @@ lazy val streamTyped = pekkoModule("stream-typed")
streamTestkit % "test->test",
actorTestkitTyped % "test->test",
actorTypedTests % "test->test")
+ .settings(Dependencies.streamTyped)
.settings(AutomaticModuleName.settings("pekko.stream.typed"))
.enablePlugins(ScaladocNoVerificationOfDiagrams)
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index e2cc78ddee..bfe379ce50 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -109,6 +109,8 @@ object Dependencies {
val logback = "ch.qos.logback" % "logback-classic" % logbackVersion
+ val jspecify = "org.jspecify" % "jspecify" % "1.0.0" % Optional
+
object Docs {
val sprayJson = "io.spray" %% "spray-json" % "1.3.6" % Test
val gson = "com.google.code.gson" % "gson" % "2.13.2" % Test
@@ -362,9 +364,12 @@ object Dependencies {
// pekko stream
- lazy val stream = l ++= Seq[sbt.ModuleID](reactiveStreams,
TestDependencies.scalatest)
+ lazy val stream = l ++= Seq[sbt.ModuleID](reactiveStreams, jspecify,
TestDependencies.scalatest)
+
+ lazy val streamTyped = l ++= Seq[sbt.ModuleID](jspecify)
lazy val streamTestkit = l ++= Seq(
+ jspecify,
TestDependencies.scalatest,
TestDependencies.scalatestScalaCheck,
TestDependencies.junit)
diff --git
a/stream-testkit/src/main/java/org/apache/pekko/stream/testkit/javadsl/package-info.java
b/stream-testkit/src/main/java/org/apache/pekko/stream/testkit/javadsl/package-info.java
new file mode 100644
index 0000000000..0694b223ab
--- /dev/null
+++
b/stream-testkit/src/main/java/org/apache/pekko/stream/testkit/javadsl/package-info.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Java API for Pekko Streams TestKit.
+ *
+ * <p>This package contains the Java DSL for Pekko Streams TestKit. For the
Scala DSL see
+ * [[org.apache.pekko.stream.testkit.scaladsl]].
+ */
[email protected]
+package org.apache.pekko.stream.testkit.javadsl;
diff --git
a/stream-typed/src/main/java/org/apache/pekko/stream/typed/javadsl/package-info.java
b/stream-typed/src/main/java/org/apache/pekko/stream/typed/javadsl/package-info.java
new file mode 100644
index 0000000000..019aa063c8
--- /dev/null
+++
b/stream-typed/src/main/java/org/apache/pekko/stream/typed/javadsl/package-info.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Java API for Pekko Streams Typed.
+ *
+ * <p>This package contains the Java DSL for Pekko Streams Typed. For the
Scala DSL see
+ * [[org.apache.pekko.stream.typed.scaladsl]].
+ */
[email protected]
+package org.apache.pekko.stream.typed.javadsl;
diff --git
a/stream/src/main/java/org/apache/pekko/stream/javadsl/package-info.java
b/stream/src/main/java/org/apache/pekko/stream/javadsl/package-info.java
new file mode 100644
index 0000000000..e2aeae2c42
--- /dev/null
+++ b/stream/src/main/java/org/apache/pekko/stream/javadsl/package-info.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Java API for Pekko Streams.
+ *
+ * <p>This package contains the Java DSL for Pekko Streams. For the Scala DSL
see
+ * [[org.apache.pekko.stream.scaladsl]].
+ */
[email protected]
+package org.apache.pekko.stream.javadsl;
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
index 1622dad4c6..eb5d4c9f34 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
@@ -43,6 +43,7 @@ import pekko.stream.impl.fusing.{ StatefulMapConcat,
ZipWithIndexJava }
import pekko.util.ConstantFun
import pekko.util.Timeout
+import org.jspecify.annotations.Nullable
import org.reactivestreams.Processor
object Flow {
@@ -3496,7 +3497,7 @@ final class Flow[In, Out, Mat](delegate:
scaladsl.Flow[In, Out, Mat]) extends Gr
* '''Cancels when''' downstream cancels
*/
def interleaveAll(
- those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]],
+ @Nullable those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]],
segmentSize: Int,
eagerClose: Boolean): javadsl.Flow[In, Out, Mat] = {
import pekko.util.Collections._
@@ -3580,7 +3581,7 @@ final class Flow[In, Out, Mat](delegate:
scaladsl.Flow[In, Out, Mat]) extends Gr
* '''Cancels when''' downstream cancels
*/
def mergeAll(
- those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]],
+ @Nullable those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]],
eagerComplete: Boolean): javadsl.Flow[In, Out, Mat] = {
import pekko.util.Collections._
val seq = if (those ne null) those.collectToImmutableSeq {
@@ -4322,7 +4323,8 @@ final class Flow[In, Out, Mat](delegate:
scaladsl.Flow[In, Out, Mat]) extends Gr
*
* '''Cancels when''' downstream cancels
*/
- def log(name: String, extract: function.Function[Out, Any], log:
LoggingAdapter): javadsl.Flow[In, Out, Mat] =
+ def log(name: String, extract: function.Function[Out, Any], @Nullable log:
LoggingAdapter)
+ : javadsl.Flow[In, Out, Mat] =
new Flow(delegate.log(name, e => extract.apply(e))(log))
/**
@@ -4363,7 +4365,7 @@ final class Flow[In, Out, Mat](delegate:
scaladsl.Flow[In, Out, Mat]) extends Gr
*
* '''Cancels when''' downstream cancels
*/
- def log(name: String, log: LoggingAdapter): javadsl.Flow[In, Out, Mat] =
+ def log(name: String, @Nullable log: LoggingAdapter): javadsl.Flow[In, Out,
Mat] =
this.log(name, ConstantFun.javaIdentityFunction[Out], log)
/**
@@ -4410,7 +4412,7 @@ final class Flow[In, Out, Mat](delegate:
scaladsl.Flow[In, Out, Mat]) extends Gr
name: String,
marker: function.Function[Out, LogMarker],
extract: function.Function[Out, Any],
- log: MarkerLoggingAdapter): javadsl.Flow[In, Out, Mat] =
+ @Nullable log: MarkerLoggingAdapter): javadsl.Flow[In, Out, Mat] =
new Flow(delegate.logWithMarker(name, e => marker.apply(e), e =>
extract.apply(e))(log))
/**
@@ -4457,7 +4459,7 @@ final class Flow[In, Out, Mat](delegate:
scaladsl.Flow[In, Out, Mat]) extends Gr
def logWithMarker(
name: String,
marker: function.Function[Out, LogMarker],
- log: MarkerLoggingAdapter): javadsl.Flow[In, Out, Mat] =
+ @Nullable log: MarkerLoggingAdapter): javadsl.Flow[In, Out, Mat] =
this.logWithMarker(name, marker, ConstantFun.javaIdentityFunction[Out],
log)
/**
diff --git
a/stream/src/main/scala/org/apache/pekko/stream/javadsl/FlowWithContext.scala
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/FlowWithContext.scala
index 2aeb298b16..7d3f74595a 100644
---
a/stream/src/main/scala/org/apache/pekko/stream/javadsl/FlowWithContext.scala
+++
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/FlowWithContext.scala
@@ -29,6 +29,8 @@ import pekko.japi.{ function, Pair }
import pekko.stream._
import pekko.util.ConstantFun
+import org.jspecify.annotations.Nullable
+
object FlowWithContext {
def create[In, Ctx](): FlowWithContext[In, Ctx, In, Ctx, pekko.NotUsed] =
@@ -304,7 +306,7 @@ final class FlowWithContext[In, CtxIn, Out, CtxOut, +Mat](
def log(
name: String,
extract: function.Function[Out, Any],
- log: LoggingAdapter): FlowWithContext[In, CtxIn, Out, CtxOut, Mat] =
+ @Nullable log: LoggingAdapter): FlowWithContext[In, CtxIn, Out, CtxOut,
Mat] =
viaScala(_.log(name, e => extract.apply(e))(log))
/**
@@ -320,7 +322,7 @@ final class FlowWithContext[In, CtxIn, Out, CtxOut, +Mat](
*
* @see [[pekko.stream.javadsl.Flow.log]]
*/
- def log(name: String, log: LoggingAdapter): FlowWithContext[In, CtxIn, Out,
CtxOut, Mat] =
+ def log(name: String, @Nullable log: LoggingAdapter): FlowWithContext[In,
CtxIn, Out, CtxOut, Mat] =
this.log(name, ConstantFun.javaIdentityFunction[Out], log)
/**
@@ -340,7 +342,7 @@ final class FlowWithContext[In, CtxIn, Out, CtxOut, +Mat](
name: String,
marker: function.Function2[Out, CtxOut, LogMarker],
extract: function.Function[Out, Any],
- log: MarkerLoggingAdapter): FlowWithContext[In, CtxIn, Out, CtxOut, Mat]
=
+ @Nullable log: MarkerLoggingAdapter): FlowWithContext[In, CtxIn, Out,
CtxOut, Mat] =
viaScala(_.logWithMarker(name, (e, c) => marker.apply(e, c), e =>
extract.apply(e))(log))
/**
@@ -362,7 +364,7 @@ final class FlowWithContext[In, CtxIn, Out, CtxOut, +Mat](
def logWithMarker(
name: String,
marker: function.Function2[Out, CtxOut, LogMarker],
- log: MarkerLoggingAdapter): FlowWithContext[In, CtxIn, Out, CtxOut, Mat]
=
+ @Nullable log: MarkerLoggingAdapter): FlowWithContext[In, CtxIn, Out,
CtxOut, Mat] =
this.logWithMarker(name, marker, ConstantFun.javaIdentityFunction[Out],
log)
/**
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala
index 2b23421f8c..4530ca26e7 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala
@@ -34,6 +34,7 @@ import pekko.stream.impl.LinearTraversalBuilder
import pekko.stream.scaladsl.SinkToCompletionStage
import pekko.util.ConstantFun.scalaAnyToUnit
+import org.jspecify.annotations.Nullable
import org.reactivestreams.{ Publisher, Subscriber }
/** Java API */
@@ -435,7 +436,7 @@ object Sink {
def combine[T, U](
output1: Sink[U, _],
output2: Sink[U, _],
- rest: java.util.List[Sink[U, _]],
+ @Nullable rest: java.util.List[Sink[U, _]],
fanOutStrategy: function.Function[java.lang.Integer,
Graph[UniformFanOutShape[T, U], NotUsed]])
: Sink[T, NotUsed] = {
import scala.jdk.CollectionConverters._
@@ -462,7 +463,7 @@ object Sink {
* @since 1.1.0
*/
def combine[T, U, M](
- sinks: java.util.List[_ <: Graph[SinkShape[U], M]],
+ @Nullable sinks: java.util.List[_ <: Graph[SinkShape[U], M]],
fanOutStrategy: function.Function[java.lang.Integer,
Graph[UniformFanOutShape[T, U], NotUsed]])
: Sink[T, java.util.List[M]] = {
import pekko.util.Collections._
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
index fe5b1f5ef0..c15979af8e 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
@@ -41,6 +41,7 @@ import pekko.stream.impl.Stages.DefaultAttributes
import pekko.stream.impl.fusing.{ StatefulMapConcat, ZipWithIndexJava }
import pekko.util._
+import org.jspecify.annotations.Nullable
import org.reactivestreams.{ Publisher, Subscriber }
/** Java API */
@@ -537,7 +538,7 @@ object Source {
def combine[T, U](
first: Source[T, _ <: Any],
second: Source[T, _ <: Any],
- rest: java.util.List[Source[T, _ <: Any]],
+ @Nullable rest: java.util.List[Source[T, _ <: Any]],
fanInStrategy: function.Function[java.lang.Integer, _ <:
Graph[UniformFanInShape[T, U], NotUsed]])
: Source[U, NotUsed] = {
import scala.jdk.CollectionConverters._
@@ -563,7 +564,7 @@ object Source {
* @since 1.1.0
*/
def combine[T, U, M](
- sources: java.util.List[_ <: Graph[SourceShape[T], M]],
+ @Nullable sources: java.util.List[_ <: Graph[SourceShape[T], M]],
fanInStrategy: function.Function[java.lang.Integer,
Graph[UniformFanInShape[T, U], NotUsed]])
: Source[U, java.util.List[M]] = {
import pekko.util.Collections._
@@ -578,7 +579,7 @@ object Source {
/**
* Combine the elements of multiple streams into a stream of lists.
*/
- def zipN[T](sources: java.util.List[Source[T, _ <: Any]]):
Source[java.util.List[T], NotUsed] = {
+ def zipN[T](@Nullable sources: java.util.List[Source[T, _ <: Any]]):
Source[java.util.List[T], NotUsed] = {
import scala.jdk.CollectionConverters._
val seq = if (sources ne null) sources.asScala.map(_.asScala).toSeq else
immutable.Seq()
new Source(scaladsl.Source.zipN(seq).map(_.asJava))
@@ -589,7 +590,7 @@ object Source {
*/
def zipWithN[T, O](
zipper: function.Function[java.util.List[T], O],
- sources: java.util.List[Source[T, _ <: Any]]): Source[O, NotUsed] = {
+ @Nullable sources: java.util.List[Source[T, _ <: Any]]): Source[O,
NotUsed] = {
import scala.jdk.CollectionConverters._
val seq = if (sources ne null) sources.asScala.map(_.asScala).toSeq else
immutable.Seq()
new Source(scaladsl.Source.zipWithN[T, O](seq =>
zipper.apply(seq.asJava))(seq))
@@ -837,7 +838,7 @@ object Source {
* '''Cancels when''' downstream cancels
*/
def mergePrioritizedN[T](
- sourcesAndPriorities: java.util.List[Pair[Source[T, _ <: Any],
java.lang.Integer]],
+ @Nullable sourcesAndPriorities: java.util.List[Pair[Source[T, _ <: Any],
java.lang.Integer]],
eagerComplete: Boolean): javadsl.Source[T, NotUsed] = {
import scala.jdk.CollectionConverters._
val seq =
@@ -1618,7 +1619,7 @@ final class Source[Out, Mat](delegate:
scaladsl.Source[Out, Mat]) extends Graph[
* '''Cancels when''' downstream cancels
*/
def interleaveAll(
- those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]],
+ @Nullable those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]],
segmentSize: Int,
eagerClose: Boolean): javadsl.Source[Out, Mat] = {
import pekko.util.Collections._
@@ -1700,7 +1701,7 @@ final class Source[Out, Mat](delegate:
scaladsl.Source[Out, Mat]) extends Graph[
* '''Cancels when''' downstream cancels
*/
def mergeAll(
- those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]],
+ @Nullable those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]],
eagerComplete: Boolean): javadsl.Source[Out, Mat] = {
import pekko.util.Collections._
val seq = if (those ne null) those.collectToImmutableSeq {
@@ -4805,7 +4806,7 @@ final class Source[Out, Mat](delegate:
scaladsl.Source[Out, Mat]) extends Graph[
*
* '''Cancels when''' downstream cancels
*/
- def log(name: String, extract: function.Function[Out, Any], log:
LoggingAdapter): javadsl.Source[Out, Mat] =
+ def log(name: String, extract: function.Function[Out, Any], @Nullable log:
LoggingAdapter): javadsl.Source[Out, Mat] =
new Source(delegate.log(name, e => extract.apply(e))(log))
/**
@@ -4846,7 +4847,7 @@ final class Source[Out, Mat](delegate:
scaladsl.Source[Out, Mat]) extends Graph[
*
* '''Cancels when''' downstream cancels
*/
- def log(name: String, log: LoggingAdapter): javadsl.Source[Out, Mat] =
+ def log(name: String, @Nullable log: LoggingAdapter): javadsl.Source[Out,
Mat] =
this.log(name, ConstantFun.javaIdentityFunction[Out], log)
/**
@@ -4893,7 +4894,7 @@ final class Source[Out, Mat](delegate:
scaladsl.Source[Out, Mat]) extends Graph[
name: String,
marker: function.Function[Out, LogMarker],
extract: function.Function[Out, Any],
- log: MarkerLoggingAdapter): javadsl.Source[Out, Mat] =
+ @Nullable log: MarkerLoggingAdapter): javadsl.Source[Out, Mat] =
new Source(delegate.logWithMarker(name, e => marker.apply(e), e =>
extract.apply(e))(log))
/**
@@ -4940,7 +4941,7 @@ final class Source[Out, Mat](delegate:
scaladsl.Source[Out, Mat]) extends Graph[
def logWithMarker(
name: String,
marker: function.Function[Out, LogMarker],
- log: MarkerLoggingAdapter): javadsl.Source[Out, Mat] =
+ @Nullable log: MarkerLoggingAdapter): javadsl.Source[Out, Mat] =
this.logWithMarker(name, marker, ConstantFun.javaIdentityFunction[Out],
log)
/**
diff --git
a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SourceWithContext.scala
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SourceWithContext.scala
index 2dca6c6efc..e46e591a3e 100644
---
a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SourceWithContext.scala
+++
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SourceWithContext.scala
@@ -31,6 +31,8 @@ import pekko.japi.function
import pekko.stream._
import pekko.util.ConstantFun
+import org.jspecify.annotations.Nullable
+
object SourceWithContext {
/**
@@ -290,7 +292,8 @@ final class SourceWithContext[+Out, +Ctx, +Mat](delegate:
scaladsl.SourceWithCon
*
* @see [[pekko.stream.javadsl.Source.log]]
*/
- def log(name: String, extract: function.Function[Out, Any], log:
LoggingAdapter): SourceWithContext[Out, Ctx, Mat] =
+ def log(name: String, extract: function.Function[Out, Any], @Nullable log:
LoggingAdapter)
+ : SourceWithContext[Out, Ctx, Mat] =
viaScala(_.log(name, e => extract.apply(e))(log))
/**
@@ -306,7 +309,7 @@ final class SourceWithContext[+Out, +Ctx, +Mat](delegate:
scaladsl.SourceWithCon
*
* @see [[pekko.stream.javadsl.Flow.log]]
*/
- def log(name: String, log: LoggingAdapter): SourceWithContext[Out, Ctx, Mat]
=
+ def log(name: String, @Nullable log: LoggingAdapter): SourceWithContext[Out,
Ctx, Mat] =
this.log(name, ConstantFun.javaIdentityFunction[Out], log)
/**
@@ -326,7 +329,7 @@ final class SourceWithContext[+Out, +Ctx, +Mat](delegate:
scaladsl.SourceWithCon
name: String,
marker: function.Function2[Out, Ctx, LogMarker],
extract: function.Function[Out, Any],
- log: MarkerLoggingAdapter): SourceWithContext[Out, Ctx, Mat] =
+ @Nullable log: MarkerLoggingAdapter): SourceWithContext[Out, Ctx, Mat] =
viaScala(_.logWithMarker(name, (e, c) => marker.apply(e, c), e =>
extract.apply(e))(log))
/**
@@ -348,7 +351,7 @@ final class SourceWithContext[+Out, +Ctx, +Mat](delegate:
scaladsl.SourceWithCon
def logWithMarker(
name: String,
marker: function.Function2[Out, Ctx, LogMarker],
- log: MarkerLoggingAdapter): SourceWithContext[Out, Ctx, Mat] =
+ @Nullable log: MarkerLoggingAdapter): SourceWithContext[Out, Ctx, Mat] =
this.logWithMarker(name, marker, ConstantFun.javaIdentityFunction[Out],
log)
/**
diff --git
a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
index 14f525bb70..61a32a81fd 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
@@ -35,6 +35,8 @@ import pekko.stream.impl.Stages.DefaultAttributes
import pekko.stream.impl.fusing.{ StatefulMapConcat, ZipWithIndexJava }
import pekko.util.ConstantFun
+import org.jspecify.annotations.Nullable
+
object SubFlow {
/**
@@ -2368,7 +2370,7 @@ final class SubFlow[In, Out, Mat](
* '''Cancels when''' downstream cancels
*/
def mergeAll(
- those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]],
+ @Nullable those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]],
eagerComplete: Boolean): SubFlow[In, Out, Mat] = {
import pekko.util.Collections._
val seq = if (those ne null) those.collectToImmutableSeq {
@@ -2426,7 +2428,7 @@ final class SubFlow[In, Out, Mat](
* '''Cancels when''' downstream cancels
*/
def interleaveAll(
- those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]],
+ @Nullable those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]],
segmentSize: Int,
eagerClose: Boolean): SubFlow[In, Out, Mat] = {
import pekko.util.Collections._
@@ -2918,7 +2920,7 @@ final class SubFlow[In, Out, Mat](
*
* '''Cancels when''' downstream cancels
*/
- def log(name: String, extract: function.Function[Out, Any], log:
LoggingAdapter): SubFlow[In, Out, Mat] =
+ def log(name: String, extract: function.Function[Out, Any], @Nullable log:
LoggingAdapter): SubFlow[In, Out, Mat] =
new SubFlow(delegate.log(name, e => extract.apply(e))(log))
/**
@@ -2959,7 +2961,7 @@ final class SubFlow[In, Out, Mat](
*
* '''Cancels when''' downstream cancels
*/
- def log(name: String, log: LoggingAdapter): SubFlow[In, Out, Mat] =
+ def log(name: String, @Nullable log: LoggingAdapter): SubFlow[In, Out, Mat] =
this.log(name, ConstantFun.javaIdentityFunction[Out], log)
/**
@@ -3006,7 +3008,7 @@ final class SubFlow[In, Out, Mat](
name: String,
marker: function.Function[Out, LogMarker],
extract: function.Function[Out, Any],
- log: MarkerLoggingAdapter): SubFlow[In, Out, Mat] =
+ @Nullable log: MarkerLoggingAdapter): SubFlow[In, Out, Mat] =
new SubFlow(delegate.logWithMarker(name, e => marker.apply(e), e =>
extract.apply(e))(log))
/**
@@ -3053,7 +3055,7 @@ final class SubFlow[In, Out, Mat](
def logWithMarker(
name: String,
marker: function.Function[Out, LogMarker],
- log: MarkerLoggingAdapter): SubFlow[In, Out, Mat] =
+ @Nullable log: MarkerLoggingAdapter): SubFlow[In, Out, Mat] =
this.logWithMarker(name, marker, ConstantFun.javaIdentityFunction[Out],
log)
/**
diff --git
a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
index 7edb90afd9..0466e999e4 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
@@ -35,6 +35,8 @@ import pekko.stream.impl.Stages.DefaultAttributes
import pekko.stream.impl.fusing.{ StatefulMapConcat, ZipWithIndexJava }
import pekko.util.ConstantFun
+import org.jspecify.annotations.Nullable
+
/**
* * Upcast a stream of elements to a stream of supertypes of that element.
Useful in combination with
* fan-in operators where you do not want to pay the cost of casting each
element in a `map`.
@@ -2334,7 +2336,7 @@ final class SubSource[Out, Mat](
* '''Cancels when''' downstream cancels
*/
def mergeAll(
- those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]],
+ @Nullable those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]],
eagerComplete: Boolean): SubSource[Out, Mat] = {
import pekko.util.Collections._
val seq = if (those ne null) those.collectToImmutableSeq {
@@ -2393,7 +2395,7 @@ final class SubSource[Out, Mat](
* '''Cancels when''' downstream cancels
*/
def interleaveAll(
- those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]],
+ @Nullable those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]],
segmentSize: Int,
eagerClose: Boolean): SubSource[Out, Mat] = {
import pekko.util.Collections._
@@ -2885,7 +2887,7 @@ final class SubSource[Out, Mat](
*
* '''Cancels when''' downstream cancels
*/
- def log(name: String, extract: function.Function[Out, Any], log:
LoggingAdapter): SubSource[Out, Mat] =
+ def log(name: String, extract: function.Function[Out, Any], @Nullable log:
LoggingAdapter): SubSource[Out, Mat] =
new SubSource(delegate.log(name, e => extract.apply(e))(log))
/**
@@ -2926,7 +2928,7 @@ final class SubSource[Out, Mat](
*
* '''Cancels when''' downstream cancels
*/
- def log(name: String, log: LoggingAdapter): SubSource[Out, Mat] =
+ def log(name: String, @Nullable log: LoggingAdapter): SubSource[Out, Mat] =
this.log(name, ConstantFun.javaIdentityFunction[Out], log)
/**
@@ -2973,7 +2975,7 @@ final class SubSource[Out, Mat](
name: String,
marker: function.Function[Out, LogMarker],
extract: function.Function[Out, Any],
- log: MarkerLoggingAdapter): SubSource[Out, Mat] =
+ @Nullable log: MarkerLoggingAdapter): SubSource[Out, Mat] =
new SubSource(delegate.logWithMarker(name, e => marker.apply(e), e =>
extract.apply(e))(log))
/**
@@ -3020,7 +3022,7 @@ final class SubSource[Out, Mat](
def logWithMarker(
name: String,
marker: function.Function[Out, LogMarker],
- log: MarkerLoggingAdapter): SubSource[Out, Mat] =
+ @Nullable log: MarkerLoggingAdapter): SubSource[Out, Mat] =
this.logWithMarker(name, marker, ConstantFun.javaIdentityFunction[Out],
log)
/**
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]