Updated Branches: refs/heads/master 36bde4162 -> 3eb5f0a8a
CRUNCH-231: Support legacy Mappers and Reducers in Crunch.

Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/3eb5f0a8
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/3eb5f0a8
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/3eb5f0a8

Branch: refs/heads/master
Commit: 3eb5f0a8a7b0918e79232a301ad3db8af0756e9b
Parents: 36bde41
Author: Josh Wills <[email protected]>
Authored: Fri Jun 28 20:30:56 2013 -0700
Committer: Josh Wills <[email protected]>
Committed: Mon Jul 22 15:22:12 2013 -0700

----------------------------------------------------------------------
 .../it/java/org/apache/crunch/lib/MapredIT.java | 134 ++++++++
 .../java/org/apache/crunch/lib/MapreduceIT.java | 120 +++++++
 .../main/java/org/apache/crunch/lib/Mapred.java | 301 +++++++++++++++++
 .../java/org/apache/crunch/lib/Mapreduce.java   | 335 +++++++++++++++++++
 .../java/org/apache/crunch/lib/MapredTest.java  | 134 ++++++++
 .../org/apache/crunch/lib/MapreduceTest.java    | 117 +++++++
 6 files changed, 1141 insertions(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/crunch/blob/3eb5f0a8/crunch-core/src/it/java/org/apache/crunch/lib/MapredIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/lib/MapredIT.java b/crunch-core/src/it/java/org/apache/crunch/lib/MapredIT.java
new file mode 100644
index 0000000..7c09790
--- /dev/null
+++ b/crunch-core/src/it/java/org/apache/crunch/lib/MapredIT.java
@@ -0,0 +1,134 @@
+/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.crunch.lib; + +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.io.Serializable; +import java.util.Iterator; + +import org.apache.crunch.MapFn; +import org.apache.crunch.PCollection; +import org.apache.crunch.PTable; +import org.apache.crunch.Pair; +import org.apache.crunch.Pipeline; +import org.apache.crunch.PipelineResult; +import org.apache.crunch.PipelineResult.StageResult; +import org.apache.crunch.impl.mr.MRPipeline; +import org.apache.crunch.io.From; +import org.apache.crunch.io.To; +import org.apache.crunch.test.CrunchTestSupport; +import org.apache.crunch.types.writable.Writables; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Mapper; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reducer; +import org.apache.hadoop.mapred.Reporter; +import org.junit.Test; + +public class MapredIT extends CrunchTestSupport implements Serializable { + private static class TestMapper implements Mapper<IntWritable, Text, Text, LongWritable> { + @Override + public void configure(JobConf arg0) { + } + + @Override + public void close() throws IOException { + } + + @Override + public void map(IntWritable k, Text v, OutputCollector<Text, LongWritable> out, + Reporter reporter) throws IOException { + reporter.getCounter("written", "out").increment(1L); + out.collect(v, new LongWritable(v.getLength())); + } + } + + private static class TestReducer implements Reducer<IntWritable, Text, Text, LongWritable> { + + @Override + public void configure(JobConf arg0) { + } + + @Override + public void close() throws IOException { + } + + @Override + public void reduce(IntWritable key, Iterator<Text> iter, + OutputCollector<Text, LongWritable> out, Reporter reporter) throws IOException { + boolean hasThou = false; + String notThou = ""; + while (iter.hasNext()) { + String next = iter.next().toString(); + if (next != null && next.contains("thou")) { + reporter.getCounter("thou", "count").increment(1); + hasThou = true; + } else { + notThou = next; + } + } + out.collect(new Text(notThou), hasThou ? 
new LongWritable(1L) : new LongWritable(0L)); + } + } + + @Test + public void testMapper() throws Exception { + Pipeline p = new MRPipeline(MapredIT.class, tempDir.getDefaultConfiguration()); + Path shakesPath = tempDir.copyResourcePath("shakes.txt"); + PCollection<String> in = p.read(From.textFile(shakesPath)); + PTable<IntWritable, Text> two = in.parallelDo(new MapFn<String, Pair<IntWritable, Text>>() { + @Override + public Pair<IntWritable, Text> map(String input) { + return Pair.of(new IntWritable(input.length()), new Text(input)); + } + }, Writables.tableOf(Writables.writables(IntWritable.class), Writables.writables(Text.class))); + + PTable<Text, LongWritable> out = Mapred.map(two, TestMapper.class, Text.class, LongWritable.class); + out.write(To.sequenceFile(tempDir.getPath("temp"))); + PipelineResult res = p.done(); + assertEquals(1, res.getStageResults().size()); + StageResult sr = res.getStageResults().get(0); + assertEquals(3667, sr.getCounters().findCounter("written", "out").getValue()); + } + + @Test + public void testReducer() throws Exception { + Pipeline p = new MRPipeline(MapredIT.class, tempDir.getDefaultConfiguration()); + Path shakesPath = tempDir.copyResourcePath("shakes.txt"); + PCollection<String> in = p.read(From.textFile(shakesPath)); + PTable<IntWritable, Text> two = in.parallelDo(new MapFn<String, Pair<IntWritable, Text>>() { + @Override + public Pair<IntWritable, Text> map(String input) { + return Pair.of(new IntWritable(input.length()), new Text(input)); + } + }, Writables.tableOf(Writables.writables(IntWritable.class), Writables.writables(Text.class))); + + PTable<Text, LongWritable> out = Mapred.reduce(two.groupByKey(), TestReducer.class, Text.class, LongWritable.class); + out.write(To.sequenceFile(tempDir.getPath("temp"))); + PipelineResult res = p.done(); + assertEquals(1, res.getStageResults().size()); + StageResult sr = res.getStageResults().get(0); + assertEquals(108, sr.getCounters().findCounter("thou", "count").getValue()); + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/3eb5f0a8/crunch-core/src/it/java/org/apache/crunch/lib/MapreduceIT.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/it/java/org/apache/crunch/lib/MapreduceIT.java b/crunch-core/src/it/java/org/apache/crunch/lib/MapreduceIT.java new file mode 100644 index 0000000..ab453e0 --- /dev/null +++ b/crunch-core/src/it/java/org/apache/crunch/lib/MapreduceIT.java @@ -0,0 +1,120 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.crunch.lib; + +import static org.junit.Assert.assertEquals; + +import java.io.Serializable; + +import org.apache.crunch.MapFn; +import org.apache.crunch.PCollection; +import org.apache.crunch.PTable; +import org.apache.crunch.Pair; +import org.apache.crunch.Pipeline; +import org.apache.crunch.PipelineResult; +import org.apache.crunch.PipelineResult.StageResult; +import org.apache.crunch.impl.mr.MRPipeline; +import org.apache.crunch.io.From; +import org.apache.crunch.io.To; +import org.apache.crunch.test.CrunchTestSupport; +import org.apache.crunch.types.avro.Avros; +import org.apache.crunch.types.writable.Writables; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.Reducer; +import org.junit.Before; +import org.junit.Test; + +public class MapreduceIT extends CrunchTestSupport implements Serializable { + private static class TestMapper extends Mapper<IntWritable, Text, IntWritable, Text> { + @Override + protected void map(IntWritable k, Text v, Mapper<IntWritable, Text, IntWritable, Text>.Context ctxt) { + try { + ctxt.getCounter("written", "out").increment(1L); + ctxt.write(new IntWritable(v.getLength()), v); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + } + + private static class TestReducer extends Reducer<IntWritable, Text, Text, LongWritable> { + protected void reduce(IntWritable key, Iterable<Text> values, + org.apache.hadoop.mapreduce.Reducer<IntWritable, Text, Text, LongWritable>.Context ctxt) { + boolean hasWhere = false; + String notWhere = ""; + for (Text t : values) { + String next = t.toString(); + if (next.contains("where")) { + hasWhere = true; + ctxt.getCounter("words", "where").increment(1); + } else { + notWhere = next; + } + } + try { + ctxt.write(new Text(notWhere), hasWhere ? 
new LongWritable(1L) : new LongWritable(0L)); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + } + + @Test + public void testMapper() throws Exception { + Pipeline p = new MRPipeline(MapreduceIT.class, tempDir.getDefaultConfiguration()); + Path shakesPath = tempDir.copyResourcePath("shakes.txt"); + PCollection<String> in = p.read(From.textFile(shakesPath)); + PTable<IntWritable, Text> two = in.parallelDo(new MapFn<String, Pair<IntWritable, Text>>() { + @Override + public Pair<IntWritable, Text> map(String input) { + return Pair.of(new IntWritable(input.length()), new Text(input)); + } + }, Writables.tableOf(Writables.writables(IntWritable.class), Writables.writables(Text.class))); + + PTable<IntWritable, Text> out = Mapreduce.map(two, TestMapper.class, IntWritable.class, Text.class); + out.write(To.sequenceFile(tempDir.getPath("temp"))); + PipelineResult res = p.done(); + assertEquals(1, res.getStageResults().size()); + StageResult sr = res.getStageResults().get(0); + assertEquals(3667, sr.getCounters().findCounter("written", "out").getValue()); + } + + @Test + public void testReducer() throws Exception { + Pipeline p = new MRPipeline(MapreduceIT.class, tempDir.getDefaultConfiguration()); + Path shakesPath = tempDir.copyResourcePath("shakes.txt"); + PCollection<String> in = p.read(From.textFile(shakesPath)); + PTable<IntWritable, Text> two = in.parallelDo(new MapFn<String, Pair<IntWritable, Text>>() { + @Override + public Pair<IntWritable, Text> map(String input) { + return Pair.of(new IntWritable(input.length()), new Text(input)); + } + }, Writables.tableOf(Writables.writables(IntWritable.class), Writables.writables(Text.class))); + + PTable<Text, LongWritable> out = Mapreduce.reduce(two.groupByKey(), TestReducer.class, Text.class, LongWritable.class); + out.write(To.sequenceFile(tempDir.getPath("temp"))); + PipelineResult res = p.done(); + assertEquals(1, res.getStageResults().size()); + StageResult sr = res.getStageResults().get(0); + assertEquals(19, sr.getCounters().findCounter("words", "where").getValue()); + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/3eb5f0a8/crunch-core/src/main/java/org/apache/crunch/lib/Mapred.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/Mapred.java b/crunch-core/src/main/java/org/apache/crunch/lib/Mapred.java new file mode 100644 index 0000000..be10ff6 --- /dev/null +++ b/crunch-core/src/main/java/org/apache/crunch/lib/Mapred.java @@ -0,0 +1,301 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.crunch.lib; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.lang.reflect.Method; +import java.util.Set; + +import javassist.util.proxy.MethodFilter; +import javassist.util.proxy.MethodHandler; +import javassist.util.proxy.ProxyFactory; + +import org.apache.crunch.CrunchRuntimeException; +import org.apache.crunch.DoFn; +import org.apache.crunch.Emitter; +import org.apache.crunch.PGroupedTable; +import org.apache.crunch.PTable; +import org.apache.crunch.Pair; +import org.apache.crunch.types.PTableType; +import org.apache.crunch.types.writable.Writables; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.Counters; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Mapper; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reducer; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapreduce.Counter; +import org.apache.hadoop.util.ReflectionUtils; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableSet; + +/** + * Static functions for working with legacy Mappers and Reducers that live under the org.apache.hadoop.mapred.* + * package as part of Crunch pipelines. + */ +public class Mapred { + + public static <K1, V1, K2 extends Writable, V2 extends Writable> PTable<K2, V2> map( + PTable<K1, V1> input, + Class<? extends Mapper<K1, V1, K2, V2>> mapperClass, + Class<K2> keyClass, Class<V2> valueClass) { + return input.parallelDo(new MapperFn<K1, V1, K2, V2>(mapperClass), tableOf(keyClass, valueClass)); + } + + public static <K1, V1, K2 extends Writable, V2 extends Writable> PTable<K2, V2> reduce( + PGroupedTable<K1, V1> input, + Class<? extends Reducer<K1, V1, K2, V2>> reducerClass, + Class<K2> keyClass, Class<V2> valueClass) { + return input.parallelDo(new ReducerFn<K1, V1, K2, V2>(reducerClass), tableOf(keyClass, valueClass)); + } + + private static <K extends Writable, V extends Writable> PTableType<K, V> tableOf( + Class<K> keyClass, Class<V> valueClass) { + return Writables.tableOf(Writables.writables(keyClass), Writables.writables(valueClass)); + } + + private static class MapperFn<K1, V1, K2 extends Writable, V2 extends Writable> extends + DoFn<Pair<K1, V1>, Pair<K2, V2>> implements Reporter { + private final Class<? extends Mapper<K1, V1, K2, V2>> mapperClass; + private transient Mapper<K1, V1, K2, V2> instance; + private transient OutputCollectorImpl<K2, V2> outputCollector; + + public MapperFn(Class<? 
extends Mapper<K1, V1, K2, V2>> mapperClass) { + this.mapperClass = Preconditions.checkNotNull(mapperClass); + } + + @Override + public void initialize() { + if (instance == null) { + this.instance = ReflectionUtils.newInstance(mapperClass, getConfiguration()); + } + instance.configure(new JobConf(getConfiguration())); + outputCollector = new OutputCollectorImpl<K2, V2>(); + } + + @Override + public void process(Pair<K1, V1> input, Emitter<Pair<K2, V2>> emitter) { + outputCollector.set(emitter); + try { + instance.map(input.first(), input.second(), outputCollector, this); + } catch (IOException e) { + throw new CrunchRuntimeException(e); + } + } + + @Override + public void cleanup(Emitter<Pair<K2, V2>> emitter) { + try { + instance.close(); + } catch (IOException e) { + throw new CrunchRuntimeException("Error closing mapper = " + mapperClass, e); + } + } + + @Override + public void progress() { + super.progress(); + } + + @Override + public void setStatus(String status) { + super.setStatus(status); + } + + public Counters.Counter getCounter(Enum<?> counter) { + return proxyCounter(super.getCounter(counter)); + } + + public Counters.Counter getCounter(String group, String name) { + return proxyCounter(super.getCounter(group, name)); + } + + @Override + public InputSplit getInputSplit() throws UnsupportedOperationException { + return null; + } + + @Override + public void incrCounter(Enum<?> counter, long by) { + super.increment(counter, by); + } + + @Override + public void incrCounter(String group, String name, long by) { + super.increment(group, name, by); + } + + public float getProgress() { + return 0.5f; + } + } + + + private static class ReducerFn<K1, V1, K2 extends Writable, V2 extends Writable> extends + DoFn<Pair<K1, Iterable<V1>>, Pair<K2, V2>> implements Reporter { + private final Class<? extends Reducer<K1, V1, K2, V2>> reducerClass; + private transient Reducer<K1, V1, K2, V2> instance; + private transient OutputCollectorImpl<K2, V2> outputCollector; + + public ReducerFn(Class<? 
extends Reducer<K1, V1, K2, V2>> reducerClass) { + this.reducerClass = Preconditions.checkNotNull(reducerClass); + } + + @Override + public void initialize() { + if (instance == null) { + this.instance = ReflectionUtils.newInstance(reducerClass, getConfiguration()); + } + instance.configure(new JobConf(getConfiguration())); + outputCollector = new OutputCollectorImpl<K2, V2>(); + } + + @Override + public void process(Pair<K1, Iterable<V1>> input, Emitter<Pair<K2, V2>> emitter) { + outputCollector.set(emitter); + try { + instance.reduce(input.first(), input.second().iterator(), outputCollector, this); + } catch (IOException e) { + throw new CrunchRuntimeException(e); + } + } + + @Override + public void cleanup(Emitter<Pair<K2, V2>> emitter) { + try { + instance.close(); + } catch (IOException e) { + throw new CrunchRuntimeException("Error closing reducer = " + reducerClass, e); + } + } + + @Override + public void progress() { + super.progress(); + } + + @Override + public void setStatus(String status) { + super.setStatus(status); + } + + public Counters.Counter getCounter(Enum<?> counter) { + return proxyCounter(super.getCounter(counter)); + } + + public Counters.Counter getCounter(String group, String name) { + return proxyCounter(super.getCounter(group, name)); + } + + @Override + public InputSplit getInputSplit() throws UnsupportedOperationException { + return null; + } + + @Override + public void incrCounter(Enum<?> counter, long by) { + super.increment(counter, by); + } + + @Override + public void incrCounter(String group, String name, long by) { + super.increment(group, name, by); + } + + public float getProgress() { + return 0.5f; + } + } + + private static class OutputCollectorImpl<K, V> implements OutputCollector<K, V> { + private Emitter<Pair<K, V>> emitter; + + public OutputCollectorImpl() { } + + public void set(Emitter<Pair<K, V>> emitter) { + this.emitter = emitter; + } + + @Override + public void collect(K k, V v) throws IOException { + emitter.emit(Pair.of(k, v)); + } + } + + private static Counters.Counter proxyCounter(Counter c) { + ProxyFactory proxyFactory = new ProxyFactory(); + proxyFactory.setSuperclass(Counters.Counter.class); + proxyFactory.setFilter(CCMethodHandler.FILTER); + CCMethodHandler handler = new CCMethodHandler(c); + try { + return (Counters.Counter) proxyFactory.create(new Class[0], new Object[0], handler); + } catch (Exception e) { + throw new CrunchRuntimeException(e); + } + } + + private static class CCMethodHandler implements MethodHandler { + private static final Set<String> HANDLED = ImmutableSet.of("increment", + "getCounter", "getValue", "getName", "getDisplayName", "setValue", + "getUnderlyingCounter", "readFields", "write"); + public static final MethodFilter FILTER = new MethodFilter() { + @Override + public boolean isHandled(Method m) { + return HANDLED.contains(m.getName()); + } + }; + + private final Counter c; + + public CCMethodHandler(Counter c) { + this.c = c; + } + + @Override + public Object invoke(Object obj, Method m, Method m2, Object[] args) throws Throwable { + String name = m.getName(); + if ("increment".equals(name)) { + c.increment((Long) args[0]); + return null; + } else if ("getCounter".equals(name) || "getValue".equals(name)) { + return c.getValue(); + } else if ("setValue".equals(name)) { + c.setValue((Long) args[0]); + return null; + } else if ("getDisplayName".equals(name)) { + return c.getDisplayName(); + } else if ("getName".equals(name)) { + return c.getName(); + } else if ("getUnderlyingCounter".equals(name)) { +
return c; + } else if ("readFields".equals(name)) { + c.readFields((DataInput) args[0]); + return null; + } else if ("write".equals(name)) { + c.write((DataOutput) args[0]); + return null; + } + throw new IllegalStateException("Unhandled Counters.Counter method = " + name); + } + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/3eb5f0a8/crunch-core/src/main/java/org/apache/crunch/lib/Mapreduce.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/Mapreduce.java b/crunch-core/src/main/java/org/apache/crunch/lib/Mapreduce.java new file mode 100644 index 0000000..a0a6b3e --- /dev/null +++ b/crunch-core/src/main/java/org/apache/crunch/lib/Mapreduce.java @@ -0,0 +1,335 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.lib; + +import java.io.IOException; +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.Method; +import java.lang.reflect.Modifier; +import java.lang.reflect.Proxy; + +import javassist.util.proxy.MethodFilter; +import javassist.util.proxy.MethodHandler; +import javassist.util.proxy.ProxyFactory; + +import org.apache.crunch.CrunchRuntimeException; +import org.apache.crunch.DoFn; +import org.apache.crunch.Emitter; +import org.apache.crunch.PGroupedTable; +import org.apache.crunch.PTable; +import org.apache.crunch.Pair; +import org.apache.crunch.types.PTableType; +import org.apache.crunch.types.writable.Writables; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.RawComparator; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapreduce.Counter; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.TaskInputOutputContext; +import org.apache.hadoop.util.ReflectionUtils; + +import com.google.common.base.Preconditions; + +/** + * Static functions for working with legacy Mappers and Reducers that live under the org.apache.hadoop.mapreduce.* + * package as part of Crunch pipelines. + */ +public class Mapreduce { + + public static <K1, V1, K2 extends Writable, V2 extends Writable> PTable<K2, V2> map( + PTable<K1, V1> input, + Class<? 
extends Mapper<K1, V1, K2, V2>> mapperClass, + Class<K2> keyClass, Class<V2> valueClass) { + return input.parallelDo(new MapperFn<K1, V1, K2, V2>(mapperClass), tableOf(keyClass, valueClass)); + } + + public static <K1, V1, K2 extends Writable, V2 extends Writable> PTable<K2, V2> reduce( + PGroupedTable<K1, V1> input, + Class<? extends Reducer<K1, V1, K2, V2>> reducerClass, + Class<K2> keyClass, Class<V2> valueClass) { + return input.parallelDo(new ReducerFn<K1, V1, K2, V2>(reducerClass), tableOf(keyClass, valueClass)); + } + + private static <K extends Writable, V extends Writable> PTableType<K, V> tableOf( + Class<K> keyClass, Class<V> valueClass) { + return Writables.tableOf(Writables.writables(keyClass), Writables.writables(valueClass)); + } + + private static class MapperFn<K1, V1, K2 extends Writable, V2 extends Writable> extends + DoFn<Pair<K1, V1>, Pair<K2, V2>> { + private final Class<? extends Mapper<K1, V1, K2, V2>> mapperClass; + private transient Mapper<K1, V1, K2, V2> instance; + private transient Mapper.Context context; + private transient CtxtMethodHandler handler; + private transient Method setupMethod; + private transient Method mapMethod; + private transient Method cleanupMethod; + + public MapperFn(Class<? extends Mapper<K1, V1, K2, V2>> mapperClass) { + this.mapperClass = Preconditions.checkNotNull(mapperClass); + } + + @Override + public void initialize() { + if (instance == null) { + this.instance = ReflectionUtils.newInstance(mapperClass, getConfiguration()); + try { + for (Method m : mapperClass.getDeclaredMethods()) { + if ("setup".equals(m.getName())) { + this.setupMethod = m; + this.setupMethod.setAccessible(true); + } else if ("cleanup".equals(m.getName())) { + this.cleanupMethod = m; + this.cleanupMethod.setAccessible(true); + } else if ("map".equals(m.getName())) { + this.mapMethod = m; + this.mapMethod.setAccessible(true); + } + } + + if (mapMethod == null) { + throw new CrunchRuntimeException("No map method for class: " + mapperClass); + } + + ProxyFactory proxyFactory = new ProxyFactory(); + proxyFactory.setSuperclass(Mapper.Context.class); + proxyFactory.setFilter(CtxtMethodHandler.FILTER); + Class[] paramTypes = new Class[] { Mapper.class }; + Object[] args = new Object[] { instance }; + if (!Modifier.isAbstract(Mapper.Context.class.getModifiers())) { + paramTypes = new Class[] { Mapper.class, + Configuration.class, TaskAttemptID.class, + RecordReader.class, RecordWriter.class, + OutputCommitter.class, + Class.forName("org.apache.hadoop.mapreduce.StatusReporter"), + InputSplit.class + }; + args = new Object[] { instance, getConfiguration(), getTaskAttemptID(), + null, null, NO_OP_OUTPUT_COMMITTER, null, null + }; + } + this.handler = new CtxtMethodHandler(this.getContext()); + this.context = (Mapper.Context) proxyFactory.create(paramTypes, args, handler); + } catch (Exception e) { + throw new CrunchRuntimeException(e); + } + } + if (setupMethod != null) { + try { + setupMethod.invoke(instance, context); + } catch (Exception e) { + throw new CrunchRuntimeException(e); + } + } + } + + @Override + public void process(Pair<K1, V1> input, Emitter<Pair<K2, V2>> emitter) { + handler.set(emitter); + try { + mapMethod.invoke(instance, input.first(), input.second(), context); + } catch (Exception e) { + throw new CrunchRuntimeException(e); + } + } + + @Override + public void cleanup(Emitter<Pair<K2, V2>> emitter) { + if (cleanupMethod != null) { + handler.set(emitter); + try { + cleanupMethod.invoke(instance, context); + } catch (Exception e) { + throw new 
CrunchRuntimeException(e); + } + } + } + } + + private static class ReducerFn<K1, V1, K2 extends Writable, V2 extends Writable> extends + DoFn<Pair<K1, Iterable<V1>>, Pair<K2, V2>> { + private final Class<? extends Reducer<K1, V1, K2, V2>> reducerClass; + private transient Reducer<K1, V1, K2, V2> instance; + private transient CtxtMethodHandler handler; + private transient Reducer.Context context; + private transient Method setupMethod; + private transient Method reduceMethod; + private transient Method cleanupMethod; + + public ReducerFn(Class<? extends Reducer<K1, V1, K2, V2>> reducerClass) { + this.reducerClass = Preconditions.checkNotNull(reducerClass); + } + + @Override + public void initialize() { + if (instance == null) { + this.instance = ReflectionUtils.newInstance(reducerClass, getConfiguration()); + try { + for (Method m : reducerClass.getDeclaredMethods()) { + if ("setup".equals(m.getName())) { + this.setupMethod = m; + this.setupMethod.setAccessible(true); + } else if ("cleanup".equals(m.getName())) { + this.cleanupMethod = m; + this.cleanupMethod.setAccessible(true); + } else if ("reduce".equals(m.getName())) { + this.reduceMethod = m; + this.reduceMethod.setAccessible(true); + } + } + + if (reduceMethod == null) { + throw new CrunchRuntimeException("No reduce method for class: " + reducerClass); + } + + ProxyFactory proxyFactory = new ProxyFactory(); + proxyFactory.setSuperclass(Reducer.Context.class); + proxyFactory.setFilter(CtxtMethodHandler.FILTER); + Class[] paramTypes = new Class[] { Reducer.class }; + Object[] args = new Object[] { instance }; + if (!Modifier.isAbstract(Reducer.Context.class.getModifiers())) { + Class rkvi = Class.forName("org.apache.hadoop.mapred.RawKeyValueIterator"); + Object rawKeyValueIterator = Proxy.newProxyInstance(rkvi.getClassLoader(), + new Class[] { rkvi }, new InvocationHandler() { + @Override + public Object invoke(Object obj, Method m, Object[] args) throws Throwable { + if ("next".equals(m.getName())) { + return true; + } + return null; + } + }); + paramTypes = new Class[] { Reducer.class, + Configuration.class, TaskAttemptID.class, + rkvi, + Counter.class, Counter.class, + RecordWriter.class, + OutputCommitter.class, + Class.forName("org.apache.hadoop.mapreduce.StatusReporter"), + RawComparator.class, + Class.class, Class.class + }; + args = new Object[] { instance, getConfiguration(), getTaskAttemptID(), + rawKeyValueIterator, null, null, null, + NO_OP_OUTPUT_COMMITTER, null, null, + NullWritable.class, NullWritable.class + }; + } + this.handler = new CtxtMethodHandler(this.getContext()); + this.context = (Reducer.Context) proxyFactory.create(paramTypes, args, handler); + } catch (Exception e) { + throw new CrunchRuntimeException(e); + } + } + + if (setupMethod != null) { + try { + setupMethod.invoke(instance, context); + } catch (Exception e) { + throw new CrunchRuntimeException(e); + } + } + } + + @Override + public void process(Pair<K1, Iterable<V1>> input, Emitter<Pair<K2, V2>> emitter) { + handler.set(emitter); + try { + reduceMethod.invoke(instance, input.first(), input.second(), context); + } catch (Exception e) { + throw new CrunchRuntimeException(e); + } + } + + @Override + public void cleanup(Emitter<Pair<K2, V2>> emitter) { + if (cleanupMethod != null) { + handler.set(emitter); + try { + cleanupMethod.invoke(instance, context); + } catch (Exception e) { + throw new CrunchRuntimeException(e); + } + } + } + } + + private static class CtxtMethodHandler implements MethodHandler { + public static final MethodFilter FILTER = new 
MethodFilter() { + @Override + public boolean isHandled(Method m) { + return true; + } + }; + + private final TaskInputOutputContext ctxt; + private Emitter emitter; + + public CtxtMethodHandler(TaskInputOutputContext ctxt) { + this.ctxt = ctxt; + } + + public void set(Emitter emitter) { + this.emitter = emitter; + } + + @Override + public Object invoke(Object instance, Method m, Method arg2, Object[] args) throws Throwable { + String name = m.getName(); + if ("write".equals(name)) { + emitter.emit(Pair.of(args[0], args[1])); + return null; + } else { + return m.invoke(ctxt, args); + } + } + } + + private static final OutputCommitter NO_OP_OUTPUT_COMMITTER = new OutputCommitter() { + @Override + public void abortTask(TaskAttemptContext arg0) throws IOException { + } + + @Override + public void commitTask(TaskAttemptContext arg0) throws IOException { + } + + @Override + public boolean needsTaskCommit(TaskAttemptContext arg0) throws IOException { + return false; + } + + @Override + public void setupJob(JobContext arg0) throws IOException { + } + + @Override + public void setupTask(TaskAttemptContext arg0) throws IOException { + } + }; + +} http://git-wip-us.apache.org/repos/asf/crunch/blob/3eb5f0a8/crunch-core/src/test/java/org/apache/crunch/lib/MapredTest.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/test/java/org/apache/crunch/lib/MapredTest.java b/crunch-core/src/test/java/org/apache/crunch/lib/MapredTest.java new file mode 100644 index 0000000..f7fdf20 --- /dev/null +++ b/crunch-core/src/test/java/org/apache/crunch/lib/MapredTest.java @@ -0,0 +1,134 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.crunch.lib; + +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.io.Serializable; +import java.util.Iterator; + +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import org.apache.crunch.MapFn; +import org.apache.crunch.PTable; +import org.apache.crunch.Pair; +import org.apache.crunch.types.avro.Avros; +import org.apache.crunch.types.writable.Writables; + +import org.apache.crunch.impl.mem.MemPipeline; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Mapper; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reducer; +import org.apache.hadoop.mapred.Reporter; +import org.junit.Before; +import org.junit.Test; + +public class MapredTest implements Serializable { + + private static class TestMapper implements Mapper<IntWritable, Text, Text, LongWritable> { + @Override + public void configure(JobConf arg0) { + } + + @Override + public void close() throws IOException { + } + + @Override + public void map(IntWritable k, Text v, OutputCollector<Text, LongWritable> out, + Reporter reporter) throws IOException { + reporter.getCounter("written", "out").increment(1L); + out.collect(new Text(v), new LongWritable(v.getLength())); + } + } + + private static class TestReducer implements Reducer<IntWritable, Text, Text, LongWritable> { + + @Override + public void configure(JobConf arg0) { + } + + @Override + public void close() throws IOException { + } + + @Override + public void reduce(IntWritable key, Iterator<Text> iter, + OutputCollector<Text, LongWritable> out, Reporter reporter) throws IOException { + boolean hasBall = false; + String notBall = ""; + while (iter.hasNext()) { + String next = iter.next().toString(); + if ("ball".equals(next)) { + reporter.getCounter("foo", "bar").increment(1); + hasBall = true; + } else { + notBall = next; + } + } + out.collect(new Text(notBall), hasBall ? 
new LongWritable(1L) : new LongWritable(0L)); + } + } + + private static Pair<Text, LongWritable> $(String one, int two) { + return Pair.of(new Text(one), new LongWritable(two)); + } + + @Before + public void setUp() { + MemPipeline.clearCounters(); + } + + @Test + public void testMapper() throws Exception { + PTable<Integer, String> in = MemPipeline.typedTableOf(Avros.tableOf(Avros.ints(), Avros.strings()), + 1, "foot", 2, "ball", 3, "bazzar"); + PTable<IntWritable, Text> two = in.parallelDo(new MapFn<Pair<Integer, String>, Pair<IntWritable, Text>>() { + @Override + public Pair<IntWritable, Text> map(Pair<Integer, String> input) { + return Pair.of(new IntWritable(input.first()), new Text(input.second())); + } + }, Writables.tableOf(Writables.writables(IntWritable.class), Writables.writables(Text.class))); + + PTable<Text, LongWritable> out = Mapred.map(two, TestMapper.class, Text.class, LongWritable.class); + assertEquals(ImmutableList.of($("foot", 4), $("ball", 4), $("bazzar", 6)), + Lists.newArrayList(out.materialize())); + } + + @Test + public void testReducer() throws Exception { + PTable<Integer, String> in = MemPipeline.typedTableOf(Avros.tableOf(Avros.ints(), Avros.strings()), + 1, "foot", 1, "ball", 2, "base", 2, "ball", 3, "basket", 3, "ball", 4, "hockey"); + PTable<IntWritable, Text> two = in.parallelDo(new MapFn<Pair<Integer, String>, Pair<IntWritable, Text>>() { + @Override + public Pair<IntWritable, Text> map(Pair<Integer, String> input) { + return Pair.of(new IntWritable(input.first()), new Text(input.second())); + } + }, Writables.tableOf(Writables.writables(IntWritable.class), Writables.writables(Text.class))); + PTable<Text, LongWritable> out = Mapred.reduce(two.groupByKey(), TestReducer.class, Text.class, LongWritable.class); + assertEquals(ImmutableList.of($("foot", 1), $("base", 1), $("basket", 1), $("hockey", 0)), + Lists.newArrayList(out.materialize())); + assertEquals(3, MemPipeline.getCounters().findCounter("foo", "bar").getValue()); + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/3eb5f0a8/crunch-core/src/test/java/org/apache/crunch/lib/MapreduceTest.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/test/java/org/apache/crunch/lib/MapreduceTest.java b/crunch-core/src/test/java/org/apache/crunch/lib/MapreduceTest.java new file mode 100644 index 0000000..0606690 --- /dev/null +++ b/crunch-core/src/test/java/org/apache/crunch/lib/MapreduceTest.java @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.crunch.lib; + +import static org.junit.Assert.assertEquals; + +import org.apache.crunch.MapFn; +import org.apache.crunch.PTable; +import org.apache.crunch.Pair; +import org.apache.crunch.impl.mem.MemPipeline; +import org.apache.crunch.types.avro.Avros; +import org.apache.crunch.types.writable.Writables; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.Reducer; +import org.junit.Before; +import org.junit.Test; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; + +public class MapreduceTest { + private static class TestMapper extends Mapper<IntWritable, Text, IntWritable, Text> { + @Override + protected void map(IntWritable k, Text v, Mapper<IntWritable, Text, IntWritable, Text>.Context ctxt) { + try { + ctxt.write(new IntWritable(v.getLength()), v); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + } + + private static class TestReducer extends Reducer<IntWritable, Text, Text, LongWritable> { + protected void reduce(IntWritable key, Iterable<Text> values, + org.apache.hadoop.mapreduce.Reducer<IntWritable, Text, Text, LongWritable>.Context ctxt) { + boolean hasBall = false; + String notBall = ""; + for (Text t : values) { + String next = t.toString(); + if ("ball".equals(next)) { + hasBall = true; + ctxt.getCounter("foo", "bar").increment(1); + } else { + notBall = next; + } + } + try { + ctxt.write(new Text(notBall), hasBall ? new LongWritable(1L) : new LongWritable(0L)); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + } + + private static Pair<Text, LongWritable> $1(String one, int two) { + return Pair.of(new Text(one), new LongWritable(two)); + } + + private static Pair<IntWritable, Text> $2(int one, String two) { + return Pair.of(new IntWritable(one), new Text(two)); + } + + @Before + public void setUp() { + MemPipeline.clearCounters(); + } + + @Test + public void testMapper() throws Exception { + PTable<Integer, String> in = MemPipeline.typedTableOf(Avros.tableOf(Avros.ints(), Avros.strings()), + 1, "foot", 2, "ball", 3, "bazzar"); + PTable<IntWritable, Text> two = in.parallelDo(new MapFn<Pair<Integer, String>, Pair<IntWritable, Text>>() { + @Override + public Pair<IntWritable, Text> map(Pair<Integer, String> input) { + return Pair.of(new IntWritable(input.first()), new Text(input.second())); + } + }, Writables.tableOf(Writables.writables(IntWritable.class), Writables.writables(Text.class))); + + PTable<IntWritable, Text> out = Mapreduce.map(two, TestMapper.class, IntWritable.class, Text.class); + assertEquals(ImmutableList.of($2(4, "foot"), $2(4, "ball"), $2(6, "bazzar")), + Lists.newArrayList(out.materialize())); + } + + @Test + public void testReducer() throws Exception { + PTable<Integer, String> in = MemPipeline.typedTableOf(Avros.tableOf(Avros.ints(), Avros.strings()), + 1, "foot", 1, "ball", 2, "base", 2, "ball", 3, "basket", 3, "ball", 4, "hockey"); + PTable<IntWritable, Text> two = in.parallelDo(new MapFn<Pair<Integer, String>, Pair<IntWritable, Text>>() { + @Override + public Pair<IntWritable, Text> map(Pair<Integer, String> input) { + return Pair.of(new IntWritable(input.first()), new Text(input.second())); + } + }, Writables.tableOf(Writables.writables(IntWritable.class), Writables.writables(Text.class))); + PTable<Text, LongWritable> out = Mapreduce.reduce(two.groupByKey(), TestReducer.class, Text.class, 
LongWritable.class); + assertEquals(ImmutableList.of($1("foot", 1), $1("base", 1), $1("basket", 1), $1("hockey", 0)), + Lists.newArrayList(out.materialize())); + assertEquals(3, MemPipeline.getCounters().findCounter("foo", "bar").getValue()); + } +}
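----------------------------------------------------------------------
Usage sketch (old org.apache.hadoop.mapred API): a minimal, hypothetical
driver showing how an unmodified legacy Mapper can run inside a Crunch
pipeline through the new Mapred.map() adapter. It is modeled directly on
MapredIT above; the LineLengths class name and the args[0]/args[1] paths
are illustrative assumptions, not part of this commit.

import java.io.IOException;

import org.apache.crunch.MapFn;
import org.apache.crunch.PCollection;
import org.apache.crunch.PTable;
import org.apache.crunch.Pair;
import org.apache.crunch.Pipeline;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.io.From;
import org.apache.crunch.io.To;
import org.apache.crunch.lib.Mapred;
import org.apache.crunch.types.writable.Writables;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class LineLengths {

  // An unmodified legacy Mapper: emits a (line, length-of-line) pair per input record.
  public static class LengthMapper implements Mapper<IntWritable, Text, Text, LongWritable> {
    @Override
    public void configure(JobConf conf) { }

    @Override
    public void close() throws IOException { }

    @Override
    public void map(IntWritable key, Text value, OutputCollector<Text, LongWritable> out,
        Reporter reporter) throws IOException {
      out.collect(value, new LongWritable(value.getLength()));
    }
  }

  public static void main(String[] args) throws Exception {
    Pipeline p = new MRPipeline(LineLengths.class);
    PCollection<String> lines = p.read(From.textFile(args[0]));
    // Shape the input into the Writable key/value pairs the legacy Mapper expects.
    PTable<IntWritable, Text> keyed = lines.parallelDo(
        new MapFn<String, Pair<IntWritable, Text>>() {
          @Override
          public Pair<IntWritable, Text> map(String line) {
            return Pair.of(new IntWritable(line.length()), new Text(line));
          }
        },
        Writables.tableOf(Writables.writables(IntWritable.class),
            Writables.writables(Text.class)));
    // Run the legacy Mapper as a Crunch DoFn; the key/value classes declare
    // the PTableType of the adapter's output.
    PTable<Text, LongWritable> lengths =
        Mapred.map(keyed, LengthMapper.class, Text.class, LongWritable.class);
    lengths.write(To.sequenceFile(args[1]));
    p.done();
  }
}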
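----------------------------------------------------------------------
Usage sketch (new org.apache.hadoop.mapreduce API): the same idea for a
new-API Reducer via Mapreduce.reduce(), modeled on MapreduceIT above.
GroupCounts and its GroupReducer are hypothetical names. Note that the
Reducer sees a proxied Context, so context.write() calls are routed to
the Crunch Emitter rather than to a real MapReduce output.

import org.apache.crunch.MapFn;
import org.apache.crunch.PCollection;
import org.apache.crunch.PTable;
import org.apache.crunch.Pair;
import org.apache.crunch.Pipeline;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.io.From;
import org.apache.crunch.io.To;
import org.apache.crunch.lib.Mapreduce;
import org.apache.crunch.types.writable.Writables;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class GroupCounts {

  // An unmodified new-API Reducer: emits one (last-value-in-group, group-size) pair per key.
  public static class GroupReducer extends Reducer<IntWritable, Text, Text, LongWritable> {
    @Override
    protected void reduce(IntWritable key, Iterable<Text> values, Context context) {
      long count = 0;
      Text last = new Text("");
      for (Text t : values) {
        count++;
        last = new Text(t); // copy: Hadoop reuses the Text instance across iterations
      }
      try {
        context.write(last, new LongWritable(count));
      } catch (Exception e) {
        throw new RuntimeException(e);
      }
    }
  }

  public static void main(String[] args) throws Exception {
    Pipeline p = new MRPipeline(GroupCounts.class);
    PCollection<String> lines = p.read(From.textFile(args[0]));
    PTable<IntWritable, Text> keyed = lines.parallelDo(
        new MapFn<String, Pair<IntWritable, Text>>() {
          @Override
          public Pair<IntWritable, Text> map(String line) {
            return Pair.of(new IntWritable(line.length()), new Text(line));
          }
        },
        Writables.tableOf(Writables.writables(IntWritable.class),
            Writables.writables(Text.class)));
    // Group by key first; the adapter then invokes the Reducer's reduce()
    // once per key group, exactly as MapreduceIT does with its TestReducer.
    PTable<Text, LongWritable> counts =
        Mapreduce.reduce(keyed.groupByKey(), GroupReducer.class, Text.class, LongWritable.class);
    counts.write(To.sequenceFile(args[1]));
    p.done();
  }
}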
