Repository: tez
Updated Branches:
  refs/heads/master db6d9b29e -> 373793b01
TEZ-1472. Separate method calls for creating InputDataInformationEvent with serialized/unserialized payloads (Siddharth Seth via bikas) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/373793b0 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/373793b0 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/373793b0 Branch: refs/heads/master Commit: 373793b013877eb5f22965c9d6cf20aac1076a8b Parents: db6d9b2 Author: Bikas Saha <[email protected]> Authored: Wed Aug 20 19:20:41 2014 -0700 Committer: Bikas Saha <[email protected]> Committed: Wed Aug 20 19:20:41 2014 -0700 ---------------------------------------------------------------------- CHANGES.txt | 4 +++- .../api/events/InputDataInformationEvent.java | 10 ++++++---- .../dag/app/dag/impl/TestRootInputVertexManager.java | 6 ++++-- .../apache/tez/dag/app/dag/impl/TestVertexImpl.java | 9 ++++++--- .../mapreduce/common/MRInputAMSplitGenerator.java | 9 ++++++--- .../mapreduce/common/MRInputSplitDistributor.java | 10 ++++------ .../apache/tez/mapreduce/input/TestMultiMRInput.java | 15 ++++++++++----- .../java/org/apache/tez/common/ProtoConverters.java | 2 +- 8 files changed, 40 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/373793b0/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 0deb1ca..f2e8ec8 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -79,7 +79,9 @@ INCOMPATIBLE CHANGES TEZ-1455. Replace deprecated junit.framework.Assert with org.junit.Assert TEZ-1465. Update and document IntersectExample. Change name to JoinExample TEZ-1449. Change user payloads to work with a byte buffer - TEZ-1472. AM/Session LRs are not shipped to vertices in new API use-case + TEZ-1469. AM/Session LRs are not shipped to vertices in new API use-case + TEZ-1472. 
Separate method calls for creating InputDataInformationEvent with + serialized/unserialized payloads Release 0.4.0-incubating: 2014-04-05 http://git-wip-us.apache.org/repos/asf/tez/blob/373793b0/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputDataInformationEvent.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputDataInformationEvent.java b/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputDataInformationEvent.java index ffeb7ff..a62a341 100644 --- a/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputDataInformationEvent.java +++ b/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputDataInformationEvent.java @@ -58,7 +58,7 @@ public final class InputDataInformationEvent extends Event { this.userPayloadObject = null; } - private InputDataInformationEvent(int srcIndex, Object userPayloadDeserialized) { + private InputDataInformationEvent(int srcIndex, Object userPayloadDeserialized, Object sigChanged) { this.sourceIndex = srcIndex; this.userPayloadObject = userPayloadDeserialized; this.userPayload = null; @@ -69,12 +69,14 @@ public final class InputDataInformationEvent extends Event { * @param srcIndex the src index * @param userPayload the serialized payload */ - public static InputDataInformationEvent create(int srcIndex, ByteBuffer userPayload) { + public static InputDataInformationEvent createWithSerializedPayload(int srcIndex, + ByteBuffer userPayload) { return new InputDataInformationEvent(srcIndex, userPayload); } - public static InputDataInformationEvent create(int srcIndex, Object userPayloadDeserialized) { - return new InputDataInformationEvent(srcIndex, userPayloadDeserialized); + public static InputDataInformationEvent createWithObjectPayload(int srcIndex, + Object userPayloadDeserialized) { + return new InputDataInformationEvent(srcIndex, userPayloadDeserialized, null); } public int getSourceIndex() { 
http://git-wip-us.apache.org/repos/asf/tez/blob/373793b0/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestRootInputVertexManager.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestRootInputVertexManager.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestRootInputVertexManager.java index b9aafa2..fc02bbf 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestRootInputVertexManager.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestRootInputVertexManager.java @@ -48,14 +48,16 @@ public class TestRootInputVertexManager { InputDescriptor id1 = mock(InputDescriptor.class); List<Event> events1 = new LinkedList<Event>(); - InputDataInformationEvent diEvent11 = InputDataInformationEvent.create(0, null); + InputDataInformationEvent diEvent11 = InputDataInformationEvent.createWithSerializedPayload(0, + null); events1.add(diEvent11); rootInputVertexManager.onRootVertexInitialized("input1", id1, events1); // All good so far, single input only. 
InputDescriptor id2 = mock(InputDescriptor.class); List<Event> events2 = new LinkedList<Event>(); - InputDataInformationEvent diEvent21 = InputDataInformationEvent.create(0, null); + InputDataInformationEvent diEvent21 = InputDataInformationEvent.createWithSerializedPayload(0, + null); events2.add(diEvent21); try { // Should fail due to second input http://git-wip-us.apache.org/repos/asf/tez/blob/373793b0/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java index 30b4275..75d0fff 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java @@ -2914,7 +2914,8 @@ public class TestVertexImpl { VertexManagerEvent.create("vertex2", ByteBuffer.wrap(new byte[0])), new EventMetaData( EventProducerConsumerType.PROCESSOR, "vertex1", "vertex2", ta0_t0_v1))); - events.add(new TezEvent(InputDataInformationEvent.create(0, ByteBuffer.wrap(new byte[0])), + events.add(new TezEvent(InputDataInformationEvent.createWithSerializedPayload(0, + ByteBuffer.wrap(new byte[0])), new EventMetaData(EventProducerConsumerType.INPUT, "vertex2", "NULL_VERTEX", null))); dispatcher.getEventHandler().handle( @@ -3259,7 +3260,8 @@ public class TestVertexImpl { targetTasks, VertexLocationHint.create(locationHints), null); events.add(configEvent); for (int i = 0; i < targetTasks; i++) { - InputDataInformationEvent diEvent = InputDataInformationEvent.create(i, null); + InputDataInformationEvent diEvent = InputDataInformationEvent.createWithSerializedPayload(i, + null); events.add(diEvent); } eventHandler.handle(new VertexEventRootInputInitialized(vertexID, inputs @@ -3462,7 +3464,8 @@ public class TestVertexImpl { lock.unlock(); } 
initComplete.set(true); - InputDataInformationEvent diEvent = InputDataInformationEvent.create(0, ByteBuffer.wrap(new byte[]{0})); + InputDataInformationEvent diEvent = InputDataInformationEvent.createWithSerializedPayload(0, + ByteBuffer.wrap(new byte[]{0})); List<Event> eventList = new LinkedList<Event>(); eventList.add(diEvent); return eventList; http://git-wip-us.apache.org/repos/asf/tez/blob/373793b0/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java index 2408ddb..884054b 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java @@ -142,7 +142,8 @@ public class MRInputAMSplitGenerator extends InputInitializer { int count = 0; for (MRSplitProto mrSplit : splitsProto.getSplitsList()) { // Unnecessary array copy, can be avoided by using ByteBuffer instead of a raw array. 
- InputDataInformationEvent diEvent = InputDataInformationEvent.create(count++, + InputDataInformationEvent diEvent = InputDataInformationEvent.createWithSerializedPayload( + count++, mrSplit.toByteString().asReadOnlyByteBuffer()); events.add(diEvent); } @@ -150,12 +151,14 @@ public class MRInputAMSplitGenerator extends InputInitializer { int count = 0; if (inputSplitInfo.holdsNewFormatSplits()) { for (org.apache.hadoop.mapreduce.InputSplit split : inputSplitInfo.getNewFormatSplits()) { - InputDataInformationEvent diEvent = InputDataInformationEvent.create(count++, split); + InputDataInformationEvent diEvent = InputDataInformationEvent.createWithObjectPayload( + count++, split); events.add(diEvent); } } else { for (org.apache.hadoop.mapred.InputSplit split : inputSplitInfo.getOldFormatSplits()) { - InputDataInformationEvent diEvent = InputDataInformationEvent.create(count++, split); + InputDataInformationEvent diEvent = InputDataInformationEvent.createWithObjectPayload( + count++, split); events.add(diEvent); } } http://git-wip-us.apache.org/repos/asf/tez/blob/373793b0/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java index e28a3a5..1307687 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java @@ -21,7 +21,6 @@ package org.apache.tez.mapreduce.common; import java.io.IOException; import java.util.List; -import com.google.protobuf.ByteString; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Public; @@ -39,14 +38,12 @@ import 
org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitsProto; import org.apache.tez.runtime.api.Event; import org.apache.tez.runtime.api.InputInitializer; import org.apache.tez.runtime.api.InputInitializerContext; -import org.apache.tez.runtime.api.events.DataMovementEvent; import org.apache.tez.runtime.api.events.InputDataInformationEvent; import org.apache.tez.runtime.api.events.InputInitializerEvent; import org.apache.tez.runtime.api.events.InputUpdatePayloadEvent; import com.google.common.base.Stopwatch; import com.google.common.collect.Lists; -import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads; /** * Implements an {@link InputInitializer} that distributes Map Reduce @@ -111,16 +108,17 @@ public class MRInputSplitDistributor extends InputInitializer { if (sendSerializedEvents) { // Unnecessary array copy, can be avoided by using ByteBuffer instead of // a raw array. - diEvent = InputDataInformationEvent.create(count++, mrSplit.toByteString().asReadOnlyByteBuffer()); + diEvent = InputDataInformationEvent.createWithSerializedPayload(count++, + mrSplit.toByteString().asReadOnlyByteBuffer()); } else { if (useNewApi) { org.apache.hadoop.mapreduce.InputSplit newInputSplit = MRInputUtils .getNewSplitDetailsFromEvent(mrSplit, conf); - diEvent = InputDataInformationEvent.create(count++, newInputSplit); + diEvent = InputDataInformationEvent.createWithObjectPayload(count++, newInputSplit); } else { org.apache.hadoop.mapred.InputSplit oldInputSplit = MRInputUtils .getOldSplitDetailsFromEvent(mrSplit, conf); - diEvent = InputDataInformationEvent.create(count++, oldInputSplit); + diEvent = InputDataInformationEvent.createWithObjectPayload(count++, oldInputSplit); } } events.add(diEvent); http://git-wip-us.apache.org/repos/asf/tez/blob/373793b0/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java ---------------------------------------------------------------------- diff --git 
a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java index 121a975..05f6bbc 100644 --- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java +++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java @@ -112,7 +112,8 @@ public class TestMultiMRInput { MRSplitProto splitProto = MRInputHelpers.createSplitProto(splits[0]); InputDataInformationEvent event = - InputDataInformationEvent.create(0, splitProto.toByteString().asReadOnlyByteBuffer()); + InputDataInformationEvent.createWithSerializedPayload(0, + splitProto.toByteString().asReadOnlyByteBuffer()); eventList.clear(); eventList.add(event); @@ -172,11 +173,13 @@ public class TestMultiMRInput { MRSplitProto splitProto1 = MRInputHelpers.createSplitProto(splits[0]); InputDataInformationEvent event1 = - InputDataInformationEvent.create(0, splitProto1.toByteString().asReadOnlyByteBuffer()); + InputDataInformationEvent.createWithSerializedPayload(0, + splitProto1.toByteString().asReadOnlyByteBuffer()); MRSplitProto splitProto2 = MRInputHelpers.createSplitProto(splits[1]); InputDataInformationEvent event2 = - InputDataInformationEvent.create(0, splitProto2.toByteString().asReadOnlyByteBuffer()); + InputDataInformationEvent.createWithSerializedPayload(0, + splitProto2.toByteString().asReadOnlyByteBuffer()); eventList.clear(); eventList.add(event1); @@ -225,9 +228,11 @@ public class TestMultiMRInput { MRSplitProto splitProto = MRInputHelpers.createSplitProto(splits[0]); InputDataInformationEvent event1 = - InputDataInformationEvent.create(0, splitProto.toByteString().asReadOnlyByteBuffer()); + InputDataInformationEvent.createWithSerializedPayload(0, + splitProto.toByteString().asReadOnlyByteBuffer()); InputDataInformationEvent event2 = - InputDataInformationEvent.create(1, splitProto.toByteString().asReadOnlyByteBuffer()); + 
InputDataInformationEvent.createWithSerializedPayload(1, + splitProto.toByteString().asReadOnlyByteBuffer()); eventList.clear(); eventList.add(event1); http://git-wip-us.apache.org/repos/asf/tez/blob/373793b0/tez-runtime-internals/src/main/java/org/apache/tez/common/ProtoConverters.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/common/ProtoConverters.java b/tez-runtime-internals/src/main/java/org/apache/tez/common/ProtoConverters.java index 0650a90..f765f42 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/common/ProtoConverters.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/common/ProtoConverters.java @@ -101,7 +101,7 @@ public class ProtoConverters { public static InputDataInformationEvent convertRootInputDataInformationEventFromProto( EventProtos.RootInputDataInformationEventProto proto) { - InputDataInformationEvent diEvent = InputDataInformationEvent.create( + InputDataInformationEvent diEvent = InputDataInformationEvent.createWithSerializedPayload( proto.getSourceIndex(), proto.hasUserPayload() ? proto.getUserPayload().asReadOnlyByteBuffer() : null); diEvent.setTargetIndex(proto.getTargetIndex());
