This is an automated email from the ASF dual-hosted git repository.
tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 3473e2d7f1 [Feature][Flink] Support CDC source schema evolution on
Flink engine (#9867)
3473e2d7f1 is described below
commit 3473e2d7f11b35abbd6e84a2c0ba4da1e7c646c0
Author: CloverDew <[email protected]>
AuthorDate: Thu Oct 23 11:27:49 2025 +0800
[Feature][Flink] Support CDC source schema evolution on Flink engine (#9867)
---
.../org/apache/seatunnel/api/event/EventType.java | 1 +
.../api/sink/SupportSchemaEvolutionSinkWriter.java | 46 +++
.../sink/multitablesink/MultiTableSinkWriter.java | 29 ++
.../api/table/coordinator/SchemaCoordinator.java | 329 +++++++++++++++++++++
.../api/table/coordinator/SchemaResponse.java | 53 ++++
.../api/table/schema/event/FlushEvent.java | 63 ++++
.../exception/SchemaCoordinationException.java | 80 +++++
.../schema/exception/SchemaEvolutionErrorCode.java | 74 +++++
.../schema/exception/SchemaEvolutionException.java | 84 ++++++
.../exception/SchemaValidationException.java | 82 +++++
.../exception/SinkWriterSchemaException.java | 78 +++++
.../seatunnel/console/sink/ConsoleSinkWriter.java | 59 +++-
.../jdbc/sink/JdbcExactlyOnceSinkWriter.java | 42 +++
.../seatunnel/jdbc/sink/JdbcSinkWriter.java | 38 +++
.../starter/flink/execution/SchemaOperator.java | 283 ++++++++++++++++++
.../flink/execution/SinkExecuteProcessor.java | 29 +-
.../flink/execution/SourceExecuteProcessor.java | 35 ++-
.../cdc/mysql/MysqlCDCWithFlinkSchemaChangeIT.java | 280 ++++++++++++++++++
.../src/test/resources/ddl/initial.sql | 60 ++++
.../src/test/resources/ddl/rename_columns.sql | 29 ++
...mysqlcdc_to_mysql_with_flink_schema_change.conf | 53 ++++
.../mysqlcdc_to_mysql_schema_evolution.conf | 52 ++++
.../resources/examples/schema_evolution_test.conf | 42 +++
.../translation/flink/sink/FlinkSinkWriter.java | 60 ++++
.../flink/source/FlinkRowCollector.java | 13 +
25 files changed, 1978 insertions(+), 16 deletions(-)
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/event/EventType.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/event/EventType.java
index 26ae749f0b..3272f65419 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/event/EventType.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/event/EventType.java
@@ -31,4 +31,5 @@ public enum EventType {
LIFECYCLE_WRITER_CLOSE,
READER_MESSAGE_DELAYED,
JOB_STATUS,
+ SCHEMA_CHANGE_FLUSH,
}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SupportSchemaEvolutionSinkWriter.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SupportSchemaEvolutionSinkWriter.java
index 54727ec950..47c3839465 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SupportSchemaEvolutionSinkWriter.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SupportSchemaEvolutionSinkWriter.java
@@ -17,6 +17,8 @@
package org.apache.seatunnel.api.sink;
+import org.apache.seatunnel.api.table.coordinator.SchemaCoordinator;
+import org.apache.seatunnel.api.table.schema.event.FlushEvent;
import org.apache.seatunnel.api.table.schema.event.SchemaChangeEvent;
import java.io.IOException;
@@ -30,4 +32,48 @@ public interface SupportSchemaEvolutionSinkWriter {
* @throws IOException
*/
void applySchemaChange(SchemaChangeEvent event) throws IOException;
+
+ /**
+     * Handle a FlushEvent propagated from upstream.
+ *
+ * @param event
+ * @throws IOException
+ */
+ default void handleFlushEvent(FlushEvent event) throws IOException {
+ flushData();
+ sendFlushSuccessful(event);
+ }
+
+ /**
+     * Send a success event to the coordinator upon a successful flush.
+ *
+ * @param event
+ * @throws IOException
+ */
+ default void sendFlushSuccessful(FlushEvent event) throws IOException {
+ SchemaCoordinator coordinator = getSchemaCoordinator();
+ if (coordinator == null && event != null && event.getJobId() != null) {
+ coordinator =
SchemaCoordinator.getOrCreateInstance(event.getJobId());
+ }
+
+ if (coordinator != null) {
+ coordinator.notifyFlushSuccessful(event.getJobId(),
event.tableIdentifier());
+ }
+ }
+
+ /**
+ * Get the schema coordinator instance for reporting flush completion
+ *
+ * @return the schema coordinator instance, or null if not available
+ */
+ default SchemaCoordinator getSchemaCoordinator() {
+ return null;
+ }
+
+ /**
+     * Flush buffered data to the external system.
+ *
+ * @throws IOException
+ */
+ default void flushData() throws IOException {}
}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkWriter.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkWriter.java
index e05cf4cb8b..3721491ea9 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkWriter.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkWriter.java
@@ -21,6 +21,7 @@ import
org.apache.seatunnel.api.sink.MultiTableResourceManager;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
import org.apache.seatunnel.api.sink.SupportSchemaEvolutionSinkWriter;
+import org.apache.seatunnel.api.table.schema.event.FlushEvent;
import org.apache.seatunnel.api.table.schema.event.SchemaChangeEvent;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.tracing.MDCTracer;
@@ -175,8 +176,36 @@ public class MultiTableSinkWriter
}
}
+ @Override
+ public void handleFlushEvent(FlushEvent event) throws IOException {
+ subSinkErrorCheck();
+ String targetTableId =
event.tableIdentifier().toTablePath().getFullName();
+ for (int i = 0; i < sinkWritersWithIndex.size(); i++) {
+ for (Map.Entry<SinkIdentifier, SinkWriter<SeaTunnelRow, ?, ?>>
sinkWriterEntry :
+ sinkWritersWithIndex.get(i).entrySet()) {
+ if
(sinkWriterEntry.getKey().getTableIdentifier().equals(targetTableId)) {
+ synchronized (runnable.get(i)) {
+ SinkWriter<SeaTunnelRow, ?, ?> sink =
sinkWriterEntry.getValue();
+ if (sink instanceof SupportSchemaEvolutionSinkWriter) {
+ ((SupportSchemaEvolutionSinkWriter)
sink).handleFlushEvent(event);
+ }
+ }
+ return;
+ }
+ }
+ }
+ }
+
@Override
public void write(SeaTunnelRow element) throws IOException {
+ if (element != null && element.getOptions() != null) {
+ if (element.getOptions().containsKey("flush_event")
+ ||
element.getOptions().containsKey("schema_change_event")) {
+ log.debug("Skipping schema change event row: {}",
element.getOptions().keySet());
+ return;
+ }
+ }
+
if (!submitted) {
submitted = true;
runnable.forEach(executorService::submit);
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/coordinator/SchemaCoordinator.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/coordinator/SchemaCoordinator.java
new file mode 100644
index 0000000000..5b2b54d80e
--- /dev/null
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/coordinator/SchemaCoordinator.java
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.table.coordinator;
+
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import
org.apache.seatunnel.api.table.schema.exception.SchemaCoordinationException;
+import
org.apache.seatunnel.api.table.schema.exception.SchemaEvolutionErrorCode;
+import
org.apache.seatunnel.api.table.schema.exception.SchemaValidationException;
+
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
+
+/** Global unified coordinator for handling schema changes (one instance per job ID). */
+@Slf4j
+public class SchemaCoordinator implements Serializable {
+
+ private static final Map<String, SchemaCoordinator> JOB_COORDINATORS =
+ new ConcurrentHashMap<>();
+ private final Map<TableIdentifier, SchemaChangeState> schemaChangeStates;
+ private final Map<TableIdentifier, Long> schemaVersions;
+ private final Map<TableIdentifier, ReentrantLock> schemaLocks;
+ private final Map<TableIdentifier, Map<String,
CompletableFuture<SchemaResponse>>>
+ pendingRequests;
+ private final Map<TableIdentifier, Long> latestSchemaChangeTime = new
ConcurrentHashMap<>();
+
+ public SchemaCoordinator() {
+ this.schemaChangeStates = new ConcurrentHashMap<>();
+ this.schemaVersions = new ConcurrentHashMap<>();
+ this.schemaLocks = new ConcurrentHashMap<>();
+ this.pendingRequests = new ConcurrentHashMap<>();
+ }
+
+ public static SchemaCoordinator getOrCreateInstance(String jobId) {
+ return JOB_COORDINATORS.computeIfAbsent(
+ jobId,
+ k -> {
+ log.info("Creating new SchemaCoordinator instance for job:
{}", jobId);
+ return new SchemaCoordinator();
+ });
+ }
+
+ public static void removeInstance(String jobId) {
+ SchemaCoordinator removed = JOB_COORDINATORS.remove(jobId);
+ if (removed != null) {
+ log.info("Removed SchemaCoordinator instance for job: {}", jobId);
+ }
+ }
+
+ public void notifyFlushSuccessful(String jobId, TableIdentifier tableId) {
+ log.info("Received flush notification: jobId={}, tableId={}", jobId,
tableId);
+
+ ReentrantLock lock = schemaLocks.computeIfAbsent(tableId, k -> new
ReentrantLock());
+ lock.lock();
+ try {
+ SchemaChangeState state = schemaChangeStates.get(tableId);
+ if (state == null) {
+ log.warn("No schema change state found for table: {}",
tableId);
+ log.info("Available schema change states: {}",
schemaChangeStates.keySet());
+ return;
+ }
+
+ if (!state.getJobId().equals(jobId)) {
+ log.warn(
+ "Job ID mismatch: received jobId={}, expected
jobId={}, tableId={}",
+ jobId,
+ state.getJobId(),
+ tableId);
+ return;
+ }
+
+ // update the schema change status
+ state.incrementFlushCount();
+ log.info(
+ "Received flush successful notification for table {} ({}),
count: {}/{}",
+ tableId,
+ jobId,
+ state.getFlushCount(),
+ state.getTotalOperators());
+
+ if (state.isAllFlushed()) {
+ // all operators have completed flushing. The schema change
can now be finalized
+ log.info("All operators flushed for table {}, completing
schema change", tableId);
+ completeSchemaChange(tableId, jobId);
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public CompletableFuture<SchemaResponse> requestSchemaChange(
+ TableIdentifier tableId, String jobId, CatalogTable newSchema, int
totalOperators) {
+
+ ReentrantLock lock = schemaLocks.computeIfAbsent(tableId, k -> new
ReentrantLock());
+ lock.lock();
+ try {
+ SchemaChangeState existingState = schemaChangeStates.get(tableId);
+ if (existingState != null && existingState.getJobId() != null) {
+ log.warn(
+ "Schema change already in progress for table: {},
existing jobId: {}, new jobId: {}",
+ tableId,
+ existingState.getJobId(),
+ jobId);
+ return CompletableFuture.completedFuture(
+ SchemaResponse.failure(
+ new SchemaCoordinationException(
+ SchemaEvolutionErrorCode
+
.SCHEMA_CHANGE_ALREADY_IN_PROGRESS,
+ "Schema change already in
progress",
+ tableId,
+ jobId)
+ .getMessage()));
+ }
+
+ Long lastProcessedTime = latestSchemaChangeTime.get(tableId);
+ if (lastProcessedTime != null && newSchema != null) {
+ long currentTime = System.currentTimeMillis();
+ if (currentTime <= lastProcessedTime) {
+ log.warn(
+ "Received outdated schema change event for table:
{}, current time: {}, last processed: {}",
+ tableId,
+ currentTime,
+ lastProcessedTime);
+ return CompletableFuture.completedFuture(
+ SchemaResponse.failure(
+ new SchemaValidationException(
+
SchemaEvolutionErrorCode.OUTDATED_SCHEMA_EVENT,
+ "Schema change event is
outdated",
+ tableId,
+ jobId)
+ .getMessage()));
+ }
+ }
+
+ if (newSchema != null && !isValidSchema(newSchema)) {
+ log.error("Invalid schema structure for table: {}", tableId);
+ return CompletableFuture.completedFuture(
+ SchemaResponse.failure(
+ new SchemaValidationException(
+
SchemaEvolutionErrorCode.INVALID_SCHEMA_STRUCTURE,
+ "Invalid schema structure",
+ tableId,
+ jobId)
+ .getMessage()));
+ }
+ log.info("Processing schema change for table: {}, job: {}",
tableId, jobId);
+
+ CatalogTable currentSchema = null;
+ if (existingState != null) {
+ currentSchema = existingState.getCurrentSchema();
+ }
+
+ SchemaChangeState state =
+ new SchemaChangeState(currentSchema, newSchema, jobId,
totalOperators);
+
+ schemaChangeStates.put(tableId, state);
+ log.info(
+ "SchemaChangeStates tableId: {}, state: {}",
+ tableId,
+ schemaChangeStates.get(tableId));
+
+ // update the latest schema change time
+ if (newSchema != null) {
+ latestSchemaChangeTime.put(tableId,
System.currentTimeMillis());
+ }
+
+ return waitForSchemaChangeConfirmation(tableId, jobId);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public CompletableFuture<SchemaResponse> waitForSchemaChangeConfirmation(
+ TableIdentifier tableId, String jobId) {
+ CompletableFuture<SchemaResponse> future = new CompletableFuture<>();
+
+ // add the request to the waiting queue
+ pendingRequests.computeIfAbsent(tableId, k -> new
ConcurrentHashMap<>()).put(jobId, future);
+ return future;
+ }
+
+ private void completeSchemaChange(TableIdentifier tableId, String jobId) {
+ ReentrantLock lock = schemaLocks.computeIfAbsent(tableId, k -> new
ReentrantLock());
+ lock.lock();
+ try {
+ SchemaChangeState state = schemaChangeStates.get(tableId);
+ if (state == null) {
+ log.warn("No schema change state found for table: {} during
completion", tableId);
+ return;
+ }
+
+ if (!jobId.equals(state.getJobId())) {
+ log.warn(
+ "Job ID mismatch during completion: expected={},
actual={}, table={}",
+ jobId,
+ state.getJobId(),
+ tableId);
+ return;
+ }
+
+ if (!state.isAllFlushed()) {
+ log.warn(
+ "Attempting to complete schema change but not all
operators have flushed: {}/{} for table {}",
+ state.getFlushCount(),
+ state.getTotalOperators(),
+ tableId);
+ return;
+ }
+
+ long newVersion = schemaVersions.getOrDefault(tableId, 0L) + 1;
+ schemaVersions.put(tableId, newVersion);
+
+ SchemaResponse response =
SchemaResponse.success(state.getNewSchema(), newVersion);
+
+ Map<String, CompletableFuture<SchemaResponse>> tableRequests =
+ pendingRequests.get(tableId);
+ if (tableRequests != null) {
+ int completedCount = 0;
+ for (CompletableFuture<SchemaResponse> future :
tableRequests.values()) {
+ if (!future.isDone()) {
+ future.complete(response);
+ completedCount++;
+ }
+ }
+ log.info(
+ "Completed {} pending schema change requests for table
{}",
+ completedCount,
+ tableId);
+ tableRequests.clear();
+ }
+
+ state.setCurrentSchema(state.getNewSchema());
+ state.setNewSchema(null);
+ state.setJobId(null);
+ state.resetFlushCount();
+
+ latestSchemaChangeTime.remove(tableId);
+ log.info("Schema change completed for table {}, version: {}",
tableId, newVersion);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ private boolean isValidSchema(CatalogTable schema) {
+ if (schema == null) {
+ return false;
+ }
+
+ if (schema.getTableId() == null) {
+ log.error("Schema has null table identifier");
+ return false;
+ }
+
+ if (schema.getTableSchema() == null) {
+ log.error("Schema has null table schema");
+ return false;
+ }
+
+ if (schema.getTableSchema().getColumns().isEmpty()) {
+ log.error("Schema has no columns");
+ return false;
+ }
+
+ return true;
+ }
+
+ @Getter
+ @Setter
+ @ToString
+ private static class SchemaChangeState {
+ private CatalogTable currentSchema;
+ private CatalogTable newSchema;
+ private String jobId;
+ private final AtomicInteger flushCount;
+ private final int totalOperators;
+
+ public SchemaChangeState(
+ CatalogTable currentSchema,
+ CatalogTable newSchema,
+ String jobId,
+ int totalOperators) {
+ this.currentSchema = currentSchema;
+ this.newSchema = newSchema;
+ this.jobId = jobId;
+ this.flushCount = new AtomicInteger(0);
+ this.totalOperators = totalOperators;
+ }
+
+ public int getFlushCount() {
+ return flushCount.get();
+ }
+
+ public void incrementFlushCount() {
+ flushCount.incrementAndGet();
+ }
+
+ public void resetFlushCount() {
+ flushCount.set(0);
+ }
+
+ public boolean isAllFlushed() {
+ return flushCount.get() >= totalOperators;
+ }
+ }
+}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/coordinator/SchemaResponse.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/coordinator/SchemaResponse.java
new file mode 100644
index 0000000000..62cd458292
--- /dev/null
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/coordinator/SchemaResponse.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.table.coordinator;
+
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+
+import lombok.Getter;
+
+/** Response describing the outcome of a schema change request. */
+@Getter
+public class SchemaResponse {
+ private final boolean success;
+ private final String message;
+ private final CatalogTable currentSchema;
+ private final Long schemaVersion;
+
+ private SchemaResponse(
+ boolean success, String message, CatalogTable currentSchema, Long
schemaVersion) {
+ this.success = success;
+ this.message = message;
+ this.currentSchema = currentSchema;
+ this.schemaVersion = schemaVersion;
+ }
+
+ public static SchemaResponse success(CatalogTable currentSchema, Long
schemaVersion) {
+ String message = "Schema change completed successfully, current
version: " + schemaVersion;
+ return new SchemaResponse(true, message, currentSchema, schemaVersion);
+ }
+
+ public static SchemaResponse failure(String errorMessage) {
+ return new SchemaResponse(false, errorMessage, null, null);
+ }
+
+ @Deprecated
+ public String getErrorMessage() {
+ return message;
+ }
+}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/schema/event/FlushEvent.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/schema/event/FlushEvent.java
new file mode 100644
index 0000000000..449c017eaf
--- /dev/null
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/schema/event/FlushEvent.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.table.schema.event;
+
+import org.apache.seatunnel.api.event.EventType;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+
+/** Used for communication between SchemaEvolutionOperator and sink */
+@Getter
+@Setter
+@ToString
+public class FlushEvent implements SchemaChangeEvent {
+ private final TableIdentifier tableIdentifier;
+ @Getter @Setter private CatalogTable changeAfter;
+ @Getter @Setter private String jobId;
+ @Getter @Setter private String statement;
+ @Getter @Setter protected String sourceDialectName;
+ private final Long createdTime;
+ private final SchemaChangeEvent schemaChangeEvent;
+ private final EventType eventType = EventType.SCHEMA_CHANGE_FLUSH;
+
+ public FlushEvent(SchemaChangeEvent schemaChangeEvent) {
+ this.schemaChangeEvent = schemaChangeEvent;
+ this.tableIdentifier = schemaChangeEvent.tableIdentifier();
+ this.jobId = schemaChangeEvent.getJobId();
+ this.createdTime = System.currentTimeMillis();
+ }
+
+ @Override
+ public TableIdentifier tableIdentifier() {
+ return tableIdentifier;
+ }
+
+ @Override
+ public long getCreatedTime() {
+ return createdTime;
+ }
+
+ @Override
+ public EventType getEventType() {
+ return eventType;
+ }
+}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/schema/exception/SchemaCoordinationException.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/schema/exception/SchemaCoordinationException.java
new file mode 100644
index 0000000000..70435adf43
--- /dev/null
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/schema/exception/SchemaCoordinationException.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.table.schema.exception;
+
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+
+/**
+ * Exception thrown when schema coordination operations fail. This includes
timeout issues,
+ * coordination conflicts, and coordinator state problems.
+ */
+public class SchemaCoordinationException extends SchemaEvolutionException {
+
+ public SchemaCoordinationException(SchemaEvolutionErrorCode errorCode,
String errorMessage) {
+ super(errorCode, errorMessage);
+ }
+
+ public SchemaCoordinationException(
+ SchemaEvolutionErrorCode errorCode, String errorMessage, Throwable
cause) {
+ super(errorCode, errorMessage, cause);
+ }
+
+ public SchemaCoordinationException(
+ SchemaEvolutionErrorCode errorCode,
+ String errorMessage,
+ TableIdentifier tableIdentifier,
+ String jobId) {
+ super(errorCode, errorMessage, tableIdentifier, jobId);
+ }
+
+ public SchemaCoordinationException(
+ SchemaEvolutionErrorCode errorCode,
+ String errorMessage,
+ TableIdentifier tableIdentifier,
+ String jobId,
+ Throwable cause) {
+ super(errorCode, errorMessage, tableIdentifier, jobId, cause);
+ }
+
+ /** Create a timeout exception for schema changes */
+ public static SchemaCoordinationException timeout(
+ TableIdentifier tableIdentifier, String jobId, long
timeoutSeconds, Throwable cause) {
+ String message =
+ String.format("Schema change operation timed out after %d
seconds", timeoutSeconds);
+ return new SchemaCoordinationException(
+ SchemaEvolutionErrorCode.SCHEMA_CHANGE_TIMEOUT,
+ message,
+ tableIdentifier,
+ jobId,
+ cause);
+ }
+
+ /** Create an exception for schema change conflicts */
+ public static SchemaCoordinationException conflict(
+ TableIdentifier tableIdentifier, String currentJobId, String
conflictingJobId) {
+ String message =
+ String.format(
+ "Schema change already in progress for table. Current
job: %s, conflicting job: %s",
+ currentJobId, conflictingJobId);
+ return new SchemaCoordinationException(
+ SchemaEvolutionErrorCode.SCHEMA_CHANGE_ALREADY_IN_PROGRESS,
+ message,
+ tableIdentifier,
+ currentJobId);
+ }
+}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/schema/exception/SchemaEvolutionErrorCode.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/schema/exception/SchemaEvolutionErrorCode.java
new file mode 100644
index 0000000000..08c614b952
--- /dev/null
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/schema/exception/SchemaEvolutionErrorCode.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.table.schema.exception;
+
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
+
+public enum SchemaEvolutionErrorCode implements SeaTunnelErrorCode {
+
+ // Schema Coordination Errors
+ SCHEMA_COORDINATOR_NOT_INITIALIZED("SE-01", "Schema coordinator is not
initialized"),
+ SCHEMA_CHANGE_ALREADY_IN_PROGRESS(
+ "SE-02", "Schema change is already in progress for the table"),
+ SCHEMA_CHANGE_TIMEOUT("SE-03", "Schema change operation timed out"),
+ SCHEMA_CHANGE_COORDINATION_FAILED("SE-04", "Schema change coordination
failed"),
+
+ // Schema Validation Errors
+ INVALID_SCHEMA_STRUCTURE("SE-05", "Invalid schema structure provided"),
+ SCHEMA_INCOMPATIBLE("SE-06", "Schema change is incompatible with current
schema"),
+ OUTDATED_SCHEMA_EVENT("SE-07", "Schema change event is outdated"),
+ UNSUPPORTED_SCHEMA_CHANGE_TYPE("SE-08", "Schema change type is not
supported"),
+
+ // Sink Writer Errors
+ SCHEMA_CHANGE_APPLICATION_FAILED("SE-09", "Failed to apply schema change
to sink writer"),
+ FLUSH_OPERATION_FAILED("SE-10", "Flush operation failed during schema
evolution"),
+ SCHEMA_ROLLBACK_FAILED("SE-11", "Failed to rollback schema change"),
+
+ // Table and Database Errors
+ TABLE_SCHEMA_UPDATE_FAILED("SE-12", "Failed to update table schema in
database"),
+ TABLE_NOT_FOUND("SE-13", "Target table not found"),
+ INSUFFICIENT_PERMISSIONS("SE-14", "Insufficient permissions to modify
table schema"),
+
+ // Event Processing Errors
+ SCHEMA_EVENT_PROCESSING_FAILED("SE-15", "Failed to process schema change
event"),
+ FLUSH_EVENT_PROCESSING_FAILED("SE-16", "Failed to process flush event"),
+ SCHEMA_EVENT_DESERIALIZATION_FAILED("SE-17", "Failed to deserialize schema
change event"),
+
+ // Resource Management Errors
+ RESOURCE_CLEANUP_FAILED("SE-18", "Failed to cleanup resources after schema
change"),
+ CONNECTION_POOL_EXHAUSTED("SE-19", "Database connection pool exhausted
during schema change"),
+ MEMORY_ALLOCATION_FAILED("SE-20", "Memory allocation failed during schema
processing");
+
+ private final String code;
+ private final String description;
+
+ SchemaEvolutionErrorCode(String code, String description) {
+ this.code = code;
+ this.description = description;
+ }
+
+ @Override
+ public String getCode() {
+ return this.code;
+ }
+
+ @Override
+ public String getDescription() {
+ return this.description;
+ }
+}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/schema/exception/SchemaEvolutionException.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/schema/exception/SchemaEvolutionException.java
new file mode 100644
index 0000000000..6d864c291a
--- /dev/null
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/schema/exception/SchemaEvolutionException.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.table.schema.exception;
+
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+
+import lombok.Getter;
+
+/**
+ * Base exception class for schema evolution related errors. This exception
provides detailed
+ * context about schema evolution failures.
+ */
+@Getter
+public class SchemaEvolutionException extends SeaTunnelRuntimeException {
+
+ private final TableIdentifier tableIdentifier;
+
+ private final String jobId;
+
+ public SchemaEvolutionException(SchemaEvolutionErrorCode errorCode, String
errorMessage) {
+ super(errorCode, errorMessage);
+ this.tableIdentifier = null;
+ this.jobId = null;
+ }
+
+ public SchemaEvolutionException(
+ SchemaEvolutionErrorCode errorCode, String errorMessage, Throwable
cause) {
+ super(errorCode, errorMessage, cause);
+ this.tableIdentifier = null;
+ this.jobId = null;
+ }
+
+ public SchemaEvolutionException(
+ SchemaEvolutionErrorCode errorCode,
+ String errorMessage,
+ TableIdentifier tableIdentifier,
+ String jobId) {
+ super(errorCode, enrichErrorMessage(errorMessage, tableIdentifier,
jobId));
+ this.tableIdentifier = tableIdentifier;
+ this.jobId = jobId;
+ }
+
+ public SchemaEvolutionException(
+ SchemaEvolutionErrorCode errorCode,
+ String errorMessage,
+ TableIdentifier tableIdentifier,
+ String jobId,
+ Throwable cause) {
+ super(errorCode, enrichErrorMessage(errorMessage, tableIdentifier,
jobId), cause);
+ this.tableIdentifier = tableIdentifier;
+ this.jobId = jobId;
+ }
+
+ private static String enrichErrorMessage(
+ String originalMessage, TableIdentifier tableIdentifier, String
jobId) {
+ StringBuilder message = new StringBuilder(originalMessage);
+
+ if (tableIdentifier != null) {
+ message.append(" [Table: ").append(tableIdentifier).append("]");
+ }
+
+ if (jobId != null) {
+ message.append(" [Job: ").append(jobId).append("]");
+ }
+
+ return message.toString();
+ }
+}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/schema/exception/SchemaValidationException.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/schema/exception/SchemaValidationException.java
new file mode 100644
index 0000000000..e213795d6f
--- /dev/null
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/schema/exception/SchemaValidationException.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.table.schema.exception;
+
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+
+/**
+ * Exception thrown when schema validation fails. This includes invalid schema
structures,
+ * incompatible changes, and outdated events.
+ */
+public class SchemaValidationException extends SchemaEvolutionException {
+
+ public SchemaValidationException(SchemaEvolutionErrorCode errorCode,
String errorMessage) {
+ super(errorCode, errorMessage);
+ }
+
+ public SchemaValidationException(
+ SchemaEvolutionErrorCode errorCode, String errorMessage, Throwable
cause) {
+ super(errorCode, errorMessage, cause);
+ }
+
+ public SchemaValidationException(
+ SchemaEvolutionErrorCode errorCode,
+ String errorMessage,
+ TableIdentifier tableIdentifier,
+ String jobId) {
+ super(errorCode, errorMessage, tableIdentifier, jobId);
+ }
+
+ public SchemaValidationException(
+ SchemaEvolutionErrorCode errorCode,
+ String errorMessage,
+ TableIdentifier tableIdentifier,
+ String jobId,
+ Throwable cause) {
+ super(errorCode, errorMessage, tableIdentifier, jobId, cause);
+ }
+
+ /** Create an exception for invalid schema structure */
+ public static SchemaValidationException invalidSchema(
+ TableIdentifier tableIdentifier, String jobId, String reason) {
+ String message = String.format("Invalid schema structure: %s", reason);
+ return new SchemaValidationException(
+ SchemaEvolutionErrorCode.INVALID_SCHEMA_STRUCTURE, message,
tableIdentifier, jobId);
+ }
+
+ /** Create an exception for unsupported schema change types */
+ public static SchemaValidationException unsupportedChangeType(
+ TableIdentifier tableIdentifier, String jobId) {
+ return new SchemaValidationException(
+ SchemaEvolutionErrorCode.UNSUPPORTED_SCHEMA_CHANGE_TYPE,
+ "Schema change type '%s' is not supported",
+ tableIdentifier,
+ jobId);
+ }
+
+ /** Create an exception for outdated schema events */
+ public static SchemaValidationException outdatedEvent(
+ TableIdentifier tableIdentifier, String jobId, long eventTime,
long lastProcessedTime) {
+ String message =
+ String.format(
+ "Schema change event is outdated. Event time: %d, last
processed: %d",
+ eventTime, lastProcessedTime);
+ return new SchemaValidationException(
+ SchemaEvolutionErrorCode.OUTDATED_SCHEMA_EVENT, message,
tableIdentifier, jobId);
+ }
+}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/schema/exception/SinkWriterSchemaException.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/schema/exception/SinkWriterSchemaException.java
new file mode 100644
index 0000000000..864d79af0c
--- /dev/null
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/schema/exception/SinkWriterSchemaException.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.table.schema.exception;
+
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+
+/**
+ * Exception thrown when sink writer schema operations fail. This includes
schema application
+ * failures, flush failures, and rollback issues.
+ */
public class SinkWriterSchemaException extends SchemaEvolutionException {

    public SinkWriterSchemaException(SchemaEvolutionErrorCode errorCode, String errorMessage) {
        super(errorCode, errorMessage);
    }

    public SinkWriterSchemaException(
            SchemaEvolutionErrorCode errorCode, String errorMessage, Throwable cause) {
        super(errorCode, errorMessage, cause);
    }

    public SinkWriterSchemaException(
            SchemaEvolutionErrorCode errorCode,
            String errorMessage,
            TableIdentifier tableIdentifier,
            String jobId) {
        super(errorCode, errorMessage, tableIdentifier, jobId);
    }

    public SinkWriterSchemaException(
            SchemaEvolutionErrorCode errorCode,
            String errorMessage,
            TableIdentifier tableIdentifier,
            String jobId,
            Throwable cause) {
        super(errorCode, errorMessage, tableIdentifier, jobId, cause);
    }

    /**
     * Create an exception for schema application failures.
     *
     * @param reason short human-readable context, interpolated into the message
     * @param cause underlying failure, preserved for stack traces
     */
    public static SinkWriterSchemaException applicationFailed(
            TableIdentifier tableIdentifier, String jobId, String reason, Throwable cause) {
        String message = String.format("Failed to apply schema change: %s", reason);
        return new SinkWriterSchemaException(
                SchemaEvolutionErrorCode.SCHEMA_CHANGE_APPLICATION_FAILED,
                message,
                tableIdentifier,
                jobId,
                cause);
    }

    /**
     * Create an exception for flush operation failures during schema evolution.
     *
     * @param reason short human-readable context, interpolated into the message
     * @param cause underlying failure, preserved for stack traces
     */
    public static SinkWriterSchemaException flushFailed(
            TableIdentifier tableIdentifier, String jobId, String reason, Throwable cause) {
        String message =
                String.format("Flush operation failed during schema evolution: %s", reason);
        return new SinkWriterSchemaException(
                SchemaEvolutionErrorCode.FLUSH_OPERATION_FAILED,
                message,
                tableIdentifier,
                jobId,
                cause);
    }
}
diff --git
a/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriter.java
b/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriter.java
index 948ca620c4..afae87ecf2 100644
---
a/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriter.java
+++
b/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriter.java
@@ -22,7 +22,10 @@ import
org.apache.seatunnel.shade.org.apache.commons.lang3.StringUtils;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
import org.apache.seatunnel.api.sink.SupportSchemaEvolutionSinkWriter;
+import org.apache.seatunnel.api.table.coordinator.SchemaCoordinator;
+import org.apache.seatunnel.api.table.schema.event.FlushEvent;
import org.apache.seatunnel.api.table.schema.event.SchemaChangeEvent;
+import
org.apache.seatunnel.api.table.schema.exception.SinkWriterSchemaException;
import
org.apache.seatunnel.api.table.schema.handler.DataTypeChangeEventDispatcher;
import
org.apache.seatunnel.api.table.schema.handler.DataTypeChangeEventHandler;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
@@ -34,6 +37,7 @@ import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
import lombok.extern.slf4j.Slf4j;
+import java.io.IOException;
import java.lang.reflect.Array;
import java.util.ArrayList;
import java.util.List;
@@ -47,6 +51,7 @@ public class ConsoleSinkWriter extends
AbstractSinkWriter<SeaTunnelRow, Void>
private final AtomicLong rowCounter = new AtomicLong(0);
private final SinkWriter.Context context;
private final DataTypeChangeEventHandler dataTypeChangeEventHandler;
+ private SchemaCoordinator schemaCoordinator;
boolean isPrintData = true;
int delayMs = 0;
@@ -65,14 +70,62 @@ public class ConsoleSinkWriter extends
AbstractSinkWriter<SeaTunnelRow, Void>
}
    @Override
    public void applySchemaChange(SchemaChangeEvent event) throws IOException {
        // Log the row type before/after so schema evolution is traceable in console output.
        log.info("changed rowType before: {}", fieldsInfo(seaTunnelRowType));
        try {
            seaTunnelRowType = dataTypeChangeEventHandler.reset(seaTunnelRowType).apply(event);
            log.info("changed rowType after: {}", fieldsInfo(seaTunnelRowType));
        } catch (Exception e) {
            log.error(
                    "ConsoleSinkWriter failed to apply schema change for table: {}",
                    event.tableIdentifier(),
                    e);
            // Wrap in the typed exception so table/job context reaches the coordinator.
            throw SinkWriterSchemaException.applicationFailed(
                    event.tableIdentifier(),
                    event.getJobId(),
                    "Console sink writer schema change application failed",
                    e);
        }
    }
+
    @Override
    public void handleFlushEvent(FlushEvent event) throws IOException {
        log.info("ConsoleSinkWriter handling FlushEvent for table: {}", event.tableIdentifier());
        try {
            // Drain buffered output before the schema change is applied.
            flushData();
            log.info("ConsoleSinkWriter flush completed for table: {}", event.tableIdentifier());
            // Acknowledge the flush so the coordinator can let the schema change proceed.
            sendFlushSuccessful(event);
        } catch (Exception e) {
            log.error("ConsoleSinkWriter flush failed for table: {}", event.tableIdentifier(), e);
            throw SinkWriterSchemaException.flushFailed(
                    event.tableIdentifier(), event.getJobId(), "Console flush operation failed", e);
        }
    }
+
    @Override
    public void sendFlushSuccessful(FlushEvent event) throws IOException {
        log.info(
                "ConsoleSinkWriter reporting flush success for table: {}", event.tableIdentifier());
        // Delegate to the interface default implementation for the actual acknowledgement.
        SupportSchemaEvolutionSinkWriter.super.sendFlushSuccessful(event);
    }
+
    @Override
    public SchemaCoordinator getSchemaCoordinator() {
        // NOTE(review): schemaCoordinator is never assigned anywhere visible in this class,
        // so this getter appears to always return null — confirm the intended wiring.
        return schemaCoordinator;
    }
@Override
public void write(SeaTunnelRow element) {
+ if (element != null && element.getOptions() != null) {
+ if (element.getOptions().containsKey("flush_event")
+ ||
element.getOptions().containsKey("schema_change_event")) {
+ log.info(
+ "ConsoleSinkWriter skipping schema event row: {}",
+ element.getOptions().keySet());
+ return;
+ }
+ }
+
String[] arr = new String[seaTunnelRowType.getTotalFields()];
SeaTunnelDataType<?>[] fieldTypes = seaTunnelRowType.getFieldTypes();
Object[] fields = element.getFields();
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcExactlyOnceSinkWriter.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcExactlyOnceSinkWriter.java
index d3f8862bc4..c97941f31e 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcExactlyOnceSinkWriter.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcExactlyOnceSinkWriter.java
@@ -24,6 +24,7 @@ import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.catalog.TableSchema;
+import
org.apache.seatunnel.api.table.schema.exception.SinkWriterSchemaException;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSinkConfig;
@@ -131,12 +132,53 @@ public class JdbcExactlyOnceSinkWriter extends
AbstractJdbcSinkWriter<Void> {
@Override
public void write(SeaTunnelRow element) {
+ if (element != null && element.getOptions() != null) {
+ if (element.getOptions().containsKey("flush_event")
+ ||
element.getOptions().containsKey("schema_change_event")) {
+ LOG.debug("Skipping schema change event row: {}",
element.getOptions().keySet());
+ return;
+ }
+ }
+
tryOpen();
checkState(currentXid != null, "current xid must not be null");
SeaTunnelRow copy = SerializationUtils.clone(element);
outputFormat.writeRecord(copy);
}
    @Override
    public void flushData() throws IOException {
        // Ensure the writer is initialized, surface any asynchronous flush error first,
        // then force buffered records out to the database.
        tryOpen();
        outputFormat.checkFlushException();
        outputFormat.flush();
    }
+
    @Override
    public void handleFlushEvent(org.apache.seatunnel.api.table.schema.event.FlushEvent event)
            throws IOException {
        log.info(
                "JdbcExactlyOnceSinkWriter handling FlushEvent for table: {}",
                event.tableIdentifier());
        try {
            // tryOpen() here is redundant with the call inside flushData(), but harmless.
            tryOpen();
            flushData();
            log.info(
                    "JdbcExactlyOnceSinkWriter flush completed for table: {}",
                    event.tableIdentifier());
            // Acknowledge so the coordinator can let the schema change proceed.
            sendFlushSuccessful(event);
        } catch (Exception e) {
            log.error(
                    "JdbcExactlyOnceSinkWriter flush failed for table: {}",
                    event.tableIdentifier(),
                    e);
            throw SinkWriterSchemaException.flushFailed(
                    event.tableIdentifier(),
                    event.getJobId(),
                    "Exactly-once JDBC flush operation failed",
                    e);
        }
    }
+
@Override
public Optional<XidInfo> prepareCommit() throws IOException {
tryOpen();
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java
index 3137f427e8..d367acb006 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java
@@ -22,6 +22,9 @@ import
org.apache.seatunnel.shade.com.zaxxer.hikari.HikariDataSource;
import org.apache.seatunnel.api.sink.MultiTableResourceManager;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.coordinator.SchemaCoordinator;
+import org.apache.seatunnel.api.table.schema.event.FlushEvent;
+import
org.apache.seatunnel.api.table.schema.exception.SinkWriterSchemaException;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSinkConfig;
@@ -44,6 +47,7 @@ import java.util.Optional;
@Slf4j
public class JdbcSinkWriter extends
AbstractJdbcSinkWriter<ConnectionPoolManager> {
private final Integer primaryKeyIndex;
+ private SchemaCoordinator schemaCoordinator;
public JdbcSinkWriter(
TablePath sinkTablePath,
@@ -136,10 +140,44 @@ public class JdbcSinkWriter extends
AbstractJdbcSinkWriter<ConnectionPoolManager
@Override
public void write(SeaTunnelRow element) throws IOException {
+ if (element != null && element.getOptions() != null) {
+ if (element.getOptions().containsKey("flush_event")
+ ||
element.getOptions().containsKey("schema_change_event")) {
+ log.debug("Skipping schema change event row: {}",
element.getOptions().keySet());
+ return;
+ }
+ }
+
tryOpen();
outputFormat.writeRecord(element);
}
    @Override
    public void handleFlushEvent(FlushEvent event) throws IOException {
        log.info("JdbcSinkWriter handling FlushEvent for table: {}", event.tableIdentifier());
        try {
            // Drain buffered records so the database is consistent before the schema changes.
            flushData();
            log.info("JdbcSinkWriter flush completed for table: {}", event.tableIdentifier());
            // Acknowledge so the coordinator can let the schema change proceed.
            sendFlushSuccessful(event);
        } catch (Exception e) {
            log.error("JdbcSinkWriter flush failed for table: {}", event.tableIdentifier(), e);
            throw SinkWriterSchemaException.flushFailed(
                    event.tableIdentifier(), event.getJobId(), "JDBC flush operation failed", e);
        }
    }
+
    @Override
    public void flushData() throws IOException {
        // Ensure the writer is initialized, surface any asynchronous flush error first,
        // then force buffered records out to the database.
        tryOpen();
        outputFormat.checkFlushException();
        outputFormat.flush();
    }
+
    @Override
    public SchemaCoordinator getSchemaCoordinator() {
        // NOTE(review): schemaCoordinator is never assigned in this class as far as visible,
        // so this getter appears to always return null — confirm the intended wiring.
        return schemaCoordinator;
    }
+
@Override
public Optional<XidInfo> prepareCommit() throws IOException {
tryOpen();
diff --git
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SchemaOperator.java
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SchemaOperator.java
new file mode 100644
index 0000000000..0302a98957
--- /dev/null
+++
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SchemaOperator.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.core.starter.flink.execution;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import org.apache.seatunnel.api.source.SupportSchemaEvolution;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.coordinator.SchemaCoordinator;
+import org.apache.seatunnel.api.table.coordinator.SchemaResponse;
+import org.apache.seatunnel.api.table.schema.SchemaChangeType;
+import org.apache.seatunnel.api.table.schema.event.FlushEvent;
+import org.apache.seatunnel.api.table.schema.event.SchemaChangeEvent;
+import org.apache.seatunnel.api.table.schema.event.TableEvent;
+import
org.apache.seatunnel.api.table.schema.exception.SchemaCoordinationException;
+import
org.apache.seatunnel.api.table.schema.exception.SchemaEvolutionErrorCode;
+import
org.apache.seatunnel.api.table.schema.exception.SchemaValidationException;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+
+/** operators added to the source and transformer pipelines to handle schema
evolution */
@Slf4j
public class SchemaOperator extends AbstractStreamOperator<SeaTunnelRow>
        implements OneInputStreamOperator<SeaTunnelRow, SeaTunnelRow> {
    // Latest schema accepted per table; updated only after coordination succeeds.
    private final Map<TableIdentifier, CatalogTable> localSchemaState;
    // Mutable: starts as the SeaTunnel jobId, may be replaced by the Flink jobId in open().
    private String jobId;
    private final SupportSchemaEvolution source;
    private final Config pluginConfig;
    private SchemaCoordinator schemaCoordinator;
    // In-flight coordination request, held so close() can cancel it.
    private final AtomicReference<CompletableFuture<SchemaResponse>> currentSchemaChangeFuture =
            new AtomicReference<>();
    // Creation time of the last successfully processed event; used to reject stale events.
    private volatile Long lastProcessedEventTime;

    public SchemaOperator(String jobId, SupportSchemaEvolution source, Config pluginConfig) {
        this.jobId = jobId;
        this.source = source;
        this.pluginConfig = pluginConfig;
        this.localSchemaState = new ConcurrentHashMap<>();
        this.schemaCoordinator = SchemaCoordinator.getOrCreateInstance(jobId);
    }

    @Override
    public void open() throws Exception {
        super.open();
        try {
            // Re-key the coordinator under the real Flink jobId once available, so every
            // participant coordinating on this job resolves the same instance.
            String flinkJobId = getRuntimeContext().getJobId().toString();
            if (!flinkJobId.equals(this.jobId)) {
                log.info(
                        "Updating SchemaCoordinator from SeaTunnel jobId {} to Flink jobId {}",
                        this.jobId,
                        flinkJobId);
                this.jobId = flinkJobId;
                this.schemaCoordinator = SchemaCoordinator.getOrCreateInstance(flinkJobId);
            }
        } catch (Exception e) {
            // Best effort: fall back to the constructor-supplied jobId.
            log.warn("Failed to get Flink jobId, using SeaTunnel jobId: {}", this.jobId, e);
        }
    }

    @Override
    public void processElement(StreamRecord<SeaTunnelRow> streamRecord) throws Exception {
        SeaTunnelRow element = streamRecord.getValue();

        // With schema evolution disabled this operator is a pure pass-through.
        if (!isSchemaEvolutionEnabled(pluginConfig)) {
            output.collect(new StreamRecord<>(element, streamRecord.getTimestamp()));
            return;
        }

        // Schema change events travel inline as marker rows: a sentinel tableId plus an
        // options entry carrying the event object.
        if ("__SCHEMA_CHANGE_EVENT__".equals(element.getTableId())
                && element.getOptions() != null) {
            Object object = element.getOptions().get("schema_change_event");
            if (object instanceof SchemaChangeEvent) {
                handleSchemaChangeEvent((SchemaChangeEvent) object);
                // The marker row itself is consumed, never forwarded downstream.
                return;
            }
        }

        output.collect(new StreamRecord<>(element, streamRecord.getTimestamp()));
    }

    /** Filters the event against the source's declared capabilities before coordinating. */
    private void handleSchemaChangeEvent(SchemaChangeEvent schemaChangeEvent) throws Exception {
        List<SchemaChangeType> supportedTypes = source.supports();
        if (supportedTypes == null || supportedTypes.isEmpty()) {
            log.info(
                    "Source: {} does not support any schema change types, skipping schema change event",
                    source);
            return;
        }

        if (!isSchemaChangeSupported(schemaChangeEvent, supportedTypes)) {
            log.warn(
                    "Schema change type {} not supported by source {}, skipping",
                    schemaChangeEvent.getEventType(),
                    source);
            return;
        }

        processSchemaChangeEvent(schemaChangeEvent);
    }

    /**
     * Schema evolution is opt-in: enabled via 'schema-changes.enabled' or the Debezium-style
     * 'debezium.schema.changes.enabled'; defaults to false when neither key is present.
     */
    private boolean isSchemaEvolutionEnabled(Config pluginConfig) {
        if (pluginConfig.hasPath("schema-changes.enabled")) {
            return pluginConfig.getBoolean("schema-changes.enabled");
        }

        if (pluginConfig.hasPath("debezium")) {
            Config debeziumConfig = pluginConfig.getConfig("debezium");
            if (debeziumConfig.hasPath("schema.changes.enabled")) {
                return debeziumConfig.getBoolean("schema.changes.enabled");
            }
        }

        return false;
    }

    /** Maps event types onto the source's supported {@link SchemaChangeType} capabilities. */
    private boolean isSchemaChangeSupported(
            SchemaChangeEvent event, List<SchemaChangeType> supportedTypes) {
        switch (event.getEventType()) {
            case SCHEMA_CHANGE_ADD_COLUMN:
                return supportedTypes.contains(SchemaChangeType.ADD_COLUMN);
            case SCHEMA_CHANGE_DROP_COLUMN:
                return supportedTypes.contains(SchemaChangeType.DROP_COLUMN);
            case SCHEMA_CHANGE_MODIFY_COLUMN:
                return supportedTypes.contains(SchemaChangeType.UPDATE_COLUMN);
            case SCHEMA_CHANGE_CHANGE_COLUMN:
                return supportedTypes.contains(SchemaChangeType.RENAME_COLUMN);
            case SCHEMA_CHANGE_UPDATE_COLUMNS:
                // A batched update passes if any constituent change kind is supported.
                return supportedTypes.contains(SchemaChangeType.ADD_COLUMN)
                        || supportedTypes.contains(SchemaChangeType.DROP_COLUMN)
                        || supportedTypes.contains(SchemaChangeType.UPDATE_COLUMN)
                        || supportedTypes.contains(SchemaChangeType.RENAME_COLUMN);
            default:
                log.error("Unknown schema change event type: {}", event.getEventType());
                throw SchemaValidationException.unsupportedChangeType(
                        event.tableIdentifier(), jobId);
        }
    }

    /**
     * Coordinates one schema change end-to-end: register the request, flush downstream
     * writers, wait for the coordinator's verdict, then forward the event and record the
     * new schema. Statement order matters throughout — see inline comments.
     */
    private void processSchemaChangeEvent(SchemaChangeEvent schemaChangeEvent) throws Exception {
        TableIdentifier tableId = schemaChangeEvent.tableIdentifier();
        long eventTime = schemaChangeEvent.getCreatedTime();

        // NOTE(review): a stale event fails the whole job rather than being skipped —
        // confirm that fail-fast is the intended policy here.
        if (lastProcessedEventTime != null && eventTime <= lastProcessedEventTime) {
            throw SchemaValidationException.outdatedEvent(
                    tableId, jobId, eventTime, lastProcessedEventTime);
        }

        // set the jobId for the schema change event to ensure proper coordination
        if (schemaChangeEvent instanceof TableEvent) {
            schemaChangeEvent.setJobId(jobId);
        }

        log.info(
                "Handling schema change event for table: {}, job: {}, event time: {}",
                tableId,
                jobId,
                eventTime);

        try {
            // Register with the coordinator BEFORE emitting the flush marker, so the sink's
            // flush acknowledgement always finds a pending request.
            // NOTE(review): the final argument (1) is presumably the expected number of
            // writer acknowledgements (sink parallelism is forced to 1 for evolving sinks) —
            // confirm against SchemaCoordinator.requestSchemaChange.
            CompletableFuture<SchemaResponse> schemaChangeFuture =
                    schemaCoordinator.requestSchemaChange(
                            tableId, jobId, schemaChangeEvent.getChangeAfter(), 1);
            currentSchemaChangeFuture.set(schemaChangeFuture);
            sendFlushEventToDownstream(schemaChangeEvent);

            SchemaResponse response;
            try {
                // Hard 60-second ceiling on coordination; on timeout the job fails fast.
                response = schemaChangeFuture.get(60, TimeUnit.SECONDS);
            } catch (TimeoutException e) {
                log.error(
                        "Schema change timeout for table: {}, directly failing the job",
                        tableId,
                        e);
                throw SchemaCoordinationException.timeout(tableId, jobId, 60, e);
            }

            if (!response.isSuccess()) {
                log.error(
                        "Schema change failed for table: {}, directly failing the job. Error: {}",
                        tableId,
                        response.getMessage());
                throw new SchemaCoordinationException(
                        SchemaEvolutionErrorCode.SCHEMA_CHANGE_COORDINATION_FAILED,
                        "Schema change coordination failed: " + response.getMessage(),
                        tableId,
                        jobId);
            }

            // Only after a successful flush + coordination does the event reach the sink.
            sendSchemaChangeEventToDownstream(schemaChangeEvent);
            updateLocalSchemaState(tableId, schemaChangeEvent.getChangeAfter());

            lastProcessedEventTime = eventTime;
            log.info("Schema change completed successfully for table: {}", tableId);
        } catch (Exception e) {
            log.error("Schema change failed for table: {}, directly failing the job", tableId, e);
            throw e;
        } finally {
            // Always clear the in-flight slot, success or failure.
            currentSchemaChangeFuture.set(null);
        }
    }

    /** Emits a marker row carrying a FlushEvent so sinks drain buffers before the change. */
    private void sendFlushEventToDownstream(SchemaChangeEvent schemaChangeEvent) {
        log.info("Send FlushEvent to downstream...");
        FlushEvent flushEvent = new FlushEvent(schemaChangeEvent);

        SeaTunnelRow flushRow = new SeaTunnelRow(0);
        Map<String, Object> options = new HashMap<>();
        options.put("flush_event", flushEvent);
        flushRow.setOptions(options);

        output.collect(new StreamRecord<>(flushRow));
        log.info(
                "FlushEvent sent to downstream for table: {}", schemaChangeEvent.tableIdentifier());
    }

    /** Emits a marker row carrying the approved SchemaChangeEvent for sinks to apply. */
    private void sendSchemaChangeEventToDownstream(SchemaChangeEvent schemaChangeEvent) {
        log.info("Send SchemaChangeEvent to downstream...");

        SeaTunnelRow schemaChangeRow = new SeaTunnelRow(0);
        Map<String, Object> options = new HashMap<>();
        options.put("schema_change_event", schemaChangeEvent);
        schemaChangeRow.setOptions(options);

        output.collect(new StreamRecord<>(schemaChangeRow));
    }

    /** Records the post-change schema locally; silently ignores a null schema. */
    private void updateLocalSchemaState(TableIdentifier tableId, CatalogTable newSchema) {
        if (newSchema != null) {
            localSchemaState.put(tableId, newSchema);
            log.debug("Updated local schema state for table: {}", tableId);
        }
    }

    @Override
    public void close() throws Exception {
        try {
            CompletableFuture<SchemaResponse> future = currentSchemaChangeFuture.get();
            if (future != null && !future.isDone()) {
                log.info("Cancelling ongoing schema change request during close");
                future.cancel(true);
            }

            // NOTE(review): this removes the per-job coordinator singleton; if several
            // SchemaOperator instances share one jobId, the first close() tears it down for
            // the others — confirm the single-instance assumption.
            if (jobId != null) {
                SchemaCoordinator.removeInstance(jobId);
                log.info("Removed SchemaCoordinator instance for job: {}", jobId);
            }
        } catch (Exception e) {
            log.warn("Error during SchemaOperator cleanup", e);
        } finally {
            super.close();
        }
    }
}
diff --git
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
index fbf3ea4098..f3ea44cbf0 100644
---
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
@@ -29,6 +29,7 @@ import org.apache.seatunnel.api.sink.SaveModeHandler;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SupportMultiTableSink;
import org.apache.seatunnel.api.sink.SupportSaveMode;
+import org.apache.seatunnel.api.sink.SupportSchemaEvolutionSink;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.catalog.TablePath;
@@ -122,12 +123,24 @@ public class SinkExecuteProcessor
DataStreamTableInfo input =
upstreamDataStreams.get(upstreamDataStreams.size() - 1);
Function<PluginIdentifier, SeaTunnelSink> fallbackCreateSink =
sinkPluginDiscovery::createPluginInstance;
+
for (int i = 0; i < plugins.size(); i++) {
Optional<? extends Factory> factory = plugins.get(i);
Config sinkConfig = pluginConfigs.get(i);
DataStreamTableInfo stream =
fromSourceTable(sinkConfig,
upstreamDataStreams).orElse(input);
Map<TablePath, SeaTunnelSink> sinks = new HashMap<>();
+
+ // calculate sink parallelism
+ boolean sinkParallelism =
sinkConfig.hasPath(EnvCommonOptions.PARALLELISM.key());
+ boolean envParallelism =
envConfig.hasPath(EnvCommonOptions.PARALLELISM.key());
+ int parallelism =
+ sinkParallelism
+ ?
sinkConfig.getInt(EnvCommonOptions.PARALLELISM.key())
+ : envParallelism
+ ?
envConfig.getInt(EnvCommonOptions.PARALLELISM.key())
+ : 1;
+
for (CatalogTable catalogTable : stream.getCatalogTables()) {
SeaTunnelSink sink =
FactoryUtil.createAndPrepareSink(
@@ -139,20 +152,20 @@ public class SinkExecuteProcessor
((TableSinkFactory) (factory.orElse(null))));
sink.setJobContext(jobContext);
handleSaveMode(sink);
+
+ // check if sink supports schema evolution
+ if (sink instanceof SupportSchemaEvolutionSink) {
+ parallelism = 1;
+ sinkParallelism = true;
+ }
+
TableIdentifier tableId = catalogTable.getTableId();
sinks.put(tableId.toTablePath(), sink);
}
SeaTunnelSink sink =
tryGenerateMultiTableSink(
sinks, ReadonlyConfig.fromConfig(sinkConfig),
classLoader);
- boolean sinkParallelism =
sinkConfig.hasPath(EnvCommonOptions.PARALLELISM.key());
- boolean envParallelism =
envConfig.hasPath(EnvCommonOptions.PARALLELISM.key());
- int parallelism =
- sinkParallelism
- ?
sinkConfig.getInt(EnvCommonOptions.PARALLELISM.key())
- : envParallelism
- ?
envConfig.getInt(EnvCommonOptions.PARALLELISM.key())
- : 1;
+
DataStreamSink<SeaTunnelRow> dataStreamSink =
stream.getDataStream()
.sinkTo(
diff --git
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
index 182592e9ca..219e5741f9 100644
---
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
@@ -26,6 +26,7 @@ import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.options.EnvCommonOptions;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.source.SupportSchemaEvolution;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.factory.FactoryUtil;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
@@ -38,6 +39,8 @@ import
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSourcePluginDisc
import org.apache.seatunnel.translation.flink.source.FlinkSource;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -89,11 +92,33 @@ public class SourceExecuteProcessor extends
FlinkAbstractPluginExecuteProcessor<
int parallelism =
pluginConfig.getInt(EnvCommonOptions.PARALLELISM.key());
sourceStream.setParallelism(parallelism);
}
- sources.add(
- new DataStreamTableInfo(
- sourceStream,
- sourceTableInfo.getCatalogTables(),
-
ReadonlyConfig.fromConfig(pluginConfig).get(PLUGIN_OUTPUT)));
+
+            // Add schema evolution support to the CDC source by inserting a
+            // SchemaOperator between the source stream and the sink.
+ DataStream<SeaTunnelRow> evolvedStream = null;
+ if (sourceTableInfo.getSource() instanceof SupportSchemaEvolution)
{
+ evolvedStream =
+ sourceStream.transform(
+ "schema-evolution",
+ TypeInformation.of(SeaTunnelRow.class),
+ new SchemaOperator(
+ jobContext.getJobId(),
+ (SupportSchemaEvolution)
sourceTableInfo.getSource(),
+ pluginConfig));
+ }
+
+ if (evolvedStream != null) {
+ sources.add(
+ new DataStreamTableInfo(
+ evolvedStream,
+ sourceTableInfo.getCatalogTables(),
+
ReadonlyConfig.fromConfig(pluginConfig).get(PLUGIN_OUTPUT)));
+ } else {
+ sources.add(
+ new DataStreamTableInfo(
+ sourceStream,
+ sourceTableInfo.getCatalogTables(),
+
ReadonlyConfig.fromConfig(pluginConfig).get(PLUGIN_OUTPUT)));
+ }
}
return sources;
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCWithFlinkSchemaChangeIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCWithFlinkSchemaChangeIT.java
new file mode 100644
index 0000000000..7b960bf13c
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCWithFlinkSchemaChangeIT.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cdc.mysql;
+
+import
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlContainer;
+import
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlVersion;
+import
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.UniqueDatabase;
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
+import org.apache.seatunnel.e2e.common.container.EngineType;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
+import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.MethodOrderer;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.TestMethodOrder;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+import static org.awaitility.Awaitility.await;
+
+@Slf4j
+@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
+@DisabledOnContainer(
+ value = {},
+ type = {EngineType.SPARK},
+ disabledReason =
+ "Currently SPARK do not support cdc. In addition, the Flink
engine only supports configuring single parallelism in the configuration file,
and currently does not support multi-parallelism scenarios.")
+public class MysqlCDCWithFlinkSchemaChangeIT extends TestSuiteBase implements
TestResource {
+ private static final String MYSQL_DATABASE = "shop";
+ private static final String SOURCE_TABLE = "products";
+ private static final String SINK_TABLE =
"mysql_cdc_e2e_sink_table_with_schema_change";
+ private static final String SINK_TABLE2 =
+ "mysql_cdc_e2e_sink_table_with_schema_change_exactly_once";
+ private static final String MYSQL_HOST = "mysql_cdc_e2e";
+ private static final String MYSQL_USER_NAME = "mysqluser";
+ private static final String MYSQL_USER_PASSWORD = "mysqlpw";
+
+ private static final String QUERY = "select * from %s.%s";
+ private static final String DESC = "desc %s.%s";
+ private static final String PROJECTION_QUERY =
+ "select
id,name,description,weight,add_column1,add_column2,add_column3 from %s.%s;";
+
+ private static final MySqlContainer MYSQL_CONTAINER =
createMySqlContainer(MySqlVersion.V8_0);
+
+ private final UniqueDatabase shopDatabase =
+ new UniqueDatabase(
+ MYSQL_CONTAINER, MYSQL_DATABASE, "mysqluser", "mysqlpw",
MYSQL_DATABASE);
+
+ private static MySqlContainer createMySqlContainer(MySqlVersion version) {
+ return new MySqlContainer(version)
+ .withConfigurationOverride("docker/server-gtids/my.cnf")
+ .withSetupSQL("docker/setup.sql")
+ .withNetwork(NETWORK)
+ .withNetworkAliases(MYSQL_HOST)
+ .withDatabaseName(MYSQL_DATABASE)
+ .withUsername(MYSQL_USER_NAME)
+ .withPassword(MYSQL_USER_PASSWORD)
+ .withLogConsumer(
+ new
Slf4jLogConsumer(DockerLoggerFactory.getLogger("mysql-docker-image")));
+ }
+
+ private String driverUrl() {
+ return
"https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.0.32/mysql-connector-j-8.0.32.jar";
+ }
+
+ @TestContainerExtension
+ protected final ContainerExtendedFactory extendedFactory =
+ container -> {
+ Container.ExecResult extraCommands =
+ container.execInContainer(
+ "bash",
+ "-c",
+ "mkdir -p /tmp/seatunnel/plugins/MySQL-CDC/lib
&& cd /tmp/seatunnel/plugins/MySQL-CDC/lib && wget "
+ + driverUrl());
+ Assertions.assertEquals(0, extraCommands.getExitCode(),
extraCommands.getStderr());
+ };
+
+ @Order(1)
+ @TestTemplate
+ public void testMysqlCdcWithSchemaEvolutionCase(TestContainer container) {
+ // Reset database to initial state to avoid issues from previous test
runs
+ resetDatabaseToInitialState();
+
+ String jobConfigFile =
"/mysqlcdc_to_mysql_with_flink_schema_change.conf";
+ CompletableFuture.runAsync(
+ () -> {
+ try {
+ container.executeJob(jobConfigFile);
+ } catch (Exception e) {
+ log.error("Commit task exception :" + e.getMessage());
+ throw new RuntimeException(e);
+ }
+ });
+
+ // waiting for case1 completed
+ assertSchemaEvolutionForAddColumns(MYSQL_DATABASE, SOURCE_TABLE,
SINK_TABLE);
+
+        // case2: drop columns while CDC data is arriving at the same time
+ shopDatabase.setTemplateName("drop_columns").createAndInitialize();
+
+ // waiting for case2 completed
+ assertTableStructureAndData(MYSQL_DATABASE, SOURCE_TABLE, SINK_TABLE);
+
+ // case3 change column name with cdc data at same time
+ shopDatabase.setTemplateName("change_columns").createAndInitialize();
+
+ // case4 modify column data type with cdc data at same time
+ shopDatabase.setTemplateName("modify_columns").createAndInitialize();
+
+ // waiting for case3/case4 completed
+ assertTableStructureAndData(MYSQL_DATABASE, SOURCE_TABLE, SINK_TABLE);
+ }
+
+ private void assertSchemaEvolutionForAddColumns(
+ String database, String sourceTable, String sinkTable) {
+ await().atMost(180000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () ->
+ Assertions.assertIterableEquals(
+ query(String.format(QUERY, database,
sourceTable)),
+ query(String.format(QUERY, database,
sinkTable))));
+
+        // case1: add columns while CDC data is arriving at the same time
+ shopDatabase.setTemplateName("add_columns").createAndInitialize();
+ await().atMost(180000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () ->
+ Assertions.assertIterableEquals(
+ query(String.format(DESC, database,
sourceTable)),
+ query(String.format(DESC, database,
sinkTable))));
+ await().atMost(180000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () -> {
+ Assertions.assertIterableEquals(
+ query(
+ String.format(QUERY, database,
sourceTable)
+ + " where id >= 128"),
+ query(
+ String.format(QUERY, database,
sinkTable)
+ + " where id >= 128"));
+
+ Assertions.assertIterableEquals(
+ query(String.format(PROJECTION_QUERY,
database, sourceTable)),
+ query(String.format(PROJECTION_QUERY,
database, sinkTable)));
+
+                            // The default value of add_column4 is
+                            // current_timestamp(), so the historical data for
+                            // this column in the sink table may differ from the
+                            // source table because of the delay in applying the
+                            // schema change.
+ String query =
+ String.format(
+ "SELECT t1.id AS table1_id,
t1.add_column4 AS table1_timestamp, "
+ + "t2.id AS table2_id,
t2.add_column4 AS table2_timestamp, "
+ +
"ABS(TIMESTAMPDIFF(SECOND, t1.add_column4, t2.add_column4)) AS time_diff "
+ + "FROM %s.%s t1 "
+ + "INNER JOIN %s.%s t2 ON
t1.id = t2.id",
+ database, sourceTable, database,
sinkTable);
+ try (Connection jdbcConnection =
getJdbcConnection();
+ Statement statement =
jdbcConnection.createStatement();
+ ResultSet resultSet =
statement.executeQuery(query); ) {
+ while (resultSet.next()) {
+ int timeDiff =
resultSet.getInt("time_diff");
+ Assertions.assertTrue(
+ timeDiff <= 3,
+ "Time difference exceeds 3
seconds: "
+ + timeDiff
+ + " seconds");
+ }
+ }
+ });
+ }
+
+ private void assertTableStructureAndData(
+ String database, String sourceTable, String sinkTable) {
+ await().atMost(180000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () ->
+ Assertions.assertIterableEquals(
+ query(String.format(DESC, database,
sourceTable)),
+ query(String.format(DESC, database,
sinkTable))));
+ await().atMost(180000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () ->
+ Assertions.assertIterableEquals(
+ query(String.format(QUERY, database,
sourceTable)),
+ query(String.format(QUERY, database,
sinkTable))));
+ }
+
+ private Connection getJdbcConnection() throws SQLException {
+ return DriverManager.getConnection(
+ MYSQL_CONTAINER.getJdbcUrl(),
+ MYSQL_CONTAINER.getUsername(),
+ MYSQL_CONTAINER.getPassword());
+ }
+
+ @BeforeAll
+ @Override
+ public void startUp() {
+ log.info("The second stage: Starting Mysql containers...");
+ Startables.deepStart(Stream.of(MYSQL_CONTAINER)).join();
+ log.info("Mysql Containers are started");
+ shopDatabase.createAndInitialize();
+ log.info("Mysql ddl execution is complete");
+ }
+
+ @AfterAll
+ @Override
+ public void tearDown() {
+ if (MYSQL_CONTAINER != null) {
+ MYSQL_CONTAINER.close();
+ }
+ }
+
+ private void resetDatabaseToInitialState() {
+ try {
+ log.info("Resetting database to initial state...");
+ // Reset to original template and recreate database
+ shopDatabase.setTemplateName(MYSQL_DATABASE).createAndInitialize();
+ log.info("Database reset to initial state completed");
+ } catch (Exception e) {
+ log.error("Failed to reset database to initial state", e);
+ throw new RuntimeException("Failed to reset database to initial
state", e);
+ }
+ }
+
+ private List<List<Object>> query(String sql) {
+ try (Connection connection = getJdbcConnection()) {
+ ResultSet resultSet =
connection.createStatement().executeQuery(sql);
+ List<List<Object>> result = new ArrayList<>();
+ int columnCount = resultSet.getMetaData().getColumnCount();
+ while (resultSet.next()) {
+ ArrayList<Object> objects = new ArrayList<>();
+ for (int i = 1; i <= columnCount; i++) {
+ objects.add(resultSet.getObject(i));
+ }
+ log.debug(String.format("Print MySQL-CDC query, sql: %s, data:
%s", sql, objects));
+ result.add(objects);
+ }
+ return result;
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/initial.sql
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/initial.sql
new file mode 100644
index 0000000000..17c3d05a4f
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/initial.sql
@@ -0,0 +1,60 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements. See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+--
----------------------------------------------------------------------------------------------------------------
+-- DATABASE: $DBNAME$ (schema_evolution_test)
+--
----------------------------------------------------------------------------------------------------------------
+
+-- Create the initial products table for schema evolution testing
+CREATE TABLE products (
+ id INT PRIMARY KEY,
+ name VARCHAR(100) NOT NULL,
+ description TEXT,
+ weight DECIMAL(10,2),
+ price DECIMAL(10,2) NOT NULL
+);
+
+-- Create products_on_hand table for testing
+CREATE TABLE products_on_hand (
+ product_id INTEGER NOT NULL PRIMARY KEY,
+ quantity INTEGER NOT NULL,
+ FOREIGN KEY (product_id) REFERENCES products(id)
+);
+
+-- Insert initial test data
+INSERT INTO products VALUES
+(101, 'scooter', 'Small 2-wheel scooter', 3.14, 15.99),
+(102, 'car battery', '12V car battery', 8.1, 89.99),
+(103, '12-pack drill bits', '12-pack of drill bits with sizes ranging from #40
to #3', 0.8, 19.99),
+(104, 'hammer', '12oz carpenter''s hammer', 0.75, 12.99),
+(105, 'hammer', '14oz carpenter''s hammer', 0.875, 14.99),
+(106, 'hammer', '16oz carpenter''s hammer', 1.0, 16.99),
+(107, 'rocks', 'box of assorted rocks', 5.3, 9.99),
+(108, 'jacket', 'water resistent black wind breaker', 0.1, 39.99),
+(109, 'spare tire', '24 inch spare tire', 22.2, 99.99);
+
+-- Insert initial inventory data
+INSERT INTO products_on_hand VALUES
+(101, 3),
+(102, 8),
+(103, 18),
+(104, 4),
+(105, 5),
+(106, 0),
+(107, 44),
+(108, 2),
+(109, 5);
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/rename_columns.sql
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/rename_columns.sql
new file mode 100644
index 0000000000..8175e82879
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/rename_columns.sql
@@ -0,0 +1,29 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements. See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+--
----------------------------------------------------------------------------------------------------------------
+-- DATABASE: $DBNAME$ (schema_evolution_test) - RENAME COLUMNS
+--
----------------------------------------------------------------------------------------------------------------
+
+-- Rename columns in products table
+ALTER TABLE products CHANGE COLUMN description product_description TEXT;
+ALTER TABLE products CHANGE COLUMN weight product_weight DECIMAL(10,2);
+
+-- Insert additional test data to verify rename functionality
+INSERT INTO products VALUES
+(110, 'tablet', 'Android tablet with 10-inch screen', 1.2, 299.99),
+(111, 'keyboard', 'Wireless bluetooth keyboard', 0.8, 59.99);
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_flink_schema_change.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_flink_schema_change.conf
new file mode 100644
index 0000000000..9dd595f547
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_flink_schema_change.conf
@@ -0,0 +1,53 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in
seatunnel config
+######
+
+env {
+ # You can set engine configuration here
+ parallelism = 1
+ job.mode = "STREAMING"
+ checkpoint.interval = 5000
+ read_limit.bytes_per_second=7000000
+ read_limit.rows_per_second=400
+}
+
+source {
+ MySQL-CDC {
+ server-id = 5652-5657
+ username = "st_user_source"
+ password = "mysqlpw"
+ table-names = ["shop.products"]
+ url = "jdbc:mysql://mysql_cdc_e2e:3306/shop"
+
+ schema-changes.enabled = true
+ }
+}
+
+sink {
+ jdbc {
+ url = "jdbc:mysql://mysql_cdc_e2e:3306/shop"
+ driver = "com.mysql.cj.jdbc.Driver"
+ user = "st_user_sink"
+ password = "mysqlpw"
+ generate_sink_sql = true
+ database = shop
+ table = mysql_cdc_e2e_sink_table_with_schema_change
+ primary_keys = ["id"]
+ }
+}
diff --git
a/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/mysqlcdc_to_mysql_schema_evolution.conf
b/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/mysqlcdc_to_mysql_schema_evolution.conf
new file mode 100644
index 0000000000..ca7d7b515c
--- /dev/null
+++
b/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/mysqlcdc_to_mysql_schema_evolution.conf
@@ -0,0 +1,52 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+env {
+ # You can set engine configuration here
+ parallelism = 1
+ job.mode = "STREAMING"
+ checkpoint.interval = 5000
+ read_limit.bytes_per_second=7000000
+ read_limit.rows_per_second=400
+}
+
+source {
+ MySQL-CDC {
+ server-id = 5652
+ username = mysqluser
+ password = mysqlpw
+ table-names = ["schema_test.evolving_table"]
+ base-url = "jdbc:mysql://mysql_cdc_schema_evolution_e2e:3306/schema_test"
+ schema-changes.enabled = true
+ }
+}
+
+sink {
+ jdbc {
+ url = "jdbc:mysql://mysql_cdc_schema_evolution_e2e:3306/sink_test"
+ driver = "com.mysql.cj.jdbc.Driver"
+ user = mysqluser
+ password = mysqlpw
+ generate_sink_sql = true
+ database = sink_test
+ table = evolving_table
+ primary_keys = ["id"]
+ xa_data_source_class_name = "com.mysql.cj.jdbc.MysqlXADataSource"
+
+ # Schema evolution configuration
+ schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
+ data_save_mode = "APPEND_DATA"
+ }
+}
\ No newline at end of file
diff --git
a/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/schema_evolution_test.conf
b/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/schema_evolution_test.conf
new file mode 100644
index 0000000000..6749389427
--- /dev/null
+++
b/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/schema_evolution_test.conf
@@ -0,0 +1,42 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ # You can set engine configuration here
+ parallelism = 1
+ job.mode = "STREAMING"
+ checkpoint.interval = 5000
+ read_limit.bytes_per_second=7000000
+ read_limit.rows_per_second=400
+}
+
+source {
+ MySQL-CDC {
+ server-id = 5653
+ username = mysqluser
+ password = mysqlpw
+ table-names = ["schema_test.evolving_table"]
+ base-url =
"jdbc:mysql://mysql_cdc_schema_evolution_e2e:3306/schema_test?useSSL=false&serverTimezone=UTC"
+
+ schema-changes.enabled = true
+ }
+}
+
+sink {
+ Console {
+ }
+}
\ No newline at end of file
diff --git
a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java
b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java
index 3ee6e3533c..b5dd8567e0 100644
---
a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java
+++
b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java
@@ -23,7 +23,12 @@ import org.apache.seatunnel.api.common.metrics.MetricNames;
import org.apache.seatunnel.api.common.metrics.MetricsContext;
import org.apache.seatunnel.api.sink.MultiTableResourceManager;
import org.apache.seatunnel.api.sink.SupportResourceShare;
+import org.apache.seatunnel.api.sink.SupportSchemaEvolutionSinkWriter;
import org.apache.seatunnel.api.sink.event.WriterCloseEvent;
+import org.apache.seatunnel.api.table.schema.event.FlushEvent;
+import org.apache.seatunnel.api.table.schema.event.SchemaChangeEvent;
+import
org.apache.seatunnel.api.table.schema.exception.SchemaEvolutionErrorCode;
+import
org.apache.seatunnel.api.table.schema.exception.SinkWriterSchemaException;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.flink.api.connector.sink.Sink;
@@ -35,6 +40,7 @@ import java.io.IOException;
import java.io.InvalidClassException;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
@@ -89,6 +95,60 @@ public class FlinkSinkWriter<InputT, CommT, WriterStateT>
return;
}
if (element instanceof SeaTunnelRow) {
+ SeaTunnelRow seaTunnelRow = (SeaTunnelRow) element;
+ Map<String, Object> options = seaTunnelRow.getOptions();
+
+ if (options != null && options.containsKey("flush_event")) {
+ FlushEvent flushEvent = (FlushEvent)
options.get("flush_event");
+ log.info(
+ "FlinkSinkWriter detected FlushEvent for table: {}",
+ flushEvent.tableIdentifier());
+
+ if (sinkWriter instanceof SupportSchemaEvolutionSinkWriter) {
+ try {
+ ((SupportSchemaEvolutionSinkWriter) sinkWriter)
+ .handleFlushEvent(flushEvent);
+ log.info(
+ "FlinkSinkWriter handled FlushEvent for table:
{}",
+ flushEvent.tableIdentifier());
+ } catch (Exception e) {
+ log.error("Failed to handle flush event", e);
+ throw new SinkWriterSchemaException(
+
SchemaEvolutionErrorCode.FLUSH_EVENT_PROCESSING_FAILED,
+ "Failed to handle flush event in Flink sink
writer",
+ flushEvent.tableIdentifier(),
+ flushEvent.getJobId(),
+ e);
+ }
+ }
+ }
+
+ if (options != null && options.containsKey("schema_change_event"))
{
+ SchemaChangeEvent schemaChangeEvent =
+ (SchemaChangeEvent) options.get("schema_change_event");
+ log.info(
+ "FlinkSinkWriter detected SchemaChangeEvent for table:
{}",
+ schemaChangeEvent.tableIdentifier());
+
+ if (sinkWriter instanceof SupportSchemaEvolutionSinkWriter) {
+ try {
+ ((SupportSchemaEvolutionSinkWriter) sinkWriter)
+ .applySchemaChange(schemaChangeEvent);
+ log.info(
+ "FlinkSinkWriter applied SchemaChangeEvent for
table: {}",
+ schemaChangeEvent.tableIdentifier());
+ } catch (Exception e) {
+ log.error("Failed to apply schema change", e);
+ throw new SinkWriterSchemaException(
+
SchemaEvolutionErrorCode.SCHEMA_EVENT_PROCESSING_FAILED,
+ "Failed to apply schema change in Flink sink
writer",
+ schemaChangeEvent.tableIdentifier(),
+ schemaChangeEvent.getJobId(),
+ e);
+ }
+ }
+ }
+
sinkWriter.write((SeaTunnelRow) element);
sinkWriteCount.inc();
sinkWriteBytes.inc(((SeaTunnelRow) element).getBytesSize());
diff --git
a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkRowCollector.java
b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkRowCollector.java
index c7eadfcfb7..355a0f7b2a 100644
---
a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkRowCollector.java
+++
b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkRowCollector.java
@@ -24,6 +24,7 @@ import org.apache.seatunnel.api.common.metrics.Meter;
import org.apache.seatunnel.api.common.metrics.MetricNames;
import org.apache.seatunnel.api.common.metrics.MetricsContext;
import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.schema.event.SchemaChangeEvent;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.core.starter.flowcontrol.FlowControlGate;
import org.apache.seatunnel.core.starter.flowcontrol.FlowControlStrategy;
@@ -32,6 +33,8 @@ import org.apache.flink.api.connector.source.ReaderOutput;
import lombok.extern.slf4j.Slf4j;
+import java.util.HashMap;
+
/** The implementation of {@link Collector} for flink engine. */
@Slf4j
public class FlinkRowCollector implements Collector<SeaTunnelRow> {
@@ -69,6 +72,16 @@ public class FlinkRowCollector implements
Collector<SeaTunnelRow> {
}
}
+ @Override
+ public void collect(SchemaChangeEvent event) {
+ SeaTunnelRow eventRow = new SeaTunnelRow(0);
+ eventRow.setTableId("__SCHEMA_CHANGE_EVENT__");
+ HashMap<String, Object> options = new HashMap<>();
+ options.put("schema_change_event", event);
+ eventRow.setOptions(options);
+ readerOutput.collect(eventRow);
+ }
+
@Override
public Object getCheckpointLock() {
return this;