lvyanquan commented on code in PR #3323:
URL: https://github.com/apache/flink-cdc/pull/3323#discussion_r1604846544
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java:
##########
@@ -129,25 +130,127 @@ private void applyCreateTable(CreateTableEvent event)
private void applyAddColumn(AddColumnEvent event)
throws Catalog.TableNotExistException,
Catalog.ColumnAlreadyExistException,
Catalog.ColumnNotExistException {
- List<SchemaChange> tableChangeList = new ArrayList<>();
- event.getAddedColumns()
- .forEach(
- (column) -> {
- SchemaChange tableChange =
- SchemaChange.addColumn(
- column.getAddColumn().getName(),
- LogicalTypeConversion.toDataType(
-
DataTypeUtils.toFlinkDataType(
-
column.getAddColumn().getType())
-
.getLogicalType()));
- tableChangeList.add(tableChange);
- });
+ List<SchemaChange> tableChangeList =
applyAddColumnEventWithPosition(event);
catalog.alterTable(
new Identifier(event.tableId().getSchemaName(),
event.tableId().getTableName()),
tableChangeList,
true);
}
+ private List<SchemaChange> applyAddColumnEventWithPosition(AddColumnEvent
event)
+ throws Catalog.TableNotExistException {
+ List<SchemaChange> tableChangeList = new ArrayList<>();
+ for (AddColumnEvent.ColumnWithPosition columnWithPosition :
event.getAddedColumns()) {
+ SchemaChange tableChange;
+ switch (columnWithPosition.getPosition()) {
+ case FIRST:
+ tableChange =
+ SchemaChange.addColumn(
+
columnWithPosition.getAddColumn().getName(),
+ LogicalTypeConversion.toDataType(
+ DataTypeUtils.toFlinkDataType(
+ columnWithPosition
+
.getAddColumn()
+ .getType())
+ .getLogicalType()),
+
columnWithPosition.getAddColumn().getComment(),
+ SchemaChange.Move.first(
+
columnWithPosition.getAddColumn().getName()));
+ tableChangeList.add(tableChange);
+ break;
+ case LAST:
+ SchemaChange schemaChangeWithLastPosition =
+ applyAddColumnWithLastPosition(
+ event.tableId().getSchemaName(),
+ event.tableId().getTableName(),
+ columnWithPosition);
+ tableChangeList.add(schemaChangeWithLastPosition);
+ break;
+ case BEFORE:
+ SchemaChange schemaChangeWithBeforePosition =
+ applyAddColumnWithBeforePosition(
+ event.tableId().getSchemaName(),
+ event.tableId().getTableName(),
+ columnWithPosition);
+ tableChangeList.add(schemaChangeWithBeforePosition);
+ break;
+ case AFTER:
+ if (columnWithPosition.getExistedColumnName() == null) {
+ throw new IllegalArgumentException(
+ "Existing column name must be provided for
AFTER position");
+ }
+ tableChange =
+ SchemaChange.addColumn(
+
columnWithPosition.getAddColumn().getName(),
+ LogicalTypeConversion.toDataType(
+ DataTypeUtils.toFlinkDataType(
+ columnWithPosition
+
.getAddColumn()
+ .getType())
+ .getLogicalType()),
+
columnWithPosition.getAddColumn().getComment(),
+ SchemaChange.Move.after(
+
columnWithPosition.getAddColumn().getName(),
+
columnWithPosition.getExistedColumnName()));
+ tableChangeList.add(tableChange);
+ break;
+ default:
+ throw new IllegalArgumentException(
+ "Unknown column position: " +
columnWithPosition.getPosition());
+ }
+ }
+ return tableChangeList;
+ }
+
+ private SchemaChange applyAddColumnWithLastPosition(
+ String schemaName,
+ String tableName,
+ AddColumnEvent.ColumnWithPosition columnWithPosition)
+ throws Catalog.TableNotExistException {
+ Table table = catalog.getTable(new Identifier(schemaName, tableName));
+ List<String> columnNames = table.rowType().getFieldNames();
+ String lastColumnName = columnNames.get(columnNames.size() - 1);
+ return SchemaChange.addColumn(
+ columnWithPosition.getAddColumn().getName(),
+ LogicalTypeConversion.toDataType(
+
DataTypeUtils.toFlinkDataType(columnWithPosition.getAddColumn().getType())
+ .getLogicalType()),
+ columnWithPosition.getAddColumn().getComment(),
+ SchemaChange.Move.after(
+ columnWithPosition.getAddColumn().getName(),
lastColumnName));
Review Comment:
The `SchemaChange.Move.after(columnWithPosition.getAddColumn().getName(),
lastColumnName)` argument can be removed, because placing a newly added column
after the last existing column is already the default behavior of
[SchemaManager](https://github.com/apache/paimon/blob/master/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java).
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java:
##########
@@ -129,25 +130,127 @@ private void applyCreateTable(CreateTableEvent event)
private void applyAddColumn(AddColumnEvent event)
throws Catalog.TableNotExistException,
Catalog.ColumnAlreadyExistException,
Catalog.ColumnNotExistException {
- List<SchemaChange> tableChangeList = new ArrayList<>();
- event.getAddedColumns()
- .forEach(
- (column) -> {
- SchemaChange tableChange =
- SchemaChange.addColumn(
- column.getAddColumn().getName(),
- LogicalTypeConversion.toDataType(
-
DataTypeUtils.toFlinkDataType(
-
column.getAddColumn().getType())
-
.getLogicalType()));
- tableChangeList.add(tableChange);
- });
+ List<SchemaChange> tableChangeList =
applyAddColumnEventWithPosition(event);
catalog.alterTable(
new Identifier(event.tableId().getSchemaName(),
event.tableId().getTableName()),
tableChangeList,
true);
}
+ private List<SchemaChange> applyAddColumnEventWithPosition(AddColumnEvent
event)
+ throws Catalog.TableNotExistException {
+ List<SchemaChange> tableChangeList = new ArrayList<>();
+ for (AddColumnEvent.ColumnWithPosition columnWithPosition :
event.getAddedColumns()) {
+ SchemaChange tableChange;
+ switch (columnWithPosition.getPosition()) {
+ case FIRST:
+ tableChange =
+ SchemaChange.addColumn(
+
columnWithPosition.getAddColumn().getName(),
+ LogicalTypeConversion.toDataType(
+ DataTypeUtils.toFlinkDataType(
+ columnWithPosition
+
.getAddColumn()
+ .getType())
+ .getLogicalType()),
+
columnWithPosition.getAddColumn().getComment(),
+ SchemaChange.Move.first(
+
columnWithPosition.getAddColumn().getName()));
+ tableChangeList.add(tableChange);
+ break;
+ case LAST:
+ SchemaChange schemaChangeWithLastPosition =
+ applyAddColumnWithLastPosition(
+ event.tableId().getSchemaName(),
+ event.tableId().getTableName(),
+ columnWithPosition);
+ tableChangeList.add(schemaChangeWithLastPosition);
+ break;
+ case BEFORE:
+ SchemaChange schemaChangeWithBeforePosition =
+ applyAddColumnWithBeforePosition(
+ event.tableId().getSchemaName(),
+ event.tableId().getTableName(),
+ columnWithPosition);
+ tableChangeList.add(schemaChangeWithBeforePosition);
+ break;
+ case AFTER:
+ if (columnWithPosition.getExistedColumnName() == null) {
+ throw new IllegalArgumentException(
+ "Existing column name must be provided for
AFTER position");
+ }
Review Comment:
This null check can be simplified with `Preconditions#checkNotNull`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]