This is an automated email from the ASF dual-hosted git repository.

pjfanning pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git


The following commit(s) were added to refs/heads/main by this push:
     new f2eb52af1d feat: Make it possible to define appVersion later (#2947)
f2eb52af1d is described below

commit f2eb52af1dfa29762d5b34964b1bca2cb663f060
Author: PJ Fanning <[email protected]>
AuthorDate: Sat May 9 09:37:41 2026 +0100

    feat: Make it possible to define appVersion later (#2947)
    
    * Port akka-core#31934: Make it possible to define appVersion later
    
    Motivation:
    Alternative to config app-version that is defined before ActorSystem 
startup.
    If the version is read from an external system (e.g. Kubernetes) this 
feature
    makes it possible to define it after system startup but before joining.
    
    Modification:
    - Added SetAppVersionLater/SetAppVersion messages to ClusterUserAction
    - Added laterAppVersion state and setAppVersionLater/setAppVersion private
      methods to ClusterCoreDaemon
    - Updated uninitialized, tryingToJoin, initialized receive handlers
    - Updated join() to defer when appVersion not yet available
    - Updated joining() to handle laterAppVersion for self
    - Added setAppVersionLater Scala + Java API to Cluster.scala
    - Added SetAppVersionLater case class + Java factory to 
cluster-typed/Cluster.scala
    - Added SetAppVersionLater handling in AdaptedClusterImpl.managerBehavior
    - Added AppVersionSpec multi-JVM test
    
    Result:
    Users can now supply appVersion from an external source (e.g. Kubernetes) 
after
    ActorSystem startup but before joining the cluster.
    
    Tests:
    - scalafmt: not installed, skipped
    - sbt: not installed, skipped
    
    References:
    Refs https://github.com/akka/akka-core/pull/31934
    
    Agent-Logs-Url: 
https://github.com/pjfanning/incubator-pekko/sessions/27cc73f3-2831-4301-8cd0-af3116012209
    
    Co-authored-by: pjfanning <[email protected]>
    
    * Address review feedback: fix double space in comment and improve test 
timing
    
    Agent-Logs-Url: 
https://github.com/pjfanning/incubator-pekko/sessions/27cc73f3-2831-4301-8cd0-af3116012209
    
    Co-authored-by: pjfanning <[email protected]>
    
    * scalafmt
    
    ---------
    
    Co-authored-by: copilot-swe-agent[bot] 
<[email protected]>
    Co-authored-by: pjfanning <[email protected]>
---
 .../org/apache/pekko/cluster/typed/Cluster.scala   |  28 +++++
 .../typed/internal/AdaptedClusterImpl.scala        |   4 +
 .../scala/org/apache/pekko/cluster/Cluster.scala   |  36 +++++++
 .../org/apache/pekko/cluster/ClusterDaemon.scala   | 114 +++++++++++++++++++--
 .../org/apache/pekko/cluster/AppVersionSpec.scala  |  79 ++++++++++++++
 5 files changed, 250 insertions(+), 11 deletions(-)

diff --git 
a/cluster-typed/src/main/scala/org/apache/pekko/cluster/typed/Cluster.scala 
b/cluster-typed/src/main/scala/org/apache/pekko/cluster/typed/Cluster.scala
index e2fdf08a76..2bba600929 100644
--- a/cluster-typed/src/main/scala/org/apache/pekko/cluster/typed/Cluster.scala
+++ b/cluster-typed/src/main/scala/org/apache/pekko/cluster/typed/Cluster.scala
@@ -13,7 +13,10 @@
 
 package org.apache.pekko.cluster.typed
 
+import java.util.concurrent.CompletionStage
+
 import scala.collection.immutable
+import scala.concurrent.Future
 
 import org.apache.pekko
 import pekko.actor.Address
@@ -24,6 +27,7 @@ import pekko.cluster._
 import pekko.cluster.ClusterEvent.{ ClusterDomainEvent, CurrentClusterState }
 import pekko.cluster.typed.internal.AdapterClusterImpl
 import pekko.japi.Util
+import pekko.util.Version
 
 /**
  * Messages for subscribing to changes in the cluster state
@@ -128,6 +132,30 @@ final case class JoinSeedNodes(seedNodes: 
immutable.Seq[Address]) extends Cluste
 
 }
 
+/**
+ * Scala API: If the `appVersion` is read from an external system (e.g. 
Kubernetes) it can be defined after
+ * system startup but before joining by completing the `appVersion` `Future`. 
In that case,
+ * `SetAppVersionLater` should be sent before [[Join]] or [[JoinSeedNodes]]. 
It's fine to send
+ * `Join` or `JoinSeedNodes` immediately afterwards (before the `Future` is 
completed). The join will
+ * then wait for the `appVersion` to be completed.
+ */
+final case class SetAppVersionLater(appVersion: Future[Version]) extends 
ClusterCommand
+
+object SetAppVersionLater {
+
+  /**
+   * Java API: If the `appVersion` is read from an external system (e.g. 
Kubernetes) it can be defined after
+   * system startup but before joining by completing the `appVersion` 
`CompletionStage`. In that case,
+   * `SetAppVersionLater` should be sent before [[Join]] or [[JoinSeedNodes]]. 
It's fine to send
+   * `Join` or `JoinSeedNodes` immediately afterwards (before the 
`CompletionStage` is completed). The join will
+   * then wait for the `appVersion` to be completed.
+   */
+  def create(appVersion: CompletionStage[Version]): SetAppVersionLater = {
+    import scala.jdk.FutureConverters._
+    SetAppVersionLater(appVersion.asScala)
+  }
+}
+
 /**
  * Send command to issue state transition to LEAVING for the node specified by 
'address'.
  * The member will go through the status changes [[MemberStatus]] `Leaving` 
(not published to
diff --git 
a/cluster-typed/src/main/scala/org/apache/pekko/cluster/typed/internal/AdaptedClusterImpl.scala
 
b/cluster-typed/src/main/scala/org/apache/pekko/cluster/typed/internal/AdaptedClusterImpl.scala
index e0eefc354c..2f4dd46874 100644
--- 
a/cluster-typed/src/main/scala/org/apache/pekko/cluster/typed/internal/AdaptedClusterImpl.scala
+++ 
b/cluster-typed/src/main/scala/org/apache/pekko/cluster/typed/internal/AdaptedClusterImpl.scala
@@ -142,6 +142,10 @@ private[pekko] object AdapterClusterImpl {
         adaptedCluster.joinSeedNodes(addresses)
         Behaviors.same
 
+      case SetAppVersionLater(version) =>
+        adaptedCluster.setAppVersionLater(version)
+        Behaviors.same
+
       case PrepareForFullClusterShutdown =>
         adaptedCluster.prepareForFullClusterShutdown()
         Behaviors.same
diff --git a/cluster/src/main/scala/org/apache/pekko/cluster/Cluster.scala 
b/cluster/src/main/scala/org/apache/pekko/cluster/Cluster.scala
index b778252232..40ccac5711 100644
--- a/cluster/src/main/scala/org/apache/pekko/cluster/Cluster.scala
+++ b/cluster/src/main/scala/org/apache/pekko/cluster/Cluster.scala
@@ -14,13 +14,17 @@
 package org.apache.pekko.cluster
 
 import java.io.Closeable
+import java.util.concurrent.CompletionStage
 import java.util.concurrent.ThreadFactory
 import java.util.concurrent.atomic.AtomicBoolean
 
 import scala.annotation.varargs
 import scala.collection.immutable
 import scala.concurrent.{ Await, ExecutionContext }
+import scala.concurrent.Future
 import scala.concurrent.duration._
+import scala.util.Failure
+import scala.util.Success
 import scala.util.control.NonFatal
 
 import org.apache.pekko
@@ -36,6 +40,7 @@ import pekko.event.MarkerLoggingAdapter
 import pekko.japi.Util
 import pekko.pattern._
 import pekko.remote.{ UniqueAddress => _, _ }
+import pekko.util.Version
 
 import com.typesafe.config.{ Config, ConfigFactory }
 
@@ -369,6 +374,37 @@ class Cluster(val system: ExtendedActorSystem) extends 
Extension {
   def joinSeedNodes(seedNodes: java.util.List[Address]): Unit =
     joinSeedNodes(Util.immutableSeq(seedNodes))
 
+  /**
+   * Scala API: If the `appVersion` is read from an external system (e.g. 
Kubernetes) it can be defined after
+   * system startup but before joining by completing the `appVersion` 
`Future`. In that case, `setAppVersionLater`
+   * should be called before calling `join` or `joinSeedNodes`. It's fine to 
call `join` or `joinSeedNodes`
+   * immediately afterwards (before the `Future` is completed). The join will 
then wait for the `appVersion`
+   * to be completed.
+   */
+  def setAppVersionLater(appVersion: Future[Version]): Unit = {
+    clusterCore ! ClusterUserAction.SetAppVersionLater
+    import system.dispatcher
+    appVersion.onComplete {
+      case Success(version) =>
+        clusterCore ! ClusterUserAction.SetAppVersion(version)
+      case Failure(exc) =>
+        logWarning("Later appVersion failed. Fallback to configured appVersion 
[{}]. {}", settings.AppVersion, exc)
+        clusterCore ! ClusterUserAction.SetAppVersion(settings.AppVersion)
+    }
+  }
+
+  /**
+   * Java API: If the `appVersion` is read from an external system (e.g. 
Kubernetes) it can be defined after
+   * system startup but before joining by completing the `appVersion` 
`CompletionStage`. In that case,
+   * `setAppVersionLater` should be called before calling `join` or 
`joinSeedNodes`. It's fine to call
+   * `join` or `joinSeedNodes` immediately afterwards (before the 
`CompletionStage` is completed). The join will
+   * then wait for the `appVersion` to be completed.
+   */
+  def setAppVersionLater(appVersion: CompletionStage[Version]): Unit = {
+    import scala.jdk.FutureConverters._
+    setAppVersionLater(appVersion.asScala)
+  }
+
   /**
    * Send command to issue state transition to LEAVING for the node specified 
by 'address'.
    * The member will go through the status changes [[MemberStatus]] `Leaving` 
(not published to
diff --git 
a/cluster/src/main/scala/org/apache/pekko/cluster/ClusterDaemon.scala 
b/cluster/src/main/scala/org/apache/pekko/cluster/ClusterDaemon.scala
index 1b085cafa6..3bc92fde56 100644
--- a/cluster/src/main/scala/org/apache/pekko/cluster/ClusterDaemon.scala
+++ b/cluster/src/main/scala/org/apache/pekko/cluster/ClusterDaemon.scala
@@ -18,6 +18,8 @@ import scala.collection.immutable
 import scala.concurrent.Future
 import scala.concurrent.Promise
 import scala.concurrent.duration._
+import scala.util.Failure
+import scala.util.Success
 import scala.util.control.NonFatal
 
 import org.apache.pekko
@@ -74,6 +76,19 @@ private[cluster] object ClusterUserAction {
    */
   @SerialVersionUID(1L)
   case object PrepareForShutdown extends ClusterMessage
+
+  /**
+   * The `appVersion` is defined after system startup but before joining.
+   * The `appVersion` is defined via the `SetAppVersion` message.
+   * Subsequent `JoinTo` will be deferred until after `SetAppVersion` has been
+   * received.
+   */
+  case object SetAppVersionLater
+
+  /**
+   * Command to set the `appVersion` after system startup but before joining.
+   */
+  final case class SetAppVersion(appVersion: Version)
 }
 
 /**
@@ -400,6 +415,8 @@ private[cluster] class ClusterCoreDaemon(publisher: 
ActorRef, joinConfigCompatCh
   }
   var exitingConfirmed = Set.empty[UniqueAddress]
 
+  var laterAppVersion: Option[Promise[Version]] = None
+
   /**
    * Looks up and returns the remote cluster command connection for the 
specific address.
    */
@@ -489,6 +506,10 @@ private[cluster] class ClusterCoreDaemon(publisher: 
ActorRef, joinConfigCompatCh
       case JoinSeedNodes(newSeedNodes) =>
         resetJoinSeedNodesDeadline()
         joinSeedNodes(newSeedNodes)
+      case ClusterUserAction.SetAppVersionLater =>
+        setAppVersionLater()
+      case ClusterUserAction.SetAppVersion(version) =>
+        setAppVersion(version)
       case msg: SubscriptionMessage =>
         publisher.forward(msg)
       case Welcome(from, gossip) =>
@@ -512,6 +533,10 @@ private[cluster] class ClusterCoreDaemon(publisher: 
ActorRef, joinConfigCompatCh
         resetJoinSeedNodesDeadline()
         becomeUninitialized()
         joinSeedNodes(newSeedNodes)
+      case ClusterUserAction.SetAppVersionLater =>
+        setAppVersionLater()
+      case ClusterUserAction.SetAppVersion(version) =>
+        setAppVersion(version)
       case msg: SubscriptionMessage => publisher.forward(msg)
       case _: Tick                  =>
         if (joinSeedNodesDeadline.exists(_.isOverdue()))
@@ -531,6 +556,20 @@ private[cluster] class ClusterCoreDaemon(publisher: 
ActorRef, joinConfigCompatCh
     }
   }
 
+  private def setAppVersionLater(): Unit = {
+    laterAppVersion match {
+      case Some(_) => // already set, ignore duplicate
+      case None    => laterAppVersion = Some(Promise())
+    }
+  }
+
+  private def setAppVersion(version: Version): Unit = {
+    laterAppVersion match {
+      case Some(promise) => promise.trySuccess(version)
+      case None          => laterAppVersion = Some(Promise().success(version))
+    }
+  }
+
   private def joinSeedNodesWasUnsuccessful(): Unit = {
     logWarning(
       "Joining of seed-nodes [{}] was unsuccessful after configured " +
@@ -588,6 +627,10 @@ private[cluster] class ClusterCoreDaemon(publisher: 
ActorRef, joinConfigCompatCh
         logInfo("Trying to join [{}] when already part of a cluster, 
ignoring", address)
       case JoinSeedNodes(nodes) =>
         logInfo("Trying to join seed nodes [{}] when already part of a 
cluster, ignoring", nodes.mkString(", "))
+      case ClusterUserAction.SetAppVersionLater =>
+        logInfo("Trying to set appVersion later when already part of a 
cluster, ignoring")
+      case ClusterUserAction.SetAppVersion(version) =>
+        logInfo("Trying to set appVersion [{}] when already part of a cluster, 
ignoring", version)
       case ExitingConfirmed(address) => receiveExitingConfirmed(address)
     }: Actor.Receive).orElse(receiveExitingCompleted)
 
@@ -746,17 +789,46 @@ private[cluster] class ClusterCoreDaemon(publisher: 
ActorRef, joinConfigCompatCh
       // to support manual join when joining to seed nodes is stuck (no seed 
nodes available)
       stopSeedNodeProcess()
 
-      if (address == selfAddress) {
-        becomeInitialized()
-        joining(selfUniqueAddress, cluster.selfRoles, 
cluster.settings.AppVersion)
-      } else {
-        val joinDeadline = RetryUnsuccessfulJoinAfter match {
-          case d: FiniteDuration => Some(Deadline.now + d)
-          case _                 => None
+      val appVersionOpt = laterAppVersion match {
+        case None =>
+          logDebug("Using appVersion [{}] from config.", 
cluster.settings.AppVersion)
+          Some(cluster.settings.AppVersion)
+        case Some(promise) =>
+          promise.future.value match {
+            case Some(Success(version)) =>
+              logDebug("Using appVersion [{}] from completed setAppVersion 
Future.", version)
+              Some(version)
+            case Some(Failure(exc)) =>
+              logError("Can't join because later appVersion was completed with 
failure: {}", exc)
+              None
+            case None =>
+              logDebug(
+                "appVersion from setAppVersion Future is not completed yet. 
Will continue the join to " +
+                "[{}] when the appVersion Future has been completed.",
+                address)
+              import pekko.pattern.pipe
+              // easiest to just try again via JoinTo when the promise has 
been completed
+              val pipeMessage = promise.future.map(_ => 
ClusterUserAction.JoinTo(address)).recover {
+                case _ => ClusterUserAction.JoinTo(address)
+              }
+              pipe(pipeMessage).to(self)
+              None
+          }
+      }
+
+      appVersionOpt.foreach { appVersion =>
+        if (address == selfAddress) {
+          becomeInitialized()
+          joining(selfUniqueAddress, cluster.selfRoles, appVersion)
+        } else {
+          val joinDeadline = RetryUnsuccessfulJoinAfter match {
+            case d: FiniteDuration => Some(Deadline.now + d)
+            case _                 => None
+          }
+          context.become(tryingToJoin(address, joinDeadline))
+          logDebug("Trying to join [{}]", address)
+          clusterCore(address) ! Join(selfUniqueAddress, cluster.selfRoles, 
appVersion)
         }
-        context.become(tryingToJoin(address, joinDeadline))
-        logDebug("Trying to join [{}]", address)
-        clusterCore(address) ! Join(selfUniqueAddress, cluster.selfRoles, 
cluster.settings.AppVersion)
       }
     }
   }
@@ -777,6 +849,15 @@ private[cluster] class ClusterCoreDaemon(publisher: 
ActorRef, joinConfigCompatCh
    * current gossip state, including the new joining member.
    */
   def joining(joiningNode: UniqueAddress, roles: Set[String], appVersion: 
Version): Unit = {
+    def isSelfAppVersionDefined = laterAppVersion match {
+      case None          => true
+      case Some(promise) =>
+        promise.future.value match {
+          case None    => false
+          case Some(v) => v.isSuccess
+        }
+    }
+
     if (!preparingForShutdown) {
       val selfStatus = latestGossip.member(selfUniqueAddress).status
       if (!acceptedProtocols.contains(joiningNode.address.protocol))
@@ -794,6 +875,11 @@ private[cluster] class ClusterCoreDaemon(publisher: 
ActorRef, joinConfigCompatCh
           "Trying to join [{}] to [{}] member, ignoring. Use a member that is 
Up instead.",
           joiningNode,
           selfStatus)
+      else if (laterAppVersion.nonEmpty && !isSelfAppVersionDefined)
+        logInfo(
+          "Trying to join [{}] but [{}] has not defined appVersion yet, 
ignoring. Try again later.",
+          joiningNode,
+          selfAddress)
       else {
         val localMembers = latestGossip.members
 
@@ -829,10 +915,16 @@ private[cluster] class ClusterCoreDaemon(publisher: 
ActorRef, joinConfigCompatCh
 
             // add joining node as Joining
             // add self in case someone else joins before self has joined (Set 
discards duplicates)
+            val selfAppVersion = laterAppVersion match {
+              case None          => cluster.settings.AppVersion
+              case Some(promise) =>
+                // promise is known to be completed, checked above
+                promise.future.value.get.get
+            }
             val newMembers = localMembers + Member(joiningNode, roles, 
appVersion) + Member(
               selfUniqueAddress,
               cluster.selfRoles,
-              cluster.settings.AppVersion)
+              selfAppVersion)
             val newGossip = latestGossip.copy(members = newMembers)
 
             updateLatestGossip(newGossip)
diff --git 
a/cluster/src/multi-jvm/scala/org/apache/pekko/cluster/AppVersionSpec.scala 
b/cluster/src/multi-jvm/scala/org/apache/pekko/cluster/AppVersionSpec.scala
new file mode 100644
index 0000000000..67da1ba8e6
--- /dev/null
+++ b/cluster/src/multi-jvm/scala/org/apache/pekko/cluster/AppVersionSpec.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2009-2023 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.cluster
+
+import scala.concurrent.Promise
+import scala.concurrent.duration._
+
+import org.apache.pekko.remote.testkit.MultiNodeConfig
+import org.apache.pekko.util.Version
+
+object AppVersionMultiJvmSpec extends MultiNodeConfig {
+  val first = role("first")
+  val second = role("second")
+
+  commonConfig(debugConfig(on = 
false).withFallback(MultiNodeClusterSpec.clusterConfigWithFailureDetectorPuppet))
+}
+
+class AppVersionMultiJvmNode1 extends AppVersionSpec
+class AppVersionMultiJvmNode2 extends AppVersionSpec
+
+abstract class AppVersionSpec extends 
MultiNodeClusterSpec(AppVersionMultiJvmSpec) {
+
+  import AppVersionMultiJvmSpec._
+
+  "Later appVersion" must {
+    "be used when joining" in {
+      val laterVersion = Promise[Version]()
+      cluster.setAppVersionLater(laterVersion.future)
+      // ok to try to join immediately
+      runOn(first) {
+        cluster.join(first)
+        // not joining until laterVersion has been completed
+        val until = Deadline.now + 300.milliseconds
+        while (!until.isOverdue()) {
+          cluster.selfMember.status should ===(MemberStatus.Removed)
+          Thread.sleep(50)
+        }
+        laterVersion.trySuccess(Version("2"))
+        awaitAssert {
+          cluster.selfMember.status should ===(MemberStatus.Up)
+          cluster.selfMember.appVersion should ===(Version("2"))
+        }
+      }
+      enterBarrier("first-joined")
+
+      runOn(second) {
+        cluster.joinSeedNodes(List(address(first), address(second)))
+        // not joining until laterVersion has been completed
+        val until = Deadline.now + 300.milliseconds
+        while (!until.isOverdue()) {
+          cluster.selfMember.status should ===(MemberStatus.Removed)
+          Thread.sleep(50)
+        }
+        laterVersion.trySuccess(Version("3"))
+        awaitAssert {
+          cluster.selfMember.status should ===(MemberStatus.Up)
+          cluster.selfMember.appVersion should ===(Version("3"))
+        }
+      }
+      enterBarrier("second-joined")
+
+      cluster.state.members.find(_.address == address(first)).get.appVersion 
should ===(Version("2"))
+      cluster.state.members.find(_.address == address(second)).get.appVersion 
should ===(Version("3"))
+
+      enterBarrier("after-1")
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to