Repository: incubator-beam
Updated Branches:
  refs/heads/apex-runner [created] c9f140666


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aaf38ddf/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ApexStreamTuple.java
----------------------------------------------------------------------
diff --git 
a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ApexStreamTuple.java
 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ApexStreamTuple.java
new file mode 100644
index 0000000..efb69ee
--- /dev/null
+++ 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ApexStreamTuple.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.apex.translators.utils;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.StandardCoder;
+
+/**
+ * A tuple travelling through an Apex stream: either a data tuple that carries a value or a
+ * watermark that carries a timestamp.
+ *
+ * @param <T> the type of the value carried by the tuple
+ */
+public interface ApexStreamTuple<T>
+{
+  /**
+   * Gets the value of the tuple.
+   *
+   * @return the wrapped value; a {@link WatermarkTuple} is created without a value
+   */
+  T getValue();
+
+  /**
+   * Plain tuple class that wraps a single, replaceable value.
+   *
+   * @param <T> the type of the wrapped value
+   */
+  class DataTuple<T> implements ApexStreamTuple<T>
+  {
+    private T value;
+
+    /**
+     * Creates a data tuple wrapping {@code value}.
+     */
+    public static <T> DataTuple<T> of(T value) {
+      return new DataTuple<>(value);
+    }
+
+    private DataTuple(T value)
+    {
+      this.value = value;
+    }
+
+    @Override
+    public T getValue()
+    {
+      return value;
+    }
+
+    public void setValue(T value)
+    {
+      this.value = value;
+    }
+
+    @Override
+    public String toString()
+    {
+      // String.valueOf tolerates a null value instead of throwing a NullPointerException.
+      return String.valueOf(value);
+    }
+
+  }
+
+  /**
+   * Tuple that includes a timestamp in addition to its value.
+   *
+   * @param <T> the type of the wrapped value
+   */
+  class TimestampedTuple<T> extends DataTuple<T>
+  {
+    private long timestamp;
+
+    public TimestampedTuple(long timestamp, T value)
+    {
+      super(value);
+      this.timestamp = timestamp;
+    }
+
+    public long getTimestamp()
+    {
+      return timestamp;
+    }
+
+    public void setTimestamp(long timestamp)
+    {
+      this.timestamp = timestamp;
+    }
+  }
+
+  /**
+   * Tuple that represents a watermark; it is constructed with a timestamp and a null value.
+   *
+   * @param <T> the value type of the stream the watermark travels on
+   */
+  class WatermarkTuple<T> extends TimestampedTuple<T>
+  {
+    /**
+     * Creates a watermark for the given timestamp.
+     */
+    public static <T> WatermarkTuple<T> of(long timestamp) {
+      return new WatermarkTuple<>(timestamp);
+    }
+
+    protected WatermarkTuple(long timestamp)
+    {
+      super(timestamp, null);
+    }
+
+    @Override
+    public String toString()
+    {
+      return "[Watermark " + getTimestamp() + "]";
+    }
+  }
+
+  /**
+   * Coder for {@link ApexStreamTuple}.
+   *
+   * <p>The encoding is a one-byte marker followed by either the watermark timestamp (a long)
+   * or the value as written by the value coder. Only watermarks carry their timestamp: any
+   * other tuple is encoded by value alone and decoded as a plain {@link DataTuple}.
+   */
+  public static class ApexStreamTupleCoder<T> extends StandardCoder<ApexStreamTuple<T>> {
+    private static final long serialVersionUID = 1L;
+
+    /** Marker byte that precedes an encoded data value. */
+    private static final int DATA_MARKER = 0;
+    /** Marker byte that precedes an encoded watermark timestamp. */
+    private static final int WATERMARK_MARKER = 1;
+
+    final Coder<T> valueCoder;
+
+    /**
+     * Creates a tuple coder that delegates value encoding to {@code valueCoder}.
+     */
+    public static <T> ApexStreamTupleCoder<T> of(Coder<T> valueCoder) {
+      return new ApexStreamTupleCoder<>(valueCoder);
+    }
+
+    protected ApexStreamTupleCoder(Coder<T> valueCoder) {
+      this.valueCoder = checkNotNull(valueCoder);
+    }
+
+    @Override
+    public void encode(ApexStreamTuple<T> value, OutputStream outStream, Context context)
+        throws CoderException, IOException {
+      if (value instanceof WatermarkTuple) {
+        outStream.write(WATERMARK_MARKER);
+        new DataOutputStream(outStream).writeLong(((WatermarkTuple<?>) value).getTimestamp());
+      } else {
+        outStream.write(DATA_MARKER);
+        valueCoder.encode(value.getValue(), outStream, context);
+      }
+    }
+
+    /**
+     * Decodes a tuple written by {@link #encode}.
+     *
+     * @throws CoderException if the stream is exhausted or starts with an unknown marker byte
+     */
+    @Override
+    public ApexStreamTuple<T> decode(InputStream inStream, Context context)
+        throws CoderException, IOException
+    {
+      int marker = inStream.read();
+      if (marker == WATERMARK_MARKER) {
+        return new WatermarkTuple<T>(new DataInputStream(inStream).readLong());
+      }
+      if (marker == DATA_MARKER) {
+        return new DataTuple<T>(valueCoder.decode(inStream, context));
+      }
+      // encode() only ever writes the two markers above, so anything else (including the
+      // end-of-stream value -1) means the input is truncated or corrupt.
+      if (marker < 0) {
+        throw new CoderException("Unexpected end of stream while reading tuple marker");
+      }
+      throw new CoderException("Unexpected tuple marker: " + marker);
+    }
+
+    @Override
+    public List<? extends Coder<?>> getCoderArguments()
+    {
+      return Arrays.<Coder<?>>asList(valueCoder);
+    }
+
+    @Override
+    public void verifyDeterministic() throws NonDeterministicException
+    {
+      verifyDeterministic(
+          this.getClass().getSimpleName() + " requires a deterministic valueCoder",
+          valueCoder);
+    }
+
+    /**
+     * Returns the value coder.
+     */
+    public Coder<T> getValueCoder() {
+      return valueCoder;
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aaf38ddf/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/CoderAdapterStreamCodec.java
----------------------------------------------------------------------
diff --git 
a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/CoderAdapterStreamCodec.java
 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/CoderAdapterStreamCodec.java
new file mode 100644
index 0000000..c18765b
--- /dev/null
+++ 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/CoderAdapterStreamCodec.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.apex.translators.utils;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.Coder.Context;
+
+import com.datatorrent.api.StreamCodec;
+import com.datatorrent.netlet.util.Slice;
+import com.google.common.base.Throwables;
+
+/**
+ * The Apex {@link StreamCodec} adapter for using Beam {@link Coder}.
+ *
+ * <p>Objects are converted to and from byte arrays with the wrapped coder, always in the
+ * {@link Context#OUTER} context. An {@link IOException} raised by the coder is rethrown
+ * unchecked.
+ */
+public class CoderAdapterStreamCodec implements StreamCodec<Object>, Serializable {
+
+  private static final long serialVersionUID = 1L;
+  private final Coder<? super Object> coder;
+
+  /**
+   * @param coder the coder used for every encode and decode call
+   */
+  public CoderAdapterStreamCodec(Coder<? super Object> coder) {
+    this.coder = coder;
+  }
+
+  /**
+   * Decodes the object stored in the given slice of a byte buffer.
+   */
+  @Override
+  public Object fromByteArray(Slice fragment)
+  {
+    ByteArrayInputStream bis =
+        new ByteArrayInputStream(fragment.buffer, fragment.offset, fragment.length);
+    try {
+      return coder.decode(bis, Context.OUTER);
+    } catch (IOException e) {
+      throw Throwables.propagate(e);
+    }
+  }
+
+  /**
+   * Encodes the object into a freshly allocated byte array wrapped in a {@link Slice}.
+   */
+  @Override
+  public Slice toByteArray(Object wv)
+  {
+    ByteArrayOutputStream bos = new ByteArrayOutputStream();
+    try {
+      coder.encode(wv, bos, Context.OUTER);
+    } catch (IOException e) {
+      // propagate() always throws; the explicit throw makes that visible to the reader and
+      // the compiler, matching fromByteArray.
+      throw Throwables.propagate(e);
+    }
+    return new Slice(bos.toByteArray());
+  }
+
+  /**
+   * Returns the object's hash code as its partition key.
+   */
+  @Override
+  public int getPartition(Object o)
+  {
+    return o.hashCode();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aaf38ddf/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/NoOpSideInputReader.java
----------------------------------------------------------------------
diff --git 
a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/NoOpSideInputReader.java
 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/NoOpSideInputReader.java
new file mode 100644
index 0000000..ffe1a29
--- /dev/null
+++ 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/NoOpSideInputReader.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.apex.translators.utils;
+
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.SideInputReader;
+import org.apache.beam.sdk.values.PCollectionView;
+
+import java.io.Serializable;
+
+import javax.annotation.Nullable;
+
+/**
+ * No-op side input reader: it holds no views, so every lookup comes back empty.
+ */
+public class NoOpSideInputReader implements SideInputReader, Serializable {
+
+  private static final long serialVersionUID = 1L;
+
+  /**
+   * Always returns {@code null}, whatever view and window are requested.
+   */
+  @Nullable
+  @Override
+  public <T> T get(PCollectionView<T> view, BoundedWindow window) {
+    return null;
+  }
+
+  /**
+   * Always returns {@code false}: this reader contains no views.
+   */
+  @Override
+  public <T> boolean contains(PCollectionView<T> view) {
+    return false;
+  }
+
+  /**
+   * Always returns {@code true}, consistent with {@link #contains} never finding a view.
+   */
+  @Override
+  public boolean isEmpty() {
+    return true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aaf38ddf/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/NoOpStepContext.java
----------------------------------------------------------------------
diff --git 
a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/NoOpStepContext.java
 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/NoOpStepContext.java
new file mode 100644
index 0000000..43d92f6
--- /dev/null
+++ 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/NoOpStepContext.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.apex.translators.utils;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.ExecutionContext;
+import org.apache.beam.sdk.util.TimerInternals;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.state.StateInternals;
+import org.apache.beam.sdk.values.TupleTag;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * Serializable {@link ExecutionContext.StepContext} that does nothing: every query returns
+ * {@code null} and every notification is ignored.
+ */
+public class NoOpStepContext implements ExecutionContext.StepContext, Serializable {
+
+  private static final long serialVersionUID = 1L;
+
+  /** Returns {@code null}; no step name is tracked. */
+  @Override
+  public String getStepName() {
+    return null;
+  }
+
+  /** Returns {@code null}; no transform name is tracked. */
+  @Override
+  public String getTransformName() {
+    return null;
+  }
+
+  /** Ignores the output. */
+  @Override
+  public void noteOutput(WindowedValue<?> output) {
+    // Intentionally empty.
+  }
+
+  /** Ignores the side output. */
+  @Override
+  public void noteSideOutput(TupleTag<?> tag, WindowedValue<?> output) {
+    // Intentionally empty.
+  }
+
+  /** Discards the view data; nothing is written. */
+  @Override
+  public <T, W extends BoundedWindow> void writePCollectionViewData(
+      TupleTag<?> tag,
+      Iterable<WindowedValue<T>> data,
+      Coder<Iterable<WindowedValue<T>>> dataCoder,
+      W window,
+      Coder<W> windowCoder) throws IOException {
+    // Intentionally empty.
+  }
+
+  /** Returns {@code null}; no state is provided. */
+  @Override
+  public StateInternals<?> stateInternals() {
+    return null;
+  }
+
+  /** Returns {@code null}; no timers are provided. */
+  @Override
+  public TimerInternals timerInternals() {
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aaf38ddf/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/SerializablePipelineOptions.java
----------------------------------------------------------------------
diff --git 
a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/SerializablePipelineOptions.java
 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/SerializablePipelineOptions.java
new file mode 100644
index 0000000..7f7b3ef
--- /dev/null
+++ 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/SerializablePipelineOptions.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.apex.translators.utils;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+
+import org.apache.beam.runners.apex.ApexPipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+/**
+ * A wrapper to enable serialization of {@link PipelineOptions}.
+ *
+ * <p>The options are externalized as the JSON string produced by Jackson and restored by
+ * parsing that string back into {@link PipelineOptions}.
+ */
+public class SerializablePipelineOptions implements Externalizable {
+
+  private static final long serialVersionUID = 1L;
+
+  /** Shared mapper, so that a new one is not built for every (de)serialization. */
+  private static final ObjectMapper MAPPER = new ObjectMapper();
+
+  private transient ApexPipelineOptions pipelineOptions;
+
+  /**
+   * @param pipelineOptions the options to wrap
+   */
+  public SerializablePipelineOptions(ApexPipelineOptions pipelineOptions) {
+    this.pipelineOptions = pipelineOptions;
+  }
+
+  /**
+   * Public no-arg constructor required by {@link Externalizable}; the options are populated
+   * by {@link #readExternal}.
+   */
+  public SerializablePipelineOptions() {
+  }
+
+  /**
+   * Returns the wrapped options.
+   */
+  public ApexPipelineOptions get() {
+    return this.pipelineOptions;
+  }
+
+  @Override
+  public void writeExternal(ObjectOutput out) throws IOException
+  {
+    // writeUTF rejects strings whose encoded form exceeds 65535 bytes; writeObject has no
+    // such limit, so large option sets still serialize.
+    out.writeObject(MAPPER.writeValueAsString(pipelineOptions));
+  }
+
+  @Override
+  public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException
+  {
+    String json = (String) in.readObject();
+    this.pipelineOptions = MAPPER.readValue(json, PipelineOptions.class)
+        .as(ApexPipelineOptions.class);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aaf38ddf/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/StreamingWordCountTest.java
----------------------------------------------------------------------
diff --git 
a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/StreamingWordCountTest.java
 
b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/StreamingWordCountTest.java
new file mode 100644
index 0000000..582d839
--- /dev/null
+++ 
b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/StreamingWordCountTest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.apex.examples;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.beam.runners.apex.ApexPipelineOptions;
+import org.apache.beam.runners.apex.ApexRunnerResult;
+import org.apache.beam.runners.apex.ApexRunner;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.joda.time.Duration;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Windowed word count example on Apex runner.
+ *
+ * <p>The pipeline reads from {@link UnboundedTextSource}, counts words per fixed window and
+ * publishes the counts through a static map that the test polls.
+ */
+public class StreamingWordCountTest {
+
+  /**
+   * Splits each line into words and emits every non-empty word; empty lines are counted in
+   * the {@code emptyLines} aggregator.
+   */
+  static class ExtractWordsFn extends DoFn<String, String> {
+    private final Aggregator<Long, Long> emptyLines =
+        createAggregator("emptyLines", new Sum.SumLongFn());
+
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      if (c.element().trim().isEmpty()) {
+        emptyLines.addValue(1L);
+      }
+
+      // Split the line into words.
+      String[] words = c.element().split("[^a-zA-Z']+");
+
+      // Output each word encountered into the output PCollection.
+      for (String word : words) {
+        if (!word.isEmpty()) {
+          c.output(word);
+        }
+      }
+    }
+  }
+
+  /**
+   * Formats each word count as a string and records the latest count per word in
+   * {@link #RESULTS} so the test can observe it.
+   */
+  static class FormatAsStringFn extends DoFn<KV<String, Long>, String> {
+    private static final Logger LOG = LoggerFactory.getLogger(FormatAsStringFn.class);
+    static final ConcurrentHashMap<String, Long> RESULTS = new ConcurrentHashMap<>();
+
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      String row = c.element().getKey() + " - " + c.element().getValue() +
+          " @ " + c.timestamp().toString();
+      LOG.debug("output {}", row);
+      c.output(row);
+      RESULTS.put(c.element().getKey(), c.element().getValue());
+    }
+  }
+
+  /** Returns true once counts for both words emitted by the source have been recorded. */
+  private static boolean hasExpectedResults() {
+    return FormatAsStringFn.RESULTS.containsKey("foo")
+        && FormatAsStringFn.RESULTS.containsKey("bar");
+  }
+
+  @Test
+  public void testWindowedWordCount() throws Exception {
+    String[] args = new String[] {
+        "--runner=" + ApexRunner.class.getName()
+    };
+    ApexPipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
+        .as(ApexPipelineOptions.class);
+    options.setApplicationName("StreamingWordCount");
+    options.setParallelism(1);
+    Pipeline p = Pipeline.create(options);
+
+    PCollection<KV<String, Long>> wordCounts =
+        p.apply(Read.from(new UnboundedTextSource()))
+            .apply(ParDo.of(new ExtractWordsFn()))
+            .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(10))))
+            .apply(Count.<String>perElement());
+
+    wordCounts.apply(ParDo.of(new FormatAsStringFn()));
+
+    ApexRunnerResult result = (ApexRunnerResult) p.run();
+    try {
+      Assert.assertNotNull(result.getApexDAG().getOperatorMeta("Read(UnboundedTextSource)"));
+      // Poll for up to 30 seconds until both words have been counted.
+      long timeout = System.currentTimeMillis() + 30000;
+      while (System.currentTimeMillis() < timeout) {
+        if (hasExpectedResults()) {
+          break;
+        }
+        Thread.sleep(1000);
+      }
+      Assert.assertTrue(hasExpectedResults());
+    } finally {
+      // Always stop the pipeline and reset the shared map, even when an assertion or the
+      // wait fails, so a failure here cannot leak into other tests.
+      result.cancel();
+      FormatAsStringFn.RESULTS.clear();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aaf38ddf/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/UnboundedTextSource.java
----------------------------------------------------------------------
diff --git 
a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/UnboundedTextSource.java
 
b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/UnboundedTextSource.java
new file mode 100644
index 0000000..29351e9
--- /dev/null
+++ 
b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/UnboundedTextSource.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.apex.examples;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.joda.time.Instant;
+
+import com.google.common.base.Throwables;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import javax.annotation.Nullable;
+
+/**
+ * unbounded source that reads from text.
+ */
+public class UnboundedTextSource extends UnboundedSource<String, 
UnboundedSource.CheckpointMark> {
+  private static final long serialVersionUID = 1L;
+
+  @Override
+  public List<? extends UnboundedSource<String, CheckpointMark>> 
generateInitialSplits(
+      int desiredNumSplits, PipelineOptions options) throws Exception {
+    return Collections.<UnboundedSource<String, 
CheckpointMark>>singletonList(this);
+  }
+
+  @Override
+  public UnboundedReader<String> createReader(PipelineOptions options,
+      @Nullable CheckpointMark checkpointMark) {
+    return new UnboundedTextReader(this);
+  }
+
+  @Nullable
+  @Override
+  public Coder<CheckpointMark> getCheckpointMarkCoder() {
+    return null;
+  }
+
+  @Override
+  public void validate() {
+  }
+
+  @Override
+  public Coder<String> getDefaultOutputCoder() {
+    return StringUtf8Coder.of();
+  }
+
+  /**
+   * reads from text.
+   */
+  public static class UnboundedTextReader extends UnboundedReader<String> 
implements Serializable {
+
+    private static final long serialVersionUID = 7526472295622776147L;
+
+    private final UnboundedTextSource source;
+
+    private final String[] texts = new String[]{"foo foo foo bar bar", "foo 
foo bar bar bar"};
+    private long index = 0;
+
+    private String currentRecord;
+
+    private Instant currentTimestamp;
+
+    public UnboundedTextReader(UnboundedTextSource source) {
+      this.source = source;
+    }
+
+    @Override
+    public boolean start() throws IOException {
+      currentRecord = texts[0];
+      currentTimestamp = new Instant(0);
+      return true;
+    }
+
+    @Override
+    public boolean advance() throws IOException {
+      index++;
+      currentRecord = texts[(int) index % (texts.length)];
+      currentTimestamp = new Instant(index * 1000);
+      try {
+        Thread.sleep(index); // allow for downstream processing to complete
+      } catch (InterruptedException e) {
+        Throwables.propagate(e);
+      }
+      return true;
+    }
+
+    @Override
+    public byte[] getCurrentRecordId() throws NoSuchElementException {
+      return new byte[0];
+    }
+
+    @Override
+    public String getCurrent() throws NoSuchElementException {
+      return this.currentRecord;
+    }
+
+    @Override
+    public Instant getCurrentTimestamp() throws NoSuchElementException {
+      return currentTimestamp;
+    }
+
+    @Override
+    public void close() throws IOException {
+    }
+
+    @Override
+    public Instant getWatermark() {
+      return currentTimestamp;
+    }
+
+    @Override
+    public CheckpointMark getCheckpointMark() {
+      return null;
+    }
+
+    @Override
+    public UnboundedSource<String, ?> getCurrentSource() {
+      return this.source;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aaf38ddf/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/FlattenPCollectionTranslatorTest.java
----------------------------------------------------------------------
diff --git 
a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/FlattenPCollectionTranslatorTest.java
 
b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/FlattenPCollectionTranslatorTest.java
new file mode 100644
index 0000000..d3b56bc
--- /dev/null
+++ 
b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/FlattenPCollectionTranslatorTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.apex.translators;
+
+import org.apache.beam.runners.apex.ApexPipelineOptions;
+import org.apache.beam.runners.apex.ApexRunnerResult;
+import org.apache.beam.runners.apex.ApexRunner;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashSet;
+import java.util.List;
+
+/**
+ * Integration test for {@link FlattenPCollectionTranslator}.
+ *
+ * <p>Two collections are flattened and fed to a collecting function; the test polls the
+ * collected elements until they match the union of both inputs or 30 seconds have passed.
+ */
+public class FlattenPCollectionTranslatorTest {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(FlattenPCollectionTranslatorTest.class);
+
+  @Test
+  public void test() throws Exception {
+    ApexPipelineOptions options =
+        PipelineOptionsFactory.as(ApexPipelineOptions.class);
+    options.setApplicationName("FlattenPCollection");
+    options.setRunner(ApexRunner.class);
+    Pipeline p = Pipeline.create(options);
+
+    List<String> collection1 = Lists.newArrayList("1", "2", "3");
+    List<String> collection2 = Lists.newArrayList("4", "5");
+    List<String> expected = Lists.newArrayList("1", "2", "3", "4", "5");
+    PCollection<String> pc1 =
+        p.apply(Create.of(collection1).withCoder(StringUtf8Coder.of()));
+    PCollection<String> pc2 =
+        p.apply(Create.of(collection2).withCoder(StringUtf8Coder.of()));
+    PCollectionList<String> pcs = PCollectionList.of(pc1).and(pc2);
+    PCollection<String> actual = pcs.apply(Flatten.<String>pCollections());
+    actual.apply(ParDo.of(new EmbeddedCollector()));
+
+    ApexRunnerResult result = (ApexRunnerResult) p.run();
+    try {
+      // TODO: verify translation
+      result.getApexDAG();
+      long timeout = System.currentTimeMillis() + 30000;
+      while (System.currentTimeMillis() < timeout) {
+        if (EmbeddedCollector.results.containsAll(expected)) {
+          break;
+        }
+        LOG.info("Waiting for expected results.");
+        Thread.sleep(1000);
+      }
+      org.junit.Assert.assertEquals(Sets.newHashSet(expected), EmbeddedCollector.results);
+    } finally {
+      // Stop the pipeline and reset the shared set whatever the outcome, so a failure
+      // here cannot leak a running pipeline or stale elements into other tests.
+      result.cancel();
+      EmbeddedCollector.results.clear();
+    }
+  }
+
+  /**
+   * Collects every element it processes into a static set shared with the test.
+   */
+  @SuppressWarnings("serial")
+  private static class EmbeddedCollector extends OldDoFn<Object, Void> {
+    // Filled by the running pipeline while the test thread polls it, so the set has to be
+    // safe for concurrent access; a plain HashSet is not.
+    protected static final java.util.Set<Object> results = Sets.newConcurrentHashSet();
+
+    public EmbeddedCollector() {
+    }
+
+    @Override
+    public void processElement(ProcessContext c) throws Exception {
+      results.add(c.element());
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aaf38ddf/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/GroupByKeyTranslatorTest.java
----------------------------------------------------------------------
diff --git 
a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/GroupByKeyTranslatorTest.java
 
b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/GroupByKeyTranslatorTest.java
new file mode 100644
index 0000000..e4d4606
--- /dev/null
+++ 
b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/GroupByKeyTranslatorTest.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.apex.translators;
+
+import org.apache.beam.runners.apex.ApexPipelineOptions;
+import org.apache.beam.runners.apex.ApexRunnerResult;
+import org.apache.beam.runners.apex.ApexRunner;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.KV;
+
+import com.datatorrent.api.DAG;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import javax.annotation.Nullable;
+
+/**
+ * Integration test for {@link GroupByKeyTranslator}.
+ */
+public class GroupByKeyTranslatorTest {
+
+  @SuppressWarnings({"unchecked"})
+  @Test
+  public void test() throws Exception { // windowed per-element count, checked via EmbeddedCollector
+    ApexPipelineOptions options =
+        PipelineOptionsFactory.as(ApexPipelineOptions.class);
+    options.setApplicationName("GroupByKey");
+    options.setRunner(ApexRunner.class);
+    Pipeline p = Pipeline.create(options);
+
+    List<KV<String, Instant>> data = // key is the element to emit, value is its timestamp
+        Lists.newArrayList(
+            KV.of("foo", new Instant(1000)), KV.of("foo", new Instant(1000)),
+            KV.of("foo", new Instant(2000)),
+            KV.of("bar", new Instant(1000)), KV.of("bar", new Instant(2000)),
+            KV.of("bar", new Instant(2000))
+        );
+
+    // expected results assume outputAtLatestInputTimestamp
+    List<KV<Instant, KV<String, Long>>> expected = // output timestamp paired with (element, count)
+        Lists.newArrayList(
+            KV.of(new Instant(1000), KV.of("foo", 2L)),
+            KV.of(new Instant(1000), KV.of("bar", 1L)),
+            KV.of(new Instant(2000), KV.of("foo", 1L)),
+            KV.of(new Instant(2000), KV.of("bar", 2L))
+        );
+
+    p.apply(Read.from(new TestSource(data, new Instant(5000)))) // 5000 = watermark after the last record
+        
.apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(1)))
+            .withOutputTimeFn(OutputTimeFns.outputAtLatestInputTimestamp()))
+        .apply(Count.<String>perElement())
+        .apply(ParDo.of(new KeyedByTimestamp<KV<String, Long>>()))
+        .apply(ParDo.of(new EmbeddedCollector()))
+        ;
+
+    ApexRunnerResult result = (ApexRunnerResult)p.run();
+    // TODO: verify translation
+    DAG dag = result.getApexDAG(); // dag is not read again in this method
+
+    long timeout = System.currentTimeMillis() + 30000; // wait at most 30 seconds for results
+    while (System.currentTimeMillis() < timeout) {
+      if (EmbeddedCollector.results.containsAll(expected)) {
+        break;
+      }
+      Thread.sleep(1000); // poll once per second
+    }
+    Assert.assertEquals(Sets.newHashSet(expected), EmbeddedCollector.results);
+
+  }
+
+  @SuppressWarnings("serial")
+  private static class EmbeddedCollector extends OldDoFn<Object, Void> { // test sink; never calls c.output
+    protected static final HashSet<Object> results = new HashSet<>(); // static so the test method can read it
+
+    public EmbeddedCollector() {
+    }
+
+    @Override
+    public void processElement(ProcessContext c) throws Exception { // records each input element
+      results.add(c.element()); // NOTE(review): unsynchronized HashSet; confirm which thread calls this
+    }
+  }
+
+  private static class KeyedByTimestamp<T> extends OldDoFn<T, KV<Instant, T>> { // pairs element with timestamp
+
+    @Override
+    public void processElement(ProcessContext c) throws Exception {
+      c.output(KV.of(c.timestamp(), c.element())); // key = timestamp from the context, value = element
+    }
+  }
+
+  private static class TestSource extends UnboundedSource<String, 
UnboundedSource.CheckpointMark> { // test source backed by a fixed list of (element, timestamp) pairs
+
+    private final List<KV<String, Instant>> data; // key = element to emit, value = its timestamp
+    private final Instant watermark; // reported by the reader once all data has been handed out
+
+    public TestSource(List<KV<String, Instant>> data, Instant watermark) {
+      this.data = data;
+      this.watermark = watermark;
+    }
+
+    @Override
+    public List<? extends UnboundedSource<String, CheckpointMark>> 
generateInitialSplits(
+        int desiredNumSplits, PipelineOptions options) throws Exception { // both arguments are ignored
+      return Collections.<UnboundedSource<String, 
CheckpointMark>>singletonList(this); // always a single split: this source
+    }
+
+    @Override
+    public UnboundedReader<String> createReader(PipelineOptions options,
+        @Nullable CheckpointMark checkpointMark) {
+      return new TestReader(data, watermark, this); // options and checkpointMark are ignored
+    }
+
+    @Nullable
+    @Override
+    public Coder<CheckpointMark> getCheckpointMarkCoder() {
+      return null; // no coder; the reader below also returns a null checkpoint mark
+    }
+
+    @Override
+    public void validate() { // nothing to validate
+    }
+
+    @Override
+    public Coder<String> getDefaultOutputCoder() {
+      return StringUtf8Coder.of();
+    }
+
+    private static class TestReader extends UnboundedReader<String> implements 
Serializable {
+
+      private static final long serialVersionUID = 7526472295622776147L;
+
+      private final List<KV<String, Instant>> data;
+      private final TestSource source;
+
+      private Iterator<KV<String, Instant>> iterator; // created in start()
+      private String currentRecord; // key of the pair most recently taken from the iterator
+      private Instant currentTimestamp; // value of the pair most recently taken from the iterator
+      private Instant watermark;
+      private boolean collected; // true once getCurrent() was called for the current record
+
+      public TestReader(List<KV<String, Instant>> data, Instant watermark, 
TestSource source) {
+        this.data = data;
+        this.source = source;
+        this.watermark = watermark;
+      }
+
+      @Override
+      public boolean start() throws IOException {
+        iterator = data.iterator(); // a new iterator over data on every start() call
+        return advance();
+      }
+
+      @Override
+      public boolean advance() throws IOException { // true if a record was loaded, false if data is exhausted
+        if (iterator.hasNext()) {
+          KV<String, Instant> kv = iterator.next();
+          collected = false; // the new record has not been handed out yet
+          currentRecord = kv.getKey();
+          currentTimestamp = kv.getValue();
+          return true;
+        } else {
+          return false;
+        }
+      }
+
+      @Override
+      public byte[] getCurrentRecordId() throws NoSuchElementException {
+        return new byte[0]; // same empty id for every record
+      }
+
+      @Override
+      public String getCurrent() throws NoSuchElementException {
+        collected = true; // lets getWatermark() advance once this was the last record
+        return this.currentRecord;
+      }
+
+      @Override
+      public Instant getCurrentTimestamp() throws NoSuchElementException {
+        return currentTimestamp;
+      }
+
+      @Override
+      public void close() throws IOException { // nothing to release
+      }
+
+      @Override
+      public Instant getWatermark() {
+        if (!iterator.hasNext() && collected) { // last record was both reached and handed out
+          return watermark;
+        } else {
+          return new Instant(0); // hold the watermark back until then
+        }
+      }
+
+      @Override
+      public CheckpointMark getCheckpointMark() {
+        return null; // this test reader produces no checkpoint marks
+      }
+
+      @Override
+      public UnboundedSource<String, ?> getCurrentSource() {
+        return this.source;
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aaf38ddf/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java
----------------------------------------------------------------------
diff --git 
a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java
 
b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java
new file mode 100644
index 0000000..06aaf55
--- /dev/null
+++ 
b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.apex.translators;
+
+import org.apache.beam.runners.apex.ApexPipelineOptions;
+import org.apache.beam.runners.apex.ApexRunnerResult;
+import org.apache.beam.runners.apex.ApexRunner;
+import org.apache.beam.runners.apex.translators.functions.ApexParDoOperator;
+import 
org.apache.beam.runners.apex.translators.io.ApexReadUnboundedInputOperator;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+
+import com.datatorrent.api.DAG;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.regex.Pattern;
+
+/**
+ * Integration test for {@link ParDoBoundTranslator}.
+ */
+@RunWith(JUnit4.class)
+public class ParDoBoundTranslatorTest {
+  private static final Logger LOG = 
LoggerFactory.getLogger(ParDoBoundTranslatorTest.class);
+
+  @Test
+  public void test() throws Exception { // checks the translated DAG, then the values produced by Add
+    ApexPipelineOptions options = PipelineOptionsFactory.create()
+        .as(ApexPipelineOptions.class);
+    options.setApplicationName("ParDoBound");
+    options.setRunner(ApexRunner.class);
+
+    Pipeline p = Pipeline.create(options);
+
+    List<Integer> collection = Lists.newArrayList(1, 2, 3, 4, 5);
+    List<Integer> expected = Lists.newArrayList(6, 7, 8, 9, 10); // each input plus the 5 passed to Add
+    
p.apply(Create.of(collection).withCoder(SerializableCoder.of(Integer.class)))
+        .apply(ParDo.of(new Add(5)))
+        .apply(ParDo.of(new EmbeddedCollector()));
+
+    ApexRunnerResult result = (ApexRunnerResult)p.run();
+    DAG dag = result.getApexDAG();
+
+    DAG.OperatorMeta om = dag.getOperatorMeta("Create.Values"); // operator is looked up by name
+    Assert.assertNotNull(om);
+    Assert.assertEquals(om.getOperator().getClass(), 
ApexReadUnboundedInputOperator.class); // NOTE(review): JUnit parameter order is (expected, actual)
+
+    om = dag.getOperatorMeta("ParDo(Add)");
+    Assert.assertNotNull(om);
+    Assert.assertEquals(om.getOperator().getClass(), ApexParDoOperator.class);
+
+    long timeout = System.currentTimeMillis() + 30000; // wait at most 30 seconds for results
+    while (System.currentTimeMillis() < timeout) {
+      if (EmbeddedCollector.results.containsAll(expected)) {
+        break;
+      }
+      LOG.info("Waiting for expected results.");
+      Thread.sleep(1000); // poll once per second
+    }
+    Assert.assertEquals(Sets.newHashSet(expected), EmbeddedCollector.results);
+  }
+
+  @SuppressWarnings("serial")
+  private static class Add extends OldDoFn<Integer, Integer> { // adds a fixed number to each element
+    private final Integer number; // the addend, set once in the constructor
+
+    public Add(Integer number) {
+      this.number = number;
+    }
+
+    @Override
+    public void processElement(ProcessContext c) throws Exception {
+      c.output(c.element() + number); // emits element + number
+    }
+  }
+
+  @SuppressWarnings("serial")
+  private static class EmbeddedCollector extends OldDoFn<Object, Void> { // test sink; never calls c.output
+    protected static final HashSet<Object> results = new HashSet<>(); // static so the test method can read it
+
+    public EmbeddedCollector() {
+    }
+
+    @Override
+    public void processElement(ProcessContext c) throws Exception { // records each input element
+      results.add(c.element()); // NOTE(review): unsynchronized HashSet; confirm which thread calls this
+    }
+  }
+
+
+  @Ignore // disabled; no reason is recorded here
+  @Test
+  public void testAssertionFailure() throws Exception { // expects a failing PAssert to surface as AssertionError
+    ApexPipelineOptions options = PipelineOptionsFactory.create()
+        .as(ApexPipelineOptions.class);
+    options.setRunner(ApexRunner.class);
+    Pipeline pipeline = Pipeline.create(options);
+
+    PCollection<Integer> pcollection = pipeline
+        .apply(Create.of(1, 2, 3, 4));
+    PAssert.that(pcollection).containsInAnyOrder(2, 1, 4, 3, 7); // 7 is not in the input, so this must fail
+
+    Throwable exc = runExpectingAssertionFailure(pipeline);
+    Pattern expectedPattern = Pattern.compile(
+        "Expected: iterable over \\[((<4>|<7>|<3>|<2>|<1>)(, )?){5}\\] in any 
order");
+    // A loose pattern, but should get the job done.
+    assertTrue(
+        "Expected error message from PAssert with substring matching "
+            + expectedPattern
+            + " but the message was \""
+            + exc.getMessage()
+            + "\"",
+        expectedPattern.matcher(exc.getMessage()).find());
+  }
+
+  private static Throwable runExpectingAssertionFailure(Pipeline pipeline) { // returns the caught AssertionError
+    // We cannot use thrown.expect(AssertionError.class) because the 
AssertionError
+    // is first caught by JUnit and causes a test failure.
+    try {
+      pipeline.run();
+    } catch (AssertionError exc) {
+      return exc;
+    }
+    fail("assertion should have failed"); // reached only when run() did not throw AssertionError
+    throw new RuntimeException("unreachable"); // fail() above throws; this line satisfies the compiler
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aaf38ddf/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ReadUnboundTranslatorTest.java
----------------------------------------------------------------------
diff --git 
a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ReadUnboundTranslatorTest.java
 
b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ReadUnboundTranslatorTest.java
new file mode 100644
index 0000000..6260632
--- /dev/null
+++ 
b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ReadUnboundTranslatorTest.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.apex.translators;
+
+import org.apache.beam.runners.apex.ApexPipelineOptions;
+import org.apache.beam.runners.apex.ApexRunnerResult;
+import org.apache.beam.runners.apex.ApexRunner;
+import 
org.apache.beam.runners.apex.translators.io.ApexReadUnboundedInputOperator;
+import org.apache.beam.runners.apex.translators.utils.CollectionSource;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.CountingSource;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+
+import com.datatorrent.api.DAG;
+import com.google.common.collect.ContiguousSet;
+import com.google.common.collect.DiscreteDomain;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Range;
+import com.google.common.collect.Sets;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Integration test for {@link ReadUnboundedTranslator}.
+ */
+public class ReadUnboundTranslatorTest {
+  private static final Logger LOG = 
LoggerFactory.getLogger(ReadUnboundTranslatorTest.class);
+
+  @Test
+  public void test() throws Exception { // reads a CollectionSource, checks the DAG and the collected output
+    ApexPipelineOptions options = PipelineOptionsFactory.create()
+        .as(ApexPipelineOptions.class);
+    EmbeddedCollector.results.clear(); // the static set is shared with testReadBounded
+    options.setApplicationName("ReadUnbound");
+    options.setRunner(ApexRunner.class);
+    Pipeline p = Pipeline.create(options);
+
+    List<String> collection = Lists.newArrayList("1", "2", "3", "4", "5");
+    CollectionSource<String> source = new CollectionSource<>(collection, 
StringUtf8Coder.of());
+    p.apply(Read.from(source))
+        .apply(ParDo.of(new EmbeddedCollector()));
+
+    ApexRunnerResult result = (ApexRunnerResult)p.run();
+    DAG dag = result.getApexDAG();
+    DAG.OperatorMeta om = dag.getOperatorMeta("Read(CollectionSource)"); // operator is looked up by name
+    Assert.assertNotNull(om);
+    Assert.assertEquals(om.getOperator().getClass(), 
ApexReadUnboundedInputOperator.class); // NOTE(review): JUnit parameter order is (expected, actual)
+
+    long timeout = System.currentTimeMillis() + 30000; // wait at most 30 seconds for results
+    while (System.currentTimeMillis() < timeout) {
+      if (EmbeddedCollector.results.containsAll(collection)) {
+        break;
+      }
+      LOG.info("Waiting for expected results.");
+      Thread.sleep(1000); // poll once per second
+    }
+    Assert.assertEquals(Sets.newHashSet(collection), 
EmbeddedCollector.results);
+  }
+
+  @Test
+  public void testReadBounded() throws Exception { // reads CountingSource.upTo(10), checks DAG and output
+    ApexPipelineOptions options = PipelineOptionsFactory.create()
+        .as(ApexPipelineOptions.class);
+    EmbeddedCollector.results.clear(); // the static set is shared with test()
+    options.setApplicationName("ReadBounded");
+    options.setRunner(ApexRunner.class);
+    Pipeline p = Pipeline.create(options);
+
+    Set<Long> expected = ContiguousSet.create(Range.closedOpen(0L, 10L), 
DiscreteDomain.longs()); // the longs 0 through 9
+    p.apply(Read.from(CountingSource.upTo(10)))
+        .apply(ParDo.of(new EmbeddedCollector()));
+
+    ApexRunnerResult result = (ApexRunnerResult)p.run();
+    DAG dag = result.getApexDAG();
+    DAG.OperatorMeta om = 
dag.getOperatorMeta("Read(BoundedCountingSource)/Read(BoundedCountingSource)/Read(BoundedToUnboundedSourceAdapter)");
+    Assert.assertNotNull(om);
+    Assert.assertEquals(om.getOperator().getClass(), 
ApexReadUnboundedInputOperator.class); // NOTE(review): JUnit parameter order is (expected, actual)
+
+    long timeout = System.currentTimeMillis() + 30000; // wait at most 30 seconds for results
+    while (System.currentTimeMillis() < timeout) {
+      if (EmbeddedCollector.results.containsAll(expected)) {
+        break;
+      }
+      LOG.info("Waiting for expected results.");
+      Thread.sleep(1000); // poll once per second
+    }
+    Assert.assertEquals(Sets.newHashSet(expected), EmbeddedCollector.results);
+  }
+
+  @SuppressWarnings("serial")
+  private static class EmbeddedCollector extends OldDoFn<Object, Void> { // test sink; never calls c.output
+    protected static final HashSet<Object> results = new HashSet<>(); // static; cleared at the start of each test
+
+    public EmbeddedCollector() {
+    }
+
+    @Override
+    public void processElement(ProcessContext c) throws Exception { // records each input element
+      results.add(c.element()); // NOTE(review): unsynchronized HashSet; confirm which thread calls this
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aaf38ddf/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/utils/CollectionSource.java
----------------------------------------------------------------------
diff --git 
a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/utils/CollectionSource.java
 
b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/utils/CollectionSource.java
new file mode 100644
index 0000000..a1e8b3e
--- /dev/null
+++ 
b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/utils/CollectionSource.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.apex.translators.utils;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+
+import org.joda.time.Instant;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import javax.annotation.Nullable;
+
+/**
+ * Exposes a {@link Collection} as an {@link UnboundedSource}, used for tests.
+ *
+ * <p>The source is never split and provides no checkpoint marks. Its reader reports
+ * {@code Instant.now()} both as the element timestamp and as the watermark.
+ */
+public class CollectionSource<T> extends UnboundedSource<T, 
UnboundedSource.CheckpointMark> {
+
+  private final Collection<T> collection; // elements handed out by the reader, in iteration order
+  private final Coder<T> coder; // returned as the default output coder
+
+  public CollectionSource(Collection<T> collection, Coder<T> coder) {
+    this.collection = collection;
+    this.coder = coder;
+  }
+
+  @Override
+  public List<? extends UnboundedSource<T, CheckpointMark>> 
generateInitialSplits(
+      int desiredNumSplits, PipelineOptions options) throws Exception { // both arguments are ignored
+    return Collections.singletonList(this); // always a single split: this source
+  }
+
+  @Override
+  public UnboundedReader<T> createReader(PipelineOptions options,
+      @Nullable UnboundedSource.CheckpointMark checkpointMark) {
+    return new CollectionReader<>(collection, this); // options and checkpointMark are ignored
+  }
+
+  @Nullable
+  @Override
+  public Coder<CheckpointMark> getCheckpointMarkCoder() {
+    return null; // no coder; the reader below also returns a null checkpoint mark
+  }
+
+  @Override
+  public void validate() { // nothing to validate
+  }
+
+  @Override
+  public Coder<T> getDefaultOutputCoder() {
+    return coder;
+  }
+
+  private static class CollectionReader<T> extends 
UnboundedSource.UnboundedReader<T>
+      implements Serializable {
+
+    private T current; // element most recently taken from the iterator
+    private final CollectionSource<T> source;
+    private final Collection<T> collection;
+    private Iterator<T> iterator; // created lazily in start()
+
+    public CollectionReader(Collection<T> collection, CollectionSource<T> 
source) {
+      this.collection = collection;
+      this.source = source;
+    }
+
+    @Override
+    public boolean start() throws IOException {
+      if (null == iterator) { // keep an existing iterator if start() is called again
+        iterator = collection.iterator();
+      }
+      return advance();
+    }
+
+    @Override
+    public boolean advance() throws IOException { // true if an element was loaded, false if exhausted
+      if (iterator.hasNext()) {
+        current = iterator.next();
+        return true;
+      } else {
+        return false;
+      }
+    }
+
+    @Override
+    public Instant getWatermark() {
+      return Instant.now(); // wall-clock time, independent of the elements read
+    }
+
+    @Override
+    public UnboundedSource.CheckpointMark getCheckpointMark() {
+      return null; // this test reader produces no checkpoint marks
+    }
+
+    @Override
+    public UnboundedSource<T, ?> getCurrentSource() {
+      return source;
+    }
+
+    @Override
+    public T getCurrent() throws NoSuchElementException {
+      return current; // null until advance() has loaded an element
+    }
+
+    @Override
+    public Instant getCurrentTimestamp() throws NoSuchElementException {
+      return Instant.now(); // wall-clock time at the moment of the call
+    }
+
+    @Override
+    public void close() throws IOException { // nothing to release
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aaf38ddf/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/utils/PipelineOptionsTest.java
----------------------------------------------------------------------
diff --git 
a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/utils/PipelineOptionsTest.java
 
b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/utils/PipelineOptionsTest.java
new file mode 100644
index 0000000..e2fa9d9
--- /dev/null
+++ 
b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/utils/PipelineOptionsTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.apex.translators.utils;
+
+import org.apache.beam.runners.apex.ApexPipelineOptions;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.datatorrent.common.util.FSStorageAgent;
+import com.esotericsoftware.kryo.serializers.JavaSerializer;
+import com.esotericsoftware.kryo.serializers.FieldSerializer.Bind;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+
+/**
+ * Tests the serialization of PipelineOptions: options wrapped in
+ * {@link SerializablePipelineOptions} are stored and retrieved through
+ * {@link FSStorageAgent} and must keep the value given on the command line.
+ */
+public class PipelineOptionsTest {
+
+  public interface MyOptions extends ApexPipelineOptions { // adds one custom option for the test
+    @Description("Bla bla bla")
+    @Default.String("Hello")
+    String getTestOption();
+    void setTestOption(String value);
+  }
+
+  private static class MyOptionsWrapper { // holder object that is passed to FSStorageAgent
+    private MyOptionsWrapper() {
+      this(null); // required for Kryo
+    }
+    private MyOptionsWrapper(ApexPipelineOptions options) {
+      this.options = new SerializablePipelineOptions(options);
+    }
+    @Bind(JavaSerializer.class) // presumably makes Kryo use Java serialization for this field; confirm
+    private final SerializablePipelineOptions options;
+  }
+
+  private static MyOptions options; // assigned once in beforeTest()
+
+  private final static String[] args = new String[]{"--testOption=nothing"}; // differs from the Hello default
+
+  @BeforeClass
+  public static void beforeTest() {
+    options = PipelineOptionsFactory.fromArgs(args).as(MyOptions.class);
+  }
+
+  @Test
+  public void testSerialization() { // store/retrieve round trip must preserve testOption
+    MyOptionsWrapper wrapper = new 
MyOptionsWrapper(PipelineOptionsTest.options);
+    ByteArrayOutputStream bos = new ByteArrayOutputStream();
+    FSStorageAgent.store(bos, wrapper);
+
+    ByteArrayInputStream bis = new ByteArrayInputStream(bos.toByteArray());
+    MyOptionsWrapper wrapperCopy = 
(MyOptionsWrapper)FSStorageAgent.retrieve(bis);
+    assertNotNull(wrapperCopy.options);
+    assertEquals("nothing", 
wrapperCopy.options.get().as(MyOptions.class).getTestOption());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aaf38ddf/runners/apex/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/resources/log4j.properties 
b/runners/apex/src/test/resources/log4j.properties
new file mode 100644
index 0000000..84a6f68
--- /dev/null
+++ b/runners/apex/src/test/resources/log4j.properties
@@ -0,0 +1,33 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# The root logger is set to DEBUG below, which is verbose.
+# Set it to OFF to not flood build logs, or to INFO for lighter debugging output.
+log4j.rootLogger=DEBUG, testlogger
+
+# testlogger is set to be a ConsoleAppender writing to System.err.
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target = System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+
+log4j.logger.org=info
+#log4j.logger.org.apache.commons.beanutils=warn
+log4j.logger.com.datatorrent=info
+log4j.logger.org.apache.apex=debug
+log4j.logger.org.apache.beam.runners.apex=debug

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aaf38ddf/runners/pom.xml
----------------------------------------------------------------------
diff --git a/runners/pom.xml b/runners/pom.xml
index 605c3b2..ff800d1 100644
--- a/runners/pom.xml
+++ b/runners/pom.xml
@@ -37,6 +37,7 @@
     <module>direct-java</module>
     <module>flink</module>
     <module>spark</module>
+    <module>apex</module>
   </modules>
 
   <profiles>

Reply via email to