This is an automated email from the ASF dual-hosted git repository.
hepin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to refs/heads/main by this push:
new 722b195e4d Fix Java unchecked warnings in stream testkit fluent API
(#2625) (#2725)
722b195e4d is described below
commit 722b195e4d629936a21a79c9b088a593edcab8b2
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Sun Mar 15 22:40:26 2026 +0800
Fix Java unchecked warnings in stream testkit fluent API (#2625) (#2725)
Use Scala's `this.type` return type instead of abstract type member `Self`
to fix raw type erasure in bytecode signatures. This is a root-cause fix
that eliminates unchecked warnings in Java without any boilerplate
overrides.
Root cause: the abstract type member `type Self <: ManualProbe[I]` is erased
to raw `ManualProbe` in bytecode (no generic Signature attribute is emitted),
so Java callers see raw types.
`this.type` makes the Scala compiler emit `ManualProbe<TI;>` (parameterized)
in the bytecode Signature attribute, so Java sees proper generic types.
Changes:
- Remove `type Self` abstract type members from ManualProbe/Probe classes
- Change all fluent method return types from `Self` to `this.type`
- Replace `self` references with `this`
- Fix ScalaDoc links: TestSubscriber → TestSubscriber.Probe, TestPublisher
→ TestPublisher.Probe
- Fix typo: "JAVA PAI" → "JAVA API"
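Binary compatible: erased JVM return types are unchanged; only the generic
Signature attribute metadata changes. MiMa still reports that metadata change
as IncompatibleSignatureProblem, so the added excludes file filters it.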
---
.../fix-java-unchecked-warnings.excludes | 46 +++++++
.../pekko/stream/testkit/StreamTestKit.scala | 144 ++++++++++-----------
.../pekko/stream/testkit/javadsl/TestSink.scala | 2 +-
.../pekko/stream/testkit/javadsl/TestSource.scala | 2 +-
4 files changed, 119 insertions(+), 75 deletions(-)
diff --git
a/stream-testkit/src/main/mima-filters/2.0.x.backwards.excludes/fix-java-unchecked-warnings.excludes
b/stream-testkit/src/main/mima-filters/2.0.x.backwards.excludes/fix-java-unchecked-warnings.excludes
new file mode 100644
index 0000000000..8d97d5bb65
--- /dev/null
+++
b/stream-testkit/src/main/mima-filters/2.0.x.backwards.excludes/fix-java-unchecked-warnings.excludes
@@ -0,0 +1,46 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Fix Java unchecked warnings by using this.type return types (#2625)
+# Changed fluent API methods from abstract type member Self to this.type,
+# which adds parameterized generic Signature attributes to bytecode.
+# Only the generic Signature metadata changes — erased JVM types are unchanged.
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.stream.testkit.TestPublisher#ManualProbe.expectNoMessage")
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.stream.testkit.TestPublisher#ManualProbe.expectRequest")
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.stream.testkit.TestSubscriber#ManualProbe.expectComplete")
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.stream.testkit.TestSubscriber#ManualProbe.expectError")
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.stream.testkit.TestSubscriber#ManualProbe.expectEvent")
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.stream.testkit.TestSubscriber#ManualProbe.expectNext")
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.stream.testkit.TestSubscriber#ManualProbe.expectNextChainingPF")
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.stream.testkit.TestSubscriber#ManualProbe.expectNextN")
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.stream.testkit.TestSubscriber#ManualProbe.expectNextOrComplete")
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.stream.testkit.TestSubscriber#ManualProbe.expectNextUnordered")
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.stream.testkit.TestSubscriber#ManualProbe.expectNextUnorderedN")
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.stream.testkit.TestSubscriber#ManualProbe.expectNoMessage")
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.stream.testkit.TestSubscriber#ManualProbe.expectSubscriptionAndComplete")
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.stream.testkit.TestSubscriber#ManualProbe.expectSubscriptionAndError")
+# Probe subclass methods — Scala 3 cross-build also changes these signatures
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.stream.testkit.TestPublisher#Probe.sendNext")
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.stream.testkit.TestPublisher#Probe.unsafeSendNext")
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.stream.testkit.TestPublisher#Probe.sendComplete")
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.stream.testkit.TestPublisher#Probe.sendError")
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.stream.testkit.TestPublisher#Probe.expectCancellation")
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.stream.testkit.TestPublisher#Probe.expectCancellationWithCause")
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.stream.testkit.TestSubscriber#Probe.ensureSubscription")
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.stream.testkit.TestSubscriber#Probe.request")
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.stream.testkit.TestSubscriber#Probe.requestNext")
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.stream.testkit.TestSubscriber#Probe.cancel")
diff --git
a/stream-testkit/src/main/scala/org/apache/pekko/stream/testkit/StreamTestKit.scala
b/stream-testkit/src/main/scala/org/apache/pekko/stream/testkit/StreamTestKit.scala
index 36abb43fc8..ebc90af57d 100644
---
a/stream-testkit/src/main/scala/org/apache/pekko/stream/testkit/StreamTestKit.scala
+++
b/stream-testkit/src/main/scala/org/apache/pekko/stream/testkit/StreamTestKit.scala
@@ -117,8 +117,6 @@ object TestPublisher {
class ManualProbe[I] private[TestPublisher] (autoOnSubscribe: Boolean =
true)(implicit system: ActorSystem)
extends Publisher[I] {
- type Self <: ManualProbe[I]
-
private val probe: TestProbe = TestProbe()
// this is a way to pause receiving message from probe until subscription
is done
@@ -130,7 +128,6 @@ object TestPublisher {
this
}
})
- private val self = this.asInstanceOf[Self]
/**
* Subscribes a given [[org.reactivestreams.Subscriber]] to this probe
publisher.
@@ -168,26 +165,32 @@ object TestPublisher {
/**
* Expect demand from a given subscription.
*/
- def expectRequest(subscription: Subscription, n: Int): Self =
executeAfterSubscription {
- probe.expectMsg(RequestMore(subscription, n))
- self
+ def expectRequest(subscription: Subscription, n: Int): this.type = {
+ executeAfterSubscription {
+ probe.expectMsg(RequestMore(subscription, n))
+ }
+ this
}
/**
* Expect no messages.
* Waits for the default period configured as
`pekko.actor.testkit.expect-no-message-default`.
*/
- def expectNoMessage(): Self = executeAfterSubscription {
- probe.expectNoMessage()
- self
+ def expectNoMessage(): this.type = {
+ executeAfterSubscription {
+ probe.expectNoMessage()
+ }
+ this
}
/**
* Expect no messages for a given duration.
*/
- def expectNoMessage(max: FiniteDuration): Self = executeAfterSubscription {
- probe.expectNoMessage(max)
- self
+ def expectNoMessage(max: FiniteDuration): this.type = {
+ executeAfterSubscription {
+ probe.expectNoMessage(max)
+ }
+ this
}
/**
@@ -196,7 +199,7 @@ object TestPublisher {
* Expect no messages for a given duration.
* @since 1.1.0
*/
- def expectNoMessage(max: java.time.Duration): Self =
expectNoMessage(max.toScala)
+ def expectNoMessage(max: java.time.Duration): this.type =
expectNoMessage(max.toScala)
/**
* Receive messages for a given duration or until one does not match a
given partial function.
@@ -307,8 +310,6 @@ object TestPublisher {
class Probe[T] private[TestPublisher] (initialPendingRequests:
Long)(implicit system: ActorSystem)
extends ManualProbe[T] {
- type Self = Probe[T]
-
private var pendingRequests = initialPendingRequests
private lazy val subscription = expectSubscription()
@@ -320,24 +321,24 @@ object TestPublisher {
*/
def pending: Long = pendingRequests
- def sendNext(elem: T): Self = {
+ def sendNext(elem: T): this.type = {
if (pendingRequests == 0) pendingRequests = subscription.expectRequest()
pendingRequests -= 1
subscription.sendNext(elem)
this
}
- def unsafeSendNext(elem: T): Self = {
+ def unsafeSendNext(elem: T): this.type = {
subscription.sendNext(elem)
this
}
- def sendComplete(): Self = {
+ def sendComplete(): this.type = {
subscription.sendComplete()
this
}
- def sendError(cause: Throwable): Self = {
+ def sendError(cause: Throwable): this.type = {
subscription.sendError(cause)
this
}
@@ -348,12 +349,12 @@ object TestPublisher {
requests
}
- def expectCancellation(): Self = {
+ def expectCancellation(): this.type = {
subscription.expectCancellation()
this
}
- def expectCancellationWithCause(expectedCause: Throwable): Self = {
+ def expectCancellationWithCause(expectedCause: Throwable): this.type = {
val cause = subscription.expectCancellation()
assert(cause == expectedCause, s"Expected cancellation cause to be
$expectedCause but was $cause")
this
@@ -371,6 +372,7 @@ object TestPublisher {
*/
def expectCancellationWithCause[E <: Throwable](causeClass: Class[E]): E =
expectCancellationWithCause()(ClassTag(causeClass))
+
}
}
@@ -419,14 +421,10 @@ object TestSubscriber {
class ManualProbe[I] private[TestSubscriber] ()(implicit system:
ActorSystem) extends Subscriber[I] {
import pekko.testkit._
- type Self <: ManualProbe[I]
-
private val probe = TestProbe()
@volatile private var _subscription: Subscription = _
- private val self = this.asInstanceOf[Self]
-
/**
* Expect and return a [[org.reactivestreams.Subscription]].
*/
@@ -460,9 +458,9 @@ object TestSubscriber {
*
* Expect [[SubscriberEvent]] (any of: `OnSubscribe`, `OnNext`, `OnError`
or `OnComplete`).
*/
- def expectEvent(event: SubscriberEvent): Self = {
+ def expectEvent(event: SubscriberEvent): this.type = {
probe.expectMsg(event)
- self
+ this
}
/**
@@ -497,9 +495,9 @@ object TestSubscriber {
*
* Expect a stream element.
*/
- def expectNext(element: I): Self = {
+ def expectNext(element: I): this.type = {
probe.expectMsg(OnNext(element))
- self
+ this
}
/**
@@ -507,20 +505,20 @@ object TestSubscriber {
*
* Expect a stream element during specified time or timeout.
*/
- def expectNext(d: FiniteDuration, element: I): Self = {
+ def expectNext(d: FiniteDuration, element: I): this.type = {
probe.expectMsg(d, OnNext(element))
- self
+ this
}
/**
- * JAVA PAI
+ * JAVA API
*
* Fluent DSL
*
* Expect a stream element during specified time or timeout.
* @since 1.1.0
*/
- def expectNext(d: java.time.Duration, element: I): Self =
expectNext(d.toScala, element)
+ def expectNext(d: java.time.Duration, element: I): this.type =
expectNext(d.toScala, element)
/**
* Fluent DSL
@@ -528,7 +526,7 @@ object TestSubscriber {
* Expect multiple stream elements.
*/
@annotation.varargs
- def expectNext(e1: I, e2: I, es: I*): Self =
+ def expectNext(e1: I, e2: I, es: I*): this.type =
expectNextN((e1 +: e2 +:
es).iterator.map(identity).to(immutable.IndexedSeq))
/**
@@ -537,7 +535,7 @@ object TestSubscriber {
* Expect multiple stream elements in arbitrary order.
*/
@annotation.varargs
- def expectNextUnordered(e1: I, e2: I, es: I*): Self =
+ def expectNextUnordered(e1: I, e2: I, es: I*): this.type =
expectNextUnorderedN((e1 +: e2 +:
es).iterator.map(identity).to(immutable.IndexedSeq))
/**
@@ -559,9 +557,9 @@ object TestSubscriber {
* Fluent DSL
* Expect the given elements to be signalled in order.
*/
- def expectNextN(all: immutable.Seq[I]): Self = {
+ def expectNextN(all: immutable.Seq[I]): this.type = {
all.foreach(e => probe.expectMsg(OnNext(e)))
- self
+ this
}
/**
@@ -569,16 +567,16 @@ object TestSubscriber {
* Expect the given elements to be signalled in order.
* @since 1.1.0
*/
- def expectNextN(elems: java.util.List[I]): Self = {
+ def expectNextN(elems: java.util.List[I]): this.type = {
elems.forEach(e => probe.expectMsg(OnNext(e)))
- self
+ this
}
/**
* Fluent DSL
* Expect the given elements to be signalled in any order.
*/
- def expectNextUnorderedN(all: immutable.Seq[I]): Self = {
+ def expectNextUnorderedN(all: immutable.Seq[I]): this.type = {
@annotation.tailrec
def expectOneOf(all: immutable.Seq[I]): Unit = all match {
case Nil =>
@@ -589,7 +587,7 @@ object TestSubscriber {
}
expectOneOf(all)
- self
+ this
}
/**
@@ -599,16 +597,16 @@ object TestSubscriber {
* Expect the given elements to be signalled in any order.
* @since 1.1.0
*/
- def expectNextUnorderedN(all: java.util.List[I]): Self =
expectNextUnorderedN(Util.immutableSeq(all))
+ def expectNextUnorderedN(all: java.util.List[I]): this.type =
expectNextUnorderedN(Util.immutableSeq(all))
/**
* Fluent DSL
*
* Expect completion.
*/
- def expectComplete(): Self = {
+ def expectComplete(): this.type = {
probe.expectMsg(OnComplete)
- self
+ this
}
/**
@@ -621,9 +619,9 @@ object TestSubscriber {
*
* Expect given [[java.lang.Throwable]].
*/
- def expectError(cause: Throwable): Self = {
+ def expectError(cause: Throwable): this.type = {
probe.expectMsg(OnError(cause))
- self
+ this
}
/**
@@ -660,7 +658,7 @@ object TestSubscriber {
*
* See also
[[#expectSubscriptionAndError(cause:Throwable,signalDemand:Boolean)*
#expectSubscriptionAndError(cause: Throwable, signalDemand: Boolean)]] if no
demand should be signalled.
*/
- def expectSubscriptionAndError(cause: Throwable): Self =
+ def expectSubscriptionAndError(cause: Throwable): this.type =
expectSubscriptionAndError(cause, signalDemand = true)
/**
@@ -671,11 +669,11 @@ object TestSubscriber {
*
* See also [[#expectSubscriptionAndError(cause:Throwable)*
#expectSubscriptionAndError(cause: Throwable)]].
*/
- def expectSubscriptionAndError(cause: Throwable, signalDemand: Boolean):
Self = {
+ def expectSubscriptionAndError(cause: Throwable, signalDemand: Boolean):
this.type = {
val sub = expectSubscription()
if (signalDemand) sub.request(1)
expectError(cause)
- self
+ this
}
/**
@@ -686,7 +684,7 @@ object TestSubscriber {
*
* See also [[#expectSubscriptionAndComplete(signalDemand:Boolean)*
#expectSubscriptionAndComplete(signalDemand: Boolean)]] if no demand should be
signalled.
*/
- def expectSubscriptionAndComplete(): Self =
+ def expectSubscriptionAndComplete(): this.type =
expectSubscriptionAndComplete(true)
/**
@@ -699,11 +697,11 @@ object TestSubscriber {
*
* See also [[#expectSubscriptionAndComplete()*
#expectSubscriptionAndComplete]].
*/
- def expectSubscriptionAndComplete(signalDemand: Boolean): Self = {
+ def expectSubscriptionAndComplete(signalDemand: Boolean): this.type = {
val sub = expectSubscription()
if (signalDemand) sub.request(1)
expectComplete()
- self
+ this
}
/**
@@ -756,12 +754,12 @@ object TestSubscriber {
*
* Expect given next element or stream completion.
*/
- def expectNextOrComplete(element: I): Self = {
+ def expectNextOrComplete(element: I): this.type = {
probe.fishForMessage(hint = s"OnNext($element) or OnComplete") {
case OnNext(`element`) => true
case OnComplete => true
}
- self
+ this
}
/**
@@ -769,9 +767,9 @@ object TestSubscriber {
*
* Assert that no message is received for the specified time.
*/
- def expectNoMessage(remaining: FiniteDuration): Self = {
+ def expectNoMessage(remaining: FiniteDuration): this.type = {
probe.expectNoMessage(remaining)
- self
+ this
}
/**
@@ -781,17 +779,17 @@ object TestSubscriber {
* Waits for the default period configured as
`pekko.test.expect-no-message-default`.
* That timeout is scaled using the configuration entry
"pekko.test.timefactor".
*/
- def expectNoMessage(): Self = {
+ def expectNoMessage(): this.type = {
probe.expectNoMessage()
- self
+ this
}
/**
* Java API: Assert that no message is received for the specified time.
*/
- def expectNoMessage(remaining: java.time.Duration): Self = {
+ def expectNoMessage(remaining: java.time.Duration): this.type = {
probe.expectNoMessage(remaining.toScala)
- self
+ this
}
/**
@@ -830,8 +828,10 @@ object TestSubscriber {
*
* @param max wait no more than max time, otherwise throw AssertionError
*/
- def expectNextChainingPF(max: Duration, f: PartialFunction[Any, Any]):
Self =
- expectNextWithTimeoutPF(max, f.andThen(_ => self))
+ def expectNextChainingPF(max: Duration, f: PartialFunction[Any, Any]):
this.type = {
+ expectNextWithTimeoutPF(max, f)
+ this
+ }
/**
* JAVA API
@@ -843,7 +843,7 @@ object TestSubscriber {
* @param max wait no more than max time, otherwise throw AssertionError
* @since 1.1.0
*/
- def expectNextChainingPF(max: java.time.Duration, f: PartialFunction[Any,
Any]): Self =
+ def expectNextChainingPF(max: java.time.Duration, f: PartialFunction[Any,
Any]): this.type =
expectNextChainingPF(max.toScala, f)
/**
@@ -851,7 +851,7 @@ object TestSubscriber {
*
* Allows chaining probe methods.
*/
- def expectNextChainingPF(f: PartialFunction[Any, Any]): Self =
+ def expectNextChainingPF(f: PartialFunction[Any, Any]): this.type =
expectNextChainingPF(Duration.Undefined, f)
def expectEventWithTimeoutPF[T](max: Duration, f:
PartialFunction[SubscriberEvent, T]): T =
@@ -919,7 +919,7 @@ object TestSubscriber {
val b = immutable.Seq.newBuilder[I]
@tailrec def drain(): immutable.Seq[I] =
- self.expectEvent(deadline.timeLeft) match {
+ this.expectEvent(deadline.timeLeft) match {
case OnError(ex) =>
throw new AssertionError(
s"toStrict received OnError while draining stream! Accumulated
elements: ${b.result()}",
@@ -933,7 +933,7 @@ object TestSubscriber {
}
// if no subscription was obtained yet, we expect it
- if (_subscription eq null) self.expectSubscription()
+ if (_subscription eq null) this.expectSubscription()
_subscription.request(Long.MaxValue)
drain()
@@ -1027,17 +1027,15 @@ object TestSubscriber {
*/
class Probe[T] private[TestSubscriber] ()(implicit system: ActorSystem)
extends ManualProbe[T] {
- override type Self = Probe[T]
-
private lazy val subscription = expectSubscription()
/** Asserts that a subscription has been received or will be received */
- def ensureSubscription(): Self = {
+ def ensureSubscription(): this.type = {
subscription // initializes lazy val
this
}
- def request(n: Long): Self = {
+ def request(n: Long): this.type = {
subscription.request(n)
this
}
@@ -1045,18 +1043,18 @@ object TestSubscriber {
/**
* Request and expect a stream element.
*/
- def requestNext(element: T): Self = {
+ def requestNext(element: T): this.type = {
subscription.request(1)
expectNext(element)
this
}
- def cancel(): Self = {
+ def cancel(): this.type = {
subscription.cancel()
this
}
- def cancel(cause: Throwable): Self = subscription match {
+ def cancel(cause: Throwable): this.type = subscription match {
case s: SubscriptionWithCancelException =>
s.cancel(cause)
this
diff --git
a/stream-testkit/src/main/scala/org/apache/pekko/stream/testkit/javadsl/TestSink.scala
b/stream-testkit/src/main/scala/org/apache/pekko/stream/testkit/javadsl/TestSink.scala
index d6ab2db74c..9fa14e868c 100644
---
a/stream-testkit/src/main/scala/org/apache/pekko/stream/testkit/javadsl/TestSink.scala
+++
b/stream-testkit/src/main/scala/org/apache/pekko/stream/testkit/javadsl/TestSink.scala
@@ -22,7 +22,7 @@ import pekko.stream.testkit._
object TestSink {
/**
- * A Sink that materialized to a [[pekko.stream.testkit.TestSubscriber]].
+ * A Sink that materialized to a
[[pekko.stream.testkit.TestSubscriber.Probe]].
*/
def create[T](system: ClassicActorSystemProvider): Sink[T,
TestSubscriber.Probe[T]] =
new Sink(scaladsl.TestSink[T]()(system))
diff --git
a/stream-testkit/src/main/scala/org/apache/pekko/stream/testkit/javadsl/TestSource.scala
b/stream-testkit/src/main/scala/org/apache/pekko/stream/testkit/javadsl/TestSource.scala
index 968c3f3e49..74fed49578 100644
---
a/stream-testkit/src/main/scala/org/apache/pekko/stream/testkit/javadsl/TestSource.scala
+++
b/stream-testkit/src/main/scala/org/apache/pekko/stream/testkit/javadsl/TestSource.scala
@@ -22,7 +22,7 @@ import pekko.stream.testkit._
object TestSource {
/**
- * A Source that materializes to a [[pekko.stream.testkit.TestPublisher]].
+ * A Source that materializes to a
[[pekko.stream.testkit.TestPublisher.Probe]].
*/
def create[T](system: ClassicActorSystemProvider): Source[T,
TestPublisher.Probe[T]] =
new Source(scaladsl.TestSource[T]()(system))
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]