Repository: beam
Updated Branches:
  refs/heads/master 462335caf -> 0cba43ee2


Add TestStream translation


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0cedc61f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0cedc61f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0cedc61f

Branch: refs/heads/master
Commit: 0cedc61ffb59a08a4b5205a5a224fd9fa906f7a7
Parents: 47cea78
Author: Kenneth Knowles <k...@google.com>
Authored: Tue May 30 14:41:46 2017 -0700
Committer: Kenneth Knowles <k...@google.com>
Committed: Fri Jun 2 10:06:52 2017 -0700

----------------------------------------------------------------------
 .../construction/PTransformTranslation.java     |   1 +
 .../construction/TestStreamTranslation.java     | 156 +++++++++++++++++++
 .../construction/TestStreamTranslationTest.java | 129 +++++++++++++++
 3 files changed, 286 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/0cedc61f/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
index b2f06ac..fd3f9f3 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
@@ -50,6 +50,7 @@ public class PTransformTranslation {
   public static final String GROUP_BY_KEY_TRANSFORM_URN = 
"urn:beam:transform:groupbykey:v1";
   public static final String READ_TRANSFORM_URN = "urn:beam:transform:read:v1";
   public static final String WINDOW_TRANSFORM_URN = 
"urn:beam:transform:window:v1";
+  public static final String TEST_STREAM_TRANSFORM_URN = 
"urn:beam:transform:teststream:v1";
 
   // Less well-known. And where shall these live?
   public static final String WRITE_FILES_TRANSFORM_URN = 
"urn:beam:transform:write_files:0.1";

http://git-wip-us.apache.org/repos/asf/beam/blob/0cedc61f/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java
new file mode 100644
index 0000000..90e6304
--- /dev/null
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.construction;
+
+import com.google.auto.service.AutoService;
+import com.google.protobuf.Any;
+import com.google.protobuf.ByteString;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import 
org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi;
+import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.testing.TestStream;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/**
+ * Utility methods for translating a {@link TestStream} to and from {@link RunnerApi}
+ * representations.
+ */
+public class TestStreamTranslation {
+
+  static <T> RunnerApi.TestStreamPayload testStreamToPayload(
+      TestStream<T> transform, SdkComponents components) throws IOException {
+    String coderId = components.registerCoder(transform.getValueCoder());
+
+    RunnerApi.TestStreamPayload.Builder builder =
+        RunnerApi.TestStreamPayload.newBuilder().setCoderId(coderId);
+
+    for (TestStream.Event<T> event : transform.getEvents()) {
+      builder.addEvents(toProto(event, transform.getValueCoder()));
+    }
+
+    return builder.build();
+  }
+
+  static <T> RunnerApi.TestStreamPayload.Event toProto(TestStream.Event<T> 
event, Coder<T> coder)
+      throws IOException {
+    switch (event.getType()) {
+      case WATERMARK:
+        return RunnerApi.TestStreamPayload.Event.newBuilder()
+            .setWatermarkEvent(
+                RunnerApi.TestStreamPayload.Event.AdvanceWatermark.newBuilder()
+                    .setNewWatermark(
+                        ((TestStream.WatermarkEvent<T>) 
event).getWatermark().getMillis()))
+            .build();
+
+      case PROCESSING_TIME:
+        return RunnerApi.TestStreamPayload.Event.newBuilder()
+            .setProcessingTimeEvent(
+                
RunnerApi.TestStreamPayload.Event.AdvanceProcessingTime.newBuilder()
+                    .setAdvanceDuration(
+                        ((TestStream.ProcessingTimeEvent<T>) event)
+                            .getProcessingTimeAdvance()
+                            .getMillis()))
+            .build();
+
+      case ELEMENT:
+        RunnerApi.TestStreamPayload.Event.AddElements.Builder builder =
+            RunnerApi.TestStreamPayload.Event.AddElements.newBuilder();
+        for (TimestampedValue<T> element : ((TestStream.ElementEvent<T>) 
event).getElements()) {
+          builder.addElements(
+              RunnerApi.TestStreamPayload.TimestampedElement.newBuilder()
+                  .setTimestamp(element.getTimestamp().getMillis())
+                  .setEncodedElement(
+                      ByteString.copyFrom(
+                          CoderUtils.encodeToByteArray(coder, 
element.getValue()))));
+        }
+        return 
RunnerApi.TestStreamPayload.Event.newBuilder().setElementEvent(builder).build();
+      default:
+        throw new IllegalArgumentException(
+            String.format(
+                "Unsupported type of %s: %s",
+                TestStream.Event.class.getCanonicalName(), event.getType()));
+    }
+  }
+
+  static <T> TestStream.Event<T> fromProto(
+      RunnerApi.TestStreamPayload.Event protoEvent, Coder<T> coder) throws 
IOException {
+    switch (protoEvent.getEventCase()) {
+      case WATERMARK_EVENT:
+        return TestStream.WatermarkEvent.advanceTo(
+            new Instant(protoEvent.getWatermarkEvent().getNewWatermark()));
+      case PROCESSING_TIME_EVENT:
+        return TestStream.ProcessingTimeEvent.advanceBy(
+            
Duration.millis(protoEvent.getProcessingTimeEvent().getAdvanceDuration()));
+      case ELEMENT_EVENT:
+        List<TimestampedValue<T>> decodedElements = new ArrayList<>();
+        for (RunnerApi.TestStreamPayload.TimestampedElement element :
+            protoEvent.getElementEvent().getElementsList()) {
+          decodedElements.add(
+              TimestampedValue.of(
+                  CoderUtils.decodeFromByteArray(coder, 
element.getEncodedElement().toByteArray()),
+                  new Instant(element.getTimestamp())));
+        }
+        return TestStream.ElementEvent.add(decodedElements);
+      case EVENT_NOT_SET:
+      default:
+        throw new IllegalArgumentException(
+            String.format(
+                "Unsupported type of %s: %s",
+                RunnerApi.TestStreamPayload.Event.class.getCanonicalName(),
+                protoEvent.getEventCase()));
+    }
+  }
+
+  static class TestStreamTranslator implements 
TransformPayloadTranslator<TestStream<?>> {
+    @Override
+    public String getUrn(TestStream<?> transform) {
+      return PTransformTranslation.TEST_STREAM_TRANSFORM_URN;
+    }
+
+    @Override
+    public RunnerApi.FunctionSpec translate(
+        AppliedPTransform<?, ?, TestStream<?>> transform, SdkComponents 
components)
+        throws IOException {
+      return RunnerApi.FunctionSpec.newBuilder()
+          .setUrn(getUrn(transform.getTransform()))
+          .setParameter(Any.pack(testStreamToPayload(transform.getTransform(), 
components)))
+          .build();
+    }
+  }
+
+  /** Registers {@link TestStreamTranslator}. */
+  @AutoService(TransformPayloadTranslatorRegistrar.class)
+  public static class Registrar implements TransformPayloadTranslatorRegistrar 
{
+    @Override
+    public Map<? extends Class<? extends PTransform>, ? extends 
TransformPayloadTranslator>
+        getTransformPayloadTranslators() {
+      return Collections.singletonMap(TestStream.class, new 
TestStreamTranslator());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/0cedc61f/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TestStreamTranslationTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TestStreamTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TestStreamTranslationTest.java
new file mode 100644
index 0000000..b2029be
--- /dev/null
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TestStreamTranslationTest.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.construction;
+
+import static 
org.apache.beam.runners.core.construction.PTransformTranslation.TEST_STREAM_TRANSFORM_URN;
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+import com.google.common.collect.ImmutableList;
+import 
org.apache.beam.runners.core.construction.TestStreamTranslationTest.TestStreamPayloadTranslation;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.ParDoPayload;
+import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestStream;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.hamcrest.Matchers;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+import org.junit.runners.Suite;
+
+/** Tests for {@link TestStreamTranslation}. */
+@RunWith(Suite.class)
+@Suite.SuiteClasses({
+  TestStreamPayloadTranslation.class,
+})
+public class TestStreamTranslationTest {
+
+  /** Tests translating {@link TestStream} to/from {@link RunnerApi.TestStreamPayload} protos. */
+  @RunWith(Parameterized.class)
+  public static class TestStreamPayloadTranslation {
+    @Parameters(name = "{index}: {0}")
+    public static Iterable<TestStream<?>> data() {
+      return ImmutableList.<TestStream<?>>of(
+          TestStream.create(VarIntCoder.of()).advanceWatermarkToInfinity(),
+          TestStream.create(VarIntCoder.of())
+              .advanceWatermarkTo(new Instant(42))
+              .advanceWatermarkToInfinity(),
+          TestStream.create(VarIntCoder.of())
+              .addElements(TimestampedValue.of(3, new Instant(17)))
+              .advanceWatermarkToInfinity(),
+          TestStream.create(StringUtf8Coder.of())
+              .advanceProcessingTime(Duration.millis(82))
+              .advanceWatermarkToInfinity());
+    }
+
+    @Parameter(0)
+    public TestStream<String> testStream;
+
+    public static TestPipeline p = 
TestPipeline.create().enableAbandonedNodeEnforcement(false);
+
+    @Test
+    public void testEncodedProto() throws Exception {
+      SdkComponents components = SdkComponents.create();
+      RunnerApi.TestStreamPayload payload =
+          TestStreamTranslation.testStreamToPayload(testStream, components);
+
+      verifyTestStreamEncoding(testStream, payload, components.toComponents());
+    }
+
+    @Test
+    public void testRegistrarEncodedProto() throws Exception {
+      PCollection<String> output = p.apply(testStream);
+
+      AppliedPTransform<PBegin, PCollection<String>, TestStream<String>> 
appliedTestStream =
+          AppliedPTransform.<PBegin, PCollection<String>, 
TestStream<String>>of(
+              "fakeName", PBegin.in(p).expand(), output.expand(), testStream, 
p);
+
+      SdkComponents components = SdkComponents.create();
+      RunnerApi.FunctionSpec spec =
+          PTransformTranslation.toProto(appliedTestStream, 
components).getSpec();
+
+      assertThat(spec.getUrn(), equalTo(TEST_STREAM_TRANSFORM_URN));
+
+      RunnerApi.TestStreamPayload payload =
+          spec.getParameter().unpack(RunnerApi.TestStreamPayload.class);
+
+      verifyTestStreamEncoding(testStream, payload, components.toComponents());
+    }
+
+    private static <T> void verifyTestStreamEncoding(
+        TestStream<T> testStream,
+        RunnerApi.TestStreamPayload payload,
+        RunnerApi.Components protoComponents)
+        throws Exception {
+
+      // This reverse direction is only valid for Java-based coders
+      assertThat(
+          CoderTranslation.fromProto(
+              protoComponents.getCodersOrThrow(payload.getCoderId()), 
protoComponents),
+          Matchers.<Coder<?>>equalTo(testStream.getValueCoder()));
+
+      assertThat(payload.getEventsList().size(), 
equalTo(testStream.getEvents().size()));
+
+      for (int i = 0; i < payload.getEventsList().size(); ++i) {
+        assertThat(
+            TestStreamTranslation.fromProto(payload.getEvents(i), 
testStream.getValueCoder()),
+            equalTo(testStream.getEvents().get(i)));
+      }
+    }
+  }
+}

Reply via email to