cfmcgrady commented on PR #1807:
URL:
https://github.com/apache/incubator-celeborn/pull/1807#issuecomment-1677729733
Here is a test case that reproduces this bug:
```diff
diff --git a/client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedPusher.java b/client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedPusher.java
index 305f83bc..0353fdb8 100644
--- a/client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedPusher.java
+++ b/client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedPusher.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.util.LinkedList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Consumer;
@@ -272,6 +273,7 @@ public class SortBasedPusher extends MemoryConsumer {
executorService.submit(
() -> {
try {
+ SortBasedPusher.setPushFlag();
pushData();
asyncPushing = false;
} catch (IOException ie) {
@@ -456,4 +458,10 @@ public class SortBasedPusher extends MemoryConsumer {
public long getUsed() {
return super.getUsed();
}
+
+ public final static AtomicBoolean pushFlag = new AtomicBoolean(false);
+
+ public static void setPushFlag() {
+ pushFlag.set(true);
+ }
}
diff --git a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornPipelineSortSuite.scala b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornPipelineSortSuite.scala
index 5288ac7a..3d21728c 100644
--- a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornPipelineSortSuite.scala
+++ b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornPipelineSortSuite.scala
@@ -17,14 +17,14 @@
package org.apache.celeborn.tests.spark
-import org.apache.spark.SparkConf
+import org.apache.spark.{SparkConf, TaskContext}
import org.apache.spark.sql.SparkSession
import org.scalatest.BeforeAndAfterEach
import org.scalatest.funsuite.AnyFunSuite
-
import org.apache.celeborn.client.ShuffleClient
import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.protocol.ShuffleMode
+import org.apache.spark.shuffle.celeborn.SortBasedPusher
class CelebornPipelineSortSuite extends AnyFunSuite
with SparkTestBase
@@ -57,4 +57,32 @@ class CelebornPipelineSortSuite extends AnyFunSuite
ss.stop()
}
+
+
+ test("celeborn spark integration test - pipeline bug") {
+ val sparkConf = new
SparkConf().setAppName("celeborn-demo").setMaster("local[8]")
+ .set(s"spark.${CelebornConf.CLIENT_PUSH_SORT_PIPELINE_ENABLED.key}",
"true")
+
.set(s"spark.${CelebornConf.CLIENT_PUSH_SORT_RANDOMIZE_PARTITION_ENABLED.key}",
"true")
+ .set(s"spark.${CelebornConf.CLIENT_PUSH_SORT_MEMORY_THRESHOLD.key}",
"128m")
+
+ val ss = SparkSession.builder()
+ .config(updateSparkConf(sparkConf, ShuffleMode.SORT))
+ .getOrCreate()
+ val t = new Thread() {
+ override def run(): Unit = {
+ while(!SortBasedPusher.pushFlag.get()) {}
+ println("try cancel job.")
+ Thread.sleep(1000)
+ ss.sparkContext.cancelJobGroup("test pipeline")
+ }
+ }
+ t.start()
+ ss.sparkContext.setJobGroup("test pipeline", "test pipeline", true)
+ try {
+ ss.sparkContext.parallelize(1 to 24000, 8).flatMap(_ => (1 to
4800).iterator.map(n => n)).repartition(128).count()
+ } catch {
+ case e: Exception =>
+ e.printStackTrace()
+ }
+ }
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]