This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch 1.1.x
in repository https://gitbox.apache.org/repos/asf/pekko.git


The following commit(s) were added to refs/heads/1.1.x by this push:
     new 38facf752b reimplement fix for akka/pekko cluster (#1594) (#1664)
38facf752b is described below

commit 38facf752b4e7d5fb43ed7c671eb91888b41bd10
Author: PJ Fanning <[email protected]>
AuthorDate: Thu Jan 2 14:04:33 2025 +0100

    reimplement fix for akka/pekko cluster (#1594) (#1664)
    
    * Revert "revert #1568 due to test failures (#1587)"
    
    This reverts commit 7af03e5215dc69df9702803dba7f22d94b49ead4.
    
    * temporarily run the nightly tests in this PR
    
    * no need for square brackets because printing the Set already delimits its elements
    
    * add logging to find the issue
    
    * support tcp protocols
    
    * Update ClusterDaemon.scala
    
    * remove temp logging
    
    * try to fix issue in Remoting
    
    * extra tests
    
    * more tests
    
    * ignore udp tests
    
    * try to make tests tidy up after failures
    
    * Update MixedProtocolClusterSpec.scala
    
    * Update MixedProtocolClusterSpec.scala
    
    * run main cluster tests for PR
---
 .github/workflows/build-test-prValidation.yml      |  47 +++++
 .../org/apache/pekko/cluster/ClusterDaemon.scala   |  29 ++--
 .../pekko/cluster/MixedProtocolClusterSpec.scala   | 192 +++++++++++++++++++++
 .../scala/org/apache/pekko/remote/Remoting.scala   |  22 ++-
 4 files changed, 275 insertions(+), 15 deletions(-)

diff --git a/.github/workflows/build-test-prValidation.yml 
b/.github/workflows/build-test-prValidation.yml
index 1138fe1eec..f3d9a18c18 100644
--- a/.github/workflows/build-test-prValidation.yml
+++ b/.github/workflows/build-test-prValidation.yml
@@ -101,6 +101,53 @@ jobs:
           -Dpekko.log.timestamps=true \
           validatePullRequest
 
+  pekko-classic-remoting-tests: # cluster module tests with Artery disabled, i.e. over classic remoting
+    name: Pekko Classic Remoting Tests
+    runs-on: ubuntu-22.04
+    if: github.repository == 'apache/pekko'
+    strategy:
+      fail-fast: false
+      matrix:
+        command:
+          - cluster/test distributed-data/test cluster-tools/test cluster-metrics/test
+    steps:
+      - name: Checkout
+        uses: actions/checkout@v4
+        with:
+          # we don't know what commit the last tag was it's safer to get entire repo so previousStableVersion resolves
+          fetch-depth: 0
+          fetch-tags: true
+
+      - name: Setup Java 11
+        uses: actions/setup-java@v4
+        with:
+          distribution: temurin
+          java-version: 11
+
+      - name: Install sbt
+        uses: sbt/setup-sbt@v1
+
+      - name: Cache Coursier cache
+        uses: coursier/cache-action@v6
+
+      - name: Enable jvm-opts
+        run: cp .jvmopts-ci .jvmopts
+
+      - name: sbt ${{ matrix.command }}
+        env:
+          DEVELOCITY_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }}
+        # note that this is not running any multi-jvm tests because multi-in-test=false
+        run: |-
+          sbt \
+          -Djava.security.egd=file:/dev/./urandom \
+          -Dpekko.remote.artery.enabled=off \
+          -Dpekko.test.timefactor=2 \
+          -Dpekko.actor.testkit.typed.timefactor=2 \
+          -Dpekko.test.tags.exclude=gh-exclude,timing \
+          -Dpekko.test.multi-in-test=false \
+          -Dpekko.cluster.assert=on \
+          clean ${{ matrix.command }}
+
   jdk-21-extra-tests:
     name: Java 21 Extra Tests (including all tests that need Java 9+)
     runs-on: ubuntu-22.04
diff --git 
a/cluster/src/main/scala/org/apache/pekko/cluster/ClusterDaemon.scala 
b/cluster/src/main/scala/org/apache/pekko/cluster/ClusterDaemon.scala
index 34320067be..479d58e97b 100644
--- a/cluster/src/main/scala/org/apache/pekko/cluster/ClusterDaemon.scala
+++ b/cluster/src/main/scala/org/apache/pekko/cluster/ClusterDaemon.scala
@@ -13,13 +13,13 @@
 
 package org.apache.pekko.cluster
 
+import scala.annotation.nowarn
 import scala.collection.immutable
 import scala.concurrent.Future
 import scala.concurrent.Promise
 import scala.concurrent.duration._
 import scala.util.control.NonFatal
 
-import scala.annotation.nowarn
 import com.typesafe.config.Config
 
 import org.apache.pekko
@@ -30,13 +30,11 @@ import pekko.annotation.InternalApi
 import pekko.cluster.ClusterEvent._
 import pekko.cluster.MemberStatus._
 import pekko.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics }
-import pekko.event.ActorWithLogClass
-import pekko.event.Logging
+import pekko.event.{ ActorWithLogClass, Logging }
 import pekko.pattern.ask
-import pekko.remote.{ QuarantinedEvent => ClassicQuarantinedEvent }
+import pekko.remote.{ QuarantinedEvent => ClassicQuarantinedEvent, 
RemoteSettings }
 import pekko.remote.artery.QuarantinedEvent
-import pekko.util.Timeout
-import pekko.util.Version
+import pekko.util.{ Timeout, Version }
 
 /**
  * Base trait for all cluster messages. All ClusterMessage's are serializable.
@@ -365,6 +363,13 @@ private[cluster] class ClusterCoreDaemon(publisher: 
ActorRef, joinConfigCompatCh
   val statsEnabled = PublishStatsInterval.isFinite
   var gossipStats = GossipStats()
 
+  val acceptedProtocols: Set[String] = {
+    val remoteSettings: RemoteSettings = new RemoteSettings(context.system.settings.config)
+    // Accept the bare protocol names plus the classic-remoting schemes "<name>.tcp" and "<name>.ssl.tcp";
+    // omitting the SSL variant would make classic nodes with TLS enabled ignore each other's join attempts.
+    remoteSettings.AcceptProtocolNames.flatMap(protocol => Set(protocol, s"$protocol.tcp", s"$protocol.ssl.tcp"))
+  }
+
   var seedNodes = SeedNodes
   var seedNodeProcess: Option[ActorRef] = None
   var seedNodeProcessCounter = 0 // for unique names
@@ -701,10 +706,10 @@ private[cluster] class ClusterCoreDaemon(publisher: 
ActorRef, joinConfigCompatCh
    * which will reply with a `Welcome` message.
    */
   def join(address: Address): Unit = {
-    if (address.protocol != selfAddress.protocol)
+    if (!acceptedProtocols.contains(address.protocol))
       logWarning(
-        "Trying to join member with wrong protocol, but was ignored, expected 
[{}] but was [{}]",
-        selfAddress.protocol,
+        "Trying to join member with wrong protocol, but was ignored, expected 
any of {} but was [{}]",
+        acceptedProtocols,
         address.protocol)
     else if (address.system != selfAddress.system)
       logWarning(
@@ -750,10 +755,10 @@ private[cluster] class ClusterCoreDaemon(publisher: 
ActorRef, joinConfigCompatCh
   def joining(joiningNode: UniqueAddress, roles: Set[String], appVersion: 
Version): Unit = {
     if (!preparingForShutdown) {
       val selfStatus = latestGossip.member(selfUniqueAddress).status
-      if (joiningNode.address.protocol != selfAddress.protocol)
+      if (!acceptedProtocols.contains(joiningNode.address.protocol))
         logWarning(
-          "Member with wrong protocol tried to join, but was ignored, expected 
[{}] but was [{}]",
-          selfAddress.protocol,
+          "Member with wrong protocol tried to join, but was ignored, expected 
any of {} but was [{}]",
+          acceptedProtocols,
           joiningNode.address.protocol)
       else if (joiningNode.address.system != selfAddress.system)
         logWarning(
diff --git 
a/cluster/src/test/scala/org/apache/pekko/cluster/MixedProtocolClusterSpec.scala
 
b/cluster/src/test/scala/org/apache/pekko/cluster/MixedProtocolClusterSpec.scala
new file mode 100644
index 0000000000..f3c8c73adb
--- /dev/null
+++ 
b/cluster/src/test/scala/org/apache/pekko/cluster/MixedProtocolClusterSpec.scala
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.cluster
+
+import com.typesafe.config.{ Config, ConfigFactory }
+
+import org.apache.pekko.testkit.{ LongRunningTest, PekkoSpec }
+
+/**
+ * Node configurations for [[MixedProtocolClusterSpec]]. All of them fall back to `baseConfig`, which lists
+ * both "pekko" and "akka" in `pekko.remote.accept-protocol-names`; the `configWith{Pekko,Akka}*` variants
+ * additionally pin `pekko.remote.protocol-name` to one of the two names.
+ */
+object MixedProtocolClusterSpec {
+
+  val baseConfig: Config =
+    ConfigFactory.parseString("""
+     pekko.actor.provider = "cluster"
+     pekko.coordinated-shutdown.terminate-actor-system = on
+
+     pekko.remote.artery.canonical.port = 0
+     pekko.remote.classic.netty.tcp.port = 0
+     pekko.remote.artery.advanced.aeron.idle-cpu-level = 3
+     pekko.remote.accept-protocol-names = ["pekko", "akka"]
+
+     pekko.cluster.jmx.multi-mbeans-in-same-jvm = on
+     pekko.cluster.configuration-compatibility-check.enforce-on-join = off
+     """)
+
+  val configWithUdp: Config =
+    ConfigFactory.parseString("""
+      pekko.remote.artery.transport = "aeron-udp"
+    """).withFallback(baseConfig)
+
+  val configWithPekkoUdp: Config =
+    ConfigFactory.parseString("""
+      pekko.remote.protocol-name = "pekko"
+    """).withFallback(configWithUdp)
+
+  val configWithAkkaUdp: Config =
+    ConfigFactory.parseString("""
+      pekko.remote.protocol-name = "akka"
+    """).withFallback(configWithUdp)
+
+  val configWithPekkoTcp: Config =
+    ConfigFactory.parseString("""
+      pekko.remote.protocol-name = "pekko"
+    """).withFallback(baseConfig)
+
+  val configWithAkkaTcp: Config =
+    ConfigFactory.parseString("""
+      pekko.remote.protocol-name = "akka"
+    """).withFallback(baseConfig)
+
+  val configWithNetty: Config =
+    ConfigFactory.parseString("""
+      pekko.remote.artery.enabled = false
+      pekko.remote.classic {
+        enabled-transports = ["pekko.remote.classic.netty.tcp"]
+      }
+    """).withFallback(baseConfig)
+
+  val configWithPekkoNetty: Config =
+    ConfigFactory.parseString("""
+      pekko.remote.protocol-name = "pekko"
+    """).withFallback(configWithNetty)
+
+  val configWithAkkaNetty: Config =
+    ConfigFactory.parseString("""
+      pekko.remote.protocol-name = "akka"
+    """).withFallback(configWithNetty)
+}
+
+/**
+ * Checks that a node configured with one protocol name ("pekko" or "akka") can join a cluster whose first
+ * node uses the other name, for each of the udp, tcp and netty configuration variants in the companion.
+ */
+class MixedProtocolClusterSpec extends PekkoSpec with ClusterTestKit {
+
+  import MixedProtocolClusterSpec._
+
+  /**
+   * Starts a node with `firstNodeConfig`, then a node with `joiningNodeConfig`, calls `formCluster()` and
+   * waits until the second node is `Up`. All started systems are shut down afterwards, even on failure.
+   */
+  private def assertJoinSucceeds(firstNodeConfig: Config, joiningNodeConfig: Config): Unit = {
+    val clusterTestUtil = new ClusterTestUtil(system.name)
+    try {
+      clusterTestUtil.newActorSystem(firstNodeConfig)
+      val joiningNode = clusterTestUtil.newActorSystem(joiningNodeConfig)
+      clusterTestUtil.formCluster()
+
+      awaitCond(clusterTestUtil.isMemberUp(joiningNode), message = "awaiting joining node to be 'Up'")
+    } finally {
+      clusterTestUtil.shutdownAll()
+    }
+  }
+
+  "A node using the akka protocol" must {
+
+    "be allowed to join a cluster with a node using the pekko protocol (udp)" taggedAs LongRunningTest in {
+      assertJoinSucceeds(firstNodeConfig = configWithPekkoUdp, joiningNodeConfig = configWithAkkaUdp)
+    }
+
+    "be allowed to join a cluster with a node using the pekko protocol (tcp)" taggedAs LongRunningTest in {
+      assertJoinSucceeds(firstNodeConfig = configWithPekkoTcp, joiningNodeConfig = configWithAkkaTcp)
+    }
+
+    "be allowed to join a cluster with a node using the pekko protocol (netty)" taggedAs LongRunningTest in {
+      assertJoinSucceeds(firstNodeConfig = configWithPekkoNetty, joiningNodeConfig = configWithAkkaNetty)
+    }
+
+    "allow a node using the pekko protocol to join the cluster (udp)" taggedAs LongRunningTest in {
+      assertJoinSucceeds(firstNodeConfig = configWithAkkaUdp, joiningNodeConfig = configWithPekkoUdp)
+    }
+
+    "allow a node using the pekko protocol to join the cluster (tcp)" taggedAs LongRunningTest in {
+      assertJoinSucceeds(firstNodeConfig = configWithAkkaTcp, joiningNodeConfig = configWithPekkoTcp)
+    }
+
+    "allow a node using the pekko protocol to join the cluster (netty)" taggedAs LongRunningTest in {
+      assertJoinSucceeds(firstNodeConfig = configWithAkkaNetty, joiningNodeConfig = configWithPekkoNetty)
+    }
+  }
+}
diff --git a/remote/src/main/scala/org/apache/pekko/remote/Remoting.scala 
b/remote/src/main/scala/org/apache/pekko/remote/Remoting.scala
index 8581247114..16811bc837 100644
--- a/remote/src/main/scala/org/apache/pekko/remote/Remoting.scala
+++ b/remote/src/main/scala/org/apache/pekko/remote/Remoting.scala
@@ -225,11 +225,12 @@ private[remote] class Remoting(_system: 
ExtendedActorSystem, _provider: RemoteAc
             Await.result(addressesPromise.future, StartupTimeout.duration)
           if (transports.isEmpty) throw new RemoteTransportException("No 
transport drivers were loaded.", null)
 
-          transportMapping = transports
+          val mapping = transports
             .groupBy {
               case (transport, _) => transport.schemeIdentifier
             }
             .map { case (k, v) => k -> v.toSet }
+          transportMapping = addProtocolsToMap(mapping)
 
           defaultAddress = transports.head._2
           addresses = transports.map { _._2 }.toSet
@@ -296,6 +297,21 @@ private[remote] class Remoting(_system: 
ExtendedActorSystem, _provider: RemoteAc
         }
     }
   }
+
+  /**
+   * Also registers each transport under every accepted protocol name (keeping the original keys), preserving
+   * the scheme suffix so that e.g. "pekko.ssl.tcp" maps to "akka.ssl.tcp" rather than collapsing into "akka.tcp".
+   */
+  private def addProtocolsToMap(
+      map: Map[String, Set[(PekkoProtocolTransport, Address)]]): Map[String, Set[(PekkoProtocolTransport, Address)]] = {
+    if (AcceptProtocolNames.size > 1) {
+      map ++ map.flatMap { case (scheme, transports) =>
+        val dot = scheme.indexOf('.') // scheme is "<protocol name>.<transport scheme>", e.g. "pekko.tcp"
+        val suffix = if (dot < 0) "" else scheme.substring(dot)
+        AcceptProtocolNames.map(protocol => s"$protocol$suffix" -> transports)
+      }
+    } else map
+  }
 }
 
 /**
@@ -567,7 +583,7 @@ private[remote] class EndpointManager(conf: Config, log: 
LoggingAdapter)
     }
 
     OneForOneStrategy(loggingEnabled = false) {
-      case InvalidAssociation(localAddress, remoteAddress, reason, 
disassiciationInfo) =>
+      case InvalidAssociation(localAddress, remoteAddress, reason, 
disassociationInfo) =>
         keepQuarantinedOr(remoteAddress) {
           val causedBy = if (reason.getCause == null) "" else s"Caused by: 
[${reason.getCause.getMessage}]"
           log.warning(
@@ -580,7 +596,7 @@ private[remote] class EndpointManager(conf: Config, log: 
LoggingAdapter)
             causedBy)
           endpoints.markAsFailed(sender(), Deadline.now + 
settings.RetryGateClosedFor)
         }
-        disassiciationInfo.foreach {
+        disassociationInfo.foreach {
           case AssociationHandle.Quarantined =>
             
context.system.eventStream.publish(ThisActorSystemQuarantinedEvent(localAddress,
 remoteAddress))
           case _ => // do nothing


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to