This is an automated email from the ASF dual-hosted git repository.
nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 2a324fae7 [CELEBORN-1490][FOLLOWUP] Polish CelebornTierProducerAgent
2a324fae7 is described below
commit 2a324fae7309f72b64b7db20c0316333b4183c4d
Author: Weijie Guo <[email protected]>
AuthorDate: Fri Nov 15 19:19:47 2024 +0800
[CELEBORN-1490][FOLLOWUP] Polish CelebornTierProducerAgent
### What changes were proposed in this pull request?
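1. Move the job-existence check in `addPartitionAndGetShuffleDescriptor` ahead of the shuffle resource descriptor creation, so that an unknown job fails fast instead of leaving partially updated state behind on the error path.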
2. Avoid a `NullPointerException` in `close()` by stopping `lifecycleManager` only when it is non-null.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
No new tests were added; they are not needed for this change.
Closes #2923 from reswqa/fix-tier-master.
Authored-by: Weijie Guo <[email protected]>
Signed-off-by: SteNicholas <[email protected]>
---
.../plugin/flink/tiered/CelebornTierMasterAgent.java | 12 +++++++-----
1 file changed, 7 insertions(+), 5 deletions(-)
diff --git a/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierMasterAgent.java b/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierMasterAgent.java
index fe10a889a..05a5b45e7 100644
--- a/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierMasterAgent.java
+++ b/client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierMasterAgent.java
@@ -125,6 +125,10 @@ public class CelebornTierMasterAgent implements TierMasterAgent {
@Override
public TierShuffleDescriptor addPartitionAndGetShuffleDescriptor(
JobID jobID, ResultPartitionID resultPartitionID) {
+ Set<Integer> shuffleIds = jobShuffleIds.get(jobID);
+ if (shuffleIds == null) {
+ throw new RuntimeException("Can not find job in master agent, job: " +
jobID);
+ }
FlinkResultPartitionInfo resultPartitionInfo =
new FlinkResultPartitionInfo(jobID, resultPartitionID);
ShuffleResourceDescriptor shuffleResourceDescriptor =
@@ -132,10 +136,6 @@ public class CelebornTierMasterAgent implements TierMasterAgent {
resultPartitionInfo.getShuffleId(),
resultPartitionInfo.getTaskId(),
resultPartitionInfo.getAttemptId());
- Set<Integer> shuffleIds = jobShuffleIds.get(jobID);
- if (shuffleIds == null) {
- throw new RuntimeException("Can not find job in master agent, job: " +
jobID);
- }
shuffleIds.add(shuffleResourceDescriptor.getShuffleId());
shuffleResourceTracker.addPartitionResource(
jobID,
@@ -181,7 +181,9 @@ public class CelebornTierMasterAgent implements TierMasterAgent {
public void close() {
try {
jobShuffleIds.clear();
- lifecycleManager.stop();
+ if (null != lifecycleManager) {
+ lifecycleManager.stop();
+ }
} catch (Exception e) {
LOG.warn("Encounter exception when shutdown: {}", e.getMessage(), e);
}