Repository: beam
Updated Branches:
  refs/heads/master 986fcefca -> 8beea73c1


Add Coder utilities for Proto conversions

Include Known Coders


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/01e5a8d5
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/01e5a8d5
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/01e5a8d5

Branch: refs/heads/master
Commit: 01e5a8d5c7517f511d5d7bfc524319a02e6d2e21
Parents: 986fcef
Author: Thomas Groh <[email protected]>
Authored: Fri Apr 7 11:46:24 2017 -0700
Committer: Thomas Groh <[email protected]>
Committed: Tue Apr 11 09:58:37 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/core/construction/Coders.java  | 162 +++++++++++++++++++
 .../core/construction/SdkComponents.java        |   5 +-
 .../runners/core/construction/CodersTest.java   |  99 ++++++++++++
 .../core/construction/SdkComponentsTest.java    |  25 ++-
 4 files changed, 288 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/01e5a8d5/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Coders.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Coders.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Coders.java
new file mode 100644
index 0000000..d890de7
--- /dev/null
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Coders.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.construction;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.BiMap;
+import com.google.common.collect.ImmutableBiMap;
+import com.google.protobuf.Any;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.BytesValue;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.Components;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.SdkFunctionSpec;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import 
org.apache.beam.sdk.transforms.windowing.IntervalWindow.IntervalWindowCoder;
+import org.apache.beam.sdk.util.CloudObject;
+import org.apache.beam.sdk.util.Serializer;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
+
+/** Converts to and from Beam Runner API representations of {@link Coder 
Coders}. */
+public class Coders {
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+  // This URN says that the coder is just a UDF blob this SDK understands
+  // TODO: standardize such things
+  public static final String CUSTOM_CODER_URN = "urn:beam:coders:javasdk:0.1";
+
+  // The URNs for coders which are shared across languages
+  private static final BiMap<Class<? extends Coder>, String> KNOWN_CODER_URNS =
+      ImmutableBiMap.<Class<? extends Coder>, String>builder()
+          .put(ByteArrayCoder.class, "urn:beam:coders:bytes:0.1")
+          .put(KvCoder.class, "urn:beam:coders:kv:0.1")
+          .put(VarIntCoder.class, "urn:beam:coders:varint:0.1")
+          .put(IntervalWindowCoder.class, 
"urn:beam:coders:interval_window:0.1")
+          .put(IterableCoder.class, "urn:beam:coders:stream:0.1")
+          .put(GlobalWindow.Coder.class, "urn:beam:coders:global_window:0.1")
+          .put(FullWindowedValueCoder.class, 
"urn:beam:coders:windowed_value:0.1")
+          .build();
+
+  public static RunnerApi.Coder toProto(
+      Coder<?> coder, @SuppressWarnings("unused") SdkComponents components) 
throws IOException {
+    if (KNOWN_CODER_URNS.containsKey(coder.getClass())) {
+      return toKnownCoder(coder, components);
+    }
+    return toCustomCoder(coder);
+  }
+
+  private static RunnerApi.Coder toKnownCoder(Coder<?> coder, SdkComponents 
components)
+      throws IOException {
+    List<String> componentIds = new ArrayList<>();
+    for (Coder<?> componentCoder : coder.getCoderArguments()) {
+      componentIds.add(components.registerCoder(componentCoder));
+    }
+    return RunnerApi.Coder.newBuilder()
+        .addAllComponentCoderIds(componentIds)
+        .setSpec(
+            SdkFunctionSpec.newBuilder()
+                
.setSpec(FunctionSpec.newBuilder().setUrn(KNOWN_CODER_URNS.get(coder.getClass()))))
+        .build();
+  }
+
+  private static RunnerApi.Coder toCustomCoder(Coder<?> coder) throws 
IOException {
+    RunnerApi.Coder.Builder coderBuilder = RunnerApi.Coder.newBuilder();
+    return coderBuilder
+        .setSpec(
+            SdkFunctionSpec.newBuilder()
+                .setSpec(
+                    FunctionSpec.newBuilder()
+                        .setUrn(CUSTOM_CODER_URN)
+                        .setParameter(
+                            Any.pack(
+                                BytesValue.newBuilder()
+                                    .setValue(
+                                        ByteString.copyFrom(
+                                            
OBJECT_MAPPER.writeValueAsBytes(coder.asCloudObject())))
+                                    .build()))))
+        .build();
+  }
+
+  public static Coder<?> fromProto(RunnerApi.Coder protoCoder, Components 
components)
+      throws IOException {
+    String coderSpecUrn = protoCoder.getSpec().getSpec().getUrn();
+    if (coderSpecUrn.equals(CUSTOM_CODER_URN)) {
+      return fromCustomCoder(protoCoder, components);
+    }
+    return fromKnownCoder(protoCoder, components);
+  }
+
+  private static Coder<?> fromKnownCoder(RunnerApi.Coder coder, Components 
components)
+      throws IOException {
+    String coderUrn = coder.getSpec().getSpec().getUrn();
+    List<Coder<?>> coderComponents = new LinkedList<>();
+    for (String componentId : coder.getComponentCoderIdsList()) {
+      Coder<?> innerCoder = 
fromProto(components.getCodersOrThrow(componentId), components);
+      coderComponents.add(innerCoder);
+    }
+    switch (coderUrn) {
+      case "urn:beam:coders:bytes:0.1":
+        return ByteArrayCoder.of();
+      case "urn:beam:coders:kv:0.1":
+        return KvCoder.of(coderComponents);
+      case "urn:beam:coders:varint:0.1":
+        return VarLongCoder.of();
+      case "urn:beam:coders:interval_window:0.1":
+        return IntervalWindowCoder.of();
+      case "urn:beam:coders:stream:0.1":
+        return IterableCoder.of(coderComponents);
+      case "urn:beam:coders:global_window:0.1":
+        return GlobalWindow.Coder.INSTANCE;
+      case "urn:beam:coders:windowed_value:0.1":
+        return WindowedValue.FullWindowedValueCoder.of(coderComponents);
+      default:
+        throw new IllegalStateException(
+            String.format(
+                "Unknown coder URN %s. Known URNs: %s", coderUrn, 
KNOWN_CODER_URNS.values()));
+    }
+  }
+
+  private static Coder<?> fromCustomCoder(
+      RunnerApi.Coder protoCoder, @SuppressWarnings("unused") Components 
components)
+      throws IOException {
+    CloudObject coderCloudObject =
+        OBJECT_MAPPER.readValue(
+            protoCoder
+                .getSpec()
+                .getSpec()
+                .getParameter()
+                .unpack(BytesValue.class)
+                .getValue()
+                .toByteArray(),
+            CloudObject.class);
+    return Serializer.deserialize(coderCloudObject, Coder.class);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/01e5a8d5/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java
index c4b8cf1..5cb0a00 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java
@@ -21,6 +21,7 @@ package org.apache.beam.runners.core.construction;
 import com.google.common.base.Equivalence;
 import com.google.common.collect.BiMap;
 import com.google.common.collect.HashBiMap;
+import java.io.IOException;
 import java.util.Set;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.coders.Coder;
@@ -119,7 +120,7 @@ class SdkComponents {
    * #equals(Object)} and {@link #hashCode()} but incompatible binary formats 
are not considered the
    * same coder.
    */
-  String registerCoder(Coder<?> coder) {
+  String registerCoder(Coder<?> coder) throws IOException {
     String existing = coderIds.get(Equivalence.identity().wrap(coder));
     if (existing != null) {
       return existing;
@@ -127,6 +128,8 @@ class SdkComponents {
     String baseName = NameUtils.approximateSimpleName(coder);
     String name = uniqify(baseName, coderIds.values());
     coderIds.put(Equivalence.identity().wrap(coder), name);
+    RunnerApi.Coder coderProto = Coders.toProto(coder, this);
+    componentsBuilder.putCoders(name, coderProto);
     return name;
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/01e5a8d5/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java
new file mode 100644
index 0000000..1a657b2
--- /dev/null
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.construction;
+
+import static org.junit.Assert.assertThat;
+
+import com.google.common.collect.ImmutableList;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+import org.apache.beam.sdk.coders.AvroCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.Components;
+import org.hamcrest.Matchers;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+
+/**
+ * Tests for {@link Coders}: every parameterized coder is converted to its proto form and back,
+ * and the result must equal the coder that was put in.
+ */
+@RunWith(Parameterized.class)
+public class CodersTest {
+  /** Supplies the coders under test, one per parameterized run. */
+  @Parameters(name = "{index}: {0}")
+  public static Iterable<Coder<?>> data() {
+    return ImmutableList.<Coder<?>>of(
+        StringUtf8Coder.of(),
+        IterableCoder.of(VarLongCoder.of()),
+        KvCoder.of(StringUtf8Coder.of(), ListCoder.of(VarLongCoder.of())),
+        SerializableCoder.of(Record.class),
+        new RecordCoder(),
+        KvCoder.of(new RecordCoder(), AvroCoder.of(Record.class)));
+  }
+
+  /** The coder for the current run, injected by the parameterized runner. */
+  @Parameter(0)
+  public Coder<?> coder;
+
+  /** Round-trips {@link #coder} through the proto form and compares it with the original. */
+  @Test
+  public void toAndFromProto() throws Exception {
+    SdkComponents sdkComponents = SdkComponents.create();
+    RunnerApi.Coder asProto = Coders.toProto(coder, sdkComponents);
+
+    // Component coders registered during toProto are looked up from these components.
+    Components registered = sdkComponents.toComponents();
+    Coder<?> roundTripped = Coders.fromProto(asProto, registered);
+    assertThat(roundTripped, Matchers.<Coder<?>>equalTo(coder));
+  }
+
+  /** An empty serializable value type used as the element type of the coders under test. */
+  static class Record implements Serializable {
+  }
+
+  /**
+   * A custom coder that writes nothing and always decodes a fresh {@link Record}. Two instances
+   * are equal whenever they have the same class, so a decoded copy equals the original.
+   */
+  private static class RecordCoder extends CustomCoder<Record> {
+    @Override
+    public void encode(Record value, OutputStream outStream, Context context)
+        throws CoderException, IOException {}
+
+    @Override
+    public Record decode(InputStream inStream, Context context)
+        throws CoderException, IOException {
+      return new Record();
+    }
+
+    @Override
+    public boolean equals(Object other) {
+      if (other == null) {
+        return false;
+      }
+      return getClass().equals(other.getClass());
+    }
+
+    @Override
+    public int hashCode() {
+      return getClass().hashCode();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/01e5a8d5/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java
index c96e57c..28b4911 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java
@@ -24,6 +24,7 @@ import static org.hamcrest.Matchers.isEmptyOrNullString;
 import static org.hamcrest.Matchers.not;
 import static org.junit.Assert.assertThat;
 
+import java.io.IOException;
 import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.IterableCoder;
@@ -39,6 +40,7 @@ import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
+import org.hamcrest.Matchers;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
@@ -56,13 +58,32 @@ public class SdkComponentsTest {
   private SdkComponents components = SdkComponents.create();
 
   @Test
-  public void registerCoder() {
+  public void registerCoder() throws IOException {
     Coder<?> coder =
         KvCoder.of(StringUtf8Coder.of(), 
IterableCoder.of(SetCoder.of(ByteArrayCoder.of())));
     String id = components.registerCoder(coder);
     assertThat(components.registerCoder(coder), equalTo(id));
     assertThat(id, not(isEmptyOrNullString()));
-    assertThat(components.registerCoder(VarLongCoder.of()), not(equalTo(id)));
+    VarLongCoder otherCoder = VarLongCoder.of();
+    assertThat(components.registerCoder(otherCoder), not(equalTo(id)));
+
+    components.toComponents().getCodersOrThrow(id);
+    
components.toComponents().getCodersOrThrow(components.registerCoder(otherCoder));
+  }
+
+  /**
+   * Two coders that are equal but are distinct instances must be registered under different ids,
+   * and both ids must be present in the resulting components.
+   */
+  @Test
+  public void registerCoderEqualsNotSame() throws IOException {
+    Coder<?> first =
+        KvCoder.of(StringUtf8Coder.of(), IterableCoder.of(SetCoder.of(ByteArrayCoder.of())));
+    Coder<?> second =
+        KvCoder.of(StringUtf8Coder.of(), IterableCoder.of(SetCoder.of(ByteArrayCoder.of())));
+    // Precondition: the two instances compare equal.
+    assertThat(first, Matchers.<Coder<?>>equalTo(second));
+
+    String firstId = components.registerCoder(first);
+    String secondId = components.registerCoder(second);
+    assertThat(secondId, not(equalTo(firstId)));
+
+    // getCodersOrThrow fails the test if either id is missing.
+    components.toComponents().getCodersOrThrow(firstId);
+    components.toComponents().getCodersOrThrow(secondId);
   }
 
   @Test

Reply via email to