This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch 1.1.x
in repository https://gitbox.apache.org/repos/asf/pekko.git


The following commit(s) were added to refs/heads/1.1.x by this push:
     new de76c11ad0 Allow overriding dispatcher in mapWithResource (#1949) 
(#2102)
de76c11ad0 is described below

commit de76c11ad02e3b5af051b19adfa042ec44306373
Author: PJ Fanning <[email protected]>
AuthorDate: Wed Aug 27 16:21:27 2025 +0100

    Allow overriding dispatcher in mapWithResource (#1949) (#2102)
    
    * Allow overriding the dispatcher in mapWithResource
    
    Closes #1948
    
    Co-authored-by: Piotr Sowiński <[email protected]>
---
 .../stream/scaladsl/FlowMapWithResourceSpec.scala  | 27 +++++++++++++++++++++-
 .../org/apache/pekko/stream/scaladsl/Flow.scala    |  2 +-
 2 files changed, 27 insertions(+), 2 deletions(-)

diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowMapWithResourceSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowMapWithResourceSpec.scala
index 56db0ed625..894e372a8a 100644
--- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowMapWithResourceSpec.scala
+++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowMapWithResourceSpec.scala
@@ -33,7 +33,7 @@ import com.google.common.jimfs.{ Configuration, Jimfs }
 
 import org.apache.pekko
 import pekko.Done
-import pekko.stream.{ AbruptTerminationException, ActorAttributes, 
ActorMaterializer, SystemMaterializer }
+import pekko.stream.{ AbruptTerminationException, ActorAttributes, 
ActorMaterializer, Attributes, SystemMaterializer }
 import pekko.stream.ActorAttributes.supervisionStrategy
 import pekko.stream.Supervision.{ restartingDecider, resumingDecider, 
stoppingDecider }
 import pekko.stream.impl.{ PhasedFusingActorMaterializer, StreamSupervisor }
@@ -231,6 +231,31 @@ class FlowMapWithResourceSpec extends 
StreamSpec(UnboundedMailboxConfig) {
       finally p.cancel()
     }
 
+    "allow overriding the default dispatcher" in {
+      val p = Source
+        .single(1)
+        .mapWithResource(() => newBufferedReader())(
+          (reader, _) => Option(reader.readLine()),
+          reader => {
+            reader.close()
+            None
+          })
+        .withAttributes(
+          Attributes.name("mapWithResourceCustomDispatcher") and
+          ActorAttributes.dispatcher("pekko.actor.default-dispatcher")
+        )
+        .runWith(TestSink.probe)
+
+      SystemMaterializer(system).materializer
+        .asInstanceOf[PhasedFusingActorMaterializer]
+        .supervisor
+        .tell(StreamSupervisor.GetChildren, testActor)
+      val ref = expectMsgType[Children].children
+        .find(_.path.toString contains "mapWithResourceCustomDispatcher").get
+      try assertDispatcher(ref, "pekko.actor.default-dispatcher")
+      finally p.cancel()
+    }
+
     "fail when create throws exception" in {
       EventFilter[TE](occurrences = 1).intercept {
         val p = Source
diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
index 9c66727d51..ae8a1f8164 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
@@ -1189,7 +1189,7 @@ trait FlowOps[+Out, +Mat] {
         create,
         (resource, out) => (resource, f(resource, out)),
         resource => close(resource))
-        .withAttributes(DefaultAttributes.mapWithResource))
+    ).withAttributes(DefaultAttributes.mapWithResource)
 
   /**
    * Transform each stream element with the help of an [[AutoCloseable]] 
resource and close it when the stream finishes or fails.


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to