This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 1ae8eb714 [CELEBORN-655][SPARK] Rename newAppId to appUniqueId
1ae8eb714 is described below

commit 1ae8eb7145e11343f7c3d2e095d2b51b220b8563
Author: Cheng Pan <[email protected]>
AuthorDate: Thu Jun 8 22:14:20 2023 +0800

    [CELEBORN-655][SPARK] Rename newAppId to appUniqueId
    
    ### What changes were proposed in this pull request?
    
    Rename variable `newAppId` to `appUniqueId` in Spark client.
    
    ### Why are the changes needed?
    
    Make the variable name more intuitive.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Pass GA.
    
    Closes #1565 from pan3793/CELEBORN-655.
    
    Authored-by: Cheng Pan <[email protected]>
    Signed-off-by: zky.zhoukeyong <[email protected]>
---
 .../shuffle/celeborn/HashBasedShuffleWriter.java   |  2 +-
 .../spark/shuffle/celeborn/RssShuffleManager.java  | 23 ++++++++++++++--------
 .../apache/spark/shuffle/celeborn/SparkUtils.java  |  2 +-
 .../spark/shuffle/celeborn/RssShuffleHandle.scala  |  2 +-
 .../spark/shuffle/celeborn/RssShuffleReader.scala  |  2 +-
 .../shuffle/celeborn/HashBasedShuffleWriter.java   |  2 +-
 .../spark/shuffle/celeborn/RssShuffleManager.java  | 17 ++++++++--------
 .../apache/spark/shuffle/celeborn/SparkUtils.java  |  2 +-
 .../spark/shuffle/celeborn/RssShuffleHandle.scala  |  2 +-
 .../spark/shuffle/celeborn/RssShuffleReader.scala  |  2 +-
 10 files changed, 32 insertions(+), 24 deletions(-)

diff --git 
a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java
 
b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java
index 9da0438fa..dcb46106d 100644
--- 
a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java
+++ 
b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java
@@ -109,7 +109,7 @@ public class HashBasedShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
       throws IOException {
     this.mapId = mapId;
     this.dep = handle.dependency();
-    this.appId = handle.newAppId();
+    this.appId = handle.appUniqueId();
     this.shuffleId = dep.shuffleId();
     SerializerInstance serializer = dep.serializer().newInstance();
     this.partitioner = dep.partitioner();
diff --git 
a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/RssShuffleManager.java
 
b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/RssShuffleManager.java
index cf975d983..9049cc3ce 100644
--- 
a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/RssShuffleManager.java
+++ 
b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/RssShuffleManager.java
@@ -47,7 +47,8 @@ public class RssShuffleManager implements ShuffleManager {
   private final SparkConf conf;
   private final CelebornConf celebornConf;
   private final int cores;
-  private String newAppId;
+  // either be "{appId}_{appAttemptId}" or "{appId}"
+  private String appUniqueId;
 
   private LifecycleManager lifecycleManager;
   private ShuffleClient rssShuffleClient;
@@ -114,11 +115,11 @@ public class RssShuffleManager implements ShuffleManager {
   @Override
   public <K, V, C> ShuffleHandle registerShuffle(
       int shuffleId, int numMaps, ShuffleDependency<K, V, C> dependency) {
-    // Note: generate newAppId at driver side, make sure dependency.rdd.context
+    // Note: generate app unique id at driver side, make sure 
dependency.rdd.context
     // is the same SparkContext among different shuffleIds.
     // This method may be called many times.
-    newAppId = SparkUtils.genNewAppId(dependency.rdd().context());
-    initializeLifecycleManager(newAppId);
+    appUniqueId = SparkUtils.appUniqueId(dependency.rdd().context());
+    initializeLifecycleManager(appUniqueId);
 
     if (fallbackPolicyRunner.applyAllFallbackPolicy(
         lifecycleManager, dependency.partitioner().numPartitions())) {
@@ -127,7 +128,7 @@ public class RssShuffleManager implements ShuffleManager {
       return sortShuffleManager().registerShuffle(shuffleId, numMaps, 
dependency);
     } else {
       return new RssShuffleHandle<>(
-          newAppId,
+          appUniqueId,
           lifecycleManager.getRssMetaServiceHost(),
           lifecycleManager.getRssMetaServicePort(),
           lifecycleManager.getUserIdentifier(),
@@ -142,13 +143,13 @@ public class RssShuffleManager implements ShuffleManager {
     if (sortShuffleIds.contains(shuffleId)) {
       return sortShuffleManager().unregisterShuffle(shuffleId);
     }
-    if (newAppId == null) {
+    if (appUniqueId == null) {
       return true;
     }
     if (rssShuffleClient == null) {
       return false;
     }
-    return rssShuffleClient.unregisterShuffle(newAppId, shuffleId, isDriver());
+    return rssShuffleClient.unregisterShuffle(appUniqueId, shuffleId, 
isDriver());
   }
 
   @Override
@@ -183,7 +184,13 @@ public class RssShuffleManager implements ShuffleManager {
           ExecutorService pushThread =
               celebornConf.clientPushSortPipelineEnabled() ? getPusherThread() 
: null;
           return new SortBasedShuffleWriter<>(
-              h.dependency(), h.newAppId(), h.numMaps(), context, 
celebornConf, client, pushThread);
+              h.dependency(),
+              h.appUniqueId(),
+              h.numMaps(),
+              context,
+              celebornConf,
+              client,
+              pushThread);
         } else if (ShuffleMode.HASH.equals(celebornConf.shuffleWriterMode())) {
           return new HashBasedShuffleWriter<>(
               h, mapId, context, celebornConf, client, 
SendBufferPool.get(cores));
diff --git 
a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java
 
b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java
index 90e3e140e..b14b9f02f 100644
--- 
a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java
+++ 
b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java
@@ -107,7 +107,7 @@ public class SparkUtils {
     return tmpCelebornConf;
   }
 
-  public static String genNewAppId(SparkContext context) {
+  public static String appUniqueId(SparkContext context) {
     if (context.applicationAttemptId().isDefined()) {
       return context.applicationId() + "_" + 
context.applicationAttemptId().get();
     } else {
diff --git 
a/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/RssShuffleHandle.scala
 
b/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/RssShuffleHandle.scala
index cd95054fc..4d3974e9e 100644
--- 
a/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/RssShuffleHandle.scala
+++ 
b/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/RssShuffleHandle.scala
@@ -23,7 +23,7 @@ import org.apache.spark.shuffle.BaseShuffleHandle
 import org.apache.celeborn.common.identity.UserIdentifier
 
 class RssShuffleHandle[K, V, C](
-    val newAppId: String,
+    val appUniqueId: String,
     val rssMetaServiceHost: String,
     val rssMetaServicePort: Int,
     val userIdentifier: UserIdentifier,
diff --git 
a/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/RssShuffleReader.scala
 
b/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/RssShuffleReader.scala
index 04ef2810a..3d3e0e700 100644
--- 
a/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/RssShuffleReader.scala
+++ 
b/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/RssShuffleReader.scala
@@ -63,7 +63,7 @@ class RssShuffleReader[K, C](
       if (handle.numMaps > 0) {
         val start = System.currentTimeMillis()
         val inputStream = essShuffleClient.readPartition(
-          handle.newAppId,
+          handle.appUniqueId,
           handle.shuffleId,
           partitionId,
           context.attemptNumber(),
diff --git 
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java
 
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java
index f2e2f6cda..7b32c47b5 100644
--- 
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java
+++ 
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java
@@ -117,7 +117,7 @@ public class HashBasedShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
       throws IOException {
     this.mapId = taskContext.partitionId();
     this.dep = handle.dependency();
-    this.appId = handle.newAppId();
+    this.appId = handle.appUniqueId();
     this.shuffleId = dep.shuffleId();
     SerializerInstance serializer = dep.serializer().newInstance();
     this.partitioner = dep.partitioner();
diff --git 
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/RssShuffleManager.java
 
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/RssShuffleManager.java
index 2496afb3d..9fbba3b4a 100644
--- 
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/RssShuffleManager.java
+++ 
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/RssShuffleManager.java
@@ -45,7 +45,8 @@ public class RssShuffleManager implements ShuffleManager {
   private final SparkConf conf;
   private final CelebornConf celebornConf;
   private final int cores;
-  private String newAppId;
+  // either be "{appId}_{appAttemptId}" or "{appId}"
+  private String appUniqueId;
 
   private LifecycleManager lifecycleManager;
   private ShuffleClient rssShuffleClient;
@@ -112,11 +113,11 @@ public class RssShuffleManager implements ShuffleManager {
   @Override
   public <K, V, C> ShuffleHandle registerShuffle(
       int shuffleId, ShuffleDependency<K, V, C> dependency) {
-    // Note: generate newAppId at driver side, make sure dependency.rdd.context
+    // Note: generate app unique id at driver side, make sure 
dependency.rdd.context
     // is the same SparkContext among different shuffleIds.
     // This method may be called many times.
-    newAppId = SparkUtils.genNewAppId(dependency.rdd().context());
-    initializeLifecycleManager(newAppId);
+    appUniqueId = SparkUtils.appUniqueId(dependency.rdd().context());
+    initializeLifecycleManager(appUniqueId);
 
     if (fallbackPolicyRunner.applyAllFallbackPolicy(
         lifecycleManager, dependency.partitioner().numPartitions())) {
@@ -125,7 +126,7 @@ public class RssShuffleManager implements ShuffleManager {
       return sortShuffleManager().registerShuffle(shuffleId, dependency);
     } else {
       return new RssShuffleHandle<>(
-          newAppId,
+          appUniqueId,
           lifecycleManager.getRssMetaServiceHost(),
           lifecycleManager.getRssMetaServicePort(),
           lifecycleManager.getUserIdentifier(),
@@ -140,13 +141,13 @@ public class RssShuffleManager implements ShuffleManager {
     if (sortShuffleIds.contains(shuffleId)) {
       return sortShuffleManager().unregisterShuffle(shuffleId);
     }
-    if (newAppId == null) {
+    if (appUniqueId == null) {
       return true;
     }
     if (rssShuffleClient == null) {
       return false;
     }
-    return rssShuffleClient.unregisterShuffle(newAppId, shuffleId, isDriver());
+    return rssShuffleClient.unregisterShuffle(appUniqueId, shuffleId, 
isDriver());
   }
 
   @Override
@@ -186,7 +187,7 @@ public class RssShuffleManager implements ShuffleManager {
               celebornConf.clientPushSortPipelineEnabled() ? getPusherThread() 
: null;
           return new SortBasedShuffleWriter<>(
               h.dependency(),
-              h.newAppId(),
+              h.appUniqueId(),
               h.numMappers(),
               context,
               celebornConf,
diff --git 
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java
 
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java
index ae5d86463..1a680299e 100644
--- 
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java
+++ 
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java
@@ -86,7 +86,7 @@ public class SparkUtils {
     return tmpCelebornConf;
   }
 
-  public static String genNewAppId(SparkContext context) {
+  public static String appUniqueId(SparkContext context) {
     return context
         .applicationAttemptId()
         .map(id -> context.applicationId() + "_" + id)
diff --git 
a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/RssShuffleHandle.scala
 
b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/RssShuffleHandle.scala
index 8ff6752e8..9ca9e2cc5 100644
--- 
a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/RssShuffleHandle.scala
+++ 
b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/RssShuffleHandle.scala
@@ -23,7 +23,7 @@ import org.apache.spark.shuffle.BaseShuffleHandle
 import org.apache.celeborn.common.identity.UserIdentifier
 
 class RssShuffleHandle[K, V, C](
-    val newAppId: String,
+    val appUniqueId: String,
     val rssMetaServiceHost: String,
     val rssMetaServicePort: Int,
     val userIdentifier: UserIdentifier,
diff --git 
a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/RssShuffleReader.scala
 
b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/RssShuffleReader.scala
index afc6df4e7..f647dd29e 100644
--- 
a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/RssShuffleReader.scala
+++ 
b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/RssShuffleReader.scala
@@ -80,7 +80,7 @@ class RssShuffleReader[K, C](
       if (handle.numMappers > 0) {
         val start = System.currentTimeMillis()
         val inputStream = rssShuffleClient.readPartition(
-          handle.newAppId,
+          handle.appUniqueId,
           handle.shuffleId,
           partitionId,
           context.attemptNumber(),

Reply via email to