This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 98b0d124c [#802] feat(spark): Implement ShuffleDataIo (#1226)
98b0d124c is described below
commit 98b0d124cc0bd27acd1b9ef55340390bb227163d
Author: summaryzb <[email protected]>
AuthorDate: Sun Oct 8 20:52:18 2023 -0500
[#802] feat(spark): Implement ShuffleDataIo (#1226)
### What changes were proposed in this pull request?
Implement ShuffleDataIo
### Why are the changes needed?
https://github.com/apache/incubator-uniffle/issues/802
### Does this PR introduce _any_ user-facing change?
Yes. To use Spark dynamic allocation on Spark 3.5 or above, users must set
`spark.shuffle.sort.io.plugin.class` to
`org.apache.spark.shuffle.RssShuffleDataIo`; otherwise the application will fail.
### How was this patch tested?
Integration test
Manual test
---
README.md | 4 ++
.../org/apache/spark/shuffle/RssShuffleDataIo.java | 43 ++++++++++++++++++++++
.../spark/shuffle/RssShuffleDriverComponents.java | 43 ++++++++++++++++++++++
.../uniffle/test/SparkIntegrationTestBase.java | 2 +
.../org/apache/uniffle/test/GetReaderTest.java | 2 +
5 files changed, 94 insertions(+)
diff --git a/README.md b/README.md
index 95e1c82f2..5ec89f35e 100644
--- a/README.md
+++ b/README.md
@@ -257,6 +257,10 @@ After apply the patch and rebuild spark, add following
configuration in spark co
spark.shuffle.service.enabled false
spark.dynamicAllocation.enabled true
```
+For Spark 3.5 or above, also add the following configuration:
+ ```
+ spark.shuffle.sort.io.plugin.class org.apache.spark.shuffle.RssShuffleDataIo
+ ```
### Deploy MapReduce Client
diff --git
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleDataIo.java
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleDataIo.java
new file mode 100644
index 000000000..edbe25c9f
--- /dev/null
+++
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleDataIo.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.shuffle.api.ShuffleDataIO;
+import org.apache.spark.shuffle.api.ShuffleDriverComponents;
+import org.apache.spark.shuffle.api.ShuffleExecutorComponents;
+import org.apache.spark.shuffle.sort.io.LocalDiskShuffleExecutorComponents;
+
+/**
+ * {@link ShuffleDataIO} plugin, intended to be configured through {@code
+ * spark.shuffle.sort.io.plugin.class}.
+ *
+ * <p>Executors keep Spark's local-disk shuffle components, while the driver receives {@link
+ * RssShuffleDriverComponents}.
+ */
+public class RssShuffleDataIo implements ShuffleDataIO {
+  private final SparkConf conf;
+
+  public RssShuffleDataIo(SparkConf sparkConf) {
+    this.conf = sparkConf;
+  }
+
+  /** Creates the driver-side components backed by this plugin's SparkConf. */
+  @Override
+  public ShuffleDriverComponents driver() {
+    return new RssShuffleDriverComponents(conf);
+  }
+
+  /**
+   * Returns Spark's local-disk executor components, which keeps this plugin compatible with
+   * SortShuffleManager when DelegationRssShuffleManager falls back.
+   */
+  @Override
+  public ShuffleExecutorComponents executor() {
+    return new LocalDiskShuffleExecutorComponents(conf);
+  }
+}
diff --git
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleDriverComponents.java
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleDriverComponents.java
new file mode 100644
index 000000000..893b2e8f4
--- /dev/null
+++
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleDriverComponents.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.shuffle.sort.io.LocalDiskShuffleDriverComponents;
+
+public class RssShuffleDriverComponents extends
LocalDiskShuffleDriverComponents {
+
+ private final SparkConf sparkConf;
+
+ public RssShuffleDriverComponents(SparkConf sparkConf) {
+ this.sparkConf = sparkConf;
+ }
+
+ /**
+ * Omitting @Override annotation to avoid compile error before Spark 3.5.0
+ *
+ * <p>This method is called after DelegationRssShuffleManager initialize, so
+ * RssSparkConfig.RSS_ENABLED must be already set
+ */
+ public boolean supportsReliableStorage() {
+ return sparkConf.get(RssSparkConfig.RSS_ENABLED)
+ || RssShuffleManager.class
+ .getCanonicalName()
+ .equals(sparkConf.get("spark.shuffle.manager"));
+ }
+}
diff --git
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkIntegrationTestBase.java
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkIntegrationTestBase.java
index ac37f03ce..6d48b901c 100644
---
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkIntegrationTestBase.java
+++
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkIntegrationTestBase.java
@@ -101,6 +101,8 @@ public abstract class SparkIntegrationTestBase extends
IntegrationTestBase {
public void updateSparkConfWithRss(SparkConf sparkConf) {
sparkConf.set("spark.shuffle.manager",
"org.apache.spark.shuffle.RssShuffleManager");
+ sparkConf.set(
+ "spark.shuffle.sort.io.plugin.class",
"org.apache.spark.shuffle.RssShuffleDataIo");
sparkConf.set("spark.serializer",
"org.apache.spark.serializer.KryoSerializer");
sparkConf.set(RssSparkConfig.RSS_WRITER_BUFFER_SIZE.key(), "4m");
sparkConf.set(RssSparkConfig.RSS_WRITER_BUFFER_SPILL_SIZE.key(), "32m");
diff --git
a/integration-test/spark3/src/test/java/org/apache/uniffle/test/GetReaderTest.java
b/integration-test/spark3/src/test/java/org/apache/uniffle/test/GetReaderTest.java
index 1b92530ec..1aaf7eed9 100644
---
a/integration-test/spark3/src/test/java/org/apache/uniffle/test/GetReaderTest.java
+++
b/integration-test/spark3/src/test/java/org/apache/uniffle/test/GetReaderTest.java
@@ -69,6 +69,8 @@ public class GetReaderTest extends IntegrationTestBase {
public void test() throws Exception {
SparkConf sparkConf = new SparkConf();
sparkConf.set("spark.shuffle.manager",
"org.apache.spark.shuffle.RssShuffleManager");
+ sparkConf.set(
+ "spark.shuffle.sort.io.plugin.class",
"org.apache.spark.shuffle.RssShuffleDataIo");
sparkConf.set("spark.serializer",
"org.apache.spark.serializer.KryoSerializer");
sparkConf.set(RssSparkConfig.RSS_COORDINATOR_QUORUM.key(),
COORDINATOR_QUORUM);
sparkConf.setMaster("local[4]");