This is an automated email from the ASF dual-hosted git repository.

jackylee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new bedb2ee0b [GLUTEN-6483] Support Uniffle 0.9.0 (#6484)
bedb2ee0b is described below

commit bedb2ee0ba4bf1bb071234368f5d095c6b912afd
Author: Nicholas Jiang <[email protected]>
AuthorDate: Thu Aug 1 19:01:17 2024 +0800

    [GLUTEN-6483] Support Uniffle 0.9.0 (#6484)
---
 .github/workflows/velox_docker.yml                 | 37 +++++++++++-----------
 .../gluten/uniffle/UniffleShuffleManager.java      | 14 +-------
 .../writer/VeloxUniffleColumnarShuffleWriter.java  | 35 +++++++++++---------
 pom.xml                                            |  2 +-
 tools/gluten-it/pom.xml                            |  2 +-
 5 files changed, 42 insertions(+), 48 deletions(-)

diff --git a/.github/workflows/velox_docker.yml b/.github/workflows/velox_docker.yml
index 913a55cbd..ee6bf9797 100644
--- a/.github/workflows/velox_docker.yml
+++ b/.github/workflows/velox_docker.yml
@@ -492,38 +492,39 @@ jobs:
           wget https://downloads.apache.org/maven/maven-3/3.8.8/binaries/apache-maven-3.8.8-bin.tar.gz
           tar -xvf apache-maven-3.8.8-bin.tar.gz
           mv apache-maven-3.8.8 /usr/lib/maven
-      - name: Build for Spark ${{ matrix.spark }}
-        run: |
-          cd $GITHUB_WORKSPACE/ && \
-          export MAVEN_HOME=/usr/lib/maven && \
-          export PATH=${PATH}:${MAVEN_HOME}/bin && \
-          $MVN_CMD clean install -P${{ matrix.spark }} -Pbackends-velox -Puniffle -DskipTests
-      - name: TPC-H SF1.0 && TPC-DS SF1.0 Parquet local spark3.2 with uniffle 0.8.0
+      - name: Build for Uniffle 0.9.0
         run: |
           export MAVEN_HOME=/usr/lib/maven && \
           export PATH=${PATH}:${MAVEN_HOME}/bin && \
           export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk && \
           cd /opt && \
-          git clone -b branch-0.8 https://github.com/apache/incubator-uniffle.git && \
+          git clone -b v0.9.0 https://github.com/apache/incubator-uniffle.git && \
           cd incubator-uniffle && \
-          sed -i '250d' ./server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java && \
-          sed -i '228d' ./server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java && \
-          sed -i '226d' ./server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java && \
-          $MVN_CMD clean install -Phadoop2.8 -DskipTests
+          $MVN_CMD clean install -Phadoop2.8,spark3 -DskipTests
           cd /opt && \
-          wget -nv https://archive.apache.org/dist/incubator/uniffle/0.8.0/apache-uniffle-0.8.0-incubating-bin.tar.gz && \
-          tar xzf apache-uniffle-0.8.0-incubating-bin.tar.gz -C /opt/ && mv /opt/rss-0.8.0-hadoop2.8 /opt/uniffle && \
+          wget -nv https://archive.apache.org/dist/incubator/uniffle/0.9.0/apache-uniffle-0.9.0-incubating-bin.tar.gz && \
+          tar xzf apache-uniffle-0.9.0-incubating-bin.tar.gz -C /opt/ && mv /opt/rss-0.9.0-hadoop2.8 /opt/uniffle && \
           wget -nv https://archive.apache.org/dist/hadoop/common/hadoop-2.8.5/hadoop-2.8.5.tar.gz && \
           tar xzf hadoop-2.8.5.tar.gz -C /opt/
-          rm -f /opt/uniffle/jars/server/shuffle-server-0.8.0-SNAPSHOT.jar
-          cp /opt/incubator-uniffle/server/target/shuffle-server-0.8.1-SNAPSHOT.jar /opt/uniffle/jars/server/
           rm -rf /opt/incubator-uniffle
           cd /opt/uniffle && mkdir shuffle_data && \
           bash -c "echo -e 'XMX_SIZE=16g\nHADOOP_HOME=/opt/hadoop-2.8.5' > ./bin/rss-env.sh" && \
           bash -c "echo -e 'rss.coordinator.shuffle.nodes.max 1\nrss.rpc.server.port 19999' > ./conf/coordinator.conf" && \
-          bash -c "echo -e 'rss.server.app.expired.withoutHeartbeat 7200000\nrss.server.heartbeat.delay 3000\nrss.rpc.server.port 19997\nrss.jetty.http.port 19996\nrss.server.netty.port 19995\nrss.storage.basePath /opt/uniffle/shuffle_data\nrss.storage.type MEMORY_LOCALFILE\nrss.coordinator.quorum localhost:19999\nrss.server.flush.thread.alive 10\nrss.server.single.buffer.flush.threshold 64m' > ./conf/server.conf" && \
+          bash -c "echo -e 'rss.server.app.expired.withoutHeartbeat 7200000\nrss.server.heartbeat.delay 3000\nrss.rpc.server.port 19997\nrss.rpc.server.type GRPC_NETTY\nrss.jetty.http.port 19996\nrss.server.netty.port 19995\nrss.storage.basePath /opt/uniffle/shuffle_data\nrss.storage.type MEMORY_LOCALFILE\nrss.coordinator.quorum localhost:19999\nrss.server.flush.thread.alive 10\nrss.server.single.buffer.flush.threshold 64m' > ./conf/server.conf" && \
           bash ./bin/start-coordinator.sh && bash ./bin/start-shuffle-server.sh
-          cd $GITHUB_WORKSPACE/tools/gluten-it && $MVN_CMD clean install -Pspark-3.2 -Puniffle && \
+      - name: Build for Spark ${{ matrix.spark }}
+        run: |
+          export MAVEN_HOME=/usr/lib/maven && \
+          export PATH=${PATH}:${MAVEN_HOME}/bin && \
+          cd $GITHUB_WORKSPACE/ && \
+          $MVN_CMD clean install -P${{ matrix.spark }} -Pbackends-velox -Puniffle -DskipTests
+      - name: TPC-H SF1.0 && TPC-DS SF1.0 Parquet local spark3.2 with uniffle-0.9.0
+        run: |
+          export MAVEN_HOME=/usr/lib/maven && \
+          export PATH=${PATH}:${MAVEN_HOME}/bin && \
+          export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk && \
+          cd $GITHUB_WORKSPACE/tools/gluten-it && \
+          $MVN_CMD clean install -Pspark-3.2 -Puniffle  && \
           GLUTEN_IT_JVM_ARGS=-Xmx5G sbin/gluten-it.sh queries-compare \
             --local --preset=velox-with-uniffle --benchmark-type=h --error-on-memleak --off-heap-size=10g -s=1.0 --threads=16 --iterations=1
 
diff --git a/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/gluten/uniffle/UniffleShuffleManager.java b/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/gluten/uniffle/UniffleShuffleManager.java
index f91141c1e..eb66dae90 100644
--- a/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/gluten/uniffle/UniffleShuffleManager.java
+++ b/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/gluten/uniffle/UniffleShuffleManager.java
@@ -16,9 +16,7 @@
  */
 package org.apache.spark.shuffle.gluten.uniffle;
 
-import org.apache.spark.ShuffleDependency;
 import org.apache.spark.SparkConf;
-import org.apache.spark.SparkEnv;
 import org.apache.spark.TaskContext;
 import org.apache.spark.executor.ShuffleWriteMetrics;
 import org.apache.spark.shuffle.ColumnarShuffleDependency;
@@ -36,21 +34,11 @@ import org.slf4j.LoggerFactory;
 public class UniffleShuffleManager extends RssShuffleManager {
   private static final Logger LOG = LoggerFactory.getLogger(UniffleShuffleManager.class);
 
-  private boolean isDriver() {
-    return "driver".equals(SparkEnv.get().executorId());
-  }
-
   public UniffleShuffleManager(SparkConf conf, boolean isDriver) {
     super(conf, isDriver);
     conf.set(RssSparkConfig.SPARK_RSS_CONFIG_PREFIX + RssSparkConfig.RSS_ROW_BASED.key(), "false");
   }
 
-  @Override
-  public <K, V, C> ShuffleHandle registerShuffle(
-      int shuffleId, ShuffleDependency<K, V, C> dependency) {
-    return super.registerShuffle(shuffleId, dependency);
-  }
-
   @Override
   public <K, V> ShuffleWriter<K, V> getWriter(
       ShuffleHandle handle, long mapId, TaskContext context, ShuffleWriteMetricsReporter metrics) {
@@ -62,7 +50,7 @@ public class UniffleShuffleManager extends RssShuffleManager {
       ColumnarShuffleDependency<K, V, V> dependency =
           (ColumnarShuffleDependency<K, V, V>) rssHandle.getDependency();
       setPusherAppId(rssHandle);
-      String taskId = "" + context.taskAttemptId() + "_" + context.attemptNumber();
+      String taskId = context.taskAttemptId() + "_" + context.attemptNumber();
       ShuffleWriteMetrics writeMetrics;
       if (metrics != null) {
         writeMetrics = new WriteMetrics(metrics);
diff --git a/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java b/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java
index b84c9d4ee..a80e34fb1 100644
--- a/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java
+++ b/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java
@@ -41,6 +41,7 @@ import org.apache.spark.sql.vectorized.ColumnarBatch;
 import org.apache.spark.util.SparkResourceUtil;
 import org.apache.uniffle.client.api.ShuffleWriteClient;
 import org.apache.uniffle.common.ShuffleBlockInfo;
+import org.apache.uniffle.common.exception.RssException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -61,19 +62,18 @@ public class VeloxUniffleColumnarShuffleWriter<K, V> extends RssShuffleWriter<K,
   private long nativeShuffleWriter = -1L;
 
   private boolean stopping = false;
-  private int compressThreshold = GlutenConfig.getConf().columnarShuffleCompressionThreshold();
-  private double reallocThreshold = GlutenConfig.getConf().columnarShuffleReallocThreshold();
+  private final int compressThreshold =
+      GlutenConfig.getConf().columnarShuffleCompressionThreshold();
+  private final double reallocThreshold = GlutenConfig.getConf().columnarShuffleReallocThreshold();
   private String compressionCodec;
-  private int compressionLevel;
-  private int partitionId;
+  private final int compressionLevel;
+  private final int partitionId;
 
-  private Runtime runtime = Runtimes.contextInstance("UniffleShuffleWriter");
-  private ShuffleWriterJniWrapper jniWrapper = ShuffleWriterJniWrapper.create(runtime);
-  private SplitResult splitResult;
-  private int nativeBufferSize = GlutenConfig.getConf().maxBatchSize();
-  private int bufferSize;
-  private PartitionPusher partitionPusher;
-  private Boolean isSort;
+  private final Runtime runtime = Runtimes.contextInstance("UniffleShuffleWriter");
+  private final ShuffleWriterJniWrapper jniWrapper = ShuffleWriterJniWrapper.create(runtime);
+  private final int nativeBufferSize = GlutenConfig.getConf().maxBatchSize();
+  private final int bufferSize;
+  private final Boolean isSort;
 
   private final ColumnarShuffleDependency<K, V, V> columnarDep;
   private final SparkConf sparkConf;
@@ -125,13 +125,13 @@ public class VeloxUniffleColumnarShuffleWriter<K, V> extends RssShuffleWriter<K,
   }
 
   @Override
-  protected void writeImpl(Iterator<Product2<K, V>> records) throws IOException {
+  protected void writeImpl(Iterator<Product2<K, V>> records) {
     if (!records.hasNext() && !isMemoryShuffleEnabled) {
       super.sendCommit();
       return;
     }
     // writer already init
-    partitionPusher = new PartitionPusher(this);
+    PartitionPusher partitionPusher = new PartitionPusher(this);
     while (records.hasNext()) {
       ColumnarBatch cb = (ColumnarBatch) (records.next()._2());
       if (cb.numRows() == 0 || cb.numCols() == 0) {
@@ -194,7 +194,12 @@ public class VeloxUniffleColumnarShuffleWriter<K, V> extends RssShuffleWriter<K,
     if (nativeShuffleWriter == -1L) {
       throw new IllegalStateException("nativeShuffleWriter should not be -1L");
     }
-    splitResult = jniWrapper.stop(nativeShuffleWriter);
+    SplitResult splitResult;
+    try {
+      splitResult = jniWrapper.stop(nativeShuffleWriter);
+    } catch (IOException e) {
+      throw new RssException(e);
+    }
     columnarDep.metrics().get("shuffleWallTime").get().add(System.nanoTime() - startTime);
     columnarDep
         .metrics()
@@ -220,7 +225,7 @@ public class VeloxUniffleColumnarShuffleWriter<K, V> extends RssShuffleWriter<K,
     long writeDurationMs = System.nanoTime() - pushMergedDataTime;
     shuffleWriteMetrics.incWriteTime(writeDurationMs);
     LOG.info(
-        "Finish write shuffle  with rest write {} ms",
+        "Finish write shuffle with rest write {} ms",
         TimeUnit.MILLISECONDS.toNanos(writeDurationMs));
   }
 
diff --git a/pom.xml b/pom.xml
index bb59ad2a7..cbec5befb 100644
--- a/pom.xml
+++ b/pom.xml
@@ -66,7 +66,7 @@
     <delta.version>2.4.0</delta.version>
     <delta.binary.version>24</delta.binary.version>
     <celeborn.version>0.5.1</celeborn.version>
-    <uniffle.version>0.8.0</uniffle.version>
+    <uniffle.version>0.9.0</uniffle.version>
     <arrow.version>15.0.0</arrow.version>
     <arrow-gluten.version>15.0.0-gluten</arrow-gluten.version>
     <arrow-memory.artifact>arrow-memory-unsafe</arrow-memory.artifact>
diff --git a/tools/gluten-it/pom.xml b/tools/gluten-it/pom.xml
index 43bc3ae09..5ab633252 100644
--- a/tools/gluten-it/pom.xml
+++ b/tools/gluten-it/pom.xml
@@ -22,7 +22,7 @@
     <scala.binary.version>2.12</scala.binary.version>
     <spark.major.version>3</spark.major.version>
     <celeborn.version>0.3.2-incubating</celeborn.version>
-    <uniffle.version>0.8.0</uniffle.version>
+    <uniffle.version>0.9.0</uniffle.version>
     <gluten.version>1.2.0-SNAPSHOT</gluten.version>
     <guava.version>32.0.1-jre</guava.version>
     <tpch.version>1.1</tpch.version>


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to