This is an automated email from the ASF dual-hosted git repository.

tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 3473e2d7f1 [Feature][Flink] Support CDC source schema evolution on 
Flink engine (#9867)
3473e2d7f1 is described below

commit 3473e2d7f11b35abbd6e84a2c0ba4da1e7c646c0
Author: CloverDew <[email protected]>
AuthorDate: Thu Oct 23 11:27:49 2025 +0800

    [Feature][Flink] Support CDC source schema evolution on Flink engine (#9867)
---
 .../org/apache/seatunnel/api/event/EventType.java  |   1 +
 .../api/sink/SupportSchemaEvolutionSinkWriter.java |  46 +++
 .../sink/multitablesink/MultiTableSinkWriter.java  |  29 ++
 .../api/table/coordinator/SchemaCoordinator.java   | 329 +++++++++++++++++++++
 .../api/table/coordinator/SchemaResponse.java      |  53 ++++
 .../api/table/schema/event/FlushEvent.java         |  63 ++++
 .../exception/SchemaCoordinationException.java     |  80 +++++
 .../schema/exception/SchemaEvolutionErrorCode.java |  74 +++++
 .../schema/exception/SchemaEvolutionException.java |  84 ++++++
 .../exception/SchemaValidationException.java       |  82 +++++
 .../exception/SinkWriterSchemaException.java       |  78 +++++
 .../seatunnel/console/sink/ConsoleSinkWriter.java  |  59 +++-
 .../jdbc/sink/JdbcExactlyOnceSinkWriter.java       |  42 +++
 .../seatunnel/jdbc/sink/JdbcSinkWriter.java        |  38 +++
 .../starter/flink/execution/SchemaOperator.java    | 283 ++++++++++++++++++
 .../flink/execution/SinkExecuteProcessor.java      |  29 +-
 .../flink/execution/SourceExecuteProcessor.java    |  35 ++-
 .../cdc/mysql/MysqlCDCWithFlinkSchemaChangeIT.java | 280 ++++++++++++++++++
 .../src/test/resources/ddl/initial.sql             |  60 ++++
 .../src/test/resources/ddl/rename_columns.sql      |  29 ++
 ...mysqlcdc_to_mysql_with_flink_schema_change.conf |  53 ++++
 .../mysqlcdc_to_mysql_schema_evolution.conf        |  52 ++++
 .../resources/examples/schema_evolution_test.conf  |  42 +++
 .../translation/flink/sink/FlinkSinkWriter.java    |  60 ++++
 .../flink/source/FlinkRowCollector.java            |  13 +
 25 files changed, 1978 insertions(+), 16 deletions(-)

diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/event/EventType.java 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/event/EventType.java
index 26ae749f0b..3272f65419 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/event/EventType.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/event/EventType.java
@@ -31,4 +31,5 @@ public enum EventType {
     LIFECYCLE_WRITER_CLOSE,
     READER_MESSAGE_DELAYED,
     JOB_STATUS,
+    SCHEMA_CHANGE_FLUSH,
 }
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SupportSchemaEvolutionSinkWriter.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SupportSchemaEvolutionSinkWriter.java
index 54727ec950..47c3839465 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SupportSchemaEvolutionSinkWriter.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SupportSchemaEvolutionSinkWriter.java
@@ -17,6 +17,8 @@
 
 package org.apache.seatunnel.api.sink;
 
+import org.apache.seatunnel.api.table.coordinator.SchemaCoordinator;
+import org.apache.seatunnel.api.table.schema.event.FlushEvent;
 import org.apache.seatunnel.api.table.schema.event.SchemaChangeEvent;
 
 import java.io.IOException;
@@ -30,4 +32,48 @@ public interface SupportSchemaEvolutionSinkWriter {
      * @throws IOException
      */
     void applySchemaChange(SchemaChangeEvent event) throws IOException;
+
+    /**
+     * handle FlushEvent propagated from upstream
+     *
+     * @param event
+     * @throws IOException
+     */
+    default void handleFlushEvent(FlushEvent event) throws IOException {
+        flushData();
+        sendFlushSuccessful(event);
+    }
+
+    /**
+     * send success event to coordinator upon successful flash
+     *
+     * @param event
+     * @throws IOException
+     */
+    default void sendFlushSuccessful(FlushEvent event) throws IOException {
+        SchemaCoordinator coordinator = getSchemaCoordinator();
+        if (coordinator == null && event != null && event.getJobId() != null) {
+            coordinator = 
SchemaCoordinator.getOrCreateInstance(event.getJobId());
+        }
+
+        if (coordinator != null) {
+            coordinator.notifyFlushSuccessful(event.getJobId(), 
event.tableIdentifier());
+        }
+    }
+
+    /**
+     * Get the schema coordinator instance for reporting flush completion
+     *
+     * @return the schema coordinator instance, or null if not available
+     */
+    default SchemaCoordinator getSchemaCoordinator() {
+        return null;
+    }
+
+    /**
+     * flush data to other system
+     *
+     * @throws IOException
+     */
+    default void flushData() throws IOException {}
 }
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkWriter.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkWriter.java
index e05cf4cb8b..3721491ea9 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkWriter.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkWriter.java
@@ -21,6 +21,7 @@ import 
org.apache.seatunnel.api.sink.MultiTableResourceManager;
 import org.apache.seatunnel.api.sink.SinkWriter;
 import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
 import org.apache.seatunnel.api.sink.SupportSchemaEvolutionSinkWriter;
+import org.apache.seatunnel.api.table.schema.event.FlushEvent;
 import org.apache.seatunnel.api.table.schema.event.SchemaChangeEvent;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.tracing.MDCTracer;
@@ -175,8 +176,36 @@ public class MultiTableSinkWriter
         }
     }
 
+    @Override
+    public void handleFlushEvent(FlushEvent event) throws IOException {
+        subSinkErrorCheck();
+        String targetTableId = 
event.tableIdentifier().toTablePath().getFullName();
+        for (int i = 0; i < sinkWritersWithIndex.size(); i++) {
+            for (Map.Entry<SinkIdentifier, SinkWriter<SeaTunnelRow, ?, ?>> 
sinkWriterEntry :
+                    sinkWritersWithIndex.get(i).entrySet()) {
+                if 
(sinkWriterEntry.getKey().getTableIdentifier().equals(targetTableId)) {
+                    synchronized (runnable.get(i)) {
+                        SinkWriter<SeaTunnelRow, ?, ?> sink = 
sinkWriterEntry.getValue();
+                        if (sink instanceof SupportSchemaEvolutionSinkWriter) {
+                            ((SupportSchemaEvolutionSinkWriter) 
sink).handleFlushEvent(event);
+                        }
+                    }
+                    return;
+                }
+            }
+        }
+    }
+
     @Override
     public void write(SeaTunnelRow element) throws IOException {
+        if (element != null && element.getOptions() != null) {
+            if (element.getOptions().containsKey("flush_event")
+                    || 
element.getOptions().containsKey("schema_change_event")) {
+                log.debug("Skipping schema change event row: {}", 
element.getOptions().keySet());
+                return;
+            }
+        }
+
         if (!submitted) {
             submitted = true;
             runnable.forEach(executorService::submit);
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/coordinator/SchemaCoordinator.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/coordinator/SchemaCoordinator.java
new file mode 100644
index 0000000000..5b2b54d80e
--- /dev/null
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/coordinator/SchemaCoordinator.java
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.table.coordinator;
+
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import 
org.apache.seatunnel.api.table.schema.exception.SchemaCoordinationException;
+import 
org.apache.seatunnel.api.table.schema.exception.SchemaEvolutionErrorCode;
+import 
org.apache.seatunnel.api.table.schema.exception.SchemaValidationException;
+
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
+
+/** global unified coordinator for handling schema changes */
+@Slf4j
+public class SchemaCoordinator implements Serializable {
+
+    private static final Map<String, SchemaCoordinator> JOB_COORDINATORS =
+            new ConcurrentHashMap<>();
+    private final Map<TableIdentifier, SchemaChangeState> schemaChangeStates;
+    private final Map<TableIdentifier, Long> schemaVersions;
+    private final Map<TableIdentifier, ReentrantLock> schemaLocks;
+    private final Map<TableIdentifier, Map<String, 
CompletableFuture<SchemaResponse>>>
+            pendingRequests;
+    private final Map<TableIdentifier, Long> latestSchemaChangeTime = new 
ConcurrentHashMap<>();
+
+    public SchemaCoordinator() {
+        this.schemaChangeStates = new ConcurrentHashMap<>();
+        this.schemaVersions = new ConcurrentHashMap<>();
+        this.schemaLocks = new ConcurrentHashMap<>();
+        this.pendingRequests = new ConcurrentHashMap<>();
+    }
+
+    public static SchemaCoordinator getOrCreateInstance(String jobId) {
+        return JOB_COORDINATORS.computeIfAbsent(
+                jobId,
+                k -> {
+                    log.info("Creating new SchemaCoordinator instance for job: 
{}", jobId);
+                    return new SchemaCoordinator();
+                });
+    }
+
+    public static void removeInstance(String jobId) {
+        SchemaCoordinator removed = JOB_COORDINATORS.remove(jobId);
+        if (removed != null) {
+            log.info("Removed SchemaCoordinator instance for job: {}", jobId);
+        }
+    }
+
+    public void notifyFlushSuccessful(String jobId, TableIdentifier tableId) {
+        log.info("Received flush notification: jobId={}, tableId={}", jobId, 
tableId);
+
+        ReentrantLock lock = schemaLocks.computeIfAbsent(tableId, k -> new 
ReentrantLock());
+        lock.lock();
+        try {
+            SchemaChangeState state = schemaChangeStates.get(tableId);
+            if (state == null) {
+                log.warn("No schema change state found for table: {}", 
tableId);
+                log.info("Available schema change states: {}", 
schemaChangeStates.keySet());
+                return;
+            }
+
+            if (!state.getJobId().equals(jobId)) {
+                log.warn(
+                        "Job ID mismatch: received jobId={}, expected 
jobId={}, tableId={}",
+                        jobId,
+                        state.getJobId(),
+                        tableId);
+                return;
+            }
+
+            // update the schema change status
+            state.incrementFlushCount();
+            log.info(
+                    "Received flush successful notification for table {} ({}), 
count: {}/{}",
+                    tableId,
+                    jobId,
+                    state.getFlushCount(),
+                    state.getTotalOperators());
+
+            if (state.isAllFlushed()) {
+                // all operators have completed flushing. The schema change 
can now be finalized
+                log.info("All operators flushed for table {}, completing 
schema change", tableId);
+                completeSchemaChange(tableId, jobId);
+            }
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    public CompletableFuture<SchemaResponse> requestSchemaChange(
+            TableIdentifier tableId, String jobId, CatalogTable newSchema, int 
totalOperators) {
+
+        ReentrantLock lock = schemaLocks.computeIfAbsent(tableId, k -> new 
ReentrantLock());
+        lock.lock();
+        try {
+            SchemaChangeState existingState = schemaChangeStates.get(tableId);
+            if (existingState != null && existingState.getJobId() != null) {
+                log.warn(
+                        "Schema change already in progress for table: {}, 
existing jobId: {}, new jobId: {}",
+                        tableId,
+                        existingState.getJobId(),
+                        jobId);
+                return CompletableFuture.completedFuture(
+                        SchemaResponse.failure(
+                                new SchemaCoordinationException(
+                                                SchemaEvolutionErrorCode
+                                                        
.SCHEMA_CHANGE_ALREADY_IN_PROGRESS,
+                                                "Schema change already in 
progress",
+                                                tableId,
+                                                jobId)
+                                        .getMessage()));
+            }
+
+            Long lastProcessedTime = latestSchemaChangeTime.get(tableId);
+            if (lastProcessedTime != null && newSchema != null) {
+                long currentTime = System.currentTimeMillis();
+                if (currentTime <= lastProcessedTime) {
+                    log.warn(
+                            "Received outdated schema change event for table: 
{}, current time: {}, last processed: {}",
+                            tableId,
+                            currentTime,
+                            lastProcessedTime);
+                    return CompletableFuture.completedFuture(
+                            SchemaResponse.failure(
+                                    new SchemaValidationException(
+                                                    
SchemaEvolutionErrorCode.OUTDATED_SCHEMA_EVENT,
+                                                    "Schema change event is 
outdated",
+                                                    tableId,
+                                                    jobId)
+                                            .getMessage()));
+                }
+            }
+
+            if (newSchema != null && !isValidSchema(newSchema)) {
+                log.error("Invalid schema structure for table: {}", tableId);
+                return CompletableFuture.completedFuture(
+                        SchemaResponse.failure(
+                                new SchemaValidationException(
+                                                
SchemaEvolutionErrorCode.INVALID_SCHEMA_STRUCTURE,
+                                                "Invalid schema structure",
+                                                tableId,
+                                                jobId)
+                                        .getMessage()));
+            }
+            log.info("Processing schema change for table: {}, job: {}", 
tableId, jobId);
+
+            CatalogTable currentSchema = null;
+            if (existingState != null) {
+                currentSchema = existingState.getCurrentSchema();
+            }
+
+            SchemaChangeState state =
+                    new SchemaChangeState(currentSchema, newSchema, jobId, 
totalOperators);
+
+            schemaChangeStates.put(tableId, state);
+            log.info(
+                    "SchemaChangeStates tableId: {}, state: {}",
+                    tableId,
+                    schemaChangeStates.get(tableId));
+
+            // update the latest schema change time
+            if (newSchema != null) {
+                latestSchemaChangeTime.put(tableId, 
System.currentTimeMillis());
+            }
+
+            return waitForSchemaChangeConfirmation(tableId, jobId);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    public CompletableFuture<SchemaResponse> waitForSchemaChangeConfirmation(
+            TableIdentifier tableId, String jobId) {
+        CompletableFuture<SchemaResponse> future = new CompletableFuture<>();
+
+        // add the request to the waiting queue
+        pendingRequests.computeIfAbsent(tableId, k -> new 
ConcurrentHashMap<>()).put(jobId, future);
+        return future;
+    }
+
+    private void completeSchemaChange(TableIdentifier tableId, String jobId) {
+        ReentrantLock lock = schemaLocks.computeIfAbsent(tableId, k -> new 
ReentrantLock());
+        lock.lock();
+        try {
+            SchemaChangeState state = schemaChangeStates.get(tableId);
+            if (state == null) {
+                log.warn("No schema change state found for table: {} during 
completion", tableId);
+                return;
+            }
+
+            if (!jobId.equals(state.getJobId())) {
+                log.warn(
+                        "Job ID mismatch during completion: expected={}, 
actual={}, table={}",
+                        jobId,
+                        state.getJobId(),
+                        tableId);
+                return;
+            }
+
+            if (!state.isAllFlushed()) {
+                log.warn(
+                        "Attempting to complete schema change but not all 
operators have flushed: {}/{} for table {}",
+                        state.getFlushCount(),
+                        state.getTotalOperators(),
+                        tableId);
+                return;
+            }
+
+            long newVersion = schemaVersions.getOrDefault(tableId, 0L) + 1;
+            schemaVersions.put(tableId, newVersion);
+
+            SchemaResponse response = 
SchemaResponse.success(state.getNewSchema(), newVersion);
+
+            Map<String, CompletableFuture<SchemaResponse>> tableRequests =
+                    pendingRequests.get(tableId);
+            if (tableRequests != null) {
+                int completedCount = 0;
+                for (CompletableFuture<SchemaResponse> future : 
tableRequests.values()) {
+                    if (!future.isDone()) {
+                        future.complete(response);
+                        completedCount++;
+                    }
+                }
+                log.info(
+                        "Completed {} pending schema change requests for table 
{}",
+                        completedCount,
+                        tableId);
+                tableRequests.clear();
+            }
+
+            state.setCurrentSchema(state.getNewSchema());
+            state.setNewSchema(null);
+            state.setJobId(null);
+            state.resetFlushCount();
+
+            latestSchemaChangeTime.remove(tableId);
+            log.info("Schema change completed for table {}, version: {}", 
tableId, newVersion);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    private boolean isValidSchema(CatalogTable schema) {
+        if (schema == null) {
+            return false;
+        }
+
+        if (schema.getTableId() == null) {
+            log.error("Schema has null table identifier");
+            return false;
+        }
+
+        if (schema.getTableSchema() == null) {
+            log.error("Schema has null table schema");
+            return false;
+        }
+
+        if (schema.getTableSchema().getColumns().isEmpty()) {
+            log.error("Schema has no columns");
+            return false;
+        }
+
+        return true;
+    }
+
+    @Getter
+    @Setter
+    @ToString
+    private static class SchemaChangeState {
+        private CatalogTable currentSchema;
+        private CatalogTable newSchema;
+        private String jobId;
+        private final AtomicInteger flushCount;
+        private final int totalOperators;
+
+        public SchemaChangeState(
+                CatalogTable currentSchema,
+                CatalogTable newSchema,
+                String jobId,
+                int totalOperators) {
+            this.currentSchema = currentSchema;
+            this.newSchema = newSchema;
+            this.jobId = jobId;
+            this.flushCount = new AtomicInteger(0);
+            this.totalOperators = totalOperators;
+        }
+
+        public int getFlushCount() {
+            return flushCount.get();
+        }
+
+        public void incrementFlushCount() {
+            flushCount.incrementAndGet();
+        }
+
+        public void resetFlushCount() {
+            flushCount.set(0);
+        }
+
+        public boolean isAllFlushed() {
+            return flushCount.get() >= totalOperators;
+        }
+    }
+}
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/coordinator/SchemaResponse.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/coordinator/SchemaResponse.java
new file mode 100644
index 0000000000..62cd458292
--- /dev/null
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/coordinator/SchemaResponse.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.table.coordinator;
+
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+
+import lombok.Getter;
+
+/** used to return change response information */
+@Getter
+public class SchemaResponse {
+    private final boolean success;
+    private final String message;
+    private final CatalogTable currentSchema;
+    private final Long schemaVersion;
+
+    private SchemaResponse(
+            boolean success, String message, CatalogTable currentSchema, Long 
schemaVersion) {
+        this.success = success;
+        this.message = message;
+        this.currentSchema = currentSchema;
+        this.schemaVersion = schemaVersion;
+    }
+
+    public static SchemaResponse success(CatalogTable currentSchema, Long 
schemaVersion) {
+        String message = "Schema change completed successfully, current 
version: " + schemaVersion;
+        return new SchemaResponse(true, message, currentSchema, schemaVersion);
+    }
+
+    public static SchemaResponse failure(String errorMessage) {
+        return new SchemaResponse(false, errorMessage, null, null);
+    }
+
+    @Deprecated
+    public String getErrorMessage() {
+        return message;
+    }
+}
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/schema/event/FlushEvent.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/schema/event/FlushEvent.java
new file mode 100644
index 0000000000..449c017eaf
--- /dev/null
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/schema/event/FlushEvent.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.table.schema.event;
+
+import org.apache.seatunnel.api.event.EventType;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+
+/** Used for communication between SchemaEvolutionOperator and sink */
+@Getter
+@Setter
+@ToString
+public class FlushEvent implements SchemaChangeEvent {
+    private final TableIdentifier tableIdentifier;
+    @Getter @Setter private CatalogTable changeAfter;
+    @Getter @Setter private String jobId;
+    @Getter @Setter private String statement;
+    @Getter @Setter protected String sourceDialectName;
+    private final Long createdTime;
+    private final SchemaChangeEvent schemaChangeEvent;
+    private final EventType eventType = EventType.SCHEMA_CHANGE_FLUSH;
+
+    public FlushEvent(SchemaChangeEvent schemaChangeEvent) {
+        this.schemaChangeEvent = schemaChangeEvent;
+        this.tableIdentifier = schemaChangeEvent.tableIdentifier();
+        this.jobId = schemaChangeEvent.getJobId();
+        this.createdTime = System.currentTimeMillis();
+    }
+
+    @Override
+    public TableIdentifier tableIdentifier() {
+        return tableIdentifier;
+    }
+
+    @Override
+    public long getCreatedTime() {
+        return createdTime;
+    }
+
+    @Override
+    public EventType getEventType() {
+        return eventType;
+    }
+}
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/schema/exception/SchemaCoordinationException.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/schema/exception/SchemaCoordinationException.java
new file mode 100644
index 0000000000..70435adf43
--- /dev/null
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/schema/exception/SchemaCoordinationException.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.table.schema.exception;
+
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+
+/**
+ * Exception thrown when schema coordination operations fail. This includes 
timeout issues,
+ * coordination conflicts, and coordinator state problems.
+ */
+public class SchemaCoordinationException extends SchemaEvolutionException {
+
+    public SchemaCoordinationException(SchemaEvolutionErrorCode errorCode, 
String errorMessage) {
+        super(errorCode, errorMessage);
+    }
+
+    public SchemaCoordinationException(
+            SchemaEvolutionErrorCode errorCode, String errorMessage, Throwable 
cause) {
+        super(errorCode, errorMessage, cause);
+    }
+
+    public SchemaCoordinationException(
+            SchemaEvolutionErrorCode errorCode,
+            String errorMessage,
+            TableIdentifier tableIdentifier,
+            String jobId) {
+        super(errorCode, errorMessage, tableIdentifier, jobId);
+    }
+
+    public SchemaCoordinationException(
+            SchemaEvolutionErrorCode errorCode,
+            String errorMessage,
+            TableIdentifier tableIdentifier,
+            String jobId,
+            Throwable cause) {
+        super(errorCode, errorMessage, tableIdentifier, jobId, cause);
+    }
+
+    /** Create a timeout exception for schema changes */
+    public static SchemaCoordinationException timeout(
+            TableIdentifier tableIdentifier, String jobId, long 
timeoutSeconds, Throwable cause) {
+        String message =
+                String.format("Schema change operation timed out after %d 
seconds", timeoutSeconds);
+        return new SchemaCoordinationException(
+                SchemaEvolutionErrorCode.SCHEMA_CHANGE_TIMEOUT,
+                message,
+                tableIdentifier,
+                jobId,
+                cause);
+    }
+
+    /** Create an exception for schema change conflicts */
+    public static SchemaCoordinationException conflict(
+            TableIdentifier tableIdentifier, String currentJobId, String 
conflictingJobId) {
+        String message =
+                String.format(
+                        "Schema change already in progress for table. Current 
job: %s, conflicting job: %s",
+                        currentJobId, conflictingJobId);
+        return new SchemaCoordinationException(
+                SchemaEvolutionErrorCode.SCHEMA_CHANGE_ALREADY_IN_PROGRESS,
+                message,
+                tableIdentifier,
+                currentJobId);
+    }
+}
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/schema/exception/SchemaEvolutionErrorCode.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/schema/exception/SchemaEvolutionErrorCode.java
new file mode 100644
index 0000000000..08c614b952
--- /dev/null
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/schema/exception/SchemaEvolutionErrorCode.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.table.schema.exception;
+
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
+
+public enum SchemaEvolutionErrorCode implements SeaTunnelErrorCode {
+
+    // Schema Coordination Errors
+    SCHEMA_COORDINATOR_NOT_INITIALIZED("SE-01", "Schema coordinator is not 
initialized"),
+    SCHEMA_CHANGE_ALREADY_IN_PROGRESS(
+            "SE-02", "Schema change is already in progress for the table"),
+    SCHEMA_CHANGE_TIMEOUT("SE-03", "Schema change operation timed out"),
+    SCHEMA_CHANGE_COORDINATION_FAILED("SE-04", "Schema change coordination 
failed"),
+
+    // Schema Validation Errors
+    INVALID_SCHEMA_STRUCTURE("SE-05", "Invalid schema structure provided"),
+    SCHEMA_INCOMPATIBLE("SE-06", "Schema change is incompatible with current 
schema"),
+    OUTDATED_SCHEMA_EVENT("SE-07", "Schema change event is outdated"),
+    UNSUPPORTED_SCHEMA_CHANGE_TYPE("SE-08", "Schema change type is not 
supported"),
+
+    // Sink Writer Errors
+    SCHEMA_CHANGE_APPLICATION_FAILED("SE-09", "Failed to apply schema change 
to sink writer"),
+    FLUSH_OPERATION_FAILED("SE-10", "Flush operation failed during schema 
evolution"),
+    SCHEMA_ROLLBACK_FAILED("SE-11", "Failed to rollback schema change"),
+
+    // Table and Database Errors
+    TABLE_SCHEMA_UPDATE_FAILED("SE-12", "Failed to update table schema in 
database"),
+    TABLE_NOT_FOUND("SE-13", "Target table not found"),
+    INSUFFICIENT_PERMISSIONS("SE-14", "Insufficient permissions to modify 
table schema"),
+
+    // Event Processing Errors
+    SCHEMA_EVENT_PROCESSING_FAILED("SE-15", "Failed to process schema change 
event"),
+    FLUSH_EVENT_PROCESSING_FAILED("SE-16", "Failed to process flush event"),
+    SCHEMA_EVENT_DESERIALIZATION_FAILED("SE-17", "Failed to deserialize schema 
change event"),
+
+    // Resource Management Errors
+    RESOURCE_CLEANUP_FAILED("SE-18", "Failed to cleanup resources after schema 
change"),
+    CONNECTION_POOL_EXHAUSTED("SE-19", "Database connection pool exhausted 
during schema change"),
+    MEMORY_ALLOCATION_FAILED("SE-20", "Memory allocation failed during schema 
processing");
+
+    private final String code;
+    private final String description;
+
+    SchemaEvolutionErrorCode(String code, String description) {
+        this.code = code;
+        this.description = description;
+    }
+
+    @Override
+    public String getCode() {
+        return this.code;
+    }
+
+    @Override
+    public String getDescription() {
+        return this.description;
+    }
+}
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/schema/exception/SchemaEvolutionException.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/schema/exception/SchemaEvolutionException.java
new file mode 100644
index 0000000000..6d864c291a
--- /dev/null
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/schema/exception/SchemaEvolutionException.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.table.schema.exception;
+
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+
+import lombok.Getter;
+
+/**
+ * Base exception class for schema evolution related errors. This exception 
provides detailed
+ * context about schema evolution failures.
+ */
+@Getter
+public class SchemaEvolutionException extends SeaTunnelRuntimeException {
+
+    private final TableIdentifier tableIdentifier;
+
+    private final String jobId;
+
+    public SchemaEvolutionException(SchemaEvolutionErrorCode errorCode, String 
errorMessage) {
+        super(errorCode, errorMessage);
+        this.tableIdentifier = null;
+        this.jobId = null;
+    }
+
+    public SchemaEvolutionException(
+            SchemaEvolutionErrorCode errorCode, String errorMessage, Throwable 
cause) {
+        super(errorCode, errorMessage, cause);
+        this.tableIdentifier = null;
+        this.jobId = null;
+    }
+
+    public SchemaEvolutionException(
+            SchemaEvolutionErrorCode errorCode,
+            String errorMessage,
+            TableIdentifier tableIdentifier,
+            String jobId) {
+        super(errorCode, enrichErrorMessage(errorMessage, tableIdentifier, 
jobId));
+        this.tableIdentifier = tableIdentifier;
+        this.jobId = jobId;
+    }
+
+    public SchemaEvolutionException(
+            SchemaEvolutionErrorCode errorCode,
+            String errorMessage,
+            TableIdentifier tableIdentifier,
+            String jobId,
+            Throwable cause) {
+        super(errorCode, enrichErrorMessage(errorMessage, tableIdentifier, 
jobId), cause);
+        this.tableIdentifier = tableIdentifier;
+        this.jobId = jobId;
+    }
+
+    private static String enrichErrorMessage(
+            String originalMessage, TableIdentifier tableIdentifier, String 
jobId) {
+        StringBuilder message = new StringBuilder(originalMessage);
+
+        if (tableIdentifier != null) {
+            message.append(" [Table: ").append(tableIdentifier).append("]");
+        }
+
+        if (jobId != null) {
+            message.append(" [Job: ").append(jobId).append("]");
+        }
+
+        return message.toString();
+    }
+}
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/schema/exception/SchemaValidationException.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/schema/exception/SchemaValidationException.java
new file mode 100644
index 0000000000..e213795d6f
--- /dev/null
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/schema/exception/SchemaValidationException.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.table.schema.exception;
+
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+
+/**
+ * Exception thrown when schema validation fails. This includes invalid schema 
structures,
+ * incompatible changes, and outdated events.
+ */
+public class SchemaValidationException extends SchemaEvolutionException {
+
+    public SchemaValidationException(SchemaEvolutionErrorCode errorCode, 
String errorMessage) {
+        super(errorCode, errorMessage);
+    }
+
+    public SchemaValidationException(
+            SchemaEvolutionErrorCode errorCode, String errorMessage, Throwable 
cause) {
+        super(errorCode, errorMessage, cause);
+    }
+
+    public SchemaValidationException(
+            SchemaEvolutionErrorCode errorCode,
+            String errorMessage,
+            TableIdentifier tableIdentifier,
+            String jobId) {
+        super(errorCode, errorMessage, tableIdentifier, jobId);
+    }
+
+    public SchemaValidationException(
+            SchemaEvolutionErrorCode errorCode,
+            String errorMessage,
+            TableIdentifier tableIdentifier,
+            String jobId,
+            Throwable cause) {
+        super(errorCode, errorMessage, tableIdentifier, jobId, cause);
+    }
+
+    /** Create an exception for invalid schema structure */
+    public static SchemaValidationException invalidSchema(
+            TableIdentifier tableIdentifier, String jobId, String reason) {
+        String message = String.format("Invalid schema structure: %s", reason);
+        return new SchemaValidationException(
+                SchemaEvolutionErrorCode.INVALID_SCHEMA_STRUCTURE, message, 
tableIdentifier, jobId);
+    }
+
+    /** Create an exception for unsupported schema change types */
+    public static SchemaValidationException unsupportedChangeType(
+            TableIdentifier tableIdentifier, String jobId) {
+        return new SchemaValidationException(
+                SchemaEvolutionErrorCode.UNSUPPORTED_SCHEMA_CHANGE_TYPE,
+                "Schema change type '%s' is not supported",
+                tableIdentifier,
+                jobId);
+    }
+
+    /** Create an exception for outdated schema events */
+    public static SchemaValidationException outdatedEvent(
+            TableIdentifier tableIdentifier, String jobId, long eventTime, 
long lastProcessedTime) {
+        String message =
+                String.format(
+                        "Schema change event is outdated. Event time: %d, last 
processed: %d",
+                        eventTime, lastProcessedTime);
+        return new SchemaValidationException(
+                SchemaEvolutionErrorCode.OUTDATED_SCHEMA_EVENT, message, 
tableIdentifier, jobId);
+    }
+}
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/schema/exception/SinkWriterSchemaException.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/schema/exception/SinkWriterSchemaException.java
new file mode 100644
index 0000000000..864d79af0c
--- /dev/null
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/schema/exception/SinkWriterSchemaException.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.table.schema.exception;
+
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+
+/**
+ * Exception thrown when sink writer schema operations fail. This includes 
schema application
+ * failures, flush failures, and rollback issues.
+ */
+public class SinkWriterSchemaException extends SchemaEvolutionException {
+
+    public SinkWriterSchemaException(SchemaEvolutionErrorCode errorCode, 
String errorMessage) {
+        super(errorCode, errorMessage);
+    }
+
+    public SinkWriterSchemaException(
+            SchemaEvolutionErrorCode errorCode, String errorMessage, Throwable 
cause) {
+        super(errorCode, errorMessage, cause);
+    }
+
+    public SinkWriterSchemaException(
+            SchemaEvolutionErrorCode errorCode,
+            String errorMessage,
+            TableIdentifier tableIdentifier,
+            String jobId) {
+        super(errorCode, errorMessage, tableIdentifier, jobId);
+    }
+
+    public SinkWriterSchemaException(
+            SchemaEvolutionErrorCode errorCode,
+            String errorMessage,
+            TableIdentifier tableIdentifier,
+            String jobId,
+            Throwable cause) {
+        super(errorCode, errorMessage, tableIdentifier, jobId, cause);
+    }
+
+    /** Create an exception for schema application failures */
+    public static SinkWriterSchemaException applicationFailed(
+            TableIdentifier tableIdentifier, String jobId, String reason, 
Throwable cause) {
+        String message = String.format("Failed to apply schema change: %s", 
reason);
+        return new SinkWriterSchemaException(
+                SchemaEvolutionErrorCode.SCHEMA_CHANGE_APPLICATION_FAILED,
+                message,
+                tableIdentifier,
+                jobId,
+                cause);
+    }
+
+    /** Create an exception for flush operation failures */
+    public static SinkWriterSchemaException flushFailed(
+            TableIdentifier tableIdentifier, String jobId, String reason, 
Throwable cause) {
+        String message =
+                String.format("Flush operation failed during schema evolution: 
%s", reason);
+        return new SinkWriterSchemaException(
+                SchemaEvolutionErrorCode.FLUSH_OPERATION_FAILED,
+                message,
+                tableIdentifier,
+                jobId,
+                cause);
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriter.java
 
b/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriter.java
index 948ca620c4..afae87ecf2 100644
--- 
a/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriter.java
+++ 
b/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriter.java
@@ -22,7 +22,10 @@ import 
org.apache.seatunnel.shade.org.apache.commons.lang3.StringUtils;
 import org.apache.seatunnel.api.sink.SinkWriter;
 import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
 import org.apache.seatunnel.api.sink.SupportSchemaEvolutionSinkWriter;
+import org.apache.seatunnel.api.table.coordinator.SchemaCoordinator;
+import org.apache.seatunnel.api.table.schema.event.FlushEvent;
 import org.apache.seatunnel.api.table.schema.event.SchemaChangeEvent;
+import 
org.apache.seatunnel.api.table.schema.exception.SinkWriterSchemaException;
 import 
org.apache.seatunnel.api.table.schema.handler.DataTypeChangeEventDispatcher;
 import 
org.apache.seatunnel.api.table.schema.handler.DataTypeChangeEventHandler;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
@@ -34,6 +37,7 @@ import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
 
 import lombok.extern.slf4j.Slf4j;
 
+import java.io.IOException;
 import java.lang.reflect.Array;
 import java.util.ArrayList;
 import java.util.List;
@@ -47,6 +51,7 @@ public class ConsoleSinkWriter extends 
AbstractSinkWriter<SeaTunnelRow, Void>
     private final AtomicLong rowCounter = new AtomicLong(0);
     private final SinkWriter.Context context;
     private final DataTypeChangeEventHandler dataTypeChangeEventHandler;
+    private SchemaCoordinator schemaCoordinator;
 
     boolean isPrintData = true;
     int delayMs = 0;
@@ -65,14 +70,62 @@ public class ConsoleSinkWriter extends 
AbstractSinkWriter<SeaTunnelRow, Void>
     }
 
     @Override
-    public void applySchemaChange(SchemaChangeEvent event) {
+    public void applySchemaChange(SchemaChangeEvent event) throws IOException {
         log.info("changed rowType before: {}", fieldsInfo(seaTunnelRowType));
-        seaTunnelRowType = 
dataTypeChangeEventHandler.reset(seaTunnelRowType).apply(event);
-        log.info("changed rowType after: {}", fieldsInfo(seaTunnelRowType));
+        try {
+            seaTunnelRowType = 
dataTypeChangeEventHandler.reset(seaTunnelRowType).apply(event);
+            log.info("changed rowType after: {}", 
fieldsInfo(seaTunnelRowType));
+        } catch (Exception e) {
+            log.error(
+                    "ConsoleSinkWriter failed to apply schema change for 
table: {}",
+                    event.tableIdentifier(),
+                    e);
+            throw SinkWriterSchemaException.applicationFailed(
+                    event.tableIdentifier(),
+                    event.getJobId(),
+                    "Console sink writer schema change application failed",
+                    e);
+        }
+    }
+
+    @Override
+    public void handleFlushEvent(FlushEvent event) throws IOException {
+        log.info("ConsoleSinkWriter handling FlushEvent for table: {}", 
event.tableIdentifier());
+        try {
+            flushData();
+            log.info("ConsoleSinkWriter flush completed for table: {}", 
event.tableIdentifier());
+            sendFlushSuccessful(event);
+        } catch (Exception e) {
+            log.error("ConsoleSinkWriter flush failed for table: {}", 
event.tableIdentifier(), e);
+            throw SinkWriterSchemaException.flushFailed(
+                    event.tableIdentifier(), event.getJobId(), "Console flush 
operation failed", e);
+        }
+    }
+
+    @Override
+    public void sendFlushSuccessful(FlushEvent event) throws IOException {
+        log.info(
+                "ConsoleSinkWriter reporting flush success for table: {}", 
event.tableIdentifier());
+        SupportSchemaEvolutionSinkWriter.super.sendFlushSuccessful(event);
+    }
+
+    @Override
+    public SchemaCoordinator getSchemaCoordinator() {
+        return schemaCoordinator;
     }
 
     @Override
     public void write(SeaTunnelRow element) {
+        if (element != null && element.getOptions() != null) {
+            if (element.getOptions().containsKey("flush_event")
+                    || 
element.getOptions().containsKey("schema_change_event")) {
+                log.info(
+                        "ConsoleSinkWriter skipping schema event row: {}",
+                        element.getOptions().keySet());
+                return;
+            }
+        }
+
         String[] arr = new String[seaTunnelRowType.getTotalFields()];
         SeaTunnelDataType<?>[] fieldTypes = seaTunnelRowType.getFieldTypes();
         Object[] fields = element.getFields();
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcExactlyOnceSinkWriter.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcExactlyOnceSinkWriter.java
index d3f8862bc4..c97941f31e 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcExactlyOnceSinkWriter.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcExactlyOnceSinkWriter.java
@@ -24,6 +24,7 @@ import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.api.sink.SinkWriter;
 import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.catalog.TableSchema;
+import 
org.apache.seatunnel.api.table.schema.exception.SinkWriterSchemaException;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSinkConfig;
@@ -131,12 +132,53 @@ public class JdbcExactlyOnceSinkWriter extends 
AbstractJdbcSinkWriter<Void> {
 
     @Override
     public void write(SeaTunnelRow element) {
+        if (element != null && element.getOptions() != null) {
+            if (element.getOptions().containsKey("flush_event")
+                    || 
element.getOptions().containsKey("schema_change_event")) {
+                LOG.debug("Skipping schema change event row: {}", 
element.getOptions().keySet());
+                return;
+            }
+        }
+
         tryOpen();
         checkState(currentXid != null, "current xid must not be null");
         SeaTunnelRow copy = SerializationUtils.clone(element);
         outputFormat.writeRecord(copy);
     }
 
+    @Override
+    public void flushData() throws IOException {
+        tryOpen();
+        outputFormat.checkFlushException();
+        outputFormat.flush();
+    }
+
+    @Override
+    public void 
handleFlushEvent(org.apache.seatunnel.api.table.schema.event.FlushEvent event)
+            throws IOException {
+        LOG.info(
+                "JdbcExactlyOnceSinkWriter handling FlushEvent for table: {}",
+                event.tableIdentifier());
+        try {
+            tryOpen();
+            flushData();
+            LOG.info(
+                    "JdbcExactlyOnceSinkWriter flush completed for table: {}",
+                    event.tableIdentifier());
+            sendFlushSuccessful(event);
+        } catch (Exception e) {
+            LOG.error(
+                    "JdbcExactlyOnceSinkWriter flush failed for table: {}",
+                    event.tableIdentifier(),
+                    e);
+            throw SinkWriterSchemaException.flushFailed(
+                    event.tableIdentifier(),
+                    event.getJobId(),
+                    "Exactly-once JDBC flush operation failed",
+                    e);
+        }
+    }
+
     @Override
     public Optional<XidInfo> prepareCommit() throws IOException {
         tryOpen();
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java
index 3137f427e8..d367acb006 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java
@@ -22,6 +22,9 @@ import 
org.apache.seatunnel.shade.com.zaxxer.hikari.HikariDataSource;
 import org.apache.seatunnel.api.sink.MultiTableResourceManager;
 import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.coordinator.SchemaCoordinator;
+import org.apache.seatunnel.api.table.schema.event.FlushEvent;
+import 
org.apache.seatunnel.api.table.schema.exception.SinkWriterSchemaException;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSinkConfig;
@@ -44,6 +47,7 @@ import java.util.Optional;
 @Slf4j
 public class JdbcSinkWriter extends 
AbstractJdbcSinkWriter<ConnectionPoolManager> {
     private final Integer primaryKeyIndex;
+    private SchemaCoordinator schemaCoordinator;
 
     public JdbcSinkWriter(
             TablePath sinkTablePath,
@@ -136,10 +140,44 @@ public class JdbcSinkWriter extends 
AbstractJdbcSinkWriter<ConnectionPoolManager
 
     @Override
     public void write(SeaTunnelRow element) throws IOException {
+        if (element != null && element.getOptions() != null) {
+            if (element.getOptions().containsKey("flush_event")
+                    || 
element.getOptions().containsKey("schema_change_event")) {
+                log.debug("Skipping schema change event row: {}", 
element.getOptions().keySet());
+                return;
+            }
+        }
+
         tryOpen();
         outputFormat.writeRecord(element);
     }
 
+    @Override
+    public void handleFlushEvent(FlushEvent event) throws IOException {
+        log.info("JdbcSinkWriter handling FlushEvent for table: {}", 
event.tableIdentifier());
+        try {
+            flushData();
+            log.info("JdbcSinkWriter flush completed for table: {}", 
event.tableIdentifier());
+            sendFlushSuccessful(event);
+        } catch (Exception e) {
+            log.error("JdbcSinkWriter flush failed for table: {}", 
event.tableIdentifier(), e);
+            throw SinkWriterSchemaException.flushFailed(
+                    event.tableIdentifier(), event.getJobId(), "JDBC flush 
operation failed", e);
+        }
+    }
+
+    @Override
+    public void flushData() throws IOException {
+        tryOpen();
+        outputFormat.checkFlushException();
+        outputFormat.flush();
+    }
+
+    @Override
+    public SchemaCoordinator getSchemaCoordinator() {
+        return schemaCoordinator;
+    }
+
     @Override
     public Optional<XidInfo> prepareCommit() throws IOException {
         tryOpen();
diff --git 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SchemaOperator.java
 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SchemaOperator.java
new file mode 100644
index 0000000000..0302a98957
--- /dev/null
+++ 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SchemaOperator.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.core.starter.flink.execution;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import org.apache.seatunnel.api.source.SupportSchemaEvolution;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.coordinator.SchemaCoordinator;
+import org.apache.seatunnel.api.table.coordinator.SchemaResponse;
+import org.apache.seatunnel.api.table.schema.SchemaChangeType;
+import org.apache.seatunnel.api.table.schema.event.FlushEvent;
+import org.apache.seatunnel.api.table.schema.event.SchemaChangeEvent;
+import org.apache.seatunnel.api.table.schema.event.TableEvent;
+import 
org.apache.seatunnel.api.table.schema.exception.SchemaCoordinationException;
+import 
org.apache.seatunnel.api.table.schema.exception.SchemaEvolutionErrorCode;
+import 
org.apache.seatunnel.api.table.schema.exception.SchemaValidationException;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+
+/** operators added to the source and transformer pipelines to handle schema 
evolution */
+@Slf4j
+public class SchemaOperator extends AbstractStreamOperator<SeaTunnelRow>
+        implements OneInputStreamOperator<SeaTunnelRow, SeaTunnelRow> {
+    private final Map<TableIdentifier, CatalogTable> localSchemaState;
+    private String jobId;
+    private final SupportSchemaEvolution source;
+    private final Config pluginConfig;
+    private SchemaCoordinator schemaCoordinator;
+    private final AtomicReference<CompletableFuture<SchemaResponse>> 
currentSchemaChangeFuture =
+            new AtomicReference<>();
+    private volatile Long lastProcessedEventTime;
+
+    public SchemaOperator(String jobId, SupportSchemaEvolution source, Config 
pluginConfig) {
+        this.jobId = jobId;
+        this.source = source;
+        this.pluginConfig = pluginConfig;
+        this.localSchemaState = new ConcurrentHashMap<>();
+        this.schemaCoordinator = SchemaCoordinator.getOrCreateInstance(jobId);
+    }
+
+    @Override
+    public void open() throws Exception {
+        super.open();
+        try {
+            String flinkJobId = getRuntimeContext().getJobId().toString();
+            if (!flinkJobId.equals(this.jobId)) {
+                log.info(
+                        "Updating SchemaCoordinator from SeaTunnel jobId {} to 
Flink jobId {}",
+                        this.jobId,
+                        flinkJobId);
+                this.jobId = flinkJobId;
+                this.schemaCoordinator = 
SchemaCoordinator.getOrCreateInstance(flinkJobId);
+            }
+        } catch (Exception e) {
+            log.warn("Failed to get Flink jobId, using SeaTunnel jobId: {}", 
this.jobId, e);
+        }
+    }
+
+    @Override
+    public void processElement(StreamRecord<SeaTunnelRow> streamRecord) throws 
Exception {
+        SeaTunnelRow element = streamRecord.getValue();
+
+        if (!isSchemaEvolutionEnabled(pluginConfig)) {
+            output.collect(new StreamRecord<>(element, 
streamRecord.getTimestamp()));
+            return;
+        }
+
+        if ("__SCHEMA_CHANGE_EVENT__".equals(element.getTableId())
+                && element.getOptions() != null) {
+            Object object = element.getOptions().get("schema_change_event");
+            if (object instanceof SchemaChangeEvent) {
+                handleSchemaChangeEvent((SchemaChangeEvent) object);
+                return;
+            }
+        }
+
+        output.collect(new StreamRecord<>(element, 
streamRecord.getTimestamp()));
+    }
+
+    private void handleSchemaChangeEvent(SchemaChangeEvent schemaChangeEvent) 
throws Exception {
+        List<SchemaChangeType> supportedTypes = source.supports();
+        if (supportedTypes == null || supportedTypes.isEmpty()) {
+            log.info(
+                    "Source: {} does not support any schema change types, 
skipping schema change event",
+                    source);
+            return;
+        }
+
+        if (!isSchemaChangeSupported(schemaChangeEvent, supportedTypes)) {
+            log.warn(
+                    "Schema change type {} not supported by source {}, 
skipping",
+                    schemaChangeEvent.getEventType(),
+                    source);
+            return;
+        }
+
+        processSchemaChangeEvent(schemaChangeEvent);
+    }
+
+    private boolean isSchemaEvolutionEnabled(Config pluginConfig) {
+        if (pluginConfig.hasPath("schema-changes.enabled")) {
+            return pluginConfig.getBoolean("schema-changes.enabled");
+        }
+
+        if (pluginConfig.hasPath("debezium")) {
+            Config debeziumConfig = pluginConfig.getConfig("debezium");
+            if (debeziumConfig.hasPath("schema.changes.enabled")) {
+                return debeziumConfig.getBoolean("schema.changes.enabled");
+            }
+        }
+
+        return false;
+    }
+
+    private boolean isSchemaChangeSupported(
+            SchemaChangeEvent event, List<SchemaChangeType> supportedTypes) {
+        switch (event.getEventType()) {
+            case SCHEMA_CHANGE_ADD_COLUMN:
+                return supportedTypes.contains(SchemaChangeType.ADD_COLUMN);
+            case SCHEMA_CHANGE_DROP_COLUMN:
+                return supportedTypes.contains(SchemaChangeType.DROP_COLUMN);
+            case SCHEMA_CHANGE_MODIFY_COLUMN:
+                return supportedTypes.contains(SchemaChangeType.UPDATE_COLUMN);
+            case SCHEMA_CHANGE_CHANGE_COLUMN:
+                return supportedTypes.contains(SchemaChangeType.RENAME_COLUMN);
+            case SCHEMA_CHANGE_UPDATE_COLUMNS:
+                return supportedTypes.contains(SchemaChangeType.ADD_COLUMN)
+                        || 
supportedTypes.contains(SchemaChangeType.DROP_COLUMN)
+                        || 
supportedTypes.contains(SchemaChangeType.UPDATE_COLUMN)
+                        || 
supportedTypes.contains(SchemaChangeType.RENAME_COLUMN);
+            default:
+                log.error("Unknown schema change event type: {}", 
event.getEventType());
+                throw SchemaValidationException.unsupportedChangeType(
+                        event.tableIdentifier(), jobId);
+        }
+    }
+
+    private void processSchemaChangeEvent(SchemaChangeEvent schemaChangeEvent) 
throws Exception {
+        TableIdentifier tableId = schemaChangeEvent.tableIdentifier();
+        long eventTime = schemaChangeEvent.getCreatedTime();
+
+        if (lastProcessedEventTime != null && eventTime <= 
lastProcessedEventTime) {
+            throw SchemaValidationException.outdatedEvent(
+                    tableId, jobId, eventTime, lastProcessedEventTime);
+        }
+
+        // set the jobId for the schema change event to ensure proper 
coordination
+        if (schemaChangeEvent instanceof TableEvent) {
+            schemaChangeEvent.setJobId(jobId);
+        }
+
+        log.info(
+                "Handling schema change event for table: {}, job: {}, event 
time: {}",
+                tableId,
+                jobId,
+                eventTime);
+
+        try {
+            CompletableFuture<SchemaResponse> schemaChangeFuture =
+                    schemaCoordinator.requestSchemaChange(
+                            tableId, jobId, 
schemaChangeEvent.getChangeAfter(), 1);
+            currentSchemaChangeFuture.set(schemaChangeFuture);
+            sendFlushEventToDownstream(schemaChangeEvent);
+
+            SchemaResponse response;
+            try {
+                response = schemaChangeFuture.get(60, TimeUnit.SECONDS);
+            } catch (TimeoutException e) {
+                log.error(
+                        "Schema change timeout for table: {}, directly failing 
the job",
+                        tableId,
+                        e);
+                throw SchemaCoordinationException.timeout(tableId, jobId, 60, 
e);
+            }
+
+            if (!response.isSuccess()) {
+                log.error(
+                        "Schema change failed for table: {}, directly failing 
the job. Error: {}",
+                        tableId,
+                        response.getMessage());
+                throw new SchemaCoordinationException(
+                        
SchemaEvolutionErrorCode.SCHEMA_CHANGE_COORDINATION_FAILED,
+                        "Schema change coordination failed: " + 
response.getMessage(),
+                        tableId,
+                        jobId);
+            }
+
+            sendSchemaChangeEventToDownstream(schemaChangeEvent);
+            updateLocalSchemaState(tableId, 
schemaChangeEvent.getChangeAfter());
+
+            lastProcessedEventTime = eventTime;
+            log.info("Schema change completed successfully for table: {}", 
tableId);
+        } catch (Exception e) {
+            log.error("Schema change failed for table: {}, directly failing 
the job", tableId, e);
+            throw e;
+        } finally {
+            currentSchemaChangeFuture.set(null);
+        }
+    }
+
+    private void sendFlushEventToDownstream(SchemaChangeEvent 
schemaChangeEvent) {
+        log.info("Send FlushEvent to downstream...");
+        FlushEvent flushEvent = new FlushEvent(schemaChangeEvent);
+
+        SeaTunnelRow flushRow = new SeaTunnelRow(0);
+        Map<String, Object> options = new HashMap<>();
+        options.put("flush_event", flushEvent);
+        flushRow.setOptions(options);
+
+        output.collect(new StreamRecord<>(flushRow));
+        log.info(
+                "FlushEvent sent to downstream for table: {}", 
schemaChangeEvent.tableIdentifier());
+    }
+
+    private void sendSchemaChangeEventToDownstream(SchemaChangeEvent 
schemaChangeEvent) {
+        log.info("Send SchemaChangeEvent to downstream...");
+
+        SeaTunnelRow schemaChangeRow = new SeaTunnelRow(0);
+        Map<String, Object> options = new HashMap<>();
+        options.put("schema_change_event", schemaChangeEvent);
+        schemaChangeRow.setOptions(options);
+
+        output.collect(new StreamRecord<>(schemaChangeRow));
+    }
+
+    private void updateLocalSchemaState(TableIdentifier tableId, CatalogTable 
newSchema) {
+        if (newSchema != null) {
+            localSchemaState.put(tableId, newSchema);
+            log.debug("Updated local schema state for table: {}", tableId);
+        }
+    }
+
+    @Override
+    public void close() throws Exception {
+        try {
+            CompletableFuture<SchemaResponse> future = 
currentSchemaChangeFuture.get();
+            if (future != null && !future.isDone()) {
+                log.info("Cancelling ongoing schema change request during 
close");
+                future.cancel(true);
+            }
+
+            if (jobId != null) {
+                SchemaCoordinator.removeInstance(jobId);
+                log.info("Removed SchemaCoordinator instance for job: {}", 
jobId);
+            }
+        } catch (Exception e) {
+            log.warn("Error during SchemaOperator cleanup", e);
+        } finally {
+            super.close();
+        }
+    }
+}
diff --git 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
index fbf3ea4098..f3ea44cbf0 100644
--- 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
+++ 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
@@ -29,6 +29,7 @@ import org.apache.seatunnel.api.sink.SaveModeHandler;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SupportMultiTableSink;
 import org.apache.seatunnel.api.sink.SupportSaveMode;
+import org.apache.seatunnel.api.sink.SupportSchemaEvolutionSink;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.TableIdentifier;
 import org.apache.seatunnel.api.table.catalog.TablePath;
@@ -122,12 +123,24 @@ public class SinkExecuteProcessor
         DataStreamTableInfo input = 
upstreamDataStreams.get(upstreamDataStreams.size() - 1);
         Function<PluginIdentifier, SeaTunnelSink> fallbackCreateSink =
                 sinkPluginDiscovery::createPluginInstance;
+
         for (int i = 0; i < plugins.size(); i++) {
             Optional<? extends Factory> factory = plugins.get(i);
             Config sinkConfig = pluginConfigs.get(i);
             DataStreamTableInfo stream =
                     fromSourceTable(sinkConfig, 
upstreamDataStreams).orElse(input);
             Map<TablePath, SeaTunnelSink> sinks = new HashMap<>();
+
+            // calculate sink parallelism
+            boolean sinkParallelism = 
sinkConfig.hasPath(EnvCommonOptions.PARALLELISM.key());
+            boolean envParallelism = 
envConfig.hasPath(EnvCommonOptions.PARALLELISM.key());
+            int parallelism =
+                    sinkParallelism
+                            ? 
sinkConfig.getInt(EnvCommonOptions.PARALLELISM.key())
+                            : envParallelism
+                                    ? 
envConfig.getInt(EnvCommonOptions.PARALLELISM.key())
+                                    : 1;
+
             for (CatalogTable catalogTable : stream.getCatalogTables()) {
                 SeaTunnelSink sink =
                         FactoryUtil.createAndPrepareSink(
@@ -139,20 +152,20 @@ public class SinkExecuteProcessor
                                 ((TableSinkFactory) (factory.orElse(null))));
                 sink.setJobContext(jobContext);
                 handleSaveMode(sink);
+
+                // check if sink supports schema evolution
+                if (sink instanceof SupportSchemaEvolutionSink) {
+                    parallelism = 1;
+                    sinkParallelism = true;
+                }
+
                 TableIdentifier tableId = catalogTable.getTableId();
                 sinks.put(tableId.toTablePath(), sink);
             }
             SeaTunnelSink sink =
                     tryGenerateMultiTableSink(
                             sinks, ReadonlyConfig.fromConfig(sinkConfig), 
classLoader);
-            boolean sinkParallelism = 
sinkConfig.hasPath(EnvCommonOptions.PARALLELISM.key());
-            boolean envParallelism = 
envConfig.hasPath(EnvCommonOptions.PARALLELISM.key());
-            int parallelism =
-                    sinkParallelism
-                            ? 
sinkConfig.getInt(EnvCommonOptions.PARALLELISM.key())
-                            : envParallelism
-                                    ? 
envConfig.getInt(EnvCommonOptions.PARALLELISM.key())
-                                    : 1;
+
             DataStreamSink<SeaTunnelRow> dataStreamSink =
                     stream.getDataStream()
                             .sinkTo(
diff --git 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
index 182592e9ca..219e5741f9 100644
--- 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
+++ 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
@@ -26,6 +26,7 @@ import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.options.EnvCommonOptions;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.source.SupportSchemaEvolution;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.factory.FactoryUtil;
 import org.apache.seatunnel.api.table.factory.TableSourceFactory;
@@ -38,6 +39,8 @@ import 
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSourcePluginDisc
 import org.apache.seatunnel.translation.flink.source.FlinkSource;
 
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 
@@ -89,11 +92,33 @@ public class SourceExecuteProcessor extends 
FlinkAbstractPluginExecuteProcessor<
                 int parallelism = 
pluginConfig.getInt(EnvCommonOptions.PARALLELISM.key());
                 sourceStream.setParallelism(parallelism);
             }
-            sources.add(
-                    new DataStreamTableInfo(
-                            sourceStream,
-                            sourceTableInfo.getCatalogTables(),
-                            
ReadonlyConfig.fromConfig(pluginConfig).get(PLUGIN_OUTPUT)));
+
+            // add schema evolution functionality to cdc source
+            DataStream<SeaTunnelRow> evolvedStream = null;
+            if (sourceTableInfo.getSource() instanceof SupportSchemaEvolution) 
{
+                evolvedStream =
+                        sourceStream.transform(
+                                "schema-evolution",
+                                TypeInformation.of(SeaTunnelRow.class),
+                                new SchemaOperator(
+                                        jobContext.getJobId(),
+                                        (SupportSchemaEvolution) 
sourceTableInfo.getSource(),
+                                        pluginConfig));
+            }
+
+            if (evolvedStream != null) {
+                sources.add(
+                        new DataStreamTableInfo(
+                                evolvedStream,
+                                sourceTableInfo.getCatalogTables(),
+                                
ReadonlyConfig.fromConfig(pluginConfig).get(PLUGIN_OUTPUT)));
+            } else {
+                sources.add(
+                        new DataStreamTableInfo(
+                                sourceStream,
+                                sourceTableInfo.getCatalogTables(),
+                                
ReadonlyConfig.fromConfig(pluginConfig).get(PLUGIN_OUTPUT)));
+            }
         }
         return sources;
     }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCWithFlinkSchemaChangeIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCWithFlinkSchemaChangeIT.java
new file mode 100644
index 0000000000..7b960bf13c
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCWithFlinkSchemaChangeIT.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cdc.mysql;
+
+import 
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlContainer;
+import 
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlVersion;
+import 
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.UniqueDatabase;
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
+import org.apache.seatunnel.e2e.common.container.EngineType;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
+import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.MethodOrderer;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.TestMethodOrder;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+import static org.awaitility.Awaitility.await;
+
+@Slf4j
+@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
+@DisabledOnContainer(
+        value = {},
+        type = {EngineType.SPARK},
+        disabledReason =
+                "Currently SPARK do not support cdc. In addition, the Flink 
engine only supports configuring single parallelism in the configuration file, 
and currently does not support multi-parallelism scenarios.")
+public class MysqlCDCWithFlinkSchemaChangeIT extends TestSuiteBase implements 
TestResource {
+    private static final String MYSQL_DATABASE = "shop";
+    private static final String SOURCE_TABLE = "products";
+    private static final String SINK_TABLE = 
"mysql_cdc_e2e_sink_table_with_schema_change";
+    private static final String SINK_TABLE2 =
+            "mysql_cdc_e2e_sink_table_with_schema_change_exactly_once";
+    private static final String MYSQL_HOST = "mysql_cdc_e2e";
+    private static final String MYSQL_USER_NAME = "mysqluser";
+    private static final String MYSQL_USER_PASSWORD = "mysqlpw";
+
+    private static final String QUERY = "select * from %s.%s";
+    private static final String DESC = "desc %s.%s";
+    private static final String PROJECTION_QUERY =
+            "select 
id,name,description,weight,add_column1,add_column2,add_column3 from %s.%s;";
+
+    private static final MySqlContainer MYSQL_CONTAINER = 
createMySqlContainer(MySqlVersion.V8_0);
+
+    private final UniqueDatabase shopDatabase =
+            new UniqueDatabase(
+                    MYSQL_CONTAINER, MYSQL_DATABASE, "mysqluser", "mysqlpw", 
MYSQL_DATABASE);
+
+    private static MySqlContainer createMySqlContainer(MySqlVersion version) {
+        return new MySqlContainer(version)
+                .withConfigurationOverride("docker/server-gtids/my.cnf")
+                .withSetupSQL("docker/setup.sql")
+                .withNetwork(NETWORK)
+                .withNetworkAliases(MYSQL_HOST)
+                .withDatabaseName(MYSQL_DATABASE)
+                .withUsername(MYSQL_USER_NAME)
+                .withPassword(MYSQL_USER_PASSWORD)
+                .withLogConsumer(
+                        new 
Slf4jLogConsumer(DockerLoggerFactory.getLogger("mysql-docker-image")));
+    }
+
+    private String driverUrl() {
+        return 
"https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.0.32/mysql-connector-j-8.0.32.jar";;
+    }
+
+    @TestContainerExtension
+    protected final ContainerExtendedFactory extendedFactory =
+            container -> {
+                Container.ExecResult extraCommands =
+                        container.execInContainer(
+                                "bash",
+                                "-c",
+                                "mkdir -p /tmp/seatunnel/plugins/MySQL-CDC/lib 
&& cd /tmp/seatunnel/plugins/MySQL-CDC/lib && wget "
+                                        + driverUrl());
+                Assertions.assertEquals(0, extraCommands.getExitCode(), 
extraCommands.getStderr());
+            };
+
+    @Order(1)
+    @TestTemplate
+    public void testMysqlCdcWithSchemaEvolutionCase(TestContainer container) {
+        // Reset database to initial state to avoid issues from previous test 
runs
+        resetDatabaseToInitialState();
+
+        String jobConfigFile = 
"/mysqlcdc_to_mysql_with_flink_schema_change.conf";
+        CompletableFuture.runAsync(
+                () -> {
+                    try {
+                        container.executeJob(jobConfigFile);
+                    } catch (Exception e) {
+                        log.error("Commit task exception :" + e.getMessage());
+                        throw new RuntimeException(e);
+                    }
+                });
+
+        // waiting for case1 completed
+        assertSchemaEvolutionForAddColumns(MYSQL_DATABASE, SOURCE_TABLE, 
SINK_TABLE);
+
+        // case2 drop columns with cdc data at same time
+        shopDatabase.setTemplateName("drop_columns").createAndInitialize();
+
+        // waiting for case2 completed
+        assertTableStructureAndData(MYSQL_DATABASE, SOURCE_TABLE, SINK_TABLE);
+
+        // case3 change column name with cdc data at same time
+        shopDatabase.setTemplateName("change_columns").createAndInitialize();
+
+        // case4 modify column data type with cdc data at same time
+        shopDatabase.setTemplateName("modify_columns").createAndInitialize();
+
+        // waiting for case3/case4 completed
+        assertTableStructureAndData(MYSQL_DATABASE, SOURCE_TABLE, SINK_TABLE);
+    }
+
+    private void assertSchemaEvolutionForAddColumns(
+            String database, String sourceTable, String sinkTable) {
+        await().atMost(180000, TimeUnit.MILLISECONDS)
+                .untilAsserted(
+                        () ->
+                                Assertions.assertIterableEquals(
+                                        query(String.format(QUERY, database, 
sourceTable)),
+                                        query(String.format(QUERY, database, 
sinkTable))));
+
+        // case1 add columns with cdc data at same time
+        shopDatabase.setTemplateName("add_columns").createAndInitialize();
+        await().atMost(180000, TimeUnit.MILLISECONDS)
+                .untilAsserted(
+                        () ->
+                                Assertions.assertIterableEquals(
+                                        query(String.format(DESC, database, 
sourceTable)),
+                                        query(String.format(DESC, database, 
sinkTable))));
+        await().atMost(180000, TimeUnit.MILLISECONDS)
+                .untilAsserted(
+                        () -> {
+                            Assertions.assertIterableEquals(
+                                    query(
+                                            String.format(QUERY, database, 
sourceTable)
+                                                    + " where id >= 128"),
+                                    query(
+                                            String.format(QUERY, database, 
sinkTable)
+                                                    + " where id >= 128"));
+
+                            Assertions.assertIterableEquals(
+                                    query(String.format(PROJECTION_QUERY, 
database, sourceTable)),
+                                    query(String.format(PROJECTION_QUERY, 
database, sinkTable)));
+
+                            // The default value of add_column4 is 
current_timestamp(),so the
+                            // history data of sink table with this column may 
be different from the
+                            // source table because delay of apply schema 
change.
+                            String query =
+                                    String.format(
+                                            "SELECT t1.id AS table1_id, 
t1.add_column4 AS table1_timestamp, "
+                                                    + "t2.id AS table2_id, 
t2.add_column4 AS table2_timestamp, "
+                                                    + 
"ABS(TIMESTAMPDIFF(SECOND, t1.add_column4, t2.add_column4)) AS time_diff "
+                                                    + "FROM %s.%s t1 "
+                                                    + "INNER JOIN %s.%s t2 ON 
t1.id = t2.id",
+                                            database, sourceTable, database, 
sinkTable);
+                            try (Connection jdbcConnection = 
getJdbcConnection();
+                                    Statement statement = 
jdbcConnection.createStatement();
+                                    ResultSet resultSet = 
statement.executeQuery(query); ) {
+                                while (resultSet.next()) {
+                                    int timeDiff = 
resultSet.getInt("time_diff");
+                                    Assertions.assertTrue(
+                                            timeDiff <= 3,
+                                            "Time difference exceeds 3 
seconds: "
+                                                    + timeDiff
+                                                    + " seconds");
+                                }
+                            }
+                        });
+    }
+
+    private void assertTableStructureAndData(
+            String database, String sourceTable, String sinkTable) {
+        await().atMost(180000, TimeUnit.MILLISECONDS)
+                .untilAsserted(
+                        () ->
+                                Assertions.assertIterableEquals(
+                                        query(String.format(DESC, database, 
sourceTable)),
+                                        query(String.format(DESC, database, 
sinkTable))));
+        await().atMost(180000, TimeUnit.MILLISECONDS)
+                .untilAsserted(
+                        () ->
+                                Assertions.assertIterableEquals(
+                                        query(String.format(QUERY, database, 
sourceTable)),
+                                        query(String.format(QUERY, database, 
sinkTable))));
+    }
+
+    private Connection getJdbcConnection() throws SQLException {
+        return DriverManager.getConnection(
+                MYSQL_CONTAINER.getJdbcUrl(),
+                MYSQL_CONTAINER.getUsername(),
+                MYSQL_CONTAINER.getPassword());
+    }
+
+    @BeforeAll
+    @Override
+    public void startUp() {
+        log.info("The second stage: Starting Mysql containers...");
+        Startables.deepStart(Stream.of(MYSQL_CONTAINER)).join();
+        log.info("Mysql Containers are started");
+        shopDatabase.createAndInitialize();
+        log.info("Mysql ddl execution is complete");
+    }
+
+    @AfterAll
+    @Override
+    public void tearDown() {
+        if (MYSQL_CONTAINER != null) {
+            MYSQL_CONTAINER.close();
+        }
+    }
+
+    private void resetDatabaseToInitialState() {
+        try {
+            log.info("Resetting database to initial state...");
+            // Reset to original template and recreate database
+            shopDatabase.setTemplateName(MYSQL_DATABASE).createAndInitialize();
+            log.info("Database reset to initial state completed");
+        } catch (Exception e) {
+            log.error("Failed to reset database to initial state", e);
+            throw new RuntimeException("Failed to reset database to initial 
state", e);
+        }
+    }
+
+    private List<List<Object>> query(String sql) {
+        try (Connection connection = getJdbcConnection()) {
+            ResultSet resultSet = 
connection.createStatement().executeQuery(sql);
+            List<List<Object>> result = new ArrayList<>();
+            int columnCount = resultSet.getMetaData().getColumnCount();
+            while (resultSet.next()) {
+                ArrayList<Object> objects = new ArrayList<>();
+                for (int i = 1; i <= columnCount; i++) {
+                    objects.add(resultSet.getObject(i));
+                }
+                log.debug(String.format("Print MySQL-CDC query, sql: %s, data: 
%s", sql, objects));
+                result.add(objects);
+            }
+            return result;
+        } catch (SQLException e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/initial.sql
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/initial.sql
new file mode 100644
index 0000000000..17c3d05a4f
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/initial.sql
@@ -0,0 +1,60 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+-- 
----------------------------------------------------------------------------------------------------------------
+-- DATABASE:  $DBNAME$ (schema_evolution_test)
+-- 
----------------------------------------------------------------------------------------------------------------
+
+-- Create the initial products table for schema evolution testing
+CREATE TABLE products (
+  id INT PRIMARY KEY,
+  name VARCHAR(100) NOT NULL,
+  description TEXT,
+  weight DECIMAL(10,2),
+  price DECIMAL(10,2) NOT NULL
+);
+
+-- Create products_on_hand table for testing
+CREATE TABLE products_on_hand (
+  product_id INTEGER NOT NULL PRIMARY KEY,
+  quantity INTEGER NOT NULL,
+  FOREIGN KEY (product_id) REFERENCES products(id)
+);
+
+-- Insert initial test data
+INSERT INTO products VALUES 
+(101, 'scooter', 'Small 2-wheel scooter', 3.14, 15.99),
+(102, 'car battery', '12V car battery', 8.1, 89.99),
+(103, '12-pack drill bits', '12-pack of drill bits with sizes ranging from #40 
to #3', 0.8, 19.99),
+(104, 'hammer', '12oz carpenter''s hammer', 0.75, 12.99),
+(105, 'hammer', '14oz carpenter''s hammer', 0.875, 14.99),
+(106, 'hammer', '16oz carpenter''s hammer', 1.0, 16.99),
+(107, 'rocks', 'box of assorted rocks', 5.3, 9.99),
+(108, 'jacket', 'water resistent black wind breaker', 0.1, 39.99),
+(109, 'spare tire', '24 inch spare tire', 22.2, 99.99);
+
+-- Insert initial inventory data
+INSERT INTO products_on_hand VALUES 
+(101, 3),
+(102, 8),
+(103, 18),
+(104, 4),
+(105, 5),
+(106, 0),
+(107, 44),
+(108, 2),
+(109, 5);
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/rename_columns.sql
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/rename_columns.sql
new file mode 100644
index 0000000000..8175e82879
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/rename_columns.sql
@@ -0,0 +1,29 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+-- 
----------------------------------------------------------------------------------------------------------------
+-- DATABASE:  $DBNAME$ (schema_evolution_test) - RENAME COLUMNS
+-- 
----------------------------------------------------------------------------------------------------------------
+
+-- Rename columns in products table
+ALTER TABLE products CHANGE COLUMN description product_description TEXT;
+ALTER TABLE products CHANGE COLUMN weight product_weight DECIMAL(10,2);
+
+-- Insert additional test data to verify rename functionality
+INSERT INTO products VALUES 
+(110, 'tablet', 'Android tablet with 10-inch screen', 1.2, 299.99),
+(111, 'keyboard', 'Wireless bluetooth keyboard', 0.8, 59.99);
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_flink_schema_change.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_flink_schema_change.conf
new file mode 100644
index 0000000000..9dd595f547
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_flink_schema_change.conf
@@ -0,0 +1,53 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in 
seatunnel config
+######
+
+env {
+  # You can set engine configuration here
+  parallelism = 1
+  job.mode = "STREAMING"
+  checkpoint.interval = 5000
+  read_limit.bytes_per_second=7000000
+  read_limit.rows_per_second=400
+}
+
+source {
+  MySQL-CDC {
+    server-id = 5652-5657
+    username = "st_user_source"
+    password = "mysqlpw"
+    table-names = ["shop.products"]
+    url = "jdbc:mysql://mysql_cdc_e2e:3306/shop"
+
+    schema-changes.enabled = true
+  }
+}
+
+sink {
+  jdbc {
+    url = "jdbc:mysql://mysql_cdc_e2e:3306/shop"
+    driver = "com.mysql.cj.jdbc.Driver"
+    user = "st_user_sink"
+    password = "mysqlpw"
+    generate_sink_sql = true
+    database = shop
+    table = mysql_cdc_e2e_sink_table_with_schema_change
+    primary_keys = ["id"]
+  }
+}
diff --git 
a/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/mysqlcdc_to_mysql_schema_evolution.conf
 
b/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/mysqlcdc_to_mysql_schema_evolution.conf
new file mode 100644
index 0000000000..ca7d7b515c
--- /dev/null
+++ 
b/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/mysqlcdc_to_mysql_schema_evolution.conf
@@ -0,0 +1,52 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+env {
+  # You can set engine configuration here
+  parallelism = 1
+  job.mode = "STREAMING"
+  checkpoint.interval = 5000
+  read_limit.bytes_per_second=7000000
+  read_limit.rows_per_second=400
+}
+
+source {
+  MySQL-CDC {
+    server-id = 5652
+    username = mysqluser
+    password = mysqlpw
+    table-names = ["schema_test.evolving_table"]
+    base-url = "jdbc:mysql://mysql_cdc_schema_evolution_e2e:3306/schema_test"
+    schema-changes.enabled = true
+  }
+}
+
+sink {
+  jdbc {
+    url = "jdbc:mysql://mysql_cdc_schema_evolution_e2e:3306/sink_test"
+    driver = "com.mysql.cj.jdbc.Driver"
+    user = mysqluser
+    password = mysqlpw
+    generate_sink_sql = true
+    database = sink_test
+    table = evolving_table
+    primary_keys = ["id"]
+    xa_data_source_class_name = "com.mysql.cj.jdbc.MysqlXADataSource"
+    
+    # Schema evolution configuration
+    schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
+    data_save_mode = "APPEND_DATA"
+  }
+}
\ No newline at end of file
diff --git 
a/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/schema_evolution_test.conf
 
b/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/schema_evolution_test.conf
new file mode 100644
index 0000000000..6749389427
--- /dev/null
+++ 
b/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/schema_evolution_test.conf
@@ -0,0 +1,42 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  # You can set engine configuration here
+  parallelism = 1
+  job.mode = "STREAMING"
+  checkpoint.interval = 5000
+  read_limit.bytes_per_second=7000000
+  read_limit.rows_per_second=400
+}
+
+source {
+  MySQL-CDC {
+    server-id = 5653
+    username = mysqluser
+    password = mysqlpw
+    table-names = ["schema_test.evolving_table"]
+    base-url = 
"jdbc:mysql://mysql_cdc_schema_evolution_e2e:3306/schema_test?useSSL=false&serverTimezone=UTC"
+
+    schema-changes.enabled = true
+  }
+}
+
+sink {
+  Console {
+  }
+}
\ No newline at end of file
diff --git 
a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java
 
b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java
index 3ee6e3533c..b5dd8567e0 100644
--- 
a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java
+++ 
b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java
@@ -23,7 +23,12 @@ import org.apache.seatunnel.api.common.metrics.MetricNames;
 import org.apache.seatunnel.api.common.metrics.MetricsContext;
 import org.apache.seatunnel.api.sink.MultiTableResourceManager;
 import org.apache.seatunnel.api.sink.SupportResourceShare;
+import org.apache.seatunnel.api.sink.SupportSchemaEvolutionSinkWriter;
 import org.apache.seatunnel.api.sink.event.WriterCloseEvent;
+import org.apache.seatunnel.api.table.schema.event.FlushEvent;
+import org.apache.seatunnel.api.table.schema.event.SchemaChangeEvent;
+import 
org.apache.seatunnel.api.table.schema.exception.SchemaEvolutionErrorCode;
+import 
org.apache.seatunnel.api.table.schema.exception.SinkWriterSchemaException;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 
 import org.apache.flink.api.connector.sink.Sink;
@@ -35,6 +40,7 @@ import java.io.IOException;
 import java.io.InvalidClassException;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.stream.Collectors;
 
@@ -89,6 +95,60 @@ public class FlinkSinkWriter<InputT, CommT, WriterStateT>
             return;
         }
         if (element instanceof SeaTunnelRow) {
+            SeaTunnelRow seaTunnelRow = (SeaTunnelRow) element;
+            Map<String, Object> options = seaTunnelRow.getOptions();
+
+            if (options != null && options.containsKey("flush_event")) {
+                FlushEvent flushEvent = (FlushEvent) 
options.get("flush_event");
+                log.info(
+                        "FlinkSinkWriter detected FlushEvent for table: {}",
+                        flushEvent.tableIdentifier());
+
+                if (sinkWriter instanceof SupportSchemaEvolutionSinkWriter) {
+                    try {
+                        ((SupportSchemaEvolutionSinkWriter) sinkWriter)
+                                .handleFlushEvent(flushEvent);
+                        log.info(
+                                "FlinkSinkWriter handled FlushEvent for table: 
{}",
+                                flushEvent.tableIdentifier());
+                    } catch (Exception e) {
+                        log.error("Failed to handle flush event", e);
+                        throw new SinkWriterSchemaException(
+                                
SchemaEvolutionErrorCode.FLUSH_EVENT_PROCESSING_FAILED,
+                                "Failed to handle flush event in Flink sink 
writer",
+                                flushEvent.tableIdentifier(),
+                                flushEvent.getJobId(),
+                                e);
+                    }
+                }
+            }
+
+            if (options != null && options.containsKey("schema_change_event")) 
{
+                SchemaChangeEvent schemaChangeEvent =
+                        (SchemaChangeEvent) options.get("schema_change_event");
+                log.info(
+                        "FlinkSinkWriter detected SchemaChangeEvent for table: 
{}",
+                        schemaChangeEvent.tableIdentifier());
+
+                if (sinkWriter instanceof SupportSchemaEvolutionSinkWriter) {
+                    try {
+                        ((SupportSchemaEvolutionSinkWriter) sinkWriter)
+                                .applySchemaChange(schemaChangeEvent);
+                        log.info(
+                                "FlinkSinkWriter applied SchemaChangeEvent for 
table: {}",
+                                schemaChangeEvent.tableIdentifier());
+                    } catch (Exception e) {
+                        log.error("Failed to apply schema change", e);
+                        throw new SinkWriterSchemaException(
+                                
SchemaEvolutionErrorCode.SCHEMA_EVENT_PROCESSING_FAILED,
+                                "Failed to apply schema change in Flink sink 
writer",
+                                schemaChangeEvent.tableIdentifier(),
+                                schemaChangeEvent.getJobId(),
+                                e);
+                    }
+                }
+            }
+
             sinkWriter.write((SeaTunnelRow) element);
             sinkWriteCount.inc();
             sinkWriteBytes.inc(((SeaTunnelRow) element).getBytesSize());
diff --git 
a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkRowCollector.java
 
b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkRowCollector.java
index c7eadfcfb7..355a0f7b2a 100644
--- 
a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkRowCollector.java
+++ 
b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkRowCollector.java
@@ -24,6 +24,7 @@ import org.apache.seatunnel.api.common.metrics.Meter;
 import org.apache.seatunnel.api.common.metrics.MetricNames;
 import org.apache.seatunnel.api.common.metrics.MetricsContext;
 import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.schema.event.SchemaChangeEvent;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.core.starter.flowcontrol.FlowControlGate;
 import org.apache.seatunnel.core.starter.flowcontrol.FlowControlStrategy;
@@ -32,6 +33,8 @@ import org.apache.flink.api.connector.source.ReaderOutput;
 
 import lombok.extern.slf4j.Slf4j;
 
+import java.util.HashMap;
+
 /** The implementation of {@link Collector} for flink engine. */
 @Slf4j
 public class FlinkRowCollector implements Collector<SeaTunnelRow> {
@@ -69,6 +72,16 @@ public class FlinkRowCollector implements 
Collector<SeaTunnelRow> {
         }
     }
 
+    @Override
+    public void collect(SchemaChangeEvent event) {
+        SeaTunnelRow eventRow = new SeaTunnelRow(0);
+        eventRow.setTableId("__SCHEMA_CHANGE_EVENT__");
+        HashMap<String, Object> options = new HashMap<>();
+        options.put("schema_change_event", event);
+        eventRow.setOptions(options);
+        readerOutput.collect(eventRow);
+    }
+
     @Override
     public Object getCheckpointLock() {
         return this;

Reply via email to