http://git-wip-us.apache.org/repos/asf/flink/blob/963f46e7/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ValueArrayFactory.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ValueArrayFactory.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ValueArrayFactory.java
new file mode 100644
index 0000000..b7b6282
--- /dev/null
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ValueArrayFactory.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.types.valuearray;
+
+import org.apache.flink.types.CopyableValue;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.apache.flink.types.StringValue;
+import org.apache.flink.types.Value;
+
+/**
+ * Creates {@link ValueArray} instances for a given {@link Value} class.
+ *
+ * <p>The contained {@link Value} types do not currently implement a common
+ * interface for creating a {@link ValueArray}, so this factory dispatches on
+ * the class object instead. Algorithms use it to instantiate arrays at
+ * runtime when the type information has been erased.
+ *
+ * <p>This mirrors creating {@link Value} using {@link CopyableValue#copy()}.
+ */
+public class ValueArrayFactory {
+
+       /**
+        * Produce an unbounded {@code ValueArray} for the given {@code Value} type.
+        *
+        * @param cls {@code Value} class
+        * @param <T> element type expected by the caller; the cast is unchecked
+        * @return {@code ValueArray} for given {@code Value} class
+        * @throws IllegalArgumentException if {@code cls} is not assignable to
+        *         {@code IntValue}, {@code LongValue}, {@code NullValue}, or
+        *         {@code StringValue}
+        */
+       @SuppressWarnings("unchecked")
+       public static <T> ValueArray<T> createValueArray(Class<? extends Value> cls) {
+               if (IntValue.class.isAssignableFrom(cls)) {
+                       return (ValueArray<T>) new IntValueArray();
+               }
+               if (LongValue.class.isAssignableFrom(cls)) {
+                       return (ValueArray<T>) new LongValueArray();
+               }
+               if (NullValue.class.isAssignableFrom(cls)) {
+                       return (ValueArray<T>) new NullValueArray();
+               }
+               if (StringValue.class.isAssignableFrom(cls)) {
+                       return (ValueArray<T>) new StringValueArray();
+               }
+
+               // none of the supported element types matched
+               throw new IllegalArgumentException("Unable to create unbounded ValueArray for type " + cls);
+       }
+
+       /**
+        * Produce a {@code ValueArray} for the given {@code Value} type with the
+        * given bounded size.
+        *
+        * @param cls {@code Value} class
+        * @param bytes limit the array to the given number of bytes
+        * @param <T> element type expected by the caller; the cast is unchecked
+        * @return {@code ValueArray} for given {@code Value} class
+        * @throws IllegalArgumentException if {@code cls} is not assignable to
+        *         {@code IntValue}, {@code LongValue}, {@code NullValue}, or
+        *         {@code StringValue}
+        */
+       @SuppressWarnings("unchecked")
+       public static <T> ValueArray<T> createValueArray(Class<? extends Value> cls, int bytes) {
+               if (IntValue.class.isAssignableFrom(cls)) {
+                       return (ValueArray<T>) new IntValueArray(bytes);
+               }
+               if (LongValue.class.isAssignableFrom(cls)) {
+                       return (ValueArray<T>) new LongValueArray(bytes);
+               }
+               if (NullValue.class.isAssignableFrom(cls)) {
+                       return (ValueArray<T>) new NullValueArray(bytes);
+               }
+               if (StringValue.class.isAssignableFrom(cls)) {
+                       return (ValueArray<T>) new StringValueArray(bytes);
+               }
+
+               // none of the supported element types matched
+               throw new IllegalArgumentException("Unable to create bounded ValueArray for type " + cls);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/963f46e7/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ValueArrayTypeInfo.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ValueArrayTypeInfo.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ValueArrayTypeInfo.java
new file mode 100644
index 0000000..ee9b770
--- /dev/null
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ValueArrayTypeInfo.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.types.valuearray;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.common.typeinfo.AtomicType;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.ValueTypeInfo;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.apache.flink.types.StringValue;
+import org.apache.flink.types.Value;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A {@link TypeInformation} for the {@link ValueArray} type.
+ *
+ * <p>Serializers and comparators are chosen from the element class, which
+ * must be assignable to {@link IntValue}, {@link LongValue},
+ * {@link NullValue}, or {@link StringValue}.
+ *
+ * @param <T> the {@link Value} type
+ */
+public class ValueArrayTypeInfo<T> extends TypeInformation<ValueArray<T>> implements AtomicType<ValueArray<T>> {
+
+       private static final long serialVersionUID = 1L;
+
+       public static final ValueArrayTypeInfo<IntValue> INT_VALUE_ARRAY_TYPE_INFO =
+               new ValueArrayTypeInfo<>(ValueTypeInfo.INT_VALUE_TYPE_INFO);
+
+       public static final ValueArrayTypeInfo<LongValue> LONG_VALUE_ARRAY_TYPE_INFO =
+               new ValueArrayTypeInfo<>(ValueTypeInfo.LONG_VALUE_TYPE_INFO);
+
+       public static final ValueArrayTypeInfo<NullValue> NULL_VALUE_ARRAY_TYPE_INFO =
+               new ValueArrayTypeInfo<>(ValueTypeInfo.NULL_VALUE_TYPE_INFO);
+
+       public static final ValueArrayTypeInfo<StringValue> STRING_VALUE_ARRAY_TYPE_INFO =
+               new ValueArrayTypeInfo<>(ValueTypeInfo.STRING_VALUE_TYPE_INFO);
+
+       /** Type information of the array elements. */
+       private final TypeInformation<T> valueType;
+
+       /** Element class, taken from {@link #valueType} at construction. */
+       private final Class<T> type;
+
+       /**
+        * Creates type information for arrays holding the given element type.
+        *
+        * @param valueType type information of the array elements
+        */
+       public ValueArrayTypeInfo(TypeInformation<T> valueType) {
+               this.valueType = valueType;
+               this.type = valueType.getTypeClass();
+       }
+
+       @Override
+       public int getArity() {
+               return 1;
+       }
+
+       @Override
+       public int getTotalFields() {
+               return 1;
+       }
+
+       @Override
+       @SuppressWarnings("unchecked")
+       public Class<ValueArray<T>> getTypeClass() {
+               // go through Class<?> because the class literal carries no type argument
+               Class<?> arrayClass = ValueArray.class;
+               return (Class<ValueArray<T>>) arrayClass;
+       }
+
+       @Override
+       public boolean isBasicType() {
+               return false;
+       }
+
+       @Override
+       public boolean isTupleType() {
+               return false;
+       }
+
+       /**
+        * The array is usable as a key when the element class is {@link Comparable}.
+        */
+       @Override
+       public boolean isKeyType() {
+               return Comparable.class.isAssignableFrom(type);
+       }
+
+       /**
+        * Selects the serializer matching the element class.
+        *
+        * @throws InvalidTypesException if no array type exists for the element class
+        */
+       @Override
+       @SuppressWarnings("unchecked")
+       public TypeSerializer<ValueArray<T>> createSerializer(ExecutionConfig executionConfig) {
+               final TypeSerializer<?> serializer;
+
+               if (IntValue.class.isAssignableFrom(type)) {
+                       serializer = new IntValueArraySerializer();
+               } else if (LongValue.class.isAssignableFrom(type)) {
+                       serializer = new LongValueArraySerializer();
+               } else if (NullValue.class.isAssignableFrom(type)) {
+                       serializer = new NullValueArraySerializer();
+               } else if (StringValue.class.isAssignableFrom(type)) {
+                       serializer = new StringValueArraySerializer();
+               } else {
+                       throw new InvalidTypesException("No ValueArray class exists for " + type);
+               }
+
+               return (TypeSerializer<ValueArray<T>>) serializer;
+       }
+
+       /**
+        * Selects the comparator matching the element class.
+        *
+        * @throws InvalidTypesException if no array type exists for the element class
+        */
+       @Override
+       @SuppressWarnings({ "unchecked", "rawtypes" })
+       public TypeComparator<ValueArray<T>> createComparator(boolean sortOrderAscending, ExecutionConfig executionConfig) {
+               final TypeComparator<?> comparator;
+
+               if (IntValue.class.isAssignableFrom(type)) {
+                       comparator = new IntValueArrayComparator(sortOrderAscending);
+               } else if (LongValue.class.isAssignableFrom(type)) {
+                       comparator = new LongValueArrayComparator(sortOrderAscending);
+               } else if (NullValue.class.isAssignableFrom(type)) {
+                       comparator = new NullValueArrayComparator(sortOrderAscending);
+               } else if (StringValue.class.isAssignableFrom(type)) {
+                       comparator = new StringValueArrayComparator(sortOrderAscending);
+               } else {
+                       throw new InvalidTypesException("No ValueArray class exists for " + type);
+               }
+
+               return (TypeComparator<ValueArray<T>>) comparator;
+       }
+
+       /**
+        * Exposes the element type information under the parameter name {@code "T"}.
+        */
+       @Override
+       public Map<String, TypeInformation<?>> getGenericParameters() {
+               Map<String, TypeInformation<?>> parameters = new HashMap<>(1);
+               parameters.put("T", valueType);
+               return parameters;
+       }
+
+       // --------------------------------------------------------------------------------------------
+
+       @Override
+       public int hashCode() {
+               return type.hashCode();
+       }
+
+       /**
+        * Two instances are equal when they describe the same element class.
+        */
+       @Override
+       public boolean equals(Object obj) {
+               if (!(obj instanceof ValueArrayTypeInfo)) {
+                       return false;
+               }
+
+               ValueArrayTypeInfo<?> other = (ValueArrayTypeInfo<?>) obj;
+               return other.canEqual(this) && type == other.type;
+       }
+
+       @Override
+       public boolean canEqual(Object obj) {
+               return obj instanceof ValueArrayTypeInfo;
+       }
+
+       @Override
+       public String toString() {
+               return "ValueArrayType<" + type.getSimpleName() + ">";
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/963f46e7/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ValueArrayTypeInfoFactory.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ValueArrayTypeInfoFactory.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ValueArrayTypeInfoFactory.java
new file mode 100644
index 0000000..2145c3d
--- /dev/null
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ValueArrayTypeInfoFactory.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.types.valuearray;
+
+import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.types.Value;
+
+import java.lang.reflect.Type;
+import java.util.Map;
+
+/**
+ * Used by {@link TypeExtractor} to create a {@link TypeInformation} for
+ * implementations of {@link ValueArray}.
+ *
+ * @param <T> the {@link Value} type
+ */
+public class ValueArrayTypeInfoFactory<T> extends TypeInfoFactory<ValueArray<T>> {
+
+       /**
+        * Builds a {@link ValueArrayTypeInfo} from the type information registered
+        * under the generic parameter name {@code "T"}.
+        *
+        * @param t the type being analyzed; not consulted by this factory
+        * @param genericParameters type information keyed by generic parameter name
+        * @return type information for a {@code ValueArray} of the element type
+        */
+       @Override
+       @SuppressWarnings("unchecked")
+       public TypeInformation<ValueArray<T>> createTypeInfo(Type t, Map<String, TypeInformation<?>> genericParameters) {
+               // Narrowing the wildcard entry is the single unchecked step; doing it
+               // explicitly avoids constructing ValueArrayTypeInfo as a raw type.
+               TypeInformation<T> valueType = (TypeInformation<T>) genericParameters.get("T");
+               return new ValueArrayTypeInfo<>(valueType);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/963f46e7/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/IntValueArrayComparatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/IntValueArrayComparatorTest.java
 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/IntValueArrayComparatorTest.java
new file mode 100644
index 0000000..bbc6846
--- /dev/null
+++ 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/IntValueArrayComparatorTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.types.valuearray;
+
+import org.apache.flink.api.common.typeutils.ComparatorTestBase;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.types.IntValue;
+
+/**
+ * A test for the {@link IntValueArrayComparator}.
+ */
+public class IntValueArrayComparatorTest extends ComparatorTestBase<IntValueArray> {
+
+       @Override
+       protected TypeComparator<IntValueArray> createComparator(boolean ascending) {
+               return new IntValueArrayComparator(ascending);
+       }
+
+       @Override
+       protected TypeSerializer<IntValueArray> createSerializer() {
+               return new IntValueArraySerializer();
+       }
+
+       /**
+        * Supplies an empty array, a one-element array, and a two-element array
+        * whose first element equals that of the one-element array.
+        */
+       @Override
+       protected IntValueArray[] getSortedTestData() {
+               return new IntValueArray[]{ arrayOf(), arrayOf(5), arrayOf(5, 10) };
+       }
+
+       /** Builds an unbounded array holding the given values in order. */
+       private static IntValueArray arrayOf(int... values) {
+               IntValueArray array = new IntValueArray();
+               for (int value : values) {
+                       array.add(new IntValue(value));
+               }
+               return array;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/963f46e7/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/IntValueArraySerializerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/IntValueArraySerializerTest.java
 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/IntValueArraySerializerTest.java
new file mode 100644
index 0000000..1ee24c9
--- /dev/null
+++ 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/IntValueArraySerializerTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.types.valuearray;
+
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.types.IntValue;
+
+import java.util.Random;
+
+/**
+ * A test for the {@link IntValueArraySerializer}.
+ */
+public class IntValueArraySerializerTest extends SerializerTestBase<IntValueArray> {
+
+       @Override
+       protected TypeSerializer<IntValueArray> createSerializer() {
+               return new IntValueArraySerializer();
+       }
+
+       @Override
+       protected int getLength() {
+               return -1;
+       }
+
+       @Override
+       protected Class<IntValueArray> getTypeClass() {
+               return IntValueArray.class;
+       }
+
+       /**
+        * Builds a chain of arrays in which each array holds every element of
+        * its predecessor plus one more value, followed by one large array.
+        */
+       @Override
+       protected IntValueArray[] getTestData() {
+               int defaultElements = IntValueArray.DEFAULT_CAPACITY_IN_BYTES / IntValueArray.ELEMENT_LENGTH_IN_BYTES;
+
+               // fixed seed keeps the generated value reproducible
+               Random rnd = new Random(874597969123412341L);
+               int rndInt = rnd.nextInt();
+
+               // value appended at each step of the chain
+               int[] appended = {0, 1, -1, Integer.MAX_VALUE, Integer.MIN_VALUE, rndInt, -rndInt};
+
+               IntValueArray[] testData = new IntValueArray[appended.length + 2];
+
+               // the chain starts from an empty array
+               testData[0] = new IntValueArray();
+
+               for (int step = 0 ; step < appended.length ; step++) {
+                       IntValueArray next = new IntValueArray();
+                       next.addAll(testData[step]);
+                       next.add(new IntValue(appended[step]));
+                       testData[step + 1] = next;
+               }
+
+               // add 1.5 times the default element count, then append the array to itself
+               IntValueArray large = new IntValueArray();
+               large.addAll(testData[appended.length]);
+               for (int i = 0 ; i < 1.5 * defaultElements ; i++) {
+                       large.add(new IntValue(i));
+               }
+               large.addAll(large);
+               testData[appended.length + 1] = large;
+
+               return testData;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/963f46e7/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/IntValueArrayTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/IntValueArrayTest.java
 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/IntValueArrayTest.java
new file mode 100644
index 0000000..2e1282a
--- /dev/null
+++ 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/IntValueArrayTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.types.valuearray;
+
+import org.apache.flink.types.IntValue;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link IntValueArray}.
+ */
+public class IntValueArrayTest {
+
+       /**
+        * Fills an array bounded to the default capacity and checks that it
+        * rejects further elements.
+        */
+       @Test
+       public void testBoundedArray() {
+               int capacity = IntValueArray.DEFAULT_CAPACITY_IN_BYTES / IntValueArray.ELEMENT_LENGTH_IN_BYTES;
+
+               ValueArray<IntValue> array = new IntValueArray(IntValueArray.DEFAULT_CAPACITY_IN_BYTES);
+
+               // add elements one at a time until the capacity is reached
+               int added = 0;
+               while (added < capacity) {
+                       assertFalse(array.isFull());
+                       assertEquals(added, array.size());
+
+                       assertTrue(array.add(new IntValue(added)));
+
+                       assertEquals(added + 1, array.size());
+                       added++;
+               }
+
+               // the array reports full at exactly the computed capacity
+               assertTrue(array.isFull());
+               assertEquals(capacity, array.size());
+
+               // iteration yields the values in insertion order
+               int expected = 0;
+               for (IntValue element : array) {
+                       assertEquals(expected, element.getValue());
+                       expected++;
+               }
+
+               // further additions are rejected
+               assertFalse(array.add(new IntValue(capacity)));
+               assertFalse(array.addAll(array));
+
+               // a copy equals the original
+               assertEquals(array, array.copy());
+
+               // copying into an existing array makes it equal to the original
+               IntValueArray target = new IntValueArray();
+               array.copyTo(target);
+               assertEquals(array, target);
+
+               // clearing removes all elements
+               array.clear();
+               assertEquals(0, array.size());
+       }
+
+       /**
+        * Adds elements to an array created without a size bound and checks
+        * that it keeps accepting them.
+        */
+       @Test
+       public void testUnboundedArray() {
+               int count = 4096;
+
+               ValueArray<IntValue> array = new IntValueArray();
+
+               // add elements one at a time
+               int added = 0;
+               while (added < count) {
+                       assertFalse(array.isFull());
+                       assertEquals(added, array.size());
+
+                       assertTrue(array.add(new IntValue(added)));
+
+                       assertEquals(added + 1, array.size());
+                       added++;
+               }
+
+               // the array never reports full
+               assertFalse(array.isFull());
+               assertEquals(count, array.size());
+
+               // iteration yields the values in insertion order
+               int expected = 0;
+               for (IntValue element : array) {
+                       assertEquals(expected, element.getValue());
+                       expected++;
+               }
+
+               // further additions are still accepted
+               assertTrue(array.add(new IntValue(count)));
+               assertTrue(array.addAll(array));
+
+               // a copy equals the original
+               assertEquals(array, array.copy());
+
+               // copying into an existing array makes it equal to the original
+               IntValueArray target = new IntValueArray();
+               array.copyTo(target);
+               assertEquals(array, target);
+
+               // reset discards the element added after mark
+               int sizeAtMark = array.size();
+               array.mark();
+               assertTrue(array.add(new IntValue()));
+               assertEquals(sizeAtMark + 1, array.size());
+               array.reset();
+               assertEquals(sizeAtMark, array.size());
+
+               // clearing removes all elements
+               array.clear();
+               assertEquals(0, array.size());
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/963f46e7/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/LongValueArrayComparatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/LongValueArrayComparatorTest.java
 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/LongValueArrayComparatorTest.java
new file mode 100644
index 0000000..af9dbdb
--- /dev/null
+++ 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/LongValueArrayComparatorTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.types.valuearray;
+
+import org.apache.flink.api.common.typeutils.ComparatorTestBase;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.types.LongValue;
+
+/**
+ * A test for the {@link LongValueArrayComparator}.
+ */
+public class LongValueArrayComparatorTest extends ComparatorTestBase<LongValueArray> {
+
+       @Override
+       protected TypeComparator<LongValueArray> createComparator(boolean ascending) {
+               return new LongValueArrayComparator(ascending);
+       }
+
+       @Override
+       protected TypeSerializer<LongValueArray> createSerializer() {
+               return new LongValueArraySerializer();
+       }
+
+       /**
+        * Supplies an empty array, a one-element array, and a two-element array
+        * whose elements are both larger than the single element.
+        */
+       @Override
+       protected LongValueArray[] getSortedTestData() {
+               return new LongValueArray[]{ arrayOf(), arrayOf(5), arrayOf(50, 100) };
+       }
+
+       /** Builds an unbounded array holding the given values in order. */
+       private static LongValueArray arrayOf(long... values) {
+               LongValueArray array = new LongValueArray();
+               for (long value : values) {
+                       array.add(new LongValue(value));
+               }
+               return array;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/963f46e7/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/LongValueArraySerializerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/LongValueArraySerializerTest.java
 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/LongValueArraySerializerTest.java
new file mode 100644
index 0000000..1cd0a6c
--- /dev/null
+++ 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/LongValueArraySerializerTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.types.valuearray;
+
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.types.LongValue;
+
+import java.util.Random;
+
+/**
+ * A test for the {@link LongValueArraySerializer}.
+ */
+public class LongValueArraySerializerTest extends SerializerTestBase<LongValueArray> {
+
+       @Override
+       protected TypeSerializer<LongValueArray> createSerializer() {
+               return new LongValueArraySerializer();
+       }
+
+       @Override
+       protected int getLength() {
+               return -1;
+       }
+
+       @Override
+       protected Class<LongValueArray> getTypeClass() {
+               return LongValueArray.class;
+       }
+
+       /**
+        * Builds a chain of arrays in which each array holds every element of
+        * its predecessor plus one more value, followed by one large array.
+        */
+       @Override
+       protected LongValueArray[] getTestData() {
+               int defaultElements = LongValueArray.DEFAULT_CAPACITY_IN_BYTES / LongValueArray.ELEMENT_LENGTH_IN_BYTES;
+
+               // fixed seed keeps the generated value reproducible
+               Random rnd = new Random(874597969123412341L);
+               long rndLong = rnd.nextLong();
+
+               // value appended at each step of the chain
+               long[] appended = {0, 1, -1, Long.MAX_VALUE, Long.MIN_VALUE, rndLong, -rndLong};
+
+               LongValueArray[] testData = new LongValueArray[appended.length + 2];
+
+               // the chain starts from an empty array
+               testData[0] = new LongValueArray();
+
+               for (int step = 0 ; step < appended.length ; step++) {
+                       LongValueArray next = new LongValueArray();
+                       next.addAll(testData[step]);
+                       next.add(new LongValue(appended[step]));
+                       testData[step + 1] = next;
+               }
+
+               // add 1.5 times the default element count, then append the array to itself
+               LongValueArray large = new LongValueArray();
+               large.addAll(testData[appended.length]);
+               for (int i = 0 ; i < 1.5 * defaultElements ; i++) {
+                       large.add(new LongValue(i));
+               }
+               large.addAll(large);
+               testData[appended.length + 1] = large;
+
+               return testData;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/963f46e7/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/LongValueArrayTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/LongValueArrayTest.java
 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/LongValueArrayTest.java
new file mode 100644
index 0000000..cfc345e
--- /dev/null
+++ 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/LongValueArrayTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.types.valuearray;
+
+import org.apache.flink.types.LongValue;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link LongValueArray}.
+ */
+public class LongValueArrayTest {
+
+       /**
+        * Fills an array constructed with the default byte capacity until it reports
+        * full, then checks rejection of further additions, copy, copyTo and clear.
+        */
+       @Test
+       public void testBoundedArray() {
+               final int capacity = LongValueArray.DEFAULT_CAPACITY_IN_BYTES / LongValueArray.ELEMENT_LENGTH_IN_BYTES;
+
+               ValueArray<LongValue> array = new LongValueArray(LongValueArray.DEFAULT_CAPACITY_IN_BYTES);
+
+               // each add below the capacity must succeed and grow the size by one
+               int added = 0;
+               while (added < capacity) {
+                       assertFalse(array.isFull());
+                       assertEquals(added, array.size());
+
+                       assertTrue(array.add(new LongValue(added)));
+                       added++;
+
+                       assertEquals(added, array.size());
+               }
+
+               // the array reports full once the capacity is reached
+               assertTrue(array.isFull());
+               assertEquals(capacity, array.size());
+
+               // iteration yields the values in insertion order
+               int expected = 0;
+               for (LongValue element : array) {
+                       assertEquals(expected, element.getValue());
+                       expected++;
+               }
+
+               // a full array rejects both single and bulk additions
+               assertFalse(array.add(new LongValue(capacity)));
+               assertFalse(array.addAll(array));
+
+               // a copy equals its source
+               assertEquals(array, array.copy());
+
+               // copyTo makes the target equal to the source
+               LongValueArray target = new LongValueArray();
+               array.copyTo(target);
+               assertEquals(array, target);
+
+               // clear removes all elements
+               array.clear();
+               assertEquals(0, array.size());
+       }
+
+       /**
+        * Adds elements to an array constructed without a capacity and checks that it
+        * never reports full, then exercises copy, copyTo, mark/reset and clear.
+        */
+       @Test
+       public void testUnboundedArray() {
+               final int total = 4096;
+
+               ValueArray<LongValue> array = new LongValueArray();
+
+               // each add must succeed and grow the size by one
+               int added = 0;
+               while (added < total) {
+                       assertFalse(array.isFull());
+                       assertEquals(added, array.size());
+
+                       assertTrue(array.add(new LongValue(added)));
+                       added++;
+
+                       assertEquals(added, array.size());
+               }
+
+               // the array still does not report full
+               assertFalse(array.isFull());
+               assertEquals(total, array.size());
+
+               // iteration yields the values in insertion order
+               int expected = 0;
+               for (LongValue element : array) {
+                       assertEquals(expected, element.getValue());
+                       expected++;
+               }
+
+               // further single and bulk additions are accepted
+               assertTrue(array.add(new LongValue(total)));
+               assertTrue(array.addAll(array));
+
+               // a copy equals its source
+               assertEquals(array, array.copy());
+
+               // copyTo makes the target equal to the source
+               LongValueArray target = new LongValueArray();
+               array.copyTo(target);
+               assertEquals(array, target);
+
+               // reset restores the size recorded by mark
+               final int sizeAtMark = array.size();
+               array.mark();
+               assertTrue(array.add(new LongValue()));
+               assertEquals(sizeAtMark + 1, array.size());
+               array.reset();
+               assertEquals(sizeAtMark, array.size());
+
+               // clear removes all elements
+               array.clear();
+               assertEquals(0, array.size());
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/963f46e7/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/NullValueArrayComparatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/NullValueArrayComparatorTest.java
 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/NullValueArrayComparatorTest.java
new file mode 100644
index 0000000..10fa2e2
--- /dev/null
+++ 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/NullValueArrayComparatorTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.types.valuearray;
+
+import org.apache.flink.api.common.typeutils.ComparatorTestBase;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.types.NullValue;
+
+/**
+ * Runs the {@link ComparatorTestBase} checks against {@link NullValueArrayComparator}.
+ */
+public class NullValueArrayComparatorTest extends ComparatorTestBase<NullValueArray> {
+
+       @Override
+       protected TypeComparator<NullValueArray> createComparator(boolean ascending) {
+               return new NullValueArrayComparator(ascending);
+       }
+
+       @Override
+       protected TypeSerializer<NullValueArray> createSerializer() {
+               return new NullValueArraySerializer();
+       }
+
+       /**
+        * Returns the test arrays: an empty array followed by a single-element array.
+        */
+       @Override
+       protected NullValueArray[] getSortedTestData() {
+               NullValueArray nva0 = new NullValueArray();
+
+               NullValueArray nva1 = new NullValueArray();
+               nva1.add(NullValue.getInstance());
+
+               return new NullValueArray[]{ nva0, nva1 };
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/963f46e7/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/NullValueArraySerializerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/NullValueArraySerializerTest.java
 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/NullValueArraySerializerTest.java
new file mode 100644
index 0000000..2253a42
--- /dev/null
+++ 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/NullValueArraySerializerTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.types.valuearray;
+
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.types.NullValue;
+
+/**
+ * A test for the {@link NullValueArraySerializer}.
+ */
+public class NullValueArraySerializerTest extends 
SerializerTestBase<NullValueArray> {
+
+       @Override
+       protected TypeSerializer<NullValueArray> createSerializer() {
+               return new NullValueArraySerializer();
+       }
+
+       @Override
+       protected int getLength() {
+               return 4;
+       }
+
+       @Override
+       protected Class<NullValueArray> getTypeClass() {
+               return NullValueArray.class;
+       }
+
+       @Override
+       protected NullValueArray[] getTestData() {
+               NullValue nv = NullValue.getInstance();
+
+               NullValueArray nva0 = new NullValueArray();
+
+               NullValueArray nva1 = new NullValueArray();
+               nva1.addAll(nva0);
+               nva1.add(nv);
+
+               NullValueArray nva2 = new NullValueArray();
+               nva2.addAll(nva1);
+               nva2.add(nv);
+
+               NullValueArray nva3 = new NullValueArray();
+               nva3.addAll(nva2);
+               for (int i = 0 ; i < 100 ; i++) {
+                       nva3.add(nv);
+               }
+               nva3.addAll(nva3);
+
+               return new NullValueArray[] {nva0, nva1, nva2, nva3};
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/963f46e7/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/NullValueArrayTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/NullValueArrayTest.java
 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/NullValueArrayTest.java
new file mode 100644
index 0000000..6d013a1
--- /dev/null
+++ 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/NullValueArrayTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.types.valuearray;
+
+import org.apache.flink.types.NullValue;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link NullValueArray}.
+ */
+public class NullValueArrayTest {
+
+       /**
+        * Adds elements to an array constructed without a capacity and checks that it
+        * never reports full, then exercises copy, copyTo, mark/reset and clear.
+        */
+       @Test
+       public void testUnboundedArray() {
+               int count = 4096;
+
+               ValueArray<NullValue> nva = new NullValueArray();
+
+               // add several elements
+               for (int i = 0 ; i < count ; i++) {
+                       assertFalse(nva.isFull());
+                       assertEquals(i, nva.size());
+
+                       assertTrue(nva.add(NullValue.getInstance()));
+
+                       assertEquals(i + 1, nva.size());
+               }
+
+               // array never fills
+               assertFalse(nva.isFull());
+               assertEquals(count, nva.size());
+
+               // verify the array values; every element is the NullValue singleton,
+               // so no position counter is needed
+               for (NullValue nv : nva) {
+                       assertEquals(NullValue.getInstance(), nv);
+               }
+
+               // add element past end of array
+               assertTrue(nva.add(NullValue.getInstance()));
+               assertTrue(nva.addAll(nva));
+
+               // test copy
+               assertEquals(nva, nva.copy());
+
+               // test copyTo
+               NullValueArray nvaTo = new NullValueArray();
+               nva.copyTo(nvaTo);
+               assertEquals(nva, nvaTo);
+
+               // test mark/reset: reset restores the size recorded by mark
+               int size = nva.size();
+               nva.mark();
+               assertTrue(nva.add(NullValue.getInstance()));
+               assertEquals(size + 1, nva.size());
+               nva.reset();
+               assertEquals(size, nva.size());
+
+               // test clear
+               nva.clear();
+               assertEquals(0, nva.size());
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/963f46e7/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/StringValueArrayComparatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/StringValueArrayComparatorTest.java
 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/StringValueArrayComparatorTest.java
new file mode 100644
index 0000000..e7cc102
--- /dev/null
+++ 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/StringValueArrayComparatorTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.types.valuearray;
+
+import org.apache.flink.api.common.typeutils.ComparatorTestBase;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.types.StringValue;
+
+public class StringValueArrayComparatorTest extends 
ComparatorTestBase<StringValueArray> {
+
+       @Override
+       protected TypeComparator<StringValueArray> createComparator(boolean 
ascending) {
+               return new StringValueArrayComparator(ascending);
+       }
+
+       @Override
+       protected TypeSerializer<StringValueArray> createSerializer() {
+               return new StringValueArraySerializer();
+       }
+
+       @Override
+       protected StringValueArray[] getSortedTestData() {
+               StringValueArray sva0 = new StringValueArray();
+
+               StringValueArray sva1 = new StringValueArray();
+               sva1.add(new StringValue("abc"));
+
+               StringValueArray sva2 = new StringValueArray();
+               sva2.add(new StringValue("qrs"));
+               sva2.add(new StringValue("xyz"));
+
+               return new StringValueArray[]{ sva0, sva1, sva2 };
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/963f46e7/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/StringValueArraySerializerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/StringValueArraySerializerTest.java
 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/StringValueArraySerializerTest.java
new file mode 100644
index 0000000..f5909da
--- /dev/null
+++ 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/StringValueArraySerializerTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.types.valuearray;
+
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.types.StringValue;
+
+import java.util.Random;
+
+/**
+ * Runs the {@link SerializerTestBase} checks against {@link StringValueArraySerializer}.
+ */
+public class StringValueArraySerializerTest extends SerializerTestBase<StringValueArray> {
+
+       @Override
+       protected TypeSerializer<StringValueArray> createSerializer() {
+               return new StringValueArraySerializer();
+       }
+
+       /**
+        * Returns -1 as the serialized length reported to the base test.
+        */
+       @Override
+       protected int getLength() {
+               return -1;
+       }
+
+       @Override
+       protected Class<StringValueArray> getTypeClass() {
+               return StringValueArray.class;
+       }
+
+       /**
+        * Builds arrays of increasing size, each starting from the contents of the
+        * previous one and appending the decimal string of one more number; the last
+        * array receives a long run of values and is then appended to itself.
+        */
+       @Override
+       protected StringValueArray[] getTestData() {
+               final int capacityInBytes = StringValueArray.DEFAULT_CAPACITY_IN_BYTES;
+
+               final long rndLong = new Random(874597969123412341L).nextLong();
+
+               StringValueArray sva0 = new StringValueArray();
+               StringValueArray sva1 = extendWith(sva0, 0);
+               StringValueArray sva2 = extendWith(sva1, 1);
+               StringValueArray sva3 = extendWith(sva2, -1);
+               StringValueArray sva4 = extendWith(sva3, Long.MAX_VALUE);
+               StringValueArray sva5 = extendWith(sva4, Long.MIN_VALUE);
+               StringValueArray sva6 = extendWith(sva5, rndLong);
+               StringValueArray sva7 = extendWith(sva6, -rndLong);
+
+               StringValueArray sva8 = new StringValueArray();
+               sva8.addAll(sva7);
+               int next = 0;
+               while (next < 1.5 * capacityInBytes) {
+                       sva8.add(new StringValue(String.valueOf(next)));
+                       next++;
+               }
+               // append the array to itself
+               sva8.addAll(sva8);
+
+               return new StringValueArray[] {sva0, sva1, sva2, sva3, sva4, sva5, sva6, sva7, sva8};
+       }
+
+       /**
+        * Creates a new array holding the elements of {@code base} followed by the
+        * decimal string form of {@code value}.
+        */
+       private static StringValueArray extendWith(StringValueArray base, long value) {
+               StringValueArray extended = new StringValueArray();
+               extended.addAll(base);
+               extended.add(new StringValue(String.valueOf(value)));
+               return extended;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/963f46e7/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/StringValueArrayTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/StringValueArrayTest.java
 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/StringValueArrayTest.java
new file mode 100644
index 0000000..a425e8e
--- /dev/null
+++ 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/StringValueArrayTest.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.types.valuearray;
+
+import org.apache.flink.types.StringValue;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link StringValueArray}.
+ */
+public class StringValueArrayTest {
+
+       /**
+        * Fills an array constructed with the default byte capacity using
+        * single-character strings below 0x80, then checks rejection of further
+        * additions, copy, copyTo and clear.
+        */
+       @Test
+       public void testBoundedArray() {
+               // one byte for length and one byte for character
+               final int capacity = StringValueArray.DEFAULT_CAPACITY_IN_BYTES / 2;
+
+               ValueArray<StringValue> array = new StringValueArray(StringValueArray.DEFAULT_CAPACITY_IN_BYTES);
+
+               // each add below the capacity must succeed and grow the size by one
+               int added = 0;
+               while (added < capacity) {
+                       assertFalse(array.isFull());
+                       assertEquals(added, array.size());
+
+                       char symbol = (char) (added & 0x7F);
+                       assertTrue(array.add(new StringValue(Character.toString(symbol))));
+                       added++;
+
+                       assertEquals(added, array.size());
+               }
+
+               // the array reports full once the capacity is reached
+               assertTrue(array.isFull());
+               assertEquals(capacity, array.size());
+
+               // iteration yields the characters in insertion order
+               int position = 0;
+               for (StringValue element : array) {
+                       assertEquals(position & 0x7F, element.getValue().charAt(0));
+                       position++;
+               }
+
+               // a full array rejects both single and bulk additions
+               assertFalse(array.add(new StringValue(String.valueOf((char) capacity))));
+               assertFalse(array.addAll(array));
+
+               assertCopiesEqual(array);
+
+               // clear removes all elements
+               array.clear();
+               assertEquals(0, array.size());
+       }
+
+       /**
+        * Fills an array of 3200 bytes with 1280 single-character strings cycling
+        * through the characters 0x00 to 0xFF, then checks rejection of further
+        * additions, copy, copyTo and clear.
+        */
+       @Test
+       public void testBoundedArrayWithVariableLengthCharacters() {
+               // characters alternatingly take 1 and 2 bytes (plus one byte for length)
+               final int capacity = 1280;
+
+               ValueArray<StringValue> array = new StringValueArray(3200);
+
+               // each add below the capacity must succeed and grow the size by one
+               int added = 0;
+               while (added < capacity) {
+                       assertFalse(array.isFull());
+                       assertEquals(added, array.size());
+
+                       char symbol = (char) (added & 0xFF);
+                       assertTrue(array.add(new StringValue(Character.toString(symbol))));
+                       added++;
+
+                       assertEquals(added, array.size());
+               }
+
+               // the array reports full once the capacity is reached
+               assertTrue(array.isFull());
+               assertEquals(capacity, array.size());
+
+               // iteration yields the characters in insertion order
+               int position = 0;
+               for (StringValue element : array) {
+                       assertEquals(position & 0xFF, element.getValue().charAt(0));
+                       position++;
+               }
+
+               // a full array rejects both single and bulk additions
+               assertFalse(array.add(new StringValue(String.valueOf((char) capacity))));
+               assertFalse(array.addAll(array));
+
+               assertCopiesEqual(array);
+
+               // clear removes all elements
+               array.clear();
+               assertEquals(0, array.size());
+       }
+
+       /**
+        * Adds elements to an array constructed without a capacity and checks that it
+        * never reports full, then exercises copy, copyTo, mark/reset and clear.
+        */
+       @Test
+       public void testUnboundedArray() {
+               final int total = 4096;
+
+               ValueArray<StringValue> array = new StringValueArray();
+
+               // each add must succeed and grow the size by one
+               int added = 0;
+               while (added < total) {
+                       assertFalse(array.isFull());
+                       assertEquals(added, array.size());
+
+                       assertTrue(array.add(new StringValue(String.valueOf((char) added))));
+                       added++;
+
+                       assertEquals(added, array.size());
+               }
+
+               // the array still does not report full
+               assertFalse(array.isFull());
+               assertEquals(total, array.size());
+
+               // iteration yields the characters in insertion order
+               int position = 0;
+               for (StringValue element : array) {
+                       assertEquals(position, element.getValue().charAt(0));
+                       position++;
+               }
+
+               // further single and bulk additions are accepted
+               assertTrue(array.add(new StringValue(String.valueOf((char) total))));
+               assertTrue(array.addAll(array));
+
+               assertCopiesEqual(array);
+
+               // reset restores the size recorded by mark
+               final int sizeAtMark = array.size();
+               array.mark();
+               assertTrue(array.add(new StringValue()));
+               assertEquals(sizeAtMark + 1, array.size());
+               array.reset();
+               assertEquals(sizeAtMark, array.size());
+
+               // clear removes all elements
+               array.clear();
+               assertEquals(0, array.size());
+       }
+
+       /**
+        * Asserts that both {@code copy()} and {@code copyTo()} produce an array
+        * equal to {@code source}.
+        */
+       private static void assertCopiesEqual(ValueArray<StringValue> source) {
+               // a copy equals its source
+               assertEquals(source, source.copy());
+
+               // copyTo makes the target equal to the source
+               StringValueArray target = new StringValueArray();
+               source.copyTo(target);
+               assertEquals(source, target);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/963f46e7/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/ValueArrayTypeInfoTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/ValueArrayTypeInfoTest.java
 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/ValueArrayTypeInfoTest.java
new file mode 100644
index 0000000..73cecc0
--- /dev/null
+++ 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/ValueArrayTypeInfoTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.types.valuearray;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.java.typeutils.runtime.CopyableValueComparator;
+import org.junit.Test;
+
+import static 
org.apache.flink.graph.types.valuearray.ValueArrayTypeInfo.INT_VALUE_ARRAY_TYPE_INFO;
+import static 
org.apache.flink.graph.types.valuearray.ValueArrayTypeInfo.LONG_VALUE_ARRAY_TYPE_INFO;
+import static 
org.apache.flink.graph.types.valuearray.ValueArrayTypeInfo.NULL_VALUE_ARRAY_TYPE_INFO;
+import static 
org.apache.flink.graph.types.valuearray.ValueArrayTypeInfo.STRING_VALUE_ARRAY_TYPE_INFO;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests that each predefined {@link ValueArrayTypeInfo} constant reports
+ * {@link ValueArray} as its type class and creates the serializer and
+ * comparator classes matching its element type.
+ */
+public class ValueArrayTypeInfoTest {
+
+       // configuration handed to the serializer and comparator factory methods
+       private final ExecutionConfig config = new ExecutionConfig();
+
+       // Note: assertEquals takes (expected, actual); the expected class literal
+       // comes first so that failure messages are reported the right way round.
+
+       @Test
+       public void testIntValueArray() {
+               assertEquals(ValueArray.class, INT_VALUE_ARRAY_TYPE_INFO.getTypeClass());
+               assertEquals(IntValueArraySerializer.class, INT_VALUE_ARRAY_TYPE_INFO.createSerializer(config).getClass());
+               assertEquals(IntValueArrayComparator.class, INT_VALUE_ARRAY_TYPE_INFO.createComparator(true, config).getClass());
+       }
+
+       @Test
+       public void testLongValueArray() {
+               assertEquals(ValueArray.class, LONG_VALUE_ARRAY_TYPE_INFO.getTypeClass());
+               assertEquals(LongValueArraySerializer.class, LONG_VALUE_ARRAY_TYPE_INFO.createSerializer(config).getClass());
+               assertEquals(LongValueArrayComparator.class, LONG_VALUE_ARRAY_TYPE_INFO.createComparator(true, config).getClass());
+       }
+
+       @Test
+       public void testNullValueArray() {
+               assertEquals(ValueArray.class, NULL_VALUE_ARRAY_TYPE_INFO.getTypeClass());
+               assertEquals(NullValueArraySerializer.class, NULL_VALUE_ARRAY_TYPE_INFO.createSerializer(config).getClass());
+               assertEquals(NullValueArrayComparator.class, NULL_VALUE_ARRAY_TYPE_INFO.createComparator(true, config).getClass());
+       }
+
+       @Test
+       public void testStringValueArray() {
+               assertEquals(ValueArray.class, STRING_VALUE_ARRAY_TYPE_INFO.getTypeClass());
+               assertEquals(StringValueArraySerializer.class, STRING_VALUE_ARRAY_TYPE_INFO.createSerializer(config).getClass());
+               assertEquals(StringValueArrayComparator.class, STRING_VALUE_ARRAY_TYPE_INFO.createComparator(true, config).getClass());
+       }
+}

Reply via email to