Updated Branches: refs/heads/apache-crunch-0.8 834260caa -> deca72853
CRUNCH-330: Add option for disabling named output counters.

Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/deca7285
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/deca7285
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/deca7285

Branch: refs/heads/apache-crunch-0.8
Commit: deca728532a18999c36348daafa9b15a49b73c81
Parents: 834260c
Author: Josh Wills <[email protected]>
Authored: Thu Jan 23 12:46:01 2014 -0800
Committer: Josh Wills <[email protected]>
Committed: Thu Jan 23 13:53:43 2014 -0800

----------------------------------------------------------------------
 .../org/apache/crunch/MultipleOutputIT.java | 34 ++++++++++++++++++++
 .../org/apache/crunch/io/CrunchOutputs.java | 19 +++++++----
 2 files changed, 46 insertions(+), 7 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/crunch/blob/deca7285/crunch-core/src/it/java/org/apache/crunch/MultipleOutputIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/MultipleOutputIT.java b/crunch-core/src/it/java/org/apache/crunch/MultipleOutputIT.java
index 96971f8..ffc09c3 100644
--- a/crunch-core/src/it/java/org/apache/crunch/MultipleOutputIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/MultipleOutputIT.java
@@ -19,6 +19,7 @@ package org.apache.crunch; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertFalse; import java.io.File; import java.io.FilenameFilter;
@@ -27,9 +28,12 @@ import java.nio.charset.Charset; import java.util.Arrays; import java.util.List; +import org.apache.hadoop.conf.Configuration; +import org.apache.crunch.PipelineResult.StageResult; import org.apache.crunch.fn.Aggregators; import org.apache.crunch.impl.mr.MRPipeline; import org.apache.crunch.io.At; +import 
org.apache.crunch.io.CrunchOutputs; import org.apache.crunch.io.To; import org.apache.crunch.test.StringWrapper; import org.apache.crunch.test.TemporaryPath; @@ -98,6 +102,36 @@ public class MultipleOutputIT { assertEquals("parallel Dos not fused into a single job", 1, result.getStageResults().size()); } + @Test + public void testCountersEnabled() throws IOException { + PipelineResult result = run(new MRPipeline(MultipleOutputIT.class, tmpDir.getDefaultConfiguration()), + WritableTypeFamily.getInstance()); + + assertEquals(1, result.getStageResults().size()); + StageResult stageResult = result.getStageResults().get(0); + + String counterGroup = CrunchOutputs.class.getName(); + assertEquals(3, stageResult.getCounterNames().get(counterGroup).size()); + assertEquals(1l, stageResult.getCounterValue(counterGroup, "out1")); + assertEquals(1l, stageResult.getCounterValue(counterGroup, "out2")); + assertEquals(0l, stageResult.getCounterValue(counterGroup, "out3")); + } + + @Test + public void testCountersDisabled() throws IOException { + Configuration configuration = tmpDir.getDefaultConfiguration(); + configuration.setBoolean(CrunchOutputs.CRUNCH_DISABLE_OUTPUT_COUNTERS, true); + + PipelineResult result = run(new MRPipeline(MultipleOutputIT.class, configuration), + WritableTypeFamily.getInstance()); + + assertEquals(1, result.getStageResults().size()); + StageResult stageResult = result.getStageResults().get(0); + + assertFalse(stageResult.getCounterNames().containsKey(CrunchOutputs.CRUNCH_OUTPUTS)); + } + + public PipelineResult run(Pipeline pipeline, PTypeFamily typeFamily) throws IOException { String inputPath = tmpDir.copyResourceFileName("letters.txt"); String outputPathEven = tmpDir.getFileName("even"); http://git-wip-us.apache.org/repos/asf/crunch/blob/deca7285/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java ---------------------------------------------------------------------- diff --git 
a/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java b/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java index cd1ebce..55fcc89 100644 --- a/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java +++ b/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java @@ -42,7 +42,8 @@ import java.util.Map; */ public class CrunchOutputs<K, V> { public static final String CRUNCH_OUTPUTS = "crunch.outputs.dir"; - + public static final String CRUNCH_DISABLE_OUTPUT_COUNTERS = "crunch.disable.output.counters"; + private static final char RECORD_SEP = ','; private static final char FIELD_SEP = ';'; private static final Joiner JOINER = Joiner.on(FIELD_SEP); @@ -98,11 +99,12 @@ public class CrunchOutputs<K, V> { private static final String BASE_OUTPUT_NAME = "mapreduce.output.basename"; private static final String COUNTERS_GROUP = CrunchOutputs.class.getName(); - private TaskInputOutputContext<?, ?, K, V> baseContext; - private Map<String, OutputConfig> namedOutputs; - private Map<String, RecordWriter<K, V>> recordWriters; - private Map<String, TaskAttemptContext> taskContextCache; - + private final TaskInputOutputContext<?, ?, K, V> baseContext; + private final Map<String, OutputConfig> namedOutputs; + private final Map<String, RecordWriter<K, V>> recordWriters; + private final Map<String, TaskAttemptContext> taskContextCache; + private final boolean disableOutputCounters; + /** * Creates and initializes multiple outputs support, * it should be instantiated in the Mapper/Reducer setup method. 
@@ -114,6 +116,7 @@ public class CrunchOutputs<K, V> { namedOutputs = getNamedOutputs(context); recordWriters = Maps.newHashMap(); taskContextCache = Maps.newHashMap(); + this.disableOutputCounters = context.getConfiguration().getBoolean(CRUNCH_DISABLE_OUTPUT_COUNTERS, false); } @SuppressWarnings("unchecked") @@ -124,7 +127,9 @@ public class CrunchOutputs<K, V> { namedOutput + "'"); } TaskAttemptContext taskContext = getContext(namedOutput); - baseContext.getCounter(COUNTERS_GROUP, namedOutput).increment(1); + if (!disableOutputCounters) { + baseContext.getCounter(COUNTERS_GROUP, namedOutput).increment(1); + } getRecordWriter(taskContext, namedOutput).write(key, value); }
