[FLINK-3695] [gelly] ValueArray types

Provide compact and efficiently serializable and comparable array
implementations for Flink mutable Value types and Java primitives.

This closes #3382


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/963f46e7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/963f46e7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/963f46e7

Branch: refs/heads/master
Commit: 963f46e7179db034fd1d444469f4af58eac87409
Parents: 43158a8
Author: Greg Hogan <[email protected]>
Authored: Tue Feb 21 11:40:22 2017 -0500
Committer: Greg Hogan <[email protected]>
Committed: Fri Mar 31 11:14:36 2017 -0400

----------------------------------------------------------------------
 flink-libraries/flink-gelly/pom.xml             |  12 +-
 .../graph/types/valuearray/IntValueArray.java   | 398 ++++++++++++++
 .../valuearray/IntValueArrayComparator.java     | 156 ++++++
 .../valuearray/IntValueArraySerializer.java     |  85 +++
 .../graph/types/valuearray/LongValueArray.java  | 399 ++++++++++++++
 .../valuearray/LongValueArrayComparator.java    | 156 ++++++
 .../valuearray/LongValueArraySerializer.java    |  85 +++
 .../graph/types/valuearray/NullValueArray.java  | 267 ++++++++++
 .../valuearray/NullValueArrayComparator.java    | 147 ++++++
 .../valuearray/NullValueArraySerializer.java    |  85 +++
 .../types/valuearray/StringValueArray.java      | 518 +++++++++++++++++++
 .../valuearray/StringValueArrayComparator.java  | 217 ++++++++
 .../valuearray/StringValueArraySerializer.java  |  85 +++
 .../graph/types/valuearray/ValueArray.java      |  97 ++++
 .../types/valuearray/ValueArrayFactory.java     |  81 +++
 .../types/valuearray/ValueArrayTypeInfo.java    | 159 ++++++
 .../valuearray/ValueArrayTypeInfoFactory.java   |  41 ++
 .../valuearray/IntValueArrayComparatorTest.java |  51 ++
 .../valuearray/IntValueArraySerializerTest.java |  93 ++++
 .../types/valuearray/IntValueArrayTest.java     | 123 +++++
 .../LongValueArrayComparatorTest.java           |  51 ++
 .../LongValueArraySerializerTest.java           |  93 ++++
 .../types/valuearray/LongValueArrayTest.java    | 123 +++++
 .../NullValueArrayComparatorTest.java           |  51 ++
 .../NullValueArraySerializerTest.java           |  68 +++
 .../types/valuearray/NullValueArrayTest.java    |  80 +++
 .../StringValueArrayComparatorTest.java         |  51 ++
 .../StringValueArraySerializerTest.java         |  93 ++++
 .../types/valuearray/StringValueArrayTest.java  | 168 ++++++
 .../valuearray/ValueArrayTypeInfoTest.java      |  64 +++
 30 files changed, 4095 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/963f46e7/flink-libraries/flink-gelly/pom.xml
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/pom.xml 
b/flink-libraries/flink-gelly/pom.xml
index f773c70..fa09102 100644
--- a/flink-libraries/flink-gelly/pom.xml
+++ b/flink-libraries/flink-gelly/pom.xml
@@ -28,7 +28,7 @@ under the License.
                <version>1.3-SNAPSHOT</version>
                <relativePath>..</relativePath>
        </parent>
-       
+
        <artifactId>flink-gelly_2.10</artifactId>
        <name>flink-gelly</name>
 
@@ -59,7 +59,7 @@ under the License.
                </dependency>
 
                <!-- test dependencies -->
-               
+
                <dependency>
                        <groupId>org.apache.flink</groupId>
                        <artifactId>flink-test-utils_2.10</artifactId>
@@ -69,6 +69,14 @@ under the License.
 
                <dependency>
                        <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-core</artifactId>
+                       <version>${project.version}</version>
+                       <type>test-jar</type>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
                        <artifactId>flink-optimizer_2.10</artifactId>
                        <version>${project.version}</version>
                        <type>test-jar</type>

http://git-wip-us.apache.org/repos/asf/flink/blob/963f46e7/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/IntValueArray.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/IntValueArray.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/IntValueArray.java
new file mode 100644
index 0000000..0e3812d
--- /dev/null
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/IntValueArray.java
@@ -0,0 +1,398 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.types.valuearray;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.graph.utils.Murmur3_32;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Iterator;
+
+/**
+ * An array of {@link IntValue}.
+ */
+public class IntValueArray
+implements ValueArray<IntValue> {
+
+       protected static final int ELEMENT_LENGTH_IN_BYTES = 4;
+
+       protected static final int DEFAULT_CAPACITY_IN_BYTES = 4096;
+
+       // see note in ArrayList, HashTable, ...
+       private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
+
+       private boolean isBounded;
+
+       private int[] data;
+
+       // the number of elements currently stored
+       private int position;
+
+       // location of the bookmark used by mark() and reset()
+       private transient int mark;
+
+       // hasher used to generate the normalized key
+       private Murmur3_32 hash = new Murmur3_32(0x11d2d865);
+
+       // hash result stored as normalized key
+       private IntValue hashValue = new IntValue();
+
+       /**
+        * Initializes an expandable array with default capacity.
+        */
+       public IntValueArray() {
+               isBounded = false;
+               initialize(DEFAULT_CAPACITY_IN_BYTES);
+       }
+
+       /**
+        * Initializes a fixed-size array with the provided number of bytes.
+        *
+        * @param bytes number of bytes of the encapsulated array
+        */
+       public IntValueArray(int bytes) {
+               isBounded = true;
+               initialize(bytes);
+       }
+
+       /**
+        * Initializes the array with the provided number of bytes.
+        *
+        * @param bytes initial size of the encapsulated array in bytes
+        */
+       private void initialize(int bytes) {
+               int capacity = bytes / ELEMENT_LENGTH_IN_BYTES;
+
+               Preconditions.checkArgument(capacity > 0, "Requested array with 
zero capacity");
+               Preconditions.checkArgument(capacity <= MAX_ARRAY_SIZE, 
"Requested capacity exceeds limit of " + MAX_ARRAY_SIZE);
+
+               data = new int[capacity];
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+
+       /**
+        * If the size of the array is insufficient to hold the given capacity 
then
+        * copy the array into a new, larger array.
+        *
+        * @param minCapacity minimum required number of elements
+        */
+       private void ensureCapacity(int minCapacity) {
+               long currentCapacity = data.length;
+
+               if (minCapacity <= currentCapacity) {
+                       return;
+               }
+
+               // increase capacity by at least ~50%
+               long expandedCapacity = Math.max(minCapacity, currentCapacity + 
(currentCapacity >> 1));
+               int newCapacity = (int) Math.min(MAX_ARRAY_SIZE, 
expandedCapacity);
+
+               if (newCapacity < minCapacity) {
+                       // throw exception as unbounded arrays are not expected 
to fill
+                       throw new RuntimeException("Requested array size " + 
minCapacity + " exceeds limit of " + MAX_ARRAY_SIZE);
+               }
+
+               data = Arrays.copyOf(data, newCapacity);
+       }
+
+       @Override
+       public String toString() {
+               StringBuilder sb = new StringBuilder("[");
+               for (int idx = 0 ; idx < this.position ; idx++) {
+                       sb.append(data[idx]);
+                       if (idx < position - 1) {
+                               sb.append(",");
+                       }
+               }
+               sb.append("]");
+
+               return sb.toString();
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       // Iterable
+       // 
--------------------------------------------------------------------------------------------
+
+       private final ReadIterator iterator = new ReadIterator();
+
+       @Override
+       public Iterator<IntValue> iterator() {
+               iterator.reset();
+               return iterator;
+       }
+
+       private class ReadIterator
+       implements Iterator<IntValue> {
+               private IntValue value = new IntValue();
+
+               private int pos;
+
+               @Override
+               public boolean hasNext() {
+                       return pos < position;
+               }
+
+               @Override
+               public IntValue next() {
+                       value.setValue(data[pos++]);
+                       return value;
+               }
+
+               @Override
+               public void remove() {
+                       throw new UnsupportedOperationException("remove");
+               }
+
+               public void reset() {
+                       pos = 0;
+               }
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       // IOReadableWritable
+       // 
--------------------------------------------------------------------------------------------
+
+       @Override
+       public void write(DataOutputView out) throws IOException {
+               out.writeInt(position);
+
+               for (int i = 0 ; i < position ; i++) {
+                       out.writeInt(data[i]);
+               }
+       }
+
+       @Override
+       public void read(DataInputView in) throws IOException {
+               position = in.readInt();
+               mark = 0;
+
+               ensureCapacity(position);
+
+               for (int i = 0 ; i < position ; i++) {
+                       data[i] = in.readInt();
+               }
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       // NormalizableKey
+       // 
--------------------------------------------------------------------------------------------
+
+       @Override
+       public int getMaxNormalizedKeyLen() {
+               return hashValue.getMaxNormalizedKeyLen();
+       }
+
+       @Override
+       public void copyNormalizedKey(MemorySegment target, int offset, int 
len) {
+               hash.reset();
+
+               hash.hash(position);
+               for (int i = 0 ; i < position ; i++) {
+                       hash.hash(data[i]);
+               }
+
+               hashValue.setValue(hash.hash());
+               hashValue.copyNormalizedKey(target, offset, len);
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       // Comparable
+       // 
--------------------------------------------------------------------------------------------
+
+       @Override
+       public int compareTo(ValueArray<IntValue> o) {
+               IntValueArray other = (IntValueArray) o;
+
+               int min = Math.min(position, other.position);
+               for (int i = 0 ; i < min ; i++) {
+                       int cmp = Integer.compare(data[i], other.data[i]);
+
+                       if (cmp != 0) {
+                               return cmp;
+                       }
+               }
+
+               return Integer.compare(position, other.position);
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       // Key
+       // 
--------------------------------------------------------------------------------------------
+
+       @Override
+       public int hashCode() {
+               int hash = 1;
+
+               for (int i = 0 ; i < position ; i++) {
+                       hash = 31 * hash + data[i];
+               }
+
+               return hash;
+       }
+
+       @Override
+       public boolean equals(Object obj) {
+               if (obj instanceof IntValueArray) {
+                       IntValueArray other = (IntValueArray) obj;
+
+                       if (position != other.position) {
+                               return false;
+                       }
+
+                       for (int i = 0 ; i < position ; i++) {
+                               if (data[i] != other.data[i]) {
+                                       return false;
+                               }
+                       }
+
+                       return true;
+               }
+
+               return false;
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       // ResettableValue
+       // 
--------------------------------------------------------------------------------------------
+
+       @Override
+       public void setValue(ValueArray<IntValue> value) {
+               value.copyTo(this);
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       // CopyableValue
+       // 
--------------------------------------------------------------------------------------------
+
+       @Override
+       public int getBinaryLength() {
+               return -1;
+       }
+
+       @Override
+       public void copyTo(ValueArray<IntValue> target) {
+               IntValueArray other = (IntValueArray) target;
+
+               other.position = position;
+               other.mark = mark;
+
+               other.ensureCapacity(position);
+               System.arraycopy(data, 0, other.data, 0, position);
+       }
+
+       @Override
+       public ValueArray<IntValue> copy() {
+               ValueArray<IntValue> copy = new IntValueArray();
+
+               this.copyTo(copy);
+
+               return copy;
+       }
+
+       @Override
+       public void copy(DataInputView source, DataOutputView target) throws 
IOException {
+               copyInternal(source, target);
+       }
+
+       protected static void copyInternal(DataInputView source, DataOutputView 
target) throws IOException {
+               int count = source.readInt();
+               target.writeInt(count);
+
+               int bytes = ELEMENT_LENGTH_IN_BYTES * count;
+               target.write(source, bytes);
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       // ValueArray
+       // 
--------------------------------------------------------------------------------------------
+
+       @Override
+       public int size() {
+               return position;
+       }
+
+       @Override
+       public boolean isFull() {
+               if (isBounded) {
+                       return position == data.length;
+               } else {
+                       return position == MAX_ARRAY_SIZE;
+               }
+       }
+
+       @Override
+       public boolean add(IntValue value) {
+               int newPosition = position + 1;
+
+               if (newPosition > data.length) {
+                       if (isBounded) {
+                               return false;
+                       } else {
+                               ensureCapacity(newPosition);
+                       }
+               }
+
+               data[position] = value.getValue();
+               position = newPosition;
+
+               return true;
+       }
+
+       @Override
+       public boolean addAll(ValueArray<IntValue> other) {
+               IntValueArray source = (IntValueArray) other;
+
+               int sourceSize = source.position;
+               int newPosition = position + sourceSize;
+
+               if (newPosition > data.length) {
+                       if (isBounded) {
+                               return false;
+                       } else {
+                               ensureCapacity(newPosition);
+                       }
+               }
+
+               System.arraycopy(source.data, 0, data, position, sourceSize);
+               position = newPosition;
+
+               return true;
+       }
+
+       @Override
+       public void clear() {
+               position = 0;
+       }
+
+       @Override
+       public void mark() {
+               mark = position;
+       }
+
+       @Override
+       public void reset() {
+               position = mark;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/963f46e7/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/IntValueArrayComparator.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/IntValueArrayComparator.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/IntValueArrayComparator.java
new file mode 100644
index 0000000..bbc9bc5
--- /dev/null
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/IntValueArrayComparator.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.types.valuearray;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.types.NormalizableKey;
+
+import java.io.IOException;
+
+/**
+ * Specialized comparator for IntValueArray based on CopyableValueComparator.
+ *
+ * This can be used for grouping keys but not for sorting keys.
+ */
+@Internal
+public class IntValueArrayComparator extends TypeComparator<IntValueArray> {
+
+       private static final long serialVersionUID = 1L;
+
+       private final boolean ascendingComparison;
+
+       private final IntValueArray reference = new IntValueArray();
+
+       private final TypeComparator<?>[] comparators = new TypeComparator[] 
{this};
+
+       public IntValueArrayComparator(boolean ascending) {
+               this.ascendingComparison = ascending;
+       }
+
+       @Override
+       public int hash(IntValueArray record) {
+               return record.hashCode();
+       }
+
+       @Override
+       public void setReference(IntValueArray toCompare) {
+               toCompare.copyTo(reference);
+       }
+
+       @Override
+       public boolean equalToReference(IntValueArray candidate) {
+               return candidate.equals(this.reference);
+       }
+
+       @Override
+       public int compareToReference(TypeComparator<IntValueArray> 
referencedComparator) {
+               int comp = ((IntValueArrayComparator) 
referencedComparator).reference.compareTo(reference);
+               return ascendingComparison ? comp : -comp;
+       }
+
+       @Override
+       public int compare(IntValueArray first, IntValueArray second) {
+               int comp = first.compareTo(second);
+               return ascendingComparison ? comp : -comp;
+       }
+
+       @Override
+       public int compareSerialized(DataInputView firstSource, DataInputView 
secondSource) throws IOException {
+               int firstCount = firstSource.readInt();
+               int secondCount = secondSource.readInt();
+
+               int minCount = Math.min(firstCount, secondCount);
+               while (minCount-- > 0) {
+                       int firstValue = firstSource.readInt();
+                       int secondValue = secondSource.readInt();
+
+                       int cmp = Integer.compare(firstValue, secondValue);
+                       if (cmp != 0) {
+                               return ascendingComparison ? cmp : -cmp;
+                       }
+               }
+
+               int cmp = Integer.compare(firstCount, secondCount);
+               return ascendingComparison ? cmp : -cmp;
+       }
+
+       @Override
+       public boolean supportsNormalizedKey() {
+               return 
NormalizableKey.class.isAssignableFrom(IntValueArray.class);
+       }
+
+       @Override
+       public int getNormalizeKeyLen() {
+               return reference.getMaxNormalizedKeyLen();
+       }
+
+       @Override
+       public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
+               return keyBytes < getNormalizeKeyLen();
+       }
+
+       @Override
+       public void putNormalizedKey(IntValueArray record, MemorySegment 
target, int offset, int numBytes) {
+               record.copyNormalizedKey(target, offset, numBytes);
+       }
+
+       @Override
+       public boolean invertNormalizedKey() {
+               return !ascendingComparison;
+       }
+
+       @Override
+       public TypeComparator<IntValueArray> duplicate() {
+               return new IntValueArrayComparator(ascendingComparison);
+       }
+
+       @Override
+       public int extractKeys(Object record, Object[] target, int index) {
+               target[index] = record;
+               return 1;
+       }
+
+       @Override
+       public TypeComparator<?>[] getFlatComparators() {
+               return comparators;
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       // unsupported normalization
+       // 
--------------------------------------------------------------------------------------------
+
+       @Override
+       public boolean supportsSerializationWithKeyNormalization() {
+               return false;
+       }
+
+       @Override
+       public void writeWithKeyNormalization(IntValueArray record, 
DataOutputView target) throws IOException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public IntValueArray readWithKeyDenormalization(IntValueArray reuse, 
DataInputView source) throws IOException {
+               throw new UnsupportedOperationException();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/963f46e7/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/IntValueArraySerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/IntValueArraySerializer.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/IntValueArraySerializer.java
new file mode 100644
index 0000000..b86fe87
--- /dev/null
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/IntValueArraySerializer.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.types.valuearray;
+
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+
+/**
+ * Specialized serializer for {@code IntValueArray}.
+ */
+public final class IntValueArraySerializer extends 
TypeSerializerSingleton<IntValueArray> {
+
+       private static final long serialVersionUID = 1L;
+
+       @Override
+       public boolean isImmutableType() {
+               return false;
+       }
+
+       @Override
+       public IntValueArray createInstance() {
+               return new IntValueArray();
+       }
+
+       @Override
+       public IntValueArray copy(IntValueArray from) {
+               return copy(from, new IntValueArray());
+       }
+
+       @Override
+       public IntValueArray copy(IntValueArray from, IntValueArray reuse) {
+               reuse.setValue(from);
+               return reuse;
+       }
+
+       @Override
+       public int getLength() {
+               return -1;
+       }
+
+       @Override
+       public void serialize(IntValueArray record, DataOutputView target) 
throws IOException {
+               record.write(target);
+       }
+
+       @Override
+       public IntValueArray deserialize(DataInputView source) throws 
IOException {
+               return deserialize(new IntValueArray(), source);
+       }
+
+       @Override
+       public IntValueArray deserialize(IntValueArray reuse, DataInputView 
source) throws IOException {
+               reuse.read(source);
+               return reuse;
+       }
+
+       @Override
+       public void copy(DataInputView source, DataOutputView target) throws 
IOException {
+               IntValueArray.copyInternal(source, target);
+       }
+
+       @Override
+       public boolean canEqual(Object obj) {
+               return obj instanceof IntValueArraySerializer;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/963f46e7/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/LongValueArray.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/LongValueArray.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/LongValueArray.java
new file mode 100644
index 0000000..7c01e6c
--- /dev/null
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/LongValueArray.java
@@ -0,0 +1,399 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.types.valuearray;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.graph.utils.Murmur3_32;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Iterator;
+
+/**
+ * An array of {@link LongValue} backed by a primitive {@code long[]}.
+ *
+ * <p>An array created with the default constructor grows on demand. An array
+ * created with an explicit size in bytes is bounded: {@link #add} and
+ * {@link #addAll} return {@code false} rather than growing it.
+ */
+public class LongValueArray
+implements ValueArray<LongValue> {
+
+       protected static final int ELEMENT_LENGTH_IN_BYTES = 8;
+
+       protected static final int DEFAULT_CAPACITY_IN_BYTES = 4096;
+
+       // see note in ArrayList, HashTable, ...
+       private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
+
+       // true when created with a fixed size; add() and addAll() then refuse to grow the array
+       private boolean isBounded;
+
+       private long[] data;
+
+       // the number of elements currently stored
+       private int position;
+
+       // location of the bookmark used by mark() and reset()
+       private transient int mark;
+
+       // hasher used to generate the normalized key
+       private Murmur3_32 hash = new Murmur3_32(0xdf099ea8);
+
+       // hash result stored as normalized key
+       private IntValue hashValue = new IntValue();
+
+       /**
+        * Initializes an expandable array with default capacity.
+        */
+       public LongValueArray() {
+               isBounded = false;
+               initialize(DEFAULT_CAPACITY_IN_BYTES);
+       }
+
+       /**
+        * Initializes a fixed-size array with the provided number of bytes.
+        *
+        * @param bytes number of bytes of the encapsulated array
+        */
+       public LongValueArray(int bytes) {
+               isBounded = true;
+               initialize(bytes);
+       }
+
+       /**
+        * Initializes the array with the provided number of bytes.
+        *
+        * @param bytes initial size of the encapsulated array in bytes; must be
+        *              large enough to hold at least one element
+        */
+       private void initialize(int bytes) {
+               int capacity = bytes / ELEMENT_LENGTH_IN_BYTES;
+
+               Preconditions.checkArgument(capacity > 0, "Requested array with zero capacity");
+               Preconditions.checkArgument(capacity <= MAX_ARRAY_SIZE, "Requested capacity exceeds limit of " + MAX_ARRAY_SIZE);
+
+               data = new long[capacity];
+       }
+
+       // --------------------------------------------------------------------------------------------
+
+       /**
+        * If the size of the array is insufficient to hold the given capacity then
+        * copy the array into a new, larger array.
+        *
+        * <p>The parameter is a {@code long} so that callers can pass the sum of two
+        * element counts without first overflowing {@code int}.
+        *
+        * @param minCapacity minimum required number of elements
+        * @throws RuntimeException if {@code minCapacity} exceeds the maximum array size
+        */
+       private void ensureCapacity(long minCapacity) {
+               long currentCapacity = data.length;
+
+               if (minCapacity <= currentCapacity) {
+                       return;
+               }
+
+               // increase capacity by at least ~50%
+               long expandedCapacity = Math.max(minCapacity, currentCapacity + (currentCapacity >> 1));
+               int newCapacity = (int) Math.min(MAX_ARRAY_SIZE, expandedCapacity);
+
+               if (newCapacity < minCapacity) {
+                       // throw exception as unbounded arrays are not expected to fill
+                       throw new RuntimeException("Requested array size " + minCapacity + " exceeds limit of " + MAX_ARRAY_SIZE);
+               }
+
+               data = Arrays.copyOf(data, newCapacity);
+       }
+
+       @Override
+       public String toString() {
+               StringBuilder sb = new StringBuilder("[");
+               for (int idx = 0 ; idx < this.position ; idx++) {
+                       sb.append(data[idx]);
+                       if (idx < position - 1) {
+                               sb.append(",");
+                       }
+               }
+               sb.append("]");
+
+               return sb.toString();
+       }
+
+       // --------------------------------------------------------------------------------------------
+       // Iterable
+       // --------------------------------------------------------------------------------------------
+
+       // single iterator instance handed out (after a reset) by every call to iterator()
+       private final ReadIterator iterator = new ReadIterator();
+
+       /**
+        * Returns this array's iterator, rewound to the first element.
+        *
+        * <p>The same iterator object is returned on every call, so only one
+        * iteration over a given array can be in progress at a time.
+        */
+       @Override
+       public Iterator<LongValue> iterator() {
+               iterator.reset();
+               return iterator;
+       }
+
+       /**
+        * Read-only iterator over the stored elements. One mutable {@link LongValue}
+        * is reused for every element, so callers must copy any value they retain.
+        */
+       private class ReadIterator
+       implements Iterator<LongValue> {
+               private LongValue value = new LongValue();
+
+               private int pos;
+
+               @Override
+               public boolean hasNext() {
+                       return pos < position;
+               }
+
+               @Override
+               public LongValue next() {
+                       // honor the Iterator contract instead of exposing stale slots or
+                       // failing with an ArrayIndexOutOfBoundsException
+                       if (pos >= position) {
+                               throw new java.util.NoSuchElementException();
+                       }
+
+                       value.setValue(data[pos++]);
+                       return value;
+               }
+
+               @Override
+               public void remove() {
+                       throw new UnsupportedOperationException("remove");
+               }
+
+               public void reset() {
+                       pos = 0;
+               }
+       }
+
+       // --------------------------------------------------------------------------------------------
+       // IOReadableWritable
+       // --------------------------------------------------------------------------------------------
+
+       /**
+        * Writes the element count as an int followed by each element as a long.
+        */
+       @Override
+       public void write(DataOutputView out) throws IOException {
+               out.writeInt(position);
+
+               for (int i = 0 ; i < position ; i++) {
+                       out.writeLong(data[i]);
+               }
+       }
+
+       /**
+        * Replaces the contents with an array in the format produced by
+        * {@link #write(DataOutputView)} and clears the mark. The backing array is
+        * grown if needed, also for bounded arrays.
+        */
+       @Override
+       public void read(DataInputView in) throws IOException {
+               position = in.readInt();
+               mark = 0;
+
+               ensureCapacity(position);
+
+               for (int i = 0 ; i < position ; i++) {
+                       data[i] = in.readLong();
+               }
+       }
+
+       // --------------------------------------------------------------------------------------------
+       // NormalizableKey
+       // --------------------------------------------------------------------------------------------
+
+       @Override
+       public int getMaxNormalizedKeyLen() {
+               return hashValue.getMaxNormalizedKeyLen();
+       }
+
+       /**
+        * Writes a normalized key derived from a hash of the element count and all
+        * elements. Because the key is a hash it does not follow the ordering
+        * defined by {@link #compareTo(ValueArray)}.
+        */
+       @Override
+       public void copyNormalizedKey(MemorySegment target, int offset, int len) {
+               hash.reset();
+
+               hash.hash(position);
+               for (int i = 0 ; i < position ; i++) {
+                       hash.hash(data[i]);
+               }
+
+               hashValue.setValue(hash.hash());
+               hashValue.copyNormalizedKey(target, offset, len);
+       }
+
+       // --------------------------------------------------------------------------------------------
+       // Comparable
+       // --------------------------------------------------------------------------------------------
+
+       /**
+        * Compares element by element; if one array is a prefix of the other the
+        * shorter array orders first.
+        */
+       @Override
+       public int compareTo(ValueArray<LongValue> o) {
+               LongValueArray other = (LongValueArray) o;
+
+               int min = Math.min(position, other.position);
+               for (int i = 0 ; i < min ; i++) {
+                       int cmp = Long.compare(data[i], other.data[i]);
+
+                       if (cmp != 0) {
+                               return cmp;
+                       }
+               }
+
+               return Integer.compare(position, other.position);
+       }
+
+       // --------------------------------------------------------------------------------------------
+       // Key
+       // --------------------------------------------------------------------------------------------
+
+       @Override
+       public int hashCode() {
+               int hash = 1;
+
+               for (int i = 0 ; i < position ; i++) {
+                       // fold the upper and lower 32 bits of each element
+                       hash = 31 * hash + (int) (data[i] ^ (data[i] >>> 32));
+               }
+
+               return hash;
+       }
+
+       @Override
+       public boolean equals(Object obj) {
+               if (obj instanceof LongValueArray) {
+                       LongValueArray other = (LongValueArray) obj;
+
+                       if (position != other.position) {
+                               return false;
+                       }
+
+                       for (int i = 0 ; i < position ; i++) {
+                               if (data[i] != other.data[i]) {
+                                       return false;
+                               }
+                       }
+
+                       return true;
+               }
+
+               return false;
+       }
+
+       // --------------------------------------------------------------------------------------------
+       // ResettableValue
+       // --------------------------------------------------------------------------------------------
+
+       @Override
+       public void setValue(ValueArray<LongValue> value) {
+               value.copyTo(this);
+       }
+
+       // --------------------------------------------------------------------------------------------
+       // CopyableValue
+       // --------------------------------------------------------------------------------------------
+
+       @Override
+       public int getBinaryLength() {
+               return -1;
+       }
+
+       /**
+        * Copies the element count, the mark and the stored elements into
+        * {@code target}, growing its backing array if needed.
+        */
+       @Override
+       public void copyTo(ValueArray<LongValue> target) {
+               LongValueArray other = (LongValueArray) target;
+
+               other.position = position;
+               other.mark = mark;
+
+               other.ensureCapacity(position);
+               System.arraycopy(data, 0, other.data, 0, position);
+       }
+
+       @Override
+       public ValueArray<LongValue> copy() {
+               ValueArray<LongValue> copy = new LongValueArray();
+
+               this.copyTo(copy);
+
+               return copy;
+       }
+
+       @Override
+       public void copy(DataInputView source, DataOutputView target) throws IOException {
+               copyInternal(source, target);
+       }
+
+       /**
+        * Copies one serialized array (an int count followed by that many longs)
+        * from {@code source} to {@code target} without deserializing it.
+        *
+        * @param source the input view positioned at the start of a serialized array
+        * @param target the output view receiving the copy
+        * @throws IOException propagated from the underlying views
+        */
+       protected static void copyInternal(DataInputView source, DataOutputView target) throws IOException {
+               int count = source.readInt();
+               target.writeInt(count);
+
+               // the payload size can exceed Integer.MAX_VALUE for large arrays, so
+               // compute it as a long and copy in int-sized chunks
+               long remaining = (long) ELEMENT_LENGTH_IN_BYTES * count;
+               while (remaining > 0) {
+                       int chunk = (int) Math.min(remaining, Integer.MAX_VALUE);
+                       target.write(source, chunk);
+                       remaining -= chunk;
+               }
+       }
+
+       // --------------------------------------------------------------------------------------------
+       // ValueArray
+       // --------------------------------------------------------------------------------------------
+
+       @Override
+       public int size() {
+               return position;
+       }
+
+       @Override
+       public boolean isFull() {
+               if (isBounded) {
+                       return position == data.length;
+               } else {
+                       return position == MAX_ARRAY_SIZE;
+               }
+       }
+
+       /**
+        * Appends {@code value}.
+        *
+        * @return {@code false} if the array is bounded and already full,
+        *         {@code true} otherwise
+        */
+       @Override
+       public boolean add(LongValue value) {
+               int newPosition = position + 1;
+
+               if (newPosition > data.length) {
+                       if (isBounded) {
+                               return false;
+                       } else {
+                               ensureCapacity(newPosition);
+                       }
+               }
+
+               data[position] = value.getValue();
+               position = newPosition;
+
+               return true;
+       }
+
+       /**
+        * Appends all elements of {@code other}.
+        *
+        * @return {@code false}, leaving this array unchanged, if the array is
+        *         bounded and cannot hold all elements; {@code true} otherwise
+        */
+       @Override
+       public boolean addAll(ValueArray<LongValue> other) {
+               LongValueArray source = (LongValueArray) other;
+
+               int sourceSize = source.position;
+
+               // widen to long: the sum of two valid sizes can exceed Integer.MAX_VALUE
+               long newPosition = position + (long) sourceSize;
+
+               if (newPosition > data.length) {
+                       if (isBounded) {
+                               return false;
+                       } else {
+                               ensureCapacity(newPosition);
+                       }
+               }
+
+               System.arraycopy(source.data, 0, data, position, sourceSize);
+               // safe narrowing: newPosition is now bounded by data.length
+               position = (int) newPosition;
+
+               return true;
+       }
+
+       @Override
+       public void clear() {
+               position = 0;
+       }
+
+       @Override
+       public void mark() {
+               mark = position;
+       }
+
+       @Override
+       public void reset() {
+               position = mark;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/963f46e7/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/LongValueArrayComparator.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/LongValueArrayComparator.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/LongValueArrayComparator.java
new file mode 100644
index 0000000..26c3da2
--- /dev/null
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/LongValueArrayComparator.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.types.valuearray;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.types.NormalizableKey;
+
+import java.io.IOException;
+
+/**
+ * Specialized comparator for LongValueArray based on CopyableValueComparator.
+ *
+ * <p>This can be used for grouping keys but not for sorting keys.
+ */
+@Internal
+public class LongValueArrayComparator extends TypeComparator<LongValueArray> {
+
+       private static final long serialVersionUID = 1L;
+
+       // true for ascending order; false negates every comparison result
+       private final boolean ascendingComparison;
+
+       // holds a copy of the record most recently passed to setReference()
+       private final LongValueArray reference = new LongValueArray();
+
+       private final TypeComparator<?>[] comparators = new TypeComparator[] {this};
+
+       /**
+        * @param ascending whether comparisons are reported in ascending order
+        */
+       public LongValueArrayComparator(boolean ascending) {
+               this.ascendingComparison = ascending;
+       }
+
+       /**
+        * Applies the configured sort direction to a raw comparison result.
+        */
+       private int orient(int rawComparison) {
+               return ascendingComparison ? rawComparison : -rawComparison;
+       }
+
+       @Override
+       public int hash(LongValueArray record) {
+               return record.hashCode();
+       }
+
+       /**
+        * Copies {@code toCompare} into the internal reference, so later changes to
+        * {@code toCompare} do not affect the reference.
+        */
+       @Override
+       public void setReference(LongValueArray toCompare) {
+               toCompare.copyTo(reference);
+       }
+
+       @Override
+       public boolean equalToReference(LongValueArray candidate) {
+               return candidate.equals(this.reference);
+       }
+
+       /**
+        * Compares the other comparator's reference against this comparator's
+        * reference, honoring the sort direction.
+        */
+       @Override
+       public int compareToReference(TypeComparator<LongValueArray> referencedComparator) {
+               LongValueArray otherReference = ((LongValueArrayComparator) referencedComparator).reference;
+               return orient(otherReference.compareTo(reference));
+       }
+
+       @Override
+       public int compare(LongValueArray first, LongValueArray second) {
+               return orient(first.compareTo(second));
+       }
+
+       /**
+        * Compares two serialized arrays (an int count followed by that many longs)
+        * element by element; when one is a prefix of the other the counts decide.
+        */
+       @Override
+       public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
+               int firstCount = firstSource.readInt();
+               int secondCount = secondSource.readInt();
+
+               int sharedCount = Math.min(firstCount, secondCount);
+               for (int i = 0 ; i < sharedCount ; i++) {
+                       long firstValue = firstSource.readLong();
+                       long secondValue = secondSource.readLong();
+
+                       int elementOrder = Long.compare(firstValue, secondValue);
+                       if (elementOrder != 0) {
+                               return orient(elementOrder);
+                       }
+               }
+
+               return orient(Integer.compare(firstCount, secondCount));
+       }
+
+       @Override
+       public boolean supportsNormalizedKey() {
+               return NormalizableKey.class.isAssignableFrom(LongValueArray.class);
+       }
+
+       @Override
+       public int getNormalizeKeyLen() {
+               return reference.getMaxNormalizedKeyLen();
+       }
+
+       @Override
+       public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
+               int fullKeyLength = getNormalizeKeyLen();
+               return keyBytes < fullKeyLength;
+       }
+
+       @Override
+       public void putNormalizedKey(LongValueArray record, MemorySegment target, int offset, int numBytes) {
+               record.copyNormalizedKey(target, offset, numBytes);
+       }
+
+       @Override
+       public boolean invertNormalizedKey() {
+               return !ascendingComparison;
+       }
+
+       /**
+        * Creates a new comparator with the same sort direction; the reference is
+        * not carried over.
+        */
+       @Override
+       public TypeComparator<LongValueArray> duplicate() {
+               return new LongValueArrayComparator(ascendingComparison);
+       }
+
+       /**
+        * The record itself is the only key: it is stored at {@code target[index]}.
+        *
+        * @return the number of keys extracted, always 1
+        */
+       @Override
+       public int extractKeys(Object record, Object[] target, int index) {
+               target[index] = record;
+               return 1;
+       }
+
+       @Override
+       public TypeComparator<?>[] getFlatComparators() {
+               return comparators;
+       }
+
+       // --------------------------------------------------------------------------------------------
+       // unsupported normalization
+       // --------------------------------------------------------------------------------------------
+
+       @Override
+       public boolean supportsSerializationWithKeyNormalization() {
+               return false;
+       }
+
+       @Override
+       public void writeWithKeyNormalization(LongValueArray record, DataOutputView target) throws IOException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public LongValueArray readWithKeyDenormalization(LongValueArray reuse, DataInputView source) throws IOException {
+               throw new UnsupportedOperationException();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/963f46e7/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/LongValueArraySerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/LongValueArraySerializer.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/LongValueArraySerializer.java
new file mode 100644
index 0000000..95219b6
--- /dev/null
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/LongValueArraySerializer.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.types.valuearray;
+
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+
+/**
+ * Specialized serializer for {@code LongValueArray}.
+ *
+ * <p>All reading, writing and copying is delegated to {@code LongValueArray} itself.
+ */
+public final class LongValueArraySerializer extends TypeSerializerSingleton<LongValueArray> {
+
+       private static final long serialVersionUID = 1L;
+
+       /**
+        * @return {@code false}: the arrays handled by this serializer are mutable
+        */
+       @Override
+       public boolean isImmutableType() {
+               return false;
+       }
+
+       /**
+        * @return a new, empty, expandable array
+        */
+       @Override
+       public LongValueArray createInstance() {
+               return new LongValueArray();
+       }
+
+       /**
+        * Copies {@code from} into a newly allocated array.
+        */
+       @Override
+       public LongValueArray copy(LongValueArray from) {
+               LongValueArray target = new LongValueArray();
+               return copy(from, target);
+       }
+
+       /**
+        * Sets {@code reuse} to the value of {@code from}.
+        *
+        * @return {@code reuse}
+        */
+       @Override
+       public LongValueArray copy(LongValueArray from, LongValueArray reuse) {
+               final LongValueArray result = reuse;
+               result.setValue(from);
+               return result;
+       }
+
+       /**
+        * @return {@code -1}, matching {@code LongValueArray.getBinaryLength()}
+        */
+       @Override
+       public int getLength() {
+               return -1;
+       }
+
+       /**
+        * Serializes {@code record} by letting it write itself to {@code target}.
+        */
+       @Override
+       public void serialize(LongValueArray record, DataOutputView target) throws IOException {
+               final LongValueArray array = record;
+               array.write(target);
+       }
+
+       /**
+        * Deserializes into a newly allocated array.
+        */
+       @Override
+       public LongValueArray deserialize(DataInputView source) throws IOException {
+               LongValueArray target = new LongValueArray();
+               return deserialize(target, source);
+       }
+
+       /**
+        * Deserializes into {@code reuse} by letting it read itself from {@code source}.
+        *
+        * @return {@code reuse}
+        */
+       @Override
+       public LongValueArray deserialize(LongValueArray reuse, DataInputView source) throws IOException {
+               final LongValueArray result = reuse;
+               result.read(source);
+               return result;
+       }
+
+       /**
+        * Copies one serialized array from {@code source} to {@code target} by
+        * delegating to {@code LongValueArray.copyInternal}.
+        */
+       @Override
+       public void copy(DataInputView source, DataOutputView target) throws IOException {
+               LongValueArray.copyInternal(source, target);
+       }
+
+       /**
+        * @return {@code true} only if {@code obj} is a {@code LongValueArraySerializer}
+        */
+       @Override
+       public boolean canEqual(Object obj) {
+               final boolean sameType = obj instanceof LongValueArraySerializer;
+               return sameType;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/963f46e7/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/NullValueArray.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/NullValueArray.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/NullValueArray.java
new file mode 100644
index 0000000..bf247a2
--- /dev/null
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/NullValueArray.java
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.types.valuearray;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.NullValue;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * An array of {@link NullValue}.
+ *
+ * <p>No element data is stored: the array only tracks the number of elements
+ * and its iterator returns {@link NullValue#getInstance()} for each of them.
+ */
+public class NullValueArray
+implements ValueArray<NullValue> {
+
+       // the number of elements currently stored
+       private int position;
+
+       // location of the bookmark used by mark() and reset()
+       private transient int mark;
+
+       // element count stored as normalized key
+       private IntValue hashValue = new IntValue();
+
+       /**
+        * Initializes an empty array. No backing storage is allocated.
+        */
+       public NullValueArray() {
+       }
+
+       /**
+        * Initializes an empty array. This constructor mirrors the sized
+        * constructor of the other value arrays.
+        *
+        * @param bytes ignored, since no backing storage is allocated
+        */
+       public NullValueArray(int bytes) {
+               this();
+       }
+
+       // --------------------------------------------------------------------------------------------
+
+       @Override
+       public String toString() {
+               StringBuilder sb = new StringBuilder("[");
+               for (int idx = 0 ; idx < this.position ; idx++) {
+                       sb.append("∅");
+                       if (idx < position - 1) {
+                               sb.append(",");
+                       }
+               }
+               sb.append("]");
+
+               return sb.toString();
+       }
+
+       // --------------------------------------------------------------------------------------------
+       // Iterable
+       // --------------------------------------------------------------------------------------------
+
+       // single iterator instance handed out (after a reset) by every call to iterator()
+       private final ReadIterator iterator = new ReadIterator();
+
+       /**
+        * Returns this array's iterator, rewound to the first element.
+        *
+        * <p>The same iterator object is returned on every call, so only one
+        * iteration over a given array can be in progress at a time.
+        */
+       @Override
+       public Iterator<NullValue> iterator() {
+               iterator.reset();
+               return iterator;
+       }
+
+       /**
+        * Read-only iterator yielding {@link NullValue#getInstance()} once per
+        * stored element.
+        */
+       private class ReadIterator
+       implements Iterator<NullValue> {
+               private int pos;
+
+               @Override
+               public boolean hasNext() {
+                       return pos < position;
+               }
+
+               @Override
+               public NullValue next() {
+                       // honor the Iterator contract instead of yielding elements forever
+                       if (pos >= position) {
+                               throw new java.util.NoSuchElementException();
+                       }
+
+                       pos++;
+                       return NullValue.getInstance();
+               }
+
+               @Override
+               public void remove() {
+                       throw new UnsupportedOperationException("remove");
+               }
+
+               public void reset() {
+                       pos = 0;
+               }
+       }
+
+       // --------------------------------------------------------------------------------------------
+       // IOReadableWritable
+       // --------------------------------------------------------------------------------------------
+
+       /**
+        * Writes the element count as a single int.
+        */
+       @Override
+       public void write(DataOutputView out) throws IOException {
+               out.writeInt(position);
+       }
+
+       /**
+        * Reads the element count written by {@link #write(DataOutputView)} and
+        * clears the mark.
+        */
+       @Override
+       public void read(DataInputView in) throws IOException {
+               position = in.readInt();
+               mark = 0;
+       }
+
+       // --------------------------------------------------------------------------------------------
+       // NormalizableKey
+       // --------------------------------------------------------------------------------------------
+
+       @Override
+       public int getMaxNormalizedKeyLen() {
+               return hashValue.getMaxNormalizedKeyLen();
+       }
+
+       /**
+        * Writes the element count as the normalized key.
+        */
+       @Override
+       public void copyNormalizedKey(MemorySegment target, int offset, int len) {
+               hashValue.setValue(position);
+               hashValue.copyNormalizedKey(target, offset, len);
+       }
+
+       // --------------------------------------------------------------------------------------------
+       // Comparable
+       // --------------------------------------------------------------------------------------------
+
+       /**
+        * Orders arrays by their number of elements.
+        */
+       @Override
+       public int compareTo(ValueArray<NullValue> o) {
+               NullValueArray other = (NullValueArray) o;
+
+               return Integer.compare(position, other.position);
+       }
+
+       // --------------------------------------------------------------------------------------------
+       // Key
+       // --------------------------------------------------------------------------------------------
+
+       @Override
+       public int hashCode() {
+               return position;
+       }
+
+       @Override
+       public boolean equals(Object obj) {
+               if (obj instanceof NullValueArray) {
+                       NullValueArray other = (NullValueArray) obj;
+
+                       return position == other.position;
+               }
+
+               return false;
+       }
+
+       // --------------------------------------------------------------------------------------------
+       // ResettableValue
+       // --------------------------------------------------------------------------------------------
+
+       @Override
+       public void setValue(ValueArray<NullValue> value) {
+               value.copyTo(this);
+       }
+
+       // --------------------------------------------------------------------------------------------
+       // CopyableValue
+       // --------------------------------------------------------------------------------------------
+
+       /**
+        * Returns the binary length of one {@link IntValue}, corresponding to the
+        * single int written by {@link #write(DataOutputView)}.
+        */
+       @Override
+       public int getBinaryLength() {
+               return hashValue.getBinaryLength();
+       }
+
+       /**
+        * Copies the element count and the mark into {@code target}.
+        */
+       @Override
+       public void copyTo(ValueArray<NullValue> target) {
+               NullValueArray other = (NullValueArray) target;
+
+               other.position = position;
+               // carry the bookmark over as well, as LongValueArray.copyTo does
+               other.mark = mark;
+       }
+
+       @Override
+       public ValueArray<NullValue> copy() {
+               ValueArray<NullValue> copy = new NullValueArray();
+
+               this.copyTo(copy);
+
+               return copy;
+       }
+
+       @Override
+       public void copy(DataInputView source, DataOutputView target) throws IOException {
+               target.write(source, getBinaryLength());
+       }
+
+       // --------------------------------------------------------------------------------------------
+       // ValueArray
+       // --------------------------------------------------------------------------------------------
+
+       @Override
+       public int size() {
+               return position;
+       }
+
+       @Override
+       public boolean isFull() {
+               return position == Integer.MAX_VALUE;
+       }
+
+       /**
+        * Increments the element count.
+        *
+        * @return {@code false} if the count is already {@code Integer.MAX_VALUE},
+        *         {@code true} otherwise
+        */
+       @Override
+       public boolean add(NullValue value) {
+               if (position == Integer.MAX_VALUE) {
+                       return false;
+               }
+
+               position++;
+
+               return true;
+       }
+
+       /**
+        * Adds the element count of {@code other} to this array.
+        *
+        * @return {@code false}, leaving this array unchanged, if the combined
+        *         count would exceed {@code Integer.MAX_VALUE}; {@code true} otherwise
+        */
+       @Override
+       public boolean addAll(ValueArray<NullValue> other) {
+               NullValueArray source = (NullValueArray) other;
+
+               // computed as a long so the sum cannot overflow
+               long newPosition = position + (long) source.position;
+
+               if (newPosition > Integer.MAX_VALUE) {
+                       return false;
+               }
+
+               position = (int) newPosition;
+
+               return true;
+       }
+
+       @Override
+       public void clear() {
+               position = 0;
+       }
+
+       @Override
+       public void mark() {
+               mark = position;
+       }
+
+       @Override
+       public void reset() {
+               position = mark;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/963f46e7/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/NullValueArrayComparator.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/NullValueArrayComparator.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/NullValueArrayComparator.java
new file mode 100644
index 0000000..2228d6e
--- /dev/null
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/NullValueArrayComparator.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.types.valuearray;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.types.NormalizableKey;
+
+import java.io.IOException;
+
+/**
+ * Specialized comparator for NullValueArray based on CopyableValueComparator.
+ *
+ * This can be used for grouping keys but not for sorting keys.
+ */
+@Internal
+public class NullValueArrayComparator extends TypeComparator<NullValueArray> {
+
+       private static final long serialVersionUID = 1L;
+
+       private final boolean ascendingComparison;
+
+       private final NullValueArray reference = new NullValueArray();
+
+       private final TypeComparator<?>[] comparators = new TypeComparator[] 
{this};
+
+       public NullValueArrayComparator(boolean ascending) {
+               this.ascendingComparison = ascending;
+       }
+
+       @Override
+       public int hash(NullValueArray record) {
+               return record.hashCode();
+       }
+
+       @Override
+       public void setReference(NullValueArray toCompare) {
+               toCompare.copyTo(reference);
+       }
+
+       @Override
+       public boolean equalToReference(NullValueArray candidate) {
+               return candidate.equals(this.reference);
+       }
+
+       @Override
+       public int compareToReference(TypeComparator<NullValueArray> 
referencedComparator) {
+               int comp = ((NullValueArrayComparator) 
referencedComparator).reference.compareTo(reference);
+               return ascendingComparison ? comp : -comp;
+       }
+
+       @Override
+       public int compare(NullValueArray first, NullValueArray second) {
+               int comp = first.compareTo(second);
+               return ascendingComparison ? comp : -comp;
+       }
+
+       @Override
+       public int compareSerialized(DataInputView firstSource, DataInputView 
secondSource) throws IOException {
+               int firstCount = firstSource.readInt();
+               int secondCount = secondSource.readInt();
+
+               int cmp = Integer.compare(firstCount, secondCount);
+               return ascendingComparison ? cmp : -cmp;
+       }
+
+       @Override
+       public boolean supportsNormalizedKey() {
+               return 
NormalizableKey.class.isAssignableFrom(NullValueArray.class);
+       }
+
+       @Override
+       public int getNormalizeKeyLen() {
+               return reference.getMaxNormalizedKeyLen();
+       }
+
+       @Override
+       public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
+               return keyBytes < getNormalizeKeyLen();
+       }
+
+       @Override
+       public void putNormalizedKey(NullValueArray record, MemorySegment 
target, int offset, int numBytes) {
+               record.copyNormalizedKey(target, offset, numBytes);
+       }
+
+       @Override
+       public boolean invertNormalizedKey() {
+               return !ascendingComparison;
+       }
+
+       @Override
+       public TypeComparator<NullValueArray> duplicate() {
+               return new NullValueArrayComparator(ascendingComparison);
+       }
+
+       @Override
+       public int extractKeys(Object record, Object[] target, int index) {
+               target[index] = record;
+               return 1;
+       }
+
+       @Override
+       public TypeComparator<?>[] getFlatComparators() {
+               return comparators;
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       // key normalization
+       // 
--------------------------------------------------------------------------------------------
+
+       @Override
+       public boolean supportsSerializationWithKeyNormalization() {
+               // see ComparatorTestBase#testNormalizedKeyReadWriter fixes in 
FLINK-4705
+               return false;
+       }
+
+       @Override
+       public void writeWithKeyNormalization(NullValueArray record, 
DataOutputView target) throws IOException {
+               record.write(target);
+       }
+
+       @Override
+       public NullValueArray readWithKeyDenormalization(NullValueArray reuse, 
DataInputView source) throws IOException {
+               reuse.read(source);
+               return reuse;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/963f46e7/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/NullValueArraySerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/NullValueArraySerializer.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/NullValueArraySerializer.java
new file mode 100644
index 0000000..233ed20
--- /dev/null
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/NullValueArraySerializer.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.types.valuearray;
+
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+
+/**
+ * Specialized serializer for {@code NullValueArray}.
+ */
+public final class NullValueArraySerializer extends TypeSerializerSingleton<NullValueArray> {
+
+       private static final long serialVersionUID = 1L;
+
+       /** {@code NullValueArray} instances are mutable. */
+       @Override
+       public boolean isImmutableType() {
+               return false;
+       }
+
+       /** Creates a new, default-constructed array. */
+       @Override
+       public NullValueArray createInstance() {
+               return new NullValueArray();
+       }
+
+       /** Copies {@code from} into a newly allocated array. */
+       @Override
+       public NullValueArray copy(NullValueArray from) {
+               final NullValueArray duplicate = new NullValueArray();
+               duplicate.setValue(from);
+               return duplicate;
+       }
+
+       /** Copies {@code from} into {@code reuse} and returns {@code reuse}. */
+       @Override
+       public NullValueArray copy(NullValueArray from, NullValueArray reuse) {
+               reuse.setValue(from);
+               return reuse;
+       }
+
+       /** The serialized form has a fixed length of four bytes. */
+       @Override
+       public int getLength() {
+               return 4;
+       }
+
+       /** Delegates to the record's own {@code write} method. */
+       @Override
+       public void serialize(NullValueArray record, DataOutputView target) throws IOException {
+               record.write(target);
+       }
+
+       /** Deserializes into a newly allocated array. */
+       @Override
+       public NullValueArray deserialize(DataInputView source) throws IOException {
+               final NullValueArray result = new NullValueArray();
+               result.read(source);
+               return result;
+       }
+
+       /** Deserializes into {@code reuse} and returns {@code reuse}. */
+       @Override
+       public NullValueArray deserialize(NullValueArray reuse, DataInputView source) throws IOException {
+               reuse.read(source);
+               return reuse;
+       }
+
+       /** Copies one fixed-length serialized record from source to target. */
+       @Override
+       public void copy(DataInputView source, DataOutputView target) throws IOException {
+               final int recordLength = getLength();
+               target.write(source, recordLength);
+       }
+
+       /** Only other {@code NullValueArraySerializer} instances can be equal to this one. */
+       @Override
+       public boolean canEqual(Object obj) {
+               return (obj instanceof NullValueArraySerializer);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/963f46e7/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/StringValueArray.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/StringValueArray.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/StringValueArray.java
new file mode 100644
index 0000000..4699552
--- /dev/null
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/StringValueArray.java
@@ -0,0 +1,518 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.types.valuearray;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.graph.utils.Murmur3_32;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.StringValue;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.nio.CharBuffer;
+import java.util.Arrays;
+import java.util.Iterator;
+
+/**
+ * An array of {@link StringValue}.
+ *
+ * <p>Strings are serialized to a byte array. Concatenating arrays is as simple
+ * and fast as extending and copying byte arrays. Strings are serialized when
+ * individually added to {@code StringValueArray}.
+ *
+ * <p>For each string added to the array the length is first serialized using a
+ * variable length integer. Then the string characters are serialized using a
+ * variable length encoding where the lower 128 ASCII/UTF-8 characters are
+ * encoded in a single byte. Characters below 0x4000 are serialized in at most
+ * two bytes and all remaining characters in three bytes.
+ */
+public class StringValueArray
+implements ValueArray<StringValue> {
+
+       protected static final int DEFAULT_CAPACITY_IN_BYTES = 4096;
+
+       // see note in ArrayList, HashTable, ...
+       private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
+
+       // continuation flag of the variable-length encoding
+       protected static final int HIGH_BIT = 0x1 << 7;
+
+       private boolean isBounded;
+
+       // the initial length of a bounded array, which is allowed to expand to
+       // store one additional element beyond this initial length
+       private int boundedLength;
+
+       private byte[] data;
+
+       // number of StringValue elements currently stored
+       private int length;
+
+       // the number of bytes currently stored
+       private int position;
+
+       // state for the bookmark used by mark() and reset()
+       private transient int markLength;
+
+       private transient int markPosition;
+
+       // hasher used to generate the normalized key
+       private Murmur3_32 hash = new Murmur3_32(0x19264330);
+
+       // hash result stored as normalized key
+       private IntValue hashValue = new IntValue();
+
+       /**
+        * Initializes an expandable array with default capacity.
+        */
+       public StringValueArray() {
+               isBounded = false;
+               initialize(DEFAULT_CAPACITY_IN_BYTES);
+       }
+
+       /**
+        * Initializes a fixed-size array with the provided number of bytes.
+        *
+        * @param bytes number of bytes of the encapsulated array
+        */
+       public StringValueArray(int bytes) {
+               isBounded = true;
+               boundedLength = bytes;
+               initialize(bytes);
+       }
+
+       /**
+        * Initializes the array with the provided number of bytes.
+        *
+        * @param bytes initial size of the encapsulated array in bytes
+        */
+       private void initialize(int bytes) {
+               Preconditions.checkArgument(bytes > 0, "Requested array with zero capacity");
+               Preconditions.checkArgument(bytes <= MAX_ARRAY_SIZE, "Requested capacity exceeds limit of " + MAX_ARRAY_SIZE);
+
+               data = new byte[bytes];
+       }
+
+       // --------------------------------------------------------------------------------------------
+
+       /**
+        * If the size of the array is insufficient to hold the given capacity then
+        * copy the array into a new, larger array.
+        *
+        * @param minCapacity minimum required number of bytes
+        * @throws RuntimeException if {@code minCapacity} exceeds the maximum array size
+        */
+       private void ensureCapacity(int minCapacity) {
+               long currentCapacity = data.length;
+
+               if (minCapacity <= currentCapacity) {
+                       return;
+               }
+
+               // increase capacity by at least ~50%
+               long expandedCapacity = Math.max(minCapacity, currentCapacity + (currentCapacity >> 1));
+               int newCapacity = (int) Math.min(MAX_ARRAY_SIZE, expandedCapacity);
+
+               if (newCapacity < minCapacity) {
+                       // throw exception as unbounded arrays are not expected to fill
+                       throw new RuntimeException("Requested array size " + minCapacity + " exceeds limit of " + MAX_ARRAY_SIZE);
+               }
+
+               data = Arrays.copyOf(data, newCapacity);
+       }
+
+       /**
+        * Renders the elements as a comma-separated list enclosed in brackets,
+        * e.g. {@code [a,b,c]}.
+        */
+       @Override
+       public String toString() {
+               StringBuilder sb = new StringBuilder("[");
+               String separator = "";
+
+               for (StringValue sv : this) {
+                       // the separator precedes every element except the first
+                       sb
+                               .append(separator)
+                               .append(sv.getValue());
+                       separator = ",";
+               }
+
+               sb.append("]");
+
+               return sb.toString();
+       }
+
+       // --------------------------------------------------------------------------------------------
+       // Iterable
+       // --------------------------------------------------------------------------------------------
+
+       // a single iterator instance is shared by all calls to iterator()
+       private final ReadIterator iterator = new ReadIterator();
+
+       /**
+        * Rewinds and returns the shared iterator. Because the same instance is
+        * returned on every call, only one iteration can be in progress at a time.
+        */
+       @Override
+       public Iterator<StringValue> iterator() {
+               iterator.reset();
+               return iterator;
+       }
+
+       /**
+        * Decodes the serialized strings in insertion order.
+        */
+       private class ReadIterator
+       implements Iterator<StringValue> {
+               private static final int DEFAULT_SIZE = 64;
+
+               // scratch buffer into which the characters are decoded
+               private StringValue value = new StringValue(CharBuffer.allocate(DEFAULT_SIZE));
+
+               // capacity of the scratch buffer in characters
+               private int size = DEFAULT_SIZE;
+
+               // read offset into the byte array
+               private int pos;
+
+               @Override
+               public boolean hasNext() {
+                       return pos < position;
+               }
+
+               @Override
+               public StringValue next() {
+                       if (pos >= position) {
+                               throw new java.util.NoSuchElementException();
+                       }
+
+                       // read length
+                       int len = data[pos++] & 0xFF;
+
+                       if (len >= HIGH_BIT) {
+                               int shift = 7;
+                               int curr;
+                               len = len & 0x7F;
+                               while ((curr = data[pos++] & 0xFF) >= HIGH_BIT) {
+                                       len |= (curr & 0x7F) << shift;
+                                       shift += 7;
+                               }
+                               len |= curr << shift;
+                       }
+
+                       // ensure capacity
+                       if (len > size) {
+                               while (size < len) {
+                                       size *= 2;
+                               }
+
+                               value = new StringValue(CharBuffer.allocate(size));
+                       }
+
+                       // read string characters
+                       final char[] valueData = value.getCharArray();
+
+                       for (int i = 0; i < len; i++) {
+                               int c = data[pos++] & 0xFF;
+                               if (c >= HIGH_BIT) {
+                                       int shift = 7;
+                                       int curr;
+                                       c = c & 0x7F;
+                                       while ((curr = data[pos++] & 0xFF) >= HIGH_BIT) {
+                                               c |= (curr & 0x7F) << shift;
+                                               shift += 7;
+                                       }
+                                       c |= curr << shift;
+                               }
+                               valueData[i] = (char) c;
+                       }
+
+                       // the scratch value always reports the full buffer capacity as its
+                       // length, so return only the decoded prefix; this allocates a new
+                       // StringValue for each element
+                       return value.substring(0, len);
+               }
+
+               @Override
+               public void remove() {
+                       throw new UnsupportedOperationException("remove");
+               }
+
+               public void reset() {
+                       pos = 0;
+               }
+       }
+
+       // --------------------------------------------------------------------------------------------
+       // IOReadableWritable
+       // --------------------------------------------------------------------------------------------
+
+       /**
+        * Writes the element count, the byte count, and then the serialized bytes.
+        */
+       @Override
+       public void write(DataOutputView out) throws IOException {
+               out.writeInt(length);
+               out.writeInt(position);
+
+               out.write(data, 0, position);
+       }
+
+       /**
+        * Reads the format produced by {@link #write(DataOutputView)}, replacing
+        * the current contents and clearing the bookmark.
+        */
+       @Override
+       public void read(DataInputView in) throws IOException {
+               length = in.readInt();
+               position = in.readInt();
+
+               markLength = 0;
+               markPosition = 0;
+
+               ensureCapacity(position);
+
+               // read(...) may return before all bytes have arrived; readFully does not
+               in.readFully(data, 0, position);
+       }
+
+       // --------------------------------------------------------------------------------------------
+       // NormalizableKey
+       // --------------------------------------------------------------------------------------------
+
+       @Override
+       public int getMaxNormalizedKeyLen() {
+               return hashValue.getMaxNormalizedKeyLen();
+       }
+
+       /**
+        * Writes a hash of the byte count and the serialized bytes as the
+        * normalized key.
+        */
+       @Override
+       public void copyNormalizedKey(MemorySegment target, int offset, int len) {
+               hash.reset();
+
+               hash.hash(position);
+               for (int i = 0 ; i < position ; i++) {
+                       hash.hash(data[i]);
+               }
+
+               hashValue.setValue(hash.hash());
+               hashValue.copyNormalizedKey(target, offset, len);
+       }
+
+       // --------------------------------------------------------------------------------------------
+       // Comparable
+       // --------------------------------------------------------------------------------------------
+
+       @Override
+       public int compareTo(ValueArray<StringValue> o) {
+               StringValueArray other = (StringValueArray) o;
+
+               // sorts first on the number of serialized bytes in the array, then on
+               // the first non-equal byte in the arrays
+               int cmp = Integer.compare(position, other.position);
+
+               if (cmp != 0) {
+                       return cmp;
+               }
+
+               for (int i = 0 ; i < position ; i++) {
+                       cmp = Byte.compare(data[i], other.data[i]);
+
+                       if (cmp != 0) {
+                               return cmp;
+                       }
+               }
+
+               return 0;
+       }
+
+       // --------------------------------------------------------------------------------------------
+       // Key
+       // --------------------------------------------------------------------------------------------
+
+       @Override
+       public int hashCode() {
+               int hash = 1;
+
+               for (int i = 0 ; i < position ; i++) {
+                       hash = 31 * hash + data[i];
+               }
+
+               return hash;
+       }
+
+       @Override
+       public boolean equals(Object obj) {
+               if (obj instanceof StringValueArray) {
+                       StringValueArray other = (StringValueArray) obj;
+
+                       if (length != other.length) {
+                               return false;
+                       }
+
+                       if (position != other.position) {
+                               return false;
+                       }
+
+                       for (int i = 0 ; i < position ; i++) {
+                               if (data[i] != other.data[i]) {
+                                       return false;
+                               }
+                       }
+
+                       return true;
+               }
+
+               return false;
+       }
+
+       // --------------------------------------------------------------------------------------------
+       // ResettableValue
+       // --------------------------------------------------------------------------------------------
+
+       @Override
+       public void setValue(ValueArray<StringValue> value) {
+               value.copyTo(this);
+       }
+
+       // --------------------------------------------------------------------------------------------
+       // CopyableValue
+       // --------------------------------------------------------------------------------------------
+
+       @Override
+       public int getBinaryLength() {
+               return -1;
+       }
+
+       /**
+        * Copies the counters, the bookmark, and the serialized bytes into the target.
+        *
+        * @param target destination; must be a {@code StringValueArray}
+        */
+       @Override
+       public void copyTo(ValueArray<StringValue> target) {
+               StringValueArray other = (StringValueArray) target;
+
+               other.length = length;
+               other.position = position;
+               other.markLength = markLength;
+               other.markPosition = markPosition;
+
+               other.ensureCapacity(position);
+               System.arraycopy(data, 0, other.data, 0, position);
+       }
+
+       @Override
+       public ValueArray<StringValue> copy() {
+               ValueArray<StringValue> copy = new StringValueArray();
+
+               this.copyTo(copy);
+
+               return copy;
+       }
+
+       @Override
+       public void copy(DataInputView source, DataOutputView target) throws IOException {
+               copyInternal(source, target);
+       }
+
+       /**
+        * Copies one serialized array from source to target without
+        * deserializing the strings.
+        */
+       protected static void copyInternal(DataInputView source, DataOutputView target) throws IOException {
+               int length = source.readInt();
+               target.writeInt(length);
+
+               int position = source.readInt();
+               target.writeInt(position);
+
+               target.write(source, position);
+       }
+
+       // --------------------------------------------------------------------------------------------
+       // ValueArray
+       // --------------------------------------------------------------------------------------------
+
+       @Override
+       public int size() {
+               return length;
+       }
+
+       @Override
+       public boolean isFull() {
+               if (isBounded) {
+                       return position >= boundedLength;
+               } else {
+                       return position == MAX_ARRAY_SIZE;
+               }
+       }
+
+       /**
+        * Serializes the string and appends it to the array.
+        *
+        * @param value the string to append
+        * @return {@code false} if this is a bounded array that has already
+        *         reached its bounded length, otherwise {@code true}
+        * @throws RuntimeException if the required capacity exceeds the maximum array size
+        */
+       @Override
+       public boolean add(StringValue value) {
+               if (isBounded && position >= boundedLength) {
+                       return false;
+               }
+
+               // up to five bytes storing length
+               if (position + 5 > data.length) {
+                       ensureCapacity(position + 5);
+               }
+
+               // update local variable until serialization succeeds
+               int newPosition = position;
+
+               // write the length, variable-length encoded
+               int len = value.length();
+
+               while (len >= HIGH_BIT) {
+                       data[newPosition++] = (byte) (len | HIGH_BIT);
+                       len >>>= 7;
+               }
+               data[newPosition++] = (byte) len;
+
+               // write the char data, variable-length encoded
+               final char[] valueData = value.getCharArray();
+               int remainingCapacity = data.length - newPosition;
+
+               len = value.length();
+               for (int i = 0; i < len; i++) {
+                       // up to three bytes storing a character
+                       if (remainingCapacity < 3) {
+                               // request room relative to the write offset so the array grows
+                               ensureCapacity(newPosition + 3);
+                               remainingCapacity = data.length - newPosition;
+                       }
+
+                       int c = valueData[i];
+
+                       while (c >= HIGH_BIT) {
+                               data[newPosition++] = (byte) (c | HIGH_BIT);
+                               remainingCapacity--;
+                               c >>>= 7;
+                       }
+                       data[newPosition++] = (byte) c;
+                       remainingCapacity--;
+               }
+
+               length++;
+               position = newPosition;
+
+               return true;
+       }
+
+       /**
+        * Appends the serialized bytes of another array to this array.
+        *
+        * @param other the array to append; must be a {@code StringValueArray}
+        * @return {@code false} if this is a bounded array and the combined bytes
+        *         do not fit into the current backing array, otherwise {@code true}
+        * @throws RuntimeException if the combined size exceeds the maximum array size
+        */
+       @Override
+       public boolean addAll(ValueArray<StringValue> other) {
+               StringValueArray source = (StringValueArray) other;
+
+               int sourceSize = source.position;
+               // widen to long so the sum cannot wrap around to a negative int
+               long newPosition = position + (long) sourceSize;
+
+               if (newPosition > data.length) {
+                       if (isBounded) {
+                               return false;
+                       }
+
+                       if (newPosition > MAX_ARRAY_SIZE) {
+                               throw new RuntimeException("Requested array size " + newPosition + " exceeds limit of " + MAX_ARRAY_SIZE);
+                       }
+
+                       ensureCapacity((int) newPosition);
+               }
+
+               System.arraycopy(source.data, 0, data, position, sourceSize);
+               length += source.length;
+               position = (int) newPosition;
+
+               return true;
+       }
+
+       @Override
+       public void clear() {
+               length = 0;
+               position = 0;
+       }
+
+       @Override
+       public void mark() {
+               markLength = length;
+               markPosition = position;
+       }
+
+       @Override
+       public void reset() {
+               length = markLength;
+               position = markPosition;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/963f46e7/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/StringValueArrayComparator.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/StringValueArrayComparator.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/StringValueArrayComparator.java
new file mode 100644
index 0000000..df88a8e
--- /dev/null
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/StringValueArrayComparator.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.types.valuearray;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.types.NormalizableKey;
+
+import java.io.IOException;
+
+import static 
org.apache.flink.graph.types.valuearray.StringValueArray.HIGH_BIT;
+
+/**
+ * Specialized comparator for StringValueArray based on 
CopyableValueComparator.
+ *
+ * This can be used for grouping keys but not for sorting keys.
+ *
+ * The sign of the result of {@code compare}, {@code compareToReference} and
+ * {@code compareSerialized} is flipped when the comparator was constructed
+ * with {@code ascending == false}.
+ */
+@Internal
+public class StringValueArrayComparator extends 
TypeComparator<StringValueArray> {
+
+       private static final long serialVersionUID = 1L;
+
+       private final boolean ascendingComparison;
+
+       private final StringValueArray reference = new StringValueArray(); // filled by setReference via copyTo
+
+       private final TypeComparator<?>[] comparators = new TypeComparator[] 
{this}; // returned by getFlatComparators: this comparator is its own only flat comparator
+
+       public StringValueArrayComparator(boolean ascending) {
+               this.ascendingComparison = ascending;
+       }
+
+       @Override
+       public int hash(StringValueArray record) {
+               return record.hashCode();
+       }
+
+       @Override
+       public void setReference(StringValueArray toCompare) {
+               toCompare.copyTo(reference); // stored through copyTo into the comparator-owned instance, not by keeping the caller's object
+       }
+
+       @Override
+       public boolean equalToReference(StringValueArray candidate) {
+               return candidate.equals(this.reference);
+       }
+
+       @Override
+       public int compareToReference(TypeComparator<StringValueArray> 
referencedComparator) {
+               int comp = ((StringValueArrayComparator) 
referencedComparator).reference.compareTo(reference); // operand order: the other comparator's reference is compared against this one's
+               return ascendingComparison ? comp : -comp;
+       }
+
+       @Override
+       public int compare(StringValueArray first, StringValueArray second) {
+               int comp = first.compareTo(second);
+               return ascendingComparison ? comp : -comp;
+       }
+
+       /**
+        * Read the length of the next serialized {@code StringValue}.
+        *
+        * The length is stored in a variable number of bytes: each byte
+        * contributes its low seven bits, least significant group first, and a
+        * byte value of at least {@code HIGH_BIT} signals that another byte
+        * follows.
+        *
+        * @param source the input view containing the record
+        * @return the length of the next serialized {@code StringValue}
+        * @throws IOException if the input view raised an exception when 
reading the length
+        */
+       private static int readStringLength(DataInputView source) throws 
IOException {
+               int len = source.readByte() & 0xFF; // mask to read the byte as unsigned
+
+               if (len >= HIGH_BIT) { // HIGH_BIT is presumably 0x80, the continuation flag - defined in StringValueArray, confirm
+                       int shift = 7;
+                       int curr;
+                       len = len & 0x7F; // keep only the seven payload bits of the first byte
+                       while ((curr = source.readByte() & 0xFF) >= HIGH_BIT) {
+                               len |= (curr & 0x7F) << shift;
+                               shift += 7;
+                       }
+                       len |= curr << shift; // the terminating byte is below HIGH_BIT, so it is merged without masking
+               }
+
+               return len;
+       }
+
+       /**
+        * Read the next character from the serialized {@code StringValue}.
+        *
+        * Characters use the same variable-length layout as the string length:
+        * seven payload bits per byte, least significant group first, with a
+        * byte value of at least {@code HIGH_BIT} signalling a continuation.
+        *
+        * @param source the input view containing the record
+        * @return the next {@code char} of the current serialized {@code 
StringValue}
+        * @throws IOException if the input view raised an exception when 
reading the character
+        */
+       private static char readStringChar(DataInputView source) throws 
IOException {
+               int c = source.readByte() & 0xFF; // mask to read the byte as unsigned
+
+               if (c >= HIGH_BIT) {
+                       int shift = 7;
+                       int curr;
+                       c = c & 0x7F; // keep only the seven payload bits of the first byte
+                       while ((curr = source.readByte() & 0xFF) >= HIGH_BIT) {
+                               c |= (curr & 0x7F) << shift;
+                               shift += 7;
+                       }
+                       c |= curr << shift; // the terminating byte is below HIGH_BIT, so it is merged without masking
+               }
+
+               return (char) c;
+       }
+
+       @Override
+       public int compareSerialized(DataInputView firstSource, DataInputView 
secondSource) throws IOException {
+               int firstCount = firstSource.readInt(); // the leading int of each record is used as its number of strings
+               int secondCount = secondSource.readInt();
+
+               int minCount = Math.min(firstCount, secondCount);
+               while (minCount-- > 0) { // compare the strings pairwise, in order
+                       int firstLength = readStringLength(firstSource);
+                       int secondLength = readStringLength(secondSource);
+
+                       int minLength = Math.min(firstLength, secondLength);
+                       while (minLength-- > 0) { // compare the common prefix character by character
+                               char firstChar = readStringChar(firstSource);
+                               char secondChar = readStringChar(secondSource);
+
+                               int cmp = Character.compare(firstChar, 
secondChar);
+                               if (cmp != 0) {
+                                       return ascendingComparison ? cmp : -cmp; // early return: the remaining bytes of both inputs are not consumed
+                               }
+                       }
+
+                       int cmp = Integer.compare(firstLength, secondLength); // common prefix is equal: the shorter string orders first
+                       if (cmp != 0) {
+                               return ascendingComparison ? cmp : -cmp;
+                       }
+               }
+
+               int cmp = Integer.compare(firstCount, secondCount); // all compared strings are equal: the array with fewer strings orders first
+               return ascendingComparison ? cmp : -cmp;
+       }
+
+       @Override
+       public boolean supportsNormalizedKey() {
+               return 
NormalizableKey.class.isAssignableFrom(StringValueArray.class); // true exactly when StringValueArray implements NormalizableKey
+       }
+
+       @Override
+       public int getNormalizeKeyLen() {
+               return reference.getMaxNormalizedKeyLen();
+       }
+
+       @Override
+       public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
+               return keyBytes < getNormalizeKeyLen();
+       }
+
+       @Override
+       public void putNormalizedKey(StringValueArray record, MemorySegment 
target, int offset, int numBytes) {
+               record.copyNormalizedKey(target, offset, numBytes);
+       }
+
+       @Override
+       public boolean invertNormalizedKey() {
+               return !ascendingComparison;
+       }
+
+       @Override
+       public TypeComparator<StringValueArray> duplicate() {
+               return new StringValueArrayComparator(ascendingComparison); // the duplicate gets its own, empty reference instance
+       }
+
+       @Override
+       public int extractKeys(Object record, Object[] target, int index) {
+               target[index] = record; // the record itself is the single key
+               return 1;
+       }
+
+       @Override
+       public TypeComparator<?>[] getFlatComparators() {
+               return comparators;
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       // unsupported normalization
+       // 
--------------------------------------------------------------------------------------------
+
+       @Override
+       public boolean supportsSerializationWithKeyNormalization() {
+               return false;
+       }
+
+       @Override
+       public void writeWithKeyNormalization(StringValueArray record, 
DataOutputView target) throws IOException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public StringValueArray readWithKeyDenormalization(StringValueArray 
reuse, DataInputView source) throws IOException {
+               throw new UnsupportedOperationException();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/963f46e7/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/StringValueArraySerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/StringValueArraySerializer.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/StringValueArraySerializer.java
new file mode 100644
index 0000000..0e875e3
--- /dev/null
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/StringValueArraySerializer.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.types.valuearray;
+
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+
+/**
+ * Specialized serializer for {@code StringValueArray}.
+ *
+ * All work is delegated to {@code StringValueArray} itself: its
+ * {@code write}, {@code read}, {@code setValue} and static
+ * {@code copyInternal} methods.
+ */
+public final class StringValueArraySerializer extends 
TypeSerializerSingleton<StringValueArray> {
+
+       private static final long serialVersionUID = 1L;
+
+       @Override
+       public boolean isImmutableType() {
+               return false;
+       }
+
+       @Override
+       public StringValueArray createInstance() {
+               return new StringValueArray();
+       }
+
+       @Override
+       public StringValueArray copy(StringValueArray from) {
+               return copy(from, new StringValueArray()); // copy into a fresh instance
+       }
+
+       @Override
+       public StringValueArray copy(StringValueArray from, StringValueArray 
reuse) {
+               reuse.setValue(from); // overwrites the reuse instance, which is also the return value
+               return reuse;
+       }
+
+       @Override
+       public int getLength() {
+               return -1; // -1 marks a variable-length type in the TypeSerializer#getLength contract
+       }
+
+       @Override
+       public void serialize(StringValueArray record, DataOutputView target) 
throws IOException {
+               record.write(target);
+       }
+
+       @Override
+       public StringValueArray deserialize(DataInputView source) throws 
IOException {
+               return deserialize(new StringValueArray(), source); // read into a fresh instance
+       }
+
+       @Override
+       public StringValueArray deserialize(StringValueArray reuse, 
DataInputView source) throws IOException {
+               reuse.read(source); // overwrites the reuse instance, which is also the return value
+               return reuse;
+       }
+
+       @Override
+       public void copy(DataInputView source, DataOutputView target) throws 
IOException {
+               StringValueArray.copyInternal(source, target); // presumably copies the serialized form without building an object - helper is outside this view, confirm
+       }
+
+       @Override
+       public boolean canEqual(Object obj) {
+               return obj instanceof StringValueArraySerializer;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/963f46e7/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ValueArray.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ValueArray.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ValueArray.java
new file mode 100644
index 0000000..6e34b71
--- /dev/null
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ValueArray.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.types.valuearray;
+
+import org.apache.flink.api.common.typeinfo.TypeInfo;
+import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.types.CopyableValue;
+import org.apache.flink.types.NormalizableKey;
+import org.apache.flink.types.ResettableValue;
+import org.apache.flink.types.Value;
+
+import java.io.Serializable;
+
+/**
+ * Basic interface for array types which reuse objects during serialization.
+ *
+ * Value arrays are usable as grouping keys but not sorting keys.
+ *
+ * @param <T> the {@link Value} type
+ */
+@TypeInfo(ValueArrayTypeInfoFactory.class)
+public interface ValueArray<T>
+extends Iterable<T>, IOReadableWritable, Serializable, 
NormalizableKey<ValueArray<T>>, ResettableValue<ValueArray<T>>, 
CopyableValue<ValueArray<T>> {
+
+       /**
+        * Returns the number of elements stored in the array.
+        *
+        * @return the number of elements stored in the array
+        */
+       int size();
+
+       /**
+        * A bounded array fills when the allocated capacity has been fully 
used.
+        * An unbounded array will only fill when the underlying data structure 
has
+        * reached capacity, for example the ~2^31 element limit for Java 
arrays.
+        *
+        * @return whether the array is full
+        */
+       boolean isFull();
+
+       /**
+        * Appends the value to this array if and only if the array capacity 
would
+        * not be exceeded.
+        *
+        * @param value the value to add to this array
+        * @return whether the value was added to the array
+        */
+       boolean add(T value);
+
+       /**
+        * Appends all of the values in the specified array to the end of this
+        * array. If the combined array would exceed capacity then no values are
+        * appended.
+        *
+        * @param source array containing values to be added to this array
+        * @return whether the values were added to the array
+        */
+       boolean addAll(ValueArray<T> source);
+
+       /**
+        * Saves the array index, which can be restored by calling {@code 
reset()}.
+        *
+        * This is not serialized and is not part of the contract for
+        * {@link #equals(Object)}.
+        */
+       void mark();
+
+       /**
+        * Restores the array index to when {@code mark()} was last called.
+        */
+       void reset();
+
+       /**
+        * Resets the array to the empty state. The implementation is *not*
+        * expected to release the underlying data structure. This allows the 
array
+        * to be reused with minimal impact on the garbage collector.
+        *
+        * This may reset the {@link #mark()} in order to allow arrays to be 
shrunk.
+        */
+       void clear();
+}

Reply via email to