KurtYoung commented on a change in pull request #9952: [FLINK-14321][sql-parser] Support to parse watermark statement in SQL DDL
URL: https://github.com/apache/flink/pull/9952#discussion_r338435922
 
 

 ##########
 File path: flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java
 ##########
 @@ -310,9 +322,67 @@ private void printIndent(SqlWriter writer) {
                public List<SqlNode> columnList = new ArrayList<>();
                public SqlNodeList primaryKeyList = SqlNodeList.EMPTY;
                public List<SqlNodeList> uniqueKeysList = new ArrayList<>();
+               @Nullable public SqlWatermark watermark;
        }
 
        public String[] fullTableName() {
                return tableName.names.toArray(new String[0]);
        }
+
+       // -------------------------------------------------------------------------------------
+
+       private static final class ColumnValidator {
+
+               private final Set<String> allColumnNames = new HashSet<>();
+
+               /**
+                * Adds column name to the registered column set. This will add nested column names recursive.
+                * Nested column names are qualified using "." separator.
+                */
+               public void addColumn(SqlNode column) throws SqlValidateException {
+                       String columnName;
+                       if (column instanceof SqlTableColumn) {
+                               SqlTableColumn tableColumn = (SqlTableColumn) column;
+                               columnName = tableColumn.getName().getSimple();
+                               addNestedColumn(columnName, tableColumn.getType());
+                       } else if (column instanceof SqlBasicCall) {
+                               SqlBasicCall tableColumn = (SqlBasicCall) column;
+                               columnName = tableColumn.getOperands()[1].toString();
+                       } else {
+                               throw new UnsupportedOperationException("Unsupported column:" + column);
+                       }
+
+                       addColumnName(columnName, column.getParserPosition());
+               }
+
+               /**
+                * Returns true if the column name is existed in the registered column set.
+                * This supports qualified column name using "." separator.
+                */
+               public boolean contains(String columnName) {
+                       return allColumnNames.contains(columnName);
+               }
+
+               private void addNestedColumn(String columnName, SqlDataTypeSpec columnType) throws SqlValidateException {
+                       SqlTypeNameSpec typeName = columnType.getTypeNameSpec();
+                       // validate composite type
+                       if (typeName instanceof ExtendedSqlRowTypeNameSpec) {
 
 Review comment:
   I think this is a design choice. The motivation is that we want to make sure 
the event-time column exists when a watermark is defined, so we lean toward 
supporting only the types that give us enough information to do a static check. 
`ROW` and the basic types can be checked statically, but collection types such 
as `ARRAY` and `MAP` cannot, since you never know whether `array[1]` or 
`map['key1']` really exists. 
   
   It's just a safe choice, nothing else.  
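
   To illustrate the distinction, here is a minimal, self-contained sketch. It is not the code in this PR: `ColumnRegistrySketch`, `BasicType`, `CollectionType` and `RowType` are hypothetical stand-ins for the parser's real type nodes, and the column names are made up. It only shows why `ROW` fields can be registered under "."-qualified names at parse time, while nothing below an `ARRAY` or `MAP` column can be.

```java
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch; none of these classes exist in Flink or Calcite.
class ColumnRegistrySketch {

    /** Stand-in for a declared column type. */
    interface Type {}

    /** Basic type such as INT or TIMESTAMP(3): nothing nested to register. */
    static final class BasicType implements Type {}

    /** ARRAY or MAP: elements and keys are only known at runtime. */
    static final class CollectionType implements Type {}

    /** ROW: the field names are part of the declaration itself. */
    static final class RowType implements Type {
        final Map<String, Type> fields = new LinkedHashMap<>();

        RowType field(String name, Type type) {
            fields.put(name, type);
            return this;
        }
    }

    private final Set<String> allColumnNames = new HashSet<>();

    /** Registers the column and, for ROW types, every nested field recursively. */
    void addColumn(String name, Type type) {
        allColumnNames.add(name);
        if (type instanceof RowType) {
            for (Map.Entry<String, Type> f : ((RowType) type).fields.entrySet()) {
                addColumn(name + "." + f.getKey(), f.getValue());
            }
        }
        // For CollectionType only the column itself is known statically,
        // so no element or key reference is registered.
    }

    /** Static check: is this (possibly qualified) name known from the DDL alone? */
    boolean contains(String columnName) {
        return allColumnNames.contains(columnName);
    }

    public static void main(String[] args) {
        ColumnRegistrySketch registry = new ColumnRegistrySketch();
        registry.addColumn("f0", new RowType().field("ts", new BasicType()));
        registry.addColumn("tags", new CollectionType());

        System.out.println(registry.contains("f0.ts"));        // nested ROW field
        System.out.println(registry.contains("tags['key1']")); // MAP element
    }
}
```

   In this sketch a watermark on `f0.ts` passes the check because the field is declared in the DDL, whereas a reference such as `tags['key1']` can never be confirmed before the data arrives.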

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services
