yuchuanchen commented on code in PR #21563:
URL: https://github.com/apache/flink/pull/21563#discussion_r1066500768


##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/DataTypeUtils.java:
##########
@@ -94,6 +99,217 @@ public static DataType projectRow(DataType dataType, int[] 
indexPaths) {
         return Projection.of(indexPaths).project(dataType);
     }
 
+    public static List<RowType.RowField> buildRowFields(RowType allType, 
int[][] projectedFields) {
+        final List<RowType.RowField> updatedFields = new ArrayList<>();
+        for (int[] projectedField : projectedFields) {
+            updatedFields.add(
+                    buildRowFieldInProjectFields(
+                            allType.getFields().get(projectedField[0]), 
projectedField, 0));
+        }
+        return updatedFields;
+    }
+
+    public static RowType buildRow(RowType allType, int[][] projectedFields) {
+        return new RowType(
+                allType.isNullable(), mergeRowFields(buildRowFields(allType, 
projectedFields)));
+    }
+
+    public static RowType buildRow(RowType allType, List<RowType.RowField> 
updatedFields) {
+        return new RowType(allType.isNullable(), 
mergeRowFields(updatedFields));
+    }
+
+    public static RowType.RowField buildRowFieldInProjectFields(
+            RowType.RowField rowField, int[] fields, int index) {
+        if (fields.length == 1 || index == fields.length - 1) {
+            LogicalType rowType = rowField.getType();
+            if (rowType.is(ROW)) {
+                rowType = new RowType(rowType.isNullable(), ((RowType) 
rowType).getFields());
+            }
+            return new RowField(rowField.getName(), rowType);
+        }
+        Preconditions.checkArgument(rowField.getType().is(ROW), "Row data type 
expected.");
+        final List<RowType.RowField> updatedFields = new ArrayList<>();
+        RowType rowtype = ((RowType) rowField.getType());
+        updatedFields.add(
+                buildRowFieldInProjectFields(
+                        rowtype.getFields().get(fields[index + 1]), fields, 
index + 1));
+        return new RowType.RowField(
+                rowField.getName(), new RowType(rowtype.isNullable(), 
updatedFields));
+    }
+
+    public static List<RowType.RowField> mergeRowFields(List<RowType.RowField> 
updatedFields) {
+        List<RowField> updatedFieldsCopy =
+                
updatedFields.stream().map(RowField::copy).collect(Collectors.toList());
+        final List<String> fieldNames =
+                
updatedFieldsCopy.stream().map(RowField::getName).collect(Collectors.toList());
+        if (fieldNames.stream().anyMatch(StringUtils::isNullOrWhitespaceOnly)) 
{
+            throw new ValidationException(
+                    "Field names must contain at least one non-whitespace 
character.");
+        }
+        final Set<String> duplicates =
+                fieldNames.stream()
+                        .filter(n -> Collections.frequency(fieldNames, n) > 1)
+                        .collect(Collectors.toSet());
+        if (duplicates.isEmpty()) {
+            return updatedFieldsCopy;
+        }
+        List<RowType.RowField> duplicatesFields =
+                updatedFieldsCopy.stream()
+                        .filter(f -> duplicates.contains(f.getName()))
+                        .collect(Collectors.toList());
+        updatedFieldsCopy.removeAll(duplicatesFields);
+        Map<String, List<RowField>> duplicatesMap = new HashMap<>();
+        duplicatesFields.forEach(
+                f -> {
+                    List<RowField> tmp = 
duplicatesMap.getOrDefault(f.getName(), new ArrayList<>());
+                    tmp.add(f);
+                    duplicatesMap.put(f.getName(), tmp);
+                });
+        duplicatesMap.forEach(
+                (fieldName, duplicateList) -> {
+                    List<RowField> fieldsToMerge = new ArrayList<>();
+                    for (RowField rowField : duplicateList) {
+                        Preconditions.checkArgument(
+                                rowField.getType().is(ROW), "Row data type 
expected.");
+                        RowType rowType = (RowType) rowField.getType();
+                        fieldsToMerge.addAll(rowType.getFields());
+                    }
+                    RowField mergedField =
+                            new RowField(
+                                    fieldName,
+                                    new RowType(
+                                            
duplicateList.get(0).getType().isNullable(),
+                                            mergeRowFields(fieldsToMerge)));
+                    updatedFieldsCopy.add(mergedField);
+                });
+        return updatedFieldsCopy;
+    }
+
+    public static int[][] computeProjectedFields(int[] selectFields) {
+        int[][] projectedFields = new int[selectFields.length][1];
+        for (int i = 0; i < selectFields.length; i++) {
+            projectedFields[i][0] = selectFields[i];
+        }
+        return projectedFields;
+    }
+
+    public static int[][] computeProjectedFields(
+            String[] selectedFieldNames, String[] fullFieldNames) {
+        int[] selectFields = computeTopLevelFields(selectedFieldNames, 
fullFieldNames);
+        int[][] projectedFields = new int[selectFields.length][1];
+        for (int i = 0; i < selectFields.length; i++) {
+            projectedFields[i][0] = selectFields[i];
+        }
+        return projectedFields;
+    }
+
+    public static int[] computeTopLevelFields(
+            String[] selectedFieldNames, String[] fullFieldNames) {
+        int[] selectedFields = new int[selectedFieldNames.length];
+        for (int i = 0; i < selectedFields.length; i++) {
+            String name = selectedFieldNames[i];
+            int index = Arrays.asList(fullFieldNames).indexOf(name);
+            Preconditions.checkState(
+                    index >= 0,
+                    "Produced field name %s not found in table schema fields 
%s",
+                    name,
+                    Arrays.toString(fullFieldNames));
+            selectedFields[i] = index;
+        }
+        return selectedFields;
+    }
+
+    public static Map<String, Integer> getFieldNameToIndex(List<String> 
fieldNames) {
+        Map<String, Integer> fieldNameToIndex = new HashMap<>();
+        for (int i = 0; i < fieldNames.size(); i++) {
+            fieldNameToIndex.put(fieldNames.get(i), i);
+        }
+        return fieldNameToIndex;
+    }
+
+    public static boolean isVectorizationUnsupported(LogicalType t) {
+        switch (t.getTypeRoot()) {
+            case CHAR:
+            case VARCHAR:
+            case BOOLEAN:
+            case BINARY:
+            case VARBINARY:
+            case DECIMAL:
+            case TINYINT:
+            case SMALLINT:
+            case INTEGER:
+            case BIGINT:
+            case FLOAT:
+            case DOUBLE:
+            case DATE:
+            case TIME_WITHOUT_TIME_ZONE:
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                return false;
+            case ARRAY:
+                ArrayType arrayType = (ArrayType) t;
+                return 
!canCreateColumnVectorAsChildrenType(arrayType.getElementType(), arrayType);
+            case MAP:
+                MapType mapType = (MapType) t;
+                return 
!(canCreateColumnVectorAsChildrenType(mapType.getKeyType(), mapType)
+                        && 
canCreateColumnVectorAsChildrenType(mapType.getValueType(), mapType));
+            case ROW:
+                RowType rowType = (RowType) t;
+                for (RowType.RowField rowField : rowType.getFields()) {
+                    if 
(!canCreateColumnVectorAsChildrenType(rowField.getType(), rowType)) {
+                        return true;
+                    }
+                }
+                return false;
+            case TIMESTAMP_WITH_TIME_ZONE:
+            case INTERVAL_YEAR_MONTH:
+            case INTERVAL_DAY_TIME:
+            case MULTISET:
+            case DISTINCT_TYPE:
+            case STRUCTURED_TYPE:
+            case NULL:
+            case RAW:
+            case SYMBOL:
+            default:
+                return true;
+        }
+    }
+
+    public static boolean canCreateColumnVectorAsChildrenType(LogicalType t, 
LogicalType parent) {
+        switch (t.getTypeRoot()) {
+            case BOOLEAN:
+            case TINYINT:
+            case DOUBLE:
+            case FLOAT:
+            case INTEGER:
+            case DATE:
+            case TIME_WITHOUT_TIME_ZONE:
+            case BIGINT:
+            case SMALLINT:
+            case CHAR:
+            case VARCHAR:
+            case BINARY:
+            case VARBINARY:
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+            case DECIMAL:
+                return true;
+            case ROW:
+                RowType rowType = (RowType) t;
+                if (!parent.getTypeRoot().equals(ROW)) {

Review Comment:
   To exclude the situation such as Array<Row>



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to