[ 
https://issues.apache.org/jira/browse/FLINK-2678?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15126975#comment-15126975
 ] 

ASF GitHub Bot commented on FLINK-2678:
---------------------------------------

Github user sbcd90 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1566#discussion_r51476173
  
    --- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArrayComparator.java
 ---
    @@ -0,0 +1,205 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.common.typeutils.base;
    +
    +import org.apache.flink.api.common.typeutils.TypeComparator;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.core.memory.MemorySegment;
    +
    +import java.io.IOException;
    +import java.lang.reflect.Array;
    +import java.util.Arrays;
    +
    +public class GenericArrayComparator<T> extends TypeComparator<T[]> 
implements java.io.Serializable {
    +
    +   private static final long serialVersionUID = 1L;
    +
    +   private transient T[] reference;
    +
    +   protected final boolean ascendingComparison;
    +
    +   private final TypeSerializer<T[]> serializer;
    +
    +   // For use by getComparators
    +   @SuppressWarnings("rawtypes")
    +   private final TypeComparator[] comparators = new TypeComparator[] 
{this};
    +
    +   public GenericArrayComparator(boolean ascending, TypeSerializer<T[]> 
serializer) {
    +           this.ascendingComparison = ascending;
    +           this.serializer = serializer;
    +   }
    +
    +   @Override
    +   public void setReference(T[] reference) {
    +           this.reference = reference;
    +   }
    +
    +   @Override
    +   public boolean equalToReference(T[] candidate) {
    +           return compare(this.reference, candidate) == 0;
    +   }
    +
    +   @Override
    +   public int compareToReference(TypeComparator<T[]> referencedComparator) 
{
    +           int comp = compare(((GenericArrayComparator<T>) 
referencedComparator).reference, reference);
    +           return ascendingComparison ? comp : -comp;
    +   }
    +
    +   @Override
    +   public int compareSerialized(DataInputView firstSource, DataInputView 
secondSource) throws IOException {
    +           T[] firstArray = serializer.deserialize(firstSource);
    +           T[] secondArray = serializer.deserialize(secondSource);
    +
    +           int comp = compare(firstArray, secondArray);
    +           return ascendingComparison ? comp : -comp;
    +   }
    +
    +   @Override
    +   public int extractKeys(Object record, Object[] target, int index) {
    +           target[index] = record;
    +           return 1;
    +   }
    +
    +   @Override
    +   public TypeComparator[] getFlatComparators() {
    +           return comparators;
    +   }
    +
    +   @Override
    +   public boolean supportsNormalizedKey() {
    +           return false;
    +   }
    +
    +   @Override
    +   public boolean supportsSerializationWithKeyNormalization() {
    +           return false;
    +   }
    +
    +   @Override
    +   public int getNormalizeKeyLen() {
    +           return 0;
    +   }
    +
    +   @Override
    +   public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
    +           throw new UnsupportedOperationException();
    +   }
    +
    +   @Override
    +   public void putNormalizedKey(T[] record, MemorySegment target, int 
offset, int numBytes) {
    +           throw new UnsupportedOperationException();
    +   }
    +
    +   @Override
    +   public void writeWithKeyNormalization(T[] record, DataOutputView 
target) throws IOException {
    +           throw new UnsupportedOperationException();
    +   }
    +
    +   @Override
    +   public T[] readWithKeyDenormalization(T[] reuse, DataInputView source) 
throws IOException {
    +           throw new UnsupportedOperationException();
    +   }
    +
    +   @Override
    +   public boolean invertNormalizedKey() {
    +           return !ascendingComparison;
    +   }
    +
    +   @Override
    +   public int hash(T[] record) {
    +           return Arrays.hashCode(record);
    +   }
    +
    +   private int compareValues(Object first, Object second) {
    +           if (first.getClass().equals(Boolean.class) && 
second.getClass().equals(Boolean.class)) {
    +                   return new BooleanComparator(true).compare((Boolean) 
first, (Boolean) second);
    +           }
    +           else if (first.getClass().equals(Byte.class) && 
second.getClass().equals(Byte.class)) {
    +                   return new ByteComparator(true).compare((Byte) first, 
(Byte) second);
    +           }
    +           else if (first.getClass().equals(Character.class) && 
second.getClass().equals(Character.class)) {
    +                   return new CharComparator(true).compare((Character) 
first, (Character) second);
    +           }
    +           else if (first.getClass().equals(Double.class) && 
second.getClass().equals(Double.class)) {
    +                   return new DoubleComparator(true).compare((Double) 
first, (Double) second);
    +           }
    +           else if (first.getClass().equals(Float.class) && 
second.getClass().equals(Float.class)) {
    +                   return new FloatComparator(true).compare((Float) first, 
(Float) second);
    +           }
    +           else if (first.getClass().equals(Integer.class) && 
second.getClass().equals(Integer.class)) {
    +                   return new IntComparator(true).compare((Integer) first, 
(Integer) second);
    +           }
    +           else if (first.getClass().equals(Long.class) && 
second.getClass().equals(Long.class)) {
    +                   return new LongComparator(true).compare((Long) first, 
(Long) second);
    +           }
    +           else if (first.getClass().equals(Short.class) && 
second.getClass().equals(Short.class)) {
    +                   return new ShortComparator(true).compare((Short) first, 
(Short) second);
    +           }
    +           else if (first.getClass().equals(String.class) && 
second.getClass().equals(String.class)) {
    +                   return new StringComparator(true).compare((String) 
first, (String) second);
    +           }
    +           return -1;
    +   }
    +
    +   @SuppressWarnings("unchecked")
    +   private int parseGenericArray(Object firstArray, Object secondArray) {
    +           int compareResult = 0;
    +           if (firstArray.getClass().isArray() && 
secondArray.getClass().isArray()) {
    +                   int min = Array.getLength(firstArray);
    +                   int tempResult = 0;
    +
    +                   if (min < Array.getLength(secondArray)) {
    +                           tempResult = -1;
    +                   }
    +                   if (min > Array.getLength(secondArray)) {
    +                           min = Array.getLength(secondArray);
    +                           tempResult = 1;
    +                   }
    +
    +                   for(int i=0; i < min; i++) {
    +                           int val = 
parseGenericArray(Array.get(firstArray, i), Array.get(secondArray, i));
    +                           if (val != 0 && compareResult == 0) {
    +                                   compareResult = val;
    +                           }
    +                   }
    +
    +                   if (compareResult == 0) {
    +                           compareResult = tempResult;
    +                   }
    +           }
    +           else {
    +                   compareResult = compareValues(firstArray, secondArray);
    +           }
    +           return compareResult;
    +   }
    +
    +   @Override
    +   public int compare(T[] first, T[] second) {
    --- End diff --
    
    `ObjectArrayTypeInfo.componentInfo` is of type `TypeInformation<T>`, and 
calling a method such as `createComparator`, which belongs to one of the 
implementing classes of `TypeInformation<T>`, is difficult. One possible 
solution is to have a static map, like `TYPES`, that maps type classes to 
comparator classes. From `ObjectArrayTypeInfo.componentInfo` I would get the 
type class and look up the corresponding comparator in the map. Is there a 
better solution?


> DataSet API does not support multi-dimensional arrays as keys
> -------------------------------------------------------------
>
>                 Key: FLINK-2678
>                 URL: https://issues.apache.org/jira/browse/FLINK-2678
>             Project: Flink
>          Issue Type: Wish
>          Components: DataSet API
>            Reporter: Till Rohrmann
>            Assignee: Subhobrata Dey
>            Priority: Minor
>
> The DataSet API does not support grouping/sorting on fields that are 
> multi-dimensional arrays. It could be helpful to also support these types.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to