wuchong commented on a change in pull request #10500: 
[FLINK-15161][table-common] Introduce TypeTransformation interface and basic 
transformations
URL: https://github.com/apache/flink/pull/10500#discussion_r356960766
 
 

 ##########
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/DataTypeUtils.java
 ##########
 @@ -36,7 +52,101 @@ public static DataType replaceLogicalType(DataType 
dataType, LogicalType replace
                        .bridgedTo(dataType.getConversionClass());
        }
 
+       /**
+        * Transforms the given data type (can be nested) to a different data 
type using the given
+        * transformations. The given transformations will be called in order.
+        *
+        * @param typeToTransform data type to be transformed.
+        * @param transformations the transformations to transform data type to 
another type.
+        * @return the new data type,
+        */
+       public static DataType transform(DataType typeToTransform, 
TypeTransformation... transformations) {
+               Preconditions.checkArgument(transformations.length > 0, 
"transformations should not be empty.");
+               DataType newType = typeToTransform;
+               for (TypeTransformation transformation : transformations) {
+                       newType = newType.accept(new 
DataTypeTransformer(transformation));
+               }
+               return newType;
+       }
+
        private DataTypeUtils() {
                // no instantiation
        }
+
+
+       // 
------------------------------------------------------------------------------------------
+
+       private static class DataTypeTransformer implements 
DataTypeVisitor<DataType> {
+
+               private final TypeTransformation transformation;
+
+               private DataTypeTransformer(TypeTransformation transformation) {
+                       this.transformation = transformation;
+               }
+
+               @Override
+               public DataType visit(AtomicDataType atomicDataType) {
+                       return transformation.transform(atomicDataType);
+               }
+
+               @Override
+               public DataType visit(CollectionDataType collectionDataType) {
+                       DataType newElementType = 
collectionDataType.getElementDataType().accept(this);
+                       LogicalType logicalType = 
collectionDataType.getLogicalType();
+                       LogicalType newLogicalType;
+                       if (logicalType instanceof ArrayType) {
+                               newLogicalType = new ArrayType(
 
 Review comment:
   We have to create a new logical type, because users may transform the 
original type to another logical type, e.g. `DataTypes.INT()` -> 
`DataTypes.TIMESTAMP()`. From the interface of `TypeTransformation`, we didn't 
stop users to do this. 
   
   For collection types, 
    `CollectionDataType(ArrayType(elementLogicalType), elementDataType)`, if 
the `elementDataType` is changed, we have to update the corresponding 
`elementLogicalType` to reflect the truth.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to