This is an automated email from the ASF dual-hosted git repository.
pjfanning pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-management.git
The following commit(s) were added to refs/heads/main by this push:
new 0d4a9c39 Port akka-management PR #1143: AppVersionRevision extension
for Kubernetes revision-based app versioning (#784)
0d4a9c39 is described below
commit 0d4a9c39d912834e61d35a9aed5aeb3d47246c63
Author: PJ Fanning <[email protected]>
AuthorDate: Thu May 14 21:38:03 2026 +0100
Port akka-management PR #1143: AppVersionRevision extension for Kubernetes
revision-based app versioning (#784)
* Port akka-management PR #1143: add AppVersionRevision extension and
related changes
Agent-Logs-Url:
https://github.com/pjfanning/incubator-pekko-management/sessions/ede30481-3334-486d-a699-64b8020b28f4
Co-authored-by: pjfanning <[email protected]>
* Fix spelling: 'nods' -> 'nodes' in app-version-revision-kubernetes-test.sh
Agent-Logs-Url:
https://github.com/pjfanning/incubator-pekko-management/sessions/ede30481-3334-486d-a699-64b8020b28f4
Co-authored-by: pjfanning <[email protected]>
* move file
* scalafmt
* don't yet run int tests as part of PR
---------
Co-authored-by: copilot-swe-agent[bot]
<[email protected]>
Co-authored-by: pjfanning <[email protected]>
---
... => integration-tests-app-version-revision.yml} | 21 +-
.../integration-tests-rolling-update-cr.yml | 1 -
docs/src/main/paradox/rolling-updates.md | 67 ++++++
.../pekko-cluster-app-value-revision.yml | 85 ++++++++
.../cluster/bootstrap/PodDeletionCostDemoApp.scala | 4 +
.../test-app-version-revision.sh | 10 +
.../app-version-revision-kubernetes-test.sh | 73 +++++++
.../cluster-bootstrap-internal.backwards.excludes | 23 ++
.../cluster/bootstrap/ClusterBootstrap.scala | 16 +-
.../kubernetes/AppVersionRevision.scala | 87 ++++++++
.../rollingupdate/kubernetes/KubernetesApi.scala | 17 ++
.../kubernetes/KubernetesApiImpl.scala | 100 +++++++++
.../kubernetes/KubernetesJsonSupport.scala | 44 ++++
.../kubernetes/AppVersionRevisionCompileOnly.java | 30 +++
.../AppVersionRevisionJavaCompileTest.java | 28 +++
.../kubernetes/AppVersionRevisionCompileOnly.scala | 29 +++
.../kubernetes/AppVersionRevisionSpec.scala | 242 +++++++++++++++++++++
.../PodDeletionCostAnnotatorCrSpec.scala | 2 +
18 files changed, 850 insertions(+), 29 deletions(-)
diff --git a/.github/workflows/integration-tests-rolling-update-cr.yml
b/.github/workflows/integration-tests-app-version-revision.yml
similarity index 72%
copy from .github/workflows/integration-tests-rolling-update-cr.yml
copy to .github/workflows/integration-tests-app-version-revision.yml
index 5b94a543..dc7e2804 100644
--- a/.github/workflows/integration-tests-rolling-update-cr.yml
+++ b/.github/workflows/integration-tests-app-version-revision.yml
@@ -1,10 +1,9 @@
-name: Integration test for Rolling Update CR Kubernetes
+name: Integration Tests for AppVersionRevision
on:
push:
branches:
- main
- - release-*
workflow_dispatch:
permissions:
@@ -12,7 +11,7 @@ permissions:
jobs:
integration-test:
- name: Integration Tests for Rolling Update CR Kubernetes
+ name: Integration Tests for AppVersionRevision
runs-on: ubuntu-22.04
if: github.repository == 'apache/pekko-management'
strategy:
@@ -29,15 +28,15 @@ jobs:
git fetch origin pull/${{ github.event.pull_request.number
}}/merge:scratch
git checkout scratch
- - name: Install sbt
- uses: sbt/setup-sbt@508b753e53cb6095967669e0911487d2b9bc9f41 # v1.1.22
-
- name: Setup Java 17
uses: actions/setup-java@v5
with:
distribution: temurin
java-version: 17
+ - name: Install sbt
+ uses: sbt/setup-sbt@508b753e53cb6095967669e0911487d2b9bc9f41 # v1.1.22
+
- name: Cache Coursier cache
uses: coursier/cache-action@90c37294538be80a558fd665531fcdc2b467b475 #
v8.1.0
@@ -53,15 +52,7 @@ jobs:
- name: Run Integration Tests
timeout-minutes: 15
run: |-
- echo 'Creating namespace'
- kubectl create namespace rolling
- echo 'Creating resources'
- kubectl apply -f ./rolling-update-kubernetes/pod-cost.yml
- echo 'Adding proxy port'
- kubectl proxy --port=8080 &
- echo 'Running tests'
- sbt "rolling-update-kubernetes-int-test/test"
- ./integration-test/rolling-update-kubernetes/test-cr.sh
+
./integration-test/rolling-update-kubernetes/test-app-version-revision.sh
- name: Print logs on failure
if: ${{ failure() }}
diff --git a/.github/workflows/integration-tests-rolling-update-cr.yml
b/.github/workflows/integration-tests-rolling-update-cr.yml
index 5b94a543..271ec0ac 100644
--- a/.github/workflows/integration-tests-rolling-update-cr.yml
+++ b/.github/workflows/integration-tests-rolling-update-cr.yml
@@ -4,7 +4,6 @@ on:
push:
branches:
- main
- - release-*
workflow_dispatch:
permissions:
diff --git a/docs/src/main/paradox/rolling-updates.md
b/docs/src/main/paradox/rolling-updates.md
index 84195c9d..e2206a27 100644
--- a/docs/src/main/paradox/rolling-updates.md
+++ b/docs/src/main/paradox/rolling-updates.md
@@ -129,3 +129,70 @@ that gives the default service user this role in `<YOUR
NAMESPACE>`.
This RBAC example covers only the permissions needed for this
`PodDeletionCost` extension specifically. However, usually you'll also be using
@ref:[Kubernetes API](bootstrap/kubernetes-api.md) for discovery and bootstrap
of your cluster, so you'll need to combine this with any other role required
already configured, either by keeping them separately or merging them into a
single role.
@@@
+
+## app-version from Deployment
+
+When using Cluster Sharding, it is
[recommended](https://pekko.apache.org/docs/pekko/current/additional/rolling-updates.html#cluster-sharding)
for rolling updates that you define an increasing `pekko.cluster.app-version`
configuration property for each roll out.
+
+This works well unless you use `kubectl rollout undo` which deploys the
previous ReplicaSet configuration which contains the previous value for that
config.
+
+To fix this, you can use `AppVersionRevision` to read the current annotation
`deployment.kubernetes.io/revision` (part of the ReplicaSet) from the
Kubernetes Deployment via the Kubernetes API which always increases, also
during a rollback:
+
+### Using
+
+The AppVersionRevision extension must be started, this can either be done
through config or programmatically.
+
+**Through config**
+
+Listing the `AppVersionRevision` extension among the autoloaded
`pekko.extensions` in `application.conf` will also cause it to autostart:
+
+```
+pekko.extensions =
["org.apache.pekko.rollingupdate.kubernetes.AppVersionRevision"]
+```
+
+If the extension configuration is incorrect, the autostart will log an error
and terminate the actor system.
+
+**Programmatically**
+
+Scala
+: @@snip
[AppVersionRevisionCompileOnly.scala](/rolling-update-kubernetes/src/test/scala/doc/pekko/rollingupdate/kubernetes/AppVersionRevisionCompileOnly.scala)
{ #start }
+
+Java
+: @@snip
[AppVersionRevisionCompileOnly.java](/rolling-update-kubernetes/src/test/java/jdoc/pekko/rollingupdate/kubernetes/AppVersionRevisionCompileOnly.java)
{ #start }
+
+#### Configuration
+
+The following configuration is required, more details for each and additional
configurations can be found in
[reference.conf](https://github.com/apache/pekko-management/blob/main/rolling-update-kubernetes/src/main/resources/reference.conf):
+
+* `pekko.rollingupdate.kubernetes.pod-name`: this can be provided by setting
`KUBERNETES_POD_NAME` environment variable to `metadata.name` on the Kubernetes
container spec.
+
+Additionally, the pod annotator needs to know which namespace the pod belongs
to. By default, this will be detected by reading the namespace
+from the service account secret, in
`/var/run/secrets/kubernetes.io/serviceaccount/namespace`, but can be
overridden by
+setting `pekko.rollingupdate.kubernetes.namespace` or by providing
`KUBERNETES_NAMESPACE` environment variable.
+
+#### Role based access control
+
+Make sure to provide access to the corresponding RBAC rules `apiGroups` and
`resources` like this:
+
+```yaml
+kind: Role
+apiVersion: rbac.authorization.k8s.io/v1
+metadata:
+ name: pod-reader
+rules:
+- apiGroups: ["apps", ""]
+ resources: ["pods", "replicasets"]
+ verbs: ["get", "list"]
+---
+kind: RoleBinding
+apiVersion: rbac.authorization.k8s.io/v1
+metadata:
+ name: pod-reader
+subjects:
+- kind: ServiceAccount
+ name: default
+roleRef:
+ kind: Role
+ name: pod-reader
+ apiGroup: rbac.authorization.k8s.io
+```
diff --git
a/integration-test/rolling-update-kubernetes/kubernetes/pekko-cluster-app-value-revision.yml
b/integration-test/rolling-update-kubernetes/kubernetes/pekko-cluster-app-value-revision.yml
new file mode 100644
index 00000000..32511944
--- /dev/null
+++
b/integration-test/rolling-update-kubernetes/kubernetes/pekko-cluster-app-value-revision.yml
@@ -0,0 +1,85 @@
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+ labels:
+ app: pekko-rollingupdate-demo
+ name: pekko-rollingupdate-demo
+spec:
+ replicas: 3
+ selector:
+ matchLabels:
+ app: pekko-rollingupdate-demo
+ strategy:
+ rollingUpdate:
+ maxSurge: 1
+ maxUnavailable: 0
+ type: RollingUpdate
+
+ template:
+ metadata:
+ labels:
+ app: pekko-rollingupdate-demo
+ actorSystemName: pekko-rollingupdate-demo
+ spec:
+ containers:
+ - name: pekko-rollingupdate-demo
+ image: integration-test-rolling-update-kubernetes:1.3.3.7
+ # Remove for a real project, the image is picked up locally for the
integration test
+ imagePullPolicy: Never
+ #health
+ livenessProbe:
+ httpGet:
+ path: /alive
+ port: management
+ readinessProbe:
+ httpGet:
+ path: /ready
+ port: management
+ #health
+ ports:
+ # pekko-management bootstrap
+ - name: management
+ containerPort: 8558
+ protocol: TCP
+ - name: http
+ containerPort: 8080
+ protocol: TCP
+ env:
+ - name: SOME_ENV_TO_BE_CHANGED
+ value: "true"
+ - name: KUBERNETES_NAMESPACE
+ valueFrom:
+ fieldRef:
+ fieldPath: metadata.namespace
+ - name: KUBERNETES_POD_NAME
+ valueFrom:
+ fieldRef:
+ fieldPath: metadata.name
+ - name: REQUIRED_CONTACT_POINT_NR
+ value: "3"
+ - name: JAVA_TOOL_OPTIONS
+ value: "-XX:InitialRAMPercentage=75 -XX:MaxRAMPercentage=75"
+---
+kind: Role
+apiVersion: rbac.authorization.k8s.io/v1
+metadata:
+ name: pod-reader
+rules:
+- apiGroups: ["apps", ""] # additional access to api to get revision
+ resources: ["pods", "replicasets"]
+ verbs: ["get", "list"]
+---
+kind: RoleBinding
+apiVersion: rbac.authorization.k8s.io/v1
+metadata:
+ name: pod-reader
+subjects:
+ # Uses the default service account.
+ # Consider creating a dedicated service account to run your
+ # Apache Pekko Cluster services and binding the role to that one.
+- kind: ServiceAccount
+ name: default
+roleRef:
+ kind: Role
+ name: pod-reader
+ apiGroup: rbac.authorization.k8s.io
diff --git
a/integration-test/rolling-update-kubernetes/src/main/scala/org/apache/pekko/cluster/bootstrap/PodDeletionCostDemoApp.scala
b/integration-test/rolling-update-kubernetes/src/main/scala/org/apache/pekko/cluster/bootstrap/PodDeletionCostDemoApp.scala
index db8466d7..bf00800e 100644
---
a/integration-test/rolling-update-kubernetes/src/main/scala/org/apache/pekko/cluster/bootstrap/PodDeletionCostDemoApp.scala
+++
b/integration-test/rolling-update-kubernetes/src/main/scala/org/apache/pekko/cluster/bootstrap/PodDeletionCostDemoApp.scala
@@ -20,6 +20,7 @@ import pekko.http.scaladsl.Http
import pekko.http.scaladsl.server.Directives._
import pekko.management.cluster.bootstrap.ClusterBootstrap
import pekko.management.scaladsl.PekkoManagement
+import pekko.rollingupdate.kubernetes.AppVersionRevision
import pekko.rollingupdate.kubernetes.PodDeletionCost
object PodDeletionCostDemoApp extends App {
@@ -33,6 +34,9 @@ object PodDeletionCostDemoApp extends App {
PekkoManagement(system).start()
+ // preferred to be called before ClusterBootstrap
+ AppVersionRevision(system).start()
+
ClusterBootstrap(system).start()
PodDeletionCost(system).start()
diff --git
a/integration-test/rolling-update-kubernetes/test-app-version-revision.sh
b/integration-test/rolling-update-kubernetes/test-app-version-revision.sh
new file mode 100755
index 00000000..315ddba8
--- /dev/null
+++ b/integration-test/rolling-update-kubernetes/test-app-version-revision.sh
@@ -0,0 +1,10 @@
+#!/bin/bash
+
+set -exu
+
+export NAMESPACE=pekko-rollingupdate-demo-ns
+export APP_NAME=pekko-rollingupdate-demo
+export PROJECT_NAME=integration-test-rolling-update-kubernetes
+export
DEPLOYMENT=integration-test/rolling-update-kubernetes/kubernetes/pekko-cluster-app-value-revision.yml
+
+integration-test/scripts/app-version-revision-kubernetes-test.sh
diff --git a/integration-test/scripts/app-version-revision-kubernetes-test.sh
b/integration-test/scripts/app-version-revision-kubernetes-test.sh
new file mode 100755
index 00000000..6be92cb4
--- /dev/null
+++ b/integration-test/scripts/app-version-revision-kubernetes-test.sh
@@ -0,0 +1,73 @@
+#!/bin/bash -e
+
+set -exu
+
+echo "Running app-version-revision-kubernetes-test.sh with deployment:
$DEPLOYMENT"
+
+eval $(minikube -p minikube docker-env)
+
+sbt $PROJECT_NAME/Docker/publishLocal
+
+# function to run after each change of ReplicaSet - see below `kubectl ...`
+# Param $1 is the expected revision
+testRevisionInPodsLog () {
+ echo "Testing for revision $1:"
+ for i in {1..20}
+ do
+ sleep 5
+ echo "Waiting for rolling update to complete ... (revision $1, $i/20)"
+ kubectl get pods -n $NAMESPACE
+
+ # the two lines below ensure a rollout is fully completed, before we look
at the logs for updated revision
+ # if not filtering out Terminated or pods that are not not ready (0/1) we
will see previous revisions in the logs
+ [ `kubectl get pods -n $NAMESPACE | grep Terminating | wc -l` -ne 0 ] &&
continue # loop again until no Terminating nodes in the list
+ [ `kubectl get pods -n $NAMESPACE | grep 0/1 | wc -l` -eq 0 ] && break
# exit the loop once we only have READY (1/1) pods
+ done
+
+ if [ $i -eq 20 ]
+ then
+ echo "Pods did not get ready (revision $1)"
+ kubectl -n $NAMESPACE describe deployment pekko-rollingupdate-demo
+ exit -1
+ fi
+
+ echo "Rolling update complete."
+
+ # expected log to indicate that reading was successful
+ expected_app_version_log="Reading revision from Kubernetes:
pekko.cluster.app-version was set to $1"
+
+ for POD in $(kubectl get pods -n $NAMESPACE | grep $APP_NAME | awk '{ print
$1 }')
+ do
+ # this grep'ed string is always logged
+ app_version_log=$(kubectl logs $POD -n $NAMESPACE | grep 'revision from
Kubernetes')
+ echo "found log in $POD: $app_version_log"
+
+ if [[ "$app_version_log" =~ .*"$expected_app_version_log".* ]]; then
+ echo "Logging for $POD contains '$expected_app_version_log'."
+ else
+ echo "Error! Logging for $POD did not contain
'$expected_app_version_log' but it contained '$app_version_log'."
+ exit -1
+ fi
+ done
+
+ echo "Testing for revision $1 done!"
+}
+
+# prep
+docker images | head
+kubectl create namespace $NAMESPACE || true
+kubectl -n $NAMESPACE delete deployment pekko-rollingupdate-demo || true
+kubectl -n $NAMESPACE apply -f $DEPLOYMENT
+
+# after the initial deployment
+testRevisionInPodsLog "1"
+
+# update the deployment, which creates a new revision
+kubectl set env deployment/$APP_NAME SOME_ENV_TO_BE_CHANGED=on -n $NAMESPACE
+testRevisionInPodsLog "2"
+
+# rollback, which creates a new revision
+kubectl rollout undo deployment/$APP_NAME -n $NAMESPACE
+testRevisionInPodsLog "3"
+
+echo "Test Successful!"
diff --git
a/management-cluster-bootstrap/src/main/mima-filters/2.0.x.backwards.excludes/cluster-bootstrap-internal.backwards.excludes
b/management-cluster-bootstrap/src/main/mima-filters/2.0.x.backwards.excludes/cluster-bootstrap-internal.backwards.excludes
new file mode 100644
index 00000000..7eda687b
--- /dev/null
+++
b/management-cluster-bootstrap/src/main/mima-filters/2.0.x.backwards.excludes/cluster-bootstrap-internal.backwards.excludes
@@ -0,0 +1,23 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# removed internal classes
+ProblemFilters.exclude[MissingFieldProblem]("org.apache.pekko.management.cluster.bootstrap.ClusterBootstrap.Internal")
+ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.management.cluster.bootstrap.ClusterBootstrap$Internal$")
+ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.management.cluster.bootstrap.ClusterBootstrap$Internal$BootstrapStep")
+ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.management.cluster.bootstrap.ClusterBootstrap$Internal$Initializing$")
+ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.management.cluster.bootstrap.ClusterBootstrap$Internal$NotRunning$")
diff --git
a/management-cluster-bootstrap/src/main/scala/org/apache/pekko/management/cluster/bootstrap/ClusterBootstrap.scala
b/management-cluster-bootstrap/src/main/scala/org/apache/pekko/management/cluster/bootstrap/ClusterBootstrap.scala
index 4f18de42..2e54468c 100644
---
a/management-cluster-bootstrap/src/main/scala/org/apache/pekko/management/cluster/bootstrap/ClusterBootstrap.scala
+++
b/management-cluster-bootstrap/src/main/scala/org/apache/pekko/management/cluster/bootstrap/ClusterBootstrap.scala
@@ -33,7 +33,7 @@ import
pekko.management.scaladsl.ManagementRouteProviderSettings
import pekko.http.scaladsl.model.Uri
import pekko.http.scaladsl.server.Route
-import java.util.concurrent.atomic.AtomicReference
+import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.duration._
import scala.concurrent.Future
import scala.concurrent.Promise
@@ -42,12 +42,11 @@ import scala.util.control.NonFatal
final class ClusterBootstrap(implicit system: ExtendedActorSystem) extends
Extension with ManagementRouteProvider {
- import ClusterBootstrap.Internal._
import system.dispatcher
private val log = Logging(system, classOf[ClusterBootstrap])
- private final val bootstrapStep = new
AtomicReference[BootstrapStep](NotRunning)
+ private final val bootstrapStep = new AtomicBoolean(false)
val settings: ClusterBootstrapSettings =
ClusterBootstrapSettings(system.settings.config, log)
@@ -110,7 +109,7 @@ final class ClusterBootstrap(implicit system:
ExtendedActorSystem) extends Exten
"If you want to use the automatic bootstrap mechanism, make sure to
NOT set explicit seed nodes in the configuration. " +
"This node will attempt to join the configured seed nodes.",
Cluster(system).settings.SeedNodes.mkString("[", ", ", "]"))
- } else if (bootstrapStep.compareAndSet(NotRunning, Initializing)) {
+ } else if (bootstrapStep.compareAndSet(false, true)) {
log.info("Initiating bootstrap procedure using {} method...",
settings.contactPointDiscovery.discoveryMethod)
ensureSelfContactPoint()
@@ -162,13 +161,4 @@ object ClusterBootstrap extends
ExtensionId[ClusterBootstrap] with ExtensionIdPr
override def createExtension(system: ExtendedActorSystem): ClusterBootstrap
= new ClusterBootstrap()(system)
- /**
- * INTERNAL API
- */
- private[bootstrap] object Internal {
- sealed trait BootstrapStep
- case object NotRunning extends BootstrapStep
- case object Initializing extends BootstrapStep
- }
-
}
diff --git
a/rolling-update-kubernetes/src/main/scala/org/apache/pekko/rollingupdate/kubernetes/AppVersionRevision.scala
b/rolling-update-kubernetes/src/main/scala/org/apache/pekko/rollingupdate/kubernetes/AppVersionRevision.scala
new file mode 100644
index 00000000..2699b31f
--- /dev/null
+++
b/rolling-update-kubernetes/src/main/scala/org/apache/pekko/rollingupdate/kubernetes/AppVersionRevision.scala
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2017-2023 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.rollingupdate.kubernetes
+
+import java.util.concurrent.atomic.AtomicBoolean
+import scala.concurrent.ExecutionContext
+import scala.concurrent.Future
+import scala.concurrent.Promise
+import scala.util.control.NonFatal
+
+import org.apache.pekko
+import pekko.actor.ActorSystem
+import pekko.actor.ExtendedActorSystem
+import pekko.actor.Extension
+import pekko.actor.ExtensionId
+import pekko.actor.ExtensionIdProvider
+import pekko.cluster.Cluster
+import pekko.event.Logging
+import pekko.util.Version
+
+final class AppVersionRevision(implicit system: ExtendedActorSystem) extends
Extension {
+
+ private val log = Logging(system, classOf[AppVersionRevision])
+ private val configPath = "pekko.rollingupdate.kubernetes"
+ private val config = system.settings.config.getConfig(configPath)
+ private val k8sSettings = KubernetesSettings(config)
+ implicit private val ec: ExecutionContext = system.dispatcher
+ private final val isInitialized = new AtomicBoolean(false)
+ log.debug("Settings {}", k8sSettings)
+
+ private val versionPromise = Promise[Version]()
+
+ def getRevision(): Future[Version] = versionPromise.future
+
+ def start(): Unit = {
+ if (k8sSettings.podName.isEmpty) {
+ log.error(
+ "Not able to read the app version from the revision of the current
ReplicaSet. Reason: " +
+ "No configuration found to extract the pod name from. " +
+ s"Be sure to provide the pod name with `$configPath.pod-name` " +
+ "or by setting ENV variable `KUBERNETES_POD_NAME`.")
+ } else {
+ if (isInitialized.compareAndSet(false, true)) {
+ Cluster(system).setAppVersionLater(getRevision())
+ KubernetesApiImpl(log, k8sSettings).foreach { kubernetesApi =>
+
versionPromise.completeWith(kubernetesApi.readRevision().map(Version(_)))
+ }
+ } else
+ log.warning("AppVersionRevision extension already initiated, yet
start() method was called again. Ignoring.")
+ }
+ }
+
+ // autostart if the extension is loaded through the config extension list
+ private val autostart =
+
system.settings.config.getStringList("pekko.extensions").contains(classOf[AppVersionRevision].getName)
+
+ if (autostart) {
+ log.info("AppVersionRevision loaded through 'pekko.extensions'
auto-starting itself.")
+ try {
+ AppVersionRevision(system).start()
+ } catch {
+ case NonFatal(ex) =>
+ log.error(ex, "Failed to autostart AppVersionRevision extension")
+ }
+ }
+}
+
+object AppVersionRevision extends ExtensionId[AppVersionRevision] with
ExtensionIdProvider {
+
+ override def lookup: AppVersionRevision.type = AppVersionRevision
+
+ override def createExtension(system: ExtendedActorSystem):
AppVersionRevision = new AppVersionRevision()(system)
+
+ override def get(system: ActorSystem): AppVersionRevision = super.get(system)
+
+}
diff --git
a/rolling-update-kubernetes/src/main/scala/org/apache/pekko/rollingupdate/kubernetes/KubernetesApi.scala
b/rolling-update-kubernetes/src/main/scala/org/apache/pekko/rollingupdate/kubernetes/KubernetesApi.scala
index 9397180b..621340f2 100644
---
a/rolling-update-kubernetes/src/main/scala/org/apache/pekko/rollingupdate/kubernetes/KubernetesApi.scala
+++
b/rolling-update-kubernetes/src/main/scala/org/apache/pekko/rollingupdate/kubernetes/KubernetesApi.scala
@@ -54,6 +54,21 @@ private[pekko] final case class PodCost(podName: String,
cost: Int, address: Str
*/
@InternalApi private[pekko] final class PodCostClientException(message:
String) extends PodCostException(message)
+/**
+ * INTERNAL API
+ */
+@InternalApi private[pekko] sealed class ReadRevisionException(message:
String) extends RuntimeException(message)
+
+/**
+ * INTERNAL API
+ */
+@InternalApi private[pekko] sealed class GetPodException(message: String)
extends RuntimeException(message)
+
+/**
+ * INTERNAL API
+ */
+@InternalApi private[pekko] sealed class ReplicaSetException(message: String)
extends RuntimeException(message)
+
/**
* INTERNAL API
*/
@@ -89,6 +104,8 @@ private[pekko] trait KubernetesApi {
def updatePodDeletionCostAnnotation(podName: String, cost: Int): Future[Done]
+ def readRevision(): Future[String]
+
/**
* Reads a PodCost from the API server. If it doesn't exist it tries to
create it.
* The creation can fail due to another instance creating at the same time,
in this case
diff --git
a/rolling-update-kubernetes/src/main/scala/org/apache/pekko/rollingupdate/kubernetes/KubernetesApiImpl.scala
b/rolling-update-kubernetes/src/main/scala/org/apache/pekko/rollingupdate/kubernetes/KubernetesApiImpl.scala
index 61437a2b..a43ec027 100644
---
a/rolling-update-kubernetes/src/main/scala/org/apache/pekko/rollingupdate/kubernetes/KubernetesApiImpl.scala
+++
b/rolling-update-kubernetes/src/main/scala/org/apache/pekko/rollingupdate/kubernetes/KubernetesApiImpl.scala
@@ -324,6 +324,106 @@ PUTs must contain resourceVersions. Response:
} yield resource
}
+ /**
+ * Start a proxy in local minikube:
+ * - kubectl proxy --port=8080
+ * - replic_set_name=$(curl -s
http://localhost:8080/api/v1/namespaces/pekko-rollingupdate-demo-ns/pods | jq
'.items[0].metadata.ownerReferences[0].name'| tr -d '"')
+ * - revision=$(curl -s
http://localhost:8080/apis/apps/v1/namespaces/pekko-rollingupdate-demo-ns/replicasets/"$replic_set_name"
| jq '.metadata.annotations["deployment.kubernetes.io/revision"]'| tr -d '"')
+ */
+ override def readRevision(): Future[String] = {
+ val maxTries = 5
+ def loop(tries: Int = 0): Future[Option[String]] = {
+
+ val podOwnerRef = getPod().map(_.metadata.ownerReferences.filter(_.kind
== "ReplicaSet").headOption)
+
+ val replicaSet = podOwnerRef.flatMap {
+ case Some(podOwnerRef) => getReplicaSet(podOwnerRef.name)
+ case None => Future.failed(new ReadRevisionException("No
replica name found"))
+ }
+
+ val revision =
replicaSet.map(_.metadata.annotations.`deployment.kubernetes.io/revision`)
+ revision.map(Some(_)).recoverWith {
+ case ex =>
+ if (tries >= maxTries) {
+ Future(None)
+ } else {
+ log.warning(s"Failed to get revision ${ex.getMessage}. Try again
($tries)")
+ loop(tries + 1)
+ }
+ }
+ }
+
+ loop()
+ .map {
+ case Some(revision) =>
+ log.info(s"Reading revision from Kubernetes:
pekko.cluster.app-version was set to $revision")
+ revision
+ case None => throw new ReadRevisionException(s"Not able to read
revision from Kubernetes.")
+ }
+ .recover {
+ case ex =>
+ throw new ReadRevisionException(s"Error. Not able to read revision
from Kubernetes: ${ex.getMessage}")
+ }
+ }
+
+ private val pathForGetPod: Uri.Path =
+ Uri.Path.Empty / "api" / "v1" / "namespaces" / namespace / "pods" /
settings.podName
+
+ private def pathForReplicaSet(replicaSetName: String): Uri.Path =
+ Uri.Path.Empty / "apis" / "apps" / "v1" / "namespaces" / namespace /
"replicasets" / replicaSetName
+
+ private def getReplicaSet(name: String): Future[ReplicaSet] = {
+ val ent = HttpEntity.Empty.withContentType(ContentTypes.`application/json`)
+ val request = requestForPath(pathForReplicaSet(name), entity = ent)
+ val httpResponse = makeRequest(request, s"Timeout getting replica set
'$name'")
+ for {
+ response <- httpResponse
+ responseEntity <- response.entity.toStrict(settings.bodyReadTimeout)
+ replicaSet <- response.status match {
+ case StatusCodes.OK =>
+ Unmarshal(responseEntity).to[ReplicaSet].recover {
+ case ex =>
+ throw new ReplicaSetException(s"Error while parsing ReplicaSet:
${ex.getMessage}")
+ }
+ case StatusCodes.Unauthorized =>
+ handleUnauthorized(response)
+ case unexpected =>
+ responseEntity
+ .toStrict(settings.bodyReadTimeout)
+ .flatMap(e => Unmarshal(e).to[String])
+ .map(body =>
+ throw new ReplicaSetException(
+ s"Unexpected response from API server when retrieving
ReplicaSet: $unexpected. Body: $body"))
+ }
+ } yield {
+ replicaSet
+ }
+ }
+
+ private def getPod(): Future[Pod] = {
+ val ent = HttpEntity.Empty.withContentType(ContentTypes.`application/json`)
+ val request = requestForPath(pathForGetPod, entity = ent)
+ val httpResponse = makeRequest(request, "Timeout getting pod")
+ for {
+ response <- httpResponse
+ responseEntity <- response.entity.toStrict(settings.bodyReadTimeout)
+ pod <- response.status match {
+ case StatusCodes.OK =>
+ Unmarshal(responseEntity).to[Pod].recover {
+ case ex => throw new GetPodException(s"Error while parsing Pod:
${ex.getMessage}")
+ }
+ case StatusCodes.Unauthorized =>
+ handleUnauthorized(response)
+ case unexpected =>
+ Unmarshal(response.entity)
+ .to[String]
+ .map(body =>
+ throw new GetPodException(
+ s"Unexpected response from API server when retrieving Pod.
StatusCode: $unexpected. Body: $body"))
+ }
+ } yield pod
+ }
+
}
/**
diff --git
a/rolling-update-kubernetes/src/main/scala/org/apache/pekko/rollingupdate/kubernetes/KubernetesJsonSupport.scala
b/rolling-update-kubernetes/src/main/scala/org/apache/pekko/rollingupdate/kubernetes/KubernetesJsonSupport.scala
index e46eb427..cea1e93a 100644
---
a/rolling-update-kubernetes/src/main/scala/org/apache/pekko/rollingupdate/kubernetes/KubernetesJsonSupport.scala
+++
b/rolling-update-kubernetes/src/main/scala/org/apache/pekko/rollingupdate/kubernetes/KubernetesJsonSupport.scala
@@ -21,6 +21,42 @@ import spray.json.DefaultJsonProtocol
import spray.json.JsonFormat
import spray.json.RootJsonFormat
+/**
+ * INTERNAL API
+ */
+@InternalApi
+case class ReplicaAnnotation(`deployment.kubernetes.io/revision`: String)
+
+/**
+ * INTERNAL API
+ */
+@InternalApi
+case class ReplicaSetMetadata(annotations: ReplicaAnnotation)
+
+/**
+ * INTERNAL API
+ */
+@InternalApi
+case class ReplicaSet(metadata: ReplicaSetMetadata)
+
+/**
+ * INTERNAL API
+ */
+@InternalApi
+case class PodOwnerRef(name: String, kind: String)
+
+/**
+ * INTERNAL API
+ */
+@InternalApi
+case class PodMetadata(ownerReferences: immutable.Seq[PodOwnerRef])
+
+/**
+ * INTERNAL API
+ */
+@InternalApi
+case class Pod(metadata: PodMetadata)
+
/**
* INTERNAL API
*/
@@ -53,4 +89,12 @@ trait KubernetesJsonSupport extends SprayJsonSupport with
DefaultJsonProtocol {
implicit val specFormat: JsonFormat[Spec] = jsonFormat1(Spec.apply)
implicit val podCostCustomResourceFormat:
RootJsonFormat[PodCostCustomResource] = jsonFormat4(
PodCostCustomResource.apply)
+
+ implicit val podOwnerRefFormat: JsonFormat[PodOwnerRef] =
jsonFormat2(PodOwnerRef.apply)
+ implicit val podMetadataFormat: JsonFormat[PodMetadata] =
jsonFormat1(PodMetadata.apply)
+ implicit val podFormat: RootJsonFormat[Pod] = jsonFormat1(Pod.apply)
+
+ implicit val replicaAnnotationFormat: JsonFormat[ReplicaAnnotation] =
jsonFormat1(ReplicaAnnotation.apply)
+ implicit val replicaSetMetadataFormat: JsonFormat[ReplicaSetMetadata] =
jsonFormat1(ReplicaSetMetadata.apply)
+ implicit val podReplicaSetFormat: RootJsonFormat[ReplicaSet] =
jsonFormat1(ReplicaSet.apply)
}
diff --git
a/rolling-update-kubernetes/src/test/java/jdoc/pekko/rollingupdate/kubernetes/AppVersionRevisionCompileOnly.java
b/rolling-update-kubernetes/src/test/java/jdoc/pekko/rollingupdate/kubernetes/AppVersionRevisionCompileOnly.java
new file mode 100644
index 00000000..501fd6b0
--- /dev/null
+++
b/rolling-update-kubernetes/src/test/java/jdoc/pekko/rollingupdate/kubernetes/AppVersionRevisionCompileOnly.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2017-2023 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package jdoc.pekko.rollingupdate.kubernetes;
+
+import org.apache.pekko.actor.ActorSystem;
+import org.apache.pekko.rollingupdate.kubernetes.AppVersionRevision;
+
+public class AppVersionRevisionCompileOnly {
+ public static void bootstrap() {
+
+ ActorSystem system = ActorSystem.create();
+
+ //#start
+ // Starting the AppVersionRevision extension
+ // preferred to be called before ClusterBootstrap
+ AppVersionRevision.get(system).start();
+ //#start
+ }
+}
diff --git
a/rolling-update-kubernetes/src/test/java/org/apache/pekko/rollingupdate/kubernetes/AppVersionRevisionJavaCompileTest.java
b/rolling-update-kubernetes/src/test/java/org/apache/pekko/rollingupdate/kubernetes/AppVersionRevisionJavaCompileTest.java
new file mode 100644
index 00000000..c6cebb12
--- /dev/null
+++
b/rolling-update-kubernetes/src/test/java/org/apache/pekko/rollingupdate/kubernetes/AppVersionRevisionJavaCompileTest.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2017-2023 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.rollingupdate.kubernetes;
+
+import org.apache.pekko.actor.ActorSystem;
+import org.junit.jupiter.api.Test;
+
+public class AppVersionRevisionJavaCompileTest {
+
+ public void test() {
+ ActorSystem actorSystem = ActorSystem.create("test");
+ AppVersionRevision appVersionRevision =
AppVersionRevision.get(actorSystem);
+ }
+
+ @Test
+ public void compileOnly() {}
+}
diff --git
a/rolling-update-kubernetes/src/test/scala/doc/pekko/rollingupdate/kubernetes/AppVersionRevisionCompileOnly.scala
b/rolling-update-kubernetes/src/test/scala/doc/pekko/rollingupdate/kubernetes/AppVersionRevisionCompileOnly.scala
new file mode 100644
index 00000000..0ecb8d77
--- /dev/null
+++
b/rolling-update-kubernetes/src/test/scala/doc/pekko/rollingupdate/kubernetes/AppVersionRevisionCompileOnly.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2017-2023 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package doc.pekko.rollingupdate.kubernetes
+
+import org.apache.pekko.actor.ActorSystem
+import org.apache.pekko.rollingupdate.kubernetes.AppVersionRevision
+
+object AppVersionRevisionCompileOnly {
+
+ val system = ActorSystem()
+
+ // #start
+ // Starting the AppVersionRevision extension
+ // preferred to be called before ClusterBootstrap
+ AppVersionRevision(system).start()
+ // #start
+
+}
diff --git
a/rolling-update-kubernetes/src/test/scala/org/apache/pekko/rollingupdate/kubernetes/AppVersionRevisionSpec.scala
b/rolling-update-kubernetes/src/test/scala/org/apache/pekko/rollingupdate/kubernetes/AppVersionRevisionSpec.scala
new file mode 100644
index 00000000..39a7a95e
--- /dev/null
+++
b/rolling-update-kubernetes/src/test/scala/org/apache/pekko/rollingupdate/kubernetes/AppVersionRevisionSpec.scala
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2017-2023 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.rollingupdate.kubernetes
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.github.tomakehurst.wiremock.WireMockServer
+import com.github.tomakehurst.wiremock.client.MappingBuilder
+import com.github.tomakehurst.wiremock.client.ResponseDefinitionBuilder
+import com.github.tomakehurst.wiremock.client.WireMock
+import com.github.tomakehurst.wiremock.client.WireMock.aResponse
+import com.github.tomakehurst.wiremock.client.WireMock.get
+import com.github.tomakehurst.wiremock.client.WireMock.stubFor
+import com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo
+import
com.github.tomakehurst.wiremock.core.WireMockConfiguration.wireMockConfig
+import com.github.tomakehurst.wiremock.matching.EqualToPattern
+import com.github.tomakehurst.wiremock.stubbing.Scenario
+import com.typesafe.config.ConfigFactory
+import org.apache.pekko
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.BeforeAndAfterEach
+import org.scalatest.concurrent.Eventually
+import org.scalatest.concurrent.ScalaFutures
+import org.scalatest.matchers.should.Matchers
+import org.scalatest.time.Millis
+import org.scalatest.time.Seconds
+import org.scalatest.time.Span
+import org.scalatest.wordspec.AnyWordSpecLike
+import pekko.actor.ActorSystem
+import pekko.testkit.EventFilter
+import pekko.testkit.ImplicitSender
+import pekko.testkit.TestKit
+
+import scala.concurrent.duration._
+
+object AppVersionRevisionSpec {
+ val config = ConfigFactory.parseString("""
+ pekko.loggers = ["org.apache.pekko.testkit.TestEventListener"]
+ pekko.actor.provider = cluster
+
+ pekko.remote.artery.canonical.port = 0
+ pekko.remote.artery.canonical.hostname = 127.0.0.1
+
+ pekko.cluster.jmx.multi-mbeans-in-same-jvm = on
+ pekko.coordinated-shutdown.terminate-actor-system = off
+ pekko.coordinated-shutdown.run-by-actor-system-terminate = off
+ pekko.test.filter-leeway = 10s
+ """)
+}
+
+class AppVersionRevisionSpec
+ extends TestKit(
+ ActorSystem(
+ "AppVersionRevisionSpec",
+ AppVersionRevisionSpec.config
+ ))
+ with ImplicitSender
+ with AnyWordSpecLike
+ with Matchers
+ with BeforeAndAfterAll
+ with BeforeAndAfterEach
+ with Eventually
+ with ScalaFutures {
+
+ private val wireMockServer = new WireMockServer(wireMockConfig().port(0))
+ wireMockServer.start()
+ WireMock.configureFor(wireMockServer.port())
+
+ // for wiremock to provide json
+ val mapper = new ObjectMapper()
+
+ private val namespace = "namespace-test"
+ private val podName1 = "pod-test-1"
+
+ private def settings(podName: String) = {
+ new KubernetesSettings(
+ apiCaPath = "",
+ apiTokenPath = "",
+ apiServiceHost = "localhost",
+ apiServicePort = wireMockServer.port(),
+ namespace = Some(namespace),
+ namespacePath = "",
+ podName = podName,
+ secure = false,
+ apiServiceRequestTimeout = 2.seconds,
+ customResourceSettings = new CustomResourceSettings(enabled = false,
crName = None, 60.seconds)
+ )
+ }
+
+ private val kubernetesApi =
+ new KubernetesApiImpl(
+ system,
+ settings(podName1),
+ namespace,
+ apiToken = "apiToken",
+ clientHttpsConnectionContext = None)
+
+ override implicit val patienceConfig: PatienceConfig =
+ PatienceConfig(timeout = Span(5, Seconds), interval = Span(100, Millis))
+
+ override protected def afterAll(): Unit = super.shutdown()
+
+ override protected def beforeEach(): Unit = {
+ wireMockServer.resetAll()
+ WireMock.resetAllScenarios()
+ }
+
+ private def podPath(podName: String) =
+ urlEqualTo(s"/api/v1/namespaces/$namespace/pods/$podName")
+
+ private def replicaPath(replica: String) =
+ urlEqualTo(s"/apis/apps/v1/namespaces/$namespace/replicasets/$replica")
+
+ private def getPod(podName: String): MappingBuilder =
+ get(podPath(podName)).withHeader("Content-Type", new
EqualToPattern("application/json"))
+
+ private def getReplicaSet(replica: String): MappingBuilder =
+ get(replicaPath(replica)).withHeader("Content-Type", new
EqualToPattern("application/json"))
+
+ private val defaultPodResponseJson =
+ """{
+ | "metadata": {
+ | "ownerReferences": [
+ | {"name": "wrong-replicaset-id", "kind": "SomethingElse"},
+ | {"name": "parent-replicaset-id", "kind": "ReplicaSet"}
+ | ]
+ | }
+ |}""".stripMargin
+
+ private val defaultReplicaResponseJson =
+ """{
+ | "metadata": {
+ | "annotations": {
+ | "deployment.kubernetes.io/revision": "1"
+ | }
+ | }
+ |}""".stripMargin
+
+ private def stubPodResponse(json: String = defaultPodResponseJson, state:
String = Scenario.STARTED) =
+ stubFor(
+ getPod(podName1)
+ .willReturn(
+
ResponseDefinitionBuilder.okForJson("").withJsonBody(mapper.readTree(json))
+ )
+ .inScenario("pod")
+ .whenScenarioStateIs(state))
+
+ private def stubReplicaResponse(json: String = defaultReplicaResponseJson) =
+ stubFor(
+ getReplicaSet("parent-replicaset-id")
+ .willReturn(
+
ResponseDefinitionBuilder.okForJson("").withJsonBody(mapper.readTree(json))
+ )
+ .inScenario("replica")
+ .whenScenarioStateIs(Scenario.STARTED))
+
+ "Read revision from Kubernetes" should {
+
+ "parse pod and replica response to get the revision" in {
+ stubPodResponse()
+ stubReplicaResponse()
+
+ EventFilter
+ .info(pattern = "Reading revision from Kubernetes:
pekko.cluster.app-version was set to 1", occurrences = 1)
+ .intercept {
+ kubernetesApi.readRevision().futureValue should be("1")
+ }
+ }
+
+ "retry and then fail when pod not found" in {
+ stubFor(getPod(podName1).willReturn(aResponse().withStatus(404)))
+ EventFilter
+ .warning(pattern = ".*Failed to get revision", occurrences = 5)
+ .intercept {
+
assert(kubernetesApi.readRevision().failed.futureValue.isInstanceOf[ReadRevisionException])
+ }
+ }
+
+ "retry and then fail when replicaset not found" in {
+ stubPodResponse()
+
stubFor(getReplicaSet("parent-replicaset-id").willReturn(aResponse().withStatus(404)))
+ EventFilter
+ .warning(pattern = ".*Failed to get revision", occurrences = 5)
+ .intercept {
+
assert(kubernetesApi.readRevision().failed.futureValue.isInstanceOf[ReadRevisionException])
+ }
+ }
+
+ "log if pod json can not be parsed" in {
+ stubPodResponse(json = """{ "invalid": "json" }""")
+ EventFilter
+ .warning(pattern = ".*Error while parsing Pod*")
+ .intercept {
+
assert(kubernetesApi.readRevision().failed.futureValue.isInstanceOf[ReadRevisionException])
+ }
+ }
+
+ "log if replica json can not be parsed" in {
+ stubPodResponse()
+ stubReplicaResponse(json = """{ "invalid": "json" }""")
+ EventFilter
+ .warning(pattern = ".*Error while parsing Pod*")
+ .intercept {
+
assert(kubernetesApi.readRevision().failed.futureValue.isInstanceOf[ReadRevisionException])
+ }
+ }
+
+ "break the loop if consecutive request succeeds" in {
+ stubFor(
+ getPod(podName1)
+ .willReturn(aResponse().withStatus(404))
+ .inScenario("pod")
+ .whenScenarioStateIs(Scenario.STARTED)
+ .willSetStateTo("after first fail")
+ )
+ stubFor(
+ getPod(podName1)
+ .willReturn(aResponse().withStatus(404))
+ .inScenario("pod")
+ .whenScenarioStateIs("after first fail")
+ .willSetStateTo("k8s is happy now")
+ )
+ stubPodResponse(state = "k8s is happy now")
+ stubReplicaResponse()
+ EventFilter
+ .warning(pattern = ".*Try again*", occurrences = 2)
+ .intercept {
+ kubernetesApi.readRevision().futureValue should be("1")
+ }
+ }
+ }
+}
diff --git
a/rolling-update-kubernetes/src/test/scala/org/apache/pekko/rollingupdate/kubernetes/PodDeletionCostAnnotatorCrSpec.scala
b/rolling-update-kubernetes/src/test/scala/org/apache/pekko/rollingupdate/kubernetes/PodDeletionCostAnnotatorCrSpec.scala
index 10ae084b..2a9a1044 100644
---
a/rolling-update-kubernetes/src/test/scala/org/apache/pekko/rollingupdate/kubernetes/PodDeletionCostAnnotatorCrSpec.scala
+++
b/rolling-update-kubernetes/src/test/scala/org/apache/pekko/rollingupdate/kubernetes/PodDeletionCostAnnotatorCrSpec.scala
@@ -90,6 +90,8 @@ object PodDeletionCostAnnotatorCrSpec {
def getPodCosts(): Vector[PodCost] = this.synchronized {
podCosts
}
+
+ override def readRevision(): Future[String] = Future.successful("1")
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]