Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/1566#discussion_r51997240
--- Diff:
flink-core/src/main/java/org/apache/flink/api/java/typeutils/ObjectArrayTypeInfo.java
---
@@ -72,15 +77,59 @@ public int getTotalFields() {
@Override
public boolean isKeyType() {
- return false;
+ return true;
}
@SuppressWarnings("unchecked")
@Override
public TypeSerializer<T> createSerializer(ExecutionConfig
executionConfig) {
return (TypeSerializer<T>) new GenericArraySerializer<C>(
- componentInfo.getTypeClass(),
- componentInfo.createSerializer(executionConfig));
+ componentInfo.getTypeClass(),
+
componentInfo.createSerializer(executionConfig));
+ }
+
+ @SuppressWarnings("unchecked")
+ private TypeComparator<? super Object>
getBaseComparatorInfo(TypeInformation<? extends Object> componentInfo, boolean
sortOrderAscending, ExecutionConfig executionConfig) {
+ /**
+ * method tries to find out the Comparator to be used to
compare each element (of primitive type or composite type) of the provided
Object arrays.
+ */
+ if (componentInfo instanceof ObjectArrayTypeInfo) {
+ return getBaseComparatorInfo(((ObjectArrayTypeInfo)
componentInfo).getComponentInfo(), sortOrderAscending, executionConfig);
+ }
+ else if (componentInfo instanceof PrimitiveArrayTypeInfo) {
+ return getBaseComparatorInfo(((PrimitiveArrayTypeInfo<?
extends Object>) componentInfo).getComponentType(), sortOrderAscending,
executionConfig);
+ }
+ else {
+ if (componentInfo instanceof AtomicType) {
+ return ((AtomicType<? super Object>)
componentInfo).createComparator(sortOrderAscending, executionConfig);
+ }
+ else if (componentInfo instanceof CompositeType) {
+ int componentArity = ((CompositeType<? extends
Object>) componentInfo).getArity();
+ int [] logicalKeyFields = new
int[componentArity];
+ boolean[] orders = new boolean[componentArity];
+
+ for (int i=0;i < componentArity;i++) {
+ logicalKeyFields[i] = i;
+ orders[i] = sortOrderAscending;
+ }
+
+ return ((CompositeType<? super Object>)
componentInfo).createComparator(logicalKeyFields, orders, 0, executionConfig);
+ }
+ else {
+ throw new IllegalArgumentException("Could not
add a comparator for the component type " + componentInfo.getClass().getName());
+ }
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public TypeComparator<T> createComparator(boolean sortOrderAscending,
ExecutionConfig executionConfig) {
+
+ return (TypeComparator<T>) new ObjectArrayComparator<T,C>(
+ sortOrderAscending,
+ (GenericArraySerializer<T>)
createSerializer(executionConfig),
--- End diff --
Why this cast here?
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---