This is an automated email from the ASF dual-hosted git repository.
rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 782393af0 [CELEBORN-1748] Deprecate identity provider configs tied
with quota
782393af0 is described below
commit 782393af05a6567505e7be541ab91a9817c29c66
Author: Sanskar Modi <[email protected]>
AuthorDate: Wed Dec 4 09:28:40 2024 +0800
[CELEBORN-1748] Deprecate identity provider configs tied with quota
### What changes were proposed in this pull request?
Deprecate the quota-specific identity configs:
```
"celeborn.quota.identity.provider"
"celeborn.quota.identity.user-specific.tenant"
"celeborn.quota.identity.user-specific.userName"
```
in favour of the quota-independent identity configs:
```
"celeborn.identity.provider"
"celeborn.identity.user-specific.tenant"
"celeborn.identity.user-specific.userName"
```
### Why are the changes needed?
The current identity configs are tied to quota, but identity should be
independent of quota because other features, such as tags, also use it. In the
future, new components may also make use of identity.
### Does this PR introduce _any_ user-facing change?
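Yes.
- New configs `celeborn.identity.provider`, `celeborn.identity.user-specific.tenant` and `celeborn.identity.user-specific.userName` are added.
- The corresponding `celeborn.quota.identity.*` configs are deprecated since 0.6.0. They are still honoured as alternative keys.
- The `CelebornConf` accessors `quotaIdentityProviderClass`, `quotaUserSpecificTenant` and `quotaUserSpecificUserName` are renamed to `identityProviderClass`, `userSpecificTenant` and `userSpecificUserName`.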
### How was this patch tested?
Existing unit tests.
Closes #2952 from s0nskar/fix_identity.
Authored-by: Sanskar Modi <[email protected]>
Signed-off-by: Shuang <[email protected]>
---
.../hadoop/mapred/CelebornMapOutputCollector.java | 3 +-
.../task/reduce/CelebornShuffleConsumer.java | 2 +-
.../org/apache/celeborn/common/CelebornConf.scala | 85 +++++++++++++---------
.../common/identity/DefaultIdentityProvider.scala | 4 +-
.../common/identity/IdentityProvider.scala | 2 +-
.../apache/celeborn/common/util/UtilsSuite.scala | 2 +-
docs/configuration/client.md | 6 +-
docs/configuration/quota.md | 3 -
8 files changed, 61 insertions(+), 46 deletions(-)
diff --git
a/client-mr/mr/src/main/java/org/apache/hadoop/mapred/CelebornMapOutputCollector.java
b/client-mr/mr/src/main/java/org/apache/hadoop/mapred/CelebornMapOutputCollector.java
index 1c2f57809..334636818 100644
---
a/client-mr/mr/src/main/java/org/apache/hadoop/mapred/CelebornMapOutputCollector.java
+++
b/client-mr/mr/src/main/java/org/apache/hadoop/mapred/CelebornMapOutputCollector.java
@@ -66,8 +66,7 @@ public class CelebornMapOutputCollector<K extends Object, V
extends Object>
applicationAttemptId,
IOBufferSize);
UserIdentifier userIdentifier =
- new UserIdentifier(
- celebornConf.quotaUserSpecificTenant(),
celebornConf.quotaUserSpecificUserName());
+ new UserIdentifier(celebornConf.userSpecificTenant(),
celebornConf.userSpecificUserName());
final float spiller = jobConf.getFloat(JobContext.MAP_SORT_SPILL_PERCENT,
(float) 0.8);
int pushSize = (int) ((IOBufferSize << 20) * spiller);
diff --git
a/client-mr/mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/CelebornShuffleConsumer.java
b/client-mr/mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/CelebornShuffleConsumer.java
index 052b340ab..778e243fa 100644
---
a/client-mr/mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/CelebornShuffleConsumer.java
+++
b/client-mr/mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/CelebornShuffleConsumer.java
@@ -80,7 +80,7 @@ public class CelebornShuffleConsumer<K, V>
lmPort,
celebornConf,
new UserIdentifier(
- celebornConf.quotaUserSpecificTenant(),
celebornConf.quotaUserSpecificUserName()));
+ celebornConf.userSpecificTenant(),
celebornConf.userSpecificUserName()));
this.merger =
new MergeManagerImpl<>(
reduceId,
diff --git
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 32cf2ebfa..cbc66c404 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -28,7 +28,7 @@ import scala.util.Try
import scala.util.matching.Regex
import
org.apache.celeborn.common.authentication.AnonymousAuthenticationProviderImpl
-import org.apache.celeborn.common.identity.{DefaultIdentityProvider,
IdentityProvider}
+import org.apache.celeborn.common.identity.{DefaultIdentityProvider,
HadoopBasedIdentityProvider, IdentityProvider}
import org.apache.celeborn.common.internal.Logging
import org.apache.celeborn.common.internal.config._
import org.apache.celeborn.common.network.util.ByteUnit
@@ -884,11 +884,15 @@ class CelebornConf(loadDefaults: Boolean) extends
Cloneable with Logging with Se
// Quota //
// //////////////////////////////////////////////////////
def quotaEnabled: Boolean = get(QUOTA_ENABLED)
- def quotaIdentityProviderClass: String = get(QUOTA_IDENTITY_PROVIDER)
- def quotaUserSpecificTenant: String = get(QUOTA_USER_SPECIFIC_TENANT)
- def quotaUserSpecificUserName: String = get(QUOTA_USER_SPECIFIC_USERNAME)
def quotaInterruptShuffleEnabled: Boolean =
get(QUOTA_INTERRUPT_SHUFFLE_ENABLED)
+ // //////////////////////////////////////////////////////
+ // Identity //
+ // //////////////////////////////////////////////////////
+ def identityProviderClass: String = get(IDENTITY_PROVIDER)
+ def userSpecificTenant: String = get(USER_SPECIFIC_TENANT)
+ def userSpecificUserName: String = get(USER_SPECIFIC_USERNAME)
+
// //////////////////////////////////////////////////////
// Client //
// //////////////////////////////////////////////////////
@@ -1589,7 +1593,19 @@ object CelebornConf extends Logging {
DeprecatedConfig(
"celeborn.worker.congestionControl.high.watermark",
"0.6.0",
- "Please use
celeborn.worker.congestionControl.diskBuffer.high.watermark"))
+ "Please use
celeborn.worker.congestionControl.diskBuffer.high.watermark"),
+ DeprecatedConfig(
+ "celeborn.quota.identity.provider",
+ "0.6.0",
+ "Please use celeborn.identity.provider"),
+ DeprecatedConfig(
+ "celeborn.quota.identity.user-specific.tenant",
+ "0.6.0",
+ "Please use celeborn.identity.user-specific.tenant"),
+ DeprecatedConfig(
+ "celeborn.quota.identity.user-specific.userName",
+ "0.6.0",
+ "Please use celeborn.identity.user-specific.userName"))
Map(configs.map { cfg => (cfg.key -> cfg) }: _*)
}
@@ -5324,6 +5340,37 @@ object CelebornConf extends Logging {
.booleanConf
.createWithDefault(true)
+ val IDENTITY_PROVIDER: ConfigEntry[String] =
+ buildConf("celeborn.identity.provider")
+ .withAlternative("celeborn.quota.identity.provider")
+ .categories("client")
+ .doc(s"IdentityProvider class name. Default class is " +
+ s"`${classOf[DefaultIdentityProvider].getName}`. " +
+ s"Optional values: " +
+ s"${classOf[HadoopBasedIdentityProvider].getName} user name will be
obtained by UserGroupInformation.getUserName; " +
+ s"${classOf[DefaultIdentityProvider].getName} user name and tenant id
are default values or user-specific values.")
+ .version("0.6.0")
+ .stringConf
+ .createWithDefault(classOf[DefaultIdentityProvider].getName)
+
+ val USER_SPECIFIC_TENANT: ConfigEntry[String] =
+ buildConf("celeborn.identity.user-specific.tenant")
+ .withAlternative("celeborn.quota.identity.user-specific.tenant")
+ .categories("client")
+ .doc(s"Tenant id if ${IDENTITY_PROVIDER.key} is
${classOf[DefaultIdentityProvider].getName}.")
+ .version("0.6.0")
+ .stringConf
+ .createWithDefault(IdentityProvider.DEFAULT_TENANT_ID)
+
+ val USER_SPECIFIC_USERNAME: ConfigEntry[String] =
+ buildConf("celeborn.identity.user-specific.userName")
+ .withAlternative("celeborn.quota.identity.user-specific.userName")
+ .categories("client")
+ .doc(s"User name if ${IDENTITY_PROVIDER.key} is
${classOf[DefaultIdentityProvider].getName}.")
+ .version("0.6.0")
+ .stringConf
+ .createWithDefault(IdentityProvider.DEFAULT_USERNAME)
+
val QUOTA_ENABLED: ConfigEntry[Boolean] =
buildConf("celeborn.quota.enabled")
.categories("quota", "master", "client")
@@ -5337,18 +5384,6 @@ object CelebornConf extends Logging {
.booleanConf
.createWithDefault(true)
- val QUOTA_IDENTITY_PROVIDER: ConfigEntry[String] =
- buildConf("celeborn.quota.identity.provider")
- .categories("quota", "client")
- .doc(s"IdentityProvider class name. Default class is " +
- s"`${classOf[DefaultIdentityProvider].getName}`. " +
- s"Optional values: " +
- s"org.apache.celeborn.common.identity.HadoopBasedIdentityProvider user
name will be obtained by UserGroupInformation.getUserName; " +
- s"org.apache.celeborn.common.identity.DefaultIdentityProvider user
name and tenant id are default values or user-specific values.")
- .version("0.2.0")
- .stringConf
- .createWithDefault(classOf[DefaultIdentityProvider].getName)
-
val CONTAINER_INFO_PROVIDER: ConfigEntry[String] =
buildConf("celeborn.container.info.provider")
.categories("master", "worker")
@@ -5358,22 +5393,6 @@ object CelebornConf extends Logging {
.stringConf
.createWithDefault("org.apache.celeborn.server.common.container.DefaultContainerInfoProvider")
- val QUOTA_USER_SPECIFIC_TENANT: ConfigEntry[String] =
- buildConf("celeborn.quota.identity.user-specific.tenant")
- .categories("quota", "client")
- .doc(s"Tenant id if celeborn.quota.identity.provider is
org.apache.celeborn.common.identity.DefaultIdentityProvider.")
- .version("0.3.0")
- .stringConf
- .createWithDefault(IdentityProvider.DEFAULT_TENANT_ID)
-
- val QUOTA_USER_SPECIFIC_USERNAME: ConfigEntry[String] =
- buildConf("celeborn.quota.identity.user-specific.userName")
- .categories("quota", "client")
- .doc(s"User name if celeborn.quota.identity.provider is
org.apache.celeborn.common.identity.DefaultIdentityProvider.")
- .version("0.3.0")
- .stringConf
- .createWithDefault(IdentityProvider.DEFAULT_USERNAME)
-
val QUOTA_DISK_BYTES_WRITTEN: ConfigEntry[Long] =
buildConf("celeborn.quota.tenant.diskBytesWritten")
.categories("quota")
diff --git
a/common/src/main/scala/org/apache/celeborn/common/identity/DefaultIdentityProvider.scala
b/common/src/main/scala/org/apache/celeborn/common/identity/DefaultIdentityProvider.scala
index 76b89b331..d9f1a10ad 100644
---
a/common/src/main/scala/org/apache/celeborn/common/identity/DefaultIdentityProvider.scala
+++
b/common/src/main/scala/org/apache/celeborn/common/identity/DefaultIdentityProvider.scala
@@ -23,7 +23,7 @@ class DefaultIdentityProvider extends IdentityProvider {
override def provide(): UserIdentifier = {
val conf = new CelebornConf()
UserIdentifier(
- conf.quotaUserSpecificTenant,
- conf.quotaUserSpecificUserName)
+ conf.userSpecificTenant,
+ conf.userSpecificUserName)
}
}
diff --git
a/common/src/main/scala/org/apache/celeborn/common/identity/IdentityProvider.scala
b/common/src/main/scala/org/apache/celeborn/common/identity/IdentityProvider.scala
index f734ffffe..5d4541840 100644
---
a/common/src/main/scala/org/apache/celeborn/common/identity/IdentityProvider.scala
+++
b/common/src/main/scala/org/apache/celeborn/common/identity/IdentityProvider.scala
@@ -30,6 +30,6 @@ object IdentityProvider extends Logging {
val DEFAULT_USERNAME = "default"
def instantiate(conf: CelebornConf): IdentityProvider = {
- Utils.instantiate[IdentityProvider](conf.quotaIdentityProviderClass)
+ Utils.instantiate[IdentityProvider](conf.identityProviderClass)
}
}
diff --git
a/common/src/test/scala/org/apache/celeborn/common/util/UtilsSuite.scala
b/common/src/test/scala/org/apache/celeborn/common/util/UtilsSuite.scala
index 2c0000fd9..afc74707f 100644
--- a/common/src/test/scala/org/apache/celeborn/common/util/UtilsSuite.scala
+++ b/common/src/test/scala/org/apache/celeborn/common/util/UtilsSuite.scala
@@ -248,7 +248,7 @@ class UtilsSuite extends CelebornFunSuite {
test("test instantiate") {
val celebornConf = new CelebornConf()
-
assert(Utils.instantiate[DefaultIdentityProvider](celebornConf.quotaIdentityProviderClass)
+
assert(Utils.instantiate[DefaultIdentityProvider](celebornConf.identityProviderClass)
.isInstanceOf[DefaultIdentityProvider])
}
}
diff --git a/docs/configuration/client.md b/docs/configuration/client.md
index 606c216b0..f035713eb 100644
--- a/docs/configuration/client.md
+++ b/docs/configuration/client.md
@@ -121,12 +121,12 @@ license: |
| celeborn.client.spark.shuffle.forceFallback.enabled | false | false | Always
use spark built-in shuffle implementation. This configuration is deprecated,
consider configuring `celeborn.client.spark.shuffle.fallback.policy` instead. |
0.3.0 | celeborn.shuffle.forceFallback.enabled |
| celeborn.client.spark.shuffle.writer | HASH | false | Celeborn supports the
following kind of shuffle writers. 1. hash: hash-based shuffle writer works
fine when shuffle partition count is normal; 2. sort: sort-based shuffle writer
works fine when memory pressure is high or shuffle partition count is huge.
This configuration only takes effect when
celeborn.client.spark.push.dynamicWriteMode.enabled is false. | 0.3.0 |
celeborn.shuffle.writer |
| celeborn.client.spark.stageRerun.enabled | true | false | Whether to enable
stage rerun. If true, client throws FetchFailedException instead of
CelebornIOException. | 0.4.0 | celeborn.client.spark.fetch.throwsFetchFailure |
+| celeborn.identity.provider |
org.apache.celeborn.common.identity.DefaultIdentityProvider | false |
IdentityProvider class name. Default class is
`org.apache.celeborn.common.identity.DefaultIdentityProvider`. Optional values:
org.apache.celeborn.common.identity.HadoopBasedIdentityProvider user name will
be obtained by UserGroupInformation.getUserName;
org.apache.celeborn.common.identity.DefaultIdentityProvider user name and
tenant id are default values or user-specific values. | 0.6.0 | [...]
+| celeborn.identity.user-specific.tenant | default | false | Tenant id if
celeborn.identity.provider is
org.apache.celeborn.common.identity.DefaultIdentityProvider. | 0.6.0 |
celeborn.quota.identity.user-specific.tenant |
+| celeborn.identity.user-specific.userName | default | false | User name if
celeborn.identity.provider is
org.apache.celeborn.common.identity.DefaultIdentityProvider. | 0.6.0 |
celeborn.quota.identity.user-specific.userName |
| celeborn.master.endpoints | <localhost>:9097 | false | Endpoints of
master nodes for celeborn clients to connect. Client uses resolver provided by
celeborn.master.endpoints.resolver to resolve the master endpoints. By default
Celeborn uses `org.apache.celeborn.common.client.StaticMasterEndpointResolver`
which take static master endpoints as input. Allowed pattern:
`<host1>:<port1>[,<host2>:<port2>]*`, e.g. `clb1:9097,clb2:9098,clb3:9099`. If
the port is omitted, 9097 will be used [...]
| celeborn.master.endpoints.resolver |
org.apache.celeborn.common.client.StaticMasterEndpointResolver | false |
Resolver class that can be used for discovering and updating the master
endpoints. This allows users to provide a custom master endpoint resolver
implementation. This is useful in environments where the master nodes might
change due to scaling operations or infrastructure updates. Clients need to
ensure that provided resolver class should be present in the classpath. | 0.6.0
| |
| celeborn.quota.enabled | true | false | When Master side sets to true, the
master will enable to check the quota via QuotaManager. When Client side sets
to true, LifecycleManager will request Master side to check whether the current
user has enough quota before registration of shuffle. Fallback to the default
shuffle service when Master side checks that there is no enough quota for
current user. | 0.2.0 | |
-| celeborn.quota.identity.provider |
org.apache.celeborn.common.identity.DefaultIdentityProvider | false |
IdentityProvider class name. Default class is
`org.apache.celeborn.common.identity.DefaultIdentityProvider`. Optional values:
org.apache.celeborn.common.identity.HadoopBasedIdentityProvider user name will
be obtained by UserGroupInformation.getUserName;
org.apache.celeborn.common.identity.DefaultIdentityProvider user name and
tenant id are default values or user-specific values. | 0 [...]
-| celeborn.quota.identity.user-specific.tenant | default | false | Tenant id
if celeborn.quota.identity.provider is
org.apache.celeborn.common.identity.DefaultIdentityProvider. | 0.3.0 | |
-| celeborn.quota.identity.user-specific.userName | default | false | User name
if celeborn.quota.identity.provider is
org.apache.celeborn.common.identity.DefaultIdentityProvider. | 0.3.0 | |
| celeborn.quota.interruptShuffle.enabled | false | false | Whether to enable
interrupt shuffle when quota exceeds. | 0.6.0 | |
| celeborn.storage.availableTypes | HDD | false | Enabled storages. Available
options: MEMORY,HDD,SSD,HDFS,S3. Note: HDD and SSD would be treated as
identical. | 0.3.0 | celeborn.storage.activeTypes |
| celeborn.storage.hdfs.dir | <undefined> | false | HDFS base directory
for Celeborn to store shuffle data. | 0.2.0 | |
diff --git a/docs/configuration/quota.md b/docs/configuration/quota.md
index 9e362ae7e..b1fa4f940 100644
--- a/docs/configuration/quota.md
+++ b/docs/configuration/quota.md
@@ -20,9 +20,6 @@ license: |
| Key | Default | isDynamic | Description | Since | Deprecated |
| --- | ------- | --------- | ----------- | ----- | ---------- |
| celeborn.quota.enabled | true | false | When Master side sets to true, the
master will enable to check the quota via QuotaManager. When Client side sets
to true, LifecycleManager will request Master side to check whether the current
user has enough quota before registration of shuffle. Fallback to the default
shuffle service when Master side checks that there is no enough quota for
current user. | 0.2.0 | |
-| celeborn.quota.identity.provider |
org.apache.celeborn.common.identity.DefaultIdentityProvider | false |
IdentityProvider class name. Default class is
`org.apache.celeborn.common.identity.DefaultIdentityProvider`. Optional values:
org.apache.celeborn.common.identity.HadoopBasedIdentityProvider user name will
be obtained by UserGroupInformation.getUserName;
org.apache.celeborn.common.identity.DefaultIdentityProvider user name and
tenant id are default values or user-specific values. | 0 [...]
-| celeborn.quota.identity.user-specific.tenant | default | false | Tenant id
if celeborn.quota.identity.provider is
org.apache.celeborn.common.identity.DefaultIdentityProvider. | 0.3.0 | |
-| celeborn.quota.identity.user-specific.userName | default | false | User name
if celeborn.quota.identity.provider is
org.apache.celeborn.common.identity.DefaultIdentityProvider. | 0.3.0 | |
| celeborn.quota.interruptShuffle.enabled | false | false | Whether to enable
interrupt shuffle when quota exceeds. | 0.6.0 | |
| celeborn.quota.tenant.diskBytesWritten | 9223372036854775807 | true | Quota
dynamic configuration for written disk bytes. | 0.5.0 | |
| celeborn.quota.tenant.diskFileCount | 9223372036854775807 | true | Quota
dynamic configuration for written disk file count. | 0.5.0 | |