http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/UnionCoder.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/UnionCoder.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/UnionCoder.java index 0befa88..3cc5c24 100644 --- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/UnionCoder.java +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/UnionCoder.java @@ -38,113 +38,113 @@ import java.util.List; */ @SuppressWarnings("serial") public class UnionCoder extends StandardCoder<RawUnionValue> { - // TODO: Think about how to integrate this with a schema object (i.e. - // a tuple of tuple tags). - /** - * Builds a union coder with the given list of element coders. This list - * corresponds to a mapping of union tag to Coder. Union tags start at 0. - */ - public static UnionCoder of(List<Coder<?>> elementCoders) { - return new UnionCoder(elementCoders); - } - - @JsonCreator - public static UnionCoder jsonOf( - @JsonProperty(PropertyNames.COMPONENT_ENCODINGS) - List<Coder<?>> elements) { - return UnionCoder.of(elements); - } - - private int getIndexForEncoding(RawUnionValue union) { - if (union == null) { - throw new IllegalArgumentException("cannot encode a null tagged union"); - } - int index = union.getUnionTag(); - if (index < 0 || index >= elementCoders.size()) { - throw new IllegalArgumentException( - "union value index " + index + " not in range [0.." + - (elementCoders.size() - 1) + "]"); - } - return index; - } - - @SuppressWarnings("unchecked") - @Override - public void encode( - RawUnionValue union, - OutputStream outStream, - Context context) - throws IOException { - int index = getIndexForEncoding(union); - // Write out the union tag. - VarInt.encode(index, outStream); - - // Write out the actual value. - Coder<Object> coder = (Coder<Object>) elementCoders.get(index); - coder.encode( - union.getValue(), - outStream, - context); - } - - @Override - public RawUnionValue decode(InputStream inStream, Context context) - throws IOException { - int index = VarInt.decodeInt(inStream); - Object value = elementCoders.get(index).decode(inStream, context); - return new RawUnionValue(index, value); - } - - @Override - public List<? extends Coder<?>> getCoderArguments() { - return null; - } - - @Override - public List<? extends Coder<?>> getComponents() { - return elementCoders; - } - - /** - * Since this coder uses elementCoders.get(index) and coders that are known to run in constant - * time, we defer the return value to that coder. - */ - @Override - public boolean isRegisterByteSizeObserverCheap(RawUnionValue union, Context context) { - int index = getIndexForEncoding(union); - @SuppressWarnings("unchecked") - Coder<Object> coder = (Coder<Object>) elementCoders.get(index); - return coder.isRegisterByteSizeObserverCheap(union.getValue(), context); - } - - /** - * Notifies ElementByteSizeObserver about the byte size of the encoded value using this coder. - */ - @Override - public void registerByteSizeObserver( - RawUnionValue union, ElementByteSizeObserver observer, Context context) - throws Exception { - int index = getIndexForEncoding(union); - // Write out the union tag. - observer.update(VarInt.getLength(index)); - // Write out the actual value. - @SuppressWarnings("unchecked") - Coder<Object> coder = (Coder<Object>) elementCoders.get(index); - coder.registerByteSizeObserver(union.getValue(), observer, context); - } - - ///////////////////////////////////////////////////////////////////////////// - - private final List<Coder<?>> elementCoders; - - private UnionCoder(List<Coder<?>> elementCoders) { - this.elementCoders = elementCoders; - } - - @Override - public void verifyDeterministic() throws NonDeterministicException { - verifyDeterministic( - "UnionCoder is only deterministic if all element coders are", - elementCoders); - } + // TODO: Think about how to integrate this with a schema object (i.e. + // a tuple of tuple tags). + /** + * Builds a union coder with the given list of element coders. This list + * corresponds to a mapping of union tag to Coder. Union tags start at 0. + */ + public static UnionCoder of(List<Coder<?>> elementCoders) { + return new UnionCoder(elementCoders); + } + + @JsonCreator + public static UnionCoder jsonOf( + @JsonProperty(PropertyNames.COMPONENT_ENCODINGS) + List<Coder<?>> elements) { + return UnionCoder.of(elements); + } + + private int getIndexForEncoding(RawUnionValue union) { + if (union == null) { + throw new IllegalArgumentException("cannot encode a null tagged union"); + } + int index = union.getUnionTag(); + if (index < 0 || index >= elementCoders.size()) { + throw new IllegalArgumentException( + "union value index " + index + " not in range [0.." + + (elementCoders.size() - 1) + "]"); + } + return index; + } + + @SuppressWarnings("unchecked") + @Override + public void encode( + RawUnionValue union, + OutputStream outStream, + Context context) + throws IOException { + int index = getIndexForEncoding(union); + // Write out the union tag. + VarInt.encode(index, outStream); + + // Write out the actual value. + Coder<Object> coder = (Coder<Object>) elementCoders.get(index); + coder.encode( + union.getValue(), + outStream, + context); + } + + @Override + public RawUnionValue decode(InputStream inStream, Context context) + throws IOException { + int index = VarInt.decodeInt(inStream); + Object value = elementCoders.get(index).decode(inStream, context); + return new RawUnionValue(index, value); + } + + @Override + public List<? extends Coder<?>> getCoderArguments() { + return null; + } + + @Override + public List<? extends Coder<?>> getComponents() { + return elementCoders; + } + + /** + * Since this coder uses elementCoders.get(index) and coders that are known to run in constant + * time, we defer the return value to that coder. + */ + @Override + public boolean isRegisterByteSizeObserverCheap(RawUnionValue union, Context context) { + int index = getIndexForEncoding(union); + @SuppressWarnings("unchecked") + Coder<Object> coder = (Coder<Object>) elementCoders.get(index); + return coder.isRegisterByteSizeObserverCheap(union.getValue(), context); + } + + /** + * Notifies ElementByteSizeObserver about the byte size of the encoded value using this coder. + */ + @Override + public void registerByteSizeObserver( + RawUnionValue union, ElementByteSizeObserver observer, Context context) + throws Exception { + int index = getIndexForEncoding(union); + // Write out the union tag. + observer.update(VarInt.getLength(index)); + // Write out the actual value. + @SuppressWarnings("unchecked") + Coder<Object> coder = (Coder<Object>) elementCoders.get(index); + coder.registerByteSizeObserver(union.getValue(), observer, context); + } + + ///////////////////////////////////////////////////////////////////////////// + + private final List<Coder<?>> elementCoders; + + private UnionCoder(List<Coder<?>> elementCoders) { + this.elementCoders = elementCoders; + } + + @Override + public void verifyDeterministic() throws NonDeterministicException { + verifyDeterministic( + "UnionCoder is only deterministic if all element coders are", + elementCoders); + } }
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/CoderComparator.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/CoderComparator.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/CoderComparator.java index e433589..b402f7c 100644 --- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/CoderComparator.java +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/CoderComparator.java @@ -32,185 +32,185 @@ import java.io.ObjectInputStream; */ public class CoderComparator<T> extends TypeComparator<T> { - private Coder<T> coder; - - // We use these for internal encoding/decoding for creating copies and comparing - // serialized forms using a Coder - private transient InspectableByteArrayOutputStream buffer1; - private transient InspectableByteArrayOutputStream buffer2; - - // For storing the Reference in encoded form - private transient InspectableByteArrayOutputStream referenceBuffer; - - public CoderComparator(Coder<T> coder) { - this.coder = coder; - buffer1 = new InspectableByteArrayOutputStream(); - buffer2 = new InspectableByteArrayOutputStream(); - referenceBuffer = new InspectableByteArrayOutputStream(); - } - - private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { - in.defaultReadObject(); - buffer1 = new InspectableByteArrayOutputStream(); - buffer2 = new InspectableByteArrayOutputStream(); - referenceBuffer = new InspectableByteArrayOutputStream(); - } - - @Override - public int hash(T record) { - return record.hashCode(); - } - - @Override - public void setReference(T toCompare) { - referenceBuffer.reset(); - try { - coder.encode(toCompare, referenceBuffer, Coder.Context.OUTER); - } catch (IOException e) { - throw new RuntimeException("Could not set reference " + toCompare + ": " + e); - } - } - - @Override - public boolean equalToReference(T candidate) { - try { - buffer2.reset(); - coder.encode(candidate, buffer2, Coder.Context.OUTER); - byte[] arr = referenceBuffer.getBuffer(); - byte[] arrOther = buffer2.getBuffer(); - if (referenceBuffer.size() != buffer2.size()) { - return false; - } - int len = buffer2.size(); - for(int i = 0; i < len; i++ ) { - if (arr[i] != arrOther[i]) { - return false; - } - } - return true; - } catch (IOException e) { - throw new RuntimeException("Could not compare reference.", e); - } - } - - @Override - public int compareToReference(TypeComparator<T> other) { - InspectableByteArrayOutputStream otherReferenceBuffer = ((CoderComparator<T>) other).referenceBuffer; - - byte[] arr = referenceBuffer.getBuffer(); - byte[] arrOther = otherReferenceBuffer.getBuffer(); - if (referenceBuffer.size() != otherReferenceBuffer.size()) { - return referenceBuffer.size() - otherReferenceBuffer.size(); - } - int len = referenceBuffer.size(); - for (int i = 0; i < len; i++) { - if (arr[i] != arrOther[i]) { - return arr[i] - arrOther[i]; - } - } - return 0; - } - - @Override - public int compare(T first, T second) { - try { - buffer1.reset(); - buffer2.reset(); - coder.encode(first, buffer1, Coder.Context.OUTER); - coder.encode(second, buffer2, Coder.Context.OUTER); - byte[] arr = buffer1.getBuffer(); - byte[] arrOther = buffer2.getBuffer(); - if (buffer1.size() != buffer2.size()) { - return buffer1.size() - buffer2.size(); - } - int len = buffer1.size(); - for(int i = 0; i < len; i++ ) { - if (arr[i] != arrOther[i]) { - return arr[i] - arrOther[i]; - } - } - return 0; - } catch (IOException e) { - throw new RuntimeException("Could not compare: ", e); - } - } - - @Override - public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException { - CoderTypeSerializer<T> serializer = new CoderTypeSerializer<>(coder); - T first = serializer.deserialize(firstSource); - T second = serializer.deserialize(secondSource); - return compare(first, second); - } - - @Override - public boolean supportsNormalizedKey() { - return true; - } - - @Override - public boolean supportsSerializationWithKeyNormalization() { - return false; - } - - @Override - public int getNormalizeKeyLen() { - return Integer.MAX_VALUE; - } - - @Override - public boolean isNormalizedKeyPrefixOnly(int keyBytes) { - return true; - } - - @Override - public void putNormalizedKey(T record, MemorySegment target, int offset, int numBytes) { - buffer1.reset(); - try { - coder.encode(record, buffer1, Coder.Context.OUTER); - } catch (IOException e) { - throw new RuntimeException("Could not serializer " + record + " using coder " + coder + ": " + e); - } - final byte[] data = buffer1.getBuffer(); - final int limit = offset + numBytes; - - target.put(offset, data, 0, Math.min(numBytes, buffer1.size())); - - offset += buffer1.size(); - - while (offset < limit) { - target.put(offset++, (byte) 0); - } - } - - @Override - public void writeWithKeyNormalization(T record, DataOutputView target) throws IOException { - throw new UnsupportedOperationException(); - } - - @Override - public T readWithKeyDenormalization(T reuse, DataInputView source) throws IOException { - throw new UnsupportedOperationException(); - } - - @Override - public boolean invertNormalizedKey() { - return false; - } - - @Override - public TypeComparator<T> duplicate() { - return new CoderComparator<>(coder); - } - - @Override - public int extractKeys(Object record, Object[] target, int index) { - target[index] = record; - return 1; - } - - @Override - public TypeComparator[] getFlatComparators() { - return new TypeComparator[] { this.duplicate() }; - } + private Coder<T> coder; + + // We use these for internal encoding/decoding for creating copies and comparing + // serialized forms using a Coder + private transient InspectableByteArrayOutputStream buffer1; + private transient InspectableByteArrayOutputStream buffer2; + + // For storing the Reference in encoded form + private transient InspectableByteArrayOutputStream referenceBuffer; + + public CoderComparator(Coder<T> coder) { + this.coder = coder; + buffer1 = new InspectableByteArrayOutputStream(); + buffer2 = new InspectableByteArrayOutputStream(); + referenceBuffer = new InspectableByteArrayOutputStream(); + } + + private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { + in.defaultReadObject(); + buffer1 = new InspectableByteArrayOutputStream(); + buffer2 = new InspectableByteArrayOutputStream(); + referenceBuffer = new InspectableByteArrayOutputStream(); + } + + @Override + public int hash(T record) { + return record.hashCode(); + } + + @Override + public void setReference(T toCompare) { + referenceBuffer.reset(); + try { + coder.encode(toCompare, referenceBuffer, Coder.Context.OUTER); + } catch (IOException e) { + throw new RuntimeException("Could not set reference " + toCompare + ": " + e); + } + } + + @Override + public boolean equalToReference(T candidate) { + try { + buffer2.reset(); + coder.encode(candidate, buffer2, Coder.Context.OUTER); + byte[] arr = referenceBuffer.getBuffer(); + byte[] arrOther = buffer2.getBuffer(); + if (referenceBuffer.size() != buffer2.size()) { + return false; + } + int len = buffer2.size(); + for(int i = 0; i < len; i++ ) { + if (arr[i] != arrOther[i]) { + return false; + } + } + return true; + } catch (IOException e) { + throw new RuntimeException("Could not compare reference.", e); + } + } + + @Override + public int compareToReference(TypeComparator<T> other) { + InspectableByteArrayOutputStream otherReferenceBuffer = ((CoderComparator<T>) other).referenceBuffer; + + byte[] arr = referenceBuffer.getBuffer(); + byte[] arrOther = otherReferenceBuffer.getBuffer(); + if (referenceBuffer.size() != otherReferenceBuffer.size()) { + return referenceBuffer.size() - otherReferenceBuffer.size(); + } + int len = referenceBuffer.size(); + for (int i = 0; i < len; i++) { + if (arr[i] != arrOther[i]) { + return arr[i] - arrOther[i]; + } + } + return 0; + } + + @Override + public int compare(T first, T second) { + try { + buffer1.reset(); + buffer2.reset(); + coder.encode(first, buffer1, Coder.Context.OUTER); + coder.encode(second, buffer2, Coder.Context.OUTER); + byte[] arr = buffer1.getBuffer(); + byte[] arrOther = buffer2.getBuffer(); + if (buffer1.size() != buffer2.size()) { + return buffer1.size() - buffer2.size(); + } + int len = buffer1.size(); + for(int i = 0; i < len; i++ ) { + if (arr[i] != arrOther[i]) { + return arr[i] - arrOther[i]; + } + } + return 0; + } catch (IOException e) { + throw new RuntimeException("Could not compare: ", e); + } + } + + @Override + public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException { + CoderTypeSerializer<T> serializer = new CoderTypeSerializer<>(coder); + T first = serializer.deserialize(firstSource); + T second = serializer.deserialize(secondSource); + return compare(first, second); + } + + @Override + public boolean supportsNormalizedKey() { + return true; + } + + @Override + public boolean supportsSerializationWithKeyNormalization() { + return false; + } + + @Override + public int getNormalizeKeyLen() { + return Integer.MAX_VALUE; + } + + @Override + public boolean isNormalizedKeyPrefixOnly(int keyBytes) { + return true; + } + + @Override + public void putNormalizedKey(T record, MemorySegment target, int offset, int numBytes) { + buffer1.reset(); + try { + coder.encode(record, buffer1, Coder.Context.OUTER); + } catch (IOException e) { + throw new RuntimeException("Could not serializer " + record + " using coder " + coder + ": " + e); + } + final byte[] data = buffer1.getBuffer(); + final int limit = offset + numBytes; + + target.put(offset, data, 0, Math.min(numBytes, buffer1.size())); + + offset += buffer1.size(); + + while (offset < limit) { + target.put(offset++, (byte) 0); + } + } + + @Override + public void writeWithKeyNormalization(T record, DataOutputView target) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public T readWithKeyDenormalization(T reuse, DataInputView source) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public boolean invertNormalizedKey() { + return false; + } + + @Override + public TypeComparator<T> duplicate() { + return new CoderComparator<>(coder); + } + + @Override + public int extractKeys(Object record, Object[] target, int index) { + target[index] = record; + return 1; + } + + @Override + public TypeComparator[] getFlatComparators() { + return new TypeComparator[] { this.duplicate() }; + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/CoderTypeInformation.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/CoderTypeInformation.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/CoderTypeInformation.java index dd9c5f6..ae4309e 100644 --- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/CoderTypeInformation.java +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/CoderTypeInformation.java @@ -32,85 +32,85 @@ import com.google.common.base.Preconditions; */ public class CoderTypeInformation<T> extends TypeInformation<T> implements AtomicType<T> { - private final Coder<T> coder; - - public CoderTypeInformation(Coder<T> coder) { - Preconditions.checkNotNull(coder); - this.coder = coder; - } - - @Override - public boolean isBasicType() { - return false; - } - - @Override - public boolean isTupleType() { - return false; - } - - @Override - public int getArity() { - return 1; - } - - @Override - @SuppressWarnings("unchecked") - public Class<T> getTypeClass() { - // We don't have the Class, so we have to pass null here. What a shame... - return (Class<T>) Object.class; - } - - @Override - public boolean isKeyType() { - return true; - } - - @Override - @SuppressWarnings("unchecked") - public TypeSerializer<T> createSerializer(ExecutionConfig config) { - if (coder instanceof VoidCoder) { - return (TypeSerializer<T>) new VoidCoderTypeSerializer(); - } - return new CoderTypeSerializer<>(coder); - } - - @Override - public int getTotalFields() { - return 2; - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - - CoderTypeInformation that = (CoderTypeInformation) o; - - return coder.equals(that.coder); - - } - - @Override - public int hashCode() { - return coder.hashCode(); - } - - @Override - public boolean canEqual(Object obj) { - return obj instanceof CoderTypeInformation; - } - - @Override - public String toString() { - return "CoderTypeInformation{" + - "coder=" + coder + - '}'; - } - - @Override - public TypeComparator<T> createComparator(boolean sortOrderAscending, ExecutionConfig - executionConfig) { - return new CoderComparator<>(coder); - } + private final Coder<T> coder; + + public CoderTypeInformation(Coder<T> coder) { + Preconditions.checkNotNull(coder); + this.coder = coder; + } + + @Override + public boolean isBasicType() { + return false; + } + + @Override + public boolean isTupleType() { + return false; + } + + @Override + public int getArity() { + return 1; + } + + @Override + @SuppressWarnings("unchecked") + public Class<T> getTypeClass() { + // We don't have the Class, so we have to pass null here. What a shame... + return (Class<T>) Object.class; + } + + @Override + public boolean isKeyType() { + return true; + } + + @Override + @SuppressWarnings("unchecked") + public TypeSerializer<T> createSerializer(ExecutionConfig config) { + if (coder instanceof VoidCoder) { + return (TypeSerializer<T>) new VoidCoderTypeSerializer(); + } + return new CoderTypeSerializer<>(coder); + } + + @Override + public int getTotalFields() { + return 2; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + CoderTypeInformation that = (CoderTypeInformation) o; + + return coder.equals(that.coder); + + } + + @Override + public int hashCode() { + return coder.hashCode(); + } + + @Override + public boolean canEqual(Object obj) { + return obj instanceof CoderTypeInformation; + } + + @Override + public String toString() { + return "CoderTypeInformation{" + + "coder=" + coder + + '}'; + } + + @Override + public TypeComparator<T> createComparator(boolean sortOrderAscending, ExecutionConfig + executionConfig) { + return new CoderComparator<>(coder); + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/CoderTypeSerializer.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/CoderTypeSerializer.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/CoderTypeSerializer.java index f739397..6ed661c 100644 --- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/CoderTypeSerializer.java +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/CoderTypeSerializer.java @@ -35,118 +35,118 @@ import java.io.ObjectInputStream; * Dataflow {@link com.google.cloud.dataflow.sdk.coders.Coder}s */ public class CoderTypeSerializer<T> extends TypeSerializer<T> { - - private Coder<T> coder; - private transient DataInputViewWrapper inputWrapper; - private transient DataOutputViewWrapper outputWrapper; - - // We use this for internal encoding/decoding for creating copies using the Coder. - private transient InspectableByteArrayOutputStream buffer; - - public CoderTypeSerializer(Coder<T> coder) { - this.coder = coder; - this.inputWrapper = new DataInputViewWrapper(null); - this.outputWrapper = new DataOutputViewWrapper(null); - - buffer = new InspectableByteArrayOutputStream(); - } - - private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { - in.defaultReadObject(); - this.inputWrapper = new DataInputViewWrapper(null); - this.outputWrapper = new DataOutputViewWrapper(null); - - buffer = new InspectableByteArrayOutputStream(); - } - - @Override - public boolean isImmutableType() { - return false; - } - - @Override - public CoderTypeSerializer<T> duplicate() { - return new CoderTypeSerializer<>(coder); - } - - @Override - public T createInstance() { - return null; - } - - @Override - public T copy(T t) { - buffer.reset(); - try { - coder.encode(t, buffer, Coder.Context.OUTER); - } catch (IOException e) { - throw new RuntimeException("Could not copy.", e); - } - try { - return coder.decode(new ByteArrayInputStream(buffer.getBuffer(), 0, buffer - .size()), Coder.Context.OUTER); - } catch (IOException e) { - throw new RuntimeException("Could not copy.", e); - } - } - - @Override - public T copy(T t, T reuse) { - return copy(t); - } - - @Override - public int getLength() { - return 0; - } - - @Override - public void serialize(T t, DataOutputView dataOutputView) throws IOException { - outputWrapper.setOutputView(dataOutputView); - coder.encode(t, outputWrapper, Coder.Context.NESTED); - } - - @Override - public T deserialize(DataInputView dataInputView) throws IOException { - try { - inputWrapper.setInputView(dataInputView); - return coder.decode(inputWrapper, Coder.Context.NESTED); - } catch (CoderException e) { - Throwable cause = e.getCause(); - if (cause instanceof EOFException) { - throw (EOFException) cause; - } else { - throw e; - } - } - } - - @Override - public T deserialize(T t, DataInputView dataInputView) throws IOException { - return deserialize(dataInputView); - } - - @Override - public void copy(DataInputView dataInputView, DataOutputView dataOutputView) throws IOException { - serialize(deserialize(dataInputView), dataOutputView); - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - - CoderTypeSerializer that = (CoderTypeSerializer) o; - return coder.equals(that.coder); - } - - @Override - public boolean canEqual(Object obj) { - return obj instanceof CoderTypeSerializer; - } - - @Override - public int hashCode() { - return coder.hashCode(); - } + + private Coder<T> coder; + private transient DataInputViewWrapper inputWrapper; + private transient DataOutputViewWrapper outputWrapper; + + // We use this for internal encoding/decoding for creating copies using the Coder. + private transient InspectableByteArrayOutputStream buffer; + + public CoderTypeSerializer(Coder<T> coder) { + this.coder = coder; + this.inputWrapper = new DataInputViewWrapper(null); + this.outputWrapper = new DataOutputViewWrapper(null); + + buffer = new InspectableByteArrayOutputStream(); + } + + private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { + in.defaultReadObject(); + this.inputWrapper = new DataInputViewWrapper(null); + this.outputWrapper = new DataOutputViewWrapper(null); + + buffer = new InspectableByteArrayOutputStream(); + } + + @Override + public boolean isImmutableType() { + return false; + } + + @Override + public CoderTypeSerializer<T> duplicate() { + return new CoderTypeSerializer<>(coder); + } + + @Override + public T createInstance() { + return null; + } + + @Override + public T copy(T t) { + buffer.reset(); + try { + coder.encode(t, buffer, Coder.Context.OUTER); + } catch (IOException e) { + throw new RuntimeException("Could not copy.", e); + } + try { + return coder.decode(new ByteArrayInputStream(buffer.getBuffer(), 0, buffer + .size()), Coder.Context.OUTER); + } catch (IOException e) { + throw new RuntimeException("Could not copy.", e); + } + } + + @Override + public T copy(T t, T reuse) { + return copy(t); + } + + @Override + public int getLength() { + return 0; + } + + @Override + public void serialize(T t, DataOutputView dataOutputView) throws IOException { + outputWrapper.setOutputView(dataOutputView); + coder.encode(t, outputWrapper, Coder.Context.NESTED); + } + + @Override + public T deserialize(DataInputView dataInputView) throws IOException { + try { + inputWrapper.setInputView(dataInputView); + return coder.decode(inputWrapper, Coder.Context.NESTED); + } catch (CoderException e) { + Throwable cause = e.getCause(); + if (cause instanceof EOFException) { + throw (EOFException) cause; + } else { + throw e; + } + } + } + + @Override + public T deserialize(T t, DataInputView dataInputView) throws IOException { + return deserialize(dataInputView); + } + + @Override + public void copy(DataInputView dataInputView, DataOutputView dataOutputView) throws IOException { + serialize(deserialize(dataInputView), dataOutputView); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + CoderTypeSerializer that = (CoderTypeSerializer) o; + return coder.equals(that.coder); + } + + @Override + public boolean canEqual(Object obj) { + return obj instanceof CoderTypeSerializer; + } + + @Override + public int hashCode() { + return coder.hashCode(); + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/InspectableByteArrayOutputStream.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/InspectableByteArrayOutputStream.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/InspectableByteArrayOutputStream.java index 5d918cc..be6eadd 100644 --- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/InspectableByteArrayOutputStream.java +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/InspectableByteArrayOutputStream.java @@ -25,10 +25,10 @@ import java.io.ByteArrayOutputStream; */ public class InspectableByteArrayOutputStream extends ByteArrayOutputStream { - /** - * Get the underlying byte array. - */ - public byte[] getBuffer() { - return buf; - } + /** + * Get the underlying byte array. + */ + public byte[] getBuffer() { + return buf; + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/KvCoderComperator.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/KvCoderComperator.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/KvCoderComperator.java index 815569d..ba09ea9 100644 --- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/KvCoderComperator.java +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/KvCoderComperator.java @@ -35,230 +35,230 @@ import java.io.ObjectInputStream; * for {@link KV} that always compares on the key only. */ public class KvCoderComperator <K, V> extends TypeComparator<KV<K, V>> { - - private KvCoder<K, V> coder; - private Coder<K> keyCoder; - - // We use these for internal encoding/decoding for creating copies and comparing - // serialized forms using a Coder - private transient InspectableByteArrayOutputStream buffer1; - private transient InspectableByteArrayOutputStream buffer2; - - // For storing the Reference in encoded form - private transient InspectableByteArrayOutputStream referenceBuffer; - - - // For deserializing the key - private transient DataInputViewWrapper inputWrapper; - - public KvCoderComperator(KvCoder<K, V> coder) { - this.coder = coder; - this.keyCoder = coder.getKeyCoder(); - - buffer1 = new InspectableByteArrayOutputStream(); - buffer2 = new InspectableByteArrayOutputStream(); - referenceBuffer = new InspectableByteArrayOutputStream(); - - inputWrapper = new DataInputViewWrapper(null); - } - - private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { - in.defaultReadObject(); - - buffer1 = new InspectableByteArrayOutputStream(); - buffer2 = new InspectableByteArrayOutputStream(); - referenceBuffer = new InspectableByteArrayOutputStream(); - - inputWrapper = new DataInputViewWrapper(null); - } - - @Override - public int hash(KV<K, V> record) { - K key = record.getKey(); - if (key != null) { - return key.hashCode(); - } else { - return 0; - } - } - - @Override - public void setReference(KV<K, V> toCompare) { - referenceBuffer.reset(); - try { - keyCoder.encode(toCompare.getKey(), referenceBuffer, Coder.Context.OUTER); - } catch (IOException e) { - throw new RuntimeException("Could not set reference " + toCompare + ": " + e); - } - } - - @Override - public boolean equalToReference(KV<K, V> candidate) { - try { - buffer2.reset(); - keyCoder.encode(candidate.getKey(), buffer2, Coder.Context.OUTER); - byte[] arr = referenceBuffer.getBuffer(); - byte[] arrOther = buffer2.getBuffer(); - if (referenceBuffer.size() != buffer2.size()) { - return false; - } - int len = buffer2.size(); - for(int i = 0; i < len; i++ ) { - if (arr[i] != arrOther[i]) { - return false; - } - } - return true; - } catch (IOException e) { - throw new RuntimeException("Could not compare reference.", e); - } - } - - @Override - public int compareToReference(TypeComparator<KV<K, V>> other) { - InspectableByteArrayOutputStream otherReferenceBuffer = ((KvCoderComperator<K, V>) other).referenceBuffer; - - byte[] arr = referenceBuffer.getBuffer(); - byte[] arrOther = otherReferenceBuffer.getBuffer(); - if (referenceBuffer.size() != otherReferenceBuffer.size()) { - return referenceBuffer.size() - otherReferenceBuffer.size(); - } - int len = referenceBuffer.size(); - for (int i = 0; i < len; i++) { - if (arr[i] != arrOther[i]) { - return arr[i] - arrOther[i]; - } - } - return 0; - } - - - @Override - public int compare(KV<K, V> first, KV<K, V> second) { - try { - buffer1.reset(); - buffer2.reset(); - keyCoder.encode(first.getKey(), buffer1, Coder.Context.OUTER); - keyCoder.encode(second.getKey(), buffer2, Coder.Context.OUTER); - byte[] arr = buffer1.getBuffer(); - byte[] arrOther = buffer2.getBuffer(); - if (buffer1.size() != buffer2.size()) { - return buffer1.size() - buffer2.size(); - } - int len = buffer1.size(); - for(int i = 0; i < len; i++ ) { - if (arr[i] != arrOther[i]) { - return arr[i] - arrOther[i]; - } - } - return 0; - } catch (IOException e) { - throw new RuntimeException("Could not compare reference.", e); - } - } - - @Override - public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException { - - inputWrapper.setInputView(firstSource); - K firstKey = keyCoder.decode(inputWrapper, Coder.Context.NESTED); - inputWrapper.setInputView(secondSource); - K secondKey = keyCoder.decode(inputWrapper, Coder.Context.NESTED); - - try { - buffer1.reset(); - buffer2.reset(); - keyCoder.encode(firstKey, buffer1, Coder.Context.OUTER); - keyCoder.encode(secondKey, buffer2, Coder.Context.OUTER); - byte[] arr = buffer1.getBuffer(); - byte[] arrOther = buffer2.getBuffer(); - if (buffer1.size() != buffer2.size()) { - return buffer1.size() - buffer2.size(); - } - int len = buffer1.size(); - for(int i = 0; i < len; i++ ) { - if (arr[i] != arrOther[i]) { - return arr[i] - arrOther[i]; - } - } - return 0; - } catch (IOException e) { - throw new RuntimeException("Could not compare reference.", e); - } - } - - @Override - public boolean supportsNormalizedKey() { - return true; - } - - @Override - public boolean supportsSerializationWithKeyNormalization() { - return false; - } - - @Override - public int getNormalizeKeyLen() { - return Integer.MAX_VALUE; - } - - @Override - public boolean isNormalizedKeyPrefixOnly(int keyBytes) { - return true; - } - - @Override - public void putNormalizedKey(KV<K, V> record, MemorySegment target, int offset, int numBytes) { - buffer1.reset(); - try { - keyCoder.encode(record.getKey(), buffer1, Coder.Context.NESTED); - } catch (IOException e) { - throw new RuntimeException("Could not serializer " + record + " using coder " + coder + ": " + e); - } - final byte[] data = buffer1.getBuffer(); - final int limit = offset + numBytes; - - int numBytesPut = Math.min(numBytes, buffer1.size()); - - target.put(offset, data, 0, numBytesPut); - - offset += numBytesPut; - - while (offset < limit) { - target.put(offset++, (byte) 0); - } - } - - @Override - public void writeWithKeyNormalization(KV<K, V> record, DataOutputView target) throws IOException { - throw new UnsupportedOperationException(); - } - - @Override - public KV<K, V> readWithKeyDenormalization(KV<K, V> reuse, DataInputView source) throws IOException { - throw new UnsupportedOperationException(); - } - - @Override - public boolean invertNormalizedKey() { - return false; - } - - @Override - public TypeComparator<KV<K, V>> duplicate() { - return new KvCoderComperator<>(coder); - } - - @Override - public int extractKeys(Object record, Object[] target, int index) { - KV<K, V> kv = (KV<K, V>) record; - K k = kv.getKey(); - target[index] = k; - return 1; - } - - @Override - public TypeComparator[] getFlatComparators() { - return new TypeComparator[] {new CoderComparator<>(keyCoder)}; - } + + private KvCoder<K, V> coder; + private Coder<K> keyCoder; + + // We use these for internal encoding/decoding for creating copies and comparing + // serialized forms using a Coder + private transient InspectableByteArrayOutputStream buffer1; + private transient InspectableByteArrayOutputStream buffer2; + + // For storing the Reference in encoded form + private transient InspectableByteArrayOutputStream referenceBuffer; + + + // For deserializing the key + private transient DataInputViewWrapper inputWrapper; + + public KvCoderComperator(KvCoder<K, V> coder) { + this.coder = coder; + this.keyCoder = coder.getKeyCoder(); + + buffer1 = new InspectableByteArrayOutputStream(); + buffer2 = new InspectableByteArrayOutputStream(); + referenceBuffer = new InspectableByteArrayOutputStream(); + + inputWrapper = new DataInputViewWrapper(null); + } + + private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { + in.defaultReadObject(); + + buffer1 = new InspectableByteArrayOutputStream(); + buffer2 = new InspectableByteArrayOutputStream(); + referenceBuffer = new InspectableByteArrayOutputStream(); + + inputWrapper = new DataInputViewWrapper(null); + } + + @Override + public int hash(KV<K, V> record) { + K key = record.getKey(); + if (key != null) { + return key.hashCode(); + } else { + return 0; + } + } + + @Override + public void setReference(KV<K, V> toCompare) { + referenceBuffer.reset(); + try { + keyCoder.encode(toCompare.getKey(), referenceBuffer, Coder.Context.OUTER); + } catch (IOException e) { + throw new RuntimeException("Could not set reference " + toCompare + ": " + e); + } + } + + @Override + public boolean equalToReference(KV<K, V> candidate) { + try { + buffer2.reset(); + keyCoder.encode(candidate.getKey(), buffer2, Coder.Context.OUTER); + byte[] arr = referenceBuffer.getBuffer(); + byte[] arrOther = buffer2.getBuffer(); + if (referenceBuffer.size() != buffer2.size()) { + return false; + } + int len = buffer2.size(); + for(int i = 0; i < len; i++ ) { + if (arr[i] != arrOther[i]) { + return false; + } + } + return true; + } catch (IOException e) { + throw new RuntimeException("Could not compare reference.", e); + } + } + + @Override + public int compareToReference(TypeComparator<KV<K, V>> other) { + InspectableByteArrayOutputStream otherReferenceBuffer = ((KvCoderComperator<K, V>) other).referenceBuffer; + + byte[] arr = referenceBuffer.getBuffer(); + byte[] arrOther = otherReferenceBuffer.getBuffer(); + if (referenceBuffer.size() != otherReferenceBuffer.size()) { + return referenceBuffer.size() - otherReferenceBuffer.size(); + } + int len = referenceBuffer.size(); + for (int i = 0; i < len; i++) { + if (arr[i] != arrOther[i]) { + return arr[i] - arrOther[i]; + } + } + return 0; + } + + + @Override + public int compare(KV<K, V> first, KV<K, V> second) { + try { + buffer1.reset(); + buffer2.reset(); + keyCoder.encode(first.getKey(), buffer1, Coder.Context.OUTER); + keyCoder.encode(second.getKey(), buffer2, Coder.Context.OUTER); + byte[] arr = buffer1.getBuffer(); + byte[] arrOther = buffer2.getBuffer(); + if (buffer1.size() != buffer2.size()) { + return buffer1.size() - buffer2.size(); + } + int len = buffer1.size(); + for(int i = 0; i < len; i++ ) { + if (arr[i] != arrOther[i]) { + return arr[i] - arrOther[i]; + } + } + return 0; + } catch (IOException e) { + throw new RuntimeException("Could not compare reference.", e); + } + } + + @Override + public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException { + + inputWrapper.setInputView(firstSource); + K firstKey = keyCoder.decode(inputWrapper, Coder.Context.NESTED); + inputWrapper.setInputView(secondSource); + K secondKey = keyCoder.decode(inputWrapper, Coder.Context.NESTED); + + try { + buffer1.reset(); + buffer2.reset(); + keyCoder.encode(firstKey, buffer1, Coder.Context.OUTER); + keyCoder.encode(secondKey, buffer2, Coder.Context.OUTER); + byte[] arr = buffer1.getBuffer(); + byte[] arrOther = buffer2.getBuffer(); + if (buffer1.size() != buffer2.size()) { + return buffer1.size() - buffer2.size(); + } + int len = buffer1.size(); + for(int i = 0; i < len; i++ ) { + if (arr[i] != arrOther[i]) { + return arr[i] - arrOther[i]; + } + } + return 0; + } catch (IOException e) { + throw new RuntimeException("Could not compare reference.", e); + } + } + + @Override + public boolean supportsNormalizedKey() { + return true; + } + + @Override + public boolean supportsSerializationWithKeyNormalization() { + return false; + } + + @Override + public int getNormalizeKeyLen() { + return Integer.MAX_VALUE; + } + + @Override + public boolean isNormalizedKeyPrefixOnly(int keyBytes) { + return true; + } + + @Override + public void putNormalizedKey(KV<K, V> record, MemorySegment target, int offset, int numBytes) { + buffer1.reset(); + try { + keyCoder.encode(record.getKey(), buffer1, Coder.Context.NESTED); + } catch (IOException e) { + throw new RuntimeException("Could not serializer " + record + " using coder " + coder + ": " + e); + } + final byte[] data = buffer1.getBuffer(); + final int limit = offset + numBytes; + + int numBytesPut = Math.min(numBytes, buffer1.size()); + + target.put(offset, data, 0, numBytesPut); + + offset += numBytesPut; + + while (offset < limit) { + target.put(offset++, (byte) 0); + } + } + + @Override + public void writeWithKeyNormalization(KV<K, V> record, DataOutputView target) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public KV<K, V> readWithKeyDenormalization(KV<K, V> reuse, DataInputView source) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public boolean invertNormalizedKey() { + return false; + } + + @Override + public TypeComparator<KV<K, V>> duplicate() { + return new KvCoderComperator<>(coder); + } + + @Override + public int extractKeys(Object record, Object[] target, int index) { + KV<K, V> kv = (KV<K, V>) record; + K k = kv.getKey(); + target[index] = k; + return 1; + } + + @Override + public TypeComparator[] getFlatComparators() { + return new TypeComparator[] {new CoderComparator<>(keyCoder)}; + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/KvCoderTypeInformation.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/KvCoderTypeInformation.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/KvCoderTypeInformation.java index 090f79d..be11918 100644 --- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/KvCoderTypeInformation.java +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/KvCoderTypeInformation.java @@ -34,153 +34,153 @@ import java.util.List; */ public class KvCoderTypeInformation<K, V> extends CompositeType<KV<K, V>> { - private KvCoder<K, V> coder; - - // We don't have the Class, so we have to pass null here. What a shame... - private static Object DUMMY = new Object(); - - @SuppressWarnings("unchecked") - public KvCoderTypeInformation(KvCoder<K, V> coder) { - super(((Class<KV<K,V>>) DUMMY.getClass())); - this.coder = coder; - Preconditions.checkNotNull(coder); - } - - @Override - @SuppressWarnings("unchecked") - public TypeComparator<KV<K, V>> createComparator(int[] logicalKeyFields, boolean[] orders, int logicalFieldOffset, ExecutionConfig config) { - return new KvCoderComperator((KvCoder) coder); - } - - @Override - public boolean isBasicType() { - return false; - } - - @Override - public boolean isTupleType() { - return false; - } - - @Override - public int getArity() { - return 2; - } - - @Override - @SuppressWarnings("unchecked") - public Class<KV<K, V>> getTypeClass() { - return privateGetTypeClass(); - } - - @SuppressWarnings("unchecked") - private static <X> Class<X> privateGetTypeClass() { - return (Class<X>) Object.class; - } - - @Override - public boolean isKeyType() { - return true; - } - - @Override - @SuppressWarnings("unchecked") - public TypeSerializer<KV<K, V>> createSerializer(ExecutionConfig config) { - return new CoderTypeSerializer<>(coder); - } - - @Override - public int getTotalFields() { - return 2; - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - - KvCoderTypeInformation that = (KvCoderTypeInformation) o; - - return coder.equals(that.coder); - - } - - @Override - public int hashCode() { - return coder.hashCode(); - } - - @Override - public String toString() { - return "CoderTypeInformation{" + - "coder=" + coder + - '}'; - } - - @Override - @SuppressWarnings("unchecked") - public <X> TypeInformation<X> getTypeAt(int pos) { - if (pos == 0) { - return (TypeInformation<X>) new CoderTypeInformation<>(coder.getKeyCoder()); - } else if (pos == 1) { - return (TypeInformation<X>) new CoderTypeInformation<>(coder.getValueCoder()); - } else { - throw new RuntimeException("Invalid field position " + pos); - } - } - - @Override - @SuppressWarnings("unchecked") - public <X> TypeInformation<X> getTypeAt(String fieldExpression) { - switch (fieldExpression) { - case "key": - return (TypeInformation<X>) new CoderTypeInformation<>(coder.getKeyCoder()); - case "value": - return (TypeInformation<X>) new CoderTypeInformation<>(coder.getValueCoder()); - default: - throw new UnsupportedOperationException("Only KvCoder has fields."); - } - } - - @Override - public String[] getFieldNames() { - return new String[]{"key", "value"}; - } - - @Override - public int getFieldIndex(String fieldName) { - switch (fieldName) { - case "key": - return 0; - case "value": - return 1; - default: - return -1; - } - } - - @Override - public void getFlatFields(String fieldExpression, int offset, List<FlatFieldDescriptor> result) { - CoderTypeInformation keyTypeInfo = new CoderTypeInformation<>(coder.getKeyCoder()); - result.add(new FlatFieldDescriptor(0, keyTypeInfo)); - } - - @Override - protected TypeComparatorBuilder<KV<K, V>> createTypeComparatorBuilder() { - return new KvCoderTypeComparatorBuilder(); - } - - private class KvCoderTypeComparatorBuilder implements TypeComparatorBuilder<KV<K, V>> { - - @Override - public void initializeTypeComparatorBuilder(int size) {} - - @Override - public void addComparatorField(int fieldId, TypeComparator<?> comparator) {} - - @Override - public TypeComparator<KV<K, V>> createTypeComparator(ExecutionConfig config) { - return new KvCoderComperator<>(coder); - } - } + private KvCoder<K, V> coder; + + // We don't have the Class, so we have to pass null here. What a shame... + private static Object DUMMY = new Object(); + + @SuppressWarnings("unchecked") + public KvCoderTypeInformation(KvCoder<K, V> coder) { + super(((Class<KV<K,V>>) DUMMY.getClass())); + this.coder = coder; + Preconditions.checkNotNull(coder); + } + + @Override + @SuppressWarnings("unchecked") + public TypeComparator<KV<K, V>> createComparator(int[] logicalKeyFields, boolean[] orders, int logicalFieldOffset, ExecutionConfig config) { + return new KvCoderComperator((KvCoder) coder); + } + + @Override + public boolean isBasicType() { + return false; + } + + @Override + public boolean isTupleType() { + return false; + } + + @Override + public int getArity() { + return 2; + } + + @Override + @SuppressWarnings("unchecked") + public Class<KV<K, V>> getTypeClass() { + return privateGetTypeClass(); + } + + @SuppressWarnings("unchecked") + private static <X> Class<X> privateGetTypeClass() { + return (Class<X>) Object.class; + } + + @Override + public boolean isKeyType() { + return true; + } + + @Override + @SuppressWarnings("unchecked") + public TypeSerializer<KV<K, V>> createSerializer(ExecutionConfig config) { + return new CoderTypeSerializer<>(coder); + } + + @Override + public int getTotalFields() { + return 2; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + KvCoderTypeInformation that = (KvCoderTypeInformation) o; + + return coder.equals(that.coder); + + } + + @Override + public int hashCode() { + return coder.hashCode(); + } + + @Override + public String toString() { + return "CoderTypeInformation{" + + "coder=" + coder + + '}'; + } + + @Override + @SuppressWarnings("unchecked") + public <X> TypeInformation<X> getTypeAt(int pos) { + if (pos == 0) { + return (TypeInformation<X>) new CoderTypeInformation<>(coder.getKeyCoder()); + } else if (pos == 1) { + return (TypeInformation<X>) new CoderTypeInformation<>(coder.getValueCoder()); + } else { + throw new RuntimeException("Invalid field position " + pos); + } + } + + @Override + @SuppressWarnings("unchecked") + public <X> TypeInformation<X> getTypeAt(String fieldExpression) { + switch (fieldExpression) { + case "key": + return (TypeInformation<X>) new CoderTypeInformation<>(coder.getKeyCoder()); + case "value": + return (TypeInformation<X>) new CoderTypeInformation<>(coder.getValueCoder()); + default: + throw new UnsupportedOperationException("Only KvCoder has fields."); + } + } + + @Override + public String[] getFieldNames() { + return new String[]{"key", "value"}; + } + + @Override + public int getFieldIndex(String fieldName) { + switch (fieldName) { + case "key": + return 0; + case "value": + return 1; + default: + return -1; + } + } + + @Override + public void getFlatFields(String fieldExpression, int offset, List<FlatFieldDescriptor> result) { + CoderTypeInformation keyTypeInfo = new CoderTypeInformation<>(coder.getKeyCoder()); + result.add(new FlatFieldDescriptor(0, keyTypeInfo)); + } + + @Override + protected TypeComparatorBuilder<KV<K, V>> createTypeComparatorBuilder() { + return new KvCoderTypeComparatorBuilder(); + } + + private class KvCoderTypeComparatorBuilder implements TypeComparatorBuilder<KV<K, V>> { + + @Override + public void initializeTypeComparatorBuilder(int size) {} + + @Override + public void addComparatorField(int fieldId, TypeComparator<?> comparator) {} + + @Override + public TypeComparator<KV<K, V>> createTypeComparator(ExecutionConfig config) { + return new KvCoderComperator<>(coder); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/VoidCoderTypeSerializer.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/VoidCoderTypeSerializer.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/VoidCoderTypeSerializer.java index 7ce484a..190d898 100644 --- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/VoidCoderTypeSerializer.java +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/VoidCoderTypeSerializer.java @@ -31,82 +31,82 @@ import java.io.IOException; */ public class VoidCoderTypeSerializer extends TypeSerializer<VoidCoderTypeSerializer.VoidValue> { - @Override - public boolean isImmutableType() { - return false; - } - - @Override - public VoidCoderTypeSerializer duplicate() { - return this; - } - - @Override - public VoidValue createInstance() { - return VoidValue.INSTANCE; - } - - @Override - public VoidValue copy(VoidValue from) { - return from; - } - - @Override - public VoidValue copy(VoidValue from, VoidValue reuse) { - return from; - } - - @Override - public int getLength() { - return 0; - } - - @Override - public void serialize(VoidValue record, DataOutputView target) throws IOException { - target.writeByte(1); - } - - @Override - public VoidValue deserialize(DataInputView source) throws IOException { - source.readByte(); - return VoidValue.INSTANCE; - } - - @Override - public VoidValue deserialize(VoidValue reuse, DataInputView source) throws IOException { - return deserialize(source); - } - - @Override - public void copy(DataInputView source, DataOutputView target) throws IOException { - source.readByte(); - target.writeByte(1); - } - - @Override - public boolean equals(Object obj) { - if (obj instanceof VoidCoderTypeSerializer) { - VoidCoderTypeSerializer other = (VoidCoderTypeSerializer) obj; - return other.canEqual(this); - } else { - return false; - } - } - - @Override - public boolean canEqual(Object obj) { - return obj instanceof VoidCoderTypeSerializer; - } - - @Override - public int hashCode() { - return 0; - } - - public static class VoidValue { - private VoidValue() {} - - public static VoidValue INSTANCE = new VoidValue(); - } + @Override + public boolean isImmutableType() { + return false; + } + + @Override + public VoidCoderTypeSerializer duplicate() { + return this; + } + + @Override + public VoidValue createInstance() { + return VoidValue.INSTANCE; + } + + @Override + public VoidValue copy(VoidValue from) { + return from; + } + + @Override + public VoidValue copy(VoidValue from, VoidValue reuse) { + return from; + } + + @Override + public int getLength() { + return 0; + } + + @Override + public void serialize(VoidValue record, DataOutputView target) throws IOException { + target.writeByte(1); + } + + @Override + public VoidValue deserialize(DataInputView source) throws IOException { + source.readByte(); + return VoidValue.INSTANCE; + } + + @Override + public VoidValue deserialize(VoidValue reuse, DataInputView source) throws IOException { + return deserialize(source); + } + + @Override + public void copy(DataInputView source, DataOutputView target) throws IOException { + source.readByte(); + target.writeByte(1); + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof VoidCoderTypeSerializer) { + VoidCoderTypeSerializer other = (VoidCoderTypeSerializer) obj; + return other.canEqual(this); + } else { + return false; + } + } + + @Override + public boolean canEqual(Object obj) { + return obj instanceof VoidCoderTypeSerializer; + } + + @Override + public int hashCode() { + return 0; + } + + public static class VoidValue { + private VoidValue() {} + + public static VoidValue INSTANCE = new VoidValue(); + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/CombineFnAggregatorWrapper.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/CombineFnAggregatorWrapper.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/CombineFnAggregatorWrapper.java index 924b297..8f6d67c 100644 --- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/CombineFnAggregatorWrapper.java +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/CombineFnAggregatorWrapper.java @@ -31,62 +31,62 @@ import java.io.Serializable; * operation. */ public class CombineFnAggregatorWrapper<AI, AA, AR> implements Aggregator<AI, AR>, Accumulator<AI, Serializable> { - - private AA aa; - private Combine.CombineFn<? super AI, AA, AR> combiner; + + private AA aa; + private Combine.CombineFn<? super AI, AA, AR> combiner; - public CombineFnAggregatorWrapper() { - } + public CombineFnAggregatorWrapper() { + } - public CombineFnAggregatorWrapper(Combine.CombineFn<? super AI, AA, AR> combiner) { - this.combiner = combiner; - this.aa = combiner.createAccumulator(); - } + public CombineFnAggregatorWrapper(Combine.CombineFn<? super AI, AA, AR> combiner) { + this.combiner = combiner; + this.aa = combiner.createAccumulator(); + } - @Override - public void add(AI value) { - combiner.addInput(aa, value); - } + @Override + public void add(AI value) { + combiner.addInput(aa, value); + } - @Override - public Serializable getLocalValue() { - return (Serializable) combiner.extractOutput(aa); - } + @Override + public Serializable getLocalValue() { + return (Serializable) combiner.extractOutput(aa); + } - @Override - public void resetLocal() { - aa = combiner.createAccumulator(); - } + @Override + public void resetLocal() { + aa = combiner.createAccumulator(); + } - @Override - @SuppressWarnings("unchecked") - public void merge(Accumulator<AI, Serializable> other) { - aa = combiner.mergeAccumulators(Lists.newArrayList(aa, ((CombineFnAggregatorWrapper<AI, AA, AR>)other).aa)); - } + @Override + @SuppressWarnings("unchecked") + public void merge(Accumulator<AI, Serializable> other) { + aa = combiner.mergeAccumulators(Lists.newArrayList(aa, ((CombineFnAggregatorWrapper<AI, AA, AR>)other).aa)); + } - @Override - public Accumulator<AI, Serializable> clone() { - // copy it by merging - AA aaCopy = combiner.mergeAccumulators(Lists.newArrayList(aa)); - CombineFnAggregatorWrapper<AI, AA, AR> result = new - CombineFnAggregatorWrapper<>(combiner); - result.aa = aaCopy; - return result; - } + @Override + public Accumulator<AI, Serializable> clone() { + // copy it by merging + AA aaCopy = combiner.mergeAccumulators(Lists.newArrayList(aa)); + CombineFnAggregatorWrapper<AI, AA, AR> result = new + CombineFnAggregatorWrapper<>(combiner); + result.aa = aaCopy; + return result; + } - @Override - public void addValue(AI value) { - add(value); - } + @Override + public void addValue(AI value) { + add(value); + } - @Override - public String getName() { - return "CombineFn: " + combiner.toString(); - } + @Override + public String getName() { + return "CombineFn: " + combiner.toString(); + } - @Override - public Combine.CombineFn getCombineFn() { - return combiner; - } + @Override + public Combine.CombineFn getCombineFn() { + return combiner; + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/DataInputViewWrapper.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/DataInputViewWrapper.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/DataInputViewWrapper.java index 90582b0..3c96939 100644 --- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/DataInputViewWrapper.java +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/DataInputViewWrapper.java @@ -31,29 +31,29 @@ import java.io.InputStream; */ public class DataInputViewWrapper extends InputStream { - private DataInputView inputView; - - public DataInputViewWrapper(DataInputView inputView) { - this.inputView = inputView; - } - - public void setInputView(DataInputView inputView) { - this.inputView = inputView; - } - - @Override - public int read() throws IOException { - try { - return inputView.readUnsignedByte(); - } catch (EOFException e) { - // translate between DataInput and InputStream, - // DataInput signals EOF by exception, InputStream does it by returning -1 - return -1; - } - } - - @Override - public int read(byte[] b, int off, int len) throws IOException { - return inputView.read(b, off, len); - } + private DataInputView inputView; + + public DataInputViewWrapper(DataInputView inputView) { + this.inputView = inputView; + } + + public void setInputView(DataInputView inputView) { + this.inputView = inputView; + } + + @Override + public int read() throws IOException { + try { + return inputView.readUnsignedByte(); + } catch (EOFException e) { + // translate between DataInput and InputStream, + // DataInput signals EOF by exception, InputStream does it by returning -1 + return -1; + } + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + return inputView.read(b, off, len); + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/DataOutputViewWrapper.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/DataOutputViewWrapper.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/DataOutputViewWrapper.java index 46df8e5..a222cdd 100644 --- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/DataOutputViewWrapper.java +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/DataOutputViewWrapper.java @@ -29,24 +29,24 @@ import java.io.OutputStream; * {@link java.io.OutputStream}. */ public class DataOutputViewWrapper extends OutputStream { - - private DataOutputView outputView; + + private DataOutputView outputView; - public DataOutputViewWrapper(DataOutputView outputView) { - this.outputView = outputView; - } + public DataOutputViewWrapper(DataOutputView outputView) { + this.outputView = outputView; + } - public void setOutputView(DataOutputView outputView) { - this.outputView = outputView; - } + public void setOutputView(DataOutputView outputView) { + this.outputView = outputView; + } - @Override - public void write(int b) throws IOException { - outputView.write(b); - } + @Override + public void write(int b) throws IOException { + outputView.write(b); + } - @Override - public void write(byte[] b, int off, int len) throws IOException { - outputView.write(b, off, len); - } + @Override + public void write(byte[] b, int off, int len) throws IOException { + outputView.write(b, off, len); + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SerializableFnAggregatorWrapper.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SerializableFnAggregatorWrapper.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SerializableFnAggregatorWrapper.java index 1c0dae4..c193a4d 100644 --- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SerializableFnAggregatorWrapper.java +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SerializableFnAggregatorWrapper.java @@ -33,59 +33,59 @@ import java.io.Serializable; */ public class SerializableFnAggregatorWrapper<AI, AO> implements Aggregator<AI, AO>, Accumulator<AI, Serializable> { - private AO aa; - private Combine.CombineFn<AI, ?, AO> combiner; + private AO aa; + private Combine.CombineFn<AI, ?, AO> combiner; - public SerializableFnAggregatorWrapper(Combine.CombineFn<AI, ?, AO> combiner) { - this.combiner = combiner; - resetLocal(); - } - - @Override - @SuppressWarnings("unchecked") - public void add(AI value) { - this.aa = combiner.apply(ImmutableList.of((AI) aa, value)); - } + public SerializableFnAggregatorWrapper(Combine.CombineFn<AI, ?, AO> combiner) { + this.combiner = combiner; + resetLocal(); + } + + @Override + @SuppressWarnings("unchecked") + public void add(AI value) { + this.aa = combiner.apply(ImmutableList.of((AI) aa, value)); + } - @Override - public Serializable getLocalValue() { - return (Serializable) aa; - } + @Override + public Serializable getLocalValue() { + return (Serializable) aa; + } - @Override - public void resetLocal() { - this.aa = combiner.apply(ImmutableList.<AI>of()); - } + @Override + public void resetLocal() { + this.aa = combiner.apply(ImmutableList.<AI>of()); + } - @Override - @SuppressWarnings("unchecked") - public void merge(Accumulator<AI, Serializable> other) { - this.aa = combiner.apply(ImmutableList.of((AI) aa, (AI) other.getLocalValue())); - } + @Override + @SuppressWarnings("unchecked") + public void merge(Accumulator<AI, Serializable> other) { + this.aa = combiner.apply(ImmutableList.of((AI) aa, (AI) other.getLocalValue())); + } - @Override - public void addValue(AI value) { - add(value); - } + @Override + public void addValue(AI value) { + add(value); + } - @Override - public String getName() { - return "Aggregator :" + combiner.toString(); - } + @Override + public String getName() { + return "Aggregator :" + combiner.toString(); + } - @Override - public Combine.CombineFn<AI, ?, AO> getCombineFn() { - return combiner; - } + @Override + public Combine.CombineFn<AI, ?, AO> getCombineFn() { + return combiner; + } - @Override - public Accumulator<AI, Serializable> clone() { - // copy it by merging - AO resultCopy = combiner.apply(Lists.newArrayList((AI) aa)); - SerializableFnAggregatorWrapper<AI, AO> result = new - SerializableFnAggregatorWrapper<>(combiner); + @Override + public Accumulator<AI, Serializable> clone() { + // copy it by merging + AO resultCopy = combiner.apply(Lists.newArrayList((AI) aa)); + SerializableFnAggregatorWrapper<AI, AO> result = new + SerializableFnAggregatorWrapper<>(combiner); - result.aa = resultCopy; - return result; - } + result.aa = resultCopy; + return result; + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SinkOutputFormat.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SinkOutputFormat.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SinkOutputFormat.java index 8be9abf..3f28c16 100644 --- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SinkOutputFormat.java +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SinkOutputFormat.java @@ -38,84 +38,84 @@ import java.lang.reflect.Field; */ public class SinkOutputFormat<T> implements OutputFormat<T> { - private final Sink<T> sink; - - private transient PipelineOptions pipelineOptions; - - private Sink.WriteOperation<T, ?> writeOperation; - private Sink.Writer<T, ?> writer; - - private AbstractID uid = new AbstractID(); - - public SinkOutputFormat(Write.Bound<T> transform, PipelineOptions pipelineOptions) { - this.sink = extractSink(transform); - this.pipelineOptions = Preconditions.checkNotNull(pipelineOptions); - } - - private Sink<T> extractSink(Write.Bound<T> transform) { - // TODO possibly add a getter in the upstream - try { - Field sinkField = transform.getClass().getDeclaredField("sink"); - sinkField.setAccessible(true); - @SuppressWarnings("unchecked") - Sink<T> extractedSink = (Sink<T>) sinkField.get(transform); - return extractedSink; - } catch (NoSuchFieldException | IllegalAccessException e) { - throw new RuntimeException("Could not acquire custom sink field.", e); - } - } - - @Override - public void configure(Configuration configuration) { - writeOperation = sink.createWriteOperation(pipelineOptions); - try { - writeOperation.initialize(pipelineOptions); - } catch (Exception e) { - throw new RuntimeException("Failed to initialize the write operation.", e); - } - } - - @Override - public void open(int taskNumber, int numTasks) throws IOException { - try { - writer = writeOperation.createWriter(pipelineOptions); - } catch (Exception e) { - throw new IOException("Couldn't create writer.", e); - } - try { - writer.open(uid + "-" + String.valueOf(taskNumber)); - } catch (Exception e) { - throw new IOException("Couldn't open writer.", e); - } - } - - @Override - public void writeRecord(T record) throws IOException { - try { - writer.write(record); - } catch (Exception e) { - throw new IOException("Couldn't write record.", e); - } - } - - @Override - public void close() throws IOException { - try { - writer.close(); - } catch (Exception e) { - throw new IOException("Couldn't close writer.", e); - } - } - - private void writeObject(ObjectOutputStream out) throws IOException, ClassNotFoundException { - out.defaultWriteObject(); - ObjectMapper mapper = new ObjectMapper(); - mapper.writeValue(out, pipelineOptions); - } - - private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { - in.defaultReadObject(); - ObjectMapper mapper = new ObjectMapper(); - pipelineOptions = mapper.readValue(in, PipelineOptions.class); - } + private final Sink<T> sink; + + private transient PipelineOptions pipelineOptions; + + private Sink.WriteOperation<T, ?> writeOperation; + private Sink.Writer<T, ?> writer; + + private AbstractID uid = new AbstractID(); + + public SinkOutputFormat(Write.Bound<T> transform, PipelineOptions pipelineOptions) { + this.sink = extractSink(transform); + this.pipelineOptions = Preconditions.checkNotNull(pipelineOptions); + } + + private Sink<T> extractSink(Write.Bound<T> transform) { + // TODO possibly add a getter in the upstream + try { + Field sinkField = transform.getClass().getDeclaredField("sink"); + sinkField.setAccessible(true); + @SuppressWarnings("unchecked") + Sink<T> extractedSink = (Sink<T>) sinkField.get(transform); + return extractedSink; + } catch (NoSuchFieldException | IllegalAccessException e) { + throw new RuntimeException("Could not acquire custom sink field.", e); + } + } + + @Override + public void configure(Configuration configuration) { + writeOperation = sink.createWriteOperation(pipelineOptions); + try { + writeOperation.initialize(pipelineOptions); + } catch (Exception e) { + throw new RuntimeException("Failed to initialize the write operation.", e); + } + } + + @Override + public void open(int taskNumber, int numTasks) throws IOException { + try { + writer = writeOperation.createWriter(pipelineOptions); + } catch (Exception e) { + throw new IOException("Couldn't create writer.", e); + } + try { + writer.open(uid + "-" + String.valueOf(taskNumber)); + } catch (Exception e) { + throw new IOException("Couldn't open writer.", e); + } + } + + @Override + public void writeRecord(T record) throws IOException { + try { + writer.write(record); + } catch (Exception e) { + throw new IOException("Couldn't write record.", e); + } + } + + @Override + public void close() throws IOException { + try { + writer.close(); + } catch (Exception e) { + throw new IOException("Couldn't close writer.", e); + } + } + + private void writeObject(ObjectOutputStream out) throws IOException, ClassNotFoundException { + out.defaultWriteObject(); + ObjectMapper mapper = new ObjectMapper(); + mapper.writeValue(out, pipelineOptions); + } + + private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { + in.defaultReadObject(); + ObjectMapper mapper = new ObjectMapper(); + pipelineOptions = mapper.readValue(in, PipelineOptions.class); + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SourceInputFormat.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SourceInputFormat.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SourceInputFormat.java index 64dc072..5981618 100644 --- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SourceInputFormat.java +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SourceInputFormat.java @@ -41,124 +41,124 @@ import java.util.List; * Dataflow {@link com.google.cloud.dataflow.sdk.io.Source}. */ public class SourceInputFormat<T> implements InputFormat<T, SourceInputSplit<T>> { - private static final Logger LOG = LoggerFactory.getLogger(SourceInputFormat.class); - - private final BoundedSource<T> initialSource; - private transient PipelineOptions options; - - private BoundedSource.BoundedReader<T> reader = null; - private boolean reachedEnd = true; - - public SourceInputFormat(BoundedSource<T> initialSource, PipelineOptions options) { - this.initialSource = initialSource; - this.options = options; - } - - private void writeObject(ObjectOutputStream out) - throws IOException, ClassNotFoundException { - out.defaultWriteObject(); - ObjectMapper mapper = new ObjectMapper(); - mapper.writeValue(out, options); - } - - private void readObject(ObjectInputStream in) - throws IOException, ClassNotFoundException { - in.defaultReadObject(); - ObjectMapper mapper = new ObjectMapper(); - options = mapper.readValue(in, PipelineOptions.class); - } - - @Override - public void configure(Configuration configuration) {} - - @Override - public void open(SourceInputSplit<T> sourceInputSplit) throws IOException { - reader = ((BoundedSource<T>) sourceInputSplit.getSource()).createReader(options); - reachedEnd = false; - } - - @Override - public BaseStatistics getStatistics(BaseStatistics baseStatistics) throws IOException { - try { - final long estimatedSize = initialSource.getEstimatedSizeBytes(options); - - return new BaseStatistics() { - @Override - public long getTotalInputSize() { - return estimatedSize; - - } - - @Override - public long getNumberOfRecords() { - return BaseStatistics.NUM_RECORDS_UNKNOWN; - } - - @Override - public float getAverageRecordWidth() { - return BaseStatistics.AVG_RECORD_BYTES_UNKNOWN; - } - }; - } catch (Exception e) { - LOG.warn("Could not read Source statistics: {}", e); - } - - return null; - } - - @Override - @SuppressWarnings("unchecked") - public SourceInputSplit<T>[] createInputSplits(int numSplits) throws IOException { - long desiredSizeBytes; - try { - desiredSizeBytes = initialSource.getEstimatedSizeBytes(options) / numSplits; - List<? extends Source<T>> shards = initialSource.splitIntoBundles(desiredSizeBytes, - options); - List<SourceInputSplit<T>> splits = new ArrayList<>(); - int splitCount = 0; - for (Source<T> shard: shards) { - splits.add(new SourceInputSplit<>(shard, splitCount++)); - } - return splits.toArray(new SourceInputSplit[splits.size()]); - } catch (Exception e) { - throw new IOException("Could not create input splits from Source.", e); - } - } - - @Override - public InputSplitAssigner getInputSplitAssigner(final SourceInputSplit[] sourceInputSplits) { - return new InputSplitAssigner() { - private int index = 0; - private final SourceInputSplit[] splits = sourceInputSplits; - @Override - public InputSplit getNextInputSplit(String host, int taskId) { - if (index < splits.length) { - return splits[index++]; - } else { - return null; - } - } - }; - } - - - @Override - public boolean reachedEnd() throws IOException { - return reachedEnd; - } - - @Override - public T nextRecord(T t) throws IOException { - - reachedEnd = !reader.advance(); - if (!reachedEnd) { - return reader.getCurrent(); - } - return null; - } - - @Override - public void close() throws IOException { - reader.close(); - } + private static final Logger LOG = LoggerFactory.getLogger(SourceInputFormat.class); + + private final BoundedSource<T> initialSource; + private transient PipelineOptions options; + + private BoundedSource.BoundedReader<T> reader = null; + private boolean reachedEnd = true; + + public SourceInputFormat(BoundedSource<T> initialSource, PipelineOptions options) { + this.initialSource = initialSource; + this.options = options; + } + + private void writeObject(ObjectOutputStream out) + throws IOException, ClassNotFoundException { + out.defaultWriteObject(); + ObjectMapper mapper = new ObjectMapper(); + mapper.writeValue(out, options); + } + + private void readObject(ObjectInputStream in) + throws IOException, ClassNotFoundException { + in.defaultReadObject(); + ObjectMapper mapper = new ObjectMapper(); + options = mapper.readValue(in, PipelineOptions.class); + } + + @Override + public void configure(Configuration configuration) {} + + @Override + public void open(SourceInputSplit<T> sourceInputSplit) throws IOException { + reader = ((BoundedSource<T>) sourceInputSplit.getSource()).createReader(options); + reachedEnd = false; + } + + @Override + public BaseStatistics getStatistics(BaseStatistics baseStatistics) throws IOException { + try { + final long estimatedSize = initialSource.getEstimatedSizeBytes(options); + + return new BaseStatistics() { + @Override + public long getTotalInputSize() { + return estimatedSize; + + } + + @Override + public long getNumberOfRecords() { + return BaseStatistics.NUM_RECORDS_UNKNOWN; + } + + @Override + public float getAverageRecordWidth() { + return BaseStatistics.AVG_RECORD_BYTES_UNKNOWN; + } + }; + } catch (Exception e) { + LOG.warn("Could not read Source statistics: {}", e); + } + + return null; + } + + @Override + @SuppressWarnings("unchecked") + public SourceInputSplit<T>[] createInputSplits(int numSplits) throws IOException { + long desiredSizeBytes; + try { + desiredSizeBytes = initialSource.getEstimatedSizeBytes(options) / numSplits; + List<? extends Source<T>> shards = initialSource.splitIntoBundles(desiredSizeBytes, + options); + List<SourceInputSplit<T>> splits = new ArrayList<>(); + int splitCount = 0; + for (Source<T> shard: shards) { + splits.add(new SourceInputSplit<>(shard, splitCount++)); + } + return splits.toArray(new SourceInputSplit[splits.size()]); + } catch (Exception e) { + throw new IOException("Could not create input splits from Source.", e); + } + } + + @Override + public InputSplitAssigner getInputSplitAssigner(final SourceInputSplit[] sourceInputSplits) { + return new InputSplitAssigner() { + private int index = 0; + private final SourceInputSplit[] splits = sourceInputSplits; + @Override + public InputSplit getNextInputSplit(String host, int taskId) { + if (index < splits.length) { + return splits[index++]; + } else { + return null; + } + } + }; + } + + + @Override + public boolean reachedEnd() throws IOException { + return reachedEnd; + } + + @Override + public T nextRecord(T t) throws IOException { + + reachedEnd = !reader.advance(); + if (!reachedEnd) { + return reader.getCurrent(); + } + return null; + } + + @Override + public void close() throws IOException { + reader.close(); + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SourceInputSplit.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SourceInputSplit.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SourceInputSplit.java index 2b93ab7..86fdada 100644 --- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SourceInputSplit.java +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SourceInputSplit.java @@ -29,24 +29,24 @@ import org.apache.flink.core.io.InputSplit; */ public class SourceInputSplit<T> implements InputSplit { - private Source<T> source; - private int splitNumber; + private Source<T> source; + private int splitNumber; - public SourceInputSplit() { - } + public SourceInputSplit() { + } - public SourceInputSplit(Source<T> source, int splitNumber) { - this.source = source; - this.splitNumber = splitNumber; - } + public SourceInputSplit(Source<T> source, int splitNumber) { + this.source = source; + this.splitNumber = splitNumber; + } - @Override - public int getSplitNumber() { - return splitNumber; - } + @Override + public int getSplitNumber() { + return splitNumber; + } - public Source<T> getSource() { - return source; - } + public Source<T> getSource() { + return source; + } }
