This is an automated email from the ASF dual-hosted git repository.

JNSimba pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 0a1a72ec2ac [fix](streaming-job) cdc client MySQL TIME range fix with 
type-consistency and GTID guards (#64741)
0a1a72ec2ac is described below

commit 0a1a72ec2acc8e8e70f0425bad4fc0a9ed8637c5
Author: wudi <[email protected]>
AuthorDate: Thu Jun 25 19:14:36 2026 +0800

    [fix](streaming-job) cdc client MySQL TIME range fix with type-consistency 
and GTID guards (#64741)
    
    ## Proposed changes
    
    - `DebeziumJsonDeserializer.convertToTime`: negative and `>=24h` MySQL
    TIME values now format as `±HH:MM:SS[.ffffff]` instead of falling back
    to the raw long literal. Only the MySQL TIME path changes; in-range
    values and PostgreSQL `time` are unaffected.
    - Adds snapshot-vs-binlog type-consistency ITCases (MySQL + PostgreSQL):
    assert the snapshot (JDBC) and binlog (decoding) paths deserialize every
    column identically; JSON is compared by parsed value to tolerate
    whitespace/key-order.
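    - Adds a GTID multi-interval guard unit test: pins that a GTID set with
    multiple disjoint intervals per server uuid survives parsing /
    serialization / offset-map round-trip without the gap being merged away.
    - `MySqlSourceReader`: a specific-offset startup now keeps the full offset
    map (so `gtids` and other fields survive alongside binlog file/position),
    and a GTID-only offset is accepted via `BinlogOffset.ofGtidSet`; covered
    end-to-end by `MySqlStartupGtidOffsetITCase`.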
---
 .../deserialize/DebeziumJsonDeserializer.java      |  50 ++++-
 .../source/reader/mysql/MySqlSourceReader.java     |  25 +--
 .../itcase/MySqlStartupGtidOffsetITCase.java       | 165 ++++++++++++++++
 .../cdcclient/itcase/MySqlTimeRangeITCase.java     | 149 ++++++++++++++
 .../itcase/MySqlTypeConsistencyITCase.java         | 214 +++++++++++++++++++++
 .../cdcclient/itcase/PostgresTimeRangeITCase.java  | 126 ++++++++++++
 .../itcase/PostgresTypeConsistencyITCase.java      | 206 ++++++++++++++++++++
 .../deserialize/DebeziumJsonDeserializerTest.java  | 104 ++++++++++
 .../reader/mysql/GtidMultiIntervalOffsetTest.java  |  95 +++++++++
 .../source/reader/mysql/MySqlSourceReaderTest.java |  55 ++++++
 10 files changed, 1173 insertions(+), 16 deletions(-)

diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/DebeziumJsonDeserializer.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/DebeziumJsonDeserializer.java
index 8487975f7d1..36edf1c5b2d 100644
--- 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/DebeziumJsonDeserializer.java
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/DebeziumJsonDeserializer.java
@@ -34,15 +34,18 @@ import java.io.IOException;
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.time.DateTimeException;
+import java.time.Duration;
 import java.time.Instant;
 import java.time.LocalDateTime;
 import java.time.LocalTime;
 import java.time.ZoneId;
+import java.time.temporal.ChronoUnit;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
@@ -412,19 +415,56 @@ public class DebeziumJsonDeserializer
         return dbzObj.toString();
     }
 
+    // Render a signed microsecond count (possibly negative, possibly 24h or more) as MySQL TIME
+    // literal text ±HH:MM:SS[.ffffff]. The fraction is omitted when it is zero; otherwise its
+    // trailing zeros are removed.
+    private static String formatTimeText(long microsTotal) {
+        Duration magnitude = Duration.of(Math.abs(microsTotal), ChronoUnit.MICROS);
+        // Hours are not wrapped at 24 (MySQL TIME reaches 838), so this field can be 3 digits;
+        // the minute and second parts are always below 60.
+        String clock =
+                String.format(
+                        Locale.ROOT,
+                        "%02d:%02d:%02d",
+                        magnitude.toHours(),
+                        magnitude.toMinutesPart(),
+                        magnitude.toSecondsPart());
+        String signed = (microsTotal < 0 ? "-" : "") + clock;
+        long fractionMicros = magnitude.toNanosPart() / 1000L;
+        if (fractionMicros <= 0) {
+            return signed;
+        }
+        // Pad to six digits so leading zeros are kept (5 -> "000005"), then trim trailing zeros
+        // (500000 -> "5").
+        String fraction =
+                StringUtils.stripEnd(String.format(Locale.ROOT, "%06d", fractionMicros), "0");
+        return signed + '.' + fraction;
+    }
+
     protected Object convertToTime(Object dbzObj, Schema schema) {
         try {
             if (dbzObj instanceof Long) {
+                long v = (Long) dbzObj;
                 switch (schema.name()) {
                     case MicroTime.SCHEMA_NAME:
-                        // micro to nano
-                        return LocalTime.ofNanoOfDay((Long) dbzObj * 
1000L).toString();
+                        // MySQL TIME spans [-838:59:59, 838:59:59]; 
out-of-range (negative or
+                        // >=24h) cannot use LocalTime, format as ±HH:MM:SS 
instead. micro to nano.
+                        if (v >= 0 && v < 86_400_000_000L) {
+                            return LocalTime.ofNanoOfDay(v * 1000L).toString();
+                        }
+                        return formatTimeText(v);
                     case NanoTime.SCHEMA_NAME:
-                        return LocalTime.ofNanoOfDay((Long) dbzObj).toString();
+                        if (v >= 0 && v < 86_400_000_000_000L) {
+                            return LocalTime.ofNanoOfDay(v).toString();
+                        }
+                        // out-of-range: nano to micro. MySQL/PG TIME carries 
at most microsecond
+                        // precision, so dropping the sub-micro nanos here is 
lossless for them.
+                        return formatTimeText(v / 1000L);
                 }
             } else if (dbzObj instanceof Integer) {
-                // millis to nano
-                return LocalTime.ofNanoOfDay((Integer) dbzObj * 
1_000_000L).toString();
+                // millis to nano; out-of-range formats as ±HH:MM:SS.
+                int v = (Integer) dbzObj;
+                if (v >= 0 && v < 86_400_000) {
+                    return LocalTime.ofNanoOfDay((long) v * 
1_000_000L).toString();
+                }
+                return formatTimeText((long) v * 1000L);
             } else if (dbzObj instanceof java.util.Date) {
                 long millisOfDay = ((Date) dbzObj).getTime() % (24 * 60 * 60 * 
1000);
                 // mills to nano
diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
index efea979a62c..6891f06ba42 100644
--- 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
@@ -957,20 +957,23 @@ public class MySqlSourceReader extends 
AbstractCdcSourceReader {
             if (MapUtils.isEmpty(offsetMap)) {
                 throw new RuntimeException("Incorrect offset " + startupMode);
             }
-            if (offsetMap.containsKey(BinlogOffset.BINLOG_FILENAME_OFFSET_KEY)
-                    && 
offsetMap.containsKey(BinlogOffset.BINLOG_POSITION_OFFSET_KEY)) {
-                BinlogOffset binlogOffset =
-                        BinlogOffset.builder()
-                                .setBinlogFilePosition(
-                                        
offsetMap.get(BinlogOffset.BINLOG_FILENAME_OFFSET_KEY),
-                                        Long.parseLong(
-                                                offsetMap.get(
-                                                        
BinlogOffset.BINLOG_POSITION_OFFSET_KEY)))
-                                .build();
-                
configFactory.startupOptions(StartupOptions.specificOffset(binlogOffset));
+            boolean hasFilePosition =
+                    
offsetMap.containsKey(BinlogOffset.BINLOG_FILENAME_OFFSET_KEY)
+                            && 
offsetMap.containsKey(BinlogOffset.BINLOG_POSITION_OFFSET_KEY);
+            boolean hasGtids = 
offsetMap.containsKey(BinlogOffset.GTID_SET_KEY);
+            BinlogOffset binlogOffset;
+            if (hasFilePosition) {
+                // Keep the full map so gtids and other fields survive; 
supplement kind.
+                offsetMap.putIfAbsent(
+                        BinlogOffset.OFFSET_KIND_KEY, 
BinlogOffsetKind.SPECIFIC.name());
+                binlogOffset = new BinlogOffset(offsetMap);
+            } else if (hasGtids) {
+                // ofGtidSet seeds placeholder file/pos and kind, like Flink 
CDC.
+                binlogOffset = 
BinlogOffset.ofGtidSet(offsetMap.get(BinlogOffset.GTID_SET_KEY));
             } else {
                 throw new RuntimeException("Incorrect offset " + startupMode);
             }
+            
configFactory.startupOptions(StartupOptions.specificOffset(binlogOffset));
         } else if (is13Timestamp(startupMode)) {
             // start from timestamp
             Long ts = Long.parseLong(startupMode);
diff --git 
a/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/itcase/MySqlStartupGtidOffsetITCase.java
 
b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/itcase/MySqlStartupGtidOffsetITCase.java
new file mode 100644
index 00000000000..cdadfb2561f
--- /dev/null
+++ 
b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/itcase/MySqlStartupGtidOffsetITCase.java
@@ -0,0 +1,165 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cdcclient.itcase;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.apache.doris.cdcclient.common.Env;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.MySQLContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.utility.DockerImageName;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * GTID counterpart of {@link MySqlStartupSpecificOffsetITCase}: given a recorded GTID set as the
+ * specific-offset (no binlog file/position), the job must replay from exactly that point forward —
+ * proving the GTID-only resume offset is honored end-to-end, not just preserved in config.
+ */
+@Testcontainers
+class MySqlStartupGtidOffsetITCase {
+
+    private static final String ROOT_USER = "root";
+    private static final String ROOT_PASSWORD = "123456";
+    private static final ObjectMapper MAPPER = new ObjectMapper();
+    // Source of per-test job ids; the id also names the test's private database.
+    private static final AtomicLong JOB_ID_SEQ = new AtomicLong(980_000);
+
+    // GTID mode is switched on so @@gtid_executed yields a set the job can resume from.
+    @Container
+    static final MySQLContainer<?> MYSQL =
+            new MySQLContainer<>(DockerImageName.parse("mysql:8.0"))
+                    .withCommand("--gtid-mode=ON", "--enforce-gtid-consistency=ON")
+                    .withDatabaseName("cdc_test")
+                    .withUsername("cdc")
+                    .withPassword("123456")
+                    .withEnv("MYSQL_ROOT_PASSWORD", ROOT_PASSWORD);
+
+    private String jobId;
+    private String database;
+
+    /** Creates a per-test database holding {@code t_user} with pre-existing rows 1 and 2. */
+    @BeforeEach
+    void setUp() throws Exception {
+        jobId = String.valueOf(JOB_ID_SEQ.incrementAndGet());
+        database = "gtidoff_db_" + jobId;
+        try (Connection conn = rootConnection("");
+                Statement st = conn.createStatement()) {
+            st.execute("CREATE DATABASE " + database);
+            st.execute("USE " + database);
+            st.execute("CREATE TABLE t_user (id INT PRIMARY KEY, name VARCHAR(50))");
+            st.execute("INSERT INTO t_user VALUES (1,'alice'), (2,'bob')");
+        }
+    }
+
+    /**
+     * Closes the job, then drops the per-test database. The drop runs in {@code finally} so a
+     * failing close does not also leave the database behind in the shared container.
+     */
+    @AfterEach
+    void tearDown() throws Exception {
+        try {
+            Env.getCurrentEnv().close(jobId);
+        } finally {
+            try (Connection conn = rootConnection("");
+                    Statement st = conn.createStatement()) {
+                st.execute("DROP DATABASE IF EXISTS " + database);
+            }
+        }
+    }
+
+    @Test
+    void specificGtidOffsetReplaysFromRecordedPositionForward() throws Exception {
+        // Record the GTID set right after the pre-existing rows; resume must start past it.
+        String offset = String.format("{\"gtids\":\"%s\"}", currentGtidExecuted());
+
+        // This change happens after the recorded GTID but before the job starts — it must be read.
+        insert("INSERT INTO t_user VALUES (3,'carol')");
+
+        try (MockDorisServer mock = new MockDorisServer();
+                CdcClientWriteHarness harness =
+                        CdcClientWriteHarness.mysql(
+                                jobId,
+                                MYSQL.getHost(),
+                                MYSQL.getMappedPort(MySQLContainer.MYSQL_PORT),
+                                ROOT_USER,
+                                ROOT_PASSWORD,
+                                database,
+                                "t_user",
+                                offset,
+                                "doris_target_db",
+                                mock)) {
+
+            // Resume positions by GTID, so row 3 (already in the binlog) is read straight away.
+            List<Integer> first =
+                    ids(harness.readBinlogFromStartupMode(1, Duration.ofSeconds(90)));
+            assertThat(first).containsExactly(3);
+
+            insert("INSERT INTO t_user VALUES (4,'dave')");
+            List<Integer> second = ids(harness.continueBinlog(1, Duration.ofSeconds(90)));
+            assertThat(second).containsExactly(4);
+
+            // Replays from the recorded GTID forward: rows 3 and 4 only, never 1 or 2.
+            List<Integer> all = ids(harness.loadedRecords());
+            assertThat(all).doesNotContain(1, 2);
+            assertThat(all).containsExactlyInAnyOrder(3, 4);
+        }
+    }
+
+    /**
+     * Reads {@code @@gtid_executed} as a single-line GTID set string.
+     *
+     * @throws IllegalStateException if no row is returned, or the set is null/empty — failing
+     *     fast here is clearer than building a {@code {"gtids":""}} offset from it
+     */
+    private String currentGtidExecuted() throws Exception {
+        try (Connection conn = rootConnection(database);
+                Statement st = conn.createStatement();
+                ResultSet rs = st.executeQuery("SELECT @@gtid_executed")) {
+            if (!rs.next()) {
+                throw new IllegalStateException("@@gtid_executed returned no row");
+            }
+            String raw = rs.getString(1);
+            // Single server uuid fits on one line; strip newlines defensively.
+            String gtids = raw == null ? "" : raw.replace("\n", "").trim();
+            if (gtids.isEmpty()) {
+                throw new IllegalStateException("@@gtid_executed is empty; is GTID mode enabled?");
+            }
+            return gtids;
+        }
+    }
+
+    /** Extracts the {@code id} field of each JSON record, preserving record order. */
+    private List<Integer> ids(List<String> records) throws Exception {
+        List<Integer> result = new ArrayList<>();
+        for (String record : records) {
+            JsonNode node = MAPPER.readTree(record);
+            result.add(node.get("id").asInt());
+        }
+        return result;
+    }
+
+    /** Executes one DML statement as root against the per-test database. */
+    private void insert(String sql) throws Exception {
+        try (Connection conn = rootConnection(database);
+                Statement st = conn.createStatement()) {
+            st.execute(sql);
+        }
+    }
+
+    /** Opens a root JDBC connection to {@code db} ({@code ""} for no default schema). */
+    private Connection rootConnection(String db) throws Exception {
+        String url =
+                "jdbc:mysql://"
+                        + MYSQL.getHost()
+                        + ":"
+                        + MYSQL.getMappedPort(MySQLContainer.MYSQL_PORT)
+                        + "/"
+                        + db;
+        return DriverManager.getConnection(url, ROOT_USER, ROOT_PASSWORD);
+    }
+}
diff --git 
a/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/itcase/MySqlTimeRangeITCase.java
 
b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/itcase/MySqlTimeRangeITCase.java
new file mode 100644
index 00000000000..f0d960ca410
--- /dev/null
+++ 
b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/itcase/MySqlTimeRangeITCase.java
@@ -0,0 +1,149 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cdcclient.itcase;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.apache.doris.cdcclient.common.Env;
+import org.apache.doris.cdcclient.itcase.CdcClientReadHarness.SnapshotResult;
+import org.apache.doris.job.cdc.split.SnapshotSplit;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.MySQLContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.utility.DockerImageName;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.Statement;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * End-to-end coverage of MySQL TIME full range [-838:59:59, 838:59:59] across 
the snapshot and
+ * binlog phases. Out-of-range values (negative or >=24h) must be emitted as 
±HH:MM:SS[.ffffff]
+ * text rather than the raw long literal they fell back to before the 
convertToTime fix.
+ */
+@Testcontainers
+class MySqlTimeRangeITCase {
+
+    private static final String ROOT_USER = "root";
+    private static final String ROOT_PASSWORD = "123456";
+    private static final ObjectMapper MAPPER = new ObjectMapper();
+    private static final AtomicLong JOB_ID_SEQ = new AtomicLong(810_000);
+
+    @Container
+    static final MySQLContainer<?> MYSQL =
+            new MySQLContainer<>(DockerImageName.parse("mysql:8.0"))
+                    .withDatabaseName("cdc_test")
+                    .withUsername("cdc")
+                    .withPassword("123456")
+                    .withEnv("MYSQL_ROOT_PASSWORD", ROOT_PASSWORD);
+
+    private String jobId;
+    private String database;
+
+    @BeforeEach
+    void setUp() throws Exception {
+        jobId = String.valueOf(JOB_ID_SEQ.incrementAndGet());
+        database = "time_db_" + jobId;
+        try (Connection conn = rootConnection("");
+                Statement st = conn.createStatement()) {
+            st.execute("CREATE DATABASE " + database);
+            st.execute("USE " + database);
+            st.execute(
+                    "CREATE TABLE t_time (id INT NOT NULL, t_col TIME(6), 
PRIMARY KEY (id))");
+            // snapshot rows: in-range, negative lower bound, positive upper 
bound.
+            // MySQL TIME bounds are exactly +/-838:59:59 (no fractional part 
allowed at the bound).
+            st.execute(
+                    "INSERT INTO t_time VALUES "
+                            + "(1,'12:34:56.123456'), (2,'-838:59:59'), 
(3,'838:59:59')");
+        }
+    }
+
+    @AfterEach
+    void tearDown() throws Exception {
+        Env.getCurrentEnv().close(jobId);
+        try (Connection conn = rootConnection("");
+                Statement st = conn.createStatement()) {
+            st.execute("DROP DATABASE IF EXISTS " + database);
+        }
+    }
+
+    @Test
+    void timeFullRangeInBothPhases() throws Exception {
+        try (CdcClientReadHarness harness =
+                CdcClientReadHarness.mysql(
+                        jobId,
+                        MYSQL.getHost(),
+                        MYSQL.getMappedPort(MySQLContainer.MYSQL_PORT),
+                        ROOT_USER,
+                        ROOT_PASSWORD,
+                        database,
+                        "t_time",
+                        "initial")) {
+
+            List<SnapshotSplit> splits = 
harness.fetchAllSnapshotSplits("t_time");
+            SnapshotResult snapshot = harness.readSnapshot(splits);
+            Map<Integer, JsonNode> snap = indexById(snapshot.records());
+
+            // snapshot phase: in-range keeps LocalTime text, out-of-range 
formats as ±HH:MM:SS
+            
assertThat(snap.get(1).get("t_col").asText()).isEqualTo("12:34:56.123456");
+            
assertThat(snap.get(2).get("t_col").asText()).isEqualTo("-838:59:59");
+            
assertThat(snap.get(3).get("t_col").asText()).isEqualTo("838:59:59");
+
+            // binlog phase: negative >24h, and midnight (in-range) stay 
correct
+            try (Connection conn = rootConnection(database);
+                    Statement st = conn.createStatement()) {
+                st.execute("INSERT INTO t_time VALUES 
(101,'-100:00:00.500000'), (102,'00:00:00')");
+            }
+            List<String> binlog = harness.readBinlogUntil(snapshot, splits, 2, 
Duration.ofSeconds(60));
+            Map<Integer, JsonNode> bin = indexById(binlog);
+            
assertThat(bin.get(101).get("t_col").asText()).isEqualTo("-100:00:00.5");
+            assertThat(bin.get(102).get("t_col").asText()).isEqualTo("00:00");
+        }
+    }
+
+    private Map<Integer, JsonNode> indexById(List<String> records) throws 
Exception {
+        Map<Integer, JsonNode> result = new HashMap<>();
+        for (String record : records) {
+            JsonNode node = MAPPER.readTree(record);
+            result.put(node.get("id").asInt(), node);
+        }
+        return result;
+    }
+
+    private Connection rootConnection(String db) throws Exception {
+        String url =
+                "jdbc:mysql://"
+                        + MYSQL.getHost()
+                        + ":"
+                        + MYSQL.getMappedPort(MySQLContainer.MYSQL_PORT)
+                        + "/"
+                        + db;
+        return DriverManager.getConnection(url, ROOT_USER, ROOT_PASSWORD);
+    }
+}
diff --git 
a/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/itcase/MySqlTypeConsistencyITCase.java
 
b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/itcase/MySqlTypeConsistencyITCase.java
new file mode 100644
index 00000000000..3279d5f5162
--- /dev/null
+++ 
b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/itcase/MySqlTypeConsistencyITCase.java
@@ -0,0 +1,214 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cdcclient.itcase;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.apache.doris.cdcclient.common.Env;
+import org.apache.doris.cdcclient.itcase.CdcClientReadHarness.SnapshotResult;
+import org.apache.doris.job.cdc.split.SnapshotSplit;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.MySQLContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.utility.DockerImageName;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.Statement;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Guards that the snapshot phase (JDBC) and the binlog phase (Debezium decoding) — two different
+ * converter paths — deserialize the same MySQL value identically. The identical value is inserted
+ * once before capture (snapshot, id 1) and once after (binlog, id 2); every column must match
+ * across phases, and neither phase may emit a column the other lacks. Any difference is a real bug
+ * (the MySQL TIME out-of-range issue was one instance).
+ *
+ * <p>JSON is compared by parsed value, not text: MySQL returns snapshot JSON with key reordering and
+ * spaces ({@code {"a": [2, 3], "k": 1}}) while the binlog path emits compact JSON
+ * ({@code {"a":[2,3],"k":1}}) — same value, different text, so a semantic comparison is used.
+ */
+@Testcontainers
+class MySqlTypeConsistencyITCase {
+
+    private static final String ROOT_USER = "root";
+    private static final String ROOT_PASSWORD = "123456";
+    private static final ObjectMapper MAPPER = new ObjectMapper();
+    // Source of per-test job ids; the id also names the test's private database.
+    private static final AtomicLong JOB_ID_SEQ = new AtomicLong(821_000);
+
+    // Identical column tuple used for both the snapshot row (id 1) and the binlog row (id 2).
+    private static final String VALUES =
+            "200,"                              // c_tinyint_u TINYINT UNSIGNED
+                    + "4000000000,"             // c_int_u INT UNSIGNED
+                    + "18446744073709551615,"   // c_bigint_u BIGINT UNSIGNED
+                    + "12345678901234.567890,"  // c_decimal DECIMAL(20,6)
+                    + "1.5,"                    // c_float FLOAT
+                    + "3.141592653589793,"      // c_double DOUBLE
+                    + "1,"                      // c_bool TINYINT(1)
+                    + "'b',"                    // c_enum
+                    + "'x,z',"                  // c_set
+                    + "b'10100101',"            // c_bit BIT(8)
+                    + "'{\"k\": 1, \"a\": [2, 3]}'," // c_json
+                    + "'2023-08-15 12:34:56.123456',"  // c_datetime DATETIME(6)
+                    + "'2023-08-15 12:34:56.123456',"  // c_timestamp TIMESTAMP(6)
+                    + "'2023-08-15',"           // c_date DATE
+                    + "'12:34:56.123456',"      // c_time TIME(6)
+                    + "2023,"                   // c_year YEAR
+                    + "0x0102030405,"           // c_varbinary VARBINARY(8)
+                    + "'abcde'";                // c_char CHAR(5)
+
+    @Container
+    static final MySQLContainer<?> MYSQL =
+            new MySQLContainer<>(DockerImageName.parse("mysql:8.0"))
+                    .withConfigurationOverride("docker/server-allow-ancient-date-time")
+                    .withDatabaseName("cdc_test")
+                    .withUsername("cdc")
+                    .withPassword("123456")
+                    .withEnv("MYSQL_ROOT_PASSWORD", ROOT_PASSWORD);
+
+    private String jobId;
+    private String database;
+
+    /** Creates a per-test database with {@code t_scan} and its snapshot-phase row (id 1). */
+    @BeforeEach
+    void setUp() throws Exception {
+        jobId = String.valueOf(JOB_ID_SEQ.incrementAndGet());
+        database = "typescan_db_" + jobId;
+        try (Connection conn = rootConnection("");
+                Statement st = conn.createStatement()) {
+            st.execute("CREATE DATABASE " + database);
+            st.execute("USE " + database);
+            st.execute(
+                    "CREATE TABLE t_scan ("
+                            + "id INT NOT NULL,"
+                            + "c_tinyint_u TINYINT UNSIGNED,"
+                            + "c_int_u INT UNSIGNED,"
+                            + "c_bigint_u BIGINT UNSIGNED,"
+                            + "c_decimal DECIMAL(20,6),"
+                            + "c_float FLOAT,"
+                            + "c_double DOUBLE,"
+                            + "c_bool TINYINT(1),"
+                            + "c_enum ENUM('a','b','c'),"
+                            + "c_set SET('x','y','z'),"
+                            + "c_bit BIT(8),"
+                            + "c_json JSON,"
+                            + "c_datetime DATETIME(6),"
+                            + "c_timestamp TIMESTAMP(6) NULL,"
+                            + "c_date DATE,"
+                            + "c_time TIME(6),"
+                            + "c_year YEAR,"
+                            + "c_varbinary VARBINARY(8),"
+                            + "c_char CHAR(5),"
+                            + "PRIMARY KEY (id))");
+            st.execute("INSERT INTO t_scan VALUES (1," + VALUES + ")");
+        }
+    }
+
+    /**
+     * Closes the job, then drops the per-test database. The drop runs in {@code finally} so a
+     * failing close does not also leave the database behind in the shared container.
+     */
+    @AfterEach
+    void tearDown() throws Exception {
+        try {
+            Env.getCurrentEnv().close(jobId);
+        } finally {
+            try (Connection conn = rootConnection("");
+                    Statement st = conn.createStatement()) {
+                st.execute("DROP DATABASE IF EXISTS " + database);
+            }
+        }
+    }
+
+    @Test
+    void snapshotAndBinlogDeserializeIdentically() throws Exception {
+        try (CdcClientReadHarness harness =
+                CdcClientReadHarness.mysql(
+                        jobId,
+                        MYSQL.getHost(),
+                        MYSQL.getMappedPort(MySQLContainer.MYSQL_PORT),
+                        ROOT_USER,
+                        ROOT_PASSWORD,
+                        database,
+                        "t_scan",
+                        "initial")) {
+
+            List<SnapshotSplit> splits = harness.fetchAllSnapshotSplits("t_scan");
+            SnapshotResult snapshot = harness.readSnapshot(splits);
+            assertThat(snapshot.records()).as("snapshot records").isNotEmpty();
+            JsonNode snap = MAPPER.readTree(snapshot.records().get(0));
+
+            // identical value inserted after capture starts -> binlog path
+            try (Connection conn = rootConnection(database);
+                    Statement st = conn.createStatement()) {
+                st.execute("INSERT INTO t_scan VALUES (2," + VALUES + ")");
+            }
+            List<String> binlog =
+                    harness.readBinlogUntil(snapshot, splits, 1, Duration.ofSeconds(60));
+            assertThat(binlog).as("binlog records").isNotEmpty();
+            JsonNode bin = MAPPER.readTree(binlog.get(0));
+
+            List<String> mismatches = new ArrayList<>();
+            Iterator<String> fields = snap.fieldNames();
+            while (fields.hasNext()) {
+                String col = fields.next();
+                if (!isComparedColumn(col)) {
+                    continue;
+                }
+                JsonNode snapNode = snap.get(col);
+                JsonNode binNode = bin.get(col);
+                if (!columnsMatch(col, snapNode, binNode)) {
+                    mismatches.add(col + ": snapshot=[" + snapNode + "] binlog=[" + binNode + "]");
+                }
+            }
+            // A column emitted only by the binlog path is also a divergence between the paths.
+            Iterator<String> binFields = bin.fieldNames();
+            while (binFields.hasNext()) {
+                String col = binFields.next();
+                if (isComparedColumn(col) && !snap.has(col)) {
+                    mismatches.add(col + ": snapshot=[<absent>] binlog=[" + bin.get(col) + "]");
+                }
+            }
+            mismatches.forEach(m -> System.out.println("[TYPE SCAN][MISMATCH] " + m));
+            assertThat(mismatches).as("snapshot vs binlog per-column mismatches").isEmpty();
+        }
+    }
+
+    // The row key and Doris-internal "__DORIS*" columns are excluded from the comparison.
+    private static boolean isComparedColumn(String col) {
+        return !col.equals("id") && !col.startsWith("__DORIS");
+    }
+
+    // Compare the parsed nodes directly so container columns (objects/arrays) compare by content.
+    // The whitespace/key-order tolerance is limited to the JSON column: a JSON value carried as a
+    // string can differ in spacing/key order between the snapshot (JDBC) and binlog paths, so it is
+    // compared by parsed value; every other column must match exactly so a real representation
+    // difference is never masked.
+    private boolean columnsMatch(String col, JsonNode a, JsonNode b) {
+        if (a == null || b == null) {
+            return false;
+        }
+        if (a.equals(b)) {
+            return true;
+        }
+        if (!col.equals("c_json")) {
+            return false;
+        }
+        try {
+            return asJsonValue(a).equals(asJsonValue(b));
+        } catch (Exception e) {
+            // An unparsable JSON string cannot be shown to equal the other phase's value.
+            return false;
+        }
+    }
+
+    // JSON carried as a string is parsed into its value; any other node already is the value.
+    // Container nodes must NOT go through asText(): it yields "" for every object/array, which
+    // would make two different containers compare equal and hide a mismatch.
+    private static JsonNode asJsonValue(JsonNode node) throws Exception {
+        return node.isTextual() ? MAPPER.readTree(node.textValue()) : node;
+    }
+
+    /** Opens a root JDBC connection to {@code db} ({@code ""} for no default schema). */
+    private Connection rootConnection(String db) throws Exception {
+        String url =
+                "jdbc:mysql://"
+                        + MYSQL.getHost()
+                        + ":"
+                        + MYSQL.getMappedPort(MySQLContainer.MYSQL_PORT)
+                        + "/"
+                        + db;
+        return DriverManager.getConnection(url, ROOT_USER, ROOT_PASSWORD);
+    }
+}
diff --git 
a/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/itcase/PostgresTimeRangeITCase.java
 
b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/itcase/PostgresTimeRangeITCase.java
new file mode 100644
index 00000000000..2b97cfdda4a
--- /dev/null
+++ 
b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/itcase/PostgresTimeRangeITCase.java
@@ -0,0 +1,126 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cdcclient.itcase;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.apache.doris.cdcclient.common.Env;
+import org.apache.doris.job.cdc.split.SnapshotSplit;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.PostgreSQLContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.utility.DockerImageName;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.Statement;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Guards that the MySQL TIME out-of-range fix 
(DebeziumJsonDeserializer.convertToTime) does not
+ * regress PostgreSQL time handling. PG time values share the same MicroTime 
path: in-range values
+ * must stay byte-for-byte on the unchanged LocalTime branch, and the PG-legal 
boundary '24:00:00'
+ * is now formatted as text instead of falling back to the raw long.
+ */
+@Testcontainers
+class PostgresTimeRangeITCase {
+
+    private static final ObjectMapper MAPPER = new ObjectMapper();
+    private static final AtomicLong JOB_ID_SEQ = new AtomicLong(585_000);
+
+    @Container
+    static final PostgreSQLContainer<?> POSTGRES =
+            new PostgreSQLContainer<>(DockerImageName.parse("postgres:14"))
+                    .withCommand("postgres", "-c", "wal_level=logical");
+
+    private String jobId;
+    private String table;
+
+    @BeforeEach
+    void setUp() throws Exception {
+        jobId = String.valueOf(JOB_ID_SEQ.incrementAndGet());
+        // per-run table name so concurrent forks / parallel runs cannot 
collide on a shared table.
+        table = "t_time_" + jobId;
+        try (Connection conn = connect();
+                Statement st = conn.createStatement()) {
+            st.execute("DROP TABLE IF EXISTS " + table);
+            st.execute("CREATE TABLE " + table + " (id INT PRIMARY KEY, t_col 
TIME(6))");
+            st.execute("ALTER TABLE " + table + " REPLICA IDENTITY FULL");
+            // id 1: ordinary in-range value -- the fix must leave it 
byte-for-byte unchanged.
+            // id 2: PG-legal upper boundary 24:00:00 -- a raw-long fallback 
before the fix.
+            st.execute("INSERT INTO " + table + " VALUES 
(1,'12:34:56.123456'), (2,'24:00:00')");
+        }
+    }
+
+    @AfterEach
+    void tearDown() throws Exception {
+        Env.getCurrentEnv().close(jobId);
+        try (Connection conn = connect();
+                Statement st = conn.createStatement()) {
+            st.execute("DROP TABLE IF EXISTS " + table);
+        }
+    }
+
+    @Test
+    void pgTimeUnaffectedByMysqlTimeFix() throws Exception {
+        try (CdcClientReadHarness harness =
+                CdcClientReadHarness.postgres(
+                        jobId,
+                        POSTGRES.getHost(),
+                        
POSTGRES.getMappedPort(PostgreSQLContainer.POSTGRESQL_PORT),
+                        POSTGRES.getUsername(),
+                        POSTGRES.getPassword(),
+                        POSTGRES.getDatabaseName(),
+                        "public",
+                        table,
+                        "initial")) {
+
+            List<SnapshotSplit> splits = harness.fetchAllSnapshotSplits(table);
+            CdcClientReadHarness.SnapshotResult snapshot = 
harness.readSnapshot(splits);
+            Map<Integer, JsonNode> snap = indexById(snapshot.records());
+
+            // in-range ordinary value is unchanged (the PG common case)
+            
assertThat(snap.get(1).get("t_col").asText()).isEqualTo("12:34:56.123456");
+            // PG-legal 24:00:00 is now formatted instead of emitted as a raw 
long
+            
assertThat(snap.get(2).get("t_col").asText()).isEqualTo("24:00:00");
+        }
+    }
+
+    private Map<Integer, JsonNode> indexById(List<String> records) throws 
Exception {
+        Map<Integer, JsonNode> result = new HashMap<>();
+        for (String record : records) {
+            JsonNode node = MAPPER.readTree(record);
+            result.put(node.get("id").asInt(), node);
+        }
+        return result;
+    }
+
+    private Connection connect() throws Exception {
+        return DriverManager.getConnection(
+                POSTGRES.getJdbcUrl(), POSTGRES.getUsername(), 
POSTGRES.getPassword());
+    }
+}
diff --git 
a/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/itcase/PostgresTypeConsistencyITCase.java
 
b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/itcase/PostgresTypeConsistencyITCase.java
new file mode 100644
index 00000000000..0f44ffe1c9b
--- /dev/null
+++ 
b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/itcase/PostgresTypeConsistencyITCase.java
@@ -0,0 +1,206 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cdcclient.itcase;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.apache.doris.cdcclient.common.Env;
+import org.apache.doris.job.cdc.split.SnapshotSplit;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.PostgreSQLContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.utility.DockerImageName;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.Statement;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * PostgreSQL counterpart of {@link MySqlTypeConsistencyITCase}: the same value inserted before
+ * capture (snapshot, JDBC path) and after (binlog, logical-decoding path) must deserialize
+ * identically per column. JSON/JSONB are compared by parsed value to tolerate whitespace/key-order
+ * differences while still catching a genuinely different value.
+ */
+@Testcontainers
+class PostgresTypeConsistencyITCase {
+
+    private static final ObjectMapper MAPPER = new ObjectMapper();
+    private static final AtomicLong JOB_ID_SEQ = new AtomicLong(595_000);
+
+    // Identical column tuple for both the snapshot row (id 1) and the binlog row (id 2).
+    private static final String VALUES =
+            "12345678901234.567890,"                            // c_numeric NUMERIC(20,6)
+                    + "1.5,"                                    // c_real REAL
+                    + "3.141592653589793,"                      // c_double DOUBLE PRECISION
+                    + "true,"                                   // c_bool BOOLEAN
+                    + "'hello',"                                // c_text TEXT
+                    + "'world',"                                // c_varchar VARCHAR(20)
+                    + "'abcde',"                                // c_char CHAR(5)
+                    + "'{\"k\": 1, \"a\": [2, 3]}',"            // c_json JSON
+                    + "'{\"k\": 1, \"a\": [2, 3]}',"            // c_jsonb JSONB
+                    + "B'10100101',"                            // c_bit BIT(8)
+                    + "B'101',"                                 // c_varbit VARBIT(16)
+                    + "'\\x0102030405',"                        // c_bytea BYTEA
+                    + "'11111111-1111-1111-1111-111111111111',"   // c_uuid UUID
+                    + "'2023-08-15',"                           // c_date DATE
+                    + "'12:34:56.123456',"                      // c_time TIME(6)
+                    + "'12:34:56.123456+08',"                   // c_timetz TIMETZ
+                    + "'2023-08-15 12:34:56.123456',"           // c_timestamp TIMESTAMP(6)
+                    + "'2023-08-15 12:34:56.123456+08',"        // c_timestamptz TIMESTAMPTZ
+                    + "'1 day 02:03:04',"                       // c_interval INTERVAL
+                    + "'192.168.0.1',"                          // c_inet INET
+                    + "'{1,2,3}',"                              // c_int_arr INT[]
+                    + "'{a,b}',"                                // c_text_arr TEXT[]
+                    + "'12.34'";                                // c_money MONEY
+
+    @Container
+    static final PostgreSQLContainer<?> POSTGRES =
+            new PostgreSQLContainer<>(DockerImageName.parse("postgres:14"))
+                    .withCommand("postgres", "-c", "wal_level=logical");
+
+    private String jobId;
+    private String table;
+
+    @BeforeEach
+    void setUp() throws Exception {
+        jobId = String.valueOf(JOB_ID_SEQ.incrementAndGet());
+        table = "t_scan_" + jobId;
+        try (Connection conn = connect();
+                Statement st = conn.createStatement()) {
+            st.execute("DROP TABLE IF EXISTS " + table);
+            st.execute(
+                    "CREATE TABLE " + table + " ("
+                            + "id INT PRIMARY KEY,"
+                            + "c_numeric NUMERIC(20,6),"
+                            + "c_real REAL,"
+                            + "c_double DOUBLE PRECISION,"
+                            + "c_bool BOOLEAN,"
+                            + "c_text TEXT,"
+                            + "c_varchar VARCHAR(20),"
+                            + "c_char CHAR(5),"
+                            + "c_json JSON,"
+                            + "c_jsonb JSONB,"
+                            + "c_bit BIT(8),"
+                            + "c_varbit VARBIT(16),"
+                            + "c_bytea BYTEA,"
+                            + "c_uuid UUID,"
+                            + "c_date DATE,"
+                            + "c_time TIME(6),"
+                            + "c_timetz TIMETZ,"
+                            + "c_timestamp TIMESTAMP(6),"
+                            + "c_timestamptz TIMESTAMPTZ,"
+                            + "c_interval INTERVAL,"
+                            + "c_inet INET,"
+                            + "c_int_arr INT[],"
+                            + "c_text_arr TEXT[],"
+                            + "c_money MONEY)");
+            st.execute("ALTER TABLE " + table + " REPLICA IDENTITY FULL");
+            st.execute("INSERT INTO " + table + " VALUES (1," + VALUES + ")");
+        }
+    }
+
+    @AfterEach
+    void tearDown() throws Exception {
+        // Drop the table even if Env.close(jobId) throws, so a failing close does not leave the
+        // table behind in the container shared by this class's tests.
+        try {
+            Env.getCurrentEnv().close(jobId);
+        } finally {
+            try (Connection conn = connect();
+                    Statement st = conn.createStatement()) {
+                st.execute("DROP TABLE IF EXISTS " + table);
+            }
+        }
+    }
+
+    @Test
+    void snapshotAndBinlogDeserializeIdentically() throws Exception {
+        try (CdcClientReadHarness harness =
+                CdcClientReadHarness.postgres(
+                        jobId,
+                        POSTGRES.getHost(),
+                        POSTGRES.getMappedPort(PostgreSQLContainer.POSTGRESQL_PORT),
+                        POSTGRES.getUsername(),
+                        POSTGRES.getPassword(),
+                        POSTGRES.getDatabaseName(),
+                        "public",
+                        table,
+                        "initial")) {
+
+            List<SnapshotSplit> splits = harness.fetchAllSnapshotSplits(table);
+            CdcClientReadHarness.SnapshotResult snapshot = harness.readSnapshot(splits);
+            // setUp inserts exactly one row (id 1) before capture; fail clearly, not on get(0).
+            assertThat(snapshot.records()).as("snapshot records").hasSize(1);
+            JsonNode snap = MAPPER.readTree(snapshot.records().get(0));
+
+            try (Connection conn = connect();
+                    Statement st = conn.createStatement()) {
+                st.execute("INSERT INTO " + table + " VALUES (2," + VALUES + ")");
+            }
+            List<String> binlog =
+                    harness.readBinlogUntil(snapshot, splits, 1, Duration.ofSeconds(60));
+            assertThat(binlog).as("binlog records").isNotEmpty();
+            JsonNode bin = MAPPER.readTree(binlog.get(0));
+            // The comparison is only meaningful if it pairs the snapshot row with the id-2 insert.
+            assertThat(bin.path("id").asInt()).as("binlog row id").isEqualTo(2);
+
+            List<String> mismatches = new ArrayList<>();
+            Iterator<String> fields = snap.fieldNames();
+            while (fields.hasNext()) {
+                String col = fields.next();
+                if (col.equals("id") || col.startsWith("__DORIS")) {
+                    continue;
+                }
+                JsonNode snapNode = snap.get(col);
+                JsonNode binNode = bin.get(col);
+                if (!columnsMatch(col, snapNode, binNode)) {
+                    mismatches.add(col + ": snapshot=[" + snapNode + "] binlog=[" + binNode + "]");
+                }
+            }
+            mismatches.forEach(m -> System.out.println("[PG TYPE SCAN][MISMATCH] " + m));
+            assertThat(mismatches).as("snapshot vs binlog per-column mismatches").isEmpty();
+        }
+    }
+
+    // Compare the parsed nodes directly so container columns (arrays such as c_int_arr/c_text_arr)
+    // compare by content rather than collapsing to "" via JsonNode.asText(). The whitespace/key-order
+    // tolerance is limited to the JSON/JSONB columns: a JSON value carried as a string can differ in
+    // spacing/key order between the snapshot (JDBC) and binlog paths, so those are compared by parsed
+    // value; every other column must match exactly so a real representation difference is never
+    // masked. Only textual nodes are re-parsed: an object/array node run through asText() would
+    // become "" on both sides and two different JSON values would wrongly compare equal.
+    private boolean columnsMatch(String col, JsonNode a, JsonNode b) {
+        if (a == null || b == null) {
+            return false;
+        }
+        if (a.equals(b)) {
+            return true;
+        }
+        if (col.equals("c_json") || col.equals("c_jsonb")) {
+            try {
+                JsonNode left = a.isTextual() ? MAPPER.readTree(a.textValue()) : a;
+                JsonNode right = b.isTextual() ? MAPPER.readTree(b.textValue()) : b;
+                // Blank text can parse to MissingNode (or null, depending on the Jackson
+                // version); two values that do not parse to real JSON are never a match.
+                if (left == null
+                        || right == null
+                        || left.isMissingNode()
+                        || right.isMissingNode()) {
+                    return false;
+                }
+                return left.equals(right);
+            } catch (Exception e) {
+                // Malformed JSON on either side counts as a mismatch, not a test error.
+                return false;
+            }
+        }
+        return false;
+    }
+
+    private Connection connect() throws Exception {
+        return DriverManager.getConnection(
+                POSTGRES.getJdbcUrl(), POSTGRES.getUsername(), POSTGRES.getPassword());
+    }
+}
diff --git 
a/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/source/deserialize/DebeziumJsonDeserializerTest.java
 
b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/source/deserialize/DebeziumJsonDeserializerTest.java
index 75b2ffbfe9e..404ff7f1d56 100644
--- 
a/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/source/deserialize/DebeziumJsonDeserializerTest.java
+++ 
b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/source/deserialize/DebeziumJsonDeserializerTest.java
@@ -24,8 +24,12 @@ import java.time.ZoneId;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
+import io.debezium.time.MicroTime;
 import io.debezium.time.MicroTimestamp;
+import io.debezium.time.NanoTime;
 import io.debezium.time.NanoTimestamp;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
 
 /** Unit tests for {@link DebeziumJsonDeserializer}. */
 class DebeziumJsonDeserializerTest {
@@ -75,6 +79,106 @@ class DebeziumJsonDeserializerTest {
         }
     }
 
+    // ─── convertToTime (MySQL TIME full range) ────────────────────────────────
+    // MySQL TIME spans [-838:59:59, 838:59:59] (Debezium MicroTime/NanoTime, long micros/nanos).
+    // In-range values keep the LocalTime format; out-of-range values (negative or >= 24h) must
+    // format as ±HH:MM:SS[.ffffff] instead of falling back to the raw long literal.
+
+    @Test
+    void microTime_zero_isMidnight() {
+        // Zero micros past midnight is in range and keeps the LocalTime rendering.
+        Object midnight = invokeConvertToTime(MicroTime.SCHEMA_NAME, 0L);
+        assertEquals("00:00", midnight);
+    }
+
+    @Test
+    void microTime_inRange_withMicros() {
+        long micros = 45_296_123_456L; // 12:34:56.123456 expressed as micros since midnight
+        assertEquals("12:34:56.123456", invokeConvertToTime(MicroTime.SCHEMA_NAME, micros));
+    }
+
+    @Test
+    void microTime_inRange_upperBound() {
+        long lastMicroOfDay = 86_399_999_999L; // one micro before 24:00:00
+        Object rendered = invokeConvertToTime(MicroTime.SCHEMA_NAME, lastMicroOfDay);
+        assertEquals("23:59:59.999999", rendered);
+    }
+
+    @Test
+    void microTime_negative_mysqlLowerBound() {
+        // MySQL '-838:59:59' is -3_020_399_000_000 micros; it must not degrade to the raw long.
+        long lowerBoundMicros = -3_020_399_000_000L;
+        Object rendered = invokeConvertToTime(MicroTime.SCHEMA_NAME, lowerBoundMicros);
+        assertEquals("-838:59:59", rendered);
+    }
+
+    @Test
+    void microTime_over24h_mysqlUpperBound() {
+        // MySQL '838:59:59.999999' is 3_020_399_999_999 micros.
+        long upperBoundMicros = 3_020_399_999_999L;
+        Object rendered = invokeConvertToTime(MicroTime.SCHEMA_NAME, upperBoundMicros);
+        assertEquals("838:59:59.999999", rendered);
+    }
+
+    @Test
+    void nanoTime_negative_mysqlLowerBound() {
+        // Same '-838:59:59' lower bound, this time carried as NanoTime nanoseconds.
+        long lowerBoundNanos = -3_020_399_000_000_000L;
+        Object rendered = invokeConvertToTime(NanoTime.SCHEMA_NAME, lowerBoundNanos);
+        assertEquals("-838:59:59", rendered);
+    }
+
+    /**
+     * Calls the deserializer's {@code convertToTime(Object, Schema)} reflectively with an optional
+     * INT64 schema carrying the given Debezium logical-type name.
+     */
+    private Object invokeConvertToTime(String schemaName, Object dbzObj) {
+        try {
+            Schema int64Schema = SchemaBuilder.int64().name(schemaName).optional().build();
+            Method convertToTime =
+                    DebeziumJsonDeserializer.class.getDeclaredMethod(
+                            "convertToTime", Object.class, Schema.class);
+            convertToTime.setAccessible(true);
+            return convertToTime.invoke(deserializer, dbzObj, int64Schema);
+        } catch (ReflectiveOperationException reflectionFailure) {
+            throw new RuntimeException(reflectionFailure);
+        }
+    }
+
+    // ─── formatTimeText (fraction padding / trailing-zero stripping) ───────────
+    // The convertToTime tests above only reach formatTimeText with whole-second or full
+    // six-digit fractions, so the sub-second padding and trailing-zero stripping branches
+    // are exercised directly here.
+
+    @Test
+    void formatTimeText_trailingZerosStrippedToSingleDigit() {
+        String text = invokeFormatTimeText(500_000L); // half a second
+        assertEquals("00:00:00.5", text);
+    }
+
+    @Test
+    void formatTimeText_trailingZerosStrippedToTwoDigits() {
+        String text = invokeFormatTimeText(120_000L); // 0.12 seconds
+        assertEquals("00:00:00.12", text);
+    }
+
+    @Test
+    void formatTimeText_millisecondFraction() {
+        String text = invokeFormatTimeText(123_000L); // 123 milliseconds
+        assertEquals("00:00:00.123", text);
+    }
+
+    @Test
+    void formatTimeText_subMicroFractionLeftPadded() {
+        // Five micros become ".000005": left-padded to six digits with nothing to strip.
+        String text = invokeFormatTimeText(5L);
+        assertEquals("00:00:00.000005", text);
+    }
+
+    @Test
+    void formatTimeText_negativeWholeSecondPadsHourAndMinute() {
+        // Minus thirty minutes: two-digit hour and minute, no fractional part.
+        String text = invokeFormatTimeText(-1_800_000_000L);
+        assertEquals("-00:30:00", text);
+    }
+
+    @Test
+    void formatTimeText_negativeKeepsSignBeforeFraction() {
+        String text = invokeFormatTimeText(-500_000L); // minus half a second
+        assertEquals("-00:00:00.5", text);
+    }
+
+    /** Calls {@code formatTimeText(long)} reflectively with a null receiver (static method). */
+    private String invokeFormatTimeText(long microsTotal) {
+        try {
+            Method formatter =
+                    DebeziumJsonDeserializer.class.getDeclaredMethod("formatTimeText", long.class);
+            formatter.setAccessible(true);
+            Object formatted = formatter.invoke(null, microsTotal);
+            return (String) formatted;
+        } catch (ReflectiveOperationException reflectionFailure) {
+            throw new RuntimeException(reflectionFailure);
+        }
+    }
+
     // ─── convertZoneTime 
──────────────────────────────────────────────────────
     // timetz arrives as a UTC-normalized ISO string (Debezium ZonedTime). cdc 
keeps it
     // verbatim with the offset preserved, independent of serverTimeZone, 
since a
diff --git 
a/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/source/reader/mysql/GtidMultiIntervalOffsetTest.java
 
b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/source/reader/mysql/GtidMultiIntervalOffsetTest.java
new file mode 100644
index 00000000000..2646b981124
--- /dev/null
+++ 
b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/source/reader/mysql/GtidMultiIntervalOffsetTest.java
@@ -0,0 +1,95 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cdcclient.source.reader.mysql;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import com.github.shyiko.mysql.binlog.GtidSet;
+import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Guard for MySQL GTID sets holding more than one interval per server uuid (a "split interval",
+ * e.g. {@code uuid:1-3:5-7}, which remains after the gap transaction 4 was purged).
+ *
+ * <p>GTID parsing and serialization are delegated entirely to the binlog connector's
+ * {@link GtidSet}; the string is never split by hand. That keeps the go-mysql #550 class of bug
+ * (a single-interval regex that silently merges split intervals and persists a corrupt resume
+ * position → ERROR 1236) from regressing unnoticed. The tests below pin that the split interval
+ * survives GtidSet parsing/round-trip as well as our FE offset-map persistence, gap intact.
+ */
+class GtidMultiIntervalOffsetTest {
+
+    private static final String SERVER_UUID = "24bc7850-2c16-11e6-a073-0242ac110002";
+    // Two disjoint intervals for one uuid; transaction 4 is the purged gap between them.
+    private static final String MULTI_INTERVAL_GTID = SERVER_UUID + ":1-3:5-7";
+
+    /** Parses {@code gtid} and returns the intervals recorded for {@link #SERVER_UUID}. */
+    private static List<GtidSet.Interval> intervalsOf(String gtid) {
+        return new GtidSet(gtid).getUUIDSet(SERVER_UUID).getIntervals();
+    }
+
+    /** Builds a GTID set containing exactly one transaction number for {@link #SERVER_UUID}. */
+    private static GtidSet singleTxn(long txn) {
+        return new GtidSet(SERVER_UUID + ":" + txn + "-" + txn);
+    }
+
+    @Test
+    void splitIntervalParsesAsTwoIntervalsNotMergedIntoOne() {
+        List<GtidSet.Interval> intervals = intervalsOf(MULTI_INTERVAL_GTID);
+        // Collapsing to a single 1-7 interval would silently swallow the gap at transaction 4.
+        assertEquals(2, intervals.size());
+        GtidSet.Interval first = intervals.get(0);
+        GtidSet.Interval second = intervals.get(1);
+        assertEquals(1, first.getStart());
+        assertEquals(3, first.getEnd());
+        assertEquals(5, second.getStart());
+        assertEquals(7, second.getEnd());
+    }
+
+    @Test
+    void splitIntervalSurvivesGtidSetSerializationRoundTrip() {
+        // toString followed by a fresh parse must still see two intervals.
+        String reSerialized = new GtidSet(MULTI_INTERVAL_GTID).toString();
+        assertEquals(2, intervalsOf(reSerialized).size());
+    }
+
+    // Covers the BinlogOffset layer only; the real generateMySqlConfig resume path is exercised
+    // by MySqlSourceReaderTest / MySqlStartupGtidOffsetITCase.
+    @Test
+    void splitIntervalSurvivesFeOffsetMapRoundTrip() {
+        // The FE stores the binlog offset as a string-valued map; rebuild a BinlogOffset from one.
+        Map<String, String> feOffset = new HashMap<>();
+        feOffset.put(BinlogOffset.BINLOG_FILENAME_OFFSET_KEY, "mysql-bin.000004");
+        feOffset.put(BinlogOffset.BINLOG_POSITION_OFFSET_KEY, "1024");
+        feOffset.put(BinlogOffset.GTID_SET_KEY, MULTI_INTERVAL_GTID);
+
+        BinlogOffset restored = new BinlogOffset(new HashMap<>(feOffset));
+        String restoredGtids = restored.getGtidSet();
+
+        // The gtid string is opaque and must come back unchanged ...
+        assertEquals(MULTI_INTERVAL_GTID, restoredGtids);
+        // ... and still parse into two intervals afterwards.
+        assertEquals(2, intervalsOf(restoredGtids).size());
+    }
+
+    @Test
+    void purgedGapTransactionStaysOutsideTheSplitInterval() {
+        GtidSet multiInterval = new GtidSet(MULTI_INTERVAL_GTID);
+        // Transactions 2 and 6 lie inside the two intervals; the purged transaction 4 does not.
+        assertTrue(singleTxn(2).isContainedWithin(multiInterval));
+        assertTrue(singleTxn(6).isContainedWithin(multiInterval));
+        assertFalse(singleTxn(4).isContainedWithin(multiInterval));
+    }
+}
diff --git 
a/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReaderTest.java
 
b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReaderTest.java
index 1192df291d3..92be9a68c3c 100644
--- 
a/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReaderTest.java
+++ 
b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReaderTest.java
@@ -18,12 +18,24 @@
 package org.apache.doris.cdcclient.source.reader.mysql;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
+import org.apache.doris.job.cdc.DataSourceConfigKeys;
+
+import com.github.shyiko.mysql.binlog.GtidSet;
+import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig;
+import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset;
 import org.junit.jupiter.api.Test;
 
+import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.Map;
+
 public class MySqlSourceReaderTest {
 
+    private static final String SERVER_UUID = 
"24bc7850-2c16-11e6-a073-0242ac110002";
+
     @Test
     void testNormalizeSslModeMapsAllLegalValues() {
         assertEquals("disabled", 
MySqlSourceReader.normalizeSslModeForMysql("disable"));
@@ -59,4 +71,47 @@ public class MySqlSourceReaderTest {
                 IllegalArgumentException.class,
                 () -> MySqlSourceReader.normalizeSslModeForMysql("DISABLE"));
     }
+
+    // A JSON resume offset that carries file/pos AND a multi-interval gtid set must keep the gtids
+    // (and a non-null offset kind) through generateMySqlConfig, not only file/pos.
+    @Test
+    void jsonOffsetWithGtidsPreservesMultiIntervalGtidSet() throws Exception {
+        String gtid = SERVER_UUID + ":1-3:5-7";
+        String offsetJson =
+                "{\"file\":\"mysql-bin.000004\",\"pos\":\"1024\",\"gtids\":\"" + gtid + "\"}";
+        BinlogOffset off = startupBinlogOffset(offsetJson);
+
+        assertEquals(gtid, off.getGtidSet());
+        assertNotNull(off.getOffsetKind());
+        GtidSet parsed = new GtidSet(off.getGtidSet());
+        assertEquals(2, parsed.getUUIDSet(SERVER_UUID).getIntervals().size());
+    }
+
+    // A JSON offset with only gtids (no file/pos) is accepted, matching Flink CDC's
+    // specific-offset rules.
+    @Test
+    void jsonOffsetGtidOnlyIsAccepted() throws Exception {
+        String gtid = SERVER_UUID + ":1-7";
+        String offsetJson = "{\"gtids\":\"" + gtid + "\"}";
+        BinlogOffset off = startupBinlogOffset(offsetJson);
+
+        assertEquals(gtid, off.getGtidSet());
+        assertNotNull(off.getOffsetKind());
+    }
+
+    // Runs the real generateMySqlConfig JSON-offset path and returns the rebuilt startup offset.
+    private BinlogOffset startupBinlogOffset(String offsetJson) throws Exception {
+        Map<String, String> sourceConfig = new HashMap<>();
+        sourceConfig.put(DataSourceConfigKeys.JDBC_URL, "jdbc:mysql://localhost:3306/testdb");
+        sourceConfig.put(DataSourceConfigKeys.USER, "u");
+        sourceConfig.put(DataSourceConfigKeys.PASSWORD, "p");
+        sourceConfig.put(DataSourceConfigKeys.DATABASE, "testdb");
+        sourceConfig.put(DataSourceConfigKeys.TABLE, "t_test");
+        sourceConfig.put(DataSourceConfigKeys.OFFSET, offsetJson);
+
+        // Reached reflectively with setAccessible, so its declared visibility does not matter.
+        Method generate =
+                MySqlSourceReader.class.getDeclaredMethod(
+                        "generateMySqlConfig", Map.class, String.class, int.class);
+        generate.setAccessible(true);
+        MySqlSourceReader reader = new MySqlSourceReader();
+        MySqlSourceConfig config =
+                (MySqlSourceConfig) generate.invoke(reader, sourceConfig, "job-1", 0);
+        return config.getStartupOptions().binlogOffset;
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to