http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/Proto2Coder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/Proto2Coder.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/Proto2Coder.java deleted file mode 100644 index 1ce5fe5..0000000 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/Proto2Coder.java +++ /dev/null @@ -1,363 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.google.cloud.dataflow.sdk.coders; - -import static com.google.common.base.Preconditions.checkArgument; - -import com.google.cloud.dataflow.sdk.coders.protobuf.ProtoCoder; -import com.google.cloud.dataflow.sdk.util.CloudObject; -import com.google.cloud.dataflow.sdk.util.Structs; -import com.google.cloud.dataflow.sdk.values.TypeDescriptor; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; -import com.google.protobuf.ExtensionRegistry; -import com.google.protobuf.Message; -import com.google.protobuf.Parser; - -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; -import java.lang.reflect.Modifier; -import java.util.Collections; -import java.util.List; -import java.util.Objects; - -import javax.annotation.Nullable; - -/** - * A {@link Coder} using Google Protocol Buffers 2 binary format. - * - * <p>To learn more about Protocol Buffers, visit: - * <a href="https://developers.google.com/protocol-buffers">https://developers.google.com/protocol-buffers</a> - * - * <p>To use, specify the {@link Coder} type on a PCollection containing Protocol Buffers messages. - * - * <pre> - * {@code - * PCollection<MyProto.Message> records = - * input.apply(...) - * .setCoder(Proto2Coder.of(MyProto.Message.class)); - * } - * </pre> - * - * <p>Custom message extensions are also supported, but the coder must be made - * aware of them explicitly: - * - * <pre> - * {@code - * PCollection<MyProto.Message> records = - * input.apply(...) 
- * .setCoder(Proto2Coder.of(MyProto.Message.class) - * .addExtensionsFrom(MyProto.class)); - * } - * </pre> - * - * @param <T> the type of elements handled by this coder, must extend {@code Message} - * @deprecated Use {@link ProtoCoder}. - */ -@Deprecated -public class Proto2Coder<T extends Message> extends AtomicCoder<T> { - - /** The class of Protobuf message to be encoded. */ - private final Class<T> protoMessageClass; - - /** - * All extension host classes included in this Proto2Coder. The extensions from - * these classes will be included in the {@link ExtensionRegistry} used during - * encoding and decoding. - */ - private final List<Class<?>> extensionHostClasses; - - private Proto2Coder(Class<T> protoMessageClass, List<Class<?>> extensionHostClasses) { - this.protoMessageClass = protoMessageClass; - this.extensionHostClasses = extensionHostClasses; - } - - private static final CoderProvider PROVIDER = - new CoderProvider() { - @Override - public <T> Coder<T> getCoder(TypeDescriptor<T> type) throws CannotProvideCoderException { - if (type.isSubtypeOf(new TypeDescriptor<Message>() {})) { - @SuppressWarnings("unchecked") - TypeDescriptor<? extends Message> messageType = - (TypeDescriptor<? extends Message>) type; - @SuppressWarnings("unchecked") - Coder<T> coder = (Coder<T>) Proto2Coder.of(messageType); - return coder; - } else { - throw new CannotProvideCoderException( - String.format( - "Cannot provide Proto2Coder because %s " - + "is not a subclass of protocol buffer Messsage", - type)); - } - } - }; - - public static CoderProvider coderProvider() { - return PROVIDER; - } - - /** - * Returns a {@code Proto2Coder} for the given Protobuf message class. - */ - public static <T extends Message> Proto2Coder<T> of(Class<T> protoMessageClass) { - return new Proto2Coder<T>(protoMessageClass, Collections.<Class<?>>emptyList()); - } - - /** - * Returns a {@code Proto2Coder} for the given Protobuf message class. 
- */ - public static <T extends Message> Proto2Coder<T> of(TypeDescriptor<T> protoMessageType) { - @SuppressWarnings("unchecked") - Class<T> protoMessageClass = (Class<T>) protoMessageType.getRawType(); - return of(protoMessageClass); - } - - /** - * Produces a {@code Proto2Coder} like this one, but with the extensions from - * the given classes registered. - * - * @param moreExtensionHosts an iterable of classes that define a static - * method {@code registerAllExtensions(ExtensionRegistry)} - */ - public Proto2Coder<T> withExtensionsFrom(Iterable<Class<?>> moreExtensionHosts) { - for (Class<?> extensionHost : moreExtensionHosts) { - // Attempt to access the required method, to make sure it's present. - try { - Method registerAllExtensions = - extensionHost.getDeclaredMethod("registerAllExtensions", ExtensionRegistry.class); - checkArgument( - Modifier.isStatic(registerAllExtensions.getModifiers()), - "Method registerAllExtensions() must be static for use with Proto2Coder"); - } catch (NoSuchMethodException | SecurityException e) { - throw new IllegalArgumentException(e); - } - } - - return new Proto2Coder<T>( - protoMessageClass, - new ImmutableList.Builder<Class<?>>() - .addAll(extensionHostClasses) - .addAll(moreExtensionHosts) - .build()); - } - - /** - * See {@link #withExtensionsFrom(Iterable)}. - */ - public Proto2Coder<T> withExtensionsFrom(Class<?>... extensionHosts) { - return withExtensionsFrom(ImmutableList.copyOf(extensionHosts)); - } - - /** - * Adds custom Protobuf extensions to the coder. Returns {@code this} - * for method chaining. - * - * @param extensionHosts must be a class that defines a static - * method name {@code registerAllExtensions} - * @deprecated use {@link #withExtensionsFrom} - */ - @Deprecated - public Proto2Coder<T> addExtensionsFrom(Class<?>... extensionHosts) { - return addExtensionsFrom(ImmutableList.copyOf(extensionHosts)); - } - - /** - * Adds custom Protobuf extensions to the coder. 
Returns {@code this} - * for method chaining. - * - * @param extensionHosts must be a class that defines a static - * method name {@code registerAllExtensions} - * @deprecated use {@link #withExtensionsFrom} - */ - @Deprecated - public Proto2Coder<T> addExtensionsFrom(Iterable<Class<?>> extensionHosts) { - for (Class<?> extensionHost : extensionHosts) { - try { - // Attempt to access the declared method, to make sure it's present. - extensionHost.getDeclaredMethod("registerAllExtensions", ExtensionRegistry.class); - } catch (NoSuchMethodException e) { - throw new IllegalArgumentException(e); - } - extensionHostClasses.add(extensionHost); - } - // The memoized extension registry needs to be recomputed because we have mutated this object. - synchronized (this) { - memoizedExtensionRegistry = null; - getExtensionRegistry(); - } - return this; - } - - @Override - public void encode(T value, OutputStream outStream, Context context) throws IOException { - if (value == null) { - throw new CoderException("cannot encode a null " + protoMessageClass.getSimpleName()); - } - if (context.isWholeStream) { - value.writeTo(outStream); - } else { - value.writeDelimitedTo(outStream); - } - } - - @Override - public T decode(InputStream inStream, Context context) throws IOException { - if (context.isWholeStream) { - return getParser().parseFrom(inStream, getExtensionRegistry()); - } else { - return getParser().parseDelimitedFrom(inStream, getExtensionRegistry()); - } - } - - @Override - public boolean equals(Object other) { - if (this == other) { - return true; - } - if (!(other instanceof Proto2Coder)) { - return false; - } - Proto2Coder<?> otherCoder = (Proto2Coder<?>) other; - return protoMessageClass.equals(otherCoder.protoMessageClass) - && Sets.newHashSet(extensionHostClasses) - .equals(Sets.newHashSet(otherCoder.extensionHostClasses)); - } - - @Override - public int hashCode() { - return Objects.hash(protoMessageClass, extensionHostClasses); - } - - /** - * The encoding 
identifier is designed to support evolution as per the design of Protocol - * Buffers. In order to use this class effectively, carefully follow the advice in the Protocol - * Buffers documentation at - * <a href="https://developers.google.com/protocol-buffers/docs/proto#updating">Updating - * A Message Type</a>. - * - * <p>In particular, the encoding identifier is guaranteed to be the same for {@code Proto2Coder} - * instances of the same principal message class, and otherwise distinct. Loaded extensions do not - * affect the id, nor does it encode the full schema. - * - * <p>When modifying a message class, here are the broadest guidelines; see the above link - * for greater detail. - * - * <ul> - * <li>Do not change the numeric tags for any fields. - * <li>Never remove a <code>required</code> field. - * <li>Only add <code>optional</code> or <code>repeated</code> fields, with sensible defaults. - * <li>When changing the type of a field, consult the Protocol Buffers documentation to ensure - * the new and old types are interchangeable. - * </ul> - * - * <p>Code consuming this message class should be prepared to support <i>all</i> versions of - * the class until it is certain that no remaining serialized instances exist. - * - * <p>If backwards incompatible changes must be made, the best recourse is to change the name - * of your Protocol Buffers message class. 
- */ - @Override - public String getEncodingId() { - return protoMessageClass.getName(); - } - - private transient Parser<T> memoizedParser; - - private Parser<T> getParser() { - if (memoizedParser == null) { - try { - @SuppressWarnings("unchecked") - T protoMessageInstance = (T) protoMessageClass.getMethod("getDefaultInstance").invoke(null); - @SuppressWarnings("unchecked") - Parser<T> tParser = (Parser<T>) protoMessageInstance.getParserForType(); - memoizedParser = tParser; - } catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) { - throw new IllegalArgumentException(e); - } - } - return memoizedParser; - } - - private transient ExtensionRegistry memoizedExtensionRegistry; - - private synchronized ExtensionRegistry getExtensionRegistry() { - if (memoizedExtensionRegistry == null) { - ExtensionRegistry registry = ExtensionRegistry.newInstance(); - for (Class<?> extensionHost : extensionHostClasses) { - try { - extensionHost - .getDeclaredMethod("registerAllExtensions", ExtensionRegistry.class) - .invoke(null, registry); - } catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) { - throw new IllegalStateException(e); - } - } - memoizedExtensionRegistry = registry.getUnmodifiable(); - } - return memoizedExtensionRegistry; - } - - //////////////////////////////////////////////////////////////////////////////////// - // JSON Serialization details below - - private static final String PROTO_MESSAGE_CLASS = "proto_message_class"; - private static final String PROTO_EXTENSION_HOSTS = "proto_extension_hosts"; - - /** - * Constructor for JSON deserialization only. 
- */ - @JsonCreator - public static <T extends Message> Proto2Coder<T> of( - @JsonProperty(PROTO_MESSAGE_CLASS) String protoMessageClassName, - @Nullable @JsonProperty(PROTO_EXTENSION_HOSTS) List<String> extensionHostClassNames) { - - try { - @SuppressWarnings("unchecked") - Class<T> protoMessageClass = (Class<T>) Class.forName(protoMessageClassName); - List<Class<?>> extensionHostClasses = Lists.newArrayList(); - if (extensionHostClassNames != null) { - for (String extensionHostClassName : extensionHostClassNames) { - extensionHostClasses.add(Class.forName(extensionHostClassName)); - } - } - return of(protoMessageClass).withExtensionsFrom(extensionHostClasses); - } catch (ClassNotFoundException e) { - throw new IllegalArgumentException(e); - } - } - - @Override - public CloudObject asCloudObject() { - CloudObject result = super.asCloudObject(); - Structs.addString(result, PROTO_MESSAGE_CLASS, protoMessageClass.getName()); - List<CloudObject> extensionHostClassNames = Lists.newArrayList(); - for (Class<?> clazz : extensionHostClasses) { - extensionHostClassNames.add(CloudObject.forString(clazz.getName())); - } - Structs.addList(result, PROTO_EXTENSION_HOSTS, extensionHostClassNames); - return result; - } -}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/SerializableCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/SerializableCoder.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/SerializableCoder.java deleted file mode 100644 index 1590b11..0000000 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/SerializableCoder.java +++ /dev/null @@ -1,184 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.google.cloud.dataflow.sdk.coders; - -import com.google.cloud.dataflow.sdk.util.CloudObject; -import com.google.cloud.dataflow.sdk.values.TypeDescriptor; - -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; - -import java.io.IOException; -import java.io.InputStream; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; -import java.io.ObjectStreamClass; -import java.io.OutputStream; -import java.io.Serializable; - -/** - * A {@link Coder} for Java classes that implement {@link Serializable}. 
- * - * <p>To use, specify the coder type on a PCollection: - * <pre> - * {@code - * PCollection<MyRecord> records = - * foo.apply(...).setCoder(SerializableCoder.of(MyRecord.class)); - * } - * </pre> - * - * <p>{@link SerializableCoder} does not guarantee a deterministic encoding, as Java - * serialization may produce different binary encodings for two equivalent - * objects. - * - * @param <T> the type of elements handled by this coder - */ -public class SerializableCoder<T extends Serializable> extends AtomicCoder<T> { - - /** - * Returns a {@link SerializableCoder} instance for the provided element type. - * @param <T> the element type - */ - public static <T extends Serializable> SerializableCoder<T> of(TypeDescriptor<T> type) { - @SuppressWarnings("unchecked") - Class<T> clazz = (Class<T>) type.getRawType(); - return of(clazz); - } - - /** - * Returns a {@link SerializableCoder} instance for the provided element class. - * @param <T> the element type - */ - public static <T extends Serializable> SerializableCoder<T> of(Class<T> clazz) { - return new SerializableCoder<>(clazz); - } - - @JsonCreator - @SuppressWarnings("unchecked") - public static SerializableCoder<?> of(@JsonProperty("type") String classType) - throws ClassNotFoundException { - Class<?> clazz = Class.forName(classType); - if (!Serializable.class.isAssignableFrom(clazz)) { - throw new ClassNotFoundException( - "Class " + classType + " does not implement Serializable"); - } - return of((Class<? extends Serializable>) clazz); - } - - /** - * A {@link CoderProvider} that constructs a {@link SerializableCoder} - * for any class that implements serializable. - */ - public static final CoderProvider PROVIDER = new CoderProvider() { - @Override - public <T> Coder<T> getCoder(TypeDescriptor<T> typeDescriptor) - throws CannotProvideCoderException { - Class<?> clazz = typeDescriptor.getRawType(); - if (Serializable.class.isAssignableFrom(clazz)) { - @SuppressWarnings("unchecked") - Class<? 
extends Serializable> serializableClazz = - (Class<? extends Serializable>) clazz; - @SuppressWarnings("unchecked") - Coder<T> coder = (Coder<T>) SerializableCoder.of(serializableClazz); - return coder; - } else { - throw new CannotProvideCoderException( - "Cannot provide SerializableCoder because " + typeDescriptor - + " does not implement Serializable"); - } - } - }; - - - private final Class<T> type; - - protected SerializableCoder(Class<T> type) { - this.type = type; - } - - public Class<T> getRecordType() { - return type; - } - - @Override - public void encode(T value, OutputStream outStream, Context context) - throws IOException, CoderException { - try { - ObjectOutputStream oos = new ObjectOutputStream(outStream); - oos.writeObject(value); - oos.flush(); - } catch (IOException exn) { - throw new CoderException("unable to serialize record " + value, exn); - } - } - - @Override - public T decode(InputStream inStream, Context context) - throws IOException, CoderException { - try { - ObjectInputStream ois = new ObjectInputStream(inStream); - return type.cast(ois.readObject()); - } catch (ClassNotFoundException e) { - throw new CoderException("unable to deserialize record", e); - } - } - - @Override - public String getEncodingId() { - return String.format("%s:%s", - type.getName(), - ObjectStreamClass.lookup(type).getSerialVersionUID()); - } - - @Override - public CloudObject asCloudObject() { - CloudObject result = super.asCloudObject(); - result.put("type", type.getName()); - return result; - } - - /** - * {@inheritDoc} - * - * @throws NonDeterministicException always. Java serialization is not - * deterministic with respect to {@link Object#equals} for all types. 
- */ - @Override - public void verifyDeterministic() throws NonDeterministicException { - throw new NonDeterministicException(this, - "Java Serialization may be non-deterministic."); - } - - @Override - public boolean equals(Object other) { - if (getClass() != other.getClass()) { - return false; - } - return type == ((SerializableCoder<?>) other).type; - } - - @Override - public int hashCode() { - return type.hashCode(); - } - - // This coder inherits isRegisterByteSizeObserverCheap, - // getEncodedElementByteSize and registerByteSizeObserver - // from StandardCoder. Looks like we cannot do much better - // in this case. -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/SetCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/SetCoder.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/SetCoder.java deleted file mode 100644 index 589a372..0000000 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/SetCoder.java +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.google.cloud.dataflow.sdk.coders; - -import com.google.cloud.dataflow.sdk.util.PropertyNames; -import com.google.common.base.Preconditions; - -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; - -import java.util.HashSet; -import java.util.List; -import java.util.Set; - -/** - * A {@link SetCoder} encodes any {@link Set} using the format of {@link IterableLikeCoder}. The - * elements may not be in a deterministic order, depending on the {@code Set} implementation. - * - * @param <T> the type of the elements of the set - */ -public class SetCoder<T> extends IterableLikeCoder<T, Set<T>> { - - /** - * Produces a {@link SetCoder} with the given {@code elementCoder}. - */ - public static <T> SetCoder<T> of(Coder<T> elementCoder) { - return new SetCoder<>(elementCoder); - } - - /** - * Dynamically typed constructor for JSON deserialization. - */ - @JsonCreator - public static SetCoder<?> of( - @JsonProperty(PropertyNames.COMPONENT_ENCODINGS) - List<Object> components) { - Preconditions.checkArgument(components.size() == 1, - "Expecting 1 component, got " + components.size()); - return of((Coder<?>) components.get(0)); - } - - /** - * {@inheritDoc} - * - * @throws NonDeterministicException always. Sets are not ordered, but - * they are encoded in the order of an arbitrary iteration. - */ - @Override - public void verifyDeterministic() throws NonDeterministicException { - throw new NonDeterministicException(this, - "Ordering of elements in a set may be non-deterministic."); - } - - /** - * Returns the first element in this set if it is non-empty, - * otherwise returns {@code null}. 
- */ - public static <T> List<Object> getInstanceComponents( - Set<T> exampleValue) { - return getInstanceComponentsHelper(exampleValue); - } - - ///////////////////////////////////////////////////////////////////////////// - // Internal operations below here. - - /** - * {@inheritDoc} - * - * @return A new {@link Set} built from the elements in the {@link List} decoded by - * {@link IterableLikeCoder}. - */ - @Override - protected final Set<T> decodeToIterable(List<T> decodedElements) { - return new HashSet<>(decodedElements); - } - - protected SetCoder(Coder<T> elemCoder) { - super(elemCoder, "Set"); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/StandardCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/StandardCoder.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/StandardCoder.java deleted file mode 100644 index ca189b1..0000000 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/StandardCoder.java +++ /dev/null @@ -1,230 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.google.cloud.dataflow.sdk.coders; - -import static com.google.cloud.dataflow.sdk.util.Structs.addList; -import static com.google.cloud.dataflow.sdk.util.Structs.addString; -import static com.google.cloud.dataflow.sdk.util.Structs.addStringList; -import static com.google.common.base.Preconditions.checkNotNull; - -import com.google.cloud.dataflow.sdk.util.CloudObject; -import com.google.cloud.dataflow.sdk.util.PropertyNames; -import com.google.cloud.dataflow.sdk.util.common.ElementByteSizeObserver; -import com.google.common.collect.Lists; -import com.google.common.io.ByteStreams; -import com.google.common.io.CountingOutputStream; - -import java.io.ByteArrayOutputStream; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.List; - -/** - * An abstract base class to implement a {@link Coder} that defines equality, hashing, and printing - * via the class name and recursively using {@link #getComponents}. 
- * - * <p>To extend {@link StandardCoder}, override the following methods as appropriate: - * - * <ul> - * <li>{@link #getComponents}: the default implementation returns {@link #getCoderArguments}.</li> - * <li>{@link #getEncodedElementByteSize} and - * {@link #isRegisterByteSizeObserverCheap}: the - * default implementation encodes values to bytes and counts the bytes, which is considered - * expensive.</li> - * <li>{@link #getEncodingId} and {@link #getAllowedEncodings}: by default, the encoding id - * is the empty string, so only the canonical name of the subclass will be used for - * compatibility checks, and no other encoding ids are allowed.</li> - * </ul> - */ -public abstract class StandardCoder<T> implements Coder<T> { - protected StandardCoder() {} - - @Override - public String getEncodingId() { - return ""; - } - - @Override - public Collection<String> getAllowedEncodings() { - return Collections.emptyList(); - } - - /** - * Returns the list of {@link Coder Coders} that are components of this {@link Coder}. - */ - public List<? extends Coder<?>> getComponents() { - List<? extends Coder<?>> coderArguments = getCoderArguments(); - if (coderArguments == null) { - return Collections.emptyList(); - } else { - return coderArguments; - } - } - - /** - * {@inheritDoc} - * - * @return {@code true} if the two {@link StandardCoder} instances have the - * same class and equal components. - */ - @Override - public boolean equals(Object o) { - if (o == null || this.getClass() != o.getClass()) { - return false; - } - StandardCoder<?> that = (StandardCoder<?>) o; - return this.getComponents().equals(that.getComponents()); - } - - @Override - public int hashCode() { - return getClass().hashCode() * 31 + getComponents().hashCode(); - } - - @Override - public String toString() { - String s = getClass().getName(); - s = s.substring(s.lastIndexOf('.') + 1); - List<? 
extends Coder<?>> componentCoders = getComponents(); - if (!componentCoders.isEmpty()) { - s += "("; - boolean first = true; - for (Coder<?> componentCoder : componentCoders) { - if (first) { - first = false; - } else { - s += ", "; - } - s += componentCoder.toString(); - } - s += ")"; - } - return s; - } - - @Override - public CloudObject asCloudObject() { - CloudObject result = CloudObject.forClass(getClass()); - - List<? extends Coder<?>> components = getComponents(); - if (!components.isEmpty()) { - List<CloudObject> cloudComponents = new ArrayList<>(components.size()); - for (Coder<?> coder : components) { - cloudComponents.add(coder.asCloudObject()); - } - addList(result, PropertyNames.COMPONENT_ENCODINGS, cloudComponents); - } - - String encodingId = getEncodingId(); - checkNotNull(encodingId, "Coder.getEncodingId() must not return null."); - if (!encodingId.isEmpty()) { - addString(result, PropertyNames.ENCODING_ID, encodingId); - } - - Collection<String> allowedEncodings = getAllowedEncodings(); - if (!allowedEncodings.isEmpty()) { - addStringList(result, PropertyNames.ALLOWED_ENCODINGS, Lists.newArrayList(allowedEncodings)); - } - - return result; - } - - /** - * {@inheritDoc} - * - * @return {@code false} unless it is overridden. {@link StandardCoder#registerByteSizeObserver} - * invokes {@link #getEncodedElementByteSize} which requires re-encoding an element - * unless it is overridden. This is considered expensive. - */ - @Override - public boolean isRegisterByteSizeObserverCheap(T value, Context context) { - return false; - } - - /** - * Returns the size in bytes of the encoded value using this coder. 
- */ - protected long getEncodedElementByteSize(T value, Context context) - throws Exception { - try { - CountingOutputStream os = new CountingOutputStream(ByteStreams.nullOutputStream()); - encode(value, os, context); - return os.getCount(); - } catch (Exception exn) { - throw new IllegalArgumentException( - "Unable to encode element '" + value + "' with coder '" + this + "'.", exn); - } - } - - /** - * {@inheritDoc} - * - * <p>For {@link StandardCoder} subclasses, this notifies {@code observer} about the byte size - * of the encoded value using this coder as returned by {@link #getEncodedElementByteSize}. - */ - @Override - public void registerByteSizeObserver( - T value, ElementByteSizeObserver observer, Context context) - throws Exception { - observer.update(getEncodedElementByteSize(value, context)); - } - - protected void verifyDeterministic(String message, Iterable<Coder<?>> coders) - throws NonDeterministicException { - for (Coder<?> coder : coders) { - try { - coder.verifyDeterministic(); - } catch (NonDeterministicException e) { - throw new NonDeterministicException(this, message, e); - } - } - } - - protected void verifyDeterministic(String message, Coder<?>... coders) - throws NonDeterministicException { - verifyDeterministic(message, Arrays.asList(coders)); - } - - /** - * {@inheritDoc} - * - * @return {@code false} for {@link StandardCoder} unless overridden. 
- */ - @Override - public boolean consistentWithEquals() { - return false; - } - - @Override - public Object structuralValue(T value) throws Exception { - if (value != null && consistentWithEquals()) { - return value; - } else { - try { - ByteArrayOutputStream os = new ByteArrayOutputStream(); - encode(value, os, Context.OUTER); - return new StructuralByteArray(os.toByteArray()); - } catch (Exception exn) { - throw new IllegalArgumentException( - "Unable to encode element '" + value + "' with coder '" + this + "'.", exn); - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/StringDelegateCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/StringDelegateCoder.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/StringDelegateCoder.java deleted file mode 100644 index 3f352d3..0000000 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/StringDelegateCoder.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.google.cloud.dataflow.sdk.coders; - -import com.google.cloud.dataflow.sdk.coders.protobuf.ProtoCoder; - -import java.lang.reflect.InvocationTargetException; - -/** - * A {@link Coder} that wraps a {@code Coder<String>} - * and encodes/decodes values via string representations. - * - * <p>To decode, the input byte stream is decoded to - * a {@link String}, and this is passed to the single-argument - * constructor for {@code T}. - * - * <p>To encode, the input value is converted via {@code toString()}, - * and this string is encoded. - * - * <p>In order for this to operate correctly for a class {@code Clazz}, - * it must be the case for any instance {@code x} that - * {@code x.equals(new Clazz(x.toString()))}. - * - * <p>This method of encoding is not designed for ease of evolution of {@code Clazz}; - * it should only be used in cases where the class is stable or the encoding is not - * important. If evolution of the class is important, see {@link ProtoCoder}, {@link AvroCoder}, - * or {@link JAXBCoder}. - * - * @param <T> The type of objects coded. - */ -public class StringDelegateCoder<T> extends DelegateCoder<T, String> { - public static <T> StringDelegateCoder<T> of(Class<T> clazz) { - return new StringDelegateCoder<T>(clazz); - } - - @Override - public String toString() { - return "StringDelegateCoder(" + clazz + ")"; - } - - private final Class<T> clazz; - - protected StringDelegateCoder(final Class<T> clazz) { - super(StringUtf8Coder.of(), - new CodingFunction<T, String>() { - @Override - public String apply(T input) { - return input.toString(); - } - }, - new CodingFunction<String, T>() { - @Override - public T apply(String input) throws - NoSuchMethodException, - InstantiationException, - IllegalAccessException, - InvocationTargetException { - return clazz.getConstructor(String.class).newInstance(input); - } - }); - - this.clazz = clazz; - } - - /** - * The encoding id is the fully qualified name of the encoded/decoded class. 
- */ - @Override - public String getEncodingId() { - return clazz.getName(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/StringUtf8Coder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/StringUtf8Coder.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/StringUtf8Coder.java deleted file mode 100644 index 25b8f5e..0000000 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/StringUtf8Coder.java +++ /dev/null @@ -1,140 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.google.cloud.dataflow.sdk.coders; - -import com.google.cloud.dataflow.sdk.util.ExposedByteArrayOutputStream; -import com.google.cloud.dataflow.sdk.util.StreamUtils; -import com.google.cloud.dataflow.sdk.util.VarInt; -import com.google.common.base.Utf8; -import com.google.common.io.ByteStreams; -import com.google.common.io.CountingOutputStream; - -import com.fasterxml.jackson.annotation.JsonCreator; - -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.EOFException; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.io.UTFDataFormatException; -import java.nio.charset.StandardCharsets; - -/** - * A {@link Coder} that encodes {@link String Strings} in UTF-8 encoding. - * If in a nested context, prefixes the string with an integer length field, - * encoded via a {@link VarIntCoder}. - */ -public class StringUtf8Coder extends AtomicCoder<String> { - - @JsonCreator - public static StringUtf8Coder of() { - return INSTANCE; - } - - ///////////////////////////////////////////////////////////////////////////// - - private static final StringUtf8Coder INSTANCE = new StringUtf8Coder(); - - private static void writeString(String value, DataOutputStream dos) - throws IOException { - byte[] bytes = value.getBytes(StandardCharsets.UTF_8); - VarInt.encode(bytes.length, dos); - dos.write(bytes); - } - - private static String readString(DataInputStream dis) throws IOException { - int len = VarInt.decodeInt(dis); - if (len < 0) { - throw new CoderException("Invalid encoded string length: " + len); - } - byte[] bytes = new byte[len]; - dis.readFully(bytes); - return new String(bytes, StandardCharsets.UTF_8); - } - - private StringUtf8Coder() {} - - @Override - public void encode(String value, OutputStream outStream, Context context) - throws IOException { - if (value == null) { - throw new CoderException("cannot encode a null String"); - } - if (context.isWholeStream) { - byte[] bytes = 
value.getBytes(StandardCharsets.UTF_8); - if (outStream instanceof ExposedByteArrayOutputStream) { - ((ExposedByteArrayOutputStream) outStream).writeAndOwn(bytes); - } else { - outStream.write(bytes); - } - } else { - writeString(value, new DataOutputStream(outStream)); - } - } - - @Override - public String decode(InputStream inStream, Context context) - throws IOException { - if (context.isWholeStream) { - byte[] bytes = StreamUtils.getBytes(inStream); - return new String(bytes, StandardCharsets.UTF_8); - } else { - try { - return readString(new DataInputStream(inStream)); - } catch (EOFException | UTFDataFormatException exn) { - // These exceptions correspond to decoding problems, so change - // what kind of exception they're branded as. - throw new CoderException(exn); - } - } - } - - /** - * {@inheritDoc} - * - * @return {@code true}. This coder is injective. - */ - @Override - public boolean consistentWithEquals() { - return true; - } - - /** - * {@inheritDoc} - * - * @return the byte size of the UTF-8 encoding of a string or, in a nested context, - * the byte size of the encoding plus the encoded length prefix. 
- */ - @Override - protected long getEncodedElementByteSize(String value, Context context) - throws Exception { - if (value == null) { - throw new CoderException("cannot encode a null String"); - } - if (context.isWholeStream) { - return Utf8.encodedLength(value); - } else { - CountingOutputStream countingStream = - new CountingOutputStream(ByteStreams.nullOutputStream()); - DataOutputStream stream = new DataOutputStream(countingStream); - writeString(value, stream); - return countingStream.getCount(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/StructuralByteArray.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/StructuralByteArray.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/StructuralByteArray.java deleted file mode 100644 index aa44456..0000000 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/StructuralByteArray.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.google.cloud.dataflow.sdk.coders; - -import static com.google.api.client.util.Base64.encodeBase64String; - -import java.util.Arrays; - -/** - * A wrapper around a byte[] that uses structural, value-based - * equality rather than byte[]'s normal object identity. - */ -public class StructuralByteArray { - byte[] value; - - public StructuralByteArray(byte[] value) { - this.value = value; - } - - public byte[] getValue() { - return value; - } - - @Override - public boolean equals(Object o) { - if (o instanceof StructuralByteArray) { - StructuralByteArray that = (StructuralByteArray) o; - return Arrays.equals(this.value, that.value); - } else { - return false; - } - } - - @Override - public int hashCode() { - return Arrays.hashCode(value); - } - - @Override - public String toString() { - return "base64:" + encodeBase64String(value); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/TableRowJsonCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/TableRowJsonCoder.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/TableRowJsonCoder.java deleted file mode 100644 index b02fb08..0000000 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/TableRowJsonCoder.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.google.cloud.dataflow.sdk.coders; - -import com.google.api.services.bigquery.model.TableRow; - -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.SerializationFeature; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; - -/** - * A {@link Coder} that encodes BigQuery {@link TableRow} objects in their native JSON format. - */ -public class TableRowJsonCoder extends AtomicCoder<TableRow> { - - @JsonCreator - public static TableRowJsonCoder of() { - return INSTANCE; - } - - @Override - public void encode(TableRow value, OutputStream outStream, Context context) - throws IOException { - String strValue = MAPPER.writeValueAsString(value); - StringUtf8Coder.of().encode(strValue, outStream, context); - } - - @Override - public TableRow decode(InputStream inStream, Context context) - throws IOException { - String strValue = StringUtf8Coder.of().decode(inStream, context); - return MAPPER.readValue(strValue, TableRow.class); - } - - @Override - protected long getEncodedElementByteSize(TableRow value, Context context) - throws Exception { - String strValue = MAPPER.writeValueAsString(value); - return StringUtf8Coder.of().getEncodedElementByteSize(strValue, context); - } - - ///////////////////////////////////////////////////////////////////////////// - - // FAIL_ON_EMPTY_BEANS is disabled in order to handle null values in - // TableRow. 
- private static final ObjectMapper MAPPER = - new ObjectMapper().disable(SerializationFeature.FAIL_ON_EMPTY_BEANS); - - private static final TableRowJsonCoder INSTANCE = new TableRowJsonCoder(); - - private TableRowJsonCoder() { } - - /** - * {@inheritDoc} - * - * @throws NonDeterministicException always. A {@link TableRow} can hold arbitrary - * {@link Object} instances, which makes the encoding non-deterministic. - */ - @Override - public void verifyDeterministic() throws NonDeterministicException { - throw new NonDeterministicException(this, - "TableCell can hold arbitrary instances, which may be non-deterministic."); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/TextualIntegerCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/TextualIntegerCoder.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/TextualIntegerCoder.java deleted file mode 100644 index 539c56a..0000000 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/TextualIntegerCoder.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.google.cloud.dataflow.sdk.coders; - -import com.fasterxml.jackson.annotation.JsonCreator; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; - -/** - * A {@link Coder} that encodes {@link Integer Integers} as the ASCII bytes of - * their textual, decimal representation. - * - * <p>Encoding, decoding and size estimation all delegate to {@link StringUtf8Coder}, passing the - * caller's {@code Context} through unchanged, so the byte layout is whatever that coder produces - * for the decimal string. A {@code null} value cannot be encoded, and text that is not a valid - * integer cannot be decoded; both cases are reported as a {@link CoderException}. - */ -public class TextualIntegerCoder extends AtomicCoder<Integer> { - - @JsonCreator - public static TextualIntegerCoder of() { - return new TextualIntegerCoder(); - } - - ///////////////////////////////////////////////////////////////////////////// - - protected TextualIntegerCoder() {} - - @Override - public void encode(Integer value, OutputStream outStream, Context context) - throws IOException, CoderException { - if (value == null) { - throw new CoderException("cannot encode a null Integer"); - } - String textualValue = value.toString(); - StringUtf8Coder.of().encode(textualValue, outStream, context); - } - - @Override - public Integer decode(InputStream inStream, Context context) - throws IOException, CoderException { - String textualValue = StringUtf8Coder.of().decode(inStream, context); - try { - return Integer.valueOf(textualValue); - } catch (NumberFormatException exn) { - throw new CoderException("error when decoding a textual integer", exn); - } - } - - @Override - protected long getEncodedElementByteSize(Integer value, Context context) throws Exception { - if (value == null) { - throw new CoderException("cannot encode a null Integer"); - } - String textualValue = value.toString(); - return StringUtf8Coder.of().getEncodedElementByteSize(textualValue, context); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/VarIntCoder.java ---------------------------------------------------------------------- diff --git 
a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/VarIntCoder.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/VarIntCoder.java deleted file mode 100644 index 42862bb..0000000 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/VarIntCoder.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.google.cloud.dataflow.sdk.coders; - -import com.google.cloud.dataflow.sdk.util.VarInt; - -import com.fasterxml.jackson.annotation.JsonCreator; - -import java.io.EOFException; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.io.UTFDataFormatException; - -/** - * A {@link Coder} that encodes {@link Integer Integers} using between 1 and 5 bytes. Negative - * numbers always take 5 bytes, so {@link BigEndianIntegerCoder} may be preferable for - * integers that are known to often be large or negative. 
- */ -public class VarIntCoder extends AtomicCoder<Integer> { - - @JsonCreator - public static VarIntCoder of() { - return INSTANCE; - } - - ///////////////////////////////////////////////////////////////////////////// - - private static final VarIntCoder INSTANCE = - new VarIntCoder(); - - private VarIntCoder() {} - - @Override - public void encode(Integer value, OutputStream outStream, Context context) - throws IOException, CoderException { - if (value == null) { - throw new CoderException("cannot encode a null Integer"); - } - VarInt.encode(value.intValue(), outStream); - } - - @Override - public Integer decode(InputStream inStream, Context context) - throws IOException, CoderException { - try { - return VarInt.decodeInt(inStream); - } catch (EOFException | UTFDataFormatException exn) { - // These exceptions correspond to decoding problems, so change - // what kind of exception they're branded as. - throw new CoderException(exn); - } - } - - /** - * {@inheritDoc} - * - * @return {@code true}. {@link VarIntCoder} is injective. - */ - @Override - public boolean consistentWithEquals() { - return true; - } - - /** - * {@inheritDoc} - * - * @return {@code true}. {@link #getEncodedElementByteSize} is cheap. 
- */ - @Override - public boolean isRegisterByteSizeObserverCheap(Integer value, Context context) { - return true; - } - - @Override - protected long getEncodedElementByteSize(Integer value, Context context) - throws Exception { - if (value == null) { - throw new CoderException("cannot encode a null Integer"); - } - return VarInt.getLength(value.longValue()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/VarLongCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/VarLongCoder.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/VarLongCoder.java deleted file mode 100644 index 669453e..0000000 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/VarLongCoder.java +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.google.cloud.dataflow.sdk.coders; - -import com.google.cloud.dataflow.sdk.util.VarInt; - -import com.fasterxml.jackson.annotation.JsonCreator; - -import java.io.EOFException; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.io.UTFDataFormatException; - -/** - * A {@link Coder} that encodes {@link Long Longs} using between 1 and 10 bytes. Negative - * numbers always take 10 bytes, so {@link BigEndianLongCoder} may be preferable for - * longs that are known to often be large or negative. - */ -public class VarLongCoder extends AtomicCoder<Long> { - - @JsonCreator - public static VarLongCoder of() { - return INSTANCE; - } - - ///////////////////////////////////////////////////////////////////////////// - - private static final VarLongCoder INSTANCE = new VarLongCoder(); - - private VarLongCoder() {} - - @Override - public void encode(Long value, OutputStream outStream, Context context) - throws IOException, CoderException { - if (value == null) { - throw new CoderException("cannot encode a null Long"); - } - VarInt.encode(value.longValue(), outStream); - } - - @Override - public Long decode(InputStream inStream, Context context) - throws IOException, CoderException { - try { - return VarInt.decodeLong(inStream); - } catch (EOFException | UTFDataFormatException exn) { - // These exceptions correspond to decoding problems, so change - // what kind of exception they're branded as. - throw new CoderException(exn); - } - } - - /** - * {@inheritDoc} - * - * @return {@code true}. {@link VarLongCoder} is injective. - */ - @Override - public boolean consistentWithEquals() { - return true; - } - - /** - * {@inheritDoc} - * - * @return {@code true}. {@link #getEncodedElementByteSize} is cheap. 
- */ - @Override - public boolean isRegisterByteSizeObserverCheap(Long value, Context context) { - return true; - } - - @Override - protected long getEncodedElementByteSize(Long value, Context context) - throws Exception { - if (value == null) { - throw new CoderException("cannot encode a null Long"); - } - return VarInt.getLength(value.longValue()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/VoidCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/VoidCoder.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/VoidCoder.java deleted file mode 100644 index 813ee2f..0000000 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/VoidCoder.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.google.cloud.dataflow.sdk.coders; - -import com.fasterxml.jackson.annotation.JsonCreator; - -import java.io.InputStream; -import java.io.OutputStream; - -/** - * A {@link Coder} for {@link Void}. Uses zero bytes per {@link Void}. 
- */ -public class VoidCoder extends AtomicCoder<Void> { - - @JsonCreator - public static VoidCoder of() { - return INSTANCE; - } - - ///////////////////////////////////////////////////////////////////////////// - - private static final VoidCoder INSTANCE = new VoidCoder(); - - private VoidCoder() {} - - @Override - public void encode(Void value, OutputStream outStream, Context context) { - // Nothing to write! - } - - @Override - public Void decode(InputStream inStream, Context context) { - // Nothing to read! - return null; - } - - /** - * {@inheritDoc} - * - * @return {@code true}. {@link VoidCoder} is (vacuously) injective. - */ - @Override - public boolean consistentWithEquals() { - return true; - } - - /** - * {@inheritDoc} - * - * @return {@code true}. {@link VoidCoder#getEncodedElementByteSize} runs in constant time. - */ - @Override - public boolean isRegisterByteSizeObserverCheap(Void value, Context context) { - return true; - } - - @Override - protected long getEncodedElementByteSize(Void value, Context context) - throws Exception { - return 0; - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/package-info.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/package-info.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/package-info.java deleted file mode 100644 index a3bc150..0000000 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/package-info.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/** - * Defines {@link com.google.cloud.dataflow.sdk.coders.Coder Coders} - * to specify how data is encoded to and decoded from byte strings. - * - * <p>During execution of a Pipeline, elements in a - * {@link com.google.cloud.dataflow.sdk.values.PCollection} - * may need to be encoded into byte strings. - * This happens both at the beginning and end of a pipeline when data is read from and written to - * persistent storage and also during execution of a pipeline when elements are communicated between - * machines. - * - * <p>Exactly when PCollection elements are encoded during execution depends on which - * {@link com.google.cloud.dataflow.sdk.runners.PipelineRunner} is being used and how that runner - * chooses to execute the pipeline. As such, Dataflow requires that all PCollections have an - * appropriate Coder in case it becomes necessary. In many cases, the Coder can be inferred from - * the available Java type - * information and the Pipeline's {@link com.google.cloud.dataflow.sdk.coders.CoderRegistry}. It - * can be specified per PCollection via - * {@link com.google.cloud.dataflow.sdk.values.PCollection#setCoder(Coder)} or per type using the - * {@link com.google.cloud.dataflow.sdk.coders.DefaultCoder} annotation. 
- * - * <p>This package provides a number of coders for common types like {@code Integer}, - * {@code String}, and {@code List}, as well as coders like - * {@link com.google.cloud.dataflow.sdk.coders.AvroCoder} that can be used to encode many custom - * types. - * - */ -package com.google.cloud.dataflow.sdk.coders; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/protobuf/ProtoCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/protobuf/ProtoCoder.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/protobuf/ProtoCoder.java deleted file mode 100644 index 88d13e9..0000000 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/protobuf/ProtoCoder.java +++ /dev/null @@ -1,406 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.google.cloud.dataflow.sdk.coders.protobuf; - -import static com.google.common.base.Preconditions.checkArgument; - -import com.google.cloud.dataflow.sdk.coders.AtomicCoder; -import com.google.cloud.dataflow.sdk.coders.CannotProvideCoderException; -import com.google.cloud.dataflow.sdk.coders.Coder; -import com.google.cloud.dataflow.sdk.coders.CoderException; -import com.google.cloud.dataflow.sdk.coders.CoderProvider; -import com.google.cloud.dataflow.sdk.coders.CoderRegistry; -import com.google.cloud.dataflow.sdk.util.CloudObject; -import com.google.cloud.dataflow.sdk.util.Structs; -import com.google.cloud.dataflow.sdk.values.PCollection; -import com.google.cloud.dataflow.sdk.values.TypeDescriptor; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; -import com.google.protobuf.ExtensionRegistry; -import com.google.protobuf.Message; -import com.google.protobuf.Parser; - -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; -import java.lang.reflect.Modifier; -import java.util.Arrays; -import java.util.List; -import java.util.Objects; -import java.util.Set; -import java.util.SortedSet; -import java.util.TreeSet; - -import javax.annotation.Nullable; - -/** - * A {@link Coder} using Google Protocol Buffers binary format. {@link ProtoCoder} supports both - * Protocol Buffers syntax versions 2 and 3. - * - * <p>To learn more about Protocol Buffers, visit: - * <a href="https://developers.google.com/protocol-buffers">https://developers.google.com/protocol-buffers</a> - * - * <p>{@link ProtoCoder} is registered in the global {@link CoderRegistry} as the default - * {@link Coder} for any {@link Message} object. 
Custom message extensions are also supported, but - * these extensions must be registered for a particular {@link ProtoCoder} instance and that - * instance must be registered on the {@link PCollection} that needs the extensions: - * - * <pre>{@code - * import MyProtoFile; - * import MyProtoFile.MyMessage; - * - * Coder<MyMessage> coder = ProtoCoder.of(MyMessage.class).withExtensionsFrom(MyProtoFile.class); - * PCollection<MyMessage> records = input.apply(...).setCoder(coder); - * }</pre> - * - * <h3>Versioning</h3> - * - * <p>{@link ProtoCoder} supports both versions 2 and 3 of the Protocol Buffers syntax. However, - * the Java runtime version of the <code>com.google.protobuf</code> library must match exactly the - * version of <code>protoc</code> that was used to produce the JAR files containing the compiled - * <code>.proto</code> messages. - * - * <p>For more information, see the - * <a href="https://developers.google.com/protocol-buffers/docs/proto3#using-proto2-message-types">Protocol Buffers documentation</a>. - * - * <h3>{@link ProtoCoder} and Determinism</h3> - * - * <p>In general, Protocol Buffers messages can be encoded deterministically within a single - * pipeline as long as: - * - * <ul> - * <li>The encoded messages (and any transitively linked messages) do not use <code>map</code> - * fields.</li> - * <li>Every Java VM that encodes or decodes the messages uses the same runtime version of the - * Protocol Buffers library and the same compiled <code>.proto</code> file JAR.</li> - * </ul> - * - * <h3>{@link ProtoCoder} and Encoding Stability</h3> - * - * <p>When changing Protocol Buffers messages, follow the rules in the Protocol Buffers language - * guides for - * <a href="https://developers.google.com/protocol-buffers/docs/proto#updating">{@code proto2}</a> - * and - * <a href="https://developers.google.com/protocol-buffers/docs/proto3#updating">{@code proto3}</a> - * syntaxes, depending on your message type.
Following these guidelines will ensure that the - * old encoded data can be read by new versions of the code. - * - * <p>Generally, any change to the message type, registered extensions, runtime library, or - * compiled proto JARs may change the encoding. Thus even if both the original and updated messages - * can be encoded deterministically within a single job, these deterministic encodings may not be - * the same across jobs. - * - * @param <T> the Protocol Buffers {@link Message} handled by this {@link Coder}. - */ -public class ProtoCoder<T extends Message> extends AtomicCoder<T> { - - /** - * A {@link CoderProvider} that returns a {@link ProtoCoder} with an empty - * {@link ExtensionRegistry}. - */ - public static CoderProvider coderProvider() { - return PROVIDER; - } - - /** - * Returns a {@link ProtoCoder} for the given Protocol Buffers {@link Message}. - */ - public static <T extends Message> ProtoCoder<T> of(Class<T> protoMessageClass) { - return new ProtoCoder<T>(protoMessageClass, ImmutableSet.<Class<?>>of()); - } - - /** - * Returns a {@link ProtoCoder} for the Protocol Buffers {@link Message} indicated by the given - * {@link TypeDescriptor}. - */ - public static <T extends Message> ProtoCoder<T> of(TypeDescriptor<T> protoMessageType) { - @SuppressWarnings("unchecked") - Class<T> protoMessageClass = (Class<T>) protoMessageType.getRawType(); - return of(protoMessageClass); - } - - /** - * Returns a {@link ProtoCoder} like this one, but with the extensions from the given classes - * registered. - * - * <p>Each of the extension host classes must be a class automatically generated by the - * Protocol Buffers compiler, {@code protoc}, that contains messages. - * - * <p>Does not modify this object. - */ - public ProtoCoder<T> withExtensionsFrom(Iterable<Class<?>> moreExtensionHosts) { - for (Class<?> extensionHost : moreExtensionHosts) { - // Attempt to access the required method, to make sure it's present.
- try { - Method registerAllExtensions = - extensionHost.getDeclaredMethod("registerAllExtensions", ExtensionRegistry.class); - checkArgument( - Modifier.isStatic(registerAllExtensions.getModifiers()), - "Method registerAllExtensions() must be static"); - } catch (NoSuchMethodException | SecurityException e) { - throw new IllegalArgumentException( - String.format("Unable to register extensions for %s", extensionHost.getCanonicalName()), - e); - } - } - - return new ProtoCoder<T>( - protoMessageClass, - new ImmutableSet.Builder<Class<?>>() - .addAll(extensionHostClasses) - .addAll(moreExtensionHosts) - .build()); - } - - /** - * See {@link #withExtensionsFrom(Iterable)}. - * - * <p>Does not modify this object. - */ - public ProtoCoder<T> withExtensionsFrom(Class<?>... moreExtensionHosts) { - return withExtensionsFrom(Arrays.asList(moreExtensionHosts)); - } - - @Override - public void encode(T value, OutputStream outStream, Context context) throws IOException { - if (value == null) { - throw new CoderException("cannot encode a null " + protoMessageClass.getSimpleName()); - } - if (context.isWholeStream) { - value.writeTo(outStream); - } else { - value.writeDelimitedTo(outStream); - } - } - - @Override - public T decode(InputStream inStream, Context context) throws IOException { - if (context.isWholeStream) { - return getParser().parseFrom(inStream, getExtensionRegistry()); - } else { - return getParser().parseDelimitedFrom(inStream, getExtensionRegistry()); - } - } - - @Override - public boolean equals(Object other) { - if (this == other) { - return true; - } - if (!(other instanceof ProtoCoder)) { - return false; - } - ProtoCoder<?> otherCoder = (ProtoCoder<?>) other; - return protoMessageClass.equals(otherCoder.protoMessageClass) - && Sets.newHashSet(extensionHostClasses) - .equals(Sets.newHashSet(otherCoder.extensionHostClasses)); - } - - @Override - public int hashCode() { - return Objects.hash(protoMessageClass, extensionHostClasses); - } - - /** - * The 
encoding identifier is designed to support evolution as per the design of Protocol - * Buffers. In order to use this class effectively, carefully follow the advice in the Protocol - * Buffers documentation at - * <a href="https://developers.google.com/protocol-buffers/docs/proto#updating">Updating - * A Message Type</a>. - * - * <p>In particular, the encoding identifier is guaranteed to be the same for {@link ProtoCoder} - * instances of the same principal message class, with the same registered extension host classes, - * and otherwise distinct. Note that the encoding ID does not encode any version of the message - * or extensions, nor does it include the message schema. - * - * <p>When modifying a message class, here are the broadest guidelines; see the above link - * for greater detail. - * - * <ul> - * <li>Do not change the numeric tags for any fields. - * <li>Never remove a <code>required</code> field. - * <li>Only add <code>optional</code> or <code>repeated</code> fields, with sensible defaults. - * <li>When changing the type of a field, consult the Protocol Buffers documentation to ensure - * the new and old types are interchangeable. - * </ul> - * - * <p>Code consuming this message class should be prepared to support <i>all</i> versions of - * the class until it is certain that no remaining serialized instances exist. - * - * <p>If backwards incompatible changes must be made, the best recourse is to change the name - * of your Protocol Buffers message class. - */ - @Override - public String getEncodingId() { - return protoMessageClass.getName() + getSortedExtensionClasses().toString(); - } - - @Override - public void verifyDeterministic() throws NonDeterministicException { - ProtobufUtil.verifyDeterministic(this); - } - - /** - * Returns the Protocol Buffers {@link Message} type this {@link ProtoCoder} supports. 
- */ - public Class<T> getMessageType() { - return protoMessageClass; - } - - /** - * Returns the {@link ExtensionRegistry} listing all known Protocol Buffers extension messages - * to {@code T} registered with this {@link ProtoCoder}. - */ - public ExtensionRegistry getExtensionRegistry() { - if (memoizedExtensionRegistry == null) { - ExtensionRegistry registry = ExtensionRegistry.newInstance(); - for (Class<?> extensionHost : extensionHostClasses) { - try { - extensionHost - .getDeclaredMethod("registerAllExtensions", ExtensionRegistry.class) - .invoke(null, registry); - } catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) { - throw new IllegalStateException(e); - } - } - memoizedExtensionRegistry = registry.getUnmodifiable(); - } - return memoizedExtensionRegistry; - } - - //////////////////////////////////////////////////////////////////////////////////// - // Private implementation details below. - - /** The {@link Message} type to be coded. */ - private final Class<T> protoMessageClass; - - /** - * All extension host classes included in this {@link ProtoCoder}. The extensions from these - * classes will be included in the {@link ExtensionRegistry} used during encoding and decoding. - */ - private final Set<Class<?>> extensionHostClasses; - - // Constants used to serialize and deserialize - private static final String PROTO_MESSAGE_CLASS = "proto_message_class"; - private static final String PROTO_EXTENSION_HOSTS = "proto_extension_hosts"; - - // Transient fields that are lazy initialized and then memoized. - private transient ExtensionRegistry memoizedExtensionRegistry; - private transient Parser<T> memoizedParser; - - /** Private constructor. */ - private ProtoCoder(Class<T> protoMessageClass, Set<Class<?>> extensionHostClasses) { - this.protoMessageClass = protoMessageClass; - this.extensionHostClasses = extensionHostClasses; - } - - /** - * @deprecated For JSON deserialization only. 
- */ - @JsonCreator - @Deprecated - public static <T extends Message> ProtoCoder<T> of( - @JsonProperty(PROTO_MESSAGE_CLASS) String protoMessageClassName, - @Nullable @JsonProperty(PROTO_EXTENSION_HOSTS) List<String> extensionHostClassNames) { - - try { - @SuppressWarnings("unchecked") - Class<T> protoMessageClass = (Class<T>) Class.forName(protoMessageClassName); - List<Class<?>> extensionHostClasses = Lists.newArrayList(); - if (extensionHostClassNames != null) { - for (String extensionHostClassName : extensionHostClassNames) { - extensionHostClasses.add(Class.forName(extensionHostClassName)); - } - } - return of(protoMessageClass).withExtensionsFrom(extensionHostClasses); - } catch (ClassNotFoundException e) { - throw new IllegalArgumentException(e); - } - } - - @Override - public CloudObject asCloudObject() { - CloudObject result = super.asCloudObject(); - Structs.addString(result, PROTO_MESSAGE_CLASS, protoMessageClass.getName()); - List<CloudObject> extensionHostClassNames = Lists.newArrayList(); - for (String className : getSortedExtensionClasses()) { - extensionHostClassNames.add(CloudObject.forString(className)); - } - Structs.addList(result, PROTO_EXTENSION_HOSTS, extensionHostClassNames); - return result; - } - - /** Get the memoized {@link Parser}, possibly initializing it lazily. */ - private Parser<T> getParser() { - if (memoizedParser == null) { - try { - @SuppressWarnings("unchecked") - T protoMessageInstance = (T) protoMessageClass.getMethod("getDefaultInstance").invoke(null); - @SuppressWarnings("unchecked") - Parser<T> tParser = (Parser<T>) protoMessageInstance.getParserForType(); - memoizedParser = tParser; - } catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) { - throw new IllegalArgumentException(e); - } - } - return memoizedParser; - } - - /** - * The implementation of the {@link CoderProvider} for this {@link ProtoCoder} returned by - * {@link #coderProvider()}. 
- */ - private static final CoderProvider PROVIDER = - new CoderProvider() { - @Override - public <T> Coder<T> getCoder(TypeDescriptor<T> type) throws CannotProvideCoderException { - if (!type.isSubtypeOf(new TypeDescriptor<Message>() {})) { - throw new CannotProvideCoderException( - String.format( - "Cannot provide %s because %s is not a subclass of %s", - ProtoCoder.class.getSimpleName(), - type, - Message.class.getName())); - } - - @SuppressWarnings("unchecked") - TypeDescriptor<? extends Message> messageType = (TypeDescriptor<? extends Message>) type; - try { - @SuppressWarnings("unchecked") - Coder<T> coder = (Coder<T>) ProtoCoder.of(messageType); - return coder; - } catch (IllegalArgumentException e) { - throw new CannotProvideCoderException(e); - } - } - }; - - private SortedSet<String> getSortedExtensionClasses() { - SortedSet<String> ret = new TreeSet<>(); - for (Class<?> clazz : extensionHostClasses) { - ret.add(clazz.getName()); - } - return ret; - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/protobuf/ProtobufUtil.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/protobuf/ProtobufUtil.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/protobuf/ProtobufUtil.java deleted file mode 100644 index 24415ac..0000000 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/protobuf/ProtobufUtil.java +++ /dev/null @@ -1,172 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.google.cloud.dataflow.sdk.coders.protobuf; - -import static com.google.common.base.Preconditions.checkArgument; - -import com.google.cloud.dataflow.sdk.coders.Coder.NonDeterministicException; -import com.google.protobuf.Descriptors.Descriptor; -import com.google.protobuf.Descriptors.FieldDescriptor; -import com.google.protobuf.Descriptors.FileDescriptor.Syntax; -import com.google.protobuf.Descriptors.GenericDescriptor; -import com.google.protobuf.ExtensionRegistry; -import com.google.protobuf.ExtensionRegistry.ExtensionInfo; -import com.google.protobuf.Message; - -import java.lang.reflect.InvocationTargetException; -import java.util.HashSet; -import java.util.Set; - -/** - * Utility functions for reflecting and analyzing Protocol Buffers classes. - * - * <p>Used by {@link ProtoCoder}, but in a separate file for testing and isolation. - */ -class ProtobufUtil { - /** - * Returns the {@link Descriptor} for the given Protocol Buffers {@link Message}. - * - * @throws IllegalArgumentException if there is an error in Java reflection. - */ - static Descriptor getDescriptorForClass(Class<? extends Message> clazz) { - try { - return (Descriptor) clazz.getMethod("getDescriptor").invoke(null); - } catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) { - throw new IllegalArgumentException(e); - } - } - - /** - * Returns the {@link Descriptor} for the given Protocol Buffers {@link Message} as well as - * every class it can include transitively. 
- * - * @throws IllegalArgumentException if there is an error in Java reflection. - */ - static Set<Descriptor> getRecursiveDescriptorsForClass( - Class<? extends Message> clazz, ExtensionRegistry registry) { - Descriptor root = getDescriptorForClass(clazz); - Set<Descriptor> descriptors = new HashSet<>(); - recursivelyAddDescriptors(root, descriptors, registry); - return descriptors; - } - - /** - * Recursively walks the given {@link Message} class and verifies that every field or message - * linked in uses the Protocol Buffers proto2 syntax. - */ - static void checkProto2Syntax(Class<? extends Message> clazz, ExtensionRegistry registry) { - for (GenericDescriptor d : getRecursiveDescriptorsForClass(clazz, registry)) { - Syntax s = d.getFile().getSyntax(); - checkArgument( - s == Syntax.PROTO2, - "Message %s or one of its dependencies does not use proto2 syntax: %s in file %s", - clazz.getName(), - d.getFullName(), - d.getFile().getName()); - } - } - - /** - * Recursively checks whether the specified class uses any Protocol Buffers fields that cannot - * be deterministically encoded. - * - * @throws NonDeterministicException if the object cannot be encoded deterministically. - */ - static void verifyDeterministic(ProtoCoder<?> coder) throws NonDeterministicException { - Class<? extends Message> message = coder.getMessageType(); - ExtensionRegistry registry = coder.getExtensionRegistry(); - Set<Descriptor> descriptors = getRecursiveDescriptorsForClass(message, registry); - for (Descriptor d : descriptors) { - for (FieldDescriptor fd : d.getFields()) { - // If there is a transitively reachable Protocol Buffers map field, then this object cannot - // be encoded deterministically. - if (fd.isMapField()) { - String reason = - String.format( - "Protocol Buffers message %s transitively includes Map field %s (from file %s)." 
- + " Maps cannot be deterministically encoded.", - message.getName(), - fd.getFullName(), - fd.getFile().getFullName()); - throw new NonDeterministicException(coder, reason); - } - } - } - } - - //////////////////////////////////////////////////////////////////////////////////////////////// - // Disable construction of utility class - private ProtobufUtil() {} - - private static void recursivelyAddDescriptors( - Descriptor message, Set<Descriptor> descriptors, ExtensionRegistry registry) { - if (descriptors.contains(message)) { - return; - } - descriptors.add(message); - - for (FieldDescriptor f : message.getFields()) { - recursivelyAddDescriptors(f, descriptors, registry); - } - for (FieldDescriptor f : message.getExtensions()) { - recursivelyAddDescriptors(f, descriptors, registry); - } - for (ExtensionInfo info : - registry.getAllImmutableExtensionsByExtendedType(message.getFullName())) { - recursivelyAddDescriptors(info.descriptor, descriptors, registry); - } - for (ExtensionInfo info : - registry.getAllMutableExtensionsByExtendedType(message.getFullName())) { - recursivelyAddDescriptors(info.descriptor, descriptors, registry); - } - } - - private static void recursivelyAddDescriptors( - FieldDescriptor field, Set<Descriptor> descriptors, ExtensionRegistry registry) { - switch (field.getType()) { - case BOOL: - case BYTES: - case DOUBLE: - case ENUM: - case FIXED32: - case FIXED64: - case FLOAT: - case INT32: - case INT64: - case SFIXED32: - case SFIXED64: - case SINT32: - case SINT64: - case STRING: - case UINT32: - case UINT64: - // Primitive types do not transitively access anything else. - break; - - case GROUP: - case MESSAGE: - // Recursively adds all the fields from this nested Message. - recursivelyAddDescriptors(field.getMessageType(), descriptors, registry); - break; - - default: - throw new UnsupportedOperationException( - "Unexpected Protocol Buffers field type: " + field.getType()); - } - } -}