[
https://issues.apache.org/jira/browse/FLINK-3140?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15068571#comment-15068571
]
ASF GitHub Bot commented on FLINK-3140:
---------------------------------------
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/1465#discussion_r48286873
--- Diff:
flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowComparator.scala
---
@@ -0,0 +1,463 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.typeinfo
+
+import java.util
+
+import org.apache.flink.api.common.typeutils.base.BasicTypeComparator
+import org.apache.flink.api.common.typeutils.{CompositeTypeComparator,
TypeComparator, TypeSerializer}
+import org.apache.flink.api.java.typeutils.runtime.TupleComparatorBase
+import org.apache.flink.api.table.Row
+import org.apache.flink.api.table.typeinfo.NullMaskUtils.readIntoNullMask
+import
org.apache.flink.api.table.typeinfo.RowComparator.createAuxiliaryFields
+import org.apache.flink.api.table.typeinfo.RowComparator.NullChecker
+import org.apache.flink.core.memory.{MemorySegment, DataOutputView,
DataInputView}
+import org.apache.flink.types.{KeyFieldOutOfBoundsException,
NullKeyFieldException}
+
+/**
+ * Comparator for [[Row]].
+ */
+class RowComparator private (
+ /** key positions describe which fields are keys in what order */
+ val keyPositions: Array[Int],
+ /** comparators for the key fields, in the same order as the key
fields */
+ val comparators: Array[TypeComparator[_]],
+ /** serializers to deserialize the first n fields for comparison */
+ val serializers: Array[TypeSerializer[_]],
+ /** auxiliary fields for normalized key support */
+ private val auxiliaryFields: (List[Int], Int, Int))
+ extends CompositeTypeComparator[Row] with Serializable {
+
+ // null masks for serialized comparison
+ private val nullMask1 = new Array[Boolean](serializers.length)
+ private val nullMask2 = new Array[Boolean](serializers.length)
+
+ // cache for the deserialized key field objects
+ @transient
+ private lazy val deserializedKeyFields1: Array[Any] =
instantiateDeserializationFields()
+
+ @transient
+ private lazy val deserializedKeyFields2: Array[Any] =
instantiateDeserializationFields()
+
+ // null checker for reference comparison
+ private val nullChecker = new NullChecker()
+
+ // create auxiliary fields
+ private val normalizedKeyLengths: Array[Int] =
auxiliaryFields._1.toArray[Int]
+ private val numLeadingNormalizableKeys: Int = auxiliaryFields._2
+ private val normalizableKeyPrefixLen: Int = auxiliaryFields._3
+
+ // first comparator decides invert the key direction
+ private val invertNormKey: Boolean = comparators(0).invertNormalizedKey()
+
+ def this(
+ keyPositions: Array[Int],
+ comparators: Array[TypeComparator[_]],
+ serializers: Array[TypeSerializer[_]]) = {
+ this(
+ keyPositions,
+ comparators,
+ serializers,
+ createAuxiliaryFields(comparators))
+ }
+
+ private def instantiateDeserializationFields(): Array[Any] = {
+ val newFields = new Array[Any](serializers.length)
+ var i = 0
+ while (i < serializers.length) {
+ newFields(i) = serializers(i).createInstance()
+ i += 1
+ }
+ newFields
+ }
+
+ //
--------------------------------------------------------------------------------------------
+ // Comparator Methods
+ //
--------------------------------------------------------------------------------------------
+
+ override def getFlatComparator(flatComparators:
util.List[TypeComparator[_]]): Unit =
+ comparators.foreach {
+ case ctc: CompositeTypeComparator[_] =>
ctc.getFlatComparator(flatComparators)
+ case c@_ => flatComparators.add(c)
+ }
+
+ override def compareToReference(referencedComparator:
TypeComparator[Row]): Int = {
+ val other: RowComparator =
referencedComparator.asInstanceOf[RowComparator]
+ var i = 0
+ try {
+ while (i < keyPositions.length) {
+ val comparator = comparators(i).asInstanceOf[TypeComparator[Any]]
+ val otherComparator =
other.comparators(i).asInstanceOf[TypeComparator[Any]]
+
+ val nullCheck = comparator.equalToReference(nullChecker)
+ val nullCheckOther = otherComparator.equalToReference(nullChecker)
+
+ var cmp = 0
+ // both values are null -> equality
+ if (nullCheck && nullCheckOther) {
+ cmp = 0
+ }
+ // one value is null -> inequality
+ // order is considered for basic types
+ else if (nullCheck || nullCheckOther) {
+ if (comparator.isInstanceOf[BasicTypeComparator[_]]) {
+ val basicComp = comparator.asInstanceOf[BasicTypeComparator[_]]
+ if (nullCheck) {
+ return if (basicComp.isAscendingComparison) 1 else -1
+ }
+ else if (nullCheckOther) {
+ return if (basicComp.isAscendingComparison) -1 else 1
+ }
+ }
+ else {
+ return if (nullCheck) 1 else -1
+ }
+ }
+ else {
+ cmp = comparator.compareToReference(otherComparator)
+ }
+
+ if (cmp != 0) {
+ return cmp
+ }
+ i = i + 1
+ }
+ 0
+ }
+ catch {
+ case npex: NullPointerException =>
+ throw new NullKeyFieldException(keyPositions(i))
+ case iobex: IndexOutOfBoundsException =>
+ throw new KeyFieldOutOfBoundsException(keyPositions(i))
+ }
+ }
+
+ override def compareSerialized(firstSource: DataInputView, secondSource:
DataInputView): Int = {
+ val len = serializers.length
+ val keyLen = keyPositions.length
+
+ readIntoNullMask(len, firstSource, nullMask1)
+ readIntoNullMask(len, secondSource, nullMask2)
+
+ // deserialize
+ var i = 0
+ while (i < len) {
+ val serializer = serializers(i).asInstanceOf[TypeSerializer[Any]]
+
+ // deserialize field 1
+ if (nullMask1(i)) {
+ deserializedKeyFields1(i) = null
+ }
+ else if (deserializedKeyFields1(i) != null) {
+ deserializedKeyFields1(i) =
serializer.deserialize(deserializedKeyFields1(i), firstSource)
+ }
+ else {
+ deserializedKeyFields1(i) = serializer.deserialize(firstSource)
+ }
+
+ // deserialize field 2
+ if (nullMask2(i)) {
+ deserializedKeyFields2(i) = null
+ }
+ else if (deserializedKeyFields2(i) != null) {
+ deserializedKeyFields2(i) =
serializer.deserialize(deserializedKeyFields2(i), secondSource)
+ }
+ else {
+ deserializedKeyFields2(i) = serializer.deserialize(secondSource)
+ }
+
+ i += 1
+ }
+
+ // compare
+ i = 0
+ while (i < keyLen) {
+ val keyPos = keyPositions(i)
+ val comparator = comparators(i).asInstanceOf[TypeComparator[Any]]
+
+ val firstElement = deserializedKeyFields1(keyPos)
+ val secondElement = deserializedKeyFields2(keyPos)
+
+ var cmp = 0
+ // both values are null -> equality
+ if (firstElement == null && secondElement == null) {
+ cmp = 0
+ }
+ // one value is null -> inequality
+ // order is considered for basic types
+ else if (firstElement == null || secondElement == null) {
+ if (comparator.isInstanceOf[BasicTypeComparator[_]]) {
+ val basicComp = comparator.asInstanceOf[BasicTypeComparator[_]]
+ if (firstElement == null) {
+ return if (basicComp.isAscendingComparison) -1 else 1
+ }
+ else if (secondElement == null) {
+ return if (basicComp.isAscendingComparison) 1 else -1
+ }
+ }
+ else {
+ return if (firstElement == null) -1 else 1
+ }
+ }
+ else {
+ cmp = comparator.compare(firstElement, secondElement)
+ }
+
+ if (cmp != 0) {
+ return cmp
+ }
+
+ i += 1
+ }
+ 0
+ }
+
+ override def supportsNormalizedKey(): Boolean =
numLeadingNormalizableKeys > 0
+
+ override def getNormalizeKeyLen: Int = normalizableKeyPrefixLen
+
+ override def isNormalizedKeyPrefixOnly(keyBytes: Int): Boolean =
+ numLeadingNormalizableKeys < keyPositions.length ||
+ normalizableKeyPrefixLen == Integer.MAX_VALUE ||
+ normalizableKeyPrefixLen > keyBytes
+
+ override def invertNormalizedKey(): Boolean = invertNormKey
+
+ override def supportsSerializationWithKeyNormalization(): Boolean = false
+
+ override def writeWithKeyNormalization(record: Row, target:
DataOutputView): Unit =
+ throw new UnsupportedOperationException("Normalized keys not
suppported for Rows.")
+
+ override def readWithKeyDenormalization(reuse: Row, source:
DataInputView): Row =
+ throw new UnsupportedOperationException("Normalized keys not
suppported for Rows.")
+
+ override def duplicate(): TypeComparator[Row] = {
+ // copy comparator and serializer factories
+ val comparatorsCopy = comparators.map(_.duplicate())
+ val serializersCopy = serializers.map(_.duplicate())
+
+ new RowComparator(
+ keyPositions,
+ comparatorsCopy,
+ serializersCopy,
+ auxiliaryFields)
+ }
+
+ override def hash(value: Row): Int = {
+ var code: Int = 0
+ var i = 0
+ try {
+ while(i < keyPositions.length) {
+ code *= TupleComparatorBase.HASH_SALT(i & 0x1F)
+ val element = value.productElement(keyPositions(i))
+ if (element != null) {
+ code +=
comparators(i).asInstanceOf[TypeComparator[Any]].hash(element)
+ }
+ i += 1
+ }
+ } catch {
+ case npex: NullPointerException =>
+ throw new NullKeyFieldException(keyPositions(i))
+ case iobex: IndexOutOfBoundsException =>
+ throw new KeyFieldOutOfBoundsException(keyPositions(i))
+ }
+ code
+ }
+
+ override def setReference(toCompare: Row) {
+ var i = 0
+ try {
+ while(i < keyPositions.length) {
+ val comparator = comparators(i).asInstanceOf[TypeComparator[Any]]
+ val element = toCompare.productElement(keyPositions(i))
+ comparator.setReference(element) // element can be null
+ i += 1
+ }
+ } catch {
+ case npex: NullPointerException =>
+ throw new NullKeyFieldException(keyPositions(i))
+ case iobex: IndexOutOfBoundsException =>
+ throw new KeyFieldOutOfBoundsException(keyPositions(i))
+ }
+ }
+
+ override def equalToReference(candidate: Row): Boolean = {
+ var i = 0
+ try {
+ while(i < keyPositions.length) {
+ val comparator = comparators(i).asInstanceOf[TypeComparator[Any]]
+ val element = candidate.productElement(keyPositions(i))
+ // check if reference is null
+ if (element == null && !comparator.equalToReference(nullChecker)) {
+ return false
+ }
+ else if (element != null && !comparator.equalToReference(element))
{
+ return false
+ }
+ i += 1
+ }
+ } catch {
+ case npex: NullPointerException =>
+ throw new NullKeyFieldException(keyPositions(i))
+ case iobex: IndexOutOfBoundsException =>
+ throw new KeyFieldOutOfBoundsException(keyPositions(i))
+ }
+ true
+ }
+
+ override def compare(first: Row, second: Row): Int = {
+ var i = 0
+ try {
+ while(i < keyPositions.length) {
+ val keyPos: Int = keyPositions(i)
+ val comparator = comparators(i).asInstanceOf[TypeComparator[Any]]
+ val firstElement = first.productElement(keyPos)
+ val secondElement = second.productElement(keyPos)
+
+ var cmp = 0
+ // both values are null -> equality
+ if (firstElement == null && secondElement == null) {
+ cmp = 0
+ }
+ // one value is null -> inequality
+ // order is considered for basic types
+ else if (firstElement == null || secondElement == null) {
+ if (comparator.isInstanceOf[BasicTypeComparator[_]]) {
+ val basicComp = comparator.asInstanceOf[BasicTypeComparator[_]]
+ if (firstElement == null) {
+ return if (basicComp.isAscendingComparison) -1 else 1
+ }
+ else if (secondElement == null) {
+ return if (basicComp.isAscendingComparison) 1 else -1
+ }
+ }
+ else {
+ return if (firstElement == null) -1 else 1
+ }
+ }
+ else {
+ cmp = comparator.compare(firstElement, secondElement)
+ }
+
+ if (cmp != 0) {
+ return cmp
+ }
+ i += 1
+ }
+ } catch {
+ case npex: NullPointerException =>
+ throw new NullKeyFieldException(keyPositions(i))
+ case iobex: IndexOutOfBoundsException =>
+ throw new KeyFieldOutOfBoundsException(keyPositions(i))
+ }
+ 0
+ }
+
+ override def putNormalizedKey(record: Row, target: MemorySegment,
offset: Int, numBytes: Int)
+ : Unit = {
+ var bytesLeft = numBytes
+ var currentOffset = offset
+
+ var i = 0
+ var j = 0
+ while (i < numLeadingNormalizableKeys && bytesLeft > 0) {
+ var len = normalizedKeyLengths(i)
+ len = if (bytesLeft >= len) len else bytesLeft
+
+ val comparator = comparators(i).asInstanceOf[TypeComparator[Any]]
+ val element = record.productElement(keyPositions(i))
+ if (len > 0) {
+ // write a null byte with padding
+ if (element == null) {
+ target.putBoolean(currentOffset, true)
+ // write padding
+ j = 0
+ while (j < len - 1) {
+ target.put(currentOffset + j, 0.toByte)
+ j += 1
+ }
+ }
+ // write a non-null byte with key
+ else {
+ target.putBoolean(currentOffset, false)
+ // write key
+ comparator.putNormalizedKey(element, target, currentOffset, len
- 1)
+ }
+ }
+
+ bytesLeft -= len
+ currentOffset += len
+ i += 1
+ }
+ }
+
+ override def extractKeys(record: Any, target: Array[AnyRef], index:
Int): Int = {
+ val len = comparators.length
+ var localIndex = index
+ var i = 0
+ while (i < len) {
+ val element =
record.asInstanceOf[Row].productElement(keyPositions(i))
+ localIndex += comparators(i).extractKeys(element, target, localIndex)
+ i += 1
+ }
+ localIndex - index
+ }
+
+}
+
+object RowComparator {
+ private class NullChecker extends Comparable[AnyRef] with Serializable {
--- End diff --
I don't think it is a good idea to use the `NullChecker` for reference
comparisons against the field comparators. Doesn't this rely on a specific
behavior of the field comparators? A field comparator might call the `equals`
method on the null-valued reference object instead of on the other object, or
it may throw an NPE if null is set as the reference object.
I think there are two better alternatives:
1. Track which fields of the reference object are null using a Boolean
array within the RowComparator and don't set null values in the field
comparators.
2. Wrap each field comparator with a null-aware comparator. This comparator
could also take over the logic for the null handling of normalized keys.
> NULL value data layout in Row Serializer/Comparator
> ---------------------------------------------------
>
> Key: FLINK-3140
> URL: https://issues.apache.org/jira/browse/FLINK-3140
> Project: Flink
> Issue Type: Sub-task
> Components: Table API
> Reporter: Chengxiang Li
> Assignee: Timo Walther
>
> To store/materialize NULL values in Row objects, we need a new Row
> Serializer/Comparator that is aware of NULL-valued fields.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)