lsyldliu commented on code in PR #19419:
URL: https://github.com/apache/flink/pull/19419#discussion_r858245938
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java:
##########
@@ -117,6 +128,193 @@ public static Operation convertAddReplaceColumns(
catalogTable.getComment()));
}
+ /**
+ * Converts a {@code SqlAlterTableDropConstraint} node into an
+ * {@code AlterTableDropConstraintOperation} for the given table.
+ *
+ * <p>The only constraint checked here is the primary key of the table's unresolved
+ * schema: the table must have one, and a non-blank constraint name in the statement
+ * must equal the primary key's constraint name.
+ *
+ * @param tableIdentifier identifier of the table being altered; used in error messages
+ * and passed to the resulting operation
+ * @param catalogTable table whose unresolved schema supplies the primary key
+ * @param alterTableDropConstraint parsed statement supplying {@code isPrimaryKey()} and
+ * the optional constraint name
+ * @return the drop-constraint operation; its constraint name is {@code null} when the
+ * statement carried none
+ * @throws ValidationException if the table has no primary key, or if a non-blank
+ * constraint name does not match the primary key's constraint name
+ */ public static Operation convertAlterTableDropConstraint(
+ ObjectIdentifier tableIdentifier,
+ CatalogTable catalogTable,
+ SqlAlterTableDropConstraint alterTableDropConstraint) {
+ boolean isPrimaryKey = alterTableDropConstraint.isPrimaryKey();
+ Optional<Schema.UnresolvedPrimaryKey> oriPrimaryKey =
+ catalogTable.getUnresolvedSchema().getPrimaryKey();
+ // Validate that the table has a primary key; this check runs regardless of isPrimaryKey.
+ if (!oriPrimaryKey.isPresent()) {
+ throw new ValidationException(
+ String.format("Table %s does not exist primary key.",
tableIdentifier));
+ }
+ // The constraint name is optional in the statement; it stays null when absent.
+ String constraintName = null;
+ if (alterTableDropConstraint.getConstraintName().isPresent()) {
+ constraintName =
alterTableDropConstraint.getConstraintName().get().getSimple();
+ }
+ if (!StringUtils.isNullOrWhitespaceOnly(constraintName)
+ &&
!oriPrimaryKey.get().getConstraintName().equals(constraintName)) {
+ throw new ValidationException(
+ String.format(
+ "CONSTRAINT [%s] does not exist in table %s",
+ constraintName, tableIdentifier));
+ }
+ // constraintName may still be null here and is handed to the operation unchanged.
+ return new AlterTableDropConstraintOperation(tableIdentifier,
isPrimaryKey, constraintName);
+ }
+
+ /**
+ * Builds an {@code AlterTableSchemaOperation} whose new table has the same columns and
+ * primary key as {@code catalogTable} but no watermark.
+ *
+ * <p>The watermark is dropped implicitly: the new schema is rebuilt from the original
+ * columns and primary key only, and the original watermark specs are never copied over.
+ * Comment, partition keys and options are carried over from {@code catalogTable}.
+ *
+ * @param tableIdentifier identifier of the table being altered; used in the error message
+ * and passed to the resulting operation
+ * @param catalogTable table whose unresolved schema, comment, partition keys and options
+ * are read
+ * @return the schema-altering operation wrapping the rebuilt table
+ * @throws ValidationException if {@code CollectionUtil.isNullOrEmpty} reports that the
+ * original schema has no watermark specs
+ */ public static Operation convertDropWatermark(
+ ObjectIdentifier tableIdentifier, CatalogTable catalogTable) {
+ Schema originSchema = catalogTable.getUnresolvedSchema();
+ if (CollectionUtil.isNullOrEmpty(originSchema.getWatermarkSpecs())) {
+ throw new ValidationException(
+ String.format("Table %s does not exist watermark.",
tableIdentifier));
+ }
+ // Rebuild the schema from scratch; the watermark specs are intentionally not re-added.
+ Schema.Builder builder = Schema.newBuilder();
+ // Copy all original columns unchanged.
+ builder.fromColumns(originSchema.getColumns());
+
+ // Re-declare the original primary key, if any, under its original constraint name.
+ Optional<Schema.UnresolvedPrimaryKey> originPrimaryKey =
originSchema.getPrimaryKey();
+ if (originPrimaryKey.isPresent()) {
+ String constrainName = originPrimaryKey.get().getConstraintName();
+ List<String> primaryKeyNames =
originPrimaryKey.get().getColumnNames();
+ builder.primaryKeyNamed(constrainName, primaryKeyNames);
+ }
+ return new AlterTableSchemaOperation(
+ tableIdentifier,
+ CatalogTable.of(
+ builder.build(),
+ catalogTable.getComment(),
+ catalogTable.getPartitionKeys(),
+ catalogTable.getOptions()));
+ }
+
+ public static Operation convertDropColumns(
+ ObjectIdentifier tableIdentifier,
+ CatalogTable catalogTable,
+ ResolvedSchema originResolveSchema,
+ SqlAlterTableDropColumns sqlAlterTableDropColumns) {
+ Schema originSchema = catalogTable.getUnresolvedSchema();
+ List<String> originTableColumns =
+ originSchema.getColumns().stream()
+ .map(Schema.UnresolvedColumn::getName)
+ .collect(Collectors.toList());
+
+ // filter the dropped column which is not in table firstly
+ List<String> toDropColumns =
+ sqlAlterTableDropColumns.getColumns().getList().stream()
+ .map(SqlIdentifier.class::cast)
+ .map(SqlIdentifier::getSimple)
+ .filter(column -> originTableColumns.contains(column))
+ .collect(Collectors.toList());
+
+ // validate column size
+ if (originTableColumns.size() == 1 && toDropColumns.size() > 0) {
+ throw new ValidationException(
+ String.format(
+ "Table %s only has one column, please use DROP
TABLE syntax.",
+ tableIdentifier));
+ }
+
+ // validate the dropped column is referenced by computed column
+ toDropColumns.forEach(
+ column -> validateComputedColumn(column, originTableColumns,
originResolveSchema));
+
+ // validate the dropped column is referenced by watermark
+ toDropColumns.forEach(
+ column ->
+ validateWatermark(
+ column,
+ originResolveSchema.getWatermarkSpecs(),
+ originTableColumns));
+
+ Schema.Builder builder = Schema.newBuilder();
+ // build column
+ builder.fromColumns(
+ originSchema.getColumns().stream()
+ .filter(originColumn ->
!toDropColumns.contains(originColumn.getName()))
+ .collect(Collectors.toList()));
+
+ // build watermark
+ originSchema
+ .getWatermarkSpecs()
+ .forEach(
+ watermarkSpec ->
+ builder.watermark(
+ watermarkSpec.getColumnName(),
+
watermarkSpec.getWatermarkExpression()));
+ // build primary key
+ Optional<Schema.UnresolvedPrimaryKey> originPrimaryKey =
originSchema.getPrimaryKey();
+ if (originPrimaryKey.isPresent()) {
+ List<String> originPrimaryKeyNames =
originPrimaryKey.get().getColumnNames();
+ String constrainName = originPrimaryKey.get().getConstraintName();
+ List<String> newPrimaryKeyNames =
+ originPrimaryKeyNames.stream()
+ .filter(pkName -> !toDropColumns.contains(pkName))
+ .collect(Collectors.toList());
+ if (newPrimaryKeyNames.size() > 0) {
Review Comment:
MySQL and PostgreSQL. Here are some scenarios when dropping a column:
If the constraint refers to this column and other columns, remove this column from
the constraint. If the constraint refers only to this column, drop the column and the
constraint together.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]