Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1566#discussion_r51997240
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/ObjectArrayTypeInfo.java ---
    @@ -72,15 +77,59 @@ public int getTotalFields() {
     
        @Override
        public boolean isKeyType() {
    -           return false;
    +           return true;
        }
     
        @SuppressWarnings("unchecked")
        @Override
        public TypeSerializer<T> createSerializer(ExecutionConfig 
executionConfig) {
                return (TypeSerializer<T>) new GenericArraySerializer<C>(
    -                   componentInfo.getTypeClass(),
    -                   componentInfo.createSerializer(executionConfig));
    +                           componentInfo.getTypeClass(),
    +                           
componentInfo.createSerializer(executionConfig));
    +   }
    +
    +   @SuppressWarnings("unchecked")
    +   private TypeComparator<? super Object> 
getBaseComparatorInfo(TypeInformation<? extends Object> componentInfo, boolean 
sortOrderAscending, ExecutionConfig executionConfig) {
    +           /**
    +            * method tries to find out the Comparator to be used to 
compare each element (of primitive type or composite type) of the provided 
Object arrays.
    +            */
    +           if (componentInfo instanceof ObjectArrayTypeInfo) {
    +                   return getBaseComparatorInfo(((ObjectArrayTypeInfo) 
componentInfo).getComponentInfo(), sortOrderAscending, executionConfig);
    +           }
    +           else if (componentInfo instanceof PrimitiveArrayTypeInfo) {
    +                   return getBaseComparatorInfo(((PrimitiveArrayTypeInfo<? 
extends Object>) componentInfo).getComponentType(), sortOrderAscending, 
executionConfig);
    +           }
    +           else {
    +                   if (componentInfo instanceof AtomicType) {
    +                           return ((AtomicType<? super Object>) 
componentInfo).createComparator(sortOrderAscending, executionConfig);
    +                   }
    +                   else if (componentInfo instanceof CompositeType) {
    +                           int componentArity = ((CompositeType<? extends 
Object>) componentInfo).getArity();
    +                           int [] logicalKeyFields = new 
int[componentArity];
    +                           boolean[] orders = new boolean[componentArity];
    +
    +                           for (int i=0;i < componentArity;i++) {
    +                                   logicalKeyFields[i] = i;
    +                                   orders[i] = sortOrderAscending;
    +                           }
    +
    +                           return ((CompositeType<? super Object>) 
componentInfo).createComparator(logicalKeyFields, orders, 0, executionConfig);
    +                   }
    +                   else {
    +                           throw new IllegalArgumentException("Could not 
add a comparator for the component type " + componentInfo.getClass().getName());
    +                   }
    +           }
    +   }
    +
    +   @SuppressWarnings("unchecked")
    +   @Override
    +   public TypeComparator<T> createComparator(boolean sortOrderAscending, 
ExecutionConfig executionConfig) {
    +
    +           return (TypeComparator<T>) new ObjectArrayComparator<T,C>(
    +                   sortOrderAscending,
    +                   (GenericArraySerializer<T>) 
createSerializer(executionConfig),
    --- End diff --
    
    Why is this cast needed here?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to