This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 31cc707d8 [#1316] improvement(spark): detect OutputTracker API version
via Spark version (#1317)
31cc707d8 is described below
commit 31cc707d8a233d90c5e64b136c4d2e03c8f9e76b
Author: Enrico Minack <[email protected]>
AuthorDate: Tue Feb 20 09:23:33 2024 +0100
[#1316] improvement(spark): detect OutputTracker API version via Spark
version (#1317)
### What changes were proposed in this pull request?
Uses the Spark version to decide which OutputTracker API to call.
### Why are the changes needed?
Currently, it calls into one possible API version by reflection, and if
this fails, falls back to another version. Knowing the actual Spark version allows
calling into the right API version directly, without trial-and-error.
Fix: #1316
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
No tests added.
---
.../apache/spark/shuffle/SparkVersionUtils.java | 4 +
.../spark/shuffle/SparkVersionUtilsTest.java | 3 +
.../apache/spark/shuffle/RssShuffleManager.java | 108 ++++++++++-----------
.../apache/spark/shuffle/Spark3VersionUtils.java} | 11 +--
.../spark/shuffle/SparkVersionUtilsTest.java | 14 +++
5 files changed, 80 insertions(+), 60 deletions(-)
diff --git
a/client-spark/common/src/main/java/org/apache/spark/shuffle/SparkVersionUtils.java
b/client-spark/common/src/main/java/org/apache/spark/shuffle/SparkVersionUtils.java
index 616135e0f..a55f0aa7d 100644
---
a/client-spark/common/src/main/java/org/apache/spark/shuffle/SparkVersionUtils.java
+++
b/client-spark/common/src/main/java/org/apache/spark/shuffle/SparkVersionUtils.java
@@ -47,4 +47,8 @@ public class SparkVersionUtils {
public static boolean isSpark3() {
return MAJOR_VERSION == 3;
}
+
+ public static boolean isSpark320() {
+ return SPARK_VERSION.matches("^3.2.0([^\\d].*)?$");
+ }
}
diff --git
a/client-spark/spark2/src/test/java/org/apache/spark/shuffle/SparkVersionUtilsTest.java
b/client-spark/spark2/src/test/java/org/apache/spark/shuffle/SparkVersionUtilsTest.java
index 776b6dece..58c1fe0ea 100644
---
a/client-spark/spark2/src/test/java/org/apache/spark/shuffle/SparkVersionUtilsTest.java
+++
b/client-spark/spark2/src/test/java/org/apache/spark/shuffle/SparkVersionUtilsTest.java
@@ -19,11 +19,14 @@ package org.apache.spark.shuffle;
import org.junit.jupiter.api.Test;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class SparkVersionUtilsTest {
@Test
public void testSparkVersion() {
assertTrue(SparkVersionUtils.isSpark2());
+ assertFalse(SparkVersionUtils.isSpark3());
+ assertFalse(SparkVersionUtils.isSpark320());
}
}
diff --git
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index cb35ce3a2..b3aa46912 100644
---
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -91,7 +91,6 @@ import static
org.apache.uniffle.common.config.RssBaseConf.RPC_SERVER_PORT;
import static
org.apache.uniffle.common.config.RssClientConf.MAX_CONCURRENCY_PER_PARTITION_TO_WRITE;
public class RssShuffleManager extends RssShuffleManagerBase {
-
private static final Logger LOG =
LoggerFactory.getLogger(RssShuffleManager.class);
private final String clientType;
private final long heartbeatInterval;
@@ -770,32 +769,59 @@ public class RssShuffleManager extends
RssShuffleManagerBase {
int shuffleId, int startPartition, int endPartition, int startMapIndex,
int endMapIndex) {
Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf();
Iterator<Tuple2<BlockManagerId, Seq<Tuple3<BlockId, Object, Object>>>>
mapStatusIter = null;
- // Since Spark 3.1 refactors the interface of getMapSizesByExecutorId,
- // we use reflection and catch for the compatibility with 3.0 & 3.1 & 3.2
try {
- // attempt to use Spark 3.1's API
- mapStatusIter =
- (Iterator<Tuple2<BlockManagerId, Seq<Tuple3<BlockId, Object,
Object>>>>)
- SparkEnv.get()
- .mapOutputTracker()
- .getClass()
- .getDeclaredMethod(
- "getMapSizesByExecutorId",
- int.class,
- int.class,
- int.class,
- int.class,
- int.class)
- .invoke(
- SparkEnv.get().mapOutputTracker(),
- shuffleId,
- startMapIndex,
- endMapIndex,
- startPartition,
- endPartition);
- } catch (Exception ignored) {
- // fallback and attempt to use Spark 3.0's API
- try {
+ // Since Spark 3.1 refactors the interface of getMapSizesByExecutorId,
+ // we use reflection and catch for the compatibility with 3.0 & 3.1 & 3.2
+ if (Spark3VersionUtils.MAJOR_VERSION > 3
+ || Spark3VersionUtils.MINOR_VERSION > 2
+ || Spark3VersionUtils.MINOR_VERSION == 2 &&
!Spark3VersionUtils.isSpark320()
+ || Spark3VersionUtils.MINOR_VERSION == 1) {
+ // use Spark 3.1's API
+ mapStatusIter =
+ (Iterator<Tuple2<BlockManagerId, Seq<Tuple3<BlockId, Object,
Object>>>>)
+ SparkEnv.get()
+ .mapOutputTracker()
+ .getClass()
+ .getDeclaredMethod(
+ "getMapSizesByExecutorId",
+ int.class,
+ int.class,
+ int.class,
+ int.class,
+ int.class)
+ .invoke(
+ SparkEnv.get().mapOutputTracker(),
+ shuffleId,
+ startMapIndex,
+ endMapIndex,
+ startPartition,
+ endPartition);
+ } else if (Spark3VersionUtils.isSpark320()) {
+ // use Spark 3.2.0's API
+ // Each Spark release will be versioned:
[MAJOR].[FEATURE].[MAINTENANCE].
+ // Usually we only need to adapt [MAJOR].[FEATURE] . Unfortunately,
+ // some interfaces were removed wrongly in Spark 3.2.0. And they were
added by Spark
+ // 3.2.1.
+ // So we need to adapt Spark 3.2.0 here
+ mapStatusIter =
+ (Iterator<Tuple2<BlockManagerId, Seq<Tuple3<BlockId, Object,
Object>>>>)
+ MapOutputTracker.class
+ .getDeclaredMethod(
+ "getMapSizesByExecutorId",
+ int.class,
+ int.class,
+ int.class,
+ int.class,
+ int.class)
+ .invoke(
+ SparkEnv.get().mapOutputTracker(),
+ shuffleId,
+ startMapIndex,
+ endMapIndex,
+ startPartition,
+ endPartition);
+ } else {
+ // use Spark 3.0's API
mapStatusIter =
(Iterator<Tuple2<BlockManagerId, Seq<Tuple3<BlockId, Object,
Object>>>>)
SparkEnv.get()
@@ -804,35 +830,9 @@ public class RssShuffleManager extends
RssShuffleManagerBase {
.getDeclaredMethod("getMapSizesByExecutorId", int.class,
int.class, int.class)
.invoke(
SparkEnv.get().mapOutputTracker(), shuffleId,
startPartition, endPartition);
- } catch (Exception ignored1) {
- try {
- // attempt to use Spark 3.2.0's API
- // Each Spark release will be versioned:
[MAJOR].[FEATURE].[MAINTENANCE].
- // Usually we only need to adapt [MAJOR].[FEATURE] . Unfortunately,
- // some interfaces were removed wrongly in Spark 3.2.0. And they
were added by Spark
- // 3.2.1.
- // So we need to adapt Spark 3.2.0 here
- mapStatusIter =
- (Iterator<Tuple2<BlockManagerId, Seq<Tuple3<BlockId, Object,
Object>>>>)
- MapOutputTracker.class
- .getDeclaredMethod(
- "getMapSizesByExecutorId",
- int.class,
- int.class,
- int.class,
- int.class,
- int.class)
- .invoke(
- SparkEnv.get().mapOutputTracker(),
- shuffleId,
- startMapIndex,
- endMapIndex,
- startPartition,
- endPartition);
- } catch (Exception e) {
- throw new RssException(e);
- }
}
+ } catch (Exception e) {
+ throw new RssException(e);
}
while (mapStatusIter.hasNext()) {
Tuple2<BlockManagerId, Seq<Tuple3<BlockId, Object, Object>>> tuple2 =
mapStatusIter.next();
diff --git
a/client-spark/spark2/src/test/java/org/apache/spark/shuffle/SparkVersionUtilsTest.java
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/Spark3VersionUtils.java
similarity index 75%
copy from
client-spark/spark2/src/test/java/org/apache/spark/shuffle/SparkVersionUtilsTest.java
copy to
client-spark/spark3/src/main/java/org/apache/spark/shuffle/Spark3VersionUtils.java
index 776b6dece..76f464eec 100644
---
a/client-spark/spark2/src/test/java/org/apache/spark/shuffle/SparkVersionUtilsTest.java
+++
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/Spark3VersionUtils.java
@@ -17,13 +17,12 @@
package org.apache.spark.shuffle;
-import org.junit.jupiter.api.Test;
+import org.apache.spark.package$;
-import static org.junit.jupiter.api.Assertions.assertTrue;
+public class Spark3VersionUtils extends SparkVersionUtils {
+ public static final String SPARK_VERSION_SHORT =
package$.MODULE$.SPARK_VERSION_SHORT();
-public class SparkVersionUtilsTest {
- @Test
- public void testSparkVersion() {
- assertTrue(SparkVersionUtils.isSpark2());
+ public static boolean isSpark320() {
+ return SPARK_VERSION_SHORT.equals("3.2.0");
}
}
diff --git
a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/SparkVersionUtilsTest.java
b/client-spark/spark3/src/test/java/org/apache/spark/shuffle/SparkVersionUtilsTest.java
index dbc6bb3dc..64742c998 100644
---
a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/SparkVersionUtilsTest.java
+++
b/client-spark/spark3/src/test/java/org/apache/spark/shuffle/SparkVersionUtilsTest.java
@@ -17,13 +17,27 @@
package org.apache.spark.shuffle;
+import org.apache.spark.package$;
import org.junit.jupiter.api.Test;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class SparkVersionUtilsTest {
@Test
public void testSparkVersion() {
+ assertFalse(SparkVersionUtils.isSpark2());
assertTrue(SparkVersionUtils.isSpark3());
+ assertEquals(
+ package$.MODULE$.SPARK_VERSION_SHORT().equals("3.2.0"),
SparkVersionUtils.isSpark320());
+ }
+
+ @Test
+ public void testSpark3Version() {
+ assertFalse(Spark3VersionUtils.isSpark2());
+ assertTrue(Spark3VersionUtils.isSpark3());
+ assertEquals(
+ package$.MODULE$.SPARK_VERSION_SHORT().equals("3.2.0"),
Spark3VersionUtils.isSpark320());
}
}