twalthr commented on a change in pull request #12377:
URL: https://github.com/apache/flink/pull/12377#discussion_r432060612



##########
File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ComparableTypeStrategy.java
##########
@@ -97,100 +98,84 @@ public ArgumentCount getArgumentCount() {
                return Optional.of(argumentDataTypes);
        }
 
-       private boolean areComparable(LogicalType firstLogicalType, LogicalType 
secondLogicalType) {
+       private boolean areComparable(LogicalType firstType, LogicalType 
secondType) {
                // A hack to support legacy types. To be removed when we drop 
the legacy types.
-               if (firstLogicalType instanceof LegacyTypeInformationType ||
-                               secondLogicalType instanceof 
LegacyTypeInformationType) {
+               if (firstType instanceof LegacyTypeInformationType ||
+                               secondType instanceof 
LegacyTypeInformationType) {
                        return true;
                }
 
                // everything is comparable with null, it should return null in 
that case
-               if (hasRoot(firstLogicalType, LogicalTypeRoot.NULL) || 
hasRoot(secondLogicalType, LogicalTypeRoot.NULL)) {
+               if (hasRoot(firstType, LogicalTypeRoot.NULL) || 
hasRoot(secondType, LogicalTypeRoot.NULL)) {
                        return true;
                }
 
-               if (hasRoot(firstLogicalType, LogicalTypeRoot.STRUCTURED_TYPE) 
&&
-                               hasRoot(secondLogicalType, 
LogicalTypeRoot.STRUCTURED_TYPE)) {
-                       return areStructuredTypesComparable(firstLogicalType, 
secondLogicalType);
+               if (firstType.getTypeRoot() == secondType.getTypeRoot()) {
+                       return areTypesOfSameRootComparable(firstType, 
secondType);
                }
 
-               if (hasRoot(firstLogicalType, LogicalTypeRoot.DISTINCT_TYPE) &&
-                               hasRoot(secondLogicalType, 
LogicalTypeRoot.DISTINCT_TYPE)) {
-                       return areDistinctTypesComparable(firstLogicalType, 
secondLogicalType);
-               }
-
-               if (hasRoot(firstLogicalType, LogicalTypeRoot.RAW) && 
hasRoot(secondLogicalType, LogicalTypeRoot.RAW)) {
-                       return areRawTypesComparable(firstLogicalType, 
secondLogicalType);
-               }
-
-               if (hasRoot(firstLogicalType, LogicalTypeRoot.ARRAY) && 
hasRoot(secondLogicalType, LogicalTypeRoot.ARRAY)) {
-                       return areCollectionsComparable(firstLogicalType, 
secondLogicalType);
-               }
-
-               if (hasRoot(firstLogicalType, LogicalTypeRoot.MULTISET) &&
-                       hasRoot(secondLogicalType, LogicalTypeRoot.MULTISET)) {
-                       return areCollectionsComparable(firstLogicalType, 
secondLogicalType);
-               }
-
-               if (hasRoot(firstLogicalType, LogicalTypeRoot.MAP) &&
-                               hasRoot(secondLogicalType, 
LogicalTypeRoot.MAP)) {
-                       return areCollectionsComparable(firstLogicalType, 
secondLogicalType);
-               }
-
-               if (hasRoot(firstLogicalType, LogicalTypeRoot.ROW) && 
hasRoot(secondLogicalType, LogicalTypeRoot.ROW)) {
-                       return areCollectionsComparable(firstLogicalType, 
secondLogicalType);
-               }
-
-               if (firstLogicalType.getTypeRoot() == 
secondLogicalType.getTypeRoot()) {
-                       return true;
-               }
-
-               if (hasFamily(firstLogicalType, LogicalTypeFamily.NUMERIC) &&
-                               hasFamily(secondLogicalType, 
LogicalTypeFamily.NUMERIC)) {
+               if (hasFamily(firstType, LogicalTypeFamily.NUMERIC) && 
hasFamily(secondType, LogicalTypeFamily.NUMERIC)) {
                        return true;
                }
 
                // DATE + ALL TIMESTAMPS
-               if (hasFamily(firstLogicalType, LogicalTypeFamily.DATETIME) &&
-                               hasFamily(secondLogicalType, 
LogicalTypeFamily.DATETIME)) {
+               if (hasFamily(firstType, LogicalTypeFamily.DATETIME) && 
hasFamily(secondType, LogicalTypeFamily.DATETIME)) {
                        return true;
                }
 
                // VARCHAR + CHAR (we do not compare collations here)
-               if (hasFamily(firstLogicalType, 
LogicalTypeFamily.CHARACTER_STRING) &&
-                               hasFamily(secondLogicalType, 
LogicalTypeFamily.CHARACTER_STRING)) {
+               if (hasFamily(firstType, LogicalTypeFamily.CHARACTER_STRING) &&
+                               hasFamily(secondType, 
LogicalTypeFamily.CHARACTER_STRING)) {
                        return true;
                }
 
                // VARBINARY + BINARY
-               if (hasFamily(firstLogicalType, 
LogicalTypeFamily.BINARY_STRING) &&
-                               hasFamily(secondLogicalType, 
LogicalTypeFamily.BINARY_STRING)) {
+               if (hasFamily(firstType, LogicalTypeFamily.BINARY_STRING) &&
+                               hasFamily(secondType, 
LogicalTypeFamily.BINARY_STRING)) {
                        return true;
                }
 
                return false;
        }
 
-       private boolean areRawTypesComparable(LogicalType firstLogicalType, 
LogicalType secondLogicalType) {
-               return 
firstLogicalType.copy(true).equals(secondLogicalType.copy(true)) &&
-                       Comparable.class.isAssignableFrom(((RawType<?>) 
firstLogicalType).getOriginatingClass());
+       private boolean areTypesOfSameRootComparable(LogicalType firstType, 
LogicalType secondType) {
+               switch (firstType.getTypeRoot()) {
+                       case ARRAY:
+                       case MULTISET:
+                       case MAP:
+                       case ROW:
+                               return areCollectionsComparable(firstType, 
secondType);
+                       case DISTINCT_TYPE:
+                               return areDistinctTypesComparable(firstType, 
secondType);
+                       case STRUCTURED_TYPE:
+                               return areStructuredTypesComparable(firstType, 
secondType);
+                       case RAW:
+                               return areRawTypesComparable(firstType, 
secondType);
+                       default:
+                               return true;
+               }
+       }
+
+       private boolean areRawTypesComparable(LogicalType firstType, 
LogicalType secondType) {
+               return firstType.equals(secondType) &&
+                       Comparable.class.isAssignableFrom(((RawType<?>) 
firstType).getOriginatingClass());
        }
 
-       private boolean areDistinctTypesComparable(LogicalType 
firstLogicalType, LogicalType secondLogicalType) {
-               DistinctType firstDistinctType = (DistinctType) 
firstLogicalType;
-               DistinctType secondDistinctType = (DistinctType) 
secondLogicalType;
-               return 
firstLogicalType.copy(true).equals(secondLogicalType.copy(true)) &&
+       private boolean areDistinctTypesComparable(LogicalType firstType, 
LogicalType secondType) {
+               DistinctType firstDistinctType = (DistinctType) firstType;
+               DistinctType secondDistinctType = (DistinctType) secondType;
+               return firstType.equals(secondType) &&
                        areComparable(firstDistinctType.getSourceType(), 
secondDistinctType.getSourceType());

Review comment:
       @dawidwys One last minute comment: We need to remove the nullability for 
all calls of `areComparable` here and for other types. Maybe it is better to 
introduce an intermediate function call.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to