This is an automated email from the ASF dual-hosted git repository.

gongzhongqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 2d1eb0aff [FLINK-34990][cdc-connector][oracle] Oracle cdc support 
newly add table (#3203)
2d1eb0aff is described below

commit 2d1eb0aff1b5eba5e68d311e9ea45295fcb3bd64
Author: Xin Gong <[email protected]>
AuthorDate: Tue Jul 16 09:49:09 2024 +0800

    [FLINK-34990][cdc-connector][oracle] Oracle cdc support newly add table 
(#3203)
    
    * [cdc-connector][oracle] Oracle cdc support newly add table
    
    * [cdc-connector][oracle] Fix code style
    
    * [cdc-connector][oracle] Address comment
---
 .../oracle/source/OracleSourceBuilder.java         |   6 +
 .../oracle/source/config/OracleSourceConfig.java   |   5 +-
 .../source/config/OracleSourceConfigFactory.java   |   3 +-
 .../connectors/oracle/table/OracleTableSource.java |  15 +-
 .../oracle/table/OracleTableSourceFactory.java     |   6 +-
 .../oracle/source/NewlyAddedTableITCase.java       | 908 +++++++++++++++++++++
 .../oracle/source/OracleSourceITCase.java          |  55 +-
 .../oracle/source/OracleSourceTestBase.java        |   2 +-
 .../oracle/table/OracleTableSourceFactoryTest.java |  21 +-
 .../oracle/testutils/OracleTestUtils.java          | 124 +++
 10 files changed, 1078 insertions(+), 67 deletions(-)

diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/OracleSourceBuilder.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/OracleSourceBuilder.java
index 9a740b15b..0560ffb76 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/OracleSourceBuilder.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/OracleSourceBuilder.java
@@ -243,6 +243,12 @@ public class OracleSourceBuilder<T> {
         return this;
     }
 
+    /**
+     * Sets whether the {@link OracleIncrementalSource} should scan the newly added tables or not.
+     *
+     * @param scanNewlyAddedTableEnabled flag that is forwarded unchanged to the config factory
+     * @return this builder, so that calls can be chained
+     */
+    public OracleSourceBuilder<T> scanNewlyAddedTableEnabled(boolean 
scanNewlyAddedTableEnabled) {
+        
this.configFactory.scanNewlyAddedTableEnabled(scanNewlyAddedTableEnabled);
+        return this;
+    }
+
     /**
      * Build the {@link OracleIncrementalSource}.
      *
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/config/OracleSourceConfig.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/config/OracleSourceConfig.java
index 2c007fe54..19386290a 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/config/OracleSourceConfig.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/config/OracleSourceConfig.java
@@ -63,7 +63,8 @@ public class OracleSourceConfig extends JdbcSourceConfig {
             int connectMaxRetries,
             int connectionPoolSize,
             String chunkKeyColumn,
-            boolean skipSnapshotBackfill) {
+            boolean skipSnapshotBackfill,
+            boolean scanNewlyAddedTableEnabled) {
         super(
                 startupOptions,
                 databaseList,
@@ -89,7 +90,7 @@ public class OracleSourceConfig extends JdbcSourceConfig {
                 connectionPoolSize,
                 chunkKeyColumn,
                 skipSnapshotBackfill,
-                false);
+                scanNewlyAddedTableEnabled);
         this.url = url;
     }
 
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/config/OracleSourceConfigFactory.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/config/OracleSourceConfigFactory.java
index cf01b849e..974cedf4a 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/config/OracleSourceConfigFactory.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/config/OracleSourceConfigFactory.java
@@ -133,6 +133,7 @@ public class OracleSourceConfigFactory extends 
JdbcSourceConfigFactory {
                 connectMaxRetries,
                 connectionPoolSize,
                 chunkKeyColumn,
-                skipSnapshotBackfill);
+                skipSnapshotBackfill,
+                scanNewlyAddedTableEnabled);
     }
 }
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/table/OracleTableSource.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/table/OracleTableSource.java
index 9487a0968..5903ce41e 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/table/OracleTableSource.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/table/OracleTableSource.java
@@ -79,6 +79,7 @@ public class OracleTableSource implements ScanTableSource, 
SupportsReadingMetada
     private final String chunkKeyColumn;
     private final boolean closeIdleReaders;
     private final boolean skipSnapshotBackfill;
+    private final boolean scanNewlyAddedTableEnabled;
 
     // 
--------------------------------------------------------------------------------------------
     // Mutable attributes
@@ -113,7 +114,8 @@ public class OracleTableSource implements ScanTableSource, 
SupportsReadingMetada
             double distributionFactorLower,
             @Nullable String chunkKeyColumn,
             boolean closeIdleReaders,
-            boolean skipSnapshotBackfill) {
+            boolean skipSnapshotBackfill,
+            boolean scanNewlyAddedTableEnabled) {
         this.physicalSchema = physicalSchema;
         this.url = url;
         this.port = port;
@@ -139,6 +141,7 @@ public class OracleTableSource implements ScanTableSource, 
SupportsReadingMetada
         this.chunkKeyColumn = chunkKeyColumn;
         this.closeIdleReaders = closeIdleReaders;
         this.skipSnapshotBackfill = skipSnapshotBackfill;
+        this.scanNewlyAddedTableEnabled = scanNewlyAddedTableEnabled;
     }
 
     @Override
@@ -187,6 +190,7 @@ public class OracleTableSource implements ScanTableSource, 
SupportsReadingMetada
                             .closeIdleReaders(closeIdleReaders)
                             .skipSnapshotBackfill(skipSnapshotBackfill)
                             .chunkKeyColumn(chunkKeyColumn)
+                            
.scanNewlyAddedTableEnabled(scanNewlyAddedTableEnabled)
                             .build();
 
             return SourceProvider.of(oracleChangeEventSource);
@@ -252,7 +256,8 @@ public class OracleTableSource implements ScanTableSource, 
SupportsReadingMetada
                         distributionFactorLower,
                         chunkKeyColumn,
                         closeIdleReaders,
-                        skipSnapshotBackfill);
+                        skipSnapshotBackfill,
+                        scanNewlyAddedTableEnabled);
         source.metadataKeys = metadataKeys;
         source.producedDataType = producedDataType;
         return source;
@@ -291,7 +296,8 @@ public class OracleTableSource implements ScanTableSource, 
SupportsReadingMetada
                 && Objects.equals(distributionFactorLower, 
that.distributionFactorLower)
                 && Objects.equals(chunkKeyColumn, that.chunkKeyColumn)
                 && Objects.equals(closeIdleReaders, that.closeIdleReaders)
-                && Objects.equals(skipSnapshotBackfill, 
that.skipSnapshotBackfill);
+                && Objects.equals(skipSnapshotBackfill, 
that.skipSnapshotBackfill)
+                && Objects.equals(scanNewlyAddedTableEnabled, 
that.scanNewlyAddedTableEnabled);
     }
 
     @Override
@@ -321,7 +327,8 @@ public class OracleTableSource implements ScanTableSource, 
SupportsReadingMetada
                 distributionFactorLower,
                 chunkKeyColumn,
                 closeIdleReaders,
-                skipSnapshotBackfill);
+                skipSnapshotBackfill,
+                scanNewlyAddedTableEnabled);
     }
 
     @Override
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/table/OracleTableSourceFactory.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/table/OracleTableSourceFactory.java
index 5f533d8f2..70a63b3ae 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/table/OracleTableSourceFactory.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/table/OracleTableSourceFactory.java
@@ -47,6 +47,7 @@ import static 
org.apache.flink.cdc.connectors.base.options.SourceOptions.CHUNK_M
 import static 
org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP;
 import static 
org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
 import static 
org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED;
+import static 
org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED;
 import static 
org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_SNAPSHOT_FETCH_SIZE;
 import static 
org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_STARTUP_MODE;
 import static 
org.apache.flink.cdc.connectors.base.options.SourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND;
@@ -106,6 +107,7 @@ public class OracleTableSourceFactory implements 
DynamicTableSourceFactory {
 
         boolean closeIdlerReaders = 
config.get(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED);
         boolean skipSnapshotBackfill = 
config.get(SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP);
+        boolean scanNewlyAddedTableEnabled = 
config.get(SCAN_NEWLY_ADDED_TABLE_ENABLED);
 
         if (enableParallelRead) {
             validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, 
splitSize, 1);
@@ -142,7 +144,8 @@ public class OracleTableSourceFactory implements 
DynamicTableSourceFactory {
                 distributionFactorLower,
                 chunkKeyColumn,
                 closeIdlerReaders,
-                skipSnapshotBackfill);
+                skipSnapshotBackfill,
+                scanNewlyAddedTableEnabled);
     }
 
     @Override
@@ -180,6 +183,7 @@ public class OracleTableSourceFactory implements 
DynamicTableSourceFactory {
         options.add(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN);
         options.add(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED);
         options.add(SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP);
+        options.add(SCAN_NEWLY_ADDED_TABLE_ENABLED);
         return options;
     }
 
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/source/NewlyAddedTableITCase.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/source/NewlyAddedTableITCase.java
new file mode 100644
index 000000000..0cbbb8a57
--- /dev/null
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/source/NewlyAddedTableITCase.java
@@ -0,0 +1,908 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.oracle.source;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import 
org.apache.flink.cdc.connectors.oracle.testutils.OracleTestUtils.FailoverPhase;
+import 
org.apache.flink.cdc.connectors.oracle.testutils.OracleTestUtils.FailoverType;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.Timeout;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static java.lang.String.format;
+import static 
org.apache.flink.cdc.connectors.oracle.testutils.OracleTestUtils.getTableNameRegex;
+import static 
org.apache.flink.cdc.connectors.oracle.testutils.OracleTestUtils.triggerFailover;
+import static 
org.apache.flink.cdc.connectors.oracle.testutils.OracleTestUtils.waitForSinkSize;
+import static 
org.apache.flink.cdc.connectors.oracle.testutils.OracleTestUtils.waitForUpsertSinkSize;
+
+/** IT tests to cover various newly added tables during capture process. */
+public class NewlyAddedTableITCase extends OracleSourceTestBase {
+    @Rule public final Timeout timeoutPerTest = Timeout.seconds(600);
+
+    private final ScheduledExecutorService mockRedoLogExecutor =
+            Executors.newScheduledThreadPool(1);
+
+    /**
+     * Runs once before all tests: opens a DBA connection and enables supplemental logging of all
+     * columns at database level.
+     *
+     * @throws SQLException if the connection cannot be obtained or the statement fails
+     */
+    @BeforeClass
+    public static void beforeClass() throws SQLException {
+        // Both the connection and the statement are closed by try-with-resources.
+        try (Connection dbaConnection = getJdbcConnectionAsDBA();
+                Statement dbaStatement = dbaConnection.createStatement()) {
+            dbaStatement.execute("ALTER DATABASE ADD SUPPLEMENTAL LOG DATA 
(ALL) COLUMNS");
+        }
+    }
+
+    /**
+     * Prepares the fixture for every test: clears the values-sink data, runs {@code
+     * createAndInitialize("customer.sql")}, creates {@code PRODUCE_LOG_TABLE} with three rows and
+     * schedules a single delayed update on that table so that redo log entries are written while
+     * the test is running.
+     *
+     * @throws Exception if the initialization or any of the seed statements fails
+     */
+    @Before
+    public void before() throws Exception {
+        TestValuesTableFactory.clearAllData();
+        createAndInitialize("customer.sql");
+
+        final String tableId = ORACLE_SCHEMA + ".PRODUCE_LOG_TABLE";
+        // Declare the statement in the try header as well so it is always closed.
+        try (Connection connection = getJdbcConnection();
+                Statement statement = connection.createStatement()) {
+            connection.setAutoCommit(false);
+            // prepare initial data for given table
+            statement.execute(
+                    format(
+                            "CREATE TABLE %s ( ID NUMBER(19), CNT NUMBER(19), PRIMARY KEY(ID))",
+                            tableId));
+            statement.execute(format("INSERT INTO  %s VALUES (0, 100)", tableId));
+            statement.execute(format("INSERT INTO  %s VALUES (1, 101)", tableId));
+            statement.execute(format("INSERT INTO  %s VALUES (2, 102)", tableId));
+            connection.commit();
+        }
+
+        // Mock redo log traffic during the newly added table capturing process. schedule() runs
+        // the task exactly once; the delay is 500 milliseconds (the unit used to be MICROSECONDS,
+        // which made the update fire practically at once).
+        mockRedoLogExecutor.schedule(
+                () -> {
+                    try {
+                        executeSql(format("UPDATE %s SET  CNT = CNT +1 WHERE ID < 2", tableId));
+                    } catch (Exception e) {
+                        // Best effort only: a failed mock update must not abort the test itself.
+                        e.printStackTrace();
+                    }
+                },
+                500,
+                TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Runs after every test: shuts down the redo-log mock executor and then pauses for one second.
+     *
+     * @throws Exception if the sleep is interrupted
+     */
+    @After
+    public void after() throws Exception {
+        mockRedoLogExecutor.shutdown();
+        // sleep 1000ms to wait until connections are closed.
+        Thread.sleep(1000L);
+    }
+
+    /**
+     * Delegates to {@code testNewlyAddedTableOneByOne} with parallelism 1, no failover and the
+     * boolean flag set to {@code false} (presumably "make redo log before capture" - confirm
+     * against the helper), using two address tables.
+     */
+    @Test
+    public void testNewlyAddedTableForExistsPipelineOnce() throws Exception {
+        testNewlyAddedTableOneByOne(
+                1,
+                FailoverType.NONE,
+                FailoverPhase.NEVER,
+                false,
+                "ADDRESS_HANGZHOU",
+                "ADDRESS_BEIJING");
+    }
+
+    /**
+     * Same arguments as {@code testNewlyAddedTableForExistsPipelineOnce} except that the boolean
+     * flag is {@code true} (presumably redo log is produced ahead of capturing - confirm against
+     * {@code testNewlyAddedTableOneByOne}).
+     */
+    @Test
+    public void testNewlyAddedTableForExistsPipelineOnceWithAheadRedoLog() 
throws Exception {
+        testNewlyAddedTableOneByOne(
+                1,
+                FailoverType.NONE,
+                FailoverPhase.NEVER,
+                true,
+                "ADDRESS_HANGZHOU",
+                "ADDRESS_BEIJING");
+    }
+
+    /**
+     * Delegates to {@code testNewlyAddedTableOneByOne} with {@code DEFAULT_PARALLELISM}, no
+     * failover, the boolean flag set to {@code false} and three address tables.
+     */
+    @Test
+    public void testNewlyAddedTableForExistsPipelineTwice() throws Exception {
+        testNewlyAddedTableOneByOne(
+                DEFAULT_PARALLELISM,
+                FailoverType.NONE,
+                FailoverPhase.NEVER,
+                false,
+                "ADDRESS_HANGZHOU",
+                "ADDRESS_BEIJING",
+                "ADDRESS_SHANGHAI");
+    }
+
+    /**
+     * Delegates to {@code testNewlyAddedTableOneByOne} with {@code DEFAULT_PARALLELISM}, no
+     * failover, the boolean flag set to {@code true} (presumably redo log is produced ahead of
+     * capturing - confirm against the helper) and three address tables.
+     */
+    @Test
+    public void testNewlyAddedTableForExistsPipelineTwiceWithAheadRedoLog() 
throws Exception {
+        testNewlyAddedTableOneByOne(
+                DEFAULT_PARALLELISM,
+                FailoverType.NONE,
+                FailoverPhase.NEVER,
+                true,
+                "ADDRESS_HANGZHOU",
+                "ADDRESS_BEIJING",
+                "ADDRESS_SHANGHAI");
+    }
+
+    /**
+     * Delegates to the {@code testNewlyAddedTableOneByOne} overload that accepts extra options,
+     * passing {@code scan.incremental.close-idle-reader.enabled = true}, {@code
+     * DEFAULT_PARALLELISM}, no failover, the boolean flag set to {@code true} and three address
+     * tables.
+     */
+    @Test
+    public void 
testNewlyAddedTableForExistsPipelineTwiceWithAheadRedoLogAndAutoCloseReader()
+            throws Exception {
+        // Extra connector option handed to the helper in addition to its defaults.
+        Map<String, String> otherOptions = new HashMap<>();
+        otherOptions.put("scan.incremental.close-idle-reader.enabled", "true");
+        testNewlyAddedTableOneByOne(
+                DEFAULT_PARALLELISM,
+                otherOptions,
+                FailoverType.NONE,
+                FailoverPhase.NEVER,
+                true,
+                "ADDRESS_HANGZHOU",
+                "ADDRESS_BEIJING",
+                "ADDRESS_SHANGHAI");
+    }
+
+    /**
+     * Delegates to {@code testNewlyAddedTableOneByOne} with {@code DEFAULT_PARALLELISM}, no
+     * failover, the boolean flag set to {@code false} and four address tables.
+     */
+    @Test
+    public void testNewlyAddedTableForExistsPipelineThrice() throws Exception {
+        testNewlyAddedTableOneByOne(
+                DEFAULT_PARALLELISM,
+                FailoverType.NONE,
+                FailoverPhase.NEVER,
+                false,
+                "ADDRESS_HANGZHOU",
+                "ADDRESS_BEIJING",
+                "ADDRESS_SHANGHAI",
+                "ADDRESS_SHENZHEN");
+    }
+
+    /**
+     * Delegates to {@code testNewlyAddedTableOneByOne} with {@code DEFAULT_PARALLELISM}, no
+     * failover, the boolean flag set to {@code true} (presumably redo log is produced ahead of
+     * capturing - confirm against the helper) and four address tables.
+     */
+    @Test
+    public void testNewlyAddedTableForExistsPipelineThriceWithAheadRedoLog() 
throws Exception {
+        testNewlyAddedTableOneByOne(
+                DEFAULT_PARALLELISM,
+                FailoverType.NONE,
+                FailoverPhase.NEVER,
+                true,
+                "ADDRESS_HANGZHOU",
+                "ADDRESS_BEIJING",
+                "ADDRESS_SHANGHAI",
+                "ADDRESS_SHENZHEN");
+    }
+
+    /**
+     * Delegates to {@code testNewlyAddedTableOneByOne} with parallelism 1, no failover, the
+     * boolean flag set to {@code false} and two address tables.
+     *
+     * <p>NOTE(review): the arguments are identical to those of {@code
+     * testNewlyAddedTableForExistsPipelineOnce}; confirm whether both tests are needed.
+     */
+    @Test
+    public void testNewlyAddedTableForExistsPipelineSingleParallelism() throws 
Exception {
+        testNewlyAddedTableOneByOne(
+                1,
+                FailoverType.NONE,
+                FailoverPhase.NEVER,
+                false,
+                "ADDRESS_HANGZHOU",
+                "ADDRESS_BEIJING");
+    }
+
+    /**
+     * Delegates to {@code testNewlyAddedTableOneByOne} with parallelism 1, no failover, the
+     * boolean flag set to {@code true} and two address tables.
+     *
+     * <p>NOTE(review): the arguments are identical to those of {@code
+     * testNewlyAddedTableForExistsPipelineOnceWithAheadRedoLog}; confirm whether both tests are
+     * needed.
+     */
+    @Test
+    public void 
testNewlyAddedTableForExistsPipelineSingleParallelismWithAheadRedoLog()
+            throws Exception {
+        testNewlyAddedTableOneByOne(
+                1,
+                FailoverType.NONE,
+                FailoverPhase.NEVER,
+                true,
+                "ADDRESS_HANGZHOU",
+                "ADDRESS_BEIJING");
+    }
+
+    /**
+     * Delegates to {@code testNewlyAddedTableOneByOne} with {@code DEFAULT_PARALLELISM}, a {@code
+     * FailoverType.JM} failover in {@code FailoverPhase.SNAPSHOT}, the boolean flag set to {@code
+     * false} and two address tables.
+     */
+    @Test
+    public void testJobManagerFailoverForNewlyAddedTable() throws Exception {
+        testNewlyAddedTableOneByOne(
+                DEFAULT_PARALLELISM,
+                FailoverType.JM,
+                FailoverPhase.SNAPSHOT,
+                false,
+                "ADDRESS_HANGZHOU",
+                "ADDRESS_BEIJING");
+    }
+
+    /**
+     * Delegates to {@code testNewlyAddedTableOneByOne} with {@code DEFAULT_PARALLELISM}, a {@code
+     * FailoverType.JM} failover in {@code FailoverPhase.SNAPSHOT}, the boolean flag set to {@code
+     * true} (presumably redo log is produced ahead of capturing - confirm against the helper) and
+     * two address tables.
+     */
+    @Test
+    public void testJobManagerFailoverForNewlyAddedTableWithAheadRedoLog() 
throws Exception {
+        testNewlyAddedTableOneByOne(
+                DEFAULT_PARALLELISM,
+                FailoverType.JM,
+                FailoverPhase.SNAPSHOT,
+                true,
+                "ADDRESS_HANGZHOU",
+                "ADDRESS_BEIJING");
+    }
+
+    /**
+     * Delegates to {@code testNewlyAddedTableOneByOne} with parallelism 1, a {@code
+     * FailoverType.TM} failover in {@code FailoverPhase.REDO_LOG}, the boolean flag set to {@code
+     * false} and two address tables.
+     */
+    @Test
+    public void testTaskManagerFailoverForNewlyAddedTable() throws Exception {
+        testNewlyAddedTableOneByOne(
+                1,
+                FailoverType.TM,
+                FailoverPhase.REDO_LOG,
+                false,
+                "ADDRESS_HANGZHOU",
+                "ADDRESS_BEIJING");
+    }
+
+    /**
+     * Task-manager failover during the redo-log phase, with the "ahead redo log" flag enabled.
+     *
+     * <p>The flag is {@code true} here, matching every other {@code ...WithAheadRedoLog} test in
+     * this class; with {@code false} this test was an exact duplicate of {@code
+     * testTaskManagerFailoverForNewlyAddedTable}.
+     */
+    @Test
+    public void testTaskManagerFailoverForNewlyAddedTableWithAheadRedoLog() throws Exception {
+        testNewlyAddedTableOneByOne(
+                1,
+                FailoverType.TM,
+                FailoverPhase.REDO_LOG,
+                true,
+                "ADDRESS_HANGZHOU",
+                "ADDRESS_BEIJING");
+    }
+
+    /**
+     * Delegates to {@code testRemoveTablesOneByOne} with parallelism 1, a {@code FailoverType.JM}
+     * failover in {@code FailoverPhase.SNAPSHOT} and three address tables.
+     */
+    @Test
+    public void testJobManagerFailoverForRemoveTableSingleParallelism() throws 
Exception {
+        testRemoveTablesOneByOne(
+                1,
+                FailoverType.JM,
+                FailoverPhase.SNAPSHOT,
+                "ADDRESS_HANGZHOU",
+                "ADDRESS_BEIJING",
+                "ADDRESS_SHANGHAI");
+    }
+
+    /**
+     * Delegates to {@code testRemoveTablesOneByOne} with {@code DEFAULT_PARALLELISM}, a {@code
+     * FailoverType.JM} failover in {@code FailoverPhase.SNAPSHOT} and three address tables.
+     */
+    @Test
+    public void testJobManagerFailoverForRemoveTable() throws Exception {
+        testRemoveTablesOneByOne(
+                DEFAULT_PARALLELISM,
+                FailoverType.JM,
+                FailoverPhase.SNAPSHOT,
+                "ADDRESS_HANGZHOU",
+                "ADDRESS_BEIJING",
+                "ADDRESS_SHANGHAI");
+    }
+
+    /**
+     * Delegates to {@code testRemoveTablesOneByOne} with parallelism 1, a {@code FailoverType.TM}
+     * failover in {@code FailoverPhase.SNAPSHOT} and three address tables.
+     */
+    @Test
+    public void testTaskManagerFailoverForRemoveTableSingleParallelism() 
throws Exception {
+        testRemoveTablesOneByOne(
+                1,
+                FailoverType.TM,
+                FailoverPhase.SNAPSHOT,
+                "ADDRESS_HANGZHOU",
+                "ADDRESS_BEIJING",
+                "ADDRESS_SHANGHAI");
+    }
+
+    /**
+     * Delegates to {@code testRemoveTablesOneByOne} with {@code DEFAULT_PARALLELISM}, a {@code
+     * FailoverType.TM} failover in {@code FailoverPhase.SNAPSHOT} and three address tables.
+     */
+    @Test
+    public void testTaskManagerFailoverForRemoveTable() throws Exception {
+        testRemoveTablesOneByOne(
+                DEFAULT_PARALLELISM,
+                FailoverType.TM,
+                FailoverPhase.SNAPSHOT,
+                "ADDRESS_HANGZHOU",
+                "ADDRESS_BEIJING",
+                "ADDRESS_SHANGHAI");
+    }
+
+    /**
+     * Delegates to {@code testRemoveTablesOneByOne} with parallelism 1, no failover and three
+     * address tables.
+     */
+    @Test
+    public void testRemoveTableSingleParallelism() throws Exception {
+        testRemoveTablesOneByOne(
+                1,
+                FailoverType.NONE,
+                FailoverPhase.NEVER,
+                "ADDRESS_HANGZHOU",
+                "ADDRESS_BEIJING",
+                "ADDRESS_SHANGHAI");
+    }
+
+    /**
+     * Delegates to {@code testRemoveTablesOneByOne} with {@code DEFAULT_PARALLELISM}, no failover
+     * and three address tables.
+     */
+    @Test
+    public void testRemoveTable() throws Exception {
+        testRemoveTablesOneByOne(
+                DEFAULT_PARALLELISM,
+                FailoverType.NONE,
+                FailoverPhase.NEVER,
+                "ADDRESS_HANGZHOU",
+                "ADDRESS_BEIJING",
+                "ADDRESS_SHANGHAI");
+    }
+
+    /**
+     * Runs the private {@code testRemoveAndAddTablesOneByOne(int, String...)} scenario with
+     * parallelism 1 and three address tables.
+     */
+    @Test
+    public void testRemoveAndAddTablesOneByOne() throws Exception {
+        testRemoveAndAddTablesOneByOne(
+                1, "ADDRESS_HANGZHOU", "ADDRESS_BEIJING", "ADDRESS_SHANGHAI");
+    }
+
+    /**
+     * Captures exactly one address table per round. Every round after the first starts the job
+     * from the savepoint taken at the end of the previous round, checks the snapshot rows of the
+     * table captured in that round, produces redo log for this and all earlier tables, and then
+     * checks that only the current table's changes were added to the sink.
+     *
+     * @param parallelism parallelism handed to the execution environment of every round
+     * @param captureAddressTables address tables; the table at index {@code i} is the only one
+     *     captured in round {@code i}
+     * @throws Exception if any job, savepoint or assertion fails
+     */
+    private void testRemoveAndAddTablesOneByOne(int parallelism, String... captureAddressTables)
+            throws Exception {
+
+        // step 1: create tables with all tables included
+        // The connection is only needed for this initial load, so it is closed right afterwards
+        // instead of being leaked (closing an already closed JDBC connection is a no-op).
+        try (Connection connection = getJdbcConnection()) {
+            initialAddressTables(connection, captureAddressTables);
+        }
+
+        final TemporaryFolder temporaryFolder = new TemporaryFolder();
+        temporaryFolder.create();
+        final String savepointDirectory = temporaryFolder.newFolder().toURI().toString();
+
+        // get all expected data
+        List<String> fetchedDataList = new ArrayList<>();
+
+        // null in the first round, i.e. the first job starts without a savepoint path
+        String finishedSavePointPath = null;
+        // test removing and adding table one by one
+        for (int round = 0; round < captureAddressTables.length; round++) {
+            String captureTableThisRound = captureAddressTables[round];
+            // table names look like ADDRESS_<CITY>; the second token is the city name
+            String cityName = captureTableThisRound.split("_")[1];
+            StreamExecutionEnvironment env =
+                    getStreamExecutionEnvironmentFromSavePoint(finishedSavePointPath, parallelism);
+            StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+            String createTableStatement =
+                    getCreateTableStatement(new HashMap<>(), captureTableThisRound);
+            tEnv.executeSql(createTableStatement);
+            tEnv.executeSql(
+                    "CREATE TABLE sink ("
+                            + " TABLE_NAME STRING,"
+                            + " ID BIGINT,"
+                            + " COUNTRY STRING,"
+                            + " CITY STRING,"
+                            + " DETAIL_ADDRESS STRING,"
+                            + " primary key (CITY, ID) not enforced"
+                            + ") WITH ("
+                            + " 'connector' = 'values',"
+                            + " 'sink-insert-only' = 'false'"
+                            + ")");
+            TableResult tableResult = tEnv.executeSql("insert into sink select * from address");
+            JobClient jobClient = tableResult.getJobClient().get();
+
+            // this round's snapshot data
+            fetchedDataList.addAll(
+                    Arrays.asList(
+                            format(
+                                    "+I[%s, 416874195632735147, China, %s, %s West Town address 1]",
+                                    captureTableThisRound, cityName, cityName),
+                            format(
+                                    "+I[%s, 416927583791428523, China, %s, %s West Town address 2]",
+                                    captureTableThisRound, cityName, cityName),
+                            format(
+                                    "+I[%s, 417022095255614379, China, %s, %s West Town address 3]",
+                                    captureTableThisRound, cityName, cityName)));
+            waitForSinkSize("sink", fetchedDataList.size());
+            assertEqualsInAnyOrder(fetchedDataList, TestValuesTableFactory.getRawResults("sink"));
+
+            // step 2: make redo log data for all tables before this round(also includes this
+            // round),
+            // test whether only this round table's data is captured.
+            for (int i = 0; i <= round; i++) {
+                String tableName = captureAddressTables[i];
+                makeRedoLogForAddressTableInRound(tableName, round);
+            }
+            // this round's redo log data
+            fetchedDataList.addAll(
+                    Arrays.asList(
+                            format(
+                                    "+U[%s, 416874195632735147, CHINA_%s, %s, %s West Town address 1]",
+                                    captureTableThisRound, round, cityName, cityName),
+                            format(
+                                    "+I[%s, %d, China, %s, %s West Town address 4]",
+                                    captureTableThisRound,
+                                    417022095255614380L + round,
+                                    cityName,
+                                    cityName)));
+
+            // step 3: assert fetched redo log data in this round
+            waitForSinkSize("sink", fetchedDataList.size());
+
+            assertEqualsInAnyOrder(fetchedDataList, TestValuesTableFactory.getRawResults("sink"));
+            // step 4: trigger savepoint
+            finishedSavePointPath = triggerSavepointWithRetry(jobClient, savepointDirectory);
+            jobClient.cancel().get();
+        }
+    }
+
+    private void testRemoveTablesOneByOne(
+            int parallelism,
+            FailoverType failoverType,
+            FailoverPhase failoverPhase,
+            String... captureAddressTables)
+            throws Exception {
+
+        // step 1: create oracle tables with all tables included
+        initialAddressTables(getJdbcConnection(), captureAddressTables);
+
+        final TemporaryFolder temporaryFolder = new TemporaryFolder();
+        temporaryFolder.create();
+        final String savepointDirectory = 
temporaryFolder.newFolder().toURI().toString();
+
+        // get all expected data
+        List<String> fetchedDataList = new ArrayList<>();
+        for (String table : captureAddressTables) {
+            String cityName = table.split("_")[1];
+            fetchedDataList.addAll(
+                    Arrays.asList(
+                            format(
+                                    "+I[%s, 416874195632735147, China, %s, %s 
West Town address 1]",
+                                    table, cityName, cityName),
+                            format(
+                                    "+I[%s, 416927583791428523, China, %s, %s 
West Town address 2]",
+                                    table, cityName, cityName),
+                            format(
+                                    "+I[%s, 417022095255614379, China, %s, %s 
West Town address 3]",
+                                    table, cityName, cityName)));
+        }
+
+        String finishedSavePointPath = null;
+        // step 2: execute insert and trigger savepoint with all tables added
+        {
+            StreamExecutionEnvironment env =
+                    
getStreamExecutionEnvironmentFromSavePoint(finishedSavePointPath, parallelism);
+            StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+            String createTableStatement =
+                    getCreateTableStatement(new HashMap<>(), 
captureAddressTables);
+            tEnv.executeSql(createTableStatement);
+            tEnv.executeSql(
+                    "CREATE TABLE sink ("
+                            + " TABLE_NAME STRING,"
+                            + " ID BIGINT,"
+                            + " COUNTRY STRING,"
+                            + " CITY STRING,"
+                            + " DETAIL_ADDRESS STRING,"
+                            + " primary key (CITY, ID) not enforced"
+                            + ") WITH ("
+                            + " 'connector' = 'values',"
+                            + " 'sink-insert-only' = 'false'"
+                            + ")");
+            TableResult tableResult = tEnv.executeSql("insert into sink select 
* from address");
+            JobClient jobClient = tableResult.getJobClient().get();
+
+            // trigger failover after some snapshot data read finished
+            if (failoverPhase == FailoverPhase.SNAPSHOT) {
+                triggerFailover(
+                        failoverType,
+                        jobClient.getJobID(),
+                        miniClusterResource.getMiniCluster(),
+                        () -> sleepMs(100));
+            }
+            waitForSinkSize("sink", fetchedDataList.size());
+            assertEqualsInAnyOrder(fetchedDataList, 
TestValuesTableFactory.getRawResults("sink"));
+            finishedSavePointPath = triggerSavepointWithRetry(jobClient, 
savepointDirectory);
+            jobClient.cancel().get();
+        }
+
+        // test removing table one by one, note that there should be at least 
one table remaining
+        for (int round = 0; round < captureAddressTables.length - 1; round++) {
+            String[] captureTablesThisRound =
+                    Arrays.asList(captureAddressTables)
+                            .subList(round + 1, captureAddressTables.length)
+                            .toArray(new String[0]);
+
+            StreamExecutionEnvironment env =
+                    
getStreamExecutionEnvironmentFromSavePoint(finishedSavePointPath, parallelism);
+            StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+            String createTableStatement =
+                    getCreateTableStatement(new HashMap<>(), 
captureTablesThisRound);
+            tEnv.executeSql(createTableStatement);
+            tEnv.executeSql(
+                    "CREATE TABLE sink ("
+                            + " TABLE_NAME STRING,"
+                            + " ID BIGINT,"
+                            + " COUNTRY STRING,"
+                            + " CITY STRING,"
+                            + " DETAIL_ADDRESS STRING,"
+                            + " primary key (CITY, ID) not enforced"
+                            + ") WITH ("
+                            + " 'connector' = 'values',"
+                            + " 'sink-insert-only' = 'false'"
+                            + ")");
+            TableResult tableResult = tEnv.executeSql("insert into sink select 
* from address");
+            JobClient jobClient = tableResult.getJobClient().get();
+
+            waitForSinkSize("sink", fetchedDataList.size());
+            assertEqualsInAnyOrder(fetchedDataList, 
TestValuesTableFactory.getRawResults("sink"));
+
+            // step 3: make redo log data for all tables
+            List<String> expectedRedoLogDataThisRound = new ArrayList<>();
+
+            for (int i = 0, captureAddressTablesLength = 
captureAddressTables.length;
+                    i < captureAddressTablesLength;
+                    i++) {
+                String tableName = captureAddressTables[i];
+                makeRedoLogForAddressTableInRound(tableName, round);
+                if (i <= round) {
+                    continue;
+                }
+                String cityName = tableName.split("_")[1];
+
+                expectedRedoLogDataThisRound.addAll(
+                        Arrays.asList(
+                                format(
+                                        "+U[%s, 416874195632735147, CHINA_%s, 
%s, %s West Town address 1]",
+                                        tableName, round, cityName, cityName),
+                                format(
+                                        "+I[%s, %d, China, %s, %s West Town 
address 4]",
+                                        tableName,
+                                        417022095255614380L + round,
+                                        cityName,
+                                        cityName)));
+            }
+
+            if (failoverPhase == FailoverPhase.REDO_LOG
+                    && TestValuesTableFactory.getRawResults("sink").size()
+                            > fetchedDataList.size()) {
+                triggerFailover(
+                        failoverType,
+                        jobClient.getJobID(),
+                        miniClusterResource.getMiniCluster(),
+                        () -> sleepMs(100));
+            }
+
+            fetchedDataList.addAll(expectedRedoLogDataThisRound);
+            // step 4: assert fetched redo log data in this round
+            waitForSinkSize("sink", fetchedDataList.size());
+            assertEqualsInAnyOrder(fetchedDataList, 
TestValuesTableFactory.getRawResults("sink"));
+
+            // step 5: trigger savepoint
+            finishedSavePointPath = triggerSavepointWithRetry(jobClient, 
savepointDirectory);
+            jobClient.cancel().get();
+        }
+    }
+
+    private void testNewlyAddedTableOneByOne(
+            int parallelism,
+            FailoverType failoverType,
+            FailoverPhase failoverPhase,
+            boolean makeRedoLogBeforeCapture,
+            String... captureAddressTables)
+            throws Exception {
+        testNewlyAddedTableOneByOne(
+                parallelism,
+                new HashMap<>(),
+                failoverType,
+                failoverPhase,
+                makeRedoLogBeforeCapture,
+                captureAddressTables);
+    }
+
+    /**
+     * Verifies that tables can be added to the capture list one at a time.
+     *
+     * <p>Round {@code i} captures the first {@code i + 1} entries of {@code captureAddressTables}.
+     * Every round after the first starts the job with the savepoint path taken at the end of the
+     * previous round. Within a round the test checks the snapshot rows of the newly added table,
+     * then updates one row and inserts another one, and checks the resulting content of the sink.
+     *
+     * @param parallelism parallelism passed to the execution environment factory method
+     * @param sourceOptions extra options passed to {@code getCreateTableStatement}
+     * @param failoverType kind of failover passed to {@code triggerFailover}
+     * @param failoverPhase {@code SNAPSHOT} or {@code REDO_LOG} to trigger a failover in that part
+     *     of each round; for any other value this method triggers none
+     * @param makeRedoLogBeforeCapture whether to insert an extra row (id 417022095255614381) into
+     *     the new table before the job starts; that row is then expected among the snapshot rows
+     * @param captureAddressTables tables to create and then add to the capture list in order
+     * @throws Exception if a setup step, a job interaction or an assertion helper fails
+     */
+    private void testNewlyAddedTableOneByOne(
+            int parallelism,
+            Map<String, String> sourceOptions,
+            FailoverType failoverType,
+            FailoverPhase failoverPhase,
+            boolean makeRedoLogBeforeCapture,
+            String... captureAddressTables)
+            throws Exception {
+
+        // step 1: create oracle tables with initial data
+        initialAddressTables(getJdbcConnection(), captureAddressTables);
+
+        final TemporaryFolder temporaryFolder = new TemporaryFolder();
+        temporaryFolder.create();
+        final String savepointDirectory = 
temporaryFolder.newFolder().toURI().toString();
+
+        // test newly added table one by one
+        // (null in the first round: no savepoint path is configured for the first job)
+        String finishedSavePointPath = null;
+        List<String> fetchedDataList = new ArrayList<>();
+        for (int round = 0; round < captureAddressTables.length; round++) {
+            String[] captureTablesThisRound =
+                    Arrays.asList(captureAddressTables)
+                            .subList(0, round + 1)
+                            .toArray(new String[0]);
+            String newlyAddedTable = captureAddressTables[round];
+            if (makeRedoLogBeforeCapture) {
+                makeRedoLogBeforeCaptureForAddressTable(newlyAddedTable);
+            }
+            StreamExecutionEnvironment env =
+                    
getStreamExecutionEnvironmentFromSavePoint(finishedSavePointPath, parallelism);
+            StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+            String createTableStatement =
+                    getCreateTableStatement(sourceOptions, 
captureTablesThisRound);
+            tEnv.executeSql(createTableStatement);
+            tEnv.executeSql(
+                    "CREATE TABLE sink ("
+                            + " TABLE_NAME STRING,"
+                            + " ID BIGINT,"
+                            + " COUNTRY STRING,"
+                            + " CITY STRING,"
+                            + " DETAIL_ADDRESS STRING,"
+                            + " primary key (CITY, ID) not enforced"
+                            + ") WITH ("
+                            + " 'connector' = 'values',"
+                            + " 'sink-insert-only' = 'false'"
+                            + ")");
+            TableResult tableResult = tEnv.executeSql("insert into sink select 
* from address");
+            JobClient jobClient = tableResult.getJobClient().get();
+
+            // step 2: assert fetched snapshot data in this round
+            String cityName = newlyAddedTable.split("_")[1];
+            List<String> expectedSnapshotDataThisRound =
+                    Arrays.asList(
+                            format(
+                                    "+I[%s, 416874195632735147, China, %s, %s 
West Town address 1]",
+                                    newlyAddedTable, cityName, cityName),
+                            format(
+                                    "+I[%s, 416927583791428523, China, %s, %s 
West Town address 2]",
+                                    newlyAddedTable, cityName, cityName),
+                            format(
+                                    "+I[%s, 417022095255614379, China, %s, %s 
West Town address 3]",
+                                    newlyAddedTable, cityName, cityName));
+            // the row inserted before the job started is expected in the snapshot as well
+            if (makeRedoLogBeforeCapture) {
+                expectedSnapshotDataThisRound =
+                        Arrays.asList(
+                                format(
+                                        "+I[%s, 416874195632735147, China, %s, 
%s West Town address 1]",
+                                        newlyAddedTable, cityName, cityName),
+                                format(
+                                        "+I[%s, 416927583791428523, China, %s, 
%s West Town address 2]",
+                                        newlyAddedTable, cityName, cityName),
+                                format(
+                                        "+I[%s, 417022095255614379, China, %s, 
%s West Town address 3]",
+                                        newlyAddedTable, cityName, cityName),
+                                format(
+                                        "+I[%s, 417022095255614381, China, %s, 
%s West Town address 5]",
+                                        newlyAddedTable, cityName, cityName));
+            }
+
+            // trigger failover after some snapshot data read finished
+            if (failoverPhase == FailoverPhase.SNAPSHOT) {
+                triggerFailover(
+                        failoverType,
+                        jobClient.getJobID(),
+                        miniClusterResource.getMiniCluster(),
+                        () -> sleepMs(100));
+            }
+            fetchedDataList.addAll(expectedSnapshotDataThisRound);
+            waitForUpsertSinkSize("sink", fetchedDataList.size());
+            assertEqualsInAnyOrder(fetchedDataList, 
TestValuesTableFactory.getResults("sink"));
+
+            // step 3: make some redo log data for this round
+            makeFirstPartRedoLogForAddressTable(newlyAddedTable);
+            if (failoverPhase == FailoverPhase.REDO_LOG) {
+                triggerFailover(
+                        failoverType,
+                        jobClient.getJobID(),
+                        miniClusterResource.getMiniCluster(),
+                        () -> sleepMs(100));
+            }
+            makeSecondPartRedoLogForAddressTable(newlyAddedTable);
+
+            // step 4: build the expected sink content after the redo log changes of this round
+            // retract the old data with id 416874195632735147
+            fetchedDataList =
+                    fetchedDataList.stream()
+                            .filter(
+                                    r ->
+                                            !r.contains(
+                                                    format(
+                                                            "%s, 
416874195632735147",
+                                                            newlyAddedTable)))
+                            .collect(Collectors.toList());
+            List<String> expectedRedoLogUpsertDataThisRound =
+                    Arrays.asList(
+                            // add the new data with id 416874195632735147
+                            format(
+                                    "+I[%s, 416874195632735147, CHINA, %s, %s 
West Town address 1]",
+                                    newlyAddedTable, cityName, cityName),
+                            format(
+                                    "+I[%s, 417022095255614380, China, %s, %s 
West Town address 4]",
+                                    newlyAddedTable, cityName, cityName));
+
+            // step 5: assert fetched redo log data in this round
+            fetchedDataList.addAll(expectedRedoLogUpsertDataThisRound);
+
+            waitForUpsertSinkSize("sink", fetchedDataList.size());
+            // the sink may already hold fetchedDataList.size() rows while one of them is still
+            // the old record; wait so the old record is retracted and the new record is sent
+            Thread.sleep(1000);
+            assertEqualsInAnyOrder(fetchedDataList, 
TestValuesTableFactory.getResults("sink"));
+
+            // step 6: trigger savepoint for the next round (skipped in the last round)
+            if (round != captureAddressTables.length - 1) {
+                finishedSavePointPath = triggerSavepointWithRetry(jobClient, 
savepointDirectory);
+            }
+            jobClient.cancel().get();
+        }
+    }
+
+    /**
+     * Creates every given address table under {@code ORACLE_SCHEMA} and inserts three initial
+     * rows into each of them, with auto-commit disabled and a single commit at the end.
+     *
+     * <p>The city name used in the rows is the second underscore-separated segment of the table
+     * name, so every table name must contain at least one underscore.
+     *
+     * <p>The given connection is always closed before this method returns, and the statement
+     * used for the DDL and the inserts is closed as well.
+     *
+     * @param connection open JDBC connection; it is closed by this method
+     * @param addressTables names of the tables to create and populate
+     * @throws SQLException if creating a table, inserting a row or committing fails
+     */
+    private void initialAddressTables(Connection connection, String[] addressTables)
+            throws SQLException {
+        try {
+            connection.setAutoCommit(false);
+            // try-with-resources so the statement is released even if one of the calls fails
+            try (Statement statement = connection.createStatement()) {
+                for (String tableName : addressTables) {
+                    // make initial data for given table
+                    String tableId = ORACLE_SCHEMA + '.' + tableName;
+                    String cityName = tableName.split("_")[1];
+                    statement.execute(
+                            "CREATE TABLE "
+                                    + tableId
+                                    + "("
+                                    + "  ID NUMBER(19) NOT NULL,"
+                                    + "  COUNTRY VARCHAR(255) NOT NULL,"
+                                    + "  CITY VARCHAR(255) NOT NULL,"
+                                    + "  DETAIL_ADDRESS VARCHAR(1024),"
+                                    + "  PRIMARY KEY(ID)"
+                                    + ")");
+                    statement.execute(
+                            format(
+                                    "INSERT INTO  %s "
+                                            + "VALUES (416874195632735147, 'China', '%s', '%s West Town address 1')",
+                                    tableId, cityName, cityName));
+                    statement.execute(
+                            format(
+                                    "INSERT INTO  %s "
+                                            + "VALUES (416927583791428523, 'China', '%s', '%s West Town address 2')",
+                                    tableId, cityName, cityName));
+                    statement.execute(
+                            format(
+                                    "INSERT INTO  %s "
+                                            + "VALUES (417022095255614379, 'China', '%s', '%s West Town address 3')",
+                                    tableId, cityName, cityName));
+                }
+                connection.commit();
+            }
+        } finally {
+            connection.close();
+        }
+    }
+
+    private void makeFirstPartRedoLogForAddressTable(String tableName) throws 
Exception {
+        String tableId = ORACLE_SCHEMA + '.' + tableName;
+        executeSql(
+                format("UPDATE %s SET COUNTRY = 'CHINA' where ID = 
416874195632735147", tableId));
+    }
+
+    private void makeSecondPartRedoLogForAddressTable(String tableName) throws 
Exception {
+        String tableId = ORACLE_SCHEMA + '.' + tableName;
+        String cityName = tableName.split("_")[1];
+        executeSql(
+                format(
+                        "INSERT INTO %s VALUES(417022095255614380, 
'China','%s','%s West Town address 4')",
+                        tableId, cityName, cityName));
+    }
+
+    private void makeRedoLogBeforeCaptureForAddressTable(String tableName) 
throws Exception {
+        String tableId = ORACLE_SCHEMA + '.' + tableName;
+        String cityName = tableName.split("_")[1];
+        executeSql(
+                format(
+                        "INSERT INTO %s VALUES(417022095255614381, 
'China','%s','%s West Town address 5')",
+                        tableId, cityName, cityName));
+    }
+
+    private void makeRedoLogForAddressTableInRound(String tableName, int 
round) throws Exception {
+        String tableId = ORACLE_SCHEMA + '.' + tableName;
+        String cityName = tableName.split("_")[1];
+        executeSql(
+                format(
+                        "UPDATE %s SET COUNTRY = 'CHINA_%s' where id = 
416874195632735147",
+                        tableId, round));
+        executeSql(
+                format(
+                        "INSERT INTO %s VALUES(%d, 'China','%s','%s West Town 
address 4')",
+                        tableId, 417022095255614380L + round, cityName, 
cityName));
+    }
+
+    /**
+     * Sleeps for the given number of milliseconds without throwing a checked exception.
+     *
+     * <p>If the thread is interrupted while sleeping, the method returns early and restores the
+     * interrupt flag so that code further up the stack can still observe the interruption.
+     *
+     * @param millis time to sleep in milliseconds
+     */
+    private void sleepMs(long millis) {
+        try {
+            Thread.sleep(millis);
+        } catch (InterruptedException e) {
+            // do not swallow the interruption: re-assert the flag for the caller
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    private String triggerSavepointWithRetry(JobClient jobClient, String 
savepointDirectory)
+            throws ExecutionException, InterruptedException {
+        int retryTimes = 0;
+        // retry 600 times, it takes 100 milliseconds per time, at most retry 
1 minute
+        while (retryTimes < 600) {
+            try {
+                return jobClient.triggerSavepoint(savepointDirectory).get();
+            } catch (Exception e) {
+                Optional<CheckpointException> exception =
+                        ExceptionUtils.findThrowable(e, 
CheckpointException.class);
+                if (exception.isPresent()
+                        && exception.get().getMessage().contains("Checkpoint 
triggering task")) {
+                    Thread.sleep(100);
+                    retryTimes++;
+                } else {
+                    throw e;
+                }
+            }
+        }
+        return null;
+    }
+
+    private StreamExecutionEnvironment 
getStreamExecutionEnvironmentFromSavePoint(
+            String finishedSavePointPath, int parallelism) throws Exception {
+        Configuration configuration = new Configuration();
+        if (finishedSavePointPath != null) {
+            configuration.setString(SavepointConfigOptions.SAVEPOINT_PATH, 
finishedSavePointPath);
+        }
+        StreamExecutionEnvironment env =
+                
StreamExecutionEnvironment.getExecutionEnvironment(configuration);
+        env.setParallelism(parallelism);
+        env.enableCheckpointing(200L);
+        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 100L));
+        return env;
+    }
+
+    private String getCreateTableStatement(
+            Map<String, String> otherOptions, String... captureTableNames) {
+        return String.format(
+                "CREATE TABLE address ("
+                        + " table_name STRING METADATA VIRTUAL,"
+                        + " ID BIGINT NOT NULL,"
+                        + " COUNTRY STRING,"
+                        + " CITY STRING,"
+                        + " DETAIL_ADDRESS STRING,"
+                        + " primary key (CITY, ID) not enforced"
+                        + ") WITH ("
+                        + " 'connector' = 'oracle-cdc',"
+                        + " 'scan.incremental.snapshot.enabled' = 'true',"
+                        + " 'hostname' = '%s',"
+                        + " 'port' = '%s',"
+                        + " 'username' = '%s',"
+                        + " 'password' = '%s',"
+                        + " 'database-name' = '%s',"
+                        + " 'schema-name' = '%s',"
+                        + " 'table-name' = '%s',"
+                        + " 'debezium.log.mining.strategy' = 'online_catalog',"
+                        + " 
'debezium.database.history.store.only.captured.tables.ddl' = 'true',"
+                        + " 'scan.incremental.snapshot.chunk.size' = '2',"
+                        + " 'scan.newly-added-table.enabled' = 'true',"
+                        + " 'chunk-meta.group.size' = '2'"
+                        + " %s"
+                        + ")",
+                ORACLE_CONTAINER.getHost(),
+                ORACLE_CONTAINER.getOraclePort(),
+                ORACLE_CONTAINER.getUsername(),
+                ORACLE_CONTAINER.getPassword(),
+                ORACLE_DATABASE,
+                ORACLE_SCHEMA,
+                getTableNameRegex(captureTableNames),
+                otherOptions.isEmpty()
+                        ? ""
+                        : ","
+                                + otherOptions.entrySet().stream()
+                                        .map(
+                                                e ->
+                                                        String.format(
+                                                                "'%s'='%s'",
+                                                                e.getKey(), 
e.getValue()))
+                                        .collect(Collectors.joining(",")));
+    }
+
+    /**
+     * Executes one SQL statement on a connection obtained from {@code getJdbcConnection()} and
+     * closes the statement and the connection afterwards.
+     *
+     * @param sql the SQL text to execute
+     * @throws Exception if obtaining the connection or executing the statement fails
+     */
+    private void executeSql(String sql) throws Exception {
+        try (Connection conn = getJdbcConnection()) {
+            try (Statement stmt = conn.createStatement()) {
+                stmt.execute(sql);
+            }
+        }
+    }
+}
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OracleSourceITCase.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OracleSourceITCase.java
index 3c4a71590..ee5a1c947 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OracleSourceITCase.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OracleSourceITCase.java
@@ -25,9 +25,9 @@ import 
org.apache.flink.cdc.connectors.base.options.StartupOptions;
 import 
org.apache.flink.cdc.connectors.base.source.utils.hooks.SnapshotPhaseHook;
 import 
org.apache.flink.cdc.connectors.base.source.utils.hooks.SnapshotPhaseHooks;
 import 
org.apache.flink.cdc.connectors.oracle.source.utils.OracleConnectionUtils;
+import 
org.apache.flink.cdc.connectors.oracle.testutils.OracleTestUtils.FailoverPhase;
+import 
org.apache.flink.cdc.connectors.oracle.testutils.OracleTestUtils.FailoverType;
 import org.apache.flink.cdc.connectors.oracle.testutils.TestTable;
-import 
org.apache.flink.runtime.highavailability.nonha.embedded.HaLeadershipControl;
-import org.apache.flink.runtime.minicluster.MiniCluster;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.TableResult;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
@@ -58,6 +58,7 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import static java.lang.String.format;
+import static 
org.apache.flink.cdc.connectors.oracle.testutils.OracleTestUtils.triggerFailover;
 import static org.apache.flink.table.api.DataTypes.BIGINT;
 import static org.apache.flink.table.api.DataTypes.STRING;
 import static org.apache.flink.table.catalog.Column.physical;
@@ -707,54 +708,4 @@ public class OracleSourceITCase extends 
OracleSourceTestBase {
             statement.execute(sql);
         }
     }
-
-    // ------------------------------------------------------------------------
-    //  test utilities
-    // ------------------------------------------------------------------------
-
-    /** The type of failover. */
-    private enum FailoverType {
-        TM,
-        JM,
-        NONE
-    }
-
-    /** The phase of failover. */
-    private enum FailoverPhase {
-        SNAPSHOT,
-        REDO_LOG,
-        NEVER
-    }
-
-    private static void triggerFailover(
-            FailoverType type, JobID jobId, MiniCluster miniCluster, Runnable 
afterFailAction)
-            throws Exception {
-        switch (type) {
-            case TM:
-                restartTaskManager(miniCluster, afterFailAction);
-                break;
-            case JM:
-                triggerJobManagerFailover(jobId, miniCluster, afterFailAction);
-                break;
-            case NONE:
-                break;
-            default:
-                throw new IllegalStateException("Unexpected value: " + type);
-        }
-    }
-
-    private static void triggerJobManagerFailover(
-            JobID jobId, MiniCluster miniCluster, Runnable afterFailAction) 
throws Exception {
-        final HaLeadershipControl haLeadershipControl = 
miniCluster.getHaLeadershipControl().get();
-        haLeadershipControl.revokeJobMasterLeadership(jobId).get();
-        afterFailAction.run();
-        haLeadershipControl.grantJobMasterLeadership(jobId).get();
-    }
-
-    private static void restartTaskManager(MiniCluster miniCluster, Runnable 
afterFailAction)
-            throws Exception {
-        miniCluster.terminateTaskManager(0).get();
-        afterFailAction.run();
-        miniCluster.startTaskManager();
-    }
 }
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OracleSourceTestBase.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OracleSourceTestBase.java
index 8fb1157f4..7f8b83b49 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OracleSourceTestBase.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OracleSourceTestBase.java
@@ -174,7 +174,7 @@ public class OracleSourceTestBase extends TestLogger {
     }
 
     // ------------------ utils -----------------------
-    private static List<TableId> listTables(Connection connection) {
+    protected static List<TableId> listTables(Connection connection) {
 
         Set<TableId> tableIdSet = new HashSet<>();
         String queryTablesSql =
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/table/OracleTableSourceFactoryTest.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/table/OracleTableSourceFactoryTest.java
index 584c8c16b..4fe043a5e 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/table/OracleTableSourceFactoryTest.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/table/OracleTableSourceFactoryTest.java
@@ -118,7 +118,8 @@ public class OracleTableSourceFactoryTest {
                                 .defaultValue(),
                         null,
                         
JdbcSourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED.defaultValue(),
-                        
JdbcSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue());
+                        
JdbcSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
+                        
JdbcSourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED.defaultValue());
         assertEquals(expectedSource, actualSource);
     }
 
@@ -154,7 +155,8 @@ public class OracleTableSourceFactoryTest {
                                 .defaultValue(),
                         null,
                         
SourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED.defaultValue(),
-                        
SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue());
+                        
SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
+                        
SourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED.defaultValue());
         assertEquals(expectedSource, actualSource);
     }
 
@@ -164,6 +166,7 @@ public class OracleTableSourceFactoryTest {
         options.put("port", "1521");
         options.put("hostname", MY_LOCALHOST);
         options.put("debezium.snapshot.mode", "initial");
+        options.put(SourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED.key(), 
"true");
 
         DynamicTableSource actualSource = createTableSource(options);
         Properties dbzProperties = new Properties();
@@ -194,7 +197,8 @@ public class OracleTableSourceFactoryTest {
                                 .defaultValue(),
                         null,
                         
SourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED.defaultValue(),
-                        
SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue());
+                        
SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
+                        true);
         assertEquals(expectedSource, actualSource);
     }
 
@@ -220,6 +224,7 @@ public class OracleTableSourceFactoryTest {
         options.put(SourceOptions.SCAN_SNAPSHOT_FETCH_SIZE.key(), 
String.valueOf(fetchSize));
         
options.put(SourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED.key(), 
"true");
         
options.put(SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.key(), 
"true");
+        options.put(SourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED.key(), 
"true");
 
         options.put(
                 JdbcSourceOptions.CONNECT_TIMEOUT.key(),
@@ -260,6 +265,7 @@ public class OracleTableSourceFactoryTest {
                         distributionFactorLower,
                         null,
                         true,
+                        true,
                         true);
         assertEquals(expectedSource, actualSource);
     }
@@ -297,7 +303,8 @@ public class OracleTableSourceFactoryTest {
                                 .defaultValue(),
                         null,
                         
SourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED.defaultValue(),
-                        
SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue());
+                        
SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
+                        
SourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED.defaultValue());
         assertEquals(expectedSource, actualSource);
     }
 
@@ -334,7 +341,8 @@ public class OracleTableSourceFactoryTest {
                                 .defaultValue(),
                         null,
                         
SourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED.defaultValue(),
-                        
SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue());
+                        
SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
+                        
SourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED.defaultValue());
         assertEquals(expectedSource, actualSource);
     }
 
@@ -375,7 +383,8 @@ public class OracleTableSourceFactoryTest {
                                 .defaultValue(),
                         null,
                         
SourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED.defaultValue(),
-                        
SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue());
+                        
SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
+                        
SourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED.defaultValue());
         expectedSource.producedDataType = 
SCHEMA_WITH_METADATA.toSourceRowDataType();
         expectedSource.metadataKeys =
                 Arrays.asList("op_ts", "database_name", "table_name", 
"schema_name");
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/testutils/OracleTestUtils.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/testutils/OracleTestUtils.java
new file mode 100644
index 000000000..2dd6e3a04
--- /dev/null
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/testutils/OracleTestUtils.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.oracle.testutils;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.highavailability.nonha.embedded.HaLeadershipControl;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
+
+import org.apache.commons.lang3.StringUtils;
+
+import static java.lang.String.format;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Oracle test utilities: failover injection for a {@link MiniCluster} and polling helpers for
+ * {@link TestValuesTableFactory} sinks.
+ */
+public class OracleTestUtils {
+
+    /** Static helpers only; not meant to be instantiated. */
+    private OracleTestUtils() {}
+
+    /** The type of failover. */
+    public enum FailoverType {
+        TM,
+        JM,
+        NONE
+    }
+
+    /** The phase of failover. */
+    public enum FailoverPhase {
+        SNAPSHOT,
+        REDO_LOG,
+        NEVER
+    }
+
+    /**
+     * Triggers the requested kind of failover on the given cluster.
+     *
+     * @param type component to fail; {@link FailoverType#NONE} does nothing
+     * @param jobId job whose JobMaster leadership is revoked for {@link FailoverType#JM}
+     * @param miniCluster cluster to inject the failure into
+     * @param afterFailAction callback run after the failure, before recovery is started
+     * @throws Exception if the failure or the recovery step fails
+     */
+    public static void triggerFailover(
+            FailoverType type, JobID jobId, MiniCluster miniCluster, Runnable afterFailAction)
+            throws Exception {
+        switch (type) {
+            case TM:
+                restartTaskManager(miniCluster, afterFailAction);
+                break;
+            case JM:
+                triggerJobManagerFailover(jobId, miniCluster, afterFailAction);
+                break;
+            case NONE:
+                break;
+            default:
+                throw new IllegalStateException("Unexpected value: " + type);
+        }
+    }
+
+    /**
+     * Revokes the JobMaster leadership of {@code jobId}, runs {@code afterFailAction}, and then
+     * grants the leadership again.
+     *
+     * <p>Leadership is granted back even when {@code afterFailAction} throws, so a failing
+     * callback does not leave the job without a JobMaster leader.
+     *
+     * @throws IllegalStateException if the cluster exposes no {@link HaLeadershipControl}
+     */
+    public static void triggerJobManagerFailover(
+            JobID jobId, MiniCluster miniCluster, Runnable afterFailAction) throws Exception {
+        final HaLeadershipControl haLeadershipControl =
+                miniCluster
+                        .getHaLeadershipControl()
+                        .orElseThrow(
+                                () ->
+                                        new IllegalStateException(
+                                                "MiniCluster exposes no HaLeadershipControl"));
+        haLeadershipControl.revokeJobMasterLeadership(jobId).get();
+        try {
+            afterFailAction.run();
+        } finally {
+            haLeadershipControl.grantJobMasterLeadership(jobId).get();
+        }
+    }
+
+    /**
+     * Terminates the TaskManager with index 0, runs {@code afterFailAction}, and then starts a
+     * new TaskManager.
+     *
+     * <p>The replacement TaskManager is started even when {@code afterFailAction} throws.
+     */
+    public static void restartTaskManager(MiniCluster miniCluster, Runnable afterFailAction)
+            throws Exception {
+        miniCluster.terminateTaskManager(0).get();
+        try {
+            afterFailAction.run();
+        } finally {
+            miniCluster.startTaskManager();
+        }
+    }
+
+    /**
+     * Polls every 100 ms until the sink holds at least {@code expectedSize} raw results. There
+     * is no timeout: the call blocks until the size is reached or the thread is interrupted.
+     */
+    public static void waitForSinkSize(String sinkName, int expectedSize)
+            throws InterruptedException {
+        while (sinkSize(sinkName) < expectedSize) {
+            Thread.sleep(100);
+        }
+    }
+
+    /** Returns the sink's raw result count; 0 while the job is not started yet. */
+    public static int sinkSize(String sinkName) {
+        synchronized (TestValuesTableFactory.class) {
+            try {
+                return TestValuesTableFactory.getRawResults(sinkName).size();
+            } catch (IllegalArgumentException e) {
+                // job is not started yet
+                return 0;
+            }
+        }
+    }
+
+    /**
+     * Polls every 100 ms until the sink holds at least {@code expectedSize} results. There is no
+     * timeout: the call blocks until the size is reached or the thread is interrupted.
+     */
+    public static void waitForUpsertSinkSize(String sinkName, int expectedSize)
+            throws InterruptedException {
+        while (upsertSinkSize(sinkName) < expectedSize) {
+            Thread.sleep(100);
+        }
+    }
+
+    /** Returns the sink's result count; 0 while the job is not started yet. */
+    public static int upsertSinkSize(String sinkName) {
+        synchronized (TestValuesTableFactory.class) {
+            try {
+                return TestValuesTableFactory.getResults(sinkName).size();
+            } catch (IllegalArgumentException e) {
+                // job is not started yet
+                return 0;
+            }
+        }
+    }
+
+    /**
+     * Builds the table-name pattern for the given tables: the single name as-is, or
+     * {@code (a|b|...)} for several names. Names are not regex-escaped.
+     *
+     * @throws IllegalStateException if {@code captureCustomerTables} is empty
+     */
+    public static String getTableNameRegex(String[] captureCustomerTables) {
+        checkState(captureCustomerTables.length > 0);
+        if (captureCustomerTables.length == 1) {
+            return captureCustomerTables[0];
+        } else {
+            // pattern that matches multiple tables
+            return format("(%s)", StringUtils.join(captureCustomerTables, "|"));
+        }
+    }
+}

Reply via email to