Rename Coders to CoderTranslation

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7e37b703
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7e37b703
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7e37b703

Branch: refs/heads/master
Commit: 7e37b70317bd06f300a5423cc4cb76a06c3955c3
Parents: b35e91d
Author: Kenneth Knowles <[email protected]>
Authored: Tue May 23 15:31:49 2017 -0700
Committer: Kenneth Knowles <[email protected]>
Committed: Tue May 23 15:53:41 2017 -0700

----------------------------------------------------------------------
 .../core/construction/CoderTranslation.java     | 193 +++++++++++++++++++
 .../beam/runners/core/construction/Coders.java  | 193 -------------------
 .../construction/PCollectionTranslation.java    |   3 +-
 .../core/construction/ParDoTranslation.java     |   3 +-
 .../core/construction/SdkComponents.java        |   2 +-
 .../core/construction/CoderTranslationTest.java | 165 ++++++++++++++++
 .../runners/core/construction/CodersTest.java   | 164 ----------------
 7 files changed, 363 insertions(+), 360 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/7e37b703/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CoderTranslation.java
----------------------------------------------------------------------
diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CoderTranslation.java
 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CoderTranslation.java
new file mode 100644
index 0000000..470db6a
--- /dev/null
+++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CoderTranslation.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.construction;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.BiMap;
+import com.google.common.collect.ImmutableBiMap;
+import com.google.common.collect.ImmutableMap;
+import com.google.protobuf.Any;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.BytesValue;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.LengthPrefixCoder;
+import org.apache.beam.sdk.coders.StructuredCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.Components;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.SdkFunctionSpec;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import 
org.apache.beam.sdk.transforms.windowing.IntervalWindow.IntervalWindowCoder;
+import org.apache.beam.sdk.util.SerializableUtils;
+import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
+
+/** Converts to and from Beam Runner API representations of {@link Coder 
Coders}. */
+public class CoderTranslation {
+  // This URN says that the coder is just a UDF blob this SDK understands
+  // TODO: standardize such things
+  public static final String JAVA_SERIALIZED_CODER_URN = 
"urn:beam:coders:javasdk:0.1";
+
+  // The URNs for coders which are shared across languages
+  @VisibleForTesting
+  static final BiMap<Class<? extends StructuredCoder>, String> 
KNOWN_CODER_URNS =
+      ImmutableBiMap.<Class<? extends StructuredCoder>, String>builder()
+          .put(ByteArrayCoder.class, "urn:beam:coders:bytes:0.1")
+          .put(KvCoder.class, "urn:beam:coders:kv:0.1")
+          .put(VarLongCoder.class, "urn:beam:coders:varint:0.1")
+          .put(IntervalWindowCoder.class, 
"urn:beam:coders:interval_window:0.1")
+          .put(IterableCoder.class, "urn:beam:coders:stream:0.1")
+          .put(LengthPrefixCoder.class, "urn:beam:coders:length_prefix:0.1")
+          .put(GlobalWindow.Coder.class, "urn:beam:coders:global_window:0.1")
+          .put(FullWindowedValueCoder.class, 
"urn:beam:coders:windowed_value:0.1")
+          .build();
+
+  @VisibleForTesting
+  static final Map<Class<? extends StructuredCoder>, CoderTranslator<? extends 
StructuredCoder>>
+      KNOWN_TRANSLATORS =
+          ImmutableMap
+              .<Class<? extends StructuredCoder>, CoderTranslator<? extends 
StructuredCoder>>
+                  builder()
+              .put(ByteArrayCoder.class, 
CoderTranslators.atomic(ByteArrayCoder.class))
+              .put(VarLongCoder.class, 
CoderTranslators.atomic(VarLongCoder.class))
+              .put(IntervalWindowCoder.class, 
CoderTranslators.atomic(IntervalWindowCoder.class))
+              .put(GlobalWindow.Coder.class, 
CoderTranslators.atomic(GlobalWindow.Coder.class))
+              .put(KvCoder.class, CoderTranslators.kv())
+              .put(IterableCoder.class, CoderTranslators.iterable())
+              .put(LengthPrefixCoder.class, CoderTranslators.lengthPrefix())
+              .put(FullWindowedValueCoder.class, 
CoderTranslators.fullWindowedValue())
+              .build();
+
+  public static RunnerApi.MessageWithComponents toProto(Coder<?> coder) throws 
IOException {
+    SdkComponents components = SdkComponents.create();
+    RunnerApi.Coder coderProto = toProto(coder, components);
+    return RunnerApi.MessageWithComponents.newBuilder()
+        .setCoder(coderProto)
+        .setComponents(components.toComponents())
+        .build();
+  }
+
+  public static RunnerApi.Coder toProto(
+      Coder<?> coder, @SuppressWarnings("unused") SdkComponents components) 
throws IOException {
+    if (KNOWN_CODER_URNS.containsKey(coder.getClass())) {
+      return toKnownCoder(coder, components);
+    }
+    return toCustomCoder(coder);
+  }
+
+  private static RunnerApi.Coder toKnownCoder(Coder<?> coder, SdkComponents 
components)
+      throws IOException {
+    checkArgument(
+        coder instanceof StructuredCoder,
+        "A Known %s must implement %s, but %s of class %s does not",
+        Coder.class.getSimpleName(),
+        StructuredCoder.class.getSimpleName(),
+        coder,
+        coder.getClass().getName());
+    StructuredCoder<?> stdCoder = (StructuredCoder<?>) coder;
+    CoderTranslator translator = KNOWN_TRANSLATORS.get(stdCoder.getClass());
+    List<String> componentIds = registerComponents(coder, translator, 
components);
+    return RunnerApi.Coder.newBuilder()
+        .addAllComponentCoderIds(componentIds)
+        .setSpec(
+            SdkFunctionSpec.newBuilder()
+                .setSpec(
+                    
FunctionSpec.newBuilder().setUrn(KNOWN_CODER_URNS.get(stdCoder.getClass()))))
+        .build();
+  }
+
+  private static <T extends Coder<?>> List<String> registerComponents(
+      T coder, CoderTranslator<T> translator, SdkComponents components) throws 
IOException {
+    List<String> componentIds = new ArrayList<>();
+    for (Coder<?> component : translator.getComponents(coder)) {
+      componentIds.add(components.registerCoder(component));
+    }
+    return componentIds;
+  }
+
+  private static RunnerApi.Coder toCustomCoder(Coder<?> coder) throws 
IOException {
+    RunnerApi.Coder.Builder coderBuilder = RunnerApi.Coder.newBuilder();
+    return coderBuilder
+        .setSpec(
+            SdkFunctionSpec.newBuilder()
+                .setSpec(
+                    FunctionSpec.newBuilder()
+                        .setUrn(JAVA_SERIALIZED_CODER_URN)
+                        .setParameter(
+                            Any.pack(
+                                BytesValue.newBuilder()
+                                    .setValue(
+                                        ByteString.copyFrom(
+                                            
SerializableUtils.serializeToByteArray(coder)))
+                                    .build()))))
+        .build();
+  }
+
+  public static Coder<?> fromProto(RunnerApi.Coder protoCoder, Components 
components)
+      throws IOException {
+    String coderSpecUrn = protoCoder.getSpec().getSpec().getUrn();
+    if (coderSpecUrn.equals(JAVA_SERIALIZED_CODER_URN)) {
+      return fromCustomCoder(protoCoder, components);
+    }
+    return fromKnownCoder(protoCoder, components);
+  }
+
+  private static Coder<?> fromKnownCoder(RunnerApi.Coder coder, Components 
components)
+      throws IOException {
+    String coderUrn = coder.getSpec().getSpec().getUrn();
+    List<Coder<?>> coderComponents = new LinkedList<>();
+    for (String componentId : coder.getComponentCoderIdsList()) {
+      Coder<?> innerCoder = 
fromProto(components.getCodersOrThrow(componentId), components);
+      coderComponents.add(innerCoder);
+    }
+    Class<? extends StructuredCoder> coderType = 
KNOWN_CODER_URNS.inverse().get(coderUrn);
+    CoderTranslator<?> translator = KNOWN_TRANSLATORS.get(coderType);
+    checkArgument(
+        translator != null,
+        "Unknown Coder URN %s. Known URNs: %s",
+        coderUrn,
+        KNOWN_CODER_URNS.values());
+    return translator.fromComponents(coderComponents);
+  }
+
+  private static Coder<?> fromCustomCoder(
+      RunnerApi.Coder protoCoder, @SuppressWarnings("unused") Components 
components)
+      throws IOException {
+    return (Coder<?>)
+        SerializableUtils.deserializeFromByteArray(
+            protoCoder
+                .getSpec()
+                .getSpec()
+                .getParameter()
+                .unpack(BytesValue.class)
+                .getValue()
+                .toByteArray(),
+            "Custom Coder Bytes");
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/7e37b703/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Coders.java
----------------------------------------------------------------------
diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Coders.java
 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Coders.java
deleted file mode 100644
index 6c2caa9..0000000
--- 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Coders.java
+++ /dev/null
@@ -1,193 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.core.construction;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.BiMap;
-import com.google.common.collect.ImmutableBiMap;
-import com.google.common.collect.ImmutableMap;
-import com.google.protobuf.Any;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.BytesValue;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import org.apache.beam.sdk.coders.ByteArrayCoder;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.IterableCoder;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.LengthPrefixCoder;
-import org.apache.beam.sdk.coders.StructuredCoder;
-import org.apache.beam.sdk.coders.VarLongCoder;
-import org.apache.beam.sdk.common.runner.v1.RunnerApi;
-import org.apache.beam.sdk.common.runner.v1.RunnerApi.Components;
-import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec;
-import org.apache.beam.sdk.common.runner.v1.RunnerApi.SdkFunctionSpec;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import 
org.apache.beam.sdk.transforms.windowing.IntervalWindow.IntervalWindowCoder;
-import org.apache.beam.sdk.util.SerializableUtils;
-import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
-
-/** Converts to and from Beam Runner API representations of {@link Coder 
Coders}. */
-public class Coders {
-  // This URN says that the coder is just a UDF blob this SDK understands
-  // TODO: standardize such things
-  public static final String JAVA_SERIALIZED_CODER_URN = 
"urn:beam:coders:javasdk:0.1";
-
-  // The URNs for coders which are shared across languages
-  @VisibleForTesting
-  static final BiMap<Class<? extends StructuredCoder>, String> 
KNOWN_CODER_URNS =
-      ImmutableBiMap.<Class<? extends StructuredCoder>, String>builder()
-          .put(ByteArrayCoder.class, "urn:beam:coders:bytes:0.1")
-          .put(KvCoder.class, "urn:beam:coders:kv:0.1")
-          .put(VarLongCoder.class, "urn:beam:coders:varint:0.1")
-          .put(IntervalWindowCoder.class, 
"urn:beam:coders:interval_window:0.1")
-          .put(IterableCoder.class, "urn:beam:coders:stream:0.1")
-          .put(LengthPrefixCoder.class, "urn:beam:coders:length_prefix:0.1")
-          .put(GlobalWindow.Coder.class, "urn:beam:coders:global_window:0.1")
-          .put(FullWindowedValueCoder.class, 
"urn:beam:coders:windowed_value:0.1")
-          .build();
-
-  @VisibleForTesting
-  static final Map<Class<? extends StructuredCoder>, CoderTranslator<? extends 
StructuredCoder>>
-      KNOWN_TRANSLATORS =
-          ImmutableMap
-              .<Class<? extends StructuredCoder>, CoderTranslator<? extends 
StructuredCoder>>
-                  builder()
-              .put(ByteArrayCoder.class, 
CoderTranslators.atomic(ByteArrayCoder.class))
-              .put(VarLongCoder.class, 
CoderTranslators.atomic(VarLongCoder.class))
-              .put(IntervalWindowCoder.class, 
CoderTranslators.atomic(IntervalWindowCoder.class))
-              .put(GlobalWindow.Coder.class, 
CoderTranslators.atomic(GlobalWindow.Coder.class))
-              .put(KvCoder.class, CoderTranslators.kv())
-              .put(IterableCoder.class, CoderTranslators.iterable())
-              .put(LengthPrefixCoder.class, CoderTranslators.lengthPrefix())
-              .put(FullWindowedValueCoder.class, 
CoderTranslators.fullWindowedValue())
-              .build();
-
-  public static RunnerApi.MessageWithComponents toProto(Coder<?> coder) throws 
IOException {
-    SdkComponents components = SdkComponents.create();
-    RunnerApi.Coder coderProto = toProto(coder, components);
-    return RunnerApi.MessageWithComponents.newBuilder()
-        .setCoder(coderProto)
-        .setComponents(components.toComponents())
-        .build();
-  }
-
-  public static RunnerApi.Coder toProto(
-      Coder<?> coder, @SuppressWarnings("unused") SdkComponents components) 
throws IOException {
-    if (KNOWN_CODER_URNS.containsKey(coder.getClass())) {
-      return toKnownCoder(coder, components);
-    }
-    return toCustomCoder(coder);
-  }
-
-  private static RunnerApi.Coder toKnownCoder(Coder<?> coder, SdkComponents 
components)
-      throws IOException {
-    checkArgument(
-        coder instanceof StructuredCoder,
-        "A Known %s must implement %s, but %s of class %s does not",
-        Coder.class.getSimpleName(),
-        StructuredCoder.class.getSimpleName(),
-        coder,
-        coder.getClass().getName());
-    StructuredCoder<?> stdCoder = (StructuredCoder<?>) coder;
-    CoderTranslator translator = KNOWN_TRANSLATORS.get(stdCoder.getClass());
-    List<String> componentIds = registerComponents(coder, translator, 
components);
-    return RunnerApi.Coder.newBuilder()
-        .addAllComponentCoderIds(componentIds)
-        .setSpec(
-            SdkFunctionSpec.newBuilder()
-                .setSpec(
-                    
FunctionSpec.newBuilder().setUrn(KNOWN_CODER_URNS.get(stdCoder.getClass()))))
-        .build();
-  }
-
-  private static <T extends Coder<?>> List<String> registerComponents(
-      T coder, CoderTranslator<T> translator, SdkComponents components) throws 
IOException {
-    List<String> componentIds = new ArrayList<>();
-    for (Coder<?> component : translator.getComponents(coder)) {
-      componentIds.add(components.registerCoder(component));
-    }
-    return componentIds;
-  }
-
-  private static RunnerApi.Coder toCustomCoder(Coder<?> coder) throws 
IOException {
-    RunnerApi.Coder.Builder coderBuilder = RunnerApi.Coder.newBuilder();
-    return coderBuilder
-        .setSpec(
-            SdkFunctionSpec.newBuilder()
-                .setSpec(
-                    FunctionSpec.newBuilder()
-                        .setUrn(JAVA_SERIALIZED_CODER_URN)
-                        .setParameter(
-                            Any.pack(
-                                BytesValue.newBuilder()
-                                    .setValue(
-                                        ByteString.copyFrom(
-                                            
SerializableUtils.serializeToByteArray(coder)))
-                                    .build()))))
-        .build();
-  }
-
-  public static Coder<?> fromProto(RunnerApi.Coder protoCoder, Components 
components)
-      throws IOException {
-    String coderSpecUrn = protoCoder.getSpec().getSpec().getUrn();
-    if (coderSpecUrn.equals(JAVA_SERIALIZED_CODER_URN)) {
-      return fromCustomCoder(protoCoder, components);
-    }
-    return fromKnownCoder(protoCoder, components);
-  }
-
-  private static Coder<?> fromKnownCoder(RunnerApi.Coder coder, Components 
components)
-      throws IOException {
-    String coderUrn = coder.getSpec().getSpec().getUrn();
-    List<Coder<?>> coderComponents = new LinkedList<>();
-    for (String componentId : coder.getComponentCoderIdsList()) {
-      Coder<?> innerCoder = 
fromProto(components.getCodersOrThrow(componentId), components);
-      coderComponents.add(innerCoder);
-    }
-    Class<? extends StructuredCoder> coderType = 
KNOWN_CODER_URNS.inverse().get(coderUrn);
-    CoderTranslator<?> translator = KNOWN_TRANSLATORS.get(coderType);
-    checkArgument(
-        translator != null,
-        "Unknown Coder URN %s. Known URNs: %s",
-        coderUrn,
-        KNOWN_CODER_URNS.values());
-    return translator.fromComponents(coderComponents);
-  }
-
-  private static Coder<?> fromCustomCoder(
-      RunnerApi.Coder protoCoder, @SuppressWarnings("unused") Components 
components)
-      throws IOException {
-    return (Coder<?>)
-        SerializableUtils.deserializeFromByteArray(
-            protoCoder
-                .getSpec()
-                .getSpec()
-                .getParameter()
-                .unpack(BytesValue.class)
-                .getValue()
-                .toByteArray(),
-            "Custom Coder Bytes");
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/7e37b703/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionTranslation.java
----------------------------------------------------------------------
diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionTranslation.java
 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionTranslation.java
index cad7b97..46f714e 100644
--- 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionTranslation.java
+++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionTranslation.java
@@ -53,7 +53,8 @@ public class PCollectionTranslation {
 
   public static Coder<?> getCoder(
       RunnerApi.PCollection pCollection, RunnerApi.Components components) 
throws IOException {
-    return 
Coders.fromProto(components.getCodersOrThrow(pCollection.getCoderId()), 
components);
+    return CoderTranslation
+        .fromProto(components.getCodersOrThrow(pCollection.getCoderId()), 
components);
   }
 
   public static WindowingStrategy<?, ?> getWindowingStrategy(

http://git-wip-us.apache.org/repos/asf/beam/blob/7e37b703/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
----------------------------------------------------------------------
diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
index baed246..bc5bb0e 100644
--- 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
+++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
@@ -269,7 +269,8 @@ public class ParDoTranslation {
             
components.getWindowingStrategiesOrThrow(inputCollection.getWindowingStrategyId()),
             components);
     Coder<?> elemCoder =
-        
Coders.fromProto(components.getCodersOrThrow(inputCollection.getCoderId()), 
components);
+        CoderTranslation
+            
.fromProto(components.getCodersOrThrow(inputCollection.getCoderId()), 
components);
     Coder<Iterable<WindowedValue<?>>> coder =
         (Coder)
             IterableCoder.of(

http://git-wip-us.apache.org/repos/asf/beam/blob/7e37b703/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java
----------------------------------------------------------------------
diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java
 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java
index da22982..5c81875 100644
--- 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java
+++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java
@@ -222,7 +222,7 @@ class SdkComponents {
     String baseName = NameUtils.approximateSimpleName(coder);
     String name = uniqify(baseName, coderIds.values());
     coderIds.put(Equivalence.identity().wrap(coder), name);
-    RunnerApi.Coder coderProto = Coders.toProto(coder, this);
+    RunnerApi.Coder coderProto = CoderTranslation.toProto(coder, this);
     componentsBuilder.putCoders(name, coderProto);
     return name;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/7e37b703/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CoderTranslationTest.java
----------------------------------------------------------------------
diff --git 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CoderTranslationTest.java
 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CoderTranslationTest.java
new file mode 100644
index 0000000..39549d0
--- /dev/null
+++ 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CoderTranslationTest.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.construction;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.not;
+import static org.junit.Assert.assertThat;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.AvroCoder;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.LengthPrefixCoder;
+import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.StructuredCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.Components;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import 
org.apache.beam.sdk.transforms.windowing.IntervalWindow.IntervalWindowCoder;
+import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
+import org.hamcrest.Matchers;
+import org.junit.Test;
+import org.junit.experimental.runners.Enclosed;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+
+/** Tests for {@link CoderTranslation}. */
+@RunWith(Enclosed.class)
+public class CoderTranslationTest {
+  private static final Set<StructuredCoder<?>> KNOWN_CODERS =
+      ImmutableSet.<StructuredCoder<?>>builder()
+          .add(ByteArrayCoder.of())
+          .add(KvCoder.of(VarLongCoder.of(), VarLongCoder.of()))
+          .add(VarLongCoder.of())
+          .add(IntervalWindowCoder.of())
+          .add(IterableCoder.of(ByteArrayCoder.of()))
+          .add(LengthPrefixCoder.of(IterableCoder.of(VarLongCoder.of())))
+          .add(GlobalWindow.Coder.INSTANCE)
+          .add(
+              FullWindowedValueCoder.of(
+                  IterableCoder.of(VarLongCoder.of()), 
IntervalWindowCoder.of()))
+          .build();
+
+  /**
+   * Tests that all known coders are present in the parameters that will be 
used by
+   * {@link ToFromProtoTest}.
+   */
+  @RunWith(JUnit4.class)
+  public static class ValidateKnownCodersPresentTest {
+    @Test
+    public void validateKnownCoders() {
+      // Validates that every known coder in the Coders class is represented 
in a "Known Coder"
+      // tests, which demonstrates that they are serialized via components and 
specified URNs rather
+      // than java serialized
+      Set<Class<? extends StructuredCoder>> knownCoderClasses =
+          CoderTranslation.KNOWN_CODER_URNS.keySet();
+      Set<Class<? extends StructuredCoder>> knownCoderTests = new HashSet<>();
+      for (StructuredCoder<?> coder : KNOWN_CODERS) {
+        knownCoderTests.add(coder.getClass());
+      }
+      Set<Class<? extends StructuredCoder>> missingKnownCoders = new 
HashSet<>(knownCoderClasses);
+      missingKnownCoders.removeAll(knownCoderTests);
+      assertThat(
+          String.format(
+              "Missing validation of known coder %s in %s",
+              missingKnownCoders, CoderTranslationTest.class.getSimpleName()),
+          missingKnownCoders,
+          Matchers.empty());
+    }
+
+    @Test
+    public void validateCoderTranslators() {
+      assertThat(
+          "Every Known Coder must have a Known Translator",
+          CoderTranslation.KNOWN_CODER_URNS.keySet(),
+          equalTo(CoderTranslation.KNOWN_TRANSLATORS.keySet()));
+    }
+  }
+
+
+  /**
+   * Tests round-trip coder encodings for both known and unknown {@link Coder 
coders}.
+   */
+  @RunWith(Parameterized.class)
+  public static class ToFromProtoTest {
+    @Parameters(name = "{index}: {0}")
+    public static Iterable<Coder<?>> data() {
+      return ImmutableList.<Coder<?>>builder()
+          .addAll(KNOWN_CODERS)
+          .add(
+              StringUtf8Coder.of(),
+              SerializableCoder.of(Record.class),
+              new RecordCoder(),
+              KvCoder.of(new RecordCoder(), AvroCoder.of(Record.class)))
+          .build();
+    }
+
+    @Parameter(0)
+    public Coder<?> coder;
+
+    @Test
+    public void toAndFromProto() throws Exception {
+      SdkComponents componentsBuilder = SdkComponents.create();
+      RunnerApi.Coder coderProto = CoderTranslation.toProto(coder, 
componentsBuilder);
+
+      Components encodedComponents = componentsBuilder.toComponents();
+      Coder<?> decodedCoder = CoderTranslation.fromProto(coderProto, 
encodedComponents);
+      assertThat(decodedCoder, Matchers.<Coder<?>>equalTo(coder));
+
+      if (KNOWN_CODERS.contains(coder)) {
+        for (RunnerApi.Coder encodedCoder : 
encodedComponents.getCodersMap().values()) {
+          assertThat(
+              encodedCoder.getSpec().getSpec().getUrn(),
+              not(equalTo(CoderTranslation.JAVA_SERIALIZED_CODER_URN)));
+        }
+      }
+    }
+
+    static class Record implements Serializable {}
+
+    private static class RecordCoder extends AtomicCoder<Record> {
+      @Override
+      public void encode(Record value, OutputStream outStream)
+          throws CoderException, IOException {}
+
+      @Override
+      public Record decode(InputStream inStream)
+          throws CoderException, IOException {
+        return new Record();
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/7e37b703/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java
----------------------------------------------------------------------
diff --git 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java
 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java
deleted file mode 100644
index 42fba7c..0000000
--- 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.core.construction;
-
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.not;
-import static org.junit.Assert.assertThat;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.Serializable;
-import java.util.HashSet;
-import java.util.Set;
-import org.apache.beam.sdk.coders.AtomicCoder;
-import org.apache.beam.sdk.coders.AvroCoder;
-import org.apache.beam.sdk.coders.ByteArrayCoder;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.IterableCoder;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.LengthPrefixCoder;
-import org.apache.beam.sdk.coders.SerializableCoder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.coders.StructuredCoder;
-import org.apache.beam.sdk.coders.VarLongCoder;
-import org.apache.beam.sdk.common.runner.v1.RunnerApi;
-import org.apache.beam.sdk.common.runner.v1.RunnerApi.Components;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import 
org.apache.beam.sdk.transforms.windowing.IntervalWindow.IntervalWindowCoder;
-import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
-import org.hamcrest.Matchers;
-import org.junit.Test;
-import org.junit.experimental.runners.Enclosed;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameter;
-import org.junit.runners.Parameterized.Parameters;
-
-/** Tests for {@link Coders}. */
-@RunWith(Enclosed.class)
-public class CodersTest {
-  private static final Set<StructuredCoder<?>> KNOWN_CODERS =
-      ImmutableSet.<StructuredCoder<?>>builder()
-          .add(ByteArrayCoder.of())
-          .add(KvCoder.of(VarLongCoder.of(), VarLongCoder.of()))
-          .add(VarLongCoder.of())
-          .add(IntervalWindowCoder.of())
-          .add(IterableCoder.of(ByteArrayCoder.of()))
-          .add(LengthPrefixCoder.of(IterableCoder.of(VarLongCoder.of())))
-          .add(GlobalWindow.Coder.INSTANCE)
-          .add(
-              FullWindowedValueCoder.of(
-                  IterableCoder.of(VarLongCoder.of()), 
IntervalWindowCoder.of()))
-          .build();
-
-  /**
-   * Tests that all known coders are present in the parameters that will be 
used by
-   * {@link ToFromProtoTest}.
-   */
-  @RunWith(JUnit4.class)
-  public static class ValidateKnownCodersPresentTest {
-    @Test
-    public void validateKnownCoders() {
-      // Validates that every known coder in the Coders class is represented 
in a "Known Coder"
-      // tests, which demonstrates that they are serialized via components and 
specified URNs rather
-      // than java serialized
-      Set<Class<? extends StructuredCoder>> knownCoderClasses = 
Coders.KNOWN_CODER_URNS.keySet();
-      Set<Class<? extends StructuredCoder>> knownCoderTests = new HashSet<>();
-      for (StructuredCoder<?> coder : KNOWN_CODERS) {
-        knownCoderTests.add(coder.getClass());
-      }
-      Set<Class<? extends StructuredCoder>> missingKnownCoders = new 
HashSet<>(knownCoderClasses);
-      missingKnownCoders.removeAll(knownCoderTests);
-      assertThat(
-          String.format(
-              "Missing validation of known coder %s in %s",
-              missingKnownCoders, CodersTest.class.getSimpleName()),
-          missingKnownCoders,
-          Matchers.empty());
-    }
-
-    @Test
-    public void validateCoderTranslators() {
-      assertThat(
-          "Every Known Coder must have a Known Translator",
-          Coders.KNOWN_CODER_URNS.keySet(),
-          equalTo(Coders.KNOWN_TRANSLATORS.keySet()));
-    }
-  }
-
-
-  /**
-   * Tests round-trip coder encodings for both known and unknown {@link Coder 
coders}.
-   */
-  @RunWith(Parameterized.class)
-  public static class ToFromProtoTest {
-    @Parameters(name = "{index}: {0}")
-    public static Iterable<Coder<?>> data() {
-      return ImmutableList.<Coder<?>>builder()
-          .addAll(KNOWN_CODERS)
-          .add(
-              StringUtf8Coder.of(),
-              SerializableCoder.of(Record.class),
-              new RecordCoder(),
-              KvCoder.of(new RecordCoder(), AvroCoder.of(Record.class)))
-          .build();
-    }
-
-    @Parameter(0)
-    public Coder<?> coder;
-
-    @Test
-    public void toAndFromProto() throws Exception {
-      SdkComponents componentsBuilder = SdkComponents.create();
-      RunnerApi.Coder coderProto = Coders.toProto(coder, componentsBuilder);
-
-      Components encodedComponents = componentsBuilder.toComponents();
-      Coder<?> decodedCoder = Coders.fromProto(coderProto, encodedComponents);
-      assertThat(decodedCoder, Matchers.<Coder<?>>equalTo(coder));
-
-      if (KNOWN_CODERS.contains(coder)) {
-        for (RunnerApi.Coder encodedCoder : 
encodedComponents.getCodersMap().values()) {
-          assertThat(
-              encodedCoder.getSpec().getSpec().getUrn(),
-              not(equalTo(Coders.JAVA_SERIALIZED_CODER_URN)));
-        }
-      }
-    }
-
-    static class Record implements Serializable {}
-
-    private static class RecordCoder extends AtomicCoder<Record> {
-      @Override
-      public void encode(Record value, OutputStream outStream)
-          throws CoderException, IOException {}
-
-      @Override
-      public Record decode(InputStream inStream)
-          throws CoderException, IOException {
-        return new Record();
-      }
-    }
-  }
-}

Reply via email to