This is an automated email from the ASF dual-hosted git repository.
fanningpj pushed a commit to branch 1.1.x
in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to refs/heads/1.1.x by this push:
new 38facf752b reimplement fix for akka/pekko cluster (#1594) (#1664)
38facf752b is described below
commit 38facf752b4e7d5fb43ed7c671eb91888b41bd10
Author: PJ Fanning <[email protected]>
AuthorDate: Thu Jan 2 14:04:33 2025 +0100
reimplement fix for akka/pekko cluster (#1594) (#1664)
* Revert "revert #1568 due to test failures (#1587)"
This reverts commit 7af03e5215dc69df9702803dba7f22d94b49ead4.
* temp run nightly test in this PR
* no need for square brackets because the set print adds them
* logging to find issue
* support tcp protocols
* Update ClusterDaemon.scala
* remove temp logging
* try to fix issue in Remoting
* extra tests
* more tests
* ignore udp tests
* try to make tests tidy up after failures
* Update MixedProtocolClusterSpec.scala
* Update MixedProtocolClusterSpec.scala
* run main cluster tests for PR
---
.github/workflows/build-test-prValidation.yml | 47 +++++
.../org/apache/pekko/cluster/ClusterDaemon.scala | 29 ++--
.../pekko/cluster/MixedProtocolClusterSpec.scala | 192 +++++++++++++++++++++
.../scala/org/apache/pekko/remote/Remoting.scala | 22 ++-
4 files changed, 275 insertions(+), 15 deletions(-)
diff --git a/.github/workflows/build-test-prValidation.yml
b/.github/workflows/build-test-prValidation.yml
index 1138fe1eec..f3d9a18c18 100644
--- a/.github/workflows/build-test-prValidation.yml
+++ b/.github/workflows/build-test-prValidation.yml
@@ -101,6 +101,53 @@ jobs:
-Dpekko.log.timestamps=true \
validatePullRequest
+ pekko-classic-remoting-tests:
+ name: Pekko Classic Remoting Tests
+ runs-on: ubuntu-22.04
+ if: github.repository == 'apache/pekko'
+ strategy:
+ fail-fast: false
+ matrix:
+ command:
+ - cluster/test distributed-data/test cluster-tools/test
cluster-metrics/test
+ steps:
+ - name: Checkout
+ uses: actions/checkout@v4
+ with:
+ # we don't know what commit the last tag was on, so it's safer to fetch the entire repo so that previousStableVersion resolves
+ fetch-depth: 0
+ fetch-tags: true
+
+ - name: Setup Java 11
+ uses: actions/setup-java@v4
+ with:
+ distribution: temurin
+ java-version: 11
+
+ - name: Install sbt
+ uses: sbt/setup-sbt@v1
+
+ - name: Cache Coursier cache
+ uses: coursier/cache-action@v6
+
+ - name: Enable jvm-opts
+ run: cp .jvmopts-ci .jvmopts
+
+ - name: sbt ${{ matrix.command }}
+ env:
+ DEVELOCITY_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }}
+ # note: this does not run any multi-jvm tests because pekko.test.multi-in-test=false
+ run: |-
+ sbt \
+ -Djava.security.egd=file:/dev/./urandom \
+ -Dpekko.remote.artery.enabled=off \
+ -Dpekko.test.timefactor=2 \
+ -Dpekko.actor.testkit.typed.timefactor=2 \
+ -Dpekko.test.tags.exclude=gh-exclude,timing \
+ -Dpekko.test.multi-in-test=false \
+ -Dpekko.cluster.assert=on \
+ clean ${{ matrix.command }}
+
jdk-21-extra-tests:
name: Java 21 Extra Tests (including all tests that need Java 9+)
runs-on: ubuntu-22.04
diff --git
a/cluster/src/main/scala/org/apache/pekko/cluster/ClusterDaemon.scala
b/cluster/src/main/scala/org/apache/pekko/cluster/ClusterDaemon.scala
index 34320067be..479d58e97b 100644
--- a/cluster/src/main/scala/org/apache/pekko/cluster/ClusterDaemon.scala
+++ b/cluster/src/main/scala/org/apache/pekko/cluster/ClusterDaemon.scala
@@ -13,13 +13,13 @@
package org.apache.pekko.cluster
+import scala.annotation.nowarn
import scala.collection.immutable
import scala.concurrent.Future
import scala.concurrent.Promise
import scala.concurrent.duration._
import scala.util.control.NonFatal
-import scala.annotation.nowarn
import com.typesafe.config.Config
import org.apache.pekko
@@ -30,13 +30,11 @@ import pekko.annotation.InternalApi
import pekko.cluster.ClusterEvent._
import pekko.cluster.MemberStatus._
import pekko.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics }
-import pekko.event.ActorWithLogClass
-import pekko.event.Logging
+import pekko.event.{ ActorWithLogClass, Logging }
import pekko.pattern.ask
-import pekko.remote.{ QuarantinedEvent => ClassicQuarantinedEvent }
+import pekko.remote.{ QuarantinedEvent => ClassicQuarantinedEvent,
RemoteSettings }
import pekko.remote.artery.QuarantinedEvent
-import pekko.util.Timeout
-import pekko.util.Version
+import pekko.util.{ Timeout, Version }
/**
* Base trait for all cluster messages. All ClusterMessage's are serializable.
@@ -365,6 +363,13 @@ private[cluster] class ClusterCoreDaemon(publisher:
ActorRef, joinConfigCompatCh
val statsEnabled = PublishStatsInterval.isFinite
var gossipStats = GossipStats()
+ val acceptedProtocols: Set[String] = {
+ val remoteSettings: RemoteSettings = new
RemoteSettings(context.system.settings.config)
+ val initSet = remoteSettings.AcceptProtocolNames
+ val tcpSet = initSet.map(protocol => s"$protocol.tcp")
+ initSet ++ tcpSet
+ }
+
var seedNodes = SeedNodes
var seedNodeProcess: Option[ActorRef] = None
var seedNodeProcessCounter = 0 // for unique names
@@ -701,10 +706,10 @@ private[cluster] class ClusterCoreDaemon(publisher:
ActorRef, joinConfigCompatCh
* which will reply with a `Welcome` message.
*/
def join(address: Address): Unit = {
- if (address.protocol != selfAddress.protocol)
+ if (!acceptedProtocols.contains(address.protocol))
logWarning(
- "Trying to join member with wrong protocol, but was ignored, expected
[{}] but was [{}]",
- selfAddress.protocol,
+ "Trying to join member with wrong protocol, but was ignored, expected
any of {} but was [{}]",
+ acceptedProtocols,
address.protocol)
else if (address.system != selfAddress.system)
logWarning(
@@ -750,10 +755,10 @@ private[cluster] class ClusterCoreDaemon(publisher:
ActorRef, joinConfigCompatCh
def joining(joiningNode: UniqueAddress, roles: Set[String], appVersion:
Version): Unit = {
if (!preparingForShutdown) {
val selfStatus = latestGossip.member(selfUniqueAddress).status
- if (joiningNode.address.protocol != selfAddress.protocol)
+ if (!acceptedProtocols.contains(joiningNode.address.protocol))
logWarning(
- "Member with wrong protocol tried to join, but was ignored, expected
[{}] but was [{}]",
- selfAddress.protocol,
+ "Member with wrong protocol tried to join, but was ignored, expected
any of {} but was [{}]",
+ acceptedProtocols,
joiningNode.address.protocol)
else if (joiningNode.address.system != selfAddress.system)
logWarning(
diff --git
a/cluster/src/test/scala/org/apache/pekko/cluster/MixedProtocolClusterSpec.scala
b/cluster/src/test/scala/org/apache/pekko/cluster/MixedProtocolClusterSpec.scala
new file mode 100644
index 0000000000..f3c8c73adb
--- /dev/null
+++
b/cluster/src/test/scala/org/apache/pekko/cluster/MixedProtocolClusterSpec.scala
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.cluster
+
+import com.typesafe.config.{ Config, ConfigFactory }
+
+import org.apache.pekko.testkit.{ LongRunningTest, PekkoSpec }
+
+object MixedProtocolClusterSpec {
+
+ val baseConfig: Config =
+ ConfigFactory.parseString("""
+ pekko.actor.provider = "cluster"
+ pekko.coordinated-shutdown.terminate-actor-system = on
+
+ pekko.remote.artery.canonical.port = 0
+ pekko.remote.classic.netty.tcp.port = 0
+ pekko.remote.artery.advanced.aeron.idle-cpu-level = 3
+ pekko.remote.accept-protocol-names = ["pekko", "akka"]
+
+ pekko.cluster.jmx.multi-mbeans-in-same-jvm = on
+ pekko.cluster.configuration-compatibility-check.enforce-on-join = off
+ """)
+
+ val configWithUdp: Config =
+ ConfigFactory.parseString("""
+ pekko.remote.artery.transport = "aeron-udp"
+ """).withFallback(baseConfig)
+
+ val configWithPekkoUdp: Config =
+ ConfigFactory.parseString("""
+ pekko.remote.protocol-name = "pekko"
+ """).withFallback(configWithUdp)
+
+ val configWithAkkaUdp: Config =
+ ConfigFactory.parseString("""
+ pekko.remote.protocol-name = "akka"
+ """).withFallback(configWithUdp)
+
+ val configWithPekkoTcp: Config =
+ ConfigFactory.parseString("""
+ pekko.remote.protocol-name = "pekko"
+ """).withFallback(baseConfig)
+
+ val configWithAkkaTcp: Config =
+ ConfigFactory.parseString("""
+ pekko.remote.protocol-name = "akka"
+ """).withFallback(baseConfig)
+
+ val configWithNetty: Config =
+ ConfigFactory.parseString("""
+ pekko.remote.artery.enabled = false
+ pekko.remote.classic {
+ enabled-transports = ["pekko.remote.classic.netty.tcp"]
+ }
+ """).withFallback(baseConfig)
+
+ val configWithPekkoNetty: Config =
+ ConfigFactory.parseString("""
+ pekko.remote.protocol-name = "pekko"
+ """).withFallback(configWithNetty)
+
+ val configWithAkkaNetty: Config =
+ ConfigFactory.parseString("""
+ pekko.remote.protocol-name = "akka"
+ """).withFallback(configWithNetty)
+}
+
+class MixedProtocolClusterSpec extends PekkoSpec with ClusterTestKit {
+
+ import MixedProtocolClusterSpec._
+
+ "A node using the akka protocol" must {
+
+ "be allowed to join a cluster with a node using the pekko protocol (udp)"
taggedAs LongRunningTest in {
+
+ val clusterTestUtil = new ClusterTestUtil(system.name)
+ try {
+ // start the first node with the "pekko" protocol
+ clusterTestUtil.newActorSystem(configWithPekkoUdp)
+
+ // have a node using the "akka" protocol join
+ val joiningNode = clusterTestUtil.newActorSystem(configWithAkkaUdp)
+ clusterTestUtil.formCluster()
+
+ awaitCond(clusterTestUtil.isMemberUp(joiningNode), message = "awaiting
joining node to be 'Up'")
+ } finally {
+ clusterTestUtil.shutdownAll()
+ }
+ }
+
+ "be allowed to join a cluster with a node using the pekko protocol (tcp)"
taggedAs LongRunningTest in {
+
+ val clusterTestUtil = new ClusterTestUtil(system.name)
+ try {
+ // start the first node with the "pekko" protocol
+ clusterTestUtil.newActorSystem(configWithPekkoTcp)
+
+ // have a node using the "akka" protocol join
+ val joiningNode = clusterTestUtil.newActorSystem(configWithAkkaTcp)
+ clusterTestUtil.formCluster()
+
+ awaitCond(clusterTestUtil.isMemberUp(joiningNode), message = "awaiting
joining node to be 'Up'")
+ } finally {
+ clusterTestUtil.shutdownAll()
+ }
+ }
+
+ "be allowed to join a cluster with a node using the pekko protocol
(netty)" taggedAs LongRunningTest in {
+
+ val clusterTestUtil = new ClusterTestUtil(system.name)
+ try {
+ // start the first node with the "pekko" protocol
+ clusterTestUtil.newActorSystem(configWithPekkoNetty)
+
+ // have a node using the "akka" protocol join
+ val joiningNode = clusterTestUtil.newActorSystem(configWithAkkaNetty)
+ clusterTestUtil.formCluster()
+
+ awaitCond(clusterTestUtil.isMemberUp(joiningNode), message = "awaiting
joining node to be 'Up'")
+ } finally {
+ clusterTestUtil.shutdownAll()
+ }
+ }
+
+ "allow a node using the pekko protocol to join the cluster (udp)" taggedAs
LongRunningTest in {
+
+ val clusterTestUtil = new ClusterTestUtil(system.name)
+ try {
+ // create the first node with the "akka" protocol
+ clusterTestUtil.newActorSystem(configWithAkkaUdp)
+
+ // have a node using the "pekko" protocol join
+ val joiningNode = clusterTestUtil.newActorSystem(configWithPekkoUdp)
+ clusterTestUtil.formCluster()
+
+ awaitCond(clusterTestUtil.isMemberUp(joiningNode), message = "awaiting
joining node to be 'Up'")
+ } finally {
+ clusterTestUtil.shutdownAll()
+ }
+ }
+
+ "allow a node using the pekko protocol to join the cluster (tcp)" taggedAs
LongRunningTest in {
+
+ val clusterTestUtil = new ClusterTestUtil(system.name)
+ try {
+ // create the first node with the "akka" protocol
+ clusterTestUtil.newActorSystem(configWithAkkaTcp)
+
+ // have a node using the "pekko" protocol join
+ val joiningNode = clusterTestUtil.newActorSystem(configWithPekkoTcp)
+ clusterTestUtil.formCluster()
+
+ awaitCond(clusterTestUtil.isMemberUp(joiningNode), message = "awaiting
joining node to be 'Up'")
+ } finally {
+ clusterTestUtil.shutdownAll()
+ }
+ }
+
+ "allow a node using the pekko protocol to join the cluster (netty)"
taggedAs LongRunningTest in {
+
+ val clusterTestUtil = new ClusterTestUtil(system.name)
+ try {
+ // create the first node with the "akka" protocol
+ clusterTestUtil.newActorSystem(configWithAkkaNetty)
+
+ // have a node using the "pekko" protocol join
+ val joiningNode = clusterTestUtil.newActorSystem(configWithPekkoNetty)
+ clusterTestUtil.formCluster()
+
+ awaitCond(clusterTestUtil.isMemberUp(joiningNode), message = "awaiting
joining node to be 'Up'")
+ } finally {
+ clusterTestUtil.shutdownAll()
+ }
+ }
+ }
+}
diff --git a/remote/src/main/scala/org/apache/pekko/remote/Remoting.scala
b/remote/src/main/scala/org/apache/pekko/remote/Remoting.scala
index 8581247114..16811bc837 100644
--- a/remote/src/main/scala/org/apache/pekko/remote/Remoting.scala
+++ b/remote/src/main/scala/org/apache/pekko/remote/Remoting.scala
@@ -225,11 +225,12 @@ private[remote] class Remoting(_system:
ExtendedActorSystem, _provider: RemoteAc
Await.result(addressesPromise.future, StartupTimeout.duration)
if (transports.isEmpty) throw new RemoteTransportException("No
transport drivers were loaded.", null)
- transportMapping = transports
+ val mapping = transports
.groupBy {
case (transport, _) => transport.schemeIdentifier
}
.map { case (k, v) => k -> v.toSet }
+ transportMapping = addProtocolsToMap(mapping)
defaultAddress = transports.head._2
addresses = transports.map { _._2 }.toSet
@@ -296,6 +297,21 @@ private[remote] class Remoting(_system:
ExtendedActorSystem, _provider: RemoteAc
}
}
}
+
+ private def addProtocolsToMap(
+ map: Map[String, Set[(PekkoProtocolTransport, Address)]]): Map[String,
Set[(PekkoProtocolTransport, Address)]] = {
+ if (AcceptProtocolNames.size > 1) {
+ map.flatMap { case (protocol, transports) =>
+ val tcpProtocol = protocol.endsWith(".tcp")
+ AcceptProtocolNames.map { newProtocol =>
+ if (tcpProtocol)
+ s"$newProtocol.tcp" -> transports
+ else
+ newProtocol -> transports
+ }
+ }
+ } else map
+ }
}
/**
@@ -567,7 +583,7 @@ private[remote] class EndpointManager(conf: Config, log:
LoggingAdapter)
}
OneForOneStrategy(loggingEnabled = false) {
- case InvalidAssociation(localAddress, remoteAddress, reason,
disassiciationInfo) =>
+ case InvalidAssociation(localAddress, remoteAddress, reason,
disassociationInfo) =>
keepQuarantinedOr(remoteAddress) {
val causedBy = if (reason.getCause == null) "" else s"Caused by:
[${reason.getCause.getMessage}]"
log.warning(
@@ -580,7 +596,7 @@ private[remote] class EndpointManager(conf: Config, log:
LoggingAdapter)
causedBy)
endpoints.markAsFailed(sender(), Deadline.now +
settings.RetryGateClosedFor)
}
- disassiciationInfo.foreach {
+ disassociationInfo.foreach {
case AssociationHandle.Quarantined =>
context.system.eventStream.publish(ThisActorSystemQuarantinedEvent(localAddress,
remoteAddress))
case _ => // do nothing
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pekko.apache.org
For additional commands, e-mail: commits-help@pekko.apache.org