BEAM-844 Add README, wordcount test, fix View.AsIterable, initial GBK watermark.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a21550f7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a21550f7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a21550f7

Branch: refs/heads/master
Commit: a21550f7ae3004e460ca6dfb33102a9fb191356c
Parents: 51af7e5
Author: Thomas Weise <[email protected]>
Authored: Thu Nov 3 05:40:46 2016 +0100
Committer: Thomas Weise <[email protected]>
Committed: Fri Nov 4 23:01:17 2016 +0100

----------------------------------------------------------------------
 runners/apex/README.md                          |  76 ++++++++
 runners/apex/pom.xml                            |   1 +
 .../apache/beam/runners/apex/ApexRunner.java    |  82 ++++++++
 .../functions/ApexGroupByKeyOperator.java       |   2 +-
 .../io/ApexReadUnboundedInputOperator.java      |   2 +-
 .../apex/translators/utils/ApexStreamTuple.java |  21 ++-
 .../apex/examples/StreamingWordCountTest.java   | 121 ------------
 .../runners/apex/examples/WordCountTest.java    | 188 +++++++++++++++++++
 .../translators/ApexGroupByKeyOperatorTest.java | 112 +++++++++++
 .../translators/GroupByKeyTranslatorTest.java   |   1 +
 runners/apex/src/test/resources/words.txt       |   3 +
 11 files changed, 484 insertions(+), 125 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a21550f7/runners/apex/README.md
----------------------------------------------------------------------
diff --git a/runners/apex/README.md b/runners/apex/README.md
new file mode 100644
index 0000000..c9e47a1
--- /dev/null
+++ b/runners/apex/README.md
@@ -0,0 +1,76 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+
+Apex Beam Runner ﴾Apex‐Runner﴿
+=============================
+
+Apex‐Runner is a Runner for Apache Beam which executes Beam pipelines with 
Apache Apex as underlying engine. The runner has broad support for the Beam 
model and supports streaming and batch pipelines. 
+
+[Apache Apex](http://apex.apache.org/) is a stream processing platform and 
framework for low-latency, high-throughput and fault-tolerant analytics 
applications on Apache Hadoop. Apex is Java based and also provides its own API 
for application development (native compositional and declarative Java API, 
SQL) with a comprehensive [operator 
library](https://github.com/apache/apex-malhar). Apex has a unified streaming 
architecture and can be used for real-time and batch processing. With its 
stateful stream processing architecture Apex can support all of the concepts in 
the Beam model (event time, triggers, watermarks etc.).
+
+##Status
+
+Apex-Runner is relatively new. It is fully functional and can currently be 
used to run pipelines in embedded mode. It does not take advantage of all the 
performance and scalability that Apex can deliver. This is expected to be 
addressed with upcoming work, leveraging features like incremental 
checkpointing, partitioning and operator affinity from Apex. Please see 
[JIRA](https://issues.apache.org/jira/issues/?jql=project%20%3D%20BEAM%20AND%20component%20%3D%20runner-apex%20AND%20resolution%20%3D%20Unresolved)
 and we welcome contributions!
+
+##Getting Started
+
+The following shows how to run the WordCount example that is provided with the 
source code on Apex (the example is identical with the one provided as part of 
the Beam examples). 
+
+###Installing Beam
+
+To get the latest version of Beam with Apex-Runner, first clone the Beam 
repository:
+
+```
+git clone https://github.com/apache/incubator‐beam
+```
+
+Then switch to the newly created directory and run Maven to build the Apache 
Beam:
+
+```
+cd incubator‐beam
+mvn clean install ‐DskipTests
+```
+
+Now Apache Beam and the Apex Runner are installed in your local Maven 
repository.
+
+###Running an Example
+
+Download something to count:
+
+```
+curl http://www.gutenberg.org/cache/epub/1128/pg1128.txt > /tmp/kinglear.txt
+```
+
+Run the pipeline, using the Apex runner:
+
+```
+cd examples/java
+mvn exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount 
-Dexec.args="--inputFile=/tmp/kinglear.txt --output=/tmp/wordcounts.txt 
--runner=ApexRunner" -Pinclude-runners
+```
+
+Once completed, there will be multiple output files with the base name given 
above:
+
+```
+$ ls /tmp/out-*
+/tmp/out-00000-of-00003  /tmp/out-00001-of-00003  /tmp/out-00002-of-00003
+```
+
+##Running pipelines on an Apex YARN cluster
+
+Coming soon.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a21550f7/runners/apex/pom.xml
----------------------------------------------------------------------
diff --git a/runners/apex/pom.xml b/runners/apex/pom.xml
index d4bcc3d..1ca61b9 100644
--- a/runners/apex/pom.xml
+++ b/runners/apex/pom.xml
@@ -215,6 +215,7 @@
               <ignoredUsedUndeclaredDependencies>
                 
<ignoredUsedUndeclaredDependency>org.apache.apex:apex-api:jar:3.5.0-SNAPSHOT</ignoredUsedUndeclaredDependency>
                 
<ignoredUsedUndeclaredDependency>org.apache.commons:commons-lang3::3.1</ignoredUsedUndeclaredDependency>
+                
<ignoredUsedUndeclaredDependency>commons-io:commons-io:jar:2.1</ignoredUsedUndeclaredDependency>
                 
<ignoredUsedUndeclaredDependency>com.esotericsoftware.kryo:kryo::2.24.0</ignoredUsedUndeclaredDependency>
                 
<ignoredUsedUndeclaredDependency>com.datatorrent:netlet::1.2.1</ignoredUsedUndeclaredDependency>
                 
<ignoredUsedUndeclaredDependency>org.slf4j:slf4j-api:jar:1.7.14</ignoredUsedUndeclaredDependency>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a21550f7/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
----------------------------------------------------------------------
diff --git 
a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
index 416e99c..661308d 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
@@ -25,12 +25,16 @@ import com.datatorrent.api.LocalMode;
 import com.datatorrent.api.StreamingApplication;
 import com.google.common.base.Throwables;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
 import org.apache.beam.runners.apex.translators.TranslationContext;
 import org.apache.beam.runners.core.AssignWindows;
 import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.ListCoder;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsValidator;
 import org.apache.beam.sdk.runners.PipelineRunner;
@@ -104,6 +108,10 @@ public class ApexRunner extends 
PipelineRunner<ApexRunnerResult> {
       PTransform<InputT, OutputT> customTransform = (PTransform)
           new StreamingViewAsSingleton<InputT>(this, (View.AsSingleton) 
transform);
       return Pipeline.applyTransform(input, customTransform);
+    } else if (View.AsIterable.class.equals(transform.getClass())) {
+      PTransform<InputT, OutputT> customTransform = (PTransform)
+          new StreamingViewAsIterable<InputT>(this, (View.AsIterable) 
transform);
+      return Pipeline.applyTransform(input, customTransform);
     } else {
       return super.apply(transform, input);
     }
@@ -317,4 +325,78 @@ public class ApexRunner extends 
PipelineRunner<ApexRunnerResult> {
     }
   }
 
+  private static class StreamingViewAsIterable<T>
+      extends PTransform<PCollection<T>, PCollectionView<Iterable<T>>> {
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Builds an instance of this class from the overridden transform.
+     */
+    public StreamingViewAsIterable(ApexRunner runner, View.AsIterable<T> 
transform) {
+    }
+
+    @Override
+    public PCollectionView<Iterable<T>> apply(PCollection<T> input) {
+      PCollectionView<Iterable<T>> view = 
PCollectionViews.iterableView(input.getPipeline(),
+          input.getWindowingStrategy(), input.getCoder());
+
+      return input.apply(Combine.globally(new 
Concatenate<T>()).withoutDefaults())
+          .apply(CreateApexPCollectionView.<T, Iterable<T>> of(view));
+    }
+
+    @Override
+    protected String getKindString() {
+      return "StreamingViewAsIterable";
+    }
+  }
+
+  /**
+   * Combiner that combines {@code T}s into a single {@code List<T>} 
containing all inputs.
+   *
+   * <p>For internal use by {@link StreamingViewAsMap}, {@link 
StreamingViewAsMultimap},
+   * {@link StreamingViewAsList}, {@link StreamingViewAsIterable}.
+   * They require the input {@link PCollection} fits in memory.
+   * For a large {@link PCollection} this is expected to crash!
+   *
+   * @param <T> the type of elements to concatenate.
+   */
+  private static class Concatenate<T> extends Combine.CombineFn<T, List<T>, 
List<T>> {
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public List<T> createAccumulator() {
+      return new ArrayList<>();
+    }
+
+    @Override
+    public List<T> addInput(List<T> accumulator, T input) {
+      accumulator.add(input);
+      return accumulator;
+    }
+
+    @Override
+    public List<T> mergeAccumulators(Iterable<List<T>> accumulators) {
+      List<T> result = createAccumulator();
+      for (List<T> accumulator : accumulators) {
+        result.addAll(accumulator);
+      }
+      return result;
+    }
+
+    @Override
+    public List<T> extractOutput(List<T> accumulator) {
+      return accumulator;
+    }
+
+    @Override
+    public Coder<List<T>> getAccumulatorCoder(CoderRegistry registry, Coder<T> 
inputCoder) {
+      return ListCoder.of(inputCoder);
+    }
+
+    @Override
+    public Coder<List<T>> getDefaultOutputCoder(CoderRegistry registry, 
Coder<T> inputCoder) {
+      return ListCoder.of(inputCoder);
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a21550f7/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexGroupByKeyOperator.java
----------------------------------------------------------------------
diff --git 
a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexGroupByKeyOperator.java
 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexGroupByKeyOperator.java
index d69aeab..98f3eca 100644
--- 
a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexGroupByKeyOperator.java
+++ 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexGroupByKeyOperator.java
@@ -103,7 +103,7 @@ public class ApexGroupByKeyOperator<K, V> implements 
Operator {
   private transient ProcessContext context;
   private transient OldDoFn<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> fn;
   private transient ApexTimerInternals timerInternals = new 
ApexTimerInternals();
-  private Instant inputWatermark = new Instant(0);
+  private Instant inputWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
 
   public final transient DefaultInputPort<ApexStreamTuple<WindowedValue<KV<K, 
V>>>> input =
       new DefaultInputPort<ApexStreamTuple<WindowedValue<KV<K, V>>>>() {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a21550f7/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ApexReadUnboundedInputOperator.java
----------------------------------------------------------------------
diff --git 
a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ApexReadUnboundedInputOperator.java
 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ApexReadUnboundedInputOperator.java
index 0e2b0c2..61236ca 100644
--- 
a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ApexReadUnboundedInputOperator.java
+++ 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ApexReadUnboundedInputOperator.java
@@ -140,7 +140,7 @@ public class ApexReadUnboundedInputOperator<OutputT, 
CheckpointMarkT
         Instant timestamp = reader.getCurrentTimestamp();
         available = reader.advance();
         if (traceTuples) {
-          LOG.debug("\nemitting {}\n", data);
+          LOG.debug("\nemitting '{}' timestamp {}\n", data, timestamp);
         }
         output.emit(DataTuple.of(WindowedValue.of(
             data, timestamp, GlobalWindow.INSTANCE, PaneInfo.NO_FIRING)));

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a21550f7/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ApexStreamTuple.java
----------------------------------------------------------------------
diff --git 
a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ApexStreamTuple.java
 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ApexStreamTuple.java
index 7f8b0fa..25518dc 100644
--- 
a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ApexStreamTuple.java
+++ 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ApexStreamTuple.java
@@ -28,6 +28,7 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Objects;
 
 import org.apache.beam.runners.apex.ApexPipelineOptions;
 import org.apache.beam.sdk.coders.Coder;
@@ -109,6 +110,22 @@ public interface ApexStreamTuple<T> {
     public void setTimestamp(long timestamp) {
       this.timestamp = timestamp;
     }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(timestamp);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (!(obj instanceof TimestampedTuple)) {
+        return false;
+      } else {
+        TimestampedTuple<?> other = (TimestampedTuple<?>) obj;
+        return (timestamp == other.timestamp) && 
Objects.equals(this.getValue(), other.getValue());
+      }
+    }
+
   }
 
   /**
@@ -164,10 +181,10 @@ public interface ApexStreamTuple<T> {
         throws CoderException, IOException {
       int b = inStream.read();
       if (b == 1) {
-        return new WatermarkTuple<T>(new DataInputStream(inStream).readLong());
+        return new WatermarkTuple<>(new DataInputStream(inStream).readLong());
       } else {
         int unionTag = inStream.read();
-        return new DataTuple<T>(valueCoder.decode(inStream, context), 
unionTag);
+        return new DataTuple<>(valueCoder.decode(inStream, context), unionTag);
       }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a21550f7/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/StreamingWordCountTest.java
----------------------------------------------------------------------
diff --git 
a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/StreamingWordCountTest.java
 
b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/StreamingWordCountTest.java
deleted file mode 100644
index 363e669..0000000
--- 
a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/StreamingWordCountTest.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.apex.examples;
-
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.beam.runners.apex.ApexPipelineOptions;
-import org.apache.beam.runners.apex.ApexRunner;
-import org.apache.beam.runners.apex.ApexRunnerResult;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.Sum;
-import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.joda.time.Duration;
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * Windowed word count example on Apex runner.
- */
-public class StreamingWordCountTest {
-
-  static class ExtractWordsFn extends DoFn<String, String> {
-    private final Aggregator<Long, Long> emptyLines =
-        createAggregator("emptyLines", new Sum.SumLongFn());
-
-    @ProcessElement
-    public void processElement(ProcessContext c) {
-      if (c.element().trim().isEmpty()) {
-        emptyLines.addValue(1L);
-      }
-
-      // Split the line into words.
-      String[] words = c.element().split("[^a-zA-Z']+");
-
-      // Output each word encountered into the output PCollection.
-      for (String word : words) {
-        if (!word.isEmpty()) {
-          c.output(word);
-        }
-      }
-    }
-  }
-
-  static class FormatAsStringFn extends DoFn<KV<String, Long>, String> {
-    private static final Logger LOG = 
LoggerFactory.getLogger(FormatAsStringFn.class);
-    static final ConcurrentHashMap<String, Long> RESULTS = new 
ConcurrentHashMap<>();
-
-    @ProcessElement
-    public void processElement(ProcessContext c) {
-      String row = c.element().getKey() + " - " + c.element().getValue()
-          + " @ " + c.timestamp().toString();
-      LOG.debug("output {}", row);
-      c.output(row);
-      RESULTS.put(c.element().getKey(), c.element().getValue());
-    }
-  }
-
-  @Test
-  public void testWindowedWordCount() throws Exception {
-    String[] args = new String[] {
-        "--runner=" + ApexRunner.class.getName()
-    };
-    ApexPipelineOptions options = 
PipelineOptionsFactory.fromArgs(args).withValidation()
-        .as(ApexPipelineOptions.class);
-    options.setApplicationName("StreamingWordCount");
-    Pipeline p = Pipeline.create(options);
-
-    PCollection<KV<String, Long>> wordCounts =
-        p.apply(Read.from(new UnboundedTextSource()))
-            .apply(ParDo.of(new ExtractWordsFn()))
-            
.apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(10))))
-            .apply(Count.<String>perElement());
-
-    wordCounts.apply(ParDo.of(new FormatAsStringFn()));
-
-    ApexRunnerResult result = (ApexRunnerResult) p.run();
-    
Assert.assertNotNull(result.getApexDAG().getOperatorMeta("Read(UnboundedTextSource)"));
-    long timeout = System.currentTimeMillis() + 30000;
-    while (System.currentTimeMillis() < timeout) {
-      if (FormatAsStringFn.RESULTS.containsKey("foo")
-          && FormatAsStringFn.RESULTS.containsKey("bar")) {
-        break;
-      }
-      result.waitUntilFinish(Duration.millis(1000));
-    }
-    result.cancel();
-    Assert.assertTrue(
-        FormatAsStringFn.RESULTS.containsKey("foo") && 
FormatAsStringFn.RESULTS.containsKey("bar"));
-    FormatAsStringFn.RESULTS.clear();
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a21550f7/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java
----------------------------------------------------------------------
diff --git 
a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java
 
b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java
new file mode 100644
index 0000000..28bb8ad
--- /dev/null
+++ 
b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.apex.examples;
+
+import com.google.common.collect.Sets;
+
+import java.io.File;
+import java.util.HashSet;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.beam.runners.apex.ApexPipelineOptions;
+import org.apache.beam.runners.apex.ApexRunner;
+import org.apache.beam.runners.apex.ApexRunnerResult;
+import org.apache.beam.runners.apex.TestApexRunner;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.Validation;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.commons.io.FileUtils;
+import org.joda.time.Duration;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+/**
+ * Windowed word count example on Apex runner.
+ */
+public class WordCountTest {
+
+  static class FormatAsStringFn extends DoFn<KV<String, Long>, String> {
+    private static final long serialVersionUID = 1L;
+
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      String row = c.element().getKey() + " - " + c.element().getValue()
+          + " @ " + c.timestamp().toString();
+      c.output(row);
+    }
+  }
+
+  static class ExtractWordsFn extends DoFn<String, String> {
+    private static final long serialVersionUID = 1L;
+    private final Aggregator<Long, Long> emptyLines =
+        createAggregator("emptyLines", new Sum.SumLongFn());
+
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      if (c.element().trim().isEmpty()) {
+        emptyLines.addValue(1L);
+      }
+
+      // Split the line into words.
+      String[] words = c.element().split("[^a-zA-Z']+");
+
+      // Output each word encountered into the output PCollection.
+      for (String word : words) {
+        if (!word.isEmpty()) {
+          c.output(word);
+        }
+      }
+    }
+  }
+
+  /**
+   * Options for word count example.
+   */
+  public interface WordCountOptions extends ApexPipelineOptions {
+    @Description("Path of the file to read from")
+    @Validation.Required
+    String getInputFile();
+    void setInputFile(String value);
+
+    @Description("Path of the file to write to")
+    @Validation.Required
+    String getOutput();
+    void setOutput(String value);
+  }
+
+  public static void main(String[] args) {
+    WordCountOptions options = 
PipelineOptionsFactory.fromArgs(args).withValidation()
+      .as(WordCountOptions.class);
+    Pipeline p = Pipeline.create(options);
+    p.apply("ReadLines", TextIO.Read.from(options.getInputFile()))
+      .apply(ParDo.of(new ExtractWordsFn()))
+      .apply(Count.<String>perElement())
+      .apply(ParDo.of(new FormatAsStringFn()))
+      .apply("WriteCounts", TextIO.Write.to(options.getOutput()))
+      ;
+    p.run().waitUntilFinish();
+  }
+
+  @Test
+  public void testWordCountExample() throws Exception {
+    PipelineOptionsFactory.register(WordCountOptions.class);
+    WordCountOptions options = 
TestPipeline.testingPipelineOptions().as(WordCountOptions.class);
+    options.setRunner(TestApexRunner.class);
+    options.setApplicationName("StreamingWordCount");
+    String inputFile = WordCountTest.class.getResource("/words.txt").getFile();
+    options.setInputFile(new File(inputFile).getAbsolutePath());
+    String outputFilePrefix = "target/wordcountresult.txt";
+    options.setOutput(outputFilePrefix);
+    WordCountTest.main(TestPipeline.convertToArgs(options));
+
+    File outFile1 = new File(outputFilePrefix + "-00000-of-00002");
+    File outFile2 = new File(outputFilePrefix + "-00001-of-00002");
+    Assert.assertTrue(outFile1.exists() && outFile2.exists());
+    HashSet<String> results = new HashSet<>();
+    results.addAll(FileUtils.readLines(outFile1));
+    results.addAll(FileUtils.readLines(outFile2));
+    HashSet<String> expectedOutput = Sets.newHashSet(
+        "foo - 5 @ 294247-01-09T04:00:54.775Z",
+        "bar - 5 @ 294247-01-09T04:00:54.775Z"
+    );
+    Assert.assertEquals("expected output", expectedOutput, results);
+  }
+
+  static class CollectResultsFn extends DoFn<KV<String, Long>, String> {
+    static final ConcurrentHashMap<String, Long> RESULTS = new 
ConcurrentHashMap<>();
+
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      RESULTS.put(c.element().getKey(), c.element().getValue());
+    }
+  }
+
+  @Test
+  public void testWindowedWordCount() throws Exception {
+    String[] args = new String[] {
+        "--runner=" + ApexRunner.class.getName()
+    };
+    ApexPipelineOptions options = 
PipelineOptionsFactory.fromArgs(args).withValidation()
+        .as(ApexPipelineOptions.class);
+    options.setApplicationName("StreamingWordCount");
+    Pipeline p = Pipeline.create(options);
+
+    PCollection<KV<String, Long>> wordCounts =
+        p.apply(Read.from(new UnboundedTextSource()))
+            .apply(ParDo.of(new ExtractWordsFn()))
+            
.apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(10))))
+            .apply(Count.<String>perElement());
+
+    wordCounts.apply(ParDo.of(new CollectResultsFn()));
+
+    ApexRunnerResult result = (ApexRunnerResult) p.run();
+    
Assert.assertNotNull(result.getApexDAG().getOperatorMeta("Read(UnboundedTextSource)"));
+    long timeout = System.currentTimeMillis() + 30000;
+    while (System.currentTimeMillis() < timeout) {
+      if (CollectResultsFn.RESULTS.containsKey("foo")
+          && CollectResultsFn.RESULTS.containsKey("bar")) {
+        break;
+      }
+      result.waitUntilFinish(Duration.millis(1000));
+    }
+    result.cancel();
+    Assert.assertTrue(
+        CollectResultsFn.RESULTS.containsKey("foo") && 
CollectResultsFn.RESULTS.containsKey("bar"));
+    CollectResultsFn.RESULTS.clear();
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a21550f7/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ApexGroupByKeyOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ApexGroupByKeyOperatorTest.java
 
b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ApexGroupByKeyOperatorTest.java
new file mode 100644
index 0000000..3e8d575
--- /dev/null
+++ 
b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ApexGroupByKeyOperatorTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.apex.translators;
+
+import com.datatorrent.api.Sink;
+import com.google.common.collect.Lists;
+
+import java.util.List;
+
+import org.apache.beam.runners.apex.ApexPipelineOptions;
+import org.apache.beam.runners.apex.TestApexRunner;
+import 
org.apache.beam.runners.apex.translators.functions.ApexGroupByKeyOperator;
+import org.apache.beam.runners.apex.translators.utils.ApexStateInternals;
+import org.apache.beam.runners.apex.translators.utils.ApexStreamTuple;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollection.IsBounded;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test for {@link ApexGroupByKeyOperator}.
+ */
+public class ApexGroupByKeyOperatorTest {
+
+  @Test
+  public void testGlobalWindowMinTimestamp() throws Exception {
+    ApexPipelineOptions options = PipelineOptionsFactory.create()
+        .as(ApexPipelineOptions.class);
+    options.setRunner(TestApexRunner.class);
+    Pipeline pipeline = Pipeline.create(options);
+
+    WindowingStrategy<?, ?> ws = WindowingStrategy.of(FixedWindows.of(
+        Duration.standardSeconds(10)));
+    PCollection<KV<String, Integer>> input = 
PCollection.createPrimitiveOutputInternal(pipeline,
+        ws, IsBounded.BOUNDED);
+    input.setCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()));
+
+    ApexGroupByKeyOperator<String, Integer> operator = new 
ApexGroupByKeyOperator<>(options,
+        input, new ApexStateInternals.ApexStateInternalsFactory<String>()
+        );
+
+    final List<Object> results = Lists.newArrayList();
+    Sink<Object> sink =  new Sink<Object>() {
+      @Override
+      public void put(Object tuple) {
+        results.add(tuple);
+      }
+      @Override
+      public int getCount(boolean reset) {
+        return 0;
+      }
+    };
+    operator.output.setSink(sink);
+    operator.setup(null);
+    operator.beginWindow(1);
+
+    Instant windowStart = BoundedWindow.TIMESTAMP_MIN_VALUE;
+    BoundedWindow window = new IntervalWindow(windowStart, 
windowStart.plus(10000));
+    PaneInfo paneInfo = PaneInfo.NO_FIRING;
+
+    WindowedValue<KV<String, Integer>> wv1 =
+        WindowedValue.of(KV.of("foo", 1), windowStart, window, paneInfo);
+    operator.input.process(ApexStreamTuple.DataTuple.of(wv1));
+
+    WindowedValue<KV<String, Integer>> wv2 =
+        WindowedValue.of(KV.of("foo", 1), windowStart, window, paneInfo);
+    operator.input.process(ApexStreamTuple.DataTuple.of(wv2));
+
+    ApexStreamTuple<WindowedValue<KV<String, Integer>>> watermark =
+        
ApexStreamTuple.WatermarkTuple.of(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis());
+
+    Assert.assertEquals("number outputs", 0, results.size());
+    operator.input.process(watermark);
+    Assert.assertEquals("number outputs", 2, results.size());
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+    ApexStreamTuple.DataTuple<WindowedValue<KV<String, Iterable<Integer>>>> 
dataTuple =
+        (ApexStreamTuple.DataTuple) results.get(0);
+    List<Integer> counts = Lists.newArrayList(1, 1);
+    Assert.assertEquals("iterable", KV.of("foo", counts), 
dataTuple.getValue().getValue());
+    Assert.assertEquals("expected watermark", watermark, results.get(1));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a21550f7/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/GroupByKeyTranslatorTest.java
----------------------------------------------------------------------
diff --git 
a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/GroupByKeyTranslatorTest.java
 
b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/GroupByKeyTranslatorTest.java
index cb764d6..e67e29e 100644
--- 
a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/GroupByKeyTranslatorTest.java
+++ 
b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/GroupByKeyTranslatorTest.java
@@ -242,4 +242,5 @@ public class GroupByKeyTranslatorTest {
       }
     }
   }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a21550f7/runners/apex/src/test/resources/words.txt
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/resources/words.txt 
b/runners/apex/src/test/resources/words.txt
new file mode 100644
index 0000000..94151ee
--- /dev/null
+++ b/runners/apex/src/test/resources/words.txt
@@ -0,0 +1,3 @@
+
+foo foo foo bar bar
+foo foo bar bar bar

Reply via email to