This is an automated email from the ASF dual-hosted git repository.
fanningpj pushed a commit to branch 1.3.x
in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to refs/heads/1.3.x by this push:
new 47d64b4a6c rework reactivestreams docs (#2468) (#2470)
47d64b4a6c is described below
commit 47d64b4a6cea46d7f93dbba9e397a28a891c1473
Author: PJ Fanning <[email protected]>
AuthorDate: Mon Nov 10 19:46:31 2025 +0100
rework reactivestreams docs (#2468) (#2470)
* rework reactivestreams docs
* tidy up
* link issues
add back operators
---
.../paradox/stream/operators/Sink/asPublisher.md | 3 ++-
.../stream/operators/Sink/fromSubscriber.md | 4 ++-
.../stream/operators/Source/asSubscriber.md | 29 ++++++++++------------
.../stream/operators/Source/fromPublisher.md | 25 ++++++++-----------
docs/src/main/paradox/stream/operators/index.md | 4 +--
5 files changed, 31 insertions(+), 34 deletions(-)
diff --git a/docs/src/main/paradox/stream/operators/Sink/asPublisher.md b/docs/src/main/paradox/stream/operators/Sink/asPublisher.md
index 49be6139f9..01759996ce 100644
--- a/docs/src/main/paradox/stream/operators/Sink/asPublisher.md
+++ b/docs/src/main/paradox/stream/operators/Sink/asPublisher.md
@@ -15,8 +15,9 @@ Integration with Reactive Streams, materializes into a
`org.reactivestreams.Publ
This method gives you the capability to publish the data from the `Sink`
through a Reactive Streams
[Publisher](https://www.reactive-streams.org/reactive-streams-1.0.3-javadoc/org/reactivestreams/Publisher.html).
Generally, in Pekko Streams a `Sink` is considered a subscriber, which
consumes the data from source. To integrate with other Reactive Stream
implementations `Sink.asPublisher` provides a `Publisher` materialized value
when run.
Now, the data from this publisher can be consumed by subscribing to it. We can
control if we allow more than one downstream subscriber from the single running
Pekko stream through the `fanout` parameter.
+
In Java 9, the Reactive Stream API was included in the JDK, and `Publisher` is
available through
[Flow.Publisher](https://docs.oracle.com/javase/9/docs/api/java/util/concurrent/Flow.Publisher.html).
-Since those APIs are identical but exist at different package namespaces and
does not depend on the Reactive Streams package a separate publisher sink for
those is available
+Since those APIs are identical but exist at different package namespaces and
does not depend on the Reactive Streams package a separate API for those is
available
through
@scala[`org.apache.pekko.stream.scaladsl.JavaFlowSupport.Sink#asPublisher`]@java[`org.apache.pekko.stream.javadsl.JavaFlowSupport.Sink#asPublisher`].
diff --git a/docs/src/main/paradox/stream/operators/Sink/fromSubscriber.md b/docs/src/main/paradox/stream/operators/Sink/fromSubscriber.md
index 637b0d4e27..bc93195abb 100644
--- a/docs/src/main/paradox/stream/operators/Sink/fromSubscriber.md
+++ b/docs/src/main/paradox/stream/operators/Sink/fromSubscriber.md
@@ -11,4 +11,6 @@ Integration with Reactive Streams, wraps a
`org.reactivestreams.Subscriber` as a
## Description
-TODO: We would welcome help on contributing descriptions and examples, see:
https://github.com/akka/akka/issues/25646
+In Java 9, the Reactive Stream API was included in the JDK, and `Subscriber`
is available through
[Flow.Subscriber](https://docs.oracle.com/javase/9/docs/api/java/util/concurrent/Flow.Subscriber.html).
+Since those APIs are identical but exist at different package namespaces and
does not depend on the Reactive Streams package a separate API for those is
available
+through
@scala[`org.apache.pekko.stream.scaladsl.JavaFlowSupport.Sink#fromSubscriber`]@java[`org.apache.pekko.stream.javadsl.JavaFlowSupport.Sink#fromSubscriber`].
diff --git a/docs/src/main/paradox/stream/operators/Source/asSubscriber.md b/docs/src/main/paradox/stream/operators/Source/asSubscriber.md
index 0b07e72447..c0cf7d2d7c 100644
--- a/docs/src/main/paradox/stream/operators/Source/asSubscriber.md
+++ b/docs/src/main/paradox/stream/operators/Source/asSubscriber.md
@@ -1,35 +1,32 @@
# Source.asSubscriber
-Integration with Reactive Streams, materializes into a
@javadoc[Subscriber](java.util.concurrent.Flow.Subscriber).
+Integration with Reactive Streams, materializes into a
`org.reactivestreams.Subscriber`.
@ref[Source operators](../index.md#source-operators)
## Signature
Scala
-:
@@snip[JavaFlowSupport.scala](/stream/src/main/scala-jdk-9/org/apache/pekko/stream/scaladsl/JavaFlowSupport.scala)
{ #asSubscriber }
+:
@@snip[JavaFlowSupport.scala](/stream/src/main/scala/org/apache/pekko/stream/scaladsl/JavaFlowSupport.scala)
{ #asSubscriber }
Java
-:
@@snip[JavaFlowSupport.java](/docs/src/test/java-jdk9-only/jdocs/stream/operators/source/AsSubscriber.java)
{ #api }
+:
@@snip[AsSubscriber.java](/docs/src/test/java/jdocs/stream/operators/source/AsSubscriber.java)
{ #api }
## Description
If you want to create a @apidoc[Source] that gets its elements from another
library that supports
-[Reactive Streams](https://www.reactive-streams.org/), you can use
`JavaFlowSupport.Source.asSubscriber`.
+[Reactive Streams](https://www.reactive-streams.org/), you can use
`Source.asSubscriber`.
Each time this @apidoc[Source] is materialized, it produces a materialized
value of type
-@javadoc[java.util.concurrent.Flow.Subscriber](java.util.concurrent.Flow.Subscriber).
-This @javadoc[Subscriber](java.util.concurrent.Flow.Subscriber) can be
attached to a
-[Reactive Streams](https://www.reactive-streams.org/)
@javadoc[Publisher](java.util.concurrent.Flow.Publisher)
+@javadoc[org.reactivestreams.Subscriber](java.util.concurrent.Flow.Subscriber).
+This `org.reactivestreams.Subscriber` can be attached to a
+[Reactive Streams](https://www.reactive-streams.org/)
`org.reactivestreams.Publisher`
to populate it.
-If the API you want to consume elements from provides a
@javadoc[Publisher](java.util.concurrent.Flow.Publisher) instead of accepting a
@javadoc[Subscriber](java.util.concurrent.Flow.Subscriber), see
@ref[fromPublisher](fromPublisher.md).
+If the API you want to consume elements from provides a
`org.reactivestreams.Publisher` instead of accepting a
`org.reactivestreams.Subscriber`, see @ref[fromPublisher](fromPublisher.md).
-@@@ note
-
-For JDK 8 users: since
@javadoc[java.util.concurrent.Flow](java.util.concurrent.Flow) was introduced
in JDK version 9,
-if you are still on version 8 you may use the
[org.reactivestreams](https://github.com/reactive-streams/reactive-streams-jvm#reactive-streams)
library with @apidoc[Source.asSubscriber](Source$) {
scala="#asSubscriber[T]:org.apache.pekko.stream.scaladsl.Source[T,org.reactivestreams.Subscriber[T]]"
java="#asSubscriber()" }.
-
-@@@
+In Java 9, the Reactive Stream API was included in the JDK, and `Subscriber`
is available through
[Flow.Subscriber](https://docs.oracle.com/javase/9/docs/api/java/util/concurrent/Flow.Subscriber.html).
+Since those APIs are identical but exist at different package namespaces and
does not depend on the Reactive Streams package a separate API for those is
available
+through
@scala[`org.apache.pekko.stream.scaladsl.JavaFlowSupport.Source#asSubscriber`]@java[`org.apache.pekko.stream.javadsl.JavaFlowSupport.Source#asSubscriber`].
## Example
@@ -44,7 +41,7 @@ backpressure is applied throughout the stream, preventing us
from running out of
rows are consumed slower than they are produced by the database.
Scala
-: @@snip
[AsSubscriber.scala](/docs/src/test/scala-jdk9-only/docs/stream/operators/source/AsSubscriber.scala)
{ #imports #example }
+: @@snip
[AsSubscriber.scala](/docs/src/test/scala/docs/stream/operators/source/AsSubscriber.scala)
{ #imports #example }
Java
-: @@snip
[AsSubscriber.java](/docs/src/test/java-jdk9-only/jdocs/stream/operators/source/AsSubscriber.java)
{ #imports #example }
+: @@snip
[AsSubscriber.java](/docs/src/test/java/jdocs/stream/operators/source/AsSubscriber.java)
{ #imports #example }
diff --git a/docs/src/main/paradox/stream/operators/Source/fromPublisher.md b/docs/src/main/paradox/stream/operators/Source/fromPublisher.md
index 69565eb604..1109337f99 100644
--- a/docs/src/main/paradox/stream/operators/Source/fromPublisher.md
+++ b/docs/src/main/paradox/stream/operators/Source/fromPublisher.md
@@ -1,33 +1,30 @@
# Source.fromPublisher
-Integration with Reactive Streams, subscribes to a
@javadoc[Publisher](java.util.concurrent.Flow.Publisher).
+Integration with Reactive Streams, subscribes to a
`org.reactivestreams.Publisher`.
@ref[Source operators](../index.md#source-operators)
## Signature
Scala
-:
@@snip[JavaFlowSupport.scala](/stream/src/main/scala-jdk-9/org/apache/pekko/stream/scaladsl/JavaFlowSupport.scala)
{ #fromPublisher }
+:
@@snip[JavaFlowSupport.scala](/stream/src/main/scala/org/apache/pekko/stream/scaladsl/JavaFlowSupport.scala)
{ #fromPublisher }
Java
-:
@@snip[JavaFlowSupport.java](/docs/src/test/java-jdk9-only/jdocs/stream/operators/source/FromPublisher.java)
{ #api }
+:
@@snip[JavaFlowSupport.java](/docs/src/test/java/jdocs/stream/operators/source/FromPublisher.java)
{ #api }
## Description
If you want to create a @apidoc[Source] that gets its elements from another
library that supports
-[Reactive Streams](https://www.reactive-streams.org/), you can use
`JavaFlowSupport.Source.fromPublisher`.
-This source will produce the elements from the
@javadoc[Publisher](java.util.concurrent.Flow.Publisher),
+[Reactive Streams](https://www.reactive-streams.org/), you can use
`Source.fromPublisher`.
+This source will produce the elements from the `org.reactivestreams.Publisher`,
and coordinate backpressure as needed.
-If the API you want to consume elements from accepts a
@javadoc[Subscriber](java.util.concurrent.Flow.Subscriber) instead of providing
a @javadoc[Publisher](java.util.concurrent.Flow.Publisher), see
@ref[asSubscriber](asSubscriber.md).
+If the API you want to consume elements from accepts a
`org.reactivestreams.Subscriber` instead of providing a
`org.reactivestreams.Publisher`, see @ref[asSubscriber](asSubscriber.md).
-@@@ note
-
-For JDK 8 users: since
@javadoc[java.util.concurrent.Flow](java.util.concurrent.Flow) was introduced
in JDK version 9,
-if you are still on version 8 you may use the
[org.reactivestreams](https://github.com/reactive-streams/reactive-streams-jvm#reactive-streams)
library with @apidoc[Source.fromPublisher](Source$) {
scala="#fromPublisher[T](publisher:org.reactivestreams.Publisher[T]):org.apache.pekko.stream.scaladsl.Source[T,org.apache.pekko.NotUsed]"
java="#fromPublisher(org.reactivestreams.Publisher)" }.
-
-@@@
+In Java 9, the Reactive Stream API was included in the JDK, and `Publisher` is
available through
[Flow.Publisher](https://docs.oracle.com/javase/9/docs/api/java/util/concurrent/Flow.Publisher.html).
+Since those APIs are identical but exist at different package namespaces and
does not depend on the Reactive Streams package a separate API for those is
available
+through
@scala[`org.apache.pekko.stream.scaladsl.JavaFlowSupport.Source#fromPublisher`]@java[`org.apache.pekko.stream.javadsl.JavaFlowSupport.Source#fromPublisher`].
## Example
@@ -41,7 +38,7 @@ backpressure is applied throughout the stream, preventing us
from running out of
rows are consumed slower than they are produced by the database.
Scala
-: @@snip
[FromPublisher.scala](/docs/src/test/scala-jdk9-only/docs/stream/operators/source/FromPublisher.scala)
{ #imports #example }
+: @@snip
[FromPublisher.scala](/docs/src/test/scala/docs/stream/operators/source/FromPublisher.scala)
{ #imports #example }
Java
-: @@snip
[FromPublisher.java](/docs/src/test/java-jdk9-only/jdocs/stream/operators/source/FromPublisher.java)
{ #imports #example }
+: @@snip
[FromPublisher.java](/docs/src/test/java/jdocs/stream/operators/source/FromPublisher.java)
{ #imports #example }
diff --git a/docs/src/main/paradox/stream/operators/index.md b/docs/src/main/paradox/stream/operators/index.md
index 8bbf89a16f..059349da52 100644
--- a/docs/src/main/paradox/stream/operators/index.md
+++ b/docs/src/main/paradox/stream/operators/index.md
@@ -8,7 +8,7 @@ These built-in sources are available from
@scala[`org.apache.pekko.stream.scalad
| |Operator|Description|
|--|--|--|
|Source|<a
name="assourcewithcontext"></a>@ref[asSourceWithContext](Source/asSourceWithContext.md)|Extracts
context data from the elements of a `Source` so that it can be turned into a
`SourceWithContext` which can propagate that context per element along a
stream.|
-|Source|<a
name="assubscriber"></a>@ref[asSubscriber](Source/asSubscriber.md)|Integration
with Reactive Streams, materializes into a
@javadoc[Subscriber](java.util.concurrent.Flow.Subscriber).|
+|Source|<a
name="assubscriber"></a>@ref[asSubscriber](Source/asSubscriber.md)|Integration
with Reactive Streams, materializes into a `org.reactivestreams.Subscriber`.|
|Source|<a name="combine"></a>@ref[combine](Source/combine.md)|Combine several
sources, using a given strategy such as merge or concat, into one source.|
|Source|<a
name="completionstage"></a>@ref[completionStage](Source/completionStage.md)|Send
the single value of the `CompletionStage` when it completes and there is
demand.|
|Source|<a
name="completionstagesource"></a>@ref[completionStageSource](Source/completionStageSource.md)|Streams
the elements of an asynchronous source once its given *completion* operator
completes.|
@@ -23,7 +23,7 @@ These built-in sources are available from
@scala[`org.apache.pekko.stream.scalad
|Source|<a
name="fromiterator"></a>@ref[fromIterator](Source/fromIterator.md)|Stream the
values from an `Iterator`, requesting the next value when there is demand.|
|Source|<a
name="fromjavastream"></a>@ref[fromJavaStream](Source/fromJavaStream.md)|Stream
the values from a Java 8 `Stream`, requesting the next value when there is
demand.|
|Source|<a name="fromoption"></a>@ref[fromOption](Source/fromOption.md)|Create
a `Source` from an @scala[`Option[T]`] @java[`Optional<T>`] value, emitting the
value if it is present.|
-|Source|<a
name="frompublisher"></a>@ref[fromPublisher](Source/fromPublisher.md)|Integration
with Reactive Streams, subscribes to a
@javadoc[Publisher](java.util.concurrent.Flow.Publisher).|
+|Source|<a
name="frompublisher"></a>@ref[fromPublisher](Source/fromPublisher.md)|Integration
with Reactive Streams, subscribes to a `org.reactivestreams.Publisher`.|
|Source|<a
name="fromsourcecompletionstage"></a>@ref[fromSourceCompletionStage](Source/fromSourceCompletionStage.md)|Deprecated
by @ref[`Source.completionStageSource`](Source/completionStageSource.md).|
|Source|<a name="future"></a>@ref[future](Source/future.md)|Send the single
value of the `Future` when it completes and there is demand.|
|Source|<a
name="futuresource"></a>@ref[futureSource](Source/futureSource.md)|Streams the
elements of the given future source once it successfully completes.|
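
Reviewer note: the reworked descriptions say that `java.util.concurrent.Flow` mirrors the Reactive Streams interfaces and that `JavaFlowSupport` is the bridge for it. As a rough illustration of what sits on the JDK side of that bridge, here is a minimal sketch using only the JDK. It is not taken from the Pekko sources or docs snippets. The class name `FlowBackpressureSketch` and the helper `collect` are hypothetical, and the Pekko call itself is deliberately left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

// Hypothetical name, used for illustration only.
class FlowBackpressureSketch {

    // Hypothetical helper: pushes the items through a JDK SubmissionPublisher
    // (a Flow.Publisher) and collects them with a subscriber that asks for
    // one element at a time.
    static List<Integer> collect(List<Integer> items) throws InterruptedException {
        List<Integer> received = new ArrayList<>();
        CountDownLatch done = new CountDownLatch(1);

        Flow.Subscriber<Integer> subscriber = new Flow.Subscriber<>() {
            private Flow.Subscription subscription;

            @Override
            public void onSubscribe(Flow.Subscription s) {
                subscription = s;
                s.request(1); // signal demand for the first element only
            }

            @Override
            public void onNext(Integer item) {
                received.add(item);
                subscription.request(1); // ask for the next element once this one is handled
            }

            @Override
            public void onError(Throwable t) {
                done.countDown();
            }

            @Override
            public void onComplete() {
                done.countDown();
            }
        };

        // close() (via try-with-resources) signals onComplete after the
        // already submitted items have been delivered.
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            items.forEach(publisher::submit);
        }
        done.await(); // wait until the subscriber has seen onComplete or onError
        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(collect(List.of(1, 2, 3))); // prints [1, 2, 3]
    }
}
```

Per the description text added above, a `Flow.Publisher` such as the one in this sketch is what `JavaFlowSupport.Source#fromPublisher` is meant to accept, while `Source.fromPublisher` takes the `org.reactivestreams.Publisher` equivalent.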