Copilot commented on code in PR #61182: URL: https://github.com/apache/doris/pull/61182#discussion_r2910697989
########## regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_sc_advanced.groovy: ########## @@ -0,0 +1,344 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.awaitility.Awaitility + +import static java.util.concurrent.TimeUnit.SECONDS + +/** + * Advanced schema-change regression for the PostgreSQL CDC streaming job. + * + * Key differences from the basic schema-change test: + * - Uses offset=latest (incremental-only, no snapshot) to cover the code path where + * tableSchemas are discovered from PG JDBC rather than derived from snapshot splits. + * This exercises the feHadNoSchema=true branch in PipelineCoordinator. + * + * Covers uncommon scenarios: + * 1. Simultaneous double ADD – two columns added in PG before any DML triggers detection; + * both ALTER TABLEs are generated and executed in a single detection event. + * 2. DROP + ADD simultaneously (rename guard) – dropping one column while adding another + * is treated as a potential rename; no DDL is emitted but the cached schema is updated. + * 3. 
UPDATE on existing rows after rename guard – verifies that a row whose old column (c1) + * was dropped in PG gets c1=NULL in Doris after the next UPDATE (stream load replaces the + * whole row without c1 since PG no longer has it). + * 4. ADD COLUMN with DEFAULT value – verifies that the DEFAULT clause is passed through to + * Doris and that pre-existing rows automatically receive the default value after the DDL. + * 5. ADD COLUMN NOT NULL with DEFAULT – verifies the NOT NULL path in SchemaChangeHelper + * (col.isOptional()=false → appends NOT NULL) and that Doris accepts the DDL when a + * DEFAULT is present (satisfying the NOT NULL constraint for existing rows). + */ +suite("test_streaming_postgres_job_sc_advanced", + "p0,external,pg,external_docker,external_docker_pg,nondatalake") { + + def jobName = "test_streaming_pg_sc_advanced" + def currentDb = (sql "select database()")[0][0] + def table1 = "user_info_pg_normal1_sc_adv" + def pgDB = "postgres" + def pgSchema = "cdc_test" + def pgUser = "postgres" + def pgPassword = "123456" + + sql """DROP JOB IF EXISTS where jobname = '${jobName}'""" + sql """drop table if exists ${currentDb}.${table1} force""" + + String enabled = context.config.otherConfigs.get("enableJdbcTest") + if (enabled != null && enabled.equalsIgnoreCase("true")) { + String pg_port = context.config.otherConfigs.get("pg_14_port") + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + String s3_endpoint = getS3Endpoint() + String bucket = getS3BucketName() + String driver_url = "https://${bucket}.${s3_endpoint}/regression/jdbc_driver/postgresql-42.5.0.jar" + + // ── helpers ─────────────────────────────────────────────────────────── + + def waitForRow = { String rowName -> + Awaitility.await().atMost(120, SECONDS).pollInterval(2, SECONDS).until({ + (sql "SELECT COUNT(*) FROM ${table1} WHERE name='${rowName}'" + )[0][0] as int > 0 + }) + } + + def waitForRowGone = { String rowName -> + Awaitility.await().atMost(120, 
SECONDS).pollInterval(2, SECONDS).until({ + (sql "SELECT COUNT(*) FROM ${table1} WHERE name='${rowName}'" + )[0][0] as int == 0 + }) + } + + def waitForColumn = { String colName, boolean shouldExist -> + Awaitility.await().atMost(120, SECONDS).pollInterval(2, SECONDS).until({ + def desc = sql "DESC ${table1}" + desc.any { it[0] == colName } == shouldExist + }) + } + + // Comparison is done as strings to avoid JDBC numeric type mismatches. + def waitForValue = { String rowName, String colName, Object expected -> + Awaitility.await().atMost(120, SECONDS).pollInterval(2, SECONDS).until({ + def rows = sql "SELECT ${colName} FROM ${table1} WHERE name='${rowName}'" + rows.size() == 1 && String.valueOf(rows[0][0]) == String.valueOf(expected) + }) + } + + def dumpJobState = { + log.info("jobs : " + sql("""select * from jobs("type"="insert") where Name='${jobName}'""")) + log.info("tasks : " + sql("""select * from tasks("type"="insert") where JobName='${jobName}'""")) + } + + // ── 0. Pre-create PG table with existing rows ───────────────────────── + // A1, B1 are inserted BEFORE the job starts with offset=latest. + // They will NOT appear in Doris (no snapshot taken). + connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") { + sql """DROP TABLE IF EXISTS ${pgSchema}.${table1}""" + sql """CREATE TABLE ${pgSchema}.${table1} ( + name VARCHAR(200) PRIMARY KEY, + age INT4 + )""" + sql """INSERT INTO ${pgSchema}.${table1} VALUES ('A1', 10)""" + sql """INSERT INTO ${pgSchema}.${table1} VALUES ('B1', 20)""" + } + + // ── 1. Start streaming job with offset=latest ───────────────────────── + // The Doris table is auto-created from the PG schema at job creation time. + // Streaming begins from the current WAL LSN — A1 and B1 are not captured. 
+ sql """CREATE JOB ${jobName} + ON STREAMING + FROM POSTGRES ( + "jdbc_url" = "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}", + "driver_url" = "${driver_url}", + "driver_class" = "org.postgresql.Driver", + "user" = "${pgUser}", + "password" = "${pgPassword}", + "database" = "${pgDB}", + "schema" = "${pgSchema}", + "include_tables" = "${table1}", + "offset" = "latest" + ) + TO DATABASE ${currentDb} ( + "table.create.properties.replication_num" = "1" + )""" + + assert (sql "SHOW TABLES FROM ${currentDb} LIKE '${table1}'").size() == 1 + + // Wait for job to enter RUNNING state (streaming split established). + try { + Awaitility.await().atMost(120, SECONDS).pollInterval(1, SECONDS).until({ + def rows = sql """select Status from jobs("type"="insert") + where Name='${jobName}' and ExecuteType='STREAMING'""" + rows.size() == 1 && rows[0][0] == "RUNNING" + }) + } catch (Exception ex) { + dumpJobState() + throw ex + } + + // Baseline: insert C1 to verify streaming is active. + connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") { + sql """INSERT INTO ${pgSchema}.${table1} VALUES ('C1', 30)""" + } + + try { + waitForRow('C1') + } catch (Exception ex) { + dumpJobState() + throw ex + } + + // A1, B1 must NOT be present (offset=latest, no snapshot). + assert (sql "SELECT COUNT(*) FROM ${table1} WHERE name='A1'")[0][0] as int == 0 \ + : "A1 must not be present (offset=latest)" + assert (sql "SELECT COUNT(*) FROM ${table1} WHERE name='B1'")[0][0] as int == 0 \ + : "B1 must not be present (offset=latest)" + + // Only C1(30) should be in Doris. + qt_baseline """ SELECT name, age FROM ${table1} ORDER BY name """ + + // ── Phase 1: Simultaneous double ADD (c1 TEXT, c2 INT4) ────────────── + // Both ALTER TABLEs happen in PG before any DML triggers CDC detection. + // The single INSERT D1 triggers the detection, which fetches the fresh PG schema + // (already containing both c1 and c2), and generates two ADD COLUMN DDLs in one shot. 
+ connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") { + sql """ALTER TABLE ${pgSchema}.${table1} ADD COLUMN c1 TEXT""" + sql """ALTER TABLE ${pgSchema}.${table1} ADD COLUMN c2 INT4""" + sql """INSERT INTO ${pgSchema}.${table1} (name, age, c1, c2) VALUES ('D1', 40, 'hello', 42)""" + } + + try { + waitForColumn('c1', true) + waitForColumn('c2', true) + waitForRow('D1') + } catch (Exception ex) { + dumpJobState() + throw ex + } + + // DESC columns: Field(0), Type(1), Null(2), Key(3), Default(4), Extra(5) + def descAfterDoubleAdd = sql "DESC ${table1}" + assert descAfterDoubleAdd.find { it[0] == 'c1' }[1] == 'text' : "c1 must be added as text" + assert descAfterDoubleAdd.find { it[0] == 'c2' }[1] == 'int' : "c2 must be added as int" + + // Pre-double-ADD row C1 must have NULL for both new columns. + assert (sql "SELECT c1 FROM ${table1} WHERE name='C1'")[0][0] == null : "C1.c1 must be NULL" + assert (sql "SELECT c1 FROM ${table1} WHERE name='C1'")[0][0] == null : "C1.c2 must be NULL" Review Comment: The second assertion is intended to validate `C1.c2` but it queries `SELECT c1` again, so it never verifies `c2`. Update the query to select `c2` to actually test the intended behavior. ```suggestion assert (sql "SELECT c2 FROM ${table1} WHERE name='C1'")[0][0] == null : "C1.c2 must be NULL" ``` ########## fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/SchemaChangeHelper.java: ########## @@ -0,0 +1,288 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.cdcclient.utils; + +import org.apache.doris.cdcclient.common.DorisType; + +import org.apache.flink.util.Preconditions; +import org.apache.kafka.connect.data.Field; +import org.apache.kafka.connect.data.Schema; + +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +import io.debezium.relational.Column; +import io.debezium.relational.history.TableChanges; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Utility class for generating Doris ALTER TABLE SQL from schema diffs. */ +public class SchemaChangeHelper { + + private static final Logger LOG = LoggerFactory.getLogger(SchemaChangeHelper.class); + private static final String ADD_DDL = "ALTER TABLE %s ADD COLUMN %s %s"; + private static final String DROP_DDL = "ALTER TABLE %s DROP COLUMN %s"; + + private SchemaChangeHelper() {} + + // ─── Schema diff result ──────────────────────────────────────────────────── + + /** + * Holds the result of a full schema comparison between an after-schema and stored TableChange. + */ + public static class SchemaDiff { + /** Fields present in afterSchema but absent from stored. */ + public final List<Field> added; + + /** Column names present in stored but absent from afterSchema. */ + public final List<String> dropped; + + /** Same-named columns whose Doris type or default value differs. 
*/ + public final Map<String, Field> modified; + + public SchemaDiff(List<Field> added, List<String> dropped, Map<String, Field> modified) { + this.added = added; + this.dropped = dropped; + this.modified = modified; + } + + public boolean isEmpty() { + return added.isEmpty() && dropped.isEmpty() && modified.isEmpty(); + } + } + + // ─── Schema-diff helpers (Kafka Connect schema ↔ stored TableChange) ────── + + /** + * Name-only schema diff: compare field names in {@code afterSchema} against the stored {@link + * TableChanges.TableChange}, detecting added and dropped columns by name only. + * + * <p>Only support add and drop and not support modify and rename + * + * <p>When {@code stored} is null or empty, both lists are empty (no baseline to diff against). + */ + public static SchemaDiff diffSchemaByName(Schema afterSchema, TableChanges.TableChange stored) { + List<Field> added = new ArrayList<>(); + List<String> dropped = new ArrayList<>(); + + if (afterSchema == null || stored == null || stored.getTable() == null) { + return new SchemaDiff(added, dropped, new LinkedHashMap<>()); + } + + // Detect added: fields present in afterSchema but absent from stored + for (Field field : afterSchema.fields()) { + if (stored.getTable().columnWithName(field.name()) == null) { + added.add(field); + } + } + + // Detect dropped: columns present in stored but absent from afterSchema + for (Column col : stored.getTable().columns()) { + if (afterSchema.field(col.name()) == null) { + dropped.add(col.name()); + } + } + + return new SchemaDiff(added, dropped, new LinkedHashMap<>()); + } + + // ─── Quoting helpers ────────────────────────────────────────────────────── + + /** Wrap a name in backticks if not already quoted. */ + public static String identifier(String name) { + if (name.startsWith("`") && name.endsWith("`")) { + return name; + } + return "`" + name + "`"; + } + + /** Return a fully-qualified {@code `db`.`table`} identifier string. 
*/ + public static String quoteTableIdentifier(String db, String table) { + return identifier(db) + "." + identifier(table); + } + + /** + * Format a default value (already a plain Java string, not a raw SQL expression) into a form + * suitable for a Doris {@code DEFAULT} clause. + * + * <p>The caller is expected to pass a <em>deserialized</em> value — e.g. obtained from the + * Kafka Connect schema via {@code field.schema().defaultValue().toString()} — rather than a raw + * PG SQL expression. This avoids having to strip PG-specific type casts ({@code ::text}, etc.). + * + * <ul> + * <li>SQL keywords ({@code NULL}, {@code CURRENT_TIMESTAMP}, {@code TRUE}, {@code FALSE}) are + * returned as-is. + * <li>Numeric literals are returned as-is (no quotes). + * <li>Everything else is wrapped in single quotes. + * </ul> + */ + public static String quoteDefaultValue(String defaultValue) { + if (defaultValue == null) { + return null; + } + if (defaultValue.equalsIgnoreCase("current_timestamp") + || defaultValue.equalsIgnoreCase("null") + || defaultValue.equalsIgnoreCase("true") + || defaultValue.equalsIgnoreCase("false")) { + return defaultValue; + } + try { + Double.parseDouble(defaultValue); + return defaultValue; + } catch (NumberFormatException ignored) { + // fall through + } + return "'" + defaultValue + "'"; Review Comment: String default values are wrapped in single quotes without escaping embedded quotes. A default like `O'Reilly` will generate invalid SQL (and opens an injection surface if defaults can be influenced). Escape single quotes in `defaultValue` for SQL literals (e.g., replace `'` with `''`) before quoting. 
```suggestion // Escape single quotes inside the string literal for SQL (e.g., O'Reilly -> 'O''Reilly') return "'" + defaultValue.replace("'", "''") + "'"; ``` ########## fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/MySqlDebeziumJsonDeserializer.java: ########## @@ -0,0 +1,66 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.cdcclient.source.deserialize; + +import org.apache.doris.job.cdc.DataSourceConfigKeys; + +import org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils; +import org.apache.flink.cdc.debezium.history.FlinkJsonTableChangeSerializer; +import org.apache.kafka.connect.source.SourceRecord; + +import java.io.IOException; +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * MySQL-specific deserializer that handles DDL schema change events. + * + * <p>When a schema change event is detected, it parses the HistoryRecord, computes the diff against + * stored tableSchemas, generates Doris ALTER TABLE SQL, and returns a SCHEMA_CHANGE result. 
+ */ +public class MySqlDebeziumJsonDeserializer extends DebeziumJsonDeserializer { + private static final long serialVersionUID = 1L; + private static final Logger LOG = LoggerFactory.getLogger(MySqlDebeziumJsonDeserializer.class); + private static final FlinkJsonTableChangeSerializer TABLE_CHANGE_SERIALIZER = + new FlinkJsonTableChangeSerializer(); + + private String targetDb; + + @Override + public void init(Map<String, String> props) { + super.init(props); + this.targetDb = props.get(DataSourceConfigKeys.DATABASE); + } + + @Override + public DeserializeResult deserialize(Map<String, String> context, SourceRecord record) + throws IOException { + if (RecordUtils.isSchemaChangeEvent(record)) { + return handleSchemaChangeEvent(record, context); + } + return super.deserialize(context, record); + } + + private DeserializeResult handleSchemaChangeEvent( + SourceRecord record, Map<String, String> context) { + // todo: record has mysql ddl, need to convert doris ddl + return DeserializeResult.empty(); Review Comment: This method currently drops MySQL schema change events on the floor. With MySQL schema changes enabled in the reader, this TODO becomes a runtime behavior (schema events silently ignored). Either implement conversion to Doris DDL (return `DeserializeResult.schemaChange(...)`) or explicitly disable schema change events until supported. ```suggestion // Currently, MySQL schema change (DDL) events are not supported. // Failing fast here prevents silently dropping schema changes when they are enabled // in the upstream MySQL CDC source (e.g., include.schema.changes = true). LOG.error( "Received unsupported MySQL schema change event. " + "Disable schema change events in the MySQL CDC source configuration " + "(for example, set 'include.schema.changes' to false). " + "Record: {}", record); throw new UnsupportedOperationException( "MySQL schema change events are not supported by MySqlDebeziumJsonDeserializer. 
" + "Disable schema change events in the MySQL CDC source configuration."); ``` ########## fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/HttpUtil.java: ########## @@ -52,4 +52,8 @@ protected boolean isRedirectable(String method) { .addInterceptorLast(new RequestContent(true)) .build(); } + + public static String getAuthHeader() { + return "Basic YWRtaW46"; + } Review Comment: This hardcodes a Basic auth header (`admin:`) in code. If this header is required, it should be derived from configured credentials (or removed if token auth is sufficient) to avoid embedding credentials and to support non-default deployments. ########## fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java: ########## @@ -778,7 +775,7 @@ private MySqlSourceConfig generateMySqlConfig(Map<String, String> cdcConfig, Str configFactory.serverTimeZone( ConfigUtil.getTimeZoneFromProps(cu.getOriginalProperties()).toString()); - configFactory.includeSchemaChanges(false); + configFactory.includeSchemaChanges(true); Review Comment: Schema change events are now enabled for MySQL, but the MySQL schema-change handling path is currently a stub (see `MySqlDebeziumJsonDeserializer#handleSchemaChangeEvent` returning empty). If MySQL DDL support is not ready, keep `includeSchemaChanges(false)` to avoid emitting and processing extra non-DML records (overhead + confusing behavior). ```suggestion // Disable schema change events because MySqlDebeziumJsonDeserializer does not handle them yet. configFactory.includeSchemaChanges(false); ``` ########## fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/SchemaChangeHelper.java: ########## @@ -0,0 +1,288 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.cdcclient.utils; + +import org.apache.doris.cdcclient.common.DorisType; + +import org.apache.flink.util.Preconditions; +import org.apache.kafka.connect.data.Field; +import org.apache.kafka.connect.data.Schema; + +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +import io.debezium.relational.Column; +import io.debezium.relational.history.TableChanges; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Utility class for generating Doris ALTER TABLE SQL from schema diffs. */ +public class SchemaChangeHelper { + + private static final Logger LOG = LoggerFactory.getLogger(SchemaChangeHelper.class); + private static final String ADD_DDL = "ALTER TABLE %s ADD COLUMN %s %s"; + private static final String DROP_DDL = "ALTER TABLE %s DROP COLUMN %s"; + + private SchemaChangeHelper() {} + + // ─── Schema diff result ──────────────────────────────────────────────────── + + /** + * Holds the result of a full schema comparison between an after-schema and stored TableChange. + */ + public static class SchemaDiff { + /** Fields present in afterSchema but absent from stored. */ + public final List<Field> added; + + /** Column names present in stored but absent from afterSchema. */ + public final List<String> dropped; + + /** Same-named columns whose Doris type or default value differs. 
*/ + public final Map<String, Field> modified; + + public SchemaDiff(List<Field> added, List<String> dropped, Map<String, Field> modified) { + this.added = added; + this.dropped = dropped; + this.modified = modified; + } + + public boolean isEmpty() { + return added.isEmpty() && dropped.isEmpty() && modified.isEmpty(); + } + } + + // ─── Schema-diff helpers (Kafka Connect schema ↔ stored TableChange) ────── + + /** + * Name-only schema diff: compare field names in {@code afterSchema} against the stored {@link + * TableChanges.TableChange}, detecting added and dropped columns by name only. + * + * <p>Only support add and drop and not support modify and rename + * + * <p>When {@code stored} is null or empty, both lists are empty (no baseline to diff against). + */ + public static SchemaDiff diffSchemaByName(Schema afterSchema, TableChanges.TableChange stored) { + List<Field> added = new ArrayList<>(); + List<String> dropped = new ArrayList<>(); + + if (afterSchema == null || stored == null || stored.getTable() == null) { + return new SchemaDiff(added, dropped, new LinkedHashMap<>()); + } + + // Detect added: fields present in afterSchema but absent from stored + for (Field field : afterSchema.fields()) { + if (stored.getTable().columnWithName(field.name()) == null) { + added.add(field); + } + } + + // Detect dropped: columns present in stored but absent from afterSchema + for (Column col : stored.getTable().columns()) { + if (afterSchema.field(col.name()) == null) { + dropped.add(col.name()); + } + } + + return new SchemaDiff(added, dropped, new LinkedHashMap<>()); + } + + // ─── Quoting helpers ────────────────────────────────────────────────────── + + /** Wrap a name in backticks if not already quoted. */ + public static String identifier(String name) { + if (name.startsWith("`") && name.endsWith("`")) { + return name; + } + return "`" + name + "`"; + } + + /** Return a fully-qualified {@code `db`.`table`} identifier string. 
*/ + public static String quoteTableIdentifier(String db, String table) { + return identifier(db) + "." + identifier(table); + } + + /** + * Format a default value (already a plain Java string, not a raw SQL expression) into a form + * suitable for a Doris {@code DEFAULT} clause. + * + * <p>The caller is expected to pass a <em>deserialized</em> value — e.g. obtained from the + * Kafka Connect schema via {@code field.schema().defaultValue().toString()} — rather than a raw + * PG SQL expression. This avoids having to strip PG-specific type casts ({@code ::text}, etc.). + * + * <ul> + * <li>SQL keywords ({@code NULL}, {@code CURRENT_TIMESTAMP}, {@code TRUE}, {@code FALSE}) are + * returned as-is. + * <li>Numeric literals are returned as-is (no quotes). + * <li>Everything else is wrapped in single quotes. + * </ul> + */ + public static String quoteDefaultValue(String defaultValue) { + if (defaultValue == null) { + return null; + } + if (defaultValue.equalsIgnoreCase("current_timestamp") + || defaultValue.equalsIgnoreCase("null") + || defaultValue.equalsIgnoreCase("true") + || defaultValue.equalsIgnoreCase("false")) { + return defaultValue; + } + try { + Double.parseDouble(defaultValue); + return defaultValue; + } catch (NumberFormatException ignored) { + // fall through + } + return "'" + defaultValue + "'"; + } + + /** Escape single quotes inside a COMMENT string. */ + public static String quoteComment(String comment) { + if (comment == null) { + return ""; + } + return comment.replaceAll("'", "\\\\'"); Review Comment: Escaping single quotes in comment strings with a backslash (`\'`, which is what this `replaceAll` call emits) is not standard SQL string literal escaping and may be rejected or interpreted differently by Doris. Prefer SQL-standard escaping by doubling quotes (`'` → `''`). Also, `replaceAll` is regex-based; `replace` is safer and faster for literal replacement. 
```suggestion // Use SQL-standard escaping: single quote -> two single quotes, and avoid regex-based replaceAll return comment.replace("'", "''"); ``` ########## fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java: ########## @@ -84,6 +86,9 @@ public class PostgresSourceReader extends JdbcIncrementalSourceReader { public PostgresSourceReader() { super(); + this.setSerializer( + new org.apache.doris.cdcclient.source.deserialize + .PostgresDebeziumJsonDeserializer()); } Review Comment: The class is already imported (`PostgresDebeziumJsonDeserializer`), but the instantiation uses a fully-qualified name split across lines. Prefer using the imported type directly to improve readability and reduce unnecessary verbosity. ```suggestion this.setSerializer(new PostgresDebeziumJsonDeserializer()); } ``` ########## fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java: ########## @@ -309,15 +325,22 @@ public void writeRecords(WriteRecordRequest writeRecordRequest) throws Exception } // Process data messages - List<String> serializedRecords = - sourceReader.deserialize(writeRecordRequest.getConfig(), element); - - if (!CollectionUtils.isEmpty(serializedRecords)) { - String database = writeRecordRequest.getTargetDb(); + DeserializeResult result = + sourceReader.deserialize(deserializeContext, element); + + if (result.getType() == DeserializeResult.Type.SCHEMA_CHANGE) { + // Flush pending data before DDL + batchStreamLoad.forceFlush(); + SchemaChangeManager.executeDdls(feAddr, targetDb, token, result.getDdls()); + hasExecuteDDL = true; Review Comment: For rename-guard cases, `result.getDdls()` is intentionally empty. Calling `executeDdls` will log `No DDL statements to execute` as WARN, which can become noisy and look like an operational issue. 
Consider short-circuiting here when `result.getDdls()` is null/empty, or downgrade that log in `executeDdls` to INFO/DEBUG for empty lists. ```suggestion if (!CollectionUtils.isEmpty(result.getDdls())) { SchemaChangeManager.executeDdls(feAddr, targetDb, token, result.getDdls()); hasExecuteDDL = true; } ``` ########## fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/SchemaChangeManager.java: ########## @@ -0,0 +1,149 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.cdcclient.utils; + +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.entity.StringEntity; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.util.EntityUtils; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Static utility class for executing DDL schema changes on the Doris FE via HTTP. 
*/ +public class SchemaChangeManager { + + private static final Logger LOG = LoggerFactory.getLogger(SchemaChangeManager.class); + private static final String SCHEMA_CHANGE_API = "http://%s/api/query/default_cluster/%s"; + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static final String COLUMN_EXISTS_MSG = "Can not add column which already exists"; + private static final String COLUMN_NOT_EXISTS_MSG = "Column does not exists"; Review Comment: Idempotency relies on substring matching of FE error messages. `COLUMN_NOT_EXISTS_MSG` uses the grammatically incorrect `does not exists`, which is unlikely to match typical server messages (`does not exist`) and would cause DROP COLUMN retries to fail instead of being swallowed. Consider matching both variants or using a more robust predicate (e.g., case-insensitive contains for `column` + `does not exist`). ```suggestion private static final String COLUMN_NOT_EXISTS_MSG = "Column does not exist"; ``` ########## fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/SchemaChangeHelper.java: ########## @@ -0,0 +1,288 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.cdcclient.utils; + +import org.apache.doris.cdcclient.common.DorisType; + +import org.apache.flink.util.Preconditions; +import org.apache.kafka.connect.data.Field; +import org.apache.kafka.connect.data.Schema; + +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +import io.debezium.relational.Column; +import io.debezium.relational.history.TableChanges; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Utility class for generating Doris ALTER TABLE SQL from schema diffs. */ +public class SchemaChangeHelper { + + private static final Logger LOG = LoggerFactory.getLogger(SchemaChangeHelper.class); + private static final String ADD_DDL = "ALTER TABLE %s ADD COLUMN %s %s"; + private static final String DROP_DDL = "ALTER TABLE %s DROP COLUMN %s"; + + private SchemaChangeHelper() {} + + // ─── Schema diff result ──────────────────────────────────────────────────── + + /** + * Holds the result of a full schema comparison between an after-schema and stored TableChange. + */ + public static class SchemaDiff { + /** Fields present in afterSchema but absent from stored. */ + public final List<Field> added; + + /** Column names present in stored but absent from afterSchema. */ + public final List<String> dropped; + + /** Same-named columns whose Doris type or default value differs. */ + public final Map<String, Field> modified; + + public SchemaDiff(List<Field> added, List<String> dropped, Map<String, Field> modified) { + this.added = added; + this.dropped = dropped; + this.modified = modified; + } + + public boolean isEmpty() { + return added.isEmpty() && dropped.isEmpty() && modified.isEmpty(); + } + } + + // ─── Schema-diff helpers (Kafka Connect schema ↔ stored TableChange) ────── + + /** + * Name-only schema diff: compare field names in {@code afterSchema} against the stored {@link + * TableChanges.TableChange}, detecting added and dropped columns by name only. 
+ * + * <p>Only support add and drop and not support modify and rename + * + * <p>When {@code stored} is null or empty, both lists are empty (no baseline to diff against). + */ + public static SchemaDiff diffSchemaByName(Schema afterSchema, TableChanges.TableChange stored) { + List<Field> added = new ArrayList<>(); + List<String> dropped = new ArrayList<>(); + + if (afterSchema == null || stored == null || stored.getTable() == null) { + return new SchemaDiff(added, dropped, new LinkedHashMap<>()); + } + + // Detect added: fields present in afterSchema but absent from stored + for (Field field : afterSchema.fields()) { + if (stored.getTable().columnWithName(field.name()) == null) { + added.add(field); + } + } + + // Detect dropped: columns present in stored but absent from afterSchema + for (Column col : stored.getTable().columns()) { + if (afterSchema.field(col.name()) == null) { + dropped.add(col.name()); + } + } + + return new SchemaDiff(added, dropped, new LinkedHashMap<>()); + } + + // ─── Quoting helpers ────────────────────────────────────────────────────── + + /** Wrap a name in backticks if not already quoted. */ + public static String identifier(String name) { + if (name.startsWith("`") && name.endsWith("`")) { + return name; + } + return "`" + name + "`"; + } + + /** Return a fully-qualified {@code `db`.`table`} identifier string. */ + public static String quoteTableIdentifier(String db, String table) { + return identifier(db) + "." + identifier(table); + } + + /** + * Format a default value (already a plain Java string, not a raw SQL expression) into a form + * suitable for a Doris {@code DEFAULT} clause. + * + * <p>The caller is expected to pass a <em>deserialized</em> value — e.g. obtained from the + * Kafka Connect schema via {@code field.schema().defaultValue().toString()} — rather than a raw + * PG SQL expression. This avoids having to strip PG-specific type casts ({@code ::text}, etc.). 
+ * + * <ul> + * <li>SQL keywords ({@code NULL}, {@code CURRENT_TIMESTAMP}, {@code TRUE}, {@code FALSE}) are + * returned as-is. + * <li>Numeric literals are returned as-is (no quotes). + * <li>Everything else is wrapped in single quotes. + * </ul> + */ + public static String quoteDefaultValue(String defaultValue) { + if (defaultValue == null) { + return null; + } + if (defaultValue.equalsIgnoreCase("current_timestamp") + || defaultValue.equalsIgnoreCase("null") + || defaultValue.equalsIgnoreCase("true") + || defaultValue.equalsIgnoreCase("false")) { + return defaultValue; + } + try { + Double.parseDouble(defaultValue); + return defaultValue; + } catch (NumberFormatException ignored) { + // fall through + } + return "'" + defaultValue + "'"; + } + + /** Escape single quotes inside a COMMENT string. */ + public static String quoteComment(String comment) { + if (comment == null) { + return ""; + } + return comment.replaceAll("'", "\\\\'"); + } + + // ─── DDL builders ───────────────────────────────────────────────────────── + + /** + * Build {@code ALTER TABLE ... ADD COLUMN} SQL. + * + * @param db target database + * @param table target table + * @param colName column name + * @param colType Doris column type string (including optional NOT NULL) + * @param defaultValue optional DEFAULT value; {@code null} = omit DEFAULT clause + * @param comment optional COMMENT; {@code null}/empty = omit COMMENT clause + */ + public static String buildAddColumnSql( + String db, + String table, + String colName, + String colType, + String defaultValue, + String comment) { + StringBuilder sb = + new StringBuilder( + String.format( + ADD_DDL, + quoteTableIdentifier(db, table), + identifier(colName), + colType)); + if (defaultValue != null) { + sb.append(" DEFAULT ").append(quoteDefaultValue(defaultValue)); + } + appendComment(sb, comment); + return sb.toString(); + } + + /** Build {@code ALTER TABLE ... DROP COLUMN} SQL. 
*/ + public static String buildDropColumnSql(String db, String table, String colName) { + return String.format(DROP_DDL, quoteTableIdentifier(db, table), identifier(colName)); + } + + // ─── Type mapping ───────────────────────────────────────────────────────── + + /** Convert a Debezium Column to a Doris column type string (via PG type name). */ + public static String columnToDorisType(Column column) { + return pgTypeNameToDorisType(column.typeName(), column.length(), column.scale().orElse(-1)); + } + + /** Map a PostgreSQL native type name to a Doris type string. */ + static String pgTypeNameToDorisType(String pgTypeName, int length, int scale) { + Preconditions.checkNotNull(pgTypeName); + switch (pgTypeName.toLowerCase()) { + case "bool": + return DorisType.BOOLEAN; + case "bit": + return length == 1 ? DorisType.BOOLEAN : DorisType.STRING; + case "int2": + case "smallserial": + return DorisType.SMALLINT; + case "int4": + case "serial": + return DorisType.INT; + case "int8": + case "bigserial": + return DorisType.BIGINT; + case "float4": + return DorisType.FLOAT; + case "float8": + return DorisType.DOUBLE; + case "numeric": + { + int p = length > 0 ? Math.min(length, 38) : 38; + int s = scale >= 0 ? scale : 9; + return String.format("%s(%d, %d)", DorisType.DECIMAL, p, s); + } + case "bpchar": + { + int len = length * 3; + if (len > 255) { + return String.format("%s(%s)", DorisType.VARCHAR, len); + } else { + return String.format("%s(%s)", DorisType.CHAR, len); + } + } Review Comment: If `length` is 0 or negative (which can happen when upstream metadata is missing/unknown), this produces `CHAR(0)`/`CHAR(-3)` which is invalid DDL. Guard for `length <= 0` and fall back to a safe default (e.g., `STRING` or a capped VARCHAR) before multiplying. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
