[FLINK-5187] [core] Port Row and related type utils to Java and move them to flink-core.
This closes #2968. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/86f8a255 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/86f8a255 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/86f8a255 Branch: refs/heads/master Commit: 86f8a255db6ce2ff9e09c2824e85c4930427ecdb Parents: 15e7f0a Author: Jark Wu <[email protected]> Authored: Thu Dec 8 22:44:29 2016 +0800 Committer: Fabian Hueske <[email protected]> Committed: Thu Dec 15 11:36:40 2016 +0100 ---------------------------------------------------------------------- docs/dev/types_serialization.md | 6 +- .../flink/api/java/typeutils/RowTypeInfo.java | 203 ++++++++ .../typeutils/runtime/NullAwareComparator.java | 240 +++++++++ .../java/typeutils/runtime/NullMaskUtils.java | 105 ++++ .../java/typeutils/runtime/RowComparator.java | 488 +++++++++++++++++++ .../java/typeutils/runtime/RowSerializer.java | 243 +++++++++ .../main/java/org/apache/flink/types/Row.java | 116 +++++ .../api/java/typeutils/RowTypeInfoTest.java | 69 +++ .../typeutils/runtime/RowComparatorTest.java | 156 ++++++ .../RowComparatorWithManyFieldsTests.java | 103 ++++ .../typeutils/runtime/RowSerializerTest.java | 197 ++++++++ .../java/org/apache/flink/types/RowTest.java | 37 ++ 12 files changed, 1961 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/86f8a255/docs/dev/types_serialization.md ---------------------------------------------------------------------- diff --git a/docs/dev/types_serialization.md b/docs/dev/types_serialization.md index ea02df0..2b43563 100644 --- a/docs/dev/types_serialization.md +++ b/docs/dev/types_serialization.md @@ -87,9 +87,11 @@ Internally, Flink makes the following distinctions between types: * Composite types - * Flink Java Tuples (part of the Flink Java API) + * Flink Java Tuples (part of the Flink Java API): max 25 fields, null fields not 
supported - * Scala *case classes* (including Scala tuples) + * Scala *case classes* (including Scala tuples): max 22 fields, null fields not supported + + * Row: tuples with arbitrary number of fields and support for null fields * POJOs: classes that follow a certain bean-like pattern http://git-wip-us.apache.org/repos/asf/flink/blob/86f8a255/flink-core/src/main/java/org/apache/flink/api/java/typeutils/RowTypeInfo.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/RowTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/RowTypeInfo.java new file mode 100644 index 0000000..03cbe61 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/RowTypeInfo.java @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.api.java.typeutils; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.typeutils.runtime.RowComparator; +import org.apache.flink.api.java.typeutils.runtime.RowSerializer; +import org.apache.flink.types.Row; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; + +import static org.apache.flink.util.Preconditions.checkState; + +/** + * TypeInformation for {@link Row} + */ +@PublicEvolving +public class RowTypeInfo extends TupleTypeInfoBase<Row> { + + private static final long serialVersionUID = 9158518989896601963L; + + protected final String[] fieldNames; + /** Temporary variable for directly passing orders to comparators. */ + private boolean[] comparatorOrders = null; + + public RowTypeInfo(TypeInformation<?>... 
types) { + super(Row.class, types); + + this.fieldNames = new String[types.length]; + + for (int i = 0; i < types.length; i++) { + fieldNames[i] = "f" + i; + } + } + + @Override + public TypeComparator<Row> createComparator( + int[] logicalKeyFields, + boolean[] orders, + int logicalFieldOffset, + ExecutionConfig config) { + + comparatorOrders = orders; + TypeComparator<Row> comparator = super.createComparator( + logicalKeyFields, + orders, + logicalFieldOffset, + config); + comparatorOrders = null; + return comparator; + } + + @Override + protected TypeComparatorBuilder<Row> createTypeComparatorBuilder() { + if (comparatorOrders == null) { + throw new IllegalStateException("Cannot create comparator builder without orders."); + } + return new RowTypeComparatorBuilder(comparatorOrders); + } + + @Override + public String[] getFieldNames() { + return fieldNames; + } + + @Override + public int getFieldIndex(String fieldName) { + for (int i = 0; i < fieldNames.length; i++) { + if (fieldNames[i].equals(fieldName)) { + return i; + } + } + return -1; + } + + @Override + public TypeSerializer<Row> createSerializer(ExecutionConfig config) { + int len = getArity(); + TypeSerializer<?>[] fieldSerializers = new TypeSerializer[len]; + for (int i = 0; i < len; i++) { + fieldSerializers[i] = types[i].createSerializer(config); + } + return new RowSerializer(fieldSerializers); + } + + @Override + public boolean canEqual(Object obj) { + return obj instanceof RowTypeInfo; + } + + @Override + public int hashCode() { + return 31 * super.hashCode() + Arrays.hashCode(fieldNames); + } + + @Override + public String toString() { + StringBuilder bld = new StringBuilder("Row"); + if (types.length > 0) { + bld.append('(').append(fieldNames[0]).append(": ").append(types[0]); + + for (int i = 1; i < types.length; i++) { + bld.append(", ").append(fieldNames[i]).append(": ").append(types[i]); + } + + bld.append(')'); + } + return bld.toString(); + } + + private class RowTypeComparatorBuilder 
implements TypeComparatorBuilder<Row> { + + private final ArrayList<TypeComparator> fieldComparators = new ArrayList<TypeComparator>(); + private final ArrayList<Integer> logicalKeyFields = new ArrayList<Integer>(); + private final boolean[] comparatorOrders; + + public RowTypeComparatorBuilder(boolean[] comparatorOrders) { + this.comparatorOrders = comparatorOrders; + } + + @Override + public void initializeTypeComparatorBuilder(int size) { + fieldComparators.ensureCapacity(size); + logicalKeyFields.ensureCapacity(size); + } + + @Override + public void addComparatorField(int fieldId, TypeComparator<?> comparator) { + fieldComparators.add(comparator); + logicalKeyFields.add(fieldId); + } + + @Override + public TypeComparator<Row> createTypeComparator(ExecutionConfig config) { + checkState( + fieldComparators.size() > 0, + "No field comparators were defined for the TupleTypeComparatorBuilder." + ); + + checkState( + logicalKeyFields.size() > 0, + "No key fields were defined for the TupleTypeComparatorBuilder." + ); + + checkState( + fieldComparators.size() == logicalKeyFields.size(), + "The number of field comparators and key fields is not equal." + ); + + final int maxKey = Collections.max(logicalKeyFields); + + checkState( + maxKey >= 0, + "The maximum key field must be greater or equal than 0." 
+ ); + + TypeSerializer<?>[] fieldSerializers = new TypeSerializer<?>[maxKey + 1]; + + for (int i = 0; i <= maxKey; i++) { + fieldSerializers[i] = types[i].createSerializer(config); + } + + int[] keyPositions = new int[logicalKeyFields.size()]; + for (int i = 0; i < keyPositions.length; i++) { + keyPositions[i] = logicalKeyFields.get(i); + } + + TypeComparator[] comparators = new TypeComparator[fieldComparators.size()]; + for (int i = 0; i < fieldComparators.size(); i++) { + comparators[i] = fieldComparators.get(i); + } + + //noinspection unchecked + return new RowComparator( + getArity(), + keyPositions, + comparators, + (TypeSerializer<Object>[]) fieldSerializers, + comparatorOrders); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/86f8a255/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/NullAwareComparator.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/NullAwareComparator.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/NullAwareComparator.java new file mode 100644 index 0000000..3587811 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/NullAwareComparator.java @@ -0,0 +1,240 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.java.typeutils.runtime; + +import org.apache.flink.api.common.typeutils.CompositeTypeComparator; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.core.memory.MemorySegment; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +/** + * Null-aware comparator that wraps a comparator which does not support null references. + * <p> + * NOTE: This class assumes to be used within a composite type comparator (such + * as {@link RowComparator}) that handles serialized comparison. 
+ */ +public class NullAwareComparator<T> extends TypeComparator<T> { + private static final long serialVersionUID = 1L; + + private final TypeComparator<T> wrappedComparator; + private final boolean order; + + // number of flat fields + private final int flatFields; + + // stores the null for reference comparison + private boolean nullReference = false; + + public NullAwareComparator(TypeComparator<T> wrappedComparator, boolean order) { + this.wrappedComparator = wrappedComparator; + this.order = order; + this.flatFields = wrappedComparator.getFlatComparators().length; + } + + @Override + public int hash(T record) { + if (record != null) { + return wrappedComparator.hash(record); + } else { + return 0; + } + } + + @Override + public void setReference(T toCompare) { + if (toCompare == null) { + nullReference = true; + } else { + nullReference = false; + wrappedComparator.setReference(toCompare); + } + } + + @Override + public boolean equalToReference(T candidate) { + // both values are null + if (candidate == null && nullReference) { + return true; + } + // one value is null + else if (candidate == null || nullReference) { + return false; + } + // no null value + else { + return wrappedComparator.equalToReference(candidate); + } + } + + @Override + public int compareToReference(TypeComparator<T> referencedComparator) { + NullAwareComparator otherComparator = (NullAwareComparator) referencedComparator; + boolean otherNullReference = otherComparator.nullReference; + // both values are null -> equality + if (nullReference && otherNullReference) { + return 0; + } + // first value is null -> inequality + // but order is considered + else if (nullReference) { + return order ? 1 : -1; + } + // second value is null -> inequality + // but order is considered + else if (otherNullReference) { + return order ? 
-1 : 1; + } + // no null values + else { + return wrappedComparator.compareToReference(otherComparator.wrappedComparator); + } + } + + @Override + public int compare(T first, T second) { + // both values are null -> equality + if (first == null && second == null) { + return 0; + } + // first value is null -> inequality + // but order is considered + else if (first == null) { + return order ? -1 : 1; + } + // second value is null -> inequality + // but order is considered + else if (second == null) { + return order ? 1 : -1; + } + // no null values + else { + return wrappedComparator.compare(first, second); + } + } + + @Override + public int compareSerialized( + DataInputView firstSource, + DataInputView secondSource) throws IOException { + + throw new UnsupportedOperationException( + "Comparator does not support null-aware serialized comparision."); + } + + @Override + public boolean supportsNormalizedKey() { + return wrappedComparator.supportsNormalizedKey(); + } + + @Override + public boolean supportsSerializationWithKeyNormalization() { + return false; + } + + @Override + public int getNormalizeKeyLen() { + int len = wrappedComparator.getNormalizeKeyLen(); + if (len == Integer.MAX_VALUE) { + return Integer.MAX_VALUE; + } else { + // add one for a null byte + return len + 1; + } + } + + @Override + public boolean isNormalizedKeyPrefixOnly(int keyBytes) { + return wrappedComparator.isNormalizedKeyPrefixOnly(keyBytes - 1); + } + + @Override + public void putNormalizedKey(T record, MemorySegment target, int offset, int numBytes) { + if (numBytes > 0) { + // write a null byte with padding + if (record == null) { + target.putBoolean(offset, false); + // write padding + for (int j = 0; j < numBytes - 1; j++) { + target.put(offset + 1 + j, (byte) 0); + } + } + // write a non-null byte with key + else { + target.putBoolean(offset, true); + // write key + wrappedComparator.putNormalizedKey(record, target, offset + 1, numBytes - 1); + } + } + } + + @Override + public void 
writeWithKeyNormalization(T record, DataOutputView target) throws IOException { + throw new UnsupportedOperationException( + "Record serialization with leading normalized keys not supported."); + } + + @Override + public T readWithKeyDenormalization(T reuse, DataInputView source) throws IOException { + throw new UnsupportedOperationException( + "Record deserialization with leading normalized keys not supported."); + } + + @Override + public boolean invertNormalizedKey() { + return wrappedComparator.invertNormalizedKey(); + } + + @Override + public TypeComparator<T> duplicate() { + return new NullAwareComparator<T>(wrappedComparator.duplicate(), order); + } + + @Override + public int extractKeys(Object record, Object[] target, int index) { + if (record == null) { + for (int i = 0; i < flatFields; i++) { + target[index + i] = null; + } + return flatFields; + } else { + return wrappedComparator.extractKeys(record, target, index); + } + } + + @Override + public TypeComparator[] getFlatComparators() { + // determine the flat comparators and wrap them again in null-aware comparators + List<TypeComparator<?>> flatComparators = new ArrayList<>(); + if (wrappedComparator instanceof CompositeTypeComparator) { + ((CompositeTypeComparator) wrappedComparator).getFlatComparator(flatComparators); + } else { + flatComparators.add(wrappedComparator); + } + + TypeComparator<?>[] result = new TypeComparator[flatComparators.size()]; + for (int i = 0; i < result.length; i++) { + result[i] = new NullAwareComparator<>(flatComparators.get(i), order); + } + return result; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/86f8a255/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/NullMaskUtils.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/NullMaskUtils.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/NullMaskUtils.java new file 
mode 100644 index 0000000..010af7f --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/NullMaskUtils.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.java.typeutils.runtime; + +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.types.Row; + +import java.io.IOException; + +public class NullMaskUtils { + + public static void writeNullMask(int len, Row value, DataOutputView target) throws IOException { + int b = 0x00; + int bytePos = 0; + + int fieldPos = 0; + int numPos = 0; + while (fieldPos < len) { + b = 0x00; + // set bits in byte + bytePos = 0; + numPos = Math.min(8, len - fieldPos); + while (bytePos < numPos) { + b = b << 1; + // set bit if field is null + if (value.getField(fieldPos + bytePos) == null) { + b |= 0x01; + } + bytePos += 1; + } + fieldPos += numPos; + // shift bits if last byte is not completely filled + b <<= (8 - bytePos); + // write byte + target.writeByte(b); + } + } + + public static void readIntoNullMask( + int len, + DataInputView source, + boolean[] nullMask) throws IOException { + + int b = 0x00; + int bytePos = 0; + + 
int fieldPos = 0; + int numPos = 0; + while (fieldPos < len) { + // read byte + b = source.readUnsignedByte(); + bytePos = 0; + numPos = Math.min(8, len - fieldPos); + while (bytePos < numPos) { + nullMask[fieldPos + bytePos] = (b & 0x80) > 0; + b = b << 1; + bytePos += 1; + } + fieldPos += numPos; + } + } + + public static void readIntoAndCopyNullMask( + int len, + DataInputView source, + DataOutputView target, + boolean[] nullMask) throws IOException { + + int b = 0x00; + int bytePos = 0; + + int fieldPos = 0; + int numPos = 0; + while (fieldPos < len) { + // read byte + b = source.readUnsignedByte(); + // copy byte + target.writeByte(b); + bytePos = 0; + numPos = Math.min(8, len - fieldPos); + while (bytePos < numPos) { + nullMask[fieldPos + bytePos] = (b & 0x80) > 0; + b = b << 1; + bytePos += 1; + } + fieldPos += numPos; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/86f8a255/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowComparator.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowComparator.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowComparator.java new file mode 100644 index 0000000..d6c5195 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowComparator.java @@ -0,0 +1,488 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.java.typeutils.runtime; + +import org.apache.flink.api.common.typeutils.CompositeTypeComparator; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple4; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.types.KeyFieldOutOfBoundsException; +import org.apache.flink.types.Row; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; + +import static org.apache.flink.api.java.typeutils.runtime.NullMaskUtils.readIntoNullMask; +import static org.apache.flink.util.Preconditions.checkArgument; + +/** + * Comparator for {@link Row} + */ +public class RowComparator extends CompositeTypeComparator<Row> { + + private static final long serialVersionUID = 1L; + /** The number of fields of the Row */ + private final int arity; + /** key positions describe which fields are keys in what order */ + private final int[] keyPositions; + /** null-aware comparators for the key fields, in the same order as the key fields */ + private final NullAwareComparator<Object>[] comparators; + /** serializers to deserialize the first n fields for comparison */ + private final TypeSerializer<Object>[] serializers; + /** auxiliary fields for normalized key support */ + private final int[] normalizedKeyLengths; + private final int numLeadingNormalizableKeys; + private final int 
normalizableKeyPrefixLen; + private final boolean invertNormKey; + + // null masks for serialized comparison + private final boolean[] nullMask1; + private final boolean[] nullMask2; + + // cache for the deserialized key field objects + transient private final Object[] deserializedKeyFields1; + transient private final Object[] deserializedKeyFields2; + + /** + * General constructor for RowComparator. + * + * @param arity the number of fields of the Row + * @param keyPositions key positions describe which fields are keys in what order + * @param comparators non-null-aware comparators for the key fields, in the same order as + * the key fields + * @param serializers serializers to deserialize the first n fields for comparison + * @param orders sorting orders for the fields + */ + public RowComparator( + int arity, + int[] keyPositions, + TypeComparator<Object>[] comparators, + TypeSerializer<Object>[] serializers, + boolean[] orders) { + + this(arity, keyPositions, makeNullAware(comparators, orders), serializers); + } + + + /** + * Intermediate constructor for creating auxiliary fields. + */ + private RowComparator( + int arity, + int[] keyPositions, + NullAwareComparator<Object>[] comparators, + TypeSerializer<Object>[] serializers) { + + this( + arity, + keyPositions, + comparators, + serializers, + createAuxiliaryFields(keyPositions, comparators)); + } + + /** + * Intermediate constructor for creating auxiliary fields. + */ + private RowComparator( + int arity, + int[] keyPositions, + NullAwareComparator<Object>[] comparators, + TypeSerializer<Object>[] serializers, + Tuple4<int[], Integer, Integer, Boolean> auxiliaryFields) { + + this( + arity, + keyPositions, + comparators, + serializers, + auxiliaryFields.f0, + auxiliaryFields.f1, + auxiliaryFields.f2, + auxiliaryFields.f3); + } + + /** + * Intermediate constructor for creating auxiliary fields. 
+ */ + private RowComparator( + int arity, + int[] keyPositions, + NullAwareComparator<Object>[] comparators, + TypeSerializer<Object>[] serializers, + int[] normalizedKeyLengths, + int numLeadingNormalizableKeys, + int normalizableKeyPrefixLen, + boolean invertNormKey) { + + this.arity = arity; + this.keyPositions = keyPositions; + this.comparators = comparators; + this.serializers = serializers; + this.normalizedKeyLengths = normalizedKeyLengths; + this.numLeadingNormalizableKeys = numLeadingNormalizableKeys; + this.normalizableKeyPrefixLen = normalizableKeyPrefixLen; + this.invertNormKey = invertNormKey; + this.nullMask1 = new boolean[arity]; + this.nullMask2 = new boolean[arity]; + deserializedKeyFields1 = instantiateDeserializationFields(); + deserializedKeyFields2 = instantiateDeserializationFields(); + } + + // -------------------------------------------------------------------------------------------- + // Comparator Methods + // -------------------------------------------------------------------------------------------- + + @Override + public void getFlatComparator(List<TypeComparator> flatComparators) { + for (NullAwareComparator<Object> c : comparators) { + Collections.addAll(flatComparators, c.getFlatComparators()); + } + } + + @Override + public int hash(Row record) { + int code = 0; + int i = 0; + + try { + for (; i < keyPositions.length; i++) { + code *= TupleComparatorBase.HASH_SALT[i & 0x1F]; + Object element = record.getField(keyPositions[i]); // element can be null + code += comparators[i].hash(element); + } + } catch (IndexOutOfBoundsException e) { + throw new KeyFieldOutOfBoundsException(keyPositions[i]); + } + + return code; + } + + @Override + public void setReference(Row toCompare) { + int i = 0; + try { + for (; i < keyPositions.length; i++) { + TypeComparator<Object> comparator = comparators[i]; + Object element = toCompare.getField(keyPositions[i]); + comparator.setReference(element); // element can be null + } + } catch 
(IndexOutOfBoundsException e) { + throw new KeyFieldOutOfBoundsException(keyPositions[i]); + } + } + + @Override + public boolean equalToReference(Row candidate) { + int i = 0; + try { + for (; i < keyPositions.length; i++) { + TypeComparator<Object> comparator = comparators[i]; + Object element = candidate.getField(keyPositions[i]); // element can be null + // check if reference is not equal + if (!comparator.equalToReference(element)) { + return false; + } + } + } catch (IndexOutOfBoundsException e) { + throw new KeyFieldOutOfBoundsException(keyPositions[i]); + } + return true; + } + + @Override + public int compareToReference(TypeComparator<Row> referencedComparator) { + RowComparator other = (RowComparator) referencedComparator; + int i = 0; + try { + for (; i < keyPositions.length; i++) { + int cmp = comparators[i].compareToReference(other.comparators[i]); + if (cmp != 0) { + return cmp; + } + } + } catch (IndexOutOfBoundsException e) { + throw new KeyFieldOutOfBoundsException(keyPositions[i]); + } + return 0; + } + + @Override + public int compare(Row first, Row second) { + int i = 0; + try { + for (; i < keyPositions.length; i++) { + int keyPos = keyPositions[i]; + TypeComparator<Object> comparator = comparators[i]; + Object firstElement = first.getField(keyPos); // element can be null + Object secondElement = second.getField(keyPos); // element can be null + + int cmp = comparator.compare(firstElement, secondElement); + if (cmp != 0) { + return cmp; + } + } + } catch (IndexOutOfBoundsException e) { + throw new KeyFieldOutOfBoundsException(keyPositions[i]); + } + return 0; + } + + @Override + public int compareSerialized( + DataInputView firstSource, + DataInputView secondSource) throws IOException { + + int len = serializers.length; + int keyLen = keyPositions.length; + + readIntoNullMask(arity, firstSource, nullMask1); + readIntoNullMask(arity, secondSource, nullMask2); + + // deserialize + for (int i = 0; i < len; i++) { + TypeSerializer<Object> 
serializer = serializers[i]; + + // deserialize field 1 + if (!nullMask1[i]) { + deserializedKeyFields1[i] = serializer.deserialize( + deserializedKeyFields1[i], + firstSource); + } + + // deserialize field 2 + if (!nullMask2[i]) { + deserializedKeyFields2[i] = serializer.deserialize( + deserializedKeyFields2[i], + secondSource); + } + } + + // compare + for (int i = 0; i < keyLen; i++) { + int keyPos = keyPositions[i]; + TypeComparator<Object> comparator = comparators[i]; + + boolean isNull1 = nullMask1[keyPos]; + boolean isNull2 = nullMask2[keyPos]; + + int cmp = 0; + // both values are null -> equality + if (isNull1 && isNull2) { + cmp = 0; + } + // first value is null -> inequality + else if (isNull1) { + cmp = comparator.compare(null, deserializedKeyFields2[keyPos]); + } + // second value is null -> inequality + else if (isNull2) { + cmp = comparator.compare(deserializedKeyFields1[keyPos], null); + } + // no null values + else { + cmp = comparator.compare( + deserializedKeyFields1[keyPos], + deserializedKeyFields2[keyPos]); + } + + if (cmp != 0) { + return cmp; + } + } + + return 0; + } + + @Override + public boolean supportsNormalizedKey() { + return numLeadingNormalizableKeys > 0; + } + + @Override + public boolean supportsSerializationWithKeyNormalization() { + return false; + } + + @Override + public int getNormalizeKeyLen() { + return normalizableKeyPrefixLen; + } + + @Override + public boolean isNormalizedKeyPrefixOnly(int keyBytes) { + return numLeadingNormalizableKeys < keyPositions.length || + normalizableKeyPrefixLen == Integer.MAX_VALUE || + normalizableKeyPrefixLen > keyBytes; + } + + @Override + public void putNormalizedKey(Row record, MemorySegment target, int offset, int numBytes) { + int bytesLeft = numBytes; + int currentOffset = offset; + + for (int i = 0; i < numLeadingNormalizableKeys && bytesLeft > 0; i++) { + int len = normalizedKeyLengths[i]; + len = bytesLeft >= len ? 
len : bytesLeft; + + TypeComparator<Object> comparator = comparators[i]; + Object element = record.getField(keyPositions[i]); // element can be null + // write key + comparator.putNormalizedKey(element, target, currentOffset, len); + + bytesLeft -= len; + currentOffset += len; + } + + } + + @Override + public void writeWithKeyNormalization(Row record, DataOutputView target) throws IOException { + throw new UnsupportedOperationException( + "Record serialization with leading normalized keys not supported."); + } + + @Override + public Row readWithKeyDenormalization(Row reuse, DataInputView source) throws IOException { + throw new UnsupportedOperationException( + "Record deserialization with leading normalized keys not supported."); + } + + @Override + public boolean invertNormalizedKey() { + return invertNormKey; + } + + @Override + public TypeComparator<Row> duplicate() { + NullAwareComparator<?>[] comparatorsCopy = new NullAwareComparator<?>[comparators.length]; + for (int i = 0; i < comparators.length; i++) { + comparatorsCopy[i] = (NullAwareComparator<?>) comparators[i].duplicate(); + } + + TypeSerializer<?>[] serializersCopy = new TypeSerializer<?>[serializers.length]; + for (int i = 0; i < serializers.length; i++) { + serializersCopy[i] = serializers[i].duplicate(); + } + + return new RowComparator( + arity, + keyPositions, + (NullAwareComparator<Object>[]) comparatorsCopy, + (TypeSerializer<Object>[]) serializersCopy, + normalizedKeyLengths, + numLeadingNormalizableKeys, + normalizableKeyPrefixLen, + invertNormKey); + } + + @Override + public int extractKeys(Object record, Object[] target, int index) { + int len = comparators.length; + int localIndex = index; + for (int i = 0; i < len; i++) { + Object element = ((Row) record).getField(keyPositions[i]); // element can be null + localIndex += comparators[i].extractKeys(element, target, localIndex); + } + return localIndex - index; + } + + + private Object[] instantiateDeserializationFields() { + Object[] 
newFields = new Object[serializers.length]; + for (int i = 0; i < serializers.length; i++) { + newFields[i] = serializers[i].createInstance(); + } + return newFields; + } + + /** + * @return creates auxiliary fields for normalized key support + */ + private static Tuple4<int[], Integer, Integer, Boolean> + createAuxiliaryFields(int[] keyPositions, NullAwareComparator<Object>[] comparators) { + + int[] normalizedKeyLengths = new int[keyPositions.length]; + int numLeadingNormalizableKeys = 0; + int normalizableKeyPrefixLen = 0; + boolean inverted = false; + + for (int i = 0; i < keyPositions.length; i++) { + NullAwareComparator<Object> k = comparators[i]; + // as long as the leading keys support normalized keys, we can build up the composite key + if (k.supportsNormalizedKey()) { + if (i == 0) { + // the first comparator decides whether we need to invert the key direction + inverted = k.invertNormalizedKey(); + } else if (k.invertNormalizedKey() != inverted) { + // if a successor does not agree on the inversion direction, it cannot be part of the + // normalized key + return new Tuple4<>( + normalizedKeyLengths, + numLeadingNormalizableKeys, + normalizableKeyPrefixLen, + inverted); + } + numLeadingNormalizableKeys++; + int len = k.getNormalizeKeyLen(); + if (len < 0) { + throw new RuntimeException( + "Comparator " + k.getClass().getName() + + " specifies an invalid length for the normalized key: " + len); + } + normalizedKeyLengths[i] = len; + normalizableKeyPrefixLen += len; + if (normalizableKeyPrefixLen < 0) { + // overflow, which means we are out of budget for normalized key space anyways + return new Tuple4<>( + normalizedKeyLengths, + numLeadingNormalizableKeys, + Integer.MAX_VALUE, + inverted); + } + } else { + return new Tuple4<>( + normalizedKeyLengths, + numLeadingNormalizableKeys, + normalizableKeyPrefixLen, + inverted); + } + } + return new Tuple4<>( + normalizedKeyLengths, + numLeadingNormalizableKeys, + normalizableKeyPrefixLen, + inverted); + } + + 
private static NullAwareComparator<Object>[] makeNullAware( + TypeComparator<Object>[] comparators, + boolean[] orders) { + + checkArgument(comparators.length == orders.length); + NullAwareComparator<?>[] result = new NullAwareComparator<?>[comparators.length]; + for (int i = 0; i < comparators.length; i++) { + result[i] = new NullAwareComparator<Object>(comparators[i], orders[i]); + } + return (NullAwareComparator<Object>[]) result; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/86f8a255/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java new file mode 100644 index 0000000..5457c05 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java @@ -0,0 +1,243 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/
package org.apache.flink.api.java.typeutils.runtime;

import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.types.Row;

import java.io.IOException;
import java.util.Arrays;

import static org.apache.flink.api.java.typeutils.runtime.NullMaskUtils.readIntoAndCopyNullMask;
import static org.apache.flink.api.java.typeutils.runtime.NullMaskUtils.readIntoNullMask;
import static org.apache.flink.api.java.typeutils.runtime.NullMaskUtils.writeNullMask;
import static org.apache.flink.util.Preconditions.checkNotNull;

/**
 * Serializer for {@link Row}.
 *
 * <p>A row is written as a null mask (one entry per field) followed by the non-null fields
 * in field order, each written with the serializer registered for its position.
 *
 * <p>An instance keeps a scratch {@code nullMask} buffer that is overwritten by every
 * deserialization and binary copy, so one instance must not be used by several threads
 * at the same time. Use {@link #duplicate()} to obtain an independent instance.
 */
public class RowSerializer extends TypeSerializer<Row> {

	private static final long serialVersionUID = 1L;

	/** Scratch buffer for the null mask of the row currently being read; reused across calls. */
	private final boolean[] nullMask;

	/** Serializers of the individual fields, indexed by field position. */
	private final TypeSerializer<Object>[] fieldSerializers;

	/**
	 * Creates a serializer for rows whose arity equals the number of given field serializers.
	 *
	 * @param fieldSerializers the serializers of the fields, in field order; must not be null
	 */
	@SuppressWarnings("unchecked") // field values are only ever handled as plain Objects
	public RowSerializer(TypeSerializer<?>[] fieldSerializers) {
		this.fieldSerializers = (TypeSerializer<Object>[]) checkNotNull(fieldSerializers);
		this.nullMask = new boolean[fieldSerializers.length];
	}

	@Override
	public boolean isImmutableType() {
		return false;
	}

	/**
	 * Creates an independent copy of this serializer.
	 *
	 * <p>A new instance is returned even if all field serializers are stateless: this
	 * serializer is stateful itself because of its scratch null mask, so handing out
	 * {@code this} would let two users overwrite each other's mask.
	 */
	@Override
	public TypeSerializer<Row> duplicate() {
		TypeSerializer<?>[] duplicateFieldSerializers = new TypeSerializer<?>[fieldSerializers.length];
		for (int i = 0; i < fieldSerializers.length; i++) {
			duplicateFieldSerializers[i] = fieldSerializers[i].duplicate();
		}
		return new RowSerializer(duplicateFieldSerializers);
	}

	@Override
	public Row createInstance() {
		return new Row(fieldSerializers.length);
	}

	/**
	 * Creates a copy of the given row; non-null fields are copied with their field serializer.
	 *
	 * @throws RuntimeException if the arity of {@code from} differs from the number of serializers
	 */
	@Override
	public Row copy(Row from) {
		int len = fieldSerializers.length;

		if (from.getArity() != len) {
			throw new RuntimeException("Row arity of from does not match serializers.");
		}

		Row result = new Row(len);
		for (int i = 0; i < len; i++) {
			Object fromField = from.getField(i);
			if (fromField != null) {
				Object copy = fieldSerializers[i].copy(fromField);
				result.setField(i, copy);
			} else {
				result.setField(i, null);
			}
		}
		return result;
	}

	/**
	 * Copies the given row into {@code reuse}, reusing the field objects of {@code reuse}
	 * where they are present. Falls back to {@link #copy(Row)} if {@code reuse} is null.
	 *
	 * @throws RuntimeException if the arity of {@code from} or {@code reuse} differs from
	 *                          the number of serializers
	 */
	@Override
	public Row copy(Row from, Row reuse) {
		int len = fieldSerializers.length;

		// cannot reuse, do a non-reuse copy
		if (reuse == null) {
			return copy(from);
		}

		if (from.getArity() != len || reuse.getArity() != len) {
			throw new RuntimeException(
				"Row arity of reuse or from is incompatible with this RowSerializer.");
		}

		for (int i = 0; i < len; i++) {
			Object fromField = from.getField(i);
			if (fromField != null) {
				Object reuseField = reuse.getField(i);
				if (reuseField != null) {
					Object copy = fieldSerializers[i].copy(fromField, reuseField);
					reuse.setField(i, copy);
				} else {
					// nothing to reuse at this position
					Object copy = fieldSerializers[i].copy(fromField);
					reuse.setField(i, copy);
				}
			} else {
				reuse.setField(i, null);
			}
		}
		return reuse;
	}

	/** Rows have no fixed serialized length, signalled by {@code -1}. */
	@Override
	public int getLength() {
		return -1;
	}

	/**
	 * Writes the null mask of the row followed by all non-null fields.
	 *
	 * @throws RuntimeException if the arity of {@code record} differs from the number of serializers
	 */
	@Override
	public void serialize(Row record, DataOutputView target) throws IOException {
		int len = fieldSerializers.length;

		if (record.getArity() != len) {
			throw new RuntimeException("Row arity of record does not match serializers.");
		}

		// write a null mask
		writeNullMask(len, record, target);

		// serialize non-null fields
		for (int i = 0; i < len; i++) {
			Object o = record.getField(i);
			if (o != null) {
				fieldSerializers[i].serialize(o, target);
			}
		}
	}

	/** Reads a row into a newly created instance. */
	@Override
	public Row deserialize(DataInputView source) throws IOException {
		int len = fieldSerializers.length;

		Row result = new Row(len);

		// read null mask
		readIntoNullMask(len, source, nullMask);

		for (int i = 0; i < len; i++) {
			if (nullMask[i]) {
				result.setField(i, null);
			} else {
				result.setField(i, fieldSerializers[i].deserialize(source));
			}
		}

		return result;
	}

	/**
	 * Reads a row into {@code reuse}, reusing its field objects where they are present.
	 *
	 * @throws RuntimeException if the arity of {@code reuse} differs from the number of serializers
	 */
	@Override
	public Row deserialize(Row reuse, DataInputView source) throws IOException {
		int len = fieldSerializers.length;

		if (reuse.getArity() != len) {
			throw new RuntimeException("Row arity of reuse does not match serializers.");
		}

		// read null mask
		readIntoNullMask(len, source, nullMask);

		for (int i = 0; i < len; i++) {
			if (nullMask[i]) {
				reuse.setField(i, null);
			} else {
				Object reuseField = reuse.getField(i);
				if (reuseField != null) {
					reuse.setField(i, fieldSerializers[i].deserialize(reuseField, source));
				} else {
					reuse.setField(i, fieldSerializers[i].deserialize(source));
				}
			}
		}

		return reuse;
	}

	/** Copies one serialized row from {@code source} to {@code target} without building a {@link Row}. */
	@Override
	public void copy(DataInputView source, DataOutputView target) throws IOException {
		int len = fieldSerializers.length;

		// copy null mask
		readIntoAndCopyNullMask(len, source, target, nullMask);

		// only non-null fields are present in the stream
		for (int i = 0; i < len; i++) {
			if (!nullMask[i]) {
				fieldSerializers[i].copy(source, target);
			}
		}
	}

	/** Two row serializers are equal if their field serializers are pairwise equal. */
	@Override
	public boolean equals(Object obj) {
		if (canEqual(obj)) {
			RowSerializer other = (RowSerializer) obj;
			return Arrays.equals(this.fieldSerializers, other.fieldSerializers);
		}

		return false;
	}

	@Override
	public boolean canEqual(Object obj) {
		return obj instanceof RowSerializer;
	}

	@Override
	public int hashCode() {
		return Arrays.hashCode(fieldSerializers);
	}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/86f8a255/flink-core/src/main/java/org/apache/flink/types/Row.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/types/Row.java b/flink-core/src/main/java/org/apache/flink/types/Row.java new file mode 100644 index 0000000..6825b71
--- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/types/Row.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.types; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.util.StringUtils; + +import java.io.Serializable; +import java.util.Arrays; + +/** + * A Row can have arbitrary number of fields and contain a set of fields, which may all be + * different types. The fields in Row can be null. Due to Row is not strongly typed, Flink's + * type extraction mechanism can't extract correct field types. So that users should manually + * tell Flink the type information via creating a {@link RowTypeInfo}. + * + * <p> + * The fields in the Row can be accessed by position (zero-based) {@link #getField(int)}. And can + * set fields by {@link #setField(int, Object)}. + * <p> + * Row is in principle serializable. However, it may contain non-serializable fields, + * in which case serialization will fail. + * + */ +@PublicEvolving +public class Row implements Serializable{ + + private static final long serialVersionUID = 1L; + + /** The array to store actual values. 
*/ + private final Object[] fields; + + /** + * Create a new Row instance. + * @param arity The number of fields in the Row + */ + public Row(int arity) { + this.fields = new Object[arity]; + } + + /** + * Get the number of fields in the Row. + * @return The number of fields in the Row. + */ + public int getArity() { + return fields.length; + } + + /** + * Gets the field at the specified position. + * @param pos The position of the field, 0-based. + * @return The field at the specified position. + * @throws IndexOutOfBoundsException Thrown, if the position is negative, or equal to, or larger than the number of fields. + */ + public Object getField(int pos) { + return fields[pos]; + } + + /** + * Sets the field at the specified position. + * + * @param pos The position of the field, 0-based. + * @param value The value to be assigned to the field at the specified position. + * @throws IndexOutOfBoundsException Thrown, if the position is negative, or equal to, or larger than the number of fields. 
+ */ + public void setField(int pos, Object value) { + fields[pos] = value; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < fields.length; i++) { + if (i > 0) { + sb.append(","); + } + sb.append(StringUtils.arrayAwareToString(fields[i])); + } + return sb.toString(); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + Row row = (Row) o; + + return Arrays.equals(fields, row.fields); + } + + @Override + public int hashCode() { + return Arrays.hashCode(fields); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/86f8a255/flink-core/src/test/java/org/apache/flink/api/java/typeutils/RowTypeInfoTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/RowTypeInfoTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/RowTypeInfoTest.java new file mode 100644 index 0000000..8de7bf7 --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/RowTypeInfoTest.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
* See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.flink.api.java.typeutils;

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.junit.Test;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;

/**
 * Tests equality, hashing and nested field lookup of {@link RowTypeInfo}.
 */
public class RowTypeInfoTest {

	/** Row types built from the same field types are equal and hash alike. */
	@Test
	public void testRowTypeInfoEquality() {
		RowTypeInfo first = new RowTypeInfo(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
		RowTypeInfo second = new RowTypeInfo(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);

		assertEquals(first, second);
		assertEquals(first.hashCode(), second.hashCode());
	}

	/** Row types that differ in one field type are neither equal nor hash alike. */
	@Test
	public void testRowTypeInfoInequality() {
		RowTypeInfo intAndString = new RowTypeInfo(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
		RowTypeInfo intAndBoolean = new RowTypeInfo(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.BOOLEAN_TYPE_INFO);

		assertNotEquals(intAndString, intAndBoolean);
		assertNotEquals(intAndString.hashCode(), intAndBoolean.hashCode());
	}

	/** Fields of a nested row can be addressed by name and by dotted path. */
	@Test
	public void testNestedRowTypeInfo() {
		RowTypeInfo inner = new RowTypeInfo(BasicTypeInfo.SHORT_TYPE_INFO, BasicTypeInfo.BIG_DEC_TYPE_INFO);
		RowTypeInfo outer = new RowTypeInfo(BasicTypeInfo.INT_TYPE_INFO, inner, BasicTypeInfo.STRING_TYPE_INFO);

		String nestedRowType = outer.getTypeAt("f1").toString();
		String nestedFieldType = outer.getTypeAt("f1.f0").toString();

		assertEquals("Row(f0: Short, f1: BigDecimal)", nestedRowType);
		assertEquals("Short", nestedFieldType);
	}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/86f8a255/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowComparatorTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowComparatorTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowComparatorTest.java new file mode 100644 index
0000000..ca54bd4 --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowComparatorTest.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.java.typeutils.runtime; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeutils.ComparatorTestBase; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.types.Row; +import org.junit.BeforeClass; + +import java.io.Serializable; + +import static org.junit.Assert.assertEquals; + +public class RowComparatorTest extends ComparatorTestBase<Row> { + + private static final RowTypeInfo typeInfo = new RowTypeInfo( + BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.DOUBLE_TYPE_INFO, + BasicTypeInfo.STRING_TYPE_INFO, + new TupleTypeInfo<Tuple3<Integer, Boolean, Short>>( + 
BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.BOOLEAN_TYPE_INFO, + BasicTypeInfo.SHORT_TYPE_INFO + ), + TypeExtractor.createTypeInfo(MyPojo.class)); + + private static MyPojo testPojo1 = new MyPojo(); + private static MyPojo testPojo2 = new MyPojo(); + private static MyPojo testPojo3 = new MyPojo(); + + private static final Row[] data = new Row[]{ + createRow(null, null, null, null, null), + createRow(0, null, null, null, null), + createRow(0, 0.0, null, null, null), + createRow(0, 0.0, "a", null, null), + createRow(1, 0.0, "a", null, null), + createRow(1, 1.0, "a", null, null), + createRow(1, 1.0, "b", null, null), + createRow(1, 1.0, "b", new Tuple3<>(1, false, (short) 2), null), + createRow(1, 1.0, "b", new Tuple3<>(2, false, (short) 2), null), + createRow(1, 1.0, "b", new Tuple3<>(2, true, (short) 2), null), + createRow(1, 1.0, "b", new Tuple3<>(2, true, (short) 3), null), + createRow(1, 1.0, "b", new Tuple3<>(2, true, (short) 3), testPojo1), + createRow(1, 1.0, "b", new Tuple3<>(2, true, (short) 3), testPojo2), + createRow(1, 1.0, "b", new Tuple3<>(2, true, (short) 3), testPojo3) + }; + + @BeforeClass + public static void init() { + // TODO we cannot test null here as PojoComparator has no support for null keys + testPojo1.name = ""; + testPojo2.name = "Test1"; + testPojo3.name = "Test2"; + } + + @Override + protected void deepEquals(String message, Row should, Row is) { + int arity = should.getArity(); + assertEquals(message, arity, is.getArity()); + for (int i = 0; i < arity; i++) { + Object copiedValue = should.getField(i); + Object element = is.getField(i); + assertEquals(message, element, copiedValue); + } + } + + @Override + protected TypeComparator<Row> createComparator(boolean ascending) { + return typeInfo.createComparator( + new int[] {0, 1, 2, 3, 4, 5, 6}, + new boolean[] {ascending, ascending, ascending, ascending, ascending, ascending, ascending}, + 0, + new ExecutionConfig()); + } + + @Override + protected TypeSerializer<Row> createSerializer() { 
+ return typeInfo.createSerializer(new ExecutionConfig()); + } + + @Override + protected Row[] getSortedTestData() { + return data; + } + + @Override + protected boolean supportsNullKeys() { + return true; + } + + private static Row createRow(Object f0, Object f1, Object f2, Object f3, Object f4) { + Row row = new Row(5); + row.setField(0, f0); + row.setField(1, f1); + row.setField(2, f2); + row.setField(3, f3); + row.setField(4, f4); + return row; + } + + public static class MyPojo implements Serializable, Comparable<MyPojo> { + // we cannot use null because the PojoComparator does not support null properly + public String name = ""; + + @Override + public int compareTo(MyPojo o) { + if (name == null && o.name == null) { + return 0; + } else if (name == null) { + return -1; + } else if (o.name == null) { + return 1; + } else { + return name.compareTo(o.name); + } + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + MyPojo myPojo = (MyPojo) o; + + return name != null ? name.equals(myPojo.name) : myPojo.name == null; + + } + + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/86f8a255/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowComparatorWithManyFieldsTests.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowComparatorWithManyFieldsTests.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowComparatorWithManyFieldsTests.java new file mode 100644 index 0000000..d0fdbd6 --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowComparatorWithManyFieldsTests.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.java.typeutils.runtime; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.ComparatorTestBase; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.api.java.typeutils.runtime.RowComparator; +import org.apache.flink.types.Row; +import org.junit.BeforeClass; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.junit.Assert.assertEquals; + +/** + * Tests {@link RowComparator} for wide rows. 
+ */ +public class RowComparatorWithManyFieldsTests extends ComparatorTestBase<Row> { + + private static final int numberOfFields = 10; + private static RowTypeInfo typeInfo; + private static final Row[] data = new Row[]{ + createRow(null, "b0", "c0", "d0", "e0", "f0", "g0", "h0", "i0", "j0"), + createRow("a1", "b1", "c1", "d1", "e1", "f1", "g1", "h1", "i1", "j1"), + createRow("a2", "b2", "c2", "d2", "e2", "f2", "g2", "h2", "i2", "j2"), + createRow("a3", "b3", "c3", "d3", "e3", "f3", "g3", "h3", "i3", "j3") + }; + + @BeforeClass + public static void setUp() throws Exception { + TypeInformation<?>[] fieldTypes = new TypeInformation[numberOfFields]; + for (int i = 0; i < numberOfFields; i++) { + fieldTypes[i] = BasicTypeInfo.STRING_TYPE_INFO; + } + typeInfo = new RowTypeInfo(fieldTypes); + + } + + @Override + protected void deepEquals(String message, Row should, Row is) { + int arity = should.getArity(); + assertEquals(message, arity, is.getArity()); + for (int i = 0; i < arity; i++) { + Object copiedValue = should.getField(i); + Object element = is.getField(i); + assertEquals(message, element, copiedValue); + } + } + + @Override + protected TypeComparator<Row> createComparator(boolean ascending) { + return typeInfo.createComparator( + new int[]{0}, + new boolean[]{ascending}, + 0, + new ExecutionConfig()); + } + + @Override + protected TypeSerializer<Row> createSerializer() { + return typeInfo.createSerializer(new ExecutionConfig()); + } + + @Override + protected Row[] getSortedTestData() { + return data; + } + + @Override + protected boolean supportsNullKeys() { + return true; + } + + private static Row createRow(Object... 
values) { + checkNotNull(values); + checkArgument(values.length == numberOfFields); + Row row = new Row(numberOfFields); + for (int i = 0; i < values.length; i++) { + row.setField(i, values[i]); + } + return row; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/86f8a255/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowSerializerTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowSerializerTest.java new file mode 100644 index 0000000..d08d68a --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowSerializerTest.java @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/
package org.apache.flink.api.java.typeutils.runtime;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.SerializerTestInstance;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.types.Row;
import org.junit.Test;

import java.io.Serializable;

import static org.junit.Assert.assertEquals;

/**
 * Runs the generic {@link SerializerTestInstance} checks against the serializer created by
 * {@link RowTypeInfo} for small, wide and nested rows, including rows with null fields.
 */
public class RowSerializerTest {

	/** Two-field rows, one of them with a null field. */
	@Test
	public void testRowSerializer() {
		TypeInformation<Row> typeInfo = new RowTypeInfo(
			BasicTypeInfo.INT_TYPE_INFO,
			BasicTypeInfo.STRING_TYPE_INFO);
		Row row1 = new Row(2);
		row1.setField(0, 1);
		row1.setField(1, "a");

		Row row2 = new Row(2);
		row2.setField(0, 2);
		row2.setField(1, null);

		TypeSerializer<Row> serializer = typeInfo.createSerializer(new ExecutionConfig());
		RowSerializerTestInstance instance = new RowSerializerTestInstance(serializer, row1, row2);
		instance.testAll();
	}

	/** A 13-field row in which only the first and the last field are non-null. */
	@Test
	public void testLargeRowSerializer() {
		TypeInformation<Row> typeInfo = new RowTypeInfo(
			BasicTypeInfo.INT_TYPE_INFO,
			BasicTypeInfo.INT_TYPE_INFO,
			BasicTypeInfo.INT_TYPE_INFO,
			BasicTypeInfo.INT_TYPE_INFO,
			BasicTypeInfo.INT_TYPE_INFO,
			BasicTypeInfo.INT_TYPE_INFO,
			BasicTypeInfo.INT_TYPE_INFO,
			BasicTypeInfo.INT_TYPE_INFO,
			BasicTypeInfo.INT_TYPE_INFO,
			BasicTypeInfo.INT_TYPE_INFO,
			BasicTypeInfo.INT_TYPE_INFO,
			BasicTypeInfo.INT_TYPE_INFO,
			BasicTypeInfo.STRING_TYPE_INFO);

		Row row = new Row(13);
		row.setField(0, 2);
		row.setField(1, null);
		// field 2 was previously left at its default; set it explicitly like its neighbours
		row.setField(2, null);
		row.setField(3, null);
		row.setField(4, null);
		row.setField(5, null);
		row.setField(6, null);
		row.setField(7, null);
		row.setField(8, null);
		row.setField(9, null);
		row.setField(10, null);
		row.setField(11, null);
		row.setField(12, "Test");

		TypeSerializer<Row> serializer = typeInfo.createSerializer(new ExecutionConfig());
		RowSerializerTestInstance testInstance = new RowSerializerTestInstance(serializer, row);
		testInstance.testAll();
	}

	/** Rows that mix basic, tuple and POJO fields, with null values at every nesting position. */
	@Test
	public void testRowSerializerWithComplexTypes() {
		TypeInformation<Row> typeInfo = new RowTypeInfo(
			BasicTypeInfo.INT_TYPE_INFO,
			BasicTypeInfo.DOUBLE_TYPE_INFO,
			BasicTypeInfo.STRING_TYPE_INFO,
			new TupleTypeInfo<Tuple3<Integer, Boolean, Short>>(
				BasicTypeInfo.INT_TYPE_INFO,
				BasicTypeInfo.BOOLEAN_TYPE_INFO,
				BasicTypeInfo.SHORT_TYPE_INFO),
			TypeExtractor.createTypeInfo(MyPojo.class));

		MyPojo testPojo1 = new MyPojo();
		testPojo1.name = null;
		MyPojo testPojo2 = new MyPojo();
		testPojo2.name = "Test1";
		MyPojo testPojo3 = new MyPojo();
		testPojo3.name = "Test2";

		Row[] data = new Row[]{
			createRow(null, null, null, null, null),
			createRow(0, null, null, null, null),
			createRow(0, 0.0, null, null, null),
			createRow(0, 0.0, "a", null, null),
			createRow(1, 0.0, "a", null, null),
			createRow(1, 1.0, "a", null, null),
			createRow(1, 1.0, "b", null, null),
			createRow(1, 1.0, "b", new Tuple3<>(1, false, (short) 2), null),
			createRow(1, 1.0, "b", new Tuple3<>(2, false, (short) 2), null),
			createRow(1, 1.0, "b", new Tuple3<>(2, true, (short) 2), null),
			createRow(1, 1.0, "b", new Tuple3<>(2, true, (short) 3), null),
			createRow(1, 1.0, "b", new Tuple3<>(2, true, (short) 3), testPojo1),
			createRow(1, 1.0, "b", new Tuple3<>(2, true, (short) 3), testPojo2),
			createRow(1, 1.0, "b", new Tuple3<>(2, true, (short) 3), testPojo3)
		};

		TypeSerializer<Row> serializer = typeInfo.createSerializer(new ExecutionConfig());
		RowSerializerTestInstance testInstance = new RowSerializerTestInstance(serializer, data);
		testInstance.testAll();
	}

	// ----------------------------------------------------------------------------------------------

	/** Builds a five-field row from the given values. */
	private static Row createRow(Object f0, Object f1, Object f2, Object f3, Object f4) {
		Row row = new Row(5);
		row.setField(0, f0);
		row.setField(1, f1);
		row.setField(2, f2);
		row.setField(3, f3);
		row.setField(4, f4);
		return row;
	}


	/**
	 * Serializer test instance for rows of variable length ({@code -1}) that compares rows
	 * field by field. Static because it does not use any state of the enclosing test.
	 */
	private static class RowSerializerTestInstance extends SerializerTestInstance<Row> {

		RowSerializerTestInstance(
			TypeSerializer<Row> serializer,
			Row... testData) {
			super(serializer, Row.class, -1, testData);
		}

		@Override
		protected void deepEquals(String message, Row should, Row is) {
			int arity = should.getArity();
			assertEquals(message, arity, is.getArity());
			for (int i = 0; i < arity; i++) {
				Object expected = should.getField(i);
				Object actual = is.getField(i);
				// JUnit expects (message, expected, actual); keep the order so failure output is not inverted
				assertEquals(message, expected, actual);
			}
		}
	}

	/** Simple POJO with a single public, nullable field, ordered by that field (null sorts first). */
	public static class MyPojo implements Serializable, Comparable<MyPojo> {
		public String name = null;

		@Override
		public int compareTo(MyPojo o) {
			if (name == null && o.name == null) {
				return 0;
			} else if (name == null) {
				return -1;
			} else if (o.name == null) {
				return 1;
			} else {
				return name.compareTo(o.name);
			}
		}

		@Override
		public boolean equals(Object o) {
			if (this == o) {
				return true;
			}
			if (o == null || getClass() != o.getClass()) {
				return false;
			}

			MyPojo myPojo = (MyPojo) o;

			return name != null ? name.equals(myPojo.name) : myPojo.name == null;
		}

		@Override
		public int hashCode() {
			// equal objects must have equal hash codes, so hash the same field equals() compares
			return name != null ? name.hashCode() : 0;
		}

	}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/86f8a255/flink-core/src/test/java/org/apache/flink/types/RowTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/types/RowTest.java b/flink-core/src/test/java/org/apache/flink/types/RowTest.java new file mode 100644 index 0000000..35ba32d --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/types/RowTest.java @@ -0,0 +1,37 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.flink.types;

import org.apache.flink.api.java.tuple.Tuple2;
import org.junit.Test;

import static org.junit.Assert.assertEquals;

/**
 * Tests for {@link Row}.
 */
public class RowTest {

	/** Fields are joined by commas; null fields are printed as {@code null}. */
	@Test
	public void testRowToString() {
		Row row = new Row(5);
		row.setField(0, 1);
		row.setField(1, "hello");
		row.setField(2, null);
		row.setField(3, new Tuple2<>(2, "hi"));
		row.setField(4, "hello world");

		assertEquals("1,hello,null,(2,hi),hello world", row.toString());
	}
}
