GitHub user yingdachen commented on a diff in the pull request:
https://github.com/apache/tez/pull/33#discussion_r232087761
--- Diff: tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java ---
@@ -2440,23 +2453,30 @@ public void run() {
}
}
- private void startDAG() throws IOException, TezException {
+ private boolean hasConcurrentEdge(DAGPlan dagPlan) {
+ boolean hasConcurrentEdge = false;
+ for (DAGProtos.EdgePlan edge : dagPlan.getEdgeList()) {
+ if (DAGProtos.PlanEdgeSchedulingType.CONCURRENT.equals(edge.getSchedulingType())) {
+ return true;
+ }
+ }
+ return hasConcurrentEdge;
+ }
+
+ private DAGPlan readDAGPlanFile() throws IOException, TezException {
FileInputStream dagPBBinaryStream = null;
+ DAGPlan dagPlan = null;
try {
- DAGPlan dagPlan = null;
-
// Read the protobuf DAG
dagPBBinaryStream = new FileInputStream(new File(workingDirectory,
TezConstants.TEZ_PB_PLAN_BINARY_NAME));
dagPlan = DAGPlan.parseFrom(dagPBBinaryStream);
-
- startDAG(dagPlan, null);
--- End diff --
I am not sure whether you are referring to the recoveredDAGData != null check
on line 1993 or the one on line 1983.
For the one on line 1993, startDAG would not be called anyway. For the one on
line 1983, the added logic only forcibly invalidates the recovery data that has
already been read; it does not change the startDAG behavior either.
Overall, there are two modifications here:
1. The DAGPlan is read a bit earlier (in non-session mode).
2. The recovery data that has already been read may be discarded when the DAG
contains a concurrent edge. This is intentional, and I think it should be
acceptable since it does not modify the existing logic flow and it provides a
safeguard for DAG execution with concurrent edges until we have a proper
failover implementation for them (TEZ-4017).
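A minimal sketch of the flow described above, assuming simplified stand-in types: SchedulingType, Edge, DagPlan and RecoveredData are hypothetical plain Java types in place of the real Tez protobuf and recovery classes (DAGProtos.PlanEdgeSchedulingType, EdgePlan, DAGPlan, recoveredDAGData), and filterRecovery is a hypothetical helper. The quoted diff does not show exactly where the recovery data is dropped, so this only illustrates the idea: once the plan has been read, a single concurrent edge is enough to ignore previously recovered state, and the existing recovery checks then run unchanged.

```java
import java.util.List;

/**
 * Hypothetical, simplified model of the DAG start flow discussed in the review.
 * These types are NOT the Tez API; they only mimic the shape of the check.
 */
public class DagStartSketch {

    // Stand-in for DAGProtos.PlanEdgeSchedulingType (assumed values).
    enum SchedulingType { SEQUENTIAL, CONCURRENT }

    record Edge(String name, SchedulingType schedulingType) {}

    record DagPlan(String name, List<Edge> edges) {}

    record RecoveredData(String dagName) {}

    // Mirrors hasConcurrentEdge in the diff: return as soon as any edge is CONCURRENT.
    static boolean hasConcurrentEdge(DagPlan plan) {
        for (Edge edge : plan.edges()) {
            if (edge.schedulingType() == SchedulingType.CONCURRENT) {
                return true;
            }
        }
        return false;
    }

    // Hypothetical helper: because the plan is already read (modification 1),
    // recovery data can be dropped before the existing recovery checks run
    // (modification 2). Returns null when recovery data should be ignored.
    static RecoveredData filterRecovery(DagPlan plan, RecoveredData recovered) {
        if (recovered != null && hasConcurrentEdge(plan)) {
            // No failover support for concurrent edges yet (TEZ-4017):
            // start the DAG fresh instead of resuming from recovery.
            return null;
        }
        return recovered;
    }

    public static void main(String[] args) {
        DagPlan plain = new DagPlan("plain",
            List.of(new Edge("e1", SchedulingType.SEQUENTIAL)));
        DagPlan mixed = new DagPlan("mixed", List.of(
            new Edge("e1", SchedulingType.SEQUENTIAL),
            new Edge("e2", SchedulingType.CONCURRENT)));
        RecoveredData recovered = new RecoveredData("dag_1");

        System.out.println(filterRecovery(plain, recovered) != null); // true
        System.out.println(filterRecovery(mixed, recovered) != null); // false
    }
}
```

Keeping the concurrent-edge check separate from the recovery branches means a DAG without concurrent edges follows exactly the same path as before; only DAGs with a concurrent edge lose their recovered state.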
---