This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 5cea9cc7f [CELEBORN-1318] Support celeborn http authentication
5cea9cc7f is described below
commit 5cea9cc7f21577a9b660ec84761efdd82e0ef7e9
Author: Fei Wang <[email protected]>
AuthorDate: Thu Jun 20 10:35:12 2024 +0800
[CELEBORN-1318] Support celeborn http authentication
### What changes were proposed in this pull request?
Support celeborn master/worker http authentication.
### Why are the changes needed?
Authentication is needed for celeborn admin APIs.
### Does this PR introduce _any_ user-facing change?
Yes, introduce authentication related config items, but does not break the
current behavior.
### How was this patch tested?
Added UT for BASIC and Bearer authentication.
Closes #2440 from turboFei/http_auth.
Authored-by: Fei Wang <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
---
.../org/apache/celeborn/common/CelebornConf.scala | 132 ++++++++++++-
.../AnonymousAuthenticationProviderImpl.scala | 33 ++++
.../common/authentication/BasicPrincipal.scala | 40 ++++
.../common/authentication/HttpAuthSchemes.scala | 23 +++
.../PasswdAuthenticationProvider.scala | 37 ++++
.../TokenAuthenticationProvider.scala | 37 ++++
docs/configuration/master.md | 6 +
docs/configuration/worker.md | 6 +
.../ApiMasterResourceAuthenticationSuite.scala | 64 +++++++
.../celeborn/server/common/HttpService.scala | 11 +-
.../server/common/http/HttpAuthUtils.scala | 25 +++
.../authentication/AuthenticationAuditLogger.scala | 43 +++++
.../http/authentication/AuthenticationFilter.scala | 205 +++++++++++++++++++++
.../authentication/AuthenticationHandler.scala | 110 +++++++++++
.../BasicAuthenticationHandler.scala | 97 ++++++++++
.../BearerAuthenticationHandler.scala | 89 +++++++++
.../authentication/HttpAuthenticationFactory.scala | 92 +++++++++
.../SpnegoAuthenticationHandler.scala | 165 +++++++++++++++++
.../http/ApiBaseResourceAuthenticationSuite.scala | 119 ++++++++++++
.../server/common/http/ApiBaseResourceSuite.scala | 44 ++---
.../server/common/http/HttpTestHelper.scala | 1 +
...rDefinePasswordAuthenticationProviderImpl.scala | 41 +++++
...UserDefineTokenAuthenticationProviderImpl.scala | 40 ++++
.../worker/http/api/ApiWorkerResourceSuite.scala | 9 +-
.../ApiWorkerResourceAuthenticationSuite.scala | 43 +++++
25 files changed, 1484 insertions(+), 28 deletions(-)
diff --git
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 690758b77..742c16444 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -26,7 +26,7 @@ import scala.collection.mutable
import scala.concurrent.duration._
import scala.util.Try
-import org.apache.celeborn.common.CelebornConf.MASTER_INTERNAL_ENDPOINTS
+import
org.apache.celeborn.common.authentication.AnonymousAuthenticationProviderImpl
import org.apache.celeborn.common.identity.{DefaultIdentityProvider,
IdentityProvider}
import org.apache.celeborn.common.internal.Logging
import org.apache.celeborn.common.internal.config._
@@ -2193,6 +2193,71 @@ object CelebornConf extends Logging {
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("30s")
+ val MASTER_HTTP_AUTH_SUPPORTED_SCHEMES: ConfigEntry[Seq[String]] =
+ buildConf("celeborn.master.http.auth.supportedSchemes")
+ .categories("master")
+ .version("0.6.0")
+ .doc("A comma-separated list of master http auth supported schemes." +
+ "<ul>" +
+ " <li>SPNEGO: Kerberos/GSSAPI authentication.</li>" +
+ " <li>BASIC: User-defined password authentication, the concreted
implementation is" +
+ " configurable via `celeborn.master.http.auth.basic.provider`.</li>" +
+ " <li>BEARER: User-defined bearer token authentication, the concreted
implementation is" +
+ " configurable via `celeborn.master.http.auth.bearer.provider`.</li>" +
+ "</ul>")
+ .stringConf
+ .toSequence
+ .createWithDefault(Nil)
+
+ val MASTER_HTTP_SPNEGO_KEYTAB: OptionalConfigEntry[String] =
+ buildConf("celeborn.master.http.spnego.keytab")
+ .categories("master")
+ .version("0.6.0")
+ .doc("The keytab file for SPNego authentication.")
+ .stringConf
+ .createOptional
+
+ val MASTER_HTTP_SPNEGO_PRINCIPAL: OptionalConfigEntry[String] =
+ buildConf("celeborn.master.http.spnego.principal")
+ .categories("master")
+ .version("0.6.0")
+ .doc("SPNego service principal, typical value would look like
HTTP/[email protected]." +
+ " SPNego service principal would be used when celeborn http
authentication is enabled." +
+ " This needs to be set only if SPNEGO is to be used in
authentication.")
+ .stringConf
+ .createOptional
+
+ val MASTER_HTTP_PROXY_CLIENT_IP_HEADER: ConfigEntry[String] =
+ buildConf("celeborn.master.http.proxy.client.ip.header")
+ .categories("master")
+ .doc("The HTTP header to record the real client IP address. If your
server is behind a load" +
+ " balancer or other proxy, the server will see this load balancer or
proxy IP address as" +
+ " the client IP address, to get around this common issue, most load
balancers or proxies" +
+ " offer the ability to record the real remote IP address in an HTTP
header that will be" +
+ " added to the request for other devices to use. Note that, because
the header value can" +
+ " be specified to any IP address, so it will not be used for
authentication.")
+ .version("0.6.0")
+ .stringConf
+ .createWithDefault("X-Real-IP")
+
+ val MASTER_HTTP_AUTH_BASIC_PROVIDER: ConfigEntry[String] =
+ buildConf("celeborn.master.http.auth.basic.provider")
+ .categories("master")
+ .version("0.6.0")
+ .doc("User-defined password authentication implementation of " +
+
"org.apache.celeborn.common.authentication.PasswdAuthenticationProvider")
+ .stringConf
+ .createWithDefault(classOf[AnonymousAuthenticationProviderImpl].getName)
+
+ val MASTER_HTTP_AUTH_BEARER_PROVIDER: ConfigEntry[String] =
+ buildConf("celeborn.master.http.auth.bearer.provider")
+ .categories("master")
+ .version("0.6.0")
+ .doc("User-defined token authentication implementation of " +
+
"org.apache.celeborn.common.authentication.TokenAuthenticationProvider")
+ .stringConf
+ .createWithDefault(classOf[AnonymousAuthenticationProviderImpl].getName)
+
val HA_ENABLED: ConfigEntry[Boolean] =
buildConf("celeborn.master.ha.enabled")
.withAlternative("celeborn.ha.enabled")
@@ -2795,6 +2860,71 @@ object CelebornConf extends Logging {
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("30s")
+ val WORKER_HTTP_AUTH_SUPPORTED_SCHEMES: ConfigEntry[Seq[String]] =
+ buildConf("celeborn.worker.http.auth.supportedSchemes")
+ .categories("worker")
+ .version("0.6.0")
+ .doc("A comma-separated list of worker http auth supported schemes." +
+ "<ul>" +
+ " <li>SPNEGO: Kerberos/GSSAPI authentication.</li>" +
+ " <li>BASIC: User-defined password authentication, the concreted
implementation is" +
+ " configurable via `celeborn.worker.http.auth.basic.provider`.</li>" +
+ " <li>BEARER: User-defined bearer token authentication, the concreted
implementation is" +
+ " configurable via `celeborn.worker.http.auth.bearer.provider`.</li>" +
+ "</ul>")
+ .stringConf
+ .toSequence
+ .createWithDefault(Nil)
+
+ val WORKER_HTTP_SPNEGO_KEYTAB: OptionalConfigEntry[String] =
+ buildConf("celeborn.worker.http.spnego.keytab")
+ .categories("worker")
+ .version("0.6.0")
+ .doc("The keytab file for SPNego authentication.")
+ .stringConf
+ .createOptional
+
+ val WORKER_HTTP_SPNEGO_PRINCIPAL: OptionalConfigEntry[String] =
+ buildConf("celeborn.worker.http.spnego.principal")
+ .categories("worker")
+ .version("0.6.0")
+ .doc("SPNego service principal, typical value would look like
HTTP/[email protected]." +
+ " SPNego service principal would be used when celeborn http
authentication is enabled." +
+ " This needs to be set only if SPNEGO is to be used in
authentication.")
+ .stringConf
+ .createOptional
+
+ val WORKER_HTTP_PROXY_CLIENT_IP_HEADER: ConfigEntry[String] =
+ buildConf("celeborn.worker.http.proxy.client.ip.header")
+ .categories("worker")
+ .doc("The HTTP header to record the real client IP address. If your
server is behind a load" +
+ " balancer or other proxy, the server will see this load balancer or
proxy IP address as" +
+ " the client IP address, to get around this common issue, most load
balancers or proxies" +
+ " offer the ability to record the real remote IP address in an HTTP
header that will be" +
+ " added to the request for other devices to use. Note that, because
the header value can" +
+ " be specified to any IP address, so it will not be used for
authentication.")
+ .version("0.6.0")
+ .stringConf
+ .createWithDefault("X-Real-IP")
+
+ val WORKER_HTTP_AUTH_BASIC_PROVIDER: ConfigEntry[String] =
+ buildConf("celeborn.worker.http.auth.basic.provider")
+ .categories("worker")
+ .version("0.6.0")
+ .doc("User-defined password authentication implementation of " +
+
"org.apache.celeborn.common.authentication.PasswdAuthenticationProvider")
+ .stringConf
+ .createWithDefault(classOf[AnonymousAuthenticationProviderImpl].getName)
+
+ val WORKER_HTTP_AUTH_BEARER_PROVIDER: ConfigEntry[String] =
+ buildConf("celeborn.worker.http.auth.bearer.provider")
+ .categories("worker")
+ .version("0.6.0")
+ .doc("User-defined token authentication implementation of " +
+
"org.apache.celeborn.common.authentication.TokenAuthenticationProvider")
+ .stringConf
+ .createWithDefault(classOf[AnonymousAuthenticationProviderImpl].getName)
+
val WORKER_RPC_PORT: ConfigEntry[Int] =
buildConf("celeborn.worker.rpc.port")
.categories("worker")
diff --git
a/common/src/main/scala/org/apache/celeborn/common/authentication/AnonymousAuthenticationProviderImpl.scala
b/common/src/main/scala/org/apache/celeborn/common/authentication/AnonymousAuthenticationProviderImpl.scala
new file mode 100644
index 000000000..de38c1b30
--- /dev/null
+++
b/common/src/main/scala/org/apache/celeborn/common/authentication/AnonymousAuthenticationProviderImpl.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.authentication
+
+import java.security.Principal
+
+class AnonymousAuthenticationProviderImpl extends PasswdAuthenticationProvider
+ with TokenAuthenticationProvider {
+ override def authenticate(user: String, password: String): Principal = {
+ // no-op authentication
+ new BasicPrincipal(user)
+ }
+
+ override def authenticate(token: String): Principal = {
+ // no-op authentication
+ new BasicPrincipal("anonymous")
+ }
+}
diff --git
a/common/src/main/scala/org/apache/celeborn/common/authentication/BasicPrincipal.scala
b/common/src/main/scala/org/apache/celeborn/common/authentication/BasicPrincipal.scala
new file mode 100644
index 000000000..750677cf2
--- /dev/null
+++
b/common/src/main/scala/org/apache/celeborn/common/authentication/BasicPrincipal.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.authentication
+
+import java.security.Principal
+import java.util.Objects
+
+class BasicPrincipal(val name: String) extends Principal {
+ Objects.requireNonNull(name, "Principal name cannot be null")
+ override def getName: String = name
+
+ override def toString: String = name
+
+ override def hashCode(): Int = Objects.hash(name)
+
+ override def equals(o: Any): Boolean = {
+ if (this == o) {
+ true
+ } else if (o == null || getClass != o.getClass) {
+ false
+ } else {
+ Objects.equals(name, o.asInstanceOf[BasicPrincipal].name)
+ }
+ }
+}
diff --git
a/common/src/main/scala/org/apache/celeborn/common/authentication/HttpAuthSchemes.scala
b/common/src/main/scala/org/apache/celeborn/common/authentication/HttpAuthSchemes.scala
new file mode 100644
index 000000000..0a4bd479a
--- /dev/null
+++
b/common/src/main/scala/org/apache/celeborn/common/authentication/HttpAuthSchemes.scala
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.authentication
+
+object HttpAuthSchemes extends Enumeration {
+ type HttpAuthScheme = Value
+ val NEGOTIATE, BASIC, BEARER = Value
+}
diff --git
a/common/src/main/scala/org/apache/celeborn/common/authentication/PasswdAuthenticationProvider.scala
b/common/src/main/scala/org/apache/celeborn/common/authentication/PasswdAuthenticationProvider.scala
new file mode 100644
index 000000000..71aaad40c
--- /dev/null
+++
b/common/src/main/scala/org/apache/celeborn/common/authentication/PasswdAuthenticationProvider.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.authentication
+
+import java.security.Principal
+
+trait PasswdAuthenticationProvider {
+
+ /**
+ * The authenticate method is called by the celeborn authentication layer
+ * to authenticate user & password for their requests.
+ * If a user is to be granted, return nothing/throw nothing.
+ * When a user is to be disallowed, throw an appropriate
[[SecurityException]].
+ *
+ * @param user The username received over the connection request
+ * @param password The password received over the connection request
+ *
+ * @throws SecurityException When a user is found to be invalid by the
implementation
+ */
+ @throws[SecurityException]
+ def authenticate(user: String, password: String): Principal
+}
diff --git
a/common/src/main/scala/org/apache/celeborn/common/authentication/TokenAuthenticationProvider.scala
b/common/src/main/scala/org/apache/celeborn/common/authentication/TokenAuthenticationProvider.scala
new file mode 100644
index 000000000..81729f26c
--- /dev/null
+++
b/common/src/main/scala/org/apache/celeborn/common/authentication/TokenAuthenticationProvider.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.authentication
+
+import java.security.Principal
+
+trait TokenAuthenticationProvider {
+
+ /**
+ * The authenticate method is called by the celeborn authentication layer
+ * to authenticate token for their requests.
+ * If the token is to be granted, return nothing/throw nothing.
+ * When the token is to be disallowed, throw an appropriate
[[SecurityException]].
+ *
+ * @param token The token received over the connection request.
+ * @return The identifier associated with the token
+ *
+ * @throws SecurityException When the token is found to be invalid by the
implementation
+ */
+ @throws[SecurityException]
+ def authenticate(token: String): Principal
+}
diff --git a/docs/configuration/master.md b/docs/configuration/master.md
index 97aae4333..efa455451 100644
--- a/docs/configuration/master.md
+++ b/docs/configuration/master.md
@@ -43,10 +43,16 @@ license: |
| celeborn.master.heartbeat.application.timeout | 300s | false | Application
heartbeat timeout. | 0.3.0 | celeborn.application.heartbeat.timeout |
| celeborn.master.heartbeat.worker.timeout | 120s | false | Worker heartbeat
timeout. | 0.3.0 | celeborn.worker.heartbeat.timeout |
| celeborn.master.host | <localhost> | false | Hostname for master to
bind. | 0.2.0 | |
+| celeborn.master.http.auth.basic.provider |
org.apache.celeborn.common.authentication.AnonymousAuthenticationProviderImpl |
false | User-defined password authentication implementation of
org.apache.celeborn.common.authentication.PasswdAuthenticationProvider | 0.6.0
| |
+| celeborn.master.http.auth.bearer.provider |
org.apache.celeborn.common.authentication.AnonymousAuthenticationProviderImpl |
false | User-defined token authentication implementation of
org.apache.celeborn.common.authentication.TokenAuthenticationProvider | 0.6.0 |
|
+| celeborn.master.http.auth.supportedSchemes | | false | A comma-separated
list of master http auth supported schemes.<ul> <li>SPNEGO: Kerberos/GSSAPI
authentication.</li> <li>BASIC: User-defined password authentication, the
concreted implementation is configurable via
`celeborn.master.http.auth.basic.provider`.</li> <li>BEARER: User-defined
bearer token authentication, the concreted implementation is configurable via
`celeborn.master.http.auth.bearer.provider`.</li></ul> | 0.6.0 | |
| celeborn.master.http.host | <localhost> | false | Master's http host.
| 0.4.0 |
celeborn.metrics.master.prometheus.host,celeborn.master.metrics.prometheus.host
|
| celeborn.master.http.idleTimeout | 30s | false | Master http server idle
timeout. | 0.5.0 | |
| celeborn.master.http.maxWorkerThreads | 200 | false | Maximum number of
threads in the master http worker thread pool. | 0.5.0 | |
| celeborn.master.http.port | 9098 | false | Master's http port. | 0.4.0 |
celeborn.metrics.master.prometheus.port,celeborn.master.metrics.prometheus.port
|
+| celeborn.master.http.proxy.client.ip.header | X-Real-IP | false | The HTTP
header to record the real client IP address. If your server is behind a load
balancer or other proxy, the server will see this load balancer or proxy IP
address as the client IP address, to get around this common issue, most load
balancers or proxies offer the ability to record the real remote IP address in
an HTTP header that will be added to the request for other devices to use. Note
that, because the header v [...]
+| celeborn.master.http.spnego.keytab | <undefined> | false | The keytab
file for SPNego authentication. | 0.6.0 | |
+| celeborn.master.http.spnego.principal | <undefined> | false | SPNego
service principal, typical value would look like HTTP/[email protected]. SPNego
service principal would be used when celeborn http authentication is enabled.
This needs to be set only if SPNEGO is to be used in authentication. | 0.6.0 |
|
| celeborn.master.http.stopTimeout | 5s | false | Master http server stop
timeout. | 0.5.0 | |
| celeborn.master.internal.port | 8097 | false | Internal port on the master
where both workers and other master nodes connect. | 0.5.0 | |
| celeborn.master.port | 9097 | false | Port for master to bind. | 0.2.0 | |
diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md
index 30d27f780..ef15cd7d0 100644
--- a/docs/configuration/worker.md
+++ b/docs/configuration/worker.md
@@ -86,10 +86,16 @@ license: |
| celeborn.worker.graceful.shutdown.saveCommittedFileInfo.interval | 5s |
false | Interval for a Celeborn worker to flush committed file infos into Level
DB. | 0.3.1 | |
| celeborn.worker.graceful.shutdown.saveCommittedFileInfo.sync | false | false
| Whether to call sync method to save committed file infos into Level DB to
handle OS crash. | 0.3.1 | |
| celeborn.worker.graceful.shutdown.timeout | 600s | false | The worker's
graceful shutdown timeout time. | 0.2.0 | |
+| celeborn.worker.http.auth.basic.provider |
org.apache.celeborn.common.authentication.AnonymousAuthenticationProviderImpl |
false | User-defined password authentication implementation of
org.apache.celeborn.common.authentication.PasswdAuthenticationProvider | 0.6.0
| |
+| celeborn.worker.http.auth.bearer.provider |
org.apache.celeborn.common.authentication.AnonymousAuthenticationProviderImpl |
false | User-defined token authentication implementation of
org.apache.celeborn.common.authentication.TokenAuthenticationProvider | 0.6.0 |
|
+| celeborn.worker.http.auth.supportedSchemes | | false | A comma-separated
list of worker http auth supported schemes.<ul> <li>SPNEGO: Kerberos/GSSAPI
authentication.</li> <li>BASIC: User-defined password authentication, the
concreted implementation is configurable via
`celeborn.worker.http.auth.basic.provider`.</li> <li>BEARER: User-defined
bearer token authentication, the concreted implementation is configurable via
`celeborn.worker.http.auth.bearer.provider`.</li></ul> | 0.6.0 | |
| celeborn.worker.http.host | <localhost> | false | Worker's http host.
| 0.4.0 |
celeborn.metrics.worker.prometheus.host,celeborn.worker.metrics.prometheus.host
|
| celeborn.worker.http.idleTimeout | 30s | false | Worker http server idle
timeout. | 0.5.0 | |
| celeborn.worker.http.maxWorkerThreads | 200 | false | Maximum number of
threads in the worker http worker thread pool. | 0.5.0 | |
| celeborn.worker.http.port | 9096 | false | Worker's http port. | 0.4.0 |
celeborn.metrics.worker.prometheus.port,celeborn.worker.metrics.prometheus.port
|
+| celeborn.worker.http.proxy.client.ip.header | X-Real-IP | false | The HTTP
header to record the real client IP address. If your server is behind a load
balancer or other proxy, the server will see this load balancer or proxy IP
address as the client IP address, to get around this common issue, most load
balancers or proxies offer the ability to record the real remote IP address in
an HTTP header that will be added to the request for other devices to use. Note
that, because the header v [...]
+| celeborn.worker.http.spnego.keytab | <undefined> | false | The keytab
file for SPNego authentication. | 0.6.0 | |
+| celeborn.worker.http.spnego.principal | <undefined> | false | SPNego
service principal, typical value would look like HTTP/[email protected]. SPNego
service principal would be used when celeborn http authentication is enabled.
This needs to be set only if SPNEGO is to be used in authentication. | 0.6.0 |
|
| celeborn.worker.http.stopTimeout | 5s | false | Worker http server stop
timeout. | 0.5.0 | |
| celeborn.worker.internal.port | 0 | false | Internal server port on the
Worker where the master nodes connect. | 0.5.0 | |
| celeborn.worker.jvmProfiler.enabled | false | false | Turn on code profiling
via async_profiler in workers. | 0.5.0 | |
diff --git
a/master/src/test/scala/org/apache/celeborn/service/deploy/master/ApiMasterResourceAuthenticationSuite.scala
b/master/src/test/scala/org/apache/celeborn/service/deploy/master/ApiMasterResourceAuthenticationSuite.scala
new file mode 100644
index 000000000..27907f5f4
--- /dev/null
+++
b/master/src/test/scala/org/apache/celeborn/service/deploy/master/ApiMasterResourceAuthenticationSuite.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.service.deploy.master
+
+import com.google.common.io.Files
+
+import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.util.{CelebornExitKind, Utils}
+import org.apache.celeborn.server.common.HttpService
+import
org.apache.celeborn.server.common.http.ApiBaseResourceAuthenticationSuite
+
+class ApiMasterResourceAuthenticationSuite extends
ApiBaseResourceAuthenticationSuite {
+ private var master: Master = _
+
+ override protected def httpService: HttpService = master
+
+ def getTmpDir(): String = {
+ val tmpDir = Files.createTempDir()
+ tmpDir.deleteOnExit()
+ tmpDir.getAbsolutePath
+ }
+
+ override def beforeAll(): Unit = {
+ val randomMasterPort = Utils.selectRandomPort(1024, 65535)
+ val randomHttpPort = randomMasterPort + 1
+ celebornConf.set(CelebornConf.HA_ENABLED.key, "false")
+ celebornConf.set(CelebornConf.HA_MASTER_RATIS_STORAGE_DIR.key, getTmpDir())
+ celebornConf.set(CelebornConf.WORKER_STORAGE_DIRS.key, getTmpDir())
+ celebornConf.set(CelebornConf.MASTER_HTTP_HOST.key, "127.0.0.1")
+ celebornConf.set(CelebornConf.MASTER_HTTP_PORT.key,
randomHttpPort.toString)
+
+ val args = Array("-h", "localhost", "-p", randomMasterPort.toString)
+
+ val masterArgs = new MasterArguments(args, celebornConf)
+ master = new Master(celebornConf, masterArgs)
+ new Thread() {
+ override def run(): Unit = {
+ master.initialize()
+ }
+ }.start()
+ super.beforeAll()
+ }
+
+ override def afterAll(): Unit = {
+ super.afterAll()
+ master.stop(CelebornExitKind.EXIT_IMMEDIATELY)
+ master.rpcEnv.shutdown()
+ }
+}
diff --git
a/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala
b/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala
index 6779d1249..87a621320 100644
--- a/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala
+++ b/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala
@@ -18,14 +18,18 @@
package org.apache.celeborn.server.common
import java.util
+import javax.servlet.DispatcherType
import scala.collection.JavaConverters._
+import org.eclipse.jetty.servlet.FilterHolder
+
import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.internal.Logging
import org.apache.celeborn.common.util.Utils
import org.apache.celeborn.server.common.http.HttpServer
import org.apache.celeborn.server.common.http.api.ApiRootResource
+import
org.apache.celeborn.server.common.http.authentication.{AuthenticationFilter,
HttpAuthenticationFactory}
import org.apache.celeborn.server.common.service.config.ConfigLevel
abstract class HttpService extends Service with Logging {
@@ -253,11 +257,16 @@ abstract class HttpService extends Service with Logging {
}
protected def startInternal(): Unit = {
- httpServer.addHandler(ApiRootResource.getServletHandler(this))
+ val contextHandler = ApiRootResource.getServletHandler(this)
+ val holder = new FilterHolder(new AuthenticationFilter(conf, serviceName))
+ contextHandler.addFilter(holder, "/*",
util.EnumSet.allOf(classOf[DispatcherType]))
+
httpServer.addHandler(HttpAuthenticationFactory.wrapHandler(contextHandler))
+
httpServer.addStaticHandler("META-INF/resources/webjars/swagger-ui/4.9.1/",
"/swagger-static/")
httpServer.addStaticHandler("org/apache/celeborn/swagger", "/swagger")
httpServer.addRedirectHandler("/help", "/swagger")
httpServer.addRedirectHandler("/docs", "/swagger")
+
if (metricsSystem.running) {
metricsSystem.getServletContextHandlers.foreach { handler =>
httpServer.addHandler(handler)
diff --git
a/service/src/main/scala/org/apache/celeborn/server/common/http/HttpAuthUtils.scala
b/service/src/main/scala/org/apache/celeborn/server/common/http/HttpAuthUtils.scala
new file mode 100644
index 000000000..c475545c7
--- /dev/null
+++
b/service/src/main/scala/org/apache/celeborn/server/common/http/HttpAuthUtils.scala
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.server.common.http
+
+object HttpAuthUtils {
+ // HTTP header used by the server endpoint during an authentication sequence.
+ val WWW_AUTHENTICATE_HEADER = "WWW-Authenticate"
+ // HTTP header used by the client endpoint during an authentication sequence.
+ val AUTHORIZATION_HEADER = "Authorization"
+}
diff --git
a/service/src/main/scala/org/apache/celeborn/server/common/http/authentication/AuthenticationAuditLogger.scala
b/service/src/main/scala/org/apache/celeborn/server/common/http/authentication/AuthenticationAuditLogger.scala
new file mode 100644
index 000000000..614719daf
--- /dev/null
+++
b/service/src/main/scala/org/apache/celeborn/server/common/http/authentication/AuthenticationAuditLogger.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.server.common.http.authentication
+
+import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
+
+import org.apache.celeborn.common.internal.Logging
+import
org.apache.celeborn.server.common.http.authentication.AuthenticationFilter._
+
+object AuthenticationAuditLogger extends Logging {
+ final private val AUDIT_BUFFER = new ThreadLocal[StringBuilder]() {
+ override protected def initialValue: StringBuilder = new StringBuilder()
+ }
+
+ def audit(request: HttpServletRequest, response: HttpServletResponse): Unit
= {
+ val sb = AUDIT_BUFFER.get()
+ sb.setLength(0)
+
sb.append(s"user=${HTTP_CLIENT_IDENTIFIER.get()}(auth:${HTTP_AUTH_TYPE.get()})").append("\t")
+ sb.append(s"ip=${HTTP_CLIENT_IP_ADDRESS.get()}").append("\t")
+
sb.append(s"proxyIp=${HTTP_PROXY_HEADER_CLIENT_IP_ADDRESS.get()}").append("\t")
+ sb.append(s"method=${request.getMethod}").append("\t")
+ sb.append(s"uri=${request.getRequestURI}").append("\t")
+ sb.append(s"params=${request.getQueryString}").append("\t")
+ sb.append(s"protocol=${request.getProtocol}").append("\t")
+ sb.append(s"status=${response.getStatus}")
+ logInfo(sb.toString())
+ }
+}
diff --git
a/service/src/main/scala/org/apache/celeborn/server/common/http/authentication/AuthenticationFilter.scala
b/service/src/main/scala/org/apache/celeborn/server/common/http/authentication/AuthenticationFilter.scala
new file mode 100644
index 000000000..650ad268e
--- /dev/null
+++
b/service/src/main/scala/org/apache/celeborn/server/common/http/authentication/AuthenticationFilter.scala
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.server.common.http.authentication
+
+import java.io.IOException
+import javax.security.sasl.AuthenticationException
+import javax.servlet.{Filter, FilterChain, FilterConfig, ServletException,
ServletRequest, ServletResponse}
+import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
+
+import scala.collection.mutable
+
+import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.authentication.HttpAuthSchemes
+import
org.apache.celeborn.common.authentication.HttpAuthSchemes.{HttpAuthScheme, _}
+import org.apache.celeborn.common.internal.Logging
+import org.apache.celeborn.server.common.Service
+import
org.apache.celeborn.server.common.http.HttpAuthUtils.AUTHORIZATION_HEADER
+
+class AuthenticationFilter(conf: CelebornConf, serviceName: String) extends
Filter with Logging {
+ import AuthenticationFilter._
+
+ private[authentication] val authSchemeHandlers =
+ new mutable.HashMap[HttpAuthScheme, AuthenticationHandler]()
+
+ private[authentication] def addAuthHandler(authHandler:
AuthenticationHandler): Unit = {
+ authHandler.init(conf)
+ if (authHandler.authenticationSupported) {
+ if (authSchemeHandlers.contains(authHandler.authScheme)) {
+ logWarning(s"Authentication handler has been defined for scheme
${authHandler.authScheme}")
+ } else {
+ logInfo(s"Add authentication handler
${authHandler.getClass.getSimpleName}" +
+ s" for scheme ${authHandler.authScheme}")
+ authSchemeHandlers.put(authHandler.authScheme, authHandler)
+ }
+ } else {
+ logWarning(s"The authentication handler
${authHandler.getClass.getSimpleName}" +
+ s" for scheme ${authHandler.authScheme} is not supported")
+ }
+ }
+
+ private val authSchemes: Seq[HttpAuthScheme] = serviceName match {
+ case Service.MASTER =>
+
conf.get(CelebornConf.MASTER_HTTP_AUTH_SUPPORTED_SCHEMES).map(HttpAuthSchemes.withName)
+ case Service.WORKER =>
+
conf.get(CelebornConf.WORKER_HTTP_AUTH_SUPPORTED_SCHEMES).map(HttpAuthSchemes.withName)
+ }
+ private val proxyClientIpHeader: String = serviceName match {
+ case Service.MASTER =>
conf.get(CelebornConf.MASTER_HTTP_PROXY_CLIENT_IP_HEADER)
+ case Service.WORKER =>
conf.get(CelebornConf.WORKER_HTTP_PROXY_CLIENT_IP_HEADER)
+ }
+
+ private def initAuthHandlers(): Unit = {
+ if (authSchemes.contains(HttpAuthSchemes.NEGOTIATE)) {
+ serviceName match {
+ case Service.MASTER =>
+ addAuthHandler(new SpnegoAuthenticationHandler(
+ conf.get(CelebornConf.MASTER_HTTP_SPNEGO_KEYTAB).getOrElse(""),
+ conf.get(CelebornConf.MASTER_HTTP_SPNEGO_PRINCIPAL).getOrElse("")))
+ case Service.WORKER =>
+ addAuthHandler(new SpnegoAuthenticationHandler(
+ conf.get(CelebornConf.WORKER_HTTP_SPNEGO_KEYTAB).getOrElse(""),
+ conf.get(CelebornConf.WORKER_HTTP_SPNEGO_PRINCIPAL).getOrElse("")))
+ }
+ }
+ if (authSchemes.contains(HttpAuthSchemes.BASIC)) {
+ serviceName match {
+ case Service.MASTER =>
+ addAuthHandler(new BasicAuthenticationHandler(
+ conf.get(CelebornConf.MASTER_HTTP_AUTH_BASIC_PROVIDER)))
+ case Service.WORKER =>
+ addAuthHandler(new BasicAuthenticationHandler(
+ conf.get(CelebornConf.WORKER_HTTP_AUTH_BASIC_PROVIDER)))
+ }
+ }
+ if (authSchemes.contains(HttpAuthSchemes.BEARER)) {
+ serviceName match {
+ case Service.MASTER =>
+ addAuthHandler(new BearerAuthenticationHandler(
+ conf.get(CelebornConf.MASTER_HTTP_AUTH_BEARER_PROVIDER)))
+ case Service.WORKER =>
+ addAuthHandler(new BearerAuthenticationHandler(
+ conf.get(CelebornConf.WORKER_HTTP_AUTH_BEARER_PROVIDER)))
+ }
+ }
+ }
+
+ override def init(filterConfig: FilterConfig): Unit = {
+ initAuthHandlers()
+ }
+
+ private[celeborn] def getMatchedHandler(authorization: String):
Option[AuthenticationHandler] = {
+ authSchemeHandlers.values.find(_.matchAuthScheme(authorization))
+ }
+
+ /**
+ * If the request has a valid authentication token it allows the request to
continue to the
+ * target resource, otherwise it triggers an authentication sequence using
the configured
+ * [[AuthenticationHandler]].
+ *
+ * @param request the request object.
+ * @param response the response object.
+ * @param filterChain the filter chain object.
+ * @throws IOException thrown if an IO error occurred.
+ * @throws ServletException thrown if a processing error occurred.
+ */
+ override def doFilter(
+ request: ServletRequest,
+ response: ServletResponse,
+ filterChain: FilterChain): Unit = {
+ val httpRequest = request.asInstanceOf[HttpServletRequest]
+ val httpResponse = response.asInstanceOf[HttpServletResponse]
+
+ if (authSchemeHandlers.isEmpty ||
BYPASS_API_PATHS.contains(httpRequest.getRequestURI)) {
+ filterChain.doFilter(request, response)
+ return
+ }
+
+ val authorization = httpRequest.getHeader(AUTHORIZATION_HEADER)
+ val matchedHandler = getMatchedHandler(authorization).orNull
+ HTTP_CLIENT_IP_ADDRESS.set(httpRequest.getRemoteAddr)
+
HTTP_PROXY_HEADER_CLIENT_IP_ADDRESS.set(httpRequest.getHeader(proxyClientIpHeader))
+
+ try {
+ if (matchedHandler == null) {
+ logDebug(s"No auth scheme matched for url:
${httpRequest.getRequestURL}")
+ httpResponse.setStatus(HttpServletResponse.SC_UNAUTHORIZED)
+ httpResponse.sendError(
+ HttpServletResponse.SC_UNAUTHORIZED,
+ s"No auth scheme matched for $authorization")
+ } else {
+ HTTP_AUTH_TYPE.set(matchedHandler.authScheme.toString)
+ HTTP_CLIENT_IDENTIFIER.set(matchedHandler.authenticate(httpRequest,
httpResponse))
+ doFilter(filterChain, httpRequest, httpResponse)
+ }
+ } catch {
+ case e: AuthenticationException =>
+ httpResponse.setStatus(HttpServletResponse.SC_FORBIDDEN)
+ HTTP_CLIENT_IDENTIFIER.remove()
+ HTTP_CLIENT_IP_ADDRESS.remove()
+ HTTP_PROXY_HEADER_CLIENT_IP_ADDRESS.remove()
+ HTTP_AUTH_TYPE.remove()
+ httpResponse.sendError(HttpServletResponse.SC_FORBIDDEN, e.getMessage)
+ } finally {
+ AuthenticationAuditLogger.audit(httpRequest, httpResponse)
+ }
+ }
+
+ /**
+ * Delegates call to the servlet filter chain. Sub-classes my override this
+ * method to perform pre and post tasks.
+ *
+ * @param filterChain the filter chain object.
+ * @param request the request object.
+ * @param response the response object.
+ * @throws IOException thrown if an IO error occurred.
+ * @throws ServletException thrown if a processing error occurred.
+ */
+ @throws[IOException]
+ @throws[ServletException]
+ protected def doFilter(
+ filterChain: FilterChain,
+ request: HttpServletRequest,
+ response: HttpServletResponse): Unit = {
+ filterChain.doFilter(request, response)
+ }
+
+ override def destroy(): Unit = {
+ if (authSchemeHandlers.nonEmpty) {
+ authSchemeHandlers.values.foreach(_.destroy())
+ authSchemeHandlers.clear()
+ }
+ }
+}
+
+object AuthenticationFilter {
+ private val BYPASS_API_PATHS = Set("/openapi.json", "/openapi.yaml")
+
+ final val HTTP_CLIENT_IP_ADDRESS = new ThreadLocal[String]() {
+ override protected def initialValue: String = null
+ }
+ final val HTTP_PROXY_HEADER_CLIENT_IP_ADDRESS = new ThreadLocal[String]() {
+ override protected def initialValue: String = null
+ }
+ final val HTTP_CLIENT_IDENTIFIER = new ThreadLocal[String]() {
+ override protected def initialValue: String = null
+ }
+ final val HTTP_AUTH_TYPE = new ThreadLocal[String]() {
+ override protected def initialValue(): String = null
+ }
+}
diff --git
a/service/src/main/scala/org/apache/celeborn/server/common/http/authentication/AuthenticationHandler.scala
b/service/src/main/scala/org/apache/celeborn/server/common/http/authentication/AuthenticationHandler.scala
new file mode 100644
index 000000000..8e4076536
--- /dev/null
+++
b/service/src/main/scala/org/apache/celeborn/server/common/http/authentication/AuthenticationHandler.scala
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.server.common.http.authentication
+
+import javax.security.sasl.AuthenticationException
+import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
+
+import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.authentication.HttpAuthSchemes.HttpAuthScheme
+import
org.apache.celeborn.server.common.http.HttpAuthUtils.AUTHORIZATION_HEADER
+
+trait AuthenticationHandler {
+
+ /**
+ * HTTP header prefix used during the authentication sequence.
+ */
+ val authScheme: HttpAuthScheme
+
+ /**
+ * Initializes the authentication handler instance.
+ */
+ def init(conf: CelebornConf): Unit
+
+ /**
+ * Whether this authentication handler is configured well and support
authentication.
+ */
+ def authenticationSupported: Boolean
+
+ /**
+ * Destroys the authentication handler instance.
+ * <p>
+ * This method is invoked by the [[AuthenticationFilter.destroy]] method.
+ */
+ def destroy(): Unit
+
+ /**
+ * Performs an authentication step for the given HTTP client request.
+ * <p>
+ * This method is invoked by the [[AuthenticationFilter]] only if the HTTP
client request is
+ * not yet authenticated.
+ * <p>
+ * Depending upon the authentication mechanism being implemented, a
particular HTTP client may
+ * end up making a sequence of invocations before authentication is
successfully established
+ * (this is the case of Kerberos SPNEGO).
+ * <p>
+ * This method must return a user name only if the the HTTP client request
has been successfully
+ * and fully authenticated.
+ * <p>
+ * If the HTTP client request has not been completely authenticated, this
method must take over
+ * the corresponding HTTP response and it must return <code>null</code>.
+ *
+ * @param request the HTTP client request.
+ * @param response the HTTP client response.
+ * @return the user name
+ * @throws AuthenticationException thrown if an Authentication error
occurred.
+ */
+ def authenticate(request: HttpServletRequest, response:
HttpServletResponse): String
+
+ /**
+ * This method checks if the specified <code>authorization</code> belongs to
the
+ * auth schema of current authentication handler.
+ * @param authorization Authentication header value which is to be compared
with the
+ * authentication scheme.
+ */
+ def matchAuthScheme(authorization: String): Boolean = {
+ if (authorization == null || authorization.isEmpty) {
+ false
+ } else {
+ authorization.trim.regionMatches(true, 0, authScheme.toString, 0,
authScheme.toString.length)
+ }
+ }
+
+ /**
+ * Get the decoded authorization value after auth scheme in Authentication
header.
+ */
+ def getAuthorization(request: HttpServletRequest): String = {
+ val authHeader = request.getHeader(AUTHORIZATION_HEADER)
+ // each http request must have an Authorization header
+ if (authHeader == null || authHeader.isEmpty) {
+ throw new AuthenticationException("Authorization header received from
the client is empty.")
+ }
+
+ var authorization = authHeader.substring(authScheme.toString.length).trim
+ // For thrift http spnego authorization, its format is 'NEGOTIATE :
$token', see HIVE-26353
+ if (authorization.startsWith(":")) {
+ authorization = authorization.stripPrefix(":").trim
+ }
+ // Authorization header must have a payload
+ if (authorization == null || authorization.isEmpty) {
+ throw new AuthenticationException(
+ "Authorization header received from the client does not contain any
data.")
+ }
+ authorization
+ }
+}
diff --git
a/service/src/main/scala/org/apache/celeborn/server/common/http/authentication/BasicAuthenticationHandler.scala
b/service/src/main/scala/org/apache/celeborn/server/common/http/authentication/BasicAuthenticationHandler.scala
new file mode 100644
index 000000000..72350443b
--- /dev/null
+++
b/service/src/main/scala/org/apache/celeborn/server/common/http/authentication/BasicAuthenticationHandler.scala
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.server.common.http.authentication
+
+import java.nio.charset.StandardCharsets
+import java.util.Base64
+import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
+
+import org.apache.celeborn.common.CelebornConf
+import
org.apache.celeborn.common.authentication.{AnonymousAuthenticationProviderImpl,
PasswdAuthenticationProvider}
+import org.apache.celeborn.common.authentication.HttpAuthSchemes._
+import org.apache.celeborn.common.internal.Logging
+import
org.apache.celeborn.server.common.http.HttpAuthUtils.{AUTHORIZATION_HEADER,
WWW_AUTHENTICATE_HEADER}
+
+class BasicAuthenticationHandler(providerClass: String) extends
AuthenticationHandler with Logging {
+
+ private var conf: CelebornConf = _
+
+ private val allowAnonymous =
classOf[AnonymousAuthenticationProviderImpl].getName == providerClass
+ override val authScheme: HttpAuthScheme = BASIC
+
+ override def init(conf: CelebornConf): Unit = {
+ this.conf = conf
+ }
+
+ override def authenticationSupported: Boolean = {
+ Option(providerClass).exists { _ =>
+ try {
+
Class.forName(providerClass).isAssignableFrom(classOf[PasswdAuthenticationProvider])
+ true
+ } catch {
+ case _: Throwable => false
+ }
+ }
+ }
+
+ override def matchAuthScheme(authorization: String): Boolean = {
+ if (authorization == null || authorization.isEmpty) {
+ allowAnonymous
+ } else {
+ super.matchAuthScheme(authorization)
+ }
+ }
+
+ override def getAuthorization(request: HttpServletRequest): String = {
+ val authHeader = request.getHeader(AUTHORIZATION_HEADER)
+ if (allowAnonymous && (authHeader == null || authHeader.isEmpty)) {
+ ""
+ } else {
+ super.getAuthorization(request)
+ }
+ }
+
+ override def authenticate(
+ request: HttpServletRequest,
+ response: HttpServletResponse): String = {
+ var authUser: String = null
+
+ val authorization = getAuthorization(request)
+ val inputToken = Option(authorization).map(a =>
Base64.getDecoder.decode(a.getBytes()))
+ .getOrElse(Array.empty[Byte])
+ val creds = new String(inputToken, StandardCharsets.UTF_8).split(":")
+
+ if (allowAnonymous) {
+ authUser =
creds.take(1).headOption.filterNot(_.isEmpty).getOrElse("anonymous")
+ } else {
+ if (creds.size < 2 || creds(0).trim.isEmpty || creds(1).trim.isEmpty) {
+ response.setHeader(WWW_AUTHENTICATE_HEADER, authScheme.toString)
+ response.setStatus(HttpServletResponse.SC_UNAUTHORIZED)
+ } else {
+ val Seq(user, password) = creds.toSeq.take(2)
+ val passwdAuthenticationProvider = HttpAuthenticationFactory
+ .getPasswordAuthenticationProvider(providerClass, conf)
+ authUser = passwdAuthenticationProvider.authenticate(user,
password).getName
+ response.setStatus(HttpServletResponse.SC_OK)
+ }
+ }
+ authUser
+ }
+
+ override def destroy(): Unit = {}
+}
diff --git
a/service/src/main/scala/org/apache/celeborn/server/common/http/authentication/BearerAuthenticationHandler.scala
b/service/src/main/scala/org/apache/celeborn/server/common/http/authentication/BearerAuthenticationHandler.scala
new file mode 100644
index 000000000..63ae38c9d
--- /dev/null
+++
b/service/src/main/scala/org/apache/celeborn/server/common/http/authentication/BearerAuthenticationHandler.scala
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.server.common.http.authentication
+
+import java.nio.charset.StandardCharsets
+import java.util.Base64
+import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
+
+import org.apache.celeborn.common.CelebornConf
+import
org.apache.celeborn.common.authentication.{AnonymousAuthenticationProviderImpl,
TokenAuthenticationProvider}
+import org.apache.celeborn.common.authentication.HttpAuthSchemes._
+import org.apache.celeborn.common.internal.Logging
+import
org.apache.celeborn.server.common.http.HttpAuthUtils.{AUTHORIZATION_HEADER,
WWW_AUTHENTICATE_HEADER}
+
+class BearerAuthenticationHandler(providerClass: String)
+ extends AuthenticationHandler with Logging {
+ private var conf: CelebornConf = _
+ private val allowAnonymous =
classOf[AnonymousAuthenticationProviderImpl].getName == providerClass
+ override val authScheme: HttpAuthScheme = BEARER
+
+ override def init(conf: CelebornConf): Unit = {
+ this.conf = conf
+ }
+
+ override def authenticationSupported: Boolean = {
+ Option(providerClass).exists { _ =>
+ try {
+
Class.forName(providerClass).isAssignableFrom(classOf[TokenAuthenticationProvider])
+ true
+ } catch {
+ case _: Throwable => false
+ }
+ }
+ }
+
+ override def matchAuthScheme(authorization: String): Boolean = {
+ if (authorization == null || authorization.isEmpty) {
+ allowAnonymous
+ } else {
+ super.matchAuthScheme(authorization)
+ }
+ }
+
+ override def getAuthorization(request: HttpServletRequest): String = {
+ val authHeader = request.getHeader(AUTHORIZATION_HEADER)
+ if (allowAnonymous && (authHeader == null || authHeader.isEmpty)) {
+ ""
+ } else {
+ super.getAuthorization(request)
+ }
+ }
+
+ override def authenticate(
+ request: HttpServletRequest,
+ response: HttpServletResponse): String = {
+ var principal: String = null
+ val inputToken = Option(getAuthorization(request))
+ .map(a => Base64.getDecoder.decode(a.getBytes()))
+ .getOrElse(Array.empty[Byte])
+
+ if (!allowAnonymous && inputToken.isEmpty) {
+ response.setHeader(WWW_AUTHENTICATE_HEADER, authScheme.toString)
+ response.setStatus(HttpServletResponse.SC_UNAUTHORIZED)
+ } else {
+ principal = HttpAuthenticationFactory
+ .getTokenAuthenticationProvider(providerClass, conf)
+ .authenticate(new String(inputToken, StandardCharsets.UTF_8)).getName
+ response.setStatus(HttpServletResponse.SC_OK)
+ }
+ principal
+ }
+
+ override def destroy(): Unit = {}
+}
diff --git
a/service/src/main/scala/org/apache/celeborn/server/common/http/authentication/HttpAuthenticationFactory.scala
b/service/src/main/scala/org/apache/celeborn/server/common/http/authentication/HttpAuthenticationFactory.scala
new file mode 100644
index 000000000..aa7e05d0f
--- /dev/null
+++
b/service/src/main/scala/org/apache/celeborn/server/common/http/authentication/HttpAuthenticationFactory.scala
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.server.common.http.authentication
+
+import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
+
+import org.eclipse.jetty.server.{Handler, Request}
+import org.eclipse.jetty.server.handler.HandlerWrapper
+
+import org.apache.celeborn.common.CelebornConf
+import
org.apache.celeborn.common.authentication.{PasswdAuthenticationProvider,
TokenAuthenticationProvider}
+import org.apache.celeborn.common.exception.CelebornException
+import org.apache.celeborn.reflect.DynConstructors
+
+object HttpAuthenticationFactory {
+ def wrapHandler(handler: Handler): HandlerWrapper = {
+ new HandlerWrapper {
+ _handler = handler
+
+ override def handle(
+ target: String,
+ baseRequest: Request,
+ request: HttpServletRequest,
+ response: HttpServletResponse): Unit = {
+
+ try {
+ handler.handle(target, baseRequest, request, response)
+ } finally {
+ AuthenticationFilter.HTTP_CLIENT_IDENTIFIER.remove()
+ AuthenticationFilter.HTTP_CLIENT_IP_ADDRESS.remove()
+ AuthenticationFilter.HTTP_PROXY_HEADER_CLIENT_IP_ADDRESS.remove()
+ AuthenticationFilter.HTTP_AUTH_TYPE.remove()
+ }
+ }
+
+ override def doStart(): Unit = {
+ super.doStart()
+ handler.start()
+ }
+
+ override def doStop(): Unit = {
+ handler.stop()
+ super.doStop()
+ }
+ }
+ }
+
+ def getPasswordAuthenticationProvider(
+ providerClass: String,
+ conf: CelebornConf): PasswdAuthenticationProvider = {
+ createAuthenticationProvider(providerClass,
classOf[PasswdAuthenticationProvider], conf)
+ }
+
+ def getTokenAuthenticationProvider(
+ providerClass: String,
+ conf: CelebornConf): TokenAuthenticationProvider = {
+ createAuthenticationProvider(providerClass,
classOf[TokenAuthenticationProvider], conf)
+ }
+
+ private def createAuthenticationProvider[T](
+ className: String,
+ expected: Class[T],
+ conf: CelebornConf): T = {
+ try {
+ DynConstructors.builder(expected)
+ .impl(className, classOf[CelebornConf])
+ .impl(className)
+ .buildChecked[T]()
+ .newInstance(conf)
+ } catch {
+ case e: Exception =>
+ throw new CelebornException(
+ s"$className must extend of ${expected.getName}",
+ e)
+ }
+ }
+}
diff --git
a/service/src/main/scala/org/apache/celeborn/server/common/http/authentication/SpnegoAuthenticationHandler.scala
b/service/src/main/scala/org/apache/celeborn/server/common/http/authentication/SpnegoAuthenticationHandler.scala
new file mode 100644
index 000000000..3862f62b2
--- /dev/null
+++
b/service/src/main/scala/org/apache/celeborn/server/common/http/authentication/SpnegoAuthenticationHandler.scala
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.server.common.http.authentication
+
+import java.io.{File, IOException}
+import java.security.{PrivilegedActionException, PrivilegedExceptionAction}
+import java.util.Base64
+import javax.security.auth.Subject
+import javax.security.auth.kerberos.{KerberosPrincipal, KeyTab}
+import javax.security.sasl.AuthenticationException
+import javax.servlet.ServletException
+import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
+
+import org.apache.hadoop.security.authentication.util.KerberosName
+import
org.apache.hadoop.security.authentication.util.KerberosUtil.{getTokenServerName,
GSS_KRB5_MECH_OID, GSS_SPNEGO_MECH_OID, NT_GSS_KRB5_PRINCIPAL_OID}
+import org.ietf.jgss.{GSSContext, GSSCredential, GSSManager, Oid}
+
+import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.authentication.HttpAuthSchemes
+import org.apache.celeborn.common.authentication.HttpAuthSchemes._
+import org.apache.celeborn.common.internal.Logging
+import
org.apache.celeborn.server.common.http.HttpAuthUtils.WWW_AUTHENTICATE_HEADER
+
+class SpnegoAuthenticationHandler(keytab: String, principal: String) extends
AuthenticationHandler
+ with Logging {
+ private var gssManager: GSSManager = _
+ private var conf: CelebornConf = _
+ private var serverSubject = new Subject()
+
+ override val authScheme: HttpAuthScheme = HttpAuthSchemes.NEGOTIATE
+
+ override def authenticationSupported: Boolean = {
+ keytab.nonEmpty && principal.nonEmpty
+ }
+
+ override def init(conf: CelebornConf): Unit = {
+ this.conf = conf
+ if (authenticationSupported) {
+ val keytabFile = new File(keytab)
+ if (!keytabFile.exists()) {
+ throw new ServletException(s"Keytab[$keytab] does not exists")
+ }
+ if (!principal.startsWith("HTTP/")) {
+ throw new ServletException(s"SPNEGO principal[$principal] does not
start with HTTP/")
+ }
+
+ logInfo(s"Using keytab $keytab, for principal $principal")
+ serverSubject.getPrivateCredentials().add(KeyTab.getInstance(keytabFile))
+ serverSubject.getPrincipals.add(new KerberosPrincipal(principal))
+
+ // TODO: support to config kerberos.name.rules and
kerberos.rule.mechanism
+ // set default rules if no rules set, otherwise it will throw exception
+ // when parse the kerberos name
+ if (!KerberosName.hasRulesBeenSet) {
+ KerberosName.setRules("DEFAULT")
+ }
+
+ try {
+ gssManager = Subject.doAs(
+ serverSubject,
+ new PrivilegedExceptionAction[GSSManager] {
+ override def run(): GSSManager = {
+ GSSManager.getInstance()
+ }
+ })
+ } catch {
+ case e: PrivilegedActionException => throw e.getException
+ case e: Exception => throw new ServletException(e)
+ }
+ }
+ }
+
+ override def destroy(): Unit = {
+ serverSubject = null
+ }
+
+ override def authenticate(
+ request: HttpServletRequest,
+ response: HttpServletResponse): String = {
+ var authUser: String = null
+ val authorization = getAuthorization(request)
+ val clientToken = Base64.getDecoder.decode(authorization)
+ try {
+ val serverPrincipal = getTokenServerName(clientToken)
+ if (!serverPrincipal.startsWith("HTTP/")) {
+ throw new IllegalArgumentException(
+ s"Invalid server principal $serverPrincipal decoded from client
request")
+ }
+ authUser = Subject.doAs(
+ serverSubject,
+ new PrivilegedExceptionAction[String] {
+ override def run(): String = {
+ runWithPrincipal(serverPrincipal, clientToken, response)
+ }
+ })
+ } catch {
+ case ex: PrivilegedActionException =>
+ ex.getException match {
+ case ioe: IOException =>
+ throw ioe
+ case e: Exception => throw new AuthenticationException("SPNEGO
authentication failed", e)
+ }
+
+ case e: Exception => throw new AuthenticationException("SPNEGO
authentication failed", e)
+ }
+ authUser
+ }
+
+ def runWithPrincipal(
+ serverPrincipal: String,
+ clientToken: Array[Byte],
+ response: HttpServletResponse): String = {
+ var gssContext: GSSContext = null
+ var gssCreds: GSSCredential = null
+ var authUser: String = null
+ try {
+ logDebug(s"SPNEGO initialized with server principal $serverPrincipal")
+ gssCreds = gssManager.createCredential(
+ gssManager.createName(serverPrincipal, NT_GSS_KRB5_PRINCIPAL_OID),
+ GSSCredential.INDEFINITE_LIFETIME,
+ Array[Oid](GSS_SPNEGO_MECH_OID, GSS_KRB5_MECH_OID),
+ GSSCredential.ACCEPT_ONLY)
+ gssContext = gssManager.createContext(gssCreds)
+ val serverToken = gssContext.acceptSecContext(clientToken, 0,
clientToken.length)
+ if (serverToken != null && serverToken.nonEmpty) {
+ val authenticate = Base64.getEncoder.encodeToString(serverToken)
+ response.setHeader(WWW_AUTHENTICATE_HEADER, s"$NEGOTIATE
$authenticate")
+ }
+ if (!gssContext.isEstablished) {
+ response.setStatus(HttpServletResponse.SC_UNAUTHORIZED)
+ logDebug("SPNEGO in progress")
+ } else {
+ val clientPrincipal = gssContext.getSrcName.toString
+ val kerberosName = new KerberosName(clientPrincipal)
+ val userName = kerberosName.getShortName
+ authUser = userName
+ response.setStatus(HttpServletResponse.SC_OK)
+ logDebug(s"SPNEGO completed for client principal $clientPrincipal")
+ }
+ } finally {
+ if (gssContext != null) {
+ gssContext.dispose()
+ }
+ if (gssCreds != null) {
+ gssCreds.dispose()
+ }
+ }
+ authUser
+ }
+}
diff --git
a/service/src/test/scala/org/apache/celeborn/server/common/http/ApiBaseResourceAuthenticationSuite.scala
b/service/src/test/scala/org/apache/celeborn/server/common/http/ApiBaseResourceAuthenticationSuite.scala
new file mode 100644
index 000000000..8b7921777
--- /dev/null
+++
b/service/src/test/scala/org/apache/celeborn/server/common/http/ApiBaseResourceAuthenticationSuite.scala
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.server.common.http
+
+import java.nio.charset.StandardCharsets
+import java.util.Base64
+import javax.servlet.http.HttpServletResponse
+import javax.ws.rs.core.MediaType
+
+import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.authentication.HttpAuthSchemes
+import
org.apache.celeborn.server.common.http.HttpAuthUtils.AUTHORIZATION_HEADER
+import
org.apache.celeborn.server.common.http.authentication.{UserDefinePasswordAuthenticationProviderImpl,
UserDefineTokenAuthenticationProviderImpl}
+
+abstract class ApiBaseResourceAuthenticationSuite extends HttpTestHelper {
+ celebornConf
+ .set(CelebornConf.METRICS_ENABLED.key, "true")
+ .set(
+ CelebornConf.METRICS_CONF.key,
+
Thread.currentThread().getContextClassLoader.getResource("metrics-api.properties").getFile)
+ .set(CelebornConf.MASTER_HTTP_AUTH_SUPPORTED_SCHEMES, Seq("BASIC",
"BEARER"))
+ .set(
+ CelebornConf.MASTER_HTTP_AUTH_BASIC_PROVIDER,
+ classOf[UserDefinePasswordAuthenticationProviderImpl].getName)
+ .set(
+ CelebornConf.MASTER_HTTP_AUTH_BEARER_PROVIDER,
+ classOf[UserDefineTokenAuthenticationProviderImpl].getName)
+ .set(CelebornConf.WORKER_HTTP_AUTH_SUPPORTED_SCHEMES, Seq("BASIC",
"BEARER"))
+ .set(
+ CelebornConf.WORKER_HTTP_AUTH_BASIC_PROVIDER,
+ classOf[UserDefinePasswordAuthenticationProviderImpl].getName)
+ .set(
+ CelebornConf.WORKER_HTTP_AUTH_BEARER_PROVIDER,
+ classOf[UserDefineTokenAuthenticationProviderImpl].getName)
+
+ def basicAuthorizationHeader(user: String, password: String): String =
+ HttpAuthSchemes.BASIC + " " + new String(
+ Base64.getEncoder.encode(s"$user:$password".getBytes()),
+ StandardCharsets.UTF_8)
+
+ def bearerAuthorizationHeader(token: String): String =
+ HttpAuthSchemes.BEARER + " " + new String(
+ Base64.getEncoder.encode(token.getBytes()),
+ StandardCharsets.UTF_8)
+
+ Seq("conf", "listDynamicConfigs", "workerInfo", "shuffle",
"applications").foreach { api =>
+ test(s"API $api authentication") {
+ var response = webTarget.path(api)
+ .request(MediaType.TEXT_PLAIN)
+ .header(
+ AUTHORIZATION_HEADER,
+ basicAuthorizationHeader(
+ "user",
+ UserDefinePasswordAuthenticationProviderImpl.VALID_PASSWORD))
+ .get()
+ assert(HttpServletResponse.SC_OK == response.getStatus)
+
+ response = webTarget.path(api)
+ .request(MediaType.TEXT_PLAIN)
+ .header(AUTHORIZATION_HEADER, basicAuthorizationHeader("user",
"invalid"))
+ .get()
+ assert(HttpServletResponse.SC_FORBIDDEN == response.getStatus)
+
+ response = webTarget.path(api)
+ .request(MediaType.TEXT_PLAIN)
+ .header(
+ AUTHORIZATION_HEADER,
+
bearerAuthorizationHeader(UserDefineTokenAuthenticationProviderImpl.VALID_TOKEN))
+ .get()
+ assert(HttpServletResponse.SC_OK == response.getStatus)
+
+ response = webTarget.path(api)
+ .request(MediaType.TEXT_PLAIN)
+ .header(AUTHORIZATION_HEADER, bearerAuthorizationHeader("bad_token"))
+ .get()
+ assert(HttpServletResponse.SC_FORBIDDEN == response.getStatus)
+ }
+ }
+
+ test("swagger api do not need authentication") {
+ Seq("swagger", "docs", "help").foreach { path =>
+ val response = webTarget.path(path).request(MediaType.TEXT_HTML).get()
+ assert(HttpServletResponse.SC_OK == response.getStatus)
+ assert(response.readEntity(classOf[String]).contains("swagger-ui"))
+ }
+ Seq(
+ "openapi.json" -> MediaType.APPLICATION_JSON,
+ "openapi.yaml" -> "application/yaml").foreach { case (path, mediaType) =>
+ val response = webTarget.path(path).request(mediaType).get()
+ assert(HttpServletResponse.SC_OK == response.getStatus)
+ assert(response.readEntity(classOf[String]).contains("/conf"))
+ }
+ }
+
+ test("metrics api do not need authentication") {
+ var response =
webTarget.path("metrics/prometheus").request(MediaType.APPLICATION_JSON).get()
+ assert(200 == response.getStatus)
+
assert(response.readEntity(classOf[String]).contains("metrics_jvm_memory_heap_max_Value"))
+
+ response =
webTarget.path("metrics/json").request(MediaType.APPLICATION_JSON).get()
+ assert(200 == response.getStatus)
+ assert(response.readEntity(classOf[String]).contains("\"name\" :
\"jvm.memory.heap.max\""))
+ }
+}
diff --git
a/service/src/test/scala/org/apache/celeborn/server/common/http/ApiBaseResourceSuite.scala
b/service/src/test/scala/org/apache/celeborn/server/common/http/ApiBaseResourceSuite.scala
index 632d06c79..28e1ed9ed 100644
---
a/service/src/test/scala/org/apache/celeborn/server/common/http/ApiBaseResourceSuite.scala
+++
b/service/src/test/scala/org/apache/celeborn/server/common/http/ApiBaseResourceSuite.scala
@@ -17,6 +17,7 @@
package org.apache.celeborn.server.common.http
+import javax.servlet.http.HttpServletResponse
import javax.ws.rs.core.MediaType
import org.apache.celeborn.common.CelebornConf
@@ -29,13 +30,13 @@ abstract class ApiBaseResourceSuite extends HttpTestHelper {
test("ping") {
val response = webTarget.path("ping").request(MediaType.TEXT_PLAIN).get()
- assert(200 == response.getStatus)
+ assert(HttpServletResponse.SC_OK == response.getStatus)
assert(response.readEntity(classOf[String]) == "pong")
}
test("conf") {
val response = webTarget.path("conf").request(MediaType.TEXT_PLAIN).get()
- assert(200 == response.getStatus)
+ assert(HttpServletResponse.SC_OK == response.getStatus)
}
test("listDynamicConfigs") {
@@ -43,57 +44,56 @@ abstract class ApiBaseResourceSuite extends HttpTestHelper {
.queryParam("LEVEL", "TENANT")
.request(MediaType.TEXT_PLAIN)
.get()
- assert(200 == response.getStatus)
+ assert(HttpServletResponse.SC_OK == response.getStatus)
}
test("workerInfo") {
val response =
webTarget.path("workerInfo").request(MediaType.TEXT_PLAIN).get()
- assert(200 == response.getStatus)
+ assert(HttpServletResponse.SC_OK == response.getStatus)
}
test("threadDump") {
val response =
webTarget.path("threadDump").request(MediaType.TEXT_PLAIN).get()
- assert(200 == response.getStatus)
+ assert(HttpServletResponse.SC_OK == response.getStatus)
}
test("shuffle") {
val response =
webTarget.path("shuffle").request(MediaType.TEXT_PLAIN).get()
- assert(200 == response.getStatus)
+ assert(HttpServletResponse.SC_OK == response.getStatus)
}
test("applications") {
val response =
webTarget.path("applications").request(MediaType.TEXT_PLAIN).get()
- assert(200 == response.getStatus)
+ assert(HttpServletResponse.SC_OK == response.getStatus)
}
test("listTopDiskUsedApps") {
val response =
webTarget.path("listTopDiskUsedApps").request(MediaType.TEXT_PLAIN).get()
- assert(200 == response.getStatus)
- }
-
- test("openapi.json") {
- val response =
webTarget.path("openapi.json").request(MediaType.APPLICATION_JSON).get()
- assert(200 == response.getStatus)
- assert(response.readEntity(classOf[String]).contains("/conf"))
+ assert(HttpServletResponse.SC_OK == response.getStatus)
}
test("swagger") {
Seq("swagger", "docs", "help").foreach { path =>
val response = webTarget.path(path).request(MediaType.TEXT_HTML).get()
- assert(200 == response.getStatus)
+ assert(HttpServletResponse.SC_OK == response.getStatus)
assert(response.readEntity(classOf[String]).contains("swagger-ui"))
}
+ Seq(
+ "openapi.json" -> MediaType.APPLICATION_JSON,
+ "openapi.yaml" -> "application/yaml").foreach { case (path, mediaType) =>
+ val response = webTarget.path(path).request(mediaType).get()
+ assert(HttpServletResponse.SC_OK == response.getStatus)
+ assert(response.readEntity(classOf[String]).contains("/conf"))
+ }
}
- test("metrics/prometheus") {
- val response =
webTarget.path("metrics/prometheus").request(MediaType.APPLICATION_JSON).get()
- assert(200 == response.getStatus)
+ test("metrics") {
+ var response =
webTarget.path("metrics/prometheus").request(MediaType.APPLICATION_JSON).get()
+ assert(HttpServletResponse.SC_OK == response.getStatus)
assert(response.readEntity(classOf[String]).contains("metrics_jvm_memory_heap_max_Value"))
- }
- test("metrics/json") {
- val response =
webTarget.path("metrics/json").request(MediaType.APPLICATION_JSON).get()
- assert(200 == response.getStatus)
+ response =
webTarget.path("metrics/json").request(MediaType.APPLICATION_JSON).get()
+ assert(HttpServletResponse.SC_OK == response.getStatus)
assert(response.readEntity(classOf[String]).contains("\"name\" :
\"jvm.memory.heap.max\""))
}
}
diff --git
a/service/src/test/scala/org/apache/celeborn/server/common/http/HttpTestHelper.scala
b/service/src/test/scala/org/apache/celeborn/server/common/http/HttpTestHelper.scala
index 8674fc39c..6a6ee9242 100644
---
a/service/src/test/scala/org/apache/celeborn/server/common/http/HttpTestHelper.scala
+++
b/service/src/test/scala/org/apache/celeborn/server/common/http/HttpTestHelper.scala
@@ -64,6 +64,7 @@ trait HttpTestHelper extends AnyFunSuite
override def beforeAll(): Unit = {
super.beforeAll()
restApiBaseSuite.setUp()
+ Thread.sleep(1000) // sleep for http server initialization
}
override def afterAll(): Unit = {
diff --git
a/service/src/test/scala/org/apache/celeborn/server/common/http/authentication/UserDefinePasswordAuthenticationProviderImpl.scala
b/service/src/test/scala/org/apache/celeborn/server/common/http/authentication/UserDefinePasswordAuthenticationProviderImpl.scala
new file mode 100644
index 000000000..7009c1302
--- /dev/null
+++
b/service/src/test/scala/org/apache/celeborn/server/common/http/authentication/UserDefinePasswordAuthenticationProviderImpl.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.server.common.http.authentication
+
+import java.security.Principal
+import javax.security.sasl.AuthenticationException
+
+import org.apache.celeborn.common.authentication.{BasicPrincipal,
PasswdAuthenticationProvider}
+import org.apache.celeborn.common.internal.Logging
+import
org.apache.celeborn.server.common.http.authentication.UserDefinePasswordAuthenticationProviderImpl.VALID_PASSWORD
+
+class UserDefinePasswordAuthenticationProviderImpl
+ extends PasswdAuthenticationProvider with Logging {
+ override def authenticate(user: String, password: String): Principal = {
+ if (password == VALID_PASSWORD) {
+ logInfo(s"Success log in of user: $user")
+ new BasicPrincipal(user)
+ } else {
+ throw new AuthenticationException("Username or password is not valid!")
+ }
+ }
+}
+
+object UserDefinePasswordAuthenticationProviderImpl {
+ val VALID_PASSWORD = "password"
+}
diff --git
a/service/src/test/scala/org/apache/celeborn/server/common/http/authentication/UserDefineTokenAuthenticationProviderImpl.scala
b/service/src/test/scala/org/apache/celeborn/server/common/http/authentication/UserDefineTokenAuthenticationProviderImpl.scala
new file mode 100644
index 000000000..ab14d9a72
--- /dev/null
+++
b/service/src/test/scala/org/apache/celeborn/server/common/http/authentication/UserDefineTokenAuthenticationProviderImpl.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.server.common.http.authentication
+
+import java.security.Principal
+import javax.security.sasl.AuthenticationException
+
+import org.apache.celeborn.common.authentication.{BasicPrincipal,
TokenAuthenticationProvider}
+import org.apache.celeborn.common.internal.Logging
+import
org.apache.celeborn.server.common.http.authentication.UserDefineTokenAuthenticationProviderImpl.VALID_TOKEN
+
+class UserDefineTokenAuthenticationProviderImpl extends
TokenAuthenticationProvider with Logging {
+ override def authenticate(token: String): Principal = {
+ if (token == VALID_TOKEN) {
+ logInfo(s"Success log in of token: $token")
+ new BasicPrincipal("user")
+ } else {
+ throw new AuthenticationException("Token is not valid!")
+ }
+ }
+}
+
+object UserDefineTokenAuthenticationProviderImpl {
+ val VALID_TOKEN = "token"
+}
diff --git
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/http/api/ApiWorkerResourceSuite.scala
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/http/api/ApiWorkerResourceSuite.scala
index ccf5697d4..408e39436 100644
---
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/http/api/ApiWorkerResourceSuite.scala
+++
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/http/api/ApiWorkerResourceSuite.scala
@@ -17,6 +17,7 @@
package org.apache.celeborn.service.deploy.worker.http.api
+import javax.servlet.http.HttpServletResponse
import javax.ws.rs.core.MediaType
import org.apache.celeborn.server.common.HttpService
@@ -44,22 +45,22 @@ class ApiWorkerResourceSuite extends ApiBaseResourceSuite
with MiniClusterFeatur
test("listPartitionLocationInfo") {
val response =
webTarget.path("listPartitionLocationInfo").request(MediaType.TEXT_PLAIN).get()
- assert(200 == response.getStatus)
+ assert(HttpServletResponse.SC_OK == response.getStatus)
}
test("unavailablePeers") {
val response =
webTarget.path("unavailablePeers").request(MediaType.TEXT_PLAIN).get()
- assert(200 == response.getStatus)
+ assert(HttpServletResponse.SC_OK == response.getStatus)
}
test("isShutdown") {
val response =
webTarget.path("isShutdown").request(MediaType.TEXT_PLAIN).get()
- assert(200 == response.getStatus)
+ assert(HttpServletResponse.SC_OK == response.getStatus)
}
test("isRegistered") {
val response =
webTarget.path("isRegistered").request(MediaType.TEXT_PLAIN).get()
- assert(200 == response.getStatus)
+ assert(HttpServletResponse.SC_OK == response.getStatus)
}
test("isDecommissioning") {
diff --git
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/ApiWorkerResourceAuthenticationSuite.scala
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/ApiWorkerResourceAuthenticationSuite.scala
new file mode 100644
index 000000000..36ea0708f
--- /dev/null
+++
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/ApiWorkerResourceAuthenticationSuite.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.service.deploy.worker.storage
+
+import org.apache.celeborn.common.util.CelebornExitKind
+import org.apache.celeborn.server.common.HttpService
+import
org.apache.celeborn.server.common.http.{ApiBaseResourceAuthenticationSuite,
ApiBaseResourceSuite}
+import org.apache.celeborn.service.deploy.worker.{Worker, WorkerArguments}
+
+class ApiWorkerResourceAuthenticationSuite extends
ApiBaseResourceAuthenticationSuite {
+ private var worker: Worker = _
+ override protected def httpService: HttpService = worker
+
+ override def beforeAll(): Unit = {
+ val workerArgs = new WorkerArguments(Array(), celebornConf)
+ worker = new Worker(celebornConf, workerArgs)
+ worker.metricsSystem.start()
+ worker.startHttpServer()
+ super.beforeAll()
+ }
+
+ override def afterAll(): Unit = {
+ super.afterAll()
+ worker.metricsSystem.stop()
+ worker.rpcEnv.shutdown()
+ worker.stop(CelebornExitKind.EXIT_IMMEDIATELY)
+ }
+}