This is an automated email from the ASF dual-hosted git repository.
fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-management.git
The following commit(s) were added to refs/heads/main by this push:
new bb8dcdea #217 Support native kubernetes leases (#218)
bb8dcdea is described below
commit bb8dcdead22a37546c51295dc24982b2941be0c8
Author: Iilun <[email protected]>
AuthorDate: Tue May 14 12:28:39 2024 +0200
#217 Support native kubernetes leases (#218)
* #217 Support native kubernetes leases
* * Reverting documentation header changes
* Reverting rename of KubernetesApiImpl
* Fixing license of new classes
* * Reverting renaming of Json classes
* Fixing wrong import style
* Fixing remaining wrong import style
* Fixing binary incompatibility
* Fix binary incompatibilities
* Create kubernetes-lease.backwards.excludes
* Add mima incompatibilities to exclude
---------
Co-authored-by: Iilun <[email protected]>
Co-authored-by: PJ Fanning <[email protected]>
---
docs/src/main/paradox/kubernetes-lease.md | 31 ++-
.../kubernetes-lease.backwards.excludes | 22 ++
...esLease.scala => AbstractKubernetesLease.scala} | 22 +-
.../lease/kubernetes/KubernetesLease.scala | 104 +-------
.../lease/kubernetes/KubernetesSettings.scala | 2 +-
.../lease/kubernetes/NativeKubernetesLease.scala | 35 +++
.../internal/AbstractKubernetesApiImpl.scala | 191 ++++++++++++++
.../kubernetes/internal/KubernetesApiImpl.scala | 199 ++-------------
.../internal/KubernetesJsonSupport.scala | 18 ++
.../internal/NativeKubernetesApiImpl.scala | 196 ++++++++++++++
.../lease/kubernetes/NativeKubernetesApiSpec.scala | 284 +++++++++++++++++++++
11 files changed, 808 insertions(+), 296 deletions(-)
diff --git a/docs/src/main/paradox/kubernetes-lease.md
b/docs/src/main/paradox/kubernetes-lease.md
index 6e9aa011..d0b8f413 100644
--- a/docs/src/main/paradox/kubernetes-lease.md
+++ b/docs/src/main/paradox/kubernetes-lease.md
@@ -7,10 +7,14 @@ The API, configuration and behavior may change based on
feedback from initial us
@@@
-This module is an implementation of a [Pekko Coordination
Lease](https://pekko.apache.org/docs/pekko/current/coordination.html#lease)
backed
-by a [Custom Resource Definition
(CRD)](https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/custom-resources/)
in Kubernetes.
-Resources in Kubernetes offer [concurrency control and
consistency](https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/custom-resources/)
-that have been used to build a distributed lease/lock.
+This module is an implementation of a [Pekko Coordination
Lease](https://pekko.apache.org/docs/pekko/current/coordination.html#lease) for
Kubernetes. The lease can be backed
+by either of the following resources:
+
+* a [Custom Resource Definition
(CRD)](https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/custom-resources/).
Resources in Kubernetes offer [concurrency control and
consistency](https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/custom-resources/)
+ that have been used to build a distributed lease/lock.
+* a native [Kubernetes Lease
Object](https://kubernetes.io/docs/concepts/architecture/leases/).
+
+
A lease can be used for:
@@ -44,7 +48,7 @@ different `ActorSystem` names because they all need a
separate lease.
#### Creating the Custom Resource Definition for the lease
-This requires admin privileges to your Kubernetes / Open Shift cluster but
only needs doing once.
+For the CRD implementation, the Custom Resource Definition must be created. This requires
admin privileges to your Kubernetes / Open Shift cluster but only needs doing
once.
Kubernetes:
@@ -56,6 +60,16 @@ Where lease.yml contains:
@@snip[lease.yaml](/lease-kubernetes/lease.yml)
+#### Enable the native implementation
+
+To enable the native implementation, the lease class must be changed in the
configuration to the following value:
+
+```
+pekko.coordination.lease.kubernetes {
+ lease-class =
"org.apache.pekko.coordination.lease.kubernetes.NativeKubernetesLease"
+}
+```
+
#### Role based access control
Each pod needs permission to read/create and update lease resources. They only
need access
@@ -69,9 +83,14 @@ apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: lease-access
rules:
+ # Using CRD implementation
- apiGroups: ["pekko.apache.org"]
resources: ["leases"]
verbs: ["get", "create", "update", "list"]
+ # Using native implementation
+ - apiGroups: ["coordination.k8s.io"]
+ resources: ["leases"]
+ verbs: ["get", "create", "update", "list"]
---
kind: RoleBinding
apiVersion: rbac.authorization.k8s.io/v1
@@ -79,7 +98,7 @@ metadata:
name: lease-access
subjects:
- kind: User
- name: system:serviceaccount:<YOUR NAMSPACE>:default
+ name: system:serviceaccount:<YOUR NAMESPACE>:default
roleRef:
kind: Role
name: lease-access
diff --git
a/lease-kubernetes/src/main/mima-filters/1.1.x.backwards.excludes/kubernetes-lease.backwards.excludes
b/lease-kubernetes/src/main/mima-filters/1.1.x.backwards.excludes/kubernetes-lease.backwards.excludes
new file mode 100644
index 00000000..5e27005a
--- /dev/null
+++
b/lease-kubernetes/src/main/mima-filters/1.1.x.backwards.excludes/kubernetes-lease.backwards.excludes
@@ -0,0 +1,22 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# The new Kubernetes Native Lease adds new classes and fields
+ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.coordination.lease.kubernetes.internal.KubernetesJsonSupport.org$apache$pekko$coordination$lease$kubernetes$internal$KubernetesJsonSupport$_setter_$nativeSpecFormat_=")
+ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.coordination.lease.kubernetes.internal.KubernetesJsonSupport.org$apache$pekko$coordination$lease$kubernetes$internal$KubernetesJsonSupport$_setter_$leaseNativeResourceFormat_=")
+ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.coordination.lease.kubernetes.internal.KubernetesJsonSupport.nativeSpecFormat")
+ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.coordination.lease.kubernetes.internal.KubernetesJsonSupport.leaseNativeResourceFormat")
diff --git
a/lease-kubernetes/src/main/scala/org/apache/pekko/coordination/lease/kubernetes/KubernetesLease.scala
b/lease-kubernetes/src/main/scala/org/apache/pekko/coordination/lease/kubernetes/AbstractKubernetesLease.scala
similarity index 86%
copy from
lease-kubernetes/src/main/scala/org/apache/pekko/coordination/lease/kubernetes/KubernetesLease.scala
copy to
lease-kubernetes/src/main/scala/org/apache/pekko/coordination/lease/kubernetes/AbstractKubernetesLease.scala
index d3104c2d..d8a6a20d 100644
---
a/lease-kubernetes/src/main/scala/org/apache/pekko/coordination/lease/kubernetes/KubernetesLease.scala
+++
b/lease-kubernetes/src/main/scala/org/apache/pekko/coordination/lease/kubernetes/AbstractKubernetesLease.scala
@@ -16,16 +16,13 @@ package org.apache.pekko.coordination.lease.kubernetes
import java.text.Normalizer
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicInteger
-
import scala.concurrent.Future
import scala.util.{ Failure, Success }
import scala.annotation.nowarn
-
import org.apache.pekko
+import
pekko.coordination.lease.kubernetes.AbstractKubernetesLease.makeDNS1039Compatible
import pekko.actor.ExtendedActorSystem
-import
pekko.coordination.lease.kubernetes.KubernetesLease.makeDNS1039Compatible
import pekko.coordination.lease.kubernetes.LeaseActor._
-import pekko.coordination.lease.kubernetes.internal.KubernetesApiImpl
import pekko.coordination.lease.scaladsl.Lease
import pekko.coordination.lease.LeaseException
import pekko.coordination.lease.LeaseSettings
@@ -36,7 +33,7 @@ import pekko.util.ConstantFun
import pekko.util.Timeout
import org.slf4j.LoggerFactory
-object KubernetesLease {
+object AbstractKubernetesLease {
val configPath = "pekko.coordination.lease.kubernetes"
private val leaseCounter = new AtomicInteger(1)
@@ -64,24 +61,23 @@ object KubernetesLease {
}
}
-class KubernetesLease private[pekko] (system: ExtendedActorSystem, leaseTaken:
AtomicBoolean, settings: LeaseSettings)
- extends Lease(settings) {
+abstract class AbstractKubernetesLease(system: ExtendedActorSystem,
leaseTaken: AtomicBoolean,
+ settings: LeaseSettings) extends Lease(settings) {
import pekko.pattern.ask
private val logger = LoggerFactory.getLogger(classOf[KubernetesLease])
- private val k8sSettings = KubernetesSettings(settings.leaseConfig,
settings.timeoutSettings)
- private val k8sApi = new KubernetesApiImpl(system, k8sSettings)
- private implicit val timeout: Timeout =
Timeout(settings.timeoutSettings.operationTimeout)
+ protected val k8sSettings: KubernetesSettings =
KubernetesSettings(settings.leaseConfig, settings.timeoutSettings)
- def this(leaseSettings: LeaseSettings, system: ExtendedActorSystem) =
- this(system, new AtomicBoolean(false), leaseSettings)
+ protected def k8sApi: KubernetesApi
+
+ private implicit val timeout: Timeout =
Timeout(settings.timeoutSettings.operationTimeout)
private val leaseName = makeDNS1039Compatible(settings.leaseName)
private val leaseActor = system.systemActorOf(
LeaseActor.props(k8sApi, settings, leaseName, leaseTaken),
- s"kubernetesLease${KubernetesLease.leaseCounter.incrementAndGet}")
+ s"kubernetesLease${AbstractKubernetesLease.leaseCounter.incrementAndGet}")
if (leaseName != settings.leaseName) {
logger.info(
"Original lease name [{}] sanitized for kubernetes: [{}]",
diff --git
a/lease-kubernetes/src/main/scala/org/apache/pekko/coordination/lease/kubernetes/KubernetesLease.scala
b/lease-kubernetes/src/main/scala/org/apache/pekko/coordination/lease/kubernetes/KubernetesLease.scala
index d3104c2d..9caf493f 100644
---
a/lease-kubernetes/src/main/scala/org/apache/pekko/coordination/lease/kubernetes/KubernetesLease.scala
+++
b/lease-kubernetes/src/main/scala/org/apache/pekko/coordination/lease/kubernetes/KubernetesLease.scala
@@ -13,115 +13,31 @@
package org.apache.pekko.coordination.lease.kubernetes
-import java.text.Normalizer
import java.util.concurrent.atomic.AtomicBoolean
-import java.util.concurrent.atomic.AtomicInteger
-
-import scala.concurrent.Future
-import scala.util.{ Failure, Success }
-import scala.annotation.nowarn
-
import org.apache.pekko
import pekko.actor.ExtendedActorSystem
-import
pekko.coordination.lease.kubernetes.KubernetesLease.makeDNS1039Compatible
-import pekko.coordination.lease.kubernetes.LeaseActor._
import pekko.coordination.lease.kubernetes.internal.KubernetesApiImpl
-import pekko.coordination.lease.scaladsl.Lease
-import pekko.coordination.lease.LeaseException
import pekko.coordination.lease.LeaseSettings
-import pekko.coordination.lease.LeaseTimeoutException
-import pekko.dispatch.ExecutionContexts
-import pekko.pattern.AskTimeoutException
-import pekko.util.ConstantFun
-import pekko.util.Timeout
-import org.slf4j.LoggerFactory
-
-object KubernetesLease {
- val configPath = "pekko.coordination.lease.kubernetes"
- private val leaseCounter = new AtomicInteger(1)
-
- /**
- * Limit the length of a name to 63 characters.
- * Some subsystem of Kubernetes cannot manage longer names.
- */
- private def truncateTo63Characters(name: String): String = name.take(63)
- /**
- * Removes from the leading and trailing positions the specified characters.
- */
- private def trim(name: String, characters: List[Char]): String =
-
name.dropWhile(characters.contains(_)).reverse.dropWhile(characters.contains(_)).reverse
+import scala.concurrent.Future
- /**
- * Make a name compatible with DNS 1039 standard: like a single domain name
segment.
- * Regex to follow: [a-z]([-a-z0-9]*[a-z0-9])
- * Limit the resulting name to 63 characters
- */
- private def makeDNS1039Compatible(name: String): String = {
- val normalized =
- Normalizer.normalize(name,
Normalizer.Form.NFKD).toLowerCase.replaceAll("[_.]",
"-").replaceAll("[^-a-z0-9]", "")
- trim(truncateTo63Characters(normalized), List('-'))
- }
+object KubernetesLease {
+ val configPath: String = AbstractKubernetesLease.configPath
}
-class KubernetesLease private[pekko] (system: ExtendedActorSystem, leaseTaken:
AtomicBoolean, settings: LeaseSettings)
- extends Lease(settings) {
-
- import pekko.pattern.ask
+class KubernetesLease(system: ExtendedActorSystem, leaseTaken: AtomicBoolean,
settings: LeaseSettings)
+ extends AbstractKubernetesLease(system, leaseTaken, settings) {
- private val logger = LoggerFactory.getLogger(classOf[KubernetesLease])
-
- private val k8sSettings = KubernetesSettings(settings.leaseConfig,
settings.timeoutSettings)
- private val k8sApi = new KubernetesApiImpl(system, k8sSettings)
- private implicit val timeout: Timeout =
Timeout(settings.timeoutSettings.operationTimeout)
+ override def k8sApi = new KubernetesApiImpl(system, k8sSettings)
def this(leaseSettings: LeaseSettings, system: ExtendedActorSystem) =
this(system, new AtomicBoolean(false), leaseSettings)
- private val leaseName = makeDNS1039Compatible(settings.leaseName)
- private val leaseActor = system.systemActorOf(
- LeaseActor.props(k8sApi, settings, leaseName, leaseTaken),
- s"kubernetesLease${KubernetesLease.leaseCounter.incrementAndGet}")
- if (leaseName != settings.leaseName) {
- logger.info(
- "Original lease name [{}] sanitized for kubernetes: [{}]",
- Array[Object](settings.leaseName, leaseName): _*)
- }
- logger.debug(
- "Starting kubernetes lease actor [{}] for lease [{}], owner [{}]",
- leaseActor,
- leaseName,
- settings.ownerName)
-
- override def checkLease(): Boolean = leaseTaken.get()
+ override def checkLease(): Boolean = super.checkLease()
- @nowarn("msg=match may not be exhaustive")
- override def release(): Future[Boolean] = {
- (leaseActor ? Release())
- .transform {
- case Success(LeaseReleased) => Success(true)
- case Success(InvalidRequest(msg)) => Failure(new LeaseException(msg))
- case Failure(_: AskTimeoutException) => Failure(
- new LeaseTimeoutException(
- s"Timed out trying to release lease [$leaseName,
${settings.ownerName}]. It may still be taken."))
- case Failure(exception) => Failure(exception)
- }(ExecutionContexts.parasitic)
- }
+ override def release(): Future[Boolean] = super.release()
- override def acquire(): Future[Boolean] = {
- acquire(ConstantFun.scalaAnyToUnit)
+ override def acquire(): Future[Boolean] = super.acquire()
- }
- @nowarn("msg=match may not be exhaustive")
- override def acquire(leaseLostCallback: Option[Throwable] => Unit):
Future[Boolean] = {
- (leaseActor ? Acquire(leaseLostCallback))
- .transform {
- case Success(LeaseAcquired) => Success(true)
- case Success(LeaseTaken) => Success(false)
- case Success(InvalidRequest(msg)) => Failure(new LeaseException(msg))
- case Failure(_: AskTimeoutException) => Failure(new
LeaseTimeoutException(
- s"Timed out trying to acquire lease [$leaseName,
${settings.ownerName}]. It may still be taken."))
- case Failure(exception) => Failure(exception)
- }(ExecutionContexts.parasitic)
- }
+ override def acquire(leaseLostCallback: Option[Throwable] => Unit):
Future[Boolean] = super.acquire(leaseLostCallback)
}
diff --git
a/lease-kubernetes/src/main/scala/org/apache/pekko/coordination/lease/kubernetes/KubernetesSettings.scala
b/lease-kubernetes/src/main/scala/org/apache/pekko/coordination/lease/kubernetes/KubernetesSettings.scala
index 41828223..ffa1d3ac 100644
---
a/lease-kubernetes/src/main/scala/org/apache/pekko/coordination/lease/kubernetes/KubernetesSettings.scala
+++
b/lease-kubernetes/src/main/scala/org/apache/pekko/coordination/lease/kubernetes/KubernetesSettings.scala
@@ -39,7 +39,7 @@ private[pekko] object KubernetesSettings {
}
def apply(system: ActorSystem, leaseTimeoutSettings: TimeoutSettings):
KubernetesSettings = {
- apply(system.settings.config.getConfig(KubernetesLease.configPath),
leaseTimeoutSettings)
+
apply(system.settings.config.getConfig(AbstractKubernetesLease.configPath),
leaseTimeoutSettings)
}
def apply(config: Config, leaseTimeoutSettings: TimeoutSettings):
KubernetesSettings = {
diff --git
a/lease-kubernetes/src/main/scala/org/apache/pekko/coordination/lease/kubernetes/NativeKubernetesLease.scala
b/lease-kubernetes/src/main/scala/org/apache/pekko/coordination/lease/kubernetes/NativeKubernetesLease.scala
new file mode 100644
index 00000000..3b82fb52
--- /dev/null
+++
b/lease-kubernetes/src/main/scala/org/apache/pekko/coordination/lease/kubernetes/NativeKubernetesLease.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.coordination.lease.kubernetes
+
+import org.apache.pekko
+import pekko.actor.ExtendedActorSystem
+import pekko.coordination.lease.LeaseSettings
+import pekko.coordination.lease.kubernetes.internal.NativeKubernetesApiImpl
+
+import java.util.concurrent.atomic.AtomicBoolean
+
+class NativeKubernetesLease private[pekko] (system: ExtendedActorSystem,
leaseTaken: AtomicBoolean,
+ settings: LeaseSettings)
+ extends AbstractKubernetesLease(system, leaseTaken, settings) {
+
+ override def k8sApi = new NativeKubernetesApiImpl(system, k8sSettings)
+
+ def this(leaseSettings: LeaseSettings, system: ExtendedActorSystem) =
+ this(system, new AtomicBoolean(false), leaseSettings)
+}
diff --git
a/lease-kubernetes/src/main/scala/org/apache/pekko/coordination/lease/kubernetes/internal/AbstractKubernetesApiImpl.scala
b/lease-kubernetes/src/main/scala/org/apache/pekko/coordination/lease/kubernetes/internal/AbstractKubernetesApiImpl.scala
new file mode 100644
index 00000000..2f1af141
--- /dev/null
+++
b/lease-kubernetes/src/main/scala/org/apache/pekko/coordination/lease/kubernetes/internal/AbstractKubernetesApiImpl.scala
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2017-2021 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.coordination.lease.kubernetes.internal
+
+import org.apache.pekko
+import pekko.Done
+import pekko.actor.ActorSystem
+import pekko.annotation.InternalApi
+import pekko.coordination.lease.kubernetes.{ KubernetesApi,
KubernetesSettings, LeaseResource }
+import pekko.coordination.lease.{ LeaseException, LeaseTimeoutException }
+import pekko.event.{ LogSource, Logging, LoggingAdapter }
+import pekko.http.scaladsl.model._
+import pekko.http.scaladsl.model.headers.{ Authorization, OAuth2BearerToken }
+import pekko.http.scaladsl.unmarshalling.Unmarshal
+import pekko.http.scaladsl.{ ConnectionContext, Http, HttpExt,
HttpsConnectionContext }
+import pekko.pattern.after
+import pekko.pki.kubernetes.PemManagersProvider
+
+import java.nio.charset.StandardCharsets
+import java.nio.file.{ Files, Paths }
+import java.security.{ KeyStore, SecureRandom }
+import javax.net.ssl.{ KeyManager, KeyManagerFactory, SSLContext, TrustManager
}
+import scala.collection.immutable
+import scala.concurrent.Future
+import scala.util.control.NonFatal
+
+/**
+ * Could be shared between leases:
https://github.com/akka/akka-management/issues/680
+ * INTERNAL API
+ */
+@InternalApi private[pekko] abstract class AbstractKubernetesApiImpl(system:
ActorSystem, settings: KubernetesSettings)
+ extends KubernetesApi
+ with KubernetesJsonSupport {
+
+ import system.dispatcher
+
+ protected implicit val sys: ActorSystem = system
+ protected val log: LoggingAdapter = Logging(system,
getClass)(LogSource.fromClass)
+ private val http: HttpExt = Http()(system)
+
+ private lazy val sslContext: SSLContext = {
+ val certificates = PemManagersProvider.loadCertificates(settings.apiCaPath)
+ val factory =
KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm)
+ val keyStore = KeyStore.getInstance("PKCS12")
+ keyStore.load(null)
+ factory.init(keyStore, Array.empty)
+ val km: Array[KeyManager] = factory.getKeyManagers
+ val tm: Array[TrustManager] =
+ PemManagersProvider.buildTrustManagers(certificates)
+ val random: SecureRandom = new SecureRandom
+ val sslContext = SSLContext.getInstance("TLSv1.2")
+ sslContext.init(km, tm, random)
+ sslContext
+ }
+
+ private lazy val clientSslContext: HttpsConnectionContext =
ConnectionContext.httpsClient(sslContext)
+
+ protected val namespace: String =
+
settings.namespace.orElse(readConfigVarFromFilesystem(settings.namespacePath,
"namespace")).getOrElse("default")
+
+ protected val scheme: String = if (settings.secure) "https" else "http"
+ private lazy val apiToken =
readConfigVarFromFilesystem(settings.apiTokenPath, "api-token").getOrElse("")
+ private lazy val headers = if (settings.secure)
immutable.Seq(Authorization(OAuth2BearerToken(apiToken))) else Nil
+
+ log.debug("kubernetes access namespace: {}. Secure: {}", namespace,
settings.secure)
+
+ protected def createLeaseResource(name: String):
Future[Option[LeaseResource]]
+
+ protected def getLeaseResource(name: String): Future[Option[LeaseResource]]
+
+ protected def pathForLease(name: String): Uri.Path
+
+ override def readOrCreateLeaseResource(name: String): Future[LeaseResource]
= {
+ // TODO backoff retry
+ val maxTries = 5
+
+ def loop(tries: Int = 0): Future[LeaseResource] = {
+ log.debug("Trying to create lease {}", tries)
+ for {
+ olr <- getLeaseResource(name)
+ lr <- olr match {
+ case Some(found) =>
+ log.debug("{} already exists. Returning {}", name, found)
+ Future.successful(found)
+ case None =>
+ log.info("lease {} does not exist, creating", name)
+ createLeaseResource(name).flatMap {
+ case Some(created) => Future.successful(created)
+ case None =>
+ if (tries < maxTries) loop(tries + 1)
+ else Future.failed(new LeaseException(s"Unable to create or
read lease after $maxTries tries"))
+ }
+ }
+ } yield lr
+ }
+
+ loop()
+ }
+
+ private[pekko] def removeLease(name: String): Future[Done] = {
+ for {
+ response <- makeRequest(
+ requestForPath(pathForLease(name), HttpMethods.DELETE),
+ s"Timed out removing lease [$name]. It is not known if the remove
happened")
+
+ result <- response.status match {
+ case StatusCodes.OK =>
+ log.debug("Lease deleted {}", name)
+ response.discardEntityBytes()
+ Future.successful(Done)
+ case StatusCodes.NotFound =>
+ log.debug("Lease already deleted {}", name)
+ response.discardEntityBytes()
+ Future.successful(Done) // already deleted
+ case StatusCodes.Unauthorized =>
+ handleUnauthorized(response)
+ case unexpected =>
+ Unmarshal(response.entity)
+ .to[String]
+ .flatMap(body => {
+ Future.failed(
+ new LeaseException(s"Unexpected status code when deleting
lease. Status: $unexpected. Body: $body"))
+ })
+ }
+ } yield result
+ }
+
+ protected def handleUnauthorized(response: HttpResponse): Future[Nothing] = {
+ Unmarshal(response.entity)
+ .to[String]
+ .flatMap(body => {
+ Future.failed(new LeaseException(
+ s"Unauthorized to communicate with Kubernetes API server. See
https://pekko.apache.org/docs/pekko-management/current/kubernetes-lease.html#role-based-access-control
for setting up access control. Body: $body"))
+ })
+ }
+
+ protected def requestForPath(
+ path: Uri.Path,
+ method: HttpMethod = HttpMethods.GET,
+ entity: RequestEntity = HttpEntity.Empty): HttpRequest = {
+ val uri = Uri.from(scheme = scheme, host = settings.apiServerHost, port =
settings.apiServerPort).withPath(path)
+ HttpRequest(uri = uri, headers = headers, method = method, entity = entity)
+ }
+
+ protected def makeRequest(request: HttpRequest, timeoutMsg: String):
Future[HttpResponse] = {
+ val response =
+ if (settings.secure)
+ http.singleRequest(request, clientSslContext)
+ else
+ http.singleRequest(request)
+
+ // make sure we always consume response body (in case of timeout)
+ val strictResponse = response.flatMap(_.toStrict(settings.bodyReadTimeout))
+
+ val timeout = after(settings.apiServerRequestTimeout, using =
system.scheduler)(
+ Future.failed(new LeaseTimeoutException(s"$timeoutMsg. Is the API server
up?")))
+
+ Future.firstCompletedOf(Seq(strictResponse, timeout))
+ }
+
+ /**
+ * This uses blocking IO, and so should only be used to read configuration
at startup.
+ */
+ protected def readConfigVarFromFilesystem(path: String, name: String):
Option[String] = {
+ val file = Paths.get(path)
+ if (Files.exists(file)) {
+ try {
+ Some(new String(Files.readAllBytes(file), StandardCharsets.UTF_8))
+ } catch {
+ case NonFatal(e) =>
+ log.error(e, "Error reading {} from {}", name, path)
+ None
+ }
+ } else {
+ log.warning("Unable to read {} from {} because it doesn't exist.", name,
path)
+ None
+ }
+ }
+
+}
diff --git
a/lease-kubernetes/src/main/scala/org/apache/pekko/coordination/lease/kubernetes/internal/KubernetesApiImpl.scala
b/lease-kubernetes/src/main/scala/org/apache/pekko/coordination/lease/kubernetes/internal/KubernetesApiImpl.scala
index 4b559ca4..d3abbf6d 100644
---
a/lease-kubernetes/src/main/scala/org/apache/pekko/coordination/lease/kubernetes/internal/KubernetesApiImpl.scala
+++
b/lease-kubernetes/src/main/scala/org/apache/pekko/coordination/lease/kubernetes/internal/KubernetesApiImpl.scala
@@ -13,110 +13,26 @@
package org.apache.pekko.coordination.lease.kubernetes.internal
-import java.nio.charset.StandardCharsets
-import java.nio.file.{ Files, Paths }
-import java.security.{ KeyStore, SecureRandom }
-
-import scala.collection.immutable
import scala.concurrent.Future
-import scala.util.control.NonFatal
import org.apache.pekko
-import pekko.Done
import pekko.actor.ActorSystem
import pekko.annotation.InternalApi
-import pekko.coordination.lease.{ LeaseException, LeaseTimeoutException }
-import pekko.coordination.lease.kubernetes.{ KubernetesApi,
KubernetesSettings, LeaseResource }
-import pekko.event.{ LogSource, Logging }
-import pekko.http.scaladsl.{ ConnectionContext, Http, HttpsConnectionContext }
+import pekko.coordination.lease.LeaseException
+import pekko.coordination.lease.kubernetes.{ KubernetesSettings, LeaseResource
}
import pekko.http.scaladsl.marshalling.Marshal
import pekko.http.scaladsl.model._
-import pekko.http.scaladsl.model.headers.{ Authorization, OAuth2BearerToken }
import pekko.http.scaladsl.unmarshalling.Unmarshal
-import pekko.pattern.after
-import pekko.pki.kubernetes.PemManagersProvider
-
-import javax.net.ssl.{ KeyManager, KeyManagerFactory, SSLContext, TrustManager
}
/**
* Could be shared between leases:
https://github.com/akka/akka-management/issues/680
* INTERNAL API
*/
@InternalApi private[pekko] class KubernetesApiImpl(system: ActorSystem,
settings: KubernetesSettings)
- extends KubernetesApi
- with KubernetesJsonSupport {
+ extends AbstractKubernetesApiImpl(system, settings) {
import system.dispatcher
- private implicit val sys: ActorSystem = system
- private val log = Logging(system, getClass)(LogSource.fromClass)
- private val http = Http()(system)
-
- private lazy val sslContext = {
- val certificates = PemManagersProvider.loadCertificates(settings.apiCaPath)
- val factory =
KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm)
- val keyStore = KeyStore.getInstance("PKCS12")
- keyStore.load(null)
- factory.init(keyStore, Array.empty)
- val km: Array[KeyManager] = factory.getKeyManagers
- val tm: Array[TrustManager] =
- PemManagersProvider.buildTrustManagers(certificates)
- val random: SecureRandom = new SecureRandom
- val sslContext = SSLContext.getInstance("TLSv1.2")
- sslContext.init(km, tm, random)
- sslContext
- }
-
- private lazy val clientSslContext: HttpsConnectionContext =
ConnectionContext.httpsClient(sslContext)
-
- private val namespace =
-
settings.namespace.orElse(readConfigVarFromFilesystem(settings.namespacePath,
"namespace")).getOrElse("default")
-
- private val scheme = if (settings.secure) "https" else "http"
- private lazy val apiToken =
readConfigVarFromFilesystem(settings.apiTokenPath, "api-token").getOrElse("")
- private lazy val headers = if (settings.secure)
immutable.Seq(Authorization(OAuth2BearerToken(apiToken))) else Nil
-
- log.debug("kubernetes access namespace: {}. Secure: {}", namespace,
settings.secure)
-
- /*
- PATH: to get all: /apis/pekko.apache.org/v1/namespaces/<namespace>/leases
- PATH: to get a specific one:
/apis/pekko.apache.org/v1/namespaces/<namespace>/leases/<lease-name>
- curl -v -X POST
localhost:8080/apis/pekko.apache.org/v1/namespaces/lease/leases/ -H
"Content-Type: application/yaml" --data-binary "@lease-example.yml"
-
- responds with either:
- 409 Conflict Already Exists
-
- OR
-
- 201 Created if it works
- */
- override def readOrCreateLeaseResource(name: String): Future[LeaseResource]
= {
- // TODO backoff retry
- val maxTries = 5
-
- def loop(tries: Int = 0): Future[LeaseResource] = {
- log.debug("Trying to create lease {}", tries)
- for {
- olr <- getLeaseResource(name)
- lr <- olr match {
- case Some(found) =>
- log.debug("{} already exists. Returning {}", name, found)
- Future.successful(found)
- case None =>
- log.info("lease {} does not exist, creating", name)
- createLeaseResource(name).flatMap {
- case Some(created) => Future.successful(created)
- case None =>
- if (tries < maxTries) loop(tries + 1)
- else Future.failed(new LeaseException(s"Unable to create or
read lease after $maxTries tries"))
- }
- }
- } yield lr
- }
-
- loop()
- }
-
/*
curl -v -X PUT
localhost:8080/apis/pekko.apache.org/v1/namespaces/lease/leases/sbr-lease
--data-binary "@sbr-lease.yml" -H "Content-Type: application/yaml"
PUTs must contain resourceVersions. Response:
@@ -130,9 +46,9 @@ PUTs must contain resourceVersions. Response:
*
* Can return one of three things:
* - Future.Failure, e.g. timed out waiting for k8s api server to respond
- * - Future.sucess[Left(resource)]: the update failed due to version not
matching current in the k8s api server.
+ * - Future.success[Left(resource)]: the update failed due to version not
matching current in the k8s api server.
* In this case the current resource is returned so the version can be
used for subsequent calls
- * - Future.sucess[Right(resource)]: Returns the LeaseResource that
contains the clientName and new version.
+ * - Future.success[Right(resource)]: Returns the LeaseResource that
contains the clientName and new version.
* The new version should be used for any subsequent calls
*/
override def updateLeaseResource(
@@ -180,35 +96,7 @@ PUTs must contain resourceVersions. Response:
} yield result
}
- private[pekko] def removeLease(name: String): Future[Done] = {
- for {
- response <- makeRequest(
- requestForPath(pathForLease(name), HttpMethods.DELETE),
- s"Timed out removing lease [$name]. It is not known if the remove
happened")
-
- result <- response.status match {
- case StatusCodes.OK =>
- log.debug("Lease deleted {}", name)
- response.discardEntityBytes()
- Future.successful(Done)
- case StatusCodes.NotFound =>
- log.debug("Lease already deleted {}", name)
- response.discardEntityBytes()
- Future.successful(Done) // already deleted
- case StatusCodes.Unauthorized =>
- handleUnauthorized(response)
- case unexpected =>
- Unmarshal(response.entity)
- .to[String]
- .flatMap(body => {
- Future.failed(
- new LeaseException(s"Unexpected status code when deleting
lease. Status: $unexpected. Body: $body"))
- })
- }
- } yield result
- }
-
- private def getLeaseResource(name: String): Future[Option[LeaseResource]] = {
+ override def getLeaseResource(name: String): Future[Option[LeaseResource]] =
{
val fResponse = makeRequest(requestForPath(pathForLease(name)), s"Timed
out reading lease $name")
for {
response <- fResponse
@@ -239,57 +127,12 @@ PUTs must contain resourceVersions. Response:
} yield lr
}
- private def handleUnauthorized(response: HttpResponse) = {
- Unmarshal(response.entity)
- .to[String]
- .flatMap(body => {
- Future.failed(new LeaseException(
- s"Unauthorized to communicate with Kubernetes API server. See
https://pekko.apache.org/docs/pekko-management/current/kubernetes-lease.html#role-based-access-control
for setting up access control. Body: $body"))
- })
- }
-
- private def pathForLease(name: String): Uri.Path =
+ override def pathForLease(name: String): Uri.Path =
Uri.Path.Empty / "apis" / "pekko.apache.org" / "v1" / "namespaces" /
namespace / "leases" / name
.replaceAll("[^\\d\\w\\-\\.]", "")
.toLowerCase
- private def requestForPath(
- path: Uri.Path,
- method: HttpMethod = HttpMethods.GET,
- entity: RequestEntity = HttpEntity.Empty) = {
- val uri = Uri.from(scheme = scheme, host = settings.apiServerHost, port =
settings.apiServerPort).withPath(path)
- HttpRequest(uri = uri, headers = headers, method = method, entity = entity)
- }
-
- private def makeRequest(request: HttpRequest, timeoutMsg: String):
Future[HttpResponse] = {
- val response =
- if (settings.secure)
- http.singleRequest(request, clientSslContext)
- else
- http.singleRequest(request)
-
- // make sure we always consume response body (in case of timeout)
- val strictResponse = response.flatMap(_.toStrict(settings.bodyReadTimeout))
-
- val timeout = after(settings.apiServerRequestTimeout, using =
system.scheduler)(
- Future.failed(new LeaseTimeoutException(s"$timeoutMsg. Is the API server
up?")))
-
- Future.firstCompletedOf(Seq(strictResponse, timeout))
- }
-
- private def toLeaseResource(lcr: LeaseCustomResource) = {
- log.debug("Converting {}", lcr)
- require(
- lcr.metadata.resourceVersion.isDefined,
- s"LeaseCustomResource returned from Kubernetes without a
resourceVersion: $lcr")
- val owner = lcr.spec.owner match {
- case null | "" => None
- case other => Some(other)
- }
- LeaseResource(owner, lcr.metadata.resourceVersion.get, lcr.spec.time)
- }
-
- private def createLeaseResource(name: String): Future[Option[LeaseResource]]
= {
+ override def createLeaseResource(name: String):
Future[Option[LeaseResource]] = {
val lcr = LeaseCustomResource(Metadata(name, None), Spec("",
System.currentTimeMillis()))
for {
entity <- Marshal(lcr).to[RequestEntity]
@@ -321,23 +164,15 @@ PUTs must contain resourceVersions. Response:
} yield lr
}
- /**
- * This uses blocking IO, and so should only be used to read configuration
at startup.
- */
- protected def readConfigVarFromFilesystem(path: String, name: String):
Option[String] = {
- val file = Paths.get(path)
- if (Files.exists(file)) {
- try {
- Some(new String(Files.readAllBytes(file), StandardCharsets.UTF_8))
- } catch {
- case NonFatal(e) =>
- log.error(e, "Error reading {} from {}", name, path)
- None
- }
- } else {
- log.warning("Unable to read {} from {} because it doesn't exist.", name,
path)
- None
+ private def toLeaseResource(lcr: LeaseCustomResource) = {
+ log.debug("Converting {}", lcr)
+ require(
+ lcr.metadata.resourceVersion.isDefined,
+ s"LeaseCustomResource returned from Kubernetes without a
resourceVersion: $lcr")
+ val owner = lcr.spec.owner match {
+ case null | "" => None
+ case other => Some(other)
}
+ LeaseResource(owner, lcr.metadata.resourceVersion.get, lcr.spec.time)
}
-
}
diff --git
a/lease-kubernetes/src/main/scala/org/apache/pekko/coordination/lease/kubernetes/internal/KubernetesJsonSupport.scala
b/lease-kubernetes/src/main/scala/org/apache/pekko/coordination/lease/kubernetes/internal/KubernetesJsonSupport.scala
index 174df901..d50bd7b2 100644
---
a/lease-kubernetes/src/main/scala/org/apache/pekko/coordination/lease/kubernetes/internal/KubernetesJsonSupport.scala
+++
b/lease-kubernetes/src/main/scala/org/apache/pekko/coordination/lease/kubernetes/internal/KubernetesJsonSupport.scala
@@ -40,6 +40,22 @@ case class Metadata(name: String, resourceVersion:
Option[String])
@InternalApi
case class Spec(owner: String, time: Long)
+/**
+ * INTERNAL API
+ */
+@InternalApi
+case class NativeLeaseResource(
+ metadata: Metadata,
+ spec: NativeSpec,
+ kind: String = "Lease",
+ apiVersion: String = "coordination.k8s.io/v1")
+
+/**
+ * INTERNAL API
+ */
+@InternalApi
+case class NativeSpec(holderIdentity: String, acquireTime: String)
+
/**
* INTERNAL API
*/
@@ -47,5 +63,7 @@ case class Spec(owner: String, time: Long)
trait KubernetesJsonSupport extends SprayJsonSupport with DefaultJsonProtocol {
implicit val metadataFormat: JsonFormat[Metadata] =
jsonFormat2(Metadata.apply)
implicit val specFormat: JsonFormat[Spec] = jsonFormat2(Spec.apply)
+ implicit val nativeSpecFormat: JsonFormat[NativeSpec] =
jsonFormat2(NativeSpec.apply)
implicit val leaseCustomResourceFormat: RootJsonFormat[LeaseCustomResource]
= jsonFormat4(LeaseCustomResource.apply)
+ implicit val leaseNativeResourceFormat: RootJsonFormat[NativeLeaseResource]
= jsonFormat4(NativeLeaseResource.apply)
}
diff --git
a/lease-kubernetes/src/main/scala/org/apache/pekko/coordination/lease/kubernetes/internal/NativeKubernetesApiImpl.scala
b/lease-kubernetes/src/main/scala/org/apache/pekko/coordination/lease/kubernetes/internal/NativeKubernetesApiImpl.scala
new file mode 100644
index 00000000..6fb5eb07
--- /dev/null
+++
b/lease-kubernetes/src/main/scala/org/apache/pekko/coordination/lease/kubernetes/internal/NativeKubernetesApiImpl.scala
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.coordination.lease.kubernetes.internal
+
+import org.apache.pekko
+import pekko.actor.ActorSystem
+import pekko.annotation.InternalApi
+import
pekko.coordination.lease.kubernetes.internal.NativeKubernetesApiImpl.RFC3339MICRO_FORMATTER
+import pekko.coordination.lease.kubernetes.{ KubernetesSettings, LeaseResource
}
+import pekko.coordination.lease.LeaseException
+import pekko.http.scaladsl.marshalling.Marshal
+import pekko.http.scaladsl.model._
+import pekko.http.scaladsl.unmarshalling.Unmarshal
+import java.time.{ Instant, LocalDateTime, ZoneId }
+import java.time.format.{ DateTimeFormatter, DateTimeFormatterBuilder }
+import java.time.temporal.ChronoField
+import scala.concurrent.Future
+
+object NativeKubernetesApiImpl {
+ // From
https://github.com/kubernetes-client/java/blob/e50fb2a6f30d4f07e3922430307e5e09058aaea1/kubernetes/src/main/java/io/kubernetes/client/openapi/JSON.java#L57
+ val RFC3339MICRO_FORMATTER: DateTimeFormatter =
+ new DateTimeFormatterBuilder().parseDefaulting(ChronoField.OFFSET_SECONDS,
+
0).append(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss")).optionalStart.appendFraction(
+ ChronoField.NANO_OF_SECOND, 6, 6,
true).optionalEnd.appendLiteral("Z").toFormatter
+}
+
+/**
+ * Could be shared between leases:
https://github.com/akka/akka-management/issues/680
+ * INTERNAL API
+ */
+@InternalApi private[pekko] class NativeKubernetesApiImpl(system: ActorSystem,
settings: KubernetesSettings)
+ extends AbstractKubernetesApiImpl(system, settings) {
+
+ import system.dispatcher
+
+ /**
+ * Update the named resource.
+ *
+ * Must call [[readOrCreateLeaseResource]] first to get a resource version.
+ *
+ * Can return one of three things:
+ * - Future.Failure, e.g. timed out waiting for k8s api server to respond
+ * - Future.success[Left(resource)]: the update failed due to version not
matching current in the k8s api server.
+ * In this case the current resource is returned so the version can be
used for subsequent calls
+ * - Future.success[Right(resource)]: Returns the LeaseResource that
contains the clientName and new version.
+ * The new version should be used for any subsequent calls
+ */
+ override def updateLeaseResource(
+ leaseName: String,
+ ownerName: String,
+ version: String,
+ time: Long = System.currentTimeMillis()): Future[Either[LeaseResource,
LeaseResource]] = {
+ val lcr = NativeLeaseResource(Metadata(leaseName, Some(version)),
NativeSpec(ownerName, currentTimeRFC3339))
+ for {
+ entity <- Marshal(lcr).to[RequestEntity]
+ response <- {
+ log.debug("updating {} to {}", leaseName, lcr)
+ makeRequest(
+ requestForPath(pathForLease(leaseName), method = HttpMethods.PUT,
entity),
+ s"Timed out updating lease [$leaseName] to owner [$ownerName]. It is
not known if the update happened")
+ }
+ result <- response.status match {
+ case StatusCodes.OK =>
+ Unmarshal(response.entity)
+ .to[NativeLeaseResource]
+ .map(updatedLcr => {
+ log.debug("LCR after update: {}", updatedLcr)
+ Right(toLeaseResource(updatedLcr))
+ })
+ case StatusCodes.Conflict =>
+ getLeaseResource(leaseName).flatMap {
+ case None =>
+ Future.failed(
+ new LeaseException(s"GET after PUT conflict did not return a
lease. Lease[$leaseName-$ownerName]"))
+ case Some(lr) =>
+ log.debug("LeaseResource read after conflict: {}", lr)
+ Future.successful(Left(lr))
+ }
+ case StatusCodes.Unauthorized =>
+ handleUnauthorized(response)
+ case unexpected =>
+ Unmarshal(response.entity)
+ .to[String]
+ .flatMap(body => {
+ Future.failed(
+ new LeaseException(
+ s"PUT for lease $leaseName returned unexpected status code
$unexpected. Body: $body"))
+ })
+ }
+ } yield result
+ }
+
+ override def getLeaseResource(name: String): Future[Option[LeaseResource]] =
{
+ val fResponse = makeRequest(requestForPath(pathForLease(name)), s"Timed
out reading lease $name")
+ for {
+ response <- fResponse
+ entity <- response.entity.toStrict(settings.bodyReadTimeout)
+ lr <- response.status match {
+ case StatusCodes.OK =>
+ // it exists, parse it
+ log.debug("Resource {} exists: {}", name, entity)
+ Unmarshal(entity)
+ .to[NativeLeaseResource]
+ .map(lcr => {
+ Some(toLeaseResource(lcr))
+ })
+ case StatusCodes.NotFound =>
+ response.discardEntityBytes()
+ log.debug("Resource does not exist: {}", name)
+ Future.successful(None)
+ case StatusCodes.Unauthorized =>
+ handleUnauthorized(response)
+ case unexpected =>
+ Unmarshal(response.entity)
+ .to[String]
+ .flatMap(body => {
+ Future.failed(new LeaseException(
+ s"Unexpected response from API server when retrieving lease
StatusCode: $unexpected. Body: $body"))
+ })
+ }
+ } yield lr
+ }
+
+ override def pathForLease(name: String): Uri.Path =
+ Uri.Path.Empty / "apis" / "coordination.k8s.io" / "v1" / "namespaces" /
namespace / "leases" / name
+ .replaceAll("[^\\d\\w\\-\\.]", "")
+ .toLowerCase
+
+ override def createLeaseResource(name: String):
Future[Option[LeaseResource]] = {
+ val lcr = NativeLeaseResource(Metadata(name, None), NativeSpec("",
currentTimeRFC3339))
+ for {
+ entity <- Marshal(lcr).to[RequestEntity]
+ response <- makeRequest(
+ requestForPath(pathForLease(""), HttpMethods.POST, entity = entity),
+ s"Timed out creating lease $name")
+ responseEntity <- response.entity.toStrict(settings.bodyReadTimeout)
+ lr <- response.status match {
+ case StatusCodes.Created =>
+ log.debug("lease resource created")
+ Unmarshal(responseEntity).to[NativeLeaseResource].map(lcr =>
Some(toLeaseResource(lcr)))
+ case StatusCodes.Conflict =>
+ log.debug("creation of lease resource failed as already exists. Will
attempt to read again")
+ entity.discardBytes()
+ // someone else has created it
+ Future.successful(None)
+ case StatusCodes.Unauthorized =>
+ handleUnauthorized(response)
+ case unexpected =>
+ responseEntity
+ .toStrict(settings.bodyReadTimeout)
+ .flatMap(e => Unmarshal(e).to[String])
+ .flatMap(body => {
+ Future.failed(
+ new LeaseException(
+ s"Unexpected response from API server when creating Lease
StatusCode: $unexpected. Body: $body"))
+ })
+ }
+ } yield lr
+ }
+
+ private def currentTimeRFC3339: String = {
+ RFC3339MICRO_FORMATTER.withZone(ZoneId.of("UTC")).format(Instant.now())
+ }
+
+ private def toLeaseResource(lcr: NativeLeaseResource) = {
+ log.debug("Converting {}", lcr)
+ require(
+ lcr.metadata.resourceVersion.isDefined,
+ s"LeaseCustomResource returned from Kubernetes without a
resourceVersion: $lcr")
+ val owner = lcr.spec.holderIdentity match {
+ case null | "" => None
+ case other => Some(other)
+ }
+ LeaseResource(owner, lcr.metadata.resourceVersion.get,
+ LocalDateTime.parse(lcr.spec.acquireTime, RFC3339MICRO_FORMATTER)
+ .atZone(ZoneId.of("UTC"))
+ .toInstant
+ .toEpochMilli)
+ }
+
+}
diff --git
a/lease-kubernetes/src/test/scala/org/apache/pekko/coordination/lease/kubernetes/NativeKubernetesApiSpec.scala
b/lease-kubernetes/src/test/scala/org/apache/pekko/coordination/lease/kubernetes/NativeKubernetesApiSpec.scala
new file mode 100644
index 00000000..576e1cbd
--- /dev/null
+++
b/lease-kubernetes/src/test/scala/org/apache/pekko/coordination/lease/kubernetes/NativeKubernetesApiSpec.scala
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.coordination.lease.kubernetes
+
+import java.time.temporal.TemporalAccessor
+import java.time.{ Instant, LocalDateTime, ZoneId }
+
+import org.apache.pekko
+import pekko.Done
+import pekko.actor.ActorSystem
+import pekko.coordination.lease.kubernetes.internal.NativeKubernetesApiImpl
+import pekko.http.scaladsl.model.StatusCodes
+import pekko.testkit.TestKit
+import com.github.tomakehurst.wiremock.WireMockServer
+import com.github.tomakehurst.wiremock.client.WireMock
+import com.github.tomakehurst.wiremock.client.WireMock._
+import
com.github.tomakehurst.wiremock.core.WireMockConfiguration.wireMockConfig
+import com.typesafe.config.ConfigFactory
+import org.scalatest.{ BeforeAndAfterAll, BeforeAndAfterEach }
+import org.scalatest.concurrent.ScalaFutures
+import org.scalatest.matchers.should.Matchers
+import org.scalatest.wordspec.AnyWordSpecLike
+
+import scala.concurrent.duration._
+
+class NativeKubernetesApiSpec
+ extends TestKit(
+ ActorSystem(
+ "NativeKubernetesApiSpec",
+ ConfigFactory.parseString("""pekko.coordination.lease.kubernetes {
+ | lease-operation-timeout = 10s
+ |}
+ |""".stripMargin)))
+ with ScalaFutures
+ with AnyWordSpecLike
+ with BeforeAndAfterAll
+ with Matchers
+ with BeforeAndAfterEach {
+
+ val wireMockServer = new WireMockServer(wireMockConfig().port(0))
+ wireMockServer.start()
+
+ val settings = new KubernetesSettings(
+ "",
+ "",
+ "localhost",
+ wireMockServer.port(),
+ namespace = Some("lease"),
+ "",
+ apiServerRequestTimeout = 1.second,
+ false)
+
+ WireMock.configureFor(settings.apiServerPort)
+
+ implicit val patience: PatienceConfig =
PatienceConfig(testKitSettings.DefaultTimeout.duration)
+
+ val underTest = new NativeKubernetesApiImpl(system, settings) {
+ // avoid touching slow CI filesystem
+ override protected def readConfigVarFromFilesystem(path: String, name:
String): Option[String] = None
+ }
+ val leaseName = "lease-1"
+ val client1 = "client-1"
+ val client2 = "client-2"
+
+ override protected def afterAll(): Unit = {
+ TestKit.shutdownActorSystem(system)
+ }
+
+ override protected def beforeEach(): Unit = {
+ wireMockServer.resetAll()
+ }
+
+ private def toRFC3339MicroString(t: TemporalAccessor): String =
+
NativeKubernetesApiImpl.RFC3339MICRO_FORMATTER.withZone(ZoneId.of("UTC")).format(t)
+ private def fromRFC3339MicroString(s: String): Long =
+ LocalDateTime.parse(s,
NativeKubernetesApiImpl.RFC3339MICRO_FORMATTER).atZone(
+ ZoneId.of("UTC")).toInstant.toEpochMilli
+
+ "Kubernetes native lease resource" should {
+ "be able to be created" in {
+ val version = "1234"
+ stubFor(
+
post(urlEqualTo("/apis/coordination.k8s.io/v1/namespaces/lease/leases/"))
+ .willReturn(aResponse().withStatus(201).withHeader("Content-Type",
"application/json").withBody(s"""
+ |{
+ | "apiVersion": "coordination.k8s.io/v1",
+ | "kind": "Lease",
+ | "metadata": {
+ | "name": "lease-1",
+ | "namespace": "pekko-lease-tests",
+ | "resourceVersion": "$version",
+ | "uid": "c369949e-296c-11e9-9c62-16f8dd5735ba"
+ | },
+ | "spec": {
+ | "holderIdentity": "",
+ | "acquireTime": "2024-05-03T13:55:17.655342Z"
+ | }
+ |}
+ """.stripMargin)))
+
+ underTest.removeLease(leaseName).futureValue shouldEqual Done
+ val leaseRecord =
underTest.readOrCreateLeaseResource(leaseName).futureValue
+ leaseRecord.owner shouldEqual None
+ leaseRecord.version shouldNot equal("")
+ leaseRecord.version shouldEqual version
+ }
+
+ "update a lease successfully" in {
+ val holderIdentity = "client1"
+ val lease = "lease-1"
+ val version = "2"
+ val updatedVersion = "3"
+ val timestamp = toRFC3339MicroString(Instant.now())
+ stubFor(
+
put(urlEqualTo(s"/apis/coordination.k8s.io/v1/namespaces/lease/leases/$lease"))
+ .willReturn(aResponse().withStatus(200).withHeader("Content-Type",
"application/json").withBody(s"""
+ |{
+ | "apiVersion": "coordination.k8s.io/v1",
+ | "kind": "Lease",
+ | "metadata": {
+ | "name": "lease-1",
+ | "namespace": "pekko-lease-tests",
+ | "resourceVersion": "$updatedVersion",
+ | "uid": "c369949e-296c-11e9-9c62-16f8dd5735ba"
+ | },
+ | "spec": {
+ | "holderIdentity": "$holderIdentity",
+ | "acquireTime": "$timestamp"
+ | }
+ |}
+ """.stripMargin)))
+
+ val response =
+ underTest.updateLeaseResource(lease, holderIdentity, version,
fromRFC3339MicroString(timestamp)).futureValue
+ response shouldEqual Right(LeaseResource(Some(holderIdentity),
updatedVersion, fromRFC3339MicroString(timestamp)))
+ }
+
+ "update a lease conflict" in {
+ val owner = "client1"
+ val conflictedHolderIdentity = "client2"
+ val lease = "lease-1"
+ val version = "2"
+ val updatedVersion = "3"
+ val timestamp = toRFC3339MicroString(Instant.now())
+ // Conflict
+ stubFor(
+
put(urlEqualTo(s"/apis/coordination.k8s.io/v1/namespaces/lease/leases/$lease"))
+ .willReturn(aResponse().withStatus(StatusCodes.Conflict.intValue)))
+
+ // Read to get version
+ stubFor(
+
get(urlEqualTo(s"/apis/coordination.k8s.io/v1/namespaces/lease/leases/$lease")).willReturn(
+
aResponse().withStatus(StatusCodes.OK.intValue).withHeader("Content-Type",
"application/json").withBody(s"""
+ |{
+ | "apiVersion": "coordination.k8s.io/v1",
+ | "kind": "Lease",
+ | "metadata": {
+ | "name": "lease-1",
+ | "namespace": "pekko-lease-tests",
+ | "resourceVersion": "$updatedVersion",
+ | "uid": "c369949e-296c-11e9-9c62-16f8dd5735ba"
+ | },
+ | "spec": {
+ | "holderIdentity": "$conflictedHolderIdentity",
+ | "acquireTime": "$timestamp"
+ | }
+ |}
+ """.stripMargin)))
+
+ val response = underTest.updateLeaseResource(lease, owner, version,
fromRFC3339MicroString(timestamp)).futureValue
+ response shouldEqual Left(LeaseResource(Some(conflictedHolderIdentity),
updatedVersion,
+ fromRFC3339MicroString(timestamp)))
+ }
+
+ "remove lease via DELETE" in {
+ val lease = "lease-1"
+ stubFor(
+
delete(urlEqualTo(s"/apis/coordination.k8s.io/v1/namespaces/lease/leases/$lease"))
+ .willReturn(aResponse().withStatus(StatusCodes.OK.intValue)))
+
+ val response = underTest.removeLease(lease).futureValue
+ response shouldEqual Done
+ }
+
+ "timeout on readLease" in {
+ val owner = "client1"
+ val lease = "lease-1"
+ val version = "2"
+ val timestamp = toRFC3339MicroString(Instant.now())
+
+ stubFor(
+
get(urlEqualTo(s"/apis/coordination.k8s.io/v1/namespaces/lease/leases/$lease")).willReturn(
+ aResponse()
+ .withFixedDelay((settings.apiServerRequestTimeout *
2).toMillis.toInt) // Oh noes
+ .withStatus(StatusCodes.OK.intValue)
+ .withHeader("Content-Type", "application/json")
+ .withBody(s"""
+ |{
+ | "apiVersion": "coordination.k8s.io/v1",
+ | "kind": "Lease",
+ | "metadata": {
+ | "name": "lease-1",
+ | "namespace": "pekko-lease-tests",
+ | "resourceVersion": "$version",
+ | "uid": "c369949e-296c-11e9-9c62-16f8dd5735ba"
+ | },
+ | "spec": {
+ | "holderIdentity": "$owner",
+ | "acquireTime": $timestamp
+ | }
+ |}
+ """.stripMargin)))
+
+ underTest
+ .readOrCreateLeaseResource(lease)
+ .failed
+ .futureValue
+ .getMessage shouldEqual s"Timed out reading lease $lease. Is the API
server up?"
+ }
+
+ "timeout on create lease" in {
+ val lease = "lease-1"
+
+ stubFor(
+
get(urlEqualTo(s"/apis/coordination.k8s.io/v1/namespaces/lease/leases/$lease"))
+ .willReturn(aResponse().withStatus(StatusCodes.NotFound.intValue)))
+
+ stubFor(
+
post(urlEqualTo(s"/apis/coordination.k8s.io/v1/namespaces/lease/leases/")).willReturn(
+ aResponse()
+ .withFixedDelay((settings.apiServerRequestTimeout *
2).toMillis.toInt) // Oh noes
+ .withStatus(StatusCodes.OK.intValue)
+ .withHeader("Content-Type", "application/json")))
+
+ underTest
+ .readOrCreateLeaseResource(lease)
+ .failed
+ .futureValue
+ .getMessage shouldEqual s"Timed out creating lease $lease. Is the API
server up?"
+ }
+
+ "timeout on updating lease" in {
+ val lease = "lease-1"
+ val owner = "client"
+ stubFor(
+
put(urlEqualTo(s"/apis/coordination.k8s.io/v1/namespaces/lease/leases/$lease")).willReturn(
+ aResponse()
+ .withFixedDelay((settings.apiServerRequestTimeout *
2).toMillis.toInt) // Oh noes
+ .withStatus(StatusCodes.OK.intValue)
+ .withHeader("Content-Type", "application/json")))
+
+ underTest.updateLeaseResource(lease, owner,
"1").failed.futureValue.getMessage shouldEqual
+ s"Timed out updating lease [$lease] to owner [$owner]. It is not known
if the update happened. Is the API server up?"
+ }
+
+ "timeout on remove lease " in {
+ val lease = "lease-1"
+ stubFor(
+
delete(urlEqualTo(s"/apis/coordination.k8s.io/v1/namespaces/lease/leases/$lease")).willReturn(
+ aResponse()
+ .withFixedDelay((settings.apiServerRequestTimeout *
2).toMillis.toInt) // Oh noes
+ .withStatus(StatusCodes.OK.intValue)))
+
+ underTest.removeLease(lease).failed.futureValue.getMessage shouldEqual
+ s"Timed out removing lease [$lease]. It is not known if the remove
happened. Is the API server up?"
+ }
+ }
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]