This is an automated email from the ASF dual-hosted git repository.
pjfanning pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to refs/heads/main by this push:
new f2eb52af1d feat: Make it possible to define appVersion later (#2947)
f2eb52af1d is described below
commit f2eb52af1dfa29762d5b34964b1bca2cb663f060
Author: PJ Fanning <[email protected]>
AuthorDate: Sat May 9 09:37:41 2026 +0100
feat: Make it possible to define appVersion later (#2947)
* Port akka-core#31934: Make it possible to define appVersion later
Motivation:
Alternative to config app-version that is defined before ActorSystem
startup.
If the version is read from an external system (e.g. Kubernetes) this
feature
makes it possible to define it after system startup but before joining.
Modification:
- Added SetAppVersionLater/SetAppVersion messages to ClusterUserAction
- Added laterAppVersion state and setAppVersionLater/setAppVersion private
methods to ClusterCoreDaemon
- Updated uninitialized, tryingToJoin, initialized receive handlers
- Updated join() to defer when appVersion not yet available
- Updated joining() to handle laterAppVersion for self
- Added setAppVersionLater Scala + Java API to Cluster.scala
- Added SetAppVersionLater case class + Java factory to
cluster-typed/Cluster.scala
- Added SetAppVersionLater handling in AdaptedClusterImpl.managerBehavior
- Added AppVersionSpec multi-JVM test
Result:
Users can now supply appVersion from an external source (e.g. Kubernetes)
after
ActorSystem startup but before joining the cluster.
Tests:
- scalafmt: not installed, skipped
- sbt: not installed, skipped
References:
Refs https://github.com/akka/akka-core/pull/31934
Agent-Logs-Url:
https://github.com/pjfanning/incubator-pekko/sessions/27cc73f3-2831-4301-8cd0-af3116012209
Co-authored-by: pjfanning <[email protected]>
* Address review feedback: fix double space in comment and improve test
timing
Agent-Logs-Url:
https://github.com/pjfanning/incubator-pekko/sessions/27cc73f3-2831-4301-8cd0-af3116012209
Co-authored-by: pjfanning <[email protected]>
* scalafmt
---------
Co-authored-by: copilot-swe-agent[bot]
<[email protected]>
Co-authored-by: pjfanning <[email protected]>
---
.../org/apache/pekko/cluster/typed/Cluster.scala | 28 +++++
.../typed/internal/AdaptedClusterImpl.scala | 4 +
.../scala/org/apache/pekko/cluster/Cluster.scala | 36 +++++++
.../org/apache/pekko/cluster/ClusterDaemon.scala | 114 +++++++++++++++++++--
.../org/apache/pekko/cluster/AppVersionSpec.scala | 79 ++++++++++++++
5 files changed, 250 insertions(+), 11 deletions(-)
diff --git
a/cluster-typed/src/main/scala/org/apache/pekko/cluster/typed/Cluster.scala
b/cluster-typed/src/main/scala/org/apache/pekko/cluster/typed/Cluster.scala
index e2fdf08a76..2bba600929 100644
--- a/cluster-typed/src/main/scala/org/apache/pekko/cluster/typed/Cluster.scala
+++ b/cluster-typed/src/main/scala/org/apache/pekko/cluster/typed/Cluster.scala
@@ -13,7 +13,10 @@
package org.apache.pekko.cluster.typed
+import java.util.concurrent.CompletionStage
+
import scala.collection.immutable
+import scala.concurrent.Future
import org.apache.pekko
import pekko.actor.Address
@@ -24,6 +27,7 @@ import pekko.cluster._
import pekko.cluster.ClusterEvent.{ ClusterDomainEvent, CurrentClusterState }
import pekko.cluster.typed.internal.AdapterClusterImpl
import pekko.japi.Util
+import pekko.util.Version
/**
* Messages for subscribing to changes in the cluster state
@@ -128,6 +132,30 @@ final case class JoinSeedNodes(seedNodes:
immutable.Seq[Address]) extends Cluste
}
+/**
+ * Scala API: If the `appVersion` is read from an external system (e.g.
Kubernetes) it can be defined after
+ * system startup but before joining by completing the `appVersion` `Future`.
In that case,
+ * `SetAppVersionLater` should be sent before [[Join]] or [[JoinSeedNodes]].
It's fine to send
+ * `Join` or `JoinSeedNodes` immediately afterwards (before the `Future` is
completed). The join will
+ * then wait for the `appVersion` to be completed.
+ */
+final case class SetAppVersionLater(appVersion: Future[Version]) extends
ClusterCommand
+
+object SetAppVersionLater {
+
+ /**
+ * Java API: If the `appVersion` is read from an external system (e.g.
Kubernetes) it can be defined after
+ * system startup but before joining by completing the `appVersion`
`CompletionStage`. In that case,
+ * `SetAppVersionLater` should be sent before [[Join]] or [[JoinSeedNodes]].
It's fine to send
+ * `Join` or `JoinSeedNodes` immediately afterwards (before the
`CompletionStage` is completed). The join will
+ * then wait for the `appVersion` to be completed.
+ */
+ def create(appVersion: CompletionStage[Version]): SetAppVersionLater = {
+ import scala.jdk.FutureConverters._
+ SetAppVersionLater(appVersion.asScala)
+ }
+}
+
/**
* Send command to issue state transition to LEAVING for the node specified by
'address'.
* The member will go through the status changes [[MemberStatus]] `Leaving`
(not published to
diff --git
a/cluster-typed/src/main/scala/org/apache/pekko/cluster/typed/internal/AdaptedClusterImpl.scala
b/cluster-typed/src/main/scala/org/apache/pekko/cluster/typed/internal/AdaptedClusterImpl.scala
index e0eefc354c..2f4dd46874 100644
---
a/cluster-typed/src/main/scala/org/apache/pekko/cluster/typed/internal/AdaptedClusterImpl.scala
+++
b/cluster-typed/src/main/scala/org/apache/pekko/cluster/typed/internal/AdaptedClusterImpl.scala
@@ -142,6 +142,10 @@ private[pekko] object AdapterClusterImpl {
adaptedCluster.joinSeedNodes(addresses)
Behaviors.same
+ case SetAppVersionLater(version) =>
+ adaptedCluster.setAppVersionLater(version)
+ Behaviors.same
+
case PrepareForFullClusterShutdown =>
adaptedCluster.prepareForFullClusterShutdown()
Behaviors.same
diff --git a/cluster/src/main/scala/org/apache/pekko/cluster/Cluster.scala
b/cluster/src/main/scala/org/apache/pekko/cluster/Cluster.scala
index b778252232..40ccac5711 100644
--- a/cluster/src/main/scala/org/apache/pekko/cluster/Cluster.scala
+++ b/cluster/src/main/scala/org/apache/pekko/cluster/Cluster.scala
@@ -14,13 +14,17 @@
package org.apache.pekko.cluster
import java.io.Closeable
+import java.util.concurrent.CompletionStage
import java.util.concurrent.ThreadFactory
import java.util.concurrent.atomic.AtomicBoolean
import scala.annotation.varargs
import scala.collection.immutable
import scala.concurrent.{ Await, ExecutionContext }
+import scala.concurrent.Future
import scala.concurrent.duration._
+import scala.util.Failure
+import scala.util.Success
import scala.util.control.NonFatal
import org.apache.pekko
@@ -36,6 +40,7 @@ import pekko.event.MarkerLoggingAdapter
import pekko.japi.Util
import pekko.pattern._
import pekko.remote.{ UniqueAddress => _, _ }
+import pekko.util.Version
import com.typesafe.config.{ Config, ConfigFactory }
@@ -369,6 +374,37 @@ class Cluster(val system: ExtendedActorSystem) extends
Extension {
def joinSeedNodes(seedNodes: java.util.List[Address]): Unit =
joinSeedNodes(Util.immutableSeq(seedNodes))
+ /**
+ * Scala API: If the `appVersion` is read from an external system (e.g.
Kubernetes) it can be defined after
+ * system startup but before joining by completing the `appVersion`
`Future`. In that case, `setAppVersionLater`
+ * should be called before calling `join` or `joinSeedNodes`. It's fine to
call `join` or `joinSeedNodes`
+ * immediately afterwards (before the `Future` is completed). The join will
then wait for the `appVersion`
+ * to be completed.
+ */
+ def setAppVersionLater(appVersion: Future[Version]): Unit = {
+ clusterCore ! ClusterUserAction.SetAppVersionLater
+ import system.dispatcher
+ appVersion.onComplete {
+ case Success(version) =>
+ clusterCore ! ClusterUserAction.SetAppVersion(version)
+ case Failure(exc) =>
+ logWarning("Later appVersion failed. Fallback to configured appVersion
[{}]. {}", settings.AppVersion, exc)
+ clusterCore ! ClusterUserAction.SetAppVersion(settings.AppVersion)
+ }
+ }
+
+ /**
+ * Java API: If the `appVersion` is read from an external system (e.g.
Kubernetes) it can be defined after
+ * system startup but before joining by completing the `appVersion`
`CompletionStage`. In that case,
+ * `setAppVersionLater` should be called before calling `join` or
`joinSeedNodes`. It's fine to call
+ * `join` or `joinSeedNodes` immediately afterwards (before the
`CompletionStage` is completed). The join will
+ * then wait for the `appVersion` to be completed.
+ */
+ def setAppVersionLater(appVersion: CompletionStage[Version]): Unit = {
+ import scala.jdk.FutureConverters._
+ setAppVersionLater(appVersion.asScala)
+ }
+
/**
* Send command to issue state transition to LEAVING for the node specified
by 'address'.
* The member will go through the status changes [[MemberStatus]] `Leaving`
(not published to
diff --git
a/cluster/src/main/scala/org/apache/pekko/cluster/ClusterDaemon.scala
b/cluster/src/main/scala/org/apache/pekko/cluster/ClusterDaemon.scala
index 1b085cafa6..3bc92fde56 100644
--- a/cluster/src/main/scala/org/apache/pekko/cluster/ClusterDaemon.scala
+++ b/cluster/src/main/scala/org/apache/pekko/cluster/ClusterDaemon.scala
@@ -18,6 +18,8 @@ import scala.collection.immutable
import scala.concurrent.Future
import scala.concurrent.Promise
import scala.concurrent.duration._
+import scala.util.Failure
+import scala.util.Success
import scala.util.control.NonFatal
import org.apache.pekko
@@ -74,6 +76,19 @@ private[cluster] object ClusterUserAction {
*/
@SerialVersionUID(1L)
case object PrepareForShutdown extends ClusterMessage
+
+ /**
+ * The `appVersion` is defined after system startup but before joining.
+ * The `appVersion` is defined via the `SetAppVersion` message.
+ * Subsequent `JoinTo` will be deferred until after `SetAppVersion` has been
+ * received.
+ */
+ case object SetAppVersionLater
+
+ /**
+ * Command to set the `appVersion` after system startup but before joining.
+ */
+ final case class SetAppVersion(appVersion: Version)
}
/**
@@ -400,6 +415,8 @@ private[cluster] class ClusterCoreDaemon(publisher:
ActorRef, joinConfigCompatCh
}
var exitingConfirmed = Set.empty[UniqueAddress]
+ var laterAppVersion: Option[Promise[Version]] = None
+
/**
* Looks up and returns the remote cluster command connection for the
specific address.
*/
@@ -489,6 +506,10 @@ private[cluster] class ClusterCoreDaemon(publisher:
ActorRef, joinConfigCompatCh
case JoinSeedNodes(newSeedNodes) =>
resetJoinSeedNodesDeadline()
joinSeedNodes(newSeedNodes)
+ case ClusterUserAction.SetAppVersionLater =>
+ setAppVersionLater()
+ case ClusterUserAction.SetAppVersion(version) =>
+ setAppVersion(version)
case msg: SubscriptionMessage =>
publisher.forward(msg)
case Welcome(from, gossip) =>
@@ -512,6 +533,10 @@ private[cluster] class ClusterCoreDaemon(publisher:
ActorRef, joinConfigCompatCh
resetJoinSeedNodesDeadline()
becomeUninitialized()
joinSeedNodes(newSeedNodes)
+ case ClusterUserAction.SetAppVersionLater =>
+ setAppVersionLater()
+ case ClusterUserAction.SetAppVersion(version) =>
+ setAppVersion(version)
case msg: SubscriptionMessage => publisher.forward(msg)
case _: Tick =>
if (joinSeedNodesDeadline.exists(_.isOverdue()))
@@ -531,6 +556,20 @@ private[cluster] class ClusterCoreDaemon(publisher:
ActorRef, joinConfigCompatCh
}
}
+ private def setAppVersionLater(): Unit = {
+ laterAppVersion match {
+ case Some(_) => // already set, ignore duplicate
+ case None => laterAppVersion = Some(Promise())
+ }
+ }
+
+ private def setAppVersion(version: Version): Unit = {
+ laterAppVersion match {
+ case Some(promise) => promise.trySuccess(version)
+ case None => laterAppVersion = Some(Promise().success(version))
+ }
+ }
+
private def joinSeedNodesWasUnsuccessful(): Unit = {
logWarning(
"Joining of seed-nodes [{}] was unsuccessful after configured " +
@@ -588,6 +627,10 @@ private[cluster] class ClusterCoreDaemon(publisher:
ActorRef, joinConfigCompatCh
logInfo("Trying to join [{}] when already part of a cluster,
ignoring", address)
case JoinSeedNodes(nodes) =>
logInfo("Trying to join seed nodes [{}] when already part of a
cluster, ignoring", nodes.mkString(", "))
+ case ClusterUserAction.SetAppVersionLater =>
+ logInfo("Trying to set appVersion later when already part of a
cluster, ignoring")
+ case ClusterUserAction.SetAppVersion(version) =>
+ logInfo("Trying to set appVersion [{}] when already part of a cluster,
ignoring", version)
case ExitingConfirmed(address) => receiveExitingConfirmed(address)
}: Actor.Receive).orElse(receiveExitingCompleted)
@@ -746,17 +789,46 @@ private[cluster] class ClusterCoreDaemon(publisher:
ActorRef, joinConfigCompatCh
// to support manual join when joining to seed nodes is stuck (no seed
nodes available)
stopSeedNodeProcess()
- if (address == selfAddress) {
- becomeInitialized()
- joining(selfUniqueAddress, cluster.selfRoles,
cluster.settings.AppVersion)
- } else {
- val joinDeadline = RetryUnsuccessfulJoinAfter match {
- case d: FiniteDuration => Some(Deadline.now + d)
- case _ => None
+ val appVersionOpt = laterAppVersion match {
+ case None =>
+ logDebug("Using appVersion [{}] from config.",
cluster.settings.AppVersion)
+ Some(cluster.settings.AppVersion)
+ case Some(promise) =>
+ promise.future.value match {
+ case Some(Success(version)) =>
+ logDebug("Using appVersion [{}] from completed setAppVersion
Future.", version)
+ Some(version)
+ case Some(Failure(exc)) =>
+ logError("Can't join because later appVersion was completed with
failure: {}", exc)
+ None
+ case None =>
+ logDebug(
+ "appVersion from setAppVersion Future is not completed yet.
Will continue the join to " +
+ "[{}] when the appVersion Future has been completed.",
+ address)
+ import pekko.pattern.pipe
+ // easiest to just try again via JoinTo when the promise has
been completed
+ val pipeMessage = promise.future.map(_ =>
ClusterUserAction.JoinTo(address)).recover {
+ case _ => ClusterUserAction.JoinTo(address)
+ }
+ pipe(pipeMessage).to(self)
+ None
+ }
+ }
+
+ appVersionOpt.foreach { appVersion =>
+ if (address == selfAddress) {
+ becomeInitialized()
+ joining(selfUniqueAddress, cluster.selfRoles, appVersion)
+ } else {
+ val joinDeadline = RetryUnsuccessfulJoinAfter match {
+ case d: FiniteDuration => Some(Deadline.now + d)
+ case _ => None
+ }
+ context.become(tryingToJoin(address, joinDeadline))
+ logDebug("Trying to join [{}]", address)
+ clusterCore(address) ! Join(selfUniqueAddress, cluster.selfRoles,
appVersion)
}
- context.become(tryingToJoin(address, joinDeadline))
- logDebug("Trying to join [{}]", address)
- clusterCore(address) ! Join(selfUniqueAddress, cluster.selfRoles,
cluster.settings.AppVersion)
}
}
}
@@ -777,6 +849,15 @@ private[cluster] class ClusterCoreDaemon(publisher:
ActorRef, joinConfigCompatCh
* current gossip state, including the new joining member.
*/
def joining(joiningNode: UniqueAddress, roles: Set[String], appVersion:
Version): Unit = {
+ def isSelfAppVersionDefined = laterAppVersion match {
+ case None => true
+ case Some(promise) =>
+ promise.future.value match {
+ case None => false
+ case Some(v) => v.isSuccess
+ }
+ }
+
if (!preparingForShutdown) {
val selfStatus = latestGossip.member(selfUniqueAddress).status
if (!acceptedProtocols.contains(joiningNode.address.protocol))
@@ -794,6 +875,11 @@ private[cluster] class ClusterCoreDaemon(publisher:
ActorRef, joinConfigCompatCh
"Trying to join [{}] to [{}] member, ignoring. Use a member that is
Up instead.",
joiningNode,
selfStatus)
+ else if (laterAppVersion.nonEmpty && !isSelfAppVersionDefined)
+ logInfo(
+ "Trying to join [{}] but [{}] has not defined appVersion yet,
ignoring. Try again later.",
+ joiningNode,
+ selfAddress)
else {
val localMembers = latestGossip.members
@@ -829,10 +915,16 @@ private[cluster] class ClusterCoreDaemon(publisher:
ActorRef, joinConfigCompatCh
// add joining node as Joining
// add self in case someone else joins before self has joined (Set
discards duplicates)
+ val selfAppVersion = laterAppVersion match {
+ case None => cluster.settings.AppVersion
+ case Some(promise) =>
+ // promise is known to be completed, checked above
+ promise.future.value.get.get
+ }
val newMembers = localMembers + Member(joiningNode, roles,
appVersion) + Member(
selfUniqueAddress,
cluster.selfRoles,
- cluster.settings.AppVersion)
+ selfAppVersion)
val newGossip = latestGossip.copy(members = newMembers)
updateLatestGossip(newGossip)
diff --git
a/cluster/src/multi-jvm/scala/org/apache/pekko/cluster/AppVersionSpec.scala
b/cluster/src/multi-jvm/scala/org/apache/pekko/cluster/AppVersionSpec.scala
new file mode 100644
index 0000000000..67da1ba8e6
--- /dev/null
+++ b/cluster/src/multi-jvm/scala/org/apache/pekko/cluster/AppVersionSpec.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2009-2023 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.cluster
+
+import scala.concurrent.Promise
+import scala.concurrent.duration._
+
+import org.apache.pekko.remote.testkit.MultiNodeConfig
+import org.apache.pekko.util.Version
+
+object AppVersionMultiJvmSpec extends MultiNodeConfig {
+ val first = role("first")
+ val second = role("second")
+
+ commonConfig(debugConfig(on =
false).withFallback(MultiNodeClusterSpec.clusterConfigWithFailureDetectorPuppet))
+}
+
+class AppVersionMultiJvmNode1 extends AppVersionSpec
+class AppVersionMultiJvmNode2 extends AppVersionSpec
+
+abstract class AppVersionSpec extends
MultiNodeClusterSpec(AppVersionMultiJvmSpec) {
+
+ import AppVersionMultiJvmSpec._
+
+ "Later appVersion" must {
+ "be used when joining" in {
+ val laterVersion = Promise[Version]()
+ cluster.setAppVersionLater(laterVersion.future)
+ // ok to try to join immediately
+ runOn(first) {
+ cluster.join(first)
+ // not joining until laterVersion has been completed
+ val until = Deadline.now + 300.milliseconds
+ while (!until.isOverdue()) {
+ cluster.selfMember.status should ===(MemberStatus.Removed)
+ Thread.sleep(50)
+ }
+ laterVersion.trySuccess(Version("2"))
+ awaitAssert {
+ cluster.selfMember.status should ===(MemberStatus.Up)
+ cluster.selfMember.appVersion should ===(Version("2"))
+ }
+ }
+ enterBarrier("first-joined")
+
+ runOn(second) {
+ cluster.joinSeedNodes(List(address(first), address(second)))
+ // not joining until laterVersion has been completed
+ val until = Deadline.now + 300.milliseconds
+ while (!until.isOverdue()) {
+ cluster.selfMember.status should ===(MemberStatus.Removed)
+ Thread.sleep(50)
+ }
+ laterVersion.trySuccess(Version("3"))
+ awaitAssert {
+ cluster.selfMember.status should ===(MemberStatus.Up)
+ cluster.selfMember.appVersion should ===(Version("3"))
+ }
+ }
+ enterBarrier("second-joined")
+
+ cluster.state.members.find(_.address == address(first)).get.appVersion
should ===(Version("2"))
+ cluster.state.members.find(_.address == address(second)).get.appVersion
should ===(Version("3"))
+
+ enterBarrier("after-1")
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]