http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/JAXBCoder.java ---------------------------------------------------------------------- diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/JAXBCoder.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/JAXBCoder.java deleted file mode 100644 index 2b0190b..0000000 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/JAXBCoder.java +++ /dev/null @@ -1,135 +0,0 @@ -/* - * Copyright (C) 2015 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ - -package com.google.cloud.dataflow.sdk.coders; - -import com.google.cloud.dataflow.sdk.util.CloudObject; -import com.google.cloud.dataflow.sdk.util.Structs; - -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; - -import java.io.FilterInputStream; -import java.io.FilterOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; - -import javax.xml.bind.JAXBContext; -import javax.xml.bind.JAXBException; -import javax.xml.bind.Marshaller; -import javax.xml.bind.Unmarshaller; - -/** - * A coder for JAXB annotated objects. This coder uses JAXB marshalling/unmarshalling mechanisms - * to encode/decode the objects. Users must provide the {@code Class} of the JAXB annotated object. - * - * @param <T> type of JAXB annotated objects that will be serialized. - */ -public class JAXBCoder<T> extends AtomicCoder<T> { - - private final Class<T> jaxbClass; - private transient Marshaller jaxbMarshaller = null; - private transient Unmarshaller jaxbUnmarshaller = null; - - public Class<T> getJAXBClass() { - return jaxbClass; - } - - private JAXBCoder(Class<T> jaxbClass) { - this.jaxbClass = jaxbClass; - } - - /** - * Create a coder for a given type of JAXB annotated objects. - * - * @param jaxbClass the {@code Class} of the JAXB annotated objects. - */ - public static <T> JAXBCoder<T> of(Class<T> jaxbClass) { - return new JAXBCoder<>(jaxbClass); - } - - @Override - public void encode(T value, OutputStream outStream, Context context) - throws CoderException, IOException { - try { - if (jaxbMarshaller == null) { - JAXBContext jaxbContext = JAXBContext.newInstance(jaxbClass); - jaxbMarshaller = jaxbContext.createMarshaller(); - } - - jaxbMarshaller.marshal(value, new FilterOutputStream(outStream) { - // JAXB closes the underyling stream so we must filter out those calls. - @Override - public void close() throws IOException { - } - }); - } catch (JAXBException e) { - throw new CoderException(e); - } - } - - @Override - public T decode(InputStream inStream, Context context) throws CoderException, IOException { - try { - if (jaxbUnmarshaller == null) { - JAXBContext jaxbContext = JAXBContext.newInstance(jaxbClass); - jaxbUnmarshaller = jaxbContext.createUnmarshaller(); - } - - @SuppressWarnings("unchecked") - T obj = (T) jaxbUnmarshaller.unmarshal(new FilterInputStream(inStream) { - // JAXB closes the underyling stream so we must filter out those calls. - @Override - public void close() throws IOException { - } - }); - return obj; - } catch (JAXBException e) { - throw new CoderException(e); - } - } - - @Override - public String getEncodingId() { - return getJAXBClass().getName(); - } - - //////////////////////////////////////////////////////////////////////////////////// - // JSON Serialization details below - - private static final String JAXB_CLASS = "jaxb_class"; - - /** - * Constructor for JSON deserialization only. - */ - @JsonCreator - public static <T> JAXBCoder<T> of( - @JsonProperty(JAXB_CLASS) String jaxbClassName) { - try { - @SuppressWarnings("unchecked") - Class<T> jaxbClass = (Class<T>) Class.forName(jaxbClassName); - return of(jaxbClass); - } catch (ClassNotFoundException e) { - throw new IllegalArgumentException(e); - } - } - - @Override - public CloudObject asCloudObject() { - CloudObject result = super.asCloudObject(); - Structs.addString(result, JAXB_CLASS, jaxbClass.getName()); - return result; - } -}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/KvCoder.java ---------------------------------------------------------------------- diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/KvCoder.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/KvCoder.java deleted file mode 100644 index 33085cf..0000000 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/KvCoder.java +++ /dev/null @@ -1,162 +0,0 @@ -/* - * Copyright (C) 2015 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package com.google.cloud.dataflow.sdk.coders; - -import static com.google.cloud.dataflow.sdk.util.Structs.addBoolean; - -import com.google.cloud.dataflow.sdk.util.CloudObject; -import com.google.cloud.dataflow.sdk.util.PropertyNames; -import com.google.cloud.dataflow.sdk.util.common.ElementByteSizeObserver; -import com.google.cloud.dataflow.sdk.values.KV; -import com.google.common.base.Preconditions; - -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.Arrays; -import java.util.List; - -/** - * A {@code KvCoder} encodes {@link KV}s. - * - * @param <K> the type of the keys of the KVs being transcoded - * @param <V> the type of the values of the KVs being transcoded - */ -public class KvCoder<K, V> extends KvCoderBase<KV<K, V>> { - public static <K, V> KvCoder<K, V> of(Coder<K> keyCoder, - Coder<V> valueCoder) { - return new KvCoder<>(keyCoder, valueCoder); - } - - @JsonCreator - public static KvCoder<?, ?> of( - @JsonProperty(PropertyNames.COMPONENT_ENCODINGS) - List<Coder<?>> components) { - Preconditions.checkArgument(components.size() == 2, - "Expecting 2 components, got " + components.size()); - return of(components.get(0), components.get(1)); - } - - public static <K, V> List<Object> getInstanceComponents( - KV<K, V> exampleValue) { - return Arrays.asList( - exampleValue.getKey(), - exampleValue.getValue()); - } - - public Coder<K> getKeyCoder() { - return keyCoder; - } - - public Coder<V> getValueCoder() { - return valueCoder; - } - - ///////////////////////////////////////////////////////////////////////////// - - private final Coder<K> keyCoder; - private final Coder<V> valueCoder; - - private KvCoder(Coder<K> keyCoder, Coder<V> valueCoder) { - this.keyCoder = keyCoder; - this.valueCoder = valueCoder; - } - - @Override - public void encode(KV<K, V> kv, OutputStream outStream, Context context) - throws IOException, CoderException { - if (kv == null) { - throw new CoderException("cannot encode a null KV"); - } - Context nestedContext = context.nested(); - keyCoder.encode(kv.getKey(), outStream, nestedContext); - valueCoder.encode(kv.getValue(), outStream, nestedContext); - } - - @Override - public KV<K, V> decode(InputStream inStream, Context context) - throws IOException, CoderException { - Context nestedContext = context.nested(); - K key = keyCoder.decode(inStream, nestedContext); - V value = valueCoder.decode(inStream, nestedContext); - return KV.of(key, value); - } - - @Override - public List<? extends Coder<?>> getCoderArguments() { - return Arrays.asList(keyCoder, valueCoder); - } - - @Override - public void verifyDeterministic() throws NonDeterministicException { - verifyDeterministic("Key coder must be deterministic", getKeyCoder()); - verifyDeterministic("Value coder must be deterministic", getValueCoder()); - } - - @Override - public boolean consistentWithEquals() { - return keyCoder.consistentWithEquals() && valueCoder.consistentWithEquals(); - } - - @Override - public Object structuralValue(KV<K, V> kv) throws Exception { - if (consistentWithEquals()) { - return kv; - } else { - return KV.of(getKeyCoder().structuralValue(kv.getKey()), - getValueCoder().structuralValue(kv.getValue())); - } - } - - @Override - public CloudObject asCloudObject() { - CloudObject result = super.asCloudObject(); - addBoolean(result, PropertyNames.IS_PAIR_LIKE, true); - return result; - } - - /** - * Returns whether both keyCoder and valueCoder are considered not expensive. - */ - @Override - public boolean isRegisterByteSizeObserverCheap(KV<K, V> kv, Context context) { - return keyCoder.isRegisterByteSizeObserverCheap(kv.getKey(), - context.nested()) - && valueCoder.isRegisterByteSizeObserverCheap(kv.getValue(), - context.nested()); - } - - /** - * Notifies ElementByteSizeObserver about the byte size of the - * encoded value using this coder. - */ - @Override - public void registerByteSizeObserver( - KV<K, V> kv, ElementByteSizeObserver observer, Context context) - throws Exception { - if (kv == null) { - throw new CoderException("cannot encode a null KV"); - } - keyCoder.registerByteSizeObserver( - kv.getKey(), observer, context.nested()); - valueCoder.registerByteSizeObserver( - kv.getValue(), observer, context.nested()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/KvCoderBase.java ---------------------------------------------------------------------- diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/KvCoderBase.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/KvCoderBase.java deleted file mode 100644 index 4a12ee0..0000000 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/KvCoderBase.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Copyright (C) 2015 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package com.google.cloud.dataflow.sdk.coders; - -import com.google.cloud.dataflow.sdk.util.PropertyNames; - -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; - -import java.util.List; - -/** - * A abstract base class for KvCoder. Works around a Jackson2 bug tickled when building - * {@link KvCoder} directly (as of this writing, Jackson2 walks off the end of - * an array when it tries to deserialize a class with multiple generic type - * parameters). This class should be removed when possible. - * - * @param <T> the type of values being transcoded - */ -@Deprecated -public abstract class KvCoderBase<T> extends StandardCoder<T> { - /** - * A constructor used only for decoding from JSON. - * - * @param typeId present in the JSON encoding, but unused - * @param isPairLike present in the JSON encoding, but unused - */ - @Deprecated - @JsonCreator - public static KvCoderBase<?> of( - // N.B. typeId is a required parameter here, since a field named "@type" - // is presented to the deserializer as an input. - // - // If this method did not consume the field, Jackson2 would observe an - // unconsumed field and a returned value of a derived type. So Jackson2 - // would attempt to update the returned value with the unconsumed field - // data. The standard JsonDeserializer does not implement a mechanism for - // updating constructed values, so it would throw an exception, causing - // deserialization to fail. - @JsonProperty(value = "@type", required = false) String typeId, - @JsonProperty(value = PropertyNames.IS_PAIR_LIKE, required = false) boolean isPairLike, - @JsonProperty(PropertyNames.COMPONENT_ENCODINGS) List<Coder<?>> components) { - return KvCoder.of(components); - } - - protected KvCoderBase() {} -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/ListCoder.java ---------------------------------------------------------------------- diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/ListCoder.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/ListCoder.java deleted file mode 100644 index bc74404..0000000 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/ListCoder.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Copyright (C) 2015 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package com.google.cloud.dataflow.sdk.coders; - -import com.google.cloud.dataflow.sdk.util.PropertyNames; -import com.google.common.base.Preconditions; - -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; - -import java.util.List; - -/** - * A {@link Coder} for {@link List}, using the format of {@link IterableLikeCoder}. - * - * @param <T> the type of the elements of the Lists being transcoded - */ -public class ListCoder<T> extends IterableLikeCoder<T, List<T>> { - - public static <T> ListCoder<T> of(Coder<T> elemCoder) { - return new ListCoder<>(elemCoder); - } - - ///////////////////////////////////////////////////////////////////////////// - // Internal operations below here. - - @Override - protected final List<T> decodeToIterable(List<T> decodedElements) { - return decodedElements; - } - - @JsonCreator - public static ListCoder<?> of( - @JsonProperty(PropertyNames.COMPONENT_ENCODINGS) - List<Coder<?>> components) { - Preconditions.checkArgument(components.size() == 1, - "Expecting 1 component, got " + components.size()); - return of((Coder<?>) components.get(0)); - } - - /** - * Returns the first element in this list if it is non-empty, - * otherwise returns {@code null}. - */ - public static <T> List<Object> getInstanceComponents(List<T> exampleValue) { - return getInstanceComponentsHelper(exampleValue); - } - - protected ListCoder(Coder<T> elemCoder) { - super(elemCoder, "List"); - } - - /** - * List sizes are always known, so ListIterable may be deterministic while - * the general IterableLikeCoder is not. - */ - @Override - public void verifyDeterministic() throws NonDeterministicException { - verifyDeterministic( - "ListCoder.elemCoder must be deterministic", getElemCoder()); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/MapCoder.java ---------------------------------------------------------------------- diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/MapCoder.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/MapCoder.java deleted file mode 100644 index b6f3103..0000000 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/MapCoder.java +++ /dev/null @@ -1,160 +0,0 @@ -/* - * Copyright (C) 2015 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package com.google.cloud.dataflow.sdk.coders; - -import com.google.cloud.dataflow.sdk.util.PropertyNames; -import com.google.cloud.dataflow.sdk.util.common.ElementByteSizeObserver; -import com.google.common.base.Preconditions; -import com.google.common.collect.Maps; - -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; - -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.Arrays; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; - -/** - * A {@link Coder} for {@link Map Maps} that encodes them according to provided - * coders for keys and values. - * - * @param <K> the type of the keys of the KVs being transcoded - * @param <V> the type of the values of the KVs being transcoded - */ -public class MapCoder<K, V> extends MapCoderBase<Map<K, V>> { - /** - * Produces a MapCoder with the given keyCoder and valueCoder. - */ - public static <K, V> MapCoder<K, V> of( - Coder<K> keyCoder, - Coder<V> valueCoder) { - return new MapCoder<>(keyCoder, valueCoder); - } - - @JsonCreator - public static MapCoder<?, ?> of( - @JsonProperty(PropertyNames.COMPONENT_ENCODINGS) - List<Coder<?>> components) { - Preconditions.checkArgument(components.size() == 2, - "Expecting 2 components, got " + components.size()); - return of((Coder<?>) components.get(0), (Coder<?>) components.get(1)); - } - - /** - * Returns the key and value for an arbitrary element of this map, - * if it is non-empty, otherwise returns {@code null}. - */ - public static <K, V> List<Object> getInstanceComponents( - Map<K, V> exampleValue) { - for (Map.Entry<K, V> entry : exampleValue.entrySet()) { - return Arrays.asList(entry.getKey(), entry.getValue()); - } - return null; - } - - public Coder<K> getKeyCoder() { - return keyCoder; - } - - public Coder<V> getValueCoder() { - return valueCoder; - } - - ///////////////////////////////////////////////////////////////////////////// - - Coder<K> keyCoder; - Coder<V> valueCoder; - - MapCoder(Coder<K> keyCoder, Coder<V> valueCoder) { - this.keyCoder = keyCoder; - this.valueCoder = valueCoder; - } - - @Override - public void encode( - Map<K, V> map, - OutputStream outStream, - Context context) - throws IOException, CoderException { - if (map == null) { - throw new CoderException("cannot encode a null Map"); - } - DataOutputStream dataOutStream = new DataOutputStream(outStream); - dataOutStream.writeInt(map.size()); - for (Entry<K, V> entry : map.entrySet()) { - keyCoder.encode(entry.getKey(), outStream, context.nested()); - valueCoder.encode(entry.getValue(), outStream, context.nested()); - } - dataOutStream.flush(); - } - - @Override - public Map<K, V> decode(InputStream inStream, Context context) - throws IOException, CoderException { - DataInputStream dataInStream = new DataInputStream(inStream); - int size = dataInStream.readInt(); - Map<K, V> retval = Maps.newHashMapWithExpectedSize(size); - for (int i = 0; i < size; ++i) { - K key = keyCoder.decode(inStream, context.nested()); - V value = valueCoder.decode(inStream, context.nested()); - retval.put(key, value); - } - return retval; - } - - /** - * {@inheritDoc} - * - * @return a {@link List} containing the key coder at index 0 at the and value coder at index 1. - */ - @Override - public List<? extends Coder<?>> getCoderArguments() { - return Arrays.asList(keyCoder, valueCoder); - } - - /** - * {@inheritDoc} - * - * @throws NonDeterministicException always. Not all maps have a deterministic encoding. - * For example, {@code HashMap} comparison does not depend on element order, so - * two {@code HashMap} instances may be equal but produce different encodings. - */ - @Override - public void verifyDeterministic() throws NonDeterministicException { - throw new NonDeterministicException(this, - "Ordering of entries in a Map may be non-deterministic."); - } - - @Override - public void registerByteSizeObserver( - Map<K, V> map, ElementByteSizeObserver observer, Context context) - throws Exception { - observer.update(4L); - for (Entry<K, V> entry : map.entrySet()) { - keyCoder.registerByteSizeObserver( - entry.getKey(), observer, context.nested()); - valueCoder.registerByteSizeObserver( - entry.getValue(), observer, context.nested()); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/MapCoderBase.java ---------------------------------------------------------------------- diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/MapCoderBase.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/MapCoderBase.java deleted file mode 100644 index d32406c..0000000 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/MapCoderBase.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Copyright (C) 2015 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package com.google.cloud.dataflow.sdk.coders; - -import com.google.cloud.dataflow.sdk.util.PropertyNames; - -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; - -import java.util.List; - -/** - * A abstract base class for MapCoder. Works around a Jackson2 bug tickled when building - * {@link MapCoder} directly (as of this writing, Jackson2 walks off the end of - * an array when it tries to deserialize a class with multiple generic type - * parameters). This should be removed in favor of a better workaround. - * @param <T> the type of values being transcoded - */ -@Deprecated -public abstract class MapCoderBase<T> extends StandardCoder<T> { - @Deprecated - @JsonCreator - public static MapCoderBase<?> of( - // N.B. typeId is a required parameter here, since a field named "@type" - // is presented to the deserializer as an input. - // - // If this method did not consume the field, Jackson2 would observe an - // unconsumed field and a returned value of a derived type. So Jackson2 - // would attempt to update the returned value with the unconsumed field - // data, The standard JsonDeserializer does not implement a mechanism for - // updating constructed values, so it would throw an exception, causing - // deserialization to fail. - @JsonProperty(value = "@type", required = false) String typeId, - @JsonProperty(PropertyNames.COMPONENT_ENCODINGS) - List<Coder<?>> components) { - return MapCoder.of(components); - } - - protected MapCoderBase() {} -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/NullableCoder.java ---------------------------------------------------------------------- diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/NullableCoder.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/NullableCoder.java deleted file mode 100644 index 5598a71..0000000 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/NullableCoder.java +++ /dev/null @@ -1,175 +0,0 @@ -/* - * Copyright (C) 2015 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package com.google.cloud.dataflow.sdk.coders; - -import com.google.cloud.dataflow.sdk.util.PropertyNames; -import com.google.cloud.dataflow.sdk.util.common.ElementByteSizeObserver; -import com.google.common.base.Optional; -import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableList; - -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.List; - -import javax.annotation.Nullable; - -/** - * A {@link NullableCoder} encodes nullable values of type {@code T} using a nested - * {@code Coder<T>} that does not tolerate {@code null} values. {@link NullableCoder} uses - * exactly 1 byte per entry to indicate whether the value is {@code null}, then adds the encoding - * of the inner coder for non-null values. - * - * @param <T> the type of the values being transcoded - */ -public class NullableCoder<T> extends StandardCoder<T> { - public static <T> NullableCoder<T> of(Coder<T> valueCoder) { - return new NullableCoder<>(valueCoder); - } - - @JsonCreator - public static NullableCoder<?> of( - @JsonProperty(PropertyNames.COMPONENT_ENCODINGS) - List<Coder<?>> components) { - Preconditions.checkArgument(components.size() == 1, - "Expecting 1 components, got " + components.size()); - return of(components.get(0)); - } - - ///////////////////////////////////////////////////////////////////////////// - - private final Coder<T> valueCoder; - private static final int ENCODE_NULL = 0; - private static final int ENCODE_PRESENT = 1; - - private NullableCoder(Coder<T> valueCoder) { - this.valueCoder = valueCoder; - } - - @Override - public void encode(@Nullable T value, OutputStream outStream, Context context) - throws IOException, CoderException { - if (value == null) { - outStream.write(ENCODE_NULL); - } else { - outStream.write(ENCODE_PRESENT); - valueCoder.encode(value, outStream, context.nested()); - } - } - - @Override - @Nullable - public T decode(InputStream inStream, Context context) throws IOException, CoderException { - int b = inStream.read(); - if (b == ENCODE_NULL) { - return null; - } else if (b != ENCODE_PRESENT) { - throw new CoderException(String.format( - "NullableCoder expects either a byte valued %s (null) or %s (present), got %s", - ENCODE_NULL, ENCODE_PRESENT, b)); - } - return valueCoder.decode(inStream, context.nested()); - } - - @Override - public List<Coder<T>> getCoderArguments() { - return ImmutableList.of(valueCoder); - } - - /** - * {@code NullableCoder} is deterministic if the nested {@code Coder} is. - * - * {@inheritDoc} - */ - @Override - public void verifyDeterministic() throws NonDeterministicException { - verifyDeterministic("Value coder must be deterministic", valueCoder); - } - - /** - * {@code NullableCoder} is consistent with equals if the nested {@code Coder} is. - * - * {@inheritDoc} - */ - @Override - public boolean consistentWithEquals() { - return valueCoder.consistentWithEquals(); - } - - @Override - public Object structuralValue(@Nullable T value) throws Exception { - if (value == null) { - return Optional.absent(); - } - return Optional.of(valueCoder.structuralValue(value)); - } - - /** - * Overridden to short-circuit the default {@code StandardCoder} behavior of encoding and - * counting the bytes. The size is known (1 byte) when {@code value} is {@code null}, otherwise - * the size is 1 byte plus the size of nested {@code Coder}'s encoding of {@code value}. - * - * {@inheritDoc} - */ - @Override - public void registerByteSizeObserver( - @Nullable T value, ElementByteSizeObserver observer, Context context) throws Exception { - observer.update(1); - if (value != null) { - valueCoder.registerByteSizeObserver(value, observer, context.nested()); - } - } - - /** - * Overridden to short-circuit the default {@code StandardCoder} behavior of encoding and - * counting the bytes. The size is known (1 byte) when {@code value} is {@code null}, otherwise - * the size is 1 byte plus the size of nested {@code Coder}'s encoding of {@code value}. - * - * {@inheritDoc} - */ - @Override - protected long getEncodedElementByteSize(@Nullable T value, Context context) throws Exception { - if (value == null) { - return 1; - } - - if (valueCoder instanceof StandardCoder) { - // If valueCoder is a StandardCoder then we can ask it directly for the encoded size of - // the value, adding 1 byte to count the null indicator. - return 1 + ((StandardCoder<T>) valueCoder) - .getEncodedElementByteSize(value, context.nested()); - } - - // If value is not a StandardCoder then fall back to the default StandardCoder behavior - // of encoding and counting the bytes. The encoding will include the null indicator byte. - return super.getEncodedElementByteSize(value, context); - } - - /** - * {@code NullableCoder} is cheap if {@code valueCoder} is cheap. - * - * {@inheritDoc} - */ - @Override - public boolean isRegisterByteSizeObserverCheap(@Nullable T value, Context context) { - return valueCoder.isRegisterByteSizeObserverCheap(value, context.nested()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/Proto2Coder.java ---------------------------------------------------------------------- diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/Proto2Coder.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/Proto2Coder.java deleted file mode 100644 index ef91ba9..0000000 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/Proto2Coder.java +++ /dev/null @@ -1,361 +0,0 @@ -/* - * Copyright (C) 2015 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ -package com.google.cloud.dataflow.sdk.coders; - -import static com.google.common.base.Preconditions.checkArgument; - -import com.google.cloud.dataflow.sdk.coders.protobuf.ProtoCoder; -import com.google.cloud.dataflow.sdk.util.CloudObject; -import com.google.cloud.dataflow.sdk.util.Structs; -import com.google.cloud.dataflow.sdk.values.TypeDescriptor; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; -import com.google.protobuf.ExtensionRegistry; -import com.google.protobuf.Message; -import com.google.protobuf.Parser; - -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; -import java.lang.reflect.Modifier; -import java.util.Collections; -import java.util.List; -import java.util.Objects; - -import javax.annotation.Nullable; - -/** - * A {@link Coder} using Google Protocol Buffers 2 binary format. - * - * <p>To learn more about Protocol Buffers, visit: - * <a href="https://developers.google.com/protocol-buffers">https://developers.google.com/protocol-buffers</a> - * - * <p>To use, specify the {@link Coder} type on a PCollection containing Protocol Buffers messages. - * - * <pre> - * {@code - * PCollection<MyProto.Message> records = - * input.apply(...) - * .setCoder(Proto2Coder.of(MyProto.Message.class)); - * } - * </pre> - * - * <p>Custom message extensions are also supported, but the coder must be made - * aware of them explicitly: - * - * <pre> - * {@code - * PCollection<MyProto.Message> records = - * input.apply(...) - * .setCoder(Proto2Coder.of(MyProto.Message.class) - * .addExtensionsFrom(MyProto.class)); - * } - * </pre> - * - * @param <T> the type of elements handled by this coder, must extend {@code Message} - * @deprecated Use {@link ProtoCoder}. - */ -@Deprecated -public class Proto2Coder<T extends Message> extends AtomicCoder<T> { - - /** The class of Protobuf message to be encoded. */ - private final Class<T> protoMessageClass; - - /** - * All extension host classes included in this Proto2Coder. The extensions from - * these classes will be included in the {@link ExtensionRegistry} used during - * encoding and decoding. - */ - private final List<Class<?>> extensionHostClasses; - - private Proto2Coder(Class<T> protoMessageClass, List<Class<?>> extensionHostClasses) { - this.protoMessageClass = protoMessageClass; - this.extensionHostClasses = extensionHostClasses; - } - - private static final CoderProvider PROVIDER = - new CoderProvider() { - @Override - public <T> Coder<T> getCoder(TypeDescriptor<T> type) throws CannotProvideCoderException { - if (type.isSubtypeOf(new TypeDescriptor<Message>() {})) { - @SuppressWarnings("unchecked") - TypeDescriptor<? extends Message> messageType = - (TypeDescriptor<? extends Message>) type; - @SuppressWarnings("unchecked") - Coder<T> coder = (Coder<T>) Proto2Coder.of(messageType); - return coder; - } else { - throw new CannotProvideCoderException( - String.format( - "Cannot provide Proto2Coder because %s " - + "is not a subclass of protocol buffer Messsage", - type)); - } - } - }; - - public static CoderProvider coderProvider() { - return PROVIDER; - } - - /** - * Returns a {@code Proto2Coder} for the given Protobuf message class. - */ - public static <T extends Message> Proto2Coder<T> of(Class<T> protoMessageClass) { - return new Proto2Coder<T>(protoMessageClass, Collections.<Class<?>>emptyList()); - } - - /** - * Returns a {@code Proto2Coder} for the given Protobuf message class. - */ - public static <T extends Message> Proto2Coder<T> of(TypeDescriptor<T> protoMessageType) { - @SuppressWarnings("unchecked") - Class<T> protoMessageClass = (Class<T>) protoMessageType.getRawType(); - return of(protoMessageClass); - } - - /** - * Produces a {@code Proto2Coder} like this one, but with the extensions from - * the given classes registered. - * - * @param moreExtensionHosts an iterable of classes that define a static - * method {@code registerAllExtensions(ExtensionRegistry)} - */ - public Proto2Coder<T> withExtensionsFrom(Iterable<Class<?>> moreExtensionHosts) { - for (Class<?> extensionHost : moreExtensionHosts) { - // Attempt to access the required method, to make sure it's present. - try { - Method registerAllExtensions = - extensionHost.getDeclaredMethod("registerAllExtensions", ExtensionRegistry.class); - checkArgument( - Modifier.isStatic(registerAllExtensions.getModifiers()), - "Method registerAllExtensions() must be static for use with Proto2Coder"); - } catch (NoSuchMethodException | SecurityException e) { - throw new IllegalArgumentException(e); - } - } - - return new Proto2Coder<T>( - protoMessageClass, - new ImmutableList.Builder<Class<?>>() - .addAll(extensionHostClasses) - .addAll(moreExtensionHosts) - .build()); - } - - /** - * See {@link #withExtensionsFrom(Iterable)}. - */ - public Proto2Coder<T> withExtensionsFrom(Class<?>... extensionHosts) { - return withExtensionsFrom(ImmutableList.copyOf(extensionHosts)); - } - - /** - * Adds custom Protobuf extensions to the coder. Returns {@code this} - * for method chaining. - * - * @param extensionHosts must be a class that defines a static - * method name {@code registerAllExtensions} - * @deprecated use {@link #withExtensionsFrom} - */ - @Deprecated - public Proto2Coder<T> addExtensionsFrom(Class<?>... extensionHosts) { - return addExtensionsFrom(ImmutableList.copyOf(extensionHosts)); - } - - /** - * Adds custom Protobuf extensions to the coder. Returns {@code this} - * for method chaining. - * - * @param extensionHosts must be a class that defines a static - * method name {@code registerAllExtensions} - * @deprecated use {@link #withExtensionsFrom} - */ - @Deprecated - public Proto2Coder<T> addExtensionsFrom(Iterable<Class<?>> extensionHosts) { - for (Class<?> extensionHost : extensionHosts) { - try { - // Attempt to access the declared method, to make sure it's present. - extensionHost.getDeclaredMethod("registerAllExtensions", ExtensionRegistry.class); - } catch (NoSuchMethodException e) { - throw new IllegalArgumentException(e); - } - extensionHostClasses.add(extensionHost); - } - // The memoized extension registry needs to be recomputed because we have mutated this object. - synchronized (this) { - memoizedExtensionRegistry = null; - getExtensionRegistry(); - } - return this; - } - - @Override - public void encode(T value, OutputStream outStream, Context context) throws IOException { - if (value == null) { - throw new CoderException("cannot encode a null " + protoMessageClass.getSimpleName()); - } - if (context.isWholeStream) { - value.writeTo(outStream); - } else { - value.writeDelimitedTo(outStream); - } - } - - @Override - public T decode(InputStream inStream, Context context) throws IOException { - if (context.isWholeStream) { - return getParser().parseFrom(inStream, getExtensionRegistry()); - } else { - return getParser().parseDelimitedFrom(inStream, getExtensionRegistry()); - } - } - - @Override - public boolean equals(Object other) { - if (this == other) { - return true; - } - if (!(other instanceof Proto2Coder)) { - return false; - } - Proto2Coder<?> otherCoder = (Proto2Coder<?>) other; - return protoMessageClass.equals(otherCoder.protoMessageClass) - && Sets.newHashSet(extensionHostClasses) - .equals(Sets.newHashSet(otherCoder.extensionHostClasses)); - } - - @Override - public int hashCode() { - return Objects.hash(protoMessageClass, extensionHostClasses); - } - - /** - * The encoding identifier is designed to support evolution as per the design of Protocol - * Buffers. In order to use this class effectively, carefully follow the advice in the Protocol - * Buffers documentation at - * <a href="https://developers.google.com/protocol-buffers/docs/proto#updating">Updating - * A Message Type</a>. - * - * <p>In particular, the encoding identifier is guaranteed to be the same for {@code Proto2Coder} - * instances of the same principal message class, and otherwise distinct. Loaded extensions do not - * affect the id, nor does it encode the full schema. - * - * <p>When modifying a message class, here are the broadest guidelines; see the above link - * for greater detail. - * - * <ul> - * <li>Do not change the numeric tags for any fields. - * <li>Never remove a <code>required</code> field. - * <li>Only add <code>optional</code> or <code>repeated</code> fields, with sensible defaults. - * <li>When changing the type of a field, consult the Protocol Buffers documentation to ensure - * the new and old types are interchangeable. - * </ul> - * - * <p>Code consuming this message class should be prepared to support <i>all</i> versions of - * the class until it is certain that no remaining serialized instances exist. - * - * <p>If backwards incompatible changes must be made, the best recourse is to change the name - * of your Protocol Buffers message class. - */ - @Override - public String getEncodingId() { - return protoMessageClass.getName(); - } - - private transient Parser<T> memoizedParser; - - private Parser<T> getParser() { - if (memoizedParser == null) { - try { - @SuppressWarnings("unchecked") - T protoMessageInstance = (T) protoMessageClass.getMethod("getDefaultInstance").invoke(null); - @SuppressWarnings("unchecked") - Parser<T> tParser = (Parser<T>) protoMessageInstance.getParserForType(); - memoizedParser = tParser; - } catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) { - throw new IllegalArgumentException(e); - } - } - return memoizedParser; - } - - private transient ExtensionRegistry memoizedExtensionRegistry; - - private synchronized ExtensionRegistry getExtensionRegistry() { - if (memoizedExtensionRegistry == null) { - ExtensionRegistry registry = ExtensionRegistry.newInstance(); - for (Class<?> extensionHost : extensionHostClasses) { - try { - extensionHost - .getDeclaredMethod("registerAllExtensions", ExtensionRegistry.class) - .invoke(null, registry); - } catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) { - throw new IllegalStateException(e); - } - } - memoizedExtensionRegistry = registry.getUnmodifiable(); - } - return memoizedExtensionRegistry; - } - - //////////////////////////////////////////////////////////////////////////////////// - // JSON Serialization details below - - private static final String PROTO_MESSAGE_CLASS = "proto_message_class"; - private static final String PROTO_EXTENSION_HOSTS = "proto_extension_hosts"; - - /** - * Constructor for JSON deserialization only. - */ - @JsonCreator - public static <T extends Message> Proto2Coder<T> of( - @JsonProperty(PROTO_MESSAGE_CLASS) String protoMessageClassName, - @Nullable @JsonProperty(PROTO_EXTENSION_HOSTS) List<String> extensionHostClassNames) { - - try { - @SuppressWarnings("unchecked") - Class<T> protoMessageClass = (Class<T>) Class.forName(protoMessageClassName); - List<Class<?>> extensionHostClasses = Lists.newArrayList(); - if (extensionHostClassNames != null) { - for (String extensionHostClassName : extensionHostClassNames) { - extensionHostClasses.add(Class.forName(extensionHostClassName)); - } - } - return of(protoMessageClass).withExtensionsFrom(extensionHostClasses); - } catch (ClassNotFoundException e) { - throw new IllegalArgumentException(e); - } - } - - @Override - public CloudObject asCloudObject() { - CloudObject result = super.asCloudObject(); - Structs.addString(result, PROTO_MESSAGE_CLASS, protoMessageClass.getName()); - List<CloudObject> extensionHostClassNames = Lists.newArrayList(); - for (Class<?> clazz : extensionHostClasses) { - extensionHostClassNames.add(CloudObject.forString(clazz.getName())); - } - Structs.addList(result, PROTO_EXTENSION_HOSTS, extensionHostClassNames); - return result; - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/SerializableCoder.java ---------------------------------------------------------------------- diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/SerializableCoder.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/SerializableCoder.java deleted file mode 100644 index 593c9f0..0000000 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/SerializableCoder.java +++ /dev/null @@ -1,183 +0,0 @@ -/* - * Copyright (C) 2015 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package com.google.cloud.dataflow.sdk.coders; - -import com.google.cloud.dataflow.sdk.util.CloudObject; -import com.google.cloud.dataflow.sdk.values.TypeDescriptor; - -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; - -import java.io.IOException; -import java.io.InputStream; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; -import java.io.ObjectStreamClass; -import java.io.OutputStream; -import java.io.Serializable; - -/** - * A {@link Coder} for Java classes that implement {@link Serializable}. - * - * <p>To use, specify the coder type on a PCollection: - * <pre> - * {@code - * PCollection<MyRecord> records = - * foo.apply(...).setCoder(SerializableCoder.of(MyRecord.class)); - * } - * </pre> - * - * <p>{@link SerializableCoder} does not guarantee a deterministic encoding, as Java - * serialization may produce different binary encodings for two equivalent - * objects. - * - * @param <T> the type of elements handled by this coder - */ -public class SerializableCoder<T extends Serializable> extends AtomicCoder<T> { - - /** - * Returns a {@link SerializableCoder} instance for the provided element type. - * @param <T> the element type - */ - public static <T extends Serializable> SerializableCoder<T> of(TypeDescriptor<T> type) { - @SuppressWarnings("unchecked") - Class<T> clazz = (Class<T>) type.getRawType(); - return of(clazz); - } - - /** - * Returns a {@link SerializableCoder} instance for the provided element class. - * @param <T> the element type - */ - public static <T extends Serializable> SerializableCoder<T> of(Class<T> clazz) { - return new SerializableCoder<>(clazz); - } - - @JsonCreator - @SuppressWarnings("unchecked") - public static SerializableCoder<?> of(@JsonProperty("type") String classType) - throws ClassNotFoundException { - Class<?> clazz = Class.forName(classType); - if (!Serializable.class.isAssignableFrom(clazz)) { - throw new ClassNotFoundException( - "Class " + classType + " does not implement Serializable"); - } - return of((Class<? extends Serializable>) clazz); - } - - /** - * A {@link CoderProvider} that constructs a {@link SerializableCoder} - * for any class that implements serializable. - */ - public static final CoderProvider PROVIDER = new CoderProvider() { - @Override - public <T> Coder<T> getCoder(TypeDescriptor<T> typeDescriptor) - throws CannotProvideCoderException { - Class<?> clazz = typeDescriptor.getRawType(); - if (Serializable.class.isAssignableFrom(clazz)) { - @SuppressWarnings("unchecked") - Class<? extends Serializable> serializableClazz = - (Class<? extends Serializable>) clazz; - @SuppressWarnings("unchecked") - Coder<T> coder = (Coder<T>) SerializableCoder.of(serializableClazz); - return coder; - } else { - throw new CannotProvideCoderException( - "Cannot provide SerializableCoder because " + typeDescriptor - + " does not implement Serializable"); - } - } - }; - - - private final Class<T> type; - - protected SerializableCoder(Class<T> type) { - this.type = type; - } - - public Class<T> getRecordType() { - return type; - } - - @Override - public void encode(T value, OutputStream outStream, Context context) - throws IOException, CoderException { - try { - ObjectOutputStream oos = new ObjectOutputStream(outStream); - oos.writeObject(value); - oos.flush(); - } catch (IOException exn) { - throw new CoderException("unable to serialize record " + value, exn); - } - } - - @Override - public T decode(InputStream inStream, Context context) - throws IOException, CoderException { - try { - ObjectInputStream ois = new ObjectInputStream(inStream); - return type.cast(ois.readObject()); - } catch (ClassNotFoundException e) { - throw new CoderException("unable to deserialize record", e); - } - } - - @Override - public String getEncodingId() { - return String.format("%s:%s", - type.getName(), - ObjectStreamClass.lookup(type).getSerialVersionUID()); - } - - @Override - public CloudObject asCloudObject() { - CloudObject result = super.asCloudObject(); - result.put("type", type.getName()); - return result; - } - - /** - * {@inheritDoc} - * - * @throws NonDeterministicException always. Java serialization is not - * deterministic with respect to {@link Object#equals} for all types. - */ - @Override - public void verifyDeterministic() throws NonDeterministicException { - throw new NonDeterministicException(this, - "Java Serialization may be non-deterministic."); - } - - @Override - public boolean equals(Object other) { - if (getClass() != other.getClass()) { - return false; - } - return type == ((SerializableCoder<?>) other).type; - } - - @Override - public int hashCode() { - return type.hashCode(); - } - - // This coder inherits isRegisterByteSizeObserverCheap, - // getEncodedElementByteSize and registerByteSizeObserver - // from StandardCoder. Looks like we cannot do much better - // in this case. -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/SetCoder.java ---------------------------------------------------------------------- diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/SetCoder.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/SetCoder.java deleted file mode 100644 index 36b3606..0000000 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/SetCoder.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Copyright (C) 2015 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package com.google.cloud.dataflow.sdk.coders; - -import com.google.cloud.dataflow.sdk.util.PropertyNames; -import com.google.common.base.Preconditions; - -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; - -import java.util.HashSet; -import java.util.List; -import java.util.Set; - -/** - * A {@link SetCoder} encodes any {@link Set} using the format of {@link IterableLikeCoder}. The - * elements may not be in a deterministic order, depending on the {@code Set} implementation. - * - * @param <T> the type of the elements of the set - */ -public class SetCoder<T> extends IterableLikeCoder<T, Set<T>> { - - /** - * Produces a {@link SetCoder} with the given {@code elementCoder}. - */ - public static <T> SetCoder<T> of(Coder<T> elementCoder) { - return new SetCoder<>(elementCoder); - } - - /** - * Dynamically typed constructor for JSON deserialization. - */ - @JsonCreator - public static SetCoder<?> of( - @JsonProperty(PropertyNames.COMPONENT_ENCODINGS) - List<Object> components) { - Preconditions.checkArgument(components.size() == 1, - "Expecting 1 component, got " + components.size()); - return of((Coder<?>) components.get(0)); - } - - /** - * {@inheritDoc} - * - * @throws NonDeterministicException always. Sets are not ordered, but - * they are encoded in the order of an arbitrary iteration. - */ - @Override - public void verifyDeterministic() throws NonDeterministicException { - throw new NonDeterministicException(this, - "Ordering of elements in a set may be non-deterministic."); - } - - /** - * Returns the first element in this set if it is non-empty, - * otherwise returns {@code null}. - */ - public static <T> List<Object> getInstanceComponents( - Set<T> exampleValue) { - return getInstanceComponentsHelper(exampleValue); - } - - ///////////////////////////////////////////////////////////////////////////// - // Internal operations below here. - - /** - * {@inheritDoc} - * - * @return A new {@link Set} built from the elements in the {@link List} decoded by - * {@link IterableLikeCoder}. - */ - @Override - protected final Set<T> decodeToIterable(List<T> decodedElements) { - return new HashSet<>(decodedElements); - } - - protected SetCoder(Coder<T> elemCoder) { - super(elemCoder, "Set"); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/StandardCoder.java ---------------------------------------------------------------------- diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/StandardCoder.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/StandardCoder.java deleted file mode 100644 index faa9861..0000000 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/StandardCoder.java +++ /dev/null @@ -1,229 +0,0 @@ -/* - * Copyright (C) 2015 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package com.google.cloud.dataflow.sdk.coders; - -import static com.google.cloud.dataflow.sdk.util.Structs.addList; -import static com.google.cloud.dataflow.sdk.util.Structs.addString; -import static com.google.cloud.dataflow.sdk.util.Structs.addStringList; -import static com.google.common.base.Preconditions.checkNotNull; - -import com.google.cloud.dataflow.sdk.util.CloudObject; -import com.google.cloud.dataflow.sdk.util.PropertyNames; -import com.google.cloud.dataflow.sdk.util.common.ElementByteSizeObserver; -import com.google.common.collect.Lists; -import com.google.common.io.ByteStreams; -import com.google.common.io.CountingOutputStream; - -import java.io.ByteArrayOutputStream; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.List; - -/** - * An abstract base class to implement a {@link Coder} that defines equality, hashing, and printing - * via the class name and recursively using {@link #getComponents}. - * - * <p>To extend {@link StandardCoder}, override the following methods as appropriate: - * - * <ul> - * <li>{@link #getComponents}: the default implementation returns {@link #getCoderArguments}.</li> - * <li>{@link #getEncodedElementByteSize} and - * {@link #isRegisterByteSizeObserverCheap}: the - * default implementation encodes values to bytes and counts the bytes, which is considered - * expensive.</li> - * <li>{@link #getEncodingId} and {@link #getAllowedEncodings}: by default, the encoding id - * is the empty string, so only the canonical name of the subclass will be used for - * compatibility checks, and no other encoding ids are allowed.</li> - * </ul> - */ -public abstract class StandardCoder<T> implements Coder<T> { - protected StandardCoder() {} - - @Override - public String getEncodingId() { - return ""; - } - - @Override - public Collection<String> getAllowedEncodings() { - return Collections.emptyList(); - } - - /** - * Returns the list of {@link Coder Coders} that are components of this {@link Coder}. - */ - public List<? extends Coder<?>> getComponents() { - List<? extends Coder<?>> coderArguments = getCoderArguments(); - if (coderArguments == null) { - return Collections.emptyList(); - } else { - return coderArguments; - } - } - - /** - * {@inheritDoc} - * - * @return {@code true} if the two {@link StandardCoder} instances have the - * same class and equal components. - */ - @Override - public boolean equals(Object o) { - if (o == null || this.getClass() != o.getClass()) { - return false; - } - StandardCoder<?> that = (StandardCoder<?>) o; - return this.getComponents().equals(that.getComponents()); - } - - @Override - public int hashCode() { - return getClass().hashCode() * 31 + getComponents().hashCode(); - } - - @Override - public String toString() { - String s = getClass().getName(); - s = s.substring(s.lastIndexOf('.') + 1); - List<? extends Coder<?>> componentCoders = getComponents(); - if (!componentCoders.isEmpty()) { - s += "("; - boolean first = true; - for (Coder<?> componentCoder : componentCoders) { - if (first) { - first = false; - } else { - s += ", "; - } - s += componentCoder.toString(); - } - s += ")"; - } - return s; - } - - @Override - public CloudObject asCloudObject() { - CloudObject result = CloudObject.forClass(getClass()); - - List<? extends Coder<?>> components = getComponents(); - if (!components.isEmpty()) { - List<CloudObject> cloudComponents = new ArrayList<>(components.size()); - for (Coder<?> coder : components) { - cloudComponents.add(coder.asCloudObject()); - } - addList(result, PropertyNames.COMPONENT_ENCODINGS, cloudComponents); - } - - String encodingId = getEncodingId(); - checkNotNull(encodingId, "Coder.getEncodingId() must not return null."); - if (!encodingId.isEmpty()) { - addString(result, PropertyNames.ENCODING_ID, encodingId); - } - - Collection<String> allowedEncodings = getAllowedEncodings(); - if (!allowedEncodings.isEmpty()) { - addStringList(result, PropertyNames.ALLOWED_ENCODINGS, Lists.newArrayList(allowedEncodings)); - } - - return result; - } - - /** - * {@inheritDoc} - * - * @return {@code false} unless it is overridden. {@link StandardCoder#registerByteSizeObserver} - * invokes {@link #getEncodedElementByteSize} which requires re-encoding an element - * unless it is overridden. This is considered expensive. - */ - @Override - public boolean isRegisterByteSizeObserverCheap(T value, Context context) { - return false; - } - - /** - * Returns the size in bytes of the encoded value using this coder. - */ - protected long getEncodedElementByteSize(T value, Context context) - throws Exception { - try { - CountingOutputStream os = new CountingOutputStream(ByteStreams.nullOutputStream()); - encode(value, os, context); - return os.getCount(); - } catch (Exception exn) { - throw new IllegalArgumentException( - "Unable to encode element '" + value + "' with coder '" + this + "'.", exn); - } - } - - /** - * {@inheritDoc} - * - * <p>For {@link StandardCoder} subclasses, this notifies {@code observer} about the byte size - * of the encoded value using this coder as returned by {@link #getEncodedElementByteSize}. - */ - @Override - public void registerByteSizeObserver( - T value, ElementByteSizeObserver observer, Context context) - throws Exception { - observer.update(getEncodedElementByteSize(value, context)); - } - - protected void verifyDeterministic(String message, Iterable<Coder<?>> coders) - throws NonDeterministicException { - for (Coder<?> coder : coders) { - try { - coder.verifyDeterministic(); - } catch (NonDeterministicException e) { - throw new NonDeterministicException(this, message, e); - } - } - } - - protected void verifyDeterministic(String message, Coder<?>... coders) - throws NonDeterministicException { - verifyDeterministic(message, Arrays.asList(coders)); - } - - /** - * {@inheritDoc} - * - * @return {@code false} for {@link StandardCoder} unless overridden. - */ - @Override - public boolean consistentWithEquals() { - return false; - } - - @Override - public Object structuralValue(T value) throws Exception { - if (value != null && consistentWithEquals()) { - return value; - } else { - try { - ByteArrayOutputStream os = new ByteArrayOutputStream(); - encode(value, os, Context.OUTER); - return new StructuralByteArray(os.toByteArray()); - } catch (Exception exn) { - throw new IllegalArgumentException( - "Unable to encode element '" + value + "' with coder '" + this + "'.", exn); - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/StringDelegateCoder.java ---------------------------------------------------------------------- diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/StringDelegateCoder.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/StringDelegateCoder.java deleted file mode 100644 index 1fc1247..0000000 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/StringDelegateCoder.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Copyright (C) 2015 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package com.google.cloud.dataflow.sdk.coders; - -import com.google.cloud.dataflow.sdk.coders.protobuf.ProtoCoder; - -import java.lang.reflect.InvocationTargetException; - -/** - * A {@link Coder} that wraps a {@code Coder<String>} - * and encodes/decodes values via string representations. - * - * <p>To decode, the input byte stream is decoded to - * a {@link String}, and this is passed to the single-argument - * constructor for {@code T}. - * - * <p>To encode, the input value is converted via {@code toString()}, - * and this string is encoded. - * - * <p>In order for this to operate correctly for a class {@code Clazz}, - * it must be the case for any instance {@code x} that - * {@code x.equals(new Clazz(x.toString()))}. - * - * <p>This method of encoding is not designed for ease of evolution of {@code Clazz}; - * it should only be used in cases where the class is stable or the encoding is not - * important. If evolution of the class is important, see {@link ProtoCoder}, {@link AvroCoder}, - * or {@link JAXBCoder}. - * - * @param <T> The type of objects coded. - */ -public class StringDelegateCoder<T> extends DelegateCoder<T, String> { - public static <T> StringDelegateCoder<T> of(Class<T> clazz) { - return new StringDelegateCoder<T>(clazz); - } - - @Override - public String toString() { - return "StringDelegateCoder(" + clazz + ")"; - } - - private final Class<T> clazz; - - protected StringDelegateCoder(final Class<T> clazz) { - super(StringUtf8Coder.of(), - new CodingFunction<T, String>() { - @Override - public String apply(T input) { - return input.toString(); - } - }, - new CodingFunction<String, T>() { - @Override - public T apply(String input) throws - NoSuchMethodException, - InstantiationException, - IllegalAccessException, - InvocationTargetException { - return clazz.getConstructor(String.class).newInstance(input); - } - }); - - this.clazz = clazz; - } - - /** - * The encoding id is the fully qualified name of the encoded/decoded class. - */ - @Override - public String getEncodingId() { - return clazz.getName(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/StringUtf8Coder.java ---------------------------------------------------------------------- diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/StringUtf8Coder.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/StringUtf8Coder.java deleted file mode 100644 index 179840c..0000000 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/StringUtf8Coder.java +++ /dev/null @@ -1,139 +0,0 @@ -/* - * Copyright (C) 2015 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package com.google.cloud.dataflow.sdk.coders; - -import com.google.cloud.dataflow.sdk.util.ExposedByteArrayOutputStream; -import com.google.cloud.dataflow.sdk.util.StreamUtils; -import com.google.cloud.dataflow.sdk.util.VarInt; -import com.google.common.base.Utf8; -import com.google.common.io.ByteStreams; -import com.google.common.io.CountingOutputStream; - -import com.fasterxml.jackson.annotation.JsonCreator; - -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.EOFException; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.io.UTFDataFormatException; -import java.nio.charset.StandardCharsets; - -/** - * A {@link Coder} that encodes {@link String Strings} in UTF-8 encoding. - * If in a nested context, prefixes the string with an integer length field, - * encoded via a {@link VarIntCoder}. - */ -public class StringUtf8Coder extends AtomicCoder<String> { - - @JsonCreator - public static StringUtf8Coder of() { - return INSTANCE; - } - - ///////////////////////////////////////////////////////////////////////////// - - private static final StringUtf8Coder INSTANCE = new StringUtf8Coder(); - - private static void writeString(String value, DataOutputStream dos) - throws IOException { - byte[] bytes = value.getBytes(StandardCharsets.UTF_8); - VarInt.encode(bytes.length, dos); - dos.write(bytes); - } - - private static String readString(DataInputStream dis) throws IOException { - int len = VarInt.decodeInt(dis); - if (len < 0) { - throw new CoderException("Invalid encoded string length: " + len); - } - byte[] bytes = new byte[len]; - dis.readFully(bytes); - return new String(bytes, StandardCharsets.UTF_8); - } - - private StringUtf8Coder() {} - - @Override - public void encode(String value, OutputStream outStream, Context context) - throws IOException { - if (value == null) { - throw new CoderException("cannot encode a null String"); - } - if (context.isWholeStream) { - byte[] bytes = value.getBytes(StandardCharsets.UTF_8); - if (outStream instanceof ExposedByteArrayOutputStream) { - ((ExposedByteArrayOutputStream) outStream).writeAndOwn(bytes); - } else { - outStream.write(bytes); - } - } else { - writeString(value, new DataOutputStream(outStream)); - } - } - - @Override - public String decode(InputStream inStream, Context context) - throws IOException { - if (context.isWholeStream) { - byte[] bytes = StreamUtils.getBytes(inStream); - return new String(bytes, StandardCharsets.UTF_8); - } else { - try { - return readString(new DataInputStream(inStream)); - } catch (EOFException | UTFDataFormatException exn) { - // These exceptions correspond to decoding problems, so change - // what kind of exception they're branded as. - throw new CoderException(exn); - } - } - } - - /** - * {@inheritDoc} - * - * @return {@code true}. This coder is injective. - */ - @Override - public boolean consistentWithEquals() { - return true; - } - - /** - * {@inheritDoc} - * - * @return the byte size of the UTF-8 encoding of the a string or, in a nested context, - * the byte size of the encoding plus the encoded length prefix. - */ - @Override - protected long getEncodedElementByteSize(String value, Context context) - throws Exception { - if (value == null) { - throw new CoderException("cannot encode a null String"); - } - if (context.isWholeStream) { - return Utf8.encodedLength(value); - } else { - CountingOutputStream countingStream = - new CountingOutputStream(ByteStreams.nullOutputStream()); - DataOutputStream stream = new DataOutputStream(countingStream); - writeString(value, stream); - return countingStream.getCount(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/StructuralByteArray.java ---------------------------------------------------------------------- diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/StructuralByteArray.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/StructuralByteArray.java deleted file mode 100644 index ea18eb9..0000000 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/StructuralByteArray.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Copyright (C) 2016 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ -package com.google.cloud.dataflow.sdk.coders; - -import static com.google.api.client.util.Base64.encodeBase64String; - -import java.util.Arrays; - -/** - * A wrapper around a byte[] that uses structural, value-based - * equality rather than byte[]'s normal object identity. - */ -public class StructuralByteArray { - byte[] value; - - public StructuralByteArray(byte[] value) { - this.value = value; - } - - public byte[] getValue() { - return value; - } - - @Override - public boolean equals(Object o) { - if (o instanceof StructuralByteArray) { - StructuralByteArray that = (StructuralByteArray) o; - return Arrays.equals(this.value, that.value); - } else { - return false; - } - } - - @Override - public int hashCode() { - return Arrays.hashCode(value); - } - - @Override - public String toString() { - return "base64:" + encodeBase64String(value); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/TableRowJsonCoder.java ---------------------------------------------------------------------- diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/TableRowJsonCoder.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/TableRowJsonCoder.java deleted file mode 100644 index bed88b0..0000000 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/TableRowJsonCoder.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Copyright (C) 2015 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package com.google.cloud.dataflow.sdk.coders; - -import com.google.api.services.bigquery.model.TableRow; - -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.SerializationFeature; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; - -/** - * A {@link Coder} that encodes BigQuery {@link TableRow} objects in their native JSON format. - */ -public class TableRowJsonCoder extends AtomicCoder<TableRow> { - - @JsonCreator - public static TableRowJsonCoder of() { - return INSTANCE; - } - - @Override - public void encode(TableRow value, OutputStream outStream, Context context) - throws IOException { - String strValue = MAPPER.writeValueAsString(value); - StringUtf8Coder.of().encode(strValue, outStream, context); - } - - @Override - public TableRow decode(InputStream inStream, Context context) - throws IOException { - String strValue = StringUtf8Coder.of().decode(inStream, context); - return MAPPER.readValue(strValue, TableRow.class); - } - - @Override - protected long getEncodedElementByteSize(TableRow value, Context context) - throws Exception { - String strValue = MAPPER.writeValueAsString(value); - return StringUtf8Coder.of().getEncodedElementByteSize(strValue, context); - } - - ///////////////////////////////////////////////////////////////////////////// - - // FAIL_ON_EMPTY_BEANS is disabled in order to handle null values in - // TableRow. - private static final ObjectMapper MAPPER = - new ObjectMapper().disable(SerializationFeature.FAIL_ON_EMPTY_BEANS); - - private static final TableRowJsonCoder INSTANCE = new TableRowJsonCoder(); - - private TableRowJsonCoder() { } - - /** - * {@inheritDoc} - * - * @throws NonDeterministicException always. A {@link TableRow} can hold arbitrary - * {@link Object} instances, which makes the encoding non-deterministic. - */ - @Override - public void verifyDeterministic() throws NonDeterministicException { - throw new NonDeterministicException(this, - "TableCell can hold arbitrary instances, which may be non-deterministic."); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/TextualIntegerCoder.java ---------------------------------------------------------------------- diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/TextualIntegerCoder.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/TextualIntegerCoder.java deleted file mode 100644 index 9250c68..0000000 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/TextualIntegerCoder.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Copyright (C) 2015 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package com.google.cloud.dataflow.sdk.coders; - -import com.fasterxml.jackson.annotation.JsonCreator; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; - -/** - * A {@link Coder} that encodes {@code Integer Integers} as the ASCII bytes of - * their textual, decimal, representation. - */ -public class TextualIntegerCoder extends AtomicCoder<Integer> { - - @JsonCreator - public static TextualIntegerCoder of() { - return new TextualIntegerCoder(); - } - - ///////////////////////////////////////////////////////////////////////////// - - protected TextualIntegerCoder() {} - - @Override - public void encode(Integer value, OutputStream outStream, Context context) - throws IOException, CoderException { - if (value == null) { - throw new CoderException("cannot encode a null Integer"); - } - String textualValue = value.toString(); - StringUtf8Coder.of().encode(textualValue, outStream, context); - } - - @Override - public Integer decode(InputStream inStream, Context context) - throws IOException, CoderException { - String textualValue = StringUtf8Coder.of().decode(inStream, context); - try { - return Integer.valueOf(textualValue); - } catch (NumberFormatException exn) { - throw new CoderException("error when decoding a textual integer", exn); - } - } - - @Override - protected long getEncodedElementByteSize(Integer value, Context context) throws Exception { - if (value == null) { - throw new CoderException("cannot encode a null Integer"); - } - String textualValue = value.toString(); - return StringUtf8Coder.of().getEncodedElementByteSize(textualValue, context); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/VarIntCoder.java ---------------------------------------------------------------------- diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/VarIntCoder.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/VarIntCoder.java deleted file mode 100644 index 18ec250..0000000 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/VarIntCoder.java +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Copyright (C) 2015 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package com.google.cloud.dataflow.sdk.coders; - -import com.google.cloud.dataflow.sdk.util.VarInt; - -import com.fasterxml.jackson.annotation.JsonCreator; - -import java.io.EOFException; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.io.UTFDataFormatException; - -/** - * A {@link Coder} that encodes {@link Integer Integers} using between 1 and 5 bytes. Negative - * numbers always take 5 bytes, so {@link BigEndianIntegerCoder} may be preferable for - * integers that are known to often be large or negative. - */ -public class VarIntCoder extends AtomicCoder<Integer> { - - @JsonCreator - public static VarIntCoder of() { - return INSTANCE; - } - - ///////////////////////////////////////////////////////////////////////////// - - private static final VarIntCoder INSTANCE = - new VarIntCoder(); - - private VarIntCoder() {} - - @Override - public void encode(Integer value, OutputStream outStream, Context context) - throws IOException, CoderException { - if (value == null) { - throw new CoderException("cannot encode a null Integer"); - } - VarInt.encode(value.intValue(), outStream); - } - - @Override - public Integer decode(InputStream inStream, Context context) - throws IOException, CoderException { - try { - return VarInt.decodeInt(inStream); - } catch (EOFException | UTFDataFormatException exn) { - // These exceptions correspond to decoding problems, so change - // what kind of exception they're branded as. - throw new CoderException(exn); - } - } - - /** - * {@inheritDoc} - * - * @return {@code true}. {@link VarIntCoder} is injective. - */ - @Override - public boolean consistentWithEquals() { - return true; - } - - /** - * {@inheritDoc} - * - * @return {@code true}. {@link #getEncodedElementByteSize} is cheap. - */ - @Override - public boolean isRegisterByteSizeObserverCheap(Integer value, Context context) { - return true; - } - - @Override - protected long getEncodedElementByteSize(Integer value, Context context) - throws Exception { - if (value == null) { - throw new CoderException("cannot encode a null Integer"); - } - return VarInt.getLength(value.longValue()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/VarLongCoder.java ---------------------------------------------------------------------- diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/VarLongCoder.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/VarLongCoder.java deleted file mode 100644 index 520245e..0000000 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/VarLongCoder.java +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Copyright (C) 2015 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package com.google.cloud.dataflow.sdk.coders; - -import com.google.cloud.dataflow.sdk.util.VarInt; - -import com.fasterxml.jackson.annotation.JsonCreator; - -import java.io.EOFException; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.io.UTFDataFormatException; - -/** - * A {@link Coder} that encodes {@link Long Longs} using between 1 and 10 bytes. Negative - * numbers always take 10 bytes, so {@link BigEndianLongCoder} may be preferable for - * longs that are known to often be large or negative. - */ -public class VarLongCoder extends AtomicCoder<Long> { - - @JsonCreator - public static VarLongCoder of() { - return INSTANCE; - } - - ///////////////////////////////////////////////////////////////////////////// - - private static final VarLongCoder INSTANCE = new VarLongCoder(); - - private VarLongCoder() {} - - @Override - public void encode(Long value, OutputStream outStream, Context context) - throws IOException, CoderException { - if (value == null) { - throw new CoderException("cannot encode a null Long"); - } - VarInt.encode(value.longValue(), outStream); - } - - @Override - public Long decode(InputStream inStream, Context context) - throws IOException, CoderException { - try { - return VarInt.decodeLong(inStream); - } catch (EOFException | UTFDataFormatException exn) { - // These exceptions correspond to decoding problems, so change - // what kind of exception they're branded as. - throw new CoderException(exn); - } - } - - /** - * {@inheritDoc} - * - * @return {@code true}. {@link VarLongCoder} is injective. - */ - @Override - public boolean consistentWithEquals() { - return true; - } - - /** - * {@inheritDoc} - * - * @return {@code true}. {@link #getEncodedElementByteSize} is cheap. - */ - @Override - public boolean isRegisterByteSizeObserverCheap(Long value, Context context) { - return true; - } - - @Override - protected long getEncodedElementByteSize(Long value, Context context) - throws Exception { - if (value == null) { - throw new CoderException("cannot encode a null Long"); - } - return VarInt.getLength(value.longValue()); - } -}
