This is an automated email from the ASF dual-hosted git repository.

hepin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-pekko.git


The following commit(s) were added to refs/heads/main by this push:
     new 7de1fb2edb feat: Add AutoCloseable shortcut on mapWithResource (#1053)
7de1fb2edb is described below

commit 7de1fb2edbd18fe7e3a912d3021c3344ed9522b6
Author: injae kim <[email protected]>
AuthorDate: Thu Feb 1 19:47:57 2024 +0900

    feat: Add AutoCloseable shortcut on mapWithResource (#1053)
    
    * feat: Add AutoCloseable shortcut on mapWithResource
    
    * Enhance test to check resource is closed after stream is completed
    
    * Enhance comment
    
    * Update doc
    
    * Address comment
    
    * Add resume, restart, stop strategy test
    
    * Address comment
    
    * Fix doc
    
    * Fix typo
---
 .../operators/Source-or-Flow/mapWithResource.md    |   6 +-
 .../org/apache/pekko/stream/javadsl/FlowTest.java  |  16 ++
 .../apache/pekko/stream/javadsl/SourceTest.java    |  13 ++
 .../stream/scaladsl/FlowMapWithResourceSpec.scala  | 194 ++++++++++++++++++++-
 .../org/apache/pekko/stream/javadsl/Flow.scala     |  39 +++++
 .../org/apache/pekko/stream/javadsl/Source.scala   |  39 +++++
 .../org/apache/pekko/stream/javadsl/SubFlow.scala  |  39 +++++
 .../apache/pekko/stream/javadsl/SubSource.scala    |  39 +++++
 .../org/apache/pekko/stream/scaladsl/Flow.scala    |  37 ++++
 9 files changed, 420 insertions(+), 2 deletions(-)

diff --git 
a/docs/src/main/paradox/stream/operators/Source-or-Flow/mapWithResource.md 
b/docs/src/main/paradox/stream/operators/Source-or-Flow/mapWithResource.md
index 015d149e99..d18bb0b593 100644
--- a/docs/src/main/paradox/stream/operators/Source-or-Flow/mapWithResource.md
+++ b/docs/src/main/paradox/stream/operators/Source-or-Flow/mapWithResource.md
@@ -6,11 +6,15 @@ Map elements with the help of a resource that can be opened, 
transform each elem
 
 ## Signature
 
-@apidoc[Flow.mapWithResource](Flow) { 
scala="#mapWithResource%5BS%2C%20T%5D%28create%3A%20%28%29%20%3D%3E%20S%29%28f%3A%20%28S%2C%20Out%29%20%3D%3E%20T%2C%20close%3A%20S%20%3D%3E%20Option%5BT%5D%29%3A%20Repr%5BT%5D"
 
java="#mapWithResource(org.apache.pekko.japi.function.Creator,org.apache.pekko.japi.function.Function2,org.apache.pekko.japi.function.Function)"
 }
+@apidoc[Flow.mapWithResource](Flow) { 
scala="#mapWithResource[S,T](create:()=%3ES)(f:(S,Out)=%3ET,close:S=%3EOption[T]):Repr[T]"
 
java="#mapWithResource(org.apache.pekko.japi.function.Creator,org.apache.pekko.japi.function.Function2,org.apache.pekko.japi.function.Function)"
 }
 1. `create`: Open or Create the resource.
 2. `f`: Transform each element inputs with the help of resource.
 3. `close`: Close the resource, invoked on end of stream or if the stream 
fails, optionally outputting a last element.
 
+@apidoc[Flow.mapWithResource](Flow) { 
scala="#mapWithResource[S%3C:AutoCloseable,T](create:()=%3ES,f:(S,Out)=%3ET):Repr[T]"
 
java="#mapWithResource(org.apache.pekko.japi.function.Creator,org.apache.pekko.japi.function.Function2)"
 }
+1. `create`: Open or Create the autocloseable resource.
+2. `f`: Transform each element inputs with the help of resource.
+
 ## Description
 
 Transform each stream element with the help of a resource.
diff --git 
a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java 
b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java
index bdfcf3a6d5..4110cf61ee 100644
--- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java
+++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java
@@ -37,6 +37,7 @@ import org.apache.pekko.testkit.PekkoJUnitActorSystemResource;
 
 import java.util.*;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Supplier;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
@@ -237,6 +238,21 @@ public class FlowTest extends StreamTest {
     Assert.assertFalse(gate.get());
   }
 
+  @Test
+  public void mustBeAbleToUseMapWithAutoCloseableResource() {
+    final TestKit probe = new TestKit(system);
+    final AtomicInteger closed = new AtomicInteger();
+    Source.from(Arrays.asList("1", "2", "3"))
+        .via(
+            Flow.of(String.class)
+                .mapWithResource(
+                    () -> (AutoCloseable) closed::incrementAndGet, (resource, 
elem) -> elem))
+        .runWith(Sink.foreach(elem -> probe.getRef().tell(elem, 
ActorRef.noSender())), system);
+
+    probe.expectMsgAllOf("1", "2", "3");
+    Assert.assertEquals(closed.get(), 1);
+  }
+
   @Test
   public void mustBeAbleToUseFoldWhile() throws Exception {
     final int result =
diff --git 
a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java 
b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java
index b1ddb49111..997d401a83 100644
--- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java
+++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java
@@ -47,6 +47,7 @@ import java.time.Duration;
 import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import java.util.stream.Stream;
@@ -815,6 +816,18 @@ public class SourceTest extends StreamTest {
     Assert.assertFalse(gate.get());
   }
 
+  @Test
+  public void mustBeAbleToUseMapWithAutoCloseableResource() {
+    final TestKit probe = new TestKit(system);
+    final AtomicInteger closed = new AtomicInteger();
+    Source.from(Arrays.asList("1", "2", "3"))
+        .mapWithResource(() -> (AutoCloseable) closed::incrementAndGet, 
(resource, elem) -> elem)
+        .runWith(Sink.foreach(elem -> probe.getRef().tell(elem, 
ActorRef.noSender())), system);
+
+    probe.expectMsgAllOf("1", "2", "3");
+    Assert.assertEquals(closed.get(), 1);
+  }
+
   @Test
   public void mustBeAbleToUseFoldWhile() throws Exception {
     final int result =
diff --git 
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowMapWithResourceSpec.scala
 
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowMapWithResourceSpec.scala
index 32eae23712..56db0ed625 100644
--- 
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowMapWithResourceSpec.scala
+++ 
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowMapWithResourceSpec.scala
@@ -35,7 +35,7 @@ import org.apache.pekko
 import pekko.Done
 import pekko.stream.{ AbruptTerminationException, ActorAttributes, 
ActorMaterializer, SystemMaterializer }
 import pekko.stream.ActorAttributes.supervisionStrategy
-import pekko.stream.Supervision.{ restartingDecider, resumingDecider }
+import pekko.stream.Supervision.{ restartingDecider, resumingDecider, 
stoppingDecider }
 import pekko.stream.impl.{ PhasedFusingActorMaterializer, StreamSupervisor }
 import pekko.stream.impl.StreamSupervisor.Children
 import pekko.stream.testkit.{ StreamSpec, TestSubscriber }
@@ -410,6 +410,198 @@ class FlowMapWithResourceSpec extends 
StreamSpec(UnboundedMailboxConfig) {
       Await.result(promise.future, 3.seconds) shouldBe Done
     }
 
+    "will close the autocloseable resource when upstream complete" in {
+      val closedCounter = new AtomicInteger(0)
+      val create = () =>
+        new AutoCloseable {
+          override def close(): Unit = closedCounter.incrementAndGet()
+        }
+      val (pub, sub) = TestSource
+        .probe[Int]
+        .mapWithResource(create, (_: AutoCloseable, count) => count)
+        .toMat(TestSink.probe)(Keep.both)
+        .run()
+      sub.expectSubscription().request(2)
+      closedCounter.get shouldBe 0
+      pub.sendNext(1)
+      sub.expectNext(1)
+      closedCounter.get shouldBe 0
+      pub.sendComplete()
+      sub.expectComplete()
+      closedCounter.get shouldBe 1
+    }
+
+    "will close the autocloseable resource when upstream fail" in {
+      val closedCounter = new AtomicInteger(0)
+      val create = () =>
+        new AutoCloseable {
+          override def close(): Unit = closedCounter.incrementAndGet()
+        }
+      val (pub, sub) = TestSource
+        .probe[Int]
+        .mapWithResource(create, (_: AutoCloseable, count) => count)
+        .toMat(TestSink.probe)(Keep.both)
+        .run()
+      sub.expectSubscription().request(2)
+      closedCounter.get shouldBe 0
+      pub.sendNext(1)
+      sub.expectNext(1)
+      closedCounter.get shouldBe 0
+      pub.sendError(ex)
+      sub.expectError(ex)
+      closedCounter.get shouldBe 1
+    }
+
+    "will close the autocloseable resource when downstream cancel" in {
+      val closedCounter = new AtomicInteger(0)
+      val create = () =>
+        new AutoCloseable {
+          override def close(): Unit = closedCounter.incrementAndGet()
+        }
+      val (pub, sub) = TestSource
+        .probe[Int]
+        .mapWithResource(create, (_: AutoCloseable, count) => count)
+        .toMat(TestSink.probe)(Keep.both)
+        .run()
+      val subscription = sub.expectSubscription()
+      subscription.request(2)
+      closedCounter.get shouldBe 0
+      pub.sendNext(1)
+      sub.expectNext(1)
+      closedCounter.get shouldBe 0
+      subscription.cancel()
+      pub.expectCancellation()
+      closedCounter.get shouldBe 1
+    }
+
+    "will close the autocloseable resource when downstream fail" in {
+      val closedCounter = new AtomicInteger(0)
+      val create = () =>
+        new AutoCloseable {
+          override def close(): Unit = closedCounter.incrementAndGet()
+        }
+      val (pub, sub) = TestSource
+        .probe[Int]
+        .mapWithResource(create, (_: AutoCloseable, count) => count)
+        .toMat(TestSink.probe)(Keep.both)
+        .run()
+      sub.request(2)
+      closedCounter.get shouldBe 0
+      pub.sendNext(1)
+      sub.expectNext(1)
+      closedCounter.get shouldBe 0
+      sub.cancel(ex)
+      pub.expectCancellationWithCause(ex)
+      closedCounter.get shouldBe 1
+    }
+
+    "will close the autocloseable resource on abrupt materializer termination" 
in {
+      val closedCounter = new AtomicInteger(0)
+      @nowarn("msg=deprecated")
+      val mat = ActorMaterializer()
+      val promise = Promise[Done]()
+      val create = () =>
+        new AutoCloseable {
+          override def close(): Unit = {
+            closedCounter.incrementAndGet()
+            promise.complete(Success(Done))
+          }
+        }
+      val matVal = Source
+        .single(1)
+        .mapWithResource(create, (_: AutoCloseable, count) => count)
+        .runWith(Sink.never)(mat)
+      closedCounter.get shouldBe 0
+      mat.shutdown()
+      matVal.failed.futureValue shouldBe an[AbruptTerminationException]
+      Await.result(promise.future, 3.seconds) shouldBe Done
+      closedCounter.get shouldBe 1
+    }
+
+    "continue with autoCloseable when Strategy is Resume and exception 
happened" in {
+      val closedCounter = new AtomicInteger(0)
+      val create = () =>
+        new AutoCloseable {
+          override def close(): Unit = closedCounter.incrementAndGet()
+        }
+      val p = Source
+        .fromIterator(() => (0 to 50).iterator)
+        .mapWithResource(create,
+          (_: AutoCloseable, elem) => {
+            if (elem == 10) throw TE("") else elem
+          })
+        .withAttributes(supervisionStrategy(resumingDecider))
+        .runWith(Sink.asPublisher(false))
+      val c = TestSubscriber.manualProbe[Int]()
+
+      p.subscribe(c)
+      val sub = c.expectSubscription()
+
+      (0 to 48).foreach(i => {
+        sub.request(1)
+        c.expectNext() should ===(if (i < 10) i else i + 1)
+      })
+      sub.request(1)
+      c.expectNext(50)
+      c.expectComplete()
+      closedCounter.get shouldBe 1
+    }
+
+    "close and open stream with autocloseable again when Strategy is Restart" 
in {
+      val closedCounter = new AtomicInteger(0)
+      val create = () =>
+        new AutoCloseable {
+          override def close(): Unit = closedCounter.incrementAndGet()
+        }
+      val p = Source
+        .fromIterator(() => (0 to 50).iterator)
+        .mapWithResource(create,
+          (_: AutoCloseable, elem) => {
+            if (elem == 10 || elem == 20) throw TE("") else elem
+          })
+        .withAttributes(supervisionStrategy(restartingDecider))
+        .runWith(Sink.asPublisher(false))
+      val c = TestSubscriber.manualProbe[Int]()
+
+      p.subscribe(c)
+      val sub = c.expectSubscription()
+
+      (0 to 30).filter(i => i != 10 && i != 20).foreach(i => {
+        sub.request(1)
+        c.expectNext() shouldBe i
+        closedCounter.get should ===(if (i < 10) 0 else if (i < 20) 1 else 2)
+      })
+      sub.cancel()
+    }
+
+    "stop stream with autoCloseable when Strategy is Stop and exception 
happened" in {
+      val closedCounter = new AtomicInteger(0)
+      val create = () =>
+        new AutoCloseable {
+          override def close(): Unit = closedCounter.incrementAndGet()
+        }
+      val p = Source
+        .fromIterator(() => (0 to 50).iterator)
+        .mapWithResource(create,
+          (_: AutoCloseable, elem) => {
+            if (elem == 10) throw TE("") else elem
+          })
+        .withAttributes(supervisionStrategy(stoppingDecider))
+        .runWith(Sink.asPublisher(false))
+      val c = TestSubscriber.manualProbe[Int]()
+
+      p.subscribe(c)
+      val sub = c.expectSubscription()
+
+      (0 to 9).foreach(i => {
+        sub.request(1)
+        c.expectNext() shouldBe i
+      })
+      sub.request(1)
+      c.expectError()
+      closedCounter.get shouldBe 1
+    }
+
   }
   override def afterTermination(): Unit = {
     fs.close()
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala 
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
index c48f246983..3ed9bdf150 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
@@ -828,6 +828,45 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, 
Out, Mat]) extends Gr
         (resource, out) => f(resource, out),
         resource => close.apply(resource).toScala))
 
+  /**
+   * Transform each stream element with the help of an [[AutoCloseable]] 
resource and close it when the stream finishes or fails.
+   *
+   * The resource creation function is invoked once when the stream is 
materialized and the returned resource is passed to
+   * the mapping function for mapping the first element. The mapping function 
returns a mapped element to emit
+   * downstream. The returned `T` MUST NOT be `null` as it is illegal as 
stream element - according to the Reactive Streams specification.
+   *
+   * The [[AutoCloseable]] resource is closed only once when the upstream or 
downstream finishes or fails.
+   *
+   * Early completion can be done with combination of the [[takeWhile]] 
operator.
+   *
+   * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
+   *
+   * You can configure the default dispatcher for this Source by changing the 
`pekko.stream.materializer.blocking-io-dispatcher` or
+   * set it for a given Source by using [[ActorAttributes]].
+   *
+   * '''Emits when''' the mapping function returns an element and downstream 
is ready to consume it
+   *
+   * '''Backpressures when''' downstream backpressures
+   *
+   * '''Completes when''' upstream completes
+   *
+   * '''Cancels when''' downstream cancels
+   *
+   * @tparam R the type of the resource
+   * @tparam T the type of the output elements
+   * @param create function that creates the resource
+   * @param f function that transforms the upstream element and the resource 
to output element
+   * @since 1.1.0
+   */
+  def mapWithResource[R <: AutoCloseable, T](
+      create: function.Creator[R],
+      f: function.Function2[R, Out, T]): javadsl.Flow[In, T, Mat] =
+    mapWithResource(create, f,
+      (resource: AutoCloseable) => {
+        resource.close()
+        Optional.empty()
+      })
+
   /**
    * Transform each input element into an `Iterable` of output elements that is
    * then flattened into the output stream. The transformation is meant to be 
stateful,
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala 
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
index 43125ade66..b8d521a420 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
@@ -2541,6 +2541,45 @@ final class Source[Out, Mat](delegate: 
scaladsl.Source[Out, Mat]) extends Graph[
         (resource, out) => f(resource, out),
         resource => close.apply(resource).toScala))
 
+  /**
+   * Transform each stream element with the help of an [[AutoCloseable]] 
resource and close it when the stream finishes or fails.
+   *
+   * The resource creation function is invoked once when the stream is 
materialized and the returned resource is passed to
+   * the mapping function for mapping the first element. The mapping function 
returns a mapped element to emit
+   * downstream. The returned `T` MUST NOT be `null` as it is illegal as 
stream element - according to the Reactive Streams specification.
+   *
+   * The [[AutoCloseable]] resource is closed only once when the upstream or 
downstream finishes or fails.
+   *
+   * Early completion can be done with combination of the [[takeWhile]] 
operator.
+   *
+   * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
+   *
+   * You can configure the default dispatcher for this Source by changing the 
`pekko.stream.materializer.blocking-io-dispatcher` or
+   * set it for a given Source by using [[ActorAttributes]].
+   *
+   * '''Emits when''' the mapping function returns an element and downstream 
is ready to consume it
+   *
+   * '''Backpressures when''' downstream backpressures
+   *
+   * '''Completes when''' upstream completes
+   *
+   * '''Cancels when''' downstream cancels
+   *
+   * @tparam R the type of the resource
+   * @tparam T the type of the output elements
+   * @param create function that creates the resource
+   * @param f function that transforms the upstream element and the resource 
to output element
+   * @since 1.1.0
+   */
+  def mapWithResource[R <: AutoCloseable, T](
+      create: function.Creator[R],
+      f: function.Function2[R, Out, T]): javadsl.Source[T, Mat] =
+    mapWithResource(create, f,
+      (resource: AutoCloseable) => {
+        resource.close()
+        Optional.empty()
+      })
+
   /**
    * Transform each input element into an `Iterable` of output elements that is
    * then flattened into the output stream. The transformation is meant to be 
stateful,
diff --git 
a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala 
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
index 8fa5662ab8..fc258e512b 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
@@ -285,6 +285,45 @@ class SubFlow[In, Out, Mat](
         (resource, out) => f(resource, out),
         resource => close.apply(resource).toScala))
 
+  /**
+   * Transform each stream element with the help of an [[AutoCloseable]] 
resource and close it when the stream finishes or fails.
+   *
+   * The resource creation function is invoked once when the stream is 
materialized and the returned resource is passed to
+   * the mapping function for mapping the first element. The mapping function 
returns a mapped element to emit
+   * downstream. The returned `T` MUST NOT be `null` as it is illegal as 
stream element - according to the Reactive Streams specification.
+   *
+   * The [[AutoCloseable]] resource is closed only once when the upstream or 
downstream finishes or fails.
+   *
+   * Early completion can be done with combination of the [[takeWhile]] 
operator.
+   *
+   * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
+   *
+   * You can configure the default dispatcher for this Source by changing the 
`pekko.stream.materializer.blocking-io-dispatcher` or
+   * set it for a given Source by using [[ActorAttributes]].
+   *
+   * '''Emits when''' the mapping function returns an element and downstream 
is ready to consume it
+   *
+   * '''Backpressures when''' downstream backpressures
+   *
+   * '''Completes when''' upstream completes
+   *
+   * '''Cancels when''' downstream cancels
+   *
+   * @tparam R the type of the resource
+   * @tparam T the type of the output elements
+   * @param create function that creates the resource
+   * @param f function that transforms the upstream element and the resource 
to output element
+   * @since 1.1.0
+   */
+  def mapWithResource[R <: AutoCloseable, T](
+      create: function.Creator[R],
+      f: function.Function2[R, Out, T]): javadsl.SubFlow[In, T, Mat] =
+    mapWithResource(create, f,
+      (resource: AutoCloseable) => {
+        resource.close()
+        Optional.empty()
+      })
+
   /**
    * Transform each input element into an `Iterable` of output elements that is
    * then flattened into the output stream. The transformation is meant to be 
stateful,
diff --git 
a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala 
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
index f64f003a9c..340ea3063c 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
@@ -276,6 +276,45 @@ class SubSource[Out, Mat](
         (resource, out) => f(resource, out),
         resource => close.apply(resource).toScala))
 
+  /**
+   * Transform each stream element with the help of an [[AutoCloseable]] 
resource and close it when the stream finishes or fails.
+   *
+   * The resource creation function is invoked once when the stream is 
materialized and the returned resource is passed to
+   * the mapping function for mapping the first element. The mapping function 
returns a mapped element to emit
+   * downstream. The returned `T` MUST NOT be `null` as it is illegal as 
stream element - according to the Reactive Streams specification.
+   *
+   * The [[AutoCloseable]] resource is closed only once when the upstream or 
downstream finishes or fails.
+   *
+   * Early completion can be done with combination of the [[takeWhile]] 
operator.
+   *
+   * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
+   *
+   * You can configure the default dispatcher for this Source by changing the 
`pekko.stream.materializer.blocking-io-dispatcher` or
+   * set it for a given Source by using [[ActorAttributes]].
+   *
+   * '''Emits when''' the mapping function returns an element and downstream 
is ready to consume it
+   *
+   * '''Backpressures when''' downstream backpressures
+   *
+   * '''Completes when''' upstream completes
+   *
+   * '''Cancels when''' downstream cancels
+   *
+   * @tparam R the type of the resource
+   * @tparam T the type of the output elements
+   * @param create function that creates the resource
+   * @param f function that transforms the upstream element and the resource 
to output element
+   * @since 1.1.0
+   */
+  def mapWithResource[R <: AutoCloseable, T](
+      create: function.Creator[R],
+      f: function.Function2[R, Out, T]): javadsl.SubSource[T, Mat] =
+    mapWithResource(create, f,
+      (resource: AutoCloseable) => {
+        resource.close()
+        Optional.empty()
+      })
+
   /**
    * Transform each input element into an `Iterable` of output elements that is
    * then flattened into the output stream. The transformation is meant to be 
stateful,
diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala 
b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
index d328a8ebb2..0b73b0b90a 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
@@ -1142,6 +1142,43 @@ trait FlowOps[+Out, +Mat] {
         resource => close(resource))
         .withAttributes(DefaultAttributes.mapWithResource))
 
+  /**
+   * Transform each stream element with the help of an [[AutoCloseable]] 
resource and close it when the stream finishes or fails.
+   *
+   * The resource creation function is invoked once when the stream is 
materialized and the returned resource is passed to
+   * the mapping function for mapping the first element. The mapping function 
returns a mapped element to emit
+   * downstream. The returned `T` MUST NOT be `null` as it is illegal as 
stream element - according to the Reactive Streams specification.
+   *
+   * The [[AutoCloseable]] resource is closed only once when the upstream or 
downstream finishes or fails.
+   *
+   * Early completion can be done with combination of the [[takeWhile]] 
operator.
+   *
+   * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
+   *
+   * You can configure the default dispatcher for this Source by changing the 
`pekko.stream.materializer.blocking-io-dispatcher` or
+   * set it for a given Source by using [[ActorAttributes]].
+   *
+   * '''Emits when''' the mapping function returns an element and downstream 
is ready to consume it
+   *
+   * '''Backpressures when''' downstream backpressures
+   *
+   * '''Completes when''' upstream completes
+   *
+   * '''Cancels when''' downstream cancels
+   *
+   * @tparam R the type of the resource
+   * @tparam T the type of the output elements
+   * @param create function that creates the resource
+   * @param f function that transforms the upstream element and the resource 
to output element
+   * @since 1.1.0
+   */
+  def mapWithResource[R <: AutoCloseable, T](create: () => R, f: (R, Out) => 
T): Repr[T] =
+    mapWithResource(create)(f,
+      (resource: AutoCloseable) => {
+        resource.close()
+        None
+      })
+
   /**
    * Transform each input element into an `Iterable` of output elements that is
    * then flattened into the output stream. The transformation is meant to be 
stateful,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to