[FLINK-5187] [core] Port Row and related type utils to Java and move them to 
flink-core.

This closes #2968.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/86f8a255
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/86f8a255
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/86f8a255

Branch: refs/heads/master
Commit: 86f8a255db6ce2ff9e09c2824e85c4930427ecdb
Parents: 15e7f0a
Author: Jark Wu <[email protected]>
Authored: Thu Dec 8 22:44:29 2016 +0800
Committer: Fabian Hueske <[email protected]>
Committed: Thu Dec 15 11:36:40 2016 +0100

----------------------------------------------------------------------
 docs/dev/types_serialization.md                 |   6 +-
 .../flink/api/java/typeutils/RowTypeInfo.java   | 203 ++++++++
 .../typeutils/runtime/NullAwareComparator.java  | 240 +++++++++
 .../java/typeutils/runtime/NullMaskUtils.java   | 105 ++++
 .../java/typeutils/runtime/RowComparator.java   | 488 +++++++++++++++++++
 .../java/typeutils/runtime/RowSerializer.java   | 243 +++++++++
 .../main/java/org/apache/flink/types/Row.java   | 116 +++++
 .../api/java/typeutils/RowTypeInfoTest.java     |  69 +++
 .../typeutils/runtime/RowComparatorTest.java    | 156 ++++++
 .../RowComparatorWithManyFieldsTests.java       | 103 ++++
 .../typeutils/runtime/RowSerializerTest.java    | 197 ++++++++
 .../java/org/apache/flink/types/RowTest.java    |  37 ++
 12 files changed, 1961 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/86f8a255/docs/dev/types_serialization.md
----------------------------------------------------------------------
diff --git a/docs/dev/types_serialization.md b/docs/dev/types_serialization.md
index ea02df0..2b43563 100644
--- a/docs/dev/types_serialization.md
+++ b/docs/dev/types_serialization.md
@@ -87,9 +87,11 @@ Internally, Flink makes the following distinctions between 
types:
 
 * Composite types
 
-  * Flink Java Tuples (part of the Flink Java API)
+  * Flink Java Tuples (part of the Flink Java API): max 25 fields, null fields 
not supported
 
-  * Scala *case classes* (including Scala tuples)
+  * Scala *case classes* (including Scala tuples): max 22 fields, null fields 
not supported
+
+  * Row: tuples with arbitrary number of fields and support for null fields
 
   * POJOs: classes that follow a certain bean-like pattern
 

http://git-wip-us.apache.org/repos/asf/flink/blob/86f8a255/flink-core/src/main/java/org/apache/flink/api/java/typeutils/RowTypeInfo.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/RowTypeInfo.java 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/RowTypeInfo.java
new file mode 100644
index 0000000..03cbe61
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/RowTypeInfo.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.runtime.RowComparator;
+import org.apache.flink.api.java.typeutils.runtime.RowSerializer;
+import org.apache.flink.types.Row;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * TypeInformation for {@link Row}
+ */
+@PublicEvolving
+public class RowTypeInfo extends TupleTypeInfoBase<Row> {
+
+       private static final long serialVersionUID = 9158518989896601963L;
+
+       protected final String[] fieldNames;
+       /** Temporary variable for directly passing orders to comparators. */
+       private boolean[] comparatorOrders = null;
+
+       public RowTypeInfo(TypeInformation<?>... types) {
+               super(Row.class, types);
+
+               this.fieldNames = new String[types.length];
+
+               for (int i = 0; i < types.length; i++) {
+                       fieldNames[i] = "f" + i;
+               }
+       }
+
+       @Override
+       public TypeComparator<Row> createComparator(
+               int[] logicalKeyFields,
+               boolean[] orders,
+               int logicalFieldOffset,
+               ExecutionConfig config) {
+
+               comparatorOrders = orders;
+               TypeComparator<Row> comparator = super.createComparator(
+                       logicalKeyFields,
+                       orders,
+                       logicalFieldOffset,
+                       config);
+               comparatorOrders = null;
+               return comparator;
+       }
+
+       @Override
+       protected TypeComparatorBuilder<Row> createTypeComparatorBuilder() {
+               if (comparatorOrders == null) {
+                       throw new IllegalStateException("Cannot create 
comparator builder without orders.");
+               }
+               return new RowTypeComparatorBuilder(comparatorOrders);
+       }
+
+       @Override
+       public String[] getFieldNames() {
+               return fieldNames;
+       }
+
+       @Override
+       public int getFieldIndex(String fieldName) {
+               for (int i = 0; i < fieldNames.length; i++) {
+                       if (fieldNames[i].equals(fieldName)) {
+                               return i;
+                       }
+               }
+               return -1;
+       }
+
+       @Override
+       public TypeSerializer<Row> createSerializer(ExecutionConfig config) {
+               int len = getArity();
+               TypeSerializer<?>[] fieldSerializers = new TypeSerializer[len];
+               for (int i = 0; i < len; i++) {
+                       fieldSerializers[i] = types[i].createSerializer(config);
+               }
+               return new RowSerializer(fieldSerializers);
+       }
+
+       @Override
+       public boolean canEqual(Object obj) {
+               return obj instanceof RowTypeInfo;
+       }
+
+       @Override
+       public int hashCode() {
+               return 31 * super.hashCode() + Arrays.hashCode(fieldNames);
+       }
+
+       @Override
+       public String toString() {
+               StringBuilder bld = new StringBuilder("Row");
+               if (types.length > 0) {
+                       bld.append('(').append(fieldNames[0]).append(": 
").append(types[0]);
+
+                       for (int i = 1; i < types.length; i++) {
+                               bld.append(", 
").append(fieldNames[i]).append(": ").append(types[i]);
+                       }
+
+                       bld.append(')');
+               }
+               return bld.toString();
+       }
+
+       private class RowTypeComparatorBuilder implements 
TypeComparatorBuilder<Row> {
+
+               private final ArrayList<TypeComparator> fieldComparators = new 
ArrayList<TypeComparator>();
+               private final ArrayList<Integer> logicalKeyFields = new 
ArrayList<Integer>();
+               private final boolean[] comparatorOrders;
+
+               public RowTypeComparatorBuilder(boolean[] comparatorOrders) {
+                       this.comparatorOrders = comparatorOrders;
+               }
+
+               @Override
+               public void initializeTypeComparatorBuilder(int size) {
+                       fieldComparators.ensureCapacity(size);
+                       logicalKeyFields.ensureCapacity(size);
+               }
+
+               @Override
+               public void addComparatorField(int fieldId, TypeComparator<?> 
comparator) {
+                       fieldComparators.add(comparator);
+                       logicalKeyFields.add(fieldId);
+               }
+
+               @Override
+               public TypeComparator<Row> createTypeComparator(ExecutionConfig 
config) {
+                       checkState(
+                               fieldComparators.size() > 0,
+                               "No field comparators were defined for the 
TupleTypeComparatorBuilder."
+                       );
+
+                       checkState(
+                               logicalKeyFields.size() > 0,
+                               "No key fields were defined for the 
TupleTypeComparatorBuilder."
+                       );
+
+                       checkState(
+                               fieldComparators.size() == 
logicalKeyFields.size(),
+                               "The number of field comparators and key fields 
is not equal."
+                       );
+
+                       final int maxKey = Collections.max(logicalKeyFields);
+
+                       checkState(
+                               maxKey >= 0,
+                               "The maximum key field must be greater or equal 
than 0."
+                       );
+
+                       TypeSerializer<?>[] fieldSerializers = new 
TypeSerializer<?>[maxKey + 1];
+
+                       for (int i = 0; i <= maxKey; i++) {
+                               fieldSerializers[i] = 
types[i].createSerializer(config);
+                       }
+
+                       int[] keyPositions = new int[logicalKeyFields.size()];
+                       for (int i = 0; i < keyPositions.length; i++) {
+                               keyPositions[i] = logicalKeyFields.get(i);
+                       }
+
+                       TypeComparator[] comparators = new 
TypeComparator[fieldComparators.size()];
+                       for (int i = 0; i < fieldComparators.size(); i++) {
+                               comparators[i] = fieldComparators.get(i);
+                       }
+
+                       //noinspection unchecked
+                       return new RowComparator(
+                               getArity(),
+                               keyPositions,
+                               comparators,
+                               (TypeSerializer<Object>[]) fieldSerializers,
+                               comparatorOrders);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/86f8a255/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/NullAwareComparator.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/NullAwareComparator.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/NullAwareComparator.java
new file mode 100644
index 0000000..3587811
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/NullAwareComparator.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.typeutils.CompositeTypeComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Null-aware comparator that wraps a comparator which does not support null 
references.
+ * <p>
+ * NOTE: This class assumes to be used within a composite type comparator (such
+ * as {@link RowComparator}) that handles serialized comparison.
+ */
+public class NullAwareComparator<T> extends TypeComparator<T> {
+       private static final long serialVersionUID = 1L;
+
+       private final TypeComparator<T> wrappedComparator;
+       private final boolean order;
+
+       // number of flat fields
+       private final int flatFields;
+
+       // stores the null for reference comparison
+       private boolean nullReference = false;
+
+       public NullAwareComparator(TypeComparator<T> wrappedComparator, boolean 
order) {
+               this.wrappedComparator = wrappedComparator;
+               this.order = order;
+               this.flatFields = wrappedComparator.getFlatComparators().length;
+       }
+
+       @Override
+       public int hash(T record) {
+               if (record != null) {
+                       return wrappedComparator.hash(record);
+               } else {
+                       return 0;
+               }
+       }
+
+       @Override
+       public void setReference(T toCompare) {
+               if (toCompare == null) {
+                       nullReference = true;
+               } else {
+                       nullReference = false;
+                       wrappedComparator.setReference(toCompare);
+               }
+       }
+
+       @Override
+       public boolean equalToReference(T candidate) {
+               // both values are null
+               if (candidate == null && nullReference) {
+                       return true;
+               }
+               // one value is null
+               else if (candidate == null || nullReference) {
+                       return false;
+               }
+               // no null value
+               else {
+                       return wrappedComparator.equalToReference(candidate);
+               }
+       }
+
+       @Override
+       public int compareToReference(TypeComparator<T> referencedComparator) {
+               NullAwareComparator otherComparator = (NullAwareComparator) 
referencedComparator;
+               boolean otherNullReference = otherComparator.nullReference;
+               // both values are null -> equality
+               if (nullReference && otherNullReference) {
+                       return 0;
+               }
+               // first value is null -> inequality
+               // but order is considered
+               else if (nullReference) {
+                       return order ? 1 : -1;
+               }
+               // second value is null -> inequality
+               // but order is considered
+               else if (otherNullReference) {
+                       return order ? -1 : 1;
+               }
+               // no null values
+               else {
+                       return 
wrappedComparator.compareToReference(otherComparator.wrappedComparator);
+               }
+       }
+
+       @Override
+       public int compare(T first, T second) {
+               // both values are null -> equality
+               if (first == null && second == null) {
+                       return 0;
+               }
+               // first value is null -> inequality
+               // but order is considered
+               else if (first == null) {
+                       return order ? -1 : 1;
+               }
+               // second value is null -> inequality
+               // but order is considered
+               else if (second == null) {
+                       return order ? 1 : -1;
+               }
+               // no null values
+               else {
+                       return wrappedComparator.compare(first, second);
+               }
+       }
+
+       @Override
+       public int compareSerialized(
+               DataInputView firstSource,
+               DataInputView secondSource) throws IOException {
+
+               throw new UnsupportedOperationException(
+                       "Comparator does not support null-aware serialized 
comparision.");
+       }
+
+       @Override
+       public boolean supportsNormalizedKey() {
+               return wrappedComparator.supportsNormalizedKey();
+       }
+
+       @Override
+       public boolean supportsSerializationWithKeyNormalization() {
+               return false;
+       }
+
+       @Override
+       public int getNormalizeKeyLen() {
+               int len = wrappedComparator.getNormalizeKeyLen();
+               if (len == Integer.MAX_VALUE) {
+                       return Integer.MAX_VALUE;
+               } else {
+                       // add one for a null byte
+                       return len + 1;
+               }
+       }
+
+       @Override
+       public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
+               return wrappedComparator.isNormalizedKeyPrefixOnly(keyBytes - 
1);
+       }
+
+       @Override
+       public void putNormalizedKey(T record, MemorySegment target, int 
offset, int numBytes) {
+               if (numBytes > 0) {
+                       // write a null byte with padding
+                       if (record == null) {
+                               target.putBoolean(offset, false);
+                               // write padding
+                               for (int j = 0; j < numBytes - 1; j++) {
+                                       target.put(offset + 1 + j, (byte) 0);
+                               }
+                       }
+                       // write a non-null byte with key
+                       else {
+                               target.putBoolean(offset, true);
+                               // write key
+                               wrappedComparator.putNormalizedKey(record, 
target, offset + 1, numBytes - 1);
+                       }
+               }
+       }
+
+       @Override
+       public void writeWithKeyNormalization(T record, DataOutputView target) 
throws IOException {
+               throw new UnsupportedOperationException(
+                       "Record serialization with leading normalized keys not 
supported.");
+       }
+
+       @Override
+       public T readWithKeyDenormalization(T reuse, DataInputView source) 
throws IOException {
+               throw new UnsupportedOperationException(
+                       "Record deserialization with leading normalized keys 
not supported.");
+       }
+
+       @Override
+       public boolean invertNormalizedKey() {
+               return wrappedComparator.invertNormalizedKey();
+       }
+
+       @Override
+       public TypeComparator<T> duplicate() {
+               return new 
NullAwareComparator<T>(wrappedComparator.duplicate(), order);
+       }
+
+       @Override
+       public int extractKeys(Object record, Object[] target, int index) {
+               if (record == null) {
+                       for (int i = 0; i < flatFields; i++) {
+                               target[index + i] = null;
+                       }
+                       return flatFields;
+               } else {
+                       return wrappedComparator.extractKeys(record, target, 
index);
+               }
+       }
+
+       @Override
+       public TypeComparator[] getFlatComparators() {
+               // determine the flat comparators and wrap them again in 
null-aware comparators
+               List<TypeComparator<?>> flatComparators = new ArrayList<>();
+               if (wrappedComparator instanceof CompositeTypeComparator) {
+                       ((CompositeTypeComparator) 
wrappedComparator).getFlatComparator(flatComparators);
+               } else {
+                       flatComparators.add(wrappedComparator);
+               }
+
+               TypeComparator<?>[] result = new 
TypeComparator[flatComparators.size()];
+               for (int i = 0; i < result.length; i++) {
+                       result[i] = new 
NullAwareComparator<>(flatComparators.get(i), order);
+               }
+               return result;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/86f8a255/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/NullMaskUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/NullMaskUtils.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/NullMaskUtils.java
new file mode 100644
index 0000000..010af7f
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/NullMaskUtils.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.types.Row;
+
+import java.io.IOException;
+
+public class NullMaskUtils {
+
+       public static void writeNullMask(int len, Row value, DataOutputView 
target) throws IOException {
+               int b = 0x00;
+               int bytePos = 0;
+
+               int fieldPos = 0;
+               int numPos = 0;
+               while (fieldPos < len) {
+                       b = 0x00;
+                       // set bits in byte
+                       bytePos = 0;
+                       numPos = Math.min(8, len - fieldPos);
+                       while (bytePos < numPos) {
+                               b = b << 1;
+                               // set bit if field is null
+                               if (value.getField(fieldPos + bytePos) == null) 
{
+                                       b |= 0x01;
+                               }
+                               bytePos += 1;
+                       }
+                       fieldPos += numPos;
+                       // shift bits if last byte is not completely filled
+                       b <<= (8 - bytePos);
+                       // write byte
+                       target.writeByte(b);
+               }
+       }
+
+       public static void readIntoNullMask(
+               int len,
+               DataInputView source,
+               boolean[] nullMask) throws IOException {
+
+               int b = 0x00;
+               int bytePos = 0;
+
+               int fieldPos = 0;
+               int numPos = 0;
+               while (fieldPos < len) {
+                       // read byte
+                       b = source.readUnsignedByte();
+                       bytePos = 0;
+                       numPos = Math.min(8, len - fieldPos);
+                       while (bytePos < numPos) {
+                               nullMask[fieldPos + bytePos] = (b & 0x80) > 0;
+                               b = b << 1;
+                               bytePos += 1;
+                       }
+                       fieldPos += numPos;
+               }
+       }
+
+       public static void readIntoAndCopyNullMask(
+               int len,
+               DataInputView source,
+               DataOutputView target,
+               boolean[] nullMask) throws IOException {
+
+               int b = 0x00;
+               int bytePos = 0;
+
+               int fieldPos = 0;
+               int numPos = 0;
+               while (fieldPos < len) {
+                       // read byte
+                       b = source.readUnsignedByte();
+                       // copy byte
+                       target.writeByte(b);
+                       bytePos = 0;
+                       numPos = Math.min(8, len - fieldPos);
+                       while (bytePos < numPos) {
+                               nullMask[fieldPos + bytePos] = (b & 0x80) > 0;
+                               b = b << 1;
+                               bytePos += 1;
+                       }
+                       fieldPos += numPos;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/86f8a255/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowComparator.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowComparator.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowComparator.java
new file mode 100644
index 0000000..d6c5195
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowComparator.java
@@ -0,0 +1,488 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.typeutils.CompositeTypeComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.types.KeyFieldOutOfBoundsException;
+import org.apache.flink.types.Row;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+import static 
org.apache.flink.api.java.typeutils.runtime.NullMaskUtils.readIntoNullMask;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Comparator for {@link Row}
+ */
+public class RowComparator extends CompositeTypeComparator<Row> {
+
+       private static final long serialVersionUID = 1L;
+       /** The number of fields of the Row */
+       private final int arity;
+       /** key positions describe which fields are keys in what order */
+       private final int[] keyPositions;
+       /** null-aware comparators for the key fields, in the same order as the 
key fields */
+       private final NullAwareComparator<Object>[] comparators;
+       /** serializers to deserialize the first n fields for comparison */
+       private final TypeSerializer<Object>[] serializers;
+       /** auxiliary fields for normalized key support */
+       private final int[] normalizedKeyLengths;
+       private final int numLeadingNormalizableKeys;
+       private final int normalizableKeyPrefixLen;
+       private final boolean invertNormKey;
+
+       // null masks for serialized comparison
+       private final boolean[] nullMask1;
+       private final boolean[] nullMask2;
+
+       // cache for the deserialized key field objects
+       transient private final Object[] deserializedKeyFields1;
+       transient private final Object[] deserializedKeyFields2;
+
+       /**
+        * General constructor for RowComparator.
+        *
+        * @param arity        the number of fields of the Row
+        * @param keyPositions key positions describe which fields are keys in 
what order
+        * @param comparators  non-null-aware comparators for the key fields, 
in the same order as
+        *                     the key fields
+        * @param serializers  serializers to deserialize the first n fields 
for comparison
+        * @param orders       sorting orders for the fields
+        */
+       public RowComparator(
+               int arity,
+               int[] keyPositions,
+               TypeComparator<Object>[] comparators,
+               TypeSerializer<Object>[] serializers,
+               boolean[] orders) {
+
+               this(arity, keyPositions, makeNullAware(comparators, orders), 
serializers);
+       }
+
+
+       /**
+        * Intermediate constructor for creating auxiliary fields.
+        */
+       private RowComparator(
+               int arity,
+               int[] keyPositions,
+               NullAwareComparator<Object>[] comparators,
+               TypeSerializer<Object>[] serializers) {
+
+               this(
+                       arity,
+                       keyPositions,
+                       comparators,
+                       serializers,
+                       createAuxiliaryFields(keyPositions, comparators));
+       }
+
+       /**
+        * Intermediate constructor for creating auxiliary fields.
+        */
+       private RowComparator(
+               int arity,
+               int[] keyPositions,
+               NullAwareComparator<Object>[] comparators,
+               TypeSerializer<Object>[] serializers,
+               Tuple4<int[], Integer, Integer, Boolean> auxiliaryFields) {
+
+               this(
+                       arity,
+                       keyPositions,
+                       comparators,
+                       serializers,
+                       auxiliaryFields.f0,
+                       auxiliaryFields.f1,
+                       auxiliaryFields.f2,
+                       auxiliaryFields.f3);
+       }
+
+       /**
+        * Intermediate constructor for creating auxiliary fields.
+        */
+       private RowComparator(
+               int arity,
+               int[] keyPositions,
+               NullAwareComparator<Object>[] comparators,
+               TypeSerializer<Object>[] serializers,
+               int[] normalizedKeyLengths,
+               int numLeadingNormalizableKeys,
+               int normalizableKeyPrefixLen,
+               boolean invertNormKey) {
+
+               this.arity = arity;
+               this.keyPositions = keyPositions;
+               this.comparators = comparators;
+               this.serializers = serializers;
+               this.normalizedKeyLengths = normalizedKeyLengths;
+               this.numLeadingNormalizableKeys = numLeadingNormalizableKeys;
+               this.normalizableKeyPrefixLen = normalizableKeyPrefixLen;
+               this.invertNormKey = invertNormKey;
+               this.nullMask1 = new boolean[arity];
+               this.nullMask2 = new boolean[arity];
+               deserializedKeyFields1 = instantiateDeserializationFields();
+               deserializedKeyFields2 = instantiateDeserializationFields();
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       //  Comparator Methods
+       // 
--------------------------------------------------------------------------------------------
+
+       @Override
+       public void getFlatComparator(List<TypeComparator> flatComparators) {
+               for (NullAwareComparator<Object> c : comparators) {
+                       Collections.addAll(flatComparators, 
c.getFlatComparators());
+               }
+       }
+
+       @Override
+       public int hash(Row record) {
+               int code = 0;
+               int i = 0;
+
+               try {
+                       for (; i < keyPositions.length; i++) {
+                               code *= TupleComparatorBase.HASH_SALT[i & 0x1F];
+                               Object element = 
record.getField(keyPositions[i]); // element can be null
+                               code += comparators[i].hash(element);
+                       }
+               } catch (IndexOutOfBoundsException e) {
+                       throw new KeyFieldOutOfBoundsException(keyPositions[i]);
+               }
+
+               return code;
+       }
+
+       @Override
+       public void setReference(Row toCompare) {
+               int i = 0;
+               try {
+                       for (; i < keyPositions.length; i++) {
+                               TypeComparator<Object> comparator = 
comparators[i];
+                               Object element = 
toCompare.getField(keyPositions[i]);
+                               comparator.setReference(element);   // element 
can be null
+                       }
+               } catch (IndexOutOfBoundsException e) {
+                       throw new KeyFieldOutOfBoundsException(keyPositions[i]);
+               }
+       }
+
+       @Override
+       public boolean equalToReference(Row candidate) {
+               int i = 0;
+               try {
+                       for (; i < keyPositions.length; i++) {
+                               TypeComparator<Object> comparator = 
comparators[i];
+                               Object element = 
candidate.getField(keyPositions[i]);   // element can be null
+                               // check if reference is not equal
+                               if (!comparator.equalToReference(element)) {
+                                       return false;
+                               }
+                       }
+               } catch (IndexOutOfBoundsException e) {
+                       throw new KeyFieldOutOfBoundsException(keyPositions[i]);
+               }
+               return true;
+       }
+
+       @Override
+       public int compareToReference(TypeComparator<Row> referencedComparator) 
{
+               RowComparator other = (RowComparator) referencedComparator;
+               int i = 0;
+               try {
+                       for (; i < keyPositions.length; i++) {
+                               int cmp = 
comparators[i].compareToReference(other.comparators[i]);
+                               if (cmp != 0) {
+                                       return cmp;
+                               }
+                       }
+               } catch (IndexOutOfBoundsException e) {
+                       throw new KeyFieldOutOfBoundsException(keyPositions[i]);
+               }
+               return 0;
+       }
+
+       @Override
+       public int compare(Row first, Row second) {
+               int i = 0;
+               try {
+                       for (; i < keyPositions.length; i++) {
+                               int keyPos = keyPositions[i];
+                               TypeComparator<Object> comparator = 
comparators[i];
+                               Object firstElement = first.getField(keyPos);   
// element can be null
+                               Object secondElement = second.getField(keyPos); 
// element can be null
+
+                               int cmp = comparator.compare(firstElement, 
secondElement);
+                               if (cmp != 0) {
+                                       return cmp;
+                               }
+                       }
+               } catch (IndexOutOfBoundsException e) {
+                       throw new KeyFieldOutOfBoundsException(keyPositions[i]);
+               }
+               return 0;
+       }
+
+       @Override
+       public int compareSerialized(
+               DataInputView firstSource,
+               DataInputView secondSource) throws IOException {
+
+               int len = serializers.length;
+               int keyLen = keyPositions.length;
+
+               readIntoNullMask(arity, firstSource, nullMask1);
+               readIntoNullMask(arity, secondSource, nullMask2);
+
+               // deserialize
+               for (int i = 0; i < len; i++) {
+                       TypeSerializer<Object> serializer = serializers[i];
+
+                       // deserialize field 1
+                       if (!nullMask1[i]) {
+                               deserializedKeyFields1[i] = 
serializer.deserialize(
+                                       deserializedKeyFields1[i],
+                                       firstSource);
+                       }
+
+                       // deserialize field 2
+                       if (!nullMask2[i]) {
+                               deserializedKeyFields2[i] = 
serializer.deserialize(
+                                       deserializedKeyFields2[i],
+                                       secondSource);
+                       }
+               }
+
+               // compare
+               for (int i = 0; i < keyLen; i++) {
+                       int keyPos = keyPositions[i];
+                       TypeComparator<Object> comparator = comparators[i];
+
+                       boolean isNull1 = nullMask1[keyPos];
+                       boolean isNull2 = nullMask2[keyPos];
+
+                       int cmp = 0;
+                       // both values are null -> equality
+                       if (isNull1 && isNull2) {
+                               cmp = 0;
+                       }
+                       // first value is null -> inequality
+                       else if (isNull1) {
+                               cmp = comparator.compare(null, 
deserializedKeyFields2[keyPos]);
+                       }
+                       // second value is null -> inequality
+                       else if (isNull2) {
+                               cmp = 
comparator.compare(deserializedKeyFields1[keyPos], null);
+                       }
+                       // no null values
+                       else {
+                               cmp = comparator.compare(
+                                       deserializedKeyFields1[keyPos],
+                                       deserializedKeyFields2[keyPos]);
+                       }
+
+                       if (cmp != 0) {
+                               return cmp;
+                       }
+               }
+
+               return 0;
+       }
+
+       @Override
+       public boolean supportsNormalizedKey() {
+               return numLeadingNormalizableKeys > 0;
+       }
+
+       @Override
+       public boolean supportsSerializationWithKeyNormalization() {
+               return false;
+       }
+
+       @Override
+       public int getNormalizeKeyLen() {
+               return normalizableKeyPrefixLen;
+       }
+
+       @Override
+       public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
+               return numLeadingNormalizableKeys < keyPositions.length ||
+                               normalizableKeyPrefixLen == Integer.MAX_VALUE ||
+                               normalizableKeyPrefixLen > keyBytes;
+       }
+
+       @Override
+       public void putNormalizedKey(Row record, MemorySegment target, int 
offset, int numBytes) {
+               int bytesLeft = numBytes;
+               int currentOffset = offset;
+
+               for (int i = 0; i < numLeadingNormalizableKeys && bytesLeft > 
0; i++) {
+                       int len = normalizedKeyLengths[i];
+                       len = bytesLeft >= len ? len : bytesLeft;
+
+                       TypeComparator<Object> comparator = comparators[i];
+                       Object element = record.getField(keyPositions[i]);  // 
element can be null
+                       // write key
+                       comparator.putNormalizedKey(element, target, 
currentOffset, len);
+
+                       bytesLeft -= len;
+                       currentOffset += len;
+               }
+
+       }
+
+       @Override
+       public void writeWithKeyNormalization(Row record, DataOutputView 
target) throws IOException {
+               throw new UnsupportedOperationException(
+                       "Record serialization with leading normalized keys not 
supported.");
+       }
+
+       @Override
+       public Row readWithKeyDenormalization(Row reuse, DataInputView source) 
throws IOException {
+               throw new UnsupportedOperationException(
+                       "Record deserialization with leading normalized keys 
not supported.");
+       }
+
+       @Override
+       public boolean invertNormalizedKey() {
+               return invertNormKey;
+       }
+
+       @Override
+       public TypeComparator<Row> duplicate() {
+               NullAwareComparator<?>[] comparatorsCopy = new 
NullAwareComparator<?>[comparators.length];
+               for (int i = 0; i < comparators.length; i++) {
+                       comparatorsCopy[i] = (NullAwareComparator<?>) 
comparators[i].duplicate();
+               }
+
+               TypeSerializer<?>[] serializersCopy = new 
TypeSerializer<?>[serializers.length];
+               for (int i = 0; i < serializers.length; i++) {
+                       serializersCopy[i] = serializers[i].duplicate();
+               }
+
+               return new RowComparator(
+                       arity,
+                       keyPositions,
+                       (NullAwareComparator<Object>[]) comparatorsCopy,
+                       (TypeSerializer<Object>[]) serializersCopy,
+                       normalizedKeyLengths,
+                       numLeadingNormalizableKeys,
+                       normalizableKeyPrefixLen,
+                       invertNormKey);
+       }
+
+       @Override
+       public int extractKeys(Object record, Object[] target, int index) {
+               int len = comparators.length;
+               int localIndex = index;
+               for (int i = 0; i < len; i++) {
+                       Object element = ((Row) 
record).getField(keyPositions[i]);  // element can be null
+                       localIndex += comparators[i].extractKeys(element, 
target, localIndex);
+               }
+               return localIndex - index;
+       }
+
+
+       private Object[] instantiateDeserializationFields() {
+               Object[] newFields = new Object[serializers.length];
+               for (int i = 0; i < serializers.length; i++) {
+                       newFields[i] = serializers[i].createInstance();
+               }
+               return newFields;
+       }
+
+       /**
+        * @return creates auxiliary fields for normalized key support
+        */
+       private static Tuple4<int[], Integer, Integer, Boolean>
+       createAuxiliaryFields(int[] keyPositions, NullAwareComparator<Object>[] 
comparators) {
+
+               int[] normalizedKeyLengths = new int[keyPositions.length];
+               int numLeadingNormalizableKeys = 0;
+               int normalizableKeyPrefixLen = 0;
+               boolean inverted = false;
+
+               for (int i = 0; i < keyPositions.length; i++) {
+                       NullAwareComparator<Object> k = comparators[i];
+                       // as long as the leading keys support normalized keys, 
we can build up the composite key
+                       if (k.supportsNormalizedKey()) {
+                               if (i == 0) {
+                                       // the first comparator decides whether 
we need to invert the key direction
+                                       inverted = k.invertNormalizedKey();
+                               } else if (k.invertNormalizedKey() != inverted) 
{
+                                       // if a successor does not agree on the 
inversion direction, it cannot be part of the
+                                       // normalized key
+                                       return new Tuple4<>(
+                                               normalizedKeyLengths,
+                                               numLeadingNormalizableKeys,
+                                               normalizableKeyPrefixLen,
+                                               inverted);
+                               }
+                               numLeadingNormalizableKeys++;
+                               int len = k.getNormalizeKeyLen();
+                               if (len < 0) {
+                                       throw new RuntimeException(
+                                               "Comparator " + 
k.getClass().getName() +
+                                               " specifies an invalid length 
for the normalized key: " + len);
+                               }
+                               normalizedKeyLengths[i] = len;
+                               normalizableKeyPrefixLen += len;
+                               if (normalizableKeyPrefixLen < 0) {
+                                       // overflow, which means we are out of 
budget for normalized key space anyways
+                                       return new Tuple4<>(
+                                               normalizedKeyLengths,
+                                               numLeadingNormalizableKeys,
+                                               Integer.MAX_VALUE,
+                                               inverted);
+                               }
+                       } else {
+                               return new Tuple4<>(
+                                       normalizedKeyLengths,
+                                       numLeadingNormalizableKeys,
+                                       normalizableKeyPrefixLen,
+                                       inverted);
+                       }
+               }
+               return new Tuple4<>(
+                       normalizedKeyLengths,
+                       numLeadingNormalizableKeys,
+                       normalizableKeyPrefixLen,
+                       inverted);
+       }
+
+       private static NullAwareComparator<Object>[] makeNullAware(
+               TypeComparator<Object>[] comparators,
+               boolean[] orders) {
+
+               checkArgument(comparators.length == orders.length);
+               NullAwareComparator<?>[] result = new 
NullAwareComparator<?>[comparators.length];
+               for (int i = 0; i < comparators.length; i++) {
+                       result[i] = new 
NullAwareComparator<Object>(comparators[i], orders[i]);
+               }
+               return (NullAwareComparator<Object>[]) result;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/86f8a255/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
new file mode 100644
index 0000000..5457c05
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.types.Row;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import static 
org.apache.flink.api.java.typeutils.runtime.NullMaskUtils.readIntoAndCopyNullMask;
+import static 
org.apache.flink.api.java.typeutils.runtime.NullMaskUtils.readIntoNullMask;
+import static 
org.apache.flink.api.java.typeutils.runtime.NullMaskUtils.writeNullMask;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Serializer for {@link Row}.
+ */
+public class RowSerializer extends TypeSerializer<Row> {
+
+       private static final long serialVersionUID = 1L;
+       private final boolean[] nullMask;
+       private final TypeSerializer<Object>[] fieldSerializers;
+
+       public RowSerializer(TypeSerializer<?>[] fieldSerializers) {
+               this.fieldSerializers = (TypeSerializer<Object>[]) 
checkNotNull(fieldSerializers);
+               this.nullMask = new boolean[fieldSerializers.length];
+       }
+
+       @Override
+       public boolean isImmutableType() {
+               return false;
+       }
+
+       @Override
+       public TypeSerializer<Row> duplicate() {
+               boolean stateful = false;
+               TypeSerializer<?>[] duplicateFieldSerializers = new 
TypeSerializer[fieldSerializers.length];
+
+               for (int i = 0; i < fieldSerializers.length; i++) {
+                       duplicateFieldSerializers[i] = 
fieldSerializers[i].duplicate();
+                       if (duplicateFieldSerializers[i] != 
fieldSerializers[i]) {
+                               // at least one of them is stateful
+                               stateful = true;
+                       }
+               }
+
+               if (stateful) {
+                       return new RowSerializer(duplicateFieldSerializers);
+               } else {
+                       return this;
+               }
+       }
+
+       @Override
+       public Row createInstance() {
+               return new Row(fieldSerializers.length);
+       }
+
+       @Override
+       public Row copy(Row from) {
+               int len = fieldSerializers.length;
+
+               if (from.getArity() != len) {
+                       throw new RuntimeException("Row arity of from does not 
match serializers.");
+               }
+
+               Row result = new Row(len);
+               for (int i = 0; i < len; i++) {
+                       Object fromField = from.getField(i);
+                       if (fromField != null) {
+                               Object copy = 
fieldSerializers[i].copy(fromField);
+                               result.setField(i, copy);
+                       } else {
+                               result.setField(i, null);
+                       }
+               }
+               return result;
+       }
+
+       @Override
+       public Row copy(Row from, Row reuse) {
+               int len = fieldSerializers.length;
+
+               // cannot reuse, do a non-reuse copy
+               if (reuse == null) {
+                       return copy(from);
+               }
+
+               if (from.getArity() != len || reuse.getArity() != len) {
+                       throw new RuntimeException(
+                               "Row arity of reuse or from is incompatible 
with this RowSerializer.");
+               }
+
+               for (int i = 0; i < len; i++) {
+                       Object fromField = from.getField(i);
+                       if (fromField != null) {
+                               Object reuseField = reuse.getField(i);
+                               if (reuseField != null) {
+                                       Object copy = 
fieldSerializers[i].copy(fromField, reuseField);
+                                       reuse.setField(i, copy);
+                               } else {
+                                       Object copy = 
fieldSerializers[i].copy(fromField);
+                                       reuse.setField(i, copy);
+                               }
+                       } else {
+                               reuse.setField(i, null);
+                       }
+               }
+               return reuse;
+       }
+
+       @Override
+       public int getLength() {
+               return -1;
+       }
+
+       @Override
+       public void serialize(Row record, DataOutputView target) throws 
IOException {
+               int len = fieldSerializers.length;
+
+               if (record.getArity() != len) {
+                       throw new RuntimeException("Row arity of from does not 
match serializers.");
+               }
+
+               // write a null mask
+               writeNullMask(len, record, target);
+
+               // serialize non-null fields
+               for (int i = 0; i < len; i++) {
+                       Object o = record.getField(i);
+                       if (o != null) {
+                               fieldSerializers[i].serialize(o, target);
+                       }
+               }
+       }
+
+
+       @Override
+       public Row deserialize(DataInputView source) throws IOException {
+               int len = fieldSerializers.length;
+
+               Row result = new Row(len);
+
+               // read null mask
+               readIntoNullMask(len, source, nullMask);
+
+               for (int i = 0; i < len; i++) {
+                       if (nullMask[i]) {
+                               result.setField(i, null);
+                       } else {
+                               result.setField(i, 
fieldSerializers[i].deserialize(source));
+                       }
+               }
+
+               return result;
+       }
+
+       @Override
+       public Row deserialize(Row reuse, DataInputView source) throws 
IOException {
+               int len = fieldSerializers.length;
+
+               if (reuse.getArity() != len) {
+                       throw new RuntimeException("Row arity of from does not 
match serializers.");
+               }
+
+               // read null mask
+               readIntoNullMask(len, source, nullMask);
+
+               for (int i = 0; i < len; i++) {
+                       if (nullMask[i]) {
+                               reuse.setField(i, null);
+                       } else {
+                               Object reuseField = reuse.getField(i);
+                               if (reuseField != null) {
+                                       reuse.setField(i, 
fieldSerializers[i].deserialize(reuseField, source));
+                               } else {
+                                       reuse.setField(i, 
fieldSerializers[i].deserialize(source));
+                               }
+                       }
+               }
+
+               return reuse;
+       }
+
+       @Override
+       public void copy(DataInputView source, DataOutputView target) throws 
IOException {
+               int len = fieldSerializers.length;
+
+               // copy null mask
+               readIntoAndCopyNullMask(len, source, target, nullMask);
+
+               for (int i = 0; i < len; i++) {
+                       if (!nullMask[i]) {
+                               fieldSerializers[i].copy(source, target);
+                       }
+               }
+       }
+
+       @Override
+       public boolean equals(Object obj) {
+               if (canEqual(obj)) {
+                       RowSerializer other = (RowSerializer) obj;
+                       if (this.fieldSerializers.length == 
other.fieldSerializers.length) {
+                               for (int i = 0; i < 
this.fieldSerializers.length; i++) {
+                                       if 
(!this.fieldSerializers[i].equals(other.fieldSerializers[i])) {
+                                               return false;
+                                       }
+                               }
+                               return true;
+                       }
+               }
+
+               return false;
+       }
+
+       @Override
+       public boolean canEqual(Object obj) {
+               return obj instanceof RowSerializer;
+       }
+
+       @Override
+       public int hashCode() {
+               return Arrays.hashCode(fieldSerializers);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/86f8a255/flink-core/src/main/java/org/apache/flink/types/Row.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/Row.java 
b/flink-core/src/main/java/org/apache/flink/types/Row.java
new file mode 100644
index 0000000..6825b71
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/types/Row.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.types;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.util.StringUtils;
+
+import java.io.Serializable;
+import java.util.Arrays;
+
+/**
+ * A Row can have arbitrary number of fields and contain a set of fields, 
which may all be
+ * different types. The fields in Row can be null. Due to Row is not strongly 
typed, Flink's
+ * type extraction mechanism can't extract correct field types. So that users 
should manually
+ * tell Flink the type information via creating a {@link RowTypeInfo}.
+ *
+ * <p>
+ * The fields in the Row can be accessed by position (zero-based) {@link 
#getField(int)}. And can
+ * set fields by {@link #setField(int, Object)}.
+ * <p>
+ * Row is in principle serializable. However, it may contain non-serializable 
fields,
+ * in which case serialization will fail.
+ *
+ */
+@PublicEvolving
+public class Row implements Serializable{
+
+       private static final long serialVersionUID = 1L;
+
+       /** The array to store actual values. */
+       private final Object[] fields;
+
+       /**
+        * Create a new Row instance.
+        * @param arity The number of fields in the Row
+        */
+       public Row(int arity) {
+               this.fields = new Object[arity];
+       }
+
+       /**
+        * Get the number of fields in the Row.
+        * @return The number of fields in the Row.
+        */
+       public int getArity() {
+               return fields.length;
+       }
+
+       /**
+        * Gets the field at the specified position.
+        * @param pos The position of the field, 0-based.
+        * @return The field at the specified position.
+        * @throws IndexOutOfBoundsException Thrown, if the position is 
negative, or equal to, or larger than the number of fields.
+        */
+       public Object getField(int pos) {
+               return fields[pos];
+       }
+
+       /**
+        * Sets the field at the specified position.
+        *
+        * @param pos The position of the field, 0-based.
+        * @param value The value to be assigned to the field at the specified 
position.
+        * @throws IndexOutOfBoundsException Thrown, if the position is 
negative, or equal to, or larger than the number of fields.
+        */
+       public void setField(int pos, Object value) {
+               fields[pos] = value;
+       }
+
+       @Override
+       public String toString() {
+               StringBuilder sb = new StringBuilder();
+               for (int i = 0; i < fields.length; i++) {
+                       if (i > 0) {
+                               sb.append(",");
+                       }
+                       sb.append(StringUtils.arrayAwareToString(fields[i]));
+               }
+               return sb.toString();
+       }
+
+       @Override
+       public boolean equals(Object o) {
+               if (this == o) {
+                       return true;
+               }
+               if (o == null || getClass() != o.getClass()) {
+                       return false;
+               }
+
+               Row row = (Row) o;
+
+               return Arrays.equals(fields, row.fields);
+       }
+
+       @Override
+       public int hashCode() {
+               return Arrays.hashCode(fields);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/86f8a255/flink-core/src/test/java/org/apache/flink/api/java/typeutils/RowTypeInfoTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/RowTypeInfoTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/RowTypeInfoTest.java
new file mode 100644
index 0000000..8de7bf7
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/RowTypeInfoTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+public class RowTypeInfoTest {
+
+       @Test
+       public void testRowTypeInfoEquality() {
+               RowTypeInfo typeInfo1 = new RowTypeInfo(
+                       BasicTypeInfo.INT_TYPE_INFO,
+                       BasicTypeInfo.STRING_TYPE_INFO);
+
+               RowTypeInfo typeInfo2 = new RowTypeInfo(
+                       BasicTypeInfo.INT_TYPE_INFO,
+                       BasicTypeInfo.STRING_TYPE_INFO);
+
+               assertEquals(typeInfo1, typeInfo2);
+               assertEquals(typeInfo1.hashCode(), typeInfo2.hashCode());
+       }
+
+       @Test
+       public void testRowTypeInfoInequality() {
+               RowTypeInfo typeInfo1 = new RowTypeInfo(
+                       BasicTypeInfo.INT_TYPE_INFO,
+                       BasicTypeInfo.STRING_TYPE_INFO);
+
+               RowTypeInfo typeInfo2 = new RowTypeInfo(
+                       BasicTypeInfo.INT_TYPE_INFO,
+                       BasicTypeInfo.BOOLEAN_TYPE_INFO);
+
+               assertNotEquals(typeInfo1, typeInfo2);
+               assertNotEquals(typeInfo1.hashCode(), typeInfo2.hashCode());
+       }
+
+       @Test
+       public void testNestedRowTypeInfo() {
+               RowTypeInfo typeInfo = new RowTypeInfo(
+                       BasicTypeInfo.INT_TYPE_INFO,
+                       new RowTypeInfo(
+                               BasicTypeInfo.SHORT_TYPE_INFO,
+                           BasicTypeInfo.BIG_DEC_TYPE_INFO
+                       ),
+                       BasicTypeInfo.STRING_TYPE_INFO);
+
+               assertEquals("Row(f0: Short, f1: BigDecimal)", 
typeInfo.getTypeAt("f1").toString());
+               assertEquals("Short", typeInfo.getTypeAt("f1.f0").toString());
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/86f8a255/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowComparatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowComparatorTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowComparatorTest.java
new file mode 100644
index 0000000..ca54bd4
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowComparatorTest.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.ComparatorTestBase;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.types.Row;
+import org.junit.BeforeClass;
+
+import java.io.Serializable;
+
+import static org.junit.Assert.assertEquals;
+
+public class RowComparatorTest extends ComparatorTestBase<Row> {
+
+       private static final RowTypeInfo typeInfo = new RowTypeInfo(
+               BasicTypeInfo.INT_TYPE_INFO,
+               BasicTypeInfo.DOUBLE_TYPE_INFO,
+               BasicTypeInfo.STRING_TYPE_INFO,
+               new TupleTypeInfo<Tuple3<Integer, Boolean, Short>>(
+                       BasicTypeInfo.INT_TYPE_INFO,
+                       BasicTypeInfo.BOOLEAN_TYPE_INFO,
+                       BasicTypeInfo.SHORT_TYPE_INFO
+               ),
+               TypeExtractor.createTypeInfo(MyPojo.class));
+
+       private static MyPojo testPojo1 = new MyPojo();
+       private static MyPojo testPojo2 = new MyPojo();
+       private static MyPojo testPojo3 = new MyPojo();
+
+       private static final Row[] data = new Row[]{
+               createRow(null, null, null, null, null),
+               createRow(0, null, null, null, null),
+               createRow(0, 0.0, null, null, null),
+               createRow(0, 0.0, "a", null, null),
+               createRow(1, 0.0, "a", null, null),
+               createRow(1, 1.0, "a", null, null),
+               createRow(1, 1.0, "b", null, null),
+               createRow(1, 1.0, "b", new Tuple3<>(1, false, (short) 2), null),
+               createRow(1, 1.0, "b", new Tuple3<>(2, false, (short) 2), null),
+               createRow(1, 1.0, "b", new Tuple3<>(2, true, (short) 2), null),
+               createRow(1, 1.0, "b", new Tuple3<>(2, true, (short) 3), null),
+               createRow(1, 1.0, "b", new Tuple3<>(2, true, (short) 3), 
testPojo1),
+               createRow(1, 1.0, "b", new Tuple3<>(2, true, (short) 3), 
testPojo2),
+               createRow(1, 1.0, "b", new Tuple3<>(2, true, (short) 3), 
testPojo3)
+       };
+
+       @BeforeClass
+       public static void init() {
+               // TODO we cannot test null here as PojoComparator has no 
support for null keys
+               testPojo1.name = "";
+               testPojo2.name = "Test1";
+               testPojo3.name = "Test2";
+       }
+
+       @Override
+       protected void deepEquals(String message, Row should, Row is) {
+               int arity = should.getArity();
+               assertEquals(message, arity, is.getArity());
+               for (int i = 0; i < arity; i++) {
+                       Object copiedValue = should.getField(i);
+                       Object element = is.getField(i);
+                       assertEquals(message, element, copiedValue);
+               }
+       }
+
+       @Override
+       protected TypeComparator<Row> createComparator(boolean ascending) {
+               return typeInfo.createComparator(
+                       new int[] {0, 1, 2, 3, 4, 5, 6},
+                   new boolean[] {ascending, ascending, ascending, ascending, 
ascending, ascending, ascending},
+                   0,
+                   new ExecutionConfig());
+       }
+
+       @Override
+       protected TypeSerializer<Row> createSerializer() {
+               return typeInfo.createSerializer(new ExecutionConfig());
+       }
+
+       @Override
+       protected Row[] getSortedTestData() {
+               return data;
+       }
+
+       @Override
+       protected boolean supportsNullKeys() {
+               return true;
+       }
+
+       private static Row createRow(Object f0, Object f1, Object f2, Object 
f3, Object f4) {
+               Row row = new Row(5);
+               row.setField(0, f0);
+               row.setField(1, f1);
+               row.setField(2, f2);
+               row.setField(3, f3);
+               row.setField(4, f4);
+               return row;
+       }
+
+       public static class MyPojo implements Serializable, Comparable<MyPojo> {
+               // we cannot use null because the PojoComparator does not 
support null properly
+               public String name = "";
+
+               @Override
+               public int compareTo(MyPojo o) {
+                       if (name == null && o.name == null) {
+                               return 0;
+                       } else if (name == null) {
+                               return -1;
+                       } else if (o.name == null) {
+                               return 1;
+                       } else {
+                               return name.compareTo(o.name);
+                       }
+               }
+
+               @Override
+               public boolean equals(Object o) {
+                       if (this == o) {
+                               return true;
+                       }
+                       if (o == null || getClass() != o.getClass()) {
+                               return false;
+                       }
+
+                       MyPojo myPojo = (MyPojo) o;
+
+                       return name != null ? name.equals(myPojo.name) : 
myPojo.name == null;
+
+               }
+
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/86f8a255/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowComparatorWithManyFieldsTests.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowComparatorWithManyFieldsTests.java
 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowComparatorWithManyFieldsTests.java
new file mode 100644
index 0000000..d0fdbd6
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowComparatorWithManyFieldsTests.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.ComparatorTestBase;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.api.java.typeutils.runtime.RowComparator;
+import org.apache.flink.types.Row;
+import org.junit.BeforeClass;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests {@link RowComparator} for wide rows.
+ */
+public class RowComparatorWithManyFieldsTests extends ComparatorTestBase<Row> {
+
+       private static final int numberOfFields = 10;
+       private static RowTypeInfo typeInfo;
+       private static final Row[] data = new Row[]{
+               createRow(null, "b0", "c0", "d0", "e0", "f0", "g0", "h0", "i0", 
"j0"),
+               createRow("a1", "b1", "c1", "d1", "e1", "f1", "g1", "h1", "i1", 
"j1"),
+               createRow("a2", "b2", "c2", "d2", "e2", "f2", "g2", "h2", "i2", 
"j2"),
+               createRow("a3", "b3", "c3", "d3", "e3", "f3", "g3", "h3", "i3", 
"j3")
+       };
+
+       @BeforeClass
+       public static void setUp() throws Exception {
+               TypeInformation<?>[] fieldTypes = new 
TypeInformation[numberOfFields];
+               for (int i = 0; i < numberOfFields; i++) {
+                       fieldTypes[i] = BasicTypeInfo.STRING_TYPE_INFO;
+               }
+               typeInfo = new RowTypeInfo(fieldTypes);
+
+       }
+
+       @Override
+       protected void deepEquals(String message, Row should, Row is) {
+               int arity = should.getArity();
+               assertEquals(message, arity, is.getArity());
+               for (int i = 0; i < arity; i++) {
+                       Object copiedValue = should.getField(i);
+                       Object element = is.getField(i);
+                       assertEquals(message, element, copiedValue);
+               }
+       }
+
+       @Override
+       protected TypeComparator<Row> createComparator(boolean ascending) {
+               return typeInfo.createComparator(
+                       new int[]{0},
+                       new boolean[]{ascending},
+                       0,
+                       new ExecutionConfig());
+       }
+
+       @Override
+       protected TypeSerializer<Row> createSerializer() {
+               return typeInfo.createSerializer(new ExecutionConfig());
+       }
+
+       @Override
+       protected Row[] getSortedTestData() {
+               return data;
+       }
+
+       @Override
+       protected boolean supportsNullKeys() {
+               return true;
+       }
+
+       private static Row createRow(Object... values) {
+               checkNotNull(values);
+               checkArgument(values.length == numberOfFields);
+               Row row = new Row(numberOfFields);
+               for (int i = 0; i < values.length; i++) {
+                       row.setField(i, values[i]);
+               }
+               return row;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/86f8a255/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowSerializerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowSerializerTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowSerializerTest.java
new file mode 100644
index 0000000..d08d68a
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowSerializerTest.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.SerializerTestInstance;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.types.Row;
+import org.junit.Test;
+
+import java.io.Serializable;
+
+import static org.junit.Assert.assertEquals;
+
+public class RowSerializerTest {
+
+       @Test
+       public void testRowSerializer() {
+               TypeInformation<Row> typeInfo = new RowTypeInfo(
+                       BasicTypeInfo.INT_TYPE_INFO,
+                       BasicTypeInfo.STRING_TYPE_INFO);
+               Row row1 = new Row(2);
+               row1.setField(0, 1);
+               row1.setField(1, "a");
+
+               Row row2 = new Row(2);
+               row2.setField(0, 2);
+               row2.setField(1, null);
+
+               TypeSerializer<Row> serializer = typeInfo.createSerializer(new 
ExecutionConfig());
+               RowSerializerTestInstance instance = new 
RowSerializerTestInstance(serializer, row1, row2);
+               instance.testAll();
+       }
+
+       @Test
+       public void testLargeRowSerializer() {
+               TypeInformation<Row> typeInfo = new RowTypeInfo(
+                       BasicTypeInfo.INT_TYPE_INFO,
+                       BasicTypeInfo.INT_TYPE_INFO,
+                       BasicTypeInfo.INT_TYPE_INFO,
+                       BasicTypeInfo.INT_TYPE_INFO,
+                       BasicTypeInfo.INT_TYPE_INFO,
+                       BasicTypeInfo.INT_TYPE_INFO,
+                       BasicTypeInfo.INT_TYPE_INFO,
+                       BasicTypeInfo.INT_TYPE_INFO,
+                       BasicTypeInfo.INT_TYPE_INFO,
+                       BasicTypeInfo.INT_TYPE_INFO,
+                       BasicTypeInfo.INT_TYPE_INFO,
+                       BasicTypeInfo.INT_TYPE_INFO,
+                       BasicTypeInfo.STRING_TYPE_INFO);
+
+               Row row = new Row(13);
+               row.setField(0, 2);
+               row.setField(1, null);
+               row.setField(3, null);
+               row.setField(4, null);
+               row.setField(5, null);
+               row.setField(6, null);
+               row.setField(7, null);
+               row.setField(8, null);
+               row.setField(9, null);
+               row.setField(10, null);
+               row.setField(11, null);
+               row.setField(12, "Test");
+
+               TypeSerializer<Row> serializer = typeInfo.createSerializer(new 
ExecutionConfig());
+               RowSerializerTestInstance testInstance = new 
RowSerializerTestInstance(serializer, row);
+               testInstance.testAll();
+       }
+
+       @Test
+       public void testRowSerializerWithComplexTypes() {
+               TypeInformation<Row> typeInfo = new RowTypeInfo(
+                       BasicTypeInfo.INT_TYPE_INFO,
+                       BasicTypeInfo.DOUBLE_TYPE_INFO,
+                       BasicTypeInfo.STRING_TYPE_INFO,
+                       new TupleTypeInfo<Tuple3<Integer, Boolean, Short>>(
+                               BasicTypeInfo.INT_TYPE_INFO,
+                               BasicTypeInfo.BOOLEAN_TYPE_INFO,
+                               BasicTypeInfo.SHORT_TYPE_INFO),
+                       TypeExtractor.createTypeInfo(MyPojo.class));
+
+               MyPojo testPojo1 = new MyPojo();
+               testPojo1.name = null;
+               MyPojo testPojo2 = new MyPojo();
+               testPojo2.name = "Test1";
+               MyPojo testPojo3 = new MyPojo();
+               testPojo3.name = "Test2";
+
+               Row[] data = new Row[]{
+                       createRow(null, null, null, null, null),
+                       createRow(0, null, null, null, null),
+                       createRow(0, 0.0, null, null, null),
+                       createRow(0, 0.0, "a", null, null),
+                       createRow(1, 0.0, "a", null, null),
+                       createRow(1, 1.0, "a", null, null),
+                       createRow(1, 1.0, "b", null, null),
+                       createRow(1, 1.0, "b", new Tuple3<>(1, false, (short) 
2), null),
+                       createRow(1, 1.0, "b", new Tuple3<>(2, false, (short) 
2), null),
+                       createRow(1, 1.0, "b", new Tuple3<>(2, true, (short) 
2), null),
+                       createRow(1, 1.0, "b", new Tuple3<>(2, true, (short) 
3), null),
+                       createRow(1, 1.0, "b", new Tuple3<>(2, true, (short) 
3), testPojo1),
+                       createRow(1, 1.0, "b", new Tuple3<>(2, true, (short) 
3), testPojo2),
+                       createRow(1, 1.0, "b", new Tuple3<>(2, true, (short) 
3), testPojo3)
+               };
+
+               TypeSerializer<Row> serializer = typeInfo.createSerializer(new 
ExecutionConfig());
+               RowSerializerTestInstance testInstance = new 
RowSerializerTestInstance(serializer, data);
+               testInstance.testAll();
+       }
+
+       // 
----------------------------------------------------------------------------------------------
+
+       private static Row createRow(Object f0, Object f1, Object f2, Object 
f3, Object f4) {
+               Row row = new Row(5);
+               row.setField(0, f0);
+               row.setField(1, f1);
+               row.setField(2, f2);
+               row.setField(3, f3);
+               row.setField(4, f4);
+               return row;
+       }
+
+
+       private class RowSerializerTestInstance extends 
SerializerTestInstance<Row> {
+
+               RowSerializerTestInstance(
+                       TypeSerializer<Row> serializer,
+                       Row... testData) {
+                       super(serializer, Row.class, -1, testData);
+               }
+
+               @Override
+               protected void deepEquals(String message, Row should, Row is) {
+                       int arity = should.getArity();
+                       assertEquals(message, arity, is.getArity());
+                       for (int i = 0; i < arity; i++) {
+                               Object copiedValue = should.getField(i);
+                               Object element = is.getField(i);
+                               assertEquals(message, element, copiedValue);
+                       }
+               }
+       }
+
+       public static class MyPojo implements Serializable, Comparable<MyPojo> {
+               public String name = null;
+
+               @Override
+               public int compareTo(MyPojo o) {
+                       if (name == null && o.name == null) {
+                               return 0;
+                       } else if (name == null) {
+                               return -1;
+                       } else if (o.name == null) {
+                               return 1;
+                       } else {
+                               return name.compareTo(o.name);
+                       }
+               }
+
+               @Override
+               public boolean equals(Object o) {
+                       if (this == o) {
+                               return true;
+                       }
+                       if (o == null || getClass() != o.getClass()) {
+                               return false;
+                       }
+
+                       MyPojo myPojo = (MyPojo) o;
+
+                       return name != null ? name.equals(myPojo.name) : 
myPojo.name == null;
+               }
+
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/86f8a255/flink-core/src/test/java/org/apache/flink/types/RowTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/RowTest.java 
b/flink-core/src/test/java/org/apache/flink/types/RowTest.java
new file mode 100644
index 0000000..35ba32d
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/types/RowTest.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.types;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class RowTest {
+       @Test
+       public void testRowToString() {
+               Row row = new Row(5);
+               row.setField(0, 1);
+               row.setField(1, "hello");
+               row.setField(2, null);
+               row.setField(3, new Tuple2<>(2, "hi"));
+               row.setField(4, "hello world");
+
+               assertEquals("1,hello,null,(2,hi),hello world", row.toString());
+       }
+}

Reply via email to