ProtoCoder: a Coder for Protocol Buffers Messages

----Release Notes----
Adds ProtoCoder, which is a Coder for Protocol Buffers messages. ProtoCoder has
the following advantages over Proto2Coder:
 * ProtoCoder is designed to support Protocol Buffers syntax versions 2 and 3,
   not just proto2.
 * ProtoCoder dynamically inspects the Message type it encodes and will throw a
   Coder.NonDeterministicException when the configured message cannot be
   encoded deterministically.
 * Because early versions of the Protocol Buffers 2 syntax did not allow
   non-deterministic types, Proto2Coder does not do this type inspection and
   instead always claims to be able to encode deterministically. For backwards
   compatibility, we are not changing this behavior.

Proto2Coder is now deprecated; we recommend that all users switch to
ProtoCoder.
[]
-------------
Created by MOE: https://github.com/google/moe
MOE_MIGRATED_REVID=115411869


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/01fd8595
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/01fd8595
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/01fd8595

Branch: refs/heads/master
Commit: 01fd8595e7d54009d313e98889fdf322d98b5c63
Parents: db708bb
Author: dhalperi <[email protected]>
Authored: Tue Feb 23 20:07:32 2016 -0800
Committer: Davor Bonaci <[email protected]>
Committed: Thu Feb 25 23:58:27 2016 -0800

----------------------------------------------------------------------
 sdk/pom.xml                                     |   2 +-
 .../dataflow/sdk/coders/CoderRegistry.java      |   5 +-
 .../cloud/dataflow/sdk/coders/Proto2Coder.java  |   3 +
 .../sdk/coders/StringDelegateCoder.java         |   4 +-
 .../sdk/coders/protobuf/ProtoCoder.java         | 411 +++++++++++++++++++
 .../sdk/coders/protobuf/ProtobufUtil.java       | 171 ++++++++
 .../dataflow/sdk/coders/CoderRegistryTest.java  |  14 +-
 .../dataflow/sdk/coders/Proto2CoderTest.java    |   1 +
 .../sdk/coders/protobuf/ProtoCoderTest.java     | 182 ++++++++
 .../sdk/coders/protobuf/ProtobufUtilTest.java   | 195 +++++++++
 10 files changed, 980 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/01fd8595/sdk/pom.xml
----------------------------------------------------------------------
diff --git a/sdk/pom.xml b/sdk/pom.xml
index bdbdd5f..bc6d519 100644
--- a/sdk/pom.xml
+++ b/sdk/pom.xml
@@ -442,7 +442,7 @@
     <dependency>
       <groupId>com.google.cloud.dataflow</groupId>
       <artifactId>google-cloud-dataflow-java-proto-library-all</artifactId>
-      <version>0.5.160127</version>
+      <version>0.5.160222</version>
     </dependency>
 
     <dependency>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/01fd8595/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/CoderRegistry.java
----------------------------------------------------------------------
diff --git 
a/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/CoderRegistry.java 
b/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/CoderRegistry.java
index 1ef0971..00982e6 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/CoderRegistry.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/CoderRegistry.java
@@ -18,6 +18,7 @@ package com.google.cloud.dataflow.sdk.coders;
 
 import com.google.api.services.bigquery.model.TableRow;
 import 
com.google.cloud.dataflow.sdk.coders.CannotProvideCoderException.ReasonCode;
+import com.google.cloud.dataflow.sdk.coders.protobuf.ProtoCoder;
 import com.google.cloud.dataflow.sdk.transforms.SerializableFunction;
 import com.google.cloud.dataflow.sdk.util.CoderUtils;
 import com.google.cloud.dataflow.sdk.values.KV;
@@ -81,8 +82,8 @@ public class CoderRegistry implements CoderProvider {
   private static final Logger LOG = 
LoggerFactory.getLogger(CoderRegistry.class);
 
   public CoderRegistry() {
-    
setFallbackCoderProvider(CoderProviders.firstOf(Proto2Coder.coderProvider(),
-        SerializableCoder.PROVIDER));
+    // ProtoCoder's provider throws CannotProvideCoderException for non-Message types;
+    // presumably firstOf then falls through to SerializableCoder.PROVIDER (confirm in
+    // CoderProviders).
+    setFallbackCoderProvider(
+        CoderProviders.firstOf(ProtoCoder.coderProvider(), 
SerializableCoder.PROVIDER));
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/01fd8595/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/Proto2Coder.java
----------------------------------------------------------------------
diff --git 
a/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/Proto2Coder.java 
b/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/Proto2Coder.java
index 56ec0c7..ef91ba9 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/Proto2Coder.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/Proto2Coder.java
@@ -17,6 +17,7 @@ package com.google.cloud.dataflow.sdk.coders;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
+import com.google.cloud.dataflow.sdk.coders.protobuf.ProtoCoder;
 import com.google.cloud.dataflow.sdk.util.CloudObject;
 import com.google.cloud.dataflow.sdk.util.Structs;
 import com.google.cloud.dataflow.sdk.values.TypeDescriptor;
@@ -71,7 +72,9 @@ import javax.annotation.Nullable;
  * </pre>
  *
  * @param <T> the type of elements handled by this coder, must extend {@code 
Message}
+ * @deprecated Use {@link ProtoCoder}.
  */
+@Deprecated
 public class Proto2Coder<T extends Message> extends AtomicCoder<T> {
 
   /** The class of Protobuf message to be encoded. */

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/01fd8595/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/StringDelegateCoder.java
----------------------------------------------------------------------
diff --git 
a/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/StringDelegateCoder.java
 
b/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/StringDelegateCoder.java
index 9696b58..1fc1247 100644
--- 
a/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/StringDelegateCoder.java
+++ 
b/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/StringDelegateCoder.java
@@ -16,6 +16,8 @@
 
 package com.google.cloud.dataflow.sdk.coders;
 
+import com.google.cloud.dataflow.sdk.coders.protobuf.ProtoCoder;
+
 import java.lang.reflect.InvocationTargetException;
 
 /**
@@ -35,7 +37,7 @@ import java.lang.reflect.InvocationTargetException;
  *
  * <p>This method of encoding is not designed for ease of evolution of {@code 
Clazz};
  * it should only be used in cases where the class is stable or the encoding 
is not
- * important. If evolution of the class is important, see {@link Proto2Coder}, 
{@link AvroCoder},
+ * important. If evolution of the class is important, see {@link ProtoCoder}, 
{@link AvroCoder},
  * or {@link JAXBCoder}.
  *
  * @param <T> The type of objects coded.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/01fd8595/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/protobuf/ProtoCoder.java
----------------------------------------------------------------------
diff --git 
a/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/protobuf/ProtoCoder.java
 
b/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/protobuf/ProtoCoder.java
new file mode 100644
index 0000000..d8c8e9e
--- /dev/null
+++ 
b/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/protobuf/ProtoCoder.java
@@ -0,0 +1,411 @@
+/*
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy 
of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ */
+package com.google.cloud.dataflow.sdk.coders.protobuf;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.api.services.datastore.DatastoreV1;
+import com.google.api.services.datastore.DatastoreV1.Entity;
+import com.google.cloud.dataflow.sdk.coders.AtomicCoder;
+import com.google.cloud.dataflow.sdk.coders.CannotProvideCoderException;
+import com.google.cloud.dataflow.sdk.coders.Coder;
+import com.google.cloud.dataflow.sdk.coders.CoderException;
+import com.google.cloud.dataflow.sdk.coders.CoderProvider;
+import com.google.cloud.dataflow.sdk.coders.CoderRegistry;
+import com.google.cloud.dataflow.sdk.coders.Proto2CoderTestMessages;
+import com.google.cloud.dataflow.sdk.coders.Proto2CoderTestMessages.MessageA;
+import com.google.cloud.dataflow.sdk.util.CloudObject;
+import com.google.cloud.dataflow.sdk.util.Structs;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.cloud.dataflow.sdk.values.TypeDescriptor;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.google.protobuf.ExtensionRegistry;
+import com.google.protobuf.Message;
+import com.google.protobuf.Parser;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import javax.annotation.Nullable;
+
+/**
+ * A {@link Coder} using Google Protocol Buffers binary format. {@link ProtoCoder} supports both
+ * Protocol Buffers syntax versions 2 and 3.
+ *
+ * <p>To learn more about Protocol Buffers, visit:
+ * <a href="https://developers.google.com/protocol-buffers">https://developers.google.com/protocol-buffers</a>
+ *
+ * <p>{@link ProtoCoder} is registered in the global {@link CoderRegistry} as the default
+ * {@link Coder} for any {@link Message} object. Custom message extensions are also supported, but
+ * these extensions must be registered for a particular {@link ProtoCoder} instance and that
+ * instance must be registered on the {@link PCollection} that needs the extensions:
+ *
+ * <pre>{@code
+ * import MyProtoFile;
+ * import MyProtoFile.MyMessage;
+ *
+ * Coder<MyMessage> coder = ProtoCoder.of(MyMessage.class).withExtensionsFrom(MyProtoFile.class);
+ * PCollection<MyMessage> records = input.apply(...).setCoder(coder);
+ * }</pre>
+ *
+ * <h3>Versioning</h3>
+ *
+ * <p>{@link ProtoCoder} supports both versions 2 and 3 of the Protocol Buffers syntax. However,
+ * the Java runtime version of the <code>com.google.protobuf</code> library must match exactly the
+ * version of <code>protoc</code> that was used to produce the JAR files containing the compiled
+ * <code>.proto</code> messages.
+ *
+ * <p>For more information, see the
+ * <a href="https://developers.google.com/protocol-buffers/docs/proto3#using-proto2-message-types">Protocol
+ * Buffers documentation</a>.
+ *
+ * <h3>{@link ProtoCoder} and Determinism</h3>
+ *
+ * <p>In general, Protocol Buffers messages can be encoded deterministically within a single
+ * pipeline as long as:
+ *
+ * <ul>
+ * <li>The encoded messages (and any transitively linked messages) do not use <code>map</code>
+ *     fields.</li>
+ * <li>Every Java VM that encodes or decodes the messages uses the same runtime version of the
+ *     Protocol Buffers library and the same compiled <code>.proto</code> file JAR.</li>
+ * </ul>
+ *
+ * <h3>{@link ProtoCoder} and Encoding Stability</h3>
+ *
+ * <p>When changing Protocol Buffers messages, follow the rules in the Protocol Buffers language
+ * guides for
+ * <a href="https://developers.google.com/protocol-buffers/docs/proto#updating">{@code proto2}</a>
+ * and
+ * <a href="https://developers.google.com/protocol-buffers/docs/proto3#updating">{@code proto3}</a>
+ * syntaxes, depending on your message type. Following these guidelines will ensure that the
+ * old encoded data can be read by new versions of the code.
+ *
+ * <p>Generally, any change to the message type, registered extensions, runtime library, or
+ * compiled proto JARs may change the encoding. Thus even if both the original and updated messages
+ * can be encoded deterministically within a single job, these deterministic encodings may not be
+ * the same across jobs.
+ *
+ * @param <T> the Protocol Buffers {@link Message} handled by this {@link Coder}.
+ */
+public class ProtoCoder<T extends Message> extends AtomicCoder<T> {
+
+  /**
+   * A {@link CoderProvider} that returns a {@link ProtoCoder} with an empty
+   * {@link ExtensionRegistry}.
+   */
+  public static CoderProvider coderProvider() {
+    return PROVIDER;
+  }
+
+  /**
+   * Returns a {@link ProtoCoder} for the given Protocol Buffers {@link Message}, with no
+   * extension host classes registered.
+   */
+  public static <T extends Message> ProtoCoder<T> of(Class<T> protoMessageClass) {
+    return new ProtoCoder<T>(protoMessageClass, ImmutableSet.<Class<?>>of());
+  }
+
+  /**
+   * Returns a {@link ProtoCoder} for the Protocol Buffers {@link Message} indicated by the given
+   * {@link TypeDescriptor}.
+   */
+  public static <T extends Message> ProtoCoder<T> of(TypeDescriptor<T> protoMessageType) {
+    // Safe: T extends Message, so the raw type of a TypeDescriptor<T> is a Class<T>.
+    @SuppressWarnings("unchecked")
+    Class<T> protoMessageClass = (Class<T>) protoMessageType.getRawType();
+    return of(protoMessageClass);
+  }
+
+  /**
+   * Returns a {@link ProtoCoder} like this one, but with the extensions from the given classes
+   * registered.
+   *
+   * <p>Each of the extension host classes must be a class automatically generated by the
+   * Protocol Buffers compiler, {@code protoc}, that contains messages. For example, the class
+   * {@link Proto2CoderTestMessages} is the extension host for the {@link Message} classes
+   * {@link MessageA Proto2CoderTestMessages.MessageA} and the class {@link DatastoreV1} is the
+   * extension host for the Google Cloud Datastore {@link Entity} entity type.
+   *
+   * <p>Does not modify this object.
+   *
+   * @throws IllegalArgumentException if any extension host does not declare a static
+   *     {@code registerAllExtensions(ExtensionRegistry)} method, or it cannot be accessed.
+   */
+  public ProtoCoder<T> withExtensionsFrom(Iterable<Class<?>> moreExtensionHosts) {
+    for (Class<?> extensionHost : moreExtensionHosts) {
+      // Attempt to access the required method, to make sure it's present.
+      try {
+        Method registerAllExtensions =
+            extensionHost.getDeclaredMethod("registerAllExtensions", ExtensionRegistry.class);
+        checkArgument(
+            Modifier.isStatic(registerAllExtensions.getModifiers()),
+            "Method registerAllExtensions() must be static");
+      } catch (NoSuchMethodException | SecurityException e) {
+        throw new IllegalArgumentException(
+            String.format("Unable to register extensions for %s", extensionHost.getCanonicalName()),
+            e);
+      }
+    }
+
+    return new ProtoCoder<T>(
+        protoMessageClass,
+        new ImmutableSet.Builder<Class<?>>()
+            .addAll(extensionHostClasses)
+            .addAll(moreExtensionHosts)
+            .build());
+  }
+
+  /**
+   * See {@link #withExtensionsFrom(Iterable)}.
+   *
+   * <p>Does not modify this object.
+   */
+  public ProtoCoder<T> withExtensionsFrom(Class<?>... moreExtensionHosts) {
+    return withExtensionsFrom(Arrays.asList(moreExtensionHosts));
+  }
+
+  /**
+   * Writes {@code value} in Protocol Buffers binary format. When the context is not the whole
+   * stream, the message is written length-delimited so that it can be read back with
+   * {@link #decode} from a shared stream.
+   *
+   * @throws CoderException if {@code value} is null.
+   */
+  @Override
+  public void encode(T value, OutputStream outStream, Context context) throws IOException {
+    if (value == null) {
+      throw new CoderException("cannot encode a null " + protoMessageClass.getSimpleName());
+    }
+    if (context.isWholeStream) {
+      value.writeTo(outStream);
+    } else {
+      value.writeDelimitedTo(outStream);
+    }
+  }
+
+  /**
+   * Parses a message of type {@code T} using this coder's {@link ExtensionRegistry}. Mirrors
+   * {@link #encode}: reads the rest of the stream when the context is the whole stream, and a
+   * single length-delimited message otherwise.
+   */
+  @Override
+  public T decode(InputStream inStream, Context context) throws IOException {
+    if (context.isWholeStream) {
+      return getParser().parseFrom(inStream, getExtensionRegistry());
+    } else {
+      return getParser().parseDelimitedFrom(inStream, getExtensionRegistry());
+    }
+  }
+
+  /**
+   * Two {@link ProtoCoder ProtoCoders} are equal when they code the same message class and have
+   * the same set of extension host classes, regardless of registration order.
+   */
+  @Override
+  public boolean equals(Object other) {
+    if (this == other) {
+      return true;
+    }
+    if (!(other instanceof ProtoCoder)) {
+      return false;
+    }
+    ProtoCoder<?> otherCoder = (ProtoCoder<?>) other;
+    return protoMessageClass.equals(otherCoder.protoMessageClass)
+        && Sets.newHashSet(extensionHostClasses)
+            .equals(Sets.newHashSet(otherCoder.extensionHostClasses));
+  }
+
+  @Override
+  public int hashCode() {
+    // extensionHostClasses is a Set, whose hashCode is order-independent, matching equals().
+    return Objects.hash(protoMessageClass, extensionHostClasses);
+  }
+
+  /**
+   * The encoding identifier is designed to support evolution as per the design of Protocol
+   * Buffers. In order to use this class effectively, carefully follow the advice in the Protocol
+   * Buffers documentation at
+   * <a href="https://developers.google.com/protocol-buffers/docs/proto#updating">Updating
+   * A Message Type</a>.
+   *
+   * <p>In particular, the encoding identifier is guaranteed to be the same for {@link ProtoCoder}
+   * instances of the same principal message class, with the same registered extension host classes,
+   * and otherwise distinct. Note that the encoding ID does not encode any version of the message
+   * or extensions, nor does it include the message schema.
+   *
+   * <p>When modifying a message class, here are the broadest guidelines; see the above link
+   * for greater detail.
+   *
+   * <ul>
+   * <li>Do not change the numeric tags for any fields.
+   * <li>Never remove a <code>required</code> field.
+   * <li>Only add <code>optional</code> or <code>repeated</code> fields, with sensible defaults.
+   * <li>When changing the type of a field, consult the Protocol Buffers documentation to ensure
+   * the new and old types are interchangeable.
+   * </ul>
+   *
+   * <p>Code consuming this message class should be prepared to support <i>all</i> versions of
+   * the class until it is certain that no remaining serialized instances exist.
+   *
+   * <p>If backwards incompatible changes must be made, the best recourse is to change the name
+   * of your Protocol Buffers message class.
+   */
+  @Override
+  public String getEncodingId() {
+    // Sorted so that the ID does not depend on extension registration order.
+    return protoMessageClass.getName() + getSortedExtensionClasses().toString();
+  }
+
+  /**
+   * Delegates to {@link ProtobufUtil#verifyDeterministic}, which inspects every message type
+   * reachable from {@code T} (including registered extensions).
+   */
+  @Override
+  public void verifyDeterministic() throws NonDeterministicException {
+    ProtobufUtil.verifyDeterministic(this);
+  }
+
+  /**
+   * Returns the Protocol Buffers {@link Message} type this {@link ProtoCoder} supports.
+   */
+  public Class<T> getMessageType() {
+    return protoMessageClass;
+  }
+
+  /**
+   * Returns the {@link ExtensionRegistry} listing all known Protocol Buffers extension messages
+   * to {@code T} registered with this {@link ProtoCoder}.
+   *
+   * <p>Built lazily on first use and memoized in a transient field, so it is rebuilt after
+   * deserialization. Not synchronized: concurrent first calls may each build an equivalent
+   * registry.
+   *
+   * @throws IllegalStateException if invoking an extension host's
+   *     {@code registerAllExtensions(ExtensionRegistry)} fails via reflection.
+   */
+  public ExtensionRegistry getExtensionRegistry() {
+    if (memoizedExtensionRegistry == null) {
+      ExtensionRegistry registry = ExtensionRegistry.newInstance();
+      for (Class<?> extensionHost : extensionHostClasses) {
+        try {
+          extensionHost
+              .getDeclaredMethod("registerAllExtensions", ExtensionRegistry.class)
+              .invoke(null, registry);
+        } catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
+          throw new IllegalStateException(e);
+        }
+      }
+      memoizedExtensionRegistry = registry.getUnmodifiable();
+    }
+    return memoizedExtensionRegistry;
+  }
+
+  ////////////////////////////////////////////////////////////////////////////////////
+  // Private implementation details below.
+
+  /** The {@link Message} type to be coded. */
+  private final Class<T> protoMessageClass;
+
+  /**
+   * All extension host classes included in this {@link ProtoCoder}. The extensions from these
+   * classes will be included in the {@link ExtensionRegistry} used during encoding and decoding.
+   */
+  private final Set<Class<?>> extensionHostClasses;
+
+  // Constants used to serialize and deserialize
+  private static final String PROTO_MESSAGE_CLASS = "proto_message_class";
+  private static final String PROTO_EXTENSION_HOSTS = "proto_extension_hosts";
+
+  // Transient fields that are lazy initialized and then memoized.
+  private transient ExtensionRegistry memoizedExtensionRegistry;
+  private transient Parser<T> memoizedParser;
+
+  /** Private constructor. */
+  private ProtoCoder(Class<T> protoMessageClass, Set<Class<?>> extensionHostClasses) {
+    this.protoMessageClass = protoMessageClass;
+    this.extensionHostClasses = extensionHostClasses;
+  }
+
+  /**
+   * Rebuilds a {@link ProtoCoder} from the class names written by {@link #asCloudObject()}.
+   *
+   * @throws IllegalArgumentException if any named class cannot be loaded, or an extension host
+   *     is invalid (see {@link #withExtensionsFrom(Iterable)}).
+   * @deprecated For JSON deserialization only.
+   */
+  @JsonCreator
+  @Deprecated
+  public static <T extends Message> ProtoCoder<T> of(
+      @JsonProperty(PROTO_MESSAGE_CLASS) String protoMessageClassName,
+      @Nullable @JsonProperty(PROTO_EXTENSION_HOSTS) List<String> extensionHostClassNames) {
+
+    try {
+      @SuppressWarnings("unchecked")
+      Class<T> protoMessageClass = (Class<T>) Class.forName(protoMessageClassName);
+      List<Class<?>> extensionHostClasses = Lists.newArrayList();
+      if (extensionHostClassNames != null) {
+        for (String extensionHostClassName : extensionHostClassNames) {
+          extensionHostClasses.add(Class.forName(extensionHostClassName));
+        }
+      }
+      return of(protoMessageClass).withExtensionsFrom(extensionHostClasses);
+    } catch (ClassNotFoundException e) {
+      throw new IllegalArgumentException(e);
+    }
+  }
+
+  /**
+   * Adds the message class name and the sorted extension host class names to the base
+   * {@link CloudObject}, under the keys read back by the {@link JsonCreator} factory.
+   */
+  @Override
+  public CloudObject asCloudObject() {
+    CloudObject result = super.asCloudObject();
+    Structs.addString(result, PROTO_MESSAGE_CLASS, protoMessageClass.getName());
+    List<CloudObject> extensionHostClassNames = Lists.newArrayList();
+    for (String className : getSortedExtensionClasses()) {
+      extensionHostClassNames.add(CloudObject.forString(className));
+    }
+    Structs.addList(result, PROTO_EXTENSION_HOSTS, extensionHostClassNames);
+    return result;
+  }
+
+  /**
+   * Get the memoized {@link Parser}, possibly initializing it lazily from the message class's
+   * static {@code getDefaultInstance()} method.
+   */
+  private Parser<T> getParser() {
+    if (memoizedParser == null) {
+      try {
+        @SuppressWarnings("unchecked")
+        T protoMessageInstance = (T) protoMessageClass.getMethod("getDefaultInstance").invoke(null);
+        @SuppressWarnings("unchecked")
+        Parser<T> tParser = (Parser<T>) protoMessageInstance.getParserForType();
+        memoizedParser = tParser;
+      } catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
+        throw new IllegalArgumentException(e);
+      }
+    }
+    return memoizedParser;
+  }
+
+  /**
+   * The implementation of the {@link CoderProvider} for this {@link ProtoCoder} returned by
+   * {@link #coderProvider()}.
+   */
+  private static final CoderProvider PROVIDER =
+      new CoderProvider() {
+        @Override
+        public <T> Coder<T> getCoder(TypeDescriptor<T> type) throws CannotProvideCoderException {
+          if (!type.isSubtypeOf(new TypeDescriptor<Message>() {})) {
+            throw new CannotProvideCoderException(
+                String.format(
+                    "Cannot provide %s because %s is not a subclass of %s",
+                    ProtoCoder.class.getSimpleName(),
+                    type,
+                    Message.class.getName()));
+          }
+
+          @SuppressWarnings("unchecked")
+          TypeDescriptor<? extends Message> messageType = (TypeDescriptor<? extends Message>) type;
+          try {
+            @SuppressWarnings("unchecked")
+            Coder<T> coder = (Coder<T>) ProtoCoder.of(messageType);
+            return coder;
+          } catch (IllegalArgumentException e) {
+            throw new CannotProvideCoderException(e);
+          }
+        }
+      };
+
+  /** Returns the fully-qualified names of the extension host classes, in sorted order. */
+  private SortedSet<String> getSortedExtensionClasses() {
+    SortedSet<String> ret = new TreeSet<>();
+    for (Class<?> clazz : extensionHostClasses) {
+      ret.add(clazz.getName());
+    }
+    return ret;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/01fd8595/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/protobuf/ProtobufUtil.java
----------------------------------------------------------------------
diff --git 
a/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/protobuf/ProtobufUtil.java
 
b/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/protobuf/ProtobufUtil.java
new file mode 100644
index 0000000..597b1de
--- /dev/null
+++ 
b/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/protobuf/ProtobufUtil.java
@@ -0,0 +1,171 @@
+/*
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy 
of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ */
+
+package com.google.cloud.dataflow.sdk.coders.protobuf;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.cloud.dataflow.sdk.coders.Coder.NonDeterministicException;
+import com.google.protobuf.Descriptors.Descriptor;
+import com.google.protobuf.Descriptors.FieldDescriptor;
+import com.google.protobuf.Descriptors.FileDescriptor.Syntax;
+import com.google.protobuf.Descriptors.GenericDescriptor;
+import com.google.protobuf.ExtensionRegistry;
+import com.google.protobuf.ExtensionRegistry.ExtensionInfo;
+import com.google.protobuf.Message;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Reflection and analysis helpers for Protocol Buffers message classes.
+ *
+ * <p>Kept separate from {@link ProtoCoder}, its user, so that it can be tested in isolation.
+ */
+class ProtobufUtil {
+  /**
+   * Looks up the {@link Descriptor} of a Protocol Buffers {@link Message} class by reflectively
+   * invoking its static {@code getDescriptor()} method.
+   *
+   * @throws IllegalArgumentException if there is an error in Java reflection.
+   */
+  static Descriptor getDescriptorForClass(Class<? extends Message> clazz) {
+    Object descriptor;
+    try {
+      descriptor = clazz.getMethod("getDescriptor").invoke(null);
+    } catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
+      throw new IllegalArgumentException(e);
+    }
+    return (Descriptor) descriptor;
+  }
+
+  /**
+   * Collects the {@link Descriptor} of the given {@link Message} class together with the
+   * descriptor of every message type reachable from it through fields and extensions, including
+   * extensions registered in {@code registry}.
+   *
+   * @throws IllegalArgumentException if there is an error in Java reflection.
+   */
+  static Set<Descriptor> getRecursiveDescriptorsForClass(
+      Class<? extends Message> clazz, ExtensionRegistry registry) {
+    Set<Descriptor> reachable = new HashSet<>();
+    recursivelyAddDescriptors(getDescriptorForClass(clazz), reachable, registry);
+    return reachable;
+  }
+
+  /**
+   * Checks that the given {@link Message} class and every message type reachable from it are
+   * declared in files using the Protocol Buffers proto2 syntax.
+   *
+   * @throws IllegalArgumentException naming the first offending type encountered.
+   */
+  static void checkProto2Syntax(Class<? extends Message> clazz, ExtensionRegistry registry) {
+    Set<Descriptor> reachable = getRecursiveDescriptorsForClass(clazz, registry);
+    for (GenericDescriptor descriptor : reachable) {
+      checkArgument(
+          descriptor.getFile().getSyntax() == Syntax.PROTO2,
+          "Message %s or one of its dependencies does not use proto2 syntax: %s in file %s",
+          clazz.getName(),
+          descriptor.getFullName(),
+          descriptor.getFile().getName());
+    }
+  }
+
+  /**
+   * Verifies that no message type reachable from the coder's message class (using the coder's
+   * extension registry) declares a map field, since maps cannot be encoded deterministically.
+   *
+   * @throws NonDeterministicException if the object cannot be encoded deterministically.
+   */
+  static void verifyDeterministic(ProtoCoder<?> coder) throws NonDeterministicException {
+    Class<? extends Message> message = coder.getMessageType();
+    ExtensionRegistry registry = coder.getExtensionRegistry();
+    for (Descriptor descriptor : getRecursiveDescriptorsForClass(message, registry)) {
+      for (FieldDescriptor field : descriptor.getFields()) {
+        if (!field.isMapField()) {
+          continue;
+        }
+        // Any transitively reachable map field rules out a deterministic encoding.
+        String reason =
+            String.format(
+                "Protocol Buffers message %s transitively includes Map field %s (from file %s)."
+                    + " Maps cannot be deterministically encoded.",
+                message.getName(),
+                field.getFullName(),
+                field.getFile().getFullName());
+        throw new NonDeterministicException(coder, reason);
+      }
+    }
+  }
+
+  ////////////////////////////////////////////////////////////////////////////////////////////////
+  // Disable construction of utility class
+  private ProtobufUtil() {}
+
+  /**
+   * Adds {@code message} to {@code visited} and then visits its fields, the extensions declared
+   * in its scope, and the extensions of it found in {@code registry}. A descriptor that is
+   * already in {@code visited} is not expanded again, which also terminates recursive types.
+   */
+  private static void recursivelyAddDescriptors(
+      Descriptor message, Set<Descriptor> visited, ExtensionRegistry registry) {
+    if (!visited.add(message)) {
+      return;
+    }
+    for (FieldDescriptor field : message.getFields()) {
+      recursivelyAddDescriptors(field, visited, registry);
+    }
+    for (FieldDescriptor extension : message.getExtensions()) {
+      recursivelyAddDescriptors(extension, visited, registry);
+    }
+    String extendedType = message.getFullName();
+    addExtensionDescriptors(
+        registry.getAllImmutableExtensionsByExtendedType(extendedType), visited, registry);
+    addExtensionDescriptors(
+        registry.getAllMutableExtensionsByExtendedType(extendedType), visited, registry);
+  }
+
+  /** Visits the field descriptor of each registered extension in {@code extensions}. */
+  private static void addExtensionDescriptors(
+      Iterable<ExtensionInfo> extensions, Set<Descriptor> visited, ExtensionRegistry registry) {
+    for (ExtensionInfo extension : extensions) {
+      recursivelyAddDescriptors(extension.descriptor, visited, registry);
+    }
+  }
+
+  /**
+   * Follows a field into its message type when it is message- or group-valued; scalar and enum
+   * fields contribute nothing further.
+   */
+  private static void recursivelyAddDescriptors(
+      FieldDescriptor field, Set<Descriptor> visited, ExtensionRegistry registry) {
+    switch (field.getType()) {
+      case GROUP:
+      case MESSAGE:
+        // Message-valued fields pull in every type reachable from the nested message.
+        recursivelyAddDescriptors(field.getMessageType(), visited, registry);
+        return;
+
+      case BOOL:
+      case BYTES:
+      case DOUBLE:
+      case ENUM:
+      case FIXED32:
+      case FIXED64:
+      case FLOAT:
+      case INT32:
+      case INT64:
+      case SFIXED32:
+      case SFIXED64:
+      case SINT32:
+      case SINT64:
+      case STRING:
+      case UINT32:
+      case UINT64:
+        // Scalar and enum fields reference no further message types.
+        return;
+
+      default:
+        throw new UnsupportedOperationException(
+            "Unexpected Protocol Buffers field type: " + field.getType());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/01fd8595/sdk/src/test/java/com/google/cloud/dataflow/sdk/coders/CoderRegistryTest.java
----------------------------------------------------------------------
diff --git 
a/sdk/src/test/java/com/google/cloud/dataflow/sdk/coders/CoderRegistryTest.java 
b/sdk/src/test/java/com/google/cloud/dataflow/sdk/coders/CoderRegistryTest.java
index 7fd0d22..2f350b2 100644
--- 
a/sdk/src/test/java/com/google/cloud/dataflow/sdk/coders/CoderRegistryTest.java
+++ 
b/sdk/src/test/java/com/google/cloud/dataflow/sdk/coders/CoderRegistryTest.java
@@ -22,6 +22,8 @@ import static org.junit.Assert.assertEquals;
 
 import com.google.cloud.dataflow.sdk.Pipeline;
 import 
com.google.cloud.dataflow.sdk.coders.CoderRegistry.IncompatibleCoderException;
+import com.google.cloud.dataflow.sdk.coders.Proto2CoderTestMessages.MessageA;
+import com.google.cloud.dataflow.sdk.coders.protobuf.ProtoCoder;
 import com.google.cloud.dataflow.sdk.testing.TestPipeline;
 import com.google.cloud.dataflow.sdk.transforms.Create;
 import com.google.cloud.dataflow.sdk.transforms.DoFn;
@@ -33,6 +35,7 @@ import com.google.cloud.dataflow.sdk.values.KV;
 import com.google.cloud.dataflow.sdk.values.PCollection;
 import com.google.cloud.dataflow.sdk.values.TypeDescriptor;
 import com.google.common.collect.ImmutableList;
+import com.google.protobuf.Duration;
 
 import org.junit.Rule;
 import org.junit.Test;
@@ -82,11 +85,14 @@ public class CoderRegistryTest {
   }
 
   @Test
-  public void testProto2CoderFallbackCoderProvider() throws Exception {
+  public void testProtoCoderFallbackCoderProvider() throws Exception {
     CoderRegistry registry = getStandardRegistry();
-    Coder<Proto2CoderTestMessages.MessageA> coder =
-        registry.getDefaultCoder(Proto2CoderTestMessages.MessageA.class);
-    assertEquals(coder, Proto2Coder.of(new TypeDescriptor<Proto2CoderTestMessages.MessageA>() {}));
+
+    // MessageA is a Protocol Buffers test message that uses proto2 syntax.
+    assertEquals(ProtoCoder.of(MessageA.class), registry.getDefaultCoder(MessageA.class));
+
+    // Duration is a core Protocol Buffers type that uses proto3 syntax.
+    assertEquals(ProtoCoder.of(Duration.class), registry.getDefaultCoder(Duration.class));
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/01fd8595/sdk/src/test/java/com/google/cloud/dataflow/sdk/coders/Proto2CoderTest.java
----------------------------------------------------------------------
diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/coders/Proto2CoderTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/coders/Proto2CoderTest.java
index f4c3557..91ebc65 100644
--- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/coders/Proto2CoderTest.java
+++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/coders/Proto2CoderTest.java
@@ -35,6 +35,7 @@ import org.junit.runners.JUnit4;
 /**
  * Tests for Proto2Coder.
  */
+@SuppressWarnings("deprecation") // test of a deprecated coder.
 @RunWith(JUnit4.class)
 public class Proto2CoderTest {
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/01fd8595/sdk/src/test/java/com/google/cloud/dataflow/sdk/coders/protobuf/ProtoCoderTest.java
----------------------------------------------------------------------
diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/coders/protobuf/ProtoCoderTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/coders/protobuf/ProtoCoderTest.java
new file mode 100644
index 0000000..6f4e99d
--- /dev/null
+++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/coders/protobuf/ProtoCoderTest.java
@@ -0,0 +1,182 @@
+/*
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.google.cloud.dataflow.sdk.coders.protobuf;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+import com.google.cloud.dataflow.sdk.coders.CannotProvideCoderException;
+import com.google.cloud.dataflow.sdk.coders.Coder;
+import com.google.cloud.dataflow.sdk.coders.Coder.NonDeterministicException;
+import com.google.cloud.dataflow.sdk.coders.CoderException;
+import com.google.cloud.dataflow.sdk.coders.ListCoder;
+import com.google.cloud.dataflow.sdk.coders.Proto2CoderTestMessages;
+import com.google.cloud.dataflow.sdk.coders.Proto2CoderTestMessages.MessageA;
+import com.google.cloud.dataflow.sdk.coders.Proto2CoderTestMessages.MessageB;
+import com.google.cloud.dataflow.sdk.coders.Proto2CoderTestMessages.MessageC;
+import 
com.google.cloud.dataflow.sdk.coders.Proto2CoderTestMessages.MessageWithMap;
+import com.google.cloud.dataflow.sdk.testing.CoderProperties;
+import com.google.cloud.dataflow.sdk.util.CoderUtils;
+import com.google.cloud.dataflow.sdk.values.TypeDescriptor;
+import com.google.common.collect.ImmutableList;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link ProtoCoder}.
+ */
+@RunWith(JUnit4.class)
+public class ProtoCoderTest {
+
+  @Rule public ExpectedException thrown = ExpectedException.none();
+
+  @Test
+  public void testFactoryMethodAgreement() throws Exception {
+    assertEquals(ProtoCoder.of(new TypeDescriptor<MessageA>() {}), 
ProtoCoder.of(MessageA.class));
+
+    assertEquals(
+        ProtoCoder.of(new TypeDescriptor<MessageA>() {}),
+        ProtoCoder.coderProvider().getCoder(new TypeDescriptor<MessageA>() 
{}));
+  }
+
+  @Test
+  public void testProviderCannotProvideCoder() throws Exception {
+    thrown.expect(CannotProvideCoderException.class);
+    thrown.expectMessage("java.lang.Integer is not a subclass of 
com.google.protobuf.Message");
+
+    ProtoCoder.coderProvider().getCoder(new TypeDescriptor<Integer>() {});
+  }
+
+  @Test
+  public void testCoderEncodeDecodeEqual() throws Exception {
+    MessageA value =
+        MessageA.newBuilder()
+            .setField1("hello")
+            .addField2(MessageB.newBuilder().setField1(true).build())
+            .addField2(MessageB.newBuilder().setField1(false).build())
+            .build();
+    CoderProperties.coderDecodeEncodeEqual(ProtoCoder.of(MessageA.class), 
value);
+  }
+
+  @Test
+  public void testCoderEncodeDecodeEqualNestedContext() throws Exception {
+    MessageA value1 =
+        MessageA.newBuilder()
+            .setField1("hello")
+            .addField2(MessageB.newBuilder().setField1(true).build())
+            .addField2(MessageB.newBuilder().setField1(false).build())
+            .build();
+    MessageA value2 =
+        MessageA.newBuilder()
+            .setField1("world")
+            .addField2(MessageB.newBuilder().setField1(false).build())
+            .addField2(MessageB.newBuilder().setField1(true).build())
+            .build();
+    CoderProperties.coderDecodeEncodeEqual(
+        ListCoder.of(ProtoCoder.of(MessageA.class)), ImmutableList.of(value1, 
value2));
+  }
+
+  @Test
+  public void testCoderEncodeDecodeExtensionsEqual() throws Exception {
+    MessageC value =
+        MessageC.newBuilder()
+            .setExtension(
+                Proto2CoderTestMessages.field1,
+                MessageA.newBuilder()
+                    .setField1("hello")
+                    .addField2(MessageB.newBuilder().setField1(true).build())
+                    .build())
+            .setExtension(
+                Proto2CoderTestMessages.field2, 
MessageB.newBuilder().setField1(false).build())
+            .build();
+    CoderProperties.coderDecodeEncodeEqual(
+        
ProtoCoder.of(MessageC.class).withExtensionsFrom(Proto2CoderTestMessages.class),
 value);
+  }
+
+  @Test
+  public void testCoderSerialization() throws Exception {
+    ProtoCoder<MessageA> coder = ProtoCoder.of(MessageA.class);
+    CoderProperties.coderSerializable(coder);
+  }
+
+  @Test
+  public void testCoderExtensionsSerialization() throws Exception {
+    ProtoCoder<MessageC> coder =
+        
ProtoCoder.of(MessageC.class).withExtensionsFrom(Proto2CoderTestMessages.class);
+    CoderProperties.coderSerializable(coder);
+  }
+
+  @Test
+  public void testEncodingId() throws Exception {
+    Coder<MessageA> coderA = ProtoCoder.of(MessageA.class);
+    CoderProperties.coderHasEncodingId(coderA, MessageA.class.getName() + 
"[]");
+
+    ProtoCoder<MessageC> coder =
+        
ProtoCoder.of(MessageC.class).withExtensionsFrom(Proto2CoderTestMessages.class);
+    CoderProperties.coderHasEncodingId(
+        coder,
+        String.format("%s[%s]", MessageC.class.getName(), 
Proto2CoderTestMessages.class.getName()));
+  }
+
+  @Test
+  public void encodeNullThrowsCoderException() throws Exception {
+    thrown.expect(CoderException.class);
+    thrown.expectMessage("cannot encode a null MessageA");
+
+    CoderUtils.encodeToBase64(ProtoCoder.of(MessageA.class), null);
+  }
+
+  @Test
+  public void testDeterministicCoder() throws NonDeterministicException {
+    Coder<MessageA> coder = ProtoCoder.of(MessageA.class);
+    coder.verifyDeterministic();
+  }
+
+  @Test
+  public void testNonDeterministicCoder() throws NonDeterministicException {
+    thrown.expect(NonDeterministicException.class);
+    thrown.expectMessage(MessageWithMap.class.getName() + " transitively 
includes Map field");
+
+    Coder<MessageWithMap> coder = ProtoCoder.of(MessageWithMap.class);
+    coder.verifyDeterministic();
+  }
+
+  @Test
+  public void testNonDeterministicProperty() throws CoderException {
+    MessageWithMap.Builder msg1B = MessageWithMap.newBuilder();
+    MessageWithMap.Builder msg2B = MessageWithMap.newBuilder();
+
+    // Built in reverse order but with equal contents.
+    for (int i = 0; i < 10; ++i) {
+      msg1B.getMutableField1().put("key" + i, MessageA.getDefaultInstance());
+      msg2B.getMutableField1().put("key" + (9 - i), 
MessageA.getDefaultInstance());
+    }
+
+    // Assert the messages are equal.
+    MessageWithMap msg1 = msg1B.build();
+    MessageWithMap msg2 = msg2B.build();
+    assertEquals(msg2, msg1);
+
+    // Assert the encoded messages are not equal.
+    Coder<MessageWithMap> coder = ProtoCoder.of(MessageWithMap.class);
+    assertNotEquals(CoderUtils.encodeToBase64(coder, msg2), 
CoderUtils.encodeToBase64(coder, msg1));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/01fd8595/sdk/src/test/java/com/google/cloud/dataflow/sdk/coders/protobuf/ProtobufUtilTest.java
----------------------------------------------------------------------
diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/coders/protobuf/ProtobufUtilTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/coders/protobuf/ProtobufUtilTest.java
new file mode 100644
index 0000000..f2192e6
--- /dev/null
+++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/coders/protobuf/ProtobufUtilTest.java
@@ -0,0 +1,195 @@
+/*
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.google.cloud.dataflow.sdk.coders.protobuf;
+
+import static 
com.google.cloud.dataflow.sdk.coders.protobuf.ProtobufUtil.checkProto2Syntax;
+import static 
com.google.cloud.dataflow.sdk.coders.protobuf.ProtobufUtil.getRecursiveDescriptorsForClass;
+import static 
com.google.cloud.dataflow.sdk.coders.protobuf.ProtobufUtil.verifyDeterministic;
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+import com.google.api.services.datastore.DatastoreV1.Entity;
+import com.google.cloud.dataflow.sdk.coders.Coder.NonDeterministicException;
+import com.google.cloud.dataflow.sdk.coders.Proto2CoderTestMessages;
+import com.google.cloud.dataflow.sdk.coders.Proto2CoderTestMessages.MessageA;
+import com.google.cloud.dataflow.sdk.coders.Proto2CoderTestMessages.MessageB;
+import com.google.cloud.dataflow.sdk.coders.Proto2CoderTestMessages.MessageC;
+import 
com.google.cloud.dataflow.sdk.coders.Proto2CoderTestMessages.MessageWithMap;
+import 
com.google.cloud.dataflow.sdk.coders.Proto2CoderTestMessages.ReferencesMessageWithMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+import com.google.protobuf.Any;
+import com.google.protobuf.Descriptors.GenericDescriptor;
+import com.google.protobuf.Duration;
+import com.google.protobuf.ExtensionRegistry;
+import com.google.protobuf.Message;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Tests for {@link ProtobufUtil}: recursive descriptor discovery (with and without registered
+ * extensions), proto2 syntax checking, and determinism verification.
+ */
+@RunWith(JUnit4.class)
+public class ProtobufUtilTest {
+  @Rule public ExpectedException thrown = ExpectedException.none();
+
+  private static final Set<String> MESSAGE_A_ONLY =
+      ImmutableSet.of("proto2_coder_test_messages.MessageA");
+
+  private static final Set<String> MESSAGE_B_ONLY =
+      ImmutableSet.of("proto2_coder_test_messages.MessageB");
+
+  private static final Set<String> MESSAGE_C_ONLY =
+      ImmutableSet.of("proto2_coder_test_messages.MessageC");
+
+  // Map fields are represented in generated Java code as a nested entry Message, so the
+  // synthetic Field1Entry type is discovered alongside MessageWithMap itself.
+  private static final Set<String> WITH_MAP_ONLY =
+      ImmutableSet.of(
+          "proto2_coder_test_messages.MessageWithMap",
+          "proto2_coder_test_messages.MessageWithMap.Field1Entry");
+
+  private static final Set<String> REFERS_MAP_ONLY =
+      ImmutableSet.of("proto2_coder_test_messages.ReferencesMessageWithMap");
+
+  // A references B, so everything reachable from A is {A, B}.
+  private static final Set<String> MESSAGE_A_ALL = Sets.union(MESSAGE_A_ONLY, MESSAGE_B_ONLY);
+
+  // Only when its extensions are registered does C reference A (extension field1) and
+  // B (extension field2); A in turn references B.
+  private static final Set<String> MESSAGE_C_EXT = Sets.union(MESSAGE_C_ONLY, MESSAGE_A_ALL);
+
+  // MessageWithMap references A (and therefore B).
+  private static final Set<String> WITH_MAP_ALL = Sets.union(WITH_MAP_ONLY, MESSAGE_A_ALL);
+
+  // ReferencesMessageWithMap references MessageWithMap and everything MessageWithMap reaches.
+  private static final Set<String> REFERS_MAP_ALL = Sets.union(REFERS_MAP_ONLY, WITH_MAP_ALL);
+
+  @Test
+  public void testRecursiveDescriptorsMessageA() {
+    assertThat(getRecursiveDescriptorFullNames(MessageA.class), equalTo(MESSAGE_A_ALL));
+  }
+
+  @Test
+  public void testRecursiveDescriptorsMessageB() {
+    assertThat(getRecursiveDescriptorFullNames(MessageB.class), equalTo(MESSAGE_B_ONLY));
+  }
+
+  @Test
+  public void testRecursiveDescriptorsMessageC() {
+    // Without registered extensions, C reaches no other message types.
+    assertThat(getRecursiveDescriptorFullNames(MessageC.class), equalTo(MESSAGE_C_ONLY));
+  }
+
+  @Test
+  public void testRecursiveDescriptorsMessageCWithExtensions() {
+    // With extensions, Message C has a reference to Message A and Message B.
+    ExtensionRegistry registry = ExtensionRegistry.newInstance();
+    Proto2CoderTestMessages.registerAllExtensions(registry);
+    assertThat(getRecursiveDescriptorFullNames(MessageC.class, registry), equalTo(MESSAGE_C_EXT));
+  }
+
+  @Test
+  public void testRecursiveDescriptorsMessageWithMap() {
+    assertThat(getRecursiveDescriptorFullNames(MessageWithMap.class), equalTo(WITH_MAP_ALL));
+  }
+
+  @Test
+  public void testRecursiveDescriptorsReferencesMessageWithMap() {
+    assertThat(
+        getRecursiveDescriptorFullNames(ReferencesMessageWithMap.class), equalTo(REFERS_MAP_ALL));
+  }
+
+  @Test
+  public void testVerifyProto2() {
+    // Every message in Dataflow's Proto2CoderTestMessages uses proto2 syntax, so none throws.
+    checkProto2Syntax(MessageA.class, ExtensionRegistry.getEmptyRegistry());
+    checkProto2Syntax(MessageB.class, ExtensionRegistry.getEmptyRegistry());
+    checkProto2Syntax(MessageC.class, ExtensionRegistry.getEmptyRegistry());
+    checkProto2Syntax(MessageWithMap.class, ExtensionRegistry.getEmptyRegistry());
+    checkProto2Syntax(ReferencesMessageWithMap.class, ExtensionRegistry.getEmptyRegistry());
+  }
+
+  @Test
+  public void testAnyIsNotProto2() {
+    // Any is a core Protocol Buffers type that uses proto3 syntax.
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage(Any.class.getCanonicalName());
+    thrown.expectMessage("in file " + Any.getDescriptor().getFile().getName());
+
+    checkProto2Syntax(Any.class, ExtensionRegistry.getEmptyRegistry());
+  }
+
+  @Test
+  public void testDurationIsNotProto2() {
+    // Duration is a core Protocol Buffers type that uses proto3 syntax.
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage(Duration.class.getCanonicalName());
+    thrown.expectMessage("in file " + Duration.getDescriptor().getFile().getName());
+
+    checkProto2Syntax(Duration.class, ExtensionRegistry.getEmptyRegistry());
+  }
+
+  @Test
+  public void testEntityIsDeterministic() throws NonDeterministicException {
+    // Cloud Datastore's Entities can be encoded deterministically.
+    verifyDeterministic(ProtoCoder.of(Entity.class));
+  }
+
+  @Test
+  public void testMessageWithMapIsNotDeterministic() throws NonDeterministicException {
+    String mapFieldName = MessageWithMap.getDescriptor().findFieldByNumber(1).getFullName();
+    thrown.expect(NonDeterministicException.class);
+    thrown.expectMessage(MessageWithMap.class.getName());
+    thrown.expectMessage("transitively includes Map field " + mapFieldName);
+    thrown.expectMessage("file " + MessageWithMap.getDescriptor().getFile().getName());
+
+    verifyDeterministic(ProtoCoder.of(MessageWithMap.class));
+  }
+
+  @Test
+  public void testMessageWithTransitiveMapIsNotDeterministic() throws NonDeterministicException {
+    // The error names the outer class but points at the map field declared in MessageWithMap.
+    String mapFieldName = MessageWithMap.getDescriptor().findFieldByNumber(1).getFullName();
+    thrown.expect(NonDeterministicException.class);
+    thrown.expectMessage(ReferencesMessageWithMap.class.getName());
+    thrown.expectMessage("transitively includes Map field " + mapFieldName);
+    thrown.expectMessage("file " + MessageWithMap.getDescriptor().getFile().getName());
+
+    verifyDeterministic(ProtoCoder.of(ReferencesMessageWithMap.class));
+  }
+
+  ////////////////////////////////////////////////////////////////////////////////////////////
+
+  /**
+   * Returns the full names of all descriptors reachable from {@code clazz} when no extensions
+   * are registered. Comparing names rather than descriptor objects keeps assertion failures
+   * readable.
+   */
+  private static Set<String> getRecursiveDescriptorFullNames(Class<? extends Message> clazz) {
+    return getRecursiveDescriptorFullNames(clazz, ExtensionRegistry.getEmptyRegistry());
+  }
+
+  /**
+   * Returns the full names of all descriptors reachable from {@code clazz}, including those
+   * reached through extensions registered in {@code registry}.
+   */
+  private static Set<String> getRecursiveDescriptorFullNames(
+      Class<? extends Message> clazz, ExtensionRegistry registry) {
+    Set<String> result = new HashSet<>();
+    for (GenericDescriptor d : getRecursiveDescriptorsForClass(clazz, registry)) {
+      result.add(d.getFullName());
+    }
+    return result;
+  }
+}

Reply via email to