Copilot commented on code in PR #4247:
URL: https://github.com/apache/flink-cdc/pull/4247#discussion_r2745652422


##########
docs/content/docs/connectors/flink-sources/oracle-cdc.md:
##########
@@ -559,6 +559,67 @@ _Note: the mechanism of `scan.startup.mode` option relying 
on Debezium's `snapsh
 
 The Oracle CDC source can't work in parallel reading, because there is only 
one task can receive change events.
 
+### Scan Newly Added Tables
+
+**Note:** This feature is available since Flink CDC 3.1.0.
+
+Scan Newly Added Tables feature enables you to add new tables to monitor for 
existing running pipeline. The newly added tables will read their snapshot data 
firstly and then read their redo log automatically.
+
+Imagine this scenario: At the beginning, a Flink job monitors tables 
`[product, user, address]`, but after some days we would like the job can also 
monitor tables `[order, custom]` which contain history data, and we need the 
job can still reuse existing state of the job. This feature can resolve this 
case gracefully.
+
+The following operations show how to enable this feature to resolve above 
scenario. An existing Flink job which uses Oracle CDC Source like:
+
+```java
+    JdbcIncrementalSource<String> oracleSource = new OracleSourceBuilder()
+        .hostname("yourHostname")
+        .port(1521)
+        .databaseList("ORCLCDB") // set captured database
+        .schemaList("INVENTORY") // set captured schema
+        .tableList("INVENTORY.PRODUCT", "INVENTORY.USER", "INVENTORY.ADDRESS") 
// set captured tables
+        .username("yourUsername")
+        .password("yourPassword")
+        .scanNewlyAddedTableEnabled(true) // enable scan the newly added 
tables feature

Review Comment:
   The comment refers to "scan the newly added tables feature" but should be 
"scan newly added tables feature" (remove the article "the" before "newly") to 
be consistent with the section title and standard English usage.
   ```suggestion
           .scanNewlyAddedTableEnabled(true) // enable scan newly added tables 
feature
   ```



##########
docs/content/docs/connectors/flink-sources/postgres-cdc.md:
##########
@@ -511,6 +511,71 @@ The config option `scan.startup.mode` specifies the 
startup mode for PostgreSQL
 - `committed-offset`: Skip snapshot phase and start reading events from a 
`confirmed_flush_lsn` offset of replication slot.
 - `snapshot`: Only the snapshot phase is performed and exits after the 
snapshot phase reading is completed.
 
+### Scan Newly Added Tables
+
+**Note:** This feature is available since Flink CDC 3.1.0.
+
+Scan Newly Added Tables feature enables you to add new tables to monitor for 
existing running pipeline. The newly added tables will read their snapshot data 
firstly and then read their binlog automatically.

Review Comment:
   The description mentions "binlog" which is MySQL-specific terminology. For 
PostgreSQL, the correct term should be "WAL (Write-Ahead Log)" or "replication 
slot changes". PostgreSQL doesn't use binlog.



##########
docs/content/docs/connectors/flink-sources/postgres-cdc.md:
##########
@@ -511,6 +511,71 @@ The config option `scan.startup.mode` specifies the 
startup mode for PostgreSQL
 - `committed-offset`: Skip snapshot phase and start reading events from a 
`confirmed_flush_lsn` offset of replication slot.
 - `snapshot`: Only the snapshot phase is performed and exits after the 
snapshot phase reading is completed.
 
+### Scan Newly Added Tables
+
+**Note:** This feature is available since Flink CDC 3.1.0.
+
+Scan Newly Added Tables feature enables you to add new tables to monitor for 
existing running pipeline. The newly added tables will read their snapshot data 
firstly and then read their binlog automatically.
+
+Imagine this scenario: At the beginning, a Flink job monitors tables 
`[product, user, address]`, but after some days we would like the job can also 
monitor tables `[order, custom]` which contain history data, and we need the 
job can still reuse existing state of the job. This feature can resolve this 
case gracefully.
+
+The following operations show how to enable this feature to resolve above 
scenario. An existing Flink job which uses PostgreSQL CDC Source like:
+
+```java
+    JdbcIncrementalSource<String> postgresSource =
+            PostgresSourceBuilder.PostgresIncrementalSource.<String>builder()
+                .hostname("yourHostname")
+                .port(5432)
+                .database("postgres") // set captured database
+                .schemaList("inventory") // set captured schema
+                .tableList("inventory.product", "inventory.user", 
"inventory.address") // set captured tables
+                .username("yourUsername")
+                .password("yourPassword")
+                .slotName("flink")
+                .scanNewlyAddedTableEnabled(true) // enable scan the newly 
added tables feature
+                .deserializer(new JsonDebeziumDeserializationSchema()) // 
converts SourceRecord to JSON String
+                .build();
+   // your business code
+```
+
+If we would like to add new tables `[inventory.order, inventory.custom]` to an 
existing Flink job, just need to update the `tableList()` value of the job to 
include `[inventory.order, inventory.custom]` and restore the job from previous 
savepoint.
+
+_Step 1_: Stop the existing Flink job with savepoint.
+```shell
+$ ./bin/flink stop $Existing_Flink_JOB_ID
+```
+```shell
+Suspending job "cca7bc1061d61cf15238e92312c2fc20" with a savepoint.
+Savepoint completed. Path: 
file:/tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab
+```
+_Step 2_: Update the table list option for the existing Flink job.
+1. update `tableList()` value.
+2. build the jar of updated job.
+```java
+    JdbcIncrementalSource<String> postgresSource =
+            PostgresSourceBuilder.PostgresIncrementalSource.<String>builder()
+                .hostname("yourHostname")
+                .port(5432)
+                .database("postgres")
+                .schemaList("inventory")
+                .tableList("inventory.product", "inventory.user", 
"inventory.address", "inventory.order", "inventory.custom") // set captured 
tables [product, user, address, order, custom]
+                .username("yourUsername")
+                .password("yourPassword")
+                .slotName("flink")
+                .scanNewlyAddedTableEnabled(true)
+                .deserializer(new JsonDebeziumDeserializationSchema()) // 
converts SourceRecord to JSON String
+                .build();
+   // your business code
+```
+_Step 3_: Restore the updated Flink job from savepoint.
+```shell
+$ ./bin/flink run \
+      --detached \
+      --from-savepoint /tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab \
+      ./FlinkCDCExample.jar
+```
+**Note:** Please refer the doc [Restore the job from previous 
savepoint](https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/cli/#command-line-interface)
 for more details.

Review Comment:
   The phrase "Please refer the doc" is missing the preposition "to". It should 
be "Please refer to the doc".
   ```suggestion
   **Note:** Please refer to the doc [Restore the job from previous 
savepoint](https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/cli/#command-line-interface)
 for more details.
   ```



##########
docs/content/docs/connectors/flink-sources/mongodb-cdc.md:
##########
@@ -512,6 +512,63 @@ Applications can use change streams to subscribe to all 
data changes on a single
 By the way, Debezium's MongoDB change streams exploration mentioned by 
[DBZ-435](https://issues.redhat.com/browse/DBZ-435) is on roadmap.<br> 
 If it's done, we can consider integrating two kinds of source connector for 
users to choose.
 
+### Scan Newly Added Tables
+
+**Note:** This feature is available since Flink CDC 3.1.0.
+
+Scan Newly Added Tables feature enables you to add new collections to monitor 
for existing running pipeline. The newly added collections will read their 
snapshot data firstly and then read their change stream automatically.

Review Comment:
   The adverb "firstly" should be "first" in standard American English usage. 
While "firstly" is grammatically correct in British English, the documentation 
appears to follow American English conventions.
   ```suggestion
   Scan Newly Added Tables feature enables you to add new collections to 
monitor for existing running pipeline. The newly added collections will read 
their snapshot data first and then read their change stream automatically.
   ```



##########
docs/content/docs/connectors/flink-sources/oracle-cdc.md:
##########
@@ -559,6 +559,67 @@ _Note: the mechanism of `scan.startup.mode` option relying 
on Debezium's `snapsh
 
 The Oracle CDC source can't work in parallel reading, because there is only 
one task can receive change events.
 
+### Scan Newly Added Tables
+
+**Note:** This feature is available since Flink CDC 3.1.0.
+
+Scan Newly Added Tables feature enables you to add new tables to monitor for 
existing running pipeline. The newly added tables will read their snapshot data 
firstly and then read their redo log automatically.

Review Comment:
   The description says "redo log" in the singular. For Oracle, changes are read from the redo logs (plural), so consider using "redo logs" for accuracy.



##########
docs/content/docs/connectors/flink-sources/oracle-cdc.md:
##########
@@ -559,6 +559,67 @@ _Note: the mechanism of `scan.startup.mode` option relying 
on Debezium's `snapsh
 
 The Oracle CDC source can't work in parallel reading, because there is only 
one task can receive change events.
 
+### Scan Newly Added Tables
+
+**Note:** This feature is available since Flink CDC 3.1.0.
+
+Scan Newly Added Tables feature enables you to add new tables to monitor for 
existing running pipeline. The newly added tables will read their snapshot data 
firstly and then read their redo log automatically.

Review Comment:
   The phrase "for existing running pipeline" is missing an article. It should 
be "for an existing running pipeline" or "for the existing running pipeline".
   ```suggestion
   Scan Newly Added Tables feature enables you to add new tables to monitor for 
an existing running pipeline. The newly added tables will read their snapshot 
data firstly and then read their redo log automatically.
   ```



##########
docs/content/docs/connectors/flink-sources/postgres-cdc.md:
##########
@@ -511,6 +511,71 @@ The config option `scan.startup.mode` specifies the 
startup mode for PostgreSQL
 - `committed-offset`: Skip snapshot phase and start reading events from a 
`confirmed_flush_lsn` offset of replication slot.
 - `snapshot`: Only the snapshot phase is performed and exits after the 
snapshot phase reading is completed.
 
+### Scan Newly Added Tables
+
+**Note:** This feature is available since Flink CDC 3.1.0.
+
+Scan Newly Added Tables feature enables you to add new tables to monitor for 
existing running pipeline. The newly added tables will read their snapshot data 
firstly and then read their binlog automatically.
+
+Imagine this scenario: At the beginning, a Flink job monitors tables 
`[product, user, address]`, but after some days we would like the job can also 
monitor tables `[order, custom]` which contain history data, and we need the 
job can still reuse existing state of the job. This feature can resolve this 
case gracefully.
+
+The following operations show how to enable this feature to resolve above 
scenario. An existing Flink job which uses PostgreSQL CDC Source like:
+
+```java
+    JdbcIncrementalSource<String> postgresSource =
+            PostgresSourceBuilder.PostgresIncrementalSource.<String>builder()
+                .hostname("yourHostname")
+                .port(5432)
+                .database("postgres") // set captured database
+                .schemaList("inventory") // set captured schema
+                .tableList("inventory.product", "inventory.user", 
"inventory.address") // set captured tables
+                .username("yourUsername")
+                .password("yourPassword")
+                .slotName("flink")
+                .scanNewlyAddedTableEnabled(true) // enable scan the newly 
added tables feature

Review Comment:
   The comment refers to this as "scan the newly added tables feature" but 
should be more concise as "scan newly added tables feature" (remove the article 
"the" before "newly") to match standard English usage and be consistent with 
the section title.
   ```suggestion
                   .scanNewlyAddedTableEnabled(true) // enable scan newly added 
tables feature
   ```



##########
docs/content/docs/connectors/flink-sources/oracle-cdc.md:
##########
@@ -559,6 +559,67 @@ _Note: the mechanism of `scan.startup.mode` option relying 
on Debezium's `snapsh
 
 The Oracle CDC source can't work in parallel reading, because there is only 
one task can receive change events.
 
+### Scan Newly Added Tables
+
+**Note:** This feature is available since Flink CDC 3.1.0.
+
+Scan Newly Added Tables feature enables you to add new tables to monitor for 
existing running pipeline. The newly added tables will read their snapshot data 
firstly and then read their redo log automatically.

Review Comment:
   The adverb "firstly" should be "first" in standard American English usage. 
While "firstly" is grammatically correct in British English, the documentation 
appears to follow American English conventions.
   ```suggestion
   Scan Newly Added Tables feature enables you to add new tables to monitor for 
existing running pipeline. The newly added tables will read their snapshot data 
first and then read their redo log automatically.
   ```



##########
docs/content/docs/connectors/flink-sources/postgres-cdc.md:
##########
@@ -511,6 +511,71 @@ The config option `scan.startup.mode` specifies the 
startup mode for PostgreSQL
 - `committed-offset`: Skip snapshot phase and start reading events from a 
`confirmed_flush_lsn` offset of replication slot.
 - `snapshot`: Only the snapshot phase is performed and exits after the 
snapshot phase reading is completed.
 
+### Scan Newly Added Tables
+
+**Note:** This feature is available since Flink CDC 3.1.0.
+
+Scan Newly Added Tables feature enables you to add new tables to monitor for 
existing running pipeline. The newly added tables will read their snapshot data 
firstly and then read their binlog automatically.

Review Comment:
   The phrase "for existing running pipeline" is missing an article. It should 
be "for an existing running pipeline" or "for the existing running pipeline".
   ```suggestion
   Scan Newly Added Tables feature enables you to add new tables to monitor for 
an existing running pipeline. The newly added tables will read their snapshot 
data firstly and then read their binlog automatically.
   ```



##########
docs/content/docs/connectors/flink-sources/mongodb-cdc.md:
##########
@@ -512,6 +512,63 @@ Applications can use change streams to subscribe to all 
data changes on a single
 By the way, Debezium's MongoDB change streams exploration mentioned by 
[DBZ-435](https://issues.redhat.com/browse/DBZ-435) is on roadmap.<br> 
 If it's done, we can consider integrating two kinds of source connector for 
users to choose.
 
+### Scan Newly Added Tables
+
+**Note:** This feature is available since Flink CDC 3.1.0.
+
+Scan Newly Added Tables feature enables you to add new collections to monitor 
for existing running pipeline. The newly added collections will read their 
snapshot data firstly and then read their change stream automatically.
+
+Imagine this scenario: At the beginning, a Flink job monitors collections 
`[product, user, address]`, but after some days we would like the job can also 
monitor collections `[order, custom]` which contain history data, and we need 
the job can still reuse existing state of the job. This feature can resolve 
this case gracefully.
+
+The following operations show how to enable this feature to resolve above 
scenario. An existing Flink job which uses MongoDB CDC Source like:
+
+```java
+    MongoDBSource<String> mongoSource = MongoDBSource.<String>builder()
+        .hosts("yourHostname:27017")
+        .databaseList("db") // set captured database
+        .collectionList("db.product", "db.user", "db.address") // set captured 
collections
+        .username("yourUsername")
+        .password("yourPassword")
+        .scanNewlyAddedTableEnabled(true) // enable scan the newly added 
tables feature

Review Comment:
   The comment refers to "scan the newly added tables feature" but should be 
"scan newly added tables feature" (remove the article "the" before "newly") to 
be consistent with the section title and standard English usage.
   ```suggestion
           .scanNewlyAddedTableEnabled(true) // enable scan newly added tables 
feature
   ```



##########
docs/content/docs/connectors/flink-sources/mongodb-cdc.md:
##########
@@ -512,6 +512,63 @@ Applications can use change streams to subscribe to all 
data changes on a single
 By the way, Debezium's MongoDB change streams exploration mentioned by 
[DBZ-435](https://issues.redhat.com/browse/DBZ-435) is on roadmap.<br> 
 If it's done, we can consider integrating two kinds of source connector for 
users to choose.
 
+### Scan Newly Added Tables
+
+**Note:** This feature is available since Flink CDC 3.1.0.
+
+Scan Newly Added Tables feature enables you to add new collections to monitor 
for existing running pipeline. The newly added collections will read their 
snapshot data firstly and then read their change stream automatically.
+
+Imagine this scenario: At the beginning, a Flink job monitors collections 
`[product, user, address]`, but after some days we would like the job can also 
monitor collections `[order, custom]` which contain history data, and we need 
the job can still reuse existing state of the job. This feature can resolve 
this case gracefully.
+
+The following operations show how to enable this feature to resolve above 
scenario. An existing Flink job which uses MongoDB CDC Source like:
+
+```java
+    MongoDBSource<String> mongoSource = MongoDBSource.<String>builder()
+        .hosts("yourHostname:27017")
+        .databaseList("db") // set captured database
+        .collectionList("db.product", "db.user", "db.address") // set captured 
collections
+        .username("yourUsername")
+        .password("yourPassword")
+        .scanNewlyAddedTableEnabled(true) // enable scan the newly added 
tables feature
+        .deserializer(new JsonDebeziumDeserializationSchema()) // converts 
SourceRecord to JSON String
+        .build();
+   // your business code
+```
+
+If we would like to add new collections `[order, custom]` to an existing Flink 
job, just need to update the `collectionList()` value of the job to include 
`[order, custom]` and restore the job from previous savepoint.

Review Comment:
   This sentence is missing a subject. It should read "we just need to update" 
instead of "just need to update". The sentence currently reads "If we would 
like to add new collections... just need to update" which is grammatically 
incorrect.
   ```suggestion
   If we would like to add new collections `[order, custom]` to an existing 
Flink job, we just need to update the `collectionList()` value of the job to 
include `[order, custom]` and restore the job from previous savepoint.
   ```



##########
docs/content/docs/connectors/flink-sources/postgres-cdc.md:
##########
@@ -511,6 +511,71 @@ The config option `scan.startup.mode` specifies the 
startup mode for PostgreSQL
 - `committed-offset`: Skip snapshot phase and start reading events from a 
`confirmed_flush_lsn` offset of replication slot.
 - `snapshot`: Only the snapshot phase is performed and exits after the 
snapshot phase reading is completed.
 
+### Scan Newly Added Tables
+
+**Note:** This feature is available since Flink CDC 3.1.0.
+
+Scan Newly Added Tables feature enables you to add new tables to monitor for 
existing running pipeline. The newly added tables will read their snapshot data 
firstly and then read their binlog automatically.
+
+Imagine this scenario: At the beginning, a Flink job monitors tables 
`[product, user, address]`, but after some days we would like the job can also 
monitor tables `[order, custom]` which contain history data, and we need the 
job can still reuse existing state of the job. This feature can resolve this 
case gracefully.
+
+The following operations show how to enable this feature to resolve above 
scenario. An existing Flink job which uses PostgreSQL CDC Source like:
+
+```java
+    JdbcIncrementalSource<String> postgresSource =
+            PostgresSourceBuilder.PostgresIncrementalSource.<String>builder()
+                .hostname("yourHostname")
+                .port(5432)
+                .database("postgres") // set captured database
+                .schemaList("inventory") // set captured schema
+                .tableList("inventory.product", "inventory.user", 
"inventory.address") // set captured tables
+                .username("yourUsername")
+                .password("yourPassword")
+                .slotName("flink")
+                .scanNewlyAddedTableEnabled(true) // enable scan the newly 
added tables feature
+                .deserializer(new JsonDebeziumDeserializationSchema()) // 
converts SourceRecord to JSON String
+                .build();
+   // your business code
+```
+
+If we would like to add new tables `[inventory.order, inventory.custom]` to an 
existing Flink job, just need to update the `tableList()` value of the job to 
include `[inventory.order, inventory.custom]` and restore the job from previous 
savepoint.

Review Comment:
   This sentence is missing a subject. It should read "we just need to update" 
instead of "just need to update". The sentence currently reads "If we would 
like to add new tables... just need to update" which is grammatically incorrect.
   ```suggestion
   If we would like to add new tables `[inventory.order, inventory.custom]` to 
an existing Flink job, we just need to update the `tableList()` value of the 
job to include `[inventory.order, inventory.custom]` and restore the job from 
previous savepoint.
   ```



##########
docs/content.zh/docs/connectors/flink-sources/postgres-cdc.md:
##########
@@ -510,6 +510,71 @@ The config option `scan.startup.mode` specifies the 
startup mode for PostgreSQL
 - `committed-offset`: Skip snapshot phase and start reading events from a 
`confirmed_flush_lsn` offset of replication slot.
 - `snapshot`: Only the snapshot phase is performed and exits after the 
snapshot phase reading is completed.
 
+### 动态加表
+
+**注意:** 该功能从 Flink CDC 3.1.0 版本开始支持。
+
+动态加表功能使你可以为正在运行的作业添加新表进行监控。新添加的表将首先读取其快照数据,然后自动读取其 binlog。

Review Comment:
   The description mentions "binlog" which is MySQL-specific terminology. For 
PostgreSQL, the correct term should be "WAL (Write-Ahead Log)" or "replication 
slot changes" or "变更日志". PostgreSQL doesn't use binlog.



##########
docs/content/docs/connectors/flink-sources/mongodb-cdc.md:
##########
@@ -512,6 +512,63 @@ Applications can use change streams to subscribe to all 
data changes on a single
 By the way, Debezium's MongoDB change streams exploration mentioned by 
[DBZ-435](https://issues.redhat.com/browse/DBZ-435) is on roadmap.<br> 
 If it's done, we can consider integrating two kinds of source connector for 
users to choose.
 
+### Scan Newly Added Tables
+
+**Note:** This feature is available since Flink CDC 3.1.0.
+
+Scan Newly Added Tables feature enables you to add new collections to monitor 
for existing running pipeline. The newly added collections will read their 
snapshot data firstly and then read their change stream automatically.

Review Comment:
   The phrase "for existing running pipeline" is missing an article. It should 
be "for an existing running pipeline" or "for the existing running pipeline".
   ```suggestion
   The Scan Newly Added Tables feature enables you to add new collections to 
monitor for an existing running pipeline. The newly added collections will read 
their snapshot data firstly and then read their change stream automatically.
   ```



##########
docs/content/docs/connectors/flink-sources/oracle-cdc.md:
##########
@@ -559,6 +559,67 @@ _Note: the mechanism of `scan.startup.mode` option relying 
on Debezium's `snapsh
 
 The Oracle CDC source can't work in parallel reading, because there is only 
one task can receive change events.
 
+### Scan Newly Added Tables
+
+**Note:** This feature is available since Flink CDC 3.1.0.
+
+Scan Newly Added Tables feature enables you to add new tables to monitor for 
existing running pipeline. The newly added tables will read their snapshot data 
firstly and then read their redo log automatically.
+
+Imagine this scenario: At the beginning, a Flink job monitors tables 
`[product, user, address]`, but after some days we would like the job can also 
monitor tables `[order, custom]` which contain history data, and we need the 
job can still reuse existing state of the job. This feature can resolve this 
case gracefully.
+
+The following operations show how to enable this feature to resolve above 
scenario. An existing Flink job which uses Oracle CDC Source like:
+
+```java
+    JdbcIncrementalSource<String> oracleSource = new OracleSourceBuilder()
+        .hostname("yourHostname")
+        .port(1521)
+        .databaseList("ORCLCDB") // set captured database
+        .schemaList("INVENTORY") // set captured schema
+        .tableList("INVENTORY.PRODUCT", "INVENTORY.USER", "INVENTORY.ADDRESS") 
// set captured tables
+        .username("yourUsername")
+        .password("yourPassword")
+        .scanNewlyAddedTableEnabled(true) // enable scan the newly added 
tables feature
+        .deserializer(new JsonDebeziumDeserializationSchema()) // converts 
SourceRecord to JSON String
+        .build();
+   // your business code
+```
+
+If we would like to add new tables `[INVENTORY.ORDER, INVENTORY.CUSTOM]` to an 
existing Flink job, just need to update the `tableList()` value of the job to 
include `[INVENTORY.ORDER, INVENTORY.CUSTOM]` and restore the job from previous 
savepoint.

Review Comment:
   This sentence is missing a subject. It should read "we just need to update" 
instead of "just need to update". The sentence currently reads "If we would 
like to add new tables... just need to update" which is grammatically incorrect.
   ```suggestion
   If we would like to add new tables `[INVENTORY.ORDER, INVENTORY.CUSTOM]` to 
an existing Flink job, we just need to update the `tableList()` value of the 
job to include `[INVENTORY.ORDER, INVENTORY.CUSTOM]` and restore the job from 
previous savepoint.
   ```



##########
docs/content/docs/connectors/flink-sources/oracle-cdc.md:
##########
@@ -559,6 +559,67 @@ _Note: the mechanism of `scan.startup.mode` option relying 
on Debezium's `snapsh
 
 The Oracle CDC source can't work in parallel reading, because there is only 
one task can receive change events.
 
+### Scan Newly Added Tables
+
+**Note:** This feature is available since Flink CDC 3.1.0.
+
+Scan Newly Added Tables feature enables you to add new tables to monitor for 
existing running pipeline. The newly added tables will read their snapshot data 
firstly and then read their redo log automatically.
+
+Imagine this scenario: At the beginning, a Flink job monitors tables 
`[product, user, address]`, but after some days we would like the job can also 
monitor tables `[order, custom]` which contain history data, and we need the 
job can still reuse existing state of the job. This feature can resolve this 
case gracefully.
+
+The following operations show how to enable this feature to resolve above 
scenario. An existing Flink job which uses Oracle CDC Source like:
+
+```java
+    JdbcIncrementalSource<String> oracleSource = new OracleSourceBuilder()
+        .hostname("yourHostname")
+        .port(1521)
+        .databaseList("ORCLCDB") // set captured database
+        .schemaList("INVENTORY") // set captured schema
+        .tableList("INVENTORY.PRODUCT", "INVENTORY.USER", "INVENTORY.ADDRESS") 
// set captured tables
+        .username("yourUsername")
+        .password("yourPassword")
+        .scanNewlyAddedTableEnabled(true) // enable scan the newly added 
tables feature
+        .deserializer(new JsonDebeziumDeserializationSchema()) // converts 
SourceRecord to JSON String
+        .build();
+   // your business code
+```
+
+If we would like to add new tables `[INVENTORY.ORDER, INVENTORY.CUSTOM]` to an 
existing Flink job, just need to update the `tableList()` value of the job to 
include `[INVENTORY.ORDER, INVENTORY.CUSTOM]` and restore the job from previous 
savepoint.
+
+_Step 1_: Stop the existing Flink job with savepoint.
+```shell
+$ ./bin/flink stop $Existing_Flink_JOB_ID
+```
+```shell
+Suspending job "cca7bc1061d61cf15238e92312c2fc20" with a savepoint.
+Savepoint completed. Path: 
file:/tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab
+```
+_Step 2_: Update the table list option for the existing Flink job.
+1. update `tableList()` value.
+2. build the jar of updated job.
+```java
+    JdbcIncrementalSource<String> oracleSource = new OracleSourceBuilder()
+        .hostname("yourHostname")
+        .port(1521)
+        .databaseList("ORCLCDB")
+        .schemaList("INVENTORY")
+        .tableList("INVENTORY.PRODUCT", "INVENTORY.USER", "INVENTORY.ADDRESS", 
"INVENTORY.ORDER", "INVENTORY.CUSTOM") // set captured tables [PRODUCT, USER, 
ADDRESS, ORDER, CUSTOM]
+        .username("yourUsername")
+        .password("yourPassword")
+        .scanNewlyAddedTableEnabled(true)
+        .deserializer(new JsonDebeziumDeserializationSchema()) // converts 
SourceRecord to JSON String
+        .build();
+   // your business code
+```
+_Step 3_: Restore the updated Flink job from savepoint.
+```shell
+$ ./bin/flink run \
+      --detached \
+      --from-savepoint /tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab \
+      ./FlinkCDCExample.jar
+```
+**Note:** Please refer the doc [Restore the job from previous 
savepoint](https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/cli/#command-line-interface)
 for more details.

Review Comment:
   The phrase "Please refer the doc" is missing the preposition "to". It should 
be "Please refer to the doc".
   ```suggestion
   **Note:** Please refer to the doc [Restore the job from previous 
savepoint](https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/cli/#command-line-interface)
 for more details.
   ```



##########
docs/content/docs/connectors/flink-sources/postgres-cdc.md:
##########
@@ -511,6 +511,71 @@ The config option `scan.startup.mode` specifies the 
startup mode for PostgreSQL
 - `committed-offset`: Skip snapshot phase and start reading events from a 
`confirmed_flush_lsn` offset of replication slot.
 - `snapshot`: Only the snapshot phase is performed and exits after the 
snapshot phase reading is completed.
 
+### Scan Newly Added Tables
+
+**Note:** This feature is available since Flink CDC 3.1.0.
+
+Scan Newly Added Tables feature enables you to add new tables to monitor for 
existing running pipeline. The newly added tables will read their snapshot data 
firstly and then read their binlog automatically.

Review Comment:
   The adverb "firstly" should be "first" in standard American English usage. 
While "firstly" is grammatically correct in British English, the documentation 
appears to follow American English conventions.
   ```suggestion
   Scan Newly Added Tables feature enables you to add new tables to monitor for 
existing running pipeline. The newly added tables will read their snapshot data 
first and then read their binlog automatically.
   ```



##########
docs/content/docs/connectors/flink-sources/mongodb-cdc.md:
##########
@@ -512,6 +512,63 @@ Applications can use change streams to subscribe to all 
data changes on a single
 By the way, Debezium's MongoDB change streams exploration mentioned by 
[DBZ-435](https://issues.redhat.com/browse/DBZ-435) is on roadmap.<br> 
 If it's done, we can consider integrating two kinds of source connector for 
users to choose.
 
+### Scan Newly Added Tables
+
+**Note:** This feature is available since Flink CDC 3.1.0.
+
+Scan Newly Added Tables feature enables you to add new collections to monitor 
for existing running pipeline. The newly added collections will read their 
snapshot data firstly and then read their change stream automatically.
+
+Imagine this scenario: At the beginning, a Flink job monitors collections 
`[product, user, address]`, but after some days we would like the job can also 
monitor collections `[order, custom]` which contain history data, and we need 
the job can still reuse existing state of the job. This feature can resolve 
this case gracefully.
+
+The following operations show how to enable this feature to resolve above 
scenario. An existing Flink job which uses MongoDB CDC Source like:
+
+```java
+    MongoDBSource<String> mongoSource = MongoDBSource.<String>builder()
+        .hosts("yourHostname:27017")
+        .databaseList("db") // set captured database
+        .collectionList("db.product", "db.user", "db.address") // set captured 
collections
+        .username("yourUsername")
+        .password("yourPassword")
+        .scanNewlyAddedTableEnabled(true) // enable scan the newly added 
tables feature
+        .deserializer(new JsonDebeziumDeserializationSchema()) // converts 
SourceRecord to JSON String
+        .build();
+   // your business code
+```
+
+If we would like to add new collections `[order, custom]` to an existing Flink 
job, just need to update the `collectionList()` value of the job to include 
`[order, custom]` and restore the job from previous savepoint.
+
+_Step 1_: Stop the existing Flink job with savepoint.
+```shell
+$ ./bin/flink stop $Existing_Flink_JOB_ID
+```
+```shell
+Suspending job "cca7bc1061d61cf15238e92312c2fc20" with a savepoint.
+Savepoint completed. Path: 
file:/tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab
+```
+_Step 2_: Update the collection list option for the existing Flink job.
+1. update `collectionList()` value.
+2. build the jar of updated job.
+```java
+    MongoDBSource<String> mongoSource = MongoDBSource.<String>builder()
+        .hosts("yourHostname:27017")
+        .databaseList("db")
+        .collectionList("db.product", "db.user", "db.address", "db.order", 
"db.custom") // set captured collections [product, user, address, order, custom]
+        .username("yourUsername")
+        .password("yourPassword")
+        .scanNewlyAddedTableEnabled(true)
+        .deserializer(new JsonDebeziumDeserializationSchema()) // converts 
SourceRecord to JSON String
+        .build();
+   // your business code
+```
+_Step 3_: Restore the updated Flink job from savepoint.
+```shell
+$ ./bin/flink run \
+      --detached \
+      --from-savepoint /tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab \
+      ./FlinkCDCExample.jar
+```
+**Note:** Please refer the doc [Restore the job from previous 
savepoint](https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/cli/#command-line-interface)
 for more details.

Review Comment:
   The phrase "Please refer the doc" is missing the preposition "to". It should 
be "Please refer to the doc".
   ```suggestion
   **Note:** Please refer to the doc [Restore the job from previous 
savepoint](https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/cli/#command-line-interface)
 for more details.
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
