This is an automated email from the ASF dual-hosted git repository.
liaoxin01 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 4e25c9f7070 [fix](streaming-job) restore split-bound Java types when
reading FE-persisted CDC offset (#63219)
4e25c9f7070 is described below
commit 4e25c9f707096cff0ee8b0b541e584e079e76946
Author: wudi <[email protected]>
AuthorDate: Thu May 14 23:15:37 2026 +0800
[fix](streaming-job) restore split-bound Java types when reading
FE-persisted CDC offset (#63219)
### What problem does this PR solve?
Issue Number: close #xxx
Related PR: #xxx
Problem Summary:
PG streaming jobs with a temporal chunk-key column (e.g. `DATE` PK) fail
during snapshot read with:
```
PSQLException: ERROR: operator does not exist: date <= character varying
Hint: No operator matches the given name and argument types.
```
**Root cause**
cdc_client reports snapshot splits to FE; FE persists the offset as JSON
via Spring Jackson, where `SqlDateSerializer` writes `java.sql.Date` as
a `"yyyy-MM-dd"` string. When the offset is later read back,
`ObjectMapper.convertValue(offset, SnapshotSplit.class)` lands the bound
values back into `Object[]` as `String` (type info is gone). At read
time `PostgresQueryUtils.readTableSplitDataStatement` calls bare
`statement.setObject(idx, splitEnd[i])`; PG JDBC sends `String` as
VARCHAR oid, and PostgreSQL — strict about operator resolution — refuses
`date <= varchar` (no implicit cast from varchar to date).
The same FE round-trip happens for MySQL too. The MySQL server is lenient
enough to implicitly coerce the bound, so the surface error does not
appear, but `SplitKeyUtils.compareObjects` falls back to `toString()`
comparison whenever the restored type doesn't match the Debezium
connect-schema type. That is a latent issue, so the same fix is applied
to MySQL for consistency.
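To illustrate why a `toString()` fallback is risky, here is a minimal
sketch. It is not the `SplitKeyUtils` code: `compareAsStrings` and
`compareTyped` are hypothetical helpers, and the numeric bounds are an
assumed example chosen only to show that lexicographic order can
disagree with typed order.

```java
class StringFallbackCompare {
    // Hypothetical stand-in for a comparison that falls back to toString().
    static int compareAsStrings(Object a, Object b) {
        return a.toString().compareTo(b.toString());
    }

    // Typed comparison, possible only when both bounds kept their Java type.
    static <T extends Comparable<T>> int compareTyped(T a, T b) {
        return a.compareTo(b);
    }

    public static void main(String[] args) {
        // Lexicographically "9" sorts after "10" because '9' > '1'.
        System.out.println(compareAsStrings("9", "10") > 0);
        // With the original numeric type, 9 sorts before 10.
        System.out.println(compareTyped(9L, 10L) < 0);
    }
}
```

`yyyy-MM-dd` strings happen to sort correctly as text, so the effect
may stay hidden for `DATE` keys while still being wrong in general.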
**Fix**
Restore the original Java types at the cdc_client side, where the loss
happens:
- `AbstractCdcSourceReader` adds:
- abstract `probeSplitKeyClass(TableId, Column, JobBaseConfig)` —
dialect-specific lookup
- `resolveSplitKeyClass(...)` — per-column cached wrapper (1 probe per
table.column, reused across splits)
- static `convertBounds(Object[], Class<?>, ObjectMapper)` — restores
Object[] elements to the target class
- `PostgresSourceReader` / `MySqlSourceReader` override
`probeSplitKeyClass`: run `SELECT col FROM table WHERE 1=0` and use
`ResultSetMetaData.getColumnClassName(1)` so the JDBC driver itself
decides the Java type. Probe failure throws (no silent fallback — silent
fallback would let the original bug recur).
- `JdbcIncrementalSourceReader.createSnapshotSplit` /
`createStreamSplit` and `MySqlSourceReader.createSnapshotSplit` /
`createBinlogSplit` (4 sites total) apply `convertBounds` before
constructing Flink CDC's `SnapshotSplit` / `FinishedSnapshotSplitInfo`.
- `convertBounds` special-cases `java.sql.Date` / `Timestamp` / `Time`
via `valueOf` to match the JVM-default-TZ semantics of `rs.getObject`
(Jackson's default `SqlDateDeserializer` hard-codes GMT, which would
shift the value in non-UTC TZs). Other types fall through to
`ObjectMapper.convertValue`.
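The time-zone point in the last bullet can be sketched as follows. This
is an illustration under assumptions, not the committed code:
`restoreLocal` mirrors the `valueOf` path, while `restoreGmt` is a
hypothetical stand-in for a GMT-pinned parse like the one the bullet
attributes to Jackson's default deserializer. The zone
`America/New_York` is an arbitrary non-UTC example.

```java
import java.sql.Date;
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.util.TimeZone;

class BoundRestoreSketch {
    // valueOf reads the string as midnight in the JVM default time zone.
    static Date restoreLocal(String s) {
        return Date.valueOf(s);
    }

    // Hypothetical GMT-pinned parse: midnight UTC converted to epoch millis.
    static Date restoreGmt(String s) {
        long epochMillis =
                LocalDate.parse(s).atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli();
        return new Date(epochMillis);
    }

    public static void main(String[] args) {
        // Assumed non-UTC zone, set before any Date is created.
        TimeZone.setDefault(TimeZone.getTimeZone("America/New_York"));
        System.out.println(restoreLocal("2025-01-06")); // 2025-01-06
        // Midnight UTC is still the previous evening in New York.
        System.out.println(restoreGmt("2025-01-06"));   // 2025-01-05
    }
}
```

In a zone west of UTC the GMT-pinned value renders one day early, which
would move the split bound. That is why the temporal classes go through
`valueOf` before the generic `ObjectMapper.convertValue` path.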
### Release note
Fix PG/MySQL streaming snapshot failing on temporal chunk-key columns
after FE offset JSON round-trip strips the original Java type.
---
.../source/reader/AbstractCdcSourceReader.java | 45 ++++++
.../source/reader/JdbcIncrementalSourceReader.java | 39 ++++-
.../source/reader/mysql/MySqlSourceReader.java | 58 +++++++-
.../reader/postgres/PostgresSourceReader.java | 23 +++
.../source/reader/AbstractCdcSourceReaderTest.java | 165 +++++++++++++++++++++
.../cdc/test_streaming_mysql_job_date_pk.out | 29 ++++
.../cdc/test_streaming_postgres_job_date_pk.out | 29 ++++
.../cdc/test_streaming_mysql_job_date_pk.groovy | 129 ++++++++++++++++
.../cdc/test_streaming_postgres_job_date_pk.groovy | 131 ++++++++++++++++
9 files changed, 635 insertions(+), 13 deletions(-)
diff --git
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/AbstractCdcSourceReader.java
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/AbstractCdcSourceReader.java
index 6ebf75a99aa..f006d791528 100644
---
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/AbstractCdcSourceReader.java
+++
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/AbstractCdcSourceReader.java
@@ -20,6 +20,7 @@ package org.apache.doris.cdcclient.source.reader;
import org.apache.doris.cdcclient.source.deserialize.DeserializeResult;
import org.apache.doris.cdcclient.source.deserialize.SourceRecordDeserializer;
import org.apache.doris.cdcclient.utils.SchemaChangeManager;
+import org.apache.doris.job.cdc.request.JobBaseConfig;
import org.apache.doris.job.cdc.request.JobBaseRecordRequest;
import org.apache.flink.cdc.connectors.base.utils.SerializerUtils;
@@ -37,6 +38,7 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
import io.debezium.document.Document;
import io.debezium.document.DocumentReader;
import io.debezium.document.DocumentWriter;
+import io.debezium.relational.Column;
import io.debezium.relational.TableId;
import io.debezium.relational.history.TableChanges;
import lombok.Getter;
@@ -63,6 +65,49 @@ public abstract class AbstractCdcSourceReader implements SourceReader {
protected SourceRecordDeserializer<SourceRecord, DeserializeResult> serializer;
protected Map<TableId, TableChanges.TableChange> tableSchemas;
+ private final Map<String, Class<?>> splitKeyClassCache = new ConcurrentHashMap<>();
+
+ protected abstract Class<?> probeSplitKeyClass(
+ TableId tableId, Column splitColumn, JobBaseConfig jobConfig);
+
+ protected Class<?> resolveSplitKeyClass(
+ TableId tableId, Column splitColumn, JobBaseConfig jobConfig) {
+ String key = tableId.identifier() + "." + splitColumn.name();
+ return splitKeyClassCache.computeIfAbsent(
+ key, k -> probeSplitKeyClass(tableId, splitColumn, jobConfig));
+ }
+
+ protected static Object[] convertBounds(Object[] raw, Class<?> target, ObjectMapper mapper) {
+ if (raw == null) {
+ return null;
+ }
+ Object[] out = new Object[raw.length];
+ for (int i = 0; i < raw.length; i++) {
+ out[i] = convertBound(raw[i], target, mapper);
+ }
+ return out;
+ }
+
+ private static Object convertBound(Object v, Class<?> target, ObjectMapper mapper) {
+ if (v == null) {
+ return null;
+ }
+ if (target.isInstance(v)) {
+ return v;
+ }
+ String s = v.toString();
+ if (target == java.sql.Date.class) {
+ return java.sql.Date.valueOf(s);
+ }
+ if (target == java.sql.Timestamp.class) {
+ return java.sql.Timestamp.valueOf(s);
+ }
+ if (target == java.sql.Time.class) {
+ return java.sql.Time.valueOf(s);
+ }
+ return mapper.convertValue(v, target);
+ }
+
/**
* Load tableSchemas from a JSON string (produced by {@link #serializeTableSchemas()}). Used
* when a binlog/stream split is resumed from FE-persisted state.
diff --git
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java
index 10cb98e448a..ae64b695e95 100644
---
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java
+++
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java
@@ -614,8 +614,6 @@ public abstract class JdbcIncrementalSourceReader extends
AbstractCdcSourceReade
createSnapshotSplit(Map<String, Object> offset, JobBaseConfig
jobConfig) {
SnapshotSplit snapshotSplit = objectMapper.convertValue(offset,
SnapshotSplit.class);
TableId tableId = TableId.parse(snapshotSplit.getTableId(), false);
- Object[] splitStart = snapshotSplit.getSplitStart();
- Object[] splitEnd = snapshotSplit.getSplitEnd();
List<String> splitKeys = snapshotSplit.getSplitKey();
Map<TableId, TableChanges.TableChange> tableSchemas =
getTableSchemas(jobConfig);
TableChanges.TableChange tableChange = tableSchemas.get(tableId);
@@ -623,7 +621,18 @@ public abstract class JdbcIncrementalSourceReader extends
AbstractCdcSourceReade
tableChange, "Can not find table " + tableId + " in job " +
jobConfig.getJobId());
// only support one split key
String splitKey = splitKeys.get(0);
- io.debezium.relational.Column splitColumn =
tableChange.getTable().columnWithName(splitKey);
+ Column splitColumn = tableChange.getTable().columnWithName(splitKey);
+ Preconditions.checkNotNull(
+ splitColumn,
+ "Split key column "
+ + splitKey
+ + " not found in table "
+ + tableId
+ + " for job "
+ + jobConfig.getJobId());
+ Class<?> keyClass = resolveSplitKeyClass(tableId, splitColumn,
jobConfig);
+ Object[] splitStart = convertBounds(snapshotSplit.getSplitStart(),
keyClass, objectMapper);
+ Object[] splitEnd = convertBounds(snapshotSplit.getSplitEnd(),
keyClass, objectMapper);
RowType splitType = getSplitType(splitColumn);
org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit
split =
new
org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit(
@@ -660,6 +669,7 @@ public abstract class JdbcIncrementalSourceReader extends
AbstractCdcSourceReade
.sorted(Comparator.comparing(AbstractSourceSplit::getSplitId))
.toList();
+ Map<TableId, TableChanges.TableChange> tableSchemas =
getTableSchemas(config);
for (SnapshotSplit split : assignedSplitLists) {
// find the min offset
Map<String, String> offsetMap = split.getHighWatermark();
@@ -670,12 +680,29 @@ public abstract class JdbcIncrementalSourceReader extends
AbstractCdcSourceReade
if (maxOffsetFinishSplits == null ||
sourceOffset.isAfter(maxOffsetFinishSplits)) {
maxOffsetFinishSplits = sourceOffset;
}
+ TableId tid = TableId.parse(split.getTableId());
+ TableChanges.TableChange tableChange = tableSchemas.get(tid);
+ Preconditions.checkNotNull(
+ tableChange, "Can not find table " + tid + " in job "
+ config.getJobId());
+ String splitKey = split.getSplitKey().get(0);
+ Column splitColumn =
tableChange.getTable().columnWithName(splitKey);
+ Preconditions.checkNotNull(
+ splitColumn,
+ "Split key column "
+ + splitKey
+ + " not found in table "
+ + tid
+ + " for job "
+ + config.getJobId());
+ Class<?> keyClass = resolveSplitKeyClass(tid, splitColumn,
config);
+ Object[] start = convertBounds(split.getSplitStart(),
keyClass, objectMapper);
+ Object[] end = convertBounds(split.getSplitEnd(), keyClass,
objectMapper);
finishedSnapshotSplitInfos.add(
new FinishedSnapshotSplitInfo(
- TableId.parse(split.getTableId()),
+ tid,
split.getSplitId(),
- split.getSplitStart(),
- split.getSplitEnd(),
+ start,
+ end,
sourceOffset,
getOffsetFactory()));
}
diff --git
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
index bf8ac56312b..84340f5e137 100644
---
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
+++
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
@@ -61,6 +61,7 @@ import
org.apache.flink.cdc.connectors.mysql.source.split.MySqlSplitState;
import org.apache.flink.cdc.connectors.mysql.source.split.SourceRecords;
import org.apache.flink.cdc.connectors.mysql.source.utils.ChunkUtils;
import org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils;
+import org.apache.flink.cdc.connectors.mysql.source.utils.StatementUtils;
import org.apache.flink.cdc.connectors.mysql.source.utils.TableDiscoveryUtils;
import org.apache.flink.cdc.connectors.mysql.table.StartupMode;
import org.apache.flink.cdc.connectors.mysql.table.StartupOptions;
@@ -69,7 +70,9 @@ import org.apache.flink.table.types.logical.RowType;
import org.apache.kafka.connect.source.SourceRecord;
import java.io.IOException;
+import java.sql.ResultSet;
import java.sql.SQLException;
+import java.sql.Statement;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
@@ -590,8 +593,6 @@ public class MySqlSourceReader extends
AbstractCdcSourceReader {
Map<String, Object> offset, JobBaseConfig jobConfig) throws
JsonProcessingException {
SnapshotSplit snapshotSplit = objectMapper.convertValue(offset,
SnapshotSplit.class);
TableId tableId = TableId.parse(snapshotSplit.getTableId());
- Object[] splitStart = snapshotSplit.getSplitStart();
- Object[] splitEnd = snapshotSplit.getSplitEnd();
List<String> splitKeys = snapshotSplit.getSplitKey();
Map<TableId, TableChanges.TableChange> tableSchemas =
getTableSchemas(jobConfig);
TableChanges.TableChange tableChange = tableSchemas.get(tableId);
@@ -600,6 +601,17 @@ public class MySqlSourceReader extends
AbstractCdcSourceReader {
// only support one split key
String splitKey = splitKeys.get(0);
Column splitColumn = tableChange.getTable().columnWithName(splitKey);
+ Preconditions.checkNotNull(
+ splitColumn,
+ "Split key column "
+ + splitKey
+ + " not found in table "
+ + tableId
+ + " for job "
+ + jobConfig.getJobId());
+ Class<?> keyClass = resolveSplitKeyClass(tableId, splitColumn,
jobConfig);
+ Object[] splitStart = convertBounds(snapshotSplit.getSplitStart(),
keyClass, objectMapper);
+ Object[] splitEnd = convertBounds(snapshotSplit.getSplitEnd(),
keyClass, objectMapper);
RowType splitType = ChunkUtils.getChunkKeyColumnType(splitColumn,
false);
MySqlSnapshotSplit split =
new MySqlSnapshotSplit(
@@ -631,6 +643,7 @@ public class MySqlSourceReader extends
AbstractCdcSourceReader {
.sorted(Comparator.comparing(AbstractSourceSplit::getSplitId))
.toList();
+ Map<TableId, TableChanges.TableChange> tableSchemas =
getTableSchemas(config);
for (SnapshotSplit split : assignedSplitLists) {
// find the min binlog offset
Map<String, String> offsetMap = split.getHighWatermark();
@@ -641,13 +654,26 @@ public class MySqlSourceReader extends
AbstractCdcSourceReader {
if (maxOffsetFinishSplits == null ||
binlogOffset.isAfter(maxOffsetFinishSplits)) {
maxOffsetFinishSplits = binlogOffset;
}
+ TableId tid = TableId.parse(split.getTableId());
+ TableChanges.TableChange tableChange = tableSchemas.get(tid);
+ Preconditions.checkNotNull(
+ tableChange, "Can not find table " + tid + " in job "
+ config.getJobId());
+ String splitKey = split.getSplitKey().get(0);
+ Column splitColumn =
tableChange.getTable().columnWithName(splitKey);
+ Preconditions.checkNotNull(
+ splitColumn,
+ "Split key column "
+ + splitKey
+ + " not found in table "
+ + tid
+ + " for job "
+ + config.getJobId());
+ Class<?> keyClass = resolveSplitKeyClass(tid, splitColumn,
config);
+ Object[] start = convertBounds(split.getSplitStart(),
keyClass, objectMapper);
+ Object[] end = convertBounds(split.getSplitEnd(), keyClass,
objectMapper);
finishedSnapshotSplitInfos.add(
new FinishedSnapshotSplitInfo(
- TableId.parse(split.getTableId()),
- split.getSplitId(),
- split.getSplitStart(),
- split.getSplitEnd(),
- binlogOffset));
+ tid, split.getSplitId(), start, end,
binlogOffset));
}
}
@@ -1050,6 +1076,24 @@ public class MySqlSourceReader extends
AbstractCdcSourceReader {
return schemas;
}
+ @Override
+ protected Class<?> probeSplitKeyClass(
+ TableId tableId, Column splitColumn, JobBaseConfig jobConfig) {
+ MySqlSourceConfig sourceConfig = getSourceConfig(jobConfig);
+ String sql =
+ String.format(
+ "SELECT %s FROM %s WHERE 1=0",
+ StatementUtils.quote(splitColumn.name()),
StatementUtils.quote(tableId));
+ try (MySqlConnection jdbc =
DebeziumUtils.createMySqlConnection(sourceConfig);
+ Statement st = jdbc.connection().createStatement();
+ ResultSet rs = st.executeQuery(sql)) {
+ return Class.forName(rs.getMetaData().getColumnClassName(1));
+ } catch (Exception e) {
+ throw new RuntimeException(
+ "Probe split key class failed for " + tableId + "." +
splitColumn.name(), e);
+ }
+ }
+
private Map<TableId, TableChanges.TableChange>
discoverTableSchemas(JobBaseConfig config) {
MySqlSourceConfig sourceConfig = getSourceConfig(config);
try (MySqlConnection jdbc =
DebeziumUtils.createMySqlConnection(sourceConfig)) {
diff --git
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
index 6638787ea48..8bf53a1eb97 100644
---
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
+++
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
@@ -49,10 +49,13 @@ import
org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresStreamFetch
import org.apache.flink.cdc.connectors.postgres.source.offset.PostgresOffset;
import
org.apache.flink.cdc.connectors.postgres.source.offset.PostgresOffsetFactory;
import
org.apache.flink.cdc.connectors.postgres.source.utils.CustomPostgresSchema;
+import
org.apache.flink.cdc.connectors.postgres.source.utils.PostgresQueryUtils;
import org.apache.flink.cdc.connectors.postgres.source.utils.PostgresTypeUtils;
import
org.apache.flink.cdc.connectors.postgres.source.utils.TableDiscoveryUtils;
import org.apache.flink.table.types.DataType;
+import java.sql.ResultSet;
+import java.sql.Statement;
import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
@@ -366,6 +369,26 @@ public class PostgresSourceReader extends
JdbcIncrementalSourceReader {
return PostgresTypeUtils.fromDbzColumn(splitColumn);
}
+ @Override
+ protected Class<?> probeSplitKeyClass(
+ TableId tableId, Column splitColumn, JobBaseConfig jobConfig) {
+ PostgresSourceConfig sourceConfig = getSourceConfig(jobConfig);
+ String sql =
+ String.format(
+ "SELECT %s FROM %s WHERE 1=0",
+ PostgresQueryUtils.quote(splitColumn.name()),
+ PostgresQueryUtils.quote(tableId));
+ try (JdbcConnection jdbc =
+ new
PostgresDialect(sourceConfig).openJdbcConnection(sourceConfig);
+ Statement st = jdbc.connection().createStatement();
+ ResultSet rs = st.executeQuery(sql)) {
+ return Class.forName(rs.getMetaData().getColumnClassName(1));
+ } catch (Exception e) {
+ throw new RuntimeException(
+ "Probe split key class failed for " + tableId + "." +
splitColumn.name(), e);
+ }
+ }
+
/**
* Why not call dialect.displayCurrentOffset(sourceConfig) ? The
underlying system calls
* `txid_current()` to advance the WAL log. Here, it's just a query;
retrieving the LSN is
diff --git
a/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/source/reader/AbstractCdcSourceReaderTest.java
b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/source/reader/AbstractCdcSourceReaderTest.java
new file mode 100644
index 00000000000..fbca1ad41e6
--- /dev/null
+++
b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/source/reader/AbstractCdcSourceReaderTest.java
@@ -0,0 +1,165 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cdcclient.source.reader;
+
+import org.apache.doris.job.cdc.split.SnapshotSplit;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.junit.jupiter.api.Test;
+
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+class AbstractCdcSourceReaderTest {
+
+ private static final ObjectMapper MAPPER = new ObjectMapper();
+
+ @Test
+ void convertBoundsRestoresDateFromString() {
+ Object[] raw = new Object[] {"2025-01-06"};
+ Object[] out = AbstractCdcSourceReader.convertBounds(raw, Date.class,
MAPPER);
+ assertTrue(out[0] instanceof Date);
+ assertEquals(Date.valueOf("2025-01-06"), out[0]);
+ }
+
+ @Test
+ void convertBoundsRestoresTimestampFromString() {
+ Object[] raw = new Object[] {"2025-01-06 12:34:56"};
+ Object[] out = AbstractCdcSourceReader.convertBounds(raw,
Timestamp.class, MAPPER);
+ assertTrue(out[0] instanceof Timestamp);
+ assertEquals(Timestamp.valueOf("2025-01-06 12:34:56"), out[0]);
+ }
+
+ @Test
+ void convertBoundsRestoresLongFromJsonInteger() {
+ // Jackson typically deserializes JSON integer to Integer; restore as
Long for int8 cols.
+ Object[] raw = new Object[] {123456};
+ Object[] out = AbstractCdcSourceReader.convertBounds(raw, Long.class,
MAPPER);
+ assertTrue(out[0] instanceof Long);
+ assertEquals(123456L, out[0]);
+ }
+
+ @Test
+ void convertBoundsRestoresBigDecimalFromString() {
+ Object[] raw = new Object[] {"12.34"};
+ Object[] out = AbstractCdcSourceReader.convertBounds(raw,
BigDecimal.class, MAPPER);
+ assertTrue(out[0] instanceof BigDecimal);
+ assertEquals(new BigDecimal("12.34"), out[0]);
+ }
+
+ @Test
+ void convertBoundsPreservesStringForVarcharColumns() {
+ Object[] raw = new Object[] {"event_id_42"};
+ Object[] out = AbstractCdcSourceReader.convertBounds(raw,
String.class, MAPPER);
+ assertEquals("event_id_42", out[0]);
+ }
+
+ @Test
+ void convertBoundsReturnsNullForNullInput() {
+ assertNull(AbstractCdcSourceReader.convertBounds(null, Date.class,
MAPPER));
+ }
+
+ @Test
+ void convertBoundsKeepsNullElement() {
+ Object[] raw = new Object[] {null};
+ Object[] out = AbstractCdcSourceReader.convertBounds(raw, Date.class,
MAPPER);
+ assertEquals(1, out.length);
+ assertNull(out[0]);
+ }
+
+ @Test
+ void convertBoundsHandlesMultiElementArray() {
+ Object[] raw = new Object[] {"2025-01-06", null};
+ Object[] out = AbstractCdcSourceReader.convertBounds(raw, Date.class,
MAPPER);
+ assertEquals(2, out.length);
+ assertEquals(Date.valueOf("2025-01-06"), out[0]);
+ assertNull(out[1]);
+ }
+
+ @Test
+ void restoresDateChunkKeyAfterFeRoundTrip() {
+ Map<String, Object> feOffset = new HashMap<>();
+ feOffset.put("splitId", "public.events:0");
+ feOffset.put("tableId", "public.events");
+ feOffset.put("splitKey", Arrays.asList("event_date"));
+ feOffset.put("splitStart", null);
+ feOffset.put("splitEnd", Arrays.asList("2025-01-06"));
+
+ SnapshotSplit deserialized = MAPPER.convertValue(feOffset,
SnapshotSplit.class);
+ assertEquals(String.class, deserialized.getSplitEnd()[0].getClass());
+
+ Object[] restored =
+ AbstractCdcSourceReader.convertBounds(
+ deserialized.getSplitEnd(), Date.class, MAPPER);
+ assertEquals(Date.class, restored[0].getClass());
+ assertEquals(Date.valueOf("2025-01-06"), restored[0]);
+ }
+
+ @Test
+ void restoresTimestampChunkKeyAfterFeRoundTrip() {
+ Map<String, Object> feOffset = new HashMap<>();
+ feOffset.put("splitId", "public.orders:7");
+ feOffset.put("tableId", "public.orders");
+ feOffset.put("splitKey", Arrays.asList("created_at"));
+ feOffset.put("splitStart", Arrays.asList("2025-01-06 00:00:00"));
+ feOffset.put("splitEnd", Arrays.asList("2025-01-07 00:00:00"));
+
+ SnapshotSplit deserialized = MAPPER.convertValue(feOffset,
SnapshotSplit.class);
+
+ Object[] restoredStart =
+ AbstractCdcSourceReader.convertBounds(
+ deserialized.getSplitStart(), Timestamp.class, MAPPER);
+ Object[] restoredEnd =
+ AbstractCdcSourceReader.convertBounds(
+ deserialized.getSplitEnd(), Timestamp.class, MAPPER);
+ assertEquals(Timestamp.class, restoredStart[0].getClass());
+ assertEquals(Timestamp.valueOf("2025-01-06 00:00:00"),
restoredStart[0]);
+ assertEquals(Timestamp.valueOf("2025-01-07 00:00:00"), restoredEnd[0]);
+ }
+
+ @Test
+ void restoresBigintChunkKeyAfterFeRoundTrip() {
+ Map<String, Object> feOffset = new HashMap<>();
+ feOffset.put("splitId", "public.orders:0");
+ feOffset.put("tableId", "public.orders");
+ feOffset.put("splitKey", Arrays.asList("id"));
+ feOffset.put("splitStart", Arrays.asList(100));
+ feOffset.put("splitEnd", Arrays.asList(200));
+
+ SnapshotSplit deserialized = MAPPER.convertValue(feOffset,
SnapshotSplit.class);
+
+ Object[] restoredStart =
+ AbstractCdcSourceReader.convertBounds(
+ deserialized.getSplitStart(), Long.class, MAPPER);
+ Object[] restoredEnd =
+ AbstractCdcSourceReader.convertBounds(
+ deserialized.getSplitEnd(), Long.class, MAPPER);
+ assertEquals(Long.class, restoredStart[0].getClass());
+ assertEquals(100L, restoredStart[0]);
+ assertEquals(200L, restoredEnd[0]);
+ }
+}
diff --git
a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_date_pk.out
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_date_pk.out
new file mode 100644
index 00000000000..d6105d7a899
--- /dev/null
+++
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_date_pk.out
@@ -0,0 +1,29 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !select_snapshot_date_pk --
+2025-01-01 A1
+2025-01-02 B1
+2025-01-03 C1
+2025-01-04 D1
+2025-01-05 E1
+
+-- !select_snapshot_composite_pk --
+2025-02-01 1 A2
+2025-02-02 2 B2
+2025-02-03 3 C2
+2025-02-04 4 D2
+2025-02-05 5 E2
+
+-- !select_binlog_date_pk --
+2025-01-02 B1_upd
+2025-01-03 C1
+2025-01-04 D1
+2025-01-05 E1
+2025-01-06 F1
+
+-- !select_binlog_composite_pk --
+2025-02-02 2 B2_upd
+2025-02-03 3 C2
+2025-02-04 4 D2
+2025-02-05 5 E2
+2025-02-06 6 F2
+
diff --git
a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_date_pk.out
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_date_pk.out
new file mode 100644
index 00000000000..d6105d7a899
--- /dev/null
+++
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_date_pk.out
@@ -0,0 +1,29 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !select_snapshot_date_pk --
+2025-01-01 A1
+2025-01-02 B1
+2025-01-03 C1
+2025-01-04 D1
+2025-01-05 E1
+
+-- !select_snapshot_composite_pk --
+2025-02-01 1 A2
+2025-02-02 2 B2
+2025-02-03 3 C2
+2025-02-04 4 D2
+2025-02-05 5 E2
+
+-- !select_binlog_date_pk --
+2025-01-02 B1_upd
+2025-01-03 C1
+2025-01-04 D1
+2025-01-05 E1
+2025-01-06 F1
+
+-- !select_binlog_composite_pk --
+2025-02-02 2 B2_upd
+2025-02-03 3 C2
+2025-02-04 4 D2
+2025-02-05 5 E2
+2025-02-06 6 F2
+
diff --git
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_date_pk.groovy
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_date_pk.groovy
new file mode 100644
index 00000000000..85570960de5
--- /dev/null
+++
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_date_pk.groovy
@@ -0,0 +1,129 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+suite("test_streaming_mysql_job_date_pk",
"p0,external,mysql,external_docker,external_docker_mysql,nondatalake") {
+ def jobName = "test_streaming_mysql_job_date_pk_name"
+ def currentDb = (sql "select database()")[0][0]
+ def tableDate = "events_mysql_date_pk"
+ def tableComposite = "events_mysql_date_id_pk"
+ def mysqlDb = "test_cdc_db"
+
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+ sql """drop table if exists ${currentDb}.${tableDate} force"""
+ sql """drop table if exists ${currentDb}.${tableComposite} force"""
+
+ String enabled = context.config.otherConfigs.get("enableJdbcTest")
+ if (enabled != null && enabled.equalsIgnoreCase("true")) {
+ String mysql_port = context.config.otherConfigs.get("mysql_57_port")
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+ String s3_endpoint = getS3Endpoint()
+ String bucket = getS3BucketName()
+ String driver_url =
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/mysql-connector-j-8.4.0.jar"
+
+ connect("root", "123456",
"jdbc:mysql://${externalEnvIp}:${mysql_port}") {
+ sql """CREATE DATABASE IF NOT EXISTS ${mysqlDb}"""
+ sql """DROP TABLE IF EXISTS ${mysqlDb}.${tableDate}"""
+ sql """CREATE TABLE ${mysqlDb}.${tableDate} (
+ `event_date` date NOT NULL,
+ `payload` varchar(200) DEFAULT NULL,
+ PRIMARY KEY (`event_date`)
+ ) ENGINE=InnoDB"""
+ sql """INSERT INTO ${mysqlDb}.${tableDate} (event_date, payload)
VALUES ('2025-01-01', 'A1')"""
+ sql """INSERT INTO ${mysqlDb}.${tableDate} (event_date, payload)
VALUES ('2025-01-02', 'B1')"""
+ sql """INSERT INTO ${mysqlDb}.${tableDate} (event_date, payload)
VALUES ('2025-01-03', 'C1')"""
+ sql """INSERT INTO ${mysqlDb}.${tableDate} (event_date, payload)
VALUES ('2025-01-04', 'D1')"""
+ sql """INSERT INTO ${mysqlDb}.${tableDate} (event_date, payload)
VALUES ('2025-01-05', 'E1')"""
+
+ sql """DROP TABLE IF EXISTS ${mysqlDb}.${tableComposite}"""
+ sql """CREATE TABLE ${mysqlDb}.${tableComposite} (
+ `event_date` date NOT NULL,
+ `id` int NOT NULL,
+ `payload` varchar(200) DEFAULT NULL,
+ PRIMARY KEY (`event_date`, `id`)
+ ) ENGINE=InnoDB"""
+ sql """INSERT INTO ${mysqlDb}.${tableComposite} (event_date, id,
payload) VALUES ('2025-02-01', 1, 'A2')"""
+ sql """INSERT INTO ${mysqlDb}.${tableComposite} (event_date, id,
payload) VALUES ('2025-02-02', 2, 'B2')"""
+ sql """INSERT INTO ${mysqlDb}.${tableComposite} (event_date, id,
payload) VALUES ('2025-02-03', 3, 'C2')"""
+ sql """INSERT INTO ${mysqlDb}.${tableComposite} (event_date, id,
payload) VALUES ('2025-02-04', 4, 'D2')"""
+ sql """INSERT INTO ${mysqlDb}.${tableComposite} (event_date, id,
payload) VALUES ('2025-02-05', 5, 'E2')"""
+ }
+
+ sql """CREATE JOB ${jobName}
+ ON STREAMING
+ FROM MYSQL (
+ "jdbc_url" =
"jdbc:mysql://${externalEnvIp}:${mysql_port}/${mysqlDb}",
+ "driver_url" = "${driver_url}",
+ "driver_class" = "com.mysql.cj.jdbc.Driver",
+ "user" = "root",
+ "password" = "123456",
+ "database" = "${mysqlDb}",
+ "include_tables" = "${tableDate},${tableComposite}",
+ "offset" = "initial",
+ "snapshot_split_size" = "1",
+ "snapshot_parallelism" = "2"
+ )
+ TO DATABASE ${currentDb} (
+ "table.create.properties.replication_num" = "1"
+ )
+ """
+
+ try {
+ Awaitility.await().atMost(300, SECONDS)
+ .pollInterval(1, SECONDS).until(
+ {
+ def jobSuccendCount = sql """ select SucceedTaskCount
from jobs("type"="insert") where Name = '${jobName}' and
ExecuteType='STREAMING' """
+ log.info("jobSuccendCount: " + jobSuccendCount)
+ jobSuccendCount.size() == 1 && '5' <=
jobSuccendCount.get(0).get(0)
+ }
+ )
+ } catch (Exception ex) {
+ def showjob = sql """select * from jobs("type"="insert") where
Name='${jobName}'"""
+ def showtask = sql """select * from tasks("type"="insert") where
JobName='${jobName}'"""
+ log.info("show job: " + showjob)
+ log.info("show task: " + showtask)
+ throw ex
+ }
+
+ qt_select_snapshot_date_pk """ SELECT * FROM ${tableDate} order by event_date asc """
+ qt_select_snapshot_composite_pk """ SELECT * FROM ${tableComposite} order by event_date asc, id asc """
+
+ connect("root", "123456", "jdbc:mysql://${externalEnvIp}:${mysql_port}") {
+ sql """INSERT INTO ${mysqlDb}.${tableDate} (event_date, payload) VALUES ('2025-01-06', 'F1')"""
+ sql """UPDATE ${mysqlDb}.${tableDate} SET payload = 'B1_upd' WHERE event_date = '2025-01-02'"""
+ sql """DELETE FROM ${mysqlDb}.${tableDate} WHERE event_date = '2025-01-01'"""
+
+ sql """INSERT INTO ${mysqlDb}.${tableComposite} (event_date, id, payload) VALUES ('2025-02-06', 6, 'F2')"""
+ sql """UPDATE ${mysqlDb}.${tableComposite} SET payload = 'B2_upd' WHERE event_date = '2025-02-02' AND id = 2"""
+ sql """DELETE FROM ${mysqlDb}.${tableComposite} WHERE event_date = '2025-02-01' AND id = 1"""
+ }
+
+ sleep(60000)
+
+ qt_select_binlog_date_pk """ SELECT * FROM ${tableDate} order by event_date asc """
+ qt_select_binlog_composite_pk """ SELECT * FROM ${tableComposite} order by event_date asc, id asc """
+
+ def jobInfo = sql """select status from jobs("type"="insert") where Name='${jobName}'"""
+ assert jobInfo.get(0).get(0) == "RUNNING"
+
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+ }
+}
diff --git a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_date_pk.groovy b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_date_pk.groovy
new file mode 100644
index 00000000000..ec43d22f988
--- /dev/null
+++ b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_date_pk.groovy
@@ -0,0 +1,131 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+suite("test_streaming_postgres_job_date_pk", "p0,external,pg,external_docker,external_docker_pg,nondatalake") {
+ def jobName = "test_streaming_postgres_job_date_pk_name"
+ def currentDb = (sql "select database()")[0][0]
+ def tableDate = "events_pg_date_pk"
+ def tableComposite = "events_pg_date_id_pk"
+ def pgDB = "postgres"
+ def pgSchema = "cdc_test"
+ def pgUser = "postgres"
+ def pgPassword = "123456"
+
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+ sql """drop table if exists ${currentDb}.${tableDate} force"""
+ sql """drop table if exists ${currentDb}.${tableComposite} force"""
+
+ String enabled = context.config.otherConfigs.get("enableJdbcTest")
+ if (enabled != null && enabled.equalsIgnoreCase("true")) {
+ String pg_port = context.config.otherConfigs.get("pg_14_port");
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+ String s3_endpoint = getS3Endpoint()
+ String bucket = getS3BucketName()
+ String driver_url = "https://${bucket}.${s3_endpoint}/regression/jdbc_driver/postgresql-42.5.0.jar"
+
+ connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+ sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${tableDate}"""
+ sql """CREATE TABLE ${pgDB}.${pgSchema}.${tableDate} (
+ "event_date" date PRIMARY KEY,
+ "payload" varchar(200)
+ )"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${tableDate} (event_date, payload) VALUES ('2025-01-01', 'A1');"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${tableDate} (event_date, payload) VALUES ('2025-01-02', 'B1');"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${tableDate} (event_date, payload) VALUES ('2025-01-03', 'C1');"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${tableDate} (event_date, payload) VALUES ('2025-01-04', 'D1');"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${tableDate} (event_date, payload) VALUES ('2025-01-05', 'E1');"""
+
+ sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${tableComposite}"""
+ sql """CREATE TABLE ${pgDB}.${pgSchema}.${tableComposite} (
+ "event_date" date NOT NULL,
+ "id" int4 NOT NULL,
+ "payload" varchar(200),
+ PRIMARY KEY ("event_date", "id")
+ )"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${tableComposite} (event_date, id, payload) VALUES ('2025-02-01', 1, 'A2');"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${tableComposite} (event_date, id, payload) VALUES ('2025-02-02', 2, 'B2');"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${tableComposite} (event_date, id, payload) VALUES ('2025-02-03', 3, 'C2');"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${tableComposite} (event_date, id, payload) VALUES ('2025-02-04', 4, 'D2');"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${tableComposite} (event_date, id, payload) VALUES ('2025-02-05', 5, 'E2');"""
+ }
+
+ sql """CREATE JOB ${jobName}
+ ON STREAMING
+ FROM POSTGRES (
+ "jdbc_url" = "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}",
+ "driver_url" = "${driver_url}",
+ "driver_class" = "org.postgresql.Driver",
+ "user" = "${pgUser}",
+ "password" = "${pgPassword}",
+ "database" = "${pgDB}",
+ "schema" = "${pgSchema}",
+ "include_tables" = "${tableDate},${tableComposite}",
+ "offset" = "initial",
+ "snapshot_split_size" = "1",
+ "snapshot_parallelism" = "2"
+ )
+ TO DATABASE ${currentDb} (
+ "table.create.properties.replication_num" = "1"
+ )
+ """
+
+ try {
+ Awaitility.await().atMost(300, SECONDS)
+ .pollInterval(1, SECONDS).until(
+ {
+ def jobSuccendCount = sql """ select SucceedTaskCount from jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING' """
+ log.info("jobSuccendCount: " + jobSuccendCount)
+ jobSuccendCount.size() == 1 && '5' <= jobSuccendCount.get(0).get(0)
+ }
+ )
+ } catch (Exception ex) {
+ def showjob = sql """select * from jobs("type"="insert") where Name='${jobName}'"""
+ def showtask = sql """select * from tasks("type"="insert") where JobName='${jobName}'"""
+ log.info("show job: " + showjob)
+ log.info("show task: " + showtask)
+ throw ex
+ }
+
+ qt_select_snapshot_date_pk """ SELECT * FROM ${tableDate} order by event_date asc """
+ qt_select_snapshot_composite_pk """ SELECT * FROM ${tableComposite} order by event_date asc, id asc """
+
+ connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${tableDate} (event_date, payload) VALUES ('2025-01-06', 'F1');"""
+ sql """UPDATE ${pgDB}.${pgSchema}.${tableDate} SET payload = 'B1_upd' WHERE event_date = '2025-01-02';"""
+ sql """DELETE FROM ${pgDB}.${pgSchema}.${tableDate} WHERE event_date = '2025-01-01';"""
+
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${tableComposite} (event_date, id, payload) VALUES ('2025-02-06', 6, 'F2');"""
+ sql """UPDATE ${pgDB}.${pgSchema}.${tableComposite} SET payload = 'B2_upd' WHERE event_date = '2025-02-02' AND id = 2;"""
+ sql """DELETE FROM ${pgDB}.${pgSchema}.${tableComposite} WHERE event_date = '2025-02-01' AND id = 1;"""
+ }
+
+ sleep(60000)
+
+ qt_select_binlog_date_pk """ SELECT * FROM ${tableDate} order by event_date asc """
+ qt_select_binlog_composite_pk """ SELECT * FROM ${tableComposite} order by event_date asc, id asc """
+
+ def jobInfo = sql """select status from jobs("type"="insert") where Name='${jobName}'"""
+ assert jobInfo.get(0).get(0) == "RUNNING"
+
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+ }
+}