This is an automated email from the ASF dual-hosted git repository.

hepin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-pekko.git


The following commit(s) were added to refs/heads/main by this push:
     new abad72d869 feat: Add Sink.exists operator (#990)
abad72d869 is described below

commit abad72d8696e27c1c84ed35108240c1ad5cf4b05
Author: Luigi <[email protected]>
AuthorDate: Tue Jan 30 00:31:23 2024 -0600

    feat: Add Sink.exists operator (#990)
    
    * feat: +Flow.exists & Sink.exists
    
    * chore: Update some doc and code.
    
    * chore: Update method names in doc.
    
    ---------
    
    Co-authored-by: He-Pin <[email protected]>
---
 .../main/paradox/stream/operators/Sink/exists.md   | 47 ++++++++++++++++++++++
 .../main/paradox/stream/operators/Sink/forall.md   |  2 +-
 docs/src/main/paradox/stream/operators/index.md    |  2 +
 .../java/jdocs/stream/operators/sink/Exists.java   | 44 ++++++++++++++++++++
 .../scala/docs/stream/operators/sink/Exists.scala  | 44 ++++++++++++++++++++
 .../org/apache/pekko/stream/javadsl/SinkTest.java  |  9 +++++
 .../apache/pekko/stream/scaladsl/SinkSpec.scala    | 41 +++++++++++++++++++
 .../org/apache/pekko/stream/javadsl/Sink.scala     | 25 ++++++++++++
 .../org/apache/pekko/stream/scaladsl/Sink.scala    | 24 +++++++++++
 9 files changed, 237 insertions(+), 1 deletion(-)

diff --git a/docs/src/main/paradox/stream/operators/Sink/exists.md b/docs/src/main/paradox/stream/operators/Sink/exists.md
new file mode 100644
index 0000000000..c111539137
--- /dev/null
+++ b/docs/src/main/paradox/stream/operators/Sink/exists.md
@@ -0,0 +1,47 @@
+# Sink.exists
+
+A `Sink` that will test the given predicate `p` for every received element and completes with the result.
+
+@ref[Sink operators](../index.md#sink-operators)
+
+## Signature
+
+@apidoc[Sink.exists](Sink$) { scala="#exists[T](p:T=%3EBoolean):org.apache.pekko.stream.scaladsl.Sink[T,scala.concurrent.Future[Boolean]]" java="#exists(org.apache.pekko.japi.function.Predicate)" }
+
+## Description
+`exists` applies a predicate function to each received element. It returns true if any element satisfies the predicate, otherwise it returns false.
+
+It materializes into a `Future` (in Scala) or a `CompletionStage` (in Java) that completes with the last state when the stream has finished.
+
+Note that if the source is empty, it will return false.
+
+A `Sink` that will test the given predicate `p` for every received element and
+
+- completes and returns @scala[`Future`] @java[`CompletionStage`] of `true` if the predicate is true for any element;
+- completes and returns @scala[`Future`] @java[`CompletionStage`] of `false` if the stream is empty (i.e. completes before signalling any elements);
+- completes and returns @scala[`Future`] @java[`CompletionStage`] of `false` if the predicate is false for all elements.
+
+The materialized value @scala[`Future`] @java[`CompletionStage`] will be completed with the value `true` or `false`
+when the input stream ends, or completed with `Failure` if there is a failure signaled in the stream.
+
+## Example
+
+This example tests whether any element in the stream is greater than 3.
+
+Scala
+:   @@snip [exists.scala](/docs/src/test/scala/docs/stream/operators/sink/Exists.scala) { #exists }
+
+Java
+:   @@snip [exists.java](/docs/src/test/java/jdocs/stream/operators/sink/Exists.java) { #exists }
+
+## Reactive Streams Semantics
+
+@@@div { .callout }
+
+**completes** when upstream completes or the predicate `p` returns `true`
+
+**cancels** when predicate `p` returns `true`
+
+**backpressures** when the invocation of predicate `p` has not yet completed
+
+@@@
diff --git a/docs/src/main/paradox/stream/operators/Sink/forall.md b/docs/src/main/paradox/stream/operators/Sink/forall.md
index 387f5b2fb6..a1a6cc2732 100644
--- a/docs/src/main/paradox/stream/operators/Sink/forall.md
+++ b/docs/src/main/paradox/stream/operators/Sink/forall.md
@@ -44,4 +44,4 @@ Java
 
 **backpressures** when the invocation of predicate `p` has not yet completed
 
-@@@
\ No newline at end of file
+@@@
diff --git a/docs/src/main/paradox/stream/operators/index.md b/docs/src/main/paradox/stream/operators/index.md
index e6d564d962..908320c26d 100644
--- a/docs/src/main/paradox/stream/operators/index.md
+++ b/docs/src/main/paradox/stream/operators/index.md
@@ -60,6 +60,7 @@ These built-in sinks are available from @scala[`org.apache.pekko.stream.scaladsl
 |Sink|<a name="collection"></a>@ref[collection](Sink/collection.md)|@scala[Collect all values emitted from the stream into a collection.]@java[Operator only available in the Scala API. The closest operator in the Java API is @ref[`Sink.seq`](Sink/seq.md)].|
 |Sink|<a name="combine"></a>@ref[combine](Sink/combine.md)|Combine several sinks into one using a user specified strategy|
 |Sink|<a name="completionstagesink"></a>@ref[completionStageSink](Sink/completionStageSink.md)|Streams the elements to the given future sink once it successfully completes. |
+|Sink|<a name="exists"></a>@ref[exists](Sink/exists.md)|A `Sink` that will test the given predicate `p` for every received element and completes with the result.|
 |Sink|<a name="fold"></a>@ref[fold](Sink/fold.md)|Fold over emitted elements with a function, where each invocation will get the new element and the result from the previous fold invocation.|
 |Sink|<a name="foldwhile"></a>@ref[foldWhile](Sink/foldWhile.md)|Fold over emitted elements with a function, where each invocation will get the new element and the result from the previous fold invocation.|
 |Sink|<a name="forall"></a>@ref[forall](Sink/forall.md)|A `Sink` that will test the given predicate `p` for every received element and completes with the result.|
@@ -445,6 +446,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md)
 * [dropWhile](Source-or-Flow/dropWhile.md)
 * [dropWithin](Source-or-Flow/dropWithin.md)
 * [empty](Source/empty.md)
+* [exists](Sink/exists.md)
 * [expand](Source-or-Flow/expand.md)
 * [extrapolate](Source-or-Flow/extrapolate.md)
 * [failed](Source/failed.md)
diff --git a/docs/src/test/java/jdocs/stream/operators/sink/Exists.java b/docs/src/test/java/jdocs/stream/operators/sink/Exists.java
new file mode 100644
index 0000000000..0b1513cdd0
--- /dev/null
+++ b/docs/src/test/java/jdocs/stream/operators/sink/Exists.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package jdocs.stream.operators.sink;
+
+// #imports
+
+import org.apache.pekko.actor.ActorSystem;
+import org.apache.pekko.stream.javadsl.Sink;
+import org.apache.pekko.stream.javadsl.Source;
+
+import java.util.concurrent.TimeUnit;
+// #imports
+
+public class Exists {
+  private static final ActorSystem system = null;
+
+  private void existsExample() throws Exception {
+    // #exists
+    final boolean anyMatch =
+        Source.range(1, 4)
+            .runWith(Sink.exists(elem -> elem > 3), system)
+            .toCompletableFuture()
+            .get(3, TimeUnit.SECONDS);
+    System.out.println(anyMatch);
+    // Expected output:
+    // true
+    // #exists
+  }
+}
diff --git a/docs/src/test/scala/docs/stream/operators/sink/Exists.scala b/docs/src/test/scala/docs/stream/operators/sink/Exists.scala
new file mode 100644
index 0000000000..e5648057fc
--- /dev/null
+++ b/docs/src/test/scala/docs/stream/operators/sink/Exists.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package docs.stream.operators.sink
+
+//#imports
+import org.apache.pekko.actor.ActorSystem
+import org.apache.pekko.stream.scaladsl._
+
+import scala.concurrent.duration.DurationInt
+import scala.concurrent.{ Await, ExecutionContext }
+//#imports
+
+object Exists {
+
+  implicit val system: ActorSystem = null
+  implicit val ec: ExecutionContext = system.dispatcher
+
+  def existsExample(): Unit = {
+    // #exists
+    val result = Source(1 to 4)
+      .runWith(Sink.exists(_ > 3))
+    val anyMatch = Await.result(result, 3.seconds)
+    println(anyMatch)
+    // Expected output:
+    // true
+    // #exists
+  }
+
+}
diff --git a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SinkTest.java b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SinkTest.java
index ed4d283867..652754ce8a 100644
--- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SinkTest.java
+++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SinkTest.java
@@ -245,4 +245,13 @@ public class SinkTest extends StreamTest {
     boolean allMatch = cs.toCompletableFuture().get(100, TimeUnit.MILLISECONDS);
     assertTrue(allMatch);
   }
+
+  @Test
+  public void sinkMustBeAbleToUseForExists()
+      throws InterruptedException, ExecutionException, TimeoutException {
+    CompletionStage<Boolean> cs =
+        Source.from(Arrays.asList(1, 2, 3, 4)).runWith(Sink.exists(param -> param > 3), system);
+    boolean anyMatch = cs.toCompletableFuture().get(100, TimeUnit.MILLISECONDS);
+    assertTrue(anyMatch);
+  }
 }
diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SinkSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SinkSpec.scala
index 2ab8601998..8368d7a96c 100644
--- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SinkSpec.scala
+++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SinkSpec.scala
@@ -381,6 +381,47 @@ class SinkSpec extends StreamSpec with DefaultTimeout with ScalaFutures {
 
   }
 
+  "The exists sink" must {
+
+    "completes with `false` when no element matches" in {
+      Source(1 to 4)
+        .runWith(Sink.exists[Int](_ > 5))
+        .futureValue shouldBe false
+    }
+
+    "completes with `true` when any element matches" in {
+      Source(1 to 4)
+        .runWith(Sink.exists(_ > 2))
+        .futureValue shouldBe true
+    }
+
+    "completes with `false` if the stream is empty" in {
+      Source.empty[Int]
+        .runWith(Sink.exists(_ > 2))
+        .futureValue shouldBe false
+    }
+
+    "completes with `Failure` if the stream failed" in {
+      Source.failed[Int](new RuntimeException("Oops"))
+        .runWith(Sink.exists(_ > 2))
+        .failed.futureValue shouldBe a[RuntimeException]
+    }
+
+    "completes with `exists` with restart strategy" in {
+      val sink = Sink.exists[Int](elem => {
+        if (elem == 2) {
+          throw new RuntimeException("Oops")
+        }
+        elem > 1
+      }).withAttributes(supervisionStrategy(Supervision.restartingDecider))
+
+      Source(1 to 2)
+        .runWith(sink)
+        .futureValue shouldBe false
+    }
+
+  }
+
   "Sink pre-materialization" must {
     "materialize the sink and wrap its exposed publisher in a Source" in {
       val publisherSink: Sink[String, Publisher[String]] = Sink.asPublisher[String](false)
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala
index 0e2a0a9f2b..0805e5df74 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala
@@ -99,6 +99,31 @@ object Sink {
       .mapMaterializedValue(_.map(Boolean.box)(ExecutionContexts.parasitic).asJava))
   }
 
+  /**
+   * A `Sink` that will test the given predicate `p` for every received element and
+   *  1. completes and returns [[java.util.concurrent.CompletionStage]] of `true` if the predicate is true for any element;
+   *  2. completes and returns [[java.util.concurrent.CompletionStage]] of `false` if the stream is empty (i.e. completes before signalling any elements);
+   *  3. completes and returns [[java.util.concurrent.CompletionStage]] of `false` if the predicate is false for all elements.
+   *
+   * The materialized value [[java.util.concurrent.CompletionStage]] will be completed with the value `true` or `false`
+   * when the input stream ends, or completed with `Failure` if there is a failure signaled in the stream.
+   *
+   * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
+   *
+   * '''Completes when''' upstream completes or the predicate `p` returns `true`
+   *
+   * '''Backpressures when''' the invocation of predicate `p` has not yet completed
+   *
+   * '''Cancels when''' predicate `p` returns `true`
+   *
+   * @since 1.1.0
+   */
+  def exists[In](p: function.Predicate[In]): javadsl.Sink[In, CompletionStage[java.lang.Boolean]] = {
+    import pekko.util.FutureConverters._
+    new Sink(scaladsl.Sink.exists[In](p.test)
+      .mapMaterializedValue(_.map(Boolean.box)(ExecutionContexts.parasitic).asJava))
+  }
+
   /**
    * Creates a sink which materializes into a ``CompletionStage`` which will be completed with a result of the Java ``Collector``
    * transformation and reduction operations. This allows usage of Java streams transformations for reactive streams.
diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala
index b607f1b12c..841685e795 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala
@@ -464,6 +464,30 @@ object Sink {
       .toMat(Sink.head)(Keep.right)
       .named("forallSink")
 
+  /**
+   * A `Sink` that will test the given predicate `p` for every received element and
+   *  1. completes and returns [[scala.concurrent.Future]] of `true` if the predicate is true for any element;
+   *  2. completes and returns [[scala.concurrent.Future]] of `false` if the stream is empty (i.e. completes before signalling any elements);
+   *  3. completes and returns [[scala.concurrent.Future]] of `false` if the predicate is false for all elements.
+   *
+   * The materialized value [[scala.concurrent.Future]] will be completed with the value `true` or `false`
+   * when the input stream ends, or completed with `Failure` if there is a failure signaled in the stream.
+   *
+   * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
+   *
+   * '''Completes when''' upstream completes or the predicate `p` returns `true`
+   *
+   * '''Backpressures when''' the invocation of predicate `p` has not yet completed
+   *
+   * '''Cancels when''' predicate `p` returns `true`
+   *
+   * @since 1.1.0
+   */
+  def exists[T](p: T => Boolean): Sink[T, Future[Boolean]] =
+    Flow[T].foldWhile(false)(!_)(_ || p(_))
+      .toMat(Sink.head)(Keep.right)
+      .named("existsSink")
+
   /**
    * A `Sink` that will invoke the given function for every received element, giving it its previous
    * output (from the second element) and the element as input.
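
Reading aid for the `foldWhile(false)(!_)(_ || p(_))` definition in the last hunk: the sketch below models the same short-circuit behaviour on a plain `Iterable`, using only the Java standard library. It is not Pekko code and not the committed implementation; `ExistsModel` and its `exists` helper are hypothetical names introduced here for illustration. The accumulator starts at `false`, folding continues only while it is still `false`, and each step ORs in the predicate result.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.function.Predicate;

// Hypothetical stand-in for the stream operator; models semantics only.
final class ExistsModel {

  // Mirrors foldWhile(false)(!_)(_ || p(_)) on an in-memory sequence.
  static <T> boolean exists(Iterable<T> elements, Predicate<T> p) {
    boolean acc = false;               // zero value of the fold
    Iterator<T> it = elements.iterator();
    while (!acc && it.hasNext()) {     // keep folding only while acc is false
      acc = acc || p.test(it.next());  // combine step
    }
    return acc;                        // false for an empty input
  }

  public static void main(String[] args) {
    // Some element is greater than 3.
    System.out.println(exists(Arrays.asList(1, 2, 3, 4), e -> e > 3)); // true
    // No element is greater than 5.
    System.out.println(exists(Arrays.asList(1, 2, 3, 4), e -> e > 5)); // false
    // Empty input never invokes the predicate.
    System.out.println(exists(Collections.<Integer>emptyList(), e -> e > 2)); // false

    // Short-circuit: the predicate stops being called after the first match.
    int[] calls = {0};
    exists(Arrays.asList(1, 2, 3, 4), e -> { calls[0]++; return e > 1; });
    System.out.println(calls[0]); // 2
  }
}
```

The early exit from the loop corresponds to the documented "cancels when predicate `p` returns `true`"; supervision and failure handling are deliberately left out of this model.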

