This is an automated email from the ASF dual-hosted git repository. nicholasjiang pushed a commit to branch branch-0.5 in repository https://gitbox.apache.org/repos/asf/celeborn.git
commit f089a0b60f64b60cfd44009fb261b5a88f901dbe
Author: Sanskar Modi <[email protected]>
AuthorDate: Tue Oct 22 14:28:12 2024 +0800

    [CELEBORN-1663][FOLLOWUP] Only register appShuffleDeterminate if stage using celeborn for shuffle

    Making the same changes for Spark2 codebase

    Followup for https://github.com/apache/celeborn/pull/2832

    NA

    Existing UTs

    Closes #2837 from s0nskar/fix_register_spark2.

    Authored-by: Sanskar Modi <[email protected]>
    Signed-off-by: SteNicholas <[email protected]>
    (cherry picked from commit 1e77f01cd317b1dc885965d6053b391db1d42bc7)
    Signed-off-by: SteNicholas <[email protected]>
---
 .../org/apache/spark/shuffle/celeborn/SparkShuffleManager.java | 9 +++++----
 1 file changed, 5 insertions(+), 4 deletions(-)

diff --git a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
index aa5549184..5be2b5c26 100644
--- a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
+++ b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
@@ -116,16 +116,17 @@ public class SparkShuffleManager implements ShuffleManager {
     appUniqueId = SparkUtils.appUniqueId(dependency.rdd().context());
     initializeLifecycleManager(appUniqueId);
 
-    lifecycleManager.registerAppShuffleDeterminate(
-        shuffleId,
-        dependency.rdd().getOutputDeterministicLevel() != DeterministicLevel.INDETERMINATE());
-
     if (fallbackPolicyRunner.applyAllFallbackPolicy(
         lifecycleManager, dependency.partitioner().numPartitions())) {
       logger.warn("Fallback to SortShuffleManager!");
       sortShuffleIds.add(shuffleId);
       return sortShuffleManager().registerShuffle(shuffleId, numMaps, dependency);
     } else {
+      lifecycleManager.registerAppShuffleDeterminate(
+          shuffleId,
+          !DeterministicLevel.INDETERMINATE()
+              .equals(dependency.rdd().getOutputDeterministicLevel()));
+
       return new CelebornShuffleHandle<>(
appUniqueId, lifecycleManager.getHost(),
