TEZ-3437. Improve synchronization and the progress report behavior for Inputs from TEZ-3317. Contributed by Kuhu Shukla.
Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/dceb365c Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/dceb365c Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/dceb365c Branch: refs/heads/TEZ-3334 Commit: dceb365c2a82b2252463bdf0374bb2f5ee944ecc Parents: 2737c5d Author: Siddharth Seth <[email protected]> Authored: Fri Oct 7 10:20:42 2016 -0700 Committer: Siddharth Seth <[email protected]> Committed: Fri Oct 7 10:20:42 2016 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/tez/common/ProgressHelper.java | 89 ++++++++++++++++++++ .../tez/runtime/api/AbstractLogicalInput.java | 2 +- .../tez/runtime/api/MergedLogicalInput.java | 2 +- .../runtime/api/ProgressFailedException.java | 46 ++++++++++ .../org/apache/tez/mapreduce/input/MRInput.java | 9 +- .../tez/mapreduce/input/MRInputLegacy.java | 3 +- .../mapreduce/processor/map/MapProcessor.java | 54 +++++------- .../processor/reduce/ReduceProcessor.java | 38 ++------- .../tez/mapreduce/processor/MapUtils.java | 9 +- .../processor/map/TestMapProcessor.java | 78 ++--------------- .../processor/reduce/TestReduceProcessor.java | 2 +- .../api/impl/TezProcessorContextImpl.java | 2 +- .../common/readers/UnorderedKVReader.java | 26 +++--- .../common/shuffle/impl/ShuffleManager.java | 4 +- .../input/ConcatenatedMergedKeyValueInput.java | 11 ++- .../input/ConcatenatedMergedKeyValuesInput.java | 11 ++- .../library/input/OrderedGroupedKVInput.java | 3 +- .../input/OrderedGroupedMergedKVInput.java | 3 +- .../runtime/library/input/UnorderedKVInput.java | 9 +- .../library/processor/SimpleProcessor.java | 42 ++------- .../library/processor/SleepProcessor.java | 39 ++------- .../processor/FilterByWordInputProcessor.java | 39 ++------- .../processor/FilterByWordOutputProcessor.java | 3 +- 24 files changed, 253 insertions(+), 272 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/dceb365c/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index b8c9d51..63eda94 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-3437. Improve synchronization and the progress report behavior for Inputs from TEZ-3317. TEZ-3460. Fix precommit release audit warning. TEZ-3368. NPE in DelayedContainerManager TEZ-3440. Shuffling to memory can get out-of-sync when fetching multiple compressed map outputs http://git-wip-us.apache.org/repos/asf/tez/blob/dceb365c/tez-api/src/main/java/org/apache/tez/common/ProgressHelper.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/common/ProgressHelper.java b/tez-api/src/main/java/org/apache/tez/common/ProgressHelper.java new file mode 100644 index 0000000..407a20e --- /dev/null +++ b/tez-api/src/main/java/org/apache/tez/common/ProgressHelper.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.common; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.tez.runtime.api.AbstractLogicalInput; +import org.apache.tez.runtime.api.LogicalInput; +import org.apache.tez.runtime.api.ProcessorContext; +import org.apache.tez.runtime.api.ProgressFailedException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +public class ProgressHelper { + private static final Logger LOG = LoggerFactory.getLogger(ProgressHelper.class); + private String processorName; + protected final Map<String, LogicalInput> inputs; + final ProcessorContext processorContext; + + volatile ScheduledExecutorService scheduledExecutorService; + Runnable monitorProgress = new Runnable() { + @Override + public void run() { + try { + float progSum = 0.0f; + float progress; + if (inputs != null && inputs.size() != 0) { + for (LogicalInput input : inputs.values()) { + if (input instanceof AbstractLogicalInput) { + progSum += ((AbstractLogicalInput) input).getProgress(); + } + } + progress = (1.0f) * progSum / inputs.size(); + } else { + progress = 1.0f; + } + processorContext.setProgress(progress); + } catch (ProgressFailedException pe) { + LOG.warn("Encountered ProgressFailedException during Processor progress update" + + pe); + } catch (InterruptedException ie) { + LOG.warn("Encountered InterruptedException during Processor progress update" + + ie); + } + } + }; + + public ProgressHelper(Map<String, LogicalInput> _inputs, ProcessorContext context, String processorName) { + this.inputs = _inputs; + this.processorContext = context; + this.processorName = processorName; + } + + public void scheduleProgressTaskService(long delay, long period) { + scheduledExecutorService = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder() + .setDaemon(true).setNameFormat("TaskProgressService{" + processorName+ ":" + processorContext.getTaskVertexName() + + "} #%d").build()); + scheduledExecutorService.scheduleWithFixedDelay(monitorProgress, delay, period, + TimeUnit.MILLISECONDS); + } + + public void shutDownProgressTaskService() { + if (scheduledExecutorService != null) { + scheduledExecutorService.shutdownNow(); + scheduledExecutorService = null; + } + } + +} http://git-wip-us.apache.org/repos/asf/tez/blob/dceb365c/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalInput.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalInput.java b/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalInput.java index 4c95eb9..a97f3fa 100644 --- a/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalInput.java +++ b/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalInput.java @@ -82,7 +82,7 @@ public abstract class AbstractLogicalInput implements LogicalInput, LogicalInput return inputContext; } - public float getProgress() throws IOException, InterruptedException { + public float getProgress() throws ProgressFailedException, InterruptedException { return 0.0f; } } http://git-wip-us.apache.org/repos/asf/tez/blob/dceb365c/tez-api/src/main/java/org/apache/tez/runtime/api/MergedLogicalInput.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/MergedLogicalInput.java b/tez-api/src/main/java/org/apache/tez/runtime/api/MergedLogicalInput.java index 3195a17..e3c3624 100644 --- a/tez-api/src/main/java/org/apache/tez/runtime/api/MergedLogicalInput.java +++ b/tez-api/src/main/java/org/apache/tez/runtime/api/MergedLogicalInput.java @@ -94,7 +94,7 @@ public abstract class MergedLogicalInput implements LogicalInput { */ public abstract void setConstituentInputIsReady(Input input); - public float getProgress() throws IOException, InterruptedException { + public float getProgress() throws ProgressFailedException, InterruptedException { return 0.0f; } } http://git-wip-us.apache.org/repos/asf/tez/blob/dceb365c/tez-api/src/main/java/org/apache/tez/runtime/api/ProgressFailedException.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/ProgressFailedException.java b/tez-api/src/main/java/org/apache/tez/runtime/api/ProgressFailedException.java new file mode 100644 index 0000000..07995cc --- /dev/null +++ b/tez-api/src/main/java/org/apache/tez/runtime/api/ProgressFailedException.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.runtime.api; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Evolving; +import org.apache.tez.dag.api.TezException; + +@Public +@Evolving +/** + * Exception invoked when getProgress fails + */ +public class ProgressFailedException extends TezException { + + private static final long serialVersionUID = -114180015419275775L; + + public ProgressFailedException() { + super("Progress update failed"); + } + + public ProgressFailedException(Throwable cause) { + super("Progress update failed", cause); + } + + public ProgressFailedException(String message, Throwable cause) { + super(message, cause); + } + +} http://git-wip-us.apache.org/repos/asf/tez/blob/dceb365c/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java index b83d1a3..1b0ffed 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java @@ -28,6 +28,7 @@ import java.util.concurrent.locks.ReentrantLock; import com.google.protobuf.ByteString; +import org.apache.tez.runtime.api.ProgressFailedException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -612,8 +613,12 @@ public class MRInput extends MRInputBase { } @Override - public float getProgress() throws IOException,InterruptedException { - return (mrReader != null) ? mrReader.getProgress() : 0.0f; + public float getProgress() throws ProgressFailedException, InterruptedException { + try { + return (mrReader != null) ? mrReader.getProgress() : 0.0f; + } catch (IOException e) { + throw new ProgressFailedException("getProgress encountered IOException ", e); + } } void processSplitEvent(InputDataInformationEvent event) http://git-wip-us.apache.org/repos/asf/tez/blob/dceb365c/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInputLegacy.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInputLegacy.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInputLegacy.java index 9b5ed1c..70be7ee 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInputLegacy.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInputLegacy.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; +import org.apache.tez.runtime.api.ProgressFailedException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate; @@ -87,7 +88,7 @@ public class MRInputLegacy extends MRInput { return (org.apache.hadoop.mapreduce.RecordReader) mrReader.getRecordReader(); } - public float getProgress() throws IOException, InterruptedException { + public float getProgress() throws ProgressFailedException, InterruptedException { return super.getProgress(); } http://git-wip-us.apache.org/repos/asf/tez/blob/dceb365c/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java index 9888cd4..5cec62c 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java @@ -21,9 +21,9 @@ import java.io.IOException; import java.util.List; import java.util.Map; import java.util.Map.Entry; -import java.util.Timer; -import java.util.TimerTask; +import org.apache.tez.common.ProgressHelper; +import org.apache.tez.runtime.api.ProgressFailedException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -46,7 +46,6 @@ import org.apache.tez.mapreduce.input.MRInputLegacy; import org.apache.tez.mapreduce.output.MROutputLegacy; import org.apache.tez.mapreduce.processor.MRTask; import org.apache.tez.mapreduce.processor.MRTaskReporter; -import org.apache.tez.runtime.api.AbstractLogicalInput; import org.apache.tez.runtime.api.Event; import org.apache.tez.runtime.api.LogicalInput; import org.apache.tez.runtime.api.LogicalOutput; @@ -63,32 +62,7 @@ public class MapProcessor extends MRTask{ protected Map<String, LogicalInput> inputs; protected Map<String, LogicalOutput> outputs; - Timer progressTimer = new Timer(); - TimerTask progressTask = new TimerTask() { - - @Override - public void run() { - try { - float progSum = 0.0f; - if (inputs != null && inputs.size() != 0) { - for(LogicalInput input : inputs.values()) { - if (input instanceof AbstractLogicalInput) { - progSum += ((AbstractLogicalInput) input).getProgress(); - } - } - float progress = (1.0f) * progSum / inputs.size(); - getContext().setProgress(progress); - mrReporter.setProgress(progress); - } - } catch (IOException e) { - LOG.warn("Encountered IOException during Processor progress update" - + e.getMessage()); - } catch (InterruptedException e) { - LOG.warn("Encountered InterruptedException during Processor progress" - + "update" + e.getMessage()); - } - } - }; + private ProgressHelper progressHelper; public MapProcessor(ProcessorContext processorContext) { super(processorContext, true); @@ -101,8 +75,9 @@ public class MapProcessor extends MRTask{ } public void close() throws IOException { - progressTimer.cancel(); - + if (progressHelper != null) { + progressHelper.shutDownProgressTaskService(); + } } @Override @@ -110,6 +85,7 @@ public class MapProcessor extends MRTask{ Map<String, LogicalOutput> _outputs) throws Exception { this.inputs = _inputs; this.outputs = _outputs; + progressHelper = new ProgressHelper(this.inputs, getContext(), this.getClass().getSimpleName()); LOG.info("Running map: " + processorContext.getUniqueIdentifier()); if (_inputs.size() != 1 @@ -130,7 +106,7 @@ public class MapProcessor extends MRTask{ LogicalOutput out = _outputs.values().iterator().next(); initTask(out); - progressTimer.schedule(progressTask, 0, 100); + progressHelper.scheduleProgressTaskService(0, 100); // Sanity check if (!(in instanceof MRInputLegacy)) { throw new IOException(new TezException( @@ -315,7 +291,17 @@ public class MapProcessor extends MRTask{ @Override public float getProgress() throws IOException, InterruptedException { - return in.getProgress(); + try { + return in.getProgress(); + } catch (ProgressFailedException e) { + if (e.getCause() instanceof IOException) { + throw (IOException)e.getCause(); + } + if (e.getCause() instanceof InterruptedException) { + throw (InterruptedException)e.getCause(); + } + } + throw new RuntimeException("Could not get Processor progress"); } @Override @@ -366,6 +352,8 @@ public class MapProcessor extends MRTask{ public float getProgress() throws IOException { try { return mrInput.getProgress(); + } catch (ProgressFailedException pe) { + throw new IOException(pe); } catch (InterruptedException ie) { throw new IOException(ie); } http://git-wip-us.apache.org/repos/asf/tez/blob/dceb365c/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java index 8ec6091..4b79c78 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java @@ -22,9 +22,8 @@ import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.Timer; -import java.util.TimerTask; +import org.apache.tez.common.ProgressHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -44,7 +43,6 @@ import org.apache.tez.dag.api.TezException; import org.apache.tez.mapreduce.output.MROutputLegacy; import org.apache.tez.mapreduce.processor.MRTask; import org.apache.tez.mapreduce.processor.MRTaskReporter; -import org.apache.tez.runtime.api.AbstractLogicalInput; import org.apache.tez.runtime.api.Event; import org.apache.tez.runtime.api.Input; import org.apache.tez.runtime.api.LogicalInput; @@ -68,32 +66,7 @@ public class ReduceProcessor extends MRTask { protected Map<String, LogicalInput> inputs; protected Map<String, LogicalOutput> outputs; - Timer progressTimer = new Timer(); - TimerTask progressTask = new TimerTask() { - - @Override - public void run() { - try { - float progSum = 0.0f; - if (inputs != null && inputs.size() != 0) { - for(LogicalInput input : inputs.values()) { - if (input instanceof AbstractLogicalInput) { - progSum += ((AbstractLogicalInput) input).getProgress(); - } - } - float progress = (1.0f) * progSum / inputs.size(); - getContext().setProgress(progress); - mrReporter.setProgress(progress); - } - } catch (IOException e) { - LOG.warn("Encountered IOException during Processor progress update" - + e.getMessage()); - } catch (InterruptedException e) { - LOG.warn("Encountered InterruptedException during Processor progress" - + "update" + e.getMessage()); - } - } - }; + private ProgressHelper progressHelper; public ReduceProcessor(ProcessorContext processorContext) { super(processorContext, false); @@ -106,7 +79,9 @@ public class ReduceProcessor extends MRTask { } public void close() throws IOException { - progressTimer.cancel(); + if (progressHelper != null) { + progressHelper.shutDownProgressTaskService(); + } } @@ -115,6 +90,7 @@ public class ReduceProcessor extends MRTask { Map<String, LogicalOutput> _outputs) throws Exception { this.inputs = _inputs; this.outputs = _outputs; + progressHelper = new ProgressHelper(this.inputs, processorContext, this.getClass().getSimpleName()); LOG.info("Running reduce: " + processorContext.getUniqueIdentifier()); if (_outputs.size() <= 0 || _outputs.size() > 1) { @@ -139,7 +115,7 @@ public class ReduceProcessor extends MRTask { out.start(); initTask(out); - progressTimer.schedule(progressTask, 0, 100); + progressHelper.scheduleProgressTaskService(0, 100); this.statusUpdate(); Class keyClass = ConfigUtils.getIntermediateInputKeyClass(jobConf); http://git-wip-us.apache.org/repos/asf/tez/blob/dceb365c/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java index 8309966..b69dc0c 100644 --- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java +++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java @@ -118,7 +118,7 @@ public class MapUtils { } private static InputSplit - createInputSplit(FileSystem fs, Path workDir, JobConf job, Path file) + createInputSplit(FileSystem fs, Path workDir, JobConf job, Path file, int numKVs) throws IOException { FileInputFormat.setInputPaths(job, workDir); @@ -132,7 +132,7 @@ public class MapUtils { Random r = new Random(System.currentTimeMillis()); LongWritable key = new LongWritable(); Text value = new Text(); - for (int i = 10; i > 0; i--) { + for (int i = numKVs; i > 0; i--) { key.set(r.nextInt(1000)); value.set(Integer.toString(i)); writer.append(key, value); @@ -189,9 +189,10 @@ public class MapUtils { outMeta.close(); } - public static void generateInputSplit(FileSystem fs, Path workDir, JobConf jobConf, Path mapInput) throws IOException { + public static void generateInputSplit(FileSystem fs, Path workDir, JobConf jobConf, Path mapInput, + int numKVs) throws IOException { jobConf.setInputFormat(SequenceFileInputFormat.class); - InputSplit split = createInputSplit(fs, workDir, jobConf, mapInput); + InputSplit split = createInputSplit(fs, workDir, jobConf, mapInput, numKVs); writeSplitFiles(fs, jobConf, split); } http://git-wip-us.apache.org/repos/asf/tez/blob/dceb365c/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java index 96827ff..7c5e2a7 100644 --- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java +++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java @@ -34,6 +34,7 @@ import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.FileSplit; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.SequenceFileInputFormat; +import org.apache.hadoop.mapreduce.lib.db.FloatSplitter; import org.apache.hadoop.mapreduce.split.JobSplit; import org.apache.hadoop.mapreduce.split.SplitMetaInfoReaderTez; import org.junit.Assert; @@ -149,7 +150,7 @@ public class TestMapProcessor { Path mapInput = new Path(workDir, "map0"); - MapUtils.generateInputSplit(localFs, workDir, jobConf, mapInput); + MapUtils.generateInputSplit(localFs, workDir, jobConf, mapInput, 10); InputSpec mapInputSpec = new InputSpec("NullSrcVertex", InputDescriptor.create(MRInputLegacy.class.getName()) @@ -221,7 +222,7 @@ public class TestMapProcessor { Path mapInput = new Path(workDir, "map0"); - generateInputSplit(localFs, workDir, jobConf, mapInput); + MapUtils.generateInputSplit(localFs, workDir, jobConf, mapInput, 100000); InputSpec mapInputSpec = new InputSpec("NullSrcVertex", InputDescriptor.create(MRInputLegacy.class.getName()) @@ -246,84 +247,17 @@ public class TestMapProcessor { @Override public void run() { float prog = task.getProgress(); - if(prog > 0.0 && prog < 1.0) + if(prog > 0.0f && prog < 1.0f) progressUpdate = prog; } }); task.initialize(); - scheduler.scheduleAtFixedRate(monitorProgress, 0, 10, + scheduler.scheduleAtFixedRate(monitorProgress, 0, 1, TimeUnit.MILLISECONDS); task.run(); Assert.assertTrue("Progress Updates should be captured!", - progressUpdate != 0.0f); + progressUpdate > 0.0f && progressUpdate < 1.0f); task.close(); } - - public static void generateInputSplit(FileSystem fs, Path workDir, - JobConf jobConf, Path mapInput) - throws IOException { - jobConf.setInputFormat(SequenceFileInputFormat.class); - FileInputFormat.setInputPaths(jobConf, workDir); - - LOG.info("Generating data at path: " + mapInput); - // create a file with length entries - SequenceFile.Writer writer = - SequenceFile.createWriter(fs, jobConf, mapInput, - LongWritable.class, Text.class); - try { - Random r = new Random(System.currentTimeMillis()); - LongWritable key = new LongWritable(); - Text value = new Text(); - for (int i = 100000; i > 0; i--) { - key.set(r.nextInt(1000)); - value.set(Integer.toString(i)); - writer.append(key, value); - LOG.info("<k, v> : <" + key.get() + ", " + value + ">"); - } - } finally { - writer.close(); - } - - SequenceFileInputFormat<LongWritable, Text> format = - new SequenceFileInputFormat<LongWritable, Text>(); - InputSplit[] splits = format.getSplits(jobConf, 1); - System.err.println("#split = " + splits.length + " ; " + - "#locs = " + splits[0].getLocations().length + "; " + - "loc = " + splits[0].getLocations()[0] + "; " + - "off = " + splits[0].getLength() + "; " + - "file = " + ((FileSplit)splits[0]).getPath()); - writeSplitFiles(fs, jobConf, splits[0]); - } - - private static void writeSplitFiles(FileSystem fs, JobConf conf, - InputSplit split) throws IOException { - Path jobSplitFile = new Path(conf.get(MRFrameworkConfigs.TASK_LOCAL_RESOURCE_DIR, - MRFrameworkConfigs.TASK_LOCAL_RESOURCE_DIR_DEFAULT), MRJobConfig.JOB_SPLIT); - LOG.info("Writing split to: " + jobSplitFile); - FSDataOutputStream out = FileSystem.create(fs, jobSplitFile, - new FsPermission(JOB_FILE_PERMISSION)); - - long offset = out.getPos(); - Text.writeString(out, split.getClass().getName()); - split.write(out); - out.close(); - - String[] locations = split.getLocations(); - - JobSplit.SplitMetaInfo info = null; - info = new JobSplit.SplitMetaInfo(locations, offset, split.getLength()); - - Path jobSplitMetaInfoFile = new Path( - conf.get(MRFrameworkConfigs.TASK_LOCAL_RESOURCE_DIR), - MRJobConfig.JOB_SPLIT_METAINFO); - - FSDataOutputStream outMeta = FileSystem.create(fs, jobSplitMetaInfoFile, - new FsPermission(JOB_FILE_PERMISSION)); - outMeta.write(SplitMetaInfoReaderTez.META_SPLIT_FILE_HEADER); - WritableUtils.writeVInt(outMeta, SplitMetaInfoReaderTez.META_SPLIT_VERSION); - WritableUtils.writeVInt(outMeta, 1); // Only 1 split meta info being written - info.write(outMeta); - outMeta.close(); - } } http://git-wip-us.apache.org/repos/asf/tez/blob/dceb365c/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java index 1922c53..ca3792f 100644 --- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java +++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java @@ -139,7 +139,7 @@ public class TestReduceProcessor { jobConf.setBoolean(MRJobConfig.MR_TEZ_SPLITS_VIA_EVENTS, false); Path mapInput = new Path(workDir, "map0"); - MapUtils.generateInputSplit(localFs, workDir, jobConf, mapInput); + MapUtils.generateInputSplit(localFs, workDir, jobConf, mapInput, 10); InputSpec mapInputSpec = new InputSpec("NullSrcVertex", InputDescriptor.create(MRInputLegacy.class.getName()) http://git-wip-us.apache.org/repos/asf/tez/blob/dceb365c/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java index c00b977..d03f48e 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java @@ -92,7 +92,7 @@ public class TezProcessorContextImpl extends TezTaskContextImpl implements Proce @Override public void setProgress(float progress) { - if (runtimeTask.getProgress() != progress) { + if (Math.abs(progress - runtimeTask.getProgress()) >= 0.001f) { runtimeTask.setProgress(progress); notifyProgress(); } http://git-wip-us.apache.org/repos/asf/tez/blob/dceb365c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java index 3f44c4f..f4400db 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java @@ -19,6 +19,7 @@ package org.apache.tez.runtime.library.common.readers; import java.io.IOException; +import java.util.concurrent.atomic.AtomicLong; import org.apache.tez.runtime.api.InputContext; import org.apache.tez.runtime.library.api.IOInterruptedException; @@ -73,8 +74,8 @@ public class UnorderedKVReader<K, V> extends KeyValueReader { // TODO Remove this once per I/O counters are separated properly. Relying on // the counter at the moment will generate aggregate numbers. private int numRecordsRead = 0; - private long totalBytesRead = 0; - private long totalFileBytes = 0; + private final AtomicLong totalBytesRead = new AtomicLong(0); + private final AtomicLong totalFileBytes = new AtomicLong(0); public UnorderedKVReader(ShuffleManager shuffleManager, Configuration conf, @@ -148,16 +149,15 @@ public class UnorderedKVReader<K, V> extends KeyValueReader { return value; } - public float getProgress() { - int numInputs = shuffleManager.getNumInputs(); - if (totalFileBytes > 0 && numInputs > 0) { - return ((1.0f) * (totalBytesRead + ((currentReader != null) ? currentReader.bytesRead : - 0l)) / - totalFileBytes) * ( - shuffleManager.getNumCompletedInputs().floatValue() / - (1.0f * numInputs)); + public float getProgress() throws IOException, InterruptedException { + final int numInputs = shuffleManager.getNumInputs(); + if (totalFileBytes.get() > 0 && numInputs > 0) { + return ((1.0f) * (totalBytesRead.get() + ((currentReader != null) ? currentReader.bytesRead : + 0.0f)) / + totalFileBytes.get()) * (shuffleManager.getNumCompletedInputsFloat() / + (1.0f * numInputs)); } - return 0l; + return 0.0f; } /** * Tries reading the next key and value from the current reader. @@ -189,7 +189,7 @@ public class UnorderedKVReader<K, V> extends KeyValueReader { */ private boolean moveToNextInput() throws IOException { if (currentReader != null) { // Close the current reader. - totalBytesRead += currentReader.bytesRead; + totalBytesRead.getAndAdd(currentReader.bytesRead); currentReader.close(); /** * clear reader explicitly. Otherwise this could point to stale reference when next() is @@ -210,7 +210,7 @@ public class UnorderedKVReader<K, V> extends KeyValueReader { return false; // No more inputs } else { currentReader = openIFileReader(currentFetchedInput); - totalFileBytes += currentReader.getLength(); + totalFileBytes.getAndAdd(currentReader.getLength()); return true; } } http://git-wip-us.apache.org/repos/asf/tez/blob/dceb365c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java index a726924..1ebd3a4 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java @@ -880,8 +880,8 @@ public class ShuffleManager implements FetcherCallback { return numInputs; } - public AtomicInteger getNumCompletedInputs() { - return numCompletedInputs; + public float getNumCompletedInputsFloat() { + return numCompletedInputs.floatValue(); } /////////////////// End of methods for walking the available inputs http://git-wip-us.apache.org/repos/asf/tez/blob/dceb365c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValueInput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValueInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValueInput.java index 743b628..a0059cf 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValueInput.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValueInput.java @@ -26,6 +26,7 @@ import org.apache.tez.dag.api.GroupInputEdge; import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.runtime.api.Input; import org.apache.tez.runtime.api.MergedLogicalInput; +import org.apache.tez.runtime.api.ProgressFailedException; import org.apache.tez.runtime.api.Reader; import org.apache.tez.runtime.api.MergedInputContext; import org.apache.tez.runtime.library.api.KeyValueReader; @@ -89,7 +90,7 @@ public class ConcatenatedMergedKeyValueInput extends MergedLogicalInput { return currentReader.getCurrentValue(); } - public float getProgress() { + public float getProgress() throws IOException, InterruptedException { return (1.0f)*(currentReaderIndex + 1)/getInputs().size(); } } @@ -110,7 +111,11 @@ public class ConcatenatedMergedKeyValueInput extends MergedLogicalInput { } @Override - public float getProgress() throws IOException, InterruptedException { - return concatenatedMergedKeyValueReader.getProgress(); + public float getProgress() throws ProgressFailedException, InterruptedException { + try { + return concatenatedMergedKeyValueReader.getProgress(); + } catch (IOException e) { + throw new ProgressFailedException("getProgress encountered IOException ", e); + } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tez/blob/dceb365c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValuesInput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValuesInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValuesInput.java index fa51f47..2555a57 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValuesInput.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValuesInput.java @@ -26,6 +26,7 @@ import org.apache.tez.dag.api.GroupInputEdge; import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.runtime.api.Input; import org.apache.tez.runtime.api.MergedLogicalInput; +import org.apache.tez.runtime.api.ProgressFailedException; import org.apache.tez.runtime.api.Reader; import org.apache.tez.runtime.api.MergedInputContext; import org.apache.tez.runtime.library.api.KeyValuesReader; @@ -91,7 +92,7 @@ public class ConcatenatedMergedKeyValuesInput extends MergedLogicalInput { return currentReader.getCurrentValues(); } - public float getProgress() { + public float getProgress() throws IOException, InterruptedException { return (1.0f)*(currentReaderIndex + 1)/getInputs().size(); } } @@ -112,7 +113,11 @@ public class ConcatenatedMergedKeyValuesInput extends MergedLogicalInput { } @Override - public float getProgress() throws IOException, InterruptedException { - return concatenatedMergedKeyValuesReader.getProgress(); + public float getProgress() throws ProgressFailedException, InterruptedException { + try { + return concatenatedMergedKeyValuesReader.getProgress(); + } catch (IOException e) { + throw new ProgressFailedException("getProgress encountered IOException ", e); + } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tez/blob/dceb365c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java index c86e2fb..8e653ed 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java @@ -28,6 +28,7 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; import com.google.common.annotations.VisibleForTesting; +import org.apache.tez.runtime.api.ProgressFailedException; import org.apache.tez.runtime.library.api.IOInterruptedException; import org.apache.tez.runtime.library.common.Constants; import org.slf4j.Logger; @@ -267,7 +268,7 @@ public class OrderedGroupedKVInput extends AbstractLogicalInput { } @Override - public float getProgress() throws IOException, InterruptedException { + public float getProgress() throws ProgressFailedException, InterruptedException { int totalInputs = getNumPhysicalInputs(); if (totalInputs != 0) { synchronized (this) { http://git-wip-us.apache.org/repos/asf/tez/blob/dceb365c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedMergedKVInput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedMergedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedMergedKVInput.java index 5d6668d..49d4043 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedMergedKVInput.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedMergedKVInput.java @@ -27,6 +27,7 @@ import java.util.List; import java.util.PriorityQueue; import java.util.Set; +import org.apache.tez.runtime.api.ProgressFailedException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience.Public; @@ -250,7 +251,7 @@ public class OrderedGroupedMergedKVInput extends MergedLogicalInput { } } } - public float getProgress() throws IOException, InterruptedException { + public float getProgress() throws ProgressFailedException, InterruptedException { float totalProgress = 0.0f; for(Input input : getInputs()) { totalProgress += ((OrderedGroupedKVInput)input).getProgress(); http://git-wip-us.apache.org/repos/asf/tez/blob/dceb365c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java index f83b9aa..62d7b99 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java @@ -25,6 +25,7 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.tez.common.TezUtilsInternal; +import org.apache.tez.runtime.api.ProgressFailedException; import org.apache.tez.runtime.library.common.Constants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -292,7 +293,11 @@ public class UnorderedKVInput extends AbstractLogicalInput { } @Override - public float getProgress() { - return kvReader.getProgress(); + public float getProgress() throws ProgressFailedException, InterruptedException { + try { + return kvReader.getProgress(); + } catch (IOException e) { + throw new ProgressFailedException("getProgress encountered IOException ", e); + } } } http://git-wip-us.apache.org/repos/asf/tez/blob/dceb365c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SimpleProcessor.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SimpleProcessor.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SimpleProcessor.java index 66c3625..c237bc1 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SimpleProcessor.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SimpleProcessor.java @@ -17,15 +17,12 @@ */ package org.apache.tez.runtime.library.processor; -import java.io.IOException; import java.util.List; import java.util.Map; -import java.util.Timer; -import java.util.TimerTask; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Evolving; -import org.apache.tez.runtime.api.AbstractLogicalInput; +import org.apache.tez.common.ProgressHelper; import org.apache.tez.runtime.api.AbstractLogicalIOProcessor; import org.apache.tez.runtime.api.Event; import org.apache.tez.runtime.api.LogicalInput; @@ -49,31 +46,7 @@ public abstract class SimpleProcessor extends AbstractLogicalIOProcessor { protected Map<String, LogicalInput> inputs; protected Map<String, LogicalOutput> outputs; - Timer progressTimer = new Timer(); - TimerTask progressTask = new TimerTask() { - - @Override - public void run() { - try { - float progSum = 0.0f; - if (getInputs() != null) { - for(LogicalInput input : getInputs().values()) { - if (input instanceof AbstractLogicalInput) { - progSum += ((AbstractLogicalInput) input).getProgress(); - } - } - float progress = (1.0f) * progSum / inputs.size(); - getContext().setProgress(progress); - } - } catch (IOException e) { - LOG.warn("Encountered IOException during Processor progress update" - + e.getMessage()); - } catch (InterruptedException e) { - LOG.warn("Encountered InterruptedException during Processor progress" - + "update" + e.getMessage()); - } - } - }; + protected ProgressHelper progressHelper; public SimpleProcessor(ProcessorContext context) { super(context); @@ -83,6 +56,7 @@ public abstract class SimpleProcessor extends AbstractLogicalIOProcessor { throws Exception { this.inputs = _inputs; this.outputs = _outputs; + progressHelper = new ProgressHelper(this.inputs, getContext(),this.getClass().getSimpleName()); preOp(); run(); postOp(); @@ -106,7 +80,7 @@ public abstract class SimpleProcessor extends AbstractLogicalIOProcessor { for (LogicalInput input : getInputs().values()) { input.start(); } - progressTimer.schedule(progressTask, 0, 100); + progressHelper.scheduleProgressTaskService(0, 100); } if (getOutputs() != null) { for (LogicalOutput output : getOutputs().values()) { @@ -136,7 +110,9 @@ public abstract class SimpleProcessor extends AbstractLogicalIOProcessor { @Override public void close() throws Exception { - progressTimer.cancel(); + if( progressHelper != null) { + progressHelper.shutDownProgressTaskService(); + } } public Map<String, LogicalInput> getInputs() { @@ -146,8 +122,4 @@ public abstract class SimpleProcessor extends AbstractLogicalIOProcessor { public Map<String, LogicalOutput> getOutputs() { return outputs; } - - public Timer getProgressTimer() { - return progressTimer; - } } http://git-wip-us.apache.org/repos/asf/tez/blob/dceb365c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SleepProcessor.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SleepProcessor.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SleepProcessor.java index 92bbce8..3efcd21 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SleepProcessor.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SleepProcessor.java @@ -18,21 +18,18 @@ package org.apache.tez.runtime.library.processor; -import java.io.IOException; import java.nio.ByteBuffer; import java.nio.charset.CharacterCodingException; import java.nio.charset.Charset; import java.util.List; import java.util.Map; -import java.util.Timer; -import java.util.TimerTask; import com.google.common.base.Charsets; +import org.apache.tez.common.ProgressHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.tez.dag.api.UserPayload; -import org.apache.tez.runtime.api.AbstractLogicalInput; import org.apache.tez.runtime.api.AbstractLogicalIOProcessor; import org.apache.tez.runtime.api.Event; import org.apache.tez.runtime.api.LogicalInput; @@ -53,32 +50,7 @@ public class SleepProcessor extends AbstractLogicalIOProcessor { private int timeToSleepMS; protected Map<String, LogicalInput> inputs; protected Map<String, LogicalOutput> outputs; - - Timer progressTimer = new Timer(); - TimerTask progressTask = new TimerTask() { - - @Override - public void run() { - try { - float progSum = 0.0f; - if (inputs != null) { - for(LogicalInput input : inputs.values()) { - if (input instanceof AbstractLogicalInput) { - progSum += ((AbstractLogicalInput) input).getProgress(); - } - } - float progress = (1.0f) * progSum / inputs.size(); - getContext().setProgress(progress); - } - } catch (IOException e) { - LOG.warn("Encountered IOException during Processor progress update" + - e.getMessage()); - } catch (InterruptedException e) { - LOG.warn("Encountered InterruptedException during Processor progress" + - "update" + e.getMessage()); - } - } - }; + private ProgressHelper progressHelper; public SleepProcessor(ProcessorContext context) { super(context); @@ -105,12 +77,13 @@ public class SleepProcessor extends AbstractLogicalIOProcessor { Map<String, LogicalOutput> _outputs) throws Exception { inputs = _inputs; outputs = _outputs; + progressHelper = new ProgressHelper(this.inputs, getContext(),this.getClass().getSimpleName()); LOG.info("Running the Sleep Processor, sleeping for " + timeToSleepMS + " ms"); for (LogicalInput input : _inputs.values()) { input.start(); } - progressTimer.schedule(progressTask, 0, 100); + progressHelper.scheduleProgressTaskService(0, 100); for (LogicalOutput output : _outputs.values()) { output.start(); } @@ -128,7 +101,9 @@ public class SleepProcessor extends AbstractLogicalIOProcessor { @Override public void close() throws Exception { - progressTimer.cancel(); + if (progressHelper != null) { + progressHelper.shutDownProgressTaskService(); + } } /** http://git-wip-us.apache.org/repos/asf/tez/blob/dceb365c/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/processor/FilterByWordInputProcessor.java ---------------------------------------------------------------------- diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/processor/FilterByWordInputProcessor.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/processor/FilterByWordInputProcessor.java index 513c782..15d6e82 100644 --- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/processor/FilterByWordInputProcessor.java +++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/processor/FilterByWordInputProcessor.java @@ -18,12 +18,10 @@ package org.apache.tez.mapreduce.examples.processor; -import java.io.IOException; import java.util.List; import java.util.Map; -import java.util.Timer; -import java.util.TimerTask; +import org.apache.tez.common.ProgressHelper; import org.apache.tez.runtime.api.TaskFailureType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,7 +34,6 @@ import org.apache.tez.mapreduce.examples.FilterLinesByWord.TextLongPair; import org.apache.tez.mapreduce.hadoop.MRJobConfig; import org.apache.tez.mapreduce.input.MRInput; import org.apache.tez.mapreduce.input.MRInputLegacy; -import org.apache.tez.runtime.api.AbstractLogicalInput; import org.apache.tez.runtime.api.AbstractLogicalIOProcessor; import org.apache.tez.runtime.api.Event; import org.apache.tez.runtime.api.LogicalInput; @@ -53,31 +50,7 @@ public class FilterByWordInputProcessor extends AbstractLogicalIOProcessor { private String filterWord; protected Map<String, LogicalInput> inputs; protected Map<String, LogicalOutput> outputs; - Timer progressTimer = new Timer(); - TimerTask progressTask = new TimerTask() { - - @Override - public void run() { - try { - float progSum = 0.0f; - if (inputs != null) { - for(LogicalInput input : inputs.values()) { - if (input instanceof AbstractLogicalInput) { - progSum += ((AbstractLogicalInput) input).getProgress(); - } - } - float progress = (1.0f) * progSum / inputs.size(); - getContext().setProgress(progress); - } - } catch (IOException e) { - LOG.warn("Encountered IOException during Processor progress update" + - e.getMessage()); - } catch (InterruptedException e) { - LOG.warn("Encountered InterruptedException during Processor progress" + - "update" + e.getMessage()); - } - } - }; + private ProgressHelper progressHelper; public FilterByWordInputProcessor(ProcessorContext context) { super(context); @@ -101,7 +74,9 @@ public class FilterByWordInputProcessor extends AbstractLogicalIOProcessor { @Override public void close() throws Exception { - progressTimer.cancel(); + if (progressHelper != null) { + progressHelper.shutDownProgressTaskService(); + } } @Override @@ -109,7 +84,7 @@ public class FilterByWordInputProcessor extends AbstractLogicalIOProcessor { Map<String, LogicalOutput> _outputs) throws Exception { this.inputs = _inputs; this.outputs = _outputs; - + this.progressHelper = new ProgressHelper(this.inputs, getContext(),this.getClass().getSimpleName()); if (_inputs.size() != 1) { throw new IllegalStateException("FilterByWordInputProcessor processor can only work with a single input"); } @@ -134,7 +109,7 @@ public class FilterByWordInputProcessor extends AbstractLogicalIOProcessor { if (! (lo instanceof UnorderedKVOutput)) { throw new IllegalStateException("FilterByWordInputProcessor processor can only work with OnFileUnorderedKVOutput"); } - progressTimer.schedule(progressTask, 0, 100); + progressHelper.scheduleProgressTaskService(0, 100); MRInputLegacy mrInput = (MRInputLegacy) li; mrInput.init(); UnorderedKVOutput kvOutput = (UnorderedKVOutput) lo; http://git-wip-us.apache.org/repos/asf/tez/blob/dceb365c/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/processor/FilterByWordOutputProcessor.java ---------------------------------------------------------------------- diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/processor/FilterByWordOutputProcessor.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/processor/FilterByWordOutputProcessor.java index 5872527..7acaf7e 100644 --- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/processor/FilterByWordOutputProcessor.java +++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/processor/FilterByWordOutputProcessor.java @@ -20,6 +20,7 @@ package org.apache.tez.mapreduce.examples.processor; import java.util.List; +import org.apache.tez.common.ProgressHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.tez.mapreduce.output.MROutput; @@ -51,7 +52,7 @@ public class FilterByWordOutputProcessor extends SimpleMRProcessor { @Override public void close() throws Exception { LOG.info("Broadcast Output Processor closing. Nothing to do"); - getProgressTimer().cancel(); + super.close(); } @Override
