This is an automated email from the ASF dual-hosted git repository.
fanningpj pushed a commit to branch 1.1.x
in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to refs/heads/1.1.x by this push:
new de76c11ad0 Allow overriding dispatcher in mapWithResource (#1949)
(#2102)
de76c11ad0 is described below
commit de76c11ad02e3b5af051b19adfa042ec44306373
Author: PJ Fanning <[email protected]>
AuthorDate: Wed Aug 27 16:21:27 2025 +0100
Allow overriding dispatcher in mapWithResource (#1949) (#2102)
* Allow overriding the dispatcher in mapWithResource
Closes #1948
Co-authored-by: Piotr SowiĆski <[email protected]>
---
.../stream/scaladsl/FlowMapWithResourceSpec.scala | 27 +++++++++++++++++++++-
.../org/apache/pekko/stream/scaladsl/Flow.scala | 2 +-
2 files changed, 27 insertions(+), 2 deletions(-)
diff --git
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowMapWithResourceSpec.scala
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowMapWithResourceSpec.scala
index 56db0ed625..894e372a8a 100644
---
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowMapWithResourceSpec.scala
+++
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowMapWithResourceSpec.scala
@@ -33,7 +33,7 @@ import com.google.common.jimfs.{ Configuration, Jimfs }
import org.apache.pekko
import pekko.Done
-import pekko.stream.{ AbruptTerminationException, ActorAttributes,
ActorMaterializer, SystemMaterializer }
+import pekko.stream.{ AbruptTerminationException, ActorAttributes,
ActorMaterializer, Attributes, SystemMaterializer }
import pekko.stream.ActorAttributes.supervisionStrategy
import pekko.stream.Supervision.{ restartingDecider, resumingDecider,
stoppingDecider }
import pekko.stream.impl.{ PhasedFusingActorMaterializer, StreamSupervisor }
@@ -231,6 +231,31 @@ class FlowMapWithResourceSpec extends
StreamSpec(UnboundedMailboxConfig) {
finally p.cancel()
}
+ "allow overriding the default dispatcher" in {
+ val p = Source
+ .single(1)
+ .mapWithResource(() => newBufferedReader())(
+ (reader, _) => Option(reader.readLine()),
+ reader => {
+ reader.close()
+ None
+ })
+ .withAttributes(
+ Attributes.name("mapWithResourceCustomDispatcher") and
+ ActorAttributes.dispatcher("pekko.actor.default-dispatcher")
+ )
+ .runWith(TestSink.probe)
+
+ SystemMaterializer(system).materializer
+ .asInstanceOf[PhasedFusingActorMaterializer]
+ .supervisor
+ .tell(StreamSupervisor.GetChildren, testActor)
+ val ref = expectMsgType[Children].children
+ .find(_.path.toString contains "mapWithResourceCustomDispatcher").get
+ try assertDispatcher(ref, "pekko.actor.default-dispatcher")
+ finally p.cancel()
+ }
+
"fail when create throws exception" in {
EventFilter[TE](occurrences = 1).intercept {
val p = Source
diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
index 9c66727d51..ae8a1f8164 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
@@ -1189,7 +1189,7 @@ trait FlowOps[+Out, +Mat] {
create,
(resource, out) => (resource, f(resource, out)),
resource => close(resource))
- .withAttributes(DefaultAttributes.mapWithResource))
+ ).withAttributes(DefaultAttributes.mapWithResource)
/**
* Transform each stream element with the help of an [[AutoCloseable]]
resource and close it when the stream finishes or fails.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]