This is an automated email from the ASF dual-hosted git repository.
kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new 39749bfda [FLINK-39001][doc][Flink-source]supple NewlyAddTable's doc with mongodb,postgres,oracle connectors (#4247)
39749bfda is described below
commit 39749bfdaf03607a58ae825d8d2f6090fa69191f
Author: Thorne <[email protected]>
AuthorDate: Wed Feb 25 18:06:26 2026 +0800
[FLINK-39001][doc][Flink-source]supple NewlyAddTable's doc with mongodb,postgres,oracle connectors (#4247)
Co-authored-by: Thorne <syyfffy@email>
Co-authored-by: Thorne <[email protected]>
---
.../docs/connectors/flink-sources/mongodb-cdc.md | 57 +++++++++++++++++++
.../docs/connectors/flink-sources/oracle-cdc.md | 61 ++++++++++++++++++++
.../docs/connectors/flink-sources/postgres-cdc.md | 65 ++++++++++++++++++++++
.../docs/connectors/flink-sources/mongodb-cdc.md | 57 +++++++++++++++++++
.../docs/connectors/flink-sources/oracle-cdc.md | 61 ++++++++++++++++++++
.../docs/connectors/flink-sources/postgres-cdc.md | 65 ++++++++++++++++++++++
6 files changed, 366 insertions(+)
diff --git a/docs/content.zh/docs/connectors/flink-sources/mongodb-cdc.md b/docs/content.zh/docs/connectors/flink-sources/mongodb-cdc.md
index f98c7b6db..123288989 100644
--- a/docs/content.zh/docs/connectors/flink-sources/mongodb-cdc.md
+++ b/docs/content.zh/docs/connectors/flink-sources/mongodb-cdc.md
@@ -489,6 +489,63 @@ MongoDB's `oplog.rs` collection doesn't keep the changed record's update before state,
By the way, Debezium's MongoDB change streams exploration mentioned in [DBZ-435](https://issues.redhat.com/browse/DBZ-435) is on the roadmap.<br>
If it's done, we can consider integrating two kinds of source connector for users to choose.
+### Scan Newly Added Collections
+
+**Note:** This feature is available since Flink CDC 3.1.0.
+
+The Scan Newly Added Collections feature enables you to add new collections to monitor for a running job. The newly added collections will first read their snapshot data and then read their change stream automatically.
+
+Imagine this scenario: at the beginning, a Flink job monitors collections `[product, user, address]`, but after some days we would like the job to also monitor collections `[order, custom]`, which contain historical data, and we need the job to still reuse its existing state. This feature can resolve this case gracefully.
+
+The following operations show how to enable this feature to resolve the above scenario, given an existing Flink job that uses the MongoDB CDC Source like:
+
+```java
+    MongoDBSource<String> mongoSource = MongoDBSource.<String>builder()
+        .hosts("yourHostname:27017")
+        .databaseList("db") // set captured database
+        .collectionList("db.product", "db.user", "db.address") // set captured collections
+        .username("yourUsername")
+        .password("yourPassword")
+        .scanNewlyAddedTableEnabled(true) // enable scan newly added collections feature
+        .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
+        .build();
+    // your business code
+```
+
+If we would like to add new collections `[order, custom]` to the existing Flink job, we just need to update the `collectionList()` value of the job to include `[order, custom]` and restore the job from the previous savepoint.
+
+_Step 1_: Stop the existing Flink job with a savepoint.
+```shell
+$ ./bin/flink stop $Existing_Flink_JOB_ID
+```
+```shell
+Suspending job "cca7bc1061d61cf15238e92312c2fc20" with a savepoint.
+Savepoint completed. Path: file:/tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab
+```
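+
+If no default savepoint directory is configured for the cluster, the target directory can be passed to `flink stop` explicitly; a minimal sketch, assuming the standard `--savepointPath` CLI option:
+```shell
+$ ./bin/flink stop --savepointPath /tmp/flink-savepoints $Existing_Flink_JOB_ID
+```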
+_Step 2_: Update the collection list option for the existing Flink job.
+1. Update the `collectionList()` value.
+2. Build the jar of the updated job, for example:
+```java
+    MongoDBSource<String> mongoSource = MongoDBSource.<String>builder()
+        .hosts("yourHostname:27017")
+        .databaseList("db")
+        .collectionList("db.product", "db.user", "db.address", "db.order", "db.custom") // set captured collections [product, user, address, order, custom]
+        .username("yourUsername")
+        .password("yourPassword")
+        .scanNewlyAddedTableEnabled(true)
+        .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
+        .build();
+    // your business code
+```
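+
+Rebuilding the updated job is typically just repackaging the jar; a minimal sketch, assuming a Maven-based project (build tool and layout may differ):
+```shell
+$ mvn clean package -DskipTests
+```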
+_Step 3_: Restore the updated Flink job from the savepoint.
+```shell
+$ ./bin/flink run \
+ --detached \
+ --from-savepoint /tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab \
+ ./FlinkCDCExample.jar
+```
+**Note:** Please refer to the doc [Restore the job from previous savepoint](https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/cli/#command-line-interface) for more details.
+
### DataStream Source
The MongoDB CDC connector can also be a DataStream source. You can create a SourceFunction as the following shows:
diff --git a/docs/content.zh/docs/connectors/flink-sources/oracle-cdc.md b/docs/content.zh/docs/connectors/flink-sources/oracle-cdc.md
index 2e5c1dd60..5258e6b41 100644
--- a/docs/content.zh/docs/connectors/flink-sources/oracle-cdc.md
+++ b/docs/content.zh/docs/connectors/flink-sources/oracle-cdc.md
@@ -558,6 +558,67 @@ _Note: the mechanism of `scan.startup.mode` option relying on Debezium's `snapsh
The Oracle CDC source can't work in parallel reading, because only one task can receive change events.
+### Scan Newly Added Tables
+
+**Note:** This feature is available since Flink CDC 3.1.0.
+
+The Scan Newly Added Tables feature enables you to add new tables to monitor for a running job. The newly added tables will first read their snapshot data and then read their redo log automatically.
+
+Imagine this scenario: at the beginning, a Flink job monitors tables `[product, user, address]`, but after some days we would like the job to also monitor tables `[order, custom]`, which contain historical data, and we need the job to still reuse its existing state. This feature can resolve this case gracefully.
+
+The following operations show how to enable this feature to resolve the above scenario, given an existing Flink job that uses the Oracle CDC Source like:
+
+```java
+    JdbcIncrementalSource<String> oracleSource = new OracleSourceBuilder()
+        .hostname("yourHostname")
+        .port(1521)
+        .databaseList("ORCLCDB") // set captured database
+        .schemaList("INVENTORY") // set captured schema
+        .tableList("INVENTORY.PRODUCT", "INVENTORY.USER", "INVENTORY.ADDRESS") // set captured tables
+        .username("yourUsername")
+        .password("yourPassword")
+        .scanNewlyAddedTableEnabled(true) // enable scan newly added tables feature
+        .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
+        .build();
+    // your business code
+```
+
+If we would like to add new tables `[INVENTORY.ORDER, INVENTORY.CUSTOM]` to the existing Flink job, we just need to update the `tableList()` value of the job to include `[INVENTORY.ORDER, INVENTORY.CUSTOM]` and restore the job from the previous savepoint.
+
+_Step 1_: Stop the existing Flink job with a savepoint.
+```shell
+$ ./bin/flink stop $Existing_Flink_JOB_ID
+```
+```shell
+Suspending job "cca7bc1061d61cf15238e92312c2fc20" with a savepoint.
+Savepoint completed. Path: file:/tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab
+```
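+
+If no default savepoint directory is configured for the cluster, the target directory can be passed to `flink stop` explicitly; a minimal sketch, assuming the standard `--savepointPath` CLI option:
+```shell
+$ ./bin/flink stop --savepointPath /tmp/flink-savepoints $Existing_Flink_JOB_ID
+```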
+_Step 2_: Update the table list option for the existing Flink job.
+1. Update the `tableList()` value.
+2. Build the jar of the updated job, for example:
+```java
+    JdbcIncrementalSource<String> oracleSource = new OracleSourceBuilder()
+        .hostname("yourHostname")
+        .port(1521)
+        .databaseList("ORCLCDB")
+        .schemaList("INVENTORY")
+        .tableList("INVENTORY.PRODUCT", "INVENTORY.USER", "INVENTORY.ADDRESS", "INVENTORY.ORDER", "INVENTORY.CUSTOM") // set captured tables [PRODUCT, USER, ADDRESS, ORDER, CUSTOM]
+        .username("yourUsername")
+        .password("yourPassword")
+        .scanNewlyAddedTableEnabled(true)
+        .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
+        .build();
+    // your business code
+```
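+
+Rebuilding the updated job is typically just repackaging the jar; a minimal sketch, assuming a Maven-based project (build tool and layout may differ):
+```shell
+$ mvn clean package -DskipTests
+```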
+_Step 3_: Restore the updated Flink job from the savepoint.
+```shell
+$ ./bin/flink run \
+ --detached \
+ --from-savepoint /tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab \
+ ./FlinkCDCExample.jar
+```
+**Note:** Please refer to the doc [Restore the job from previous savepoint](https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/cli/#command-line-interface) for more details.
+
### DataStream Source
The Oracle CDC connector can also be a DataStream source. There are two modes for the DataStream source:
diff --git a/docs/content.zh/docs/connectors/flink-sources/postgres-cdc.md b/docs/content.zh/docs/connectors/flink-sources/postgres-cdc.md
index a9a5b560b..44f5fca41 100644
--- a/docs/content.zh/docs/connectors/flink-sources/postgres-cdc.md
+++ b/docs/content.zh/docs/connectors/flink-sources/postgres-cdc.md
@@ -510,6 +510,71 @@ The config option `scan.startup.mode` specifies the startup mode for PostgreSQL
- `committed-offset`: Skip snapshot phase and start reading events from a `confirmed_flush_lsn` offset of replication slot.
- `snapshot`: Only the snapshot phase is performed and exits after the snapshot phase reading is completed.
+### Scan Newly Added Tables
+
+**Note:** This feature is available since Flink CDC 3.1.0.
+
+The Scan Newly Added Tables feature enables you to add new tables to monitor for a running job. The newly added tables will first read their snapshot data and then read their WAL (Write-Ahead Log) or replication slot changes automatically.
+
+Imagine this scenario: at the beginning, a Flink job monitors tables `[product, user, address]`, but after some days we would like the job to also monitor tables `[order, custom]`, which contain historical data, and we need the job to still reuse its existing state. This feature can resolve this case gracefully.
+
+The following operations show how to enable this feature to resolve the above scenario, given an existing Flink job that uses the PostgreSQL CDC Source like:
+
+```java
+    JdbcIncrementalSource<String> postgresSource =
+        PostgresSourceBuilder.PostgresIncrementalSource.<String>builder()
+            .hostname("yourHostname")
+            .port(5432)
+            .database("postgres") // set captured database
+            .schemaList("inventory") // set captured schema
+            .tableList("inventory.product", "inventory.user", "inventory.address") // set captured tables
+            .username("yourUsername")
+            .password("yourPassword")
+            .slotName("flink")
+            .scanNewlyAddedTableEnabled(true) // enable scan newly added tables feature
+            .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
+            .build();
+    // your business code
+```
+
+If we would like to add new tables `[inventory.order, inventory.custom]` to the existing Flink job, we just need to update the `tableList()` value of the job to include `[inventory.order, inventory.custom]` and restore the job from the previous savepoint.
+
+_Step 1_: Stop the existing Flink job with a savepoint.
+```shell
+$ ./bin/flink stop $Existing_Flink_JOB_ID
+```
+```shell
+Suspending job "cca7bc1061d61cf15238e92312c2fc20" with a savepoint.
+Savepoint completed. Path: file:/tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab
+```
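+
+If no default savepoint directory is configured for the cluster, the target directory can be passed to `flink stop` explicitly; a minimal sketch, assuming the standard `--savepointPath` CLI option:
+```shell
+$ ./bin/flink stop --savepointPath /tmp/flink-savepoints $Existing_Flink_JOB_ID
+```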
+_Step 2_: Update the table list option for the existing Flink job.
+1. Update the `tableList()` value.
+2. Build the jar of the updated job, for example:
+```java
+    JdbcIncrementalSource<String> postgresSource =
+        PostgresSourceBuilder.PostgresIncrementalSource.<String>builder()
+            .hostname("yourHostname")
+            .port(5432)
+            .database("postgres")
+            .schemaList("inventory")
+            .tableList("inventory.product", "inventory.user", "inventory.address", "inventory.order", "inventory.custom") // set captured tables [product, user, address, order, custom]
+            .username("yourUsername")
+            .password("yourPassword")
+            .slotName("flink")
+            .scanNewlyAddedTableEnabled(true)
+            .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
+            .build();
+    // your business code
+```
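+
+Rebuilding the updated job is typically just repackaging the jar; a minimal sketch, assuming a Maven-based project (build tool and layout may differ):
+```shell
+$ mvn clean package -DskipTests
+```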
+_Step 3_: Restore the updated Flink job from the savepoint.
+```shell
+$ ./bin/flink run \
+ --detached \
+ --from-savepoint /tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab \
+ ./FlinkCDCExample.jar
+```
+**Note:** Please refer to the doc [Restore the job from previous savepoint](https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/cli/#command-line-interface) for more details.
+
### DataStream Source
The Postgres CDC connector can also be a DataStream source. There are two modes for the DataStream source:
diff --git a/docs/content/docs/connectors/flink-sources/mongodb-cdc.md b/docs/content/docs/connectors/flink-sources/mongodb-cdc.md
index 98353a3b8..80ecad42f 100644
--- a/docs/content/docs/connectors/flink-sources/mongodb-cdc.md
+++ b/docs/content/docs/connectors/flink-sources/mongodb-cdc.md
@@ -512,6 +512,63 @@ Applications can use change streams to subscribe to all data changes on a single
By the way, Debezium's MongoDB change streams exploration mentioned by [DBZ-435](https://issues.redhat.com/browse/DBZ-435) is on the roadmap.<br>
If it's done, we can consider integrating two kinds of source connector for users to choose.
+### Scan Newly Added Collections
+
+**Note:** This feature is available since Flink CDC 3.1.0.
+
+The Scan Newly Added Collections feature enables you to add new collections to monitor for an existing running pipeline. The newly added collections will first read their snapshot data and then read their change stream automatically.
+
+Imagine this scenario: at the beginning, a Flink job monitors collections `[product, user, address]`, but after some days we would like the job to also monitor collections `[order, custom]`, which contain historical data, and we need the job to still reuse its existing state. This feature can resolve this case gracefully.
+
+The following operations show how to enable this feature to resolve the above scenario, given an existing Flink job that uses the MongoDB CDC Source like:
+
+```java
+    MongoDBSource<String> mongoSource = MongoDBSource.<String>builder()
+        .hosts("yourHostname:27017")
+        .databaseList("db") // set captured database
+        .collectionList("db.product", "db.user", "db.address") // set captured collections
+        .username("yourUsername")
+        .password("yourPassword")
+        .scanNewlyAddedTableEnabled(true) // enable scan newly added collections feature
+        .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
+        .build();
+    // your business code
+```
+
+If we would like to add new collections `[order, custom]` to an existing Flink job, we just need to update the `collectionList()` value of the job to include `[order, custom]` and restore the job from the previous savepoint.
+
+_Step 1_: Stop the existing Flink job with a savepoint.
+```shell
+$ ./bin/flink stop $Existing_Flink_JOB_ID
+```
+```shell
+Suspending job "cca7bc1061d61cf15238e92312c2fc20" with a savepoint.
+Savepoint completed. Path: file:/tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab
+```
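+
+If no default savepoint directory is configured for the cluster, the target directory can be passed to `flink stop` explicitly; a minimal sketch, assuming the standard `--savepointPath` CLI option:
+```shell
+$ ./bin/flink stop --savepointPath /tmp/flink-savepoints $Existing_Flink_JOB_ID
+```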
+_Step 2_: Update the collection list option for the existing Flink job.
+1. Update the `collectionList()` value.
+2. Build the jar of the updated job, for example:
+```java
+    MongoDBSource<String> mongoSource = MongoDBSource.<String>builder()
+        .hosts("yourHostname:27017")
+        .databaseList("db")
+        .collectionList("db.product", "db.user", "db.address", "db.order", "db.custom") // set captured collections [product, user, address, order, custom]
+        .username("yourUsername")
+        .password("yourPassword")
+        .scanNewlyAddedTableEnabled(true) // enable scan newly added collections feature
+        .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
+        .build();
+    // your business code
+```
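+
+Rebuilding the updated job is typically just repackaging the jar; a minimal sketch, assuming a Maven-based project (build tool and layout may differ):
+```shell
+$ mvn clean package -DskipTests
+```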
+_Step 3_: Restore the updated Flink job from the savepoint.
+```shell
+$ ./bin/flink run \
+ --detached \
+ --from-savepoint /tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab \
+ ./FlinkCDCExample.jar
+```
+**Note:** Please refer to the doc [Restore the job from previous savepoint](https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/cli/#command-line-interface) for more details.
+
### DataStream Source
The MongoDB CDC connector can also be a DataStream source. You can create a SourceFunction as the following shows:
diff --git a/docs/content/docs/connectors/flink-sources/oracle-cdc.md b/docs/content/docs/connectors/flink-sources/oracle-cdc.md
index 260e9635a..f61eb0f9a 100644
--- a/docs/content/docs/connectors/flink-sources/oracle-cdc.md
+++ b/docs/content/docs/connectors/flink-sources/oracle-cdc.md
@@ -559,6 +559,67 @@ _Note: the mechanism of `scan.startup.mode` option relying on Debezium's `snapsh
The Oracle CDC source can't work in parallel reading, because only one task can receive change events.
+### Scan Newly Added Tables
+
+**Note:** This feature is available since Flink CDC 3.1.0.
+
+The Scan Newly Added Tables feature enables you to add new tables to monitor for an existing running pipeline. The newly added tables will first read their snapshot data and then read their redo log automatically.
+
+Imagine this scenario: at the beginning, a Flink job monitors tables `[product, user, address]`, but after some days we would like the job to also monitor tables `[order, custom]`, which contain historical data, and we need the job to still reuse its existing state. This feature can resolve this case gracefully.
+
+The following operations show how to enable this feature to resolve the above scenario, given an existing Flink job that uses the Oracle CDC Source like:
+
+```java
+    JdbcIncrementalSource<String> oracleSource = new OracleSourceBuilder()
+        .hostname("yourHostname")
+        .port(1521)
+        .databaseList("ORCLCDB") // set captured database
+        .schemaList("INVENTORY") // set captured schema
+        .tableList("INVENTORY.PRODUCT", "INVENTORY.USER", "INVENTORY.ADDRESS") // set captured tables
+        .username("yourUsername")
+        .password("yourPassword")
+        .scanNewlyAddedTableEnabled(true) // enable scan newly added tables feature
+        .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
+        .build();
+    // your business code
+```
+
+If we would like to add new tables `[INVENTORY.ORDER, INVENTORY.CUSTOM]` to an existing Flink job, we just need to update the `tableList()` value of the job to include `[INVENTORY.ORDER, INVENTORY.CUSTOM]` and restore the job from the previous savepoint.
+
+_Step 1_: Stop the existing Flink job with a savepoint.
+```shell
+$ ./bin/flink stop $Existing_Flink_JOB_ID
+```
+```shell
+Suspending job "cca7bc1061d61cf15238e92312c2fc20" with a savepoint.
+Savepoint completed. Path: file:/tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab
+```
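+
+If no default savepoint directory is configured for the cluster, the target directory can be passed to `flink stop` explicitly; a minimal sketch, assuming the standard `--savepointPath` CLI option:
+```shell
+$ ./bin/flink stop --savepointPath /tmp/flink-savepoints $Existing_Flink_JOB_ID
+```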
+_Step 2_: Update the table list option for the existing Flink job.
+1. Update the `tableList()` value.
+2. Build the jar of the updated job, for example:
+```java
+    JdbcIncrementalSource<String> oracleSource = new OracleSourceBuilder()
+        .hostname("yourHostname")
+        .port(1521)
+        .databaseList("ORCLCDB")
+        .schemaList("INVENTORY")
+        .tableList("INVENTORY.PRODUCT", "INVENTORY.USER", "INVENTORY.ADDRESS", "INVENTORY.ORDER", "INVENTORY.CUSTOM") // set captured tables [PRODUCT, USER, ADDRESS, ORDER, CUSTOM]
+        .username("yourUsername")
+        .password("yourPassword")
+        .scanNewlyAddedTableEnabled(true)
+        .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
+        .build();
+    // your business code
+```
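+
+Rebuilding the updated job is typically just repackaging the jar; a minimal sketch, assuming a Maven-based project (build tool and layout may differ):
+```shell
+$ mvn clean package -DskipTests
+```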
+_Step 3_: Restore the updated Flink job from the savepoint.
+```shell
+$ ./bin/flink run \
+ --detached \
+ --from-savepoint /tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab \
+ ./FlinkCDCExample.jar
+```
+**Note:** Please refer to the doc [Restore the job from previous savepoint](https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/cli/#command-line-interface) for more details.
+
### DataStream Source
The Oracle CDC connector can also be a DataStream source. There are two modes for the DataStream source:
diff --git a/docs/content/docs/connectors/flink-sources/postgres-cdc.md b/docs/content/docs/connectors/flink-sources/postgres-cdc.md
index a0477e54c..d14549b27 100644
--- a/docs/content/docs/connectors/flink-sources/postgres-cdc.md
+++ b/docs/content/docs/connectors/flink-sources/postgres-cdc.md
@@ -511,6 +511,71 @@ The config option `scan.startup.mode` specifies the startup mode for PostgreSQL
- `committed-offset`: Skip snapshot phase and start reading events from a `confirmed_flush_lsn` offset of replication slot.
- `snapshot`: Only the snapshot phase is performed and exits after the snapshot phase reading is completed.
+### Scan Newly Added Tables
+
+**Note:** This feature is available since Flink CDC 3.1.0.
+
+The Scan Newly Added Tables feature enables you to add new tables to monitor for an existing running pipeline. The newly added tables will first read their snapshot data and then read their WAL (Write-Ahead Log) or replication slot changes automatically.
+
+Imagine this scenario: at the beginning, a Flink job monitors tables `[product, user, address]`, but after some days we would like the job to also monitor tables `[order, custom]`, which contain historical data, and we need the job to still reuse its existing state. This feature can resolve this case gracefully.
+
+The following operations show how to enable this feature to resolve the above scenario, given an existing Flink job that uses the PostgreSQL CDC Source like:
+
+```java
+    JdbcIncrementalSource<String> postgresSource =
+        PostgresSourceBuilder.PostgresIncrementalSource.<String>builder()
+            .hostname("yourHostname")
+            .port(5432)
+            .database("postgres") // set captured database
+            .schemaList("inventory") // set captured schema
+            .tableList("inventory.product", "inventory.user", "inventory.address") // set captured tables
+            .username("yourUsername")
+            .password("yourPassword")
+            .slotName("flink")
+            .scanNewlyAddedTableEnabled(true) // enable scan newly added tables feature
+            .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
+            .build();
+    // your business code
+```
+
+If we would like to add new tables `[inventory.order, inventory.custom]` to an existing Flink job, we just need to update the `tableList()` value of the job to include `[inventory.order, inventory.custom]` and restore the job from the previous savepoint.
+
+_Step 1_: Stop the existing Flink job with a savepoint.
+```shell
+$ ./bin/flink stop $Existing_Flink_JOB_ID
+```
+```shell
+Suspending job "cca7bc1061d61cf15238e92312c2fc20" with a savepoint.
+Savepoint completed. Path: file:/tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab
+```
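+
+If no default savepoint directory is configured for the cluster, the target directory can be passed to `flink stop` explicitly; a minimal sketch, assuming the standard `--savepointPath` CLI option:
+```shell
+$ ./bin/flink stop --savepointPath /tmp/flink-savepoints $Existing_Flink_JOB_ID
+```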
+_Step 2_: Update the table list option for the existing Flink job.
+1. Update the `tableList()` value.
+2. Build the jar of the updated job, for example:
+```java
+    JdbcIncrementalSource<String> postgresSource =
+        PostgresSourceBuilder.PostgresIncrementalSource.<String>builder()
+            .hostname("yourHostname")
+            .port(5432)
+            .database("postgres")
+            .schemaList("inventory")
+            .tableList("inventory.product", "inventory.user", "inventory.address", "inventory.order", "inventory.custom") // set captured tables [product, user, address, order, custom]
+            .username("yourUsername")
+            .password("yourPassword")
+            .slotName("flink")
+            .scanNewlyAddedTableEnabled(true)
+            .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
+            .build();
+    // your business code
+```
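+
+Rebuilding the updated job is typically just repackaging the jar; a minimal sketch, assuming a Maven-based project (build tool and layout may differ):
+```shell
+$ mvn clean package -DskipTests
+```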
+_Step 3_: Restore the updated Flink job from the savepoint.
+```shell
+$ ./bin/flink run \
+ --detached \
+ --from-savepoint /tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab \
+ ./FlinkCDCExample.jar
+```
+**Note:** Please refer to the doc [Restore the job from previous savepoint](https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/cli/#command-line-interface) for more details.
+
### DataStream Source
The Postgres CDC connector can also be a DataStream source. There are two modes for the DataStream source: