http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/ByteCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/ByteCoder.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/ByteCoder.java deleted file mode 100644 index df9c0c8..0000000 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/ByteCoder.java +++ /dev/null @@ -1,112 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.google.cloud.dataflow.sdk.coders; - -import com.fasterxml.jackson.annotation.JsonCreator; - -import java.io.EOFException; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.io.UTFDataFormatException; - -/** - * A {@link ByteCoder} encodes {@link Byte} values in 1 byte using Java serialization. - */ -public class ByteCoder extends AtomicCoder<Byte> { - - @JsonCreator - public static ByteCoder of() { - return INSTANCE; - } - - ///////////////////////////////////////////////////////////////////////////// - - private static final ByteCoder INSTANCE = new ByteCoder(); - - private ByteCoder() {} - - @Override - public void encode(Byte value, OutputStream outStream, Context context) - throws IOException, CoderException { - if (value == null) { - throw new CoderException("cannot encode a null Byte"); - } - outStream.write(value.byteValue()); - } - - @Override - public Byte decode(InputStream inStream, Context context) - throws IOException, CoderException { - try { - // value will be between 0-255, -1 for EOF - int value = inStream.read(); - if (value == -1) { - throw new EOFException("EOF encountered decoding 1 byte from input stream"); - } - return (byte) value; - } catch (EOFException | UTFDataFormatException exn) { - // These exceptions correspond to decoding problems, so change - // what kind of exception they're branded as. - throw new CoderException(exn); - } - } - - /** - * {@inheritDoc} - * - * {@link ByteCoder} will never throw a {@link Coder.NonDeterministicException}; bytes can always - * be encoded deterministically. - */ - @Override - public void verifyDeterministic() {} - - /** - * {@inheritDoc} - * - * @return {@code true}. This coder is injective. - */ - @Override - public boolean consistentWithEquals() { - return true; - } - - /** - * {@inheritDoc} - * - * @return {@code true}. {@link ByteCoder#getEncodedElementByteSize} returns a constant. - */ - @Override - public boolean isRegisterByteSizeObserverCheap(Byte value, Context context) { - return true; - } - - /** - * {@inheritDoc} - * - * @return {@code 1}, the byte size of a {@link Byte} encoded using Java serialization. - */ - @Override - protected long getEncodedElementByteSize(Byte value, Context context) - throws Exception { - if (value == null) { - throw new CoderException("cannot estimate size for unsupported null value"); - } - return 1; - } -}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/ByteStringCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/ByteStringCoder.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/ByteStringCoder.java deleted file mode 100644 index 557d2bd..0000000 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/ByteStringCoder.java +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.google.cloud.dataflow.sdk.coders; - -import com.google.cloud.dataflow.sdk.util.VarInt; -import com.google.common.io.ByteStreams; -import com.google.protobuf.ByteString; - -import com.fasterxml.jackson.annotation.JsonCreator; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; - -/** - * A {@link Coder} for {@link ByteString} objects based on their encoded Protocol Buffer form. - * - * <p>When this code is used in a nested {@link Coder.Context}, the serialized {@link ByteString} - * objects are first delimited by their size. - */ -public class ByteStringCoder extends AtomicCoder<ByteString> { - - @JsonCreator - public static ByteStringCoder of() { - return INSTANCE; - } - - /***************************/ - - private static final ByteStringCoder INSTANCE = new ByteStringCoder(); - - private ByteStringCoder() {} - - @Override - public void encode(ByteString value, OutputStream outStream, Context context) - throws IOException, CoderException { - if (value == null) { - throw new CoderException("cannot encode a null ByteString"); - } - - if (!context.isWholeStream) { - // ByteString is not delimited, so write its size before its contents. - VarInt.encode(value.size(), outStream); - } - value.writeTo(outStream); - } - - @Override - public ByteString decode(InputStream inStream, Context context) throws IOException { - if (context.isWholeStream) { - return ByteString.readFrom(inStream); - } - - int size = VarInt.decodeInt(inStream); - // ByteString reads to the end of the input stream, so give it a limited stream of exactly - // the right length. Also set its chunk size so that the ByteString will contain exactly - // one chunk. - return ByteString.readFrom(ByteStreams.limit(inStream, size), size); - } - - @Override - protected long getEncodedElementByteSize(ByteString value, Context context) throws Exception { - int size = value.size(); - - if (context.isWholeStream) { - return size; - } - return VarInt.getLength(size) + size; - } - - /** - * {@inheritDoc} - * - * <p>Returns true; the encoded output of two invocations of {@link ByteStringCoder} in the same - * {@link Coder.Context} will be identical if and only if the original {@link ByteString} objects - * are equal according to {@link Object#equals}. - */ - @Override - public boolean consistentWithEquals() { - return true; - } - - /** - * {@inheritDoc} - * - * <p>Returns true. {@link ByteString#size} returns the size of an array and a {@link VarInt}. - */ - @Override - public boolean isRegisterByteSizeObserverCheap(ByteString value, Context context) { - return true; - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/CannotProvideCoderException.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/CannotProvideCoderException.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/CannotProvideCoderException.java deleted file mode 100644 index 66efefa..0000000 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/CannotProvideCoderException.java +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.google.cloud.dataflow.sdk.coders; - -/** - * The exception thrown when a {@link CoderProvider} cannot - * provide a {@link Coder} that has been requested. - */ -public class CannotProvideCoderException extends Exception { - private final ReasonCode reason; - - public CannotProvideCoderException(String message) { - this(message, ReasonCode.UNKNOWN); - } - - public CannotProvideCoderException(String message, ReasonCode reason) { - super(message); - this.reason = reason; - } - - public CannotProvideCoderException(String message, Throwable cause) { - this(message, cause, ReasonCode.UNKNOWN); - } - - public CannotProvideCoderException(String message, Throwable cause, ReasonCode reason) { - super(message, cause); - this.reason = reason; - } - - public CannotProvideCoderException(Throwable cause) { - this(cause, ReasonCode.UNKNOWN); - } - - public CannotProvideCoderException(Throwable cause, ReasonCode reason) { - super(cause); - this.reason = reason; - } - - /** - * @return the reason that Coder inference failed. - */ - public ReasonCode getReason() { - return reason; - } - - /** - * Returns the inner-most {@link CannotProvideCoderException} when they are deeply nested. - * - * <p>For example, if a coder for {@code List<KV<Integer, Whatsit>>} cannot be provided because - * there is no known coder for {@code Whatsit}, the root cause of the exception should be a - * CannotProvideCoderException with details pertinent to {@code Whatsit}, suppressing the - * intermediate layers. - */ - public Throwable getRootCause() { - Throwable cause = getCause(); - if (cause == null) { - return this; - } else if (!(cause instanceof CannotProvideCoderException)) { - return cause; - } else { - return ((CannotProvideCoderException) cause).getRootCause(); - } - } - - /** - * Indicates the reason that {@link Coder} inference failed. - */ - public static enum ReasonCode { - /** - * The reason a coder could not be provided is unknown or does have an established - * {@link ReasonCode}. - */ - UNKNOWN, - - /** - * The reason a coder could not be provided is type erasure, for example when requesting - * coder inference for a {@code List<T>} where {@code T} is unknown. - */ - TYPE_ERASURE - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/Coder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/Coder.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/Coder.java deleted file mode 100644 index ff2e10e..0000000 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/Coder.java +++ /dev/null @@ -1,299 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.google.cloud.dataflow.sdk.coders; - -import org.apache.beam.sdk.annotations.Experimental; -import org.apache.beam.sdk.annotations.Experimental.Kind; -import com.google.cloud.dataflow.sdk.util.CloudObject; -import com.google.cloud.dataflow.sdk.util.common.ElementByteSizeObserver; -import com.google.common.base.Joiner; -import com.google.common.base.MoreObjects; -import com.google.common.base.Objects; -import com.google.common.base.Preconditions; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.io.Serializable; -import java.util.Arrays; -import java.util.Collection; -import java.util.List; - -import javax.annotation.Nullable; - -/** - * A {@link Coder Coder<T>} defines how to encode and decode values of type {@code T} into - * byte streams. - * - * <p>{@link Coder} instances are serialized during job creation and deserialized - * before use, via JSON serialization. See {@link SerializableCoder} for an example of a - * {@link Coder} that adds a custom field to - * the {@link Coder} serialization. It provides a constructor annotated with - * {@link com.fasterxml.jackson.annotation.JsonCreator}, which is a factory method used when - * deserializing a {@link Coder} instance. - * - * <p>{@link Coder} classes for compound types are often composed from coder classes for types - * contains therein. The composition of {@link Coder} instances into a coder for the compound - * class is the subject of the {@link CoderFactory} type, which enables automatic generic - * composition of {@link Coder} classes within the {@link CoderRegistry}. With particular - * static methods on a compound {@link Coder} class, a {@link CoderFactory} can be automatically - * inferred. See {@link KvCoder} for an example of a simple compound {@link Coder} that supports - * automatic composition in the {@link CoderRegistry}. - * - * <p>The binary format of a {@link Coder} is identified by {@link #getEncodingId()}; be sure to - * understand the requirements for evolving coder formats. - * - * <p>All methods of a {@link Coder} are required to be thread safe. - * - * @param <T> the type of the values being transcoded - */ -public interface Coder<T> extends Serializable { - /** The context in which encoding or decoding is being done. */ - public static class Context { - /** - * The outer context: the value being encoded or decoded takes - * up the remainder of the record/stream contents. - */ - public static final Context OUTER = new Context(true); - - /** - * The nested context: the value being encoded or decoded is - * (potentially) a part of a larger record/stream contents, and - * may have other parts encoded or decoded after it. - */ - public static final Context NESTED = new Context(false); - - /** - * Whether the encoded or decoded value fills the remainder of the - * output or input (resp.) record/stream contents. If so, then - * the size of the decoded value can be determined from the - * remaining size of the record/stream contents, and so explicit - * lengths aren't required. - */ - public final boolean isWholeStream; - - public Context(boolean isWholeStream) { - this.isWholeStream = isWholeStream; - } - - public Context nested() { - return NESTED; - } - - @Override - public boolean equals(Object obj) { - if (!(obj instanceof Context)) { - return false; - } - return Objects.equal(isWholeStream, ((Context) obj).isWholeStream); - } - - @Override - public int hashCode() { - return Objects.hashCode(isWholeStream); - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(Context.class) - .addValue(isWholeStream ? "OUTER" : "NESTED").toString(); - } - } - - /** - * Encodes the given value of type {@code T} onto the given output stream - * in the given context. - * - * @throws IOException if writing to the {@code OutputStream} fails - * for some reason - * @throws CoderException if the value could not be encoded for some reason - */ - public void encode(T value, OutputStream outStream, Context context) - throws CoderException, IOException; - - /** - * Decodes a value of type {@code T} from the given input stream in - * the given context. Returns the decoded value. - * - * @throws IOException if reading from the {@code InputStream} fails - * for some reason - * @throws CoderException if the value could not be decoded for some reason - */ - public T decode(InputStream inStream, Context context) - throws CoderException, IOException; - - /** - * If this is a {@code Coder} for a parameterized type, returns the - * list of {@code Coder}s being used for each of the parameters, or - * returns {@code null} if this cannot be done or this is not a - * parameterized type. - */ - public List<? extends Coder<?>> getCoderArguments(); - - /** - * Returns the {@link CloudObject} that represents this {@code Coder}. - */ - public CloudObject asCloudObject(); - - /** - * Throw {@link NonDeterministicException} if the coding is not deterministic. - * - * <p>In order for a {@code Coder} to be considered deterministic, - * the following must be true: - * <ul> - * <li>two values that compare as equal (via {@code Object.equals()} - * or {@code Comparable.compareTo()}, if supported) have the same - * encoding. - * <li>the {@code Coder} always produces a canonical encoding, which is the - * same for an instance of an object even if produced on different - * computers at different times. - * </ul> - * - * @throws Coder.NonDeterministicException if this coder is not deterministic. - */ - public void verifyDeterministic() throws Coder.NonDeterministicException; - - /** - * Returns {@code true} if this {@link Coder} is injective with respect to {@link Objects#equals}. - * - * <p>Whenever the encoded bytes of two values are equal, then the original values are equal - * according to {@code Objects.equals()}. Note that this is well-defined for {@code null}. - * - * <p>This condition is most notably false for arrays. More generally, this condition is false - * whenever {@code equals()} compares object identity, rather than performing a - * semantic/structural comparison. - */ - public boolean consistentWithEquals(); - - /** - * Returns an object with an {@code Object.equals()} method that represents structural equality - * on the argument. - * - * <p>For any two values {@code x} and {@code y} of type {@code T}, if their encoded bytes are the - * same, then it must be the case that {@code structuralValue(x).equals(@code structuralValue(y)}. - * - * <p>Most notably: - * <ul> - * <li>The structural value for an array coder should perform a structural comparison of the - * contents of the arrays, rather than the default behavior of comparing according to object - * identity. - * <li>The structural value for a coder accepting {@code null} should be a proper object with - * an {@code equals()} method, even if the input value is {@code null}. - * </ul> - * - * <p>See also {@link #consistentWithEquals()}. - */ - public Object structuralValue(T value) throws Exception; - - /** - * Returns whether {@link #registerByteSizeObserver} cheap enough to - * call for every element, that is, if this {@code Coder} can - * calculate the byte size of the element to be coded in roughly - * constant time (or lazily). - * - * <p>Not intended to be called by user code, but instead by - * {@link com.google.cloud.dataflow.sdk.runners.PipelineRunner} - * implementations. - */ - public boolean isRegisterByteSizeObserverCheap(T value, Context context); - - /** - * Notifies the {@code ElementByteSizeObserver} about the byte size - * of the encoded value using this {@code Coder}. - * - * <p>Not intended to be called by user code, but instead by - * {@link com.google.cloud.dataflow.sdk.runners.PipelineRunner} - * implementations. - */ - public void registerByteSizeObserver( - T value, ElementByteSizeObserver observer, Context context) - throws Exception; - - /** - * An identifier for the binary format written by {@link #encode}. - * - * <p>This value, along with the fully qualified class name, forms an identifier for the - * binary format of this coder. Whenever this value changes, the new encoding is considered - * incompatible with the prior format: It is presumed that the prior version of the coder will - * be unable to correctly read the new format and the new version of the coder will be unable to - * correctly read the old format. - * - * <p>If the format is changed in a backwards-compatible way (the Coder can still accept data from - * the prior format), such as by adding optional fields to a Protocol Buffer or Avro definition, - * and you want Dataflow to understand that the new coder is compatible with the prior coder, - * this value must remain unchanged. It is then the responsibility of {@link #decode} to correctly - * read data from the prior format. - */ - @Experimental(Kind.CODER_ENCODING_ID) - public String getEncodingId(); - - /** - * A collection of encodings supported by {@link #decode} in addition to the encoding - * from {@link #getEncodingId()} (which is assumed supported). - * - * <p><i>This information is not currently used for any purpose</i>. It is descriptive only, - * and this method is subject to change. - * - * @see #getEncodingId() - */ - @Experimental(Kind.CODER_ENCODING_ID) - public Collection<String> getAllowedEncodings(); - - /** - * Exception thrown by {@link Coder#verifyDeterministic()} if the encoding is - * not deterministic, including details of why the encoding is not deterministic. - */ - public static class NonDeterministicException extends Throwable { - private Coder<?> coder; - private List<String> reasons; - - public NonDeterministicException( - Coder<?> coder, String reason, @Nullable NonDeterministicException e) { - this(coder, Arrays.asList(reason), e); - } - - public NonDeterministicException(Coder<?> coder, String reason) { - this(coder, Arrays.asList(reason), null); - } - - public NonDeterministicException(Coder<?> coder, List<String> reasons) { - this(coder, reasons, null); - } - - public NonDeterministicException( - Coder<?> coder, - List<String> reasons, - @Nullable NonDeterministicException cause) { - super(cause); - Preconditions.checkArgument(reasons.size() > 0, - "Reasons must not be empty."); - this.reasons = reasons; - this.coder = coder; - } - - public Iterable<String> getReasons() { - return reasons; - } - - @Override - public String getMessage() { - return String.format("%s is not deterministic because:\n %s", - coder, Joiner.on("\n ").join(reasons)); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/CoderException.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/CoderException.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/CoderException.java deleted file mode 100644 index f7bbb69..0000000 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/CoderException.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.google.cloud.dataflow.sdk.coders; - -import java.io.IOException; - -/** - * An {@link Exception} thrown if there is a problem encoding or decoding a value. - */ -public class CoderException extends IOException { - public CoderException(String message) { - super(message); - } - - public CoderException(String message, Throwable cause) { - super(message, cause); - } - - public CoderException(Throwable cause) { - super(cause); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/CoderFactories.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/CoderFactories.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/CoderFactories.java deleted file mode 100644 index 7f788c7..0000000 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/CoderFactories.java +++ /dev/null @@ -1,275 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.google.cloud.dataflow.sdk.coders; - -import com.google.cloud.dataflow.sdk.values.TypeDescriptor; - -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; -import java.lang.reflect.Modifier; -import java.lang.reflect.ParameterizedType; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; - -/** - * Static utility methods for creating and working with {@link Coder}s. - */ -public final class CoderFactories { - private CoderFactories() { } // Static utility class - - /** - * Creates a {@link CoderFactory} built from particular static methods of a class that - * implements {@link Coder}. - * - * <p>The class must have the following static methods: - * - * <ul> - * <li> {@code - * public static Coder<T> of(Coder<X> argCoder1, Coder<Y> argCoder2, ...) - * } - * <li> {@code - * public static List<Object> getInstanceComponents(T exampleValue); - * } - * </ul> - * - * <p>The {@code of(...)} method will be used to construct a - * {@code Coder<T>} from component {@link Coder}s. - * It must accept one {@link Coder} argument for each - * generic type parameter of {@code T}. If {@code T} takes no generic - * type parameters, then the {@code of()} factory method should take - * no arguments. - * - * <p>The {@code getInstanceComponents} method will be used to - * decompose a value during the {@link Coder} inference process, - * to automatically choose coders for the components. - * - * <p>Note that the class {@code T} to be coded may be a - * not-yet-specialized generic class. - * For a generic class {@code MyClass<X>} and an actual type parameter - * {@code Foo}, the {@link CoderFactoryFromStaticMethods} will - * accept any {@code Coder<Foo>} and produce a {@code Coder<MyClass<Foo>>}. - * - * <p>For example, the {@link CoderFactory} returned by - * {@code fromStaticMethods(ListCoder.class)} - * will produce a {@code Coder<List<X>>} for any {@code Coder Coder<X>}. - */ - public static <T> CoderFactory fromStaticMethods(Class<T> clazz) { - return new CoderFactoryFromStaticMethods(clazz); - } - - /** - * Creates a {@link CoderFactory} that always returns the - * given coder. - * - * <p>The {@code getInstanceComponents} method of this - * {@link CoderFactory} always returns an empty list. - */ - public static <T> CoderFactory forCoder(Coder<T> coder) { - return new CoderFactoryForCoder<>(coder); - } - - /** - * See {@link #fromStaticMethods} for a detailed description - * of the characteristics of this {@link CoderFactory}. - */ - private static class CoderFactoryFromStaticMethods implements CoderFactory { - - @Override - @SuppressWarnings("rawtypes") - public Coder<?> create(List<? extends Coder<?>> componentCoders) { - try { - return (Coder) factoryMethod.invoke( - null /* static */, componentCoders.toArray()); - } catch (IllegalAccessException | - IllegalArgumentException | - InvocationTargetException | - NullPointerException | - ExceptionInInitializerError exn) { - throw new IllegalStateException( - "error when invoking Coder factory method " + factoryMethod, - exn); - } - } - - @Override - public List<Object> getInstanceComponents(Object value) { - try { - @SuppressWarnings("unchecked") - List<Object> components = (List<Object>) getComponentsMethod.invoke( - null /* static */, value); - return components; - } catch (IllegalAccessException - | IllegalArgumentException - | InvocationTargetException - | NullPointerException - | ExceptionInInitializerError exn) { - throw new IllegalStateException( - "error when invoking Coder getComponents method " + getComponentsMethod, - exn); - } - } - - //////////////////////////////////////////////////////////////////////////////// - - // Method to create a coder given component coders - // For a Coder class of kind * -> * -> ... n times ... -> * - // this has type Coder<?> -> Coder<?> -> ... n times ... -> Coder<T> - private Method factoryMethod; - - // Method to decompose a value of type T into its parts. - // For a Coder class of kind * -> * -> ... n times ... -> * - // this has type T -> List<Object> - // where the list has n elements. - private Method getComponentsMethod; - - /** - * Returns a CoderFactory that invokes the given static factory method - * to create the Coder. - */ - private CoderFactoryFromStaticMethods(Class<?> coderClazz) { - this.factoryMethod = getFactoryMethod(coderClazz); - this.getComponentsMethod = getInstanceComponentsMethod(coderClazz); - } - - /** - * Returns the static {@code of} constructor method on {@code coderClazz} - * if it exists. It is assumed to have one {@link Coder} parameter for - * each type parameter of {@code coderClazz}. - */ - private Method getFactoryMethod(Class<?> coderClazz) { - Method factoryMethodCandidate; - - // Find the static factory method of coderClazz named 'of' with - // the appropriate number of type parameters. - int numTypeParameters = coderClazz.getTypeParameters().length; - Class<?>[] factoryMethodArgTypes = new Class<?>[numTypeParameters]; - Arrays.fill(factoryMethodArgTypes, Coder.class); - try { - factoryMethodCandidate = - coderClazz.getDeclaredMethod("of", factoryMethodArgTypes); - } catch (NoSuchMethodException | SecurityException exn) { - throw new IllegalArgumentException( - "cannot register Coder " + coderClazz + ": " - + "does not have an accessible method named 'of' with " - + numTypeParameters + " arguments of Coder type", - exn); - } - if (!Modifier.isStatic(factoryMethodCandidate.getModifiers())) { - throw new IllegalArgumentException( - "cannot register Coder " + coderClazz + ": " - + "method named 'of' with " + numTypeParameters - + " arguments of Coder type is not static"); - } - if (!coderClazz.isAssignableFrom(factoryMethodCandidate.getReturnType())) { - throw new IllegalArgumentException( - "cannot register Coder " + coderClazz + ": " - + "method named 'of' with " + numTypeParameters - + " arguments of Coder type does not return a " + coderClazz); - } - try { - if (!factoryMethodCandidate.isAccessible()) { - factoryMethodCandidate.setAccessible(true); - } - } catch (SecurityException exn) { - throw new IllegalArgumentException( - "cannot register Coder " + coderClazz + ": " - + "method named 'of' with " + numTypeParameters - + " arguments of Coder type is not accessible", - exn); - } - - return factoryMethodCandidate; - } - - /** - * Finds the static method on {@code coderType} to use - * to decompose a value of type {@code T} into components, - * each corresponding to an argument of the {@code of} - * method. - */ - private <T> Method getInstanceComponentsMethod(Class<?> coderClazz) { - TypeDescriptor<?> coderType = TypeDescriptor.of(coderClazz); - TypeDescriptor<T> argumentType = getCodedType(coderType); - - // getInstanceComponents may be implemented in a superclass, - // so we search them all for an applicable method. We do not - // try to be clever about finding the best overload. It may - // be in a generic superclass, erased to accept an Object. - // However, subtypes are listed before supertypes (it is a - // topological ordering) so probably the best one will be chosen - // if there are more than one (which should be rare) - for (TypeDescriptor<?> supertype : coderType.getClasses()) { - for (Method method : supertype.getRawType().getDeclaredMethods()) { - if (method.getName().equals("getInstanceComponents")) { - TypeDescriptor<?> formalArgumentType = supertype.getArgumentTypes(method).get(0); - if (formalArgumentType.getRawType().isAssignableFrom(argumentType.getRawType())) { - return method; - } - } - } - } - - throw new IllegalArgumentException( - "cannot create a CoderFactory from " + coderType + ": " - + "does not have an accessible method " - + "'getInstanceComponents'"); - } - - /** - * If {@code coderType} is a subclass of {@link Coder} for a specific - * type {@code T}, returns {@code T.class}. Otherwise, raises IllegalArgumentException. - */ - private <T> TypeDescriptor<T> getCodedType(TypeDescriptor<?> coderType) { - for (TypeDescriptor<?> ifaceType : coderType.getInterfaces()) { - if (ifaceType.getRawType().equals(Coder.class)) { - ParameterizedType coderIface = (ParameterizedType) ifaceType.getType(); - @SuppressWarnings("unchecked") - TypeDescriptor<T> token = - (TypeDescriptor<T>) TypeDescriptor.of(coderIface.getActualTypeArguments()[0]); - return token; - } - } - throw new IllegalArgumentException( - "cannot build CoderFactory from class " + coderType - + ": does not implement Coder<T> for any T."); - } - } - - /** - * See {@link #forCoder} for a detailed description of this - * {@link CoderFactory}. - */ - private static class CoderFactoryForCoder<T> implements CoderFactory { - private Coder<T> coder; - - public CoderFactoryForCoder(Coder<T> coder) { - this.coder = coder; - } - - @Override - public Coder<?> create(List<? extends Coder<?>> componentCoders) { - return this.coder; - } - - @Override - public List<Object> getInstanceComponents(Object value) { - return Collections.emptyList(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/CoderFactory.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/CoderFactory.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/CoderFactory.java deleted file mode 100644 index 775bfc6..0000000 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/CoderFactory.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.google.cloud.dataflow.sdk.coders; - -import java.util.List; - -/** - * A {@link CoderFactory} creates coders and decomposes values. - * It may operate on a parameterized type, such as {@link List}, - * in which case the {@link #create} method accepts a list of - * coders to use for the type parameters. - */ -public interface CoderFactory { - - /** - * Returns a {@code Coder<?>}, given argument coder to use for - * values of a particular type, given the Coders for each of - * the type's generic parameter types. - */ - public Coder<?> create(List<? extends Coder<?>> componentCoders); - - /** - * Returns a list of objects contained in {@code value}, one per - * type argument, or {@code null} if none can be determined. - * The list of returned objects should be the same size as the - * list of coders required by {@link #create}. - */ - public List<Object> getInstanceComponents(Object value); -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/CoderProvider.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/CoderProvider.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/CoderProvider.java deleted file mode 100644 index f8e29e6..0000000 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/CoderProvider.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.google.cloud.dataflow.sdk.coders; - -import com.google.cloud.dataflow.sdk.values.TypeDescriptor; - -/** - * A {@link CoderProvider} may create a {@link Coder} for - * any concrete class. - */ -public interface CoderProvider { - - /** - * Provides a coder for a given class, if possible. - * - * @throws CannotProvideCoderException if no coder can be provided - */ - public <T> Coder<T> getCoder(TypeDescriptor<T> type) throws CannotProvideCoderException; -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/CoderProviders.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/CoderProviders.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/CoderProviders.java deleted file mode 100644 index 64d258d..0000000 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/CoderProviders.java +++ /dev/null @@ -1,165 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.google.cloud.dataflow.sdk.coders; - -import static com.google.common.base.Preconditions.checkArgument; - -import com.google.cloud.dataflow.sdk.util.InstanceBuilder; -import com.google.cloud.dataflow.sdk.values.TypeDescriptor; -import com.google.common.base.Joiner; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; - -import java.lang.reflect.InvocationTargetException; -import java.util.List; - -/** - * Static utility methods for working with {@link CoderProvider CoderProviders}. - */ -public final class CoderProviders { - - // Static utility class - private CoderProviders() { } - - /** - * Creates a {@link CoderProvider} built from particular static methods of a class that - * implements {@link Coder}. The requirements for this method are precisely the requirements - * for a {@link Coder} class to be usable with {@link DefaultCoder} annotations. - * - * <p>The class must have the following static method: - * - * <pre>{@code - * public static Coder<T> of(TypeDescriptor<T> type) - * } - * </pre> - */ - public static <T> CoderProvider fromStaticMethods(Class<T> clazz) { - return new CoderProviderFromStaticMethods(clazz); - } - - - /** - * Returns a {@link CoderProvider} that consults each of the provider {@code coderProviders} - * and returns the first {@link Coder} provided. - * - * <p>Note that the order in which the providers are listed matters: While the set of types - * handled will be the union of those handled by all of the providers in the list, the actual - * {@link Coder} provided by the first successful provider may differ, and may have inferior - * properties. For example, not all {@link Coder Coders} are deterministic, handle {@code null} - * values, or have comparable performance. - */ - public static CoderProvider firstOf(CoderProvider... coderProviders) { - return new FirstOf(ImmutableList.copyOf(coderProviders)); - } - - /////////////////////////////////////////////////////////////////////////////////////////////// - - /** - * @see #firstOf - */ - private static class FirstOf implements CoderProvider { - - private Iterable<CoderProvider> providers; - - public FirstOf(Iterable<CoderProvider> providers) { - this.providers = providers; - } - - @Override - public <T> Coder<T> getCoder(TypeDescriptor<T> type) throws CannotProvideCoderException { - List<String> messages = Lists.newArrayList(); - for (CoderProvider provider : providers) { - try { - return provider.getCoder(type); - } catch (CannotProvideCoderException exc) { - messages.add(String.format("%s could not provide a Coder for type %s: %s", - provider, type, exc.getMessage())); - } - } - throw new CannotProvideCoderException( - String.format("Cannot provide coder for type %s: %s.", - type, Joiner.on("; ").join(messages))); - } - } - - private static class CoderProviderFromStaticMethods implements CoderProvider { - - /** If true, then clazz has {@code of(TypeDescriptor)}. If false, {@code of(Class)}. */ - private final boolean takesTypeDescriptor; - private final Class<?> clazz; - - public CoderProviderFromStaticMethods(Class<?> clazz) { - // Note that the second condition supports older classes, which only needed to provide - // of(Class), not of(TypeDescriptor). Our own classes have updated to accept a - // TypeDescriptor. Hence the error message points only to the current specification, - // not both acceptable conditions. - checkArgument(classTakesTypeDescriptor(clazz) || classTakesClass(clazz), - "Class " + clazz.getCanonicalName() - + " is missing required static method of(TypeDescriptor)."); - - this.takesTypeDescriptor = classTakesTypeDescriptor(clazz); - this.clazz = clazz; - } - - @Override - public <T> Coder<T> getCoder(TypeDescriptor<T> type) throws CannotProvideCoderException { - try { - if (takesTypeDescriptor) { - @SuppressWarnings("unchecked") - Coder<T> result = InstanceBuilder.ofType(Coder.class) - .fromClass(clazz) - .fromFactoryMethod("of") - .withArg(TypeDescriptor.class, type) - .build(); - return result; - } else { - @SuppressWarnings("unchecked") - Coder<T> result = InstanceBuilder.ofType(Coder.class) - .fromClass(clazz) - .fromFactoryMethod("of") - .withArg(Class.class, type.getRawType()) - .build(); - return result; - } - } catch (RuntimeException exc) { - if (exc.getCause() instanceof InvocationTargetException) { - throw new CannotProvideCoderException(exc.getCause().getCause()); - } - throw exc; - } - } - - private boolean classTakesTypeDescriptor(Class<?> clazz) { - try { - clazz.getDeclaredMethod("of", TypeDescriptor.class); - return true; - } catch (NoSuchMethodException | SecurityException exc) { - return false; - } - } - - private boolean classTakesClass(Class<?> clazz) { - try { - clazz.getDeclaredMethod("of", Class.class); - return true; - } catch (NoSuchMethodException | SecurityException exc) { - return false; - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/CoderRegistry.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/CoderRegistry.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/CoderRegistry.java deleted file mode 100644 index da9e0a0..0000000 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/CoderRegistry.java +++ /dev/null @@ -1,844 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.google.cloud.dataflow.sdk.coders; - -import com.google.api.services.bigquery.model.TableRow; -import com.google.cloud.dataflow.sdk.coders.CannotProvideCoderException.ReasonCode; -import com.google.cloud.dataflow.sdk.coders.protobuf.ProtoCoder; -import com.google.cloud.dataflow.sdk.transforms.SerializableFunction; -import com.google.cloud.dataflow.sdk.util.CoderUtils; -import com.google.cloud.dataflow.sdk.values.KV; -import com.google.cloud.dataflow.sdk.values.TimestampedValue; -import com.google.cloud.dataflow.sdk.values.TypeDescriptor; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Maps; -import com.google.protobuf.ByteString; - -import org.joda.time.Instant; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.lang.reflect.ParameterizedType; -import java.lang.reflect.Type; -import java.lang.reflect.TypeVariable; -import java.lang.reflect.WildcardType; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import javax.annotation.Nullable; - -/** - * A {@link CoderRegistry} allows registering the default {@link Coder} to use for a Java class, - * and looking up and instantiating the default {@link Coder} for a Java type. - * - * <p>{@link CoderRegistry} uses the following mechanisms to determine a default {@link Coder} for a - * Java class, in order of precedence: - * <ol> - * <li>Registration: - * <ul> - * <li>A {@link CoderFactory} can be registered to handle a particular class via - * {@link #registerCoder(Class, CoderFactory)}.</li> - * <li>A {@link Coder} class with the static methods to satisfy - * {@link CoderFactories#fromStaticMethods} can be registered via - * {@link #registerCoder(Class, Class)}.</li> - * <li>Built-in types are registered via - * {@link #registerStandardCoders()}.</li> - * </ul> - * <li>Annotations: {@link DefaultCoder} can be used to annotate a type with - * the default {@code Coder} type. The {@link Coder} class must satisfy the requirements - * of {@link CoderProviders#fromStaticMethods}. - * <li>Fallback: A fallback {@link CoderProvider} is used to attempt to provide a {@link Coder} - * for any type. By default, this is {@link SerializableCoder#PROVIDER}, which can provide - * a {@link Coder} for any type that is serializable via Java serialization. The fallback - * {@link CoderProvider} can be get and set via {@link #getFallbackCoderProvider()} - * and {@link #setFallbackCoderProvider}. Multiple fallbacks can be chained together using - * {@link CoderProviders#firstOf}. - * </ol> - */ -public class CoderRegistry implements CoderProvider { - - private static final Logger LOG = LoggerFactory.getLogger(CoderRegistry.class); - - public CoderRegistry() { - setFallbackCoderProvider( - CoderProviders.firstOf(ProtoCoder.coderProvider(), SerializableCoder.PROVIDER)); - } - - /** - * Registers standard Coders with this CoderRegistry. - */ - public void registerStandardCoders() { - registerCoder(Byte.class, ByteCoder.class); - registerCoder(ByteString.class, ByteStringCoder.class); - registerCoder(Double.class, DoubleCoder.class); - registerCoder(Instant.class, InstantCoder.class); - registerCoder(Integer.class, VarIntCoder.class); - registerCoder(Iterable.class, IterableCoder.class); - registerCoder(KV.class, KvCoder.class); - registerCoder(List.class, ListCoder.class); - registerCoder(Long.class, VarLongCoder.class); - registerCoder(Map.class, MapCoder.class); - registerCoder(Set.class, SetCoder.class); - registerCoder(String.class, StringUtf8Coder.class); - registerCoder(TableRow.class, TableRowJsonCoder.class); - registerCoder(TimestampedValue.class, TimestampedValue.TimestampedValueCoder.class); - registerCoder(Void.class, VoidCoder.class); - registerCoder(byte[].class, ByteArrayCoder.class); - } - - /** - * Registers {@code coderClazz} as the default {@link Coder} class to handle encoding and - * decoding instances of {@code clazz}, overriding prior registrations if any exist. - * - * <p>Supposing {@code T} is the static type corresponding to the {@code clazz}, then - * {@code coderClazz} should have a static factory method with the following signature: - * - * <pre> {@code - * public static Coder<T> of(Coder<X> argCoder1, Coder<Y> argCoder2, ...) - * } </pre> - * - * <p>This method will be called to create instances of {@code Coder<T>} for values of type - * {@code T}, passing Coders for each of the generic type parameters of {@code T}. If {@code T} - * takes no generic type parameters, then the {@code of()} factory method should have no - * arguments. - * - * <p>If {@code T} is a parameterized type, then it should additionally have a method with the - * following signature: - * - * <pre> {@code - * public static List<Object> getInstanceComponents(T exampleValue); - * } </pre> - * - * <p>This method will be called to decompose a value during the {@link Coder} inference process, - * to automatically choose {@link Coder Coders} for the components. - * - * @param clazz the class of objects to be encoded - * @param coderClazz a class with static factory methods to provide {@link Coder Coders} - */ - public void registerCoder(Class<?> clazz, Class<?> coderClazz) { - registerCoder(clazz, CoderFactories.fromStaticMethods(coderClazz)); - } - - /** - * Registers {@code coderFactory} as the default {@link CoderFactory} to produce {@code Coder} - * instances to decode and encode instances of {@code clazz}. This will override prior - * registrations if any exist. - */ - public void registerCoder(Class<?> clazz, CoderFactory coderFactory) { - coderFactoryMap.put(clazz, coderFactory); - } - - /** - * Register the provided {@link Coder} for encoding all values of the specified {@code Class}. - * This will override prior registrations if any exist. - * - * <p>Not for use with generic rawtypes. Instead, register a {@link CoderFactory} via - * {@link #registerCoder(Class, CoderFactory)} or ensure your {@code Coder} class has the - * appropriate static methods and register it directly via {@link #registerCoder(Class, Class)}. - */ - public <T> void registerCoder(Class<T> rawClazz, Coder<T> coder) { - Preconditions.checkArgument( - rawClazz.getTypeParameters().length == 0, - "CoderRegistry.registerCoder(Class<T>, Coder<T>) may not be used " - + "with unspecialized generic classes"); - - CoderFactory factory = CoderFactories.forCoder(coder); - registerCoder(rawClazz, factory); - } - - /** - * Returns the {@link Coder} to use by default for values of the given type. - * - * @throws CannotProvideCoderException if there is no default Coder. - */ - public <T> Coder<T> getDefaultCoder(TypeDescriptor<T> typeDescriptor) - throws CannotProvideCoderException { - return getDefaultCoder(typeDescriptor, Collections.<Type, Coder<?>>emptyMap()); - } - - /** - * See {@link #getDefaultCoder(TypeDescriptor)}. - */ - @Override - public <T> Coder<T> getCoder(TypeDescriptor<T> typeDescriptor) - throws CannotProvideCoderException { - return getDefaultCoder(typeDescriptor); - } - - /** - * Returns the {@link Coder} to use by default for values of the given type, where the given input - * type uses the given {@link Coder}. - * - * @throws CannotProvideCoderException if there is no default Coder. - */ - public <InputT, OutputT> Coder<OutputT> getDefaultCoder( - TypeDescriptor<OutputT> typeDescriptor, - TypeDescriptor<InputT> inputTypeDescriptor, - Coder<InputT> inputCoder) - throws CannotProvideCoderException { - return getDefaultCoder( - typeDescriptor, getTypeToCoderBindings(inputTypeDescriptor.getType(), inputCoder)); - } - - /** - * Returns the {@link Coder} to use on elements produced by this function, given the {@link Coder} - * used for its input elements. - */ - public <InputT, OutputT> Coder<OutputT> getDefaultOutputCoder( - SerializableFunction<InputT, OutputT> fn, Coder<InputT> inputCoder) - throws CannotProvideCoderException { - - ParameterizedType fnType = (ParameterizedType) - TypeDescriptor.of(fn.getClass()).getSupertype(SerializableFunction.class).getType(); - - return getDefaultCoder( - fn.getClass(), - SerializableFunction.class, - ImmutableMap.of(fnType.getActualTypeArguments()[0], inputCoder), - SerializableFunction.class.getTypeParameters()[1]); - } - - /** - * Returns the {@link Coder} to use for the specified type parameter specialization of the - * subclass, given {@link Coder Coders} to use for all other type parameters (if any). - * - * @throws CannotProvideCoderException if there is no default Coder. - */ - public <T, OutputT> Coder<OutputT> getDefaultCoder( - Class<? extends T> subClass, - Class<T> baseClass, - Map<Type, ? extends Coder<?>> knownCoders, - TypeVariable<?> param) - throws CannotProvideCoderException { - - Map<Type, Coder<?>> inferredCoders = getDefaultCoders(subClass, baseClass, knownCoders); - - @SuppressWarnings("unchecked") - Coder<OutputT> paramCoderOrNull = (Coder<OutputT>) inferredCoders.get(param); - if (paramCoderOrNull != null) { - return paramCoderOrNull; - } else { - throw new CannotProvideCoderException( - "Cannot infer coder for type parameter " + param.getName()); - } - } - - /** - * Returns the {@link Coder} to use for the provided example value, if it can be determined. - * - * @throws CannotProvideCoderException if there is no default {@link Coder} or - * more than one {@link Coder} matches - */ - public <T> Coder<T> getDefaultCoder(T exampleValue) throws CannotProvideCoderException { - Class<?> clazz = exampleValue == null ? Void.class : exampleValue.getClass(); - - if (clazz.getTypeParameters().length == 0) { - // Trust that getDefaultCoder returns a valid - // Coder<T> for non-generic clazz. - @SuppressWarnings("unchecked") - Coder<T> coder = (Coder<T>) getDefaultCoder(clazz); - return coder; - } else { - CoderFactory factory = getDefaultCoderFactory(clazz); - - List<Object> components = factory.getInstanceComponents(exampleValue); - if (components == null) { - throw new CannotProvideCoderException(String.format( - "Cannot provide coder based on value with class %s: The registered CoderFactory with " - + "class %s failed to decompose the value, which is required in order to provide " - + "Coders for the components.", - clazz.getCanonicalName(), factory.getClass().getCanonicalName())); - } - - // componentcoders = components.map(this.getDefaultCoder) - List<Coder<?>> componentCoders = new ArrayList<>(); - for (Object component : components) { - try { - Coder<?> componentCoder = getDefaultCoder(component); - componentCoders.add(componentCoder); - } catch (CannotProvideCoderException exc) { - throw new CannotProvideCoderException( - String.format("Cannot provide coder based on value with class %s", - clazz.getCanonicalName()), - exc); - } - } - - // Trust that factory.create maps from valid component Coders - // to a valid Coder<T>. - @SuppressWarnings("unchecked") - Coder<T> coder = (Coder<T>) factory.create(componentCoders); - return coder; - } - } - - /** - * Returns the {@link Coder} to use by default for values of the given class. The following three - * sources for a {@link Coder} will be attempted, in order: - * - * <ol> - * <li>A {@link Coder} class registered explicitly via a call to {@link #registerCoder}, - * <li>A {@link DefaultCoder} annotation on the class, - * <li>This registry's fallback {@link CoderProvider}, which may be able to generate a - * {@link Coder} for an arbitrary class. - * </ol> - * - * @throws CannotProvideCoderException if a {@link Coder} cannot be provided - */ - public <T> Coder<T> getDefaultCoder(Class<T> clazz) throws CannotProvideCoderException { - - CannotProvideCoderException factoryException; - try { - CoderFactory coderFactory = getDefaultCoderFactory(clazz); - LOG.debug("Default coder for {} found by factory", clazz); - @SuppressWarnings("unchecked") - Coder<T> coder = (Coder<T>) coderFactory.create(Collections.<Coder<?>>emptyList()); - return coder; - } catch (CannotProvideCoderException exc) { - factoryException = exc; - } - - CannotProvideCoderException annotationException; - try { - return getDefaultCoderFromAnnotation(clazz); - } catch (CannotProvideCoderException exc) { - annotationException = exc; - } - - CannotProvideCoderException fallbackException; - if (getFallbackCoderProvider() != null) { - try { - return getFallbackCoderProvider().getCoder(TypeDescriptor.<T>of(clazz)); - } catch (CannotProvideCoderException exc) { - fallbackException = exc; - } - } else { - fallbackException = new CannotProvideCoderException("no fallback CoderProvider configured"); - } - - // Build up the error message and list of causes. - StringBuilder messageBuilder = new StringBuilder() - .append("Unable to provide a default Coder for ").append(clazz.getCanonicalName()) - .append(". Correct one of the following root causes:"); - - messageBuilder - .append("\n Building a Coder using a registered CoderFactory failed: ") - .append(factoryException.getMessage()); - - messageBuilder - .append("\n Building a Coder from the @DefaultCoder annotation failed: ") - .append(annotationException.getMessage()); - - messageBuilder - .append("\n Building a Coder from the fallback CoderProvider failed: ") - .append(fallbackException.getMessage()); - - throw new CannotProvideCoderException(messageBuilder.toString()); - } - - /** - * Sets the fallback {@link CoderProvider} for this registry. If no other method succeeds in - * providing a {@code Coder<T>} for a type {@code T}, then the registry will attempt to create - * a {@link Coder} using this {@link CoderProvider}. - * - * <p>By default, this is set to {@link SerializableCoder#PROVIDER}. - * - * <p>See {@link #getFallbackCoderProvider}. - */ - public void setFallbackCoderProvider(CoderProvider coderProvider) { - fallbackCoderProvider = coderProvider; - } - - /** - * Returns the fallback {@link CoderProvider} for this registry. - * - * <p>See {@link #setFallbackCoderProvider}. - */ - public CoderProvider getFallbackCoderProvider() { - return fallbackCoderProvider; - } - - /** - * Returns a {@code Map} from each of {@code baseClass}'s type parameters to the {@link Coder} to - * use by default for it, in the context of {@code subClass}'s specialization of - * {@code baseClass}. - * - * <p>If no {@link Coder} can be inferred for a particular type parameter, then that type variable - * will be absent from the returned {@code Map}. - * - * <p>For example, if {@code baseClass} is {@code Map.class}, where {@code Map<K, V>} has type - * parameters {@code K} and {@code V}, and {@code subClass} extends {@code Map<String, Integer>} - * then the result will map the type variable {@code K} to a {@code Coder<String>} and the - * type variable {@code V} to a {@code Coder<Integer>}. - * - * <p>The {@code knownCoders} parameter can be used to provide known {@link Coder Coders} for any - * of the parameters; these will be used to infer the others. - * - * <p>Note that inference is attempted for every type variable. For a type - * {@code MyType<One, Two, Three>} inference will be attempted for all of {@code One}, - * {@code Two}, {@code Three}, even if the requester only wants a {@link Coder} for {@code Two}. - * - * <p>For this reason {@code getDefaultCoders} (plural) does not throw an exception if a - * {@link Coder} for a particular type variable cannot be inferred, but merely omits the entry - * from the returned {@code Map}. It is the responsibility of the caller (usually - * {@link #getDefaultCoder} to extract the desired coder or throw a - * {@link CannotProvideCoderException} when appropriate. - * - * @param subClass the concrete type whose specializations are being inferred - * @param baseClass the base type, a parameterized class - * @param knownCoders a map corresponding to the set of known {@link Coder Coders} indexed by - * parameter name - * - * @deprecated this method is not part of the public interface and will be made private - */ - @Deprecated - public <T> Map<Type, Coder<?>> getDefaultCoders( - Class<? extends T> subClass, - Class<T> baseClass, - Map<Type, ? extends Coder<?>> knownCoders) { - TypeVariable<Class<T>>[] typeParams = baseClass.getTypeParameters(); - Coder<?>[] knownCodersArray = new Coder<?>[typeParams.length]; - for (int i = 0; i < typeParams.length; i++) { - knownCodersArray[i] = knownCoders.get(typeParams[i]); - } - Coder<?>[] resultArray = getDefaultCoders( - subClass, baseClass, knownCodersArray); - Map<Type, Coder<?>> result = new HashMap<>(); - for (int i = 0; i < typeParams.length; i++) { - if (resultArray[i] != null) { - result.put(typeParams[i], resultArray[i]); - } - } - return result; - } - - /** - * Returns an array listing, for each of {@code baseClass}'s type parameters, the {@link Coder} to - * use by default for it, in the context of {@code subClass}'s specialization of - * {@code baseClass}. - * - * <p>If a {@link Coder} cannot be inferred for a type variable, its slot in the resulting array - * will be {@code null}. - * - * <p>For example, if {@code baseClass} is {@code Map.class}, where {@code Map<K, V>} has type - * parameters {@code K} and {@code V} in that order, and {@code subClass} extends - * {@code Map<String, Integer>} then the result will contain a {@code Coder<String>} and a - * {@code Coder<Integer>}, in that order. - * - * <p>The {@code knownCoders} parameter can be used to provide known {@link Coder Coders} for any - * of the type parameters. These will be used to infer the others. If non-null, the length of this - * array must match the number of type parameters of {@code baseClass}, and simply be filled with - * {@code null} values for each type parameters without a known {@link Coder}. - * - * <p>Note that inference is attempted for every type variable. For a type - * {@code MyType<One, Two, Three>} inference will will be attempted for all of {@code One}, - * {@code Two}, {@code Three}, even if the requester only wants a {@link Coder} for {@code Two}. - * - * <p>For this reason {@code getDefaultCoders} (plural) does not throw an exception if a - * {@link Coder} for a particular type variable cannot be inferred. Instead, it results in a - * {@code null} in the array. It is the responsibility of the caller (usually - * {@link #getDefaultCoder} to extract the desired coder or throw a - * {@link CannotProvideCoderException} when appropriate. - * - * @param subClass the concrete type whose specializations are being inferred - * @param baseClass the base type, a parameterized class - * @param knownCoders an array corresponding to the set of base class type parameters. Each entry - * can be either a {@link Coder} (in which case it will be used for inference) or - * {@code null} (in which case it will be inferred). May be {@code null} to indicate the - * entire set of parameters should be inferred. - * @throws IllegalArgumentException if baseClass doesn't have type parameters or if the length of - * {@code knownCoders} is not equal to the number of type parameters of {@code baseClass}. - */ - private <T> Coder<?>[] getDefaultCoders( - Class<? extends T> subClass, - Class<T> baseClass, - @Nullable Coder<?>[] knownCoders) { - Type type = TypeDescriptor.of(subClass).getSupertype(baseClass).getType(); - if (!(type instanceof ParameterizedType)) { - throw new IllegalArgumentException(type + " is not a ParameterizedType"); - } - ParameterizedType parameterizedType = (ParameterizedType) type; - Type[] typeArgs = parameterizedType.getActualTypeArguments(); - if (knownCoders == null) { - knownCoders = new Coder<?>[typeArgs.length]; - } else if (typeArgs.length != knownCoders.length) { - throw new IllegalArgumentException( - String.format("Class %s has %d parameters, but %d coders are requested.", - baseClass.getCanonicalName(), typeArgs.length, knownCoders.length)); - } - - Map<Type, Coder<?>> context = new HashMap<>(); - for (int i = 0; i < knownCoders.length; i++) { - if (knownCoders[i] != null) { - try { - verifyCompatible(knownCoders[i], typeArgs[i]); - } catch (IncompatibleCoderException exn) { - throw new IllegalArgumentException( - String.format("Provided coders for type arguments of %s contain incompatibilities:" - + " Cannot encode elements of type %s with coder %s", - baseClass, - typeArgs[i], knownCoders[i]), - exn); - } - context.putAll(getTypeToCoderBindings(typeArgs[i], knownCoders[i])); - } - } - - Coder<?>[] result = new Coder<?>[typeArgs.length]; - for (int i = 0; i < knownCoders.length; i++) { - if (knownCoders[i] != null) { - result[i] = knownCoders[i]; - } else { - try { - result[i] = getDefaultCoder(typeArgs[i], context); - } catch (CannotProvideCoderException exc) { - result[i] = null; - } - } - } - return result; - } - - - ///////////////////////////////////////////////////////////////////////////// - - /** - * Thrown when a {@link Coder} cannot possibly encode a type, yet has been proposed as a - * {@link Coder} for that type. - */ - @VisibleForTesting static class IncompatibleCoderException extends RuntimeException { - private Coder<?> coder; - private Type type; - - public IncompatibleCoderException(String message, Coder<?> coder, Type type) { - super(message); - this.coder = coder; - this.type = type; - } - - public IncompatibleCoderException(String message, Coder<?> coder, Type type, Throwable cause) { - super(message, cause); - this.coder = coder; - this.type = type; - } - - public Coder<?> getCoder() { - return coder; - } - - public Type getType() { - return type; - } - } - - /** - * Returns {@code true} if the given {@link Coder} can possibly encode elements - * of the given type. - */ - @VisibleForTesting static <T, CoderT extends Coder<T>, CandidateT> - void verifyCompatible(CoderT coder, Type candidateType) throws IncompatibleCoderException { - - // Various representations of the coder's class - @SuppressWarnings("unchecked") - Class<CoderT> coderClass = (Class<CoderT>) coder.getClass(); - TypeDescriptor<CoderT> coderDescriptor = TypeDescriptor.of(coderClass); - - // Various representations of the actual coded type - @SuppressWarnings("unchecked") - TypeDescriptor<T> codedDescriptor = CoderUtils.getCodedType(coderDescriptor); - @SuppressWarnings("unchecked") - Class<T> codedClass = (Class<T>) codedDescriptor.getRawType(); - Type codedType = codedDescriptor.getType(); - - // Various representations of the candidate type - @SuppressWarnings("unchecked") - TypeDescriptor<CandidateT> candidateDescriptor = - (TypeDescriptor<CandidateT>) TypeDescriptor.of(candidateType); - @SuppressWarnings("unchecked") - Class<CandidateT> candidateClass = (Class<CandidateT>) candidateDescriptor.getRawType(); - - // If coder has type Coder<T> where the actual value of T is lost - // to erasure, then we cannot rule it out. - if (candidateType instanceof TypeVariable) { - return; - } - - // If the raw types are not compatible, we can certainly rule out - // coder compatibility - if (!codedClass.isAssignableFrom(candidateClass)) { - throw new IncompatibleCoderException( - String.format("Cannot encode elements of type %s with coder %s because the" - + " coded type %s is not assignable from %s", - candidateType, coder, codedClass, candidateType), - coder, candidateType); - } - // we have established that this is a covariant upcast... though - // coders are invariant, we are just checking one direction - @SuppressWarnings("unchecked") - TypeDescriptor<T> candidateOkDescriptor = (TypeDescriptor<T>) candidateDescriptor; - - // If the coded type is a parameterized type where any of the actual - // type parameters are not compatible, then the whole thing is certainly not - // compatible. - if ((codedType instanceof ParameterizedType) && !isNullOrEmpty(coder.getCoderArguments())) { - ParameterizedType parameterizedSupertype = ((ParameterizedType) - candidateOkDescriptor.getSupertype(codedClass).getType()); - Type[] typeArguments = parameterizedSupertype.getActualTypeArguments(); - List<? extends Coder<?>> typeArgumentCoders = coder.getCoderArguments(); - if (typeArguments.length < typeArgumentCoders.size()) { - throw new IncompatibleCoderException( - String.format("Cannot encode elements of type %s with coder %s:" - + " the generic supertype %s has %s type parameters, which is less than the" - + " number of coder arguments %s has (%s).", - candidateOkDescriptor, coder, - parameterizedSupertype, typeArguments.length, - coder, typeArgumentCoders.size()), - coder, candidateOkDescriptor.getType()); - } - for (int i = 0; i < typeArgumentCoders.size(); i++) { - try { - verifyCompatible( - typeArgumentCoders.get(i), - candidateDescriptor.resolveType(typeArguments[i]).getType()); - } catch (IncompatibleCoderException exn) { - throw new IncompatibleCoderException( - String.format("Cannot encode elements of type %s with coder %s" - + " because some component coder is incompatible", - candidateType, coder), - coder, candidateType, exn); - } - } - } - } - - private static boolean isNullOrEmpty(Collection<?> c) { - return c == null || c.size() == 0; - } - - /** - * The map of classes to the CoderFactories to use to create their - * default Coders. - */ - private Map<Class<?>, CoderFactory> coderFactoryMap = new HashMap<>(); - - /** - * A provider of coders for types where no coder is registered. - */ - private CoderProvider fallbackCoderProvider; - - /** - * Returns the {@link CoderFactory} to use to create default {@link Coder Coders} for instances of - * the given class, or {@code null} if there is no default {@link CoderFactory} registered. - */ - private CoderFactory getDefaultCoderFactory(Class<?> clazz) throws CannotProvideCoderException { - CoderFactory coderFactoryOrNull = coderFactoryMap.get(clazz); - if (coderFactoryOrNull != null) { - return coderFactoryOrNull; - } else { - throw new CannotProvideCoderException( - String.format("Cannot provide coder based on value with class %s: No CoderFactory has " - + "been registered for the class.", clazz.getCanonicalName())); - } - } - - /** - * Returns the {@link Coder} returned according to the {@link CoderProvider} from any - * {@link DefaultCoder} annotation on the given class. - */ - private <T> Coder<T> getDefaultCoderFromAnnotation(Class<T> clazz) - throws CannotProvideCoderException { - DefaultCoder defaultAnnotation = clazz.getAnnotation(DefaultCoder.class); - if (defaultAnnotation == null) { - throw new CannotProvideCoderException( - String.format("Class %s does not have a @DefaultCoder annotation.", - clazz.getCanonicalName())); - } - - LOG.debug("DefaultCoder annotation found for {} with value {}", - clazz, defaultAnnotation.value()); - CoderProvider coderProvider = CoderProviders.fromStaticMethods(defaultAnnotation.value()); - return coderProvider.getCoder(TypeDescriptor.of(clazz)); - } - - /** - * Returns the {@link Coder} to use by default for values of the given type, - * in a context where the given types use the given coders. - * - * @throws CannotProvideCoderException if a coder cannot be provided - */ - private <T> Coder<T> getDefaultCoder( - TypeDescriptor<T> typeDescriptor, - Map<Type, Coder<?>> typeCoderBindings) - throws CannotProvideCoderException { - - Coder<?> defaultCoder = getDefaultCoder(typeDescriptor.getType(), typeCoderBindings); - LOG.debug("Default coder for {}: {}", typeDescriptor, defaultCoder); - @SuppressWarnings("unchecked") - Coder<T> result = (Coder<T>) defaultCoder; - return result; - } - - /** - * Returns the {@link Coder} to use by default for values of the given type, - * in a context where the given types use the given coders. - * - * @throws CannotProvideCoderException if a coder cannot be provided - */ - private Coder<?> getDefaultCoder(Type type, Map<Type, Coder<?>> typeCoderBindings) - throws CannotProvideCoderException { - Coder<?> coder = typeCoderBindings.get(type); - if (coder != null) { - return coder; - } - if (type instanceof Class<?>) { - Class<?> clazz = (Class<?>) type; - return getDefaultCoder(clazz); - } else if (type instanceof ParameterizedType) { - return getDefaultCoder((ParameterizedType) type, typeCoderBindings); - } else if (type instanceof TypeVariable || type instanceof WildcardType) { - // No default coder for an unknown generic type. - throw new CannotProvideCoderException( - String.format("Cannot provide a coder for type variable %s" - + " (declared by %s) because the actual type is unknown due to erasure.", - type, - ((TypeVariable<?>) type).getGenericDeclaration()), - ReasonCode.TYPE_ERASURE); - } else { - throw new RuntimeException( - "Internal error: unexpected kind of Type: " + type); - } - } - - /** - * Returns the {@link Coder} to use by default for values of the given - * parameterized type, in a context where the given types use the - * given {@link Coder Coders}. - * - * @throws CannotProvideCoderException if no coder can be provided - */ - private Coder<?> getDefaultCoder( - ParameterizedType type, - Map<Type, Coder<?>> typeCoderBindings) - throws CannotProvideCoderException { - - CannotProvideCoderException factoryException; - try { - return getDefaultCoderFromFactory(type, typeCoderBindings); - } catch (CannotProvideCoderException exc) { - factoryException = exc; - } - - CannotProvideCoderException annotationException; - try { - Class<?> rawClazz = (Class<?>) type.getRawType(); - return getDefaultCoderFromAnnotation(rawClazz); - } catch (CannotProvideCoderException exc) { - annotationException = exc; - } - - // Build up the error message and list of causes. - StringBuilder messageBuilder = new StringBuilder() - .append("Unable to provide a default Coder for ").append(type) - .append(". Correct one of the following root causes:"); - - messageBuilder - .append("\n Building a Coder using a registered CoderFactory failed: ") - .append(factoryException.getMessage()); - - messageBuilder - .append("\n Building a Coder from the @DefaultCoder annotation failed: ") - .append(annotationException.getMessage()); - - throw new CannotProvideCoderException(messageBuilder.toString()); - } - - private Coder<?> getDefaultCoderFromFactory( - ParameterizedType type, - Map<Type, Coder<?>> typeCoderBindings) - throws CannotProvideCoderException { - Class<?> rawClazz = (Class<?>) type.getRawType(); - CoderFactory coderFactory = getDefaultCoderFactory(rawClazz); - List<Coder<?>> typeArgumentCoders = new ArrayList<>(); - for (Type typeArgument : type.getActualTypeArguments()) { - try { - Coder<?> typeArgumentCoder = getDefaultCoder(typeArgument, - typeCoderBindings); - typeArgumentCoders.add(typeArgumentCoder); - } catch (CannotProvideCoderException exc) { - throw new CannotProvideCoderException( - String.format("Cannot provide coder for parameterized type %s: %s", - type, - exc.getMessage()), - exc); - } - } - return coderFactory.create(typeArgumentCoders); - } - - /** - * Returns an immutable {@code Map} from each of the type variables - * embedded in the given type to the corresponding types - * in the given {@link Coder}. - */ - private Map<Type, Coder<?>> getTypeToCoderBindings(Type type, Coder<?> coder) { - if (type instanceof TypeVariable || type instanceof Class) { - return ImmutableMap.<Type, Coder<?>>of(type, coder); - } else if (type instanceof ParameterizedType) { - return getTypeToCoderBindings((ParameterizedType) type, coder); - } else { - return ImmutableMap.of(); - } - } - - /** - * Returns an immutable {@code Map} from the type arguments of the parameterized type to their - * corresponding {@link Coder Coders}, and so on recursively for their type parameters. - * - * <p>This method is simply a specialization to break out the most - * elaborate case of {@link #getTypeToCoderBindings(Type, Coder)}. - */ - private Map<Type, Coder<?>> getTypeToCoderBindings(ParameterizedType type, Coder<?> coder) { - List<Type> typeArguments = Arrays.asList(type.getActualTypeArguments()); - List<? extends Coder<?>> coderArguments = coder.getCoderArguments(); - - if ((coderArguments == null) || (typeArguments.size() != coderArguments.size())) { - return ImmutableMap.of(); - } else { - Map<Type, Coder<?>> typeToCoder = Maps.newHashMap(); - - typeToCoder.put(type, coder); - - for (int i = 0; i < typeArguments.size(); i++) { - Type typeArgument = typeArguments.get(i); - Coder<?> coderArgument = coderArguments.get(i); - typeToCoder.putAll(getTypeToCoderBindings(typeArgument, coderArgument)); - } - - return ImmutableMap.<Type, Coder<?>>builder().putAll(typeToCoder).build(); - } - - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/CollectionCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/CollectionCoder.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/CollectionCoder.java deleted file mode 100644 index 728f761..0000000 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/CollectionCoder.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.google.cloud.dataflow.sdk.coders; - -import com.google.cloud.dataflow.sdk.util.PropertyNames; -import com.google.common.base.Preconditions; - -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; - -import java.util.Collection; -import java.util.List; - -/** - * A {@link CollectionCoder} encodes {@link Collection Collections} in the format - * of {@link IterableLikeCoder}. - */ -public class CollectionCoder<T> extends IterableLikeCoder<T, Collection<T>> { - - public static <T> CollectionCoder<T> of(Coder<T> elemCoder) { - return new CollectionCoder<>(elemCoder); - } - - ///////////////////////////////////////////////////////////////////////////// - // Internal operations below here. - - /** - * {@inheritDoc} - * - * @return the decoded elements directly, since {@link List} is a subtype of - * {@link Collection}. - */ - @Override - protected final Collection<T> decodeToIterable(List<T> decodedElements) { - return decodedElements; - } - - @JsonCreator - public static CollectionCoder<?> of( - @JsonProperty(PropertyNames.COMPONENT_ENCODINGS) - List<Object> components) { - Preconditions.checkArgument(components.size() == 1, - "Expecting 1 component, got " + components.size()); - return of((Coder<?>) components.get(0)); - } - - /** - * Returns the first element in this collection if it is non-empty, - * otherwise returns {@code null}. - */ - public static <T> List<Object> getInstanceComponents( - Collection<T> exampleValue) { - return getInstanceComponentsHelper(exampleValue); - } - - protected CollectionCoder(Coder<T> elemCoder) { - super(elemCoder, "Collection"); - } -}