http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/ByteCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/ByteCoder.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/ByteCoder.java
deleted file mode 100644
index df9c0c8..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/ByteCoder.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.coders;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.UTFDataFormatException;
-
-/**
- * A {@link ByteCoder} encodes each {@link Byte} value as exactly one raw byte.
- *
- * <p>The coder has no state and is exposed as a singleton through {@link #of()}.
- * {@code null} values are rejected with a {@link CoderException} instead of being encoded.
- */
-public class ByteCoder extends AtomicCoder<Byte> {
-
-  /** Returns the shared {@link ByteCoder} instance. */
-  @JsonCreator
-  public static ByteCoder of() {
-    return INSTANCE;
-  }
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  private static final ByteCoder INSTANCE = new ByteCoder();
-
-  private ByteCoder() {}
-
-  /**
-   * Writes {@code value} to {@code outStream} as a single byte.
-   *
-   * @throws CoderException if {@code value} is {@code null}
-   * @throws IOException if writing to {@code outStream} fails
-   */
-  @Override
-  public void encode(Byte value, OutputStream outStream, Context context)
-      throws IOException, CoderException {
-    if (value == null) {
-      throw new CoderException("cannot encode a null Byte");
-    }
-    outStream.write(value.byteValue());
-  }
-
-  /**
-   * Reads one byte from {@code inStream} and returns it as a {@link Byte}.
-   *
-   * @throws CoderException if the stream is already at end-of-file, or if the read raises an
-   *     {@link EOFException} or {@link UTFDataFormatException}
-   * @throws IOException if reading from {@code inStream} fails for any other reason
-   */
-  @Override
-  public Byte decode(InputStream inStream, Context context)
-      throws IOException, CoderException {
-    try {
-      // value will be between 0-255, -1 for EOF
-      int value = inStream.read();
-      if (value == -1) {
-        throw new EOFException("EOF encountered decoding 1 byte from input stream");
-      }
-      return (byte) value;
-    } catch (EOFException | UTFDataFormatException exn) {
-      // These exceptions correspond to decoding problems, so change
-      // what kind of exception they're branded as.
-      throw new CoderException(exn);
-    }
-  }
-
-  /**
-   * {@inheritDoc}
-   *
-   * {@link ByteCoder} will never throw a {@link Coder.NonDeterministicException}; bytes can always
-   * be encoded deterministically.
-   */
-  @Override
-  public void verifyDeterministic() {}
-
-  /**
-   * {@inheritDoc}
-   *
-   * @return {@code true}. This coder is injective.
-   */
-  @Override
-  public boolean consistentWithEquals() {
-    return true;
-  }
-
-  /**
-   * {@inheritDoc}
-   *
-   * @return {@code true}. {@link ByteCoder#getEncodedElementByteSize} returns a constant.
-   */
-  @Override
-  public boolean isRegisterByteSizeObserverCheap(Byte value, Context context) {
-    return true;
-  }
-
-  /**
-   * {@inheritDoc}
-   *
-   * @return {@code 1}, the number of bytes {@link #encode} writes for a non-null {@link Byte}.
-   * @throws CoderException if {@code value} is {@code null}
-   */
-  @Override
-  protected long getEncodedElementByteSize(Byte value, Context context)
-      throws Exception {
-    if (value == null) {
-      throw new CoderException("cannot estimate size for unsupported null value");
-    }
-    return 1;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/ByteStringCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/ByteStringCoder.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/ByteStringCoder.java
deleted file mode 100644
index 557d2bd..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/ByteStringCoder.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.coders;
-
-import com.google.cloud.dataflow.sdk.util.VarInt;
-import com.google.common.io.ByteStreams;
-import com.google.protobuf.ByteString;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-/**
- * A {@link Coder} for {@link ByteString} objects based on their encoded Protocol Buffer form.
- *
- * <p>When this code is used in a nested {@link Coder.Context}, the serialized {@link ByteString}
- * objects are first delimited by their size.
- */
-public class ByteStringCoder extends AtomicCoder<ByteString> {
-
-  /** Returns the shared {@link ByteStringCoder} instance. */
-  @JsonCreator
-  public static ByteStringCoder of() {
-    return INSTANCE;
-  }
-
-  /***************************/
-
-  private static final ByteStringCoder INSTANCE = new ByteStringCoder();
-
-  private ByteStringCoder() {}
-
-  /**
-   * Writes {@code value} to {@code outStream}, preceded by its size as a {@link VarInt} unless
-   * the context is the whole stream.
-   *
-   * @throws CoderException if {@code value} is {@code null}
-   * @throws IOException if writing to {@code outStream} fails
-   */
-  @Override
-  public void encode(ByteString value, OutputStream outStream, Context context)
-      throws IOException, CoderException {
-    if (value == null) {
-      throw new CoderException("cannot encode a null ByteString");
-    }
-
-    if (!context.isWholeStream) {
-      // ByteString is not delimited, so write its size before its contents.
-      VarInt.encode(value.size(), outStream);
-    }
-    value.writeTo(outStream);
-  }
-
-  /**
-   * Reads a {@link ByteString} from {@code inStream}.
-   *
-   * <p>In the whole-stream context everything remaining in the stream is consumed. Otherwise a
-   * {@link VarInt} size is read first and the contents are read from a stream limited to that
-   * size.
-   *
-   * @throws IOException if reading from {@code inStream} fails
-   */
-  @Override
-  public ByteString decode(InputStream inStream, Context context) throws IOException {
-    if (context.isWholeStream) {
-      return ByteString.readFrom(inStream);
-    }
-
-    int size = VarInt.decodeInt(inStream);
-    // ByteString reads to the end of the input stream, so give it a limited stream of exactly
-    // the right length. Also set its chunk size so that the ByteString will contain exactly
-    // one chunk.
-    return ByteString.readFrom(ByteStreams.limit(inStream, size), size);
-  }
-
-  /**
-   * {@inheritDoc}
-   *
-   * @return the size of {@code value}, plus the length of its {@link VarInt} size prefix when the
-   *     context is not the whole stream.
-   * @throws CoderException if {@code value} is {@code null}
-   */
-  @Override
-  protected long getEncodedElementByteSize(ByteString value, Context context) throws Exception {
-    // Reject null the same way encode() does, instead of failing with a bare
-    // NullPointerException on value.size().
-    if (value == null) {
-      throw new CoderException("cannot estimate size for unsupported null value");
-    }
-    int size = value.size();
-
-    if (context.isWholeStream) {
-      return size;
-    }
-    return VarInt.getLength(size) + size;
-  }
-
-  /**
-   * {@inheritDoc}
-   *
-   * <p>Returns true; the encoded output of two invocations of {@link ByteStringCoder} in the same
-   * {@link Coder.Context} will be identical if and only if the original {@link ByteString} objects
-   * are equal according to {@link Object#equals}.
-   */
-  @Override
-  public boolean consistentWithEquals() {
-    return true;
-  }
-
-  /**
-   * {@inheritDoc}
-   *
-   * <p>Returns true. The encoded size is derived from {@link ByteString#size} and the length of
-   * a {@link VarInt}.
-   */
-  @Override
-  public boolean isRegisterByteSizeObserverCheap(ByteString value, Context context) {
-    return true;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/CannotProvideCoderException.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/CannotProvideCoderException.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/CannotProvideCoderException.java
deleted file mode 100644
index 66efefa..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/CannotProvideCoderException.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.coders;
-
-/**
- * The exception thrown when a {@link CoderProvider} cannot
- * provide a {@link Coder} that has been requested.
- *
- * <p>Every instance carries a {@link ReasonCode}; constructors that do not take one use
- * {@link ReasonCode#UNKNOWN}.
- */
-public class CannotProvideCoderException extends Exception {
-  private final ReasonCode reason;
-
-  /** Creates an exception with the given message and {@link ReasonCode#UNKNOWN}. */
-  public CannotProvideCoderException(String message) {
-    this(message, ReasonCode.UNKNOWN);
-  }
-
-  /** Creates an exception with the given message and reason. */
-  public CannotProvideCoderException(String message, ReasonCode reason) {
-    super(message);
-    this.reason = reason;
-  }
-
-  /** Creates an exception with the given message and cause, and {@link ReasonCode#UNKNOWN}. */
-  public CannotProvideCoderException(String message, Throwable cause) {
-    this(message, cause, ReasonCode.UNKNOWN);
-  }
-
-  /** Creates an exception with the given message, cause and reason. */
-  public CannotProvideCoderException(String message, Throwable cause, ReasonCode reason) {
-    super(message, cause);
-    this.reason = reason;
-  }
-
-  /** Creates an exception with the given cause and {@link ReasonCode#UNKNOWN}. */
-  public CannotProvideCoderException(Throwable cause) {
-    this(cause, ReasonCode.UNKNOWN);
-  }
-
-  /** Creates an exception with the given cause and reason. */
-  public CannotProvideCoderException(Throwable cause, ReasonCode reason) {
-    super(cause);
-    this.reason = reason;
-  }
-
-  /**
-   * @return the reason that Coder inference failed.
-   */
-  public ReasonCode getReason() {
-    return reason;
-  }
-
-  /**
-   * Returns the root cause of a chain of deeply nested {@link CannotProvideCoderException}s.
-   *
-   * <p>The cause chain is followed for as long as each cause is itself a
-   * {@link CannotProvideCoderException}. If the chain ends at an exception with no cause, that
-   * inner-most {@link CannotProvideCoderException} is returned; if it reaches a cause of any
-   * other type, that cause is returned.
-   *
-   * <p>For example, if a coder for {@code List<KV<Integer, Whatsit>>} cannot be provided because
-   * there is no known coder for {@code Whatsit}, the root cause of the exception should be a
-   * CannotProvideCoderException with details pertinent to {@code Whatsit}, suppressing the
-   * intermediate layers.
-   */
-  public Throwable getRootCause() {
-    Throwable cause = getCause();
-    if (cause == null) {
-      return this;
-    } else if (!(cause instanceof CannotProvideCoderException)) {
-      return cause;
-    } else {
-      return ((CannotProvideCoderException) cause).getRootCause();
-    }
-  }
-
-  /**
-   * Indicates the reason that {@link Coder} inference failed.
-   */
-  public enum ReasonCode {
-    /**
-     * The reason a coder could not be provided is unknown or does not have an established
-     * {@link ReasonCode}.
-     */
-    UNKNOWN,
-
-    /**
-     * The reason a coder could not be provided is type erasure, for example when requesting
-     * coder inference for a {@code List<T>} where {@code T} is unknown.
-     */
-    TYPE_ERASURE
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/Coder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/Coder.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/Coder.java
deleted file mode 100644
index ff2e10e..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/Coder.java
+++ /dev/null
@@ -1,299 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.coders;
-
-import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.annotations.Experimental.Kind;
-import com.google.cloud.dataflow.sdk.util.CloudObject;
-import com.google.cloud.dataflow.sdk.util.common.ElementByteSizeObserver;
-import com.google.common.base.Joiner;
-import com.google.common.base.MoreObjects;
-import com.google.common.base.Objects;
-import com.google.common.base.Preconditions;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.Serializable;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-
-import javax.annotation.Nullable;
-
-/**
- * A {@link Coder Coder&lt;T&gt;} defines how to encode and decode values of type {@code T} into
- * byte streams.
- *
- * <p>{@link Coder} instances are serialized during job creation and deserialized
- * before use, via JSON serialization. See {@link SerializableCoder} for an example of a
- * {@link Coder} that adds a custom field to
- * the {@link Coder} serialization. It provides a constructor annotated with
- * {@link com.fasterxml.jackson.annotation.JsonCreator}, which is a factory method used when
- * deserializing a {@link Coder} instance.
- *
- * <p>{@link Coder} classes for compound types are often composed from coder classes for types
- * contained therein. The composition of {@link Coder} instances into a coder for the compound
- * class is the subject of the {@link CoderFactory} type, which enables automatic generic
- * composition of {@link Coder} classes within the {@link CoderRegistry}. With particular
- * static methods on a compound {@link Coder} class, a {@link CoderFactory} can be automatically
- * inferred. See {@link KvCoder} for an example of a simple compound {@link Coder} that supports
- * automatic composition in the {@link CoderRegistry}.
- *
- * <p>The binary format of a {@link Coder} is identified by {@link #getEncodingId()}; be sure to
- * understand the requirements for evolving coder formats.
- *
- * <p>All methods of a {@link Coder} are required to be thread safe.
- *
- * @param <T> the type of the values being transcoded
- */
-public interface Coder<T> extends Serializable {
-  /** The context in which encoding or decoding is being done. */
-  public static class Context {
-    /**
-     * The outer context: the value being encoded or decoded takes
-     * up the remainder of the record/stream contents.
-     */
-    public static final Context OUTER = new Context(true);
-
-    /**
-     * The nested context: the value being encoded or decoded is
-     * (potentially) a part of a larger record/stream contents, and
-     * may have other parts encoded or decoded after it.
-     */
-    public static final Context NESTED = new Context(false);
-
-    /**
-     * Whether the encoded or decoded value fills the remainder of the
-     * output or input (resp.) record/stream contents.  If so, then
-     * the size of the decoded value can be determined from the
-     * remaining size of the record/stream contents, and so explicit
-     * lengths aren't required.
-     */
-    public final boolean isWholeStream;
-
-    /** Creates a context with the given {@link #isWholeStream} flag. */
-    public Context(boolean isWholeStream) {
-      this.isWholeStream = isWholeStream;
-    }
-
-    /** Returns {@link #NESTED}, whatever this context is. */
-    public Context nested() {
-      return NESTED;
-    }
-
-    /** Two contexts are equal when their {@link #isWholeStream} flags are equal. */
-    @Override
-    public boolean equals(Object obj) {
-      if (!(obj instanceof Context)) {
-        return false;
-      }
-      return Objects.equal(isWholeStream, ((Context) obj).isWholeStream);
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hashCode(isWholeStream);
-    }
-
-    @Override
-    public String toString() {
-      return MoreObjects.toStringHelper(Context.class)
-          .addValue(isWholeStream ? "OUTER" : "NESTED").toString();
-    }
-  }
-
-  /**
-   * Encodes the given value of type {@code T} onto the given output stream
-   * in the given context.
-   *
-   * @throws IOException if writing to the {@code OutputStream} fails
-   * for some reason
-   * @throws CoderException if the value could not be encoded for some reason
-   */
-  public void encode(T value, OutputStream outStream, Context context)
-      throws CoderException, IOException;
-
-  /**
-   * Decodes a value of type {@code T} from the given input stream in
-   * the given context.  Returns the decoded value.
-   *
-   * @throws IOException if reading from the {@code InputStream} fails
-   * for some reason
-   * @throws CoderException if the value could not be decoded for some reason
-   */
-  public T decode(InputStream inStream, Context context)
-      throws CoderException, IOException;
-
-  /**
-   * If this is a {@code Coder} for a parameterized type, returns the
-   * list of {@code Coder}s being used for each of the parameters, or
-   * returns {@code null} if this cannot be done or this is not a
-   * parameterized type.
-   */
-  public List<? extends Coder<?>> getCoderArguments();
-
-  /**
-   * Returns the {@link CloudObject} that represents this {@code Coder}.
-   */
-  public CloudObject asCloudObject();
-
-  /**
-   * Throw {@link NonDeterministicException} if the coding is not deterministic.
-   *
-   * <p>In order for a {@code Coder} to be considered deterministic,
-   * the following must be true:
-   * <ul>
-   *   <li>two values that compare as equal (via {@code Object.equals()}
-   *       or {@code Comparable.compareTo()}, if supported) have the same
-   *       encoding.
-   *   <li>the {@code Coder} always produces a canonical encoding, which is the
-   *       same for an instance of an object even if produced on different
-   *       computers at different times.
-   * </ul>
-   *
-   * @throws Coder.NonDeterministicException if this coder is not deterministic.
-   */
-  public void verifyDeterministic() throws Coder.NonDeterministicException;
-
-  /**
-   * Returns {@code true} if this {@link Coder} is injective with respect to
-   * {@link java.util.Objects#equals}.
-   *
-   * <p>Whenever the encoded bytes of two values are equal, then the original values are equal
-   * according to {@code Objects.equals()}. Note that this is well-defined for {@code null}.
-   *
-   * <p>This condition is most notably false for arrays. More generally, this condition is false
-   * whenever {@code equals()} compares object identity, rather than performing a
-   * semantic/structural comparison.
-   */
-  public boolean consistentWithEquals();
-
-  /**
-   * Returns an object with an {@code Object.equals()} method that represents structural equality
-   * on the argument.
-   *
-   * <p>For any two values {@code x} and {@code y} of type {@code T}, if their encoded bytes are the
-   * same, then it must be the case that {@code structuralValue(x).equals(structuralValue(y))}.
-   *
-   * <p>Most notably:
-   * <ul>
-   *   <li>The structural value for an array coder should perform a structural comparison of the
-   *   contents of the arrays, rather than the default behavior of comparing according to object
-   *   identity.
-   *   <li>The structural value for a coder accepting {@code null} should be a proper object with
-   *   an {@code equals()} method, even if the input value is {@code null}.
-   * </ul>
-   *
-   * <p>See also {@link #consistentWithEquals()}.
-   */
-  public Object structuralValue(T value) throws Exception;
-
-  /**
-   * Returns whether {@link #registerByteSizeObserver} is cheap enough to
-   * call for every element, that is, if this {@code Coder} can
-   * calculate the byte size of the element to be coded in roughly
-   * constant time (or lazily).
-   *
-   * <p>Not intended to be called by user code, but instead by
-   * {@link com.google.cloud.dataflow.sdk.runners.PipelineRunner}
-   * implementations.
-   */
-  public boolean isRegisterByteSizeObserverCheap(T value, Context context);
-
-  /**
-   * Notifies the {@code ElementByteSizeObserver} about the byte size
-   * of the encoded value using this {@code Coder}.
-   *
-   * <p>Not intended to be called by user code, but instead by
-   * {@link com.google.cloud.dataflow.sdk.runners.PipelineRunner}
-   * implementations.
-   */
-  public void registerByteSizeObserver(
-      T value, ElementByteSizeObserver observer, Context context)
-      throws Exception;
-
-  /**
-   * An identifier for the binary format written by {@link #encode}.
-   *
-   * <p>This value, along with the fully qualified class name, forms an identifier for the
-   * binary format of this coder. Whenever this value changes, the new encoding is considered
-   * incompatible with the prior format: It is presumed that the prior version of the coder will
-   * be unable to correctly read the new format and the new version of the coder will be unable to
-   * correctly read the old format.
-   *
-   * <p>If the format is changed in a backwards-compatible way (the Coder can still accept data from
-   * the prior format), such as by adding optional fields to a Protocol Buffer or Avro definition,
-   * and you want Dataflow to understand that the new coder is compatible with the prior coder,
-   * this value must remain unchanged. It is then the responsibility of {@link #decode} to correctly
-   * read data from the prior format.
-   */
-  @Experimental(Kind.CODER_ENCODING_ID)
-  public String getEncodingId();
-
-  /**
-   * A collection of encodings supported by {@link #decode} in addition to the encoding
-   * from {@link #getEncodingId()} (which is assumed supported).
-   *
-   * <p><i>This information is not currently used for any purpose</i>. It is descriptive only,
-   * and this method is subject to change.
-   *
-   * @see #getEncodingId()
-   */
-  @Experimental(Kind.CODER_ENCODING_ID)
-  public Collection<String> getAllowedEncodings();
-
-  /**
-   * Exception thrown by {@link Coder#verifyDeterministic()} if the encoding is
-   * not deterministic, including details of why the encoding is not deterministic.
-   */
-  public static class NonDeterministicException extends Throwable {
-    // Both fields are assigned exactly once, in the constructor every other constructor
-    // delegates to.
-    private final Coder<?> coder;
-    private final List<String> reasons;
-
-    /** Creates an exception for {@code coder} with a single reason and an optional cause. */
-    public NonDeterministicException(
-        Coder<?> coder, String reason, @Nullable NonDeterministicException e) {
-      this(coder, Arrays.asList(reason), e);
-    }
-
-    /** Creates an exception for {@code coder} with a single reason and no cause. */
-    public NonDeterministicException(Coder<?> coder, String reason) {
-      this(coder, Arrays.asList(reason), null);
-    }
-
-    /** Creates an exception for {@code coder} with the given reasons and no cause. */
-    public NonDeterministicException(Coder<?> coder, List<String> reasons) {
-      this(coder, reasons, null);
-    }
-
-    /**
-     * Creates an exception for {@code coder} with the given reasons and an optional cause.
-     *
-     * @throws IllegalArgumentException if {@code reasons} is empty
-     */
-    public NonDeterministicException(
-        Coder<?> coder,
-        List<String> reasons,
-        @Nullable NonDeterministicException cause) {
-      super(cause);
-      Preconditions.checkArgument(!reasons.isEmpty(),
-          "Reasons must not be empty.");
-      this.reasons = reasons;
-      this.coder = coder;
-    }
-
-    /** Returns the reasons supplied at construction. */
-    public Iterable<String> getReasons() {
-      return reasons;
-    }
-
-    /** Returns a message naming the coder, followed by one indented line per reason. */
-    @Override
-    public String getMessage() {
-      return String.format("%s is not deterministic because:\n  %s",
-          coder, Joiner.on("\n  ").join(reasons));
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/CoderException.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/CoderException.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/CoderException.java
deleted file mode 100644
index f7bbb69..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/CoderException.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.coders;
-
-import java.io.IOException;
-
-/**
- * An {@link Exception} thrown if there is a problem encoding or decoding a value.
- *
- * <p>This type extends {@link IOException}, so code that catches {@link IOException} also
- * catches it. Each constructor passes its arguments unchanged to the {@link IOException}
- * constructor with the same parameters.
- */
-public class CoderException extends IOException {
-  public CoderException(String message) {
-    super(message);
-  }
-
-  public CoderException(String message, Throwable cause) {
-    super(message, cause);
-  }
-
-  public CoderException(Throwable cause) {
-    super(cause);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/CoderFactories.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/CoderFactories.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/CoderFactories.java
deleted file mode 100644
index 7f788c7..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/CoderFactories.java
+++ /dev/null
@@ -1,275 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.coders;
-
-import com.google.cloud.dataflow.sdk.values.TypeDescriptor;
-
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.lang.reflect.Modifier;
-import java.lang.reflect.ParameterizedType;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-
-/**
- * Static utility methods for creating and working with {@link Coder}s.
- */
-public final class CoderFactories {
-  private CoderFactories() { } // Static utility class
-
-  /**
-   * Creates a {@link CoderFactory} built from particular static methods of a 
class that
-   * implements {@link Coder}.
-   *
-   * <p>The class must have the following static methods:
-   *
-   * <ul>
-   * <li> {@code
-   * public static Coder<T> of(Coder<X> argCoder1, Coder<Y> argCoder2, ...)
-   * }
-   * <li> {@code
-   * public static List<Object> getInstanceComponents(T exampleValue);
-   * }
-   * </ul>
-   *
-   * <p>The {@code of(...)} method will be used to construct a
-   * {@code Coder<T>} from component {@link Coder}s.
-   * It must accept one {@link Coder} argument for each
-   * generic type parameter of {@code T}. If {@code T} takes no generic
-   * type parameters, then the {@code of()} factory method should take
-   * no arguments.
-   *
-   * <p>The {@code getInstanceComponents} method will be used to
-   * decompose a value during the {@link Coder} inference process,
-   * to automatically choose coders for the components.
-   *
-   * <p>Note that the class {@code T} to be coded may be a
-   * not-yet-specialized generic class.
-   * For a generic class {@code MyClass<X>} and an actual type parameter
-   * {@code Foo}, the {@link CoderFactoryFromStaticMethods} will
-   * accept any {@code Coder<Foo>} and produce a {@code Coder<MyClass<Foo>>}.
-   *
-   * <p>For example, the {@link CoderFactory} returned by
-   * {@code fromStaticMethods(ListCoder.class)}
-   * will produce a {@code Coder<List<X>>} for any {@code Coder<X>}.
-   */
-  public static <T> CoderFactory fromStaticMethods(Class<T> clazz) {
-    return new CoderFactoryFromStaticMethods(clazz);
-  }
-
-  /**
-   * Creates a {@link CoderFactory} that always returns the
-   * given coder.
-   *
-   * <p>The {@code getInstanceComponents} method of this
-   * {@link CoderFactory} always returns an empty list.
-   */
-  public static <T> CoderFactory forCoder(Coder<T> coder) {
-    return new CoderFactoryForCoder<>(coder);
-  }
-
-  /**
-   * See {@link #fromStaticMethods} for a detailed description
-   * of the characteristics of this {@link CoderFactory}.
-   */
-  private static class CoderFactoryFromStaticMethods implements CoderFactory {
-
-    @Override
-    @SuppressWarnings("rawtypes")
-    public Coder<?> create(List<? extends Coder<?>> componentCoders) {
-      try {
-        return (Coder) factoryMethod.invoke(
-            null /* static */, componentCoders.toArray());
-      } catch (IllegalAccessException |
-               IllegalArgumentException |
-               InvocationTargetException |
-               NullPointerException |
-               ExceptionInInitializerError exn) {
-        throw new IllegalStateException(
-            "error when invoking Coder factory method " + factoryMethod,
-            exn);
-      }
-    }
-
-    @Override
-    public List<Object> getInstanceComponents(Object value) {
-      try {
-        @SuppressWarnings("unchecked")
-        List<Object> components =  (List<Object>) getComponentsMethod.invoke(
-            null /* static */, value);
-        return components;
-      } catch (IllegalAccessException
-          | IllegalArgumentException
-          | InvocationTargetException
-          | NullPointerException
-          | ExceptionInInitializerError exn) {
-        throw new IllegalStateException(
-            "error when invoking Coder getComponents method " + 
getComponentsMethod,
-            exn);
-      }
-    }
-
-    
////////////////////////////////////////////////////////////////////////////////
-
-    // Method to create a coder given component coders
-    // For a Coder class of kind * -> * -> ... n times ... -> *
-    // this has type Coder<?> -> Coder<?> -> ... n times ... -> Coder<T>
-    private Method factoryMethod;
-
-    // Method to decompose a value of type T into its parts.
-    // For a Coder class of kind * -> * -> ... n times ... -> *
-    // this has type T -> List<Object>
-    // where the list has n elements.
-    private Method getComponentsMethod;
-
-    /**
-     * Returns a CoderFactory that invokes the given static factory method
-     * to create the Coder.
-     */
-    private CoderFactoryFromStaticMethods(Class<?> coderClazz) {
-      this.factoryMethod = getFactoryMethod(coderClazz);
-      this.getComponentsMethod = getInstanceComponentsMethod(coderClazz);
-    }
-
-    /**
-     * Returns the static {@code of} constructor method on {@code coderClazz}
-     * if it exists. It is assumed to have one {@link Coder} parameter for
-     * each type parameter of {@code coderClazz}.
-     */
-    private Method getFactoryMethod(Class<?> coderClazz) {
-      Method factoryMethodCandidate;
-
-      // Find the static factory method of coderClazz named 'of' that takes
-      // one Coder argument for each type parameter of coderClazz.
-      int numTypeParameters = coderClazz.getTypeParameters().length;
-      Class<?>[] factoryMethodArgTypes = new Class<?>[numTypeParameters];
-      Arrays.fill(factoryMethodArgTypes, Coder.class);
-      try {
-        factoryMethodCandidate =
-            coderClazz.getDeclaredMethod("of", factoryMethodArgTypes);
-      } catch (NoSuchMethodException | SecurityException exn) {
-        throw new IllegalArgumentException(
-            "cannot register Coder " + coderClazz + ": "
-            + "does not have an accessible method named 'of' with "
-            + numTypeParameters + " arguments of Coder type",
-            exn);
-      }
-      if (!Modifier.isStatic(factoryMethodCandidate.getModifiers())) {
-        throw new IllegalArgumentException(
-            "cannot register Coder " + coderClazz + ": "
-            + "method named 'of' with " + numTypeParameters
-            + " arguments of Coder type is not static");
-      }
-      if 
(!coderClazz.isAssignableFrom(factoryMethodCandidate.getReturnType())) {
-        throw new IllegalArgumentException(
-            "cannot register Coder " + coderClazz + ": "
-            + "method named 'of' with " + numTypeParameters
-            + " arguments of Coder type does not return a " + coderClazz);
-      }
-      try {
-        if (!factoryMethodCandidate.isAccessible()) {
-          factoryMethodCandidate.setAccessible(true);
-        }
-      } catch (SecurityException exn) {
-        throw new IllegalArgumentException(
-            "cannot register Coder " + coderClazz + ": "
-            + "method named 'of' with " + numTypeParameters
-            + " arguments of Coder type is not accessible",
-            exn);
-      }
-
-      return factoryMethodCandidate;
-    }
-
-    /**
-     * Finds the static {@code getInstanceComponents} method on
-     * {@code coderClazz} (or one of its superclasses) to use to decompose
-     * a value of type {@code T} into components, each corresponding to an
-     * argument of the {@code of} method.
-     */
-    private <T> Method getInstanceComponentsMethod(Class<?> coderClazz) {
-      TypeDescriptor<?> coderType = TypeDescriptor.of(coderClazz);
-      TypeDescriptor<T> argumentType = getCodedType(coderType);
-
-      // getInstanceComponents may be implemented in a superclass,
-      // so we search them all for an applicable method. We do not
-      // try to be clever about finding the best overload. It may
-      // be in a generic superclass, erased to accept an Object.
-      // However, subtypes are listed before supertypes (it is a
-      // topological ordering) so probably the best one will be chosen
-      // if there is more than one (which should be rare).
-      for (TypeDescriptor<?> supertype : coderType.getClasses()) {
-        for (Method method : supertype.getRawType().getDeclaredMethods()) {
-          if (method.getName().equals("getInstanceComponents")) {
-            TypeDescriptor<?> formalArgumentType = 
supertype.getArgumentTypes(method).get(0);
-            if 
(formalArgumentType.getRawType().isAssignableFrom(argumentType.getRawType())) {
-              return method;
-            }
-          }
-        }
-      }
-
-      throw new IllegalArgumentException(
-          "cannot create a CoderFactory from " + coderType + ": "
-          + "does not have an accessible method "
-          + "'getInstanceComponents'");
-    }
-
-    /**
-     * If {@code coderType} implements {@link Coder} for a specific
-     * type {@code T}, returns a {@link TypeDescriptor} for {@code T}.
-     * Otherwise, throws {@link IllegalArgumentException}.
-     */
-    private <T> TypeDescriptor<T> getCodedType(TypeDescriptor<?> coderType) {
-      for (TypeDescriptor<?> ifaceType : coderType.getInterfaces()) {
-        if (ifaceType.getRawType().equals(Coder.class)) {
-          ParameterizedType coderIface = (ParameterizedType) 
ifaceType.getType();
-          @SuppressWarnings("unchecked")
-          TypeDescriptor<T> token =
-              (TypeDescriptor<T>) 
TypeDescriptor.of(coderIface.getActualTypeArguments()[0]);
-          return token;
-        }
-      }
-      throw new IllegalArgumentException(
-          "cannot build CoderFactory from class " + coderType
-          + ": does not implement Coder<T> for any T.");
-    }
-  }
-
-  /**
-   * See {@link #forCoder} for a detailed description of this
-   * {@link CoderFactory}.
-   */
-  private static class CoderFactoryForCoder<T> implements CoderFactory {
-    private Coder<T> coder;
-
-    public CoderFactoryForCoder(Coder<T> coder) {
-      this.coder = coder;
-    }
-
-    @Override
-    public Coder<?> create(List<? extends Coder<?>> componentCoders) {
-      return this.coder;
-    }
-
-    @Override
-    public List<Object> getInstanceComponents(Object value) {
-      return Collections.emptyList();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/CoderFactory.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/CoderFactory.java
 
b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/CoderFactory.java
deleted file mode 100644
index 775bfc6..0000000
--- 
a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/CoderFactory.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.coders;
-
-import java.util.List;
-
-/**
- * A {@link CoderFactory} creates coders and decomposes values.
- * It may operate on a parameterized type, such as {@link List},
- * in which case the {@link #create} method accepts a list of
- * coders to use for the type parameters.
- */
-public interface CoderFactory {
-
-  /**
-   * Returns a {@code Coder<?>} to use for values of a particular
-   * type, given the Coders for each of the type's generic
-   * parameter types.
-   */
-  public Coder<?> create(List<? extends Coder<?>> componentCoders);
-
-  /**
-   * Returns a list of objects contained in {@code value}, one per
-   * type argument, or {@code null} if none can be determined.
-   * The list of returned objects should be the same size as the
-   * list of coders required by {@link #create}.
-   */
-  public List<Object> getInstanceComponents(Object value);
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/CoderProvider.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/CoderProvider.java
 
b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/CoderProvider.java
deleted file mode 100644
index f8e29e6..0000000
--- 
a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/CoderProvider.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.coders;
-
-import com.google.cloud.dataflow.sdk.values.TypeDescriptor;
-
-/**
- * A {@link CoderProvider} may create a {@link Coder} for
- * any concrete class.
- */
-public interface CoderProvider {
-
-  /**
-   * Provides a coder for a given class, if possible.
-   *
-   * @throws CannotProvideCoderException if no coder can be provided
-   */
-  public <T> Coder<T> getCoder(TypeDescriptor<T> type) throws 
CannotProvideCoderException;
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/CoderProviders.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/CoderProviders.java
 
b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/CoderProviders.java
deleted file mode 100644
index 64d258d..0000000
--- 
a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/CoderProviders.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.coders;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.google.cloud.dataflow.sdk.util.InstanceBuilder;
-import com.google.cloud.dataflow.sdk.values.TypeDescriptor;
-import com.google.common.base.Joiner;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-
-import java.lang.reflect.InvocationTargetException;
-import java.util.List;
-
-/**
- * Static utility methods for working with {@link CoderProvider 
CoderProviders}.
- */
-public final class CoderProviders {
-
-  // Static utility class
-  private CoderProviders() { }
-
-  /**
-   * Creates a {@link CoderProvider} built from particular static methods of a 
class that
-   * implements {@link Coder}. The requirements for this method are precisely 
the requirements
-   * for a {@link Coder} class to be usable with {@link DefaultCoder} 
annotations.
-   *
-   * <p>The class must have the following static method:
-   *
-   * <pre>{@code
-   * public static Coder<T> of(TypeDescriptor<T> type)
-   * }
-   * </pre>
-   */
-  public static <T> CoderProvider fromStaticMethods(Class<T> clazz) {
-    return new CoderProviderFromStaticMethods(clazz);
-  }
-
-
-  /**
-   * Returns a {@link CoderProvider} that consults each of the given
-   * {@code coderProviders} in order and returns the first {@link Coder} provided.
-   *
-   * <p>Note that the order in which the providers are listed matters: While 
the set of types
-   * handled will be the union of those handled by all of the providers in the 
list, the actual
-   * {@link Coder} provided by the first successful provider may differ, and 
may have inferior
-   * properties. For example, not all {@link Coder Coders} are deterministic, 
handle {@code null}
-   * values, or have comparable performance.
-   */
-  public static CoderProvider firstOf(CoderProvider... coderProviders) {
-    return new FirstOf(ImmutableList.copyOf(coderProviders));
-  }
-
-  
///////////////////////////////////////////////////////////////////////////////////////////////
-
-  /**
-   * @see #firstOf
-   */
-  private static class FirstOf implements CoderProvider {
-
-    private Iterable<CoderProvider> providers;
-
-    public FirstOf(Iterable<CoderProvider> providers) {
-      this.providers = providers;
-    }
-
-    @Override
-    public <T> Coder<T> getCoder(TypeDescriptor<T> type) throws 
CannotProvideCoderException {
-      List<String> messages = Lists.newArrayList();
-      for (CoderProvider provider : providers) {
-        try {
-          return provider.getCoder(type);
-        } catch (CannotProvideCoderException exc) {
-          messages.add(String.format("%s could not provide a Coder for type 
%s: %s",
-              provider, type, exc.getMessage()));
-        }
-      }
-      throw new CannotProvideCoderException(
-          String.format("Cannot provide coder for type %s: %s.",
-              type, Joiner.on("; ").join(messages)));
-    }
-  }
-
-  private static class CoderProviderFromStaticMethods implements CoderProvider 
{
-
-    /** If true, then clazz has {@code of(TypeDescriptor)}. If false, {@code 
of(Class)}. */
-    private final boolean takesTypeDescriptor;
-    private final Class<?> clazz;
-
-    public CoderProviderFromStaticMethods(Class<?> clazz) {
-      // Note that the second condition supports older classes, which only 
needed to provide
-      // of(Class), not of(TypeDescriptor). Our own classes have updated to 
accept a
-      // TypeDescriptor. Hence the error message points only to the current 
specification,
-      // not both acceptable conditions.
-      checkArgument(classTakesTypeDescriptor(clazz) || classTakesClass(clazz),
-          "Class " + clazz.getCanonicalName()
-          + " is missing required static method of(TypeDescriptor).");
-
-      this.takesTypeDescriptor = classTakesTypeDescriptor(clazz);
-      this.clazz = clazz;
-    }
-
-    @Override
-    public <T> Coder<T> getCoder(TypeDescriptor<T> type) throws 
CannotProvideCoderException {
-      try {
-        if (takesTypeDescriptor) {
-          @SuppressWarnings("unchecked")
-          Coder<T> result = InstanceBuilder.ofType(Coder.class)
-              .fromClass(clazz)
-              .fromFactoryMethod("of")
-              .withArg(TypeDescriptor.class, type)
-              .build();
-          return result;
-        } else {
-          @SuppressWarnings("unchecked")
-          Coder<T> result = InstanceBuilder.ofType(Coder.class)
-              .fromClass(clazz)
-              .fromFactoryMethod("of")
-              .withArg(Class.class, type.getRawType())
-              .build();
-          return result;
-        }
-      } catch (RuntimeException exc) {
-        if (exc.getCause() instanceof InvocationTargetException) {
-          throw new CannotProvideCoderException(exc.getCause().getCause());
-        }
-        throw exc;
-      }
-    }
-
-    private boolean classTakesTypeDescriptor(Class<?> clazz) {
-      try {
-        clazz.getDeclaredMethod("of", TypeDescriptor.class);
-        return true;
-      } catch (NoSuchMethodException | SecurityException exc) {
-        return false;
-      }
-    }
-
-    private boolean classTakesClass(Class<?> clazz) {
-      try {
-        clazz.getDeclaredMethod("of", Class.class);
-        return true;
-      } catch (NoSuchMethodException | SecurityException exc) {
-        return false;
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/CoderRegistry.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/CoderRegistry.java
 
b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/CoderRegistry.java
deleted file mode 100644
index da9e0a0..0000000
--- 
a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/CoderRegistry.java
+++ /dev/null
@@ -1,844 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.coders;
-
-import com.google.api.services.bigquery.model.TableRow;
-import 
com.google.cloud.dataflow.sdk.coders.CannotProvideCoderException.ReasonCode;
-import com.google.cloud.dataflow.sdk.coders.protobuf.ProtoCoder;
-import com.google.cloud.dataflow.sdk.transforms.SerializableFunction;
-import com.google.cloud.dataflow.sdk.util.CoderUtils;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.TimestampedValue;
-import com.google.cloud.dataflow.sdk.values.TypeDescriptor;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Maps;
-import com.google.protobuf.ByteString;
-
-import org.joda.time.Instant;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.lang.reflect.ParameterizedType;
-import java.lang.reflect.Type;
-import java.lang.reflect.TypeVariable;
-import java.lang.reflect.WildcardType;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import javax.annotation.Nullable;
-
-/**
- * A {@link CoderRegistry} allows registering the default {@link Coder} to use 
for a Java class,
- * and looking up and instantiating the default {@link Coder} for a Java type.
- *
- * <p>{@link CoderRegistry} uses the following mechanisms to determine a 
default {@link Coder} for a
- * Java class, in order of precedence:
- * <ol>
- *   <li>Registration:
- *     <ul>
- *       <li>A {@link CoderFactory} can be registered to handle a particular 
class via
- *           {@link #registerCoder(Class, CoderFactory)}.</li>
- *       <li>A {@link Coder} class with the static methods to satisfy
- *           {@link CoderFactories#fromStaticMethods} can be registered via
- *           {@link #registerCoder(Class, Class)}.</li>
- *       <li>Built-in types are registered via
- *           {@link #registerStandardCoders()}.</li>
- *     </ul>
- *   <li>Annotations: {@link DefaultCoder} can be used to annotate a type with
- *       the default {@code Coder} type. The {@link Coder} class must satisfy 
the requirements
- *       of {@link CoderProviders#fromStaticMethods}.
- *   <li>Fallback: A fallback {@link CoderProvider} is used to attempt to 
provide a {@link Coder}
- *       for any type. By default, this is {@link SerializableCoder#PROVIDER}, 
which can provide
- *       a {@link Coder} for any type that is serializable via Java 
serialization. The fallback
- *       {@link CoderProvider} can be retrieved and set via {@link 
#getFallbackCoderProvider()}
- *       and {@link #setFallbackCoderProvider}. Multiple fallbacks can be 
chained together using
- *       {@link CoderProviders#firstOf}.
- * </ol>
- */
-public class CoderRegistry implements CoderProvider {
-
-  private static final Logger LOG = 
LoggerFactory.getLogger(CoderRegistry.class);
-
-  public CoderRegistry() {
-    setFallbackCoderProvider(
-        CoderProviders.firstOf(ProtoCoder.coderProvider(), 
SerializableCoder.PROVIDER));
-  }
-
-  /**
-   * Registers standard Coders with this CoderRegistry.
-   */
-  public void registerStandardCoders() {
-    registerCoder(Byte.class, ByteCoder.class);
-    registerCoder(ByteString.class, ByteStringCoder.class);
-    registerCoder(Double.class, DoubleCoder.class);
-    registerCoder(Instant.class, InstantCoder.class);
-    registerCoder(Integer.class, VarIntCoder.class);
-    registerCoder(Iterable.class, IterableCoder.class);
-    registerCoder(KV.class, KvCoder.class);
-    registerCoder(List.class, ListCoder.class);
-    registerCoder(Long.class, VarLongCoder.class);
-    registerCoder(Map.class, MapCoder.class);
-    registerCoder(Set.class, SetCoder.class);
-    registerCoder(String.class, StringUtf8Coder.class);
-    registerCoder(TableRow.class, TableRowJsonCoder.class);
-    registerCoder(TimestampedValue.class, 
TimestampedValue.TimestampedValueCoder.class);
-    registerCoder(Void.class, VoidCoder.class);
-    registerCoder(byte[].class, ByteArrayCoder.class);
-  }
-
-  /**
-   * Registers {@code coderClazz} as the default {@link Coder} class to handle 
encoding and
-   * decoding instances of {@code clazz}, overriding prior registrations if 
any exist.
-   *
-   * <p>Supposing {@code T} is the static type corresponding to the {@code 
clazz}, then
-   * {@code coderClazz} should have a static factory method with the following 
signature:
-   *
-   * <pre> {@code
-   * public static Coder<T> of(Coder<X> argCoder1, Coder<Y> argCoder2, ...)
-   * } </pre>
-   *
-   * <p>This method will be called to create instances of {@code Coder<T>} for 
values of type
-   * {@code T}, passing Coders for each of the generic type parameters of 
{@code T}.  If {@code T}
-   * takes no generic type parameters, then the {@code of()} factory method 
should have no
-   * arguments.
-   *
-   * <p>If {@code T} is a parameterized type, then it should additionally have 
a method with the
-   * following signature:
-   *
-   * <pre> {@code
-   * public static List<Object> getInstanceComponents(T exampleValue);
-   * } </pre>
-   *
-   * <p>This method will be called to decompose a value during the {@link 
Coder} inference process,
-   * to automatically choose {@link Coder Coders} for the components.
-   *
-   * @param clazz the class of objects to be encoded
-   * @param coderClazz a class with static factory methods to provide {@link 
Coder Coders}
-   */
-  public void registerCoder(Class<?> clazz, Class<?> coderClazz) {
-    registerCoder(clazz, CoderFactories.fromStaticMethods(coderClazz));
-  }
-
-  /**
-   * Registers {@code coderFactory} as the default {@link CoderFactory} to 
produce {@code Coder}
-   * instances to decode and encode instances of {@code clazz}. This will 
override prior
-   * registrations if any exist.
-   */
-  public void registerCoder(Class<?> clazz, CoderFactory coderFactory) {
-    coderFactoryMap.put(clazz, coderFactory);
-  }
-
-  /**
-   * Register the provided {@link Coder} for encoding all values of the 
specified {@code Class}.
-   * This will override prior registrations if any exist.
-   *
-   * <p>Not for use with generic rawtypes. Instead, register a {@link 
CoderFactory} via
-   * {@link #registerCoder(Class, CoderFactory)} or ensure your {@code Coder} 
class has the
-   * appropriate static methods and register it directly via {@link 
#registerCoder(Class, Class)}.
-   */
-  public <T> void registerCoder(Class<T> rawClazz, Coder<T> coder) {
-    Preconditions.checkArgument(
-      rawClazz.getTypeParameters().length == 0,
-      "CoderRegistry.registerCoder(Class<T>, Coder<T>) may not be used "
-      + "with unspecialized generic classes");
-
-    CoderFactory factory = CoderFactories.forCoder(coder);
-    registerCoder(rawClazz, factory);
-  }
-
-  /**
-   * Returns the {@link Coder} to use by default for values of the given type.
-   *
-   * @throws CannotProvideCoderException if there is no default Coder.
-   */
-  public <T> Coder<T> getDefaultCoder(TypeDescriptor<T> typeDescriptor)
-      throws CannotProvideCoderException {
-    return getDefaultCoder(typeDescriptor, Collections.<Type, 
Coder<?>>emptyMap());
-  }
-
-  /**
-   * See {@link #getDefaultCoder(TypeDescriptor)}.
-   */
-  @Override
-  public <T> Coder<T> getCoder(TypeDescriptor<T> typeDescriptor)
-      throws CannotProvideCoderException {
-    return getDefaultCoder(typeDescriptor);
-  }
-
-  /**
-   * Returns the {@link Coder} to use by default for values of the given type, 
where the given input
-   * type uses the given {@link Coder}.
-   *
-   * @throws CannotProvideCoderException if there is no default Coder.
-   */
-  public <InputT, OutputT> Coder<OutputT> getDefaultCoder(
-      TypeDescriptor<OutputT> typeDescriptor,
-      TypeDescriptor<InputT> inputTypeDescriptor,
-      Coder<InputT> inputCoder)
-      throws CannotProvideCoderException {
-    return getDefaultCoder(
-        typeDescriptor, getTypeToCoderBindings(inputTypeDescriptor.getType(), 
inputCoder));
-  }
-
-  /**
-   * Returns the {@link Coder} to use on elements produced by this function, 
given the {@link Coder}
-   * used for its input elements.
-   */
-  public <InputT, OutputT> Coder<OutputT> getDefaultOutputCoder(
-      SerializableFunction<InputT, OutputT> fn, Coder<InputT> inputCoder)
-      throws CannotProvideCoderException {
-
-    ParameterizedType fnType = (ParameterizedType)
-        
TypeDescriptor.of(fn.getClass()).getSupertype(SerializableFunction.class).getType();
-
-    return getDefaultCoder(
-        fn.getClass(),
-        SerializableFunction.class,
-        ImmutableMap.of(fnType.getActualTypeArguments()[0], inputCoder),
-        SerializableFunction.class.getTypeParameters()[1]);
-  }
-
-  /**
-   * Returns the {@link Coder} to use for the specified type parameter 
specialization of the
-   * subclass, given {@link Coder Coders} to use for all other type parameters 
(if any).
-   *
-   * @throws CannotProvideCoderException if there is no default Coder.
-   */
-  public <T, OutputT> Coder<OutputT> getDefaultCoder(
-      Class<? extends T> subClass,
-      Class<T> baseClass,
-      Map<Type, ? extends Coder<?>> knownCoders,
-      TypeVariable<?> param)
-      throws CannotProvideCoderException {
-
-    Map<Type, Coder<?>> inferredCoders = getDefaultCoders(subClass, baseClass, 
knownCoders);
-
-    @SuppressWarnings("unchecked")
-    Coder<OutputT> paramCoderOrNull = (Coder<OutputT>) 
inferredCoders.get(param);
-    if (paramCoderOrNull != null) {
-      return paramCoderOrNull;
-    } else {
-      throw new CannotProvideCoderException(
-          "Cannot infer coder for type parameter " + param.getName());
-    }
-  }
-
-  /**
-   * Returns the {@link Coder} to use for the provided example value, if it 
can be determined.
-   *
-   * @throws CannotProvideCoderException if there is no default {@link Coder} 
or
-   * more than one {@link Coder} matches
-   */
-  public <T> Coder<T> getDefaultCoder(T exampleValue) throws 
CannotProvideCoderException {
-    Class<?> clazz = exampleValue == null ? Void.class : 
exampleValue.getClass();
-
-    if (clazz.getTypeParameters().length == 0) {
-      // Trust that getDefaultCoder returns a valid
-      // Coder<T> for non-generic clazz.
-      @SuppressWarnings("unchecked")
-      Coder<T> coder = (Coder<T>) getDefaultCoder(clazz);
-      return coder;
-    } else {
-      CoderFactory factory = getDefaultCoderFactory(clazz);
-
-      List<Object> components = factory.getInstanceComponents(exampleValue);
-      if (components == null) {
-        throw new CannotProvideCoderException(String.format(
-            "Cannot provide coder based on value with class %s: The registered 
CoderFactory with "
-            + "class %s failed to decompose the value, which is required in 
order to provide "
-            + "Coders for the components.",
-            clazz.getCanonicalName(), factory.getClass().getCanonicalName()));
-      }
-
-      // componentcoders = components.map(this.getDefaultCoder)
-      List<Coder<?>> componentCoders = new ArrayList<>();
-      for (Object component : components) {
-        try {
-          Coder<?> componentCoder = getDefaultCoder(component);
-          componentCoders.add(componentCoder);
-        } catch (CannotProvideCoderException exc) {
-          throw new CannotProvideCoderException(
-              String.format("Cannot provide coder based on value with class 
%s",
-                  clazz.getCanonicalName()),
-              exc);
-        }
-      }
-
-      // Trust that factory.create maps from valid component Coders
-      // to a valid Coder<T>.
-      @SuppressWarnings("unchecked")
-      Coder<T> coder = (Coder<T>) factory.create(componentCoders);
-      return coder;
-    }
-  }
-
-  /**
-   * Returns the {@link Coder} to use by default for values of the given 
class. The following three
-   * sources for a {@link Coder} will be attempted, in order:
-   *
-   * <ol>
-   *   <li>A {@link Coder} class registered explicitly via a call to {@link 
#registerCoder},
-   *   <li>A {@link DefaultCoder} annotation on the class,
-   *   <li>This registry's fallback {@link CoderProvider}, which may be able 
to generate a
-   *   {@link Coder} for an arbitrary class.
-   * </ol>
-   *
-   * @throws CannotProvideCoderException if a {@link Coder} cannot be provided
-   */
-  public <T> Coder<T> getDefaultCoder(Class<T> clazz) throws 
CannotProvideCoderException {
-
-    CannotProvideCoderException factoryException;
-    try {
-      CoderFactory coderFactory = getDefaultCoderFactory(clazz);
-      LOG.debug("Default coder for {} found by factory", clazz);
-      @SuppressWarnings("unchecked")
-      Coder<T> coder = (Coder<T>) 
coderFactory.create(Collections.<Coder<?>>emptyList());
-      return coder;
-    } catch (CannotProvideCoderException exc) {
-      factoryException = exc;
-    }
-
-    CannotProvideCoderException annotationException;
-    try {
-      return getDefaultCoderFromAnnotation(clazz);
-    } catch (CannotProvideCoderException exc) {
-      annotationException = exc;
-    }
-
-    CannotProvideCoderException fallbackException;
-    if (getFallbackCoderProvider() != null) {
-      try {
-        return 
getFallbackCoderProvider().getCoder(TypeDescriptor.<T>of(clazz));
-      } catch (CannotProvideCoderException exc) {
-        fallbackException = exc;
-      }
-    } else {
-      fallbackException = new CannotProvideCoderException("no fallback 
CoderProvider configured");
-    }
-
-    // Build up the error message and list of causes.
-    StringBuilder messageBuilder = new StringBuilder()
-        .append("Unable to provide a default Coder for 
").append(clazz.getCanonicalName())
-        .append(". Correct one of the following root causes:");
-
-    messageBuilder
-        .append("\n  Building a Coder using a registered CoderFactory failed: 
")
-        .append(factoryException.getMessage());
-
-    messageBuilder
-        .append("\n  Building a Coder from the @DefaultCoder annotation 
failed: ")
-        .append(annotationException.getMessage());
-
-    messageBuilder
-        .append("\n  Building a Coder from the fallback CoderProvider failed: 
")
-        .append(fallbackException.getMessage());
-
-    throw new CannotProvideCoderException(messageBuilder.toString());
-  }
-
-  /**
-   * Sets the fallback {@link CoderProvider} for this registry. If no other 
method succeeds in
-   * providing a {@code Coder<T>} for a type {@code T}, then the registry will 
attempt to create
-   * a {@link Coder} using this {@link CoderProvider}.
-   *
-   * <p>By default, this is set to {@link SerializableCoder#PROVIDER}.
-   *
-   * <p>See {@link #getFallbackCoderProvider}.
-   */
-  public void setFallbackCoderProvider(CoderProvider coderProvider) {
-    fallbackCoderProvider = coderProvider;
-  }
-
-  /**
-   * Returns the fallback {@link CoderProvider} for this registry.
-   *
-   * <p>See {@link #setFallbackCoderProvider}.
-   */
-  public CoderProvider getFallbackCoderProvider() {
-    return fallbackCoderProvider;
-  }
-
-  /**
-   * Returns a {@code Map} from each of {@code baseClass}'s type parameters to 
the {@link Coder} to
-   * use by default for it, in the context of {@code subClass}'s 
specialization of
-   * {@code baseClass}.
-   *
-   * <p>If no {@link Coder} can be inferred for a particular type parameter, 
then that type variable
-   * will be absent from the returned {@code Map}.
-   *
-   * <p>For example, if {@code baseClass} is {@code Map.class}, where {@code 
Map<K, V>} has type
-   * parameters {@code K} and {@code V}, and {@code subClass} extends {@code 
Map<String, Integer>}
-   * then the result will map the type variable {@code K} to a {@code 
Coder<String>} and the
-   * type variable {@code V} to a {@code Coder<Integer>}.
-   *
-   * <p>The {@code knownCoders} parameter can be used to provide known {@link 
Coder Coders} for any
-   * of the parameters; these will be used to infer the others.
-   *
-   * <p>Note that inference is attempted for every type variable. For a type
-   * {@code MyType<One, Two, Three>} inference will be attempted for all of 
{@code One},
-   * {@code Two}, {@code Three}, even if the requester only wants a {@link 
Coder} for {@code Two}.
-   *
-   * <p>For this reason {@code getDefaultCoders} (plural) does not throw an 
exception if a
-   * {@link Coder} for a particular type variable cannot be inferred, but 
merely omits the entry
-   * from the returned {@code Map}. It is the responsibility of the caller 
(usually
-   * {@link #getDefaultCoder}) to extract the desired coder or throw a
-   * {@link CannotProvideCoderException} when appropriate.
-   *
-   * @param subClass the concrete type whose specializations are being inferred
-   * @param baseClass the base type, a parameterized class
-   * @param knownCoders a map corresponding to the set of known {@link Coder 
Coders} indexed by
-   * type parameter
-   *
-   * @deprecated this method is not part of the public interface and will be 
made private
-   */
-  @Deprecated
-  public <T> Map<Type, Coder<?>> getDefaultCoders(
-      Class<? extends T> subClass,
-      Class<T> baseClass,
-      Map<Type, ? extends Coder<?>> knownCoders) {
-    TypeVariable<Class<T>>[] typeParams = baseClass.getTypeParameters();
-    Coder<?>[] knownCodersArray = new Coder<?>[typeParams.length];
-    for (int i = 0; i < typeParams.length; i++) {
-      knownCodersArray[i] = knownCoders.get(typeParams[i]);
-    }
-    Coder<?>[] resultArray = getDefaultCoders(
-      subClass, baseClass, knownCodersArray);
-    Map<Type, Coder<?>> result = new HashMap<>();
-    for (int i = 0; i < typeParams.length; i++) {
-      if (resultArray[i] != null) {
-        result.put(typeParams[i], resultArray[i]);
-      }
-    }
-    return result;
-  }
-
-  /**
-   * Returns an array listing, for each of {@code baseClass}'s type 
parameters, the {@link Coder} to
-   * use by default for it, in the context of {@code subClass}'s 
specialization of
-   * {@code baseClass}.
-   *
-   * <p>If a {@link Coder} cannot be inferred for a type variable, its slot in 
the resulting array
-   * will be {@code null}.
-   *
-   * <p>For example, if {@code baseClass} is {@code Map.class}, where {@code 
Map<K, V>} has type
-   * parameters {@code K} and {@code V} in that order, and {@code subClass} 
extends
-   * {@code Map<String, Integer>} then the result will contain a {@code 
Coder<String>} and a
-   * {@code Coder<Integer>}, in that order.
-   *
-   * <p>The {@code knownCoders} parameter can be used to provide known {@link 
Coder Coders} for any
-   * of the type parameters. These will be used to infer the others. If 
non-null, the length of this
-   * array must match the number of type parameters of {@code baseClass}, and 
simply be filled with
-   * {@code null} values for each type parameter without a known {@link 
Coder}.
-   *
-   * <p>Note that inference is attempted for every type variable. For a type
-   * {@code MyType<One, Two, Three>} inference will be attempted for all 
of {@code One},
-   * {@code Two}, {@code Three}, even if the requester only wants a {@link 
Coder} for {@code Two}.
-   *
-   * <p>For this reason {@code getDefaultCoders} (plural) does not throw an 
exception if a
-   * {@link Coder} for a particular type variable cannot be inferred. Instead, 
it results in a
-   * {@code null} in the array. It is the responsibility of the caller (usually
-   * {@link #getDefaultCoder}) to extract the desired coder or throw a
-   * {@link CannotProvideCoderException} when appropriate.
-   *
-   * @param subClass the concrete type whose specializations are being inferred
-   * @param baseClass the base type, a parameterized class
-   * @param knownCoders an array corresponding to the set of base class type 
parameters. Each entry
-   *        can be either a {@link Coder} (in which case it will be used for 
inference) or
-   *        {@code null} (in which case it will be inferred). May be {@code 
null} to indicate the
-   *        entire set of parameters should be inferred.
-   * @throws IllegalArgumentException if baseClass doesn't have type 
parameters or if the length of
-   *         {@code knownCoders} is not equal to the number of type parameters 
of {@code baseClass}.
-   */
-  private <T> Coder<?>[] getDefaultCoders(
-      Class<? extends T> subClass,
-      Class<T> baseClass,
-      @Nullable Coder<?>[] knownCoders) {
-    Type type = TypeDescriptor.of(subClass).getSupertype(baseClass).getType();
-    if (!(type instanceof ParameterizedType)) {
-      throw new IllegalArgumentException(type + " is not a ParameterizedType");
-    }
-    ParameterizedType parameterizedType = (ParameterizedType) type;
-    Type[] typeArgs = parameterizedType.getActualTypeArguments();
-    if (knownCoders == null) {
-      knownCoders = new Coder<?>[typeArgs.length];
-    } else if (typeArgs.length != knownCoders.length) {
-      throw new IllegalArgumentException(
-          String.format("Class %s has %d parameters, but %d coders are 
requested.",
-              baseClass.getCanonicalName(), typeArgs.length, 
knownCoders.length));
-    }
-
-    Map<Type, Coder<?>> context = new HashMap<>();
-    for (int i = 0; i < knownCoders.length; i++) {
-      if (knownCoders[i] != null) {
-        try {
-          verifyCompatible(knownCoders[i], typeArgs[i]);
-        } catch (IncompatibleCoderException exn) {
-          throw new IllegalArgumentException(
-              String.format("Provided coders for type arguments of %s contain 
incompatibilities:"
-                  + " Cannot encode elements of type %s with coder %s",
-                  baseClass,
-                  typeArgs[i], knownCoders[i]),
-              exn);
-        }
-        context.putAll(getTypeToCoderBindings(typeArgs[i], knownCoders[i]));
-      }
-    }
-
-    Coder<?>[] result = new Coder<?>[typeArgs.length];
-    for (int i = 0; i < knownCoders.length; i++) {
-      if (knownCoders[i] != null) {
-        result[i] = knownCoders[i];
-      } else {
-        try {
-          result[i] = getDefaultCoder(typeArgs[i], context);
-        } catch (CannotProvideCoderException exc) {
-          result[i] = null;
-        }
-      }
-    }
-    return result;
-  }
-
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  /**
-   * Thrown when a {@link Coder} cannot possibly encode a type, yet has been 
proposed as a
-   * {@link Coder} for that type.
-   */
-  @VisibleForTesting static class IncompatibleCoderException extends 
RuntimeException {
-    private Coder<?> coder;
-    private Type type;
-
-    public IncompatibleCoderException(String message, Coder<?> coder, Type 
type) {
-      super(message);
-      this.coder = coder;
-      this.type = type;
-    }
-
-    public IncompatibleCoderException(String message, Coder<?> coder, Type 
type, Throwable cause) {
-      super(message, cause);
-      this.coder = coder;
-      this.type = type;
-    }
-
-    public Coder<?> getCoder() {
-      return coder;
-    }
-
-    public Type getType() {
-      return type;
-    }
-  }
-
-  /**
-   * Verifies that the given {@link Coder} can possibly encode elements
-   * of the given type, returning normally if it can and throwing an
-   * {@link IncompatibleCoderException} if it certainly cannot.
-   */
-  @VisibleForTesting static <T, CoderT extends Coder<T>, CandidateT>
-  void verifyCompatible(CoderT coder, Type candidateType) throws 
IncompatibleCoderException {
-
-    // Various representations of the coder's class
-    @SuppressWarnings("unchecked")
-    Class<CoderT> coderClass = (Class<CoderT>) coder.getClass();
-    TypeDescriptor<CoderT> coderDescriptor = TypeDescriptor.of(coderClass);
-
-    // Various representations of the actual coded type
-    @SuppressWarnings("unchecked")
-    TypeDescriptor<T> codedDescriptor = 
CoderUtils.getCodedType(coderDescriptor);
-    @SuppressWarnings("unchecked")
-    Class<T> codedClass = (Class<T>) codedDescriptor.getRawType();
-    Type codedType = codedDescriptor.getType();
-
-    // Various representations of the candidate type
-    @SuppressWarnings("unchecked")
-    TypeDescriptor<CandidateT> candidateDescriptor =
-        (TypeDescriptor<CandidateT>) TypeDescriptor.of(candidateType);
-    @SuppressWarnings("unchecked")
-    Class<CandidateT> candidateClass = (Class<CandidateT>) 
candidateDescriptor.getRawType();
-
-    // If coder has type Coder<T> where the actual value of T is lost
-    // to erasure, then we cannot rule it out.
-    if (candidateType instanceof TypeVariable) {
-      return;
-    }
-
-    // If the raw types are not compatible, we can certainly rule out
-    // coder compatibility
-    if (!codedClass.isAssignableFrom(candidateClass)) {
-      throw new IncompatibleCoderException(
-          String.format("Cannot encode elements of type %s with coder %s 
because the"
-              + " coded type %s is not assignable from %s",
-              candidateType, coder, codedClass, candidateType),
-          coder, candidateType);
-    }
-    // we have established that this is a covariant upcast... though
-    // coders are invariant, we are just checking one direction
-    @SuppressWarnings("unchecked")
-    TypeDescriptor<T> candidateOkDescriptor = (TypeDescriptor<T>) 
candidateDescriptor;
-
-    // If the coded type is a parameterized type where any of the actual
-    // type parameters are not compatible, then the whole thing is certainly 
not
-    // compatible.
-    if ((codedType instanceof ParameterizedType) && 
!isNullOrEmpty(coder.getCoderArguments())) {
-      ParameterizedType parameterizedSupertype = ((ParameterizedType)
-           candidateOkDescriptor.getSupertype(codedClass).getType());
-      Type[] typeArguments = parameterizedSupertype.getActualTypeArguments();
-      List<? extends Coder<?>> typeArgumentCoders = coder.getCoderArguments();
-      if (typeArguments.length < typeArgumentCoders.size()) {
-        throw new IncompatibleCoderException(
-            String.format("Cannot encode elements of type %s with coder %s:"
-                + " the generic supertype %s has %s type parameters, which is 
less than the"
-                + " number of coder arguments %s has (%s).",
-                candidateOkDescriptor, coder,
-                parameterizedSupertype, typeArguments.length,
-                coder, typeArgumentCoders.size()),
-            coder, candidateOkDescriptor.getType());
-      }
-      for (int i = 0; i < typeArgumentCoders.size(); i++) {
-        try {
-          verifyCompatible(
-              typeArgumentCoders.get(i),
-              candidateDescriptor.resolveType(typeArguments[i]).getType());
-        } catch (IncompatibleCoderException exn) {
-          throw new IncompatibleCoderException(
-              String.format("Cannot encode elements of type %s with coder %s"
-                  + " because some component coder is incompatible",
-                  candidateType, coder),
-              coder, candidateType, exn);
-        }
-      }
-    }
-  }
-
-  private static boolean isNullOrEmpty(Collection<?> c) {
-    return c == null || c.size() == 0;
-  }
-
-  /**
-   * The map of classes to the CoderFactories to use to create their
-   * default Coders.
-   */
-  private Map<Class<?>, CoderFactory> coderFactoryMap = new HashMap<>();
-
-  /**
-   * A provider of coders for types where no coder is registered.
-   */
-  private CoderProvider fallbackCoderProvider;
-
-  /**
-   * Returns the {@link CoderFactory} to use to create default {@link Coder 
Coders} for instances of
-   * the given class; throws {@link CannotProvideCoderException} if no {@link 
CoderFactory} has been registered for that class.
-   */
-  private CoderFactory getDefaultCoderFactory(Class<?> clazz) throws 
CannotProvideCoderException {
-    CoderFactory coderFactoryOrNull = coderFactoryMap.get(clazz);
-    if (coderFactoryOrNull != null) {
-      return coderFactoryOrNull;
-    } else {
-      throw new CannotProvideCoderException(
-          String.format("Cannot provide coder based on value with class %s: No 
CoderFactory has "
-              + "been registered for the class.", clazz.getCanonicalName()));
-    }
-  }
-
-  /**
-   * Returns the {@link Coder} returned according to the {@link CoderProvider} 
from any
-   * {@link DefaultCoder} annotation on the given class.
-   */
-  private <T> Coder<T> getDefaultCoderFromAnnotation(Class<T> clazz)
-      throws CannotProvideCoderException {
-    DefaultCoder defaultAnnotation = clazz.getAnnotation(DefaultCoder.class);
-    if (defaultAnnotation == null) {
-      throw new CannotProvideCoderException(
-          String.format("Class %s does not have a @DefaultCoder annotation.",
-              clazz.getCanonicalName()));
-    }
-
-    LOG.debug("DefaultCoder annotation found for {} with value {}",
-        clazz, defaultAnnotation.value());
-    CoderProvider coderProvider = 
CoderProviders.fromStaticMethods(defaultAnnotation.value());
-    return coderProvider.getCoder(TypeDescriptor.of(clazz));
-  }
-
-  /**
-   * Returns the {@link Coder} to use by default for values of the given type,
-   * in a context where the given types use the given coders.
-   *
-   * @throws CannotProvideCoderException if a coder cannot be provided
-   */
-  private <T> Coder<T> getDefaultCoder(
-      TypeDescriptor<T> typeDescriptor,
-      Map<Type, Coder<?>> typeCoderBindings)
-      throws CannotProvideCoderException {
-
-    Coder<?> defaultCoder = getDefaultCoder(typeDescriptor.getType(), 
typeCoderBindings);
-    LOG.debug("Default coder for {}: {}", typeDescriptor, defaultCoder);
-    @SuppressWarnings("unchecked")
-    Coder<T> result = (Coder<T>) defaultCoder;
-    return result;
-  }
-
-  /**
-   * Returns the {@link Coder} to use by default for values of the given type,
-   * in a context where the given types use the given coders.
-   *
-   * @throws CannotProvideCoderException if a coder cannot be provided
-   */
-  private Coder<?> getDefaultCoder(Type type, Map<Type, Coder<?>> 
typeCoderBindings)
-      throws CannotProvideCoderException {
-    Coder<?> coder = typeCoderBindings.get(type);
-    if (coder != null) {
-      return coder;
-    }
-    if (type instanceof Class<?>) {
-      Class<?> clazz = (Class<?>) type;
-      return getDefaultCoder(clazz);
-    } else if (type instanceof ParameterizedType) {
-      return getDefaultCoder((ParameterizedType) type, typeCoderBindings);
-    } else if (type instanceof TypeVariable || type instanceof WildcardType) {
-      // No default coder for an unknown generic type.
-      throw new CannotProvideCoderException(
-          String.format("Cannot provide a coder for type variable %s"
-          + " (declared by %s) because the actual type is unknown due to 
erasure.",
-          type,
-          ((TypeVariable<?>) type).getGenericDeclaration()),
-          ReasonCode.TYPE_ERASURE);
-    } else {
-      throw new RuntimeException(
-          "Internal error: unexpected kind of Type: " + type);
-    }
-  }
-
-  /**
-   * Returns the {@link Coder} to use by default for values of the given
-   * parameterized type, in a context where the given types use the
-   * given {@link Coder Coders}.
-   *
-   * @throws CannotProvideCoderException if no coder can be provided
-   */
-  private Coder<?> getDefaultCoder(
-      ParameterizedType type,
-      Map<Type, Coder<?>> typeCoderBindings)
-          throws CannotProvideCoderException {
-
-    CannotProvideCoderException factoryException;
-    try {
-      return getDefaultCoderFromFactory(type, typeCoderBindings);
-    } catch (CannotProvideCoderException exc) {
-      factoryException = exc;
-    }
-
-    CannotProvideCoderException annotationException;
-    try {
-      Class<?> rawClazz = (Class<?>) type.getRawType();
-      return getDefaultCoderFromAnnotation(rawClazz);
-    } catch (CannotProvideCoderException exc) {
-      annotationException = exc;
-    }
-
-    // Build up the error message and list of causes.
-    StringBuilder messageBuilder = new StringBuilder()
-        .append("Unable to provide a default Coder for ").append(type)
-        .append(". Correct one of the following root causes:");
-
-    messageBuilder
-        .append("\n  Building a Coder using a registered CoderFactory failed: 
")
-        .append(factoryException.getMessage());
-
-    messageBuilder
-        .append("\n  Building a Coder from the @DefaultCoder annotation 
failed: ")
-        .append(annotationException.getMessage());
-
-    throw new CannotProvideCoderException(messageBuilder.toString());
-  }
-
-  private Coder<?> getDefaultCoderFromFactory(
-      ParameterizedType type,
-      Map<Type, Coder<?>> typeCoderBindings)
-          throws CannotProvideCoderException {
-    Class<?> rawClazz = (Class<?>) type.getRawType();
-    CoderFactory coderFactory = getDefaultCoderFactory(rawClazz);
-    List<Coder<?>> typeArgumentCoders = new ArrayList<>();
-    for (Type typeArgument : type.getActualTypeArguments()) {
-      try {
-        Coder<?> typeArgumentCoder = getDefaultCoder(typeArgument,
-                                                     typeCoderBindings);
-        typeArgumentCoders.add(typeArgumentCoder);
-      } catch (CannotProvideCoderException exc) {
-         throw new CannotProvideCoderException(
-          String.format("Cannot provide coder for parameterized type %s: %s",
-              type,
-              exc.getMessage()),
-          exc);
-      }
-    }
-    return coderFactory.create(typeArgumentCoders);
-  }
-
-  /**
-   * Returns an immutable {@code Map} from each of the type variables
-   * embedded in the given type to the corresponding {@link Coder Coders}
-   * within the given {@link Coder}.
-   */
-  private Map<Type, Coder<?>> getTypeToCoderBindings(Type type, Coder<?> 
coder) {
-    if (type instanceof TypeVariable || type instanceof Class) {
-      return ImmutableMap.<Type, Coder<?>>of(type, coder);
-    } else if (type instanceof ParameterizedType) {
-      return getTypeToCoderBindings((ParameterizedType) type, coder);
-    } else {
-      return ImmutableMap.of();
-    }
-  }
-
-  /**
-   * Returns an immutable {@code Map} from the type arguments of the 
parameterized type to their
-   * corresponding {@link Coder Coders}, and so on recursively for their type 
parameters.
-   *
-   * <p>This method is simply a specialization to break out the most
-   * elaborate case of {@link #getTypeToCoderBindings(Type, Coder)}.
-   */
-  private Map<Type, Coder<?>> getTypeToCoderBindings(ParameterizedType type, 
Coder<?> coder) {
-    List<Type> typeArguments = Arrays.asList(type.getActualTypeArguments());
-    List<? extends Coder<?>> coderArguments = coder.getCoderArguments();
-
-    if ((coderArguments == null) || (typeArguments.size() != 
coderArguments.size())) {
-      return ImmutableMap.of();
-    } else {
-      Map<Type, Coder<?>> typeToCoder = Maps.newHashMap();
-
-      typeToCoder.put(type, coder);
-
-      for (int i = 0; i < typeArguments.size(); i++) {
-        Type typeArgument = typeArguments.get(i);
-        Coder<?> coderArgument = coderArguments.get(i);
-        typeToCoder.putAll(getTypeToCoderBindings(typeArgument, 
coderArgument));
-      }
-
-      return ImmutableMap.<Type, 
Coder<?>>builder().putAll(typeToCoder).build();
-    }
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/CollectionCoder.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/CollectionCoder.java
 
b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/CollectionCoder.java
deleted file mode 100644
index 728f761..0000000
--- 
a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/CollectionCoder.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.coders;
-
-import com.google.cloud.dataflow.sdk.util.PropertyNames;
-import com.google.common.base.Preconditions;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-import java.util.Collection;
-import java.util.List;
-
-/**
- * A {@link CollectionCoder} encodes {@link Collection Collections} in the 
format
- * of {@link IterableLikeCoder}.
- */
-public class CollectionCoder<T> extends IterableLikeCoder<T, Collection<T>> {
-
-  public static <T> CollectionCoder<T> of(Coder<T> elemCoder) {
-    return new CollectionCoder<>(elemCoder);
-  }
-
-  /////////////////////////////////////////////////////////////////////////////
-  // Internal operations below here.
-
-  /**
-   * {@inheritDoc}
-   *
-   * @return the decoded elements directly, since {@link List} is a subtype of
-   * {@link Collection}.
-   */
-  @Override
-  protected final Collection<T> decodeToIterable(List<T> decodedElements) {
-    return decodedElements;
-  }
-
-  @JsonCreator
-  public static CollectionCoder<?> of(
-      @JsonProperty(PropertyNames.COMPONENT_ENCODINGS)
-      List<Object> components) {
-    Preconditions.checkArgument(components.size() == 1,
-        "Expecting 1 component, got " + components.size());
-    return of((Coder<?>) components.get(0));
-  }
-
-  /**
-   * Returns the first element in this collection if it is non-empty,
-   * otherwise returns {@code null}.
-   */
-  public static <T> List<Object> getInstanceComponents(
-      Collection<T> exampleValue) {
-    return getInstanceComponentsHelper(exampleValue);
-  }
-
-  protected CollectionCoder(Coder<T> elemCoder) {
-    super(elemCoder, "Collection");
-  }
-}


Reply via email to