This is an automated email from the ASF dual-hosted git repository.
mdedetrich pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-management.git
The following commit(s) were added to refs/heads/main by this push:
new 86f33e79 Get token on every request and make fs calls async
86f33e79 is described below
commit 86f33e79718afa84e73f5638a4a85a80de48c834
Author: Matthew de Detrich <[email protected]>
AuthorDate: Mon Nov 17 15:29:07 2025 +0100
Get token on every request and make fs calls async
---
lease-kubernetes/src/main/resources/reference.conf | 15 ++++
.../lease/kubernetes/KubernetesSettings.scala | 31 +++++++-
.../internal/AbstractKubernetesApiImpl.scala | 83 ++++++++++++-------
.../kubernetes/internal/KubernetesApiImpl.scala | 28 ++++---
.../internal/NativeKubernetesApiImpl.scala | 29 ++++---
.../lease/kubernetes/KubernetesApiSpec.scala | 93 +++++++++++++++++++++-
.../lease/kubernetes/NativeKubernetesApiSpec.scala | 4 +-
7 files changed, 226 insertions(+), 57 deletions(-)
diff --git a/lease-kubernetes/src/main/resources/reference.conf
b/lease-kubernetes/src/main/resources/reference.conf
index fb36f4ba..c1ee5f7a 100644
--- a/lease-kubernetes/src/main/resources/reference.conf
+++ b/lease-kubernetes/src/main/resources/reference.conf
@@ -52,4 +52,19 @@ pekko.coordination.lease.kubernetes {
# server that are required. If this timeout is hit then the lease *may* be
taken due to the response being lost
# on the way back from the API server but will be reported as not taken
and can be safely retried.
lease-operation-timeout = 5s
+
+ # Settings that are specific to retrying requests with 401 responses due
to possible token rotation
+ token-rotation-retry {
+ # Number of total attempts to make
+ max-attempts = 5
+
+ # The minimum backoff to be used
+ min-backoff = 10 ms
+
+ # The maximum backoff to be used
+ max-backoff = 1 minute
+
+ # The random factor to be used
+ random-factor = 0.3
+ }
}
diff --git
a/lease-kubernetes/src/main/scala/org/apache/pekko/coordination/lease/kubernetes/KubernetesSettings.scala
b/lease-kubernetes/src/main/scala/org/apache/pekko/coordination/lease/kubernetes/KubernetesSettings.scala
index 6782fe1e..94814cc7 100644
---
a/lease-kubernetes/src/main/scala/org/apache/pekko/coordination/lease/kubernetes/KubernetesSettings.scala
+++
b/lease-kubernetes/src/main/scala/org/apache/pekko/coordination/lease/kubernetes/KubernetesSettings.scala
@@ -53,6 +53,15 @@ private[pekko] object KubernetesSettings {
apiServerRequestTimeout < leaseTimeoutSettings.operationTimeout,
"'api-server-request-timeout can not be less than
'lease-operation-timeout'")
+ val retryConfPath = "token-rotation-retry"
+
+ val tokenRetrySettings = new TokenRetrySettings(
+ config.getInt(s"$retryConfPath.max-attempts"),
+ config.getDuration(s"$retryConfPath.min-backoff").toScala,
+ config.getDuration(s"$retryConfPath.max-backoff").toScala,
+ config.getDouble(s"$retryConfPath.random-factor")
+ )
+
new KubernetesSettings(
config.getString("api-ca-path"),
config.getString("api-token-path"),
@@ -63,11 +72,21 @@ private[pekko] object KubernetesSettings {
apiServerRequestTimeout,
secure = config.getBoolean("secure-api-server"),
tlsVersion = config.getString("tls-version"),
- bodyReadTimeout = apiServerRequestTimeout / 2)
-
+ bodyReadTimeout = apiServerRequestTimeout / 2,
+ tokenRetrySettings = tokenRetrySettings)
}
}
+/**
+ * INTERNAL API
+ */
+@InternalApi
+private[pekko] class TokenRetrySettings(
+ val maxAttempts: Int,
+ val minBackoff: FiniteDuration,
+ val maxBackoff: FiniteDuration,
+ val randomFactor: Double)
+
/**
* INTERNAL API
*/
@@ -82,4 +101,10 @@ private[pekko] class KubernetesSettings(
val apiServerRequestTimeout: FiniteDuration,
val secure: Boolean = true,
val tlsVersion: String = "TLSv1.2",
- val bodyReadTimeout: FiniteDuration = 1.second)
+ val bodyReadTimeout: FiniteDuration = 1.second,
+ val tokenRetrySettings: TokenRetrySettings = new TokenRetrySettings(
+ 5,
+ 10.millis,
+ 1.minute,
+ 0.3
+ ))
diff --git
a/lease-kubernetes/src/main/scala/org/apache/pekko/coordination/lease/kubernetes/internal/AbstractKubernetesApiImpl.scala
b/lease-kubernetes/src/main/scala/org/apache/pekko/coordination/lease/kubernetes/internal/AbstractKubernetesApiImpl.scala
index 99fa22dc..21e17d57 100644
---
a/lease-kubernetes/src/main/scala/org/apache/pekko/coordination/lease/kubernetes/internal/AbstractKubernetesApiImpl.scala
+++
b/lease-kubernetes/src/main/scala/org/apache/pekko/coordination/lease/kubernetes/internal/AbstractKubernetesApiImpl.scala
@@ -15,7 +15,7 @@ package
org.apache.pekko.coordination.lease.kubernetes.internal
import org.apache.pekko
import pekko.Done
-import pekko.actor.ActorSystem
+import pekko.actor.{ ActorSystem, Scheduler }
import pekko.annotation.InternalApi
import pekko.coordination.lease.kubernetes.{ KubernetesApi,
KubernetesSettings, LeaseResource }
import pekko.coordination.lease.{ LeaseException, LeaseTimeoutException }
@@ -24,15 +24,16 @@ import pekko.http.scaladsl.model._
import pekko.http.scaladsl.model.headers.{ Authorization, OAuth2BearerToken }
import pekko.http.scaladsl.unmarshalling.Unmarshal
import pekko.http.scaladsl.{ ConnectionContext, Http, HttpExt,
HttpsConnectionContext }
-import pekko.pattern.after
+import pekko.pattern.{ after, RetrySupport }
import pekko.pki.kubernetes.PemManagersProvider
+import pekko.stream.scaladsl.{ FileIO, Keep, Sink }
+import pekko.util.ByteString
-import java.nio.charset.StandardCharsets
import java.nio.file.{ Files, Paths }
import java.security.{ KeyStore, SecureRandom }
import javax.net.ssl.{ KeyManager, KeyManagerFactory, SSLContext, TrustManager
}
import scala.collection.immutable
-import scala.concurrent.Future
+import scala.concurrent.{ ExecutionContext, Future }
import scala.util.control.NonFatal
/**
@@ -66,12 +67,24 @@ import scala.util.control.NonFatal
private lazy val clientSslContext: HttpsConnectionContext =
ConnectionContext.httpsClient(sslContext)
- protected val namespace: String =
-
settings.namespace.orElse(readConfigVarFromFilesystem(settings.namespacePath,
"namespace")).getOrElse("default")
+ protected val namespace: Future[String] = {
+ settings.namespace match {
+ case Some(nSpace) => Future.successful(nSpace)
+ case _ =>
+ readConfigVarFromFilesystem(settings.namespacePath,
"namespace").map(_.getOrElse("default"))(
+ ExecutionContext.parasitic)
+ }
+ }
protected val scheme: String = if (settings.secure) "https" else "http"
- private lazy val apiToken =
readConfigVarFromFilesystem(settings.apiTokenPath, "api-token").getOrElse("")
- private lazy val headers = if (settings.secure)
immutable.Seq(Authorization(OAuth2BearerToken(apiToken))) else Nil
+ private[pekko] def apiToken() =
readConfigVarFromFilesystem(settings.apiTokenPath, "api-token").map(
+ _.getOrElse(""))(ExecutionContext.parasitic)
+ private def headers() = if (settings.secure) {
+ apiToken().map { token =>
+ immutable.Seq(Authorization(OAuth2BearerToken(token)))
+ }(ExecutionContext.parasitic)
+ } else
+ Future.successful(Nil)
log.debug("kubernetes access namespace: {}. Secure: {}", namespace,
settings.secure)
@@ -79,7 +92,7 @@ import scala.util.control.NonFatal
protected def getLeaseResource(name: String): Future[Option[LeaseResource]]
- protected def pathForLease(name: String): Uri.Path
+ protected def pathForLease(name: String): Future[Uri.Path]
override def readOrCreateLeaseResource(name: String): Future[LeaseResource]
= {
// TODO backoff retry
@@ -110,10 +123,9 @@ import scala.util.control.NonFatal
private[pekko] def removeLease(name: String): Future[Done] = {
for {
- response <- makeRequest(
- requestForPath(pathForLease(name), HttpMethods.DELETE),
- s"Timed out removing lease [$name]. It is not known if the remove
happened")
-
+ leasePath <- pathForLease(name)
+ request <- requestForPath(leasePath, HttpMethods.DELETE)
+ response <- makeRequest(request, s"Timed out removing lease [$name]. It
is not known if the remove happened")
result <- response.status match {
case StatusCodes.OK =>
log.debug("Lease deleted {}", name)
@@ -148,17 +160,34 @@ import scala.util.control.NonFatal
protected def requestForPath(
path: Uri.Path,
method: HttpMethod = HttpMethods.GET,
- entity: RequestEntity = HttpEntity.Empty): HttpRequest = {
+ entity: RequestEntity = HttpEntity.Empty): Future[HttpRequest] = {
val uri = Uri.from(scheme = scheme, host = settings.apiServerHost, port =
settings.apiServerPort).withPath(path)
- HttpRequest(uri = uri, headers = headers, method = method, entity = entity)
+ headers().map { headers =>
+ HttpRequest(uri = uri, headers = headers, method = method, entity =
entity)
+ }(ExecutionContext.parasitic)
+ }
+
+ private[pekko] def makeRawRequest(request: HttpRequest):
Future[HttpResponse] = {
+ if (settings.secure)
+ http.singleRequest(request, clientSslContext)
+ else
+ http.singleRequest(request)
}
protected def makeRequest(request: HttpRequest, timeoutMsg: String):
Future[HttpResponse] = {
- val response =
- if (settings.secure)
- http.singleRequest(request, clientSslContext)
- else
- http.singleRequest(request)
+ // It's possible to legitimately get a 401 response due to kubernetes
doing a token rotation
+ implicit val scheduler: Scheduler = system.scheduler
+ val response = RetrySupport.retry(
+ () => makeRawRequest(request: HttpRequest),
+ (response: HttpResponse, _: Throwable) => {
+ log.warning("Received status code 401 as response, retrying due to
possible token rotation")
+ response.status == StatusCodes.Unauthorized
+ },
+ settings.tokenRetrySettings.maxAttempts,
+ settings.tokenRetrySettings.minBackoff,
+ settings.tokenRetrySettings.maxBackoff,
+ settings.tokenRetrySettings.randomFactor
+ )
// make sure we always consume response body (in case of timeout)
val strictResponse = response.flatMap(_.toStrict(settings.bodyReadTimeout))
@@ -169,22 +198,22 @@ import scala.util.control.NonFatal
Future.firstCompletedOf(Seq(strictResponse, timeout))
}
- /**
- * This uses blocking IO, and so should only be used to read configuration
at startup.
- */
- protected def readConfigVarFromFilesystem(path: String, name: String):
Option[String] = {
+ protected def readConfigVarFromFilesystem(path: String, name: String):
Future[Option[String]] = {
val file = Paths.get(path)
if (Files.exists(file)) {
try {
- Some(new String(Files.readAllBytes(file), StandardCharsets.UTF_8))
+ FileIO.fromPath(file)
+ .toMat(Sink.fold(ByteString.empty)(_ ++ _))(Keep.right)
+ .run()
+ .map(bs => Some(bs.utf8String))(ExecutionContext.parasitic)
} catch {
case NonFatal(e) =>
log.error(e, "Error reading {} from {}", name, path)
- None
+ Future.successful(None)
}
} else {
log.warning("Unable to read {} from {} because it doesn't exist.", name,
path)
- None
+ Future.successful(None)
}
}
diff --git
a/lease-kubernetes/src/main/scala/org/apache/pekko/coordination/lease/kubernetes/internal/KubernetesApiImpl.scala
b/lease-kubernetes/src/main/scala/org/apache/pekko/coordination/lease/kubernetes/internal/KubernetesApiImpl.scala
index d3abbf6d..f557b4b9 100644
---
a/lease-kubernetes/src/main/scala/org/apache/pekko/coordination/lease/kubernetes/internal/KubernetesApiImpl.scala
+++
b/lease-kubernetes/src/main/scala/org/apache/pekko/coordination/lease/kubernetes/internal/KubernetesApiImpl.scala
@@ -13,7 +13,7 @@
package org.apache.pekko.coordination.lease.kubernetes.internal
-import scala.concurrent.Future
+import scala.concurrent.{ ExecutionContext, Future }
import org.apache.pekko
import pekko.actor.ActorSystem
@@ -59,10 +59,12 @@ PUTs must contain resourceVersions. Response:
val lcr = LeaseCustomResource(Metadata(leaseName, Some(version)),
Spec(ownerName, System.currentTimeMillis()))
for {
entity <- Marshal(lcr).to[RequestEntity]
+ leasePath <- pathForLease(leaseName)
+ request <- requestForPath(leasePath, method = HttpMethods.PUT, entity)
response <- {
log.debug("updating {} to {}", leaseName, lcr)
makeRequest(
- requestForPath(pathForLease(leaseName), method = HttpMethods.PUT,
entity),
+ request,
s"Timed out updating lease [$leaseName] to owner [$ownerName]. It is
not known if the update happened")
}
result <- response.status match {
@@ -97,9 +99,10 @@ PUTs must contain resourceVersions. Response:
}
override def getLeaseResource(name: String): Future[Option[LeaseResource]] =
{
- val fResponse = makeRequest(requestForPath(pathForLease(name)), s"Timed
out reading lease $name")
for {
- response <- fResponse
+ leasePath <- pathForLease(name)
+ request <- requestForPath(leasePath)
+ response <- makeRequest(request, s"Timed out reading lease $name")
entity <- response.entity.toStrict(settings.bodyReadTimeout)
lr <- response.status match {
case StatusCodes.OK =>
@@ -127,18 +130,21 @@ PUTs must contain resourceVersions. Response:
} yield lr
}
- override def pathForLease(name: String): Uri.Path =
- Uri.Path.Empty / "apis" / "pekko.apache.org" / "v1" / "namespaces" /
namespace / "leases" / name
- .replaceAll("[^\\d\\w\\-\\.]", "")
- .toLowerCase
+ override def pathForLease(name: String): Future[Uri.Path] = {
+ namespace.map { ns =>
+ Uri.Path.Empty / "apis" / "pekko.apache.org" / "v1" / "namespaces" / ns
/ "leases" / name
+ .replaceAll("[^\\d\\w\\-\\.]", "")
+ .toLowerCase
+ }(ExecutionContext.parasitic)
+ }
override def createLeaseResource(name: String):
Future[Option[LeaseResource]] = {
val lcr = LeaseCustomResource(Metadata(name, None), Spec("",
System.currentTimeMillis()))
for {
entity <- Marshal(lcr).to[RequestEntity]
- response <- makeRequest(
- requestForPath(pathForLease(name), HttpMethods.POST, entity = entity),
- s"Timed out creating lease $name")
+ leasePath <- pathForLease(name)
+ request <- requestForPath(leasePath, HttpMethods.POST, entity = entity)
+ response <- makeRequest(request, s"Timed out creating lease $name")
responseEntity <- response.entity.toStrict(settings.bodyReadTimeout)
lr <- response.status match {
case StatusCodes.Created =>
diff --git
a/lease-kubernetes/src/main/scala/org/apache/pekko/coordination/lease/kubernetes/internal/NativeKubernetesApiImpl.scala
b/lease-kubernetes/src/main/scala/org/apache/pekko/coordination/lease/kubernetes/internal/NativeKubernetesApiImpl.scala
index 6fb5eb07..f6c6a815 100644
---
a/lease-kubernetes/src/main/scala/org/apache/pekko/coordination/lease/kubernetes/internal/NativeKubernetesApiImpl.scala
+++
b/lease-kubernetes/src/main/scala/org/apache/pekko/coordination/lease/kubernetes/internal/NativeKubernetesApiImpl.scala
@@ -29,7 +29,7 @@ import pekko.http.scaladsl.unmarshalling.Unmarshal
import java.time.{ Instant, LocalDateTime, ZoneId }
import java.time.format.{ DateTimeFormatter, DateTimeFormatterBuilder }
import java.time.temporal.ChronoField
-import scala.concurrent.Future
+import scala.concurrent.{ ExecutionContext, Future }
object NativeKubernetesApiImpl {
// From
https://github.com/kubernetes-client/java/blob/e50fb2a6f30d4f07e3922430307e5e09058aaea1/kubernetes/src/main/java/io/kubernetes/client/openapi/JSON.java#L57
@@ -68,10 +68,11 @@ object NativeKubernetesApiImpl {
val lcr = NativeLeaseResource(Metadata(leaseName, Some(version)),
NativeSpec(ownerName, currentTimeRFC3339))
for {
entity <- Marshal(lcr).to[RequestEntity]
+ leasePath <- pathForLease(leaseName)
+ request <- requestForPath(leasePath, method = HttpMethods.PUT, entity)
response <- {
log.debug("updating {} to {}", leaseName, lcr)
- makeRequest(
- requestForPath(pathForLease(leaseName), method = HttpMethods.PUT,
entity),
+ makeRequest(request,
s"Timed out updating lease [$leaseName] to owner [$ownerName]. It is
not known if the update happened")
}
result <- response.status match {
@@ -106,9 +107,10 @@ object NativeKubernetesApiImpl {
}
override def getLeaseResource(name: String): Future[Option[LeaseResource]] =
{
- val fResponse = makeRequest(requestForPath(pathForLease(name)), s"Timed
out reading lease $name")
for {
- response <- fResponse
+ leasePath <- pathForLease(name)
+ request <- requestForPath(leasePath)
+ response <- makeRequest(request, s"Timed out reading lease $name")
entity <- response.entity.toStrict(settings.bodyReadTimeout)
lr <- response.status match {
case StatusCodes.OK =>
@@ -136,18 +138,21 @@ object NativeKubernetesApiImpl {
} yield lr
}
- override def pathForLease(name: String): Uri.Path =
- Uri.Path.Empty / "apis" / "coordination.k8s.io" / "v1" / "namespaces" /
namespace / "leases" / name
- .replaceAll("[^\\d\\w\\-\\.]", "")
- .toLowerCase
+ override def pathForLease(name: String): Future[Uri.Path] = {
+ namespace.map { ns =>
+ Uri.Path.Empty / "apis" / "coordination.k8s.io" / "v1" / "namespaces" /
ns / "leases" / name
+ .replaceAll("[^\\d\\w\\-\\.]", "")
+ .toLowerCase
+ }(ExecutionContext.parasitic)
+ }
override def createLeaseResource(name: String):
Future[Option[LeaseResource]] = {
val lcr = NativeLeaseResource(Metadata(name, None), NativeSpec("",
currentTimeRFC3339))
for {
entity <- Marshal(lcr).to[RequestEntity]
- response <- makeRequest(
- requestForPath(pathForLease(""), HttpMethods.POST, entity = entity),
- s"Timed out creating lease $name")
+ leasePath <- pathForLease("")
+ request <- requestForPath(leasePath, HttpMethods.POST, entity = entity)
+ response <- makeRequest(request, s"Timed out creating lease $name")
responseEntity <- response.entity.toStrict(settings.bodyReadTimeout)
lr <- response.status match {
case StatusCodes.Created =>
diff --git
a/lease-kubernetes/src/test/scala/org/apache/pekko/coordination/lease/kubernetes/KubernetesApiSpec.scala
b/lease-kubernetes/src/test/scala/org/apache/pekko/coordination/lease/kubernetes/KubernetesApiSpec.scala
index bb640048..63811d4b 100644
---
a/lease-kubernetes/src/test/scala/org/apache/pekko/coordination/lease/kubernetes/KubernetesApiSpec.scala
+++
b/lease-kubernetes/src/test/scala/org/apache/pekko/coordination/lease/kubernetes/KubernetesApiSpec.scala
@@ -13,12 +13,17 @@
package org.apache.pekko.coordination.lease.kubernetes
+import java.io.File
+import java.nio.file.Files
+import java.util.concurrent.atomic.AtomicBoolean
+import scala.concurrent.Future
import scala.concurrent.duration._
import org.apache.pekko
import pekko.Done
import pekko.actor.ActorSystem
import pekko.coordination.lease.kubernetes.internal.KubernetesApiImpl
-import pekko.http.scaladsl.model.StatusCodes
+import pekko.coordination.lease.LeaseException
+import pekko.http.scaladsl.model.{ HttpRequest, HttpResponse, StatusCodes }
import pekko.testkit.TestKit
import com.github.tomakehurst.wiremock.WireMockServer
import com.github.tomakehurst.wiremock.client.WireMock
@@ -65,7 +70,8 @@ class KubernetesApiSpec
val underTest = new KubernetesApiImpl(system, settings) {
// avoid touching slow CI filesystem
- override protected def readConfigVarFromFilesystem(path: String, name:
String): Option[String] = None
+ override protected def readConfigVarFromFilesystem(path: String, name:
String): Future[Option[String]] =
+ Future.successful(None)
}
val leaseName = "lease-1"
val client1 = "client-1"
@@ -269,6 +275,87 @@ class KubernetesApiSpec
underTest.removeLease(lease).failed.futureValue.getMessage shouldEqual
s"Timed out removing lease [$lease]. It is not known if the remove
happened. Is the API server up?"
}
- }
+ "not cache token so it can be rotated" in {
+ val path = File.createTempFile("kubernetes-api-spec", null)
+ path.deleteOnExit()
+
+ val newSettings = new KubernetesSettings(
+ "",
+ path.getAbsolutePath,
+ "localhost",
+ wireMockServer.port(),
+ namespace = Some("lease"),
+ "",
+ apiServerRequestTimeout = 1.second,
+ secure = false)
+
+ val tokenTest = new KubernetesApiImpl(system, newSettings)
+
+ val firstTokenValue = "first"
+ val secondTokenValue = "second"
+
+ Files.write(path.toPath, firstTokenValue.getBytes)
+ tokenTest.apiToken().futureValue shouldEqual firstTokenValue
+ Files.write(path.toPath, secondTokenValue.getBytes)
+ tokenTest.apiToken().futureValue shouldEqual secondTokenValue
+ }
+
+ "retry on 401 to handle token timeout" in {
+ val newSettings = new KubernetesSettings(
+ "",
+ "",
+ "localhost",
+ wireMockServer.port(),
+ namespace = Some("lease"),
+ "",
+ apiServerRequestTimeout = 1.second,
+ secure = false)
+
+ val toFail = new AtomicBoolean(true)
+ val retryUnauthorized = new KubernetesApiImpl(system, newSettings) {
+ // avoid touching slow CI filesystem
+ override protected def readConfigVarFromFilesystem(path: String, name:
String): Future[Option[String]] =
+ Future.successful(None)
+
+ override def makeRawRequest(request: HttpRequest):
Future[HttpResponse] =
+ if (toFail.getAndSet(false))
+ Future.successful(HttpResponse(
+ StatusCodes.Unauthorized
+ ))
+ else
+ Future.successful(HttpResponse(
+ StatusCodes.NotFound
+ ))
+ }
+
+ retryUnauthorized.getLeaseResource("").futureValue shouldEqual None
+ }
+
+ "eventually return unauthorized LeaseException when token rotation is not
happening" in {
+ val newSettings = new KubernetesSettings(
+ "",
+ "",
+ "localhost",
+ wireMockServer.port(),
+ namespace = Some("lease"),
+ "",
+ apiServerRequestTimeout = 1.second,
+ secure = false)
+
+ val retryUnauthorized = new KubernetesApiImpl(system, newSettings) {
+ // avoid touching slow CI filesystem
+ override protected def readConfigVarFromFilesystem(path: String, name:
String): Future[Option[String]] =
+ Future.successful(None)
+
+ override def makeRawRequest(request: HttpRequest):
Future[HttpResponse] =
+ Future.successful(HttpResponse(
+ StatusCodes.Unauthorized
+ ))
+ }
+
+ retryUnauthorized.getLeaseResource("").failed.futureValue shouldBe
an[LeaseException]
+ }
+
+ }
}
diff --git
a/lease-kubernetes/src/test/scala/org/apache/pekko/coordination/lease/kubernetes/NativeKubernetesApiSpec.scala
b/lease-kubernetes/src/test/scala/org/apache/pekko/coordination/lease/kubernetes/NativeKubernetesApiSpec.scala
index 74a108a1..f18e11d7 100644
---
a/lease-kubernetes/src/test/scala/org/apache/pekko/coordination/lease/kubernetes/NativeKubernetesApiSpec.scala
+++
b/lease-kubernetes/src/test/scala/org/apache/pekko/coordination/lease/kubernetes/NativeKubernetesApiSpec.scala
@@ -36,6 +36,7 @@ import org.scalatest.concurrent.ScalaFutures
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpecLike
+import scala.concurrent.Future
import scala.concurrent.duration._
class NativeKubernetesApiSpec
@@ -71,7 +72,8 @@ class NativeKubernetesApiSpec
val underTest = new NativeKubernetesApiImpl(system, settings) {
// avoid touching slow CI filesystem
- override protected def readConfigVarFromFilesystem(path: String, name:
String): Option[String] = None
+ override protected def readConfigVarFromFilesystem(path: String, name:
String): Future[Option[String]] =
+ Future.successful(None)
}
val leaseName = "lease-1"
val client1 = "client-1"
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]