This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 87b64391e [CELEBORN-1152] fix GetShuffleId RPC NPE for empty shuffle
87b64391e is described below

commit 87b64391ea0bbf33114f5b56a2a71b0e8de4fbdd
Author: Erik.fang <[email protected]>
AuthorDate: Mon Dec 11 20:13:26 2023 +0800

    [CELEBORN-1152] fix GetShuffleId RPC NPE for empty shuffle
    
    ### What changes were proposed in this pull request?
    
    In [celeborn-955](https://github.com/apache/incubator-celeborn/pull/1924),
    the GetShuffleId RPC was introduced to generate a Celeborn shuffle id from an
    app shuffle id in order to support Spark stage rerun.
    The GetShuffleId RPC assumes that the Shuffle Write operation always happens
    before the Shuffle Read operation, but this is not true for empty shuffle
    data in Celeborn, which causes the GetShuffleId RPC to throw an NPE and fail
    the job.
    This PR fixes this bug
    
    ### Why are the changes needed?
    To avoid Spark job failures when a shuffle contains no data.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    A new test case covering empty shuffle data is included.
    
    Closes #2136 from ErikFang/fix-GetShuffleId-RPC-NPE-for-empty-shuffle.
    
    Lead-authored-by: Erik.fang <[email protected]>
    Co-authored-by: zky.zhoukeyong <[email protected]>
    Signed-off-by: zky.zhoukeyong <[email protected]>
---
 .../apache/celeborn/client/ShuffleClientImpl.java  |  4 +++
 .../apache/celeborn/client/LifecycleManager.scala  | 36 ++++++++++++++++------
 .../org/apache/celeborn/common/util/Utils.scala    |  2 ++
 .../tests/spark/CelebornFetchFailureSuite.scala    | 21 +++++++++++++
 4 files changed, 53 insertions(+), 10 deletions(-)

diff --git 
a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java 
b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
index fe7a403ef..c5f463b19 100644
--- a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
+++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
@@ -1629,6 +1629,10 @@ public class ShuffleClientImpl extends ShuffleClient {
       int endMapIndex,
       MetricsCallback metricsCallback)
       throws IOException {
+    if (partitionId == Utils$.MODULE$.UNKNOWN_APP_SHUFFLE_ID()) {
+      logger.warn("Shuffle data is empty for shuffle {}: 
UNKNOWN_APP_SHUFFLE_ID.", shuffleId);
+      return CelebornInputStream.empty();
+    }
     ReduceFileGroups fileGroups = loadFileGroup(shuffleId, partitionId);
 
     if (fileGroups.partitionGroups.isEmpty()
diff --git 
a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala 
b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
index ed308840a..fa6face39 100644
--- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
@@ -48,6 +48,7 @@ import 
org.apache.celeborn.common.rpc.netty.{LocalNettyRpcCallContext, RemoteNet
 import org.apache.celeborn.common.util.{JavaUtils, PbSerDeUtils, ThreadUtils, 
Utils}
 // Can Remove this if celeborn don't support scala211 in future
 import org.apache.celeborn.common.util.FunctionConverter._
+import org.apache.celeborn.common.util.Utils.UNKNOWN_APP_SHUFFLE_ID
 
 object LifecycleManager {
   // shuffle id -> partition id -> partition locations
@@ -653,16 +654,31 @@ class LifecycleManager(val appUniqueId: String, val conf: 
CelebornConf) extends
       appShuffleId: Int,
       appShuffleIdentifier: String,
       isWriter: Boolean): Unit = {
-    val shuffleIds = shuffleIdMapping.computeIfAbsent(
-      appShuffleId,
-      new function.Function[Int, 
scala.collection.mutable.LinkedHashMap[String, (Int, Boolean)]]() {
-        override def apply(id: Int)
-            : scala.collection.mutable.LinkedHashMap[String, (Int, Boolean)] = 
{
-          val newShuffleId = shuffleIdGenerator.getAndIncrement()
-          logInfo(s"generate new shuffleId $newShuffleId for appShuffleId 
$appShuffleId appShuffleIdentifier $appShuffleIdentifier")
-          scala.collection.mutable.LinkedHashMap(appShuffleIdentifier -> 
(newShuffleId, true))
-        }
-      })
+    val shuffleIds =
+      if (isWriter) {
+        shuffleIdMapping.computeIfAbsent(
+          appShuffleId,
+          new function.Function[
+            Int,
+            scala.collection.mutable.LinkedHashMap[String, (Int, Boolean)]]() {
+            override def apply(id: Int)
+                : scala.collection.mutable.LinkedHashMap[String, (Int, 
Boolean)] = {
+              val newShuffleId = shuffleIdGenerator.getAndIncrement()
+              logInfo(s"generate new shuffleId $newShuffleId for appShuffleId 
$appShuffleId appShuffleIdentifier $appShuffleIdentifier")
+              scala.collection.mutable.LinkedHashMap(appShuffleIdentifier -> 
(newShuffleId, true))
+            }
+          })
+      } else {
+        shuffleIdMapping.get(appShuffleId)
+      }
+
+    if (shuffleIds == null) {
+      logWarning(s"unknown appShuffleId $appShuffleId, maybe no shuffle data 
for this shuffle")
+      val pbGetShuffleIdResponse =
+        
PbGetShuffleIdResponse.newBuilder().setShuffleId(UNKNOWN_APP_SHUFFLE_ID).build()
+      context.reply(pbGetShuffleIdResponse)
+      return
+    }
 
     def isAllMaptaskEnd(shuffleId: Int): Boolean = {
       !commitManager.getMapperAttempts(shuffleId).exists(_ < 0)
diff --git a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala 
b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
index 47c26ae09..10aff0d20 100644
--- a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
@@ -1020,6 +1020,8 @@ object Utils extends Logging {
   val SUFFIX_HDFS_WRITE_SUCCESS = ".success"
   val COMPATIBLE_HDFS_REGEX = "^[a-zA-Z0-9]+://.*"
 
+  val UNKNOWN_APP_SHUFFLE_ID = -1
+
   def isHdfsPath(path: String): Boolean = {
     path.matches(COMPATIBLE_HDFS_REGEX)
   }
diff --git 
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornFetchFailureSuite.scala
 
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornFetchFailureSuite.scala
index 07c6b35eb..ebb916d1a 100644
--- 
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornFetchFailureSuite.scala
+++ 
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornFetchFailureSuite.scala
@@ -220,4 +220,25 @@ class CelebornFetchFailureSuite extends AnyFunSuite
 
     sparkSession.stop()
   }
+
+  test("celeborn spark integration test - empty shuffle data") {
+    val sparkConf = new 
SparkConf().setAppName("rss-demo").setMaster("local[2,3]")
+    val sparkSession = SparkSession.builder()
+      .config(updateSparkConf(sparkConf, ShuffleMode.HASH))
+      .config("spark.sql.shuffle.partitions", 2)
+      .config("spark.celeborn.shuffle.forceFallback.partition.enabled", false)
+      .config("spark.celeborn.shuffle.enabled", "true")
+      .config("spark.celeborn.client.spark.fetch.throwsFetchFailure", "true")
+      .getOrCreate()
+
+    sparkSession.sql("create table if not exists t_1 (a bigint) using parquet")
+    sparkSession.sql("create table if not exists t_2 (a bigint) using parquet")
+    sparkSession.sql("create table if not exists t_3 (a bigint) using parquet")
+    val df1 = sparkSession.table("t_1")
+    val df2 = sparkSession.table("t_2")
+    val df3 = sparkSession.table("t_3")
+    df1.union(df2).union(df3).count()
+
+    sparkSession.stop()
+  }
 }

Reply via email to