This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch branch-0.4
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/branch-0.4 by this push:
     new 0ece2f5b5 [CELEBORN-891] Remove pipeline feature for sort based writer
0ece2f5b5 is described below

commit 0ece2f5b59eb25c916de717a6cebbff188c0153c
Author: Cheng Pan <[email protected]>
AuthorDate: Mon Jan 1 10:42:17 2024 +0800

    [CELEBORN-891] Remove pipeline feature for sort based writer
    
    Remove pipeline feature for sort based writer
    
    The pipeline feature is added as part of CELEBORN-295, for performance. 
Eventually, an unresolvable issue that would crash the JVM was identified in 
https://github.com/apache/incubator-celeborn/pull/1807, and after discussion, 
we decided to delete this feature.
    
    No. The pipeline feature is disabled by default, so there are no changes 
for users who use the default settings.
    
    Pass GA.
    
    Closes #2196 from pan3793/CELEBORN-891.
    
    Authored-by: Cheng Pan <[email protected]>
    Signed-off-by: zky.zhoukeyong <[email protected]>
---
 .../spark/shuffle/celeborn/SortBasedPusher.java    | 158 +++++++--------------
 .../shuffle/celeborn/SortBasedPusherSuiteJ.java    |   2 -
 .../shuffle/celeborn/SortBasedShuffleWriter.java   | 136 +++++-------------
 .../shuffle/celeborn/SparkShuffleManager.java      |  24 ----
 .../celeborn/SortBasedShuffleWriterSuiteJ.java     |   1 -
 .../celeborn/CelebornShuffleManagerSuite.scala     |   2 -
 .../shuffle/celeborn/SortBasedShuffleWriter.java   | 133 +++++------------
 .../shuffle/celeborn/SparkShuffleManager.java      |  24 ----
 .../celeborn/SortBasedShuffleWriterSuiteJ.java     |   2 +-
 .../org/apache/celeborn/common/util/JavaUtils.java |   6 -
 .../org/apache/celeborn/common/CelebornConf.scala  |  16 +--
 docs/configuration/client.md                       |   3 +-
 .../tests/spark/CelebornPipelineSortSuite.scala    |  60 --------
 ...lineSortSuite.scala => CelebornSortSuite.scala} |   3 +-
 14 files changed, 124 insertions(+), 446 deletions(-)

diff --git 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedPusher.java
 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedPusher.java
index aff8e7ce3..3b051c3e7 100644
--- 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedPusher.java
+++ 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedPusher.java
@@ -19,7 +19,6 @@ package org.apache.spark.shuffle.celeborn;
 
 import java.io.IOException;
 import java.util.LinkedList;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.LongAdder;
 import java.util.function.Consumer;
@@ -69,12 +68,8 @@ public class SortBasedPusher extends MemoryConsumer {
   private final int numPartitions;
   private final Consumer<Integer> afterPush;
   private final LongAdder[] mapStatusLengths;
-  // this lock is shared between different SortBasedPushers to synchronize 
pushData
-  private final Object sharedPushLock;
-  private volatile boolean asyncPushing = false;
   private int[] shuffledPartitions = null;
   private int[] inversedShuffledPartitions = null;
-  private final ExecutorService executorService;
   private final SendBufferPool sendBufferPool;
 
   public SortBasedPusher(
@@ -91,8 +86,6 @@ public class SortBasedPusher extends MemoryConsumer {
       Consumer<Integer> afterPush,
       LongAdder[] mapStatusLengths,
       long pushSortMemoryThreshold,
-      Object sharedPushLock,
-      ExecutorService executorService,
       SendBufferPool sendBufferPool) {
     super(
         memoryManager,
@@ -144,83 +137,74 @@ public class SortBasedPusher extends MemoryConsumer {
     int initialSize = Math.min((int) pushSortMemoryThreshold / 8, 1024 * 1024);
     this.inMemSorter = new ShuffleInMemorySorter(this, initialSize);
     this.peakMemoryUsedBytes = getMemoryUsage();
-    this.sharedPushLock = sharedPushLock;
-    this.executorService = executorService;
   }
 
   public long pushData() throws IOException {
-    // pushData should be synchronized between pushers
-    synchronized (sharedPushLock) {
-      final ShuffleInMemorySorter.ShuffleSorterIterator sortedRecords =
-          inMemSorter.getSortedIterator();
-
-      byte[] dataBuf = new byte[pushBufferMaxSize];
-      int offSet = 0;
-      int currentPartition = -1;
-      while (sortedRecords.hasNext()) {
-        sortedRecords.loadNext();
-        final int partition =
-            shuffledPartitions != null
-                ? 
inversedShuffledPartitions[sortedRecords.packedRecordPointer.getPartitionId()]
-                : sortedRecords.packedRecordPointer.getPartitionId();
-        if (partition != currentPartition) {
-          if (currentPartition == -1) {
-            currentPartition = partition;
-          } else {
-            int bytesWritten =
-                shuffleClient.mergeData(
-                    shuffleId,
-                    mapId,
-                    attemptNumber,
-                    currentPartition,
-                    dataBuf,
-                    0,
-                    offSet,
-                    numMappers,
-                    numPartitions);
-            mapStatusLengths[currentPartition].add(bytesWritten);
-            afterPush.accept(bytesWritten);
-            currentPartition = partition;
-            offSet = 0;
-          }
-        }
-        final long recordPointer = 
sortedRecords.packedRecordPointer.getRecordPointer();
-        final Object recordPage = taskMemoryManager.getPage(recordPointer);
-        final long recordOffsetInPage = 
taskMemoryManager.getOffsetInPage(recordPointer);
-        int recordSize = UnsafeAlignedOffset.getSize(recordPage, 
recordOffsetInPage);
-
-        if (offSet + recordSize > dataBuf.length) {
-          try {
-            dataPusher.addTask(partition, dataBuf, offSet);
-          } catch (InterruptedException e) {
-            TaskInterruptedHelper.throwTaskKillException();
-          }
+    final ShuffleInMemorySorter.ShuffleSorterIterator sortedRecords =
+        inMemSorter.getSortedIterator();
+
+    byte[] dataBuf = new byte[pushBufferMaxSize];
+    int offSet = 0;
+    int currentPartition = -1;
+    while (sortedRecords.hasNext()) {
+      sortedRecords.loadNext();
+      final int partition =
+          shuffledPartitions != null
+              ? 
inversedShuffledPartitions[sortedRecords.packedRecordPointer.getPartitionId()]
+              : sortedRecords.packedRecordPointer.getPartitionId();
+      if (partition != currentPartition) {
+        if (currentPartition == -1) {
+          currentPartition = partition;
+        } else {
+          int bytesWritten =
+              shuffleClient.mergeData(
+                  shuffleId,
+                  mapId,
+                  attemptNumber,
+                  currentPartition,
+                  dataBuf,
+                  0,
+                  offSet,
+                  numMappers,
+                  numPartitions);
+          mapStatusLengths[currentPartition].add(bytesWritten);
+          afterPush.accept(bytesWritten);
+          currentPartition = partition;
           offSet = 0;
         }
-
-        long recordReadPosition = recordOffsetInPage + UAO_SIZE;
-        Platform.copyMemory(
-            recordPage,
-            recordReadPosition,
-            dataBuf,
-            Platform.BYTE_ARRAY_OFFSET + offSet,
-            recordSize);
-        offSet += recordSize;
       }
-      if (offSet > 0) {
+      final long recordPointer = 
sortedRecords.packedRecordPointer.getRecordPointer();
+      final Object recordPage = taskMemoryManager.getPage(recordPointer);
+      final long recordOffsetInPage = 
taskMemoryManager.getOffsetInPage(recordPointer);
+      int recordSize = UnsafeAlignedOffset.getSize(recordPage, 
recordOffsetInPage);
+
+      if (offSet + recordSize > dataBuf.length) {
         try {
-          dataPusher.addTask(currentPartition, dataBuf, offSet);
+          dataPusher.addTask(partition, dataBuf, offSet);
         } catch (InterruptedException e) {
           TaskInterruptedHelper.throwTaskKillException();
         }
+        offSet = 0;
       }
 
-      long freedBytes = freeMemory();
-      inMemSorter.freeMemory();
-      taskContext.taskMetrics().incMemoryBytesSpilled(freedBytes);
-
-      return freedBytes;
+      long recordReadPosition = recordOffsetInPage + UAO_SIZE;
+      Platform.copyMemory(
+          recordPage, recordReadPosition, dataBuf, Platform.BYTE_ARRAY_OFFSET 
+ offSet, recordSize);
+      offSet += recordSize;
     }
+    if (offSet > 0) {
+      try {
+        dataPusher.addTask(currentPartition, dataBuf, offSet);
+      } catch (InterruptedException e) {
+        TaskInterruptedHelper.throwTaskKillException();
+      }
+    }
+
+    long freedBytes = freeMemory();
+    inMemSorter.freeMemory();
+    taskContext.taskMetrics().incMemoryBytesSpilled(freedBytes);
+
+    return freedBytes;
   }
 
   public boolean insertRecord(
@@ -271,38 +255,6 @@ public class SortBasedPusher extends MemoryConsumer {
     return true;
   }
 
-  public void triggerPush() throws IOException {
-    asyncPushing = true;
-    dataPusher.checkException();
-    executorService.submit(
-        () -> {
-          try {
-            pushData();
-            asyncPushing = false;
-          } catch (IOException ie) {
-            dataPusher.setException(ie);
-          }
-        });
-  }
-
-  /**
-   * Since this method and pushData() are synchronized When this method 
returns, it means pushData
-   * has released lock
-   *
-   * @throws IOException
-   */
-  public void waitPushFinish() throws IOException {
-    dataPusher.checkException();
-    while (asyncPushing) {
-      try {
-        Thread.sleep(50);
-      } catch (InterruptedException e) {
-        logger.error("SortBasedPusher thread interrupted while waiting push 
finished.");
-        TaskInterruptedHelper.throwTaskKillException();
-      }
-    }
-  }
-
   private void growPointerArrayIfNecessary() throws IOException {
     assert (inMemSorter != null);
     if (!inMemSorter.hasSpaceForAnotherRecord()) {
diff --git 
a/client-spark/common/src/test/java/org/apache/spark/shuffle/celeborn/SortBasedPusherSuiteJ.java
 
b/client-spark/common/src/test/java/org/apache/spark/shuffle/celeborn/SortBasedPusherSuiteJ.java
index bc86f9db7..0962c98c4 100644
--- 
a/client-spark/common/src/test/java/org/apache/spark/shuffle/celeborn/SortBasedPusherSuiteJ.java
+++ 
b/client-spark/common/src/test/java/org/apache/spark/shuffle/celeborn/SortBasedPusherSuiteJ.java
@@ -97,8 +97,6 @@ public class SortBasedPusherSuiteJ {
             /*afterPush=*/ null,
             /*mapStatusLengths=*/ null,
             /*pushSortMemoryThreshold=*/ Utils.byteStringAsBytes("1m"),
-            /*sharedPushLock=*/ null,
-            /*executorService=*/ null,
             SendBufferPool.get(4, 30, 60));
 
     // default page size == 2 MiB
diff --git 
a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
 
b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
index 988cdc090..0985c9b69 100644
--- 
a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
+++ 
b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
@@ -18,7 +18,6 @@
 package org.apache.spark.shuffle.celeborn;
 
 import java.io.IOException;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.LongAdder;
 
 import scala.Option;
@@ -70,9 +69,7 @@ public class SortBasedShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
 
   private final long pushBufferMaxSize;
   private final Object globalPushLock = new Object();
-  private final boolean pipelined;
-  private SortBasedPusher[] pushers = new SortBasedPusher[2];
-  private SortBasedPusher currentPusher;
+  private SortBasedPusher pusher;
   private long peakMemoryUsedBytes = 0;
 
   private final OpenByteArrayOutputStream serBuffer;
@@ -100,7 +97,6 @@ public class SortBasedShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
       TaskContext taskContext,
       CelebornConf conf,
       ShuffleClient client,
-      ExecutorService executorService,
       SendBufferPool sendBufferPool)
       throws IOException {
     this.mapId = taskContext.partitionId();
@@ -126,50 +122,23 @@ public class SortBasedShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
     tmpRecords = new long[numPartitions];
 
     pushBufferMaxSize = conf.clientPushBufferMaxSize();
-    pipelined = conf.clientPushSortPipelineEnabled();
-
-    if (pipelined) {
-      for (int i = 0; i < pushers.length; i++) {
-        pushers[i] =
-            new SortBasedPusher(
-                taskContext.taskMemoryManager(),
-                shuffleClient,
-                taskContext,
-                shuffleId,
-                mapId,
-                taskContext.attemptNumber(),
-                taskContext.taskAttemptId(),
-                numMappers,
-                numPartitions,
-                conf,
-                writeMetrics::incBytesWritten,
-                mapStatusLengths,
-                conf.clientPushSortMemoryThreshold() / 2,
-                globalPushLock,
-                executorService,
-                sendBufferPool);
-      }
-      currentPusher = pushers[0];
-    } else {
-      currentPusher =
-          new SortBasedPusher(
-              taskContext.taskMemoryManager(),
-              shuffleClient,
-              taskContext,
-              shuffleId,
-              mapId,
-              taskContext.attemptNumber(),
-              taskContext.taskAttemptId(),
-              numMappers,
-              numPartitions,
-              conf,
-              writeMetrics::incBytesWritten,
-              mapStatusLengths,
-              conf.clientPushSortMemoryThreshold(),
-              globalPushLock,
-              null,
-              sendBufferPool);
-    }
+
+    pusher =
+        new SortBasedPusher(
+            taskContext.taskMemoryManager(),
+            shuffleClient,
+            taskContext,
+            shuffleId,
+            mapId,
+            taskContext.attemptNumber(),
+            taskContext.taskAttemptId(),
+            numMappers,
+            numPartitions,
+            conf,
+            writeMetrics::incBytesWritten,
+            mapStatusLengths,
+            conf.clientPushSortMemoryThreshold(),
+            sendBufferPool);
   }
 
   @Override
@@ -224,12 +193,12 @@ public class SortBasedShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
         pushGiantRecord(partitionId, giantBuffer, serializedRecordSize);
       } else {
         boolean success =
-            currentPusher.insertRecord(
+            pusher.insertRecord(
                 row.getBaseObject(), row.getBaseOffset(), rowSize, 
partitionId, true);
         if (!success) {
-          pushAndSwitch();
+          doPush();
           success =
-              currentPusher.insertRecord(
+              pusher.insertRecord(
                   row.getBaseObject(), row.getBaseOffset(), rowSize, 
partitionId, true);
           if (!success) {
             throw new IOException("Unable to push after switching pusher!");
@@ -240,37 +209,16 @@ public class SortBasedShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
     }
   }
 
-  private void pushAndSwitch() throws IOException {
+  private void doPush() throws IOException {
     long start = System.nanoTime();
-    if (pipelined) {
-      currentPusher.triggerPush();
-      currentPusher = (currentPusher == pushers[0] ? pushers[1] : pushers[0]);
-      currentPusher.waitPushFinish();
-    } else {
-      currentPusher.pushData();
-    }
+    pusher.pushData();
     writeMetrics.incWriteTime(System.nanoTime() - start);
   }
 
   private void updatePeakMemoryUsed() {
-    // sorter can be null if this writer is closed
-    if (pipelined) {
-      for (int i = 0; i < pushers.length; i++) {
-
-        if (pushers[i] != null) {
-          long mem = pushers[i].getPeakMemoryUsedBytes();
-          if (mem > peakMemoryUsedBytes) {
-            peakMemoryUsedBytes = mem;
-          }
-        }
-      }
-    } else {
-      if (currentPusher != null) {
-        long mem = currentPusher.getPeakMemoryUsedBytes();
-        if (mem > peakMemoryUsedBytes) {
-          peakMemoryUsedBytes = mem;
-        }
-      }
+    long mem = pusher.getPeakMemoryUsedBytes();
+    if (mem > peakMemoryUsedBytes) {
+      peakMemoryUsedBytes = mem;
     }
   }
 
@@ -299,16 +247,16 @@ public class SortBasedShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
         pushGiantRecord(partitionId, serBuffer.getBuf(), serializedRecordSize);
       } else {
         boolean success =
-            currentPusher.insertRecord(
+            pusher.insertRecord(
                 serBuffer.getBuf(),
                 Platform.BYTE_ARRAY_OFFSET,
                 serializedRecordSize,
                 partitionId,
                 false);
         if (!success) {
-          pushAndSwitch();
+          doPush();
           success =
-              currentPusher.insertRecord(
+              pusher.insertRecord(
                   serBuffer.getBuf(),
                   Platform.BYTE_ARRAY_OFFSET,
                   serializedRecordSize,
@@ -341,23 +289,10 @@ public class SortBasedShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
   }
 
   private void close() throws IOException {
-    if (pipelined) {
-      logger.info(
-          "Memory used {}", Utils.bytesToString((pushers[0].getUsed() + 
pushers[1].getUsed())));
-    } else {
-      logger.info("Memory used {}", 
Utils.bytesToString(currentPusher.getUsed()));
-    }
+    logger.info("Memory used {}", Utils.bytesToString(pusher.getUsed()));
     long pushStartTime = System.nanoTime();
-    if (pipelined) {
-      for (SortBasedPusher pusher : pushers) {
-        pusher.waitPushFinish();
-        pusher.pushData();
-        pusher.close();
-      }
-    } else {
-      currentPusher.pushData();
-      currentPusher.close();
-    }
+    pusher.pushData();
+    pusher.close();
     writeMetrics.incWriteTime(System.nanoTime() - pushStartTime);
 
     shuffleClient.pushMergedData(shuffleId, mapId, 
taskContext.attemptNumber());
@@ -404,11 +339,4 @@ public class SortBasedShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
       shuffleClient.cleanup(shuffleId, mapId, taskContext.attemptNumber());
     }
   }
-
-  // Added in SPARK-32917, for Spark 3.2 and above
-  @SuppressWarnings("MissingOverride")
-  public long[] getPartitionLengths() {
-    throw new UnsupportedOperationException(
-        "Celeborn is not compatible with Spark push mode, please set 
spark.shuffle.push.enabled to false");
-  }
 }
diff --git 
a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
 
b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
index 05f4bf733..cda992b29 100644
--- 
a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
+++ 
b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
@@ -19,8 +19,6 @@ package org.apache.spark.shuffle.celeborn;
 
 import java.io.IOException;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import scala.Int;
 
@@ -37,7 +35,6 @@ import org.apache.celeborn.client.LifecycleManager;
 import org.apache.celeborn.client.ShuffleClient;
 import org.apache.celeborn.common.CelebornConf;
 import org.apache.celeborn.common.protocol.ShuffleMode;
-import org.apache.celeborn.common.util.ThreadUtils;
 import org.apache.celeborn.reflect.DynMethods;
 
 public class SparkShuffleManager implements ShuffleManager {
@@ -61,9 +58,6 @@ public class SparkShuffleManager implements ShuffleManager {
       ConcurrentHashMap.newKeySet();
   private final CelebornShuffleFallbackPolicyRunner fallbackPolicyRunner;
 
-  private final ExecutorService[] asyncPushers;
-  private AtomicInteger pusherIdx = new AtomicInteger(0);
-
   private long sendBufferPoolCheckInterval;
   private long sendBufferPoolExpireTimeout;
 
@@ -75,15 +69,6 @@ public class SparkShuffleManager implements ShuffleManager {
     this.celebornConf = SparkUtils.fromSparkConf(conf);
     this.cores = conf.getInt(SparkLauncher.EXECUTOR_CORES, 1);
     this.fallbackPolicyRunner = new 
CelebornShuffleFallbackPolicyRunner(celebornConf);
-    if (ShuffleMode.SORT.equals(celebornConf.shuffleWriterMode())
-        && celebornConf.clientPushSortPipelineEnabled()) {
-      asyncPushers = new ExecutorService[cores];
-      for (int i = 0; i < asyncPushers.length; i++) {
-        asyncPushers[i] = 
ThreadUtils.newDaemonSingleThreadExecutor("async-pusher-" + i);
-      }
-    } else {
-      asyncPushers = null;
-    }
     this.sendBufferPoolCheckInterval = 
celebornConf.clientPushSendBufferPoolExpireCheckInterval();
     this.sendBufferPoolExpireTimeout = 
celebornConf.clientPushSendBufferPoolExpireTimeout();
   }
@@ -206,8 +191,6 @@ public class SparkShuffleManager implements ShuffleManager {
         shuffleIdTracker.track(h.shuffleId(), shuffleId);
 
         if (ShuffleMode.SORT.equals(celebornConf.shuffleWriterMode())) {
-          ExecutorService pushThread =
-              celebornConf.clientPushSortPipelineEnabled() ? getPusherThread() 
: null;
           return new SortBasedShuffleWriter<>(
               shuffleId,
               h.dependency(),
@@ -215,7 +198,6 @@ public class SparkShuffleManager implements ShuffleManager {
               context,
               celebornConf,
               client,
-              pushThread,
               SendBufferPool.get(cores, sendBufferPoolCheckInterval, 
sendBufferPoolExpireTimeout));
         } else if (ShuffleMode.HASH.equals(celebornConf.shuffleWriterMode())) {
           return new HashBasedShuffleWriter<>(
@@ -258,12 +240,6 @@ public class SparkShuffleManager implements ShuffleManager 
{
     return _sortShuffleManager.getReader(handle, startPartition, endPartition, 
context);
   }
 
-  private ExecutorService getPusherThread() {
-    ExecutorService pusherThread = asyncPushers[pusherIdx.get() % 
asyncPushers.length];
-    pusherIdx.incrementAndGet();
-    return pusherThread;
-  }
-
   private int executorCores(SparkConf conf) {
     if (Utils.isLocalMaster(conf)) {
       // SparkContext.numDriverCores is package private.
diff --git 
a/client-spark/spark-2/src/test/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriterSuiteJ.java
 
b/client-spark/spark-2/src/test/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriterSuiteJ.java
index 74ebab2b9..b56bda1f8 100644
--- 
a/client-spark/spark-2/src/test/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriterSuiteJ.java
+++ 
b/client-spark/spark-2/src/test/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriterSuiteJ.java
@@ -40,7 +40,6 @@ public class SortBasedShuffleWriterSuiteJ extends 
CelebornShuffleWriterSuiteBase
         context,
         conf,
         client,
-        null,
         SendBufferPool.get(4, 30, 60));
   }
 }
diff --git 
a/client-spark/spark-2/src/test/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleManagerSuite.scala
 
b/client-spark/spark-2/src/test/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleManagerSuite.scala
index 36876d6bd..cb6c3e0ab 100644
--- 
a/client-spark/spark-2/src/test/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleManagerSuite.scala
+++ 
b/client-spark/spark-2/src/test/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleManagerSuite.scala
@@ -38,7 +38,6 @@ class SparkShuffleManagerSuite extends Logging {
       .set(s"spark.${CelebornConf.MASTER_ENDPOINTS.key}", "localhost:9097")
       .set(s"spark.${CelebornConf.CLIENT_PUSH_REPLICATE_ENABLED.key}", "false")
       .set("spark.shuffle.service.enabled", "false")
-      .set("spark.shuffle.useOldFetchProtocol", "true")
       .set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "true")
       .setAppName("test")
     val sc = new SparkContext(conf)
@@ -58,7 +57,6 @@ class SparkShuffleManagerSuite extends Logging {
       .set(s"spark.${CelebornConf.MASTER_ENDPOINTS.key}", "localhost:9097")
       .set(s"spark.${CelebornConf.CLIENT_PUSH_REPLICATE_ENABLED.key}", "false")
       .set("spark.shuffle.service.enabled", "false")
-      .set("spark.shuffle.useOldFetchProtocol", "true")
       .set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "false")
       .setAppName("test")
     val sc = new SparkContext(conf)
diff --git 
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
 
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
index 2de03a569..cbb8ec725 100644
--- 
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
+++ 
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
@@ -18,7 +18,6 @@
 package org.apache.spark.shuffle.celeborn;
 
 import java.io.IOException;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.LongAdder;
 
 import scala.Option;
@@ -69,11 +68,8 @@ public class SortBasedShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
   private final int numPartitions;
 
   private final long pushBufferMaxSize;
-  // this lock is shared between different SortBasedPushers to synchronize 
pushData
-  private final Object sharedPushLock = new Object();
-  private final boolean pipelined;
-  private final SortBasedPusher[] pushers = new SortBasedPusher[2];
-  private SortBasedPusher currentPusher;
+
+  private final SortBasedPusher pusher;
   private long peakMemoryUsedBytes = 0;
 
   private final OpenByteArrayOutputStream serBuffer;
@@ -101,7 +97,6 @@ public class SortBasedShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
       CelebornConf conf,
       ShuffleClient client,
       ShuffleWriteMetricsReporter metrics,
-      ExecutorService executorService,
       SendBufferPool sendBufferPool)
       throws IOException {
     this.mapId = taskContext.partitionId();
@@ -126,50 +121,23 @@ public class SortBasedShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
     tmpRecords = new long[numPartitions];
 
     pushBufferMaxSize = conf.clientPushBufferMaxSize();
-    pipelined = conf.clientPushSortPipelineEnabled();
-
-    if (pipelined) {
-      for (int i = 0; i < pushers.length; i++) {
-        pushers[i] =
-            new SortBasedPusher(
-                taskContext.taskMemoryManager(),
-                shuffleClient,
-                taskContext,
-                shuffleId,
-                mapId,
-                taskContext.attemptNumber(),
-                taskContext.taskAttemptId(),
-                numMappers,
-                numPartitions,
-                conf,
-                writeMetrics::incBytesWritten,
-                mapStatusLengths,
-                conf.clientPushSortMemoryThreshold() / 2,
-                sharedPushLock,
-                executorService,
-                sendBufferPool);
-      }
-      currentPusher = pushers[0];
-    } else {
-      currentPusher =
-          new SortBasedPusher(
-              taskContext.taskMemoryManager(),
-              shuffleClient,
-              taskContext,
-              shuffleId,
-              mapId,
-              taskContext.attemptNumber(),
-              taskContext.taskAttemptId(),
-              numMappers,
-              numPartitions,
-              conf,
-              writeMetrics::incBytesWritten,
-              mapStatusLengths,
-              conf.clientPushSortMemoryThreshold(),
-              sharedPushLock,
-              null,
-              sendBufferPool);
-    }
+
+    pusher =
+        new SortBasedPusher(
+            taskContext.taskMemoryManager(),
+            shuffleClient,
+            taskContext,
+            shuffleId,
+            mapId,
+            taskContext.attemptNumber(),
+            taskContext.taskAttemptId(),
+            numMappers,
+            numPartitions,
+            conf,
+            writeMetrics::incBytesWritten,
+            mapStatusLengths,
+            conf.clientPushSortMemoryThreshold(),
+            sendBufferPool);
   }
 
   public SortBasedShuffleWriter(
@@ -178,7 +146,6 @@ public class SortBasedShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
       CelebornConf conf,
       ShuffleClient client,
       ShuffleWriteMetricsReporter metrics,
-      ExecutorService executorService,
       SendBufferPool sendBufferPool)
       throws IOException {
     this(
@@ -189,28 +156,13 @@ public class SortBasedShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
         conf,
         client,
         metrics,
-        executorService,
         sendBufferPool);
   }
 
   private void updatePeakMemoryUsed() {
-    // sorter can be null if this writer is closed
-    if (pipelined) {
-      for (SortBasedPusher pusher : pushers) {
-        if (pusher != null) {
-          long mem = pusher.getPeakMemoryUsedBytes();
-          if (mem > peakMemoryUsedBytes) {
-            peakMemoryUsedBytes = mem;
-          }
-        }
-      }
-    } else {
-      if (currentPusher != null) {
-        long mem = currentPusher.getPeakMemoryUsedBytes();
-        if (mem > peakMemoryUsedBytes) {
-          peakMemoryUsedBytes = mem;
-        }
-      }
+    long mem = pusher.getPeakMemoryUsedBytes();
+    if (mem > peakMemoryUsedBytes) {
+      peakMemoryUsedBytes = mem;
     }
   }
 
@@ -275,12 +227,12 @@ public class SortBasedShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
         pushGiantRecord(partitionId, giantBuffer, serializedRecordSize);
       } else {
         boolean success =
-            currentPusher.insertRecord(
+            pusher.insertRecord(
                 row.getBaseObject(), row.getBaseOffset(), rowSize, 
partitionId, true);
         if (!success) {
-          pushAndSwitch();
+          doPush();
           success =
-              currentPusher.insertRecord(
+              pusher.insertRecord(
                   row.getBaseObject(), row.getBaseOffset(), rowSize, 
partitionId, true);
           if (!success) {
             throw new CelebornIOException("Unable to push after switching 
pusher!");
@@ -291,15 +243,9 @@ public class SortBasedShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
     }
   }
 
-  private void pushAndSwitch() throws IOException {
+  private void doPush() throws IOException {
     long start = System.nanoTime();
-    if (pipelined) {
-      currentPusher.triggerPush();
-      currentPusher = (currentPusher == pushers[0] ? pushers[1] : pushers[0]);
-      currentPusher.waitPushFinish();
-    } else {
-      currentPusher.pushData();
-    }
+    pusher.pushData();
     writeMetrics.incWriteTime(System.nanoTime() - start);
   }
 
@@ -322,16 +268,16 @@ public class SortBasedShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
         pushGiantRecord(partitionId, serBuffer.getBuf(), serializedRecordSize);
       } else {
         boolean success =
-            currentPusher.insertRecord(
+            pusher.insertRecord(
                 serBuffer.getBuf(),
                 Platform.BYTE_ARRAY_OFFSET,
                 serializedRecordSize,
                 partitionId,
                 false);
         if (!success) {
-          pushAndSwitch();
+          doPush();
           success =
-              currentPusher.insertRecord(
+              pusher.insertRecord(
                   serBuffer.getBuf(),
                   Platform.BYTE_ARRAY_OFFSET,
                   serializedRecordSize,
@@ -364,23 +310,10 @@ public class SortBasedShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
   }
 
   private void close() throws IOException {
-    if (pipelined) {
-      logger.info(
-          "Memory used {}", Utils.bytesToString((pushers[0].getUsed() + pushers[1].getUsed())));
-    } else {
-      logger.info("Memory used {}", Utils.bytesToString(currentPusher.getUsed()));
-    }
+    logger.info("Memory used {}", Utils.bytesToString(pusher.getUsed()));
     long pushStartTime = System.nanoTime();
-    if (pipelined) {
-      for (SortBasedPusher pusher : pushers) {
-        pusher.waitPushFinish();
-        pusher.pushData();
-        pusher.close();
-      }
-    } else {
-      currentPusher.pushData();
-      currentPusher.close();
-    }
+    pusher.pushData();
+    pusher.close();
 
     shuffleClient.pushMergedData(shuffleId, mapId, taskContext.attemptNumber());
     writeMetrics.incWriteTime(System.nanoTime() - pushStartTime);
diff --git a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
index 450abe7d2..a9b714fb6 100644
--- a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
+++ b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
@@ -19,8 +19,6 @@ package org.apache.spark.shuffle.celeborn;
 
 import java.io.IOException;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.spark.*;
 import org.apache.spark.launcher.SparkLauncher;
@@ -36,7 +34,6 @@ import org.apache.celeborn.client.LifecycleManager;
 import org.apache.celeborn.client.ShuffleClient;
 import org.apache.celeborn.common.CelebornConf;
 import org.apache.celeborn.common.protocol.ShuffleMode;
-import org.apache.celeborn.common.util.ThreadUtils;
 import org.apache.celeborn.reflect.DynMethods;
 
 /**
@@ -85,9 +82,6 @@ public class SparkShuffleManager implements ShuffleManager {
       ConcurrentHashMap.newKeySet();
   private final CelebornShuffleFallbackPolicyRunner fallbackPolicyRunner;
 
-  private final ExecutorService[] asyncPushers;
-  private AtomicInteger pusherIdx = new AtomicInteger(0);
-
   private long sendBufferPoolCheckInterval;
   private long sendBufferPoolExpireTimeout;
 
@@ -105,15 +99,6 @@ public class SparkShuffleManager implements ShuffleManager {
     this.celebornConf = SparkUtils.fromSparkConf(conf);
     this.cores = executorCores(conf);
     this.fallbackPolicyRunner = new CelebornShuffleFallbackPolicyRunner(celebornConf);
-    if (ShuffleMode.SORT.equals(celebornConf.shuffleWriterMode())
-        && celebornConf.clientPushSortPipelineEnabled()) {
-      asyncPushers = new ExecutorService[cores];
-      for (int i = 0; i < asyncPushers.length; i++) {
-        asyncPushers[i] = ThreadUtils.newDaemonSingleThreadExecutor("async-pusher-" + i);
-      }
-    } else {
-      asyncPushers = null;
-    }
     this.sendBufferPoolCheckInterval = celebornConf.clientPushSendBufferPoolExpireCheckInterval();
     this.sendBufferPoolExpireTimeout = celebornConf.clientPushSendBufferPoolExpireTimeout();
   }
@@ -248,8 +233,6 @@ public class SparkShuffleManager implements ShuffleManager {
         shuffleIdTracker.track(h.shuffleId(), shuffleId);
 
         if (ShuffleMode.SORT.equals(celebornConf.shuffleWriterMode())) {
-          ExecutorService pushThread =
-              celebornConf.clientPushSortPipelineEnabled() ? getPusherThread() : null;
           return new SortBasedShuffleWriter<>(
               shuffleId,
               h.dependency(),
@@ -258,7 +241,6 @@ public class SparkShuffleManager implements ShuffleManager {
               celebornConf,
               shuffleClient,
               metrics,
-              pushThread,
               SendBufferPool.get(cores, sendBufferPoolCheckInterval, sendBufferPoolExpireTimeout));
         } else if (ShuffleMode.HASH.equals(celebornConf.shuffleWriterMode())) {
           SendBufferPool pool =
@@ -389,12 +371,6 @@ public class SparkShuffleManager implements ShuffleManager {
     }
   }
 
-  private ExecutorService getPusherThread() {
-    ExecutorService pusherThread = asyncPushers[pusherIdx.get() % asyncPushers.length];
-    pusherIdx.incrementAndGet();
-    return pusherThread;
-  }
-
   private int executorCores(SparkConf conf) {
     if (Utils.isLocalMaster(conf)) {
       // SparkContext.numDriverCores is package private.
diff --git a/client-spark/spark-3/src/test/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriterSuiteJ.java b/client-spark/spark-3/src/test/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriterSuiteJ.java
index 2b1ea681b..1764f727b 100644
--- a/client-spark/spark-3/src/test/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriterSuiteJ.java
+++ b/client-spark/spark-3/src/test/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriterSuiteJ.java
@@ -36,6 +36,6 @@ public class SortBasedShuffleWriterSuiteJ extends CelebornShuffleWriterSuiteBase
       ShuffleWriteMetricsReporter metrics)
       throws IOException {
     return new SortBasedShuffleWriter<Integer, String, String>(
-        handle, context, conf, client, metrics, null, SendBufferPool.get(4, 30, 60));
+        handle, context, conf, client, metrics, SendBufferPool.get(4, 30, 60));
   }
 }
diff --git a/common/src/main/java/org/apache/celeborn/common/util/JavaUtils.java b/common/src/main/java/org/apache/celeborn/common/util/JavaUtils.java
index 2580ac300..df032a5cb 100644
--- a/common/src/main/java/org/apache/celeborn/common/util/JavaUtils.java
+++ b/common/src/main/java/org/apache/celeborn/common/util/JavaUtils.java
@@ -49,12 +49,6 @@ import org.apache.celeborn.common.network.util.ByteUnit;
 public class JavaUtils {
   private static final Logger logger = LoggerFactory.getLogger(JavaUtils.class);
 
-  /**
-   * Define a default value for driver memory here since this value is referenced across the code
-   * base and nearly all files already use Utils.scala
-   */
-  public static final long DEFAULT_DRIVER_MEM_MB = 1024;
-
   /** Closes the given object, ignoring IOExceptions. */
   public static void closeQuietly(Closeable closeable) {
     try {
diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index de79714c7..e2d34c18d 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -824,7 +824,6 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
   def clientPushReviveInterval: Long = get(CLIENT_PUSH_REVIVE_INTERVAL)
   def clientPushReviveBatchSize: Int = get(CLIENT_PUSH_REVIVE_BATCHSIZE)
   def clientPushSortMemoryThreshold: Long = get(CLIENT_PUSH_SORT_MEMORY_THRESHOLD)
-  def clientPushSortPipelineEnabled: Boolean = get(CLIENT_PUSH_SORT_PIPELINE_ENABLED)
   def clientPushSortRandomizePartitionIdEnabled: Boolean =
     get(CLIENT_PUSH_SORT_RANDOMIZE_PARTITION_ENABLED)
   def clientPushRetryThreads: Int = get(CLIENT_PUSH_RETRY_THREADS)
@@ -3810,24 +3809,11 @@ object CelebornConf extends Logging {
       .longConf
       .createWithDefault(Int.MaxValue)
 
-  val CLIENT_PUSH_SORT_PIPELINE_ENABLED: ConfigEntry[Boolean] =
-    buildConf("celeborn.client.spark.push.sort.pipeline.enabled")
-      .withAlternative("celeborn.push.sort.pipeline.enabled")
-      .categories("client")
-      .doc("Whether to enable pipelining for sort based shuffle writer. If true, double buffering" +
-        " will be used to pipeline push")
-      .version("0.3.0")
-      .booleanConf
-      .createWithDefault(false)
-
   val CLIENT_PUSH_SORT_MEMORY_THRESHOLD: ConfigEntry[Long] =
     buildConf("celeborn.client.spark.push.sort.memory.threshold")
       .withAlternative("celeborn.push.sortMemory.threshold")
       .categories("client")
-      .doc("When SortBasedPusher use memory over the threshold, will trigger push data. If the" +
-        s" pipeline push feature is enabled (`${CLIENT_PUSH_SORT_PIPELINE_ENABLED.key}=true`)," +
-        " the SortBasedPusher will trigger a data push when the memory usage exceeds half of the" +
-        " threshold(by default, 32m).")
+      .doc("When SortBasedPusher use memory over the threshold, will trigger push data.")
       .version("0.3.0")
       .bytesConf(ByteUnit.BYTE)
       .createWithDefaultString("64m")
diff --git a/docs/configuration/client.md b/docs/configuration/client.md
index 7c171ed3f..76a3aefbb 100644
--- a/docs/configuration/client.md
+++ b/docs/configuration/client.md
@@ -102,8 +102,7 @@ license: |
 | celeborn.client.shuffle.register.filterExcludedWorker.enabled | false | 
Whether to filter excluded worker when register shuffle. | 0.4.0 | 
 | celeborn.client.slot.assign.maxWorkers | 10000 | Max workers that slots of 
one shuffle can be allocated on. Will choose the smaller positive one from 
Master side and Client side, see `celeborn.master.slot.assign.maxWorkers`. | 
0.3.1 | 
 | celeborn.client.spark.fetch.throwsFetchFailure | false | client throws 
FetchFailedException instead of CelebornIOException | 0.4.0 | 
-| celeborn.client.spark.push.sort.memory.threshold | 64m | When SortBasedPusher use memory over the threshold, will trigger push data. If the pipeline push feature is enabled (`celeborn.client.spark.push.sort.pipeline.enabled=true`), the SortBasedPusher will trigger a data push when the memory usage exceeds half of the threshold(by default, 32m). | 0.3.0 | 
-| celeborn.client.spark.push.sort.pipeline.enabled | false | Whether to enable pipelining for sort based shuffle writer. If true, double buffering will be used to pipeline push | 0.3.0 | 
+| celeborn.client.spark.push.sort.memory.threshold | 64m | When SortBasedPusher use memory over the threshold, will trigger push data. | 0.3.0 | 
 | celeborn.client.spark.push.unsafeRow.fastWrite.enabled | true | This is 
Celeborn's optimization on UnsafeRow for Spark and it's true by default. If you 
have changed UnsafeRow's memory layout set this to false. | 0.2.2 | 
 | celeborn.client.spark.shuffle.forceFallback.enabled | false | Whether force 
fallback shuffle to Spark's default. | 0.3.0 | 
 | celeborn.client.spark.shuffle.forceFallback.numPartitionsThreshold | 
2147483647 | Celeborn will only accept shuffle of partition number lower than 
this configuration value. | 0.3.0 | 
diff --git a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornPipelineSortSuite.scala b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornPipelineSortSuite.scala
deleted file mode 100644
index 5288ac7ab..000000000
--- a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornPipelineSortSuite.scala
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.celeborn.tests.spark
-
-import org.apache.spark.SparkConf
-import org.apache.spark.sql.SparkSession
-import org.scalatest.BeforeAndAfterEach
-import org.scalatest.funsuite.AnyFunSuite
-
-import org.apache.celeborn.client.ShuffleClient
-import org.apache.celeborn.common.CelebornConf
-import org.apache.celeborn.common.protocol.ShuffleMode
-
-class CelebornPipelineSortSuite extends AnyFunSuite
-  with SparkTestBase
-  with BeforeAndAfterEach {
-
-  override def beforeEach(): Unit = {
-    ShuffleClient.reset()
-  }
-
-  override def afterEach(): Unit = {
-    System.gc()
-  }
-
-  test("celeborn spark integration test - pipeline sort") {
-    val sparkConf = new SparkConf().setAppName("celeborn-demo").setMaster("local[2]")
-      .set(s"spark.${CelebornConf.CLIENT_PUSH_SORT_PIPELINE_ENABLED.key}", "true")
-      .set(s"spark.${CelebornConf.CLIENT_PUSH_SORT_RANDOMIZE_PARTITION_ENABLED.key}", "true")
-    val ss = SparkSession.builder()
-      .config(updateSparkConf(sparkConf, ShuffleMode.SORT))
-      .getOrCreate()
-    val value = Range(1, 10000).mkString(",")
-    val tuples = ss.sparkContext.parallelize(1 to 10000, 2)
-      .map { i => (i, value) }.groupByKey(16).collect()
-
-    // verify result
-    assert(tuples.length == 10000)
-    for (elem <- tuples) {
-      assert(elem._2.mkString(",").equals(value))
-    }
-
-    ss.stop()
-  }
-}
diff --git a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornNonPipelineSortSuite.scala b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornSortSuite.scala
similarity index 94%
rename from tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornNonPipelineSortSuite.scala
rename to tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornSortSuite.scala
index 0da01ee58..e4b2dd574 100644
--- a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornNonPipelineSortSuite.scala
+++ b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornSortSuite.scala
@@ -26,7 +26,7 @@ import org.apache.celeborn.client.ShuffleClient
 import org.apache.celeborn.common.CelebornConf
 import org.apache.celeborn.common.protocol.ShuffleMode
 
-class CelebornNonPipelineSortSuite extends AnyFunSuite
+class CelebornSortSuite extends AnyFunSuite
   with SparkTestBase
   with BeforeAndAfterEach {
 
@@ -40,7 +40,6 @@ class CelebornNonPipelineSortSuite extends AnyFunSuite
 
   test("celeborn spark integration test - non pipeline sort") {
     val sparkConf = new SparkConf().setAppName("celeborn-demo").setMaster("local[2]")
-      .set(s"spark.${CelebornConf.CLIENT_PUSH_SORT_PIPELINE_ENABLED.key}", "false")
       .set(s"spark.${CelebornConf.CLIENT_PUSH_SORT_RANDOMIZE_PARTITION_ENABLED.key}", "false")
     val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
     val combineResult = combine(sparkSession)


Reply via email to