mr-runner: support reduce-side ParDos and WordCount.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c6a3a18d Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c6a3a18d Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c6a3a18d Branch: refs/heads/mr-runner Commit: c6a3a18d2c71c8f523deb54b323f26408c7de207 Parents: d09fb42 Author: Pei He <p...@apache.org> Authored: Thu Jul 27 10:52:32 2017 +0800 Committer: Pei He <p...@apache.org> Committed: Thu Aug 31 14:13:48 2017 +0800 ---------------------------------------------------------------------- runners/map-reduce/pom.xml | 2 +- .../mapreduce/translation/BeamMapper.java | 12 +- .../mapreduce/translation/BeamReducer.java | 56 ++++--- .../runners/mapreduce/translation/Graph.java | 4 +- .../GroupAlsoByWindowsViaOutputBufferDoFn.java | 5 + .../mapreduce/translation/JobPrototype.java | 164 ++++++++++++++----- .../mapreduce/translation/Operation.java | 8 +- .../mapreduce/translation/OutputReceiver.java | 3 +- .../ReifyTimestampAndWindowsParDoOperation.java | 46 ++++++ .../translation/WindowAssignOperation.java | 75 +++++++++ .../mapreduce/translation/WriteOperation.java | 13 +- .../beam/runners/mapreduce/WordCountTest.java | 13 +- 12 files changed, 318 insertions(+), 83 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/c6a3a18d/runners/map-reduce/pom.xml ---------------------------------------------------------------------- diff --git a/runners/map-reduce/pom.xml b/runners/map-reduce/pom.xml index d18eee8..226c5c0 100644 --- a/runners/map-reduce/pom.xml +++ b/runners/map-reduce/pom.xml @@ -20,7 +20,7 @@ <parent> <groupId>org.apache.beam</groupId> <artifactId>beam-runners-parent</artifactId> - <version>2.1.0-SNAPSHOT</version> + <version>2.2.0-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> 
http://git-wip-us.apache.org/repos/asf/beam/blob/c6a3a18d/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamMapper.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamMapper.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamMapper.java index 11ecc8d..b5e4edc 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamMapper.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamMapper.java @@ -16,7 +16,7 @@ public class BeamMapper<ValueInT, ValueOutT> public static final String BEAM_PAR_DO_OPERATION_MAPPER = "beam-par-do-op-mapper"; - private ParDoOperation parDoOperation; + private Operation operation; @Override protected void setup( @@ -24,9 +24,9 @@ public class BeamMapper<ValueInT, ValueOutT> String serializedParDo = checkNotNull( context.getConfiguration().get(BEAM_PAR_DO_OPERATION_MAPPER), BEAM_PAR_DO_OPERATION_MAPPER); - parDoOperation = (ParDoOperation) SerializableUtils.deserializeFromByteArray( - Base64.decodeBase64(serializedParDo), "ParDoOperation"); - parDoOperation.start((TaskInputOutputContext) context); + operation = (Operation) SerializableUtils.deserializeFromByteArray( + Base64.decodeBase64(serializedParDo), "Operation"); + operation.start((TaskInputOutputContext) context); } @Override @@ -34,12 +34,12 @@ public class BeamMapper<ValueInT, ValueOutT> Object key, WindowedValue<ValueInT> value, Mapper<Object, WindowedValue<ValueInT>, Object, WindowedValue<ValueOutT>>.Context context) { - parDoOperation.process(value); + operation.process(value); } @Override protected void cleanup( Mapper<Object, WindowedValue<ValueInT>, Object, WindowedValue<ValueOutT>>.Context context) { - parDoOperation.finish(); + operation.finish(); } } 
http://git-wip-us.apache.org/repos/asf/beam/blob/c6a3a18d/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamReducer.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamReducer.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamReducer.java index 8eb7938..9b8bd82 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamReducer.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamReducer.java @@ -5,14 +5,17 @@ import static com.google.common.base.Preconditions.checkNotNull; import com.google.common.base.Function; import com.google.common.base.Throwables; import com.google.common.collect.FluentIterable; +import com.google.common.collect.Lists; import java.io.ByteArrayInputStream; import java.io.IOException; -import org.apache.beam.sdk.coders.BigEndianLongCoder; -import org.apache.beam.sdk.coders.NullableCoder; +import java.util.List; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.util.SerializableUtils; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.KV; import org.apache.commons.codec.binary.Base64; +import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.TaskInputOutputContext; @@ -20,49 +23,64 @@ import org.apache.hadoop.mapreduce.TaskInputOutputContext; * Created by peihe on 25/07/2017. 
*/ public class BeamReducer<ValueInT, ValueOutT> - extends Reducer<Object, byte[], Object, WindowedValue<ValueOutT>> { + extends Reducer<BytesWritable, byte[], Object, WindowedValue<ValueOutT>> { + public static final String BEAM_REDUCER_KV_CODER = "beam-reducer-kv-coder"; public static final String BEAM_PAR_DO_OPERATION_REDUCER = "beam-par-do-op-reducer"; - private ParDoOperation parDoOperation; + private Coder<Object> keyCoder; + private Coder<Object> valueCoder; + private Operation operation; @Override protected void setup( - Reducer<Object, byte[], Object, WindowedValue<ValueOutT>>.Context context) { + Reducer<BytesWritable, byte[], Object, WindowedValue<ValueOutT>>.Context context) { + String serializedValueCoder = checkNotNull( + context.getConfiguration().get(BEAM_REDUCER_KV_CODER), + BEAM_REDUCER_KV_CODER); String serializedParDo = checkNotNull( context.getConfiguration().get(BEAM_PAR_DO_OPERATION_REDUCER), BEAM_PAR_DO_OPERATION_REDUCER); - parDoOperation = (ParDoOperation) SerializableUtils.deserializeFromByteArray( - Base64.decodeBase64(serializedParDo), "ParDoOperation"); - parDoOperation.start((TaskInputOutputContext) context); + KvCoder<Object, Object> kvCoder = (KvCoder<Object, Object>) SerializableUtils + .deserializeFromByteArray(Base64.decodeBase64(serializedValueCoder), "Coder"); + keyCoder = kvCoder.getKeyCoder(); + valueCoder = kvCoder.getValueCoder(); + operation = (Operation) SerializableUtils.deserializeFromByteArray( + Base64.decodeBase64(serializedParDo), "Operation"); + operation.start((TaskInputOutputContext) context); } @Override protected void reduce( - Object key, + BytesWritable key, Iterable<byte[]> values, - Reducer<Object, byte[], Object, WindowedValue<ValueOutT>>.Context context) { - Iterable<Object> decodedValues = FluentIterable.from(values) + Reducer<BytesWritable, byte[], Object, WindowedValue<ValueOutT>>.Context context) { + List<Object> decodedValues = Lists.newArrayList(FluentIterable.from(values) .transform(new 
Function<byte[], Object>() { @Override public Object apply(byte[] input) { ByteArrayInputStream inStream = new ByteArrayInputStream(input); try { - // TODO: setup coders. - return NullableCoder.of(BigEndianLongCoder.of()).decode(inStream); + return valueCoder.decode(inStream); } catch (IOException e) { Throwables.throwIfUnchecked(e); throw new RuntimeException(e); } - } - }); - parDoOperation.process( - WindowedValue.valueInGlobalWindow(KV.of(key, decodedValues))); + }})); + + try { + operation.process( + WindowedValue.valueInGlobalWindow( + KV.of(keyCoder.decode(new ByteArrayInputStream(key.getBytes())), decodedValues))); + } catch (IOException e) { + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); + } } @Override protected void cleanup( - Reducer<Object, byte[], Object, WindowedValue<ValueOutT>>.Context context) { - parDoOperation.finish(); + Reducer<BytesWritable, byte[], Object, WindowedValue<ValueOutT>>.Context context) { + operation.finish(); } } http://git-wip-us.apache.org/repos/asf/beam/blob/c6a3a18d/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graph.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graph.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graph.java index 867d1af..e360419 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graph.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graph.java @@ -18,6 +18,7 @@ import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.WindowingStrategy; import 
org.apache.commons.lang.builder.ReflectionToStringBuilder; @@ -127,7 +128,8 @@ public class Graph { public void accept(GraphVisitor visitor) { PTransform<?, ?> transform = step.getTransform(); - if (transform instanceof ParDo.SingleOutput || transform instanceof ParDo.MultiOutput) { + if (transform instanceof ParDo.SingleOutput || transform instanceof ParDo.MultiOutput + || transform instanceof Window.Assign) { visitor.visitParDo(this); } else if (transform instanceof GroupByKey) { visitor.visitGroupByKey(this); http://git-wip-us.apache.org/repos/asf/beam/blob/c6a3a18d/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupAlsoByWindowsViaOutputBufferDoFn.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupAlsoByWindowsViaOutputBufferDoFn.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupAlsoByWindowsViaOutputBufferDoFn.java index 0b8a876..8ee616d 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupAlsoByWindowsViaOutputBufferDoFn.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupAlsoByWindowsViaOutputBufferDoFn.java @@ -15,6 +15,7 @@ import org.apache.beam.runners.core.SystemReduceFn; import org.apache.beam.runners.core.construction.TriggerTranslation; import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine; import org.apache.beam.runners.core.triggers.TriggerStateMachines; +import org.apache.beam.sdk.state.TimeDomain; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; @@ -90,6 +91,10 @@ public class GroupAlsoByWindowsViaOutputBufferDoFn<K, InputT, OutputT, W extends // Finally, advance the processing time to infinity to fire any timers. 
timerInternals.advanceProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE); + runner.onTimers(timerInternals.getTimers(TimeDomain.EVENT_TIME)); + runner.onTimers(timerInternals.getTimers(TimeDomain.PROCESSING_TIME)); + runner.onTimers(timerInternals.getTimers(TimeDomain.SYNCHRONIZED_PROCESSING_TIME)); + runner.persist(); } http://git-wip-us.apache.org/repos/asf/beam/blob/c6a3a18d/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java index 34266f4..576c6bf 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java @@ -12,14 +12,20 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Set; +import javax.annotation.Nullable; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.util.SerializableUtils; +import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.WindowingStrategy; import org.apache.commons.codec.binary.Base64; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.BytesWritable; @@ -69,72 +75,150 @@ public class JobPrototype { // Setup DoFns in BeamMapper. 
// TODO: support more than one in-path. Graph.NodePath inPath = Iterables.getOnlyElement(inEdge.getPaths()); - List<Graph.Step> parDos = new ArrayList<>(); - parDos.addAll(FluentIterable.from(inPath.steps()) - .filter(new Predicate<Graph.Step>() { - @Override - public boolean apply(Graph.Step input) { - PTransform<?, ?> transform = input.getTransform(); - return transform instanceof ParDo.SingleOutput - || transform instanceof ParDo.MultiOutput; - }}) - .toList()); + + Operation mapperParDoRoot = chainParDosInPath(inPath); + Operation mapperParDoTail = getTailOperation(mapperParDoRoot); Graph.Step vertexStep = vertex.getStep(); if (vertexStep.getTransform() instanceof ParDo.SingleOutput - || vertexStep.getTransform() instanceof ParDo.MultiOutput) { - parDos.add(vertexStep); - } - - ParDoOperation root = null; - ParDoOperation prev = null; - for (Graph.Step step : parDos) { - ParDoOperation current = new NormalParDoOperation( - getDoFn(step.getTransform()), + || vertexStep.getTransform() instanceof ParDo.MultiOutput + || vertexStep.getTransform() instanceof Window.Assign) { + // TODO: add a TailVertex type to simplify the translation. + Operation vertexParDo = translateToOperation(vertexStep); + Operation mapperWrite = new WriteOperation( + getKeyCoder(inEdge.getCoder()), + getReifyValueCoder(inEdge.getCoder(), vertexStep.getWindowingStrategy())); + mapperParDoTail.attachOutput(vertexParDo, 0); + vertexParDo.attachOutput(mapperWrite, 0); + } else if (vertexStep.getTransform() instanceof GroupByKey) { + Operation reifyOperation = new ReifyTimestampAndWindowsParDoOperation( PipelineOptionsFactory.create(), - (TupleTag<Object>) step.getOutputs().iterator().next(), + new TupleTag<>(), ImmutableList.<TupleTag<?>>of(), - step.getWindowingStrategy()); - if (root == null) { - root = current; - } else { - // TODO: set a proper outputNum for ParDo.MultiOutput instead of zero. 
- current.attachInput(prev, 0); - } - prev = current; + vertexStep.getWindowingStrategy()); + Operation mapperWrite = new WriteOperation( + getKeyCoder(inEdge.getCoder()), + getReifyValueCoder(inEdge.getCoder(), vertexStep.getWindowingStrategy())); + mapperParDoTail.attachOutput(reifyOperation, 0); + reifyOperation.attachOutput(mapperWrite, 0); + } else { + throw new UnsupportedOperationException("Transform: " + vertexStep.getTransform()); } - // TODO: get coders from pipeline. - WriteOperation writeOperation = new WriteOperation(inEdge.getCoder()); - writeOperation.attachInput(prev, 0); + job.setMapOutputKeyClass(BytesWritable.class); + job.setMapOutputValueClass(byte[].class); conf.set( BeamMapper.BEAM_PAR_DO_OPERATION_MAPPER, - Base64.encodeBase64String(SerializableUtils.serializeToByteArray(root))); + Base64.encodeBase64String(SerializableUtils.serializeToByteArray(mapperParDoRoot))); job.setMapperClass(BeamMapper.class); if (vertexStep.getTransform() instanceof GroupByKey) { // Setup BeamReducer - ParDoOperation operation = new GroupAlsoByWindowsParDoOperation( + Operation gabwOperation = new GroupAlsoByWindowsParDoOperation( PipelineOptionsFactory.create(), (TupleTag<Object>) vertexStep.getOutputs().iterator().next(), ImmutableList.<TupleTag<?>>of(), vertexStep.getWindowingStrategy(), inEdge.getCoder()); - // TODO: handle the map output key type. 
- job.setMapOutputKeyClass(BytesWritable.class); - job.setMapOutputValueClass(byte[].class); + Graph.Edge outEdge = Iterables.getOnlyElement(vertex.getOutgoing()); + Graph.NodePath outPath = Iterables.getOnlyElement(outEdge.getPaths()); + Operation reducerParDoRoot = chainParDosInPath(outPath); + Operation reducerParDoTail = getTailOperation(reducerParDoRoot); + + Operation reducerTailParDo = translateToOperation(outEdge.getTail().getStep()); + if (reducerParDoRoot == null) { + gabwOperation.attachOutput(reducerTailParDo, 0); + } else { + gabwOperation.attachOutput(reducerParDoRoot, 0); + reducerParDoTail.attachOutput(reducerTailParDo, 0); + } + conf.set( + BeamReducer.BEAM_REDUCER_KV_CODER, + Base64.encodeBase64String(SerializableUtils.serializeToByteArray( + KvCoder.of( + getKeyCoder(inEdge.getCoder()), + getReifyValueCoder(inEdge.getCoder(), vertexStep.getWindowingStrategy()))))); conf.set( BeamReducer.BEAM_PAR_DO_OPERATION_REDUCER, - Base64.encodeBase64String(SerializableUtils.serializeToByteArray(operation))); + Base64.encodeBase64String(SerializableUtils.serializeToByteArray(gabwOperation))); job.setReducerClass(BeamReducer.class); } job.setOutputFormatClass(NullOutputFormat.class); return job; } - private DoFn<Object, Object> getDoFn(PTransform<?, ?> transform) { + private Coder<Object> getKeyCoder(Coder<?> coder) { + KvCoder<Object, Object> kvCoder = (KvCoder<Object, Object>) checkNotNull(coder, "coder"); + return kvCoder.getKeyCoder(); + } + + private Coder<Object> getReifyValueCoder( + Coder<?> coder, WindowingStrategy<?, ?> windowingStrategy) { + KvCoder<Object, Object> kvCoder = (KvCoder<Object, Object>) checkNotNull(coder, "coder"); + return (Coder) WindowedValue.getFullCoder( + kvCoder.getValueCoder(), windowingStrategy.getWindowFn().windowCoder()); + } + + private Operation getTailOperation(@Nullable Operation operation) { + if (operation == null) { + return null; + } + if (operation.getOutputReceivers().isEmpty()) { + return operation; + } + 
OutputReceiver receiver = Iterables.getOnlyElement(operation.getOutputReceivers()); + if (receiver.getReceivingOperations().isEmpty()) { + return operation; + } + return getTailOperation(Iterables.getOnlyElement(receiver.getReceivingOperations())); + } + + private Operation chainParDosInPath(Graph.NodePath path) { + List<Graph.Step> parDos = new ArrayList<>(); + // TODO: we should not need this filter. + parDos.addAll(FluentIterable.from(path.steps()) + .filter(new Predicate<Graph.Step>() { + @Override + public boolean apply(Graph.Step input) { + PTransform<?, ?> transform = input.getTransform(); + return !(transform instanceof Read.Bounded); + }}) + .toList()); + + Operation root = null; + Operation prev = null; + for (Graph.Step step : parDos) { + Operation current = translateToOperation(step); + if (prev == null) { + root = current; + } else { + // TODO: set a proper outputNum for ParDo.MultiOutput instead of zero. + prev.attachOutput(current, 0); + } + prev = current; + } + return root; + } + + private Operation translateToOperation(Graph.Step parDoStep) { + PTransform<?, ?> transform = parDoStep.getTransform(); + DoFn<Object, Object> doFn; if (transform instanceof ParDo.SingleOutput) { - return ((ParDo.SingleOutput) transform).getFn(); + return new NormalParDoOperation( + ((ParDo.SingleOutput) transform).getFn(), + PipelineOptionsFactory.create(), + (TupleTag<Object>) parDoStep.getOutputs().iterator().next(), + ImmutableList.<TupleTag<?>>of(), + parDoStep.getWindowingStrategy()); + } else if (transform instanceof ParDo.MultiOutput) { + return new NormalParDoOperation( + ((ParDo.MultiOutput) transform).getFn(), + PipelineOptionsFactory.create(), + (TupleTag<Object>) parDoStep.getOutputs().iterator().next(), + ImmutableList.<TupleTag<?>>of(), + parDoStep.getWindowingStrategy()); + } else if (transform instanceof Window.Assign) { + return new WindowAssignOperation<>(1, parDoStep.getWindowingStrategy().getWindowFn()); } else { - return ((ParDo.MultiOutput) 
transform).getFn(); + throw new UnsupportedOperationException("Transform: " + transform); } } } http://git-wip-us.apache.org/repos/asf/beam/blob/c6a3a18d/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Operation.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Operation.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Operation.java index 5700e89..6951909 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Operation.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Operation.java @@ -60,10 +60,10 @@ public abstract class Operation implements Serializable { } /** - * Adds an input to this ParDoOperation, coming from the given output of the given source. + * Adds an output to this Operation. */ - public void attachInput(Operation source, int outputNum) { - OutputReceiver fanOut = source.receivers[outputNum]; - fanOut.addOutput(this); + public void attachOutput(Operation output, int outputNum) { + OutputReceiver fanOut = receivers[outputNum]; + fanOut.addOutput(output); } } http://git-wip-us.apache.org/repos/asf/beam/blob/c6a3a18d/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/OutputReceiver.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/OutputReceiver.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/OutputReceiver.java index 3347672..6aeefd2 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/OutputReceiver.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/OutputReceiver.java @@ -23,8 +23,7 @@ import java.util.ArrayList; import 
java.util.List; /** - * OutputReceiver that forwards each input it receives to each of a list of down stream - * ParDoOperations. + * OutputReceiver that forwards each input it receives to each of a list of down stream operations. */ public class OutputReceiver implements Serializable { private final List<Operation> receivingOperations = new ArrayList<>(); http://git-wip-us.apache.org/repos/asf/beam/blob/c6a3a18d/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReifyTimestampAndWindowsParDoOperation.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReifyTimestampAndWindowsParDoOperation.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReifyTimestampAndWindowsParDoOperation.java new file mode 100644 index 0000000..ec954bb --- /dev/null +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReifyTimestampAndWindowsParDoOperation.java @@ -0,0 +1,46 @@ +package org.apache.beam.runners.mapreduce.translation; + +import java.util.List; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.WindowingStrategy; + +/** + * Created by peihe on 27/07/2017. 
+ */ +public class ReifyTimestampAndWindowsParDoOperation extends ParDoOperation { + + public ReifyTimestampAndWindowsParDoOperation( + PipelineOptions options, + TupleTag<Object> mainOutputTag, + List<TupleTag<?>> sideOutputTags, + WindowingStrategy<?, ?> windowingStrategy) { + super(options, mainOutputTag, sideOutputTags, windowingStrategy); + } + + @Override + DoFn<Object, Object> getDoFn() { + return (DoFn) new ReifyTimestampAndWindowsDoFn<>(); + } + + public class ReifyTimestampAndWindowsDoFn<K, V> + extends DoFn<KV<K, V>, KV<K, WindowedValue<V>>> { + @ProcessElement + public void processElement(ProcessContext c, BoundedWindow window) throws Exception { + KV<K, V> kv = c.element(); + K key = kv.getKey(); + V value = kv.getValue(); + c.output(KV.of( + key, + WindowedValue.of( + value, + c.timestamp(), + window, + c.pane()))); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/c6a3a18d/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/WindowAssignOperation.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/WindowAssignOperation.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/WindowAssignOperation.java new file mode 100644 index 0000000..144ef16 --- /dev/null +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/WindowAssignOperation.java @@ -0,0 +1,75 @@ +package org.apache.beam.runners.mapreduce.translation; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.common.base.Throwables; +import com.google.common.collect.Iterables; +import java.util.Collection; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.WindowFn; +import org.apache.beam.sdk.util.WindowedValue; +import 
org.joda.time.Instant; + +/** + * Created by peihe on 27/07/2017. + */ +public class WindowAssignOperation<T, W extends BoundedWindow> extends Operation { + private final WindowFn<T, W> windowFn; + + public WindowAssignOperation(int numOutputs, WindowFn<T, W> windowFn) { + super(numOutputs); + this.windowFn = checkNotNull(windowFn, "windowFn"); + } + + @Override + public void process(Object elem) { + WindowedValue windowedValue = (WindowedValue) elem; + try { + Collection<W> windows = windowFn.assignWindows(new AssignContextInternal<>(windowFn, windowedValue)); + for (W window : windows) { + OutputReceiver receiver = Iterables.getOnlyElement(getOutputReceivers()); + receiver.process(WindowedValue.of( + windowedValue.getValue(), + windowedValue.getTimestamp(), + window, + windowedValue.getPane())); + } + } catch (Exception e) { + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); + } + } + + private class AssignContextInternal<InputT, W extends BoundedWindow> + extends WindowFn<InputT, W>.AssignContext { + private final WindowedValue<InputT> value; + + AssignContextInternal(WindowFn<InputT, W> fn, WindowedValue<InputT> value) { + fn.super(); + checkArgument( + Iterables.size(value.getWindows()) == 1, + String.format( + "%s passed to window assignment must be in a single window, but it was in %s: %s", + WindowedValue.class.getSimpleName(), + Iterables.size(value.getWindows()), + value.getWindows())); + this.value = value; + } + + @Override + public InputT element() { + return value.getValue(); + } + + @Override + public Instant timestamp() { + return value.getTimestamp(); + } + + @Override + public BoundedWindow window() { + return Iterables.getOnlyElement(value.getWindows()); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/c6a3a18d/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/WriteOperation.java ---------------------------------------------------------------------- diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/WriteOperation.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/WriteOperation.java index 97201d0..0585032 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/WriteOperation.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/WriteOperation.java @@ -7,8 +7,10 @@ import java.io.ByteArrayOutputStream; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.NullableCoder; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.WindowingStrategy; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.mapreduce.TaskInputOutputContext; @@ -18,15 +20,14 @@ import org.apache.hadoop.mapreduce.TaskInputOutputContext; public class WriteOperation extends Operation { private final Coder<Object> keyCoder; - private final Coder<Object> nullableValueCoder; + private final Coder<Object> valueCoder; private transient TaskInputOutputContext<Object, Object, Object, Object> taskContext; - public WriteOperation(Coder<?> coder) { + public WriteOperation(Coder<Object> keyCoder, Coder<Object> valueCoder) { super(0); - KvCoder<Object, Object> kvCoder = (KvCoder<Object, Object>) checkNotNull(coder, "coder"); - this.keyCoder = kvCoder.getKeyCoder(); - this.nullableValueCoder = NullableCoder.of(kvCoder.getValueCoder()); + this.keyCoder = checkNotNull(keyCoder, "keyCoder"); + this.valueCoder = checkNotNull(valueCoder, "valueCoder"); } @Override @@ -42,7 +43,7 @@ public class WriteOperation extends Operation { keyCoder.encode(windowedElem.getValue().getKey(), keyStream); ByteArrayOutputStream valueStream = new ByteArrayOutputStream(); - nullableValueCoder.encode(windowedElem.getValue().getValue(), 
valueStream); + valueCoder.encode(windowedElem.getValue().getValue(), valueStream); taskContext.write(new BytesWritable(keyStream.toByteArray()), valueStream.toByteArray()); } catch (Exception e) { Throwables.throwIfUnchecked(e); http://git-wip-us.apache.org/repos/asf/beam/blob/c6a3a18d/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/WordCountTest.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/WordCountTest.java b/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/WordCountTest.java index 5fa499a..a548ba7 100644 --- a/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/WordCountTest.java +++ b/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/WordCountTest.java @@ -7,10 +7,14 @@ import org.apache.beam.sdk.metrics.Metrics; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SimpleFunction; +import org.apache.beam.sdk.transforms.windowing.SlidingWindows; +import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.values.KV; import org.apache.log4j.BasicConfigurator; +import org.joda.time.Duration; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -64,17 +68,18 @@ public class WordCountTest { String input = "/Users/peihe/github/beam/LICENSE"; String output = "./output"; MapReducePipelineOptions options = PipelineOptionsFactory.as(MapReducePipelineOptions.class); - options.setJarClass(this.getClass()); + //options.setJarClass(this.getClass()); options.setRunner(MapReduceRunner.class); Pipeline p = Pipeline.create(options); // Concepts #2 and #3: Our pipeline applies the composite CountWords transform, and 
passes the // static FormatAsTextFn() to the ParDo transform. p.apply("ReadLines", TextIO.read().from(input)) + .apply(Window.<String>into(SlidingWindows.of(Duration.millis(100)))) .apply(ParDo.of(new ExtractWordsFn())) - .apply(Count.<String>perElement()); -// .apply(MapElements.via(new FormatAsTextFn())) -// .apply("WriteCounts", TextIO.write().to(output)); + .apply(Count.<String>perElement()) + .apply(MapElements.via(new FormatAsTextFn())); + //.apply("WriteCounts", TextIO.write().to(output)); p.run(); }