This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 1ae8eb714 [CELEBORN-655][SPARK] Rename newAppId to appUniqueId
1ae8eb714 is described below
commit 1ae8eb7145e11343f7c3d2e095d2b51b220b8563
Author: Cheng Pan <[email protected]>
AuthorDate: Thu Jun 8 22:14:20 2023 +0800
[CELEBORN-655][SPARK] Rename newAppId to appUniqueId
### What changes were proposed in this pull request?
Rename variable `newAppId` to `appUniqueId` in Spark client.
### Why are the changes needed?
Make the variable name more intuitive.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Pass GA.
Closes #1565 from pan3793/CELEBORN-655.
Authored-by: Cheng Pan <[email protected]>
Signed-off-by: zky.zhoukeyong <[email protected]>
---
.../shuffle/celeborn/HashBasedShuffleWriter.java | 2 +-
.../spark/shuffle/celeborn/RssShuffleManager.java | 23 ++++++++++++++--------
.../apache/spark/shuffle/celeborn/SparkUtils.java | 2 +-
.../spark/shuffle/celeborn/RssShuffleHandle.scala | 2 +-
.../spark/shuffle/celeborn/RssShuffleReader.scala | 2 +-
.../shuffle/celeborn/HashBasedShuffleWriter.java | 2 +-
.../spark/shuffle/celeborn/RssShuffleManager.java | 17 ++++++++--------
.../apache/spark/shuffle/celeborn/SparkUtils.java | 2 +-
.../spark/shuffle/celeborn/RssShuffleHandle.scala | 2 +-
.../spark/shuffle/celeborn/RssShuffleReader.scala | 2 +-
10 files changed, 32 insertions(+), 24 deletions(-)
diff --git
a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java
b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java
index 9da0438fa..dcb46106d 100644
---
a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java
+++
b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java
@@ -109,7 +109,7 @@ public class HashBasedShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
throws IOException {
this.mapId = mapId;
this.dep = handle.dependency();
- this.appId = handle.newAppId();
+ this.appId = handle.appUniqueId();
this.shuffleId = dep.shuffleId();
SerializerInstance serializer = dep.serializer().newInstance();
this.partitioner = dep.partitioner();
diff --git
a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/RssShuffleManager.java
b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/RssShuffleManager.java
index cf975d983..9049cc3ce 100644
---
a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/RssShuffleManager.java
+++
b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/RssShuffleManager.java
@@ -47,7 +47,8 @@ public class RssShuffleManager implements ShuffleManager {
private final SparkConf conf;
private final CelebornConf celebornConf;
private final int cores;
- private String newAppId;
+ // either be "{appId}_{appAttemptId}" or "{appId}"
+ private String appUniqueId;
private LifecycleManager lifecycleManager;
private ShuffleClient rssShuffleClient;
@@ -114,11 +115,11 @@ public class RssShuffleManager implements ShuffleManager {
@Override
public <K, V, C> ShuffleHandle registerShuffle(
int shuffleId, int numMaps, ShuffleDependency<K, V, C> dependency) {
- // Note: generate newAppId at driver side, make sure dependency.rdd.context
+ // Note: generate app unique id at driver side, make sure
dependency.rdd.context
// is the same SparkContext among different shuffleIds.
// This method may be called many times.
- newAppId = SparkUtils.genNewAppId(dependency.rdd().context());
- initializeLifecycleManager(newAppId);
+ appUniqueId = SparkUtils.appUniqueId(dependency.rdd().context());
+ initializeLifecycleManager(appUniqueId);
if (fallbackPolicyRunner.applyAllFallbackPolicy(
lifecycleManager, dependency.partitioner().numPartitions())) {
@@ -127,7 +128,7 @@ public class RssShuffleManager implements ShuffleManager {
return sortShuffleManager().registerShuffle(shuffleId, numMaps,
dependency);
} else {
return new RssShuffleHandle<>(
- newAppId,
+ appUniqueId,
lifecycleManager.getRssMetaServiceHost(),
lifecycleManager.getRssMetaServicePort(),
lifecycleManager.getUserIdentifier(),
@@ -142,13 +143,13 @@ public class RssShuffleManager implements ShuffleManager {
if (sortShuffleIds.contains(shuffleId)) {
return sortShuffleManager().unregisterShuffle(shuffleId);
}
- if (newAppId == null) {
+ if (appUniqueId == null) {
return true;
}
if (rssShuffleClient == null) {
return false;
}
- return rssShuffleClient.unregisterShuffle(newAppId, shuffleId, isDriver());
+ return rssShuffleClient.unregisterShuffle(appUniqueId, shuffleId,
isDriver());
}
@Override
@@ -183,7 +184,13 @@ public class RssShuffleManager implements ShuffleManager {
ExecutorService pushThread =
celebornConf.clientPushSortPipelineEnabled() ? getPusherThread()
: null;
return new SortBasedShuffleWriter<>(
- h.dependency(), h.newAppId(), h.numMaps(), context,
celebornConf, client, pushThread);
+ h.dependency(),
+ h.appUniqueId(),
+ h.numMaps(),
+ context,
+ celebornConf,
+ client,
+ pushThread);
} else if (ShuffleMode.HASH.equals(celebornConf.shuffleWriterMode())) {
return new HashBasedShuffleWriter<>(
h, mapId, context, celebornConf, client,
SendBufferPool.get(cores));
diff --git
a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java
b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java
index 90e3e140e..b14b9f02f 100644
---
a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java
+++
b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java
@@ -107,7 +107,7 @@ public class SparkUtils {
return tmpCelebornConf;
}
- public static String genNewAppId(SparkContext context) {
+ public static String appUniqueId(SparkContext context) {
if (context.applicationAttemptId().isDefined()) {
return context.applicationId() + "_" +
context.applicationAttemptId().get();
} else {
diff --git
a/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/RssShuffleHandle.scala
b/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/RssShuffleHandle.scala
index cd95054fc..4d3974e9e 100644
---
a/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/RssShuffleHandle.scala
+++
b/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/RssShuffleHandle.scala
@@ -23,7 +23,7 @@ import org.apache.spark.shuffle.BaseShuffleHandle
import org.apache.celeborn.common.identity.UserIdentifier
class RssShuffleHandle[K, V, C](
- val newAppId: String,
+ val appUniqueId: String,
val rssMetaServiceHost: String,
val rssMetaServicePort: Int,
val userIdentifier: UserIdentifier,
diff --git
a/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/RssShuffleReader.scala
b/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/RssShuffleReader.scala
index 04ef2810a..3d3e0e700 100644
---
a/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/RssShuffleReader.scala
+++
b/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/RssShuffleReader.scala
@@ -63,7 +63,7 @@ class RssShuffleReader[K, C](
if (handle.numMaps > 0) {
val start = System.currentTimeMillis()
val inputStream = essShuffleClient.readPartition(
- handle.newAppId,
+ handle.appUniqueId,
handle.shuffleId,
partitionId,
context.attemptNumber(),
diff --git
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java
index f2e2f6cda..7b32c47b5 100644
---
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java
+++
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java
@@ -117,7 +117,7 @@ public class HashBasedShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
throws IOException {
this.mapId = taskContext.partitionId();
this.dep = handle.dependency();
- this.appId = handle.newAppId();
+ this.appId = handle.appUniqueId();
this.shuffleId = dep.shuffleId();
SerializerInstance serializer = dep.serializer().newInstance();
this.partitioner = dep.partitioner();
diff --git
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/RssShuffleManager.java
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/RssShuffleManager.java
index 2496afb3d..9fbba3b4a 100644
---
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/RssShuffleManager.java
+++
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/RssShuffleManager.java
@@ -45,7 +45,8 @@ public class RssShuffleManager implements ShuffleManager {
private final SparkConf conf;
private final CelebornConf celebornConf;
private final int cores;
- private String newAppId;
+ // either be "{appId}_{appAttemptId}" or "{appId}"
+ private String appUniqueId;
private LifecycleManager lifecycleManager;
private ShuffleClient rssShuffleClient;
@@ -112,11 +113,11 @@ public class RssShuffleManager implements ShuffleManager {
@Override
public <K, V, C> ShuffleHandle registerShuffle(
int shuffleId, ShuffleDependency<K, V, C> dependency) {
- // Note: generate newAppId at driver side, make sure dependency.rdd.context
+ // Note: generate app unique id at driver side, make sure
dependency.rdd.context
// is the same SparkContext among different shuffleIds.
// This method may be called many times.
- newAppId = SparkUtils.genNewAppId(dependency.rdd().context());
- initializeLifecycleManager(newAppId);
+ appUniqueId = SparkUtils.appUniqueId(dependency.rdd().context());
+ initializeLifecycleManager(appUniqueId);
if (fallbackPolicyRunner.applyAllFallbackPolicy(
lifecycleManager, dependency.partitioner().numPartitions())) {
@@ -125,7 +126,7 @@ public class RssShuffleManager implements ShuffleManager {
return sortShuffleManager().registerShuffle(shuffleId, dependency);
} else {
return new RssShuffleHandle<>(
- newAppId,
+ appUniqueId,
lifecycleManager.getRssMetaServiceHost(),
lifecycleManager.getRssMetaServicePort(),
lifecycleManager.getUserIdentifier(),
@@ -140,13 +141,13 @@ public class RssShuffleManager implements ShuffleManager {
if (sortShuffleIds.contains(shuffleId)) {
return sortShuffleManager().unregisterShuffle(shuffleId);
}
- if (newAppId == null) {
+ if (appUniqueId == null) {
return true;
}
if (rssShuffleClient == null) {
return false;
}
- return rssShuffleClient.unregisterShuffle(newAppId, shuffleId, isDriver());
+ return rssShuffleClient.unregisterShuffle(appUniqueId, shuffleId,
isDriver());
}
@Override
@@ -186,7 +187,7 @@ public class RssShuffleManager implements ShuffleManager {
celebornConf.clientPushSortPipelineEnabled() ? getPusherThread()
: null;
return new SortBasedShuffleWriter<>(
h.dependency(),
- h.newAppId(),
+ h.appUniqueId(),
h.numMappers(),
context,
celebornConf,
diff --git
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java
index ae5d86463..1a680299e 100644
---
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java
+++
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java
@@ -86,7 +86,7 @@ public class SparkUtils {
return tmpCelebornConf;
}
- public static String genNewAppId(SparkContext context) {
+ public static String appUniqueId(SparkContext context) {
return context
.applicationAttemptId()
.map(id -> context.applicationId() + "_" + id)
diff --git
a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/RssShuffleHandle.scala
b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/RssShuffleHandle.scala
index 8ff6752e8..9ca9e2cc5 100644
---
a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/RssShuffleHandle.scala
+++
b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/RssShuffleHandle.scala
@@ -23,7 +23,7 @@ import org.apache.spark.shuffle.BaseShuffleHandle
import org.apache.celeborn.common.identity.UserIdentifier
class RssShuffleHandle[K, V, C](
- val newAppId: String,
+ val appUniqueId: String,
val rssMetaServiceHost: String,
val rssMetaServicePort: Int,
val userIdentifier: UserIdentifier,
diff --git
a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/RssShuffleReader.scala
b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/RssShuffleReader.scala
index afc6df4e7..f647dd29e 100644
---
a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/RssShuffleReader.scala
+++
b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/RssShuffleReader.scala
@@ -80,7 +80,7 @@ class RssShuffleReader[K, C](
if (handle.numMappers > 0) {
val start = System.currentTimeMillis()
val inputStream = rssShuffleClient.readPartition(
- handle.newAppId,
+ handle.appUniqueId,
handle.shuffleId,
partitionId,
context.attemptNumber(),