This is an automated email from the ASF dual-hosted git repository.

xianjingfeng pushed a commit to branch branch-0.8
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/branch-0.8 by this push:
     new aa25cfa66 [#1086] [Doc] Simplify the Gluten code and add the doc (#1322)
aa25cfa66 is described below

commit aa25cfa6698a688a00ee38483c04f18ad924016b
Author: summaryzb <[email protected]>
AuthorDate: Mon Nov 27 09:51:22 2023 +0800

    [#1086] [Doc] Simplify the Gluten code and add the doc (#1322)
    
    * gluten integrate for branch-0.8
    
    * spotless check
    
    * add WriteBufferManagerTest test
    
    * todo
    
    * remove  addPartition method, add some docs
---
 README.md                                                      | 10 ++++++++++
 .../java/org/apache/spark/shuffle/writer/RssShuffleWriter.java | 10 +++++++---
 2 files changed, 17 insertions(+), 3 deletions(-)

diff --git a/README.md b/README.md
index a8134d69c..989c3b4d0 100644
--- a/README.md
+++ b/README.md
@@ -258,6 +258,16 @@ After apply the patch and rebuild spark, add following configuration in spark co
   spark.dynamicAllocation.enabled true
   ```
 
+### Support Spark Columnar Shuffle with Gluten
+To support Spark columnar shuffle with Uniffle, use the Gluten client;
+refer to the [Gluten Project](https://github.com/oap-project/gluten) for details.
+
+Update Spark conf to enable integration of Uniffle with Gluten:
+  ```
+  spark.plugins io.glutenproject.GlutenPlugin
+  spark.shuffle.manager org.apache.spark.shuffle.gluten.uniffle.GlutenRssShuffleManager
+  ```
+
 ### Deploy MapReduce Client
 
 1. Add client jar to the classpath of each NodeManager, e.g., <HADOOP>/share/hadoop/mapreduce/
diff --git a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
index 330f56c8d..6efdfa346 100644
--- a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
+++ b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
@@ -86,7 +86,6 @@ public class RssShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
   private final Map<Integer, List<ShuffleServerInfo>> partitionToServers;
   private final Set<ShuffleServerInfo> shuffleServersForData;
   private final long[] partitionLengths;
-  private final boolean isMemoryShuffleEnabled;
   private final Function<String, Boolean> taskFailureCallback;
   private final Set<Long> blockIds = Sets.newConcurrentHashSet();
 
@@ -94,6 +93,7 @@ public class RssShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
   protected final long taskAttemptId;
 
   protected final ShuffleWriteMetrics shuffleWriteMetrics;
+  protected final boolean isMemoryShuffleEnabled;
 
   private final BlockingQueue<Object> finishEventQueue = new LinkedBlockingQueue<>();
 
@@ -213,7 +213,7 @@ public class RssShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
     }
   }
 
-  private void writeImpl(Iterator<Product2<K, V>> records) {
+  protected void writeImpl(Iterator<Product2<K, V>> records) throws IOException {
     List<ShuffleBlockInfo> shuffleBlockInfos;
     boolean isCombine = shuffleDependency.mapSideCombine();
     Function1<V, C> createCombiner = null;
@@ -243,7 +243,7 @@ public class RssShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
       processShuffleBlockInfos(shuffleBlockInfos);
     }
     long checkStartTs = System.currentTimeMillis();
-    checkBlockSendResult(blockIds);
+    internalCheckBlockSendResult();
     long commitStartTs = System.currentTimeMillis();
     long checkDuration = commitStartTs - checkStartTs;
     if (!isMemoryShuffleEnabled) {
@@ -309,6 +309,10 @@ public class RssShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
     return futures;
   }
 
+  protected void internalCheckBlockSendResult() {
+    checkBlockSendResult(blockIds);
+  }
+
   @VisibleForTesting
   protected void checkBlockSendResult(Set<Long> blockIds) {
     boolean interrupted = false;

Reply via email to