GitHub user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2968#discussion_r91652223
  
    --- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowComparator.java
 ---
    @@ -0,0 +1,698 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.java.typeutils.runtime;
    +
    +import org.apache.flink.api.common.typeutils.CompositeTypeComparator;
    +import org.apache.flink.api.common.typeutils.TypeComparator;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.api.java.tuple.Tuple4;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.core.memory.MemorySegment;
    +import org.apache.flink.types.KeyFieldOutOfBoundsException;
    +import org.apache.flink.types.Row;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.List;
    +
    +import static 
org.apache.flink.api.java.typeutils.runtime.NullMaskUtils.readIntoNullMask;
    +import static org.apache.flink.util.Preconditions.checkArgument;
    +
    +/**
    + * Comparator for {@link Row}
    + */
    +public class RowComparator extends CompositeTypeComparator<Row> {
    +
    +   private static final long serialVersionUID = 1L;
    +   /** The number of fields of the Row */
    +   private final int arity;
    +   /** key positions describe which fields are keys in what order */
    +   private final int[] keyPositions;
    +   /** null-aware comparators for the key fields, in the same order as the 
key fields */
    +   private final NullAwareComparator<Object>[] comparators;
    +   /** serializers to deserialize the first n fields for comparison */
    +   private final TypeSerializer<Object>[] serializers;
    +   /** auxiliary fields for normalized key support */
    +   private final int[] normalizedKeyLengths;
    +   private final int numLeadingNormalizableKeys;
    +   private final int normalizableKeyPrefixLen;
    +   private final boolean invertNormKey;
    +
    +   // null masks for serialized comparison
    +   private final boolean[] nullMask1;
    +   private final boolean[] nullMask2;
    +
    +   // cache for the deserialized key field objects
    +   transient private final Object[] deserializedKeyFields1;
    +   transient private final Object[] deserializedKeyFields2;
    +
    +   /**
    +    * General constructor for RowComparator.
    +    *
    +    * @param arity        the number of fields of the Row
    +    * @param keyPositions key positions describe which fields are keys in 
what order
    +    * @param comparators  non-null-aware comparators for the key fields, 
in the same order as
    +    *                     the key fields
    +    * @param serializers  serializers to deserialize the first n fields 
for comparison
    +    * @param orders       sorting orders for the fields
    +    */
    +   public RowComparator(
    +           int arity,
    +           int[] keyPositions,
    +           TypeComparator<Object>[] comparators,
    +           TypeSerializer<Object>[] serializers,
    +           boolean[] orders) {
    +           this(arity, keyPositions, makeNullAware(comparators, orders), 
serializers);
    +   }
    +
    +
    +   /**
    +    * Intermediate constructor for creating auxiliary fields.
    +    */
    +   private RowComparator(
    +           int arity,
    +           int[] keyPositions,
    +           NullAwareComparator<Object>[] comparators,
    +           TypeSerializer<Object>[] serializers) {
    +           this(
    +                   arity,
    +                   keyPositions,
    +                   comparators,
    +                   serializers,
    +                   createAuxiliaryFields(keyPositions, comparators));
    +   }
    +
    +   /**
    +    * Intermediate constructor for creating auxiliary fields.
    +    */
    +   private RowComparator(
    +           int arity,
    +           int[] keyPositions,
    +           NullAwareComparator<Object>[] comparators,
    +           TypeSerializer<Object>[] serializers,
    +           Tuple4<int[], Integer, Integer, Boolean> auxiliaryFields) {
    +           this(
    +                   arity,
    +                   keyPositions,
    +                   comparators,
    +                   serializers,
    +                   auxiliaryFields.f0,
    +                   auxiliaryFields.f1,
    +                   auxiliaryFields.f2,
    +                   auxiliaryFields.f3);
    +   }
    +
    +   /**
    +    * Intermediate constructor for creating auxiliary fields.
    +    */
    +   private RowComparator(
    +           int arity,
    +           int[] keyPositions,
    +           NullAwareComparator<Object>[] comparators,
    +           TypeSerializer<Object>[] serializers,
    +           int[] normalizedKeyLengths,
    +           int numLeadingNormalizableKeys, int normalizableKeyPrefixLen, 
boolean invertNormKey) {
    +           this.arity = arity;
    +           this.keyPositions = keyPositions;
    +           this.comparators = comparators;
    +           this.serializers = serializers;
    +           this.normalizedKeyLengths = normalizedKeyLengths;
    +           this.numLeadingNormalizableKeys = numLeadingNormalizableKeys;
    +           this.normalizableKeyPrefixLen = normalizableKeyPrefixLen;
    +           this.invertNormKey = invertNormKey;
    +           this.nullMask1 = new boolean[arity];
    +           this.nullMask2 = new boolean[arity];
    +           deserializedKeyFields1 = instantiateDeserializationFields();
    +           deserializedKeyFields2 = instantiateDeserializationFields();
    +   }
    +
    +   // 
--------------------------------------------------------------------------------------------
    +   //  Comparator Methods
    +   // 
--------------------------------------------------------------------------------------------
    +
    +   @Override
    +   public void getFlatComparator(List<TypeComparator> flatComparators) {
    +           for (NullAwareComparator<Object> c : comparators) {
    +                   Collections.addAll(flatComparators, 
c.getFlatComparators());
    +           }
    +   }
    +
    +   @Override
    +   public int hash(Row record) {
    +           int code = 0;
    +           int i = 0;
    +
    +           try {
    +                   for (; i < keyPositions.length; i++) {
    +                           code *= TupleComparatorBase.HASH_SALT[i & 0x1F];
    +                           Object element = 
record.getField(keyPositions[i]); // element can be null
    +                           code += comparators[i].hash(element);
    +                   }
    +           } catch (IndexOutOfBoundsException e) {
    +                   throw new KeyFieldOutOfBoundsException(keyPositions[i]);
    +           }
    +
    +           return code;
    +   }
    +
    +   @Override
    +   public void setReference(Row toCompare) {
    +           int i = 0;
    +           try {
    +                   for (; i < keyPositions.length; i++) {
    +                           TypeComparator<Object> comparator = 
comparators[i];
    +                           Object element = 
toCompare.getField(keyPositions[i]);
    +                           comparator.setReference(element);   // element 
can be null
    +                   }
    +           } catch (IndexOutOfBoundsException e) {
    +                   throw new KeyFieldOutOfBoundsException(keyPositions[i]);
    +           }
    +   }
    +
    +   @Override
    +   public boolean equalToReference(Row candidate) {
    +           int i = 0;
    +           try {
    +                   for (; i < keyPositions.length; i++) {
    +                           TypeComparator<Object> comparator = 
comparators[i];
    +                           Object element = 
candidate.getField(keyPositions[i]);   // element can be null
    +                           // check if reference is not equal
    +                           if (!comparator.equalToReference(element)) {
    +                                   return false;
    +                           }
    +                   }
    +           } catch (IndexOutOfBoundsException e) {
    +                   throw new KeyFieldOutOfBoundsException(keyPositions[i]);
    +           }
    +           return true;
    +   }
    +
    +   @Override
    +   public int compareToReference(TypeComparator<Row> referencedComparator) 
{
    +           RowComparator other = (RowComparator) referencedComparator;
    +           int i = 0;
    +           try {
    +                   for (; i < keyPositions.length; i++) {
    +                           int cmp = 
comparators[i].compareToReference(other.comparators[i]);
    +                           if (cmp != 0) {
    +                                   return cmp;
    +                           }
    +                   }
    +           } catch (IndexOutOfBoundsException e) {
    +                   throw new KeyFieldOutOfBoundsException(keyPositions[i]);
    +           }
    +           return 0;
    +   }
    +
    +   @Override
    +   public int compare(Row first, Row second) {
    +           int i = 0;
    +           try {
    +                   for (; i < keyPositions.length; i++) {
    +                           int keyPos = keyPositions[i];
    +                           TypeComparator<Object> comparator = 
comparators[i];
    +                           Object firstElement = first.getField(keyPos);   
// element can be null
    +                           Object secondElement = second.getField(keyPos); 
// element can be null
    +
    +                           int cmp = comparator.compare(firstElement, 
secondElement);
    +                           if (cmp != 0) {
    +                                   return cmp;
    +                           }
    +                   }
    +           } catch (IndexOutOfBoundsException e) {
    +                   throw new KeyFieldOutOfBoundsException(keyPositions[i]);
    +           }
    +           return 0;
    +   }
    +
    +   @Override
    +   public int compareSerialized(
    +           DataInputView firstSource,
    +           DataInputView secondSource) throws IOException {
    +
    +           int len = serializers.length;
    +           int keyLen = keyPositions.length;
    +
    +           readIntoNullMask(arity, firstSource, nullMask1);
    +           readIntoNullMask(arity, secondSource, nullMask2);
    +
    +           // deserialize
    +           for (int i = 0; i < len; i++) {
    +                   TypeSerializer<Object> serializer = serializers[i];
    +
    +                   // deserialize field 1
    +                   if (!nullMask1[i]) {
    +                           deserializedKeyFields1[i] = 
serializer.deserialize(
    +                                   deserializedKeyFields1[i],
    +                                   firstSource);
    +                   }
    +
    +                   // deserialize field 2
    +                   if (!nullMask2[i]) {
    +                           deserializedKeyFields2[i] = 
serializer.deserialize(
    +                                   deserializedKeyFields2[i],
    +                                   secondSource);
    +                   }
    +           }
    +
    +           // compare
    +           for (int i = 0; i < keyLen; i++) {
    +                   int keyPos = keyPositions[i];
    +                   TypeComparator<Object> comparator = comparators[i];
    +
    +                   boolean isNull1 = nullMask1[keyPos];
    +                   boolean isNull2 = nullMask2[keyPos];
    +
    +                   int cmp = 0;
    +                   // both values are null -> equality
    +                   if (isNull1 && isNull2) {
    +                           cmp = 0;
    +                   }
    +                   // first value is null -> inequality
    +                   else if (isNull1) {
    +                           cmp = comparator.compare(null, 
deserializedKeyFields2[keyPos]);
    +                   }
    +                   // second value is null -> inequality
    +                   else if (isNull2) {
    +                           cmp = 
comparator.compare(deserializedKeyFields1[keyPos], null);
    +                   }
    +                   // no null values
    +                   else {
    +                           cmp = comparator.compare(
    +                                   deserializedKeyFields1[keyPos],
    +                                   deserializedKeyFields2[keyPos]);
    +                   }
    +
    +                   if (cmp != 0) {
    +                           return cmp;
    +                   }
    +           }
    +
    +           return 0;
    +   }
    +
    +   @Override
    +   public boolean supportsNormalizedKey() {
    +           return numLeadingNormalizableKeys > 0;
    +   }
    +
    +   @Override
    +   public boolean supportsSerializationWithKeyNormalization() {
    +           return false;
    +   }
    +
    +   @Override
    +   public int getNormalizeKeyLen() {
    +           return normalizableKeyPrefixLen;
    +   }
    +
    +   @Override
    +   public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
    +           return numLeadingNormalizableKeys < keyPositions.length ||
    +                           normalizableKeyPrefixLen == Integer.MAX_VALUE ||
    +                           normalizableKeyPrefixLen > keyBytes;
    +   }
    +
    +   @Override
    +   public void putNormalizedKey(
    +           Row record, MemorySegment target, int offset, int numBytes) {
    +           int bytesLeft = numBytes;
    +           int currentOffset = offset;
    +
    +           for (int i=0;i<numLeadingNormalizableKeys && bytesLeft > 0;i++){
    +                   int len = normalizedKeyLengths[i];
    +                   len = bytesLeft >= len ? len : bytesLeft;
    +
    +                   TypeComparator<Object> comparator = comparators[i];
    +                   Object element = record.getField(keyPositions[i]);  // 
element can be null
    +                   // write key
    +                   comparator.putNormalizedKey(element, target, 
currentOffset, len);
    +
    +                   bytesLeft -= len;
    +                   currentOffset += len;
    +           }
    +
    +   }
    +
    +   @Override
    +   public void writeWithKeyNormalization(
    +           Row record,
    +           DataOutputView target) throws IOException {
    +           throw new UnsupportedOperationException(
    +                   "Record serialization with leading normalized keys not 
supported.");
    +
    +   }
    +
    +   @Override
    +   public Row readWithKeyDenormalization(Row reuse, DataInputView source) 
throws IOException {
    +           throw new UnsupportedOperationException(
    +                   "Record deserialization with leading normalized keys 
not supported.");
    +   }
    +
    +   @Override
    +   public boolean invertNormalizedKey() {
    +           return invertNormKey;
    +   }
    +
    +   @Override
    +   public TypeComparator<Row> duplicate() {
    +           NullAwareComparator<?>[] comparatorsCopy = new 
NullAwareComparator<?>[comparators.length];
    +           for (int i = 0; i < comparators.length; i++) {
    +                   comparatorsCopy[i] = (NullAwareComparator<?>) 
comparators[i].duplicate();
    +           }
    +
    +           TypeSerializer<?>[] serializersCopy = new 
TypeSerializer<?>[serializers.length];
    +           for (int i = 0; i < serializers.length; i++) {
    +                   serializersCopy[i] = serializers[i].duplicate();
    +           }
    +
    +           return new RowComparator(
    +                   arity,
    +                   keyPositions,
    +                   (NullAwareComparator<Object>[]) comparatorsCopy,
    +                   (TypeSerializer<Object>[]) serializersCopy,
    +                   normalizedKeyLengths,
    +                   numLeadingNormalizableKeys,
    +                   normalizableKeyPrefixLen,
    +                   invertNormKey);
    +   }
    +
    +   @Override
    +   public int extractKeys(Object record, Object[] target, int index) {
    +           int len = comparators.length;
    +           int localIndex = index;
    +           for(int i=0;i<len;i++){
    +                   Object element = ((Row) 
record).getField(keyPositions[i]);  // element can be null
    +                   localIndex += comparators[i].extractKeys(element, 
target, localIndex);
    +           }
    +           return localIndex - index;
    +   }
    +
    +
    +   private Object[] instantiateDeserializationFields() {
    +           Object[] newFields = new Object[serializers.length];
    +           for (int i = 0; i < serializers.length; i++) {
    +                   newFields[i] = serializers[i].createInstance();
    +           }
    +           return newFields;
    +   }
    +
    +   /**
    +    * @return creates auxiliary fields for normalized key support
    +    */
    +   private static Tuple4<int[], Integer, Integer, Boolean>
    --- End diff --
    
    Unfortunately, the generic type parameters of a Tuple cannot be primitive types, so the boxed types (Integer, Boolean) have to be used here.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it to be, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to