This is an automated email from the ASF dual-hosted git repository.
hepin pushed a commit to branch 1.1.x
in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to refs/heads/1.1.x by this push:
new 85291e0902 add non-default config that allows InboundQuarantineCheck
to ignore 'harmless' quarantine events (#1555) (#2100)
85291e0902 is described below
commit 85291e0902f5615e53915dca7fbffd7aef99134d
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Wed Aug 27 20:13:28 2025 +0800
add non-default config that allows InboundQuarantineCheck to ignore
'harmless' quarantine events (#1555) (#2100)
* stub test for harmless=true
Update OutboundIdleShutdownSpec.scala
Update OutboundIdleShutdownSpec.scala
Update OutboundIdleShutdownSpec.scala
* add quarantinedButHarmless check for tests
* new test case
* Update OutboundIdleShutdownSpec.scala
* try not to shut down when quarantine is harmless
* Update OutboundIdleShutdownSpec.scala
* Create quarantine.backwards.excludes
* Update quarantine.backwards.excludes
* update log message
* try to add config
* Update ArterySettings.scala
* add tests
* Update OutboundIdleShutdownSpec.scala
* rework test
(cherry picked from commit ec5e33fdb944305b709ba784166a438493fec497)
Co-authored-by: PJ Fanning <[email protected]>
---
.../quarantine.backwards.excludes | 24 ++++
remote/src/main/resources/reference.conf | 5 +
.../pekko/remote/artery/ArterySettings.scala | 7 +
.../pekko/remote/artery/ArteryTransport.scala | 15 ++-
.../apache/pekko/remote/artery/Association.scala | 2 +-
.../remote/artery/InboundQuarantineCheck.scala | 26 ++--
.../remote/artery/HarmlessQuarantineSpec.scala | 142 +++++++++++++++++++++
.../remote/artery/OutboundIdleShutdownSpec.scala | 90 ++++++++++++-
8 files changed, 292 insertions(+), 19 deletions(-)
diff --git
a/remote/src/main/mima-filters/1.1.x.backwards.excludes/quarantine.backwards.excludes
b/remote/src/main/mima-filters/1.1.x.backwards.excludes/quarantine.backwards.excludes
new file mode 100644
index 0000000000..bcf9400738
--- /dev/null
+++
b/remote/src/main/mima-filters/1.1.x.backwards.excludes/quarantine.backwards.excludes
@@ -0,0 +1,24 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# QuarantinedTimestamp gained a `harmless` field (changing its constructor, copy, apply and unapply)
+# to address issues with downing during harmless quarantine
+# https://github.com/apache/pekko/issues/578
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.remote.artery.AssociationState#QuarantinedTimestamp.copy")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.remote.artery.AssociationState#QuarantinedTimestamp.this")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.remote.artery.AssociationState#QuarantinedTimestamp.apply")
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.remote.artery.AssociationState#QuarantinedTimestamp.unapply")
+ProblemFilters.exclude[MissingTypesProblem]("org.apache.pekko.remote.artery.AssociationState$QuarantinedTimestamp$")
diff --git a/remote/src/main/resources/reference.conf
b/remote/src/main/resources/reference.conf
index b1f6e7c28d..ea55bfca2d 100644
--- a/remote/src/main/resources/reference.conf
+++ b/remote/src/main/resources/reference.conf
@@ -857,6 +857,11 @@ pekko {
# limit there will be extra performance and scalability cost.
log-frame-size-exceeding = off
+ # If set to "on", InboundQuarantineCheck will propagate harmless quarantine events.
+ # This is the legacy behavior. Users who find that these harmless quarantine events lead
+ # to problems can set this to "off" to suppress them (https://github.com/apache/pekko/pull/1555).
+ propagate-harmless-quarantine-events = on
+
advanced {
# Maximum serialized message size, including header data.
diff --git
a/remote/src/main/scala/org/apache/pekko/remote/artery/ArterySettings.scala
b/remote/src/main/scala/org/apache/pekko/remote/artery/ArterySettings.scala
index 85c54c74bc..8b257b27e4 100644
--- a/remote/src/main/scala/org/apache/pekko/remote/artery/ArterySettings.scala
+++ b/remote/src/main/scala/org/apache/pekko/remote/artery/ArterySettings.scala
@@ -105,6 +105,13 @@ private[pekko] final class ArterySettings private (config:
Config) {
*/
val Version: Byte = ArteryTransport.HighestVersion
+ /**
+ * If set to true, InboundQuarantineCheck propagates harmless quarantine events (the legacy behavior).
+ * See https://github.com/apache/pekko/pull/1555 for background.
+ */
+ val PropagateHarmlessQuarantineEvents: Boolean =
+ getBoolean("propagate-harmless-quarantine-events")
+
object Advanced {
val config: Config = getConfig("advanced")
import config._
diff --git
a/remote/src/main/scala/org/apache/pekko/remote/artery/ArteryTransport.scala
b/remote/src/main/scala/org/apache/pekko/remote/artery/ArteryTransport.scala
index 44103d42ed..76e618b758 100644
--- a/remote/src/main/scala/org/apache/pekko/remote/artery/ArteryTransport.scala
+++ b/remote/src/main/scala/org/apache/pekko/remote/artery/ArteryTransport.scala
@@ -108,9 +108,9 @@ private[remote] object AssociationState {
quarantined = ImmutableLongMap.empty[QuarantinedTimestamp],
new AtomicReference(UniqueRemoteAddressValue(None, Nil)))
- final case class QuarantinedTimestamp(nanoTime: Long) {
+ final case class QuarantinedTimestamp(nanoTime: Long, harmless: Boolean =
false) {
override def toString: String =
- s"Quarantined ${TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() -
nanoTime)} seconds ago"
+ s"Quarantined ${TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() -
nanoTime)} seconds ago (harmless=$harmless)"
}
private final case class UniqueRemoteAddressValue(
@@ -159,6 +159,13 @@ private[remote] final class AssociationState private (
def isQuarantined(uid: Long): Boolean = quarantined.contains(uid)
+ def quarantinedButHarmless(uid: Long): Boolean = {
+ quarantined.get(uid) match {
+ case OptionVal.Some(qt) => qt.harmless
+ case _ => false
+ }
+ }
+
@tailrec def completeUniqueRemoteAddress(peer: UniqueAddress): Unit = {
val current = _uniqueRemoteAddress.get()
if (current.uniqueRemoteAddress.isEmpty) {
@@ -196,14 +203,14 @@ private[remote] final class AssociationState private (
quarantined,
new AtomicReference(UniqueRemoteAddressValue(Some(remoteAddress), Nil)))
- def newQuarantined(): AssociationState =
+ def newQuarantined(harmless: Boolean = false): AssociationState =
uniqueRemoteAddress() match {
case Some(a) =>
new AssociationState(
incarnation,
lastUsedTimestamp = new AtomicLong(System.nanoTime()),
controlIdleKillSwitch,
- quarantined = quarantined.updated(a.uid,
QuarantinedTimestamp(System.nanoTime())),
+ quarantined = quarantined.updated(a.uid,
QuarantinedTimestamp(System.nanoTime(), harmless)),
_uniqueRemoteAddress)
case None => this
}
diff --git
a/remote/src/main/scala/org/apache/pekko/remote/artery/Association.scala
b/remote/src/main/scala/org/apache/pekko/remote/artery/Association.scala
index 6bf999338f..b97fef185c 100644
--- a/remote/src/main/scala/org/apache/pekko/remote/artery/Association.scala
+++ b/remote/src/main/scala/org/apache/pekko/remote/artery/Association.scala
@@ -538,7 +538,7 @@ private[remote] class Association(
current.uniqueRemoteAddress() match {
case Some(peer) if peer.uid == u =>
if (!current.isQuarantined(u)) {
- val newState = current.newQuarantined()
+ val newState = current.newQuarantined(harmless)
if (swapState(current, newState)) {
// quarantine state change was performed
if (harmless) {
diff --git
a/remote/src/main/scala/org/apache/pekko/remote/artery/InboundQuarantineCheck.scala
b/remote/src/main/scala/org/apache/pekko/remote/artery/InboundQuarantineCheck.scala
index 7e84545712..c6d2094149 100644
---
a/remote/src/main/scala/org/apache/pekko/remote/artery/InboundQuarantineCheck.scala
+++
b/remote/src/main/scala/org/apache/pekko/remote/artery/InboundQuarantineCheck.scala
@@ -45,17 +45,27 @@ private[remote] class
InboundQuarantineCheck(inboundContext: InboundContext)
env.association match {
case OptionVal.Some(association) =>
if (association.associationState.isQuarantined(env.originUid)) {
- if (log.isDebugEnabled)
- log.debug(
- "Dropping message [{}] from [{}#{}] because the system is
quarantined",
+ if (!inboundContext.settings.PropagateHarmlessQuarantineEvents
+ &&
association.associationState.quarantinedButHarmless(env.originUid)) {
+ log.info(
+ "Message [{}] from [{}#{}] was dropped. " +
+ "The system is quarantined but the UID is known to be
harmless.",
Logging.messageClassName(env.message),
association.remoteAddress,
env.originUid)
- // avoid starting outbound stream for heartbeats
- if (!env.message.isInstanceOf[Quarantined] &&
!isHeartbeat(env.message))
- inboundContext.sendControl(
- association.remoteAddress,
- Quarantined(inboundContext.localAddress,
UniqueAddress(association.remoteAddress, env.originUid)))
+ } else {
+ if (log.isDebugEnabled)
+ log.debug(
+ "Dropping message [{}] from [{}#{}] because the system is
quarantined",
+ Logging.messageClassName(env.message),
+ association.remoteAddress,
+ env.originUid)
+ // avoid starting outbound stream for heartbeats
+ if (!env.message.isInstanceOf[Quarantined] &&
!isHeartbeat(env.message))
+ inboundContext.sendControl(
+ association.remoteAddress,
+ Quarantined(inboundContext.localAddress,
UniqueAddress(association.remoteAddress, env.originUid)))
+ }
pull(in)
} else
push(out, env)
diff --git
a/remote/src/test/scala/org/apache/pekko/remote/artery/HarmlessQuarantineSpec.scala
b/remote/src/test/scala/org/apache/pekko/remote/artery/HarmlessQuarantineSpec.scala
new file mode 100644
index 0000000000..eae4dd97a9
--- /dev/null
+++
b/remote/src/test/scala/org/apache/pekko/remote/artery/HarmlessQuarantineSpec.scala
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2009-2022 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.remote.artery
+
+import scala.concurrent.Future
+import scala.concurrent.Promise
+
+import org.scalatest.concurrent.Eventually
+import org.scalatest.time.Span
+
+import org.apache.pekko
+import pekko.actor.ActorRef
+import pekko.actor.ActorSystem
+import pekko.actor.Address
+import pekko.actor.RootActorPath
+import pekko.remote.RARP
+import pekko.remote.UniqueAddress
+import pekko.testkit.ImplicitSender
+import pekko.testkit.TestActors
+import pekko.testkit.TestProbe
+
+class HarmlessQuarantineSpec extends ArteryMultiNodeSpec("""
+ pekko.loglevel=INFO
+ pekko.remote.artery.propagate-harmless-quarantine-events = off
+ pekko.remote.artery.advanced {
+ stop-idle-outbound-after = 1 s
+ connection-timeout = 2 s
+ remove-quarantined-association-after = 1 s
+ compression {
+ actor-refs.advertisement-interval = 5 seconds
+ }
+ }
+ """) with ImplicitSender with Eventually {
+
+ override implicit val patience: PatienceConfig = {
+ import pekko.testkit.TestDuration
+ PatienceConfig(testKitSettings.DefaultTimeout.duration.dilated * 2,
Span(200, org.scalatest.time.Millis))
+ }
+
+ private def futureUniqueRemoteAddress(association: Association):
Future[UniqueAddress] = {
+ val p = Promise[UniqueAddress]()
+ association.associationState.addUniqueRemoteAddressListener(a =>
p.success(a))
+ p.future
+ }
+
+ "Harmless Quarantine Events" should {
+
+ "eliminate quarantined association when not used - echo test" in
withAssociation {
+ (remoteSystem, remoteAddress, _, localArtery, localProbe) =>
+ // event to watch out for, indicator of the issue
+ remoteSystem.eventStream.subscribe(testActor,
classOf[ThisActorSystemQuarantinedEvent])
+
+ val remoteEcho =
remoteSystem.actorSelection("/user/echo").resolveOne(remainingOrDefault).futureValue
+
+ val localAddress = RARP(system).provider.getDefaultAddress
+
+ val localEchoRef =
+ remoteSystem.actorSelection(RootActorPath(localAddress) /
localProbe.ref.path.elements).resolveOne(
+ remainingOrDefault).futureValue
+ remoteEcho.tell("ping", localEchoRef)
+ localProbe.expectMsg("ping")
+
+ val association = localArtery.association(remoteAddress)
+ val remoteUid = futureUniqueRemoteAddress(association).futureValue.uid
+ localArtery.quarantine(remoteAddress, Some(remoteUid), "Test")
+ association.associationState.isQuarantined(remoteUid) shouldBe true
+ association.associationState.quarantinedButHarmless(remoteUid)
shouldBe false
+
+ remoteEcho.tell("ping", localEchoRef) // trigger sending message from
remote to local, which will trigger local to wrongfully notify remote that it
is quarantined
+ eventually {
+ expectMsgType[ThisActorSystemQuarantinedEvent] // this is what
remote emits when it learns it is quarantined by local
+ }
+ }
+
+ "eliminate quarantined association when not used - echo test
(harmless=true)" in withAssociation {
+ (remoteSystem, remoteAddress, _, localArtery, localProbe) =>
+ // event to watch out for, indicator of the issue
+ remoteSystem.eventStream.subscribe(testActor,
classOf[ThisActorSystemQuarantinedEvent])
+
+ val remoteEcho =
remoteSystem.actorSelection("/user/echo").resolveOne(remainingOrDefault).futureValue
+
+ val localAddress = RARP(system).provider.getDefaultAddress
+
+ val localEchoRef =
+ remoteSystem.actorSelection(RootActorPath(localAddress) /
localProbe.ref.path.elements).resolveOne(
+ remainingOrDefault).futureValue
+ remoteEcho.tell("ping", localEchoRef)
+ localProbe.expectMsg("ping")
+
+ val association = localArtery.association(remoteAddress)
+ val remoteUid = futureUniqueRemoteAddress(association).futureValue.uid
+ localArtery.quarantine(remoteAddress, Some(remoteUid), "HarmlessTest",
harmless = true)
+ association.associationState.isQuarantined(remoteUid) shouldBe true
+ association.associationState.quarantinedButHarmless(remoteUid)
shouldBe true
+
+ remoteEcho.tell("ping", localEchoRef) // trigger sending message from
remote to local; because the quarantine is harmless and propagation is disabled, local must not notify remote that it is quarantined
+ eventually {
+ expectNoMessage()
+ }
+ }
+
+ /**
+ * Test setup fixture:
+ * 1. A 'remote' ActorSystem is created to spawn an Echo actor,
+ * 2. A TestProbe is spawned locally to initiate communication with the
Echo actor
+ * 3. Details (remoteSystem, remoteAddress, remoteEcho, localArtery, localProbe) are supplied to the test
+ */
+ def withAssociation(test: (ActorSystem, Address, ActorRef,
ArteryTransport, TestProbe) => Any): Unit = {
+ val remoteSystem = newRemoteSystem()
+ try {
+ remoteSystem.actorOf(TestActors.echoActorProps, "echo")
+ val remoteAddress = RARP(remoteSystem).provider.getDefaultAddress
+
+ def remoteEcho = system.actorSelection(RootActorPath(remoteAddress) /
"user" / "echo")
+
+ val echoRef = remoteEcho.resolveOne(remainingOrDefault).futureValue
+ val localProbe = new TestProbe(localSystem)
+
+ echoRef.tell("ping", localProbe.ref)
+ localProbe.expectMsg("ping")
+
+ val artery =
RARP(system).provider.transport.asInstanceOf[ArteryTransport]
+
+ test(remoteSystem, remoteAddress, echoRef, artery, localProbe)
+
+ } finally {
+ shutdown(remoteSystem)
+ }
+ }
+ }
+}
diff --git
a/remote/src/test/scala/org/apache/pekko/remote/artery/OutboundIdleShutdownSpec.scala
b/remote/src/test/scala/org/apache/pekko/remote/artery/OutboundIdleShutdownSpec.scala
index 39beddb4a2..8846c40f44 100644
---
a/remote/src/test/scala/org/apache/pekko/remote/artery/OutboundIdleShutdownSpec.scala
+++
b/remote/src/test/scala/org/apache/pekko/remote/artery/OutboundIdleShutdownSpec.scala
@@ -31,13 +31,15 @@ import pekko.testkit.ImplicitSender
import pekko.testkit.TestActors
import pekko.testkit.TestProbe
-class OutboundIdleShutdownSpec extends ArteryMultiNodeSpec(s"""
+class OutboundIdleShutdownSpec extends ArteryMultiNodeSpec("""
pekko.loglevel=INFO
- pekko.remote.artery.advanced.stop-idle-outbound-after = 1 s
- pekko.remote.artery.advanced.connection-timeout = 2 s
- pekko.remote.artery.advanced.remove-quarantined-association-after = 1 s
- pekko.remote.artery.advanced.compression {
- actor-refs.advertisement-interval = 5 seconds
+ pekko.remote.artery.advanced {
+ stop-idle-outbound-after = 1 s
+ connection-timeout = 2 s
+ remove-quarantined-association-after = 1 s
+ compression {
+ actor-refs.advertisement-interval = 5 seconds
+ }
}
""") with ImplicitSender with Eventually {
@@ -116,6 +118,8 @@ class OutboundIdleShutdownSpec extends
ArteryMultiNodeSpec(s"""
val remoteUid = futureUniqueRemoteAddress(association).futureValue.uid
localArtery.quarantine(remoteAddress, Some(remoteUid), "Test")
+ association.associationState.isQuarantined(remoteUid) shouldBe true
+ association.associationState.quarantinedButHarmless(remoteUid) shouldBe
false
eventually {
assertStreamActive(association, Association.ControlQueueIndex,
expected = false)
@@ -128,6 +132,80 @@ class OutboundIdleShutdownSpec extends
ArteryMultiNodeSpec(s"""
}
}
+ "eliminate quarantined association when not used (harmless=true)" in
withAssociation {
+ (_, remoteAddress, _, localArtery, _) =>
+ val association = localArtery.association(remoteAddress)
+ val remoteUid = futureUniqueRemoteAddress(association).futureValue.uid
+
+ localArtery.quarantine(remoteAddress, Some(remoteUid), "HarmlessTest",
harmless = true)
+ association.associationState.isQuarantined(remoteUid) shouldBe true
+ association.associationState.quarantinedButHarmless(remoteUid)
shouldBe true
+
+ eventually {
+ assertStreamActive(association, Association.ControlQueueIndex,
expected = false)
+ assertStreamActive(association, Association.OrdinaryQueueIndex,
expected = false)
+ }
+
+ // the outbound streams are inactive and association quarantined, then
it's completely removed
+ eventually {
+ localArtery.remoteAddresses should not contain remoteAddress
+ }
+ }
+
+ "eliminate quarantined association when not used - echo test" in
withAssociation {
+ (remoteSystem, remoteAddress, _, localArtery, localProbe) =>
+ // event to watch out for, indicator of the issue
+ remoteSystem.eventStream.subscribe(testActor,
classOf[ThisActorSystemQuarantinedEvent])
+
+ val remoteEcho =
remoteSystem.actorSelection("/user/echo").resolveOne(remainingOrDefault).futureValue
+
+ val localAddress = RARP(system).provider.getDefaultAddress
+
+ val localEchoRef =
+ remoteSystem.actorSelection(RootActorPath(localAddress) /
localProbe.ref.path.elements).resolveOne(
+ remainingOrDefault).futureValue
+ remoteEcho.tell("ping", localEchoRef)
+ localProbe.expectMsg("ping")
+
+ val association = localArtery.association(remoteAddress)
+ val remoteUid = futureUniqueRemoteAddress(association).futureValue.uid
+ localArtery.quarantine(remoteAddress, Some(remoteUid), "Test")
+ association.associationState.isQuarantined(remoteUid) shouldBe true
+ association.associationState.quarantinedButHarmless(remoteUid)
shouldBe false
+
+ remoteEcho.tell("ping", localEchoRef) // trigger sending message from
remote to local, which will trigger local to wrongfully notify remote that it
is quarantined
+ eventually {
+ expectMsgType[ThisActorSystemQuarantinedEvent] // this is what
remote emits when it learns it is quarantined by local
+ }
+ }
+
+ "eliminate quarantined association when not used - echo test
(harmless=true)" in withAssociation {
+ (remoteSystem, remoteAddress, _, localArtery, localProbe) =>
+ // event to watch out for, indicator of the issue
+ remoteSystem.eventStream.subscribe(testActor,
classOf[ThisActorSystemQuarantinedEvent])
+
+ val remoteEcho =
remoteSystem.actorSelection("/user/echo").resolveOne(remainingOrDefault).futureValue
+
+ val localAddress = RARP(system).provider.getDefaultAddress
+
+ val localEchoRef =
+ remoteSystem.actorSelection(RootActorPath(localAddress) /
localProbe.ref.path.elements).resolveOne(
+ remainingOrDefault).futureValue
+ remoteEcho.tell("ping", localEchoRef)
+ localProbe.expectMsg("ping")
+
+ val association = localArtery.association(remoteAddress)
+ val remoteUid = futureUniqueRemoteAddress(association).futureValue.uid
+ localArtery.quarantine(remoteAddress, Some(remoteUid), "HarmlessTest",
harmless = true)
+ association.associationState.isQuarantined(remoteUid) shouldBe true
+ association.associationState.quarantinedButHarmless(remoteUid)
shouldBe true
+
+ remoteEcho.tell("ping", localEchoRef) // trigger sending message from
remote to local, which will trigger local to wrongfully notify remote that it
is quarantined
+ eventually {
+ expectMsgType[ThisActorSystemQuarantinedEvent] // this is what
remote emits when it learns it is quarantined by local
+ }
+ }
+
"remove inbound compression after quarantine" in withAssociation { (_,
remoteAddress, _, localArtery, _) =>
val association = localArtery.association(remoteAddress)
val remoteUid = futureUniqueRemoteAddress(association).futureValue.uid
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]