This is an automated email from the ASF dual-hosted git repository.

jackylee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new b9408f995 [VL] Add uniffle integration (#3767)
b9408f995 is described below

commit b9408f9955a610a3ddf1ec1e56903a29e12314eb
Author: summaryzb <[email protected]>
AuthorDate: Thu Apr 11 17:30:26 2024 +0800

    [VL] Add uniffle integration (#3767)
---
 .github/workflows/velox_docker.yml                 |  61 +++++
 .../clickhouse/CHSparkPlanExecApi.scala            |   2 +
 .../gluten/backendsapi/velox/VeloxBackend.scala    |   3 +-
 cpp/core/jni/JniWrapper.cc                         |  17 ++
 cpp/core/shuffle/Options.h                         |   2 +-
 gluten-uniffle/package/pom.xml                     |  29 +++
 gluten-uniffle/pom.xml                             |  89 +++++++
 gluten-uniffle/velox/pom.xml                       |  62 +++++
 .../gluten/uniffle/UniffleShuffleManager.java      |  87 +++++++
 .../writer/VeloxUniffleColumnarShuffleWriter.java  | 264 +++++++++++++++++++++
 .../spark/shuffle/writer/PartitionPusher.scala     |  27 +++
 package/pom.xml                                    |  10 +
 pom.xml                                            |  10 +
 .../scala/org/apache/gluten/GlutenConfig.scala     |   5 +
 .../apache/gluten/integration/tpc/TpcMixin.java    |   7 +-
 .../apache/gluten/integration/tpc/Constants.scala  |  19 ++
 tools/gluten-it/package/pom.xml                    |  14 ++
 tools/gluten-it/pom.xml                            |   1 +
 18 files changed, 705 insertions(+), 4 deletions(-)

diff --git a/.github/workflows/velox_docker.yml 
b/.github/workflows/velox_docker.yml
index 4d65df5b2..17a67d535 100644
--- a/.github/workflows/velox_docker.yml
+++ b/.github/workflows/velox_docker.yml
@@ -21,6 +21,7 @@ on:
       - '.github/workflows/velox_docker.yml'
       - 'pom.xml'
       - 'backends-velox/**'
+      - 'gluten-uniffle/**'
       - 'gluten-celeborn/common/**'
       - 'gluten-celeborn/package/**'
       - 'gluten-celeborn/velox/**'
@@ -337,6 +338,66 @@ jobs:
             --local --preset=velox --benchmark-type=ds --error-on-memleak 
-s=30.0  --off-heap-size=8g --threads=12 --shuffle-partitions=72 --iterations=1 
\
             --skip-data-gen  --random-kill-tasks
 
+  # Integration job: builds Gluten with the Velox backend and the rss-uniffle profile, starts a
+  # local Uniffle coordinator and shuffle server inside the container, then runs the gluten-it
+  # query comparison with the velox-with-uniffle preset.
+  run-tpc-test-centos8-uniffle:
+    needs: build-native-lib
+    strategy:
+      fail-fast: false
+      matrix:
+        spark: ["spark-3.2"]
+    runs-on: ubuntu-20.04
+    container: centos:8
+    steps:
+      - uses: actions/checkout@v2
+      - name: Download All Artifacts
+        uses: actions/download-artifact@v2
+        with:
+          name: velox-native-lib-${{github.sha}}
+          path: ./cpp/build/releases
+      # Point the yum repo files at vault.centos.org instead of the mirror list.
+      - name: Update mirror list
+        run: |
+          sed -i -e "s|mirrorlist=|#mirrorlist=|g" /etc/yum.repos.d/CentOS-* || true
+          sed -i -e "s|#baseurl=http://mirror.centos.org|baseurl=http://vault.centos.org|g" /etc/yum.repos.d/CentOS-* || true
+      - name: Setup java and maven
+        run: |
+          yum update -y && yum install -y java-1.8.0-openjdk-devel wget git
+          wget https://downloads.apache.org/maven/maven-3/3.8.8/binaries/apache-maven-3.8.8-bin.tar.gz
+          tar -xvf apache-maven-3.8.8-bin.tar.gz
+          mv apache-maven-3.8.8 /usr/lib/maven
+      - name: Build for Spark ${{ matrix.spark }}
+        run: |
+          cd $GITHUB_WORKSPACE/ && \
+          export MAVEN_HOME=/usr/lib/maven && \
+          export PATH=${PATH}:${MAVEN_HOME}/bin && \
+          mvn clean install -P${{ matrix.spark }} -Pbackends-velox -Prss-uniffle -DskipTests
+      # The script below (1) builds a patched shuffle server from branch-0.8 of Uniffle,
+      # (2) unpacks the 0.8.0 binary release and Hadoop 2.8.5, (3) copies the locally built
+      # server jar into the release, (4) writes the coordinator/server configs and starts both
+      # daemons, and (5) runs gluten-it. Only --benchmark-type=h is executed here.
+      # NOTE(review): the three `sed -i 'Nd'` calls delete lines of ShuffleTaskManager.java by
+      # number, so they depend on the exact contents of branch-0.8 - confirm when bumping.
+      - name: TPC-H SF1.0 && TPC-DS SF1.0 Parquet local spark3.2 with uniffle 0.8.0
+        run: |
+          export MAVEN_HOME=/usr/lib/maven && \
+          export PATH=${PATH}:${MAVEN_HOME}/bin && \
+          export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk && \
+          cd /opt && \
+          git clone -b branch-0.8 https://github.com/apache/incubator-uniffle.git && \
+          cd incubator-uniffle && \
+          sed -i '250d' ./server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java && \
+          sed -i '228d' ./server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java && \
+          sed -i '226d' ./server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java && \
+          mvn clean install -Phadoop2.8 -DskipTests
+          cd /opt && \
+          wget -nv https://archive.apache.org/dist/incubator/uniffle/0.8.0/apache-uniffle-0.8.0-incubating-bin.tar.gz && \
+          tar xzf apache-uniffle-0.8.0-incubating-bin.tar.gz -C /opt/ && mv /opt/rss-0.8.0-hadoop2.8 /opt/uniffle && \
+          wget -nv https://archive.apache.org/dist/hadoop/common/hadoop-2.8.5/hadoop-2.8.5.tar.gz && \
+          tar xzf hadoop-2.8.5.tar.gz -C /opt/
+          rm -f /opt/uniffle/jars/server/shuffle-server-0.8.0-SNAPSHOT.jar
+          cp /opt/incubator-uniffle/server/target/shuffle-server-0.8.1-SNAPSHOT.jar /opt/uniffle/jars/server/
+          rm -rf /opt/incubator-uniffle
+          cd /opt/uniffle && mkdir shuffle_data && \
+          bash -c "echo -e 'XMX_SIZE=16g\nHADOOP_HOME=/opt/hadoop-2.8.5' > ./bin/rss-env.sh" && \
+          bash -c "echo -e 'rss.coordinator.shuffle.nodes.max 1\nrss.rpc.server.port 19999' > ./conf/coordinator.conf" && \
+          bash -c "echo -e 'rss.server.app.expired.withoutHeartbeat 7200000\nrss.server.heartbeat.delay 3000\nrss.rpc.server.port 19997\nrss.jetty.http.port 19996\nrss.server.netty.port 19995\nrss.storage.basePath /opt/uniffle/shuffle_data\nrss.storage.type MEMORY_LOCALFILE\nrss.coordinator.quorum localhost:19999\nrss.server.flush.thread.alive 10\nrss.server.single.buffer.flush.threshold 64m' > ./conf/server.conf" && \
+          bash ./bin/start-coordinator.sh && bash ./bin/start-shuffle-server.sh
+          cd $GITHUB_WORKSPACE/tools/gluten-it && mvn clean install -Pspark-3.2,rss-uniffle && \
+          GLUTEN_IT_JVM_ARGS=-Xmx5G sbin/gluten-it.sh queries-compare \
+            --local --preset=velox-with-uniffle --benchmark-type=h --error-on-memleak --off-heap-size=10g -s=1.0 --threads=16 --iterations=1
+
   run-tpc-test-ubuntu-2204-celeborn:
     needs: build-native-lib
     strategy:
diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
index 57fd81ba0..791a635b0 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
@@ -473,6 +473,8 @@ class CHSparkPlanExecApi extends SparkPlanExecApi {
       val constructor =
         clazz.getConstructor(classOf[SQLMetric], classOf[SQLMetric], 
classOf[SQLMetric])
       constructor.newInstance(readBatchNumRows, numOutputRows, 
dataSize).asInstanceOf[Serializer]
+    } else if (GlutenConfig.getConf.isUseUniffleShuffleManager) {
+      throw new UnsupportedOperationException("temporarily uniffle not support 
ch ")
     } else {
       new CHColumnarBatchSerializer(readBatchNumRows, numOutputRows, dataSize)
     }
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
index a869ba23b..1e6de343b 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
@@ -349,7 +349,8 @@ object BackendSettings extends BackendSettingsApi {
 
   override def supportColumnarShuffleExec(): Boolean = {
     GlutenConfig.getConf.isUseColumnarShuffleManager ||
-    GlutenConfig.getConf.isUseCelebornShuffleManager
+    GlutenConfig.getConf.isUseCelebornShuffleManager ||
+    GlutenConfig.getConf.isUseUniffleShuffleManager
   }
 
   override def enableJoinKeysRewrite(): Boolean = false
diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc
index 51e1da652..6f5b2e332 100644
--- a/cpp/core/jni/JniWrapper.cc
+++ b/cpp/core/jni/JniWrapper.cc
@@ -915,6 +915,23 @@ JNIEXPORT jlong JNICALL 
Java_org_apache_gluten_vectorized_ShuffleWriterJniWrappe
         std::move(partitionWriterOptions),
         memoryManager->getArrowMemoryPool(),
         std::move(celebornClient));
+  } else if (partitionWriterType == "uniffle") {
+    jclass unifflePartitionPusherClass =
+        createGlobalClassReferenceOrError(env, 
"Lorg/apache/spark/shuffle/writer/PartitionPusher;");
+    jmethodID unifflePushPartitionDataMethod =
+        getMethodIdOrError(env, unifflePartitionPusherClass, 
"pushPartitionData", "(I[BI)I");
+    JavaVM* vm;
+    if (env->GetJavaVM(&vm) != JNI_OK) {
+      throw gluten::GlutenException("Unable to get JavaVM instance");
+    }
+    // rename CelebornClient RssClient
+    std::shared_ptr<CelebornClient> uniffleClient =
+        std::make_shared<CelebornClient>(vm, partitionPusher, 
unifflePushPartitionDataMethod);
+    partitionWriter = std::make_unique<CelebornPartitionWriter>(
+        numPartitions,
+        std::move(partitionWriterOptions),
+        memoryManager->getArrowMemoryPool(),
+        std::move(uniffleClient));
   } else {
     throw gluten::GlutenException("Unrecognizable partition writer type: " + 
partitionWriterType);
   }
diff --git a/cpp/core/shuffle/Options.h b/cpp/core/shuffle/Options.h
index 6531f150e..6a793daa3 100644
--- a/cpp/core/shuffle/Options.h
+++ b/cpp/core/shuffle/Options.h
@@ -33,7 +33,7 @@ static constexpr double kDefaultBufferReallocThreshold = 0.25;
 static constexpr double kDefaultMergeBufferThreshold = 0.25;
 static constexpr bool kEnableBufferedWrite = true;
 
-enum PartitionWriterType { kLocal, kCeleborn };
+enum PartitionWriterType { kLocal, kCeleborn, kUniffle };
 
 struct ShuffleReaderOptions {
   arrow::Compression::type compressionType = 
arrow::Compression::type::LZ4_FRAME;
diff --git a/gluten-uniffle/package/pom.xml b/gluten-uniffle/package/pom.xml
new file mode 100644
index 000000000..64c0fadd3
--- /dev/null
+++ b/gluten-uniffle/package/pom.xml
@@ -0,0 +1,29 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <parent>
+    <artifactId>gluten-uniffle</artifactId>
+    <groupId>org.apache.gluten</groupId>
+    <version>1.2.0-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+
+  <artifactId>gluten-uniffle-package</artifactId>
+  <packaging>jar</packaging>
+  <name>Gluten Uniffle Package</name>
+
+  <profiles>
+    <profile>
+      <id>backends-velox</id>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.gluten</groupId>
+          <artifactId>gluten-uniffle-velox</artifactId>
+          <version>${project.version}</version>
+        </dependency>
+      </dependencies>
+    </profile>
+  </profiles>
+</project>
diff --git a/gluten-uniffle/pom.xml b/gluten-uniffle/pom.xml
new file mode 100644
index 000000000..ce38bdd29
--- /dev/null
+++ b/gluten-uniffle/pom.xml
@@ -0,0 +1,89 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <parent>
+    <artifactId>gluten-parent</artifactId>
+    <groupId>org.apache.gluten</groupId>
+    <version>1.2.0-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+
+  <artifactId>gluten-uniffle</artifactId>
+  <packaging>pom</packaging>
+  <name>Gluten Uniffle</name>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <version>${hadoop.version}</version>
+      <scope>provided</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>net.minidev</groupId>
+          <artifactId>json-smart</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.sun.jersey</groupId>
+          <artifactId>jersey-json</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.gluten</groupId>
+      <artifactId>gluten-core</artifactId>
+      <version>${project.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.uniffle</groupId>
+      <artifactId>rss-client-spark${spark.major.version}-shaded</artifactId>
+      <version>${uniffle.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_${scala.binary.version}</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-sql_${scala.binary.version}</artifactId>
+      <scope>provided</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>net.alchim31.maven</groupId>
+        <artifactId>scala-maven-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.scalastyle</groupId>
+        <artifactId>scalastyle-maven-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-checkstyle-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>com.diffplug.spotless</groupId>
+        <artifactId>spotless-maven-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
+  <profiles>
+    <profile>
+      <id>backends-velox</id>
+      <properties>
+      </properties>
+      <modules>
+        <module>velox</module>
+        <module>package</module>
+      </modules>
+    </profile>
+  </profiles>
+</project>
diff --git a/gluten-uniffle/velox/pom.xml b/gluten-uniffle/velox/pom.xml
new file mode 100755
index 000000000..19865fa69
--- /dev/null
+++ b/gluten-uniffle/velox/pom.xml
@@ -0,0 +1,62 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xmlns="http://maven.apache.org/POM/4.0.0"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <parent>
+    <artifactId>gluten-uniffle</artifactId>
+    <groupId>org.apache.gluten</groupId>
+    <version>1.2.0-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+
+  <artifactId>gluten-uniffle-velox</artifactId>
+  <packaging>jar</packaging>
+  <name>Gluten Uniffle Velox</name>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.gluten</groupId>
+      <artifactId>backends-velox</artifactId>
+      <version>${project.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.gluten</groupId>
+      <artifactId>gluten-data</artifactId>
+      <version>${project.version}</version>
+      <scope>provided</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+    
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+    <plugins>
+      <plugin>
+        <groupId>net.alchim31.maven</groupId>
+        <artifactId>scala-maven-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.scalastyle</groupId>
+        <artifactId>scalastyle-maven-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-checkstyle-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>com.diffplug.spotless</groupId>
+        <artifactId>spotless-maven-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
+</project>
diff --git 
a/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/gluten/uniffle/UniffleShuffleManager.java
 
b/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/gluten/uniffle/UniffleShuffleManager.java
new file mode 100644
index 000000000..9ae62f8b8
--- /dev/null
+++ 
b/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/gluten/uniffle/UniffleShuffleManager.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.shuffle.gluten.uniffle;
+
+import org.apache.spark.ShuffleDependency;
+import org.apache.spark.SparkConf;
+import org.apache.spark.SparkEnv;
+import org.apache.spark.TaskContext;
+import org.apache.spark.executor.ShuffleWriteMetrics;
+import org.apache.spark.shuffle.ColumnarShuffleDependency;
+import org.apache.spark.shuffle.RssShuffleHandle;
+import org.apache.spark.shuffle.RssShuffleManager;
+import org.apache.spark.shuffle.RssSparkConfig;
+import org.apache.spark.shuffle.ShuffleHandle;
+import org.apache.spark.shuffle.ShuffleWriteMetricsReporter;
+import org.apache.spark.shuffle.ShuffleWriter;
+import org.apache.spark.shuffle.writer.VeloxUniffleColumnarShuffleWriter;
+import org.apache.uniffle.common.exception.RssException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Shuffle manager that extends Uniffle's {@link RssShuffleManager} and returns a {@link
+ * VeloxUniffleColumnarShuffleWriter} for shuffles whose dependency is a {@link
+ * ColumnarShuffleDependency}. Every other shuffle is delegated to the parent implementation.
+ */
+public class UniffleShuffleManager extends RssShuffleManager {
+  private static final Logger LOG = LoggerFactory.getLogger(UniffleShuffleManager.class);
+
+  /** Returns true when the current {@link SparkEnv} reports the executor id {@code "driver"}. */
+  private boolean isDriver() {
+    final String executorId = SparkEnv.get().executorId();
+    return "driver".equals(executorId);
+  }
+
+  /**
+   * Builds the parent manager and then sets the row-based RSS flag to {@code "false"} on the
+   * given configuration.
+   *
+   * @param conf Spark configuration; mutated by this constructor
+   * @param isDriver forwarded unchanged to {@link RssShuffleManager}
+   */
+  public UniffleShuffleManager(SparkConf conf, boolean isDriver) {
+    super(conf, isDriver);
+    final String rowBasedKey =
+        RssSparkConfig.SPARK_RSS_CONFIG_PREFIX + RssSparkConfig.RSS_ROW_BASED;
+    conf.set(rowBasedKey, "false");
+  }
+
+  /** Pure delegation to {@link RssShuffleManager#registerShuffle}. */
+  @Override
+  public <K, V, C> ShuffleHandle registerShuffle(
+      int shuffleId, ShuffleDependency<K, V, C> dependency) {
+    return super.registerShuffle(shuffleId, dependency);
+  }
+
+  /**
+   * Creates the writer for one map task.
+   *
+   * @param handle must be an {@link RssShuffleHandle}
+   * @param mapId forwarded to the parent for non-columnar shuffles
+   * @param context task context supplying partition id, attempt ids and task metrics
+   * @param metrics metrics reporter; when null the task's own shuffle write metrics are used
+   * @return a columnar writer for columnar dependencies, otherwise the parent's writer
+   * @throws RssException if {@code handle} is not an {@link RssShuffleHandle}
+   */
+  @Override
+  public <K, V> ShuffleWriter<K, V> getWriter(
+      ShuffleHandle handle, long mapId, TaskContext context, ShuffleWriteMetricsReporter metrics) {
+    if (!(handle instanceof RssShuffleHandle)) {
+      throw new RssException("Unexpected ShuffleHandle:" + handle.getClass().getName());
+    }
+    final RssShuffleHandle<K, V, V> rssHandle = (RssShuffleHandle<K, V, V>) handle;
+
+    // Row-based shuffles keep the stock Uniffle writer.
+    if (!(rssHandle.getDependency() instanceof ColumnarShuffleDependency)) {
+      return super.getWriter(handle, mapId, context, metrics);
+    }
+
+    setPusherAppId(rssHandle);
+    final String taskId = context.taskAttemptId() + "_" + context.attemptNumber();
+    final ShuffleWriteMetrics writeMetrics =
+        metrics != null
+            ? new WriteMetrics(metrics)
+            : context.taskMetrics().shuffleWriteMetrics();
+
+    return new VeloxUniffleColumnarShuffleWriter<>(
+        context.partitionId(),
+        rssHandle.getAppId(),
+        rssHandle.getShuffleId(),
+        taskId,
+        context.taskAttemptId(),
+        writeMetrics,
+        this,
+        sparkConf,
+        shuffleWriteClient,
+        rssHandle,
+        this::markFailedTask,
+        context);
+  }
+}
diff --git 
a/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java
 
b/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java
new file mode 100644
index 000000000..17cfce1c0
--- /dev/null
+++ 
b/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.shuffle.writer;
+
+import org.apache.gluten.GlutenConfig;
+import org.apache.gluten.columnarbatch.ColumnarBatches;
+import org.apache.gluten.memory.memtarget.MemoryTarget;
+import org.apache.gluten.memory.memtarget.Spiller;
+import org.apache.gluten.memory.memtarget.Spillers;
+import org.apache.gluten.memory.nmm.NativeMemoryManagers;
+import org.apache.gluten.vectorized.ShuffleWriterJniWrapper;
+import org.apache.gluten.vectorized.SplitResult;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.TaskContext;
+import org.apache.spark.executor.ShuffleWriteMetrics;
+import org.apache.spark.internal.config.package$;
+import org.apache.spark.memory.SparkMemoryUtil;
+import org.apache.spark.scheduler.MapStatus;
+import org.apache.spark.shuffle.ColumnarShuffleDependency;
+import org.apache.spark.shuffle.GlutenShuffleUtils;
+import org.apache.spark.shuffle.RssShuffleHandle;
+import org.apache.spark.shuffle.RssShuffleManager;
+import org.apache.spark.shuffle.RssSparkConfig;
+import org.apache.spark.sql.vectorized.ColumnarBatch;
+import org.apache.spark.util.SparkResourceUtil;
+import org.apache.uniffle.client.api.ShuffleWriteClient;
+import org.apache.uniffle.common.ShuffleBlockInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+
+import scala.Option;
+import scala.Product2;
+import scala.collection.Iterator;
+
+/**
+ * Columnar shuffle writer for the Velox backend that ships shuffle data through Uniffle.
+ *
+ * <p>Each non-empty {@link ColumnarBatch} is split by the native shuffle writer reached through
+ * {@link ShuffleWriterJniWrapper}. The native writer is created with a {@link PartitionPusher};
+ * that pusher forwards partition buffers to {@link #doAddByte(int, byte[], int)}, which adds
+ * them to the buffer manager inherited from {@link RssShuffleWriter}.
+ */
+public class VeloxUniffleColumnarShuffleWriter<K, V> extends RssShuffleWriter<K, V, V> {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(VeloxUniffleColumnarShuffleWriter.class);
+
+  /** Handle of the native shuffle writer; -1 while no native writer exists. */
+  private long nativeShuffleWriter = -1L;
+
+  /** Set by the first call to {@link #stop(boolean)} so later calls become no-ops. */
+  private boolean stopping = false;
+  private int compressThreshold = GlutenConfig.getConf().columnarShuffleCompressionThreshold();
+  private double reallocThreshold = GlutenConfig.getConf().columnarShuffleReallocThreshold();
+  /** Stays null when Spark's shuffle compression setting is disabled. */
+  private String compressionCodec;
+  private int compressionLevel;
+  private int partitionId;
+
+  private ShuffleWriterJniWrapper jniWrapper = ShuffleWriterJniWrapper.create();
+  private SplitResult splitResult;
+  private int nativeBufferSize = GlutenConfig.getConf().maxBatchSize();
+  private int bufferSize;
+  private PartitionPusher partitionPusher;
+
+  private final ColumnarShuffleDependency<K, V, V> columnarDep;
+  private final SparkConf sparkConf;
+
+  /** Currently available off-heap memory divided by the number of task slots. */
+  private long availableOffHeapPerTask() {
+    return SparkMemoryUtil.getCurrentAvailableOffHeapMemory()
+        / SparkResourceUtil.getTaskSlots(sparkConf);
+  }
+
+  /**
+   * Creates the writer. All arguments except {@code partitionId} are forwarded to {@link
+   * RssShuffleWriter}; the buffer size and compression settings are then read from {@code
+   * sparkConf}.
+   *
+   * @param partitionId partition id of the running task
+   * @param rssHandle handle whose dependency must be a {@link ColumnarShuffleDependency}
+   */
+  public VeloxUniffleColumnarShuffleWriter(
+      int partitionId,
+      String appId,
+      int shuffleId,
+      String taskId,
+      long taskAttemptId,
+      ShuffleWriteMetrics shuffleWriteMetrics,
+      RssShuffleManager shuffleManager,
+      SparkConf sparkConf,
+      ShuffleWriteClient shuffleWriteClient,
+      RssShuffleHandle<K, V, V> rssHandle,
+      Function<String, Boolean> taskFailureCallback,
+      TaskContext context) {
+    super(
+        appId,
+        shuffleId,
+        taskId,
+        taskAttemptId,
+        shuffleWriteMetrics,
+        shuffleManager,
+        sparkConf,
+        shuffleWriteClient,
+        rssHandle,
+        taskFailureCallback,
+        context);
+    columnarDep = (ColumnarShuffleDependency<K, V, V>) rssHandle.getDependency();
+    this.partitionId = partitionId;
+    this.sparkConf = sparkConf;
+    bufferSize =
+        (int)
+            sparkConf.getSizeAsBytes(
+                RssSparkConfig.RSS_WRITER_BUFFER_SIZE.key(),
+                RssSparkConfig.RSS_WRITER_BUFFER_SIZE.defaultValue().get());
+    if ((boolean) sparkConf.get(package$.MODULE$.SHUFFLE_COMPRESS())) {
+      compressionCodec = GlutenShuffleUtils.getCompressionCodec(sparkConf);
+    }
+    compressionLevel = GlutenShuffleUtils.getCompressionLevel(sparkConf, compressionCodec, null);
+  }
+
+  /**
+   * Splits every non-empty batch natively, stops the native writer, flushes the remaining
+   * blocks and, unless memory shuffle is enabled, sends the commit.
+   *
+   * <p>NOTE(review): when no non-empty batch is seen (all batches skipped, or an empty iterator
+   * with memory shuffle enabled) the native writer is never created and this method throws
+   * {@link IllegalStateException}. Confirm that this is the intended contract.
+   *
+   * @param records iterator whose values are {@link ColumnarBatch} instances
+   * @throws IllegalStateException if no native writer was created
+   */
+  @Override
+  protected void writeImpl(Iterator<Product2<K, V>> records) throws IOException {
+    if (!records.hasNext() && !isMemoryShuffleEnabled) {
+      super.sendCommit();
+      return;
+    }
+    // writer already init
+    partitionPusher = new PartitionPusher(this);
+    while (records.hasNext()) {
+      ColumnarBatch cb = (ColumnarBatch) (records.next()._2());
+      if (cb.numRows() == 0 || cb.numCols() == 0) {
+        LOG.info("Skip ColumnarBatch of 0 rows or 0 cols");
+        continue;
+      }
+      long handle = ColumnarBatches.getNativeHandle(cb);
+      if (nativeShuffleWriter == -1) {
+        // Created lazily because the native factory takes the first batch handle.
+        nativeShuffleWriter = createNativeShuffleWriter(handle);
+      }
+      long startTime = System.nanoTime();
+      long bytes =
+          jniWrapper.split(nativeShuffleWriter, cb.numRows(), handle, availableOffHeapPerTask());
+      LOG.debug("jniWrapper.split rows {}, split bytes {}", cb.numRows(), bytes);
+      columnarDep.metrics().get("dataSize").get().add(bytes);
+      // this metric replace part of uniffle shuffle write time
+      columnarDep.metrics().get("splitTime").get().add(System.nanoTime() - startTime);
+      columnarDep.metrics().get("numInputRows").get().add(cb.numRows());
+      columnarDep.metrics().get("inputBatches").get().add(1);
+      shuffleWriteMetrics.incRecordsWritten(cb.numRows());
+    }
+
+    long startTime = System.nanoTime();
+    LOG.info("nativeShuffleWriter value {}", nativeShuffleWriter);
+    if (nativeShuffleWriter == -1L) {
+      throw new IllegalStateException("nativeShuffleWriter should not be -1L");
+    }
+    splitResult = jniWrapper.stop(nativeShuffleWriter);
+    // Split time of the stop call excludes the push, write and compress portions reported by
+    // the native side.
+    columnarDep
+        .metrics()
+        .get("splitTime")
+        .get()
+        .add(
+            System.nanoTime()
+                - startTime
+                - splitResult.getTotalPushTime()
+                - splitResult.getTotalWriteTime()
+                - splitResult.getTotalCompressTime());
+
+    shuffleWriteMetrics.incBytesWritten(splitResult.getTotalBytesWritten());
+    shuffleWriteMetrics.incWriteTime(
+        splitResult.getTotalWriteTime() + splitResult.getTotalPushTime());
+    // partitionLengths is calculate in uniffle side
+
+    long pushMergedDataTime = System.nanoTime();
+    // clear all
+    sendRestBlockAndWait();
+    if (!isMemoryShuffleEnabled) {
+      super.sendCommit();
+    }
+    // The elapsed value is a System.nanoTime() delta, i.e. nanoseconds; it is converted to
+    // milliseconds only for the log line below.
+    long restWriteNanos = System.nanoTime() - pushMergedDataTime;
+    shuffleWriteMetrics.incWriteTime(restWriteNanos);
+    LOG.info(
+        "Finish write shuffle  with rest write {} ms",
+        TimeUnit.NANOSECONDS.toMillis(restWriteNanos));
+  }
+
+  /**
+   * Creates the native RSS shuffle writer with the "uniffle" partition writer type.
+   *
+   * @param firstBatchHandle native handle of the first non-empty batch
+   * @return handle of the new native shuffle writer
+   */
+  private long createNativeShuffleWriter(long firstBatchHandle) {
+    return jniWrapper.makeForRSS(
+        columnarDep.nativePartitioning(),
+        nativeBufferSize,
+        // use field do this
+        compressionCodec,
+        compressionLevel,
+        compressThreshold,
+        GlutenConfig.getConf().columnarShuffleCompressionMode(),
+        bufferSize,
+        partitionPusher,
+        NativeMemoryManagers.create("UniffleShuffleWriter", createSpiller())
+            .getNativeInstanceHandle(),
+        firstBatchHandle,
+        taskAttemptId,
+        GlutenShuffleUtils.getStartPartitionId(columnarDep.nativePartitioning(), partitionId),
+        "uniffle",
+        reallocThreshold);
+  }
+
+  /**
+   * Builds the spiller registered with the native memory manager. On spill it asks the native
+   * writer to evict up to {@code size} bytes and returns the number of bytes evicted.
+   */
+  private Spiller createSpiller() {
+    return new Spiller() {
+      @Override
+      public long spill(MemoryTarget self, long size) {
+        if (nativeShuffleWriter == -1) {
+          throw new IllegalStateException(
+              "Fatal: spill() called before a shuffle writer "
+                  + "evaluator is created. This behavior should be "
+                  + "optimized by moving memory "
+                  + "allocations from make() to split()");
+        }
+        LOG.info("Gluten shuffle writer: Trying to push {} bytes of data", size);
+        long pushed = jniWrapper.nativeEvict(nativeShuffleWriter, size, false);
+        LOG.info("Gluten shuffle writer: Pushed {} / {} bytes of data", pushed, size);
+        return pushed;
+      }
+
+      @Override
+      public Set<Phase> applicablePhases() {
+        return Spillers.PHASE_SET_SPILL_ONLY;
+      }
+    };
+  }
+
+  /**
+   * Closes the native writer and delegates to the parent on the first call; later calls return
+   * {@code Option.empty()}.
+   */
+  @Override
+  public Option<MapStatus> stop(boolean success) {
+    if (!stopping) {
+      stopping = true;
+      closeShuffleWriter();
+      return super.stop(success);
+    }
+    return Option.empty();
+  }
+
+  /** Closes the native writer if one exists and resets the handle to -1. */
+  private void closeShuffleWriter() {
+    if (nativeShuffleWriter != -1) {
+      jniWrapper.close(nativeShuffleWriter);
+      nativeShuffleWriter = -1;
+    }
+  }
+
+  /** Clears the buffer manager, processes the returned blocks and checks the send result. */
+  private void sendRestBlockAndWait() {
+    List<ShuffleBlockInfo> shuffleBlockInfos = super.getBufferManager().clear();
+    super.processShuffleBlockInfos(shuffleBlockInfos);
+    // make checkBlockSendResult no arguments
+    super.internalCheckBlockSendResult();
+  }
+
+  /**
+   * Adds one partition buffer to the buffer manager and processes any blocks it returns. Called
+   * by {@link PartitionPusher#pushPartitionData}.
+   *
+   * @param partitionId target partition
+   * @param data buffer holding the partition bytes
+   * @param length number of bytes passed on to the buffer manager
+   * @return {@code length}, unchanged
+   */
+  public int doAddByte(int partitionId, byte[] data, int length) {
+    List<ShuffleBlockInfo> shuffleBlockInfos =
+        super.getBufferManager()
+            .addPartitionData(partitionId, data, length, System.currentTimeMillis());
+    super.processShuffleBlockInfos(shuffleBlockInfos);
+    return length;
+  }
+}
diff --git 
a/gluten-uniffle/velox/src/main/scala/org/apache/spark/shuffle/writer/PartitionPusher.scala
 
b/gluten-uniffle/velox/src/main/scala/org/apache/spark/shuffle/writer/PartitionPusher.scala
new file mode 100644
index 000000000..eb99fd23b
--- /dev/null
+++ 
b/gluten-uniffle/velox/src/main/scala/org/apache/spark/shuffle/writer/PartitionPusher.scala
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.shuffle.writer
+
+import java.io.IOException
+
+/**
+ * Thin adapter that forwards partition data to a [[VeloxUniffleColumnarShuffleWriter]].
+ *
+ * NOTE(review): presumably invoked from native code through JNI; confirm against the caller.
+ *
+ * @param uniffleWriter
+ *   writer that receives the pushed bytes
+ */
+class PartitionPusher(val uniffleWriter: VeloxUniffleColumnarShuffleWriter[_, _]) {
+
+  /**
+   * Passes the arguments straight to `uniffleWriter.doAddByte` and returns its result.
+   *
+   * @param partitionId
+   *   id of the partition the bytes belong to
+   * @param buffer
+   *   array holding the bytes
+   * @param length
+   *   length value forwarded together with `buffer`
+   * @return
+   *   whatever `doAddByte` returns
+   */
+  @throws[IOException]
+  def pushPartitionData(partitionId: Int, buffer: Array[Byte], length: Int): Int =
+    uniffleWriter.doAddByte(partitionId, buffer, length)
+}
diff --git a/package/pom.xml b/package/pom.xml
index 678f13b67..913621ee1 100644
--- a/package/pom.xml
+++ b/package/pom.xml
@@ -80,6 +80,16 @@
         </dependency>
       </dependencies>
     </profile>
+    <profile>
+      <id>rss-uniffle</id>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.gluten</groupId>
+          <artifactId>gluten-uniffle-package</artifactId>
+          <version>${project.version}</version>
+        </dependency>
+      </dependencies>
+    </profile>
   </profiles>
 
   <build>
diff --git a/pom.xml b/pom.xml
index b5833b3ad..b65f17314 100644
--- a/pom.xml
+++ b/pom.xml
@@ -52,6 +52,7 @@
     
<sparkshim.artifactId>spark-sql-columnar-shims-spark32</sparkshim.artifactId>
     <celeborn.version>0.3.2-incubating</celeborn.version>
     <arrow.version>14.0.1</arrow.version>
+    <uniffle.version>0.8.0</uniffle.version>
     <arrow-memory.artifact>arrow-memory-unsafe</arrow-memory.artifact>
     <hadoop.version>2.7.4</hadoop.version>
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
@@ -237,6 +238,15 @@
         <module>gluten-celeborn</module>
       </modules>
     </profile>
+    <profile>
+      <id>rss-uniffle</id>
+      <activation>
+        <activeByDefault>false</activeByDefault>
+      </activation>
+      <modules>
+        <module>gluten-uniffle</module>
+      </modules>
+    </profile>
     <profile>
       <id>delta</id>
       <activation>
diff --git a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala 
b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
index 9039d2b8d..bc98ea25e 100644
--- a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
@@ -121,6 +121,11 @@ class GlutenConfig(conf: SQLConf) extends Logging {
       .getConfString("spark.shuffle.manager", "sort")
       .contains("celeborn")
 
+  /**
+   * Whether `spark.shuffle.manager` (default `"sort"`) names a Uniffle-based shuffle manager.
+   *
+   * Two class-name fragments are accepted:
+   *   - `RssShuffleManager`, the substring this check has always matched;
+   *   - `UniffleShuffleManager`, the name of Gluten's own manager
+   *     (`org.apache.spark.shuffle.gluten.uniffle.UniffleShuffleManager`), which does not contain
+   *     the first fragment and was therefore not recognised before.
+   */
+  def isUseUniffleShuffleManager: Boolean = {
+    val shuffleManager = conf.getConfString("spark.shuffle.manager", "sort")
+    shuffleManager.contains("RssShuffleManager") ||
+    shuffleManager.contains("UniffleShuffleManager")
+  }
+
   def enableColumnarShuffle: Boolean = conf.getConf(COLUMNAR_SHUFFLE_ENABLED)
 
   def enablePreferColumnar: Boolean = conf.getConf(COLUMNAR_PREFER_ENABLED)
diff --git 
a/tools/gluten-it/common/src/main/java/org/apache/gluten/integration/tpc/TpcMixin.java
 
b/tools/gluten-it/common/src/main/java/org/apache/gluten/integration/tpc/TpcMixin.java
index 15a8e66ca..c0313fe77 100644
--- 
a/tools/gluten-it/common/src/main/java/org/apache/gluten/integration/tpc/TpcMixin.java
+++ 
b/tools/gluten-it/common/src/main/java/org/apache/gluten/integration/tpc/TpcMixin.java
@@ -35,10 +35,10 @@ public class TpcMixin {
   @CommandLine.Option(required = true, names = {"--benchmark-type"}, 
description = "TPC benchmark type: h, ds", defaultValue = "h")
   private String benchmarkType;
 
-  @CommandLine.Option(names = {"-p", "--preset"}, description = "Preset used: 
vanilla, velox, velox-with-celeborn...", defaultValue = "velox")
+  @CommandLine.Option(names = {"-p", "--preset"}, description = "Preset used: 
vanilla, velox, velox-with-celeborn, velox-with-uniffle...", defaultValue = 
"velox")
   private String preset;
 
-  @CommandLine.Option(names = {"--baseline-preset"}, description = "Baseline 
preset used: vanilla, velox, velox-with-celeborn...", defaultValue = "vanilla")
+  @CommandLine.Option(names = {"--baseline-preset"}, description = "Baseline 
preset used: vanilla, velox, velox-with-celeborn, velox-with-uniffle...", 
defaultValue = "vanilla")
   private String baselinePreset;
 
   @CommandLine.Option(names = {"--log-level"}, description = "Set log level: 0 
for DEBUG, 1 for INFO, 2 for WARN", defaultValue = "2")
@@ -92,6 +92,9 @@ public class TpcMixin {
       case "velox-with-celeborn":
         conf = Constants.VELOX_WITH_CELEBORN_CONF();
         break;
+      case "velox-with-uniffle":
+        conf = Constants.VELOX_WITH_UNIFFLE_CONF();
+        break;
       default:
         throw new IllegalArgumentException("Preset not found: " + preset);
     }
diff --git 
a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/tpc/Constants.scala
 
b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/tpc/Constants.scala
index 44a025932..7564f6dce 100644
--- 
a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/tpc/Constants.scala
+++ 
b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/tpc/Constants.scala
@@ -58,6 +58,25 @@ object Constants {
     .set("spark.celeborn.push.data.timeout", "600s")
     .set("spark.celeborn.push.limit.inFlight.timeout", "1200s")
 
+  // Spark settings for running the Velox backend with the Uniffle shuffle manager.
+  // All keys are distinct, so the order of the `set` calls does not affect the result.
+  val VELOX_WITH_UNIFFLE_CONF: SparkConf = new SparkConf(false)
+    // Gluten plugin and SQL-side switches.
+    .set("spark.plugins", "org.apache.gluten.GlutenPlugin")
+    .set("spark.gluten.sql.columnar.forceShuffledHashJoin", "true")
+    .set("spark.gluten.sql.columnar.physicalJoinOptimizeEnable", "false")
+    .set("spark.sql.parquet.enableVectorizedReader", "true")
+    .set("spark.sql.optimizer.runtime.bloomFilter.enabled", "true")
+    .set("spark.sql.optimizer.runtime.bloomFilter.applicationSideScanSizeThreshold", "0")
+    // Shuffle manager and the spark.rss.* client settings.
+    .set("spark.shuffle.manager", "org.apache.spark.shuffle.gluten.uniffle.UniffleShuffleManager")
+    .set("spark.rss.coordinator.quorum", "localhost:19999")
+    .set("spark.rss.storage.type", "MEMORY_LOCALFILE")
+    .set("spark.rss.client.type", "GRPC_NETTY")
+    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+    // Spark features switched off for this preset.
+    .set("spark.shuffle.service.enabled", "false")
+    .set("spark.sql.adaptive.localShuffleReader.enabled", "false")
+    .set("spark.dynamicAllocation.enabled", "false")
+
   @deprecated
   val TYPE_MODIFIER_DATE_AS_DOUBLE: TypeModifier =
     new TypeModifier(TypeUtils.typeAccepts(_, DateType), DoubleType) {
diff --git a/tools/gluten-it/package/pom.xml b/tools/gluten-it/package/pom.xml
index 6852edcfe..3beef2df7 100644
--- a/tools/gluten-it/package/pom.xml
+++ b/tools/gluten-it/package/pom.xml
@@ -118,5 +118,19 @@
         </dependency>
       </dependencies>
     </profile>
+    <profile>
+      <id>rss-uniffle</id>
+      <activation>
+        <activeByDefault>false</activeByDefault>
+      </activation>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.uniffle</groupId>
+          
<artifactId>rss-client-spark${spark.major.version}-shaded</artifactId>
+          <version>${uniffle.version}</version>
+          <scope>runtime</scope>
+        </dependency>
+      </dependencies>
+    </profile>
   </profiles>
 </project>
diff --git a/tools/gluten-it/pom.xml b/tools/gluten-it/pom.xml
index 128bf4b26..c0e2fc321 100644
--- a/tools/gluten-it/pom.xml
+++ b/tools/gluten-it/pom.xml
@@ -22,6 +22,7 @@
     <scala.binary.version>2.12</scala.binary.version>
     <spark.major.version>3</spark.major.version>
     <celeborn.version>0.3.0-incubating</celeborn.version>
+    <uniffle.version>0.8.0</uniffle.version>
     <gluten.version>1.2.0-SNAPSHOT</gluten.version>
     <guava.version>32.0.1-jre</guava.version>
     <tpch.version>1.1</tpch.version>


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to