This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new fda735391 [CELEBORN-919][FOLLOWUP] Unify the order of map index args
and partit…
fda735391 is described below
commit fda735391a4920609c270effce3dc579ea6616af
Author: zhouyifan279 <[email protected]>
AuthorDate: Tue Aug 29 17:25:46 2023 +0800
[CELEBORN-919][FOLLOWUP] Unify the order of map index args and partit…
…ion index args in ShuffleReader related methods
### What changes were proposed in this pull request?
Unify the order of map index args and partition index args in ShuffleReader
related methods.
### Why are the changes needed?
The order of map index args and partition index args in the
CelebornShuffleReader constructor differs from the order in
`SparkShuffleManager#getReader`.
The arguments can easily be mixed up.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Run tests locally.
Closes #1853 from zhouyifan279/columnar-shuffle-followup.
Authored-by: zhouyifan279 <[email protected]>
Signed-off-by: zky.zhoukeyong <[email protected]>
---
.../shuffle/celeborn/CelebornColumnarShuffleReader.scala | 8 ++++----
.../apache/spark/shuffle/celeborn/SparkShuffleManager.java | 12 ++++++------
.../java/org/apache/spark/shuffle/celeborn/SparkUtils.java | 4 ++--
.../spark/shuffle/celeborn/CelebornShuffleReader.scala | 4 ++--
4 files changed, 14 insertions(+), 14 deletions(-)
diff --git
a/client-spark/spark-3-columnar-shuffle/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornColumnarShuffleReader.scala
b/client-spark/spark-3-columnar-shuffle/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornColumnarShuffleReader.scala
index e6f3cdccb..18abc5e3d 100644
---
a/client-spark/spark-3-columnar-shuffle/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornColumnarShuffleReader.scala
+++
b/client-spark/spark-3-columnar-shuffle/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornColumnarShuffleReader.scala
@@ -27,19 +27,19 @@ import org.apache.celeborn.common.CelebornConf
class CelebornColumnarShuffleReader[K, C](
handle: CelebornShuffleHandle[K, _, C],
- startPartition: Int,
- endPartition: Int,
startMapIndex: Int = 0,
endMapIndex: Int = Int.MaxValue,
+ startPartition: Int,
+ endPartition: Int,
context: TaskContext,
conf: CelebornConf,
metrics: ShuffleReadMetricsReporter)
extends CelebornShuffleReader[K, C](
handle,
- startPartition,
- endPartition,
startMapIndex,
endMapIndex,
+ startPartition,
+ endPartition,
context,
conf,
metrics) {
diff --git
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
index d22149017..49e3a950c 100644
---
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
+++
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
@@ -264,7 +264,7 @@ public class SparkShuffleManager implements ShuffleManager {
ShuffleReadMetricsReporter metrics) {
if (handle instanceof CelebornShuffleHandle) {
return getCelebornShuffleReader(
- handle, startPartition, endPartition, startMapIndex, endMapIndex,
context, metrics);
+ handle, startMapIndex, endMapIndex, startPartition, endPartition,
context, metrics);
}
return SparkUtils.getReader(
sortShuffleManager(),
@@ -310,7 +310,7 @@ public class SparkShuffleManager implements ShuffleManager {
ShuffleReadMetricsReporter metrics) {
if (handle instanceof CelebornShuffleHandle) {
return getCelebornShuffleReader(
- handle, startPartition, endPartition, startMapIndex, endMapIndex,
context, metrics);
+ handle, startMapIndex, endMapIndex, startPartition, endPartition,
context, metrics);
}
return SparkUtils.getReader(
sortShuffleManager(),
@@ -335,20 +335,20 @@ public class SparkShuffleManager implements
ShuffleManager {
if (COLUMNAR_SHUFFLE_CLASSES_PRESENT &&
celebornConf.columnarShuffleEnabled()) {
return SparkUtils.createColumnarShuffleReader(
h,
- startPartition,
- endPartition,
startMapIndex,
endMapIndex,
+ startPartition,
+ endPartition,
context,
celebornConf,
metrics);
} else {
return new CelebornShuffleReader<>(
h,
- startPartition,
- endPartition,
startMapIndex,
endMapIndex,
+ startPartition,
+ endPartition,
context,
celebornConf,
metrics);
diff --git
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java
index b87dd0b82..fd9ce1dd5 100644
---
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java
+++
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java
@@ -200,10 +200,10 @@ public class SparkUtils {
public static <K, C> CelebornShuffleReader<K, C> createColumnarShuffleReader(
CelebornShuffleHandle<K, ?, C> handle,
- int startPartition,
- int endPartition,
int startMapIndex,
int endMapIndex,
+ int startPartition,
+ int endPartition,
TaskContext context,
CelebornConf conf,
ShuffleReadMetricsReporter metrics) {
diff --git
a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
index eadb655c9..cb76ed015 100644
---
a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
+++
b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
@@ -30,10 +30,10 @@ import org.apache.celeborn.common.CelebornConf
class CelebornShuffleReader[K, C](
handle: CelebornShuffleHandle[K, _, C],
- startPartition: Int,
- endPartition: Int,
startMapIndex: Int = 0,
endMapIndex: Int = Int.MaxValue,
+ startPartition: Int,
+ endPartition: Int,
context: TaskContext,
conf: CelebornConf,
metrics: ShuffleReadMetricsReporter)