Repository: tez Updated Branches: refs/heads/master 2e66f3cb2 -> ebc9f4f6d
TEZ-3914. Recovering a large DAG fails to size limit exceeded (Jonathan Eagles via jlowe) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/ebc9f4f6 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/ebc9f4f6 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/ebc9f4f6 Branch: refs/heads/master Commit: ebc9f4f6dee1badeca39cac26c00818be3e4d77d Parents: 2e66f3c Author: Jason Lowe <[email protected]> Authored: Fri Apr 27 11:29:08 2018 -0500 Committer: Jason Lowe <[email protected]> Committed: Fri Apr 27 11:29:08 2018 -0500 ---------------------------------------------------------------------- .../org/apache/tez/dag/app/RecoveryParser.java | 19 +++-- .../apache/tez/dag/app/dag/impl/VertexImpl.java | 11 ++- .../apache/tez/dag/history/HistoryEvent.java | 9 ++- .../tez/dag/history/events/AMLaunchedEvent.java | 12 +-- .../tez/dag/history/events/AMStartedEvent.java | 12 +-- .../dag/history/events/AppLaunchedEvent.java | 8 +- .../history/events/ContainerLaunchedEvent.java | 13 ++-- .../history/events/ContainerStoppedEvent.java | 13 ++-- .../history/events/DAGCommitStartedEvent.java | 11 +-- .../dag/history/events/DAGFinishedEvent.java | 11 +-- .../dag/history/events/DAGInitializedEvent.java | 14 ++-- .../dag/history/events/DAGKillRequestEvent.java | 17 ++-- .../dag/history/events/DAGRecoveredEvent.java | 8 +- .../tez/dag/history/events/DAGStartedEvent.java | 12 +-- .../dag/history/events/DAGSubmittedEvent.java | 11 +-- .../events/TaskAttemptFinishedEvent.java | 13 ++-- .../history/events/TaskAttemptStartedEvent.java | 12 +-- .../dag/history/events/TaskFinishedEvent.java | 12 +-- .../dag/history/events/TaskStartedEvent.java | 12 +-- .../events/VertexCommitStartedEvent.java | 11 +-- .../events/VertexConfigurationDoneEvent.java | 12 +-- .../dag/history/events/VertexFinishedEvent.java | 11 +-- .../events/VertexGroupCommitFinishedEvent.java | 11 +-- .../events/VertexGroupCommitStartedEvent.java | 11 +-- .../history/events/VertexInitializedEvent.java | 13 ++-- .../dag/history/events/VertexStartedEvent.java | 12 +-- .../dag/history/recovery/RecoveryService.java | 58 ++++++++++---- .../apache/tez/dag/app/TestRecoveryParser.java | 82 +++++++++++++++++++- .../TestHistoryEventsProtoConversion.java | 8 +- .../org/apache/tez/test/MiniTezCluster.java | 6 ++ .../RecoveryServiceWithEventHandlingHook.java | 9 ++- .../org/apache/tez/test/TestAMRecovery.java | 4 + .../org/apache/tez/test/TestDAGRecovery.java | 8 +- .../java/org/apache/tez/test/TestRecovery.java | 4 + 34 files changed, 317 insertions(+), 173 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/ebc9f4f6/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java b/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java index 368dd17..99ac283 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java @@ -27,6 +27,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; +import com.google.protobuf.CodedInputStream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -251,11 +252,15 @@ public class RecoveryParser { } } - private static HistoryEvent getNextEvent(FSDataInputStream inputStream) + private static HistoryEvent getNextEvent(CodedInputStream inputStream) throws IOException { + boolean isAtEnd = inputStream.isAtEnd(); + if (isAtEnd) { + return null; + } int eventTypeOrdinal = -1; try { - eventTypeOrdinal = inputStream.readInt(); + eventTypeOrdinal = inputStream.readFixed32(); } catch (EOFException eof) { return null; } @@ -353,13 +358,15 @@ public class RecoveryParser { public static List<HistoryEvent> parseDAGRecoveryFile(FSDataInputStream inputStream) throws IOException { List<HistoryEvent> historyEvents = new ArrayList<HistoryEvent>(); + CodedInputStream codedInputStream = CodedInputStream.newInstance(inputStream); + codedInputStream.setSizeLimit(Integer.MAX_VALUE); while (true) { - HistoryEvent historyEvent = getNextEvent(inputStream); + HistoryEvent historyEvent = getNextEvent(codedInputStream); if (historyEvent == null) { LOG.info("Reached end of stream"); break; } - LOG.debug("Read HistoryEvent, eventType=" + historyEvent.getEventType() + ", event=" + historyEvent); + LOG.debug("Read HistoryEvent, eventType={}, event={}", historyEvent.getEventType(), historyEvent); historyEvents.add(historyEvent); } return historyEvents; @@ -745,10 +752,12 @@ public class RecoveryParser { + ", dagRecoveryFile=" + dagRecoveryFile + ", len=" + fileStatus.getLen()); FSDataInputStream dagRecoveryStream = recoveryFS.open(dagRecoveryFile, recoveryBufferSize); + CodedInputStream codedInputStream = CodedInputStream.newInstance(dagRecoveryStream); + codedInputStream.setSizeLimit(Integer.MAX_VALUE); while (true) { HistoryEvent event; try { - event = getNextEvent(dagRecoveryStream); + event = getNextEvent(codedInputStream); if (event == null) { LOG.info("Reached end of dag recovery stream"); break; http://git-wip-us.apache.org/repos/asf/tez/blob/ebc9f4f6/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java index ad26173..f3fc269 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java @@ -42,6 +42,8 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import javax.annotation.Nullable; +import com.google.protobuf.CodedInputStream; +import com.google.protobuf.CodedOutputStream; import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -63,7 +65,6 @@ import org.apache.tez.common.ReflectionUtils; import org.apache.tez.common.TezUtilsInternal; import org.apache.tez.common.counters.LimitExceededException; import org.apache.tez.common.counters.TezCounters; -import org.apache.tez.common.io.NonSyncByteArrayInputStream; import org.apache.tez.common.io.NonSyncByteArrayOutputStream; import org.apache.tez.dag.api.DagTypeConverters; import org.apache.tez.dag.api.EdgeManagerPluginDescriptor; @@ -2695,9 +2696,11 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl } NonSyncByteArrayOutputStream out = new NonSyncByteArrayOutputStream(); try { - reconfigureDoneEvent.toProtoStream(out); + CodedOutputStream codedOutputStream = CodedOutputStream.newInstance(out); + reconfigureDoneEvent.toProtoStream(codedOutputStream); + codedOutputStream.flush(); } catch (IOException e) { - throw new TezUncheckedException("Unable to deserilize VertexReconfigureDoneEvent"); + throw new TezUncheckedException("Unable to deserialize VertexReconfigureDoneEvent"); } this.vertexManager = new VertexManager( VertexManagerPluginDescriptor.create(NoOpVertexManager.class.getName()) @@ -4589,7 +4592,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl LOG.debug("initialize NoOpVertexManager"); } configurationDoneEvent = new VertexConfigurationDoneEvent(); - configurationDoneEvent.fromProtoStream(new NonSyncByteArrayInputStream(getContext().getUserPayload().deepCopyAsArray())); + configurationDoneEvent.fromProtoStream(CodedInputStream.newInstance(getContext().getUserPayload().deepCopyAsArray())); String vertexName = getContext().getVertexName(); if (getContext().getVertexNumTasks(vertexName) == -1) { Preconditions.checkArgument(configurationDoneEvent.isSetParallelismCalled(), "SetParallelism must be called " http://git-wip-us.apache.org/repos/asf/tez/blob/ebc9f4f6/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEvent.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEvent.java index 1ca0d5f..5b077e9 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEvent.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEvent.java @@ -18,9 +18,10 @@ package org.apache.tez.dag.history; +import com.google.protobuf.CodedInputStream; +import com.google.protobuf.CodedOutputStream; + import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; public interface HistoryEvent { @@ -30,8 +31,8 @@ public interface HistoryEvent { public boolean isHistoryEvent(); - public void toProtoStream(OutputStream outputStream) throws IOException; + public void toProtoStream(CodedOutputStream outputStream) throws IOException; - public void fromProtoStream(InputStream inputStream) throws IOException; + public void fromProtoStream(CodedInputStream inputStream) throws IOException; } http://git-wip-us.apache.org/repos/asf/tez/blob/ebc9f4f6/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMLaunchedEvent.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMLaunchedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMLaunchedEvent.java index fa332d6..001cbf0 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMLaunchedEvent.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMLaunchedEvent.java @@ -19,9 +19,9 @@ package org.apache.tez.dag.history.events; import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; +import com.google.protobuf.CodedInputStream; +import com.google.protobuf.CodedOutputStream; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.tez.dag.history.HistoryEvent; @@ -84,13 +84,13 @@ public class AMLaunchedEvent implements HistoryEvent { } @Override - public void toProtoStream(OutputStream outputStream) throws IOException { - toProto().writeDelimitedTo(outputStream); + public void toProtoStream(CodedOutputStream outputStream) throws IOException { + outputStream.writeMessageNoTag(toProto()); } @Override - public void fromProtoStream(InputStream inputStream) throws IOException { - AMLaunchedProto proto = AMLaunchedProto.parseDelimitedFrom(inputStream); + public void fromProtoStream(CodedInputStream inputStream) throws IOException { + AMLaunchedProto proto = inputStream.readMessage(AMLaunchedProto.PARSER, null); if (proto == null) { throw new IOException("No data found in stream"); } http://git-wip-us.apache.org/repos/asf/tez/blob/ebc9f4f6/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMStartedEvent.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMStartedEvent.java index 8a59d84..87daba6 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMStartedEvent.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMStartedEvent.java @@ -19,9 +19,9 @@ package org.apache.tez.dag.history.events; import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; +import com.google.protobuf.CodedInputStream; +import com.google.protobuf.CodedOutputStream; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.tez.dag.history.HistoryEvent; @@ -79,13 +79,13 @@ public class AMStartedEvent implements HistoryEvent { } @Override - public void toProtoStream(OutputStream outputStream) throws IOException { - toProto().writeDelimitedTo(outputStream); + public void toProtoStream(CodedOutputStream outputStream) throws IOException { + outputStream.writeMessageNoTag(toProto()); } @Override - public void fromProtoStream(InputStream inputStream) throws IOException { - AMStartedProto proto = AMStartedProto.parseDelimitedFrom(inputStream); + public void fromProtoStream(CodedInputStream inputStream) throws IOException { + AMStartedProto proto = inputStream.readMessage(AMStartedProto.PARSER, null); if (proto == null) { throw new IOException("No data found in stream"); } http://git-wip-us.apache.org/repos/asf/tez/blob/ebc9f4f6/tez-dag/src/main/java/org/apache/tez/dag/history/events/AppLaunchedEvent.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/AppLaunchedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/AppLaunchedEvent.java index 08d2aff..0b812f0 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/AppLaunchedEvent.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/AppLaunchedEvent.java @@ -19,9 +19,9 @@ package org.apache.tez.dag.history.events; import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; +import com.google.protobuf.CodedInputStream; +import com.google.protobuf.CodedOutputStream; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.tez.common.VersionInfo; @@ -67,12 +67,12 @@ public class AppLaunchedEvent implements HistoryEvent { } @Override - public void toProtoStream(OutputStream outputStream) throws IOException { + public void toProtoStream(CodedOutputStream outputStream) throws IOException { throw new UnsupportedOperationException("Not a recovery event"); } @Override - public void fromProtoStream(InputStream inputStream) throws IOException { + public void fromProtoStream(CodedInputStream inputStream) throws IOException { throw new UnsupportedOperationException("Not a recovery event"); } http://git-wip-us.apache.org/repos/asf/tez/blob/ebc9f4f6/tez-dag/src/main/java/org/apache/tez/dag/history/events/ContainerLaunchedEvent.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/ContainerLaunchedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/ContainerLaunchedEvent.java index 45d0261..11528e2 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/ContainerLaunchedEvent.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/ContainerLaunchedEvent.java @@ -19,9 +19,9 @@ package org.apache.tez.dag.history.events; import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; +import com.google.protobuf.CodedInputStream; +import com.google.protobuf.CodedOutputStream; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.util.ConverterUtils; @@ -77,14 +77,13 @@ public class ContainerLaunchedEvent implements HistoryEvent { } @Override - public void toProtoStream(OutputStream outputStream) throws IOException { - toProto().writeDelimitedTo(outputStream); + public void toProtoStream(CodedOutputStream outputStream) throws IOException { + outputStream.writeMessageNoTag(toProto()); } @Override - public void fromProtoStream(InputStream inputStream) throws IOException { - ContainerLaunchedProto proto = - ContainerLaunchedProto.parseDelimitedFrom(inputStream); + public void fromProtoStream(CodedInputStream inputStream) throws IOException { + ContainerLaunchedProto proto = inputStream.readMessage(ContainerLaunchedProto.PARSER, null); if (proto == null) { throw new IOException("No data found in stream"); } http://git-wip-us.apache.org/repos/asf/tez/blob/ebc9f4f6/tez-dag/src/main/java/org/apache/tez/dag/history/events/ContainerStoppedEvent.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/ContainerStoppedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/ContainerStoppedEvent.java index 86971ce..528f629 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/ContainerStoppedEvent.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/ContainerStoppedEvent.java @@ -19,9 +19,9 @@ package org.apache.tez.dag.history.events; import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; +import com.google.protobuf.CodedInputStream; +import com.google.protobuf.CodedOutputStream; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.util.ConverterUtils; @@ -82,14 +82,13 @@ public class ContainerStoppedEvent implements HistoryEvent { } @Override - public void toProtoStream(OutputStream outputStream) throws IOException { - toProto().writeDelimitedTo(outputStream); + public void toProtoStream(CodedOutputStream outputStream) throws IOException { + outputStream.writeMessageNoTag(toProto()); } @Override - public void fromProtoStream(InputStream inputStream) throws IOException { - ContainerStoppedProto proto = - ContainerStoppedProto.parseDelimitedFrom(inputStream); + public void fromProtoStream(CodedInputStream inputStream) throws IOException { + ContainerStoppedProto proto = inputStream.readMessage(ContainerStoppedProto.PARSER, null); if (proto == null) { throw new IOException("No data found in stream"); } http://git-wip-us.apache.org/repos/asf/tez/blob/ebc9f4f6/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGCommitStartedEvent.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGCommitStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGCommitStartedEvent.java index 016bb60..241dada 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGCommitStartedEvent.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGCommitStartedEvent.java @@ -19,9 +19,10 @@ package org.apache.tez.dag.history.events; import java.io.IOException; -import java.io.InputStream; import java.io.OutputStream; +import com.google.protobuf.CodedInputStream; +import com.google.protobuf.CodedOutputStream; import org.apache.tez.dag.history.HistoryEvent; import org.apache.tez.dag.history.HistoryEventType; import org.apache.tez.dag.history.SummaryEvent; @@ -69,13 +70,13 @@ public class DAGCommitStartedEvent implements HistoryEvent, SummaryEvent { } @Override - public void toProtoStream(OutputStream outputStream) throws IOException { - toProto().writeDelimitedTo(outputStream); + public void toProtoStream(CodedOutputStream outputStream) throws IOException { + outputStream.writeMessageNoTag(toProto()); } @Override - public void fromProtoStream(InputStream inputStream) throws IOException { - DAGCommitStartedProto proto = DAGCommitStartedProto.parseDelimitedFrom(inputStream); + public void fromProtoStream(CodedInputStream inputStream) throws IOException { + DAGCommitStartedProto proto = inputStream.readMessage(DAGCommitStartedProto.PARSER, null); if (proto == null) { throw new IOException("No data found in stream"); } http://git-wip-us.apache.org/repos/asf/tez/blob/ebc9f4f6/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java index c395297..0a7ef56 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java @@ -19,10 +19,11 @@ package org.apache.tez.dag.history.events; import java.io.IOException; -import java.io.InputStream; import java.io.OutputStream; import java.util.Map; +import com.google.protobuf.CodedInputStream; +import com.google.protobuf.CodedOutputStream; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.tez.common.counters.TezCounters; import org.apache.tez.dag.api.DagTypeConverters; @@ -121,13 +122,13 @@ public class DAGFinishedEvent implements HistoryEvent, SummaryEvent { } @Override - public void toProtoStream(OutputStream outputStream) throws IOException { - toProto().writeDelimitedTo(outputStream); + public void toProtoStream(CodedOutputStream outputStream) throws IOException { + outputStream.writeMessageNoTag(toProto()); } @Override - public void fromProtoStream(InputStream inputStream) throws IOException { - DAGFinishedProto proto = DAGFinishedProto.parseDelimitedFrom(inputStream); + public void fromProtoStream(CodedInputStream inputStream) throws IOException { + DAGFinishedProto proto = inputStream.readMessage(DAGFinishedProto.PARSER, null); if (proto == null) { throw new IOException("No data found in stream"); } http://git-wip-us.apache.org/repos/asf/tez/blob/ebc9f4f6/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGInitializedEvent.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGInitializedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGInitializedEvent.java index 98d64d3..9e6c8b2 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGInitializedEvent.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGInitializedEvent.java @@ -19,15 +19,16 @@ package org.apache.tez.dag.history.events; import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; import java.util.Map; +import com.google.protobuf.CodedInputStream; +import com.google.protobuf.CodedOutputStream; import org.apache.tez.dag.history.HistoryEvent; import org.apache.tez.dag.history.HistoryEventType; import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.dag.records.TezVertexID; import org.apache.tez.dag.recovery.records.RecoveryProtos; +import org.apache.tez.dag.recovery.records.RecoveryProtos.DAGInitializedProto; public class DAGInitializedEvent implements HistoryEvent { @@ -83,14 +84,13 @@ public class DAGInitializedEvent implements HistoryEvent { } @Override - public void toProtoStream(OutputStream outputStream) throws IOException { - toProto().writeDelimitedTo(outputStream); + public void toProtoStream(CodedOutputStream outputStream) throws IOException { + outputStream.writeMessageNoTag(toProto()); } @Override - public void fromProtoStream(InputStream inputStream) throws IOException { - RecoveryProtos.DAGInitializedProto proto = - RecoveryProtos.DAGInitializedProto.parseDelimitedFrom(inputStream); + public void fromProtoStream(CodedInputStream inputStream) throws IOException { + DAGInitializedProto proto = inputStream.readMessage(DAGInitializedProto.PARSER, null); if (proto == null) { throw new IOException("No data found in stream"); } http://git-wip-us.apache.org/repos/asf/tez/blob/ebc9f4f6/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGKillRequestEvent.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGKillRequestEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGKillRequestEvent.java index 525e361..c87f5ce 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGKillRequestEvent.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGKillRequestEvent.java @@ -18,14 +18,16 @@ package org.apache.tez.dag.history.events; import java.io.IOException; -import java.io.InputStream; import java.io.OutputStream; +import com.google.protobuf.CodedInputStream; +import com.google.protobuf.CodedOutputStream; import org.apache.tez.dag.history.HistoryEvent; import org.apache.tez.dag.history.HistoryEventType; import org.apache.tez.dag.history.SummaryEvent; import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.dag.recovery.records.RecoveryProtos; +import org.apache.tez.dag.recovery.records.RecoveryProtos.DAGKillRequestProto; import org.apache.tez.dag.recovery.records.RecoveryProtos.SummaryEventProto; import org.apache.tez.dag.utils.ProtoUtils; @@ -60,12 +62,12 @@ public class DAGKillRequestEvent implements HistoryEvent, SummaryEvent { } @Override - public void toProtoStream(OutputStream outputStream) throws IOException { - toProto().writeDelimitedTo(outputStream); + public void toProtoStream(CodedOutputStream outputStream) throws IOException { + outputStream.writeMessageNoTag(toProto()); } - public RecoveryProtos.DAGKillRequestProto toProto() { - return RecoveryProtos.DAGKillRequestProto.newBuilder() + public DAGKillRequestProto toProto() { + return DAGKillRequestProto.newBuilder() .setDagId(dagID.toString()) .setKillRequestTime(killRequestTime) .setIsSessionStopped(isSessionStopped) @@ -73,9 +75,8 @@ public class DAGKillRequestEvent implements HistoryEvent, SummaryEvent { } @Override - public void fromProtoStream(InputStream inputStream) throws IOException { - RecoveryProtos.DAGKillRequestProto proto = - RecoveryProtos.DAGKillRequestProto.parseDelimitedFrom(inputStream); + public void fromProtoStream(CodedInputStream inputStream) throws IOException { + DAGKillRequestProto proto = inputStream.readMessage(DAGKillRequestProto.PARSER, null); if (proto == null) { throw new IOException("No data found in stream"); } http://git-wip-us.apache.org/repos/asf/tez/blob/ebc9f4f6/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGRecoveredEvent.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGRecoveredEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGRecoveredEvent.java index 2bfa43b..e5f5614 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGRecoveredEvent.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGRecoveredEvent.java @@ -19,9 +19,9 @@ package org.apache.tez.dag.history.events; import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; +import com.google.protobuf.CodedInputStream; +import com.google.protobuf.CodedOutputStream; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.tez.dag.app.dag.DAGState; import org.apache.tez.dag.history.HistoryEvent; @@ -76,13 +76,13 @@ public class DAGRecoveredEvent implements HistoryEvent { } @Override - public void toProtoStream(OutputStream outputStream) throws IOException { + public void toProtoStream(CodedOutputStream outputStream) throws IOException { throw new UnsupportedOperationException("Invalid operation for eventType " + getEventType().name()); } @Override - public void fromProtoStream(InputStream inputStream) throws IOException { + public void fromProtoStream(CodedInputStream inputStream) throws IOException { throw new UnsupportedOperationException("Invalid operation for eventType " + getEventType().name()); } http://git-wip-us.apache.org/repos/asf/tez/blob/ebc9f4f6/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGStartedEvent.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGStartedEvent.java index d0e0e69..f1fdcac 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGStartedEvent.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGStartedEvent.java @@ -19,9 +19,9 @@ package org.apache.tez.dag.history.events; import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; +import com.google.protobuf.CodedInputStream; +import com.google.protobuf.CodedOutputStream; import org.apache.tez.dag.app.dag.DAGState; import org.apache.tez.dag.history.HistoryEvent; import org.apache.tez.dag.history.HistoryEventType; @@ -74,13 +74,13 @@ public class DAGStartedEvent implements HistoryEvent { } @Override - public void toProtoStream(OutputStream outputStream) throws IOException { - toProto().writeDelimitedTo(outputStream); + public void toProtoStream(CodedOutputStream outputStream) throws IOException { + outputStream.writeMessageNoTag(toProto()); } @Override - public void fromProtoStream(InputStream inputStream) throws IOException { - DAGStartedProto proto = DAGStartedProto.parseDelimitedFrom(inputStream); + public void fromProtoStream(CodedInputStream inputStream) throws IOException { + DAGStartedProto proto = inputStream.readMessage(DAGStartedProto.PARSER, null); if (proto == null) { throw new IOException("No data found in stream"); } http://git-wip-us.apache.org/repos/asf/tez/blob/ebc9f4f6/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java index 1b1fdf3..e04ee80 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java @@ -19,10 +19,11 @@ package org.apache.tez.dag.history.events; import java.io.IOException; -import java.io.InputStream; import java.io.OutputStream; import java.util.Map; +import com.google.protobuf.CodedInputStream; +import com.google.protobuf.CodedOutputStream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -126,13 +127,13 @@ public class DAGSubmittedEvent implements HistoryEvent, SummaryEvent { } @Override - public void toProtoStream(OutputStream outputStream) throws IOException { - toProto().writeDelimitedTo(outputStream); + public void toProtoStream(CodedOutputStream outputStream) throws IOException { + outputStream.writeMessageNoTag(toProto()); } @Override - public void fromProtoStream(InputStream inputStream) throws IOException { - DAGSubmittedProto proto = DAGSubmittedProto.parseDelimitedFrom(inputStream); + public void fromProtoStream(CodedInputStream inputStream) throws IOException { + DAGSubmittedProto proto = inputStream.readMessage(DAGSubmittedProto.PARSER, null); if (proto == null) { throw new IOException("No data found in stream"); } http://git-wip-us.apache.org/repos/asf/tez/blob/ebc9f4f6/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java index e9100e8..96dc099 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java @@ -20,10 +20,10 @@ package org.apache.tez.dag.history.events; import javax.annotation.Nullable; import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; import java.util.List; +import com.google.protobuf.CodedInputStream; +import com.google.protobuf.CodedOutputStream; import org.apache.tez.common.TezConverterUtils; import org.apache.tez.runtime.api.TaskFailureType; import org.slf4j.Logger; @@ -226,14 +226,13 @@ public class TaskAttemptFinishedEvent implements HistoryEvent { } @Override - public void toProtoStream(OutputStream outputStream) throws IOException { - toProto().writeDelimitedTo(outputStream); + public void toProtoStream(CodedOutputStream outputStream) throws IOException { + outputStream.writeMessageNoTag(toProto()); } @Override - public void fromProtoStream(InputStream inputStream) throws IOException { - TaskAttemptFinishedProto proto = - TaskAttemptFinishedProto.parseDelimitedFrom(inputStream); + public void fromProtoStream(CodedInputStream inputStream) throws IOException { + TaskAttemptFinishedProto proto = inputStream.readMessage(TaskAttemptFinishedProto.PARSER, null); if (proto == null) { throw new IOException("No data found in stream"); } http://git-wip-us.apache.org/repos/asf/tez/blob/ebc9f4f6/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java index 71d4419..a49e47c 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java @@ -19,9 +19,9 @@ package org.apache.tez.dag.history.events; import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; +import com.google.protobuf.CodedInputStream; +import com.google.protobuf.CodedOutputStream; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.util.ConverterUtils; @@ -91,13 +91,13 @@ public class TaskAttemptStartedEvent implements HistoryEvent { } @Override - public void toProtoStream(OutputStream outputStream) throws IOException { - toProto().writeDelimitedTo(outputStream); + public void toProtoStream(CodedOutputStream outputStream) throws IOException { + outputStream.writeMessageNoTag(toProto()); } @Override - public void fromProtoStream(InputStream inputStream) throws IOException { - TaskAttemptStartedProto proto = TaskAttemptStartedProto.parseDelimitedFrom(inputStream); + public void fromProtoStream(CodedInputStream inputStream) throws IOException { + TaskAttemptStartedProto proto = inputStream.readMessage(TaskAttemptStartedProto.PARSER, null); if (proto == null) { throw new IOException("No data found in stream"); } http://git-wip-us.apache.org/repos/asf/tez/blob/ebc9f4f6/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskFinishedEvent.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskFinishedEvent.java index 71ff6c8..6befa1a 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskFinishedEvent.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskFinishedEvent.java @@ -19,9 +19,9 @@ package org.apache.tez.dag.history.events; import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; +import com.google.protobuf.CodedInputStream; +import com.google.protobuf.CodedOutputStream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.tez.common.counters.TezCounters; @@ -107,13 +107,13 @@ public class TaskFinishedEvent implements HistoryEvent { } @Override - public void toProtoStream(OutputStream outputStream) throws IOException { - toProto().writeDelimitedTo(outputStream); + public void toProtoStream(CodedOutputStream outputStream) throws IOException { + outputStream.writeMessageNoTag(toProto()); } @Override - public void fromProtoStream(InputStream inputStream) throws IOException { - TaskFinishedProto proto = TaskFinishedProto.parseDelimitedFrom(inputStream); + public void fromProtoStream(CodedInputStream inputStream) throws IOException { + TaskFinishedProto proto = inputStream.readMessage(TaskFinishedProto.PARSER, null); if (proto == null) { throw new IOException("No data found in stream"); } http://git-wip-us.apache.org/repos/asf/tez/blob/ebc9f4f6/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskStartedEvent.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskStartedEvent.java index 07dc2f9..cc62969 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskStartedEvent.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskStartedEvent.java @@ -19,9 +19,9 @@ package org.apache.tez.dag.history.events; import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; +import com.google.protobuf.CodedInputStream; +import com.google.protobuf.CodedOutputStream; import org.apache.tez.dag.api.oldrecords.TaskState; import org.apache.tez.dag.history.HistoryEvent; import org.apache.tez.dag.history.HistoryEventType; @@ -83,13 +83,13 @@ public class TaskStartedEvent implements HistoryEvent { } @Override - public void toProtoStream(OutputStream outputStream) throws IOException { - toProto().writeDelimitedTo(outputStream); + public void toProtoStream(CodedOutputStream outputStream) throws IOException { + outputStream.writeMessageNoTag(toProto()); } @Override - public void fromProtoStream(InputStream inputStream) throws IOException { - TaskStartedProto proto = TaskStartedProto.parseDelimitedFrom(inputStream); + public void fromProtoStream(CodedInputStream inputStream) throws IOException { + TaskStartedProto proto = inputStream.readMessage(TaskStartedProto.PARSER, null); if (proto == null) { throw new IOException("No data found in stream"); } http://git-wip-us.apache.org/repos/asf/tez/blob/ebc9f4f6/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexCommitStartedEvent.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexCommitStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexCommitStartedEvent.java index c452187..8ff86b8 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexCommitStartedEvent.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexCommitStartedEvent.java @@ -19,10 +19,11 @@ package org.apache.tez.dag.history.events; import java.io.IOException; -import java.io.InputStream; import java.io.OutputStream; import java.nio.charset.Charset; +import com.google.protobuf.CodedInputStream; +import com.google.protobuf.CodedOutputStream; import org.apache.tez.dag.history.HistoryEvent; import org.apache.tez.dag.history.HistoryEventType; import org.apache.tez.dag.history.SummaryEvent; @@ -73,13 +74,13 @@ public class VertexCommitStartedEvent implements HistoryEvent, SummaryEvent { } @Override - public void toProtoStream(OutputStream outputStream) throws IOException { - toProto().writeDelimitedTo(outputStream); + public void toProtoStream(CodedOutputStream outputStream) throws IOException { + outputStream.writeMessageNoTag(toProto()); } @Override - public void fromProtoStream(InputStream inputStream) throws IOException { - VertexCommitStartedProto proto = VertexCommitStartedProto.parseDelimitedFrom(inputStream); + public void fromProtoStream(CodedInputStream inputStream) throws IOException { + VertexCommitStartedProto proto = inputStream.readMessage(VertexCommitStartedProto.PARSER, null); if (proto == null) { throw new IOException("No data found in stream"); } http://git-wip-us.apache.org/repos/asf/tez/blob/ebc9f4f6/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexConfigurationDoneEvent.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexConfigurationDoneEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexConfigurationDoneEvent.java index 137342c..a2e2039 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexConfigurationDoneEvent.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexConfigurationDoneEvent.java @@ -18,12 +18,12 @@ package org.apache.tez.dag.history.events; import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; +import com.google.protobuf.CodedInputStream; +import com.google.protobuf.CodedOutputStream; import org.apache.tez.dag.api.DagTypeConverters; import org.apache.tez.dag.api.EdgeProperty; import org.apache.tez.dag.api.VertexLocationHint; @@ -155,13 +155,13 @@ public class VertexConfigurationDoneEvent implements HistoryEvent { } @Override - public void toProtoStream(OutputStream outputStream) throws IOException { - toProto().writeDelimitedTo(outputStream); + public void toProtoStream(CodedOutputStream outputStream) throws IOException { + outputStream.writeMessageNoTag(toProto()); } @Override - public void fromProtoStream(InputStream inputStream) throws IOException { - VertexConfigurationDoneProto proto = VertexConfigurationDoneProto.parseDelimitedFrom(inputStream); + public void fromProtoStream(CodedInputStream inputStream) throws IOException { + VertexConfigurationDoneProto proto = inputStream.readMessage(VertexConfigurationDoneProto.PARSER, null); if (proto == null) { throw new IOException("No data found in stream"); } http://git-wip-us.apache.org/repos/asf/tez/blob/ebc9f4f6/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java index a2cdae2..58cb628 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java @@ -19,10 +19,11 @@ package org.apache.tez.dag.history.events; import java.io.IOException; -import java.io.InputStream; import java.io.OutputStream; import java.util.Map; +import com.google.protobuf.CodedInputStream; +import com.google.protobuf.CodedOutputStream; import org.apache.tez.dag.app.dag.impl.ServicePluginInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -123,13 +124,13 @@ public class VertexFinishedEvent implements HistoryEvent, SummaryEvent { } @Override - public void toProtoStream(OutputStream outputStream) throws IOException { - toProto().writeDelimitedTo(outputStream); + public void toProtoStream(CodedOutputStream outputStream) throws IOException { + outputStream.writeMessageNoTag(toProto()); } @Override - public void fromProtoStream(InputStream inputStream) throws IOException { - VertexFinishedProto proto = VertexFinishedProto.parseDelimitedFrom(inputStream); + public void fromProtoStream(CodedInputStream inputStream) throws IOException { + VertexFinishedProto proto = inputStream.readMessage(VertexFinishedProto.PARSER, null); if (proto == null) { throw new IOException("No data found in stream"); } http://git-wip-us.apache.org/repos/asf/tez/blob/ebc9f4f6/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitFinishedEvent.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitFinishedEvent.java index ec8f3e1..c9d5aae 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitFinishedEvent.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitFinishedEvent.java @@ -19,10 +19,11 @@ package org.apache.tez.dag.history.events; import java.io.IOException; -import java.io.InputStream; import java.io.OutputStream; import java.util.Collection; +import com.google.protobuf.CodedInputStream; +import com.google.protobuf.CodedOutputStream; import org.apache.tez.dag.history.HistoryEvent; import org.apache.tez.dag.history.HistoryEventType; import org.apache.tez.dag.history.SummaryEvent; @@ -94,13 +95,13 @@ public class VertexGroupCommitFinishedEvent implements HistoryEvent, SummaryEven } @Override - public void toProtoStream(OutputStream outputStream) throws IOException { - toProto().writeDelimitedTo(outputStream); + public void toProtoStream(CodedOutputStream outputStream) throws IOException { + outputStream.writeMessageNoTag(toProto()); } @Override - public void fromProtoStream(InputStream inputStream) throws IOException { - VertexGroupCommitFinishedProto proto = VertexGroupCommitFinishedProto.parseDelimitedFrom(inputStream); + public void fromProtoStream(CodedInputStream inputStream) throws IOException { + VertexGroupCommitFinishedProto proto = inputStream.readMessage(VertexGroupCommitFinishedProto.PARSER, null); if (proto == null) { throw new IOException("No data found in stream"); } http://git-wip-us.apache.org/repos/asf/tez/blob/ebc9f4f6/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitStartedEvent.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitStartedEvent.java index 3de355c..cdd11bc 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitStartedEvent.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitStartedEvent.java @@ -19,10 +19,11 @@ package org.apache.tez.dag.history.events; import java.io.IOException; -import java.io.InputStream; import java.io.OutputStream; import java.util.Collection; +import com.google.protobuf.CodedInputStream; +import com.google.protobuf.CodedOutputStream; import org.apache.tez.dag.history.HistoryEvent; import org.apache.tez.dag.history.HistoryEventType; import org.apache.tez.dag.history.SummaryEvent; @@ -94,13 +95,13 @@ public class VertexGroupCommitStartedEvent implements HistoryEvent, SummaryEvent } @Override - public void toProtoStream(OutputStream outputStream) throws IOException { - toProto().writeDelimitedTo(outputStream); + public void toProtoStream(CodedOutputStream outputStream) throws IOException { + outputStream.writeMessageNoTag(toProto()); } @Override - public void fromProtoStream(InputStream inputStream) throws IOException { - VertexGroupCommitStartedProto proto = VertexGroupCommitStartedProto.parseDelimitedFrom(inputStream); + public void fromProtoStream(CodedInputStream inputStream) throws IOException { + VertexGroupCommitStartedProto proto = inputStream.readMessage(VertexGroupCommitStartedProto.PARSER, null); if (proto == null) { throw new IOException("No data found in stream"); } http://git-wip-us.apache.org/repos/asf/tez/blob/ebc9f4f6/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexInitializedEvent.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexInitializedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexInitializedEvent.java index 90099fc..e7452e6 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexInitializedEvent.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexInitializedEvent.java @@ -19,12 +19,12 @@ package org.apache.tez.dag.history.events; import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import com.google.protobuf.CodedInputStream; +import com.google.protobuf.CodedOutputStream; import org.apache.tez.dag.api.DagTypeConverters; import org.apache.tez.dag.api.InputDescriptor; import org.apache.tez.dag.api.InputInitializerDescriptor; @@ -151,14 +151,13 @@ public class VertexInitializedEvent implements HistoryEvent { } @Override - public void toProtoStream(OutputStream outputStream) throws IOException { - toProto().writeDelimitedTo(outputStream); + public void toProtoStream(CodedOutputStream outputStream) throws IOException { + outputStream.writeMessageNoTag(toProto()); } @Override - public void fromProtoStream(InputStream inputStream) throws IOException { - RecoveryProtos.VertexInitializedProto proto = - RecoveryProtos.VertexInitializedProto.parseDelimitedFrom(inputStream); + public void fromProtoStream(CodedInputStream inputStream) throws IOException { + VertexInitializedProto proto = inputStream.readMessage(VertexInitializedProto.PARSER, null); if (proto == null) { throw new IOException("No data found in stream"); } http://git-wip-us.apache.org/repos/asf/tez/blob/ebc9f4f6/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexStartedEvent.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexStartedEvent.java index a8bd21e..4a3e05f 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexStartedEvent.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexStartedEvent.java @@ -19,9 +19,9 @@ package org.apache.tez.dag.history.events; import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; +import com.google.protobuf.CodedInputStream; +import com.google.protobuf.CodedOutputStream; import org.apache.tez.dag.app.dag.VertexState; import org.apache.tez.dag.history.HistoryEvent; import org.apache.tez.dag.history.HistoryEventType; @@ -74,13 +74,13 @@ public class VertexStartedEvent implements HistoryEvent { } @Override - public void toProtoStream(OutputStream outputStream) throws IOException { - toProto().writeDelimitedTo(outputStream); + public void toProtoStream(CodedOutputStream outputStream) throws IOException { + outputStream.writeMessageNoTag(toProto()); } @Override - public void fromProtoStream(InputStream inputStream) throws IOException { - VertexStartedProto proto = VertexStartedProto.parseDelimitedFrom(inputStream); + public void fromProtoStream(CodedInputStream inputStream) throws IOException { + VertexStartedProto proto = inputStream.readMessage(VertexStartedProto.PARSER, null); if (proto == null) { throw new IOException("No data found in stream"); } http://git-wip-us.apache.org/repos/asf/tez/blob/ebc9f4f6/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java b/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java index 8c29172..d874e0a 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java @@ -28,6 +28,7 @@ import java.util.Set; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; +import com.google.protobuf.CodedOutputStream; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; @@ -84,8 +85,7 @@ public class RecoveryService extends AbstractService { private FileSystem recoveryDirFS; // FS where staging dir exists Path recoveryPath; @VisibleForTesting - public Map<TezDAGID, FSDataOutputStream> outputStreamMap = new - HashMap<TezDAGID, FSDataOutputStream>(); + public Map<TezDAGID, RecoveryStream> outputStreamMap = new HashMap<>(); private int bufferSize; @VisibleForTesting public FSDataOutputStream summaryStream; @@ -101,6 +101,31 @@ public class RecoveryService extends AbstractService { private volatile boolean drained = true; private Object waitForDrained = new Object(); + @VisibleForTesting + public static class RecoveryStream { + private final FSDataOutputStream outputStream; + private final CodedOutputStream codedOutputStream; + + RecoveryStream(FSDataOutputStream outputStream) { + this.outputStream = outputStream; + this.codedOutputStream = CodedOutputStream.newInstance(outputStream); + } + + public void write(byte[] bytes) throws IOException { + codedOutputStream.writeRawBytes(bytes); + } + + public void flush() throws IOException { + codedOutputStream.flush(); + outputStream.hflush(); + } + + public void close() throws IOException { + flush(); + outputStream.close(); + } + } + public RecoveryService(AppContext appContext) { super(RecoveryService.class.getName()); this.appContext = appContext; @@ -231,10 +256,9 @@ public class RecoveryService extends AbstractService { } } } - for (Entry<TezDAGID, FSDataOutputStream> entry : outputStreamMap.entrySet()) { + for (Entry<TezDAGID, RecoveryStream> entry : outputStreamMap.entrySet()) { try { LOG.info("Closing Output Stream for DAG " + entry.getKey()); - entry.getValue().hflush(); entry.getValue().close(); } catch (IOException ioe) { if (!recoveryDirFS.exists(recoveryPath)) { @@ -303,7 +327,7 @@ public class RecoveryService extends AbstractService { if (event.getHistoryEvent() instanceof SummaryEvent) { synchronized (lock) { if (stopped.get()) { - LOG.warn("Igoring event as service stopped, eventType" + LOG.warn("Ignoring event as service stopped, eventType" + event.getHistoryEvent().getEventType()); return; } @@ -429,9 +453,9 @@ public class RecoveryService extends AbstractService { return; } - if (!outputStreamMap.containsKey(dagID)) { + RecoveryStream recoveryStream = outputStreamMap.get(dagID); + if (recoveryStream == null) { Path dagFilePath = TezCommonUtils.getDAGRecoveryPath(recoveryPath, dagID.toString()); - FSDataOutputStream outputStream; if (recoveryDirFS.exists(dagFilePath)) { createFatalErrorFlagDir(); return; @@ -440,12 +464,12 @@ public class RecoveryService extends AbstractService { LOG.debug("Opening DAG recovery file in create mode" + ", filePath=" + dagFilePath); } - outputStream = recoveryDirFS.create(dagFilePath, false, bufferSize); + FSDataOutputStream outputStream = recoveryDirFS.create(dagFilePath, false, bufferSize); + recoveryStream = new RecoveryStream(outputStream); } - outputStreamMap.put(dagID, outputStream); + outputStreamMap.put(dagID, recoveryStream); } - FSDataOutputStream outputStream = outputStreamMap.get(dagID); if (LOG.isDebugEnabled()) { LOG.debug("Writing recovery event to output stream" @@ -453,15 +477,15 @@ public class RecoveryService extends AbstractService { + ", eventType=" + eventType); } ++unflushedEventsCount; - outputStream.writeInt(event.getHistoryEvent().getEventType().ordinal()); - event.getHistoryEvent().toProtoStream(outputStream); + recoveryStream.codedOutputStream.writeFixed32NoTag(event.getHistoryEvent().getEventType().ordinal()); + event.getHistoryEvent().toProtoStream(recoveryStream.codedOutputStream); if (!EnumSet.of(HistoryEventType.DAG_SUBMITTED, HistoryEventType.DAG_FINISHED).contains(eventType)) { - maybeFlush(outputStream); + maybeFlush(recoveryStream); } } - private void maybeFlush(FSDataOutputStream outputStream) throws IOException { + private void maybeFlush(RecoveryStream recoveryStream) throws IOException { long currentTime = appContext.getClock().getTime(); boolean doFlush = false; if (maxUnflushedEvents >=0 @@ -482,12 +506,12 @@ public class RecoveryService extends AbstractService { if (!doFlush) { return; } - doFlush(outputStream, currentTime); + doFlush(recoveryStream, currentTime); } - private void doFlush(FSDataOutputStream outputStream, + private void doFlush(RecoveryStream recoveryStream, long currentTime) throws IOException { - outputStream.hflush(); + recoveryStream.flush(); if (LOG.isDebugEnabled()) { LOG.debug("Flushing output stream" http://git-wip-us.apache.org/repos/asf/tez/blob/ebc9f4f6/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java index 6673b39..1c09d5d 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java @@ -20,12 +20,15 @@ package org.apache.tez.dag.app; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Random; +import com.google.common.collect.Sets; +import com.google.protobuf.CodedInputStream; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -36,6 +39,8 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.util.SystemClock; import org.apache.log4j.Level; import org.apache.log4j.LogManager; +import org.apache.tez.dag.api.TaskLocationHint; +import org.apache.tez.dag.api.VertexLocationHint; import org.apache.tez.dag.api.oldrecords.TaskAttemptState; import org.apache.tez.dag.api.oldrecords.TaskState; import org.apache.tez.dag.api.records.DAGProtos.DAGPlan; @@ -59,6 +64,7 @@ import org.apache.tez.dag.history.events.TaskAttemptStartedEvent; import org.apache.tez.dag.history.events.TaskFinishedEvent; import org.apache.tez.dag.history.events.TaskStartedEvent; import org.apache.tez.dag.history.events.VertexCommitStartedEvent; +import org.apache.tez.dag.history.events.VertexConfigurationDoneEvent; import org.apache.tez.dag.history.events.VertexFinishedEvent; import org.apache.tez.dag.history.events.VertexGroupCommitFinishedEvent; import org.apache.tez.dag.history.events.VertexGroupCommitStartedEvent; @@ -91,6 +97,8 @@ public class TestRecoveryParser { private Path recoveryPath; private DAGAppMaster mockAppMaster; private DAGImpl mockDAGImpl; + // Protobuf message limit is 64 MB by default + private static final int PROTOBUF_DEFAULT_SIZE_LIMIT = 64 << 20; @Before public void setUp() throws IllegalArgumentException, IOException { @@ -105,7 +113,6 @@ public class TestRecoveryParser { mockDAGImpl = mock(DAGImpl.class); when(mockAppMaster.createDAG(any(DAGPlan.class), any(TezDAGID.class))).thenReturn(mockDAGImpl); parser = new RecoveryParser(mockAppMaster, localFS, recoveryPath, 3); - LogManager.getRootLogger().setLevel(Level.DEBUG); } private DAGSummaryData createDAGSummaryData(TezDAGID dagId, boolean completed) { @@ -267,7 +274,7 @@ public class TestRecoveryParser { null, "user", new Configuration(), null, null))); // wait until DAGSubmittedEvent is handled in the RecoveryEventHandling thread rService.await(); - rService.outputStreamMap.get(dagID).writeUTF("INVALID_DATA"); + rService.outputStreamMap.get(dagID).write("INVALID_DATA".getBytes("UTF-8")); rService.stop(); // write data in attempt_2 @@ -278,7 +285,7 @@ public class TestRecoveryParser { rService.handle(new DAGHistoryEvent(dagID, new DAGInitializedEvent(dagID, 1L, "user", dagPlan.getName(), null))); rService.await(); - rService.outputStreamMap.get(dagID).writeUTF("INVALID_DATA"); + rService.outputStreamMap.get(dagID).write("INVALID_DATA".getBytes("UTF-8")); rService.stop(); // corrupted last records will be skipped but the whole recovery logs will be read @@ -618,6 +625,75 @@ public class TestRecoveryParser { + ", but its full recovery events are not seen")); } + @Test(timeout=20000) + public void testRecoveryLargeEventData() throws IOException { + ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1); + TezDAGID dagID = TezDAGID.getInstance(appId, 1); + AppContext appContext = mock(AppContext.class); + when(appContext.getCurrentRecoveryDir()).thenReturn(new Path(recoveryPath+"/1")); + when(appContext.getClock()).thenReturn(new SystemClock()); + when(mockDAGImpl.getID()).thenReturn(dagID); + when(appContext.getHadoopShim()).thenReturn(new DefaultHadoopShim()); + when(appContext.getApplicationID()).thenReturn(appId); + + RecoveryService rService = new RecoveryService(appContext); + Configuration conf = new Configuration(); + conf.setBoolean(RecoveryService.TEZ_TEST_RECOVERY_DRAIN_EVENTS_WHEN_STOPPED, true); + rService.init(conf); + rService.start(); + + DAGPlan dagPlan = TestDAGImpl.createTestDAGPlan(); + // DAG DAGSubmittedEvent -> DAGInitializedEvent -> DAGStartedEvent + rService.handle(new DAGHistoryEvent(dagID, + new DAGSubmittedEvent(dagID, 1L, dagPlan, ApplicationAttemptId.newInstance(appId, 1), + null, "user", new Configuration(), null, null))); + DAGInitializedEvent dagInitedEvent = new DAGInitializedEvent(dagID, 100L, + "user", "dagName", null); + DAGStartedEvent dagStartedEvent = new DAGStartedEvent(dagID, 0L, "user", "dagName"); + rService.handle(new DAGHistoryEvent(dagID, dagInitedEvent)); + rService.handle(new DAGHistoryEvent(dagID, dagStartedEvent)); + + // Create a Recovery event larger than 64 MB to verify default max protobuf size + ArrayList<TaskLocationHint> taskLocationHints = new ArrayList<>(100000); + TaskLocationHint taskLocationHint = TaskLocationHint.createTaskLocationHint( + Sets.newHashSet("aaaaaaaaaaaaaaa.aaaaaaaaaaaaaaa.aaaaaaaaaaaaaaa", + "bbbbbbbbbbbbbbb.bbbbbbbbbbbbbbb.bbbbbbbbbbbbbbb", + "ccccccccccccccc.ccccccccccccccc.ccccccccccccccc", + "ddddddddddddddd.ddddddddddddddd.ddddddddddddddd", + "eeeeeeeeeeeeeee.eeeeeeeeeeeeeee.eeeeeeeeeeeeeee", + "fffffffffffffff.fffffffffffffff.fffffffffffffff", + "ggggggggggggggg.ggggggggggggggg.ggggggggggggggg", + "hhhhhhhhhhhhhhh.hhhhhhhhhhhhhhh.hhhhhhhhhhhhhhh", + "iiiiiiiiiiiiiii.iiiiiiiiiiiiiii.iiiiiiiiiiiiiii", + "jjjjjjjjjjjjjjj.jjjjjjjjjjjjjjj.jjjjjjjjjjjjjjj", + "kkkkkkkkkkkkkkk.kkkkkkkkkkkkkkk.kkkkkkkkkkkkkkk", + "lllllllllllllll.lllllllllllllll.lllllllllllllll", + "mmmmmmmmmmmmmmm.mmmmmmmmmmmmmmm.mmmmmmmmmmmmmmm", + "nnnnnnnnnnnnnnn.nnnnnnnnnnnnnnn.nnnnnnnnnnnnnnn"), + Sets.newHashSet("rack1", "rack2", "rack3")); + for (int i = 0; i < 100000; i++) { + taskLocationHints.add(taskLocationHint); + } + + TezVertexID v0Id = TezVertexID.getInstance(dagID, 0); + VertexLocationHint vertexLocationHint = VertexLocationHint.create(taskLocationHints); + VertexConfigurationDoneEvent vertexConfigurationDoneEvent = new VertexConfigurationDoneEvent( + v0Id, 0, 100000, vertexLocationHint, null, null, false); + // Verify large protobuf message + assertTrue(vertexConfigurationDoneEvent.toProto().getSerializedSize() > PROTOBUF_DEFAULT_SIZE_LIMIT ); + rService.handle(new DAGHistoryEvent(dagID, vertexConfigurationDoneEvent)); + rService.stop(); + + DAGRecoveryData dagData = parser.parseRecoveryData(); + VertexRecoveryData v0data = dagData.getVertexRecoveryData(v0Id); + assertNotNull("Vertex Recovery Data should be non-null", v0data); + VertexConfigurationDoneEvent parsedVertexConfigurationDoneEvent = v0data.getVertexConfigurationDoneEvent(); + assertNotNull("Vertex Configuration Done Event should be non-null", parsedVertexConfigurationDoneEvent); + VertexLocationHint parsedVertexLocationHint = parsedVertexConfigurationDoneEvent.getVertexLocationHint(); + assertNotNull("Vertex Location Hint should be non-null", parsedVertexLocationHint); + assertEquals(parsedVertexLocationHint.getTaskLocationHints().size(), 100000); + } + @Test(timeout=5000) public void testRecoveryData() throws IOException { ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1); http://git-wip-us.apache.org/repos/asf/tez/blob/ebc9f4f6/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java index 47d8389..50a80cb 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java @@ -23,6 +23,8 @@ import static org.junit.Assert.fail; import java.nio.ByteBuffer; +import com.google.protobuf.CodedInputStream; +import com.google.protobuf.CodedOutputStream; import org.apache.tez.runtime.api.TaskFailureType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -89,7 +91,9 @@ public class TestHistoryEventsProtoConversion { private HistoryEvent testProtoConversion(HistoryEvent event) throws IOException, TezException { ByteArrayOutputStream os = new ByteArrayOutputStream(); HistoryEvent deserializedEvent = null; - event.toProtoStream(os); + CodedOutputStream codedOutputStream = CodedOutputStream.newInstance(os); + event.toProtoStream(codedOutputStream); + codedOutputStream.flush(); os.flush(); os.close(); deserializedEvent = ReflectionUtils.createClazzInstance( @@ -98,7 +102,7 @@ public class TestHistoryEventsProtoConversion { + ", eventType=" + event.getEventType() + ", bufLen=" + os.toByteArray().length); deserializedEvent.fromProtoStream( - new ByteArrayInputStream(os.toByteArray())); + CodedInputStream.newInstance(os.toByteArray())); return deserializedEvent; } http://git-wip-us.apache.org/repos/asf/tez/blob/ebc9f4f6/tez-tests/src/test/java/org/apache/tez/test/MiniTezCluster.java ---------------------------------------------------------------------- diff --git a/tez-tests/src/test/java/org/apache/tez/test/MiniTezCluster.java b/tez-tests/src/test/java/org/apache/tez/test/MiniTezCluster.java index c727a8f..bac0e8c 100644 --- a/tez-tests/src/test/java/org/apache/tez/test/MiniTezCluster.java +++ b/tez-tests/src/test/java/org/apache/tez/test/MiniTezCluster.java @@ -28,6 +28,7 @@ import java.util.Iterator; import java.util.List; import java.util.Set; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -135,6 +136,11 @@ public class MiniTezCluster extends MiniYARNCluster { conf.setBoolean(YarnConfiguration.NM_VMEM_CHECK_ENABLED, false); conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "000"); + conf.setInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 1); + conf.setInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_TIMEOUT_KEY, 1000); + conf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY,0); + conf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY, 0); + conf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_TIMEOUT_KEY,1000); try { Path stagingPath = FileContext.getFileContext(conf).makeQualified( http://git-wip-us.apache.org/repos/asf/tez/blob/ebc9f4f6/tez-tests/src/test/java/org/apache/tez/test/RecoveryServiceWithEventHandlingHook.java ---------------------------------------------------------------------- diff --git a/tez-tests/src/test/java/org/apache/tez/test/RecoveryServiceWithEventHandlingHook.java b/tez-tests/src/test/java/org/apache/tez/test/RecoveryServiceWithEventHandlingHook.java index c08780f..50c5a66 100644 --- a/tez-tests/src/test/java/org/apache/tez/test/RecoveryServiceWithEventHandlingHook.java +++ b/tez-tests/src/test/java/org/apache/tez/test/RecoveryServiceWithEventHandlingHook.java @@ -24,6 +24,8 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; +import com.google.protobuf.CodedInputStream; +import com.google.protobuf.CodedOutputStream; import org.apache.commons.codec.binary.Base64; import org.apache.hadoop.conf.Configuration; import org.apache.tez.common.ReflectionUtils; @@ -222,15 +224,16 @@ public class RecoveryServiceWithEventHandlingHook extends RecoveryService { private String encodeHistoryEvent(HistoryEvent event) throws IOException { ByteArrayOutputStream out = new ByteArrayOutputStream(); - event.toProtoStream(out); + CodedOutputStream codedOutputStream = CodedOutputStream.newInstance(out); + event.toProtoStream(codedOutputStream); + codedOutputStream.flush(); return event.getClass().getName() + "," + Base64.encodeBase64String(out.toByteArray()); } private HistoryEvent decodeHistoryEvent(String eventClass, String base64) throws IOException { - ByteArrayInputStream in = new ByteArrayInputStream( - Base64.decodeBase64(base64)); + CodedInputStream in = CodedInputStream.newInstance(Base64.decodeBase64(base64)); try { HistoryEvent event = ReflectionUtils.createClazzInstance(eventClass); event.fromProtoStream(in); http://git-wip-us.apache.org/repos/asf/tez/blob/ebc9f4f6/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java ---------------------------------------------------------------------- diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java b/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java index f00ae5c..6d3ab1c 100644 --- a/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java +++ b/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java @@ -26,6 +26,7 @@ import java.util.EnumSet; import java.util.List; import java.util.Random; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -175,6 +176,9 @@ public class TestAMRecovery { tezConf.setBoolean( RecoveryService.TEZ_TEST_RECOVERY_DRAIN_EVENTS_WHEN_STOPPED, true); + tezConf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY,0); + tezConf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY, 0); + tezConf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_TIMEOUT_KEY,1000); tezSession = TezClient.create("TestDAGRecovery", tezConf); tezSession.start(); } http://git-wip-us.apache.org/repos/asf/tez/blob/ebc9f4f6/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java ---------------------------------------------------------------------- diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java b/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java index b0c9ccc..cf4744b 100644 --- a/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java +++ b/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java @@ -18,6 +18,9 @@ package org.apache.tez.test; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.log4j.Level; +import org.apache.log4j.LogManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -133,6 +136,9 @@ public class TestDAGRecovery { tezConf.set(TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS, " -Xmx256m"); tezConf.setBoolean(TezConfiguration.TEZ_AM_SESSION_MODE, true); tezConf.set(TezConfiguration.TEZ_AM_STAGING_SCRATCH_DATA_AUTO_DELETE, "false"); + tezConf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY,0); + tezConf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY, 0); + tezConf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_TIMEOUT_KEY,1000); tezSession = TezClient.create("TestDAGRecovery", tezConf); tezSession.start(); @@ -154,7 +160,7 @@ public class TestDAGRecovery { void runDAGAndVerify(DAG dag, DAGStatus.State finalState) throws Exception { tezSession.waitTillReady(); DAGClient dagClient = tezSession.submitDAG(dag); - DAGStatus dagStatus = dagClient.getDAGStatus(null); + DAGStatus dagStatus = dagClient.getDAGStatus(null, 10); while (!dagStatus.isCompleted()) { LOG.info("Waiting for dag to complete. Sleeping for 500ms." + " DAG name: " + dag.getName() http://git-wip-us.apache.org/repos/asf/tez/blob/ebc9f4f6/tez-tests/src/test/java/org/apache/tez/test/TestRecovery.java ---------------------------------------------------------------------- diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestRecovery.java b/tez-tests/src/test/java/org/apache/tez/test/TestRecovery.java index 93fd972..c7b1fb9 100644 --- a/tez-tests/src/test/java/org/apache/tez/test/TestRecovery.java +++ b/tez-tests/src/test/java/org/apache/tez/test/TestRecovery.java @@ -32,6 +32,7 @@ import java.util.Random; import java.util.Set; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; @@ -478,6 +479,9 @@ public class TestRecovery { RecoveryService.TEZ_TEST_RECOVERY_DRAIN_EVENTS_WHEN_STOPPED, false); tezConf.setBoolean( TezConfiguration.TEZ_AM_STAGING_SCRATCH_DATA_AUTO_DELETE, false); + tezConf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY,0); + tezConf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY, 0); + tezConf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_TIMEOUT_KEY,1000); tezConf.set(TezConfiguration.TEZ_AM_LOG_LEVEL, "INFO;org.apache.tez=DEBUG"); hashJoinExample.setConf(tezConf);
