This is an automated email from the ASF dual-hosted git repository.

nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 2a324fae7 [CELEBORN-1490][FOLLOWUP] Polish CelebornTierProducerAgent
2a324fae7 is described below

commit 2a324fae7309f72b64b7db20c0316333b4183c4d
Author: Weijie Guo <[email protected]>
AuthorDate: Fri Nov 15 19:19:47 2024 +0800

    [CELEBORN-1490][FOLLOWUP] Polish CelebornTierProducerAgent
    
    ### What changes were proposed in this pull request?
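    1. Move the job lookup to the start of `addPartitionAndGetShuffleDescriptor` so that an unknown job fails before any other work is done, avoiding contaminated state on the failure path.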
    2. Avoid an NPE in the `close` method when `lifecycleManager` is null.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    No need.
    
    Closes #2923 from reswqa/fix-tier-master.
    
    Authored-by: Weijie Guo <[email protected]>
    Signed-off-by: SteNicholas <[email protected]>
---
 .../plugin/flink/tiered/CelebornTierMasterAgent.java         | 12 +++++++-----
 1 file changed, 7 insertions(+), 5 deletions(-)

diff --git a/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierMasterAgent.java b/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierMasterAgent.java
index fe10a889a..05a5b45e7 100644
--- a/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierMasterAgent.java
+++ b/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierMasterAgent.java
@@ -125,6 +125,10 @@ public class CelebornTierMasterAgent implements TierMasterAgent {
   @Override
   public TierShuffleDescriptor addPartitionAndGetShuffleDescriptor(
       JobID jobID, ResultPartitionID resultPartitionID) {
+    Set<Integer> shuffleIds = jobShuffleIds.get(jobID);
+    if (shuffleIds == null) {
+      throw new RuntimeException("Can not find job in master agent, job: " + 
jobID);
+    }
     FlinkResultPartitionInfo resultPartitionInfo =
         new FlinkResultPartitionInfo(jobID, resultPartitionID);
     ShuffleResourceDescriptor shuffleResourceDescriptor =
@@ -132,10 +136,6 @@ public class CelebornTierMasterAgent implements TierMasterAgent {
             resultPartitionInfo.getShuffleId(),
             resultPartitionInfo.getTaskId(),
             resultPartitionInfo.getAttemptId());
-    Set<Integer> shuffleIds = jobShuffleIds.get(jobID);
-    if (shuffleIds == null) {
-      throw new RuntimeException("Can not find job in master agent, job: " + 
jobID);
-    }
     shuffleIds.add(shuffleResourceDescriptor.getShuffleId());
     shuffleResourceTracker.addPartitionResource(
         jobID,
@@ -181,7 +181,9 @@ public class CelebornTierMasterAgent implements TierMasterAgent {
   public void close() {
     try {
       jobShuffleIds.clear();
-      lifecycleManager.stop();
+      if (null != lifecycleManager) {
+        lifecycleManager.stop();
+      }
     } catch (Exception e) {
       LOG.warn("Encounter exception when shutdown: {}", e.getMessage(), e);
     }

Reply via email to