Add a CoderTranslator interface. This will enable the removal of StandardCoder.getComponents.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d1e48750 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d1e48750 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d1e48750 Branch: refs/heads/master Commit: d1e487506373fb7078650e42a307f117023316ee Parents: fe2a320 Author: Thomas Groh <[email protected]> Authored: Tue Apr 25 17:24:27 2017 -0700 Committer: Thomas Groh <[email protected]> Committed: Mon May 1 11:18:36 2017 -0700 ---------------------------------------------------------------------- .../core/construction/CoderTranslator.java | 44 ++++++++ .../core/construction/CoderTranslators.java | 107 +++++++++++++++++++ .../beam/runners/core/construction/Coders.java | 71 ++++++------ .../runners/core/construction/CodersTest.java | 18 +++- .../beam/sdk/coders/LengthPrefixCoder.java | 4 + 5 files changed, 208 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/d1e48750/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CoderTranslator.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CoderTranslator.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CoderTranslator.java new file mode 100644 index 0000000..26d8c1d --- /dev/null +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CoderTranslator.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.core.construction; + +import java.util.List; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Experimental.Kind; +import org.apache.beam.sdk.coders.Coder; + +/** + * An interface that translates coders to components and back. + * + * <p>This interface is highly experimental, and incomplete. Coders must in the general case have + * the capability to encode an additional payload, which is not currently supported. This exists as + * a temporary measure. + */ +@Experimental(Kind.CORE_RUNNERS_ONLY) +public interface CoderTranslator<T extends Coder<?>> { + /** + * Extract all component {@link Coder coders} within a coder. + */ + List<? extends Coder<?>> getComponents(T from); + + /** + * Create a {@link Coder} from its component {@link Coder coders}. 
+ */ + T fromComponents(List<Coder<?>> components); +} http://git-wip-us.apache.org/repos/asf/beam/blob/d1e48750/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CoderTranslators.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CoderTranslators.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CoderTranslators.java new file mode 100644 index 0000000..989a8b6 --- /dev/null +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CoderTranslators.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.runners.core.construction; + +import com.google.common.collect.ImmutableList; +import java.util.Collections; +import java.util.List; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.IterableCoder; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.LengthPrefixCoder; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.InstanceBuilder; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder; + +/** {@link CoderTranslator} implementations for known coder types. */ +class CoderTranslators { + private CoderTranslators() {} + + static <T extends Coder<?>> CoderTranslator<T> atomic(final Class<T> clazz) { + return new CoderTranslator<T>() { + @Override + public List<? extends Coder<?>> getComponents(T from) { + return Collections.emptyList(); + } + + @Override + public T fromComponents(List<Coder<?>> components) { + return InstanceBuilder.ofType(clazz).build(); + } + }; + } + + static CoderTranslator<KvCoder<?, ?>> kv() { + return new CoderTranslator<KvCoder<?, ?>>() { + @Override + public List<? extends Coder<?>> getComponents(KvCoder<?, ?> from) { + return ImmutableList.of(from.getKeyCoder(), from.getValueCoder()); + } + + @Override + public KvCoder<?, ?> fromComponents(List<Coder<?>> components) { + return KvCoder.of(components.get(0), components.get(1)); + } + }; + } + + static CoderTranslator<IterableCoder<?>> iterable() { + return new CoderTranslator<IterableCoder<?>>() { + @Override + public List<? 
extends Coder<?>> getComponents(IterableCoder<?> from) { + return Collections.singletonList(from.getElemCoder()); + } + + @Override + public IterableCoder<?> fromComponents(List<Coder<?>> components) { + return IterableCoder.of(components.get(0)); + } + }; + } + + static CoderTranslator<LengthPrefixCoder<?>> lengthPrefix() { + return new CoderTranslator<LengthPrefixCoder<?>>() { + @Override + public List<? extends Coder<?>> getComponents(LengthPrefixCoder<?> from) { + return Collections.singletonList(from.getValueCoder()); + } + + @Override + public LengthPrefixCoder<?> fromComponents(List<Coder<?>> components) { + return LengthPrefixCoder.of(components.get(0)); + } + }; + } + + static CoderTranslator<FullWindowedValueCoder<?>> fullWindowedValue() { + return new CoderTranslator<FullWindowedValueCoder<?>>() { + @Override + public List<? extends Coder<?>> getComponents(FullWindowedValueCoder<?> from) { + return ImmutableList.of(from.getValueCoder(), from.getWindowCoder()); + } + + @Override + public FullWindowedValueCoder<?> fromComponents(List<Coder<?>> components) { + return WindowedValue.getFullCoder( + components.get(0), (Coder<BoundedWindow>) components.get(1)); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/d1e48750/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Coders.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Coders.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Coders.java index 094f21f..6c2caa9 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Coders.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Coders.java @@ -23,6 +23,7 @@ import static com.google.common.base.Preconditions.checkArgument; import 
com.google.common.annotations.VisibleForTesting; import com.google.common.collect.BiMap; import com.google.common.collect.ImmutableBiMap; +import com.google.common.collect.ImmutableMap; import com.google.protobuf.Any; import com.google.protobuf.ByteString; import com.google.protobuf.BytesValue; @@ -30,6 +31,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.LinkedList; import java.util.List; +import java.util.Map; import org.apache.beam.sdk.coders.ByteArrayCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.IterableCoder; @@ -44,7 +46,6 @@ import org.apache.beam.sdk.common.runner.v1.RunnerApi.SdkFunctionSpec; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.IntervalWindow.IntervalWindowCoder; import org.apache.beam.sdk.util.SerializableUtils; -import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder; /** Converts to and from Beam Runner API representations of {@link Coder Coders}. */ @@ -67,6 +68,22 @@ public class Coders { .put(FullWindowedValueCoder.class, "urn:beam:coders:windowed_value:0.1") .build(); + @VisibleForTesting + static final Map<Class<? extends StructuredCoder>, CoderTranslator<? extends StructuredCoder>> + KNOWN_TRANSLATORS = + ImmutableMap + .<Class<? extends StructuredCoder>, CoderTranslator<? 
extends StructuredCoder>> + builder() + .put(ByteArrayCoder.class, CoderTranslators.atomic(ByteArrayCoder.class)) + .put(VarLongCoder.class, CoderTranslators.atomic(VarLongCoder.class)) + .put(IntervalWindowCoder.class, CoderTranslators.atomic(IntervalWindowCoder.class)) + .put(GlobalWindow.Coder.class, CoderTranslators.atomic(GlobalWindow.Coder.class)) + .put(KvCoder.class, CoderTranslators.kv()) + .put(IterableCoder.class, CoderTranslators.iterable()) + .put(LengthPrefixCoder.class, CoderTranslators.lengthPrefix()) + .put(FullWindowedValueCoder.class, CoderTranslators.fullWindowedValue()) + .build(); + public static RunnerApi.MessageWithComponents toProto(Coder<?> coder) throws IOException { SdkComponents components = SdkComponents.create(); RunnerApi.Coder coderProto = toProto(coder, components); @@ -94,18 +111,26 @@ public class Coders { coder, coder.getClass().getName()); StructuredCoder<?> stdCoder = (StructuredCoder<?>) coder; - List<String> componentIds = new ArrayList<>(); - for (Coder<?> componentCoder : stdCoder.getComponents()) { - componentIds.add(components.registerCoder(componentCoder)); - } + CoderTranslator translator = KNOWN_TRANSLATORS.get(stdCoder.getClass()); + List<String> componentIds = registerComponents(coder, translator, components); return RunnerApi.Coder.newBuilder() .addAllComponentCoderIds(componentIds) .setSpec( SdkFunctionSpec.newBuilder() - .setSpec(FunctionSpec.newBuilder().setUrn(KNOWN_CODER_URNS.get(coder.getClass())))) + .setSpec( + FunctionSpec.newBuilder().setUrn(KNOWN_CODER_URNS.get(stdCoder.getClass())))) .build(); } + private static <T extends Coder<?>> List<String> registerComponents( + T coder, CoderTranslator<T> translator, SdkComponents components) throws IOException { + List<String> componentIds = new ArrayList<>(); + for (Coder<?> component : translator.getComponents(coder)) { + componentIds.add(components.registerCoder(component)); + } + return componentIds; + } + private static RunnerApi.Coder toCustomCoder(Coder<?> 
coder) throws IOException { RunnerApi.Coder.Builder coderBuilder = RunnerApi.Coder.newBuilder(); return coderBuilder @@ -141,30 +166,14 @@ public class Coders { Coder<?> innerCoder = fromProto(components.getCodersOrThrow(componentId), components); coderComponents.add(innerCoder); } - switch (coderUrn) { - case "urn:beam:coders:bytes:0.1": - return ByteArrayCoder.of(); - case "urn:beam:coders:kv:0.1": - return KvCoder.of(coderComponents); - case "urn:beam:coders:varint:0.1": - return VarLongCoder.of(); - case "urn:beam:coders:interval_window:0.1": - return IntervalWindowCoder.of(); - case "urn:beam:coders:length_prefix:0.1": - checkArgument( - coderComponents.size() == 1, "Expecting 1 component, got %s", coderComponents.size()); - return LengthPrefixCoder.of(coderComponents.get(0)); - case "urn:beam:coders:stream:0.1": - return IterableCoder.of(coderComponents); - case "urn:beam:coders:global_window:0.1": - return GlobalWindow.Coder.INSTANCE; - case "urn:beam:coders:windowed_value:0.1": - return WindowedValue.FullWindowedValueCoder.of(coderComponents); - default: - throw new IllegalStateException( - String.format( - "Unknown coder URN %s. Known URNs: %s", coderUrn, KNOWN_CODER_URNS.values())); - } + Class<? extends StructuredCoder> coderType = KNOWN_CODER_URNS.inverse().get(coderUrn); + CoderTranslator<?> translator = KNOWN_TRANSLATORS.get(coderType); + checkArgument( + translator != null, + "Unknown Coder URN %s. 
Known URNs: %s", + coderUrn, + KNOWN_CODER_URNS.values()); + return translator.fromComponents(coderComponents); } private static Coder<?> fromCustomCoder( @@ -179,6 +188,6 @@ public class Coders { .unpack(BytesValue.class) .getValue() .toByteArray(), - protoCoder.getSpec().getSpec().getUrn()); + "Custom Coder Bytes"); } } http://git-wip-us.apache.org/repos/asf/beam/blob/d1e48750/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java index ecd0fa5..32a78fa 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java @@ -18,7 +18,6 @@ package org.apache.beam.runners.core.construction; -import static com.google.common.base.Preconditions.checkState; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.not; import static org.junit.Assert.assertThat; @@ -92,11 +91,20 @@ public class CodersTest { } Set<Class<? 
extends StructuredCoder>> missingKnownCoders = new HashSet<>(knownCoderClasses); missingKnownCoders.removeAll(knownCoderTests); - checkState( - missingKnownCoders.isEmpty(), - "Missing validation of known coder %s in %s", + assertThat( + String.format( + "Missing validation of known coder %s in %s", + missingKnownCoders, CodersTest.class.getSimpleName()), missingKnownCoders, - CodersTest.class.getSimpleName()); + Matchers.empty()); + } + + @Test + public void validateCoderTranslators() { + assertThat( + "Every Known Coder must have a Known Translator", + Coders.KNOWN_CODER_URNS.keySet(), + equalTo(Coders.KNOWN_TRANSLATORS.keySet())); } } http://git-wip-us.apache.org/repos/asf/beam/blob/d1e48750/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java index 0972b1e..6a1f8ed 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java @@ -86,6 +86,10 @@ public class LengthPrefixCoder<T> extends StructuredCoder<T> { return valueCoder.decode(ByteStreams.limit(inStream, size), Context.OUTER); } + public Coder<?> getValueCoder() { + return valueCoder; + } + @Override public List<? extends Coder<?>> getCoderArguments() { return ImmutableList.of(valueCoder);
