[
https://issues.apache.org/jira/browse/FLINK-3140?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15093764#comment-15093764
]
ASF GitHub Bot commented on FLINK-3140:
---------------------------------------
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/1465#discussion_r49442607
--- Diff:
flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/NullAwareComparator.scala
---
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.typeinfo
+
+import org.apache.flink.api.common.typeutils.TypeComparator
+import org.apache.flink.core.memory.{MemorySegment, DataOutputView,
DataInputView}
+
+/**
+ * Null-aware comparator that wraps a comparator which does not support
null references.
+ */
+class NullAwareComparator[T](
+ val wrappedComparator: TypeComparator[T],
+ val order: Boolean)
+ extends TypeComparator[T] {
+
+ // stores the null for reference comparison
+ private var nullReference = false
+
+ override def hash(record: T): Int = {
+ if (record != null) {
+ wrappedComparator.hash(record)
+ }
+ else {
+ 0
+ }
+ }
+
+ override def getNormalizeKeyLen: Int = {
+ val len = wrappedComparator.getNormalizeKeyLen
+ if (len == Integer.MAX_VALUE) {
+ Integer.MAX_VALUE
+ }
+ else {
+ len + 1 // add one for a null byte
+ }
+ }
+
+ override def putNormalizedKey(
+ record: T,
+ target: MemorySegment,
+ offset: Int,
+ numBytes: Int)
+ : Unit = {
+ if (numBytes > 0) {
+ // write a null byte with padding
+ if (record == null) {
+ target.putBoolean(offset, false)
+ // write padding
+ var j = 0
+ while (j < numBytes - 1) {
+ target.put(offset + 1 + j, 0.toByte)
+ j += 1
+ }
+ }
+ // write a non-null byte with key
+ else {
+ target.putBoolean(offset, true)
+ // write key
+ wrappedComparator.putNormalizedKey(record, target, offset + 1,
numBytes - 1)
+ }
+ }
+ }
+
+ override def invertNormalizedKey(): Boolean =
wrappedComparator.invertNormalizedKey()
+
+ override def supportsSerializationWithKeyNormalization(): Boolean = false
+
+ override def writeWithKeyNormalization(record: T, target:
DataOutputView): Unit =
+ throw new UnsupportedOperationException("Normalized keys not
supported.")
+
+ override def readWithKeyDenormalization(reuse: T, source:
DataInputView): T =
+ throw new UnsupportedOperationException("Normalized keys not
supported.")
+
+ override def isNormalizedKeyPrefixOnly(keyBytes: Int): Boolean =
+ wrappedComparator.isNormalizedKeyPrefixOnly(keyBytes)
+
+ override def setReference(toCompare: T): Unit = {
+ if (toCompare == null) {
+ nullReference = true
+ }
+ else {
+ nullReference = false
+ wrappedComparator.setReference(toCompare)
+ }
+ }
+
+ override def compare(first: T, second: T): Int = {
+ // both values are null -> equality
+ if (first == null && second == null) {
+ 0
+ }
+ // first value is null -> inequality
+ // but order is considered
+ else if (first == null) {
+ if (order) -1 else 1
+ }
+ // second value is null -> inequality
+ // but order is considered
+ else if (second == null) {
+ if (order) 1 else -1
+ }
+ // no null values
+ else {
+ wrappedComparator.compare(first, second)
+ }
+ }
+
+ override def compareToReference(referencedComparator:
TypeComparator[T]): Int = {
+ val otherComparator =
referencedComparator.asInstanceOf[NullAwareComparator[T]]
+ val otherNullReference = otherComparator.nullReference
+ // both values are null -> equality
+ if (nullReference && otherNullReference) {
+ 0
+ }
+ // first value is null -> inequality
+ // but order is considered
+ else if (nullReference) {
+ if (order) 1 else -1
+ }
+ // second value is null -> inequality
+ // but order is considered
+ else if (otherNullReference) {
+ if (order) -1 else 1
+ }
+ // no null values
+ else {
+
wrappedComparator.compareToReference(otherComparator.wrappedComparator)
+ }
+ }
+
+ override def supportsNormalizedKey(): Boolean =
wrappedComparator.supportsNormalizedKey()
+
+ override def equalToReference(candidate: T): Boolean = {
+ // both values are null
+ if (candidate == null && nullReference) {
+ true
+ }
+ // one value is null
+ else if (candidate == null || nullReference) {
+ false
+ }
+ // no null value
+ else {
+ wrappedComparator.equalToReference(candidate)
+ }
+ }
+
+ override def duplicate(): TypeComparator[T] = {
+ new NullAwareComparator[T](wrappedComparator.duplicate(), order)
+ }
+
+ override def extractKeys(record: Any, target: Array[AnyRef], index:
Int): Int = {
+ if (record == null) {
+ target(index) = null
+ 1
--- End diff --
This only works if the wrapped comparator is atomic. Otherwise, `null` and
non-`null` composite values will produce a different number of extracted keys.
Maybe we can create a default instance using the type serializer, pass it to the
wrapped comparator so that it extracts the right number of key fields, and then
overwrite all of them with `null`. This is hacky, but it might work :-/.
> NULL value data layout in Row Serializer/Comparator
> ---------------------------------------------------
>
> Key: FLINK-3140
> URL: https://issues.apache.org/jira/browse/FLINK-3140
> Project: Flink
> Issue Type: Sub-task
> Components: Table API
> Reporter: Chengxiang Li
> Assignee: Timo Walther
>
> To store/materialize NULL values in Row objects, we need a new Row
> serializer/comparator that is aware of NULL-valued fields.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)