This is an automated email from the ASF dual-hosted git repository.
jackylee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new bedb2ee0b [GLUTEN-6483] Support Uniffle 0.9.0 (#6484)
bedb2ee0b is described below
commit bedb2ee0ba4bf1bb071234368f5d095c6b912afd
Author: Nicholas Jiang <[email protected]>
AuthorDate: Thu Aug 1 19:01:17 2024 +0800
[GLUTEN-6483] Support Uniffle 0.9.0 (#6484)
---
.github/workflows/velox_docker.yml | 37 +++++++++++-----------
.../gluten/uniffle/UniffleShuffleManager.java | 14 +-------
.../writer/VeloxUniffleColumnarShuffleWriter.java | 35 +++++++++++---------
pom.xml | 2 +-
tools/gluten-it/pom.xml | 2 +-
5 files changed, 42 insertions(+), 48 deletions(-)
diff --git a/.github/workflows/velox_docker.yml b/.github/workflows/velox_docker.yml
index 913a55cbd..ee6bf9797 100644
--- a/.github/workflows/velox_docker.yml
+++ b/.github/workflows/velox_docker.yml
@@ -492,38 +492,39 @@ jobs:
wget
https://downloads.apache.org/maven/maven-3/3.8.8/binaries/apache-maven-3.8.8-bin.tar.gz
tar -xvf apache-maven-3.8.8-bin.tar.gz
mv apache-maven-3.8.8 /usr/lib/maven
- - name: Build for Spark ${{ matrix.spark }}
- run: |
- cd $GITHUB_WORKSPACE/ && \
- export MAVEN_HOME=/usr/lib/maven && \
- export PATH=${PATH}:${MAVEN_HOME}/bin && \
- $MVN_CMD clean install -P${{ matrix.spark }} -Pbackends-velox
-Puniffle -DskipTests
- - name: TPC-H SF1.0 && TPC-DS SF1.0 Parquet local spark3.2 with uniffle
0.8.0
+ - name: Build for Uniffle 0.9.0
run: |
export MAVEN_HOME=/usr/lib/maven && \
export PATH=${PATH}:${MAVEN_HOME}/bin && \
export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk && \
cd /opt && \
- git clone -b branch-0.8
https://github.com/apache/incubator-uniffle.git && \
+ git clone -b v0.9.0 https://github.com/apache/incubator-uniffle.git
&& \
cd incubator-uniffle && \
- sed -i '250d'
./server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java && \
- sed -i '228d'
./server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java && \
- sed -i '226d'
./server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java && \
- $MVN_CMD clean install -Phadoop2.8 -DskipTests
+ $MVN_CMD clean install -Phadoop2.8,spark3 -DskipTests
cd /opt && \
- wget -nv
https://archive.apache.org/dist/incubator/uniffle/0.8.0/apache-uniffle-0.8.0-incubating-bin.tar.gz
&& \
- tar xzf apache-uniffle-0.8.0-incubating-bin.tar.gz -C /opt/ && mv
/opt/rss-0.8.0-hadoop2.8 /opt/uniffle && \
+ wget -nv
https://archive.apache.org/dist/incubator/uniffle/0.9.0/apache-uniffle-0.9.0-incubating-bin.tar.gz
&& \
+ tar xzf apache-uniffle-0.9.0-incubating-bin.tar.gz -C /opt/ && mv
/opt/rss-0.9.0-hadoop2.8 /opt/uniffle && \
wget -nv
https://archive.apache.org/dist/hadoop/common/hadoop-2.8.5/hadoop-2.8.5.tar.gz
&& \
tar xzf hadoop-2.8.5.tar.gz -C /opt/
- rm -f /opt/uniffle/jars/server/shuffle-server-0.8.0-SNAPSHOT.jar
- cp
/opt/incubator-uniffle/server/target/shuffle-server-0.8.1-SNAPSHOT.jar
/opt/uniffle/jars/server/
rm -rf /opt/incubator-uniffle
cd /opt/uniffle && mkdir shuffle_data && \
bash -c "echo -e 'XMX_SIZE=16g\nHADOOP_HOME=/opt/hadoop-2.8.5' >
./bin/rss-env.sh" && \
bash -c "echo -e 'rss.coordinator.shuffle.nodes.max
1\nrss.rpc.server.port 19999' > ./conf/coordinator.conf" && \
- bash -c "echo -e 'rss.server.app.expired.withoutHeartbeat
7200000\nrss.server.heartbeat.delay 3000\nrss.rpc.server.port
19997\nrss.jetty.http.port 19996\nrss.server.netty.port
19995\nrss.storage.basePath /opt/uniffle/shuffle_data\nrss.storage.type
MEMORY_LOCALFILE\nrss.coordinator.quorum
localhost:19999\nrss.server.flush.thread.alive
10\nrss.server.single.buffer.flush.threshold 64m' > ./conf/server.conf" && \
+ bash -c "echo -e 'rss.server.app.expired.withoutHeartbeat
7200000\nrss.server.heartbeat.delay 3000\nrss.rpc.server.port
19997\nrss.rpc.server.type GRPC_NETTY\nrss.jetty.http.port
19996\nrss.server.netty.port 19995\nrss.storage.basePath
/opt/uniffle/shuffle_data\nrss.storage.type
MEMORY_LOCALFILE\nrss.coordinator.quorum
localhost:19999\nrss.server.flush.thread.alive
10\nrss.server.single.buffer.flush.threshold 64m' > ./conf/server.conf" && \
bash ./bin/start-coordinator.sh && bash ./bin/start-shuffle-server.sh
- cd $GITHUB_WORKSPACE/tools/gluten-it && $MVN_CMD clean install
-Pspark-3.2 -Puniffle && \
+ - name: Build for Spark ${{ matrix.spark }}
+ run: |
+ export MAVEN_HOME=/usr/lib/maven && \
+ export PATH=${PATH}:${MAVEN_HOME}/bin && \
+ cd $GITHUB_WORKSPACE/ && \
+ $MVN_CMD clean install -P${{ matrix.spark }} -Pbackends-velox
-Puniffle -DskipTests
+ - name: TPC-H SF1.0 && TPC-DS SF1.0 Parquet local spark3.2 with
uniffle-0.9.0
+ run: |
+ export MAVEN_HOME=/usr/lib/maven && \
+ export PATH=${PATH}:${MAVEN_HOME}/bin && \
+ export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk && \
+ cd $GITHUB_WORKSPACE/tools/gluten-it && \
+ $MVN_CMD clean install -Pspark-3.2 -Puniffle && \
GLUTEN_IT_JVM_ARGS=-Xmx5G sbin/gluten-it.sh queries-compare \
--local --preset=velox-with-uniffle --benchmark-type=h
--error-on-memleak --off-heap-size=10g -s=1.0 --threads=16 --iterations=1
diff --git a/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/gluten/uniffle/UniffleShuffleManager.java b/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/gluten/uniffle/UniffleShuffleManager.java
index f91141c1e..eb66dae90 100644
--- a/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/gluten/uniffle/UniffleShuffleManager.java
+++ b/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/gluten/uniffle/UniffleShuffleManager.java
@@ -16,9 +16,7 @@
*/
package org.apache.spark.shuffle.gluten.uniffle;
-import org.apache.spark.ShuffleDependency;
import org.apache.spark.SparkConf;
-import org.apache.spark.SparkEnv;
import org.apache.spark.TaskContext;
import org.apache.spark.executor.ShuffleWriteMetrics;
import org.apache.spark.shuffle.ColumnarShuffleDependency;
@@ -36,21 +34,11 @@ import org.slf4j.LoggerFactory;
public class UniffleShuffleManager extends RssShuffleManager {
private static final Logger LOG =
LoggerFactory.getLogger(UniffleShuffleManager.class);
- private boolean isDriver() {
- return "driver".equals(SparkEnv.get().executorId());
- }
-
public UniffleShuffleManager(SparkConf conf, boolean isDriver) {
super(conf, isDriver);
conf.set(RssSparkConfig.SPARK_RSS_CONFIG_PREFIX +
RssSparkConfig.RSS_ROW_BASED.key(), "false");
}
- @Override
- public <K, V, C> ShuffleHandle registerShuffle(
- int shuffleId, ShuffleDependency<K, V, C> dependency) {
- return super.registerShuffle(shuffleId, dependency);
- }
-
@Override
public <K, V> ShuffleWriter<K, V> getWriter(
ShuffleHandle handle, long mapId, TaskContext context,
ShuffleWriteMetricsReporter metrics) {
@@ -62,7 +50,7 @@ public class UniffleShuffleManager extends RssShuffleManager {
ColumnarShuffleDependency<K, V, V> dependency =
(ColumnarShuffleDependency<K, V, V>) rssHandle.getDependency();
setPusherAppId(rssHandle);
- String taskId = "" + context.taskAttemptId() + "_" +
context.attemptNumber();
+ String taskId = context.taskAttemptId() + "_" + context.attemptNumber();
ShuffleWriteMetrics writeMetrics;
if (metrics != null) {
writeMetrics = new WriteMetrics(metrics);
diff --git a/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java b/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java
index b84c9d4ee..a80e34fb1 100644
--- a/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java
+++ b/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java
@@ -41,6 +41,7 @@ import org.apache.spark.sql.vectorized.ColumnarBatch;
import org.apache.spark.util.SparkResourceUtil;
import org.apache.uniffle.client.api.ShuffleWriteClient;
import org.apache.uniffle.common.ShuffleBlockInfo;
+import org.apache.uniffle.common.exception.RssException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -61,19 +62,18 @@ public class VeloxUniffleColumnarShuffleWriter<K, V> extends RssShuffleWriter<K,
private long nativeShuffleWriter = -1L;
private boolean stopping = false;
- private int compressThreshold =
GlutenConfig.getConf().columnarShuffleCompressionThreshold();
- private double reallocThreshold =
GlutenConfig.getConf().columnarShuffleReallocThreshold();
+ private final int compressThreshold =
+ GlutenConfig.getConf().columnarShuffleCompressionThreshold();
+ private final double reallocThreshold =
GlutenConfig.getConf().columnarShuffleReallocThreshold();
private String compressionCodec;
- private int compressionLevel;
- private int partitionId;
+ private final int compressionLevel;
+ private final int partitionId;
- private Runtime runtime = Runtimes.contextInstance("UniffleShuffleWriter");
- private ShuffleWriterJniWrapper jniWrapper =
ShuffleWriterJniWrapper.create(runtime);
- private SplitResult splitResult;
- private int nativeBufferSize = GlutenConfig.getConf().maxBatchSize();
- private int bufferSize;
- private PartitionPusher partitionPusher;
- private Boolean isSort;
+ private final Runtime runtime =
Runtimes.contextInstance("UniffleShuffleWriter");
+ private final ShuffleWriterJniWrapper jniWrapper =
ShuffleWriterJniWrapper.create(runtime);
+ private final int nativeBufferSize = GlutenConfig.getConf().maxBatchSize();
+ private final int bufferSize;
+ private final Boolean isSort;
private final ColumnarShuffleDependency<K, V, V> columnarDep;
private final SparkConf sparkConf;
@@ -125,13 +125,13 @@ public class VeloxUniffleColumnarShuffleWriter<K, V> extends RssShuffleWriter<K,
}
@Override
- protected void writeImpl(Iterator<Product2<K, V>> records) throws
IOException {
+ protected void writeImpl(Iterator<Product2<K, V>> records) {
if (!records.hasNext() && !isMemoryShuffleEnabled) {
super.sendCommit();
return;
}
// writer already init
- partitionPusher = new PartitionPusher(this);
+ PartitionPusher partitionPusher = new PartitionPusher(this);
while (records.hasNext()) {
ColumnarBatch cb = (ColumnarBatch) (records.next()._2());
if (cb.numRows() == 0 || cb.numCols() == 0) {
@@ -194,7 +194,12 @@ public class VeloxUniffleColumnarShuffleWriter<K, V> extends RssShuffleWriter<K,
if (nativeShuffleWriter == -1L) {
throw new IllegalStateException("nativeShuffleWriter should not be -1L");
}
- splitResult = jniWrapper.stop(nativeShuffleWriter);
+ SplitResult splitResult;
+ try {
+ splitResult = jniWrapper.stop(nativeShuffleWriter);
+ } catch (IOException e) {
+ throw new RssException(e);
+ }
columnarDep.metrics().get("shuffleWallTime").get().add(System.nanoTime() -
startTime);
columnarDep
.metrics()
@@ -220,7 +225,7 @@ public class VeloxUniffleColumnarShuffleWriter<K, V> extends RssShuffleWriter<K,
long writeDurationMs = System.nanoTime() - pushMergedDataTime;
shuffleWriteMetrics.incWriteTime(writeDurationMs);
LOG.info(
- "Finish write shuffle with rest write {} ms",
+ "Finish write shuffle with rest write {} ms",
TimeUnit.MILLISECONDS.toNanos(writeDurationMs));
}
diff --git a/pom.xml b/pom.xml
index bb59ad2a7..cbec5befb 100644
--- a/pom.xml
+++ b/pom.xml
@@ -66,7 +66,7 @@
<delta.version>2.4.0</delta.version>
<delta.binary.version>24</delta.binary.version>
<celeborn.version>0.5.1</celeborn.version>
- <uniffle.version>0.8.0</uniffle.version>
+ <uniffle.version>0.9.0</uniffle.version>
<arrow.version>15.0.0</arrow.version>
<arrow-gluten.version>15.0.0-gluten</arrow-gluten.version>
<arrow-memory.artifact>arrow-memory-unsafe</arrow-memory.artifact>
diff --git a/tools/gluten-it/pom.xml b/tools/gluten-it/pom.xml
index 43bc3ae09..5ab633252 100644
--- a/tools/gluten-it/pom.xml
+++ b/tools/gluten-it/pom.xml
@@ -22,7 +22,7 @@
<scala.binary.version>2.12</scala.binary.version>
<spark.major.version>3</spark.major.version>
<celeborn.version>0.3.2-incubating</celeborn.version>
- <uniffle.version>0.8.0</uniffle.version>
+ <uniffle.version>0.9.0</uniffle.version>
<gluten.version>1.2.0-SNAPSHOT</gluten.version>
<guava.version>32.0.1-jre</guava.version>
<tpch.version>1.1</tpch.version>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]