This is an automated email from the ASF dual-hosted git repository.
xianjingfeng pushed a commit to branch branch-0.8
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/branch-0.8 by this push:
new aa25cfa66 [#1086] [Doc] Simplify the Gluten code and add the doc
(#1322)
aa25cfa66 is described below
commit aa25cfa6698a688a00ee38483c04f18ad924016b
Author: summaryzb <[email protected]>
AuthorDate: Mon Nov 27 09:51:22 2023 +0800
[#1086] [Doc] Simplify the Gluten code and add the doc (#1322)
* gluten integrate for branch-0.8
* spotless check
* add WriteBufferManagerTest test
* todo
* remove addPartition method, add some docs
---
README.md | 10 ++++++++++
.../java/org/apache/spark/shuffle/writer/RssShuffleWriter.java | 10 +++++++---
2 files changed, 17 insertions(+), 3 deletions(-)
diff --git a/README.md b/README.md
index a8134d69c..989c3b4d0 100644
--- a/README.md
+++ b/README.md
@@ -258,6 +258,16 @@ After applying the patch and rebuilding Spark, add the following
configuration in the Spark conf
spark.dynamicAllocation.enabled true
```
+### Support Spark Columnar Shuffle with Gluten
+To support Spark columnar shuffle with Uniffle, use the Gluten client;
+refer to the [Gluten Project](https://github.com/oap-project/gluten)
+
+Update the Spark conf to enable the integration of Uniffle with Gluten:
+ ```
+ spark.plugins io.glutenproject.GlutenPlugin
+ spark.shuffle.manager
org.apache.spark.shuffle.gluten.uniffle.GlutenRssShuffleManager
+ ```
+
### Deploy MapReduce Client
1. Add client jar to the classpath of each NodeManager, e.g.,
<HADOOP>/share/hadoop/mapreduce/
diff --git
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
index 330f56c8d..6efdfa346 100644
---
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
+++
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
@@ -86,7 +86,6 @@ public class RssShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
private final Map<Integer, List<ShuffleServerInfo>> partitionToServers;
private final Set<ShuffleServerInfo> shuffleServersForData;
private final long[] partitionLengths;
- private final boolean isMemoryShuffleEnabled;
private final Function<String, Boolean> taskFailureCallback;
private final Set<Long> blockIds = Sets.newConcurrentHashSet();
@@ -94,6 +93,7 @@ public class RssShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
protected final long taskAttemptId;
protected final ShuffleWriteMetrics shuffleWriteMetrics;
+ protected final boolean isMemoryShuffleEnabled;
private final BlockingQueue<Object> finishEventQueue = new
LinkedBlockingQueue<>();
@@ -213,7 +213,7 @@ public class RssShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
}
}
- private void writeImpl(Iterator<Product2<K, V>> records) {
+ protected void writeImpl(Iterator<Product2<K, V>> records) throws
IOException {
List<ShuffleBlockInfo> shuffleBlockInfos;
boolean isCombine = shuffleDependency.mapSideCombine();
Function1<V, C> createCombiner = null;
@@ -243,7 +243,7 @@ public class RssShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
processShuffleBlockInfos(shuffleBlockInfos);
}
long checkStartTs = System.currentTimeMillis();
- checkBlockSendResult(blockIds);
+ internalCheckBlockSendResult();
long commitStartTs = System.currentTimeMillis();
long checkDuration = commitStartTs - checkStartTs;
if (!isMemoryShuffleEnabled) {
@@ -309,6 +309,10 @@ public class RssShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
return futures;
}
+ protected void internalCheckBlockSendResult() {
+ checkBlockSendResult(blockIds);
+ }
+
@VisibleForTesting
protected void checkBlockSendResult(Set<Long> blockIds) {
boolean interrupted = false;