This is an automated email from the ASF dual-hosted git repository.

hepin pushed a commit to branch 1.1.x
in repository https://gitbox.apache.org/repos/asf/pekko.git


The following commit(s) were added to refs/heads/1.1.x by this push:
     new 85291e0902 add non-default config that allows InboundQuarantineCheck 
to ignore 'harmless' quarantine events (#1555) (#2100)
85291e0902 is described below

commit 85291e0902f5615e53915dca7fbffd7aef99134d
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Wed Aug 27 20:13:28 2025 +0800

    add non-default config that allows InboundQuarantineCheck to ignore 
'harmless' quarantine events (#1555) (#2100)
    
    * stub test for harmless=true
    
    Update OutboundIdleShutdownSpec.scala
    
    Update OutboundIdleShutdownSpec.scala
    
    Update OutboundIdleShutdownSpec.scala
    
    * add quarantinedButHarmless check for tests
    
    * new test case
    
    * Update OutboundIdleShutdownSpec.scala
    
    * try to not shutdown when quarantine is harmless
    
    * Update OutboundIdleShutdownSpec.scala
    
    * Create quarantine.backwards.excludes
    
    * Update quarantine.backwards.excludes
    
    * update log message
    
    * try to add config
    
    * Update ArterySettings.scala
    
    * add tests
    
    * Update OutboundIdleShutdownSpec.scala
    
    * rework test
    
    (cherry picked from commit ec5e33fdb944305b709ba784166a438493fec497)
    
    Co-authored-by: PJ Fanning <[email protected]>
---
 .../quarantine.backwards.excludes                  |  24 ++++
 remote/src/main/resources/reference.conf           |   5 +
 .../pekko/remote/artery/ArterySettings.scala       |   7 +
 .../pekko/remote/artery/ArteryTransport.scala      |  15 ++-
 .../apache/pekko/remote/artery/Association.scala   |   2 +-
 .../remote/artery/InboundQuarantineCheck.scala     |  26 ++--
 .../remote/artery/HarmlessQuarantineSpec.scala     | 142 +++++++++++++++++++++
 .../remote/artery/OutboundIdleShutdownSpec.scala   |  90 ++++++++++++-
 8 files changed, 292 insertions(+), 19 deletions(-)

diff --git 
a/remote/src/main/mima-filters/1.1.x.backwards.excludes/quarantine.backwards.excludes
 
b/remote/src/main/mima-filters/1.1.x.backwards.excludes/quarantine.backwards.excludes
new file mode 100644
index 0000000000..bcf9400738
--- /dev/null
+++ 
b/remote/src/main/mima-filters/1.1.x.backwards.excludes/quarantine.backwards.excludes
@@ -0,0 +1,24 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# changes made due to issues with downing during harmless quarantine
+# https://github.com/apache/pekko/issues/578
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.remote.artery.AssociationState#QuarantinedTimestamp.copy")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.remote.artery.AssociationState#QuarantinedTimestamp.this")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.remote.artery.AssociationState#QuarantinedTimestamp.apply")
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.remote.artery.AssociationState#QuarantinedTimestamp.unapply")
+ProblemFilters.exclude[MissingTypesProblem]("org.apache.pekko.remote.artery.AssociationState$QuarantinedTimestamp$")
diff --git a/remote/src/main/resources/reference.conf 
b/remote/src/main/resources/reference.conf
index b1f6e7c28d..ea55bfca2d 100644
--- a/remote/src/main/resources/reference.conf
+++ b/remote/src/main/resources/reference.conf
@@ -857,6 +857,11 @@ pekko {
       # limit there will be extra performance and scalability cost.
       log-frame-size-exceeding = off
 
+      # If set to "on", InboundQuarantineCheck will propagate harmless quarantine events.
+      # This is the legacy behavior. Users who find that these harmless quarantine events lead
+      # to problems can set this to "off" to suppress them (https://github.com/apache/pekko/pull/1555).
+      propagate-harmless-quarantine-events = on
+
       advanced {
 
         # Maximum serialized message size, including header data.
diff --git 
a/remote/src/main/scala/org/apache/pekko/remote/artery/ArterySettings.scala 
b/remote/src/main/scala/org/apache/pekko/remote/artery/ArterySettings.scala
index 85c54c74bc..8b257b27e4 100644
--- a/remote/src/main/scala/org/apache/pekko/remote/artery/ArterySettings.scala
+++ b/remote/src/main/scala/org/apache/pekko/remote/artery/ArterySettings.scala
@@ -105,6 +105,13 @@ private[pekko] final class ArterySettings private (config: 
Config) {
    */
   val Version: Byte = ArteryTransport.HighestVersion
 
+  /**
+   * If set to true, harmless quarantine events are propagated in 
InboundQuarantineCheck.
+   * Background is in https://github.com/apache/pekko/pull/1555
+   */
+  val PropagateHarmlessQuarantineEvents: Boolean =
+    getBoolean("propagate-harmless-quarantine-events")
+
   object Advanced {
     val config: Config = getConfig("advanced")
     import config._
diff --git 
a/remote/src/main/scala/org/apache/pekko/remote/artery/ArteryTransport.scala 
b/remote/src/main/scala/org/apache/pekko/remote/artery/ArteryTransport.scala
index 44103d42ed..76e618b758 100644
--- a/remote/src/main/scala/org/apache/pekko/remote/artery/ArteryTransport.scala
+++ b/remote/src/main/scala/org/apache/pekko/remote/artery/ArteryTransport.scala
@@ -108,9 +108,9 @@ private[remote] object AssociationState {
       quarantined = ImmutableLongMap.empty[QuarantinedTimestamp],
       new AtomicReference(UniqueRemoteAddressValue(None, Nil)))
 
-  final case class QuarantinedTimestamp(nanoTime: Long) {
+  final case class QuarantinedTimestamp(nanoTime: Long, harmless: Boolean = 
false) {
     override def toString: String =
-      s"Quarantined ${TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - 
nanoTime)} seconds ago"
+      s"Quarantined ${TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - 
nanoTime)} seconds ago (harmless=$harmless)"
   }
 
   private final case class UniqueRemoteAddressValue(
@@ -159,6 +159,13 @@ private[remote] final class AssociationState private (
 
   def isQuarantined(uid: Long): Boolean = quarantined.contains(uid)
 
+  def quarantinedButHarmless(uid: Long): Boolean = {
+    quarantined.get(uid) match {
+      case OptionVal.Some(qt) => qt.harmless
+      case _                  => false
+    }
+  }
+
   @tailrec def completeUniqueRemoteAddress(peer: UniqueAddress): Unit = {
     val current = _uniqueRemoteAddress.get()
     if (current.uniqueRemoteAddress.isEmpty) {
@@ -196,14 +203,14 @@ private[remote] final class AssociationState private (
       quarantined,
       new AtomicReference(UniqueRemoteAddressValue(Some(remoteAddress), Nil)))
 
-  def newQuarantined(): AssociationState =
+  def newQuarantined(harmless: Boolean = false): AssociationState =
     uniqueRemoteAddress() match {
       case Some(a) =>
         new AssociationState(
           incarnation,
           lastUsedTimestamp = new AtomicLong(System.nanoTime()),
           controlIdleKillSwitch,
-          quarantined = quarantined.updated(a.uid, 
QuarantinedTimestamp(System.nanoTime())),
+          quarantined = quarantined.updated(a.uid, 
QuarantinedTimestamp(System.nanoTime(), harmless)),
           _uniqueRemoteAddress)
       case None => this
     }
diff --git 
a/remote/src/main/scala/org/apache/pekko/remote/artery/Association.scala 
b/remote/src/main/scala/org/apache/pekko/remote/artery/Association.scala
index 6bf999338f..b97fef185c 100644
--- a/remote/src/main/scala/org/apache/pekko/remote/artery/Association.scala
+++ b/remote/src/main/scala/org/apache/pekko/remote/artery/Association.scala
@@ -538,7 +538,7 @@ private[remote] class Association(
         current.uniqueRemoteAddress() match {
           case Some(peer) if peer.uid == u =>
             if (!current.isQuarantined(u)) {
-              val newState = current.newQuarantined()
+              val newState = current.newQuarantined(harmless)
               if (swapState(current, newState)) {
                 // quarantine state change was performed
                 if (harmless) {
diff --git 
a/remote/src/main/scala/org/apache/pekko/remote/artery/InboundQuarantineCheck.scala
 
b/remote/src/main/scala/org/apache/pekko/remote/artery/InboundQuarantineCheck.scala
index 7e84545712..c6d2094149 100644
--- 
a/remote/src/main/scala/org/apache/pekko/remote/artery/InboundQuarantineCheck.scala
+++ 
b/remote/src/main/scala/org/apache/pekko/remote/artery/InboundQuarantineCheck.scala
@@ -45,17 +45,27 @@ private[remote] class 
InboundQuarantineCheck(inboundContext: InboundContext)
         env.association match {
           case OptionVal.Some(association) =>
             if (association.associationState.isQuarantined(env.originUid)) {
-              if (log.isDebugEnabled)
-                log.debug(
-                  "Dropping message [{}] from [{}#{}] because the system is 
quarantined",
+              if (!inboundContext.settings.PropagateHarmlessQuarantineEvents
+                && 
association.associationState.quarantinedButHarmless(env.originUid)) {
+                log.info(
+                  "Message [{}] from [{}#{}] was dropped. " +
+                  "The system is quarantined but the UID is known to be 
harmless.",
                   Logging.messageClassName(env.message),
                   association.remoteAddress,
                   env.originUid)
-              // avoid starting outbound stream for heartbeats
-              if (!env.message.isInstanceOf[Quarantined] && 
!isHeartbeat(env.message))
-                inboundContext.sendControl(
-                  association.remoteAddress,
-                  Quarantined(inboundContext.localAddress, 
UniqueAddress(association.remoteAddress, env.originUid)))
+              } else {
+                if (log.isDebugEnabled)
+                  log.debug(
+                    "Dropping message [{}] from [{}#{}] because the system is 
quarantined",
+                    Logging.messageClassName(env.message),
+                    association.remoteAddress,
+                    env.originUid)
+                // avoid starting outbound stream for heartbeats
+                if (!env.message.isInstanceOf[Quarantined] && 
!isHeartbeat(env.message))
+                  inboundContext.sendControl(
+                    association.remoteAddress,
+                    Quarantined(inboundContext.localAddress, 
UniqueAddress(association.remoteAddress, env.originUid)))
+              }
               pull(in)
             } else
               push(out, env)
diff --git 
a/remote/src/test/scala/org/apache/pekko/remote/artery/HarmlessQuarantineSpec.scala
 
b/remote/src/test/scala/org/apache/pekko/remote/artery/HarmlessQuarantineSpec.scala
new file mode 100644
index 0000000000..eae4dd97a9
--- /dev/null
+++ 
b/remote/src/test/scala/org/apache/pekko/remote/artery/HarmlessQuarantineSpec.scala
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2009-2022 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.remote.artery
+
+import scala.concurrent.Future
+import scala.concurrent.Promise
+
+import org.scalatest.concurrent.Eventually
+import org.scalatest.time.Span
+
+import org.apache.pekko
+import pekko.actor.ActorRef
+import pekko.actor.ActorSystem
+import pekko.actor.Address
+import pekko.actor.RootActorPath
+import pekko.remote.RARP
+import pekko.remote.UniqueAddress
+import pekko.testkit.ImplicitSender
+import pekko.testkit.TestActors
+import pekko.testkit.TestProbe
+
+class HarmlessQuarantineSpec extends ArteryMultiNodeSpec("""
+  pekko.loglevel=INFO
+  pekko.remote.artery.propagate-harmless-quarantine-events = off
+  pekko.remote.artery.advanced {
+    stop-idle-outbound-after = 1 s
+    connection-timeout = 2 s
+    remove-quarantined-association-after = 1 s
+    compression {
+      actor-refs.advertisement-interval = 5 seconds
+    }
+  }
+  """) with ImplicitSender with Eventually {
+
+  override implicit val patience: PatienceConfig = {
+    import pekko.testkit.TestDuration
+    PatienceConfig(testKitSettings.DefaultTimeout.duration.dilated * 2, 
Span(200, org.scalatest.time.Millis))
+  }
+
+  private def futureUniqueRemoteAddress(association: Association): 
Future[UniqueAddress] = {
+    val p = Promise[UniqueAddress]()
+    association.associationState.addUniqueRemoteAddressListener(a => 
p.success(a))
+    p.future
+  }
+
+  "Harmless Quarantine Events" should {
+
+    "eliminate quarantined association when not used - echo test" in 
withAssociation {
+      (remoteSystem, remoteAddress, _, localArtery, localProbe) =>
+        // event to watch out for, indicator of the issue
+        remoteSystem.eventStream.subscribe(testActor, 
classOf[ThisActorSystemQuarantinedEvent])
+
+        val remoteEcho = 
remoteSystem.actorSelection("/user/echo").resolveOne(remainingOrDefault).futureValue
+
+        val localAddress = RARP(system).provider.getDefaultAddress
+
+        val localEchoRef =
+          remoteSystem.actorSelection(RootActorPath(localAddress) / 
localProbe.ref.path.elements).resolveOne(
+            remainingOrDefault).futureValue
+        remoteEcho.tell("ping", localEchoRef)
+        localProbe.expectMsg("ping")
+
+        val association = localArtery.association(remoteAddress)
+        val remoteUid = futureUniqueRemoteAddress(association).futureValue.uid
+        localArtery.quarantine(remoteAddress, Some(remoteUid), "Test")
+        association.associationState.isQuarantined(remoteUid) shouldBe true
+        association.associationState.quarantinedButHarmless(remoteUid) 
shouldBe false
+
+        remoteEcho.tell("ping", localEchoRef) // trigger sending message from 
remote to local, which will trigger local to wrongfully notify remote that it 
is quarantined
+        eventually {
+          expectMsgType[ThisActorSystemQuarantinedEvent] // this is what 
remote emits when it learns it is quarantined by local
+        }
+    }
+
+    "eliminate quarantined association when not used - echo test 
(harmless=true)" in withAssociation {
+      (remoteSystem, remoteAddress, _, localArtery, localProbe) =>
+        // event to watch out for, indicator of the issue
+        remoteSystem.eventStream.subscribe(testActor, 
classOf[ThisActorSystemQuarantinedEvent])
+
+        val remoteEcho = 
remoteSystem.actorSelection("/user/echo").resolveOne(remainingOrDefault).futureValue
+
+        val localAddress = RARP(system).provider.getDefaultAddress
+
+        val localEchoRef =
+          remoteSystem.actorSelection(RootActorPath(localAddress) / 
localProbe.ref.path.elements).resolveOne(
+            remainingOrDefault).futureValue
+        remoteEcho.tell("ping", localEchoRef)
+        localProbe.expectMsg("ping")
+
+        val association = localArtery.association(remoteAddress)
+        val remoteUid = futureUniqueRemoteAddress(association).futureValue.uid
+        localArtery.quarantine(remoteAddress, Some(remoteUid), "HarmlessTest", 
harmless = true)
+        association.associationState.isQuarantined(remoteUid) shouldBe true
+        association.associationState.quarantinedButHarmless(remoteUid) 
shouldBe true
+
+        remoteEcho.tell("ping", localEchoRef) // trigger sending message from remote to local; because the quarantine is harmless and propagation is off, local must not notify remote that it is quarantined
+        eventually {
+          expectNoMessage()
+        }
+    }
+
+    /**
+     * Test setup fixture:
+     * 1. A 'remote' ActorSystem is created to spawn an Echo actor.
+     * 2. A TestProbe is spawned locally to initiate communication with the Echo actor.
+     * 3. Details (remoteSystem, remoteAddress, remoteEcho, localArtery, localProbe) are supplied to the test.
+     */
+    def withAssociation(test: (ActorSystem, Address, ActorRef, 
ArteryTransport, TestProbe) => Any): Unit = {
+      val remoteSystem = newRemoteSystem()
+      try {
+        remoteSystem.actorOf(TestActors.echoActorProps, "echo")
+        val remoteAddress = RARP(remoteSystem).provider.getDefaultAddress
+
+        def remoteEcho = system.actorSelection(RootActorPath(remoteAddress) / 
"user" / "echo")
+
+        val echoRef = remoteEcho.resolveOne(remainingOrDefault).futureValue
+        val localProbe = new TestProbe(localSystem)
+
+        echoRef.tell("ping", localProbe.ref)
+        localProbe.expectMsg("ping")
+
+        val artery = 
RARP(system).provider.transport.asInstanceOf[ArteryTransport]
+
+        test(remoteSystem, remoteAddress, echoRef, artery, localProbe)
+
+      } finally {
+        shutdown(remoteSystem)
+      }
+    }
+  }
+}
diff --git 
a/remote/src/test/scala/org/apache/pekko/remote/artery/OutboundIdleShutdownSpec.scala
 
b/remote/src/test/scala/org/apache/pekko/remote/artery/OutboundIdleShutdownSpec.scala
index 39beddb4a2..8846c40f44 100644
--- 
a/remote/src/test/scala/org/apache/pekko/remote/artery/OutboundIdleShutdownSpec.scala
+++ 
b/remote/src/test/scala/org/apache/pekko/remote/artery/OutboundIdleShutdownSpec.scala
@@ -31,13 +31,15 @@ import pekko.testkit.ImplicitSender
 import pekko.testkit.TestActors
 import pekko.testkit.TestProbe
 
-class OutboundIdleShutdownSpec extends ArteryMultiNodeSpec(s"""
+class OutboundIdleShutdownSpec extends ArteryMultiNodeSpec("""
   pekko.loglevel=INFO
-  pekko.remote.artery.advanced.stop-idle-outbound-after = 1 s
-  pekko.remote.artery.advanced.connection-timeout = 2 s
-  pekko.remote.artery.advanced.remove-quarantined-association-after = 1 s
-  pekko.remote.artery.advanced.compression {
-    actor-refs.advertisement-interval = 5 seconds
+  pekko.remote.artery.advanced {
+    stop-idle-outbound-after = 1 s
+    connection-timeout = 2 s
+    remove-quarantined-association-after = 1 s
+    compression {
+      actor-refs.advertisement-interval = 5 seconds
+    }
   }
   """) with ImplicitSender with Eventually {
 
@@ -116,6 +118,8 @@ class OutboundIdleShutdownSpec extends 
ArteryMultiNodeSpec(s"""
       val remoteUid = futureUniqueRemoteAddress(association).futureValue.uid
 
       localArtery.quarantine(remoteAddress, Some(remoteUid), "Test")
+      association.associationState.isQuarantined(remoteUid) shouldBe true
+      association.associationState.quarantinedButHarmless(remoteUid) shouldBe 
false
 
       eventually {
         assertStreamActive(association, Association.ControlQueueIndex, 
expected = false)
@@ -128,6 +132,80 @@ class OutboundIdleShutdownSpec extends 
ArteryMultiNodeSpec(s"""
       }
     }
 
+    "eliminate quarantined association when not used (harmless=true)" in 
withAssociation {
+      (_, remoteAddress, _, localArtery, _) =>
+        val association = localArtery.association(remoteAddress)
+        val remoteUid = futureUniqueRemoteAddress(association).futureValue.uid
+
+        localArtery.quarantine(remoteAddress, Some(remoteUid), "HarmlessTest", 
harmless = true)
+        association.associationState.isQuarantined(remoteUid) shouldBe true
+        association.associationState.quarantinedButHarmless(remoteUid) 
shouldBe true
+
+        eventually {
+          assertStreamActive(association, Association.ControlQueueIndex, 
expected = false)
+          assertStreamActive(association, Association.OrdinaryQueueIndex, 
expected = false)
+        }
+
+        // the outbound streams are inactive and association quarantined, then 
it's completely removed
+        eventually {
+          localArtery.remoteAddresses should not contain remoteAddress
+        }
+    }
+
+    "eliminate quarantined association when not used - echo test" in 
withAssociation {
+      (remoteSystem, remoteAddress, _, localArtery, localProbe) =>
+        // event to watch out for, indicator of the issue
+        remoteSystem.eventStream.subscribe(testActor, 
classOf[ThisActorSystemQuarantinedEvent])
+
+        val remoteEcho = 
remoteSystem.actorSelection("/user/echo").resolveOne(remainingOrDefault).futureValue
+
+        val localAddress = RARP(system).provider.getDefaultAddress
+
+        val localEchoRef =
+          remoteSystem.actorSelection(RootActorPath(localAddress) / 
localProbe.ref.path.elements).resolveOne(
+            remainingOrDefault).futureValue
+        remoteEcho.tell("ping", localEchoRef)
+        localProbe.expectMsg("ping")
+
+        val association = localArtery.association(remoteAddress)
+        val remoteUid = futureUniqueRemoteAddress(association).futureValue.uid
+        localArtery.quarantine(remoteAddress, Some(remoteUid), "Test")
+        association.associationState.isQuarantined(remoteUid) shouldBe true
+        association.associationState.quarantinedButHarmless(remoteUid) 
shouldBe false
+
+        remoteEcho.tell("ping", localEchoRef) // trigger sending message from 
remote to local, which will trigger local to wrongfully notify remote that it 
is quarantined
+        eventually {
+          expectMsgType[ThisActorSystemQuarantinedEvent] // this is what 
remote emits when it learns it is quarantined by local
+        }
+    }
+
+    "eliminate quarantined association when not used - echo test 
(harmless=true)" in withAssociation {
+      (remoteSystem, remoteAddress, _, localArtery, localProbe) =>
+        // event to watch out for, indicator of the issue
+        remoteSystem.eventStream.subscribe(testActor, 
classOf[ThisActorSystemQuarantinedEvent])
+
+        val remoteEcho = 
remoteSystem.actorSelection("/user/echo").resolveOne(remainingOrDefault).futureValue
+
+        val localAddress = RARP(system).provider.getDefaultAddress
+
+        val localEchoRef =
+          remoteSystem.actorSelection(RootActorPath(localAddress) / 
localProbe.ref.path.elements).resolveOne(
+            remainingOrDefault).futureValue
+        remoteEcho.tell("ping", localEchoRef)
+        localProbe.expectMsg("ping")
+
+        val association = localArtery.association(remoteAddress)
+        val remoteUid = futureUniqueRemoteAddress(association).futureValue.uid
+        localArtery.quarantine(remoteAddress, Some(remoteUid), "HarmlessTest", 
harmless = true)
+        association.associationState.isQuarantined(remoteUid) shouldBe true
+        association.associationState.quarantinedButHarmless(remoteUid) 
shouldBe true
+
+        remoteEcho.tell("ping", localEchoRef) // trigger sending message from 
remote to local, which will trigger local to wrongfully notify remote that it 
is quarantined
+        eventually {
+          expectMsgType[ThisActorSystemQuarantinedEvent] // this is what 
remote emits when it learns it is quarantined by local
+        }
+    }
+
     "remove inbound compression after quarantine" in withAssociation { (_, 
remoteAddress, _, localArtery, _) =>
       val association = localArtery.association(remoteAddress)
       val remoteUid = futureUniqueRemoteAddress(association).futureValue.uid


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to