This is an automated email from the ASF dual-hosted git repository.
hepin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-pekko.git
The following commit(s) were added to refs/heads/main by this push:
new 7de1fb2edb feat: Add AutoCloseable shortcut on mapWithResource (#1053)
7de1fb2edb is described below
commit 7de1fb2edbd18fe7e3a912d3021c3344ed9522b6
Author: injae kim <[email protected]>
AuthorDate: Thu Feb 1 19:47:57 2024 +0900
feat: Add AutoCloseable shortcut on mapWithResource (#1053)
* feat: Add AutoCloseable shortcut on mapWithResource
* Enhance test to check resource is closed after stream is completed
* Enhance comment
* Update doc
* Address comment
* Add resume, restart, stop strategy test
* Address comment
* Fix doc
* Fix typo
---
.../operators/Source-or-Flow/mapWithResource.md | 6 +-
.../org/apache/pekko/stream/javadsl/FlowTest.java | 16 ++
.../apache/pekko/stream/javadsl/SourceTest.java | 13 ++
.../stream/scaladsl/FlowMapWithResourceSpec.scala | 194 ++++++++++++++++++++-
.../org/apache/pekko/stream/javadsl/Flow.scala | 39 +++++
.../org/apache/pekko/stream/javadsl/Source.scala | 39 +++++
.../org/apache/pekko/stream/javadsl/SubFlow.scala | 39 +++++
.../apache/pekko/stream/javadsl/SubSource.scala | 39 +++++
.../org/apache/pekko/stream/scaladsl/Flow.scala | 37 ++++
9 files changed, 420 insertions(+), 2 deletions(-)
diff --git
a/docs/src/main/paradox/stream/operators/Source-or-Flow/mapWithResource.md
b/docs/src/main/paradox/stream/operators/Source-or-Flow/mapWithResource.md
index 015d149e99..d18bb0b593 100644
--- a/docs/src/main/paradox/stream/operators/Source-or-Flow/mapWithResource.md
+++ b/docs/src/main/paradox/stream/operators/Source-or-Flow/mapWithResource.md
@@ -6,11 +6,15 @@ Map elements with the help of a resource that can be opened,
transform each elem
## Signature
-@apidoc[Flow.mapWithResource](Flow) {
scala="#mapWithResource%5BS%2C%20T%5D%28create%3A%20%28%29%20%3D%3E%20S%29%28f%3A%20%28S%2C%20Out%29%20%3D%3E%20T%2C%20close%3A%20S%20%3D%3E%20Option%5BT%5D%29%3A%20Repr%5BT%5D"
java="#mapWithResource(org.apache.pekko.japi.function.Creator,org.apache.pekko.japi.function.Function2,org.apache.pekko.japi.function.Function)"
}
+@apidoc[Flow.mapWithResource](Flow) {
scala="#mapWithResource[S,T](create:()=%3ES)(f:(S,Out)=%3ET,close:S=%3EOption[T]):Repr[T]"
java="#mapWithResource(org.apache.pekko.japi.function.Creator,org.apache.pekko.japi.function.Function2,org.apache.pekko.japi.function.Function)"
}
1. `create`: Open or Create the resource.
2. `f`: Transform each element inputs with the help of resource.
3. `close`: Close the resource, invoked on end of stream or if the stream
fails, optionally outputting a last element.
+@apidoc[Flow.mapWithResource](Flow) {
scala="#mapWithResource[S%3C:AutoCloseable,T](create:()=%3ES,f:(S,Out)=%3ET):Repr[T]"
java="#mapWithResource(org.apache.pekko.japi.function.Creator,org.apache.pekko.japi.function.Function2)"
}
+1. `create`: Open or create the `AutoCloseable` resource.
+2. `f`: Transform each input element with the help of the resource.
+
## Description
Transform each stream element with the help of a resource.
diff --git
a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java
b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java
index bdfcf3a6d5..4110cf61ee 100644
--- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java
+++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java
@@ -37,6 +37,7 @@ import org.apache.pekko.testkit.PekkoJUnitActorSystemResource;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
@@ -237,6 +238,21 @@ public class FlowTest extends StreamTest {
Assert.assertFalse(gate.get());
}
+ /**
+  * Verifies that the two-argument {@code mapWithResource} overload passes every element through
+  * and closes its {@link AutoCloseable} resource exactly once.
+  *
+  * <p>The stream is awaited before asserting on the close counter: the resource is closed as part
+  * of stream termination, which can happen after the last element has already reached the probe.
+  */
+ @Test
+ public void mustBeAbleToUseMapWithAutoCloseableResource() throws Exception {
+   final TestKit probe = new TestKit(system);
+   final AtomicInteger closed = new AtomicInteger();
+   Source.from(Arrays.asList("1", "2", "3"))
+       .via(
+           Flow.of(String.class)
+               .mapWithResource(
+                   // The resource's close() only bumps the counter so the test can observe it.
+                   () -> (AutoCloseable) closed::incrementAndGet, (resource, elem) -> elem))
+       .runWith(Sink.foreach(elem -> probe.getRef().tell(elem, ActorRef.noSender())), system)
+       // Block until the sink has completed so the close call cannot race with the assertion.
+       .toCompletableFuture()
+       .get(3, java.util.concurrent.TimeUnit.SECONDS);
+
+   probe.expectMsgAllOf("1", "2", "3");
+   // JUnit convention: expected value first, actual value second.
+   Assert.assertEquals(1, closed.get());
+ }
+
@Test
public void mustBeAbleToUseFoldWhile() throws Exception {
final int result =
diff --git
a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java
b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java
index b1ddb49111..997d401a83 100644
--- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java
+++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java
@@ -47,6 +47,7 @@ import java.time.Duration;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
@@ -815,6 +816,18 @@ public class SourceTest extends StreamTest {
Assert.assertFalse(gate.get());
}
+ /**
+  * Verifies that the two-argument {@code mapWithResource} overload passes every element through
+  * and closes its {@link AutoCloseable} resource exactly once.
+  *
+  * <p>The stream is awaited before asserting on the close counter: the resource is closed as part
+  * of stream termination, which can happen after the last element has already reached the probe.
+  */
+ @Test
+ public void mustBeAbleToUseMapWithAutoCloseableResource() throws Exception {
+   final TestKit probe = new TestKit(system);
+   final AtomicInteger closed = new AtomicInteger();
+   Source.from(Arrays.asList("1", "2", "3"))
+       // The resource's close() only bumps the counter so the test can observe it.
+       .mapWithResource(() -> (AutoCloseable) closed::incrementAndGet, (resource, elem) -> elem)
+       .runWith(Sink.foreach(elem -> probe.getRef().tell(elem, ActorRef.noSender())), system)
+       // Block until the sink has completed so the close call cannot race with the assertion.
+       .toCompletableFuture()
+       .get(3, TimeUnit.SECONDS);
+
+   probe.expectMsgAllOf("1", "2", "3");
+   // JUnit convention: expected value first, actual value second.
+   Assert.assertEquals(1, closed.get());
+ }
+
@Test
public void mustBeAbleToUseFoldWhile() throws Exception {
final int result =
diff --git
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowMapWithResourceSpec.scala
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowMapWithResourceSpec.scala
index 32eae23712..56db0ed625 100644
---
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowMapWithResourceSpec.scala
+++
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowMapWithResourceSpec.scala
@@ -35,7 +35,7 @@ import org.apache.pekko
import pekko.Done
import pekko.stream.{ AbruptTerminationException, ActorAttributes,
ActorMaterializer, SystemMaterializer }
import pekko.stream.ActorAttributes.supervisionStrategy
-import pekko.stream.Supervision.{ restartingDecider, resumingDecider }
+import pekko.stream.Supervision.{ restartingDecider, resumingDecider,
stoppingDecider }
import pekko.stream.impl.{ PhasedFusingActorMaterializer, StreamSupervisor }
import pekko.stream.impl.StreamSupervisor.Children
import pekko.stream.testkit.{ StreamSpec, TestSubscriber }
@@ -410,6 +410,198 @@ class FlowMapWithResourceSpec extends
StreamSpec(UnboundedMailboxConfig) {
Await.result(promise.future, 3.seconds) shouldBe Done
}
+ // Upstream completion: the resource stays open while elements flow and has been closed
+ // exactly once by the time downstream observes completion.
+ "will close the autocloseable resource when upstream complete" in {
+ val closedCounter = new AtomicInteger(0)
+ // close() only increments the counter so the test can see how many times it was called.
+ val create = () =>
+ new AutoCloseable {
+ override def close(): Unit = closedCounter.incrementAndGet()
+ }
+ val (pub, sub) = TestSource
+ .probe[Int]
+ .mapWithResource(create, (_: AutoCloseable, count) => count)
+ .toMat(TestSink.probe)(Keep.both)
+ .run()
+ sub.expectSubscription().request(2)
+ closedCounter.get shouldBe 0
+ pub.sendNext(1)
+ sub.expectNext(1)
+ // Passing an element through must not close the resource.
+ closedCounter.get shouldBe 0
+ pub.sendComplete()
+ sub.expectComplete()
+ closedCounter.get shouldBe 1
+ }
+
+ // Upstream failure: the resource has been closed exactly once by the time the error
+ // reaches downstream.
+ "will close the autocloseable resource when upstream fail" in {
+ val closedCounter = new AtomicInteger(0)
+ // close() only increments the counter so the test can see how many times it was called.
+ val create = () =>
+ new AutoCloseable {
+ override def close(): Unit = closedCounter.incrementAndGet()
+ }
+ val (pub, sub) = TestSource
+ .probe[Int]
+ .mapWithResource(create, (_: AutoCloseable, count) => count)
+ .toMat(TestSink.probe)(Keep.both)
+ .run()
+ sub.expectSubscription().request(2)
+ closedCounter.get shouldBe 0
+ pub.sendNext(1)
+ sub.expectNext(1)
+ closedCounter.get shouldBe 0
+ // `ex` is defined outside this hunk; presumably a shared test exception of the spec.
+ pub.sendError(ex)
+ sub.expectError(ex)
+ closedCounter.get shouldBe 1
+ }
+
+ // Downstream cancellation: the resource has been closed exactly once after the
+ // cancellation was observed by the upstream probe.
+ "will close the autocloseable resource when downstream cancel" in {
+ val closedCounter = new AtomicInteger(0)
+ // close() only increments the counter so the test can see how many times it was called.
+ val create = () =>
+ new AutoCloseable {
+ override def close(): Unit = closedCounter.incrementAndGet()
+ }
+ val (pub, sub) = TestSource
+ .probe[Int]
+ .mapWithResource(create, (_: AutoCloseable, count) => count)
+ .toMat(TestSink.probe)(Keep.both)
+ .run()
+ // The subscription is kept so it can be cancelled explicitly further down.
+ val subscription = sub.expectSubscription()
+ subscription.request(2)
+ closedCounter.get shouldBe 0
+ pub.sendNext(1)
+ sub.expectNext(1)
+ closedCounter.get shouldBe 0
+ subscription.cancel()
+ pub.expectCancellation()
+ closedCounter.get shouldBe 1
+ }
+
+ // Downstream cancellation with a failure cause: the cause is propagated to the upstream
+ // probe and the resource has been closed exactly once afterwards.
+ "will close the autocloseable resource when downstream fail" in {
+ val closedCounter = new AtomicInteger(0)
+ // close() only increments the counter so the test can see how many times it was called.
+ val create = () =>
+ new AutoCloseable {
+ override def close(): Unit = closedCounter.incrementAndGet()
+ }
+ val (pub, sub) = TestSource
+ .probe[Int]
+ .mapWithResource(create, (_: AutoCloseable, count) => count)
+ .toMat(TestSink.probe)(Keep.both)
+ .run()
+ sub.request(2)
+ closedCounter.get shouldBe 0
+ pub.sendNext(1)
+ sub.expectNext(1)
+ closedCounter.get shouldBe 0
+ // `ex` is defined outside this hunk; presumably a shared test exception of the spec.
+ sub.cancel(ex)
+ pub.expectCancellationWithCause(ex)
+ closedCounter.get shouldBe 1
+ }
+
+ // Abrupt termination: shutting down a dedicated materializer while the stream is still
+ // running must fail the materialized future and still close the resource once.
+ "will close the autocloseable resource on abrupt materializer termination"
in {
+ val closedCounter = new AtomicInteger(0)
+ // A separate materializer is created so it can be shut down by this test.
+ @nowarn("msg=deprecated")
+ val mat = ActorMaterializer()
+ val promise = Promise[Done]()
+ val create = () =>
+ new AutoCloseable {
+ override def close(): Unit = {
+ closedCounter.incrementAndGet()
+ // Completed from close() so the test can wait for the close call before asserting.
+ promise.complete(Success(Done))
+ }
+ }
+ val matVal = Source
+ .single(1)
+ .mapWithResource(create, (_: AutoCloseable, count) => count)
+ .runWith(Sink.never)(mat)
+ closedCounter.get shouldBe 0
+ mat.shutdown()
+ matVal.failed.futureValue shouldBe an[AbruptTerminationException]
+ Await.result(promise.future, 3.seconds) shouldBe Done
+ closedCounter.get shouldBe 1
+ }
+
+ // Resume supervision: the mapping function throws for element 10, the stream carries on
+ // with the remaining elements, and the resource is closed once at completion.
+ "continue with autoCloseable when Strategy is Resume and exception
happened" in {
+ val closedCounter = new AtomicInteger(0)
+ // close() only increments the counter so the test can see how many times it was called.
+ val create = () =>
+ new AutoCloseable {
+ override def close(): Unit = closedCounter.incrementAndGet()
+ }
+ val p = Source
+ .fromIterator(() => (0 to 50).iterator)
+ .mapWithResource(create,
+ (_: AutoCloseable, elem) => {
+ if (elem == 10) throw TE("") else elem
+ })
+ .withAttributes(supervisionStrategy(resumingDecider))
+ .runWith(Sink.asPublisher(false))
+ val c = TestSubscriber.manualProbe[Int]()
+
+ p.subscribe(c)
+ val sub = c.expectSubscription()
+
+ // Element 10 is dropped, so from the eleventh emission on the value is one ahead of the index.
+ (0 to 48).foreach(i => {
+ sub.request(1)
+ c.expectNext() should ===(if (i < 10) i else i + 1)
+ })
+ sub.request(1)
+ c.expectNext(50)
+ c.expectComplete()
+ closedCounter.get shouldBe 1
+ }
+
+ // Restart supervision: the mapping function throws for elements 10 and 20; the close
+ // counter is expected to grow by one after each of those failures.
+ "close and open stream with autocloseable again when Strategy is Restart"
in {
+ val closedCounter = new AtomicInteger(0)
+ // close() only increments the counter so the test can see how many times it was called.
+ val create = () =>
+ new AutoCloseable {
+ override def close(): Unit = closedCounter.incrementAndGet()
+ }
+ val p = Source
+ .fromIterator(() => (0 to 50).iterator)
+ .mapWithResource(create,
+ (_: AutoCloseable, elem) => {
+ if (elem == 10 || elem == 20) throw TE("") else elem
+ })
+ .withAttributes(supervisionStrategy(restartingDecider))
+ .runWith(Sink.asPublisher(false))
+ val c = TestSubscriber.manualProbe[Int]()
+
+ p.subscribe(c)
+ val sub = c.expectSubscription()
+
+ // 10 and 20 never reach downstream; expected close count is 0 below 10, 1 below 20, else 2.
+ (0 to 30).filter(i => i != 10 && i != 20).foreach(i => {
+ sub.request(1)
+ c.expectNext() shouldBe i
+ closedCounter.get should ===(if (i < 10) 0 else if (i < 20) 1 else 2)
+ })
+ // The test stops after element 30 by cancelling; the final close count is not asserted here.
+ sub.cancel()
+ }
+
+ // Stop supervision: the mapping function throws for element 10, the stream fails, and
+ // the resource has been closed once when the error is observed.
+ "stop stream with autoCloseable when Strategy is Stop and exception
happened" in {
+ val closedCounter = new AtomicInteger(0)
+ // close() only increments the counter so the test can see how many times it was called.
+ val create = () =>
+ new AutoCloseable {
+ override def close(): Unit = closedCounter.incrementAndGet()
+ }
+ val p = Source
+ .fromIterator(() => (0 to 50).iterator)
+ .mapWithResource(create,
+ (_: AutoCloseable, elem) => {
+ if (elem == 10) throw TE("") else elem
+ })
+ .withAttributes(supervisionStrategy(stoppingDecider))
+ .runWith(Sink.asPublisher(false))
+ val c = TestSubscriber.manualProbe[Int]()
+
+ p.subscribe(c)
+ val sub = c.expectSubscription()
+
+ // Elements 0 to 9 pass through unchanged before the failing element is requested.
+ (0 to 9).foreach(i => {
+ sub.request(1)
+ c.expectNext() shouldBe i
+ })
+ sub.request(1)
+ c.expectError()
+ closedCounter.get shouldBe 1
+ }
+
}
override def afterTermination(): Unit = {
fs.close()
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
index c48f246983..3ed9bdf150 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
@@ -828,6 +828,45 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In,
Out, Mat]) extends Gr
(resource, out) => f(resource, out),
resource => close.apply(resource).toScala))
+ /**
+ * Transform each stream element with the help of an [[AutoCloseable]] resource and close it when the
+ * stream finishes or fails.
+ *
+ * This is a shortcut for the three-argument `mapWithResource`: the close function calls `close()` on
+ * the resource and emits no final element.
+ *
+ * The resource creation function is invoked once when the stream is materialized and the returned
+ * resource is passed to the mapping function for mapping the first element. The mapping function
+ * returns a mapped element to emit downstream. The returned `T` MUST NOT be `null` as it is illegal as
+ * stream element - according to the Reactive Streams specification.
+ *
+ * The [[AutoCloseable]] resource is closed only once when the upstream or downstream finishes or fails.
+ *
+ * Early completion can be done with combination of the [[takeWhile]] operator.
+ *
+ * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
+ *
+ * You can configure the default dispatcher for this operator by changing the
+ * `pekko.stream.materializer.blocking-io-dispatcher` or set it for a given operator by using
+ * [[ActorAttributes]].
+ *
+ * '''Emits when''' the mapping function returns an element and downstream is ready to consume it
+ *
+ * '''Backpressures when''' downstream backpressures
+ *
+ * '''Completes when''' upstream completes
+ *
+ * '''Cancels when''' downstream cancels
+ *
+ * @tparam R the type of the resource
+ * @tparam T the type of the output elements
+ * @param create function that creates the resource
+ * @param f function that transforms the upstream element and the resource to output element
+ * @since 1.1.0
+ */
+ def mapWithResource[R <: AutoCloseable, T](
+ create: function.Creator[R],
+ f: function.Function2[R, Out, T]): javadsl.Flow[In, T, Mat] =
+ // Delegate to the three-argument overload; closing the resource yields no trailing element.
+ mapWithResource(create, f,
+ (resource: AutoCloseable) => {
+ resource.close()
+ Optional.empty()
+ })
+
/**
* Transform each input element into an `Iterable` of output elements that is
* then flattened into the output stream. The transformation is meant to be
stateful,
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
index 43125ade66..b8d521a420 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
@@ -2541,6 +2541,45 @@ final class Source[Out, Mat](delegate:
scaladsl.Source[Out, Mat]) extends Graph[
(resource, out) => f(resource, out),
resource => close.apply(resource).toScala))
+ /**
+ * Transform each stream element with the help of an [[AutoCloseable]] resource and close it when the
+ * stream finishes or fails.
+ *
+ * This is a shortcut for the three-argument `mapWithResource`: the close function calls `close()` on
+ * the resource and emits no final element.
+ *
+ * The resource creation function is invoked once when the stream is materialized and the returned
+ * resource is passed to the mapping function for mapping the first element. The mapping function
+ * returns a mapped element to emit downstream. The returned `T` MUST NOT be `null` as it is illegal as
+ * stream element - according to the Reactive Streams specification.
+ *
+ * The [[AutoCloseable]] resource is closed only once when the upstream or downstream finishes or fails.
+ *
+ * Early completion can be done with combination of the [[takeWhile]] operator.
+ *
+ * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
+ *
+ * You can configure the default dispatcher for this Source by changing the
+ * `pekko.stream.materializer.blocking-io-dispatcher` or set it for a given Source by using
+ * [[ActorAttributes]].
+ *
+ * '''Emits when''' the mapping function returns an element and downstream is ready to consume it
+ *
+ * '''Backpressures when''' downstream backpressures
+ *
+ * '''Completes when''' upstream completes
+ *
+ * '''Cancels when''' downstream cancels
+ *
+ * @tparam R the type of the resource
+ * @tparam T the type of the output elements
+ * @param create function that creates the resource
+ * @param f function that transforms the upstream element and the resource to output element
+ * @since 1.1.0
+ */
+ def mapWithResource[R <: AutoCloseable, T](
+ create: function.Creator[R],
+ f: function.Function2[R, Out, T]): javadsl.Source[T, Mat] =
+ // Delegate to the three-argument overload; closing the resource yields no trailing element.
+ mapWithResource(create, f,
+ (resource: AutoCloseable) => {
+ resource.close()
+ Optional.empty()
+ })
+
/**
* Transform each input element into an `Iterable` of output elements that is
* then flattened into the output stream. The transformation is meant to be
stateful,
diff --git
a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
index 8fa5662ab8..fc258e512b 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
@@ -285,6 +285,45 @@ class SubFlow[In, Out, Mat](
(resource, out) => f(resource, out),
resource => close.apply(resource).toScala))
+ /**
+ * Transform each stream element with the help of an [[AutoCloseable]] resource and close it when the
+ * stream finishes or fails.
+ *
+ * This is a shortcut for the three-argument `mapWithResource`: the close function calls `close()` on
+ * the resource and emits no final element.
+ *
+ * The resource creation function is invoked once when the stream is materialized and the returned
+ * resource is passed to the mapping function for mapping the first element. The mapping function
+ * returns a mapped element to emit downstream. The returned `T` MUST NOT be `null` as it is illegal as
+ * stream element - according to the Reactive Streams specification.
+ *
+ * The [[AutoCloseable]] resource is closed only once when the upstream or downstream finishes or fails.
+ *
+ * Early completion can be done with combination of the [[takeWhile]] operator.
+ *
+ * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
+ *
+ * You can configure the default dispatcher for this operator by changing the
+ * `pekko.stream.materializer.blocking-io-dispatcher` or set it for a given operator by using
+ * [[ActorAttributes]].
+ *
+ * '''Emits when''' the mapping function returns an element and downstream is ready to consume it
+ *
+ * '''Backpressures when''' downstream backpressures
+ *
+ * '''Completes when''' upstream completes
+ *
+ * '''Cancels when''' downstream cancels
+ *
+ * @tparam R the type of the resource
+ * @tparam T the type of the output elements
+ * @param create function that creates the resource
+ * @param f function that transforms the upstream element and the resource to output element
+ * @since 1.1.0
+ */
+ def mapWithResource[R <: AutoCloseable, T](
+ create: function.Creator[R],
+ f: function.Function2[R, Out, T]): javadsl.SubFlow[In, T, Mat] =
+ // Delegate to the three-argument overload; closing the resource yields no trailing element.
+ mapWithResource(create, f,
+ (resource: AutoCloseable) => {
+ resource.close()
+ Optional.empty()
+ })
+
/**
* Transform each input element into an `Iterable` of output elements that is
* then flattened into the output stream. The transformation is meant to be
stateful,
diff --git
a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
index f64f003a9c..340ea3063c 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
@@ -276,6 +276,45 @@ class SubSource[Out, Mat](
(resource, out) => f(resource, out),
resource => close.apply(resource).toScala))
+ /**
+ * Transform each stream element with the help of an [[AutoCloseable]] resource and close it when the
+ * stream finishes or fails.
+ *
+ * This is a shortcut for the three-argument `mapWithResource`: the close function calls `close()` on
+ * the resource and emits no final element.
+ *
+ * The resource creation function is invoked once when the stream is materialized and the returned
+ * resource is passed to the mapping function for mapping the first element. The mapping function
+ * returns a mapped element to emit downstream. The returned `T` MUST NOT be `null` as it is illegal as
+ * stream element - according to the Reactive Streams specification.
+ *
+ * The [[AutoCloseable]] resource is closed only once when the upstream or downstream finishes or fails.
+ *
+ * Early completion can be done with combination of the [[takeWhile]] operator.
+ *
+ * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
+ *
+ * You can configure the default dispatcher for this Source by changing the
+ * `pekko.stream.materializer.blocking-io-dispatcher` or set it for a given Source by using
+ * [[ActorAttributes]].
+ *
+ * '''Emits when''' the mapping function returns an element and downstream is ready to consume it
+ *
+ * '''Backpressures when''' downstream backpressures
+ *
+ * '''Completes when''' upstream completes
+ *
+ * '''Cancels when''' downstream cancels
+ *
+ * @tparam R the type of the resource
+ * @tparam T the type of the output elements
+ * @param create function that creates the resource
+ * @param f function that transforms the upstream element and the resource to output element
+ * @since 1.1.0
+ */
+ def mapWithResource[R <: AutoCloseable, T](
+ create: function.Creator[R],
+ f: function.Function2[R, Out, T]): javadsl.SubSource[T, Mat] =
+ // Delegate to the three-argument overload; closing the resource yields no trailing element.
+ mapWithResource(create, f,
+ (resource: AutoCloseable) => {
+ resource.close()
+ Optional.empty()
+ })
+
/**
* Transform each input element into an `Iterable` of output elements that is
* then flattened into the output stream. The transformation is meant to be
stateful,
diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
index d328a8ebb2..0b73b0b90a 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
@@ -1142,6 +1142,43 @@ trait FlowOps[+Out, +Mat] {
resource => close(resource))
.withAttributes(DefaultAttributes.mapWithResource))
+ /**
+ * Transform each stream element with the help of an [[AutoCloseable]] resource and close it when the
+ * stream finishes or fails.
+ *
+ * This is a shortcut for the curried `mapWithResource(create)(f, close)`: the close function calls
+ * `close()` on the resource and returns `None`, i.e. no final element is emitted.
+ *
+ * The resource creation function is invoked once when the stream is materialized and the returned
+ * resource is passed to the mapping function for mapping the first element. The mapping function
+ * returns a mapped element to emit downstream. The returned `T` MUST NOT be `null` as it is illegal as
+ * stream element - according to the Reactive Streams specification.
+ *
+ * The [[AutoCloseable]] resource is closed only once when the upstream or downstream finishes or fails.
+ *
+ * Early completion can be done with combination of the [[takeWhile]] operator.
+ *
+ * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
+ *
+ * You can configure the default dispatcher for this operator by changing the
+ * `pekko.stream.materializer.blocking-io-dispatcher` or set it for a given operator by using
+ * [[ActorAttributes]].
+ *
+ * '''Emits when''' the mapping function returns an element and downstream is ready to consume it
+ *
+ * '''Backpressures when''' downstream backpressures
+ *
+ * '''Completes when''' upstream completes
+ *
+ * '''Cancels when''' downstream cancels
+ *
+ * @tparam R the type of the resource
+ * @tparam T the type of the output elements
+ * @param create function that creates the resource
+ * @param f function that transforms the upstream element and the resource to output element
+ * @since 1.1.0
+ */
+ def mapWithResource[R <: AutoCloseable, T](create: () => R, f: (R, Out) =>
T): Repr[T] =
+ // Delegate to the curried overload; closing the resource yields no trailing element.
+ mapWithResource(create)(f,
+ (resource: AutoCloseable) => {
+ resource.close()
+ None
+ })
+
/**
* Transform each input element into an `Iterable` of output elements that is
* then flattened into the output stream. The transformation is meant to be
stateful,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]