This is an automated email from the ASF dual-hosted git repository.
He-Pin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to refs/heads/main by this push:
new fcc541a71f test: clean up TLS edge streams after early cancellation (#3000)
fcc541a71f is described below
commit fcc541a71f90338b9c2f17df0b691913f0955831
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Thu May 28 19:15:57 2026 +0800
test: clean up TLS edge streams after early cancellation (#3000)
Motivation:
JDK 25 nightly builds time out in repeated TlsGraphStageEdgeCasesSpec
early-cancellation scenarios because earlier materializations can keep draining
after the expected bytes have been collected.
Modification:
Materialize collectExactly with a KillSwitch and watch stream termination,
then shut down and await the stream in finally after the expected bytes are
collected.
Result:
Repeated TLS edge-case checks do not leave prior materializations running
in the same actor system.
Tests:
- JDK 25 nightly-style virtualized stream-dispatcher flags: stream-tests /
Test / testOnly org.apache.pekko.stream.io.TlsGraphStageEdgeCasesSpec
- scalafmt --mode diff-ref=origin/main --quiet
- scalafmt --list --mode diff-ref=origin/main
- git diff --check
References:
Refs #2994
---
.../stream/io/TlsGraphStageEdgeCasesSpec.scala | 26 +++++++++++++++-------
1 file changed, 18 insertions(+), 8 deletions(-)
diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/io/TlsGraphStageEdgeCasesSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/io/TlsGraphStageEdgeCasesSpec.scala
index 2e9e3216bb..718867a9b8 100644
--- a/stream-tests/src/test/scala/org/apache/pekko/stream/io/TlsGraphStageEdgeCasesSpec.scala
+++ b/stream-tests/src/test/scala/org/apache/pekko/stream/io/TlsGraphStageEdgeCasesSpec.scala
@@ -68,14 +68,24 @@ class TlsGraphStageEdgeCasesSpec extends StreamSpec(TlsGraphStageEdgeCasesSpec.c
private def collectExactly(
stream: Source[SslTlsInbound, NotUsed],
expectedBytes: Int,
- timeout: FiniteDuration = 30.seconds): ByteString =
- Await.result(
- stream
- .collect { case SessionBytes(_, b) => b }
- .scan(ByteString.empty)(_ ++ _)
- .dropWhile(_.size < expectedBytes)
- .runWith(Sink.headOption),
- timeout.dilated).getOrElse(ByteString.empty)
+ timeout: FiniteDuration = 30.seconds): ByteString = {
+ val ((killSwitch, streamDone), result) = stream
+ .viaMat(KillSwitches.single)(Keep.right)
+ .watchTermination(Keep.both)
+ .toMat(
+ Flow[SslTlsInbound]
+ .collect { case SessionBytes(_, b) => b }
+ .scan(ByteString.empty)(_ ++ _)
+ .dropWhile(_.size < expectedBytes)
+ .toMat(Sink.headOption)(Keep.right))(Keep.both)
+ .run()
+
+ try Await.result(result, timeout.dilated).getOrElse(ByteString.empty)
+ finally {
+ killSwitch.shutdown()
+ Await.result(streamDone, timeout.dilated)
+ }
+ }
"TlsGraphStage" must {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]