This is an automated email from the ASF dual-hosted git repository.

liaoxin01 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 4e25c9f7070 [fix](streaming-job) restore split-bound Java types when 
reading FE-persisted CDC offset (#63219)
4e25c9f7070 is described below

commit 4e25c9f707096cff0ee8b0b541e584e079e76946
Author: wudi <[email protected]>
AuthorDate: Thu May 14 23:15:37 2026 +0800

    [fix](streaming-job) restore split-bound Java types when reading 
FE-persisted CDC offset (#63219)
    
    ### What problem does this PR solve?
    
    Issue Number: close #xxx
    
    Related PR: #xxx
    
    Problem Summary:
    
    PG streaming jobs with a temporal chunk-key column (e.g. `DATE` PK) fail
    during snapshot read with:
    
    ```
    PSQLException: ERROR: operator does not exist: date <= character varying
      Hint: No operator matches the given name and argument types.
    ```
    
    **Root cause**
    
    cdc_client reports snapshot splits to FE; FE persists the offset as JSON
    via Spring Jackson, where `SqlDateSerializer` writes `java.sql.Date` as
    a `"yyyy-MM-dd"` string. When the offset is later read back,
    `ObjectMapper.convertValue(offset, SnapshotSplit.class)` lands the bound
    values back into `Object[]` as `String` (type info is gone). At read
    time `PostgresQueryUtils.readTableSplitDataStatement` calls bare
    `statement.setObject(idx, splitEnd[i])`; PG JDBC sends `String` as
    VARCHAR oid, and PostgreSQL — strict about operator resolution — refuses
    `date <= varchar` (no implicit cast from varchar to date).
    
    The same FE round-trip happens for MySQL too. The MySQL server is lenient
    enough to implicitly coerce the bound, so no visible error appears, but
    `SplitKeyUtils.compareObjects` falls back to `toString()` comparison
    whenever the restored type doesn't match the Debezium connect-schema
    type. That is a latent issue, so the MySQL reader gets the same fix to
    keep both dialects consistent.
    
    **Fix**
    
    Restore the original Java types at the cdc_client side, where the loss
    happens:
    
    - `AbstractCdcSourceReader` adds:
      - abstract `probeSplitKeyClass(TableId, Column, JobBaseConfig)` —
        dialect-specific lookup
      - `resolveSplitKeyClass(...)` — per-column cached wrapper (1 probe per
        table.column, reused across splits)
      - static `convertBounds(Object[], Class<?>, ObjectMapper)` — restores
        Object[] elements to the target class
    - `PostgresSourceReader` / `MySqlSourceReader` override
    `probeSplitKeyClass`: run `SELECT col FROM table WHERE 1=0` and use
    `ResultSetMetaData.getColumnClassName(1)` so the JDBC driver itself
    decides the Java type. Probe failure throws (no silent fallback — silent
    fallback would let the original bug recur).
    - `JdbcIncrementalSourceReader.createSnapshotSplit` /
    `createStreamSplit` and `MySqlSourceReader.createSnapshotSplit` /
    `createBinlogSplit` (4 sites total) apply `convertBounds` before
    constructing Flink CDC's `SnapshotSplit` / `FinishedSnapshotSplitInfo`.
    - `convertBounds` special-cases `java.sql.Date` / `Timestamp` / `Time`
    via `valueOf` to match the JVM-default-TZ semantics of `rs.getObject`
    (Jackson's default `SqlDateDeserializer` hard-codes GMT, which would
    shift the value in non-UTC TZs). Other types fall through to
    `ObjectMapper.convertValue`.
    
    ### Release note
    
    Fix PostgreSQL streaming snapshot reads failing on temporal chunk-key
    columns after the FE offset JSON round-trip strips the original Java
    type; the same type restoration is applied to MySQL streaming jobs.
---
 .../source/reader/AbstractCdcSourceReader.java     |  45 ++++++
 .../source/reader/JdbcIncrementalSourceReader.java |  39 ++++-
 .../source/reader/mysql/MySqlSourceReader.java     |  58 +++++++-
 .../reader/postgres/PostgresSourceReader.java      |  23 +++
 .../source/reader/AbstractCdcSourceReaderTest.java | 165 +++++++++++++++++++++
 .../cdc/test_streaming_mysql_job_date_pk.out       |  29 ++++
 .../cdc/test_streaming_postgres_job_date_pk.out    |  29 ++++
 .../cdc/test_streaming_mysql_job_date_pk.groovy    | 129 ++++++++++++++++
 .../cdc/test_streaming_postgres_job_date_pk.groovy | 131 ++++++++++++++++
 9 files changed, 635 insertions(+), 13 deletions(-)

diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/AbstractCdcSourceReader.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/AbstractCdcSourceReader.java
index 6ebf75a99aa..f006d791528 100644
--- 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/AbstractCdcSourceReader.java
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/AbstractCdcSourceReader.java
@@ -20,6 +20,7 @@ package org.apache.doris.cdcclient.source.reader;
 import org.apache.doris.cdcclient.source.deserialize.DeserializeResult;
 import org.apache.doris.cdcclient.source.deserialize.SourceRecordDeserializer;
 import org.apache.doris.cdcclient.utils.SchemaChangeManager;
+import org.apache.doris.job.cdc.request.JobBaseConfig;
 import org.apache.doris.job.cdc.request.JobBaseRecordRequest;
 
 import org.apache.flink.cdc.connectors.base.utils.SerializerUtils;
@@ -37,6 +38,7 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
 import io.debezium.document.Document;
 import io.debezium.document.DocumentReader;
 import io.debezium.document.DocumentWriter;
+import io.debezium.relational.Column;
 import io.debezium.relational.TableId;
 import io.debezium.relational.history.TableChanges;
 import lombok.Getter;
@@ -63,6 +65,49 @@ public abstract class AbstractCdcSourceReader implements 
SourceReader {
     protected SourceRecordDeserializer<SourceRecord, DeserializeResult> 
serializer;
     protected Map<TableId, TableChanges.TableChange> tableSchemas;
 
+    private final Map<String, Class<?>> splitKeyClassCache = new 
ConcurrentHashMap<>();
+
+    protected abstract Class<?> probeSplitKeyClass(
+            TableId tableId, Column splitColumn, JobBaseConfig jobConfig);
+
+    protected Class<?> resolveSplitKeyClass(
+            TableId tableId, Column splitColumn, JobBaseConfig jobConfig) {
+        String key = tableId.identifier() + "." + splitColumn.name();
+        return splitKeyClassCache.computeIfAbsent(
+                key, k -> probeSplitKeyClass(tableId, splitColumn, jobConfig));
+    }
+
+    protected static Object[] convertBounds(Object[] raw, Class<?> target, 
ObjectMapper mapper) {
+        if (raw == null) {
+            return null;
+        }
+        Object[] out = new Object[raw.length];
+        for (int i = 0; i < raw.length; i++) {
+            out[i] = convertBound(raw[i], target, mapper);
+        }
+        return out;
+    }
+
+    private static Object convertBound(Object v, Class<?> target, ObjectMapper 
mapper) {
+        if (v == null) {
+            return null;
+        }
+        if (target.isInstance(v)) {
+            return v;
+        }
+        String s = v.toString();
+        if (target == java.sql.Date.class) {
+            return java.sql.Date.valueOf(s);
+        }
+        if (target == java.sql.Timestamp.class) {
+            return java.sql.Timestamp.valueOf(s);
+        }
+        if (target == java.sql.Time.class) {
+            return java.sql.Time.valueOf(s);
+        }
+        return mapper.convertValue(v, target);
+    }
+
     /**
      * Load tableSchemas from a JSON string (produced by {@link 
#serializeTableSchemas()}). Used
      * when a binlog/stream split is resumed from FE-persisted state.
diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java
index 10cb98e448a..ae64b695e95 100644
--- 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java
@@ -614,8 +614,6 @@ public abstract class JdbcIncrementalSourceReader extends 
AbstractCdcSourceReade
             createSnapshotSplit(Map<String, Object> offset, JobBaseConfig 
jobConfig) {
         SnapshotSplit snapshotSplit = objectMapper.convertValue(offset, 
SnapshotSplit.class);
         TableId tableId = TableId.parse(snapshotSplit.getTableId(), false);
-        Object[] splitStart = snapshotSplit.getSplitStart();
-        Object[] splitEnd = snapshotSplit.getSplitEnd();
         List<String> splitKeys = snapshotSplit.getSplitKey();
         Map<TableId, TableChanges.TableChange> tableSchemas = 
getTableSchemas(jobConfig);
         TableChanges.TableChange tableChange = tableSchemas.get(tableId);
@@ -623,7 +621,18 @@ public abstract class JdbcIncrementalSourceReader extends 
AbstractCdcSourceReade
                 tableChange, "Can not find table " + tableId + " in job " + 
jobConfig.getJobId());
         // only support one split key
         String splitKey = splitKeys.get(0);
-        io.debezium.relational.Column splitColumn = 
tableChange.getTable().columnWithName(splitKey);
+        Column splitColumn = tableChange.getTable().columnWithName(splitKey);
+        Preconditions.checkNotNull(
+                splitColumn,
+                "Split key column "
+                        + splitKey
+                        + " not found in table "
+                        + tableId
+                        + " for job "
+                        + jobConfig.getJobId());
+        Class<?> keyClass = resolveSplitKeyClass(tableId, splitColumn, 
jobConfig);
+        Object[] splitStart = convertBounds(snapshotSplit.getSplitStart(), 
keyClass, objectMapper);
+        Object[] splitEnd = convertBounds(snapshotSplit.getSplitEnd(), 
keyClass, objectMapper);
         RowType splitType = getSplitType(splitColumn);
         org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit 
split =
                 new 
org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit(
@@ -660,6 +669,7 @@ public abstract class JdbcIncrementalSourceReader extends 
AbstractCdcSourceReade
                             
.sorted(Comparator.comparing(AbstractSourceSplit::getSplitId))
                             .toList();
 
+            Map<TableId, TableChanges.TableChange> tableSchemas = 
getTableSchemas(config);
             for (SnapshotSplit split : assignedSplitLists) {
                 // find the min offset
                 Map<String, String> offsetMap = split.getHighWatermark();
@@ -670,12 +680,29 @@ public abstract class JdbcIncrementalSourceReader extends 
AbstractCdcSourceReade
                 if (maxOffsetFinishSplits == null || 
sourceOffset.isAfter(maxOffsetFinishSplits)) {
                     maxOffsetFinishSplits = sourceOffset;
                 }
+                TableId tid = TableId.parse(split.getTableId());
+                TableChanges.TableChange tableChange = tableSchemas.get(tid);
+                Preconditions.checkNotNull(
+                        tableChange, "Can not find table " + tid + " in job " 
+ config.getJobId());
+                String splitKey = split.getSplitKey().get(0);
+                Column splitColumn = 
tableChange.getTable().columnWithName(splitKey);
+                Preconditions.checkNotNull(
+                        splitColumn,
+                        "Split key column "
+                                + splitKey
+                                + " not found in table "
+                                + tid
+                                + " for job "
+                                + config.getJobId());
+                Class<?> keyClass = resolveSplitKeyClass(tid, splitColumn, 
config);
+                Object[] start = convertBounds(split.getSplitStart(), 
keyClass, objectMapper);
+                Object[] end = convertBounds(split.getSplitEnd(), keyClass, 
objectMapper);
                 finishedSnapshotSplitInfos.add(
                         new FinishedSnapshotSplitInfo(
-                                TableId.parse(split.getTableId()),
+                                tid,
                                 split.getSplitId(),
-                                split.getSplitStart(),
-                                split.getSplitEnd(),
+                                start,
+                                end,
                                 sourceOffset,
                                 getOffsetFactory()));
             }
diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
index bf8ac56312b..84340f5e137 100644
--- 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
@@ -61,6 +61,7 @@ import 
org.apache.flink.cdc.connectors.mysql.source.split.MySqlSplitState;
 import org.apache.flink.cdc.connectors.mysql.source.split.SourceRecords;
 import org.apache.flink.cdc.connectors.mysql.source.utils.ChunkUtils;
 import org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils;
+import org.apache.flink.cdc.connectors.mysql.source.utils.StatementUtils;
 import org.apache.flink.cdc.connectors.mysql.source.utils.TableDiscoveryUtils;
 import org.apache.flink.cdc.connectors.mysql.table.StartupMode;
 import org.apache.flink.cdc.connectors.mysql.table.StartupOptions;
@@ -69,7 +70,9 @@ import org.apache.flink.table.types.logical.RowType;
 import org.apache.kafka.connect.source.SourceRecord;
 
 import java.io.IOException;
+import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.sql.Statement;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -590,8 +593,6 @@ public class MySqlSourceReader extends 
AbstractCdcSourceReader {
             Map<String, Object> offset, JobBaseConfig jobConfig) throws 
JsonProcessingException {
         SnapshotSplit snapshotSplit = objectMapper.convertValue(offset, 
SnapshotSplit.class);
         TableId tableId = TableId.parse(snapshotSplit.getTableId());
-        Object[] splitStart = snapshotSplit.getSplitStart();
-        Object[] splitEnd = snapshotSplit.getSplitEnd();
         List<String> splitKeys = snapshotSplit.getSplitKey();
         Map<TableId, TableChanges.TableChange> tableSchemas = 
getTableSchemas(jobConfig);
         TableChanges.TableChange tableChange = tableSchemas.get(tableId);
@@ -600,6 +601,17 @@ public class MySqlSourceReader extends 
AbstractCdcSourceReader {
         // only support one split key
         String splitKey = splitKeys.get(0);
         Column splitColumn = tableChange.getTable().columnWithName(splitKey);
+        Preconditions.checkNotNull(
+                splitColumn,
+                "Split key column "
+                        + splitKey
+                        + " not found in table "
+                        + tableId
+                        + " for job "
+                        + jobConfig.getJobId());
+        Class<?> keyClass = resolveSplitKeyClass(tableId, splitColumn, 
jobConfig);
+        Object[] splitStart = convertBounds(snapshotSplit.getSplitStart(), 
keyClass, objectMapper);
+        Object[] splitEnd = convertBounds(snapshotSplit.getSplitEnd(), 
keyClass, objectMapper);
         RowType splitType = ChunkUtils.getChunkKeyColumnType(splitColumn, 
false);
         MySqlSnapshotSplit split =
                 new MySqlSnapshotSplit(
@@ -631,6 +643,7 @@ public class MySqlSourceReader extends 
AbstractCdcSourceReader {
                             
.sorted(Comparator.comparing(AbstractSourceSplit::getSplitId))
                             .toList();
 
+            Map<TableId, TableChanges.TableChange> tableSchemas = 
getTableSchemas(config);
             for (SnapshotSplit split : assignedSplitLists) {
                 // find the min binlog offset
                 Map<String, String> offsetMap = split.getHighWatermark();
@@ -641,13 +654,26 @@ public class MySqlSourceReader extends 
AbstractCdcSourceReader {
                 if (maxOffsetFinishSplits == null || 
binlogOffset.isAfter(maxOffsetFinishSplits)) {
                     maxOffsetFinishSplits = binlogOffset;
                 }
+                TableId tid = TableId.parse(split.getTableId());
+                TableChanges.TableChange tableChange = tableSchemas.get(tid);
+                Preconditions.checkNotNull(
+                        tableChange, "Can not find table " + tid + " in job " 
+ config.getJobId());
+                String splitKey = split.getSplitKey().get(0);
+                Column splitColumn = 
tableChange.getTable().columnWithName(splitKey);
+                Preconditions.checkNotNull(
+                        splitColumn,
+                        "Split key column "
+                                + splitKey
+                                + " not found in table "
+                                + tid
+                                + " for job "
+                                + config.getJobId());
+                Class<?> keyClass = resolveSplitKeyClass(tid, splitColumn, 
config);
+                Object[] start = convertBounds(split.getSplitStart(), 
keyClass, objectMapper);
+                Object[] end = convertBounds(split.getSplitEnd(), keyClass, 
objectMapper);
                 finishedSnapshotSplitInfos.add(
                         new FinishedSnapshotSplitInfo(
-                                TableId.parse(split.getTableId()),
-                                split.getSplitId(),
-                                split.getSplitStart(),
-                                split.getSplitEnd(),
-                                binlogOffset));
+                                tid, split.getSplitId(), start, end, 
binlogOffset));
             }
         }
 
@@ -1050,6 +1076,24 @@ public class MySqlSourceReader extends 
AbstractCdcSourceReader {
         return schemas;
     }
 
+    @Override
+    protected Class<?> probeSplitKeyClass(
+            TableId tableId, Column splitColumn, JobBaseConfig jobConfig) {
+        MySqlSourceConfig sourceConfig = getSourceConfig(jobConfig);
+        String sql =
+                String.format(
+                        "SELECT %s FROM %s WHERE 1=0",
+                        StatementUtils.quote(splitColumn.name()), 
StatementUtils.quote(tableId));
+        try (MySqlConnection jdbc = 
DebeziumUtils.createMySqlConnection(sourceConfig);
+                Statement st = jdbc.connection().createStatement();
+                ResultSet rs = st.executeQuery(sql)) {
+            return Class.forName(rs.getMetaData().getColumnClassName(1));
+        } catch (Exception e) {
+            throw new RuntimeException(
+                    "Probe split key class failed for " + tableId + "." + 
splitColumn.name(), e);
+        }
+    }
+
     private Map<TableId, TableChanges.TableChange> 
discoverTableSchemas(JobBaseConfig config) {
         MySqlSourceConfig sourceConfig = getSourceConfig(config);
         try (MySqlConnection jdbc = 
DebeziumUtils.createMySqlConnection(sourceConfig)) {
diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
index 6638787ea48..8bf53a1eb97 100644
--- 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
@@ -49,10 +49,13 @@ import 
org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresStreamFetch
 import org.apache.flink.cdc.connectors.postgres.source.offset.PostgresOffset;
 import 
org.apache.flink.cdc.connectors.postgres.source.offset.PostgresOffsetFactory;
 import 
org.apache.flink.cdc.connectors.postgres.source.utils.CustomPostgresSchema;
+import 
org.apache.flink.cdc.connectors.postgres.source.utils.PostgresQueryUtils;
 import org.apache.flink.cdc.connectors.postgres.source.utils.PostgresTypeUtils;
 import 
org.apache.flink.cdc.connectors.postgres.source.utils.TableDiscoveryUtils;
 import org.apache.flink.table.types.DataType;
 
+import java.sql.ResultSet;
+import java.sql.Statement;
 import java.time.Duration;
 import java.time.Instant;
 import java.util.Collections;
@@ -366,6 +369,26 @@ public class PostgresSourceReader extends 
JdbcIncrementalSourceReader {
         return PostgresTypeUtils.fromDbzColumn(splitColumn);
     }
 
+    @Override
+    protected Class<?> probeSplitKeyClass(
+            TableId tableId, Column splitColumn, JobBaseConfig jobConfig) {
+        PostgresSourceConfig sourceConfig = getSourceConfig(jobConfig);
+        String sql =
+                String.format(
+                        "SELECT %s FROM %s WHERE 1=0",
+                        PostgresQueryUtils.quote(splitColumn.name()),
+                        PostgresQueryUtils.quote(tableId));
+        try (JdbcConnection jdbc =
+                        new 
PostgresDialect(sourceConfig).openJdbcConnection(sourceConfig);
+                Statement st = jdbc.connection().createStatement();
+                ResultSet rs = st.executeQuery(sql)) {
+            return Class.forName(rs.getMetaData().getColumnClassName(1));
+        } catch (Exception e) {
+            throw new RuntimeException(
+                    "Probe split key class failed for " + tableId + "." + 
splitColumn.name(), e);
+        }
+    }
+
     /**
      * Why not call dialect.displayCurrentOffset(sourceConfig) ? The 
underlying system calls
      * `txid_current()` to advance the WAL log. Here, it's just a query; 
retrieving the LSN is
diff --git 
a/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/source/reader/AbstractCdcSourceReaderTest.java
 
b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/source/reader/AbstractCdcSourceReaderTest.java
new file mode 100644
index 00000000000..fbca1ad41e6
--- /dev/null
+++ 
b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/source/reader/AbstractCdcSourceReaderTest.java
@@ -0,0 +1,165 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cdcclient.source.reader;
+
+import org.apache.doris.job.cdc.split.SnapshotSplit;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.junit.jupiter.api.Test;
+
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+class AbstractCdcSourceReaderTest {
+
+    private static final ObjectMapper MAPPER = new ObjectMapper();
+
+    @Test
+    void convertBoundsRestoresDateFromString() {
+        Object[] raw = new Object[] {"2025-01-06"};
+        Object[] out = AbstractCdcSourceReader.convertBounds(raw, Date.class, 
MAPPER);
+        assertTrue(out[0] instanceof Date);
+        assertEquals(Date.valueOf("2025-01-06"), out[0]);
+    }
+
+    @Test
+    void convertBoundsRestoresTimestampFromString() {
+        Object[] raw = new Object[] {"2025-01-06 12:34:56"};
+        Object[] out = AbstractCdcSourceReader.convertBounds(raw, 
Timestamp.class, MAPPER);
+        assertTrue(out[0] instanceof Timestamp);
+        assertEquals(Timestamp.valueOf("2025-01-06 12:34:56"), out[0]);
+    }
+
+    @Test
+    void convertBoundsRestoresLongFromJsonInteger() {
+        // Jackson typically deserializes JSON integer to Integer; restore as 
Long for int8 cols.
+        Object[] raw = new Object[] {123456};
+        Object[] out = AbstractCdcSourceReader.convertBounds(raw, Long.class, 
MAPPER);
+        assertTrue(out[0] instanceof Long);
+        assertEquals(123456L, out[0]);
+    }
+
+    @Test
+    void convertBoundsRestoresBigDecimalFromString() {
+        Object[] raw = new Object[] {"12.34"};
+        Object[] out = AbstractCdcSourceReader.convertBounds(raw, 
BigDecimal.class, MAPPER);
+        assertTrue(out[0] instanceof BigDecimal);
+        assertEquals(new BigDecimal("12.34"), out[0]);
+    }
+
+    @Test
+    void convertBoundsPreservesStringForVarcharColumns() {
+        Object[] raw = new Object[] {"event_id_42"};
+        Object[] out = AbstractCdcSourceReader.convertBounds(raw, 
String.class, MAPPER);
+        assertEquals("event_id_42", out[0]);
+    }
+
+    @Test
+    void convertBoundsReturnsNullForNullInput() {
+        assertNull(AbstractCdcSourceReader.convertBounds(null, Date.class, 
MAPPER));
+    }
+
+    @Test
+    void convertBoundsKeepsNullElement() {
+        Object[] raw = new Object[] {null};
+        Object[] out = AbstractCdcSourceReader.convertBounds(raw, Date.class, 
MAPPER);
+        assertEquals(1, out.length);
+        assertNull(out[0]);
+    }
+
+    @Test
+    void convertBoundsHandlesMultiElementArray() {
+        Object[] raw = new Object[] {"2025-01-06", null};
+        Object[] out = AbstractCdcSourceReader.convertBounds(raw, Date.class, 
MAPPER);
+        assertEquals(2, out.length);
+        assertEquals(Date.valueOf("2025-01-06"), out[0]);
+        assertNull(out[1]);
+    }
+
+    @Test
+    void restoresDateChunkKeyAfterFeRoundTrip() {
+        Map<String, Object> feOffset = new HashMap<>();
+        feOffset.put("splitId", "public.events:0");
+        feOffset.put("tableId", "public.events");
+        feOffset.put("splitKey", Arrays.asList("event_date"));
+        feOffset.put("splitStart", null);
+        feOffset.put("splitEnd", Arrays.asList("2025-01-06"));
+
+        SnapshotSplit deserialized = MAPPER.convertValue(feOffset, 
SnapshotSplit.class);
+        assertEquals(String.class, deserialized.getSplitEnd()[0].getClass());
+
+        Object[] restored =
+                AbstractCdcSourceReader.convertBounds(
+                        deserialized.getSplitEnd(), Date.class, MAPPER);
+        assertEquals(Date.class, restored[0].getClass());
+        assertEquals(Date.valueOf("2025-01-06"), restored[0]);
+    }
+
+    @Test
+    void restoresTimestampChunkKeyAfterFeRoundTrip() {
+        Map<String, Object> feOffset = new HashMap<>();
+        feOffset.put("splitId", "public.orders:7");
+        feOffset.put("tableId", "public.orders");
+        feOffset.put("splitKey", Arrays.asList("created_at"));
+        feOffset.put("splitStart", Arrays.asList("2025-01-06 00:00:00"));
+        feOffset.put("splitEnd", Arrays.asList("2025-01-07 00:00:00"));
+
+        SnapshotSplit deserialized = MAPPER.convertValue(feOffset, 
SnapshotSplit.class);
+
+        Object[] restoredStart =
+                AbstractCdcSourceReader.convertBounds(
+                        deserialized.getSplitStart(), Timestamp.class, MAPPER);
+        Object[] restoredEnd =
+                AbstractCdcSourceReader.convertBounds(
+                        deserialized.getSplitEnd(), Timestamp.class, MAPPER);
+        assertEquals(Timestamp.class, restoredStart[0].getClass());
+        assertEquals(Timestamp.valueOf("2025-01-06 00:00:00"), 
restoredStart[0]);
+        assertEquals(Timestamp.valueOf("2025-01-07 00:00:00"), restoredEnd[0]);
+    }
+
+    @Test
+    void restoresBigintChunkKeyAfterFeRoundTrip() {
+        Map<String, Object> feOffset = new HashMap<>();
+        feOffset.put("splitId", "public.orders:0");
+        feOffset.put("tableId", "public.orders");
+        feOffset.put("splitKey", Arrays.asList("id"));
+        feOffset.put("splitStart", Arrays.asList(100));
+        feOffset.put("splitEnd", Arrays.asList(200));
+
+        SnapshotSplit deserialized = MAPPER.convertValue(feOffset, 
SnapshotSplit.class);
+
+        Object[] restoredStart =
+                AbstractCdcSourceReader.convertBounds(
+                        deserialized.getSplitStart(), Long.class, MAPPER);
+        Object[] restoredEnd =
+                AbstractCdcSourceReader.convertBounds(
+                        deserialized.getSplitEnd(), Long.class, MAPPER);
+        assertEquals(Long.class, restoredStart[0].getClass());
+        assertEquals(100L, restoredStart[0]);
+        assertEquals(200L, restoredEnd[0]);
+    }
+}
diff --git 
a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_date_pk.out
 
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_date_pk.out
new file mode 100644
index 00000000000..d6105d7a899
--- /dev/null
+++ 
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_date_pk.out
@@ -0,0 +1,29 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !select_snapshot_date_pk --
+2025-01-01     A1
+2025-01-02     B1
+2025-01-03     C1
+2025-01-04     D1
+2025-01-05     E1
+
+-- !select_snapshot_composite_pk --
+2025-02-01     1       A2
+2025-02-02     2       B2
+2025-02-03     3       C2
+2025-02-04     4       D2
+2025-02-05     5       E2
+
+-- !select_binlog_date_pk --
+2025-01-02     B1_upd
+2025-01-03     C1
+2025-01-04     D1
+2025-01-05     E1
+2025-01-06     F1
+
+-- !select_binlog_composite_pk --
+2025-02-02     2       B2_upd
+2025-02-03     3       C2
+2025-02-04     4       D2
+2025-02-05     5       E2
+2025-02-06     6       F2
+
diff --git 
a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_date_pk.out
 
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_date_pk.out
new file mode 100644
index 00000000000..d6105d7a899
--- /dev/null
+++ 
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_date_pk.out
@@ -0,0 +1,29 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !select_snapshot_date_pk --
+2025-01-01     A1
+2025-01-02     B1
+2025-01-03     C1
+2025-01-04     D1
+2025-01-05     E1
+
+-- !select_snapshot_composite_pk --
+2025-02-01     1       A2
+2025-02-02     2       B2
+2025-02-03     3       C2
+2025-02-04     4       D2
+2025-02-05     5       E2
+
+-- !select_binlog_date_pk --
+2025-01-02     B1_upd
+2025-01-03     C1
+2025-01-04     D1
+2025-01-05     E1
+2025-01-06     F1
+
+-- !select_binlog_composite_pk --
+2025-02-02     2       B2_upd
+2025-02-03     3       C2
+2025-02-04     4       D2
+2025-02-05     5       E2
+2025-02-06     6       F2
+
diff --git 
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_date_pk.groovy
 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_date_pk.groovy
new file mode 100644
index 00000000000..85570960de5
--- /dev/null
+++ 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_date_pk.groovy
@@ -0,0 +1,142 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+// Regression test for a MySQL streaming CDC job whose source tables use a
+// DATE column as (part of) the primary key. It checks the snapshot result,
+// then applies insert/update/delete upstream and checks the result again.
+suite("test_streaming_mysql_job_date_pk", 
"p0,external,mysql,external_docker,external_docker_mysql,nondatalake") {
+    def jobName = "test_streaming_mysql_job_date_pk_name"
+    def currentDb = (sql "select database()")[0][0]
+    def tableDate = "events_mysql_date_pk"
+    def tableComposite = "events_mysql_date_id_pk"
+    def mysqlDb = "test_cdc_db"
+
+    sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+    sql """drop table if exists ${currentDb}.${tableDate} force"""
+    sql """drop table if exists ${currentDb}.${tableComposite} force"""
+
+    // Everything below runs only when enableJdbcTest is set to "true".
+    String enabled = context.config.otherConfigs.get("enableJdbcTest")
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        String mysql_port = context.config.otherConfigs.get("mysql_57_port")
+        String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+        String s3_endpoint = getS3Endpoint()
+        String bucket = getS3BucketName()
+        String driver_url = 
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/mysql-connector-j-8.4.0.jar";
+
+        connect("root", "123456", 
"jdbc:mysql://${externalEnvIp}:${mysql_port}") {
+            sql """CREATE DATABASE IF NOT EXISTS ${mysqlDb}"""
+            sql """DROP TABLE IF EXISTS ${mysqlDb}.${tableDate}"""
+            sql """CREATE TABLE ${mysqlDb}.${tableDate} (
+                  `event_date` date NOT NULL,
+                  `payload` varchar(200) DEFAULT NULL,
+                  PRIMARY KEY (`event_date`)
+                ) ENGINE=InnoDB"""
+            sql """INSERT INTO ${mysqlDb}.${tableDate} (event_date, payload) 
VALUES ('2025-01-01', 'A1')"""
+            sql """INSERT INTO ${mysqlDb}.${tableDate} (event_date, payload) 
VALUES ('2025-01-02', 'B1')"""
+            sql """INSERT INTO ${mysqlDb}.${tableDate} (event_date, payload) 
VALUES ('2025-01-03', 'C1')"""
+            sql """INSERT INTO ${mysqlDb}.${tableDate} (event_date, payload) 
VALUES ('2025-01-04', 'D1')"""
+            sql """INSERT INTO ${mysqlDb}.${tableDate} (event_date, payload) 
VALUES ('2025-01-05', 'E1')"""
+
+            sql """DROP TABLE IF EXISTS ${mysqlDb}.${tableComposite}"""
+            sql """CREATE TABLE ${mysqlDb}.${tableComposite} (
+                  `event_date` date NOT NULL,
+                  `id` int NOT NULL,
+                  `payload` varchar(200) DEFAULT NULL,
+                  PRIMARY KEY (`event_date`, `id`)
+                ) ENGINE=InnoDB"""
+            sql """INSERT INTO ${mysqlDb}.${tableComposite} (event_date, id, 
payload) VALUES ('2025-02-01', 1, 'A2')"""
+            sql """INSERT INTO ${mysqlDb}.${tableComposite} (event_date, id, 
payload) VALUES ('2025-02-02', 2, 'B2')"""
+            sql """INSERT INTO ${mysqlDb}.${tableComposite} (event_date, id, 
payload) VALUES ('2025-02-03', 3, 'C2')"""
+            sql """INSERT INTO ${mysqlDb}.${tableComposite} (event_date, id, 
payload) VALUES ('2025-02-04', 4, 'D2')"""
+            sql """INSERT INTO ${mysqlDb}.${tableComposite} (event_date, id, 
payload) VALUES ('2025-02-05', 5, 'E2')"""
+        }
+
+        // NOTE(review): snapshot_split_size = 1 presumably yields one split
+        // per row so DATE-typed split bounds are exercised; confirm.
+        sql """CREATE JOB ${jobName}
+                ON STREAMING
+                FROM MYSQL (
+                    "jdbc_url" = 
"jdbc:mysql://${externalEnvIp}:${mysql_port}/${mysqlDb}",
+                    "driver_url" = "${driver_url}",
+                    "driver_class" = "com.mysql.cj.jdbc.Driver",
+                    "user" = "root",
+                    "password" = "123456",
+                    "database" = "${mysqlDb}",
+                    "include_tables" = "${tableDate},${tableComposite}",
+                    "offset" = "initial",
+                    "snapshot_split_size" = "1",
+                    "snapshot_parallelism" = "2"
+                )
+                TO DATABASE ${currentDb} (
+                  "table.create.properties.replication_num" = "1"
+                )
+            """
+
+        // Wait up to 300s for the job to report at least 5 succeeded tasks.
+        try {
+            Awaitility.await().atMost(300, SECONDS)
+                    .pollInterval(1, SECONDS).until(
+                    {
+                        def jobSuccendCount = sql """ select SucceedTaskCount 
from jobs("type"="insert") where Name = '${jobName}' and 
ExecuteType='STREAMING' """
+                        log.info("jobSuccendCount: " + jobSuccendCount)
+                        // Compare as integers: with strings, '5' <= '10' is
+                        // false, so a two-digit count would never satisfy
+                        // the condition.
+                        jobSuccendCount.size() == 1 &&
+                                (jobSuccendCount.get(0).get(0) as String).toInteger() >= 5
+                    }
+            )
+        } catch (Exception ex) {
+            // Log job and task state for diagnosis, then rethrow.
+            def showjob = sql """select * from jobs("type"="insert") where 
Name='${jobName}'"""
+            def showtask = sql """select * from tasks("type"="insert") where 
JobName='${jobName}'"""
+            log.info("show job: " + showjob)
+            log.info("show task: " + showtask)
+            throw ex
+        }
+
+        qt_select_snapshot_date_pk """ SELECT * FROM ${tableDate} order by 
event_date asc """
+        qt_select_snapshot_composite_pk """ SELECT * FROM ${tableComposite} 
order by event_date asc, id asc """
+
+        connect("root", "123456", 
"jdbc:mysql://${externalEnvIp}:${mysql_port}") {
+            sql """INSERT INTO ${mysqlDb}.${tableDate} (event_date, payload) 
VALUES ('2025-01-06', 'F1')"""
+            sql """UPDATE ${mysqlDb}.${tableDate} SET payload = 'B1_upd' WHERE 
event_date = '2025-01-02'"""
+            sql """DELETE FROM ${mysqlDb}.${tableDate} WHERE event_date = 
'2025-01-01'"""
+
+            sql """INSERT INTO ${mysqlDb}.${tableComposite} (event_date, id, 
payload) VALUES ('2025-02-06', 6, 'F2')"""
+            sql """UPDATE ${mysqlDb}.${tableComposite} SET payload = 'B2_upd' 
WHERE event_date = '2025-02-02' AND id = 2"""
+            sql """DELETE FROM ${mysqlDb}.${tableComposite} WHERE event_date = 
'2025-02-01' AND id = 1"""
+        }
+
+        // Fixed 60s pause, presumably to let the job apply the changes above.
+        sleep(60000)
+
+        qt_select_binlog_date_pk """ SELECT * FROM ${tableDate} order by 
event_date asc """
+        qt_select_binlog_composite_pk """ SELECT * FROM ${tableComposite} 
order by event_date asc, id asc """
+
+        def jobInfo = sql """select status from jobs("type"="insert") where 
Name='${jobName}'"""
+        assert jobInfo.get(0).get(0) == "RUNNING"
+
+        sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+    }
+}
diff --git 
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_date_pk.groovy
 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_date_pk.groovy
new file mode 100644
index 00000000000..ec43d22f988
--- /dev/null
+++ 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_date_pk.groovy
@@ -0,0 +1,144 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+// Regression test for a PostgreSQL streaming CDC job whose source tables use
+// a DATE column as (part of) the primary key. It checks the snapshot result,
+// then applies insert/update/delete upstream and checks the result again.
+suite("test_streaming_postgres_job_date_pk", 
"p0,external,pg,external_docker,external_docker_pg,nondatalake") {
+    def jobName = "test_streaming_postgres_job_date_pk_name"
+    def currentDb = (sql "select database()")[0][0]
+    def tableDate = "events_pg_date_pk"
+    def tableComposite = "events_pg_date_id_pk"
+    def pgDB = "postgres"
+    def pgSchema = "cdc_test"
+    def pgUser = "postgres"
+    def pgPassword = "123456"
+
+    sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+    sql """drop table if exists ${currentDb}.${tableDate} force"""
+    sql """drop table if exists ${currentDb}.${tableComposite} force"""
+
+    // Everything below runs only when enableJdbcTest is set to "true".
+    String enabled = context.config.otherConfigs.get("enableJdbcTest")
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        String pg_port = context.config.otherConfigs.get("pg_14_port");
+        String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+        String s3_endpoint = getS3Endpoint()
+        String bucket = getS3BucketName()
+        String driver_url = 
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/postgresql-42.5.0.jar";
+
+        connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+            sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${tableDate}"""
+            sql """CREATE TABLE ${pgDB}.${pgSchema}.${tableDate} (
+                  "event_date" date PRIMARY KEY,
+                  "payload" varchar(200)
+                )"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${tableDate} (event_date, 
payload) VALUES ('2025-01-01', 'A1');"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${tableDate} (event_date, 
payload) VALUES ('2025-01-02', 'B1');"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${tableDate} (event_date, 
payload) VALUES ('2025-01-03', 'C1');"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${tableDate} (event_date, 
payload) VALUES ('2025-01-04', 'D1');"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${tableDate} (event_date, 
payload) VALUES ('2025-01-05', 'E1');"""
+
+            sql """DROP TABLE IF EXISTS 
${pgDB}.${pgSchema}.${tableComposite}"""
+            sql """CREATE TABLE ${pgDB}.${pgSchema}.${tableComposite} (
+                  "event_date" date NOT NULL,
+                  "id" int4 NOT NULL,
+                  "payload" varchar(200),
+                  PRIMARY KEY ("event_date", "id")
+                )"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${tableComposite} 
(event_date, id, payload) VALUES ('2025-02-01', 1, 'A2');"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${tableComposite} 
(event_date, id, payload) VALUES ('2025-02-02', 2, 'B2');"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${tableComposite} 
(event_date, id, payload) VALUES ('2025-02-03', 3, 'C2');"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${tableComposite} 
(event_date, id, payload) VALUES ('2025-02-04', 4, 'D2');"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${tableComposite} 
(event_date, id, payload) VALUES ('2025-02-05', 5, 'E2');"""
+        }
+
+        // NOTE(review): snapshot_split_size = 1 presumably yields one split
+        // per row so DATE-typed split bounds are exercised; confirm.
+        sql """CREATE JOB ${jobName}
+                ON STREAMING
+                FROM POSTGRES (
+                    "jdbc_url" = 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}",
+                    "driver_url" = "${driver_url}",
+                    "driver_class" = "org.postgresql.Driver",
+                    "user" = "${pgUser}",
+                    "password" = "${pgPassword}",
+                    "database" = "${pgDB}",
+                    "schema" = "${pgSchema}",
+                    "include_tables" = "${tableDate},${tableComposite}",
+                    "offset" = "initial",
+                    "snapshot_split_size" = "1",
+                    "snapshot_parallelism" = "2"
+                )
+                TO DATABASE ${currentDb} (
+                  "table.create.properties.replication_num" = "1"
+                )
+            """
+
+        // Wait up to 300s for the job to report at least 5 succeeded tasks.
+        try {
+            Awaitility.await().atMost(300, SECONDS)
+                    .pollInterval(1, SECONDS).until(
+                    {
+                        def jobSuccendCount = sql """ select SucceedTaskCount 
from jobs("type"="insert") where Name = '${jobName}' and 
ExecuteType='STREAMING' """
+                        log.info("jobSuccendCount: " + jobSuccendCount)
+                        // Compare as integers: with strings, '5' <= '10' is
+                        // false, so a two-digit count would never satisfy
+                        // the condition.
+                        jobSuccendCount.size() == 1 &&
+                                (jobSuccendCount.get(0).get(0) as String).toInteger() >= 5
+                    }
+            )
+        } catch (Exception ex) {
+            // Log job and task state for diagnosis, then rethrow.
+            def showjob = sql """select * from jobs("type"="insert") where 
Name='${jobName}'"""
+            def showtask = sql """select * from tasks("type"="insert") where 
JobName='${jobName}'"""
+            log.info("show job: " + showjob)
+            log.info("show task: " + showtask)
+            throw ex
+        }
+
+        qt_select_snapshot_date_pk """ SELECT * FROM ${tableDate} order by 
event_date asc """
+        qt_select_snapshot_composite_pk """ SELECT * FROM ${tableComposite} 
order by event_date asc, id asc """
+
+        connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${tableDate} (event_date, 
payload) VALUES ('2025-01-06', 'F1');"""
+            sql """UPDATE ${pgDB}.${pgSchema}.${tableDate} SET payload = 
'B1_upd' WHERE event_date = '2025-01-02';"""
+            sql """DELETE FROM ${pgDB}.${pgSchema}.${tableDate} WHERE 
event_date = '2025-01-01';"""
+
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${tableComposite} 
(event_date, id, payload) VALUES ('2025-02-06', 6, 'F2');"""
+            sql """UPDATE ${pgDB}.${pgSchema}.${tableComposite} SET payload = 
'B2_upd' WHERE event_date = '2025-02-02' AND id = 2;"""
+            sql """DELETE FROM ${pgDB}.${pgSchema}.${tableComposite} WHERE 
event_date = '2025-02-01' AND id = 1;"""
+        }
+
+        // Fixed 60s pause, presumably to let the job apply the changes above.
+        sleep(60000)
+
+        qt_select_binlog_date_pk """ SELECT * FROM ${tableDate} order by 
event_date asc """
+        qt_select_binlog_composite_pk """ SELECT * FROM ${tableComposite} 
order by event_date asc, id asc """
+
+        def jobInfo = sql """select status from jobs("type"="insert") where 
Name='${jobName}'"""
+        assert jobInfo.get(0).get(0) == "RUNNING"
+
+        sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to