Repository: tez Updated Branches: refs/heads/master a4247a7cd -> 8bfbdfefa
TEZ-3356. Fix initializing of stats when custom ShuffleVertexManager is used. (Peter Slawski via hitesh) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/8bfbdfef Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/8bfbdfef Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/8bfbdfef Branch: refs/heads/master Commit: 8bfbdfefa4bf9a87acd90eb4582fe6a621fb2389 Parents: a4247a7 Author: Hitesh Shah <[email protected]> Authored: Tue Jul 19 14:29:56 2016 -0700 Committer: Hitesh Shah <[email protected]> Committed: Tue Jul 19 14:29:56 2016 -0700 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../tez/dag/app/dag/impl/TestVertexImpl.java | 167 +++++++++++++++++-- .../test/GraceShuffleVertexManagerForTest.java | 159 ++++++++++++++++++ .../vertexmanager/ShuffleVertexManager.java | 8 +- 4 files changed, 320 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/8bfbdfef/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index a9ff8cc..6e29110 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-3356. Fix initializing of stats when custom ShuffleVertexManager is used. TEZ-3235. Modify Example TestOrderedWordCount job to test the IPC limit for large dag plans. TEZ-3337. Do not log empty fields of TaskAttemptFinishedEvent to avoid confusion. TEZ-3303. Have ShuffleVertexManager consume more precise partition stats. @@ -82,6 +83,7 @@ INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-3356. Fix initializing of stats when custom ShuffleVertexManager is used. TEZ-3329. Tez ATS data is incomplete for a vertex which fails or gets killed before initialization TEZ-3235. Modify Example TestOrderedWordCount job to test the IPC limit for large dag plans. TEZ-3337. 
Do not log empty fields of TaskAttemptFinishedEvent to avoid confusion. http://git-wip-us.apache.org/repos/asf/tez/blob/8bfbdfef/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java index d165272..06ae442 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java @@ -40,6 +40,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -61,6 +62,7 @@ import org.apache.tez.runtime.api.TaskFailureType; import org.apache.tez.runtime.api.VertexStatistics; import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils; import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads; +import org.apache.tez.test.GraceShuffleVertexManagerForTest; import org.roaringbitmap.RoaringBitmap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -2256,6 +2258,102 @@ public class TestVertexImpl { return dag; } + private DAGPlan createDAGPlanForGraceParallelism() throws IOException { + LOG.info("Setting up grace parallelism dag plan"); + return DAGPlan.newBuilder() + .setName("GraceParallelismDAG") + .addVertex( + VertexPlan.newBuilder() + .setName("A") + .setProcessorDescriptor(TezEntityDescriptorProto.newBuilder().setClassName("A.class")) + .setType(PlanVertexType.NORMAL) + .setTaskConfig( + PlanTaskConfiguration.newBuilder() + .setNumTasks(1) + .setVirtualCores(4) + .setMemoryMb(1024) + .setJavaOpts("") + .setTaskModule("") + .build() + ) + .addOutEdgeId("A_B") + .build() + ) + .addVertex( + 
VertexPlan.newBuilder() + .setName("B") + .setProcessorDescriptor(TezEntityDescriptorProto.newBuilder().setClassName("B.class")) + .setType(PlanVertexType.NORMAL) + .setTaskConfig( + PlanTaskConfiguration.newBuilder() + .setNumTasks(1) + .setVirtualCores(4) + .setMemoryMb(1024) + .setJavaOpts("") + .setTaskModule("") + .build() + ) + .addInEdgeId("A_B") + .addOutEdgeId("B_C") + .build() + ) + .addVertex( + VertexPlan.newBuilder() + .setName("C") + .setProcessorDescriptor(TezEntityDescriptorProto.newBuilder().setClassName("C.class")) + .setType(PlanVertexType.NORMAL) + .setTaskConfig( + PlanTaskConfiguration.newBuilder() + .setNumTasks(-1) + .setVirtualCores(4) + .setMemoryMb(1024) + .setJavaOpts("") + .setTaskModule("") + .build() + ) + .setVertexManagerPlugin(TezEntityDescriptorProto.newBuilder() + .setClassName(GraceShuffleVertexManagerForTest.class.getName()) + .setTezUserPayload( + DAGProtos.TezUserPayloadProto.newBuilder() + .setUserPayload( + GraceShuffleVertexManagerForTest.newConfBuilder() + .setGrandparentVertex("A") + .setDesiredParallelism(1) + .toByteString() + ) + .build() + ) + ) + .addInEdgeId("B_C") + .build() + ) + .addEdge( + EdgePlan.newBuilder() + .setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("A_B")) + .setInputVertexName("A") + .setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("A_B.class")) + .setOutputVertexName("B") + .setDataMovementType(PlanEdgeDataMovementType.SCATTER_GATHER) + .setId("A_B") + .setDataSourceType(PlanEdgeDataSourceType.PERSISTED) + .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL) + .build() + ) + .addEdge( + EdgePlan.newBuilder() + .setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("B_C")) + .setInputVertexName("B") + .setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("B_C.class")) + .setOutputVertexName("C") + .setDataMovementType(PlanEdgeDataMovementType.SCATTER_GATHER) + .setId("B_C") + .setDataSourceType(PlanEdgeDataSourceType.PERSISTED) + 
.setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL) + .build() + ) + .build(); + } + private void setupVertices() { int vCnt = dagPlan.getVertexCount(); LOG.info("Setting up vertices from dag plan, verticesCnt=" + vCnt); @@ -2538,6 +2636,17 @@ public class TestVertexImpl { } } + private void completeAllTasksSuccessfully(Vertex v) { + Assert.assertEquals(VertexState.RUNNING, v.getState()); + Set<TezTaskID> tasks = v.getTasks().keySet(); + Assert.assertFalse(tasks.isEmpty()); + for (TezTaskID task : tasks) { + dispatcher.getEventHandler() + .handle(new VertexEventTaskCompleted(task, TaskState.SUCCEEDED)); + } + dispatcher.await(); + } + @Test(timeout = 5000) public void testVertexInit() throws AMUserCodeException { initAllVertices(VertexState.INITED); @@ -5883,16 +5992,9 @@ public class TestVertexImpl { Assert.assertEquals(VertexState.INITED, vC.getState()); //Send VertexManagerEvent - long[] sizes = new long[]{(100 * 1000l * 1000l)}; - Event vmEvent = getVertexManagerEvent(sizes, 1060000000, "B"); - - TezTaskAttemptID taId = TezTaskAttemptID.getInstance( - TezTaskID.getInstance(vC.getVertexId(), 1), 1); - EventMetaData sourceInfo = new EventMetaData(EventProducerConsumerType.INPUT, "B", "C", taId); - TezEvent tezEvent = new TezEvent(vmEvent, sourceInfo); - dispatcher.getEventHandler().handle(new VertexEventRouteEvent(vC.getVertexId(), - Lists.newArrayList(tezEvent))); - dispatcher.await(); + long[] sizes = new long[]{(100_000_000L)}; + Event vmEvent = getVertexManagerEvent(sizes, 1_060_000_000L, vB); + sendTaskGeneratedEvent(vmEvent, EventProducerConsumerType.INPUT, vC, vB); Assert.assertEquals(VertexState.INITED, vC.getState()); //vB start @@ -5902,7 +6004,48 @@ public class TestVertexImpl { } - VertexManagerEvent getVertexManagerEvent(long[] sizes, long totalSize, String vertexName) + @Test(timeout = 5000) + public void testVertexGraceParallelism() throws IOException, TezException { + setupPreDagCreation(); + dagPlan = createDAGPlanForGraceParallelism(); + 
setupPostDagCreation(); + + VertexImpl vA = vertices.get("A"); + VertexImpl vB = vertices.get("B"); + VertexImpl vC = vertices.get("C"); + + initVertex(vA); + Assert.assertEquals(VertexState.INITED, vA.getState()); + Assert.assertEquals(VertexState.INITED, vB.getState()); + Assert.assertEquals(VertexState.INITIALIZING, vC.getState()); + + long[] sizes = new long[]{(100_000_000L)}; + Event vmEvent = getVertexManagerEvent(sizes, 1_060_000_000L, vC); + sendTaskGeneratedEvent(vmEvent, EventProducerConsumerType.OUTPUT, vB, vC); + Assert.assertEquals(VertexState.INITIALIZING, vC.getState()); + + startVertex(vA); + completeAllTasksSuccessfully(vA); + Assert.assertEquals(VertexState.SUCCEEDED, vA.getState()); + Assert.assertEquals(VertexState.RUNNING, vC.getState()); + } + + private void sendTaskGeneratedEvent(Event event, EventProducerConsumerType generator, + Vertex taskVertex, Vertex edgeVertex) { + TezTaskAttemptID taId = TezTaskAttemptID.getInstance( + TezTaskID.getInstance(taskVertex.getVertexId(), 1), 1); + EventMetaData sourceInfo = new EventMetaData(generator, + taskVertex.getName(), edgeVertex.getName(), taId); + sendVertexEventRouteEvent(taskVertex, new TezEvent(event, sourceInfo)); + } + + private void sendVertexEventRouteEvent(Vertex sourceVertex, TezEvent... 
tezEvents) { + dispatcher.getEventHandler().handle(new VertexEventRouteEvent(sourceVertex.getVertexId(), + Arrays.asList(tezEvents))); + dispatcher.await(); + } + + private VertexManagerEvent getVertexManagerEvent(long[] sizes, long totalSize, Vertex vertex) throws IOException { ByteBuffer payload = null; if (sizes != null) { @@ -5924,7 +6067,7 @@ public class TestVertexImpl { .build().toByteString() .asReadOnlyByteBuffer(); } - VertexManagerEvent vmEvent = VertexManagerEvent.create(vertexName, payload); + VertexManagerEvent vmEvent = VertexManagerEvent.create(vertex.getName(), payload); return vmEvent; } http://git-wip-us.apache.org/repos/asf/tez/blob/8bfbdfef/tez-dag/src/test/java/org/apache/tez/test/GraceShuffleVertexManagerForTest.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/test/GraceShuffleVertexManagerForTest.java b/tez-dag/src/test/java/org/apache/tez/test/GraceShuffleVertexManagerForTest.java new file mode 100644 index 0000000..40a6bd3 --- /dev/null +++ b/tez-dag/src/test/java/org/apache/tez/test/GraceShuffleVertexManagerForTest.java @@ -0,0 +1,159 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tez.test; + +import com.google.common.base.Preconditions; +import com.google.protobuf.ByteString; + +import org.apache.hadoop.conf.Configuration; +import org.apache.tez.common.TezUtils; +import org.apache.tez.dag.api.TezUncheckedException; +import org.apache.tez.dag.api.VertexManagerPluginContext; +import org.apache.tez.dag.api.event.VertexState; +import org.apache.tez.dag.api.event.VertexStateUpdate; +import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.EnumSet; + +/** + * A shuffle vertex manager that will set the vertex's parallelism upon + * completion of its single grandparent simulating a very simplified + * version of PigGraceShuffleVertexManager for testing purposes. + * + * This manager plugin should only be used for vertices that have a single + * grandparent. + */ +public final class GraceShuffleVertexManagerForTest extends ShuffleVertexManager { + + private static final Logger logger = LoggerFactory.getLogger(GraceShuffleVertexManagerForTest.class); + + private GraceConf graceConf; + private boolean isParallelismSet = false; + + public GraceShuffleVertexManagerForTest(VertexManagerPluginContext context) { + super(context); + } + + @Override + public void initialize() { + try { + Configuration conf = TezUtils.createConfFromUserPayload(getContext().getUserPayload()); + graceConf = GraceConf.fromConfiguration(conf); + } catch (IOException e) { + throw new TezUncheckedException(e); + } + getContext().registerForVertexStateUpdates(graceConf.grandparentVertex, + EnumSet.of(VertexState.SUCCEEDED)); + logger.info("Watching {}", graceConf.grandparentVertex); + super.initialize(); + } + + @Override + public synchronized void onVertexStateUpdated(VertexStateUpdate stateUpdate) { + logger.info("Received onVertexStateUpdated"); + + String vertexName = stateUpdate.getVertexName(); + VertexState vertexState = 
stateUpdate.getVertexState(); + + Preconditions.checkState(graceConf != null, + "Received state notification %s for vertex %s in vertex %s before manager was initialized", + vertexState, vertexName, getContext().getVertexName()); + + if (!shouldSetParallelism(stateUpdate)) { + return; + } + getContext().reconfigureVertex(graceConf.desiredParallelism, null, null); + isParallelismSet = true; + + logger.info("Initialize parallelism for {} to {}", + getContext().getVertexName(), graceConf.desiredParallelism); + } + + private boolean shouldSetParallelism(VertexStateUpdate update) { + return !isParallelismSet && + update.getVertexState().equals(VertexState.SUCCEEDED) && + update.getVertexName().equals(graceConf.grandparentVertex); + } + + private static final class GraceConf { + + static final String TEST_GRACE_GRANDPARENT_VERTEX = "test.grace.grandparent-vertex"; + static final String TEST_GRACE_DESIRED_PARALLELISM = "test.grace.desired-parallelism"; + + final String grandparentVertex; + final int desiredParallelism; + + GraceConf(GraceConfBuilder builder) { + grandparentVertex = builder.grandparentVertex; + desiredParallelism = builder.desiredParallelism; + } + + static GraceConf fromConfiguration(Configuration conf) { + return newConfBuilder() + .setGrandparentVertex(conf.get(TEST_GRACE_GRANDPARENT_VERTEX)) + .setDesiredParallelism(conf.getInt(TEST_GRACE_DESIRED_PARALLELISM, -1)) + .build(); + } + + Configuration toConfiguration() { + Configuration conf = new Configuration(); + conf.set(TEST_GRACE_GRANDPARENT_VERTEX, grandparentVertex); + conf.setInt(TEST_GRACE_DESIRED_PARALLELISM, desiredParallelism); + return conf; + } + } + + public static GraceConfBuilder newConfBuilder() { + return new GraceConfBuilder(); + } + + public static final class GraceConfBuilder { + + private String grandparentVertex; + private int desiredParallelism; + + private GraceConfBuilder() { + } + + public GraceConfBuilder setGrandparentVertex(String grandparentVertex) { +
this.grandparentVertex = grandparentVertex; + return this; + } + + public GraceConfBuilder setDesiredParallelism(int desiredParallelism) { + this.desiredParallelism = desiredParallelism; + return this; + } + + public ByteString toByteString() throws IOException { + return TezUtils.createByteStringFromConf(build().toConfiguration()); + } + + private GraceConf build() { + Preconditions.checkNotNull(grandparentVertex, + "Grandparent vertex is required"); + Preconditions.checkArgument(desiredParallelism > 0, + "Desired parallelism must be greater than 0"); + return new GraceConf(this); + } + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/8bfbdfef/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java index c8a1f30..c5278dd 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java @@ -508,14 +508,14 @@ public class ShuffleVertexManager extends VertexManagerPlugin { } pendingStateUpdates.clear(); + // track the tasks in this vertex + updatePendingTasks(); + for (VertexManagerEvent vmEvent : pendingVMEvents) { handleVertexManagerEvent(vmEvent); } pendingVMEvents.clear(); - - // track the tasks in this vertex - updatePendingTasks(); - + LOG.info("OnVertexStarted vertex: " + getContext().getVertexName() + " with " + totalNumBipartiteSourceTasks + " source tasks and " + totalTasksToSchedule + " pending tasks");
