hailin0 commented on code in PR #8082:
URL: https://github.com/apache/seatunnel/pull/8082#discussion_r1855466733
##########
seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkWriter.java:
##########
@@ -68,6 +90,100 @@ record = serializer.serialize(element);
manager.write(record);
}
+ @Override
+ public void applySchemaChange(SchemaChangeEvent event) throws IOException {
+ if (event instanceof AlterTableColumnsEvent) {
+ AlterTableColumnsEvent alterTableColumnsEvent =
(AlterTableColumnsEvent) event;
+ List<AlterTableColumnEvent> events =
alterTableColumnsEvent.getEvents();
+ for (AlterTableColumnEvent alterTableColumnEvent : events) {
+ String sourceDialectName =
alterTableColumnEvent.getSourceDialectName();
+ if (StringUtils.isBlank(sourceDialectName)) {
+ throw new SeaTunnelException(
+ "The sourceDialectName in AlterTableColumnEvent
can not be empty. event: "
+ + event);
+ }
+ processSchemaChangeEvent(alterTableColumnEvent);
+ }
+ } else {
+ log.warn("We only support AlterTableColumnsEvent, but actual event
is " + event);
+ }
+
+ try {
+ Class.forName("com.mysql.cj.jdbc.Driver");
+ } catch (ClassNotFoundException e) {
+ throw new RuntimeException(e);
+ }
+
+ try {
+ Connection conn =
+ DriverManager.getConnection(
+ sinkConfig.getJdbcUrl(),
+ sinkConfig.getUsername(),
+ sinkConfig.getPassword());
+ SchemaUtils.applySchemaChange(event, conn, sinkTablePath);
+ } catch (SQLException e) {
+ throw new CatalogException(
+ String.format("Failed connecting to %s via JDBC.",
sinkConfig.getJdbcUrl()), e);
+ }
+ }
+
+ protected void processSchemaChangeEvent(AlterTableColumnEvent event)
throws IOException {
+ List<Column> columns = new ArrayList<>(tableSchema.getColumns());
+ switch (event.getEventType()) {
+ case SCHEMA_CHANGE_ADD_COLUMN:
+ AlterTableAddColumnEvent alterTableAddColumnEvent =
+ (AlterTableAddColumnEvent) event;
+ Column addColumn = alterTableAddColumnEvent.getColumn();
+ String afterColumn = alterTableAddColumnEvent.getAfterColumn();
+ if (StringUtils.isNotBlank(afterColumn)) {
+ Optional<Column> columnOptional =
+ columns.stream()
+ .filter(column ->
afterColumn.equals(column.getName()))
+ .findFirst();
+ if (!columnOptional.isPresent()) {
+ columns.add(addColumn);
+ break;
+ }
+ columnOptional.ifPresent(
+ column -> {
+ int index = columns.indexOf(column);
+ columns.add(index + 1, addColumn);
+ });
+ } else {
+ columns.add(addColumn);
+ }
+ break;
+ case SCHEMA_CHANGE_DROP_COLUMN:
+ String dropColumn = ((AlterTableDropColumnEvent)
event).getColumn();
+ columns.removeIf(column ->
column.getName().equalsIgnoreCase(dropColumn));
+ break;
+ case SCHEMA_CHANGE_MODIFY_COLUMN:
+ Column modifyColumn = ((AlterTableModifyColumnEvent)
event).getColumn();
+ replaceColumnByIndex(
+ event.getEventType(), columns, modifyColumn.getName(),
modifyColumn);
+ break;
+ case SCHEMA_CHANGE_CHANGE_COLUMN:
+ AlterTableChangeColumnEvent alterTableChangeColumnEvent =
+ (AlterTableChangeColumnEvent) event;
+ Column changeColumn = alterTableChangeColumnEvent.getColumn();
+ String oldColumnName =
alterTableChangeColumnEvent.getOldColumn();
+ replaceColumnByIndex(event.getEventType(), columns,
oldColumnName, changeColumn);
+ break;
+ default:
+ throw new SeaTunnelException(
+ "Unsupported schemaChangeEvent for event type: " +
event.getEventType());
+ }
+ this.tableSchema =
Review Comment:
```suggestion
this.tableSchema =
TableSchemaChangeEventDispatcher.reset(..).apply(event);
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]