This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 77e468161 [CELEBORN-891] Remove pipeline feature for sort based writer
77e468161 is described below

commit 77e468161d5ed828530c6832610cb366c6f224da
Author: Cheng Pan <[email protected]>
AuthorDate: Mon Jan 1 10:42:17 2024 +0800

    [CELEBORN-891] Remove pipeline feature for sort based writer
    
    ### What changes were proposed in this pull request?
    
    Remove pipeline feature for sort based writer
    
    ### Why are the changes needed?
    
    The pipeline feature is added as part of CELEBORN-295, for performance. 
Eventually, an unresolvable issue that would crash the JVM was identified in 
https://github.com/apache/incubator-celeborn/pull/1807, and after discussion, 
we decided to delete this feature.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No, the pipeline feature is disabled by default, there are no changes to 
users who use the default settings.
    
    ### How was this patch tested?
    
    Pass GA.
    
    Closes #2196 from pan3793/CELEBORN-891.
    
    Authored-by: Cheng Pan <[email protected]>
    Signed-off-by: zky.zhoukeyong <[email protected]>
---
 .../spark/shuffle/celeborn/SortBasedPusher.java    | 158 +++++++--------------
 .../shuffle/celeborn/SortBasedPusherSuiteJ.java    |   2 -
 .../shuffle/celeborn/SortBasedShuffleWriter.java   | 134 +++++------------
 .../shuffle/celeborn/SparkShuffleManager.java      |  24 ----
 .../celeborn/SortBasedShuffleWriterSuiteJ.java     |   1 -
 .../celeborn/CelebornShuffleManagerSuite.scala     |   2 -
 .../shuffle/celeborn/SortBasedShuffleWriter.java   | 133 +++++------------
 .../shuffle/celeborn/SparkShuffleManager.java      |  25 ----
 .../celeborn/SortBasedShuffleWriterSuiteJ.java     |   2 +-
 .../org/apache/celeborn/common/util/JavaUtils.java |   6 -
 .../org/apache/celeborn/common/CelebornConf.scala  |  16 +--
 docs/configuration/client.md                       |   3 +-
 .../tests/spark/CelebornPipelineSortSuite.scala    |  60 --------
 ...lineSortSuite.scala => CelebornSortSuite.scala} |   3 +-
 14 files changed, 124 insertions(+), 445 deletions(-)

diff --git 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedPusher.java
 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedPusher.java
index aff8e7ce3..3b051c3e7 100644
--- 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedPusher.java
+++ 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedPusher.java
@@ -19,7 +19,6 @@ package org.apache.spark.shuffle.celeborn;
 
 import java.io.IOException;
 import java.util.LinkedList;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.LongAdder;
 import java.util.function.Consumer;
@@ -69,12 +68,8 @@ public class SortBasedPusher extends MemoryConsumer {
   private final int numPartitions;
   private final Consumer<Integer> afterPush;
   private final LongAdder[] mapStatusLengths;
-  // this lock is shared between different SortBasedPushers to synchronize 
pushData
-  private final Object sharedPushLock;
-  private volatile boolean asyncPushing = false;
   private int[] shuffledPartitions = null;
   private int[] inversedShuffledPartitions = null;
-  private final ExecutorService executorService;
   private final SendBufferPool sendBufferPool;
 
   public SortBasedPusher(
@@ -91,8 +86,6 @@ public class SortBasedPusher extends MemoryConsumer {
       Consumer<Integer> afterPush,
       LongAdder[] mapStatusLengths,
       long pushSortMemoryThreshold,
-      Object sharedPushLock,
-      ExecutorService executorService,
       SendBufferPool sendBufferPool) {
     super(
         memoryManager,
@@ -144,83 +137,74 @@ public class SortBasedPusher extends MemoryConsumer {
     int initialSize = Math.min((int) pushSortMemoryThreshold / 8, 1024 * 1024);
     this.inMemSorter = new ShuffleInMemorySorter(this, initialSize);
     this.peakMemoryUsedBytes = getMemoryUsage();
-    this.sharedPushLock = sharedPushLock;
-    this.executorService = executorService;
   }
 
   public long pushData() throws IOException {
-    // pushData should be synchronized between pushers
-    synchronized (sharedPushLock) {
-      final ShuffleInMemorySorter.ShuffleSorterIterator sortedRecords =
-          inMemSorter.getSortedIterator();
-
-      byte[] dataBuf = new byte[pushBufferMaxSize];
-      int offSet = 0;
-      int currentPartition = -1;
-      while (sortedRecords.hasNext()) {
-        sortedRecords.loadNext();
-        final int partition =
-            shuffledPartitions != null
-                ? 
inversedShuffledPartitions[sortedRecords.packedRecordPointer.getPartitionId()]
-                : sortedRecords.packedRecordPointer.getPartitionId();
-        if (partition != currentPartition) {
-          if (currentPartition == -1) {
-            currentPartition = partition;
-          } else {
-            int bytesWritten =
-                shuffleClient.mergeData(
-                    shuffleId,
-                    mapId,
-                    attemptNumber,
-                    currentPartition,
-                    dataBuf,
-                    0,
-                    offSet,
-                    numMappers,
-                    numPartitions);
-            mapStatusLengths[currentPartition].add(bytesWritten);
-            afterPush.accept(bytesWritten);
-            currentPartition = partition;
-            offSet = 0;
-          }
-        }
-        final long recordPointer = 
sortedRecords.packedRecordPointer.getRecordPointer();
-        final Object recordPage = taskMemoryManager.getPage(recordPointer);
-        final long recordOffsetInPage = 
taskMemoryManager.getOffsetInPage(recordPointer);
-        int recordSize = UnsafeAlignedOffset.getSize(recordPage, 
recordOffsetInPage);
-
-        if (offSet + recordSize > dataBuf.length) {
-          try {
-            dataPusher.addTask(partition, dataBuf, offSet);
-          } catch (InterruptedException e) {
-            TaskInterruptedHelper.throwTaskKillException();
-          }
+    final ShuffleInMemorySorter.ShuffleSorterIterator sortedRecords =
+        inMemSorter.getSortedIterator();
+
+    byte[] dataBuf = new byte[pushBufferMaxSize];
+    int offSet = 0;
+    int currentPartition = -1;
+    while (sortedRecords.hasNext()) {
+      sortedRecords.loadNext();
+      final int partition =
+          shuffledPartitions != null
+              ? 
inversedShuffledPartitions[sortedRecords.packedRecordPointer.getPartitionId()]
+              : sortedRecords.packedRecordPointer.getPartitionId();
+      if (partition != currentPartition) {
+        if (currentPartition == -1) {
+          currentPartition = partition;
+        } else {
+          int bytesWritten =
+              shuffleClient.mergeData(
+                  shuffleId,
+                  mapId,
+                  attemptNumber,
+                  currentPartition,
+                  dataBuf,
+                  0,
+                  offSet,
+                  numMappers,
+                  numPartitions);
+          mapStatusLengths[currentPartition].add(bytesWritten);
+          afterPush.accept(bytesWritten);
+          currentPartition = partition;
           offSet = 0;
         }
-
-        long recordReadPosition = recordOffsetInPage + UAO_SIZE;
-        Platform.copyMemory(
-            recordPage,
-            recordReadPosition,
-            dataBuf,
-            Platform.BYTE_ARRAY_OFFSET + offSet,
-            recordSize);
-        offSet += recordSize;
       }
-      if (offSet > 0) {
+      final long recordPointer = 
sortedRecords.packedRecordPointer.getRecordPointer();
+      final Object recordPage = taskMemoryManager.getPage(recordPointer);
+      final long recordOffsetInPage = 
taskMemoryManager.getOffsetInPage(recordPointer);
+      int recordSize = UnsafeAlignedOffset.getSize(recordPage, 
recordOffsetInPage);
+
+      if (offSet + recordSize > dataBuf.length) {
         try {
-          dataPusher.addTask(currentPartition, dataBuf, offSet);
+          dataPusher.addTask(partition, dataBuf, offSet);
         } catch (InterruptedException e) {
           TaskInterruptedHelper.throwTaskKillException();
         }
+        offSet = 0;
       }
 
-      long freedBytes = freeMemory();
-      inMemSorter.freeMemory();
-      taskContext.taskMetrics().incMemoryBytesSpilled(freedBytes);
-
-      return freedBytes;
+      long recordReadPosition = recordOffsetInPage + UAO_SIZE;
+      Platform.copyMemory(
+          recordPage, recordReadPosition, dataBuf, Platform.BYTE_ARRAY_OFFSET 
+ offSet, recordSize);
+      offSet += recordSize;
     }
+    if (offSet > 0) {
+      try {
+        dataPusher.addTask(currentPartition, dataBuf, offSet);
+      } catch (InterruptedException e) {
+        TaskInterruptedHelper.throwTaskKillException();
+      }
+    }
+
+    long freedBytes = freeMemory();
+    inMemSorter.freeMemory();
+    taskContext.taskMetrics().incMemoryBytesSpilled(freedBytes);
+
+    return freedBytes;
   }
 
   public boolean insertRecord(
@@ -271,38 +255,6 @@ public class SortBasedPusher extends MemoryConsumer {
     return true;
   }
 
-  public void triggerPush() throws IOException {
-    asyncPushing = true;
-    dataPusher.checkException();
-    executorService.submit(
-        () -> {
-          try {
-            pushData();
-            asyncPushing = false;
-          } catch (IOException ie) {
-            dataPusher.setException(ie);
-          }
-        });
-  }
-
-  /**
-   * Since this method and pushData() are synchronized When this method 
returns, it means pushData
-   * has released lock
-   *
-   * @throws IOException
-   */
-  public void waitPushFinish() throws IOException {
-    dataPusher.checkException();
-    while (asyncPushing) {
-      try {
-        Thread.sleep(50);
-      } catch (InterruptedException e) {
-        logger.error("SortBasedPusher thread interrupted while waiting push 
finished.");
-        TaskInterruptedHelper.throwTaskKillException();
-      }
-    }
-  }
-
   private void growPointerArrayIfNecessary() throws IOException {
     assert (inMemSorter != null);
     if (!inMemSorter.hasSpaceForAnotherRecord()) {
diff --git 
a/client-spark/common/src/test/java/org/apache/spark/shuffle/celeborn/SortBasedPusherSuiteJ.java
 
b/client-spark/common/src/test/java/org/apache/spark/shuffle/celeborn/SortBasedPusherSuiteJ.java
index bc86f9db7..0962c98c4 100644
--- 
a/client-spark/common/src/test/java/org/apache/spark/shuffle/celeborn/SortBasedPusherSuiteJ.java
+++ 
b/client-spark/common/src/test/java/org/apache/spark/shuffle/celeborn/SortBasedPusherSuiteJ.java
@@ -97,8 +97,6 @@ public class SortBasedPusherSuiteJ {
             /*afterPush=*/ null,
             /*mapStatusLengths=*/ null,
             /*pushSortMemoryThreshold=*/ Utils.byteStringAsBytes("1m"),
-            /*sharedPushLock=*/ null,
-            /*executorService=*/ null,
             SendBufferPool.get(4, 30, 60));
 
     // default page size == 2 MiB
diff --git 
a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
 
b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
index daf251047..0985c9b69 100644
--- 
a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
+++ 
b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
@@ -18,7 +18,6 @@
 package org.apache.spark.shuffle.celeborn;
 
 import java.io.IOException;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.LongAdder;
 
 import scala.Option;
@@ -70,9 +69,7 @@ public class SortBasedShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
 
   private final long pushBufferMaxSize;
   private final Object globalPushLock = new Object();
-  private final boolean pipelined;
-  private SortBasedPusher[] pushers = new SortBasedPusher[2];
-  private SortBasedPusher currentPusher;
+  private SortBasedPusher pusher;
   private long peakMemoryUsedBytes = 0;
 
   private final OpenByteArrayOutputStream serBuffer;
@@ -100,7 +97,6 @@ public class SortBasedShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
       TaskContext taskContext,
       CelebornConf conf,
       ShuffleClient client,
-      ExecutorService executorService,
       SendBufferPool sendBufferPool)
       throws IOException {
     this.mapId = taskContext.partitionId();
@@ -126,50 +122,23 @@ public class SortBasedShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
     tmpRecords = new long[numPartitions];
 
     pushBufferMaxSize = conf.clientPushBufferMaxSize();
-    pipelined = conf.clientPushSortPipelineEnabled();
-
-    if (pipelined) {
-      for (int i = 0; i < pushers.length; i++) {
-        pushers[i] =
-            new SortBasedPusher(
-                taskContext.taskMemoryManager(),
-                shuffleClient,
-                taskContext,
-                shuffleId,
-                mapId,
-                taskContext.attemptNumber(),
-                taskContext.taskAttemptId(),
-                numMappers,
-                numPartitions,
-                conf,
-                writeMetrics::incBytesWritten,
-                mapStatusLengths,
-                conf.clientPushSortMemoryThreshold() / 2,
-                globalPushLock,
-                executorService,
-                sendBufferPool);
-      }
-      currentPusher = pushers[0];
-    } else {
-      currentPusher =
-          new SortBasedPusher(
-              taskContext.taskMemoryManager(),
-              shuffleClient,
-              taskContext,
-              shuffleId,
-              mapId,
-              taskContext.attemptNumber(),
-              taskContext.taskAttemptId(),
-              numMappers,
-              numPartitions,
-              conf,
-              writeMetrics::incBytesWritten,
-              mapStatusLengths,
-              conf.clientPushSortMemoryThreshold(),
-              globalPushLock,
-              null,
-              sendBufferPool);
-    }
+
+    pusher =
+        new SortBasedPusher(
+            taskContext.taskMemoryManager(),
+            shuffleClient,
+            taskContext,
+            shuffleId,
+            mapId,
+            taskContext.attemptNumber(),
+            taskContext.taskAttemptId(),
+            numMappers,
+            numPartitions,
+            conf,
+            writeMetrics::incBytesWritten,
+            mapStatusLengths,
+            conf.clientPushSortMemoryThreshold(),
+            sendBufferPool);
   }
 
   @Override
@@ -224,12 +193,12 @@ public class SortBasedShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
         pushGiantRecord(partitionId, giantBuffer, serializedRecordSize);
       } else {
         boolean success =
-            currentPusher.insertRecord(
+            pusher.insertRecord(
                 row.getBaseObject(), row.getBaseOffset(), rowSize, 
partitionId, true);
         if (!success) {
-          pushAndSwitch();
+          doPush();
           success =
-              currentPusher.insertRecord(
+              pusher.insertRecord(
                   row.getBaseObject(), row.getBaseOffset(), rowSize, 
partitionId, true);
           if (!success) {
             throw new IOException("Unable to push after switching pusher!");
@@ -240,37 +209,16 @@ public class SortBasedShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
     }
   }
 
-  private void pushAndSwitch() throws IOException {
+  private void doPush() throws IOException {
     long start = System.nanoTime();
-    if (pipelined) {
-      currentPusher.triggerPush();
-      currentPusher = (currentPusher == pushers[0] ? pushers[1] : pushers[0]);
-      currentPusher.waitPushFinish();
-    } else {
-      currentPusher.pushData();
-    }
+    pusher.pushData();
     writeMetrics.incWriteTime(System.nanoTime() - start);
   }
 
   private void updatePeakMemoryUsed() {
-    // sorter can be null if this writer is closed
-    if (pipelined) {
-      for (int i = 0; i < pushers.length; i++) {
-
-        if (pushers[i] != null) {
-          long mem = pushers[i].getPeakMemoryUsedBytes();
-          if (mem > peakMemoryUsedBytes) {
-            peakMemoryUsedBytes = mem;
-          }
-        }
-      }
-    } else {
-      if (currentPusher != null) {
-        long mem = currentPusher.getPeakMemoryUsedBytes();
-        if (mem > peakMemoryUsedBytes) {
-          peakMemoryUsedBytes = mem;
-        }
-      }
+    long mem = pusher.getPeakMemoryUsedBytes();
+    if (mem > peakMemoryUsedBytes) {
+      peakMemoryUsedBytes = mem;
     }
   }
 
@@ -299,16 +247,16 @@ public class SortBasedShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
         pushGiantRecord(partitionId, serBuffer.getBuf(), serializedRecordSize);
       } else {
         boolean success =
-            currentPusher.insertRecord(
+            pusher.insertRecord(
                 serBuffer.getBuf(),
                 Platform.BYTE_ARRAY_OFFSET,
                 serializedRecordSize,
                 partitionId,
                 false);
         if (!success) {
-          pushAndSwitch();
+          doPush();
           success =
-              currentPusher.insertRecord(
+              pusher.insertRecord(
                   serBuffer.getBuf(),
                   Platform.BYTE_ARRAY_OFFSET,
                   serializedRecordSize,
@@ -341,23 +289,10 @@ public class SortBasedShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
   }
 
   private void close() throws IOException {
-    if (pipelined) {
-      logger.info(
-          "Memory used {}", Utils.bytesToString((pushers[0].getUsed() + 
pushers[1].getUsed())));
-    } else {
-      logger.info("Memory used {}", 
Utils.bytesToString(currentPusher.getUsed()));
-    }
+    logger.info("Memory used {}", Utils.bytesToString(pusher.getUsed()));
     long pushStartTime = System.nanoTime();
-    if (pipelined) {
-      for (SortBasedPusher pusher : pushers) {
-        pusher.waitPushFinish();
-        pusher.pushData();
-        pusher.close();
-      }
-    } else {
-      currentPusher.pushData();
-      currentPusher.close();
-    }
+    pusher.pushData();
+    pusher.close();
     writeMetrics.incWriteTime(System.nanoTime() - pushStartTime);
 
     shuffleClient.pushMergedData(shuffleId, mapId, 
taskContext.attemptNumber());
@@ -404,9 +339,4 @@ public class SortBasedShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
       shuffleClient.cleanup(shuffleId, mapId, taskContext.attemptNumber());
     }
   }
-
-  public long[] getPartitionLengths() {
-    throw new UnsupportedOperationException(
-        "Celeborn is not compatible with Spark push mode, please set 
spark.shuffle.push.enabled to false");
-  }
 }
diff --git 
a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
 
b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
index 5b68db0f8..302e4398a 100644
--- 
a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
+++ 
b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
@@ -21,8 +21,6 @@ import java.io.IOException;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import scala.Int;
 import scala.Option;
@@ -43,7 +41,6 @@ import org.apache.celeborn.client.ShuffleClient;
 import org.apache.celeborn.client.security.CryptoUtils;
 import org.apache.celeborn.common.CelebornConf;
 import org.apache.celeborn.common.protocol.ShuffleMode;
-import org.apache.celeborn.common.util.ThreadUtils;
 import org.apache.celeborn.reflect.DynMethods;
 
 public class SparkShuffleManager implements ShuffleManager {
@@ -67,9 +64,6 @@ public class SparkShuffleManager implements ShuffleManager {
       ConcurrentHashMap.newKeySet();
   private final CelebornShuffleFallbackPolicyRunner fallbackPolicyRunner;
 
-  private final ExecutorService[] asyncPushers;
-  private AtomicInteger pusherIdx = new AtomicInteger(0);
-
   private long sendBufferPoolCheckInterval;
   private long sendBufferPoolExpireTimeout;
 
@@ -81,15 +75,6 @@ public class SparkShuffleManager implements ShuffleManager {
     this.celebornConf = SparkUtils.fromSparkConf(conf);
     this.cores = conf.getInt(SparkLauncher.EXECUTOR_CORES, 1);
     this.fallbackPolicyRunner = new 
CelebornShuffleFallbackPolicyRunner(celebornConf);
-    if (ShuffleMode.SORT.equals(celebornConf.shuffleWriterMode())
-        && celebornConf.clientPushSortPipelineEnabled()) {
-      asyncPushers = new ExecutorService[cores];
-      for (int i = 0; i < asyncPushers.length; i++) {
-        asyncPushers[i] = 
ThreadUtils.newDaemonSingleThreadExecutor("async-pusher-" + i);
-      }
-    } else {
-      asyncPushers = null;
-    }
     this.sendBufferPoolCheckInterval = 
celebornConf.clientPushSendBufferPoolExpireCheckInterval();
     this.sendBufferPoolExpireTimeout = 
celebornConf.clientPushSendBufferPoolExpireTimeout();
   }
@@ -235,8 +220,6 @@ public class SparkShuffleManager implements ShuffleManager {
         shuffleIdTracker.track(h.shuffleId(), shuffleId);
 
         if (ShuffleMode.SORT.equals(celebornConf.shuffleWriterMode())) {
-          ExecutorService pushThread =
-              celebornConf.clientPushSortPipelineEnabled() ? getPusherThread() 
: null;
           return new SortBasedShuffleWriter<>(
               shuffleId,
               h.dependency(),
@@ -244,7 +227,6 @@ public class SparkShuffleManager implements ShuffleManager {
               context,
               celebornConf,
               client,
-              pushThread,
               SendBufferPool.get(cores, sendBufferPoolCheckInterval, 
sendBufferPoolExpireTimeout));
         } else if (ShuffleMode.HASH.equals(celebornConf.shuffleWriterMode())) {
           return new HashBasedShuffleWriter<>(
@@ -287,12 +269,6 @@ public class SparkShuffleManager implements ShuffleManager 
{
     return _sortShuffleManager.getReader(handle, startPartition, endPartition, 
context);
   }
 
-  private ExecutorService getPusherThread() {
-    ExecutorService pusherThread = asyncPushers[pusherIdx.get() % 
asyncPushers.length];
-    pusherIdx.incrementAndGet();
-    return pusherThread;
-  }
-
   private int executorCores(SparkConf conf) {
     if (Utils.isLocalMaster(conf)) {
       // SparkContext.numDriverCores is package private.
diff --git 
a/client-spark/spark-2/src/test/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriterSuiteJ.java
 
b/client-spark/spark-2/src/test/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriterSuiteJ.java
index 74ebab2b9..b56bda1f8 100644
--- 
a/client-spark/spark-2/src/test/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriterSuiteJ.java
+++ 
b/client-spark/spark-2/src/test/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriterSuiteJ.java
@@ -40,7 +40,6 @@ public class SortBasedShuffleWriterSuiteJ extends 
CelebornShuffleWriterSuiteBase
         context,
         conf,
         client,
-        null,
         SendBufferPool.get(4, 30, 60));
   }
 }
diff --git 
a/client-spark/spark-2/src/test/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleManagerSuite.scala
 
b/client-spark/spark-2/src/test/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleManagerSuite.scala
index 36876d6bd..cb6c3e0ab 100644
--- 
a/client-spark/spark-2/src/test/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleManagerSuite.scala
+++ 
b/client-spark/spark-2/src/test/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleManagerSuite.scala
@@ -38,7 +38,6 @@ class SparkShuffleManagerSuite extends Logging {
       .set(s"spark.${CelebornConf.MASTER_ENDPOINTS.key}", "localhost:9097")
       .set(s"spark.${CelebornConf.CLIENT_PUSH_REPLICATE_ENABLED.key}", "false")
       .set("spark.shuffle.service.enabled", "false")
-      .set("spark.shuffle.useOldFetchProtocol", "true")
       .set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "true")
       .setAppName("test")
     val sc = new SparkContext(conf)
@@ -58,7 +57,6 @@ class SparkShuffleManagerSuite extends Logging {
       .set(s"spark.${CelebornConf.MASTER_ENDPOINTS.key}", "localhost:9097")
       .set(s"spark.${CelebornConf.CLIENT_PUSH_REPLICATE_ENABLED.key}", "false")
       .set("spark.shuffle.service.enabled", "false")
-      .set("spark.shuffle.useOldFetchProtocol", "true")
       .set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "false")
       .setAppName("test")
     val sc = new SparkContext(conf)
diff --git 
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
 
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
index a2f3cdfb7..2a755a70c 100644
--- 
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
+++ 
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
@@ -18,7 +18,6 @@
 package org.apache.spark.shuffle.celeborn;
 
 import java.io.IOException;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.LongAdder;
 
 import scala.Option;
@@ -69,11 +68,8 @@ public class SortBasedShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
   private final int numPartitions;
 
   private final long pushBufferMaxSize;
-  // this lock is shared between different SortBasedPushers to synchronize 
pushData
-  private final Object sharedPushLock = new Object();
-  private final boolean pipelined;
-  private final SortBasedPusher[] pushers = new SortBasedPusher[2];
-  private SortBasedPusher currentPusher;
+
+  private final SortBasedPusher pusher;
   private long peakMemoryUsedBytes = 0;
 
   private final OpenByteArrayOutputStream serBuffer;
@@ -101,7 +97,6 @@ public class SortBasedShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
       CelebornConf conf,
       ShuffleClient client,
       ShuffleWriteMetricsReporter metrics,
-      ExecutorService executorService,
       SendBufferPool sendBufferPool)
       throws IOException {
     this.mapId = taskContext.partitionId();
@@ -126,50 +121,23 @@ public class SortBasedShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
     tmpRecords = new long[numPartitions];
 
     pushBufferMaxSize = conf.clientPushBufferMaxSize();
-    pipelined = conf.clientPushSortPipelineEnabled();
-
-    if (pipelined) {
-      for (int i = 0; i < pushers.length; i++) {
-        pushers[i] =
-            new SortBasedPusher(
-                taskContext.taskMemoryManager(),
-                shuffleClient,
-                taskContext,
-                shuffleId,
-                mapId,
-                taskContext.attemptNumber(),
-                taskContext.taskAttemptId(),
-                numMappers,
-                numPartitions,
-                conf,
-                writeMetrics::incBytesWritten,
-                mapStatusLengths,
-                conf.clientPushSortMemoryThreshold() / 2,
-                sharedPushLock,
-                executorService,
-                sendBufferPool);
-      }
-      currentPusher = pushers[0];
-    } else {
-      currentPusher =
-          new SortBasedPusher(
-              taskContext.taskMemoryManager(),
-              shuffleClient,
-              taskContext,
-              shuffleId,
-              mapId,
-              taskContext.attemptNumber(),
-              taskContext.taskAttemptId(),
-              numMappers,
-              numPartitions,
-              conf,
-              writeMetrics::incBytesWritten,
-              mapStatusLengths,
-              conf.clientPushSortMemoryThreshold(),
-              sharedPushLock,
-              null,
-              sendBufferPool);
-    }
+
+    pusher =
+        new SortBasedPusher(
+            taskContext.taskMemoryManager(),
+            shuffleClient,
+            taskContext,
+            shuffleId,
+            mapId,
+            taskContext.attemptNumber(),
+            taskContext.taskAttemptId(),
+            numMappers,
+            numPartitions,
+            conf,
+            writeMetrics::incBytesWritten,
+            mapStatusLengths,
+            conf.clientPushSortMemoryThreshold(),
+            sendBufferPool);
   }
 
   public SortBasedShuffleWriter(
@@ -178,7 +146,6 @@ public class SortBasedShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
       CelebornConf conf,
       ShuffleClient client,
       ShuffleWriteMetricsReporter metrics,
-      ExecutorService executorService,
       SendBufferPool sendBufferPool)
       throws IOException {
     this(
@@ -189,28 +156,13 @@ public class SortBasedShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
         conf,
         client,
         metrics,
-        executorService,
         sendBufferPool);
   }
 
   private void updatePeakMemoryUsed() {
-    // sorter can be null if this writer is closed
-    if (pipelined) {
-      for (SortBasedPusher pusher : pushers) {
-        if (pusher != null) {
-          long mem = pusher.getPeakMemoryUsedBytes();
-          if (mem > peakMemoryUsedBytes) {
-            peakMemoryUsedBytes = mem;
-          }
-        }
-      }
-    } else {
-      if (currentPusher != null) {
-        long mem = currentPusher.getPeakMemoryUsedBytes();
-        if (mem > peakMemoryUsedBytes) {
-          peakMemoryUsedBytes = mem;
-        }
-      }
+    long mem = pusher.getPeakMemoryUsedBytes();
+    if (mem > peakMemoryUsedBytes) {
+      peakMemoryUsedBytes = mem;
     }
   }
 
@@ -275,12 +227,12 @@ public class SortBasedShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
         pushGiantRecord(partitionId, giantBuffer, serializedRecordSize);
       } else {
         boolean success =
-            currentPusher.insertRecord(
+            pusher.insertRecord(
                 row.getBaseObject(), row.getBaseOffset(), rowSize, 
partitionId, true);
         if (!success) {
-          pushAndSwitch();
+          doPush();
           success =
-              currentPusher.insertRecord(
+              pusher.insertRecord(
                   row.getBaseObject(), row.getBaseOffset(), rowSize, 
partitionId, true);
           if (!success) {
             throw new CelebornIOException("Unable to push after switching 
pusher!");
@@ -291,15 +243,9 @@ public class SortBasedShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
     }
   }
 
-  private void pushAndSwitch() throws IOException {
+  private void doPush() throws IOException {
     long start = System.nanoTime();
-    if (pipelined) {
-      currentPusher.triggerPush();
-      currentPusher = (currentPusher == pushers[0] ? pushers[1] : pushers[0]);
-      currentPusher.waitPushFinish();
-    } else {
-      currentPusher.pushData();
-    }
+    pusher.pushData();
     writeMetrics.incWriteTime(System.nanoTime() - start);
   }
 
@@ -322,16 +268,16 @@ public class SortBasedShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
         pushGiantRecord(partitionId, serBuffer.getBuf(), serializedRecordSize);
       } else {
         boolean success =
-            currentPusher.insertRecord(
+            pusher.insertRecord(
                 serBuffer.getBuf(),
                 Platform.BYTE_ARRAY_OFFSET,
                 serializedRecordSize,
                 partitionId,
                 false);
         if (!success) {
-          pushAndSwitch();
+          doPush();
           success =
-              currentPusher.insertRecord(
+              pusher.insertRecord(
                   serBuffer.getBuf(),
                   Platform.BYTE_ARRAY_OFFSET,
                   serializedRecordSize,
@@ -364,23 +310,10 @@ public class SortBasedShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
   }
 
   private void close() throws IOException {
-    if (pipelined) {
-      logger.info(
-          "Memory used {}", Utils.bytesToString((pushers[0].getUsed() + 
pushers[1].getUsed())));
-    } else {
-      logger.info("Memory used {}", 
Utils.bytesToString(currentPusher.getUsed()));
-    }
+    logger.info("Memory used {}", Utils.bytesToString(pusher.getUsed()));
     long pushStartTime = System.nanoTime();
-    if (pipelined) {
-      for (SortBasedPusher pusher : pushers) {
-        pusher.waitPushFinish();
-        pusher.pushData();
-        pusher.close();
-      }
-    } else {
-      currentPusher.pushData();
-      currentPusher.close();
-    }
+    pusher.pushData();
+    pusher.close();
 
     shuffleClient.pushMergedData(shuffleId, mapId, 
taskContext.attemptNumber());
     writeMetrics.incWriteTime(System.nanoTime() - pushStartTime);
diff --git 
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
 
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
index f7c0e5985..85aa4d6c1 100644
--- 
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
+++ 
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
@@ -21,8 +21,6 @@ import java.io.IOException;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.spark.*;
 import org.apache.spark.internal.config.package$;
@@ -41,7 +39,6 @@ import org.apache.celeborn.client.ShuffleClient;
 import org.apache.celeborn.client.security.CryptoUtils;
 import org.apache.celeborn.common.CelebornConf;
 import org.apache.celeborn.common.protocol.ShuffleMode;
-import org.apache.celeborn.common.util.ThreadUtils;
 import org.apache.celeborn.reflect.DynMethods;
 
 /**
@@ -90,9 +87,6 @@ public class SparkShuffleManager implements ShuffleManager {
       ConcurrentHashMap.newKeySet();
   private final CelebornShuffleFallbackPolicyRunner fallbackPolicyRunner;
 
-  private final ExecutorService[] asyncPushers;
-  private AtomicInteger pusherIdx = new AtomicInteger(0);
-
   private long sendBufferPoolCheckInterval;
   private long sendBufferPoolExpireTimeout;
 
@@ -110,16 +104,6 @@ public class SparkShuffleManager implements ShuffleManager 
{
     this.celebornConf = SparkUtils.fromSparkConf(conf);
     this.cores = executorCores(conf);
     this.fallbackPolicyRunner = new 
CelebornShuffleFallbackPolicyRunner(celebornConf);
-    if ((ShuffleMode.SORT.equals(celebornConf.shuffleWriterMode())
-            || celebornConf.dynamicWriteModeEnabled())
-        && celebornConf.clientPushSortPipelineEnabled()) {
-      asyncPushers = new ExecutorService[cores];
-      for (int i = 0; i < asyncPushers.length; i++) {
-        asyncPushers[i] = 
ThreadUtils.newDaemonSingleThreadExecutor("async-pusher-" + i);
-      }
-    } else {
-      asyncPushers = null;
-    }
     this.sendBufferPoolCheckInterval = 
celebornConf.clientPushSendBufferPoolExpireCheckInterval();
     this.sendBufferPoolExpireTimeout = 
celebornConf.clientPushSendBufferPoolExpireTimeout();
   }
@@ -305,8 +289,6 @@ public class SparkShuffleManager implements ShuffleManager {
         }
 
         if (ShuffleMode.SORT.equals(shuffleMode)) {
-          ExecutorService pushThread =
-              celebornConf.clientPushSortPipelineEnabled() ? getPusherThread() 
: null;
           return new SortBasedShuffleWriter<>(
               shuffleId,
               h.dependency(),
@@ -315,7 +297,6 @@ public class SparkShuffleManager implements ShuffleManager {
               celebornConf,
               shuffleClient,
               metrics,
-              pushThread,
               SendBufferPool.get(cores, sendBufferPoolCheckInterval, 
sendBufferPoolExpireTimeout));
         } else if (ShuffleMode.HASH.equals(shuffleMode)) {
           SendBufferPool pool =
@@ -448,12 +429,6 @@ public class SparkShuffleManager implements ShuffleManager 
{
     }
   }
 
-  private ExecutorService getPusherThread() {
-    ExecutorService pusherThread = asyncPushers[pusherIdx.get() % 
asyncPushers.length];
-    pusherIdx.incrementAndGet();
-    return pusherThread;
-  }
-
   private int executorCores(SparkConf conf) {
     if (Utils.isLocalMaster(conf)) {
       // SparkContext.numDriverCores is package private.
diff --git 
a/client-spark/spark-3/src/test/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriterSuiteJ.java
 
b/client-spark/spark-3/src/test/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriterSuiteJ.java
index 2b1ea681b..1764f727b 100644
--- 
a/client-spark/spark-3/src/test/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriterSuiteJ.java
+++ 
b/client-spark/spark-3/src/test/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriterSuiteJ.java
@@ -36,6 +36,6 @@ public class SortBasedShuffleWriterSuiteJ extends 
CelebornShuffleWriterSuiteBase
       ShuffleWriteMetricsReporter metrics)
       throws IOException {
     return new SortBasedShuffleWriter<Integer, String, String>(
-        handle, context, conf, client, metrics, null, SendBufferPool.get(4, 
30, 60));
+        handle, context, conf, client, metrics, SendBufferPool.get(4, 30, 60));
   }
 }
diff --git 
a/common/src/main/java/org/apache/celeborn/common/util/JavaUtils.java 
b/common/src/main/java/org/apache/celeborn/common/util/JavaUtils.java
index 2580ac300..df032a5cb 100644
--- a/common/src/main/java/org/apache/celeborn/common/util/JavaUtils.java
+++ b/common/src/main/java/org/apache/celeborn/common/util/JavaUtils.java
@@ -49,12 +49,6 @@ import org.apache.celeborn.common.network.util.ByteUnit;
 public class JavaUtils {
   private static final Logger logger = 
LoggerFactory.getLogger(JavaUtils.class);
 
-  /**
-   * Define a default value for driver memory here since this value is 
referenced across the code
-   * base and nearly all files already use Utils.scala
-   */
-  public static final long DEFAULT_DRIVER_MEM_MB = 1024;
-
   /** Closes the given object, ignoring IOExceptions. */
   public static void closeQuietly(Closeable closeable) {
     try {
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala 
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 3e779c702..589a8ab66 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -828,7 +828,6 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable 
with Logging with Se
   def clientPushReviveInterval: Long = get(CLIENT_PUSH_REVIVE_INTERVAL)
   def clientPushReviveBatchSize: Int = get(CLIENT_PUSH_REVIVE_BATCHSIZE)
   def clientPushSortMemoryThreshold: Long = 
get(CLIENT_PUSH_SORT_MEMORY_THRESHOLD)
-  def clientPushSortPipelineEnabled: Boolean = 
get(CLIENT_PUSH_SORT_PIPELINE_ENABLED)
   def clientPushSortRandomizePartitionIdEnabled: Boolean =
     get(CLIENT_PUSH_SORT_RANDOMIZE_PARTITION_ENABLED)
   def clientPushRetryThreads: Int = get(CLIENT_PUSH_RETRY_THREADS)
@@ -3824,24 +3823,11 @@ object CelebornConf extends Logging {
       .longConf
       .createWithDefault(Int.MaxValue)
 
-  val CLIENT_PUSH_SORT_PIPELINE_ENABLED: ConfigEntry[Boolean] =
-    buildConf("celeborn.client.spark.push.sort.pipeline.enabled")
-      .withAlternative("celeborn.push.sort.pipeline.enabled")
-      .categories("client")
-      .doc("Whether to enable pipelining for sort based shuffle writer. If 
true, double buffering" +
-        " will be used to pipeline push")
-      .version("0.3.0")
-      .booleanConf
-      .createWithDefault(false)
-
   val CLIENT_PUSH_SORT_MEMORY_THRESHOLD: ConfigEntry[Long] =
     buildConf("celeborn.client.spark.push.sort.memory.threshold")
       .withAlternative("celeborn.push.sortMemory.threshold")
       .categories("client")
-      .doc("When SortBasedPusher use memory over the threshold, will trigger 
push data. If the" +
-        s" pipeline push feature is enabled 
(`${CLIENT_PUSH_SORT_PIPELINE_ENABLED.key}=true`)," +
-        " the SortBasedPusher will trigger a data push when the memory usage 
exceeds half of the" +
-        " threshold(by default, 32m).")
+      .doc("When SortBasedPusher use memory over the threshold, will trigger 
push data.")
       .version("0.3.0")
       .bytesConf(ByteUnit.BYTE)
       .createWithDefaultString("64m")
diff --git a/docs/configuration/client.md b/docs/configuration/client.md
index 70f5e54b0..95965b2aa 100644
--- a/docs/configuration/client.md
+++ b/docs/configuration/client.md
@@ -107,8 +107,7 @@ license: |
 | celeborn.client.spark.io.encryption.key |  | io encryption key | 0.4.0 | 
 | celeborn.client.spark.push.dynamicWriteMode.enabled | false | Whether to 
dynamically switch push write mode based on conditions.If true, shuffle mode 
will be only determined by partition count | 0.5.0 | 
 | celeborn.client.spark.push.dynamicWriteMode.partitionNum.threshold | 2000 | 
Threshold of shuffle partition number for dynamically switching push writer 
mode. When the shuffle partition number is greater than this value, use the 
sort-based shuffle writer for memory efficiency; otherwise use the hash-based 
shuffle writer for speed. This configuration only takes effect when 
celeborn.client.spark.push.dynamicWriteMode.enabled is true. | 0.5.0 | 
-| celeborn.client.spark.push.sort.memory.threshold | 64m | When 
SortBasedPusher use memory over the threshold, will trigger push data. If the 
pipeline push feature is enabled 
(`celeborn.client.spark.push.sort.pipeline.enabled=true`), the SortBasedPusher 
will trigger a data push when the memory usage exceeds half of the threshold(by 
default, 32m). | 0.3.0 | 
-| celeborn.client.spark.push.sort.pipeline.enabled | false | Whether to enable 
pipelining for sort based shuffle writer. If true, double buffering will be 
used to pipeline push | 0.3.0 | 
+| celeborn.client.spark.push.sort.memory.threshold | 64m | When 
SortBasedPusher use memory over the threshold, will trigger push data. | 0.3.0 
| 
 | celeborn.client.spark.push.unsafeRow.fastWrite.enabled | true | This is 
Celeborn's optimization on UnsafeRow for Spark and it's true by default. If you 
have changed UnsafeRow's memory layout set this to false. | 0.2.2 | 
 | celeborn.client.spark.shuffle.forceFallback.enabled | false | Whether force 
fallback shuffle to Spark's default. | 0.3.0 | 
 | celeborn.client.spark.shuffle.forceFallback.numPartitionsThreshold | 
2147483647 | Celeborn will only accept shuffle of partition number lower than 
this configuration value. | 0.3.0 | 
diff --git 
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornPipelineSortSuite.scala
 
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornPipelineSortSuite.scala
deleted file mode 100644
index 5288ac7ab..000000000
--- 
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornPipelineSortSuite.scala
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.celeborn.tests.spark
-
-import org.apache.spark.SparkConf
-import org.apache.spark.sql.SparkSession
-import org.scalatest.BeforeAndAfterEach
-import org.scalatest.funsuite.AnyFunSuite
-
-import org.apache.celeborn.client.ShuffleClient
-import org.apache.celeborn.common.CelebornConf
-import org.apache.celeborn.common.protocol.ShuffleMode
-
-class CelebornPipelineSortSuite extends AnyFunSuite
-  with SparkTestBase
-  with BeforeAndAfterEach {
-
-  override def beforeEach(): Unit = {
-    ShuffleClient.reset()
-  }
-
-  override def afterEach(): Unit = {
-    System.gc()
-  }
-
-  test("celeborn spark integration test - pipeline sort") {
-    val sparkConf = new 
SparkConf().setAppName("celeborn-demo").setMaster("local[2]")
-      .set(s"spark.${CelebornConf.CLIENT_PUSH_SORT_PIPELINE_ENABLED.key}", 
"true")
-      
.set(s"spark.${CelebornConf.CLIENT_PUSH_SORT_RANDOMIZE_PARTITION_ENABLED.key}", 
"true")
-    val ss = SparkSession.builder()
-      .config(updateSparkConf(sparkConf, ShuffleMode.SORT))
-      .getOrCreate()
-    val value = Range(1, 10000).mkString(",")
-    val tuples = ss.sparkContext.parallelize(1 to 10000, 2)
-      .map { i => (i, value) }.groupByKey(16).collect()
-
-    // verify result
-    assert(tuples.length == 10000)
-    for (elem <- tuples) {
-      assert(elem._2.mkString(",").equals(value))
-    }
-
-    ss.stop()
-  }
-}
diff --git 
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornNonPipelineSortSuite.scala
 
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornSortSuite.scala
similarity index 94%
rename from 
tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornNonPipelineSortSuite.scala
rename to 
tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornSortSuite.scala
index 0da01ee58..e4b2dd574 100644
--- 
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornNonPipelineSortSuite.scala
+++ 
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornSortSuite.scala
@@ -26,7 +26,7 @@ import org.apache.celeborn.client.ShuffleClient
 import org.apache.celeborn.common.CelebornConf
 import org.apache.celeborn.common.protocol.ShuffleMode
 
-class CelebornNonPipelineSortSuite extends AnyFunSuite
+class CelebornSortSuite extends AnyFunSuite
   with SparkTestBase
   with BeforeAndAfterEach {
 
@@ -40,7 +40,6 @@ class CelebornNonPipelineSortSuite extends AnyFunSuite
 
   test("celeborn spark integration test - non pipeline sort") {
     val sparkConf = new 
SparkConf().setAppName("celeborn-demo").setMaster("local[2]")
-      .set(s"spark.${CelebornConf.CLIENT_PUSH_SORT_PIPELINE_ENABLED.key}", 
"false")
       
.set(s"spark.${CelebornConf.CLIENT_PUSH_SORT_RANDOMIZE_PARTITION_ENABLED.key}", 
"false")
     val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
     val combineResult = combine(sparkSession)


Reply via email to