Repository: tez Updated Branches: refs/heads/master 8079919dc -> 43ca78fea
TEZ-3222. Reduce messaging overhead for auto-reduce parallelism case (jeagles) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/43ca78fe Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/43ca78fe Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/43ca78fe Branch: refs/heads/master Commit: 43ca78fea0115e05c8d14626f470010e2f7334c3 Parents: 8079919 Author: Jonathan Eagles <[email protected]> Authored: Mon Dec 5 11:13:17 2016 -0600 Committer: Jonathan Eagles <[email protected]> Committed: Mon Dec 5 11:13:17 2016 -0600 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../tez/dag/api/EdgeManagerPluginOnDemand.java | 30 ++++- .../api/events/CompositeDataMovementEvent.java | 6 + .../CompositeRoutedDataMovementEvent.java | 126 ++++++++++++++++++ tez-api/src/main/proto/Events.proto | 8 ++ .../dag/app/dag/impl/BroadcastEdgeManager.java | 4 +- .../org/apache/tez/dag/app/dag/impl/Edge.java | 37 ++---- .../dag/impl/OneToOneEdgeManagerOnDemand.java | 7 +- .../app/dag/impl/ScatterGatherEdgeManager.java | 6 +- .../apache/tez/dag/app/dag/impl/VertexImpl.java | 1 + .../tez/dag/app/TestMockDAGAppMaster.java | 9 +- .../tez/dag/app/dag/impl/TestDAGImpl.java | 2 +- .../org/apache/tez/test/EdgeManagerForTest.java | 2 +- .../org/apache/tez/common/ProtoConverters.java | 22 ++++ .../apache/tez/runtime/api/impl/EventType.java | 1 + .../apache/tez/runtime/api/impl/TezEvent.java | 14 ++ .../vertexmanager/FairShuffleEdgeManager.java | 9 +- .../vertexmanager/ShuffleVertexManager.java | 6 +- .../CartesianProductEdgeManager.java | 5 +- .../CartesianProductEdgeManagerPartitioned.java | 5 +- .../CartesianProductEdgeManagerReal.java | 6 +- ...artesianProductEdgeManagerUnpartitioned.java | 4 +- .../impl/ShuffleInputEventHandlerImpl.java | 55 ++++++-- .../ShuffleInputEventHandlerOrderedGrouped.java | 54 ++++++-- .../TestFairShuffleVertexManager.java | 85 ++++++------ ...tCartesianProductEdgeManagerPartitioned.java | 85 ++++++------ ...artesianProductEdgeManagerUnpartitioned.java | 131 ++++++++++--------- .../tez/test/TestExceptionPropagation.java | 2 +- 28 files changed, 490 insertions(+), 233 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/43ca78fe/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index b4603cf..d9a7ca6 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-3222. Reduce messaging overhead for auto-reduce parallelism case. TEZ-3547. Add TaskAssignment Analyzer. TEZ-3508. TestTaskScheduler cleanup. TEZ-3536. NPE in WebUIService start when host resolution fails. http://git-wip-us.apache.org/repos/asf/tez/blob/43ca78fe/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerPluginOnDemand.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerPluginOnDemand.java b/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerPluginOnDemand.java index 05c0c62..3d7f2ab 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerPluginOnDemand.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerPluginOnDemand.java @@ -40,6 +40,34 @@ import org.apache.tez.runtime.api.events.InputReadErrorEvent; @Unstable public abstract class EdgeManagerPluginOnDemand extends EdgeManagerPlugin { + public static class CompositeEventRouteMetadata { + private final int count; + private final int target; + private final int source; + + public static CompositeEventRouteMetadata create(int count, int target, int source) { + return new CompositeEventRouteMetadata(count, target, source); + } + + private CompositeEventRouteMetadata(int count, int target, int source) { + this.count = count; + this.target = target; + this.source = source; + } + + public int getCount() { + return count; + } + + public int getTarget() { + return target; + } + + public int getSource() { + return source; + } + } + /** * Class to provide routing metadata for {@link Event}s to be routed between * producer and consumer tasks. The routing data enabled the system to send @@ -237,7 +265,7 @@ public abstract class EdgeManagerPluginOnDemand extends EdgeManagerPlugin { * source task. * @throws Exception */ - public abstract @Nullable EventRouteMetadata routeCompositeDataMovementEventToDestination( + public abstract @Nullable CompositeEventRouteMetadata routeCompositeDataMovementEventToDestination( int sourceTaskIndex, int destinationTaskIndex) throws Exception; /** http://git-wip-us.apache.org/repos/asf/tez/blob/43ca78fe/tez-api/src/main/java/org/apache/tez/runtime/api/events/CompositeDataMovementEvent.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/events/CompositeDataMovementEvent.java b/tez-api/src/main/java/org/apache/tez/runtime/api/events/CompositeDataMovementEvent.java index c45d272..32089a9 100644 --- a/tez-api/src/main/java/org/apache/tez/runtime/api/events/CompositeDataMovementEvent.java +++ b/tez-api/src/main/java/org/apache/tez/runtime/api/events/CompositeDataMovementEvent.java @@ -23,6 +23,7 @@ import java.util.Iterator; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.tez.dag.api.EdgeManagerPluginOnDemand.CompositeEventRouteMetadata; import org.apache.tez.runtime.api.Event; /** @@ -139,4 +140,9 @@ public class CompositeDataMovementEvent extends Event { }; } + @Private + public CompositeRoutedDataMovementEvent expandRouted(CompositeEventRouteMetadata routeMeta) { + return CompositeRoutedDataMovementEvent.create(routeMeta.getSource(), routeMeta.getTarget(), routeMeta.getCount(), version, userPayload); + } + } http://git-wip-us.apache.org/repos/asf/tez/blob/43ca78fe/tez-api/src/main/java/org/apache/tez/runtime/api/events/CompositeRoutedDataMovementEvent.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/events/CompositeRoutedDataMovementEvent.java b/tez-api/src/main/java/org/apache/tez/runtime/api/events/CompositeRoutedDataMovementEvent.java new file mode 100644 index 0000000..6dbed71 --- /dev/null +++ b/tez-api/src/main/java/org/apache/tez/runtime/api/events/CompositeRoutedDataMovementEvent.java @@ -0,0 +1,126 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.runtime.api.events; + +import java.nio.ByteBuffer; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.tez.runtime.api.Event; + +/** + * A convenience class to specify multiple DataMovementEvents which share the same payload. + * This event is generated by Edge in Application Master and sent to downstream vertex input + * for optimization purpose. It is not to be consumed by any user code. + */ +@Public +public final class CompositeRoutedDataMovementEvent extends Event { + + /** + * Index(i) of the i-th (physical) Input or Output that generated an Event. + * For a Processor-generated event, this is ignored. + */ + private final int sourceIndex; + + /** + * Index(i) of the i-th (physical) Input or Output that is meant to receive + * this Event. For a Processor event, this is ignored. + */ + private int targetIndex; + private int count; + + /** + * User Payload for this Event + */ + private final ByteBuffer userPayload; + + /** + * Version number to indicate what attempt generated this Event + */ + private int version; + + + @Private + public static CompositeRoutedDataMovementEvent create(int sourceIndex, + int targetIndex, + int count, + int version, + ByteBuffer userPayload) { + return new CompositeRoutedDataMovementEvent(sourceIndex, targetIndex, count, version, userPayload); + } + + @Private + CompositeRoutedDataMovementEvent(int sourceIndex, + int targetIndex, + int count, + int version, + ByteBuffer userPayload) { + this.userPayload = userPayload; + this.sourceIndex = sourceIndex; + this.version = version; + this.targetIndex = targetIndex; + this.count = count; + } + + public ByteBuffer getUserPayload() { + return userPayload == null ? null : userPayload.asReadOnlyBuffer(); + } + + public int getSourceIndex() { + return sourceIndex; + } + + public int getTargetIndex() { + return targetIndex; + } + + @Private + public void setTargetIndex(int targetIndex) { + this.targetIndex = targetIndex; + } + + public int getCount() { + return count; + } + + @Private + public void setCount(int count) { + this.count = count; + } + + public int getVersion() { + return version; + } + + @Private + public void setVersion(int version) { + this.version = version; + } + + @Override + public String toString() { + return "CompositeRoutedDataMovementEvent [sourceIndex=" + sourceIndex + ", targetIndex=" + + targetIndex + ", count=" + count + ", version=" + version + "]"; + } + + @Private + public DataMovementEvent expand(int offset) { + return DataMovementEvent.create(sourceIndex + offset, targetIndex + offset, version, userPayload); + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/43ca78fe/tez-api/src/main/proto/Events.proto ---------------------------------------------------------------------- diff --git a/tez-api/src/main/proto/Events.proto b/tez-api/src/main/proto/Events.proto index 490fa3c..e018864 100644 --- a/tez-api/src/main/proto/Events.proto +++ b/tez-api/src/main/proto/Events.proto @@ -27,6 +27,14 @@ message DataMovementEventProto { optional int32 version = 4; } +message CompositeRoutedDataMovementEventProto { + optional int32 source_index = 1; + optional int32 target_index = 2; + optional int32 count = 3; + optional bytes user_payload = 4; + optional int32 version = 5; +} + message InputReadErrorEventProto { optional int32 index = 1; optional string diagnostics = 2; http://git-wip-us.apache.org/repos/asf/tez/blob/43ca78fe/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/BroadcastEdgeManager.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/BroadcastEdgeManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/BroadcastEdgeManager.java index d14527d..ca510f7 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/BroadcastEdgeManager.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/BroadcastEdgeManager.java @@ -78,10 +78,10 @@ public class BroadcastEdgeManager extends EdgeManagerPluginOnDemand { } @Override - public EventRouteMetadata routeCompositeDataMovementEventToDestination( + public CompositeEventRouteMetadata routeCompositeDataMovementEventToDestination( int sourceTaskIndex, int destinationTaskIndex) throws Exception { - return commonRouteMeta[sourceTaskIndex]; + return CompositeEventRouteMetadata.create(1, sourceTaskIndex, 0); } @Override http://git-wip-us.apache.org/repos/asf/tez/blob/43ca78fe/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java index bb4d319..9640f06 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java @@ -38,6 +38,7 @@ import org.apache.tez.dag.api.EdgeProperty; import org.apache.tez.dag.api.TezException; import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.dag.api.UserPayload; +import org.apache.tez.dag.api.EdgeManagerPluginOnDemand.CompositeEventRouteMetadata; import org.apache.tez.dag.api.EdgeManagerPluginOnDemand.EventRouteMetadata; import org.apache.tez.dag.app.dag.Task; import org.apache.tez.dag.app.dag.Vertex; @@ -49,6 +50,7 @@ import org.apache.tez.dag.records.TezTaskID; import org.apache.tez.runtime.api.Event; import org.apache.tez.runtime.api.events.CompositeDataMovementEvent; import org.apache.tez.runtime.api.events.DataMovementEvent; +import org.apache.tez.runtime.api.events.CompositeRoutedDataMovementEvent; import org.apache.tez.runtime.api.events.InputFailedEvent; import org.apache.tez.runtime.api.events.InputReadErrorEvent; import org.apache.tez.runtime.api.impl.EventMetaData; @@ -549,36 +551,15 @@ public class Edge { switch (tezEvent.getEventType()) { case COMPOSITE_DATA_MOVEMENT_EVENT: { - CompositeDataMovementEvent compEvent = (CompositeDataMovementEvent) tezEvent.getEvent(); - EventRouteMetadata routeMeta; - int numEventsDone; - if (pendingRoutes != null) { - routeMeta = pendingRoutes.getRouteMeta(); - numEventsDone = pendingRoutes.getNumEventsRouted(); - } else { - routeMeta = edgeManagerOnDemand + CompositeDataMovementEvent compEvent = (CompositeDataMovementEvent) tezEvent.getEvent(); + CompositeEventRouteMetadata routeMeta = edgeManagerOnDemand .routeCompositeDataMovementEventToDestination(srcTaskIndex, taskIndex); - numEventsDone = 0; - } + if (routeMeta != null) { - int listSize = listToAdd.size(); - int numEvents = routeMeta.getNumEvents(); - int[] sourceIndices = routeMeta.getSourceIndices(); - int[] targetIndices = routeMeta.getTargetIndices(); - while (numEventsDone < numEvents && listSize++ < listMaxSize) { - DataMovementEvent e = compEvent.expand(sourceIndices[numEventsDone], - targetIndices[numEventsDone]); - numEventsDone++; - TezEvent tezEventToSend = new TezEvent(e, tezEvent.getSourceInfo(), - tezEvent.getEventReceivedTime()); - tezEventToSend.setDestinationInfo(destinationMetaInfo); - listToAdd.add(tezEventToSend); - } - if (numEventsDone < numEvents) { - pendingEvents.put(attemptID, new PendingEventRouteMetadata(routeMeta, tezEvent, - numEventsDone)); - return false; - } + CompositeRoutedDataMovementEvent edme = compEvent.expandRouted(routeMeta); + TezEvent tezEventToSend = new TezEvent(edme, tezEvent.getSourceInfo(), tezEvent.getEventReceivedTime()); + tezEventToSend.setDestinationInfo(destinationMetaInfo); + listToAdd.add(tezEventToSend); } } break; http://git-wip-us.apache.org/repos/asf/tez/blob/43ca78fe/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OneToOneEdgeManagerOnDemand.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OneToOneEdgeManagerOnDemand.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OneToOneEdgeManagerOnDemand.java index 84e7e66..819735a 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OneToOneEdgeManagerOnDemand.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OneToOneEdgeManagerOnDemand.java @@ -41,6 +41,9 @@ public class OneToOneEdgeManagerOnDemand extends EdgeManagerPluginOnDemand { final EventRouteMetadata commonRouteMeta = EventRouteMetadata.create(1, new int[]{0}, new int[]{0}); + final CompositeEventRouteMetadata compositeCommonRouteMeta = + CompositeEventRouteMetadata.create(1, 0, 0); + public OneToOneEdgeManagerOnDemand(EdgeManagerPluginContext context) { super(context); } @@ -84,11 +87,11 @@ public class OneToOneEdgeManagerOnDemand extends EdgeManagerPluginOnDemand { } @Override - public @Nullable EventRouteMetadata routeCompositeDataMovementEventToDestination( + public @Nullable CompositeEventRouteMetadata routeCompositeDataMovementEventToDestination( int sourceTaskIndex, int destinationTaskIndex) throws Exception { if (sourceTaskIndex == destinationTaskIndex) { - return commonRouteMeta; + return compositeCommonRouteMeta; } return null; } http://git-wip-us.apache.org/repos/asf/tez/blob/43ca78fe/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ScatterGatherEdgeManager.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ScatterGatherEdgeManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ScatterGatherEdgeManager.java index 3b66b8f..4d373ca 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ScatterGatherEdgeManager.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ScatterGatherEdgeManager.java @@ -114,11 +114,11 @@ public class ScatterGatherEdgeManager extends EdgeManagerPluginOnDemand { } @Override - public @Nullable EventRouteMetadata routeCompositeDataMovementEventToDestination( + public @Nullable CompositeEventRouteMetadata routeCompositeDataMovementEventToDestination( int sourceTaskIndex, int destinationTaskIndex) throws Exception { - return EventRouteMetadata.create(1, targetIndices[sourceTaskIndex], - sourceIndices[destinationTaskIndex]); + return CompositeEventRouteMetadata.create(1, targetIndices[sourceTaskIndex][0], + sourceIndices[destinationTaskIndex][0]); } @Override http://git-wip-us.apache.org/repos/asf/tez/blob/43ca78fe/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java index 38ef89f..3f6debf 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java @@ -3769,6 +3769,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl EventType lastEventType = lastEvent.getEventType(); // if the following changes then critical path logic/recording may need revision if (lastEventType == EventType.COMPOSITE_DATA_MOVEMENT_EVENT || + lastEventType == EventType.COMPOSITE_ROUTED_DATA_MOVEMENT_EVENT || lastEventType == EventType.DATA_MOVEMENT_EVENT || lastEventType == EventType.ROOT_INPUT_DATA_INFORMATION_EVENT) { task.getAttempt(attemptID).setLastEventSent(lastEvent); http://git-wip-us.apache.org/repos/asf/tez/blob/43ca78fe/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java index 74ac51e..6268912 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java @@ -93,6 +93,7 @@ import org.apache.tez.runtime.api.OutputCommitter; import org.apache.tez.runtime.api.OutputCommitterContext; import org.apache.tez.runtime.api.events.CompositeDataMovementEvent; import org.apache.tez.runtime.api.events.DataMovementEvent; +import org.apache.tez.runtime.api.events.CompositeRoutedDataMovementEvent; import org.apache.tez.runtime.api.events.InputReadErrorEvent; import org.apache.tez.runtime.api.impl.EventMetaData; import org.apache.tez.runtime.api.impl.IOStatistics; @@ -275,11 +276,11 @@ public class TestMockDAGAppMaster { tEvents = vImpl.getTaskAttemptTezEvents(taId, 0, 0, 1000).getEvents(); Assert.assertEquals(2, tEvents.size()); // 2 from vA Assert.assertEquals(vA.getName(), tEvents.get(0).getDestinationInfo().getEdgeVertexName()); - Assert.assertEquals(1, ((DataMovementEvent)tEvents.get(0).getEvent()).getSourceIndex()); + Assert.assertEquals(1, ((CompositeRoutedDataMovementEvent)tEvents.get(0).getEvent()).getSourceIndex()); Assert.assertEquals(vA.getName(), tEvents.get(1).getDestinationInfo().getEdgeVertexName()); - Assert.assertEquals(1, ((DataMovementEvent)tEvents.get(1).getEvent()).getSourceIndex()); - targetIndex1 = ((DataMovementEvent)tEvents.get(0).getEvent()).getTargetIndex(); - targetIndex2 = ((DataMovementEvent)tEvents.get(1).getEvent()).getTargetIndex(); + Assert.assertEquals(1, ((CompositeRoutedDataMovementEvent)tEvents.get(1).getEvent()).getSourceIndex()); + targetIndex1 = ((CompositeRoutedDataMovementEvent)tEvents.get(0).getEvent()).getTargetIndex(); + targetIndex2 = ((CompositeRoutedDataMovementEvent)tEvents.get(1).getEvent()).getTargetIndex(); // order of vA task completion can change order of events Assert.assertTrue("t1: " + targetIndex1 + " t2: " + targetIndex2, (targetIndex1 == 0 && targetIndex2 == 1) || (targetIndex1 == 1 && targetIndex2 == 0)); http://git-wip-us.apache.org/repos/asf/tez/blob/43ca78fe/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java index 0b96db7..966b464 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java @@ -2197,7 +2197,7 @@ public class TestDAGImpl { } @Override - public EventRouteMetadata routeCompositeDataMovementEventToDestination( + public CompositeEventRouteMetadata routeCompositeDataMovementEventToDestination( int sourceTaskIndex, int destinationTaskIndex) throws Exception { if (exLocation == ExceptionLocation.RouteDataMovementEventToDestination) { http://git-wip-us.apache.org/repos/asf/tez/blob/43ca78fe/tez-dag/src/test/java/org/apache/tez/test/EdgeManagerForTest.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/test/EdgeManagerForTest.java b/tez-dag/src/test/java/org/apache/tez/test/EdgeManagerForTest.java index 9cb914f..47d133b 100644 --- a/tez-dag/src/test/java/org/apache/tez/test/EdgeManagerForTest.java +++ b/tez-dag/src/test/java/org/apache/tez/test/EdgeManagerForTest.java @@ -86,7 +86,7 @@ public class EdgeManagerForTest extends EdgeManagerPluginOnDemand { } @Override - public EventRouteMetadata routeCompositeDataMovementEventToDestination( + public CompositeEventRouteMetadata routeCompositeDataMovementEventToDestination( int sourceTaskIndex, int destinationTaskIndex) throws Exception { return null; http://git-wip-us.apache.org/repos/asf/tez/blob/43ca78fe/tez-runtime-internals/src/main/java/org/apache/tez/common/ProtoConverters.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/common/ProtoConverters.java b/tez-runtime-internals/src/main/java/org/apache/tez/common/ProtoConverters.java index f765f42..ea90158 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/common/ProtoConverters.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/common/ProtoConverters.java @@ -22,6 +22,7 @@ import com.google.protobuf.ByteString; import org.apache.tez.runtime.api.events.CompositeDataMovementEvent; import org.apache.tez.runtime.api.events.DataMovementEvent; +import org.apache.tez.runtime.api.events.CompositeRoutedDataMovementEvent; import org.apache.tez.runtime.api.events.EventProtos; import org.apache.tez.runtime.api.events.InputDataInformationEvent; import org.apache.tez.runtime.api.events.InputInitializerEvent; @@ -50,6 +51,27 @@ public class ProtoConverters { proto.getUserPayload() != null ? proto.getUserPayload().asReadOnlyByteBuffer() : null); } + public static EventProtos.CompositeRoutedDataMovementEventProto convertCompositeRoutedDataMovementEventToProto( + CompositeRoutedDataMovementEvent event) { + EventProtos.CompositeRoutedDataMovementEventProto.Builder builder = + EventProtos.CompositeRoutedDataMovementEventProto.newBuilder(); + builder.setSourceIndex(event.getSourceIndex()). + setTargetIndex(event.getTargetIndex()).setVersion(event.getVersion()).setCount(event.getCount()); + if (event.getUserPayload() != null) { + builder.setUserPayload(ByteString.copyFrom(event.getUserPayload())); + } + return builder.build(); + } + + public static CompositeRoutedDataMovementEvent convertCompositeRoutedDataMovementEventFromProto( + EventProtos.CompositeRoutedDataMovementEventProto proto) { + return CompositeRoutedDataMovementEvent.create(proto.getSourceIndex(), + proto.getTargetIndex(), + proto.getCount(), + proto.getVersion(), + proto.getUserPayload() != null ? + proto.getUserPayload().asReadOnlyByteBuffer() : null); + } public static EventProtos.CompositeEventProto convertCompositeDataMovementEventToProto( CompositeDataMovementEvent event) { http://git-wip-us.apache.org/repos/asf/tez/blob/43ca78fe/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/EventType.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/EventType.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/EventType.java index cb247c9..e573526 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/EventType.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/EventType.java @@ -30,4 +30,5 @@ public enum EventType { ROOT_INPUT_DATA_INFORMATION_EVENT, COMPOSITE_DATA_MOVEMENT_EVENT, ROOT_INPUT_INITIALIZER_EVENT, + COMPOSITE_ROUTED_DATA_MOVEMENT_EVENT, } http://git-wip-us.apache.org/repos/asf/tez/blob/43ca78fe/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java index e76bdbb..1a90ada 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java @@ -31,9 +31,11 @@ import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.runtime.api.Event; import org.apache.tez.runtime.api.events.CompositeDataMovementEvent; import org.apache.tez.runtime.api.events.DataMovementEvent; +import org.apache.tez.runtime.api.events.CompositeRoutedDataMovementEvent; import org.apache.tez.runtime.api.events.EventProtos; import org.apache.tez.runtime.api.events.EventProtos.CompositeEventProto; import org.apache.tez.runtime.api.events.EventProtos.DataMovementEventProto; +import org.apache.tez.runtime.api.events.EventProtos.CompositeRoutedDataMovementEventProto; import org.apache.tez.runtime.api.events.EventProtos.InputFailedEventProto; import org.apache.tez.runtime.api.events.EventProtos.InputReadErrorEventProto; import org.apache.tez.runtime.api.events.EventProtos.RootInputDataInformationEventProto; @@ -82,6 +84,8 @@ public class TezEvent implements Writable { eventType = EventType.DATA_MOVEMENT_EVENT; } else if (event instanceof CompositeDataMovementEvent) { eventType = EventType.COMPOSITE_DATA_MOVEMENT_EVENT; + } else if (event instanceof CompositeRoutedDataMovementEvent) { + eventType = EventType.COMPOSITE_ROUTED_DATA_MOVEMENT_EVENT; } else if (event instanceof VertexManagerEvent) { eventType = EventType.VERTEX_MANAGER_EVENT; } else if (event instanceof InputReadErrorEvent) { @@ -158,6 +162,11 @@ public class TezEvent implements Writable { ProtoConverters.convertDataMovementEventToProto( (DataMovementEvent) event); break; + case COMPOSITE_ROUTED_DATA_MOVEMENT_EVENT: + message = + ProtoConverters.convertCompositeRoutedDataMovementEventToProto( + (CompositeRoutedDataMovementEvent) event); + break; case COMPOSITE_DATA_MOVEMENT_EVENT: message = ProtoConverters.convertCompositeDataMovementEventToProto( @@ -256,6 +265,11 @@ public class TezEvent implements Writable { DataMovementEventProto.parseFrom(input); event = ProtoConverters.convertDataMovementEventFromProto(dmProto); break; + case COMPOSITE_ROUTED_DATA_MOVEMENT_EVENT: + CompositeRoutedDataMovementEventProto edmProto = + CompositeRoutedDataMovementEventProto.parseFrom(eventBytes); + event = ProtoConverters.convertCompositeRoutedDataMovementEventFromProto(edmProto); + break; case COMPOSITE_DATA_MOVEMENT_EVENT: CompositeEventProto cProto = CompositeEventProto.parseFrom(input); event = ProtoConverters.convertCompositeDataMovementEventFromProto(cProto); http://git-wip-us.apache.org/repos/asf/tez/blob/43ca78fe/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/FairShuffleEdgeManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/FairShuffleEdgeManager.java b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/FairShuffleEdgeManager.java index ff1c032..2336e15 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/FairShuffleEdgeManager.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/FairShuffleEdgeManager.java @@ -121,17 +121,16 @@ public class FairShuffleEdgeManager extends EdgeManagerPluginOnDemand { } @Override - public @Nullable EventRouteMetadata + public @Nullable CompositeEventRouteMetadata routeCompositeDataMovementEventToDestination(int sourceTaskIndex, int destinationTaskIndex) { DestinationTaskInputsProperty property = mapping.get(destinationTaskIndex); int firstPhysicalInputIndex = property.getFirstPhysicalInputIndex(sourceTaskIndex); if (firstPhysicalInputIndex >= 0) { - return EventRouteMetadata.create(property.getNumOfPartitions(), - getRange(firstPhysicalInputIndex, property.getNumOfPartitions()), - getRange(property.getFirstPartitionId(), - property.getNumOfPartitions())); + return CompositeEventRouteMetadata.create(property.getNumOfPartitions(), + firstPhysicalInputIndex, + property.getFirstPartitionId()); } else { return null; } http://git-wip-us.apache.org/repos/asf/tez/blob/43ca78fe/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java index 55a6ced..ed27f04 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java @@ -299,7 +299,7 @@ public class ShuffleVertexManager extends ShuffleVertexManagerBase { } @Override - public @Nullable EventRouteMetadata routeCompositeDataMovementEventToDestination( + public @Nullable CompositeEventRouteMetadata routeCompositeDataMovementEventToDestination( int sourceTaskIndex, int destinationTaskIndex) throws Exception { int[] targetIndicesToSend; @@ -316,8 +316,8 @@ public class ShuffleVertexManager extends ShuffleVertexManagerBase { partitionRange = basePartitionRange; } - return EventRouteMetadata.create(partitionRange, targetIndicesToSend, - sourceIndices[destinationTaskIndex]); + return CompositeEventRouteMetadata.create(partitionRange, targetIndicesToSend[0], + sourceIndices[destinationTaskIndex][0]); } @Override http://git-wip-us.apache.org/repos/asf/tez/blob/43ca78fe/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManager.java index 96cce94..1dbe6bf 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManager.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManager.java @@ -21,7 +21,6 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import org.apache.tez.dag.api.EdgeManagerPluginContext; import org.apache.tez.dag.api.EdgeManagerPluginOnDemand; -import org.apache.tez.dag.api.TezException; import javax.annotation.Nullable; @@ -75,8 +74,8 @@ public class CartesianProductEdgeManager extends EdgeManagerPluginOnDemand { @Nullable @Override - public EventRouteMetadata routeCompositeDataMovementEventToDestination(int srcTaskId, - int destTaskId) + public CompositeEventRouteMetadata routeCompositeDataMovementEventToDestination(int srcTaskId, + int destTaskId) throws Exception { return edgeManagerReal.routeCompositeDataMovementEventToDestination(srcTaskId, destTaskId); } http://git-wip-us.apache.org/repos/asf/tez/blob/43ca78fe/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerPartitioned.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerPartitioned.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerPartitioned.java index 644d5af..068da81 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerPartitioned.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerPartitioned.java @@ -21,6 +21,7 @@ import com.google.common.primitives.Ints; import org.apache.tez.common.ReflectionUtils; import org.apache.tez.dag.api.EdgeManagerPluginContext; import org.apache.tez.dag.api.EdgeManagerPluginOnDemand.EventRouteMetadata; +import org.apache.tez.dag.api.EdgeManagerPluginOnDemand.CompositeEventRouteMetadata; import org.apache.tez.dag.api.UserPayload; import javax.annotation.Nullable; @@ -69,12 +70,12 @@ class CartesianProductEdgeManagerPartitioned extends CartesianProductEdgeManager @Nullable @Override - public EventRouteMetadata routeCompositeDataMovementEventToDestination(int srcTaskId, + public CompositeEventRouteMetadata routeCompositeDataMovementEventToDestination(int srcTaskId, int destTaskId) throws Exception { int partition = CartesianProductCombination.fromTaskId(numPartitions, getIdealTaskId(destTaskId)).getCombination().get(positionId); - return EventRouteMetadata.create(1, new int[]{srcTaskId}, new int[]{partition}); + return CompositeEventRouteMetadata.create(1, srcTaskId, partition); } @Nullable http://git-wip-us.apache.org/repos/asf/tez/blob/43ca78fe/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerReal.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerReal.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerReal.java index 705db05..3e1407c 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerReal.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerReal.java @@ -18,7 +18,9 @@ package org.apache.tez.runtime.library.cartesianproduct; import org.apache.tez.dag.api.EdgeManagerPluginContext; +import org.apache.tez.dag.api.EdgeManagerPluginOnDemand; import org.apache.tez.dag.api.EdgeManagerPluginOnDemand.EventRouteMetadata; +import org.apache.tez.dag.api.EdgeManagerPluginOnDemand.CompositeEventRouteMetadata; /** * base class of cartesian product edge manager implementation @@ -46,8 +48,8 @@ abstract class CartesianProductEdgeManagerReal { int destTaskId) throws Exception; - public abstract EventRouteMetadata routeCompositeDataMovementEventToDestination(int srcTaskId, - int destTaskId) + public abstract CompositeEventRouteMetadata routeCompositeDataMovementEventToDestination(int srcTaskId, + int destTaskId) throws Exception; public abstract EventRouteMetadata routeInputSourceTaskFailedEventToDestination(int srcTaskId, http://git-wip-us.apache.org/repos/asf/tez/blob/43ca78fe/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerUnpartitioned.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerUnpartitioned.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerUnpartitioned.java index cea4142..9e46e95 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerUnpartitioned.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerUnpartitioned.java @@ -63,12 +63,12 @@ class CartesianProductEdgeManagerUnpartitioned extends CartesianProductEdgeManag @Nullable @Override - public EventRouteMetadata routeCompositeDataMovementEventToDestination(int srcTaskId, + public CompositeEventRouteMetadata routeCompositeDataMovementEventToDestination(int srcTaskId, int destTaskId) throws Exception { int index = CartesianProductCombination.fromTaskId(numTasks, destTaskId) .getCombination().get(positionId); - return index == srcTaskId ? EventRouteMetadata.create(1, new int[]{0}, new int[]{0}) : null; + return index == srcTaskId ? CompositeEventRouteMetadata.create(1, 0, 0) : null; } @Nullable http://git-wip-us.apache.org/repos/asf/tez/blob/43ca78fe/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java index 7d9eacf..c1893fc 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java @@ -27,6 +27,7 @@ import java.util.zip.Inflater; import com.google.protobuf.ByteString; +import org.apache.tez.runtime.api.events.CompositeRoutedDataMovementEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.io.compress.CompressionCodec; @@ -93,7 +94,45 @@ public class ShuffleInputEventHandlerImpl implements ShuffleEventHandler { private void handleEvent(Event event) throws IOException { if (event instanceof DataMovementEvent) { numDmeEvents.incrementAndGet(); - processDataMovementEvent((DataMovementEvent)event); + DataMovementEvent dmEvent = (DataMovementEvent)event; + DataMovementEventPayloadProto shufflePayload; + try { + shufflePayload = DataMovementEventPayloadProto.parseFrom(ByteString.copyFrom(dmEvent.getUserPayload())); + } catch (InvalidProtocolBufferException e) { + throw new TezUncheckedException("Unable to parse DataMovementEvent payload", e); + } + BitSet emptyPartitionsBitSet = null; + if (shufflePayload.hasEmptyPartitions()) { + try { + byte[] emptyPartitions = TezCommonUtils.decompressByteStringToByteArray(shufflePayload.getEmptyPartitions(), inflater); + emptyPartitionsBitSet = TezUtilsInternal.fromByteArray(emptyPartitions); + } catch (IOException e) { + throw new TezUncheckedException("Unable to set the empty partition to succeeded", e); + } + } + processDataMovementEvent(dmEvent, shufflePayload, emptyPartitionsBitSet); + shuffleManager.updateEventReceivedTime(); + } else if (event instanceof CompositeRoutedDataMovementEvent) { + CompositeRoutedDataMovementEvent edme = (CompositeRoutedDataMovementEvent)event; + DataMovementEventPayloadProto shufflePayload; + try { + shufflePayload = DataMovementEventPayloadProto.parseFrom(ByteString.copyFrom(edme.getUserPayload())); + } catch (InvalidProtocolBufferException e) { + throw new TezUncheckedException("Unable to parse DataMovementEvent payload", e); + } + BitSet emptyPartitionsBitSet = null; + if (shufflePayload.hasEmptyPartitions()) { + try { + byte[] emptyPartitions = TezCommonUtils.decompressByteStringToByteArray(shufflePayload.getEmptyPartitions(), inflater); + emptyPartitionsBitSet = TezUtilsInternal.fromByteArray(emptyPartitions); + } catch (IOException e) { + throw new TezUncheckedException("Unable to set the empty partition to succeeded", e); + } + } + for (int offset = 0; offset < edme.getCount(); offset++) { + numDmeEvents.incrementAndGet(); + processDataMovementEvent(edme.expand(offset), shufflePayload, emptyPartitionsBitSet); + } shuffleManager.updateEventReceivedTime(); } else if (event instanceof InputFailedEvent) { numObsoletionEvents.incrementAndGet(); @@ -117,14 +156,7 @@ public class ShuffleInputEventHandlerImpl implements ShuffleEventHandler { + (updateOnClose == true ? ", updateOnClose" : "")); } - private void processDataMovementEvent(DataMovementEvent dme) throws IOException { - DataMovementEventPayloadProto shufflePayload; - try { - shufflePayload = DataMovementEventPayloadProto.parseFrom( - ByteString.copyFrom(dme.getUserPayload())); - } catch (InvalidProtocolBufferException e) { - throw new TezUncheckedException("Unable to parse DataMovementEvent payload", e); - } + private void processDataMovementEvent(DataMovementEvent dme, DataMovementEventPayloadProto shufflePayload, BitSet emptyPartitionsBitSet) throws IOException { int srcIndex = dme.getSourceIndex(); if (LOG.isDebugEnabled()) { LOG.debug("DME srcIdx: " + srcIndex + ", targetIndex: " + dme.getTargetIndex() @@ -133,10 +165,7 @@ public class ShuffleInputEventHandlerImpl implements ShuffleEventHandler { } if (shufflePayload.hasEmptyPartitions()) { - byte[] emptyPartitions = TezCommonUtils.decompressByteStringToByteArray(shufflePayload - .getEmptyPartitions(), inflater); - BitSet emptyPartionsBitSet = TezUtilsInternal.fromByteArray(emptyPartitions); - if (emptyPartionsBitSet.get(srcIndex)) { + if (emptyPartitionsBitSet.get(srcIndex)) { InputAttemptIdentifier srcAttemptIdentifier = constructInputAttemptIdentifier(dme, shufflePayload, false); if (LOG.isDebugEnabled()) { http://git-wip-us.apache.org/repos/asf/tez/blob/43ca78fe/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java index f6f6da1..f39affe 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java @@ -25,6 +25,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.zip.Inflater; import com.google.protobuf.ByteString; +import org.apache.tez.runtime.api.events.CompositeRoutedDataMovementEvent; import org.apache.tez.runtime.library.common.shuffle.ShuffleEventHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -81,7 +82,45 @@ public class ShuffleInputEventHandlerOrderedGrouped implements ShuffleEventHandl private void handleEvent(Event event) throws IOException { if (event instanceof DataMovementEvent) { numDmeEvents.incrementAndGet(); - processDataMovementEvent((DataMovementEvent) event); + DataMovementEvent dmEvent = (DataMovementEvent)event; + DataMovementEventPayloadProto shufflePayload; + try { + shufflePayload = DataMovementEventPayloadProto.parseFrom(ByteString.copyFrom(dmEvent.getUserPayload())); + } catch (InvalidProtocolBufferException e) { + throw new TezUncheckedException("Unable to parse DataMovementEvent payload", e); + } + BitSet emptyPartitionsBitSet = null; + if (shufflePayload.hasEmptyPartitions()) { + try { + byte[] emptyPartitions = TezCommonUtils.decompressByteStringToByteArray(shufflePayload.getEmptyPartitions(), inflater); + emptyPartitionsBitSet = TezUtilsInternal.fromByteArray(emptyPartitions); + } catch (IOException e) { + throw new TezUncheckedException("Unable to set the empty partition to succeeded", e); + } + } + processDataMovementEvent(dmEvent, shufflePayload, emptyPartitionsBitSet); + scheduler.updateEventReceivedTime(); + } else if (event instanceof CompositeRoutedDataMovementEvent) { + CompositeRoutedDataMovementEvent edme = (CompositeRoutedDataMovementEvent)event; + DataMovementEventPayloadProto shufflePayload; + try { + shufflePayload = DataMovementEventPayloadProto.parseFrom(ByteString.copyFrom(edme.getUserPayload())); + } catch (InvalidProtocolBufferException e) { + throw new TezUncheckedException("Unable to parse DataMovementEvent payload", e); + } + BitSet emptyPartitionsBitSet = null; + if (shufflePayload.hasEmptyPartitions()) { + try { + byte[] emptyPartitions = TezCommonUtils.decompressByteStringToByteArray(shufflePayload.getEmptyPartitions(), inflater); + emptyPartitionsBitSet = TezUtilsInternal.fromByteArray(emptyPartitions); + } catch (IOException e) { + throw new TezUncheckedException("Unable to set the empty partition to succeeded", e); + } + } + for (int offset = 0; offset < edme.getCount(); offset++) { + numDmeEvents.incrementAndGet(); + processDataMovementEvent(edme.expand(offset), shufflePayload, emptyPartitionsBitSet); + } scheduler.updateEventReceivedTime(); } else if (event instanceof InputFailedEvent) { numObsoletionEvents.incrementAndGet(); @@ -94,13 +133,7 @@ public class ShuffleInputEventHandlerOrderedGrouped implements ShuffleEventHandl } } - private void processDataMovementEvent(DataMovementEvent dmEvent) throws IOException { - DataMovementEventPayloadProto shufflePayload; - try { - shufflePayload = DataMovementEventPayloadProto.parseFrom(ByteString.copyFrom(dmEvent.getUserPayload())); - } catch (InvalidProtocolBufferException e) { - throw new TezUncheckedException("Unable to parse DataMovementEvent payload", e); - } + private void processDataMovementEvent(DataMovementEvent dmEvent, DataMovementEventPayloadProto shufflePayload, BitSet emptyPartitionsBitSet) throws IOException { int partitionId = dmEvent.getSourceIndex(); InputAttemptIdentifier srcAttemptIdentifier = constructInputAttemptIdentifier(dmEvent, shufflePayload); @@ -112,8 +145,6 @@ public class ShuffleInputEventHandlerOrderedGrouped implements ShuffleEventHandl if (shufflePayload.hasEmptyPartitions()) { try { - byte[] emptyPartitions = TezCommonUtils.decompressByteStringToByteArray(shufflePayload.getEmptyPartitions(), inflater); - BitSet emptyPartitionsBitSet = TezUtilsInternal.fromByteArray(emptyPartitions); if (emptyPartitionsBitSet.get(partitionId)) { if (LOG.isDebugEnabled()) { LOG.debug( @@ -125,8 +156,7 @@ public class ShuffleInputEventHandlerOrderedGrouped implements ShuffleEventHandl return; } } catch (IOException e) { - throw new TezUncheckedException("Unable to set " + - "the empty partition to succeeded", e); + throw new TezUncheckedException("Unable to set the empty partition to succeeded", e); } } http://git-wip-us.apache.org/repos/asf/tez/blob/43ca78fe/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestFairShuffleVertexManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestFairShuffleVertexManager.java b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestFairShuffleVertexManager.java index 9c94c14..61ca785 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestFairShuffleVertexManager.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestFairShuffleVertexManager.java @@ -122,22 +122,22 @@ public class TestFairShuffleVertexManager // The first destination task fetches two partitions from all source tasks. // 6 == 3 source tasks * 2 merged partitions Assert.assertEquals(6, edgeManager.getNumDestinationTaskPhysicalInputs(0)); - EdgeManagerPluginOnDemand.EventRouteMetadata routeMetadata; for (int sourceTaskIndex = 0; sourceTaskIndex < 3; sourceTaskIndex++) { for (int j = 0; j < 2; j++) { - routeMetadata = (j == 0) ? - edgeManager.routeCompositeDataMovementEventToDestination( - sourceTaskIndex, 0) : - edgeManager.routeInputSourceTaskFailedEventToDestination( - sourceTaskIndex, 0); - Assert.assertEquals(2, routeMetadata.getNumEvents()); if (j == 0) { - Assert.assertArrayEquals(new int[]{0, 1}, - routeMetadata.getSourceIndices()); + EdgeManagerPluginOnDemand.CompositeEventRouteMetadata routeMetadata = + edgeManager.routeCompositeDataMovementEventToDestination(sourceTaskIndex, 0); + Assert.assertEquals(2, routeMetadata.getCount()); + Assert.assertEquals(0, routeMetadata.getSource()); + Assert.assertEquals(sourceTaskIndex*2, routeMetadata.getTarget()); + } else { + EdgeManagerPluginOnDemand.EventRouteMetadata routeMetadata = + edgeManager.routeInputSourceTaskFailedEventToDestination(sourceTaskIndex, 0); + Assert.assertEquals(2, routeMetadata.getNumEvents()); + Assert.assertArrayEquals( + new int[]{0 + sourceTaskIndex * 2, 1 + sourceTaskIndex * 2}, + routeMetadata.getTargetIndices()); } - Assert.assertArrayEquals( - new int[]{0 + sourceTaskIndex * 2, 1 + sourceTaskIndex * 2}, - routeMetadata.getTargetIndices()); } } } @@ -156,22 +156,22 @@ public class TestFairShuffleVertexManager // The first destination task fetches two partitions from all source tasks. // 6 == 3 source tasks * 2 merged partitions Assert.assertEquals(6, edgeManager.getNumDestinationTaskPhysicalInputs(0)); - EdgeManagerPluginOnDemand.EventRouteMetadata routeMetadata; for (int sourceTaskIndex = 0; sourceTaskIndex < 3; sourceTaskIndex++) { for (int j = 0; j < 2; j++) { - routeMetadata = (j == 0) ? - edgeManager.routeCompositeDataMovementEventToDestination( - sourceTaskIndex, 0) : - edgeManager.routeInputSourceTaskFailedEventToDestination( - sourceTaskIndex, 0); - Assert.assertEquals(2, routeMetadata.getNumEvents()); if (j == 0) { - Assert.assertArrayEquals(new int[]{0, 1}, - routeMetadata.getSourceIndices()); + EdgeManagerPluginOnDemand.CompositeEventRouteMetadata routeMetadata = + edgeManager.routeCompositeDataMovementEventToDestination(sourceTaskIndex, 0); + Assert.assertEquals(2, routeMetadata.getCount()); + Assert.assertEquals(0, routeMetadata.getSource()); + Assert.assertEquals(sourceTaskIndex*2, routeMetadata.getTarget()); + } else { + EdgeManagerPluginOnDemand.EventRouteMetadata routeMetadata = + edgeManager.routeInputSourceTaskFailedEventToDestination(sourceTaskIndex, 0); + Assert.assertEquals(2, routeMetadata.getNumEvents()); + Assert.assertArrayEquals( + new int[]{0 + sourceTaskIndex * 2, 1 + sourceTaskIndex * 2}, + routeMetadata.getTargetIndices()); } - Assert.assertArrayEquals( - new int[]{0 + sourceTaskIndex * 2, 1 + sourceTaskIndex * 2}, - routeMetadata.getTargetIndices()); } } @@ -179,16 +179,18 @@ public class TestFairShuffleVertexManager // task. Assert.assertEquals(1, edgeManager.getNumDestinationTaskPhysicalInputs(1)); for (int j = 0; j < 2; j++) { - routeMetadata = (j == 0) ? - edgeManager.routeCompositeDataMovementEventToDestination( - 0, 1) : - edgeManager.routeInputSourceTaskFailedEventToDestination( - 0, 1); - Assert.assertEquals(1, routeMetadata.getNumEvents()); if (j == 0) { - Assert.assertEquals(2, routeMetadata.getSourceIndices()[0]); + EdgeManagerPluginOnDemand.CompositeEventRouteMetadata routeMetadata = + edgeManager.routeCompositeDataMovementEventToDestination(0, 1); + Assert.assertEquals(1, routeMetadata.getCount()); + Assert.assertEquals(2, routeMetadata.getSource()); + Assert.assertEquals(0, routeMetadata.getTarget()); + } else { + EdgeManagerPluginOnDemand.EventRouteMetadata routeMetadata = + edgeManager.routeInputSourceTaskFailedEventToDestination(0, 1); + Assert.assertEquals(1, routeMetadata.getNumEvents()); + Assert.assertEquals(0, routeMetadata.getTargetIndices()[0]); } - Assert.assertEquals(0, routeMetadata.getTargetIndices()[0]); } // The 3rd destination task fetches one partition from the 2nd and 3rd @@ -196,17 +198,18 @@ public class TestFairShuffleVertexManager Assert.assertEquals(2, edgeManager.getNumDestinationTaskPhysicalInputs(2)); for (int sourceTaskIndex = 1; sourceTaskIndex < 3; sourceTaskIndex++) { for (int j = 0; j < 2; j++) { - routeMetadata = (j == 0) ? - edgeManager.routeCompositeDataMovementEventToDestination( - sourceTaskIndex, 2) : - edgeManager.routeInputSourceTaskFailedEventToDestination( - sourceTaskIndex, 2); - Assert.assertEquals(1, routeMetadata.getNumEvents()); if (j == 0) { - Assert.assertEquals(2, routeMetadata.getSourceIndices()[0]); + EdgeManagerPluginOnDemand.CompositeEventRouteMetadata routeMetadata = + edgeManager.routeCompositeDataMovementEventToDestination(sourceTaskIndex, 2); + Assert.assertEquals(1, routeMetadata.getCount()); + Assert.assertEquals(2, routeMetadata.getSource()); + Assert.assertEquals(sourceTaskIndex - 1, routeMetadata.getTarget()); + } else { + EdgeManagerPluginOnDemand.EventRouteMetadata routeMetadata = + edgeManager.routeInputSourceTaskFailedEventToDestination(sourceTaskIndex, 2); + Assert.assertEquals(1, routeMetadata.getNumEvents()); + Assert.assertEquals(sourceTaskIndex - 1, routeMetadata.getTargetIndices()[0]); } - Assert.assertEquals(sourceTaskIndex - 1, - routeMetadata.getTargetIndices()[0]); } } } http://git-wip-us.apache.org/repos/asf/tez/blob/43ca78fe/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManagerPartitioned.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManagerPartitioned.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManagerPartitioned.java index 8710c55..09f3b52 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManagerPartitioned.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManagerPartitioned.java @@ -19,6 +19,7 @@ package org.apache.tez.runtime.library.cartesianproduct; import org.apache.tez.dag.api.EdgeManagerPluginContext; import org.apache.tez.dag.api.EdgeManagerPluginOnDemand.EventRouteMetadata; +import org.apache.tez.dag.api.EdgeManagerPluginOnDemand.CompositeEventRouteMetadata; import org.apache.tez.dag.api.UserPayload; import org.junit.Before; import org.junit.Test; @@ -61,13 +62,13 @@ public class TestCartesianProductEdgeManagerPartitioned { when(mockContext.getSourceVertexNumTasks()).thenReturn(2); edgeManager.initialize(config); - EventRouteMetadata routingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 1); - assertNotNull(routingData); - assertEquals(1, routingData.getNumEvents()); - assertArrayEquals(new int[]{0}, routingData.getSourceIndices()); - assertArrayEquals(new int[]{1}, routingData.getTargetIndices()); + CompositeEventRouteMetadata compositeRoutingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 1); + assertNotNull(compositeRoutingData); + assertEquals(1, compositeRoutingData.getCount()); + assertEquals(0, compositeRoutingData.getSource()); + assertEquals(1, compositeRoutingData.getTarget()); - routingData = edgeManager.routeDataMovementEventToDestination(1,0,1); + EventRouteMetadata routingData = edgeManager.routeDataMovementEventToDestination(1,0,1); assertNotNull(routingData); assertEquals(1, routingData.getNumEvents()); assertArrayEquals(new int[]{1}, routingData.getTargetIndices()); @@ -92,13 +93,13 @@ public class TestCartesianProductEdgeManagerPartitioned { when(mockContext.getSourceVertexNumTasks()).thenReturn(3); edgeManager.initialize(config); - EventRouteMetadata routingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 1); - assertNotNull(routingData); - assertEquals(1, routingData.getNumEvents()); - assertArrayEquals(new int[]{1}, routingData.getSourceIndices()); - assertArrayEquals(new int[]{1}, routingData.getTargetIndices()); + CompositeEventRouteMetadata compositeRoutingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 1); + assertNotNull(compositeRoutingData); + assertEquals(1, compositeRoutingData.getCount()); + assertEquals(1, compositeRoutingData.getSource()); + assertEquals(1, compositeRoutingData.getTarget()); - routingData = edgeManager.routeInputSourceTaskFailedEventToDestination(1, 1); + EventRouteMetadata routingData = edgeManager.routeInputSourceTaskFailedEventToDestination(1, 1); assertNotNull(routingData); assertEquals(1, routingData.getNumEvents()); assertArrayEquals(new int[]{1}, routingData.getTargetIndices()); @@ -156,13 +157,13 @@ public class TestCartesianProductEdgeManagerPartitioned { when(mockContext.getSourceVertexNumTasks()).thenReturn(2); edgeManager.initialize(config); - EventRouteMetadata routingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 1); - assertNotNull(routingData); - assertEquals(1, routingData.getNumEvents()); - assertArrayEquals(new int[]{2}, routingData.getSourceIndices()); - assertArrayEquals(new int[]{1}, routingData.getTargetIndices()); + CompositeEventRouteMetadata compositeRoutingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 1); + assertNotNull(compositeRoutingData); + assertEquals(1, compositeRoutingData.getCount()); + assertEquals(2, compositeRoutingData.getSource()); + assertEquals(1, compositeRoutingData.getTarget()); - routingData = edgeManager.routeInputSourceTaskFailedEventToDestination(1, 1); + EventRouteMetadata routingData = edgeManager.routeInputSourceTaskFailedEventToDestination(1, 1); assertNotNull(routingData); assertEquals(1, routingData.getNumEvents()); assertArrayEquals(new int[]{1}, routingData.getTargetIndices()); @@ -179,13 +180,13 @@ public class TestCartesianProductEdgeManagerPartitioned { when(mockContext.getSourceVertexNumTasks()).thenReturn(3); edgeManager.initialize(config); - EventRouteMetadata routingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 1); - assertNotNull(routingData); - assertEquals(1, routingData.getNumEvents()); - assertArrayEquals(new int[]{0}, routingData.getSourceIndices()); - assertArrayEquals(new int[]{1}, routingData.getTargetIndices()); + CompositeEventRouteMetadata compositeRoutingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 1); + assertNotNull(compositeRoutingData); + assertEquals(1, compositeRoutingData.getCount()); + assertEquals(0, compositeRoutingData.getSource()); + assertEquals(1, compositeRoutingData.getTarget()); - routingData = edgeManager.routeInputSourceTaskFailedEventToDestination(1, 1); + EventRouteMetadata routingData = edgeManager.routeInputSourceTaskFailedEventToDestination(1, 1); assertNotNull(routingData); assertEquals(1, routingData.getNumEvents()); assertArrayEquals(new int[]{1}, routingData.getTargetIndices()); @@ -218,13 +219,13 @@ public class TestCartesianProductEdgeManagerPartitioned { when(mockContext.getSourceVertexNumTasks()).thenReturn(2); edgeManager.initialize(config); - EventRouteMetadata routingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 1); - assertNotNull(routingData); - assertEquals(1, routingData.getNumEvents()); - assertArrayEquals(new int[]{0}, routingData.getSourceIndices()); - assertArrayEquals(new int[]{1}, routingData.getTargetIndices()); + CompositeEventRouteMetadata compositeRoutingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 1); + assertNotNull(compositeRoutingData); + assertEquals(1, compositeRoutingData.getCount()); + assertEquals(0, compositeRoutingData.getSource()); + assertEquals(1, compositeRoutingData.getTarget()); - routingData = edgeManager.routeInputSourceTaskFailedEventToDestination(1, 1); + EventRouteMetadata routingData = edgeManager.routeInputSourceTaskFailedEventToDestination(1, 1); assertNotNull(routingData); assertEquals(1, routingData.getNumEvents()); assertArrayEquals(new int[]{1}, routingData.getTargetIndices()); @@ -241,13 +242,13 @@ public class TestCartesianProductEdgeManagerPartitioned { when(mockContext.getSourceVertexNumTasks()).thenReturn(3); edgeManager.initialize(config); - EventRouteMetadata routingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 1); - assertNotNull(routingData); - assertEquals(1, routingData.getNumEvents()); - assertArrayEquals(new int[]{0}, routingData.getSourceIndices()); - assertArrayEquals(new int[]{1}, routingData.getTargetIndices()); + CompositeEventRouteMetadata compositeRoutingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 1); + assertNotNull(compositeRoutingData); + assertEquals(1, compositeRoutingData.getCount()); + assertEquals(0, compositeRoutingData.getSource()); + assertEquals(1, compositeRoutingData.getTarget()); - routingData = edgeManager.routeInputSourceTaskFailedEventToDestination(1, 1); + EventRouteMetadata routingData = edgeManager.routeInputSourceTaskFailedEventToDestination(1, 1); assertNotNull(routingData); assertEquals(1, routingData.getNumEvents()); assertArrayEquals(new int[]{1}, routingData.getTargetIndices()); @@ -264,13 +265,13 @@ public class TestCartesianProductEdgeManagerPartitioned { when(mockContext.getSourceVertexNumTasks()).thenReturn(4); edgeManager.initialize(config); - EventRouteMetadata routingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 1); - assertNotNull(routingData); - assertEquals(1, routingData.getNumEvents()); - assertArrayEquals(new int[]{1}, routingData.getSourceIndices()); - assertArrayEquals(new int[]{1}, routingData.getTargetIndices()); + CompositeEventRouteMetadata compositeRoutingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 1); + assertNotNull(compositeRoutingData); + assertEquals(1, compositeRoutingData.getCount()); + assertEquals(1, compositeRoutingData.getSource()); + assertEquals(1, compositeRoutingData.getTarget()); - routingData = edgeManager.routeInputSourceTaskFailedEventToDestination(1, 1); + EventRouteMetadata routingData = edgeManager.routeInputSourceTaskFailedEventToDestination(1, 1); assertNotNull(routingData); assertEquals(1, routingData.getNumEvents()); assertArrayEquals(new int[]{1}, routingData.getTargetIndices()); http://git-wip-us.apache.org/repos/asf/tez/blob/43ca78fe/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManagerUnpartitioned.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManagerUnpartitioned.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManagerUnpartitioned.java index 4c69482..ec97335 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManagerUnpartitioned.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManagerUnpartitioned.java @@ -19,6 +19,7 @@ package org.apache.tez.runtime.library.cartesianproduct; import org.apache.tez.dag.api.EdgeManagerPluginContext; import org.apache.tez.dag.api.EdgeManagerPluginOnDemand.EventRouteMetadata; +import org.apache.tez.dag.api.EdgeManagerPluginOnDemand.CompositeEventRouteMetadata; import org.junit.Before; import org.junit.Test; @@ -56,23 +57,23 @@ public class TestCartesianProductEdgeManagerUnpartitioned { when(mockContext.getSourceVertexNumTasks()).thenReturn(2); edgeManager.initialize(config); - EventRouteMetadata routingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 1); - assertNull(routingData); + CompositeEventRouteMetadata compositeRoutingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 1); + assertNull(compositeRoutingData); - routingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 3); - assertNotNull(routingData); - assertEquals(1, routingData.getNumEvents()); - assertArrayEquals(new int[]{0}, routingData.getTargetIndices()); - assertArrayEquals(new int[]{0}, routingData.getSourceIndices()); + compositeRoutingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 3); + assertNotNull(compositeRoutingData); + assertEquals(1, compositeRoutingData.getCount()); + assertEquals(0, compositeRoutingData.getTarget()); + assertEquals(0, compositeRoutingData.getSource()); - routingData = edgeManager.routeInputSourceTaskFailedEventToDestination(1, 1); + EventRouteMetadata routingData = edgeManager.routeInputSourceTaskFailedEventToDestination(1, 1); assertNull(routingData); - routingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 3); - assertNotNull(routingData); - assertEquals(1, routingData.getNumEvents()); - assertArrayEquals(new int[]{0}, routingData.getTargetIndices()); - assertArrayEquals(new int[]{0}, routingData.getSourceIndices()); + compositeRoutingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 3); + assertNotNull(compositeRoutingData); + assertEquals(1, compositeRoutingData.getCount()); + assertEquals(0, compositeRoutingData.getTarget()); + assertEquals(0, compositeRoutingData.getSource()); assertEquals(0, edgeManager.routeInputErrorEventToSource(1, 0)); @@ -86,23 +87,23 @@ public class TestCartesianProductEdgeManagerUnpartitioned { when(mockContext.getSourceVertexNumTasks()).thenReturn(3); edgeManager.initialize(config); - EventRouteMetadata routingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 2); - assertNull(routingData); + CompositeEventRouteMetadata compositeRoutingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 2); + assertNull(compositeRoutingData); - routingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 1); - assertNotNull(routingData); - assertEquals(1, routingData.getNumEvents()); - assertArrayEquals(new int[]{0}, routingData.getTargetIndices()); - assertArrayEquals(new int[]{0}, routingData.getSourceIndices()); + compositeRoutingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 1); + assertNotNull(compositeRoutingData); + assertEquals(1, compositeRoutingData.getCount()); + assertEquals(0, compositeRoutingData.getTarget()); + assertEquals(0, compositeRoutingData.getSource()); - routingData = edgeManager.routeInputSourceTaskFailedEventToDestination(1, 2); + EventRouteMetadata routingData = edgeManager.routeInputSourceTaskFailedEventToDestination(1, 2); assertNull(routingData); - routingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 1); - assertNotNull(routingData); - assertEquals(1, routingData.getNumEvents()); - assertArrayEquals(new int[]{0}, routingData.getTargetIndices()); - assertArrayEquals(new int[]{0}, routingData.getSourceIndices()); + compositeRoutingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 1); + assertNotNull(compositeRoutingData); + assertEquals(1, compositeRoutingData.getCount()); + assertEquals(0, compositeRoutingData.getTarget()); + assertEquals(0, compositeRoutingData.getSource()); assertEquals(1, edgeManager.routeInputErrorEventToSource(1, 0)); @@ -130,23 +131,23 @@ public class TestCartesianProductEdgeManagerUnpartitioned { when(mockContext.getSourceVertexNumTasks()).thenReturn(2); edgeManager.initialize(config); - EventRouteMetadata routingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 1); - assertNull(routingData); + CompositeEventRouteMetadata compositeRoutingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 1); + assertNull(compositeRoutingData); - routingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 12); - assertNotNull(routingData); - assertEquals(1, routingData.getNumEvents()); - assertArrayEquals(new int[]{0}, routingData.getTargetIndices()); - assertArrayEquals(new int[]{0}, routingData.getSourceIndices()); + compositeRoutingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 12); + assertNotNull(compositeRoutingData); + assertEquals(1, compositeRoutingData.getCount()); + assertEquals(0, compositeRoutingData.getTarget()); + assertEquals(0, compositeRoutingData.getSource()); - routingData = edgeManager.routeInputSourceTaskFailedEventToDestination(1, 1); + EventRouteMetadata routingData = edgeManager.routeInputSourceTaskFailedEventToDestination(1, 1); assertNull(routingData); - routingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 12); - assertNotNull(routingData); - assertEquals(1, routingData.getNumEvents()); - assertArrayEquals(new int[]{0}, routingData.getTargetIndices()); - assertArrayEquals(new int[]{0}, routingData.getSourceIndices()); + compositeRoutingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 12); + assertNotNull(compositeRoutingData); + assertEquals(1, compositeRoutingData.getCount()); + assertEquals(0, compositeRoutingData.getTarget()); + assertEquals(0, compositeRoutingData.getSource()); assertEquals(0, edgeManager.routeInputErrorEventToSource(1, 0)); @@ -160,23 +161,23 @@ public class TestCartesianProductEdgeManagerUnpartitioned { when(mockContext.getSourceVertexNumTasks()).thenReturn(3); edgeManager.initialize(config); - EventRouteMetadata routingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 1); - assertNull(routingData); + CompositeEventRouteMetadata compositeRoutingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 1); + assertNull(compositeRoutingData); - routingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 16); - assertNotNull(routingData); - assertEquals(1, routingData.getNumEvents()); - assertArrayEquals(new int[]{0}, routingData.getTargetIndices()); - assertArrayEquals(new int[]{0}, routingData.getSourceIndices()); + compositeRoutingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 16); + assertNotNull(compositeRoutingData); + assertEquals(1, compositeRoutingData.getCount()); + assertEquals(0, compositeRoutingData.getTarget()); + assertEquals(0, compositeRoutingData.getSource()); - routingData = edgeManager.routeInputSourceTaskFailedEventToDestination(1, 1); + EventRouteMetadata routingData = edgeManager.routeInputSourceTaskFailedEventToDestination(1, 1); assertNull(routingData); - routingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 16); - assertNotNull(routingData); - assertEquals(1, routingData.getNumEvents()); - assertArrayEquals(new int[]{0}, routingData.getTargetIndices()); - assertArrayEquals(new int[]{0}, routingData.getSourceIndices()); + compositeRoutingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 16); + assertNotNull(compositeRoutingData); + assertEquals(1, compositeRoutingData.getCount()); + assertEquals(0, compositeRoutingData.getTarget()); + assertEquals(0, compositeRoutingData.getSource()); assertEquals(0, edgeManager.routeInputErrorEventToSource(1, 0)); @@ -190,23 +191,23 @@ public class TestCartesianProductEdgeManagerUnpartitioned { when(mockContext.getSourceVertexNumTasks()).thenReturn(4); edgeManager.initialize(config); - EventRouteMetadata routingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 0); - assertNull(routingData); + CompositeEventRouteMetadata compositeRoutingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 0); + assertNull(compositeRoutingData); - routingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 13); - assertNotNull(routingData); - assertEquals(1, routingData.getNumEvents()); - assertArrayEquals(new int[]{0}, routingData.getTargetIndices()); - assertArrayEquals(new int[]{0}, routingData.getSourceIndices()); + compositeRoutingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 13); + assertNotNull(compositeRoutingData); + assertEquals(1, compositeRoutingData.getCount()); + assertEquals(0, compositeRoutingData.getTarget()); + assertEquals(0, compositeRoutingData.getSource()); - routingData = edgeManager.routeInputSourceTaskFailedEventToDestination(1, 0); + EventRouteMetadata routingData = edgeManager.routeInputSourceTaskFailedEventToDestination(1, 0); assertNull(routingData); - routingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 13); - assertNotNull(routingData); - assertEquals(1, routingData.getNumEvents()); - assertArrayEquals(new int[]{0}, routingData.getTargetIndices()); - assertArrayEquals(new int[]{0}, routingData.getSourceIndices()); + compositeRoutingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 13); + assertNotNull(compositeRoutingData); + assertEquals(1, compositeRoutingData.getCount()); + assertEquals(0, compositeRoutingData.getTarget()); + assertEquals(0, compositeRoutingData.getSource()); assertEquals(1, edgeManager.routeInputErrorEventToSource(1, 0)); http://git-wip-us.apache.org/repos/asf/tez/blob/43ca78fe/tez-tests/src/test/java/org/apache/tez/test/TestExceptionPropagation.java ---------------------------------------------------------------------- diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestExceptionPropagation.java b/tez-tests/src/test/java/org/apache/tez/test/TestExceptionPropagation.java index 7d88fdf..438a5aa 100644 --- a/tez-tests/src/test/java/org/apache/tez/test/TestExceptionPropagation.java +++ b/tez-tests/src/test/java/org/apache/tez/test/TestExceptionPropagation.java @@ -845,7 +845,7 @@ public class TestExceptionPropagation { } @Override - public EventRouteMetadata routeCompositeDataMovementEventToDestination( + public CompositeEventRouteMetadata routeCompositeDataMovementEventToDestination( int sourceTaskIndex, int destinationTaskIndex) throws Exception { if (exLocation == ExceptionLocation.EM_RouteDataMovementEventToDestination) {
