[BEAM-1871] Move ProtoCoder to new sdks/java/extensions/protobuf package.

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ff1fe7fa
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ff1fe7fa
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ff1fe7fa

Branch: refs/heads/master
Commit: ff1fe7fa53816fe4327c7572c13a616fc4243dc9
Parents: d7e7af8
Author: Luke Cwik <[email protected]>
Authored: Mon Apr 24 14:32:21 2017 -0700
Committer: Luke Cwik <[email protected]>
Committed: Mon Apr 24 15:04:13 2017 -0700

----------------------------------------------------------------------
 sdks/java/core/pom.xml                          |   6 -
 .../apache/beam/sdk/coders/CoderRegistry.java   |  10 +-
 .../beam/sdk/coders/StringDelegateCoder.java    |   4 +-
 .../beam/sdk/coders/protobuf/ProtoCoder.java    | 405 -------------------
 .../beam/sdk/coders/protobuf/ProtobufUtil.java  | 171 --------
 .../beam/sdk/coders/protobuf/package-info.java  |  24 --
 sdks/java/core/src/main/proto/README.md         |  45 ---
 .../main/proto/proto2_coder_test_messages.proto |  53 ---
 .../beam/sdk/coders/CoderRegistryTest.java      |  14 -
 .../sdk/coders/protobuf/ProtoCoderTest.java     | 182 ---------
 .../sdk/coders/protobuf/ProtobufUtilTest.java   | 192 ---------
 sdks/java/extensions/pom.xml                    |   1 +
 sdks/java/extensions/protobuf/pom.xml           | 142 +++++++
 .../sdk/extensions/protobuf/ProtoCoder.java     | 405 +++++++++++++++++++
 .../sdk/extensions/protobuf/ProtobufUtil.java   | 171 ++++++++
 .../sdk/extensions/protobuf/package-info.java   |  24 ++
 .../sdk/extensions/protobuf/ProtoCoderTest.java | 181 +++++++++
 .../extensions/protobuf/ProtobufUtilTest.java   | 191 +++++++++
 .../test/proto/proto2_coder_test_messages.proto |  53 +++
 19 files changed, 1173 insertions(+), 1101 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/ff1fe7fa/sdks/java/core/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/core/pom.xml b/sdks/java/core/pom.xml
index ac7a3bb..6c46453 100644
--- a/sdks/java/core/pom.xml
+++ b/sdks/java/core/pom.xml
@@ -294,12 +294,6 @@
     </dependency>
 
     <dependency>
-      <groupId>com.google.cloud.dataflow</groupId>
-      <artifactId>google-cloud-dataflow-java-proto-library-all</artifactId>
-      <scope>test</scope>
-    </dependency>
-
-    <dependency>
       <groupId>com.esotericsoftware.kryo</groupId>
       <artifactId>kryo</artifactId>
       <version>2.21</version>

http://git-wip-us.apache.org/repos/asf/beam/blob/ff1fe7fa/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java
index 4238293..e0b2b3a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java
@@ -44,7 +44,6 @@ import java.util.Set;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.coders.CannotProvideCoderException.ReasonCode;
-import org.apache.beam.sdk.coders.protobuf.ProtoCoder;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.util.CoderUtils;
@@ -77,9 +76,7 @@ import org.slf4j.LoggerFactory;
  *       the default {@code Coder} type. The {@link Coder} class must satisfy 
the requirements
  *       of {@link CoderProviders#fromStaticMethods}.
  *   <li>Fallback: A fallback {@link CoderProvider} is used to attempt to 
provide a {@link Coder}
- *       for any type. By default, there are two chained fallback coders:
- *       {@link ProtoCoder#coderProvider}, which can provide a coder to 
efficiently serialize any
- *       Protocol Buffers message, and then {@link 
SerializableCoder#PROVIDER}, which can provide a
+ *       for any type. By default, this is {@link 
SerializableCoder#PROVIDER}, which can provide a
  *       {@link Coder} for any type that is serializable via Java 
serialization. The fallback
  *       {@link CoderProvider} can be get and set respectively using
  *       {@link #getFallbackCoderProvider()} and {@link 
#setFallbackCoderProvider}. Multiple
@@ -165,7 +162,7 @@ public class CoderRegistry implements CoderProvider {
   private CoderRegistry() {
     coderFactoryMap = new HashMap<>(REGISTERED_CODER_FACTORIES_PER_CLASS);
     setFallbackCoderProvider(
-        CoderProviders.firstOf(ProtoCoder.coderProvider(), 
SerializableCoder.PROVIDER));
+        CoderProviders.firstOf(SerializableCoder.PROVIDER));
   }
 
   /**
@@ -423,8 +420,7 @@ public class CoderRegistry implements CoderProvider {
    * providing a {@code Coder<T>} for a type {@code T}, then the registry will 
attempt to create
    * a {@link Coder} using this {@link CoderProvider}.
    *
-   * <p>By default, this is set to the chain of {@link 
ProtoCoder#coderProvider()} and
-   * {@link SerializableCoder#PROVIDER}.
+   * <p>By default, this is set to {@link SerializableCoder#PROVIDER}.
    *
    * <p>See {@link #getFallbackCoderProvider}.
    */

http://git-wip-us.apache.org/repos/asf/beam/blob/ff1fe7fa/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringDelegateCoder.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringDelegateCoder.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringDelegateCoder.java
index f86369c..51ead3c 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringDelegateCoder.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringDelegateCoder.java
@@ -23,7 +23,6 @@ import java.io.OutputStream;
 import java.lang.reflect.InvocationTargetException;
 import java.util.Collection;
 import org.apache.beam.sdk.coders.DelegateCoder.CodingFunction;
-import org.apache.beam.sdk.coders.protobuf.ProtoCoder;
 import org.apache.beam.sdk.values.TypeDescriptor;
 
 /**
@@ -43,7 +42,8 @@ import org.apache.beam.sdk.values.TypeDescriptor;
  *
  * <p>This method of encoding is not designed for ease of evolution of {@code 
Clazz};
  * it should only be used in cases where the class is stable or the encoding 
is not
- * important. If evolution of the class is important, see {@link ProtoCoder} 
or {@link AvroCoder}.
+ * important. If evolution of the class is important, see {@link AvroCoder} or 
any other
+ * evolution-safe encoding.
  *
  * @param <T> The type of objects coded.
  */

http://git-wip-us.apache.org/repos/asf/beam/blob/ff1fe7fa/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/protobuf/ProtoCoder.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/protobuf/ProtoCoder.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/protobuf/ProtoCoder.java
deleted file mode 100644
index a5f53ff..0000000
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/protobuf/ProtoCoder.java
+++ /dev/null
@@ -1,405 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.coders.protobuf;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import com.google.protobuf.ExtensionRegistry;
-import com.google.protobuf.Message;
-import com.google.protobuf.Parser;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.lang.reflect.Modifier;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Objects;
-import java.util.Set;
-import java.util.SortedSet;
-import java.util.TreeSet;
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.coders.AtomicCoder;
-import org.apache.beam.sdk.coders.CannotProvideCoderException;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CoderProvider;
-import org.apache.beam.sdk.coders.CoderRegistry;
-import org.apache.beam.sdk.util.CloudObject;
-import org.apache.beam.sdk.util.Structs;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.TypeDescriptor;
-
-/**
- * A {@link Coder} using Google Protocol Buffers binary format. {@link 
ProtoCoder} supports both
- * Protocol Buffers syntax versions 2 and 3.
- *
- * <p>To learn more about Protocol Buffers, visit:
- * <a 
href="https://developers.google.com/protocol-buffers";>https://developers.google.com/protocol-buffers</a>
- *
- * <p>{@link ProtoCoder} is registered in the global {@link CoderRegistry} as 
the default
- * {@link Coder} for any {@link Message} object. Custom message extensions are 
also supported, but
- * these extensions must be registered for a particular {@link ProtoCoder} 
instance and that
- * instance must be registered on the {@link PCollection} that needs the 
extensions:
- *
- * <pre>{@code
- * import MyProtoFile;
- * import MyProtoFile.MyMessage;
- *
- * Coder<MyMessage> coder = 
ProtoCoder.of(MyMessage.class).withExtensionsFrom(MyProtoFile.class);
- * PCollection<MyMessage> records =  input.apply(...).setCoder(coder);
- * }</pre>
- *
- * <h3>Versioning</h3>
- *
- * <p>{@link ProtoCoder} supports both versions 2 and 3 of the Protocol 
Buffers syntax. However,
- * the Java runtime version of the <code>google.com.protobuf</code> library 
must match exactly the
- * version of <code>protoc</code> that was used to produce the JAR files 
containing the compiled
- * <code>.proto</code> messages.
- *
- * <p>For more information, see the
- * <a 
href="https://developers.google.com/protocol-buffers/docs/proto3#using-proto2-message-types";>Protocol
 Buffers documentation</a>.
- *
- * <h3>{@link ProtoCoder} and Determinism</h3>
- *
- * <p>In general, Protocol Buffers messages can be encoded deterministically 
within a single
- * pipeline as long as:
- *
- * <ul>
- * <li>The encoded messages (and any transitively linked messages) do not use 
<code>map</code>
- *     fields.</li>
- * <li>Every Java VM that encodes or decodes the messages use the same runtime 
version of the
- *     Protocol Buffers library and the same compiled <code>.proto</code> file 
JAR.</li>
- * </ul>
- *
- * <h3>{@link ProtoCoder} and Encoding Stability</h3>
- *
- * <p>When changing Protocol Buffers messages, follow the rules in the 
Protocol Buffers language
- * guides for
- * <a 
href="https://developers.google.com/protocol-buffers/docs/proto#updating";>{@code
 proto2}</a>
- * and
- * <a 
href="https://developers.google.com/protocol-buffers/docs/proto3#updating";>{@code
 proto3}</a>
- * syntaxes, depending on your message type. Following these guidelines will 
ensure that the
- * old encoded data can be read by new versions of the code.
- *
- * <p>Generally, any change to the message type, registered extensions, 
runtime library, or
- * compiled proto JARs may change the encoding. Thus even if both the original 
and updated messages
- * can be encoded deterministically within a single job, these deterministic 
encodings may not be
- * the same across jobs.
- *
- * @param <T> the Protocol Buffers {@link Message} handled by this {@link 
Coder}.
- */
-public class ProtoCoder<T extends Message> extends AtomicCoder<T> {
-
-  /**
-   * A {@link CoderProvider} that returns a {@link ProtoCoder} with an empty
-   * {@link ExtensionRegistry}.
-   */
-  public static CoderProvider coderProvider() {
-    return PROVIDER;
-  }
-
-  /**
-   * Returns a {@link ProtoCoder} for the given Protocol Buffers {@link 
Message}.
-   */
-  public static <T extends Message> ProtoCoder<T> of(Class<T> 
protoMessageClass) {
-    return new ProtoCoder<>(protoMessageClass, ImmutableSet.<Class<?>>of());
-  }
-
-  /**
-   * Returns a {@link ProtoCoder} for the Protocol Buffers {@link Message} 
indicated by the given
-   * {@link TypeDescriptor}.
-   */
-  public static <T extends Message> ProtoCoder<T> of(TypeDescriptor<T> 
protoMessageType) {
-    @SuppressWarnings("unchecked")
-    Class<T> protoMessageClass = (Class<T>) protoMessageType.getRawType();
-    return of(protoMessageClass);
-  }
-
-  /**
-   * Returns a {@link ProtoCoder} like this one, but with the extensions from 
the given classes
-   * registered.
-   *
-   * <p>Each of the extension host classes must be an class automatically 
generated by the
-   * Protocol Buffers compiler, {@code protoc}, that contains messages.
-   *
-   * <p>Does not modify this object.
-   */
-  public ProtoCoder<T> withExtensionsFrom(Iterable<Class<?>> 
moreExtensionHosts) {
-    for (Class<?> extensionHost : moreExtensionHosts) {
-      // Attempt to access the required method, to make sure it's present.
-      try {
-        Method registerAllExtensions =
-            extensionHost.getDeclaredMethod("registerAllExtensions", 
ExtensionRegistry.class);
-        checkArgument(
-            Modifier.isStatic(registerAllExtensions.getModifiers()),
-            "Method registerAllExtensions() must be static");
-      } catch (NoSuchMethodException | SecurityException e) {
-        throw new IllegalArgumentException(
-            String.format("Unable to register extensions for %s", 
extensionHost.getCanonicalName()),
-            e);
-      }
-    }
-
-    return new ProtoCoder<>(
-        protoMessageClass,
-        new ImmutableSet.Builder<Class<?>>()
-            .addAll(extensionHostClasses)
-            .addAll(moreExtensionHosts)
-            .build());
-  }
-
-  /**
-   * See {@link #withExtensionsFrom(Iterable)}.
-   *
-   * <p>Does not modify this object.
-   */
-  public ProtoCoder<T> withExtensionsFrom(Class<?>... moreExtensionHosts) {
-    return withExtensionsFrom(Arrays.asList(moreExtensionHosts));
-  }
-
-  @Override
-  public void encode(T value, OutputStream outStream, Context context) throws 
IOException {
-    if (value == null) {
-      throw new CoderException("cannot encode a null " + 
protoMessageClass.getSimpleName());
-    }
-    if (context.isWholeStream) {
-      value.writeTo(outStream);
-    } else {
-      value.writeDelimitedTo(outStream);
-    }
-  }
-
-  @Override
-  public T decode(InputStream inStream, Context context) throws IOException {
-    if (context.isWholeStream) {
-      return getParser().parseFrom(inStream, getExtensionRegistry());
-    } else {
-      return getParser().parseDelimitedFrom(inStream, getExtensionRegistry());
-    }
-  }
-
-  @Override
-  public boolean equals(Object other) {
-    if (this == other) {
-      return true;
-    }
-    if (!(other instanceof ProtoCoder)) {
-      return false;
-    }
-    ProtoCoder<?> otherCoder = (ProtoCoder<?>) other;
-    return protoMessageClass.equals(otherCoder.protoMessageClass)
-        && Sets.newHashSet(extensionHostClasses)
-            .equals(Sets.newHashSet(otherCoder.extensionHostClasses));
-  }
-
-  @Override
-  public int hashCode() {
-    return Objects.hash(protoMessageClass, extensionHostClasses);
-  }
-
-  /**
-   * The encoding identifier is designed to support evolution as per the 
design of Protocol
-   * Buffers. In order to use this class effectively, carefully follow the 
advice in the Protocol
-   * Buffers documentation at
-   * <a 
href="https://developers.google.com/protocol-buffers/docs/proto#updating";>Updating
-   * A Message Type</a>.
-   *
-   * <p>In particular, the encoding identifier is guaranteed to be the same 
for {@link ProtoCoder}
-   * instances of the same principal message class, with the same registered 
extension host classes,
-   * and otherwise distinct. Note that the encoding ID does not encode any 
version of the message
-   * or extensions, nor does it include the message schema.
-   *
-   * <p>When modifying a message class, here are the broadest guidelines; see 
the above link
-   * for greater detail.
-   *
-   * <ul>
-   * <li>Do not change the numeric tags for any fields.
-   * <li>Never remove a <code>required</code> field.
-   * <li>Only add <code>optional</code> or <code>repeated</code> fields, with 
sensible defaults.
-   * <li>When changing the type of a field, consult the Protocol Buffers 
documentation to ensure
-   * the new and old types are interchangeable.
-   * </ul>
-   *
-   * <p>Code consuming this message class should be prepared to support 
<i>all</i> versions of
-   * the class until it is certain that no remaining serialized instances 
exist.
-   *
-   * <p>If backwards incompatible changes must be made, the best recourse is 
to change the name
-   * of your Protocol Buffers message class.
-   */
-  @Override
-  public String getEncodingId() {
-    return protoMessageClass.getName() + 
getSortedExtensionClasses().toString();
-  }
-
-  @Override
-  public void verifyDeterministic() throws NonDeterministicException {
-    ProtobufUtil.verifyDeterministic(this);
-  }
-
-  /**
-   * Returns the Protocol Buffers {@link Message} type this {@link ProtoCoder} 
supports.
-   */
-  public Class<T> getMessageType() {
-    return protoMessageClass;
-  }
-
-  /**
-   * Returns the {@link ExtensionRegistry} listing all known Protocol Buffers 
extension messages
-   * to {@code T} registered with this {@link ProtoCoder}.
-   */
-  public ExtensionRegistry getExtensionRegistry() {
-    if (memoizedExtensionRegistry == null) {
-      ExtensionRegistry registry = ExtensionRegistry.newInstance();
-      for (Class<?> extensionHost : extensionHostClasses) {
-        try {
-          extensionHost
-              .getDeclaredMethod("registerAllExtensions", 
ExtensionRegistry.class)
-              .invoke(null, registry);
-        } catch (IllegalAccessException | InvocationTargetException | 
NoSuchMethodException e) {
-          throw new IllegalStateException(e);
-        }
-      }
-      memoizedExtensionRegistry = registry.getUnmodifiable();
-    }
-    return memoizedExtensionRegistry;
-  }
-
-  
////////////////////////////////////////////////////////////////////////////////////
-  // Private implementation details below.
-
-  /** The {@link Message} type to be coded. */
-  private final Class<T> protoMessageClass;
-
-  /**
-   * All extension host classes included in this {@link ProtoCoder}. The 
extensions from these
-   * classes will be included in the {@link ExtensionRegistry} used during 
encoding and decoding.
-   */
-  private final Set<Class<?>> extensionHostClasses;
-
-  // Constants used to serialize and deserialize
-  private static final String PROTO_MESSAGE_CLASS = "proto_message_class";
-  private static final String PROTO_EXTENSION_HOSTS = "proto_extension_hosts";
-
-  // Transient fields that are lazy initialized and then memoized.
-  private transient ExtensionRegistry memoizedExtensionRegistry;
-  private transient Parser<T> memoizedParser;
-
-  /** Private constructor. */
-  private ProtoCoder(Class<T> protoMessageClass, Set<Class<?>> 
extensionHostClasses) {
-    this.protoMessageClass = protoMessageClass;
-    this.extensionHostClasses = extensionHostClasses;
-  }
-
-  /**
-   * @deprecated For JSON deserialization only.
-   */
-  @JsonCreator
-  @Deprecated
-  public static <T extends Message> ProtoCoder<T> of(
-      @JsonProperty(PROTO_MESSAGE_CLASS) String protoMessageClassName,
-      @Nullable @JsonProperty(PROTO_EXTENSION_HOSTS) List<String> 
extensionHostClassNames) {
-
-    try {
-      @SuppressWarnings("unchecked")
-      Class<T> protoMessageClass = (Class<T>) 
Class.forName(protoMessageClassName);
-      List<Class<?>> extensionHostClasses = Lists.newArrayList();
-      if (extensionHostClassNames != null) {
-        for (String extensionHostClassName : extensionHostClassNames) {
-          extensionHostClasses.add(Class.forName(extensionHostClassName));
-        }
-      }
-      return of(protoMessageClass).withExtensionsFrom(extensionHostClasses);
-    } catch (ClassNotFoundException e) {
-      throw new IllegalArgumentException(e);
-    }
-  }
-
-  @Override
-  public CloudObject initializeCloudObject() {
-    CloudObject result = CloudObject.forClass(getClass());
-    Structs.addString(result, PROTO_MESSAGE_CLASS, 
protoMessageClass.getName());
-    List<CloudObject> extensionHostClassNames = Lists.newArrayList();
-    for (String className : getSortedExtensionClasses()) {
-      extensionHostClassNames.add(CloudObject.forString(className));
-    }
-    Structs.addList(result, PROTO_EXTENSION_HOSTS, extensionHostClassNames);
-    return result;
-  }
-
-  /** Get the memoized {@link Parser}, possibly initializing it lazily. */
-  private Parser<T> getParser() {
-    if (memoizedParser == null) {
-      try {
-        @SuppressWarnings("unchecked")
-        T protoMessageInstance = (T) 
protoMessageClass.getMethod("getDefaultInstance").invoke(null);
-        @SuppressWarnings("unchecked")
-        Parser<T> tParser = (Parser<T>) 
protoMessageInstance.getParserForType();
-        memoizedParser = tParser;
-      } catch (IllegalAccessException | InvocationTargetException | 
NoSuchMethodException e) {
-        throw new IllegalArgumentException(e);
-      }
-    }
-    return memoizedParser;
-  }
-
-  static final TypeDescriptor<Message> CHECK = new TypeDescriptor<Message>() 
{};
-
-  /**
-   * The implementation of the {@link CoderProvider} for this {@link 
ProtoCoder} returned by
-   * {@link #coderProvider()}.
-   */
-  private static final CoderProvider PROVIDER =
-      new CoderProvider() {
-        @Override
-        public <T> Coder<T> getCoder(TypeDescriptor<T> type) throws 
CannotProvideCoderException {
-          if (!type.isSubtypeOf(CHECK)) {
-            throw new CannotProvideCoderException(
-                String.format(
-                    "Cannot provide %s because %s is not a subclass of %s",
-                    ProtoCoder.class.getSimpleName(),
-                    type,
-                    Message.class.getName()));
-          }
-
-          @SuppressWarnings("unchecked")
-          TypeDescriptor<? extends Message> messageType = (TypeDescriptor<? 
extends Message>) type;
-          try {
-            @SuppressWarnings("unchecked")
-            Coder<T> coder = (Coder<T>) ProtoCoder.of(messageType);
-            return coder;
-          } catch (IllegalArgumentException e) {
-            throw new CannotProvideCoderException(e);
-          }
-        }
-      };
-
-  private SortedSet<String> getSortedExtensionClasses() {
-    SortedSet<String> ret = new TreeSet<>();
-    for (Class<?> clazz : extensionHostClasses) {
-      ret.add(clazz.getName());
-    }
-    return ret;
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/ff1fe7fa/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/protobuf/ProtobufUtil.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/protobuf/ProtobufUtil.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/protobuf/ProtobufUtil.java
deleted file mode 100644
index 77afb47..0000000
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/protobuf/ProtobufUtil.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.coders.protobuf;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.google.protobuf.Descriptors.Descriptor;
-import com.google.protobuf.Descriptors.FieldDescriptor;
-import com.google.protobuf.Descriptors.FileDescriptor.Syntax;
-import com.google.protobuf.Descriptors.GenericDescriptor;
-import com.google.protobuf.ExtensionRegistry;
-import com.google.protobuf.ExtensionRegistry.ExtensionInfo;
-import com.google.protobuf.Message;
-import java.lang.reflect.InvocationTargetException;
-import java.util.HashSet;
-import java.util.Set;
-import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
-
-/**
- * Utility functions for reflecting and analyzing Protocol Buffers classes.
- *
- * <p>Used by {@link ProtoCoder}, but in a separate file for testing and 
isolation.
- */
-class ProtobufUtil {
-  /**
-   * Returns the {@link Descriptor} for the given Protocol Buffers {@link 
Message}.
-   *
-   * @throws IllegalArgumentException if there is an error in Java reflection.
-   */
-  static Descriptor getDescriptorForClass(Class<? extends Message> clazz) {
-    try {
-      return (Descriptor) clazz.getMethod("getDescriptor").invoke(null);
-    } catch (IllegalAccessException | InvocationTargetException | 
NoSuchMethodException e) {
-      throw new IllegalArgumentException(e);
-    }
-  }
-
-  /**
-   * Returns the {@link Descriptor} for the given Protocol Buffers {@link 
Message} as well as
-   * every class it can include transitively.
-   *
-   * @throws IllegalArgumentException if there is an error in Java reflection.
-   */
-  static Set<Descriptor> getRecursiveDescriptorsForClass(
-      Class<? extends Message> clazz, ExtensionRegistry registry) {
-    Descriptor root = getDescriptorForClass(clazz);
-    Set<Descriptor> descriptors = new HashSet<>();
-    recursivelyAddDescriptors(root, descriptors, registry);
-    return descriptors;
-  }
-
-  /**
-   * Recursively walks the given {@link Message} class and verifies that every 
field or message
-   * linked in uses the Protocol Buffers proto2 syntax.
-   */
-  static void checkProto2Syntax(Class<? extends Message> clazz, 
ExtensionRegistry registry) {
-    for (GenericDescriptor d : getRecursiveDescriptorsForClass(clazz, 
registry)) {
-      Syntax s = d.getFile().getSyntax();
-      checkArgument(
-          s == Syntax.PROTO2,
-          "Message %s or one of its dependencies does not use proto2 syntax: 
%s in file %s",
-          clazz.getName(),
-          d.getFullName(),
-          d.getFile().getName());
-    }
-  }
-
-  /**
-   * Recursively checks whether the specified class uses any Protocol Buffers 
fields that cannot
-   * be deterministically encoded.
-   *
-   * @throws NonDeterministicException if the object cannot be encoded 
deterministically.
-   */
-  static void verifyDeterministic(ProtoCoder<?> coder) throws 
NonDeterministicException {
-    Class<? extends Message> message = coder.getMessageType();
-    ExtensionRegistry registry = coder.getExtensionRegistry();
-    Set<Descriptor> descriptors = getRecursiveDescriptorsForClass(message, 
registry);
-    for (Descriptor d : descriptors) {
-      for (FieldDescriptor fd : d.getFields()) {
-        // If there is a transitively reachable Protocol Buffers map field, 
then this object cannot
-        // be encoded deterministically.
-        if (fd.isMapField()) {
-          String reason =
-              String.format(
-                  "Protocol Buffers message %s transitively includes Map field 
%s (from file %s)."
-                      + " Maps cannot be deterministically encoded.",
-                  message.getName(),
-                  fd.getFullName(),
-                  fd.getFile().getFullName());
-          throw new NonDeterministicException(coder, reason);
-        }
-      }
-    }
-  }
-
-  
////////////////////////////////////////////////////////////////////////////////////////////////
-  // Disable construction of utility class
-  private ProtobufUtil() {}
-
-  private static void recursivelyAddDescriptors(
-      Descriptor message, Set<Descriptor> descriptors, ExtensionRegistry 
registry) {
-    if (descriptors.contains(message)) {
-      return;
-    }
-    descriptors.add(message);
-
-    for (FieldDescriptor f : message.getFields()) {
-      recursivelyAddDescriptors(f, descriptors, registry);
-    }
-    for (FieldDescriptor f : message.getExtensions()) {
-      recursivelyAddDescriptors(f, descriptors, registry);
-    }
-    for (ExtensionInfo info :
-        
registry.getAllImmutableExtensionsByExtendedType(message.getFullName())) {
-      recursivelyAddDescriptors(info.descriptor, descriptors, registry);
-    }
-    for (ExtensionInfo info :
-        registry.getAllMutableExtensionsByExtendedType(message.getFullName())) 
{
-      recursivelyAddDescriptors(info.descriptor, descriptors, registry);
-    }
-  }
-
-  private static void recursivelyAddDescriptors(
-      FieldDescriptor field, Set<Descriptor> descriptors, ExtensionRegistry 
registry) {
-    switch (field.getType()) {
-      case BOOL:
-      case BYTES:
-      case DOUBLE:
-      case ENUM:
-      case FIXED32:
-      case FIXED64:
-      case FLOAT:
-      case INT32:
-      case INT64:
-      case SFIXED32:
-      case SFIXED64:
-      case SINT32:
-      case SINT64:
-      case STRING:
-      case UINT32:
-      case UINT64:
-        // Primitive types do not transitively access anything else.
-        break;
-
-      case GROUP:
-      case MESSAGE:
-        // Recursively adds all the fields from this nested Message.
-        recursivelyAddDescriptors(field.getMessageType(), descriptors, 
registry);
-        break;
-
-      default:
-        throw new UnsupportedOperationException(
-            "Unexpected Protocol Buffers field type: " + field.getType());
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/ff1fe7fa/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/protobuf/package-info.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/protobuf/package-info.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/protobuf/package-info.java
deleted file mode 100644
index bd16484..0000000
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/protobuf/package-info.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * Defines a {@link org.apache.beam.sdk.coders.Coder}
- * for Protocol Buffers messages, {@code ProtoCoder}.
- *
- * @see org.apache.beam.sdk.coders.protobuf.ProtoCoder
- */
-package org.apache.beam.sdk.coders.protobuf;

http://git-wip-us.apache.org/repos/asf/beam/blob/ff1fe7fa/sdks/java/core/src/main/proto/README.md
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/proto/README.md b/sdks/java/core/src/main/proto/README.md
deleted file mode 100644
index b6d91df..0000000
--- a/sdks/java/core/src/main/proto/README.md
+++ /dev/null
@@ -1,45 +0,0 @@
-<!--
-    Licensed to the Apache Software Foundation (ASF) under one
-    or more contributor license agreements.  See the NOTICE file
-    distributed with this work for additional information
-    regarding copyright ownership.  The ASF licenses this file
-    to you under the Apache License, Version 2.0 (the
-    "License"); you may not use this file except in compliance
-    with the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-    Unless required by applicable law or agreed to in writing,
-    software distributed under the License is distributed on an
-    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-    KIND, either express or implied.  See the License for the
-    specific language governing permissions and limitations
-    under the License.
--->
-
-## Protocol Buffers in Apache Beam
-
-This directory contains the Protocol Buffer messages used in Apache Beam.
-
-They aren't, however, used during the Maven build process, and are included here
-for completeness only. Instead, the following artifact on Maven Central contains
-the binary version of the generated code from these Protocol Buffers:
-
-    <dependency>
-      <groupId>com.google.cloud.dataflow</groupId>
-      <artifactId>google-cloud-dataflow-java-proto-library-all</artifactId>
-      <version>LATEST</version>
-    </dependency>
-
-Please follow this process for testing changes:
-
-* Make changes to the Protocol Buffer messages in this directory.
-* Use `protoc` to generate the new code, and compile it into a new Java library.
-* Install that Java library into your local Maven repository.
-* Update SDK's `pom.xml` to pick up the newly installed library, instead of
-downloading it from Maven Central.
-
-Once the changes are ready for submission, please separate them into two
-commits. The first commit should update the Protocol Buffer messages only. After
-that, we need to update the generated artifact on Maven Central. Finally,
-changes that make use of the Protocol Buffer changes may be committed.

http://git-wip-us.apache.org/repos/asf/beam/blob/ff1fe7fa/sdks/java/core/src/main/proto/proto2_coder_test_messages.proto
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/proto/proto2_coder_test_messages.proto b/sdks/java/core/src/main/proto/proto2_coder_test_messages.proto
deleted file mode 100644
index b1abe46..0000000
--- a/sdks/java/core/src/main/proto/proto2_coder_test_messages.proto
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/*
- * Protocol Buffer messages used for testing Proto2Coder implementation.
- */
-
-syntax = "proto2";
-
-package proto2_coder_test_messages;
-
-option java_package = "org.apache.beam.sdk.coders";
-
-message MessageA {
-  optional string field1 = 1;
-  repeated MessageB field2 = 2;
-}
-
-message MessageB {
-  optional bool field1 = 1;
-}
-
-message MessageC {
-  extensions 100 to 105;
-}
-
-extend MessageC {
-  optional MessageA field1 = 101;
-  optional MessageB field2 = 102;
-}
-
-message MessageWithMap {
-  map<string, MessageA> field1 = 1;
-}
-
-message ReferencesMessageWithMap {
-  repeated MessageWithMap field1 = 1;
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/ff1fe7fa/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
index 10e011f..616e88e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
@@ -22,10 +22,8 @@ import static org.hamcrest.Matchers.containsString;
 import static org.junit.Assert.assertEquals;
 
 import com.google.auto.service.AutoService;
-import com.google.cloud.dataflow.sdk.coders.Proto2CoderTestMessages.MessageA;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
-import com.google.protobuf.Duration;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -38,7 +36,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import org.apache.beam.sdk.coders.CoderRegistry.IncompatibleCoderException;
-import org.apache.beam.sdk.coders.protobuf.ProtoCoder;
 import org.apache.beam.sdk.testing.ExpectedLogs;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.TestPipeline;
@@ -87,17 +84,6 @@ public class CoderRegistryTest {
   }
 
   @Test
-  public void testProtoCoderFallbackCoderProvider() throws Exception {
-    CoderRegistry registry = CoderRegistry.createDefault();
-
-    // MessageA is a Protocol Buffers test message with syntax 2
-    assertEquals(registry.getDefaultCoder(MessageA.class), ProtoCoder.of(MessageA.class));
-
-    // Duration is a Protocol Buffers default type with syntax 3
-    assertEquals(registry.getDefaultCoder(Duration.class), ProtoCoder.of(Duration.class));
-  }
-
-  @Test
   public void testAvroFallbackCoderProvider() throws Exception {
     CoderRegistry registry = CoderRegistry.createDefault();
     registry.setFallbackCoderProvider(AvroCoder.PROVIDER);

http://git-wip-us.apache.org/repos/asf/beam/blob/ff1fe7fa/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/protobuf/ProtoCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/protobuf/ProtoCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/protobuf/ProtoCoderTest.java
deleted file mode 100644
index 8b889da..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/protobuf/ProtoCoderTest.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.coders.protobuf;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-
-import com.google.cloud.dataflow.sdk.coders.Proto2CoderTestMessages;
-import com.google.cloud.dataflow.sdk.coders.Proto2CoderTestMessages.MessageA;
-import com.google.cloud.dataflow.sdk.coders.Proto2CoderTestMessages.MessageB;
-import com.google.cloud.dataflow.sdk.coders.Proto2CoderTestMessages.MessageC;
-import com.google.cloud.dataflow.sdk.coders.Proto2CoderTestMessages.MessageWithMap;
-import com.google.common.collect.ImmutableList;
-import org.apache.beam.sdk.coders.CannotProvideCoderException;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.ListCoder;
-import org.apache.beam.sdk.testing.CoderProperties;
-import org.apache.beam.sdk.util.CoderUtils;
-import org.apache.beam.sdk.values.TypeDescriptor;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for {@link ProtoCoder}.
- */
-@RunWith(JUnit4.class)
-public class ProtoCoderTest {
-
-  @Rule public ExpectedException thrown = ExpectedException.none();
-
-  @Test
-  public void testFactoryMethodAgreement() throws Exception {
-    assertEquals(ProtoCoder.of(new TypeDescriptor<MessageA>() {}), ProtoCoder.of(MessageA.class));
-
-    assertEquals(
-        ProtoCoder.of(new TypeDescriptor<MessageA>() {}),
-        ProtoCoder.coderProvider().getCoder(new TypeDescriptor<MessageA>() {}));
-  }
-
-  @Test
-  public void testProviderCannotProvideCoder() throws Exception {
-    thrown.expect(CannotProvideCoderException.class);
-    thrown.expectMessage("java.lang.Integer is not a subclass of com.google.protobuf.Message");
-
-    ProtoCoder.coderProvider().getCoder(new TypeDescriptor<Integer>() {});
-  }
-
-  @Test
-  public void testCoderEncodeDecodeEqual() throws Exception {
-    MessageA value =
-        MessageA.newBuilder()
-            .setField1("hello")
-            .addField2(MessageB.newBuilder().setField1(true).build())
-            .addField2(MessageB.newBuilder().setField1(false).build())
-            .build();
-    CoderProperties.coderDecodeEncodeEqual(ProtoCoder.of(MessageA.class), value);
-  }
-
-  @Test
-  public void testCoderEncodeDecodeEqualNestedContext() throws Exception {
-    MessageA value1 =
-        MessageA.newBuilder()
-            .setField1("hello")
-            .addField2(MessageB.newBuilder().setField1(true).build())
-            .addField2(MessageB.newBuilder().setField1(false).build())
-            .build();
-    MessageA value2 =
-        MessageA.newBuilder()
-            .setField1("world")
-            .addField2(MessageB.newBuilder().setField1(false).build())
-            .addField2(MessageB.newBuilder().setField1(true).build())
-            .build();
-    CoderProperties.coderDecodeEncodeEqual(
-        ListCoder.of(ProtoCoder.of(MessageA.class)), ImmutableList.of(value1, value2));
-  }
-
-  @Test
-  public void testCoderEncodeDecodeExtensionsEqual() throws Exception {
-    MessageC value =
-        MessageC.newBuilder()
-            .setExtension(
-                Proto2CoderTestMessages.field1,
-                MessageA.newBuilder()
-                    .setField1("hello")
-                    .addField2(MessageB.newBuilder().setField1(true).build())
-                    .build())
-            .setExtension(
-                Proto2CoderTestMessages.field2, MessageB.newBuilder().setField1(false).build())
-            .build();
-    CoderProperties.coderDecodeEncodeEqual(
-        ProtoCoder.of(MessageC.class).withExtensionsFrom(Proto2CoderTestMessages.class), value);
-  }
-
-  @Test
-  public void testCoderSerialization() throws Exception {
-    ProtoCoder<MessageA> coder = ProtoCoder.of(MessageA.class);
-    CoderProperties.coderSerializable(coder);
-  }
-
-  @Test
-  public void testCoderExtensionsSerialization() throws Exception {
-    ProtoCoder<MessageC> coder =
-        ProtoCoder.of(MessageC.class).withExtensionsFrom(Proto2CoderTestMessages.class);
-    CoderProperties.coderSerializable(coder);
-  }
-
-  @Test
-  public void testEncodingId() throws Exception {
-    Coder<MessageA> coderA = ProtoCoder.of(MessageA.class);
-    CoderProperties.coderHasEncodingId(coderA, MessageA.class.getName() + "[]");
-
-    ProtoCoder<MessageC> coder =
-        ProtoCoder.of(MessageC.class).withExtensionsFrom(Proto2CoderTestMessages.class);
-    CoderProperties.coderHasEncodingId(
-        coder,
-        String.format("%s[%s]", MessageC.class.getName(), Proto2CoderTestMessages.class.getName()));
-  }
-
-  @Test
-  public void encodeNullThrowsCoderException() throws Exception {
-    thrown.expect(CoderException.class);
-    thrown.expectMessage("cannot encode a null MessageA");
-
-    CoderUtils.encodeToBase64(ProtoCoder.of(MessageA.class), null);
-  }
-
-  @Test
-  public void testDeterministicCoder() throws NonDeterministicException {
-    Coder<MessageA> coder = ProtoCoder.of(MessageA.class);
-    coder.verifyDeterministic();
-  }
-
-  @Test
-  public void testNonDeterministicCoder() throws NonDeterministicException {
-    thrown.expect(NonDeterministicException.class);
-    thrown.expectMessage(MessageWithMap.class.getName() + " transitively includes Map field");
-
-    Coder<MessageWithMap> coder = ProtoCoder.of(MessageWithMap.class);
-    coder.verifyDeterministic();
-  }
-
-  @Test
-  public void testNonDeterministicProperty() throws CoderException {
-    MessageWithMap.Builder msg1B = MessageWithMap.newBuilder();
-    MessageWithMap.Builder msg2B = MessageWithMap.newBuilder();
-
-    // Built in reverse order but with equal contents.
-    for (int i = 0; i < 10; ++i) {
-      msg1B.getMutableField1().put("key" + i, MessageA.getDefaultInstance());
-      msg2B.getMutableField1().put("key" + (9 - i), MessageA.getDefaultInstance());
-    }
-
-    // Assert the messages are equal.
-    MessageWithMap msg1 = msg1B.build();
-    MessageWithMap msg2 = msg2B.build();
-    assertEquals(msg2, msg1);
-
-    // Assert the encoded messages are not equal.
-    Coder<MessageWithMap> coder = ProtoCoder.of(MessageWithMap.class);
-    assertNotEquals(CoderUtils.encodeToBase64(coder, msg2), CoderUtils.encodeToBase64(coder, msg1));
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/ff1fe7fa/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/protobuf/ProtobufUtilTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/protobuf/ProtobufUtilTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/protobuf/ProtobufUtilTest.java
deleted file mode 100644
index 1408048..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/protobuf/ProtobufUtilTest.java
+++ /dev/null
@@ -1,192 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.coders.protobuf;
-
-import static org.apache.beam.sdk.coders.protobuf.ProtobufUtil.checkProto2Syntax;
-import static org.apache.beam.sdk.coders.protobuf.ProtobufUtil.getRecursiveDescriptorsForClass;
-import static org.apache.beam.sdk.coders.protobuf.ProtobufUtil.verifyDeterministic;
-import static org.hamcrest.Matchers.equalTo;
-import static org.junit.Assert.assertThat;
-
-import com.google.cloud.dataflow.sdk.coders.Proto2CoderTestMessages;
-import com.google.cloud.dataflow.sdk.coders.Proto2CoderTestMessages.MessageA;
-import com.google.cloud.dataflow.sdk.coders.Proto2CoderTestMessages.MessageB;
-import com.google.cloud.dataflow.sdk.coders.Proto2CoderTestMessages.MessageC;
-import com.google.cloud.dataflow.sdk.coders.Proto2CoderTestMessages.MessageWithMap;
-import com.google.cloud.dataflow.sdk.coders.Proto2CoderTestMessages.ReferencesMessageWithMap;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Sets;
-import com.google.protobuf.Any;
-import com.google.protobuf.Descriptors.GenericDescriptor;
-import com.google.protobuf.Duration;
-import com.google.protobuf.ExtensionRegistry;
-import com.google.protobuf.Message;
-import java.util.HashSet;
-import java.util.Set;
-import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for {@link ProtobufUtil}.
- */
-@RunWith(JUnit4.class)
-public class ProtobufUtilTest {
-  @Rule public ExpectedException thrown = ExpectedException.none();
-
-  private static final Set<String> MESSAGE_A_ONLY =
-      ImmutableSet.of("proto2_coder_test_messages.MessageA");
-
-  private static final Set<String> MESSAGE_B_ONLY =
-      ImmutableSet.of("proto2_coder_test_messages.MessageB");
-
-  private static final Set<String> MESSAGE_C_ONLY =
-      ImmutableSet.of("proto2_coder_test_messages.MessageC");
-
-  // map fields are actually represented as a nested Message in generated Java code.
-  private static final Set<String> WITH_MAP_ONLY =
-      ImmutableSet.of(
-          "proto2_coder_test_messages.MessageWithMap",
-          "proto2_coder_test_messages.MessageWithMap.Field1Entry");
-
-  private static final Set<String> REFERS_MAP_ONLY =
-      ImmutableSet.of("proto2_coder_test_messages.ReferencesMessageWithMap");
-
-  // A references A and B.
-  private static final Set<String> MESSAGE_A_ALL = Sets.union(MESSAGE_A_ONLY, MESSAGE_B_ONLY);
-
-  // C, only with registered extensions, references A.
-  private static final Set<String> MESSAGE_C_EXT = Sets.union(MESSAGE_C_ONLY, MESSAGE_A_ALL);
-
-  // MessageWithMap references A.
-  private static final Set<String> WITH_MAP_ALL = Sets.union(WITH_MAP_ONLY, MESSAGE_A_ALL);
-
-  // ReferencesMessageWithMap references MessageWithMap.
-  private static final Set<String> REFERS_MAP_ALL = Sets.union(REFERS_MAP_ONLY, WITH_MAP_ALL);
-
-  @Test
-  public void testRecursiveDescriptorsMessageA() {
-    assertThat(getRecursiveDescriptorFullNames(MessageA.class), equalTo(MESSAGE_A_ALL));
-  }
-
-  @Test
-  public void testRecursiveDescriptorsMessageB() {
-    assertThat(getRecursiveDescriptorFullNames(MessageB.class), equalTo(MESSAGE_B_ONLY));
-  }
-
-  @Test
-  public void testRecursiveDescriptorsMessageC() {
-    assertThat(getRecursiveDescriptorFullNames(MessageC.class), equalTo(MESSAGE_C_ONLY));
-  }
-
-  @Test
-  public void testRecursiveDescriptorsMessageCWithExtensions() {
-    // With extensions, Message C has a reference to Message A and Message B.
-    ExtensionRegistry registry = ExtensionRegistry.newInstance();
-    Proto2CoderTestMessages.registerAllExtensions(registry);
-    assertThat(getRecursiveDescriptorFullNames(MessageC.class, registry), equalTo(MESSAGE_C_EXT));
-  }
-
-  @Test
-  public void testRecursiveDescriptorsMessageWithMap() {
-    assertThat(getRecursiveDescriptorFullNames(MessageWithMap.class), equalTo(WITH_MAP_ALL));
-  }
-
-  @Test
-  public void testRecursiveDescriptorsReferencesMessageWithMap() {
-    assertThat(
-        getRecursiveDescriptorFullNames(ReferencesMessageWithMap.class), equalTo(REFERS_MAP_ALL));
-  }
-
-  @Test
-  public void testVerifyProto2() {
-    checkProto2Syntax(MessageA.class, ExtensionRegistry.getEmptyRegistry());
-    checkProto2Syntax(MessageB.class, ExtensionRegistry.getEmptyRegistry());
-    checkProto2Syntax(MessageC.class, ExtensionRegistry.getEmptyRegistry());
-    checkProto2Syntax(MessageWithMap.class, ExtensionRegistry.getEmptyRegistry());
-    checkProto2Syntax(ReferencesMessageWithMap.class, ExtensionRegistry.getEmptyRegistry());
-  }
-
-  @Test
-  public void testAnyIsNotProto2() {
-    // Any is a core Protocol Buffers type that uses proto3 syntax.
-    thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage(Any.class.getCanonicalName());
-    thrown.expectMessage("in file " + Any.getDescriptor().getFile().getName());
-
-    checkProto2Syntax(Any.class, ExtensionRegistry.getEmptyRegistry());
-  }
-
-  @Test
-  public void testDurationIsNotProto2() {
-    // Duration is a core Protocol Buffers type that uses proto3 syntax.
-    thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage(Duration.class.getCanonicalName());
-    thrown.expectMessage("in file " + Duration.getDescriptor().getFile().getName());
-
-    checkProto2Syntax(Duration.class, ExtensionRegistry.getEmptyRegistry());
-  }
-
-  @Test
-  public void testDurationIsDeterministic() throws NonDeterministicException {
-    // Duration can be encoded deterministically.
-    verifyDeterministic(ProtoCoder.of(Duration.class));
-  }
-
-  @Test
-  public void testMessageWithMapIsNotDeterministic() throws NonDeterministicException {
-    String mapFieldName = MessageWithMap.getDescriptor().findFieldByNumber(1).getFullName();
-    thrown.expect(NonDeterministicException.class);
-    thrown.expectMessage(MessageWithMap.class.getName());
-    thrown.expectMessage("transitively includes Map field " + mapFieldName);
-    thrown.expectMessage("file " + MessageWithMap.getDescriptor().getFile().getName());
-
-    verifyDeterministic(ProtoCoder.of(MessageWithMap.class));
-  }
-
-  @Test
-  public void testMessageWithTransitiveMapIsNotDeterministic() throws NonDeterministicException {
-    String mapFieldName = MessageWithMap.getDescriptor().findFieldByNumber(1).getFullName();
-    thrown.expect(NonDeterministicException.class);
-    thrown.expectMessage(ReferencesMessageWithMap.class.getName());
-    thrown.expectMessage("transitively includes Map field " + mapFieldName);
-    thrown.expectMessage("file " + MessageWithMap.getDescriptor().getFile().getName());
-
-    verifyDeterministic(ProtoCoder.of(ReferencesMessageWithMap.class));
-  }
-
-  
////////////////////////////////////////////////////////////////////////////////////////////
-
-  /** Helper used to test the recursive class traversal and print good error messages. */
-  private static Set<String> getRecursiveDescriptorFullNames(Class<? extends Message> clazz) {
-    return getRecursiveDescriptorFullNames(clazz, ExtensionRegistry.getEmptyRegistry());
-  }
-
-  /** Helper used to test the recursive class traversal and print good error messages. */
-  private static Set<String> getRecursiveDescriptorFullNames(
-      Class<? extends Message> clazz, ExtensionRegistry registry) {
-    Set<String> result = new HashSet<>();
-    for (GenericDescriptor d : getRecursiveDescriptorsForClass(clazz, registry)) {
-      result.add(d.getFullName());
-    }
-    return result;
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/ff1fe7fa/sdks/java/extensions/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/pom.xml b/sdks/java/extensions/pom.xml
index dde8be5..8a48eca 100644
--- a/sdks/java/extensions/pom.xml
+++ b/sdks/java/extensions/pom.xml
@@ -35,6 +35,7 @@
     <module>gcp-core</module>
     <module>jackson</module>
     <module>join-library</module>
+    <module>protobuf</module>
     <module>sorter</module>
   </modules>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/ff1fe7fa/sdks/java/extensions/protobuf/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/protobuf/pom.xml b/sdks/java/extensions/protobuf/pom.xml
new file mode 100644
index 0000000..9a54254
--- /dev/null
+++ b/sdks/java/extensions/protobuf/pom.xml
@@ -0,0 +1,142 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.beam</groupId>
+    <artifactId>beam-sdks-java-extensions-parent</artifactId>
+    <version>0.7.0-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>beam-sdks-java-extensions-protobuf</artifactId>
+  <name>Apache Beam :: SDKs :: Java :: Extensions :: Protobuf</name>
+  <description>Add support to Apache Beam for Google Protobuf.</description>
+
+  <packaging>jar</packaging>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+      </plugin>
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+      </plugin>
+
+      <plugin>
+        <groupId>org.xolstice.maven.plugins</groupId>
+        <artifactId>protobuf-maven-plugin</artifactId>
+        <configuration>
+          <protocArtifact>com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}</protocArtifact>
+        </configuration>
+        <executions>
+          <execution>
+            <goals>
+              <goal>test-compile</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+
+      <!-- Coverage analysis for unit tests. -->
+      <plugin>
+        <groupId>org.jacoco</groupId>
+        <artifactId>jacoco-maven-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-core</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.protobuf</groupId>
+      <artifactId>protobuf-java</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-annotations</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.code.findbugs</groupId>
+      <artifactId>jsr305</artifactId>
+    </dependency>
+
+    <!-- build dependencies -->
+    <dependency>
+      <groupId>com.google.auto.value</groupId>
+      <artifactId>auto-value</artifactId>
+      <scope>provided</scope>
+    </dependency>
+
+    <!-- test dependencies -->
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-core</artifactId>
+      <classifier>tests</classifier>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-jdk14</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+</project>

http://git-wip-us.apache.org/repos/asf/beam/blob/ff1fe7fa/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoder.java b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoder.java
new file mode 100644
index 0000000..99a0838
--- /dev/null
+++ b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoder.java
@@ -0,0 +1,405 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.protobuf;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.google.protobuf.ExtensionRegistry;
+import com.google.protobuf.Message;
+import com.google.protobuf.Parser;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CoderProvider;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.util.CloudObject;
+import org.apache.beam.sdk.util.Structs;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/**
+ * A {@link Coder} using Google Protocol Buffers binary format. {@link 
ProtoCoder} supports both
+ * Protocol Buffers syntax versions 2 and 3.
+ *
+ * <p>To learn more about Protocol Buffers, visit:
+ * <a 
href="https://developers.google.com/protocol-buffers";>https://developers.google.com/protocol-buffers</a>
+ *
+ * <p>{@link ProtoCoder} is registered in the global {@link CoderRegistry} as 
the default
+ * {@link Coder} for any {@link Message} object. Custom message extensions are 
also supported, but
+ * these extensions must be registered for a particular {@link ProtoCoder} 
instance and that
+ * instance must be registered on the {@link PCollection} that needs the 
extensions:
+ *
+ * <pre>{@code
+ * import MyProtoFile;
+ * import MyProtoFile.MyMessage;
+ *
+ * Coder<MyMessage> coder = 
ProtoCoder.of(MyMessage.class).withExtensionsFrom(MyProtoFile.class);
+ * PCollection<MyMessage> records =  input.apply(...).setCoder(coder);
+ * }</pre>
+ *
+ * <h3>Versioning</h3>
+ *
+ * <p>{@link ProtoCoder} supports both versions 2 and 3 of the Protocol 
Buffers syntax. However,
+ * the Java runtime version of the <code>google.com.protobuf</code> library 
must match exactly the
+ * version of <code>protoc</code> that was used to produce the JAR files 
containing the compiled
+ * <code>.proto</code> messages.
+ *
+ * <p>For more information, see the
+ * <a 
href="https://developers.google.com/protocol-buffers/docs/proto3#using-proto2-message-types";>Protocol
 Buffers documentation</a>.
+ *
+ * <h3>{@link ProtoCoder} and Determinism</h3>
+ *
+ * <p>In general, Protocol Buffers messages can be encoded deterministically 
within a single
+ * pipeline as long as:
+ *
+ * <ul>
+ * <li>The encoded messages (and any transitively linked messages) do not use 
<code>map</code>
+ *     fields.</li>
+ * <li>Every Java VM that encodes or decodes the messages use the same runtime 
version of the
+ *     Protocol Buffers library and the same compiled <code>.proto</code> file 
JAR.</li>
+ * </ul>
+ *
+ * <h3>{@link ProtoCoder} and Encoding Stability</h3>
+ *
+ * <p>When changing Protocol Buffers messages, follow the rules in the 
Protocol Buffers language
+ * guides for
+ * <a 
href="https://developers.google.com/protocol-buffers/docs/proto#updating";>{@code
 proto2}</a>
+ * and
+ * <a 
href="https://developers.google.com/protocol-buffers/docs/proto3#updating";>{@code
 proto3}</a>
+ * syntaxes, depending on your message type. Following these guidelines will 
ensure that the
+ * old encoded data can be read by new versions of the code.
+ *
+ * <p>Generally, any change to the message type, registered extensions, 
runtime library, or
+ * compiled proto JARs may change the encoding. Thus even if both the original 
and updated messages
+ * can be encoded deterministically within a single job, these deterministic 
encodings may not be
+ * the same across jobs.
+ *
+ * @param <T> the Protocol Buffers {@link Message} handled by this {@link 
Coder}.
+ */
+public class ProtoCoder<T extends Message> extends AtomicCoder<T> {
+
+  /**
+   * A {@link CoderProvider} that returns a {@link ProtoCoder} with an empty
+   * {@link ExtensionRegistry}.
+   */
+  public static CoderProvider coderProvider() {
+    return PROVIDER;
+  }
+
+  /**
+   * Returns a {@link ProtoCoder} for the given Protocol Buffers {@link 
Message}.
+   */
+  public static <T extends Message> ProtoCoder<T> of(Class<T> 
protoMessageClass) {
+    return new ProtoCoder<>(protoMessageClass, ImmutableSet.<Class<?>>of());
+  }
+
+  /**
+   * Returns a {@link ProtoCoder} for the Protocol Buffers {@link Message} 
indicated by the given
+   * {@link TypeDescriptor}.
+   */
+  public static <T extends Message> ProtoCoder<T> of(TypeDescriptor<T> 
protoMessageType) {
+    @SuppressWarnings("unchecked")
+    Class<T> protoMessageClass = (Class<T>) protoMessageType.getRawType();
+    return of(protoMessageClass);
+  }
+
+  /**
+   * Returns a {@link ProtoCoder} like this one, but with the extensions from 
the given classes
+   * registered.
+   *
+   * <p>Each of the extension host classes must be an class automatically 
generated by the
+   * Protocol Buffers compiler, {@code protoc}, that contains messages.
+   *
+   * <p>Does not modify this object.
+   */
+  public ProtoCoder<T> withExtensionsFrom(Iterable<Class<?>> 
moreExtensionHosts) {
+    for (Class<?> extensionHost : moreExtensionHosts) {
+      // Attempt to access the required method, to make sure it's present.
+      try {
+        Method registerAllExtensions =
+            extensionHost.getDeclaredMethod("registerAllExtensions", 
ExtensionRegistry.class);
+        checkArgument(
+            Modifier.isStatic(registerAllExtensions.getModifiers()),
+            "Method registerAllExtensions() must be static");
+      } catch (NoSuchMethodException | SecurityException e) {
+        throw new IllegalArgumentException(
+            String.format("Unable to register extensions for %s", 
extensionHost.getCanonicalName()),
+            e);
+      }
+    }
+
+    return new ProtoCoder<>(
+        protoMessageClass,
+        new ImmutableSet.Builder<Class<?>>()
+            .addAll(extensionHostClasses)
+            .addAll(moreExtensionHosts)
+            .build());
+  }
+
+  /**
+   * See {@link #withExtensionsFrom(Iterable)}.
+   *
+   * <p>Does not modify this object.
+   */
+  public ProtoCoder<T> withExtensionsFrom(Class<?>... moreExtensionHosts) {
+    return withExtensionsFrom(Arrays.asList(moreExtensionHosts));
+  }
+
+  @Override
+  public void encode(T value, OutputStream outStream, Context context) throws 
IOException {
+    if (value == null) {
+      throw new CoderException("cannot encode a null " + 
protoMessageClass.getSimpleName());
+    }
+    if (context.isWholeStream) {
+      value.writeTo(outStream);
+    } else {
+      value.writeDelimitedTo(outStream);
+    }
+  }
+
+  @Override
+  public T decode(InputStream inStream, Context context) throws IOException {
+    if (context.isWholeStream) {
+      return getParser().parseFrom(inStream, getExtensionRegistry());
+    } else {
+      return getParser().parseDelimitedFrom(inStream, getExtensionRegistry());
+    }
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (this == other) {
+      return true;
+    }
+    if (!(other instanceof ProtoCoder)) {
+      return false;
+    }
+    ProtoCoder<?> otherCoder = (ProtoCoder<?>) other;
+    return protoMessageClass.equals(otherCoder.protoMessageClass)
+        && Sets.newHashSet(extensionHostClasses)
+            .equals(Sets.newHashSet(otherCoder.extensionHostClasses));
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(protoMessageClass, extensionHostClasses);
+  }
+
+  /**
+   * The encoding identifier is designed to support evolution as per the 
design of Protocol
+   * Buffers. In order to use this class effectively, carefully follow the 
advice in the Protocol
+   * Buffers documentation at
+   * <a 
href="https://developers.google.com/protocol-buffers/docs/proto#updating";>Updating
+   * A Message Type</a>.
+   *
+   * <p>In particular, the encoding identifier is guaranteed to be the same 
for {@link ProtoCoder}
+   * instances of the same principal message class, with the same registered 
extension host classes,
+   * and otherwise distinct. Note that the encoding ID does not encode any 
version of the message
+   * or extensions, nor does it include the message schema.
+   *
+   * <p>When modifying a message class, here are the broadest guidelines; see 
the above link
+   * for greater detail.
+   *
+   * <ul>
+   * <li>Do not change the numeric tags for any fields.
+   * <li>Never remove a <code>required</code> field.
+   * <li>Only add <code>optional</code> or <code>repeated</code> fields, with 
sensible defaults.
+   * <li>When changing the type of a field, consult the Protocol Buffers 
documentation to ensure
+   * the new and old types are interchangeable.
+   * </ul>
+   *
+   * <p>Code consuming this message class should be prepared to support 
<i>all</i> versions of
+   * the class until it is certain that no remaining serialized instances 
exist.
+   *
+   * <p>If backwards incompatible changes must be made, the best recourse is 
to change the name
+   * of your Protocol Buffers message class.
+   */
+  @Override
+  public String getEncodingId() {
+    return protoMessageClass.getName() + 
getSortedExtensionClasses().toString();
+  }
+
+  @Override
+  public void verifyDeterministic() throws NonDeterministicException {
+    ProtobufUtil.verifyDeterministic(this);
+  }
+
+  /**
+   * Returns the Protocol Buffers {@link Message} type this {@link ProtoCoder} 
supports.
+   */
+  public Class<T> getMessageType() {
+    return protoMessageClass;
+  }
+
+  /**
+   * Returns the {@link ExtensionRegistry} listing all known Protocol Buffers 
extension messages
+   * to {@code T} registered with this {@link ProtoCoder}.
+   */
+  public ExtensionRegistry getExtensionRegistry() {
+    if (memoizedExtensionRegistry == null) {
+      ExtensionRegistry registry = ExtensionRegistry.newInstance();
+      for (Class<?> extensionHost : extensionHostClasses) {
+        try {
+          extensionHost
+              .getDeclaredMethod("registerAllExtensions", 
ExtensionRegistry.class)
+              .invoke(null, registry);
+        } catch (IllegalAccessException | InvocationTargetException | 
NoSuchMethodException e) {
+          throw new IllegalStateException(e);
+        }
+      }
+      memoizedExtensionRegistry = registry.getUnmodifiable();
+    }
+    return memoizedExtensionRegistry;
+  }
+
+  
////////////////////////////////////////////////////////////////////////////////////
+  // Private implementation details below.
+
+  /** The {@link Message} type to be coded. */
+  private final Class<T> protoMessageClass;
+
+  /**
+   * All extension host classes included in this {@link ProtoCoder}. The 
extensions from these
+   * classes will be included in the {@link ExtensionRegistry} used during 
encoding and decoding.
+   */
+  private final Set<Class<?>> extensionHostClasses;
+
+  // Constants used to serialize and deserialize
+  private static final String PROTO_MESSAGE_CLASS = "proto_message_class";
+  private static final String PROTO_EXTENSION_HOSTS = "proto_extension_hosts";
+
+  // Transient fields that are lazy initialized and then memoized.
+  private transient ExtensionRegistry memoizedExtensionRegistry;
+  private transient Parser<T> memoizedParser;
+
+  /** Private constructor. */
+  private ProtoCoder(Class<T> protoMessageClass, Set<Class<?>> 
extensionHostClasses) {
+    this.protoMessageClass = protoMessageClass;
+    this.extensionHostClasses = extensionHostClasses;
+  }
+
+  /**
+   * @deprecated For JSON deserialization only.
+   */
+  @JsonCreator
+  @Deprecated
+  public static <T extends Message> ProtoCoder<T> of(
+      @JsonProperty(PROTO_MESSAGE_CLASS) String protoMessageClassName,
+      @Nullable @JsonProperty(PROTO_EXTENSION_HOSTS) List<String> 
extensionHostClassNames) {
+
+    try {
+      @SuppressWarnings("unchecked")
+      Class<T> protoMessageClass = (Class<T>) 
Class.forName(protoMessageClassName);
+      List<Class<?>> extensionHostClasses = Lists.newArrayList();
+      if (extensionHostClassNames != null) {
+        for (String extensionHostClassName : extensionHostClassNames) {
+          extensionHostClasses.add(Class.forName(extensionHostClassName));
+        }
+      }
+      return of(protoMessageClass).withExtensionsFrom(extensionHostClasses);
+    } catch (ClassNotFoundException e) {
+      throw new IllegalArgumentException(e);
+    }
+  }
+
+  @Override
+  public CloudObject initializeCloudObject() {
+    CloudObject result = CloudObject.forClass(getClass());
+    Structs.addString(result, PROTO_MESSAGE_CLASS, 
protoMessageClass.getName());
+    List<CloudObject> extensionHostClassNames = Lists.newArrayList();
+    for (String className : getSortedExtensionClasses()) {
+      extensionHostClassNames.add(CloudObject.forString(className));
+    }
+    Structs.addList(result, PROTO_EXTENSION_HOSTS, extensionHostClassNames);
+    return result;
+  }
+
+  /** Get the memoized {@link Parser}, possibly initializing it lazily. */
+  private Parser<T> getParser() {
+    if (memoizedParser == null) {
+      try {
+        @SuppressWarnings("unchecked")
+        T protoMessageInstance = (T) 
protoMessageClass.getMethod("getDefaultInstance").invoke(null);
+        @SuppressWarnings("unchecked")
+        Parser<T> tParser = (Parser<T>) 
protoMessageInstance.getParserForType();
+        memoizedParser = tParser;
+      } catch (IllegalAccessException | InvocationTargetException | 
NoSuchMethodException e) {
+        throw new IllegalArgumentException(e);
+      }
+    }
+    return memoizedParser;
+  }
+
+  static final TypeDescriptor<Message> CHECK = new TypeDescriptor<Message>() 
{};
+
+  /**
+   * The implementation of the {@link CoderProvider} for this {@link 
ProtoCoder} returned by
+   * {@link #coderProvider()}.
+   */
+  private static final CoderProvider PROVIDER =
+      new CoderProvider() {
+        @Override
+        public <T> Coder<T> getCoder(TypeDescriptor<T> type) throws 
CannotProvideCoderException {
+          if (!type.isSubtypeOf(CHECK)) {
+            throw new CannotProvideCoderException(
+                String.format(
+                    "Cannot provide %s because %s is not a subclass of %s",
+                    ProtoCoder.class.getSimpleName(),
+                    type,
+                    Message.class.getName()));
+          }
+
+          @SuppressWarnings("unchecked")
+          TypeDescriptor<? extends Message> messageType = (TypeDescriptor<? 
extends Message>) type;
+          try {
+            @SuppressWarnings("unchecked")
+            Coder<T> coder = (Coder<T>) ProtoCoder.of(messageType);
+            return coder;
+          } catch (IllegalArgumentException e) {
+            throw new CannotProvideCoderException(e);
+          }
+        }
+      };
+
+  private SortedSet<String> getSortedExtensionClasses() {
+    SortedSet<String> ret = new TreeSet<>();
+    for (Class<?> clazz : extensionHostClasses) {
+      ret.add(clazz.getName());
+    }
+    return ret;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/ff1fe7fa/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtobufUtil.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtobufUtil.java
 
b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtobufUtil.java
new file mode 100644
index 0000000..68a775a
--- /dev/null
+++ 
b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtobufUtil.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.protobuf;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.protobuf.Descriptors.Descriptor;
+import com.google.protobuf.Descriptors.FieldDescriptor;
+import com.google.protobuf.Descriptors.FileDescriptor.Syntax;
+import com.google.protobuf.Descriptors.GenericDescriptor;
+import com.google.protobuf.ExtensionRegistry;
+import com.google.protobuf.ExtensionRegistry.ExtensionInfo;
+import com.google.protobuf.Message;
+import java.lang.reflect.InvocationTargetException;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
+
+/**
+ * Utility functions for reflecting and analyzing Protocol Buffers classes.
+ *
+ * <p>Used by {@link ProtoCoder}, but in a separate file for testing and 
isolation.
+ */
+class ProtobufUtil {
+  /**
+   * Returns the {@link Descriptor} for the given Protocol Buffers {@link 
Message}.
+   *
+   * @throws IllegalArgumentException if there is an error in Java reflection.
+   */
+  static Descriptor getDescriptorForClass(Class<? extends Message> clazz) {
+    try {
+      return (Descriptor) clazz.getMethod("getDescriptor").invoke(null);
+    } catch (IllegalAccessException | InvocationTargetException | 
NoSuchMethodException e) {
+      throw new IllegalArgumentException(e);
+    }
+  }
+
+  /**
+   * Returns the {@link Descriptor} for the given Protocol Buffers {@link 
Message} as well as
+   * every class it can include transitively.
+   *
+   * @throws IllegalArgumentException if there is an error in Java reflection.
+   */
+  static Set<Descriptor> getRecursiveDescriptorsForClass(
+      Class<? extends Message> clazz, ExtensionRegistry registry) {
+    Descriptor root = getDescriptorForClass(clazz);
+    Set<Descriptor> descriptors = new HashSet<>();
+    recursivelyAddDescriptors(root, descriptors, registry);
+    return descriptors;
+  }
+
+  /**
+   * Recursively walks the given {@link Message} class and verifies that every 
field or message
+   * linked in uses the Protocol Buffers proto2 syntax.
+   */
+  static void checkProto2Syntax(Class<? extends Message> clazz, 
ExtensionRegistry registry) {
+    for (GenericDescriptor d : getRecursiveDescriptorsForClass(clazz, 
registry)) {
+      Syntax s = d.getFile().getSyntax();
+      checkArgument(
+          s == Syntax.PROTO2,
+          "Message %s or one of its dependencies does not use proto2 syntax: 
%s in file %s",
+          clazz.getName(),
+          d.getFullName(),
+          d.getFile().getName());
+    }
+  }
+
+  /**
+   * Recursively checks whether the specified class uses any Protocol Buffers 
fields that cannot
+   * be deterministically encoded.
+   *
+   * @throws NonDeterministicException if the object cannot be encoded 
deterministically.
+   */
+  static void verifyDeterministic(ProtoCoder<?> coder) throws 
NonDeterministicException {
+    Class<? extends Message> message = coder.getMessageType();
+    ExtensionRegistry registry = coder.getExtensionRegistry();
+    Set<Descriptor> descriptors = getRecursiveDescriptorsForClass(message, 
registry);
+    for (Descriptor d : descriptors) {
+      for (FieldDescriptor fd : d.getFields()) {
+        // If there is a transitively reachable Protocol Buffers map field, 
then this object cannot
+        // be encoded deterministically.
+        if (fd.isMapField()) {
+          String reason =
+              String.format(
+                  "Protocol Buffers message %s transitively includes Map field 
%s (from file %s)."
+                      + " Maps cannot be deterministically encoded.",
+                  message.getName(),
+                  fd.getFullName(),
+                  fd.getFile().getFullName());
+          throw new NonDeterministicException(coder, reason);
+        }
+      }
+    }
+  }
+
+  
////////////////////////////////////////////////////////////////////////////////////////////////
+  // Disable construction of utility class
+  private ProtobufUtil() {}
+
+  private static void recursivelyAddDescriptors(
+      Descriptor message, Set<Descriptor> descriptors, ExtensionRegistry 
registry) {
+    if (descriptors.contains(message)) {
+      return;
+    }
+    descriptors.add(message);
+
+    for (FieldDescriptor f : message.getFields()) {
+      recursivelyAddDescriptors(f, descriptors, registry);
+    }
+    for (FieldDescriptor f : message.getExtensions()) {
+      recursivelyAddDescriptors(f, descriptors, registry);
+    }
+    for (ExtensionInfo info :
+        
registry.getAllImmutableExtensionsByExtendedType(message.getFullName())) {
+      recursivelyAddDescriptors(info.descriptor, descriptors, registry);
+    }
+    for (ExtensionInfo info :
+        registry.getAllMutableExtensionsByExtendedType(message.getFullName())) 
{
+      recursivelyAddDescriptors(info.descriptor, descriptors, registry);
+    }
+  }
+
+  private static void recursivelyAddDescriptors(
+      FieldDescriptor field, Set<Descriptor> descriptors, ExtensionRegistry 
registry) {
+    switch (field.getType()) {
+      case BOOL:
+      case BYTES:
+      case DOUBLE:
+      case ENUM:
+      case FIXED32:
+      case FIXED64:
+      case FLOAT:
+      case INT32:
+      case INT64:
+      case SFIXED32:
+      case SFIXED64:
+      case SINT32:
+      case SINT64:
+      case STRING:
+      case UINT32:
+      case UINT64:
+        // Primitive types do not transitively access anything else.
+        break;
+
+      case GROUP:
+      case MESSAGE:
+        // Recursively adds all the fields from this nested Message.
+        recursivelyAddDescriptors(field.getMessageType(), descriptors, 
registry);
+        break;
+
+      default:
+        throw new UnsupportedOperationException(
+            "Unexpected Protocol Buffers field type: " + field.getType());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/ff1fe7fa/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/package-info.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/package-info.java
 
b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/package-info.java
new file mode 100644
index 0000000..b69bc8b
--- /dev/null
+++ 
b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/package-info.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
/**
 * Provides {@code ProtoCoder}, a {@link org.apache.beam.sdk.coders.Coder} for Google Protocol
 * Buffers messages.
 *
 * @see org.apache.beam.sdk.extensions.protobuf.ProtoCoder
 */
package org.apache.beam.sdk.extensions.protobuf;

Reply via email to