KurtYoung commented on a change in pull request #10098: [FLINK-14326][FLINK-14324][table] Support to apply watermark assigner in planner
URL: https://github.com/apache/flink/pull/10098#discussion_r343430509
 
 

 ##########
 File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
 ##########
 @@ -205,29 +205,43 @@ private Operation convertSqlQuery(SqlNode node) {
         * @return TableSchema
         */
        private TableSchema createTableSchema(SqlCreateTable sqlCreateTable) {
-               // setup table columns
-               SqlNodeList columnList = sqlCreateTable.getColumnList();
-               TableSchema physicalSchema = null;
-               TableSchema.Builder builder = new TableSchema.Builder();
-               // collect the physical table schema first.
-               final List<SqlNode> physicalColumns = columnList.getList().stream()
-                       .filter(n -> n instanceof SqlTableColumn).collect(Collectors.toList());
-               for (SqlNode node : physicalColumns) {
-                       SqlTableColumn column = (SqlTableColumn) node;
-                       final RelDataType relType = column.getType()
-                               .deriveType(
-                                       flinkPlanner.getOrCreateSqlValidator(),
-                                       column.getType().getNullable());
-                       builder.field(column.getName().getSimple(),
-                               LogicalTypeDataTypeConverter.fromLogicalTypeToDataType(
-                                       FlinkTypeFactory.toLogicalType(relType)));
-                       physicalSchema = builder.build();
-               }
-               assert physicalSchema != null;
                if (sqlCreateTable.containsComputedColumn()) {
                        throw new SqlConversionException("Computed columns for DDL is not supported yet!");
                }
-               return physicalSchema;
+               TableSchema.Builder builder = new TableSchema.Builder();
+               SqlValidator validator = flinkPlanner.getOrCreateSqlValidator();
+               // setup table columns
+               SqlNodeList columnList = sqlCreateTable.getColumnList();
+               Map<String, RelDataType> nameToTypeMap = new HashMap<>();
+               for (SqlNode node : columnList.getList()) {
+                       if (node instanceof SqlTableColumn) {
+                               SqlTableColumn column = (SqlTableColumn) node;
+                               RelDataType relType = column.getType()
+                                       .deriveType(validator, column.getType().getNullable());
+                               String name = column.getName().getSimple();
+                               nameToTypeMap.put(name, relType);
+                               DataType dataType = LogicalTypeDataTypeConverter.fromLogicalTypeToDataType(
+                                       FlinkTypeFactory.toLogicalType(relType));
+                               builder.field(name, dataType);
+                       } else if (node instanceof SqlBasicCall) {
+                               // TODO: computed column ...
+                       }
+               }
+
+               // put watermark information into TableSchema
+               sqlCreateTable.getWatermark().ifPresent(watermark -> {
+                       String rowtimeAttribute = watermark.getEventTimeColumnName().toString();
+                       SqlNode expression = watermark.getWatermarkStrategy();
+                       // this will validate and expand function identifiers.
+                       SqlNode validated = validator.validateParameterizedExpression(expression, nameToTypeMap);
+                       RelDataType validatedType = validator.getValidatedNodeType(validated);
+                       DataType exprDataType = LogicalTypeDataTypeConverter.fromLogicalTypeToDataType(
 
 Review comment:
   ditto
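
  For context on what the converter in the diff above is assembling: the rowtime attribute, the validated watermark expression, and its resolved data type are intended to end up in the resulting TableSchema. Below is a minimal usage sketch of such a schema built through the public API, assuming the TableSchema.Builder#watermark(rowtimeAttribute, expressionString, outputType) method that accompanies this line of work; the column names and the interval expression are made up purely for illustration.

    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.api.TableSchema;

    public class WatermarkSchemaSketch {
        public static void main(String[] args) {
            // Build a schema with a rowtime column and a watermark strategy,
            // roughly mirroring what SqlToOperationConverter derives from a
            // CREATE TABLE statement with a WATERMARK FOR clause.
            // (Hypothetical column names; watermark(...) is assumed to be the
            // builder method introduced alongside these changes.)
            TableSchema schema = TableSchema.builder()
                    .field("user_id", DataTypes.BIGINT())
                    .field("ts", DataTypes.TIMESTAMP(3))
                    // rowtime attribute, watermark expression as a SQL string,
                    // and the expression's resolved output type
                    .watermark("ts", "ts - INTERVAL '5' SECOND", DataTypes.TIMESTAMP(3))
                    .build();
            System.out.println(schema);
        }
    }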

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services
