http://git-wip-us.apache.org/repos/asf/flink/blob/963f46e7/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ValueArrayFactory.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ValueArrayFactory.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ValueArrayFactory.java new file mode 100644 index 0000000..b7b6282 --- /dev/null +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ValueArrayFactory.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.types.valuearray; + +import org.apache.flink.types.CopyableValue; +import org.apache.flink.types.IntValue; +import org.apache.flink.types.LongValue; +import org.apache.flink.types.NullValue; +import org.apache.flink.types.StringValue; +import org.apache.flink.types.Value; + +/** + * A factory generator for {@link ValueArray} types is necessary because the + * contained {@link Value} types do not currently implement a common interface + * for creating a {@link ValueArray}. 
Algorithms must instantiate classes at + * runtime when the type information has been erased. + * + * This mirrors creating {@link Value} using {@link CopyableValue#copy()}. + */ +public class ValueArrayFactory { + + /** + * Produce a {@code ValueArray} for the given {@code Value} type. + * + * @param cls {@code Value} class + * @return {@code ValueArray} for given {@code Value} class + */ + @SuppressWarnings("unchecked") + public static <T> ValueArray<T> createValueArray(Class<? extends Value> cls) { + if (IntValue.class.isAssignableFrom(cls)) { + return (ValueArray<T>) new IntValueArray(); + } else if (LongValue.class.isAssignableFrom(cls)) { + return (ValueArray<T>) new LongValueArray(); + } else if (NullValue.class.isAssignableFrom(cls)) { + return (ValueArray<T>) new NullValueArray(); + } else if (StringValue.class.isAssignableFrom(cls)) { + return (ValueArray<T>) new StringValueArray(); + } else { + throw new IllegalArgumentException("Unable to create unbounded ValueArray for type " + cls); + } + } + + /** + * Produce a {@code ValueArray} for the given {@code Value} type with the + * given bounded size. + * + * @param cls {@code Value} class + * @param bytes limit the array to the given number of bytes + * @return {@code ValueArray} for given {@code Value} class + */ + @SuppressWarnings("unchecked") + public static <T> ValueArray<T> createValueArray(Class<? extends Value> cls, int bytes) { + if (IntValue.class.isAssignableFrom(cls)) { + return (ValueArray<T>) new IntValueArray(bytes); + } else if (LongValue.class.isAssignableFrom(cls)) { + return (ValueArray<T>) new LongValueArray(bytes); + } else if (NullValue.class.isAssignableFrom(cls)) { + return (ValueArray<T>) new NullValueArray(bytes); + } else if (StringValue.class.isAssignableFrom(cls)) { + return (ValueArray<T>) new StringValueArray(bytes); + } else { + throw new IllegalArgumentException("Unable to create bounded ValueArray for type " + cls); + } + } +}
http://git-wip-us.apache.org/repos/asf/flink/blob/963f46e7/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ValueArrayTypeInfo.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ValueArrayTypeInfo.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ValueArrayTypeInfo.java new file mode 100644 index 0000000..ee9b770 --- /dev/null +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ValueArrayTypeInfo.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.graph.types.valuearray; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.functions.InvalidTypesException; +import org.apache.flink.api.common.typeinfo.AtomicType; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.typeutils.ValueTypeInfo; +import org.apache.flink.types.IntValue; +import org.apache.flink.types.LongValue; +import org.apache.flink.types.NullValue; +import org.apache.flink.types.StringValue; +import org.apache.flink.types.Value; + +import java.util.HashMap; +import java.util.Map; + +/** + * A {@link TypeInformation} for the {@link ValueArray} type. + * + * @param <T> the {@link Value} type + */ +public class ValueArrayTypeInfo<T> extends TypeInformation<ValueArray<T>> implements AtomicType<ValueArray<T>> { + + private static final long serialVersionUID = 1L; + + public static final ValueArrayTypeInfo<IntValue> INT_VALUE_ARRAY_TYPE_INFO = new ValueArrayTypeInfo<>(ValueTypeInfo.INT_VALUE_TYPE_INFO); + public static final ValueArrayTypeInfo<LongValue> LONG_VALUE_ARRAY_TYPE_INFO = new ValueArrayTypeInfo<>(ValueTypeInfo.LONG_VALUE_TYPE_INFO); + public static final ValueArrayTypeInfo<NullValue> NULL_VALUE_ARRAY_TYPE_INFO = new ValueArrayTypeInfo<>(ValueTypeInfo.NULL_VALUE_TYPE_INFO); + public static final ValueArrayTypeInfo<StringValue> STRING_VALUE_ARRAY_TYPE_INFO = new ValueArrayTypeInfo<>(ValueTypeInfo.STRING_VALUE_TYPE_INFO); + + private final TypeInformation<T> valueType; + + private final Class<T> type; + + public ValueArrayTypeInfo(TypeInformation<T> valueType) { + this.valueType = valueType; + this.type = valueType.getTypeClass(); + } + + @Override + public int getArity() { + return 1; + } + + @Override + public int getTotalFields() { + return 1; + } + + @SuppressWarnings("unchecked") + @Override + public 
Class<ValueArray<T>> getTypeClass() { + return (Class<ValueArray<T>>) (Class<?>) ValueArray.class; + } + + @Override + public boolean isBasicType() { + return false; + } + + @Override + public boolean isTupleType() { + return false; + } + + @Override + public boolean isKeyType() { + return Comparable.class.isAssignableFrom(type); + } + + @Override + @SuppressWarnings("unchecked") + public TypeSerializer<ValueArray<T>> createSerializer(ExecutionConfig executionConfig) { + if (IntValue.class.isAssignableFrom(type)) { + return (TypeSerializer<ValueArray<T>>) (TypeSerializer<?>) new IntValueArraySerializer(); + } else if (LongValue.class.isAssignableFrom(type)) { + return (TypeSerializer<ValueArray<T>>) (TypeSerializer<?>) new LongValueArraySerializer(); + } else if (NullValue.class.isAssignableFrom(type)) { + return (TypeSerializer<ValueArray<T>>) (TypeSerializer<?>) new NullValueArraySerializer(); + } else if (StringValue.class.isAssignableFrom(type)) { + return (TypeSerializer<ValueArray<T>>) (TypeSerializer<?>) new StringValueArraySerializer(); + } else { + throw new InvalidTypesException("No ValueArray class exists for " + type); + } + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Override + public TypeComparator<ValueArray<T>> createComparator(boolean sortOrderAscending, ExecutionConfig executionConfig) { + if (IntValue.class.isAssignableFrom(type)) { + return (TypeComparator<ValueArray<T>>) (TypeComparator<?>) new IntValueArrayComparator(sortOrderAscending); + } else if (LongValue.class.isAssignableFrom(type)) { + return (TypeComparator<ValueArray<T>>) (TypeComparator<?>) new LongValueArrayComparator(sortOrderAscending); + } else if (NullValue.class.isAssignableFrom(type)) { + return (TypeComparator<ValueArray<T>>) (TypeComparator<?>) new NullValueArrayComparator(sortOrderAscending); + } else if (StringValue.class.isAssignableFrom(type)) { + return (TypeComparator<ValueArray<T>>) (TypeComparator<?>) new StringValueArrayComparator(sortOrderAscending); + 
} else { + throw new InvalidTypesException("No ValueArray class exists for " + type); + } + } + + @Override + public Map<String, TypeInformation<?>> getGenericParameters() { + Map<String, TypeInformation<?>> m = new HashMap<>(1); + m.put("T", valueType); + return m; + } + + // -------------------------------------------------------------------------------------------- + + @Override + public int hashCode() { + return type.hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof ValueArrayTypeInfo) { + @SuppressWarnings("unchecked") + ValueArrayTypeInfo<T> valueArrayTypeInfo = (ValueArrayTypeInfo<T>) obj; + + return valueArrayTypeInfo.canEqual(this) && + type == valueArrayTypeInfo.type; + } else { + return false; + } + } + + @Override + public boolean canEqual(Object obj) { + return obj instanceof ValueArrayTypeInfo; + } + + @Override + public String toString() { + return "ValueArrayType<" + type.getSimpleName() + ">"; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/963f46e7/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ValueArrayTypeInfoFactory.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ValueArrayTypeInfoFactory.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ValueArrayTypeInfoFactory.java new file mode 100644 index 0000000..2145c3d --- /dev/null +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ValueArrayTypeInfoFactory.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.types.valuearray; + +import org.apache.flink.api.common.typeinfo.TypeInfoFactory; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.types.Value; + +import java.lang.reflect.Type; +import java.util.Map; + +/** + * Used by {@link TypeExtractor} to create a {@link TypeInformation} for + * implementations of {@link ValueArray}. 
+ * + * @param <T> the {@link Value} type + */ +public class ValueArrayTypeInfoFactory<T> extends TypeInfoFactory<ValueArray<T>> { + + @Override + public TypeInformation<ValueArray<T>> createTypeInfo(Type t, Map<String, TypeInformation<?>> genericParameters) { + return new ValueArrayTypeInfo(genericParameters.get("T")); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/963f46e7/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/IntValueArrayComparatorTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/IntValueArrayComparatorTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/IntValueArrayComparatorTest.java new file mode 100644 index 0000000..bbc6846 --- /dev/null +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/IntValueArrayComparatorTest.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.graph.types.valuearray; + +import org.apache.flink.api.common.typeutils.ComparatorTestBase; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.types.IntValue; + +public class IntValueArrayComparatorTest extends ComparatorTestBase<IntValueArray> { + + @Override + protected TypeComparator<IntValueArray> createComparator(boolean ascending) { + return new IntValueArrayComparator(ascending); + } + + @Override + protected TypeSerializer<IntValueArray> createSerializer() { + return new IntValueArraySerializer(); + } + + @Override + protected IntValueArray[] getSortedTestData() { + IntValueArray iva0 = new IntValueArray(); + + IntValueArray iva1 = new IntValueArray(); + iva1.add(new IntValue(5)); + + IntValueArray iva2 = new IntValueArray(); + iva2.add(new IntValue(5)); + iva2.add(new IntValue(10)); + + return new IntValueArray[]{ iva0, iva1, iva2 }; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/963f46e7/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/IntValueArraySerializerTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/IntValueArraySerializerTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/IntValueArraySerializerTest.java new file mode 100644 index 0000000..1ee24c9 --- /dev/null +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/IntValueArraySerializerTest.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.types.valuearray; + +import org.apache.flink.api.common.typeutils.SerializerTestBase; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.types.IntValue; + +import java.util.Random; + +/** + * A test for the {@link IntValueArraySerializer}. + */ +public class IntValueArraySerializerTest extends SerializerTestBase<IntValueArray> { + + @Override + protected TypeSerializer<IntValueArray> createSerializer() { + return new IntValueArraySerializer(); + } + + @Override + protected int getLength() { + return -1; + } + + @Override + protected Class<IntValueArray> getTypeClass() { + return IntValueArray.class; + } + + @Override + protected IntValueArray[] getTestData() { + int defaultElements = IntValueArray.DEFAULT_CAPACITY_IN_BYTES / IntValueArray.ELEMENT_LENGTH_IN_BYTES; + + Random rnd = new Random(874597969123412341L); + int rndInt = rnd.nextInt(); + + IntValueArray iva0 = new IntValueArray(); + + IntValueArray iva1 = new IntValueArray(); + iva1.addAll(iva0); + iva1.add(new IntValue(0)); + + IntValueArray iva2 = new IntValueArray(); + iva2.addAll(iva1); + iva2.add(new IntValue(1)); + + IntValueArray iva3 = new IntValueArray(); + iva3.addAll(iva2); + iva3.add(new IntValue(-1)); + + IntValueArray iva4 = new IntValueArray(); + iva4.addAll(iva3); + iva4.add(new IntValue(Integer.MAX_VALUE)); + + IntValueArray iva5 = new 
IntValueArray(); + iva5.addAll(iva4); + iva5.add(new IntValue(Integer.MIN_VALUE)); + + IntValueArray iva6 = new IntValueArray(); + iva6.addAll(iva5); + iva6.add(new IntValue(rndInt)); + + IntValueArray iva7 = new IntValueArray(); + iva7.addAll(iva6); + iva7.add(new IntValue(-rndInt)); + + IntValueArray iva8 = new IntValueArray(); + iva8.addAll(iva7); + for (int i = 0 ; i < 1.5 * defaultElements ; i++) { + iva8.add(new IntValue(i)); + } + iva8.addAll(iva8); + + return new IntValueArray[] {iva0, iva1, iva2, iva3, iva4, iva5, iva6, iva7, iva8}; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/963f46e7/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/IntValueArrayTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/IntValueArrayTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/IntValueArrayTest.java new file mode 100644 index 0000000..2e1282a --- /dev/null +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/IntValueArrayTest.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.types.valuearray; + +import org.apache.flink.types.IntValue; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class IntValueArrayTest { + + @Test + public void testBoundedArray() { + int count = IntValueArray.DEFAULT_CAPACITY_IN_BYTES / IntValueArray.ELEMENT_LENGTH_IN_BYTES; + + ValueArray<IntValue> iva = new IntValueArray(IntValueArray.DEFAULT_CAPACITY_IN_BYTES); + + // fill the array + for (int i = 0 ; i < count ; i++) { + assertFalse(iva.isFull()); + assertEquals(i, iva.size()); + + assertTrue(iva.add(new IntValue(i))); + + assertEquals(i + 1, iva.size()); + } + + // array is now full + assertTrue(iva.isFull()); + assertEquals(count, iva.size()); + + // verify the array values + int idx = 0; + for (IntValue lv : iva) { + assertEquals(idx++, lv.getValue()); + } + + // add element past end of array + assertFalse(iva.add(new IntValue(count))); + assertFalse(iva.addAll(iva)); + + // test copy + assertEquals(iva, iva.copy()); + + // test copyTo + IntValueArray iva_to = new IntValueArray(); + iva.copyTo(iva_to); + assertEquals(iva, iva_to); + + // test clear + iva.clear(); + assertEquals(0, iva.size()); + } + + @Test + public void testUnboundedArray() { + int count = 4096; + + ValueArray<IntValue> iva = new IntValueArray(); + + // add several elements + for (int i = 0 ; i < count ; i++) { + assertFalse(iva.isFull()); + assertEquals(i, iva.size()); + + assertTrue(iva.add(new IntValue(i))); + + assertEquals(i + 1, iva.size()); + } + + // array never fills + assertFalse(iva.isFull()); + assertEquals(count, iva.size()); + + // verify the array values + int idx = 0; + for (IntValue lv : iva) { + assertEquals(idx++, lv.getValue()); + } + + // add element past end of array + assertTrue(iva.add(new 
IntValue(count))); + assertTrue(iva.addAll(iva)); + + // test copy + assertEquals(iva, iva.copy()); + + // test copyTo + IntValueArray iva_to = new IntValueArray(); + iva.copyTo(iva_to); + assertEquals(iva, iva_to); + + // test mark/reset + int size = iva.size(); + iva.mark(); + assertTrue(iva.add(new IntValue())); + assertEquals(size + 1, iva.size()); + iva.reset(); + assertEquals(size, iva.size()); + + // test clear + iva.clear(); + assertEquals(0, iva.size()); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/963f46e7/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/LongValueArrayComparatorTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/LongValueArrayComparatorTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/LongValueArrayComparatorTest.java new file mode 100644 index 0000000..af9dbdb --- /dev/null +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/LongValueArrayComparatorTest.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.graph.types.valuearray; + +import org.apache.flink.api.common.typeutils.ComparatorTestBase; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.types.LongValue; + +public class LongValueArrayComparatorTest extends ComparatorTestBase<LongValueArray> { + + @Override + protected TypeComparator<LongValueArray> createComparator(boolean ascending) { + return new LongValueArrayComparator(ascending); + } + + @Override + protected TypeSerializer<LongValueArray> createSerializer() { + return new LongValueArraySerializer(); + } + + @Override + protected LongValueArray[] getSortedTestData() { + LongValueArray lva0 = new LongValueArray(); + + LongValueArray lva1 = new LongValueArray(); + lva1.add(new LongValue(5)); + + LongValueArray lva2 = new LongValueArray(); + lva2.add(new LongValue(50)); + lva2.add(new LongValue(100)); + + return new LongValueArray[]{ lva0, lva1, lva2 }; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/963f46e7/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/LongValueArraySerializerTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/LongValueArraySerializerTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/LongValueArraySerializerTest.java new file mode 100644 index 0000000..1cd0a6c --- /dev/null +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/LongValueArraySerializerTest.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.types.valuearray; + +import org.apache.flink.api.common.typeutils.SerializerTestBase; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.types.LongValue; + +import java.util.Random; + +/** + * A test for the {@link LongValueArraySerializer}. + */ +public class LongValueArraySerializerTest extends SerializerTestBase<LongValueArray> { + + @Override + protected TypeSerializer<LongValueArray> createSerializer() { + return new LongValueArraySerializer(); + } + + @Override + protected int getLength() { + return -1; + } + + @Override + protected Class<LongValueArray> getTypeClass() { + return LongValueArray.class; + } + + @Override + protected LongValueArray[] getTestData() { + int defaultElements = LongValueArray.DEFAULT_CAPACITY_IN_BYTES / LongValueArray.ELEMENT_LENGTH_IN_BYTES; + + Random rnd = new Random(874597969123412341L); + long rndLong = rnd.nextLong(); + + LongValueArray lva0 = new LongValueArray(); + + LongValueArray lva1 = new LongValueArray(); + lva1.addAll(lva0); + lva1.add(new LongValue(0)); + + LongValueArray lva2 = new LongValueArray(); + lva2.addAll(lva1); + lva2.add(new LongValue(1)); + + LongValueArray lva3 = new LongValueArray(); + lva3.addAll(lva2); + lva3.add(new LongValue(-1)); + + LongValueArray lva4 = new LongValueArray(); + lva4.addAll(lva3); + lva4.add(new LongValue(Long.MAX_VALUE)); + + 
LongValueArray lva5 = new LongValueArray(); + lva5.addAll(lva4); + lva5.add(new LongValue(Long.MIN_VALUE)); + + LongValueArray lva6 = new LongValueArray(); + lva6.addAll(lva5); + lva6.add(new LongValue(rndLong)); + + LongValueArray lva7 = new LongValueArray(); + lva7.addAll(lva6); + lva7.add(new LongValue(-rndLong)); + + LongValueArray lva8 = new LongValueArray(); + lva8.addAll(lva7); + for (int i = 0 ; i < 1.5 * defaultElements ; i++) { + lva8.add(new LongValue(i)); + } + lva8.addAll(lva8); + + return new LongValueArray[] {lva0, lva1, lva2, lva3, lva4, lva5, lva6, lva7, lva8}; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/963f46e7/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/LongValueArrayTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/LongValueArrayTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/LongValueArrayTest.java new file mode 100644 index 0000000..cfc345e --- /dev/null +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/LongValueArrayTest.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.types.valuearray; + +import org.apache.flink.types.LongValue; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class LongValueArrayTest { + + @Test + public void testBoundedArray() { + int count = LongValueArray.DEFAULT_CAPACITY_IN_BYTES / LongValueArray.ELEMENT_LENGTH_IN_BYTES; + + ValueArray<LongValue> lva = new LongValueArray(LongValueArray.DEFAULT_CAPACITY_IN_BYTES); + + // fill the array + for (int i = 0 ; i < count ; i++) { + assertFalse(lva.isFull()); + assertEquals(i, lva.size()); + + assertTrue(lva.add(new LongValue(i))); + + assertEquals(i + 1, lva.size()); + } + + // array is now full + assertTrue(lva.isFull()); + assertEquals(count, lva.size()); + + // verify the array values + int idx = 0; + for (LongValue lv : lva) { + assertEquals(idx++, lv.getValue()); + } + + // add element past end of array + assertFalse(lva.add(new LongValue(count))); + assertFalse(lva.addAll(lva)); + + // test copy + assertEquals(lva, lva.copy()); + + // test copyTo + LongValueArray lva_to = new LongValueArray(); + lva.copyTo(lva_to); + assertEquals(lva, lva_to); + + // test clear + lva.clear(); + assertEquals(0, lva.size()); + } + + @Test + public void testUnboundedArray() { + int count = 4096; + + ValueArray<LongValue> lva = new LongValueArray(); + + // add several elements + for (int i = 0 ; i < count ; i++) { + assertFalse(lva.isFull()); + assertEquals(i, lva.size()); + + assertTrue(lva.add(new LongValue(i))); + + assertEquals(i + 1, lva.size()); + } + + // array never fills + assertFalse(lva.isFull()); + assertEquals(count, lva.size()); + + // verify the array values + int idx = 0; + for (LongValue lv : lva) { + assertEquals(idx++, lv.getValue()); + } + + // add element past end of array + 
assertTrue(lva.add(new LongValue(count))); + assertTrue(lva.addAll(lva)); + + // test copy + assertEquals(lva, lva.copy()); + + // test copyTo + LongValueArray lva_to = new LongValueArray(); + lva.copyTo(lva_to); + assertEquals(lva, lva_to); + + // test mark/reset + int size = lva.size(); + lva.mark(); + assertTrue(lva.add(new LongValue())); + assertEquals(size + 1, lva.size()); + lva.reset(); + assertEquals(size, lva.size()); + + // test clear + lva.clear(); + assertEquals(0, lva.size()); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/963f46e7/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/NullValueArrayComparatorTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/NullValueArrayComparatorTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/NullValueArrayComparatorTest.java new file mode 100644 index 0000000..10fa2e2 --- /dev/null +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/NullValueArrayComparatorTest.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.types.valuearray; + +import org.apache.flink.api.common.typeutils.ComparatorTestBase; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.types.NullValue; + +public class NullValueArrayComparatorTest extends ComparatorTestBase<NullValueArray> { + + @Override + protected TypeComparator<NullValueArray> createComparator(boolean ascending) { + return new NullValueArrayComparator(ascending); + } + + @Override + protected TypeSerializer<NullValueArray> createSerializer() { + return new NullValueArraySerializer(); + } + + @Override + protected NullValueArray[] getSortedTestData() { + NullValueArray nva0 = new NullValueArray(); + + NullValueArray nva1 = new NullValueArray(); + nva1.add(NullValue.getInstance()); + + NullValueArray nva2 = new NullValueArray(); + nva2.add(NullValue.getInstance()); + nva2.add(NullValue.getInstance()); + + return new NullValueArray[]{ nva0, nva1 }; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/963f46e7/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/NullValueArraySerializerTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/NullValueArraySerializerTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/NullValueArraySerializerTest.java new file mode 100644 index 0000000..2253a42 --- /dev/null +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/NullValueArraySerializerTest.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.types.valuearray; + +import org.apache.flink.api.common.typeutils.SerializerTestBase; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.types.NullValue; + +/** + * A test for the {@link NullValueArraySerializer}. + */ +public class NullValueArraySerializerTest extends SerializerTestBase<NullValueArray> { + + @Override + protected TypeSerializer<NullValueArray> createSerializer() { + return new NullValueArraySerializer(); + } + + @Override + protected int getLength() { + return 4; + } + + @Override + protected Class<NullValueArray> getTypeClass() { + return NullValueArray.class; + } + + @Override + protected NullValueArray[] getTestData() { + NullValue nv = NullValue.getInstance(); + + NullValueArray nva0 = new NullValueArray(); + + NullValueArray nva1 = new NullValueArray(); + nva1.addAll(nva0); + nva1.add(nv); + + NullValueArray nva2 = new NullValueArray(); + nva2.addAll(nva1); + nva2.add(nv); + + NullValueArray nva3 = new NullValueArray(); + nva3.addAll(nva2); + for (int i = 0 ; i < 100 ; i++) { + nva3.add(nv); + } + nva3.addAll(nva3); + + return new NullValueArray[] {nva0, nva1, nva2, nva3}; + } +} 
http://git-wip-us.apache.org/repos/asf/flink/blob/963f46e7/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/NullValueArrayTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/NullValueArrayTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/NullValueArrayTest.java new file mode 100644 index 0000000..6d013a1 --- /dev/null +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/NullValueArrayTest.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.graph.types.valuearray; + +import org.apache.flink.types.NullValue; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class NullValueArrayTest { + + @Test + public void testUnboundedArray() { + int count = 4096; + + ValueArray<NullValue> nva = new NullValueArray(); + + // add several elements + for (int i = 0 ; i < count ; i++) { + assertFalse(nva.isFull()); + assertEquals(i, nva.size()); + + assertTrue(nva.add(NullValue.getInstance())); + + assertEquals(i + 1, nva.size()); + } + + // array never fills + assertFalse(nva.isFull()); + assertEquals(count, nva.size()); + + // verify the array values + int idx = 0; + for (NullValue nv : nva) { + assertEquals(NullValue.getInstance(), nv); + } + + // add element past end of array + assertTrue(nva.add(NullValue.getInstance())); + assertTrue(nva.addAll(nva)); + + // test copy + assertEquals(nva, nva.copy()); + + // test copyTo + NullValueArray nva_to = new NullValueArray(); + nva.copyTo(nva_to); + assertEquals(nva, nva_to); + + // test mark/reset + int size = nva.size(); + nva.mark(); + assertTrue(nva.add(NullValue.getInstance())); + assertEquals(size + 1, nva.size()); + nva.reset(); + assertEquals(size, nva.size()); + + // test clear + nva.clear(); + assertEquals(0, nva.size()); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/963f46e7/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/StringValueArrayComparatorTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/StringValueArrayComparatorTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/StringValueArrayComparatorTest.java new file mode 100644 index 0000000..e7cc102 --- /dev/null +++ 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/StringValueArrayComparatorTest.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.types.valuearray; + +import org.apache.flink.api.common.typeutils.ComparatorTestBase; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.types.StringValue; +
+/**
+ * Comparator test for {@link StringValueArrayComparator}, built on
+ * {@link ComparatorTestBase}.
+ */
+public class StringValueArrayComparatorTest extends ComparatorTestBase<StringValueArray> {
+
+	@Override
+	protected TypeComparator<StringValueArray> createComparator(boolean ascending) {
+		return new StringValueArrayComparator(ascending);
+	}
+
+	@Override
+	protected TypeSerializer<StringValueArray> createSerializer() {
+		return new StringValueArraySerializer();
+	}
+
+	/**
+	 * Supplies an empty array, a one-element array and a two-element array,
+	 * in that order.
+	 */
+	@Override
+	protected StringValueArray[] getSortedTestData() {
+		StringValueArray[] sorted = new StringValueArray[3];
+		sorted[0] = arrayOf();
+		sorted[1] = arrayOf("abc");
+		sorted[2] = arrayOf("qrs", "xyz");
+		return sorted;
+	}
+
+	/**
+	 * Creates an array holding the given strings in order.
+	 */
+	private static StringValueArray arrayOf(String... values) {
+		StringValueArray created = new StringValueArray();
+		for (String value : values) {
+			created.add(new StringValue(value));
+		}
+		return created;
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/963f46e7/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/StringValueArraySerializerTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/StringValueArraySerializerTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/StringValueArraySerializerTest.java new file mode 100644 index 0000000..f5909da --- /dev/null +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/StringValueArraySerializerTest.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.types.valuearray; + +import org.apache.flink.api.common.typeutils.SerializerTestBase; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.types.StringValue; + +import java.util.Random; + +/** + * A test for the {@link StringValueArraySerializer}. 
+ */ +public class StringValueArraySerializerTest extends SerializerTestBase<StringValueArray> { + + @Override + protected TypeSerializer<StringValueArray> createSerializer() { + return new StringValueArraySerializer(); + } + + @Override + protected int getLength() { + return -1; + } + + @Override + protected Class<StringValueArray> getTypeClass() { + return StringValueArray.class; + } + + @Override + protected StringValueArray[] getTestData() { + int defaultElements = StringValueArray.DEFAULT_CAPACITY_IN_BYTES; + + Random rnd = new Random(874597969123412341L); + long rndLong = rnd.nextLong(); + + StringValueArray sva0 = new StringValueArray(); + + StringValueArray sva1 = new StringValueArray(); + sva1.addAll(sva0); + sva1.add(new StringValue(String.valueOf(0))); + + StringValueArray sva2 = new StringValueArray(); + sva2.addAll(sva1); + sva2.add(new StringValue(String.valueOf(1))); + + StringValueArray sva3 = new StringValueArray(); + sva3.addAll(sva2); + sva3.add(new StringValue(String.valueOf(-1))); + + StringValueArray sva4 = new StringValueArray(); + sva4.addAll(sva3); + sva4.add(new StringValue(String.valueOf(Long.MAX_VALUE))); + + StringValueArray sva5 = new StringValueArray(); + sva5.addAll(sva4); + sva5.add(new StringValue(String.valueOf(Long.MIN_VALUE))); + + StringValueArray sva6 = new StringValueArray(); + sva6.addAll(sva5); + sva6.add(new StringValue(String.valueOf(rndLong))); + + StringValueArray sva7 = new StringValueArray(); + sva7.addAll(sva6); + sva7.add(new StringValue(String.valueOf(-rndLong))); + + StringValueArray sva8 = new StringValueArray(); + sva8.addAll(sva7); + for (int i = 0 ; i < 1.5 * defaultElements ; i++) { + sva8.add(new StringValue(String.valueOf(i))); + } + sva8.addAll(sva8); + + return new StringValueArray[] {sva0, sva1, sva2, sva3, sva4, sva5, sva6, sva7, sva8}; + } +} 
http://git-wip-us.apache.org/repos/asf/flink/blob/963f46e7/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/StringValueArrayTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/StringValueArrayTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/StringValueArrayTest.java new file mode 100644 index 0000000..a425e8e --- /dev/null +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/StringValueArrayTest.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.graph.types.valuearray; + +import org.apache.flink.types.StringValue; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class StringValueArrayTest { + + @Test + public void testBoundedArray() { + // one byte for length and one byte for character + int count = StringValueArray.DEFAULT_CAPACITY_IN_BYTES / 2; + + ValueArray<StringValue> sva = new StringValueArray(StringValueArray.DEFAULT_CAPACITY_IN_BYTES); + + // fill the array + for (int i = 0 ; i < count ; i++) { + assertFalse(sva.isFull()); + assertEquals(i, sva.size()); + + assertTrue(sva.add(new StringValue(Character.toString((char)(i & 0x7F))))); + + assertEquals(i + 1, sva.size()); + } + + // array is now full + assertTrue(sva.isFull()); + assertEquals(count, sva.size()); + + // verify the array values + int idx = 0; + for (StringValue sv : sva) { + assertEquals((idx++) & 0x7F, sv.getValue().charAt(0)); + } + + // add element past end of array + assertFalse(sva.add(new StringValue(String.valueOf((char)count)))); + assertFalse(sva.addAll(sva)); + + // test copy + assertEquals(sva, sva.copy()); + + // test copyTo + StringValueArray sva_to = new StringValueArray(); + sva.copyTo(sva_to); + assertEquals(sva, sva_to); + + // test clear + sva.clear(); + assertEquals(0, sva.size()); + } + + @Test + public void testBoundedArrayWithVariableLengthCharacters() { + // characters alternatingly take 1 and 2 bytes (plus one byte for length) + int count = 1280; + + ValueArray<StringValue> sva = new StringValueArray(3200); + + // fill the array + for (int i = 0 ; i < count ; i++) { + assertFalse(sva.isFull()); + assertEquals(i, sva.size()); + + assertTrue(sva.add(new StringValue(Character.toString((char)(i & 0xFF))))); + + assertEquals(i + 1, sva.size()); + } + + // array is now full + assertTrue(sva.isFull()); + assertEquals(count, sva.size()); + + // verify the array values 
+ int idx = 0; + for (StringValue sv : sva) { + assertEquals((idx++) & 0xFF, sv.getValue().charAt(0)); + } + + // add element past end of array + assertFalse(sva.add(new StringValue(String.valueOf((char)count)))); + assertFalse(sva.addAll(sva)); + + // test copy + assertEquals(sva, sva.copy()); + + // test copyTo + StringValueArray sva_to = new StringValueArray(); + sva.copyTo(sva_to); + assertEquals(sva, sva_to); + + // test clear + sva.clear(); + assertEquals(0, sva.size()); + } + + @Test + public void testUnboundedArray() { + int count = 4096; + + ValueArray<StringValue> sva = new StringValueArray(); + + // add several elements + for (int i = 0 ; i < count ; i++) { + assertFalse(sva.isFull()); + assertEquals(i, sva.size()); + + assertTrue(sva.add(new StringValue(String.valueOf((char)i)))); + + assertEquals(i + 1, sva.size()); + } + + // array never fills + assertFalse(sva.isFull()); + assertEquals(count, sva.size()); + + // verify the array values + int idx = 0; + for (StringValue sv : sva) { + assertEquals(idx++, sv.getValue().charAt(0)); + } + + // add element past end of array + assertTrue(sva.add(new StringValue(String.valueOf((char)count)))); + assertTrue(sva.addAll(sva)); + + // test copy + assertEquals(sva, sva.copy()); + + // test copyTo + StringValueArray sva_to = new StringValueArray(); + sva.copyTo(sva_to); + assertEquals(sva, sva_to); + + // test mark/reset + int size = sva.size(); + sva.mark(); + assertTrue(sva.add(new StringValue())); + assertEquals(size + 1, sva.size()); + sva.reset(); + assertEquals(size, sva.size()); + + // test clear + sva.clear(); + assertEquals(0, sva.size()); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/963f46e7/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/ValueArrayTypeInfoTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/ValueArrayTypeInfoTest.java 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/ValueArrayTypeInfoTest.java new file mode 100644 index 0000000..73cecc0 --- /dev/null +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/ValueArrayTypeInfoTest.java @@ -0,0 +1,64 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one + * * or more contributor license agreements. See the NOTICE file + * * distributed with this work for additional information + * * regarding copyright ownership. The ASF licenses this file + * * to you under the Apache License, Version 2.0 (the + * * "License"); you may not use this file except in compliance + * * with the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. 
+ * + */ + +package org.apache.flink.graph.types.valuearray; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.java.typeutils.runtime.CopyableValueComparator; +import org.junit.Test; + +import static org.apache.flink.graph.types.valuearray.ValueArrayTypeInfo.INT_VALUE_ARRAY_TYPE_INFO; +import static org.apache.flink.graph.types.valuearray.ValueArrayTypeInfo.LONG_VALUE_ARRAY_TYPE_INFO; +import static org.apache.flink.graph.types.valuearray.ValueArrayTypeInfo.NULL_VALUE_ARRAY_TYPE_INFO; +import static org.apache.flink.graph.types.valuearray.ValueArrayTypeInfo.STRING_VALUE_ARRAY_TYPE_INFO; +import static org.junit.Assert.assertEquals; +
+/**
+ * Checks that each predefined {@link ValueArrayTypeInfo} constant reports
+ * {@link ValueArray} as its type class and creates the serializer and
+ * comparator classes belonging to its element type.
+ */
+public class ValueArrayTypeInfoTest {
+
+	private final ExecutionConfig config = new ExecutionConfig();
+
+	// assertEquals takes (expected, actual); the expected class literal comes
+	// first so that failure messages label the two values correctly
+
+	@Test
+	public void testIntValueArray() {
+		assertEquals(ValueArray.class, INT_VALUE_ARRAY_TYPE_INFO.getTypeClass());
+		assertEquals(IntValueArraySerializer.class, INT_VALUE_ARRAY_TYPE_INFO.createSerializer(config).getClass());
+		assertEquals(IntValueArrayComparator.class, INT_VALUE_ARRAY_TYPE_INFO.createComparator(true, config).getClass());
+	}
+
+	@Test
+	public void testLongValueArray() {
+		assertEquals(ValueArray.class, LONG_VALUE_ARRAY_TYPE_INFO.getTypeClass());
+		assertEquals(LongValueArraySerializer.class, LONG_VALUE_ARRAY_TYPE_INFO.createSerializer(config).getClass());
+		assertEquals(LongValueArrayComparator.class, LONG_VALUE_ARRAY_TYPE_INFO.createComparator(true, config).getClass());
+	}
+
+	@Test
+	public void testNullValueArray() {
+		assertEquals(ValueArray.class, NULL_VALUE_ARRAY_TYPE_INFO.getTypeClass());
+		assertEquals(NullValueArraySerializer.class, NULL_VALUE_ARRAY_TYPE_INFO.createSerializer(config).getClass());
+		assertEquals(NullValueArrayComparator.class, NULL_VALUE_ARRAY_TYPE_INFO.createComparator(true, config).getClass());
+	}
+
+	@Test
+	public void testStringValueArray() {
+		assertEquals(ValueArray.class, STRING_VALUE_ARRAY_TYPE_INFO.getTypeClass());
+		assertEquals(StringValueArraySerializer.class, STRING_VALUE_ARRAY_TYPE_INFO.createSerializer(config).getClass());
+		assertEquals(StringValueArrayComparator.class, STRING_VALUE_ARRAY_TYPE_INFO.createComparator(true, config).getClass());
+	}
+}
