danny0405 commented on a change in pull request #11950:
URL: https://github.com/apache/flink/pull/11950#discussion_r421214451
##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
##########
@@ -279,8 +278,61 @@ private Operation convertAlterTable(SqlAlterTable sqlAlterTable) {
 				throw new ValidationException(String.format("Table %s doesn't exist or is a temporary table.",
 					tableIdentifier.toString()));
 			}
+		} else if (sqlAlterTable instanceof SqlAlterTableAddConstraint) {
+			Optional<CatalogManager.TableLookupResult> optionalCatalogTable =
+				catalogManager.getTable(tableIdentifier);
+			if (optionalCatalogTable.isPresent() && !optionalCatalogTable.get().isTemporary()) {
+				SqlTableConstraint constraint = ((SqlAlterTableAddConstraint) sqlAlterTable)
+					.getConstraint();
+				validateTableConstraint(constraint);
+				CatalogTable oriCatalogTable = (CatalogTable) optionalCatalogTable.get().getTable();
+				TableSchema.Builder builder = TableSchemaUtils
+					.builderWithGivenSchema(oriCatalogTable.getSchema());
+				if (constraint.getConstraintName() != null) {
+					builder.primaryKey(constraint.getConstraintName(), constraint.getColumnNames());
Review comment:
The builder will validate the nullability in the end, so no extra check is needed here.
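
To illustrate the point, a minimal standalone sketch of what "validate at build time" means here (this is not the actual TableSchema.Builder API; all names and signatures below are made up):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch: the builder accepts declarations as-is and only checks
// primary-key nullability when build() runs, so a caller like the
// converter above needs no nullability check of its own.
final class SchemaBuilderSketch {
    private final Map<String, Boolean> columns = new LinkedHashMap<>(); // name -> nullable
    private List<String> primaryKey = new ArrayList<>();

    SchemaBuilderSketch field(String name, boolean nullable) {
        columns.put(name, nullable);
        return this;
    }

    SchemaBuilderSketch primaryKey(String... columnNames) {
        this.primaryKey = Arrays.asList(columnNames);
        return this;
    }

    // The nullability validation happens here, at the very end of building.
    void build() {
        for (String pk : primaryKey) {
            Boolean nullable = columns.get(pk);
            if (nullable == null) {
                throw new IllegalStateException("Primary key column '" + pk + "' is not defined.");
            }
            if (nullable) {
                throw new IllegalStateException("Primary key column '" + pk + "' must not be nullable.");
            }
        }
    }
}
```

With this shape, `new SchemaBuilderSketch().field("id", true).primaryKey("id").build()` fails only when build() is called, which is the behavior the comment above refers to.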
##########
File path: flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java
##########
@@ -164,23 +168,35 @@ public void validate() throws SqlValidateException {
 			validator.addColumn(column);
 		}
-		for (SqlNode primaryKeyNode : this.primaryKeyList) {
-			String primaryKey = ((SqlIdentifier) primaryKeyNode).getSimple();
-			if (!validator.contains(primaryKey)) {
-				throw new SqlValidateException(
-					primaryKeyNode.getParserPosition(),
-					"Primary key [" + primaryKey + "] not defined in columns, at " +
-						primaryKeyNode.getParserPosition());
+		// Validate table constraints.
+		boolean pkDefined = false;
+		Set<String> constraintNames = new HashSet<>();
+		for (SqlTableConstraint constraint : getFullConstraints()) {
+			String constraintName = constraint.getConstraintName();
+			// Validate constraint name should be unique.
+			if (constraintName != null && !constraintNames.add(constraintName)) {
+				throw new SqlValidateException(constraint.getParserPosition(),
+					String.format("Duplicate definition for constraint [%s]", constraintName));
 			}
-		}
-
-		for (SqlNodeList uniqueKeys: this.uniqueKeysList) {
-			for (SqlNode uniqueKeyNode : uniqueKeys) {
-				String uniqueKey = ((SqlIdentifier) uniqueKeyNode).getSimple();
-				if (!validator.contains(uniqueKey)) {
-					throw new SqlValidateException(
-						uniqueKeyNode.getParserPosition(),
-						"Unique key [" + uniqueKey + "] not defined in columns, at " + uniqueKeyNode.getParserPosition());
+			// Validate primary key definition should be unique.
+			if (constraint.isPrimaryKey()) {
+				if (pkDefined) {
+					throw new SqlValidateException(constraint.getParserPosition(),
+						"Duplicate primary key definition");
+				} else {
+					pkDefined = true;
+				}
+			}
+			// Validate the key field exists.
+			if (constraint.isTableConstraint()) {
+				for (SqlNode column : constraint.getColumns()) {
Review comment:
A primary key on a computed column is not supported by the parser; I would add a test for that.
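
A sketch of the kind of test meant here; the DDL and the expected failure are the point, not the harness. The parseAndValidate helper below is hypothetical and stands in for whatever parser entry point the test suite already uses:

```java
import org.junit.Test;

import static org.junit.Assert.fail;

public class ComputedColumnPrimaryKeyTest {

    @Test
    public void testPrimaryKeyOnComputedColumnIsRejected() {
        // 'b' is a computed column; declaring it as the primary key should fail.
        String ddl =
            "CREATE TABLE t1 (\n"
                + "  a BIGINT,\n"
                + "  b AS a + 1,\n"
                + "  PRIMARY KEY (b) NOT ENFORCED\n"
                + ") WITH ('connector' = 'COLLECTION')";
        try {
            parseAndValidate(ddl);
            fail("Expected a primary key on a computed column to be rejected");
        } catch (Exception expected) {
            // expected, e.g. a SqlValidateException: primary key may only
            // reference physical columns
        }
    }

    private void parseAndValidate(String ddl) throws Exception {
        // Placeholder: wire this to the actual parse + validate() call of the suite.
        throw new UnsupportedOperationException("not wired in this sketch");
    }
}
```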
##########
File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala
##########
@@ -919,6 +919,20 @@ class CatalogTableITCase(isStreamingMode: Boolean) extends AbstractTestBase {
       .getTable(new ObjectPath(tableEnv.getCurrentDatabase, "t2"))
       .getProperties
     assertEquals(expectedProperties, properties)
+    val currentCatalog = tableEnv.getCurrentCatalog
+    val currentDB = tableEnv.getCurrentDatabase
+    tableEnv.sqlUpdate("alter table t2 add constraint ct1 primary key(a) not enforced")
Review comment:
It did fail. sqlUpdate is not deprecated and all the remaining tests use it; switching to a single executeSql here seems inconsistent.
##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
##########
@@ -612,10 +668,14 @@ private TableSchema createTableSchema(SqlCreateTable sqlCreateTable) {
 			if (node instanceof SqlTableColumn) {
 				SqlTableColumn column = (SqlTableColumn) node;
 				RelDataType relType = column.getType()
-					.deriveType(validator, column.getType().getNullable());
+					.deriveType(validator);
+				RelDataType colType = validator.getTypeFactory()
+					.createTypeWithNullability(
+						relType,
+						sqlCreateTable.isColumnNullable(column));
Review comment:
I have some different thoughts: we should make the schema builder strict and simple about nullability. That means the builder should not modify the nullability if a definition is already there; the only thing it should do is validate that what is being built is valid.
Different cases have varying requirements for nullability and other constraints, so it is more reasonable to let the builder's invoker keep the nullability correct.
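
A rough sketch of this split of responsibility (illustrative names only, not the actual Flink API): the invoker resolves nullability before building, and the builder only validates, never rewrites:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class NullabilityOwnershipSketch {

    // Invoker side: decide nullability up front, e.g. the CREATE TABLE path
    // forces primary key columns to NOT NULL before handing the schema over.
    static Map<String, Boolean> resolveNullability(
            Map<String, Boolean> declared, List<String> primaryKey) {
        Map<String, Boolean> resolved = new LinkedHashMap<>(declared);
        for (String pk : primaryKey) {
            resolved.put(pk, false); // the invoker, not the builder, fixes this up
        }
        return resolved;
    }

    // Builder side: strict and simple; it never changes a column's
    // nullability, it only rejects an invalid combination.
    static void validate(Map<String, Boolean> columns, List<String> primaryKey) {
        for (String pk : primaryKey) {
            Boolean nullable = columns.get(pk);
            if (nullable == null || nullable) {
                throw new IllegalStateException(
                    "Primary key column '" + pk + "' must exist and be NOT NULL; "
                        + "the builder does not rewrite nullability.");
            }
        }
    }
}
```

Under this shape, each call site (CREATE TABLE, ALTER TABLE ADD CONSTRAINT, ...) applies its own nullability policy, and a nullable primary key column reaching validate() is a caller bug that fails fast.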
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]