Repository: tez
Updated Branches:
refs/heads/branch-0.8 62dd11ed0 -> 71ec8564f
TEZ-3356. Fix initializing of stats when custom ShuffleVertexManager is used.
(Peter Slawski via hitesh)
(cherry picked from commit 8bfbdfefa4bf9a87acd90eb4582fe6a621fb2389)
Conflicts:
CHANGES.txt
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/71ec8564
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/71ec8564
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/71ec8564
Branch: refs/heads/branch-0.8
Commit: 71ec8564f72de5277f11592d9dc3626ac95166fc
Parents: 62dd11e
Author: Hitesh Shah <[email protected]>
Authored: Tue Jul 19 14:29:56 2016 -0700
Committer: Hitesh Shah <[email protected]>
Committed: Tue Jul 19 14:34:20 2016 -0700
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../tez/dag/app/dag/impl/TestVertexImpl.java | 167 +++++++++++++++++--
.../test/GraceShuffleVertexManagerForTest.java | 159 ++++++++++++++++++
.../vertexmanager/ShuffleVertexManager.java | 8 +-
4 files changed, 319 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/71ec8564/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 8684efa..1419aa6 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-3356. Fix initializing of stats when custom ShuffleVertexManager is used.
TEZ-3329. Tez ATS data is incomplete for a vertex which fails or gets killed
before initialization.
TEZ-3235. Modify Example TestOrderedWordCount job to test the IPC limit for
large dag plans.
TEZ-3337. Do not log empty fields of TaskAttemptFinishedEvent to avoid
confusion.
http://git-wip-us.apache.org/repos/asf/tez/blob/71ec8564/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git
a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index d165272..06ae442 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -40,6 +40,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -61,6 +62,7 @@ import org.apache.tez.runtime.api.TaskFailureType;
import org.apache.tez.runtime.api.VertexStatistics;
import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
+import org.apache.tez.test.GraceShuffleVertexManagerForTest;
import org.roaringbitmap.RoaringBitmap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -2256,6 +2258,102 @@ public class TestVertexImpl {
return dag;
}
+ /**
+ * Builds a three-vertex plan {@code A -> B -> C} joined by SCATTER_GATHER,
+ * PERSISTED, SEQUENTIAL edges "A_B" and "B_C". A and B run one task each;
+ * C starts with -1 tasks and is managed by GraceShuffleVertexManagerForTest,
+ * configured with grandparent "A" and desired parallelism 1.
+ *
+ * @throws IOException if the vertex manager user payload cannot be serialized
+ */
+ private DAGPlan createDAGPlanForGraceParallelism() throws IOException {
+ LOG.info("Setting up grace parallelism dag plan");
+ return DAGPlan.newBuilder()
+ .setName("GraceParallelismDAG")
+ .addVertex(
+ VertexPlan.newBuilder()
+ .setName("A")
+
.setProcessorDescriptor(TezEntityDescriptorProto.newBuilder().setClassName("A.class"))
+ .setType(PlanVertexType.NORMAL)
+ .setTaskConfig(
+ PlanTaskConfiguration.newBuilder()
+ .setNumTasks(1)
+ .setVirtualCores(4)
+ .setMemoryMb(1024)
+ .setJavaOpts("")
+ .setTaskModule("")
+ .build()
+ )
+ .addOutEdgeId("A_B")
+ .build()
+ )
+ .addVertex(
+ VertexPlan.newBuilder()
+ .setName("B")
+
.setProcessorDescriptor(TezEntityDescriptorProto.newBuilder().setClassName("B.class"))
+ .setType(PlanVertexType.NORMAL)
+ .setTaskConfig(
+ PlanTaskConfiguration.newBuilder()
+ .setNumTasks(1)
+ .setVirtualCores(4)
+ .setMemoryMb(1024)
+ .setJavaOpts("")
+ .setTaskModule("")
+ .build()
+ )
+ .addInEdgeId("A_B")
+ .addOutEdgeId("B_C")
+ .build()
+ )
+ .addVertex(
+ VertexPlan.newBuilder()
+ .setName("C")
+
.setProcessorDescriptor(TezEntityDescriptorProto.newBuilder().setClassName("C.class"))
+ .setType(PlanVertexType.NORMAL)
+ .setTaskConfig(
+ PlanTaskConfiguration.newBuilder()
+ // -1: C's parallelism stays undetermined until its vertex manager reconfigures it
+ .setNumTasks(-1)
+ .setVirtualCores(4)
+ .setMemoryMb(1024)
+ .setJavaOpts("")
+ .setTaskModule("")
+ .build()
+ )
+ // Reconfigures C to the desired parallelism once grandparent A has SUCCEEDED
+ .setVertexManagerPlugin(TezEntityDescriptorProto.newBuilder()
+
.setClassName(GraceShuffleVertexManagerForTest.class.getName())
+ .setTezUserPayload(
+ DAGProtos.TezUserPayloadProto.newBuilder()
+ .setUserPayload(
+
GraceShuffleVertexManagerForTest.newConfBuilder()
+ .setGrandparentVertex("A")
+ .setDesiredParallelism(1)
+ .toByteString()
+ )
+ .build()
+ )
+ )
+ .addInEdgeId("B_C")
+ .build()
+ )
+ .addEdge(
+ EdgePlan.newBuilder()
+
.setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("A_B"))
+ .setInputVertexName("A")
+
.setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("A_B.class"))
+ .setOutputVertexName("B")
+ .setDataMovementType(PlanEdgeDataMovementType.SCATTER_GATHER)
+ .setId("A_B")
+ .setDataSourceType(PlanEdgeDataSourceType.PERSISTED)
+ .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL)
+ .build()
+ )
+ .addEdge(
+ EdgePlan.newBuilder()
+
.setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("B_C"))
+ .setInputVertexName("B")
+
.setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("B_C.class"))
+ .setOutputVertexName("C")
+ .setDataMovementType(PlanEdgeDataMovementType.SCATTER_GATHER)
+ .setId("B_C")
+ .setDataSourceType(PlanEdgeDataSourceType.PERSISTED)
+ .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL)
+ .build()
+ )
+ .build();
+ }
+
private void setupVertices() {
int vCnt = dagPlan.getVertexCount();
LOG.info("Setting up vertices from dag plan, verticesCnt=" + vCnt);
@@ -2538,6 +2636,17 @@ public class TestVertexImpl {
}
}
+ /**
+ * Marks every task of {@code v} as SUCCEEDED by sending one
+ * VertexEventTaskCompleted per task, then awaits the dispatcher.
+ * Asserts beforehand that {@code v} is RUNNING and has at least one task.
+ */
+ private void completeAllTasksSuccessfully(Vertex v) {
+ Assert.assertEquals(VertexState.RUNNING, v.getState());
+ Set<TezTaskID> tasks = v.getTasks().keySet();
+ Assert.assertFalse(tasks.isEmpty());
+ for (TezTaskID task : tasks) {
+ dispatcher.getEventHandler()
+ .handle(new VertexEventTaskCompleted(task, TaskState.SUCCEEDED));
+ }
+ dispatcher.await();
+ }
+
@Test(timeout = 5000)
public void testVertexInit() throws AMUserCodeException {
initAllVertices(VertexState.INITED);
@@ -5883,16 +5992,9 @@ public class TestVertexImpl {
Assert.assertEquals(VertexState.INITED, vC.getState());
//Send VertexManagerEvent
- long[] sizes = new long[]{(100 * 1000l * 1000l)};
- Event vmEvent = getVertexManagerEvent(sizes, 1060000000, "B");
-
- TezTaskAttemptID taId = TezTaskAttemptID.getInstance(
- TezTaskID.getInstance(vC.getVertexId(), 1), 1);
- EventMetaData sourceInfo = new
EventMetaData(EventProducerConsumerType.INPUT, "B", "C", taId);
- TezEvent tezEvent = new TezEvent(vmEvent, sourceInfo);
- dispatcher.getEventHandler().handle(new
VertexEventRouteEvent(vC.getVertexId(),
- Lists.newArrayList(tezEvent)));
- dispatcher.await();
+ long[] sizes = new long[]{(100_000_000L)};
+ Event vmEvent = getVertexManagerEvent(sizes, 1_060_000_000L, vB);
+ sendTaskGeneratedEvent(vmEvent, EventProducerConsumerType.INPUT, vC, vB);
Assert.assertEquals(VertexState.INITED, vC.getState());
//vB start
@@ -5902,7 +6004,48 @@ public class TestVertexImpl {
}
- VertexManagerEvent getVertexManagerEvent(long[] sizes, long totalSize,
String vertexName)
+ /**
+ * TEZ-3356: vertex C is managed by a custom ShuffleVertexManager subclass
+ * that only sets C's parallelism once grandparent A has SUCCEEDED. A
+ * VertexManagerEvent (built from {@code sizes} and a total size) is routed
+ * through B while C is still INITIALIZING; the test then checks that C
+ * reaches RUNNING once A completes.
+ */
+ @Test(timeout = 5000)
+ public void testVertexGraceParallelism() throws IOException, TezException {
+ setupPreDagCreation();
+ dagPlan = createDAGPlanForGraceParallelism();
+ setupPostDagCreation();
+
+ VertexImpl vA = vertices.get("A");
+ VertexImpl vB = vertices.get("B");
+ VertexImpl vC = vertices.get("C");
+
+ initVertex(vA);
+ Assert.assertEquals(VertexState.INITED, vA.getState());
+ Assert.assertEquals(VertexState.INITED, vB.getState());
+ // C has no parallelism yet (-1 tasks), so it cannot leave INITIALIZING
+ Assert.assertEquals(VertexState.INITIALIZING, vC.getState());
+
+ long[] sizes = new long[]{(100_000_000L)};
+ Event vmEvent = getVertexManagerEvent(sizes, 1_060_000_000L, vC);
+ sendTaskGeneratedEvent(vmEvent, EventProducerConsumerType.OUTPUT, vB, vC);
+ Assert.assertEquals(VertexState.INITIALIZING, vC.getState());
+
+ startVertex(vA);
+ completeAllTasksSuccessfully(vA);
+ Assert.assertEquals(VertexState.SUCCEEDED, vA.getState());
+ Assert.assertEquals(VertexState.RUNNING, vC.getState());
+ }
+
+ /**
+ * Wraps {@code event} in a TezEvent whose EventMetaData names
+ * {@code taskVertex} as the task vertex and {@code edgeVertex} as the edge
+ * vertex, attributed to attempt 1 of task 1 of {@code taskVertex}, and
+ * routes it via {@code taskVertex}.
+ */
+ private void sendTaskGeneratedEvent(Event event, EventProducerConsumerType
generator,
+ Vertex taskVertex, Vertex edgeVertex) {
+ TezTaskAttemptID taId = TezTaskAttemptID.getInstance(
+ TezTaskID.getInstance(taskVertex.getVertexId(), 1), 1);
+ EventMetaData sourceInfo = new EventMetaData(generator,
+ taskVertex.getName(), edgeVertex.getName(), taId);
+ sendVertexEventRouteEvent(taskVertex, new TezEvent(event, sourceInfo));
+ }
+
+ /**
+ * Sends a VertexEventRouteEvent addressed to {@code sourceVertex} that
+ * carries {@code tezEvents}, then awaits the dispatcher.
+ */
+ private void sendVertexEventRouteEvent(Vertex sourceVertex, TezEvent...
tezEvents) {
+ dispatcher.getEventHandler().handle(new
VertexEventRouteEvent(sourceVertex.getVertexId(),
+ Arrays.asList(tezEvents)));
+ dispatcher.await();
+ }
+
+ private VertexManagerEvent getVertexManagerEvent(long[] sizes, long
totalSize, Vertex vertex)
throws IOException {
ByteBuffer payload = null;
if (sizes != null) {
@@ -5924,7 +6067,7 @@ public class TestVertexImpl {
.build().toByteString()
.asReadOnlyByteBuffer();
}
- VertexManagerEvent vmEvent = VertexManagerEvent.create(vertexName,
payload);
+ VertexManagerEvent vmEvent = VertexManagerEvent.create(vertex.getName(),
payload);
return vmEvent;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/71ec8564/tez-dag/src/test/java/org/apache/tez/test/GraceShuffleVertexManagerForTest.java
----------------------------------------------------------------------
diff --git
a/tez-dag/src/test/java/org/apache/tez/test/GraceShuffleVertexManagerForTest.java
b/tez-dag/src/test/java/org/apache/tez/test/GraceShuffleVertexManagerForTest.java
new file mode 100644
index 0000000..40a6bd3
--- /dev/null
+++
b/tez-dag/src/test/java/org/apache/tez/test/GraceShuffleVertexManagerForTest.java
@@ -0,0 +1,205 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.test;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.ByteString;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.VertexManagerPluginContext;
+import org.apache.tez.dag.api.event.VertexState;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
+import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.EnumSet;
+
+/**
+ * A shuffle vertex manager that sets the managed vertex's parallelism once
+ * its single grandparent vertex has SUCCEEDED, simulating a very simplified
+ * version of PigGraceShuffleVertexManager for testing purposes.
+ *
+ * <p>Configuration is read from the plugin's user payload, which should be
+ * produced with {@link #newConfBuilder()}. State updates for any vertex other
+ * than the grandparent are forwarded to {@link ShuffleVertexManager}.
+ *
+ * <p>This manager plugin should only be used for vertices that have a single
+ * grandparent.
+ */
+public final class GraceShuffleVertexManagerForTest extends ShuffleVertexManager {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(GraceShuffleVertexManagerForTest.class);
+
+  /** Parsed plugin configuration; {@code null} until {@link #initialize()} runs. */
+  private GraceConf graceConf;
+  /** Ensures the vertex is reconfigured at most once. */
+  private boolean isParallelismSet = false;
+
+  public GraceShuffleVertexManagerForTest(VertexManagerPluginContext context) {
+    super(context);
+  }
+
+  /**
+   * Parses the {@link GraceConf} from the user payload, registers for the
+   * grandparent's SUCCEEDED notification and then initializes the base
+   * shuffle vertex manager.
+   *
+   * @throws TezUncheckedException if the user payload cannot be parsed
+   */
+  @Override
+  public void initialize() {
+    try {
+      Configuration conf =
+          TezUtils.createConfFromUserPayload(getContext().getUserPayload());
+      graceConf = GraceConf.fromConfiguration(conf);
+    } catch (IOException e) {
+      throw new TezUncheckedException(e);
+    }
+    getContext().registerForVertexStateUpdates(graceConf.grandparentVertex,
+        EnumSet.of(VertexState.SUCCEEDED));
+    LOG.info("Watching {}", graceConf.grandparentVertex);
+    super.initialize();
+  }
+
+  /**
+   * Reconfigures the vertex to the desired parallelism the first time the
+   * grandparent reports SUCCEEDED. This class only registers for the
+   * grandparent, so updates about any other vertex stem from the base class's
+   * own registrations and are handed back to it.
+   */
+  @Override
+  public synchronized void onVertexStateUpdated(VertexStateUpdate stateUpdate) {
+    LOG.info("Received onVertexStateUpdated");
+
+    String vertexName = stateUpdate.getVertexName();
+    VertexState vertexState = stateUpdate.getVertexState();
+
+    // Guava's Preconditions substitutes %s placeholders (not SLF4J-style {}).
+    Preconditions.checkState(graceConf != null,
+        "Received state notification %s for vertex %s in vertex %s"
+            + " before manager was initialized",
+        vertexState, vertexName, getContext().getVertexName());
+
+    if (!vertexName.equals(graceConf.grandparentVertex)) {
+      // Dropping these would starve ShuffleVertexManager of updates it asked for.
+      super.onVertexStateUpdated(stateUpdate);
+      return;
+    }
+    if (!shouldSetParallelism(stateUpdate)) {
+      return;
+    }
+    getContext().reconfigureVertex(graceConf.desiredParallelism, null, null);
+    isParallelismSet = true;
+
+    LOG.info("Initialize parallelism for {} to {}",
+        getContext().getVertexName(), graceConf.desiredParallelism);
+  }
+
+  /**
+   * Returns true only for the grandparent's first SUCCEEDED notification.
+   */
+  private boolean shouldSetParallelism(VertexStateUpdate update) {
+    return !isParallelismSet
+        && update.getVertexState().equals(VertexState.SUCCEEDED)
+        && update.getVertexName().equals(graceConf.grandparentVertex);
+  }
+
+  /** Validated plugin configuration, (de)serialized via Hadoop {@link Configuration}. */
+  private static final class GraceConf {
+
+    static final String TEST_GRACE_GRANDPARENT_VERTEX = "test.grace.grandparent-vertex";
+    static final String TEST_GRACE_DESIRED_PARALLELISM = "test.grace.desired-parallelism";
+
+    final String grandparentVertex;
+    final int desiredParallelism;
+
+    GraceConf(GraceConfBuilder builder) {
+      grandparentVertex = builder.grandparentVertex;
+      desiredParallelism = builder.desiredParallelism;
+    }
+
+    /** Reads the keys written by {@link #toConfiguration()} and validates them. */
+    static GraceConf fromConfiguration(Configuration conf) {
+      return newConfBuilder()
+          .setGrandparentVertex(conf.get(TEST_GRACE_GRANDPARENT_VERTEX))
+          // A missing key maps to -1, which GraceConfBuilder#build rejects.
+          .setDesiredParallelism(conf.getInt(TEST_GRACE_DESIRED_PARALLELISM, -1))
+          .build();
+    }
+
+    /** Returns a Configuration holding only this plugin's keys. */
+    Configuration toConfiguration() {
+      // loadDefaults=false keeps Hadoop's default resources out of the payload.
+      Configuration conf = new Configuration(false);
+      conf.set(TEST_GRACE_GRANDPARENT_VERTEX, grandparentVertex);
+      conf.setInt(TEST_GRACE_DESIRED_PARALLELISM, desiredParallelism);
+      return conf;
+    }
+  }
+
+  /** Returns a builder for the user payload understood by this plugin. */
+  public static GraceConfBuilder newConfBuilder() {
+    return new GraceConfBuilder();
+  }
+
+  /** Fluent builder for this plugin's user payload. */
+  public static final class GraceConfBuilder {
+
+    private String grandparentVertex;
+    private int desiredParallelism;
+
+    private GraceConfBuilder() {
+    }
+
+    /** Sets the vertex whose SUCCEEDED state triggers reconfiguration (required). */
+    public GraceConfBuilder setGrandparentVertex(String grandparentVertex) {
+      this.grandparentVertex = grandparentVertex;
+      return this;
+    }
+
+    /** Sets the parallelism to apply; must be greater than 0. */
+    public GraceConfBuilder setDesiredParallelism(int desiredParallelism) {
+      this.desiredParallelism = desiredParallelism;
+      return this;
+    }
+
+    /**
+     * Validates the settings and serializes them as a user payload.
+     *
+     * @throws IOException propagated from {@link TezUtils#createByteStringFromConf}
+     * @throws NullPointerException if no grandparent vertex was set
+     * @throws IllegalArgumentException if the desired parallelism is not positive
+     */
+    public ByteString toByteString() throws IOException {
+      return TezUtils.createByteStringFromConf(build().toConfiguration());
+    }
+
+    private GraceConf build() {
+      Preconditions.checkNotNull(grandparentVertex, "Grandparent vertex is required");
+      Preconditions.checkArgument(desiredParallelism > 0,
+          "Desired parallelism must be greater than 0");
+      return new GraceConf(this);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/71ec8564/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
----------------------------------------------------------------------
diff --git
a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
index 6104a1d..c4058c4 100644
---
a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
+++
b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
@@ -508,14 +508,14 @@ public class ShuffleVertexManager extends
VertexManagerPlugin {
}
pendingStateUpdates.clear();
+ // track the tasks in this vertex
+ updatePendingTasks();
+
for (VertexManagerEvent vmEvent : pendingVMEvents) {
handleVertexManagerEvent(vmEvent);
}
pendingVMEvents.clear();
-
- // track the tasks in this vertex
- updatePendingTasks();
-
+
LOG.info("OnVertexStarted vertex: " + getContext().getVertexName() +
" with " + totalNumBipartiteSourceTasks + " source tasks and " +
totalTasksToSchedule + " pending tasks");