Repository: crunch
Updated Branches:
  refs/heads/master 02828f8f0 -> 2a72ef1f7
CRUNCH-579: Supported access to counters from original TaskContext

Signed-off-by: Micah Whitacre <[email protected]>

Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/2a72ef1f
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/2a72ef1f
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/2a72ef1f

Branch: refs/heads/master
Commit: 2a72ef1f792654b8091bdb789e4cdb07d537a1fc
Parents: 02828f8
Author: mkwhitacre <[email protected]>
Authored: Sun Nov 22 18:07:30 2015 -0600
Committer: Micah Whitacre <[email protected]>
Committed: Tue Apr 19 15:18:43 2016 -0500

----------------------------------------------------------------------
 .../org/apache/crunch/io/CrunchOutputs.java | 24 +++++++++++++++++++++++-
 1 file changed, 23 insertions(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/crunch/blob/2a72ef1f/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java b/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java
index a9621ba..2e8dc8d 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java
@@ -22,6 +22,7 @@ import org.apache.crunch.CrunchRuntimeException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Counter;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.JobID;
@@ -30,6 +31,7 @@ import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskInputOutputContext;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
@@ -219,7 +221,7 @@ public class CrunchOutputs<K, V> {
         baseTaskId.isMap(),
         baseTaskId.getTaskID().getId(),
         baseTaskId.getId());
-    return new TaskAttemptContextImpl(job.getConfiguration(), taskId);
+    return new TaskAttemptContextWrapper(baseContext, job.getConfiguration(), taskId);
   }
 
   private static void setJobID(Job job, JobID jobID, String namedOutput) {
@@ -361,4 +363,24 @@ public class CrunchOutputs<K, V> {
       }
     }
   }
+
+  private static class TaskAttemptContextWrapper extends TaskAttemptContextImpl {
+
+    private final TaskAttemptContext baseContext;
+
+    public TaskAttemptContextWrapper(TaskAttemptContext baseContext, Configuration config, TaskAttemptID taskId){
+      super(config, taskId);
+      this.baseContext = baseContext;
+    }
+
+    @Override
+    public Counter getCounter(Enum<?> counterName) {
+      return baseContext.getCounter(counterName);
+    }
+
+    @Override
+    public Counter getCounter(String groupName, String counterName) {
+      return baseContext.getCounter(groupName, counterName);
+    }
+  }
 }
