[ 
https://issues.apache.org/jira/browse/FLINK-3140?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15115579#comment-15115579
 ] 

ASF GitHub Bot commented on FLINK-3140:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1465#discussion_r50724445
  
    --- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowComparator.scala
 ---
    @@ -0,0 +1,423 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.table.typeinfo
    +
    +import java.util
    +
    +import org.apache.flink.api.common.typeutils.{CompositeTypeComparator, 
TypeComparator, TypeSerializer}
    +import org.apache.flink.api.java.typeutils.runtime.TupleComparatorBase
    +import org.apache.flink.api.table.Row
    +import org.apache.flink.api.table.typeinfo.NullMaskUtils.readIntoNullMask
    +import 
org.apache.flink.api.table.typeinfo.RowComparator.{createAuxiliaryFields, 
makeNullAware}
    +import org.apache.flink.core.memory.{DataInputView, DataOutputView, 
MemorySegment}
    +import org.apache.flink.types.KeyFieldOutOfBoundsException
    +
    +/**
    + * Comparator for [[Row]].
    + */
    +class RowComparator private (
    +    /** key positions describe which fields are keys in what order */
    +    val keyPositions: Array[Int],
    +    /** null-aware comparators for the key fields, in the same order as 
the key fields */
    +    val comparators: Array[NullAwareComparator[_]],
    +    /** serializers to deserialize the first n fields for comparison */
    +    val serializers: Array[TypeSerializer[_]],
    +    /** auxiliary fields for normalized key support */
    +    private val auxiliaryFields: (Array[Int], Int, Int, Boolean))
    +  extends CompositeTypeComparator[Row] with Serializable {
    +
    +  // null masks for serialized comparison
    +  private val nullMask1 = new Array[Boolean](serializers.length)
    +  private val nullMask2 = new Array[Boolean](serializers.length)
    +
    +  // cache for the deserialized key field objects
    +  @transient
    +  private lazy val deserializedKeyFields1: Array[Any] = 
instantiateDeserializationFields()
    +
    +  @transient
    +  private lazy val deserializedKeyFields2: Array[Any] = 
instantiateDeserializationFields()
    +
    +  // create auxiliary fields
    +  private val normalizedKeyLengths: Array[Int] = auxiliaryFields._1
    +  private val numLeadingNormalizableKeys: Int = auxiliaryFields._2
    +  private val normalizableKeyPrefixLen: Int = auxiliaryFields._3
    +  private val invertNormKey: Boolean = auxiliaryFields._4
    +
    +  /**
    +   * Intermediate constructor for creating auxiliary fields.
    +   */
    +  def this(
    +      keyPositions: Array[Int],
    +      comparators: Array[NullAwareComparator[_]],
    +      serializers: Array[TypeSerializer[_]]) = {
    +    this(
    +      keyPositions,
    +      comparators,
    +      serializers,
    +      createAuxiliaryFields(keyPositions, comparators))
    +  }
    +
    +  /**
    +   * General constructor for RowComparator.
    +   *
    +   * @param keyPositions key positions describe which fields are keys in 
what order
    +   * @param comparators non-null-aware comparators for the key fields, in 
the same order as
    +   *   the key fields
    +   * @param serializers serializers to deserialize the first n fields for 
comparison
    +   * @param orders sorting orders for the fields
    +   */
    +  def this(
    +      keyPositions: Array[Int],
    +      comparators: Array[TypeComparator[_]],
    +      serializers: Array[TypeSerializer[_]],
    +      orders: Array[Boolean]) = {
    +    this(
    +      keyPositions,
    +      makeNullAware(comparators, orders),
    +      serializers)
    +  }
    +
    +  private def instantiateDeserializationFields(): Array[Any] = {
    +    val newFields = new Array[Any](serializers.length)
    +    var i = 0
    +    while (i < serializers.length) {
    +      newFields(i) = serializers(i).createInstance()
    +      i += 1
    +    }
    +    newFields
    +  }
    +
    +  // 
--------------------------------------------------------------------------------------------
    +  //  Comparator Methods
    +  // 
--------------------------------------------------------------------------------------------
    +
    +  override def compareToReference(referencedComparator: 
TypeComparator[Row]): Int = {
    +    val other: RowComparator = 
referencedComparator.asInstanceOf[RowComparator]
    +    var i = 0
    +    try {
    +      while (i < keyPositions.length) {
    +        val comparator = comparators(i).asInstanceOf[TypeComparator[Any]]
    +        val otherComparator = 
other.comparators(i).asInstanceOf[TypeComparator[Any]]
    +
    +        val cmp = comparator.compareToReference(otherComparator)
    +        if (cmp != 0) {
    +          return cmp
    +        }
    +        i = i + 1
    +      }
    +      0
    +    }
    +    catch {
    +      case iobex: IndexOutOfBoundsException =>
    +        throw new KeyFieldOutOfBoundsException(keyPositions(i))
    +    }
    +  }
    +
    +  override def compareSerialized(firstSource: DataInputView, secondSource: 
DataInputView): Int = {
    +    val len = serializers.length
    +    val keyLen = keyPositions.length
    +
    +    readIntoNullMask(len, firstSource, nullMask1)
    +    readIntoNullMask(len, secondSource, nullMask2)
    +
    +    // deserialize
    +    var i = 0
    +    while (i < len) {
    +      val serializer = serializers(i).asInstanceOf[TypeSerializer[Any]]
    +
    +      // deserialize field 1
    +      if (!nullMask1(i)) {
    +        deserializedKeyFields1(i) = 
serializer.deserialize(deserializedKeyFields1(i), firstSource)
    +      }
    +
    +      // deserialize field 2
    +      if (!nullMask2(i)) {
    +        deserializedKeyFields2(i) = 
serializer.deserialize(deserializedKeyFields2(i), secondSource)
    +      }
    +
    +      i += 1
    +    }
    +
    +    // compare
    +    i = 0
    +    while (i < keyLen) {
    +      val keyPos = keyPositions(i)
    +      val comparator = comparators(i).asInstanceOf[TypeComparator[Any]]
    +
    +      val isNull1 = nullMask1(keyPos)
    +      val isNull2 = nullMask2(keyPos)
    +
    +      var cmp = 0
    +      // both values are null -> equality
    +      if (isNull1 && isNull2) {
    +        cmp = 0
    +      }
    +      // first value is null -> inequality
    +      else if (isNull1) {
    +        cmp = comparator.compare(null, deserializedKeyFields2(keyPos))
    +      }
    +      // second value is null -> inequality
    +      else if (isNull2) {
    +        cmp = comparator.compare(deserializedKeyFields1(keyPos), null)
    +      }
    +      // no null values
    +      else {
    +        cmp = comparator.compare(deserializedKeyFields1(keyPos), 
deserializedKeyFields2(keyPos))
    +      }
    +
    +      if (cmp != 0) {
    +        return cmp
    +      }
    +
    +      i += 1
    +    }
    +    0
    +  }
    +
    +  override def supportsNormalizedKey(): Boolean = 
numLeadingNormalizableKeys > 0
    +
    +  override def getNormalizeKeyLen: Int = normalizableKeyPrefixLen
    +
    +  override def isNormalizedKeyPrefixOnly(keyBytes: Int): Boolean =
    +    numLeadingNormalizableKeys < keyPositions.length ||
    +      normalizableKeyPrefixLen == Integer.MAX_VALUE ||
    +      normalizableKeyPrefixLen > keyBytes
    +
    +  override def invertNormalizedKey(): Boolean = invertNormKey
    +
    +  override def supportsSerializationWithKeyNormalization(): Boolean = false
    +
    +  override def writeWithKeyNormalization(record: Row, target: 
DataOutputView): Unit =
    +    throw new UnsupportedOperationException("Record serialization with 
leading normalized keys " +
    +      "not supported.")
    +
    +  override def readWithKeyDenormalization(reuse: Row, source: 
DataInputView): Row =
    +    throw new UnsupportedOperationException("Record deserialization with 
leading normalized keys " +
    +      "not supported.")
    +
    +  override def duplicate(): TypeComparator[Row] = {
    +    // copy comparator and serializer factories
    +    val comparatorsCopy = 
comparators.map(_.duplicate().asInstanceOf[NullAwareComparator[_]])
    +    val serializersCopy = serializers.map(_.duplicate())
    +
    +    new RowComparator(
    +      keyPositions,
    +      comparatorsCopy,
    +      serializersCopy,
    +      auxiliaryFields)
    +  }
    +
    +  override def hash(value: Row): Int = {
    +    var code: Int = 0
    +    var i = 0
    +    try {
    +      while(i < keyPositions.length) {
    +        code *= TupleComparatorBase.HASH_SALT(i & 0x1F)
    +        val element = value.productElement(keyPositions(i)) // element can 
be null
    +        code += 
comparators(i).asInstanceOf[TypeComparator[Any]].hash(element)
    --- End diff --
    
    I think you can get rid of these casts if you define `comparators` as 
`Array[NullAwareComparator[Any]]` instead of `Array[NullAwareComparator[_]]`.


> NULL value data layout in Row Serializer/Comparator
> ---------------------------------------------------
>
>                 Key: FLINK-3140
>                 URL: https://issues.apache.org/jira/browse/FLINK-3140
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API
>            Reporter: Chengxiang Li
>            Assignee: Timo Walther
>
> To store/materialize NULL values in Row objects, we need a new Row 
> Serializer/Comparator that is aware of NULL-valued fields.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to