This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new 497adc82612 branch-4.1: [Improve](StreamingJob)  Support schema change 
for PostgreSQL streaming job #61182 (#61453)
497adc82612 is described below

commit 497adc826122b0848ca6efecd21deb2c26631acd
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Thu Mar 19 18:41:30 2026 +0800

    branch-4.1: [Improve](StreamingJob)  Support schema change for PostgreSQL 
streaming job #61182 (#61453)
    
    Cherry-picked from #61182
    
    Co-authored-by: wudi <[email protected]>
---
 .../doris/job/cdc/request/CommitOffsetRequest.java |  17 +-
 .../job/cdc/request/JobBaseRecordRequest.java      |   1 +
 .../insert/streaming/StreamingInsertJob.java       |   4 +
 .../insert/streaming/StreamingMultiTblTask.java    |   6 +
 .../job/offset/jdbc/JdbcSourceOffsetProvider.java  |   4 +
 .../apache/doris/cdcclient/common/Constants.java   |   2 +
 .../apache/doris/cdcclient/common/DorisType.java   |  47 +++
 .../cdcclient/service/PipelineCoordinator.java     |  61 +++-
 .../doris/cdcclient/sink/DorisBatchStreamLoad.java |  25 +-
 .../doris/cdcclient/sink/HttpPutBuilder.java       |   3 +-
 .../deserialize/DebeziumJsonDeserializer.java      |  18 +-
 .../source/deserialize/DeserializeResult.java      |  92 ++++++
 .../deserialize/MySqlDebeziumJsonDeserializer.java |  66 ++++
 .../PostgresDebeziumJsonDeserializer.java          | 248 +++++++++++++++
 .../deserialize/SourceRecordDeserializer.java      |   5 +
 .../source/reader/AbstractCdcSourceReader.java     | 171 ++++++++++
 .../source/reader/JdbcIncrementalSourceReader.java |  24 +-
 .../cdcclient/source/reader/SourceReader.java      |  18 +-
 .../source/reader/mysql/MySqlSourceReader.java     |  21 +-
 .../reader/postgres/PostgresSourceReader.java      |  32 ++
 .../org/apache/doris/cdcclient/utils/HttpUtil.java |   4 +
 .../doris/cdcclient/utils/SchemaChangeHelper.java  | 291 +++++++++++++++++
 .../doris/cdcclient/utils/SchemaChangeManager.java | 149 +++++++++
 .../cdcclient/utils/SchemaChangeHelperTest.java    | 194 ++++++++++++
 .../cdc/test_streaming_postgres_job_sc.out         |  32 ++
 .../test_streaming_postgres_job_sc_advanced.out    |  31 ++
 .../cdc/test_streaming_postgres_job_sc.groovy      | 269 ++++++++++++++++
 .../test_streaming_postgres_job_sc_advanced.groovy | 344 +++++++++++++++++++++
 28 files changed, 2132 insertions(+), 47 deletions(-)

diff --git 
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/CommitOffsetRequest.java
 
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/CommitOffsetRequest.java
index 3d2d221ea49..747e82b4fee 100644
--- 
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/CommitOffsetRequest.java
+++ 
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/CommitOffsetRequest.java
@@ -22,12 +22,10 @@ import lombok.Builder;
 import lombok.Getter;
 import lombok.NoArgsConstructor;
 import lombok.Setter;
-import lombok.ToString;
 
 @Getter
 @Setter
 @NoArgsConstructor
-@ToString
 @AllArgsConstructor
 @Builder
 public class CommitOffsetRequest {
@@ -38,4 +36,19 @@ public class CommitOffsetRequest {
     public long filteredRows;
     public long loadedRows;
     public long loadBytes;
+    public String tableSchemas;
+
+    @Override
+    public String toString() {
+        return "CommitOffsetRequest{"
+                + "jobId=" + jobId
+                + ", taskId=" + taskId
+                + ", offset='" + offset + "'"
+                + ", scannedRows=" + scannedRows
+                + ", filteredRows=" + filteredRows
+                + ", loadedRows=" + loadedRows
+                + ", loadBytes=" + loadBytes
+                + ", tableSchemasSize=" + (tableSchemas != null ? 
tableSchemas.length() : 0)
+                + "}";
+    }
 }
diff --git 
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/JobBaseRecordRequest.java
 
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/JobBaseRecordRequest.java
index 282913e2dd2..27784b1701b 100644
--- 
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/JobBaseRecordRequest.java
+++ 
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/JobBaseRecordRequest.java
@@ -28,4 +28,5 @@ import java.util.Map;
 @EqualsAndHashCode(callSuper = true)
 public abstract class JobBaseRecordRequest extends JobBaseConfig {
     protected Map<String, Object> meta;
+    protected String tableSchemas;
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
index f2b07e02ab1..ea5298c026a 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
@@ -1177,6 +1177,10 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
                     JdbcSourceOffsetProvider op = (JdbcSourceOffsetProvider) 
offsetProvider;
                     op.setHasMoreData(false);
                 }
+                if (offsetRequest.getTableSchemas() != null) {
+                    JdbcSourceOffsetProvider op = (JdbcSourceOffsetProvider) 
offsetProvider;
+                    op.setTableSchemas(offsetRequest.getTableSchemas());
+                }
                 persistOffsetProviderIfNeed();
                 log.info("Streaming multi table job {} task {} commit offset 
successfully, offset: {}",
                         getJobId(), offsetRequest.getTaskId(), 
offsetRequest.getOffset());
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
index cf9a9b905be..45d2cf2ffbb 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
@@ -198,6 +198,12 @@ public class StreamingMultiTblTask extends 
AbstractStreamingTask {
         String feAddr = Env.getCurrentEnv().getMasterHost() + ":" + 
Env.getCurrentEnv().getMasterHttpPort();
         request.setFrontendAddress(feAddr);
         request.setMaxInterval(jobProperties.getMaxIntervalSecond());
+        if (offsetProvider instanceof JdbcSourceOffsetProvider) {
+            String schemas = ((JdbcSourceOffsetProvider) 
offsetProvider).getTableSchemas();
+            if (schemas != null) {
+                request.setTableSchemas(schemas);
+            }
+        }
         return request;
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
index d04245317b5..d8959086fa5 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
@@ -87,6 +87,9 @@ public class JdbcSourceOffsetProvider implements 
SourceOffsetProvider {
     @SerializedName("bop")
     Map<String, String> binlogOffsetPersist;
 
+    @SerializedName("ts")
+    String tableSchemas;
+
     volatile boolean hasMoreData = true;
 
     public JdbcSourceOffsetProvider(Long jobId, DataSourceType sourceType, 
Map<String, String> sourceProperties) {
@@ -355,6 +358,7 @@ public class JdbcSourceOffsetProvider implements 
SourceOffsetProvider {
                     JdbcSourceOffsetProvider.class);
             this.binlogOffsetPersist = 
replayFromPersist.getBinlogOffsetPersist();
             this.chunkHighWatermarkMap = 
replayFromPersist.getChunkHighWatermarkMap();
+            this.tableSchemas = replayFromPersist.getTableSchemas();
             log.info("Replaying offset provider for job {}, binlogOffset size 
{}, chunkHighWatermark size {}",
                     getJobId(),
                     binlogOffsetPersist == null ? 0 : 
binlogOffsetPersist.size(),
diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/Constants.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/Constants.java
index 04ec118a6c2..953903a8032 100644
--- 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/Constants.java
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/Constants.java
@@ -23,4 +23,6 @@ public class Constants {
 
     // Debezium default properties
     public static final long DEBEZIUM_HEARTBEAT_INTERVAL_MS = 3000L;
+
+    public static final String DORIS_TARGET_DB = "doris_target_db";
 }
diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/DorisType.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/DorisType.java
new file mode 100644
index 00000000000..3aad97bb0cd
--- /dev/null
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/DorisType.java
@@ -0,0 +1,47 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cdcclient.common;
+
+public class DorisType {
+    public static final String BOOLEAN = "BOOLEAN";
+    public static final String TINYINT = "TINYINT";
+    public static final String SMALLINT = "SMALLINT";
+    public static final String INT = "INT";
+    public static final String BIGINT = "BIGINT";
+    public static final String LARGEINT = "LARGEINT";
+    // largeint is bigint unsigned in information_schema.COLUMNS
+    public static final String BIGINT_UNSIGNED = "BIGINT UNSIGNED";
+    public static final String FLOAT = "FLOAT";
+    public static final String DOUBLE = "DOUBLE";
+    public static final String DECIMAL = "DECIMAL";
+    public static final String DATE = "DATE";
+    public static final String DATETIME = "DATETIME";
+    public static final String CHAR = "CHAR";
+    public static final String VARCHAR = "VARCHAR";
+    public static final String STRING = "STRING";
+    public static final String HLL = "HLL";
+    public static final String BITMAP = "BITMAP";
+    public static final String ARRAY = "ARRAY";
+    public static final String JSONB = "JSONB";
+    public static final String JSON = "JSON";
+    public static final String MAP = "MAP";
+    public static final String STRUCT = "STRUCT";
+    public static final String VARIANT = "VARIANT";
+    public static final String IPV4 = "IPV4";
+    public static final String IPV6 = "IPV6";
+}
diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
index 614c506619f..97aa4b7f5f2 100644
--- 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
@@ -21,8 +21,10 @@ import org.apache.doris.cdcclient.common.Constants;
 import org.apache.doris.cdcclient.common.Env;
 import org.apache.doris.cdcclient.model.response.RecordWithMeta;
 import org.apache.doris.cdcclient.sink.DorisBatchStreamLoad;
+import org.apache.doris.cdcclient.source.deserialize.DeserializeResult;
 import org.apache.doris.cdcclient.source.reader.SourceReader;
 import org.apache.doris.cdcclient.source.reader.SplitReadResult;
+import org.apache.doris.cdcclient.utils.SchemaChangeManager;
 import org.apache.doris.job.cdc.request.FetchRecordRequest;
 import org.apache.doris.job.cdc.request.WriteRecordRequest;
 import org.apache.doris.job.cdc.split.BinlogSplit;
@@ -166,11 +168,12 @@ public class PipelineCoordinator {
                     }
 
                     // Process data messages
-                    List<String> serializedRecords =
+                    DeserializeResult result =
                             sourceReader.deserialize(fetchRecord.getConfig(), 
element);
-                    if (!CollectionUtils.isEmpty(serializedRecords)) {
+                    if (result.getType() == DeserializeResult.Type.DML
+                            && !CollectionUtils.isEmpty(result.getRecords())) {
                         recordCount++;
-                        recordResponse.getRecords().addAll(serializedRecords);
+                        
recordResponse.getRecords().addAll(result.getRecords());
                         hasReceivedData = true;
                         lastMessageIsHeartbeat = false;
                     }
@@ -236,21 +239,34 @@ public class PipelineCoordinator {
      * <p>Heartbeat events will carry the latest offset.
      */
     public void writeRecords(WriteRecordRequest writeRecordRequest) throws 
Exception {
+        // Extract connection parameters up front for use throughout this 
method
+        String feAddr = writeRecordRequest.getFrontendAddress();
+        String targetDb = writeRecordRequest.getTargetDb();
+        String token = writeRecordRequest.getToken();
+
+        // Enrich the source config with the Doris target DB so the 
deserializer can build
+        // DDL referencing the correct Doris database, not the upstream source 
database.
+        Map<String, String> deserializeContext = new 
HashMap<>(writeRecordRequest.getConfig());
+        deserializeContext.put(Constants.DORIS_TARGET_DB, targetDb);
+
         SourceReader sourceReader = 
Env.getCurrentEnv().getReader(writeRecordRequest);
         DorisBatchStreamLoad batchStreamLoad = null;
         long scannedRows = 0L;
         int heartbeatCount = 0;
         SplitReadResult readResult = null;
+        boolean hasExecuteDDL = false;
+        boolean isSnapshotSplit = false;
         try {
             // 1. submit split async
             readResult = 
sourceReader.prepareAndSubmitSplit(writeRecordRequest);
             batchStreamLoad = getOrCreateBatchStreamLoad(writeRecordRequest);
 
-            boolean isSnapshotSplit = 
sourceReader.isSnapshotSplit(readResult.getSplit());
+            isSnapshotSplit = 
sourceReader.isSnapshotSplit(readResult.getSplit());
             long startTime = System.currentTimeMillis();
             long maxIntervalMillis = writeRecordRequest.getMaxInterval() * 
1000;
             boolean shouldStop = false;
             boolean lastMessageIsHeartbeat = false;
+
             LOG.info(
                     "Start polling records for jobId={} taskId={}, 
isSnapshotSplit={}",
                     writeRecordRequest.getJobId(),
@@ -309,15 +325,22 @@ public class PipelineCoordinator {
                     }
 
                     // Process data messages
-                    List<String> serializedRecords =
-                            
sourceReader.deserialize(writeRecordRequest.getConfig(), element);
-
-                    if (!CollectionUtils.isEmpty(serializedRecords)) {
-                        String database = writeRecordRequest.getTargetDb();
+                    DeserializeResult result =
+                            sourceReader.deserialize(deserializeContext, 
element);
+
+                    if (result.getType() == 
DeserializeResult.Type.SCHEMA_CHANGE) {
+                        // Flush pending data before DDL
+                        batchStreamLoad.forceFlush();
+                        SchemaChangeManager.executeDdls(feAddr, targetDb, 
token, result.getDdls());
+                        hasExecuteDDL = true;
+                        
sourceReader.applySchemaChange(result.getUpdatedSchemas());
+                        lastMessageIsHeartbeat = false;
+                    }
+                    if (!CollectionUtils.isEmpty(result.getRecords())) {
                         String table = extractTable(element);
-                        for (String record : serializedRecords) {
+                        for (String record : result.getRecords()) {
                             scannedRows++;
-                            batchStreamLoad.writeRecord(database, table, 
record.getBytes());
+                            batchStreamLoad.writeRecord(targetDb, table, 
record.getBytes());
                         }
                         // Mark last message as data (not heartbeat)
                         lastMessageIsHeartbeat = false;
@@ -346,8 +369,22 @@ public class PipelineCoordinator {
         // The offset must be reset before commitOffset to prevent the next 
taskId from being create
         // by the fe.
         batchStreamLoad.resetTaskId();
+
+        // Serialize tableSchemas back to FE when:
+        // 1. A DDL was executed (in-memory schema was updated), OR
+        // 2. It's a binlog split AND FE had no schema (FE tableSchemas was 
null) — this covers
+        //    incremental-only startup and the first binlog round after 
snapshot completes.
+        String tableSchemas = null;
+        boolean feHadNoSchema = writeRecordRequest.getTableSchemas() == null;
+        if (hasExecuteDDL || (!isSnapshotSplit && feHadNoSchema)) {
+            tableSchemas = sourceReader.serializeTableSchemas();
+        }
         batchStreamLoad.commitOffset(
-                currentTaskId, metaResponse, scannedRows, 
batchStreamLoad.getLoadStatistic());
+                currentTaskId,
+                metaResponse,
+                scannedRows,
+                batchStreamLoad.getLoadStatistic(),
+                tableSchemas);
     }
 
     public static boolean isHeartbeatEvent(SourceRecord record) {
diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/DorisBatchStreamLoad.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/DorisBatchStreamLoad.java
index 92a2f9db2b6..72e84c4413c 100644
--- 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/DorisBatchStreamLoad.java
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/DorisBatchStreamLoad.java
@@ -56,6 +56,7 @@ import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import lombok.Getter;
 import lombok.Setter;
@@ -503,7 +504,8 @@ public class DorisBatchStreamLoad implements Serializable {
             String taskId,
             List<Map<String, String>> meta,
             long scannedRows,
-            LoadStatistic loadStatistic) {
+            LoadStatistic loadStatistic,
+            String tableSchemas) {
         try {
             String url = String.format(COMMIT_URL_PATTERN, frontendAddress, 
targetDb);
             CommitOffsetRequest commitRequest =
@@ -515,6 +517,7 @@ public class DorisBatchStreamLoad implements Serializable {
                             .filteredRows(loadStatistic.getFilteredRows())
                             .loadedRows(loadStatistic.getLoadedRows())
                             .loadBytes(loadStatistic.getLoadBytes())
+                            .tableSchemas(tableSchemas)
                             .build();
             String param = OBJECT_MAPPER.writeValueAsString(commitRequest);
 
@@ -527,7 +530,11 @@ public class DorisBatchStreamLoad implements Serializable {
                             .commit()
                             .setEntity(new StringEntity(param));
 
-            LOG.info("commit offset for jobId {} taskId {}, params {}", jobId, 
taskId, param);
+            LOG.info(
+                    "commit offset for jobId {} taskId {}, commitRequest {}",
+                    jobId,
+                    taskId,
+                    commitRequest.toString());
             Throwable resEx = null;
             int retry = 0;
             while (retry <= RETRY) {
@@ -541,11 +548,15 @@ public class DorisBatchStreamLoad implements Serializable 
{
                                         : "";
                         LOG.info("commit result {}", responseBody);
                         if (statusCode == 200) {
-                            LOG.info("commit offset for jobId {} taskId {}", 
jobId, taskId);
-                            // A 200 response indicates that the request was 
successful, and
-                            // information such as offset and statistics may 
have already been
-                            // updated. Retrying may result in repeated 
updates.
-                            return;
+                            JsonNode root = 
OBJECT_MAPPER.readTree(responseBody);
+                            JsonNode code = root.get("code");
+                            if (code != null && code.asInt() == 0) {
+                                LOG.info(
+                                        "commit offset for jobId {} taskId {} 
successfully",
+                                        jobId,
+                                        taskId);
+                                return;
+                            }
                         }
                         LOG.error(
                                 "commit offset failed with {}, reason {}, to 
retry",
diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/HttpPutBuilder.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/HttpPutBuilder.java
index 3abd9eaabc2..d24f61397a2 100644
--- 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/HttpPutBuilder.java
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/HttpPutBuilder.java
@@ -18,6 +18,7 @@
 package org.apache.doris.cdcclient.sink;
 
 import org.apache.doris.cdcclient.common.Constants;
+import org.apache.doris.cdcclient.utils.HttpUtil;
 
 import org.apache.commons.codec.binary.Base64;
 import org.apache.commons.collections.MapUtils;
@@ -69,7 +70,7 @@ public class HttpPutBuilder {
     }
 
     public HttpPutBuilder addTokenAuth(String token) {
-        header.put(HttpHeaders.AUTHORIZATION, "Basic YWRtaW46");
+        header.put(HttpHeaders.AUTHORIZATION, HttpUtil.getAuthHeader());
         header.put("token", token);
         return this;
     }
diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/DebeziumJsonDeserializer.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/DebeziumJsonDeserializer.java
index 556c186b5d4..065a3da2c09 100644
--- 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/DebeziumJsonDeserializer.java
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/DebeziumJsonDeserializer.java
@@ -38,7 +38,6 @@ import java.time.LocalDateTime;
 import java.time.LocalTime;
 import java.time.ZoneId;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
@@ -58,6 +57,8 @@ import io.debezium.data.VariableScaleDecimal;
 import io.debezium.data.geometry.Geography;
 import io.debezium.data.geometry.Geometry;
 import io.debezium.data.geometry.Point;
+import io.debezium.relational.TableId;
+import io.debezium.relational.history.TableChanges;
 import io.debezium.time.MicroTime;
 import io.debezium.time.MicroTimestamp;
 import io.debezium.time.NanoTime;
@@ -65,17 +66,19 @@ import io.debezium.time.NanoTimestamp;
 import io.debezium.time.Time;
 import io.debezium.time.Timestamp;
 import io.debezium.time.ZonedTimestamp;
+import lombok.Getter;
 import lombok.Setter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-/** SourceRecord ==> [{},{}] */
+/** SourceRecord ==> DeserializeResult */
 public class DebeziumJsonDeserializer
-        implements SourceRecordDeserializer<SourceRecord, List<String>> {
+        implements SourceRecordDeserializer<SourceRecord, DeserializeResult> {
     private static final long serialVersionUID = 1L;
     private static final Logger LOG = 
LoggerFactory.getLogger(DebeziumJsonDeserializer.class);
     private static ObjectMapper objectMapper = new ObjectMapper();
     @Setter private ZoneId serverTimeZone = ZoneId.systemDefault();
+    @Getter @Setter protected Map<TableId, TableChanges.TableChange> 
tableSchemas;
 
     public DebeziumJsonDeserializer() {}
 
@@ -86,15 +89,14 @@ public class DebeziumJsonDeserializer
     }
 
     @Override
-    public List<String> deserialize(Map<String, String> context, SourceRecord 
record)
+    public DeserializeResult deserialize(Map<String, String> context, 
SourceRecord record)
             throws IOException {
         if (RecordUtils.isDataChangeRecord(record)) {
             LOG.trace("Process data change record: {}", record);
-            return deserializeDataChangeRecord(record);
-        } else if (RecordUtils.isSchemaChangeEvent(record)) {
-            return Collections.emptyList();
+            List<String> rows = deserializeDataChangeRecord(record);
+            return DeserializeResult.dml(rows);
         } else {
-            return Collections.emptyList();
+            return DeserializeResult.empty();
         }
     }
 
diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/DeserializeResult.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/DeserializeResult.java
new file mode 100644
index 00000000000..c1e69c77e5a
--- /dev/null
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/DeserializeResult.java
@@ -0,0 +1,92 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cdcclient.source.deserialize;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import io.debezium.relational.TableId;
+import io.debezium.relational.history.TableChanges;
+
+/** Result of deserializing a SourceRecord. */
+public class DeserializeResult {
+
+    public enum Type {
+        DML,
+        SCHEMA_CHANGE,
+        EMPTY
+    }
+
+    private final Type type;
+    private final List<String> records;
+    private final List<String> ddls;
+    private final Map<TableId, TableChanges.TableChange> updatedSchemas;
+
+    private DeserializeResult(
+            Type type,
+            List<String> records,
+            List<String> ddls,
+            Map<TableId, TableChanges.TableChange> updatedSchemas) {
+        this.type = type;
+        this.records = records;
+        this.ddls = ddls;
+        this.updatedSchemas = updatedSchemas;
+    }
+
+    public static DeserializeResult dml(List<String> records) {
+        return new DeserializeResult(Type.DML, records, null, null);
+    }
+
+    public static DeserializeResult schemaChange(
+            List<String> ddls, Map<TableId, TableChanges.TableChange> 
updatedSchemas) {
+        return new DeserializeResult(
+                Type.SCHEMA_CHANGE, Collections.emptyList(), ddls, 
updatedSchemas);
+    }
+
+    /**
+     * Schema change result that also carries DML records from the triggering 
record. The
+     * coordinator should execute DDLs first, then write the records.
+     */
+    public static DeserializeResult schemaChange(
+            List<String> ddls,
+            Map<TableId, TableChanges.TableChange> updatedSchemas,
+            List<String> records) {
+        return new DeserializeResult(Type.SCHEMA_CHANGE, records, ddls, 
updatedSchemas);
+    }
+
+    public static DeserializeResult empty() {
+        return new DeserializeResult(Type.EMPTY, Collections.emptyList(), 
null, null);
+    }
+
+    public Type getType() {
+        return type;
+    }
+
+    public List<String> getRecords() {
+        return records;
+    }
+
+    public List<String> getDdls() {
+        return ddls;
+    }
+
+    public Map<TableId, TableChanges.TableChange> getUpdatedSchemas() {
+        return updatedSchemas;
+    }
+}
diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/MySqlDebeziumJsonDeserializer.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/MySqlDebeziumJsonDeserializer.java
new file mode 100644
index 00000000000..b64c7186983
--- /dev/null
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/MySqlDebeziumJsonDeserializer.java
@@ -0,0 +1,66 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cdcclient.source.deserialize;
+
+import org.apache.doris.job.cdc.DataSourceConfigKeys;
+
+import org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils;
+import org.apache.flink.cdc.debezium.history.FlinkJsonTableChangeSerializer;
+import org.apache.kafka.connect.source.SourceRecord;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * MySQL-specific deserializer that handles DDL schema change events.
+ *
+ * <p>When a schema change event is detected, it parses the HistoryRecord, 
computes the diff against
+ * stored tableSchemas, generates Doris ALTER TABLE SQL, and returns a 
SCHEMA_CHANGE result.
+ */
+public class MySqlDebeziumJsonDeserializer extends DebeziumJsonDeserializer {
+    private static final long serialVersionUID = 1L;
+    private static final Logger LOG = 
LoggerFactory.getLogger(MySqlDebeziumJsonDeserializer.class);
+    private static final FlinkJsonTableChangeSerializer 
TABLE_CHANGE_SERIALIZER =
+            new FlinkJsonTableChangeSerializer();
+
+    private String targetDb;
+
+    @Override
+    public void init(Map<String, String> props) {
+        super.init(props);
+        this.targetDb = props.get(DataSourceConfigKeys.DATABASE);
+    }
+
+    @Override
+    public DeserializeResult deserialize(Map<String, String> context, 
SourceRecord record)
+            throws IOException {
+        if (RecordUtils.isSchemaChangeEvent(record)) {
+            return handleSchemaChangeEvent(record, context);
+        }
+        return super.deserialize(context, record);
+    }
+
+    private DeserializeResult handleSchemaChangeEvent(
+            SourceRecord record, Map<String, String> context) {
+        // todo: record has mysql ddl, need to convert doris ddl
+        return DeserializeResult.empty();
+    }
+}
diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/PostgresDebeziumJsonDeserializer.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/PostgresDebeziumJsonDeserializer.java
new file mode 100644
index 00000000000..2dc2310054b
--- /dev/null
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/PostgresDebeziumJsonDeserializer.java
@@ -0,0 +1,248 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cdcclient.source.deserialize;
+
+import org.apache.doris.cdcclient.common.Constants;
+import org.apache.doris.cdcclient.utils.SchemaChangeHelper;
+
+import org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils;
+import org.apache.flink.util.Preconditions;
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.source.SourceRecord;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static io.debezium.connector.AbstractSourceInfo.SCHEMA_NAME_KEY;
+import static io.debezium.connector.AbstractSourceInfo.TABLE_NAME_KEY;
+
+import io.debezium.data.Envelope;
+import io.debezium.relational.Column;
+import io.debezium.relational.TableEditor;
+import io.debezium.relational.TableId;
+import io.debezium.relational.history.TableChanges;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * PostgreSQL-specific deserializer that detects schema changes (ADD/DROP 
column only) by comparing
+ * the record's Kafka Connect schema field names with stored tableSchemas.
+ *
+ * <p>Because PostgreSQL does not emit DDL events in the WAL stream, schema 
detection is done by
+ * comparing the "after" struct field names in each DML record against the 
known column set.
+ *
+ * <p>Type comparison is intentionally skipped to avoid false positives caused 
by Kafka Connect type
+ * ambiguity (e.g. text/varchar/json/uuid all appear as STRING). When a column 
add or drop is
+ * detected, the accurate column types are fetched directly from PostgreSQL 
via the injected {@link
+ * #pgSchemaRefresher} callback.
+ *
+ * <p>MODIFY column type is not supported — users must manually execute ALTER 
TABLE ... MODIFY
+ * COLUMN in Doris when a PG column type changes.
+ */
+public class PostgresDebeziumJsonDeserializer extends DebeziumJsonDeserializer 
{
+    private static final long serialVersionUID = 1L;
+    private static final Logger LOG =
+            LoggerFactory.getLogger(PostgresDebeziumJsonDeserializer.class);
+
+    /**
+     * Callback to fetch the current PG table schema for a single table via 
JDBC. Injected by {@link
+     * org.apache.doris.cdcclient.source.reader.postgres.PostgresSourceReader} 
after initialization.
+     */
+    private transient Function<TableId, TableChanges.TableChange> 
pgSchemaRefresher;
+
+    public void setPgSchemaRefresher(Function<TableId, 
TableChanges.TableChange> refresher) {
+        this.pgSchemaRefresher = refresher;
+    }
+
+    @Override
+    public DeserializeResult deserialize(Map<String, String> context, 
SourceRecord record)
+            throws IOException {
+        if (!RecordUtils.isDataChangeRecord(record)) {
+            return DeserializeResult.empty();
+        }
+
+        Schema valueSchema = record.valueSchema();
+        if (valueSchema == null) {
+            return super.deserialize(context, record);
+        }
+
+        Field afterField = valueSchema.field(Envelope.FieldName.AFTER);
+        if (afterField == null) {
+            return super.deserialize(context, record);
+        }
+
+        Schema afterSchema = afterField.schema();
+        TableId tableId = extractTableId(record);
+        TableChanges.TableChange stored = tableSchemas != null ? 
tableSchemas.get(tableId) : null;
+
+        // No baseline schema available — cannot detect changes, fall through 
to normal
+        // deserialization
+        if (stored == null || stored.getTable() == null) {
+            LOG.debug(
+                    "No stored schema for table {}, skipping schema change 
detection.",
+                    tableId.identifier());
+            return super.deserialize(context, record);
+        }
+
+        // First pass: name-only diff — fast, in-memory, no type comparison, 
no false positives
+        SchemaChangeHelper.SchemaDiff nameDiff =
+                SchemaChangeHelper.diffSchemaByName(afterSchema, stored);
+        if (nameDiff.isEmpty()) {
+            return super.deserialize(context, record);
+        }
+
+        Preconditions.checkNotNull(
+                pgSchemaRefresher,
+                "pgSchemaRefresher callback is not set. Cannot fetch fresh PG 
schema for change detection.");
+
+        // the last fresh schema
+        TableChanges.TableChange fresh = pgSchemaRefresher.apply(tableId);
+        if (fresh == null || fresh.getTable() == null) {
+            // Cannot proceed: DDL must be executed before the triggering DML 
record is written,
+            // otherwise new column data in this record would be silently 
dropped.
+            // Throwing here causes the batch to be retried from the same 
offset.
+            throw new IOException(
+                    "Failed to fetch fresh schema for table "
+                            + tableId.identifier()
+                            + "; cannot apply schema change safely. Will 
retry.");
+        }
+
+        // Second diff: use afterSchema as the source of truth for which 
columns the current WAL
+        // record is aware of. Only process additions/drops visible in 
afterSchema — columns that
+        // exist in fresh (JDBC) but are absent from afterSchema belong to a 
later DDL that has not
+        // yet produced a DML record, and will be processed when that DML 
record arrives.
+        //
+        // pgAdded: present in afterSchema but absent from stored → look up 
Column in fresh for
+        //          accurate PG type metadata. If fresh doesn't have the 
column yet (shouldn't
+        //          happen normally), skip it.
+        // pgDropped: present in stored but absent from afterSchema.
+        List<Column> pgAdded = new ArrayList<>();
+        List<String> pgDropped = new ArrayList<>();
+
+        for (Field field : afterSchema.fields()) {
+            if (stored.getTable().columnWithName(field.name()) == null) {
+                Column freshCol = 
fresh.getTable().columnWithName(field.name());
+                if (freshCol != null) {
+                    pgAdded.add(freshCol);
+                }
+            }
+        }
+
+        for (Column col : stored.getTable().columns()) {
+            if (afterSchema.field(col.name()) == null) {
+                pgDropped.add(col.name());
+            }
+        }
+
+        // Second diff is empty: nameDiff was a false positive (PG schema 
unchanged vs stored).
+        // This happens when pgSchemaRefresher returns a schema ahead of the 
current WAL position
+        // (e.g. a later DDL was already applied in PG while we're still 
consuming older DML
+        // records).
+        // No DDL needed, no tableSchema update, no extra stream load — just 
process the DML
+        // normally.
+        if (pgAdded.isEmpty() && pgDropped.isEmpty()) {
+            return super.deserialize(context, record);
+        }
+
+        // Build updatedSchemas from fresh filtered to afterSchema columns 
only, so that the stored
+        // cache does not jump ahead to include columns not yet seen by any 
DML record. Those
+        // unseen columns will trigger their own schema change when their 
first DML record arrives.
+        TableEditor editor = fresh.getTable().edit();
+        for (Column col : fresh.getTable().columns()) {
+            if (afterSchema.field(col.name()) == null) {
+                editor.removeColumn(col.name());
+            }
+        }
+        TableChanges.TableChange filteredChange =
+                new 
TableChanges.TableChange(TableChanges.TableChangeType.ALTER, editor.create());
+        Map<TableId, TableChanges.TableChange> updatedSchemas = new 
HashMap<>();
+        updatedSchemas.put(tableId, filteredChange);
+
+        // Rename guard: simultaneous ADD+DROP may be a column RENAME — skip 
DDL to avoid data loss.
+        // Users must manually RENAME the column in Doris.
+        if (!pgAdded.isEmpty() && !pgDropped.isEmpty()) {
+            LOG.warn(
+                    "[SCHEMA-CHANGE-SKIPPED] Table: {}\n"
+                            + "Potential RENAME detected (simultaneous 
DROP+ADD).\n"
+                            + "Dropped columns: {}\n"
+                            + "Added columns:   {}\n"
+                            + "No DDL emitted to prevent data loss.\n"
+                            + "Action required: manually RENAME column(s) in 
Doris,"
+                            + " then data will resume.",
+                    tableId.identifier(),
+                    pgDropped,
+                    
pgAdded.stream().map(Column::name).collect(Collectors.toList()));
+            List<String> dmlRecords = super.deserialize(context, 
record).getRecords();
+            return DeserializeResult.schemaChange(
+                    Collections.emptyList(), updatedSchemas, dmlRecords);
+        }
+
+        // Generate DDLs using accurate PG column types
+        String db = context.get(Constants.DORIS_TARGET_DB);
+        List<String> ddls = new ArrayList<>();
+
+        for (String colName : pgDropped) {
+            ddls.add(SchemaChangeHelper.buildDropColumnSql(db, 
tableId.table(), colName));
+        }
+
+        for (Column col : pgAdded) {
+            String colType = SchemaChangeHelper.columnToDorisType(col);
+            String nullable = col.isOptional() ? "" : " NOT NULL";
+            // pgAdded only contains columns present in afterSchema, so field 
lookup is safe.
+            // afterSchema.defaultValue() returns an already-deserialized Java 
object
+            // (e.g. String "hello", Integer 42) — no PG SQL cast suffix to 
strip.
+            // PG WAL DML records do not carry column comment metadata.
+            Object defaultObj = 
afterSchema.field(col.name()).schema().defaultValue();
+            ddls.add(
+                    SchemaChangeHelper.buildAddColumnSql(
+                            db,
+                            tableId.table(),
+                            col.name(),
+                            colType + nullable,
+                            defaultObj != null ? String.valueOf(defaultObj) : 
null,
+                            null));
+        }
+
+        List<String> dmlRecords = super.deserialize(context, 
record).getRecords();
+
+        LOG.info(
+                "Postgres schema change detected for table {}: added={}, 
dropped={}. DDLs: {}",
+                tableId.identifier(),
+                
pgAdded.stream().map(Column::name).collect(Collectors.toList()),
+                pgDropped,
+                ddls);
+
+        return DeserializeResult.schemaChange(ddls, updatedSchemas, 
dmlRecords);
+    }
+
+    private TableId extractTableId(SourceRecord record) {
+        Struct value = (Struct) record.value();
+        Struct source = value.getStruct(Envelope.FieldName.SOURCE);
+        String schemaName = source.getString(SCHEMA_NAME_KEY);
+        String tableName = source.getString(TABLE_NAME_KEY);
+        return new TableId(null, schemaName, tableName);
+    }
+}
diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/SourceRecordDeserializer.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/SourceRecordDeserializer.java
index f93567a230a..cc0a519da07 100644
--- 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/SourceRecordDeserializer.java
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/SourceRecordDeserializer.java
@@ -21,8 +21,13 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.util.Map;
 
+import io.debezium.relational.TableId;
+import io.debezium.relational.history.TableChanges;
+
 public interface SourceRecordDeserializer<T, C> extends Serializable {
     void init(Map<String, String> props);
 
     C deserialize(Map<String, String> context, T record) throws IOException;
+
+    default void setTableSchemas(Map<TableId, TableChanges.TableChange> 
tableSchemas) {}
 }
diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/AbstractCdcSourceReader.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/AbstractCdcSourceReader.java
new file mode 100644
index 00000000000..6ebf75a99aa
--- /dev/null
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/AbstractCdcSourceReader.java
@@ -0,0 +1,171 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cdcclient.source.reader;
+
+import org.apache.doris.cdcclient.source.deserialize.DeserializeResult;
+import org.apache.doris.cdcclient.source.deserialize.SourceRecordDeserializer;
+import org.apache.doris.cdcclient.utils.SchemaChangeManager;
+import org.apache.doris.job.cdc.request.JobBaseRecordRequest;
+
+import org.apache.flink.cdc.connectors.base.utils.SerializerUtils;
+import org.apache.flink.cdc.debezium.history.FlinkJsonTableChangeSerializer;
+import org.apache.kafka.connect.source.SourceRecord;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import io.debezium.document.Document;
+import io.debezium.document.DocumentReader;
+import io.debezium.document.DocumentWriter;
+import io.debezium.relational.TableId;
+import io.debezium.relational.history.TableChanges;
+import lombok.Getter;
+import lombok.Setter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Abstract base class providing common schema-tracking functionality for CDC 
source readers.
+ *
+ * <p>Handles serialization/deserialization of {@code tableSchemas} between FE 
and cdc_client, and
+ * provides a helper to load schemas from the incoming {@link 
JobBaseRecordRequest}.
+ */
+@Getter
+@Setter
+public abstract class AbstractCdcSourceReader implements SourceReader {
+    private static final Logger LOG = 
LoggerFactory.getLogger(AbstractCdcSourceReader.class);
+
+    protected static final FlinkJsonTableChangeSerializer 
TABLE_CHANGE_SERIALIZER =
+            new FlinkJsonTableChangeSerializer();
+
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    protected SourceRecordDeserializer<SourceRecord, DeserializeResult> 
serializer;
+    protected Map<TableId, TableChanges.TableChange> tableSchemas;
+
+    /**
+     * Load tableSchemas from a JSON string (produced by {@link 
#serializeTableSchemas()}). Used
+     * when a binlog/stream split is resumed from FE-persisted state.
+     *
+     * <p>Format: {@code 
[{"i":"\"schema\".\"table\"","uc":false,"c":{...debeziumDoc...}},...]}.
+     */
+    public void loadTableSchemasFromJson(String json) throws IOException {
+        if (json == null || json.isEmpty()) {
+            return;
+        }
+        JsonNode root = OBJECT_MAPPER.readTree(json);
+        Map<TableId, TableChanges.TableChange> map = new ConcurrentHashMap<>();
+        DocumentReader docReader = DocumentReader.defaultReader();
+        for (JsonNode entry : root) {
+            boolean uc = entry.path("uc").asBoolean(false);
+            TableId tableId = TableId.parse(entry.get("i").asText(), uc);
+            Document doc = 
docReader.read(OBJECT_MAPPER.writeValueAsString(entry.get("c")));
+            TableChanges.TableChange change = 
FlinkJsonTableChangeSerializer.fromDocument(doc, uc);
+            map.put(tableId, change);
+        }
+        this.tableSchemas = map;
+        this.serializer.setTableSchemas(map);
+        LOG.info("Loaded {} table schemas from JSON", map.size());
+    }
+
+    /**
+     * Serialize current tableSchemas to a compact JSON string for FE 
persistence.
+     *
+     * <p>Stores the Debezium document as a nested JSON object (not a string) 
to avoid redundant
+     * escaping. Format: {@code
+     * [{"i":"\"schema\".\"table\"","uc":false,"c":{...debeziumDoc...}},...]}.
+     */
+    @Override
+    public String serializeTableSchemas() {
+        if (tableSchemas == null || tableSchemas.isEmpty()) {
+            return null;
+        }
+        try {
+            DocumentWriter docWriter = DocumentWriter.defaultWriter();
+            ArrayNode result = OBJECT_MAPPER.createArrayNode();
+            for (Map.Entry<TableId, TableChanges.TableChange> e : 
tableSchemas.entrySet()) {
+                TableId tableId = e.getKey();
+                // useCatalogBeforeSchema: false when catalog is null but 
schema is set (e.g. PG)
+                boolean uc = 
SerializerUtils.shouldUseCatalogBeforeSchema(tableId);
+                ObjectNode entry = OBJECT_MAPPER.createObjectNode();
+                entry.put("i", tableId.toDoubleQuotedString());
+                entry.put("uc", uc);
+                // parse compact doc JSON into a JsonNode so "c" is a nested 
object, not a string
+                entry.set(
+                        "c",
+                        OBJECT_MAPPER.readTree(
+                                
docWriter.write(TABLE_CHANGE_SERIALIZER.toDocument(e.getValue()))));
+                result.add(entry);
+            }
+            return OBJECT_MAPPER.writeValueAsString(result);
+        } catch (Exception e) {
+            // Return null so the current batch is not failed — data keeps 
flowing and
+            // schema persistence will be retried on the next DDL or 
feHadNoSchema batch.
+            // For PostgreSQL this is safe: WAL records carry afterSchema so 
the next DML
+            // will re-trigger schema-change detection and self-heal.
+            // WARNING: for MySQL (schema change not yet implemented), 
returning null here
+            // is dangerous — MySQL binlog has no inline schema, so loading a 
stale
+            // pre-DDL schema from FE on the next task would cause column 
mismatches
+            // (flink-cdc#732). When MySQL schema change is implemented, this 
must throw
+            // instead of returning null to prevent committing the offset with 
a stale schema.
+            LOG.error(
+                    "Failed to serialize tableSchemas, schema will not be 
persisted to FE"
+                            + " in this cycle. Will retry on next DDL or 
batch.",
+                    e);
+            return null;
+        }
+    }
+
+    /** Apply schema changes to in-memory tableSchemas and notify the 
serializer. */
+    @Override
+    public void applySchemaChange(Map<TableId, TableChanges.TableChange> 
updatedSchemas) {
+        if (updatedSchemas == null || updatedSchemas.isEmpty()) {
+            return;
+        }
+        if (tableSchemas == null) {
+            tableSchemas = new ConcurrentHashMap<>(updatedSchemas);
+        } else {
+            tableSchemas.putAll(updatedSchemas);
+        }
+        serializer.setTableSchemas(tableSchemas);
+    }
+
+    /**
+     * Load FE-persisted tableSchemas into memory from the incoming request.
+     *
+     * <p>FE's schema and offset are always committed together, so FE's schema 
always corresponds to
+     * the starting offset of the current batch. Loading it unconditionally 
ensures the deserializer
+     * uses the correct baseline — particularly critical for MySQL: Flink CDC 
only retains the
+     * latest schema in memory, so if a previous batch executed a DDL but 
failed to commit the
+     * offset, retrying from the pre-DDL offset with a stale post-DDL cache 
would cause
+     * schema-mismatch errors on every retry (see flink-cdc#732). PostgreSQL 
is unaffected by this
+     * because WAL records carry the schema at the time they were written, but 
loading FE's schema
+     * unconditionally is still correct: any re-detected DDL will be handled 
idempotently by {@link
+     * SchemaChangeManager}.
+     *
+     * <p>Call this at the start of preparing a binlog/stream split.
+     */
+    protected void tryLoadTableSchemasFromRequest(JobBaseRecordRequest 
baseReq) throws IOException {
+        loadTableSchemasFromJson(baseReq.getTableSchemas());
+    }
+}
diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java
index 36111d0fbf4..5b8e343faae 100644
--- 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java
@@ -18,7 +18,7 @@
 package org.apache.doris.cdcclient.source.reader;
 
 import org.apache.doris.cdcclient.source.deserialize.DebeziumJsonDeserializer;
-import org.apache.doris.cdcclient.source.deserialize.SourceRecordDeserializer;
+import org.apache.doris.cdcclient.source.deserialize.DeserializeResult;
 import org.apache.doris.cdcclient.source.factory.DataSource;
 import org.apache.doris.job.cdc.DataSourceConfigKeys;
 import org.apache.doris.job.cdc.request.FetchTableSplitsRequest;
@@ -83,11 +83,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 @Data
-public abstract class JdbcIncrementalSourceReader implements SourceReader {
+public abstract class JdbcIncrementalSourceReader extends 
AbstractCdcSourceReader {
     private static final Logger LOG = 
LoggerFactory.getLogger(JdbcIncrementalSourceReader.class);
     private static ObjectMapper objectMapper = new ObjectMapper();
-    private SourceRecordDeserializer<SourceRecord, List<String>> serializer;
-    private Map<TableId, TableChanges.TableChange> tableSchemas;
 
     // Support for multiple snapshot splits
     private List<
@@ -334,6 +332,22 @@ public abstract class JdbcIncrementalSourceReader 
implements SourceReader {
     /** Prepare stream split */
     private SplitReadResult prepareStreamSplit(
             Map<String, Object> offsetMeta, JobBaseRecordRequest baseReq) 
throws Exception {
+        // Load tableSchemas from FE if available (avoids re-discover on 
restart)
+        tryLoadTableSchemasFromRequest(baseReq);
+        // If still null (incremental-only startup, or snapshot→binlog 
transition where FE never
+        // persisted schema), do a JDBC discover so the deserializer has a 
baseline to diff against.
+        if (this.tableSchemas == null) {
+            LOG.info(
+                    "No tableSchemas available for stream split, discovering 
via JDBC for job {}",
+                    baseReq.getJobId());
+            Map<TableId, TableChanges.TableChange> discovered = 
getTableSchemas(baseReq);
+            this.tableSchemas = new 
java.util.concurrent.ConcurrentHashMap<>(discovered);
+            this.serializer.setTableSchemas(this.tableSchemas);
+            LOG.info(
+                    "Discovered {} table schema(s) for job {}",
+                    discovered.size(),
+                    baseReq.getJobId());
+        }
         Tuple2<SourceSplitBase, Boolean> splitFlag = 
createStreamSplit(offsetMeta, baseReq);
         this.streamSplit = splitFlag.f0.asStreamSplit();
         this.streamReader = getBinlogSplitReader(baseReq);
@@ -908,7 +922,7 @@ public abstract class JdbcIncrementalSourceReader 
implements SourceReader {
     }
 
     @Override
-    public List<String> deserialize(Map<String, String> config, SourceRecord 
element)
+    public DeserializeResult deserialize(Map<String, String> config, 
SourceRecord element)
             throws IOException {
         return serializer.deserialize(config, element);
     }
diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/SourceReader.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/SourceReader.java
index 6c1a018dde3..fa4578d509b 100644
--- 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/SourceReader.java
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/SourceReader.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.cdcclient.source.reader;
 
+import org.apache.doris.cdcclient.source.deserialize.DeserializeResult;
 import org.apache.doris.cdcclient.source.factory.DataSource;
 import org.apache.doris.job.cdc.request.CompareOffsetRequest;
 import org.apache.doris.job.cdc.request.FetchTableSplitsRequest;
@@ -32,6 +33,9 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
+import io.debezium.relational.TableId;
+import io.debezium.relational.history.TableChanges;
+
 /** Source Reader Interface */
 public interface SourceReader {
     String SPLIT_ID = "splitId";
@@ -75,7 +79,19 @@ public interface SourceReader {
     /** Called when closing */
     void close(JobBaseConfig jobConfig);
 
-    List<String> deserialize(Map<String, String> config, SourceRecord element) 
throws IOException;
+    DeserializeResult deserialize(Map<String, String> config, SourceRecord 
element)
+            throws IOException;
+
+    /**
+     * Apply schema changes to the in-memory tableSchemas. Called after schema 
change is executed on
+     * Doris.
+     */
+    default void applySchemaChange(Map<TableId, TableChanges.TableChange> 
updatedSchemas) {}
+
+    /** Serialize current tableSchemas to JSON for persistence via 
commitOffset. */
+    default String serializeTableSchemas() {
+        return null;
+    }
 
     /**
      * Commits the given offset with the source database. Used by some source 
like Postgres to
diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
index 83c9b349e4e..11e5007894d 100644
--- 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
@@ -17,11 +17,11 @@
 
 package org.apache.doris.cdcclient.source.reader.mysql;
 
-import org.apache.doris.cdcclient.source.deserialize.DebeziumJsonDeserializer;
-import org.apache.doris.cdcclient.source.deserialize.SourceRecordDeserializer;
+import org.apache.doris.cdcclient.source.deserialize.DeserializeResult;
+import 
org.apache.doris.cdcclient.source.deserialize.MySqlDebeziumJsonDeserializer;
 import org.apache.doris.cdcclient.source.factory.DataSource;
+import org.apache.doris.cdcclient.source.reader.AbstractCdcSourceReader;
 import org.apache.doris.cdcclient.source.reader.SnapshotReaderContext;
-import org.apache.doris.cdcclient.source.reader.SourceReader;
 import org.apache.doris.cdcclient.source.reader.SplitReadResult;
 import org.apache.doris.cdcclient.source.reader.SplitRecords;
 import org.apache.doris.cdcclient.utils.ConfigUtil;
@@ -62,7 +62,6 @@ import 
org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils;
 import org.apache.flink.cdc.connectors.mysql.source.utils.TableDiscoveryUtils;
 import org.apache.flink.cdc.connectors.mysql.table.StartupMode;
 import org.apache.flink.cdc.connectors.mysql.table.StartupOptions;
-import org.apache.flink.cdc.debezium.history.FlinkJsonTableChangeSerializer;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.kafka.connect.source.SourceRecord;
 
@@ -110,13 +109,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 @Data
-public class MySqlSourceReader implements SourceReader {
+public class MySqlSourceReader extends AbstractCdcSourceReader {
     private static final Logger LOG = 
LoggerFactory.getLogger(MySqlSourceReader.class);
     private static ObjectMapper objectMapper = new ObjectMapper();
-    private static final FlinkJsonTableChangeSerializer 
TABLE_CHANGE_SERIALIZER =
-            new FlinkJsonTableChangeSerializer();
-    private SourceRecordDeserializer<SourceRecord, List<String>> serializer;
-    private Map<TableId, TableChanges.TableChange> tableSchemas;
 
     // Support for multiple snapshot splits with Round-Robin polling
     private List<
@@ -135,7 +130,7 @@ public class MySqlSourceReader implements SourceReader {
     private MySqlBinlogSplitState binlogSplitState;
 
     public MySqlSourceReader() {
-        this.serializer = new DebeziumJsonDeserializer();
+        this.serializer = new MySqlDebeziumJsonDeserializer();
         this.snapshotReaderContexts = new ArrayList<>();
     }
 
@@ -339,6 +334,8 @@ public class MySqlSourceReader implements SourceReader {
     /** Prepare binlog split */
     private SplitReadResult prepareBinlogSplit(
             Map<String, Object> offsetMeta, JobBaseRecordRequest baseReq) 
throws Exception {
+        // Load tableSchemas from FE if available (avoids re-discover on 
restart)
+        tryLoadTableSchemasFromRequest(baseReq);
         Tuple2<MySqlSplit, Boolean> splitFlag = createBinlogSplit(offsetMeta, 
baseReq);
         this.binlogSplit = (MySqlBinlogSplit) splitFlag.f0;
         this.binlogReader = getBinlogSplitReader(baseReq);
@@ -778,6 +775,8 @@ public class MySqlSourceReader implements SourceReader {
         configFactory.serverTimeZone(
                 
ConfigUtil.getTimeZoneFromProps(cu.getOriginalProperties()).toString());
 
+        // Schema change handling for MySQL is not yet implemented; keep 
disabled to avoid
+        // unnecessary processing overhead until DDL support is added.
         configFactory.includeSchemaChanges(false);
 
         String includingTables = 
cdcConfig.get(DataSourceConfigKeys.INCLUDE_TABLES);
@@ -992,7 +991,7 @@ public class MySqlSourceReader implements SourceReader {
     }
 
     @Override
-    public List<String> deserialize(Map<String, String> config, SourceRecord 
element)
+    public DeserializeResult deserialize(Map<String, String> config, 
SourceRecord element)
             throws IOException {
         return serializer.deserialize(config, element);
     }
diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
index a0bb57ad120..737e36045d9 100644
--- 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
@@ -19,6 +19,7 @@ package org.apache.doris.cdcclient.source.reader.postgres;
 
 import org.apache.doris.cdcclient.common.Constants;
 import org.apache.doris.cdcclient.exception.CdcClientException;
+import 
org.apache.doris.cdcclient.source.deserialize.PostgresDebeziumJsonDeserializer;
 import org.apache.doris.cdcclient.source.factory.DataSource;
 import org.apache.doris.cdcclient.source.reader.JdbcIncrementalSourceReader;
 import org.apache.doris.cdcclient.utils.ConfigUtil;
@@ -55,6 +56,7 @@ import org.apache.flink.table.types.DataType;
 import java.time.Duration;
 import java.time.Instant;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -84,6 +86,7 @@ public class PostgresSourceReader extends 
JdbcIncrementalSourceReader {
 
     public PostgresSourceReader() {
         super();
+        this.setSerializer(new PostgresDebeziumJsonDeserializer());
     }
 
     @Override
@@ -95,6 +98,12 @@ public class PostgresSourceReader extends 
JdbcIncrementalSourceReader {
             createSlotForGlobalStreamSplit(dialect);
         }
         super.initialize(jobId, dataSource, config);
+        // Inject PG schema refresher so the deserializer can fetch accurate 
column types on DDL
+        if (serializer instanceof PostgresDebeziumJsonDeserializer) {
+            ((PostgresDebeziumJsonDeserializer) serializer)
+                    .setPgSchemaRefresher(
+                            tableId -> refreshSingleTableSchema(tableId, 
config, jobId));
+        }
     }
 
     /**
@@ -359,6 +368,29 @@ public class PostgresSourceReader extends 
JdbcIncrementalSourceReader {
         }
     }
 
+    /**
+     * Fetch the current schema for a single table directly from PostgreSQL 
via JDBC.
+     *
+     * <p>Called by {@link PostgresDebeziumJsonDeserializer} when a schema 
change (ADD/DROP column)
+     * is detected, to obtain accurate PG column types for DDL generation.
+     *
+     * @return the fresh {@link TableChanges.TableChange}
+     */
+    private TableChanges.TableChange refreshSingleTableSchema(
+            TableId tableId, Map<String, String> config, long jobId) {
+        PostgresSourceConfig sourceConfig = generatePostgresConfig(config, 
jobId, 0);
+        PostgresDialect dialect = new PostgresDialect(sourceConfig);
+        try (JdbcConnection jdbcConnection = 
dialect.openJdbcConnection(sourceConfig)) {
+            CustomPostgresSchema customPostgresSchema =
+                    new CustomPostgresSchema((PostgresConnection) 
jdbcConnection, sourceConfig);
+            Map<TableId, TableChanges.TableChange> schemas =
+                    
customPostgresSchema.getTableSchema(Collections.singletonList(tableId));
+            return schemas.get(tableId);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
     @Override
     protected FetchTask<SourceSplitBase> createFetchTaskFromSplit(
             JobBaseConfig jobConfig, SourceSplitBase split) {
diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/HttpUtil.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/HttpUtil.java
index 4d1356003fb..05407b2c89d 100644
--- 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/HttpUtil.java
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/HttpUtil.java
@@ -52,4 +52,8 @@ public class HttpUtil {
                 .addInterceptorLast(new RequestContent(true))
                 .build();
     }
+
+    public static String getAuthHeader() {
+        return "Basic YWRtaW46";
+    }
 }
diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/SchemaChangeHelper.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/SchemaChangeHelper.java
new file mode 100644
index 00000000000..5eea4f1f16f
--- /dev/null
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/SchemaChangeHelper.java
@@ -0,0 +1,291 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cdcclient.utils;
+
+import org.apache.doris.cdcclient.common.DorisType;
+
+import org.apache.flink.util.Preconditions;
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import io.debezium.relational.Column;
+import io.debezium.relational.history.TableChanges;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Utility class for generating Doris ALTER TABLE SQL from schema diffs. */
+public class SchemaChangeHelper {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(SchemaChangeHelper.class);
+    private static final String ADD_DDL = "ALTER TABLE %s ADD COLUMN %s %s";
+    private static final String DROP_DDL = "ALTER TABLE %s DROP COLUMN %s";
+
+    private SchemaChangeHelper() {}
+
+    // ─── Schema diff result 
────────────────────────────────────────────────────
+
+    /**
+     * Holds the result of a full schema comparison between an after-schema 
and stored TableChange.
+     */
+    public static class SchemaDiff {
+        /** Fields present in afterSchema but absent from stored. */
+        public final List<Field> added;
+
+        /** Column names present in stored but absent from afterSchema. */
+        public final List<String> dropped;
+
+        /** Same-named columns whose Doris type or default value differs. */
+        public final Map<String, Field> modified;
+
+        public SchemaDiff(List<Field> added, List<String> dropped, Map<String, 
Field> modified) {
+            this.added = added;
+            this.dropped = dropped;
+            this.modified = modified;
+        }
+
+        public boolean isEmpty() {
+            return added.isEmpty() && dropped.isEmpty() && modified.isEmpty();
+        }
+    }
+
+    // ─── Schema-diff helpers (Kafka Connect schema ↔ stored TableChange) 
──────
+
+    /**
+     * Name-only schema diff: compare field names in {@code afterSchema} 
against the stored {@link
+     * TableChanges.TableChange}, detecting added and dropped columns by name 
only.
+     *
+     * <p>Only support add and drop and not support modify and rename
+     *
+     * <p>When {@code stored} is null or empty, both lists are empty (no 
baseline to diff against).
+     */
+    public static SchemaDiff diffSchemaByName(Schema afterSchema, 
TableChanges.TableChange stored) {
+        List<Field> added = new ArrayList<>();
+        List<String> dropped = new ArrayList<>();
+
+        if (afterSchema == null || stored == null || stored.getTable() == 
null) {
+            return new SchemaDiff(added, dropped, new LinkedHashMap<>());
+        }
+
+        // Detect added: fields present in afterSchema but absent from stored
+        for (Field field : afterSchema.fields()) {
+            if (stored.getTable().columnWithName(field.name()) == null) {
+                added.add(field);
+            }
+        }
+
+        // Detect dropped: columns present in stored but absent from 
afterSchema
+        for (Column col : stored.getTable().columns()) {
+            if (afterSchema.field(col.name()) == null) {
+                dropped.add(col.name());
+            }
+        }
+
+        return new SchemaDiff(added, dropped, new LinkedHashMap<>());
+    }
+
+    // ─── Quoting helpers 
──────────────────────────────────────────────────────
+
+    /** Wrap a name in backticks if not already quoted. */
+    public static String identifier(String name) {
+        if (name.startsWith("`") && name.endsWith("`")) {
+            return name;
+        }
+        return "`" + name + "`";
+    }
+
+    /** Return a fully-qualified {@code `db`.`table`} identifier string. */
+    public static String quoteTableIdentifier(String db, String table) {
+        return identifier(db) + "." + identifier(table);
+    }
+
+    /**
+     * Format a default value (already a plain Java string, not a raw SQL 
expression) into a form
+     * suitable for a Doris {@code DEFAULT} clause.
+     *
+     * <p>The caller is expected to pass a <em>deserialized</em> value — e.g. 
obtained from the
+     * Kafka Connect schema via {@code 
field.schema().defaultValue().toString()} — rather than a raw
+     * PG SQL expression. This avoids having to strip PG-specific type casts 
({@code ::text}, etc.).
+     *
+     * <ul>
+     *   <li>SQL keywords ({@code NULL}, {@code CURRENT_TIMESTAMP}, {@code 
TRUE}, {@code FALSE}) are
+     *       returned as-is.
+     *   <li>Numeric literals are returned as-is (no quotes).
+     *   <li>Everything else is wrapped in single quotes.
+     * </ul>
+     */
+    public static String quoteDefaultValue(String defaultValue) {
+        if (defaultValue == null) {
+            return null;
+        }
+        if (defaultValue.equalsIgnoreCase("current_timestamp")
+                || defaultValue.equalsIgnoreCase("null")
+                || defaultValue.equalsIgnoreCase("true")
+                || defaultValue.equalsIgnoreCase("false")) {
+            return defaultValue;
+        }
+        try {
+            Double.parseDouble(defaultValue);
+            return defaultValue;
+        } catch (NumberFormatException ignored) {
+            // fall through
+        }
+        return "'" + defaultValue.replace("'", "''") + "'";
+    }
+
+    /** Escape single quotes inside a COMMENT string. */
+    public static String quoteComment(String comment) {
+        if (comment == null) {
+            return "";
+        }
+        return comment.replace("'", "''");
+    }
+
+    // ─── DDL builders 
─────────────────────────────────────────────────────────
+
+    /**
+     * Build {@code ALTER TABLE ... ADD COLUMN} SQL.
+     *
+     * @param db target database
+     * @param table target table
+     * @param colName column name
+     * @param colType Doris column type string (including optional NOT NULL)
+     * @param defaultValue optional DEFAULT value; {@code null} = omit DEFAULT 
clause
+     * @param comment optional COMMENT; {@code null}/empty = omit COMMENT 
clause
+     */
+    public static String buildAddColumnSql(
+            String db,
+            String table,
+            String colName,
+            String colType,
+            String defaultValue,
+            String comment) {
+        StringBuilder sb =
+                new StringBuilder(
+                        String.format(
+                                ADD_DDL,
+                                quoteTableIdentifier(db, table),
+                                identifier(colName),
+                                colType));
+        if (defaultValue != null) {
+            sb.append(" DEFAULT ").append(quoteDefaultValue(defaultValue));
+        }
+        appendComment(sb, comment);
+        return sb.toString();
+    }
+
+    /** Build {@code ALTER TABLE ... DROP COLUMN} SQL. */
+    public static String buildDropColumnSql(String db, String table, String 
colName) {
+        return String.format(DROP_DDL, quoteTableIdentifier(db, table), 
identifier(colName));
+    }
+
+    // ─── Type mapping 
─────────────────────────────────────────────────────────
+
+    /** Convert a Debezium Column to a Doris column type string (via PG type 
name). */
+    public static String columnToDorisType(Column column) {
+        return pgTypeNameToDorisType(column.typeName(), column.length(), 
column.scale().orElse(-1));
+    }
+
+    /** Map a PostgreSQL native type name to a Doris type string. */
+    static String pgTypeNameToDorisType(String pgTypeName, int length, int 
scale) {
+        Preconditions.checkNotNull(pgTypeName);
+        switch (pgTypeName.toLowerCase()) {
+            case "bool":
+                return DorisType.BOOLEAN;
+            case "bit":
+                return length == 1 ? DorisType.BOOLEAN : DorisType.STRING;
+            case "int2":
+            case "smallserial":
+                return DorisType.SMALLINT;
+            case "int4":
+            case "serial":
+                return DorisType.INT;
+            case "int8":
+            case "bigserial":
+                return DorisType.BIGINT;
+            case "float4":
+                return DorisType.FLOAT;
+            case "float8":
+                return DorisType.DOUBLE;
+            case "numeric":
+                {
+                    int p = length > 0 ? Math.min(length, 38) : 38;
+                    int s = scale >= 0 ? scale : 9;
+                    return String.format("%s(%d, %d)", DorisType.DECIMAL, p, 
s);
+                }
+            case "bpchar":
+                {
+                    if (length <= 0) {
+                        return DorisType.STRING;
+                    }
+                    int len = length * 3;
+                    if (len > 255) {
+                        return String.format("%s(%s)", DorisType.VARCHAR, len);
+                    } else {
+                        return String.format("%s(%s)", DorisType.CHAR, len);
+                    }
+                }
+            case "date":
+                return DorisType.DATE;
+            case "timestamp":
+            case "timestamptz":
+                {
+                    int s = (scale >= 0 && scale <= 6) ? scale : 6;
+                    return String.format("%s(%d)", DorisType.DATETIME, s);
+                }
+                // All remaining types map to STRING (aligned with 
JdbcPostgreSQLClient)
+            case "point":
+            case "line":
+            case "lseg":
+            case "box":
+            case "path":
+            case "polygon":
+            case "circle":
+            case "varchar":
+            case "text":
+            case "time":
+            case "timetz":
+            case "interval":
+            case "cidr":
+            case "inet":
+            case "macaddr":
+            case "varbit":
+            case "uuid":
+            case "bytea":
+                return DorisType.STRING;
+            case "json":
+            case "jsonb":
+                return DorisType.JSON;
+            default:
+                LOG.warn("Unrecognized PostgreSQL type '{}', defaulting to 
STRING", pgTypeName);
+                return DorisType.STRING;
+        }
+    }
+
+    // ─── Internal helpers 
─────────────────────────────────────────────────────
+
+    private static void appendComment(StringBuilder sb, String comment) {
+        if (comment != null && !comment.isEmpty()) {
+            sb.append(" COMMENT '").append(quoteComment(comment)).append("'");
+        }
+    }
+}
diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/SchemaChangeManager.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/SchemaChangeManager.java
new file mode 100644
index 00000000000..b392df9cfcd
--- /dev/null
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/SchemaChangeManager.java
@@ -0,0 +1,149 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cdcclient.utils;
+
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.util.EntityUtils;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Static utility class for executing DDL schema changes on the Doris FE via 
HTTP. */
+public class SchemaChangeManager {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(SchemaChangeManager.class);
+    private static final String SCHEMA_CHANGE_API = 
"http://%s/api/query/default_cluster/%s";;
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+    private static final String COLUMN_EXISTS_MSG = "Can not add column which 
already exists";
+    private static final String COLUMN_NOT_EXISTS_MSG = "Column does not 
exists";
+
+    private SchemaChangeManager() {}
+
+    /**
+     * Execute a list of DDL statements on FE. Each statement is sent 
independently.
+     *
+     * <p>Idempotent errors (ADD COLUMN when column already exists, DROP 
COLUMN when column does not
+     * exist) are logged as warnings and silently skipped, so retries on a 
different BE after a
+     * failed commitOffset do not cause infinite failures.
+     *
+     * @param feAddr Doris FE address (host:port)
+     * @param db target database
+     * @param token FE auth token
+     * @param sqls DDL statements to execute
+     */
+    public static void executeDdls(String feAddr, String db, String token, 
List<String> sqls)
+            throws IOException {
+        if (sqls == null || sqls.isEmpty()) {
+            LOG.info("No DDL statements to execute");
+            return;
+        }
+        for (String stmt : sqls) {
+            stmt = stmt.trim();
+            if (stmt.isEmpty()) {
+                continue;
+            }
+            LOG.info("Executing DDL on FE {}: {}", feAddr, stmt);
+            execute(feAddr, db, token, stmt);
+        }
+    }
+
+    /**
+     * Execute a single SQL statement via the FE query API.
+     *
+     * <p>Idempotent errors are swallowed with a warning; all other errors 
throw {@link
+     * IOException}.
+     */
+    public static void execute(String feAddr, String db, String token, String 
sql)
+            throws IOException {
+        HttpPost post = buildHttpPost(feAddr, db, token, sql);
+        String responseBody = handleResponse(post);
+        LOG.info("Executed DDL {} with response: {}", sql, responseBody);
+        parseResponse(sql, responseBody);
+    }
+
+    // ─── Internal helpers 
─────────────────────────────────────────────────────
+
+    private static HttpPost buildHttpPost(String feAddr, String db, String 
token, String sql)
+            throws IOException {
+        String url = String.format(SCHEMA_CHANGE_API, feAddr, db);
+        Map<String, Object> bodyMap = new HashMap<>();
+        bodyMap.put("stmt", sql);
+        String body = OBJECT_MAPPER.writeValueAsString(bodyMap);
+
+        HttpPost post = new HttpPost(url);
+        post.setHeader("Content-Type", "application/json;charset=UTF-8");
+        post.setHeader("Authorization", HttpUtil.getAuthHeader());
+        post.setHeader("token", token);
+        post.setEntity(new StringEntity(body, "UTF-8"));
+        return post;
+    }
+
+    private static String handleResponse(HttpPost request) throws IOException {
+        try (CloseableHttpClient client = HttpUtil.getHttpClient();
+                CloseableHttpResponse response = client.execute(request)) {
+            String responseBody =
+                    response.getEntity() != null ? 
EntityUtils.toString(response.getEntity()) : "";
+            LOG.debug("HTTP [{}]: {}", request.getURI(), responseBody);
+            return responseBody;
+        }
+    }
+
+    /**
+     * Parse the FE response. Idempotent errors are logged as warnings and 
skipped; all other errors
+     * throw.
+     *
+     * <p>Idempotent conditions (can occur when a previous commitOffset failed 
and a fresh BE
+     * re-detects and re-executes the same DDL):
+     *
+     * <ul>
+     *   <li>ADD COLUMN — "Can not add column which already exists": column 
was already added.
+     *   <li>DROP COLUMN — "Column does not exists": column was already 
dropped.
+     * </ul>
+     */
+    private static void parseResponse(String sql, String responseBody) throws 
IOException {
+        JsonNode root = OBJECT_MAPPER.readTree(responseBody);
+        JsonNode code = root.get("code");
+        if (code != null && code.asInt() == 0) {
+            return;
+        }
+
+        String msg = root.path("msg").asText("");
+
+        if (msg.contains(COLUMN_EXISTS_MSG)) {
+            LOG.warn("[DDL-IDEMPOTENT] Skipped ADD COLUMN (column already 
exists). SQL: {}", sql);
+            return;
+        }
+        if (msg.contains(COLUMN_NOT_EXISTS_MSG)) {
+            LOG.warn("[DDL-IDEMPOTENT] Skipped DROP COLUMN (column already 
absent). SQL: {}", sql);
+            return;
+        }
+
+        LOG.warn("DDL execution failed. SQL: {}. Response: {}", sql, 
responseBody);
+        throw new IOException("Failed to execute schema change: " + 
responseBody);
+    }
+}
diff --git 
a/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/utils/SchemaChangeHelperTest.java
 
b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/utils/SchemaChangeHelperTest.java
new file mode 100644
index 00000000000..b71fe609d4a
--- /dev/null
+++ 
b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/utils/SchemaChangeHelperTest.java
@@ -0,0 +1,194 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cdcclient.utils;
+
+import org.apache.doris.cdcclient.common.DorisType;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** Unit tests for {@link SchemaChangeHelper#pgTypeNameToDorisType}. */
+class SchemaChangeHelperTest {
+
+    // ─── Integer types 
────────────────────────────────────────────────────────
+
+    @Test
+    void integerTypes() {
+        assertEquals(DorisType.SMALLINT, map("int2", -1, -1));
+        assertEquals(DorisType.SMALLINT, map("smallserial", -1, -1));
+        assertEquals(DorisType.INT,      map("int4", -1, -1));
+        assertEquals(DorisType.INT,      map("serial", -1, -1));
+        assertEquals(DorisType.BIGINT,   map("int8", -1, -1));
+        assertEquals(DorisType.BIGINT,   map("bigserial", -1, -1));
+    }
+
+    @Test
+    void floatTypes() {
+        assertEquals(DorisType.FLOAT,  map("float4", -1, -1));
+        assertEquals(DorisType.DOUBLE, map("float8", -1, -1));
+    }
+
+    // ─── Boolean / bit 
───────────────────────────────────────────────────────
+
+    @Test
+    void boolType() {
+        assertEquals(DorisType.BOOLEAN, map("bool", -1, -1));
+    }
+
+    @Test
+    void bitType_singleBit_isBoolean() {
+        assertEquals(DorisType.BOOLEAN, map("bit", 1, -1));
+    }
+
+    @Test
+    void bitType_multiBit_isString() {
+        assertEquals(DorisType.STRING, map("bit", 8, -1));
+        assertEquals(DorisType.STRING, map("bit", 64, -1));
+    }
+
+    // ─── Numeric / decimal 
───────────────────────────────────────────────────
+
+    @Test
+    void numericType_defaultPrecisionScale() {
+        // length <= 0, scale < 0 → DECIMAL(38, 9)
+        assertEquals("DECIMAL(38, 9)", map("numeric", 0, -1));
+        assertEquals("DECIMAL(38, 9)", map("numeric", -1, -1));
+    }
+
+    @Test
+    void numericType_explicitPrecisionScale() {
+        assertEquals("DECIMAL(10, 2)", map("numeric", 10, 2));
+        assertEquals("DECIMAL(5, 0)",  map("numeric", 5, 0));
+    }
+
+    @Test
+    void numericType_precisionCappedAt38() {
+        assertEquals("DECIMAL(38, 4)", map("numeric", 50, 4));
+        assertEquals("DECIMAL(38, 9)", map("numeric", 100, -1));
+    }
+
+    // ─── Char types 
──────────────────────────────────────────────────────────
+
+    @Test
+    void bpchar_shortLength_isChar() {
+        // length=10 → 10*3=30 ≤ 255 → CHAR(30)
+        assertEquals("CHAR(30)", map("bpchar", 10, -1));
+        assertEquals("CHAR(3)",  map("bpchar", 1, -1));
+    }
+
+    @Test
+    void bpchar_longLength_isVarchar() {
+        // length=100 → 100*3=300 > 255 → VARCHAR(300)
+        assertEquals("VARCHAR(300)", map("bpchar", 100, -1));
+        assertEquals("VARCHAR(768)", map("bpchar", 256, -1));
+    }
+
+    @Test
+    void varcharAndText_isString() {
+        assertEquals(DorisType.STRING, map("varchar", 50, -1));
+        assertEquals(DorisType.STRING, map("varchar", -1, -1));
+        assertEquals(DorisType.STRING, map("text", -1, -1));
+    }
+
+    // ─── Date / time 
─────────────────────────────────────────────────────────
+
+    @Test
+    void dateType() {
+        assertEquals(DorisType.DATE, map("date", -1, -1));
+    }
+
+    @Test
+    void timestampType_defaultScale_isDatetime6() {
+        // scale < 0 or > 6 → default to 6
+        assertEquals("DATETIME(6)", map("timestamp", -1, -1));
+        assertEquals("DATETIME(6)", map("timestamptz", -1, -1));
+        assertEquals("DATETIME(6)", map("timestamp", -1, 7));
+    }
+
+    @Test
+    void timestampType_explicitScale() {
+        assertEquals("DATETIME(3)", map("timestamp", -1, 3));
+        assertEquals("DATETIME(0)", map("timestamptz", -1, 0));
+        assertEquals("DATETIME(6)", map("timestamp", -1, 6));
+    }
+
+    @Test
+    void timeTypes_isString() {
+        assertEquals(DorisType.STRING, map("time", -1, -1));
+        assertEquals(DorisType.STRING, map("timetz", -1, -1));
+        assertEquals(DorisType.STRING, map("interval", -1, -1));
+    }
+
+    // ─── JSON 
────────────────────────────────────────────────────────────────
+
+    @Test
+    void jsonTypes() {
+        assertEquals(DorisType.JSON, map("json", -1, -1));
+        assertEquals(DorisType.JSON, map("jsonb", -1, -1));
+    }
+
+    // ─── Geometric / network / misc types (all map to STRING) 
────────────────
+
+    @Test
+    void networkAndMiscTypes_isString() {
+        assertEquals(DorisType.STRING, map("inet", -1, -1));
+        assertEquals(DorisType.STRING, map("cidr", -1, -1));
+        assertEquals(DorisType.STRING, map("macaddr", -1, -1));
+        assertEquals(DorisType.STRING, map("uuid", -1, -1));
+        assertEquals(DorisType.STRING, map("bytea", -1, -1));
+        assertEquals(DorisType.STRING, map("varbit", -1, -1));
+    }
+
+    @Test
+    void geometricTypes_isString() {
+        assertEquals(DorisType.STRING, map("point", -1, -1));
+        assertEquals(DorisType.STRING, map("line", -1, -1));
+        assertEquals(DorisType.STRING, map("lseg", -1, -1));
+        assertEquals(DorisType.STRING, map("box", -1, -1));
+        assertEquals(DorisType.STRING, map("path", -1, -1));
+        assertEquals(DorisType.STRING, map("polygon", -1, -1));
+        assertEquals(DorisType.STRING, map("circle", -1, -1));
+    }
+
+    // ─── Unknown type fallback 
───────────────────────────────────────────────
+
+    @Test
+    void unknownType_defaultsToString() {
+        assertEquals(DorisType.STRING, map("custom_type", -1, -1));
+        assertEquals(DorisType.STRING, map("user_defined_enum", -1, -1));
+    }
+
+    // ─── Case-insensitive matching 
────────────────────────────────────────────
+
+    @Test
+    void caseInsensitive() {
+        assertEquals(DorisType.INT,     map("INT4", -1, -1));
+        assertEquals(DorisType.BIGINT,  map("INT8", -1, -1));
+        assertEquals(DorisType.BOOLEAN, map("BOOL", -1, -1));
+        assertEquals(DorisType.FLOAT,   map("FLOAT4", -1, -1));
+        assertEquals(DorisType.JSON,    map("JSON", -1, -1));
+        assertEquals(DorisType.STRING,  map("TEXT", -1, -1));
+    }
+
+    // ─── helper 
──────────────────────────────────────────────────────────────
+
+    private static String map(String pgType, int length, int scale) {
+        return SchemaChangeHelper.pgTypeNameToDorisType(pgType, length, scale);
+    }
+}
diff --git 
a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_sc.out
 
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_sc.out
new file mode 100644
index 00000000000..a0f35f02943
--- /dev/null
+++ 
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_sc.out
@@ -0,0 +1,32 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !snapshot --
+A1     1
+B1     2
+
+-- !add_column --
+A1     1       \N
+B1     2       \N
+C1     10      hello
+
+-- !add_column_dml --
+B1     99      updated
+C1     10      world
+
+-- !drop_column --
+B1     99
+C1     10
+D1     20
+
+-- !rename --
+B1     99
+C1     10
+D1     20
+E1     \N
+
+-- !modify --
+B1     99
+C1     10
+D1     20
+E1     \N
+F1     \N
+
diff --git 
a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_sc_advanced.out
 
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_sc_advanced.out
new file mode 100644
index 00000000000..24907705c46
--- /dev/null
+++ 
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_sc_advanced.out
@@ -0,0 +1,31 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !baseline --
+C1     30
+
+-- !double_add --
+C1     30      \N      \N
+D1     40      hello   42
+
+-- !rename_guard --
+C1     30      \N      \N
+D1     40      hello   42
+E1     50      \N      10
+
+-- !rename_guard_update --
+C1     30      \N      \N
+D1     99      \N      42
+E1     50      \N      10
+
+-- !default_col --
+C1     30      default_val
+D1     99      default_val
+E1     50      default_val
+F1     60      default_val
+
+-- !not_null_col --
+C1     30      default_val     required
+D1     99      default_val     required
+E1     50      default_val     required
+F1     60      default_val     required
+G1     70      g1c4    explicit
+
diff --git 
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_sc.groovy
 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_sc.groovy
new file mode 100644
index 00000000000..bd590e1d97f
--- /dev/null
+++ 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_sc.groovy
@@ -0,0 +1,269 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+/**
+ * Schema-change regression for the PostgreSQL CDC streaming job.
+ *
+ * Covers four scenarios in sequence on a single table:
+ *   1. ADD COLUMN    – column added in PG → DDL executed in Doris, new data 
lands correctly.
+ *                      Also verifies: pre-ADD rows get NULL for the new 
column (existing-data
+ *                      correctness), and UPDATE/DELETE right after ADD COLUMN 
are propagated.
+ *   2. DROP COLUMN   – column dropped in PG → DDL executed in Doris, 
subsequent data lands correctly.
+ *   3. RENAME COLUMN – rename detected as simultaneous ADD+DROP (rename 
guard) →
+ *                      no DDL in Doris, 'age' column remains, new rows get 
age=NULL.
+ *   4. MODIFY COLUMN – type-only change is invisible to the name-based diff →
+ *                      no DDL in Doris, data continues to flow.
+ */
+suite("test_streaming_postgres_job_sc", 
"p0,external,pg,external_docker,external_docker_pg,nondatalake") {
+    def jobName     = "test_streaming_postgres_job_name_sc"
+    def currentDb   = (sql "select database()")[0][0]
+    def table1      = "user_info_pg_normal1_sc"
+    def pgDB        = "postgres"
+    def pgSchema    = "cdc_test"
+    def pgUser      = "postgres"
+    def pgPassword  = "123456"
+
+    sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+    sql """drop table if exists ${currentDb}.${table1} force"""
+
+    String enabled = context.config.otherConfigs.get("enableJdbcTest")
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        String pg_port       = context.config.otherConfigs.get("pg_14_port")
+        String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+        String s3_endpoint   = getS3Endpoint()
+        String bucket        = getS3BucketName()
+        String driver_url    = 
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/postgresql-42.5.0.jar";
+
+        // ── helpers 
───────────────────────────────────────────────────────────
+
+        // Wait until a specific row appears in the Doris target table.
+        def waitForRow = { String rowName ->
+            Awaitility.await().atMost(120, SECONDS).pollInterval(2, 
SECONDS).until({
+                (sql "SELECT COUNT(*) FROM ${table1} WHERE name='${rowName}'"
+                )[0][0] as int > 0
+            })
+        }
+
+        // Wait until a column either exists or no longer exists in the Doris 
table.
+        def waitForColumn = { String colName, boolean shouldExist ->
+            Awaitility.await().atMost(120, SECONDS).pollInterval(2, 
SECONDS).until({
+                def desc = sql "DESC ${table1}"
+                desc.any { it[0] == colName } == shouldExist
+            })
+        }
+
+        // Wait until a specific row disappears from the Doris target table.
+        def waitForRowGone = { String rowName ->
+            Awaitility.await().atMost(120, SECONDS).pollInterval(2, 
SECONDS).until({
+                (sql "SELECT COUNT(*) FROM ${table1} WHERE name='${rowName}'"
+                )[0][0] as int == 0
+            })
+        }
+
+        // Wait until a specific column value matches the expected value for a 
row.
+        // Comparison is done as strings to avoid JDBC numeric type mismatches 
(e.g. Short vs Integer).
+        def waitForValue = { String rowName, String colName, Object expected ->
+            Awaitility.await().atMost(120, SECONDS).pollInterval(2, 
SECONDS).until({
+                def rows = sql "SELECT ${colName} FROM ${table1} WHERE 
name='${rowName}'"
+                rows.size() == 1 && String.valueOf(rows[0][0]) == 
String.valueOf(expected)
+            })
+        }
+
+        // Dump job/task state on assertion failures for easier debugging.
+        def dumpJobState = {
+            log.info("jobs  : " + sql("""select * from jobs("type"="insert") 
where Name='${jobName}'"""))
+            log.info("tasks : " + sql("""select * from tasks("type"="insert") 
where JobName='${jobName}'"""))
+        }
+
+        // ── 0. Create PG table and insert snapshot rows 
───────────────────────
+        connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+            sql """DROP TABLE IF EXISTS ${pgSchema}.${table1}"""
+            sql """CREATE TABLE ${pgSchema}.${table1} (
+                       name VARCHAR(200) PRIMARY KEY,
+                       age  INT2
+                   )"""
+            sql """INSERT INTO ${pgSchema}.${table1} VALUES ('A1', 1)"""
+            sql """INSERT INTO ${pgSchema}.${table1} VALUES ('B1', 2)"""
+        }
+
+        // ── 1. Start streaming job 
────────────────────────────────────────────
+        sql """CREATE JOB ${jobName}
+                ON STREAMING
+                FROM POSTGRES (
+                    "jdbc_url"       = 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}",
+                    "driver_url"     = "${driver_url}",
+                    "driver_class"   = "org.postgresql.Driver",
+                    "user"           = "${pgUser}",
+                    "password"       = "${pgPassword}",
+                    "database"       = "${pgDB}",
+                    "schema"         = "${pgSchema}",
+                    "include_tables" = "${table1}",
+                    "offset"         = "initial"
+                )
+                TO DATABASE ${currentDb} (
+                    "table.create.properties.replication_num" = "1"
+                )"""
+
+        // Verify the table was auto-created with the expected initial schema.
+        assert (sql "SHOW TABLES FROM ${currentDb} LIKE '${table1}'").size() 
== 1
+        // DESC columns: Field(0), Type(1), Null(2), Key(3), Default(4), 
Extra(5)
+        def initDesc = sql "DESC ${currentDb}.${table1}"
+        assert initDesc.find { it[0] == 'name' }[1] == 'varchar(65533)' : 
"name must be varchar(65533)"
+        assert initDesc.find { it[0] == 'age'  }[1] == 'smallint'       : "age 
must be smallint"
+        assert initDesc.find { it[0] == 'name' }[3] == 'true'           : 
"name must be primary key"
+
+        // Wait for snapshot to finish (job completes ≥ 2 tasks).
+        try {
+            Awaitility.await().atMost(300, SECONDS).pollInterval(1, 
SECONDS).until({
+                def cnt = sql """select SucceedTaskCount from 
jobs("type"="insert")
+                                  where Name='${jobName}' and 
ExecuteType='STREAMING'"""
+                cnt.size() == 1 && cnt[0][0] as int >= 2
+            })
+        } catch (Exception ex) {
+            dumpJobState()
+            throw ex
+        }
+
+        // Snapshot data: A1(1), B1(2)
+        qt_snapshot """ SELECT name, age FROM ${table1} ORDER BY name """
+
+        // ── Phase 1: ADD COLUMN c1 
────────────────────────────────────────────
+        // PG adds VARCHAR column c1; CDC detects ADD via name diff and 
executes
+        // ALTER TABLE … ADD COLUMN c1 on Doris.
+        connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+            sql """ALTER TABLE ${pgSchema}.${table1} ADD COLUMN c1 
VARCHAR(50)"""
+            sql """INSERT INTO ${pgSchema}.${table1} (name, age, c1) VALUES 
('C1', 10, 'hello')"""
+        }
+
+        try {
+            waitForColumn('c1', true)
+            waitForRow('C1')
+        } catch (Exception ex) {
+            dumpJobState()
+            throw ex
+        }
+
+        // Verify c1 was added to Doris and the new row is present.
+        assert (sql "DESC ${table1}").any { it[0] == 'c1' } : "c1 column must 
exist in Doris after ADD COLUMN"
+
+        // Pre-ADD rows must have NULL for the new column (existing-data 
correctness).
+        assert (sql "SELECT c1 FROM ${table1} WHERE name='A1'")[0][0] == null 
: "A1.c1 must be NULL (pre-ADD row)"
+        assert (sql "SELECT c1 FROM ${table1} WHERE name='B1'")[0][0] == null 
: "B1.c1 must be NULL (pre-ADD row)"
+
+        // A1(1,null), B1(2,null), C1(10,'hello')
+        qt_add_column """ SELECT name, age, c1 FROM ${table1} ORDER BY name """
+
+        // ── Phase 1b: UPDATE / DELETE immediately after ADD COLUMN 
───────────
+        // Verifies that UPDATE (touching the new column) and DELETE on 
pre-existing rows
+        // are correctly propagated to Doris after the schema change.
+        connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+            // Update the new column on the just-inserted row.
+            sql """UPDATE ${pgSchema}.${table1} SET c1='world' WHERE 
name='C1'"""
+            // Update both an old column and the new column on a pre-existing 
row.
+            sql """UPDATE ${pgSchema}.${table1} SET age=99, c1='updated' WHERE 
name='B1'"""
+            // Delete a pre-existing row.
+            sql """DELETE FROM ${pgSchema}.${table1} WHERE name='A1'"""
+        }
+
+        try {
+            waitForRowGone('A1')
+            waitForValue('B1', 'age', 99)
+            waitForValue('C1', 'c1', 'world')
+        } catch (Exception ex) {
+            dumpJobState()
+            throw ex
+        }
+
+        // A1 deleted; B1(99,'updated'); C1(10,'world')
+        qt_add_column_dml """ SELECT name, age, c1 FROM ${table1} ORDER BY 
name """
+
+        // ── Phase 2: DROP COLUMN c1 
───────────────────────────────────────────
+        // PG drops c1; CDC detects DROP and executes ALTER TABLE … DROP 
COLUMN c1 on Doris.
+        connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+            sql """ALTER TABLE ${pgSchema}.${table1} DROP COLUMN c1"""
+            sql """INSERT INTO ${pgSchema}.${table1} (name, age) VALUES ('D1', 
20)"""
+        }
+
+        try {
+            waitForColumn('c1', false)
+            waitForRow('D1')
+        } catch (Exception ex) {
+            dumpJobState()
+            throw ex
+        }
+
+        // Verify c1 was removed from Doris and data flows without it.
+        assert !(sql "DESC ${table1}").any { it[0] == 'c1' } : "c1 column must 
be gone from Doris after DROP COLUMN"
+        // B1(99), C1(10), D1(20)  [A1 was deleted in Phase 1b]
+        qt_drop_column """ SELECT name, age FROM ${table1} ORDER BY name """
+
+        // ── Phase 3: RENAME COLUMN age → age2 (rename guard) 
─────────────────
+        // PG rename looks like a simultaneous ADD(age2) + DROP(age) to the 
name diff.
+        // The rename guard detects this and emits a WARN with no DDL, so 
Doris schema
+        // is unchanged.  New PG rows carry 'age2' which has no matching 
column in Doris,
+        // so 'age' is NULL for those rows.
+        connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+            sql """ALTER TABLE ${pgSchema}.${table1} RENAME COLUMN age TO 
age2"""
+            sql """INSERT INTO ${pgSchema}.${table1} (name, age2) VALUES 
('E1', 30)"""
+        }
+
+        try {
+            waitForRow('E1')
+        } catch (Exception ex) {
+            dumpJobState()
+            throw ex
+        }
+
+        // 'age' must still exist; 'age2' must NOT have been added.
+        def descAfterRename = sql "DESC ${table1}"
+        assert  descAfterRename.any { it[0] == 'age'  } : "'age' column must 
remain after rename guard"
+        assert !descAfterRename.any { it[0] == 'age2' } : "'age2' must NOT be 
added (rename guard, no DDL)"
+        // B1(99), C1(10), D1(20), E1(null) — age=NULL because PG sends age2 
which Doris ignores
+        qt_rename """ SELECT name, age FROM ${table1} ORDER BY name """
+
+        // ── Phase 4: MODIFY COLUMN type (name-only diff, no DDL) 
─────────────
+        // Type-only change is invisible to the name-based diff, so no DDL is 
emitted.
+        // Data continues to flow; age2 values still have no mapping in Doris 
→ age=NULL.
+        connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+            sql """ALTER TABLE ${pgSchema}.${table1} ALTER COLUMN age2 TYPE 
INT4"""
+            sql """INSERT INTO ${pgSchema}.${table1} (name, age2) VALUES 
('F1', 50)"""
+        }
+
+        try {
+            waitForRow('F1')
+        } catch (Exception ex) {
+            dumpJobState()
+            throw ex
+        }
+
+        // Doris 'age' column type must remain smallint (mapped from PG int2).
+        assert (sql "DESC ${table1}").find { it[0] == 'age' }[1] == 'smallint' 
\
+            : "Doris 'age' type must remain smallint after type-only change in 
PG"
+        // B1(99), C1(10), D1(20), E1(null), F1(null)
+        qt_modify """ SELECT name, age FROM ${table1} ORDER BY name """
+
+        assert (sql """select * from jobs("type"="insert") where 
Name='${jobName}'""")[0][5] == "RUNNING"
+
+        // ── Cleanup 
───────────────────────────────────────────────────────────
+        sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+        assert (sql """select count(1) from jobs("type"="insert") where 
Name='${jobName}'""")[0][0] == 0
+    }
+}
diff --git 
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_sc_advanced.groovy
 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_sc_advanced.groovy
new file mode 100644
index 00000000000..6593f1cf4f2
--- /dev/null
+++ 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_sc_advanced.groovy
@@ -0,0 +1,344 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+/**
+ * Advanced schema-change regression for the PostgreSQL CDC streaming job.
+ *
+ * Key differences from the basic schema-change test:
+ *   - Uses offset=latest (incremental-only, no snapshot) to cover the code 
path where
+ *     tableSchemas are discovered from PG JDBC rather than derived from 
snapshot splits.
+ *     This exercises the feHadNoSchema=true branch in PipelineCoordinator.
+ *
+ * Covers uncommon scenarios:
+ *   1. Simultaneous double ADD – two columns added in PG before any DML 
triggers detection;
+ *      both ALTER TABLEs are generated and executed in a single detection 
event.
+ *   2. DROP + ADD simultaneously (rename guard) – dropping one column while 
adding another
+ *      is treated as a potential rename; no DDL is emitted but the cached 
schema is updated.
+ *   3. UPDATE on existing rows after rename guard – verifies that a row whose 
old column (c1)
+ *      was dropped in PG gets c1=NULL in Doris after the next UPDATE (stream 
load replaces the
+ *      whole row without c1 since PG no longer has it).
+ *   4. ADD COLUMN with DEFAULT value – verifies that the DEFAULT clause is 
passed through to
+ *      Doris and that pre-existing rows automatically receive the default 
value after the DDL.
+ *   5. ADD COLUMN NOT NULL with DEFAULT – verifies the NOT NULL path in 
SchemaChangeHelper
+ *      (col.isOptional()=false → appends NOT NULL) and that Doris accepts the 
DDL when a
+ *      DEFAULT is present (satisfying the NOT NULL constraint for existing 
rows).
+ */
+suite("test_streaming_postgres_job_sc_advanced",
+        "p0,external,pg,external_docker,external_docker_pg,nondatalake") {
+
+    def jobName   = "test_streaming_pg_sc_advanced"
+    def currentDb = (sql "select database()")[0][0]
+    def table1    = "user_info_pg_normal1_sc_adv"
+    def pgDB      = "postgres"
+    def pgSchema  = "cdc_test"
+    def pgUser    = "postgres"
+    def pgPassword = "123456"
+
+    sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+    sql """drop table if exists ${currentDb}.${table1} force"""
+
+    String enabled = context.config.otherConfigs.get("enableJdbcTest")
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        String pg_port       = context.config.otherConfigs.get("pg_14_port")
+        String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+        String s3_endpoint   = getS3Endpoint()
+        String bucket        = getS3BucketName()
+        String driver_url    = 
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/postgresql-42.5.0.jar";
+
+        // ── helpers 
───────────────────────────────────────────────────────────
+
+        def waitForRow = { String rowName ->
+            Awaitility.await().atMost(120, SECONDS).pollInterval(2, 
SECONDS).until({
+                (sql "SELECT COUNT(*) FROM ${table1} WHERE name='${rowName}'"
+                )[0][0] as int > 0
+            })
+        }
+
+        def waitForRowGone = { String rowName ->
+            Awaitility.await().atMost(120, SECONDS).pollInterval(2, 
SECONDS).until({
+                (sql "SELECT COUNT(*) FROM ${table1} WHERE name='${rowName}'"
+                )[0][0] as int == 0
+            })
+        }
+
+        def waitForColumn = { String colName, boolean shouldExist ->
+            Awaitility.await().atMost(120, SECONDS).pollInterval(2, 
SECONDS).until({
+                def desc = sql "DESC ${table1}"
+                desc.any { it[0] == colName } == shouldExist
+            })
+        }
+
+        // Comparison is done as strings to avoid JDBC numeric type mismatches.
+        def waitForValue = { String rowName, String colName, Object expected ->
+            Awaitility.await().atMost(120, SECONDS).pollInterval(2, 
SECONDS).until({
+                def rows = sql "SELECT ${colName} FROM ${table1} WHERE 
name='${rowName}'"
+                rows.size() == 1 && String.valueOf(rows[0][0]) == 
String.valueOf(expected)
+            })
+        }
+
+        def dumpJobState = {
+            log.info("jobs  : " + sql("""select * from jobs("type"="insert") 
where Name='${jobName}'"""))
+            log.info("tasks : " + sql("""select * from tasks("type"="insert") 
where JobName='${jobName}'"""))
+        }
+
+        // ── 0. Pre-create PG table with existing rows 
─────────────────────────
+        // A1, B1 are inserted BEFORE the job starts with offset=latest.
+        // They will NOT appear in Doris (no snapshot taken).
+        connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+            sql """DROP TABLE IF EXISTS ${pgSchema}.${table1}"""
+            sql """CREATE TABLE ${pgSchema}.${table1} (
+                       name VARCHAR(200) PRIMARY KEY,
+                       age  INT4
+                   )"""
+            sql """INSERT INTO ${pgSchema}.${table1} VALUES ('A1', 10)"""
+            sql """INSERT INTO ${pgSchema}.${table1} VALUES ('B1', 20)"""
+        }
+
+        // ── 1. Start streaming job with offset=latest 
─────────────────────────
+        // The Doris table is auto-created from the PG schema at job creation 
time.
+        // Streaming begins from the current WAL LSN — A1 and B1 are not 
captured.
+        sql """CREATE JOB ${jobName}
+                ON STREAMING
+                FROM POSTGRES (
+                    "jdbc_url"       = 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}",
+                    "driver_url"     = "${driver_url}",
+                    "driver_class"   = "org.postgresql.Driver",
+                    "user"           = "${pgUser}",
+                    "password"       = "${pgPassword}",
+                    "database"       = "${pgDB}",
+                    "schema"         = "${pgSchema}",
+                    "include_tables" = "${table1}",
+                    "offset"         = "latest"
+                )
+                TO DATABASE ${currentDb} (
+                    "table.create.properties.replication_num" = "1"
+                )"""
+
+        assert (sql "SHOW TABLES FROM ${currentDb} LIKE '${table1}'").size() 
== 1
+
+        // Wait for job to enter RUNNING state (streaming split established).
+        try {
+            Awaitility.await().atMost(120, SECONDS).pollInterval(1, 
SECONDS).until({
+                def rows = sql """select Status from jobs("type"="insert")
+                                   where Name='${jobName}' and 
ExecuteType='STREAMING'"""
+                rows.size() == 1 && rows[0][0] == "RUNNING"
+            })
+        } catch (Exception ex) {
+            dumpJobState()
+            throw ex
+        }
+
+        // Baseline: insert C1 to verify streaming is active.
+        connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+            sql """INSERT INTO ${pgSchema}.${table1} VALUES ('C1', 30)"""
+        }
+
+        try {
+            waitForRow('C1')
+        } catch (Exception ex) {
+            dumpJobState()
+            throw ex
+        }
+
+        // A1, B1 must NOT be present (offset=latest, no snapshot).
+        assert (sql "SELECT COUNT(*) FROM ${table1} WHERE name='A1'")[0][0] as 
int == 0 \
+            : "A1 must not be present (offset=latest)"
+        assert (sql "SELECT COUNT(*) FROM ${table1} WHERE name='B1'")[0][0] as 
int == 0 \
+            : "B1 must not be present (offset=latest)"
+
+        // Only C1(30) should be in Doris.
+        qt_baseline """ SELECT name, age FROM ${table1} ORDER BY name """
+
+        // ── Phase 1: Simultaneous double ADD (c1 TEXT, c2 INT4) 
──────────────
+        // Both ALTER TABLEs happen in PG before any DML triggers CDC 
detection.
+        // The single INSERT D1 triggers the detection, which fetches the 
fresh PG schema
+        // (already containing both c1 and c2), and generates two ADD COLUMN 
DDLs in one shot.
+        connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+            sql """ALTER TABLE ${pgSchema}.${table1} ADD COLUMN c1 TEXT"""
+            sql """ALTER TABLE ${pgSchema}.${table1} ADD COLUMN c2 INT4"""
+            sql """INSERT INTO ${pgSchema}.${table1} (name, age, c1, c2) 
VALUES ('D1', 40, 'hello', 42)"""
+        }
+
+        try {
+            waitForColumn('c1', true)
+            waitForColumn('c2', true)
+            waitForRow('D1')
+        } catch (Exception ex) {
+            dumpJobState()
+            throw ex
+        }
+
+        // DESC columns: Field(0), Type(1), Null(2), Key(3), Default(4), 
Extra(5)
+        def descAfterDoubleAdd = sql "DESC ${table1}"
+        assert descAfterDoubleAdd.find { it[0] == 'c1' }[1] == 'text' : "c1 
must be added as text"
+        assert descAfterDoubleAdd.find { it[0] == 'c2' }[1] == 'int'  : "c2 
must be added as int"
+
+        // Pre-double-ADD row C1 must have NULL for both new columns.
+        assert (sql "SELECT c1 FROM ${table1} WHERE name='C1'")[0][0] == null 
: "C1.c1 must be NULL"
+        assert (sql "SELECT c2 FROM ${table1} WHERE name='C1'")[0][0] == null 
: "C1.c2 must be NULL"
+
+        // C1(30,null,null), D1(40,'hello',42)
+        qt_double_add """ SELECT name, age, c1, c2 FROM ${table1} ORDER BY 
name """
+
+        // ── Phase 2: DROP c1 + ADD c3 simultaneously (rename guard) 
──────────
+        // Dropping c1 and adding c3 in the same batch looks like a rename to 
the CDC detector:
+        // simultaneous ADD+DROP triggers the guard → no DDL emitted, cached 
schema updated to
+        // reflect the fresh PG state (c1 gone, c3 present).
+        // Doris table is left with c1 still present; c3 is never added.
+        connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+            sql """ALTER TABLE ${pgSchema}.${table1} DROP COLUMN c1"""
+            sql """ALTER TABLE ${pgSchema}.${table1} ADD COLUMN c3 INT4"""
+            sql """INSERT INTO ${pgSchema}.${table1} (name, age, c2, c3) 
VALUES ('E1', 50, 10, 99)"""
+        }
+
+        try {
+            waitForRow('E1')
+        } catch (Exception ex) {
+            dumpJobState()
+            throw ex
+        }
+
+        def descAfterRenameGuard = sql "DESC ${table1}"
+        assert  descAfterRenameGuard.any { it[0] == 'c1' } : "c1 must remain 
(rename guard prevented DROP)"
+        assert !descAfterRenameGuard.any { it[0] == 'c3' } : "c3 must NOT be 
added (rename guard prevented ADD)"
+
+        // E1.c1=NULL: PG has c3 (not c1), Doris ignores c3 and writes NULL 
for c1.
+        assert (sql "SELECT c1 FROM ${table1} WHERE name='E1'")[0][0] == null 
: "E1.c1 must be NULL"
+
+        // C1(30,null,null), D1(40,'hello',42), E1(50,null,10)
+        qt_rename_guard """ SELECT name, age, c1, c2 FROM ${table1} ORDER BY 
name """
+
+        // ── Phase 3: UPDATE existing row after rename guard 
───────────────────
+        // D1 had c1='hello' at insert time. After the rename guard fires, the 
cached schema
+        // reflects PG reality (c1 gone, c3 present). When D1 is updated in PG 
(only c3 exists
+        // for non-key columns), the DML record carries no c1 field. Stream 
load replaces the
+        // entire row → D1.c1 becomes NULL in Doris.
+        connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+            // PG now has columns: name, age, c2, c3 (c1 was dropped)
+            sql """UPDATE ${pgSchema}.${table1} SET age=99, c3=88 WHERE 
name='D1'"""
+        }
+
+        try {
+            waitForValue('D1', 'age', 99)
+        } catch (Exception ex) {
+            dumpJobState()
+            throw ex
+        }
+
+        // D1.c1 was 'hello' but after UPDATE the stream load has no c1 field →
+        // Doris replaces the row without c1 → c1=NULL.
+        assert (sql "SELECT c1 FROM ${table1} WHERE name='D1'")[0][0] == null \
+            : "D1.c1 must be NULL after UPDATE (c1 dropped from PG, not in 
stream load record)"
+
+        // C1(30,null,null), D1(99,null,null), E1(50,null,10)
+        qt_rename_guard_update """ SELECT name, age, c1, c2 FROM ${table1} 
ORDER BY name """
+
+        // ── Phase 4: ADD COLUMN with DEFAULT value 
────────────────────────────
+        // PG adds a nullable TEXT column with a DEFAULT value.
+        // buildAddColumnSql picks up col.defaultValueExpression() and appends 
DEFAULT 'default_val'
+        // to the Doris ALTER TABLE.  After the DDL, Doris fills the default 
for all pre-existing
+        // rows (metadata operation), so C1/D1/E1 all get c4='default_val' 
without any DML replay.
+        // F1 is inserted without an explicit c4 value → PG fills in the 
default → WAL record
+        // already carries c4='default_val', so Doris writes 'default_val' for 
F1 as well.
+        connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+            // PG current cols: name, age, c2, c3
+            sql """ALTER TABLE ${pgSchema}.${table1} ADD COLUMN c4 TEXT 
DEFAULT 'default_val'"""
+            // Trigger schema-change detection; omit c4 → PG fills default in 
WAL record.
+            sql """INSERT INTO ${pgSchema}.${table1} (name, age, c2, c3) 
VALUES ('F1', 60, 20, 77)"""
+        }
+
+        try {
+            waitForColumn('c4', true)
+            waitForRow('F1')
+        } catch (Exception ex) {
+            dumpJobState()
+            throw ex
+        }
+
+        // DESC columns: Field(0), Type(1), Null(2), Key(3), Default(4), 
Extra(5)
+        def descAfterDefaultAdd = sql "DESC ${table1}"
+        def c4Row = descAfterDefaultAdd.find { it[0] == 'c4' }
+        assert c4Row != null : "c4 must be added"
+        assert c4Row[4] == 'default_val' : "c4 must carry DEFAULT 
'default_val', got: ${c4Row[4]}"
+
+        // Pre-existing rows receive the default value from Doris's ALTER 
TABLE (not from DML replay).
+        try {
+            waitForValue('C1', 'c4', 'default_val')
+            waitForValue('D1', 'c4', 'default_val')
+            waitForValue('E1', 'c4', 'default_val')
+            waitForValue('F1', 'c4', 'default_val')
+        } catch (Exception ex) {
+            dumpJobState()
+            throw ex
+        }
+
+        // C1(30,_,default_val), D1(99,_,default_val), E1(50,_,default_val), 
F1(60,_,default_val)
+        qt_default_col """ SELECT name, age, c4 FROM ${table1} ORDER BY name 
"""
+
+        // ── Phase 5: ADD COLUMN NOT NULL with DEFAULT 
─────────────────────────
+        // In PG, adding a NOT NULL column to a non-empty table requires a 
DEFAULT so existing rows
+        // satisfy the constraint.  Debezium captures col.isOptional()=false, 
so SchemaChangeHelper
+        // appends NOT NULL to the Doris column type, and the DEFAULT clause 
is also passed through.
+        // With both NOT NULL and DEFAULT, Doris can apply the DDL: existing 
rows get the default
+        // value (satisfying NOT NULL), and new rows must supply a value or 
receive the default.
+        connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+            // PG current cols: name, age, c2, c3, c4
+            sql """ALTER TABLE ${pgSchema}.${table1}
+                       ADD COLUMN c5 TEXT NOT NULL DEFAULT 'required'"""
+            sql """INSERT INTO ${pgSchema}.${table1} (name, age, c2, c3, c4, 
c5)
+                       VALUES ('G1', 70, 30, 66, 'g1c4', 'explicit')"""
+        }
+
+        try {
+            waitForColumn('c5', true)
+            waitForRow('G1')
+        } catch (Exception ex) {
+            dumpJobState()
+            throw ex
+        }
+
+        // DESC columns: Field(0), Type(1), Null(2), Key(3), Default(4), 
Extra(5)
+        def descAfterNotNullAdd = sql "DESC ${table1}"
+        def c5Row = descAfterNotNullAdd.find { it[0] == 'c5' }
+        assert c5Row != null : "c5 must be added"
+        assert c5Row[4] == 'required' : "c5 must carry DEFAULT 'required', 
got: ${c5Row[4]}"
+
+        // Pre-existing rows must have the default value (Doris ALTER TABLE 
fills it).
+        // G1 was inserted with an explicit 'explicit' value.
+        try {
+            waitForValue('C1', 'c5', 'required')
+            waitForValue('D1', 'c5', 'required')
+            waitForValue('G1', 'c5', 'explicit')
+        } catch (Exception ex) {
+            dumpJobState()
+            throw ex
+        }
+
+        // C1(_,default_val,required), D1(_,default_val,required), 
...G1(_,g1c4,explicit)
+        qt_not_null_col """ SELECT name, age, c4, c5 FROM ${table1} ORDER BY 
name """
+
+        assert (sql """select * from jobs("type"="insert") where 
Name='${jobName}'""")[0][5] == "RUNNING"
+
+        // ── Cleanup 
───────────────────────────────────────────────────────────
+        sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+        assert (sql """select count(1) from jobs("type"="insert") where 
Name='${jobName}'""")[0][0] == 0
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to