mr-runner: support BoundedSource with BeamInputFormat.

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a884a2f0
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a884a2f0
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a884a2f0

Branch: refs/heads/mr-runner
Commit: a884a2f0b33b6621ef3a2fff6f5467109707df54
Parents: a8b366d
Author: Pei He <[email protected]>
Authored: Fri Jul 21 13:46:36 2017 +0800
Committer: Pei He <[email protected]>
Committed: Thu Aug 31 14:13:47 2017 +0800

----------------------------------------------------------------------
 runners/map-reduce/pom.xml                      |  14 +-
 .../runners/mapreduce/MapReduceWordCount.java   | 218 +++++++++++++++++++
 .../mapreduce/translation/BeamInputFormat.java  | 154 +++++++++++++
 .../mapreduce/translation/BeamMapper.java       |  30 +++
 4 files changed, 415 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/a884a2f0/runners/map-reduce/pom.xml
----------------------------------------------------------------------
diff --git a/runners/map-reduce/pom.xml b/runners/map-reduce/pom.xml
index 2e8a8c9..d18eee8 100644
--- a/runners/map-reduce/pom.xml
+++ b/runners/map-reduce/pom.xml
@@ -84,7 +84,19 @@
       <artifactId>hadoop-mapreduce-client-core</artifactId>
       <version>${mapreduce.version}</version>
     </dependency>
-    
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-common</artifactId>
+      <version>${mapreduce.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <version>${mapreduce.version}</version>
+    </dependency>
+
     <!-- Beam dependencies -->
     <dependency>
       <groupId>org.apache.beam</groupId>

http://git-wip-us.apache.org/repos/asf/beam/blob/a884a2f0/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceWordCount.java
----------------------------------------------------------------------
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceWordCount.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceWordCount.java
new file mode 100644
index 0000000..4ba3a29
--- /dev/null
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceWordCount.java
@@ -0,0 +1,218 @@
+package org.apache.beam.runners.mapreduce;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
+import java.io.IOException;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.StringTokenizer;
+
+import javax.annotation.Nullable;
+import org.apache.beam.runners.mapreduce.translation.BeamInputFormat;
+import org.apache.beam.runners.mapreduce.translation.BeamMapper;
+import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.OffsetBasedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.util.SerializableUtils;
+import org.apache.beam.sdk.values.KV;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.log4j.BasicConfigurator;
+
+public class MapReduceWordCount {
+
+  public static class CreateSource<T> extends OffsetBasedSource<T> {
+    private final List<byte[]> allElementsBytes;
+    private final long totalSize;
+    private final Coder<T> coder;
+
+    public static <T> CreateSource<T> fromIterable(Iterable<T> elements, 
Coder<T> elemCoder)
+        throws CoderException, IOException {
+      ImmutableList.Builder<byte[]> allElementsBytes = ImmutableList.builder();
+      long totalSize = 0L;
+      for (T element : elements) {
+        byte[] bytes = CoderUtils.encodeToByteArray(elemCoder, element);
+        allElementsBytes.add(bytes);
+        totalSize += bytes.length;
+      }
+      return new CreateSource<>(allElementsBytes.build(), totalSize, 
elemCoder);
+    }
+
+    /**
+     * Create a new source with the specified bytes. The new source owns the 
input element bytes,
+     * which must not be modified after this constructor is called.
+     */
+    private CreateSource(List<byte[]> elementBytes, long totalSize, Coder<T> 
coder) {
+      super(0, elementBytes.size(), 1);
+      this.allElementsBytes = ImmutableList.copyOf(elementBytes);
+      this.totalSize = totalSize;
+      this.coder = coder;
+    }
+
+    @Override
+    public long getEstimatedSizeBytes(PipelineOptions options) throws 
Exception {
+      return totalSize;
+    }
+
+    @Override
+    public BoundedSource.BoundedReader<T> createReader(PipelineOptions options)
+        throws IOException {
+      return new BytesReader<>(this);
+    }
+
+    @Override
+    public void validate() {}
+
+    @Override
+    public Coder<T> getDefaultOutputCoder() {
+      return coder;
+    }
+
+    @Override
+    public long getMaxEndOffset(PipelineOptions options) throws Exception {
+      return allElementsBytes.size();
+    }
+
+    @Override
+    public OffsetBasedSource<T> createSourceForSubrange(long start, long end) {
+      List<byte[]> primaryElems = allElementsBytes.subList((int) start, (int) 
end);
+      long primarySizeEstimate =
+          (long) (totalSize * primaryElems.size() / (double) 
allElementsBytes.size());
+      return new CreateSource<>(primaryElems, primarySizeEstimate, coder);
+    }
+
+    @Override
+    public long getBytesPerOffset() {
+      if (allElementsBytes.size() == 0) {
+        return 1L;
+      }
+      return Math.max(1, totalSize / allElementsBytes.size());
+    }
+
+    private static class BytesReader<T> extends OffsetBasedReader<T> {
+      private int index;
+      /**
+       * Use an optional to distinguish between null next element (as 
Optional.absent()) and no next
+       * element (next is null).
+       */
+      @Nullable
+      private Optional<T> next;
+
+      public BytesReader(CreateSource<T> source) {
+        super(source);
+        index = -1;
+      }
+
+      @Override
+      @Nullable
+      public T getCurrent() throws NoSuchElementException {
+        if (next == null) {
+          throw new NoSuchElementException();
+        }
+        return next.orNull();
+      }
+
+      @Override
+      public void close() throws IOException {}
+
+      @Override
+      protected long getCurrentOffset() {
+        return index;
+      }
+
+      @Override
+      protected boolean startImpl() throws IOException {
+        return advanceImpl();
+      }
+
+      @Override
+      public synchronized CreateSource<T> getCurrentSource() {
+        return (CreateSource<T>) super.getCurrentSource();
+      }
+
+      @Override
+      protected boolean advanceImpl() throws IOException {
+        CreateSource<T> source = getCurrentSource();
+        if (index + 1 >= source.allElementsBytes.size()) {
+          next = null;
+          return false;
+        }
+        index++;
+        next =
+            Optional.fromNullable(
+                CoderUtils.decodeFromByteArray(source.coder, 
source.allElementsBytes.get(index)));
+        return true;
+      }
+    }
+  }
+
+  public static class TokenizerMapper
+      extends Mapper<Object, Text, Text, IntWritable>{
+
+    private final static IntWritable one = new IntWritable(1);
+    private Text word = new Text();
+
+    public void map(Object key, Text value, Context context
+    ) throws IOException, InterruptedException {
+      StringTokenizer itr = new StringTokenizer(value.toString());
+      while (itr.hasMoreTokens()) {
+        word.set(itr.nextToken());
+        context.write(word, one);
+      }
+    }
+  }
+
+  public static class IntSumReducer
+      extends Reducer<Text, IntWritable, Text, IntWritable> {
+    private IntWritable result = new IntWritable();
+
+    public void reduce(Text key, Iterable<IntWritable> values, Context context)
+        throws IOException, InterruptedException {
+      int sum = 0;
+      for (IntWritable val : values) {
+        sum += val.get();
+      }
+      result.set(sum);
+      context.write(key, result);
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+    BasicConfigurator.configure();
+
+    Configuration conf = new Configuration();
+
+    BoundedSource<KV<String, Integer>> source = CreateSource.fromIterable(
+        ImmutableList.of(KV.of("k1", 10), KV.of("k2", 2)),
+        KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of()));
+
+    conf.set(
+        "source",
+        
Base64.encodeBase64String(SerializableUtils.serializeToByteArray(source)));
+
+    Job job = Job.getInstance(conf, "word count");
+    job.setJarByClass(MapReduceWordCount.class);
+    job.setInputFormatClass(BeamInputFormat.class);
+    job.setMapperClass(BeamMapper.class);
+    //job.setMapperClass(TokenizerMapper.class);
+    //job.setCombinerClass(IntSumReducer.class);
+    //job.setReducerClass(IntSumReducer.class);
+    //job.setOutputKeyClass(Text.class);
+    //job.setOutputValueClass(IntWritable.class);
+    //FileInputFormat.addInputPath(job, new Path(args[0]));
+    job.setOutputFormatClass(NullOutputFormat.class);
+    System.exit(job.waitForCompletion(true) ? 0 : 1);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/a884a2f0/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamInputFormat.java
----------------------------------------------------------------------
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamInputFormat.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamInputFormat.java
new file mode 100644
index 0000000..8c4155a
--- /dev/null
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamInputFormat.java
@@ -0,0 +1,154 @@
+package org.apache.beam.runners.mapreduce.translation;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.base.Function;
+import com.google.common.collect.FluentIterable;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.util.SerializableUtils;
+import org.apache.beam.sdk.values.KV;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * Adaptor from Beam {@link BoundedSource} to MapReduce {@link InputFormat}.
+ */
+public class BeamInputFormat<K, V> extends InputFormat {
+
+  private static final long DEFAULT_DESIRED_BUNDLE_SIZE_SIZE_BYTES = 5 * 1000 
* 1000;
+
+  private BoundedSource<KV<K, V>> source;
+  private PipelineOptions options;
+
+  public BeamInputFormat() {
+  }
+
+  public BeamInputFormat(BoundedSource<KV<K, V>> source, PipelineOptions 
options) {
+    this.source = checkNotNull(source, "source");
+    this.options = checkNotNull(options, "options");
+  }
+
+  @Override
+  public List<InputSplit> getSplits(JobContext context) throws IOException, 
InterruptedException {
+    source = (BoundedSource<KV<K,V>>) 
SerializableUtils.deserializeFromByteArray(
+        Base64.decodeBase64(context.getConfiguration().get("source")),
+        "");
+    try {
+      return 
FluentIterable.from(source.split(DEFAULT_DESIRED_BUNDLE_SIZE_SIZE_BYTES, 
options))
+          .transform(new Function<BoundedSource<KV<K, V>>, InputSplit>() {
+            @Override
+            public InputSplit apply(BoundedSource<KV<K, V>> source) {
+              try {
+                return new 
BeamInputSplit(source.getEstimatedSizeBytes(options));
+              } catch (Exception e) {
+                throw new RuntimeException(e);
+              }
+            }})
+          .toList();
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
+
+  @Override
+  public RecordReader createRecordReader(
+      InputSplit split, TaskAttemptContext context) throws IOException, 
InterruptedException {
+    source = (BoundedSource<KV<K,V>>) 
SerializableUtils.deserializeFromByteArray(
+        Base64.decodeBase64(context.getConfiguration().get("source")),
+        "");
+    return new BeamRecordReader<>(source.createReader(options));
+  }
+
+  public static class BeamInputSplit extends InputSplit implements Writable {
+    private long estimatedSizeBytes;
+
+    public BeamInputSplit() {
+    }
+
+    BeamInputSplit(long estimatedSizeBytes) {
+      this.estimatedSizeBytes = estimatedSizeBytes;
+    }
+
+    @Override
+    public long getLength() throws IOException, InterruptedException {
+      return estimatedSizeBytes;
+    }
+
+    @Override
+    public String[] getLocations() throws IOException, InterruptedException {
+      return new String[0];
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+      out.writeLong(estimatedSizeBytes);
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      estimatedSizeBytes = in.readLong();
+    }
+  }
+
+  private class BeamRecordReader<K, V> extends RecordReader {
+
+    private final BoundedSource.BoundedReader<KV<K, V>> reader;
+    private boolean started;
+
+    public BeamRecordReader(BoundedSource.BoundedReader<KV<K, V>> reader) {
+      this.reader = checkNotNull(reader, "reader");
+      this.started = false;
+    }
+
+    @Override
+    public void initialize(
+        InputSplit split, TaskAttemptContext context) throws IOException, 
InterruptedException {
+    }
+
+    @Override
+    public boolean nextKeyValue() throws IOException, InterruptedException {
+      if (!started) {
+        return reader.start();
+      } else {
+        return reader.advance();
+      }
+    }
+
+    @Override
+    public Object getCurrentKey() throws IOException, InterruptedException {
+      return reader.getCurrent().getKey();
+    }
+
+    @Override
+    public Object getCurrentValue() throws IOException, InterruptedException {
+      return reader.getCurrent().getValue();
+    }
+
+    @Override
+    public float getProgress() throws IOException, InterruptedException {
+      Double progress = reader.getFractionConsumed();
+      if (progress != null) {
+        return progress.floatValue();
+      } else {
+        return 0;
+      }
+    }
+
+    @Override
+    public void close() throws IOException {
+      reader.close();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/a884a2f0/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamMapper.java
----------------------------------------------------------------------
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamMapper.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamMapper.java
new file mode 100644
index 0000000..88fc8d6
--- /dev/null
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamMapper.java
@@ -0,0 +1,30 @@
+package org.apache.beam.runners.mapreduce.translation;
+
+import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
+import org.apache.beam.sdk.values.KV;
+import org.apache.hadoop.mapreduce.Mapper;
+
/**
 * A MapReduce {@link Mapper} intended to execute Beam {@code DoFn}s.
 *
 * <p>Currently a stub: {@link #map} only prints each key/value pair it receives, and the
 * {@code doFnInvoker} field is a placeholder that is never assigned or used in this revision.
 */
public class BeamMapper<KeyInT, ValueInT, KeyOutT, ValueOutT>
    extends Mapper<KeyInT, ValueInT, KeyOutT, ValueOutT> {

  // Placeholder hook for invoking a user DoFn; not yet wired up.
  private DoFnInvoker<KV<KeyInT, ValueInT>, KV<KeyOutT, ValueOutT>> doFnInvoker;

  @Override
  protected void setup(Mapper<KeyInT, ValueInT, KeyOutT, ValueOutT>.Context context) {
    // No per-task setup yet.
  }

  // Stub implementation: logs the record instead of running a DoFn.
  @Override
  protected void map(
      KeyInT key,
      ValueInT value,
      Mapper<KeyInT, ValueInT, KeyOutT, ValueOutT>.Context context) {
    System.out.print(String.format("key: %s, value: %s", key, value));
  }

  @Override
  protected void cleanup(Mapper<KeyInT, ValueInT, KeyOutT, ValueOutT>.Context context) {
    // No resources to release.
  }
}

Reply via email to