Rename Coders to CoderTranslation
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7e37b703 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7e37b703 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7e37b703 Branch: refs/heads/master Commit: 7e37b70317bd06f300a5423cc4cb76a06c3955c3 Parents: b35e91d Author: Kenneth Knowles <[email protected]> Authored: Tue May 23 15:31:49 2017 -0700 Committer: Kenneth Knowles <[email protected]> Committed: Tue May 23 15:53:41 2017 -0700 ---------------------------------------------------------------------- .../core/construction/CoderTranslation.java | 193 +++++++++++++++++++ .../beam/runners/core/construction/Coders.java | 193 ------------------- .../construction/PCollectionTranslation.java | 3 +- .../core/construction/ParDoTranslation.java | 3 +- .../core/construction/SdkComponents.java | 2 +- .../core/construction/CoderTranslationTest.java | 165 ++++++++++++++++ .../runners/core/construction/CodersTest.java | 164 ---------------- 7 files changed, 363 insertions(+), 360 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/7e37b703/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CoderTranslation.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CoderTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CoderTranslation.java new file mode 100644 index 0000000..470db6a --- /dev/null +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CoderTranslation.java @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.core.construction; + +import static com.google.common.base.Preconditions.checkArgument; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.BiMap; +import com.google.common.collect.ImmutableBiMap; +import com.google.common.collect.ImmutableMap; +import com.google.protobuf.Any; +import com.google.protobuf.ByteString; +import com.google.protobuf.BytesValue; +import java.io.IOException; +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import org.apache.beam.sdk.coders.ByteArrayCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.IterableCoder; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.LengthPrefixCoder; +import org.apache.beam.sdk.coders.StructuredCoder; +import org.apache.beam.sdk.coders.VarLongCoder; +import org.apache.beam.sdk.common.runner.v1.RunnerApi; +import org.apache.beam.sdk.common.runner.v1.RunnerApi.Components; +import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec; +import org.apache.beam.sdk.common.runner.v1.RunnerApi.SdkFunctionSpec; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.transforms.windowing.IntervalWindow.IntervalWindowCoder; +import org.apache.beam.sdk.util.SerializableUtils; +import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder; + +/** Converts to and from Beam Runner API representations of {@link Coder Coders}. */ +public class CoderTranslation { + // This URN says that the coder is just a UDF blob this SDK understands + // TODO: standardize such things + public static final String JAVA_SERIALIZED_CODER_URN = "urn:beam:coders:javasdk:0.1"; + + // The URNs for coders which are shared across languages + @VisibleForTesting + static final BiMap<Class<? extends StructuredCoder>, String> KNOWN_CODER_URNS = + ImmutableBiMap.<Class<? extends StructuredCoder>, String>builder() + .put(ByteArrayCoder.class, "urn:beam:coders:bytes:0.1") + .put(KvCoder.class, "urn:beam:coders:kv:0.1") + .put(VarLongCoder.class, "urn:beam:coders:varint:0.1") + .put(IntervalWindowCoder.class, "urn:beam:coders:interval_window:0.1") + .put(IterableCoder.class, "urn:beam:coders:stream:0.1") + .put(LengthPrefixCoder.class, "urn:beam:coders:length_prefix:0.1") + .put(GlobalWindow.Coder.class, "urn:beam:coders:global_window:0.1") + .put(FullWindowedValueCoder.class, "urn:beam:coders:windowed_value:0.1") + .build(); + + @VisibleForTesting + static final Map<Class<? extends StructuredCoder>, CoderTranslator<? extends StructuredCoder>> + KNOWN_TRANSLATORS = + ImmutableMap + .<Class<? extends StructuredCoder>, CoderTranslator<? extends StructuredCoder>> + builder() + .put(ByteArrayCoder.class, CoderTranslators.atomic(ByteArrayCoder.class)) + .put(VarLongCoder.class, CoderTranslators.atomic(VarLongCoder.class)) + .put(IntervalWindowCoder.class, CoderTranslators.atomic(IntervalWindowCoder.class)) + .put(GlobalWindow.Coder.class, CoderTranslators.atomic(GlobalWindow.Coder.class)) + .put(KvCoder.class, CoderTranslators.kv()) + .put(IterableCoder.class, CoderTranslators.iterable()) + .put(LengthPrefixCoder.class, CoderTranslators.lengthPrefix()) + .put(FullWindowedValueCoder.class, CoderTranslators.fullWindowedValue()) + .build(); + + public static RunnerApi.MessageWithComponents toProto(Coder<?> coder) throws IOException { + SdkComponents components = SdkComponents.create(); + RunnerApi.Coder coderProto = toProto(coder, components); + return RunnerApi.MessageWithComponents.newBuilder() + .setCoder(coderProto) + .setComponents(components.toComponents()) + .build(); + } + + public static RunnerApi.Coder toProto( + Coder<?> coder, @SuppressWarnings("unused") SdkComponents components) throws IOException { + if (KNOWN_CODER_URNS.containsKey(coder.getClass())) { + return toKnownCoder(coder, components); + } + return toCustomCoder(coder); + } + + private static RunnerApi.Coder toKnownCoder(Coder<?> coder, SdkComponents components) + throws IOException { + checkArgument( + coder instanceof StructuredCoder, + "A Known %s must implement %s, but %s of class %s does not", + Coder.class.getSimpleName(), + StructuredCoder.class.getSimpleName(), + coder, + coder.getClass().getName()); + StructuredCoder<?> stdCoder = (StructuredCoder<?>) coder; + CoderTranslator translator = KNOWN_TRANSLATORS.get(stdCoder.getClass()); + List<String> componentIds = registerComponents(coder, translator, components); + return RunnerApi.Coder.newBuilder() + .addAllComponentCoderIds(componentIds) + .setSpec( + SdkFunctionSpec.newBuilder() + .setSpec( + FunctionSpec.newBuilder().setUrn(KNOWN_CODER_URNS.get(stdCoder.getClass())))) + .build(); + } + + private static <T extends Coder<?>> List<String> registerComponents( + T coder, CoderTranslator<T> translator, SdkComponents components) throws IOException { + List<String> componentIds = new ArrayList<>(); + for (Coder<?> component : translator.getComponents(coder)) { + componentIds.add(components.registerCoder(component)); + } + return componentIds; + } + + private static RunnerApi.Coder toCustomCoder(Coder<?> coder) throws IOException { + RunnerApi.Coder.Builder coderBuilder = RunnerApi.Coder.newBuilder(); + return coderBuilder + .setSpec( + SdkFunctionSpec.newBuilder() + .setSpec( + FunctionSpec.newBuilder() + .setUrn(JAVA_SERIALIZED_CODER_URN) + .setParameter( + Any.pack( + BytesValue.newBuilder() + .setValue( + ByteString.copyFrom( + SerializableUtils.serializeToByteArray(coder))) + .build())))) + .build(); + } + + public static Coder<?> fromProto(RunnerApi.Coder protoCoder, Components components) + throws IOException { + String coderSpecUrn = protoCoder.getSpec().getSpec().getUrn(); + if (coderSpecUrn.equals(JAVA_SERIALIZED_CODER_URN)) { + return fromCustomCoder(protoCoder, components); + } + return fromKnownCoder(protoCoder, components); + } + + private static Coder<?> fromKnownCoder(RunnerApi.Coder coder, Components components) + throws IOException { + String coderUrn = coder.getSpec().getSpec().getUrn(); + List<Coder<?>> coderComponents = new LinkedList<>(); + for (String componentId : coder.getComponentCoderIdsList()) { + Coder<?> innerCoder = fromProto(components.getCodersOrThrow(componentId), components); + coderComponents.add(innerCoder); + } + Class<? extends StructuredCoder> coderType = KNOWN_CODER_URNS.inverse().get(coderUrn); + CoderTranslator<?> translator = KNOWN_TRANSLATORS.get(coderType); + checkArgument( + translator != null, + "Unknown Coder URN %s. Known URNs: %s", + coderUrn, + KNOWN_CODER_URNS.values()); + return translator.fromComponents(coderComponents); + } + + private static Coder<?> fromCustomCoder( + RunnerApi.Coder protoCoder, @SuppressWarnings("unused") Components components) + throws IOException { + return (Coder<?>) + SerializableUtils.deserializeFromByteArray( + protoCoder + .getSpec() + .getSpec() + .getParameter() + .unpack(BytesValue.class) + .getValue() + .toByteArray(), + "Custom Coder Bytes"); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/7e37b703/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Coders.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Coders.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Coders.java deleted file mode 100644 index 6c2caa9..0000000 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Coders.java +++ /dev/null @@ -1,193 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.runners.core.construction; - -import static com.google.common.base.Preconditions.checkArgument; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.BiMap; -import com.google.common.collect.ImmutableBiMap; -import com.google.common.collect.ImmutableMap; -import com.google.protobuf.Any; -import com.google.protobuf.ByteString; -import com.google.protobuf.BytesValue; -import java.io.IOException; -import java.util.ArrayList; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import org.apache.beam.sdk.coders.ByteArrayCoder; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.IterableCoder; -import org.apache.beam.sdk.coders.KvCoder; -import org.apache.beam.sdk.coders.LengthPrefixCoder; -import org.apache.beam.sdk.coders.StructuredCoder; -import org.apache.beam.sdk.coders.VarLongCoder; -import org.apache.beam.sdk.common.runner.v1.RunnerApi; -import org.apache.beam.sdk.common.runner.v1.RunnerApi.Components; -import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec; -import org.apache.beam.sdk.common.runner.v1.RunnerApi.SdkFunctionSpec; -import org.apache.beam.sdk.transforms.windowing.GlobalWindow; -import org.apache.beam.sdk.transforms.windowing.IntervalWindow.IntervalWindowCoder; -import org.apache.beam.sdk.util.SerializableUtils; -import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder; - -/** Converts to and from Beam Runner API representations of {@link Coder Coders}. */ -public class Coders { - // This URN says that the coder is just a UDF blob this SDK understands - // TODO: standardize such things - public static final String JAVA_SERIALIZED_CODER_URN = "urn:beam:coders:javasdk:0.1"; - - // The URNs for coders which are shared across languages - @VisibleForTesting - static final BiMap<Class<? extends StructuredCoder>, String> KNOWN_CODER_URNS = - ImmutableBiMap.<Class<? extends StructuredCoder>, String>builder() - .put(ByteArrayCoder.class, "urn:beam:coders:bytes:0.1") - .put(KvCoder.class, "urn:beam:coders:kv:0.1") - .put(VarLongCoder.class, "urn:beam:coders:varint:0.1") - .put(IntervalWindowCoder.class, "urn:beam:coders:interval_window:0.1") - .put(IterableCoder.class, "urn:beam:coders:stream:0.1") - .put(LengthPrefixCoder.class, "urn:beam:coders:length_prefix:0.1") - .put(GlobalWindow.Coder.class, "urn:beam:coders:global_window:0.1") - .put(FullWindowedValueCoder.class, "urn:beam:coders:windowed_value:0.1") - .build(); - - @VisibleForTesting - static final Map<Class<? extends StructuredCoder>, CoderTranslator<? extends StructuredCoder>> - KNOWN_TRANSLATORS = - ImmutableMap - .<Class<? extends StructuredCoder>, CoderTranslator<? extends StructuredCoder>> - builder() - .put(ByteArrayCoder.class, CoderTranslators.atomic(ByteArrayCoder.class)) - .put(VarLongCoder.class, CoderTranslators.atomic(VarLongCoder.class)) - .put(IntervalWindowCoder.class, CoderTranslators.atomic(IntervalWindowCoder.class)) - .put(GlobalWindow.Coder.class, CoderTranslators.atomic(GlobalWindow.Coder.class)) - .put(KvCoder.class, CoderTranslators.kv()) - .put(IterableCoder.class, CoderTranslators.iterable()) - .put(LengthPrefixCoder.class, CoderTranslators.lengthPrefix()) - .put(FullWindowedValueCoder.class, CoderTranslators.fullWindowedValue()) - .build(); - - public static RunnerApi.MessageWithComponents toProto(Coder<?> coder) throws IOException { - SdkComponents components = SdkComponents.create(); - RunnerApi.Coder coderProto = toProto(coder, components); - return RunnerApi.MessageWithComponents.newBuilder() - .setCoder(coderProto) - .setComponents(components.toComponents()) - .build(); - } - - public static RunnerApi.Coder toProto( - Coder<?> coder, @SuppressWarnings("unused") SdkComponents components) throws IOException { - if (KNOWN_CODER_URNS.containsKey(coder.getClass())) { - return toKnownCoder(coder, components); - } - return toCustomCoder(coder); - } - - private static RunnerApi.Coder toKnownCoder(Coder<?> coder, SdkComponents components) - throws IOException { - checkArgument( - coder instanceof StructuredCoder, - "A Known %s must implement %s, but %s of class %s does not", - Coder.class.getSimpleName(), - StructuredCoder.class.getSimpleName(), - coder, - coder.getClass().getName()); - StructuredCoder<?> stdCoder = (StructuredCoder<?>) coder; - CoderTranslator translator = KNOWN_TRANSLATORS.get(stdCoder.getClass()); - List<String> componentIds = registerComponents(coder, translator, components); - return RunnerApi.Coder.newBuilder() - .addAllComponentCoderIds(componentIds) - .setSpec( - SdkFunctionSpec.newBuilder() - .setSpec( - FunctionSpec.newBuilder().setUrn(KNOWN_CODER_URNS.get(stdCoder.getClass())))) - .build(); - } - - private static <T extends Coder<?>> List<String> registerComponents( - T coder, CoderTranslator<T> translator, SdkComponents components) throws IOException { - List<String> componentIds = new ArrayList<>(); - for (Coder<?> component : translator.getComponents(coder)) { - componentIds.add(components.registerCoder(component)); - } - return componentIds; - } - - private static RunnerApi.Coder toCustomCoder(Coder<?> coder) throws IOException { - RunnerApi.Coder.Builder coderBuilder = RunnerApi.Coder.newBuilder(); - return coderBuilder - .setSpec( - SdkFunctionSpec.newBuilder() - .setSpec( - FunctionSpec.newBuilder() - .setUrn(JAVA_SERIALIZED_CODER_URN) - .setParameter( - Any.pack( - BytesValue.newBuilder() - .setValue( - ByteString.copyFrom( - SerializableUtils.serializeToByteArray(coder))) - .build())))) - .build(); - } - - public static Coder<?> fromProto(RunnerApi.Coder protoCoder, Components components) - throws IOException { - String coderSpecUrn = protoCoder.getSpec().getSpec().getUrn(); - if (coderSpecUrn.equals(JAVA_SERIALIZED_CODER_URN)) { - return fromCustomCoder(protoCoder, components); - } - return fromKnownCoder(protoCoder, components); - } - - private static Coder<?> fromKnownCoder(RunnerApi.Coder coder, Components components) - throws IOException { - String coderUrn = coder.getSpec().getSpec().getUrn(); - List<Coder<?>> coderComponents = new LinkedList<>(); - for (String componentId : coder.getComponentCoderIdsList()) { - Coder<?> innerCoder = fromProto(components.getCodersOrThrow(componentId), components); - coderComponents.add(innerCoder); - } - Class<? extends StructuredCoder> coderType = KNOWN_CODER_URNS.inverse().get(coderUrn); - CoderTranslator<?> translator = KNOWN_TRANSLATORS.get(coderType); - checkArgument( - translator != null, - "Unknown Coder URN %s. Known URNs: %s", - coderUrn, - KNOWN_CODER_URNS.values()); - return translator.fromComponents(coderComponents); - } - - private static Coder<?> fromCustomCoder( - RunnerApi.Coder protoCoder, @SuppressWarnings("unused") Components components) - throws IOException { - return (Coder<?>) - SerializableUtils.deserializeFromByteArray( - protoCoder - .getSpec() - .getSpec() - .getParameter() - .unpack(BytesValue.class) - .getValue() - .toByteArray(), - "Custom Coder Bytes"); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/7e37b703/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionTranslation.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionTranslation.java index cad7b97..46f714e 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionTranslation.java @@ -53,7 +53,8 @@ public class PCollectionTranslation { public static Coder<?> getCoder( RunnerApi.PCollection pCollection, RunnerApi.Components components) throws IOException { - return Coders.fromProto(components.getCodersOrThrow(pCollection.getCoderId()), components); + return CoderTranslation + .fromProto(components.getCodersOrThrow(pCollection.getCoderId()), components); } public static WindowingStrategy<?, ?> getWindowingStrategy( http://git-wip-us.apache.org/repos/asf/beam/blob/7e37b703/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java index baed246..bc5bb0e 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java @@ -269,7 +269,8 @@ public class ParDoTranslation { components.getWindowingStrategiesOrThrow(inputCollection.getWindowingStrategyId()), components); Coder<?> elemCoder = - Coders.fromProto(components.getCodersOrThrow(inputCollection.getCoderId()), components); + CoderTranslation + .fromProto(components.getCodersOrThrow(inputCollection.getCoderId()), components); Coder<Iterable<WindowedValue<?>>> coder = (Coder) IterableCoder.of( http://git-wip-us.apache.org/repos/asf/beam/blob/7e37b703/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java index da22982..5c81875 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java @@ -222,7 +222,7 @@ class SdkComponents { String baseName = NameUtils.approximateSimpleName(coder); String name = uniqify(baseName, coderIds.values()); coderIds.put(Equivalence.identity().wrap(coder), name); - RunnerApi.Coder coderProto = Coders.toProto(coder, this); + RunnerApi.Coder coderProto = CoderTranslation.toProto(coder, this); componentsBuilder.putCoders(name, coderProto); return name; } http://git-wip-us.apache.org/repos/asf/beam/blob/7e37b703/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CoderTranslationTest.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CoderTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CoderTranslationTest.java new file mode 100644 index 0000000..39549d0 --- /dev/null +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CoderTranslationTest.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.core.construction; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.not; +import static org.junit.Assert.assertThat; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.Serializable; +import java.util.HashSet; +import java.util.Set; +import org.apache.beam.sdk.coders.AtomicCoder; +import org.apache.beam.sdk.coders.AvroCoder; +import org.apache.beam.sdk.coders.ByteArrayCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.IterableCoder; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.LengthPrefixCoder; +import org.apache.beam.sdk.coders.SerializableCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.coders.StructuredCoder; +import org.apache.beam.sdk.coders.VarLongCoder; +import org.apache.beam.sdk.common.runner.v1.RunnerApi; +import org.apache.beam.sdk.common.runner.v1.RunnerApi.Components; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.transforms.windowing.IntervalWindow.IntervalWindowCoder; +import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder; +import org.hamcrest.Matchers; +import org.junit.Test; +import org.junit.experimental.runners.Enclosed; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; +import org.junit.runners.Parameterized.Parameters; + +/** Tests for {@link CoderTranslation}. */ +@RunWith(Enclosed.class) +public class CoderTranslationTest { + private static final Set<StructuredCoder<?>> KNOWN_CODERS = + ImmutableSet.<StructuredCoder<?>>builder() + .add(ByteArrayCoder.of()) + .add(KvCoder.of(VarLongCoder.of(), VarLongCoder.of())) + .add(VarLongCoder.of()) + .add(IntervalWindowCoder.of()) + .add(IterableCoder.of(ByteArrayCoder.of())) + .add(LengthPrefixCoder.of(IterableCoder.of(VarLongCoder.of()))) + .add(GlobalWindow.Coder.INSTANCE) + .add( + FullWindowedValueCoder.of( + IterableCoder.of(VarLongCoder.of()), IntervalWindowCoder.of())) + .build(); + + /** + * Tests that all known coders are present in the parameters that will be used by + * {@link ToFromProtoTest}. + */ + @RunWith(JUnit4.class) + public static class ValidateKnownCodersPresentTest { + @Test + public void validateKnownCoders() { + // Validates that every known coder in the Coders class is represented in a "Known Coder" + // tests, which demonstrates that they are serialized via components and specified URNs rather + // than java serialized + Set<Class<? extends StructuredCoder>> knownCoderClasses = + CoderTranslation.KNOWN_CODER_URNS.keySet(); + Set<Class<? extends StructuredCoder>> knownCoderTests = new HashSet<>(); + for (StructuredCoder<?> coder : KNOWN_CODERS) { + knownCoderTests.add(coder.getClass()); + } + Set<Class<? extends StructuredCoder>> missingKnownCoders = new HashSet<>(knownCoderClasses); + missingKnownCoders.removeAll(knownCoderTests); + assertThat( + String.format( + "Missing validation of known coder %s in %s", + missingKnownCoders, CoderTranslationTest.class.getSimpleName()), + missingKnownCoders, + Matchers.empty()); + } + + @Test + public void validateCoderTranslators() { + assertThat( + "Every Known Coder must have a Known Translator", + CoderTranslation.KNOWN_CODER_URNS.keySet(), + equalTo(CoderTranslation.KNOWN_TRANSLATORS.keySet())); + } + } + + + /** + * Tests round-trip coder encodings for both known and unknown {@link Coder coders}. + */ + @RunWith(Parameterized.class) + public static class ToFromProtoTest { + @Parameters(name = "{index}: {0}") + public static Iterable<Coder<?>> data() { + return ImmutableList.<Coder<?>>builder() + .addAll(KNOWN_CODERS) + .add( + StringUtf8Coder.of(), + SerializableCoder.of(Record.class), + new RecordCoder(), + KvCoder.of(new RecordCoder(), AvroCoder.of(Record.class))) + .build(); + } + + @Parameter(0) + public Coder<?> coder; + + @Test + public void toAndFromProto() throws Exception { + SdkComponents componentsBuilder = SdkComponents.create(); + RunnerApi.Coder coderProto = CoderTranslation.toProto(coder, componentsBuilder); + + Components encodedComponents = componentsBuilder.toComponents(); + Coder<?> decodedCoder = CoderTranslation.fromProto(coderProto, encodedComponents); + assertThat(decodedCoder, Matchers.<Coder<?>>equalTo(coder)); + + if (KNOWN_CODERS.contains(coder)) { + for (RunnerApi.Coder encodedCoder : encodedComponents.getCodersMap().values()) { + assertThat( + encodedCoder.getSpec().getSpec().getUrn(), + not(equalTo(CoderTranslation.JAVA_SERIALIZED_CODER_URN))); + } + } + } + + static class Record implements Serializable {} + + private static class RecordCoder extends AtomicCoder<Record> { + @Override + public void encode(Record value, OutputStream outStream) + throws CoderException, IOException {} + + @Override + public Record decode(InputStream inStream) + throws CoderException, IOException { + return new Record(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/7e37b703/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java deleted file mode 100644 index 42fba7c..0000000 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java +++ /dev/null @@ -1,164 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.runners.core.construction; - -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.not; -import static org.junit.Assert.assertThat; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableSet; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.io.Serializable; -import java.util.HashSet; -import java.util.Set; -import org.apache.beam.sdk.coders.AtomicCoder; -import org.apache.beam.sdk.coders.AvroCoder; -import org.apache.beam.sdk.coders.ByteArrayCoder; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.CoderException; -import org.apache.beam.sdk.coders.IterableCoder; -import org.apache.beam.sdk.coders.KvCoder; -import org.apache.beam.sdk.coders.LengthPrefixCoder; -import org.apache.beam.sdk.coders.SerializableCoder; -import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.coders.StructuredCoder; -import org.apache.beam.sdk.coders.VarLongCoder; -import org.apache.beam.sdk.common.runner.v1.RunnerApi; -import org.apache.beam.sdk.common.runner.v1.RunnerApi.Components; -import org.apache.beam.sdk.transforms.windowing.GlobalWindow; -import org.apache.beam.sdk.transforms.windowing.IntervalWindow.IntervalWindowCoder; -import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder; -import org.hamcrest.Matchers; -import org.junit.Test; -import org.junit.experimental.runners.Enclosed; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; -import org.junit.runners.Parameterized; -import org.junit.runners.Parameterized.Parameter; -import org.junit.runners.Parameterized.Parameters; - -/** Tests for {@link Coders}. */ -@RunWith(Enclosed.class) -public class CodersTest { - private static final Set<StructuredCoder<?>> KNOWN_CODERS = - ImmutableSet.<StructuredCoder<?>>builder() - .add(ByteArrayCoder.of()) - .add(KvCoder.of(VarLongCoder.of(), VarLongCoder.of())) - .add(VarLongCoder.of()) - .add(IntervalWindowCoder.of()) - .add(IterableCoder.of(ByteArrayCoder.of())) - .add(LengthPrefixCoder.of(IterableCoder.of(VarLongCoder.of()))) - .add(GlobalWindow.Coder.INSTANCE) - .add( - FullWindowedValueCoder.of( - IterableCoder.of(VarLongCoder.of()), IntervalWindowCoder.of())) - .build(); - - /** - * Tests that all known coders are present in the parameters that will be used by - * {@link ToFromProtoTest}. - */ - @RunWith(JUnit4.class) - public static class ValidateKnownCodersPresentTest { - @Test - public void validateKnownCoders() { - // Validates that every known coder in the Coders class is represented in a "Known Coder" - // tests, which demonstrates that they are serialized via components and specified URNs rather - // than java serialized - Set<Class<? extends StructuredCoder>> knownCoderClasses = Coders.KNOWN_CODER_URNS.keySet(); - Set<Class<? extends StructuredCoder>> knownCoderTests = new HashSet<>(); - for (StructuredCoder<?> coder : KNOWN_CODERS) { - knownCoderTests.add(coder.getClass()); - } - Set<Class<? extends StructuredCoder>> missingKnownCoders = new HashSet<>(knownCoderClasses); - missingKnownCoders.removeAll(knownCoderTests); - assertThat( - String.format( - "Missing validation of known coder %s in %s", - missingKnownCoders, CodersTest.class.getSimpleName()), - missingKnownCoders, - Matchers.empty()); - } - - @Test - public void validateCoderTranslators() { - assertThat( - "Every Known Coder must have a Known Translator", - Coders.KNOWN_CODER_URNS.keySet(), - equalTo(Coders.KNOWN_TRANSLATORS.keySet())); - } - } - - - /** - * Tests round-trip coder encodings for both known and unknown {@link Coder coders}. - */ - @RunWith(Parameterized.class) - public static class ToFromProtoTest { - @Parameters(name = "{index}: {0}") - public static Iterable<Coder<?>> data() { - return ImmutableList.<Coder<?>>builder() - .addAll(KNOWN_CODERS) - .add( - StringUtf8Coder.of(), - SerializableCoder.of(Record.class), - new RecordCoder(), - KvCoder.of(new RecordCoder(), AvroCoder.of(Record.class))) - .build(); - } - - @Parameter(0) - public Coder<?> coder; - - @Test - public void toAndFromProto() throws Exception { - SdkComponents componentsBuilder = SdkComponents.create(); - RunnerApi.Coder coderProto = Coders.toProto(coder, componentsBuilder); - - Components encodedComponents = componentsBuilder.toComponents(); - Coder<?> decodedCoder = Coders.fromProto(coderProto, encodedComponents); - assertThat(decodedCoder, Matchers.<Coder<?>>equalTo(coder)); - - if (KNOWN_CODERS.contains(coder)) { - for (RunnerApi.Coder encodedCoder : encodedComponents.getCodersMap().values()) { - assertThat( - encodedCoder.getSpec().getSpec().getUrn(), - not(equalTo(Coders.JAVA_SERIALIZED_CODER_URN))); - } - } - } - - static class Record implements Serializable {} - - private static class RecordCoder extends AtomicCoder<Record> { - @Override - public void encode(Record value, OutputStream outStream) - throws CoderException, IOException {} - - @Override - public Record decode(InputStream inStream) - throws CoderException, IOException { - return new Record(); - } - } - } -}
