This is an automated email from the ASF dual-hosted git repository.

feiwang pushed a commit to branch branch-0.6
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/branch-0.6 by this push:
     new a209d9eea [CELEBORN-2020] Support http authentication for Celeborn CLI
a209d9eea is described below

commit a209d9eea9d464d1648623666f11b9549a383cc7
Author: Wang, Fei <[email protected]>
AuthorDate: Fri May 30 00:28:41 2025 -0700

    [CELEBORN-2020] Support http authentication for Celeborn CLI
    
    ### What changes were proposed in this pull request?
    
    Support http authentication for Celeborn CLI.
    
    ### Why are the changes needed?
    
    The current CLI does not work when HTTP authentication is enabled for the
    master or worker.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes, a new `--auth-header` option for the CLI master and worker subcommands.
    
    ### How was this patch tested?
    
    UT.
    
    Closes #3300 from turboFei/cli_auth.
    
    Authored-by: Wang, Fei <[email protected]>
    Signed-off-by: Wang, Fei <[email protected]>
    (cherry picked from commit 5f58fb1e3e0fecc721db4e38292b3f6d2624a1af)
    Signed-off-by: Wang, Fei <[email protected]>
---
 cli/pom.xml                                        |  7 ++++
 .../apache/celeborn/cli/common/CommonOptions.scala | 17 +++++++++
 .../celeborn/cli/master/MasterSubcommandImpl.scala | 41 +++++++++++++---------
 .../celeborn/cli/worker/WorkerSubcommandImpl.scala | 26 ++++++++------
 .../celeborn/cli/TestCelebornCliCommands.scala     | 29 +++++++++++++--
 docs/celeborn_cli.md                               | 10 ++++++
 6 files changed, 102 insertions(+), 28 deletions(-)

diff --git a/cli/pom.xml b/cli/pom.xml
index 6c3af7029..b2bebee05 100644
--- a/cli/pom.xml
+++ b/cli/pom.xml
@@ -64,6 +64,13 @@
       <type>test-jar</type>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.celeborn</groupId>
+      <artifactId>celeborn-service_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>org.apache.celeborn</groupId>
       <artifactId>celeborn-master_${scala.binary.version}</artifactId>
diff --git 
a/cli/src/main/scala/org/apache/celeborn/cli/common/CommonOptions.scala 
b/cli/src/main/scala/org/apache/celeborn/cli/common/CommonOptions.scala
index 69c243f94..4cd0b2db1 100644
--- a/cli/src/main/scala/org/apache/celeborn/cli/common/CommonOptions.scala
+++ b/cli/src/main/scala/org/apache/celeborn/cli/common/CommonOptions.scala
@@ -17,6 +17,9 @@
 
 package org.apache.celeborn.cli.common
 
+import java.util.{Collections, Map => JMap}
+
+import org.apache.commons.lang3.StringUtils
 import picocli.CommandLine.{Command, Option, Spec}
 import picocli.CommandLine.Model.CommandSpec
 
@@ -78,4 +81,18 @@ class CommonOptions {
     description = Array("The application Id list seperated by comma."))
   private[cli] var apps: String = _
 
+  @Option(
+    names = Array("--auth-header"),
+    paramLabel = "authHeader",
+    description = Array("The http `Authorization` header for authentication. " 
+
+      "It should be in the format of `Bearer <token>` or `Basic 
<base64-encoded-credentials>`."))
+  private[cli] var authHeader: String = _
+
+  private[cli] def getAuthHeader: JMap[String, String] = {
+    if (StringUtils.isNotBlank(authHeader)) {
+      Collections.singletonMap("Authorization", authHeader)
+    } else {
+      Collections.emptyMap()
+    }
+  }
 }
diff --git 
a/cli/src/main/scala/org/apache/celeborn/cli/master/MasterSubcommandImpl.scala 
b/cli/src/main/scala/org/apache/celeborn/cli/master/MasterSubcommandImpl.scala
index d0bf7c574..5f0e288b2 100644
--- 
a/cli/src/main/scala/org/apache/celeborn/cli/master/MasterSubcommandImpl.scala
+++ 
b/cli/src/main/scala/org/apache/celeborn/cli/master/MasterSubcommandImpl.scala
@@ -60,18 +60,20 @@ class MasterSubcommandImpl extends Runnable with 
MasterSubcommand {
       runRemoveClusterAlias
   }
 
-  private[master] def runShowMastersInfo: MasterInfoResponse = 
masterApi.getMasterGroupInfo
+  private[master] def runShowMastersInfo: MasterInfoResponse =
+    masterApi.getMasterGroupInfo(commonOptions.getAuthHeader)
 
   private[master] def runShowClusterApps: ApplicationsHeartbeatResponse =
-    applicationApi.getApplications
+    applicationApi.getApplications(commonOptions.getAuthHeader)
 
-  private[master] def runShowClusterShuffles: ShufflesResponse = 
shuffleApi.getShuffles
+  private[master] def runShowClusterShuffles: ShufflesResponse =
+    shuffleApi.getShuffles(commonOptions.getAuthHeader)
 
   private[master] def runExcludeWorkers: HandleResponse = {
     val workerIds = getWorkerIds
     val excludeWorkerRequest = new ExcludeWorkerRequest().add(workerIds)
     logInfo(s"Sending exclude worker requests to master for the following 
workers: $workerIds")
-    workerApi.excludeWorker(excludeWorkerRequest)
+    workerApi.excludeWorker(excludeWorkerRequest, commonOptions.getAuthHeader)
   }
 
   private[master] def runRemoveExcludedWorkers: HandleResponse = {
@@ -79,7 +81,7 @@ class MasterSubcommandImpl extends Runnable with 
MasterSubcommand {
     val removeExcludeWorkerRequest = new 
ExcludeWorkerRequest().remove(workerIds)
     logInfo(
       s"Sending remove exclude worker requests to master for the following 
workers: $workerIds")
-    workerApi.excludeWorker(removeExcludeWorkerRequest)
+    workerApi.excludeWorker(removeExcludeWorkerRequest, 
commonOptions.getAuthHeader)
   }
 
   private[master] def runRemoveWorkersUnavailableInfo: HandleResponse = {
@@ -88,7 +90,9 @@ class MasterSubcommandImpl extends Runnable with 
MasterSubcommand {
       new RemoveWorkersUnavailableInfoRequest().workers(workerIds)
     logInfo(
       s"Sending remove workers unavailable info requests to master for the 
following workers: $workerIds")
-    workerApi.removeWorkersUnavailableInfo(removeWorkersUnavailableInfoRequest)
+    workerApi.removeWorkersUnavailableInfo(
+      removeWorkersUnavailableInfoRequest,
+      commonOptions.getAuthHeader)
   }
 
   private[master] def runSendWorkerEvent: HandleResponse = {
@@ -106,7 +110,7 @@ class MasterSubcommandImpl extends Runnable with 
MasterSubcommand {
     val sendWorkerEventRequest =
       new SendWorkerEventRequest().workers(workerIds).eventType(eventType)
     logInfo(s"Sending workerEvent $eventType to workers: $workerIds")
-    workerApi.sendWorkerEvent(sendWorkerEventRequest)
+    workerApi.sendWorkerEvent(sendWorkerEventRequest, 
commonOptions.getAuthHeader)
   }
 
   private[master] def runShowWorkerEventInfo: WorkerEventsResponse = 
workerApi.getWorkerEvents
@@ -162,11 +166,13 @@ class MasterSubcommandImpl extends Runnable with 
MasterSubcommand {
   }
 
   private[master] def runShowLifecycleManagers: HostnamesResponse =
-    applicationApi.getApplicationHostNames
+    applicationApi.getApplicationHostNames(commonOptions.getAuthHeader)
 
-  private[master] def runShowWorkers: WorkersResponse = workerApi.getWorkers
+  private[master] def runShowWorkers: WorkersResponse =
+    workerApi.getWorkers(commonOptions.getAuthHeader)
 
-  private[master] def runShowWorkersTopology: TopologyResponse = 
workerApi.getWorkersTopology()
+  private[master] def runShowWorkersTopology: TopologyResponse =
+    workerApi.getWorkersTopology(commonOptions.getAuthHeader)
 
   private[master] def getWorkerIds: util.List[WorkerId] = {
     val workerIds = commonOptions.workerIds
@@ -192,15 +198,17 @@ class MasterSubcommandImpl extends Runnable with 
MasterSubcommand {
       .asJava
   }
 
-  private[master] def runShowConf: ConfResponse = confApi.getConf
+  private[master] def runShowConf: ConfResponse = 
confApi.getConf(commonOptions.getAuthHeader)
 
   private[master] def runShowDynamicConf: DynamicConfigResponse =
     confApi.getDynamicConf(
       commonOptions.configLevel,
       commonOptions.configTenant,
-      commonOptions.configName)
+      commonOptions.configName,
+      commonOptions.getAuthHeader)
 
-  private[master] def runShowThreadDump: ThreadStackResponse = 
defaultApi.getThreadDump
+  private[master] def runShowThreadDump: ThreadStackResponse =
+    defaultApi.getThreadDump(commonOptions.getAuthHeader)
 
   private[master] def runAddClusterAlias: Unit = {
     val aliasToAdd = masterOptions.addClusterAlias
@@ -221,7 +229,8 @@ class MasterSubcommandImpl extends Runnable with 
MasterSubcommand {
     logInfo(s"Cluster alias $aliasToRemove removed.")
   }
 
-  private[master] def runShowContainerInfo: ContainerInfo = 
defaultApi.getContainerInfo
+  private[master] def runShowContainerInfo: ContainerInfo =
+    defaultApi.getContainerInfo(commonOptions.getAuthHeader)
 
   override private[master] def reviseLostShuffles: HandleResponse = {
     if (StringUtils.isAnyBlank(commonOptions.apps, 
reviseLostShuffleOptions.shuffleIds)) {
@@ -241,7 +250,7 @@ class MasterSubcommandImpl extends Runnable with 
MasterSubcommand {
       reviseLostShuffleOptions.shuffleIds.split(",").map(Integer.valueOf): _*)
     val request =
       new ReviseLostShufflesRequest().appId(app).shuffleIds(shuffleIds)
-    applicationApi.reviseLostShuffles(request)
+    applicationApi.reviseLostShuffles(request, commonOptions.getAuthHeader)
   }
 
   override private[master] def deleteApps: HandleResponse = {
@@ -252,6 +261,6 @@ class MasterSubcommandImpl extends Runnable with 
MasterSubcommand {
     }
     val appIds = util.Arrays.asList[String](commonOptions.apps.split(","): _*)
     val request = new DeleteAppsRequest().apps(appIds)
-    applicationApi.deleteApps(request)
+    applicationApi.deleteApps(request, commonOptions.getAuthHeader)
   }
 }
diff --git 
a/cli/src/main/scala/org/apache/celeborn/cli/worker/WorkerSubcommandImpl.scala 
b/cli/src/main/scala/org/apache/celeborn/cli/worker/WorkerSubcommandImpl.scala
index dd542bdee..c459e5b4e 100644
--- 
a/cli/src/main/scala/org/apache/celeborn/cli/worker/WorkerSubcommandImpl.scala
+++ 
b/cli/src/main/scala/org/apache/celeborn/cli/worker/WorkerSubcommandImpl.scala
@@ -41,17 +41,20 @@ class WorkerSubcommandImpl extends Runnable with 
WorkerSubcommand {
     if (workerOptions.showThreadDump) log(runShowThreadDump)
   }
 
-  private[worker] def runShowWorkerInfo: WorkerInfoResponse = 
workerApi.getWorkerInfo
+  private[worker] def runShowWorkerInfo: WorkerInfoResponse =
+    workerApi.getWorkerInfo(commonOptions.getAuthHeader)
 
-  private[worker] def runShowAppsOnWorker: ApplicationsResponse = 
applicationApi.getApplicationList
+  private[worker] def runShowAppsOnWorker: ApplicationsResponse =
+    applicationApi.getApplicationList(commonOptions.getAuthHeader)
 
-  private[worker] def runShowShufflesOnWorker: ShufflesResponse = 
shuffleApi.getShuffles
+  private[worker] def runShowShufflesOnWorker: ShufflesResponse =
+    shuffleApi.getShuffles(commonOptions.getAuthHeader)
 
   private[worker] def runShowPartitionLocationInfo: ShufflePartitionsResponse =
-    shuffleApi.getShufflePartitions
+    shuffleApi.getShufflePartitions(commonOptions.getAuthHeader)
 
   private[worker] def runShowUnavailablePeers: UnAvailablePeersResponse =
-    workerApi.unavailablePeers()
+    workerApi.unavailablePeers(commonOptions.getAuthHeader)
 
   private[worker] def runIsShutdown: Boolean = runShowWorkerInfo.getIsShutdown
 
@@ -63,18 +66,21 @@ class WorkerSubcommandImpl extends Runnable with 
WorkerSubcommand {
     val workerExitType: TypeEnum = TypeEnum.valueOf(workerOptions.exitType)
     val workerExitRequest: WorkerExitRequest = new 
WorkerExitRequest().`type`(workerExitType)
     logInfo(s"Sending worker exit type: ${workerExitType.getValue}")
-    workerApi.workerExit(workerExitRequest)
+    workerApi.workerExit(workerExitRequest, commonOptions.getAuthHeader)
   }
 
-  private[worker] def runShowConf: ConfResponse = confApi.getConf
+  private[worker] def runShowConf: ConfResponse = 
confApi.getConf(commonOptions.getAuthHeader)
 
   private[worker] def runShowDynamicConf: DynamicConfigResponse =
     confApi.getDynamicConf(
       commonOptions.configLevel,
       commonOptions.configTenant,
-      commonOptions.configName)
+      commonOptions.configName,
+      commonOptions.getAuthHeader)
 
-  private[worker] def runShowThreadDump: ThreadStackResponse = 
defaultApi.getThreadDump
+  private[worker] def runShowThreadDump: ThreadStackResponse =
+    defaultApi.getThreadDump(commonOptions.getAuthHeader)
 
-  private[worker] def runShowContainerInfo: ContainerInfo = 
defaultApi.getContainerInfo
+  private[worker] def runShowContainerInfo: ContainerInfo =
+    defaultApi.getContainerInfo(commonOptions.getAuthHeader)
 }
diff --git 
a/cli/src/test/scala/org/apache/celeborn/cli/TestCelebornCliCommands.scala 
b/cli/src/test/scala/org/apache/celeborn/cli/TestCelebornCliCommands.scala
index b65667355..eaee31beb 100644
--- a/cli/src/test/scala/org/apache/celeborn/cli/TestCelebornCliCommands.scala
+++ b/cli/src/test/scala/org/apache/celeborn/cli/TestCelebornCliCommands.scala
@@ -18,18 +18,39 @@
 package org.apache.celeborn.cli
 
 import java.io.{ByteArrayOutputStream, File, PrintStream}
+import java.nio.charset.StandardCharsets
 import java.nio.file.{Files, Paths}
+import java.util.Base64
 
 import org.apache.celeborn.CelebornFunSuite
 import org.apache.celeborn.cli.config.CliConfigManager
 import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.authentication.HttpAuthSchemes
+import 
org.apache.celeborn.server.common.http.authentication.{UserDefinePasswordAuthenticationProviderImpl,
 UserDefineTokenAuthenticationProviderImpl}
 import org.apache.celeborn.service.deploy.MiniClusterFeature
 import org.apache.celeborn.service.deploy.master.Master
 import org.apache.celeborn.service.deploy.worker.Worker
 
 class TestCelebornCliCommands extends CelebornFunSuite with MiniClusterFeature 
{
 
+  private val CELEBORN_ADMINISTER = "celeborn"
   private val celebornConf = new CelebornConf()
+    .set(CelebornConf.MASTER_HTTP_AUTH_SUPPORTED_SCHEMES, Seq("BASIC"))
+    .set(
+      CelebornConf.MASTER_HTTP_AUTH_BASIC_PROVIDER,
+      classOf[UserDefinePasswordAuthenticationProviderImpl].getName)
+    .set(CelebornConf.WORKER_HTTP_AUTH_SUPPORTED_SCHEMES, Seq("BASIC"))
+    .set(
+      CelebornConf.WORKER_HTTP_AUTH_BASIC_PROVIDER,
+      classOf[UserDefinePasswordAuthenticationProviderImpl].getName)
+    .set(CelebornConf.MASTER_HTTP_AUTH_ADMINISTERS, Seq(CELEBORN_ADMINISTER))
+    .set(CelebornConf.WORKER_HTTP_AUTH_ADMINISTERS, Seq(CELEBORN_ADMINISTER))
+
+  private val BASIC_AUTH_HEADER = HttpAuthSchemes.BASIC + " " + new String(
+    Base64.getEncoder.encode(
+      
s"$CELEBORN_ADMINISTER:${UserDefinePasswordAuthenticationProviderImpl.VALID_PASSWORD}".getBytes()),
+    StandardCharsets.UTF_8)
+
   protected var master: Master = _
   protected var worker: Worker = _
 
@@ -272,14 +293,18 @@ class TestCelebornCliCommands extends CelebornFunSuite 
with MiniClusterFeature {
     Array(
       "master",
       "--cluster",
-      "unit-test")
+      "unit-test",
+      "--auth-header",
+      BASIC_AUTH_HEADER)
   }
 
   private def prepareWorkerArgs(): Array[String] = {
     Array(
       "worker",
       "--hostport",
-      worker.connectionUrl)
+      worker.connectionUrl,
+      "--auth-header",
+      BASIC_AUTH_HEADER)
   }
 
   private def captureOutputAndValidateResponse(
diff --git a/docs/celeborn_cli.md b/docs/celeborn_cli.md
index 83e5e430e..91c4c3077 100644
--- a/docs/celeborn_cli.md
+++ b/docs/celeborn_cli.md
@@ -103,6 +103,11 @@ Usage: celeborn-cli master [-hV] [--apps=appId] 
[--cluster=cluster_alias]
       --add-cluster-alias=alias
                              Add alias to use in the cli for the given set of
                                masters
+      --auth-header=authHeader
+                             The http `Authorization` header for
+                               authentication. It should be in the format of
+                               `Bearer <token>` or `Basic
+                               <base64-encoded-credentials>`.
       --apps=appId           The application Id list seperated by comma.
       --cluster=cluster_alias
                              The alias of the cluster to use to query masters
@@ -179,6 +184,11 @@ Usage: celeborn-cli worker [-hV] [--apps=appId] 
[--cluster=cluster_alias]
                            --show-container-info | --show-dynamic-conf |
                            --show-thread-dump)
       --apps=appId           The application Id list seperated by comma.
+      --auth-header=authHeader
+                             The http `Authorization` header for
+                               authentication. It should be in the format of
+                               `Bearer <token>` or `Basic
+                               <base64-encoded-credentials>`.
       --cluster=cluster_alias
                              The alias of the cluster to use to query masters
       --config-level=level   The config level of the dynamic configs

Reply via email to