This is an automated email from the ASF dual-hosted git repository.
pjfanning pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-management.git
The following commit(s) were added to refs/heads/main by this push:
new de751596 Port Akka management PRs #1134, #1141, #1142 to Pekko (#774)
de751596 is described below
commit de751596c79bd4c7525800336cd81fdcff1bf302
Author: PJ Fanning <[email protected]>
AuthorDate: Thu May 14 15:16:41 2026 +0100
Port Akka management PRs #1134, #1141, #1142 to Pekko (#774)
* Port Akka management PRs #1134, #1141, #1142 to Pekko
PR #1134 - Write pod cost to CRD (rolling-update-kubernetes):
- Add CustomResourceSettings and extend KubernetesSettings with
custom-resource config (enabled, cr-name, cleanup-after) and
api-service-request-timeout
- Create KubernetesApi trait with PodCost/PodCostResource types
- Create KubernetesJsonSupport for spray-json marshalling of CRD types
- Create KubernetesApiImpl with Pekko HTTP client, blocking IO on
DefaultBlockingDispatcherId at startup; uses pekko.apache.org API group
- Refactor PodDeletionCost extension to use KubernetesApiImpl
- Refactor PodDeletionCostAnnotator to support both direct annotation
and PodCost CR updates via KubernetesApi abstraction
- Delete ApiRequests.scala (functionality absorbed into KubernetesApiImpl)
- Update reference.conf with new config keys
- Update PodDeletionCostAnnotatorSpec to use new props() factory
- Add PodDeletionCostAnnotatorCrSpec for CRD-based cost updates
PR #1141 - Integration test scripts:
- Add integration-test/scripts/rollingupdate-kubernetes-test.sh
- Add integration-test/rollingupdate-kubernetes/kubernetes/ directory
PR #1142 - Blocking IO at startup (discovery-kubernetes-api):
- Move apiToken, podNamespace and SSL context creation to Future on
blocking dispatcher (kubernetesSetup) in KubernetesApiServiceDiscovery
- Add KubernetesSetup private case class to companion object
- Use Logging(system, classOf[...]) instead of getClass
- Change Seq(None) to List(None) in targets; return immutable.Seq
Co-authored-by: Copilot <[email protected]>
Co-authored-by: pjfanning <[email protected]>
* Fix log message grammar: 'oldest will written' -> 'oldest will be written'
Agent-Logs-Url:
https://github.com/pjfanning/incubator-pekko-management/sessions/e6cfbf1b-2d6a-4ebf-8f70-21d47b773b13
Co-authored-by: pjfanning <[email protected]>
* scalafmt
* imports
* compile issue
* Update KubernetesSettings.scala
* Port remaining files from Akka PR #1134: pod-cost YAMLs, int-test module,
rolling-update integration test module, scripts, and GitHub workflow
Agent-Logs-Url:
https://github.com/pjfanning/incubator-pekko-management/sessions/ca58ec9f-6a95-4dec-980c-2007db4841ae
Co-authored-by: pjfanning <[email protected]>
* Fix bugs in rollingupdate-kubernetes-cr-test.sh: remove broken inner
loop, add try_count increment
Agent-Logs-Url:
https://github.com/pjfanning/incubator-pekko-management/sessions/ca58ec9f-6a95-4dec-980c-2007db4841ae
Co-authored-by: pjfanning <[email protected]>
* Update integration-tests-rolling-update-cr.yml
* scalafmt
* add-opens
* Update build.sbt
* Update logback.xml
* Update build.sbt
* Update build.sbt
* Update build.sbt
* Update integration test workflow triggers
Removed pull_request trigger from integration test workflow.
---------
Co-authored-by: copilot-swe-agent[bot]
<[email protected]>
Co-authored-by: pjfanning <[email protected]>
---
.../integration-tests-rolling-update-cr.yml | 68 ++++
build.sbt | 27 ++
.../kubernetes/KubernetesApiServiceDiscovery.scala | 153 ++++----
.../rolling-update-kubernetes/build.sbt | 22 ++
.../kubernetes/pekko-cluster-cr.yml | 127 +++++++
.../kubernetes/pekko-cluster.yml | 122 +++++++
.../src/main/resources/application.conf | 22 ++
.../src/main/resources/logback.xml | 19 +
.../cluster/bootstrap/PodDeletionCostDemoApp.scala | 41 +++
.../rolling-update-kubernetes/test-cr.sh | 11 +
integration-test/rolling-update-kubernetes/test.sh | 10 +
.../scripts/rollingupdate-kubernetes-cr-test.sh | 74 ++++
.../scripts/rollingupdate-kubernetes-test.sh | 134 +++++++
project/Dependencies.scala | 4 +
.../kubernetes/KubernetesApiIntegrationTest.scala | 182 ++++++++++
rolling-update-kubernetes/pod-cost-example.yml | 16 +
rolling-update-kubernetes/pod-cost.yml | 49 +++
.../src/main/resources/reference.conf | 18 +
.../apache/pekko/rollingupdate/CostStrategy.scala | 5 +-
.../rollingupdate/kubernetes/ApiRequests.scala | 51 ---
.../rollingupdate/kubernetes/KubernetesApi.scala | 116 ++++++
.../kubernetes/KubernetesApiImpl.scala | 389 +++++++++++++++++++++
.../kubernetes/KubernetesJsonSupport.scala | 56 +++
.../kubernetes/KubernetesSettings.scala | 39 ++-
.../rollingupdate/kubernetes/PodDeletionCost.scala | 95 ++---
.../kubernetes/PodDeletionCostAnnotator.scala | 217 +++++++-----
.../PodDeletionCostAnnotatorCrSpec.scala | 190 ++++++++++
.../kubernetes/PodDeletionCostAnnotatorSpec.scala | 57 +--
28 files changed, 2027 insertions(+), 287 deletions(-)
diff --git a/.github/workflows/integration-tests-rolling-update-cr.yml
b/.github/workflows/integration-tests-rolling-update-cr.yml
new file mode 100644
index 00000000..5b94a543
--- /dev/null
+++ b/.github/workflows/integration-tests-rolling-update-cr.yml
@@ -0,0 +1,68 @@
+name: Integration test for Rolling Update CR Kubernetes
+
+on:
+ push:
+ branches:
+ - main
+ - release-*
+ workflow_dispatch:
+
+permissions:
+ contents: read
+
+jobs:
+ integration-test:
+ name: Integration Tests for Rolling Update CR Kubernetes
+ runs-on: ubuntu-22.04
+ if: github.repository == 'apache/pekko-management'
+ strategy:
+ fail-fast: false
+ steps:
+ - name: Checkout
+ uses: actions/checkout@v6
+ with:
+ fetch-depth: 0
+
+ - name: Checkout GitHub merge
+ if: github.event.pull_request
+ run: |-
+ git fetch origin pull/${{ github.event.pull_request.number
}}/merge:scratch
+ git checkout scratch
+
+ - name: Install sbt
+ uses: sbt/setup-sbt@508b753e53cb6095967669e0911487d2b9bc9f41 # v1.1.22
+
+ - name: Setup Java 17
+ uses: actions/setup-java@v5
+ with:
+ distribution: temurin
+ java-version: 17
+
+ - name: Cache Coursier cache
+ uses: coursier/cache-action@90c37294538be80a558fd665531fcdc2b467b475 #
v8.1.0
+
+ - name: Setup Minikube
+ uses:
manusa/actions-setup-minikube@96202dee4ae1c2f46a62fe197273aaf22b83f42d # v2.16.1
+ with:
+ minikube version: 'v1.36.0'
+ kubernetes version: 'v1.29.15'
+ driver: docker
+ github token: ${{ secrets.GITHUB_TOKEN }}
+ start args: '--addons=ingress'
+
+ - name: Run Integration Tests
+ timeout-minutes: 15
+ run: |-
+ echo 'Creating namespace'
+ kubectl create namespace rolling
+ echo 'Creating resources'
+ kubectl apply -f ./rolling-update-kubernetes/pod-cost.yml
+ echo 'Adding proxy port'
+ kubectl proxy --port=8080 &
+ echo 'Running tests'
+ sbt "rolling-update-kubernetes-int-test/test"
+ ./integration-test/rolling-update-kubernetes/test-cr.sh
+
+ - name: Print logs on failure
+ if: ${{ failure() }}
+ run: find . -name "*.log" -exec ./scripts/cat-log.sh {} \;
diff --git a/build.sbt b/build.sbt
index 23bbf590..a427b541 100644
--- a/build.sbt
+++ b/build.sbt
@@ -188,6 +188,19 @@ lazy val rollingUpdateKubernetes =
pekkoModule("rolling-update-kubernetes")
mimaPreviousArtifacts := Set.empty)
.dependsOn(managementPki)
+lazy val rollingUpdateKubernetesIntTest =
pekkoModule("rolling-update-kubernetes-int-test")
+ .enablePlugins(AutomateHeaderPlugin)
+ .disablePlugins(MimaPlugin)
+ .settings(
+ name := "pekko-rolling-update-kubernetes-int-test",
+ libraryDependencies := Dependencies.rollingUpdateKubernetesIntTest,
+ // following is needed by Agrona lib
+ // https://github.com/aeron-io/agrona/wiki/Change-Log#200-2024-12-17
+ Test / fork := true,
+ Test / javaOptions +=
"--add-opens=java.base/jdk.internal.misc=ALL-UNNAMED")
+ .dependsOn(rollingUpdateKubernetes)
+ .enablePlugins(NoPublish)
+
lazy val billOfMaterials = Project("bill-of-materials",
file("bill-of-materials"))
.enablePlugins(BillOfMaterialsPlugin)
.disablePlugins(MimaPlugin)
@@ -219,6 +232,20 @@ lazy val leaseKubernetesIntTest =
pekkoModule("lease-kubernetes-int-test")
Cmd("RUN", "chmod +x /opt/docker/bin/pekko-lease-kubernetes-int-test")))
.enablePlugins(NoPublish)
+lazy val integrationTestRollingUpdateKubernetes =
pekkoIntTestModule("rolling-update-kubernetes")
+ .enablePlugins(JavaAppPackaging, DockerPlugin)
+ .enablePlugins(AutomateHeaderPlugin)
+ .disablePlugins(MimaPlugin)
+ .settings(
+ name := "integration-test-rolling-update-kubernetes",
+ libraryDependencies := Dependencies.bootstrapDemos,
+ version ~= (_.replace('+', '-')),
+ dockerUpdateLatest := true,
+ dockerExposedPorts := Seq(8080, 8558, 2552))
+ .dependsOn(rollingUpdateKubernetes, management, managementClusterHttp,
managementClusterBootstrap,
+ discoveryKubernetesApi)
+ .enablePlugins(NoPublish)
+
lazy val integrationTestKubernetesApi = pekkoIntTestModule("kubernetes-api")
.disablePlugins(MimaPlugin)
.enablePlugins(AutomateHeaderPlugin)
diff --git
a/discovery-kubernetes-api/src/main/scala/org/apache/pekko/discovery/kubernetes/KubernetesApiServiceDiscovery.scala
b/discovery-kubernetes-api/src/main/scala/org/apache/pekko/discovery/kubernetes/KubernetesApiServiceDiscovery.scala
index 0bace9b5..da497358 100644
---
a/discovery-kubernetes-api/src/main/scala/org/apache/pekko/discovery/kubernetes/KubernetesApiServiceDiscovery.scala
+++
b/discovery-kubernetes-api/src/main/scala/org/apache/pekko/discovery/kubernetes/KubernetesApiServiceDiscovery.scala
@@ -8,7 +8,7 @@
*/
/*
- * Copyright (C) 2017-2021 Lightbend Inc. <https://www.lightbend.com>
+ * Copyright (C) 2017-2023 Lightbend Inc. <https://www.lightbend.com>
*/
package org.apache.pekko.discovery.kubernetes
@@ -17,31 +17,33 @@ import java.net.InetAddress
import java.nio.charset.StandardCharsets
import java.nio.file.{ Files, Paths }
import java.security.{ KeyStore, SecureRandom }
+import javax.net.ssl.{ KeyManager, KeyManagerFactory, SSLContext, TrustManager
}
+
+import scala.collection.immutable
import scala.collection.immutable.Seq
+import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.duration.FiniteDuration
import scala.util.Try
import scala.util.control.{ NoStackTrace, NonFatal }
import org.apache.pekko
-import org.apache.pekko.http.javadsl.model.headers.AcceptEncoding
-import org.apache.pekko.http.scaladsl.coding.Coders
import pekko.actor.ActorSystem
import pekko.annotation.InternalApi
import pekko.discovery.ServiceDiscovery.{ Resolved, ResolvedTarget }
import pekko.discovery.kubernetes.JsonFormat._
import pekko.discovery.kubernetes.KubernetesApiServiceDiscovery.{ targets,
KubernetesApiException }
import pekko.discovery.{ Lookup, ServiceDiscovery }
-import pekko.event.{ LogSource, Logging }
-import pekko.http.scaladsl.HttpsConnectionContext
-import pekko.http.scaladsl._
+import pekko.dispatch.Dispatchers.DefaultBlockingDispatcherId
+import pekko.event.Logging
+import pekko.http.javadsl.model.headers.AcceptEncoding
+import pekko.http.scaladsl.{ HttpsConnectionContext, _ }
+import pekko.http.scaladsl.coding.Coders
import pekko.http.scaladsl.model._
import pekko.http.scaladsl.model.headers.{ Authorization, HttpEncodings,
OAuth2BearerToken }
import pekko.http.scaladsl.unmarshalling.Unmarshal
import pekko.pki.kubernetes.PemManagersProvider
-import javax.net.ssl.{ KeyManager, KeyManagerFactory, SSLContext, TrustManager
}
-
object KubernetesApiServiceDiscovery {
/**
@@ -57,7 +59,7 @@ object KubernetesApiServiceDiscovery {
podNamespace: String,
podDomain: String,
rawIp: Boolean,
- containerName: Option[String]): Seq[ResolvedTarget] =
+ containerName: Option[String]): immutable.Seq[ResolvedTarget] =
for {
item <- podList.items
if item.metadata.flatMap(_.deletionTimestamp).isEmpty
@@ -73,7 +75,7 @@ object KubernetesApiServiceDiscovery {
// Maybe port is an Option of a port, and will be None if no portName
was requested
maybePort <- portName match {
case None =>
- Seq(None)
+ List(None)
case Some(name) =>
for {
container <- itemSpec.containers
@@ -92,6 +94,10 @@ object KubernetesApiServiceDiscovery {
class KubernetesApiException(msg: String) extends RuntimeException(msg) with
NoStackTrace
+ private[kubernetes] final case class KubernetesSetup(
+ podNamespace: String,
+ apiToken: String,
+ clientHttpsConnectionContext: HttpsConnectionContext)
}
/**
@@ -101,54 +107,60 @@ object KubernetesApiServiceDiscovery {
class KubernetesApiServiceDiscovery(settings: Settings)(
implicit system: ActorSystem) extends ServiceDiscovery {
- import system.dispatcher
+ import KubernetesApiServiceDiscovery.KubernetesSetup
+ import pekko.discovery.kubernetes.KubernetesApiServiceDiscovery._
private val http = Http()
def this()(implicit system: ActorSystem) = this(Settings(system))
- private val log = Logging(system, getClass)(LogSource.fromClass)
+ private val log = Logging(system, classOf[KubernetesApiServiceDiscovery])
- private val sslContext = {
- val certificates = PemManagersProvider.loadCertificates(settings.apiCaPath)
+ log.debug("Settings {}", settings)
- val factory =
KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm)
- val keyStore = KeyStore.getInstance("PKCS12")
- keyStore.load(null)
- factory.init(keyStore, Array.empty)
- val km: Array[KeyManager] = factory.getKeyManagers
- val tm: Array[TrustManager] =
- PemManagersProvider.buildTrustManagers(certificates)
- val random: SecureRandom = new SecureRandom
- val sslContext = SSLContext.getInstance(settings.tlsVersion)
- sslContext.init(km, tm, random)
- sslContext
+ private val kubernetesSetup: Future[KubernetesSetup] = {
+ implicit val blockingDispatcher: ExecutionContext =
system.dispatchers.lookup(DefaultBlockingDispatcherId)
+ for {
+ apiToken: String <- Future {
+ readConfigVarFromFilesystem(settings.apiTokenPath,
"api-token").getOrElse("")
+ }
+ namespace: String <- Future {
+ settings.podNamespace
+ .orElse(readConfigVarFromFilesystem(settings.podNamespacePath,
"pod-namespace"))
+ .getOrElse("default")
+ }
+ httpsContext <- Future(clientHttpsConnectionContext())
+ } yield {
+ KubernetesSetup(namespace, apiToken, httpsContext)
+ }
}
- private val clientSslContext: HttpsConnectionContext =
ConnectionContext.httpsClient(sslContext)
-
- log.debug("Settings {}", settings)
+ import system.dispatcher
override def lookup(query: Lookup, resolveTimeout: FiniteDuration):
Future[Resolved] = {
val labelSelector = settings.podLabelSelector(query.serviceName)
- log.info(
- "Querying for pods with label selector: [{}]. Namespace: [{}]. Port:
[{}]",
- labelSelector,
- podNamespace,
- query.portName)
-
for {
- request <- optionToFuture(
- podRequest(apiToken, podNamespace, labelSelector),
- s"Unable to form request; check Kubernetes environment (expecting env
vars ${settings.apiServiceHostEnvName}, ${settings.apiServicePortEnvName})")
+ setup <- kubernetesSetup
+
+ request <- {
+ log.info(
+ "Querying for pods with label selector: [{}]. Namespace: [{}]. Port:
[{}]",
+ labelSelector,
+ setup.podNamespace,
+ query.portName)
+
+ optionToFuture(
+ podRequest(setup.apiToken, setup.podNamespace, labelSelector),
+ s"Unable to form request; check Kubernetes environment (expecting
env vars ${settings.apiServiceHostEnvName}, ${settings.apiServicePortEnvName})"
+ )
+ }
- response <- http.singleRequest(request,
clientSslContext).map(decodeResponse)
+ response <- http.singleRequest(request,
setup.clientHttpsConnectionContext).map(decodeResponse)
entity <- response.entity.toStrict(resolveTimeout)
podList <- {
-
response.status match {
case StatusCodes.OK =>
log.debug("Kubernetes API entity: [{}]", entity.data.utf8String)
@@ -176,15 +188,13 @@ class KubernetesApiServiceDiscovery(settings: Settings)(
other,
body)
}
-
Future.failed(new KubernetesApiException(s"Non-200 from Kubernetes
API server: $other"))
}
-
}
} yield {
val addresses =
- targets(podList, query.portName, podNamespace, settings.podDomain,
settings.rawIp, settings.containerName)
+ targets(podList, query.portName, setup.podNamespace,
settings.podDomain, settings.rawIp, settings.containerName)
if (addresses.isEmpty && podList.items.nonEmpty) {
if (log.isInfoEnabled) {
val containerPortNames =
podList.items.flatMap(_.spec).flatMap(_.containers).flatMap(_.ports).flatten.toSet
@@ -200,11 +210,43 @@ class KubernetesApiServiceDiscovery(settings: Settings)(
}
}
- private val apiToken = readConfigVarFromFilesystem(settings.apiTokenPath,
"api-token").getOrElse("")
+ private def optionToFuture[T](option: Option[T], failMsg: String): Future[T]
=
+ option.fold(Future.failed[T](new
NoSuchElementException(failMsg)))(Future.successful)
- private val podNamespace = settings.podNamespace
- .orElse(readConfigVarFromFilesystem(settings.podNamespacePath,
"pod-namespace"))
- .getOrElse("default")
+ private def podRequest(token: String, namespace: String, labelSelector:
String) =
+ for {
+ host <- sys.env.get(settings.apiServiceHostEnvName)
+ portStr <- sys.env.get(settings.apiServicePortEnvName)
+ port <- Try(portStr.toInt).toOption
+ } yield {
+ val path = Uri.Path.Empty / "api" / "v1" / "namespaces" / namespace /
"pods"
+ val query = Uri.Query("labelSelector" -> labelSelector)
+ val uri = Uri.from(scheme = "https", host = host, port =
port).withPath(path).withQuery(query)
+
+ val authHeaders = immutable.Seq(Authorization(OAuth2BearerToken(token)))
+ val acceptEncodingHeader =
HttpEncodings.getForKey(settings.httpRequestAcceptEncoding)
+ .map(httpEncoding => AcceptEncoding.create(httpEncoding))
+ HttpRequest(uri = uri, headers = authHeaders ++ acceptEncodingHeader)
+ }
+
+ /**
+ * This uses blocking IO, and so should only be used at startup from
blocking dispatcher.
+ */
+ private def clientHttpsConnectionContext(): HttpsConnectionContext = {
+ val certificates = PemManagersProvider.loadCertificates(settings.apiCaPath)
+
+ val factory =
KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm)
+ val keyStore = KeyStore.getInstance("PKCS12")
+ keyStore.load(null)
+ factory.init(keyStore, Array.empty)
+ val km: Array[KeyManager] = factory.getKeyManagers
+ val tm: Array[TrustManager] =
+ PemManagersProvider.buildTrustManagers(certificates)
+ val random: SecureRandom = new SecureRandom
+ val sslContext = SSLContext.getInstance(settings.tlsVersion)
+ sslContext.init(km, tm, random)
+ ConnectionContext.httpsClient(sslContext)
+ }
/**
* This uses blocking IO, and so should only be used to read configuration
at startup.
@@ -225,25 +267,6 @@ class KubernetesApiServiceDiscovery(settings: Settings)(
}
}
- private def optionToFuture[T](option: Option[T], failMsg: String): Future[T]
=
- option.fold(Future.failed[T](new
NoSuchElementException(failMsg)))(Future.successful)
-
- private def podRequest(token: String, namespace: String, labelSelector:
String) =
- for {
- host <- sys.env.get(settings.apiServiceHostEnvName)
- portStr <- sys.env.get(settings.apiServicePortEnvName)
- port <- Try(portStr.toInt).toOption
- } yield {
- val path = Uri.Path.Empty / "api" / "v1" / "namespaces" / namespace /
"pods"
- val query = Uri.Query("labelSelector" -> labelSelector)
- val uri = Uri.from(scheme = "https", host = host, port =
port).withPath(path).withQuery(query)
-
- val authHeaders = Seq(Authorization(OAuth2BearerToken(token)))
- val acceptEncodingHeader =
HttpEncodings.getForKey(settings.httpRequestAcceptEncoding)
- .map(httpEncoding => AcceptEncoding.create(httpEncoding))
- HttpRequest(uri = uri, headers = authHeaders ++ acceptEncodingHeader)
- }
-
private def decodeResponse(response: HttpResponse): HttpResponse = {
val decoder = response.encoding match {
case HttpEncodings.gzip =>
diff --git a/integration-test/rolling-update-kubernetes/build.sbt
b/integration-test/rolling-update-kubernetes/build.sbt
new file mode 100644
index 00000000..7684b719
--- /dev/null
+++ b/integration-test/rolling-update-kubernetes/build.sbt
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+import com.typesafe.sbt.packager.docker._
+
+enablePlugins(JavaAppPackaging, DockerPlugin)
+
+version := "1.3.3.7" // we hard-code the version here, it could be anything
really
+
+dockerExposedPorts := Seq(8080, 8558, 2552)
+dockerBaseImage := "eclipse-temurin:17-jre-alpine"
+
+dockerCommands ++= Seq(
+ Cmd("USER", "root"),
+ Cmd("RUN", "/sbin/apk", "add", "--no-cache", "bash", "bind-tools",
"busybox-extras", "curl", "strace"),
+ Cmd("RUN", "chgrp -R 0 . && chmod -R g=u ."))
diff --git
a/integration-test/rolling-update-kubernetes/kubernetes/pekko-cluster-cr.yml
b/integration-test/rolling-update-kubernetes/kubernetes/pekko-cluster-cr.yml
new file mode 100644
index 00000000..90cefd2c
--- /dev/null
+++ b/integration-test/rolling-update-kubernetes/kubernetes/pekko-cluster-cr.yml
@@ -0,0 +1,127 @@
+#deployment
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+ labels:
+ app: pekko-rolling-update-demo
+ name: pekko-rolling-update-demo
+spec:
+ replicas: 3
+ selector:
+ matchLabels:
+ app: pekko-rolling-update-demo
+ strategy:
+ rollingUpdate:
+ maxSurge: 1
+ maxUnavailable: 0
+ type: RollingUpdate
+
+ template:
+ metadata:
+ labels:
+ app: pekko-rolling-update-demo
+ actorSystemName: pekko-rolling-update-demo
+ spec:
+ containers:
+ - name: pekko-rolling-update-demo
+ image: integration-test-rolling-update-kubernetes:1.3.3.7
+ # Remove for a real project, the image is picked up locally for the
integration test
+ imagePullPolicy: Never
+ resources:
+ limits:
+ memory: "256Mi"
+ requests:
+ memory: "256Mi"
+ cpu: "300m"
+ #health
+ livenessProbe:
+ httpGet:
+ path: /alive
+ port: management
+ readinessProbe:
+ httpGet:
+ path: /ready
+ port: management
+ #health
+ ports:
+ # pekko-management bootstrap
+ - name: management
+ containerPort: 8558
+ protocol: TCP
+ - name: http
+ containerPort: 8080
+ protocol: TCP
+ env:
+ - name: KUBERNETES_NAMESPACE
+ valueFrom:
+ fieldRef:
+ fieldPath: metadata.namespace
+ # The pod deletion cost will use this var to identify the pod to be
annotated
+ - name: KUBERNETES_POD_NAME
+ valueFrom:
+ fieldRef:
+ fieldPath: metadata.name
+ - name: REQUIRED_CONTACT_POINT_NR
+ value: "3"
+ # Agrona requires the --add-opens JVM option to work on recent JDKs
+ - name: JAVA_TOOL_OPTIONS
+ value: "-XX:InitialRAMPercentage=75 -XX:MaxRAMPercentage=75 -Xlog:gc
-Dpekko.rollingupdate.kubernetes.custom-resource.enabled=on
--add-opens=java.base/jdk.internal.misc=ALL-UNNAMED"
+#deployment
+---
+#rbac-reader
+#
+# Create a role, `pod-reader`, that can list pods and
+# bind the default service account in the namespace
+# that the binding is deployed to to that role.
+#
+
+kind: Role
+apiVersion: rbac.authorization.k8s.io/v1
+metadata:
+ name: pod-reader
+rules:
+- apiGroups: [""] # "" indicates the core API group
+ resources: ["pods"]
+ verbs: ["get", "watch", "list"] # requires "patch" to annotate the pod
+---
+kind: RoleBinding
+apiVersion: rbac.authorization.k8s.io/v1
+metadata:
+ name: pod-reader
+subjects:
+ # Uses the default service account.
+ # Consider creating a dedicated service account to run your
+ # Apache Pekko Cluster services and binding the role to that one.
+- kind: ServiceAccount
+ name: default
+roleRef:
+ kind: Role
+ name: pod-reader
+ apiGroup: rbac.authorization.k8s.io
+#rbac-reader
+---
+#rbac-podcost-cr
+#
+# Create a role, `podcost-access`, that can update the PodCost CR
+#
+kind: Role
+apiVersion: rbac.authorization.k8s.io/v1
+metadata:
+ name: podcost-access
+rules:
+ - apiGroups: ["pekko.apache.org"]
+ resources: ["podcosts"]
+ verbs: ["get", "create", "update", "delete", "list"]
+---
+kind: RoleBinding
+apiVersion: rbac.authorization.k8s.io/v1
+metadata:
+ name: podcost-access
+subjects:
+ - kind: User
+ name: system:serviceaccount:pekko-rolling-update-demo-cr-ns:default
+roleRef:
+ kind: Role
+ name: podcost-access
+ apiGroup: rbac.authorization.k8s.io
+#rbac-podcost-cr
diff --git
a/integration-test/rolling-update-kubernetes/kubernetes/pekko-cluster.yml
b/integration-test/rolling-update-kubernetes/kubernetes/pekko-cluster.yml
new file mode 100644
index 00000000..e09af9e4
--- /dev/null
+++ b/integration-test/rolling-update-kubernetes/kubernetes/pekko-cluster.yml
@@ -0,0 +1,122 @@
+#deployment
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+ labels:
+ app: pekko-rolling-update-demo
+ name: pekko-rolling-update-demo
+spec:
+ replicas: 3
+ selector:
+ matchLabels:
+ app: pekko-rolling-update-demo
+ strategy:
+ rollingUpdate:
+ maxSurge: 1
+ maxUnavailable: 0
+ type: RollingUpdate
+
+ template:
+ metadata:
+ labels:
+ app: pekko-rolling-update-demo
+ actorSystemName: pekko-rolling-update-demo
+ spec:
+ containers:
+ - name: pekko-rolling-update-demo
+ image: integration-test-rolling-update-kubernetes:1.3.3.7
+ # Remove for a real project, the image is picked up locally for the
integration test
+ imagePullPolicy: Never
+ #health
+ livenessProbe:
+ httpGet:
+ path: /alive
+ port: management
+ readinessProbe:
+ httpGet:
+ path: /ready
+ port: management
+ #health
+ ports:
+ # pekko-management bootstrap
+ - name: management
+ containerPort: 8558
+ protocol: TCP
+ - name: http
+ containerPort: 8080
+ protocol: TCP
+ env:
+ - name: KUBERNETES_NAMESPACE
+ valueFrom:
+ fieldRef:
+ fieldPath: metadata.namespace
+ # The pod deletion cost will use this var to identify the pod to be
annotated
+ - name: KUBERNETES_POD_NAME
+ valueFrom:
+ fieldRef:
+ fieldPath: metadata.name
+ - name: REQUIRED_CONTACT_POINT_NR
+ value: "3"
+ # Agrona requires the --add-opens JVM option to work on recent JDKs
+ - name: JAVA_TOOL_OPTIONS
+ value: "-XX:InitialRAMPercentage=75 -XX:MaxRAMPercentage=75
--add-opens=java.base/jdk.internal.misc=ALL-UNNAMED"
+#deployment
+---
+#rbac-reader
+#
+# Create a role, `pod-reader`, that can list pods and
+# bind the default service account in the namespace
+# that the binding is deployed to to that role.
+#
+
+kind: Role
+apiVersion: rbac.authorization.k8s.io/v1
+metadata:
+ name: pod-reader
+rules:
+- apiGroups: [""] # "" indicates the core API group
+ resources: ["pods"]
+ verbs: ["get", "watch", "list"] # requires "patch" to annotate the pod
+---
+kind: RoleBinding
+apiVersion: rbac.authorization.k8s.io/v1
+metadata:
+ name: pod-reader
+subjects:
+ # Uses the default service account.
+ # Consider creating a dedicated service account to run your
+ # Apache Pekko Cluster services and binding the role to that one.
+- kind: ServiceAccount
+ name: default
+roleRef:
+ kind: Role
+ name: pod-reader
+ apiGroup: rbac.authorization.k8s.io
+#rbac-reader
+---
+#rbac-annotator
+#
+# Create a role, `pod-annotator`, that can annotate pods
+# with the pod-deletion-cost value
+#
+kind: Role
+apiVersion: rbac.authorization.k8s.io/v1
+metadata:
+ name: pod-annotator
+rules:
+ - apiGroups: [""] # "" indicates the core API group
+ resources: ["pods"]
+ verbs: ["patch"] # requires "patch" to annotate the pod
+---
+kind: RoleBinding
+apiVersion: rbac.authorization.k8s.io/v1
+metadata:
+ name: annotate-pods
+subjects:
+ - kind: User
+ name: system:serviceaccount:pekko-rolling-update-demo-ns:default
+roleRef:
+ kind: Role
+ name: pod-annotator
+ apiGroup: rbac.authorization.k8s.io
+#rbac-annotator
diff --git
a/integration-test/rolling-update-kubernetes/src/main/resources/application.conf
b/integration-test/rolling-update-kubernetes/src/main/resources/application.conf
new file mode 100644
index 00000000..d71beef2
--- /dev/null
+++
b/integration-test/rolling-update-kubernetes/src/main/resources/application.conf
@@ -0,0 +1,22 @@
+# SPDX-License-Identifier: Apache-2.0
+
+pekko {
+ loggers = ["org.apache.pekko.event.slf4j.Slf4jLogger"]
+ loglevel = "DEBUG"
+ logging-filter = "org.apache.pekko.event.slf4j.Slf4jLoggingFilter"
+ actor.provider = cluster
+}
+
+pekko.discovery {
+ kubernetes-api {
+ pod-label-selector = "app=%s"
+ }
+}
+
+pekko.management {
+ cluster.bootstrap {
+ contact-point-discovery {
+ discovery-method = kubernetes-api
+ }
+ }
+}
diff --git
a/integration-test/rolling-update-kubernetes/src/main/resources/logback.xml
b/integration-test/rolling-update-kubernetes/src/main/resources/logback.xml
new file mode 100644
index 00000000..4ed45763
--- /dev/null
+++ b/integration-test/rolling-update-kubernetes/src/main/resources/logback.xml
@@ -0,0 +1,19 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<configuration>
+
+ <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
+ <target>System.out</target>
+ <encoder>
+ <pattern>%date{MM/dd HH:mm:ss} %-5level[%thread] %logger{1} -
%m%n%xException</pattern>
+ </encoder>
+ </appender>
+
+ <root level="DEBUG">
+ <appender-ref ref="CONSOLE"/>
+ </root>
+
+ <logger name="org.apache.pekko.rollingupdate" level="DEBUG"/>
+ <logger name="org.apache.pekko.http" level="INFO"/>
+ <logger name="org.apache.pekko.io" level="INFO"/>
+
+</configuration>
diff --git
a/integration-test/rolling-update-kubernetes/src/main/scala/org/apache/pekko/cluster/bootstrap/PodDeletionCostDemoApp.scala
b/integration-test/rolling-update-kubernetes/src/main/scala/org/apache/pekko/cluster/bootstrap/PodDeletionCostDemoApp.scala
new file mode 100644
index 00000000..db8466d7
--- /dev/null
+++
b/integration-test/rolling-update-kubernetes/src/main/scala/org/apache/pekko/cluster/bootstrap/PodDeletionCostDemoApp.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2017-2023 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.cluster.bootstrap
+
+import org.apache.pekko
+import pekko.actor.ActorSystem
+import pekko.cluster.Cluster
+import pekko.http.scaladsl.Http
+import pekko.http.scaladsl.server.Directives._
+import pekko.management.cluster.bootstrap.ClusterBootstrap
+import pekko.management.scaladsl.PekkoManagement
+import pekko.rollingupdate.kubernetes.PodDeletionCost
+
+object PodDeletionCostDemoApp extends App {
+
+ implicit val system: ActorSystem = ActorSystem("pekko-rolling-update-demo")
+
+ import system.log
+ val cluster = Cluster(system)
+
+ log.info(s"Started [$system], cluster.selfAddress = ${cluster.selfAddress}")
+
+ PekkoManagement(system).start()
+
+ ClusterBootstrap(system).start()
+
+ PodDeletionCost(system).start()
+
+ Http().newServerAt("0.0.0.0", 8080).bind(complete("Hello world"))
+}
diff --git a/integration-test/rolling-update-kubernetes/test-cr.sh
b/integration-test/rolling-update-kubernetes/test-cr.sh
new file mode 100755
index 00000000..6a20f49b
--- /dev/null
+++ b/integration-test/rolling-update-kubernetes/test-cr.sh
@@ -0,0 +1,11 @@
+#!/bin/bash
+
+set -exu
+
+export NAMESPACE=pekko-rolling-update-demo-cr-ns
+export APP_NAME=pekko-rolling-update-demo
+export PROJECT_NAME=integration-test-rolling-update-kubernetes
+export CRD=rolling-update-kubernetes/pod-cost.yml
+export
DEPLOYMENT=integration-test/rolling-update-kubernetes/kubernetes/pekko-cluster-cr.yml
+
+integration-test/scripts/rollingupdate-kubernetes-cr-test.sh
diff --git a/integration-test/rolling-update-kubernetes/test.sh
b/integration-test/rolling-update-kubernetes/test.sh
new file mode 100755
index 00000000..0a980a1d
--- /dev/null
+++ b/integration-test/rolling-update-kubernetes/test.sh
@@ -0,0 +1,10 @@
+#!/bin/bash
+
+set -exu
+
+export NAMESPACE=pekko-rolling-update-demo-ns
+export APP_NAME=pekko-rolling-update-demo
+export PROJECT_NAME=integration-test-rolling-update-kubernetes
+export
DEPLOYMENT=integration-test/rolling-update-kubernetes/kubernetes/pekko-cluster.yml
+
+integration-test/scripts/rollingupdate-kubernetes-test.sh
diff --git a/integration-test/scripts/rollingupdate-kubernetes-cr-test.sh
b/integration-test/scripts/rollingupdate-kubernetes-cr-test.sh
new file mode 100755
index 00000000..c6f4ea4a
--- /dev/null
+++ b/integration-test/scripts/rollingupdate-kubernetes-cr-test.sh
@@ -0,0 +1,74 @@
+#!/bin/bash -e
+
+echo "Running rollingupdate-kubernetes-cr-test.sh with deployment: $DEPLOYMENT"
+
+eval $(minikube -p minikube docker-env)
+sbt $PROJECT_NAME/Docker/publishLocal
+
+docker images | head
+
+kubectl create namespace $NAMESPACE || true
+kubectl apply -f $CRD
+kubectl -n $NAMESPACE delete deployment $APP_NAME || true
+kubectl -n $NAMESPACE apply -f $DEPLOYMENT
+
+for i in {1..20}
+do
+ echo "Waiting for pods to get ready..."
+ kubectl get pods -n $NAMESPACE
+ phase=$(kubectl get pods -o jsonpath="{.items[*].status.phase}" -n
$NAMESPACE)
+ status=$(kubectl get pods -o
jsonpath="{.items[*].status.containerStatuses[*].ready}" -n $NAMESPACE)
+ if [ "$phase" == "Running Running Running" ] && [ "$status" == "true true
true" ]
+ then
+ break
+ fi
+ sleep 4
+done
+
+if [ $i -eq 20 ]
+then
+ echo "Pods did not get ready"
+ kubectl events $APP_NAME -n $NAMESPACE
+ kubectl describe deployment $APP_NAME -n $NAMESPACE
+
+ echo ""
+ echo "Logs from all $APP_NAME containers"
+ kubectl logs -l app=$APP_NAME --all-containers=true -n $NAMESPACE || true
+
+ echo ""
+ echo "Logs from all previous $APP_NAME containers"
+ kubectl logs -p -l app=$APP_NAME --all-containers=true -n $NAMESPACE || true
+
+ exit -1
+fi
+
+max_tries=10
+try_count=0
+
+# Loop until all pods are included or the maximum number of tries is reached
+while true
+do
+ # Get the list of pods matching the namespace and app name, and are in the
Running state
+ pod_list=$(kubectl get pods -n $NAMESPACE | grep $APP_NAME | grep Running |
awk '{ print $1 }' | sort)
+
+ # Get the pods in the CR
+ cr_pod_list=$(kubectl describe podcosts.pekko.apache.org $APP_NAME -n
$NAMESPACE | grep "Pod Name" | awk '{print $3}' | sort)
+
+ if [ "$pod_list" = "$cr_pod_list" ]
+ then
+ echo "Found expected pods in CR: $cr_pod_list"
+ break
+ else
+ echo "Expected $pod_list, but didn't find expected pods in CR:
$cr_pod_list"
+ fi
+
+ # Increment the try count and check if the maximum number of tries is reached
+ try_count=$((try_count+1))
+ if [[ $try_count -ge $max_tries ]]; then
+ echo "Exceeded max retries, aborting"
+ exit 1
+ fi
+
+ # Wait for 10 seconds before trying again
+ sleep 10
+done
diff --git a/integration-test/scripts/rollingupdate-kubernetes-test.sh
b/integration-test/scripts/rollingupdate-kubernetes-test.sh
new file mode 100755
index 00000000..3d61fa4d
--- /dev/null
+++ b/integration-test/scripts/rollingupdate-kubernetes-test.sh
@@ -0,0 +1,134 @@
+#!/bin/bash -e
+
+# Builds the demo app image into minikube's Docker daemon and deploys it. It then waits until every running
+# pod of the app carries a controller.kubernetes.io/pod-deletion-cost annotation. Finally it scales the
+# deployment down to one replica and checks that the pod with the highest deletion cost is the one kept.
+#
+# Required environment: NAMESPACE, APP_NAME, PROJECT_NAME, DEPLOYMENT (paths relative to the repo root).
+# Optional: REPLICAS - number of pods the deployment is expected to start (default 3).
+
+echo "Running rollingupdate-kubernetes-test.sh with deployment: $DEPLOYMENT"
+
+expected_replicas=${REPLICAS:-3}
+
+eval "$(minikube -p minikube docker-env)"
+sbt "$PROJECT_NAME/Docker/publishLocal"
+
+docker images | head
+
+kubectl create namespace "$NAMESPACE" || true
+kubectl -n "$NAMESPACE" delete deployment "$APP_NAME" || true
+kubectl -n "$NAMESPACE" apply -f "$DEPLOYMENT"
+
+# What the jsonpath queries below print once all expected pods are up, e.g. "Running Running Running"
+expected_phase=""
+expected_status=""
+for ((n = 0; n < expected_replicas; n++))
+do
+  expected_phase="${expected_phase:+$expected_phase }Running"
+  expected_status="${expected_status:+$expected_status }true"
+done
+
+pods_ready=false
+for attempt in {1..20}
+do
+  echo "Waiting for pods to get ready..."
+  kubectl get pods -n "$NAMESPACE"
+  phase=$(kubectl get pods -o jsonpath="{.items[*].status.phase}" -n "$NAMESPACE")
+  status=$(kubectl get pods -o jsonpath="{.items[*].status.containerStatuses[*].ready}" -n "$NAMESPACE")
+  if [ "$phase" == "$expected_phase" ] && [ "$status" == "$expected_status" ]
+  then
+    pods_ready=true
+    break
+  fi
+  sleep 4
+done
+
+# A flag instead of checking the loop counter: pods that become ready on the last attempt are a success
+if [ "$pods_ready" != true ]
+then
+  echo "Pods did not get ready"
+  # Diagnostics are best-effort: under bash -e a failing command would abort before the logs are printed
+  kubectl get events -n "$NAMESPACE" || true
+  kubectl describe deployment "$APP_NAME" -n "$NAMESPACE" || true
+
+  echo ""
+  echo "Logs from all $APP_NAME containers"
+  kubectl logs -l app="$APP_NAME" --all-containers=true -n "$NAMESPACE" || true
+
+  echo ""
+  echo "Logs from all previous $APP_NAME containers"
+  kubectl logs -p -l app="$APP_NAME" --all-containers=true -n "$NAMESPACE" || true
+
+  exit 1
+fi
+
+max_tries=10
+try_count=0
+# Indexed array of "<cost> <pod name>" pairs for the pods annotated in the latest attempt
+declare -a pod_costs
+
+# Loop until all pods are annotated or the maximum number of tries is reached
+while true
+do
+  # Names of the app's pods in the Running state
+  pod_list=$(kubectl get pods -n "$NAMESPACE" | grep "$APP_NAME" | grep Running | awk '{ print $1 }')
+  annotated_count=0
+  pod_costs=()
+  echo -e "## Pod List:\n$pod_list\n##"
+
+  for pod_name in $pod_list
+  do
+    # Read the annotation via jsonpath rather than parsing `kubectl describe`: there the value is only in
+    # column 3 when the annotation happens to be listed first. `|| true`: the pod may already be gone.
+    annotation_value=$(kubectl get pod "$pod_name" -n "$NAMESPACE" \
+      -o jsonpath='{.metadata.annotations.controller\.kubernetes\.io/pod-deletion-cost}' || true)
+
+    if [ -z "$annotation_value" ]
+    then
+      echo "The annotation value for pod $pod_name is empty"
+    else
+      echo "The annotation value for pod $pod_name is set to $annotation_value"
+      annotated_count=$((annotated_count+1))
+      # keep cost and pod name together so the pod can be found again without relying on list positions
+      pod_costs+=("$annotation_value $pod_name")
+    fi
+  done
+
+  if [ -n "$pod_list" ] && [ "$annotated_count" -eq $(echo $pod_list | wc -w) ]
+  then
+    echo "All pods were annotated successfully!"
+    break
+  fi
+
+  # Increment the try count and check if the maximum number of tries is reached
+  try_count=$((try_count+1))
+  if [ "$try_count" -eq "$max_tries" ]
+  then
+    echo "Maximum number of tries reached, not all pods are annotated"
+
+    echo "Logs"
+    echo "=============================="
+    for POD in $(kubectl get pods -n "$NAMESPACE" | grep "$APP_NAME" | awk '{ print $1 }')
+    do
+      echo "Logging for $POD"
+      kubectl logs "$POD" -n "$NAMESPACE" || true
+    done
+
+    exit 1
+  fi
+
+  # Wait for 10 seconds before trying again
+  sleep 10
+done
+
+# Name of the pod with the highest pod-deletion-cost (numeric, descending sort on the cost column)
+highest_annotated_pod=$(printf '%s\n' "${pod_costs[@]}" | sort -nr -k1,1 | head -n 1 | awk '{ print $2 }')
+echo "Pod with the highest pod-deletion-cost: $highest_annotated_pod"
+
+# Scale down the cluster to one pod
+kubectl scale deployment "$APP_NAME" -n "$NAMESPACE" --replicas=1
+
+# Wait for the deployment to be scaled down
+for ((try_count=0; try_count<max_tries; try_count++)); do
+  pod_count=$(kubectl get pods -n "$NAMESPACE" | grep "$APP_NAME" | grep Running | wc -l)
+  if [ "$pod_count" -eq 1 ]
+  then
+    echo "Cluster scaled down to one pod"
+    break
+  fi
+  sleep 5
+done
+if [[ $try_count -ge $max_tries ]]; then
+  echo "Exceeded max retries, aborting"
+  exit 1
+fi
+
+# Get the name of the remaining pod
+remaining_pod=$(kubectl get pods -n "$NAMESPACE" | grep "$APP_NAME" | grep Running | awk '{ print $1 }')
+
+# Compare the remaining pod with the highest annotated pod
+if [ "$remaining_pod" = "$highest_annotated_pod" ]
+then
+  echo "Success: The remaining pod $remaining_pod is the highest annotated pod!"
+else
+  echo "Error: The remaining pod ($remaining_pod) is not the same as the highest annotated pod ($highest_annotated_pod)"
+  exit 1
+fi
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index d0e675db..58ef0694 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -166,6 +166,10 @@ object Dependencies {
"org.apache.pekko" %% "pekko-testkit" % pekkoVersion % Test) ++
wireMockDependencies
+  // Test dependencies intended for the rolling-update-kubernetes-int-test module. Like leaseKubernetesTest,
+  // scalatest is left unscoped while pekko-testkit is test-only.
+  val rollingUpdateKubernetesIntTest =
+    Seq("org.scalatest" %% "scalatest" % scalaTestVersion) ++
+    Seq("org.apache.pekko" %% "pekko-testkit" % pekkoVersion % Test)
+
val leaseKubernetesTest = Seq(
"org.scalatest" %% "scalatest" % scalaTestVersion,
"org.apache.pekko" %% "pekko-testkit" % pekkoVersion % Test)
diff --git
a/rolling-update-kubernetes-int-test/src/test/scala/org/apache/pekko/rollingupdate/kubernetes/KubernetesApiIntegrationTest.scala
b/rolling-update-kubernetes-int-test/src/test/scala/org/apache/pekko/rollingupdate/kubernetes/KubernetesApiIntegrationTest.scala
new file mode 100644
index 00000000..14378ea5
--- /dev/null
+++
b/rolling-update-kubernetes-int-test/src/test/scala/org/apache/pekko/rollingupdate/kubernetes/KubernetesApiIntegrationTest.scala
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2017-2023 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.rollingupdate.kubernetes
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+import org.apache.pekko
+import pekko.Done
+import pekko.actor.ActorSystem
+import pekko.cluster.Cluster
+import pekko.testkit.TestKit
+import com.typesafe.config.ConfigFactory
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.CancelAfterFailure
+import org.scalatest.concurrent.Eventually
+import org.scalatest.concurrent.ScalaFutures
+import org.scalatest.matchers.should.Matchers
+import org.scalatest.wordspec.AnyWordSpecLike
+
+/**
+ * Integration test for [[KubernetesApiImpl]]'s PodCost custom-resource operations against a real API server.
+ *
+ * Requires an API server on localhost:8080 (plain HTTP, no token), the PodCost CRD
+ * (rolling-update-kubernetes/pod-cost.yml) installed and a namespace called `rolling`.
+ *
+ * One way of doing this is to have a kubectl proxy open:
+ *
+ * `kubectl proxy --port=8080`
+ *
+ * The test cases build on each other (they thread `currentVersion` through) and must run in order.
+ */
+class KubernetesApiIntegrationTest
+    extends TestKit(
+      ActorSystem(
+        "KubernetesApiIntegrationSpec",
+        ConfigFactory.parseString("""
+          pekko.loglevel = DEBUG
+          pekko.actor.provider = cluster
+          pekko.remote.artery.canonical.port = 0
+          pekko.remote.artery.canonical.hostname = 127.0.0.1
+          """)))
+    with AnyWordSpecLike
+    with Matchers
+    with ScalaFutures
+    with BeforeAndAfterAll
+    with CancelAfterFailure
+    with Eventually {
+
+  implicit val patience: PatienceConfig = PatienceConfig(testKitSettings.DefaultTimeout.duration)
+
+  private val cluster = Cluster(system)
+
+  private val settings = new KubernetesSettings(
+    "",
+    "",
+    "localhost",
+    8080,
+    namespace = Some("rolling"),
+    "",
+    podName = "pod1",
+    secure = false,
+    apiServiceRequestTimeout = 1.second,
+    new CustomResourceSettings(
+      enabled = true,
+      crName = None,
+      cleanupAfter = 60.seconds
+    )
+  )
+
+  private val underTest =
+    new KubernetesApiImpl(system, settings, settings.namespace.get, apiToken = "", clientHttpsConnectionContext = None)
+  private val crName = KubernetesApi.makeDNS1039Compatible(system.name)
+  private val podName1 = "pod1"
+  private val podName2 = "pod2"
+  // resource version returned by the latest successful operation, carried from one test case to the next
+  private var currentVersion = ""
+
+  override protected def beforeAll(): Unit = {
+    // do some operation to check the proxy is up, and start without a leftover resource
+    eventually {
+      Await.result(underTest.removePodCostResource(crName), 2.seconds) shouldEqual Done
+    }
+  }
+
+  override protected def afterAll(): Unit = {
+    import scala.util.control.NonFatal
+    // best-effort cleanup so the PodCost resource is not left behind in the shared namespace
+    try {
+      Await.ready(underTest.removePodCostResource(crName), 2.seconds)
+      ()
+    } catch {
+      case NonFatal(_) => ()
+    } finally {
+      TestKit.shutdownActorSystem(system)
+    }
+  }
+
+  /** A cost entry for this test's own cluster member, stamped with the current time. */
+  private def podCost(podName: String, cost: Int): PodCost =
+    PodCost(
+      podName,
+      cost,
+      cluster.selfUniqueAddress.address.toString,
+      cluster.selfUniqueAddress.longUid,
+      System.currentTimeMillis())
+
+  /** Unwraps a successful update; nothing else writes the resource, so a conflict fails the test. */
+  private def expectUpdated(result: Either[PodCostResource, PodCostResource]): PodCostResource =
+    result match {
+      case Right(r) => r
+      case Left(_)  => fail("There shouldn't be anyone else updating the resource.")
+    }
+
+  /** Unwraps a conflicting update, returning the resource as currently stored by the API server. */
+  private def expectConflict(result: Either[PodCostResource, PodCostResource]): PodCostResource =
+    result match {
+      case Right(_) => fail("Expected update failure (we've used an invalid version!).")
+      case Left(r)  => r
+    }
+
+  "Kubernetes PodCost resource" should {
+    "be able to be created" in {
+      val podCostResource = underTest.readOrCreatePodCostResource(crName).futureValue
+      podCostResource.version shouldNot equal("")
+      podCostResource.version shouldNot equal(null)
+      podCostResource.pods shouldEqual Nil
+      currentVersion = podCostResource.version
+    }
+
+    "be able to read back with same version" in {
+      val podCostResource = underTest.readOrCreatePodCostResource(crName).futureValue
+      podCostResource.version shouldEqual currentVersion
+    }
+
+    "be able to update empty resource" in {
+      val cost = podCost(podName1, 1)
+      val updated = expectUpdated(underTest.updatePodCostResource(crName, currentVersion, Vector(cost)).futureValue)
+      updated.version shouldNot equal(currentVersion)
+      currentVersion = updated.version
+      updated.pods shouldEqual Vector(cost)
+    }
+
+    "be able to update a resource if resource version is correct" in {
+      val cost = podCost(podName1, 2)
+      val updated = expectUpdated(underTest.updatePodCostResource(crName, currentVersion, Vector(cost)).futureValue)
+      updated.version shouldNot equal(currentVersion)
+      currentVersion = updated.version
+      updated.pods shouldEqual Vector(cost)
+    }
+
+    "not be able to update a resource if resource version is incorrect" in {
+      val cost = podCost(podName1, 3)
+      val current = expectConflict(underTest.updatePodCostResource(crName, version = "10", Vector(cost)).futureValue)
+      // the conflict returns the stored resource, which is still the one written by the previous test case
+      current.version shouldEqual currentVersion
+      current.pods.head.cost shouldNot equal(cost.cost)
+      current.pods.head.time shouldNot equal(cost.time)
+    }
+
+    "be able to add more to the resource" in {
+      val cost2 = podCost(podName2, 4)
+      val before = underTest.readOrCreatePodCostResource(crName).futureValue
+      val updated =
+        expectUpdated(underTest.updatePodCostResource(crName, currentVersion, before.pods :+ cost2).futureValue)
+      updated.version shouldNot equal(currentVersion)
+      currentVersion = updated.version
+      updated.pods.last shouldEqual cost2
+      updated.pods.size shouldEqual before.pods.size + 1
+    }
+  }
+
+}
diff --git a/rolling-update-kubernetes/pod-cost-example.yml
b/rolling-update-kubernetes/pod-cost-example.yml
new file mode 100644
index 00000000..4c58cd59
--- /dev/null
+++ b/rolling-update-kubernetes/pod-cost-example.yml
@@ -0,0 +1,16 @@
+# Example PodCost resource, e.g. for exercising the API by hand:
+#   curl -X POST localhost:8080/apis/pekko.apache.org/v1/namespaces/<namespace>/podcosts/ \
+#     -H "Content-Type: application/yaml" --data-binary "@pod-cost-example.yml"
+# Each address must be a cluster member address URI (pekko://<system>@<host>:<port>); it is
+# combined with uid into the member's unique address.
+apiVersion: pekko.apache.org/v1
+kind: PodCost
+metadata:
+  name: sampleservice
+spec:
+  pods:
+  - address: pekko://sampleservice@10.0.0.1:2552
+    cost: 10000
+    podName: "pod1"
+    time: 1681909802669
+    uid: 3870636288020406585
+  - address: pekko://sampleservice@10.0.0.2:2552
+    cost: 9900
+    podName: "pod2"
+    time: 1681909802716
+    uid: 1681910047053
diff --git a/rolling-update-kubernetes/pod-cost.yml
b/rolling-update-kubernetes/pod-cost.yml
new file mode 100644
index 00000000..06c6f654
--- /dev/null
+++ b/rolling-update-kubernetes/pod-cost.yml
@@ -0,0 +1,49 @@
+# CustomResourceDefinition for the PodCost resource that the rolling-update-kubernetes module
+# updates when pekko.rollingupdate.kubernetes.custom-resource.enabled = on.
+apiVersion: apiextensions.k8s.io/v1
+kind: CustomResourceDefinition
+metadata:
+  # name must match the spec fields below, and be in the form: <plural>.<group>
+  name: podcosts.pekko.apache.org
+spec:
+  # the client accesses the resources under /apis/pekko.apache.org/v1/namespaces/<namespace>/podcosts
+  group: pekko.apache.org
+  versions:
+    - name: v1
+      storage: true
+      served: true
+      schema:
+        openAPIV3Schema:
+          type: object
+          properties:
+            spec:
+              type: object
+              properties:
+                # NOTE(review): the client's optimistic locking uses metadata.resourceVersion;
+                # confirm whether this spec field is still needed
+                version:
+                  type: string
+                # one entry per pod
+                pods:
+                  type: array
+                  items:
+                    type: object
+                    properties:
+                      # the name of the pod that should be updated with the pod-deletion-cost annotation
+                      podName:
+                        type: string
+                      # the value of the controller.kubernetes.io/pod-deletion-cost annotation
+                      cost:
+                        type: integer
+                      # address, uid and time are used for cleanup of entries of removed members;
+                      # address is the member address URI, e.g. pekko://<system>@<host>:<port>
+                      address:
+                        type: string
+                      # address, uid and time are used for cleanup of removed members;
+                      # together with address this identifies one incarnation of the member
+                      uid:
+                        type: integer
+                      # address, uid and time are used for cleanup of removed members
+                      time:
+                        type: integer
+  scope: Namespaced
+  names:
+    # kind is normally the CamelCased singular type. Your resource manifests use this.
+    kind: PodCost
+    listKind: PodCostList
+    # singular name to be used as an alias on the CLI and for display
+    singular: podcost
+    # plural name to be used in the URL: /apis/<group>/<version>/<plural>
+    plural: podcosts
diff --git a/rolling-update-kubernetes/src/main/resources/reference.conf
b/rolling-update-kubernetes/src/main/resources/reference.conf
index 4bc33b3a..e33010a1 100644
--- a/rolling-update-kubernetes/src/main/resources/reference.conf
+++ b/rolling-update-kubernetes/src/main/resources/reference.conf
@@ -19,6 +19,8 @@ pekko.rollingupdate.kubernetes {
api-service-port = 8080
api-service-port = ${?KUBERNETES_SERVICE_PORT}
+  # Maximum time to wait for a response from the Kubernetes API server; a request taking longer fails with a timeout.
+  api-service-request-timeout = 2s
+
# Namespace file path. The namespace is to create the lock in. Can be
overridden by "namespace"
#
# If this path doesn't exist, the namespace will default to "default".
@@ -46,4 +48,20 @@ pekko.rollingupdate.kubernetes {
# Fixed time delay between retries when pod annotation fails
retry-delay = 5s
}
+
+  # If allowing PATCH of the pod resource is a security concern, a custom resource can be used instead.
+  # Rather than updating the "controller.kubernetes.io/pod-deletion-cost" annotation directly, the
+  # extension then updates a PodCost custom resource (see the PodCost CRD, pod-cost.yml). A separate
+  # operator is expected to reconcile that resource and set the pod-deletion-cost annotation on the pods.
+  custom-resource {
+    # When enabled, the PodCost custom resource is updated instead of directly updating
+    # the "controller.kubernetes.io/pod-deletion-cost" annotation.
+    enabled = off
+    # The name of the custom resource instance (CR). If empty, the ActorSystem name is used.
+    # A separate CR per Pekko Cluster is recommended, but a single CR can be shared
+    # if only one CR per namespace is preferred.
+    cr-name = ""
+    # Entries of members that are no longer part of the cluster membership are removed after this duration.
+    cleanup-after = 60s
+  }
}
diff --git
a/rolling-update-kubernetes/src/main/scala/org/apache/pekko/rollingupdate/CostStrategy.scala
b/rolling-update-kubernetes/src/main/scala/org/apache/pekko/rollingupdate/CostStrategy.scala
index 57aa3cbd..80037ac0 100644
---
a/rolling-update-kubernetes/src/main/scala/org/apache/pekko/rollingupdate/CostStrategy.scala
+++
b/rolling-update-kubernetes/src/main/scala/org/apache/pekko/rollingupdate/CostStrategy.scala
@@ -13,8 +13,9 @@
package org.apache.pekko.rollingupdate
-import org.apache.pekko.annotation.InternalApi
-import org.apache.pekko.cluster.Member
+import org.apache.pekko
+import pekko.annotation.InternalApi
+import pekko.cluster.Member
import scala.collection.SortedSet
diff --git
a/rolling-update-kubernetes/src/main/scala/org/apache/pekko/rollingupdate/kubernetes/ApiRequests.scala
b/rolling-update-kubernetes/src/main/scala/org/apache/pekko/rollingupdate/kubernetes/ApiRequests.scala
deleted file mode 100644
index d1e8d294..00000000
---
a/rolling-update-kubernetes/src/main/scala/org/apache/pekko/rollingupdate/kubernetes/ApiRequests.scala
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * license agreements; and to You under the Apache License, version 2.0:
- *
- * https://www.apache.org/licenses/LICENSE-2.0
- *
- * This file is part of the Apache Pekko project, which was derived from Akka.
- */
-
-/*
- * Copyright (C) 2017-2021 Lightbend Inc. <https://www.lightbend.com>
- */
-
-package org.apache.pekko.rollingupdate.kubernetes
-
-import org.apache.pekko.annotation.InternalApi
-import org.apache.pekko.http.scaladsl.model.HttpMethods.PATCH
-import org.apache.pekko.http.scaladsl.model.headers.Authorization
-import org.apache.pekko.http.scaladsl.model.headers.OAuth2BearerToken
-import org.apache.pekko.http.scaladsl.model.HttpEntity
-import org.apache.pekko.http.scaladsl.model.HttpRequest
-import org.apache.pekko.http.scaladsl.model.MediaTypes
-import org.apache.pekko.http.scaladsl.model.Uri
-import org.apache.pekko.util.ByteString
-
-import scala.collection.immutable
-
-/**
- * INTERNAL API
- */
-@InternalApi private[kubernetes] object ApiRequests {
-
- def podDeletionCost(settings: KubernetesSettings, apiToken: String,
namespace: String, cost: Int): HttpRequest = {
- val path = Uri.Path.Empty / "api" / "v1" / "namespaces" / namespace /
"pods" / settings.podName
- val scheme = if (settings.secure) "https" else "http"
- val uri = Uri.from(scheme, host = settings.apiServiceHost, port =
settings.apiServicePort).withPath(path)
- val headers = if (settings.secure)
immutable.Seq(Authorization(OAuth2BearerToken(apiToken))) else Nil
-
- HttpRequest(
- method = PATCH,
- uri = uri,
- headers = headers,
- entity = HttpEntity(
- MediaTypes.`application/merge-patch+json`,
- ByteString(
- s"""{"metadata": {"annotations":
{"controller.kubernetes.io/pod-deletion-cost": "$cost" }}}"""
- ))
- )
- }
-
-}
diff --git
a/rolling-update-kubernetes/src/main/scala/org/apache/pekko/rollingupdate/kubernetes/KubernetesApi.scala
b/rolling-update-kubernetes/src/main/scala/org/apache/pekko/rollingupdate/kubernetes/KubernetesApi.scala
new file mode 100644
index 00000000..9397180b
--- /dev/null
+++
b/rolling-update-kubernetes/src/main/scala/org/apache/pekko/rollingupdate/kubernetes/KubernetesApi.scala
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2017-2023 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.rollingupdate.kubernetes
+
+import java.text.Normalizer
+
+import scala.collection.immutable
+import scala.concurrent.Future
+
+import org.apache.pekko
+import pekko.Done
+import pekko.actor.AddressFromURIString
+import pekko.annotation.InternalApi
+import pekko.cluster.UniqueAddress
+
+/**
+ * INTERNAL API
+ *
+ * A PodCost custom resource as read from (or written to) the Kubernetes API server.
+ *
+ * @param version the resource version, needed for subsequent updates of the resource
+ * @param pods    the per-pod cost entries stored in the resource
+ */
+@InternalApi
+private[pekko] final case class PodCostResource(
+    version: String,
+    pods: immutable.Seq[PodCost])
+
+/**
+ * INTERNAL API
+ *
+ * One entry of a PodCost custom resource.
+ *
+ * @param podName name of the pod the cost applies to
+ * @param cost    value for the pod's `controller.kubernetes.io/pod-deletion-cost` annotation
+ * @param address cluster member address in URI form, parsed with `AddressFromURIString`
+ * @param uid     member uid; together with `address` it forms [[uniqueAddress]]
+ * @param time    timestamp of the entry (callers seen so far use epoch milliseconds)
+ */
+@InternalApi
+private[pekko] final case class PodCost(
+    podName: String,
+    cost: Int,
+    address: String,
+    uid: Long,
+    time: Long) {
+
+  /** The cluster member this entry belongs to, parsed on first use and excluded from serialization. */
+  @transient
+  lazy val uniqueAddress: UniqueAddress =
+    UniqueAddress(AddressFromURIString(address), uid)
+}
+
+/**
+ * INTERNAL API
+ *
+ * Base class of the failures reported by [[KubernetesApi]] operations.
+ */
+@InternalApi
+private[pekko] sealed class PodCostException(message: String)
+    extends RuntimeException(message)
+
+/**
+ * INTERNAL API
+ *
+ * The Kubernetes API server did not respond within the configured request timeout.
+ */
+@InternalApi
+private[pekko] final class PodCostTimeoutException(message: String)
+    extends PodCostException(message)
+
+/**
+ * INTERNAL API
+ *
+ * The Kubernetes API server rejected a request with a client error (4xx) status.
+ */
+@InternalApi
+private[pekko] final class PodCostClientException(message: String)
+    extends PodCostException(message)
+
+/**
+ * INTERNAL API
+ */
+@InternalApi private[pekko] object KubernetesApi {
+
+  private val MaxNameLength = 63
+
+  /**
+   * Removes the specified characters from the leading and trailing positions of `name`.
+   */
+  private def trim(name: String, characters: List[Char]): String =
+    name.dropWhile(characters.contains(_)).reverse.dropWhile(characters.contains(_)).reverse
+
+  /**
+   * Makes a name usable as a single DNS label segment, e.g. to derive the PodCost resource name
+   * from the ActorSystem name.
+   *
+   * The name is Unicode-normalized (NFKD) and lower-cased. `_` and `.` become `-`, every other
+   * character outside `[-a-z0-9]` is dropped, and leading/trailing `-` are removed. A leading
+   * letter is not enforced.
+   *
+   * @throws IllegalArgumentException if the resulting name is empty or longer than 63 characters
+   */
+  def makeDNS1039Compatible(name: String): String = {
+    val normalized = trim(
+      Normalizer
+        .normalize(name, Normalizer.Form.NFKD)
+        // Locale.ROOT: under e.g. a Turkish default locale "I" lower-cases to a dotless 'ı',
+        // which the filter below would then silently drop
+        .toLowerCase(java.util.Locale.ROOT)
+        .replaceAll("[_.]", "-")
+        .replaceAll("[^-a-z0-9]", ""),
+      List('-'))
+    // validate the final (trimmed) name, so names that only exceed the limit because of stripped '-' are accepted
+    if (normalized.isEmpty)
+      throw new IllegalArgumentException(s"Resource name derived from [$name] is empty. " +
+        "A custom resource name can be defined in configuration `pekko.rollingupdate.kubernetes.custom-resource.cr-name`.")
+    if (normalized.length > MaxNameLength)
+      throw new IllegalArgumentException(s"Too long resource name [$normalized]. At most 63 characters are accepted. " +
+        "A custom resource name can be defined in configuration `pekko.rollingupdate.kubernetes.custom-resource.cr-name`.")
+    normalized
+  }
+}
+
+/**
+ * INTERNAL API
+ *
+ * The Kubernetes API server operations used to publish pod deletion costs, either directly as a
+ * pod annotation or through a PodCost custom resource.
+ */
+@InternalApi
+private[pekko] trait KubernetesApi {
+
+  /** The namespace in which pods and PodCost resources are accessed. */
+  def namespace: String
+
+  /**
+   * Sets the `controller.kubernetes.io/pod-deletion-cost` annotation of the pod `podName` to `cost`.
+   */
+  def updatePodDeletionCostAnnotation(podName: String, cost: Int): Future[Done]
+
+  /**
+   * Reads a PodCost from the API server. If it doesn't exist it tries to create it.
+   * The creation can fail due to another instance creating it at the same time; in this case
+   * the read is retried.
+   */
+  def readOrCreatePodCostResource(crName: String): Future[PodCostResource]
+
+  /**
+   * Update the named resource.
+   *
+   * Must call [[readOrCreatePodCostResource]] first to get a resource version.
+   *
+   * Can return one of three things:
+   *  - Future failure e.g. timed out waiting for k8s api server to respond
+   *  - Left - Update failed due to version not matching current in the k8s api server. In this case the
+   *    current resource is returned so its version can be used for subsequent calls
+   *  - Right - Success
+   *
+   * Any subsequent updates should also use the latest version or re-read with [[readOrCreatePodCostResource]]
+   */
+  def updatePodCostResource(
+      crName: String,
+      version: String,
+      pods: immutable.Seq[PodCost]): Future[Either[PodCostResource, PodCostResource]]
+
+}
diff --git
a/rolling-update-kubernetes/src/main/scala/org/apache/pekko/rollingupdate/kubernetes/KubernetesApiImpl.scala
b/rolling-update-kubernetes/src/main/scala/org/apache/pekko/rollingupdate/kubernetes/KubernetesApiImpl.scala
new file mode 100644
index 00000000..61437a2b
--- /dev/null
+++
b/rolling-update-kubernetes/src/main/scala/org/apache/pekko/rollingupdate/kubernetes/KubernetesApiImpl.scala
@@ -0,0 +1,389 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2017-2023 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.rollingupdate.kubernetes
+
+import scala.collection.immutable
+import scala.concurrent.ExecutionContext
+import scala.concurrent.Future
+import scala.util.control.NonFatal
+
+import org.apache.pekko
+import pekko.Done
+import pekko.actor.ActorSystem
+import pekko.annotation.InternalApi
+import pekko.dispatch.Dispatchers.DefaultBlockingDispatcherId
+import pekko.event.Logging
+import pekko.event.LoggingAdapter
+import pekko.http.scaladsl.ConnectionContext
+import pekko.http.scaladsl.Http
+import pekko.http.scaladsl.HttpsConnectionContext
+import pekko.http.scaladsl.marshalling.Marshal
+import pekko.http.scaladsl.model.HttpEntity
+import pekko.http.scaladsl.model.HttpMethods.PATCH
+import pekko.http.scaladsl.model.HttpRequest
+import pekko.http.scaladsl.model.MediaTypes
+import pekko.http.scaladsl.model.StatusCodes.ClientError
+import pekko.http.scaladsl.model.Uri
+import pekko.http.scaladsl.model._
+import pekko.http.scaladsl.model.headers.Authorization
+import pekko.http.scaladsl.model.headers.OAuth2BearerToken
+import pekko.http.scaladsl.unmarshalling.Unmarshal
+import pekko.pattern.after
+import pekko.pki.kubernetes.PemManagersProvider
+import pekko.util.ByteString
+
+import java.nio.file.Files
+import java.nio.file.Paths
+import java.security.KeyStore
+import java.security.SecureRandom
+import javax.net.ssl.KeyManager
+import javax.net.ssl.KeyManagerFactory
+import javax.net.ssl.SSLContext
+import javax.net.ssl.TrustManager
+
+/**
+ * INTERNAL API
+ */
+@InternalApi private[pekko] class KubernetesApiImpl(
+ system: ActorSystem,
+ settings: KubernetesSettings,
+ override val namespace: String,
+ apiToken: String,
+ clientHttpsConnectionContext: Option[HttpsConnectionContext])
+ extends KubernetesApi
+ with KubernetesJsonSupport {
+
+ import system.dispatcher
+
+ private implicit val sys: ActorSystem = system
+ private val log = Logging(system, classOf[KubernetesApiImpl])
+ private val http = Http()(system)
+
+ private val scheme = if (settings.secure) "https" else "http"
+ private lazy val headers = if (settings.secure)
immutable.Seq(Authorization(OAuth2BearerToken(apiToken))) else Nil
+
+ log.debug("kubernetes access namespace: {}. Secure: {}", namespace,
settings.secure)
+
+ override def updatePodDeletionCostAnnotation(podName: String, cost: Int):
Future[Done] = {
+ val path = Uri.Path.Empty / "api" / "v1" / "namespaces" / namespace /
"pods" / podName
+ val uri = Uri.from(scheme, host = settings.apiServiceHost, port =
settings.apiServicePort).withPath(path)
+
+ val httpRequest = HttpRequest(
+ method = PATCH,
+ uri = uri,
+ headers = headers,
+ entity = HttpEntity(
+ MediaTypes.`application/merge-patch+json`,
+ ByteString(
+ s"""{"metadata": {"annotations":
{"controller.kubernetes.io/pod-deletion-cost": "$cost" }}}"""
+ ))
+ )
+ val httpResponse = makeRequest(
+ httpRequest,
+ s"Timed out updating pod-deletion-cost annotation for pod: [$podName]
with cost: [$cost]. Namespace: [$namespace]")
+ httpResponse.map {
+ case HttpResponse(status, _, e, _) if status.isSuccess() =>
+ e.discardBytes()
+ Done
+ case HttpResponse(s @ ClientError(_), _, e, _) =>
+ e.discardBytes()
+ throw new PodCostClientException(s.toString())
+ case HttpResponse(status, _, e, _) =>
+ e.discardBytes()
+ throw new PodCostException(s"Request failed with status=$status")
+ }
+ }
+
+ /*
+ PATH: to get all: /apis/pekko.apache.org/v1/namespaces/<namespace>/podcosts
+ PATH: to get a specific one:
/apis/pekko.apache.org/v1/namespaces/<namespace>/podcosts/<system-name>
+ curl -v -X POST
localhost:8080/apis/pekko.apache.org/v1/namespaces/<namespace>/podcosts/ -H
"Content-Type: application/yaml" --data-binary "@pod-cost-example.yml"
+
+ responds with either:
+ 409 Conflict Already Exists
+
+ OR
+
+ 201 Created if it works
+ */
+ override def readOrCreatePodCostResource(crName: String):
Future[PodCostResource] = {
+ val maxTries = 5
+
+ def loop(tries: Int = 0): Future[PodCostResource] = {
+ log.debug("Trying to create PodCost {}", tries)
+ for {
+ oldResource <- getPodCostResource(crName)
+ lr <- oldResource match {
+ case Some(found) =>
+ log.debug("{} already exists. Returning {}", crName, found)
+ Future.successful(found)
+ case None =>
+ log.info("PodCost {} does not exist, creating", crName)
+ createPodCostResource(crName).flatMap {
+ case Some(created) => Future.successful(created)
+ case None =>
+ if (tries < maxTries) loop(tries + 1)
+ else Future.failed(new PodCostException(s"Unable to create or
read PodCost after $maxTries tries"))
+ }
+ }
+ } yield lr
+ }
+
+ loop()
+ }
+
+ /*
+curl -v -X PUT
localhost:8080/apis/pekko.apache.org/v1/namespaces/<namespace>/podcosts/<system-name>
--data-binary "@pod-cost-example.yml" -H "Content-Type: application/yaml"
+PUTs must contain resourceVersions. Response:
+409: Resource version is out of date
+200 if it is updated
+ */
+ /**
+ * Update the named resource.
+ */
+ override def updatePodCostResource(
+ crName: String,
+ version: String,
+ pods: immutable.Seq[PodCost]): Future[Either[PodCostResource,
PodCostResource]] = {
+ val cr = PodCostCustomResource(Metadata(crName, Some(version)), Spec(pods))
+ for {
+ entity <- Marshal(cr).to[RequestEntity]
+ response <- {
+ log.debug("updating {} to {}", crName, cr)
+ makeRequest(
+ requestForPath(pathForPodCostResource(crName), method =
HttpMethods.PUT, entity),
+ s"Timed out updating PodCost [$crName]. It is not known if the
update happened"
+ )
+ }
+ result <- response.status match {
+ case StatusCodes.OK =>
+ Unmarshal(response.entity)
+ .to[PodCostCustomResource]
+ .map(updatedCr => {
+ log.debug("CR after update: {}", updatedCr)
+ Right(toPodCostResource(updatedCr))
+ })
+ case StatusCodes.Conflict =>
+ getPodCostResource(crName).map {
+ case None =>
+ throw new PodCostException(s"GET after PUT conflict did not
return a PodCost [$crName]")
+ case Some(cr) =>
+ log.debug("PodCostResource read after conflict: {}", cr)
+ Left(cr)
+ }
+ case StatusCodes.Unauthorized =>
+ handleUnauthorized(response)
+ case unexpected =>
+ Unmarshal(response.entity)
+ .to[String]
+ .map(body =>
+ throw new PodCostException(
+ s"PUT for PodCost [$crName] returned unexpected status code
$unexpected. Body: $body"))
+ }
+ } yield result
+ }
+
+ private[pekko] def removePodCostResource(crName: String): Future[Done] = {
+ for {
+ response <- makeRequest(
+ requestForPath(pathForPodCostResource(crName), HttpMethods.DELETE),
+ s"Timed out removing PodCost [$crName]. It is not known if the remove
happened")
+
+ result <- response.status match {
+ case StatusCodes.OK =>
+ log.debug("PodCost deleted [{}]", crName)
+ response.discardEntityBytes()
+ Future.successful(Done)
+ case StatusCodes.NotFound =>
+ log.debug("PodCost already deleted [{}]", crName)
+ response.discardEntityBytes()
+ Future.successful(Done) // already deleted
+ case StatusCodes.Unauthorized =>
+ handleUnauthorized(response)
+ case unexpected =>
+ Unmarshal(response.entity)
+ .to[String]
+ .map(body =>
+ throw new PodCostException(
+ s"Unexpected status code when deleting PodCost. Status:
$unexpected. Body: $body"))
+ }
+ } yield result
+ }
+
+ private def getPodCostResource(crName: String):
Future[Option[PodCostResource]] = {
+ val fResponse =
makeRequest(requestForPath(pathForPodCostResource(crName)), s"Timed out reading
PodCost [$crName]")
+ for {
+ response <- fResponse
+ entity <- response.entity.toStrict(settings.bodyReadTimeout)
+ lr <- response.status match {
+ case StatusCodes.OK =>
+ log.debug("Resource [{}] exists: {}", crName, entity)
+ Unmarshal(entity).to[PodCostCustomResource].map(cr =>
Some(toPodCostResource(cr)))
+ case StatusCodes.NotFound =>
+ response.discardEntityBytes()
+ log.debug("Resource [{}] does not exist", crName)
+ Future.successful(None)
+ case StatusCodes.Unauthorized =>
+ handleUnauthorized(response)
+ case unexpected =>
+ Unmarshal(response.entity)
+ .to[String]
+ .map(body =>
+ throw new PodCostException(
+ s"Unexpected response from API server when retrieving PodCost
StatusCode: $unexpected. Body: $body"))
+ }
+ } yield lr
+ }
+
+ private def handleUnauthorized(response: HttpResponse) = {
+ Unmarshal(response.entity)
+ .to[String]
+ .map(body =>
+ throw new PodCostException(
+ "Unauthorized to communicate with Kubernetes API server. See " +
+
"https://pekko.apache.org/docs/pekko-management/current/rolling-updates.html#role-based-access-control
" +
+ s"for setting up access control. Body: $body"))
+ }
+
+  /**
+   * API path of the PodCost resource `crName` in this client's namespace.
+   *
+   * The name is sanitized first: every character other than digits, word characters, `-` and `.`
+   * is dropped, and the result is lower-cased.
+   */
+  private def pathForPodCostResource(crName: String): Uri.Path = {
+    val sanitizedName = crName.replaceAll("[^\\d\\w\\-\\.]", "").toLowerCase
+    Uri.Path.Empty / "apis" / "pekko.apache.org" / "v1" / "namespaces" / namespace / "podcosts" / sanitizedName
+  }
+
+  /**
+   * Builds a request for `path` on the configured API server (`apiServiceHost`:`apiServicePort`).
+   * The request carries this client's `headers`. Without overrides it is a GET with an empty body.
+   */
+  private def requestForPath(
+      path: Uri.Path,
+      method: HttpMethod = HttpMethods.GET,
+      entity: RequestEntity = HttpEntity.Empty): HttpRequest =
+    HttpRequest(
+      method = method,
+      uri = Uri.from(scheme = scheme, host = settings.apiServiceHost, port = settings.apiServicePort).withPath(path),
+      headers = headers,
+      entity = entity)
+
+  /**
+   * Sends `request` and buffers the complete response body. The client HTTPS connection context
+   * is used when one is configured.
+   *
+   * If no strict response is available within `apiServiceRequestTimeout`, the returned Future
+   * fails with a [[PodCostTimeoutException]] whose message is built from `timeoutMsg`.
+   */
+  private def makeRequest(request: HttpRequest, timeoutMsg: String): Future[HttpResponse] = {
+    val sent =
+      clientHttpsConnectionContext.fold(http.singleRequest(request))(ctx => http.singleRequest(request, ctx))
+
+    // toStrict always drains the body, even when the deadline below wins the race
+    val strict = sent.flatMap(_.toStrict(settings.bodyReadTimeout))
+
+    val deadline = after(settings.apiServiceRequestTimeout, using = system.scheduler)(
+      Future.failed(new PodCostTimeoutException(s"$timeoutMsg. Is the API server up?")))
+
+    Future.firstCompletedOf(List(strict, deadline))
+  }
+
+  /**
+   * Converts the wire representation into a [[PodCostResource]]. The `resourceVersion` reported
+   * by the API server is kept together with the pod entries.
+   *
+   * @throws IllegalArgumentException if the resource was returned without a `resourceVersion`
+   */
+  private def toPodCostResource(cr: PodCostCustomResource): PodCostResource = {
+    log.debug("Converting {}", cr)
+    val version = cr.metadata.resourceVersion.getOrElse(
+      throw new IllegalArgumentException(
+        s"requirement failed: PodCostCustomResource returned from Kubernetes without a resourceVersion: $cr"))
+    PodCostResource(version, cr.spec.pods)
+  }
+
+  /**
+   * Creates an empty PodCost custom resource named `crName`.
+   *
+   * @return `Some(resource)` when the API server answers `201 Created`. Returns `None` on
+   *         `409 Conflict`, meaning the resource was created concurrently and the caller should
+   *         read it again. A `401` or any other status fails the returned Future with a
+   *         [[PodCostException]].
+   */
+  private def createPodCostResource(crName: String): Future[Option[PodCostResource]] = {
+    val cr = PodCostCustomResource(Metadata(crName, None), Spec(pods = Vector.empty))
+    for {
+      entity <- Marshal(cr).to[RequestEntity]
+      response <- makeRequest(
+        requestForPath(pathForPodCostResource(crName), HttpMethods.POST, entity = entity),
+        s"Timed out creating PodCost $crName")
+      responseEntity <- response.entity.toStrict(settings.bodyReadTimeout)
+      resource <- response.status match {
+        case StatusCodes.Created =>
+          log.debug("PodCost resource created")
+          Unmarshal(responseEntity).to[PodCostCustomResource].map(created => Some(toPodCostResource(created)))
+        case StatusCodes.Conflict =>
+          log.debug("creation of PodCost resource failed as already exists. Will attempt to read again")
+          // Discard the body of the *response*. `entity` is the request payload we sent,
+          // not something received from the server.
+          responseEntity.discardBytes()
+          // someone else has created it
+          Future.successful(None)
+        case StatusCodes.Unauthorized =>
+          handleUnauthorized(response)
+        case unexpected =>
+          Unmarshal(responseEntity)
+            .to[String]
+            .map(body =>
+              throw new PodCostException(
+                s"Unexpected response from API server when creating PodCost StatusCode: $unexpected. Body: $body"))
+      }
+    } yield resource
+  }
+
+}
+
+/**
+ * INTERNAL API
+ *
+ * Factory for [[KubernetesApiImpl]]. Reading the token and namespace files and building the TLS
+ * context all run in Futures on the `DefaultBlockingDispatcherId` dispatcher.
+ */
+@InternalApi private[pekko] object KubernetesApiImpl {
+
+  /**
+   * Performs the startup setup steps one after another, then constructs the client.
+   *
+   * The steps are:
+   *  - read the API token (empty string when it cannot be read);
+   *  - resolve the namespace (configured value, else the namespace file, else `"default"`);
+   *  - build the optional client HTTPS context.
+   */
+  def apply(log: LoggingAdapter, k8sSettings: KubernetesSettings)(
+      implicit system: ActorSystem): Future[KubernetesApiImpl] = {
+    implicit val blockingEc: ExecutionContext = system.dispatchers.lookup(DefaultBlockingDispatcherId)
+
+    Future(readConfigVarFromFilesystem(k8sSettings.apiTokenPath, "api-token", log).getOrElse("")).flatMap {
+      apiToken =>
+        Future {
+          k8sSettings.namespace
+            .orElse(readConfigVarFromFilesystem(k8sSettings.namespacePath, "namespace", log))
+            .getOrElse("default")
+        }.flatMap { podNamespace =>
+          Future(clientHttpsConnectionContext(k8sSettings)).map { httpsContext =>
+            new KubernetesApiImpl(system, k8sSettings, podNamespace, apiToken, httpsContext)
+          }
+        }
+    }
+  }
+
+  /**
+   * Reads the whole file at `path` as UTF-8 text.
+   *
+   * Returns `None` if the file is missing (logged as a warning) or unreadable (logged as an error).
+   * This is blocking IO and must only be called at startup from the blocking dispatcher.
+   */
+  private def readConfigVarFromFilesystem(path: String, name: String, log: LoggingAdapter): Option[String] = {
+    val file = Paths.get(path)
+    if (!Files.exists(file)) {
+      log.warning("Unable to read {} from {} because it doesn't exist.", name, path)
+      None
+    } else {
+      try Some(new String(Files.readAllBytes(file), "utf-8"))
+      catch {
+        case NonFatal(e) =>
+          log.error(e, "Error reading {} from {}", name, path)
+          None
+      }
+    }
+  }
+
+  /**
+   * Builds a TLSv1.2 client context that trusts the CA certificates at `apiCaPath`.
+   *
+   * Returns `None` when `secure` is off. Loading the certificates is blocking IO, so this must only
+   * be called at startup from the blocking dispatcher.
+   */
+  private def clientHttpsConnectionContext(k8sSettings: KubernetesSettings): Option[HttpsConnectionContext] =
+    if (!k8sSettings.secure) None
+    else {
+      val certificates = PemManagersProvider.loadCertificates(k8sSettings.apiCaPath)
+      val keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm)
+      // an empty PKCS12 key store: no client certificate is presented
+      val emptyKeyStore = KeyStore.getInstance("PKCS12")
+      emptyKeyStore.load(null)
+      keyManagerFactory.init(emptyKeyStore, Array.empty)
+      val keyManagers: Array[KeyManager] = keyManagerFactory.getKeyManagers
+      val trustManagers: Array[TrustManager] = PemManagersProvider.buildTrustManagers(certificates)
+      val tlsContext = SSLContext.getInstance("TLSv1.2")
+      tlsContext.init(keyManagers, trustManagers, new SecureRandom)
+      Some(ConnectionContext.httpsClient(tlsContext))
+    }
+}
diff --git
a/rolling-update-kubernetes/src/main/scala/org/apache/pekko/rollingupdate/kubernetes/KubernetesJsonSupport.scala
b/rolling-update-kubernetes/src/main/scala/org/apache/pekko/rollingupdate/kubernetes/KubernetesJsonSupport.scala
new file mode 100644
index 00000000..e46eb427
--- /dev/null
+++
b/rolling-update-kubernetes/src/main/scala/org/apache/pekko/rollingupdate/kubernetes/KubernetesJsonSupport.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2017-2023 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.rollingupdate.kubernetes
+
+import scala.collection.immutable
+
+import org.apache.pekko.annotation.InternalApi
+import org.apache.pekko.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
+import spray.json.DefaultJsonProtocol
+import spray.json.JsonFormat
+import spray.json.RootJsonFormat
+
+/**
+ * INTERNAL API
+ *
+ * Wire format of a `PodCost` custom resource. The defaults are the resource kind and the
+ * `pekko.apache.org/v1` API version.
+ */
+@InternalApi
+case class PodCostCustomResource(
+    metadata: Metadata,
+    spec: Spec,
+    kind: String = "PodCost",
+    apiVersion: String = "pekko.apache.org/v1")
+
+/**
+ * INTERNAL API
+ *
+ * The subset of Kubernetes object metadata used for PodCost resources. `resourceVersion` is left
+ * as `None` when a new resource is created.
+ */
+@InternalApi
+case class Metadata(
+    name: String,
+    resourceVersion: Option[String])
+
+/**
+ * INTERNAL API
+ *
+ * Spec of a PodCost resource: the per-pod cost entries.
+ */
+@InternalApi
+case class Spec(
+    pods: immutable.Seq[PodCost])
+
+/**
+ * INTERNAL API
+ *
+ * spray-json formats for the PodCost custom resource model. They are mixed with Pekko HTTP's
+ * [[SprayJsonSupport]] so the model can be (un)marshalled by Pekko HTTP.
+ *
+ * NOTE: declaration order matters. A derived format such as `specFormat` resolves the formats it
+ * depends on while the trait is being initialized. A format must therefore be declared before its
+ * first use; otherwise it could capture a not-yet-initialized (null) value.
+ */
+@InternalApi
+trait KubernetesJsonSupport extends SprayJsonSupport with DefaultJsonProtocol {
+ implicit val metadataFormat: JsonFormat[Metadata] =
jsonFormat2(Metadata.apply)
+ implicit val podCostFormat: JsonFormat[PodCost] = jsonFormat5(PodCost.apply)
+ // relies on podCostFormat (declared above) for the `pods` field
+ implicit val specFormat: JsonFormat[Spec] = jsonFormat1(Spec.apply)
+ // root format of the whole resource; relies on metadataFormat and specFormat (declared above)
+ implicit val podCostCustomResourceFormat:
RootJsonFormat[PodCostCustomResource] = jsonFormat4(
+ PodCostCustomResource.apply)
+}
diff --git
a/rolling-update-kubernetes/src/main/scala/org/apache/pekko/rollingupdate/kubernetes/KubernetesSettings.scala
b/rolling-update-kubernetes/src/main/scala/org/apache/pekko/rollingupdate/kubernetes/KubernetesSettings.scala
index 0bee4cf4..aa5249ce 100644
---
a/rolling-update-kubernetes/src/main/scala/org/apache/pekko/rollingupdate/kubernetes/KubernetesSettings.scala
+++
b/rolling-update-kubernetes/src/main/scala/org/apache/pekko/rollingupdate/kubernetes/KubernetesSettings.scala
@@ -8,12 +8,16 @@
*/
/*
- * Copyright (C) 2017-2021 Lightbend Inc. <https://www.lightbend.com>
+ * Copyright (C) 2017-2023 Lightbend Inc. <https://www.lightbend.com>
*/
package org.apache.pekko.rollingupdate.kubernetes
-import org.apache.pekko.annotation.InternalApi
+import scala.concurrent.duration.{ DurationInt, FiniteDuration }
+import scala.jdk.DurationConverters._
+
+import org.apache.pekko
+import pekko.annotation.InternalApi
import com.typesafe.config.Config
/**
@@ -33,6 +37,17 @@ private[kubernetes] object KubernetesSettings {
}
def apply(config: Config): KubernetesSettings = {
+ val crName = config.getString("custom-resource.cr-name") match {
+ case "" => None
+ case name => Some(name)
+ }
+
+ val customResourceSettings = new CustomResourceSettings(
+ enabled = config.getBoolean("custom-resource.enabled"),
+ crName = crName,
+ cleanupAfter =
config.getDuration("custom-resource.cleanup-after").toScala
+ )
+
new KubernetesSettings(
config.getString("api-ca-path"),
config.getString("api-token-path"),
@@ -41,7 +56,9 @@ private[kubernetes] object KubernetesSettings {
config.optDefinedValue("namespace"),
config.getString("namespace-path"),
config.getString("pod-name"),
- config.getBoolean("secure-api-server")
+ config.getBoolean("secure-api-server"),
+ config.getDuration("api-service-request-timeout").toScala,
+ customResourceSettings
)
}
}
@@ -58,4 +75,18 @@ private[kubernetes] class KubernetesSettings(
val namespace: Option[String],
val namespacePath: String,
val podName: String,
- val secure: Boolean)
+ val secure: Boolean,
+ val apiServiceRequestTimeout: FiniteDuration,
+ val customResourceSettings: CustomResourceSettings,
+ val bodyReadTimeout: FiniteDuration = 1.second
+)
+
+/**
+ * INTERNAL API
+ *
+ * Settings read from the `custom-resource` config section.
+ *
+ * @param enabled      when true, pod costs are written to a PodCost custom resource instead of the
+ *                     pod-deletion-cost annotation
+ * @param crName       explicit resource name; `None` when `cr-name` is configured as an empty string
+ * @param cleanupAfter minimum age an entry must reach before it can be pruned from the resource
+ *                     because its member is no longer in the cluster
+ */
+@InternalApi
+private[kubernetes] class CustomResourceSettings(
+    val enabled: Boolean,
+    val crName: Option[String],
+    val cleanupAfter: FiniteDuration)
diff --git
a/rolling-update-kubernetes/src/main/scala/org/apache/pekko/rollingupdate/kubernetes/PodDeletionCost.scala
b/rolling-update-kubernetes/src/main/scala/org/apache/pekko/rollingupdate/kubernetes/PodDeletionCost.scala
index 3c1f7817..4e244a48 100644
---
a/rolling-update-kubernetes/src/main/scala/org/apache/pekko/rollingupdate/kubernetes/PodDeletionCost.scala
+++
b/rolling-update-kubernetes/src/main/scala/org/apache/pekko/rollingupdate/kubernetes/PodDeletionCost.scala
@@ -8,32 +8,24 @@
*/
/*
- * Copyright (C) 2017-2021 Lightbend Inc. <https://www.lightbend.com>
+ * Copyright (C) 2017-2023 Lightbend Inc. <https://www.lightbend.com>
*/
package org.apache.pekko.rollingupdate.kubernetes
-import org.apache.pekko.actor.ActorSystem
-import org.apache.pekko.actor.ClassicActorSystemProvider
-import org.apache.pekko.actor.ExtendedActorSystem
-import org.apache.pekko.actor.Extension
-import org.apache.pekko.actor.ExtensionId
-import org.apache.pekko.actor.ExtensionIdProvider
-import org.apache.pekko.actor.Props
-import org.apache.pekko.annotation.InternalApi
-import org.apache.pekko.dispatch.Dispatchers.DefaultBlockingDispatcherId
-import org.apache.pekko.event.Logging
-import
org.apache.pekko.rollingupdate.kubernetes.PodDeletionCost.Internal.BootstrapStep
-import
org.apache.pekko.rollingupdate.kubernetes.PodDeletionCost.Internal.Initializing
-import
org.apache.pekko.rollingupdate.kubernetes.PodDeletionCost.Internal.NotRunning
-
-import java.nio.file.Files
-import java.nio.file.Paths
-import java.util.concurrent.atomic.AtomicReference
+import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.ExecutionContext
-import scala.concurrent.Future
import scala.util.control.NonFatal
+import org.apache.pekko
+import pekko.actor.ActorSystem
+import pekko.actor.ClassicActorSystemProvider
+import pekko.actor.ExtendedActorSystem
+import pekko.actor.Extension
+import pekko.actor.ExtensionId
+import pekko.actor.ExtensionIdProvider
+import pekko.event.Logging
+
final class PodDeletionCost(implicit system: ExtendedActorSystem) extends
Extension {
private val log = Logging(system, classOf[PodDeletionCost])
@@ -41,9 +33,11 @@ final class PodDeletionCost(implicit system:
ExtendedActorSystem) extends Extens
private val config = system.settings.config.getConfig(configPath)
private val k8sSettings = KubernetesSettings(config)
private val costSettings = PodDeletionCostSettings(config)
+ implicit private val ec: ExecutionContext = system.dispatcher
+
log.debug("Settings {}", k8sSettings)
- private final val startStep = new AtomicReference[BootstrapStep](NotRunning)
+ private final val startStep = new AtomicBoolean(false)
def start(): Unit = {
if (k8sSettings.podName.isEmpty) {
@@ -51,42 +45,32 @@ final class PodDeletionCost(implicit system:
ExtendedActorSystem) extends Extens
"No configuration found to extract the pod name from. " +
s"Be sure to provide the pod name with `$configPath.pod-name` " +
"or by setting ENV variable `KUBERNETES_POD_NAME`.")
- } else if (startStep.compareAndSet(NotRunning, Initializing)) {
- log.debug("Starting PodDeletionCost for podName={} with settings={}",
k8sSettings.podName, costSettings)
-
- implicit val blockingDispatcher: ExecutionContext =
system.dispatchers.lookup(DefaultBlockingDispatcherId)
- val props = for {
- apiToken: String <- Future {
readConfigVarFromFilesystem(k8sSettings.apiTokenPath,
"api-token").getOrElse("") }
- podNamespace: String <- Future {
- k8sSettings.namespace
- .orElse(readConfigVarFromFilesystem(k8sSettings.namespacePath,
"namespace"))
- .getOrElse("default")
- }
- } yield Props(classOf[PodDeletionCostAnnotator], k8sSettings, apiToken,
podNamespace, costSettings)
+ } else if (startStep.compareAndSet(false, true)) {
+ val props = KubernetesApiImpl(log, k8sSettings).map { kubernetesApi =>
+ val crName =
+ if (k8sSettings.customResourceSettings.enabled) {
+ val name =
+
k8sSettings.customResourceSettings.crName.getOrElse(KubernetesApi.makeDNS1039Compatible(system.name))
+ log.info(
+ "Starting PodDeletionCost for podName [{}], [{}] oldest will be
written to CR [{}].",
+ k8sSettings.podName,
+ costSettings.annotatedPodsNr,
+ name)
+ Some(name)
+ } else {
+ log.info(
+ "Starting PodDeletionCost for podName [{}], [{}] oldest will be
annotated.",
+ k8sSettings.podName,
+ costSettings.annotatedPodsNr)
+ None
+ }
+ PodDeletionCostAnnotator.props(k8sSettings, costSettings,
kubernetesApi, crName)
+ }
props.foreach(system.systemActorOf(_, "podDeletionCostAnnotator"))
} else log.warning("PodDeletionCost extension already initiated, yet
start() method was called again. Ignoring.")
}
- /**
- * This uses blocking IO, and so should only be used to read configuration
at startup.
- */
- private def readConfigVarFromFilesystem(path: String, name: String):
Option[String] = {
- val file = Paths.get(path)
- if (Files.exists(file)) {
- try {
- Some(new String(Files.readAllBytes(file), "utf-8"))
- } catch {
- case NonFatal(e) =>
- log.error(e, "Error reading {} from {}", name, path)
- None
- }
- } else {
- log.warning("Unable to read {} from {} because it doesn't exist.", name,
path)
- None
- }
- }
-
// autostart if the extension is loaded through the config extension list
private val autostart =
system.settings.config.getStringList("pekko.extensions").contains(classOf[PodDeletionCost].getName)
@@ -112,13 +96,4 @@ object PodDeletionCost extends ExtensionId[PodDeletionCost]
with ExtensionIdProv
+  /** Creates the extension, passing `system` explicitly to the class's implicit constructor parameter. */
override def createExtension(system: ExtendedActorSystem): PodDeletionCost =
new PodDeletionCost()(system)
- /**
- * INTERNAL API
- */
- @InternalApi private[kubernetes] object Internal {
- sealed trait BootstrapStep
- case object NotRunning extends BootstrapStep
- case object Initializing extends BootstrapStep
- }
-
}
diff --git
a/rolling-update-kubernetes/src/main/scala/org/apache/pekko/rollingupdate/kubernetes/PodDeletionCostAnnotator.scala
b/rolling-update-kubernetes/src/main/scala/org/apache/pekko/rollingupdate/kubernetes/PodDeletionCostAnnotator.scala
index b0873e59..70b36825 100644
---
a/rolling-update-kubernetes/src/main/scala/org/apache/pekko/rollingupdate/kubernetes/PodDeletionCostAnnotator.scala
+++
b/rolling-update-kubernetes/src/main/scala/org/apache/pekko/rollingupdate/kubernetes/PodDeletionCostAnnotator.scala
@@ -8,90 +8,66 @@
*/
/*
- * Copyright (C) 2017-2021 Lightbend Inc. <https://www.lightbend.com>
+ * Copyright (C) 2017-2023 Lightbend Inc. <https://www.lightbend.com>
*/
package org.apache.pekko.rollingupdate.kubernetes
-import org.apache.pekko.actor.Actor
-import org.apache.pekko.actor.ActorLogging
-import org.apache.pekko.actor.ActorSystem
-import org.apache.pekko.actor.Timers
-import org.apache.pekko.annotation.InternalApi
-import org.apache.pekko.cluster.Cluster
-import org.apache.pekko.cluster.ClusterEvent
-import org.apache.pekko.cluster.Member
-import org.apache.pekko.event.Logging.InfoLevel
-import org.apache.pekko.event.Logging.WarningLevel
-import org.apache.pekko.http.scaladsl.ConnectionContext
-import org.apache.pekko.http.scaladsl.Http
-import org.apache.pekko.http.scaladsl.HttpsConnectionContext
-import org.apache.pekko.http.scaladsl.model.StatusCodes.ClientError
-import org.apache.pekko.http.scaladsl.model._
-import org.apache.pekko.pattern.pipe
-import org.apache.pekko.pki.kubernetes.PemManagersProvider
-import org.apache.pekko.rollingupdate.OlderCostsMore
-import
org.apache.pekko.rollingupdate.kubernetes.PodDeletionCostAnnotator.GiveUp
-import
org.apache.pekko.rollingupdate.kubernetes.PodDeletionCostAnnotator.PodAnnotated
-import
org.apache.pekko.rollingupdate.kubernetes.PodDeletionCostAnnotator.RetryAnnotate
-import
org.apache.pekko.rollingupdate.kubernetes.PodDeletionCostAnnotator.RetryTimerId
-import
org.apache.pekko.rollingupdate.kubernetes.PodDeletionCostAnnotator.ScheduleRetry
-import
org.apache.pekko.rollingupdate.kubernetes.PodDeletionCostAnnotator.toResult
-import com.typesafe.config.Config
-
-import java.security.KeyStore
-import java.security.SecureRandom
+import java.util.concurrent.ThreadLocalRandom
import java.util.concurrent.TimeUnit
-import javax.net.ssl.KeyManager
-import javax.net.ssl.KeyManagerFactory
-import javax.net.ssl.SSLContext
-import javax.net.ssl.TrustManager
+
import scala.collection.immutable
import scala.collection.immutable.SortedSet
-import scala.concurrent.ExecutionContextExecutor
+import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.duration.DurationLong
import scala.concurrent.duration.FiniteDuration
import scala.util.control.NonFatal
+import org.apache.pekko
+import pekko.Done
+import pekko.actor.Actor
+import pekko.actor.ActorLogging
+import pekko.actor.ActorSystem
+import pekko.actor.Props
+import pekko.actor.Status
+import pekko.actor.Timers
+import pekko.annotation.InternalApi
+import pekko.cluster.Cluster
+import pekko.cluster.ClusterEvent
+import pekko.cluster.Member
+import pekko.cluster.UniqueAddress
+import pekko.event.Logging.InfoLevel
+import pekko.event.Logging.WarningLevel
+import pekko.pattern.pipe
+import pekko.rollingupdate.OlderCostsMore
+import com.typesafe.config.Config
+
/**
* INTERNAL API
*
- * Actor responsible to annotate the hosting pod with the pod-deletion-cost.
+ * Actor responsible to annotate the hosting pod with the pod-deletion-cost or
+ * update the PodCost CR depending on configuration.
* It will automatically retry upon a fixed-configurable delay if the
annotation fails.
*/
@InternalApi private[kubernetes] final class PodDeletionCostAnnotator(
settings: KubernetesSettings,
- apiToken: String,
- podNamespace: String,
- costSettings: PodDeletionCostSettings)
+ costSettings: PodDeletionCostSettings,
+ kubernetesApi: KubernetesApi,
+ crName: Option[String])
extends Actor
with ActorLogging
with Timers {
+ import PodDeletionCostAnnotator._
+
+ private val podName = settings.podName
+ private val resourceLogDescription = if (crName.isDefined) "PodCost CR" else
"pod-deletion-cost annotation"
+
private val cluster = Cluster(context.system)
- private val http = Http()(context.system)
Cluster(context.system).subscribe(context.self,
classOf[ClusterEvent.MemberUp], classOf[ClusterEvent.MemberRemoved])
- private lazy val sslContext = {
- val certificates = PemManagersProvider.loadCertificates(settings.apiCaPath)
- val factory =
KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm)
- val keyStore = KeyStore.getInstance("PKCS12")
- keyStore.load(null)
- factory.init(keyStore, Array.empty)
- val km: Array[KeyManager] = factory.getKeyManagers
- val tm: Array[TrustManager] =
- PemManagersProvider.buildTrustManagers(certificates)
- val random: SecureRandom = new SecureRandom
- val sslContext = SSLContext.getInstance("TLSv1.2")
- sslContext.init(km, tm, random)
- sslContext
- }
- private val clientSslContext: Option[HttpsConnectionContext] =
- if (settings.secure) Some(ConnectionContext.httpsClient(sslContext)) else
None
-
- implicit val dispatcher: ExecutionContextExecutor = context.system.dispatcher
- def receive = idle(0, SortedSet.empty(Member.ageOrdering), 0)
+ def receive: Receive = idle(0, SortedSet.empty(Member.ageOrdering), 0)
private def idle(deletionCost: Int, membersByAgeDesc: SortedSet[Member],
retryNr: Int): Receive = {
case cs @ ClusterEvent.CurrentClusterState(members, _, _, _, _) =>
@@ -107,8 +83,8 @@ import scala.util.control.NonFatal
updateIfNewCost(deletionCost, membersByAgeDesc - m, retryNr)
case PodAnnotated =>
- log.debug("Annotation updated successfully to {}", deletionCost)
- // cancelling an eventual retry in case the annotation succeeded in the
meantime
+ log.debug("{} updated successfully to [{}]", resourceLogDescription,
deletionCost)
+ // cancelling an eventual retry in case the operation succeeded in the
meantime
timers.cancel(RetryTimerId)
context.become(idle(deletionCost, membersByAgeDesc, 0))
@@ -116,17 +92,30 @@ import scala.util.control.NonFatal
val ll = if (retryNr < 3) InfoLevel else WarningLevel
log.log(
ll,
- s"Failed to update annotation: [$ex]. Scheduled retry with fixed delay
of ${costSettings.retryDelay}, retry number $retryNr.")
+ s"Failed to update $resourceLogDescription: [$ex]. Scheduled retry
with fixed delay of ${costSettings.retryDelay}, retry number $retryNr.")
- timers.startSingleTimer(RetryTimerId, RetryAnnotate,
costSettings.retryDelay)
+ val retryDelay =
+ if (crName.isDefined)
+ // add some random delay to minimize risk of conflicts
+ (costSettings.retryDelay.toMillis * (1 +
ThreadLocalRandom.current().nextDouble(0.1))).toLong.milliseconds
+ else
+ costSettings.retryDelay
+ timers.startSingleTimer(RetryTimerId, RetryAnnotate, retryDelay)
context.become(underRetryBackoff(membersByAgeDesc, retryNr))
case GiveUp(er: String) =>
log.error(
- "There was a client error when trying to set pod-deletion-cost
annotation. " +
+ "There was a client error when trying to set {}. " +
"Not retrying, check configuration. Error: {}",
+ resourceLogDescription,
er)
+ case Status.Failure(exc) =>
+ throw new IllegalStateException(
+ "Unexpected failure, Future failure should have been recovered " +
+ "to message before pipeTo self. This is a bug.",
+ exc)
+
case msg => log.debug("Ignoring message {}", msg)
}
@@ -142,6 +131,12 @@ import scala.util.control.NonFatal
case RetryAnnotate =>
updateIfNewCost(Int.MinValue, membersByAgeDesc, retryNr + 1)
+ case Status.Failure(exc) =>
+ throw new IllegalStateException(
+ "Unexpected failure, Future failure should have been recovered " +
+ "to message before pipeTo self. This is a bug.",
+ exc)
+
case msg => log.debug("Under retry backoff, ignoring message {}", msg)
}
@@ -158,18 +153,27 @@ import scala.util.control.NonFatal
if (newCost != existingCost) {
log.info(
- "Updating pod-deletion-cost annotation for pod: [{}] with cost: [{}].
Namespace: [{}]",
- settings.podName,
+ "Updating {} for pod: [{}] with cost: [{}]. Namespace: [{}]",
+ resourceLogDescription,
+ podName,
newCost,
- podNamespace
+ kubernetesApi.namespace
)
- val request = ApiRequests.podDeletionCost(settings, apiToken,
podNamespace, newCost)
- val response =
- clientSslContext.map(http.singleRequest(request,
_)).getOrElse(http.singleRequest(request))
- toResult(response)(context.system).pipeTo(self)
+ implicit val dispatcher: ExecutionContext = context.system.dispatcher
+ updatePodCost(
+ kubernetesApi,
+ crName,
+ podName,
+ newCost,
+ cluster.selfUniqueAddress,
+ membersByAgeDesc,
+
settings.customResourceSettings.cleanupAfter)(context.system).pipeTo(self)
+
context.become(idle(newCost, membersByAgeDesc, retryNr))
- } else context.become(idle(existingCost, membersByAgeDesc, retryNr))
+ } else {
+ context.become(idle(existingCost, membersByAgeDesc, retryNr))
+ }
}
}
@@ -206,23 +210,72 @@ import scala.util.control.NonFatal
case class ScheduleRetry(cause: String) extends RequestResult
case class GiveUp(cause: String) extends RequestResult
- private[kubernetes] def toResult(futResponse: Future[HttpResponse])(
+  /**
+   * Props for the annotator.
+   *
+   * @param crName `Some(name)` makes the actor maintain that PodCost custom resource; `None` makes it
+   *               set the pod-deletion-cost annotation on the pod directly
+   */
+  def props(
+      settings: KubernetesSettings,
+      costSettings: PodDeletionCostSettings,
+      kubernetesApi: KubernetesApi,
+      crName: Option[String]): Props = {
+    // the creator is passed by-name, so each incarnation gets a freshly constructed annotator
+    Props(new PodDeletionCostAnnotator(settings, costSettings, kubernetesApi, crName))
+  }
+
+  /**
+   * Publishes `newCost` for `podName` and maps the outcome to a [[RequestResult]].
+   *
+   * Without a CR name, the pod-deletion-cost annotation is updated directly. With a CR name, the
+   * PodCost resource is read (or created) and this pod's entry is replaced. Entries are pruned when
+   * they belong to the same actor system, are older than `cleanupAfter`, and have a unique address
+   * no longer in `membersByAgeDesc`. The result is written back using the version that was read.
+   */
+  private def updatePodCost(
+      kubernetesApi: KubernetesApi,
+      crNameOpt: Option[String],
+      podName: String,
+      newCost: Int,
+      selfUniqueAddress: UniqueAddress,
+      membersByAgeDesc: immutable.SortedSet[Member],
+      cleanupAfter: FiniteDuration)(implicit system: ActorSystem): Future[RequestResult] = {
+    import system.dispatcher
+    crNameOpt match {
+      case None =>
+        updatePodDeletionCostAnnotationResult(kubernetesApi.updatePodDeletionCostAnnotation(podName, newCost))
+
+      case Some(crName) =>
+        val written = kubernetesApi.readOrCreatePodCostResource(crName).flatMap { resource =>
+          val now = System.currentTimeMillis()
+          val ownEntry =
+            PodCost(podName, newCost, selfUniqueAddress.address.toString, selfUniqueAddress.longUid, now)
+
+          // An entry of this cluster that is old enough (a new member may not be seen yet)
+          // and whose member has left the cluster.
+          def isStale(entry: PodCost): Boolean =
+            entry.uniqueAddress.address.system == selfUniqueAddress.address.system &&
+            now - entry.time > cleanupAfter.toMillis &&
+            !membersByAgeDesc.exists(_.uniqueAddress == entry.uniqueAddress)
+
+          // this pod's previous entry is dropped as well and re-appended below
+          val retained = resource.pods.filterNot(entry => entry.podName == podName || isStale(entry))
+          kubernetesApi.updatePodCostResource(crName, resource.version, retained :+ ownEntry)
+        }
+        updatePodCostResourceResult(written)
+    }
+  }
+
+  /**
+   * Maps the outcome of an annotation update to a [[RequestResult]].
+   *
+   * The mapping is:
+   *  - success becomes `PodAnnotated`;
+   *  - `PodCostClientException` becomes `GiveUp` (not retried);
+   *  - any other non-fatal failure becomes `ScheduleRetry`.
+   */
+  private def updatePodDeletionCostAnnotationResult(futResponse: Future[Done])(
+      implicit system: ActorSystem): Future[RequestResult] = {
+    import system.dispatcher
+    val failureToResult: PartialFunction[Throwable, RequestResult] = {
+      case clientError: PodCostClientException => GiveUp(clientError.getMessage)
+      case NonFatal(other)                     => ScheduleRetry(other.getMessage)
+    }
+    futResponse.map(_ => PodAnnotated).recover(failureToResult)
+  }
+
+  /**
+   * Maps the outcome of a PodCost CR update to a [[RequestResult]].
+   *
+   * The mapping is:
+   *  - `Right(_)` becomes `PodAnnotated`;
+   *  - `Left(_)` becomes `ScheduleRetry`. Judging by the message this is presumably a
+   *    resource-version conflict (TODO confirm against `KubernetesApi.updatePodCostResource`);
+   *  - a `PodCostClientException` failure becomes `GiveUp` (not retried);
+   *  - any other non-fatal failure becomes `ScheduleRetry`.
+   */
+ private def updatePodCostResourceResult(futResponse:
Future[Either[PodCostResource, PodCostResource]])(
implicit system: ActorSystem): Future[RequestResult] = {
import system.dispatcher
futResponse
.map {
- case HttpResponse(status, _, e, _) if status.isSuccess() =>
- e.discardBytes()
- PodAnnotated
- case HttpResponse(s @ ClientError(_), _, e, _) =>
- e.discardBytes()
- GiveUp(s.toString())
- case HttpResponse(status, _, e, _) =>
- e.discardBytes()
- ScheduleRetry(s"Request failed with status=$status")
+ case Right(_) => PodAnnotated
+ case Left(_) => ScheduleRetry("Request failed with conflict")
}
.recover {
- case NonFatal(e) => ScheduleRetry(e.getMessage)
+ case e: PodCostClientException => GiveUp(e.getMessage)
+ case NonFatal(e) => ScheduleRetry(e.getMessage)
}
}
}
diff --git
a/rolling-update-kubernetes/src/test/scala/org/apache/pekko/rollingupdate/kubernetes/PodDeletionCostAnnotatorCrSpec.scala
b/rolling-update-kubernetes/src/test/scala/org/apache/pekko/rollingupdate/kubernetes/PodDeletionCostAnnotatorCrSpec.scala
new file mode 100644
index 00000000..10ae084b
--- /dev/null
+++
b/rolling-update-kubernetes/src/test/scala/org/apache/pekko/rollingupdate/kubernetes/PodDeletionCostAnnotatorCrSpec.scala
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2017-2023 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.rollingupdate.kubernetes
+
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.collection.immutable
+import scala.concurrent.Future
+import scala.concurrent.duration._
+
+import org.apache.pekko
+import pekko.Done
+import pekko.actor.ActorSystem
+import pekko.actor.Address
+import pekko.cluster.Cluster
+import pekko.cluster.ClusterEvent.MemberUp
+import pekko.cluster.Member
+import pekko.cluster.MemberStatus
+import pekko.cluster.UniqueAddress
+import pekko.testkit.EventFilter
+import pekko.testkit.ImplicitSender
+import pekko.testkit.TestKit
+import pekko.testkit.TestProbe
+import com.typesafe.config.ConfigFactory
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.BeforeAndAfterEach
+import org.scalatest.concurrent.Eventually
+import org.scalatest.matchers.should.Matchers
+import org.scalatest.time.Millis
+import org.scalatest.time.Seconds
+import org.scalatest.time.Span
+import org.scalatest.wordspec.AnyWordSpecLike
+
+/** Shared configuration and test doubles for the PodCost CR annotator spec. */
+object PodDeletionCostAnnotatorCrSpec {
+  val config = ConfigFactory.parseString("""
+ pekko.loggers = ["org.apache.pekko.testkit.TestEventListener"]
+ pekko.actor.provider = cluster
+ pekko.rollingupdate.kubernetes.pod-deletion-cost.retry-delay = 1s
+
+ pekko.remote.artery.canonical.port = 0
+ pekko.remote.artery.canonical.hostname = 127.0.0.1
+
+ pekko.cluster.jmx.multi-mbeans-in-same-jvm = on
+ pekko.coordinated-shutdown.terminate-actor-system = off
+ pekko.coordinated-shutdown.run-by-actor-system-terminate = off
+ pekko.test.filter-leeway = 10s
+ """)
+
+  /** Mixin exposing an atomic invocation counter. */
+  private[pekko] trait TestCallCount {
+    val callCount = new AtomicInteger()
+
+    def getCallCount(): Int = callCount.get()
+  }
+
+  /**
+   * In-memory [[KubernetesApi]] test double.
+   *
+   * Annotation updates always succeed. The PodCost resource is a single (version, pods) pair. Each
+   * update stores the given pods and sets the version to the supplied version plus one. A conflict
+   * (`Left`) is never reported.
+   */
+  private[pekko] class TestKubernetesApi extends KubernetesApi {
+    // both fields are only touched while holding this instance's monitor
+    private var version = 1
+    private var podCosts = Vector.empty[PodCost]
+
+    override def namespace: String = "namespace-test"
+
+    override def updatePodDeletionCostAnnotation(podName: String, cost: Int): Future[Done] =
+      Future.successful(Done)
+
+    override def readOrCreatePodCostResource(crName: String): Future[PodCostResource] =
+      synchronized {
+        Future.successful(PodCostResource(version.toString, podCosts))
+      }
+
+    override def updatePodCostResource(
+        crName: String,
+        v: String,
+        pods: immutable.Seq[PodCost]): Future[Either[PodCostResource, PodCostResource]] =
+      synchronized {
+        podCosts = pods.toVector
+        version = v.toInt + 1
+        Future.successful(Right(PodCostResource(version.toString, podCosts)))
+      }
+
+    def getPodCosts(): Vector[PodCost] = synchronized(podCosts)
+  }
+}
+
+/**
+ * Verifies that the annotator writes pod costs to the PodCost custom resource when a CR name is
+ * configured. It uses the in-memory `TestKubernetesApi` instead of a real API server.
+ */
+class PodDeletionCostAnnotatorCrSpec
+    extends TestKit(
+      ActorSystem(
+        "PodDeletionCostAnnotatorCrSpec",
+        PodDeletionCostAnnotatorCrSpec.config
+      ))
+    with ImplicitSender
+    with AnyWordSpecLike
+    with Matchers
+    with BeforeAndAfterAll
+    with BeforeAndAfterEach
+    with Eventually {
+
+  import PodDeletionCostAnnotatorCrSpec._
+
+  private val namespace = "namespace-test"
+  private val podName1 = "pod-test-1"
+  private val podName2 = "pod-test-2"
+  // second node; shares the actor system name with `system` so both can form one cluster
+  private lazy val system2 = ActorSystem("PodDeletionCostAnnotatorCrSpec", PodDeletionCostAnnotatorCrSpec.config)
+
+  // Upper bound for a node to reach Up. awaitAssert returns as soon as the assertion holds, so a
+  // generous bound costs nothing on a fast machine.
+  private val memberUpTimeout = 10.seconds
+
+  private def settings(podName: String) = {
+    new KubernetesSettings(
+      apiCaPath = "",
+      apiTokenPath = "",
+      apiServiceHost = "localhost",
+      apiServicePort = 0,
+      namespace = Some(namespace),
+      namespacePath = "",
+      podName = podName,
+      secure = false,
+      apiServiceRequestTimeout = 2.seconds,
+      customResourceSettings =
+        new CustomResourceSettings(enabled = true, crName = Some("test-cr"), cleanupAfter = 60.seconds)
+    )
+  }
+
+  private def annotatorProps(pod: String, api: KubernetesApi) = PodDeletionCostAnnotator.props(
+    settings(pod),
+    PodDeletionCostSettings(system.settings.config.getConfig("pekko.rollingupdate.kubernetes")),
+    api,
+    crName = Some("test-cr")
+  )
+
+  override implicit val patienceConfig: PatienceConfig =
+    PatienceConfig(timeout = Span(5, Seconds), interval = Span(100, Millis))
+
+  override protected def afterAll(): Unit = {
+    super.shutdown()
+    TestKit.shutdownActorSystem(system2)
+  }
+
+  override protected def beforeEach(): Unit = {}
+
+  "The pod-deletion-cost annotator with CRD" should {
+
+    "have a single node cluster running first" in {
+      val probe = TestProbe()
+      Cluster(system).join(Cluster(system).selfMember.address)
+      // awaitAssert only retries while its block throws, so the status must be *asserted*.
+      // A bare `==` comparison yields a discarded Boolean and never waits.
+      probe.awaitAssert({
+        Cluster(system).selfMember.status shouldEqual MemberStatus.Up
+      }, memberUpTimeout)
+    }
+
+    "write pod cost to custom resource" in {
+      val api = new TestKubernetesApi
+      EventFilter
+        .info(pattern = ".*Updating PodCost CR.*", occurrences = 1)
+        .intercept {
+          system.actorOf(annotatorProps(podName1, api))
+        }
+      eventually {
+        api.getPodCosts() should have size 1
+        api.getPodCosts().head.podName shouldEqual podName1
+      }
+    }
+
+    "update pod cost for second node in the cluster" in {
+      val api = new TestKubernetesApi
+
+      val probe = TestProbe()
+      Cluster(system2).join(Cluster(system).selfMember.address)
+      probe.awaitAssert({
+        Cluster(system2).selfMember.status shouldEqual MemberStatus.Up
+      }, memberUpTimeout)
+
+      system2.actorOf(annotatorProps(podName2, api))
+
+      eventually {
+        val costs = api.getPodCosts()
+        costs.map(_.podName) should contain(podName2)
+      }
+    }
+
+  }
+
+}
diff --git
a/rolling-update-kubernetes/src/test/scala/org/apache/pekko/rollingupdate/kubernetes/PodDeletionCostAnnotatorSpec.scala
b/rolling-update-kubernetes/src/test/scala/org/apache/pekko/rollingupdate/kubernetes/PodDeletionCostAnnotatorSpec.scala
index c0aeeecb..6109eb4e 100644
---
a/rolling-update-kubernetes/src/test/scala/org/apache/pekko/rollingupdate/kubernetes/PodDeletionCostAnnotatorSpec.scala
+++
b/rolling-update-kubernetes/src/test/scala/org/apache/pekko/rollingupdate/kubernetes/PodDeletionCostAnnotatorSpec.scala
@@ -13,20 +13,20 @@
package org.apache.pekko.rollingupdate.kubernetes
-import org.apache.pekko.actor.ActorSystem
-import org.apache.pekko.actor.Address
-import org.apache.pekko.actor.Props
-import org.apache.pekko.cluster.Cluster
-import org.apache.pekko.cluster.ClusterEvent.MemberUp
-import org.apache.pekko.cluster.Member
-import org.apache.pekko.cluster.MemberStatus
-import org.apache.pekko.cluster.MemberStatus.Up
-import org.apache.pekko.cluster.UniqueAddress
-import org.apache.pekko.testkit.EventFilter
-import org.apache.pekko.testkit.ImplicitSender
-import org.apache.pekko.testkit.TestKit
-import org.apache.pekko.testkit.TestProbe
-import org.apache.pekko.util.Version
+import org.apache.pekko
+import pekko.actor.ActorSystem
+import pekko.actor.Address
+import pekko.cluster.Cluster
+import pekko.cluster.ClusterEvent.MemberUp
+import pekko.cluster.Member
+import pekko.cluster.MemberStatus
+import pekko.cluster.MemberStatus.Up
+import pekko.cluster.UniqueAddress
+import pekko.testkit.EventFilter
+import pekko.testkit.ImplicitSender
+import pekko.testkit.TestKit
+import pekko.testkit.TestProbe
+import pekko.util.Version
import com.github.tomakehurst.wiremock.WireMockServer
import com.github.tomakehurst.wiremock.client.MappingBuilder
import com.github.tomakehurst.wiremock.client.WireMock
@@ -99,16 +99,27 @@ class PodDeletionCostAnnotatorSpec
namespace = Some(namespace),
namespacePath = "",
podName = podName,
- secure = false)
+ secure = false,
+ apiServiceRequestTimeout = 2.seconds,
+ customResourceSettings = new CustomResourceSettings(enabled = false,
crName = None, cleanupAfter = 60.seconds)
+ )
}
- private def annotatorProps(pod: String) = Props(
- classOf[PodDeletionCostAnnotator],
- settings(pod),
- "apiToken",
- namespace,
- PodDeletionCostSettings(system.settings.config.getConfig("pekko.rollingupdate.kubernetes"))
- )
+ private def kubernetesApi(pod: String) = // real KubernetesApiImpl over plain HTTP (no HTTPS connection context)
+ new KubernetesApiImpl(
+ system,
+ settings(pod),
+ namespace,
+ apiToken = "apiToken",
+ clientHttpsConnectionContext = None)
+
+ private def annotatorProps(pod: String) =
+ PodDeletionCostAnnotator.props(
+ settings(pod),
+ PodDeletionCostSettings(system.settings.config.getConfig("pekko.rollingupdate.kubernetes")),
+ kubernetesApi(pod),
+ crName = None // no PodCost CR: exercises the direct pod-annotation path
+ )
override implicit val patienceConfig: PatienceConfig =
PatienceConfig(timeout = Span(5, Seconds), interval = Span(100, Millis))
@@ -241,7 +252,7 @@ class PodDeletionCostAnnotatorSpec
assertState(scenarioName, "FAILING")
- val underTest = expectLogWarning(".*Failed to update annotation:.*") {
+ val underTest = expectLogWarning(".*Failed to update pod-deletion-cost
annotation:.*") {
system.actorOf(annotatorProps(podName1))
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]