CRUNCH-185 - Make the MemPipeline#done return a PipelineResult with the Counters inside
Signed-off-by: tzolov <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/241b86d2 Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/241b86d2 Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/241b86d2 Branch: refs/heads/master Commit: 241b86d2f21c0cc2e569eeb5e8f35ce0f87e95d3 Parents: b34c2f2 Author: tzolov <[email protected]> Authored: Wed Mar 27 00:18:26 2013 +0100 Committer: tzolov <[email protected]> Committed: Wed Mar 27 09:26:44 2013 +0100 ---------------------------------------------------------------------- .../org/apache/crunch/StageResultsCountersIT.java | 135 +++++++++++++++ .../org/apache/crunch/impl/mem/MemPipeline.java | 6 +- 2 files changed, 140 insertions(+), 1 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/241b86d2/crunch/src/it/java/org/apache/crunch/StageResultsCountersIT.java ---------------------------------------------------------------------- diff --git a/crunch/src/it/java/org/apache/crunch/StageResultsCountersIT.java b/crunch/src/it/java/org/apache/crunch/StageResultsCountersIT.java new file mode 100644 index 0000000..19fc302 --- /dev/null +++ b/crunch/src/it/java/org/apache/crunch/StageResultsCountersIT.java @@ -0,0 +1,135 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch; + +import static junit.framework.Assert.assertEquals; +import static junit.framework.Assert.assertTrue; + +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import org.apache.commons.lang.StringUtils; +import org.apache.crunch.PipelineResult.StageResult; +import org.apache.crunch.impl.mem.MemPipeline; +import org.apache.crunch.impl.mr.MRPipeline; +import org.apache.crunch.io.From; +import org.apache.crunch.test.TemporaryPath; +import org.apache.crunch.test.TemporaryPaths; +import org.apache.crunch.types.PTypeFamily; +import org.apache.crunch.types.avro.AvroTypeFamily; +import org.apache.crunch.types.writable.WritableTypeFamily; +import org.apache.hadoop.mapreduce.Counter; +import org.junit.After; +import org.junit.Rule; +import org.junit.Test; + +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; + +public class StageResultsCountersIT { + + @Rule + public TemporaryPath tmpDir = TemporaryPaths.create(); + + public static HashSet<String> SPECIAL_KEYWORDS = Sets.newHashSet("AND", "OR", "NOT"); + + public static String KEYWORDS_COUNTER_GROUP = "KEYWORDS_COUNTER_GROUP"; + + @After + public void after() { + MemPipeline.clearCounters(); + } + + @Test + public void testStageResultsCountersMRWritables() throws Exception { + testSpecialKeywordCount(new MRPipeline(StageResultsCountersIT.class, tmpDir.getDefaultConfiguration()), + WritableTypeFamily.getInstance()); + } + + @Test + public void testStageResultsCountersMRAvro() throws Exception { + testSpecialKeywordCount(new MRPipeline(StageResultsCountersIT.class, tmpDir.getDefaultConfiguration()), + AvroTypeFamily.getInstance()); + } + + @Test + public void testStageResultsCountersMemWritables() throws Exception { + testSpecialKeywordCount(MemPipeline.getInstance(), WritableTypeFamily.getInstance()); + } + + @Test + public void testStageResultsCountersMemAvro() throws Exception { + testSpecialKeywordCount(MemPipeline.getInstance(), AvroTypeFamily.getInstance()); + } + + public void testSpecialKeywordCount(Pipeline pipeline, PTypeFamily tf) throws Exception { + + String rowsInputPath = tmpDir.copyResourceFileName("shakes.txt"); + + PipelineResult result = coutSpecialKeywords(pipeline, rowsInputPath, tf); + + assertTrue(result.succeeded()); + + Map<String, Long> keywordsMap = countersToMap(result.getStageResults(), KEYWORDS_COUNTER_GROUP); + + assertEquals(3, keywordsMap.size()); + + assertEquals("{NOT=157, AND=596, OR=81}", keywordsMap.toString()); + } + + private static PipelineResult coutSpecialKeywords(Pipeline pipeline, String inputFileName, PTypeFamily tf) { + + pipeline.read(From.textFile(inputFileName)).parallelDo(new DoFn<String, Void>() { + + @Override + public void process(String text, Emitter<Void> emitter) { + + if (!StringUtils.isBlank(text)) { + + String[] tokens = text.toUpperCase().split("\\s"); + + for (String token : tokens) { + if (SPECIAL_KEYWORDS.contains(token)) { + getCounter(KEYWORDS_COUNTER_GROUP, token).increment(1); + } + } + } + } + }, tf.nulls()).materialize(); // TODO can we avoid the materialize ? + + return pipeline.done(); + } + + private static Map<String, Long> countersToMap(List<StageResult> stages, String counterGroupName) { + + Map<String, Long> countersMap = Maps.newHashMap(); + + for (StageResult sr : stages) { + Iterator<Counter> iterator = sr.getCounters().getGroup(counterGroupName).iterator(); + while (iterator.hasNext()) { + Counter counter = (Counter) iterator.next(); + countersMap.put(counter.getDisplayName(), counter.getValue()); + } + } + + return countersMap; + } + +} http://git-wip-us.apache.org/repos/asf/crunch/blob/241b86d2/crunch/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java b/crunch/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java index d7c1a4f..af5f122 100644 --- a/crunch/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java +++ b/crunch/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java @@ -66,6 +66,10 @@ public class MemPipeline implements Pipeline { return COUNTERS; } + public static Counters clearCounters() { + return COUNTERS = new Counters(); + } + public static Pipeline getInstance() { return INSTANCE; } @@ -254,7 +258,7 @@ public class MemPipeline implements Pipeline { @Override public PipelineResult run() { activeTargets.clear(); - return PipelineResult.EMPTY; + return new PipelineResult(ImmutableList.of(new PipelineResult.StageResult("MemPipelineStage", COUNTERS))); } @Override
