This is an automated email from the ASF dual-hosted git repository. He-Pin pushed a commit to branch fix/jdk25-alsoto-async-race in repository https://gitbox.apache.org/repos/asf/pekko.git
commit e4b4d749695e00fd88d1984c33c562cb69110630 Author: He-Pin <[email protected]> AuthorDate: Fri May 29 16:51:51 2026 +0800 test: await async alsoTo/wireTap side sink in context specs Motivation: SourceWithContextSpec and FlowWithContextSpec fail on JDK 25 nightly with a real assertion error (not a timeout), e.g.: Vector(Message(A,1), Message(B,2), Message(D,3)) was not equal to Vector(Message(A,1), Message(B,2), Message(D,3), Message(C,4)) (SourceWithContextSpec.scala:94) *** FAILED *** (15 milliseconds) `alsoTo`/`alsoToContext`/`wireTap`/`wireTapContext` feed an asynchronous side Sink. The tests collected the side stream output into a buffer and asserted on it immediately after the main stream's `expectComplete()`. There is no happens-before between the main stream completing and the side Sink draining its last element, so the assertion can run before the side Sink has observed every element. The enclosing `within(10.seconds) { ... }` only bounds how long the block may take; it does not retry, so the buffer was read exactly once. On JDK 17 the side Sink usually drained first by luck; JDK 25's different scheduling surfaces the race as a hard failure in ~15ms. The shared `scala.collection.mutable.ListBuffer` was also written from the side-stream dispatcher thread and read from the test thread without synchronization, a second latent data race. Modification: - Collect side-stream output into a `java.util.concurrent.ConcurrentLinkedQueue` (thread-safe, preserves arrival order). - Replace the single `within(10.seconds) { assert }` with `awaitAssert(assert, 10.seconds)`, which polls until the async side Sink has caught up (or the deadline elapses). Result: The assertions now wait for the asynchronous side Sink to finish instead of sampling it once, removing both the timing race and the cross-thread buffer race. 
Both specs (24 tests) pass locally; the checks still fail fast if the side Sink genuinely drops data, since awaitAssert rethrows the last assertion error at the deadline. --- .../stream/scaladsl/FlowWithContextSpec.scala | 39 ++++++++++------------ .../stream/scaladsl/SourceWithContextSpec.scala | 39 ++++++++++------------ 2 files changed, 36 insertions(+), 42 deletions(-) diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowWithContextSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowWithContextSpec.scala index da9e22f9b9..fd27c27238 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowWithContextSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowWithContextSpec.scala @@ -13,8 +13,10 @@ package org.apache.pekko.stream.scaladsl -import scala.collection.mutable.ListBuffer +import java.util.concurrent.ConcurrentLinkedQueue + import scala.concurrent.duration._ +import scala.jdk.CollectionConverters._ import scala.util.control.NoStackTrace import org.apache.pekko @@ -80,14 +82,17 @@ class FlowWithContextSpec extends StreamSpec { } "pass through all data when using alsoTo" in { - val listBuffer = new ListBuffer[String]() + // alsoTo feeds an asynchronous side Sink, which may still be draining when the + // main stream completes. Poll until it has observed every element instead of + // asserting once (the single assertion raced under JDK 25 scheduling). 
+ val received = new ConcurrentLinkedQueue[String]() Source(Vector(Message("A", 1L), Message("B", 2L), Message("D", 3L), Message("C", 4L))) .asSourceWithContext(_.offset) .via( FlowWithContext.fromTuples(Flow.fromFunction[(Message, Long), (String, Long)] { case (data, offset) => (data.data.toLowerCase, offset) }) - .alsoTo(Sink.foreach(string => listBuffer.+=(string))) + .alsoTo(Sink.foreach(string => received.add(string))) ) .toMat(TestSink[(String, Long)]())(Keep.right) .run() @@ -97,20 +102,18 @@ class FlowWithContextSpec extends StreamSpec { .expectNext(("d", 3L)) .expectNext(("c", 4L)) .expectComplete() - .within(10.seconds) { - listBuffer should contain theSameElementsInOrderAs List("a", "b", "d", "c") - } + awaitAssert(received.asScala.toList should contain theSameElementsInOrderAs List("a", "b", "d", "c"), 10.seconds) } "pass through all data when using alsoToContext" in { - val listBuffer = new ListBuffer[Long]() + val received = new ConcurrentLinkedQueue[Long]() Source(Vector(Message("A", 1L), Message("B", 2L), Message("D", 3L), Message("C", 4L))) .asSourceWithContext(_.offset) .via( FlowWithContext.fromTuples(Flow.fromFunction[(Message, Long), (String, Long)] { case (data, offset) => (data.data.toLowerCase, offset) }) - .alsoToContext(Sink.foreach(offset => listBuffer.+=(offset))) + .alsoToContext(Sink.foreach(offset => received.add(offset))) ) .toMat(TestSink[(String, Long)]())(Keep.right) .run() @@ -120,19 +123,17 @@ class FlowWithContextSpec extends StreamSpec { .expectNext(("d", 3L)) .expectNext(("c", 4L)) .expectComplete() - .within(10.seconds) { - listBuffer should contain theSameElementsInOrderAs List(1L, 2L, 3L, 4L) - } + awaitAssert(received.asScala.toList should contain theSameElementsInOrderAs List(1L, 2L, 3L, 4L), 10.seconds) } "pass through all data when using wireTap" in { - val listBuffer = new ListBuffer[String]() + val received = new ConcurrentLinkedQueue[String]() Source(Vector(Message("A", 1L), Message("B", 2L), Message("D", 3L), 
Message("C", 4L))) .asSourceWithContext(_.offset) .via( FlowWithContext.fromTuples(Flow.fromFunction[(Message, Long), (String, Long)] { case (data, offset) => (data.data.toLowerCase, offset) - }).wireTap(Sink.foreach(string => listBuffer.+=(string))) + }).wireTap(Sink.foreach(string => received.add(string))) ) .toMat(TestSink[(String, Long)]())(Keep.right) .run() @@ -142,19 +143,17 @@ class FlowWithContextSpec extends StreamSpec { .expectNext(("d", 3L)) .expectNext(("c", 4L)) .expectComplete() - .within(10.seconds) { - listBuffer should contain atLeastOneElementOf List("a", "b", "d", "c") - } + awaitAssert(received.asScala.toList should contain atLeastOneElementOf List("a", "b", "d", "c"), 10.seconds) } "pass through all data when using wireTapContext" in { - val listBuffer = new ListBuffer[Long]() + val received = new ConcurrentLinkedQueue[Long]() Source(Vector(Message("A", 1L), Message("B", 2L), Message("D", 3L), Message("C", 4L))) .asSourceWithContext(_.offset) .via( FlowWithContext.fromTuples(Flow.fromFunction[(Message, Long), (String, Long)] { case (data, offset) => (data.data.toLowerCase, offset) - }).wireTapContext(Sink.foreach(offset => listBuffer.+=(offset))) + }).wireTapContext(Sink.foreach(offset => received.add(offset))) ) .toMat(TestSink[(String, Long)]())(Keep.right) .run() @@ -164,9 +163,7 @@ class FlowWithContextSpec extends StreamSpec { .expectNext(("d", 3L)) .expectNext(("c", 4L)) .expectComplete() - .within(10.seconds) { - listBuffer should contain atLeastOneElementOf List(1L, 2L, 3L, 4L) - } + awaitAssert(received.asScala.toList should contain atLeastOneElementOf List(1L, 2L, 3L, 4L), 10.seconds) } "keep the same order for data and context when using unsafeDataVia" in { diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceWithContextSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceWithContextSpec.scala index ff578d4629..02b9c9e535 100644 --- 
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceWithContextSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceWithContextSpec.scala @@ -13,8 +13,10 @@ package org.apache.pekko.stream.scaladsl -import scala.collection.mutable.ListBuffer +import java.util.concurrent.ConcurrentLinkedQueue + import scala.concurrent.duration._ +import scala.jdk.CollectionConverters._ import scala.util.control.NoStackTrace import org.apache.pekko @@ -77,11 +79,14 @@ class SourceWithContextSpec extends StreamSpec { } "pass through all data when using alsoTo" in { - val listBuffer = new ListBuffer[Message]() + // alsoTo feeds an asynchronous side Sink, which may still be draining when the + // main stream completes. Poll until it has observed every element instead of + // asserting once (the single assertion raced under JDK 25 scheduling). + val received = new ConcurrentLinkedQueue[Message]() val messages = Vector(Message("A", 1L), Message("B", 2L), Message("D", 3L), Message("C", 4L)) Source(messages) .asSourceWithContext(_.offset) - .alsoTo(Sink.foreach(message => listBuffer.+=(message))) + .alsoTo(Sink.foreach(message => received.add(message))) .toMat(TestSink[(Message, Long)]())(Keep.right) .run() .request(4) @@ -90,17 +95,15 @@ class SourceWithContextSpec extends StreamSpec { .expectNext((Message("D", 3L), 3L)) .expectNext((Message("C", 4L), 4L)) .expectComplete() - .within(10.seconds) { - listBuffer.toVector shouldBe messages - } + awaitAssert(received.asScala.toVector shouldBe messages, 10.seconds) } "pass through all data when using alsoToContext" in { - val listBuffer = new ListBuffer[Long]() + val received = new ConcurrentLinkedQueue[Long]() val messages = Vector(Message("A", 1L), Message("B", 2L), Message("D", 3L), Message("C", 4L)) Source(messages) .asSourceWithContext(_.offset) - .alsoToContext(Sink.foreach(offset => listBuffer.+=(offset))) + .alsoToContext(Sink.foreach(offset => received.add(offset))) 
.toMat(TestSink[(Message, Long)]())(Keep.right) .run() .request(4) @@ -109,17 +112,15 @@ class SourceWithContextSpec extends StreamSpec { .expectNext((Message("D", 3L), 3L)) .expectNext((Message("C", 4L), 4L)) .expectComplete() - .within(10.seconds) { - listBuffer.toVector shouldBe messages.map(_.offset) - } + awaitAssert(received.asScala.toVector shouldBe messages.map(_.offset), 10.seconds) } "pass through all data when using wireTap" in { - val listBuffer = new ListBuffer[Message]() + val received = new ConcurrentLinkedQueue[Message]() val messages = Vector(Message("A", 1L), Message("B", 2L), Message("D", 3L), Message("C", 4L)) Source(messages) .asSourceWithContext(_.offset) - .wireTap(Sink.foreach(message => listBuffer.+=(message))) + .wireTap(Sink.foreach(message => received.add(message))) .toMat(TestSink[(Message, Long)]())(Keep.right) .run() .request(4) @@ -128,17 +129,15 @@ class SourceWithContextSpec extends StreamSpec { .expectNext((Message("D", 3L), 3L)) .expectNext((Message("C", 4L), 4L)) .expectComplete() - .within(10.seconds) { - listBuffer.toVector should contain atLeastOneElementOf messages - } + awaitAssert(received.asScala.toVector should contain atLeastOneElementOf messages, 10.seconds) } "pass through all data when using wireTapContext" in { - val listBuffer = new ListBuffer[Long]() + val received = new ConcurrentLinkedQueue[Long]() val messages = Vector(Message("A", 1L), Message("B", 2L), Message("D", 3L), Message("C", 4L)) Source(messages) .asSourceWithContext(_.offset) - .wireTapContext(Sink.foreach(offset => listBuffer.+=(offset))) + .wireTapContext(Sink.foreach(offset => received.add(offset))) .toMat(TestSink[(Message, Long)]())(Keep.right) .run() .request(4) @@ -147,9 +146,7 @@ class SourceWithContextSpec extends StreamSpec { .expectNext((Message("D", 3L), 3L)) .expectNext((Message("C", 4L), 4L)) .expectComplete() - .within(10.seconds) { - listBuffer.toVector should contain atLeastOneElementOf (messages.map(_.offset)) - } + 
awaitAssert(received.asScala.toVector should contain atLeastOneElementOf (messages.map(_.offset)), 10.seconds) } "pass through contexts via a FlowWithContext" in { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
