TEZ-2633. Allow VertexManagerPlugins to receive and report based on attempts
instead of tasks (bikas)
(cherry picked from commit 7b45e9a142830e7dd8b0263d50dbaaef5fb0da76)
Conflicts:
CHANGES.txt
tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/17d5c18f
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/17d5c18f
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/17d5c18f
Branch: refs/heads/branch-0.7
Commit: 17d5c18fbb7debdb8abe24db63474476e839addd
Parents: 37074a9
Author: Bikas Saha <[email protected]>
Authored: Wed Aug 5 11:05:54 2015 -0700
Committer: Bikas Saha <[email protected]>
Committed: Fri Dec 11 17:23:28 2015 -0800
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../apache/tez/dag/api/VertexManagerPlugin.java | 45 +++-
.../tez/dag/api/VertexManagerPluginContext.java | 38 ++-
.../apache/tez/runtime/api/DagIdentifier.java | 26 ++
.../tez/runtime/api/TaskAttemptIdentifier.java | 26 ++
.../apache/tez/runtime/api/TaskIdentifier.java | 26 ++
.../tez/runtime/api/VertexIdentifier.java | 28 ++
.../runtime/api/events/VertexManagerEvent.java | 22 +-
.../tez/dag/records/DagIdentifierImpl.java | 69 +++++
.../dag/records/TaskAttemptIdentifierImpl.java | 70 +++++
.../tez/dag/records/TaskIdentifierImpl.java | 70 +++++
.../tez/dag/records/VertexIdentifierImpl.java | 77 ++++++
.../java/org/apache/tez/dag/app/dag/Vertex.java | 4 +-
.../dag/impl/ImmediateStartVertexManager.java | 13 +-
.../apache/tez/dag/app/dag/impl/VertexImpl.java | 51 +++-
.../tez/dag/app/dag/impl/VertexManager.java | 62 ++---
.../apache/tez/dag/app/dag/impl/TestCommit.java | 5 +-
.../impl/TestImmediateStartVertexManager.java | 20 +-
.../tez/dag/app/dag/impl/TestVertexImpl.java | 42 +--
.../tez/dag/app/dag/impl/TestVertexManager.java | 5 +-
.../tez/test/VertexManagerPluginForTest.java | 6 +-
.../vertexmanager/InputReadyVertexManager.java | 36 ++-
.../vertexmanager/ShuffleVertexManager.java | 42 ++-
.../TestInputReadyVertexManager.java | 127 ++++-----
.../vertexmanager/TestShuffleVertexManager.java | 265 +++++++++++--------
.../org/apache/tez/test/TestAMRecovery.java | 41 +--
.../tez/test/TestExceptionPropagation.java | 7 +-
.../apache/tez/test/dag/MultiAttemptDAG.java | 29 +-
28 files changed, 920 insertions(+), 334 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/17d5c18f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 9d705ec..04076ae 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -8,6 +8,8 @@ INCOMPATIBLE CHANGES
TEZ-2949. Allow duplicate dag names within session for Tez.
ALL CHANGES
+ TEZ-2633. Allow VertexManagerPlugins to receive and report based on attempts
+ instead of tasks
TEZ-2824. Add javadocs for Vertex.setConf and DAG.setConf.
TEZ-2963. RecoveryService#handleSummaryEvent exception with HDFS transparent
encryption + kerberos authentication.
TEZ-2966. Tez does not honor mapreduce.task.timeout
http://git-wip-us.apache.org/repos/asf/tez/blob/17d5c18f/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPlugin.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPlugin.java b/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPlugin.java
index 6aa18d6..b66a66a 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPlugin.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPlugin.java
@@ -18,6 +18,8 @@
package org.apache.tez.dag.api;
+import java.util.HashMap;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -25,6 +27,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.tez.dag.api.event.VertexStateUpdate;
import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.TaskAttemptIdentifier;
import org.apache.tez.runtime.api.events.VertexManagerEvent;
/**
@@ -59,20 +62,58 @@ public abstract class VertexManagerPlugin {
*/
public abstract void initialize() throws Exception;
+ @Deprecated
/**
+ * This is replaced by {@link VertexManagerPlugin#onVertexStarted(List)}
* Notification that the vertex is ready to start running tasks
* @param completions Source vertices and all their tasks that have already completed
* @throws Exception
*/
- public abstract void onVertexStarted(Map<String, List<Integer>> completions) throws Exception;
+ public void onVertexStarted(Map<String, List<Integer>> completions) throws Exception {
+ throw new UnsupportedOperationException();
+ }
/**
+ * Notification that the vertex is ready to start running tasks
+ * @param completions All the source task attempts that have already completed
+ * @throws Exception
+ */
+ public void onVertexStarted(List<TaskAttemptIdentifier> completions) throws Exception {
+ Map<String, List<Integer>> completionsMap = new HashMap<String, List<Integer>>();
+ for (TaskAttemptIdentifier attempt : completions) {
+ String vName = attempt.getTaskIdentifier().getVertexIdentifier().getName();
+ List<Integer> tasks = completionsMap.get(vName);
+ if (tasks == null) {
+ tasks = new LinkedList<Integer>();
+ completionsMap.put(vName, tasks);
+ }
+ tasks.add(attempt.getTaskIdentifier().getIdentifier());
+ }
+ onVertexStarted(completionsMap);
+ }
+
+ @Deprecated
+ /**
+ * This has been replaced by
+ * {@link VertexManagerPlugin#onSourceTaskCompleted(TaskAttemptIdentifier)}
* Notification of a source vertex completion.
* @param srcVertexName
* @param taskId Index of the task that completed
* @throws Exception
*/
- public abstract void onSourceTaskCompleted(String srcVertexName, Integer taskId) throws Exception;
+ public void onSourceTaskCompleted(String srcVertexName, Integer taskId) throws Exception {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Notification of a source vertex task completion.
+ * @param attempt Identifier of the task attempt that completed
+ * @throws Exception
+ */
+ public void onSourceTaskCompleted(TaskAttemptIdentifier attempt) throws Exception {
+ onSourceTaskCompleted(attempt.getTaskIdentifier().getVertexIdentifier().getName(),
+ attempt.getTaskIdentifier().getIdentifier());
+ }
/**
* Notification of an event directly sent to this vertex manager
http://git-wip-us.apache.org/repos/asf/tez/blob/17d5c18f/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java b/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java
index 345ea43..1ce47b6 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java
@@ -44,11 +44,35 @@ import com.google.common.base.Preconditions;
@Public
public interface VertexManagerPluginContext {
+ public class ScheduleTaskRequest {
+ int taskIndex;
+ TaskLocationHint locationHint;
+
+ public static ScheduleTaskRequest create(int taskIndex, @Nullable TaskLocationHint locationHint) {
+ return new ScheduleTaskRequest(taskIndex, locationHint);
+ }
+
+ private ScheduleTaskRequest(int taskIndex, @Nullable TaskLocationHint locationHint) {
+ Preconditions.checkState(taskIndex >= 0);
+ this.taskIndex = taskIndex;
+ this.locationHint = locationHint;
+ }
+
+ public int getTaskIndex() {
+ return taskIndex;
+ }
+
+ public TaskLocationHint getTaskLocationHint() {
+ return locationHint;
+ }
+ }
+
+ @Deprecated
public class TaskWithLocationHint {
Integer taskIndex;
TaskLocationHint locationHint;
public TaskWithLocationHint(Integer taskIndex, @Nullable TaskLocationHint locationHint) {
- Preconditions.checkNotNull(taskIndex);
+ Preconditions.checkState(taskIndex != null);
this.taskIndex = taskIndex;
this.locationHint = locationHint;
}
@@ -161,7 +185,7 @@ public interface VertexManagerPluginContext {
* destination tasks may need to be updated to account for the new task
* parallelism. This method can be called to update the parallelism multiple
* times until any of the tasks of the vertex have been scheduled (by invoking
- * {@link #scheduleVertexTasks(List)}. If needed, the original source edge
+ * {@link #scheduleTasks(List)}. If needed, the original source edge
* properties may be obtained via {@link #getInputVertexEdgeProperties()}
*
* @param parallelism
@@ -218,13 +242,21 @@ public interface VertexManagerPluginContext {
*/
public void addRootInputEvents(String inputName,
Collection<InputDataInformationEvent> events);
+ @Deprecated
/**
+ * Replaced by {@link #scheduleTasks(List)}
* Notify the vertex to start the given tasks
* @param tasks Indices of the tasks to be started
*/
public void scheduleVertexTasks(List<TaskWithLocationHint> tasks);
/**
+ * Notify the vertex to schedule the given tasks
+ * @param tasks Identifier and metadata for the tasks to schedule
+ */
+ public void scheduleTasks(List<ScheduleTaskRequest> tasks);
+
+ /**
* Get the names of the non-vertex inputs of this vertex. These are primary
* sources of data.
* @return Names of inputs to this vertex. Maybe null if there are no inputs
@@ -291,4 +323,4 @@ public interface VertexManagerPluginContext {
// TODO must be done later after TEZ-1714
//public void vertexManagerDone();
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/17d5c18f/tez-api/src/main/java/org/apache/tez/runtime/api/DagIdentifier.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/DagIdentifier.java b/tez-api/src/main/java/org/apache/tez/runtime/api/DagIdentifier.java
new file mode 100644
index 0000000..dd63b4c
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/DagIdentifier.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.api;
+
+public interface DagIdentifier {
+
+ public String getName();
+
+ public int getIdentifier();
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/17d5c18f/tez-api/src/main/java/org/apache/tez/runtime/api/TaskAttemptIdentifier.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/TaskAttemptIdentifier.java b/tez-api/src/main/java/org/apache/tez/runtime/api/TaskAttemptIdentifier.java
new file mode 100644
index 0000000..101fa91
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/TaskAttemptIdentifier.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.api;
+
+public interface TaskAttemptIdentifier {
+
+ public int getIdentifier();
+
+ public TaskIdentifier getTaskIdentifier();
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/17d5c18f/tez-api/src/main/java/org/apache/tez/runtime/api/TaskIdentifier.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/TaskIdentifier.java b/tez-api/src/main/java/org/apache/tez/runtime/api/TaskIdentifier.java
new file mode 100644
index 0000000..8ef066b
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/TaskIdentifier.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.api;
+
+public interface TaskIdentifier {
+
+ public int getIdentifier();
+
+ public VertexIdentifier getVertexIdentifier();
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/17d5c18f/tez-api/src/main/java/org/apache/tez/runtime/api/VertexIdentifier.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/VertexIdentifier.java b/tez-api/src/main/java/org/apache/tez/runtime/api/VertexIdentifier.java
new file mode 100644
index 0000000..16e88ad
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/VertexIdentifier.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.api;
+
+public interface VertexIdentifier {
+
+ public int getIdentifier();
+
+ public String getName();
+
+ public DagIdentifier getDagIdentifier();
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/17d5c18f/tez-api/src/main/java/org/apache/tez/runtime/api/events/VertexManagerEvent.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/events/VertexManagerEvent.java b/tez-api/src/main/java/org/apache/tez/runtime/api/events/VertexManagerEvent.java
index 484087e..9e73fe5 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/events/VertexManagerEvent.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/events/VertexManagerEvent.java
@@ -20,16 +20,19 @@ package org.apache.tez.runtime.api.events;
import java.nio.ByteBuffer;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.tez.dag.api.VertexManagerPlugin;
import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.TaskAttemptIdentifier;
import com.google.common.base.Preconditions;
/**
* Event used to send information from a Task to the VertexManager for a vertex.
* This may be used to send statistics like samples etc to the VertexManager for
- * automatic plan recofigurations based on observed statistics
+ * automatic plan reconfigurations based on observed statistics
*/
@Unstable
@Public
@@ -40,6 +43,8 @@ public class VertexManagerEvent extends Event {
*/
private final String targetVertexName;
+ private TaskAttemptIdentifier producerAttempt;
+
/**
* User payload to be sent
*/
@@ -68,4 +73,19 @@ public class VertexManagerEvent extends Event {
public ByteBuffer getUserPayload() {
return userPayload == null ? null : userPayload.asReadOnlyBuffer();
}
+
+ /**
+ * Get metadata about the task attempt that produced the event.
+ * This method will provide a valid return value only when invoked in the
+ * {@link VertexManagerPlugin}
+ * @return attempt metadata
+ */
+ public TaskAttemptIdentifier getProducerAttemptIdentifier() {
+ return producerAttempt;
+ }
+
+ @Private
+ public void setProducerAttemptIdentifier(TaskAttemptIdentifier producerAttempt) {
+ this.producerAttempt = producerAttempt;
+ }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/17d5c18f/tez-common/src/main/java/org/apache/tez/dag/records/DagIdentifierImpl.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/dag/records/DagIdentifierImpl.java b/tez-common/src/main/java/org/apache/tez/dag/records/DagIdentifierImpl.java
new file mode 100644
index 0000000..099cb58
--- /dev/null
+++ b/tez-common/src/main/java/org/apache/tez/dag/records/DagIdentifierImpl.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.records;
+
+import org.apache.tez.runtime.api.DagIdentifier;
+
+public class DagIdentifierImpl implements DagIdentifier {
+
+ private final TezDAGID dagId;
+ private final String dagName;
+
+ public DagIdentifierImpl(String dagName, TezDAGID dagId) {
+ this.dagId = dagId;
+ this.dagName = dagName;
+ }
+
+ @Override
+ public String getName() {
+ return dagName;
+ }
+
+ @Override
+ public int getIdentifier() {
+ return dagId.getId();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if(o == null) {
+ return false;
+ }
+ if (o.getClass() == this.getClass()) {
+ DagIdentifierImpl other = (DagIdentifierImpl) o;
+ return this.dagId.equals(other.dagId);
+ }
+ else {
+ return false;
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "Dag: " + dagName + ":[" + getIdentifier() + "]";
+ }
+
+ @Override
+ public int hashCode() {
+ return dagId.hashCode();
+ }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/17d5c18f/tez-common/src/main/java/org/apache/tez/dag/records/TaskAttemptIdentifierImpl.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/dag/records/TaskAttemptIdentifierImpl.java b/tez-common/src/main/java/org/apache/tez/dag/records/TaskAttemptIdentifierImpl.java
new file mode 100644
index 0000000..b834111
--- /dev/null
+++ b/tez-common/src/main/java/org/apache/tez/dag/records/TaskAttemptIdentifierImpl.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.records;
+
+import org.apache.tez.runtime.api.TaskAttemptIdentifier;
+import org.apache.tez.runtime.api.TaskIdentifier;
+
+public class TaskAttemptIdentifierImpl implements TaskAttemptIdentifier {
+
+ private final TaskIdentifier taskIdentifier;
+ private final TezTaskAttemptID attemptId;
+
+ public TaskAttemptIdentifierImpl(String dagName, String vertexName, TezTaskAttemptID attemptId) {
+ this.attemptId = attemptId;
+ this.taskIdentifier = new TaskIdentifierImpl(dagName, vertexName, attemptId.getTaskID());
+ }
+
+ @Override
+ public int getIdentifier() {
+ return attemptId.getId();
+ }
+
+ @Override
+ public TaskIdentifier getTaskIdentifier() {
+ return taskIdentifier;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if(o == null) {
+ return false;
+ }
+ if (o.getClass() == this.getClass()) {
+ TaskAttemptIdentifierImpl other = (TaskAttemptIdentifierImpl) o;
+ return this.attemptId.equals(other.attemptId);
+ }
+ else {
+ return false;
+ }
+ }
+
+ @Override
+ public String toString() {
+ return taskIdentifier.toString() + " Attempt: [" + getIdentifier() + "]";
+ }
+
+ @Override
+ public int hashCode() {
+ return attemptId.hashCode();
+ }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/17d5c18f/tez-common/src/main/java/org/apache/tez/dag/records/TaskIdentifierImpl.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/dag/records/TaskIdentifierImpl.java b/tez-common/src/main/java/org/apache/tez/dag/records/TaskIdentifierImpl.java
new file mode 100644
index 0000000..fb0848a
--- /dev/null
+++ b/tez-common/src/main/java/org/apache/tez/dag/records/TaskIdentifierImpl.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.records;
+
+import org.apache.tez.runtime.api.TaskIdentifier;
+import org.apache.tez.runtime.api.VertexIdentifier;
+
+public class TaskIdentifierImpl implements TaskIdentifier {
+
+ private final VertexIdentifier vertexIdentifier;
+ private final TezTaskID taskId;
+
+ public TaskIdentifierImpl(String dagName, String vertexName, TezTaskID taskId) {
+ this.taskId = taskId;
+ this.vertexIdentifier = new VertexIdentifierImpl(dagName, vertexName, taskId.getVertexID());
+ }
+
+ @Override
+ public int getIdentifier() {
+ return taskId.getId();
+ }
+
+ @Override
+ public VertexIdentifier getVertexIdentifier() {
+ return vertexIdentifier;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if(o == null) {
+ return false;
+ }
+ if (o.getClass() == this.getClass()) {
+ TaskIdentifierImpl other = (TaskIdentifierImpl) o;
+ return this.taskId.equals(other.taskId);
+ }
+ else {
+ return false;
+ }
+ }
+
+ @Override
+ public String toString() {
+ return vertexIdentifier.toString() + " Task [" + getIdentifier() + "]";
+ }
+
+ @Override
+ public int hashCode() {
+ return taskId.hashCode();
+ }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/17d5c18f/tez-common/src/main/java/org/apache/tez/dag/records/VertexIdentifierImpl.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/dag/records/VertexIdentifierImpl.java b/tez-common/src/main/java/org/apache/tez/dag/records/VertexIdentifierImpl.java
new file mode 100644
index 0000000..4480f74
--- /dev/null
+++ b/tez-common/src/main/java/org/apache/tez/dag/records/VertexIdentifierImpl.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.records;
+
+import org.apache.tez.runtime.api.DagIdentifier;
+import org.apache.tez.runtime.api.VertexIdentifier;
+
+public class VertexIdentifierImpl implements VertexIdentifier {
+
+ private final DagIdentifier dagIdentifier;
+ private final TezVertexID vertexId;
+ private final String vertexName;
+
+ public VertexIdentifierImpl(String dagName, String vertexName, TezVertexID vertexId) {
+ this.vertexId = vertexId;
+ this.vertexName = vertexName;
+ this.dagIdentifier = new DagIdentifierImpl(dagName, vertexId.getDAGId());
+ }
+
+ @Override
+ public String getName() {
+ return vertexName;
+ }
+
+ @Override
+ public int getIdentifier() {
+ return vertexId.getId();
+ }
+
+ @Override
+ public DagIdentifier getDagIdentifier() {
+ return dagIdentifier;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if(o == null) {
+ return false;
+ }
+ if (o.getClass() == this.getClass()) {
+ VertexIdentifierImpl other = (VertexIdentifierImpl) o;
+ return this.vertexId.equals(other.vertexId);
+ }
+ else {
+ return false;
+ }
+ }
+
+ @Override
+ public String toString() {
+ return dagIdentifier.toString() + " Vertex: " + vertexName + ":[" + getIdentifier() + "]";
+ }
+
+ @Override
+ public int hashCode() {
+ return vertexId.hashCode();
+ }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/17d5c18f/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
index 88dcc8d..1c92314 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
@@ -37,8 +37,8 @@ import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.RootInputLeafOutput;
import org.apache.tez.dag.api.VertexLocationHint;
+import org.apache.tez.dag.api.VertexManagerPluginContext.ScheduleTaskRequest;
import org.apache.tez.dag.api.TaskLocationHint;
-import org.apache.tez.dag.api.VertexManagerPluginContext.TaskWithLocationHint;
import org.apache.tez.dag.api.client.StatusGetOpts;
import org.apache.tez.dag.api.records.DAGProtos.RootInputLeafOutputProto;
import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
@@ -150,7 +150,7 @@ public interface Vertex extends Comparable<Vertex> {
int getInputVerticesCount();
int getOutputVerticesCount();
- void scheduleTasks(List<TaskWithLocationHint> tasks);
+ void scheduleTasks(List<ScheduleTaskRequest> tasks);
void scheduleSpeculativeTask(TezTaskID taskId);
Resource getTaskResource();
http://git-wip-us.apache.org/repos/asf/tez/blob/17d5c18f/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ImmediateStartVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ImmediateStartVertexManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ImmediateStartVertexManager.java
index 5e179bd..50624dd 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ImmediateStartVertexManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ImmediateStartVertexManager.java
@@ -28,10 +28,11 @@ import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.VertexManagerPlugin;
import org.apache.tez.dag.api.VertexManagerPluginContext;
-import org.apache.tez.dag.api.VertexManagerPluginContext.TaskWithLocationHint;
+import org.apache.tez.dag.api.VertexManagerPluginContext.ScheduleTaskRequest;
import org.apache.tez.dag.api.event.VertexState;
import org.apache.tez.dag.api.event.VertexStateUpdate;
import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.TaskAttemptIdentifier;
import org.apache.tez.runtime.api.events.VertexManagerEvent;
import java.util.EnumSet;
@@ -56,7 +57,7 @@ public class ImmediateStartVertexManager extends VertexManagerPlugin {
}
@Override
- public void onVertexStarted(Map<String, List<Integer>> completions) {
+ public void onVertexStarted(List<TaskAttemptIdentifier> completions) {
managedTasks = getContext().getVertexNumTasks(getContext().getVertexName());
Map<String, EdgeProperty> edges = getContext().getInputVertexEdgeProperties();
for (Map.Entry<String, EdgeProperty> entry : edges.entrySet()) {
@@ -90,14 +91,14 @@ public class ImmediateStartVertexManager extends VertexManagerPlugin {
}
tasksScheduled = true;
- List<TaskWithLocationHint> tasksToStart = Lists.newArrayListWithCapacity(managedTasks);
+ List<ScheduleTaskRequest> tasksToStart = Lists.newArrayListWithCapacity(managedTasks);
for (int i = 0; i < managedTasks; ++i) {
- tasksToStart.add(new TaskWithLocationHint(i, null));
+ tasksToStart.add(ScheduleTaskRequest.create(i, null));
}
if (!tasksToStart.isEmpty()) {
LOG.info("Starting " + tasksToStart.size() + " in " + getContext().getVertexName());
- getContext().scheduleVertexTasks(tasksToStart);
+ getContext().scheduleTasks(tasksToStart);
}
// all tasks scheduled. Can call vertexManagerDone().
// TODO TEZ-1714 for locking issues getContext().vertexManagerDone();
@@ -134,7 +135,7 @@ public class ImmediateStartVertexManager extends
VertexManagerPlugin {
}
@Override
- public void onSourceTaskCompleted(String srcVertexName, Integer attemptId) {
+ public void onSourceTaskCompleted(TaskAttemptIdentifier attempt) {
}
@Override
http://git-wip-us.apache.org/repos/asf/tez/blob/17d5c18f/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index cc1f489..2f2b0c2 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -78,7 +78,7 @@ import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.VertexLocationHint;
import org.apache.tez.dag.api.TaskLocationHint;
-import org.apache.tez.dag.api.VertexManagerPluginContext.TaskWithLocationHint;
+import org.apache.tez.dag.api.VertexManagerPluginContext.ScheduleTaskRequest;
import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
import org.apache.tez.dag.api.client.ProgressBuilder;
import org.apache.tez.dag.api.client.StatusGetOpts;
@@ -150,6 +150,7 @@ import
org.apache.tez.dag.history.events.VertexParallelismUpdatedEvent;
import org.apache.tez.dag.history.events.VertexStartedEvent;
import org.apache.tez.dag.library.vertexmanager.InputReadyVertexManager;
import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
+import org.apache.tez.dag.records.TaskAttemptIdentifierImpl;
import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
@@ -161,6 +162,7 @@ import org.apache.tez.runtime.api.OutputCommitter;
import org.apache.tez.runtime.api.OutputCommitterContext;
import org.apache.tez.runtime.api.InputSpecUpdate;
import org.apache.tez.runtime.api.OutputStatistics;
+import org.apache.tez.runtime.api.TaskAttemptIdentifier;
import org.apache.tez.runtime.api.VertexStatistics;
import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
import org.apache.tez.runtime.api.events.DataMovementEvent;
@@ -1526,16 +1528,16 @@ public class VertexImpl implements
org.apache.tez.dag.app.dag.Vertex, EventHandl
getInputSpecList(taskIndex), getOutputSpecList(taskIndex),
getGroupInputSpecList(taskIndex));
}
-
+
@Override
- public void scheduleTasks(List<TaskWithLocationHint> tasksToSchedule) {
+ public void scheduleTasks(List<ScheduleTaskRequest> tasksToSchedule) {
try {
unsetTasksNotYetScheduled();
// update state under write lock
writeLock.lock();
try {
- for (TaskWithLocationHint task : tasksToSchedule) {
- if (numTasks <= task.getTaskIndex().intValue()) {
+ for (ScheduleTaskRequest task : tasksToSchedule) {
+ if (numTasks <= task.getTaskIndex()) {
throw new TezUncheckedException(
"Invalid taskId: " + task.getTaskIndex() + " for vertex: " +
logIdentifier);
}
@@ -1544,7 +1546,7 @@ public class VertexImpl implements
org.apache.tez.dag.app.dag.Vertex, EventHandl
if (taskLocationHints == null) {
taskLocationHints = new TaskLocationHint[numTasks];
}
- taskLocationHints[task.getTaskIndex().intValue()] = locationHint;
+ taskLocationHints[task.getTaskIndex()] = locationHint;
}
}
} finally {
@@ -1553,8 +1555,8 @@ public class VertexImpl implements
org.apache.tez.dag.app.dag.Vertex, EventHandl
readLock.lock();
try {
- for (TaskWithLocationHint task : tasksToSchedule) {
- TezTaskID taskId = TezTaskID.getInstance(vertexId,
task.getTaskIndex().intValue());
+ for (ScheduleTaskRequest task : tasksToSchedule) {
+ TezTaskID taskId = TezTaskID.getInstance(vertexId,
task.getTaskIndex());
TaskSpec baseTaskSpec = createRemoteTaskSpec(taskId.getId());
eventHandler.handle(new TaskEventScheduleTask(taskId, baseTaskSpec,
getTaskLocationHint(taskId)));
@@ -2834,8 +2836,24 @@ public class VertexImpl implements
org.apache.tez.dag.app.dag.Vertex, EventHandl
}
+ private static List<TaskAttemptIdentifier> getTaskAttemptIdentifiers(DAG
dag,
+ List<TezTaskAttemptID> taIds) {
+ List<TaskAttemptIdentifier> attempts = new
ArrayList<TaskAttemptIdentifier>(taIds.size());
+ String dagName = dag.getName();
+ for (TezTaskAttemptID taId : taIds) {
+ String vertexName =
dag.getVertex(taId.getTaskID().getVertexID()).getName();
+ attempts.add(getTaskAttemptIdentifier(dagName, vertexName, taId));
+ }
+ return attempts;
+ }
+
+ private static TaskAttemptIdentifier getTaskAttemptIdentifier(String
dagName, String vertexName,
+ TezTaskAttemptID taId) {
+ return new TaskAttemptIdentifierImpl(dagName, vertexName, taId);
+ }
+
private void recoveryCodeSimulatingStart() throws AMUserCodeException {
- vertexManager.onVertexStarted(pendingReportedSrcCompletions);
+ vertexManager.onVertexStarted(getTaskAttemptIdentifiers(dag,
pendingReportedSrcCompletions));
// This code is duplicated from startVertex() because recovery does not
follow normal
// transitions. To be removed after recovery code is fixed.
maybeSendConfiguredEvent();
@@ -3647,7 +3665,7 @@ public class VertexImpl implements
org.apache.tez.dag.app.dag.Vertex, EventHandl
startedTime = clock.getTime();
try {
- vertexManager.onVertexStarted(pendingReportedSrcCompletions);
+ vertexManager.onVertexStarted(getTaskAttemptIdentifiers(dag,
pendingReportedSrcCompletions));
} catch (AMUserCodeException e) {
String msg = "Exception in " + e.getSource() +", vertex=" +
logIdentifier;
LOG.error(msg, e);
@@ -3912,8 +3930,11 @@ public class VertexImpl implements
org.apache.tez.dag.app.dag.Vertex, EventHandl
if (vertex.getState() == VertexState.RUNNING) {
try {
// Inform the vertex manager about the source task completing.
- vertex.vertexManager.onSourceTaskCompleted(completionEvent
- .getTaskAttemptId().getTaskID());
+ TezTaskAttemptID taId = completionEvent.getTaskAttemptId();
+ vertex.vertexManager.onSourceTaskCompleted(
+ getTaskAttemptIdentifier(vertex.dag.getName(),
+
vertex.dag.getVertex(taId.getTaskID().getVertexID()).getName(),
+ taId));
} catch (AMUserCodeException e) {
String msg = "Exception in " + e.getSource() + ", vertex:" +
vertex.getLogIdentifier();
LOG.error(msg, e);
@@ -4400,6 +4421,12 @@ public class VertexImpl implements
org.apache.tez.dag.app.dag.Vertex, EventHandl
Vertex target = getDAG().getVertex(vmEvent.getTargetVertexName());
Preconditions.checkArgument(target != null,
"Event sent to unkown vertex: " + vmEvent.getTargetVertexName());
+ TezTaskAttemptID srcTaId = sourceMeta.getTaskAttemptID();
+ if (srcTaId.getTaskID().getVertexID().equals(vertexId)) {
+ // this is the producer tasks' vertex
+ vmEvent.setProducerAttemptIdentifier(
+ getTaskAttemptIdentifier(dag.getName(), getName(), srcTaId));
+ }
if (target == this) {
if (!vmIsInitialized.get()) {
// The VM hasn't been setup yet, defer event consumption
http://git-wip-us.apache.org/repos/asf/tez/blob/17d5c18f/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
index 803159a..f9cce0e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
@@ -65,10 +65,9 @@ import
org.apache.tez.dag.app.dag.event.VertexEventInputDataInformation;
import org.apache.tez.dag.app.dag.event.VertexEventManagerUserCodeError;
import org.apache.tez.dag.app.dag.impl.AMUserCodeException.Source;
import org.apache.tez.dag.app.dag.VertexStateUpdateListener;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.InputSpecUpdate;
+import org.apache.tez.runtime.api.TaskAttemptIdentifier;
import org.apache.tez.runtime.api.VertexStatistics;
import org.apache.tez.runtime.api.events.InputDataInformationEvent;
import org.apache.tez.runtime.api.impl.EventMetaData;
@@ -85,7 +84,7 @@ import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
-@SuppressWarnings("unchecked")
+@SuppressWarnings({"unchecked", "deprecation"})
public class VertexManager {
final VertexManagerPluginDescriptor pluginDesc;
final UserGroupInformation dagUgi;
@@ -202,10 +201,21 @@ public class VertexManager {
}
@Override
- public synchronized void scheduleVertexTasks(List<TaskWithLocationHint>
tasks) {
+ public synchronized void scheduleTasks(List<ScheduleTaskRequest> tasks) {
checkAndThrowIfDone();
managedVertex.scheduleTasks(tasks);
}
+
+ @Override
+ public synchronized void scheduleVertexTasks(List<TaskWithLocationHint>
tasks) {
+ checkAndThrowIfDone();
+ List<ScheduleTaskRequest> schedTasks = new
ArrayList<ScheduleTaskRequest>(tasks.size());
+ for (TaskWithLocationHint task : tasks) {
+ schedTasks.add(ScheduleTaskRequest.create(
+ task.getTaskIndex(), task.getTaskLocationHint()));
+ }
+ scheduleTasks(schedTasks);
+ }
@Nullable
@Override
@@ -455,30 +465,12 @@ public class VertexManager {
}
}
- public void onVertexStarted(List<TezTaskAttemptID> completions) throws
AMUserCodeException {
- Map<String, List<Integer>> pluginCompletionsMap = Maps.newHashMap();
- if (completions != null && !completions.isEmpty()) {
- for (TezTaskAttemptID tezTaskAttemptID : completions) {
- Integer taskId = Integer.valueOf(tezTaskAttemptID.getTaskID().getId());
- String vertexName =
- appContext.getCurrentDAG().getVertex(
- tezTaskAttemptID.getTaskID().getVertexID()).getName();
- List<Integer> taskIdList = pluginCompletionsMap.get(vertexName);
- if (taskIdList == null) {
- taskIdList = Lists.newArrayList();
- pluginCompletionsMap.put(vertexName, taskIdList);
- }
- taskIdList.add(taskId);
- }
- }
- enqueueAndScheduleNextEvent(new
VertexManagerEventOnVertexStarted(pluginCompletionsMap));
+ public void onVertexStarted(List<TaskAttemptIdentifier> completions) throws
AMUserCodeException {
+ enqueueAndScheduleNextEvent(new
VertexManagerEventOnVertexStarted(completions));
}
- public void onSourceTaskCompleted(TezTaskID tezTaskId) throws
AMUserCodeException {
- Integer taskId = Integer.valueOf(tezTaskId.getId());
- String vertexName =
-
appContext.getCurrentDAG().getVertex(tezTaskId.getVertexID()).getName();
- enqueueAndScheduleNextEvent(new
VertexManagerEventSourceTaskCompleted(taskId, vertexName));
+ public void onSourceTaskCompleted(TaskAttemptIdentifier attempt) throws
AMUserCodeException {
+ enqueueAndScheduleNextEvent(new
VertexManagerEventSourceTaskCompleted(attempt));
}
public void onVertexManagerEventReceived(
@@ -576,31 +568,29 @@ public class VertexManager {
}
class VertexManagerEventOnVertexStarted extends VertexManagerEvent {
- private final Map<String, List<Integer>> pluginCompletionsMap;
+ private final List<TaskAttemptIdentifier> pluginCompletions;
- public VertexManagerEventOnVertexStarted(Map<String, List<Integer>>
pluginCompletionsMap) {
- this.pluginCompletionsMap = pluginCompletionsMap;
+ public VertexManagerEventOnVertexStarted(List<TaskAttemptIdentifier>
pluginCompletions) {
+ this.pluginCompletions = pluginCompletions;
}
@Override
public void invoke() throws Exception {
- plugin.onVertexStarted(pluginCompletionsMap);
+ plugin.onVertexStarted(pluginCompletions);
}
}
class VertexManagerEventSourceTaskCompleted extends VertexManagerEvent {
- private final Integer taskId;
- private final String vertexName;
+ private final TaskAttemptIdentifier attempt;
- public VertexManagerEventSourceTaskCompleted(Integer taskId, String
vertexName) {
- this.taskId = taskId;
- this.vertexName = vertexName;
+ public VertexManagerEventSourceTaskCompleted(TaskAttemptIdentifier
attempt) {
+ this.attempt = attempt;
}
@Override
public void invoke() throws Exception {
- plugin.onSourceTaskCompleted(vertexName, taskId);
+ plugin.onSourceTaskCompleted(attempt);
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/17d5c18f/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java
index 6d23df3..83421a2 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java
@@ -84,7 +84,6 @@ import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType;
import org.apache.tez.dag.app.dag.event.DAGEvent;
import org.apache.tez.dag.app.dag.event.DAGEventStartDag;
import org.apache.tez.dag.app.dag.event.DAGEventType;
-import org.apache.tez.dag.app.dag.event.DAGEventVertexReRunning;
import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
import org.apache.tez.dag.app.dag.event.TaskEvent;
@@ -101,6 +100,7 @@ import org.apache.tez.dag.history.HistoryEventHandler;
import org.apache.tez.dag.history.HistoryEventType;
import org.apache.tez.dag.history.events.*;
import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.runtime.api.OutputCommitter;
@@ -788,9 +788,10 @@ public class TestCommit {
Assert.assertEquals(VertexState.COMMITTING, v1.getState());
// reschedule task
VertexManagerEvent vmEvent = VertexManagerEvent.create("vertex1",
ByteBuffer.wrap(new byte[0]));
+ TezTaskAttemptID taId =
TezTaskAttemptID.getInstance(TezTaskID.getInstance(v1.getVertexId(), 0), 0);
TezEvent tezEvent = new TezEvent(vmEvent,
new EventMetaData(EventProducerConsumerType.OUTPUT, "vertex1",
- null, null));
+ null, taId));
v1.handle(new VertexEventRouteEvent(v1.getVertexId(),
Collections.singletonList(tezEvent)));
waitUntil(dag, DAGState.FAILED);
http://git-wip-us.apache.org/repos/asf/tez/blob/17d5c18f/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestImmediateStartVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestImmediateStartVertexManager.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestImmediateStartVertexManager.java
index 6d071a7..a17c7c5 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestImmediateStartVertexManager.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestImmediateStartVertexManager.java
@@ -28,9 +28,10 @@ import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.VertexManagerPluginContext;
import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
-import org.apache.tez.dag.api.VertexManagerPluginContext.TaskWithLocationHint;
+import org.apache.tez.dag.api.VertexManagerPluginContext.ScheduleTaskRequest;
import org.apache.tez.dag.api.event.VertexState;
import org.apache.tez.dag.api.event.VertexStateUpdate;
+import org.apache.tez.runtime.api.TaskAttemptIdentifier;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
@@ -92,24 +93,25 @@ public class TestImmediateStartVertexManager {
public Object answer(InvocationOnMock invocation) {
Object[] args = invocation.getArguments();
scheduledTasks.clear();
- List<TaskWithLocationHint> tasks =
(List<TaskWithLocationHint>)args[0];
- for (TaskWithLocationHint task : tasks) {
+ List<ScheduleTaskRequest> tasks = (List<ScheduleTaskRequest>)args[0];
+ for (ScheduleTaskRequest task : tasks) {
scheduledTasks.add(task.getTaskIndex());
}
return null;
- }}).when(mockContext).scheduleVertexTasks(anyList());
+ }}).when(mockContext).scheduleTasks(anyList());
+ List<TaskAttemptIdentifier> emptyCompletions = null;
ImmediateStartVertexManager manager = new
ImmediateStartVertexManager(mockContext);
manager.initialize();
- manager.onVertexStarted(null);
- verify(mockContext, times(0)).scheduleVertexTasks(anyList());
+ manager.onVertexStarted(emptyCompletions);
+ verify(mockContext, times(0)).scheduleTasks(anyList());
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1,
VertexState.CONFIGURED));
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2,
VertexState.CONFIGURED));
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3,
VertexState.CONFIGURED));
- verify(mockContext, times(1)).scheduleVertexTasks(anyList());
+ verify(mockContext, times(1)).scheduleTasks(anyList());
Assert.assertEquals(4, scheduledTasks.size());
// simulate race between onVertexStarted and notifications
@@ -123,8 +125,8 @@ public class TestImmediateStartVertexManager {
return null;
}}).when(mockContext).registerForVertexStateUpdates(anyString(), anySet());
raceManager.initialize();
- raceManager.onVertexStarted(null);
- verify(mockContext, times(2)).scheduleVertexTasks(anyList());
+ raceManager.onVertexStarted(emptyCompletions);
+ verify(mockContext, times(2)).scheduleTasks(anyList());
Assert.assertEquals(4, scheduledTasks.size());
}
http://git-wip-us.apache.org/repos/asf/tez/blob/17d5c18f/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index e49f97d..7db5bd9 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -93,7 +93,7 @@ import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
import org.apache.tez.dag.api.TaskLocationHint;
import org.apache.tez.dag.api.VertexManagerPlugin;
import org.apache.tez.dag.api.VertexManagerPluginContext;
-import org.apache.tez.dag.api.VertexManagerPluginContext.TaskWithLocationHint;
+import org.apache.tez.dag.api.VertexManagerPluginContext.ScheduleTaskRequest;
import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
import org.apache.tez.dag.api.client.VertexStatus;
import org.apache.tez.dag.api.event.VertexStateUpdate;
@@ -160,6 +160,7 @@ import org.apache.tez.dag.history.HistoryEventHandler;
import org.apache.tez.dag.history.HistoryEventType;
import org.apache.tez.dag.library.vertexmanager.InputReadyVertexManager;
import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
+import org.apache.tez.dag.records.TaskAttemptIdentifierImpl;
import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
@@ -169,6 +170,7 @@ import org.apache.tez.dag.utils.TaskSpecificLaunchCmdOption;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.OutputCommitter;
import org.apache.tez.runtime.api.OutputCommitterContext;
+import org.apache.tez.runtime.api.TaskAttemptIdentifier;
import org.apache.tez.runtime.api.InputSpecUpdate;
import org.apache.tez.runtime.api.InputInitializer;
import org.apache.tez.runtime.api.InputInitializerContext;
@@ -2736,10 +2738,10 @@ public class TestVertexImpl {
// verify all events have been put in pending.
// this is not necessary after legacy routing has been removed
Assert.assertEquals(5, v4.pendingTaskEvents.size());
- List<TaskWithLocationHint> taskList = new
LinkedList<VertexManagerPluginContext.TaskWithLocationHint>();
+ List<ScheduleTaskRequest> taskList = new
LinkedList<VertexManagerPluginContext.ScheduleTaskRequest>();
// scheduling start to trigger edge routing to begin
for (int i=0; i<v4.getTotalTasks(); ++i) {
- taskList.add(new TaskWithLocationHint(i, null));
+ taskList.add(ScheduleTaskRequest.create(i, null));
}
v4.scheduleTasks(taskList);
dispatcher.await();
@@ -2948,7 +2950,7 @@ public class TestVertexImpl {
startVertex(v1);
v3.reconfigureVertex(10, null, null);
checkTasks(v3, 10);
- v3.scheduleTasks(Collections.singletonList(new TaskWithLocationHint(new
Integer(0), null)));
+ v3.scheduleTasks(Collections.singletonList(ScheduleTaskRequest.create(0,
null)));
try {
v3.reconfigureVertex(5, null, null);
Assert.fail();
@@ -2973,7 +2975,7 @@ public class TestVertexImpl {
checkTasks(v3, 10);
taskEventDispatcher.events.clear();
TaskLocationHint mockLocation = mock(TaskLocationHint.class);
- v3.scheduleTasks(Collections.singletonList(new TaskWithLocationHint(new
Integer(0), mockLocation)));
+ v3.scheduleTasks(Collections.singletonList(ScheduleTaskRequest.create(0,
mockLocation)));
dispatcher.await();
Assert.assertEquals(1, taskEventDispatcher.events.size());
TaskEventScheduleTask event = (TaskEventScheduleTask)
taskEventDispatcher.events.get(0);
@@ -2993,7 +2995,7 @@ public class TestVertexImpl {
VertexImpl v1 = vertices.get("vertex1");
startVertex(vertices.get("vertex2"));
startVertex(v1);
- v3.scheduleTasks(Collections.singletonList(new TaskWithLocationHint(new
Integer(0), null)));
+ v3.scheduleTasks(Collections.singletonList(ScheduleTaskRequest.create(0,
null)));
try {
v3.reconfigureVertex(5, null, null);
Assert.fail();
@@ -3029,7 +3031,7 @@ public class TestVertexImpl {
new VertexEventRouteEvent(v3.getVertexId(), taskEvents));
dispatcher.await();
Assert.assertEquals(2, v3.pendingTaskEvents.size());
- v3.scheduleTasks(Collections.singletonList(new TaskWithLocationHint(new
Integer(0), null)));
+ v3.scheduleTasks(Collections.singletonList(ScheduleTaskRequest.create(0,
null)));
dispatcher.await();
Assert.assertEquals(0, v3.pendingTaskEvents.size());
// send events and test that they are not buffered anymore
@@ -5014,10 +5016,10 @@ public class TestVertexImpl {
Assert.assertEquals(1, inputSpecs.get(0).getPhysicalEdgeCount());
}
- List<TaskWithLocationHint> taskList = new
LinkedList<VertexManagerPluginContext.TaskWithLocationHint>();
+ List<ScheduleTaskRequest> taskList = new
LinkedList<VertexManagerPluginContext.ScheduleTaskRequest>();
// scheduling start to trigger edge routing to begin
for (int i=0; i<v1.getTotalTasks(); ++i) {
- taskList.add(new TaskWithLocationHint(i, null));
+ taskList.add(ScheduleTaskRequest.create(i, null));
}
v1.scheduleTasks(taskList);
dispatcher.await();
@@ -5062,10 +5064,10 @@ public class TestVertexImpl {
Assert.assertEquals(true, initializerManager2.hasShutDown);
// scheduling start to trigger edge routing to begin
- taskList = new
LinkedList<VertexManagerPluginContext.TaskWithLocationHint>();
+ taskList = new
LinkedList<VertexManagerPluginContext.ScheduleTaskRequest>();
// scheduling start to trigger edge routing to begin
for (int i=0; i<v2.getTotalTasks(); ++i) {
- taskList.add(new TaskWithLocationHint(i, null));
+ taskList.add(ScheduleTaskRequest.create(i, null));
}
v2.scheduleTasks(taskList);
dispatcher.await();
@@ -5882,8 +5884,10 @@ public class TestVertexImpl {
RootInputInitializerManagerControlled initializerManager1 =
v1.getRootInputInitializerManager();
initializerManager1.completeInputInitialization();
- Event vmEvent = VertexManagerEvent.create(v1.getName(),
ByteBuffer.wrap(new byte[0]));
- TezEvent tezEvent = new TezEvent(vmEvent, null);
+ VertexManagerEvent vmEvent = VertexManagerEvent.create(v1.getName(),
ByteBuffer.wrap(new byte[0]));
+ TezTaskAttemptID taId1 =
TezTaskAttemptID.getInstance(TezTaskID.getInstance(v1.getVertexId(), 0), 0);
+ TezEvent tezEvent = new TezEvent(vmEvent, new
EventMetaData(EventProducerConsumerType.OUTPUT,
+ v1.getName(), null, taId1));
dispatcher.getEventHandler().handle(new
VertexEventRouteEvent(v1.getVertexId(), Lists.newArrayList(tezEvent)));
dispatcher.await();
@@ -6125,7 +6129,7 @@ public class TestVertexImpl {
verify(historyEventHandler, atLeast(1)).handle(argCaptor.capture());
verifyHistoryEvents(argCaptor.getAllValues(),
HistoryEventType.VERTEX_DATA_MOVEMENT_EVENTS_GENERATED, 1);
- v3.scheduleTasks(Lists.newArrayList(new TaskWithLocationHint(0, null)));
+ v3.scheduleTasks(Lists.newArrayList(ScheduleTaskRequest.create(0, null)));
dispatcher.await();
assertTrue(v3.pendingTaskEvents.size() == 0);
// recovery events is not only handled one time
@@ -6160,11 +6164,11 @@ public class TestVertexImpl {
}
@Override
- public void onVertexStarted(Map<String, List<Integer>> completions) {
+ public void onVertexStarted(List<TaskAttemptIdentifier> completions) {
}
@Override
- public void onSourceTaskCompleted(String srcVertexName, Integer taskId) {
+ public void onSourceTaskCompleted(TaskAttemptIdentifier attempt) {
}
@Override
@@ -6305,15 +6309,15 @@ public class TestVertexImpl {
}
@Override
- public void onSourceTaskCompleted(String srcVertexName, Integer attemptId)
{
+ public void onSourceTaskCompleted(TaskAttemptIdentifier attempt) {
if (this.exLocation == VMExceptionLocation.OnSourceTaskCompleted) {
throw new RuntimeException(this.exLocation.name());
}
- super.onSourceTaskCompleted(srcVertexName, attemptId);
+ super.onSourceTaskCompleted(attempt);
}
@Override
- public void onVertexStarted(Map<String, List<Integer>> completions) {
+ public void onVertexStarted(List<TaskAttemptIdentifier> completions) {
if (this.exLocation == VMExceptionLocation.OnVertexStarted) {
throw new RuntimeException(this.exLocation.name());
}
http://git-wip-us.apache.org/repos/asf/tez/blob/17d5c18f/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexManager.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexManager.java
index 81cb42a..9c16f5e 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexManager.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexManager.java
@@ -47,6 +47,7 @@ import org.apache.tez.dag.app.dag.Vertex;
import org.apache.tez.dag.app.dag.event.CallableEvent;
import org.apache.tez.dag.app.dag.event.VertexEventInputDataInformation;
import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.TaskAttemptIdentifier;
import org.apache.tez.runtime.api.events.InputDataInformationEvent;
import org.apache.tez.runtime.api.events.VertexManagerEvent;
import org.apache.tez.runtime.api.impl.TezEvent;
@@ -183,11 +184,11 @@ public class TestVertexManager {
}
@Override
- public void onVertexStarted(Map<String, List<Integer>> completions) {
+ public void onVertexStarted(List<TaskAttemptIdentifier> completions) {
}
@Override
- public void onSourceTaskCompleted(String srcVertexName, Integer taskId) {
+ public void onSourceTaskCompleted(TaskAttemptIdentifier attempt) {
}
@Override
http://git-wip-us.apache.org/repos/asf/tez/blob/17d5c18f/tez-dag/src/test/java/org/apache/tez/test/VertexManagerPluginForTest.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/test/VertexManagerPluginForTest.java b/tez-dag/src/test/java/org/apache/tez/test/VertexManagerPluginForTest.java
index 84e060b..1cdaeca 100644
--- a/tez-dag/src/test/java/org/apache/tez/test/VertexManagerPluginForTest.java
+++ b/tez-dag/src/test/java/org/apache/tez/test/VertexManagerPluginForTest.java
@@ -25,7 +25,6 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
-import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.tez.dag.api.InputDescriptor;
@@ -34,6 +33,7 @@ import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.VertexManagerPlugin;
import org.apache.tez.dag.api.VertexManagerPluginContext;
import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.TaskAttemptIdentifier;
import org.apache.tez.runtime.api.events.VertexManagerEvent;
public class VertexManagerPluginForTest extends VertexManagerPlugin {
@@ -99,14 +99,14 @@ public class VertexManagerPluginForTest extends
VertexManagerPlugin {
}
@Override
- public void onVertexStarted(Map<String, List<Integer>> completions) {
+ public void onVertexStarted(List<TaskAttemptIdentifier> completions) {
if (pluginConfig.getReconfigureOnStart()) {
getContext().reconfigureVertex(pluginConfig.getNumTasks(), null, null);
}
}
@Override
- public void onSourceTaskCompleted(String srcVertexName, Integer taskId) {}
+ public void onSourceTaskCompleted(TaskAttemptIdentifier attempt) {}
@Override
public void onVertexManagerEventReceived(VertexManagerEvent vmEvent) {}
http://git-wip-us.apache.org/repos/asf/tez/blob/17d5c18f/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java
index 30e3e81..f05cd95 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java
@@ -18,7 +18,6 @@
package org.apache.tez.dag.library.vertexmanager;
-import java.util.Collection;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
@@ -34,17 +33,16 @@ import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.TaskLocationHint;
import org.apache.tez.dag.api.VertexManagerPlugin;
import org.apache.tez.dag.api.VertexManagerPluginContext;
-import org.apache.tez.dag.api.VertexManagerPluginContext.TaskWithLocationHint;
+import org.apache.tez.dag.api.VertexManagerPluginContext.ScheduleTaskRequest;
import org.apache.tez.dag.api.event.VertexState;
import org.apache.tez.dag.api.event.VertexStateUpdate;
import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.TaskAttemptIdentifier;
import org.apache.tez.runtime.api.events.VertexManagerEvent;
import com.google.common.base.Preconditions;
-import com.google.common.collect.LinkedListMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
-import com.google.common.collect.Multimap;
@Private
public class InputReadyVertexManager extends VertexManagerPlugin {
@@ -57,7 +55,7 @@ public class InputReadyVertexManager extends
VertexManagerPlugin {
TaskLocationHint oneToOneLocationHints[];
int numOneToOneEdges;
int numConfiguredSources;
- Multimap<String, Integer> pendingCompletions = LinkedListMultimap.create();
+ List<TaskAttemptIdentifier> pendingCompletions = Lists.newLinkedList();
AtomicBoolean configured;
AtomicBoolean started;
@@ -144,10 +142,8 @@ public class InputReadyVertexManager extends
VertexManagerPlugin {
private void trySchedulingPendingCompletions() {
if (readyToSchedule() && !pendingCompletions.isEmpty()) {
- for (Map.Entry<String, Collection<Integer>> entry :
pendingCompletions.asMap().entrySet()) {
- for (Integer i : entry.getValue()) {
- onSourceTaskCompleted(entry.getKey(), i);
- }
+ for (TaskAttemptIdentifier attempt : pendingCompletions) {
+ onSourceTaskCompleted(attempt);
}
}
}
@@ -180,10 +176,10 @@ public class InputReadyVertexManager extends
VertexManagerPlugin {
}
@Override
- public synchronized void onVertexStarted(Map<String, List<Integer>>
completions) {
- for (Map.Entry<String, List<Integer>> entry : completions.entrySet()) {
- pendingCompletions.putAll(entry.getKey(), entry.getValue());
- }
+ public synchronized void onVertexStarted(List<TaskAttemptIdentifier>
completions) {
+ if (completions != null) {
+ pendingCompletions.addAll(completions);
+ }
// allow scheduling
started.set(true);
@@ -192,12 +188,14 @@ public class InputReadyVertexManager extends
VertexManagerPlugin {
}
@Override
- public synchronized void onSourceTaskCompleted(String srcVertexName, Integer
taskId) {
+ public synchronized void onSourceTaskCompleted(TaskAttemptIdentifier
attempt) {
+ String srcVertexName =
attempt.getTaskIdentifier().getVertexIdentifier().getName();
+ int taskId = attempt.getTaskIdentifier().getIdentifier();
if (readyToSchedule()) {
// configured and started. try to schedule
handleSourceTaskFinished(srcVertexName, taskId);
} else {
- pendingCompletions.put(srcVertexName, taskId);
+ pendingCompletions.add(attempt);
}
}
@@ -245,7 +243,7 @@ public class InputReadyVertexManager extends
VertexManagerPlugin {
}
// all source vertices with full dependencies are done
- List<TaskWithLocationHint> tasksToStart = null;
+ List<ScheduleTaskRequest> tasksToStart = null;
if (numOneToOneEdges == 0) {
// no 1-1 dependency. Start all tasks
int numTasks = taskIsStarted.length;
@@ -253,7 +251,7 @@ public class InputReadyVertexManager extends
VertexManagerPlugin {
tasksToStart = Lists.newArrayListWithCapacity(numTasks);
for (int i=0; i<numTasks; ++i) {
taskIsStarted[i] = true;
- tasksToStart.add(new TaskWithLocationHint(Integer.valueOf(i), null));
+ tasksToStart.add(ScheduleTaskRequest.create(i, null));
}
} else {
// start only the ready 1-1 tasks
@@ -268,13 +266,13 @@ public class InputReadyVertexManager extends
VertexManagerPlugin {
LOG.info("Starting task " + i + " for vertex: "
+ getContext().getVertexName() + " with location: "
+ ((locationHint != null) ? locationHint.getAffinitizedTask() :
"null"));
- tasksToStart.add(new TaskWithLocationHint(Integer.valueOf(i),
locationHint));
+ tasksToStart.add(ScheduleTaskRequest.create(Integer.valueOf(i),
locationHint));
}
}
}
if (tasksToStart != null && !tasksToStart.isEmpty()) {
- getContext().scheduleVertexTasks(tasksToStart);
+ getContext().scheduleTasks(tasksToStart);
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/17d5c18f/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
----------------------------------------------------------------------
diff --git
a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
index fb47d47..e63d2a1 100644
---
a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
+++
b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
@@ -24,6 +24,7 @@ import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
@@ -43,11 +44,13 @@ import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.VertexManagerPlugin;
import org.apache.tez.dag.api.VertexManagerPluginContext;
-import org.apache.tez.dag.api.VertexManagerPluginContext.TaskWithLocationHint;
+import org.apache.tez.dag.api.VertexManagerPluginContext.ScheduleTaskRequest;
import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
import org.apache.tez.dag.api.event.VertexState;
import org.apache.tez.dag.api.event.VertexStateUpdate;
import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.TaskAttemptIdentifier;
+import org.apache.tez.runtime.api.TaskIdentifier;
import org.apache.tez.runtime.api.events.DataMovementEvent;
import org.apache.tez.runtime.api.events.InputReadErrorEvent;
import org.apache.tez.runtime.api.events.VertexManagerEvent;
@@ -64,6 +67,7 @@ import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
/**
@@ -136,6 +140,8 @@ public class ShuffleVertexManager extends
VertexManagerPlugin {
int totalTasksToSchedule = 0;
private AtomicBoolean onVertexStartedDone = new AtomicBoolean(false);
+ private Set<TaskIdentifier> taskWithVmEvents = Sets.newHashSet();
+
//Track source vertex and its finished tasks
private final Map<String, SourceVertexInfo> srcVertexInfo =
Maps.newConcurrentMap();
boolean sourceVerticesScheduled = false;
@@ -441,7 +447,7 @@ public class ShuffleVertexManager extends
VertexManagerPlugin {
@Override
- public synchronized void onVertexStarted(Map<String, List<Integer>>
completions) {
+ public synchronized void onVertexStarted(List<TaskAttemptIdentifier>
completions) {
// examine edges after vertex started because until then these may not
have been defined
Map<String, EdgeProperty> inputs =
getContext().getInputVertexEdgeProperties();
for(Map.Entry<String, EdgeProperty> entry : inputs.entrySet()) {
@@ -470,10 +476,8 @@ public class ShuffleVertexManager extends
VertexManagerPlugin {
totalTasksToSchedule + " pending tasks");
if (completions != null) {
- for (Map.Entry<String, List<Integer>> entry : completions.entrySet()) {
- for (Integer taskId : entry.getValue()) {
- onSourceTaskCompleted(entry.getKey(), taskId);
- }
+ for (TaskAttemptIdentifier attempt : completions) {
+ onSourceTaskCompleted(attempt);
}
}
onVertexStartedDone.set(true);
@@ -482,7 +486,9 @@ public class ShuffleVertexManager extends
VertexManagerPlugin {
}
@Override
- public synchronized void onSourceTaskCompleted(String srcVertexName, Integer
srcTaskId) {
+ public synchronized void onSourceTaskCompleted(TaskAttemptIdentifier
attempt) {
+ String srcVertexName =
attempt.getTaskIdentifier().getVertexIdentifier().getName();
+ int srcTaskId = attempt.getTaskIdentifier().getIdentifier();
updateSourceTaskCount();
SourceVertexInfo srcInfo = srcVertexInfo.get(srcVertexName);
@@ -503,8 +509,17 @@ public class ShuffleVertexManager extends
VertexManagerPlugin {
@Override
public synchronized void onVertexManagerEventReceived(VertexManagerEvent
vmEvent) {
- // TODO handle duplicates from retries
- if (enableAutoParallelism) {
+ // currently events from multiple attempts of the same task can be ignored
because
+ // their output will be the same. However, with pipelined events that may
not hold.
+ TaskIdentifier producerTask =
vmEvent.getProducerAttemptIdentifier().getTaskIdentifier();
+ if (!taskWithVmEvents.add(producerTask)) {
+ LOG.info("Ignoring vertex manager event from: " + producerTask);
+ return;
+ }
+
+ numVertexManagerEventsReceived++;
+
+ if (vmEvent.getUserPayload() != null) {
// save output size
VertexManagerEventPayloadProto proto;
try {
@@ -669,13 +684,16 @@ public class ShuffleVertexManager extends
VertexManagerPlugin {
}
getContext().doneReconfiguringVertex();
}
- List<TaskWithLocationHint> scheduledTasks =
Lists.newArrayListWithCapacity(numTasksToSchedule);
+ List<ScheduleTaskRequest> scheduledTasks =
Lists.newArrayListWithCapacity(numTasksToSchedule);
+
while(!pendingTasks.isEmpty() && numTasksToSchedule > 0) {
numTasksToSchedule--;
- scheduledTasks.add(new TaskWithLocationHint(pendingTasks.get(0), null));
+ Integer taskIndex = pendingTasks.get(0);
+ scheduledTasks.add(ScheduleTaskRequest.create(taskIndex, null));
pendingTasks.remove(0);
}
- getContext().scheduleVertexTasks(scheduledTasks);
+
+ getContext().scheduleTasks(scheduledTasks);
if (pendingTasks.size() == 0) {
// done scheduling all tasks
// TODO TEZ-1714 locking issues. getContext().vertexManagerDone();