This is an automated email from the ASF dual-hosted git repository.

pjfanning pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-management.git


The following commit(s) were added to refs/heads/main by this push:
     new de751596 Port Akka management PRs #1134, #1141, #1142 to Pekko (#774)
de751596 is described below

commit de751596c79bd4c7525800336cd81fdcff1bf302
Author: PJ Fanning <[email protected]>
AuthorDate: Thu May 14 15:16:41 2026 +0100

    Port Akka management PRs #1134, #1141, #1142 to Pekko (#774)
    
    * Port Akka management PRs #1134, #1141, #1142 to Pekko
    
    PR #1134 - Write pod cost to CRD (rolling-update-kubernetes):
    - Add CustomResourceSettings and extend KubernetesSettings with
      custom-resource config (enabled, cr-name, cleanup-after) and
      api-service-request-timeout
    - Create KubernetesApi trait with PodCost/PodCostResource types
    - Create KubernetesJsonSupport for spray-json marshalling of CRD types
    - Create KubernetesApiImpl with Pekko HTTP client, blocking IO on
      DefaultBlockingDispatcherId at startup; uses pekko.apache.org API group
    - Refactor PodDeletionCost extension to use KubernetesApiImpl
    - Refactor PodDeletionCostAnnotator to support both direct annotation
      and PodCost CR updates via KubernetesApi abstraction
    - Delete ApiRequests.scala (functionality absorbed into KubernetesApiImpl)
    - Update reference.conf with new config keys
    - Update PodDeletionCostAnnotatorSpec to use new props() factory
    - Add PodDeletionCostAnnotatorCrSpec for CRD-based cost updates
    
    PR #1141 - Integration test scripts:
    - Add integration-test/scripts/rollingupdate-kubernetes-test.sh
    - Add integration-test/rollingupdate-kubernetes/kubernetes/ directory
    
    PR #1142 - Blocking IO at startup (discovery-kubernetes-api):
    - Move apiToken, podNamespace and SSL context creation to Future on
      blocking dispatcher (kubernetesSetup) in KubernetesApiServiceDiscovery
    - Add KubernetesSetup private case class to companion object
    - Use Logging(system, classOf[...]) instead of getClass
    - Change Seq(None) to List(None) in targets; return immutable.Seq
    
    Co-authored-by: Copilot <[email protected]>
    
    Co-authored-by: pjfanning <[email protected]>
    
    * Fix log message grammar: 'oldest will written' -> 'oldest will be written'
    
    Agent-Logs-Url: 
https://github.com/pjfanning/incubator-pekko-management/sessions/e6cfbf1b-2d6a-4ebf-8f70-21d47b773b13
    
    Co-authored-by: pjfanning <[email protected]>
    
    * scalafmt
    
    * imports
    
    * compile issue
    
    * Update KubernetesSettings.scala
    
    * Port remaining files from Akka PR #1134: pod-cost YAMLs, int-test module, 
rolling-update integration test module, scripts, and GitHub workflow
    
    Agent-Logs-Url: 
https://github.com/pjfanning/incubator-pekko-management/sessions/ca58ec9f-6a95-4dec-980c-2007db4841ae
    
    Co-authored-by: pjfanning <[email protected]>
    
    * Fix bugs in rollingupdate-kubernetes-cr-test.sh: remove broken inner 
loop, add try_count increment
    
    Agent-Logs-Url: 
https://github.com/pjfanning/incubator-pekko-management/sessions/ca58ec9f-6a95-4dec-980c-2007db4841ae
    
    Co-authored-by: pjfanning <[email protected]>
    
    * Update integration-tests-rolling-update-cr.yml
    
    * scalafmt
    
    * add-opens
    
    * Update build.sbt
    
    * Update logback.xml
    
    * Update build.sbt
    
    * Update build.sbt
    
    * Update build.sbt
    
    * Update integration test workflow triggers
    
    Removed pull_request trigger from integration test workflow.
    
    ---------
    
    Co-authored-by: copilot-swe-agent[bot] 
<[email protected]>
    Co-authored-by: pjfanning <[email protected]>
---
 .../integration-tests-rolling-update-cr.yml        |  68 ++++
 build.sbt                                          |  27 ++
 .../kubernetes/KubernetesApiServiceDiscovery.scala | 153 ++++----
 .../rolling-update-kubernetes/build.sbt            |  22 ++
 .../kubernetes/pekko-cluster-cr.yml                | 127 +++++++
 .../kubernetes/pekko-cluster.yml                   | 122 +++++++
 .../src/main/resources/application.conf            |  22 ++
 .../src/main/resources/logback.xml                 |  19 +
 .../cluster/bootstrap/PodDeletionCostDemoApp.scala |  41 +++
 .../rolling-update-kubernetes/test-cr.sh           |  11 +
 integration-test/rolling-update-kubernetes/test.sh |  10 +
 .../scripts/rollingupdate-kubernetes-cr-test.sh    |  74 ++++
 .../scripts/rollingupdate-kubernetes-test.sh       | 134 +++++++
 project/Dependencies.scala                         |   4 +
 .../kubernetes/KubernetesApiIntegrationTest.scala  | 182 ++++++++++
 rolling-update-kubernetes/pod-cost-example.yml     |  16 +
 rolling-update-kubernetes/pod-cost.yml             |  49 +++
 .../src/main/resources/reference.conf              |  18 +
 .../apache/pekko/rollingupdate/CostStrategy.scala  |   5 +-
 .../rollingupdate/kubernetes/ApiRequests.scala     |  51 ---
 .../rollingupdate/kubernetes/KubernetesApi.scala   | 116 ++++++
 .../kubernetes/KubernetesApiImpl.scala             | 389 +++++++++++++++++++++
 .../kubernetes/KubernetesJsonSupport.scala         |  56 +++
 .../kubernetes/KubernetesSettings.scala            |  39 ++-
 .../rollingupdate/kubernetes/PodDeletionCost.scala |  95 ++---
 .../kubernetes/PodDeletionCostAnnotator.scala      | 217 +++++++-----
 .../PodDeletionCostAnnotatorCrSpec.scala           | 190 ++++++++++
 .../kubernetes/PodDeletionCostAnnotatorSpec.scala  |  57 +--
 28 files changed, 2027 insertions(+), 287 deletions(-)

diff --git a/.github/workflows/integration-tests-rolling-update-cr.yml 
b/.github/workflows/integration-tests-rolling-update-cr.yml
new file mode 100644
index 00000000..5b94a543
--- /dev/null
+++ b/.github/workflows/integration-tests-rolling-update-cr.yml
@@ -0,0 +1,68 @@
+name: Integration test for Rolling Update CR Kubernetes
+
+on:
+  push:
+    branches:
+      - main
+      - release-*
+  workflow_dispatch:
+
+permissions:
+  contents: read
+
+jobs:
+  integration-test:
+    name: Integration Tests for Rolling Update CR Kubernetes
+    runs-on: ubuntu-22.04
+    if: github.repository == 'apache/pekko-management'
+    strategy:
+      fail-fast: false
+    steps:
+      - name: Checkout
+        uses: actions/checkout@v6
+        with:
+          fetch-depth: 0
+
+      - name: Checkout GitHub merge
+        if: github.event.pull_request
+        run: |-
+          git fetch origin pull/${{ github.event.pull_request.number 
}}/merge:scratch
+          git checkout scratch
+
+      - name: Install sbt
+        uses: sbt/setup-sbt@508b753e53cb6095967669e0911487d2b9bc9f41 # v1.1.22
+
+      - name: Setup Java 17
+        uses: actions/setup-java@v5
+        with:
+          distribution: temurin
+          java-version: 17
+
+      - name: Cache Coursier cache
+        uses: coursier/cache-action@90c37294538be80a558fd665531fcdc2b467b475 # 
v8.1.0
+
+      - name: Setup Minikube
+        uses: 
manusa/actions-setup-minikube@96202dee4ae1c2f46a62fe197273aaf22b83f42d # v2.16.1
+        with:
+          minikube version: 'v1.36.0'
+          kubernetes version: 'v1.29.15'
+          driver: docker
+          github token: ${{ secrets.GITHUB_TOKEN }}
+          start args: '--addons=ingress'
+
+      - name: Run Integration Tests
+        timeout-minutes: 15
+        run: |-
+          echo 'Creating namespace'
+          kubectl create namespace rolling
+          echo 'Creating resources'
+          kubectl apply -f ./rolling-update-kubernetes/pod-cost.yml
+          echo 'Adding proxy port'
+          kubectl proxy --port=8080 &
+          echo 'Running tests'
+          sbt "rolling-update-kubernetes-int-test/test"
+          ./integration-test/rolling-update-kubernetes/test-cr.sh
+
+      - name: Print logs on failure
+        if: ${{ failure() }}
+        run: find . -name "*.log" -exec ./scripts/cat-log.sh {} \;
diff --git a/build.sbt b/build.sbt
index 23bbf590..a427b541 100644
--- a/build.sbt
+++ b/build.sbt
@@ -188,6 +188,19 @@ lazy val rollingUpdateKubernetes = 
pekkoModule("rolling-update-kubernetes")
     mimaPreviousArtifacts := Set.empty)
   .dependsOn(managementPki)
 
+lazy val rollingUpdateKubernetesIntTest = 
pekkoModule("rolling-update-kubernetes-int-test")
+  .enablePlugins(AutomateHeaderPlugin)
+  .disablePlugins(MimaPlugin)
+  .settings(
+    name := "pekko-rolling-update-kubernetes-int-test",
+    libraryDependencies := Dependencies.rollingUpdateKubernetesIntTest,
+    // following is needed by Agrona lib
+    // https://github.com/aeron-io/agrona/wiki/Change-Log#200-2024-12-17
+    Test / fork := true,
+    Test / javaOptions += 
"--add-opens=java.base/jdk.internal.misc=ALL-UNNAMED")
+  .dependsOn(rollingUpdateKubernetes)
+  .enablePlugins(NoPublish)
+
 lazy val billOfMaterials = Project("bill-of-materials", 
file("bill-of-materials"))
   .enablePlugins(BillOfMaterialsPlugin)
   .disablePlugins(MimaPlugin)
@@ -219,6 +232,20 @@ lazy val leaseKubernetesIntTest = 
pekkoModule("lease-kubernetes-int-test")
       Cmd("RUN", "chmod +x /opt/docker/bin/pekko-lease-kubernetes-int-test")))
   .enablePlugins(NoPublish)
 
+lazy val integrationTestRollingUpdateKubernetes = 
pekkoIntTestModule("rolling-update-kubernetes")
+  .enablePlugins(JavaAppPackaging, DockerPlugin)
+  .enablePlugins(AutomateHeaderPlugin)
+  .disablePlugins(MimaPlugin)
+  .settings(
+    name := "integration-test-rolling-update-kubernetes",
+    libraryDependencies := Dependencies.bootstrapDemos,
+    version ~= (_.replace('+', '-')),
+    dockerUpdateLatest := true,
+    dockerExposedPorts := Seq(8080, 8558, 2552))
+  .dependsOn(rollingUpdateKubernetes, management, managementClusterHttp, 
managementClusterBootstrap,
+    discoveryKubernetesApi)
+  .enablePlugins(NoPublish)
+
 lazy val integrationTestKubernetesApi = pekkoIntTestModule("kubernetes-api")
   .disablePlugins(MimaPlugin)
   .enablePlugins(AutomateHeaderPlugin)
diff --git 
a/discovery-kubernetes-api/src/main/scala/org/apache/pekko/discovery/kubernetes/KubernetesApiServiceDiscovery.scala
 
b/discovery-kubernetes-api/src/main/scala/org/apache/pekko/discovery/kubernetes/KubernetesApiServiceDiscovery.scala
index 0bace9b5..da497358 100644
--- 
a/discovery-kubernetes-api/src/main/scala/org/apache/pekko/discovery/kubernetes/KubernetesApiServiceDiscovery.scala
+++ 
b/discovery-kubernetes-api/src/main/scala/org/apache/pekko/discovery/kubernetes/KubernetesApiServiceDiscovery.scala
@@ -8,7 +8,7 @@
  */
 
 /*
- * Copyright (C) 2017-2021 Lightbend Inc. <https://www.lightbend.com>
+ * Copyright (C) 2017-2023 Lightbend Inc. <https://www.lightbend.com>
  */
 
 package org.apache.pekko.discovery.kubernetes
@@ -17,31 +17,33 @@ import java.net.InetAddress
 import java.nio.charset.StandardCharsets
 import java.nio.file.{ Files, Paths }
 import java.security.{ KeyStore, SecureRandom }
+import javax.net.ssl.{ KeyManager, KeyManagerFactory, SSLContext, TrustManager 
}
+
+import scala.collection.immutable
 import scala.collection.immutable.Seq
+import scala.concurrent.ExecutionContext
 import scala.concurrent.Future
 import scala.concurrent.duration.FiniteDuration
 import scala.util.Try
 import scala.util.control.{ NoStackTrace, NonFatal }
 
 import org.apache.pekko
-import org.apache.pekko.http.javadsl.model.headers.AcceptEncoding
-import org.apache.pekko.http.scaladsl.coding.Coders
 import pekko.actor.ActorSystem
 import pekko.annotation.InternalApi
 import pekko.discovery.ServiceDiscovery.{ Resolved, ResolvedTarget }
 import pekko.discovery.kubernetes.JsonFormat._
 import pekko.discovery.kubernetes.KubernetesApiServiceDiscovery.{ targets, 
KubernetesApiException }
 import pekko.discovery.{ Lookup, ServiceDiscovery }
-import pekko.event.{ LogSource, Logging }
-import pekko.http.scaladsl.HttpsConnectionContext
-import pekko.http.scaladsl._
+import pekko.dispatch.Dispatchers.DefaultBlockingDispatcherId
+import pekko.event.Logging
+import pekko.http.javadsl.model.headers.AcceptEncoding
+import pekko.http.scaladsl.{ HttpsConnectionContext, _ }
+import pekko.http.scaladsl.coding.Coders
 import pekko.http.scaladsl.model._
 import pekko.http.scaladsl.model.headers.{ Authorization, HttpEncodings, 
OAuth2BearerToken }
 import pekko.http.scaladsl.unmarshalling.Unmarshal
 import pekko.pki.kubernetes.PemManagersProvider
 
-import javax.net.ssl.{ KeyManager, KeyManagerFactory, SSLContext, TrustManager 
}
-
 object KubernetesApiServiceDiscovery {
 
   /**
@@ -57,7 +59,7 @@ object KubernetesApiServiceDiscovery {
       podNamespace: String,
       podDomain: String,
       rawIp: Boolean,
-      containerName: Option[String]): Seq[ResolvedTarget] =
+      containerName: Option[String]): immutable.Seq[ResolvedTarget] =
     for {
       item <- podList.items
       if item.metadata.flatMap(_.deletionTimestamp).isEmpty
@@ -73,7 +75,7 @@ object KubernetesApiServiceDiscovery {
       // Maybe port is an Option of a port, and will be None if no portName 
was requested
       maybePort <- portName match {
         case None =>
-          Seq(None)
+          List(None)
         case Some(name) =>
           for {
             container <- itemSpec.containers
@@ -92,6 +94,10 @@ object KubernetesApiServiceDiscovery {
 
   class KubernetesApiException(msg: String) extends RuntimeException(msg) with 
NoStackTrace
 
+  private[kubernetes] final case class KubernetesSetup(
+      podNamespace: String,
+      apiToken: String,
+      clientHttpsConnectionContext: HttpsConnectionContext)
 }
 
 /**
@@ -101,54 +107,60 @@ object KubernetesApiServiceDiscovery {
 class KubernetesApiServiceDiscovery(settings: Settings)(
     implicit system: ActorSystem) extends ServiceDiscovery {
 
-  import system.dispatcher
+  import KubernetesApiServiceDiscovery.KubernetesSetup
+  import pekko.discovery.kubernetes.KubernetesApiServiceDiscovery._
 
   private val http = Http()
 
   def this()(implicit system: ActorSystem) = this(Settings(system))
 
-  private val log = Logging(system, getClass)(LogSource.fromClass)
+  private val log = Logging(system, classOf[KubernetesApiServiceDiscovery])
 
-  private val sslContext = {
-    val certificates = PemManagersProvider.loadCertificates(settings.apiCaPath)
+  log.debug("Settings {}", settings)
 
-    val factory = 
KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm)
-    val keyStore = KeyStore.getInstance("PKCS12")
-    keyStore.load(null)
-    factory.init(keyStore, Array.empty)
-    val km: Array[KeyManager] = factory.getKeyManagers
-    val tm: Array[TrustManager] =
-      PemManagersProvider.buildTrustManagers(certificates)
-    val random: SecureRandom = new SecureRandom
-    val sslContext = SSLContext.getInstance(settings.tlsVersion)
-    sslContext.init(km, tm, random)
-    sslContext
+  private val kubernetesSetup: Future[KubernetesSetup] = {
+    implicit val blockingDispatcher: ExecutionContext = 
system.dispatchers.lookup(DefaultBlockingDispatcherId)
+    for {
+      apiToken: String <- Future {
+        readConfigVarFromFilesystem(settings.apiTokenPath, 
"api-token").getOrElse("")
+      }
+      namespace: String <- Future {
+        settings.podNamespace
+          .orElse(readConfigVarFromFilesystem(settings.podNamespacePath, 
"pod-namespace"))
+          .getOrElse("default")
+      }
+      httpsContext <- Future(clientHttpsConnectionContext())
+    } yield {
+      KubernetesSetup(namespace, apiToken, httpsContext)
+    }
   }
 
-  private val clientSslContext: HttpsConnectionContext = 
ConnectionContext.httpsClient(sslContext)
-
-  log.debug("Settings {}", settings)
+  import system.dispatcher
 
   override def lookup(query: Lookup, resolveTimeout: FiniteDuration): 
Future[Resolved] = {
     val labelSelector = settings.podLabelSelector(query.serviceName)
 
-    log.info(
-      "Querying for pods with label selector: [{}]. Namespace: [{}]. Port: 
[{}]",
-      labelSelector,
-      podNamespace,
-      query.portName)
-
     for {
-      request <- optionToFuture(
-        podRequest(apiToken, podNamespace, labelSelector),
-        s"Unable to form request; check Kubernetes environment (expecting env 
vars ${settings.apiServiceHostEnvName}, ${settings.apiServicePortEnvName})")
+      setup <- kubernetesSetup
+
+      request <- {
+        log.info(
+          "Querying for pods with label selector: [{}]. Namespace: [{}]. Port: 
[{}]",
+          labelSelector,
+          setup.podNamespace,
+          query.portName)
+
+        optionToFuture(
+          podRequest(setup.apiToken, setup.podNamespace, labelSelector),
+          s"Unable to form request; check Kubernetes environment (expecting 
env vars ${settings.apiServiceHostEnvName}, ${settings.apiServicePortEnvName})"
+        )
+      }
 
-      response <- http.singleRequest(request, 
clientSslContext).map(decodeResponse)
+      response <- http.singleRequest(request, 
setup.clientHttpsConnectionContext).map(decodeResponse)
 
       entity <- response.entity.toStrict(resolveTimeout)
 
       podList <- {
-
         response.status match {
           case StatusCodes.OK =>
             log.debug("Kubernetes API entity: [{}]", entity.data.utf8String)
@@ -176,15 +188,13 @@ class KubernetesApiServiceDiscovery(settings: Settings)(
                 other,
                 body)
             }
-
             Future.failed(new KubernetesApiException(s"Non-200 from Kubernetes 
API server: $other"))
         }
-
       }
 
     } yield {
       val addresses =
-        targets(podList, query.portName, podNamespace, settings.podDomain, 
settings.rawIp, settings.containerName)
+        targets(podList, query.portName, setup.podNamespace, 
settings.podDomain, settings.rawIp, settings.containerName)
       if (addresses.isEmpty && podList.items.nonEmpty) {
         if (log.isInfoEnabled) {
           val containerPortNames = 
podList.items.flatMap(_.spec).flatMap(_.containers).flatMap(_.ports).flatten.toSet
@@ -200,11 +210,43 @@ class KubernetesApiServiceDiscovery(settings: Settings)(
     }
   }
 
-  private val apiToken = readConfigVarFromFilesystem(settings.apiTokenPath, 
"api-token").getOrElse("")
+  private def optionToFuture[T](option: Option[T], failMsg: String): Future[T] 
=
+    option.fold(Future.failed[T](new 
NoSuchElementException(failMsg)))(Future.successful)
 
-  private val podNamespace = settings.podNamespace
-    .orElse(readConfigVarFromFilesystem(settings.podNamespacePath, 
"pod-namespace"))
-    .getOrElse("default")
+  private def podRequest(token: String, namespace: String, labelSelector: 
String) =
+    for {
+      host <- sys.env.get(settings.apiServiceHostEnvName)
+      portStr <- sys.env.get(settings.apiServicePortEnvName)
+      port <- Try(portStr.toInt).toOption
+    } yield {
+      val path = Uri.Path.Empty / "api" / "v1" / "namespaces" / namespace / 
"pods"
+      val query = Uri.Query("labelSelector" -> labelSelector)
+      val uri = Uri.from(scheme = "https", host = host, port = 
port).withPath(path).withQuery(query)
+
+      val authHeaders = immutable.Seq(Authorization(OAuth2BearerToken(token)))
+      val acceptEncodingHeader = 
HttpEncodings.getForKey(settings.httpRequestAcceptEncoding)
+        .map(httpEncoding => AcceptEncoding.create(httpEncoding))
+      HttpRequest(uri = uri, headers = authHeaders ++ acceptEncodingHeader)
+    }
+
+  /**
+   * This uses blocking IO, and so should only be called at startup from a 
blocking dispatcher.
+   */
+  private def clientHttpsConnectionContext(): HttpsConnectionContext = {
+    val certificates = PemManagersProvider.loadCertificates(settings.apiCaPath)
+
+    val factory = 
KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm)
+    val keyStore = KeyStore.getInstance("PKCS12")
+    keyStore.load(null)
+    factory.init(keyStore, Array.empty)
+    val km: Array[KeyManager] = factory.getKeyManagers
+    val tm: Array[TrustManager] =
+      PemManagersProvider.buildTrustManagers(certificates)
+    val random: SecureRandom = new SecureRandom
+    val sslContext = SSLContext.getInstance(settings.tlsVersion)
+    sslContext.init(km, tm, random)
+    ConnectionContext.httpsClient(sslContext)
+  }
 
   /**
    * This uses blocking IO, and so should only be used to read configuration 
at startup.
@@ -225,25 +267,6 @@ class KubernetesApiServiceDiscovery(settings: Settings)(
     }
   }
 
-  private def optionToFuture[T](option: Option[T], failMsg: String): Future[T] 
=
-    option.fold(Future.failed[T](new 
NoSuchElementException(failMsg)))(Future.successful)
-
-  private def podRequest(token: String, namespace: String, labelSelector: 
String) =
-    for {
-      host <- sys.env.get(settings.apiServiceHostEnvName)
-      portStr <- sys.env.get(settings.apiServicePortEnvName)
-      port <- Try(portStr.toInt).toOption
-    } yield {
-      val path = Uri.Path.Empty / "api" / "v1" / "namespaces" / namespace / 
"pods"
-      val query = Uri.Query("labelSelector" -> labelSelector)
-      val uri = Uri.from(scheme = "https", host = host, port = 
port).withPath(path).withQuery(query)
-
-      val authHeaders = Seq(Authorization(OAuth2BearerToken(token)))
-      val acceptEncodingHeader = 
HttpEncodings.getForKey(settings.httpRequestAcceptEncoding)
-        .map(httpEncoding => AcceptEncoding.create(httpEncoding))
-      HttpRequest(uri = uri, headers = authHeaders ++ acceptEncodingHeader)
-    }
-
   private def decodeResponse(response: HttpResponse): HttpResponse = {
     val decoder = response.encoding match {
       case HttpEncodings.gzip =>
diff --git a/integration-test/rolling-update-kubernetes/build.sbt 
b/integration-test/rolling-update-kubernetes/build.sbt
new file mode 100644
index 00000000..7684b719
--- /dev/null
+++ b/integration-test/rolling-update-kubernetes/build.sbt
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+import com.typesafe.sbt.packager.docker._
+
+enablePlugins(JavaAppPackaging, DockerPlugin)
+
+version := "1.3.3.7" // we hard-code the version here, it could be anything 
really
+
+dockerExposedPorts := Seq(8080, 8558, 2552)
+dockerBaseImage := "eclipse-temurin:17-jre-alpine"
+
+dockerCommands ++= Seq(
+  Cmd("USER", "root"),
+  Cmd("RUN", "/sbin/apk", "add", "--no-cache", "bash", "bind-tools", 
"busybox-extras", "curl", "strace"),
+  Cmd("RUN", "chgrp -R 0 . && chmod -R g=u ."))
diff --git 
a/integration-test/rolling-update-kubernetes/kubernetes/pekko-cluster-cr.yml 
b/integration-test/rolling-update-kubernetes/kubernetes/pekko-cluster-cr.yml
new file mode 100644
index 00000000..90cefd2c
--- /dev/null
+++ b/integration-test/rolling-update-kubernetes/kubernetes/pekko-cluster-cr.yml
@@ -0,0 +1,127 @@
+#deployment
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+  labels:
+    app: pekko-rolling-update-demo
+  name: pekko-rolling-update-demo
+spec:
+  replicas: 3
+  selector:
+    matchLabels:
+     app: pekko-rolling-update-demo
+  strategy:
+    rollingUpdate:
+      maxSurge: 1
+      maxUnavailable: 0
+    type: RollingUpdate
+
+  template:
+    metadata:
+      labels:
+        app: pekko-rolling-update-demo
+        actorSystemName: pekko-rolling-update-demo
+    spec:
+      containers:
+      - name: pekko-rolling-update-demo
+        image: integration-test-rolling-update-kubernetes:1.3.3.7
+        # Remove for a real project, the image is picked up locally for the 
integration test
+        imagePullPolicy: Never
+        resources:
+          limits:
+            memory: "256Mi"
+          requests:
+            memory: "256Mi"
+            cpu: "300m"
+        #health
+        livenessProbe:
+          httpGet:
+            path: /alive
+            port: management
+        readinessProbe:
+          httpGet:
+            path: /ready
+            port: management
+        #health
+        ports:
+        # pekko-management bootstrap
+        - name: management
+          containerPort: 8558
+          protocol: TCP
+        - name: http
+          containerPort: 8080
+          protocol: TCP
+        env:
+        - name: KUBERNETES_NAMESPACE
+          valueFrom:
+            fieldRef:
+              fieldPath: metadata.namespace
+        # The pod deletion cost will use this var to identify the pod to be 
annotated
+        - name: KUBERNETES_POD_NAME
+          valueFrom:
+            fieldRef:
+              fieldPath: metadata.name
+        - name: REQUIRED_CONTACT_POINT_NR
+          value: "3"
+        # Agrona requires the --add-opens JVM option to work on recent JDKs
+        - name: JAVA_TOOL_OPTIONS
+          value: "-XX:InitialRAMPercentage=75 -XX:MaxRAMPercentage=75 -Xlog:gc 
-Dpekko.rollingupdate.kubernetes.custom-resource.enabled=on 
--add-opens=java.base/jdk.internal.misc=ALL-UNNAMED"
+#deployment
+---
+#rbac-reader
+#
+# Create a role, `pod-reader`, that can list pods, and
+# bind it to the default service account of the namespace
+# that the binding is deployed to.
+#
+
+kind: Role
+apiVersion: rbac.authorization.k8s.io/v1
+metadata:
+  name: pod-reader
+rules:
+- apiGroups: [""] # "" indicates the core API group
+  resources: ["pods"]
+  verbs: ["get", "watch", "list"] # read-only; no "patch" needed here because the pod cost is written to the PodCost CR
+---
+kind: RoleBinding
+apiVersion: rbac.authorization.k8s.io/v1
+metadata:
+  name: pod-reader
+subjects:
+  # Uses the default service account.
+  # Consider creating a dedicated service account to run your
+  # Apache Pekko Cluster services and binding the role to that one.
+- kind: ServiceAccount
+  name: default
+roleRef:
+  kind: Role
+  name: pod-reader
+  apiGroup: rbac.authorization.k8s.io
+#rbac-reader
+---
+#rbac-podcost-cr
+#
+# Create a role, `podcost-access`, that can update the PodCost CR
+#
+kind: Role
+apiVersion: rbac.authorization.k8s.io/v1
+metadata:
+  name: podcost-access
+rules:
+  - apiGroups: ["pekko.apache.org"]
+    resources: ["podcosts"]
+    verbs: ["get", "create", "update", "delete", "list"]
+---
+kind: RoleBinding
+apiVersion: rbac.authorization.k8s.io/v1
+metadata:
+  name: podcost-access
+subjects:
+  - kind: User
+    name: system:serviceaccount:pekko-rolling-update-demo-cr-ns:default
+roleRef:
+  kind: Role
+  name: podcost-access
+  apiGroup: rbac.authorization.k8s.io
+#rbac-podcost-cr
diff --git 
a/integration-test/rolling-update-kubernetes/kubernetes/pekko-cluster.yml 
b/integration-test/rolling-update-kubernetes/kubernetes/pekko-cluster.yml
new file mode 100644
index 00000000..e09af9e4
--- /dev/null
+++ b/integration-test/rolling-update-kubernetes/kubernetes/pekko-cluster.yml
@@ -0,0 +1,122 @@
+#deployment
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+  labels:
+    app: pekko-rolling-update-demo
+  name: pekko-rolling-update-demo
+spec:
+  replicas: 3
+  selector:
+    matchLabels:
+     app: pekko-rolling-update-demo
+  strategy:
+    rollingUpdate:
+      maxSurge: 1
+      maxUnavailable: 0
+    type: RollingUpdate
+
+  template:
+    metadata:
+      labels:
+        app: pekko-rolling-update-demo
+        actorSystemName: pekko-rolling-update-demo
+    spec:
+      containers:
+      - name: pekko-rolling-update-demo
+        image: integration-test-rolling-update-kubernetes:1.3.3.7
+        # Remove for a real project, the image is picked up locally for the 
integration test
+        imagePullPolicy: Never
+        #health
+        livenessProbe:
+          httpGet:
+            path: /alive
+            port: management
+        readinessProbe:
+          httpGet:
+            path: /ready
+            port: management
+        #health
+        ports:
+        # pekko-management bootstrap
+        - name: management
+          containerPort: 8558
+          protocol: TCP
+        - name: http
+          containerPort: 8080
+          protocol: TCP
+        env:
+        - name: KUBERNETES_NAMESPACE
+          valueFrom:
+            fieldRef:
+              fieldPath: metadata.namespace
+        # The pod deletion cost will use this var to identify the pod to be 
annotated
+        - name: KUBERNETES_POD_NAME
+          valueFrom:
+            fieldRef:
+              fieldPath: metadata.name
+        - name: REQUIRED_CONTACT_POINT_NR
+          value: "3"
+        # Agrona requires the --add-opens JVM option to work on recent JDKs
+        - name: JAVA_TOOL_OPTIONS
+          value: "-XX:InitialRAMPercentage=75 -XX:MaxRAMPercentage=75 
--add-opens=java.base/jdk.internal.misc=ALL-UNNAMED"
+#deployment
+---
+#rbac-reader
+#
+# Create a role, `pod-reader`, that can list pods, and
+# bind it to the default service account of the namespace
+# that the binding is deployed to.
+#
+
+kind: Role
+apiVersion: rbac.authorization.k8s.io/v1
+metadata:
+  name: pod-reader
+rules:
+- apiGroups: [""] # "" indicates the core API group
+  resources: ["pods"]
+  verbs: ["get", "watch", "list"] # read-only; "patch" is granted separately by the pod-annotator role below
+---
+kind: RoleBinding
+apiVersion: rbac.authorization.k8s.io/v1
+metadata:
+  name: pod-reader
+subjects:
+  # Uses the default service account.
+  # Consider creating a dedicated service account to run your
+  # Apache Pekko Cluster services and binding the role to that one.
+- kind: ServiceAccount
+  name: default
+roleRef:
+  kind: Role
+  name: pod-reader
+  apiGroup: rbac.authorization.k8s.io
+#rbac-reader
+---
+#rbac-annotator
+#
+# Create a role, `pod-annotator`, that can annotate pods
+# with the pod-deletion-cost value
+#
+kind: Role
+apiVersion: rbac.authorization.k8s.io/v1
+metadata:
+  name: pod-annotator
+rules:
+  - apiGroups: [""] # "" indicates the core API group
+    resources: ["pods"]
+    verbs: ["patch"] # requires "patch" to annotate the pod
+---
+kind: RoleBinding
+apiVersion: rbac.authorization.k8s.io/v1
+metadata:
+  name: annotate-pods
+subjects:
+  - kind: User
+    name: system:serviceaccount:pekko-rolling-update-demo-ns:default
+roleRef:
+  kind: Role
+  name: pod-annotator
+  apiGroup: rbac.authorization.k8s.io
+#rbac-annotator
diff --git 
a/integration-test/rolling-update-kubernetes/src/main/resources/application.conf
 
b/integration-test/rolling-update-kubernetes/src/main/resources/application.conf
new file mode 100644
index 00000000..d71beef2
--- /dev/null
+++ 
b/integration-test/rolling-update-kubernetes/src/main/resources/application.conf
@@ -0,0 +1,22 @@
+# SPDX-License-Identifier: Apache-2.0
+
+pekko {
+  loggers = ["org.apache.pekko.event.slf4j.Slf4jLogger"]
+  loglevel = "DEBUG"
+  logging-filter = "org.apache.pekko.event.slf4j.Slf4jLoggingFilter"
+  actor.provider = cluster
+}
+
+pekko.discovery {
+  kubernetes-api {
+    pod-label-selector = "app=%s"
+  }
+}
+
+pekko.management {
+  cluster.bootstrap {
+    contact-point-discovery {
+      discovery-method = kubernetes-api
+    }
+  }
+}
diff --git 
a/integration-test/rolling-update-kubernetes/src/main/resources/logback.xml 
b/integration-test/rolling-update-kubernetes/src/main/resources/logback.xml
new file mode 100644
index 00000000..4ed45763
--- /dev/null
+++ b/integration-test/rolling-update-kubernetes/src/main/resources/logback.xml
@@ -0,0 +1,19 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<configuration>
+
+    <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
+        <target>System.out</target>
+        <encoder>
+            <pattern>%date{MM/dd HH:mm:ss} %-5level[%thread] %logger{1} - 
%m%n%xException</pattern>
+        </encoder>
+    </appender>
+
+    <root level="DEBUG">
+        <appender-ref ref="CONSOLE"/>
+    </root>
+
+    <logger name="org.apache.pekko.rollingupdate" level="DEBUG"/>
+    <logger name="org.apache.pekko.http" level="INFO"/>
+    <logger name="org.apache.pekko.io" level="INFO"/>
+
+</configuration>
diff --git 
a/integration-test/rolling-update-kubernetes/src/main/scala/org/apache/pekko/cluster/bootstrap/PodDeletionCostDemoApp.scala
 
b/integration-test/rolling-update-kubernetes/src/main/scala/org/apache/pekko/cluster/bootstrap/PodDeletionCostDemoApp.scala
new file mode 100644
index 00000000..db8466d7
--- /dev/null
+++ 
b/integration-test/rolling-update-kubernetes/src/main/scala/org/apache/pekko/cluster/bootstrap/PodDeletionCostDemoApp.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2017-2023 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.cluster.bootstrap
+
+import org.apache.pekko
+import pekko.actor.ActorSystem
+import pekko.cluster.Cluster
+import pekko.http.scaladsl.Http
+import pekko.http.scaladsl.server.Directives._
+import pekko.management.cluster.bootstrap.ClusterBootstrap
+import pekko.management.scaladsl.PekkoManagement
+import pekko.rollingupdate.kubernetes.PodDeletionCost
+
+/**
+ * Demo application for the rolling-update-kubernetes integration test.
+ *
+ * Creates the actor system, logs the cluster self address, starts Pekko Management,
+ * Cluster Bootstrap and the PodDeletionCost extension (in that order) and finally binds
+ * an HTTP server on 0.0.0.0:8080 that completes every request with "Hello world".
+ */
+object PodDeletionCostDemoApp extends App {
+
+  implicit val system: ActorSystem = ActorSystem("pekko-rolling-update-demo")
+
+  val cluster = Cluster(system)
+
+  system.log.info(s"Started [$system], cluster.selfAddress = ${cluster.selfAddress}")
+
+  // Start order: management, then cluster bootstrap, then the pod deletion cost extension.
+  PekkoManagement(system).start()
+  ClusterBootstrap(system).start()
+  PodDeletionCost(system).start()
+
+  // The Future returned by bind is neither awaited nor inspected here.
+  Http().newServerAt("0.0.0.0", 8080).bind(complete("Hello world"))
+}
diff --git a/integration-test/rolling-update-kubernetes/test-cr.sh 
b/integration-test/rolling-update-kubernetes/test-cr.sh
new file mode 100755
index 00000000..6a20f49b
--- /dev/null
+++ b/integration-test/rolling-update-kubernetes/test-cr.sh
@@ -0,0 +1,11 @@
+#!/bin/bash
+
+set -exu
+
+export NAMESPACE=pekko-rolling-update-demo-cr-ns
+export APP_NAME=pekko-rolling-update-demo
+export PROJECT_NAME=integration-test-rolling-update-kubernetes
+export CRD=rolling-update-kubernetes/pod-cost.yml
+export 
DEPLOYMENT=integration-test/rolling-update-kubernetes/kubernetes/pekko-cluster-cr.yml
+
+integration-test/scripts/rollingupdate-kubernetes-cr-test.sh
diff --git a/integration-test/rolling-update-kubernetes/test.sh 
b/integration-test/rolling-update-kubernetes/test.sh
new file mode 100755
index 00000000..0a980a1d
--- /dev/null
+++ b/integration-test/rolling-update-kubernetes/test.sh
@@ -0,0 +1,10 @@
+#!/bin/bash
+
+# Runs the rolling-update-kubernetes integration test in pod annotation mode.
+# All paths below are relative, so the script has to be started from the repository root.
+
+set -exu
+
+# Settings read by integration-test/scripts/rollingupdate-kubernetes-test.sh
+export NAMESPACE=pekko-rolling-update-demo-ns
+export APP_NAME=pekko-rolling-update-demo
+export PROJECT_NAME=integration-test-rolling-update-kubernetes
+# Keep `export` and the assignment on one line, otherwise DEPLOYMENT is not exported
+export DEPLOYMENT=integration-test/rolling-update-kubernetes/kubernetes/pekko-cluster.yml
+
+integration-test/scripts/rollingupdate-kubernetes-test.sh
diff --git a/integration-test/scripts/rollingupdate-kubernetes-cr-test.sh 
b/integration-test/scripts/rollingupdate-kubernetes-cr-test.sh
new file mode 100755
index 00000000..c6f4ea4a
--- /dev/null
+++ b/integration-test/scripts/rollingupdate-kubernetes-cr-test.sh
@@ -0,0 +1,74 @@
+#!/bin/bash -e
+
+echo "Running rollingupdate-kubernetes-cr-test.sh with deployment: $DEPLOYMENT"
+
+eval $(minikube -p minikube docker-env)
+sbt $PROJECT_NAME/Docker/publishLocal
+
+docker images | head
+
+kubectl create namespace $NAMESPACE || true
+kubectl apply -f $CRD
+kubectl -n $NAMESPACE delete deployment $APP_NAME || true
+kubectl -n $NAMESPACE apply -f $DEPLOYMENT
+
+for i in {1..20}
+do
+  echo "Waiting for pods to get ready..."
+  kubectl get pods -n $NAMESPACE
+  phase=$(kubectl get pods -o jsonpath="{.items[*].status.phase}" -n 
$NAMESPACE)
+  status=$(kubectl get pods -o 
jsonpath="{.items[*].status.containerStatuses[*].ready}" -n $NAMESPACE)
+  if [ "$phase" == "Running Running Running" ] && [ "$status" == "true true 
true" ]
+  then
+    break
+  fi
+  sleep 4
+done
+
+if [ $i -eq 20 ]
+then
+  echo "Pods did not get ready"
+  kubectl events $APP_NAME -n $NAMESPACE
+  kubectl describe deployment $APP_NAME -n $NAMESPACE
+
+  echo ""
+  echo "Logs from all $APP_NAME containers"
+  kubectl logs -l app=$APP_NAME --all-containers=true -n $NAMESPACE || true
+
+  echo ""
+  echo "Logs from all previous $APP_NAME containers"
+  kubectl logs -p -l app=$APP_NAME --all-containers=true -n $NAMESPACE || true
+
+  exit -1
+fi
+
+max_tries=10
+try_count=0
+
+# Loop until all pods are included or the maximum number of tries is reached
+while true
+do
+  # Get the list of pods matching the namespace and app name, and are in the 
Running state
+  pod_list=$(kubectl get pods -n $NAMESPACE | grep $APP_NAME | grep Running | 
awk '{ print $1 }' | sort)
+
+  # Get the pods in the CR
+  cr_pod_list=$(kubectl describe podcosts.pekko.apache.org $APP_NAME -n 
$NAMESPACE | grep "Pod Name" | awk '{print $3}' | sort)
+
+  if [ "$pod_list" = "$cr_pod_list" ]
+    then
+      echo "Found expected pods in CR: $cr_pod_list"
+      break
+  else
+      echo "Expected $pod_list, but didn't find expected pods in CR: 
$cr_pod_list"
+  fi
+
+  # Increment the try count and check if the maximum number of tries is reached
+  try_count=$((try_count+1))
+  if [[ $try_count -ge $max_tries ]]; then
+    echo "Exceeded max retries, aborting"
+    exit 1
+  fi
+
+  # Wait for 10 seconds before trying again
+  sleep 10
+done
diff --git a/integration-test/scripts/rollingupdate-kubernetes-test.sh 
b/integration-test/scripts/rollingupdate-kubernetes-test.sh
new file mode 100755
index 00000000..3d61fa4d
--- /dev/null
+++ b/integration-test/scripts/rollingupdate-kubernetes-test.sh
@@ -0,0 +1,134 @@
+#!/bin/bash -e
+
+# Integration test for rolling-update-kubernetes in pod annotation mode.
+#
+# Reads DEPLOYMENT, PROJECT_NAME, NAMESPACE and APP_NAME from the environment.
+#
+# Steps:
+#  1. build the Docker image inside the minikube Docker daemon
+#  2. (re)create the deployment and wait until three pods are Running and ready
+#  3. wait until every Running pod carries the pod-deletion-cost annotation
+#  4. scale down to one replica and check that the pod with the highest
+#     annotation value is the one that remains
+
+echo "Running rollingupdate-kubernetes-test.sh with deployment: $DEPLOYMENT"
+
+eval $(minikube -p minikube docker-env)
+sbt "$PROJECT_NAME/Docker/publishLocal"
+
+docker images | head
+
+kubectl create namespace "$NAMESPACE" || true
+kubectl -n "$NAMESPACE" delete deployment "$APP_NAME" || true
+kubectl -n "$NAMESPACE" apply -f "$DEPLOYMENT"
+
+# Readiness is tracked with a flag: testing the loop counter afterwards would report a
+# failure when the pods become ready on the very last attempt.
+pods_ready=false
+for i in {1..20}
+do
+  echo "Waiting for pods to get ready..."
+  kubectl get pods -n "$NAMESPACE"
+  phase=$(kubectl get pods -o jsonpath="{.items[*].status.phase}" -n "$NAMESPACE")
+  status=$(kubectl get pods -o jsonpath="{.items[*].status.containerStatuses[*].ready}" -n "$NAMESPACE")
+  if [ "$phase" == "Running Running Running" ] && [ "$status" == "true true true" ]
+  then
+    pods_ready=true
+    break
+  fi
+  sleep 4
+done
+
+if [ "$pods_ready" != true ]
+then
+  echo "Pods did not get ready"
+  # Diagnostics are best effort so that a failing command does not hide the remaining output
+  kubectl events "$APP_NAME" -n "$NAMESPACE" || true
+  kubectl describe deployment "$APP_NAME" -n "$NAMESPACE" || true
+
+  echo ""
+  echo "Logs from all $APP_NAME containers"
+  kubectl logs -l app="$APP_NAME" --all-containers=true -n "$NAMESPACE" || true
+
+  echo ""
+  echo "Logs from all previous $APP_NAME containers"
+  kubectl logs -p -l app="$APP_NAME" --all-containers=true -n "$NAMESPACE" || true
+
+  exit 1
+fi
+
+max_tries=10
+try_count=0
+# Indexed array with the annotation value of each annotated pod, in pod_list order
+declare -a pod_annotation_array
+
+# Loop until all pods are annotated or the maximum number of tries is reached
+while true
+do
+  # Names of the pods matching the app name that are in the Running state
+  pod_list=$(kubectl get pods -n "$NAMESPACE" | grep "$APP_NAME" | grep Running | awk '{ print $1 }')
+  annotated_count=0
+  pod_annotation_array=()
+  echo -e "## Pod List:\n$pod_list\n##"
+
+  # pod_list is intentionally unquoted: word splitting yields one pod name per iteration
+  for pod_name in $pod_list
+  do
+    # Get the value of the annotation for the pod
+    annotation_value=$(kubectl describe pod "$pod_name" -n "$NAMESPACE" | grep controller.kubernetes.io/pod-deletion-cost | awk '{print $3}')
+
+    # Check if the annotation value is set or empty
+    if [ -z "$annotation_value" ]
+    then
+      echo "The annotation value for pod $pod_name is empty"
+    else
+      echo "The annotation value for pod $pod_name is set to $annotation_value"
+      annotated_count=$((annotated_count+1))
+      # Append the annotation value of the pod to the array
+      pod_annotation_array+=("$annotation_value")
+    fi
+  done
+
+  # Done once there is at least one pod and every listed pod has an annotation value
+  if [ -n "$pod_list" ] && [ $annotated_count -eq $(echo $pod_list | wc -w) ]
+  then
+    echo "All pods were annotated successfully!"
+    break
+  fi
+
+  # Increment the try count and check if the maximum number of tries is reached
+  try_count=$((try_count+1))
+  if [ $try_count -eq $max_tries ]
+  then
+    echo "Maximum number of tries reached, not all pods are annotated"
+
+    echo "Logs"
+    echo "=============================="
+    for POD in $(kubectl get pods -n "$NAMESPACE" | grep "$APP_NAME" | awk '{ print $1 }')
+    do
+      echo "Logging for $POD"
+      kubectl logs "$POD" -n "$NAMESPACE"
+    done
+
+    exit 1
+  fi
+
+  # Wait for 10 seconds before trying again
+  sleep 10
+done
+
+# Number the annotation values (1-based), pick the position of the highest value and
+# take the pod name at the same position of pod_list
+highest_annotated_index=$(echo "${pod_annotation_array[@]}" | tr ' ' '\n' | nl -v 1 | sort -nr -k 2 | head -n 1 | awk '{ print $1 }')
+highest_annotated_pod=$(echo $pod_list | cut -d ' ' -f $highest_annotated_index)
+
+# Scale down the cluster to one pod
+kubectl scale deployment "$APP_NAME" -n "$NAMESPACE" --replicas=1
+
+# Wait for the deployment to be scaled down
+for ((try_count=0; try_count<max_tries; try_count++)); do
+  pod_count=$(kubectl get pods -n "$NAMESPACE" | grep "$APP_NAME" | grep Running | wc -l)
+  if [ $pod_count -eq 1 ]
+  then
+    echo "Cluster scaled down to one pod"
+    break
+  fi
+  sleep 5
+done
+# try_count only reaches max_tries when the loop ended without the break above
+if [[ $try_count -ge $max_tries ]]; then
+  echo "Exceeded max retries, aborting"
+  exit 1
+fi
+
+# Get the name of the remaining pod
+remaining_pod=$(kubectl get pods -n "$NAMESPACE" | grep "$APP_NAME" | grep Running | awk '{ print $1 }')
+
+# Compare the remaining pod with the highest annotated pod
+if [ "$remaining_pod" = "$highest_annotated_pod" ]
+then
+  echo "Success: The remaining pod $remaining_pod is the highest annotated pod!"
+else
+  echo "Error: The remaining pod ($remaining_pod) is not the same as the highest annotated pod ($highest_annotated_pod)"
+  exit 1
+fi
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index d0e675db..58ef0694 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -166,6 +166,10 @@ object Dependencies {
     "org.apache.pekko" %% "pekko-testkit" % pekkoVersion % Test) ++
     wireMockDependencies
 
+  // Dependencies of the rolling-update-kubernetes integration tests: ScalaTest and the Pekko TestKit.
+  // NOTE(review): presumably wired to the rolling-update-kubernetes-int-test project in build.sbt - confirm.
+  val rollingUpdateKubernetesIntTest = Seq(
+    "org.scalatest" %% "scalatest" % scalaTestVersion,
+    "org.apache.pekko" %% "pekko-testkit" % pekkoVersion % Test)
+
   val leaseKubernetesTest = Seq(
     "org.scalatest" %% "scalatest" % scalaTestVersion,
     "org.apache.pekko" %% "pekko-testkit" % pekkoVersion % Test)
diff --git 
a/rolling-update-kubernetes-int-test/src/test/scala/org/apache/pekko/rollingupdate/kubernetes/KubernetesApiIntegrationTest.scala
 
b/rolling-update-kubernetes-int-test/src/test/scala/org/apache/pekko/rollingupdate/kubernetes/KubernetesApiIntegrationTest.scala
new file mode 100644
index 00000000..14378ea5
--- /dev/null
+++ 
b/rolling-update-kubernetes-int-test/src/test/scala/org/apache/pekko/rollingupdate/kubernetes/KubernetesApiIntegrationTest.scala
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2017-2023 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.rollingupdate.kubernetes
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+import org.apache.pekko
+import pekko.Done
+import pekko.actor.ActorSystem
+import pekko.cluster.Cluster
+import pekko.testkit.TestKit
+import com.typesafe.config.ConfigFactory
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.CancelAfterFailure
+import org.scalatest.concurrent.Eventually
+import org.scalatest.concurrent.ScalaFutures
+import org.scalatest.matchers.should.Matchers
+import org.scalatest.wordspec.AnyWordSpecLike
+
+/**
+ * Integration test of [[KubernetesApiImpl]] for the PodCost custom resource.
+ *
+ * This test requires an API server available on localhost:8080, the PodCost CRD created and a namespace
+ * called `rolling`.
+ *
+ * One way of doing this is to have a kubectl proxy open:
+ *
+ * `kubectl proxy --port=8080`
+ *
+ * The test cases share `currentVersion` and therefore build on each other in the declared order.
+ */
+class KubernetesApiIntegrationTest
+    extends TestKit(
+      ActorSystem(
+        "KubernetesApiIntegrationSpec",
+        ConfigFactory.parseString("""
+    pekko.loglevel = DEBUG
+    pekko.actor.provider = cluster
+    pekko.remote.artery.canonical.port = 0
+    pekko.remote.artery.canonical.hostname = 127.0.0.1
+    """)))
+    with AnyWordSpecLike
+    with Matchers
+    with ScalaFutures
+    with BeforeAndAfterAll
+    with CancelAfterFailure
+    with Eventually {
+
+  implicit val patience: PatienceConfig = PatienceConfig(testKitSettings.DefaultTimeout.duration)
+
+  private val cluster = Cluster(system)
+
+  // Kept as a plain value so that the API client does not need `Option.get` on the settings
+  private val testNamespace = "rolling"
+  private val podName1 = "pod1"
+  private val podName2 = "pod2"
+
+  private val settings = new KubernetesSettings(
+    "",
+    "",
+    "localhost",
+    8080,
+    namespace = Some(testNamespace),
+    "",
+    podName = podName1,
+    secure = false,
+    apiServiceRequestTimeout = 1.second,
+    new CustomResourceSettings(
+      enabled = true,
+      crName = None,
+      cleanupAfter = 60.seconds
+    )
+  )
+
+  private val underTest =
+    new KubernetesApiImpl(system, settings, testNamespace, apiToken = "", clientHttpsConnectionContext = None)
+  private val crName = KubernetesApi.makeDNS1039Compatible(system.name)
+
+  // Resource version returned by the most recent read or update; carried from test case to test case
+  private var currentVersion = ""
+
+  /** Creates a PodCost entry for this cluster node, stamped with the current time. */
+  private def newPodCost(podName: String, cost: Int): PodCost =
+    PodCost(
+      podName,
+      cost,
+      cluster.selfUniqueAddress.address.toString,
+      cluster.selfUniqueAddress.longUid,
+      System.currentTimeMillis())
+
+  /** Unwraps the result of an update that is expected to succeed, failing the test on a version conflict. */
+  private def successfulUpdate(result: Either[PodCostResource, PodCostResource]): PodCostResource =
+    result match {
+      case Right(updated) => updated
+      case Left(_)        => fail("There shouldn't be anyone else updating the resource.")
+    }
+
+  /** Checks that the update produced a new resource version and remembers it for the next test case. */
+  private def recordNewVersion(updated: PodCostResource): Unit = {
+    updated.version shouldNot equal(currentVersion)
+    currentVersion = updated.version
+  }
+
+  override protected def afterAll(): Unit = {
+    TestKit.shutdownActorSystem(system)
+  }
+
+  override protected def beforeAll(): Unit = {
+    // do some operation to check the proxy is up
+    eventually {
+      Await.result(underTest.removePodCostResource(crName), 2.second) shouldEqual Done
+    }
+  }
+
+  "Kubernetes PodCost resource" should {
+    "be able to be created" in {
+      val created = underTest.readOrCreatePodCostResource(crName).futureValue
+      created.version shouldNot equal("")
+      created.version shouldNot equal(null)
+      created.pods shouldEqual Nil
+      currentVersion = created.version
+    }
+
+    "be able to read back with same version" in {
+      val readBack = underTest.readOrCreatePodCostResource(crName).futureValue
+      readBack.version shouldEqual currentVersion
+    }
+
+    "be able to update empty resource" in {
+      val podCost = newPodCost(podName1, cost = 1)
+      val updated =
+        successfulUpdate(underTest.updatePodCostResource(crName, currentVersion, Vector(podCost)).futureValue)
+      recordNewVersion(updated)
+      updated.pods shouldEqual Vector(podCost)
+    }
+
+    "be able to update a resource if resource version is correct" in {
+      val podCost = newPodCost(podName1, cost = 2)
+      val updated =
+        successfulUpdate(underTest.updatePodCostResource(crName, currentVersion, Vector(podCost)).futureValue)
+      recordNewVersion(updated)
+      updated.pods shouldEqual Vector(podCost)
+    }
+
+    "not be able to update a resource if resource version is incorrect" in {
+      val podCost = newPodCost(podName1, cost = 3)
+      val result = underTest.updatePodCostResource(crName, version = "10", Vector(podCost)).futureValue
+      // A Left carries the resource as it currently is on the API server
+      val conflict: PodCostResource = result match {
+        case Right(_)      => fail("Expected update failure (we've used an invalid version!).")
+        case Left(current) => current
+      }
+      conflict.version shouldEqual currentVersion
+      currentVersion = conflict.version
+      conflict.pods.head.cost shouldNot equal(podCost.cost)
+      conflict.pods.head.time shouldNot equal(podCost.time)
+    }
+
+    "be able to add more to the resource" in {
+      val podCost2 = newPodCost(podName2, cost = 4)
+      val before = underTest.readOrCreatePodCostResource(crName).futureValue
+      val updated =
+        successfulUpdate(
+          underTest.updatePodCostResource(crName, currentVersion, before.pods :+ podCost2).futureValue)
+      recordNewVersion(updated)
+      updated.pods.last shouldEqual podCost2
+      updated.pods.size shouldEqual before.pods.size + 1
+    }
+  }
+
+}
diff --git a/rolling-update-kubernetes/pod-cost-example.yml 
b/rolling-update-kubernetes/pod-cost-example.yml
new file mode 100644
index 00000000..4c58cd59
--- /dev/null
+++ b/rolling-update-kubernetes/pod-cost-example.yml
@@ -0,0 +1,16 @@
+apiVersion: pekko.apache.org/v1
+kind: PodCost
+metadata:
+  name: sampleservice
+spec:
+  pods:
+    - address: pekko://[email protected]:2552
+      cost: 10000
+      podName: "pod1"
+      time: 1681909802669
+      uid: 3870636288020406585
+    - address: pekko://[email protected]:2552
+      cost: 9900
+      podName: "pod2"
+      time: 1681909802716
+      uid: 1681910047053
diff --git a/rolling-update-kubernetes/pod-cost.yml 
b/rolling-update-kubernetes/pod-cost.yml
new file mode 100644
index 00000000..06c6f654
--- /dev/null
+++ b/rolling-update-kubernetes/pod-cost.yml
@@ -0,0 +1,49 @@
+apiVersion: apiextensions.k8s.io/v1
+kind: CustomResourceDefinition
+metadata:
+  # name must match the spec fields below, and be in the form: <plural>.<group>
+  name: podcosts.pekko.apache.org
+spec:
+  group: pekko.apache.org
+  versions:
+   - name: v1
+     storage: true
+     served: true
+     schema:
+       openAPIV3Schema:
+         type: object
+         properties:
+           spec:
+             type: object
+             properties:
+               version:
+                 type: string
+               pods:
+                 type: array
+                 items:
+                   type: object
+                   properties:
+                     # the name of the pod that should be updated with the pod-deletion-cost annotation
+                     podName:
+                       type: string
+                     # the value of the controller.kubernetes.io/pod-deletion-cost annotation
+                     cost:
+                       type: integer
+                     # address, uid and time are used for cleanup of removed members
+                     address:
+                       type: string
+                     # address, uid and time are used for cleanup of removed members
+                     uid:
+                       type: integer
+                     # address, uid and time are used for cleanup of removed members
+                     time:
+                       type: integer
+  scope: Namespaced
+  names:
+    # kind is normally the CamelCased singular type. Your resource manifests use this.
+    kind: PodCost
+    listKind: PodCostList
+    # singular name to be used as an alias on the CLI and for display
+    singular: podcost
+    # plural name to be used in the URL: /apis/<group>/<version>/<plural>
+    plural: podcosts
diff --git a/rolling-update-kubernetes/src/main/resources/reference.conf 
b/rolling-update-kubernetes/src/main/resources/reference.conf
index 4bc33b3a..e33010a1 100644
--- a/rolling-update-kubernetes/src/main/resources/reference.conf
+++ b/rolling-update-kubernetes/src/main/resources/reference.conf
@@ -19,6 +19,8 @@ pekko.rollingupdate.kubernetes {
     api-service-port = 8080
     api-service-port = ${?KUBERNETES_SERVICE_PORT}
 
+    # Timeout for requests to the Kubernetes API service.
+    api-service-request-timeout = 2s
+
     # Namespace file path. The namespace is to create the lock in. Can be 
overridden by "namespace"
     #
     # If this path doesn't exist, the namespace will default to "default".
@@ -46,4 +48,20 @@ pekko.rollingupdate.kubernetes {
         # Fixed time delay between retries when pod annotation fails
         retry-delay = 5s
     }
+
+    # If allowing PATCH of the pod resource is a security concern, an alternative is to use a custom resource.
+    # Instead of updating the "controller.kubernetes.io/pod-deletion-cost" annotation directly, a PodCost
+    # custom resource is updated, and you would then have an operator that reconciles it and updates the
+    # pod-deletion-cost annotation of the pod resource.
+    custom-resource {
+      # When enabled the PodCost custom resource is updated instead of directly updating
+      # the "controller.kubernetes.io/pod-deletion-cost" annotation.
+      enabled = off
+      # The name of the custom resource instance (CR). If undefined (empty), it will use the ActorSystem name.
+      # It's recommended to use a separate CR for each Pekko Cluster, but it's possible to share the
+      # CR in case it is preferred to have only one CR per namespace.
+      cr-name = ""
+      # Remove old entries that don't exist in the cluster membership after this duration.
+      cleanup-after = 60s
+    }
 }
diff --git 
a/rolling-update-kubernetes/src/main/scala/org/apache/pekko/rollingupdate/CostStrategy.scala
 
b/rolling-update-kubernetes/src/main/scala/org/apache/pekko/rollingupdate/CostStrategy.scala
index 57aa3cbd..80037ac0 100644
--- 
a/rolling-update-kubernetes/src/main/scala/org/apache/pekko/rollingupdate/CostStrategy.scala
+++ 
b/rolling-update-kubernetes/src/main/scala/org/apache/pekko/rollingupdate/CostStrategy.scala
@@ -13,8 +13,9 @@
 
 package org.apache.pekko.rollingupdate
 
-import org.apache.pekko.annotation.InternalApi
-import org.apache.pekko.cluster.Member
+import org.apache.pekko
+import pekko.annotation.InternalApi
+import pekko.cluster.Member
 
 import scala.collection.SortedSet
 
diff --git 
a/rolling-update-kubernetes/src/main/scala/org/apache/pekko/rollingupdate/kubernetes/ApiRequests.scala
 
b/rolling-update-kubernetes/src/main/scala/org/apache/pekko/rollingupdate/kubernetes/ApiRequests.scala
deleted file mode 100644
index d1e8d294..00000000
--- 
a/rolling-update-kubernetes/src/main/scala/org/apache/pekko/rollingupdate/kubernetes/ApiRequests.scala
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * license agreements; and to You under the Apache License, version 2.0:
- *
- *   https://www.apache.org/licenses/LICENSE-2.0
- *
- * This file is part of the Apache Pekko project, which was derived from Akka.
- */
-
-/*
- * Copyright (C) 2017-2021 Lightbend Inc. <https://www.lightbend.com>
- */
-
-package org.apache.pekko.rollingupdate.kubernetes
-
-import org.apache.pekko.annotation.InternalApi
-import org.apache.pekko.http.scaladsl.model.HttpMethods.PATCH
-import org.apache.pekko.http.scaladsl.model.headers.Authorization
-import org.apache.pekko.http.scaladsl.model.headers.OAuth2BearerToken
-import org.apache.pekko.http.scaladsl.model.HttpEntity
-import org.apache.pekko.http.scaladsl.model.HttpRequest
-import org.apache.pekko.http.scaladsl.model.MediaTypes
-import org.apache.pekko.http.scaladsl.model.Uri
-import org.apache.pekko.util.ByteString
-
-import scala.collection.immutable
-
-/**
- * INTERNAL API
- */
-@InternalApi private[kubernetes] object ApiRequests {
-
-  def podDeletionCost(settings: KubernetesSettings, apiToken: String, 
namespace: String, cost: Int): HttpRequest = {
-    val path = Uri.Path.Empty / "api" / "v1" / "namespaces" / namespace / 
"pods" / settings.podName
-    val scheme = if (settings.secure) "https" else "http"
-    val uri = Uri.from(scheme, host = settings.apiServiceHost, port = 
settings.apiServicePort).withPath(path)
-    val headers = if (settings.secure) 
immutable.Seq(Authorization(OAuth2BearerToken(apiToken))) else Nil
-
-    HttpRequest(
-      method = PATCH,
-      uri = uri,
-      headers = headers,
-      entity = HttpEntity(
-        MediaTypes.`application/merge-patch+json`,
-        ByteString(
-          s"""{"metadata": {"annotations": 
{"controller.kubernetes.io/pod-deletion-cost": "$cost" }}}"""
-        ))
-    )
-  }
-
-}
diff --git 
a/rolling-update-kubernetes/src/main/scala/org/apache/pekko/rollingupdate/kubernetes/KubernetesApi.scala
 
b/rolling-update-kubernetes/src/main/scala/org/apache/pekko/rollingupdate/kubernetes/KubernetesApi.scala
new file mode 100644
index 00000000..9397180b
--- /dev/null
+++ 
b/rolling-update-kubernetes/src/main/scala/org/apache/pekko/rollingupdate/kubernetes/KubernetesApi.scala
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2017-2023 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.rollingupdate.kubernetes
+
+import java.text.Normalizer
+
+import scala.collection.immutable
+import scala.concurrent.Future
+
+import org.apache.pekko
+import pekko.Done
+import pekko.actor.AddressFromURIString
+import pekko.annotation.InternalApi
+import pekko.cluster.UniqueAddress
+
+/**
+ * INTERNAL API
+ *
+ * A PodCost custom resource as seen by the client: the resource version together with its pod entries.
+ *
+ * @param version resource version, passed back when updating the resource
+ * @param pods    the pod cost entries of the resource
+ */
+@InternalApi
+private[pekko] final case class PodCostResource(
+    version: String,
+    pods: immutable.Seq[PodCost])
+
+/**
+ * INTERNAL API
+ *
+ * A single pod entry of a PodCost custom resource.
+ *
+ * @param podName name of the pod that should get the pod-deletion-cost annotation
+ * @param cost    value of the pod-deletion-cost annotation
+ * @param address address of the cluster member in URI form; used, with `uid` and `time`, for cleanup of removed members
+ * @param uid     uid of the cluster member, combined with `address` in [[uniqueAddress]]
+ * @param time    timestamp of the entry
+ */
+@InternalApi
+private[pekko] final case class PodCost(
+    podName: String,
+    cost: Int,
+    address: String,
+    uid: Long,
+    time: Long) {
+
+  /** The member's unique address, parsed from `address` and `uid` on first access. */
+  @transient
+  lazy val uniqueAddress: UniqueAddress = UniqueAddress(AddressFromURIString(address), uid)
+}
+
+/**
+ * INTERNAL API
+ *
+ * Base class of the PodCost failures. It is `sealed`, so all direct subclasses are declared in this file.
+ */
+@InternalApi
+private[pekko] sealed class PodCostException(message: String) extends RuntimeException(message)
+
+/**
+ * INTERNAL API
+ *
+ * NOTE(review): presumably raised when the Kubernetes API server does not respond in time - confirm in KubernetesApiImpl.
+ */
+@InternalApi
+private[pekko] final class PodCostTimeoutException(message: String) extends PodCostException(message)
+
+/**
+ * INTERNAL API
+ *
+ * NOTE(review): presumably raised for client error responses of the Kubernetes API server - confirm in KubernetesApiImpl.
+ */
+@InternalApi
+private[pekko] final class PodCostClientException(message: String) extends PodCostException(message)
+
+/**
+ * INTERNAL API
+ *
+ * Helpers for deriving resource names that are acceptable to Kubernetes.
+ */
+@InternalApi private[pekko] object KubernetesApi {
+
+  /** Maximum number of characters accepted for a resource name. */
+  private val MaxNameLength = 63
+
+  /**
+   * Removes from the leading and trailing positions the specified characters.
+   */
+  private def trim(name: String, characters: List[Char]): String =
+    name.dropWhile(characters.contains(_)).reverse.dropWhile(characters.contains(_)).reverse
+
+  /**
+   * Make a name compatible with DNS 1035 standard: like a single domain name segment.
+   * Regex to follow: [a-z]([-a-z0-9]*[a-z0-9])
+   *
+   * The name is NFKD-normalized and lower-cased, `_` and `.` are replaced with `-`, every character
+   * outside `[-a-z0-9]` is removed and leading/trailing `-` are stripped. A leading digit is kept and
+   * the result is empty when no character survives.
+   *
+   * @param name the name to convert, for example an ActorSystem name
+   * @return the converted name
+   * @throws IllegalArgumentException if the resulting name is longer than 63 characters
+   */
+  def makeDNS1039Compatible(name: String): String = {
+    val normalized = Normalizer
+      .normalize(name, Normalizer.Form.NFKD)
+      // Locale.ROOT keeps the result independent of the JVM default locale (e.g. Turkish dotless i)
+      .toLowerCase(java.util.Locale.ROOT)
+      .replaceAll("[_.]", "-")
+      .replaceAll("[^-a-z0-9]", "")
+    // Trim before validating so that the length check applies to the name that is actually returned
+    val trimmed = trim(normalized, List('-'))
+    if (trimmed.length > MaxNameLength)
+      throw new IllegalArgumentException(s"Too long resource name [$trimmed]. At most 63 characters are accepted. " +
+        "A custom resource name can be defined in configuration `pekko.rollingupdate.kubernetes.custom-resource.cr-name`.")
+    trimmed
+  }
+}
+
+/**
+ * INTERNAL API
+ *
+ * Client operations against the Kubernetes API server for pod deletion cost: either the pod annotation
+ * is updated directly or a PodCost custom resource is read, created and updated.
+ */
+@InternalApi
+private[pekko] trait KubernetesApi {
+
+  /**
+   * The Kubernetes namespace of this client.
+   * NOTE(review): presumably the namespace all requests are sent to - confirm in the implementation.
+   */
+  def namespace: String
+
+  /**
+   * Updates the pod-deletion-cost annotation of the given pod.
+   *
+   * @param podName name of the pod to annotate
+   * @param cost    the annotation value
+   * @return a Future completed with `Done`
+   */
+  def updatePodDeletionCostAnnotation(podName: String, cost: Int): Future[Done]
+
+  /**
+   * Reads a PodCost from the API server. If it doesn't exist it tries to create it.
+   * The creation can fail due to another instance creating at the same time, in this case
+   * the read is retried.
+   */
+  def readOrCreatePodCostResource(crName: String): Future[PodCostResource]
+
+  /**
+   * Update the named resource.
+   *
+   * Must call [[readOrCreatePodCostResource]] first to get a resource version.
+   *
+   * Can return one of three things:
+   *  - Future failure e.g. timed out waiting for k8s api server to respond
+   *  - Left - Update failed due to version not matching current in the k8s api server.
+   *    In this case resource is returned so the version can be used for subsequent calls
+   *  - Right - Success
+   *
+   * Any subsequent updates should also use the latest version or re-read with [[readOrCreatePodCostResource]]
+   */
+  def updatePodCostResource(
+      crName: String,
+      version: String,
+      pods: immutable.Seq[PodCost]): Future[Either[PodCostResource, PodCostResource]]
+
+}
diff --git 
a/rolling-update-kubernetes/src/main/scala/org/apache/pekko/rollingupdate/kubernetes/KubernetesApiImpl.scala
 
b/rolling-update-kubernetes/src/main/scala/org/apache/pekko/rollingupdate/kubernetes/KubernetesApiImpl.scala
new file mode 100644
index 00000000..61437a2b
--- /dev/null
+++ 
b/rolling-update-kubernetes/src/main/scala/org/apache/pekko/rollingupdate/kubernetes/KubernetesApiImpl.scala
@@ -0,0 +1,389 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2017-2023 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.rollingupdate.kubernetes
+
+import scala.collection.immutable
+import scala.concurrent.ExecutionContext
+import scala.concurrent.Future
+import scala.util.control.NonFatal
+
+import org.apache.pekko
+import pekko.Done
+import pekko.actor.ActorSystem
+import pekko.annotation.InternalApi
+import pekko.dispatch.Dispatchers.DefaultBlockingDispatcherId
+import pekko.event.Logging
+import pekko.event.LoggingAdapter
+import pekko.http.scaladsl.ConnectionContext
+import pekko.http.scaladsl.Http
+import pekko.http.scaladsl.HttpsConnectionContext
+import pekko.http.scaladsl.marshalling.Marshal
+import pekko.http.scaladsl.model.HttpEntity
+import pekko.http.scaladsl.model.HttpMethods.PATCH
+import pekko.http.scaladsl.model.HttpRequest
+import pekko.http.scaladsl.model.MediaTypes
+import pekko.http.scaladsl.model.StatusCodes.ClientError
+import pekko.http.scaladsl.model.Uri
+import pekko.http.scaladsl.model._
+import pekko.http.scaladsl.model.headers.Authorization
+import pekko.http.scaladsl.model.headers.OAuth2BearerToken
+import pekko.http.scaladsl.unmarshalling.Unmarshal
+import pekko.pattern.after
+import pekko.pki.kubernetes.PemManagersProvider
+import pekko.util.ByteString
+
+import java.nio.file.Files
+import java.nio.file.Paths
+import java.security.KeyStore
+import java.security.SecureRandom
+import javax.net.ssl.KeyManager
+import javax.net.ssl.KeyManagerFactory
+import javax.net.ssl.SSLContext
+import javax.net.ssl.TrustManager
+
+/**
+ * INTERNAL API
+ */
+@InternalApi private[pekko] class KubernetesApiImpl(
+    system: ActorSystem,
+    settings: KubernetesSettings,
+    override val namespace: String,
+    apiToken: String,
+    clientHttpsConnectionContext: Option[HttpsConnectionContext])
+    extends KubernetesApi
+    with KubernetesJsonSupport {
+
+  // execution context used by the Future transformations in this class
+  import system.dispatcher
+
+  // exposed implicitly so that the pekko-http calls below can pick the system up
+  private implicit val sys: ActorSystem = system
+  private val log = Logging(system, classOf[KubernetesApiImpl])
+  private val http = Http()(system)
+
+  // plain http is only used when `settings.secure` is switched off
+  private val scheme = if (settings.secure) "https" else "http"
+  // the bearer token is only attached to requests when `settings.secure` is on
+  private lazy val headers = if (settings.secure) 
immutable.Seq(Authorization(OAuth2BearerToken(apiToken))) else Nil
+
+  log.debug("kubernetes access namespace: {}. Secure: {}", namespace, 
settings.secure)
+
+  /**
+   * Patches the `controller.kubernetes.io/pod-deletion-cost` annotation of the pod `podName`.
+   *
+   * Completes with `Done` on a successful status, fails with a `PodCostClientException` on a
+   * client error status and with a `PodCostException` on any other status.
+   */
+  override def updatePodDeletionCostAnnotation(podName: String, cost: Int): Future[Done] = {
+    val podPath = Uri.Path.Empty / "api" / "v1" / "namespaces" / namespace / "pods" / podName
+    val podUri = Uri.from(scheme, host = settings.apiServiceHost, port = settings.apiServicePort).withPath(podPath)
+    // merge-patch document that only touches the deletion cost annotation
+    val patchBody =
+      ByteString(s"""{"metadata": {"annotations": {"controller.kubernetes.io/pod-deletion-cost": "$cost" }}}""")
+    val patchRequest = HttpRequest(
+      method = PATCH,
+      uri = podUri,
+      headers = headers,
+      entity = HttpEntity(MediaTypes.`application/merge-patch+json`, patchBody))
+
+    makeRequest(
+      patchRequest,
+      s"Timed out updating pod-deletion-cost annotation for pod: [$podName] with cost: [$cost]. Namespace: [$namespace]")
+      .map { response =>
+        // the body is never needed, the outcome is fully determined by the status
+        response.entity.discardBytes()
+        response.status match {
+          case success if success.isSuccess() => Done
+          case clientError @ ClientError(_)   => throw new PodCostClientException(clientError.toString())
+          case failure                        => throw new PodCostException(s"Request failed with status=$failure")
+        }
+      }
+  }
+
+  /*
+  PATH: to get all: /apis/pekko.apache.org/v1/namespaces/<namespace>/podcosts
+  PATH: to get a specific one: /apis/pekko.apache.org/v1/namespaces/<namespace>/podcosts/<system-name>
+  curl -v -X POST localhost:8080/apis/pekko.apache.org/v1/namespaces/<namespace>/podcosts/ -H "Content-Type: application/yaml" --data-binary "@pod-cost-example.yml"
+
+  responds with either:
+  409 Conflict Already Exists
+
+  OR
+
+  201 Created if it works
+   */
+  /**
+   * Reads the PodCost resource `crName`, creating it first when it does not exist yet.
+   *
+   * A creation that reports `None` (somebody else created the resource concurrently) triggers a
+   * new read. After the initial attempt at most `maxTries` further attempts are made before the
+   * returned future is failed with a `PodCostException` that reports the number of attempts.
+   */
+  override def readOrCreatePodCostResource(crName: String): Future[PodCostResource] = {
+    val maxTries = 5
+
+    def loop(tries: Int = 0): Future[PodCostResource] = {
+      log.debug("Trying to create PodCost {}", tries)
+      getPodCostResource(crName).flatMap {
+        case Some(found) =>
+          log.debug("{} already exists. Returning {}", crName, found)
+          Future.successful(found)
+        case None =>
+          log.info("PodCost {} does not exist, creating", crName)
+          createPodCostResource(crName).flatMap {
+            case Some(created)            => Future.successful(created)
+            case None if tries < maxTries => loop(tries + 1)
+            case None                     =>
+              // `tries` counts from zero, so tries + 1 attempts have been made at this point
+              Future.failed(new PodCostException(s"Unable to create or read PodCost after ${tries + 1} tries"))
+          }
+      }
+    }
+
+    loop()
+  }
+
+  /*
+  curl -v -X PUT localhost:8080/apis/pekko.apache.org/v1/namespaces/<namespace>/podcosts/<system-name> --data-binary "@pod-cost-example.yml" -H "Content-Type: application/yaml"
+  PUTs must contain resourceVersions. Response:
+  409: Resource version is out of date
+  200 if it is updated
+   */
+  /**
+   * Replaces the content of the PodCost resource `crName` with `pods`, sending `version` as the
+   * resource version of the PUT.
+   *
+   * Yields `Right` with the stored resource on 200 OK and `Left` with the re-read resource on
+   * 409 Conflict. Every other status fails the returned future.
+   */
+  override def updatePodCostResource(
+      crName: String,
+      version: String,
+      pods: immutable.Seq[PodCost]): Future[Either[PodCostResource, PodCostResource]] = {
+    val desired = PodCostCustomResource(Metadata(crName, Some(version)), Spec(pods))
+
+    val putResponse = Marshal(desired).to[RequestEntity].flatMap { body =>
+      log.debug("updating {} to {}", crName, desired)
+      makeRequest(
+        requestForPath(pathForPodCostResource(crName), method = HttpMethods.PUT, entity = body),
+        s"Timed out updating PodCost [$crName]. It is not known if the update happened")
+    }
+
+    putResponse.flatMap { response =>
+      response.status match {
+        case StatusCodes.OK =>
+          Unmarshal(response.entity).to[PodCostCustomResource].map { stored =>
+            log.debug("CR after update: {}", stored)
+            Right(toPodCostResource(stored))
+          }
+        case StatusCodes.Conflict =>
+          // the version was stale: hand back the current state so the caller can retry with it
+          getPodCostResource(crName).map {
+            case Some(current) =>
+              log.debug("PodCostResource read after conflict: {}", current)
+              Left(current)
+            case None =>
+              throw new PodCostException(s"GET after PUT conflict did not return a PodCost [$crName]")
+          }
+        case StatusCodes.Unauthorized =>
+          handleUnauthorized(response)
+        case other =>
+          Unmarshal(response.entity).to[String].map { body =>
+            throw new PodCostException(s"PUT for PodCost [$crName] returned unexpected status code $other. Body: $body")
+          }
+      }
+    }
+  }
+
+  /**
+   * Deletes the PodCost resource `crName`.
+   *
+   * Both 200 OK and 404 Not Found (nothing left to delete) complete the future with `Done`,
+   * every other status fails it.
+   */
+  private[pekko] def removePodCostResource(crName: String): Future[Done] = {
+    val deleteResponse = makeRequest(
+      requestForPath(pathForPodCostResource(crName), HttpMethods.DELETE),
+      s"Timed out removing PodCost [$crName]. It is not known if the remove happened")
+
+    deleteResponse.flatMap { response =>
+      response.status match {
+        case StatusCodes.OK =>
+          log.debug("PodCost deleted [{}]", crName)
+          response.discardEntityBytes()
+          Future.successful(Done)
+        case StatusCodes.NotFound =>
+          // already deleted
+          log.debug("PodCost already deleted [{}]", crName)
+          response.discardEntityBytes()
+          Future.successful(Done)
+        case StatusCodes.Unauthorized =>
+          handleUnauthorized(response)
+        case other =>
+          Unmarshal(response.entity).to[String].map { body =>
+            throw new PodCostException(s"Unexpected status code when deleting PodCost. Status: $other. Body: $body")
+          }
+      }
+    }
+  }
+
+  /**
+   * Reads the PodCost resource `crName`.
+   *
+   * Yields `Some` on 200 OK and `None` on 404 Not Found, every other status fails the future.
+   */
+  private def getPodCostResource(crName: String): Future[Option[PodCostResource]] =
+    makeRequest(requestForPath(pathForPodCostResource(crName)), s"Timed out reading PodCost [$crName]")
+      .flatMap { response =>
+        response.entity.toStrict(settings.bodyReadTimeout).flatMap { strictEntity =>
+          response.status match {
+            case StatusCodes.OK =>
+              log.debug("Resource [{}] exists: {}", crName, strictEntity)
+              Unmarshal(strictEntity).to[PodCostCustomResource].map { found =>
+                Some(toPodCostResource(found))
+              }
+            case StatusCodes.NotFound =>
+              response.discardEntityBytes()
+              log.debug("Resource [{}] does not exist", crName)
+              Future.successful(None)
+            case StatusCodes.Unauthorized =>
+              handleUnauthorized(response)
+            case other =>
+              Unmarshal(response.entity).to[String].map { body =>
+                throw new PodCostException(
+                  s"Unexpected response from API server when retrieving PodCost StatusCode: $other. Body: $body")
+              }
+          }
+        }
+      }
+
+  /**
+   * Reads the body of an unauthorized response and fails the returned future with a
+   * `PodCostException` that points to the access control documentation.
+   */
+  private def handleUnauthorized(response: HttpResponse) =
+    Unmarshal(response.entity).to[String].map { body =>
+      val hint =
+        "Unauthorized to communicate with Kubernetes API server. See " +
+        "https://pekko.apache.org/docs/pekko-management/current/rolling-updates.html#role-based-access-control " +
+        s"for setting up access control. Body: $body"
+      throw new PodCostException(hint)
+    }
+
+  /**
+   * Path of the PodCost resource `crName` in the namespace of this API.
+   */
+  private def pathForPodCostResource(crName: String): Uri.Path = {
+    // drop everything but digits, word characters, '-' and '.', then lower-case the remainder
+    val resourceSegment = crName.replaceAll("[^\\d\\w\\-\\.]", "").toLowerCase
+    val podCostsPath = Uri.Path.Empty / "apis" / "pekko.apache.org" / "v1" / "namespaces" / namespace / "podcosts"
+    podCostsPath / resourceSegment
+  }
+
+  /**
+   * Builds a request for `path` on the configured API server host and port, carrying the
+   * headers of this API. Defaults to a GET without a body.
+   */
+  private def requestForPath(
+      path: Uri.Path,
+      method: HttpMethod = HttpMethods.GET,
+      entity: RequestEntity = HttpEntity.Empty): HttpRequest =
+    HttpRequest(
+      method = method,
+      uri = Uri.from(scheme = scheme, host = settings.apiServiceHost, port = settings.apiServicePort).withPath(path),
+      headers = headers,
+      entity = entity)
+
+  /**
+   * Sends `request` to the API server, using `clientHttpsConnectionContext` when one was given,
+   * and reads the complete response into memory.
+   *
+   * The result is whichever completes first: the strict response (or its failure) or a
+   * `PodCostTimeoutException` built from `timeoutMsg` that is raised after
+   * `settings.apiServiceRequestTimeout`.
+   */
+  private def makeRequest(request: HttpRequest, timeoutMsg: String): 
Future[HttpResponse] = {
+    val response = {
+      clientHttpsConnectionContext match {
+        case None                         => http.singleRequest(request)
+        case Some(httpsConnectionContext) => http.singleRequest(request, 
httpsConnectionContext)
+      }
+    }
+
+    // make sure we always consume response body (in case of timeout)
+    val strictResponse = response.flatMap(_.toStrict(settings.bodyReadTimeout))
+
+    // failed future that is completed by the scheduler once the request timeout has elapsed
+    val timeout = after(settings.apiServiceRequestTimeout, using = 
system.scheduler)(
+      Future.failed(new PodCostTimeoutException(s"$timeoutMsg. Is the API 
server up?")))
+
+    // race the response against the timeout
+    Future.firstCompletedOf(Seq(strictResponse, timeout))
+  }
+
+  /**
+   * Converts the JSON representation into a `PodCostResource`.
+   * Throws an `IllegalArgumentException` when the custom resource carries no resource version.
+   */
+  private def toPodCostResource(cr: PodCostCustomResource) = {
+    log.debug("Converting {}", cr)
+    cr.metadata.resourceVersion match {
+      case Some(version) =>
+        PodCostResource(version, cr.spec.pods)
+      case None =>
+        // same exception type and message prefix that `require` produces
+        throw new IllegalArgumentException(
+          "requirement failed: " + s"PodCostCustomResource returned from Kubernetes without a resourceVersion: $cr")
+    }
+  }
+
+  /**
+   * POSTs an empty PodCost resource named `crName`.
+   *
+   * @return `Some` with the created resource on 201 Created, `None` on 409 Conflict, i.e. when the
+   *         resource already exists. Every other status fails the future.
+   */
+  private def createPodCostResource(crName: String): Future[Option[PodCostResource]] = {
+    val cr = PodCostCustomResource(Metadata(crName, None), Spec(pods = Vector.empty))
+    for {
+      entity <- Marshal(cr).to[RequestEntity]
+      response <- makeRequest(
+        requestForPath(pathForPodCostResource(crName), HttpMethods.POST, entity = entity),
+        s"Timed out creating PodCost $crName")
+      responseEntity <- response.entity.toStrict(settings.bodyReadTimeout)
+      resource <- response.status match {
+        case StatusCodes.Created =>
+          log.debug("PodCost resource created")
+          Unmarshal(responseEntity).to[PodCostCustomResource].map(created => Some(toPodCostResource(created)))
+        case StatusCodes.Conflict =>
+          log.debug("creation of PodCost resource failed as already exists. Will attempt to read again")
+          // drain the body of the response, not the request entity that was sent
+          responseEntity.discardBytes()
+          // someone else has created it
+          Future.successful(None)
+        case StatusCodes.Unauthorized =>
+          handleUnauthorized(response)
+        case unexpected =>
+          Unmarshal(responseEntity)
+            .to[String]
+            .map(body =>
+              throw new PodCostException(
+                s"Unexpected response from API server when creating PodCost StatusCode: $unexpected. Body: $body"))
+      }
+    } yield resource
+  }
+
+}
+
+/**
+ * INTERNAL API
+ */
+@InternalApi private[pekko] object KubernetesApiImpl {
+  /**
+   * Creates a [[KubernetesApiImpl]] from the given settings.
+   *
+   * The API token and, unless it is configured, the namespace are read from the file system and
+   * the HTTPS context is built; each of these steps runs on the default blocking dispatcher.
+   * A token that cannot be read results in an empty token, a namespace that is neither
+   * configured nor readable in `"default"`.
+   *
+   * @param log adapter used to report problems while reading the files
+   * @param k8sSettings settings that hold the paths and the optional namespace
+   * @return a future of the ready-to-use API
+   */
+  def apply(log: LoggingAdapter, k8sSettings: KubernetesSettings)(
+      implicit system: ActorSystem): Future[KubernetesApiImpl] = {
+    implicit val blockingDispatcher: ExecutionContext = system.dispatchers.lookup(DefaultBlockingDispatcherId)
+    for {
+      apiToken: String <- Future {
+        readConfigVarFromFilesystem(k8sSettings.apiTokenPath, "api-token", log).getOrElse("")
+      }
+      podNamespace: String <- Future {
+        k8sSettings.namespace
+          .orElse(readConfigVarFromFilesystem(k8sSettings.namespacePath, "namespace", log))
+          .getOrElse("default")
+      }
+      httpsContext <- Future(clientHttpsConnectionContext(k8sSettings))
+    } yield {
+      new KubernetesApiImpl(system, k8sSettings, podNamespace, apiToken, httpsContext)
+    }
+  }
+
+  /**
+   * Reads the content of the file at `path` as UTF-8 text; `name` only labels the log messages.
+   * Yields `None`, after logging, when the file does not exist or cannot be read.
+   *
+   * This uses blocking IO, and so should only be used to read configuration at startup from
+   * a blocking dispatcher.
+   */
+  private def readConfigVarFromFilesystem(path: String, name: String, log: LoggingAdapter): Option[String] = {
+    val source = Paths.get(path)
+    if (!Files.exists(source)) {
+      log.warning("Unable to read {} from {} because it doesn't exist.", name, path)
+      None
+    } else
+      try Some(new String(Files.readAllBytes(source), "utf-8"))
+      catch {
+        case NonFatal(cause) =>
+          log.error(cause, "Error reading {} from {}", name, path)
+          None
+      }
+  }
+
+  /**
+   * Builds the HTTPS client context for the API server when `k8sSettings.secure` is set and
+   * yields `None` otherwise. The trust managers are built from the certificates loaded from
+   * `k8sSettings.apiCaPath`.
+   *
+   * This uses blocking IO, and so should only be used at startup from a blocking dispatcher.
+   */
+  private def clientHttpsConnectionContext(k8sSettings: KubernetesSettings): 
Option[HttpsConnectionContext] = {
+    if (k8sSettings.secure) {
+      val certificates = 
PemManagersProvider.loadCertificates(k8sSettings.apiCaPath)
+      val factory = 
KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm)
+      // the key store is initialised without any input, i.e. it is left empty
+      val keyStore = KeyStore.getInstance("PKCS12")
+      keyStore.load(null)
+      factory.init(keyStore, Array.empty)
+      val km: Array[KeyManager] = factory.getKeyManagers
+      val tm: Array[TrustManager] =
+        PemManagersProvider.buildTrustManagers(certificates)
+      val random: SecureRandom = new SecureRandom
+      // NOTE(review): the protocol is pinned to "TLSv1.2" - confirm this is still intended
+      val sslContext = SSLContext.getInstance("TLSv1.2")
+      sslContext.init(km, tm, random)
+      Some(ConnectionContext.httpsClient(sslContext))
+    } else
+      None
+  }
+}
diff --git 
a/rolling-update-kubernetes/src/main/scala/org/apache/pekko/rollingupdate/kubernetes/KubernetesJsonSupport.scala
 
b/rolling-update-kubernetes/src/main/scala/org/apache/pekko/rollingupdate/kubernetes/KubernetesJsonSupport.scala
new file mode 100644
index 00000000..e46eb427
--- /dev/null
+++ 
b/rolling-update-kubernetes/src/main/scala/org/apache/pekko/rollingupdate/kubernetes/KubernetesJsonSupport.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2017-2023 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.rollingupdate.kubernetes
+
+import scala.collection.immutable
+
+import org.apache.pekko.annotation.InternalApi
+import org.apache.pekko.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
+import spray.json.DefaultJsonProtocol
+import spray.json.JsonFormat
+import spray.json.RootJsonFormat
+
+/**
+ * INTERNAL API
+ *
+ * JSON representation of a PodCost custom resource.
+ *
+ * @param metadata name and optional resource version of the resource
+ * @param spec the pod costs held by the resource
+ * @param kind resource kind, `"PodCost"` by default
+ * @param apiVersion API group and version, `"pekko.apache.org/v1"` by default
+ */
+@InternalApi
+case class PodCostCustomResource(
+    metadata: Metadata,
+    spec: Spec,
+    kind: String = "PodCost",
+    apiVersion: String = "pekko.apache.org/v1")
+
+/**
+ * INTERNAL API
+ *
+ * Metadata section of a [[PodCostCustomResource]].
+ *
+ * @param name name of the custom resource
+ * @param resourceVersion version of the resource; `None` for a resource that is about to be created
+ */
+@InternalApi
+case class Metadata(name: String, resourceVersion: Option[String])
+
+/**
+ * INTERNAL API
+ *
+ * Spec section of a [[PodCostCustomResource]].
+ *
+ * @param pods the pod cost entries stored in the resource
+ */
+@InternalApi
+case class Spec(pods: immutable.Seq[PodCost])
+
+/**
+ * INTERNAL API
+ *
+ * spray-json formats for [[PodCostCustomResource]] and the types nested in it.
+ */
+@InternalApi
+trait KubernetesJsonSupport extends SprayJsonSupport with DefaultJsonProtocol {
+  // the formats of the nested types are declared ahead of the formats that use them
+  implicit val metadataFormat: JsonFormat[Metadata] = 
jsonFormat2(Metadata.apply)
+  implicit val podCostFormat: JsonFormat[PodCost] = jsonFormat5(PodCost.apply)
+  implicit val specFormat: JsonFormat[Spec] = jsonFormat1(Spec.apply)
+  // root format, i.e. the one used for complete request and response bodies
+  implicit val podCostCustomResourceFormat: 
RootJsonFormat[PodCostCustomResource] = jsonFormat4(
+    PodCostCustomResource.apply)
+}
diff --git 
a/rolling-update-kubernetes/src/main/scala/org/apache/pekko/rollingupdate/kubernetes/KubernetesSettings.scala
 
b/rolling-update-kubernetes/src/main/scala/org/apache/pekko/rollingupdate/kubernetes/KubernetesSettings.scala
index 0bee4cf4..aa5249ce 100644
--- 
a/rolling-update-kubernetes/src/main/scala/org/apache/pekko/rollingupdate/kubernetes/KubernetesSettings.scala
+++ 
b/rolling-update-kubernetes/src/main/scala/org/apache/pekko/rollingupdate/kubernetes/KubernetesSettings.scala
@@ -8,12 +8,16 @@
  */
 
 /*
- * Copyright (C) 2017-2021 Lightbend Inc. <https://www.lightbend.com>
+ * Copyright (C) 2017-2023 Lightbend Inc. <https://www.lightbend.com>
  */
 
 package org.apache.pekko.rollingupdate.kubernetes
 
-import org.apache.pekko.annotation.InternalApi
+import scala.concurrent.duration.{ DurationInt, FiniteDuration }
+import scala.jdk.DurationConverters._
+
+import org.apache.pekko
+import pekko.annotation.InternalApi
 import com.typesafe.config.Config
 
 /**
@@ -33,6 +37,17 @@ private[kubernetes] object KubernetesSettings {
   }
 
   def apply(config: Config): KubernetesSettings = {
+    val crName = config.getString("custom-resource.cr-name") match {
+      case ""   => None
+      case name => Some(name)
+    }
+
+    val customResourceSettings = new CustomResourceSettings(
+      enabled = config.getBoolean("custom-resource.enabled"),
+      crName = crName,
+      cleanupAfter = 
config.getDuration("custom-resource.cleanup-after").toScala
+    )
+
     new KubernetesSettings(
       config.getString("api-ca-path"),
       config.getString("api-token-path"),
@@ -41,7 +56,9 @@ private[kubernetes] object KubernetesSettings {
       config.optDefinedValue("namespace"),
       config.getString("namespace-path"),
       config.getString("pod-name"),
-      config.getBoolean("secure-api-server")
+      config.getBoolean("secure-api-server"),
+      config.getDuration("api-service-request-timeout").toScala,
+      customResourceSettings
     )
   }
 }
@@ -58,4 +75,18 @@ private[kubernetes] class KubernetesSettings(
     val namespace: Option[String],
     val namespacePath: String,
     val podName: String,
-    val secure: Boolean)
+    val secure: Boolean,
+    val apiServiceRequestTimeout: FiniteDuration,
+    val customResourceSettings: CustomResourceSettings,
+    val bodyReadTimeout: FiniteDuration = 1.second
+)
+
+/**
+ * INTERNAL API
+ *
+ * Settings of the `custom-resource` config section.
+ *
+ * @param enabled value of `custom-resource.enabled`
+ * @param crName value of `custom-resource.cr-name`, `None` when the configured name is empty
+ * @param cleanupAfter value of `custom-resource.cleanup-after`
+ */
+@InternalApi
+private[kubernetes] class CustomResourceSettings(
+    val enabled: Boolean,
+    val crName: Option[String],
+    val cleanupAfter: FiniteDuration
+)
diff --git 
a/rolling-update-kubernetes/src/main/scala/org/apache/pekko/rollingupdate/kubernetes/PodDeletionCost.scala
 
b/rolling-update-kubernetes/src/main/scala/org/apache/pekko/rollingupdate/kubernetes/PodDeletionCost.scala
index 3c1f7817..4e244a48 100644
--- 
a/rolling-update-kubernetes/src/main/scala/org/apache/pekko/rollingupdate/kubernetes/PodDeletionCost.scala
+++ 
b/rolling-update-kubernetes/src/main/scala/org/apache/pekko/rollingupdate/kubernetes/PodDeletionCost.scala
@@ -8,32 +8,24 @@
  */
 
 /*
- * Copyright (C) 2017-2021 Lightbend Inc. <https://www.lightbend.com>
+ * Copyright (C) 2017-2023 Lightbend Inc. <https://www.lightbend.com>
  */
 
 package org.apache.pekko.rollingupdate.kubernetes
 
-import org.apache.pekko.actor.ActorSystem
-import org.apache.pekko.actor.ClassicActorSystemProvider
-import org.apache.pekko.actor.ExtendedActorSystem
-import org.apache.pekko.actor.Extension
-import org.apache.pekko.actor.ExtensionId
-import org.apache.pekko.actor.ExtensionIdProvider
-import org.apache.pekko.actor.Props
-import org.apache.pekko.annotation.InternalApi
-import org.apache.pekko.dispatch.Dispatchers.DefaultBlockingDispatcherId
-import org.apache.pekko.event.Logging
-import 
org.apache.pekko.rollingupdate.kubernetes.PodDeletionCost.Internal.BootstrapStep
-import 
org.apache.pekko.rollingupdate.kubernetes.PodDeletionCost.Internal.Initializing
-import 
org.apache.pekko.rollingupdate.kubernetes.PodDeletionCost.Internal.NotRunning
-
-import java.nio.file.Files
-import java.nio.file.Paths
-import java.util.concurrent.atomic.AtomicReference
+import java.util.concurrent.atomic.AtomicBoolean
 import scala.concurrent.ExecutionContext
-import scala.concurrent.Future
 import scala.util.control.NonFatal
 
+import org.apache.pekko
+import pekko.actor.ActorSystem
+import pekko.actor.ClassicActorSystemProvider
+import pekko.actor.ExtendedActorSystem
+import pekko.actor.Extension
+import pekko.actor.ExtensionId
+import pekko.actor.ExtensionIdProvider
+import pekko.event.Logging
+
 final class PodDeletionCost(implicit system: ExtendedActorSystem) extends 
Extension {
 
   private val log = Logging(system, classOf[PodDeletionCost])
@@ -41,9 +33,11 @@ final class PodDeletionCost(implicit system: 
ExtendedActorSystem) extends Extens
   private val config = system.settings.config.getConfig(configPath)
   private val k8sSettings = KubernetesSettings(config)
   private val costSettings = PodDeletionCostSettings(config)
+  implicit private val ec: ExecutionContext = system.dispatcher
+
   log.debug("Settings {}", k8sSettings)
 
-  private final val startStep = new AtomicReference[BootstrapStep](NotRunning)
+  private final val startStep = new AtomicBoolean(false)
 
   def start(): Unit = {
     if (k8sSettings.podName.isEmpty) {
@@ -51,42 +45,32 @@ final class PodDeletionCost(implicit system: 
ExtendedActorSystem) extends Extens
         "No configuration found to extract the pod name from. " +
         s"Be sure to provide the pod name with `$configPath.pod-name` " +
         "or by setting ENV variable `KUBERNETES_POD_NAME`.")
-    } else if (startStep.compareAndSet(NotRunning, Initializing)) {
-      log.debug("Starting PodDeletionCost for podName={} with settings={}", 
k8sSettings.podName, costSettings)
-
-      implicit val blockingDispatcher: ExecutionContext = 
system.dispatchers.lookup(DefaultBlockingDispatcherId)
-      val props = for {
-        apiToken: String <- Future { 
readConfigVarFromFilesystem(k8sSettings.apiTokenPath, 
"api-token").getOrElse("") }
-        podNamespace: String <- Future {
-          k8sSettings.namespace
-            .orElse(readConfigVarFromFilesystem(k8sSettings.namespacePath, 
"namespace"))
-            .getOrElse("default")
-        }
-      } yield Props(classOf[PodDeletionCostAnnotator], k8sSettings, apiToken, 
podNamespace, costSettings)
+    } else if (startStep.compareAndSet(false, true)) {
+      val props = KubernetesApiImpl(log, k8sSettings).map { kubernetesApi =>
+        val crName =
+          if (k8sSettings.customResourceSettings.enabled) {
+            val name =
+              
k8sSettings.customResourceSettings.crName.getOrElse(KubernetesApi.makeDNS1039Compatible(system.name))
+            log.info(
+              "Starting PodDeletionCost for podName [{}], [{}] oldest will be 
written to CR [{}].",
+              k8sSettings.podName,
+              costSettings.annotatedPodsNr,
+              name)
+            Some(name)
+          } else {
+            log.info(
+              "Starting PodDeletionCost for podName [{}], [{}] oldest will be 
annotated.",
+              k8sSettings.podName,
+              costSettings.annotatedPodsNr)
+            None
+          }
+        PodDeletionCostAnnotator.props(k8sSettings, costSettings, 
kubernetesApi, crName)
+      }
 
       props.foreach(system.systemActorOf(_, "podDeletionCostAnnotator"))
     } else log.warning("PodDeletionCost extension already initiated, yet 
start() method was called again. Ignoring.")
   }
 
-  /**
-   * This uses blocking IO, and so should only be used to read configuration 
at startup.
-   */
-  private def readConfigVarFromFilesystem(path: String, name: String): 
Option[String] = {
-    val file = Paths.get(path)
-    if (Files.exists(file)) {
-      try {
-        Some(new String(Files.readAllBytes(file), "utf-8"))
-      } catch {
-        case NonFatal(e) =>
-          log.error(e, "Error reading {} from {}", name, path)
-          None
-      }
-    } else {
-      log.warning("Unable to read {} from {} because it doesn't exist.", name, 
path)
-      None
-    }
-  }
-
   // autostart if the extension is loaded through the config extension list
   private val autostart =
     
system.settings.config.getStringList("pekko.extensions").contains(classOf[PodDeletionCost].getName)
@@ -112,13 +96,4 @@ object PodDeletionCost extends ExtensionId[PodDeletionCost] 
with ExtensionIdProv
 
   override def createExtension(system: ExtendedActorSystem): PodDeletionCost = 
new PodDeletionCost()(system)
 
-  /**
-   * INTERNAL API
-   */
-  @InternalApi private[kubernetes] object Internal {
-    sealed trait BootstrapStep
-    case object NotRunning extends BootstrapStep
-    case object Initializing extends BootstrapStep
-  }
-
 }
diff --git 
a/rolling-update-kubernetes/src/main/scala/org/apache/pekko/rollingupdate/kubernetes/PodDeletionCostAnnotator.scala
 
b/rolling-update-kubernetes/src/main/scala/org/apache/pekko/rollingupdate/kubernetes/PodDeletionCostAnnotator.scala
index b0873e59..70b36825 100644
--- 
a/rolling-update-kubernetes/src/main/scala/org/apache/pekko/rollingupdate/kubernetes/PodDeletionCostAnnotator.scala
+++ 
b/rolling-update-kubernetes/src/main/scala/org/apache/pekko/rollingupdate/kubernetes/PodDeletionCostAnnotator.scala
@@ -8,90 +8,66 @@
  */
 
 /*
- * Copyright (C) 2017-2021 Lightbend Inc. <https://www.lightbend.com>
+ * Copyright (C) 2017-2023 Lightbend Inc. <https://www.lightbend.com>
  */
 
 package org.apache.pekko.rollingupdate.kubernetes
 
-import org.apache.pekko.actor.Actor
-import org.apache.pekko.actor.ActorLogging
-import org.apache.pekko.actor.ActorSystem
-import org.apache.pekko.actor.Timers
-import org.apache.pekko.annotation.InternalApi
-import org.apache.pekko.cluster.Cluster
-import org.apache.pekko.cluster.ClusterEvent
-import org.apache.pekko.cluster.Member
-import org.apache.pekko.event.Logging.InfoLevel
-import org.apache.pekko.event.Logging.WarningLevel
-import org.apache.pekko.http.scaladsl.ConnectionContext
-import org.apache.pekko.http.scaladsl.Http
-import org.apache.pekko.http.scaladsl.HttpsConnectionContext
-import org.apache.pekko.http.scaladsl.model.StatusCodes.ClientError
-import org.apache.pekko.http.scaladsl.model._
-import org.apache.pekko.pattern.pipe
-import org.apache.pekko.pki.kubernetes.PemManagersProvider
-import org.apache.pekko.rollingupdate.OlderCostsMore
-import 
org.apache.pekko.rollingupdate.kubernetes.PodDeletionCostAnnotator.GiveUp
-import 
org.apache.pekko.rollingupdate.kubernetes.PodDeletionCostAnnotator.PodAnnotated
-import 
org.apache.pekko.rollingupdate.kubernetes.PodDeletionCostAnnotator.RetryAnnotate
-import 
org.apache.pekko.rollingupdate.kubernetes.PodDeletionCostAnnotator.RetryTimerId
-import 
org.apache.pekko.rollingupdate.kubernetes.PodDeletionCostAnnotator.ScheduleRetry
-import 
org.apache.pekko.rollingupdate.kubernetes.PodDeletionCostAnnotator.toResult
-import com.typesafe.config.Config
-
-import java.security.KeyStore
-import java.security.SecureRandom
+import java.util.concurrent.ThreadLocalRandom
 import java.util.concurrent.TimeUnit
-import javax.net.ssl.KeyManager
-import javax.net.ssl.KeyManagerFactory
-import javax.net.ssl.SSLContext
-import javax.net.ssl.TrustManager
+
 import scala.collection.immutable
 import scala.collection.immutable.SortedSet
-import scala.concurrent.ExecutionContextExecutor
+import scala.concurrent.ExecutionContext
 import scala.concurrent.Future
 import scala.concurrent.duration.DurationLong
 import scala.concurrent.duration.FiniteDuration
 import scala.util.control.NonFatal
 
+import org.apache.pekko
+import pekko.Done
+import pekko.actor.Actor
+import pekko.actor.ActorLogging
+import pekko.actor.ActorSystem
+import pekko.actor.Props
+import pekko.actor.Status
+import pekko.actor.Timers
+import pekko.annotation.InternalApi
+import pekko.cluster.Cluster
+import pekko.cluster.ClusterEvent
+import pekko.cluster.Member
+import pekko.cluster.UniqueAddress
+import pekko.event.Logging.InfoLevel
+import pekko.event.Logging.WarningLevel
+import pekko.pattern.pipe
+import pekko.rollingupdate.OlderCostsMore
+import com.typesafe.config.Config
+
 /**
  * INTERNAL API
  *
- * Actor responsible to annotate the hosting pod with the pod-deletion-cost.
+ * Actor responsible to annotate the hosting pod with the pod-deletion-cost or
+ * update the PodCost CR depending on configuration.
  * It will automatically retry upon a fixed-configurable delay if the 
annotation fails.
  */
 @InternalApi private[kubernetes] final class PodDeletionCostAnnotator(
     settings: KubernetesSettings,
-    apiToken: String,
-    podNamespace: String,
-    costSettings: PodDeletionCostSettings)
+    costSettings: PodDeletionCostSettings,
+    kubernetesApi: KubernetesApi,
+    crName: Option[String])
     extends Actor
     with ActorLogging
     with Timers {
+  import PodDeletionCostAnnotator._
+
+  private val podName = settings.podName
+  private val resourceLogDescription = if (crName.isDefined) "PodCost CR" else 
"pod-deletion-cost annotation"
+
   private val cluster = Cluster(context.system)
-  private val http = Http()(context.system)
 
   Cluster(context.system).subscribe(context.self, 
classOf[ClusterEvent.MemberUp], classOf[ClusterEvent.MemberRemoved])
 
-  private lazy val sslContext = {
-    val certificates = PemManagersProvider.loadCertificates(settings.apiCaPath)
-    val factory = 
KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm)
-    val keyStore = KeyStore.getInstance("PKCS12")
-    keyStore.load(null)
-    factory.init(keyStore, Array.empty)
-    val km: Array[KeyManager] = factory.getKeyManagers
-    val tm: Array[TrustManager] =
-      PemManagersProvider.buildTrustManagers(certificates)
-    val random: SecureRandom = new SecureRandom
-    val sslContext = SSLContext.getInstance("TLSv1.2")
-    sslContext.init(km, tm, random)
-    sslContext
-  }
-  private val clientSslContext: Option[HttpsConnectionContext] =
-    if (settings.secure) Some(ConnectionContext.httpsClient(sslContext)) else 
None
-
-  implicit val dispatcher: ExecutionContextExecutor = context.system.dispatcher
-  def receive = idle(0, SortedSet.empty(Member.ageOrdering), 0)
+  def receive: Receive = idle(0, SortedSet.empty(Member.ageOrdering), 0)
 
   private def idle(deletionCost: Int, membersByAgeDesc: SortedSet[Member], 
retryNr: Int): Receive = {
     case cs @ ClusterEvent.CurrentClusterState(members, _, _, _, _) =>
@@ -107,8 +83,8 @@ import scala.util.control.NonFatal
       updateIfNewCost(deletionCost, membersByAgeDesc - m, retryNr)
 
     case PodAnnotated =>
-      log.debug("Annotation updated successfully to {}", deletionCost)
-      // cancelling an eventual retry in case the annotation succeeded in the 
meantime
+      log.debug("{} updated successfully to [{}]", resourceLogDescription, 
deletionCost)
+      // cancelling an eventual retry in case the operation succeeded in the 
meantime
       timers.cancel(RetryTimerId)
       context.become(idle(deletionCost, membersByAgeDesc, 0))
 
@@ -116,17 +92,30 @@ import scala.util.control.NonFatal
       val ll = if (retryNr < 3) InfoLevel else WarningLevel
       log.log(
         ll,
-        s"Failed to update annotation: [$ex]. Scheduled retry with fixed delay 
of ${costSettings.retryDelay}, retry number $retryNr.")
+        s"Failed to update $resourceLogDescription: [$ex]. Scheduled retry 
with fixed delay of ${costSettings.retryDelay}, retry number $retryNr.")
 
-      timers.startSingleTimer(RetryTimerId, RetryAnnotate, 
costSettings.retryDelay)
+      val retryDelay =
+        if (crName.isDefined)
+          // add some random delay to minimize risk of conflicts
+          (costSettings.retryDelay.toMillis * (1 + 
ThreadLocalRandom.current().nextDouble(0.1))).toLong.milliseconds
+        else
+          costSettings.retryDelay
+      timers.startSingleTimer(RetryTimerId, RetryAnnotate, retryDelay)
       context.become(underRetryBackoff(membersByAgeDesc, retryNr))
 
     case GiveUp(er: String) =>
       log.error(
-        "There was a client error when trying to set pod-deletion-cost 
annotation. " +
+        "There was a client error when trying to set {}. " +
         "Not retrying, check configuration. Error: {}",
+        resourceLogDescription,
         er)
 
+    case Status.Failure(exc) =>
+      throw new IllegalStateException(
+        "Unexpected failure, Future failure should have been recovered " +
+        "to message before pipeTo self. This is a bug.",
+        exc)
+
     case msg => log.debug("Ignoring message {}", msg)
   }
 
@@ -142,6 +131,12 @@ import scala.util.control.NonFatal
     case RetryAnnotate =>
       updateIfNewCost(Int.MinValue, membersByAgeDesc, retryNr + 1)
 
+    case Status.Failure(exc) =>
+      throw new IllegalStateException(
+        "Unexpected failure, Future failure should have been recovered " +
+        "to message before pipeTo self. This is a bug.",
+        exc)
+
     case msg => log.debug("Under retry backoff, ignoring message {}", msg)
   }
 
@@ -158,18 +153,27 @@ import scala.util.control.NonFatal
 
     if (newCost != existingCost) {
       log.info(
-        "Updating pod-deletion-cost annotation for pod: [{}] with cost: [{}]. 
Namespace: [{}]",
-        settings.podName,
+        "Updating {} for pod: [{}] with cost: [{}]. Namespace: [{}]",
+        resourceLogDescription,
+        podName,
         newCost,
-        podNamespace
+        kubernetesApi.namespace
       )
-      val request = ApiRequests.podDeletionCost(settings, apiToken, 
podNamespace, newCost)
-      val response =
-        clientSslContext.map(http.singleRequest(request, 
_)).getOrElse(http.singleRequest(request))
 
-      toResult(response)(context.system).pipeTo(self)
+      implicit val dispatcher: ExecutionContext = context.system.dispatcher
+      updatePodCost(
+        kubernetesApi,
+        crName,
+        podName,
+        newCost,
+        cluster.selfUniqueAddress,
+        membersByAgeDesc,
+        
settings.customResourceSettings.cleanupAfter)(context.system).pipeTo(self)
+
       context.become(idle(newCost, membersByAgeDesc, retryNr))
-    } else context.become(idle(existingCost, membersByAgeDesc, retryNr))
+    } else {
+      context.become(idle(existingCost, membersByAgeDesc, retryNr))
+    }
   }
 
 }
@@ -206,23 +210,72 @@ import scala.util.control.NonFatal
   case class ScheduleRetry(cause: String) extends RequestResult
   case class GiveUp(cause: String) extends RequestResult
 
-  private[kubernetes] def toResult(futResponse: Future[HttpResponse])(
+  /**
+   * Creates the `Props` for a [[PodDeletionCostAnnotator]] actor.
+   *
+   * @param settings      Kubernetes settings handed to the actor
+   * @param costSettings  pod-deletion-cost settings handed to the actor
+   * @param kubernetesApi API handed to the actor for writing the cost
+   * @param crName        name of the PodCost custom resource, if any
+   */
+  def props(
+      settings: KubernetesSettings,
+      costSettings: PodDeletionCostSettings,
+      kubernetesApi: KubernetesApi,
+      crName: Option[String]
+  ): Props =
+    Props {
+      new PodDeletionCostAnnotator(
+        settings = settings,
+        costSettings = costSettings,
+        kubernetesApi = kubernetesApi,
+        crName = crName)
+    }
+
+  /**
+   * Writes the new cost of this pod and maps the outcome to a [[RequestResult]].
+   *
+   * Without a custom resource name the cost is written via `updatePodDeletionCostAnnotation`.
+   * With a custom resource name the PodCost resource is read (or created), the entry of this pod is
+   * replaced, outdated entries are dropped and the resource is written back with the version that was read.
+   *
+   * @param kubernetesApi     API used for all reads and writes
+   * @param crNameOpt         name of the PodCost custom resource, `None` to use the annotation
+   * @param podName           name of the pod the cost belongs to
+   * @param newCost           cost to store
+   * @param selfUniqueAddress unique address of this cluster node, stored in the new entry
+   * @param membersByAgeDesc  current cluster members, used to recognise entries of removed members
+   * @param cleanupAfter      minimum age before an entry of an unknown member is dropped
+   */
+  private def updatePodCost(
+      kubernetesApi: KubernetesApi,
+      crNameOpt: Option[String],
+      podName: String,
+      newCost: Int,
+      selfUniqueAddress: UniqueAddress,
+      membersByAgeDesc: immutable.SortedSet[Member],
+      cleanupAfter: FiniteDuration)(implicit system: ActorSystem): Future[RequestResult] = {
+    import system.dispatcher
+
+    // entry that is to be replaced by the one added for this podName
+    def isOwnEntry(entry: PodCost): Boolean =
+      entry.podName == podName
+
+    // entry that doesn't exist in the cluster membership any more
+    def isRemovedMember(entry: PodCost, now: Long): Boolean =
+      entry.uniqueAddress.address.system == selfUniqueAddress.address.system && // only same cluster
+      now - entry.time > cleanupAfter.toMillis && // in case new member hasn't been seen yet
+      !membersByAgeDesc.exists(_.uniqueAddress == entry.uniqueAddress) // removed, not in cluster membership
+
+    crNameOpt match {
+      case None =>
+        updatePodDeletionCostAnnotationResult(kubernetesApi.updatePodDeletionCostAnnotation(podName, newCost))
+
+      case Some(crName) =>
+        val updated =
+          kubernetesApi.readOrCreatePodCostResource(crName).flatMap { resource =>
+            val now = System.currentTimeMillis()
+            val ownEntry =
+              PodCost(podName, newCost, selfUniqueAddress.address.toString, selfUniqueAddress.longUid, now)
+            val retained = resource.pods.filter(entry => !(isOwnEntry(entry) || isRemovedMember(entry, now)))
+            kubernetesApi.updatePodCostResource(crName, resource.version, retained :+ ownEntry)
+          }
+        updatePodCostResourceResult(updated)
+    }
+  }
+
+  /**
+   * Maps the outcome of an annotation update to a [[RequestResult]]: `PodAnnotated` on success,
+   * `GiveUp` for a `PodCostClientException` and `ScheduleRetry` for any other non-fatal failure.
+   */
+  private def updatePodDeletionCostAnnotationResult(futResponse: Future[Done])(
+      implicit system: ActorSystem): Future[RequestResult] = {
+    import system.dispatcher
+    val annotated: Future[RequestResult] = futResponse.map { case Done => PodAnnotated }
+    annotated.recover {
+      case clientError: PodCostClientException => GiveUp(clientError.getMessage)
+      case NonFatal(failure)                   => ScheduleRetry(failure.getMessage)
+    }
+  }
+
+  private def updatePodCostResourceResult(futResponse: 
Future[Either[PodCostResource, PodCostResource]])(
       implicit system: ActorSystem): Future[RequestResult] = {
     import system.dispatcher
     futResponse
       .map {
-        case HttpResponse(status, _, e, _) if status.isSuccess() =>
-          e.discardBytes()
-          PodAnnotated
-        case HttpResponse(s @ ClientError(_), _, e, _) =>
-          e.discardBytes()
-          GiveUp(s.toString())
-        case HttpResponse(status, _, e, _) =>
-          e.discardBytes()
-          ScheduleRetry(s"Request failed with status=$status")
+        case Right(_) => PodAnnotated
+        case Left(_)  => ScheduleRetry("Request failed with conflict")
       }
       .recover {
-        case NonFatal(e) => ScheduleRetry(e.getMessage)
+        case e: PodCostClientException => GiveUp(e.getMessage)
+        case NonFatal(e)               => ScheduleRetry(e.getMessage)
       }
   }
 }
diff --git 
a/rolling-update-kubernetes/src/test/scala/org/apache/pekko/rollingupdate/kubernetes/PodDeletionCostAnnotatorCrSpec.scala
 
b/rolling-update-kubernetes/src/test/scala/org/apache/pekko/rollingupdate/kubernetes/PodDeletionCostAnnotatorCrSpec.scala
new file mode 100644
index 00000000..10ae084b
--- /dev/null
+++ 
b/rolling-update-kubernetes/src/test/scala/org/apache/pekko/rollingupdate/kubernetes/PodDeletionCostAnnotatorCrSpec.scala
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2017-2023 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.rollingupdate.kubernetes
+
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.collection.immutable
+import scala.concurrent.Future
+import scala.concurrent.duration._
+
+import org.apache.pekko
+import pekko.Done
+import pekko.actor.ActorSystem
+import pekko.actor.Address
+import pekko.cluster.Cluster
+import pekko.cluster.ClusterEvent.MemberUp
+import pekko.cluster.Member
+import pekko.cluster.MemberStatus
+import pekko.cluster.UniqueAddress
+import pekko.testkit.EventFilter
+import pekko.testkit.ImplicitSender
+import pekko.testkit.TestKit
+import pekko.testkit.TestProbe
+import com.typesafe.config.ConfigFactory
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.BeforeAndAfterEach
+import org.scalatest.concurrent.Eventually
+import org.scalatest.matchers.should.Matchers
+import org.scalatest.time.Millis
+import org.scalatest.time.Seconds
+import org.scalatest.time.Span
+import org.scalatest.wordspec.AnyWordSpecLike
+
+/** Configuration and in-memory test doubles used by the `PodDeletionCostAnnotatorCrSpec` test class. */
+object PodDeletionCostAnnotatorCrSpec {
+  // Test configuration: cluster actor provider, Artery on 127.0.0.1 with port 0,
+  // 1s pod-deletion-cost retry delay and a 10s EventFilter leeway.
+  val config = ConfigFactory.parseString("""
+      pekko.loggers = ["org.apache.pekko.testkit.TestEventListener"]
+      pekko.actor.provider = cluster
+      pekko.rollingupdate.kubernetes.pod-deletion-cost.retry-delay = 1s
+
+      pekko.remote.artery.canonical.port = 0
+      pekko.remote.artery.canonical.hostname = 127.0.0.1
+
+      pekko.cluster.jmx.multi-mbeans-in-same-jvm = on
+      pekko.coordinated-shutdown.terminate-actor-system = off
+      pekko.coordinated-shutdown.run-by-actor-system-terminate = off
+      pekko.test.filter-leeway = 10s
+    """)
+
+  /** Holds an `AtomicInteger` call counter together with an accessor for its current value. */
+  private[pekko] trait TestCallCount {
+    val callCount = new AtomicInteger()
+
+    def getCallCount(): Int = callCount.intValue()
+  }
+
+  /**
+   * [[KubernetesApi]] that keeps the PodCost resource in memory.
+   *
+   * Annotation updates always succeed without recording anything. Resource updates always succeed,
+   * store the given pods and set the version to the passed version plus one.
+   */
+  private[pekko] class TestKubernetesApi extends KubernetesApi {
+    // both fields are only accessed inside this.synchronized
+    private var currentVersion = 1
+    private var storedPodCosts = Vector.empty[PodCost]
+
+    // the resource as it is currently stored
+    private def snapshot: PodCostResource =
+      PodCostResource(currentVersion.toString, storedPodCosts)
+
+    override def namespace: String = "namespace-test"
+
+    override def updatePodDeletionCostAnnotation(podName: String, cost: Int): Future[Done] =
+      Future.successful(Done)
+
+    override def readOrCreatePodCostResource(crName: String): Future[PodCostResource] =
+      this.synchronized {
+        Future.successful(snapshot)
+      }
+
+    override def updatePodCostResource(
+        crName: String,
+        v: String,
+        pods: immutable.Seq[PodCost]): Future[Either[PodCostResource, PodCostResource]] =
+      this.synchronized {
+        storedPodCosts = pods.toVector
+        currentVersion = v.toInt + 1
+        Future.successful(Right(snapshot))
+      }
+
+    /** Returns the pod costs of the most recent resource update. */
+    def getPodCosts(): Vector[PodCost] =
+      this.synchronized {
+        storedPodCosts
+      }
+  }
+}
+
+/**
+ * Checks that a `PodDeletionCostAnnotator` created with a custom resource name writes the pod cost
+ * through `updatePodCostResource` of the in-memory `TestKubernetesApi`.
+ *
+ * The test cases build on each other: the first one forms the single node cluster that the
+ * second actor system joins in the last one.
+ */
+class PodDeletionCostAnnotatorCrSpec
+    extends TestKit(
+      ActorSystem(
+        "PodDeletionCostAnnotatorCrSpec",
+        PodDeletionCostAnnotatorCrSpec.config
+      ))
+    with ImplicitSender
+    with AnyWordSpecLike
+    with Matchers
+    with BeforeAndAfterAll
+    with BeforeAndAfterEach
+    with Eventually {
+
+  import PodDeletionCostAnnotatorCrSpec._
+
+  private val namespace = "namespace-test"
+  private val podName1 = "pod-test-1"
+  private val podName2 = "pod-test-2"
+  // second node of the cluster, only started by the test case that needs it
+  private lazy val system2 = ActorSystem("PodDeletionCostAnnotatorCrSpec", PodDeletionCostAnnotatorCrSpec.config)
+
+  // Settings with the custom resource enabled for the given pod name.
+  private def settings(podName: String) = {
+    new KubernetesSettings(
+      apiCaPath = "",
+      apiTokenPath = "",
+      apiServiceHost = "localhost",
+      apiServicePort = 0,
+      namespace = Some(namespace),
+      namespacePath = "",
+      podName = podName,
+      secure = false,
+      apiServiceRequestTimeout = 2.seconds,
+      customResourceSettings = new CustomResourceSettings(enabled = true, crName = Some("test-cr"), 60.seconds)
+    )
+  }
+
+  // Props of an annotator that writes to the "test-cr" custom resource through the given API.
+  private def annotatorProps(pod: String, api: KubernetesApi) = PodDeletionCostAnnotator.props(
+    settings(pod),
+    PodDeletionCostSettings(system.settings.config.getConfig("pekko.rollingupdate.kubernetes")),
+    api,
+    crName = Some("test-cr")
+  )
+
+  override implicit val patienceConfig: PatienceConfig =
+    PatienceConfig(timeout = Span(5, Seconds), interval = Span(100, Millis))
+
+  override protected def afterAll(): Unit = {
+    super.shutdown()
+    TestKit.shutdownActorSystem(system2)
+  }
+
+  override protected def beforeEach(): Unit = {}
+
+  // Waits until the own member of the given system is Up.
+  // The check has to be a matcher assertion: awaitAssert only retries when the block throws, so a
+  // bare `status == MemberStatus.Up` is evaluated once, its result is dropped and nothing is awaited.
+  // awaitAssert returns as soon as the assertion holds, so the generous limit costs nothing on success.
+  private def awaitSelfMemberUp(clusterSystem: ActorSystem): Unit = {
+    val probe = TestProbe()
+    probe.awaitAssert({
+        Cluster(clusterSystem).selfMember.status shouldBe MemberStatus.Up
+      }, 10.seconds)
+    ()
+  }
+
+  "The pod-deletion-cost annotator with CRD" should {
+
+    "have a single node cluster running first" in {
+      Cluster(system).join(Cluster(system).selfMember.address)
+      awaitSelfMemberUp(system)
+    }
+
+    "write pod cost to custom resource" in {
+      val api = new TestKubernetesApi
+      EventFilter
+        .info(pattern = ".*Updating PodCost CR.*", occurrences = 1)
+        .intercept {
+          system.actorOf(annotatorProps(podName1, api))
+        }
+      eventually {
+        api.getPodCosts() should have size 1
+        api.getPodCosts().head.podName shouldEqual podName1
+      }
+    }
+
+    "update pod cost for second node in the cluster" in {
+      val api = new TestKubernetesApi
+
+      Cluster(system2).join(Cluster(system).selfMember.address)
+      awaitSelfMemberUp(system2)
+
+      system2.actorOf(annotatorProps(podName2, api))
+
+      eventually {
+        val costs = api.getPodCosts()
+        costs.map(_.podName) should contain(podName2)
+      }
+    }
+
+  }
+
+}
diff --git 
a/rolling-update-kubernetes/src/test/scala/org/apache/pekko/rollingupdate/kubernetes/PodDeletionCostAnnotatorSpec.scala
 
b/rolling-update-kubernetes/src/test/scala/org/apache/pekko/rollingupdate/kubernetes/PodDeletionCostAnnotatorSpec.scala
index c0aeeecb..6109eb4e 100644
--- 
a/rolling-update-kubernetes/src/test/scala/org/apache/pekko/rollingupdate/kubernetes/PodDeletionCostAnnotatorSpec.scala
+++ 
b/rolling-update-kubernetes/src/test/scala/org/apache/pekko/rollingupdate/kubernetes/PodDeletionCostAnnotatorSpec.scala
@@ -13,20 +13,20 @@
 
 package org.apache.pekko.rollingupdate.kubernetes
 
-import org.apache.pekko.actor.ActorSystem
-import org.apache.pekko.actor.Address
-import org.apache.pekko.actor.Props
-import org.apache.pekko.cluster.Cluster
-import org.apache.pekko.cluster.ClusterEvent.MemberUp
-import org.apache.pekko.cluster.Member
-import org.apache.pekko.cluster.MemberStatus
-import org.apache.pekko.cluster.MemberStatus.Up
-import org.apache.pekko.cluster.UniqueAddress
-import org.apache.pekko.testkit.EventFilter
-import org.apache.pekko.testkit.ImplicitSender
-import org.apache.pekko.testkit.TestKit
-import org.apache.pekko.testkit.TestProbe
-import org.apache.pekko.util.Version
+import org.apache.pekko
+import pekko.actor.ActorSystem
+import pekko.actor.Address
+import pekko.cluster.Cluster
+import pekko.cluster.ClusterEvent.MemberUp
+import pekko.cluster.Member
+import pekko.cluster.MemberStatus
+import pekko.cluster.MemberStatus.Up
+import pekko.cluster.UniqueAddress
+import pekko.testkit.EventFilter
+import pekko.testkit.ImplicitSender
+import pekko.testkit.TestKit
+import pekko.testkit.TestProbe
+import pekko.util.Version
 import com.github.tomakehurst.wiremock.WireMockServer
 import com.github.tomakehurst.wiremock.client.MappingBuilder
 import com.github.tomakehurst.wiremock.client.WireMock
@@ -99,16 +99,27 @@ class PodDeletionCostAnnotatorSpec
       namespace = Some(namespace),
       namespacePath = "",
       podName = podName,
-      secure = false)
+      secure = false,
+      apiServiceRequestTimeout = 2.seconds,
+      customResourceSettings = new CustomResourceSettings(enabled = false, 
crName = None, cleanupAfter = 60.seconds)
+    )
   }
 
-  private def annotatorProps(pod: String) = Props(
-    classOf[PodDeletionCostAnnotator],
-    settings(pod),
-    "apiToken",
-    namespace,
-    
PodDeletionCostSettings(system.settings.config.getConfig("pekko.rollingupdate.kubernetes"))
-  )
+  // KubernetesApiImpl for the given pod name: fixed token "apiToken", no HTTPS connection context.
+  private def kubernetesApi(pod: String) = {
+    val podSettings = settings(pod)
+    new KubernetesApiImpl(system, podSettings, namespace, apiToken = "apiToken", clientHttpsConnectionContext = None)
+  }
+
+  // Props of an annotator for the given pod name that is created without a custom resource name.
+  private def annotatorProps(pod: String) = {
+    val podSettings = settings(pod)
+    val costSettings = PodDeletionCostSettings(system.settings.config.getConfig("pekko.rollingupdate.kubernetes"))
+    PodDeletionCostAnnotator.props(podSettings, costSettings, kubernetesApi(pod), crName = None)
+  }
 
   override implicit val patienceConfig: PatienceConfig =
     PatienceConfig(timeout = Span(5, Seconds), interval = Span(100, Millis))
@@ -241,7 +252,7 @@ class PodDeletionCostAnnotatorSpec
 
       assertState(scenarioName, "FAILING")
 
-      val underTest = expectLogWarning(".*Failed to update annotation:.*") {
+      val underTest = expectLogWarning(".*Failed to update pod-deletion-cost 
annotation:.*") {
         system.actorOf(annotatorProps(podName1))
       }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to