This is an automated email from the ASF dual-hosted git repository.
fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to refs/heads/main by this push:
new 50a2d6e4d5 Harden some specs (#2737)
50a2d6e4d5 is described below
commit 50a2d6e4d53f1e9242516d26b8876cdf3877efe5
Author: Philippus Baalman <[email protected]>
AuthorDate: Mon Mar 16 13:57:22 2026 +0100
Harden some specs (#2737)
* Harden RemoteFailureSpec
* Harden RememberEntitiesAndStartEntitySpec
* Remove duplicate "producer"
* Harden ReliableDeliveryShardingSpec
* Increase timeout in OutboundIdleShutdownSpec
---
.../actor/typed/delivery/internal/ConsumerControllerImpl.scala | 2 +-
.../sharding/typed/delivery/ReliableDeliveryShardingSpec.scala | 2 ++
.../cluster/sharding/RememberEntitiesAndStartEntitySpec.scala | 5 +++--
.../apache/pekko/remote/artery/OutboundIdleShutdownSpec.scala | 4 +++-
.../org/apache/pekko/remote/artery/RemoteFailureSpec.scala | 10 ++++++++--
5 files changed, 17 insertions(+), 6 deletions(-)
diff --git
a/actor-typed/src/main/scala/org/apache/pekko/actor/typed/delivery/internal/ConsumerControllerImpl.scala
b/actor-typed/src/main/scala/org/apache/pekko/actor/typed/delivery/internal/ConsumerControllerImpl.scala
index 613b403173..295392b81b 100644
---
a/actor-typed/src/main/scala/org/apache/pekko/actor/typed/delivery/internal/ConsumerControllerImpl.scala
+++
b/actor-typed/src/main/scala/org/apache/pekko/actor/typed/delivery/internal/ConsumerControllerImpl.scala
@@ -392,7 +392,7 @@ private class ConsumerControllerImpl[A] private (
} else if (s.receivedSeqNr == 0) {
// needed for sharding
context.log.debug(
- "Received SequencedMessage seqNr [{}], from new producer producer [{}] but it wasn't first. Resending.",
+ "Received SequencedMessage seqNr [{}], from new producer [{}] but it wasn't first. Resending.",
seqNr,
seqMsg.producerController)
// request resend of all unconfirmed, and mark first
diff --git
a/cluster-sharding-typed/src/test/scala/org/apache/pekko/cluster/sharding/typed/delivery/ReliableDeliveryShardingSpec.scala
b/cluster-sharding-typed/src/test/scala/org/apache/pekko/cluster/sharding/typed/delivery/ReliableDeliveryShardingSpec.scala
index d0365f2986..4e6becb941 100644
---
a/cluster-sharding-typed/src/test/scala/org/apache/pekko/cluster/sharding/typed/delivery/ReliableDeliveryShardingSpec.scala
+++
b/cluster-sharding-typed/src/test/scala/org/apache/pekko/cluster/sharding/typed/delivery/ReliableDeliveryShardingSpec.scala
@@ -399,6 +399,7 @@ class ReliableDeliveryShardingSpec
// It is possible the ProducerController re-sends msg-3 again
before it has processed its acknowledgement.
// If the ConsumerController restarts between sending the
acknowledgement and receiving that re-sent msg-3,
// it will deliver msg-3 a second time. We then expect msg-4 next:
+ delivery3cor4.confirmTo ! ConsumerController.Confirmed
val delivery4 = consumerProbes(1).receiveMessage()
delivery4.message should ===(TestConsumer.Job("msg-4"))
case TestConsumer.Job("msg-4") =>
@@ -418,6 +419,7 @@ class ReliableDeliveryShardingSpec
// It is possible the ProducerController re-sends msg-3 again
before it has processed its acknowledgement.
// If the ConsumerController restarts between sending the
acknowledgement and receiving that re-sent msg-3,
// it will deliver msg-3 a second time. We then expect msg-4 next:
+ delivery3cor4.confirmTo ! ConsumerController.Confirmed
val delivery4 = consumerProbes(2).receiveMessage()
delivery4.message should ===(TestConsumer.Job("msg-4"))
case TestConsumer.Job("msg-4") =>
diff --git
a/cluster-sharding/src/test/scala/org/apache/pekko/cluster/sharding/RememberEntitiesAndStartEntitySpec.scala
b/cluster-sharding/src/test/scala/org/apache/pekko/cluster/sharding/RememberEntitiesAndStartEntitySpec.scala
index 2163074c52..330961469d 100644
---
a/cluster-sharding/src/test/scala/org/apache/pekko/cluster/sharding/RememberEntitiesAndStartEntitySpec.scala
+++
b/cluster-sharding/src/test/scala/org/apache/pekko/cluster/sharding/RememberEntitiesAndStartEntitySpec.scala
@@ -30,6 +30,7 @@ import pekko.testkit.WithLogCapturing
import org.scalatest.wordspec.AnyWordSpecLike
+import scala.concurrent.duration._
import com.typesafe.config.ConfigFactory
object RememberEntitiesAndStartEntitySpec {
@@ -105,13 +106,13 @@ class RememberEntitiesAndStartEntitySpec
// race condition between this message and region getting the
termination message, we may need to retry
val secondShardIncarnation = awaitAssert {
sharding ! EntityEnvelope(11, "give-me-shard")
- expectMsgType[ActorRef]
+ expectMsgType[ActorRef](1.second) // short timeout, retry via awaitAssert
}
awaitAssert {
secondShardIncarnation ! GetShardStats
// the remembered 1 and 11 which we just triggered start of
- expectMsg(ShardStats("1", 2))
+ expectMsg(1.second, ShardStats("1", 2)) // short timeout, retry via awaitAssert
}
}
}
diff --git
a/remote/src/test/scala/org/apache/pekko/remote/artery/OutboundIdleShutdownSpec.scala
b/remote/src/test/scala/org/apache/pekko/remote/artery/OutboundIdleShutdownSpec.scala
index 088d0da555..a2d3c2c3d0 100644
---
a/remote/src/test/scala/org/apache/pekko/remote/artery/OutboundIdleShutdownSpec.scala
+++
b/remote/src/test/scala/org/apache/pekko/remote/artery/OutboundIdleShutdownSpec.scala
@@ -30,6 +30,7 @@ import pekko.testkit.TestProbe
import org.scalatest.concurrent.Eventually
import org.scalatest.time.Span
+import org.scalatest.time.Span.convertSpanToDuration
class OutboundIdleShutdownSpec extends ArteryMultiNodeSpec("""
pekko.loglevel=INFO
@@ -284,7 +285,8 @@ class OutboundIdleShutdownSpec extends
ArteryMultiNodeSpec("""
def remoteEcho = system.actorSelection(RootActorPath(remoteAddress) /
"user" / "echo")
- val echoRef = remoteEcho.resolveOne(remainingOrDefault).futureValue
+ // use a little less resolve timeout than the patience timeout for good error messages
+ val echoRef = remoteEcho.resolveOne(convertSpanToDuration(patience.timeout) - 200.millis).futureValue
val localProbe = new TestProbe(localSystem)
echoRef.tell("ping", localProbe.ref)
diff --git
a/remote/src/test/scala/org/apache/pekko/remote/artery/RemoteFailureSpec.scala
b/remote/src/test/scala/org/apache/pekko/remote/artery/RemoteFailureSpec.scala
index 537cbe11a4..037f08659f 100644
---
a/remote/src/test/scala/org/apache/pekko/remote/artery/RemoteFailureSpec.scala
+++
b/remote/src/test/scala/org/apache/pekko/remote/artery/RemoteFailureSpec.scala
@@ -16,6 +16,8 @@ package org.apache.pekko.remote.artery
import scala.concurrent.duration._
import org.apache.pekko
+import org.apache.pekko.actor.ActorIdentity
+import org.apache.pekko.actor.Identify
import pekko.remote.EndpointDisassociatedException
import pekko.serialization.jackson.CborSerializable
import pekko.testkit.{ EventFilter, ImplicitSender, TestActors, TestEvent }
@@ -34,7 +36,7 @@ class RemoteFailureSpec extends ArteryMultiNodeSpec with
ImplicitSender {
"Remoting" should {
"not be exhausted by sending to broken connections" in {
- val remoteSystems = Vector.fill(5)(newRemoteSystem())
+ val remoteSystems = Vector.fill(3)(newRemoteSystem())
remoteSystems.foreach { sys =>
sys.eventStream.publish(TestEvent
@@ -42,7 +44,11 @@ class RemoteFailureSpec extends ArteryMultiNodeSpec with
ImplicitSender {
sys.actorOf(TestActors.echoActorProps, name = "echo")
}
val remoteSelections = remoteSystems.map { sys =>
- system.actorSelection(rootActorPath(sys) / "user" / "echo")
+ val sel = system.actorSelection(rootActorPath(sys) / "user" / "echo")
+ // verify that it's there
+ sel ! Identify(sel.toSerializationFormat)
+ expectMsgType[ActorIdentity].ref.isDefined should ===(true)
+ sel
}
system.actorOf(TestActors.echoActorProps, name = "echo")
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]