lsyldliu commented on code in PR #19419:
URL: https://github.com/apache/flink/pull/19419#discussion_r858260447


##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java:
##########
@@ -117,6 +128,193 @@ public static Operation convertAddReplaceColumns(
                         catalogTable.getComment()));
     }
 
+    public static Operation convertAlterTableDropConstraint(
+            ObjectIdentifier tableIdentifier,
+            CatalogTable catalogTable,
+            SqlAlterTableDropConstraint alterTableDropConstraint) {
+        boolean isPrimaryKey = alterTableDropConstraint.isPrimaryKey();
+        Optional<Schema.UnresolvedPrimaryKey> oriPrimaryKey =
+                catalogTable.getUnresolvedSchema().getPrimaryKey();
+        // validate primary key is exists in table
+        if (!oriPrimaryKey.isPresent()) {
+            throw new ValidationException(
+                    String.format("Table %s does not exist primary key.", 
tableIdentifier));
+        }
+
+        String constraintName = null;
+        if (alterTableDropConstraint.getConstraintName().isPresent()) {
+            constraintName = 
alterTableDropConstraint.getConstraintName().get().getSimple();
+        }
+        if (!StringUtils.isNullOrWhitespaceOnly(constraintName)
+                && 
!oriPrimaryKey.get().getConstraintName().equals(constraintName)) {
+            throw new ValidationException(
+                    String.format(
+                            "CONSTRAINT [%s] does not exist in table %s",
+                            constraintName, tableIdentifier));
+        }
+
+        return new AlterTableDropConstraintOperation(tableIdentifier, 
isPrimaryKey, constraintName);
+    }
+
+    public static Operation convertDropWatermark(
+            ObjectIdentifier tableIdentifier, CatalogTable catalogTable) {
+        Schema originSchema = catalogTable.getUnresolvedSchema();
+        if (CollectionUtil.isNullOrEmpty(originSchema.getWatermarkSpecs())) {
+            throw new ValidationException(
+                    String.format("Table %s does not exist watermark.", 
tableIdentifier));
+        }
+
+        Schema.Builder builder = Schema.newBuilder();
+        // build column
+        builder.fromColumns(originSchema.getColumns());
+
+        // build primary key
+        Optional<Schema.UnresolvedPrimaryKey> originPrimaryKey = 
originSchema.getPrimaryKey();

Review Comment:
   I did not understand the meaning of this sentence. Can you give more context?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to