Java-Serialize Coders Directly. Known Coders will be serialized via a known path. Unknown Coders will be serialized as a Java object.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ed0b63a4 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ed0b63a4 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ed0b63a4 Branch: refs/heads/master Commit: ed0b63a4c1c55d58b94ab7574c14d221486f434a Parents: 8cc4d59 Author: Thomas Groh <[email protected]> Authored: Fri Apr 28 18:06:13 2017 -0700 Committer: Thomas Groh <[email protected]> Committed: Fri Apr 28 20:16:21 2017 -0700 ---------------------------------------------------------------------- runners/core-construction-java/pom.xml | 5 ---- .../beam/runners/core/construction/Coders.java | 30 +++++++++++--------- .../runners/core/construction/CodersTest.java | 3 +- 3 files changed, 19 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/ed0b63a4/runners/core-construction-java/pom.xml ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/pom.xml b/runners/core-construction-java/pom.xml index 854fdc1..b38e602 100644 --- a/runners/core-construction-java/pom.xml +++ b/runners/core-construction-java/pom.xml @@ -70,11 +70,6 @@ </dependency> <dependency> - <groupId>com.fasterxml.jackson.core</groupId> - <artifactId>jackson-databind</artifactId> - </dependency> - - <dependency> <groupId>com.google.code.findbugs</groupId> <artifactId>jsr305</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/beam/blob/ed0b63a4/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Coders.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Coders.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Coders.java index 8793df4..094f21f 100644 --- 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Coders.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Coders.java @@ -20,7 +20,6 @@ package org.apache.beam.runners.core.construction; import static com.google.common.base.Preconditions.checkArgument; -import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.BiMap; import com.google.common.collect.ImmutableBiMap; @@ -44,18 +43,15 @@ import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec; import org.apache.beam.sdk.common.runner.v1.RunnerApi.SdkFunctionSpec; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.IntervalWindow.IntervalWindowCoder; -import org.apache.beam.sdk.util.CloudObject; -import org.apache.beam.sdk.util.Serializer; +import org.apache.beam.sdk.util.SerializableUtils; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder; /** Converts to and from Beam Runner API representations of {@link Coder Coders}. 
*/ public class Coders { - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - // This URN says that the coder is just a UDF blob this SDK understands // TODO: standardize such things - public static final String CUSTOM_CODER_URN = "urn:beam:coders:javasdk:0.1"; + public static final String JAVA_SERIALIZED_CODER_URN = "urn:beam:coders:javasdk:0.1"; // The URNs for coders which are shared across languages @VisibleForTesting @@ -71,6 +67,15 @@ public class Coders { .put(FullWindowedValueCoder.class, "urn:beam:coders:windowed_value:0.1") .build(); + public static RunnerApi.MessageWithComponents toProto(Coder<?> coder) throws IOException { + SdkComponents components = SdkComponents.create(); + RunnerApi.Coder coderProto = toProto(coder, components); + return RunnerApi.MessageWithComponents.newBuilder() + .setCoder(coderProto) + .setComponents(components.toComponents()) + .build(); + } + public static RunnerApi.Coder toProto( Coder<?> coder, @SuppressWarnings("unused") SdkComponents components) throws IOException { if (KNOWN_CODER_URNS.containsKey(coder.getClass())) { @@ -108,13 +113,13 @@ public class Coders { SdkFunctionSpec.newBuilder() .setSpec( FunctionSpec.newBuilder() - .setUrn(CUSTOM_CODER_URN) + .setUrn(JAVA_SERIALIZED_CODER_URN) .setParameter( Any.pack( BytesValue.newBuilder() .setValue( ByteString.copyFrom( - OBJECT_MAPPER.writeValueAsBytes(coder.asCloudObject()))) + SerializableUtils.serializeToByteArray(coder))) .build())))) .build(); } @@ -122,7 +127,7 @@ public class Coders { public static Coder<?> fromProto(RunnerApi.Coder protoCoder, Components components) throws IOException { String coderSpecUrn = protoCoder.getSpec().getSpec().getUrn(); - if (coderSpecUrn.equals(CUSTOM_CODER_URN)) { + if (coderSpecUrn.equals(JAVA_SERIALIZED_CODER_URN)) { return fromCustomCoder(protoCoder, components); } return fromKnownCoder(protoCoder, components); @@ -165,8 +170,8 @@ public class Coders { private static Coder<?> fromCustomCoder( RunnerApi.Coder 
protoCoder, @SuppressWarnings("unused") Components components) throws IOException { - CloudObject coderCloudObject = - OBJECT_MAPPER.readValue( + return (Coder<?>) + SerializableUtils.deserializeFromByteArray( protoCoder .getSpec() .getSpec() @@ -174,7 +179,6 @@ public class Coders { .unpack(BytesValue.class) .getValue() .toByteArray(), - CloudObject.class); - return Serializer.deserialize(coderCloudObject, Coder.class); + protoCoder.getSpec().getSpec().getUrn()); } } http://git-wip-us.apache.org/repos/asf/beam/blob/ed0b63a4/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java index ca0fdc9..ecd0fa5 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java @@ -133,7 +133,8 @@ public class CodersTest { if (KNOWN_CODERS.contains(coder)) { for (RunnerApi.Coder encodedCoder : encodedComponents.getCodersMap().values()) { assertThat( - encodedCoder.getSpec().getSpec().getUrn(), not(equalTo(Coders.CUSTOM_CODER_URN))); + encodedCoder.getSpec().getSpec().getUrn(), + not(equalTo(Coders.JAVA_SERIALIZED_CODER_URN))); } } }
