This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new c210258f5ca [Improve](StreamingJob) support only snapshot sync for 
mysql and pg (#61389)
c210258f5ca is described below

commit c210258f5ca8d13cd34599fe3348cd551364086d
Author: wudi <[email protected]>
AuthorDate: Mon Mar 23 14:22:35 2026 +0800

    [Improve](StreamingJob) support only snapshot sync for mysql and pg (#61389)
    
    ### What problem does this PR solve?
    
    #### Background
    StreamingJob currently supports two offset modes:
    - `initial`: full snapshot + continuous incremental replication
    - `latest`: incremental replication only (no snapshot)
    
      There is no way to perform a one-time full sync and stop. This is
      needed for data migration scenarios where only a point-in-time full
      copy is required, without ongoing replication.
    
      #### Usage
    
      Set `offset=snapshot` when creating a StreamingJob:
    
      ```sql
    CREATE JOB mysql_db_sync
    ON STREAMING
    FROM MYSQL (
        ...
        "user" = "root",
        "password" = "",
        "database" = "db",
        "include_tables" = "user_info,student",
        "offset" = "snapshot"
    )
    TO DATABASE target_test_db (
    )
    ```
    
      The job will perform a full table snapshot and automatically transition
      to FINISHED once all data is synced. No binlog/WAL subscription is
      established.
    
     #### Design
    
    The implementation centers on a hasReachedEnd() signal in 
SourceOffsetProvider:
    
      - FE: JdbcSourceOffsetProvider.hasReachedEnd() returns true when in 
snapshot-only mode and all snapshot splits have been consumed
      (finishedSplits non-empty, remainingSplits empty). 
StreamingInsertJob.onStreamTaskSuccess() checks hasReachedEnd() before creating
      the next task — if true, the job is marked FINISHED.
      - BE (cdc_client): snapshot maps to StartupOptions.snapshot() for both 
MySQL and PostgreSQL connectors. The chunk-split path is
      reused from initial mode.
      - Crash recovery: if FE crashes before persisting FINISHED, the job 
resumes via PAUSED→PENDING. handlePendingState() calls
      replayOffsetProviderIfNeed() then checks hasReachedEnd() — if all splits 
are already finished, the job transitions directly to
      FINISHED without creating any new task.
    
    #### Testing
    
      Added regression tests for both MySQL and PostgreSQL:
      - test_streaming_mysql_job_snapshot.groovy
      - test_streaming_postgres_job_snapshot.groovy
    
      Both tests verify:
      1. All existing data is synced correctly after the job finishes
      2. Job status transitions to FINISHED
---
 .../apache/doris/job/cdc/DataSourceConfigKeys.java |   3 +-
 .../streaming/DataSourceConfigValidator.java       |   3 +-
 .../insert/streaming/StreamingInsertJob.java       |  10 ++
 .../streaming/StreamingJobSchedulerTask.java       |   6 ++
 .../doris/job/offset/SourceOffsetProvider.java     |   9 ++
 .../job/offset/jdbc/JdbcSourceOffsetProvider.java  |  34 +++++-
 .../source/reader/JdbcIncrementalSourceReader.java |   3 +-
 .../source/reader/mysql/MySqlSourceReader.java     |   7 +-
 .../reader/postgres/PostgresSourceReader.java      |   2 +
 .../cdc/test_streaming_mysql_job_snapshot.out      |   9 ++
 .../cdc/test_streaming_postgres_job_snapshot.out   |   9 ++
 .../cdc/test_streaming_mysql_job_snapshot.groovy   | 111 ++++++++++++++++++++
 .../test_streaming_postgres_job_snapshot.groovy    | 114 +++++++++++++++++++++
 13 files changed, 309 insertions(+), 11 deletions(-)

diff --git 
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java 
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java
index b2bda583beb..47ee5f21d27 100644
--- 
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java
+++ 
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java
@@ -27,11 +27,12 @@ public class DataSourceConfigKeys {
     public static final String SCHEMA = "schema";
     public static final String INCLUDE_TABLES = "include_tables";
     public static final String EXCLUDE_TABLES = "exclude_tables";
-    // initial,earliest,latest,{binlog,postion},\d{13}
+    // initial,earliest,latest,snapshot,{binlog,position},\d{13}
     public static final String OFFSET = "offset";
     public static final String OFFSET_INITIAL = "initial";
     public static final String OFFSET_EARLIEST = "earliest";
     public static final String OFFSET_LATEST = "latest";
+    public static final String OFFSET_SNAPSHOT = "snapshot";
     public static final String SNAPSHOT_SPLIT_SIZE = "snapshot_split_size";
     public static final String SNAPSHOT_PARALLELISM = "snapshot_parallelism";
     public static final String SNAPSHOT_PARALLELISM_DEFAULT = "1";
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java
index 63efaf296cb..b75e202b1a8 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java
@@ -110,7 +110,8 @@ public class DataSourceConfigValidator {
 
         if (key.equals(DataSourceConfigKeys.OFFSET)
                 && !(value.equals(DataSourceConfigKeys.OFFSET_INITIAL)
-                || value.equals(DataSourceConfigKeys.OFFSET_LATEST))) {
+                || value.equals(DataSourceConfigKeys.OFFSET_LATEST)
+                || value.equals(DataSourceConfigKeys.OFFSET_SNAPSHOT))) {
             return false;
         }
         return true;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
index be5c70d864a..c999b9d99e3 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
@@ -630,6 +630,12 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
             }
 
             
Env.getCurrentEnv().getJobManager().getStreamingTaskManager().removeRunningTask(task);
+            if (offsetProvider.hasReachedEnd()) {
+                // offset provider has reached a natural end, mark job as 
finished
+                log.info("Streaming insert job {} source data fully consumed, 
marking job as FINISHED", getJobId());
+                updateJobStatus(JobStatus.FINISHED);
+                return;
+            }
             AbstractStreamingTask nextTask = createStreamingTask();
             this.runningStreamTask = nextTask;
             log.info("Streaming insert job {} create next streaming insert 
task {} after task {} success",
@@ -1262,6 +1268,10 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
         }
     }
 
+    public boolean hasReachedEnd() {
+        return offsetProvider != null && offsetProvider.hasReachedEnd();
+    }
+
     /**
      * 1. Clean offset info in ms (s3 tvf)
      * 2. Clean chunk info in meta table (jdbc)
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java
index 8df18f1ee63..95ace617a7e 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java
@@ -66,6 +66,12 @@ public class StreamingJobSchedulerTask extends AbstractTask {
             }
         }
         streamingInsertJob.replayOffsetProviderIfNeed();
+        if (streamingInsertJob.hasReachedEnd()) {
+            // Source already fully consumed (e.g. snapshot-only mode 
recovered after FE restart).
+            // Transition directly to FINISHED without creating a new task.
+            streamingInsertJob.updateJobStatus(JobStatus.FINISHED);
+            return;
+        }
         streamingInsertJob.createStreamingTask();
         streamingInsertJob.setSampleStartTime(System.currentTimeMillis());
         streamingInsertJob.updateJobStatus(JobStatus.RUNNING);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java
index 892231444e3..16fb2394fe3 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java
@@ -110,5 +110,14 @@ public interface SourceOffsetProvider {
         return null;
     }
 
+    /**
+     * Returns true if the provider has reached a natural completion point
+     * and the job should be marked as FINISHED.
+     * Default: false (most providers run indefinitely).
+     */
+    default boolean hasReachedEnd() {
+        return false;
+    }
+
 }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
index d8959086fa5..b77dd8d8bd6 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
@@ -116,7 +116,8 @@ public class JdbcSourceOffsetProvider implements 
SourceOffsetProvider {
             nextOffset.setSplits(snapshotSplits);
             return nextOffset;
         } else if (currentOffset != null && currentOffset.snapshotSplit()) {
-            // snapshot to binlog
+            // initial mode: snapshot to binlog
+            // snapshot-only mode must be intercepted by hasReachedEnd() 
before reaching here
             BinlogSplit binlogSplit = new BinlogSplit();
             binlogSplit.setFinishedSplits(finishedSplits);
             nextOffset.setSplits(Collections.singletonList(binlogSplit));
@@ -243,6 +244,9 @@ public class JdbcSourceOffsetProvider implements 
SourceOffsetProvider {
         }
 
         if (currentOffset.snapshotSplit()) {
+            if (isSnapshotOnlyMode() && remainingSplits.isEmpty()) {
+                return false;
+            }
             return true;
         }
 
@@ -372,14 +376,21 @@ public class JdbcSourceOffsetProvider implements 
SourceOffsetProvider {
                     List<SnapshotSplit> lastSnapshotSplits =
                             recalculateRemainingSplits(chunkHighWatermarkMap, 
snapshotSplits);
                     if (this.remainingSplits.isEmpty()) {
-                        currentOffset = new JdbcOffset();
                         if (!lastSnapshotSplits.isEmpty()) {
+                            currentOffset = new JdbcOffset();
                             currentOffset.setSplits(lastSnapshotSplits);
-                        } else {
-                            // when snapshot to binlog phase fe restarts
+                        } else if (!isSnapshotOnlyMode()) {
+                            // initial mode: rebuild binlog split for 
snapshot-to-binlog transition
+                            currentOffset = new JdbcOffset();
                             BinlogSplit binlogSplit = new BinlogSplit();
                             binlogSplit.setFinishedSplits(finishedSplits);
                             
currentOffset.setSplits(Collections.singletonList(binlogSplit));
+                        } else {
+                            // snapshot-only completed: leave currentOffset as 
null,
+                            // hasReachedEnd() detects completion via 
finishedSplits
+                            log.info("Replaying offset provider for job {}: 
snapshot-only mode completed,"
+                                    + " finishedSplits={}, skip currentOffset 
restoration",
+                                    getJobId(), finishedSplits.size());
                         }
                     }
                 }
@@ -535,7 +546,20 @@ public class JdbcSourceOffsetProvider implements 
SourceOffsetProvider {
         if (startMode == null) {
             return false;
         }
-        return DataSourceConfigKeys.OFFSET_INITIAL.equalsIgnoreCase(startMode);
+        return DataSourceConfigKeys.OFFSET_INITIAL.equalsIgnoreCase(startMode)
+                || 
DataSourceConfigKeys.OFFSET_SNAPSHOT.equalsIgnoreCase(startMode);
+    }
+
+    private boolean isSnapshotOnlyMode() {
+        String offset = sourceProperties.get(DataSourceConfigKeys.OFFSET);
+        return DataSourceConfigKeys.OFFSET_SNAPSHOT.equalsIgnoreCase(offset);
+    }
+
+    @Override
+    public boolean hasReachedEnd() {
+        return isSnapshotOnlyMode()
+                && CollectionUtils.isNotEmpty(finishedSplits)
+                && remainingSplits.isEmpty();
     }
 
     /**
diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java
index 5b8e343faae..77052577341 100644
--- 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java
@@ -143,7 +143,8 @@ public abstract class JdbcIncrementalSourceReader extends 
AbstractCdcSourceReade
 
         // Check startup mode - for PostgreSQL, we use similar logic as MySQL
         String startupMode = 
ftsReq.getConfig().get(DataSourceConfigKeys.OFFSET);
-        if (DataSourceConfigKeys.OFFSET_INITIAL.equalsIgnoreCase(startupMode)) 
{
+        if (DataSourceConfigKeys.OFFSET_INITIAL.equalsIgnoreCase(startupMode)
+                || 
DataSourceConfigKeys.OFFSET_SNAPSHOT.equalsIgnoreCase(startupMode)) {
             remainingSnapshotSplits =
                     startSplitChunks(sourceConfig, ftsReq.getSnapshotTable(), 
ftsReq.getConfig());
         } else {
diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
index 11e5007894d..15787782da9 100644
--- 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
@@ -163,7 +163,7 @@ public class MySqlSourceReader extends 
AbstractCdcSourceReader {
         StartupMode startupMode = sourceConfig.getStartupOptions().startupMode;
         List<MySqlSnapshotSplit> remainingSnapshotSplits = new ArrayList<>();
         MySqlBinlogSplit remainingBinlogSplit = null;
-        if (startupMode.equals(StartupMode.INITIAL)) {
+        if (startupMode.equals(StartupMode.INITIAL) || 
startupMode.equals(StartupMode.SNAPSHOT)) {
             remainingSnapshotSplits =
                     startSplitChunks(sourceConfig, ftsReq.getSnapshotTable(), 
ftsReq.getConfig());
         } else {
@@ -789,8 +789,9 @@ public class MySqlSourceReader extends 
AbstractCdcSourceReader {
         // setting startMode
         String startupMode = cdcConfig.get(DataSourceConfigKeys.OFFSET);
         if (DataSourceConfigKeys.OFFSET_INITIAL.equalsIgnoreCase(startupMode)) 
{
-            // do not need set offset when initial
-            // configFactory.startupOptions(StartupOptions.initial());
+            configFactory.startupOptions(StartupOptions.initial());
+        } else if 
(DataSourceConfigKeys.OFFSET_SNAPSHOT.equalsIgnoreCase(startupMode)) {
+            configFactory.startupOptions(StartupOptions.snapshot());
         } else if 
(DataSourceConfigKeys.OFFSET_EARLIEST.equalsIgnoreCase(startupMode)) {
             configFactory.startupOptions(StartupOptions.earliest());
             BinlogOffset binlogOffset =
diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
index 6a5670ad6de..d465a71c242 100644
--- 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
@@ -205,6 +205,8 @@ public class PostgresSourceReader extends 
JdbcIncrementalSourceReader {
         String startupMode = cdcConfig.get(DataSourceConfigKeys.OFFSET);
         if (DataSourceConfigKeys.OFFSET_INITIAL.equalsIgnoreCase(startupMode)) 
{
             configFactory.startupOptions(StartupOptions.initial());
+        } else if 
(DataSourceConfigKeys.OFFSET_SNAPSHOT.equalsIgnoreCase(startupMode)) {
+            configFactory.startupOptions(StartupOptions.snapshot());
         } else if 
(DataSourceConfigKeys.OFFSET_EARLIEST.equalsIgnoreCase(startupMode)) {
             configFactory.startupOptions(StartupOptions.earliest());
         } else if 
(DataSourceConfigKeys.OFFSET_LATEST.equalsIgnoreCase(startupMode)) {
diff --git 
a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_snapshot.out
 
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_snapshot.out
new file mode 100644
index 00000000000..ea60bd7e201
--- /dev/null
+++ 
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_snapshot.out
@@ -0,0 +1,9 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !select_snapshot_table1 --
+A1     1
+B1     2
+
+-- !select_snapshot_table2 --
+A2     1
+B2     2
+
diff --git 
a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_snapshot.out
 
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_snapshot.out
new file mode 100644
index 00000000000..ea60bd7e201
--- /dev/null
+++ 
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_snapshot.out
@@ -0,0 +1,9 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !select_snapshot_table1 --
+A1     1
+B1     2
+
+-- !select_snapshot_table2 --
+A2     1
+B2     2
+
diff --git 
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_snapshot.groovy
 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_snapshot.groovy
new file mode 100644
index 00000000000..72ee1f29c5f
--- /dev/null
+++ 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_snapshot.groovy
@@ -0,0 +1,111 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+/**
+ * Test snapshot-only mode (offset=snapshot):
+ *   1. Job syncs existing data via full snapshot.
+ *   2. Job transitions to FINISHED after snapshot completes (no binlog phase).
+ *   3. Data inserted after job finishes is NOT synced to Doris.
+ */
+suite("test_streaming_mysql_job_snapshot", 
"p0,external,mysql,external_docker,external_docker_mysql,nondatalake") {
+    def jobName = "test_streaming_mysql_job_snapshot_name"
+    def currentDb = (sql "select database()")[0][0]
+    def table1 = "user_info_mysql_snapshot1"
+    def table2 = "user_info_mysql_snapshot2"
+    def mysqlDb = "test_cdc_db"
+
+    sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+    sql """drop table if exists ${currentDb}.${table1} force"""
+    sql """drop table if exists ${currentDb}.${table2} force"""
+
+    String enabled = context.config.otherConfigs.get("enableJdbcTest")
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        String mysql_port = context.config.otherConfigs.get("mysql_57_port")
+        String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+        String s3_endpoint = getS3Endpoint()
+        String bucket = getS3BucketName()
+        String driver_url = 
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/mysql-connector-j-8.4.0.jar";
+
+        // prepare source tables and pre-existing data in mysql
+        connect("root", "123456", 
"jdbc:mysql://${externalEnvIp}:${mysql_port}") {
+            sql """CREATE DATABASE IF NOT EXISTS ${mysqlDb}"""
+            sql """DROP TABLE IF EXISTS ${mysqlDb}.${table1}"""
+            sql """DROP TABLE IF EXISTS ${mysqlDb}.${table2}"""
+            sql """CREATE TABLE ${mysqlDb}.${table1} (
+                  `name` varchar(200) NOT NULL,
+                  `age` int DEFAULT NULL,
+                  PRIMARY KEY (`name`)
+                ) ENGINE=InnoDB"""
+            sql """INSERT INTO ${mysqlDb}.${table1} (name, age) VALUES ('A1', 
1)"""
+            sql """INSERT INTO ${mysqlDb}.${table1} (name, age) VALUES ('B1', 
2)"""
+            sql """CREATE TABLE ${mysqlDb}.${table2} (
+                  `name` varchar(200) NOT NULL,
+                  `age` int DEFAULT NULL,
+                  PRIMARY KEY (`name`)
+                ) ENGINE=InnoDB"""
+            sql """INSERT INTO ${mysqlDb}.${table2} (name, age) VALUES ('A2', 
1)"""
+            sql """INSERT INTO ${mysqlDb}.${table2} (name, age) VALUES ('B2', 
2)"""
+        }
+
+        // create streaming job with offset=snapshot (snapshot-only mode)
+        sql """CREATE JOB ${jobName}
+                ON STREAMING
+                FROM MYSQL (
+                    "jdbc_url" = 
"jdbc:mysql://${externalEnvIp}:${mysql_port}/${mysqlDb}",
+                    "driver_url" = "${driver_url}",
+                    "driver_class" = "com.mysql.cj.jdbc.Driver",
+                    "user" = "root",
+                    "password" = "123456",
+                    "database" = "${mysqlDb}",
+                    "include_tables" = "${table1},${table2}",
+                    "offset" = "snapshot"
+                )
+                TO DATABASE ${currentDb} (
+                  "table.create.properties.replication_num" = "1"
+                )
+            """
+
+        // wait for job to transition to FINISHED
+        try {
+            Awaitility.await().atMost(300, SECONDS)
+                    .pollInterval(2, SECONDS).until(
+                    {
+                        def jobStatus = sql """select Status from 
jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING'"""
+                        log.info("jobStatus: " + jobStatus)
+                        jobStatus.size() == 1 && jobStatus.get(0).get(0) == 
'FINISHED'
+                    }
+            )
+        } catch (Exception ex) {
+            def showjob = sql """select * from jobs("type"="insert") where 
Name='${jobName}'"""
+            def showtask = sql """select * from tasks("type"="insert") where 
JobName='${jobName}'"""
+            log.info("show job: " + showjob)
+            log.info("show task: " + showtask)
+            throw ex
+        }
+
+        // verify snapshot data is correctly synced
+        qt_select_snapshot_table1 """ SELECT * FROM ${table1} order by name 
asc """
+        qt_select_snapshot_table2 """ SELECT * FROM ${table2} order by name 
asc """
+
+        sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+    }
+}
diff --git 
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_snapshot.groovy
 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_snapshot.groovy
new file mode 100644
index 00000000000..771b4934319
--- /dev/null
+++ 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_snapshot.groovy
@@ -0,0 +1,114 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+/**
+ * Test snapshot-only mode (offset=snapshot):
+ *   1. Job syncs existing data via full snapshot.
+ *   2. Job transitions to FINISHED after snapshot completes (no binlog phase).
+ *   3. Data inserted after job finishes is NOT synced to Doris.
+ */
+suite("test_streaming_postgres_job_snapshot", 
"p0,external,pg,external_docker,external_docker_pg,nondatalake") {
+    def jobName = "test_streaming_postgres_job_snapshot_name"
+    def currentDb = (sql "select database()")[0][0]
+    def table1 = "user_info_pg_snapshot1"
+    def table2 = "user_info_pg_snapshot2"
+    def pgDB = "postgres"
+    def pgSchema = "cdc_test"
+    def pgUser = "postgres"
+    def pgPassword = "123456"
+
+    sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+    sql """drop table if exists ${currentDb}.${table1} force"""
+    sql """drop table if exists ${currentDb}.${table2} force"""
+
+    String enabled = context.config.otherConfigs.get("enableJdbcTest")
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        String pg_port = context.config.otherConfigs.get("pg_14_port")
+        String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+        String s3_endpoint = getS3Endpoint()
+        String bucket = getS3BucketName()
+        String driver_url = 
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/postgresql-42.5.0.jar";
+
+        // prepare source tables and pre-existing data in postgres
+        connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+            sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${table1}"""
+            sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${table2}"""
+            sql """CREATE TABLE ${pgDB}.${pgSchema}.${table1} (
+                  "name" varchar(200),
+                  "age" int2,
+                  PRIMARY KEY ("name")
+                )"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} (name, age) 
VALUES ('A1', 1)"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} (name, age) 
VALUES ('B1', 2)"""
+            sql """CREATE TABLE ${pgDB}.${pgSchema}.${table2} (
+                  "name" varchar(200),
+                  "age" int2,
+                  PRIMARY KEY ("name")
+                )"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${table2} (name, age) 
VALUES ('A2', 1)"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${table2} (name, age) 
VALUES ('B2', 2)"""
+        }
+
+        // create streaming job with offset=snapshot (snapshot-only mode)
+        sql """CREATE JOB ${jobName}
+                ON STREAMING
+                FROM POSTGRES (
+                    "jdbc_url" = 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}",
+                    "driver_url" = "${driver_url}",
+                    "driver_class" = "org.postgresql.Driver",
+                    "user" = "${pgUser}",
+                    "password" = "${pgPassword}",
+                    "database" = "${pgDB}",
+                    "schema" = "${pgSchema}",
+                    "include_tables" = "${table1},${table2}",
+                    "offset" = "snapshot"
+                )
+                TO DATABASE ${currentDb} (
+                  "table.create.properties.replication_num" = "1"
+                )
+            """
+
+        // wait for job to transition to FINISHED
+        try {
+            Awaitility.await().atMost(300, SECONDS)
+                    .pollInterval(2, SECONDS).until(
+                    {
+                        def jobStatus = sql """select Status from 
jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING'"""
+                        log.info("jobStatus: " + jobStatus)
+                        jobStatus.size() == 1 && jobStatus.get(0).get(0) == 
'FINISHED'
+                    }
+            )
+        } catch (Exception ex) {
+            def showjob = sql """select * from jobs("type"="insert") where 
Name='${jobName}'"""
+            def showtask = sql """select * from tasks("type"="insert") where 
JobName='${jobName}'"""
+            log.info("show job: " + showjob)
+            log.info("show task: " + showtask)
+            throw ex
+        }
+
+        // verify snapshot data is correctly synced
+        qt_select_snapshot_table1 """ SELECT * FROM ${table1} order by name 
asc """
+        qt_select_snapshot_table2 """ SELECT * FROM ${table2} order by name 
asc """
+
+        sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to