Repository: tez
Updated Branches:
  refs/heads/master 713145d88 -> 1c16b5bfc


TEZ-1064. Restore dagName Set for duplicate detection in recovered AMs. (Jeff 
Zhang via hitesh)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/1c16b5bf
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/1c16b5bf
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/1c16b5bf

Branch: refs/heads/master
Commit: 1c16b5bfc22cc5f09029c240fd2edd97954fa800
Parents: 713145d
Author: Hitesh Shah <[email protected]>
Authored: Wed Jul 30 20:30:41 2014 -0700
Committer: Hitesh Shah <[email protected]>
Committed: Wed Jul 30 20:30:41 2014 -0700

----------------------------------------------------------------------
 .../org/apache/tez/dag/app/RecoveryParser.java  |  9 +++++++-
 .../history/events/DAGCommitStartedEvent.java   |  2 +-
 .../dag/history/events/DAGSubmittedEvent.java   | 22 ++++++++++++--------
 .../org/apache/tez/dag/utils/ProtoUtils.java    | 14 ++++++++++---
 .../org/apache/tez/test/TestDAGRecovery.java    |  8 +++++++
 5 files changed, 41 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/1c16b5bf/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java 
b/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
index 85254cd..bcd9e9e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
@@ -390,6 +390,7 @@ public class RecoveryParser {
   private static class DAGSummaryData {
 
     final TezDAGID dagId;
+    String dagName;
     boolean completed = false;
     boolean dagCommitCompleted = true;
     DAGState dagState;
@@ -410,6 +411,9 @@ public class RecoveryParser {
       switch (eventType) {
         case DAG_SUBMITTED:
           completed = false;
+          DAGSubmittedEvent dagSubmittedEvent = new DAGSubmittedEvent();
+          dagSubmittedEvent.fromSummaryProtoStream(proto);
+          dagName = dagSubmittedEvent.getDAGName();
           break;
         case DAG_FINISHED:
           completed = true;
@@ -577,8 +581,11 @@ public class RecoveryParser {
     newSummaryStream.hsync();
     newSummaryStream.close();
 
-    // Set counter for next set of DAGs
+    // Set counter for next set of DAGs & update dagNames Set in DAGAppMaster
     dagAppMaster.setDAGCounter(dagCounter);
+    for (DAGSummaryData dagSummaryData: dagSummaryDataMap.values()){
+      dagAppMaster.dagNames.add(dagSummaryData.dagName);
+    }
 
     DAGSummaryData lastInProgressDAGData =
         getLastCompletedOrInProgressDAG(dagSummaryDataMap);

http://git-wip-us.apache.org/repos/asf/tez/blob/1c16b5bf/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGCommitStartedEvent.java
----------------------------------------------------------------------
diff --git 
a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGCommitStartedEvent.java
 
b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGCommitStartedEvent.java
index c4c0320..016bb60 100644
--- 
a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGCommitStartedEvent.java
+++ 
b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGCommitStartedEvent.java
@@ -94,7 +94,7 @@ public class DAGCommitStartedEvent implements HistoryEvent, 
SummaryEvent {
   @Override
   public void toSummaryProtoStream(OutputStream outputStream) throws 
IOException {
     ProtoUtils.toSummaryEventProto(dagID, commitStartTime,
-        getEventType()).writeDelimitedTo(outputStream);
+        getEventType(), null).writeDelimitedTo(outputStream);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/1c16b5bf/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java
----------------------------------------------------------------------
diff --git 
a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java
 
b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java
index a2c9d75..5911ff3 100644
--- 
a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java
+++ 
b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java
@@ -44,7 +44,10 @@ public class DAGSubmittedEvent implements HistoryEvent, 
SummaryEvent {
 
   private static final Log LOG = LogFactory.getLog(DAGSubmittedEvent.class);
 
+  private static final String CHARSET_NAME = "utf-8";
+
   private TezDAGID dagID;
+  private String dagName;
   private long submitTime;
   private DAGProtos.DAGPlan dagPlan;
   private ApplicationAttemptId applicationAttemptId;
@@ -59,6 +62,7 @@ public class DAGSubmittedEvent implements HistoryEvent, 
SummaryEvent {
       Map<String, LocalResource> cumulativeAdditionalLocalResources,
       String user) {
     this.dagID = dagID;
+    this.dagName = dagPlan.getName();
     this.submitTime = submitTime;
     this.dagPlan = dagPlan;
     this.applicationAttemptId = applicationAttemptId;
@@ -93,10 +97,11 @@ public class DAGSubmittedEvent implements HistoryEvent, 
SummaryEvent {
     }
     return builder.build();
   }
- 
+
   public void fromProto(DAGSubmittedProto proto) {
     this.dagID = TezDAGID.fromString(proto.getDagId());
     this.dagPlan = proto.getDagPlan();
+    this.dagName = this.dagPlan.getName();
     this.submitTime = proto.getSubmitTime();
     this.applicationAttemptId = ConverterUtils.toApplicationAttemptId(
         proto.getApplicationAttemptId());
@@ -129,13 +134,15 @@ public class DAGSubmittedEvent implements HistoryEvent, 
SummaryEvent {
   @Override
   public void toSummaryProtoStream(OutputStream outputStream) throws 
IOException {
     ProtoUtils.toSummaryEventProto(dagID, submitTime,
-        HistoryEventType.DAG_SUBMITTED).writeDelimitedTo(outputStream);
+        HistoryEventType.DAG_SUBMITTED, dagName.getBytes(CHARSET_NAME))
+        .writeDelimitedTo(outputStream);
   }
 
   @Override
   public void fromSummaryProtoStream(SummaryEventProto proto) throws 
IOException {
-    throw new UnsupportedOperationException("Cannot re-initialize event from"
-        + " summary stream");
+    this.dagID = TezDAGID.fromString(proto.getDagId());
+    this.submitTime = proto.getTimestamp();
+    this.dagName = new String(proto.getEventPayload().toByteArray(), 
CHARSET_NAME);
   }
 
   @Override
@@ -144,10 +151,7 @@ public class DAGSubmittedEvent implements HistoryEvent, 
SummaryEvent {
   }
 
   public String getDAGName() {
-    if (dagPlan != null && dagPlan.hasName()) {
-      return dagPlan.getName();
-    }
-    return null;
+    return this.dagName;
   }
 
   public DAGProtos.DAGPlan getDAGPlan() {
@@ -161,7 +165,7 @@ public class DAGSubmittedEvent implements HistoryEvent, 
SummaryEvent {
   public ApplicationAttemptId getApplicationAttemptId() {
     return applicationAttemptId;
   }
-  
+
   public Map<String, LocalResource> getCumulativeAdditionalLocalResources() {
     return cumulativeAdditionalLocalResources;
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/1c16b5bf/tez-dag/src/main/java/org/apache/tez/dag/utils/ProtoUtils.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/utils/ProtoUtils.java 
b/tez-dag/src/main/java/org/apache/tez/dag/utils/ProtoUtils.java
index d17637a..56e46a0 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/utils/ProtoUtils.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/utils/ProtoUtils.java
@@ -22,13 +22,21 @@ import org.apache.tez.dag.history.HistoryEventType;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.recovery.records.RecoveryProtos;
 
+import com.google.protobuf.ByteString;
+
 public class ProtoUtils {
 
   public static RecoveryProtos.SummaryEventProto toSummaryEventProto(
-      TezDAGID dagID, long timestamp, HistoryEventType historyEventType) {
-    return RecoveryProtos.SummaryEventProto.newBuilder()
+      TezDAGID dagID, long timestamp, HistoryEventType historyEventType, 
byte[] payload) {
+    RecoveryProtos.SummaryEventProto.Builder builder =
+        RecoveryProtos.SummaryEventProto.newBuilder()
         .setDagId(dagID.toString())
         .setTimestamp(timestamp)
-        .setEventType(historyEventType.ordinal()).build();
+        .setEventType(historyEventType.ordinal());
+    if (payload != null){
+      builder.setEventPayload(ByteString.copyFrom(payload));
+    }
+    return builder.build();
   }
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/1c16b5bf/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java 
b/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java
index ddbec28..0a3c06b 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java
@@ -31,6 +31,7 @@ import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.InputInitializerDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;
 import org.apache.tez.dag.api.client.DAGStatus.State;
@@ -168,6 +169,13 @@ public class TestDAGRecovery {
     DAG dag = MultiAttemptDAG.createDAG("TestBasicRecovery", null);
     runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
 
+    // it should fail if submitting same dags in recovery mode (TEZ-1064)
+    try{
+      DAGClient dagClient = tezSession.submitDAG(dag);
+      Assert.fail("Expected DAG submit to fail on duplicate dag name");
+    } catch (TezException e) {
+      Assert.assertTrue(e.getMessage().contains("Duplicate dag name"));
+    }
   }
 
   @Test(timeout=120000)

Reply via email to