This is an automated email from the ASF dual-hosted git repository.
jackylee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new b9408f995 [VL] Add uniffle integration (#3767)
b9408f995 is described below
commit b9408f9955a610a3ddf1ec1e56903a29e12314eb
Author: summaryzb <[email protected]>
AuthorDate: Thu Apr 11 17:30:26 2024 +0800
[VL] Add uniffle integration (#3767)
---
.github/workflows/velox_docker.yml | 61 +++++
.../clickhouse/CHSparkPlanExecApi.scala | 2 +
.../gluten/backendsapi/velox/VeloxBackend.scala | 3 +-
cpp/core/jni/JniWrapper.cc | 17 ++
cpp/core/shuffle/Options.h | 2 +-
gluten-uniffle/package/pom.xml | 29 +++
gluten-uniffle/pom.xml | 89 +++++++
gluten-uniffle/velox/pom.xml | 62 +++++
.../gluten/uniffle/UniffleShuffleManager.java | 87 +++++++
.../writer/VeloxUniffleColumnarShuffleWriter.java | 264 +++++++++++++++++++++
.../spark/shuffle/writer/PartitionPusher.scala | 27 +++
package/pom.xml | 10 +
pom.xml | 10 +
.../scala/org/apache/gluten/GlutenConfig.scala | 5 +
.../apache/gluten/integration/tpc/TpcMixin.java | 7 +-
.../apache/gluten/integration/tpc/Constants.scala | 19 ++
tools/gluten-it/package/pom.xml | 14 ++
tools/gluten-it/pom.xml | 1 +
18 files changed, 705 insertions(+), 4 deletions(-)
diff --git a/.github/workflows/velox_docker.yml
b/.github/workflows/velox_docker.yml
index 4d65df5b2..17a67d535 100644
--- a/.github/workflows/velox_docker.yml
+++ b/.github/workflows/velox_docker.yml
@@ -21,6 +21,7 @@ on:
- '.github/workflows/velox_docker.yml'
- 'pom.xml'
- 'backends-velox/**'
+ - 'gluten-uniffle/**'
- 'gluten-celeborn/common/**'
- 'gluten-celeborn/package/**'
- 'gluten-celeborn/velox/**'
@@ -337,6 +338,66 @@ jobs:
--local --preset=velox --benchmark-type=ds --error-on-memleak
-s=30.0 --off-heap-size=8g --threads=12 --shuffle-partitions=72 --iterations=1
\
--skip-data-gen --random-kill-tasks
+  # Runs the gluten-it TPC-H comparison against a locally started Uniffle
+  # coordinator and shuffle server, using the Velox backend built with the
+  # rss-uniffle profile.
+  run-tpc-test-centos8-uniffle:
+    needs: build-native-lib
+    strategy:
+      fail-fast: false
+      matrix:
+        spark: ["spark-3.2"]
+    runs-on: ubuntu-20.04
+    container: centos:8
+    steps:
+      - uses: actions/checkout@v2
+      - name: Download All Artifacts
+        uses: actions/download-artifact@v2
+        with:
+          name: velox-native-lib-${{github.sha}}
+          path: ./cpp/build/releases
+      # Point yum at vault.centos.org instead of the mirror list.
+      - name: Update mirror list
+        run: |
+          sed -i -e "s|mirrorlist=|#mirrorlist=|g" /etc/yum.repos.d/CentOS-* || true
+          sed -i -e "s|#baseurl=http://mirror.centos.org|baseurl=http://vault.centos.org|g" /etc/yum.repos.d/CentOS-* || true
+      - name: Setup java and maven
+        run: |
+          yum update -y && yum install -y java-1.8.0-openjdk-devel wget git
+          wget https://downloads.apache.org/maven/maven-3/3.8.8/binaries/apache-maven-3.8.8-bin.tar.gz
+          tar -xvf apache-maven-3.8.8-bin.tar.gz
+          mv apache-maven-3.8.8 /usr/lib/maven
+      - name: Build for Spark ${{ matrix.spark }}
+        run: |
+          cd $GITHUB_WORKSPACE/ && \
+          export MAVEN_HOME=/usr/lib/maven && \
+          export PATH=${PATH}:${MAVEN_HOME}/bin && \
+          mvn clean install -P${{ matrix.spark }} -Pbackends-velox -Prss-uniffle -DskipTests
+      # The script below:
+      #   1. clones Uniffle branch-0.8, deletes lines 250, 228 and 226 of
+      #      ShuffleTaskManager.java and rebuilds the server
+      #      (NOTE(review): the reason for removing those lines is not recorded
+      #      here - confirm against the Uniffle sources before changing them);
+      #   2. unpacks the 0.8.0 binary release and Hadoop 2.8.5 under /opt and
+      #      copies the rebuilt shuffle-server jar into the release;
+      #   3. writes rss-env.sh, coordinator.conf and server.conf, then starts
+      #      the coordinator and one shuffle server;
+      #   4. builds gluten-it and runs queries-compare with the
+      #      velox-with-uniffle preset.
+      - name: TPC-H SF1.0 && TPC-DS SF1.0 Parquet local spark3.2 with uniffle 0.8.0
+        run: |
+          export MAVEN_HOME=/usr/lib/maven && \
+          export PATH=${PATH}:${MAVEN_HOME}/bin && \
+          export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk && \
+          cd /opt && \
+          git clone -b branch-0.8 https://github.com/apache/incubator-uniffle.git && \
+          cd incubator-uniffle && \
+          sed -i '250d' ./server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java && \
+          sed -i '228d' ./server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java && \
+          sed -i '226d' ./server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java && \
+          mvn clean install -Phadoop2.8 -DskipTests
+          cd /opt && \
+          wget -nv https://archive.apache.org/dist/incubator/uniffle/0.8.0/apache-uniffle-0.8.0-incubating-bin.tar.gz && \
+          tar xzf apache-uniffle-0.8.0-incubating-bin.tar.gz -C /opt/ && mv /opt/rss-0.8.0-hadoop2.8 /opt/uniffle && \
+          wget -nv https://archive.apache.org/dist/hadoop/common/hadoop-2.8.5/hadoop-2.8.5.tar.gz && \
+          tar xzf hadoop-2.8.5.tar.gz -C /opt/
+          rm -f /opt/uniffle/jars/server/shuffle-server-0.8.0-SNAPSHOT.jar
+          cp /opt/incubator-uniffle/server/target/shuffle-server-0.8.1-SNAPSHOT.jar /opt/uniffle/jars/server/
+          rm -rf /opt/incubator-uniffle
+          cd /opt/uniffle && mkdir shuffle_data && \
+          bash -c "echo -e 'XMX_SIZE=16g\nHADOOP_HOME=/opt/hadoop-2.8.5' > ./bin/rss-env.sh" && \
+          bash -c "echo -e 'rss.coordinator.shuffle.nodes.max 1\nrss.rpc.server.port 19999' > ./conf/coordinator.conf" && \
+          bash -c "echo -e 'rss.server.app.expired.withoutHeartbeat 7200000\nrss.server.heartbeat.delay 3000\nrss.rpc.server.port 19997\nrss.jetty.http.port 19996\nrss.server.netty.port 19995\nrss.storage.basePath /opt/uniffle/shuffle_data\nrss.storage.type MEMORY_LOCALFILE\nrss.coordinator.quorum localhost:19999\nrss.server.flush.thread.alive 10\nrss.server.single.buffer.flush.threshold 64m' > ./conf/server.conf" && \
+          bash ./bin/start-coordinator.sh && bash ./bin/start-shuffle-server.sh
+          cd $GITHUB_WORKSPACE/tools/gluten-it && mvn clean install -Pspark-3.2,rss-uniffle && \
+          GLUTEN_IT_JVM_ARGS=-Xmx5G sbin/gluten-it.sh queries-compare \
+            --local --preset=velox-with-uniffle --benchmark-type=h --error-on-memleak --off-heap-size=10g -s=1.0 --threads=16 --iterations=1
+
run-tpc-test-ubuntu-2204-celeborn:
needs: build-native-lib
strategy:
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
index 57fd81ba0..791a635b0 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
@@ -473,6 +473,8 @@ class CHSparkPlanExecApi extends SparkPlanExecApi {
val constructor =
clazz.getConstructor(classOf[SQLMetric], classOf[SQLMetric],
classOf[SQLMetric])
constructor.newInstance(readBatchNumRows, numOutputRows,
dataSize).asInstanceOf[Serializer]
+ } else if (GlutenConfig.getConf.isUseUniffleShuffleManager) {
+ throw new UnsupportedOperationException("temporarily uniffle not support
ch ")
} else {
new CHColumnarBatchSerializer(readBatchNumRows, numOutputRows, dataSize)
}
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
index a869ba23b..1e6de343b 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
@@ -349,7 +349,8 @@ object BackendSettings extends BackendSettingsApi {
override def supportColumnarShuffleExec(): Boolean = {
GlutenConfig.getConf.isUseColumnarShuffleManager ||
- GlutenConfig.getConf.isUseCelebornShuffleManager
+ GlutenConfig.getConf.isUseCelebornShuffleManager ||
+ GlutenConfig.getConf.isUseUniffleShuffleManager
}
override def enableJoinKeysRewrite(): Boolean = false
diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc
index 51e1da652..6f5b2e332 100644
--- a/cpp/core/jni/JniWrapper.cc
+++ b/cpp/core/jni/JniWrapper.cc
@@ -915,6 +915,23 @@ JNIEXPORT jlong JNICALL
Java_org_apache_gluten_vectorized_ShuffleWriterJniWrappe
std::move(partitionWriterOptions),
memoryManager->getArrowMemoryPool(),
std::move(celebornClient));
+ } else if (partitionWriterType == "uniffle") {
+ jclass unifflePartitionPusherClass =
+ createGlobalClassReferenceOrError(env,
"Lorg/apache/spark/shuffle/writer/PartitionPusher;");
+ jmethodID unifflePushPartitionDataMethod =
+ getMethodIdOrError(env, unifflePartitionPusherClass,
"pushPartitionData", "(I[BI)I");
+ JavaVM* vm;
+ if (env->GetJavaVM(&vm) != JNI_OK) {
+ throw gluten::GlutenException("Unable to get JavaVM instance");
+ }
+ // rename CelebornClient RssClient
+ std::shared_ptr<CelebornClient> uniffleClient =
+ std::make_shared<CelebornClient>(vm, partitionPusher,
unifflePushPartitionDataMethod);
+ partitionWriter = std::make_unique<CelebornPartitionWriter>(
+ numPartitions,
+ std::move(partitionWriterOptions),
+ memoryManager->getArrowMemoryPool(),
+ std::move(uniffleClient));
} else {
throw gluten::GlutenException("Unrecognizable partition writer type: " +
partitionWriterType);
}
diff --git a/cpp/core/shuffle/Options.h b/cpp/core/shuffle/Options.h
index 6531f150e..6a793daa3 100644
--- a/cpp/core/shuffle/Options.h
+++ b/cpp/core/shuffle/Options.h
@@ -33,7 +33,7 @@ static constexpr double kDefaultBufferReallocThreshold = 0.25;
static constexpr double kDefaultMergeBufferThreshold = 0.25;
static constexpr bool kEnableBufferedWrite = true;
-enum PartitionWriterType { kLocal, kCeleborn };
+enum PartitionWriterType { kLocal, kCeleborn, kUniffle };
struct ShuffleReaderOptions {
arrow::Compression::type compressionType =
arrow::Compression::type::LZ4_FRAME;
diff --git a/gluten-uniffle/package/pom.xml b/gluten-uniffle/package/pom.xml
new file mode 100644
index 000000000..64c0fadd3
--- /dev/null
+++ b/gluten-uniffle/package/pom.xml
@@ -0,0 +1,29 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <!-- Packaging module of the Uniffle integration; inherits from gluten-uniffle. -->
+ <parent>
+ <artifactId>gluten-uniffle</artifactId>
+ <groupId>org.apache.gluten</groupId>
+ <version>1.2.0-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>gluten-uniffle-package</artifactId>
+ <packaging>jar</packaging>
+ <name>Gluten Uniffle Package</name>
+
+ <!-- No dependencies are declared unconditionally: gluten-uniffle-velox is
+ pulled in only when the backends-velox profile is active. -->
+ <profiles>
+ <profile>
+ <id>backends-velox</id>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.gluten</groupId>
+ <artifactId>gluten-uniffle-velox</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
+ </profile>
+ </profiles>
+</project>
diff --git a/gluten-uniffle/pom.xml b/gluten-uniffle/pom.xml
new file mode 100644
index 000000000..ce38bdd29
--- /dev/null
+++ b/gluten-uniffle/pom.xml
@@ -0,0 +1,89 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <!-- Aggregator (pom-packaging) module for the Uniffle integration. It
+ declares the dependencies and build plugins shared by its submodules. -->
+ <parent>
+ <artifactId>gluten-parent</artifactId>
+ <groupId>org.apache.gluten</groupId>
+ <version>1.2.0-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>gluten-uniffle</artifactId>
+ <packaging>pom</packaging>
+ <name>Gluten Uniffle</name>
+
+ <!-- Every dependency below uses the "provided" scope, so none of them is
+ bundled by this module. -->
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>${hadoop.version}</version>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>net.minidev</groupId>
+ <artifactId>json-smart</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-json</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.gluten</groupId>
+ <artifactId>gluten-core</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <!-- Shaded Uniffle Spark client; the artifact name is selected by
+ ${spark.major.version} and the version by ${uniffle.version}. -->
+ <dependency>
+ <groupId>org.apache.uniffle</groupId>
+ <artifactId>rss-client-spark${spark.major.version}-shaded</artifactId>
+ <version>${uniffle.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-core_${scala.binary.version}</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-sql_${scala.binary.version}</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+
+ <!-- Plugins are listed without version or configuration here; those are
+ presumably managed by gluten-parent (confirm in the root pom). -->
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>net.alchim31.maven</groupId>
+ <artifactId>scala-maven-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>org.scalastyle</groupId>
+ <artifactId>scalastyle-maven-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-checkstyle-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>com.diffplug.spotless</groupId>
+ <artifactId>spotless-maven-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+ <!-- The velox and package submodules are built only when the backends-velox
+ profile is active. NOTE(review): the properties element below is empty. -->
+ <profiles>
+ <profile>
+ <id>backends-velox</id>
+ <properties>
+ </properties>
+ <modules>
+ <module>velox</module>
+ <module>package</module>
+ </modules>
+ </profile>
+ </profiles>
+</project>
diff --git a/gluten-uniffle/velox/pom.xml b/gluten-uniffle/velox/pom.xml
new file mode 100755
index 000000000..19865fa69
--- /dev/null
+++ b/gluten-uniffle/velox/pom.xml
@@ -0,0 +1,62 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns="http://maven.apache.org/POM/4.0.0"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <!-- Velox-backend module of the Uniffle integration; inherits the shared
+ dependencies and plugins from gluten-uniffle. -->
+ <parent>
+ <artifactId>gluten-uniffle</artifactId>
+ <groupId>org.apache.gluten</groupId>
+ <version>1.2.0-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>gluten-uniffle-velox</artifactId>
+ <packaging>jar</packaging>
+ <name>Gluten Uniffle Velox</name>
+
+ <!-- Both Gluten modules are "provided": this jar does not bundle them. -->
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.gluten</groupId>
+ <artifactId>backends-velox</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.gluten</groupId>
+ <artifactId>gluten-data</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+
+ <!-- Compiled classes go to a directory keyed by the Scala binary version. -->
+ <build>
+
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+ <plugins>
+ <plugin>
+ <groupId>net.alchim31.maven</groupId>
+ <artifactId>scala-maven-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>org.scalastyle</groupId>
+ <artifactId>scalastyle-maven-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-checkstyle-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>com.diffplug.spotless</groupId>
+ <artifactId>spotless-maven-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git
a/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/gluten/uniffle/UniffleShuffleManager.java
b/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/gluten/uniffle/UniffleShuffleManager.java
new file mode 100644
index 000000000..9ae62f8b8
--- /dev/null
+++
b/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/gluten/uniffle/UniffleShuffleManager.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.shuffle.gluten.uniffle;
+
+import org.apache.spark.ShuffleDependency;
+import org.apache.spark.SparkConf;
+import org.apache.spark.SparkEnv;
+import org.apache.spark.TaskContext;
+import org.apache.spark.executor.ShuffleWriteMetrics;
+import org.apache.spark.shuffle.ColumnarShuffleDependency;
+import org.apache.spark.shuffle.RssShuffleHandle;
+import org.apache.spark.shuffle.RssShuffleManager;
+import org.apache.spark.shuffle.RssSparkConfig;
+import org.apache.spark.shuffle.ShuffleHandle;
+import org.apache.spark.shuffle.ShuffleWriteMetricsReporter;
+import org.apache.spark.shuffle.ShuffleWriter;
+import org.apache.spark.shuffle.writer.VeloxUniffleColumnarShuffleWriter;
+import org.apache.uniffle.common.exception.RssException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Uniffle shuffle manager for Gluten.
+ *
+ * <p>Handles whose dependency is a {@link ColumnarShuffleDependency} are written with {@link
+ * VeloxUniffleColumnarShuffleWriter}; every other handle is delegated to {@link
+ * RssShuffleManager}.
+ */
+public class UniffleShuffleManager extends RssShuffleManager {
+  private static final Logger LOG = LoggerFactory.getLogger(UniffleShuffleManager.class);
+
+  /** Returns true when the current SparkEnv reports the executor id "driver". */
+  private boolean isDriver() {
+    return "driver".equals(SparkEnv.get().executorId());
+  }
+
+  /**
+   * Creates the manager and sets the RSS row-based flag to "false" on {@code conf}.
+   *
+   * <p>NOTE(review): the flag is written after the parent constructor has already run; whether
+   * the parent reads it lazily cannot be told from this class - confirm against RssShuffleManager.
+   */
+  public UniffleShuffleManager(SparkConf conf, boolean isDriver) {
+    super(conf, isDriver);
+    final String rowBasedKey = RssSparkConfig.SPARK_RSS_CONFIG_PREFIX + RssSparkConfig.RSS_ROW_BASED;
+    conf.set(rowBasedKey, "false");
+  }
+
+  /** Pure delegation to the parent implementation. */
+  @Override
+  public <K, V, C> ShuffleHandle registerShuffle(
+      int shuffleId, ShuffleDependency<K, V, C> dependency) {
+    return super.registerShuffle(shuffleId, dependency);
+  }
+
+  /**
+   * Returns the columnar writer for columnar dependencies, otherwise the parent's writer.
+   *
+   * @throws RssException if {@code handle} is not an {@link RssShuffleHandle}
+   */
+  @Override
+  public <K, V> ShuffleWriter<K, V> getWriter(
+      ShuffleHandle handle, long mapId, TaskContext context, ShuffleWriteMetricsReporter metrics) {
+    if (!(handle instanceof RssShuffleHandle)) {
+      throw new RssException("Unexpected ShuffleHandle:" + handle.getClass().getName());
+    }
+    RssShuffleHandle<K, V, V> rssHandle = (RssShuffleHandle<K, V, V>) handle;
+
+    // Row-based shuffles keep the stock Uniffle writer.
+    if (!(rssHandle.getDependency() instanceof ColumnarShuffleDependency)) {
+      return super.getWriter(handle, mapId, context, metrics);
+    }
+
+    setPusherAppId(rssHandle);
+    // Task id has the form "<taskAttemptId>_<attemptNumber>".
+    String taskId = context.taskAttemptId() + "_" + context.attemptNumber();
+    // Wrap the reporter when one is supplied; otherwise use the task's own write metrics.
+    ShuffleWriteMetrics writeMetrics =
+        metrics != null
+            ? new WriteMetrics(metrics)
+            : context.taskMetrics().shuffleWriteMetrics();
+    return new VeloxUniffleColumnarShuffleWriter<>(
+        context.partitionId(),
+        rssHandle.getAppId(),
+        rssHandle.getShuffleId(),
+        taskId,
+        context.taskAttemptId(),
+        writeMetrics,
+        this,
+        sparkConf,
+        shuffleWriteClient,
+        rssHandle,
+        this::markFailedTask,
+        context);
+  }
+}
diff --git
a/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java
b/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java
new file mode 100644
index 000000000..17cfce1c0
--- /dev/null
+++
b/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.shuffle.writer;
+
+import org.apache.gluten.GlutenConfig;
+import org.apache.gluten.columnarbatch.ColumnarBatches;
+import org.apache.gluten.memory.memtarget.MemoryTarget;
+import org.apache.gluten.memory.memtarget.Spiller;
+import org.apache.gluten.memory.memtarget.Spillers;
+import org.apache.gluten.memory.nmm.NativeMemoryManagers;
+import org.apache.gluten.vectorized.ShuffleWriterJniWrapper;
+import org.apache.gluten.vectorized.SplitResult;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.TaskContext;
+import org.apache.spark.executor.ShuffleWriteMetrics;
+import org.apache.spark.internal.config.package$;
+import org.apache.spark.memory.SparkMemoryUtil;
+import org.apache.spark.scheduler.MapStatus;
+import org.apache.spark.shuffle.ColumnarShuffleDependency;
+import org.apache.spark.shuffle.GlutenShuffleUtils;
+import org.apache.spark.shuffle.RssShuffleHandle;
+import org.apache.spark.shuffle.RssShuffleManager;
+import org.apache.spark.shuffle.RssSparkConfig;
+import org.apache.spark.sql.vectorized.ColumnarBatch;
+import org.apache.spark.util.SparkResourceUtil;
+import org.apache.uniffle.client.api.ShuffleWriteClient;
+import org.apache.uniffle.common.ShuffleBlockInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+
+import scala.Option;
+import scala.Product2;
+import scala.collection.Iterator;
+
+/**
+ * Shuffle writer that splits Gluten columnar batches through the native shuffle writer and
+ * forwards the resulting partition bytes to Uniffle.
+ *
+ * <p>The native writer is created lazily on the first non-empty batch with partition writer type
+ * "uniffle"; it calls back into {@link #doAddByte(int, byte[], int)} via {@link PartitionPusher}.
+ */
+public class VeloxUniffleColumnarShuffleWriter<K, V> extends RssShuffleWriter<K, V, V> {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(VeloxUniffleColumnarShuffleWriter.class);
+
+  // Handle of the native shuffle writer; -1 means "not created" or "already closed".
+  private long nativeShuffleWriter = -1L;
+
+  private boolean stopping = false;
+  private int compressThreshold = GlutenConfig.getConf().columnarShuffleCompressionThreshold();
+  private double reallocThreshold = GlutenConfig.getConf().columnarShuffleReallocThreshold();
+  // Stays null when Spark's shuffle compression setting is disabled.
+  private String compressionCodec;
+  private int compressionLevel;
+  private int partitionId;
+
+  private ShuffleWriterJniWrapper jniWrapper = ShuffleWriterJniWrapper.create();
+  private SplitResult splitResult;
+  private int nativeBufferSize = GlutenConfig.getConf().maxBatchSize();
+  private int bufferSize;
+  private PartitionPusher partitionPusher;
+
+  private final ColumnarShuffleDependency<K, V, V> columnarDep;
+  private final SparkConf sparkConf;
+
+  /** Currently available off-heap memory divided by the number of task slots. */
+  private long availableOffHeapPerTask() {
+    return SparkMemoryUtil.getCurrentAvailableOffHeapMemory()
+        / SparkResourceUtil.getTaskSlots(sparkConf);
+  }
+
+  /**
+   * Builds the writer; all arguments except {@code partitionId} are passed straight to the
+   * parent {@code RssShuffleWriter}. The dependency of {@code rssHandle} is cast to {@link
+   * ColumnarShuffleDependency}.
+   */
+  public VeloxUniffleColumnarShuffleWriter(
+      int partitionId,
+      String appId,
+      int shuffleId,
+      String taskId,
+      long taskAttemptId,
+      ShuffleWriteMetrics shuffleWriteMetrics,
+      RssShuffleManager shuffleManager,
+      SparkConf sparkConf,
+      ShuffleWriteClient shuffleWriteClient,
+      RssShuffleHandle<K, V, V> rssHandle,
+      Function<String, Boolean> taskFailureCallback,
+      TaskContext context) {
+    super(
+        appId,
+        shuffleId,
+        taskId,
+        taskAttemptId,
+        shuffleWriteMetrics,
+        shuffleManager,
+        sparkConf,
+        shuffleWriteClient,
+        rssHandle,
+        taskFailureCallback,
+        context);
+    columnarDep = (ColumnarShuffleDependency<K, V, V>) rssHandle.getDependency();
+    this.partitionId = partitionId;
+    this.sparkConf = sparkConf;
+    bufferSize =
+        (int)
+            sparkConf.getSizeAsBytes(
+                RssSparkConfig.RSS_WRITER_BUFFER_SIZE.key(),
+                RssSparkConfig.RSS_WRITER_BUFFER_SIZE.defaultValue().get());
+    if ((boolean) sparkConf.get(package$.MODULE$.SHUFFLE_COMPRESS())) {
+      compressionCodec = GlutenShuffleUtils.getCompressionCodec(sparkConf);
+    }
+    compressionLevel = GlutenShuffleUtils.getCompressionLevel(sparkConf, compressionCodec, null);
+  }
+
+  /**
+   * Splits every non-empty {@link ColumnarBatch} natively, stops the native writer, then flushes
+   * the remaining Uniffle blocks and (unless memory shuffle is enabled) sends the commit.
+   *
+   * @throws IllegalStateException if no native writer was created by the time the input is
+   *     exhausted
+   */
+  @Override
+  protected void writeImpl(Iterator<Product2<K, V>> records) throws IOException {
+    if (!records.hasNext() && !isMemoryShuffleEnabled) {
+      super.sendCommit();
+      return;
+    }
+    // writer already init
+    partitionPusher = new PartitionPusher(this);
+    while (records.hasNext()) {
+      ColumnarBatch cb = (ColumnarBatch) (records.next()._2());
+      if (cb.numRows() == 0 || cb.numCols() == 0) {
+        LOG.info("Skip ColumnarBatch of 0 rows or 0 cols");
+      } else {
+        long handle = ColumnarBatches.getNativeHandle(cb);
+        if (nativeShuffleWriter == -1) {
+          nativeShuffleWriter =
+              jniWrapper.makeForRSS(
+                  columnarDep.nativePartitioning(),
+                  nativeBufferSize,
+                  // use field do this
+                  compressionCodec,
+                  compressionLevel,
+                  compressThreshold,
+                  GlutenConfig.getConf().columnarShuffleCompressionMode(),
+                  bufferSize,
+                  partitionPusher,
+                  NativeMemoryManagers.create(
+                          "UniffleShuffleWriter",
+                          new Spiller() {
+                            @Override
+                            public long spill(MemoryTarget self, long size) {
+                              if (nativeShuffleWriter == -1) {
+                                throw new IllegalStateException(
+                                    "Fatal: spill() called before a shuffle writer "
+                                        + "evaluator is created. This behavior should be "
+                                        + "optimized by moving memory "
+                                        + "allocations from make() to split()");
+                              }
+                              LOG.info(
+                                  "Gluten shuffle writer: Trying to push {} bytes of data", size);
+                              long pushed =
+                                  jniWrapper.nativeEvict(nativeShuffleWriter, size, false);
+                              LOG.info(
+                                  "Gluten shuffle writer: Pushed {} / {} bytes of data",
+                                  pushed,
+                                  size);
+                              return pushed;
+                            }
+
+                            @Override
+                            public Set<Phase> applicablePhases() {
+                              return Spillers.PHASE_SET_SPILL_ONLY;
+                            }
+                          })
+                      .getNativeInstanceHandle(),
+                  handle,
+                  taskAttemptId,
+                  GlutenShuffleUtils.getStartPartitionId(
+                      columnarDep.nativePartitioning(), partitionId),
+                  "uniffle",
+                  reallocThreshold);
+        }
+        long startTime = System.nanoTime();
+        long bytes =
+            jniWrapper.split(nativeShuffleWriter, cb.numRows(), handle, availableOffHeapPerTask());
+        LOG.debug("jniWrapper.split rows {}, split bytes {}", cb.numRows(), bytes);
+        columnarDep.metrics().get("dataSize").get().add(bytes);
+        // this metric replace part of uniffle shuffle write time
+        columnarDep.metrics().get("splitTime").get().add(System.nanoTime() - startTime);
+        columnarDep.metrics().get("numInputRows").get().add(cb.numRows());
+        columnarDep.metrics().get("inputBatches").get().add(1);
+        shuffleWriteMetrics.incRecordsWritten(cb.numRows());
+      }
+    }
+
+    long startTime = System.nanoTime();
+    LOG.info("nativeShuffleWriter value {}", nativeShuffleWriter);
+    // NOTE(review): this also fires when every batch was skipped as empty, or when the input is
+    // empty while memory shuffle is enabled - confirm that failing the task is intended there.
+    if (nativeShuffleWriter == -1L) {
+      throw new IllegalStateException("nativeShuffleWriter should not be -1L");
+    }
+    splitResult = jniWrapper.stop(nativeShuffleWriter);
+    // Attribute to splitTime only the part of stop() not already counted as push/write/compress.
+    columnarDep
+        .metrics()
+        .get("splitTime")
+        .get()
+        .add(
+            System.nanoTime()
+                - startTime
+                - splitResult.getTotalPushTime()
+                - splitResult.getTotalWriteTime()
+                - splitResult.getTotalCompressTime());
+
+    shuffleWriteMetrics.incBytesWritten(splitResult.getTotalBytesWritten());
+    shuffleWriteMetrics.incWriteTime(
+        splitResult.getTotalWriteTime() + splitResult.getTotalPushTime());
+    // partitionLengths is calculate in uniffle side
+
+    long pushMergedDataTime = System.nanoTime();
+    // clear all
+    sendRestBlockAndWait();
+    if (!isMemoryShuffleEnabled) {
+      super.sendCommit();
+    }
+    // Difference of two System.nanoTime() readings, i.e. nanoseconds.
+    long writeDurationNs = System.nanoTime() - pushMergedDataTime;
+    shuffleWriteMetrics.incWriteTime(writeDurationNs);
+    // Convert nanoseconds to milliseconds for the log line (the message says "ms").
+    LOG.info(
+        "Finish write shuffle with rest write {} ms",
+        TimeUnit.NANOSECONDS.toMillis(writeDurationNs));
+  }
+
+  /**
+   * Closes the native writer and delegates to the parent on the first call; later calls return
+   * an empty option.
+   */
+  @Override
+  public Option<MapStatus> stop(boolean success) {
+    if (!stopping) {
+      stopping = true;
+      closeShuffleWriter();
+      return super.stop(success);
+    }
+    return Option.empty();
+  }
+
+  /** Closes the native writer if one exists and resets the handle to -1. */
+  private void closeShuffleWriter() {
+    if (nativeShuffleWriter != -1) {
+      jniWrapper.close(nativeShuffleWriter);
+      nativeShuffleWriter = -1;
+    }
+  }
+
+  /** Drains the buffer manager, submits the drained blocks and checks the send result. */
+  private void sendRestBlockAndWait() {
+    List<ShuffleBlockInfo> shuffleBlockInfos = super.getBufferManager().clear();
+    super.processShuffleBlockInfos(shuffleBlockInfos);
+    // make checkBlockSendResult no arguments
+    super.internalCheckBlockSendResult();
+  }
+
+  /**
+   * Adds {@code length} bytes of {@code data} for {@code partitionId} to the buffer manager and
+   * submits any blocks it returns. Invoked through {@link PartitionPusher}.
+   *
+   * @return {@code length}, unchanged
+   */
+  public int doAddByte(int partitionId, byte[] data, int length) {
+    List<ShuffleBlockInfo> shuffleBlockInfos =
+        super.getBufferManager()
+            .addPartitionData(partitionId, data, length, System.currentTimeMillis());
+    super.processShuffleBlockInfos(shuffleBlockInfos);
+    return length;
+  }
+}
diff --git
a/gluten-uniffle/velox/src/main/scala/org/apache/spark/shuffle/writer/PartitionPusher.scala
b/gluten-uniffle/velox/src/main/scala/org/apache/spark/shuffle/writer/PartitionPusher.scala
new file mode 100644
index 000000000..eb99fd23b
--- /dev/null
+++
b/gluten-uniffle/velox/src/main/scala/org/apache/spark/shuffle/writer/PartitionPusher.scala
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.shuffle.writer
+
+import java.io.IOException
+
+/**
+ * Thin callback that forwards partition bytes to the owning Uniffle writer.
+ *
+ * The native side resolves `pushPartitionData` by name with JNI signature `(I[BI)I`, so the
+ * method name and parameter types must stay as they are.
+ */
+class PartitionPusher(val uniffleWriter: VeloxUniffleColumnarShuffleWriter[_, _]) {
+
+  /** Delegates to `doAddByte` on the writer and returns its result. */
+  @throws[IOException]
+  def pushPartitionData(partitionId: Int, buffer: Array[Byte], length: Int): Int =
+    uniffleWriter.doAddByte(partitionId, buffer, length)
+}
diff --git a/package/pom.xml b/package/pom.xml
index 678f13b67..913621ee1 100644
--- a/package/pom.xml
+++ b/package/pom.xml
@@ -80,6 +80,16 @@
</dependency>
</dependencies>
</profile>
+ <profile>
+ <id>rss-uniffle</id>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.gluten</groupId>
+ <artifactId>gluten-uniffle-package</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
+ </profile>
</profiles>
<build>
diff --git a/pom.xml b/pom.xml
index b5833b3ad..b65f17314 100644
--- a/pom.xml
+++ b/pom.xml
@@ -52,6 +52,7 @@
<sparkshim.artifactId>spark-sql-columnar-shims-spark32</sparkshim.artifactId>
<celeborn.version>0.3.2-incubating</celeborn.version>
<arrow.version>14.0.1</arrow.version>
+ <uniffle.version>0.8.0</uniffle.version>
<arrow-memory.artifact>arrow-memory-unsafe</arrow-memory.artifact>
<hadoop.version>2.7.4</hadoop.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
@@ -237,6 +238,15 @@
<module>gluten-celeborn</module>
</modules>
</profile>
+ <profile>
+ <id>rss-uniffle</id>
+ <activation>
+ <activeByDefault>false</activeByDefault>
+ </activation>
+ <modules>
+ <module>gluten-uniffle</module>
+ </modules>
+ </profile>
<profile>
<id>delta</id>
<activation>
diff --git a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
index 9039d2b8d..bc98ea25e 100644
--- a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
@@ -121,6 +121,11 @@ class GlutenConfig(conf: SQLConf) extends Logging {
.getConfString("spark.shuffle.manager", "sort")
.contains("celeborn")
+  // True when spark.shuffle.manager names a Uniffle shuffle manager. Gluten's own manager is
+  // org.apache.spark.shuffle.gluten.uniffle.UniffleShuffleManager, whose name does not contain
+  // "RssShuffleManager", so both spellings are accepted.
+  def isUseUniffleShuffleManager: Boolean = {
+    val shuffleManager = conf.getConfString("spark.shuffle.manager", "sort")
+    shuffleManager.contains("UniffleShuffleManager") ||
+    shuffleManager.contains("RssShuffleManager")
+  }
+
def enableColumnarShuffle: Boolean = conf.getConf(COLUMNAR_SHUFFLE_ENABLED)
def enablePreferColumnar: Boolean = conf.getConf(COLUMNAR_PREFER_ENABLED)
diff --git
a/tools/gluten-it/common/src/main/java/org/apache/gluten/integration/tpc/TpcMixin.java
b/tools/gluten-it/common/src/main/java/org/apache/gluten/integration/tpc/TpcMixin.java
index 15a8e66ca..c0313fe77 100644
---
a/tools/gluten-it/common/src/main/java/org/apache/gluten/integration/tpc/TpcMixin.java
+++
b/tools/gluten-it/common/src/main/java/org/apache/gluten/integration/tpc/TpcMixin.java
@@ -35,10 +35,10 @@ public class TpcMixin {
@CommandLine.Option(required = true, names = {"--benchmark-type"},
description = "TPC benchmark type: h, ds", defaultValue = "h")
private String benchmarkType;
- @CommandLine.Option(names = {"-p", "--preset"}, description = "Preset used: vanilla, velox, velox-with-celeborn...", defaultValue = "velox")
+ @CommandLine.Option(names = {"-p", "--preset"}, description = "Preset used: vanilla, velox, velox-with-celeborn, velox-with-uniffle...", defaultValue = "velox")
private String preset;
- @CommandLine.Option(names = {"--baseline-preset"}, description = "Baseline preset used: vanilla, velox, velox-with-celeborn...", defaultValue = "vanilla")
+ @CommandLine.Option(names = {"--baseline-preset"}, description = "Baseline preset used: vanilla, velox, velox-with-celeborn, velox-with-uniffle...", defaultValue = "vanilla")
private String baselinePreset;
@CommandLine.Option(names = {"--log-level"}, description = "Set log level: 0 for DEBUG, 1 for INFO, 2 for WARN", defaultValue = "2")
@@ -92,6 +92,9 @@ public class TpcMixin {
case "velox-with-celeborn":
conf = Constants.VELOX_WITH_CELEBORN_CONF();
break;
+ case "velox-with-uniffle":
+ conf = Constants.VELOX_WITH_UNIFFLE_CONF();
+ break;
default:
throw new IllegalArgumentException("Preset not found: " + preset);
}
diff --git
a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/tpc/Constants.scala
b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/tpc/Constants.scala
index 44a025932..7564f6dce 100644
--- a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/tpc/Constants.scala
+++ b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/tpc/Constants.scala
@@ -58,6 +58,25 @@ object Constants {
.set("spark.celeborn.push.data.timeout", "600s")
.set("spark.celeborn.push.limit.inFlight.timeout", "1200s")
+ val VELOX_WITH_UNIFFLE_CONF: SparkConf = new SparkConf(false)
+ .set("spark.gluten.sql.columnar.forceShuffledHashJoin", "true")
+ .set("spark.sql.parquet.enableVectorizedReader", "true")
+ .set("spark.plugins", "org.apache.gluten.GlutenPlugin")
+ .set("spark.shuffle.manager", "org.apache.spark.shuffle.gluten.uniffle.UniffleShuffleManager")
+ .set("spark.rss.coordinator.quorum", "localhost:19999")
+ .set("spark.rss.storage.type", "MEMORY_LOCALFILE")
+ .set("spark.rss.client.type", "GRPC_NETTY")
+ .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+ .set("spark.shuffle.service.enabled", "false")
+ .set("spark.sql.adaptive.localShuffleReader.enabled", "false")
+ .set("spark.dynamicAllocation.enabled", "false")
+ .set("spark.sql.optimizer.runtime.bloomFilter.enabled", "true")
+ .set("spark.sql.optimizer.runtime.bloomFilter.applicationSideScanSizeThreshold", "0")
+ .set(
+ "spark.gluten.sql.columnar.physicalJoinOptimizeEnable",
+ "false"
+ )
+
@deprecated
val TYPE_MODIFIER_DATE_AS_DOUBLE: TypeModifier =
new TypeModifier(TypeUtils.typeAccepts(_, DateType), DoubleType) {
diff --git a/tools/gluten-it/package/pom.xml b/tools/gluten-it/package/pom.xml
index 6852edcfe..3beef2df7 100644
--- a/tools/gluten-it/package/pom.xml
+++ b/tools/gluten-it/package/pom.xml
@@ -118,5 +118,19 @@
</dependency>
</dependencies>
</profile>
+ <profile>
+ <id>rss-uniffle</id>
+ <activation>
+ <activeByDefault>false</activeByDefault>
+ </activation>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.uniffle</groupId>
+ <artifactId>rss-client-spark${spark.major.version}-shaded</artifactId>
+ <version>${uniffle.version}</version>
+ <scope>runtime</scope>
+ </dependency>
+ </dependencies>
+ </profile>
</profiles>
</project>
diff --git a/tools/gluten-it/pom.xml b/tools/gluten-it/pom.xml
index 128bf4b26..c0e2fc321 100644
--- a/tools/gluten-it/pom.xml
+++ b/tools/gluten-it/pom.xml
@@ -22,6 +22,7 @@
<scala.binary.version>2.12</scala.binary.version>
<spark.major.version>3</spark.major.version>
<celeborn.version>0.3.0-incubating</celeborn.version>
+ <uniffle.version>0.8.0</uniffle.version>
<gluten.version>1.2.0-SNAPSHOT</gluten.version>
<guava.version>32.0.1-jre</guava.version>
<tpch.version>1.1</tpch.version>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]