cfmcgrady commented on PR #1807:
URL: 
https://github.com/apache/incubator-celeborn/pull/1807#issuecomment-1677729733

   Here is a test case to reproduce this bug:
   
   ```diff
   diff --git 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedPusher.java
 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedPusher.java
   index 305f83bc..0353fdb8 100644
   --- 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedPusher.java
   +++ 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedPusher.java
   @@ -21,6 +21,7 @@ import java.io.IOException;
    import java.util.LinkedList;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.LinkedBlockingQueue;
   +import java.util.concurrent.atomic.AtomicBoolean;
    import java.util.concurrent.atomic.LongAdder;
    import java.util.function.Consumer;
    
   @@ -272,6 +273,7 @@ public class SortBasedPusher extends MemoryConsumer {
        executorService.submit(
            () -> {
              try {
   +            SortBasedPusher.setPushFlag();
                pushData();
                asyncPushing = false;
              } catch (IOException ie) {
   @@ -456,4 +458,10 @@ public class SortBasedPusher extends MemoryConsumer {
      public long getUsed() {
        return super.getUsed();
      }
   +
   +  public final static AtomicBoolean pushFlag = new AtomicBoolean(false);
   +
   +  public static void setPushFlag() {
   +    pushFlag.set(true);
   +  }
    }
   diff --git 
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornPipelineSortSuite.scala
 
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornPipelineSortSuite.scala
   index 5288ac7a..3d21728c 100644
   --- 
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornPipelineSortSuite.scala
   +++ 
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornPipelineSortSuite.scala
   @@ -17,14 +17,14 @@
    
    package org.apache.celeborn.tests.spark
    
   -import org.apache.spark.SparkConf
   +import org.apache.spark.{SparkConf, TaskContext}
    import org.apache.spark.sql.SparkSession
    import org.scalatest.BeforeAndAfterEach
    import org.scalatest.funsuite.AnyFunSuite
   -
    import org.apache.celeborn.client.ShuffleClient
    import org.apache.celeborn.common.CelebornConf
    import org.apache.celeborn.common.protocol.ShuffleMode
   +import org.apache.spark.shuffle.celeborn.SortBasedPusher
    
    class CelebornPipelineSortSuite extends AnyFunSuite
      with SparkTestBase
   @@ -57,4 +57,32 @@ class CelebornPipelineSortSuite extends AnyFunSuite
    
        ss.stop()
      }
   +
   +
   +  test("celeborn spark integration test - pipeline bug") {
   +    val sparkConf = new 
SparkConf().setAppName("celeborn-demo").setMaster("local[8]")
   +      .set(s"spark.${CelebornConf.CLIENT_PUSH_SORT_PIPELINE_ENABLED.key}", 
"true")
   +      
.set(s"spark.${CelebornConf.CLIENT_PUSH_SORT_RANDOMIZE_PARTITION_ENABLED.key}", 
"true")
   +      .set(s"spark.${CelebornConf.CLIENT_PUSH_SORT_MEMORY_THRESHOLD.key}", 
"128m")
   +
   +    val ss = SparkSession.builder()
   +      .config(updateSparkConf(sparkConf, ShuffleMode.SORT))
   +      .getOrCreate()
   +    val t = new Thread() {
   +      override def run(): Unit = {
   +        while(!SortBasedPusher.pushFlag.get()) {}
   +        println("try cancel job.")
   +        Thread.sleep(1000)
   +        ss.sparkContext.cancelJobGroup("test pipeline")
   +      }
   +    }
   +    t.start()
   +    ss.sparkContext.setJobGroup("test pipeline", "test pipeline", true)
   +    try {
   +      ss.sparkContext.parallelize(1 to 24000, 8).flatMap(_ => (1 to 
4800).iterator.map(n => n)).repartition(128).count()
   +    } catch {
   +      case e: Exception =>
   +        e.printStackTrace()
   +    }
   +  }
    }
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to