Repository: tez Updated Branches: refs/heads/master 54a429e0b -> 57bd9f7e3
TEZ-1449. Change user payloads to work with a byte buffer (Siddharth Seth via bikas) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/57bd9f7e Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/57bd9f7e Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/57bd9f7e Branch: refs/heads/master Commit: 57bd9f7e3e1a8c34687f0c48f16bc7d5552c7874 Parents: 54a429e Author: Bikas Saha <[email protected]> Authored: Tue Aug 19 17:38:50 2014 -0700 Committer: Bikas Saha <[email protected]> Committed: Tue Aug 19 17:38:50 2014 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../api/events/CompositeDataMovementEvent.java | 11 ++++++----- .../runtime/api/events/DataMovementEvent.java | 20 +++++++++++--------- .../api/events/InputDataInformationEvent.java | 13 ++++++++----- .../api/events/InputInitializerEvent.java | 12 +++++++----- .../api/events/InputUpdatePayloadEvent.java | 12 +++++++----- .../runtime/api/events/VertexManagerEvent.java | 12 +++++++----- .../event/TestCompositeDataMovementEvent.java | 4 +++- .../app/dag/impl/RootInputVertexManager.java | 2 +- .../apache/tez/dag/app/dag/impl/TestEdge.java | 9 ++++++--- .../tez/dag/app/dag/impl/TestVertexImpl.java | 12 ++++++------ .../common/MRInputAMSplitGenerator.java | 2 +- .../common/MRInputSplitDistributor.java | 7 +++++-- .../org/apache/tez/mapreduce/input/MRInput.java | 3 ++- .../tez/mapreduce/input/MultiMRInput.java | 3 ++- .../common/TestMRInputSplitDistributor.java | 4 ++-- .../tez/mapreduce/input/TestMultiMRInput.java | 11 ++++++----- .../org/apache/tez/common/ProtoConverters.java | 10 +++++----- .../vertexmanager/ShuffleVertexManager.java | 2 +- .../shuffle/impl/ShuffleInputEventHandler.java | 3 ++- .../writers/UnorderedPartitionedKVWriter.java | 2 +- .../output/OrderedPartitionedKVOutput.java | 4 ++-- .../library/output/UnorderedKVOutput.java | 2 +- .../impl/ShuffleInputEventHandlerImpl.java | 4 +++- .../vertexmanager/TestShuffleVertexManager.java | 11 ++++++----- .../impl/TestShuffleInputEventHandler.java | 3 ++- .../TestUnorderedPartitionedKVWriter.java | 11 +++++++---- .../library/output/TestOnFileSortedOutput.java | 8 +++++--- .../output/TestOnFileUnorderedKVOutput.java | 3 ++- .../impl/TestShuffleInputEventHandlerImpl.java | 3 ++- .../java/org/apache/tez/test/TestInput.java | 2 +- .../java/org/apache/tez/test/TestOutput.java | 3 ++- 32 files changed, 123 insertions(+), 86 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/57bd9f7e/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index e46867e..c611668 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -78,6 +78,7 @@ INCOMPATIBLE CHANGES TEZ-1246. Replace constructors with create() methods for DAG, Vertex, Edge etc in the API TEZ-1455. Replace deprecated junit.framework.Assert with org.junit.Assert TEZ-1465. Update and document IntersectExample. Change name to JoinExample + TEZ-1449. Change user payloads to work with a byte buffer Release 0.4.0-incubating: 2014-04-05 http://git-wip-us.apache.org/repos/asf/tez/blob/57bd9f7e/tez-api/src/main/java/org/apache/tez/runtime/api/events/CompositeDataMovementEvent.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/events/CompositeDataMovementEvent.java b/tez-api/src/main/java/org/apache/tez/runtime/api/events/CompositeDataMovementEvent.java index 87403de..b38fda3 100644 --- a/tez-api/src/main/java/org/apache/tez/runtime/api/events/CompositeDataMovementEvent.java +++ b/tez-api/src/main/java/org/apache/tez/runtime/api/events/CompositeDataMovementEvent.java @@ -18,6 +18,7 @@ package org.apache.tez.runtime.api.events; +import java.nio.ByteBuffer; import java.util.Iterator; import org.apache.hadoop.classification.InterfaceAudience.Public; @@ -42,9 +43,9 @@ public class CompositeDataMovementEvent extends Event { protected final int count; protected int version; - protected final byte[] userPayload; + protected final ByteBuffer userPayload; - private CompositeDataMovementEvent(int srcIndexStart, int count, byte[] userPayload) { + private CompositeDataMovementEvent(int srcIndexStart, int count, ByteBuffer userPayload) { this.sourceIndexStart = srcIndexStart; this.count = count; this.userPayload = userPayload; @@ -61,7 +62,7 @@ public class CompositeDataMovementEvent extends Event { * the common payload between all the events. */ public static CompositeDataMovementEvent create(int srcIndexStart, int count, - byte[] userPayload) { + ByteBuffer userPayload) { return new CompositeDataMovementEvent(srcIndexStart, count, userPayload); } @@ -73,8 +74,8 @@ public class CompositeDataMovementEvent extends Event { return count; } - public byte[] getUserPayload() { - return userPayload; + public ByteBuffer getUserPayload() { + return userPayload == null ? null : userPayload.asReadOnlyBuffer(); } public void setVersion(int version) { http://git-wip-us.apache.org/repos/asf/tez/blob/57bd9f7e/tez-api/src/main/java/org/apache/tez/runtime/api/events/DataMovementEvent.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/events/DataMovementEvent.java b/tez-api/src/main/java/org/apache/tez/runtime/api/events/DataMovementEvent.java index 88b36cb..b9c1cc4 100644 --- a/tez-api/src/main/java/org/apache/tez/runtime/api/events/DataMovementEvent.java +++ b/tez-api/src/main/java/org/apache/tez/runtime/api/events/DataMovementEvent.java @@ -18,6 +18,8 @@ package org.apache.tez.runtime.api.events; +import java.nio.ByteBuffer; + import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.tez.runtime.api.Event; @@ -46,7 +48,7 @@ public final class DataMovementEvent extends Event { /** * User Payload for this Event */ - private final byte[] userPayload; + private final ByteBuffer userPayload; /** * Version number to indicate what attempt generated this Event @@ -55,7 +57,7 @@ public final class DataMovementEvent extends Event { private DataMovementEvent(int sourceIndex, - byte[] userPayload) { + ByteBuffer userPayload) { this.userPayload = userPayload; this.sourceIndex = sourceIndex; } @@ -64,14 +66,14 @@ public final class DataMovementEvent extends Event { private DataMovementEvent(int sourceIndex, int targetIndex, int version, - byte[] userPayload) { + ByteBuffer userPayload) { this.userPayload = userPayload; this.sourceIndex = sourceIndex; this.version = version; this.targetIndex = targetIndex; } - private DataMovementEvent(byte[] userPayload) { + private DataMovementEvent(ByteBuffer userPayload) { this(-1, userPayload); } @@ -82,7 +84,7 @@ public final class DataMovementEvent extends Event { * @param userPayload User Payload of the User Event */ public static DataMovementEvent create(int sourceIndex, - byte[] userPayload) { + ByteBuffer userPayload) { return new DataMovementEvent(sourceIndex, userPayload); } @@ -90,7 +92,7 @@ public final class DataMovementEvent extends Event { * Constructor for Processor-generated User Events * @param userPayload */ - public static DataMovementEvent create(byte[] userPayload) { + public static DataMovementEvent create(ByteBuffer userPayload) { return new DataMovementEvent(userPayload); } @@ -98,12 +100,12 @@ public final class DataMovementEvent extends Event { public static DataMovementEvent create(int sourceIndex, int targetIndex, int version, - byte[] userPayload) { + ByteBuffer userPayload) { return new DataMovementEvent(sourceIndex, targetIndex, version, userPayload); } - public byte[] getUserPayload() { - return userPayload; + public ByteBuffer getUserPayload() { + return userPayload == null ? null : userPayload.asReadOnlyBuffer(); } public int getSourceIndex() { http://git-wip-us.apache.org/repos/asf/tez/blob/57bd9f7e/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputDataInformationEvent.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputDataInformationEvent.java b/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputDataInformationEvent.java index ec1d85e..ffeb7ff 100644 --- a/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputDataInformationEvent.java +++ b/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputDataInformationEvent.java @@ -18,6 +18,8 @@ package org.apache.tez.runtime.api.events; +import java.nio.ByteBuffer; + import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.tez.dag.api.VertexManagerPlugin; @@ -43,13 +45,14 @@ import org.apache.tez.runtime.api.InputInitializer; @Public public final class InputDataInformationEvent extends Event { + private final int sourceIndex; private int targetIndex; // TODO Likely to be multiple at a later point. - private final byte[] userPayload; + private final ByteBuffer userPayload; private final Object userPayloadObject; - private InputDataInformationEvent(int srcIndex, byte[] userPayload) { + private InputDataInformationEvent(int srcIndex, ByteBuffer userPayload) { this.sourceIndex = srcIndex; this.userPayload = userPayload; this.userPayloadObject = null; @@ -66,7 +69,7 @@ public final class InputDataInformationEvent extends Event { * @param srcIndex the src index * @param userPayload the serialized payload */ - public static InputDataInformationEvent create(int srcIndex, byte[] userPayload) { + public static InputDataInformationEvent create(int srcIndex, ByteBuffer userPayload) { return new InputDataInformationEvent(srcIndex, userPayload); } @@ -86,8 +89,8 @@ public final class InputDataInformationEvent extends Event { this.targetIndex = target; } - public byte[] getUserPayload() { - return userPayload; + public ByteBuffer getUserPayload() { + return userPayload == null ? null : userPayload.asReadOnlyBuffer(); } public Object getDeserializedUserPayload() { http://git-wip-us.apache.org/repos/asf/tez/blob/57bd9f7e/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputInitializerEvent.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputInitializerEvent.java b/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputInitializerEvent.java index e4e9ef9..3c5e78e 100644 --- a/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputInitializerEvent.java +++ b/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputInitializerEvent.java @@ -20,6 +20,8 @@ package org.apache.tez.runtime.api.events; +import java.nio.ByteBuffer; + import com.google.common.base.Preconditions; import org.apache.hadoop.classification.InterfaceAudience.Public; @@ -38,10 +40,10 @@ public class InputInitializerEvent extends Event { private String targetVertexName; private String targetInputName; - private byte[] eventPayload; + private ByteBuffer eventPayload; private InputInitializerEvent(String targetVertexName, String targetInputName, - byte[] eventPayload) { + ByteBuffer eventPayload) { Preconditions.checkNotNull(targetVertexName, "TargetVertexName cannot be null"); Preconditions.checkNotNull(targetInputName, "TargetInputName cannot be null"); this.targetVertexName = targetVertexName; @@ -56,7 +58,7 @@ public class InputInitializerEvent extends Event { * payload to a few KB at max */ public static InputInitializerEvent create(String targetVertexName, String targetInputName, - byte[] eventPayload) { + ByteBuffer eventPayload) { return new InputInitializerEvent(targetVertexName, targetInputName, eventPayload); } @@ -83,7 +85,7 @@ public class InputInitializerEvent extends Event { * * @return a byte representation of the payload */ - public byte[] getUserPayload() { - return this.eventPayload; + public ByteBuffer getUserPayload() { + return eventPayload == null ? null : eventPayload.asReadOnlyBuffer(); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tez/blob/57bd9f7e/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputUpdatePayloadEvent.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputUpdatePayloadEvent.java b/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputUpdatePayloadEvent.java index 2d1a8c9..2cfec69 100644 --- a/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputUpdatePayloadEvent.java +++ b/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputUpdatePayloadEvent.java @@ -18,6 +18,8 @@ package org.apache.tez.runtime.api.events; +import java.nio.ByteBuffer; + import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.tez.runtime.api.Event; @@ -36,18 +38,18 @@ import com.google.common.base.Preconditions; @Public public class InputUpdatePayloadEvent extends Event { - private final byte[] userPayload; + private final ByteBuffer userPayload; - private InputUpdatePayloadEvent(byte[] userPayload) { + private InputUpdatePayloadEvent(ByteBuffer userPayload) { Preconditions.checkNotNull(userPayload); this.userPayload = userPayload; } - public static InputUpdatePayloadEvent create(byte[] userPayload) { + public static InputUpdatePayloadEvent create(ByteBuffer userPayload) { return new InputUpdatePayloadEvent(userPayload); } - public byte[] getUserPayload() { - return userPayload; + public ByteBuffer getUserPayload() { + return userPayload == null ? null : userPayload.asReadOnlyBuffer(); } } http://git-wip-us.apache.org/repos/asf/tez/blob/57bd9f7e/tez-api/src/main/java/org/apache/tez/runtime/api/events/VertexManagerEvent.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/events/VertexManagerEvent.java b/tez-api/src/main/java/org/apache/tez/runtime/api/events/VertexManagerEvent.java index d93294c..484087e 100644 --- a/tez-api/src/main/java/org/apache/tez/runtime/api/events/VertexManagerEvent.java +++ b/tez-api/src/main/java/org/apache/tez/runtime/api/events/VertexManagerEvent.java @@ -18,6 +18,8 @@ package org.apache.tez.runtime.api.events; +import java.nio.ByteBuffer; + import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.tez.runtime.api.Event; @@ -41,9 +43,9 @@ public class VertexManagerEvent extends Event { /** * User payload to be sent */ - private final byte[] userPayload; + private final ByteBuffer userPayload; - private VertexManagerEvent(String vertexName, byte[] userPayload) { + private VertexManagerEvent(String vertexName, ByteBuffer userPayload) { Preconditions.checkArgument(vertexName != null); Preconditions.checkArgument(userPayload != null); this.targetVertexName = vertexName; @@ -55,7 +57,7 @@ public class VertexManagerEvent extends Event { * @param vertexName * @param userPayload This should not be modified since a reference is kept */ - public static VertexManagerEvent create(String vertexName, byte[] userPayload) { + public static VertexManagerEvent create(String vertexName, ByteBuffer userPayload) { return new VertexManagerEvent(vertexName, userPayload); } @@ -63,7 +65,7 @@ public class VertexManagerEvent extends Event { return targetVertexName; } - public byte[] getUserPayload() { - return userPayload; + public ByteBuffer getUserPayload() { + return userPayload == null ? null : userPayload.asReadOnlyBuffer(); } } http://git-wip-us.apache.org/repos/asf/tez/blob/57bd9f7e/tez-api/src/test/java/org/apache/tez/runtime/api/event/TestCompositeDataMovementEvent.java ---------------------------------------------------------------------- diff --git a/tez-api/src/test/java/org/apache/tez/runtime/api/event/TestCompositeDataMovementEvent.java b/tez-api/src/test/java/org/apache/tez/runtime/api/event/TestCompositeDataMovementEvent.java index 20e85ea..f162d1d 100644 --- a/tez-api/src/test/java/org/apache/tez/runtime/api/event/TestCompositeDataMovementEvent.java +++ b/tez-api/src/test/java/org/apache/tez/runtime/api/event/TestCompositeDataMovementEvent.java @@ -17,13 +17,15 @@ package org.apache.tez.runtime.api.event; * See the License for the specific language governing permissions and * limitations under the License. */ +import java.nio.ByteBuffer; + import org.apache.tez.runtime.api.events.CompositeDataMovementEvent; import org.apache.tez.runtime.api.events.DataMovementEvent; import org.junit.Assert; import org.junit.Test; public class TestCompositeDataMovementEvent { - byte[] userPayload = "Dummy userPayLoad".getBytes(); + ByteBuffer userPayload = ByteBuffer.wrap("Dummy userPayLoad".getBytes()); @Test public void testGetCount(){ http://git-wip-us.apache.org/repos/asf/tez/blob/57bd9f7e/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/RootInputVertexManager.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/RootInputVertexManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/RootInputVertexManager.java index 34e3af3..e6ffdc5 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/RootInputVertexManager.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/RootInputVertexManager.java @@ -98,7 +98,7 @@ public class RootInputVertexManager extends VertexManagerPlugin { // No tasks should have been started yet. Checked by initial state check. Preconditions.checkState(dataInformationEventSeen == false); inputDescriptor.setUserPayload(UserPayload.create( - ByteBuffer.wrap(((InputUpdatePayloadEvent) event).getUserPayload()))); + ((InputUpdatePayloadEvent) event).getUserPayload())); } else if (event instanceof InputDataInformationEvent) { dataInformationEventSeen = true; // # Tasks should have been set by this point. http://git-wip-us.apache.org/repos/asf/tez/blob/57bd9f7e/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestEdge.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestEdge.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestEdge.java index 19e7067..f572237 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestEdge.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestEdge.java @@ -27,6 +27,7 @@ import static org.mockito.Mockito.reset; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Iterator; import java.util.LinkedHashMap; @@ -88,7 +89,7 @@ public class TestEdge { // Verification via a CompositeEvent CompositeDataMovementEvent cdmEvent = CompositeDataMovementEvent.create(0, destTasks.size(), - "bytes".getBytes()); + ByteBuffer.wrap("bytes".getBytes())); cdmEvent.setVersion(2); // AttemptNum TezEvent tezEvent = new TezEvent(cdmEvent, srcMeta); // Event setup to look like it would after the Vertex is done with it. @@ -104,7 +105,7 @@ public class TestEdge { // Same Verification via regular DataMovementEvents reset(eventHandler); for (int i = 0 ; i < destTasks.size() ; i++) { - DataMovementEvent dmEvent = DataMovementEvent.create(i, "bytes".getBytes()); + DataMovementEvent dmEvent = DataMovementEvent.create(i, ByteBuffer.wrap("bytes".getBytes())); dmEvent.setVersion(2); tezEvent = new TezEvent(dmEvent, srcMeta); edge.sendTezEventToDestinationTasks(tezEvent); @@ -133,7 +134,9 @@ public class TestEdge { assertEquals(srcTAID.getId(), dmEvent.getVersion()); assertEquals(count, dmEvent.getSourceIndex()); assertEquals(srcTAID.getTaskID().getId(), dmEvent.getTargetIndex()); - assertTrue(Arrays.equals("bytes".getBytes(), dmEvent.getUserPayload())); + byte[] res = new byte[dmEvent.getUserPayload().limit() - dmEvent.getUserPayload().position()]; + dmEvent.getUserPayload().slice().get(res); + assertTrue(Arrays.equals("bytes".getBytes(), res)); count++; } http://git-wip-us.apache.org/repos/asf/tez/blob/57bd9f7e/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java index c84c1a6..30b4275 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java @@ -1894,10 +1894,10 @@ public class TestVertexImpl { List<TezEvent> taskEvents = Lists.newLinkedList(); TezEvent tezEvent1 = new TezEvent( - CompositeDataMovementEvent.create(0, 1, new byte[0]), + CompositeDataMovementEvent.create(0, 1, ByteBuffer.wrap(new byte[0])), new EventMetaData(EventProducerConsumerType.OUTPUT, "vertex2", "vertex3", ta0_t0_v2)); TezEvent tezEvent2 = new TezEvent( - DataMovementEvent.create(0, new byte[0]), + DataMovementEvent.create(0, ByteBuffer.wrap(new byte[0])), new EventMetaData(EventProducerConsumerType.OUTPUT, "vertex2", "vertex3", ta0_t0_v2)); taskEvents.add(tezEvent1); taskEvents.add(tezEvent2); @@ -2911,10 +2911,10 @@ public class TestVertexImpl { TezTaskID t0_v1 = TezTaskID.getInstance(v1.getVertexId(), 0); TezTaskAttemptID ta0_t0_v1 = TezTaskAttemptID.getInstance(t0_v1, 0); events.add(new TezEvent( - VertexManagerEvent.create("vertex2", new byte[0]), new EventMetaData( + VertexManagerEvent.create("vertex2", ByteBuffer.wrap(new byte[0])), new EventMetaData( EventProducerConsumerType.PROCESSOR, "vertex1", "vertex2", ta0_t0_v1))); - events.add(new TezEvent(InputDataInformationEvent.create(0, new byte[0]), + events.add(new TezEvent(InputDataInformationEvent.create(0, ByteBuffer.wrap(new byte[0])), new EventMetaData(EventProducerConsumerType.INPUT, "vertex2", "NULL_VERTEX", null))); dispatcher.getEventHandler().handle( @@ -3244,7 +3244,7 @@ public class TestVertexImpl { public void completeInputDistribution(byte[] payload) { List<Event> events = Lists.newArrayListWithCapacity(1); - InputUpdatePayloadEvent event = InputUpdatePayloadEvent.create(payload); + InputUpdatePayloadEvent event = InputUpdatePayloadEvent.create(ByteBuffer.wrap(payload)); events.add(event); eventHandler.handle(new VertexEventRootInputInitialized(vertexID, inputs .get(0).getName(), events)); @@ -3462,7 +3462,7 @@ public class TestVertexImpl { lock.unlock(); } initComplete.set(true); - InputDataInformationEvent diEvent = InputDataInformationEvent.create(0, new byte[]{0}); + InputDataInformationEvent diEvent = InputDataInformationEvent.create(0, ByteBuffer.wrap(new byte[]{0})); List<Event> eventList = new LinkedList<Event>(); eventList.add(diEvent); return eventList; http://git-wip-us.apache.org/repos/asf/tez/blob/57bd9f7e/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java index a2777a8..2408ddb 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java @@ -143,7 +143,7 @@ public class MRInputAMSplitGenerator extends InputInitializer { for (MRSplitProto mrSplit : splitsProto.getSplitsList()) { // Unnecessary array copy, can be avoided by using ByteBuffer instead of a raw array. InputDataInformationEvent diEvent = InputDataInformationEvent.create(count++, - mrSplit.toByteArray()); + mrSplit.toByteString().asReadOnlyByteBuffer()); events.add(diEvent); } } else { http://git-wip-us.apache.org/repos/asf/tez/blob/57bd9f7e/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java index 1c28c2b..e28a3a5 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java @@ -21,6 +21,7 @@ package org.apache.tez.mapreduce.common; import java.io.IOException; import java.util.List; +import com.google.protobuf.ByteString; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Public; @@ -38,12 +39,14 @@ import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitsProto; import org.apache.tez.runtime.api.Event; import org.apache.tez.runtime.api.InputInitializer; import org.apache.tez.runtime.api.InputInitializerContext; +import org.apache.tez.runtime.api.events.DataMovementEvent; import org.apache.tez.runtime.api.events.InputDataInformationEvent; import org.apache.tez.runtime.api.events.InputInitializerEvent; import org.apache.tez.runtime.api.events.InputUpdatePayloadEvent; import com.google.common.base.Stopwatch; import com.google.common.collect.Lists; +import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads; /** * Implements an {@link InputInitializer} that distributes Map Reduce @@ -96,7 +99,7 @@ public class MRInputSplitDistributor extends InputInitializer { List<Event> events = Lists.newArrayListWithCapacity(this.splitsProto.getSplitsCount() + 1); InputUpdatePayloadEvent updatePayloadEvent = InputUpdatePayloadEvent.create( - updatedPayloadBuilder.build().toByteArray()); + updatedPayloadBuilder.build().toByteString().asReadOnlyByteBuffer()); events.add(updatePayloadEvent); int count = 0; @@ -108,7 +111,7 @@ public class MRInputSplitDistributor extends InputInitializer { if (sendSerializedEvents) { // Unnecessary array copy, can be avoided by using ByteBuffer instead of // a raw array. - diEvent = InputDataInformationEvent.create(count++, mrSplit.toByteArray()); + diEvent = InputDataInformationEvent.create(count++, mrSplit.toByteString().asReadOnlyByteBuffer()); } else { if (useNewApi) { org.apache.hadoop.mapreduce.InputSplit newInputSplit = MRInputUtils http://git-wip-us.apache.org/repos/asf/tez/blob/57bd9f7e/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java index ebc2d45..7deef7c 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java @@ -25,6 +25,7 @@ import java.util.List; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; +import com.google.protobuf.ByteString; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -556,7 +557,7 @@ public class MRInput extends MRInputBase { private void initFromEventInternal(InputDataInformationEvent initEvent) throws IOException { LOG.info("Initializing RecordReader from event"); Preconditions.checkState(initEvent != null, "InitEvent must be specified"); - MRSplitProto splitProto = MRSplitProto.parseFrom(initEvent.getUserPayload()); + MRSplitProto splitProto = MRSplitProto.parseFrom(ByteString.copyFrom(initEvent.getUserPayload())); Object split = null; if (useNewApi) { split = MRInputUtils.getNewSplitDetailsFromEvent(splitProto, jobConf); http://git-wip-us.apache.org/repos/asf/tez/blob/57bd9f7e/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java index b360840..4ab5a52 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java @@ -28,6 +28,7 @@ import com.google.common.base.Function; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; +import com.google.protobuf.ByteString; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Public; @@ -114,7 +115,7 @@ public class MultiMRInput extends MRInputBase { private MRReader initFromEvent(InputDataInformationEvent event) throws IOException { Preconditions.checkState(event != null, "Event must be specified"); LOG.info("Initializing Reader: " + eventCount.get()); - MRSplitProto splitProto = MRSplitProto.parseFrom(event.getUserPayload()); + MRSplitProto splitProto = MRSplitProto.parseFrom(ByteString.copyFrom(event.getUserPayload())); Object split = null; MRReader reader = null; JobConf localJobConf = new JobConf(jobConf); http://git-wip-us.apache.org/repos/asf/tez/blob/57bd9f7e/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/common/TestMRInputSplitDistributor.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/common/TestMRInputSplitDistributor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/common/TestMRInputSplitDistributor.java index ebc12d4..b1a0880 100644 --- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/common/TestMRInputSplitDistributor.java +++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/common/TestMRInputSplitDistributor.java @@ -90,12 +90,12 @@ public class TestMRInputSplitDistributor { assertNotNull(diEvent1.getUserPayload()); assertNotNull(diEvent2.getUserPayload()); - MRSplitProto event1Proto = MRSplitProto.parseFrom(diEvent1.getUserPayload()); + MRSplitProto event1Proto = MRSplitProto.parseFrom(ByteString.copyFrom(diEvent1.getUserPayload())); InputSplit is1 = MRInputUtils.getOldSplitDetailsFromEvent(event1Proto, new Configuration()); assertTrue(is1 instanceof InputSplitForTest); assertEquals(1, ((InputSplitForTest) is1).identifier); - MRSplitProto event2Proto = MRSplitProto.parseFrom(diEvent2.getUserPayload()); + MRSplitProto event2Proto = MRSplitProto.parseFrom(ByteString.copyFrom(diEvent2.getUserPayload())); InputSplit is2 = MRInputUtils.getOldSplitDetailsFromEvent(event2Proto, new Configuration()); assertTrue(is2 instanceof InputSplitForTest); assertEquals(2, ((InputSplitForTest) is2).identifier); http://git-wip-us.apache.org/repos/asf/tez/blob/57bd9f7e/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java index f418bbb..121a975 100644 --- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java +++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java @@ -111,7 +111,8 @@ public class TestMultiMRInput { assertEquals(1, splits.length); MRSplitProto splitProto = MRInputHelpers.createSplitProto(splits[0]); - InputDataInformationEvent event = InputDataInformationEvent.create(0, splitProto.toByteArray()); + InputDataInformationEvent event = + InputDataInformationEvent.create(0, splitProto.toByteString().asReadOnlyByteBuffer()); eventList.clear(); eventList.add(event); @@ -171,11 +172,11 @@ public class TestMultiMRInput { MRSplitProto splitProto1 = MRInputHelpers.createSplitProto(splits[0]); InputDataInformationEvent event1 = - InputDataInformationEvent.create(0, splitProto1.toByteArray()); + InputDataInformationEvent.create(0, splitProto1.toByteString().asReadOnlyByteBuffer()); MRSplitProto splitProto2 = MRInputHelpers.createSplitProto(splits[1]); InputDataInformationEvent event2 = - InputDataInformationEvent.create(0, splitProto2.toByteArray()); + InputDataInformationEvent.create(0, splitProto2.toByteString().asReadOnlyByteBuffer()); eventList.clear(); eventList.add(event1); @@ -224,9 +225,9 @@ public class TestMultiMRInput { MRSplitProto splitProto = MRInputHelpers.createSplitProto(splits[0]); InputDataInformationEvent event1 = - InputDataInformationEvent.create(0, splitProto.toByteArray()); + InputDataInformationEvent.create(0, splitProto.toByteString().asReadOnlyByteBuffer()); InputDataInformationEvent event2 = - InputDataInformationEvent.create(1, splitProto.toByteArray()); + InputDataInformationEvent.create(1, splitProto.toByteString().asReadOnlyByteBuffer()); eventList.clear(); eventList.add(event1); http://git-wip-us.apache.org/repos/asf/tez/blob/57bd9f7e/tez-runtime-internals/src/main/java/org/apache/tez/common/ProtoConverters.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/common/ProtoConverters.java b/tez-runtime-internals/src/main/java/org/apache/tez/common/ProtoConverters.java index 4419b53..0650a90 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/common/ProtoConverters.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/common/ProtoConverters.java @@ -48,7 +48,7 @@ public class ProtoConverters { proto.getTargetIndex(), proto.getVersion(), proto.getUserPayload() != null ? - proto.getUserPayload().toByteArray() : null); + proto.getUserPayload().asReadOnlyByteBuffer() : null); } public static EventProtos.CompositeEventProto convertCompositeDataMovementEventToProto( @@ -67,7 +67,7 @@ public class ProtoConverters { EventProtos.CompositeEventProto proto) { return CompositeDataMovementEvent.create(proto.getStartIndex(), proto.getCount(), - proto.hasUserPayload() ? proto.getUserPayload().toByteArray() : null); + proto.hasUserPayload() ? proto.getUserPayload().asReadOnlyByteBuffer() : null); } public static EventProtos.VertexManagerEventProto convertVertexManagerEventToProto( @@ -83,7 +83,7 @@ public class ProtoConverters { public static VertexManagerEvent convertVertexManagerEventFromProto( EventProtos.VertexManagerEventProto vmProto) { return VertexManagerEvent.create(vmProto.getTargetVertexName(), - vmProto.hasUserPayload() ? vmProto.getUserPayload().toByteArray() : null); + vmProto.hasUserPayload() ? vmProto.getUserPayload().asReadOnlyByteBuffer() : null); } public static EventProtos.RootInputDataInformationEventProto @@ -103,7 +103,7 @@ public class ProtoConverters { EventProtos.RootInputDataInformationEventProto proto) { InputDataInformationEvent diEvent = InputDataInformationEvent.create( proto.getSourceIndex(), - proto.hasUserPayload() ? proto.getUserPayload().toByteArray() : null); + proto.hasUserPayload() ? proto.getUserPayload().asReadOnlyByteBuffer() : null); diEvent.setTargetIndex(proto.getTargetIndex()); return diEvent; } @@ -124,7 +124,7 @@ public class ProtoConverters { EventProtos.RootInputInitializerEventProto proto) { InputInitializerEvent event = InputInitializerEvent.create(proto.getTargetVertexName(), proto.getTargetInputName(), - (proto.hasUserPayload() ? proto.getUserPayload().toByteArray() : null)); + (proto.hasUserPayload() ? proto.getUserPayload().asReadOnlyByteBuffer() : null)); return event; } http://git-wip-us.apache.org/repos/asf/tez/blob/57bd9f7e/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java index 01de0bd..52e725e 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java @@ -349,7 +349,7 @@ public class ShuffleVertexManager extends VertexManagerPlugin { // save output size VertexManagerEventPayloadProto proto; try { - proto = VertexManagerEventPayloadProto.parseFrom(vmEvent.getUserPayload()); + proto = VertexManagerEventPayloadProto.parseFrom(ByteString.copyFrom(vmEvent.getUserPayload())); } catch (InvalidProtocolBufferException e) { throw new TezUncheckedException(e); } http://git-wip-us.apache.org/repos/asf/tez/blob/57bd9f7e/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java index 7cbbf07..b664ea7 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java @@ -24,6 +24,7 @@ import java.util.BitSet; import java.util.List; import com.google.common.annotations.VisibleForTesting; +import com.google.protobuf.ByteString; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -90,7 +91,7 @@ public class ShuffleInputEventHandler { private void processDataMovementEvent(DataMovementEvent dmEvent) throws IOException { DataMovementEventPayloadProto shufflePayload; try { - shufflePayload = DataMovementEventPayloadProto.parseFrom(dmEvent.getUserPayload()); + shufflePayload = DataMovementEventPayloadProto.parseFrom(ByteString.copyFrom(dmEvent.getUserPayload())); } catch (InvalidProtocolBufferException e) { throw new TezUncheckedException("Unable to parse DataMovementEvent payload", e); } http://git-wip-us.apache.org/repos/asf/tez/blob/57bd9f7e/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java index 1eb09bc..50664f5 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java @@ -491,7 +491,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit } CompositeDataMovementEvent cDme = CompositeDataMovementEvent.create(0, numPartitions, - payloadBuidler.build().toByteArray()); + payloadBuidler.build().toByteString().asReadOnlyByteBuffer()); return cDme; } http://git-wip-us.apache.org/repos/asf/tez/blob/57bd9f7e/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java index 8708c6f..15121da 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java @@ -192,7 +192,7 @@ public class OrderedPartitionedKVOutput extends AbstractLogicalOutput { payloadBuilder.setRunDuration((int) ((endTime - startTime) / 1000)); DataMovementEventPayloadProto payloadProto = payloadBuilder.build(); - byte[] payload = payloadProto.toByteArray(); + ByteBuffer payload = payloadProto.toByteString().asReadOnlyByteBuffer(); long outputSize = getContext().getCounters() .findCounter(TaskCounter.OUTPUT_BYTES).getValue(); @@ -200,7 +200,7 @@ public class OrderedPartitionedKVOutput extends AbstractLogicalOutput { .newBuilder(); vmBuilder.setOutputSize(outputSize); VertexManagerEvent vmEvent = VertexManagerEvent.create( - getContext().getDestinationVertexName(), vmBuilder.build().toByteArray()); + getContext().getDestinationVertexName(), vmBuilder.build().toByteString().asReadOnlyByteBuffer()); List<Event> events = Lists.newArrayListWithCapacity(getNumPhysicalOutputs() + 1); events.add(vmEvent); http://git-wip-us.apache.org/repos/asf/tez/blob/57bd9f7e/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java index 9ad7cd3..ea02743 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java @@ -154,7 +154,7 @@ public class UnorderedKVOutput extends AbstractLogicalOutput { } DataMovementEventPayloadProto payloadProto = payloadBuilder.build(); - DataMovementEvent dmEvent = DataMovementEvent.create(0, payloadProto.toByteArray()); + DataMovementEvent dmEvent = DataMovementEvent.create(0, payloadProto.toByteString().asReadOnlyByteBuffer()); List<Event> events = Lists.newArrayListWithCapacity(1); events.add(dmEvent); return events; http://git-wip-us.apache.org/repos/asf/tez/blob/57bd9f7e/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleInputEventHandlerImpl.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleInputEventHandlerImpl.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleInputEventHandlerImpl.java index 1cec071..d1b3362 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleInputEventHandlerImpl.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleInputEventHandlerImpl.java @@ -23,6 +23,7 @@ import java.io.IOException; import java.util.BitSet; import java.util.List; +import com.google.protobuf.ByteString; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -100,7 +101,8 @@ public class ShuffleInputEventHandlerImpl implements ShuffleEventHandler { private void processDataMovementEvent(DataMovementEvent dme) throws IOException { DataMovementEventPayloadProto shufflePayload; try { - shufflePayload = DataMovementEventPayloadProto.parseFrom(dme.getUserPayload()); + shufflePayload = DataMovementEventPayloadProto.parseFrom( + ByteString.copyFrom(dme.getUserPayload())); } catch (InvalidProtocolBufferException e) { throw new TezUncheckedException("Unable to parse DataMovementEvent payload", e); } http://git-wip-us.apache.org/repos/asf/tez/blob/57bd9f7e/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java index e4b4656..4768c6c 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java @@ -19,6 +19,7 @@ package org.apache.tez.dag.library.vertexmanager; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -215,8 +216,8 @@ public class TestShuffleVertexManager { when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(2); when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(2); - byte[] payload = - VertexManagerEventPayloadProto.newBuilder().setOutputSize(5000L).build().toByteArray(); + ByteBuffer payload = + VertexManagerEventPayloadProto.newBuilder().setOutputSize(5000L).build().toByteString().asReadOnlyByteBuffer(); VertexManagerEvent vmEvent = VertexManagerEvent.create("Vertex", payload); // parallelism not change due to large data size manager = createManager(conf, mockContext, 0.1f, 0.1f); @@ -236,7 +237,7 @@ public class TestShuffleVertexManager { // parallelism changed due to small data size scheduledTasks.clear(); payload = - VertexManagerEventPayloadProto.newBuilder().setOutputSize(500L).build().toByteArray(); + VertexManagerEventPayloadProto.newBuilder().setOutputSize(500L).build().toByteString().asReadOnlyByteBuffer(); vmEvent = VertexManagerEvent.create("Vertex", payload); manager = createManager(conf, mockContext, 0.5f, 0.5f); @@ -283,7 +284,7 @@ public class TestShuffleVertexManager { EdgeManagerPlugin edgeManager = newEdgeManagers.values().iterator().next(); Map<Integer, List<Integer>> targets = Maps.newHashMap(); - DataMovementEvent dmEvent = DataMovementEvent.create(1, new byte[0]); + DataMovementEvent dmEvent = DataMovementEvent.create(1, ByteBuffer.wrap(new byte[0])); // 4 source task outputs - same as original number of partitions Assert.assertEquals(4, edgeManager.getNumSourceTaskPhysicalOutputs(0)); // 4 destination task inputs - 2 source tasks + 2 merged partitions @@ -295,7 +296,7 @@ public class TestShuffleVertexManager { Assert.assertEquals(1, e.getValue().size()); Assert.assertEquals(3, e.getValue().get(0).intValue()); targets.clear(); - dmEvent = DataMovementEvent.create(2, new byte[0]); + dmEvent = DataMovementEvent.create(2, ByteBuffer.wrap(new byte[0])); edgeManager.routeDataMovementEventToDestination(dmEvent, 0, dmEvent.getSourceIndex(), targets); Assert.assertEquals(1, targets.size()); e = targets.entrySet().iterator().next(); http://git-wip-us.apache.org/repos/asf/tez/blob/57bd9f7e/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandler.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandler.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandler.java index 56e1337..14e833d 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandler.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandler.java @@ -73,7 +73,8 @@ public class TestShuffleInputEventHandler { if (emptyPartitionByteString != null) { builder.setEmptyPartitions(emptyPartitionByteString); } - return DataMovementEvent.create(srcIndex, targetIndex, 0, builder.build().toByteArray()); + return DataMovementEvent + .create(srcIndex, targetIndex, 0, builder.build().toByteString().asReadOnlyByteBuffer()); } @Before http://git-wip-us.apache.org/repos/asf/tez/blob/57bd9f7e/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java index e3b8760..d15cfff 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java @@ -41,6 +41,7 @@ import java.util.Random; import java.util.Set; import java.util.UUID; +import com.google.protobuf.ByteString; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configurable; @@ -311,8 +312,9 @@ public class TestUnorderedPartitionedKVWriter { CompositeDataMovementEvent cdme = (CompositeDataMovementEvent) events.get(0); assertEquals(0, cdme.getSourceIndexStart()); assertEquals(numPartitions, cdme.getCount()); - DataMovementEventPayloadProto eventProto = DataMovementEventPayloadProto.parseFrom(cdme - .getUserPayload()); + DataMovementEventPayloadProto eventProto = DataMovementEventPayloadProto.parseFrom( + ByteString.copyFrom(cdme + .getUserPayload())); assertFalse(eventProto.hasData()); BitSet emptyPartitionBits = null; if (partitionsWithData.cardinality() != numPartitions) { @@ -498,8 +500,9 @@ public class TestUnorderedPartitionedKVWriter { CompositeDataMovementEvent cdme = (CompositeDataMovementEvent) events.get(0); assertEquals(0, cdme.getSourceIndexStart()); assertEquals(numOutputs, cdme.getCount()); - DataMovementEventPayloadProto eventProto = DataMovementEventPayloadProto.parseFrom(cdme - .getUserPayload()); + DataMovementEventPayloadProto eventProto = + DataMovementEventPayloadProto.parseFrom(ByteString.copyFrom( + cdme.getUserPayload())); assertFalse(eventProto.hasData()); if (skippedPartitions == null && numRecordsWritten > 0) { assertFalse(eventProto.hasEmptyPartitions()); http://git-wip-us.apache.org/repos/asf/tez/blob/57bd9f7e/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java index f3b3503..8a7f2a6 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java @@ -18,6 +18,7 @@ package org.apache.tez.runtime.library.output; +import com.google.protobuf.ByteString; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -175,7 +176,8 @@ public class TestOnFileSortedOutput { ShuffleUserPayloads.DataMovementEventPayloadProto payload = ShuffleUserPayloads.DataMovementEventPayloadProto - .parseFrom(((CompositeDataMovementEvent) eventList.get(1)).getUserPayload()); + .parseFrom( + ByteString.copyFrom(((CompositeDataMovementEvent) eventList.get(1)).getUserPayload())); assertEquals(HOST, payload.getHost()); assertEquals(PORT, payload.getPort()); @@ -203,7 +205,7 @@ public class TestOnFileSortedOutput { ShuffleUserPayloads.DataMovementEventPayloadProto payload = ShuffleUserPayloads.DataMovementEventPayloadProto - .parseFrom(((CompositeDataMovementEvent) eventList.get(1)).getUserPayload()); + .parseFrom(ByteString.copyFrom(((CompositeDataMovementEvent) eventList.get(1)).getUserPayload())); assertEquals(HOST, payload.getHost()); assertEquals(PORT, payload.getPort()); @@ -220,7 +222,7 @@ public class TestOnFileSortedOutput { ShuffleUserPayloads.DataMovementEventPayloadProto payload = ShuffleUserPayloads.DataMovementEventPayloadProto - .parseFrom(((CompositeDataMovementEvent) eventList.get(1)).getUserPayload()); + .parseFrom(ByteString.copyFrom(((CompositeDataMovementEvent) eventList.get(1)).getUserPayload())); if (sendEmptyPartitionViaEvent) { assertEquals("", payload.getHost()); assertEquals(0, payload.getPort()); http://git-wip-us.apache.org/repos/asf/tez/blob/57bd9f7e/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java index 1c6db22..9d623b5 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java @@ -30,6 +30,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import com.google.protobuf.ByteString; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -145,7 +146,7 @@ public class TestOnFileUnorderedKVOutput { assertEquals("Invalid source index", 0, dmEvent.getSourceIndex()); DataMovementEventPayloadProto shufflePayload = DataMovementEventPayloadProto - .parseFrom(dmEvent.getUserPayload()); + .parseFrom(ByteString.copyFrom(dmEvent.getUserPayload())); assertFalse(shufflePayload.hasEmptyPartitions()); assertEquals(outputContext.getUniqueIdentifier(), shufflePayload.getPathComponent()); http://git-wip-us.apache.org/repos/asf/tez/blob/57bd9f7e/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/shuffle/common/impl/TestShuffleInputEventHandlerImpl.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/shuffle/common/impl/TestShuffleInputEventHandlerImpl.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/shuffle/common/impl/TestShuffleInputEventHandlerImpl.java index 646dca8..d6d6a4c 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/shuffle/common/impl/TestShuffleInputEventHandlerImpl.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/shuffle/common/impl/TestShuffleInputEventHandlerImpl.java @@ -148,7 +148,8 @@ public class TestShuffleInputEventHandlerImpl { if (emptyPartitionByteString != null) { builder.setEmptyPartitions(emptyPartitionByteString); } - Event dme = DataMovementEvent.create(srcIndex, targetIndex, 0, builder.build().toByteArray()); + Event dme = DataMovementEvent + .create(srcIndex, targetIndex, 0, builder.build().toByteString().asReadOnlyByteBuffer()); return dme; } http://git-wip-us.apache.org/repos/asf/tez/blob/57bd9f7e/tez-tests/src/test/java/org/apache/tez/test/TestInput.java ---------------------------------------------------------------------- diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestInput.java b/tez-tests/src/test/java/org/apache/tez/test/TestInput.java index 29ea7c0..465dd9c 100644 --- a/tez-tests/src/test/java/org/apache/tez/test/TestInput.java +++ b/tez-tests/src/test/java/org/apache/tez/test/TestInput.java @@ -291,7 +291,7 @@ public class TestInput extends AbstractLogicalInput { " numCompletedInputs: " + numCompletedInputs); this.completedInputVersion[dmEvent.getTargetIndex()] = dmEvent.getVersion(); this.inputValues[dmEvent.getTargetIndex()] = - ByteBuffer.wrap(dmEvent.getUserPayload()).getInt(); + dmEvent.getUserPayload().getInt(); } else if (event instanceof InputFailedEvent) { InputFailedEvent ifEvent = (InputFailedEvent) event; numCompletedInputs--; http://git-wip-us.apache.org/repos/asf/tez/blob/57bd9f7e/tez-tests/src/test/java/org/apache/tez/test/TestOutput.java ---------------------------------------------------------------------- diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestOutput.java b/tez-tests/src/test/java/org/apache/tez/test/TestOutput.java index 9d9767b..8998c3c 100644 --- a/tez-tests/src/test/java/org/apache/tez/test/TestOutput.java +++ b/tez-tests/src/test/java/org/apache/tez/test/TestOutput.java @@ -77,7 +77,8 @@ public class TestOutput extends AbstractLogicalOutput { @Override public List<Event> close() throws Exception { LOG.info("Sending data movement event with value: " + output); - byte[] result = ByteBuffer.allocate(4).putInt(output).array(); + ByteBuffer result = ByteBuffer.allocate(4).putInt(output); + result.flip(); List<Event> events = Lists.newArrayListWithCapacity(getNumPhysicalOutputs()); for (int i = 0; i < getNumPhysicalOutputs(); i++) { DataMovementEvent event = DataMovementEvent.create(i, result);
