This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new a8d20f2a [#635] feat(client): enable LOCAL_ORDER by default for Spark
AQE (#644)
a8d20f2a is described below
commit a8d20f2a30bab1d1a6d0df37f87e8998ab87d28d
Author: Junfan Zhang <[email protected]>
AuthorDate: Thu Feb 23 09:50:17 2023 +0800
[#635] feat(client): enable LOCAL_ORDER by default for Spark AQE (#644)
### What changes were proposed in this pull request?
enable LOCAL_ORDER by default for Spark AQE
### Why are the changes needed?
Currently, the local_order data distribution type should be activated
explicitly. It's better to enable it when using AQE by default.
Fix: #635
### Does this PR introduce _any_ user-facing change?
Yes
### How was this patch tested?
1. Existing UTs
---
client-spark/spark3/pom.xml | 6 ++
.../apache/spark/shuffle/RssShuffleManager.java | 15 +++-
.../spark/shuffle/RssShuffleManagerTest.java | 86 ++++++++++++++++++++++
docs/client_guide.md | 9 +--
4 files changed, 108 insertions(+), 8 deletions(-)
diff --git a/client-spark/spark3/pom.xml b/client-spark/spark3/pom.xml
index 5860d6c8..38147246 100644
--- a/client-spark/spark3/pom.xml
+++ b/client-spark/spark3/pom.xml
@@ -79,6 +79,12 @@
<artifactId>hadoop-minicluster</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-catalyst_${scala.binary.version}</artifactId>
+ <version>${spark.version}</version>
+ <scope>provided</scope>
+ </dependency>
</dependencies>
<build>
<plugins>
diff --git
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index a37453ee..e08607ec 100644
---
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -49,6 +49,7 @@ import org.apache.spark.shuffle.writer.AddBlockEvent;
import org.apache.spark.shuffle.writer.BufferManagerOptions;
import org.apache.spark.shuffle.writer.RssShuffleWriter;
import org.apache.spark.shuffle.writer.WriteBufferManager;
+import org.apache.spark.sql.internal.SQLConf;
import org.apache.spark.storage.BlockId;
import org.apache.spark.storage.BlockManagerId;
import org.apache.spark.util.EventLoop;
@@ -71,6 +72,7 @@ import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.common.ShuffleDataDistributionType;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.config.RssClientConf;
+import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.util.RetryUtils;
import org.apache.uniffle.common.util.RssUtils;
@@ -177,7 +179,7 @@ public class RssShuffleManager implements ShuffleManager {
final int retryMax = sparkConf.get(RssSparkConfig.RSS_CLIENT_RETRY_MAX);
this.clientType = sparkConf.get(RssSparkConfig.RSS_CLIENT_TYPE);
this.dynamicConfEnabled =
sparkConf.get(RssSparkConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED);
- this.dataDistributionType =
RssSparkConfig.toRssConf(sparkConf).get(RssClientConf.DATA_DISTRIBUTION_TYPE);
+ this.dataDistributionType = getDataDistributionType(sparkConf);
long retryIntervalMax =
sparkConf.get(RssSparkConfig.RSS_CLIENT_RETRY_INTERVAL_MAX);
int heartBeatThreadNum =
sparkConf.get(RssSparkConfig.RSS_CLIENT_HEARTBEAT_THREAD_NUM);
this.dataTransferPoolSize =
sparkConf.get(RssSparkConfig.RSS_DATA_TRANSFER_POOL_SIZE);
@@ -220,6 +222,17 @@ public class RssShuffleManager implements ShuffleManager {
}
}
+ @VisibleForTesting
+ protected static ShuffleDataDistributionType
getDataDistributionType(SparkConf sparkConf) {
+ RssConf rssConf = RssSparkConfig.toRssConf(sparkConf);
+ if ((boolean) sparkConf.get(SQLConf.ADAPTIVE_EXECUTION_ENABLED())
+ && !rssConf.containsKey(RssClientConf.DATA_DISTRIBUTION_TYPE.key())) {
+ return ShuffleDataDistributionType.LOCAL_ORDER;
+ }
+
+ return rssConf.get(RssClientConf.DATA_DISTRIBUTION_TYPE);
+ }
+
// For testing only
@VisibleForTesting
RssShuffleManager(
diff --git
a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/RssShuffleManagerTest.java
b/client-spark/spark3/src/test/java/org/apache/spark/shuffle/RssShuffleManagerTest.java
new file mode 100644
index 00000000..00bc591c
--- /dev/null
+++
b/client-spark/spark3/src/test/java/org/apache/spark/shuffle/RssShuffleManagerTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.sql.internal.SQLConf;
+import org.junit.jupiter.api.Test;
+
+import org.apache.uniffle.common.ShuffleDataDistributionType;
+import org.apache.uniffle.common.config.RssClientConf;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class RssShuffleManagerTest {
+ private static final String SPARK_ADAPTIVE_EXECUTION_ENABLED_KEY =
"spark.sql.adaptive.enabled";
+
+ @Test
+ public void testGetDataDistributionType() {
+ // case1
+ SparkConf sparkConf = new SparkConf();
+ sparkConf.set(SPARK_ADAPTIVE_EXECUTION_ENABLED_KEY, "true");
+ assertEquals(
+ ShuffleDataDistributionType.LOCAL_ORDER,
+ RssShuffleManager.getDataDistributionType(sparkConf)
+ );
+
+ // case2
+ sparkConf = new SparkConf();
+ sparkConf.set(SPARK_ADAPTIVE_EXECUTION_ENABLED_KEY, "false");
+ assertEquals(
+ RssClientConf.DATA_DISTRIBUTION_TYPE.defaultValue(),
+ RssShuffleManager.getDataDistributionType(sparkConf)
+ );
+
+ // case3
+ sparkConf = new SparkConf();
+ sparkConf.set(SPARK_ADAPTIVE_EXECUTION_ENABLED_KEY, "true");
+ sparkConf.set("spark." + RssClientConf.DATA_DISTRIBUTION_TYPE.key(),
ShuffleDataDistributionType.NORMAL.name());
+ assertEquals(
+ ShuffleDataDistributionType.NORMAL,
+ RssShuffleManager.getDataDistributionType(sparkConf)
+ );
+
+ // case4
+ sparkConf = new SparkConf();
+ sparkConf.set(SPARK_ADAPTIVE_EXECUTION_ENABLED_KEY, "true");
+ sparkConf.set(
+ "spark." + RssClientConf.DATA_DISTRIBUTION_TYPE.key(),
+ ShuffleDataDistributionType.LOCAL_ORDER.name()
+ );
+ assertEquals(
+ ShuffleDataDistributionType.LOCAL_ORDER,
+ RssShuffleManager.getDataDistributionType(sparkConf)
+ );
+
+ // case5
+ sparkConf = new SparkConf();
+ boolean aqeEnable = (boolean)
sparkConf.get(SQLConf.ADAPTIVE_EXECUTION_ENABLED());
+ if (aqeEnable) {
+ assertEquals(
+ ShuffleDataDistributionType.LOCAL_ORDER,
+ RssShuffleManager.getDataDistributionType(sparkConf)
+ );
+ } else {
+ assertEquals(
+ RssClientConf.DATA_DISTRIBUTION_TYPE.defaultValue(),
+ RssShuffleManager.getDataDistributionType(sparkConf)
+ );
+ }
+ }
+}
diff --git a/docs/client_guide.md b/docs/client_guide.md
index 8b1b008e..413a5334 100644
--- a/docs/client_guide.md
+++ b/docs/client_guide.md
@@ -62,13 +62,8 @@ After apply the patch and rebuild spark, add following
configuration in spark co
To improve performance of AQE skew optimization, uniffle introduces the
LOCAL_ORDER shuffle-data distribution mechanism
and Continuous partition assignment mechanism.
-1. LOCAL_ORDER shuffle-data distribution mechanism filter the lots of data to
reduce network bandwidth and shuffle-server local-disk pressure.
-
- It can be enabled by the following config
- ```bash
- # Default value is NORMAL, it will directly append to file when the
memory data is flushed to external storage
- spark.rss.client.shuffle.data.distribution.type LOCAL_ORDER
- ```
+1. The LOCAL_ORDER shuffle-data distribution mechanism filters out a large amount of data to
reduce network bandwidth and shuffle-server local-disk pressure.
+   It will be enabled by default when AQE is enabled.
2. Continuous partition assignment mechanism assign consecutive partitions to
the same ShuffleServer to reduce the frequency of getShuffleResult.