KurtYoung commented on a change in pull request #9952:
[FLINK-14321][sql-parser] Support to parse watermark statement in SQL DDL
URL: https://github.com/apache/flink/pull/9952#discussion_r338435922
##########
File path:
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java
##########
@@ -310,9 +322,67 @@ private void printIndent(SqlWriter writer) {
public List<SqlNode> columnList = new ArrayList<>();
public SqlNodeList primaryKeyList = SqlNodeList.EMPTY;
public List<SqlNodeList> uniqueKeysList = new ArrayList<>();
+ @Nullable public SqlWatermark watermark;
}
public String[] fullTableName() {
return tableName.names.toArray(new String[0]);
}
+
+ //
-------------------------------------------------------------------------------------
+
+ private static final class ColumnValidator {
+
+ private final Set<String> allColumnNames = new HashSet<>();
+
+ /**
+ * Adds column name to the registered column set. This will add
nested column names recursive.
+ * Nested column names are qualified using "." separator.
+ */
+ public void addColumn(SqlNode column) throws
SqlValidateException {
+ String columnName;
+ if (column instanceof SqlTableColumn) {
+ SqlTableColumn tableColumn = (SqlTableColumn)
column;
+ columnName = tableColumn.getName().getSimple();
+ addNestedColumn(columnName,
tableColumn.getType());
+ } else if (column instanceof SqlBasicCall) {
+ SqlBasicCall tableColumn = (SqlBasicCall)
column;
+ columnName =
tableColumn.getOperands()[1].toString();
+ } else {
+ throw new
UnsupportedOperationException("Unsupported column:" + column);
+ }
+
+ addColumnName(columnName, column.getParserPosition());
+ }
+
+ /**
+ * Returns true if the column name is existed in the registered
column set.
+ * This supports qualified column name using "." separator.
+ */
+ public boolean contains(String columnName) {
+ return allColumnNames.contains(columnName);
+ }
+
+ private void addNestedColumn(String columnName, SqlDataTypeSpec
columnType) throws SqlValidateException {
+ SqlTypeNameSpec typeName = columnType.getTypeNameSpec();
+ // validate composite type
+ if (typeName instanceof ExtendedSqlRowTypeNameSpec) {
Review comment:
I think this is a design choice. The motivation is we want to make sure the
event time column exists when defining watermark. So we lean to only support
the types which we can get enough information to do the static check. `ROW` and
other basic types are allowed to do static check but not for collection types
such as `ARRAY` and `MAP`, since you never know whether if `array[1]` or
`map['key1']` really exist or not.
It's just a safe choice, nothing else.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services