[BEAM-1871] Move ProtoCoder to new sdks/java/extensions/protobuf package.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ff1fe7fa Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ff1fe7fa Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ff1fe7fa Branch: refs/heads/master Commit: ff1fe7fa53816fe4327c7572c13a616fc4243dc9 Parents: d7e7af8 Author: Luke Cwik <[email protected]> Authored: Mon Apr 24 14:32:21 2017 -0700 Committer: Luke Cwik <[email protected]> Committed: Mon Apr 24 15:04:13 2017 -0700 ---------------------------------------------------------------------- sdks/java/core/pom.xml | 6 - .../apache/beam/sdk/coders/CoderRegistry.java | 10 +- .../beam/sdk/coders/StringDelegateCoder.java | 4 +- .../beam/sdk/coders/protobuf/ProtoCoder.java | 405 ------------------- .../beam/sdk/coders/protobuf/ProtobufUtil.java | 171 -------- .../beam/sdk/coders/protobuf/package-info.java | 24 -- sdks/java/core/src/main/proto/README.md | 45 --- .../main/proto/proto2_coder_test_messages.proto | 53 --- .../beam/sdk/coders/CoderRegistryTest.java | 14 - .../sdk/coders/protobuf/ProtoCoderTest.java | 182 --------- .../sdk/coders/protobuf/ProtobufUtilTest.java | 192 --------- sdks/java/extensions/pom.xml | 1 + sdks/java/extensions/protobuf/pom.xml | 142 +++++++ .../sdk/extensions/protobuf/ProtoCoder.java | 405 +++++++++++++++++++ .../sdk/extensions/protobuf/ProtobufUtil.java | 171 ++++++++ .../sdk/extensions/protobuf/package-info.java | 24 ++ .../sdk/extensions/protobuf/ProtoCoderTest.java | 181 +++++++++ .../extensions/protobuf/ProtobufUtilTest.java | 191 +++++++++ .../test/proto/proto2_coder_test_messages.proto | 53 +++ 19 files changed, 1173 insertions(+), 1101 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/ff1fe7fa/sdks/java/core/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/core/pom.xml b/sdks/java/core/pom.xml index ac7a3bb..6c46453 
100644 --- a/sdks/java/core/pom.xml +++ b/sdks/java/core/pom.xml @@ -294,12 +294,6 @@ </dependency> <dependency> - <groupId>com.google.cloud.dataflow</groupId> - <artifactId>google-cloud-dataflow-java-proto-library-all</artifactId> - <scope>test</scope> - </dependency> - - <dependency> <groupId>com.esotericsoftware.kryo</groupId> <artifactId>kryo</artifactId> <version>2.21</version> http://git-wip-us.apache.org/repos/asf/beam/blob/ff1fe7fa/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java index 4238293..e0b2b3a 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java @@ -44,7 +44,6 @@ import java.util.Set; import javax.annotation.Nonnull; import javax.annotation.Nullable; import org.apache.beam.sdk.coders.CannotProvideCoderException.ReasonCode; -import org.apache.beam.sdk.coders.protobuf.ProtoCoder; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.util.CoderUtils; @@ -77,9 +76,7 @@ import org.slf4j.LoggerFactory; * the default {@code Coder} type. The {@link Coder} class must satisfy the requirements * of {@link CoderProviders#fromStaticMethods}. * <li>Fallback: A fallback {@link CoderProvider} is used to attempt to provide a {@link Coder} - * for any type. By default, there are two chained fallback coders: - * {@link ProtoCoder#coderProvider}, which can provide a coder to efficiently serialize any - * Protocol Buffers message, and then {@link SerializableCoder#PROVIDER}, which can provide a + * for any type. 
By default, there is {@link SerializableCoder#PROVIDER}, which can provide a * {@link Coder} for any type that is serializable via Java serialization. The fallback * {@link CoderProvider} can be get and set respectively using * {@link #getFallbackCoderProvider()} and {@link #setFallbackCoderProvider}. Multiple @@ -165,7 +162,7 @@ public class CoderRegistry implements CoderProvider { private CoderRegistry() { coderFactoryMap = new HashMap<>(REGISTERED_CODER_FACTORIES_PER_CLASS); setFallbackCoderProvider( - CoderProviders.firstOf(ProtoCoder.coderProvider(), SerializableCoder.PROVIDER)); + CoderProviders.firstOf(SerializableCoder.PROVIDER)); } /** @@ -423,8 +420,7 @@ public class CoderRegistry implements CoderProvider { * providing a {@code Coder<T>} for a type {@code T}, then the registry will attempt to create * a {@link Coder} using this {@link CoderProvider}. * - * <p>By default, this is set to the chain of {@link ProtoCoder#coderProvider()} and - * {@link SerializableCoder#PROVIDER}. + * <p>By default, this is set to {@link SerializableCoder#PROVIDER}. * * <p>See {@link #getFallbackCoderProvider}. 
*/ http://git-wip-us.apache.org/repos/asf/beam/blob/ff1fe7fa/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringDelegateCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringDelegateCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringDelegateCoder.java index f86369c..51ead3c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringDelegateCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringDelegateCoder.java @@ -23,7 +23,6 @@ import java.io.OutputStream; import java.lang.reflect.InvocationTargetException; import java.util.Collection; import org.apache.beam.sdk.coders.DelegateCoder.CodingFunction; -import org.apache.beam.sdk.coders.protobuf.ProtoCoder; import org.apache.beam.sdk.values.TypeDescriptor; /** @@ -43,7 +42,8 @@ import org.apache.beam.sdk.values.TypeDescriptor; * * <p>This method of encoding is not designed for ease of evolution of {@code Clazz}; * it should only be used in cases where the class is stable or the encoding is not - * important. If evolution of the class is important, see {@link ProtoCoder} or {@link AvroCoder}. + * important. If evolution of the class is important, see {@link AvroCoder} or any other + * evolution safe encoding. * * @param <T> The type of objects coded. 
*/ http://git-wip-us.apache.org/repos/asf/beam/blob/ff1fe7fa/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/protobuf/ProtoCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/protobuf/ProtoCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/protobuf/ProtoCoder.java deleted file mode 100644 index a5f53ff..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/protobuf/ProtoCoder.java +++ /dev/null @@ -1,405 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.sdk.coders.protobuf; - -import static com.google.common.base.Preconditions.checkArgument; - -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; -import com.google.protobuf.ExtensionRegistry; -import com.google.protobuf.Message; -import com.google.protobuf.Parser; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; -import java.lang.reflect.Modifier; -import java.util.Arrays; -import java.util.List; -import java.util.Objects; -import java.util.Set; -import java.util.SortedSet; -import java.util.TreeSet; -import javax.annotation.Nullable; -import org.apache.beam.sdk.coders.AtomicCoder; -import org.apache.beam.sdk.coders.CannotProvideCoderException; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.CoderException; -import org.apache.beam.sdk.coders.CoderProvider; -import org.apache.beam.sdk.coders.CoderRegistry; -import org.apache.beam.sdk.util.CloudObject; -import org.apache.beam.sdk.util.Structs; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.TypeDescriptor; - -/** - * A {@link Coder} using Google Protocol Buffers binary format. {@link ProtoCoder} supports both - * Protocol Buffers syntax versions 2 and 3. - * - * <p>To learn more about Protocol Buffers, visit: - * <a href="https://developers.google.com/protocol-buffers">https://developers.google.com/protocol-buffers</a> - * - * <p>{@link ProtoCoder} is registered in the global {@link CoderRegistry} as the default - * {@link Coder} for any {@link Message} object. 
Custom message extensions are also supported, but - * these extensions must be registered for a particular {@link ProtoCoder} instance and that - * instance must be registered on the {@link PCollection} that needs the extensions: - * - * <pre>{@code - * import MyProtoFile; - * import MyProtoFile.MyMessage; - * - * Coder<MyMessage> coder = ProtoCoder.of(MyMessage.class).withExtensionsFrom(MyProtoFile.class); - * PCollection<MyMessage> records = input.apply(...).setCoder(coder); - * }</pre> - * - * <h3>Versioning</h3> - * - * <p>{@link ProtoCoder} supports both versions 2 and 3 of the Protocol Buffers syntax. However, - * the Java runtime version of the <code>google.com.protobuf</code> library must match exactly the - * version of <code>protoc</code> that was used to produce the JAR files containing the compiled - * <code>.proto</code> messages. - * - * <p>For more information, see the - * <a href="https://developers.google.com/protocol-buffers/docs/proto3#using-proto2-message-types">Protocol Buffers documentation</a>. - * - * <h3>{@link ProtoCoder} and Determinism</h3> - * - * <p>In general, Protocol Buffers messages can be encoded deterministically within a single - * pipeline as long as: - * - * <ul> - * <li>The encoded messages (and any transitively linked messages) do not use <code>map</code> - * fields.</li> - * <li>Every Java VM that encodes or decodes the messages use the same runtime version of the - * Protocol Buffers library and the same compiled <code>.proto</code> file JAR.</li> - * </ul> - * - * <h3>{@link ProtoCoder} and Encoding Stability</h3> - * - * <p>When changing Protocol Buffers messages, follow the rules in the Protocol Buffers language - * guides for - * <a href="https://developers.google.com/protocol-buffers/docs/proto#updating">{@code proto2}</a> - * and - * <a href="https://developers.google.com/protocol-buffers/docs/proto3#updating">{@code proto3}</a> - * syntaxes, depending on your message type. 
Following these guidelines will ensure that the - * old encoded data can be read by new versions of the code. - * - * <p>Generally, any change to the message type, registered extensions, runtime library, or - * compiled proto JARs may change the encoding. Thus even if both the original and updated messages - * can be encoded deterministically within a single job, these deterministic encodings may not be - * the same across jobs. - * - * @param <T> the Protocol Buffers {@link Message} handled by this {@link Coder}. - */ -public class ProtoCoder<T extends Message> extends AtomicCoder<T> { - - /** - * A {@link CoderProvider} that returns a {@link ProtoCoder} with an empty - * {@link ExtensionRegistry}. - */ - public static CoderProvider coderProvider() { - return PROVIDER; - } - - /** - * Returns a {@link ProtoCoder} for the given Protocol Buffers {@link Message}. - */ - public static <T extends Message> ProtoCoder<T> of(Class<T> protoMessageClass) { - return new ProtoCoder<>(protoMessageClass, ImmutableSet.<Class<?>>of()); - } - - /** - * Returns a {@link ProtoCoder} for the Protocol Buffers {@link Message} indicated by the given - * {@link TypeDescriptor}. - */ - public static <T extends Message> ProtoCoder<T> of(TypeDescriptor<T> protoMessageType) { - @SuppressWarnings("unchecked") - Class<T> protoMessageClass = (Class<T>) protoMessageType.getRawType(); - return of(protoMessageClass); - } - - /** - * Returns a {@link ProtoCoder} like this one, but with the extensions from the given classes - * registered. - * - * <p>Each of the extension host classes must be an class automatically generated by the - * Protocol Buffers compiler, {@code protoc}, that contains messages. - * - * <p>Does not modify this object. - */ - public ProtoCoder<T> withExtensionsFrom(Iterable<Class<?>> moreExtensionHosts) { - for (Class<?> extensionHost : moreExtensionHosts) { - // Attempt to access the required method, to make sure it's present. 
- try { - Method registerAllExtensions = - extensionHost.getDeclaredMethod("registerAllExtensions", ExtensionRegistry.class); - checkArgument( - Modifier.isStatic(registerAllExtensions.getModifiers()), - "Method registerAllExtensions() must be static"); - } catch (NoSuchMethodException | SecurityException e) { - throw new IllegalArgumentException( - String.format("Unable to register extensions for %s", extensionHost.getCanonicalName()), - e); - } - } - - return new ProtoCoder<>( - protoMessageClass, - new ImmutableSet.Builder<Class<?>>() - .addAll(extensionHostClasses) - .addAll(moreExtensionHosts) - .build()); - } - - /** - * See {@link #withExtensionsFrom(Iterable)}. - * - * <p>Does not modify this object. - */ - public ProtoCoder<T> withExtensionsFrom(Class<?>... moreExtensionHosts) { - return withExtensionsFrom(Arrays.asList(moreExtensionHosts)); - } - - @Override - public void encode(T value, OutputStream outStream, Context context) throws IOException { - if (value == null) { - throw new CoderException("cannot encode a null " + protoMessageClass.getSimpleName()); - } - if (context.isWholeStream) { - value.writeTo(outStream); - } else { - value.writeDelimitedTo(outStream); - } - } - - @Override - public T decode(InputStream inStream, Context context) throws IOException { - if (context.isWholeStream) { - return getParser().parseFrom(inStream, getExtensionRegistry()); - } else { - return getParser().parseDelimitedFrom(inStream, getExtensionRegistry()); - } - } - - @Override - public boolean equals(Object other) { - if (this == other) { - return true; - } - if (!(other instanceof ProtoCoder)) { - return false; - } - ProtoCoder<?> otherCoder = (ProtoCoder<?>) other; - return protoMessageClass.equals(otherCoder.protoMessageClass) - && Sets.newHashSet(extensionHostClasses) - .equals(Sets.newHashSet(otherCoder.extensionHostClasses)); - } - - @Override - public int hashCode() { - return Objects.hash(protoMessageClass, extensionHostClasses); - } - - /** - * The encoding 
identifier is designed to support evolution as per the design of Protocol - * Buffers. In order to use this class effectively, carefully follow the advice in the Protocol - * Buffers documentation at - * <a href="https://developers.google.com/protocol-buffers/docs/proto#updating">Updating - * A Message Type</a>. - * - * <p>In particular, the encoding identifier is guaranteed to be the same for {@link ProtoCoder} - * instances of the same principal message class, with the same registered extension host classes, - * and otherwise distinct. Note that the encoding ID does not encode any version of the message - * or extensions, nor does it include the message schema. - * - * <p>When modifying a message class, here are the broadest guidelines; see the above link - * for greater detail. - * - * <ul> - * <li>Do not change the numeric tags for any fields. - * <li>Never remove a <code>required</code> field. - * <li>Only add <code>optional</code> or <code>repeated</code> fields, with sensible defaults. - * <li>When changing the type of a field, consult the Protocol Buffers documentation to ensure - * the new and old types are interchangeable. - * </ul> - * - * <p>Code consuming this message class should be prepared to support <i>all</i> versions of - * the class until it is certain that no remaining serialized instances exist. - * - * <p>If backwards incompatible changes must be made, the best recourse is to change the name - * of your Protocol Buffers message class. - */ - @Override - public String getEncodingId() { - return protoMessageClass.getName() + getSortedExtensionClasses().toString(); - } - - @Override - public void verifyDeterministic() throws NonDeterministicException { - ProtobufUtil.verifyDeterministic(this); - } - - /** - * Returns the Protocol Buffers {@link Message} type this {@link ProtoCoder} supports. 
- */ - public Class<T> getMessageType() { - return protoMessageClass; - } - - /** - * Returns the {@link ExtensionRegistry} listing all known Protocol Buffers extension messages - * to {@code T} registered with this {@link ProtoCoder}. - */ - public ExtensionRegistry getExtensionRegistry() { - if (memoizedExtensionRegistry == null) { - ExtensionRegistry registry = ExtensionRegistry.newInstance(); - for (Class<?> extensionHost : extensionHostClasses) { - try { - extensionHost - .getDeclaredMethod("registerAllExtensions", ExtensionRegistry.class) - .invoke(null, registry); - } catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) { - throw new IllegalStateException(e); - } - } - memoizedExtensionRegistry = registry.getUnmodifiable(); - } - return memoizedExtensionRegistry; - } - - //////////////////////////////////////////////////////////////////////////////////// - // Private implementation details below. - - /** The {@link Message} type to be coded. */ - private final Class<T> protoMessageClass; - - /** - * All extension host classes included in this {@link ProtoCoder}. The extensions from these - * classes will be included in the {@link ExtensionRegistry} used during encoding and decoding. - */ - private final Set<Class<?>> extensionHostClasses; - - // Constants used to serialize and deserialize - private static final String PROTO_MESSAGE_CLASS = "proto_message_class"; - private static final String PROTO_EXTENSION_HOSTS = "proto_extension_hosts"; - - // Transient fields that are lazy initialized and then memoized. - private transient ExtensionRegistry memoizedExtensionRegistry; - private transient Parser<T> memoizedParser; - - /** Private constructor. */ - private ProtoCoder(Class<T> protoMessageClass, Set<Class<?>> extensionHostClasses) { - this.protoMessageClass = protoMessageClass; - this.extensionHostClasses = extensionHostClasses; - } - - /** - * @deprecated For JSON deserialization only. 
- */ - @JsonCreator - @Deprecated - public static <T extends Message> ProtoCoder<T> of( - @JsonProperty(PROTO_MESSAGE_CLASS) String protoMessageClassName, - @Nullable @JsonProperty(PROTO_EXTENSION_HOSTS) List<String> extensionHostClassNames) { - - try { - @SuppressWarnings("unchecked") - Class<T> protoMessageClass = (Class<T>) Class.forName(protoMessageClassName); - List<Class<?>> extensionHostClasses = Lists.newArrayList(); - if (extensionHostClassNames != null) { - for (String extensionHostClassName : extensionHostClassNames) { - extensionHostClasses.add(Class.forName(extensionHostClassName)); - } - } - return of(protoMessageClass).withExtensionsFrom(extensionHostClasses); - } catch (ClassNotFoundException e) { - throw new IllegalArgumentException(e); - } - } - - @Override - public CloudObject initializeCloudObject() { - CloudObject result = CloudObject.forClass(getClass()); - Structs.addString(result, PROTO_MESSAGE_CLASS, protoMessageClass.getName()); - List<CloudObject> extensionHostClassNames = Lists.newArrayList(); - for (String className : getSortedExtensionClasses()) { - extensionHostClassNames.add(CloudObject.forString(className)); - } - Structs.addList(result, PROTO_EXTENSION_HOSTS, extensionHostClassNames); - return result; - } - - /** Get the memoized {@link Parser}, possibly initializing it lazily. 
*/ - private Parser<T> getParser() { - if (memoizedParser == null) { - try { - @SuppressWarnings("unchecked") - T protoMessageInstance = (T) protoMessageClass.getMethod("getDefaultInstance").invoke(null); - @SuppressWarnings("unchecked") - Parser<T> tParser = (Parser<T>) protoMessageInstance.getParserForType(); - memoizedParser = tParser; - } catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) { - throw new IllegalArgumentException(e); - } - } - return memoizedParser; - } - - static final TypeDescriptor<Message> CHECK = new TypeDescriptor<Message>() {}; - - /** - * The implementation of the {@link CoderProvider} for this {@link ProtoCoder} returned by - * {@link #coderProvider()}. - */ - private static final CoderProvider PROVIDER = - new CoderProvider() { - @Override - public <T> Coder<T> getCoder(TypeDescriptor<T> type) throws CannotProvideCoderException { - if (!type.isSubtypeOf(CHECK)) { - throw new CannotProvideCoderException( - String.format( - "Cannot provide %s because %s is not a subclass of %s", - ProtoCoder.class.getSimpleName(), - type, - Message.class.getName())); - } - - @SuppressWarnings("unchecked") - TypeDescriptor<? extends Message> messageType = (TypeDescriptor<? 
extends Message>) type; - try { - @SuppressWarnings("unchecked") - Coder<T> coder = (Coder<T>) ProtoCoder.of(messageType); - return coder; - } catch (IllegalArgumentException e) { - throw new CannotProvideCoderException(e); - } - } - }; - - private SortedSet<String> getSortedExtensionClasses() { - SortedSet<String> ret = new TreeSet<>(); - for (Class<?> clazz : extensionHostClasses) { - ret.add(clazz.getName()); - } - return ret; - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/ff1fe7fa/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/protobuf/ProtobufUtil.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/protobuf/ProtobufUtil.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/protobuf/ProtobufUtil.java deleted file mode 100644 index 77afb47..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/protobuf/ProtobufUtil.java +++ /dev/null @@ -1,171 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.sdk.coders.protobuf; - -import static com.google.common.base.Preconditions.checkArgument; - -import com.google.protobuf.Descriptors.Descriptor; -import com.google.protobuf.Descriptors.FieldDescriptor; -import com.google.protobuf.Descriptors.FileDescriptor.Syntax; -import com.google.protobuf.Descriptors.GenericDescriptor; -import com.google.protobuf.ExtensionRegistry; -import com.google.protobuf.ExtensionRegistry.ExtensionInfo; -import com.google.protobuf.Message; -import java.lang.reflect.InvocationTargetException; -import java.util.HashSet; -import java.util.Set; -import org.apache.beam.sdk.coders.Coder.NonDeterministicException; - -/** - * Utility functions for reflecting and analyzing Protocol Buffers classes. - * - * <p>Used by {@link ProtoCoder}, but in a separate file for testing and isolation. - */ -class ProtobufUtil { - /** - * Returns the {@link Descriptor} for the given Protocol Buffers {@link Message}. - * - * @throws IllegalArgumentException if there is an error in Java reflection. - */ - static Descriptor getDescriptorForClass(Class<? extends Message> clazz) { - try { - return (Descriptor) clazz.getMethod("getDescriptor").invoke(null); - } catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) { - throw new IllegalArgumentException(e); - } - } - - /** - * Returns the {@link Descriptor} for the given Protocol Buffers {@link Message} as well as - * every class it can include transitively. - * - * @throws IllegalArgumentException if there is an error in Java reflection. - */ - static Set<Descriptor> getRecursiveDescriptorsForClass( - Class<? 
extends Message> clazz, ExtensionRegistry registry) { - Descriptor root = getDescriptorForClass(clazz); - Set<Descriptor> descriptors = new HashSet<>(); - recursivelyAddDescriptors(root, descriptors, registry); - return descriptors; - } - - /** - * Recursively walks the given {@link Message} class and verifies that every field or message - * linked in uses the Protocol Buffers proto2 syntax. - */ - static void checkProto2Syntax(Class<? extends Message> clazz, ExtensionRegistry registry) { - for (GenericDescriptor d : getRecursiveDescriptorsForClass(clazz, registry)) { - Syntax s = d.getFile().getSyntax(); - checkArgument( - s == Syntax.PROTO2, - "Message %s or one of its dependencies does not use proto2 syntax: %s in file %s", - clazz.getName(), - d.getFullName(), - d.getFile().getName()); - } - } - - /** - * Recursively checks whether the specified class uses any Protocol Buffers fields that cannot - * be deterministically encoded. - * - * @throws NonDeterministicException if the object cannot be encoded deterministically. - */ - static void verifyDeterministic(ProtoCoder<?> coder) throws NonDeterministicException { - Class<? extends Message> message = coder.getMessageType(); - ExtensionRegistry registry = coder.getExtensionRegistry(); - Set<Descriptor> descriptors = getRecursiveDescriptorsForClass(message, registry); - for (Descriptor d : descriptors) { - for (FieldDescriptor fd : d.getFields()) { - // If there is a transitively reachable Protocol Buffers map field, then this object cannot - // be encoded deterministically. - if (fd.isMapField()) { - String reason = - String.format( - "Protocol Buffers message %s transitively includes Map field %s (from file %s)." 
- + " Maps cannot be deterministically encoded.", - message.getName(), - fd.getFullName(), - fd.getFile().getFullName()); - throw new NonDeterministicException(coder, reason); - } - } - } - } - - //////////////////////////////////////////////////////////////////////////////////////////////// - // Disable construction of utility class - private ProtobufUtil() {} - - private static void recursivelyAddDescriptors( - Descriptor message, Set<Descriptor> descriptors, ExtensionRegistry registry) { - if (descriptors.contains(message)) { - return; - } - descriptors.add(message); - - for (FieldDescriptor f : message.getFields()) { - recursivelyAddDescriptors(f, descriptors, registry); - } - for (FieldDescriptor f : message.getExtensions()) { - recursivelyAddDescriptors(f, descriptors, registry); - } - for (ExtensionInfo info : - registry.getAllImmutableExtensionsByExtendedType(message.getFullName())) { - recursivelyAddDescriptors(info.descriptor, descriptors, registry); - } - for (ExtensionInfo info : - registry.getAllMutableExtensionsByExtendedType(message.getFullName())) { - recursivelyAddDescriptors(info.descriptor, descriptors, registry); - } - } - - private static void recursivelyAddDescriptors( - FieldDescriptor field, Set<Descriptor> descriptors, ExtensionRegistry registry) { - switch (field.getType()) { - case BOOL: - case BYTES: - case DOUBLE: - case ENUM: - case FIXED32: - case FIXED64: - case FLOAT: - case INT32: - case INT64: - case SFIXED32: - case SFIXED64: - case SINT32: - case SINT64: - case STRING: - case UINT32: - case UINT64: - // Primitive types do not transitively access anything else. - break; - - case GROUP: - case MESSAGE: - // Recursively adds all the fields from this nested Message. 
- recursivelyAddDescriptors(field.getMessageType(), descriptors, registry); - break; - - default: - throw new UnsupportedOperationException( - "Unexpected Protocol Buffers field type: " + field.getType()); - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/ff1fe7fa/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/protobuf/package-info.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/protobuf/package-info.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/protobuf/package-info.java deleted file mode 100644 index bd16484..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/protobuf/package-info.java +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/** - * Defines a {@link org.apache.beam.sdk.coders.Coder} - * for Protocol Buffers messages, {@code ProtoCoder}. 
- * - * @see org.apache.beam.sdk.coders.protobuf.ProtoCoder - */ -package org.apache.beam.sdk.coders.protobuf; http://git-wip-us.apache.org/repos/asf/beam/blob/ff1fe7fa/sdks/java/core/src/main/proto/README.md ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/proto/README.md b/sdks/java/core/src/main/proto/README.md deleted file mode 100644 index b6d91df..0000000 --- a/sdks/java/core/src/main/proto/README.md +++ /dev/null @@ -1,45 +0,0 @@ -<!-- - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, - software distributed under the License is distributed on an - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - KIND, either express or implied. See the License for the - specific language governing permissions and limitations - under the License. ---> - -## Protocol Buffers in Apache Beam - -This directory contains the Protocol Buffer messages used in Apache Beam. - -They aren't, however, used during the Maven build process, and are included here -for completeness only. Instead, the following artifact on Maven Central contains -the binary version of the generated code from these Protocol Buffers: - - <dependency> - <groupId>com.google.cloud.dataflow</groupId> - <artifactId>google-cloud-dataflow-java-proto-library-all</artifactId> - <version>LATEST</version> - </dependency> - -Please follow this process for testing changes: - -* Make changes to the Protocol Buffer messages in this directory. 
-* Use `protoc` to generate the new code, and compile it into a new Java library. -* Install that Java library into your local Maven repository. -* Update SDK's `pom.xml` to pick up the newly installed library, instead of -downloading it from Maven Central. - -Once the changes are ready for submission, please separate them into two -commits. The first commit should update the Protocol Buffer messages only. After -that, we need to update the generated artifact on Maven Central. Finally, -changes that make use of the Protocol Buffer changes may be committed. http://git-wip-us.apache.org/repos/asf/beam/blob/ff1fe7fa/sdks/java/core/src/main/proto/proto2_coder_test_messages.proto ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/proto/proto2_coder_test_messages.proto b/sdks/java/core/src/main/proto/proto2_coder_test_messages.proto deleted file mode 100644 index b1abe46..0000000 --- a/sdks/java/core/src/main/proto/proto2_coder_test_messages.proto +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/* - * Protocol Buffer messages used for testing Proto2Coder implementation. 
- */ - -syntax = "proto2"; - -package proto2_coder_test_messages; - -option java_package = "org.apache.beam.sdk.coders"; - -message MessageA { - optional string field1 = 1; - repeated MessageB field2 = 2; -} - -message MessageB { - optional bool field1 = 1; -} - -message MessageC { - extensions 100 to 105; -} - -extend MessageC { - optional MessageA field1 = 101; - optional MessageB field2 = 102; -} - -message MessageWithMap { - map<string, MessageA> field1 = 1; -} - -message ReferencesMessageWithMap { - repeated MessageWithMap field1 = 1; -} http://git-wip-us.apache.org/repos/asf/beam/blob/ff1fe7fa/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java index 10e011f..616e88e 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java @@ -22,10 +22,8 @@ import static org.hamcrest.Matchers.containsString; import static org.junit.Assert.assertEquals; import com.google.auto.service.AutoService; -import com.google.cloud.dataflow.sdk.coders.Proto2CoderTestMessages.MessageA; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -import com.google.protobuf.Duration; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -38,7 +36,6 @@ import java.util.List; import java.util.Map; import java.util.Set; import org.apache.beam.sdk.coders.CoderRegistry.IncompatibleCoderException; -import org.apache.beam.sdk.coders.protobuf.ProtoCoder; import org.apache.beam.sdk.testing.ExpectedLogs; import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.TestPipeline; @@ -87,17 +84,6 @@ public class CoderRegistryTest 
{ } @Test - public void testProtoCoderFallbackCoderProvider() throws Exception { - CoderRegistry registry = CoderRegistry.createDefault(); - - // MessageA is a Protocol Buffers test message with syntax 2 - assertEquals(registry.getDefaultCoder(MessageA.class), ProtoCoder.of(MessageA.class)); - - // Duration is a Protocol Buffers default type with syntax 3 - assertEquals(registry.getDefaultCoder(Duration.class), ProtoCoder.of(Duration.class)); - } - - @Test public void testAvroFallbackCoderProvider() throws Exception { CoderRegistry registry = CoderRegistry.createDefault(); registry.setFallbackCoderProvider(AvroCoder.PROVIDER); http://git-wip-us.apache.org/repos/asf/beam/blob/ff1fe7fa/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/protobuf/ProtoCoderTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/protobuf/ProtoCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/protobuf/ProtoCoderTest.java deleted file mode 100644 index 8b889da..0000000 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/protobuf/ProtoCoderTest.java +++ /dev/null @@ -1,182 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.coders.protobuf; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotEquals; - -import com.google.cloud.dataflow.sdk.coders.Proto2CoderTestMessages; -import com.google.cloud.dataflow.sdk.coders.Proto2CoderTestMessages.MessageA; -import com.google.cloud.dataflow.sdk.coders.Proto2CoderTestMessages.MessageB; -import com.google.cloud.dataflow.sdk.coders.Proto2CoderTestMessages.MessageC; -import com.google.cloud.dataflow.sdk.coders.Proto2CoderTestMessages.MessageWithMap; -import com.google.common.collect.ImmutableList; -import org.apache.beam.sdk.coders.CannotProvideCoderException; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.Coder.NonDeterministicException; -import org.apache.beam.sdk.coders.CoderException; -import org.apache.beam.sdk.coders.ListCoder; -import org.apache.beam.sdk.testing.CoderProperties; -import org.apache.beam.sdk.util.CoderUtils; -import org.apache.beam.sdk.values.TypeDescriptor; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** - * Tests for {@link ProtoCoder}. 
- */ -@RunWith(JUnit4.class) -public class ProtoCoderTest { - - @Rule public ExpectedException thrown = ExpectedException.none(); - - @Test - public void testFactoryMethodAgreement() throws Exception { - assertEquals(ProtoCoder.of(new TypeDescriptor<MessageA>() {}), ProtoCoder.of(MessageA.class)); - - assertEquals( - ProtoCoder.of(new TypeDescriptor<MessageA>() {}), - ProtoCoder.coderProvider().getCoder(new TypeDescriptor<MessageA>() {})); - } - - @Test - public void testProviderCannotProvideCoder() throws Exception { - thrown.expect(CannotProvideCoderException.class); - thrown.expectMessage("java.lang.Integer is not a subclass of com.google.protobuf.Message"); - - ProtoCoder.coderProvider().getCoder(new TypeDescriptor<Integer>() {}); - } - - @Test - public void testCoderEncodeDecodeEqual() throws Exception { - MessageA value = - MessageA.newBuilder() - .setField1("hello") - .addField2(MessageB.newBuilder().setField1(true).build()) - .addField2(MessageB.newBuilder().setField1(false).build()) - .build(); - CoderProperties.coderDecodeEncodeEqual(ProtoCoder.of(MessageA.class), value); - } - - @Test - public void testCoderEncodeDecodeEqualNestedContext() throws Exception { - MessageA value1 = - MessageA.newBuilder() - .setField1("hello") - .addField2(MessageB.newBuilder().setField1(true).build()) - .addField2(MessageB.newBuilder().setField1(false).build()) - .build(); - MessageA value2 = - MessageA.newBuilder() - .setField1("world") - .addField2(MessageB.newBuilder().setField1(false).build()) - .addField2(MessageB.newBuilder().setField1(true).build()) - .build(); - CoderProperties.coderDecodeEncodeEqual( - ListCoder.of(ProtoCoder.of(MessageA.class)), ImmutableList.of(value1, value2)); - } - - @Test - public void testCoderEncodeDecodeExtensionsEqual() throws Exception { - MessageC value = - MessageC.newBuilder() - .setExtension( - Proto2CoderTestMessages.field1, - MessageA.newBuilder() - .setField1("hello") - .addField2(MessageB.newBuilder().setField1(true).build()) - 
.build()) - .setExtension( - Proto2CoderTestMessages.field2, MessageB.newBuilder().setField1(false).build()) - .build(); - CoderProperties.coderDecodeEncodeEqual( - ProtoCoder.of(MessageC.class).withExtensionsFrom(Proto2CoderTestMessages.class), value); - } - - @Test - public void testCoderSerialization() throws Exception { - ProtoCoder<MessageA> coder = ProtoCoder.of(MessageA.class); - CoderProperties.coderSerializable(coder); - } - - @Test - public void testCoderExtensionsSerialization() throws Exception { - ProtoCoder<MessageC> coder = - ProtoCoder.of(MessageC.class).withExtensionsFrom(Proto2CoderTestMessages.class); - CoderProperties.coderSerializable(coder); - } - - @Test - public void testEncodingId() throws Exception { - Coder<MessageA> coderA = ProtoCoder.of(MessageA.class); - CoderProperties.coderHasEncodingId(coderA, MessageA.class.getName() + "[]"); - - ProtoCoder<MessageC> coder = - ProtoCoder.of(MessageC.class).withExtensionsFrom(Proto2CoderTestMessages.class); - CoderProperties.coderHasEncodingId( - coder, - String.format("%s[%s]", MessageC.class.getName(), Proto2CoderTestMessages.class.getName())); - } - - @Test - public void encodeNullThrowsCoderException() throws Exception { - thrown.expect(CoderException.class); - thrown.expectMessage("cannot encode a null MessageA"); - - CoderUtils.encodeToBase64(ProtoCoder.of(MessageA.class), null); - } - - @Test - public void testDeterministicCoder() throws NonDeterministicException { - Coder<MessageA> coder = ProtoCoder.of(MessageA.class); - coder.verifyDeterministic(); - } - - @Test - public void testNonDeterministicCoder() throws NonDeterministicException { - thrown.expect(NonDeterministicException.class); - thrown.expectMessage(MessageWithMap.class.getName() + " transitively includes Map field"); - - Coder<MessageWithMap> coder = ProtoCoder.of(MessageWithMap.class); - coder.verifyDeterministic(); - } - - @Test - public void testNonDeterministicProperty() throws CoderException { - MessageWithMap.Builder 
msg1B = MessageWithMap.newBuilder(); - MessageWithMap.Builder msg2B = MessageWithMap.newBuilder(); - - // Built in reverse order but with equal contents. - for (int i = 0; i < 10; ++i) { - msg1B.getMutableField1().put("key" + i, MessageA.getDefaultInstance()); - msg2B.getMutableField1().put("key" + (9 - i), MessageA.getDefaultInstance()); - } - - // Assert the messages are equal. - MessageWithMap msg1 = msg1B.build(); - MessageWithMap msg2 = msg2B.build(); - assertEquals(msg2, msg1); - - // Assert the encoded messages are not equal. - Coder<MessageWithMap> coder = ProtoCoder.of(MessageWithMap.class); - assertNotEquals(CoderUtils.encodeToBase64(coder, msg2), CoderUtils.encodeToBase64(coder, msg1)); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/ff1fe7fa/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/protobuf/ProtobufUtilTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/protobuf/ProtobufUtilTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/protobuf/ProtobufUtilTest.java deleted file mode 100644 index 1408048..0000000 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/protobuf/ProtobufUtilTest.java +++ /dev/null @@ -1,192 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.coders.protobuf; - -import static org.apache.beam.sdk.coders.protobuf.ProtobufUtil.checkProto2Syntax; -import static org.apache.beam.sdk.coders.protobuf.ProtobufUtil.getRecursiveDescriptorsForClass; -import static org.apache.beam.sdk.coders.protobuf.ProtobufUtil.verifyDeterministic; -import static org.hamcrest.Matchers.equalTo; -import static org.junit.Assert.assertThat; - -import com.google.cloud.dataflow.sdk.coders.Proto2CoderTestMessages; -import com.google.cloud.dataflow.sdk.coders.Proto2CoderTestMessages.MessageA; -import com.google.cloud.dataflow.sdk.coders.Proto2CoderTestMessages.MessageB; -import com.google.cloud.dataflow.sdk.coders.Proto2CoderTestMessages.MessageC; -import com.google.cloud.dataflow.sdk.coders.Proto2CoderTestMessages.MessageWithMap; -import com.google.cloud.dataflow.sdk.coders.Proto2CoderTestMessages.ReferencesMessageWithMap; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Sets; -import com.google.protobuf.Any; -import com.google.protobuf.Descriptors.GenericDescriptor; -import com.google.protobuf.Duration; -import com.google.protobuf.ExtensionRegistry; -import com.google.protobuf.Message; -import java.util.HashSet; -import java.util.Set; -import org.apache.beam.sdk.coders.Coder.NonDeterministicException; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** - * Tests for {@link ProtobufUtil}. 
- */ -@RunWith(JUnit4.class) -public class ProtobufUtilTest { - @Rule public ExpectedException thrown = ExpectedException.none(); - - private static final Set<String> MESSAGE_A_ONLY = - ImmutableSet.of("proto2_coder_test_messages.MessageA"); - - private static final Set<String> MESSAGE_B_ONLY = - ImmutableSet.of("proto2_coder_test_messages.MessageB"); - - private static final Set<String> MESSAGE_C_ONLY = - ImmutableSet.of("proto2_coder_test_messages.MessageC"); - - // map fields are actually represented as a nested Message in generated Java code. - private static final Set<String> WITH_MAP_ONLY = - ImmutableSet.of( - "proto2_coder_test_messages.MessageWithMap", - "proto2_coder_test_messages.MessageWithMap.Field1Entry"); - - private static final Set<String> REFERS_MAP_ONLY = - ImmutableSet.of("proto2_coder_test_messages.ReferencesMessageWithMap"); - - // A references A and B. - private static final Set<String> MESSAGE_A_ALL = Sets.union(MESSAGE_A_ONLY, MESSAGE_B_ONLY); - - // C, only with registered extensions, references A. - private static final Set<String> MESSAGE_C_EXT = Sets.union(MESSAGE_C_ONLY, MESSAGE_A_ALL); - - // MessageWithMap references A. - private static final Set<String> WITH_MAP_ALL = Sets.union(WITH_MAP_ONLY, MESSAGE_A_ALL); - - // ReferencesMessageWithMap references MessageWithMap. 
- private static final Set<String> REFERS_MAP_ALL = Sets.union(REFERS_MAP_ONLY, WITH_MAP_ALL); - - @Test - public void testRecursiveDescriptorsMessageA() { - assertThat(getRecursiveDescriptorFullNames(MessageA.class), equalTo(MESSAGE_A_ALL)); - } - - @Test - public void testRecursiveDescriptorsMessageB() { - assertThat(getRecursiveDescriptorFullNames(MessageB.class), equalTo(MESSAGE_B_ONLY)); - } - - @Test - public void testRecursiveDescriptorsMessageC() { - assertThat(getRecursiveDescriptorFullNames(MessageC.class), equalTo(MESSAGE_C_ONLY)); - } - - @Test - public void testRecursiveDescriptorsMessageCWithExtensions() { - // With extensions, Message C has a reference to Message A and Message B. - ExtensionRegistry registry = ExtensionRegistry.newInstance(); - Proto2CoderTestMessages.registerAllExtensions(registry); - assertThat(getRecursiveDescriptorFullNames(MessageC.class, registry), equalTo(MESSAGE_C_EXT)); - } - - @Test - public void testRecursiveDescriptorsMessageWithMap() { - assertThat(getRecursiveDescriptorFullNames(MessageWithMap.class), equalTo(WITH_MAP_ALL)); - } - - @Test - public void testRecursiveDescriptorsReferencesMessageWithMap() { - assertThat( - getRecursiveDescriptorFullNames(ReferencesMessageWithMap.class), equalTo(REFERS_MAP_ALL)); - } - - @Test - public void testVerifyProto2() { - checkProto2Syntax(MessageA.class, ExtensionRegistry.getEmptyRegistry()); - checkProto2Syntax(MessageB.class, ExtensionRegistry.getEmptyRegistry()); - checkProto2Syntax(MessageC.class, ExtensionRegistry.getEmptyRegistry()); - checkProto2Syntax(MessageWithMap.class, ExtensionRegistry.getEmptyRegistry()); - checkProto2Syntax(ReferencesMessageWithMap.class, ExtensionRegistry.getEmptyRegistry()); - } - - @Test - public void testAnyIsNotProto2() { - // Any is a core Protocol Buffers type that uses proto3 syntax. 
- thrown.expect(IllegalArgumentException.class); - thrown.expectMessage(Any.class.getCanonicalName()); - thrown.expectMessage("in file " + Any.getDescriptor().getFile().getName()); - - checkProto2Syntax(Any.class, ExtensionRegistry.getEmptyRegistry()); - } - - @Test - public void testDurationIsNotProto2() { - // Duration is a core Protocol Buffers type that uses proto3 syntax. - thrown.expect(IllegalArgumentException.class); - thrown.expectMessage(Duration.class.getCanonicalName()); - thrown.expectMessage("in file " + Duration.getDescriptor().getFile().getName()); - - checkProto2Syntax(Duration.class, ExtensionRegistry.getEmptyRegistry()); - } - - @Test - public void testDurationIsDeterministic() throws NonDeterministicException { - // Duration can be encoded deterministically. - verifyDeterministic(ProtoCoder.of(Duration.class)); - } - - @Test - public void testMessageWithMapIsNotDeterministic() throws NonDeterministicException { - String mapFieldName = MessageWithMap.getDescriptor().findFieldByNumber(1).getFullName(); - thrown.expect(NonDeterministicException.class); - thrown.expectMessage(MessageWithMap.class.getName()); - thrown.expectMessage("transitively includes Map field " + mapFieldName); - thrown.expectMessage("file " + MessageWithMap.getDescriptor().getFile().getName()); - - verifyDeterministic(ProtoCoder.of(MessageWithMap.class)); - } - - @Test - public void testMessageWithTransitiveMapIsNotDeterministic() throws NonDeterministicException { - String mapFieldName = MessageWithMap.getDescriptor().findFieldByNumber(1).getFullName(); - thrown.expect(NonDeterministicException.class); - thrown.expectMessage(ReferencesMessageWithMap.class.getName()); - thrown.expectMessage("transitively includes Map field " + mapFieldName); - thrown.expectMessage("file " + MessageWithMap.getDescriptor().getFile().getName()); - - verifyDeterministic(ProtoCoder.of(ReferencesMessageWithMap.class)); - } - - 
//////////////////////////////////////////////////////////////////////////////////////////// - - /** Helper used to test the recursive class traversal and print good error messages. */ - private static Set<String> getRecursiveDescriptorFullNames(Class<? extends Message> clazz) { - return getRecursiveDescriptorFullNames(clazz, ExtensionRegistry.getEmptyRegistry()); - } - - /** Helper used to test the recursive class traversal and print good error messages. */ - private static Set<String> getRecursiveDescriptorFullNames( - Class<? extends Message> clazz, ExtensionRegistry registry) { - Set<String> result = new HashSet<>(); - for (GenericDescriptor d : getRecursiveDescriptorsForClass(clazz, registry)) { - result.add(d.getFullName()); - } - return result; - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/ff1fe7fa/sdks/java/extensions/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/pom.xml b/sdks/java/extensions/pom.xml index dde8be5..8a48eca 100644 --- a/sdks/java/extensions/pom.xml +++ b/sdks/java/extensions/pom.xml @@ -35,6 +35,7 @@ <module>gcp-core</module> <module>jackson</module> <module>join-library</module> + <module>protobuf</module> <module>sorter</module> </modules> http://git-wip-us.apache.org/repos/asf/beam/blob/ff1fe7fa/sdks/java/extensions/protobuf/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/protobuf/pom.xml b/sdks/java/extensions/protobuf/pom.xml new file mode 100644 index 0000000..9a54254 --- /dev/null +++ b/sdks/java/extensions/protobuf/pom.xml @@ -0,0 +1,142 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. 
+ The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.beam</groupId> + <artifactId>beam-sdks-java-extensions-parent</artifactId> + <version>0.7.0-SNAPSHOT</version> + <relativePath>../pom.xml</relativePath> + </parent> + + <artifactId>beam-sdks-java-extensions-protobuf</artifactId> + <name>Apache Beam :: SDKs :: Java :: Extensions :: Protobuf</name> + <description>Add support to Apache Beam for Google Protobuf.</description> + + <packaging>jar</packaging> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + </plugin> + + <plugin> + <groupId>org.xolstice.maven.plugins</groupId> + <artifactId>protobuf-maven-plugin</artifactId> + <configuration> + <protocArtifact>com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}</protocArtifact> + </configuration> + <executions> + <execution> + <goals> + <goal>test-compile</goal> + </goals> + </execution> + </executions> + </plugin> + + <!-- Coverage analysis for unit tests. 
--> + <plugin> + <groupId>org.jacoco</groupId> + <artifactId>jacoco-maven-plugin</artifactId> + </plugin> + </plugins> + </build> + + <dependencies> + <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-sdks-java-core</artifactId> + </dependency> + + <dependency> + <groupId>com.google.protobuf</groupId> + <artifactId>protobuf-java</artifactId> + </dependency> + + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </dependency> + + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-annotations</artifactId> + </dependency> + + <dependency> + <groupId>com.google.code.findbugs</groupId> + <artifactId>jsr305</artifactId> + </dependency> + + <!-- build dependencies --> + <dependency> + <groupId>com.google.auto.value</groupId> + <artifactId>auto-value</artifactId> + <scope>provided</scope> + </dependency> + + <!-- test dependencies --> + <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-sdks-java-core</artifactId> + <classifier>tests</classifier> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.hamcrest</groupId> + <artifactId>hamcrest-all</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-jdk14</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-all</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + </dependencies> + +</project> http://git-wip-us.apache.org/repos/asf/beam/blob/ff1fe7fa/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoder.java ---------------------------------------------------------------------- diff --git 
a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoder.java b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoder.java new file mode 100644 index 0000000..99a0838 --- /dev/null +++ b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoder.java @@ -0,0 +1,405 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.extensions.protobuf; + +import static com.google.common.base.Preconditions.checkArgument; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import com.google.protobuf.ExtensionRegistry; +import com.google.protobuf.Message; +import com.google.protobuf.Parser; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.lang.reflect.Modifier; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; +import javax.annotation.Nullable; +import org.apache.beam.sdk.coders.AtomicCoder; +import org.apache.beam.sdk.coders.CannotProvideCoderException; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.CoderProvider; +import org.apache.beam.sdk.coders.CoderRegistry; +import org.apache.beam.sdk.util.CloudObject; +import org.apache.beam.sdk.util.Structs; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TypeDescriptor; + +/** + * A {@link Coder} using Google Protocol Buffers binary format. {@link ProtoCoder} supports both + * Protocol Buffers syntax versions 2 and 3. + * + * <p>To learn more about Protocol Buffers, visit: + * <a href="https://developers.google.com/protocol-buffers">https://developers.google.com/protocol-buffers</a> + * + * <p>{@link ProtoCoder} is registered in the global {@link CoderRegistry} as the default + * {@link Coder} for any {@link Message} object. 
Custom message extensions are also supported, but + * these extensions must be registered for a particular {@link ProtoCoder} instance and that + * instance must be registered on the {@link PCollection} that needs the extensions: + * + * <pre>{@code + * import MyProtoFile; + * import MyProtoFile.MyMessage; + * + * Coder<MyMessage> coder = ProtoCoder.of(MyMessage.class).withExtensionsFrom(MyProtoFile.class); + * PCollection<MyMessage> records = input.apply(...).setCoder(coder); + * }</pre> + * + * <h3>Versioning</h3> + * + * <p>{@link ProtoCoder} supports both versions 2 and 3 of the Protocol Buffers syntax. However, + * the Java runtime version of the <code>com.google.protobuf</code> library must match exactly the + * version of <code>protoc</code> that was used to produce the JAR files containing the compiled + * <code>.proto</code> messages. + * + * <p>For more information, see the + * <a href="https://developers.google.com/protocol-buffers/docs/proto3#using-proto2-message-types">Protocol Buffers documentation</a>. + * + * <h3>{@link ProtoCoder} and Determinism</h3> + * + * <p>In general, Protocol Buffers messages can be encoded deterministically within a single + * pipeline as long as: + * + * <ul> + * <li>The encoded messages (and any transitively linked messages) do not use <code>map</code> + * fields.</li> + * <li>Every Java VM that encodes or decodes the messages uses the same runtime version of the + * Protocol Buffers library and the same compiled <code>.proto</code> file JAR.</li> + * </ul> + * + * <h3>{@link ProtoCoder} and Encoding Stability</h3> + * + * <p>When changing Protocol Buffers messages, follow the rules in the Protocol Buffers language + * guides for + * <a href="https://developers.google.com/protocol-buffers/docs/proto#updating">{@code proto2}</a> + * and + * <a href="https://developers.google.com/protocol-buffers/docs/proto3#updating">{@code proto3}</a> + * syntaxes, depending on your message type. 
Following these guidelines will ensure that the + * old encoded data can be read by new versions of the code. + * + * <p>Generally, any change to the message type, registered extensions, runtime library, or + * compiled proto JARs may change the encoding. Thus even if both the original and updated messages + * can be encoded deterministically within a single job, these deterministic encodings may not be + * the same across jobs. + * + * @param <T> the Protocol Buffers {@link Message} handled by this {@link Coder}. + */ +public class ProtoCoder<T extends Message> extends AtomicCoder<T> { + + /** + * A {@link CoderProvider} that returns a {@link ProtoCoder} with an empty + * {@link ExtensionRegistry}. + */ + public static CoderProvider coderProvider() { + return PROVIDER; + } + + /** + * Returns a {@link ProtoCoder} for the given Protocol Buffers {@link Message}. + */ + public static <T extends Message> ProtoCoder<T> of(Class<T> protoMessageClass) { + return new ProtoCoder<>(protoMessageClass, ImmutableSet.<Class<?>>of()); + } + + /** + * Returns a {@link ProtoCoder} for the Protocol Buffers {@link Message} indicated by the given + * {@link TypeDescriptor}. + */ + public static <T extends Message> ProtoCoder<T> of(TypeDescriptor<T> protoMessageType) { + @SuppressWarnings("unchecked") + Class<T> protoMessageClass = (Class<T>) protoMessageType.getRawType(); + return of(protoMessageClass); + } + + /** + * Returns a {@link ProtoCoder} like this one, but with the extensions from the given classes + * registered. + * + * <p>Each of the extension host classes must be a class automatically generated by the + * Protocol Buffers compiler, {@code protoc}, that contains messages. + * + * <p>Does not modify this object. + */ + public ProtoCoder<T> withExtensionsFrom(Iterable<Class<?>> moreExtensionHosts) { + for (Class<?> extensionHost : moreExtensionHosts) { + // Attempt to access the required method, to make sure it's present. 
+ try { + Method registerAllExtensions = + extensionHost.getDeclaredMethod("registerAllExtensions", ExtensionRegistry.class); + checkArgument( + Modifier.isStatic(registerAllExtensions.getModifiers()), + "Method registerAllExtensions() must be static"); + } catch (NoSuchMethodException | SecurityException e) { + throw new IllegalArgumentException( + String.format("Unable to register extensions for %s", extensionHost.getCanonicalName()), + e); + } + } + + return new ProtoCoder<>( + protoMessageClass, + new ImmutableSet.Builder<Class<?>>() + .addAll(extensionHostClasses) + .addAll(moreExtensionHosts) + .build()); + } + + /** + * See {@link #withExtensionsFrom(Iterable)}. + * + * <p>Does not modify this object. + */ + public ProtoCoder<T> withExtensionsFrom(Class<?>... moreExtensionHosts) { + return withExtensionsFrom(Arrays.asList(moreExtensionHosts)); + } + + @Override + public void encode(T value, OutputStream outStream, Context context) throws IOException { + if (value == null) { + throw new CoderException("cannot encode a null " + protoMessageClass.getSimpleName()); + } + if (context.isWholeStream) { + value.writeTo(outStream); + } else { + value.writeDelimitedTo(outStream); + } + } + + @Override + public T decode(InputStream inStream, Context context) throws IOException { + if (context.isWholeStream) { + return getParser().parseFrom(inStream, getExtensionRegistry()); + } else { + return getParser().parseDelimitedFrom(inStream, getExtensionRegistry()); + } + } + + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } + if (!(other instanceof ProtoCoder)) { + return false; + } + ProtoCoder<?> otherCoder = (ProtoCoder<?>) other; + return protoMessageClass.equals(otherCoder.protoMessageClass) + && Sets.newHashSet(extensionHostClasses) + .equals(Sets.newHashSet(otherCoder.extensionHostClasses)); + } + + @Override + public int hashCode() { + return Objects.hash(protoMessageClass, extensionHostClasses); + } + + /** + * The encoding 
identifier is designed to support evolution as per the design of Protocol + * Buffers. In order to use this class effectively, carefully follow the advice in the Protocol + * Buffers documentation at + * <a href="https://developers.google.com/protocol-buffers/docs/proto#updating">Updating + * A Message Type</a>. + * + * <p>In particular, the encoding identifier is guaranteed to be the same for {@link ProtoCoder} + * instances of the same principal message class, with the same registered extension host classes, + * and otherwise distinct. Note that the encoding ID does not encode any version of the message + * or extensions, nor does it include the message schema. + * + * <p>When modifying a message class, here are the broadest guidelines; see the above link + * for greater detail. + * + * <ul> + * <li>Do not change the numeric tags for any fields. + * <li>Never remove a <code>required</code> field. + * <li>Only add <code>optional</code> or <code>repeated</code> fields, with sensible defaults. + * <li>When changing the type of a field, consult the Protocol Buffers documentation to ensure + * the new and old types are interchangeable. + * </ul> + * + * <p>Code consuming this message class should be prepared to support <i>all</i> versions of + * the class until it is certain that no remaining serialized instances exist. + * + * <p>If backwards incompatible changes must be made, the best recourse is to change the name + * of your Protocol Buffers message class. + */ + @Override + public String getEncodingId() { + return protoMessageClass.getName() + getSortedExtensionClasses().toString(); + } + + @Override + public void verifyDeterministic() throws NonDeterministicException { + ProtobufUtil.verifyDeterministic(this); + } + + /** + * Returns the Protocol Buffers {@link Message} type this {@link ProtoCoder} supports. 
+ */ + public Class<T> getMessageType() { + return protoMessageClass; + } + + /** + * Returns the {@link ExtensionRegistry} listing all known Protocol Buffers extension messages + * to {@code T} registered with this {@link ProtoCoder}. + */ + public ExtensionRegistry getExtensionRegistry() { + if (memoizedExtensionRegistry == null) { + ExtensionRegistry registry = ExtensionRegistry.newInstance(); + for (Class<?> extensionHost : extensionHostClasses) { + try { + extensionHost + .getDeclaredMethod("registerAllExtensions", ExtensionRegistry.class) + .invoke(null, registry); + } catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) { + throw new IllegalStateException(e); + } + } + memoizedExtensionRegistry = registry.getUnmodifiable(); + } + return memoizedExtensionRegistry; + } + + //////////////////////////////////////////////////////////////////////////////////// + // Private implementation details below. + + /** The {@link Message} type to be coded. */ + private final Class<T> protoMessageClass; + + /** + * All extension host classes included in this {@link ProtoCoder}. The extensions from these + * classes will be included in the {@link ExtensionRegistry} used during encoding and decoding. + */ + private final Set<Class<?>> extensionHostClasses; + + // Constants used to serialize and deserialize + private static final String PROTO_MESSAGE_CLASS = "proto_message_class"; + private static final String PROTO_EXTENSION_HOSTS = "proto_extension_hosts"; + + // Transient fields that are lazy initialized and then memoized. + private transient ExtensionRegistry memoizedExtensionRegistry; + private transient Parser<T> memoizedParser; + + /** Private constructor. */ + private ProtoCoder(Class<T> protoMessageClass, Set<Class<?>> extensionHostClasses) { + this.protoMessageClass = protoMessageClass; + this.extensionHostClasses = extensionHostClasses; + } + + /** + * @deprecated For JSON deserialization only. 
+ */ + @JsonCreator + @Deprecated + public static <T extends Message> ProtoCoder<T> of( + @JsonProperty(PROTO_MESSAGE_CLASS) String protoMessageClassName, + @Nullable @JsonProperty(PROTO_EXTENSION_HOSTS) List<String> extensionHostClassNames) { + + try { + @SuppressWarnings("unchecked") + Class<T> protoMessageClass = (Class<T>) Class.forName(protoMessageClassName); + List<Class<?>> extensionHostClasses = Lists.newArrayList(); + if (extensionHostClassNames != null) { + for (String extensionHostClassName : extensionHostClassNames) { + extensionHostClasses.add(Class.forName(extensionHostClassName)); + } + } + return of(protoMessageClass).withExtensionsFrom(extensionHostClasses); + } catch (ClassNotFoundException e) { + throw new IllegalArgumentException(e); + } + } + + @Override + public CloudObject initializeCloudObject() { + CloudObject result = CloudObject.forClass(getClass()); + Structs.addString(result, PROTO_MESSAGE_CLASS, protoMessageClass.getName()); + List<CloudObject> extensionHostClassNames = Lists.newArrayList(); + for (String className : getSortedExtensionClasses()) { + extensionHostClassNames.add(CloudObject.forString(className)); + } + Structs.addList(result, PROTO_EXTENSION_HOSTS, extensionHostClassNames); + return result; + } + + /** Get the memoized {@link Parser}, possibly initializing it lazily. 
*/ + private Parser<T> getParser() { + if (memoizedParser == null) { + try { + @SuppressWarnings("unchecked") + T protoMessageInstance = (T) protoMessageClass.getMethod("getDefaultInstance").invoke(null); + @SuppressWarnings("unchecked") + Parser<T> tParser = (Parser<T>) protoMessageInstance.getParserForType(); + memoizedParser = tParser; + } catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) { + throw new IllegalArgumentException(e); + } + } + return memoizedParser; + } + + static final TypeDescriptor<Message> CHECK = new TypeDescriptor<Message>() {}; + + /** + * The implementation of the {@link CoderProvider} for this {@link ProtoCoder} returned by + * {@link #coderProvider()}. + */ + private static final CoderProvider PROVIDER = + new CoderProvider() { + @Override + public <T> Coder<T> getCoder(TypeDescriptor<T> type) throws CannotProvideCoderException { + if (!type.isSubtypeOf(CHECK)) { + throw new CannotProvideCoderException( + String.format( + "Cannot provide %s because %s is not a subclass of %s", + ProtoCoder.class.getSimpleName(), + type, + Message.class.getName())); + } + + @SuppressWarnings("unchecked") + TypeDescriptor<? extends Message> messageType = (TypeDescriptor<? 
extends Message>) type; + try { + @SuppressWarnings("unchecked") + Coder<T> coder = (Coder<T>) ProtoCoder.of(messageType); + return coder; + } catch (IllegalArgumentException e) { + throw new CannotProvideCoderException(e); + } + } + }; + + private SortedSet<String> getSortedExtensionClasses() { + SortedSet<String> ret = new TreeSet<>(); + for (Class<?> clazz : extensionHostClasses) { + ret.add(clazz.getName()); + } + return ret; + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/ff1fe7fa/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtobufUtil.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtobufUtil.java b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtobufUtil.java new file mode 100644 index 0000000..68a775a --- /dev/null +++ b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtobufUtil.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.extensions.protobuf; + +import static com.google.common.base.Preconditions.checkArgument; + +import com.google.protobuf.Descriptors.Descriptor; +import com.google.protobuf.Descriptors.FieldDescriptor; +import com.google.protobuf.Descriptors.FileDescriptor.Syntax; +import com.google.protobuf.Descriptors.GenericDescriptor; +import com.google.protobuf.ExtensionRegistry; +import com.google.protobuf.ExtensionRegistry.ExtensionInfo; +import com.google.protobuf.Message; +import java.lang.reflect.InvocationTargetException; +import java.util.HashSet; +import java.util.Set; +import org.apache.beam.sdk.coders.Coder.NonDeterministicException; + +/** + * Utility functions for reflecting and analyzing Protocol Buffers classes. + * + * <p>Used by {@link ProtoCoder}, but in a separate file for testing and isolation. + */ +class ProtobufUtil { + /** + * Returns the {@link Descriptor} for the given Protocol Buffers {@link Message}. + * + * @throws IllegalArgumentException if there is an error in Java reflection. + */ + static Descriptor getDescriptorForClass(Class<? extends Message> clazz) { + try { + return (Descriptor) clazz.getMethod("getDescriptor").invoke(null); + } catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) { + throw new IllegalArgumentException(e); + } + } + + /** + * Returns the {@link Descriptor} for the given Protocol Buffers {@link Message} as well as + * every class it can include transitively. + * + * @throws IllegalArgumentException if there is an error in Java reflection. + */ + static Set<Descriptor> getRecursiveDescriptorsForClass( + Class<? 
extends Message> clazz, ExtensionRegistry registry) { + Descriptor root = getDescriptorForClass(clazz); + Set<Descriptor> descriptors = new HashSet<>(); + recursivelyAddDescriptors(root, descriptors, registry); + return descriptors; + } + + /** + * Recursively walks the given {@link Message} class and verifies that every field or message + * linked in uses the Protocol Buffers proto2 syntax. + */ + static void checkProto2Syntax(Class<? extends Message> clazz, ExtensionRegistry registry) { + for (GenericDescriptor d : getRecursiveDescriptorsForClass(clazz, registry)) { + Syntax s = d.getFile().getSyntax(); + checkArgument( + s == Syntax.PROTO2, + "Message %s or one of its dependencies does not use proto2 syntax: %s in file %s", + clazz.getName(), + d.getFullName(), + d.getFile().getName()); + } + } + + /** + * Recursively checks whether the specified class uses any Protocol Buffers fields that cannot + * be deterministically encoded. + * + * @throws NonDeterministicException if the object cannot be encoded deterministically. + */ + static void verifyDeterministic(ProtoCoder<?> coder) throws NonDeterministicException { + Class<? extends Message> message = coder.getMessageType(); + ExtensionRegistry registry = coder.getExtensionRegistry(); + Set<Descriptor> descriptors = getRecursiveDescriptorsForClass(message, registry); + for (Descriptor d : descriptors) { + for (FieldDescriptor fd : d.getFields()) { + // If there is a transitively reachable Protocol Buffers map field, then this object cannot + // be encoded deterministically. + if (fd.isMapField()) { + String reason = + String.format( + "Protocol Buffers message %s transitively includes Map field %s (from file %s)." 
+ + " Maps cannot be deterministically encoded.", + message.getName(), + fd.getFullName(), + fd.getFile().getFullName()); + throw new NonDeterministicException(coder, reason); + } + } + } + } + + //////////////////////////////////////////////////////////////////////////////////////////////// + // Disable construction of utility class + private ProtobufUtil() {} + + private static void recursivelyAddDescriptors( + Descriptor message, Set<Descriptor> descriptors, ExtensionRegistry registry) { + if (descriptors.contains(message)) { + return; + } + descriptors.add(message); + + for (FieldDescriptor f : message.getFields()) { + recursivelyAddDescriptors(f, descriptors, registry); + } + for (FieldDescriptor f : message.getExtensions()) { + recursivelyAddDescriptors(f, descriptors, registry); + } + for (ExtensionInfo info : + registry.getAllImmutableExtensionsByExtendedType(message.getFullName())) { + recursivelyAddDescriptors(info.descriptor, descriptors, registry); + } + for (ExtensionInfo info : + registry.getAllMutableExtensionsByExtendedType(message.getFullName())) { + recursivelyAddDescriptors(info.descriptor, descriptors, registry); + } + } + + private static void recursivelyAddDescriptors( + FieldDescriptor field, Set<Descriptor> descriptors, ExtensionRegistry registry) { + switch (field.getType()) { + case BOOL: + case BYTES: + case DOUBLE: + case ENUM: + case FIXED32: + case FIXED64: + case FLOAT: + case INT32: + case INT64: + case SFIXED32: + case SFIXED64: + case SINT32: + case SINT64: + case STRING: + case UINT32: + case UINT64: + // Primitive types do not transitively access anything else. + break; + + case GROUP: + case MESSAGE: + // Recursively adds all the fields from this nested Message. 
+ recursivelyAddDescriptors(field.getMessageType(), descriptors, registry); + break; + + default: + throw new UnsupportedOperationException( + "Unexpected Protocol Buffers field type: " + field.getType()); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/ff1fe7fa/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/package-info.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/package-info.java b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/package-info.java new file mode 100644 index 0000000..b69bc8b --- /dev/null +++ b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/package-info.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Defines a {@link org.apache.beam.sdk.coders.Coder} + * for Protocol Buffers messages, {@code ProtoCoder}. + * + * @see org.apache.beam.sdk.extensions.protobuf.ProtoCoder + */ +package org.apache.beam.sdk.extensions.protobuf;
