This is an automated email from the ASF dual-hosted git repository.
johnyangk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git
The following commit(s) were added to refs/heads/master by this push:
new c4170bf [NEMO-129] Support Beam's WindowedWordCount example (#123)
c4170bf is described below
commit c4170bf59217e29ee64478350ba42fc95b1ee2d1
Author: Taegeon Um <[email protected]>
AuthorDate: Fri Oct 19 11:13:29 2018 +0900
[NEMO-129] Support Beam's WindowedWordCount example (#123)
JIRA: [NEMO-129: Support Beam's WindowedWordCount
example](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-129)
**Major changes:**
- Create `GroupByKeyAndWindowDoFnTransform` to group elements according to
the key and window. Use `GroupAlsoByWindowViaWindowSetNewDoFn`, which is a
Beam wrapper class for grouping by key and window. If there is no window,
use the existing `GroupByKeyTransform` for performance.
**Minor changes to note:**
- Refactor `DoFnTransform` and create `AbstractDoFnTransform` to reuse
code in both `DoFnTransform` and `GroupByKeyAndWindowDoFnTransform`
**Tests for the changes:**
- test windowed word count (fixed window and sliding window)
**Other comments:**
-
Closes #123
---
.../compiler/frontend/beam/PipelineTranslator.java | 42 ++++-
...FnTransform.java => AbstractDoFnTransform.java} | 111 ++++++++----
.../frontend/beam/transform/DoFnTransform.java | 98 +----------
.../GroupByKeyAndWindowDoFnTransform.java | 196 +++++++++++++++++++++
.../beam/transform/GroupByKeyTransform.java | 5 +-
.../frontend/beam/transform/DoFnTransformTest.java | 7 -
.../nemo/examples/beam/GenericSourceSink.java | 7 +-
.../nemo/examples/beam/WindowedWordCount.java | 91 ++++++++++
.../nemo/examples/beam/WriteOneFilePerWindow.java | 102 +++++++++++
.../examples/beam/WindowedWordCountITCase.java | 87 +++++++++
.../expected_output_sliding_windowed_wordcount | 18 ++
.../resources/expected_output_windowed_wordcount | 11 ++
examples/resources/test_input_windowed_wordcount | 15 ++
examples/resources/test_input_wordcount | 2 +-
14 files changed, 639 insertions(+), 153 deletions(-)
diff --git
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
index 2486a00..a4e5d1b 100644
---
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
+++
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
@@ -16,6 +16,7 @@
package org.apache.nemo.compiler.frontend.beam;
import com.google.common.collect.Iterables;
+import org.apache.beam.runners.core.SystemReduceFn;
import org.apache.beam.runners.core.construction.ParDoTranslation;
import org.apache.beam.runners.core.construction.TransformInputs;
import org.apache.beam.sdk.Pipeline;
@@ -48,10 +49,7 @@ import java.io.IOException;
import java.lang.annotation.*;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Stack;
+import java.util.*;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
@@ -198,11 +196,40 @@ public final class PipelineTranslator
pValueWithTupleTag.getKey()));
}
+ /**
+ * Create a group by key transform.
+ * It returns GroupByKeyAndWindowDoFnTransform if window function is not
default.
+ * @param ctx translation context
+ * @param transformVertex transform vertex
+ * @return group by key transform
+ */
+ private static Transform createGBKTransform(
+ final TranslationContext ctx,
+ final TransformVertex transformVertex) {
+ final AppliedPTransform pTransform =
transformVertex.getNode().toAppliedPTransform(PIPELINE.get());
+ final PCollection<?> mainInput = (PCollection<?>)
+
Iterables.getOnlyElement(TransformInputs.nonAdditionalInputs(pTransform));
+ final TupleTag mainOutputTag = new TupleTag<>();
+
+ if (mainInput.getWindowingStrategy() == WindowingStrategy.globalDefault())
{
+ return new GroupByKeyTransform();
+ } else {
+ return new GroupByKeyAndWindowDoFnTransform(
+ getOutputCoders(pTransform),
+ mainOutputTag,
+ Collections.emptyList(), /* GBK does not have additional outputs */
+ mainInput.getWindowingStrategy(),
+ Collections.emptyList(), /* GBK does not have additional side inputs
*/
+ ctx.pipelineOptions,
+ SystemReduceFn.buffering(mainInput.getCoder()));
+ }
+ }
+
@PrimitiveTransformTranslator(GroupByKey.class)
private static void groupByKeyTranslator(final TranslationContext ctx,
final PrimitiveTransformVertex
transformVertex,
final GroupByKey<?, ?> transform) {
- final IRVertex vertex = new OperatorVertex(new GroupByKeyTransform());
+ final IRVertex vertex = new OperatorVertex(createGBKTransform(ctx,
transformVertex));
ctx.addVertex(vertex);
transformVertex.getNode().getInputs().values().forEach(input ->
ctx.addEdgeTo(vertex, input));
transformVertex.getNode().getOutputs().values().forEach(output ->
ctx.registerMainOutputFrom(vertex, output));
@@ -297,7 +324,7 @@ public final class PipelineTranslator
// Attempt to translate the CompositeTransform again.
// Add GroupByKey, which is the first transform in the given
CompositeTransform.
// Make sure it consumes the output from the last vertex in
OneToOneEdge-translated hierarchy.
- final IRVertex groupByKeyIRVertex = new OperatorVertex(new
GroupByKeyTransform());
+ final IRVertex groupByKeyIRVertex = new
OperatorVertex(createGBKTransform(ctx, transformVertex));
ctx.addVertex(groupByKeyIRVertex);
last.getNode().getOutputs().values().forEach(outputFromCombiner
-> ctx.addEdgeTo(groupByKeyIRVertex, outputFromCombiner));
@@ -617,7 +644,8 @@ public final class PipelineTranslator
if (srcTransform instanceof FlattenTransform) {
return CommunicationPatternProperty.Value.OneToOne;
}
- if (dstTransform instanceof GroupByKeyTransform) {
+ if (dstTransform instanceof GroupByKeyAndWindowDoFnTransform
+ || dstTransform instanceof GroupByKeyTransform) {
return CommunicationPatternProperty.Value.Shuffle;
}
if (dstTransform instanceof CreateViewTransform) {
diff --git
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransform.java
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java
similarity index 60%
copy from
compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransform.java
copy to
compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java
index 8dbf051..8679c73 100644
---
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransform.java
+++
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java
@@ -35,41 +35,49 @@ import java.util.List;
import java.util.Map;
/**
- * DoFn transform implementation.
+ * This is a base class for Beam DoFn Transforms.
*
* @param <InputT> input type.
+ * @param <InterT> intermediate type.
* @param <OutputT> output type.
*/
-public final class DoFnTransform<InputT, OutputT> implements
+public abstract class AbstractDoFnTransform<InputT, InterT, OutputT> implements
Transform<WindowedValue<InputT>, WindowedValue<OutputT>> {
- private OutputCollector<WindowedValue<OutputT>> outputCollector;
private final TupleTag<OutputT> mainOutputTag;
private final List<TupleTag<?>> additionalOutputTags;
private final Collection<PCollectionView<?>> sideInputs;
private final WindowingStrategy<?, ?> windowingStrategy;
- private final DoFn<InputT, OutputT> doFn;
+ private final DoFn<InterT, OutputT> doFn;
private final SerializablePipelineOptions serializedOptions;
- private transient DoFnRunner<InputT, OutputT> doFnRunner;
- private transient SideInputReader sideInputReader;
- private transient DoFnInvoker<InputT, OutputT> doFnInvoker;
private final Coder<InputT> inputCoder;
private final Map<TupleTag<?>, Coder<?>> outputCoders;
+ private transient OutputCollector<WindowedValue<OutputT>> outputCollector;
+ private transient DoFnRunner<InterT, OutputT> doFnRunner;
+ private transient SideInputReader sideInputReader;
+ private transient DoFnInvoker<InterT, OutputT> doFnInvoker;
+ private transient DoFnRunners.OutputManager outputManager;
+
/**
- * DoFnTransform Constructor.
- *
- * @param doFn doFn.
- * @param options Pipeline options.
+ * AbstractDoFnTransform constructor.
+ * @param doFn doFn
+ * @param inputCoder input coder
+ * @param outputCoders output coders
+ * @param mainOutputTag main output tag
+ * @param additionalOutputTags additional output tags
+ * @param windowingStrategy windowing strategy
+ * @param sideInputs side inputs
+ * @param options pipeline options
*/
- public DoFnTransform(final DoFn<InputT, OutputT> doFn,
- final Coder<InputT> inputCoder,
- final Map<TupleTag<?>, Coder<?>> outputCoders,
- final TupleTag<OutputT> mainOutputTag,
- final List<TupleTag<?>> additionalOutputTags,
- final WindowingStrategy<?, ?> windowingStrategy,
- final Collection<PCollectionView<?>> sideInputs,
- final PipelineOptions options) {
+ public AbstractDoFnTransform(final DoFn<InterT, OutputT> doFn,
+ final Coder<InputT> inputCoder,
+ final Map<TupleTag<?>, Coder<?>> outputCoders,
+ final TupleTag<OutputT> mainOutputTag,
+ final List<TupleTag<?>> additionalOutputTags,
+ final WindowingStrategy<?, ?> windowingStrategy,
+ final Collection<PCollectionView<?>> sideInputs,
+ final PipelineOptions options) {
this.doFn = doFn;
this.inputCoder = inputCoder;
this.outputCoders = outputCoders;
@@ -80,15 +88,38 @@ public final class DoFnTransform<InputT, OutputT> implements
this.windowingStrategy = windowingStrategy;
}
+ protected final DoFnRunners.OutputManager getOutputManager() {
+ return outputManager;
+ }
+
+ protected final WindowingStrategy getWindowingStrategy() {
+ return windowingStrategy;
+ }
+
+ protected final SideInputReader getSideInputReader() {
+ return sideInputReader;
+ }
+
+ protected final TupleTag<OutputT> getMainOutputTag() {
+ return mainOutputTag;
+ }
+
+ protected final DoFnRunner<InterT, OutputT> getDoFnRunner() {
+ return doFnRunner;
+ }
+
+ public final DoFn getDoFn() {
+ return doFn;
+ }
+
@Override
- public void prepare(final Context context, final
OutputCollector<WindowedValue<OutputT>> oc) {
+ public final void prepare(final Context context, final
OutputCollector<WindowedValue<OutputT>> oc) {
// deserialize pipeline option
final NemoPipelineOptions options =
serializedOptions.get().as(NemoPipelineOptions.class);
-
this.outputCollector = oc;
// create output manager
- final DoFnRunners.OutputManager outputManager = new DefaultOutputManager<>(
+ outputManager = new DefaultOutputManager<>(
outputCollector, context, mainOutputTag);
// create side input reader
@@ -112,15 +143,17 @@ public final class DoFnTransform<InputT, OutputT>
implements
}
};
+ final DoFn wrappedDoFn = wrapDoFn(doFn);
+
// invoker
- doFnInvoker = DoFnInvokers.invokerFor(doFn);
+ doFnInvoker = DoFnInvokers.invokerFor(wrappedDoFn);
doFnInvoker.invokeSetup();
// DoFnRunners.simpleRunner takes care of all the hard stuff of running
the DoFn
// and that this approach is the standard used by most of the Beam runners
doFnRunner = DoFnRunners.simpleRunner(
options,
- doFn,
+ wrappedDoFn,
sideInputReader,
outputManager,
mainOutputTag,
@@ -134,24 +167,24 @@ public final class DoFnTransform<InputT, OutputT>
implements
}
@Override
- public void onData(final WindowedValue<InputT> data) {
- doFnRunner.processElement(data);
- }
-
- public DoFn getDoFn() {
- return doFn;
- }
-
- @Override
- public void close() {
+ public final void close() {
+ beforeClose();
doFnRunner.finishBundle();
doFnInvoker.invokeTeardown();
}
+ /**
+ * An abstract function that wraps the original doFn.
+ * @param originalDoFn the original doFn.
+ * @return wrapped doFn.
+ */
+ abstract DoFn wrapDoFn(final DoFn originalDoFn);
+
@Override
- public String toString() {
- final StringBuilder sb = new StringBuilder();
- sb.append("DoTransform:" + doFn);
- return sb.toString();
- }
+ public abstract void onData(final WindowedValue<InputT> data);
+
+ /**
+ * An abstract function that is called before close.
+ */
+ abstract void beforeClose();
}
diff --git
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransform.java
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransform.java
index 8dbf051..76cd84b 100644
---
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransform.java
+++
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransform.java
@@ -15,20 +15,13 @@
*/
package org.apache.nemo.compiler.frontend.beam.transform;
-import org.apache.beam.runners.core.*;
-import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
-import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
-import org.apache.nemo.common.ir.OutputCollector;
-import org.apache.nemo.common.ir.vertex.transform.Transform;
-import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions;
import java.util.Collection;
import java.util.List;
@@ -40,21 +33,7 @@ import java.util.Map;
* @param <InputT> input type.
* @param <OutputT> output type.
*/
-public final class DoFnTransform<InputT, OutputT> implements
- Transform<WindowedValue<InputT>, WindowedValue<OutputT>> {
-
- private OutputCollector<WindowedValue<OutputT>> outputCollector;
- private final TupleTag<OutputT> mainOutputTag;
- private final List<TupleTag<?>> additionalOutputTags;
- private final Collection<PCollectionView<?>> sideInputs;
- private final WindowingStrategy<?, ?> windowingStrategy;
- private final DoFn<InputT, OutputT> doFn;
- private final SerializablePipelineOptions serializedOptions;
- private transient DoFnRunner<InputT, OutputT> doFnRunner;
- private transient SideInputReader sideInputReader;
- private transient DoFnInvoker<InputT, OutputT> doFnInvoker;
- private final Coder<InputT> inputCoder;
- private final Map<TupleTag<?>, Coder<?>> outputCoders;
+public final class DoFnTransform<InputT, OutputT> extends
AbstractDoFnTransform<InputT, InputT, OutputT> {
/**
* DoFnTransform Constructor.
@@ -70,88 +49,29 @@ public final class DoFnTransform<InputT, OutputT> implements
final WindowingStrategy<?, ?> windowingStrategy,
final Collection<PCollectionView<?>> sideInputs,
final PipelineOptions options) {
- this.doFn = doFn;
- this.inputCoder = inputCoder;
- this.outputCoders = outputCoders;
- this.mainOutputTag = mainOutputTag;
- this.additionalOutputTags = additionalOutputTags;
- this.sideInputs = sideInputs;
- this.serializedOptions = new SerializablePipelineOptions(options);
- this.windowingStrategy = windowingStrategy;
+ super(doFn, inputCoder, outputCoders, mainOutputTag,
+ additionalOutputTags, windowingStrategy, sideInputs, options);
}
@Override
- public void prepare(final Context context, final
OutputCollector<WindowedValue<OutputT>> oc) {
- // deserialize pipeline option
- final NemoPipelineOptions options =
serializedOptions.get().as(NemoPipelineOptions.class);
-
- this.outputCollector = oc;
-
- // create output manager
- final DoFnRunners.OutputManager outputManager = new DefaultOutputManager<>(
- outputCollector, context, mainOutputTag);
-
- // create side input reader
- if (!sideInputs.isEmpty()) {
- sideInputReader = new BroadcastVariableSideInputReader(context,
sideInputs);
- } else {
- sideInputReader = NullSideInputReader.of(sideInputs);
- }
-
- // create step context
- // this transform does not support state and timer.
- final StepContext stepContext = new StepContext() {
- @Override
- public StateInternals stateInternals() {
- throw new UnsupportedOperationException("Not support stateInternals in
DoFnTransform");
- }
-
- @Override
- public TimerInternals timerInternals() {
- throw new UnsupportedOperationException("Not support timerInternals in
DoFnTransform");
- }
- };
-
- // invoker
- doFnInvoker = DoFnInvokers.invokerFor(doFn);
- doFnInvoker.invokeSetup();
-
- // DoFnRunners.simpleRunner takes care of all the hard stuff of running
the DoFn
- // and that this approach is the standard used by most of the Beam runners
- doFnRunner = DoFnRunners.simpleRunner(
- options,
- doFn,
- sideInputReader,
- outputManager,
- mainOutputTag,
- additionalOutputTags,
- stepContext,
- inputCoder,
- outputCoders,
- windowingStrategy);
-
- doFnRunner.startBundle();
+ protected DoFn wrapDoFn(final DoFn initDoFn) {
+ return initDoFn;
}
@Override
public void onData(final WindowedValue<InputT> data) {
- doFnRunner.processElement(data);
- }
-
- public DoFn getDoFn() {
- return doFn;
+ getDoFnRunner().processElement(data);
}
@Override
- public void close() {
- doFnRunner.finishBundle();
- doFnInvoker.invokeTeardown();
+ protected void beforeClose() {
+ // nothing
}
@Override
public String toString() {
final StringBuilder sb = new StringBuilder();
- sb.append("DoTransform:" + doFn);
+ sb.append("DoTransform:" + getDoFn());
return sb.toString();
}
}
diff --git
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java
new file mode 100644
index 0000000..4827e69
--- /dev/null
+++
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java
@@ -0,0 +1,196 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nemo.compiler.frontend.beam.transform;
+
+import org.apache.beam.runners.core.*;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.beam.sdk.values.KV;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+/**
+ * Groups elements according to key and window.
+ * @param <K> key type.
+ * @param <InputT> input type.
+ */
+public final class GroupByKeyAndWindowDoFnTransform<K, InputT>
+ extends AbstractDoFnTransform<KV<K, InputT>, KeyedWorkItem<K, InputT>,
KV<K, Iterable<InputT>>> {
+ private static final Logger LOG =
LoggerFactory.getLogger(GroupByKeyAndWindowDoFnTransform.class.getName());
+
+ private final SystemReduceFn reduceFn;
+ private final Map<K, List<WindowedValue<InputT>>> keyToValues;
+ private transient InMemoryTimerInternalsFactory timerInternalsFactory;
+
+ /**
+ * GroupByKey constructor.
+ */
+ public GroupByKeyAndWindowDoFnTransform(final Map<TupleTag<?>, Coder<?>>
outputCoders,
+ final TupleTag<KV<K,
Iterable<InputT>>> mainOutputTag,
+ final List<TupleTag<?>>
additionalOutputTags,
+ final WindowingStrategy<?, ?>
windowingStrategy,
+ final Collection<PCollectionView<?>>
sideInputs,
+ final PipelineOptions options,
+ final SystemReduceFn reduceFn) {
+ super(null, /* doFn */
+ null, /* inputCoder */
+ outputCoders,
+ mainOutputTag,
+ additionalOutputTags,
+ windowingStrategy,
+ sideInputs,
+ options);
+ this.keyToValues = new HashMap<>();
+ this.reduceFn = reduceFn;
+ }
+
+ /**
+ * This creates a new DoFn that groups elements by key and window.
+ * @param doFn original doFn.
+ * @return GroupAlsoByWindowViaWindowSetNewDoFn
+ */
+ @Override
+ protected DoFn wrapDoFn(final DoFn doFn) {
+ timerInternalsFactory = new InMemoryTimerInternalsFactory();
+ // This function performs group by key and window operation
+ return
+ GroupAlsoByWindowViaWindowSetNewDoFn.create(
+ getWindowingStrategy(),
+ new InMemoryStateInternalsFactory(),
+ timerInternalsFactory,
+ getSideInputReader(),
+ reduceFn,
+ getOutputManager(),
+ getMainOutputTag());
+ }
+
+ @Override
+ public void onData(final WindowedValue<KV<K, InputT>> element) {
+ final KV<K, InputT> kv = element.getValue();
+ keyToValues.putIfAbsent(kv.getKey(), new ArrayList());
+ keyToValues.get(kv.getKey()).add(element.withValue(kv.getValue()));
+ }
+
+ /**
+ * This advances the input watermark and processing time to the timestamp
max value
+ * in order to emit all data.
+ */
+ @Override
+ protected void beforeClose() {
+ final InMemoryTimerInternals timerInternals =
timerInternalsFactory.timerInternals;
+ try {
+ // Finish any pending windows by advancing the input watermark to
infinity.
+ timerInternals.advanceInputWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE);
+
+ // Finally, advance the processing time to infinity to fire any timers.
+ timerInternals.advanceProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);
+
timerInternals.advanceSynchronizedProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);
+ } catch (final Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ if (keyToValues.isEmpty()) {
+ LOG.warn("Beam GroupByKeyAndWindowDoFnTransform received no data!");
+ } else {
+ // timer
+ final Iterable<TimerInternals.TimerData> timerData =
getTimers(timerInternals);
+
+ keyToValues.entrySet().stream().forEach(entry -> {
+ // The GroupAlsoByWindowViaWindowSetNewDoFn requires KeyedWorkItem,
+ // so we convert the KV to KeyedWorkItem
+ final KeyedWorkItem<K, InputT> keyedWorkItem =
+ KeyedWorkItems.workItem(entry.getKey(), timerData, entry.getValue());
+
getDoFnRunner().processElement(WindowedValue.valueInGlobalWindow(keyedWorkItem));
+ });
+ keyToValues.clear();
+ }
+ }
+
+ @Override
+ public String toString() {
+ final StringBuilder sb = new StringBuilder();
+ sb.append("GroupByKeyAndWindowDoFnTransform:");
+ return sb.toString();
+ }
+
+ private Iterable<TimerInternals.TimerData> getTimers(final
InMemoryTimerInternals timerInternals) {
+ final List<TimerInternals.TimerData> timerData = new LinkedList<>();
+
+ while (true) {
+ TimerInternals.TimerData timer;
+ boolean hasFired = false;
+
+ while ((timer = timerInternals.removeNextEventTimer()) != null) {
+ hasFired = true;
+ timerData.add(timer);
+ }
+ while ((timer = timerInternals.removeNextProcessingTimer()) != null) {
+ hasFired = true;
+ timerData.add(timer);
+ }
+ while ((timer = timerInternals.removeNextSynchronizedProcessingTimer())
!= null) {
+ hasFired = true;
+ timerData.add(timer);
+ }
+ if (!hasFired) {
+ break;
+ }
+ }
+
+ return timerData;
+ }
+
+ /**
+ * InMemoryStateInternalsFactory.
+ */
+ final class InMemoryStateInternalsFactory implements
StateInternalsFactory<K> {
+ private final InMemoryStateInternals inMemoryStateInternals;
+
+ InMemoryStateInternalsFactory() {
+ this.inMemoryStateInternals = InMemoryStateInternals.forKey(null);
+ }
+
+ @Override
+ public StateInternals stateInternalsForKey(final K key) {
+ return inMemoryStateInternals;
+ }
+ }
+
+ /**
+ * InMemoryTimerInternalsFactory.
+ */
+ final class InMemoryTimerInternalsFactory implements
TimerInternalsFactory<K> {
+ private final InMemoryTimerInternals timerInternals;
+
+ InMemoryTimerInternalsFactory() {
+ this.timerInternals = new InMemoryTimerInternals();
+ }
+
+ @Override
+ public TimerInternals timerInternalsForKey(final K key) {
+ return timerInternals;
+ }
+ }
+}
+
diff --git
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyTransform.java
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyTransform.java
index 38b2641..fc122f9 100644
---
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyTransform.java
+++
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyTransform.java
@@ -47,7 +47,6 @@ public final class GroupByKeyTransform<I> implements
Transform<I, WindowedValue<
@Override
public void onData(final I element) {
- // TODO #129: support window in group by key for windowed groupByKey
final WindowedValue<KV> windowedValue = (WindowedValue<KV>) element;
final KV kv = windowedValue.getValue();
keyToValues.putIfAbsent(kv.getKey(), new ArrayList());
@@ -56,13 +55,12 @@ public final class GroupByKeyTransform<I> implements
Transform<I, WindowedValue<
@Override
public void close() {
- // TODO #129: support window in group by key for windowed groupByKey
if (keyToValues.isEmpty()) {
LOG.warn("Beam GroupByKeyTransform received no data!");
} else {
keyToValues.entrySet().stream().map(entry ->
WindowedValue.valueInGlobalWindow(KV.of(entry.getKey(),
entry.getValue())))
- .forEach(outputCollector::emit);
+ .forEach(outputCollector::emit);
keyToValues.clear();
}
}
@@ -75,4 +73,3 @@ public final class GroupByKeyTransform<I> implements
Transform<I, WindowedValue<
return sb.toString();
}
}
-
diff --git
a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransformTest.java
b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransformTest.java
index a30ee46..9a65e7a 100644
---
a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransformTest.java
+++
b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransformTest.java
@@ -19,16 +19,11 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.state.StateSpec;
-import org.apache.beam.sdk.state.StateSpecs;
-import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
@@ -43,7 +38,6 @@ import java.util.*;
import static java.util.Collections.emptyList;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -153,7 +147,6 @@ public final class DoFnTransformTest {
doFnTransform.close();
}
-
// TODO #216: implement side input and windowing
@Test
public void testSideInputs() {
diff --git
a/examples/beam/src/main/java/org/apache/nemo/examples/beam/GenericSourceSink.java
b/examples/beam/src/main/java/org/apache/nemo/examples/beam/GenericSourceSink.java
index 51fd3bd..6a09f3b 100644
---
a/examples/beam/src/main/java/org/apache/nemo/examples/beam/GenericSourceSink.java
+++
b/examples/beam/src/main/java/org/apache/nemo/examples/beam/GenericSourceSink.java
@@ -94,12 +94,7 @@ final class GenericSourceSink {
dataToWrite.apply(ParDo.of(new HDFSWrite(path)));
return PDone.in(dataToWrite.getPipeline());
} else {
- // (Only relevant to local file writes) withWindowedWrites() is required
for local file writes.
- // Without it, the FileResultCoder#encode, which assumes WindowedValue,
will not be able
- // to properly handle the FileResult (Beam's file metadata information),
and hang the job.
- // The root cause is that the Nemo runtime currently only supports batch
applications, and
- // does not use the Beam's WindowedValue by default.
- return dataToWrite.apply(TextIO.write().to(path).withWindowedWrites());
+ return dataToWrite.apply(TextIO.write().to(path));
}
}
diff --git
a/examples/beam/src/main/java/org/apache/nemo/examples/beam/WindowedWordCount.java
b/examples/beam/src/main/java/org/apache/nemo/examples/beam/WindowedWordCount.java
new file mode 100644
index 0000000..913156a
--- /dev/null
+++
b/examples/beam/src/main/java/org/apache/nemo/examples/beam/WindowedWordCount.java
@@ -0,0 +1,91 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nemo.examples.beam;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.*;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.KV;
+import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions;
+import org.apache.nemo.compiler.frontend.beam.NemoPipelineRunner;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/**
+ * A Windowed WordCount application.
+ */
+public final class WindowedWordCount {
+ /**
+ * Private Constructor.
+ */
+ private WindowedWordCount() {
+ }
+
+ /**
+ * Main function for the MR BEAM program.
+ * @param args arguments.
+ */
+ public static void main(final String[] args) {
+ final String inputFilePath = args[0];
+ final String outputFilePath = args[1];
+ final String windowType = args[2];
+ final Window<String> windowFn;
+ if (windowType.equals("fixed")) {
+ windowFn =
Window.<String>into(FixedWindows.of(Duration.standardSeconds(5)));
+ } else {
+ windowFn =
Window.<String>into(SlidingWindows.of(Duration.standardSeconds(10))
+ .every(Duration.standardSeconds(5)));
+ }
+
+ final PipelineOptions options =
PipelineOptionsFactory.create().as(NemoPipelineOptions.class);
+ options.setRunner(NemoPipelineRunner.class);
+ options.setJobName("WindowedWordCount");
+
+ final Pipeline p = Pipeline.create(options);
+ GenericSourceSink.read(p, inputFilePath)
+ .apply(ParDo.of(new DoFn<String, String>() {
+ @ProcessElement
+ public void processElement(@Element final String elem,
+ final OutputReceiver<String> out) {
+ final String[] splitt = elem.split("!");
+ out.outputWithTimestamp(splitt[0], new
Instant(Long.valueOf(splitt[1])));
+ }
+ }))
+ .apply(windowFn)
+ .apply(MapElements.<String, KV<String, Long>>via(new
SimpleFunction<String, KV<String, Long>>() {
+ @Override
+ public KV<String, Long> apply(final String line) {
+ final String[] words = line.split(" +");
+ final String documentId = words[0] + "#" + words[1];
+ final Long count = Long.parseLong(words[2]);
+ return KV.of(documentId, count);
+ }
+ }))
+ .apply(Sum.longsPerKey())
+ .apply(MapElements.<KV<String, Long>, String>via(new
SimpleFunction<KV<String, Long>, String>() {
+ @Override
+ public String apply(final KV<String, Long> kv) {
+ return kv.getKey() + ": " + kv.getValue();
+ }
+ }))
+ .apply(new WriteOneFilePerWindow(outputFilePath, null));
+ p.run();
+ }
+}
diff --git
a/examples/beam/src/main/java/org/apache/nemo/examples/beam/WriteOneFilePerWindow.java
b/examples/beam/src/main/java/org/apache/nemo/examples/beam/WriteOneFilePerWindow.java
new file mode 100644
index 0000000..fad134d
--- /dev/null
+++
b/examples/beam/src/main/java/org/apache/nemo/examples/beam/WriteOneFilePerWindow.java
@@ -0,0 +1,102 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nemo.examples.beam;
+import org.apache.beam.sdk.io.FileBasedSink;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.io.fs.ResolveOptions;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.joda.time.format.DateTimeFormatter;
+import org.joda.time.format.ISODateTimeFormat;
+import javax.annotation.Nullable;
+import static
org.apache.beam.repackaged.beam_runners_core_java.com.google.common.base.MoreObjects.firstNonNull;
+
+ /**
+ * This class is brought from beam/examples/common/WriteOneFilePerWindow.java.
+ *
+ */
+public final class WriteOneFilePerWindow extends
PTransform<PCollection<String>, PDone> {
+ // change from hourMinute to hourMinuteSecond
+ private static final DateTimeFormatter FORMATTER =
ISODateTimeFormat.hourMinuteSecond();
+ private String filenamePrefix;
+ @Nullable
+ private Integer numShards;
+ public WriteOneFilePerWindow(final String filenamePrefix, final Integer
numShards) {
+ this.filenamePrefix = filenamePrefix;
+ this.numShards = numShards;
+ }
+ @Override
+ public PDone expand(final PCollection<String> input) {
+ final ResourceId resource =
FileBasedSink.convertToFileResourceIfPossible(filenamePrefix);
+ TextIO.Write write =
+ TextIO.write()
+ .to(new PerWindowFiles(resource))
+ .withTempDirectory(resource.getCurrentDirectory())
+ .withWindowedWrites();
+ if (numShards != null) {
+ write = write.withNumShards(numShards);
+ }
+ return input.apply(write);
+ }
+ /**
+ * A {@link FilenamePolicy} produces a base file name for a write based on
metadata about the data
+ * being written. This always includes the shard number and the total number
of shards. For
+ * windowed writes, it also includes the window and pane index (a sequence
number assigned to each
+ * trigger firing).
+ */
+ public static final class PerWindowFiles extends
FileBasedSink.FilenamePolicy {
+ private final ResourceId baseFilename;
+ PerWindowFiles(final ResourceId baseFilename) {
+ this.baseFilename = baseFilename;
+ }
+ String filenamePrefixForWindow(final IntervalWindow window) {
+ final String prefix =
+ baseFilename.isDirectory() ? "" :
firstNonNull(baseFilename.getFilename(), "");
+ return String.format(
+ "%s-%s-%s", prefix, FORMATTER.print(window.start()),
FORMATTER.print(window.end()));
+ }
+ @Override
+ public ResourceId windowedFilename(
+ final int shardNumber,
+ final int numShards,
+ final BoundedWindow window,
+ final PaneInfo paneInfo,
+ final FileBasedSink.OutputFileHints outputFileHints) {
+ System.out.println("Windowd file name: " + window);
+ final IntervalWindow intervalWindow = (IntervalWindow) window;
+ final String filename =
+ String.format(
+ "%s-%s-of-%s%s",
+ filenamePrefixForWindow(intervalWindow),
+ shardNumber,
+ numShards,
+ outputFileHints.getSuggestedFilenameSuffix());
+ return baseFilename
+ .getCurrentDirectory()
+ .resolve(filename,
ResolveOptions.StandardResolveOptions.RESOLVE_FILE);
+ }
+ @Override
+ public ResourceId unwindowedFilename(
+ final int shardNumber, final int numShards, final
FileBasedSink.OutputFileHints outputFileHints) {
+ throw new UnsupportedOperationException("Unsupported.");
+ }
+ }
+}
diff --git
a/examples/beam/src/test/java/org/apache/nemo/examples/beam/WindowedWordCountITCase.java
b/examples/beam/src/test/java/org/apache/nemo/examples/beam/WindowedWordCountITCase.java
new file mode 100644
index 0000000..27ee4d8
--- /dev/null
+++
b/examples/beam/src/test/java/org/apache/nemo/examples/beam/WindowedWordCountITCase.java
@@ -0,0 +1,87 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nemo.examples.beam;
+
+import org.apache.nemo.client.JobLauncher;
+import org.apache.nemo.common.test.ArgBuilder;
+import org.apache.nemo.common.test.ExampleTestUtil;
+import org.apache.nemo.compiler.optimizer.policy.ConditionalLargeShufflePolicy;
+import org.apache.nemo.compiler.optimizer.policy.DefaultPolicy;
+import org.apache.nemo.examples.beam.policy.*;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+/**
+ * Test Windowed word count program with JobLauncher.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(JobLauncher.class)
+public final class WindowedWordCountITCase {
+ private static final int TIMEOUT = 120000;
+ private static ArgBuilder builder;
+ private static final String fileBasePath = System.getProperty("user.dir") +
"/../resources/";
+
+ private static final String inputFileName = "test_input_windowed_wordcount";
+ private static final String outputFileName =
"test_output_windowed_wordcount";
+ private static final String expectedOutputFileName =
"expected_output_windowed_wordcount";
+ private static final String expectedSlidingWindowOutputFileName =
"expected_output_sliding_windowed_wordcount";
+ private static final String executorResourceFileName = fileBasePath +
"beam_test_executor_resources.json";
+ private static final String inputFilePath = fileBasePath + inputFileName;
+ private static final String outputFilePath = fileBasePath + outputFileName;
+
+ @Test (timeout = TIMEOUT)
+ public void testFixedWindow() throws Exception {
+ builder = new ArgBuilder()
+ .addUserMain(WindowedWordCount.class.getCanonicalName())
+ .addUserArgs(inputFilePath, outputFilePath, "fixed");
+
+ JobLauncher.main(builder
+ .addResourceJson(executorResourceFileName)
+ .addJobId(WindowedWordCountITCase.class.getSimpleName())
+
.addOptimizationPolicy(DefaultPolicyParallelismFive.class.getCanonicalName())
+ .build());
+
+ try {
+ ExampleTestUtil.ensureOutputValidity(fileBasePath, outputFileName,
expectedOutputFileName);
+ } finally {
+ ExampleTestUtil.deleteOutputFile(fileBasePath, outputFileName);
+ }
+ }
+
+
+ @Test (timeout = TIMEOUT)
+ public void testSlidingWindow() throws Exception {
+ builder = new ArgBuilder()
+ .addUserMain(WindowedWordCount.class.getCanonicalName())
+ .addUserArgs(inputFilePath, outputFilePath, "sliding");
+
+ JobLauncher.main(builder
+ .addResourceJson(executorResourceFileName)
+ .addJobId(WindowedWordCountITCase.class.getSimpleName())
+ .addOptimizationPolicy(DefaultPolicy.class.getCanonicalName())
+ .build());
+
+ try {
+ ExampleTestUtil.ensureOutputValidity(fileBasePath, outputFileName,
expectedSlidingWindowOutputFileName);
+ } finally {
+ ExampleTestUtil.deleteOutputFile(fileBasePath, outputFileName);
+ }
+ }
+}
diff --git a/examples/resources/expected_output_sliding_windowed_wordcount
b/examples/resources/expected_output_sliding_windowed_wordcount
new file mode 100644
index 0000000..4437daa
--- /dev/null
+++ b/examples/resources/expected_output_sliding_windowed_wordcount
@@ -0,0 +1,18 @@
+gw#m: 10
+gw#m: 10
+gw#m: 90
+gw#m: 90
+john#m: 40
+john#m: 50
+john#m: 80
+john#m: 90
+jykim#f: 100
+jykim#f: 40
+jykim#f: 60
+mh#m: 30
+mh#m: 30
+mh#m: 70
+mh#m: 70
+wonook#m: 100
+wonook#m: 50
+wonook#m: 50
diff --git a/examples/resources/expected_output_windowed_wordcount
b/examples/resources/expected_output_windowed_wordcount
new file mode 100644
index 0000000..636bb9e
--- /dev/null
+++ b/examples/resources/expected_output_windowed_wordcount
@@ -0,0 +1,11 @@
+gw#m: 10
+gw#m: 90
+john#m: 40
+john#m: 40
+john#m: 50
+jykim#f: 40
+jykim#f: 60
+mh#m: 30
+mh#m: 70
+wonook#m: 50
+wonook#m: 50
diff --git a/examples/resources/test_input_windowed_wordcount
b/examples/resources/test_input_windowed_wordcount
new file mode 100644
index 0000000..e26179f
--- /dev/null
+++ b/examples/resources/test_input_windowed_wordcount
@@ -0,0 +1,15 @@
+wonook m 50!1536907180000
+john m 20!1536907181000
+gw m 90!1536907182000
+john m 20!1536907183000
+mh m 30!1536907184000
+john m 20!1536907185000
+wonook m 50!1536907186000
+jykim f 40!1536907187000
+john m 20!1536907188000
+jykim f 20!1536907189000
+mh m 70!1536907190000
+jykim f 40!1536907191000
+gw m 10!1536907192000
+john m 20!1536907193000
+john m 30!1536907194000
diff --git a/examples/resources/test_input_wordcount
b/examples/resources/test_input_wordcount
index b59abc4..8813dbd 100644
--- a/examples/resources/test_input_wordcount
+++ b/examples/resources/test_input_wordcount
@@ -11,4 +11,4 @@ jykim f 20
mh m 70
jykim f 40
gw m 10
-john m 20
\ No newline at end of file
+john m 20