PHILO-HE commented on code in PR #3767:
URL: https://github.com/apache/incubator-gluten/pull/3767#discussion_r1554839641
##########
.github/workflows/velox_docker.yml:
##########
@@ -275,6 +276,66 @@ jobs:
# -d=OFFHEAP_SIZE:2g,spark.memory.offHeap.size=2g \
# -d=OFFHEAP_SIZE:1g,spark.memory.offHeap.size=1g || true
+ run-tpc-test-centos8-uniffle:
+ needs: build-native-lib
+ strategy:
+ fail-fast: false
+ matrix:
+ spark: ["spark-3.2"]
+ runs-on: ubuntu-20.04
+ container: centos:8
+ steps:
+ - uses: actions/checkout@v2
+ - name: Download All Artifacts
+ uses: actions/download-artifact@v2
+ with:
+ name: velox-native-lib-${{github.sha}}
+ path: ./cpp/build/releases
+ - name: Update mirror list
+ run: |
+ sed -i -e "s|mirrorlist=|#mirrorlist=|g" /etc/yum.repos.d/CentOS-*
|| true
+ sed -i -e
"s|#baseurl=http://mirror.centos.org|baseurl=http://vault.centos.org|g"
/etc/yum.repos.d/CentOS-* || true
+ - name: Setup java and maven
+ run: |
+ yum update -y && yum install -y java-1.8.0-openjdk-devel wget git
+ wget
https://downloads.apache.org/maven/maven-3/3.8.8/binaries/apache-maven-3.8.8-bin.tar.gz
+ tar -xvf apache-maven-3.8.8-bin.tar.gz
+ mv apache-maven-3.8.8 /usr/lib/maven
+ - name: Build for Spark ${{ matrix.spark }}
+ run: |
+ cd $GITHUB_WORKSPACE/ && \
+ export MAVEN_HOME=/usr/lib/maven && \
+ export PATH=${PATH}:${MAVEN_HOME}/bin && \
+ mvn clean install -P${{ matrix.spark }} -Pbackends-velox
-Prss-uniffle -DskipTests
+ - name: TPC-H SF1.0 && TPC-DS SF1.0 Parquet local spark3.2 with uniffle
0.8.0
+ run: |
+ export MAVEN_HOME=/usr/lib/maven && \
+ export PATH=${PATH}:${MAVEN_HOME}/bin && \
+ export export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk && \
+ cd /opt && \
+ git clone -b branch-0.8
https://github.com/apache/incubator-uniffle.git && \
+ cd incubator-uniffle && \
+ sed -i '250d'
./server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java && \
+ sed -i '228d'
./server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java && \
+ sed -i '226d'
./server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java && \
+ mvn clean install -Phadoop2.8 -DskipTests
+ cd /opt && \
+ wget -nv
https://archive.apache.org/dist/incubator/uniffle/0.8.0/apache-uniffle-0.8.0-incubating-bin.tar.gz
&& \
+ tar xzf apache-uniffle-0.8.0-incubating-bin.tar.gz -C /opt/ && mv
/opt/rss-0.8.0-hadoop2.8 /opt/uniffle && \
+ wget -nv
https://archive.apache.org/dist/hadoop/common/hadoop-2.8.5/hadoop-2.8.5.tar.gz
&& \
+ tar xzf hadoop-2.8.5.tar.gz -C /opt/
+ rm -f /opt/uniffle/jars/server/shuffle-server-0.8.0-SNAPSHOT.jar
+ cp
/opt/incubator-uniffle/server/target/shuffle-server-0.8.1-SNAPSHOT.jar
/opt/uniffle/jars/server/
+ rm -rf /opt/incubator-uniffle
+ cd /opt/uniffle && mkdir shuffle_data && \
+ echo -e "XMX_SIZE=16g\nHADOOP_HOME=/opt/hadoop-2.8.5" >
./bin/rss-env.sh && \
Review Comment:
I found that `echo -e` requires `bash`; it is NOT compliant with POSIX `sh`.
So let's explicitly use `bash`, like this:
https://github.com/apache/incubator-gluten/blob/main/.github/workflows/velox_docker.yml#L363
##########
cpp/core/jni/JniWrapper.cc:
##########
@@ -915,6 +915,23 @@ JNIEXPORT jlong JNICALL
Java_org_apache_gluten_vectorized_ShuffleWriterJniWrappe
std::move(partitionWriterOptions),
memoryManager->getArrowMemoryPool(),
std::move(celebornClient));
+ } else if (partitionWriterType == "uniffle") {
+ jclass unifflePartitionPusherClass =
+ createGlobalClassReferenceOrError(env,
"Lorg/apache/spark/shuffle/writer/PartitionPusher;");
+ jmethodID unifflePushPartitionDataMethod =
+ getMethodIdOrError(env, unifflePartitionPusherClass,
"pushPartitionData", "(I[BI)I");
+ JavaVM* vm;
+ if (env->GetJavaVM(&vm) != JNI_OK) {
+ throw gluten::GlutenException("Unable to get JavaVM instance");
+ }
+ // rename CelebornClient RssClient
+ std::shared_ptr<CelebornClient> uniffleClient =
+ std::make_shared<CelebornClient>(vm, partitionPusher,
unifflePushPartitionDataMethod);
+ partitionWriter = std::make_unique<CelebornPartitionWriter>(
Review Comment:
As with the comment above, it would be better to use a common name if these
can indeed be shared by both Celeborn and Uniffle.
##########
gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/gluten/uniffle/GlutenRssShuffleManager.java:
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.shuffle.gluten.uniffle;
+
+import org.apache.spark.ShuffleDependency;
+import org.apache.spark.SparkConf;
+import org.apache.spark.SparkEnv;
+import org.apache.spark.TaskContext;
+import org.apache.spark.executor.ShuffleWriteMetrics;
+import org.apache.spark.shuffle.ColumnarShuffleDependency;
+import org.apache.spark.shuffle.RssShuffleHandle;
+import org.apache.spark.shuffle.RssShuffleManager;
+import org.apache.spark.shuffle.RssSparkConfig;
+import org.apache.spark.shuffle.ShuffleHandle;
+import org.apache.spark.shuffle.ShuffleWriteMetricsReporter;
+import org.apache.spark.shuffle.ShuffleWriter;
+import org.apache.spark.shuffle.sort.ColumnarShuffleManager;
+import org.apache.spark.shuffle.writer.VeloxUniffleColumnarShuffleWriter;
+import org.apache.uniffle.common.exception.RssException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Constructor;
+
+public class GlutenRssShuffleManager extends RssShuffleManager {
+ private static final Logger LOG =
LoggerFactory.getLogger(GlutenRssShuffleManager.class);
+ private static final String GLUTEN_SHUFFLE_MANAGER_NAME =
+ "org.apache.spark.shuffle.sort.ColumnarShuffleManager";
+
+ private static final String VANILLA_UNIFFLE_SHUFFLE_MANAGER_NAME =
+ "org.apache.spark.shuffle.celeborn.SparkShuffleManager";
+ private volatile ColumnarShuffleManager _columnarShuffleManager;
+ private volatile RssShuffleManager _vanillaUniffleShuffleManager;
+
+ private ColumnarShuffleManager columnarShuffleManager() {
+ if (_columnarShuffleManager == null) {
+ synchronized (this) {
+ if (_columnarShuffleManager == null) {
+ _columnarShuffleManager =
+ initShuffleManager(GLUTEN_SHUFFLE_MANAGER_NAME, sparkConf,
isDriver());
+ }
+ }
+ }
+ return _columnarShuffleManager;
+ }
+
+ private RssShuffleManager vanillaUniffleShuffleManager() {
+ if (_vanillaUniffleShuffleManager == null) {
+ synchronized (this) {
+ if (_vanillaUniffleShuffleManager == null) {
+ initShuffleManager(VANILLA_UNIFFLE_SHUFFLE_MANAGER_NAME, sparkConf,
isDriver());
+ }
+ }
+ }
+ return _vanillaUniffleShuffleManager;
+ }
+
+ private boolean isDriver() {
+ return "driver".equals(SparkEnv.get().executorId());
+ }
+
+ private ColumnarShuffleManager initShuffleManager(String name, SparkConf
conf, boolean isDriver) {
+ Constructor constructor;
+ ColumnarShuffleManager instance;
+ try {
+ Class klass = Class.forName(name);
+ try {
+ constructor = klass.getConstructor(conf.getClass(), Boolean.TYPE);
+ instance = (ColumnarShuffleManager) constructor.newInstance(conf,
isDriver);
+ } catch (NoSuchMethodException var7) {
+ constructor = klass.getConstructor(conf.getClass());
+ instance = (ColumnarShuffleManager) constructor.newInstance(conf);
+ }
+ } catch (Exception e) {
+ throw new RuntimeException("initColumnManager fail");
+ }
+ return instance;
+ }
+
+ public GlutenRssShuffleManager(SparkConf conf, boolean isDriver) {
+ super(conf, isDriver);
+ conf.set("spark.rss.row.based", "false");
Review Comment:
Nit: setting this with a raw string key could cause issues. It may be better to
set it like this:
https://github.com/apache/incubator-gluten/pull/3767/files#diff-c5aeb43f0c96e20fabd596e8d6b8545669ce6bb966ca797f0041b2c3889957aaR112
##########
gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/gluten/uniffle/GlutenRssShuffleManager.java:
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.shuffle.gluten.uniffle;
+
+import org.apache.spark.ShuffleDependency;
+import org.apache.spark.SparkConf;
+import org.apache.spark.SparkEnv;
+import org.apache.spark.TaskContext;
+import org.apache.spark.executor.ShuffleWriteMetrics;
+import org.apache.spark.shuffle.ColumnarShuffleDependency;
+import org.apache.spark.shuffle.RssShuffleHandle;
+import org.apache.spark.shuffle.RssShuffleManager;
+import org.apache.spark.shuffle.RssSparkConfig;
+import org.apache.spark.shuffle.ShuffleHandle;
+import org.apache.spark.shuffle.ShuffleWriteMetricsReporter;
+import org.apache.spark.shuffle.ShuffleWriter;
+import org.apache.spark.shuffle.sort.ColumnarShuffleManager;
+import org.apache.spark.shuffle.writer.VeloxUniffleColumnarShuffleWriter;
+import org.apache.uniffle.common.exception.RssException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Constructor;
+
+public class GlutenRssShuffleManager extends RssShuffleManager {
+ private static final Logger LOG =
LoggerFactory.getLogger(GlutenRssShuffleManager.class);
+ private static final String GLUTEN_SHUFFLE_MANAGER_NAME =
+ "org.apache.spark.shuffle.sort.ColumnarShuffleManager";
+
+ private static final String VANILLA_UNIFFLE_SHUFFLE_MANAGER_NAME =
+ "org.apache.spark.shuffle.celeborn.SparkShuffleManager";
+ private volatile ColumnarShuffleManager _columnarShuffleManager;
+ private volatile RssShuffleManager _vanillaUniffleShuffleManager;
+
+ private ColumnarShuffleManager columnarShuffleManager() {
+ if (_columnarShuffleManager == null) {
+ synchronized (this) {
+ if (_columnarShuffleManager == null) {
+ _columnarShuffleManager =
+ initShuffleManager(GLUTEN_SHUFFLE_MANAGER_NAME, sparkConf,
isDriver());
+ }
+ }
+ }
+ return _columnarShuffleManager;
+ }
+
+ private RssShuffleManager vanillaUniffleShuffleManager() {
+ if (_vanillaUniffleShuffleManager == null) {
+ synchronized (this) {
+ if (_vanillaUniffleShuffleManager == null) {
+ initShuffleManager(VANILLA_UNIFFLE_SHUFFLE_MANAGER_NAME, sparkConf,
isDriver());
+ }
+ }
+ }
+ return _vanillaUniffleShuffleManager;
+ }
+
+ private boolean isDriver() {
+ return "driver".equals(SparkEnv.get().executorId());
+ }
+
+ private ColumnarShuffleManager initShuffleManager(String name, SparkConf
conf, boolean isDriver) {
+ Constructor constructor;
+ ColumnarShuffleManager instance;
+ try {
+ Class klass = Class.forName(name);
+ try {
+ constructor = klass.getConstructor(conf.getClass(), Boolean.TYPE);
+ instance = (ColumnarShuffleManager) constructor.newInstance(conf,
isDriver);
+ } catch (NoSuchMethodException var7) {
+ constructor = klass.getConstructor(conf.getClass());
+ instance = (ColumnarShuffleManager) constructor.newInstance(conf);
+ }
+ } catch (Exception e) {
+ throw new RuntimeException("initColumnManager fail");
+ }
+ return instance;
+ }
+
+ public GlutenRssShuffleManager(SparkConf conf, boolean isDriver) {
+ super(conf, isDriver);
+ conf.set("spark.rss.row.based", "false");
+ }
+
+ @Override
+ public <K, V, C> ShuffleHandle registerShuffle(
+ int shuffleId, ShuffleDependency<K, V, C> dependency) {
+ return super.registerShuffle(shuffleId, dependency);
+ }
+
+ @Override
+ public <K, V> ShuffleWriter<K, V> getWriter(
+ ShuffleHandle handle, long mapId, TaskContext context,
ShuffleWriteMetricsReporter metrics) {
+ if (!(handle instanceof RssShuffleHandle)) {
+ throw new RssException("Unexpected ShuffleHandle:" +
handle.getClass().getName());
+ }
+ sparkConf.setIfMissing(
Review Comment:
Do we need this code? It seems the setting applied when
`GlutenRssShuffleManager` is instantiated is sufficient.
##########
gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/gluten/uniffle/GlutenRssShuffleManager.java:
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.shuffle.gluten.uniffle;
+
+import org.apache.spark.ShuffleDependency;
+import org.apache.spark.SparkConf;
+import org.apache.spark.SparkEnv;
+import org.apache.spark.TaskContext;
+import org.apache.spark.executor.ShuffleWriteMetrics;
+import org.apache.spark.shuffle.ColumnarShuffleDependency;
+import org.apache.spark.shuffle.RssShuffleHandle;
+import org.apache.spark.shuffle.RssShuffleManager;
+import org.apache.spark.shuffle.RssSparkConfig;
+import org.apache.spark.shuffle.ShuffleHandle;
+import org.apache.spark.shuffle.ShuffleWriteMetricsReporter;
+import org.apache.spark.shuffle.ShuffleWriter;
+import org.apache.spark.shuffle.sort.ColumnarShuffleManager;
+import org.apache.spark.shuffle.writer.VeloxUniffleColumnarShuffleWriter;
+import org.apache.uniffle.common.exception.RssException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Constructor;
+
+public class GlutenRssShuffleManager extends RssShuffleManager {
+ private static final Logger LOG =
LoggerFactory.getLogger(GlutenRssShuffleManager.class);
+ private static final String GLUTEN_SHUFFLE_MANAGER_NAME =
+ "org.apache.spark.shuffle.sort.ColumnarShuffleManager";
+
+ private static final String VANILLA_UNIFFLE_SHUFFLE_MANAGER_NAME =
+ "org.apache.spark.shuffle.celeborn.SparkShuffleManager";
+ private volatile ColumnarShuffleManager _columnarShuffleManager;
+ private volatile RssShuffleManager _vanillaUniffleShuffleManager;
+
+ private ColumnarShuffleManager columnarShuffleManager() {
Review Comment:
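It looks like `columnarShuffleManager()` and `vanillaUniffleShuffleManager()`
are not actually used? If they are kept, note that
`vanillaUniffleShuffleManager()` never assigns the result of
`initShuffleManager(...)` to `_vanillaUniffleShuffleManager`, so it always
returns null.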
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]