Repository: tez Updated Branches: refs/heads/master 1c16b5bfc -> 70db632f5
TEZ-1346. Change Processor to require context constructors for creation, and remove the requirement of the initialize method requiring the context. (sseth) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/70db632f Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/70db632f Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/70db632f Branch: refs/heads/master Commit: 70db632f5dc26ec66b69b669337a33ea555a0559 Parents: 1c16b5b Author: Siddharth Seth <[email protected]> Authored: Wed Jul 30 20:34:56 2014 -0700 Committer: Siddharth Seth <[email protected]> Committed: Wed Jul 30 20:35:27 2014 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../runtime/api/AbstractLogicalIOProcessor.java | 19 ++-- .../tez/runtime/api/MergedLogicalInput.java | 2 +- .../org/apache/tez/runtime/api/Processor.java | 6 +- .../tez/runtime/task/TestTaskExecution.java | 5 + .../examples/BroadcastAndOneToOneExample.java | 9 ++ .../mapreduce/examples/IntersectDataGen.java | 5 + .../mapreduce/examples/IntersectExample.java | 9 ++ .../mapreduce/examples/IntersectValidate.java | 7 +- .../tez/mapreduce/examples/UnionExample.java | 9 ++ .../tez/mapreduce/examples/WordCount.java | 9 ++ .../processor/FilterByWordInputProcessor.java | 14 +-- .../processor/FilterByWordOutputProcessor.java | 11 +- .../apache/tez/mapreduce/processor/MRTask.java | 25 +++-- .../mapreduce/processor/SimpleMRProcessor.java | 7 +- .../mapreduce/processor/map/MapProcessor.java | 10 +- .../processor/reduce/ReduceProcessor.java | 11 +- .../runtime/LogicalIOProcessorRuntimeTask.java | 17 ++-- .../TestLogicalIOProcessorRuntimeTask.java | 4 +- .../library/common/sort/impl/TezMerger.java | 2 +- .../library/processor/SimpleProcessor.java | 5 + .../library/processor/SleepProcessor.java | 14 ++- .../java/org/apache/tez/test/TestProcessor.java | 102 ++++++++++--------- 23 files changed, 197 insertions(+), 106 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/70db632f/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 404de28..e2e673f 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -39,6 +39,7 @@ INCOMPATIBLE CHANGES TEZ-1303. Change Inputs, Outputs, InputInitializer, OutputCommitter, VertexManagerPlugin, EdgeManager to require constructors for creation, and remove the initialize methods. TEZ-1133. Remove some unused methods from MRHelpers. + TEZ-1346. Change Processor to require context constructors for creation, and remove the requirement of the initialize method requiring the context. Release 0.4.0-incubating: 2014-04-05 http://git-wip-us.apache.org/repos/asf/tez/blob/70db632f/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalIOProcessor.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalIOProcessor.java b/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalIOProcessor.java index 27e1bc8..7688c14 100644 --- a/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalIOProcessor.java +++ b/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalIOProcessor.java @@ -28,17 +28,24 @@ import org.apache.hadoop.classification.InterfaceStability; @InterfaceAudience.Public @InterfaceStability.Evolving public abstract class AbstractLogicalIOProcessor implements LogicalIOProcessor { - protected TezProcessorContext context; + private final TezProcessorContext context; - @Override - public void initialize(TezProcessorContext processorContext) throws Exception { - this.context = processorContext; - initialize(); + /** + * Constructor an instance of the LogicalProcessor. Classes extending this one to create a + * LogicalProcessor, must provide the same constructor so that Tez can create an instance of the + * class at runtime. + * + * @param context the {@link org.apache.tez.runtime.api.TezProcessorContext} which provides + * the Processor with context information within the running task. + */ + public AbstractLogicalIOProcessor(TezProcessorContext context) { + this.context = context; } + @Override public abstract void initialize() throws Exception; - public TezProcessorContext getContext() { + public final TezProcessorContext getContext() { return context; } http://git-wip-us.apache.org/repos/asf/tez/blob/70db632f/tez-api/src/main/java/org/apache/tez/runtime/api/MergedLogicalInput.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/MergedLogicalInput.java b/tez-api/src/main/java/org/apache/tez/runtime/api/MergedLogicalInput.java index 6f7b14c..656bb2c 100644 --- a/tez-api/src/main/java/org/apache/tez/runtime/api/MergedLogicalInput.java +++ b/tez-api/src/main/java/org/apache/tez/runtime/api/MergedLogicalInput.java @@ -61,7 +61,7 @@ public abstract class MergedLogicalInput implements LogicalInput { return inputs; } - public TezMergedInputContext getContext() { + public final TezMergedInputContext getContext() { return context; } http://git-wip-us.apache.org/repos/asf/tez/blob/70db632f/tez-api/src/main/java/org/apache/tez/runtime/api/Processor.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/Processor.java b/tez-api/src/main/java/org/apache/tez/runtime/api/Processor.java index 49f35cd..358bd43 100644 --- a/tez-api/src/main/java/org/apache/tez/runtime/api/Processor.java +++ b/tez-api/src/main/java/org/apache/tez/runtime/api/Processor.java @@ -29,12 +29,10 @@ public interface Processor { /** * Initializes the <code>Processor</code> * - * @param processorContext - * @throws java.io.IOException + * @throws java.lang.Exception * if an error occurs */ - public void initialize(TezProcessorContext processorContext) - throws Exception; + public void initialize() throws Exception; /** * Handles user and system generated {@link Event}s. http://git-wip-us.apache.org/repos/asf/tez/blob/70db632f/tez-dag/src/test/java/org/apache/tez/runtime/task/TestTaskExecution.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/runtime/task/TestTaskExecution.java b/tez-dag/src/test/java/org/apache/tez/runtime/task/TestTaskExecution.java index 98abf1c..3fd6e2e 100644 --- a/tez-dag/src/test/java/org/apache/tez/runtime/task/TestTaskExecution.java +++ b/tez-dag/src/test/java/org/apache/tez/runtime/task/TestTaskExecution.java @@ -59,6 +59,7 @@ import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.dag.records.TezTaskID; import org.apache.tez.dag.records.TezVertexID; +import org.apache.tez.runtime.api.TezProcessorContext; import org.apache.tez.runtime.api.events.TaskAttemptCompletedEvent; import org.apache.tez.runtime.api.events.TaskAttemptFailedEvent; import org.apache.tez.runtime.api.impl.InputSpec; @@ -355,6 +356,10 @@ public class TestTaskExecution { private boolean signalFatalAndLoop = false; private boolean signalFatalAndComplete = false; + public TestProcessor(TezProcessorContext context) { + super(context); + } + @Override public void initialize() throws Exception { parseConf(getContext().getUserPayload()); http://git-wip-us.apache.org/repos/asf/tez/blob/70db632f/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java index 7e7d351..1a667a3 100644 --- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java +++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java @@ -46,6 +46,7 @@ import org.apache.tez.dag.api.client.DAGClient; import org.apache.tez.dag.api.client.DAGStatus; import org.apache.tez.dag.library.vertexmanager.InputReadyVertexManager; import org.apache.tez.mapreduce.hadoop.MRHelpers; +import org.apache.tez.runtime.api.TezProcessorContext; import org.apache.tez.runtime.common.objectregistry.ObjectLifeCycle; import org.apache.tez.runtime.common.objectregistry.ObjectRegistry; import org.apache.tez.runtime.library.api.KeyValueReader; @@ -60,6 +61,10 @@ public class BroadcastAndOneToOneExample extends Configured implements Tool { public static class InputProcessor extends SimpleProcessor { Text word = new Text(); + public InputProcessor(TezProcessorContext context) { + super(context); + } + @Override public void run() throws Exception { Preconditions.checkArgument(getOutputs().size() == 1); @@ -82,6 +87,10 @@ public class BroadcastAndOneToOneExample extends Configured implements Tool { public static class OneToOneProcessor extends SimpleProcessor { Text word = new Text(); + public OneToOneProcessor(TezProcessorContext context) { + super(context); + } + @Override public void run() throws Exception { Preconditions.checkArgument(inputs.size() == 2); http://git-wip-us.apache.org/repos/asf/tez/blob/70db632f/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectDataGen.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectDataGen.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectDataGen.java index 9ed33f9..d5b0eb9 100644 --- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectDataGen.java +++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectDataGen.java @@ -56,6 +56,7 @@ import org.apache.tez.mapreduce.committer.MROutputCommitter; import org.apache.tez.mapreduce.hadoop.MRHelpers; import org.apache.tez.mapreduce.output.MROutput; import org.apache.tez.mapreduce.processor.SimpleMRProcessor; +import org.apache.tez.runtime.api.TezProcessorContext; import org.apache.tez.runtime.library.api.KeyValueWriter; import com.google.common.base.Preconditions; @@ -236,6 +237,10 @@ public class IntersectDataGen extends Configured implements Tool { long hashOutputFileSize; float overlapApprox = 0.2f; + public GenDataProcessor(TezProcessorContext context) { + super(context); + } + public static byte[] createConfiguration(long streamOutputFileSize, long hashOutputFileSize) throws IOException { ByteArrayOutputStream bos = new ByteArrayOutputStream(); http://git-wip-us.apache.org/repos/asf/tez/blob/70db632f/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectExample.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectExample.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectExample.java index c5bf792..08a8029 100644 --- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectExample.java +++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectExample.java @@ -62,6 +62,7 @@ import org.apache.tez.mapreduce.output.MROutput; import org.apache.tez.runtime.api.LogicalInput; import org.apache.tez.runtime.api.LogicalOutput; import org.apache.tez.runtime.api.Reader; +import org.apache.tez.runtime.api.TezProcessorContext; import org.apache.tez.runtime.library.api.KeyValueReader; import org.apache.tez.runtime.library.api.KeyValueWriter; import org.apache.tez.runtime.library.conf.UnorderedPartitionedKVEdgeConfigurer; @@ -258,6 +259,10 @@ public class IntersectExample extends Configured implements Tool { * Reads key-values from the source and forwards the value as the key for the output */ public static class ForwardingProcessor extends SimpleProcessor { + public ForwardingProcessor(TezProcessorContext context) { + super(context); + } + @Override public void run() throws Exception { Preconditions.checkState(getInputs().size() == 1); @@ -279,6 +284,10 @@ public class IntersectExample extends Configured implements Tool { public static class IntersectProcessor extends SimpleProcessor { + public IntersectProcessor(TezProcessorContext context) { + super(context); + } + @Override public void run() throws Exception { Preconditions.checkState(getInputs().size() == 2); http://git-wip-us.apache.org/repos/asf/tez/blob/70db632f/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectValidate.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectValidate.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectValidate.java index 585ee63..bf5aa01 100644 --- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectValidate.java +++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectValidate.java @@ -56,6 +56,7 @@ import org.apache.tez.mapreduce.hadoop.MRHelpers; import org.apache.tez.mapreduce.input.MRInput; import org.apache.tez.runtime.api.LogicalInput; import org.apache.tez.runtime.api.Reader; +import org.apache.tez.runtime.api.TezProcessorContext; import org.apache.tez.runtime.library.api.KeyValuesReader; import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfigurer; import org.apache.tez.runtime.library.partitioner.HashPartitioner; @@ -237,7 +238,11 @@ public class IntersectValidate extends Configured implements Tool { public static class IntersectValidateProcessor extends SimpleProcessor { private static final Log LOG = LogFactory.getLog(IntersectValidateProcessor.class); - + + public IntersectValidateProcessor(TezProcessorContext context) { + super(context); + } + @Override public void run() throws Exception { Preconditions.checkState(getInputs().size() == 2); http://git-wip-us.apache.org/repos/asf/tez/blob/70db632f/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java index fdbe187..1fdd1f3 100644 --- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java +++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java @@ -60,6 +60,7 @@ import org.apache.tez.mapreduce.output.MROutput; import org.apache.tez.mapreduce.processor.SimpleMRProcessor; import org.apache.tez.runtime.api.LogicalInput; import org.apache.tez.runtime.api.Output; +import org.apache.tez.runtime.api.TezProcessorContext; import org.apache.tez.runtime.library.api.KeyValueReader; import org.apache.tez.runtime.library.api.KeyValueWriter; import org.apache.tez.runtime.library.api.KeyValuesReader; @@ -77,6 +78,10 @@ public class UnionExample { IntWritable one = new IntWritable(1); Text word = new Text(); + public TokenProcessor(TezProcessorContext context) { + super(context); + } + @Override public void run() throws Exception { Preconditions.checkArgument(getInputs().size() == 1); @@ -121,6 +126,10 @@ public class UnionExample { public static class UnionProcessor extends SimpleMRProcessor { IntWritable one = new IntWritable(1); + public UnionProcessor(TezProcessorContext context) { + super(context); + } + @Override public void run() throws Exception { Preconditions.checkArgument(getInputs().size() == 2); http://git-wip-us.apache.org/repos/asf/tez/blob/70db632f/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java index 00fc326..0de2b04 100644 --- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java +++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java @@ -57,6 +57,7 @@ import org.apache.tez.mapreduce.input.MRInput; import org.apache.tez.mapreduce.output.MROutput; import org.apache.tez.mapreduce.processor.SimpleMRProcessor; import org.apache.tez.runtime.api.Output; +import org.apache.tez.runtime.api.TezProcessorContext; import org.apache.tez.runtime.library.api.KeyValueReader; import org.apache.tez.runtime.library.api.KeyValueWriter; import org.apache.tez.runtime.library.api.KeyValuesReader; @@ -72,6 +73,10 @@ public class WordCount extends Configured implements Tool { IntWritable one = new IntWritable(1); Text word = new Text(); + public TokenProcessor(TezProcessorContext context) { + super(context); + } + @Override public void run() throws Exception { Preconditions.checkArgument(getInputs().size() == 1); @@ -92,6 +97,10 @@ public class WordCount extends Configured implements Tool { } public static class SumProcessor extends SimpleMRProcessor { + public SumProcessor(TezProcessorContext context) { + super(context); + } + @Override public void run() throws Exception { Preconditions.checkArgument(getInputs().size() == 1); http://git-wip-us.apache.org/repos/asf/tez/blob/70db632f/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordInputProcessor.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordInputProcessor.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordInputProcessor.java index 1d78366..0314e83 100644 --- a/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordInputProcessor.java +++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordInputProcessor.java @@ -32,8 +32,8 @@ import org.apache.tez.mapreduce.examples.FilterLinesByWord.TextLongPair; import org.apache.tez.mapreduce.hadoop.MRJobConfig; import org.apache.tez.mapreduce.input.MRInput; import org.apache.tez.mapreduce.input.MRInputLegacy; +import org.apache.tez.runtime.api.AbstractLogicalIOProcessor; import org.apache.tez.runtime.api.Event; -import org.apache.tez.runtime.api.LogicalIOProcessor; import org.apache.tez.runtime.api.LogicalInput; import org.apache.tez.runtime.api.LogicalOutput; import org.apache.tez.runtime.api.TezProcessorContext; @@ -41,21 +41,23 @@ import org.apache.tez.runtime.library.api.KeyValueReader; import org.apache.tez.runtime.library.api.KeyValueWriter; import org.apache.tez.runtime.library.output.OnFileUnorderedKVOutput; -public class FilterByWordInputProcessor implements LogicalIOProcessor { +public class FilterByWordInputProcessor extends AbstractLogicalIOProcessor { private static final Log LOG = LogFactory.getLog(FilterByWordInputProcessor.class); private String filterWord; - public FilterByWordInputProcessor() { + public FilterByWordInputProcessor(TezProcessorContext context) { + super(context); } + @Override - public void initialize(TezProcessorContext processorContext) throws Exception { - Configuration conf = TezUtils.createConfFromUserPayload(processorContext.getUserPayload()); + public void initialize() throws Exception { + Configuration conf = TezUtils.createConfFromUserPayload(getContext().getUserPayload()); filterWord = conf.get(FilterLinesByWord.FILTER_PARAM_NAME); if (filterWord == null) { - processorContext.fatalError(null, "No filter word specified"); + getContext().fatalError(null, "No filter word specified"); } } http://git-wip-us.apache.org/repos/asf/tez/blob/70db632f/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordOutputProcessor.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordOutputProcessor.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordOutputProcessor.java index d061ea0..bad2b9c 100644 --- a/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordOutputProcessor.java +++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordOutputProcessor.java @@ -25,6 +25,8 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.tez.mapreduce.output.MROutput; import org.apache.tez.mapreduce.processor.map.MapProcessor; +import org.apache.tez.runtime.api.AbstractLogicalIOProcessor; +import org.apache.tez.runtime.api.AbstractLogicalInput; import org.apache.tez.runtime.api.Event; import org.apache.tez.runtime.api.LogicalIOProcessor; import org.apache.tez.runtime.api.LogicalInput; @@ -35,17 +37,18 @@ import org.apache.tez.runtime.library.api.KeyValueWriter; import org.apache.tez.runtime.library.input.ShuffledUnorderedKVInput; -public class FilterByWordOutputProcessor implements LogicalIOProcessor { +public class FilterByWordOutputProcessor extends AbstractLogicalIOProcessor { private static final Log LOG = LogFactory.getLog(MapProcessor.class); private TezProcessorContext processorContext; - public FilterByWordOutputProcessor() { + public FilterByWordOutputProcessor(TezProcessorContext context) { + super(context); } + @Override - public void initialize(TezProcessorContext processorContext) throws Exception { - this.processorContext = processorContext; + public void initialize() throws Exception { } @Override http://git-wip-us.apache.org/repos/asf/tez/blob/70db632f/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java index ee002d8..d867107 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java @@ -76,6 +76,7 @@ import org.apache.tez.mapreduce.hadoop.MRJobConfig; import org.apache.tez.mapreduce.hadoop.mapred.TaskAttemptContextImpl; import org.apache.tez.mapreduce.hadoop.mapreduce.JobContextImpl; import org.apache.tez.mapreduce.output.MROutputLegacy; +import org.apache.tez.runtime.api.AbstractLogicalIOProcessor; import org.apache.tez.runtime.api.LogicalInput; import org.apache.tez.runtime.api.LogicalOutput; import org.apache.tez.runtime.api.TezProcessorContext; @@ -83,7 +84,7 @@ import org.apache.tez.runtime.library.common.Constants; import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator; @SuppressWarnings("deprecation") -public abstract class MRTask { +public abstract class MRTask extends AbstractLogicalIOProcessor { static final Log LOG = LogFactory.getLog(MRTask.class); @@ -118,27 +119,29 @@ public abstract class MRTask { protected MRTaskReporter mrReporter; protected boolean useNewApi; - public MRTask(boolean isMap) { + public MRTask(TezProcessorContext processorContext, boolean isMap) { + super(processorContext); this.isMap = isMap; } // TODO how to update progress - public void initialize(TezProcessorContext context) throws IOException, + @Override + public void initialize() throws IOException, InterruptedException { DeprecatedKeys.init(); - processorContext = context; - counters = context.getCounters(); + processorContext = getContext(); + counters = processorContext.getCounters(); this.taskAttemptId = new TaskAttemptID( new TaskID( - Long.toString(context.getApplicationId().getClusterTimestamp()), - context.getApplicationId().getId(), + Long.toString(processorContext.getApplicationId().getClusterTimestamp()), + processorContext.getApplicationId().getId(), (isMap ? TaskType.MAP : TaskType.REDUCE), - context.getTaskIndex()), - context.getTaskAttemptNumber()); + processorContext.getTaskIndex()), + processorContext.getTaskAttemptNumber()); - byte[] userPayload = context.getUserPayload(); + byte[] userPayload = processorContext.getUserPayload(); Configuration conf = TezUtils.createConfFromUserPayload(userPayload); if (conf instanceof JobConf) { this.jobConf = (JobConf)conf; @@ -150,7 +153,7 @@ public abstract class MRTask { jobConf.set(MRJobConfig.TASK_ATTEMPT_ID, taskAttemptId.toString()); jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, - context.getDAGAttemptNumber()); + processorContext.getDAGAttemptNumber()); LOG.info("MRTask.inited: taskAttemptId = " + taskAttemptId.toString()); http://git-wip-us.apache.org/repos/asf/tez/blob/70db632f/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/SimpleMRProcessor.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/SimpleMRProcessor.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/SimpleMRProcessor.java index 5fdcec0..23877de 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/SimpleMRProcessor.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/SimpleMRProcessor.java @@ -24,13 +24,18 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.tez.mapreduce.output.MROutput; import org.apache.tez.runtime.api.LogicalOutput; +import org.apache.tez.runtime.api.TezProcessorContext; import org.apache.tez.runtime.library.processor.SimpleProcessor; import com.google.common.collect.Lists; public abstract class SimpleMRProcessor extends SimpleProcessor { private static final Log LOG = LogFactory.getLog(SimpleMRProcessor.class); - + + public SimpleMRProcessor(TezProcessorContext context) { + super(context); + } + @Override protected void postOp() throws Exception { if (getOutputs() == null) { http://git-wip-us.apache.org/repos/asf/tez/blob/70db632f/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java index a2dd7d1..ca6dbe2 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java @@ -53,19 +53,19 @@ import org.apache.tez.runtime.library.api.KeyValueWriter; import org.apache.tez.runtime.library.output.OnFileSortedOutput; @SuppressWarnings({ "unchecked", "rawtypes" }) -public class MapProcessor extends MRTask implements LogicalIOProcessor { +public class MapProcessor extends MRTask { private static final Log LOG = LogFactory.getLog(MapProcessor.class); - public MapProcessor(){ - super(true); + public MapProcessor(TezProcessorContext processorContext) { + super(processorContext, true); } @Override - public void initialize(TezProcessorContext processorContext) + public void initialize() throws IOException { try { - super.initialize(processorContext); + super.initialize(); } catch (InterruptedException e) { throw new IOException(e); } http://git-wip-us.apache.org/repos/asf/tez/blob/70db632f/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java index c5ade59..e4a2d93 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java @@ -42,7 +42,6 @@ import org.apache.tez.mapreduce.processor.MRTask; import org.apache.tez.mapreduce.processor.MRTaskReporter; import org.apache.tez.runtime.api.Event; import org.apache.tez.runtime.api.Input; -import org.apache.tez.runtime.api.LogicalIOProcessor; import org.apache.tez.runtime.api.LogicalInput; import org.apache.tez.runtime.api.LogicalOutput; import org.apache.tez.runtime.api.TezProcessorContext; @@ -55,22 +54,22 @@ import org.apache.tez.runtime.library.output.OnFileSortedOutput; @SuppressWarnings({ "unchecked", "rawtypes" }) -public class ReduceProcessor extends MRTask implements LogicalIOProcessor { +public class ReduceProcessor extends MRTask { private static final Log LOG = LogFactory.getLog(ReduceProcessor.class); private Counter reduceInputKeyCounter; private Counter reduceInputValueCounter; - public ReduceProcessor() { - super(false); + public ReduceProcessor(TezProcessorContext processorContext) { + super(processorContext, false); } @Override - public void initialize(TezProcessorContext processorContext) + public void initialize() throws IOException { try { - super.initialize(processorContext); + super.initialize(); } catch (InterruptedException e) { throw new IOException(e); } http://git-wip-us.apache.org/repos/asf/tez/blob/70db632f/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java index 5511b72..b0d6ffa 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java @@ -105,7 +105,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { private ConcurrentHashMap<String, MergedLogicalInput> groupInputsMap; private final ProcessorDescriptor processorDescriptor; - private final LogicalIOProcessor processor; + private LogicalIOProcessor processor; private TezProcessorContext processorContext; private final MemoryDistributor initialMemoryDistributor; @@ -154,7 +154,6 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { this.runOutputMap = new LinkedHashMap<String, LogicalOutput>(); this.processorDescriptor = taskSpec.getProcessorDescriptor(); - this.processor = createProcessor(processorDescriptor); this.serviceConsumerMetadata = serviceConsumerMetadata; this.eventsToBeProcessed = new LinkedBlockingQueue<TezEvent>(); this.state = State.NEW; @@ -182,6 +181,10 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { Preconditions.checkState(this.state == State.NEW, "Already initialized"); this.state = State.INITED; + LOG.info("Creating processor" + ", processorClassName=" + processorDescriptor.getClassName()); + this.processorContext = createProcessorContext(); + this.processor = createProcessor(processorDescriptor.getClassName(), processorContext); + int numTasks = 0; int inputIndex = 0; @@ -466,9 +469,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { private void initializeLogicalIOProcessor() throws Exception { LOG.info("Initializing processor" + ", processorClassName=" + processorDescriptor.getClassName()); - TezProcessorContext processorContext = createProcessorContext(); - this.processorContext = processorContext; - processor.initialize(processorContext); + processor.initialize(); LOG.info("Initialized processor" + ", processorClassName=" + processorDescriptor.getClassName()); } @@ -553,9 +554,9 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { } private LogicalIOProcessor createProcessor( - ProcessorDescriptor processorDescriptor) { - Processor processor = ReflectionUtils.createClazzInstance(processorDescriptor - .getClassName()); + String processorClassName, TezProcessorContext processorContext) { + Processor processor = ReflectionUtils.createClazzInstance(processorClassName, + new Class[]{TezProcessorContext.class}, new Object[]{processorContext}); if (!(processor instanceof LogicalIOProcessor)) { throw new TezUncheckedException(processor.getClass().getName() + " is not a sub-type of LogicalIOProcessor." http://git-wip-us.apache.org/repos/asf/tez/blob/70db632f/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java index dcf3303..7740cb0 100644 --- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java +++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java @@ -43,6 +43,7 @@ import org.apache.tez.runtime.api.LogicalOutput; import org.apache.tez.runtime.api.Reader; import org.apache.tez.runtime.api.TezInputContext; import org.apache.tez.runtime.api.TezOutputContext; +import org.apache.tez.runtime.api.TezProcessorContext; import org.apache.tez.runtime.api.Writer; import org.apache.tez.runtime.api.impl.InputSpec; import org.apache.tez.runtime.api.impl.OutputSpec; @@ -141,7 +142,8 @@ public class TestLogicalIOProcessorRuntimeTask { public static volatile int runCount = 0; - public TestProcessor() { + public TestProcessor(TezProcessorContext context) { + super(context); } @Override http://git-wip-us.apache.org/repos/asf/tez/blob/70db632f/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java index f5f1478..217e63a 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java @@ -408,7 +408,7 @@ public class TezMerger { }; public MergeQueue(Configuration conf, FileSystem fs, - Path[] inputs, boolean deleteInputs, + Path[] inputs, boolean deleteInputs, CompressionCodec codec, boolean ifileReadAhead, int ifileReadAheadLength, int ifileBufferSize, boolean considerFinalMergeForProgress, http://git-wip-us.apache.org/repos/asf/tez/blob/70db632f/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SimpleProcessor.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SimpleProcessor.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SimpleProcessor.java index 60eae30..85ba412 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SimpleProcessor.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SimpleProcessor.java @@ -24,11 +24,16 @@ import org.apache.tez.runtime.api.AbstractLogicalIOProcessor; import org.apache.tez.runtime.api.Event; import org.apache.tez.runtime.api.LogicalInput; import org.apache.tez.runtime.api.LogicalOutput; +import org.apache.tez.runtime.api.TezProcessorContext; public abstract class SimpleProcessor extends AbstractLogicalIOProcessor { protected Map<String, LogicalInput> inputs; protected Map<String, LogicalOutput> outputs; + public SimpleProcessor(TezProcessorContext context) { + super(context); + } + public void run(Map<String, LogicalInput> _inputs, Map<String, LogicalOutput> _outputs) throws Exception { this.inputs = _inputs; http://git-wip-us.apache.org/repos/asf/tez/blob/70db632f/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SleepProcessor.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SleepProcessor.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SleepProcessor.java index fada3fd..f0e69ad 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SleepProcessor.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SleepProcessor.java @@ -20,8 +20,8 @@ package org.apache.tez.runtime.library.processor; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.tez.runtime.api.AbstractLogicalIOProcessor; import org.apache.tez.runtime.api.Event; -import org.apache.tez.runtime.api.LogicalIOProcessor; import org.apache.tez.runtime.api.LogicalInput; import org.apache.tez.runtime.api.LogicalOutput; import org.apache.tez.runtime.api.TezProcessorContext; @@ -35,23 +35,27 @@ import java.util.Map; * * @see SleepProcessorConfig for configuring the SleepProcessor */ -public class SleepProcessor implements LogicalIOProcessor { +public class SleepProcessor extends AbstractLogicalIOProcessor { private static final Log LOG = LogFactory.getLog(SleepProcessor.class); private int timeToSleepMS; + public SleepProcessor(TezProcessorContext context) { + super(context); + } + @Override - public void initialize(TezProcessorContext processorContext) + public void initialize() throws Exception { - if (processorContext.getUserPayload() == null) { + if (getContext().getUserPayload() == null) { LOG.info("No processor user payload specified" + ", using default timeToSleep of 1 ms"); timeToSleepMS = 1; } else { SleepProcessorConfig cfg = new SleepProcessorConfig(); - cfg.fromUserPayload(processorContext.getUserPayload()); + cfg.fromUserPayload(getContext().getUserPayload()); timeToSleepMS = cfg.getTimeToSleepMS(); } LOG.info("Initialized SleepProcessor, timeToSleepMS=" + timeToSleepMS); http://git-wip-us.apache.org/repos/asf/tez/blob/70db632f/tez-tests/src/test/java/org/apache/tez/test/TestProcessor.java ---------------------------------------------------------------------- diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestProcessor.java b/tez-tests/src/test/java/org/apache/tez/test/TestProcessor.java index db89e91..f0f7594 100644 --- a/tez-tests/src/test/java/org/apache/tez/test/TestProcessor.java +++ b/tez-tests/src/test/java/org/apache/tez/test/TestProcessor.java @@ -27,6 +27,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.tez.common.TezUtils; import org.apache.tez.dag.api.ProcessorDescriptor; +import org.apache.tez.runtime.api.AbstractLogicalIOProcessor; import org.apache.tez.runtime.api.Event; import org.apache.tez.runtime.api.LogicalIOProcessor; import org.apache.tez.runtime.api.LogicalInput; @@ -44,12 +45,11 @@ import com.google.common.collect.Sets; * fail. It fails and exits if configured to do so. If not, then it calls * doRead() on all inputs to let them fail. */ -public class TestProcessor implements LogicalIOProcessor { +public class TestProcessor extends AbstractLogicalIOProcessor { private static final Log LOG = LogFactory .getLog(TestProcessor.class); Configuration conf; - TezTaskContext processorContext; boolean doFail = false; long sleepMs; @@ -88,7 +88,19 @@ public class TestProcessor implements LogicalIOProcessor { public static String TEZ_FAILING_PROCESSOR_VERIFY_TASK_INDEX = "tez.failing-processor.verify-task-index"; - + + /** + * Constructor an instance of the LogicalProcessor. Classes extending this one to create a + * LogicalProcessor, must provide the same constructor so that Tez can create an instance of the + * class at runtime. + * + * @param context the {@link org.apache.tez.runtime.api.TezProcessorContext} which provides + * the Processor with context information within the running task. + */ + public TestProcessor(TezProcessorContext context) { + super(context); + } + public static ProcessorDescriptor getProcDesc(byte[] payload) { return new ProcessorDescriptor(TestProcessor.class.getName()). setUserPayload(payload); @@ -96,7 +108,7 @@ public class TestProcessor implements LogicalIOProcessor { void throwException(String msg) { RuntimeException e = new RuntimeException(msg); - processorContext.fatalError(e , msg); + getContext().fatalError(e , msg); throw e; } @@ -110,15 +122,13 @@ public class TestProcessor implements LogicalIOProcessor { } @Override - public void initialize(TezProcessorContext processorContext) throws Exception { - this.processorContext = processorContext; - if (processorContext.getUserPayload() != null) { - String vName = processorContext.getTaskVertexName(); - conf = TezUtils.createConfFromUserPayload(processorContext - .getUserPayload()); + public void initialize() throws Exception { + if (getContext().getUserPayload() != null) { + String vName = getContext().getTaskVertexName(); + conf = TezUtils.createConfFromUserPayload(getContext().getUserPayload()); verifyValue = conf.getInt( getVertexConfName(TEZ_FAILING_PROCESSOR_VERIFY_VALUE, vName, - processorContext.getTaskIndex()), -1); + getContext().getTaskIndex()), -1); if (verifyValue != -1) { LOG.info("Verify value: " + verifyValue); for (String verifyIndex : conf @@ -143,7 +153,7 @@ public class TestProcessor implements LogicalIOProcessor { failingTaskAttemptUpto = conf.getInt( getVertexConfName(TEZ_FAILING_PROCESSOR_FAILING_UPTO_TASK_ATTEMPT, vName), 0); LOG.info("Adding failing attempt : " + failingTaskAttemptUpto + - " dag: " + processorContext.getDAGName()); + " dag: " + getContext().getDAGName()); } } } @@ -173,31 +183,31 @@ public class TestProcessor implements LogicalIOProcessor { if (doFail) { if ( (failingTaskIndices.contains(failAll) || - failingTaskIndices.contains(processorContext.getTaskIndex())) && + failingTaskIndices.contains(getContext().getTaskIndex())) && (failingTaskAttemptUpto == failAll.intValue() || - failingTaskAttemptUpto >= processorContext.getTaskAttemptNumber())) { - String msg = "FailingProcessor: " + processorContext.getUniqueIdentifier() + - " dag: " + processorContext.getDAGName() + - " taskIndex: " + processorContext.getTaskIndex() + - " taskAttempt: " + processorContext.getTaskAttemptNumber(); + failingTaskAttemptUpto >= getContext().getTaskAttemptNumber())) { + String msg = "FailingProcessor: " + getContext().getUniqueIdentifier() + + " dag: " + getContext().getDAGName() + + " taskIndex: " + getContext().getTaskIndex() + + " taskAttempt: " + getContext().getTaskAttemptNumber(); LOG.info(msg); throwException(msg); } } if (inputs.entrySet().size() > 0) { - String msg = "Reading input of current FailingProcessor: " + processorContext.getUniqueIdentifier() + - " dag: " + processorContext.getDAGName() + - " vertex: " + processorContext.getTaskVertexName() + - " taskIndex: " + processorContext.getTaskIndex() + - " taskAttempt: " + processorContext.getTaskAttemptNumber(); + String msg = "Reading input of current FailingProcessor: " + getContext().getUniqueIdentifier() + + " dag: " + getContext().getDAGName() + + " vertex: " + getContext().getTaskVertexName() + + " taskIndex: " + getContext().getTaskIndex() + + " taskAttempt: " + getContext().getTaskAttemptNumber(); LOG.info(msg); } //initialize sum to attempt number + 1 - int sum = processorContext.getTaskAttemptNumber() + 1; - LOG.info("initializing vertex= " + processorContext.getTaskVertexName() + - " taskIndex: " + processorContext.getTaskIndex() + - " taskAttempt: " + processorContext.getTaskAttemptNumber() + + int sum = getContext().getTaskAttemptNumber() + 1; + LOG.info("initializing vertex= " + getContext().getTaskVertexName() + + " taskIndex: " + getContext().getTaskIndex() + + " taskAttempt: " + getContext().getTaskAttemptNumber() + " sum= " + sum); //sum = summation of input values for (Map.Entry<String, LogicalInput> entry : inputs.entrySet()) { @@ -213,11 +223,11 @@ public class TestProcessor implements LogicalIOProcessor { } if (outputs.entrySet().size() > 0) { - String msg = "Writing output of current FailingProcessor: " + processorContext.getUniqueIdentifier() + - " dag: " + processorContext.getDAGName() + - " vertex: " + processorContext.getTaskVertexName() + - " taskIndex: " + processorContext.getTaskIndex() + - " taskAttempt: " + processorContext.getTaskAttemptNumber(); + String msg = "Writing output of current FailingProcessor: " + getContext().getUniqueIdentifier() + + " dag: " + getContext().getDAGName() + + " vertex: " + getContext().getTaskVertexName() + + " taskIndex: " + getContext().getTaskIndex() + + " taskAttempt: " + getContext().getTaskAttemptNumber(); LOG.info(msg); } for (Map.Entry<String, LogicalOutput> entry : outputs.entrySet()) { @@ -231,28 +241,28 @@ public class TestProcessor implements LogicalIOProcessor { output.write(sum); } - LOG.info("Output for DAG: " + processorContext.getDAGName() - + " vertex: " + processorContext.getTaskVertexName() - + " task: " + processorContext.getTaskIndex() - + " attempt: " + processorContext.getTaskAttemptNumber() + LOG.info("Output for DAG: " + getContext().getDAGName() + + " vertex: " + getContext().getTaskVertexName() + + " task: " + getContext().getTaskIndex() + + " attempt: " + getContext().getTaskAttemptNumber() + " is: " + sum); if (verifyTaskIndices - .contains(new Integer(processorContext.getTaskIndex()))) { + .contains(new Integer(getContext().getTaskIndex()))) { if (verifyValue != -1 && verifyValue != sum) { // expected output value set and not equal to observed value String msg = "Expected output mismatch of current FailingProcessor: " - + processorContext.getUniqueIdentifier() + - " dag: " + processorContext.getDAGName() + - " vertex: " + processorContext.getTaskVertexName() + - " taskIndex: " + processorContext.getTaskIndex() + - " taskAttempt: " + processorContext.getTaskAttemptNumber(); + + getContext().getUniqueIdentifier() + + " dag: " + getContext().getDAGName() + + " vertex: " + getContext().getTaskVertexName() + + " taskIndex: " + getContext().getTaskIndex() + + " taskAttempt: " + getContext().getTaskAttemptNumber(); msg += "\n" + "Expected output: " + verifyValue + " got: " + sum; throwException(msg); } else { - LOG.info("Verified output for DAG: " + processorContext.getDAGName() - + " vertex: " + processorContext.getTaskVertexName() + " task: " - + processorContext.getTaskIndex() + " attempt: " - + processorContext.getTaskAttemptNumber() + " is: " + sum); + LOG.info("Verified output for DAG: " + getContext().getDAGName() + + " vertex: " + getContext().getTaskVertexName() + " task: " + + getContext().getTaskIndex() + " attempt: " + + getContext().getTaskAttemptNumber() + " is: " + sum); } } }
