This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 87b64391e [CELEBORN-1152] fix GetShuffleId RPC NPE for empty shuffle
87b64391e is described below
commit 87b64391ea0bbf33114f5b56a2a71b0e8de4fbdd
Author: Erik.fang <[email protected]>
AuthorDate: Mon Dec 11 20:13:26 2023 +0800
[CELEBORN-1152] fix GetShuffleId RPC NPE for empty shuffle
### What changes were proposed in this pull request?
In [CELEBORN-955](https://github.com/apache/incubator-celeborn/pull/1924),
the GetShuffleId RPC was introduced to generate a Celeborn shuffle id from an
app shuffle id in order to support Spark stage rerun.
The GetShuffleId RPC assumes that the Shuffle Write operation always happens
before the Shuffle Read operation, but this is not true for empty shuffle data
in Celeborn, which causes the GetShuffleId RPC to throw an NPE and fail the job.
This PR fixes that bug.
### Why are the changes needed?
To avoid Spark job failures when a shuffle produces no data.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
A new test case covering empty shuffle data is included.
Closes #2136 from ErikFang/fix-GetShuffleId-RPC-NPE-for-empty-shuffle.
Lead-authored-by: Erik.fang <[email protected]>
Co-authored-by: zky.zhoukeyong <[email protected]>
Signed-off-by: zky.zhoukeyong <[email protected]>
---
.../apache/celeborn/client/ShuffleClientImpl.java | 4 +++
.../apache/celeborn/client/LifecycleManager.scala | 36 ++++++++++++++++------
.../org/apache/celeborn/common/util/Utils.scala | 2 ++
.../tests/spark/CelebornFetchFailureSuite.scala | 21 +++++++++++++
4 files changed, 53 insertions(+), 10 deletions(-)
diff --git
a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
index fe7a403ef..c5f463b19 100644
--- a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
+++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
@@ -1629,6 +1629,10 @@ public class ShuffleClientImpl extends ShuffleClient {
int endMapIndex,
MetricsCallback metricsCallback)
throws IOException {
+ if (partitionId == Utils$.MODULE$.UNKNOWN_APP_SHUFFLE_ID()) {
+ logger.warn("Shuffle data is empty for shuffle {}:
UNKNOWN_APP_SHUFFLE_ID.", shuffleId);
+ return CelebornInputStream.empty();
+ }
ReduceFileGroups fileGroups = loadFileGroup(shuffleId, partitionId);
if (fileGroups.partitionGroups.isEmpty()
diff --git
a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
index ed308840a..fa6face39 100644
--- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
@@ -48,6 +48,7 @@ import
org.apache.celeborn.common.rpc.netty.{LocalNettyRpcCallContext, RemoteNet
import org.apache.celeborn.common.util.{JavaUtils, PbSerDeUtils, ThreadUtils,
Utils}
// Can Remove this if celeborn don't support scala211 in future
import org.apache.celeborn.common.util.FunctionConverter._
+import org.apache.celeborn.common.util.Utils.UNKNOWN_APP_SHUFFLE_ID
object LifecycleManager {
// shuffle id -> partition id -> partition locations
@@ -653,16 +654,31 @@ class LifecycleManager(val appUniqueId: String, val conf:
CelebornConf) extends
appShuffleId: Int,
appShuffleIdentifier: String,
isWriter: Boolean): Unit = {
- val shuffleIds = shuffleIdMapping.computeIfAbsent(
- appShuffleId,
- new function.Function[Int,
scala.collection.mutable.LinkedHashMap[String, (Int, Boolean)]]() {
- override def apply(id: Int)
- : scala.collection.mutable.LinkedHashMap[String, (Int, Boolean)] =
{
- val newShuffleId = shuffleIdGenerator.getAndIncrement()
- logInfo(s"generate new shuffleId $newShuffleId for appShuffleId
$appShuffleId appShuffleIdentifier $appShuffleIdentifier")
- scala.collection.mutable.LinkedHashMap(appShuffleIdentifier ->
(newShuffleId, true))
- }
- })
+ val shuffleIds =
+ if (isWriter) {
+ shuffleIdMapping.computeIfAbsent(
+ appShuffleId,
+ new function.Function[
+ Int,
+ scala.collection.mutable.LinkedHashMap[String, (Int, Boolean)]]() {
+ override def apply(id: Int)
+ : scala.collection.mutable.LinkedHashMap[String, (Int,
Boolean)] = {
+ val newShuffleId = shuffleIdGenerator.getAndIncrement()
+ logInfo(s"generate new shuffleId $newShuffleId for appShuffleId
$appShuffleId appShuffleIdentifier $appShuffleIdentifier")
+ scala.collection.mutable.LinkedHashMap(appShuffleIdentifier ->
(newShuffleId, true))
+ }
+ })
+ } else {
+ shuffleIdMapping.get(appShuffleId)
+ }
+
+ if (shuffleIds == null) {
+ logWarning(s"unknown appShuffleId $appShuffleId, maybe no shuffle data
for this shuffle")
+ val pbGetShuffleIdResponse =
+
PbGetShuffleIdResponse.newBuilder().setShuffleId(UNKNOWN_APP_SHUFFLE_ID).build()
+ context.reply(pbGetShuffleIdResponse)
+ return
+ }
def isAllMaptaskEnd(shuffleId: Int): Boolean = {
!commitManager.getMapperAttempts(shuffleId).exists(_ < 0)
diff --git a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
index 47c26ae09..10aff0d20 100644
--- a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
@@ -1020,6 +1020,8 @@ object Utils extends Logging {
val SUFFIX_HDFS_WRITE_SUCCESS = ".success"
val COMPATIBLE_HDFS_REGEX = "^[a-zA-Z0-9]+://.*"
+ val UNKNOWN_APP_SHUFFLE_ID = -1
+
def isHdfsPath(path: String): Boolean = {
path.matches(COMPATIBLE_HDFS_REGEX)
}
diff --git
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornFetchFailureSuite.scala
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornFetchFailureSuite.scala
index 07c6b35eb..ebb916d1a 100644
---
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornFetchFailureSuite.scala
+++
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornFetchFailureSuite.scala
@@ -220,4 +220,25 @@ class CelebornFetchFailureSuite extends AnyFunSuite
sparkSession.stop()
}
+
+ test("celeborn spark integration test - empty shuffle data") {
+ val sparkConf = new
SparkConf().setAppName("rss-demo").setMaster("local[2,3]")
+ val sparkSession = SparkSession.builder()
+ .config(updateSparkConf(sparkConf, ShuffleMode.HASH))
+ .config("spark.sql.shuffle.partitions", 2)
+ .config("spark.celeborn.shuffle.forceFallback.partition.enabled", false)
+ .config("spark.celeborn.shuffle.enabled", "true")
+ .config("spark.celeborn.client.spark.fetch.throwsFetchFailure", "true")
+ .getOrCreate()
+
+ sparkSession.sql("create table if not exists t_1 (a bigint) using parquet")
+ sparkSession.sql("create table if not exists t_2 (a bigint) using parquet")
+ sparkSession.sql("create table if not exists t_3 (a bigint) using parquet")
+ val df1 = sparkSession.table("t_1")
+ val df2 = sparkSession.table("t_2")
+ val df3 = sparkSession.table("t_3")
+ df1.union(df2).union(df3).count()
+
+ sparkSession.stop()
+ }
}