mr-runner: support BoundedSource with BeamInputFormat.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a884a2f0 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a884a2f0 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a884a2f0 Branch: refs/heads/mr-runner Commit: a884a2f0b33b6621ef3a2fff6f5467109707df54 Parents: a8b366d Author: Pei He <[email protected]> Authored: Fri Jul 21 13:46:36 2017 +0800 Committer: Pei He <[email protected]> Committed: Thu Aug 31 14:13:47 2017 +0800 ---------------------------------------------------------------------- runners/map-reduce/pom.xml | 14 +- .../runners/mapreduce/MapReduceWordCount.java | 218 +++++++++++++++++++ .../mapreduce/translation/BeamInputFormat.java | 154 +++++++++++++ .../mapreduce/translation/BeamMapper.java | 30 +++ 4 files changed, 415 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/a884a2f0/runners/map-reduce/pom.xml ---------------------------------------------------------------------- diff --git a/runners/map-reduce/pom.xml b/runners/map-reduce/pom.xml index 2e8a8c9..d18eee8 100644 --- a/runners/map-reduce/pom.xml +++ b/runners/map-reduce/pom.xml @@ -84,7 +84,19 @@ <artifactId>hadoop-mapreduce-client-core</artifactId> <version>${mapreduce.version}</version> </dependency> - + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-client-common</artifactId> + <version>${mapreduce.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <version>${mapreduce.version}</version> + </dependency> + <!-- Beam dependencies --> <dependency> <groupId>org.apache.beam</groupId> http://git-wip-us.apache.org/repos/asf/beam/blob/a884a2f0/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceWordCount.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceWordCount.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceWordCount.java new file mode 100644 index 0000000..4ba3a29 --- /dev/null +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceWordCount.java @@ -0,0 +1,218 @@ +package org.apache.beam.runners.mapreduce; + +import com.google.common.base.Optional; +import com.google.common.collect.ImmutableList; +import java.io.IOException; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.StringTokenizer; + +import javax.annotation.Nullable; +import org.apache.beam.runners.mapreduce.translation.BeamInputFormat; +import org.apache.beam.runners.mapreduce.translation.BeamMapper; +import org.apache.beam.sdk.coders.BigEndianIntegerCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.io.BoundedSource; +import org.apache.beam.sdk.io.OffsetBasedSource; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.util.CoderUtils; +import org.apache.beam.sdk.util.SerializableUtils; +import org.apache.beam.sdk.values.KV; +import org.apache.commons.codec.binary.Base64; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; +import org.apache.log4j.BasicConfigurator; + +public class MapReduceWordCount { + + public static class CreateSource<T> extends OffsetBasedSource<T> { + private final List<byte[]> allElementsBytes; + private final long totalSize; + private final Coder<T> coder; + + public static <T> CreateSource<T> fromIterable(Iterable<T> elements, Coder<T> elemCoder) + throws CoderException, IOException { + ImmutableList.Builder<byte[]> allElementsBytes = ImmutableList.builder(); + long totalSize = 0L; + for (T element : elements) { + byte[] bytes = CoderUtils.encodeToByteArray(elemCoder, element); + allElementsBytes.add(bytes); + totalSize += bytes.length; + } + return new CreateSource<>(allElementsBytes.build(), totalSize, elemCoder); + } + + /** + * Create a new source with the specified bytes. The new source owns the input element bytes, + * which must not be modified after this constructor is called. + */ + private CreateSource(List<byte[]> elementBytes, long totalSize, Coder<T> coder) { + super(0, elementBytes.size(), 1); + this.allElementsBytes = ImmutableList.copyOf(elementBytes); + this.totalSize = totalSize; + this.coder = coder; + } + + @Override + public long getEstimatedSizeBytes(PipelineOptions options) throws Exception { + return totalSize; + } + + @Override + public BoundedSource.BoundedReader<T> createReader(PipelineOptions options) + throws IOException { + return new BytesReader<>(this); + } + + @Override + public void validate() {} + + @Override + public Coder<T> getDefaultOutputCoder() { + return coder; + } + + @Override + public long getMaxEndOffset(PipelineOptions options) throws Exception { + return allElementsBytes.size(); + } + + @Override + public OffsetBasedSource<T> createSourceForSubrange(long start, long end) { + List<byte[]> primaryElems = allElementsBytes.subList((int) start, (int) end); + long primarySizeEstimate = + (long) (totalSize * primaryElems.size() / (double) allElementsBytes.size()); + return new CreateSource<>(primaryElems, primarySizeEstimate, coder); + } + + @Override + public long getBytesPerOffset() { + if (allElementsBytes.size() == 0) { + return 1L; + } + return Math.max(1, totalSize / allElementsBytes.size()); + } + + private static class BytesReader<T> extends OffsetBasedReader<T> { + private int index; + /** + * Use an optional to distinguish between null next element (as Optional.absent()) and no next + * element (next is null). + */ + @Nullable + private Optional<T> next; + + public BytesReader(CreateSource<T> source) { + super(source); + index = -1; + } + + @Override + @Nullable + public T getCurrent() throws NoSuchElementException { + if (next == null) { + throw new NoSuchElementException(); + } + return next.orNull(); + } + + @Override + public void close() throws IOException {} + + @Override + protected long getCurrentOffset() { + return index; + } + + @Override + protected boolean startImpl() throws IOException { + return advanceImpl(); + } + + @Override + public synchronized CreateSource<T> getCurrentSource() { + return (CreateSource<T>) super.getCurrentSource(); + } + + @Override + protected boolean advanceImpl() throws IOException { + CreateSource<T> source = getCurrentSource(); + if (index + 1 >= source.allElementsBytes.size()) { + next = null; + return false; + } + index++; + next = + Optional.fromNullable( + CoderUtils.decodeFromByteArray(source.coder, source.allElementsBytes.get(index))); + return true; + } + } + } + + public static class TokenizerMapper + extends Mapper<Object, Text, Text, IntWritable>{ + + private final static IntWritable one = new IntWritable(1); + private Text word = new Text(); + + public void map(Object key, Text value, Context context + ) throws IOException, InterruptedException { + StringTokenizer itr = new StringTokenizer(value.toString()); + while (itr.hasMoreTokens()) { + word.set(itr.nextToken()); + context.write(word, one); + } + } + } + + public static class IntSumReducer + extends Reducer<Text, IntWritable, Text, IntWritable> { + private IntWritable result = new IntWritable(); + + public void reduce(Text key, Iterable<IntWritable> values, Context context) + throws IOException, InterruptedException { + int sum = 0; + for (IntWritable val : values) { + sum += val.get(); + } + result.set(sum); + context.write(key, result); + } + } + + public static void main(String[] args) throws Exception { + BasicConfigurator.configure(); + + Configuration conf = new Configuration(); + + BoundedSource<KV<String, Integer>> source = CreateSource.fromIterable( + ImmutableList.of(KV.of("k1", 10), KV.of("k2", 2)), + KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of())); + + conf.set( + "source", + Base64.encodeBase64String(SerializableUtils.serializeToByteArray(source))); + + Job job = Job.getInstance(conf, "word count"); + job.setJarByClass(MapReduceWordCount.class); + job.setInputFormatClass(BeamInputFormat.class); + job.setMapperClass(BeamMapper.class); + //job.setMapperClass(TokenizerMapper.class); + //job.setCombinerClass(IntSumReducer.class); + //job.setReducerClass(IntSumReducer.class); + //job.setOutputKeyClass(Text.class); + //job.setOutputValueClass(IntWritable.class); + //FileInputFormat.addInputPath(job, new Path(args[0])); + job.setOutputFormatClass(NullOutputFormat.class); + System.exit(job.waitForCompletion(true) ? 0 : 1); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/a884a2f0/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamInputFormat.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamInputFormat.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamInputFormat.java new file mode 100644 index 0000000..8c4155a --- /dev/null +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamInputFormat.java @@ -0,0 +1,154 @@ +package org.apache.beam.runners.mapreduce.translation; + +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.common.base.Function; +import com.google.common.collect.FluentIterable; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.io.Serializable; +import java.util.List; +import javax.annotation.Nullable; +import org.apache.beam.sdk.io.BoundedSource; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.util.SerializableUtils; +import org.apache.beam.sdk.values.KV; +import org.apache.commons.codec.binary.Base64; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +/** + * Adaptor from Beam {@link BoundedSource} to MapReduce {@link InputFormat}. + */ +public class BeamInputFormat<K, V> extends InputFormat { + + private static final long DEFAULT_DESIRED_BUNDLE_SIZE_SIZE_BYTES = 5 * 1000 * 1000; + + private BoundedSource<KV<K, V>> source; + private PipelineOptions options; + + public BeamInputFormat() { + } + + public BeamInputFormat(BoundedSource<KV<K, V>> source, PipelineOptions options) { + this.source = checkNotNull(source, "source"); + this.options = checkNotNull(options, "options"); + } + + @Override + public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException { + source = (BoundedSource<KV<K,V>>) SerializableUtils.deserializeFromByteArray( + Base64.decodeBase64(context.getConfiguration().get("source")), + ""); + try { + return FluentIterable.from(source.split(DEFAULT_DESIRED_BUNDLE_SIZE_SIZE_BYTES, options)) + .transform(new Function<BoundedSource<KV<K, V>>, InputSplit>() { + @Override + public InputSplit apply(BoundedSource<KV<K, V>> source) { + try { + return new BeamInputSplit(source.getEstimatedSizeBytes(options)); + } catch (Exception e) { + throw new RuntimeException(e); + } + }}) + .toList(); + } catch (Exception e) { + throw new IOException(e); + } + } + + @Override + public RecordReader createRecordReader( + InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { + source = (BoundedSource<KV<K,V>>) SerializableUtils.deserializeFromByteArray( + Base64.decodeBase64(context.getConfiguration().get("source")), + ""); + return new BeamRecordReader<>(source.createReader(options)); + } + + public static class BeamInputSplit extends InputSplit implements Writable { + private long estimatedSizeBytes; + + public BeamInputSplit() { + } + + BeamInputSplit(long estimatedSizeBytes) { + this.estimatedSizeBytes = estimatedSizeBytes; + } + + @Override + public long getLength() throws IOException, InterruptedException { + return estimatedSizeBytes; + } + + @Override + public String[] getLocations() throws IOException, InterruptedException { + return new String[0]; + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeLong(estimatedSizeBytes); + } + + @Override + public void readFields(DataInput in) throws IOException { + estimatedSizeBytes = in.readLong(); + } + } + + private class BeamRecordReader<K, V> extends RecordReader { + + private final BoundedSource.BoundedReader<KV<K, V>> reader; + private boolean started; + + public BeamRecordReader(BoundedSource.BoundedReader<KV<K, V>> reader) { + this.reader = checkNotNull(reader, "reader"); + this.started = false; + } + + @Override + public void initialize( + InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { + } + + @Override + public boolean nextKeyValue() throws IOException, InterruptedException { + if (!started) { + return reader.start(); + } else { + return reader.advance(); + } + } + + @Override + public Object getCurrentKey() throws IOException, InterruptedException { + return reader.getCurrent().getKey(); + } + + @Override + public Object getCurrentValue() throws IOException, InterruptedException { + return reader.getCurrent().getValue(); + } + + @Override + public float getProgress() throws IOException, InterruptedException { + Double progress = reader.getFractionConsumed(); + if (progress != null) { + return progress.floatValue(); + } else { + return 0; + } + } + + @Override + public void close() throws IOException { + reader.close(); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/a884a2f0/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamMapper.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamMapper.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamMapper.java new file mode 100644 index 0000000..88fc8d6 --- /dev/null +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamMapper.java @@ -0,0 +1,30 @@ +package org.apache.beam.runners.mapreduce.translation; + +import org.apache.beam.sdk.transforms.reflect.DoFnInvoker; +import org.apache.beam.sdk.values.KV; +import org.apache.hadoop.mapreduce.Mapper; + +/** + * Created by peihe on 21/07/2017. + */ +public class BeamMapper<KeyInT, ValueInT, KeyOutT, ValueOutT> + extends Mapper<KeyInT, ValueInT, KeyOutT, ValueOutT> { + + private DoFnInvoker<KV<KeyInT, ValueInT>, KV<KeyOutT, ValueOutT>> doFnInvoker; + + @Override + protected void setup(Mapper<KeyInT, ValueInT, KeyOutT, ValueOutT>.Context context) { + } + + @Override + protected void map( + KeyInT key, + ValueInT value, + Mapper<KeyInT, ValueInT, KeyOutT, ValueOutT>.Context context) { + System.out.print(String.format("key: %s, value: %s", key, value)); + } + + @Override + protected void cleanup(Mapper<KeyInT, ValueInT, KeyOutT, ValueOutT>.Context context) { + } +}
