[
https://issues.apache.org/jira/browse/FLINK-3140?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15062058#comment-15062058
]
ASF GitHub Bot commented on FLINK-3140:
---------------------------------------
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/1465#discussion_r47907286
--- Diff:
flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowComparator.scala
---
@@ -0,0 +1,468 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.typeinfo
+
+import java.util
+
+import org.apache.flink.api.common.typeutils.base.BasicTypeComparator
+import org.apache.flink.api.common.typeutils.{CompositeTypeComparator,
TypeComparator, TypeSerializer}
+import org.apache.flink.api.java.typeutils.runtime.TupleComparatorBase
+import org.apache.flink.api.table.Row
+import org.apache.flink.api.table.typeinfo.NullMaskUtils.readIntoNullMask
+import org.apache.flink.api.table.typeinfo.RowComparator.NullChecker
+import org.apache.flink.core.memory.{MemorySegment, DataOutputView,
DataInputView}
+import org.apache.flink.types.{KeyFieldOutOfBoundsException,
NullKeyFieldException}
+
+/**
+ * Comparator for [[Row]].
+ */
+class RowComparator private (
+ /** key positions describe which fields are keys in what order */
+ val keyPositions: Array[Int],
+ /** comparators for the key fields, in the same order as the key
fields */
+ val comparators: Array[TypeComparator[_]],
+ /** serializers to deserialize the first n fields for comparison */
+ val serializers: Array[TypeSerializer[_]],
+ /** auxiliary fields for normalized key support */
+ private var normalizedKeyLengths: Array[Int],
+ private var numLeadingNormalizableKeys: Int,
+ private var normalizableKeyPrefixLen: Int,
+ private var invertNormKey: Boolean)
+ extends CompositeTypeComparator[Row] with Serializable {
+
+ // null masks for serialized comparison
+ private val nullMask1 = new Array[Boolean](serializers.length)
+ private val nullMask2 = new Array[Boolean](serializers.length)
+
+ // cache for the deserialized key field objects
+ @transient
+ private var deserializedKeyFields1: Array[Any] = null
+ @transient
+ private var deserializedKeyFields2: Array[Any] = null
+
+ // null checker for reference comparison
+ private val nullChecker = new NullChecker()
+
+ def this(
+ keyPositions: Array[Int],
+ comparators: Array[TypeComparator[_]],
+ serializers: Array[TypeSerializer[_]]) = {
+ this(keyPositions, comparators, serializers, new
Array[Int](keyPositions.length), 0, 0, false)
+ // set up auxiliary fields for normalized key support
+ setupAuxiliaryFields()
+ }
+
+ private def setupAuxiliaryFields(): Unit = {
+ var i = 0
+ while (i < keyPositions.length) {
+ val k: TypeComparator[_] = comparators(i)
+ // as long as the leading keys support normalized keys, we can build
up the composite key
+ if (k.supportsNormalizedKey()) {
+ if (i == 0) {
+ // the first comparator decides whether we need to invert the
key direction
+ invertNormKey = k.invertNormalizedKey()
+ }
+ else if (k.invertNormalizedKey() != invertNormKey) {
+ // if a successor does not agree on the inversion direction, it
cannot be part of the
+ // normalized key
+ return
+ }
+ numLeadingNormalizableKeys += 1
+ val len: Int = k.getNormalizeKeyLen
+ if (len < 0) {
+ throw new RuntimeException("Comparator " + k.getClass.getName +
+ " specifies an invalid length for the normalized key: " + len)
+ }
+ normalizedKeyLengths(i) = len + 1 // add one for a null byte
+ normalizableKeyPrefixLen += len + 1 // add one for a null byte
+ if (normalizableKeyPrefixLen < 0) {
+ // overflow, which means we are out of budget for normalized key
space anyways
+ normalizableKeyPrefixLen = Integer.MAX_VALUE
+ return
+ }
+ }
+ else {
+ return
+ }
+ i += 1
+ }
+ }
--- End diff --
We could compute these auxiliary fields in a more functional style, like this:
```
/**
 * Computes the auxiliary fields needed for normalized-key support of a composite key.
 *
 * Only the leading run of comparators can contribute to the composite normalized key. A
 * comparator qualifies if it supports normalized keys and agrees with the first comparator
 * on the inversion direction (`invertNormalizedKey()`). Processing stops at the first
 * comparator that does not qualify.
 *
 * Each contributing key reserves `getNormalizeKeyLen + 1` bytes; the extra byte is for the
 * null marker. If the cumulative prefix length overflows `Int`, the prefix length is capped
 * at `Integer.MAX_VALUE`. The key that caused the overflow is still counted, and no further
 * comparators are inspected.
 *
 * @param comparators comparators for the key fields, in key order
 * @return a tuple of
 *         (normalized key lengths of the leading normalizable keys, in key order,
 *          number of leading normalizable keys,
 *          total normalizable key prefix length)
 * @throws RuntimeException if a contributing comparator reports a negative normalized key
 *                          length
 */
def createAuxiliaryFields(
    comparators: Array[TypeComparator[_]])
  : (List[Int], Int, Int) = {

  // Evaluate the first comparator's inversion direction only once, and only if it actually
  // supports normalized keys; otherwise no key can be part of the normalized key.
  val normalizable: List[TypeComparator[_]] = comparators.headOption match {
    case Some(first) if first.supportsNormalizedKey() =>
      val invert = first.invertNormalizedKey()
      comparators.takeWhile { c =>
        c.supportsNormalizedKey() && c.invertNormalizedKey() == invert
      }.toList
    case _ =>
      Nil
  }

  // Accumulates lengths in reverse (O(1) prepend) and reverses once at the end, so the
  // result is in key order. Stops early on overflow, matching the imperative version.
  @scala.annotation.tailrec
  def accumulate(
      remaining: List[TypeComparator[_]],
      lengthsReversed: List[Int],
      numKeys: Int,
      prefixLen: Int): (List[Int], Int, Int) = remaining match {
    case Nil =>
      (lengthsReversed.reverse, numKeys, prefixLen)
    case comparator :: rest =>
      val len = comparator.getNormalizeKeyLen
      if (len < 0) {
        throw new RuntimeException("Comparator " + comparator.getClass.getName +
          " specifies an invalid length for the normalized key: " + len)
      }
      val keyLen = len + 1 // add one for a null byte
      val newPrefixLen = prefixLen + keyLen
      if (newPrefixLen < 0) {
        // overflow, which means we are out of budget for normalized key space anyways
        ((keyLen :: lengthsReversed).reverse, numKeys + 1, Integer.MAX_VALUE)
      } else {
        accumulate(rest, keyLen :: lengthsReversed, numKeys + 1, newPrefixLen)
      }
  }

  accumulate(normalizable, Nil, 0, 0)
}
```
> NULL value data layout in Row Serializer/Comparator
> ---------------------------------------------------
>
> Key: FLINK-3140
> URL: https://issues.apache.org/jira/browse/FLINK-3140
> Project: Flink
> Issue Type: Sub-task
> Components: Table API
> Reporter: Chengxiang Li
> Assignee: Timo Walther
>
> To store/materialize NULL values in Row objects, we need a new Row
> Serializer/Comparator that is aware of NULL-valued fields.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)