http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/UnionCoder.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/UnionCoder.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/UnionCoder.java
index 0befa88..3cc5c24 100644
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/UnionCoder.java
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/UnionCoder.java
@@ -38,113 +38,113 @@ import java.util.List;
  */
 @SuppressWarnings("serial")
 public class UnionCoder extends StandardCoder<RawUnionValue> {
-       // TODO: Think about how to integrate this with a schema object (i.e.
-       // a tuple of tuple tags).
-       /**
-        * Builds a union coder with the given list of element coders.  This list
-        * corresponds to a mapping of union tag to Coder.  Union tags start at 0.
-        */
-       public static UnionCoder of(List<Coder<?>> elementCoders) {
-               return new UnionCoder(elementCoders);
-       }
-
-       @JsonCreator
-       public static UnionCoder jsonOf(
-                       @JsonProperty(PropertyNames.COMPONENT_ENCODINGS)
-                       List<Coder<?>> elements) {
-               return UnionCoder.of(elements);
-       }
-
-       private int getIndexForEncoding(RawUnionValue union) {
-               if (union == null) {
-                       throw new IllegalArgumentException("cannot encode a null tagged union");
-               }
-               int index = union.getUnionTag();
-               if (index < 0 || index >= elementCoders.size()) {
-                       throw new IllegalArgumentException(
-                                       "union value index " + index + " not in range [0.." +
-                                                       (elementCoders.size() - 1) + "]");
-               }
-               return index;
-       }
-
-       @SuppressWarnings("unchecked")
-       @Override
-       public void encode(
-                       RawUnionValue union,
-                       OutputStream outStream,
-                       Context context)
-                       throws IOException  {
-               int index = getIndexForEncoding(union);
-               // Write out the union tag.
-               VarInt.encode(index, outStream);
-
-               // Write out the actual value.
-               Coder<Object> coder = (Coder<Object>) elementCoders.get(index);
-               coder.encode(
-                               union.getValue(),
-                               outStream,
-                               context);
-       }
-
-       @Override
-       public RawUnionValue decode(InputStream inStream, Context context)
-                       throws IOException {
-               int index = VarInt.decodeInt(inStream);
-               Object value = elementCoders.get(index).decode(inStream, context);
-               return new RawUnionValue(index, value);
-       }
-
-       @Override
-       public List<? extends Coder<?>> getCoderArguments() {
-               return null;
-       }
-
-       @Override
-       public List<? extends Coder<?>> getComponents() {
-               return elementCoders;
-       }
-
-       /**
-        * Since this coder uses elementCoders.get(index) and coders that are known to run in constant
-        * time, we defer the return value to that coder.
-        */
-       @Override
-       public boolean isRegisterByteSizeObserverCheap(RawUnionValue union, Context context) {
-               int index = getIndexForEncoding(union);
-               @SuppressWarnings("unchecked")
-               Coder<Object> coder = (Coder<Object>) elementCoders.get(index);
-               return coder.isRegisterByteSizeObserverCheap(union.getValue(), context);
-       }
-
-       /**
-        * Notifies ElementByteSizeObserver about the byte size of the encoded value using this coder.
-        */
-       @Override
-       public void registerByteSizeObserver(
-                       RawUnionValue union, ElementByteSizeObserver observer, Context context)
-                       throws Exception {
-               int index = getIndexForEncoding(union);
-               // Write out the union tag.
-               observer.update(VarInt.getLength(index));
-               // Write out the actual value.
-               @SuppressWarnings("unchecked")
-               Coder<Object> coder = (Coder<Object>) elementCoders.get(index);
-               coder.registerByteSizeObserver(union.getValue(), observer, context);
-       }
-
-       /////////////////////////////////////////////////////////////////////////////
-
-       private final List<Coder<?>> elementCoders;
-
-       private UnionCoder(List<Coder<?>> elementCoders) {
-               this.elementCoders = elementCoders;
-       }
-
-       @Override
-       public void verifyDeterministic() throws NonDeterministicException {
-               verifyDeterministic(
-                               "UnionCoder is only deterministic if all element coders are",
-                               elementCoders);
-       }
+  // TODO: Think about how to integrate this with a schema object (i.e.
+  // a tuple of tuple tags).
+  /**
+   * Builds a union coder with the given list of element coders.  This list
+   * corresponds to a mapping of union tag to Coder.  Union tags start at 0.
+   */
+  public static UnionCoder of(List<Coder<?>> elementCoders) {
+    return new UnionCoder(elementCoders);
+  }
+
+  @JsonCreator
+  public static UnionCoder jsonOf(
+      @JsonProperty(PropertyNames.COMPONENT_ENCODINGS)
+      List<Coder<?>> elements) {
+    return UnionCoder.of(elements);
+  }
+
+  private int getIndexForEncoding(RawUnionValue union) {
+    if (union == null) {
+      throw new IllegalArgumentException("cannot encode a null tagged union");
+    }
+    int index = union.getUnionTag();
+    if (index < 0 || index >= elementCoders.size()) {
+      throw new IllegalArgumentException(
+          "union value index " + index + " not in range [0.." +
+              (elementCoders.size() - 1) + "]");
+    }
+    return index;
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public void encode(
+      RawUnionValue union,
+      OutputStream outStream,
+      Context context)
+      throws IOException  {
+    int index = getIndexForEncoding(union);
+    // Write out the union tag.
+    VarInt.encode(index, outStream);
+
+    // Write out the actual value.
+    Coder<Object> coder = (Coder<Object>) elementCoders.get(index);
+    coder.encode(
+        union.getValue(),
+        outStream,
+        context);
+  }
+
+  @Override
+  public RawUnionValue decode(InputStream inStream, Context context)
+      throws IOException {
+    int index = VarInt.decodeInt(inStream);
+    Object value = elementCoders.get(index).decode(inStream, context);
+    return new RawUnionValue(index, value);
+  }
+
+  @Override
+  public List<? extends Coder<?>> getCoderArguments() {
+    return null;
+  }
+
+  @Override
+  public List<? extends Coder<?>> getComponents() {
+    return elementCoders;
+  }
+
+  /**
+   * Since this coder uses elementCoders.get(index) and coders that are known to run in constant
+   * time, we defer the return value to that coder.
+   */
+  @Override
+  public boolean isRegisterByteSizeObserverCheap(RawUnionValue union, Context context) {
+    int index = getIndexForEncoding(union);
+    @SuppressWarnings("unchecked")
+    Coder<Object> coder = (Coder<Object>) elementCoders.get(index);
+    return coder.isRegisterByteSizeObserverCheap(union.getValue(), context);
+  }
+
+  /**
+   * Notifies ElementByteSizeObserver about the byte size of the encoded value using this coder.
+   */
+  @Override
+  public void registerByteSizeObserver(
+      RawUnionValue union, ElementByteSizeObserver observer, Context context)
+      throws Exception {
+    int index = getIndexForEncoding(union);
+    // Write out the union tag.
+    observer.update(VarInt.getLength(index));
+    // Write out the actual value.
+    @SuppressWarnings("unchecked")
+    Coder<Object> coder = (Coder<Object>) elementCoders.get(index);
+    coder.registerByteSizeObserver(union.getValue(), observer, context);
+  }
+
+  /////////////////////////////////////////////////////////////////////////////
+
+  private final List<Coder<?>> elementCoders;
+
+  private UnionCoder(List<Coder<?>> elementCoders) {
+    this.elementCoders = elementCoders;
+  }
+
+  @Override
+  public void verifyDeterministic() throws NonDeterministicException {
+    verifyDeterministic(
+        "UnionCoder is only deterministic if all element coders are",
+        elementCoders);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/CoderComparator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/CoderComparator.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/CoderComparator.java
index e433589..b402f7c 100644
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/CoderComparator.java
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/CoderComparator.java
@@ -32,185 +32,185 @@ import java.io.ObjectInputStream;
  */
 public class CoderComparator<T> extends TypeComparator<T> {
 
-       private Coder<T> coder;
-
-       // We use these for internal encoding/decoding for creating copies and comparing
-       // serialized forms using a Coder
-       private transient InspectableByteArrayOutputStream buffer1;
-       private transient InspectableByteArrayOutputStream buffer2;
-
-       // For storing the Reference in encoded form
-       private transient InspectableByteArrayOutputStream referenceBuffer;
-
-       public CoderComparator(Coder<T> coder) {
-               this.coder = coder;
-               buffer1 = new InspectableByteArrayOutputStream();
-               buffer2 = new InspectableByteArrayOutputStream();
-               referenceBuffer = new InspectableByteArrayOutputStream();
-       }
-
-       private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
-               in.defaultReadObject();
-               buffer1 = new InspectableByteArrayOutputStream();
-               buffer2 = new InspectableByteArrayOutputStream();
-               referenceBuffer = new InspectableByteArrayOutputStream();
-       }
-
-       @Override
-       public int hash(T record) {
-               return record.hashCode();
-       }
-
-       @Override
-       public void setReference(T toCompare) {
-               referenceBuffer.reset();
-               try {
-                       coder.encode(toCompare, referenceBuffer, Coder.Context.OUTER);
-               } catch (IOException e) {
-                       throw new RuntimeException("Could not set reference " + toCompare + ": " + e);
-               }
-       }
-
-       @Override
-       public boolean equalToReference(T candidate) {
-               try {
-                       buffer2.reset();
-                       coder.encode(candidate, buffer2, Coder.Context.OUTER);
-                       byte[] arr = referenceBuffer.getBuffer();
-                       byte[] arrOther = buffer2.getBuffer();
-                       if (referenceBuffer.size() != buffer2.size()) {
-                               return false;
-                       }
-                       int len = buffer2.size();
-                       for(int i = 0; i < len; i++ ) {
-                               if (arr[i] != arrOther[i]) {
-                                       return false;
-                               }
-                       }
-                       return true;
-               } catch (IOException e) {
-                       throw new RuntimeException("Could not compare reference.", e);
-               }
-       }
-
-       @Override
-       public int compareToReference(TypeComparator<T> other) {
-               InspectableByteArrayOutputStream otherReferenceBuffer = ((CoderComparator<T>) other).referenceBuffer;
-
-               byte[] arr = referenceBuffer.getBuffer();
-               byte[] arrOther = otherReferenceBuffer.getBuffer();
-               if (referenceBuffer.size() != otherReferenceBuffer.size()) {
-                       return referenceBuffer.size() - otherReferenceBuffer.size();
-               }
-               int len = referenceBuffer.size();
-               for (int i = 0; i < len; i++) {
-                       if (arr[i] != arrOther[i]) {
-                               return arr[i] - arrOther[i];
-                       }
-               }
-               return 0;
-       }
-
-       @Override
-       public int compare(T first, T second) {
-               try {
-                       buffer1.reset();
-                       buffer2.reset();
-                       coder.encode(first, buffer1, Coder.Context.OUTER);
-                       coder.encode(second, buffer2, Coder.Context.OUTER);
-                       byte[] arr = buffer1.getBuffer();
-                       byte[] arrOther = buffer2.getBuffer();
-                       if (buffer1.size() != buffer2.size()) {
-                               return buffer1.size() - buffer2.size();
-                       }
-                       int len = buffer1.size();
-                       for(int i = 0; i < len; i++ ) {
-                               if (arr[i] != arrOther[i]) {
-                                       return arr[i] - arrOther[i];
-                               }
-                       }
-                       return 0;
-               } catch (IOException e) {
-                       throw new RuntimeException("Could not compare: ", e);
-               }
-       }
-
-       @Override
-       public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
-               CoderTypeSerializer<T> serializer = new CoderTypeSerializer<>(coder);
-               T first = serializer.deserialize(firstSource);
-               T second = serializer.deserialize(secondSource);
-               return compare(first, second);
-       }
-
-       @Override
-       public boolean supportsNormalizedKey() {
-               return true;
-       }
-
-       @Override
-       public boolean supportsSerializationWithKeyNormalization() {
-               return false;
-       }
-
-       @Override
-       public int getNormalizeKeyLen() {
-               return Integer.MAX_VALUE;
-       }
-
-       @Override
-       public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
-               return true;
-       }
-
-       @Override
-       public void putNormalizedKey(T record, MemorySegment target, int offset, int numBytes) {
-               buffer1.reset();
-               try {
-                       coder.encode(record, buffer1, Coder.Context.OUTER);
-               } catch (IOException e) {
-                       throw new RuntimeException("Could not serializer " + record + " using coder " + coder + ": " + e);
-               }
-               final byte[] data = buffer1.getBuffer();
-               final int limit = offset + numBytes;
-
-               target.put(offset, data, 0, Math.min(numBytes, buffer1.size()));
-
-               offset += buffer1.size();
-
-               while (offset < limit) {
-                       target.put(offset++, (byte) 0);
-               }
-       }
-
-       @Override
-       public void writeWithKeyNormalization(T record, DataOutputView target) throws IOException {
-               throw new UnsupportedOperationException();
-       }
-
-       @Override
-       public T readWithKeyDenormalization(T reuse, DataInputView source) throws IOException {
-               throw new UnsupportedOperationException();
-       }
-
-       @Override
-       public boolean invertNormalizedKey() {
-               return false;
-       }
-
-       @Override
-       public TypeComparator<T> duplicate() {
-               return new CoderComparator<>(coder);
-       }
-
-       @Override
-       public int extractKeys(Object record, Object[] target, int index) {
-               target[index] = record;
-               return 1;
-       }
-
-       @Override
-       public TypeComparator[] getFlatComparators() {
-               return new TypeComparator[] { this.duplicate() };
-       }
+  private Coder<T> coder;
+
+  // We use these for internal encoding/decoding for creating copies and comparing
+  // serialized forms using a Coder
+  private transient InspectableByteArrayOutputStream buffer1;
+  private transient InspectableByteArrayOutputStream buffer2;
+
+  // For storing the Reference in encoded form
+  private transient InspectableByteArrayOutputStream referenceBuffer;
+
+  public CoderComparator(Coder<T> coder) {
+    this.coder = coder;
+    buffer1 = new InspectableByteArrayOutputStream();
+    buffer2 = new InspectableByteArrayOutputStream();
+    referenceBuffer = new InspectableByteArrayOutputStream();
+  }
+
+  private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+    in.defaultReadObject();
+    buffer1 = new InspectableByteArrayOutputStream();
+    buffer2 = new InspectableByteArrayOutputStream();
+    referenceBuffer = new InspectableByteArrayOutputStream();
+  }
+
+  @Override
+  public int hash(T record) {
+    return record.hashCode();
+  }
+
+  @Override
+  public void setReference(T toCompare) {
+    referenceBuffer.reset();
+    try {
+      coder.encode(toCompare, referenceBuffer, Coder.Context.OUTER);
+    } catch (IOException e) {
+      throw new RuntimeException("Could not set reference " + toCompare + ": " + e);
+    }
+  }
+
+  @Override
+  public boolean equalToReference(T candidate) {
+    try {
+      buffer2.reset();
+      coder.encode(candidate, buffer2, Coder.Context.OUTER);
+      byte[] arr = referenceBuffer.getBuffer();
+      byte[] arrOther = buffer2.getBuffer();
+      if (referenceBuffer.size() != buffer2.size()) {
+        return false;
+      }
+      int len = buffer2.size();
+      for(int i = 0; i < len; i++ ) {
+        if (arr[i] != arrOther[i]) {
+          return false;
+        }
+      }
+      return true;
+    } catch (IOException e) {
+      throw new RuntimeException("Could not compare reference.", e);
+    }
+  }
+
+  @Override
+  public int compareToReference(TypeComparator<T> other) {
+    InspectableByteArrayOutputStream otherReferenceBuffer = ((CoderComparator<T>) other).referenceBuffer;
+
+    byte[] arr = referenceBuffer.getBuffer();
+    byte[] arrOther = otherReferenceBuffer.getBuffer();
+    if (referenceBuffer.size() != otherReferenceBuffer.size()) {
+      return referenceBuffer.size() - otherReferenceBuffer.size();
+    }
+    int len = referenceBuffer.size();
+    for (int i = 0; i < len; i++) {
+      if (arr[i] != arrOther[i]) {
+        return arr[i] - arrOther[i];
+      }
+    }
+    return 0;
+  }
+
+  @Override
+  public int compare(T first, T second) {
+    try {
+      buffer1.reset();
+      buffer2.reset();
+      coder.encode(first, buffer1, Coder.Context.OUTER);
+      coder.encode(second, buffer2, Coder.Context.OUTER);
+      byte[] arr = buffer1.getBuffer();
+      byte[] arrOther = buffer2.getBuffer();
+      if (buffer1.size() != buffer2.size()) {
+        return buffer1.size() - buffer2.size();
+      }
+      int len = buffer1.size();
+      for(int i = 0; i < len; i++ ) {
+        if (arr[i] != arrOther[i]) {
+          return arr[i] - arrOther[i];
+        }
+      }
+      return 0;
+    } catch (IOException e) {
+      throw new RuntimeException("Could not compare: ", e);
+    }
+  }
+
+  @Override
+  public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
+    CoderTypeSerializer<T> serializer = new CoderTypeSerializer<>(coder);
+    T first = serializer.deserialize(firstSource);
+    T second = serializer.deserialize(secondSource);
+    return compare(first, second);
+  }
+
+  @Override
+  public boolean supportsNormalizedKey() {
+    return true;
+  }
+
+  @Override
+  public boolean supportsSerializationWithKeyNormalization() {
+    return false;
+  }
+
+  @Override
+  public int getNormalizeKeyLen() {
+    return Integer.MAX_VALUE;
+  }
+
+  @Override
+  public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
+    return true;
+  }
+
+  @Override
+  public void putNormalizedKey(T record, MemorySegment target, int offset, int numBytes) {
+    buffer1.reset();
+    try {
+      coder.encode(record, buffer1, Coder.Context.OUTER);
+    } catch (IOException e) {
+      throw new RuntimeException("Could not serializer " + record + " using coder " + coder + ": " + e);
+    }
+    final byte[] data = buffer1.getBuffer();
+    final int limit = offset + numBytes;
+
+    target.put(offset, data, 0, Math.min(numBytes, buffer1.size()));
+
+    offset += buffer1.size();
+
+    while (offset < limit) {
+      target.put(offset++, (byte) 0);
+    }
+  }
+
+  @Override
+  public void writeWithKeyNormalization(T record, DataOutputView target) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public T readWithKeyDenormalization(T reuse, DataInputView source) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean invertNormalizedKey() {
+    return false;
+  }
+
+  @Override
+  public TypeComparator<T> duplicate() {
+    return new CoderComparator<>(coder);
+  }
+
+  @Override
+  public int extractKeys(Object record, Object[] target, int index) {
+    target[index] = record;
+    return 1;
+  }
+
+  @Override
+  public TypeComparator[] getFlatComparators() {
+    return new TypeComparator[] { this.duplicate() };
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/CoderTypeInformation.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/CoderTypeInformation.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/CoderTypeInformation.java
index dd9c5f6..ae4309e 100644
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/CoderTypeInformation.java
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/CoderTypeInformation.java
@@ -32,85 +32,85 @@ import com.google.common.base.Preconditions;
  */
 public class CoderTypeInformation<T> extends TypeInformation<T> implements AtomicType<T> {
 
-       private final Coder<T> coder;
-
-       public CoderTypeInformation(Coder<T> coder) {
-               Preconditions.checkNotNull(coder);
-               this.coder = coder;
-       }
-
-       @Override
-       public boolean isBasicType() {
-               return false;
-       }
-
-       @Override
-       public boolean isTupleType() {
-               return false;
-       }
-
-       @Override
-       public int getArity() {
-               return 1;
-       }
-
-       @Override
-       @SuppressWarnings("unchecked")
-       public Class<T> getTypeClass() {
-               // We don't have the Class, so we have to pass null here. What a shame...
-               return (Class<T>) Object.class;
-       }
-
-       @Override
-       public boolean isKeyType() {
-               return true;
-       }
-
-       @Override
-       @SuppressWarnings("unchecked")
-       public TypeSerializer<T> createSerializer(ExecutionConfig config) {
-               if (coder instanceof VoidCoder) {
-                       return (TypeSerializer<T>) new VoidCoderTypeSerializer();
-               }
-               return new CoderTypeSerializer<>(coder);
-       }
-
-       @Override
-       public int getTotalFields() {
-               return 2;
-       }
-
-       @Override
-       public boolean equals(Object o) {
-               if (this == o) return true;
-               if (o == null || getClass() != o.getClass()) return false;
-
-               CoderTypeInformation that = (CoderTypeInformation) o;
-
-               return coder.equals(that.coder);
-
-       }
-
-       @Override
-       public int hashCode() {
-               return coder.hashCode();
-       }
-
-       @Override
-       public boolean canEqual(Object obj) {
-               return obj instanceof CoderTypeInformation;
-       }
-
-       @Override
-       public String toString() {
-               return "CoderTypeInformation{" +
-                               "coder=" + coder +
-                               '}';
-       }
-
-       @Override
-       public TypeComparator<T> createComparator(boolean sortOrderAscending, ExecutionConfig
-                       executionConfig) {
-               return new CoderComparator<>(coder);
-       }
+  private final Coder<T> coder;
+
+  public CoderTypeInformation(Coder<T> coder) {
+    Preconditions.checkNotNull(coder);
+    this.coder = coder;
+  }
+
+  @Override
+  public boolean isBasicType() {
+    return false;
+  }
+
+  @Override
+  public boolean isTupleType() {
+    return false;
+  }
+
+  @Override
+  public int getArity() {
+    return 1;
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public Class<T> getTypeClass() {
+    // We don't have the Class, so we have to pass null here. What a shame...
+    return (Class<T>) Object.class;
+  }
+
+  @Override
+  public boolean isKeyType() {
+    return true;
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public TypeSerializer<T> createSerializer(ExecutionConfig config) {
+    if (coder instanceof VoidCoder) {
+      return (TypeSerializer<T>) new VoidCoderTypeSerializer();
+    }
+    return new CoderTypeSerializer<>(coder);
+  }
+
+  @Override
+  public int getTotalFields() {
+    return 2;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    CoderTypeInformation that = (CoderTypeInformation) o;
+
+    return coder.equals(that.coder);
+
+  }
+
+  @Override
+  public int hashCode() {
+    return coder.hashCode();
+  }
+
+  @Override
+  public boolean canEqual(Object obj) {
+    return obj instanceof CoderTypeInformation;
+  }
+
+  @Override
+  public String toString() {
+    return "CoderTypeInformation{" +
+        "coder=" + coder +
+        '}';
+  }
+
+  @Override
+  public TypeComparator<T> createComparator(boolean sortOrderAscending, ExecutionConfig
+      executionConfig) {
+    return new CoderComparator<>(coder);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/CoderTypeSerializer.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/CoderTypeSerializer.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/CoderTypeSerializer.java
index f739397..6ed661c 100644
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/CoderTypeSerializer.java
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/CoderTypeSerializer.java
@@ -35,118 +35,118 @@ import java.io.ObjectInputStream;
  * Dataflow {@link com.google.cloud.dataflow.sdk.coders.Coder}s
  */
 public class CoderTypeSerializer<T> extends TypeSerializer<T> {
-       
-       private Coder<T> coder;
-       private transient DataInputViewWrapper inputWrapper;
-       private transient DataOutputViewWrapper outputWrapper;
-
-       // We use this for internal encoding/decoding for creating copies using 
the Coder.
-       private transient InspectableByteArrayOutputStream buffer;
-
-       public CoderTypeSerializer(Coder<T> coder) {
-               this.coder = coder;
-               this.inputWrapper = new DataInputViewWrapper(null);
-               this.outputWrapper = new DataOutputViewWrapper(null);
-
-               buffer = new InspectableByteArrayOutputStream();
-       }
-       
-       private void readObject(ObjectInputStream in) throws IOException, 
ClassNotFoundException {
-               in.defaultReadObject();
-               this.inputWrapper = new DataInputViewWrapper(null);
-               this.outputWrapper = new DataOutputViewWrapper(null);
-
-               buffer = new InspectableByteArrayOutputStream();
-       }
-       
-       @Override
-       public boolean isImmutableType() {
-               return false;
-       }
-
-       @Override
-       public CoderTypeSerializer<T> duplicate() {
-               return new CoderTypeSerializer<>(coder);
-       }
-
-       @Override
-       public T createInstance() {
-               return null;
-       }
-
-       @Override
-       public T copy(T t) {
-               buffer.reset();
-               try {
-                       coder.encode(t, buffer, Coder.Context.OUTER);
-               } catch (IOException e) {
-                       throw new RuntimeException("Could not copy.", e);
-               }
-               try {
-                       return coder.decode(new 
ByteArrayInputStream(buffer.getBuffer(), 0, buffer
-                                       .size()), Coder.Context.OUTER);
-               } catch (IOException e) {
-                       throw new RuntimeException("Could not copy.", e);
-               }
-       }
-
-       @Override
-       public T copy(T t, T reuse) {
-               return copy(t);
-       }
-
-       @Override
-       public int getLength() {
-               return 0;
-       }
-
-       @Override
-       public void serialize(T t, DataOutputView dataOutputView) throws 
IOException {
-               outputWrapper.setOutputView(dataOutputView);
-               coder.encode(t, outputWrapper, Coder.Context.NESTED);
-       }
-
-       @Override
-       public T deserialize(DataInputView dataInputView) throws IOException {
-               try {
-                       inputWrapper.setInputView(dataInputView);
-                       return coder.decode(inputWrapper, Coder.Context.NESTED);
-               } catch (CoderException e) {
-                       Throwable cause = e.getCause();
-                       if (cause instanceof EOFException) {
-                               throw (EOFException) cause;
-                       } else {
-                               throw e;
-                       }
-               }
-       }
-
-       @Override
-       public T deserialize(T t, DataInputView dataInputView) throws 
IOException {
-               return deserialize(dataInputView);
-       }
-
-       @Override
-       public void copy(DataInputView dataInputView, DataOutputView 
dataOutputView) throws IOException {
-               serialize(deserialize(dataInputView), dataOutputView);
-       }
-
-       @Override
-       public boolean equals(Object o) {
-               if (this == o) return true;
-               if (o == null || getClass() != o.getClass()) return false;
-
-               CoderTypeSerializer that = (CoderTypeSerializer) o;
-               return coder.equals(that.coder);
-       }
-
-       @Override
-       public boolean canEqual(Object obj) {
-               return obj instanceof CoderTypeSerializer;
-       }
-
-       @Override
-       public int hashCode() {
-               return coder.hashCode();
-       }
+  
+  private Coder<T> coder;
+  private transient DataInputViewWrapper inputWrapper;
+  private transient DataOutputViewWrapper outputWrapper;
+
+  // We use this for internal encoding/decoding for creating copies using the 
Coder.
+  private transient InspectableByteArrayOutputStream buffer;
+
+  public CoderTypeSerializer(Coder<T> coder) {
+    this.coder = coder;
+    this.inputWrapper = new DataInputViewWrapper(null);
+    this.outputWrapper = new DataOutputViewWrapper(null);
+
+    buffer = new InspectableByteArrayOutputStream();
+  }
+  
+  private void readObject(ObjectInputStream in) throws IOException, 
ClassNotFoundException {
+    in.defaultReadObject();
+    this.inputWrapper = new DataInputViewWrapper(null);
+    this.outputWrapper = new DataOutputViewWrapper(null);
+
+    buffer = new InspectableByteArrayOutputStream();
+  }
+  
+  @Override
+  public boolean isImmutableType() {
+    return false;
+  }
+
+  @Override
+  public CoderTypeSerializer<T> duplicate() {
+    return new CoderTypeSerializer<>(coder);
+  }
+
+  @Override
+  public T createInstance() {
+    return null;
+  }
+
+  @Override
+  public T copy(T t) {
+    buffer.reset();
+    try {
+      coder.encode(t, buffer, Coder.Context.OUTER);
+    } catch (IOException e) {
+      throw new RuntimeException("Could not copy.", e);
+    }
+    try {
+      return coder.decode(new ByteArrayInputStream(buffer.getBuffer(), 0, 
buffer
+          .size()), Coder.Context.OUTER);
+    } catch (IOException e) {
+      throw new RuntimeException("Could not copy.", e);
+    }
+  }
+
+  @Override
+  public T copy(T t, T reuse) {
+    return copy(t);
+  }
+
+  @Override
+  public int getLength() {
+    return 0;
+  }
+
+  @Override
+  public void serialize(T t, DataOutputView dataOutputView) throws IOException 
{
+    outputWrapper.setOutputView(dataOutputView);
+    coder.encode(t, outputWrapper, Coder.Context.NESTED);
+  }
+
+  @Override
+  public T deserialize(DataInputView dataInputView) throws IOException {
+    try {
+      inputWrapper.setInputView(dataInputView);
+      return coder.decode(inputWrapper, Coder.Context.NESTED);
+    } catch (CoderException e) {
+      Throwable cause = e.getCause();
+      if (cause instanceof EOFException) {
+        throw (EOFException) cause;
+      } else {
+        throw e;
+      }
+    }
+  }
+
+  @Override
+  public T deserialize(T t, DataInputView dataInputView) throws IOException {
+    return deserialize(dataInputView);
+  }
+
+  @Override
+  public void copy(DataInputView dataInputView, DataOutputView dataOutputView) 
throws IOException {
+    serialize(deserialize(dataInputView), dataOutputView);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    CoderTypeSerializer that = (CoderTypeSerializer) o;
+    return coder.equals(that.coder);
+  }
+
+  @Override
+  public boolean canEqual(Object obj) {
+    return obj instanceof CoderTypeSerializer;
+  }
+
+  @Override
+  public int hashCode() {
+    return coder.hashCode();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/InspectableByteArrayOutputStream.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/InspectableByteArrayOutputStream.java
 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/InspectableByteArrayOutputStream.java
index 5d918cc..be6eadd 100644
--- 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/InspectableByteArrayOutputStream.java
+++ 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/InspectableByteArrayOutputStream.java
@@ -25,10 +25,10 @@ import java.io.ByteArrayOutputStream;
  */
 public class InspectableByteArrayOutputStream extends ByteArrayOutputStream {
 
-       /**
-        * Get the underlying byte array.
-        */
-       public byte[] getBuffer() {
-               return buf;
-       }
+  /**
+   * Get the underlying byte array.
+   */
+  public byte[] getBuffer() {
+    return buf;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/KvCoderComperator.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/KvCoderComperator.java
 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/KvCoderComperator.java
index 815569d..ba09ea9 100644
--- 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/KvCoderComperator.java
+++ 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/KvCoderComperator.java
@@ -35,230 +35,230 @@ import java.io.ObjectInputStream;
  * for {@link KV} that always compares on the key only.
  */
 public class KvCoderComperator <K, V> extends TypeComparator<KV<K, V>> {
-       
-       private KvCoder<K, V> coder;
-       private Coder<K> keyCoder;
-
-       // We use these for internal encoding/decoding for creating copies and 
comparing
-       // serialized forms using a Coder
-       private transient InspectableByteArrayOutputStream buffer1;
-       private transient InspectableByteArrayOutputStream buffer2;
-
-       // For storing the Reference in encoded form
-       private transient InspectableByteArrayOutputStream referenceBuffer;
-
-
-       // For deserializing the key
-       private transient DataInputViewWrapper inputWrapper;
-
-       public KvCoderComperator(KvCoder<K, V> coder) {
-               this.coder = coder;
-               this.keyCoder = coder.getKeyCoder();
-
-               buffer1 = new InspectableByteArrayOutputStream();
-               buffer2 = new InspectableByteArrayOutputStream();
-               referenceBuffer = new InspectableByteArrayOutputStream();
-
-               inputWrapper = new DataInputViewWrapper(null);
-       }
-
-       private void readObject(ObjectInputStream in) throws IOException, 
ClassNotFoundException {
-               in.defaultReadObject();
-
-               buffer1 = new InspectableByteArrayOutputStream();
-               buffer2 = new InspectableByteArrayOutputStream();
-               referenceBuffer = new InspectableByteArrayOutputStream();
-
-               inputWrapper = new DataInputViewWrapper(null);
-       }
-
-       @Override
-       public int hash(KV<K, V> record) {
-               K key = record.getKey();
-               if (key != null) {
-                       return key.hashCode();
-               } else {
-                       return 0;
-               }
-       }
-
-       @Override
-       public void setReference(KV<K, V> toCompare) {
-               referenceBuffer.reset();
-               try {
-                       keyCoder.encode(toCompare.getKey(), referenceBuffer, 
Coder.Context.OUTER);
-               } catch (IOException e) {
-                       throw new RuntimeException("Could not set reference " + 
toCompare + ": " + e);
-               }
-       }
-
-       @Override
-       public boolean equalToReference(KV<K, V> candidate) {
-               try {
-                       buffer2.reset();
-                       keyCoder.encode(candidate.getKey(), buffer2, 
Coder.Context.OUTER);
-                       byte[] arr = referenceBuffer.getBuffer();
-                       byte[] arrOther = buffer2.getBuffer();
-                       if (referenceBuffer.size() != buffer2.size()) {
-                               return false;
-                       }
-                       int len = buffer2.size();
-                       for(int i = 0; i < len; i++ ) {
-                               if (arr[i] != arrOther[i]) {
-                                       return false;
-                               }
-                       }
-                       return true;
-               } catch (IOException e) {
-                       throw new RuntimeException("Could not compare 
reference.", e);
-               }
-       }
-
-       @Override
-       public int compareToReference(TypeComparator<KV<K, V>> other) {
-               InspectableByteArrayOutputStream otherReferenceBuffer = 
((KvCoderComperator<K, V>) other).referenceBuffer;
-
-               byte[] arr = referenceBuffer.getBuffer();
-               byte[] arrOther = otherReferenceBuffer.getBuffer();
-               if (referenceBuffer.size() != otherReferenceBuffer.size()) {
-                       return referenceBuffer.size() - 
otherReferenceBuffer.size();
-               }
-               int len = referenceBuffer.size();
-               for (int i = 0; i < len; i++) {
-                       if (arr[i] != arrOther[i]) {
-                               return arr[i] - arrOther[i];
-                       }
-               }
-               return 0;
-       }
-
-
-       @Override
-       public int compare(KV<K, V> first, KV<K, V> second) {
-               try {
-                       buffer1.reset();
-                       buffer2.reset();
-                       keyCoder.encode(first.getKey(), buffer1, 
Coder.Context.OUTER);
-                       keyCoder.encode(second.getKey(), buffer2, 
Coder.Context.OUTER);
-                       byte[] arr = buffer1.getBuffer();
-                       byte[] arrOther = buffer2.getBuffer();
-                       if (buffer1.size() != buffer2.size()) {
-                               return buffer1.size() - buffer2.size();
-                       }
-                       int len = buffer1.size();
-                       for(int i = 0; i < len; i++ ) {
-                               if (arr[i] != arrOther[i]) {
-                                       return arr[i] - arrOther[i];
-                               }
-                       }
-                       return 0;
-               } catch (IOException e) {
-                       throw new RuntimeException("Could not compare 
reference.", e);
-               }
-       }
-
-       @Override
-       public int compareSerialized(DataInputView firstSource, DataInputView 
secondSource) throws IOException {
-
-               inputWrapper.setInputView(firstSource);
-               K firstKey = keyCoder.decode(inputWrapper, 
Coder.Context.NESTED);
-               inputWrapper.setInputView(secondSource);
-               K secondKey = keyCoder.decode(inputWrapper, 
Coder.Context.NESTED);
-
-               try {
-                       buffer1.reset();
-                       buffer2.reset();
-                       keyCoder.encode(firstKey, buffer1, Coder.Context.OUTER);
-                       keyCoder.encode(secondKey, buffer2, 
Coder.Context.OUTER);
-                       byte[] arr = buffer1.getBuffer();
-                       byte[] arrOther = buffer2.getBuffer();
-                       if (buffer1.size() != buffer2.size()) {
-                               return buffer1.size() - buffer2.size();
-                       }
-                       int len = buffer1.size();
-                       for(int i = 0; i < len; i++ ) {
-                               if (arr[i] != arrOther[i]) {
-                                       return arr[i] - arrOther[i];
-                               }
-                       }
-                       return 0;
-               } catch (IOException e) {
-                       throw new RuntimeException("Could not compare 
reference.", e);
-               }
-       }
-
-       @Override
-       public boolean supportsNormalizedKey() {
-               return true;
-       }
-
-       @Override
-       public boolean supportsSerializationWithKeyNormalization() {
-               return false;
-       }
-
-       @Override
-       public int getNormalizeKeyLen() {
-               return Integer.MAX_VALUE;
-       }
-
-       @Override
-       public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
-               return true;
-       }
-
-       @Override
-       public void putNormalizedKey(KV<K, V> record, MemorySegment target, int 
offset, int numBytes) {
-               buffer1.reset();
-               try {
-                       keyCoder.encode(record.getKey(), buffer1, 
Coder.Context.NESTED);
-               } catch (IOException e) {
-                       throw new RuntimeException("Could not serializer " + 
record + " using coder " + coder + ": " + e);
-               }
-               final byte[] data = buffer1.getBuffer();
-               final int limit = offset + numBytes;
-
-               int numBytesPut = Math.min(numBytes, buffer1.size());
-
-               target.put(offset, data, 0, numBytesPut);
-
-               offset += numBytesPut;
-
-               while (offset < limit) {
-                       target.put(offset++, (byte) 0);
-               }
-       }
-
-       @Override
-       public void writeWithKeyNormalization(KV<K, V> record, DataOutputView 
target) throws IOException {
-               throw new UnsupportedOperationException();
-       }
-
-       @Override
-       public KV<K, V> readWithKeyDenormalization(KV<K, V> reuse, 
DataInputView source) throws IOException {
-               throw new UnsupportedOperationException();
-       }
-
-       @Override
-       public boolean invertNormalizedKey() {
-               return false;
-       }
-
-       @Override
-       public TypeComparator<KV<K, V>> duplicate() {
-               return new KvCoderComperator<>(coder);
-       }
-
-       @Override
-       public int extractKeys(Object record, Object[] target, int index) {
-               KV<K, V> kv = (KV<K, V>) record;
-               K k = kv.getKey();
-               target[index] = k;
-               return 1;
-       }
-
-       @Override
-       public TypeComparator[] getFlatComparators() {
-               return new TypeComparator[] {new CoderComparator<>(keyCoder)};
-       }
+  
+  private KvCoder<K, V> coder;
+  private Coder<K> keyCoder;
+
+  // We use these for internal encoding/decoding for creating copies and 
comparing
+  // serialized forms using a Coder
+  private transient InspectableByteArrayOutputStream buffer1;
+  private transient InspectableByteArrayOutputStream buffer2;
+
+  // For storing the Reference in encoded form
+  private transient InspectableByteArrayOutputStream referenceBuffer;
+
+
+  // For deserializing the key
+  private transient DataInputViewWrapper inputWrapper;
+
+  public KvCoderComperator(KvCoder<K, V> coder) {
+    this.coder = coder;
+    this.keyCoder = coder.getKeyCoder();
+
+    buffer1 = new InspectableByteArrayOutputStream();
+    buffer2 = new InspectableByteArrayOutputStream();
+    referenceBuffer = new InspectableByteArrayOutputStream();
+
+    inputWrapper = new DataInputViewWrapper(null);
+  }
+
+  private void readObject(ObjectInputStream in) throws IOException, 
ClassNotFoundException {
+    in.defaultReadObject();
+
+    buffer1 = new InspectableByteArrayOutputStream();
+    buffer2 = new InspectableByteArrayOutputStream();
+    referenceBuffer = new InspectableByteArrayOutputStream();
+
+    inputWrapper = new DataInputViewWrapper(null);
+  }
+
+  @Override
+  public int hash(KV<K, V> record) {
+    K key = record.getKey();
+    if (key != null) {
+      return key.hashCode();
+    } else {
+      return 0;
+    }
+  }
+
+  @Override
+  public void setReference(KV<K, V> toCompare) {
+    referenceBuffer.reset();
+    try {
+      keyCoder.encode(toCompare.getKey(), referenceBuffer, 
Coder.Context.OUTER);
+    } catch (IOException e) {
+      throw new RuntimeException("Could not set reference " + toCompare + ": " 
+ e);
+    }
+  }
+
+  @Override
+  public boolean equalToReference(KV<K, V> candidate) {
+    try {
+      buffer2.reset();
+      keyCoder.encode(candidate.getKey(), buffer2, Coder.Context.OUTER);
+      byte[] arr = referenceBuffer.getBuffer();
+      byte[] arrOther = buffer2.getBuffer();
+      if (referenceBuffer.size() != buffer2.size()) {
+        return false;
+      }
+      int len = buffer2.size();
+      for(int i = 0; i < len; i++ ) {
+        if (arr[i] != arrOther[i]) {
+          return false;
+        }
+      }
+      return true;
+    } catch (IOException e) {
+      throw new RuntimeException("Could not compare reference.", e);
+    }
+  }
+
+  @Override
+  public int compareToReference(TypeComparator<KV<K, V>> other) {
+    InspectableByteArrayOutputStream otherReferenceBuffer = 
((KvCoderComperator<K, V>) other).referenceBuffer;
+
+    byte[] arr = referenceBuffer.getBuffer();
+    byte[] arrOther = otherReferenceBuffer.getBuffer();
+    if (referenceBuffer.size() != otherReferenceBuffer.size()) {
+      return referenceBuffer.size() - otherReferenceBuffer.size();
+    }
+    int len = referenceBuffer.size();
+    for (int i = 0; i < len; i++) {
+      if (arr[i] != arrOther[i]) {
+        return arr[i] - arrOther[i];
+      }
+    }
+    return 0;
+  }
+
+
+  @Override
+  public int compare(KV<K, V> first, KV<K, V> second) {
+    try {
+      buffer1.reset();
+      buffer2.reset();
+      keyCoder.encode(first.getKey(), buffer1, Coder.Context.OUTER);
+      keyCoder.encode(second.getKey(), buffer2, Coder.Context.OUTER);
+      byte[] arr = buffer1.getBuffer();
+      byte[] arrOther = buffer2.getBuffer();
+      if (buffer1.size() != buffer2.size()) {
+        return buffer1.size() - buffer2.size();
+      }
+      int len = buffer1.size();
+      for(int i = 0; i < len; i++ ) {
+        if (arr[i] != arrOther[i]) {
+          return arr[i] - arrOther[i];
+        }
+      }
+      return 0;
+    } catch (IOException e) {
+      throw new RuntimeException("Could not compare reference.", e);
+    }
+  }
+
+  @Override
+  public int compareSerialized(DataInputView firstSource, DataInputView 
secondSource) throws IOException {
+
+    inputWrapper.setInputView(firstSource);
+    K firstKey = keyCoder.decode(inputWrapper, Coder.Context.NESTED);
+    inputWrapper.setInputView(secondSource);
+    K secondKey = keyCoder.decode(inputWrapper, Coder.Context.NESTED);
+
+    try {
+      buffer1.reset();
+      buffer2.reset();
+      keyCoder.encode(firstKey, buffer1, Coder.Context.OUTER);
+      keyCoder.encode(secondKey, buffer2, Coder.Context.OUTER);
+      byte[] arr = buffer1.getBuffer();
+      byte[] arrOther = buffer2.getBuffer();
+      if (buffer1.size() != buffer2.size()) {
+        return buffer1.size() - buffer2.size();
+      }
+      int len = buffer1.size();
+      for(int i = 0; i < len; i++ ) {
+        if (arr[i] != arrOther[i]) {
+          return arr[i] - arrOther[i];
+        }
+      }
+      return 0;
+    } catch (IOException e) {
+      throw new RuntimeException("Could not compare reference.", e);
+    }
+  }
+
+  @Override
+  public boolean supportsNormalizedKey() {
+    return true;
+  }
+
+  @Override
+  public boolean supportsSerializationWithKeyNormalization() {
+    return false;
+  }
+
+  @Override
+  public int getNormalizeKeyLen() {
+    return Integer.MAX_VALUE;
+  }
+
+  @Override
+  public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
+    return true;
+  }
+
+  @Override
+  public void putNormalizedKey(KV<K, V> record, MemorySegment target, int 
offset, int numBytes) {
+    buffer1.reset();
+    try {
+      keyCoder.encode(record.getKey(), buffer1, Coder.Context.NESTED);
+    } catch (IOException e) {
+      throw new RuntimeException("Could not serializer " + record + " using 
coder " + coder + ": " + e);
+    }
+    final byte[] data = buffer1.getBuffer();
+    final int limit = offset + numBytes;
+
+    int numBytesPut = Math.min(numBytes, buffer1.size());
+
+    target.put(offset, data, 0, numBytesPut);
+
+    offset += numBytesPut;
+
+    while (offset < limit) {
+      target.put(offset++, (byte) 0);
+    }
+  }
+
+  @Override
+  public void writeWithKeyNormalization(KV<K, V> record, DataOutputView 
target) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public KV<K, V> readWithKeyDenormalization(KV<K, V> reuse, DataInputView 
source) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean invertNormalizedKey() {
+    return false;
+  }
+
+  @Override
+  public TypeComparator<KV<K, V>> duplicate() {
+    return new KvCoderComperator<>(coder);
+  }
+
+  @Override
+  public int extractKeys(Object record, Object[] target, int index) {
+    KV<K, V> kv = (KV<K, V>) record;
+    K k = kv.getKey();
+    target[index] = k;
+    return 1;
+  }
+
+  @Override
+  public TypeComparator[] getFlatComparators() {
+    return new TypeComparator[] {new CoderComparator<>(keyCoder)};
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/KvCoderTypeInformation.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/KvCoderTypeInformation.java
 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/KvCoderTypeInformation.java
index 090f79d..be11918 100644
--- 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/KvCoderTypeInformation.java
+++ 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/KvCoderTypeInformation.java
@@ -34,153 +34,153 @@ import java.util.List;
  */
 public class KvCoderTypeInformation<K, V> extends CompositeType<KV<K, V>> {
 
-       private KvCoder<K, V> coder;
-
-       // We don't have the Class, so we have to pass null here. What a 
shame...
-       private static Object DUMMY = new Object();
-
-       @SuppressWarnings("unchecked")
-       public KvCoderTypeInformation(KvCoder<K, V> coder) {
-               super(((Class<KV<K,V>>) DUMMY.getClass()));
-               this.coder = coder;
-               Preconditions.checkNotNull(coder);
-       }
-
-       @Override
-       @SuppressWarnings("unchecked")
-       public TypeComparator<KV<K, V>> createComparator(int[] 
logicalKeyFields, boolean[] orders, int logicalFieldOffset, ExecutionConfig 
config) {
-               return new KvCoderComperator((KvCoder) coder);
-       }
-
-       @Override
-       public boolean isBasicType() {
-               return false;
-       }
-
-       @Override
-       public boolean isTupleType() {
-               return false;
-       }
-
-       @Override
-       public int getArity() {
-               return 2;
-       }
-
-       @Override
-       @SuppressWarnings("unchecked")
-       public Class<KV<K, V>> getTypeClass() {
-               return privateGetTypeClass();
-       }
-
-       @SuppressWarnings("unchecked")
-       private static <X> Class<X> privateGetTypeClass() {
-               return (Class<X>) Object.class;
-       }
-
-       @Override
-       public boolean isKeyType() {
-               return true;
-       }
-
-       @Override
-       @SuppressWarnings("unchecked")
-       public TypeSerializer<KV<K, V>> createSerializer(ExecutionConfig 
config) {
-               return new CoderTypeSerializer<>(coder);
-       }
-
-       @Override
-       public int getTotalFields() {
-               return 2;
-       }
-
-       @Override
-       public boolean equals(Object o) {
-               if (this == o) return true;
-               if (o == null || getClass() != o.getClass()) return false;
-
-               KvCoderTypeInformation that = (KvCoderTypeInformation) o;
-
-               return coder.equals(that.coder);
-
-       }
-
-       @Override
-       public int hashCode() {
-               return coder.hashCode();
-       }
-
-       @Override
-       public String toString() {
-               return "CoderTypeInformation{" +
-                               "coder=" + coder +
-                               '}';
-       }
-
-       @Override
-       @SuppressWarnings("unchecked")
-       public <X> TypeInformation<X> getTypeAt(int pos) {
-               if (pos == 0) {
-                       return (TypeInformation<X>) new 
CoderTypeInformation<>(coder.getKeyCoder());
-               } else if (pos == 1) {
-                       return (TypeInformation<X>) new 
CoderTypeInformation<>(coder.getValueCoder());
-               } else {
-                       throw new RuntimeException("Invalid field position " + 
pos);
-               }
-       }
-
-       @Override
-       @SuppressWarnings("unchecked")
-       public <X> TypeInformation<X> getTypeAt(String fieldExpression) {
-               switch (fieldExpression) {
-                       case "key":
-                               return (TypeInformation<X>) new 
CoderTypeInformation<>(coder.getKeyCoder());
-                       case "value":
-                               return (TypeInformation<X>) new 
CoderTypeInformation<>(coder.getValueCoder());
-                       default:
-                               throw new UnsupportedOperationException("Only 
KvCoder has fields.");
-               }
-       }
-
-       @Override
-       public String[] getFieldNames() {
-               return new String[]{"key", "value"};
-       }
-
-       @Override
-       public int getFieldIndex(String fieldName) {
-               switch (fieldName) {
-                       case "key":
-                               return 0;
-                       case "value":
-                               return 1;
-                       default:
-                               return -1;
-               }
-       }
-
-       @Override
-       public void getFlatFields(String fieldExpression, int offset, 
List<FlatFieldDescriptor> result) {
-                       CoderTypeInformation keyTypeInfo = new 
CoderTypeInformation<>(coder.getKeyCoder());
-                       result.add(new FlatFieldDescriptor(0, keyTypeInfo));
-       }
-
-       @Override
-       protected TypeComparatorBuilder<KV<K, V>> createTypeComparatorBuilder() 
{
-               return new KvCoderTypeComparatorBuilder();
-       }
-
-       private class KvCoderTypeComparatorBuilder implements 
TypeComparatorBuilder<KV<K, V>> {
-
-               @Override
-               public void initializeTypeComparatorBuilder(int size) {}
-
-               @Override
-               public void addComparatorField(int fieldId, TypeComparator<?> 
comparator) {}
-
-               @Override
-               public TypeComparator<KV<K, V>> 
createTypeComparator(ExecutionConfig config) {
-                       return new KvCoderComperator<>(coder);
-               }
-       }
+  private KvCoder<K, V> coder;
+
+  // We don't have the Class, so we have to pass null here. What a shame...
+  private static Object DUMMY = new Object();
+
+  @SuppressWarnings("unchecked")
+  public KvCoderTypeInformation(KvCoder<K, V> coder) {
+    super(((Class<KV<K,V>>) DUMMY.getClass()));
+    this.coder = coder;
+    Preconditions.checkNotNull(coder);
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public TypeComparator<KV<K, V>> createComparator(int[] logicalKeyFields, 
boolean[] orders, int logicalFieldOffset, ExecutionConfig config) {
+    return new KvCoderComperator((KvCoder) coder);
+  }
+
+  @Override
+  public boolean isBasicType() {
+    return false;
+  }
+
+  @Override
+  public boolean isTupleType() {
+    return false;
+  }
+
+  @Override
+  public int getArity() {
+    return 2;
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public Class<KV<K, V>> getTypeClass() {
+    return privateGetTypeClass();
+  }
+
+  @SuppressWarnings("unchecked")
+  private static <X> Class<X> privateGetTypeClass() {
+    return (Class<X>) Object.class;
+  }
+
+  @Override
+  public boolean isKeyType() {
+    return true;
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public TypeSerializer<KV<K, V>> createSerializer(ExecutionConfig config) {
+    return new CoderTypeSerializer<>(coder);
+  }
+
+  @Override
+  public int getTotalFields() {
+    return 2;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    KvCoderTypeInformation that = (KvCoderTypeInformation) o;
+
+    return coder.equals(that.coder);
+
+  }
+
+  @Override
+  public int hashCode() {
+    return coder.hashCode();
+  }
+
+  @Override
+  public String toString() {
+    return "CoderTypeInformation{" +
+        "coder=" + coder +
+        '}';
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public <X> TypeInformation<X> getTypeAt(int pos) {
+    if (pos == 0) {
+      return (TypeInformation<X>) new 
CoderTypeInformation<>(coder.getKeyCoder());
+    } else if (pos == 1) {
+      return (TypeInformation<X>) new 
CoderTypeInformation<>(coder.getValueCoder());
+    } else {
+      throw new RuntimeException("Invalid field position " + pos);
+    }
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public <X> TypeInformation<X> getTypeAt(String fieldExpression) {
+    switch (fieldExpression) {
+      case "key":
+        return (TypeInformation<X>) new 
CoderTypeInformation<>(coder.getKeyCoder());
+      case "value":
+        return (TypeInformation<X>) new 
CoderTypeInformation<>(coder.getValueCoder());
+      default:
+        throw new UnsupportedOperationException("Only KvCoder has fields.");
+    }
+  }
+
+  @Override
+  public String[] getFieldNames() {
+    return new String[]{"key", "value"};
+  }
+
+  @Override
+  public int getFieldIndex(String fieldName) {
+    switch (fieldName) {
+      case "key":
+        return 0;
+      case "value":
+        return 1;
+      default:
+        return -1;
+    }
+  }
+
+  @Override
+  public void getFlatFields(String fieldExpression, int offset, 
List<FlatFieldDescriptor> result) {
+      CoderTypeInformation keyTypeInfo = new 
CoderTypeInformation<>(coder.getKeyCoder());
+      result.add(new FlatFieldDescriptor(0, keyTypeInfo));
+  }
+
+  @Override
+  protected TypeComparatorBuilder<KV<K, V>> createTypeComparatorBuilder() {
+    return new KvCoderTypeComparatorBuilder();
+  }
+
+  private class KvCoderTypeComparatorBuilder implements 
TypeComparatorBuilder<KV<K, V>> {
+
+    @Override
+    public void initializeTypeComparatorBuilder(int size) {}
+
+    @Override
+    public void addComparatorField(int fieldId, TypeComparator<?> comparator) 
{}
+
+    @Override
+    public TypeComparator<KV<K, V>> createTypeComparator(ExecutionConfig 
config) {
+      return new KvCoderComperator<>(coder);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/VoidCoderTypeSerializer.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/VoidCoderTypeSerializer.java
 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/VoidCoderTypeSerializer.java
index 7ce484a..190d898 100644
--- 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/VoidCoderTypeSerializer.java
+++ 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/VoidCoderTypeSerializer.java
@@ -31,82 +31,82 @@ import java.io.IOException;
  */
 public class VoidCoderTypeSerializer extends 
TypeSerializer<VoidCoderTypeSerializer.VoidValue> {
 
-       @Override
-       public boolean isImmutableType() {
-               return false;
-       }
-
-       @Override
-       public VoidCoderTypeSerializer duplicate() {
-               return this;
-       }
-
-       @Override
-       public VoidValue createInstance() {
-               return VoidValue.INSTANCE;
-       }
-
-       @Override
-       public VoidValue copy(VoidValue from) {
-               return from;
-       }
-
-       @Override
-       public VoidValue copy(VoidValue from, VoidValue reuse) {
-               return from;
-       }
-
-       @Override
-       public int getLength() {
-               return 0;
-       }
-
-       @Override
-       public void serialize(VoidValue record, DataOutputView target) throws 
IOException {
-               target.writeByte(1);
-       }
-
-       @Override
-       public VoidValue deserialize(DataInputView source) throws IOException {
-               source.readByte();
-               return VoidValue.INSTANCE;
-       }
-
-       @Override
-       public VoidValue deserialize(VoidValue reuse, DataInputView source) 
throws IOException {
-               return deserialize(source);
-       }
-
-       @Override
-       public void copy(DataInputView source, DataOutputView target) throws 
IOException {
-               source.readByte();
-               target.writeByte(1);
-       }
-
-       @Override
-       public boolean equals(Object obj) {
-               if (obj instanceof VoidCoderTypeSerializer) {
-                       VoidCoderTypeSerializer other = 
(VoidCoderTypeSerializer) obj;
-                       return other.canEqual(this);
-               } else {
-                       return false;
-               }
-       }
-
-       @Override
-       public boolean canEqual(Object obj) {
-               return obj instanceof VoidCoderTypeSerializer;
-       }
-
-       @Override
-       public int hashCode() {
-               return 0;
-       }
-
-       public static class VoidValue {
-               private VoidValue() {}
-               
-               public static VoidValue INSTANCE = new VoidValue();
-       }
+  @Override
+  public boolean isImmutableType() {
+    return false;
+  }
+
+  @Override
+  public VoidCoderTypeSerializer duplicate() {
+    return this;
+  }
+
+  @Override
+  public VoidValue createInstance() {
+    return VoidValue.INSTANCE;
+  }
+
+  @Override
+  public VoidValue copy(VoidValue from) {
+    return from;
+  }
+
+  @Override
+  public VoidValue copy(VoidValue from, VoidValue reuse) {
+    return from;
+  }
+
+  @Override
+  public int getLength() {
+    return 0;
+  }
+
+  @Override
+  public void serialize(VoidValue record, DataOutputView target) throws 
IOException {
+    target.writeByte(1);
+  }
+
+  @Override
+  public VoidValue deserialize(DataInputView source) throws IOException {
+    source.readByte();
+    return VoidValue.INSTANCE;
+  }
+
+  @Override
+  public VoidValue deserialize(VoidValue reuse, DataInputView source) throws 
IOException {
+    return deserialize(source);
+  }
+
+  @Override
+  public void copy(DataInputView source, DataOutputView target) throws 
IOException {
+    source.readByte();
+    target.writeByte(1);
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj instanceof VoidCoderTypeSerializer) {
+      VoidCoderTypeSerializer other = (VoidCoderTypeSerializer) obj;
+      return other.canEqual(this);
+    } else {
+      return false;
+    }
+  }
+
+  @Override
+  public boolean canEqual(Object obj) {
+    return obj instanceof VoidCoderTypeSerializer;
+  }
+
+  @Override
+  public int hashCode() {
+    return 0;
+  }
+
+  public static class VoidValue {
+    private VoidValue() {}
+    
+    public static VoidValue INSTANCE = new VoidValue();
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/CombineFnAggregatorWrapper.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/CombineFnAggregatorWrapper.java
 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/CombineFnAggregatorWrapper.java
index 924b297..8f6d67c 100644
--- 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/CombineFnAggregatorWrapper.java
+++ 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/CombineFnAggregatorWrapper.java
@@ -31,62 +31,62 @@ import java.io.Serializable;
  * operation.
  */
 public class CombineFnAggregatorWrapper<AI, AA, AR> implements Aggregator<AI, 
AR>, Accumulator<AI, Serializable> {
-       
-       private AA aa;
-       private Combine.CombineFn<? super AI, AA, AR> combiner;
+  
+  private AA aa;
+  private Combine.CombineFn<? super AI, AA, AR> combiner;
 
-       public CombineFnAggregatorWrapper() {
-       }
+  public CombineFnAggregatorWrapper() {
+  }
 
-       public CombineFnAggregatorWrapper(Combine.CombineFn<? super AI, AA, AR> 
combiner) {
-               this.combiner = combiner;
-               this.aa = combiner.createAccumulator();
-       }
+  public CombineFnAggregatorWrapper(Combine.CombineFn<? super AI, AA, AR> 
combiner) {
+    this.combiner = combiner;
+    this.aa = combiner.createAccumulator();
+  }
 
-       @Override
-       public void add(AI value) {
-               combiner.addInput(aa, value);
-       }
+  @Override
+  public void add(AI value) {
+    combiner.addInput(aa, value);
+  }
 
-       @Override
-       public Serializable getLocalValue() {
-               return (Serializable) combiner.extractOutput(aa);
-       }
+  @Override
+  public Serializable getLocalValue() {
+    return (Serializable) combiner.extractOutput(aa);
+  }
 
-       @Override
-       public void resetLocal() {
-               aa = combiner.createAccumulator();
-       }
+  @Override
+  public void resetLocal() {
+    aa = combiner.createAccumulator();
+  }
 
-       @Override
-       @SuppressWarnings("unchecked")
-       public void merge(Accumulator<AI, Serializable> other) {
-               aa = combiner.mergeAccumulators(Lists.newArrayList(aa, 
((CombineFnAggregatorWrapper<AI, AA, AR>)other).aa));
-       }
+  @Override
+  @SuppressWarnings("unchecked")
+  public void merge(Accumulator<AI, Serializable> other) {
+    aa = combiner.mergeAccumulators(Lists.newArrayList(aa, 
((CombineFnAggregatorWrapper<AI, AA, AR>)other).aa));
+  }
 
-       @Override
-       public Accumulator<AI, Serializable> clone() {
-               // copy it by merging
-               AA aaCopy = combiner.mergeAccumulators(Lists.newArrayList(aa));
-               CombineFnAggregatorWrapper<AI, AA, AR> result = new
-                               CombineFnAggregatorWrapper<>(combiner);
-               result.aa = aaCopy;
-               return result;
-       }
+  @Override
+  public Accumulator<AI, Serializable> clone() {
+    // copy it by merging
+    AA aaCopy = combiner.mergeAccumulators(Lists.newArrayList(aa));
+    CombineFnAggregatorWrapper<AI, AA, AR> result = new
+        CombineFnAggregatorWrapper<>(combiner);
+    result.aa = aaCopy;
+    return result;
+  }
 
-       @Override
-       public void addValue(AI value) {
-               add(value);
-       }
+  @Override
+  public void addValue(AI value) {
+    add(value);
+  }
 
-       @Override
-       public String getName() {
-               return "CombineFn: " + combiner.toString();
-       }
+  @Override
+  public String getName() {
+    return "CombineFn: " + combiner.toString();
+  }
 
-       @Override
-       public Combine.CombineFn getCombineFn() {
-               return combiner;
-       }
+  @Override
+  public Combine.CombineFn getCombineFn() {
+    return combiner;
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/DataInputViewWrapper.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/DataInputViewWrapper.java
 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/DataInputViewWrapper.java
index 90582b0..3c96939 100644
--- 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/DataInputViewWrapper.java
+++ 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/DataInputViewWrapper.java
@@ -31,29 +31,29 @@ import java.io.InputStream;
  */
 public class DataInputViewWrapper extends InputStream {
 
-       private DataInputView inputView;
-
-       public DataInputViewWrapper(DataInputView inputView) {
-               this.inputView = inputView;
-       }
-
-       public void setInputView(DataInputView inputView) {
-               this.inputView = inputView;
-       }
-
-       @Override
-       public int read() throws IOException {
-               try {
-                       return inputView.readUnsignedByte();
-               } catch (EOFException e) {
-                       // translate between DataInput and InputStream,
-                       // DataInput signals EOF by exception, InputStream does it by returning -1
-                       return -1;
-               }
-       }
-
-       @Override
-       public int read(byte[] b, int off, int len) throws IOException {
-               return inputView.read(b, off, len);
-       }
+  private DataInputView inputView;
+
+  public DataInputViewWrapper(DataInputView inputView) {
+    this.inputView = inputView;
+  }
+
+  public void setInputView(DataInputView inputView) {
+    this.inputView = inputView;
+  }
+
+  @Override
+  public int read() throws IOException {
+    try {
+      return inputView.readUnsignedByte();
+    } catch (EOFException e) {
+      // translate between DataInput and InputStream,
+      // DataInput signals EOF by exception, InputStream does it by returning -1
+      return -1;
+    }
+  }
+
+  @Override
+  public int read(byte[] b, int off, int len) throws IOException {
+    return inputView.read(b, off, len);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/DataOutputViewWrapper.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/DataOutputViewWrapper.java
 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/DataOutputViewWrapper.java
index 46df8e5..a222cdd 100644
--- 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/DataOutputViewWrapper.java
+++ 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/DataOutputViewWrapper.java
@@ -29,24 +29,24 @@ import java.io.OutputStream;
  * {@link java.io.OutputStream}.
  */
 public class DataOutputViewWrapper extends OutputStream {
-       
-       private DataOutputView outputView;
+  
+  private DataOutputView outputView;
 
-       public DataOutputViewWrapper(DataOutputView outputView) {
-               this.outputView = outputView;
-       }
+  public DataOutputViewWrapper(DataOutputView outputView) {
+    this.outputView = outputView;
+  }
 
-       public void setOutputView(DataOutputView outputView) {
-               this.outputView = outputView;
-       }
+  public void setOutputView(DataOutputView outputView) {
+    this.outputView = outputView;
+  }
 
-       @Override
-       public void write(int b) throws IOException {
-               outputView.write(b);
-       }
+  @Override
+  public void write(int b) throws IOException {
+    outputView.write(b);
+  }
 
-       @Override
-       public void write(byte[] b, int off, int len) throws IOException {
-               outputView.write(b, off, len);
-       }
+  @Override
+  public void write(byte[] b, int off, int len) throws IOException {
+    outputView.write(b, off, len);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SerializableFnAggregatorWrapper.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SerializableFnAggregatorWrapper.java
 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SerializableFnAggregatorWrapper.java
index 1c0dae4..c193a4d 100644
--- 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SerializableFnAggregatorWrapper.java
+++ 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SerializableFnAggregatorWrapper.java
@@ -33,59 +33,59 @@ import java.io.Serializable;
  */
 public class SerializableFnAggregatorWrapper<AI, AO> implements Aggregator<AI, 
AO>, Accumulator<AI, Serializable> {
 
-       private AO aa;
-       private Combine.CombineFn<AI, ?, AO> combiner;
+  private AO aa;
+  private Combine.CombineFn<AI, ?, AO> combiner;
 
-       public SerializableFnAggregatorWrapper(Combine.CombineFn<AI, ?, AO> 
combiner) {
-               this.combiner = combiner;
-               resetLocal();
-       }
-       
-       @Override
-       @SuppressWarnings("unchecked")
-       public void add(AI value) {
-               this.aa = combiner.apply(ImmutableList.of((AI) aa, value));
-       }
+  public SerializableFnAggregatorWrapper(Combine.CombineFn<AI, ?, AO> 
combiner) {
+    this.combiner = combiner;
+    resetLocal();
+  }
+  
+  @Override
+  @SuppressWarnings("unchecked")
+  public void add(AI value) {
+    this.aa = combiner.apply(ImmutableList.of((AI) aa, value));
+  }
 
-       @Override
-       public Serializable getLocalValue() {
-               return (Serializable) aa;
-       }
+  @Override
+  public Serializable getLocalValue() {
+    return (Serializable) aa;
+  }
 
-       @Override
-       public void resetLocal() {
-               this.aa = combiner.apply(ImmutableList.<AI>of());
-       }
+  @Override
+  public void resetLocal() {
+    this.aa = combiner.apply(ImmutableList.<AI>of());
+  }
 
-       @Override
-       @SuppressWarnings("unchecked")
-       public void merge(Accumulator<AI, Serializable> other) {
-               this.aa = combiner.apply(ImmutableList.of((AI) aa, (AI) 
other.getLocalValue()));
-       }
+  @Override
+  @SuppressWarnings("unchecked")
+  public void merge(Accumulator<AI, Serializable> other) {
+    this.aa = combiner.apply(ImmutableList.of((AI) aa, (AI) 
other.getLocalValue()));
+  }
 
-       @Override
-       public void addValue(AI value) {
-               add(value);
-       }
+  @Override
+  public void addValue(AI value) {
+    add(value);
+  }
 
-       @Override
-       public String getName() {
-               return "Aggregator :" + combiner.toString();
-       }
+  @Override
+  public String getName() {
+    return "Aggregator :" + combiner.toString();
+  }
 
-       @Override
-       public Combine.CombineFn<AI, ?, AO> getCombineFn() {
-               return combiner;
-       }
+  @Override
+  public Combine.CombineFn<AI, ?, AO> getCombineFn() {
+    return combiner;
+  }
 
-       @Override
-       public Accumulator<AI, Serializable> clone() {
-               // copy it by merging
-               AO resultCopy = combiner.apply(Lists.newArrayList((AI) aa));
-               SerializableFnAggregatorWrapper<AI, AO> result = new
-                               SerializableFnAggregatorWrapper<>(combiner);
+  @Override
+  public Accumulator<AI, Serializable> clone() {
+    // copy it by merging
+    AO resultCopy = combiner.apply(Lists.newArrayList((AI) aa));
+    SerializableFnAggregatorWrapper<AI, AO> result = new
+        SerializableFnAggregatorWrapper<>(combiner);
 
-               result.aa = resultCopy;
-               return result;
-       }
+    result.aa = resultCopy;
+    return result;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SinkOutputFormat.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SinkOutputFormat.java
 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SinkOutputFormat.java
index 8be9abf..3f28c16 100644
--- 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SinkOutputFormat.java
+++ 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SinkOutputFormat.java
@@ -38,84 +38,84 @@ import java.lang.reflect.Field;
  */
 public class SinkOutputFormat<T> implements OutputFormat<T> {
 
-       private final Sink<T> sink;
-
-       private transient PipelineOptions pipelineOptions;
-
-       private Sink.WriteOperation<T, ?> writeOperation;
-       private Sink.Writer<T, ?> writer;
-
-       private AbstractID uid = new AbstractID();
-
-       public SinkOutputFormat(Write.Bound<T> transform, PipelineOptions 
pipelineOptions) {
-               this.sink = extractSink(transform);
-               this.pipelineOptions = 
Preconditions.checkNotNull(pipelineOptions);
-       }
-
-       private Sink<T> extractSink(Write.Bound<T> transform) {
-               // TODO possibly add a getter in the upstream
-               try {
-                       Field sinkField = 
transform.getClass().getDeclaredField("sink");
-                       sinkField.setAccessible(true);
-                       @SuppressWarnings("unchecked")
-                       Sink<T> extractedSink = (Sink<T>) 
sinkField.get(transform);
-                       return extractedSink;
-               } catch (NoSuchFieldException | IllegalAccessException e) {
-                       throw new RuntimeException("Could not acquire custom 
sink field.", e);
-               }
-       }
-
-       @Override
-       public void configure(Configuration configuration) {
-               writeOperation = sink.createWriteOperation(pipelineOptions);
-               try {
-                       writeOperation.initialize(pipelineOptions);
-               } catch (Exception e) {
-                       throw new RuntimeException("Failed to initialize the 
write operation.", e);
-               }
-       }
-
-       @Override
-       public void open(int taskNumber, int numTasks) throws IOException {
-               try {
-                       writer = writeOperation.createWriter(pipelineOptions);
-               } catch (Exception e) {
-                       throw new IOException("Couldn't create writer.", e);
-               }
-               try {
-                       writer.open(uid + "-" + String.valueOf(taskNumber));
-               } catch (Exception e) {
-                       throw new IOException("Couldn't open writer.", e);
-               }
-       }
-
-       @Override
-       public void writeRecord(T record) throws IOException {
-               try {
-                       writer.write(record);
-               } catch (Exception e) {
-                       throw new IOException("Couldn't write record.", e);
-               }
-       }
-
-       @Override
-       public void close() throws IOException {
-               try {
-                       writer.close();
-               } catch (Exception e) {
-                       throw new IOException("Couldn't close writer.", e);
-               }
-       }
-
-       private void writeObject(ObjectOutputStream out) throws IOException, 
ClassNotFoundException {
-               out.defaultWriteObject();
-               ObjectMapper mapper = new ObjectMapper();
-               mapper.writeValue(out, pipelineOptions);
-       }
-
-       private void readObject(ObjectInputStream in) throws IOException, 
ClassNotFoundException {
-               in.defaultReadObject();
-               ObjectMapper mapper = new ObjectMapper();
-               pipelineOptions = mapper.readValue(in, PipelineOptions.class);
-       }
+  private final Sink<T> sink;
+
+  private transient PipelineOptions pipelineOptions;
+
+  private Sink.WriteOperation<T, ?> writeOperation;
+  private Sink.Writer<T, ?> writer;
+
+  private AbstractID uid = new AbstractID();
+
+  public SinkOutputFormat(Write.Bound<T> transform, PipelineOptions 
pipelineOptions) {
+    this.sink = extractSink(transform);
+    this.pipelineOptions = Preconditions.checkNotNull(pipelineOptions);
+  }
+
+  private Sink<T> extractSink(Write.Bound<T> transform) {
+    // TODO possibly add a getter in the upstream
+    try {
+      Field sinkField = transform.getClass().getDeclaredField("sink");
+      sinkField.setAccessible(true);
+      @SuppressWarnings("unchecked")
+      Sink<T> extractedSink = (Sink<T>) sinkField.get(transform);
+      return extractedSink;
+    } catch (NoSuchFieldException | IllegalAccessException e) {
+      throw new RuntimeException("Could not acquire custom sink field.", e);
+    }
+  }
+
+  @Override
+  public void configure(Configuration configuration) {
+    writeOperation = sink.createWriteOperation(pipelineOptions);
+    try {
+      writeOperation.initialize(pipelineOptions);
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to initialize the write operation.", 
e);
+    }
+  }
+
+  @Override
+  public void open(int taskNumber, int numTasks) throws IOException {
+    try {
+      writer = writeOperation.createWriter(pipelineOptions);
+    } catch (Exception e) {
+      throw new IOException("Couldn't create writer.", e);
+    }
+    try {
+      writer.open(uid + "-" + String.valueOf(taskNumber));
+    } catch (Exception e) {
+      throw new IOException("Couldn't open writer.", e);
+    }
+  }
+
+  @Override
+  public void writeRecord(T record) throws IOException {
+    try {
+      writer.write(record);
+    } catch (Exception e) {
+      throw new IOException("Couldn't write record.", e);
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    try {
+      writer.close();
+    } catch (Exception e) {
+      throw new IOException("Couldn't close writer.", e);
+    }
+  }
+
+  private void writeObject(ObjectOutputStream out) throws IOException, 
ClassNotFoundException {
+    out.defaultWriteObject();
+    ObjectMapper mapper = new ObjectMapper();
+    mapper.writeValue(out, pipelineOptions);
+  }
+
+  private void readObject(ObjectInputStream in) throws IOException, 
ClassNotFoundException {
+    in.defaultReadObject();
+    ObjectMapper mapper = new ObjectMapper();
+    pipelineOptions = mapper.readValue(in, PipelineOptions.class);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SourceInputFormat.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SourceInputFormat.java
 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SourceInputFormat.java
index 64dc072..5981618 100644
--- 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SourceInputFormat.java
+++ 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SourceInputFormat.java
@@ -41,124 +41,124 @@ import java.util.List;
  * Dataflow {@link com.google.cloud.dataflow.sdk.io.Source}.
  */
 public class SourceInputFormat<T> implements InputFormat<T, 
SourceInputSplit<T>> {
-       private static final Logger LOG = 
LoggerFactory.getLogger(SourceInputFormat.class);
-
-       private final BoundedSource<T> initialSource;
-       private transient PipelineOptions options;
-
-       private BoundedSource.BoundedReader<T> reader = null;
-       private boolean reachedEnd = true;
-
-       public SourceInputFormat(BoundedSource<T> initialSource, 
PipelineOptions options) {
-               this.initialSource = initialSource;
-               this.options = options;
-       }
-
-       private void writeObject(ObjectOutputStream out)
-                       throws IOException, ClassNotFoundException {
-               out.defaultWriteObject();
-               ObjectMapper mapper = new ObjectMapper();
-               mapper.writeValue(out, options);
-       }
-
-       private void readObject(ObjectInputStream in)
-                       throws IOException, ClassNotFoundException {
-               in.defaultReadObject();
-               ObjectMapper mapper = new ObjectMapper();
-               options = mapper.readValue(in, PipelineOptions.class);
-       }
-
-       @Override
-       public void configure(Configuration configuration) {}
-
-       @Override
-       public void open(SourceInputSplit<T> sourceInputSplit) throws 
IOException {
-               reader = ((BoundedSource<T>) 
sourceInputSplit.getSource()).createReader(options);
-               reachedEnd = false;
-       }
-
-       @Override
-       public BaseStatistics getStatistics(BaseStatistics baseStatistics) 
throws IOException {
-               try {
-                       final long estimatedSize = 
initialSource.getEstimatedSizeBytes(options);
-
-                       return new BaseStatistics() {
-                               @Override
-                               public long getTotalInputSize() {
-                                       return estimatedSize;
-
-                               }
-
-                               @Override
-                               public long getNumberOfRecords() {
-                                       return 
BaseStatistics.NUM_RECORDS_UNKNOWN;
-                               }
-
-                               @Override
-                               public float getAverageRecordWidth() {
-                                       return 
BaseStatistics.AVG_RECORD_BYTES_UNKNOWN;
-                               }
-                       };
-               } catch (Exception e) {
-                       LOG.warn("Could not read Source statistics: {}", e);
-               }
-
-               return null;
-       }
-
-       @Override
-       @SuppressWarnings("unchecked")
-       public SourceInputSplit<T>[] createInputSplits(int numSplits) throws 
IOException {
-               long desiredSizeBytes;
-               try {
-                       desiredSizeBytes = 
initialSource.getEstimatedSizeBytes(options) / numSplits;
-                       List<? extends Source<T>> shards = 
initialSource.splitIntoBundles(desiredSizeBytes,
-                                       options);
-                       List<SourceInputSplit<T>> splits = new ArrayList<>();
-                       int splitCount = 0;
-                       for (Source<T> shard: shards) {
-                               splits.add(new SourceInputSplit<>(shard, 
splitCount++));
-                       }
-                       return splits.toArray(new 
SourceInputSplit[splits.size()]);
-               } catch (Exception e) {
-                       throw new IOException("Could not create input splits 
from Source.", e);
-               }
-       }
-
-       @Override
-       public InputSplitAssigner getInputSplitAssigner(final 
SourceInputSplit[] sourceInputSplits) {
-               return new InputSplitAssigner() {
-                       private int index = 0;
-                       private final SourceInputSplit[] splits = 
sourceInputSplits;
-                       @Override
-                       public InputSplit getNextInputSplit(String host, int 
taskId) {
-                               if (index < splits.length) {
-                                       return splits[index++];
-                               } else {
-                                       return null;
-                               }
-                       }
-               };
-       }
-
-
-       @Override
-       public boolean reachedEnd() throws IOException {
-               return reachedEnd;
-       }
-
-       @Override
-       public T nextRecord(T t) throws IOException {
-
-               reachedEnd = !reader.advance();
-               if (!reachedEnd) {
-                       return reader.getCurrent();
-               }
-               return null;
-       }
-
-       @Override
-       public void close() throws IOException {
-               reader.close();
-       }
+  private static final Logger LOG = 
LoggerFactory.getLogger(SourceInputFormat.class);
+
+  private final BoundedSource<T> initialSource;
+  private transient PipelineOptions options;
+
+  private BoundedSource.BoundedReader<T> reader = null;
+  private boolean reachedEnd = true;
+
+  public SourceInputFormat(BoundedSource<T> initialSource, PipelineOptions 
options) {
+    this.initialSource = initialSource;
+    this.options = options;
+  }
+
+  private void writeObject(ObjectOutputStream out)
+      throws IOException, ClassNotFoundException {
+    out.defaultWriteObject();
+    ObjectMapper mapper = new ObjectMapper();
+    mapper.writeValue(out, options);
+  }
+
+  private void readObject(ObjectInputStream in)
+      throws IOException, ClassNotFoundException {
+    in.defaultReadObject();
+    ObjectMapper mapper = new ObjectMapper();
+    options = mapper.readValue(in, PipelineOptions.class);
+  }
+
+  @Override
+  public void configure(Configuration configuration) {}
+
+  @Override
+  public void open(SourceInputSplit<T> sourceInputSplit) throws IOException {
+    reader = ((BoundedSource<T>) 
sourceInputSplit.getSource()).createReader(options);
+    reachedEnd = false;
+  }
+
+  @Override
+  public BaseStatistics getStatistics(BaseStatistics baseStatistics) throws 
IOException {
+    try {
+      final long estimatedSize = initialSource.getEstimatedSizeBytes(options);
+
+      return new BaseStatistics() {
+        @Override
+        public long getTotalInputSize() {
+          return estimatedSize;
+
+        }
+
+        @Override
+        public long getNumberOfRecords() {
+          return BaseStatistics.NUM_RECORDS_UNKNOWN;
+        }
+
+        @Override
+        public float getAverageRecordWidth() {
+          return BaseStatistics.AVG_RECORD_BYTES_UNKNOWN;
+        }
+      };
+    } catch (Exception e) {
+      LOG.warn("Could not read Source statistics: {}", e);
+    }
+
+    return null;
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public SourceInputSplit<T>[] createInputSplits(int numSplits) throws 
IOException {
+    long desiredSizeBytes;
+    try {
+      desiredSizeBytes = initialSource.getEstimatedSizeBytes(options) / 
numSplits;
+      List<? extends Source<T>> shards = 
initialSource.splitIntoBundles(desiredSizeBytes,
+          options);
+      List<SourceInputSplit<T>> splits = new ArrayList<>();
+      int splitCount = 0;
+      for (Source<T> shard: shards) {
+        splits.add(new SourceInputSplit<>(shard, splitCount++));
+      }
+      return splits.toArray(new SourceInputSplit[splits.size()]);
+    } catch (Exception e) {
+      throw new IOException("Could not create input splits from Source.", e);
+    }
+  }
+
+  @Override
+  public InputSplitAssigner getInputSplitAssigner(final SourceInputSplit[] 
sourceInputSplits) {
+    return new InputSplitAssigner() {
+      private int index = 0;
+      private final SourceInputSplit[] splits = sourceInputSplits;
+      @Override
+      public InputSplit getNextInputSplit(String host, int taskId) {
+        if (index < splits.length) {
+          return splits[index++];
+        } else {
+          return null;
+        }
+      }
+    };
+  }
+
+
+  @Override
+  public boolean reachedEnd() throws IOException {
+    return reachedEnd;
+  }
+
+  @Override
+  public T nextRecord(T t) throws IOException {
+
+    reachedEnd = !reader.advance();
+    if (!reachedEnd) {
+      return reader.getCurrent();
+    }
+    return null;
+  }
+
+  @Override
+  public void close() throws IOException {
+    reader.close();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SourceInputSplit.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SourceInputSplit.java
 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SourceInputSplit.java
index 2b93ab7..86fdada 100644
--- 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SourceInputSplit.java
+++ 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SourceInputSplit.java
@@ -29,24 +29,24 @@ import org.apache.flink.core.io.InputSplit;
  */
 public class SourceInputSplit<T> implements InputSplit {
 
-       private Source<T> source;
-       private int splitNumber;
+  private Source<T> source;
+  private int splitNumber;
 
-       public SourceInputSplit() {
-       }
+  public SourceInputSplit() {
+  }
 
-       public SourceInputSplit(Source<T> source, int splitNumber) {
-               this.source = source;
-               this.splitNumber = splitNumber;
-       }
+  public SourceInputSplit(Source<T> source, int splitNumber) {
+    this.source = source;
+    this.splitNumber = splitNumber;
+  }
 
-       @Override
-       public int getSplitNumber() {
-               return splitNumber;
-       }
+  @Override
+  public int getSplitNumber() {
+    return splitNumber;
+  }
 
-       public Source<T> getSource() {
-               return source;
-       }
+  public Source<T> getSource() {
+    return source;
+  }
 
 }

Reply via email to