This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 3bad1c8ab [CELEBORN-919][FOLLOWUP] Put map index args after partition
index args in CelebornShuffleReader constructor
3bad1c8ab is described below
commit 3bad1c8abcfe7cd81b3eff7bcb215ed72c714df4
Author: zhouyifan279 <[email protected]>
AuthorDate: Thu Aug 31 17:22:10 2023 +0800
[CELEBORN-919][FOLLOWUP] Put map index args after partition index args in
CelebornShuffleReader constructor
### What changes were proposed in this pull request?
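Move the map index arguments (`startMapIndex`, `endMapIndex`) after the
partition index arguments (`startPartition`, `endPartition`) in the
CelebornShuffleReader constructor, and apply the same ordering to the
related reader helpers and call sites.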
### Why are the changes needed?
#1853 changed the argument order of the CelebornShuffleReader constructor.
That change breaks the Gluten Celeborn shuffle manager.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Ran the tests locally.
Closes #1869 from zhouyifan279/shuffle-reader-ctor.
Authored-by: zhouyifan279 <[email protected]>
Signed-off-by: zky.zhoukeyong <[email protected]>
---
.../celeborn/CelebornColumnarShuffleReader.scala | 8 +++---
.../shuffle/celeborn/SparkShuffleManager.java | 30 +++++++++++-----------
.../apache/spark/shuffle/celeborn/SparkUtils.java | 12 ++++-----
.../shuffle/celeborn/CelebornShuffleReader.scala | 4 +--
4 files changed, 27 insertions(+), 27 deletions(-)
diff --git
a/client-spark/spark-3-columnar-shuffle/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornColumnarShuffleReader.scala
b/client-spark/spark-3-columnar-shuffle/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornColumnarShuffleReader.scala
index 18abc5e3d..e6f3cdccb 100644
---
a/client-spark/spark-3-columnar-shuffle/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornColumnarShuffleReader.scala
+++
b/client-spark/spark-3-columnar-shuffle/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornColumnarShuffleReader.scala
@@ -27,19 +27,19 @@ import org.apache.celeborn.common.CelebornConf
class CelebornColumnarShuffleReader[K, C](
handle: CelebornShuffleHandle[K, _, C],
- startMapIndex: Int = 0,
- endMapIndex: Int = Int.MaxValue,
startPartition: Int,
endPartition: Int,
+ startMapIndex: Int = 0,
+ endMapIndex: Int = Int.MaxValue,
context: TaskContext,
conf: CelebornConf,
metrics: ShuffleReadMetricsReporter)
extends CelebornShuffleReader[K, C](
handle,
- startMapIndex,
- endMapIndex,
startPartition,
endPartition,
+ startMapIndex,
+ endMapIndex,
context,
conf,
metrics) {
diff --git
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
index 49e3a950c..603cae517 100644
---
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
+++
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
@@ -264,15 +264,15 @@ public class SparkShuffleManager implements
ShuffleManager {
ShuffleReadMetricsReporter metrics) {
if (handle instanceof CelebornShuffleHandle) {
return getCelebornShuffleReader(
- handle, startMapIndex, endMapIndex, startPartition, endPartition,
context, metrics);
+ handle, startPartition, endPartition, startMapIndex, endMapIndex,
context, metrics);
}
return SparkUtils.getReader(
sortShuffleManager(),
handle,
- startMapIndex,
- endMapIndex,
startPartition,
endPartition,
+ startMapIndex,
+ endMapIndex,
context,
metrics);
}
@@ -286,15 +286,15 @@ public class SparkShuffleManager implements
ShuffleManager {
ShuffleReadMetricsReporter metrics) {
if (handle instanceof CelebornShuffleHandle) {
return getCelebornShuffleReader(
- handle, 0, Integer.MAX_VALUE, startPartition, endPartition, context,
metrics);
+ handle, startPartition, endPartition, 0, Integer.MAX_VALUE, context,
metrics);
}
return SparkUtils.getReader(
sortShuffleManager(),
handle,
- 0,
- Integer.MAX_VALUE,
startPartition,
endPartition,
+ 0,
+ Integer.MAX_VALUE,
context,
metrics);
}
@@ -310,45 +310,45 @@ public class SparkShuffleManager implements
ShuffleManager {
ShuffleReadMetricsReporter metrics) {
if (handle instanceof CelebornShuffleHandle) {
return getCelebornShuffleReader(
- handle, startMapIndex, endMapIndex, startPartition, endPartition,
context, metrics);
+ handle, startPartition, endPartition, startMapIndex, endMapIndex,
context, metrics);
}
return SparkUtils.getReader(
sortShuffleManager(),
handle,
- startMapIndex,
- endMapIndex,
startPartition,
endPartition,
+ startMapIndex,
+ endMapIndex,
context,
metrics);
}
public <K, C> ShuffleReader<K, C> getCelebornShuffleReader(
ShuffleHandle handle,
- int startMapIndex,
- int endMapIndex,
int startPartition,
int endPartition,
+ int startMapIndex,
+ int endMapIndex,
TaskContext context,
ShuffleReadMetricsReporter metrics) {
CelebornShuffleHandle<K, ?, C> h = (CelebornShuffleHandle<K, ?, C>) handle;
if (COLUMNAR_SHUFFLE_CLASSES_PRESENT &&
celebornConf.columnarShuffleEnabled()) {
return SparkUtils.createColumnarShuffleReader(
h,
- startMapIndex,
- endMapIndex,
startPartition,
endPartition,
+ startMapIndex,
+ endMapIndex,
context,
celebornConf,
metrics);
} else {
return new CelebornShuffleReader<>(
h,
- startMapIndex,
- endMapIndex,
startPartition,
endPartition,
+ startMapIndex,
+ endMapIndex,
context,
celebornConf,
metrics);
diff --git
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java
index fd9ce1dd5..46a54f2ff 100644
---
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java
+++
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java
@@ -135,10 +135,10 @@ public class SparkUtils {
public static <K, C> ShuffleReader<K, C> getReader(
SortShuffleManager sortShuffleManager,
ShuffleHandle handle,
- Integer startMapIndex,
- Integer endMapIndex,
Integer startPartition,
Integer endPartition,
+ Integer startMapIndex,
+ Integer endMapIndex,
TaskContext context,
ShuffleReadMetricsReporter metrics) {
ShuffleReader<K, C> shuffleReader =
@@ -200,10 +200,10 @@ public class SparkUtils {
public static <K, C> CelebornShuffleReader<K, C> createColumnarShuffleReader(
CelebornShuffleHandle<K, ?, C> handle,
- int startMapIndex,
- int endMapIndex,
int startPartition,
int endPartition,
+ int startMapIndex,
+ int endMapIndex,
TaskContext context,
CelebornConf conf,
ShuffleReadMetricsReporter metrics) {
@@ -212,10 +212,10 @@ public class SparkUtils {
.invoke(
null,
handle,
- startMapIndex,
- endMapIndex,
startPartition,
endPartition,
+ startMapIndex,
+ endMapIndex,
context,
conf,
metrics);
diff --git
a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
index cb76ed015..eadb655c9 100644
---
a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
+++
b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
@@ -30,10 +30,10 @@ import org.apache.celeborn.common.CelebornConf
class CelebornShuffleReader[K, C](
handle: CelebornShuffleHandle[K, _, C],
- startMapIndex: Int = 0,
- endMapIndex: Int = Int.MaxValue,
startPartition: Int,
endPartition: Int,
+ startMapIndex: Int = 0,
+ endMapIndex: Int = Int.MaxValue,
context: TaskContext,
conf: CelebornConf,
metrics: ShuffleReadMetricsReporter)