[
https://issues.apache.org/jira/browse/FLINK-3140?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15062112#comment-15062112
]
ASF GitHub Bot commented on FLINK-3140:
---------------------------------------
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/1465#discussion_r47911612
--- Diff:
flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowComparator.scala
---
@@ -0,0 +1,468 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.typeinfo
+
+import java.util
+
+import org.apache.flink.api.common.typeutils.base.BasicTypeComparator
+import org.apache.flink.api.common.typeutils.{CompositeTypeComparator,
TypeComparator, TypeSerializer}
+import org.apache.flink.api.java.typeutils.runtime.TupleComparatorBase
+import org.apache.flink.api.table.Row
+import org.apache.flink.api.table.typeinfo.NullMaskUtils.readIntoNullMask
+import org.apache.flink.api.table.typeinfo.RowComparator.NullChecker
+import org.apache.flink.core.memory.{MemorySegment, DataOutputView,
DataInputView}
+import org.apache.flink.types.{KeyFieldOutOfBoundsException,
NullKeyFieldException}
+
+/**
+ * Comparator for [[Row]].
+ */
+class RowComparator private (
+ /** key positions describe which fields are keys in what order */
+ val keyPositions: Array[Int],
+ /** comparators for the key fields, in the same order as the key fields */
+ val comparators: Array[TypeComparator[_]],
+ /** serializers to deserialize the first n fields for comparison */
+ val serializers: Array[TypeSerializer[_]],
+ /** auxiliary fields for normalized key support */
+ private var normalizedKeyLengths: Array[Int],
+ private var numLeadingNormalizableKeys: Int,
+ private var normalizableKeyPrefixLen: Int,
+ private var invertNormKey: Boolean)
+ extends CompositeTypeComparator[Row] with Serializable {
+
+ // null masks for serialized comparison
+ private val nullMask1 = new Array[Boolean](serializers.length)
+ private val nullMask2 = new Array[Boolean](serializers.length)
+
+ // cache for the deserialized key field objects
+ @transient
+ private var deserializedKeyFields1: Array[Any] = null
+ @transient
+ private var deserializedKeyFields2: Array[Any] = null
+
+ // null checker for reference comparison
+ private val nullChecker = new NullChecker()
+
+ def this(
+ keyPositions: Array[Int],
+ comparators: Array[TypeComparator[_]],
+ serializers: Array[TypeSerializer[_]]) = {
+ this(keyPositions, comparators, serializers, new
Array[Int](keyPositions.length), 0, 0, false)
+ // set up auxiliary fields for normalized key support
+ setupAuxiliaryFields()
+ }
+
+ private def setupAuxiliaryFields(): Unit = {
+ var i = 0
+ while (i < keyPositions.length) {
+ val k: TypeComparator[_] = comparators(i)
+ // as long as the leading keys support normalized keys, we can build up the composite key
+ if (k.supportsNormalizedKey()) {
+ if (i == 0) {
+ // the first comparator decides whether we need to invert the key direction
+ invertNormKey = k.invertNormalizedKey()
+ }
+ else if (k.invertNormalizedKey() != invertNormKey) {
+ // if a successor does not agree on the inversion direction, it cannot be part of the
+ // normalized key
+ return
+ }
+ numLeadingNormalizableKeys += 1
+ val len: Int = k.getNormalizeKeyLen
+ if (len < 0) {
+ throw new RuntimeException("Comparator " + k.getClass.getName +
+ " specifies an invalid length for the normalized key: " + len)
+ }
+ normalizedKeyLengths(i) = len + 1 // add one for a null byte
+ normalizableKeyPrefixLen += len + 1 // add one for a null byte
+ if (normalizableKeyPrefixLen < 0) {
+ // overflow, which means we are out of budget for normalized key space anyways
+ normalizableKeyPrefixLen = Integer.MAX_VALUE
+ return
+ }
+ }
+ else {
+ return
+ }
+ i += 1
+ }
+ }
+
+ private def instantiateDeserializationUtils(): Unit = {
+ deserializedKeyFields1 = new Array[Any](serializers.length)
+ deserializedKeyFields2 = new Array[Any](serializers.length)
+
+ var i = 0
+ while (i < serializers.length) {
+ deserializedKeyFields1(i) = serializers(i).createInstance()
+ deserializedKeyFields2(i) = serializers(i).createInstance()
+ i += 1
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Comparator Methods
+ // --------------------------------------------------------------------------------------------
+
+ override def getFlatComparator(flatComparators:
util.List[TypeComparator[_]]): Unit =
+ comparators.foreach {
+ case ctc: CompositeTypeComparator[_] =>
ctc.getFlatComparator(flatComparators)
+ case c@_ => flatComparators.add(c)
+ }
+
+ override def compareToReference(referencedComparator:
TypeComparator[Row]): Int = {
+ val other: RowComparator =
referencedComparator.asInstanceOf[RowComparator]
+ var i = 0
+ try {
+ while (i < keyPositions.length) {
+ val comparator = comparators(i).asInstanceOf[TypeComparator[Any]]
+ val otherComparator =
other.comparators(i).asInstanceOf[TypeComparator[Any]]
+
+ val nullCheck = comparator.equalToReference(nullChecker)
+ val nullCheckOther = otherComparator.equalToReference(nullChecker)
+
+ var cmp = 0
+ // both values are null -> equality
+ if (nullCheck && nullCheckOther) {
+ cmp = 0
+ }
+ // one value is null -> inequality
+ // order is considered for basic types
+ else if (nullCheck || nullCheckOther) {
+ if (comparator.isInstanceOf[BasicTypeComparator[_]]) {
+ val basicComp = comparator.asInstanceOf[BasicTypeComparator[_]]
+ if (nullCheck) {
+ return if (basicComp.isAscendingComparison) 1 else -1
+ }
+ else if (nullCheckOther) {
+ return if (basicComp.isAscendingComparison) -1 else 1
+ }
+ }
+ else {
+ return if (nullCheck) 1 else -1
+ }
+ }
+ else {
+ cmp = comparator.compareToReference(otherComparator)
+ }
+
+ if (cmp != 0) {
+ return cmp
+ }
+ i = i + 1
+ }
+ 0
+ }
+ catch {
+ case npex: NullPointerException =>
+ throw new NullKeyFieldException(keyPositions(i))
+ case iobex: IndexOutOfBoundsException =>
+ throw new KeyFieldOutOfBoundsException(keyPositions(i))
+ }
+ }
--- End diff --
We could write this as
```
/**
 * Compares the reference held by this comparator with the reference held by
 * `referencedComparator`, key field by key field, and returns the first non-zero result
 * (or 0 if all key fields compare as equal).
 *
 * @param referencedComparator the other comparator; it is cast to [[RowComparator]]
 * @return the result of the first key field comparison that is not 0, otherwise 0
 * @throws NullKeyFieldException if a NullPointerException occurs while comparing a key field
 * @throws KeyFieldOutOfBoundsException if an IndexOutOfBoundsException occurs while
 *                                      comparing a key field
 */
override def compareToReference(referencedComparator: TypeComparator[Row]): Int = {
  val other: RowComparator = referencedComparator.asInstanceOf[RowComparator]
  // Iterate lazily over the key indices so that the comparison stops at the first key field
  // that differs. Unlike zip(...), this allocates no intermediate array of tuples per call.
  keyPositions.indices.iterator.map { i =>
    try {
      // Cast once up front: compareToReference cannot be called on a TypeComparator[_]
      // with another TypeComparator[_] argument, because the existential types do not unify.
      val comparator = comparators(i).asInstanceOf[TypeComparator[Any]]
      val otherComparator = other.comparators(i).asInstanceOf[TypeComparator[Any]]

      // Same null detection as the code under review: equalToReference(nullChecker).
      val nullCheck = comparator.equalToReference(nullChecker)
      val nullCheckOther = otherComparator.equalToReference(nullChecker)

      (nullCheck, nullCheckOther) match {
        // both values are null -> equality
        case (true, true) => 0
        // no value is null -> delegate to the field comparators
        case (false, false) => comparator.compareToReference(otherComparator)
        // exactly one value is null -> inequality
        case _ =>
          val nullOrder = if (nullCheck) 1 else -1
          comparator match {
            // only basic type comparators with descending order flip the sign
            case basicComp: BasicTypeComparator[_] if !basicComp.isAscendingComparison =>
              -nullOrder
            case _ => nullOrder
          }
      }
    } catch {
      // keep the exception translation of the original loop-based implementation
      case _: NullPointerException =>
        throw new NullKeyFieldException(keyPositions(i))
      case _: IndexOutOfBoundsException =>
        throw new KeyFieldOutOfBoundsException(keyPositions(i))
    }
  }.find(_ != 0).getOrElse(0)
}
```
> NULL value data layout in Row Serializer/Comparator
> ---------------------------------------------------
>
> Key: FLINK-3140
> URL: https://issues.apache.org/jira/browse/FLINK-3140
> Project: Flink
> Issue Type: Sub-task
> Components: Table API
> Reporter: Chengxiang Li
> Assignee: Timo Walther
>
> To store/materialize NULL values in Row objects, we need a new Row
> Serializer/Comparator that is aware of NULL-valued fields.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)