This is an automated email from the ASF dual-hosted git repository.
fanningpj pushed a commit to branch 1.0.x
in repository https://gitbox.apache.org/repos/asf/pekko-management.git
The following commit(s) were added to refs/heads/1.0.x by this push:
new 68406fd2 [chore] improve error handling when invoking 'start' twice
(#418) (#420)
68406fd2 is described below
commit 68406fd2229525caab89d0fc4abeb2f43691fc4e
Author: Arnout Engelen <[email protected]>
AuthorDate: Fri May 2 19:55:55 2025 +0200
[chore] improve error handling when invoking 'start' twice (#418) (#420)
Currently, when you run `start` twice with the same config, it will
start the PekkoManagement extension once and return the same `Uri`.
This is a convenient 'idempotence'. However, previously this would
silently ignore the second invocation if the parameters of the two
calls were different, leading to a race condition.
The change in this commit adds error handling so that if the second
invocation uses different parameters, it will fail the second call
with a sensible error message.
---
.../management/scaladsl/PekkoManagement.scala | 89 ++++++++++++----------
.../pekko/management/PekkoManagementSpec.scala | 62 +++++++++++++++
2 files changed, 112 insertions(+), 39 deletions(-)
diff --git
a/management/src/main/scala/org/apache/pekko/management/scaladsl/PekkoManagement.scala
b/management/src/main/scala/org/apache/pekko/management/scaladsl/PekkoManagement.scala
index b92a5221..0b2e0e9a 100644
---
a/management/src/main/scala/org/apache/pekko/management/scaladsl/PekkoManagement.scala
+++
b/management/src/main/scala/org/apache/pekko/management/scaladsl/PekkoManagement.scala
@@ -77,8 +77,8 @@ final class PekkoManagement(implicit private[pekko] val
system: ExtendedActorSys
private val routeProviders: immutable.Seq[ManagementRouteProvider] =
loadRouteProviders()
- private val bindingFuture = new AtomicReference[Future[Http.ServerBinding]]()
- private val selfUriPromise = Promise[Uri]() // TODO has to keep config as
well as the Uri, so we can reject 2nd calls with diff uri
+ private val bindingFuture = new
AtomicReference[(ManagementRouteProviderSettings, Future[Http.ServerBinding])]()
+ private val selfUriPromise = Promise[Uri]()
private def providerSettings: ManagementRouteProviderSettings = {
// port is on purpose never inferred from protocol, because this HTTP
endpoint is not the "main" one for the app
@@ -126,48 +126,59 @@ final class PekkoManagement(implicit private[pekko] val
system: ExtendedActorSys
*/
def start(transformSettings: ManagementRouteProviderSettings =>
ManagementRouteProviderSettings): Future[Uri] = {
val serverBindingPromise = Promise[Http.ServerBinding]()
- if (bindingFuture.compareAndSet(null, serverBindingPromise.future)) {
+ val effectiveProviderSettings = transformSettings(providerSettings)
+ if (bindingFuture.compareAndSet(null, (effectiveProviderSettings,
serverBindingPromise.future))) {
try {
- val effectiveBindHostname = settings.Http.EffectiveBindHostname
- val effectiveBindPort = settings.Http.EffectiveBindPort
- val effectiveProviderSettings = transformSettings(providerSettings)
+ start(effectiveProviderSettings, serverBindingPromise)
+ } catch {
+ case NonFatal(ex) =>
+ log.warning(ex.getMessage)
+ Future.failed(new IllegalArgumentException("Failed to start Pekko
Management HTTP endpoint.", ex))
+ }
+ } else {
+ val (configForExistingBinding, _) = bindingFuture.get()
+ if (configForExistingBinding == effectiveProviderSettings)
+ selfUriPromise.future
+ else
+ Future.failed(
+ new IllegalStateException("Management extension already started with
different configuration parameters"))
+ }
+ }
- // TODO instead of binding to hardcoded things here, discovery could
also be used for this binding!
- // Basically: "give me the SRV host/port for the port called
`pekko-bootstrap`"
- // discovery.lookup("_pekko-bootstrap" +
".effective-name.default").find(myaddress)
- // ----
- // FIXME -- think about the style of how we want to make these
available
+ private def start(effectiveProviderSettings: ManagementRouteProviderSettings,
+ serverBindingPromise: Promise[Http.ServerBinding]): Future[Uri] = {
+ val effectiveBindHostname = settings.Http.EffectiveBindHostname
+ val effectiveBindPort = settings.Http.EffectiveBindPort
- log.info("Binding Pekko Management (HTTP) endpoint to: {}:{}",
effectiveBindHostname, effectiveBindPort)
+ // TODO instead of binding to hardcoded things here, discovery could also
be used for this binding!
+ // Basically: "give me the SRV host/port for the port called
`pekko-bootstrap`"
+ // discovery.lookup("_pekko-bootstrap" +
".effective-name.default").find(myaddress)
+ // ----
+ // FIXME -- think about the style of how we want to make these available
- val combinedRoutes = prepareCombinedRoutes(effectiveProviderSettings)
+ log.info("Binding Pekko Management (HTTP) endpoint to: {}:{}",
effectiveBindHostname, effectiveBindPort)
- val baseBuilder = Http()
- .newServerAt(effectiveBindHostname, effectiveBindPort)
- .withSettings(ServerSettings(system).withRemoteAddressHeader(true))
+ val combinedRoutes = prepareCombinedRoutes(effectiveProviderSettings)
- val securedBuilder = effectiveProviderSettings.httpsConnectionContext
match {
- case Some(httpsContext) => baseBuilder.enableHttps(httpsContext)
- case None => baseBuilder
- }
- val serverFutureBinding = securedBuilder.bind(combinedRoutes)
-
- serverBindingPromise.completeWith(serverFutureBinding).future.flatMap
{ binding =>
- val boundPort = binding.localAddress.getPort
- log.info(
-
ManagementLogMarker.boundHttp(s"$effectiveBindHostname:$boundPort"),
- "Bound Pekko Management (HTTP) endpoint to: {}:{}",
- effectiveBindHostname,
- boundPort)
-
selfUriPromise.success(effectiveProviderSettings.selfBaseUri.withPort(boundPort)).future
- }
+ val baseBuilder = Http()
+ .newServerAt(effectiveBindHostname, effectiveBindPort)
+ .withSettings(ServerSettings(system).withRemoteAddressHeader(true))
- } catch {
- case NonFatal(ex) =>
- log.warning(ex.getMessage)
- Future.failed(new IllegalArgumentException("Failed to start Pekko
Management HTTP endpoint.", ex))
- }
- } else selfUriPromise.future
+ val securedBuilder = effectiveProviderSettings.httpsConnectionContext
match {
+ case Some(httpsContext) => baseBuilder.enableHttps(httpsContext)
+ case None => baseBuilder
+ }
+ val serverFutureBinding = securedBuilder.bind(combinedRoutes)
+
+ serverBindingPromise.completeWith(serverFutureBinding).future.flatMap {
binding =>
+ val boundPort = binding.localAddress.getPort
+ log.info(
+ ManagementLogMarker.boundHttp(s"$effectiveBindHostname:$boundPort"),
+ "Bound Pekko Management (HTTP) endpoint to: {}:{}",
+ effectiveBindHostname,
+ boundPort)
+
selfUriPromise.success(effectiveProviderSettings.selfBaseUri.withPort(boundPort)).future
+ }
}
private def prepareCombinedRoutes(providerSettings:
ManagementRouteProviderSettings): Route = {
@@ -194,7 +205,7 @@ final class PekkoManagement(implicit private[pekko] val
system: ExtendedActorSys
case provided: Credentials.Provided =>
Optional.of(ProvidedCredentials(provided))
case _ => Optional.empty()
}
- authenticateBasicAsync(realm = "secured", c =>
auth.apply(credsToJava(c)).asScala.map(_.toScala)).optional
+ authenticateBasicAsync(realm = "secured", c =>
auth.apply(credsToJava(c)).asScala.map(_.toScala))
.apply(_ => inner)
case (Some(_), Some(_)) =>
@@ -224,7 +235,7 @@ final class PekkoManagement(implicit private[pekko] val
system: ExtendedActorSys
if (binding == null) {
Future.successful(Done)
} else if (bindingFuture.compareAndSet(binding, null)) {
- binding.flatMap(_.unbind()).map((_: Any) => Done)
+ binding._2.flatMap(_.unbind()).map((_: Any) => Done)
} else stop() // retry, CAS was not successful, someone else completed the
stop()
}
diff --git
a/management/src/test/scala/org/apache/pekko/management/PekkoManagementSpec.scala
b/management/src/test/scala/org/apache/pekko/management/PekkoManagementSpec.scala
new file mode 100644
index 00000000..4df87d18
--- /dev/null
+++
b/management/src/test/scala/org/apache/pekko/management/PekkoManagementSpec.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.management
+
+import com.typesafe.config.ConfigFactory
+import org.apache.pekko.actor.ActorSystem
+import org.apache.pekko.testkit.TestKit
+import org.apache.pekko.management.scaladsl.PekkoManagement
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.concurrent.ScalaFutures.convertScalaFuture
+import org.scalatest.matchers.should.Matchers
+import org.scalatest.wordspec.AnyWordSpecLike
+
+class PekkoManagementSpec extends TestKit(ActorSystem("PekkoManagementSpec"))
+ with AnyWordSpecLike
+ with BeforeAndAfterAll
+ with Matchers {
+
+ val config = ConfigFactory.parseString(
+ """
+ |pekko.remote.log-remote-lifecycle-events = off
+ |pekko.remote.netty.tcp.port = 0
+ |pekko.remote.artery.canonical.port = 0
+ |
+ |pekko.management.http.port = 0
+ |#pekko.loglevel = DEBUG
+ """.stripMargin)
+
+ val mgmt = PekkoManagement(system)
+
+ "Pekko Management" should {
+ "successfully start" in {
+ // Starting twice with the same config actually starts once:
+ val started = mgmt.start(_.withReadOnly(true)).futureValue
+ val started2 = mgmt.start(_.withReadOnly(true)).futureValue
+ started should be(started2)
+
+ // But starting with a different config fails:
+ val e = mgmt.start(_.withReadOnly(false)).failed.futureValue
+ e.getMessage should be("Management extension already started with
different configuration parameters")
+ }
+ }
+
+ override def afterAll(): Unit = {
+ mgmt.stop()
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]