KurtYoung commented on a change in pull request #10096: 
[FLINK-14623][table-api] Add computed column information into TableSchema
URL: https://github.com/apache/flink/pull/10096#discussion_r343079229
 
 

 ##########
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/TableSchema.java
 ##########
 @@ -346,22 +322,137 @@ public static Builder builder() {
                return new Builder();
        }
 
+       //~ Tools 
------------------------------------------------------------------
+
+       /**
+        * Utility method that transforms arrays of field names and types
+        * into a {@link TableColumn} list.
+        */
+       private static List<TableColumn> createTableColumns(
+                       String[] fieldNames,
+                       TypeInformation<?>[] fieldTypes) {
+                       DataType[] fieldDataTypes = 
fromLegacyInfoToDataType(fieldTypes);
+                       validateFields(fieldNames, fieldDataTypes);
+                       List<TableColumn> columns = new ArrayList<>();
+               for (int i = 0; i < fieldNames.length; i++) {
+                       columns.add(TableColumn.of(fieldNames[i], 
fieldDataTypes[i]));
+               }
+               return columns;
+       }
+
+       /** Fields sanity check. */
+       private static void validateFields(String[] fieldNames, DataType[] 
fieldTypes) {
+               if (fieldNames.length != fieldTypes.length) {
+                       throw new ValidationException(
+                               "Number of field names and field data types 
must be equal.\n" +
+                                       "Number of names is " + 
fieldNames.length +
+                                       ", number of data types is " + 
fieldTypes.length + ".\n" +
+                                       "List of field names: " + 
Arrays.toString(fieldNames) + "\n" +
+                                       "List of field data types: " + 
Arrays.toString(fieldTypes));
+               }
+               // validate that the field names are unique
+               final Set<String> duplicateNames = new HashSet<>();
+               final Set<String> uniqueNames = new HashSet<>();
+               for (final String fieldName : fieldNames) {
+                       // check uniqueness of field names
+                       if (uniqueNames.contains(fieldName)) {
+                               duplicateNames.add(fieldName);
+                       } else {
+                               uniqueNames.add(fieldName);
+                       }
+               }
+               if (!duplicateNames.isEmpty()) {
+                       throw new ValidationException(
+                               "Field names must be unique.\n" +
+                                       "List of duplicate fields: " + 
duplicateNames.toString() + "\n" +
+                                       "List of all fields: " + 
Arrays.toString(fieldNames));
+               }
+       }
+
+       /** Watermark specification sanity check. */
+       private static void validateWatermarkSpecs(List<TableColumn> columns,
+                       List<WatermarkSpec> watermarkSpecs) {
+               // Validate and create name to type mapping.
+               // Field name to data type mapping, we need this because the 
row time attribute
+               // field can be nested.
+               final Map<String, DataType> fieldNameToType = new HashMap<>();
+               for (TableColumn column : columns) {
+                       validateAndCreateNameToTypeMapping(fieldNameToType,
+                               column.getName(),
+                               column.getType(),
+                               "");
+               }
+
+               // Validate watermark and rowtime attribute.
+               for (WatermarkSpec watermark : watermarkSpecs) {
+                       String rowtimeAttribute = 
watermark.getRowtimeAttribute();
+                       DataType rowtimeType = 
Optional.ofNullable(fieldNameToType.get(rowtimeAttribute))
+                               .orElseThrow(() -> new 
ValidationException(String.format(
+                                       "Rowtime attribute '%s' is not defined 
in schema.", rowtimeAttribute)));
+                       if (rowtimeType.getLogicalType().getTypeRoot() != 
TIMESTAMP_WITHOUT_TIME_ZONE) {
+                               throw new ValidationException(String.format(
+                                       "Rowtime attribute '%s' must be of type 
TIMESTAMP but is of type '%s'.",
+                                       rowtimeAttribute, rowtimeType));
+                       }
+                       LogicalType watermarkOutputType = 
watermark.getWatermarkExprOutputType().getLogicalType();
+                       if (watermarkOutputType.getTypeRoot() != 
TIMESTAMP_WITHOUT_TIME_ZONE) {
+                               throw new ValidationException(String.format(
+                                       "Watermark strategy '%s' must be of 
type TIMESTAMP but is of type '%s'.",
+                                       
watermark.getWatermarkExpressionString(),
+                                       
watermarkOutputType.asSerializableString()));
+                       }
+               }
+       }
+
+       /**
+        * Creates a mapping from field name to data type, the field name can 
be a nested field.
+        * This is mainly used for validating whether the rowtime attribute 
(might be nested) exists
+        * in the schema. While creating the mapping, it also validates whether there are 
duplicate field names.
+        *
+        * <p>For example, a "f0" field of ROW type has two nested fields "q1" 
and "q2". Then the
+        * mapping will be ["f0" -> ROW, "f0.q1" -> INT, "f0.q2" -> STRING].
+        *
+        * <pre>
+        * {@code
+        *     f0 ROW<q1 INT, q2 STRING>
+        * }
+        * </pre>
+        *
+        * @param fieldNameToType Field name to type mapping to update
+        * @param fieldName       Name of this field, e.g. "q1" or "q2" in the 
above example
+        * @param fieldType       Data type of this field
+        * @param parentFieldName Field name of parent type, e.g. "f0" in the 
above example
+        */
+       private static void validateAndCreateNameToTypeMapping(
+               Map<String, DataType> fieldNameToType,
 
 Review comment:
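   Format style: the indentation of this parameter list looks inconsistent with the other methods in this change (e.g. `createTableColumns` indents its parameters two levels deeper than the declaration, while this one uses a single level).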

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to