Author: hitesh
Date: Wed Apr 24 00:21:27 2013
New Revision: 1471212
URL: http://svn.apache.org/r1471212
Log:
TEZ-75. InputSpec/OutputSpec should expose respective input/output class names.
(hitesh)
Modified:
incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/InputSpec.java
incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/OutputSpec.java
incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/common/TezEngineTaskContext.java
incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java
incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
Modified:
incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/InputSpec.java
URL:
http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/InputSpec.java?rev=1471212&r1=1471211&r2=1471212&view=diff
==============================================================================
---
incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/InputSpec.java
(original)
+++
incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/InputSpec.java
Wed Apr 24 00:21:27 2013
@@ -29,13 +29,16 @@ public class InputSpec implements Writab
private String vertexName;
private int inDegree;
+ private String inputClassName;
public InputSpec() {
}
- public InputSpec(String vertexName, int inDegree) {
+ public InputSpec(String vertexName, int inDegree,
+ String inputClassName) {
this.vertexName = vertexName;
this.inDegree = inDegree;
+ this.inputClassName = inputClassName;
}
/**
@@ -52,21 +55,31 @@ public class InputSpec implements Writab
public int getNumInputs() {
return this.inDegree;
}
+
+ /**
+ * @return Input class name
+ */
+ public String getInputClassName() {
+ return this.inputClassName;
+ }
@Override
public void write(DataOutput out) throws IOException {
Text.writeString(out, vertexName);
out.writeInt(inDegree);
+ Text.writeString(out, inputClassName);
}
@Override
public void readFields(DataInput in) throws IOException {
vertexName = Text.readString(in);
this.inDegree = in.readInt();
+ inputClassName = Text.readString(in);
}
@Override
public String toString() {
- return "VertexName: " + vertexName + ", InDegree: " + inDegree;
+ return "VertexName: " + vertexName + ", InDegree: " + inDegree
+ + ", InputClassName=" + inputClassName;
}
}
Modified:
incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/OutputSpec.java
URL:
http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/OutputSpec.java?rev=1471212&r1=1471211&r2=1471212&view=diff
==============================================================================
---
incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/OutputSpec.java
(original)
+++
incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/OutputSpec.java
Wed Apr 24 00:21:27 2013
@@ -29,13 +29,16 @@ public class OutputSpec implements Writa
private String vertexName;
private int outDegree;
+ private String outputClassName;
public OutputSpec() {
}
- public OutputSpec(String vertexName, int outDegree) {
+ public OutputSpec(String vertexName, int outDegree,
+ String outputClassName) {
this.vertexName = vertexName;
this.outDegree = outDegree;
+ this.outputClassName = outputClassName;
}
/**
@@ -52,20 +55,30 @@ public class OutputSpec implements Writa
return this.outDegree;
}
+ /**
+ * @return Output class name
+ */
+ public String getOutputClassName() {
+ return this.outputClassName;
+ }
+
@Override
public void write(DataOutput out) throws IOException {
Text.writeString(out, vertexName);
out.writeInt(outDegree);
+ Text.writeString(out, outputClassName);
}
@Override
public void readFields(DataInput in) throws IOException {
vertexName = Text.readString(in);
this.outDegree = in.readInt();
+ outputClassName = Text.readString(in);
}
@Override
public String toString() {
- return "VertexName: " + vertexName + ", OutDegree: " + outDegree;
+ return "VertexName: " + vertexName + ", OutDegree: " + outDegree
+ + ", OutputClassName=" + outputClassName;
}
}
Modified:
incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
URL:
http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java?rev=1471212&r1=1471211&r2=1471212&view=diff
==============================================================================
---
incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
(original)
+++
incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
Wed Apr 24 00:21:27 2013
@@ -166,6 +166,8 @@ public class YarnTezDagChild {
return;
}
taskContext = containerTask.getTezEngineTaskContext();
+ LOG.info("XXXX: New container task context:"
+ + taskContext.toString());
taskAttemptId = taskContext.getTaskAttemptId();
Modified:
incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
URL:
http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java?rev=1471212&r1=1471211&r2=1471212&view=diff
==============================================================================
---
incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
(original)
+++
incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
Wed Apr 24 00:21:27 2013
@@ -27,6 +27,7 @@ import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
@@ -1293,9 +1294,10 @@ public class VertexImpl implements org.a
public synchronized List<InputSpec> getInputSpecList() {
inputSpecList = new ArrayList<InputSpec>(
this.getInputVerticesCount());
- for (Vertex srcVertex : this.getInputVertices().keySet()) {
- InputSpec inputSpec = new InputSpec(srcVertex.getName(),
- srcVertex.getTotalTasks());
+ for (Entry<Vertex, EdgeProperty> entry :
this.getInputVertices().entrySet()) {
+ InputSpec inputSpec = new InputSpec(entry.getKey().getName(),
+ entry.getKey().getTotalTasks(),
+ entry.getValue().getInputClass());
LOG.info("DEBUG: For vertex : " + this.getName()
+ ", Using InputSpec : " + inputSpec);
// TODO DAGAM This should be based on the edge type.
@@ -1309,9 +1311,10 @@ public class VertexImpl implements org.a
public synchronized List<OutputSpec> getOutputSpecList() {
if (this.outputSpecList == null) {
outputSpecList = new
ArrayList<OutputSpec>(this.getOutputVerticesCount());
- for (Vertex targetVertex : this.getOutputVertices().keySet()) {
- OutputSpec outputSpec = new OutputSpec(targetVertex.getName(),
- targetVertex.getTotalTasks());
+ for (Entry<Vertex, EdgeProperty> entry :
this.getOutputVertices().entrySet()) {
+ OutputSpec outputSpec = new OutputSpec(entry.getKey().getName(),
+ entry.getKey().getTotalTasks(),
+ entry.getValue().getOutputClass());
LOG.info("DEBUG: For vertex : " + this.getName()
+ ", Using OutputSpec : " + outputSpec);
// TODO DAGAM This should be based on the edge type.
Modified:
incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/common/TezEngineTaskContext.java
URL:
http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/common/TezEngineTaskContext.java?rev=1471212&r1=1471211&r2=1471212&view=diff
==============================================================================
---
incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/common/TezEngineTaskContext.java
(original)
+++
incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/common/TezEngineTaskContext.java
Wed Apr 24 00:21:27 2013
@@ -101,4 +101,22 @@ public class TezEngineTaskContext extend
outputSpecList.add(outputSpec);
}
}
+
+ @Override
+ public String toString() {
+ StringBuffer sb = new StringBuffer();
+ sb.append("taskModuleClassName=" + taskModuleClassName
+ + ", inputSpecListSize=" + inputSpecList.size()
+ + ", outputSpecListSize=" + outputSpecList.size());
+ sb.append(", inputSpecList=[");
+ for (InputSpec i : inputSpecList) {
+ sb.append("{" + i.toString() + "}, ");
+ }
+ sb.append("], outputSpecList=[");
+ for (OutputSpec i : outputSpecList) {
+ sb.append("{" + i.toString() + "}, ");
+ }
+ sb.append("]");
+ return sb.toString();
+ }
}
Modified:
incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java
URL:
http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java?rev=1471212&r1=1471211&r2=1471212&view=diff
==============================================================================
---
incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java
(original)
+++
incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java
Wed Apr 24 00:21:27 2013
@@ -70,6 +70,7 @@ import org.apache.tez.common.counters.Te
import org.apache.tez.engine.api.Task;
import org.apache.tez.engine.common.task.local.output.TezLocalTaskOutputFiles;
import org.apache.tez.engine.common.task.local.output.TezTaskOutput;
+import org.apache.tez.engine.lib.input.LocalMergedInput;
import org.apache.tez.engine.records.OutputContext;
import org.apache.tez.engine.records.TezTaskAttemptID;
import org.apache.tez.engine.records.TezTaskDependencyCompletionEventsUpdate;
@@ -448,7 +449,7 @@ public class LocalJobRunner implements C
IDConverter.fromMRTaskAttemptId(reduceId), user,
localConf.getJobName(), "TODO_vertexName",
LocalFinalTask.class.getName(),
Collections.singletonList(new InputSpec("TODO_srcVertexName",
- mapIds.size())), null);
+ mapIds.size(), LocalMergedInput.class.getName())), null);
Injector injector = Guice.createInjector(new LocalFinalTask());
// move map output to reduce input
Modified:
incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
URL:
http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java?rev=1471212&r1=1471211&r2=1471212&view=diff
==============================================================================
---
incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
(original)
+++
incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
Wed Apr 24 00:21:27 2013
@@ -34,6 +34,7 @@ import org.apache.tez.common.TezJobConfi
import org.apache.tez.engine.api.Task;
import org.apache.tez.engine.common.task.local.output.TezLocalTaskOutputFiles;
import org.apache.tez.engine.common.task.local.output.TezTaskOutput;
+import org.apache.tez.engine.lib.input.LocalMergedInput;
import org.apache.tez.engine.runtime.TezEngineFactory;
import org.apache.tez.mapreduce.TestUmbilicalProtocol;
import org.apache.tez.mapreduce.TezTestUtils;
@@ -96,7 +97,8 @@ public class TestReduceProcessor {
TezEngineTaskContext taskContext = new TezEngineTaskContext(
TezTestUtils.getMockTaskAttemptId(0, 0, 0, 0), "tez",
"tez", "TODO_vertexName", LocalFinalTask.class.getName(),
- Collections.singletonList(new InputSpec("TODO_srcVertexName", 1)),
null);
+ Collections.singletonList(new InputSpec("TODO_srcVertexName", 1,
+ LocalMergedInput.class.getName())), null);
job.set(JobContext.TASK_ATTEMPT_ID,
taskContext.getTaskAttemptId().toString());
Injector injector = Guice.createInjector(new LocalFinalTask());
TezEngineFactory factory = injector.getInstance(TezEngineFactory.class);