This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-management.git


The following commit(s) were added to refs/heads/main by this push:
     new bb8dcdea #217 Support native kubernetes leases (#218)
bb8dcdea is described below

commit bb8dcdead22a37546c51295dc24982b2941be0c8
Author: Iilun <[email protected]>
AuthorDate: Tue May 14 12:28:39 2024 +0200

    #217 Support native kubernetes leases (#218)
    
    * #217 Support native kubernetes leases
    
    * * Reverting documentation header changes
    * Reverting rename of KubernetesApiImpl
    * Fixing license of new classes
    
    * * Reverting renaming of Json classes
    * Fixing wrong import style
    
    * Fixing remaining wrong import style
    
    * Fixing binary incompatibility
    
    * Fix binary incompatibilities
    
    * Create kubernetes-lease.backwards.excludes
    
    * Add mima incompatibilities to exclude
    
    ---------
    
    Co-authored-by: Iilun <[email protected]>
    Co-authored-by: PJ Fanning <[email protected]>
---
 docs/src/main/paradox/kubernetes-lease.md          |  31 ++-
 .../kubernetes-lease.backwards.excludes            |  22 ++
 ...esLease.scala => AbstractKubernetesLease.scala} |  22 +-
 .../lease/kubernetes/KubernetesLease.scala         | 104 +-------
 .../lease/kubernetes/KubernetesSettings.scala      |   2 +-
 .../lease/kubernetes/NativeKubernetesLease.scala   |  35 +++
 .../internal/AbstractKubernetesApiImpl.scala       | 191 ++++++++++++++
 .../kubernetes/internal/KubernetesApiImpl.scala    | 199 ++-------------
 .../internal/KubernetesJsonSupport.scala           |  18 ++
 .../internal/NativeKubernetesApiImpl.scala         | 196 ++++++++++++++
 .../lease/kubernetes/NativeKubernetesApiSpec.scala | 284 +++++++++++++++++++++
 11 files changed, 808 insertions(+), 296 deletions(-)

diff --git a/docs/src/main/paradox/kubernetes-lease.md 
b/docs/src/main/paradox/kubernetes-lease.md
index 6e9aa011..d0b8f413 100644
--- a/docs/src/main/paradox/kubernetes-lease.md
+++ b/docs/src/main/paradox/kubernetes-lease.md
@@ -7,10 +7,14 @@ The API, configuration and behavior may change based on 
feedback from initial us
 
 @@@
 
-This module is an implementation of a [Pekko Coordination 
Lease](https://pekko.apache.org/docs/pekko/current/coordination.html#lease) 
backed 
-by a [Custom Resource Definition 
(CRD)](https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/custom-resources/)
 in Kubernetes.
-Resources in Kubernetes offer [concurrency control and 
consistency](https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/custom-resources/)
 
-that have been used to build a distributed lease/lock.
+This module is an implementation of a [Pekko Coordination 
Lease](https://pekko.apache.org/docs/pekko/current/coordination.html#lease) in 
Kubernetes. 
+It can be backed by either of the following:
+
+* a [Custom Resource Definition 
(CRD)](https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/custom-resources/).
 Resources in Kubernetes offer [concurrency control and 
consistency](https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/custom-resources/)
+  that have been used to build a distributed lease/lock.
+* a native [Kubernetes Lease 
Object](https://kubernetes.io/docs/concepts/architecture/leases/).
+
+
 
 A lease can be used for:
 
@@ -44,7 +48,7 @@ different `ActorSystem` names because they all need a 
separate lease.
 
 #### Creating the Custom Resource Definition for the lease
 
-This requires admin privileges to your Kubernetes / Open Shift cluster but 
only needs doing once.
+For the CRD implementation, a Custom Resource Definition must be created. This requires 
admin privileges to your Kubernetes / Open Shift cluster but only needs doing 
once.
 
 Kubernetes:
 
@@ -56,6 +60,16 @@ Where lease.yml contains:
 
 @@snip[lease.yaml](/lease-kubernetes/lease.yml)
 
+#### Enable the native implementation
+
+To enable the native implementation, the lease class must be changed in the 
configuration to the following value:
+
+```
+pekko.coordination.lease.kubernetes { 
+  lease-class = 
"org.apache.pekko.coordination.lease.kubernetes.NativeKubernetesLease"
+}
+```
+
 #### Role based access control
 
 Each pod needs permission to read/create and update lease resources. They only 
need access
@@ -69,9 +83,14 @@ apiVersion: rbac.authorization.k8s.io/v1
 metadata:
   name: lease-access
 rules:
+  # Using CRD implementation
   - apiGroups: ["pekko.apache.org"]
     resources: ["leases"]
     verbs: ["get", "create", "update", "list"]
+  # Using native implementation
+  - apiGroups: ["coordination.k8s.io"]
+    resources: ["leases"]
+    verbs: ["get", "create", "update", "list"]
 ---
 kind: RoleBinding
 apiVersion: rbac.authorization.k8s.io/v1
@@ -79,7 +98,7 @@ metadata:
   name: lease-access
 subjects:
   - kind: User
-    name: system:serviceaccount:<YOUR NAMSPACE>:default
+    name: system:serviceaccount:<YOUR NAMESPACE>:default
 roleRef:
   kind: Role
   name: lease-access
diff --git 
a/lease-kubernetes/src/main/mima-filters/1.1.x.backwards.excludes/kubernetes-lease.backwards.excludes
 
b/lease-kubernetes/src/main/mima-filters/1.1.x.backwards.excludes/kubernetes-lease.backwards.excludes
new file mode 100644
index 00000000..5e27005a
--- /dev/null
+++ 
b/lease-kubernetes/src/main/mima-filters/1.1.x.backwards.excludes/kubernetes-lease.backwards.excludes
@@ -0,0 +1,22 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# The new Kubernetes Native Lease adds new classes and fields 
+ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.coordination.lease.kubernetes.internal.KubernetesJsonSupport.org$apache$pekko$coordination$lease$kubernetes$internal$KubernetesJsonSupport$_setter_$nativeSpecFormat_=")
+ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.coordination.lease.kubernetes.internal.KubernetesJsonSupport.org$apache$pekko$coordination$lease$kubernetes$internal$KubernetesJsonSupport$_setter_$leaseNativeResourceFormat_=")
+ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.coordination.lease.kubernetes.internal.KubernetesJsonSupport.nativeSpecFormat")
+ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.coordination.lease.kubernetes.internal.KubernetesJsonSupport.leaseNativeResourceFormat")
diff --git 
a/lease-kubernetes/src/main/scala/org/apache/pekko/coordination/lease/kubernetes/KubernetesLease.scala
 
b/lease-kubernetes/src/main/scala/org/apache/pekko/coordination/lease/kubernetes/AbstractKubernetesLease.scala
similarity index 86%
copy from 
lease-kubernetes/src/main/scala/org/apache/pekko/coordination/lease/kubernetes/KubernetesLease.scala
copy to 
lease-kubernetes/src/main/scala/org/apache/pekko/coordination/lease/kubernetes/AbstractKubernetesLease.scala
index d3104c2d..d8a6a20d 100644
--- 
a/lease-kubernetes/src/main/scala/org/apache/pekko/coordination/lease/kubernetes/KubernetesLease.scala
+++ 
b/lease-kubernetes/src/main/scala/org/apache/pekko/coordination/lease/kubernetes/AbstractKubernetesLease.scala
@@ -16,16 +16,13 @@ package org.apache.pekko.coordination.lease.kubernetes
 import java.text.Normalizer
 import java.util.concurrent.atomic.AtomicBoolean
 import java.util.concurrent.atomic.AtomicInteger
-
 import scala.concurrent.Future
 import scala.util.{ Failure, Success }
 import scala.annotation.nowarn
-
 import org.apache.pekko
+import 
pekko.coordination.lease.kubernetes.AbstractKubernetesLease.makeDNS1039Compatible
 import pekko.actor.ExtendedActorSystem
-import 
pekko.coordination.lease.kubernetes.KubernetesLease.makeDNS1039Compatible
 import pekko.coordination.lease.kubernetes.LeaseActor._
-import pekko.coordination.lease.kubernetes.internal.KubernetesApiImpl
 import pekko.coordination.lease.scaladsl.Lease
 import pekko.coordination.lease.LeaseException
 import pekko.coordination.lease.LeaseSettings
@@ -36,7 +33,7 @@ import pekko.util.ConstantFun
 import pekko.util.Timeout
 import org.slf4j.LoggerFactory
 
-object KubernetesLease {
+object AbstractKubernetesLease {
   val configPath = "pekko.coordination.lease.kubernetes"
   private val leaseCounter = new AtomicInteger(1)
 
@@ -64,24 +61,23 @@ object KubernetesLease {
   }
 }
 
-class KubernetesLease private[pekko] (system: ExtendedActorSystem, leaseTaken: 
AtomicBoolean, settings: LeaseSettings)
-    extends Lease(settings) {
+abstract class AbstractKubernetesLease(system: ExtendedActorSystem, 
leaseTaken: AtomicBoolean,
+    settings: LeaseSettings) extends Lease(settings) {
 
   import pekko.pattern.ask
 
   private val logger = LoggerFactory.getLogger(classOf[KubernetesLease])
 
-  private val k8sSettings = KubernetesSettings(settings.leaseConfig, 
settings.timeoutSettings)
-  private val k8sApi = new KubernetesApiImpl(system, k8sSettings)
-  private implicit val timeout: Timeout = 
Timeout(settings.timeoutSettings.operationTimeout)
+  protected val k8sSettings: KubernetesSettings = 
KubernetesSettings(settings.leaseConfig, settings.timeoutSettings)
 
-  def this(leaseSettings: LeaseSettings, system: ExtendedActorSystem) =
-    this(system, new AtomicBoolean(false), leaseSettings)
+  protected def k8sApi: KubernetesApi
+
+  private implicit val timeout: Timeout = 
Timeout(settings.timeoutSettings.operationTimeout)
 
   private val leaseName = makeDNS1039Compatible(settings.leaseName)
   private val leaseActor = system.systemActorOf(
     LeaseActor.props(k8sApi, settings, leaseName, leaseTaken),
-    s"kubernetesLease${KubernetesLease.leaseCounter.incrementAndGet}")
+    s"kubernetesLease${AbstractKubernetesLease.leaseCounter.incrementAndGet}")
   if (leaseName != settings.leaseName) {
     logger.info(
       "Original lease name [{}] sanitized for kubernetes: [{}]",
diff --git 
a/lease-kubernetes/src/main/scala/org/apache/pekko/coordination/lease/kubernetes/KubernetesLease.scala
 
b/lease-kubernetes/src/main/scala/org/apache/pekko/coordination/lease/kubernetes/KubernetesLease.scala
index d3104c2d..9caf493f 100644
--- 
a/lease-kubernetes/src/main/scala/org/apache/pekko/coordination/lease/kubernetes/KubernetesLease.scala
+++ 
b/lease-kubernetes/src/main/scala/org/apache/pekko/coordination/lease/kubernetes/KubernetesLease.scala
@@ -13,115 +13,31 @@
 
 package org.apache.pekko.coordination.lease.kubernetes
 
-import java.text.Normalizer
 import java.util.concurrent.atomic.AtomicBoolean
-import java.util.concurrent.atomic.AtomicInteger
-
-import scala.concurrent.Future
-import scala.util.{ Failure, Success }
-import scala.annotation.nowarn
-
 import org.apache.pekko
 import pekko.actor.ExtendedActorSystem
-import 
pekko.coordination.lease.kubernetes.KubernetesLease.makeDNS1039Compatible
-import pekko.coordination.lease.kubernetes.LeaseActor._
 import pekko.coordination.lease.kubernetes.internal.KubernetesApiImpl
-import pekko.coordination.lease.scaladsl.Lease
-import pekko.coordination.lease.LeaseException
 import pekko.coordination.lease.LeaseSettings
-import pekko.coordination.lease.LeaseTimeoutException
-import pekko.dispatch.ExecutionContexts
-import pekko.pattern.AskTimeoutException
-import pekko.util.ConstantFun
-import pekko.util.Timeout
-import org.slf4j.LoggerFactory
-
-object KubernetesLease {
-  val configPath = "pekko.coordination.lease.kubernetes"
-  private val leaseCounter = new AtomicInteger(1)
-
-  /**
-   * Limit the length of a name to 63 characters.
-   * Some subsystem of Kubernetes cannot manage longer names.
-   */
-  private def truncateTo63Characters(name: String): String = name.take(63)
 
-  /**
-   * Removes from the leading and trailing positions the specified characters.
-   */
-  private def trim(name: String, characters: List[Char]): String =
-    
name.dropWhile(characters.contains(_)).reverse.dropWhile(characters.contains(_)).reverse
+import scala.concurrent.Future
 
-  /**
-   * Make a name compatible with DNS 1039 standard: like a single domain name 
segment.
-   * Regex to follow: [a-z]([-a-z0-9]*[a-z0-9])
-   * Limit the resulting name to 63 characters
-   */
-  private def makeDNS1039Compatible(name: String): String = {
-    val normalized =
-      Normalizer.normalize(name, 
Normalizer.Form.NFKD).toLowerCase.replaceAll("[_.]", 
"-").replaceAll("[^-a-z0-9]", "")
-    trim(truncateTo63Characters(normalized), List('-'))
-  }
+object KubernetesLease {
+  val configPath: String = AbstractKubernetesLease.configPath
 }
 
-class KubernetesLease private[pekko] (system: ExtendedActorSystem, leaseTaken: 
AtomicBoolean, settings: LeaseSettings)
-    extends Lease(settings) {
-
-  import pekko.pattern.ask
+class KubernetesLease(system: ExtendedActorSystem, leaseTaken: AtomicBoolean, 
settings: LeaseSettings)
+    extends AbstractKubernetesLease(system, leaseTaken, settings) {
 
-  private val logger = LoggerFactory.getLogger(classOf[KubernetesLease])
-
-  private val k8sSettings = KubernetesSettings(settings.leaseConfig, 
settings.timeoutSettings)
-  private val k8sApi = new KubernetesApiImpl(system, k8sSettings)
-  private implicit val timeout: Timeout = 
Timeout(settings.timeoutSettings.operationTimeout)
+  override def k8sApi = new KubernetesApiImpl(system, k8sSettings)
 
   def this(leaseSettings: LeaseSettings, system: ExtendedActorSystem) =
     this(system, new AtomicBoolean(false), leaseSettings)
 
-  private val leaseName = makeDNS1039Compatible(settings.leaseName)
-  private val leaseActor = system.systemActorOf(
-    LeaseActor.props(k8sApi, settings, leaseName, leaseTaken),
-    s"kubernetesLease${KubernetesLease.leaseCounter.incrementAndGet}")
-  if (leaseName != settings.leaseName) {
-    logger.info(
-      "Original lease name [{}] sanitized for kubernetes: [{}]",
-      Array[Object](settings.leaseName, leaseName): _*)
-  }
-  logger.debug(
-    "Starting kubernetes lease actor [{}] for lease [{}], owner [{}]",
-    leaseActor,
-    leaseName,
-    settings.ownerName)
-
-  override def checkLease(): Boolean = leaseTaken.get()
+  override def checkLease(): Boolean = super.checkLease()
 
-  @nowarn("msg=match may not be exhaustive")
-  override def release(): Future[Boolean] = {
-    (leaseActor ? Release())
-      .transform {
-        case Success(LeaseReleased)       => Success(true)
-        case Success(InvalidRequest(msg)) => Failure(new LeaseException(msg))
-        case Failure(_: AskTimeoutException) => Failure(
-            new LeaseTimeoutException(
-              s"Timed out trying to release lease [$leaseName, 
${settings.ownerName}]. It may still be taken."))
-        case Failure(exception) => Failure(exception)
-      }(ExecutionContexts.parasitic)
-  }
+  override def release(): Future[Boolean] = super.release()
 
-  override def acquire(): Future[Boolean] = {
-    acquire(ConstantFun.scalaAnyToUnit)
+  override def acquire(): Future[Boolean] = super.acquire()
 
-  }
-  @nowarn("msg=match may not be exhaustive")
-  override def acquire(leaseLostCallback: Option[Throwable] => Unit): 
Future[Boolean] = {
-    (leaseActor ? Acquire(leaseLostCallback))
-      .transform {
-        case Success(LeaseAcquired)       => Success(true)
-        case Success(LeaseTaken)          => Success(false)
-        case Success(InvalidRequest(msg)) => Failure(new LeaseException(msg))
-        case Failure(_: AskTimeoutException) => Failure(new 
LeaseTimeoutException(
-            s"Timed out trying to acquire lease [$leaseName, 
${settings.ownerName}]. It may still be taken."))
-        case Failure(exception) => Failure(exception)
-      }(ExecutionContexts.parasitic)
-  }
+  override def acquire(leaseLostCallback: Option[Throwable] => Unit): 
Future[Boolean] = super.acquire(leaseLostCallback)
 }
diff --git 
a/lease-kubernetes/src/main/scala/org/apache/pekko/coordination/lease/kubernetes/KubernetesSettings.scala
 
b/lease-kubernetes/src/main/scala/org/apache/pekko/coordination/lease/kubernetes/KubernetesSettings.scala
index 41828223..ffa1d3ac 100644
--- 
a/lease-kubernetes/src/main/scala/org/apache/pekko/coordination/lease/kubernetes/KubernetesSettings.scala
+++ 
b/lease-kubernetes/src/main/scala/org/apache/pekko/coordination/lease/kubernetes/KubernetesSettings.scala
@@ -39,7 +39,7 @@ private[pekko] object KubernetesSettings {
   }
 
   def apply(system: ActorSystem, leaseTimeoutSettings: TimeoutSettings): 
KubernetesSettings = {
-    apply(system.settings.config.getConfig(KubernetesLease.configPath), 
leaseTimeoutSettings)
+    
apply(system.settings.config.getConfig(AbstractKubernetesLease.configPath), 
leaseTimeoutSettings)
   }
   def apply(config: Config, leaseTimeoutSettings: TimeoutSettings): 
KubernetesSettings = {
 
diff --git 
a/lease-kubernetes/src/main/scala/org/apache/pekko/coordination/lease/kubernetes/NativeKubernetesLease.scala
 
b/lease-kubernetes/src/main/scala/org/apache/pekko/coordination/lease/kubernetes/NativeKubernetesLease.scala
new file mode 100644
index 00000000..3b82fb52
--- /dev/null
+++ 
b/lease-kubernetes/src/main/scala/org/apache/pekko/coordination/lease/kubernetes/NativeKubernetesLease.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.coordination.lease.kubernetes
+
+import org.apache.pekko
+import pekko.actor.ExtendedActorSystem
+import pekko.coordination.lease.LeaseSettings
+import pekko.coordination.lease.kubernetes.internal.NativeKubernetesApiImpl
+
+import java.util.concurrent.atomic.AtomicBoolean
+
+class NativeKubernetesLease private[pekko] (system: ExtendedActorSystem, 
leaseTaken: AtomicBoolean,
+    settings: LeaseSettings)
+    extends AbstractKubernetesLease(system, leaseTaken, settings) {
+
+  override def k8sApi = new NativeKubernetesApiImpl(system, k8sSettings)
+
+  def this(leaseSettings: LeaseSettings, system: ExtendedActorSystem) =
+    this(system, new AtomicBoolean(false), leaseSettings)
+}
diff --git 
a/lease-kubernetes/src/main/scala/org/apache/pekko/coordination/lease/kubernetes/internal/AbstractKubernetesApiImpl.scala
 
b/lease-kubernetes/src/main/scala/org/apache/pekko/coordination/lease/kubernetes/internal/AbstractKubernetesApiImpl.scala
new file mode 100644
index 00000000..2f1af141
--- /dev/null
+++ 
b/lease-kubernetes/src/main/scala/org/apache/pekko/coordination/lease/kubernetes/internal/AbstractKubernetesApiImpl.scala
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2017-2021 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.coordination.lease.kubernetes.internal
+
+import org.apache.pekko
+import pekko.Done
+import pekko.actor.ActorSystem
+import pekko.annotation.InternalApi
+import pekko.coordination.lease.kubernetes.{ KubernetesApi, 
KubernetesSettings, LeaseResource }
+import pekko.coordination.lease.{ LeaseException, LeaseTimeoutException }
+import pekko.event.{ LogSource, Logging, LoggingAdapter }
+import pekko.http.scaladsl.model._
+import pekko.http.scaladsl.model.headers.{ Authorization, OAuth2BearerToken }
+import pekko.http.scaladsl.unmarshalling.Unmarshal
+import pekko.http.scaladsl.{ ConnectionContext, Http, HttpExt, 
HttpsConnectionContext }
+import pekko.pattern.after
+import pekko.pki.kubernetes.PemManagersProvider
+
+import java.nio.charset.StandardCharsets
+import java.nio.file.{ Files, Paths }
+import java.security.{ KeyStore, SecureRandom }
+import javax.net.ssl.{ KeyManager, KeyManagerFactory, SSLContext, TrustManager 
}
+import scala.collection.immutable
+import scala.concurrent.Future
+import scala.util.control.NonFatal
+
+/**
+ * Could be shared between leases: 
https://github.com/akka/akka-management/issues/680
+ * INTERNAL API
+ */
+@InternalApi private[pekko] abstract class AbstractKubernetesApiImpl(system: 
ActorSystem, settings: KubernetesSettings)
+    extends KubernetesApi
+    with KubernetesJsonSupport {
+
+  import system.dispatcher
+
+  protected implicit val sys: ActorSystem = system
+  protected val log: LoggingAdapter = Logging(system, 
getClass)(LogSource.fromClass)
+  private val http: HttpExt = Http()(system)
+
+  private lazy val sslContext: SSLContext = {
+    val certificates = PemManagersProvider.loadCertificates(settings.apiCaPath)
+    val factory = 
KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm)
+    val keyStore = KeyStore.getInstance("PKCS12")
+    keyStore.load(null)
+    factory.init(keyStore, Array.empty)
+    val km: Array[KeyManager] = factory.getKeyManagers
+    val tm: Array[TrustManager] =
+      PemManagersProvider.buildTrustManagers(certificates)
+    val random: SecureRandom = new SecureRandom
+    val sslContext = SSLContext.getInstance("TLSv1.2")
+    sslContext.init(km, tm, random)
+    sslContext
+  }
+
+  private lazy val clientSslContext: HttpsConnectionContext = 
ConnectionContext.httpsClient(sslContext)
+
+  protected val namespace: String =
+    
settings.namespace.orElse(readConfigVarFromFilesystem(settings.namespacePath, 
"namespace")).getOrElse("default")
+
+  protected val scheme: String = if (settings.secure) "https" else "http"
+  private lazy val apiToken = 
readConfigVarFromFilesystem(settings.apiTokenPath, "api-token").getOrElse("")
+  private lazy val headers = if (settings.secure) 
immutable.Seq(Authorization(OAuth2BearerToken(apiToken))) else Nil
+
+  log.debug("kubernetes access namespace: {}. Secure: {}", namespace, 
settings.secure)
+
+  protected def createLeaseResource(name: String): 
Future[Option[LeaseResource]]
+
+  protected def getLeaseResource(name: String): Future[Option[LeaseResource]]
+
+  protected def pathForLease(name: String): Uri.Path
+
+  override def readOrCreateLeaseResource(name: String): Future[LeaseResource] 
= {
+    // TODO backoff retry
+    val maxTries = 5
+
+    def loop(tries: Int = 0): Future[LeaseResource] = {
+      log.debug("Trying to create lease {}", tries)
+      for {
+        olr <- getLeaseResource(name)
+        lr <- olr match {
+          case Some(found) =>
+            log.debug("{} already exists. Returning {}", name, found)
+            Future.successful(found)
+          case None =>
+            log.info("lease {} does not exist, creating", name)
+            createLeaseResource(name).flatMap {
+              case Some(created) => Future.successful(created)
+              case None =>
+                if (tries < maxTries) loop(tries + 1)
+                else Future.failed(new LeaseException(s"Unable to create or 
read lease after $maxTries tries"))
+            }
+        }
+      } yield lr
+    }
+
+    loop()
+  }
+
+  private[pekko] def removeLease(name: String): Future[Done] = {
+    for {
+      response <- makeRequest(
+        requestForPath(pathForLease(name), HttpMethods.DELETE),
+        s"Timed out removing lease [$name]. It is not known if the remove 
happened")
+
+      result <- response.status match {
+        case StatusCodes.OK =>
+          log.debug("Lease deleted {}", name)
+          response.discardEntityBytes()
+          Future.successful(Done)
+        case StatusCodes.NotFound =>
+          log.debug("Lease already deleted {}", name)
+          response.discardEntityBytes()
+          Future.successful(Done) // already deleted
+        case StatusCodes.Unauthorized =>
+          handleUnauthorized(response)
+        case unexpected =>
+          Unmarshal(response.entity)
+            .to[String]
+            .flatMap(body => {
+              Future.failed(
+                new LeaseException(s"Unexpected status code when deleting 
lease. Status: $unexpected. Body: $body"))
+            })
+      }
+    } yield result
+  }
+
+  protected def handleUnauthorized(response: HttpResponse): Future[Nothing] = {
+    Unmarshal(response.entity)
+      .to[String]
+      .flatMap(body => {
+        Future.failed(new LeaseException(
+          s"Unauthorized to communicate with Kubernetes API server. See 
https://pekko.apache.org/docs/pekko-management/current/kubernetes-lease.html#role-based-access-control
 for setting up access control. Body: $body"))
+      })
+  }
+
+  protected def requestForPath(
+      path: Uri.Path,
+      method: HttpMethod = HttpMethods.GET,
+      entity: RequestEntity = HttpEntity.Empty): HttpRequest = {
+    val uri = Uri.from(scheme = scheme, host = settings.apiServerHost, port = 
settings.apiServerPort).withPath(path)
+    HttpRequest(uri = uri, headers = headers, method = method, entity = entity)
+  }
+
+  protected def makeRequest(request: HttpRequest, timeoutMsg: String): 
Future[HttpResponse] = {
+    val response =
+      if (settings.secure)
+        http.singleRequest(request, clientSslContext)
+      else
+        http.singleRequest(request)
+
+    // make sure we always consume response body (in case of timeout)
+    val strictResponse = response.flatMap(_.toStrict(settings.bodyReadTimeout))
+
+    val timeout = after(settings.apiServerRequestTimeout, using = 
system.scheduler)(
+      Future.failed(new LeaseTimeoutException(s"$timeoutMsg. Is the API server 
up?")))
+
+    Future.firstCompletedOf(Seq(strictResponse, timeout))
+  }
+
+  /**
+   * This uses blocking IO, and so should only be used to read configuration 
at startup.
+   */
+  protected def readConfigVarFromFilesystem(path: String, name: String): 
Option[String] = {
+    val file = Paths.get(path)
+    if (Files.exists(file)) {
+      try {
+        Some(new String(Files.readAllBytes(file), StandardCharsets.UTF_8))
+      } catch {
+        case NonFatal(e) =>
+          log.error(e, "Error reading {} from {}", name, path)
+          None
+      }
+    } else {
+      log.warning("Unable to read {} from {} because it doesn't exist.", name, 
path)
+      None
+    }
+  }
+
+}
diff --git 
a/lease-kubernetes/src/main/scala/org/apache/pekko/coordination/lease/kubernetes/internal/KubernetesApiImpl.scala
 
b/lease-kubernetes/src/main/scala/org/apache/pekko/coordination/lease/kubernetes/internal/KubernetesApiImpl.scala
index 4b559ca4..d3abbf6d 100644
--- 
a/lease-kubernetes/src/main/scala/org/apache/pekko/coordination/lease/kubernetes/internal/KubernetesApiImpl.scala
+++ 
b/lease-kubernetes/src/main/scala/org/apache/pekko/coordination/lease/kubernetes/internal/KubernetesApiImpl.scala
@@ -13,110 +13,26 @@
 
 package org.apache.pekko.coordination.lease.kubernetes.internal
 
-import java.nio.charset.StandardCharsets
-import java.nio.file.{ Files, Paths }
-import java.security.{ KeyStore, SecureRandom }
-
-import scala.collection.immutable
 import scala.concurrent.Future
-import scala.util.control.NonFatal
 
 import org.apache.pekko
-import pekko.Done
 import pekko.actor.ActorSystem
 import pekko.annotation.InternalApi
-import pekko.coordination.lease.{ LeaseException, LeaseTimeoutException }
-import pekko.coordination.lease.kubernetes.{ KubernetesApi, 
KubernetesSettings, LeaseResource }
-import pekko.event.{ LogSource, Logging }
-import pekko.http.scaladsl.{ ConnectionContext, Http, HttpsConnectionContext }
+import pekko.coordination.lease.LeaseException
+import pekko.coordination.lease.kubernetes.{ KubernetesSettings, LeaseResource 
}
 import pekko.http.scaladsl.marshalling.Marshal
 import pekko.http.scaladsl.model._
-import pekko.http.scaladsl.model.headers.{ Authorization, OAuth2BearerToken }
 import pekko.http.scaladsl.unmarshalling.Unmarshal
-import pekko.pattern.after
-import pekko.pki.kubernetes.PemManagersProvider
-
-import javax.net.ssl.{ KeyManager, KeyManagerFactory, SSLContext, TrustManager 
}
 
 /**
  * Could be shared between leases: 
https://github.com/akka/akka-management/issues/680
  * INTERNAL API
  */
 @InternalApi private[pekko] class KubernetesApiImpl(system: ActorSystem, 
settings: KubernetesSettings)
-    extends KubernetesApi
-    with KubernetesJsonSupport {
+    extends AbstractKubernetesApiImpl(system, settings) {
 
   import system.dispatcher
 
-  private implicit val sys: ActorSystem = system
-  private val log = Logging(system, getClass)(LogSource.fromClass)
-  private val http = Http()(system)
-
-  private lazy val sslContext = {
-    val certificates = PemManagersProvider.loadCertificates(settings.apiCaPath)
-    val factory = 
KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm)
-    val keyStore = KeyStore.getInstance("PKCS12")
-    keyStore.load(null)
-    factory.init(keyStore, Array.empty)
-    val km: Array[KeyManager] = factory.getKeyManagers
-    val tm: Array[TrustManager] =
-      PemManagersProvider.buildTrustManagers(certificates)
-    val random: SecureRandom = new SecureRandom
-    val sslContext = SSLContext.getInstance("TLSv1.2")
-    sslContext.init(km, tm, random)
-    sslContext
-  }
-
-  private lazy val clientSslContext: HttpsConnectionContext = 
ConnectionContext.httpsClient(sslContext)
-
-  private val namespace =
-    
settings.namespace.orElse(readConfigVarFromFilesystem(settings.namespacePath, 
"namespace")).getOrElse("default")
-
-  private val scheme = if (settings.secure) "https" else "http"
-  private lazy val apiToken = 
readConfigVarFromFilesystem(settings.apiTokenPath, "api-token").getOrElse("")
-  private lazy val headers = if (settings.secure) 
immutable.Seq(Authorization(OAuth2BearerToken(apiToken))) else Nil
-
-  log.debug("kubernetes access namespace: {}. Secure: {}", namespace, 
settings.secure)
-
-  /*
-  PATH: to get all: /apis/pekko.apache.org/v1/namespaces/<namespace>/leases
-  PATH: to get a specific one: 
/apis/pekko.apache.org/v1/namespaces/<namespace>/leases/<lease-name>
-  curl -v -X POST 
localhost:8080/apis/pekko.apache.org/v1/namespaces/lease/leases/ -H 
"Content-Type: application/yaml" --data-binary "@lease-example.yml"
-
-  responds with either:
-  409 Conflict Already Exists
-
-  OR
-
-  201 Created if it works
-   */
-  override def readOrCreateLeaseResource(name: String): Future[LeaseResource] 
= {
-    // TODO backoff retry
-    val maxTries = 5
-
-    def loop(tries: Int = 0): Future[LeaseResource] = {
-      log.debug("Trying to create lease {}", tries)
-      for {
-        olr <- getLeaseResource(name)
-        lr <- olr match {
-          case Some(found) =>
-            log.debug("{} already exists. Returning {}", name, found)
-            Future.successful(found)
-          case None =>
-            log.info("lease {} does not exist, creating", name)
-            createLeaseResource(name).flatMap {
-              case Some(created) => Future.successful(created)
-              case None =>
-                if (tries < maxTries) loop(tries + 1)
-                else Future.failed(new LeaseException(s"Unable to create or 
read lease after $maxTries tries"))
-            }
-        }
-      } yield lr
-    }
-
-    loop()
-  }
-
   /*
 curl -v -X PUT 
localhost:8080/apis/pekko.apache.org/v1/namespaces/lease/leases/sbr-lease 
--data-binary "@sbr-lease.yml" -H "Content-Type: application/yaml"
 PUTs must contain resourceVersions. Response:
@@ -130,9 +46,9 @@ PUTs must contain resourceVersions. Response:
    *
    * Can return one of three things:
    *  - Future.Failure, e.g. timed out waiting for k8s api server to respond
-   *  - Future.sucess[Left(resource)]: the update failed due to version not 
matching current in the k8s api server.
+   *  - Future.success[Left(resource)]: the update failed due to version not 
matching current in the k8s api server.
    *    In this case the current resource is returned so the version can be 
used for subsequent calls
-   *  - Future.sucess[Right(resource)]: Returns the LeaseResource that 
contains the clientName and new version.
+   *  - Future.success[Right(resource)]: Returns the LeaseResource that 
contains the clientName and new version.
    *    The new version should be used for any subsequent calls
    */
   override def updateLeaseResource(
@@ -180,35 +96,7 @@ PUTs must contain resourceVersions. Response:
     } yield result
   }
 
-  private[pekko] def removeLease(name: String): Future[Done] = {
-    for {
-      response <- makeRequest(
-        requestForPath(pathForLease(name), HttpMethods.DELETE),
-        s"Timed out removing lease [$name]. It is not known if the remove 
happened")
-
-      result <- response.status match {
-        case StatusCodes.OK =>
-          log.debug("Lease deleted {}", name)
-          response.discardEntityBytes()
-          Future.successful(Done)
-        case StatusCodes.NotFound =>
-          log.debug("Lease already deleted {}", name)
-          response.discardEntityBytes()
-          Future.successful(Done) // already deleted
-        case StatusCodes.Unauthorized =>
-          handleUnauthorized(response)
-        case unexpected =>
-          Unmarshal(response.entity)
-            .to[String]
-            .flatMap(body => {
-              Future.failed(
-                new LeaseException(s"Unexpected status code when deleting 
lease. Status: $unexpected. Body: $body"))
-            })
-      }
-    } yield result
-  }
-
-  private def getLeaseResource(name: String): Future[Option[LeaseResource]] = {
+  override def getLeaseResource(name: String): Future[Option[LeaseResource]] = 
{
     val fResponse = makeRequest(requestForPath(pathForLease(name)), s"Timed 
out reading lease $name")
     for {
       response <- fResponse
@@ -239,57 +127,12 @@ PUTs must contain resourceVersions. Response:
     } yield lr
   }
 
-  private def handleUnauthorized(response: HttpResponse) = {
-    Unmarshal(response.entity)
-      .to[String]
-      .flatMap(body => {
-        Future.failed(new LeaseException(
-          s"Unauthorized to communicate with Kubernetes API server. See 
https://pekko.apache.org/docs/pekko-management/current/kubernetes-lease.html#role-based-access-control
 for setting up access control. Body: $body"))
-      })
-  }
-
-  private def pathForLease(name: String): Uri.Path =
+  override def pathForLease(name: String): Uri.Path =
     Uri.Path.Empty / "apis" / "pekko.apache.org" / "v1" / "namespaces" / 
namespace / "leases" / name
       .replaceAll("[^\\d\\w\\-\\.]", "")
       .toLowerCase
 
-  private def requestForPath(
-      path: Uri.Path,
-      method: HttpMethod = HttpMethods.GET,
-      entity: RequestEntity = HttpEntity.Empty) = {
-    val uri = Uri.from(scheme = scheme, host = settings.apiServerHost, port = 
settings.apiServerPort).withPath(path)
-    HttpRequest(uri = uri, headers = headers, method = method, entity = entity)
-  }
-
-  private def makeRequest(request: HttpRequest, timeoutMsg: String): 
Future[HttpResponse] = {
-    val response =
-      if (settings.secure)
-        http.singleRequest(request, clientSslContext)
-      else
-        http.singleRequest(request)
-
-    // make sure we always consume response body (in case of timeout)
-    val strictResponse = response.flatMap(_.toStrict(settings.bodyReadTimeout))
-
-    val timeout = after(settings.apiServerRequestTimeout, using = 
system.scheduler)(
-      Future.failed(new LeaseTimeoutException(s"$timeoutMsg. Is the API server 
up?")))
-
-    Future.firstCompletedOf(Seq(strictResponse, timeout))
-  }
-
-  private def toLeaseResource(lcr: LeaseCustomResource) = {
-    log.debug("Converting {}", lcr)
-    require(
-      lcr.metadata.resourceVersion.isDefined,
-      s"LeaseCustomResource returned from Kubernetes without a 
resourceVersion: $lcr")
-    val owner = lcr.spec.owner match {
-      case null | "" => None
-      case other     => Some(other)
-    }
-    LeaseResource(owner, lcr.metadata.resourceVersion.get, lcr.spec.time)
-  }
-
-  private def createLeaseResource(name: String): Future[Option[LeaseResource]] 
= {
+  override def createLeaseResource(name: String): 
Future[Option[LeaseResource]] = {
     val lcr = LeaseCustomResource(Metadata(name, None), Spec("", 
System.currentTimeMillis()))
     for {
       entity <- Marshal(lcr).to[RequestEntity]
@@ -321,23 +164,15 @@ PUTs must contain resourceVersions. Response:
     } yield lr
   }
 
-  /**
-   * This uses blocking IO, and so should only be used to read configuration 
at startup.
-   */
-  protected def readConfigVarFromFilesystem(path: String, name: String): 
Option[String] = {
-    val file = Paths.get(path)
-    if (Files.exists(file)) {
-      try {
-        Some(new String(Files.readAllBytes(file), StandardCharsets.UTF_8))
-      } catch {
-        case NonFatal(e) =>
-          log.error(e, "Error reading {} from {}", name, path)
-          None
-      }
-    } else {
-      log.warning("Unable to read {} from {} because it doesn't exist.", name, 
path)
-      None
+  private def toLeaseResource(lcr: LeaseCustomResource) = {
+    log.debug("Converting {}", lcr)
+    require(
+      lcr.metadata.resourceVersion.isDefined,
+      s"LeaseCustomResource returned from Kubernetes without a 
resourceVersion: $lcr")
+    val owner = lcr.spec.owner match {
+      case null | "" => None
+      case other     => Some(other)
     }
+    LeaseResource(owner, lcr.metadata.resourceVersion.get, lcr.spec.time)
   }
-
 }
diff --git 
a/lease-kubernetes/src/main/scala/org/apache/pekko/coordination/lease/kubernetes/internal/KubernetesJsonSupport.scala
 
b/lease-kubernetes/src/main/scala/org/apache/pekko/coordination/lease/kubernetes/internal/KubernetesJsonSupport.scala
index 174df901..d50bd7b2 100644
--- 
a/lease-kubernetes/src/main/scala/org/apache/pekko/coordination/lease/kubernetes/internal/KubernetesJsonSupport.scala
+++ 
b/lease-kubernetes/src/main/scala/org/apache/pekko/coordination/lease/kubernetes/internal/KubernetesJsonSupport.scala
@@ -40,6 +40,22 @@ case class Metadata(name: String, resourceVersion: 
Option[String])
 @InternalApi
 case class Spec(owner: String, time: Long)
 
+/**
+ * INTERNAL API
+ */
+@InternalApi
+case class NativeLeaseResource(
+    metadata: Metadata,
+    spec: NativeSpec,
+    kind: String = "Lease",
+    apiVersion: String = "coordination.k8s.io/v1")
+
+/**
+ * INTERNAL API
+ */
+@InternalApi
+case class NativeSpec(holderIdentity: String, acquireTime: String)
+
 /**
  * INTERNAL API
  */
@@ -47,5 +63,7 @@ case class Spec(owner: String, time: Long)
 trait KubernetesJsonSupport extends SprayJsonSupport with DefaultJsonProtocol {
   implicit val metadataFormat: JsonFormat[Metadata] = 
jsonFormat2(Metadata.apply)
   implicit val specFormat: JsonFormat[Spec] = jsonFormat2(Spec.apply)
+  implicit val nativeSpecFormat: JsonFormat[NativeSpec] = 
jsonFormat2(NativeSpec.apply)
   implicit val leaseCustomResourceFormat: RootJsonFormat[LeaseCustomResource] 
= jsonFormat4(LeaseCustomResource.apply)
+  implicit val leaseNativeResourceFormat: RootJsonFormat[NativeLeaseResource] 
= jsonFormat4(NativeLeaseResource.apply)
 }
diff --git 
a/lease-kubernetes/src/main/scala/org/apache/pekko/coordination/lease/kubernetes/internal/NativeKubernetesApiImpl.scala
 
b/lease-kubernetes/src/main/scala/org/apache/pekko/coordination/lease/kubernetes/internal/NativeKubernetesApiImpl.scala
new file mode 100644
index 00000000..6fb5eb07
--- /dev/null
+++ 
b/lease-kubernetes/src/main/scala/org/apache/pekko/coordination/lease/kubernetes/internal/NativeKubernetesApiImpl.scala
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.coordination.lease.kubernetes.internal
+
+import org.apache.pekko
+import pekko.actor.ActorSystem
+import pekko.annotation.InternalApi
+import 
pekko.coordination.lease.kubernetes.internal.NativeKubernetesApiImpl.RFC3339MICRO_FORMATTER
+import pekko.coordination.lease.kubernetes.{ KubernetesSettings, LeaseResource 
}
+import pekko.coordination.lease.LeaseException
+import pekko.http.scaladsl.marshalling.Marshal
+import pekko.http.scaladsl.model._
+import pekko.http.scaladsl.unmarshalling.Unmarshal
+import java.time.{ Instant, LocalDateTime, ZoneId }
+import java.time.format.{ DateTimeFormatter, DateTimeFormatterBuilder }
+import java.time.temporal.ChronoField
+import scala.concurrent.Future
+
+object NativeKubernetesApiImpl {
+  // From 
https://github.com/kubernetes-client/java/blob/e50fb2a6f30d4f07e3922430307e5e09058aaea1/kubernetes/src/main/java/io/kubernetes/client/openapi/JSON.java#L57
+  val RFC3339MICRO_FORMATTER: DateTimeFormatter =
+    new DateTimeFormatterBuilder().parseDefaulting(ChronoField.OFFSET_SECONDS,
+      
0).append(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss")).optionalStart.appendFraction(
+      ChronoField.NANO_OF_SECOND, 6, 6, 
true).optionalEnd.appendLiteral("Z").toFormatter
+}
+
+/**
+ * Could be shared between leases: 
https://github.com/akka/akka-management/issues/680
+ * INTERNAL API
+ */
+@InternalApi private[pekko] class NativeKubernetesApiImpl(system: ActorSystem, 
settings: KubernetesSettings)
+    extends AbstractKubernetesApiImpl(system, settings) {
+
+  import system.dispatcher
+
+  /**
+   * Update the named resource.
+   *
+   * Must call [[readOrCreateLeaseResource]] first to get a resource version.
+   *
+   * Can return one of three things:
+   *  - Future.Failure, e.g. timed out waiting for k8s api server to respond
+   *  - Future.success[Left(resource)]: the update failed due to version not 
matching current in the k8s api server.
+   *    In this case the current resource is returned so the version can be 
used for subsequent calls
+   *  - Future.success[Right(resource)]: Returns the LeaseResource that 
contains the clientName and new version.
+   *    The new version should be used for any subsequent calls
+   */
+  override def updateLeaseResource(
+      leaseName: String,
+      ownerName: String,
+      version: String,
+      time: Long = System.currentTimeMillis()): Future[Either[LeaseResource, 
LeaseResource]] = {
+    val lcr = NativeLeaseResource(Metadata(leaseName, Some(version)), 
NativeSpec(ownerName, currentTimeRFC3339))
+    for {
+      entity <- Marshal(lcr).to[RequestEntity]
+      response <- {
+        log.debug("updating {} to {}", leaseName, lcr)
+        makeRequest(
+          requestForPath(pathForLease(leaseName), method = HttpMethods.PUT, 
entity),
+          s"Timed out updating lease [$leaseName] to owner [$ownerName]. It is 
not known if the update happened")
+      }
+      result <- response.status match {
+        case StatusCodes.OK =>
+          Unmarshal(response.entity)
+            .to[NativeLeaseResource]
+            .map(updatedLcr => {
+              log.debug("LCR after update: {}", updatedLcr)
+              Right(toLeaseResource(updatedLcr))
+            })
+        case StatusCodes.Conflict =>
+          getLeaseResource(leaseName).flatMap {
+            case None =>
+              Future.failed(
+                new LeaseException(s"GET after PUT conflict did not return a 
lease. Lease[$leaseName-$ownerName]"))
+            case Some(lr) =>
+              log.debug("LeaseResource read after conflict: {}", lr)
+              Future.successful(Left(lr))
+          }
+        case StatusCodes.Unauthorized =>
+          handleUnauthorized(response)
+        case unexpected =>
+          Unmarshal(response.entity)
+            .to[String]
+            .flatMap(body => {
+              Future.failed(
+                new LeaseException(
+                  s"PUT for lease $leaseName returned unexpected status code 
$unexpected. Body: $body"))
+            })
+      }
+    } yield result
+  }
+
+  override def getLeaseResource(name: String): Future[Option[LeaseResource]] = 
{
+    val fResponse = makeRequest(requestForPath(pathForLease(name)), s"Timed 
out reading lease $name")
+    for {
+      response <- fResponse
+      entity <- response.entity.toStrict(settings.bodyReadTimeout)
+      lr <- response.status match {
+        case StatusCodes.OK =>
+          // it exists, parse it
+          log.debug("Resource {} exists: {}", name, entity)
+          Unmarshal(entity)
+            .to[NativeLeaseResource]
+            .map(lcr => {
+              Some(toLeaseResource(lcr))
+            })
+        case StatusCodes.NotFound =>
+          response.discardEntityBytes()
+          log.debug("Resource does not exist: {}", name)
+          Future.successful(None)
+        case StatusCodes.Unauthorized =>
+          handleUnauthorized(response)
+        case unexpected =>
+          Unmarshal(response.entity)
+            .to[String]
+            .flatMap(body => {
+              Future.failed(new LeaseException(
+                s"Unexpected response from API server when retrieving lease 
StatusCode: $unexpected. Body: $body"))
+            })
+      }
+    } yield lr
+  }
+
+  override def pathForLease(name: String): Uri.Path =
+    Uri.Path.Empty / "apis" / "coordination.k8s.io" / "v1" / "namespaces" / 
namespace / "leases" / name
+      .replaceAll("[^\\d\\w\\-\\.]", "")
+      .toLowerCase
+
+  override def createLeaseResource(name: String): 
Future[Option[LeaseResource]] = {
+    val lcr = NativeLeaseResource(Metadata(name, None), NativeSpec("", 
currentTimeRFC3339))
+    for {
+      entity <- Marshal(lcr).to[RequestEntity]
+      response <- makeRequest(
+        requestForPath(pathForLease(""), HttpMethods.POST, entity = entity),
+        s"Timed out creating lease $name")
+      responseEntity <- response.entity.toStrict(settings.bodyReadTimeout)
+      lr <- response.status match {
+        case StatusCodes.Created =>
+          log.debug("lease resource created")
+          Unmarshal(responseEntity).to[NativeLeaseResource].map(lcr => 
Some(toLeaseResource(lcr)))
+        case StatusCodes.Conflict =>
+          log.debug("creation of lease resource failed as already exists. Will 
attempt to read again")
+          entity.discardBytes()
+          // someone else has created it
+          Future.successful(None)
+        case StatusCodes.Unauthorized =>
+          handleUnauthorized(response)
+        case unexpected =>
+          responseEntity
+            .toStrict(settings.bodyReadTimeout)
+            .flatMap(e => Unmarshal(e).to[String])
+            .flatMap(body => {
+              Future.failed(
+                new LeaseException(
+                  s"Unexpected response from API server when creating Lease 
StatusCode: $unexpected. Body: $body"))
+            })
+      }
+    } yield lr
+  }
+
+  private def currentTimeRFC3339: String = {
+    RFC3339MICRO_FORMATTER.withZone(ZoneId.of("UTC")).format(Instant.now())
+  }
+
+  private def toLeaseResource(lcr: NativeLeaseResource) = {
+    log.debug("Converting {}", lcr)
+    require(
+      lcr.metadata.resourceVersion.isDefined,
+      s"LeaseCustomResource returned from Kubernetes without a 
resourceVersion: $lcr")
+    val owner = lcr.spec.holderIdentity match {
+      case null | "" => None
+      case other     => Some(other)
+    }
+    LeaseResource(owner, lcr.metadata.resourceVersion.get,
+      LocalDateTime.parse(lcr.spec.acquireTime, RFC3339MICRO_FORMATTER)
+        .atZone(ZoneId.of("UTC"))
+        .toInstant
+        .toEpochMilli)
+  }
+
+}
diff --git 
a/lease-kubernetes/src/test/scala/org/apache/pekko/coordination/lease/kubernetes/NativeKubernetesApiSpec.scala
 
b/lease-kubernetes/src/test/scala/org/apache/pekko/coordination/lease/kubernetes/NativeKubernetesApiSpec.scala
new file mode 100644
index 00000000..576e1cbd
--- /dev/null
+++ 
b/lease-kubernetes/src/test/scala/org/apache/pekko/coordination/lease/kubernetes/NativeKubernetesApiSpec.scala
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.coordination.lease.kubernetes
+
+import java.time.temporal.TemporalAccessor
+import java.time.{ Instant, LocalDateTime, ZoneId }
+
+import org.apache.pekko
+import pekko.Done
+import pekko.actor.ActorSystem
+import pekko.coordination.lease.kubernetes.internal.NativeKubernetesApiImpl
+import pekko.http.scaladsl.model.StatusCodes
+import pekko.testkit.TestKit
+import com.github.tomakehurst.wiremock.WireMockServer
+import com.github.tomakehurst.wiremock.client.WireMock
+import com.github.tomakehurst.wiremock.client.WireMock._
+import 
com.github.tomakehurst.wiremock.core.WireMockConfiguration.wireMockConfig
+import com.typesafe.config.ConfigFactory
+import org.scalatest.{ BeforeAndAfterAll, BeforeAndAfterEach }
+import org.scalatest.concurrent.ScalaFutures
+import org.scalatest.matchers.should.Matchers
+import org.scalatest.wordspec.AnyWordSpecLike
+
+import scala.concurrent.duration._
+
+class NativeKubernetesApiSpec
+    extends TestKit(
+      ActorSystem(
+        "NativeKubernetesApiSpec",
+        ConfigFactory.parseString("""pekko.coordination.lease.kubernetes {
+        |    lease-operation-timeout = 10s
+        |}
+        |""".stripMargin)))
+    with ScalaFutures
+    with AnyWordSpecLike
+    with BeforeAndAfterAll
+    with Matchers
+    with BeforeAndAfterEach {
+
+  val wireMockServer = new WireMockServer(wireMockConfig().port(0))
+  wireMockServer.start()
+
+  val settings = new KubernetesSettings(
+    "",
+    "",
+    "localhost",
+    wireMockServer.port(),
+    namespace = Some("lease"),
+    "",
+    apiServerRequestTimeout = 1.second,
+    false)
+
+  WireMock.configureFor(settings.apiServerPort)
+
+  implicit val patience: PatienceConfig = 
PatienceConfig(testKitSettings.DefaultTimeout.duration)
+
+  val underTest = new NativeKubernetesApiImpl(system, settings) {
+    // avoid touching slow CI filesystem
+    override protected def readConfigVarFromFilesystem(path: String, name: 
String): Option[String] = None
+  }
+  val leaseName = "lease-1"
+  val client1 = "client-1"
+  val client2 = "client-2"
+
+  override protected def afterAll(): Unit = {
+    TestKit.shutdownActorSystem(system)
+  }
+
+  override protected def beforeEach(): Unit = {
+    wireMockServer.resetAll()
+  }
+
+  private def toRFC3339MicroString(t: TemporalAccessor): String =
+    
NativeKubernetesApiImpl.RFC3339MICRO_FORMATTER.withZone(ZoneId.of("UTC")).format(t)
+  private def fromRFC3339MicroString(s: String): Long =
+    LocalDateTime.parse(s, 
NativeKubernetesApiImpl.RFC3339MICRO_FORMATTER).atZone(
+      ZoneId.of("UTC")).toInstant.toEpochMilli
+
+  "Kubernetes native lease resource" should {
+    "be able to be created" in {
+      val version = "1234"
+      stubFor(
+        
post(urlEqualTo("/apis/coordination.k8s.io/v1/namespaces/lease/leases/"))
+          .willReturn(aResponse().withStatus(201).withHeader("Content-Type", 
"application/json").withBody(s"""
+               |{
+               |    "apiVersion": "coordination.k8s.io/v1",
+               |    "kind": "Lease",
+               |    "metadata": {
+               |        "name": "lease-1",
+               |        "namespace": "pekko-lease-tests",
+               |        "resourceVersion": "$version",
+               |        "uid": "c369949e-296c-11e9-9c62-16f8dd5735ba"
+               |    },
+               |    "spec": {
+               |        "holderIdentity": "",
+               |        "acquireTime": "2024-05-03T13:55:17.655342Z"
+               |    }
+               |}
+            """.stripMargin)))
+
+      underTest.removeLease(leaseName).futureValue shouldEqual Done
+      val leaseRecord = 
underTest.readOrCreateLeaseResource(leaseName).futureValue
+      leaseRecord.owner shouldEqual None
+      leaseRecord.version shouldNot equal("")
+      leaseRecord.version shouldEqual version
+    }
+
+    "update a lease successfully" in {
+      val holderIdentity = "client1"
+      val lease = "lease-1"
+      val version = "2"
+      val updatedVersion = "3"
+      val timestamp = toRFC3339MicroString(Instant.now())
+      stubFor(
+        
put(urlEqualTo(s"/apis/coordination.k8s.io/v1/namespaces/lease/leases/$lease"))
+          .willReturn(aResponse().withStatus(200).withHeader("Content-Type", 
"application/json").withBody(s"""
+               |{
+               |    "apiVersion": "coordination.k8s.io/v1",
+               |    "kind": "Lease",
+               |    "metadata": {
+               |        "name": "lease-1",
+               |        "namespace": "pekko-lease-tests",
+               |        "resourceVersion": "$updatedVersion",
+               |        "uid": "c369949e-296c-11e9-9c62-16f8dd5735ba"
+               |    },
+               |    "spec": {
+               |        "holderIdentity": "$holderIdentity",
+               |        "acquireTime": "$timestamp"
+               |    }
+               |}
+            """.stripMargin)))
+
+      val response =
+        underTest.updateLeaseResource(lease, holderIdentity, version, 
fromRFC3339MicroString(timestamp)).futureValue
+      response shouldEqual Right(LeaseResource(Some(holderIdentity), 
updatedVersion, fromRFC3339MicroString(timestamp)))
+    }
+
+    "update a lease conflict" in {
+      val owner = "client1"
+      val conflictedHolderIdentity = "client2"
+      val lease = "lease-1"
+      val version = "2"
+      val updatedVersion = "3"
+      val timestamp = toRFC3339MicroString(Instant.now())
+      // Conflict
+      stubFor(
+        
put(urlEqualTo(s"/apis/coordination.k8s.io/v1/namespaces/lease/leases/$lease"))
+          .willReturn(aResponse().withStatus(StatusCodes.Conflict.intValue)))
+
+      // Read to get version
+      stubFor(
+        
get(urlEqualTo(s"/apis/coordination.k8s.io/v1/namespaces/lease/leases/$lease")).willReturn(
+          
aResponse().withStatus(StatusCodes.OK.intValue).withHeader("Content-Type", 
"application/json").withBody(s"""
+               |{
+               |    "apiVersion": "coordination.k8s.io/v1",
+               |    "kind": "Lease",
+               |    "metadata": {
+               |        "name": "lease-1",
+               |        "namespace": "pekko-lease-tests",
+               |        "resourceVersion": "$updatedVersion",
+               |        "uid": "c369949e-296c-11e9-9c62-16f8dd5735ba"
+               |    },
+               |    "spec": {
+               |        "holderIdentity": "$conflictedHolderIdentity",
+               |        "acquireTime": "$timestamp"
+               |    }
+               |}
+            """.stripMargin)))
+
+      val response = underTest.updateLeaseResource(lease, owner, version, 
fromRFC3339MicroString(timestamp)).futureValue
+      response shouldEqual Left(LeaseResource(Some(conflictedHolderIdentity), 
updatedVersion,
+        fromRFC3339MicroString(timestamp)))
+    }
+
+    "remove lease via DELETE" in {
+      val lease = "lease-1"
+      stubFor(
+        
delete(urlEqualTo(s"/apis/coordination.k8s.io/v1/namespaces/lease/leases/$lease"))
+          .willReturn(aResponse().withStatus(StatusCodes.OK.intValue)))
+
+      val response = underTest.removeLease(lease).futureValue
+      response shouldEqual Done
+    }
+
+    "timeout on readLease" in {
+      val owner = "client1"
+      val lease = "lease-1"
+      val version = "2"
+      val timestamp = toRFC3339MicroString(Instant.now())
+
+      stubFor(
+        
get(urlEqualTo(s"/apis/coordination.k8s.io/v1/namespaces/lease/leases/$lease")).willReturn(
+          aResponse()
+            .withFixedDelay((settings.apiServerRequestTimeout * 
2).toMillis.toInt) // Oh noes
+            .withStatus(StatusCodes.OK.intValue)
+            .withHeader("Content-Type", "application/json")
+            .withBody(s"""
+               |{
+               |    "apiVersion": "coordination.k8s.io/v1",
+               |    "kind": "Lease",
+               |    "metadata": {
+               |        "name": "lease-1",
+               |        "namespace": "pekko-lease-tests",
+               |        "resourceVersion": "$version",
+               |        "uid": "c369949e-296c-11e9-9c62-16f8dd5735ba"
+               |    },
+               |    "spec": {
+               |        "holderIdentity": "$owner",
+               |        "acquireTime": $timestamp
+               |    }
+               |}
+            """.stripMargin)))
+
+      underTest
+        .readOrCreateLeaseResource(lease)
+        .failed
+        .futureValue
+        .getMessage shouldEqual s"Timed out reading lease $lease. Is the API 
server up?"
+    }
+
+    "timeout on create lease" in {
+      val lease = "lease-1"
+
+      stubFor(
+        
get(urlEqualTo(s"/apis/coordination.k8s.io/v1/namespaces/lease/leases/$lease"))
+          .willReturn(aResponse().withStatus(StatusCodes.NotFound.intValue)))
+
+      stubFor(
+        
post(urlEqualTo(s"/apis/coordination.k8s.io/v1/namespaces/lease/leases/")).willReturn(
+          aResponse()
+            .withFixedDelay((settings.apiServerRequestTimeout * 
2).toMillis.toInt) // Oh noes
+            .withStatus(StatusCodes.OK.intValue)
+            .withHeader("Content-Type", "application/json")))
+
+      underTest
+        .readOrCreateLeaseResource(lease)
+        .failed
+        .futureValue
+        .getMessage shouldEqual s"Timed out creating lease $lease. Is the API 
server up?"
+    }
+
+    "timeout on updating lease" in {
+      val lease = "lease-1"
+      val owner = "client"
+      stubFor(
+        
put(urlEqualTo(s"/apis/coordination.k8s.io/v1/namespaces/lease/leases/$lease")).willReturn(
+          aResponse()
+            .withFixedDelay((settings.apiServerRequestTimeout * 
2).toMillis.toInt) // Oh noes
+            .withStatus(StatusCodes.OK.intValue)
+            .withHeader("Content-Type", "application/json")))
+
+      underTest.updateLeaseResource(lease, owner, 
"1").failed.futureValue.getMessage shouldEqual
+      s"Timed out updating lease [$lease] to owner [$owner]. It is not known 
if the update happened. Is the API server up?"
+    }
+
+    "timeout on remove lease " in {
+      val lease = "lease-1"
+      stubFor(
+        
delete(urlEqualTo(s"/apis/coordination.k8s.io/v1/namespaces/lease/leases/$lease")).willReturn(
+          aResponse()
+            .withFixedDelay((settings.apiServerRequestTimeout * 
2).toMillis.toInt) // Oh noes
+            .withStatus(StatusCodes.OK.intValue)))
+
+      underTest.removeLease(lease).failed.futureValue.getMessage shouldEqual
+      s"Timed out removing lease [$lease]. It is not known if the remove 
happened. Is the API server up?"
+    }
+  }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to