[FLINK-3695] [gelly] ValueArray types Provide compact and efficiently serializable and comparable array implementations for Flink mutable Value types and Java primitives.
This closes #3382 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/963f46e7 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/963f46e7 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/963f46e7 Branch: refs/heads/master Commit: 963f46e7179db034fd1d444469f4af58eac87409 Parents: 43158a8 Author: Greg Hogan <[email protected]> Authored: Tue Feb 21 11:40:22 2017 -0500 Committer: Greg Hogan <[email protected]> Committed: Fri Mar 31 11:14:36 2017 -0400 ---------------------------------------------------------------------- flink-libraries/flink-gelly/pom.xml | 12 +- .../graph/types/valuearray/IntValueArray.java | 398 ++++++++++++++ .../valuearray/IntValueArrayComparator.java | 156 ++++++ .../valuearray/IntValueArraySerializer.java | 85 +++ .../graph/types/valuearray/LongValueArray.java | 399 ++++++++++++++ .../valuearray/LongValueArrayComparator.java | 156 ++++++ .../valuearray/LongValueArraySerializer.java | 85 +++ .../graph/types/valuearray/NullValueArray.java | 267 ++++++++++ .../valuearray/NullValueArrayComparator.java | 147 ++++++ .../valuearray/NullValueArraySerializer.java | 85 +++ .../types/valuearray/StringValueArray.java | 518 +++++++++++++++++++ .../valuearray/StringValueArrayComparator.java | 217 ++++++++ .../valuearray/StringValueArraySerializer.java | 85 +++ .../graph/types/valuearray/ValueArray.java | 97 ++++ .../types/valuearray/ValueArrayFactory.java | 81 +++ .../types/valuearray/ValueArrayTypeInfo.java | 159 ++++++ .../valuearray/ValueArrayTypeInfoFactory.java | 41 ++ .../valuearray/IntValueArrayComparatorTest.java | 51 ++ .../valuearray/IntValueArraySerializerTest.java | 93 ++++ .../types/valuearray/IntValueArrayTest.java | 123 +++++ .../LongValueArrayComparatorTest.java | 51 ++ .../LongValueArraySerializerTest.java | 93 ++++ .../types/valuearray/LongValueArrayTest.java | 123 +++++ .../NullValueArrayComparatorTest.java | 51 ++ .../NullValueArraySerializerTest.java | 68 +++ 
.../types/valuearray/NullValueArrayTest.java | 80 +++ .../StringValueArrayComparatorTest.java | 51 ++ .../StringValueArraySerializerTest.java | 93 ++++ .../types/valuearray/StringValueArrayTest.java | 168 ++++++ .../valuearray/ValueArrayTypeInfoTest.java | 64 +++ 30 files changed, 4095 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/963f46e7/flink-libraries/flink-gelly/pom.xml ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/pom.xml b/flink-libraries/flink-gelly/pom.xml index f773c70..fa09102 100644 --- a/flink-libraries/flink-gelly/pom.xml +++ b/flink-libraries/flink-gelly/pom.xml @@ -28,7 +28,7 @@ under the License. <version>1.3-SNAPSHOT</version> <relativePath>..</relativePath> </parent> - + <artifactId>flink-gelly_2.10</artifactId> <name>flink-gelly</name> @@ -59,7 +59,7 @@ under the License. </dependency> <!-- test dependencies --> - + <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-test-utils_2.10</artifactId> @@ -69,6 +69,14 @@ under the License. 
<dependency> <groupId>org.apache.flink</groupId> + <artifactId>flink-core</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> <artifactId>flink-optimizer_2.10</artifactId> <version>${project.version}</version> <type>test-jar</type> http://git-wip-us.apache.org/repos/asf/flink/blob/963f46e7/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/IntValueArray.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/IntValueArray.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/IntValueArray.java new file mode 100644 index 0000000..0e3812d --- /dev/null +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/IntValueArray.java @@ -0,0 +1,398 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.graph.types.valuearray; + +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.graph.utils.Murmur3_32; +import org.apache.flink.types.IntValue; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Iterator; + +/** + * An array of {@link IntValue}. + */ +public class IntValueArray +implements ValueArray<IntValue> { + + protected static final int ELEMENT_LENGTH_IN_BYTES = 4; + + protected static final int DEFAULT_CAPACITY_IN_BYTES = 4096; + + // see note in ArrayList, HashTable, ... + private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; + + private boolean isBounded; + + private int[] data; + + // the number of elements currently stored + private int position; + + // location of the bookmark used by mark() and reset() + private transient int mark; + + // hasher used to generate the normalized key + private Murmur3_32 hash = new Murmur3_32(0x11d2d865); + + // hash result stored as normalized key + private IntValue hashValue = new IntValue(); + + /** + * Initializes an expandable array with default capacity. + */ + public IntValueArray() { + isBounded = false; + initialize(DEFAULT_CAPACITY_IN_BYTES); + } + + /** + * Initializes a fixed-size array with the provided number of bytes. + * + * @param bytes number of bytes of the encapsulated array + */ + public IntValueArray(int bytes) { + isBounded = true; + initialize(bytes); + } + + /** + * Initializes the array with the provided number of bytes. 
+ * + * @param bytes initial size of the encapsulated array in bytes + */ + private void initialize(int bytes) { + int capacity = bytes / ELEMENT_LENGTH_IN_BYTES; + + Preconditions.checkArgument(capacity > 0, "Requested array with zero capacity"); + Preconditions.checkArgument(capacity <= MAX_ARRAY_SIZE, "Requested capacity exceeds limit of " + MAX_ARRAY_SIZE); + + data = new int[capacity]; + } + + // -------------------------------------------------------------------------------------------- + + /** + * If the size of the array is insufficient to hold the given capacity then + * copy the array into a new, larger array. + * + * @param minCapacity minimum required number of elements + */ + private void ensureCapacity(int minCapacity) { + long currentCapacity = data.length; + + if (minCapacity <= currentCapacity) { + return; + } + + // increase capacity by at least ~50% + long expandedCapacity = Math.max(minCapacity, currentCapacity + (currentCapacity >> 1)); + int newCapacity = (int) Math.min(MAX_ARRAY_SIZE, expandedCapacity); + + if (newCapacity < minCapacity) { + // throw exception as unbounded arrays are not expected to fill + throw new RuntimeException("Requested array size " + minCapacity + " exceeds limit of " + MAX_ARRAY_SIZE); + } + + data = Arrays.copyOf(data, newCapacity); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder("["); + for (int idx = 0 ; idx < this.position ; idx++) { + sb.append(data[idx]); + if (idx < position - 1) { + sb.append(","); + } + } + sb.append("]"); + + return sb.toString(); + } + + // -------------------------------------------------------------------------------------------- + // Iterable + // -------------------------------------------------------------------------------------------- + + private final ReadIterator iterator = new ReadIterator(); + + @Override + public Iterator<IntValue> iterator() { + iterator.reset(); + return iterator; + } + + private class ReadIterator + implements 
Iterator<IntValue> { + private IntValue value = new IntValue(); + + private int pos; + + @Override + public boolean hasNext() { + return pos < position; + } + + @Override + public IntValue next() { + value.setValue(data[pos++]); + return value; + } + + @Override + public void remove() { + throw new UnsupportedOperationException("remove"); + } + + public void reset() { + pos = 0; + } + } + + // -------------------------------------------------------------------------------------------- + // IOReadableWritable + // -------------------------------------------------------------------------------------------- + + @Override + public void write(DataOutputView out) throws IOException { + out.writeInt(position); + + for (int i = 0 ; i < position ; i++) { + out.writeInt(data[i]); + } + } + + @Override + public void read(DataInputView in) throws IOException { + position = in.readInt(); + mark = 0; + + ensureCapacity(position); + + for (int i = 0 ; i < position ; i++) { + data[i] = in.readInt(); + } + } + + // -------------------------------------------------------------------------------------------- + // NormalizableKey + // -------------------------------------------------------------------------------------------- + + @Override + public int getMaxNormalizedKeyLen() { + return hashValue.getMaxNormalizedKeyLen(); + } + + @Override + public void copyNormalizedKey(MemorySegment target, int offset, int len) { + hash.reset(); + + hash.hash(position); + for (int i = 0 ; i < position ; i++) { + hash.hash(data[i]); + } + + hashValue.setValue(hash.hash()); + hashValue.copyNormalizedKey(target, offset, len); + } + + // -------------------------------------------------------------------------------------------- + // Comparable + // -------------------------------------------------------------------------------------------- + + @Override + public int compareTo(ValueArray<IntValue> o) { + IntValueArray other = (IntValueArray) o; + + int min = Math.min(position, other.position); + for 
(int i = 0 ; i < min ; i++) { + int cmp = Integer.compare(data[i], other.data[i]); + + if (cmp != 0) { + return cmp; + } + } + + return Integer.compare(position, other.position); + } + + // -------------------------------------------------------------------------------------------- + // Key + // -------------------------------------------------------------------------------------------- + + @Override + public int hashCode() { + int hash = 1; + + for (int i = 0 ; i < position ; i++) { + hash = 31 * hash + data[i]; + } + + return hash; + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof IntValueArray) { + IntValueArray other = (IntValueArray) obj; + + if (position != other.position) { + return false; + } + + for (int i = 0 ; i < position ; i++) { + if (data[i] != other.data[i]) { + return false; + } + } + + return true; + } + + return false; + } + + // -------------------------------------------------------------------------------------------- + // ResettableValue + // -------------------------------------------------------------------------------------------- + + @Override + public void setValue(ValueArray<IntValue> value) { + value.copyTo(this); + } + + // -------------------------------------------------------------------------------------------- + // CopyableValue + // -------------------------------------------------------------------------------------------- + + @Override + public int getBinaryLength() { + return -1; + } + + @Override + public void copyTo(ValueArray<IntValue> target) { + IntValueArray other = (IntValueArray) target; + + other.position = position; + other.mark = mark; + + other.ensureCapacity(position); + System.arraycopy(data, 0, other.data, 0, position); + } + + @Override + public ValueArray<IntValue> copy() { + ValueArray<IntValue> copy = new IntValueArray(); + + this.copyTo(copy); + + return copy; + } + + @Override + public void copy(DataInputView source, DataOutputView target) throws IOException { + 
copyInternal(source, target); + } + + protected static void copyInternal(DataInputView source, DataOutputView target) throws IOException { + int count = source.readInt(); + target.writeInt(count); + + int bytes = ELEMENT_LENGTH_IN_BYTES * count; + target.write(source, bytes); + } + + // -------------------------------------------------------------------------------------------- + // ValueArray + // -------------------------------------------------------------------------------------------- + + @Override + public int size() { + return position; + } + + @Override + public boolean isFull() { + if (isBounded) { + return position == data.length; + } else { + return position == MAX_ARRAY_SIZE; + } + } + + @Override + public boolean add(IntValue value) { + int newPosition = position + 1; + + if (newPosition > data.length) { + if (isBounded) { + return false; + } else { + ensureCapacity(newPosition); + } + } + + data[position] = value.getValue(); + position = newPosition; + + return true; + } + + @Override + public boolean addAll(ValueArray<IntValue> other) { + IntValueArray source = (IntValueArray) other; + + int sourceSize = source.position; + int newPosition = position + sourceSize; + + if (newPosition > data.length) { + if (isBounded) { + return false; + } else { + ensureCapacity(newPosition); + } + } + + System.arraycopy(source.data, 0, data, position, sourceSize); + position = newPosition; + + return true; + } + + @Override + public void clear() { + position = 0; + } + + @Override + public void mark() { + mark = position; + } + + @Override + public void reset() { + position = mark; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/963f46e7/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/IntValueArrayComparator.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/IntValueArrayComparator.java 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/IntValueArrayComparator.java new file mode 100644 index 0000000..bbc9bc5 --- /dev/null +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/IntValueArrayComparator.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.types.valuearray; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.types.NormalizableKey; + +import java.io.IOException; + +/** + * Specialized comparator for IntValueArray based on CopyableValueComparator. + * + * This can be used for grouping keys but not for sorting keys. 
+ */ +@Internal +public class IntValueArrayComparator extends TypeComparator<IntValueArray> { + + private static final long serialVersionUID = 1L; + + private final boolean ascendingComparison; + + private final IntValueArray reference = new IntValueArray(); + + private final TypeComparator<?>[] comparators = new TypeComparator[] {this}; + + public IntValueArrayComparator(boolean ascending) { + this.ascendingComparison = ascending; + } + + @Override + public int hash(IntValueArray record) { + return record.hashCode(); + } + + @Override + public void setReference(IntValueArray toCompare) { + toCompare.copyTo(reference); + } + + @Override + public boolean equalToReference(IntValueArray candidate) { + return candidate.equals(this.reference); + } + + @Override + public int compareToReference(TypeComparator<IntValueArray> referencedComparator) { + int comp = ((IntValueArrayComparator) referencedComparator).reference.compareTo(reference); + return ascendingComparison ? comp : -comp; + } + + @Override + public int compare(IntValueArray first, IntValueArray second) { + int comp = first.compareTo(second); + return ascendingComparison ? comp : -comp; + } + + @Override + public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException { + int firstCount = firstSource.readInt(); + int secondCount = secondSource.readInt(); + + int minCount = Math.min(firstCount, secondCount); + while (minCount-- > 0) { + int firstValue = firstSource.readInt(); + int secondValue = secondSource.readInt(); + + int cmp = Integer.compare(firstValue, secondValue); + if (cmp != 0) { + return ascendingComparison ? cmp : -cmp; + } + } + + int cmp = Integer.compare(firstCount, secondCount); + return ascendingComparison ? 
cmp : -cmp; + } + + @Override + public boolean supportsNormalizedKey() { + return NormalizableKey.class.isAssignableFrom(IntValueArray.class); + } + + @Override + public int getNormalizeKeyLen() { + return reference.getMaxNormalizedKeyLen(); + } + + @Override + public boolean isNormalizedKeyPrefixOnly(int keyBytes) { + return keyBytes < getNormalizeKeyLen(); + } + + @Override + public void putNormalizedKey(IntValueArray record, MemorySegment target, int offset, int numBytes) { + record.copyNormalizedKey(target, offset, numBytes); + } + + @Override + public boolean invertNormalizedKey() { + return !ascendingComparison; + } + + @Override + public TypeComparator<IntValueArray> duplicate() { + return new IntValueArrayComparator(ascendingComparison); + } + + @Override + public int extractKeys(Object record, Object[] target, int index) { + target[index] = record; + return 1; + } + + @Override + public TypeComparator<?>[] getFlatComparators() { + return comparators; + } + + // -------------------------------------------------------------------------------------------- + // unsupported normalization + // -------------------------------------------------------------------------------------------- + + @Override + public boolean supportsSerializationWithKeyNormalization() { + return false; + } + + @Override + public void writeWithKeyNormalization(IntValueArray record, DataOutputView target) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public IntValueArray readWithKeyDenormalization(IntValueArray reuse, DataInputView source) throws IOException { + throw new UnsupportedOperationException(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/963f46e7/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/IntValueArraySerializer.java ---------------------------------------------------------------------- diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/IntValueArraySerializer.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/IntValueArraySerializer.java new file mode 100644 index 0000000..b86fe87 --- /dev/null +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/IntValueArraySerializer.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.types.valuearray; + +import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; + +import java.io.IOException; + +/** + * Specialized serializer for {@code IntValueArray}. 
+ */ +public final class IntValueArraySerializer extends TypeSerializerSingleton<IntValueArray> { + + private static final long serialVersionUID = 1L; + + @Override + public boolean isImmutableType() { + return false; + } + + @Override + public IntValueArray createInstance() { + return new IntValueArray(); + } + + @Override + public IntValueArray copy(IntValueArray from) { + return copy(from, new IntValueArray()); + } + + @Override + public IntValueArray copy(IntValueArray from, IntValueArray reuse) { + reuse.setValue(from); + return reuse; + } + + @Override + public int getLength() { + return -1; + } + + @Override + public void serialize(IntValueArray record, DataOutputView target) throws IOException { + record.write(target); + } + + @Override + public IntValueArray deserialize(DataInputView source) throws IOException { + return deserialize(new IntValueArray(), source); + } + + @Override + public IntValueArray deserialize(IntValueArray reuse, DataInputView source) throws IOException { + reuse.read(source); + return reuse; + } + + @Override + public void copy(DataInputView source, DataOutputView target) throws IOException { + IntValueArray.copyInternal(source, target); + } + + @Override + public boolean canEqual(Object obj) { + return obj instanceof IntValueArraySerializer; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/963f46e7/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/LongValueArray.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/LongValueArray.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/LongValueArray.java new file mode 100644 index 0000000..7c01e6c --- /dev/null +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/LongValueArray.java @@ -0,0 +1,399 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under 
one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.types.valuearray; + +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.graph.utils.Murmur3_32; +import org.apache.flink.types.IntValue; +import org.apache.flink.types.LongValue; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Iterator; + +/** + * An array of {@link LongValue}. + */ +public class LongValueArray +implements ValueArray<LongValue> { + + protected static final int ELEMENT_LENGTH_IN_BYTES = 8; + + protected static final int DEFAULT_CAPACITY_IN_BYTES = 4096; + + // see note in ArrayList, HashTable, ... 
+ private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; + + private boolean isBounded; + + private long[] data; + + // the number of elements currently stored + private int position; + + // location of the bookmark used by mark() and reset() + private transient int mark; + + // hasher used to generate the normalized key + private Murmur3_32 hash = new Murmur3_32(0xdf099ea8); + + // hash result stored as normalized key + private IntValue hashValue = new IntValue(); + + /** + * Initializes an expandable array with default capacity. + */ + public LongValueArray() { + isBounded = false; + initialize(DEFAULT_CAPACITY_IN_BYTES); + } + + /** + * Initializes a fixed-size array with the provided number of bytes. + * + * @param bytes number of bytes of the encapsulated array + */ + public LongValueArray(int bytes) { + isBounded = true; + initialize(bytes); + } + + /** + * Initializes the array with the provided number of bytes. + * + * @param bytes initial size of the encapsulated array in bytes + */ + private void initialize(int bytes) { + int capacity = bytes / ELEMENT_LENGTH_IN_BYTES; + + Preconditions.checkArgument(capacity > 0, "Requested array with zero capacity"); + Preconditions.checkArgument(capacity <= MAX_ARRAY_SIZE, "Requested capacity exceeds limit of " + MAX_ARRAY_SIZE); + + data = new long[capacity]; + } + + // -------------------------------------------------------------------------------------------- + + /** + * If the size of the array is insufficient to hold the given capacity then + * copy the array into a new, larger array. 
+ * + * @param minCapacity minimum required number of elements + */ + private void ensureCapacity(int minCapacity) { + long currentCapacity = data.length; + + if (minCapacity <= currentCapacity) { + return; + } + + // increase capacity by at least ~50% + long expandedCapacity = Math.max(minCapacity, currentCapacity + (currentCapacity >> 1)); + int newCapacity = (int) Math.min(MAX_ARRAY_SIZE, expandedCapacity); + + if (newCapacity < minCapacity) { + // throw exception as unbounded arrays are not expected to fill + throw new RuntimeException("Requested array size " + minCapacity + " exceeds limit of " + MAX_ARRAY_SIZE); + } + + data = Arrays.copyOf(data, newCapacity); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder("["); + for (int idx = 0 ; idx < this.position ; idx++) { + sb.append(data[idx]); + if (idx < position - 1) { + sb.append(","); + } + } + sb.append("]"); + + return sb.toString(); + } + + // -------------------------------------------------------------------------------------------- + // Iterable + // -------------------------------------------------------------------------------------------- + + private final ReadIterator iterator = new ReadIterator(); + + @Override + public Iterator<LongValue> iterator() { + iterator.reset(); + return iterator; + } + + private class ReadIterator + implements Iterator<LongValue> { + private LongValue value = new LongValue(); + + private int pos; + + @Override + public boolean hasNext() { + return pos < position; + } + + @Override + public LongValue next() { + value.setValue(data[pos++]); + return value; + } + + @Override + public void remove() { + throw new UnsupportedOperationException("remove"); + } + + public void reset() { + pos = 0; + } + } + + // -------------------------------------------------------------------------------------------- + // IOReadableWritable + // -------------------------------------------------------------------------------------------- + + @Override + 
public void write(DataOutputView out) throws IOException { + out.writeInt(position); + + for (int i = 0 ; i < position ; i++) { + out.writeLong(data[i]); + } + } + + @Override + public void read(DataInputView in) throws IOException { + position = in.readInt(); + mark = 0; + + ensureCapacity(position); + + for (int i = 0 ; i < position ; i++) { + data[i] = in.readLong(); + } + } + + // -------------------------------------------------------------------------------------------- + // NormalizableKey + // -------------------------------------------------------------------------------------------- + + @Override + public int getMaxNormalizedKeyLen() { + return hashValue.getMaxNormalizedKeyLen(); + } + + @Override + public void copyNormalizedKey(MemorySegment target, int offset, int len) { + hash.reset(); + + hash.hash(position); + for (int i = 0 ; i < position ; i++) { + hash.hash(data[i]); + } + + hashValue.setValue(hash.hash()); + hashValue.copyNormalizedKey(target, offset, len); + } + + // -------------------------------------------------------------------------------------------- + // Comparable + // -------------------------------------------------------------------------------------------- + + @Override + public int compareTo(ValueArray<LongValue> o) { + LongValueArray other = (LongValueArray) o; + + int min = Math.min(position, other.position); + for (int i = 0 ; i < min ; i++) { + int cmp = Long.compare(data[i], other.data[i]); + + if (cmp != 0) { + return cmp; + } + } + + return Integer.compare(position, other.position); + } + + // -------------------------------------------------------------------------------------------- + // Key + // -------------------------------------------------------------------------------------------- + + @Override + public int hashCode() { + int hash = 1; + + for (int i = 0 ; i < position ; i++) { + hash = 31 * hash + (int) (data[i] ^ data[i] >>> 32); + } + + return hash; + } + + @Override + public boolean equals(Object obj) { + if 
(obj instanceof LongValueArray) { + LongValueArray other = (LongValueArray) obj; + + if (position != other.position) { + return false; + } + + for (int i = 0 ; i < position ; i++) { + if (data[i] != other.data[i]) { + return false; + } + } + + return true; + } + + return false; + } + + // -------------------------------------------------------------------------------------------- + // ResettableValue + // -------------------------------------------------------------------------------------------- + + @Override + public void setValue(ValueArray<LongValue> value) { + value.copyTo(this); + } + + // -------------------------------------------------------------------------------------------- + // CopyableValue + // -------------------------------------------------------------------------------------------- + + @Override + public int getBinaryLength() { + return -1; + } + + @Override + public void copyTo(ValueArray<LongValue> target) { + LongValueArray other = (LongValueArray) target; + + other.position = position; + other.mark = mark; + + other.ensureCapacity(position); + System.arraycopy(data, 0, other.data, 0, position); + } + + @Override + public ValueArray<LongValue> copy() { + ValueArray<LongValue> copy = new LongValueArray(); + + this.copyTo(copy); + + return copy; + } + + @Override + public void copy(DataInputView source, DataOutputView target) throws IOException { + copyInternal(source, target); + } + + protected static void copyInternal(DataInputView source, DataOutputView target) throws IOException { + int count = source.readInt(); + target.writeInt(count); + + int bytes = ELEMENT_LENGTH_IN_BYTES * count; + target.write(source, bytes); + } + + // -------------------------------------------------------------------------------------------- + // ValueArray + // -------------------------------------------------------------------------------------------- + + @Override + public int size() { + return position; + } + + @Override + public boolean isFull() { + if 
(isBounded) { + return position == data.length; + } else { + return position == MAX_ARRAY_SIZE; + } + } + + @Override + public boolean add(LongValue value) { + int newPosition = position + 1; + + if (newPosition > data.length) { + if (isBounded) { + return false; + } else { + ensureCapacity(newPosition); + } + } + + data[position] = value.getValue(); + position = newPosition; + + return true; + } + + @Override + public boolean addAll(ValueArray<LongValue> other) { + LongValueArray source = (LongValueArray) other; + + int sourceSize = source.position; + int newPosition = position + sourceSize; + + if (newPosition > data.length) { + if (isBounded) { + return false; + } else { + ensureCapacity(newPosition); + } + } + + System.arraycopy(source.data, 0, data, position, sourceSize); + position = newPosition; + + return true; + } + + @Override + public void clear() { + position = 0; + } + + @Override + public void mark() { + mark = position; + } + + @Override + public void reset() { + position = mark; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/963f46e7/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/LongValueArrayComparator.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/LongValueArrayComparator.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/LongValueArrayComparator.java new file mode 100644 index 0000000..26c3da2 --- /dev/null +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/LongValueArrayComparator.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.types.valuearray; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.types.NormalizableKey; + +import java.io.IOException; + +/** + * Specialized comparator for LongValueArray based on CopyableValueComparator. + * + * This can be used for grouping keys but not for sorting keys. 
+ */ +@Internal +public class LongValueArrayComparator extends TypeComparator<LongValueArray> { + + private static final long serialVersionUID = 1L; + + private final boolean ascendingComparison; + + private final LongValueArray reference = new LongValueArray(); + + private final TypeComparator<?>[] comparators = new TypeComparator[] {this}; + + public LongValueArrayComparator(boolean ascending) { + this.ascendingComparison = ascending; + } + + @Override + public int hash(LongValueArray record) { + return record.hashCode(); + } + + @Override + public void setReference(LongValueArray toCompare) { + toCompare.copyTo(reference); + } + + @Override + public boolean equalToReference(LongValueArray candidate) { + return candidate.equals(this.reference); + } + + @Override + public int compareToReference(TypeComparator<LongValueArray> referencedComparator) { + int comp = ((LongValueArrayComparator) referencedComparator).reference.compareTo(reference); + return ascendingComparison ? comp : -comp; + } + + @Override + public int compare(LongValueArray first, LongValueArray second) { + int comp = first.compareTo(second); + return ascendingComparison ? comp : -comp; + } + + @Override + public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException { + int firstCount = firstSource.readInt(); + int secondCount = secondSource.readInt(); + + int minCount = Math.min(firstCount, secondCount); + while (minCount-- > 0) { + long firstValue = firstSource.readLong(); + long secondValue = secondSource.readLong(); + + int cmp = Long.compare(firstValue, secondValue); + if (cmp != 0) { + return ascendingComparison ? cmp : -cmp; + } + } + + int cmp = Integer.compare(firstCount, secondCount); + return ascendingComparison ? 
cmp : -cmp; + } + + @Override + public boolean supportsNormalizedKey() { + return NormalizableKey.class.isAssignableFrom(LongValueArray.class); + } + + @Override + public int getNormalizeKeyLen() { + return reference.getMaxNormalizedKeyLen(); + } + + @Override + public boolean isNormalizedKeyPrefixOnly(int keyBytes) { + return keyBytes < getNormalizeKeyLen(); + } + + @Override + public void putNormalizedKey(LongValueArray record, MemorySegment target, int offset, int numBytes) { + record.copyNormalizedKey(target, offset, numBytes); + } + + @Override + public boolean invertNormalizedKey() { + return !ascendingComparison; + } + + @Override + public TypeComparator<LongValueArray> duplicate() { + return new LongValueArrayComparator(ascendingComparison); + } + + @Override + public int extractKeys(Object record, Object[] target, int index) { + target[index] = record; + return 1; + } + + @Override + public TypeComparator<?>[] getFlatComparators() { + return comparators; + } + + // -------------------------------------------------------------------------------------------- + // unsupported normalization + // -------------------------------------------------------------------------------------------- + + @Override + public boolean supportsSerializationWithKeyNormalization() { + return false; + } + + @Override + public void writeWithKeyNormalization(LongValueArray record, DataOutputView target) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public LongValueArray readWithKeyDenormalization(LongValueArray reuse, DataInputView source) throws IOException { + throw new UnsupportedOperationException(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/963f46e7/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/LongValueArraySerializer.java ---------------------------------------------------------------------- diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/LongValueArraySerializer.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/LongValueArraySerializer.java new file mode 100644 index 0000000..95219b6 --- /dev/null +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/LongValueArraySerializer.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.types.valuearray; + +import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; + +import java.io.IOException; + +/** + * Specialized serializer for {@code LongValueArray}. 
+ */ +public final class LongValueArraySerializer extends TypeSerializerSingleton<LongValueArray> { + + private static final long serialVersionUID = 1L; + + @Override + public boolean isImmutableType() { + return false; + } + + @Override + public LongValueArray createInstance() { + return new LongValueArray(); + } + + @Override + public LongValueArray copy(LongValueArray from) { + return copy(from, new LongValueArray()); + } + + @Override + public LongValueArray copy(LongValueArray from, LongValueArray reuse) { + reuse.setValue(from); + return reuse; + } + + @Override + public int getLength() { + return -1; + } + + @Override + public void serialize(LongValueArray record, DataOutputView target) throws IOException { + record.write(target); + } + + @Override + public LongValueArray deserialize(DataInputView source) throws IOException { + return deserialize(new LongValueArray(), source); + } + + @Override + public LongValueArray deserialize(LongValueArray reuse, DataInputView source) throws IOException { + reuse.read(source); + return reuse; + } + + @Override + public void copy(DataInputView source, DataOutputView target) throws IOException { + LongValueArray.copyInternal(source, target); + } + + @Override + public boolean canEqual(Object obj) { + return obj instanceof LongValueArraySerializer; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/963f46e7/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/NullValueArray.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/NullValueArray.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/NullValueArray.java new file mode 100644 index 0000000..bf247a2 --- /dev/null +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/NullValueArray.java @@ -0,0 +1,267 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.types.valuearray; + +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.types.IntValue; +import org.apache.flink.types.NullValue; + +import java.io.IOException; +import java.util.Iterator; + +/** + * An array of {@link NullValue}. + */ +public class NullValueArray +implements ValueArray<NullValue> { + + // the number of elements currently stored + private int position; + + // location of the bookmark used by mark() and reset() + private transient int mark; + + // hash result stored as normalized key + private IntValue hashValue = new IntValue(); + + /** + * Initializes an expandable array with default capacity. + */ + public NullValueArray() { + } + + /** + * Initializes a fixed-size array with the provided number of bytes. 
+ * + * @param bytes number of bytes of the encapsulated array + */ + public NullValueArray(int bytes) { + this(); + } + + // -------------------------------------------------------------------------------------------- + + @Override + public String toString() { + StringBuilder sb = new StringBuilder("["); + for (int idx = 0 ; idx < this.position ; idx++) { + // U+2205 (EMPTY SET) stands in for each NullValue; written as an escape so the source stays ASCII-only + sb.append("\u2205"); + if (idx < position - 1) { + sb.append(","); + } + } + sb.append("]"); + + return sb.toString(); + } + + // -------------------------------------------------------------------------------------------- + // Iterable + // -------------------------------------------------------------------------------------------- + + private final ReadIterator iterator = new ReadIterator(); + + @Override + public Iterator<NullValue> iterator() { + iterator.reset(); + return iterator; + } + + private class ReadIterator + implements Iterator<NullValue> { + private int pos; + + @Override + public boolean hasNext() { + return pos < position; + } + + @Override + public NullValue next() { + pos++; + return NullValue.getInstance(); + } + + @Override + public void remove() { + throw new UnsupportedOperationException("remove"); + } + + public void reset() { + pos = 0; + } + } + + // -------------------------------------------------------------------------------------------- + // IOReadableWritable + // -------------------------------------------------------------------------------------------- + + @Override + public void write(DataOutputView out) throws IOException { + out.writeInt(position); + } + + @Override + public void read(DataInputView in) throws IOException { + position = in.readInt(); + mark = 0; + } + + // -------------------------------------------------------------------------------------------- + // NormalizableKey + // -------------------------------------------------------------------------------------------- + + @Override + public int getMaxNormalizedKeyLen() { + return hashValue.getMaxNormalizedKeyLen();
+ } + + @Override + public void copyNormalizedKey(MemorySegment target, int offset, int len) { + hashValue.setValue(position); + hashValue.copyNormalizedKey(target, offset, len); + } + + // -------------------------------------------------------------------------------------------- + // Comparable + // -------------------------------------------------------------------------------------------- + + @Override + public int compareTo(ValueArray<NullValue> o) { + NullValueArray other = (NullValueArray) o; + + return Integer.compare(position, other.position); + } + + // -------------------------------------------------------------------------------------------- + // Key + // -------------------------------------------------------------------------------------------- + + @Override + public int hashCode() { + return position; + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof NullValueArray) { + NullValueArray other = (NullValueArray) obj; + + return position == other.position; + } + + return false; + } + + // -------------------------------------------------------------------------------------------- + // ResettableValue + // -------------------------------------------------------------------------------------------- + + @Override + public void setValue(ValueArray<NullValue> value) { + value.copyTo(this); + } + + // -------------------------------------------------------------------------------------------- + // CopyableValue + // -------------------------------------------------------------------------------------------- + + @Override + public int getBinaryLength() { + return hashValue.getBinaryLength(); + } + + @Override + public void copyTo(ValueArray<NullValue> target) { + NullValueArray other = (NullValueArray) target; + + other.position = position; + } + + @Override + public ValueArray<NullValue> copy() { + ValueArray<NullValue> copy = new NullValueArray(); + + this.copyTo(copy); + + return copy; + } + + @Override + public void 
copy(DataInputView source, DataOutputView target) throws IOException { + target.write(source, getBinaryLength()); + } + + // -------------------------------------------------------------------------------------------- + // ValueArray + // -------------------------------------------------------------------------------------------- + + @Override + public int size() { + return position; + } + + @Override + public boolean isFull() { + return position == Integer.MAX_VALUE; + } + + @Override + public boolean add(NullValue value) { + if (position == Integer.MAX_VALUE) { + return false; + } + + position++; + + return true; + } + + @Override + public boolean addAll(ValueArray<NullValue> other) { + NullValueArray source = (NullValueArray) other; + + long newPosition = position + (long) source.position; + + if (newPosition > Integer.MAX_VALUE) { + return false; + } + + position = (int) newPosition; + + return true; + } + + @Override + public void clear() { + position = 0; + } + + @Override + public void mark() { + mark = position; + } + + @Override + public void reset() { + position = mark; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/963f46e7/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/NullValueArrayComparator.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/NullValueArrayComparator.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/NullValueArrayComparator.java new file mode 100644 index 0000000..2228d6e --- /dev/null +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/NullValueArrayComparator.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.types.valuearray; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.types.NormalizableKey; + +import java.io.IOException; + +/** + * Specialized comparator for NullValueArray based on CopyableValueComparator. + * + * This can be used for grouping keys but not for sorting keys. 
+ */ +@Internal +public class NullValueArrayComparator extends TypeComparator<NullValueArray> { + + private static final long serialVersionUID = 1L; + + private final boolean ascendingComparison; + + private final NullValueArray reference = new NullValueArray(); + + private final TypeComparator<?>[] comparators = new TypeComparator[] {this}; + + public NullValueArrayComparator(boolean ascending) { + this.ascendingComparison = ascending; + } + + @Override + public int hash(NullValueArray record) { + return record.hashCode(); + } + + @Override + public void setReference(NullValueArray toCompare) { + toCompare.copyTo(reference); + } + + @Override + public boolean equalToReference(NullValueArray candidate) { + return candidate.equals(this.reference); + } + + @Override + public int compareToReference(TypeComparator<NullValueArray> referencedComparator) { + int comp = ((NullValueArrayComparator) referencedComparator).reference.compareTo(reference); + return ascendingComparison ? comp : -comp; + } + + @Override + public int compare(NullValueArray first, NullValueArray second) { + int comp = first.compareTo(second); + return ascendingComparison ? comp : -comp; + } + + @Override + public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException { + int firstCount = firstSource.readInt(); + int secondCount = secondSource.readInt(); + + int cmp = Integer.compare(firstCount, secondCount); + return ascendingComparison ? 
cmp : -cmp; + } + + @Override + public boolean supportsNormalizedKey() { + return NormalizableKey.class.isAssignableFrom(NullValueArray.class); + } + + @Override + public int getNormalizeKeyLen() { + return reference.getMaxNormalizedKeyLen(); + } + + @Override + public boolean isNormalizedKeyPrefixOnly(int keyBytes) { + return keyBytes < getNormalizeKeyLen(); + } + + @Override + public void putNormalizedKey(NullValueArray record, MemorySegment target, int offset, int numBytes) { + record.copyNormalizedKey(target, offset, numBytes); + } + + @Override + public boolean invertNormalizedKey() { + return !ascendingComparison; + } + + @Override + public TypeComparator<NullValueArray> duplicate() { + return new NullValueArrayComparator(ascendingComparison); + } + + @Override + public int extractKeys(Object record, Object[] target, int index) { + target[index] = record; + return 1; + } + + @Override + public TypeComparator<?>[] getFlatComparators() { + return comparators; + } + + // -------------------------------------------------------------------------------------------- + // key normalization + // -------------------------------------------------------------------------------------------- + + @Override + public boolean supportsSerializationWithKeyNormalization() { + // see ComparatorTestBase#testNormalizedKeyReadWriter fixes in FLINK-4705 + return false; + } + + @Override + public void writeWithKeyNormalization(NullValueArray record, DataOutputView target) throws IOException { + record.write(target); + } + + @Override + public NullValueArray readWithKeyDenormalization(NullValueArray reuse, DataInputView source) throws IOException { + reuse.read(source); + return reuse; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/963f46e7/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/NullValueArraySerializer.java ---------------------------------------------------------------------- diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/NullValueArraySerializer.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/NullValueArraySerializer.java new file mode 100644 index 0000000..233ed20 --- /dev/null +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/NullValueArraySerializer.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.types.valuearray; + +import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; + +import java.io.IOException; + +/** + * Specialized serializer for {@code NullValueArray}. 
+ */ +public final class NullValueArraySerializer extends TypeSerializerSingleton<NullValueArray> { + + private static final long serialVersionUID = 1L; + + @Override + public boolean isImmutableType() { + return false; + } + + @Override + public NullValueArray createInstance() { + return new NullValueArray(); + } + + @Override + public NullValueArray copy(NullValueArray from) { + return copy(from, new NullValueArray()); + } + + @Override + public NullValueArray copy(NullValueArray from, NullValueArray reuse) { + reuse.setValue(from); + return reuse; + } + + @Override + public int getLength() { + return 4; + } + + @Override + public void serialize(NullValueArray record, DataOutputView target) throws IOException { + record.write(target); + } + + @Override + public NullValueArray deserialize(DataInputView source) throws IOException { + return deserialize(new NullValueArray(), source); + } + + @Override + public NullValueArray deserialize(NullValueArray reuse, DataInputView source) throws IOException { + reuse.read(source); + return reuse; + } + + @Override + public void copy(DataInputView source, DataOutputView target) throws IOException { + target.write(source, getLength()); + } + + @Override + public boolean canEqual(Object obj) { + return obj instanceof NullValueArraySerializer; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/963f46e7/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/StringValueArray.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/StringValueArray.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/StringValueArray.java new file mode 100644 index 0000000..4699552 --- /dev/null +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/StringValueArray.java @@ -0,0 +1,518 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.graph.types.valuearray;

import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.graph.utils.Murmur3_32;
import org.apache.flink.types.IntValue;
import org.apache.flink.types.StringValue;
import org.apache.flink.util.Preconditions;

import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.NoSuchElementException;

/**
 * An array of {@link StringValue}.
 * <p>
 * Strings are serialized to a byte array. Concatenating arrays is as simple
 * and fast as extending and copying byte arrays. Strings are serialized when
 * individually added to {@code StringValueArray}.
 * <p>
 * For each string added to the array the length is first serialized using a
 * variable length integer. Then the string characters are serialized using the
 * same variable length encoding, in which each byte carries seven bits of the
 * value and the high bit flags that another byte follows. Characters below 128
 * (ASCII) are therefore encoded in a single byte, characters below 16384 in two
 * bytes, and no {@code char} requires more than three bytes.
 */
public class StringValueArray
implements ValueArray<StringValue> {

	protected static final int DEFAULT_CAPACITY_IN_BYTES = 4096;

	// see note in ArrayList, HashTable, ...
	private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

	// marks a continuation byte in the variable length encoding
	protected static final int HIGH_BIT = 0x1 << 7;

	// maximum encoded size of an int length (5 * 7 bits >= 32 bits)
	private static final int MAX_LENGTH_BYTES = 5;

	// maximum encoded size of a char (3 * 7 bits >= 16 bits)
	private static final int MAX_CHAR_BYTES = 3;

	private boolean isBounded;

	// the initial length of a bounded array, which is allowed to expand to
	// store one additional element beyond this initial length
	private int boundedLength;

	private byte[] data;

	// number of StringValue elements currently stored
	private int length;

	// the number of bytes currently stored
	private int position;

	// state for the bookmark used by mark() and reset()
	private transient int markLength;

	private transient int markPosition;

	// hasher used to generate the normalized key
	private Murmur3_32 hash = new Murmur3_32(0x19264330);

	// hash result stored as normalized key
	private IntValue hashValue = new IntValue();

	/**
	 * Initializes an expandable array with default capacity.
	 */
	public StringValueArray() {
		isBounded = false;
		initialize(DEFAULT_CAPACITY_IN_BYTES);
	}

	/**
	 * Initializes a fixed-size array with the provided number of bytes.
	 *
	 * @param bytes number of bytes of the encapsulated array
	 */
	public StringValueArray(int bytes) {
		isBounded = true;
		boundedLength = bytes;
		initialize(bytes);
	}

	/**
	 * Initializes the array with the provided number of bytes.
	 *
	 * @param bytes initial size of the encapsulated array in bytes
	 */
	private void initialize(int bytes) {
		Preconditions.checkArgument(bytes > 0, "Requested array with zero capacity");
		Preconditions.checkArgument(bytes <= MAX_ARRAY_SIZE, "Requested capacity exceeds limit of " + MAX_ARRAY_SIZE);

		data = new byte[bytes];
	}

	// --------------------------------------------------------------------------------------------

	/**
	 * If the size of the array is insufficient to hold the given capacity then
	 * copy the array into a new, larger array.
	 *
	 * @param minCapacity minimum required total size of the array in bytes
	 */
	private void ensureCapacity(int minCapacity) {
		long currentCapacity = data.length;

		if (minCapacity <= currentCapacity) {
			return;
		}

		// increase capacity by at least ~50%
		long expandedCapacity = Math.max(minCapacity, currentCapacity + (currentCapacity >> 1));
		int newCapacity = (int) Math.min(MAX_ARRAY_SIZE, expandedCapacity);

		if (newCapacity < minCapacity) {
			// throw exception as unbounded arrays are not expected to fill
			throw new RuntimeException("Requested array size " + minCapacity + " exceeds limit of " + MAX_ARRAY_SIZE);
		}

		data = Arrays.copyOf(data, newCapacity);
	}

	@Override
	public String toString() {
		StringBuilder sb = new StringBuilder("[");
		String separator = "";

		for (StringValue sv : this) {
			// the separator precedes every element except the first
			sb
				.append(separator)
				.append(sv.getValue());
			separator = ",";
		}

		sb.append("]");

		return sb.toString();
	}

	// --------------------------------------------------------------------------------------------
	// Iterable
	// --------------------------------------------------------------------------------------------

	private final ReadIterator iterator = new ReadIterator();

	/**
	 * Returns an iterator over the stored strings. The iterator instance is
	 * shared by this array and is reset on each call. The returned
	 * {@link StringValue} is reused across calls to {@code next()}, so callers
	 * must copy any value they need to retain.
	 */
	@Override
	public Iterator<StringValue> iterator() {
		iterator.reset();
		return iterator;
	}

	private class ReadIterator
	implements Iterator<StringValue> {
		private static final int DEFAULT_SIZE = 64;

		// reusable result object, refilled by each call to next()
		private final StringValue value = new StringValue();

		// scratch buffer into which the characters are decoded
		private char[] buffer = new char[DEFAULT_SIZE];

		private int pos;

		@Override
		public boolean hasNext() {
			return pos < position;
		}

		@Override
		public StringValue next() {
			if (!hasNext()) {
				throw new NoSuchElementException();
			}

			int len = readVarInt();

			// grow the scratch buffer geometrically; Math.max guards against
			// int overflow of the doubled size
			if (len > buffer.length) {
				buffer = new char[Math.max(len, buffer.length << 1)];
			}

			for (int i = 0; i < len; i++) {
				buffer[i] = (char) readVarInt();
			}

			// copies the characters and sets the length so that the returned
			// value contains exactly the decoded string
			value.setValue(buffer, 0, len);

			return value;
		}

		/**
		 * Decodes one variable length integer starting at {@code pos} and
		 * advances {@code pos} past it.
		 */
		private int readVarInt() {
			int result = data[pos++] & 0xFF;

			if (result >= HIGH_BIT) {
				int shift = 7;
				int curr;
				result = result & 0x7F;
				while ((curr = data[pos++] & 0xFF) >= HIGH_BIT) {
					result |= (curr & 0x7F) << shift;
					shift += 7;
				}
				result |= curr << shift;
			}

			return result;
		}

		@Override
		public void remove() {
			throw new UnsupportedOperationException("remove");
		}

		public void reset() {
			pos = 0;
		}
	}

	// --------------------------------------------------------------------------------------------
	// IOReadableWritable
	// --------------------------------------------------------------------------------------------

	@Override
	public void write(DataOutputView out) throws IOException {
		out.writeInt(length);
		out.writeInt(position);

		out.write(data, 0, position);
	}

	@Override
	public void read(DataInputView in) throws IOException {
		length = in.readInt();
		position = in.readInt();

		markLength = 0;
		markPosition = 0;

		ensureCapacity(position);

		// read(byte[], int, int) may return fewer bytes than requested;
		// readFully consumes exactly 'position' bytes or throws
		in.readFully(data, 0, position);
	}

	// --------------------------------------------------------------------------------------------
	// NormalizableKey
	// --------------------------------------------------------------------------------------------

	@Override
	public int getMaxNormalizedKeyLen() {
		return hashValue.getMaxNormalizedKeyLen();
	}

	@Override
	public void copyNormalizedKey(MemorySegment target, int offset, int len) {
		hash.reset();

		hash.hash(position);
		for (int i = 0; i < position; i++) {
			hash.hash(data[i]);
		}

		hashValue.setValue(hash.hash());
		hashValue.copyNormalizedKey(target, offset, len);
	}

	// --------------------------------------------------------------------------------------------
	// Comparable
	// --------------------------------------------------------------------------------------------

	@Override
	public int compareTo(ValueArray<StringValue> o) {
		StringValueArray other = (StringValueArray) o;

		// sorts first on the number of serialized bytes in the array, then on
		// the first non-equal byte in the arrays
		int cmp = Integer.compare(position, other.position);

		if (cmp != 0) {
			return cmp;
		}

		for (int i = 0; i < position; i++) {
			cmp = Byte.compare(data[i], other.data[i]);

			if (cmp != 0) {
				return cmp;
			}
		}

		return 0;
	}

	// --------------------------------------------------------------------------------------------
	// Key
	// --------------------------------------------------------------------------------------------

	@Override
	public int hashCode() {
		int result = 1;

		for (int i = 0; i < position; i++) {
			result = 31 * result + data[i];
		}

		return result;
	}

	@Override
	public boolean equals(Object obj) {
		if (obj instanceof StringValueArray) {
			StringValueArray other = (StringValueArray) obj;

			if (length != other.length) {
				return false;
			}

			if (position != other.position) {
				return false;
			}

			for (int i = 0; i < position; i++) {
				if (data[i] != other.data[i]) {
					return false;
				}
			}

			return true;
		}

		return false;
	}

	// --------------------------------------------------------------------------------------------
	// ResettableValue
	// --------------------------------------------------------------------------------------------

	@Override
	public void setValue(ValueArray<StringValue> value) {
		value.copyTo(this);
	}

	// --------------------------------------------------------------------------------------------
	// CopyableValue
	// --------------------------------------------------------------------------------------------

	@Override
	public int getBinaryLength() {
		return -1;
	}

	@Override
	public void copyTo(ValueArray<StringValue> target) {
		StringValueArray other = (StringValueArray) target;

		other.length = length;
		other.position = position;
		other.markLength = markLength;
		other.markPosition = markPosition;

		other.ensureCapacity(position);
		System.arraycopy(data, 0, other.data, 0, position);
	}

	@Override
	public ValueArray<StringValue> copy() {
		ValueArray<StringValue> copy = new StringValueArray();

		this.copyTo(copy);

		return copy;
	}

	@Override
	public void copy(DataInputView source, DataOutputView target) throws IOException {
		copyInternal(source, target);
	}

	protected static void copyInternal(DataInputView source, DataOutputView target) throws IOException {
		int length = source.readInt();
		target.writeInt(length);

		int position = source.readInt();
		target.writeInt(position);

		target.write(source, position);
	}

	// --------------------------------------------------------------------------------------------
	// ValueArray
	// --------------------------------------------------------------------------------------------

	@Override
	public int size() {
		return length;
	}

	@Override
	public boolean isFull() {
		if (isBounded) {
			return position >= boundedLength;
		} else {
			return position == MAX_ARRAY_SIZE;
		}
	}

	/**
	 * Serializes and appends the given string. A bounded array rejects the
	 * value once its serialized size has reached the bounded length; the last
	 * accepted string may extend the array beyond that length.
	 *
	 * @param value the string to append
	 * @return whether the value was added
	 */
	@Override
	public boolean add(StringValue value) {
		if (isBounded && position >= boundedLength) {
			return false;
		}

		// reserve room for the encoded string length
		if (position + MAX_LENGTH_BYTES > data.length) {
			ensureCapacity(position + MAX_LENGTH_BYTES);
		}

		// update local variable until serialization succeeds
		int newPosition = position;

		// write the length, variable-length encoded
		int len = value.length();

		while (len >= HIGH_BIT) {
			data[newPosition++] = (byte) (len | HIGH_BIT);
			len >>>= 7;
		}
		data[newPosition++] = (byte) len;

		// write the char data, variable-length encoded
		final char[] valueData = value.getCharArray();

		len = value.length();
		for (int i = 0; i < len; i++) {
			// ensureCapacity takes the required total size of the array, so
			// request room for the largest encoding past the write position
			if (data.length - newPosition < MAX_CHAR_BYTES) {
				ensureCapacity(newPosition + MAX_CHAR_BYTES);
			}

			int c = valueData[i];

			while (c >= HIGH_BIT) {
				data[newPosition++] = (byte) (c | HIGH_BIT);
				c >>>= 7;
			}
			data[newPosition++] = (byte) c;
		}

		length++;
		position = newPosition;

		return true;
	}

	@Override
	public boolean addAll(ValueArray<StringValue> other) {
		StringValueArray source = (StringValueArray) other;

		int sourceSize = source.position;
		int newPosition = position + sourceSize;

		if (newPosition > data.length) {
			if (isBounded) {
				return false;
			} else {
				ensureCapacity(newPosition);
			}
		}

		System.arraycopy(source.data, 0, data, position, sourceSize);
		length += source.length;
		position = newPosition;

		return true;
	}

	@Override
	public void clear() {
		length = 0;
		position = 0;
	}

	@Override
	public void mark() {
		markLength = length;
		markPosition = position;
	}

	@Override
	public void reset() {
		length = markLength;
		position = markPosition;
	}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/963f46e7/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/StringValueArrayComparator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/StringValueArrayComparator.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/StringValueArrayComparator.java
new file mode 100644
index 0000000..df88a8e
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/StringValueArrayComparator.java
@@ -0,0 +1,217 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.graph.types.valuearray;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.types.NormalizableKey;

import java.io.IOException;

/**
 * Specialized comparator for StringValueArray based on CopyableValueComparator.
 *
 * This can be used for grouping keys but not for sorting keys.
 */
@Internal
public class StringValueArrayComparator extends TypeComparator<StringValueArray> {

	private static final long serialVersionUID = 1L;

	private final boolean ascendingComparison;

	private final StringValueArray reference = new StringValueArray();

	private final TypeComparator<?>[] comparators = new TypeComparator[] {this};

	public StringValueArrayComparator(boolean ascending) {
		this.ascendingComparison = ascending;
	}

	@Override
	public int hash(StringValueArray record) {
		return record.hashCode();
	}

	@Override
	public void setReference(StringValueArray toCompare) {
		toCompare.copyTo(reference);
	}

	@Override
	public boolean equalToReference(StringValueArray candidate) {
		return candidate.equals(this.reference);
	}

	@Override
	public int compareToReference(TypeComparator<StringValueArray> referencedComparator) {
		int comp = ((StringValueArrayComparator) referencedComparator).reference.compareTo(reference);
		return ascendingComparison ? comp : -comp;
	}

	@Override
	public int compare(StringValueArray first, StringValueArray second) {
		int comp = first.compareTo(second);
		return ascendingComparison ? comp : -comp;
	}

	/**
	 * Compares two serialized {@code StringValueArray} records in the same
	 * order as {@link #compare}, i.e. {@link StringValueArray#compareTo}: first
	 * by the number of serialized bytes, then by the first differing byte.
	 *
	 * <p>{@code StringValueArray#write} emits the element count, then the byte
	 * count, then the encoded bytes. Both the count and the byte count must be
	 * consumed before the encoded bytes can be read.
	 */
	@Override
	public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
		// the element count is implied by the encoded bytes and is not compared
		firstSource.readInt();
		secondSource.readInt();

		int firstPosition = firstSource.readInt();
		int secondPosition = secondSource.readInt();

		int cmp = Integer.compare(firstPosition, secondPosition);
		if (cmp != 0) {
			return ascendingComparison ? cmp : -cmp;
		}

		for (int i = 0; i < firstPosition; i++) {
			cmp = Byte.compare(firstSource.readByte(), secondSource.readByte());
			if (cmp != 0) {
				return ascendingComparison ? cmp : -cmp;
			}
		}

		return 0;
	}

	@Override
	public boolean supportsNormalizedKey() {
		return NormalizableKey.class.isAssignableFrom(StringValueArray.class);
	}

	@Override
	public int getNormalizeKeyLen() {
		return reference.getMaxNormalizedKeyLen();
	}

	/**
	 * The normalized key produced by {@code StringValueArray#copyNormalizedKey}
	 * is a 32-bit hash. Distinct arrays may share a hash, so the key is never
	 * sufficient on its own and ties must always be resolved by a full compare.
	 *
	 * <p>NOTE(review): the hash also does not follow the order of
	 * {@link #compare}; confirm this is acceptable wherever sorted runs are
	 * merged with {@code compare}.
	 */
	@Override
	public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
		return true;
	}

	@Override
	public void putNormalizedKey(StringValueArray record, MemorySegment target, int offset, int numBytes) {
		record.copyNormalizedKey(target, offset, numBytes);
	}

	@Override
	public boolean invertNormalizedKey() {
		return !ascendingComparison;
	}

	@Override
	public TypeComparator<StringValueArray> duplicate() {
		return new StringValueArrayComparator(ascendingComparison);
	}

	@Override
	public int extractKeys(Object record, Object[] target, int index) {
		target[index] = record;
		return 1;
	}

	@Override
	public TypeComparator<?>[] getFlatComparators() {
		return comparators;
	}

	// --------------------------------------------------------------------------------------------
	// unsupported normalization
	// --------------------------------------------------------------------------------------------

	@Override
	public boolean supportsSerializationWithKeyNormalization() {
		return false;
	}

	@Override
	public void writeWithKeyNormalization(StringValueArray record, DataOutputView target) throws IOException {
		throw new UnsupportedOperationException();
	}

	@Override
	public StringValueArray readWithKeyDenormalization(StringValueArray reuse, DataInputView source) throws IOException {
		throw new UnsupportedOperationException();
	}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/963f46e7/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/StringValueArraySerializer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/StringValueArraySerializer.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/StringValueArraySerializer.java
new file mode 100644
index 0000000..0e875e3
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/StringValueArraySerializer.java
@@ -0,0 +1,85 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.graph.types.valuearray;

import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;

import java.io.IOException;

/**
 * Specialized serializer for {@code StringValueArray}.
 */
public final class StringValueArraySerializer extends TypeSerializerSingleton<StringValueArray> {

	private static final long serialVersionUID = 1L;

	@Override
	public boolean isImmutableType() {
		return false;
	}

	@Override
	public StringValueArray createInstance() {
		return new StringValueArray();
	}

	@Override
	public StringValueArray copy(StringValueArray from) {
		return copy(from, new StringValueArray());
	}

	@Override
	public StringValueArray copy(StringValueArray from, StringValueArray reuse) {
		reuse.setValue(from);
		return reuse;
	}

	@Override
	public int getLength() {
		return -1;
	}

	@Override
	public void serialize(StringValueArray record, DataOutputView target) throws IOException {
		record.write(target);
	}

	@Override
	public StringValueArray deserialize(DataInputView source) throws IOException {
		return deserialize(new StringValueArray(), source);
	}

	@Override
	public StringValueArray deserialize(StringValueArray reuse, DataInputView source) throws IOException {
		reuse.read(source);
		return reuse;
	}

	@Override
	public void copy(DataInputView source, DataOutputView target) throws IOException {
		StringValueArray.copyInternal(source, target);
	}

	@Override
	public boolean canEqual(Object obj) {
		return obj instanceof StringValueArraySerializer;
	}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/963f46e7/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ValueArray.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ValueArray.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ValueArray.java
new file mode 100644
index 0000000..6e34b71
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ValueArray.java
@@ -0,0 +1,97 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.graph.types.valuearray;

import org.apache.flink.api.common.typeinfo.TypeInfo;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.types.CopyableValue;
import org.apache.flink.types.NormalizableKey;
import org.apache.flink.types.ResettableValue;
import org.apache.flink.types.Value;

import java.io.Serializable;

/**
 * Basic interface for array types which reuse objects during serialization.
 *
 * <p>Value arrays are usable as grouping keys but not sorting keys.
 *
 * @param <T> the {@link Value} type
 */
@TypeInfo(ValueArrayTypeInfoFactory.class)
public interface ValueArray<T>
extends Iterable<T>, IOReadableWritable, Serializable, NormalizableKey<ValueArray<T>>, ResettableValue<ValueArray<T>>, CopyableValue<ValueArray<T>> {

	/**
	 * Returns the number of elements stored in the array.
	 *
	 * @return the number of elements stored in the array
	 */
	int size();

	/**
	 * A bounded array fills when the allocated capacity has been fully used.
	 * An unbounded array will only fill when the underlying data structure has
	 * reached capacity, for example the ~2^31 element limit for Java arrays.
	 *
	 * @return whether the array is full
	 */
	boolean isFull();

	/**
	 * Appends the value to this array if and only if the array capacity would
	 * not be exceeded.
	 *
	 * @param value the value to add to this array
	 * @return whether the value was added to the array
	 */
	boolean add(T value);

	/**
	 * Appends all of the values in the specified array to the end of this
	 * array. If the combined array would exceed capacity then no values are
	 * appended.
	 *
	 * @param source array containing values to be added to this array
	 * @return whether the values were added to the array
	 */
	boolean addAll(ValueArray<T> source);

	/**
	 * Saves the array index, which can be restored by calling {@code reset()}.
	 *
	 * <p>This is not serialized and is not part of the contract for
	 * {@link #equals(Object)}.
	 */
	void mark();

	/**
	 * Restores the array index to when {@code mark()} was last called.
	 */
	void reset();

	/**
	 * Resets the array to the empty state. The implementation is *not*
	 * expected to release the underlying data structure. This allows the array
	 * to be reused with minimal impact on the garbage collector.
	 *
	 * <p>This may reset the {@link #mark()} in order to allow arrays to be
	 * shrunk.
	 */
	void clear();
}
