This is an automated email from the ASF dual-hosted git repository.
feiwang pushed a commit to branch branch-0.6
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/branch-0.6 by this push:
new a209d9eea [CELEBORN-2020] Support http authentication for Celeborn CLI
a209d9eea is described below
commit a209d9eea9d464d1648623666f11b9549a383cc7
Author: Wang, Fei <[email protected]>
AuthorDate: Fri May 30 00:28:41 2025 -0700
[CELEBORN-2020] Support http authentication for Celeborn CLI
### What changes were proposed in this pull request?
Support HTTP authentication for the Celeborn CLI.
### Why are the changes needed?
Currently, the CLI does not work if authentication is enabled for the master or
worker.
### Does this PR introduce _any_ user-facing change?
Yes, a new CLI option, `--auth-header`, which supplies the HTTP `Authorization` header sent with each request.
### How was this patch tested?
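Unit tests: `TestCelebornCliCommands` now runs against a mini cluster with BASIC HTTP authentication enabled and passes `--auth-header` to the master and worker commands.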
Closes #3300 from turboFei/cli_auth.
Authored-by: Wang, Fei <[email protected]>
Signed-off-by: Wang, Fei <[email protected]>
(cherry picked from commit 5f58fb1e3e0fecc721db4e38292b3f6d2624a1af)
Signed-off-by: Wang, Fei <[email protected]>
---
cli/pom.xml | 7 ++++
.../apache/celeborn/cli/common/CommonOptions.scala | 17 +++++++++
.../celeborn/cli/master/MasterSubcommandImpl.scala | 41 +++++++++++++---------
.../celeborn/cli/worker/WorkerSubcommandImpl.scala | 26 ++++++++------
.../celeborn/cli/TestCelebornCliCommands.scala | 29 +++++++++++++--
docs/celeborn_cli.md | 10 ++++++
6 files changed, 102 insertions(+), 28 deletions(-)
diff --git a/cli/pom.xml b/cli/pom.xml
index 6c3af7029..b2bebee05 100644
--- a/cli/pom.xml
+++ b/cli/pom.xml
@@ -64,6 +64,13 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.celeborn</groupId>
+ <artifactId>celeborn-service_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.apache.celeborn</groupId>
<artifactId>celeborn-master_${scala.binary.version}</artifactId>
diff --git
a/cli/src/main/scala/org/apache/celeborn/cli/common/CommonOptions.scala
b/cli/src/main/scala/org/apache/celeborn/cli/common/CommonOptions.scala
index 69c243f94..4cd0b2db1 100644
--- a/cli/src/main/scala/org/apache/celeborn/cli/common/CommonOptions.scala
+++ b/cli/src/main/scala/org/apache/celeborn/cli/common/CommonOptions.scala
@@ -17,6 +17,9 @@
package org.apache.celeborn.cli.common
+import java.util.{Collections, Map => JMap}
+
+import org.apache.commons.lang3.StringUtils
import picocli.CommandLine.{Command, Option, Spec}
import picocli.CommandLine.Model.CommandSpec
@@ -78,4 +81,18 @@ class CommonOptions {
description = Array("The application Id list seperated by comma."))
private[cli] var apps: String = _
+ @Option(
+ names = Array("--auth-header"),
+ paramLabel = "authHeader",
+ description = Array("The http `Authorization` header for authentication. "
+
+ "It should be in the format of `Bearer <token>` or `Basic
<base64-encoded-credentials>`."))
+ private[cli] var authHeader: String = _
+
+ private[cli] def getAuthHeader: JMap[String, String] = {
+ if (StringUtils.isNotBlank(authHeader)) {
+ Collections.singletonMap("Authorization", authHeader)
+ } else {
+ Collections.emptyMap()
+ }
+ }
}
diff --git
a/cli/src/main/scala/org/apache/celeborn/cli/master/MasterSubcommandImpl.scala
b/cli/src/main/scala/org/apache/celeborn/cli/master/MasterSubcommandImpl.scala
index d0bf7c574..5f0e288b2 100644
---
a/cli/src/main/scala/org/apache/celeborn/cli/master/MasterSubcommandImpl.scala
+++
b/cli/src/main/scala/org/apache/celeborn/cli/master/MasterSubcommandImpl.scala
@@ -60,18 +60,20 @@ class MasterSubcommandImpl extends Runnable with
MasterSubcommand {
runRemoveClusterAlias
}
- private[master] def runShowMastersInfo: MasterInfoResponse =
masterApi.getMasterGroupInfo
+ private[master] def runShowMastersInfo: MasterInfoResponse =
+ masterApi.getMasterGroupInfo(commonOptions.getAuthHeader)
private[master] def runShowClusterApps: ApplicationsHeartbeatResponse =
- applicationApi.getApplications
+ applicationApi.getApplications(commonOptions.getAuthHeader)
- private[master] def runShowClusterShuffles: ShufflesResponse =
shuffleApi.getShuffles
+ private[master] def runShowClusterShuffles: ShufflesResponse =
+ shuffleApi.getShuffles(commonOptions.getAuthHeader)
private[master] def runExcludeWorkers: HandleResponse = {
val workerIds = getWorkerIds
val excludeWorkerRequest = new ExcludeWorkerRequest().add(workerIds)
logInfo(s"Sending exclude worker requests to master for the following
workers: $workerIds")
- workerApi.excludeWorker(excludeWorkerRequest)
+ workerApi.excludeWorker(excludeWorkerRequest, commonOptions.getAuthHeader)
}
private[master] def runRemoveExcludedWorkers: HandleResponse = {
@@ -79,7 +81,7 @@ class MasterSubcommandImpl extends Runnable with
MasterSubcommand {
val removeExcludeWorkerRequest = new
ExcludeWorkerRequest().remove(workerIds)
logInfo(
s"Sending remove exclude worker requests to master for the following
workers: $workerIds")
- workerApi.excludeWorker(removeExcludeWorkerRequest)
+ workerApi.excludeWorker(removeExcludeWorkerRequest,
commonOptions.getAuthHeader)
}
private[master] def runRemoveWorkersUnavailableInfo: HandleResponse = {
@@ -88,7 +90,9 @@ class MasterSubcommandImpl extends Runnable with
MasterSubcommand {
new RemoveWorkersUnavailableInfoRequest().workers(workerIds)
logInfo(
s"Sending remove workers unavailable info requests to master for the
following workers: $workerIds")
- workerApi.removeWorkersUnavailableInfo(removeWorkersUnavailableInfoRequest)
+ workerApi.removeWorkersUnavailableInfo(
+ removeWorkersUnavailableInfoRequest,
+ commonOptions.getAuthHeader)
}
private[master] def runSendWorkerEvent: HandleResponse = {
@@ -106,7 +110,7 @@ class MasterSubcommandImpl extends Runnable with
MasterSubcommand {
val sendWorkerEventRequest =
new SendWorkerEventRequest().workers(workerIds).eventType(eventType)
logInfo(s"Sending workerEvent $eventType to workers: $workerIds")
- workerApi.sendWorkerEvent(sendWorkerEventRequest)
+ workerApi.sendWorkerEvent(sendWorkerEventRequest,
commonOptions.getAuthHeader)
}
private[master] def runShowWorkerEventInfo: WorkerEventsResponse =
workerApi.getWorkerEvents
@@ -162,11 +166,13 @@ class MasterSubcommandImpl extends Runnable with
MasterSubcommand {
}
private[master] def runShowLifecycleManagers: HostnamesResponse =
- applicationApi.getApplicationHostNames
+ applicationApi.getApplicationHostNames(commonOptions.getAuthHeader)
- private[master] def runShowWorkers: WorkersResponse = workerApi.getWorkers
+ private[master] def runShowWorkers: WorkersResponse =
+ workerApi.getWorkers(commonOptions.getAuthHeader)
- private[master] def runShowWorkersTopology: TopologyResponse =
workerApi.getWorkersTopology()
+ private[master] def runShowWorkersTopology: TopologyResponse =
+ workerApi.getWorkersTopology(commonOptions.getAuthHeader)
private[master] def getWorkerIds: util.List[WorkerId] = {
val workerIds = commonOptions.workerIds
@@ -192,15 +198,17 @@ class MasterSubcommandImpl extends Runnable with
MasterSubcommand {
.asJava
}
- private[master] def runShowConf: ConfResponse = confApi.getConf
+ private[master] def runShowConf: ConfResponse =
confApi.getConf(commonOptions.getAuthHeader)
private[master] def runShowDynamicConf: DynamicConfigResponse =
confApi.getDynamicConf(
commonOptions.configLevel,
commonOptions.configTenant,
- commonOptions.configName)
+ commonOptions.configName,
+ commonOptions.getAuthHeader)
- private[master] def runShowThreadDump: ThreadStackResponse =
defaultApi.getThreadDump
+ private[master] def runShowThreadDump: ThreadStackResponse =
+ defaultApi.getThreadDump(commonOptions.getAuthHeader)
private[master] def runAddClusterAlias: Unit = {
val aliasToAdd = masterOptions.addClusterAlias
@@ -221,7 +229,8 @@ class MasterSubcommandImpl extends Runnable with
MasterSubcommand {
logInfo(s"Cluster alias $aliasToRemove removed.")
}
- private[master] def runShowContainerInfo: ContainerInfo =
defaultApi.getContainerInfo
+ private[master] def runShowContainerInfo: ContainerInfo =
+ defaultApi.getContainerInfo(commonOptions.getAuthHeader)
override private[master] def reviseLostShuffles: HandleResponse = {
if (StringUtils.isAnyBlank(commonOptions.apps,
reviseLostShuffleOptions.shuffleIds)) {
@@ -241,7 +250,7 @@ class MasterSubcommandImpl extends Runnable with
MasterSubcommand {
reviseLostShuffleOptions.shuffleIds.split(",").map(Integer.valueOf): _*)
val request =
new ReviseLostShufflesRequest().appId(app).shuffleIds(shuffleIds)
- applicationApi.reviseLostShuffles(request)
+ applicationApi.reviseLostShuffles(request, commonOptions.getAuthHeader)
}
override private[master] def deleteApps: HandleResponse = {
@@ -252,6 +261,6 @@ class MasterSubcommandImpl extends Runnable with
MasterSubcommand {
}
val appIds = util.Arrays.asList[String](commonOptions.apps.split(","): _*)
val request = new DeleteAppsRequest().apps(appIds)
- applicationApi.deleteApps(request)
+ applicationApi.deleteApps(request, commonOptions.getAuthHeader)
}
}
diff --git
a/cli/src/main/scala/org/apache/celeborn/cli/worker/WorkerSubcommandImpl.scala
b/cli/src/main/scala/org/apache/celeborn/cli/worker/WorkerSubcommandImpl.scala
index dd542bdee..c459e5b4e 100644
---
a/cli/src/main/scala/org/apache/celeborn/cli/worker/WorkerSubcommandImpl.scala
+++
b/cli/src/main/scala/org/apache/celeborn/cli/worker/WorkerSubcommandImpl.scala
@@ -41,17 +41,20 @@ class WorkerSubcommandImpl extends Runnable with
WorkerSubcommand {
if (workerOptions.showThreadDump) log(runShowThreadDump)
}
- private[worker] def runShowWorkerInfo: WorkerInfoResponse =
workerApi.getWorkerInfo
+ private[worker] def runShowWorkerInfo: WorkerInfoResponse =
+ workerApi.getWorkerInfo(commonOptions.getAuthHeader)
- private[worker] def runShowAppsOnWorker: ApplicationsResponse =
applicationApi.getApplicationList
+ private[worker] def runShowAppsOnWorker: ApplicationsResponse =
+ applicationApi.getApplicationList(commonOptions.getAuthHeader)
- private[worker] def runShowShufflesOnWorker: ShufflesResponse =
shuffleApi.getShuffles
+ private[worker] def runShowShufflesOnWorker: ShufflesResponse =
+ shuffleApi.getShuffles(commonOptions.getAuthHeader)
private[worker] def runShowPartitionLocationInfo: ShufflePartitionsResponse =
- shuffleApi.getShufflePartitions
+ shuffleApi.getShufflePartitions(commonOptions.getAuthHeader)
private[worker] def runShowUnavailablePeers: UnAvailablePeersResponse =
- workerApi.unavailablePeers()
+ workerApi.unavailablePeers(commonOptions.getAuthHeader)
private[worker] def runIsShutdown: Boolean = runShowWorkerInfo.getIsShutdown
@@ -63,18 +66,21 @@ class WorkerSubcommandImpl extends Runnable with
WorkerSubcommand {
val workerExitType: TypeEnum = TypeEnum.valueOf(workerOptions.exitType)
val workerExitRequest: WorkerExitRequest = new
WorkerExitRequest().`type`(workerExitType)
logInfo(s"Sending worker exit type: ${workerExitType.getValue}")
- workerApi.workerExit(workerExitRequest)
+ workerApi.workerExit(workerExitRequest, commonOptions.getAuthHeader)
}
- private[worker] def runShowConf: ConfResponse = confApi.getConf
+ private[worker] def runShowConf: ConfResponse =
confApi.getConf(commonOptions.getAuthHeader)
private[worker] def runShowDynamicConf: DynamicConfigResponse =
confApi.getDynamicConf(
commonOptions.configLevel,
commonOptions.configTenant,
- commonOptions.configName)
+ commonOptions.configName,
+ commonOptions.getAuthHeader)
- private[worker] def runShowThreadDump: ThreadStackResponse =
defaultApi.getThreadDump
+ private[worker] def runShowThreadDump: ThreadStackResponse =
+ defaultApi.getThreadDump(commonOptions.getAuthHeader)
- private[worker] def runShowContainerInfo: ContainerInfo =
defaultApi.getContainerInfo
+ private[worker] def runShowContainerInfo: ContainerInfo =
+ defaultApi.getContainerInfo(commonOptions.getAuthHeader)
}
diff --git
a/cli/src/test/scala/org/apache/celeborn/cli/TestCelebornCliCommands.scala
b/cli/src/test/scala/org/apache/celeborn/cli/TestCelebornCliCommands.scala
index b65667355..eaee31beb 100644
--- a/cli/src/test/scala/org/apache/celeborn/cli/TestCelebornCliCommands.scala
+++ b/cli/src/test/scala/org/apache/celeborn/cli/TestCelebornCliCommands.scala
@@ -18,18 +18,39 @@
package org.apache.celeborn.cli
import java.io.{ByteArrayOutputStream, File, PrintStream}
+import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}
+import java.util.Base64
import org.apache.celeborn.CelebornFunSuite
import org.apache.celeborn.cli.config.CliConfigManager
import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.authentication.HttpAuthSchemes
+import
org.apache.celeborn.server.common.http.authentication.{UserDefinePasswordAuthenticationProviderImpl,
UserDefineTokenAuthenticationProviderImpl}
import org.apache.celeborn.service.deploy.MiniClusterFeature
import org.apache.celeborn.service.deploy.master.Master
import org.apache.celeborn.service.deploy.worker.Worker
class TestCelebornCliCommands extends CelebornFunSuite with MiniClusterFeature
{
+ private val CELEBORN_ADMINISTER = "celeborn"
private val celebornConf = new CelebornConf()
+ .set(CelebornConf.MASTER_HTTP_AUTH_SUPPORTED_SCHEMES, Seq("BASIC"))
+ .set(
+ CelebornConf.MASTER_HTTP_AUTH_BASIC_PROVIDER,
+ classOf[UserDefinePasswordAuthenticationProviderImpl].getName)
+ .set(CelebornConf.WORKER_HTTP_AUTH_SUPPORTED_SCHEMES, Seq("BASIC"))
+ .set(
+ CelebornConf.WORKER_HTTP_AUTH_BASIC_PROVIDER,
+ classOf[UserDefinePasswordAuthenticationProviderImpl].getName)
+ .set(CelebornConf.MASTER_HTTP_AUTH_ADMINISTERS, Seq(CELEBORN_ADMINISTER))
+ .set(CelebornConf.WORKER_HTTP_AUTH_ADMINISTERS, Seq(CELEBORN_ADMINISTER))
+
+ private val BASIC_AUTH_HEADER = HttpAuthSchemes.BASIC + " " + new String(
+ Base64.getEncoder.encode(
+
s"$CELEBORN_ADMINISTER:${UserDefinePasswordAuthenticationProviderImpl.VALID_PASSWORD}".getBytes()),
+ StandardCharsets.UTF_8)
+
protected var master: Master = _
protected var worker: Worker = _
@@ -272,14 +293,18 @@ class TestCelebornCliCommands extends CelebornFunSuite
with MiniClusterFeature {
Array(
"master",
"--cluster",
- "unit-test")
+ "unit-test",
+ "--auth-header",
+ BASIC_AUTH_HEADER)
}
private def prepareWorkerArgs(): Array[String] = {
Array(
"worker",
"--hostport",
- worker.connectionUrl)
+ worker.connectionUrl,
+ "--auth-header",
+ BASIC_AUTH_HEADER)
}
private def captureOutputAndValidateResponse(
diff --git a/docs/celeborn_cli.md b/docs/celeborn_cli.md
index 83e5e430e..91c4c3077 100644
--- a/docs/celeborn_cli.md
+++ b/docs/celeborn_cli.md
@@ -103,6 +103,11 @@ Usage: celeborn-cli master [-hV] [--apps=appId]
[--cluster=cluster_alias]
--add-cluster-alias=alias
Add alias to use in the cli for the given set of
masters
+ --auth-header=authHeader
+ The http `Authorization` header for
+ authentication. It should be in the format of
+ `Bearer <token>` or `Basic
+ <base64-encoded-credentials>`.
--apps=appId The application Id list seperated by comma.
--cluster=cluster_alias
The alias of the cluster to use to query masters
@@ -179,6 +184,11 @@ Usage: celeborn-cli worker [-hV] [--apps=appId]
[--cluster=cluster_alias]
--show-container-info | --show-dynamic-conf |
--show-thread-dump)
--apps=appId The application Id list seperated by comma.
+ --auth-header=authHeader
+ The http `Authorization` header for
+ authentication. It should be in the format of
+ `Bearer <token>` or `Basic
+ <base64-encoded-credentials>`.
--cluster=cluster_alias
The alias of the cluster to use to query masters
--config-level=level The config level of the dynamic configs