LadyForest commented on code in PR #21504:
URL: https://github.com/apache/flink/pull/21504#discussion_r1057779254


##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterSchemaConverter.java:
##########
@@ -417,6 +422,92 @@ void checkColumnExists(String columnName) {
         }
     }
 
+    private static class ModifySchemaConverter extends SchemaConverter {
+
+        ModifySchemaConverter(
+                Schema originalSchema,
+                FlinkTypeFactory typeFactory,
+                SqlValidator sqlValidator,
+                Consumer<SqlTableConstraint> constraintValidator,
+                Function<SqlNode, String> escapeExpressions,
+                SchemaResolver schemaResolver) {
+            super(
+                    originalSchema,
+                    typeFactory,
+                    sqlValidator,
+                    constraintValidator,
+                    escapeExpressions,
+                    schemaResolver);
+        }
+
+        @Override
+        void checkColumnExists(String columnName) {
+            if (!sortedColumnNames.contains(columnName)) {
+                throw new ValidationException(
+                        String.format(
+                                "%sTry to modify a column `%s` which does not 
exist in the table.",
+                                EX_MSG_PREFIX, columnName));
+            }
+        }
+
+        @Override
+        void checkPrimaryKeyExists() {
+            if (primaryKey == null) {
+                throw new ValidationException(
+                        String.format(
+                                "%sThe base table does not define any primary 
key constraint. You might "
+                                        + "want to add a new one.",
+                                EX_MSG_PREFIX));
+            }
+        }
+
+        @Override
+        void checkWatermarkExists() {
+            if (watermarkSpec == null) {
+                throw new ValidationException(
+                        String.format(
+                                "%sThe base table does not define any 
watermark. You might "
+                                        + "want to add a new one.",
+                                EX_MSG_PREFIX));
+            }
+        }
+
+        @Override
+        Optional<Integer> getColumnPosition(SqlTableColumnPosition 
columnPosition) {
+            if (columnPosition.isFirstColumn() || 
columnPosition.isAfterReferencedColumn()) {
+                
sortedColumnNames.remove(columnPosition.getColumn().getName().getSimple());
+                return super.getColumnPosition(columnPosition);
+            }
+            return Optional.empty();
+        }
+
+        @Override
+        Schema.UnresolvedPhysicalColumn convertPhysicalColumn(
+                SqlTableColumn.SqlRegularColumn physicalColumn) {
+            Schema.UnresolvedPhysicalColumn newColumn = 
super.convertPhysicalColumn(physicalColumn);
+            String columnName = newColumn.getName();
+            // preserves the primary key's nullability
+            if (primaryKey != null && 
primaryKey.getColumnNames().contains(columnName)) {
+                newColumn =
+                        new Schema.UnresolvedPhysicalColumn(
+                                columnName,
+                                newColumn.getDataType().notNull(),
+                                newColumn.getComment().orElse(null));
+            }

Review Comment:
   > Why not mark the column is not null when building the final schema with 
the final pk?
   
   Because under some conditions, the final pk does not get a chance to be 
updated.
   Consider the following case
   ```sql
   create table T (
     f0 int,
     f1 string primary key not enforced, -- f1 is converted to string not null 
implicitly
     f2 double
   ) with (
     ...
   );
   
   -- then change f1 pos
   alter table T modify f1 string first
   ```
   
   For this case, the schema change only contains a column modification, and 
L#103`alterTableSchema.getFullConstraint().ifPresent(converter::updatePrimaryKey);`
 does not get executed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to