This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 98b0d124c [#802] feat(spark): Implement ShuffleDataIo (#1226)
98b0d124c is described below

commit 98b0d124cc0bd27acd1b9ef55340390bb227163d
Author: summaryzb <[email protected]>
AuthorDate: Sun Oct 8 20:52:18 2023 -0500

    [#802] feat(spark): Implement ShuffleDataIo (#1226)
    
    ### What changes were proposed in this pull request?
    Implement ShuffleDataIo
    
    ### Why are the changes needed?
    https://github.com/apache/incubator-uniffle/issues/802
    
    ### Does this PR introduce _any_ user-facing change?
    Yes. To use spark dynamicAllocation, user should set `spark.shuffle.sort.io.plugin.class` to `org.apache.spark.shuffle.RssShuffleDataIo`, otherwise spark3.5 will fail
    
    ### How was this patch tested?
    Integration test
    Manual test
---
 README.md                                          |  4 ++
 .../org/apache/spark/shuffle/RssShuffleDataIo.java | 43 ++++++++++++++++++++++
 .../spark/shuffle/RssShuffleDriverComponents.java  | 43 ++++++++++++++++++++++
 .../uniffle/test/SparkIntegrationTestBase.java     |  2 +
 .../org/apache/uniffle/test/GetReaderTest.java     |  2 +
 5 files changed, 94 insertions(+)

diff --git a/README.md b/README.md
index 95e1c82f2..5ec89f35e 100644
--- a/README.md
+++ b/README.md
@@ -257,6 +257,10 @@ After apply the patch and rebuild spark, add following configuration in spark co
   spark.shuffle.service.enabled false
   spark.dynamicAllocation.enabled true
   ```
+For spark3.5 or above just add one more configuration:
+  ```
+  spark.shuffle.sort.io.plugin.class org.apache.spark.shuffle.RssShuffleDataIo
+  ```
 
 ### Deploy MapReduce Client
 
diff --git a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleDataIo.java b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleDataIo.java
new file mode 100644
index 000000000..edbe25c9f
--- /dev/null
+++ b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleDataIo.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.shuffle.api.ShuffleDataIO;
+import org.apache.spark.shuffle.api.ShuffleDriverComponents;
+import org.apache.spark.shuffle.api.ShuffleExecutorComponents;
+import org.apache.spark.shuffle.sort.io.LocalDiskShuffleExecutorComponents;
+
+public class RssShuffleDataIo implements ShuffleDataIO {
+  private final SparkConf sparkConf;
+
+  public RssShuffleDataIo(SparkConf sparkConf) {
+    this.sparkConf = sparkConf;
+  }
+
+  /** Compatible with SortShuffleManager when DelegationRssShuffleManager fallback */
+  @Override
+  public ShuffleExecutorComponents executor() {
+    return new LocalDiskShuffleExecutorComponents(sparkConf);
+  }
+
+  @Override
+  public ShuffleDriverComponents driver() {
+    return new RssShuffleDriverComponents(sparkConf);
+  }
+}
diff --git a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleDriverComponents.java b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleDriverComponents.java
new file mode 100644
index 000000000..893b2e8f4
--- /dev/null
+++ b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleDriverComponents.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.shuffle.sort.io.LocalDiskShuffleDriverComponents;
+
+public class RssShuffleDriverComponents extends LocalDiskShuffleDriverComponents {
+
+  private final SparkConf sparkConf;
+
+  public RssShuffleDriverComponents(SparkConf sparkConf) {
+    this.sparkConf = sparkConf;
+  }
+
+  /**
+   * Omitting @Override annotation to avoid compile error before Spark 3.5.0
+   *
+   * <p>This method is called after DelegationRssShuffleManager initialize, so
+   * RssSparkConfig.RSS_ENABLED must be already set
+   */
+  public boolean supportsReliableStorage() {
+    return sparkConf.get(RssSparkConfig.RSS_ENABLED)
+        || RssShuffleManager.class
+            .getCanonicalName()
+            .equals(sparkConf.get("spark.shuffle.manager"));
+  }
+}
diff --git a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkIntegrationTestBase.java b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkIntegrationTestBase.java
index ac37f03ce..6d48b901c 100644
--- a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkIntegrationTestBase.java
+++ b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkIntegrationTestBase.java
@@ -101,6 +101,8 @@ public abstract class SparkIntegrationTestBase extends IntegrationTestBase {
 
   public void updateSparkConfWithRss(SparkConf sparkConf) {
     sparkConf.set("spark.shuffle.manager", "org.apache.spark.shuffle.RssShuffleManager");
+    sparkConf.set(
+        "spark.shuffle.sort.io.plugin.class", "org.apache.spark.shuffle.RssShuffleDataIo");
     sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
     sparkConf.set(RssSparkConfig.RSS_WRITER_BUFFER_SIZE.key(), "4m");
     sparkConf.set(RssSparkConfig.RSS_WRITER_BUFFER_SPILL_SIZE.key(), "32m");
diff --git a/integration-test/spark3/src/test/java/org/apache/uniffle/test/GetReaderTest.java b/integration-test/spark3/src/test/java/org/apache/uniffle/test/GetReaderTest.java
index 1b92530ec..1aaf7eed9 100644
--- a/integration-test/spark3/src/test/java/org/apache/uniffle/test/GetReaderTest.java
+++ b/integration-test/spark3/src/test/java/org/apache/uniffle/test/GetReaderTest.java
@@ -69,6 +69,8 @@ public class GetReaderTest extends IntegrationTestBase {
   public void test() throws Exception {
     SparkConf sparkConf = new SparkConf();
     sparkConf.set("spark.shuffle.manager", "org.apache.spark.shuffle.RssShuffleManager");
+    sparkConf.set(
+        "spark.shuffle.sort.io.plugin.class", "org.apache.spark.shuffle.RssShuffleDataIo");
     sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
     sparkConf.set(RssSparkConfig.RSS_COORDINATOR_QUORUM.key(), COORDINATOR_QUORUM);
     sparkConf.setMaster("local[4]");

Reply via email to