Copilot commented on code in PR #61182:
URL: https://github.com/apache/doris/pull/61182#discussion_r2910697989


##########
regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_sc_advanced.groovy:
##########
@@ -0,0 +1,344 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+/**
+ * Advanced schema-change regression for the PostgreSQL CDC streaming job.
+ *
+ * Key differences from the basic schema-change test:
+ *   - Uses offset=latest (incremental-only, no snapshot) to cover the code 
path where
+ *     tableSchemas are discovered from PG JDBC rather than derived from 
snapshot splits.
+ *     This exercises the feHadNoSchema=true branch in PipelineCoordinator.
+ *
+ * Covers uncommon scenarios:
+ *   1. Simultaneous double ADD – two columns added in PG before any DML 
triggers detection;
+ *      both ALTER TABLEs are generated and executed in a single detection 
event.
+ *   2. DROP + ADD simultaneously (rename guard) – dropping one column while 
adding another
+ *      is treated as a potential rename; no DDL is emitted but the cached 
schema is updated.
+ *   3. UPDATE on existing rows after rename guard – verifies that a row whose 
old column (c1)
+ *      was dropped in PG gets c1=NULL in Doris after the next UPDATE (stream 
load replaces the
+ *      whole row without c1 since PG no longer has it).
+ *   4. ADD COLUMN with DEFAULT value – verifies that the DEFAULT clause is 
passed through to
+ *      Doris and that pre-existing rows automatically receive the default 
value after the DDL.
+ *   5. ADD COLUMN NOT NULL with DEFAULT – verifies the NOT NULL path in 
SchemaChangeHelper
+ *      (col.isOptional()=false → appends NOT NULL) and that Doris accepts the 
DDL when a
+ *      DEFAULT is present (satisfying the NOT NULL constraint for existing 
rows).
+ */
+suite("test_streaming_postgres_job_sc_advanced",
+        "p0,external,pg,external_docker,external_docker_pg,nondatalake") {
+
+    def jobName   = "test_streaming_pg_sc_advanced"
+    def currentDb = (sql "select database()")[0][0]
+    def table1    = "user_info_pg_normal1_sc_adv"
+    def pgDB      = "postgres"
+    def pgSchema  = "cdc_test"
+    def pgUser    = "postgres"
+    def pgPassword = "123456"
+
+    sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+    sql """drop table if exists ${currentDb}.${table1} force"""
+
+    String enabled = context.config.otherConfigs.get("enableJdbcTest")
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        String pg_port       = context.config.otherConfigs.get("pg_14_port")
+        String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+        String s3_endpoint   = getS3Endpoint()
+        String bucket        = getS3BucketName()
+        String driver_url    = 
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/postgresql-42.5.0.jar";
+
+        // ── helpers 
───────────────────────────────────────────────────────────
+
+        def waitForRow = { String rowName ->
+            Awaitility.await().atMost(120, SECONDS).pollInterval(2, 
SECONDS).until({
+                (sql "SELECT COUNT(*) FROM ${table1} WHERE name='${rowName}'"
+                )[0][0] as int > 0
+            })
+        }
+
+        def waitForRowGone = { String rowName ->
+            Awaitility.await().atMost(120, SECONDS).pollInterval(2, 
SECONDS).until({
+                (sql "SELECT COUNT(*) FROM ${table1} WHERE name='${rowName}'"
+                )[0][0] as int == 0
+            })
+        }
+
+        def waitForColumn = { String colName, boolean shouldExist ->
+            Awaitility.await().atMost(120, SECONDS).pollInterval(2, 
SECONDS).until({
+                def desc = sql "DESC ${table1}"
+                desc.any { it[0] == colName } == shouldExist
+            })
+        }
+
+        // Comparison is done as strings to avoid JDBC numeric type mismatches.
+        def waitForValue = { String rowName, String colName, Object expected ->
+            Awaitility.await().atMost(120, SECONDS).pollInterval(2, 
SECONDS).until({
+                def rows = sql "SELECT ${colName} FROM ${table1} WHERE 
name='${rowName}'"
+                rows.size() == 1 && String.valueOf(rows[0][0]) == 
String.valueOf(expected)
+            })
+        }
+
+        def dumpJobState = {
+            log.info("jobs  : " + sql("""select * from jobs("type"="insert") 
where Name='${jobName}'"""))
+            log.info("tasks : " + sql("""select * from tasks("type"="insert") 
where JobName='${jobName}'"""))
+        }
+
+        // ── 0. Pre-create PG table with existing rows 
─────────────────────────
+        // A1, B1 are inserted BEFORE the job starts with offset=latest.
+        // They will NOT appear in Doris (no snapshot taken).
+        connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+            sql """DROP TABLE IF EXISTS ${pgSchema}.${table1}"""
+            sql """CREATE TABLE ${pgSchema}.${table1} (
+                       name VARCHAR(200) PRIMARY KEY,
+                       age  INT4
+                   )"""
+            sql """INSERT INTO ${pgSchema}.${table1} VALUES ('A1', 10)"""
+            sql """INSERT INTO ${pgSchema}.${table1} VALUES ('B1', 20)"""
+        }
+
+        // ── 1. Start streaming job with offset=latest 
─────────────────────────
+        // The Doris table is auto-created from the PG schema at job creation 
time.
+        // Streaming begins from the current WAL LSN — A1 and B1 are not 
captured.
+        sql """CREATE JOB ${jobName}
+                ON STREAMING
+                FROM POSTGRES (
+                    "jdbc_url"       = 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}",
+                    "driver_url"     = "${driver_url}",
+                    "driver_class"   = "org.postgresql.Driver",
+                    "user"           = "${pgUser}",
+                    "password"       = "${pgPassword}",
+                    "database"       = "${pgDB}",
+                    "schema"         = "${pgSchema}",
+                    "include_tables" = "${table1}",
+                    "offset"         = "latest"
+                )
+                TO DATABASE ${currentDb} (
+                    "table.create.properties.replication_num" = "1"
+                )"""
+
+        assert (sql "SHOW TABLES FROM ${currentDb} LIKE '${table1}'").size() 
== 1
+
+        // Wait for job to enter RUNNING state (streaming split established).
+        try {
+            Awaitility.await().atMost(120, SECONDS).pollInterval(1, 
SECONDS).until({
+                def rows = sql """select Status from jobs("type"="insert")
+                                   where Name='${jobName}' and 
ExecuteType='STREAMING'"""
+                rows.size() == 1 && rows[0][0] == "RUNNING"
+            })
+        } catch (Exception ex) {
+            dumpJobState()
+            throw ex
+        }
+
+        // Baseline: insert C1 to verify streaming is active.
+        connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+            sql """INSERT INTO ${pgSchema}.${table1} VALUES ('C1', 30)"""
+        }
+
+        try {
+            waitForRow('C1')
+        } catch (Exception ex) {
+            dumpJobState()
+            throw ex
+        }
+
+        // A1, B1 must NOT be present (offset=latest, no snapshot).
+        assert (sql "SELECT COUNT(*) FROM ${table1} WHERE name='A1'")[0][0] as 
int == 0 \
+            : "A1 must not be present (offset=latest)"
+        assert (sql "SELECT COUNT(*) FROM ${table1} WHERE name='B1'")[0][0] as 
int == 0 \
+            : "B1 must not be present (offset=latest)"
+
+        // Only C1(30) should be in Doris.
+        qt_baseline """ SELECT name, age FROM ${table1} ORDER BY name """
+
+        // ── Phase 1: Simultaneous double ADD (c1 TEXT, c2 INT4) 
──────────────
+        // Both ALTER TABLEs happen in PG before any DML triggers CDC 
detection.
+        // The single INSERT D1 triggers the detection, which fetches the 
fresh PG schema
+        // (already containing both c1 and c2), and generates two ADD COLUMN 
DDLs in one shot.
+        connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+            sql """ALTER TABLE ${pgSchema}.${table1} ADD COLUMN c1 TEXT"""
+            sql """ALTER TABLE ${pgSchema}.${table1} ADD COLUMN c2 INT4"""
+            sql """INSERT INTO ${pgSchema}.${table1} (name, age, c1, c2) 
VALUES ('D1', 40, 'hello', 42)"""
+        }
+
+        try {
+            waitForColumn('c1', true)
+            waitForColumn('c2', true)
+            waitForRow('D1')
+        } catch (Exception ex) {
+            dumpJobState()
+            throw ex
+        }
+
+        // DESC columns: Field(0), Type(1), Null(2), Key(3), Default(4), 
Extra(5)
+        def descAfterDoubleAdd = sql "DESC ${table1}"
+        assert descAfterDoubleAdd.find { it[0] == 'c1' }[1] == 'text' : "c1 
must be added as text"
+        assert descAfterDoubleAdd.find { it[0] == 'c2' }[1] == 'int'  : "c2 
must be added as int"
+
+        // Pre-double-ADD row C1 must have NULL for both new columns.
+        assert (sql "SELECT c1 FROM ${table1} WHERE name='C1'")[0][0] == null 
: "C1.c1 must be NULL"
+        assert (sql "SELECT c1 FROM ${table1} WHERE name='C1'")[0][0] == null 
: "C1.c2 must be NULL"

Review Comment:
   The second assertion is intended to validate `C1.c2` but it queries `SELECT 
c1` again, so it never verifies `c2`. Update the query to select `c2` to 
actually test the intended behavior.
   ```suggestion
           assert (sql "SELECT c2 FROM ${table1} WHERE name='C1'")[0][0] == 
null : "C1.c2 must be NULL"
   ```



##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/SchemaChangeHelper.java:
##########
@@ -0,0 +1,288 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cdcclient.utils;
+
+import org.apache.doris.cdcclient.common.DorisType;
+
+import org.apache.flink.util.Preconditions;
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import io.debezium.relational.Column;
+import io.debezium.relational.history.TableChanges;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Utility class for generating Doris ALTER TABLE SQL from schema diffs. */
+public class SchemaChangeHelper {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(SchemaChangeHelper.class);
+    private static final String ADD_DDL = "ALTER TABLE %s ADD COLUMN %s %s";
+    private static final String DROP_DDL = "ALTER TABLE %s DROP COLUMN %s";
+
+    private SchemaChangeHelper() {}
+
+    // ─── Schema diff result 
────────────────────────────────────────────────────
+
+    /**
+     * Holds the result of a full schema comparison between an after-schema 
and stored TableChange.
+     */
+    public static class SchemaDiff {
+        /** Fields present in afterSchema but absent from stored. */
+        public final List<Field> added;
+
+        /** Column names present in stored but absent from afterSchema. */
+        public final List<String> dropped;
+
+        /** Same-named columns whose Doris type or default value differs. */
+        public final Map<String, Field> modified;
+
+        public SchemaDiff(List<Field> added, List<String> dropped, Map<String, 
Field> modified) {
+            this.added = added;
+            this.dropped = dropped;
+            this.modified = modified;
+        }
+
+        public boolean isEmpty() {
+            return added.isEmpty() && dropped.isEmpty() && modified.isEmpty();
+        }
+    }
+
+    // ─── Schema-diff helpers (Kafka Connect schema ↔ stored TableChange) 
──────
+
+    /**
+     * Name-only schema diff: compare field names in {@code afterSchema} 
against the stored {@link
+     * TableChanges.TableChange}, detecting added and dropped columns by name 
only.
+     *
+     * <p>Only support add and drop and not support modify and rename
+     *
+     * <p>When {@code stored} is null or empty, both lists are empty (no 
baseline to diff against).
+     */
+    public static SchemaDiff diffSchemaByName(Schema afterSchema, 
TableChanges.TableChange stored) {
+        List<Field> added = new ArrayList<>();
+        List<String> dropped = new ArrayList<>();
+
+        if (afterSchema == null || stored == null || stored.getTable() == 
null) {
+            return new SchemaDiff(added, dropped, new LinkedHashMap<>());
+        }
+
+        // Detect added: fields present in afterSchema but absent from stored
+        for (Field field : afterSchema.fields()) {
+            if (stored.getTable().columnWithName(field.name()) == null) {
+                added.add(field);
+            }
+        }
+
+        // Detect dropped: columns present in stored but absent from 
afterSchema
+        for (Column col : stored.getTable().columns()) {
+            if (afterSchema.field(col.name()) == null) {
+                dropped.add(col.name());
+            }
+        }
+
+        return new SchemaDiff(added, dropped, new LinkedHashMap<>());
+    }
+
+    // ─── Quoting helpers 
──────────────────────────────────────────────────────
+
+    /** Wrap a name in backticks if not already quoted. */
+    public static String identifier(String name) {
+        if (name.startsWith("`") && name.endsWith("`")) {
+            return name;
+        }
+        return "`" + name + "`";
+    }
+
+    /** Return a fully-qualified {@code `db`.`table`} identifier string. */
+    public static String quoteTableIdentifier(String db, String table) {
+        return identifier(db) + "." + identifier(table);
+    }
+
+    /**
+     * Format a default value (already a plain Java string, not a raw SQL 
expression) into a form
+     * suitable for a Doris {@code DEFAULT} clause.
+     *
+     * <p>The caller is expected to pass a <em>deserialized</em> value — e.g. 
obtained from the
+     * Kafka Connect schema via {@code 
field.schema().defaultValue().toString()} — rather than a raw
+     * PG SQL expression. This avoids having to strip PG-specific type casts 
({@code ::text}, etc.).
+     *
+     * <ul>
+     *   <li>SQL keywords ({@code NULL}, {@code CURRENT_TIMESTAMP}, {@code 
TRUE}, {@code FALSE}) are
+     *       returned as-is.
+     *   <li>Numeric literals are returned as-is (no quotes).
+     *   <li>Everything else is wrapped in single quotes.
+     * </ul>
+     */
+    public static String quoteDefaultValue(String defaultValue) {
+        if (defaultValue == null) {
+            return null;
+        }
+        if (defaultValue.equalsIgnoreCase("current_timestamp")
+                || defaultValue.equalsIgnoreCase("null")
+                || defaultValue.equalsIgnoreCase("true")
+                || defaultValue.equalsIgnoreCase("false")) {
+            return defaultValue;
+        }
+        try {
+            Double.parseDouble(defaultValue);
+            return defaultValue;
+        } catch (NumberFormatException ignored) {
+            // fall through
+        }
+        return "'" + defaultValue + "'";

Review Comment:
   String default values are wrapped in single quotes without escaping embedded 
quotes. A default like `O'Reilly` will generate invalid SQL (and opens an 
injection surface if defaults can be influenced). Escape single quotes in 
`defaultValue` for SQL literals (e.g., replace `'` with `''`) before quoting.
   ```suggestion
           // Escape single quotes inside the string literal for SQL (e.g., 
O'Reilly -> 'O''Reilly')
           return "'" + defaultValue.replace("'", "''") + "'";
   ```



##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/MySqlDebeziumJsonDeserializer.java:
##########
@@ -0,0 +1,66 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cdcclient.source.deserialize;
+
+import org.apache.doris.job.cdc.DataSourceConfigKeys;
+
+import org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils;
+import org.apache.flink.cdc.debezium.history.FlinkJsonTableChangeSerializer;
+import org.apache.kafka.connect.source.SourceRecord;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * MySQL-specific deserializer that handles DDL schema change events.
+ *
+ * <p>When a schema change event is detected, it parses the HistoryRecord, 
computes the diff against
+ * stored tableSchemas, generates Doris ALTER TABLE SQL, and returns a 
SCHEMA_CHANGE result.
+ */
+public class MySqlDebeziumJsonDeserializer extends DebeziumJsonDeserializer {
+    private static final long serialVersionUID = 1L;
+    private static final Logger LOG = 
LoggerFactory.getLogger(MySqlDebeziumJsonDeserializer.class);
+    private static final FlinkJsonTableChangeSerializer 
TABLE_CHANGE_SERIALIZER =
+            new FlinkJsonTableChangeSerializer();
+
+    private String targetDb;
+
+    @Override
+    public void init(Map<String, String> props) {
+        super.init(props);
+        this.targetDb = props.get(DataSourceConfigKeys.DATABASE);
+    }
+
+    @Override
+    public DeserializeResult deserialize(Map<String, String> context, 
SourceRecord record)
+            throws IOException {
+        if (RecordUtils.isSchemaChangeEvent(record)) {
+            return handleSchemaChangeEvent(record, context);
+        }
+        return super.deserialize(context, record);
+    }
+
+    private DeserializeResult handleSchemaChangeEvent(
+            SourceRecord record, Map<String, String> context) {
+        // todo: record has mysql ddl, need to convert doris ddl
+        return DeserializeResult.empty();

Review Comment:
   This method currently drops MySQL schema change events on the floor. With 
MySQL schema change events enabled in the reader, this TODO becomes observable 
runtime behavior: schema change events are silently ignored. Either implement 
conversion to Doris DDL (return `DeserializeResult.schemaChange(...)`) or 
explicitly disable schema change events until they are supported.
   ```suggestion
           // Currently, MySQL schema change (DDL) events are not supported.
           // Failing fast here prevents silently dropping schema changes when 
they are enabled
           // in the upstream MySQL CDC source (e.g., include.schema.changes = 
true).
           LOG.error(
                   "Received unsupported MySQL schema change event. "
                           + "Disable schema change events in the MySQL CDC 
source configuration "
                           + "(for example, set 'include.schema.changes' to 
false). "
                           + "Record: {}",
                   record);
           throw new UnsupportedOperationException(
                   "MySQL schema change events are not supported by 
MySqlDebeziumJsonDeserializer. "
                           + "Disable schema change events in the MySQL CDC 
source configuration.");
   ```



##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/HttpUtil.java:
##########
@@ -52,4 +52,8 @@ protected boolean isRedirectable(String method) {
                 .addInterceptorLast(new RequestContent(true))
                 .build();
     }
+
+    public static String getAuthHeader() {
+        return "Basic YWRtaW46";
+    }

Review Comment:
   This hardcodes a Basic auth header (`YWRtaW46` is the Base64 encoding of 
`admin:`, i.e. user `admin` with an empty password). If this header is 
required, it should be derived from the configured credentials (or removed if 
token auth is sufficient) to avoid embedding credentials in source and to 
support non-default deployments.



##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java:
##########
@@ -778,7 +775,7 @@ private MySqlSourceConfig generateMySqlConfig(Map<String, 
String> cdcConfig, Str
         configFactory.serverTimeZone(
                 
ConfigUtil.getTimeZoneFromProps(cu.getOriginalProperties()).toString());
 
-        configFactory.includeSchemaChanges(false);
+        configFactory.includeSchemaChanges(true);

Review Comment:
   Schema change events are now enabled for MySQL, but the MySQL schema-change 
handling path is currently a stub (see 
`MySqlDebeziumJsonDeserializer#handleSchemaChangeEvent` returning empty). If 
MySQL DDL support is not ready, keep `includeSchemaChanges(false)` to avoid 
emitting and processing extra non-DML records (overhead + confusing behavior).
   ```suggestion
           // Disable schema change events because 
MySqlDebeziumJsonDeserializer does not handle them yet.
           configFactory.includeSchemaChanges(false);
   ```



##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/SchemaChangeHelper.java:
##########
@@ -0,0 +1,288 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cdcclient.utils;
+
+import org.apache.doris.cdcclient.common.DorisType;
+
+import org.apache.flink.util.Preconditions;
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import io.debezium.relational.Column;
+import io.debezium.relational.history.TableChanges;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Utility class for generating Doris ALTER TABLE SQL from schema diffs. */
+public class SchemaChangeHelper {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(SchemaChangeHelper.class);
+    private static final String ADD_DDL = "ALTER TABLE %s ADD COLUMN %s %s";
+    private static final String DROP_DDL = "ALTER TABLE %s DROP COLUMN %s";
+
+    private SchemaChangeHelper() {}
+
+    // ─── Schema diff result 
────────────────────────────────────────────────────
+
+    /**
+     * Holds the result of a full schema comparison between an after-schema 
and stored TableChange.
+     */
+    public static class SchemaDiff {
+        /** Fields present in afterSchema but absent from stored. */
+        public final List<Field> added;
+
+        /** Column names present in stored but absent from afterSchema. */
+        public final List<String> dropped;
+
+        /** Same-named columns whose Doris type or default value differs. */
+        public final Map<String, Field> modified;
+
+        public SchemaDiff(List<Field> added, List<String> dropped, Map<String, 
Field> modified) {
+            this.added = added;
+            this.dropped = dropped;
+            this.modified = modified;
+        }
+
+        public boolean isEmpty() {
+            return added.isEmpty() && dropped.isEmpty() && modified.isEmpty();
+        }
+    }
+
+    // ─── Schema-diff helpers (Kafka Connect schema ↔ stored TableChange) 
──────
+
+    /**
+     * Name-only schema diff: compare field names in {@code afterSchema} 
against the stored {@link
+     * TableChanges.TableChange}, detecting added and dropped columns by name 
only.
+     *
+     * <p>Only support add and drop and not support modify and rename
+     *
+     * <p>When {@code stored} is null or empty, both lists are empty (no 
baseline to diff against).
+     */
+    public static SchemaDiff diffSchemaByName(Schema afterSchema, 
TableChanges.TableChange stored) {
+        List<Field> added = new ArrayList<>();
+        List<String> dropped = new ArrayList<>();
+
+        if (afterSchema == null || stored == null || stored.getTable() == 
null) {
+            return new SchemaDiff(added, dropped, new LinkedHashMap<>());
+        }
+
+        // Detect added: fields present in afterSchema but absent from stored
+        for (Field field : afterSchema.fields()) {
+            if (stored.getTable().columnWithName(field.name()) == null) {
+                added.add(field);
+            }
+        }
+
+        // Detect dropped: columns present in stored but absent from 
afterSchema
+        for (Column col : stored.getTable().columns()) {
+            if (afterSchema.field(col.name()) == null) {
+                dropped.add(col.name());
+            }
+        }
+
+        return new SchemaDiff(added, dropped, new LinkedHashMap<>());
+    }
+
+    // ─── Quoting helpers 
──────────────────────────────────────────────────────
+
+    /** Wrap a name in backticks if not already quoted. */
+    public static String identifier(String name) {
+        if (name.startsWith("`") && name.endsWith("`")) {
+            return name;
+        }
+        return "`" + name + "`";
+    }
+
+    /** Return a fully-qualified {@code `db`.`table`} identifier string. */
+    public static String quoteTableIdentifier(String db, String table) {
+        return identifier(db) + "." + identifier(table);
+    }
+
+    /**
+     * Format a default value (already a plain Java string, not a raw SQL 
expression) into a form
+     * suitable for a Doris {@code DEFAULT} clause.
+     *
+     * <p>The caller is expected to pass a <em>deserialized</em> value — e.g. 
obtained from the
+     * Kafka Connect schema via {@code 
field.schema().defaultValue().toString()} — rather than a raw
+     * PG SQL expression. This avoids having to strip PG-specific type casts 
({@code ::text}, etc.).
+     *
+     * <ul>
+     *   <li>SQL keywords ({@code NULL}, {@code CURRENT_TIMESTAMP}, {@code 
TRUE}, {@code FALSE}) are
+     *       returned as-is.
+     *   <li>Numeric literals are returned as-is (no quotes).
+     *   <li>Everything else is wrapped in single quotes.
+     * </ul>
+     */
+    public static String quoteDefaultValue(String defaultValue) {
+        if (defaultValue == null) {
+            return null;
+        }
+        if (defaultValue.equalsIgnoreCase("current_timestamp")
+                || defaultValue.equalsIgnoreCase("null")
+                || defaultValue.equalsIgnoreCase("true")
+                || defaultValue.equalsIgnoreCase("false")) {
+            return defaultValue;
+        }
+        try {
+            Double.parseDouble(defaultValue);
+            return defaultValue;
+        } catch (NumberFormatException ignored) {
+            // fall through
+        }
+        return "'" + defaultValue + "'";
+    }
+
+    /** Escape single quotes inside a COMMENT string. */
+    public static String quoteComment(String comment) {
+        if (comment == null) {
+            return "";
+        }
+        return comment.replaceAll("'", "\\\\'");

Review Comment:
   Escaping comment strings with `\\'` is not standard SQL string literal 
escaping and may be rejected or interpreted differently by Doris. Prefer 
SQL-standard escaping by doubling quotes (`'` → `''`). Also, `replaceAll` is 
regex-based; `replace` is safer and faster for literal replacement.
   ```suggestion
           // Use SQL-standard escaping: single quote -> two single quotes, and 
avoid regex-based replaceAll
           return comment.replace("'", "''");
   ```



##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java:
##########
@@ -84,6 +86,9 @@ public class PostgresSourceReader extends 
JdbcIncrementalSourceReader {
 
     public PostgresSourceReader() {
         super();
+        this.setSerializer(
+                new org.apache.doris.cdcclient.source.deserialize
+                        .PostgresDebeziumJsonDeserializer());
     }
 

Review Comment:
   The class is already imported (`PostgresDebeziumJsonDeserializer`), but the 
instantiation uses a fully-qualified name split across lines. Prefer using the 
imported type directly to improve readability and reduce unnecessary verbosity.
   ```suggestion
           this.setSerializer(new PostgresDebeziumJsonDeserializer());
       }
   ```



##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java:
##########
@@ -309,15 +325,22 @@ public void writeRecords(WriteRecordRequest 
writeRecordRequest) throws Exception
                     }
 
                     // Process data messages
-                    List<String> serializedRecords =
-                            
sourceReader.deserialize(writeRecordRequest.getConfig(), element);
-
-                    if (!CollectionUtils.isEmpty(serializedRecords)) {
-                        String database = writeRecordRequest.getTargetDb();
+                    DeserializeResult result =
+                            sourceReader.deserialize(deserializeContext, 
element);
+
+                    if (result.getType() == 
DeserializeResult.Type.SCHEMA_CHANGE) {
+                        // Flush pending data before DDL
+                        batchStreamLoad.forceFlush();
+                        SchemaChangeManager.executeDdls(feAddr, targetDb, 
token, result.getDdls());
+                        hasExecuteDDL = true;

Review Comment:
   For rename-guard cases, `result.getDdls()` is intentionally empty. Calling 
`executeDdls` will log `No DDL statements to execute` as WARN, which can become 
noisy and look like an operational issue. Consider short-circuiting here when 
`result.getDdls()` is null/empty, or downgrade that log in `executeDdls` to 
INFO/DEBUG for empty lists.
   ```suggestion
                           if (!CollectionUtils.isEmpty(result.getDdls())) {
                               SchemaChangeManager.executeDdls(feAddr, 
targetDb, token, result.getDdls());
                               hasExecuteDDL = true;
                           }
   ```



##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/SchemaChangeManager.java:
##########
@@ -0,0 +1,149 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cdcclient.utils;
+
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.util.EntityUtils;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Static utility class for executing DDL schema changes on the Doris FE via 
HTTP. */
+public class SchemaChangeManager {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(SchemaChangeManager.class);
+    private static final String SCHEMA_CHANGE_API = 
"http://%s/api/query/default_cluster/%s";;
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+    private static final String COLUMN_EXISTS_MSG = "Can not add column which 
already exists";
+    private static final String COLUMN_NOT_EXISTS_MSG = "Column does not 
exists";

Review Comment:
   Idempotency relies on substring matching of FE error messages. 
`COLUMN_NOT_EXISTS_MSG` uses the grammatically incorrect `does not exists`, 
which is unlikely to match typical server messages (`does not exist`) and would 
cause DROP COLUMN retries to fail instead of being swallowed. Consider matching 
both variants or using a more robust predicate (e.g., case-insensitive contains 
for `column` + `does not exist`).
   ```suggestion
       private static final String COLUMN_NOT_EXISTS_MSG = "Column does not 
exist";
   ```



##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/SchemaChangeHelper.java:
##########
@@ -0,0 +1,288 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cdcclient.utils;
+
+import org.apache.doris.cdcclient.common.DorisType;
+
+import org.apache.flink.util.Preconditions;
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import io.debezium.relational.Column;
+import io.debezium.relational.history.TableChanges;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Utility class for generating Doris ALTER TABLE SQL from schema diffs. */
+public class SchemaChangeHelper {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(SchemaChangeHelper.class);
+    private static final String ADD_DDL = "ALTER TABLE %s ADD COLUMN %s %s";
+    private static final String DROP_DDL = "ALTER TABLE %s DROP COLUMN %s";
+
+    private SchemaChangeHelper() {}
+
+    // ─── Schema diff result 
────────────────────────────────────────────────────
+
+    /**
+     * Holds the result of a full schema comparison between an after-schema 
and stored TableChange.
+     */
+    public static class SchemaDiff {
+        /** Fields present in afterSchema but absent from stored. */
+        public final List<Field> added;
+
+        /** Column names present in stored but absent from afterSchema. */
+        public final List<String> dropped;
+
+        /** Same-named columns whose Doris type or default value differs. */
+        public final Map<String, Field> modified;
+
+        public SchemaDiff(List<Field> added, List<String> dropped, Map<String, 
Field> modified) {
+            this.added = added;
+            this.dropped = dropped;
+            this.modified = modified;
+        }
+
+        public boolean isEmpty() {
+            return added.isEmpty() && dropped.isEmpty() && modified.isEmpty();
+        }
+    }
+
+    // ─── Schema-diff helpers (Kafka Connect schema ↔ stored TableChange) 
──────
+
+    /**
+     * Name-only schema diff: compare field names in {@code afterSchema} 
against the stored {@link
+     * TableChanges.TableChange}, detecting added and dropped columns by name 
only.
+     *
+     * <p>Only support add and drop and not support modify and rename
+     *
+     * <p>When {@code stored} is null or empty, both lists are empty (no 
baseline to diff against).
+     */
+    public static SchemaDiff diffSchemaByName(Schema afterSchema, 
TableChanges.TableChange stored) {
+        List<Field> added = new ArrayList<>();
+        List<String> dropped = new ArrayList<>();
+
+        if (afterSchema == null || stored == null || stored.getTable() == 
null) {
+            return new SchemaDiff(added, dropped, new LinkedHashMap<>());
+        }
+
+        // Detect added: fields present in afterSchema but absent from stored
+        for (Field field : afterSchema.fields()) {
+            if (stored.getTable().columnWithName(field.name()) == null) {
+                added.add(field);
+            }
+        }
+
+        // Detect dropped: columns present in stored but absent from 
afterSchema
+        for (Column col : stored.getTable().columns()) {
+            if (afterSchema.field(col.name()) == null) {
+                dropped.add(col.name());
+            }
+        }
+
+        return new SchemaDiff(added, dropped, new LinkedHashMap<>());
+    }
+
+    // ─── Quoting helpers 
──────────────────────────────────────────────────────
+
+    /** Wrap a name in backticks if not already quoted. */
+    public static String identifier(String name) {
+        if (name.startsWith("`") && name.endsWith("`")) {
+            return name;
+        }
+        return "`" + name + "`";
+    }
+
+    /** Return a fully-qualified {@code `db`.`table`} identifier string. */
+    public static String quoteTableIdentifier(String db, String table) {
+        return identifier(db) + "." + identifier(table);
+    }
+
+    /**
+     * Format a default value (already a plain Java string, not a raw SQL 
expression) into a form
+     * suitable for a Doris {@code DEFAULT} clause.
+     *
+     * <p>The caller is expected to pass a <em>deserialized</em> value — e.g. 
obtained from the
+     * Kafka Connect schema via {@code 
field.schema().defaultValue().toString()} — rather than a raw
+     * PG SQL expression. This avoids having to strip PG-specific type casts 
({@code ::text}, etc.).
+     *
+     * <ul>
+     *   <li>SQL keywords ({@code NULL}, {@code CURRENT_TIMESTAMP}, {@code 
TRUE}, {@code FALSE}) are
+     *       returned as-is.
+     *   <li>Numeric literals are returned as-is (no quotes).
+     *   <li>Everything else is wrapped in single quotes.
+     * </ul>
+     */
+    public static String quoteDefaultValue(String defaultValue) {
+        if (defaultValue == null) {
+            return null;
+        }
+        if (defaultValue.equalsIgnoreCase("current_timestamp")
+                || defaultValue.equalsIgnoreCase("null")
+                || defaultValue.equalsIgnoreCase("true")
+                || defaultValue.equalsIgnoreCase("false")) {
+            return defaultValue;
+        }
+        try {
+            Double.parseDouble(defaultValue);
+            return defaultValue;
+        } catch (NumberFormatException ignored) {
+            // fall through
+        }
+        return "'" + defaultValue + "'";
+    }
+
+    /** Escape single quotes inside a COMMENT string. */
+    public static String quoteComment(String comment) {
+        if (comment == null) {
+            return "";
+        }
+        return comment.replaceAll("'", "\\\\'");
+    }
+
+    // ─── DDL builders 
─────────────────────────────────────────────────────────
+
+    /**
+     * Build {@code ALTER TABLE ... ADD COLUMN} SQL.
+     *
+     * @param db target database
+     * @param table target table
+     * @param colName column name
+     * @param colType Doris column type string (including optional NOT NULL)
+     * @param defaultValue optional DEFAULT value; {@code null} = omit DEFAULT 
clause
+     * @param comment optional COMMENT; {@code null}/empty = omit COMMENT 
clause
+     */
+    public static String buildAddColumnSql(
+            String db,
+            String table,
+            String colName,
+            String colType,
+            String defaultValue,
+            String comment) {
+        StringBuilder sb =
+                new StringBuilder(
+                        String.format(
+                                ADD_DDL,
+                                quoteTableIdentifier(db, table),
+                                identifier(colName),
+                                colType));
+        if (defaultValue != null) {
+            sb.append(" DEFAULT ").append(quoteDefaultValue(defaultValue));
+        }
+        appendComment(sb, comment);
+        return sb.toString();
+    }
+
+    /** Build {@code ALTER TABLE ... DROP COLUMN} SQL. */
+    public static String buildDropColumnSql(String db, String table, String 
colName) {
+        return String.format(DROP_DDL, quoteTableIdentifier(db, table), 
identifier(colName));
+    }
+
+    // ─── Type mapping 
─────────────────────────────────────────────────────────
+
+    /** Convert a Debezium Column to a Doris column type string (via PG type 
name). */
+    public static String columnToDorisType(Column column) {
+        return pgTypeNameToDorisType(column.typeName(), column.length(), 
column.scale().orElse(-1));
+    }
+
+    /** Map a PostgreSQL native type name to a Doris type string. */
+    static String pgTypeNameToDorisType(String pgTypeName, int length, int 
scale) {
+        Preconditions.checkNotNull(pgTypeName);
+        switch (pgTypeName.toLowerCase()) {
+            case "bool":
+                return DorisType.BOOLEAN;
+            case "bit":
+                return length == 1 ? DorisType.BOOLEAN : DorisType.STRING;
+            case "int2":
+            case "smallserial":
+                return DorisType.SMALLINT;
+            case "int4":
+            case "serial":
+                return DorisType.INT;
+            case "int8":
+            case "bigserial":
+                return DorisType.BIGINT;
+            case "float4":
+                return DorisType.FLOAT;
+            case "float8":
+                return DorisType.DOUBLE;
+            case "numeric":
+                {
+                    int p = length > 0 ? Math.min(length, 38) : 38;
+                    int s = scale >= 0 ? scale : 9;
+                    return String.format("%s(%d, %d)", DorisType.DECIMAL, p, 
s);
+                }
+            case "bpchar":
+                {
+                    int len = length * 3;
+                    if (len > 255) {
+                        return String.format("%s(%s)", DorisType.VARCHAR, len);
+                    } else {
+                        return String.format("%s(%s)", DorisType.CHAR, len);
+                    }
+                }

Review Comment:
   If `length` is 0 or negative (which can happen when upstream metadata is 
missing/unknown), this produces `CHAR(0)`/`CHAR(-3)` which is invalid DDL. 
Guard for `length <= 0` and fall back to a safe default (e.g., `STRING` or a 
capped VARCHAR) before multiplying.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to