wanglijie95 commented on code in PR #22686:
URL: https://github.com/apache/flink/pull/22686#discussion_r1226372595
##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java:
##########
@@ -1730,14 +1705,13 @@ public MarkPartitionFinishedStrategy getMarkPartitionFinishedStrategy() {
return markPartitionFinishedStrategy;
}
- @Override
- public boolean isNonFinishedHybridPartitionShouldBeUnknown() {
- return nonFinishedHybridPartitionShouldBeUnknown;
- }
-
@Override
public JobVertexInputInfo getJobVertexInputInfo(
JobVertexID jobVertexId, IntermediateDataSetID resultId) {
return vertexInputInfoStore.get(jobVertexId, resultId);
}
+
Review Comment:
Add `@Override`
##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java:
##########
@@ -121,6 +121,14 @@ public static DefaultExecutionGraph buildGraph(
// create a new execution graph, if none exists so far
final DefaultExecutionGraph executionGraph;
try {
+ TaskDeploymentDescriptorFactory taskDeploymentDescriptorFactory =
Review Comment:
I think it would be better to put this in a separate try-catch block, because
the exception message of the current block is "Could not create the ExecutionGraph".
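
A minimal sketch of what the separate block could look like, so that each failure reports its own message. Everything here is a hypothetical stand-in rather than Flink code: `GraphBuildException`, `createFactory`, `createGraph`, and the wording of the first message are assumptions made only for illustration.

```java
public class SeparateTryCatchSketch {

    /** Hypothetical stand-in for the exception type thrown by buildGraph. */
    public static class GraphBuildException extends Exception {
        public GraphBuildException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    /** Hypothetical stand-in for creating the TaskDeploymentDescriptorFactory. */
    public static String createFactory(boolean fail) throws Exception {
        if (fail) {
            throw new java.io.IOException("simulated factory failure");
        }
        return "factory";
    }

    /** Hypothetical stand-in for constructing the execution graph itself. */
    public static String createGraph(String factory, boolean fail) throws Exception {
        if (fail) {
            throw new java.io.IOException("simulated graph failure");
        }
        return "graph(" + factory + ")";
    }

    public static String buildGraph(boolean failFactory, boolean failGraph)
            throws GraphBuildException {
        // First block: only the factory creation, with its own (assumed) message.
        final String factory;
        try {
            factory = createFactory(failFactory);
        } catch (Exception e) {
            throw new GraphBuildException(
                    "Could not create the TaskDeploymentDescriptorFactory.", e);
        }

        // Second block: keeps the existing message for graph creation.
        try {
            return createGraph(factory, failGraph);
        } catch (Exception e) {
            throw new GraphBuildException("Could not create the ExecutionGraph.", e);
        }
    }
}
```

With this split, a failure in the factory step no longer surfaces under the ExecutionGraph message.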
##########
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java:
##########
@@ -156,20 +141,32 @@ private List<InputGateDeploymentDescriptor> createInputGateDeploymentDescriptors
subpartitionRange,
consumedPartitionGroup.size(),
getConsumedPartitionShuffleDescriptors(
- consumedIntermediateResult, consumedPartitionGroup)));
+ consumedIntermediateResult,
+ consumedPartitionGroup,
+ executionVertex.getExecutionGraphAccessor())));
}
- for (Map.Entry<IntermediateDataSetID, ShuffleDescriptorAndIndex[]> entry :
- consumedClusterPartitionShuffleDescriptors.entrySet()) {
- // For FLIP-205, the JobGraph generating side ensure that the cluster partition is
- // produced with only one subpartition. Therefore, we always consume the partition with
- // subpartition index of 0.
- inputGates.add(
- new InputGateDeploymentDescriptor(
- entry.getKey(),
- ResultPartitionType.BLOCKING_PERSISTENT,
- 0,
- entry.getValue()));
+ try {
+ for (Map.Entry<IntermediateDataSetID, ShuffleDescriptorAndIndex[]> entry :
Review Comment:
I would prefer to keep the same logic as before and only put
`getClusterPartitionShuffleDescriptors` in the try-catch block.
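
A rough sketch of the narrower scope, assuming the lookup is the only call that can throw. The names `fetchDescriptors` and `createInputGates`, the `String`/`int[]` types, and the error message are hypothetical placeholders, not the real Flink signatures.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class NarrowTryScopeSketch {

    /**
     * Hypothetical stand-in for getClusterPartitionShuffleDescriptors:
     * the only step assumed to throw a checked exception.
     */
    public static Map<String, int[]> fetchDescriptors(boolean fail) throws Exception {
        if (fail) {
            throw new java.io.IOException("simulated lookup failure");
        }
        Map<String, int[]> descriptors = new LinkedHashMap<>();
        descriptors.put("dataset-a", new int[] {7});
        descriptors.put("dataset-b", new int[] {8, 9});
        return descriptors;
    }

    public static List<String> createInputGates(boolean fail) {
        // Only the throwing call sits inside the try-catch block.
        final Map<String, int[]> descriptors;
        try {
            descriptors = fetchDescriptors(fail);
        } catch (Exception e) {
            throw new IllegalStateException(
                    "Could not get cluster partition shuffle descriptors.", e);
        }

        // The loop keeps its previous shape, outside the try-catch block.
        List<String> inputGates = new ArrayList<>();
        for (Map.Entry<String, int[]> entry : descriptors.entrySet()) {
            // Subpartition index is always 0, mirroring the original loop body.
            inputGates.add(entry.getKey() + ":0:" + entry.getValue().length);
        }
        return inputGates;
    }
}
```

Keeping the loop outside the block means the diff for the loop body stays empty and unrelated exceptions raised inside it are not wrapped.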