mr-runner: support reduce side ParDos and WordCount.

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c6a3a18d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c6a3a18d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c6a3a18d

Branch: refs/heads/mr-runner
Commit: c6a3a18d2c71c8f523deb54b323f26408c7de207
Parents: d09fb42
Author: Pei He <p...@apache.org>
Authored: Thu Jul 27 10:52:32 2017 +0800
Committer: Pei He <p...@apache.org>
Committed: Thu Aug 31 14:13:48 2017 +0800

----------------------------------------------------------------------
 runners/map-reduce/pom.xml                      |   2 +-
 .../mapreduce/translation/BeamMapper.java       |  12 +-
 .../mapreduce/translation/BeamReducer.java      |  56 ++++---
 .../runners/mapreduce/translation/Graph.java    |   4 +-
 .../GroupAlsoByWindowsViaOutputBufferDoFn.java  |   5 +
 .../mapreduce/translation/JobPrototype.java     | 164 ++++++++++++++-----
 .../mapreduce/translation/Operation.java        |   8 +-
 .../mapreduce/translation/OutputReceiver.java   |   3 +-
 .../ReifyTimestampAndWindowsParDoOperation.java |  46 ++++++
 .../translation/WindowAssignOperation.java      |  75 +++++++++
 .../mapreduce/translation/WriteOperation.java   |  13 +-
 .../beam/runners/mapreduce/WordCountTest.java   |  13 +-
 12 files changed, 318 insertions(+), 83 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/c6a3a18d/runners/map-reduce/pom.xml
----------------------------------------------------------------------
diff --git a/runners/map-reduce/pom.xml b/runners/map-reduce/pom.xml
index d18eee8..226c5c0 100644
--- a/runners/map-reduce/pom.xml
+++ b/runners/map-reduce/pom.xml
@@ -20,7 +20,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-runners-parent</artifactId>
-    <version>2.1.0-SNAPSHOT</version>
+    <version>2.2.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
   

http://git-wip-us.apache.org/repos/asf/beam/blob/c6a3a18d/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamMapper.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamMapper.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamMapper.java
index 11ecc8d..b5e4edc 100644
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamMapper.java
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamMapper.java
@@ -16,7 +16,7 @@ public class BeamMapper<ValueInT, ValueOutT>
 
   public static final String BEAM_PAR_DO_OPERATION_MAPPER = 
"beam-par-do-op-mapper";
 
-  private ParDoOperation parDoOperation;
+  private Operation operation;
 
   @Override
   protected void setup(
@@ -24,9 +24,9 @@ public class BeamMapper<ValueInT, ValueOutT>
     String serializedParDo = checkNotNull(
         context.getConfiguration().get(BEAM_PAR_DO_OPERATION_MAPPER),
         BEAM_PAR_DO_OPERATION_MAPPER);
-    parDoOperation = (ParDoOperation) 
SerializableUtils.deserializeFromByteArray(
-        Base64.decodeBase64(serializedParDo), "ParDoOperation");
-    parDoOperation.start((TaskInputOutputContext) context);
+    operation = (Operation) SerializableUtils.deserializeFromByteArray(
+        Base64.decodeBase64(serializedParDo), "Operation");
+    operation.start((TaskInputOutputContext) context);
   }
 
   @Override
@@ -34,12 +34,12 @@ public class BeamMapper<ValueInT, ValueOutT>
       Object key,
       WindowedValue<ValueInT> value,
       Mapper<Object, WindowedValue<ValueInT>, Object, 
WindowedValue<ValueOutT>>.Context context) {
-    parDoOperation.process(value);
+    operation.process(value);
   }
 
   @Override
   protected void cleanup(
       Mapper<Object, WindowedValue<ValueInT>, Object, 
WindowedValue<ValueOutT>>.Context context) {
-    parDoOperation.finish();
+    operation.finish();
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c6a3a18d/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamReducer.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamReducer.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamReducer.java
index 8eb7938..9b8bd82 100644
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamReducer.java
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamReducer.java
@@ -5,14 +5,17 @@ import static com.google.common.base.Preconditions.checkNotNull;
 import com.google.common.base.Function;
 import com.google.common.base.Throwables;
 import com.google.common.collect.FluentIterable;
+import com.google.common.collect.Lists;
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
-import org.apache.beam.sdk.coders.BigEndianLongCoder;
-import org.apache.beam.sdk.coders.NullableCoder;
+import java.util.List;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
 import org.apache.commons.codec.binary.Base64;
+import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.mapreduce.TaskInputOutputContext;
 
@@ -20,49 +23,64 @@ import org.apache.hadoop.mapreduce.TaskInputOutputContext;
  * Created by peihe on 25/07/2017.
  */
 public class BeamReducer<ValueInT, ValueOutT>
-    extends Reducer<Object, byte[], Object, WindowedValue<ValueOutT>> {
+    extends Reducer<BytesWritable, byte[], Object, WindowedValue<ValueOutT>> {
 
+  public static final String BEAM_REDUCER_KV_CODER = "beam-reducer-kv-coder";
   public static final String BEAM_PAR_DO_OPERATION_REDUCER = 
"beam-par-do-op-reducer";
 
-  private ParDoOperation parDoOperation;
+  private Coder<Object> keyCoder;
+  private Coder<Object> valueCoder;
+  private Operation operation;
 
   @Override
   protected void setup(
-      Reducer<Object, byte[], Object, WindowedValue<ValueOutT>>.Context 
context) {
+      Reducer<BytesWritable, byte[], Object, WindowedValue<ValueOutT>>.Context 
context) {
+    String serializedValueCoder = checkNotNull(
+        context.getConfiguration().get(BEAM_REDUCER_KV_CODER),
+        BEAM_REDUCER_KV_CODER);
     String serializedParDo = checkNotNull(
         context.getConfiguration().get(BEAM_PAR_DO_OPERATION_REDUCER),
         BEAM_PAR_DO_OPERATION_REDUCER);
-    parDoOperation = (ParDoOperation) 
SerializableUtils.deserializeFromByteArray(
-        Base64.decodeBase64(serializedParDo), "ParDoOperation");
-    parDoOperation.start((TaskInputOutputContext) context);
+    KvCoder<Object, Object> kvCoder = (KvCoder<Object, Object>) 
SerializableUtils
+        .deserializeFromByteArray(Base64.decodeBase64(serializedValueCoder), 
"Coder");
+    keyCoder = kvCoder.getKeyCoder();
+    valueCoder = kvCoder.getValueCoder();
+    operation = (Operation) SerializableUtils.deserializeFromByteArray(
+        Base64.decodeBase64(serializedParDo), "Operation");
+    operation.start((TaskInputOutputContext) context);
   }
 
   @Override
   protected void reduce(
-      Object key,
+      BytesWritable key,
       Iterable<byte[]> values,
-      Reducer<Object, byte[], Object, WindowedValue<ValueOutT>>.Context 
context) {
-    Iterable<Object> decodedValues = FluentIterable.from(values)
+      Reducer<BytesWritable, byte[], Object, WindowedValue<ValueOutT>>.Context 
context) {
+    List<Object> decodedValues = Lists.newArrayList(FluentIterable.from(values)
         .transform(new Function<byte[], Object>() {
           @Override
           public Object apply(byte[] input) {
             ByteArrayInputStream inStream = new ByteArrayInputStream(input);
             try {
-              // TODO: setup coders.
-              return 
NullableCoder.of(BigEndianLongCoder.of()).decode(inStream);
+              return valueCoder.decode(inStream);
             } catch (IOException e) {
               Throwables.throwIfUnchecked(e);
               throw new RuntimeException(e);
             }
-          }
-        });
-    parDoOperation.process(
-        WindowedValue.valueInGlobalWindow(KV.of(key, decodedValues)));
+          }}));
+
+    try {
+      operation.process(
+          WindowedValue.valueInGlobalWindow(
+              KV.of(keyCoder.decode(new ByteArrayInputStream(key.getBytes())), 
decodedValues)));
+    } catch (IOException e) {
+      Throwables.throwIfUnchecked(e);
+      throw new RuntimeException(e);
+    }
   }
 
   @Override
   protected void cleanup(
-      Reducer<Object, byte[], Object, WindowedValue<ValueOutT>>.Context 
context) {
-    parDoOperation.finish();
+      Reducer<BytesWritable, byte[], Object, WindowedValue<ValueOutT>>.Context 
context) {
+    operation.finish();
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c6a3a18d/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graph.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graph.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graph.java
index 867d1af..e360419 100644
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graph.java
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graph.java
@@ -18,6 +18,7 @@ import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.WindowingStrategy;
 import org.apache.commons.lang.builder.ReflectionToStringBuilder;
@@ -127,7 +128,8 @@ public class Graph {
 
     public void accept(GraphVisitor visitor) {
       PTransform<?, ?> transform = step.getTransform();
-      if (transform instanceof ParDo.SingleOutput || transform instanceof 
ParDo.MultiOutput) {
+      if (transform instanceof ParDo.SingleOutput || transform instanceof 
ParDo.MultiOutput
+          || transform instanceof Window.Assign) {
         visitor.visitParDo(this);
       } else if (transform instanceof GroupByKey) {
         visitor.visitGroupByKey(this);

http://git-wip-us.apache.org/repos/asf/beam/blob/c6a3a18d/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupAlsoByWindowsViaOutputBufferDoFn.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupAlsoByWindowsViaOutputBufferDoFn.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupAlsoByWindowsViaOutputBufferDoFn.java
index 0b8a876..8ee616d 100644
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupAlsoByWindowsViaOutputBufferDoFn.java
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupAlsoByWindowsViaOutputBufferDoFn.java
@@ -15,6 +15,7 @@ import org.apache.beam.runners.core.SystemReduceFn;
 import org.apache.beam.runners.core.construction.TriggerTranslation;
 import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
 import org.apache.beam.runners.core.triggers.TriggerStateMachines;
+import org.apache.beam.sdk.state.TimeDomain;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
@@ -90,6 +91,10 @@ public class GroupAlsoByWindowsViaOutputBufferDoFn<K, InputT, OutputT, W extends
     // Finally, advance the processing time to infinity to fire any timers.
     timerInternals.advanceProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);
 
+    runner.onTimers(timerInternals.getTimers(TimeDomain.EVENT_TIME));
+    runner.onTimers(timerInternals.getTimers(TimeDomain.PROCESSING_TIME));
+    
runner.onTimers(timerInternals.getTimers(TimeDomain.SYNCHRONIZED_PROCESSING_TIME));
+
     runner.persist();
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/c6a3a18d/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java
index 34266f4..576c6bf 100644
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java
@@ -12,14 +12,20 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.util.SerializableUtils;
+import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
 import org.apache.commons.codec.binary.Base64;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.BytesWritable;
@@ -69,72 +75,150 @@ public class JobPrototype {
     // Setup DoFns in BeamMapper.
     // TODO: support more than one in-path.
     Graph.NodePath inPath = Iterables.getOnlyElement(inEdge.getPaths());
-    List<Graph.Step> parDos = new ArrayList<>();
-    parDos.addAll(FluentIterable.from(inPath.steps())
-        .filter(new Predicate<Graph.Step>() {
-          @Override
-          public boolean apply(Graph.Step input) {
-            PTransform<?, ?> transform = input.getTransform();
-            return transform instanceof ParDo.SingleOutput
-                || transform instanceof ParDo.MultiOutput;
-          }})
-        .toList());
+
+    Operation mapperParDoRoot = chainParDosInPath(inPath);
+    Operation mapperParDoTail = getTailOperation(mapperParDoRoot);
     Graph.Step vertexStep = vertex.getStep();
     if (vertexStep.getTransform() instanceof ParDo.SingleOutput
-        || vertexStep.getTransform() instanceof ParDo.MultiOutput) {
-      parDos.add(vertexStep);
-    }
-
-    ParDoOperation root = null;
-    ParDoOperation prev = null;
-    for (Graph.Step step : parDos) {
-      ParDoOperation current = new NormalParDoOperation(
-          getDoFn(step.getTransform()),
+        || vertexStep.getTransform() instanceof ParDo.MultiOutput
+        || vertexStep.getTransform() instanceof Window.Assign) {
+      // TODO: add a TailVertex type to simplify the translation.
+      Operation vertexParDo = translateToOperation(vertexStep);
+      Operation mapperWrite = new WriteOperation(
+          getKeyCoder(inEdge.getCoder()),
+          getReifyValueCoder(inEdge.getCoder(), 
vertexStep.getWindowingStrategy()));
+      mapperParDoTail.attachOutput(vertexParDo, 0);
+      vertexParDo.attachOutput(mapperWrite, 0);
+    } else if (vertexStep.getTransform() instanceof GroupByKey) {
+      Operation reifyOperation = new ReifyTimestampAndWindowsParDoOperation(
           PipelineOptionsFactory.create(),
-          (TupleTag<Object>) step.getOutputs().iterator().next(),
+          new TupleTag<>(),
           ImmutableList.<TupleTag<?>>of(),
-          step.getWindowingStrategy());
-      if (root == null) {
-        root = current;
-      } else {
-        // TODO: set a proper outputNum for ParDo.MultiOutput instead of zero.
-        current.attachInput(prev, 0);
-      }
-      prev = current;
+          vertexStep.getWindowingStrategy());
+      Operation mapperWrite = new WriteOperation(
+          getKeyCoder(inEdge.getCoder()),
+          getReifyValueCoder(inEdge.getCoder(), 
vertexStep.getWindowingStrategy()));
+      mapperParDoTail.attachOutput(reifyOperation, 0);
+      reifyOperation.attachOutput(mapperWrite, 0);
+    } else {
+      throw new UnsupportedOperationException("Transform: " + 
vertexStep.getTransform());
     }
-    // TODO: get coders from pipeline.
-    WriteOperation writeOperation = new WriteOperation(inEdge.getCoder());
-    writeOperation.attachInput(prev, 0);
+    job.setMapOutputKeyClass(BytesWritable.class);
+    job.setMapOutputValueClass(byte[].class);
     conf.set(
         BeamMapper.BEAM_PAR_DO_OPERATION_MAPPER,
-        
Base64.encodeBase64String(SerializableUtils.serializeToByteArray(root)));
+        
Base64.encodeBase64String(SerializableUtils.serializeToByteArray(mapperParDoRoot)));
     job.setMapperClass(BeamMapper.class);
 
     if (vertexStep.getTransform() instanceof GroupByKey) {
       // Setup BeamReducer
-      ParDoOperation operation = new GroupAlsoByWindowsParDoOperation(
+      Operation gabwOperation = new GroupAlsoByWindowsParDoOperation(
           PipelineOptionsFactory.create(),
           (TupleTag<Object>) vertexStep.getOutputs().iterator().next(),
           ImmutableList.<TupleTag<?>>of(),
           vertexStep.getWindowingStrategy(),
           inEdge.getCoder());
-      // TODO: handle the map output key type.
-      job.setMapOutputKeyClass(BytesWritable.class);
-      job.setMapOutputValueClass(byte[].class);
+      Graph.Edge outEdge = Iterables.getOnlyElement(vertex.getOutgoing());
+      Graph.NodePath outPath = Iterables.getOnlyElement(outEdge.getPaths());
+      Operation reducerParDoRoot = chainParDosInPath(outPath);
+      Operation reducerParDoTail = getTailOperation(reducerParDoRoot);
+
+      Operation reducerTailParDo = 
translateToOperation(outEdge.getTail().getStep());
+      if (reducerParDoRoot == null) {
+        gabwOperation.attachOutput(reducerTailParDo, 0);
+      } else {
+        gabwOperation.attachOutput(reducerParDoRoot, 0);
+        reducerParDoTail.attachOutput(reducerTailParDo, 0);
+      }
+      conf.set(
+          BeamReducer.BEAM_REDUCER_KV_CODER,
+          Base64.encodeBase64String(SerializableUtils.serializeToByteArray(
+              KvCoder.of(
+                  getKeyCoder(inEdge.getCoder()),
+                  getReifyValueCoder(inEdge.getCoder(), 
vertexStep.getWindowingStrategy())))));
       conf.set(
           BeamReducer.BEAM_PAR_DO_OPERATION_REDUCER,
-          
Base64.encodeBase64String(SerializableUtils.serializeToByteArray(operation)));
+          
Base64.encodeBase64String(SerializableUtils.serializeToByteArray(gabwOperation)));
       job.setReducerClass(BeamReducer.class);
     }
     job.setOutputFormatClass(NullOutputFormat.class);
     return job;
   }
 
-  private DoFn<Object, Object> getDoFn(PTransform<?, ?> transform) {
+  private Coder<Object> getKeyCoder(Coder<?> coder) {
+    KvCoder<Object, Object> kvCoder = (KvCoder<Object, Object>) 
checkNotNull(coder, "coder");
+    return kvCoder.getKeyCoder();
+  }
+
+  private Coder<Object> getReifyValueCoder(
+      Coder<?> coder, WindowingStrategy<?, ?> windowingStrategy) {
+    KvCoder<Object, Object> kvCoder = (KvCoder<Object, Object>) 
checkNotNull(coder, "coder");
+    return (Coder) WindowedValue.getFullCoder(
+        kvCoder.getValueCoder(), 
windowingStrategy.getWindowFn().windowCoder());
+  }
+
+  private Operation getTailOperation(@Nullable Operation operation) {
+    if (operation == null) {
+      return null;
+    }
+    if (operation.getOutputReceivers().isEmpty()) {
+      return operation;
+    }
+    OutputReceiver receiver = 
Iterables.getOnlyElement(operation.getOutputReceivers());
+    if (receiver.getReceivingOperations().isEmpty()) {
+      return operation;
+    }
+    return 
getTailOperation(Iterables.getOnlyElement(receiver.getReceivingOperations()));
+  }
+
+  private Operation chainParDosInPath(Graph.NodePath path) {
+    List<Graph.Step> parDos = new ArrayList<>();
+    // TODO: we should not need this filter.
+    parDos.addAll(FluentIterable.from(path.steps())
+        .filter(new Predicate<Graph.Step>() {
+          @Override
+          public boolean apply(Graph.Step input) {
+            PTransform<?, ?> transform = input.getTransform();
+            return !(transform instanceof Read.Bounded);
+          }})
+        .toList());
+
+    Operation root = null;
+    Operation prev = null;
+    for (Graph.Step step : parDos) {
+      Operation current = translateToOperation(step);
+      if (prev == null) {
+        root = current;
+      } else {
+        // TODO: set a proper outputNum for ParDo.MultiOutput instead of zero.
+        prev.attachOutput(current, 0);
+      }
+      prev = current;
+    }
+    return root;
+  }
+
+  private Operation translateToOperation(Graph.Step parDoStep) {
+    PTransform<?, ?> transform = parDoStep.getTransform();
+    DoFn<Object, Object> doFn;
     if (transform instanceof ParDo.SingleOutput) {
-      return ((ParDo.SingleOutput) transform).getFn();
+      return new NormalParDoOperation(
+          ((ParDo.SingleOutput) transform).getFn(),
+          PipelineOptionsFactory.create(),
+          (TupleTag<Object>) parDoStep.getOutputs().iterator().next(),
+          ImmutableList.<TupleTag<?>>of(),
+          parDoStep.getWindowingStrategy());
+    } else if (transform instanceof ParDo.MultiOutput) {
+      return new NormalParDoOperation(
+          ((ParDo.MultiOutput) transform).getFn(),
+          PipelineOptionsFactory.create(),
+          (TupleTag<Object>) parDoStep.getOutputs().iterator().next(),
+          ImmutableList.<TupleTag<?>>of(),
+          parDoStep.getWindowingStrategy());
+    } else if (transform instanceof Window.Assign) {
+      return new WindowAssignOperation<>(1, 
parDoStep.getWindowingStrategy().getWindowFn());
     } else {
-      return ((ParDo.MultiOutput) transform).getFn();
+      throw new UnsupportedOperationException("Transform: " + transform);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c6a3a18d/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Operation.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Operation.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Operation.java
index 5700e89..6951909 100644
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Operation.java
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Operation.java
@@ -60,10 +60,10 @@ public abstract class Operation implements Serializable {
   }
 
   /**
-   * Adds an input to this ParDoOperation, coming from the given output of the 
given source.
+   * Adds an output to this Operation.
    */
-  public void attachInput(Operation source, int outputNum) {
-    OutputReceiver fanOut = source.receivers[outputNum];
-    fanOut.addOutput(this);
+  public void attachOutput(Operation output, int outputNum) {
+    OutputReceiver fanOut = receivers[outputNum];
+    fanOut.addOutput(output);
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c6a3a18d/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/OutputReceiver.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/OutputReceiver.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/OutputReceiver.java
index 3347672..6aeefd2 100644
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/OutputReceiver.java
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/OutputReceiver.java
@@ -23,8 +23,7 @@ import java.util.ArrayList;
 import java.util.List;
 
 /**
- * OutputReceiver that forwards each input it receives to each of a list of 
down stream
- * ParDoOperations.
+ * OutputReceiver that forwards each input it receives to each of a list of 
down stream operations.
  */
 public class OutputReceiver implements Serializable {
   private final List<Operation> receivingOperations = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/beam/blob/c6a3a18d/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReifyTimestampAndWindowsParDoOperation.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReifyTimestampAndWindowsParDoOperation.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReifyTimestampAndWindowsParDoOperation.java
new file mode 100644
index 0000000..ec954bb
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReifyTimestampAndWindowsParDoOperation.java
@@ -0,0 +1,46 @@
+package org.apache.beam.runners.mapreduce.translation;
+
+import java.util.List;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
+
+/**
+ * Created by peihe on 27/07/2017.
+ */
+public class ReifyTimestampAndWindowsParDoOperation extends ParDoOperation {
+
+  public ReifyTimestampAndWindowsParDoOperation(
+      PipelineOptions options,
+      TupleTag<Object> mainOutputTag,
+      List<TupleTag<?>> sideOutputTags,
+      WindowingStrategy<?, ?> windowingStrategy) {
+    super(options, mainOutputTag, sideOutputTags, windowingStrategy);
+  }
+
+  @Override
+  DoFn<Object, Object> getDoFn() {
+    return (DoFn) new ReifyTimestampAndWindowsDoFn<>();
+  }
+
+  public class ReifyTimestampAndWindowsDoFn<K, V>
+      extends DoFn<KV<K, V>, KV<K, WindowedValue<V>>> {
+    @ProcessElement
+    public void processElement(ProcessContext c, BoundedWindow window) throws 
Exception {
+      KV<K, V> kv = c.element();
+      K key = kv.getKey();
+      V value = kv.getValue();
+      c.output(KV.of(
+          key,
+          WindowedValue.of(
+              value,
+              c.timestamp(),
+              window,
+              c.pane())));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c6a3a18d/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/WindowAssignOperation.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/WindowAssignOperation.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/WindowAssignOperation.java
new file mode 100644
index 0000000..144ef16
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/WindowAssignOperation.java
@@ -0,0 +1,75 @@
+package org.apache.beam.runners.mapreduce.translation;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.base.Throwables;
+import com.google.common.collect.Iterables;
+import java.util.Collection;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.joda.time.Instant;
+
+/**
+ * Created by peihe on 27/07/2017.
+ */
+public class WindowAssignOperation<T, W extends BoundedWindow> extends 
Operation {
+  private final WindowFn<T, W> windowFn;
+
+  public WindowAssignOperation(int numOutputs, WindowFn<T, W> windowFn) {
+    super(numOutputs);
+    this.windowFn = checkNotNull(windowFn, "windowFn");
+  }
+
+  @Override
+  public void process(Object elem) {
+    WindowedValue windowedValue = (WindowedValue) elem;
+    try {
+      Collection<W> windows = windowFn.assignWindows(new 
AssignContextInternal<>(windowFn, windowedValue));
+      for (W window : windows) {
+        OutputReceiver receiver = 
Iterables.getOnlyElement(getOutputReceivers());
+        receiver.process(WindowedValue.of(
+            windowedValue.getValue(),
+            windowedValue.getTimestamp(),
+            window,
+            windowedValue.getPane()));
+      }
+    } catch (Exception e) {
+      Throwables.throwIfUnchecked(e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  private class AssignContextInternal<InputT, W extends BoundedWindow>
+      extends WindowFn<InputT, W>.AssignContext {
+    private final WindowedValue<InputT> value;
+
+    AssignContextInternal(WindowFn<InputT, W> fn, WindowedValue<InputT> value) 
{
+      fn.super();
+      checkArgument(
+          Iterables.size(value.getWindows()) == 1,
+          String.format(
+              "%s passed to window assignment must be in a single window, but 
it was in %s: %s",
+              WindowedValue.class.getSimpleName(),
+              Iterables.size(value.getWindows()),
+              value.getWindows()));
+      this.value = value;
+    }
+
+    @Override
+    public InputT element() {
+      return value.getValue();
+    }
+
+    @Override
+    public Instant timestamp() {
+      return value.getTimestamp();
+    }
+
+    @Override
+    public BoundedWindow window() {
+      return Iterables.getOnlyElement(value.getWindows());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c6a3a18d/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/WriteOperation.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/WriteOperation.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/WriteOperation.java
index 97201d0..0585032 100644
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/WriteOperation.java
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/WriteOperation.java
@@ -7,8 +7,10 @@ import java.io.ByteArrayOutputStream;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.NullableCoder;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.WindowingStrategy;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.mapreduce.TaskInputOutputContext;
 
@@ -18,15 +20,14 @@ import org.apache.hadoop.mapreduce.TaskInputOutputContext;
 public class WriteOperation extends Operation {
 
   private final Coder<Object> keyCoder;
-  private final Coder<Object> nullableValueCoder;
+  private final Coder<Object> valueCoder;
 
   private transient TaskInputOutputContext<Object, Object, Object, Object> 
taskContext;
 
-  public WriteOperation(Coder<?> coder) {
+  public WriteOperation(Coder<Object> keyCoder, Coder<Object> valueCoder) {
     super(0);
-    KvCoder<Object, Object> kvCoder = (KvCoder<Object, Object>) 
checkNotNull(coder, "coder");
-    this.keyCoder = kvCoder.getKeyCoder();
-    this.nullableValueCoder = NullableCoder.of(kvCoder.getValueCoder());
+    this.keyCoder = checkNotNull(keyCoder, "keyCoder");
+    this.valueCoder = checkNotNull(valueCoder, "valueCoder");
   }
 
   @Override
@@ -42,7 +43,7 @@ public class WriteOperation extends Operation {
       keyCoder.encode(windowedElem.getValue().getKey(), keyStream);
 
       ByteArrayOutputStream valueStream = new ByteArrayOutputStream();
-      nullableValueCoder.encode(windowedElem.getValue().getValue(), 
valueStream);
+      valueCoder.encode(windowedElem.getValue().getValue(), valueStream);
       taskContext.write(new BytesWritable(keyStream.toByteArray()), 
valueStream.toByteArray());
     } catch (Exception e) {
       Throwables.throwIfUnchecked(e);

http://git-wip-us.apache.org/repos/asf/beam/blob/c6a3a18d/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/WordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/WordCountTest.java b/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/WordCountTest.java
index 5fa499a..a548ba7 100644
--- a/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/WordCountTest.java
+++ b/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/WordCountTest.java
@@ -7,10 +7,14 @@ import org.apache.beam.sdk.metrics.Metrics;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.KV;
 import org.apache.log4j.BasicConfigurator;
+import org.joda.time.Duration;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -64,17 +68,18 @@ public class WordCountTest {
     String input = "/Users/peihe/github/beam/LICENSE";
     String output =  "./output";
     MapReducePipelineOptions options = 
PipelineOptionsFactory.as(MapReducePipelineOptions.class);
-    options.setJarClass(this.getClass());
+    //options.setJarClass(this.getClass());
     options.setRunner(MapReduceRunner.class);
     Pipeline p = Pipeline.create(options);
 
     // Concepts #2 and #3: Our pipeline applies the composite CountWords 
transform, and passes the
     // static FormatAsTextFn() to the ParDo transform.
     p.apply("ReadLines", TextIO.read().from(input))
+        .apply(Window.<String>into(SlidingWindows.of(Duration.millis(100))))
         .apply(ParDo.of(new ExtractWordsFn()))
-        .apply(Count.<String>perElement());
-//        .apply(MapElements.via(new FormatAsTextFn()))
-//        .apply("WriteCounts", TextIO.write().to(output));
+        .apply(Count.<String>perElement())
+        .apply(MapElements.via(new FormatAsTextFn()));
+        //.apply("WriteCounts", TextIO.write().to(output));
 
     p.run();
   }

Reply via email to