This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch 1.0.x
in repository https://gitbox.apache.org/repos/asf/pekko-management.git


The following commit(s) were added to refs/heads/1.0.x by this push:
     new 68406fd2 [chore] improve error handling when invoking 'start' twice 
(#418) (#420)
68406fd2 is described below

commit 68406fd2229525caab89d0fc4abeb2f43691fc4e
Author: Arnout Engelen <[email protected]>
AuthorDate: Fri May 2 19:55:55 2025 +0200

    [chore] improve error handling when invoking 'start' twice (#418) (#420)
    
    Currently, when you run `start` twice with the same config, it will
    start the PekkoManagement extension once and return the same `Uri`.
    This is a convenient 'idempotence'. However, previously this would
    silently ignore the second invocation if the parameters of the two
    calls were different, leading to a race condition.
    
    The change in this commit adds error handling so that if the second
    invocation uses different parameters, it will fail the second call
    with a sensible error message.
---
 .../management/scaladsl/PekkoManagement.scala      | 89 ++++++++++++----------
 .../pekko/management/PekkoManagementSpec.scala     | 62 +++++++++++++++
 2 files changed, 112 insertions(+), 39 deletions(-)

diff --git 
a/management/src/main/scala/org/apache/pekko/management/scaladsl/PekkoManagement.scala
 
b/management/src/main/scala/org/apache/pekko/management/scaladsl/PekkoManagement.scala
index b92a5221..0b2e0e9a 100644
--- 
a/management/src/main/scala/org/apache/pekko/management/scaladsl/PekkoManagement.scala
+++ 
b/management/src/main/scala/org/apache/pekko/management/scaladsl/PekkoManagement.scala
@@ -77,8 +77,8 @@ final class PekkoManagement(implicit private[pekko] val 
system: ExtendedActorSys
 
   private val routeProviders: immutable.Seq[ManagementRouteProvider] = 
loadRouteProviders()
 
-  private val bindingFuture = new AtomicReference[Future[Http.ServerBinding]]()
-  private val selfUriPromise = Promise[Uri]() // TODO has to keep config as 
well as the Uri, so we can reject 2nd calls with diff uri
+  private val bindingFuture = new 
AtomicReference[(ManagementRouteProviderSettings, Future[Http.ServerBinding])]()
+  private val selfUriPromise = Promise[Uri]()
 
   private def providerSettings: ManagementRouteProviderSettings = {
     // port is on purpose never inferred from protocol, because this HTTP 
endpoint is not the "main" one for the app
@@ -126,48 +126,59 @@ final class PekkoManagement(implicit private[pekko] val 
system: ExtendedActorSys
    */
   def start(transformSettings: ManagementRouteProviderSettings => 
ManagementRouteProviderSettings): Future[Uri] = {
     val serverBindingPromise = Promise[Http.ServerBinding]()
-    if (bindingFuture.compareAndSet(null, serverBindingPromise.future)) {
+    val effectiveProviderSettings = transformSettings(providerSettings)
+    if (bindingFuture.compareAndSet(null, (effectiveProviderSettings, 
serverBindingPromise.future))) {
       try {
-        val effectiveBindHostname = settings.Http.EffectiveBindHostname
-        val effectiveBindPort = settings.Http.EffectiveBindPort
-        val effectiveProviderSettings = transformSettings(providerSettings)
+        start(effectiveProviderSettings, serverBindingPromise)
+      } catch {
+        case NonFatal(ex) =>
+          log.warning(ex.getMessage)
+          Future.failed(new IllegalArgumentException("Failed to start Pekko 
Management HTTP endpoint.", ex))
+      }
+    } else {
+      val (configForExistingBinding, _) = bindingFuture.get()
+      if (configForExistingBinding == effectiveProviderSettings)
+        selfUriPromise.future
+      else
+        Future.failed(
+          new IllegalStateException("Management extension already started with 
different configuration parameters"))
+    }
+  }
 
-        // TODO instead of binding to hardcoded things here, discovery could 
also be used for this binding!
-        // Basically: "give me the SRV host/port for the port called 
`pekko-bootstrap`"
-        // discovery.lookup("_pekko-bootstrap" + 
".effective-name.default").find(myaddress)
-        // ----
-        // FIXME -- think about the style of how we want to make these 
available
+  private def start(effectiveProviderSettings: ManagementRouteProviderSettings,
+      serverBindingPromise: Promise[Http.ServerBinding]): Future[Uri] = {
+    val effectiveBindHostname = settings.Http.EffectiveBindHostname
+    val effectiveBindPort = settings.Http.EffectiveBindPort
 
-        log.info("Binding Pekko Management (HTTP) endpoint to: {}:{}", 
effectiveBindHostname, effectiveBindPort)
+    // TODO instead of binding to hardcoded things here, discovery could also 
be used for this binding!
+    // Basically: "give me the SRV host/port for the port called 
`pekko-bootstrap`"
+    // discovery.lookup("_pekko-bootstrap" + 
".effective-name.default").find(myaddress)
+    // ----
+    // FIXME -- think about the style of how we want to make these available
 
-        val combinedRoutes = prepareCombinedRoutes(effectiveProviderSettings)
+    log.info("Binding Pekko Management (HTTP) endpoint to: {}:{}", 
effectiveBindHostname, effectiveBindPort)
 
-        val baseBuilder = Http()
-          .newServerAt(effectiveBindHostname, effectiveBindPort)
-          .withSettings(ServerSettings(system).withRemoteAddressHeader(true))
+    val combinedRoutes = prepareCombinedRoutes(effectiveProviderSettings)
 
-        val securedBuilder = effectiveProviderSettings.httpsConnectionContext 
match {
-          case Some(httpsContext) => baseBuilder.enableHttps(httpsContext)
-          case None               => baseBuilder
-        }
-        val serverFutureBinding = securedBuilder.bind(combinedRoutes)
-
-        serverBindingPromise.completeWith(serverFutureBinding).future.flatMap 
{ binding =>
-          val boundPort = binding.localAddress.getPort
-          log.info(
-            
ManagementLogMarker.boundHttp(s"$effectiveBindHostname:$boundPort"),
-            "Bound Pekko Management (HTTP) endpoint to: {}:{}",
-            effectiveBindHostname,
-            boundPort)
-          
selfUriPromise.success(effectiveProviderSettings.selfBaseUri.withPort(boundPort)).future
-        }
+    val baseBuilder = Http()
+      .newServerAt(effectiveBindHostname, effectiveBindPort)
+      .withSettings(ServerSettings(system).withRemoteAddressHeader(true))
 
-      } catch {
-        case NonFatal(ex) =>
-          log.warning(ex.getMessage)
-          Future.failed(new IllegalArgumentException("Failed to start Pekko 
Management HTTP endpoint.", ex))
-      }
-    } else selfUriPromise.future
+    val securedBuilder = effectiveProviderSettings.httpsConnectionContext 
match {
+      case Some(httpsContext) => baseBuilder.enableHttps(httpsContext)
+      case None               => baseBuilder
+    }
+    val serverFutureBinding = securedBuilder.bind(combinedRoutes)
+
+    serverBindingPromise.completeWith(serverFutureBinding).future.flatMap { 
binding =>
+      val boundPort = binding.localAddress.getPort
+      log.info(
+        ManagementLogMarker.boundHttp(s"$effectiveBindHostname:$boundPort"),
+        "Bound Pekko Management (HTTP) endpoint to: {}:{}",
+        effectiveBindHostname,
+        boundPort)
+      
selfUriPromise.success(effectiveProviderSettings.selfBaseUri.withPort(boundPort)).future
+    }
   }
 
   private def prepareCombinedRoutes(providerSettings: 
ManagementRouteProviderSettings): Route = {
@@ -194,7 +205,7 @@ final class PekkoManagement(implicit private[pekko] val 
system: ExtendedActorSys
             case provided: Credentials.Provided => 
Optional.of(ProvidedCredentials(provided))
             case _                              => Optional.empty()
           }
-          authenticateBasicAsync(realm = "secured", c => 
auth.apply(credsToJava(c)).asScala.map(_.toScala)).optional
+          authenticateBasicAsync(realm = "secured", c => 
auth.apply(credsToJava(c)).asScala.map(_.toScala))
             .apply(_ => inner)
 
         case (Some(_), Some(_)) =>
@@ -224,7 +235,7 @@ final class PekkoManagement(implicit private[pekko] val 
system: ExtendedActorSys
     if (binding == null) {
       Future.successful(Done)
     } else if (bindingFuture.compareAndSet(binding, null)) {
-      binding.flatMap(_.unbind()).map((_: Any) => Done)
+      binding._2.flatMap(_.unbind()).map((_: Any) => Done)
     } else stop() // retry, CAS was not successful, someone else completed the 
stop()
   }
 
diff --git 
a/management/src/test/scala/org/apache/pekko/management/PekkoManagementSpec.scala
 
b/management/src/test/scala/org/apache/pekko/management/PekkoManagementSpec.scala
new file mode 100644
index 00000000..4df87d18
--- /dev/null
+++ 
b/management/src/test/scala/org/apache/pekko/management/PekkoManagementSpec.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.management
+
+import com.typesafe.config.ConfigFactory
+import org.apache.pekko.actor.ActorSystem
+import org.apache.pekko.testkit.TestKit
+import org.apache.pekko.management.scaladsl.PekkoManagement
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.concurrent.ScalaFutures.convertScalaFuture
+import org.scalatest.matchers.should.Matchers
+import org.scalatest.wordspec.AnyWordSpecLike
+
+class PekkoManagementSpec extends TestKit(ActorSystem("PekkoManagementSpec"))
+    with AnyWordSpecLike
+    with BeforeAndAfterAll
+    with Matchers {
+
+  val config = ConfigFactory.parseString(
+    """
+      |pekko.remote.log-remote-lifecycle-events = off
+      |pekko.remote.netty.tcp.port = 0
+      |pekko.remote.artery.canonical.port = 0
+      |
+      |pekko.management.http.port = 0
+      |#pekko.loglevel = DEBUG
+    """.stripMargin)
+
+  val mgmt = PekkoManagement(system)
+
+  "Pekko Management" should {
+    "successfully start" in {
+      // Starting twice with the same config actually starts once:
+      val started = mgmt.start(_.withReadOnly(true)).futureValue
+      val started2 = mgmt.start(_.withReadOnly(true)).futureValue
+      started should be(started2)
+
+      // But starting with a different config fails:
+      val e = mgmt.start(_.withReadOnly(false)).failed.futureValue
+      e.getMessage should be("Management extension already started with 
different configuration parameters")
+    }
+  }
+
+  override def afterAll(): Unit = {
+    mgmt.stop()
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to