[
https://issues.apache.org/jira/browse/FLINK-3140?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15093820#comment-15093820
]
ASF GitHub Bot commented on FLINK-3140:
---------------------------------------
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/1465#discussion_r49448186
--- Diff:
flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowComparator.scala
---
@@ -0,0 +1,422 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.typeinfo
+
+import java.util
+
+import org.apache.flink.api.common.typeutils.{CompositeTypeComparator,
TypeComparator, TypeSerializer}
+import org.apache.flink.api.java.typeutils.runtime.TupleComparatorBase
+import org.apache.flink.api.table.Row
+import org.apache.flink.api.table.typeinfo.NullMaskUtils.readIntoNullMask
+import
org.apache.flink.api.table.typeinfo.RowComparator.{createAuxiliaryFields,
makeNullAware}
+import org.apache.flink.core.memory.{DataInputView, DataOutputView,
MemorySegment}
+import org.apache.flink.types.{KeyFieldOutOfBoundsException,
NullKeyFieldException}
+
+/**
+ * Comparator for [[Row]].
+ */
+class RowComparator private (
+ /** key positions describe which fields are keys in what order */
+ val keyPositions: Array[Int],
+ /** null-aware comparators for the key fields, in the same order as
the key fields */
+ val comparators: Array[NullAwareComparator[_]],
+ /** serializers to deserialize the first n fields for comparison */
+ val serializers: Array[TypeSerializer[_]],
+ /** auxiliary fields for normalized key support */
+ private val auxiliaryFields: (Array[Int], Int, Int, Boolean))
+ extends CompositeTypeComparator[Row] with Serializable {
+
+ // null masks for serialized comparison
+ private val nullMask1 = new Array[Boolean](serializers.length)
+ private val nullMask2 = new Array[Boolean](serializers.length)
+
+ // cache for the deserialized key field objects
+ @transient
+ private lazy val deserializedKeyFields1: Array[Any] =
instantiateDeserializationFields()
+
+ @transient
+ private lazy val deserializedKeyFields2: Array[Any] =
instantiateDeserializationFields()
+
+ // create auxiliary fields
+ private val normalizedKeyLengths: Array[Int] = auxiliaryFields._1
+ private val numLeadingNormalizableKeys: Int = auxiliaryFields._2
+ private val normalizableKeyPrefixLen: Int = auxiliaryFields._3
+ private val invertNormKey: Boolean = auxiliaryFields._4
+
+ /**
+ * Intermediate constructor for creating auxiliary fields.
+ */
+ def this(
+ keyPositions: Array[Int],
+ comparators: Array[NullAwareComparator[_]],
+ serializers: Array[TypeSerializer[_]]) = {
+ this(
+ keyPositions,
+ comparators,
+ serializers,
+ createAuxiliaryFields(keyPositions, comparators))
+ }
+
+ /**
+ * General constructor for RowComparator.
+ *
+ * @param keyPositions key positions describe which fields are keys in
what order
+ * @param comparators non-null-aware comparators for the key fields, in
the same order as
+ * the key fields
+ * @param serializers serializers to deserialize the first n fields for
comparison
+ * @param orders sorting orders for the fields
+ */
+ def this(
+ keyPositions: Array[Int],
+ comparators: Array[TypeComparator[_]],
+ serializers: Array[TypeSerializer[_]],
+ orders: Array[Boolean]) = {
+ this(
+ keyPositions,
+ makeNullAware(comparators, orders),
+ serializers)
+ }
+
+ private def instantiateDeserializationFields(): Array[Any] = {
+ val newFields = new Array[Any](serializers.length)
+ var i = 0
+ while (i < serializers.length) {
+ newFields(i) = serializers(i).createInstance()
+ i += 1
+ }
+ newFields
+ }
+
+ //
--------------------------------------------------------------------------------------------
+ // Comparator Methods
+ //
--------------------------------------------------------------------------------------------
+
+ override def getFlatComparator(flatComparators:
util.List[TypeComparator[_]]): Unit =
+ comparators.map(_.wrappedComparator).foreach {
--- End diff --
The key fields extracted by `extractKeys()` can be null. These keys are
compared by the comparators returned by this method (`getFlatComparator`).
Hence, we must return null-aware comparators here instead of the unwrapped
`wrappedComparator` instances.
> NULL value data layout in Row Serializer/Comparator
> ---------------------------------------------------
>
> Key: FLINK-3140
> URL: https://issues.apache.org/jira/browse/FLINK-3140
> Project: Flink
> Issue Type: Sub-task
> Components: Table API
> Reporter: Chengxiang Li
> Assignee: Timo Walther
>
> To store/materialize NULL values in Row objects, we need a new Row
> Serializer/Comparator that is aware of NULL-valued fields.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)