Repository: tez
Updated Branches:
  refs/heads/master 713145d88 -> 1c16b5bfc
TEZ-1064. Restore dagName Set for duplicate detection in recovered AMs. (Jeff Zhang via hitesh) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/1c16b5bf Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/1c16b5bf Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/1c16b5bf Branch: refs/heads/master Commit: 1c16b5bfc22cc5f09029c240fd2edd97954fa800 Parents: 713145d Author: Hitesh Shah <[email protected]> Authored: Wed Jul 30 20:30:41 2014 -0700 Committer: Hitesh Shah <[email protected]> Committed: Wed Jul 30 20:30:41 2014 -0700 ---------------------------------------------------------------------- .../org/apache/tez/dag/app/RecoveryParser.java | 9 +++++++- .../history/events/DAGCommitStartedEvent.java | 2 +- .../dag/history/events/DAGSubmittedEvent.java | 22 ++++++++++++-------- .../org/apache/tez/dag/utils/ProtoUtils.java | 14 ++++++++++--- .../org/apache/tez/test/TestDAGRecovery.java | 8 +++++++ 5 files changed, 41 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/1c16b5bf/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java b/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java index 85254cd..bcd9e9e 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java @@ -390,6 +390,7 @@ public class RecoveryParser { private static class DAGSummaryData { final TezDAGID dagId; + String dagName; boolean completed = false; boolean dagCommitCompleted = true; DAGState dagState; @@ -410,6 +411,9 @@ public class RecoveryParser { switch (eventType) { case DAG_SUBMITTED: completed = false; + DAGSubmittedEvent dagSubmittedEvent = new DAGSubmittedEvent(); + 
dagSubmittedEvent.fromSummaryProtoStream(proto); + dagName = dagSubmittedEvent.getDAGName(); break; case DAG_FINISHED: completed = true; @@ -577,8 +581,11 @@ public class RecoveryParser { newSummaryStream.hsync(); newSummaryStream.close(); - // Set counter for next set of DAGs + // Set counter for next set of DAGs & update dagNames Set in DAGAppMaster dagAppMaster.setDAGCounter(dagCounter); + for (DAGSummaryData dagSummaryData: dagSummaryDataMap.values()){ + dagAppMaster.dagNames.add(dagSummaryData.dagName); + } DAGSummaryData lastInProgressDAGData = getLastCompletedOrInProgressDAG(dagSummaryDataMap); http://git-wip-us.apache.org/repos/asf/tez/blob/1c16b5bf/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGCommitStartedEvent.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGCommitStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGCommitStartedEvent.java index c4c0320..016bb60 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGCommitStartedEvent.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGCommitStartedEvent.java @@ -94,7 +94,7 @@ public class DAGCommitStartedEvent implements HistoryEvent, SummaryEvent { @Override public void toSummaryProtoStream(OutputStream outputStream) throws IOException { ProtoUtils.toSummaryEventProto(dagID, commitStartTime, - getEventType()).writeDelimitedTo(outputStream); + getEventType(), null).writeDelimitedTo(outputStream); } @Override http://git-wip-us.apache.org/repos/asf/tez/blob/1c16b5bf/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java index a2c9d75..5911ff3 100644 --- 
a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java @@ -44,7 +44,10 @@ public class DAGSubmittedEvent implements HistoryEvent, SummaryEvent { private static final Log LOG = LogFactory.getLog(DAGSubmittedEvent.class); + private static final String CHARSET_NAME = "utf-8"; + private TezDAGID dagID; + private String dagName; private long submitTime; private DAGProtos.DAGPlan dagPlan; private ApplicationAttemptId applicationAttemptId; @@ -59,6 +62,7 @@ public class DAGSubmittedEvent implements HistoryEvent, SummaryEvent { Map<String, LocalResource> cumulativeAdditionalLocalResources, String user) { this.dagID = dagID; + this.dagName = dagPlan.getName(); this.submitTime = submitTime; this.dagPlan = dagPlan; this.applicationAttemptId = applicationAttemptId; @@ -93,10 +97,11 @@ public class DAGSubmittedEvent implements HistoryEvent, SummaryEvent { } return builder.build(); } - + public void fromProto(DAGSubmittedProto proto) { this.dagID = TezDAGID.fromString(proto.getDagId()); this.dagPlan = proto.getDagPlan(); + this.dagName = this.dagPlan.getName(); this.submitTime = proto.getSubmitTime(); this.applicationAttemptId = ConverterUtils.toApplicationAttemptId( proto.getApplicationAttemptId()); @@ -129,13 +134,15 @@ public class DAGSubmittedEvent implements HistoryEvent, SummaryEvent { @Override public void toSummaryProtoStream(OutputStream outputStream) throws IOException { ProtoUtils.toSummaryEventProto(dagID, submitTime, - HistoryEventType.DAG_SUBMITTED).writeDelimitedTo(outputStream); + HistoryEventType.DAG_SUBMITTED, dagName.getBytes(CHARSET_NAME)) + .writeDelimitedTo(outputStream); } @Override public void fromSummaryProtoStream(SummaryEventProto proto) throws IOException { - throw new UnsupportedOperationException("Cannot re-initialize event from" - + " summary stream"); + this.dagID = TezDAGID.fromString(proto.getDagId()); + this.submitTime = 
proto.getTimestamp(); + this.dagName = new String(proto.getEventPayload().toByteArray(), CHARSET_NAME); } @Override @@ -144,10 +151,7 @@ public class DAGSubmittedEvent implements HistoryEvent, SummaryEvent { } public String getDAGName() { - if (dagPlan != null && dagPlan.hasName()) { - return dagPlan.getName(); - } - return null; + return this.dagName; } public DAGProtos.DAGPlan getDAGPlan() { @@ -161,7 +165,7 @@ public class DAGSubmittedEvent implements HistoryEvent, SummaryEvent { public ApplicationAttemptId getApplicationAttemptId() { return applicationAttemptId; } - + public Map<String, LocalResource> getCumulativeAdditionalLocalResources() { return cumulativeAdditionalLocalResources; } http://git-wip-us.apache.org/repos/asf/tez/blob/1c16b5bf/tez-dag/src/main/java/org/apache/tez/dag/utils/ProtoUtils.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/utils/ProtoUtils.java b/tez-dag/src/main/java/org/apache/tez/dag/utils/ProtoUtils.java index d17637a..56e46a0 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/utils/ProtoUtils.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/utils/ProtoUtils.java @@ -22,13 +22,21 @@ import org.apache.tez.dag.history.HistoryEventType; import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.dag.recovery.records.RecoveryProtos; +import com.google.protobuf.ByteString; + public class ProtoUtils { public static RecoveryProtos.SummaryEventProto toSummaryEventProto( - TezDAGID dagID, long timestamp, HistoryEventType historyEventType) { - return RecoveryProtos.SummaryEventProto.newBuilder() + TezDAGID dagID, long timestamp, HistoryEventType historyEventType, byte[] payload) { + RecoveryProtos.SummaryEventProto.Builder builder = + RecoveryProtos.SummaryEventProto.newBuilder() .setDagId(dagID.toString()) .setTimestamp(timestamp) - .setEventType(historyEventType.ordinal()).build(); + .setEventType(historyEventType.ordinal()); + if (payload != null){ 
+ builder.setEventPayload(ByteString.copyFrom(payload)); + } + return builder.build(); } + } http://git-wip-us.apache.org/repos/asf/tez/blob/1c16b5bf/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java ---------------------------------------------------------------------- diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java b/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java index ddbec28..0a3c06b 100644 --- a/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java +++ b/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java @@ -31,6 +31,7 @@ import org.apache.tez.dag.api.DAG; import org.apache.tez.dag.api.InputDescriptor; import org.apache.tez.dag.api.InputInitializerDescriptor; import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.TezException; import org.apache.tez.dag.api.client.DAGClient; import org.apache.tez.dag.api.client.DAGStatus; import org.apache.tez.dag.api.client.DAGStatus.State; @@ -168,6 +169,13 @@ public class TestDAGRecovery { DAG dag = MultiAttemptDAG.createDAG("TestBasicRecovery", null); runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED); + // it should fail if submitting same dags in recovery mode (TEZ-1064) + try{ + DAGClient dagClient = tezSession.submitDAG(dag); + Assert.fail("Expected DAG submit to fail on duplicate dag name"); + } catch (TezException e) { + Assert.assertTrue(e.getMessage().contains("Duplicate dag name")); + } } @Test(timeout=120000)
