dailai commented on code in PR #8211:
URL: https://github.com/apache/seatunnel/pull/8211#discussion_r1872465526
##########
seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java:
##########
@@ -197,6 +233,133 @@ public void write(SeaTunnelRow element) throws
IOException {
}
}
+ @Override
+ public void applySchemaChange(SchemaChangeEvent event) throws IOException {
+ if (event instanceof AlterTableColumnsEvent) {
+ for (AlterTableColumnEvent columnEvent : ((AlterTableColumnsEvent)
event).getEvents()) {
+ applySingleSchemaChangeEvent(columnEvent);
+ }
+ } else if (event instanceof AlterTableColumnEvent) {
+ applySingleSchemaChangeEvent(event);
+ } else {
+ throw new UnsupportedOperationException("Unsupported alter table
event: " + event);
+ }
+ reOpenTableWrite(event);
+ }
+
+ private void reOpenTableWrite(SchemaChangeEvent event) {
+ this.sourceTableSchema =
TABLE_SCHEMACHANGER.reset(sourceTableSchema).apply(event);
+ this.seaTunnelRowType = this.sourceTableSchema.toPhysicalRowDataType();
+ this.paimonFileStoretable = (FileStoreTable)
paimonCatalog.getPaimonTable(paimonTablePath);
+ this.sinkPaimonTableSchema = this.paimonFileStoretable.schema();
+ this.newTableWrite();
+ }
+
+ private void newTableWrite() {
+ this.tableWriteBuilder =
+ JobContextUtil.isBatchJob(jobContext)
+ ? this.paimonFileStoretable.newBatchWriteBuilder()
+ : this.paimonFileStoretable.newStreamWriteBuilder();
+ TableWrite oldTableWrite = this.tableWrite;
+ this.tableWrite =
+ tableWriteBuilder
+ .newWrite()
+ .withIOManager(
+ IOManager.create(
+
splitPaths(paimonSinkConfig.getChangelogTmpPath())));
+ tableWriteClose(oldTableWrite);
+ }
+
+ private void applySingleSchemaChangeEvent(SchemaChangeEvent event) {
Review Comment:
Ok, thinks.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]