Repository: tez Updated Branches: refs/heads/master 086d7bad2 -> 21cd02417
Revert "TEZ-3914. Recovering a large DAG fails to size limit exceeded (Jonathan Eagles via jlowe)" This reverts commit ebc9f4f6dee1badeca39cac26c00818be3e4d77d. Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/21cd0241 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/21cd0241 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/21cd0241 Branch: refs/heads/master Commit: 21cd0241776c46821c83a67aac2c979e405651b6 Parents: 086d7ba Author: Jonathan Eagles <[email protected]> Authored: Fri Apr 27 14:14:40 2018 -0500 Committer: Jonathan Eagles <[email protected]> Committed: Fri Apr 27 14:14:40 2018 -0500 ---------------------------------------------------------------------- .../org/apache/tez/dag/app/RecoveryParser.java | 19 ++--- .../apache/tez/dag/app/dag/impl/VertexImpl.java | 11 +-- .../apache/tez/dag/history/HistoryEvent.java | 9 +-- .../tez/dag/history/events/AMLaunchedEvent.java | 12 +-- .../tez/dag/history/events/AMStartedEvent.java | 12 +-- .../dag/history/events/AppLaunchedEvent.java | 8 +- .../history/events/ContainerLaunchedEvent.java | 13 ++-- .../history/events/ContainerStoppedEvent.java | 13 ++-- .../history/events/DAGCommitStartedEvent.java | 11 ++- .../dag/history/events/DAGFinishedEvent.java | 11 ++- .../dag/history/events/DAGInitializedEvent.java | 14 ++-- .../dag/history/events/DAGKillRequestEvent.java | 17 ++-- .../dag/history/events/DAGRecoveredEvent.java | 8 +- .../tez/dag/history/events/DAGStartedEvent.java | 12 +-- .../dag/history/events/DAGSubmittedEvent.java | 11 ++- .../events/TaskAttemptFinishedEvent.java | 13 ++-- .../history/events/TaskAttemptStartedEvent.java | 12 +-- .../dag/history/events/TaskFinishedEvent.java | 12 +-- .../dag/history/events/TaskStartedEvent.java | 12 +-- .../events/VertexCommitStartedEvent.java | 11 ++- .../events/VertexConfigurationDoneEvent.java | 12 +-- .../dag/history/events/VertexFinishedEvent.java | 11 ++- .../events/VertexGroupCommitFinishedEvent.java | 11 ++- .../events/VertexGroupCommitStartedEvent.java | 11 ++- .../history/events/VertexInitializedEvent.java | 13 ++-- .../dag/history/events/VertexStartedEvent.java | 12 +-- .../dag/history/recovery/RecoveryService.java | 58 ++++---------- .../apache/tez/dag/app/TestRecoveryParser.java | 82 +------------------- .../TestHistoryEventsProtoConversion.java | 8 +- .../org/apache/tez/test/MiniTezCluster.java | 6 -- .../RecoveryServiceWithEventHandlingHook.java | 9 +-- .../org/apache/tez/test/TestAMRecovery.java | 4 - .../org/apache/tez/test/TestDAGRecovery.java | 8 +- .../java/org/apache/tez/test/TestRecovery.java | 4 - 34 files changed, 173 insertions(+), 317 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/21cd0241/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java b/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java index 99ac283..368dd17 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java @@ -27,7 +27,6 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; -import com.google.protobuf.CodedInputStream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -252,15 +251,11 @@ public class RecoveryParser { } } - private static HistoryEvent getNextEvent(CodedInputStream inputStream) + private static HistoryEvent getNextEvent(FSDataInputStream inputStream) throws IOException { - boolean isAtEnd = inputStream.isAtEnd(); - if (isAtEnd) { - return null; - } int eventTypeOrdinal = -1; try { - eventTypeOrdinal = inputStream.readFixed32(); + eventTypeOrdinal = inputStream.readInt(); } catch (EOFException eof) { return null; } @@ -358,15 +353,13 @@ public class RecoveryParser { public static List<HistoryEvent> parseDAGRecoveryFile(FSDataInputStream inputStream) throws IOException { List<HistoryEvent> historyEvents = new ArrayList<HistoryEvent>(); - CodedInputStream codedInputStream = CodedInputStream.newInstance(inputStream); - codedInputStream.setSizeLimit(Integer.MAX_VALUE); while (true) { - HistoryEvent historyEvent = getNextEvent(codedInputStream); + HistoryEvent historyEvent = getNextEvent(inputStream); if (historyEvent == null) { LOG.info("Reached end of stream"); break; } - LOG.debug("Read HistoryEvent, eventType={}, event={}", historyEvent.getEventType(), historyEvent); + LOG.debug("Read HistoryEvent, eventType=" + historyEvent.getEventType() + ", event=" + historyEvent); historyEvents.add(historyEvent); } return historyEvents; @@ -752,12 +745,10 @@ public class RecoveryParser { + ", dagRecoveryFile=" + dagRecoveryFile + ", len=" + fileStatus.getLen()); FSDataInputStream dagRecoveryStream = recoveryFS.open(dagRecoveryFile, recoveryBufferSize); - CodedInputStream codedInputStream = CodedInputStream.newInstance(dagRecoveryStream); - codedInputStream.setSizeLimit(Integer.MAX_VALUE); while (true) { HistoryEvent event; try { - event = getNextEvent(codedInputStream); + event = getNextEvent(dagRecoveryStream); if (event == null) { LOG.info("Reached end of dag recovery stream"); break; http://git-wip-us.apache.org/repos/asf/tez/blob/21cd0241/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java index f3fc269..ad26173 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java @@ -42,8 +42,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import javax.annotation.Nullable; -import com.google.protobuf.CodedInputStream; -import com.google.protobuf.CodedOutputStream; import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -65,6 +63,7 @@ import org.apache.tez.common.ReflectionUtils; import org.apache.tez.common.TezUtilsInternal; import org.apache.tez.common.counters.LimitExceededException; import org.apache.tez.common.counters.TezCounters; +import org.apache.tez.common.io.NonSyncByteArrayInputStream; import org.apache.tez.common.io.NonSyncByteArrayOutputStream; import org.apache.tez.dag.api.DagTypeConverters; import org.apache.tez.dag.api.EdgeManagerPluginDescriptor; @@ -2696,11 +2695,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl } NonSyncByteArrayOutputStream out = new NonSyncByteArrayOutputStream(); try { - CodedOutputStream codedOutputStream = CodedOutputStream.newInstance(out); - reconfigureDoneEvent.toProtoStream(codedOutputStream); - codedOutputStream.flush(); + reconfigureDoneEvent.toProtoStream(out); } catch (IOException e) { - throw new TezUncheckedException("Unable to deserialize VertexReconfigureDoneEvent"); + throw new TezUncheckedException("Unable to deserilize VertexReconfigureDoneEvent"); } this.vertexManager = new VertexManager( VertexManagerPluginDescriptor.create(NoOpVertexManager.class.getName()) @@ -4592,7 +4589,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl LOG.debug("initialize NoOpVertexManager"); } configurationDoneEvent = new VertexConfigurationDoneEvent(); - configurationDoneEvent.fromProtoStream(CodedInputStream.newInstance(getContext().getUserPayload().deepCopyAsArray())); + configurationDoneEvent.fromProtoStream(new NonSyncByteArrayInputStream(getContext().getUserPayload().deepCopyAsArray())); String vertexName = getContext().getVertexName(); if (getContext().getVertexNumTasks(vertexName) == -1) { Preconditions.checkArgument(configurationDoneEvent.isSetParallelismCalled(), "SetParallelism must be called " http://git-wip-us.apache.org/repos/asf/tez/blob/21cd0241/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEvent.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEvent.java index 5b077e9..1ca0d5f 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEvent.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEvent.java @@ -18,10 +18,9 @@ package org.apache.tez.dag.history; -import com.google.protobuf.CodedInputStream; -import com.google.protobuf.CodedOutputStream; - import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; public interface HistoryEvent { @@ -31,8 +30,8 @@ public interface HistoryEvent { public boolean isHistoryEvent(); - public void toProtoStream(CodedOutputStream outputStream) throws IOException; + public void toProtoStream(OutputStream outputStream) throws IOException; - public void fromProtoStream(CodedInputStream inputStream) throws IOException; + public void fromProtoStream(InputStream inputStream) throws IOException; } http://git-wip-us.apache.org/repos/asf/tez/blob/21cd0241/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMLaunchedEvent.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMLaunchedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMLaunchedEvent.java index 001cbf0..fa332d6 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMLaunchedEvent.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMLaunchedEvent.java @@ -19,9 +19,9 @@ package org.apache.tez.dag.history.events; import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; -import com.google.protobuf.CodedInputStream; -import com.google.protobuf.CodedOutputStream; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.tez.dag.history.HistoryEvent; @@ -84,13 +84,13 @@ public class AMLaunchedEvent implements HistoryEvent { } @Override - public void toProtoStream(CodedOutputStream outputStream) throws IOException { - outputStream.writeMessageNoTag(toProto()); + public void toProtoStream(OutputStream outputStream) throws IOException { + toProto().writeDelimitedTo(outputStream); } @Override - public void fromProtoStream(CodedInputStream inputStream) throws IOException { - AMLaunchedProto proto = inputStream.readMessage(AMLaunchedProto.PARSER, null); + public void fromProtoStream(InputStream inputStream) throws IOException { + AMLaunchedProto proto = AMLaunchedProto.parseDelimitedFrom(inputStream); if (proto == null) { throw new IOException("No data found in stream"); } http://git-wip-us.apache.org/repos/asf/tez/blob/21cd0241/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMStartedEvent.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMStartedEvent.java index 87daba6..8a59d84 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMStartedEvent.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMStartedEvent.java @@ -19,9 +19,9 @@ package org.apache.tez.dag.history.events; import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; -import com.google.protobuf.CodedInputStream; -import com.google.protobuf.CodedOutputStream; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.tez.dag.history.HistoryEvent; @@ -79,13 +79,13 @@ public class AMStartedEvent implements HistoryEvent { } @Override - public void toProtoStream(CodedOutputStream outputStream) throws IOException { - outputStream.writeMessageNoTag(toProto()); + public void toProtoStream(OutputStream outputStream) throws IOException { + toProto().writeDelimitedTo(outputStream); } @Override - public void fromProtoStream(CodedInputStream inputStream) throws IOException { - AMStartedProto proto = inputStream.readMessage(AMStartedProto.PARSER, null); + public void fromProtoStream(InputStream inputStream) throws IOException { + AMStartedProto proto = AMStartedProto.parseDelimitedFrom(inputStream); if (proto == null) { throw new IOException("No data found in stream"); } http://git-wip-us.apache.org/repos/asf/tez/blob/21cd0241/tez-dag/src/main/java/org/apache/tez/dag/history/events/AppLaunchedEvent.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/AppLaunchedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/AppLaunchedEvent.java index 0b812f0..08d2aff 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/AppLaunchedEvent.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/AppLaunchedEvent.java @@ -19,9 +19,9 @@ package org.apache.tez.dag.history.events; import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; -import com.google.protobuf.CodedInputStream; -import com.google.protobuf.CodedOutputStream; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.tez.common.VersionInfo; @@ -67,12 +67,12 @@ public class AppLaunchedEvent implements HistoryEvent { } @Override - public void toProtoStream(CodedOutputStream outputStream) throws IOException { + public void toProtoStream(OutputStream outputStream) throws IOException { throw new UnsupportedOperationException("Not a recovery event"); } @Override - public void fromProtoStream(CodedInputStream inputStream) throws IOException { + public void fromProtoStream(InputStream inputStream) throws IOException { throw new UnsupportedOperationException("Not a recovery event"); } http://git-wip-us.apache.org/repos/asf/tez/blob/21cd0241/tez-dag/src/main/java/org/apache/tez/dag/history/events/ContainerLaunchedEvent.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/ContainerLaunchedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/ContainerLaunchedEvent.java index 11528e2..45d0261 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/ContainerLaunchedEvent.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/ContainerLaunchedEvent.java @@ -19,9 +19,9 @@ package org.apache.tez.dag.history.events; import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; -import com.google.protobuf.CodedInputStream; -import com.google.protobuf.CodedOutputStream; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.util.ConverterUtils; @@ -77,13 +77,14 @@ public class ContainerLaunchedEvent implements HistoryEvent { } @Override - public void toProtoStream(CodedOutputStream outputStream) throws IOException { - outputStream.writeMessageNoTag(toProto()); + public void toProtoStream(OutputStream outputStream) throws IOException { + toProto().writeDelimitedTo(outputStream); } @Override - public void fromProtoStream(CodedInputStream inputStream) throws IOException { - ContainerLaunchedProto proto = inputStream.readMessage(ContainerLaunchedProto.PARSER, null); + public void fromProtoStream(InputStream inputStream) throws IOException { + ContainerLaunchedProto proto = + ContainerLaunchedProto.parseDelimitedFrom(inputStream); if (proto == null) { throw new IOException("No data found in stream"); } http://git-wip-us.apache.org/repos/asf/tez/blob/21cd0241/tez-dag/src/main/java/org/apache/tez/dag/history/events/ContainerStoppedEvent.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/ContainerStoppedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/ContainerStoppedEvent.java index 528f629..86971ce 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/ContainerStoppedEvent.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/ContainerStoppedEvent.java @@ -19,9 +19,9 @@ package org.apache.tez.dag.history.events; import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; -import com.google.protobuf.CodedInputStream; -import com.google.protobuf.CodedOutputStream; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.util.ConverterUtils; @@ -82,13 +82,14 @@ public class ContainerStoppedEvent implements HistoryEvent { } @Override - public void toProtoStream(CodedOutputStream outputStream) throws IOException { - outputStream.writeMessageNoTag(toProto()); + public void toProtoStream(OutputStream outputStream) throws IOException { + toProto().writeDelimitedTo(outputStream); } @Override - public void fromProtoStream(CodedInputStream inputStream) throws IOException { - ContainerStoppedProto proto = inputStream.readMessage(ContainerStoppedProto.PARSER, null); + public void fromProtoStream(InputStream inputStream) throws IOException { + ContainerStoppedProto proto = + ContainerStoppedProto.parseDelimitedFrom(inputStream); if (proto == null) { throw new IOException("No data found in stream"); } http://git-wip-us.apache.org/repos/asf/tez/blob/21cd0241/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGCommitStartedEvent.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGCommitStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGCommitStartedEvent.java index 241dada..016bb60 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGCommitStartedEvent.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGCommitStartedEvent.java @@ -19,10 +19,9 @@ package org.apache.tez.dag.history.events; import java.io.IOException; +import java.io.InputStream; import java.io.OutputStream; -import com.google.protobuf.CodedInputStream; -import com.google.protobuf.CodedOutputStream; import org.apache.tez.dag.history.HistoryEvent; import org.apache.tez.dag.history.HistoryEventType; import org.apache.tez.dag.history.SummaryEvent; @@ -70,13 +69,13 @@ public class DAGCommitStartedEvent implements HistoryEvent, SummaryEvent { } @Override - public void toProtoStream(CodedOutputStream outputStream) throws IOException { - outputStream.writeMessageNoTag(toProto()); + public void toProtoStream(OutputStream outputStream) throws IOException { + toProto().writeDelimitedTo(outputStream); } @Override - public void fromProtoStream(CodedInputStream inputStream) throws IOException { - DAGCommitStartedProto proto = inputStream.readMessage(DAGCommitStartedProto.PARSER, null); + public void fromProtoStream(InputStream inputStream) throws IOException { + DAGCommitStartedProto proto = DAGCommitStartedProto.parseDelimitedFrom(inputStream); if (proto == null) { throw new IOException("No data found in stream"); } http://git-wip-us.apache.org/repos/asf/tez/blob/21cd0241/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java index 0a7ef56..c395297 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java @@ -19,11 +19,10 @@ package org.apache.tez.dag.history.events; import java.io.IOException; +import java.io.InputStream; import java.io.OutputStream; import java.util.Map; -import com.google.protobuf.CodedInputStream; -import com.google.protobuf.CodedOutputStream; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.tez.common.counters.TezCounters; import org.apache.tez.dag.api.DagTypeConverters; @@ -122,13 +121,13 @@ public class DAGFinishedEvent implements HistoryEvent, SummaryEvent { } @Override - public void toProtoStream(CodedOutputStream outputStream) throws IOException { - outputStream.writeMessageNoTag(toProto()); + public void toProtoStream(OutputStream outputStream) throws IOException { + toProto().writeDelimitedTo(outputStream); } @Override - public void fromProtoStream(CodedInputStream inputStream) throws IOException { - DAGFinishedProto proto = inputStream.readMessage(DAGFinishedProto.PARSER, null); + public void fromProtoStream(InputStream inputStream) throws IOException { + DAGFinishedProto proto = DAGFinishedProto.parseDelimitedFrom(inputStream); if (proto == null) { throw new IOException("No data found in stream"); } http://git-wip-us.apache.org/repos/asf/tez/blob/21cd0241/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGInitializedEvent.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGInitializedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGInitializedEvent.java index 9e6c8b2..98d64d3 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGInitializedEvent.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGInitializedEvent.java @@ -19,16 +19,15 @@ package org.apache.tez.dag.history.events; import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; import java.util.Map; -import com.google.protobuf.CodedInputStream; -import com.google.protobuf.CodedOutputStream; import org.apache.tez.dag.history.HistoryEvent; import org.apache.tez.dag.history.HistoryEventType; import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.dag.records.TezVertexID; import org.apache.tez.dag.recovery.records.RecoveryProtos; -import org.apache.tez.dag.recovery.records.RecoveryProtos.DAGInitializedProto; public class DAGInitializedEvent implements HistoryEvent { @@ -84,13 +83,14 @@ public class DAGInitializedEvent implements HistoryEvent { } @Override - public void toProtoStream(CodedOutputStream outputStream) throws IOException { - outputStream.writeMessageNoTag(toProto()); + public void toProtoStream(OutputStream outputStream) throws IOException { + toProto().writeDelimitedTo(outputStream); } @Override - public void fromProtoStream(CodedInputStream inputStream) throws IOException { - DAGInitializedProto proto = inputStream.readMessage(DAGInitializedProto.PARSER, null); + public void fromProtoStream(InputStream inputStream) throws IOException { + RecoveryProtos.DAGInitializedProto proto = + RecoveryProtos.DAGInitializedProto.parseDelimitedFrom(inputStream); if (proto == null) { throw new IOException("No data found in stream"); } http://git-wip-us.apache.org/repos/asf/tez/blob/21cd0241/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGKillRequestEvent.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGKillRequestEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGKillRequestEvent.java index c87f5ce..525e361 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGKillRequestEvent.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGKillRequestEvent.java @@ -18,16 +18,14 @@ package org.apache.tez.dag.history.events; import java.io.IOException; +import java.io.InputStream; import java.io.OutputStream; -import com.google.protobuf.CodedInputStream; -import com.google.protobuf.CodedOutputStream; import org.apache.tez.dag.history.HistoryEvent; import org.apache.tez.dag.history.HistoryEventType; import org.apache.tez.dag.history.SummaryEvent; import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.dag.recovery.records.RecoveryProtos; -import org.apache.tez.dag.recovery.records.RecoveryProtos.DAGKillRequestProto; import org.apache.tez.dag.recovery.records.RecoveryProtos.SummaryEventProto; import org.apache.tez.dag.utils.ProtoUtils; @@ -62,12 +60,12 @@ public class DAGKillRequestEvent implements HistoryEvent, SummaryEvent { } @Override - public void toProtoStream(CodedOutputStream outputStream) throws IOException { - outputStream.writeMessageNoTag(toProto()); + public void toProtoStream(OutputStream outputStream) throws IOException { + toProto().writeDelimitedTo(outputStream); } - public DAGKillRequestProto toProto() { - return DAGKillRequestProto.newBuilder() + public RecoveryProtos.DAGKillRequestProto toProto() { + return RecoveryProtos.DAGKillRequestProto.newBuilder() .setDagId(dagID.toString()) .setKillRequestTime(killRequestTime) .setIsSessionStopped(isSessionStopped) @@ -75,8 +73,9 @@ public class DAGKillRequestEvent implements HistoryEvent, SummaryEvent { } @Override - public void fromProtoStream(CodedInputStream inputStream) throws IOException { - DAGKillRequestProto proto = inputStream.readMessage(DAGKillRequestProto.PARSER, null); + public void fromProtoStream(InputStream inputStream) throws IOException { + RecoveryProtos.DAGKillRequestProto proto = + RecoveryProtos.DAGKillRequestProto.parseDelimitedFrom(inputStream); if (proto == null) { throw new IOException("No data found in stream"); } http://git-wip-us.apache.org/repos/asf/tez/blob/21cd0241/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGRecoveredEvent.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGRecoveredEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGRecoveredEvent.java index e5f5614..2bfa43b 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGRecoveredEvent.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGRecoveredEvent.java @@ -19,9 +19,9 @@ package org.apache.tez.dag.history.events; import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; -import com.google.protobuf.CodedInputStream; -import com.google.protobuf.CodedOutputStream; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.tez.dag.app.dag.DAGState; import org.apache.tez.dag.history.HistoryEvent; @@ -76,13 +76,13 @@ public class DAGRecoveredEvent implements HistoryEvent { } @Override - public void toProtoStream(CodedOutputStream outputStream) throws IOException { + public void toProtoStream(OutputStream outputStream) throws IOException { throw new UnsupportedOperationException("Invalid operation for eventType " + getEventType().name()); } @Override - public void fromProtoStream(CodedInputStream inputStream) throws IOException { + public void fromProtoStream(InputStream inputStream) throws IOException { throw new UnsupportedOperationException("Invalid operation for eventType " + getEventType().name()); } http://git-wip-us.apache.org/repos/asf/tez/blob/21cd0241/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGStartedEvent.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGStartedEvent.java index f1fdcac..d0e0e69 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGStartedEvent.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGStartedEvent.java @@ -19,9 +19,9 @@ package org.apache.tez.dag.history.events; import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; -import com.google.protobuf.CodedInputStream; -import com.google.protobuf.CodedOutputStream; import org.apache.tez.dag.app.dag.DAGState; import org.apache.tez.dag.history.HistoryEvent; import org.apache.tez.dag.history.HistoryEventType; @@ -74,13 +74,13 @@ public class DAGStartedEvent implements HistoryEvent { } @Override - public void toProtoStream(CodedOutputStream outputStream) throws IOException { - outputStream.writeMessageNoTag(toProto()); + public void toProtoStream(OutputStream outputStream) throws IOException { + toProto().writeDelimitedTo(outputStream); } @Override - public void fromProtoStream(CodedInputStream inputStream) throws IOException { - DAGStartedProto proto = inputStream.readMessage(DAGStartedProto.PARSER, null); + public void fromProtoStream(InputStream inputStream) throws IOException { + DAGStartedProto proto = DAGStartedProto.parseDelimitedFrom(inputStream); if (proto == null) { throw new IOException("No data found in stream"); } http://git-wip-us.apache.org/repos/asf/tez/blob/21cd0241/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java index e04ee80..1b1fdf3 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java @@ -19,11 +19,10 @@ package org.apache.tez.dag.history.events; import java.io.IOException; +import java.io.InputStream; import java.io.OutputStream; import java.util.Map; -import com.google.protobuf.CodedInputStream; -import com.google.protobuf.CodedOutputStream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -127,13 +126,13 @@ public class DAGSubmittedEvent implements HistoryEvent, SummaryEvent { } @Override - public void toProtoStream(CodedOutputStream outputStream) throws IOException { - outputStream.writeMessageNoTag(toProto()); + public void toProtoStream(OutputStream outputStream) throws IOException { + toProto().writeDelimitedTo(outputStream); } @Override - public void fromProtoStream(CodedInputStream inputStream) throws IOException { - DAGSubmittedProto proto = inputStream.readMessage(DAGSubmittedProto.PARSER, null); + public void fromProtoStream(InputStream inputStream) throws IOException { + DAGSubmittedProto proto = DAGSubmittedProto.parseDelimitedFrom(inputStream); if (proto == null) { throw new IOException("No data found in stream"); } http://git-wip-us.apache.org/repos/asf/tez/blob/21cd0241/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java index 96dc099..e9100e8 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java @@ -20,10 +20,10 @@ package org.apache.tez.dag.history.events; import javax.annotation.Nullable; import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; import java.util.List; -import com.google.protobuf.CodedInputStream; -import com.google.protobuf.CodedOutputStream; import org.apache.tez.common.TezConverterUtils; import org.apache.tez.runtime.api.TaskFailureType; import org.slf4j.Logger; @@ -226,13 +226,14 @@ public class TaskAttemptFinishedEvent implements HistoryEvent { } @Override - public void toProtoStream(CodedOutputStream outputStream) throws IOException { - outputStream.writeMessageNoTag(toProto()); + public void toProtoStream(OutputStream outputStream) throws IOException { + toProto().writeDelimitedTo(outputStream); } @Override - public void fromProtoStream(CodedInputStream inputStream) throws IOException { - TaskAttemptFinishedProto proto = inputStream.readMessage(TaskAttemptFinishedProto.PARSER, null); + public void fromProtoStream(InputStream inputStream) throws IOException { + TaskAttemptFinishedProto proto = + TaskAttemptFinishedProto.parseDelimitedFrom(inputStream); if (proto == null) { throw new IOException("No data found in stream"); } http://git-wip-us.apache.org/repos/asf/tez/blob/21cd0241/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java index a49e47c..71d4419 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java @@ -19,9 +19,9 @@ package org.apache.tez.dag.history.events; import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; -import com.google.protobuf.CodedInputStream; -import com.google.protobuf.CodedOutputStream; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.util.ConverterUtils; @@ -91,13 +91,13 @@ public class TaskAttemptStartedEvent implements HistoryEvent { } @Override - public void toProtoStream(CodedOutputStream outputStream) throws IOException { - outputStream.writeMessageNoTag(toProto()); + public void toProtoStream(OutputStream outputStream) throws IOException { + toProto().writeDelimitedTo(outputStream); } @Override - public void fromProtoStream(CodedInputStream inputStream) throws IOException { - TaskAttemptStartedProto proto = inputStream.readMessage(TaskAttemptStartedProto.PARSER, null); + public void fromProtoStream(InputStream inputStream) throws IOException { + TaskAttemptStartedProto proto = TaskAttemptStartedProto.parseDelimitedFrom(inputStream); if (proto == null) { throw new IOException("No data found in stream"); } http://git-wip-us.apache.org/repos/asf/tez/blob/21cd0241/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskFinishedEvent.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskFinishedEvent.java index 6befa1a..71ff6c8 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskFinishedEvent.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskFinishedEvent.java @@ -19,9 +19,9 @@ package org.apache.tez.dag.history.events; import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; -import com.google.protobuf.CodedInputStream; -import com.google.protobuf.CodedOutputStream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.tez.common.counters.TezCounters; @@ -107,13 +107,13 @@ public class TaskFinishedEvent implements HistoryEvent { } @Override - public void toProtoStream(CodedOutputStream outputStream) throws IOException { - outputStream.writeMessageNoTag(toProto()); + public void toProtoStream(OutputStream outputStream) throws IOException { + toProto().writeDelimitedTo(outputStream); } @Override - public void fromProtoStream(CodedInputStream inputStream) throws IOException { - TaskFinishedProto proto = inputStream.readMessage(TaskFinishedProto.PARSER, null); + public void fromProtoStream(InputStream inputStream) throws IOException { + TaskFinishedProto proto = TaskFinishedProto.parseDelimitedFrom(inputStream); if (proto == null) { throw new IOException("No data found in stream"); } http://git-wip-us.apache.org/repos/asf/tez/blob/21cd0241/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskStartedEvent.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskStartedEvent.java index cc62969..07dc2f9 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskStartedEvent.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskStartedEvent.java @@ -19,9 +19,9 @@ package org.apache.tez.dag.history.events; import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; -import com.google.protobuf.CodedInputStream; -import com.google.protobuf.CodedOutputStream; import org.apache.tez.dag.api.oldrecords.TaskState; import org.apache.tez.dag.history.HistoryEvent; import org.apache.tez.dag.history.HistoryEventType; @@ -83,13 +83,13 @@ public class TaskStartedEvent implements HistoryEvent { } @Override - public void toProtoStream(CodedOutputStream outputStream) throws IOException { - outputStream.writeMessageNoTag(toProto()); + public void toProtoStream(OutputStream outputStream) throws IOException { + toProto().writeDelimitedTo(outputStream); } @Override - public void fromProtoStream(CodedInputStream inputStream) throws IOException { - TaskStartedProto proto = inputStream.readMessage(TaskStartedProto.PARSER, null); + public void fromProtoStream(InputStream inputStream) throws IOException { + TaskStartedProto proto = TaskStartedProto.parseDelimitedFrom(inputStream); if (proto == null) { throw new IOException("No data found in stream"); } http://git-wip-us.apache.org/repos/asf/tez/blob/21cd0241/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexCommitStartedEvent.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexCommitStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexCommitStartedEvent.java index 8ff86b8..c452187 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexCommitStartedEvent.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexCommitStartedEvent.java @@ -19,11 +19,10 @@ package org.apache.tez.dag.history.events; import java.io.IOException; +import java.io.InputStream; import java.io.OutputStream; import java.nio.charset.Charset; -import com.google.protobuf.CodedInputStream; -import com.google.protobuf.CodedOutputStream; import org.apache.tez.dag.history.HistoryEvent; import org.apache.tez.dag.history.HistoryEventType; import org.apache.tez.dag.history.SummaryEvent; @@ -74,13 +73,13 @@ public class VertexCommitStartedEvent implements HistoryEvent, SummaryEvent { } @Override - public void toProtoStream(CodedOutputStream outputStream) throws IOException { - outputStream.writeMessageNoTag(toProto()); + public void toProtoStream(OutputStream outputStream) throws IOException { + toProto().writeDelimitedTo(outputStream); } @Override - public void fromProtoStream(CodedInputStream inputStream) throws IOException { - VertexCommitStartedProto proto = inputStream.readMessage(VertexCommitStartedProto.PARSER, null); + public void fromProtoStream(InputStream inputStream) throws IOException { + VertexCommitStartedProto proto = VertexCommitStartedProto.parseDelimitedFrom(inputStream); if (proto == null) { throw new IOException("No data found in stream"); } http://git-wip-us.apache.org/repos/asf/tez/blob/21cd0241/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexConfigurationDoneEvent.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexConfigurationDoneEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexConfigurationDoneEvent.java index a2e2039..137342c 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexConfigurationDoneEvent.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexConfigurationDoneEvent.java @@ -18,12 +18,12 @@ package org.apache.tez.dag.history.events; import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; -import com.google.protobuf.CodedInputStream; -import com.google.protobuf.CodedOutputStream; import org.apache.tez.dag.api.DagTypeConverters; import org.apache.tez.dag.api.EdgeProperty; import org.apache.tez.dag.api.VertexLocationHint; @@ -155,13 +155,13 @@ public class VertexConfigurationDoneEvent implements HistoryEvent { } @Override - public void toProtoStream(CodedOutputStream outputStream) throws IOException { - outputStream.writeMessageNoTag(toProto()); + public void toProtoStream(OutputStream outputStream) throws IOException { + toProto().writeDelimitedTo(outputStream); } @Override - public void fromProtoStream(CodedInputStream inputStream) throws IOException { - VertexConfigurationDoneProto proto = inputStream.readMessage(VertexConfigurationDoneProto.PARSER, null); + public void fromProtoStream(InputStream inputStream) throws IOException { + VertexConfigurationDoneProto proto = VertexConfigurationDoneProto.parseDelimitedFrom(inputStream); if (proto == null) { throw new IOException("No data found in stream"); } http://git-wip-us.apache.org/repos/asf/tez/blob/21cd0241/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java index 58cb628..a2cdae2 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java @@ -19,11 +19,10 @@ package org.apache.tez.dag.history.events; import java.io.IOException; +import java.io.InputStream; import java.io.OutputStream; import java.util.Map; -import com.google.protobuf.CodedInputStream; -import com.google.protobuf.CodedOutputStream; import org.apache.tez.dag.app.dag.impl.ServicePluginInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -124,13 +123,13 @@ public class VertexFinishedEvent implements HistoryEvent, SummaryEvent { } @Override - public void toProtoStream(CodedOutputStream outputStream) throws IOException { - outputStream.writeMessageNoTag(toProto()); + public void toProtoStream(OutputStream outputStream) throws IOException { + toProto().writeDelimitedTo(outputStream); } @Override - public void fromProtoStream(CodedInputStream inputStream) throws IOException { - VertexFinishedProto proto = inputStream.readMessage(VertexFinishedProto.PARSER, null); + public void fromProtoStream(InputStream inputStream) throws IOException { + VertexFinishedProto proto = VertexFinishedProto.parseDelimitedFrom(inputStream); if (proto == null) { throw new IOException("No data found in stream"); } http://git-wip-us.apache.org/repos/asf/tez/blob/21cd0241/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitFinishedEvent.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitFinishedEvent.java index c9d5aae..ec8f3e1 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitFinishedEvent.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitFinishedEvent.java @@ -19,11 +19,10 @@ package org.apache.tez.dag.history.events; import java.io.IOException; +import java.io.InputStream; import java.io.OutputStream; import java.util.Collection; -import com.google.protobuf.CodedInputStream; -import com.google.protobuf.CodedOutputStream; import org.apache.tez.dag.history.HistoryEvent; import org.apache.tez.dag.history.HistoryEventType; import org.apache.tez.dag.history.SummaryEvent; @@ -95,13 +94,13 @@ public class VertexGroupCommitFinishedEvent implements HistoryEvent, SummaryEven } @Override - public void toProtoStream(CodedOutputStream outputStream) throws IOException { - outputStream.writeMessageNoTag(toProto()); + public void toProtoStream(OutputStream outputStream) throws IOException { + toProto().writeDelimitedTo(outputStream); } @Override - public void fromProtoStream(CodedInputStream inputStream) throws IOException { - VertexGroupCommitFinishedProto proto = inputStream.readMessage(VertexGroupCommitFinishedProto.PARSER, null); + public void fromProtoStream(InputStream inputStream) throws IOException { + VertexGroupCommitFinishedProto proto = VertexGroupCommitFinishedProto.parseDelimitedFrom(inputStream); if (proto == null) { throw new IOException("No data found in stream"); } http://git-wip-us.apache.org/repos/asf/tez/blob/21cd0241/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitStartedEvent.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitStartedEvent.java index cdd11bc..3de355c 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitStartedEvent.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitStartedEvent.java @@ -19,11 +19,10 @@ package org.apache.tez.dag.history.events; import java.io.IOException; +import java.io.InputStream; import java.io.OutputStream; import java.util.Collection; -import com.google.protobuf.CodedInputStream; -import com.google.protobuf.CodedOutputStream; import org.apache.tez.dag.history.HistoryEvent; import org.apache.tez.dag.history.HistoryEventType; import org.apache.tez.dag.history.SummaryEvent; @@ -95,13 +94,13 @@ public class VertexGroupCommitStartedEvent implements HistoryEvent, SummaryEvent } @Override - public void toProtoStream(CodedOutputStream outputStream) throws IOException { - outputStream.writeMessageNoTag(toProto()); + public void toProtoStream(OutputStream outputStream) throws IOException { + toProto().writeDelimitedTo(outputStream); } @Override - public void fromProtoStream(CodedInputStream inputStream) throws IOException { - VertexGroupCommitStartedProto proto = inputStream.readMessage(VertexGroupCommitStartedProto.PARSER, null); + public void fromProtoStream(InputStream inputStream) throws IOException { + VertexGroupCommitStartedProto proto = VertexGroupCommitStartedProto.parseDelimitedFrom(inputStream); if (proto == null) { throw new IOException("No data found in stream"); } http://git-wip-us.apache.org/repos/asf/tez/blob/21cd0241/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexInitializedEvent.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexInitializedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexInitializedEvent.java index e7452e6..90099fc 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexInitializedEvent.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexInitializedEvent.java @@ -19,12 +19,12 @@ package org.apache.tez.dag.history.events; import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; -import com.google.protobuf.CodedInputStream; -import com.google.protobuf.CodedOutputStream; import org.apache.tez.dag.api.DagTypeConverters; import org.apache.tez.dag.api.InputDescriptor; import org.apache.tez.dag.api.InputInitializerDescriptor; @@ -151,13 +151,14 @@ public class VertexInitializedEvent implements HistoryEvent { } @Override - public void toProtoStream(CodedOutputStream outputStream) throws IOException { - outputStream.writeMessageNoTag(toProto()); + public void toProtoStream(OutputStream outputStream) throws IOException { + toProto().writeDelimitedTo(outputStream); } @Override - public void fromProtoStream(CodedInputStream inputStream) throws IOException { - VertexInitializedProto proto = inputStream.readMessage(VertexInitializedProto.PARSER, null); + public void fromProtoStream(InputStream inputStream) throws IOException { + RecoveryProtos.VertexInitializedProto proto = + RecoveryProtos.VertexInitializedProto.parseDelimitedFrom(inputStream); if (proto == null) { throw new IOException("No data found in stream"); } http://git-wip-us.apache.org/repos/asf/tez/blob/21cd0241/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexStartedEvent.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexStartedEvent.java index 4a3e05f..a8bd21e 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexStartedEvent.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexStartedEvent.java @@ -19,9 +19,9 @@ package org.apache.tez.dag.history.events; import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; -import com.google.protobuf.CodedInputStream; -import com.google.protobuf.CodedOutputStream; import org.apache.tez.dag.app.dag.VertexState; import org.apache.tez.dag.history.HistoryEvent; import org.apache.tez.dag.history.HistoryEventType; @@ -74,13 +74,13 @@ public class VertexStartedEvent implements HistoryEvent { } @Override - public void toProtoStream(CodedOutputStream outputStream) throws IOException { - outputStream.writeMessageNoTag(toProto()); + public void toProtoStream(OutputStream outputStream) throws IOException { + toProto().writeDelimitedTo(outputStream); } @Override - public void fromProtoStream(CodedInputStream inputStream) throws IOException { - VertexStartedProto proto = inputStream.readMessage(VertexStartedProto.PARSER, null); + public void fromProtoStream(InputStream inputStream) throws IOException { + VertexStartedProto proto = VertexStartedProto.parseDelimitedFrom(inputStream); if (proto == null) { throw new IOException("No data found in stream"); } http://git-wip-us.apache.org/repos/asf/tez/blob/21cd0241/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java b/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java index d874e0a..8c29172 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java @@ -28,7 +28,6 @@ import java.util.Set; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; -import com.google.protobuf.CodedOutputStream; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; @@ -85,7 +84,8 @@ public class RecoveryService extends AbstractService { private FileSystem recoveryDirFS; // FS where staging dir exists Path recoveryPath; @VisibleForTesting - public Map<TezDAGID, RecoveryStream> outputStreamMap = new HashMap<>(); + public Map<TezDAGID, FSDataOutputStream> outputStreamMap = new + HashMap<TezDAGID, FSDataOutputStream>(); private int bufferSize; @VisibleForTesting public FSDataOutputStream summaryStream; @@ -101,31 +101,6 @@ public class RecoveryService extends AbstractService { private volatile boolean drained = true; private Object waitForDrained = new Object(); - @VisibleForTesting - public static class RecoveryStream { - private final FSDataOutputStream outputStream; - private final CodedOutputStream codedOutputStream; - - RecoveryStream(FSDataOutputStream outputStream) { - this.outputStream = outputStream; - this.codedOutputStream = CodedOutputStream.newInstance(outputStream); - } - - public void write(byte[] bytes) throws IOException { - codedOutputStream.writeRawBytes(bytes); - } - - public void flush() throws IOException { - codedOutputStream.flush(); - outputStream.hflush(); - } - - public void close() throws IOException { - flush(); - outputStream.close(); - } - } - public RecoveryService(AppContext appContext) { super(RecoveryService.class.getName()); this.appContext = appContext; @@ -256,9 +231,10 @@ public class RecoveryService extends AbstractService { } } } - for (Entry<TezDAGID, RecoveryStream> entry : outputStreamMap.entrySet()) { + for (Entry<TezDAGID, FSDataOutputStream> entry : outputStreamMap.entrySet()) { try { LOG.info("Closing Output Stream for DAG " + entry.getKey()); + entry.getValue().hflush(); entry.getValue().close(); } catch (IOException ioe) { if (!recoveryDirFS.exists(recoveryPath)) { @@ -327,7 +303,7 @@ public class RecoveryService extends AbstractService { if (event.getHistoryEvent() instanceof SummaryEvent) { synchronized (lock) { if (stopped.get()) { - LOG.warn("Ignoring event as service stopped, eventType" + LOG.warn("Igoring event as service stopped, eventType" + event.getHistoryEvent().getEventType()); return; } @@ -453,9 +429,9 @@ public class RecoveryService extends AbstractService { return; } - RecoveryStream recoveryStream = outputStreamMap.get(dagID); - if (recoveryStream == null) { + if (!outputStreamMap.containsKey(dagID)) { Path dagFilePath = TezCommonUtils.getDAGRecoveryPath(recoveryPath, dagID.toString()); + FSDataOutputStream outputStream; if (recoveryDirFS.exists(dagFilePath)) { createFatalErrorFlagDir(); return; @@ -464,12 +440,12 @@ public class RecoveryService extends AbstractService { LOG.debug("Opening DAG recovery file in create mode" + ", filePath=" + dagFilePath); } - FSDataOutputStream outputStream = recoveryDirFS.create(dagFilePath, false, bufferSize); - recoveryStream = new RecoveryStream(outputStream); + outputStream = recoveryDirFS.create(dagFilePath, false, bufferSize); } - outputStreamMap.put(dagID, recoveryStream); + outputStreamMap.put(dagID, outputStream); } + FSDataOutputStream outputStream = outputStreamMap.get(dagID); if (LOG.isDebugEnabled()) { LOG.debug("Writing recovery event to output stream" @@ -477,15 +453,15 @@ public class RecoveryService extends AbstractService { + ", eventType=" + eventType); } ++unflushedEventsCount; - recoveryStream.codedOutputStream.writeFixed32NoTag(event.getHistoryEvent().getEventType().ordinal()); - event.getHistoryEvent().toProtoStream(recoveryStream.codedOutputStream); + outputStream.writeInt(event.getHistoryEvent().getEventType().ordinal()); + event.getHistoryEvent().toProtoStream(outputStream); if (!EnumSet.of(HistoryEventType.DAG_SUBMITTED, HistoryEventType.DAG_FINISHED).contains(eventType)) { - maybeFlush(recoveryStream); + maybeFlush(outputStream); } } - private void maybeFlush(RecoveryStream recoveryStream) throws IOException { + private void maybeFlush(FSDataOutputStream outputStream) throws IOException { long currentTime = appContext.getClock().getTime(); boolean doFlush = false; if (maxUnflushedEvents >=0 @@ -506,12 +482,12 @@ public class RecoveryService extends AbstractService { if (!doFlush) { return; } - doFlush(recoveryStream, currentTime); + doFlush(outputStream, currentTime); } - private void doFlush(RecoveryStream recoveryStream, + private void doFlush(FSDataOutputStream outputStream, long currentTime) throws IOException { - recoveryStream.flush(); + outputStream.hflush(); if (LOG.isDebugEnabled()) { LOG.debug("Flushing output stream" http://git-wip-us.apache.org/repos/asf/tez/blob/21cd0241/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java index 1c09d5d..6673b39 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java @@ -20,15 +20,12 @@ package org.apache.tez.dag.app; import java.io.IOException; import java.nio.ByteBuffer; -import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Random; -import com.google.common.collect.Sets; -import com.google.protobuf.CodedInputStream; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -39,8 +36,6 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.util.SystemClock; import org.apache.log4j.Level; import org.apache.log4j.LogManager; -import org.apache.tez.dag.api.TaskLocationHint; -import org.apache.tez.dag.api.VertexLocationHint; import org.apache.tez.dag.api.oldrecords.TaskAttemptState; import org.apache.tez.dag.api.oldrecords.TaskState; import org.apache.tez.dag.api.records.DAGProtos.DAGPlan; @@ -64,7 +59,6 @@ import org.apache.tez.dag.history.events.TaskAttemptStartedEvent; import org.apache.tez.dag.history.events.TaskFinishedEvent; import org.apache.tez.dag.history.events.TaskStartedEvent; import org.apache.tez.dag.history.events.VertexCommitStartedEvent; -import org.apache.tez.dag.history.events.VertexConfigurationDoneEvent; import org.apache.tez.dag.history.events.VertexFinishedEvent; import org.apache.tez.dag.history.events.VertexGroupCommitFinishedEvent; import org.apache.tez.dag.history.events.VertexGroupCommitStartedEvent; @@ -97,8 +91,6 @@ public class TestRecoveryParser { private Path recoveryPath; private DAGAppMaster mockAppMaster; private DAGImpl mockDAGImpl; - // Protobuf message limit is 64 MB by default - private static final int PROTOBUF_DEFAULT_SIZE_LIMIT = 64 << 20; @Before public void setUp() throws IllegalArgumentException, IOException { @@ -113,6 +105,7 @@ public class TestRecoveryParser { mockDAGImpl = mock(DAGImpl.class); when(mockAppMaster.createDAG(any(DAGPlan.class), any(TezDAGID.class))).thenReturn(mockDAGImpl); parser = new RecoveryParser(mockAppMaster, localFS, recoveryPath, 3); + LogManager.getRootLogger().setLevel(Level.DEBUG); } private DAGSummaryData createDAGSummaryData(TezDAGID dagId, boolean completed) { @@ -274,7 +267,7 @@ public class TestRecoveryParser { null, "user", new Configuration(), null, null))); // wait until DAGSubmittedEvent is handled in the RecoveryEventHandling thread rService.await(); - rService.outputStreamMap.get(dagID).write("INVALID_DATA".getBytes("UTF-8")); + rService.outputStreamMap.get(dagID).writeUTF("INVALID_DATA"); rService.stop(); // write data in attempt_2 @@ -285,7 +278,7 @@ public class TestRecoveryParser { rService.handle(new DAGHistoryEvent(dagID, new DAGInitializedEvent(dagID, 1L, "user", dagPlan.getName(), null))); rService.await(); - rService.outputStreamMap.get(dagID).write("INVALID_DATA".getBytes("UTF-8")); + rService.outputStreamMap.get(dagID).writeUTF("INVALID_DATA"); rService.stop(); // corrupted last records will be skipped but the whole recovery logs will be read @@ -625,75 +618,6 @@ public class TestRecoveryParser { + ", but its full recovery events are not seen")); } - @Test(timeout=20000) - public void testRecoveryLargeEventData() throws IOException { - ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1); - TezDAGID dagID = TezDAGID.getInstance(appId, 1); - AppContext appContext = mock(AppContext.class); - when(appContext.getCurrentRecoveryDir()).thenReturn(new Path(recoveryPath+"/1")); - when(appContext.getClock()).thenReturn(new SystemClock()); - when(mockDAGImpl.getID()).thenReturn(dagID); - when(appContext.getHadoopShim()).thenReturn(new DefaultHadoopShim()); - when(appContext.getApplicationID()).thenReturn(appId); - - RecoveryService rService = new RecoveryService(appContext); - Configuration conf = new Configuration(); - conf.setBoolean(RecoveryService.TEZ_TEST_RECOVERY_DRAIN_EVENTS_WHEN_STOPPED, true); - rService.init(conf); - rService.start(); - - DAGPlan dagPlan = TestDAGImpl.createTestDAGPlan(); - // DAG DAGSubmittedEvent -> DAGInitializedEvent -> DAGStartedEvent - rService.handle(new DAGHistoryEvent(dagID, - new DAGSubmittedEvent(dagID, 1L, dagPlan, ApplicationAttemptId.newInstance(appId, 1), - null, "user", new Configuration(), null, null))); - DAGInitializedEvent dagInitedEvent = new DAGInitializedEvent(dagID, 100L, - "user", "dagName", null); - DAGStartedEvent dagStartedEvent = new DAGStartedEvent(dagID, 0L, "user", "dagName"); - rService.handle(new DAGHistoryEvent(dagID, dagInitedEvent)); - rService.handle(new DAGHistoryEvent(dagID, dagStartedEvent)); - - // Create a Recovery event larger than 64 MB to verify default max protobuf size - ArrayList<TaskLocationHint> taskLocationHints = new ArrayList<>(100000); - TaskLocationHint taskLocationHint = TaskLocationHint.createTaskLocationHint( - Sets.newHashSet("aaaaaaaaaaaaaaa.aaaaaaaaaaaaaaa.aaaaaaaaaaaaaaa", - "bbbbbbbbbbbbbbb.bbbbbbbbbbbbbbb.bbbbbbbbbbbbbbb", - "ccccccccccccccc.ccccccccccccccc.ccccccccccccccc", - "ddddddddddddddd.ddddddddddddddd.ddddddddddddddd", - "eeeeeeeeeeeeeee.eeeeeeeeeeeeeee.eeeeeeeeeeeeeee", - "fffffffffffffff.fffffffffffffff.fffffffffffffff", - "ggggggggggggggg.ggggggggggggggg.ggggggggggggggg", - "hhhhhhhhhhhhhhh.hhhhhhhhhhhhhhh.hhhhhhhhhhhhhhh", - "iiiiiiiiiiiiiii.iiiiiiiiiiiiiii.iiiiiiiiiiiiiii", - "jjjjjjjjjjjjjjj.jjjjjjjjjjjjjjj.jjjjjjjjjjjjjjj", - "kkkkkkkkkkkkkkk.kkkkkkkkkkkkkkk.kkkkkkkkkkkkkkk", - "lllllllllllllll.lllllllllllllll.lllllllllllllll", - "mmmmmmmmmmmmmmm.mmmmmmmmmmmmmmm.mmmmmmmmmmmmmmm", - "nnnnnnnnnnnnnnn.nnnnnnnnnnnnnnn.nnnnnnnnnnnnnnn"), - Sets.newHashSet("rack1", "rack2", "rack3")); - for (int i = 0; i < 100000; i++) { - taskLocationHints.add(taskLocationHint); - } - - TezVertexID v0Id = TezVertexID.getInstance(dagID, 0); - VertexLocationHint vertexLocationHint = VertexLocationHint.create(taskLocationHints); - VertexConfigurationDoneEvent vertexConfigurationDoneEvent = new VertexConfigurationDoneEvent( - v0Id, 0, 100000, vertexLocationHint, null, null, false); - // Verify large protobuf message - assertTrue(vertexConfigurationDoneEvent.toProto().getSerializedSize() > PROTOBUF_DEFAULT_SIZE_LIMIT ); - rService.handle(new DAGHistoryEvent(dagID, vertexConfigurationDoneEvent)); - rService.stop(); - - DAGRecoveryData dagData = parser.parseRecoveryData(); - VertexRecoveryData v0data = dagData.getVertexRecoveryData(v0Id); - assertNotNull("Vertex Recovery Data should be non-null", v0data); - VertexConfigurationDoneEvent parsedVertexConfigurationDoneEvent = v0data.getVertexConfigurationDoneEvent(); - assertNotNull("Vertex Configuration Done Event should be non-null", parsedVertexConfigurationDoneEvent); - VertexLocationHint parsedVertexLocationHint = parsedVertexConfigurationDoneEvent.getVertexLocationHint(); - assertNotNull("Vertex Location Hint should be non-null", parsedVertexLocationHint); - assertEquals(parsedVertexLocationHint.getTaskLocationHints().size(), 100000); - } - @Test(timeout=5000) public void testRecoveryData() throws IOException { ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1); http://git-wip-us.apache.org/repos/asf/tez/blob/21cd0241/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java index 50a80cb..47d8389 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java @@ -23,8 +23,6 @@ import static org.junit.Assert.fail; import java.nio.ByteBuffer; -import com.google.protobuf.CodedInputStream; -import com.google.protobuf.CodedOutputStream; import org.apache.tez.runtime.api.TaskFailureType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -91,9 +89,7 @@ public class TestHistoryEventsProtoConversion { private HistoryEvent testProtoConversion(HistoryEvent event) throws IOException, TezException { ByteArrayOutputStream os = new ByteArrayOutputStream(); HistoryEvent deserializedEvent = null; - CodedOutputStream codedOutputStream = CodedOutputStream.newInstance(os); - event.toProtoStream(codedOutputStream); - codedOutputStream.flush(); + event.toProtoStream(os); os.flush(); os.close(); deserializedEvent = ReflectionUtils.createClazzInstance( @@ -102,7 +98,7 @@ public class TestHistoryEventsProtoConversion { + ", eventType=" + event.getEventType() + ", bufLen=" + os.toByteArray().length); deserializedEvent.fromProtoStream( - CodedInputStream.newInstance(os.toByteArray())); + new ByteArrayInputStream(os.toByteArray())); return deserializedEvent; } http://git-wip-us.apache.org/repos/asf/tez/blob/21cd0241/tez-tests/src/test/java/org/apache/tez/test/MiniTezCluster.java ---------------------------------------------------------------------- diff --git a/tez-tests/src/test/java/org/apache/tez/test/MiniTezCluster.java b/tez-tests/src/test/java/org/apache/tez/test/MiniTezCluster.java index bac0e8c..c727a8f 100644 --- a/tez-tests/src/test/java/org/apache/tez/test/MiniTezCluster.java +++ b/tez-tests/src/test/java/org/apache/tez/test/MiniTezCluster.java @@ -28,7 +28,6 @@ import java.util.Iterator; import java.util.List; import java.util.Set; -import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -136,11 +135,6 @@ public class MiniTezCluster extends MiniYARNCluster { conf.setBoolean(YarnConfiguration.NM_VMEM_CHECK_ENABLED, false); conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "000"); - conf.setInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 1); - conf.setInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_TIMEOUT_KEY, 1000); - conf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY,0); - conf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY, 0); - conf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_TIMEOUT_KEY,1000); try { Path stagingPath = FileContext.getFileContext(conf).makeQualified( http://git-wip-us.apache.org/repos/asf/tez/blob/21cd0241/tez-tests/src/test/java/org/apache/tez/test/RecoveryServiceWithEventHandlingHook.java ---------------------------------------------------------------------- diff --git a/tez-tests/src/test/java/org/apache/tez/test/RecoveryServiceWithEventHandlingHook.java b/tez-tests/src/test/java/org/apache/tez/test/RecoveryServiceWithEventHandlingHook.java index 50c5a66..c08780f 100644 --- a/tez-tests/src/test/java/org/apache/tez/test/RecoveryServiceWithEventHandlingHook.java +++ b/tez-tests/src/test/java/org/apache/tez/test/RecoveryServiceWithEventHandlingHook.java @@ -24,8 +24,6 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; -import com.google.protobuf.CodedInputStream; -import com.google.protobuf.CodedOutputStream; import org.apache.commons.codec.binary.Base64; import org.apache.hadoop.conf.Configuration; import org.apache.tez.common.ReflectionUtils; @@ -224,16 +222,15 @@ public class RecoveryServiceWithEventHandlingHook extends RecoveryService { private String encodeHistoryEvent(HistoryEvent event) throws IOException { ByteArrayOutputStream out = new ByteArrayOutputStream(); - CodedOutputStream codedOutputStream = CodedOutputStream.newInstance(out); - event.toProtoStream(codedOutputStream); - codedOutputStream.flush(); + event.toProtoStream(out); return event.getClass().getName() + "," + Base64.encodeBase64String(out.toByteArray()); } private HistoryEvent decodeHistoryEvent(String eventClass, String base64) throws IOException { - CodedInputStream in = CodedInputStream.newInstance(Base64.decodeBase64(base64)); + ByteArrayInputStream in = new ByteArrayInputStream( + Base64.decodeBase64(base64)); try { HistoryEvent event = ReflectionUtils.createClazzInstance(eventClass); event.fromProtoStream(in); http://git-wip-us.apache.org/repos/asf/tez/blob/21cd0241/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java ---------------------------------------------------------------------- diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java b/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java index 6d3ab1c..f00ae5c 100644 --- a/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java +++ b/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java @@ -26,7 +26,6 @@ import java.util.EnumSet; import java.util.List; import java.util.Random; -import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -176,9 +175,6 @@ public class TestAMRecovery { tezConf.setBoolean( RecoveryService.TEZ_TEST_RECOVERY_DRAIN_EVENTS_WHEN_STOPPED, true); - tezConf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY,0); - tezConf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY, 0); - tezConf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_TIMEOUT_KEY,1000); tezSession = TezClient.create("TestDAGRecovery", tezConf); tezSession.start(); } http://git-wip-us.apache.org/repos/asf/tez/blob/21cd0241/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java ---------------------------------------------------------------------- diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java b/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java index cf4744b..b0c9ccc 100644 --- a/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java +++ b/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java @@ -18,9 +18,6 @@ package org.apache.tez.test; -import org.apache.hadoop.fs.CommonConfigurationKeysPublic; -import org.apache.log4j.Level; -import org.apache.log4j.LogManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -136,9 +133,6 @@ public class TestDAGRecovery { tezConf.set(TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS, " -Xmx256m"); tezConf.setBoolean(TezConfiguration.TEZ_AM_SESSION_MODE, true); tezConf.set(TezConfiguration.TEZ_AM_STAGING_SCRATCH_DATA_AUTO_DELETE, "false"); - tezConf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY,0); - tezConf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY, 0); - tezConf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_TIMEOUT_KEY,1000); tezSession = TezClient.create("TestDAGRecovery", tezConf); tezSession.start(); @@ -160,7 +154,7 @@ public class TestDAGRecovery { void runDAGAndVerify(DAG dag, DAGStatus.State finalState) throws Exception { tezSession.waitTillReady(); DAGClient dagClient = tezSession.submitDAG(dag); - DAGStatus dagStatus = dagClient.getDAGStatus(null, 10); + DAGStatus dagStatus = dagClient.getDAGStatus(null); while (!dagStatus.isCompleted()) { LOG.info("Waiting for dag to complete. Sleeping for 500ms." + " DAG name: " + dag.getName() http://git-wip-us.apache.org/repos/asf/tez/blob/21cd0241/tez-tests/src/test/java/org/apache/tez/test/TestRecovery.java ---------------------------------------------------------------------- diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestRecovery.java b/tez-tests/src/test/java/org/apache/tez/test/TestRecovery.java index c7b1fb9..93fd972 100644 --- a/tez-tests/src/test/java/org/apache/tez/test/TestRecovery.java +++ b/tez-tests/src/test/java/org/apache/tez/test/TestRecovery.java @@ -32,7 +32,6 @@ import java.util.Random; import java.util.Set; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; @@ -479,9 +478,6 @@ public class TestRecovery { RecoveryService.TEZ_TEST_RECOVERY_DRAIN_EVENTS_WHEN_STOPPED, false); tezConf.setBoolean( TezConfiguration.TEZ_AM_STAGING_SCRATCH_DATA_AUTO_DELETE, false); - tezConf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY,0); - tezConf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY, 0); - tezConf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_TIMEOUT_KEY,1000); tezConf.set(TezConfiguration.TEZ_AM_LOG_LEVEL, "INFO;org.apache.tez=DEBUG"); hashJoinExample.setConf(tezConf);
