danny0405 commented on a change in pull request #11950:
URL: https://github.com/apache/flink/pull/11950#discussion_r421214451



##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
##########
@@ -279,8 +278,61 @@ private Operation convertAlterTable(SqlAlterTable 
sqlAlterTable) {
                                throw new 
ValidationException(String.format("Table %s doesn't exist or is a temporary 
table.",
                                        tableIdentifier.toString()));
                        }
+               } else if (sqlAlterTable instanceof SqlAlterTableAddConstraint) 
{
+                       Optional<CatalogManager.TableLookupResult> 
optionalCatalogTable =
+                                       
catalogManager.getTable(tableIdentifier);
+                       if (optionalCatalogTable.isPresent() && 
!optionalCatalogTable.get().isTemporary()) {
+                               SqlTableConstraint constraint = 
((SqlAlterTableAddConstraint) sqlAlterTable)
+                                               .getConstraint();
+                               validateTableConstraint(constraint);
+                               CatalogTable oriCatalogTable = (CatalogTable) 
optionalCatalogTable.get().getTable();
+                               TableSchema.Builder builder = TableSchemaUtils
+                                               
.builderWithGivenSchema(oriCatalogTable.getSchema());
+                               if (constraint.getConstraintName() != null) {
+                                       
builder.primaryKey(constraint.getConstraintName(), constraint.getColumnNames());

Review comment:
       The builder will finally validate the nullability.

##########
File path: 
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java
##########
@@ -164,23 +168,35 @@ public void validate() throws SqlValidateException {
                        validator.addColumn(column);
                }
 
-               for (SqlNode primaryKeyNode : this.primaryKeyList) {
-                       String primaryKey = ((SqlIdentifier) 
primaryKeyNode).getSimple();
-                       if (!validator.contains(primaryKey)) {
-                               throw new SqlValidateException(
-                                       primaryKeyNode.getParserPosition(),
-                                       "Primary key [" + primaryKey + "] not 
defined in columns, at " +
-                                               
primaryKeyNode.getParserPosition());
+               // Validate table constraints.
+               boolean pkDefined = false;
+               Set<String> constraintNames = new HashSet<>();
+               for (SqlTableConstraint constraint : getFullConstraints()) {
+                       String constraintName = constraint.getConstraintName();
+                       // Validate constraint name should be unique.
+                       if (constraintName != null && 
!constraintNames.add(constraintName)) {
+                               throw new 
SqlValidateException(constraint.getParserPosition(),
+                                               String.format("Duplicate 
definition for constraint [%s]", constraintName));
                        }
-               }
-
-               for (SqlNodeList uniqueKeys: this.uniqueKeysList) {
-                       for (SqlNode uniqueKeyNode : uniqueKeys) {
-                               String uniqueKey = ((SqlIdentifier) 
uniqueKeyNode).getSimple();
-                               if (!validator.contains(uniqueKey)) {
-                                       throw new SqlValidateException(
-                                                       
uniqueKeyNode.getParserPosition(),
-                                                       "Unique key [" + 
uniqueKey + "] not defined in columns, at " + 
uniqueKeyNode.getParserPosition());
+                       // Validate primary key definition should be unique.
+                       if (constraint.isPrimaryKey()) {
+                               if (pkDefined) {
+                                       throw new 
SqlValidateException(constraint.getParserPosition(),
+                                                       "Duplicate primary key 
definition");
+                               } else {
+                                       pkDefined = true;
+                               }
+                       }
+                       // Validate the key field exists.
+                       if (constraint.isTableConstraint()) {
+                               for (SqlNode column : constraint.getColumns()) {

Review comment:
       primary key on computed column is not supported from the parser, i would 
add a test.

##########
File path: 
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala
##########
@@ -919,6 +919,20 @@ class CatalogTableITCase(isStreamingMode: Boolean) extends 
AbstractTestBase {
       .getTable(new ObjectPath(tableEnv.getCurrentDatabase, "t2"))
       .getProperties
     assertEquals(expectedProperties, properties)
+    val currentCatalog = tableEnv.getCurrentCatalog
+    val currentDB = tableEnv.getCurrentDatabase
+    tableEnv.sqlUpdate("alter table t2 add constraint ct1 primary key(a) not 
enforced")

Review comment:
       It did fail. sqlUpdate is not deprecated, all the left tests use that, 
use a single executeSql seems weird.

##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
##########
@@ -612,10 +668,14 @@ private TableSchema createTableSchema(SqlCreateTable 
sqlCreateTable) {
                        if (node instanceof SqlTableColumn) {
                                SqlTableColumn column = (SqlTableColumn) node;
                                RelDataType relType = column.getType()
-                                       .deriveType(validator, 
column.getType().getNullable());
+                                       .deriveType(validator);
+                               RelDataType colType = validator.getTypeFactory()
+                                               .createTypeWithNullability(
+                                                               relType,
+                                                               
sqlCreateTable.isColumnNullable(column));

Review comment:
       I have some different thoughts, we should make the schema builder strict 
and simple for nullability, that means we should not modify the nullability if 
there is already a definition there, the only thing builder should do is 
validation if the building is valid.
   
   Different cases have varadic requests for nullability or other constraints, 
so let the builder invoker to keep the nullability correct is more reasonable.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to