This is an automated email from the ASF dual-hosted git repository.
hepin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to refs/heads/main by this push:
new 5addd96f70 Fix stream IO dispatcher test failures (#2806)
5addd96f70 is described below
commit 5addd96f70c22565af4db28f3c6909a3c67d8890
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Sat Mar 28 19:38:49 2026 +0800
Fix stream IO dispatcher test failures (#2806)
Avoid creating custom materializers in tests that verify dispatcher
assignment. Custom materializers are not covered by StreamSpec's
afterEach cleanup, causing test pollution and failures.
Changes:
- Remove custom materializer creation in FileSinkSpec,
FileSourceSpec, and UnfoldResourceAsyncSourceSpec
- Use SystemMaterializer(system).materializer to access the system
materializer supervisor for dispatcher assertions
- Remove separate ActorSystem creation in FileSourceSpec dispatcher
tests (no longer needed with system materializer)
- Clean up StreamSpec debug dump: replace non-functional stream
supervisor query with printDebugDump helper, add stopAllChildren
on failure for cleanup
Upstream: akka/akka-core@145319d86d
Cherry-picked from akka/akka-core v2.8.0, which is now Apache licensed.
---
.../apache/pekko/stream/testkit/StreamSpec.scala | 42 ++++++----------
.../org/apache/pekko/stream/io/FileSinkSpec.scala | 7 ++-
.../apache/pekko/stream/io/FileSourceSpec.scala | 56 +++++++++-------------
.../scaladsl/UnfoldResourceAsyncSourceSpec.scala | 7 ++-
4 files changed, 43 insertions(+), 69 deletions(-)
diff --git
a/stream-testkit/src/test/scala/org/apache/pekko/stream/testkit/StreamSpec.scala
b/stream-testkit/src/test/scala/org/apache/pekko/stream/testkit/StreamSpec.scala
index 6032e57c8f..43c458f52d 100644
---
a/stream-testkit/src/test/scala/org/apache/pekko/stream/testkit/StreamSpec.scala
+++
b/stream-testkit/src/test/scala/org/apache/pekko/stream/testkit/StreamSpec.scala
@@ -15,17 +15,14 @@ package org.apache.pekko.stream.testkit
import java.util.concurrent.TimeUnit
-import scala.concurrent.Future
-import scala.concurrent.duration._
+import scala.concurrent.duration.FiniteDuration
import org.apache.pekko
-import pekko.actor.{ ActorRef, ActorSystem }
+import pekko.actor.ActorSystem
import pekko.stream.Materializer
import pekko.stream.impl.PhasedFusingActorMaterializer
-import pekko.stream.impl.StreamSupervisor
-import pekko.stream.snapshot.{ MaterializerState, StreamSnapshotImpl }
-import pekko.stream.testkit.scaladsl.StreamTestKit.{ assertNoChildren,
stopAllChildren }
-import pekko.testkit.{ PekkoSpec, TestProbe }
+import pekko.stream.testkit.scaladsl.StreamTestKit.{ assertNoChildren,
printDebugDump, stopAllChildren }
+import pekko.testkit.PekkoSpec
import pekko.testkit.TestKitUtils
import org.scalatest.Failed
@@ -49,31 +46,22 @@ abstract class StreamSpec(_system: ActorSystem) extends
PekkoSpec(_system) with
override def withFixture(test: NoArgTest) = {
super.withFixture(test) match {
case failed: Failed =>
- implicit val ec = system.dispatcher
- val probe = TestProbe()(system)
- // FIXME I don't think it always runs under /user anymore (typed)
- // FIXME correction - I'm not sure this works at _all_ - supposed to
dump stream state if test fails
- val streamSupervisors = system.actorSelection("/user/" +
StreamSupervisor.baseName + "*")
- streamSupervisors.tell(StreamSupervisor.GetChildren, probe.ref)
- val children: Seq[ActorRef] = probe
- .receiveWhile(2.seconds) {
- case StreamSupervisor.Children(children) => children
- }
- .flatten
- println("--- Stream actors debug dump ---")
- if (children.isEmpty) println("Stream is completed. No debug
information is available")
- else {
- println("Stream actors alive: " + children)
- Future
- .sequence(children.map(MaterializerState.requestFromChild))
- .foreach(snapshots =>
- snapshots.foreach(s =>
-
pekko.stream.testkit.scaladsl.StreamTestKit.snapshotString(s.asInstanceOf[StreamSnapshotImpl])))
+ Materializer(_system) match {
+ case impl: PhasedFusingActorMaterializer =>
+ implicit val ec = impl.system.dispatcher
+ println("--- Stream actors debug dump (only works for tests using
system materializer) ---")
+ printDebugDump(impl.supervisor)
+ println("--- Stream actors debug dump end ---")
+ stopAllChildren(impl.system, impl.supervisor)
+ case _ =>
}
failed
case other =>
Materializer(_system) match {
case impl: PhasedFusingActorMaterializer =>
+ // Note that this is different from assertAllStagesStopped since it tries to
+ // *kill* all streams first, before checking if any is stuck. It also does not
+ // work for tests starting their own materializers.
stopAllChildren(impl.system, impl.supervisor)
val result = test.apply()
assertNoChildren(impl.system, impl.supervisor,
diff --git
a/stream-tests/src/test/scala/org/apache/pekko/stream/io/FileSinkSpec.scala
b/stream-tests/src/test/scala/org/apache/pekko/stream/io/FileSinkSpec.scala
index 8dfd06209c..30e208af68 100644
--- a/stream-tests/src/test/scala/org/apache/pekko/stream/io/FileSinkSpec.scala
+++ b/stream-tests/src/test/scala/org/apache/pekko/stream/io/FileSinkSpec.scala
@@ -32,14 +32,13 @@ import pekko.stream.scaladsl.{ FileIO, Keep, Source }
import pekko.stream.testkit._
import pekko.stream.testkit.Utils._
import pekko.util.ByteString
+import pekko.stream.SystemMaterializer
import org.scalatest.concurrent.ScalaFutures
@nowarn
class FileSinkSpec extends StreamSpec(UnboundedMailboxConfig) with
ScalaFutures {
- val settings =
ActorMaterializerSettings(system).withDispatcher("pekko.actor.default-dispatcher")
- implicit val materializer: Materializer = ActorMaterializer(settings)
val fs = Jimfs.newFileSystem("FileSinkSpec", Configuration.unix())
val TestLines = {
@@ -171,7 +170,7 @@ class FileSinkSpec extends
StreamSpec(UnboundedMailboxConfig) with ScalaFutures
targetFile { f =>
val forever = Source.maybe.toMat(FileIO.toPath(f))(Keep.left).run()
try {
- materializer
+ SystemMaterializer(system).materializer
.asInstanceOf[PhasedFusingActorMaterializer]
.supervisor
.tell(StreamSupervisor.GetChildren, testActor)
@@ -195,7 +194,7 @@ class FileSinkSpec extends
StreamSpec(UnboundedMailboxConfig) with ScalaFutures
Keep.left)
.run()
try {
- materializer
+ SystemMaterializer(system).materializer
.asInstanceOf[PhasedFusingActorMaterializer]
.supervisor
.tell(StreamSupervisor.GetChildren, testActor)
diff --git
a/stream-tests/src/test/scala/org/apache/pekko/stream/io/FileSourceSpec.scala
b/stream-tests/src/test/scala/org/apache/pekko/stream/io/FileSourceSpec.scala
index 08724f9b0a..8f6a417568 100644
---
a/stream-tests/src/test/scala/org/apache/pekko/stream/io/FileSourceSpec.scala
+++
b/stream-tests/src/test/scala/org/apache/pekko/stream/io/FileSourceSpec.scala
@@ -24,7 +24,6 @@ import scala.concurrent.duration._
import com.google.common.jimfs.{ Configuration, Jimfs }
import org.apache.pekko
-import pekko.actor.ActorSystem
import pekko.stream._
import pekko.stream.IOResult._
import pekko.stream.impl.{ PhasedFusingActorMaterializer, StreamSupervisor }
@@ -35,6 +34,7 @@ import pekko.stream.testkit._
import pekko.stream.testkit.Utils._
import pekko.stream.testkit.scaladsl.TestSink
import pekko.util.ByteString
+import pekko.stream.SystemMaterializer
object FileSourceSpec {
final case class Settings(chunkSize: Int, readAhead: Int)
@@ -43,9 +43,6 @@ object FileSourceSpec {
@nowarn
class FileSourceSpec extends StreamSpec(UnboundedMailboxConfig) {
- val settings =
ActorMaterializerSettings(system).withDispatcher("pekko.actor.default-dispatcher")
- implicit val materializer: Materializer = ActorMaterializer(settings)
-
val fs = Jimfs.newFileSystem("FileSourceSpec", Configuration.unix())
val TestText = {
@@ -261,39 +258,30 @@ class FileSourceSpec extends
StreamSpec(UnboundedMailboxConfig) {
}
"use dedicated blocking-io-dispatcher by default" in {
- val sys = ActorSystem("dispatcher-testing", UnboundedMailboxConfig)
- val materializer = ActorMaterializer()(sys)
- try {
- val p = FileIO.fromPath(manyLines).runWith(TestSink())(materializer)
-
- materializer
- .asInstanceOf[PhasedFusingActorMaterializer]
- .supervisor
- .tell(StreamSupervisor.GetChildren, testActor)
- val ref = expectMsgType[Children].children.find(_.path.toString
contains "fileSource").get
- try assertDispatcher(ref, ActorAttributes.IODispatcher.dispatcher)
- finally p.cancel()
- } finally shutdown(sys)
+ val p = FileIO.fromPath(manyLines).runWith(TestSink())
+
+ SystemMaterializer(system).materializer
+ .asInstanceOf[PhasedFusingActorMaterializer]
+ .supervisor
+ .tell(StreamSupervisor.GetChildren, testActor)
+ val ref = expectMsgType[Children].children.find(_.path.toString contains
"fileSource").get
+ try assertDispatcher(ref, ActorAttributes.IODispatcher.dispatcher)
+ finally p.cancel()
}
"allow overriding the dispatcher using Attributes" in {
- val sys = ActorSystem("dispatcher-testing", UnboundedMailboxConfig)
- val materializer = ActorMaterializer()(sys)
-
- try {
- val p = FileIO
- .fromPath(manyLines)
-
.addAttributes(ActorAttributes.dispatcher("pekko.actor.default-dispatcher"))
- .runWith(TestSink())(materializer)
-
- materializer
- .asInstanceOf[PhasedFusingActorMaterializer]
- .supervisor
- .tell(StreamSupervisor.GetChildren, testActor)
- val ref = expectMsgType[Children].children.find(_.path.toString
contains "fileSource").get
- try assertDispatcher(ref, "pekko.actor.default-dispatcher")
- finally p.cancel()
- } finally shutdown(sys)
+ val p = FileIO
+ .fromPath(manyLines)
+
.addAttributes(ActorAttributes.dispatcher("pekko.actor.default-dispatcher"))
+ .runWith(TestSink())
+
+ SystemMaterializer(system).materializer
+ .asInstanceOf[PhasedFusingActorMaterializer]
+ .supervisor
+ .tell(StreamSupervisor.GetChildren, testActor)
+ val ref = expectMsgType[Children].children.find(_.path.toString contains
"fileSource").get
+ try assertDispatcher(ref, "pekko.actor.default-dispatcher")
+ finally p.cancel()
}
"not signal onComplete more than once" in {
diff --git
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/UnfoldResourceAsyncSourceSpec.scala
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/UnfoldResourceAsyncSourceSpec.scala
index ac261e039c..07a42be8d8 100644
---
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/UnfoldResourceAsyncSourceSpec.scala
+++
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/UnfoldResourceAsyncSourceSpec.scala
@@ -26,6 +26,7 @@ import pekko.Done
import pekko.stream.ActorAttributes
import pekko.stream.Materializer
import pekko.stream.Supervision
+import pekko.stream.SystemMaterializer
import pekko.stream.impl.PhasedFusingActorMaterializer
import pekko.stream.impl.StreamSupervisor
import pekko.stream.impl.StreamSupervisor.Children
@@ -325,9 +326,6 @@ class UnfoldResourceAsyncSourceSpec extends
StreamSpec(UnboundedMailboxConfig) {
}
"use dedicated blocking-io-dispatcher by default" in {
- // use a separate materializer to ensure we know what child is our stream
- implicit val materializer = Materializer(system)
-
Source
.unfoldResourceAsync[String, Unit](
() => Promise[Unit]().future, // never complete
@@ -335,7 +333,8 @@ class UnfoldResourceAsyncSourceSpec extends
StreamSpec(UnboundedMailboxConfig) {
_ => ???)
.runWith(Sink.ignore)
-
materializer.asInstanceOf[PhasedFusingActorMaterializer].supervisor.tell(StreamSupervisor.GetChildren,
testActor)
+
SystemMaterializer(system).materializer.asInstanceOf[PhasedFusingActorMaterializer].supervisor.tell(
+ StreamSupervisor.GetChildren, testActor)
val ref = expectMsgType[Children].children.find(_.path.toString contains
"unfoldResourceSourceAsync").get
assertDispatcher(ref, ActorAttributes.IODispatcher.dispatcher)
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]