This is an automated email from the ASF dual-hosted git repository.
gongzhongqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new 2d1eb0aff [FLINK-34990][cdc-connector][oracle] Oracle cdc support
newly add table (#3203)
2d1eb0aff is described below
commit 2d1eb0aff1b5eba5e68d311e9ea45295fcb3bd64
Author: Xin Gong <[email protected]>
AuthorDate: Tue Jul 16 09:49:09 2024 +0800
[FLINK-34990][cdc-connector][oracle] Oracle cdc support newly add table
(#3203)
* [cdc-connector][oracle] Oracle cdc support newly add table
* [cdc-connector][oracle] Fix code style
* [cdc-connector][oracle] Address comment
---
.../oracle/source/OracleSourceBuilder.java | 6 +
.../oracle/source/config/OracleSourceConfig.java | 5 +-
.../source/config/OracleSourceConfigFactory.java | 3 +-
.../connectors/oracle/table/OracleTableSource.java | 15 +-
.../oracle/table/OracleTableSourceFactory.java | 6 +-
.../oracle/source/NewlyAddedTableITCase.java | 908 +++++++++++++++++++++
.../oracle/source/OracleSourceITCase.java | 55 +-
.../oracle/source/OracleSourceTestBase.java | 2 +-
.../oracle/table/OracleTableSourceFactoryTest.java | 21 +-
.../oracle/testutils/OracleTestUtils.java | 124 +++
10 files changed, 1078 insertions(+), 67 deletions(-)
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/OracleSourceBuilder.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/OracleSourceBuilder.java
index 9a740b15b..0560ffb76 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/OracleSourceBuilder.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/OracleSourceBuilder.java
@@ -243,6 +243,12 @@ public class OracleSourceBuilder<T> {
return this;
}
+ /** Whether the {@link OracleIncrementalSource} should scan the newly
added tables or not. */
+ public OracleSourceBuilder<T> scanNewlyAddedTableEnabled(boolean
scanNewlyAddedTableEnabled) {
+
this.configFactory.scanNewlyAddedTableEnabled(scanNewlyAddedTableEnabled);
+ return this;
+ }
+
/**
* Build the {@link OracleIncrementalSource}.
*
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/config/OracleSourceConfig.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/config/OracleSourceConfig.java
index 2c007fe54..19386290a 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/config/OracleSourceConfig.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/config/OracleSourceConfig.java
@@ -63,7 +63,8 @@ public class OracleSourceConfig extends JdbcSourceConfig {
int connectMaxRetries,
int connectionPoolSize,
String chunkKeyColumn,
- boolean skipSnapshotBackfill) {
+ boolean skipSnapshotBackfill,
+ boolean scanNewlyAddedTableEnabled) {
super(
startupOptions,
databaseList,
@@ -89,7 +90,7 @@ public class OracleSourceConfig extends JdbcSourceConfig {
connectionPoolSize,
chunkKeyColumn,
skipSnapshotBackfill,
- false);
+ scanNewlyAddedTableEnabled);
this.url = url;
}
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/config/OracleSourceConfigFactory.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/config/OracleSourceConfigFactory.java
index cf01b849e..974cedf4a 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/config/OracleSourceConfigFactory.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/config/OracleSourceConfigFactory.java
@@ -133,6 +133,7 @@ public class OracleSourceConfigFactory extends
JdbcSourceConfigFactory {
connectMaxRetries,
connectionPoolSize,
chunkKeyColumn,
- skipSnapshotBackfill);
+ skipSnapshotBackfill,
+ scanNewlyAddedTableEnabled);
}
}
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/table/OracleTableSource.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/table/OracleTableSource.java
index 9487a0968..5903ce41e 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/table/OracleTableSource.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/table/OracleTableSource.java
@@ -79,6 +79,7 @@ public class OracleTableSource implements ScanTableSource,
SupportsReadingMetada
private final String chunkKeyColumn;
private final boolean closeIdleReaders;
private final boolean skipSnapshotBackfill;
+ private final boolean scanNewlyAddedTableEnabled;
//
--------------------------------------------------------------------------------------------
// Mutable attributes
@@ -113,7 +114,8 @@ public class OracleTableSource implements ScanTableSource,
SupportsReadingMetada
double distributionFactorLower,
@Nullable String chunkKeyColumn,
boolean closeIdleReaders,
- boolean skipSnapshotBackfill) {
+ boolean skipSnapshotBackfill,
+ boolean scanNewlyAddedTableEnabled) {
this.physicalSchema = physicalSchema;
this.url = url;
this.port = port;
@@ -139,6 +141,7 @@ public class OracleTableSource implements ScanTableSource,
SupportsReadingMetada
this.chunkKeyColumn = chunkKeyColumn;
this.closeIdleReaders = closeIdleReaders;
this.skipSnapshotBackfill = skipSnapshotBackfill;
+ this.scanNewlyAddedTableEnabled = scanNewlyAddedTableEnabled;
}
@Override
@@ -187,6 +190,7 @@ public class OracleTableSource implements ScanTableSource,
SupportsReadingMetada
.closeIdleReaders(closeIdleReaders)
.skipSnapshotBackfill(skipSnapshotBackfill)
.chunkKeyColumn(chunkKeyColumn)
+
.scanNewlyAddedTableEnabled(scanNewlyAddedTableEnabled)
.build();
return SourceProvider.of(oracleChangeEventSource);
@@ -252,7 +256,8 @@ public class OracleTableSource implements ScanTableSource,
SupportsReadingMetada
distributionFactorLower,
chunkKeyColumn,
closeIdleReaders,
- skipSnapshotBackfill);
+ skipSnapshotBackfill,
+ scanNewlyAddedTableEnabled);
source.metadataKeys = metadataKeys;
source.producedDataType = producedDataType;
return source;
@@ -291,7 +296,8 @@ public class OracleTableSource implements ScanTableSource,
SupportsReadingMetada
&& Objects.equals(distributionFactorLower,
that.distributionFactorLower)
&& Objects.equals(chunkKeyColumn, that.chunkKeyColumn)
&& Objects.equals(closeIdleReaders, that.closeIdleReaders)
- && Objects.equals(skipSnapshotBackfill,
that.skipSnapshotBackfill);
+ && Objects.equals(skipSnapshotBackfill,
that.skipSnapshotBackfill)
+ && Objects.equals(scanNewlyAddedTableEnabled,
that.scanNewlyAddedTableEnabled);
}
@Override
@@ -321,7 +327,8 @@ public class OracleTableSource implements ScanTableSource,
SupportsReadingMetada
distributionFactorLower,
chunkKeyColumn,
closeIdleReaders,
- skipSnapshotBackfill);
+ skipSnapshotBackfill,
+ scanNewlyAddedTableEnabled);
}
@Override
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/table/OracleTableSourceFactory.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/table/OracleTableSourceFactory.java
index 5f533d8f2..70a63b3ae 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/table/OracleTableSourceFactory.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/table/OracleTableSourceFactory.java
@@ -47,6 +47,7 @@ import static
org.apache.flink.cdc.connectors.base.options.SourceOptions.CHUNK_M
import static
org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP;
import static
org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
import static
org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED;
+import static
org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED;
import static
org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_SNAPSHOT_FETCH_SIZE;
import static
org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_STARTUP_MODE;
import static
org.apache.flink.cdc.connectors.base.options.SourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND;
@@ -106,6 +107,7 @@ public class OracleTableSourceFactory implements
DynamicTableSourceFactory {
boolean closeIdlerReaders =
config.get(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED);
boolean skipSnapshotBackfill =
config.get(SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP);
+ boolean scanNewlyAddedTableEnabled =
config.get(SCAN_NEWLY_ADDED_TABLE_ENABLED);
if (enableParallelRead) {
validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE,
splitSize, 1);
@@ -142,7 +144,8 @@ public class OracleTableSourceFactory implements
DynamicTableSourceFactory {
distributionFactorLower,
chunkKeyColumn,
closeIdlerReaders,
- skipSnapshotBackfill);
+ skipSnapshotBackfill,
+ scanNewlyAddedTableEnabled);
}
@Override
@@ -180,6 +183,7 @@ public class OracleTableSourceFactory implements
DynamicTableSourceFactory {
options.add(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN);
options.add(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED);
options.add(SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP);
+ options.add(SCAN_NEWLY_ADDED_TABLE_ENABLED);
return options;
}
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/source/NewlyAddedTableITCase.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/source/NewlyAddedTableITCase.java
new file mode 100644
index 000000000..0cbbb8a57
--- /dev/null
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/source/NewlyAddedTableITCase.java
@@ -0,0 +1,908 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.oracle.source;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import
org.apache.flink.cdc.connectors.oracle.testutils.OracleTestUtils.FailoverPhase;
+import
org.apache.flink.cdc.connectors.oracle.testutils.OracleTestUtils.FailoverType;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.Timeout;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static java.lang.String.format;
+import static
org.apache.flink.cdc.connectors.oracle.testutils.OracleTestUtils.getTableNameRegex;
+import static
org.apache.flink.cdc.connectors.oracle.testutils.OracleTestUtils.triggerFailover;
+import static
org.apache.flink.cdc.connectors.oracle.testutils.OracleTestUtils.waitForSinkSize;
+import static
org.apache.flink.cdc.connectors.oracle.testutils.OracleTestUtils.waitForUpsertSinkSize;
+
+/** IT tests to cover various newly added tables during capture process. */
+public class NewlyAddedTableITCase extends OracleSourceTestBase {
+ @Rule public final Timeout timeoutPerTest = Timeout.seconds(600);
+
+ private final ScheduledExecutorService mockRedoLogExecutor =
+ Executors.newScheduledThreadPool(1);
+
+ @BeforeClass
+ public static void beforeClass() throws SQLException {
+ try (Connection dbaConnection = getJdbcConnectionAsDBA();
+ Statement dbaStatement = dbaConnection.createStatement()) {
+ dbaStatement.execute("ALTER DATABASE ADD SUPPLEMENTAL LOG DATA
(ALL) COLUMNS");
+ }
+ }
+
+    /**
+     * Per-test setup: clears the values-sink data, runs {@code createAndInitialize} with the
+     * {@code customer.sql} script, creates and fills {@code PRODUCE_LOG_TABLE}, and schedules a
+     * single background UPDATE against that table.
+     *
+     * @throws Exception if the fixture script or any of the JDBC statements fails
+     */
+    @Before
+    public void before() throws Exception {
+        TestValuesTableFactory.clearAllData();
+        createAndInitialize("customer.sql");
+        // Both the connection and the statement are released when the block exits, even if
+        // one of the DDL/DML statements throws.
+        try (Connection connection = getJdbcConnection();
+                Statement statement = connection.createStatement()) {
+            connection.setAutoCommit(false);
+            // prepare initial data for given table
+            String tableId = ORACLE_SCHEMA + ".PRODUCE_LOG_TABLE";
+            statement.execute(
+                    format(
+                            "CREATE TABLE %s ( ID NUMBER(19), CNT NUMBER(19), PRIMARY KEY(ID))",
+                            tableId));
+            statement.execute(format("INSERT INTO %s VALUES (0, 100)", tableId));
+            statement.execute(format("INSERT INTO %s VALUES (1, 101)", tableId));
+            statement.execute(format("INSERT INTO %s VALUES (2, 102)", tableId));
+            connection.commit();
+
+            // mock continuous redo log during the newly added table capturing process
+            // NOTE(review): schedule() runs the task once, and the delay is 500 microseconds,
+            // so the update fires almost immediately - confirm whether MILLISECONDS and/or a
+            // repeating schedule was intended.
+            mockRedoLogExecutor.schedule(
+                    () -> {
+                        try {
+                            executeSql(format("UPDATE %s SET CNT = CNT +1 WHERE ID < 2", tableId));
+                        } catch (Exception e) {
+                            // best effort: a failed background update must not fail the test
+                            e.printStackTrace();
+                        }
+                    },
+                    500,
+                    TimeUnit.MICROSECONDS);
+        }
+    }
+
+ @After
+ public void after() throws Exception {
+ mockRedoLogExecutor.shutdown();
+ // sleep 1000ms to wait until connections are closed.
+ Thread.sleep(1000L);
+ }
+
+ @Test
+ public void testNewlyAddedTableForExistsPipelineOnce() throws Exception {
+ testNewlyAddedTableOneByOne(
+ 1,
+ FailoverType.NONE,
+ FailoverPhase.NEVER,
+ false,
+ "ADDRESS_HANGZHOU",
+ "ADDRESS_BEIJING");
+ }
+
+ @Test
+ public void testNewlyAddedTableForExistsPipelineOnceWithAheadRedoLog()
throws Exception {
+ testNewlyAddedTableOneByOne(
+ 1,
+ FailoverType.NONE,
+ FailoverPhase.NEVER,
+ true,
+ "ADDRESS_HANGZHOU",
+ "ADDRESS_BEIJING");
+ }
+
+ @Test
+ public void testNewlyAddedTableForExistsPipelineTwice() throws Exception {
+ testNewlyAddedTableOneByOne(
+ DEFAULT_PARALLELISM,
+ FailoverType.NONE,
+ FailoverPhase.NEVER,
+ false,
+ "ADDRESS_HANGZHOU",
+ "ADDRESS_BEIJING",
+ "ADDRESS_SHANGHAI");
+ }
+
+ @Test
+ public void testNewlyAddedTableForExistsPipelineTwiceWithAheadRedoLog()
throws Exception {
+ testNewlyAddedTableOneByOne(
+ DEFAULT_PARALLELISM,
+ FailoverType.NONE,
+ FailoverPhase.NEVER,
+ true,
+ "ADDRESS_HANGZHOU",
+ "ADDRESS_BEIJING",
+ "ADDRESS_SHANGHAI");
+ }
+
+ @Test
+ public void
testNewlyAddedTableForExistsPipelineTwiceWithAheadRedoLogAndAutoCloseReader()
+ throws Exception {
+ Map<String, String> otherOptions = new HashMap<>();
+ otherOptions.put("scan.incremental.close-idle-reader.enabled", "true");
+ testNewlyAddedTableOneByOne(
+ DEFAULT_PARALLELISM,
+ otherOptions,
+ FailoverType.NONE,
+ FailoverPhase.NEVER,
+ true,
+ "ADDRESS_HANGZHOU",
+ "ADDRESS_BEIJING",
+ "ADDRESS_SHANGHAI");
+ }
+
+ @Test
+ public void testNewlyAddedTableForExistsPipelineThrice() throws Exception {
+ testNewlyAddedTableOneByOne(
+ DEFAULT_PARALLELISM,
+ FailoverType.NONE,
+ FailoverPhase.NEVER,
+ false,
+ "ADDRESS_HANGZHOU",
+ "ADDRESS_BEIJING",
+ "ADDRESS_SHANGHAI",
+ "ADDRESS_SHENZHEN");
+ }
+
+ @Test
+ public void testNewlyAddedTableForExistsPipelineThriceWithAheadRedoLog()
throws Exception {
+ testNewlyAddedTableOneByOne(
+ DEFAULT_PARALLELISM,
+ FailoverType.NONE,
+ FailoverPhase.NEVER,
+ true,
+ "ADDRESS_HANGZHOU",
+ "ADDRESS_BEIJING",
+ "ADDRESS_SHANGHAI",
+ "ADDRESS_SHENZHEN");
+ }
+
+ @Test
+ public void testNewlyAddedTableForExistsPipelineSingleParallelism() throws
Exception {
+ testNewlyAddedTableOneByOne(
+ 1,
+ FailoverType.NONE,
+ FailoverPhase.NEVER,
+ false,
+ "ADDRESS_HANGZHOU",
+ "ADDRESS_BEIJING");
+ }
+
+ @Test
+ public void
testNewlyAddedTableForExistsPipelineSingleParallelismWithAheadRedoLog()
+ throws Exception {
+ testNewlyAddedTableOneByOne(
+ 1,
+ FailoverType.NONE,
+ FailoverPhase.NEVER,
+ true,
+ "ADDRESS_HANGZHOU",
+ "ADDRESS_BEIJING");
+ }
+
+ @Test
+ public void testJobManagerFailoverForNewlyAddedTable() throws Exception {
+ testNewlyAddedTableOneByOne(
+ DEFAULT_PARALLELISM,
+ FailoverType.JM,
+ FailoverPhase.SNAPSHOT,
+ false,
+ "ADDRESS_HANGZHOU",
+ "ADDRESS_BEIJING");
+ }
+
+ @Test
+ public void testJobManagerFailoverForNewlyAddedTableWithAheadRedoLog()
throws Exception {
+ testNewlyAddedTableOneByOne(
+ DEFAULT_PARALLELISM,
+ FailoverType.JM,
+ FailoverPhase.SNAPSHOT,
+ true,
+ "ADDRESS_HANGZHOU",
+ "ADDRESS_BEIJING");
+ }
+
+ @Test
+ public void testTaskManagerFailoverForNewlyAddedTable() throws Exception {
+ testNewlyAddedTableOneByOne(
+ 1,
+ FailoverType.TM,
+ FailoverPhase.REDO_LOG,
+ false,
+ "ADDRESS_HANGZHOU",
+ "ADDRESS_BEIJING");
+ }
+
+    /**
+     * TaskManager failover during the redo-log phase, with redo log produced for each table
+     * before it is captured ({@code makeRedoLogBeforeCapture = true}); this is what
+     * distinguishes it from {@code testTaskManagerFailoverForNewlyAddedTable}.
+     */
+    @Test
+    public void testTaskManagerFailoverForNewlyAddedTableWithAheadRedoLog() throws Exception {
+        testNewlyAddedTableOneByOne(
+                1,
+                FailoverType.TM,
+                FailoverPhase.REDO_LOG,
+                true,
+                "ADDRESS_HANGZHOU",
+                "ADDRESS_BEIJING");
+    }
+
+ @Test
+ public void testJobManagerFailoverForRemoveTableSingleParallelism() throws
Exception {
+ testRemoveTablesOneByOne(
+ 1,
+ FailoverType.JM,
+ FailoverPhase.SNAPSHOT,
+ "ADDRESS_HANGZHOU",
+ "ADDRESS_BEIJING",
+ "ADDRESS_SHANGHAI");
+ }
+
+ @Test
+ public void testJobManagerFailoverForRemoveTable() throws Exception {
+ testRemoveTablesOneByOne(
+ DEFAULT_PARALLELISM,
+ FailoverType.JM,
+ FailoverPhase.SNAPSHOT,
+ "ADDRESS_HANGZHOU",
+ "ADDRESS_BEIJING",
+ "ADDRESS_SHANGHAI");
+ }
+
+ @Test
+ public void testTaskManagerFailoverForRemoveTableSingleParallelism()
throws Exception {
+ testRemoveTablesOneByOne(
+ 1,
+ FailoverType.TM,
+ FailoverPhase.SNAPSHOT,
+ "ADDRESS_HANGZHOU",
+ "ADDRESS_BEIJING",
+ "ADDRESS_SHANGHAI");
+ }
+
+ @Test
+ public void testTaskManagerFailoverForRemoveTable() throws Exception {
+ testRemoveTablesOneByOne(
+ DEFAULT_PARALLELISM,
+ FailoverType.TM,
+ FailoverPhase.SNAPSHOT,
+ "ADDRESS_HANGZHOU",
+ "ADDRESS_BEIJING",
+ "ADDRESS_SHANGHAI");
+ }
+
+ @Test
+ public void testRemoveTableSingleParallelism() throws Exception {
+ testRemoveTablesOneByOne(
+ 1,
+ FailoverType.NONE,
+ FailoverPhase.NEVER,
+ "ADDRESS_HANGZHOU",
+ "ADDRESS_BEIJING",
+ "ADDRESS_SHANGHAI");
+ }
+
+ @Test
+ public void testRemoveTable() throws Exception {
+ testRemoveTablesOneByOne(
+ DEFAULT_PARALLELISM,
+ FailoverType.NONE,
+ FailoverPhase.NEVER,
+ "ADDRESS_HANGZHOU",
+ "ADDRESS_BEIJING",
+ "ADDRESS_SHANGHAI");
+ }
+
+ @Test
+ public void testRemoveAndAddTablesOneByOne() throws Exception {
+ testRemoveAndAddTablesOneByOne(
+ 1, "ADDRESS_HANGZHOU", "ADDRESS_BEIJING", "ADDRESS_SHANGHAI");
+ }
+
+    /**
+     * Captures exactly one address table per round (dropping the previous one and adding the
+     * next), restoring each round from the savepoint taken at the end of the previous round.
+     *
+     * @param parallelism parallelism passed to the restored execution environment
+     * @param captureAddressTables address tables, captured one per round in the given order
+     * @throws Exception if table setup, job submission, waiting or savepointing fails
+     */
+    private void testRemoveAndAddTablesOneByOne(int parallelism, String... captureAddressTables)
+            throws Exception {
+
+        // step 1: create tables with all tables included
+        // The connection is closed here; closing an already-closed JDBC connection is a no-op,
+        // so this stays safe if the helper closes it as well.
+        try (Connection connection = getJdbcConnection()) {
+            initialAddressTables(connection, captureAddressTables);
+        }
+
+        final TemporaryFolder temporaryFolder = new TemporaryFolder();
+        temporaryFolder.create();
+        try {
+            final String savepointDirectory = temporaryFolder.newFolder().toURI().toString();
+            removeAndAddTablesRoundByRound(parallelism, savepointDirectory, captureAddressTables);
+        } finally {
+            // the folder is not managed by a @Rule, so it has to be removed explicitly
+            temporaryFolder.delete();
+        }
+    }
+
+    /** Runs the capture/assert/savepoint rounds of {@link #testRemoveAndAddTablesOneByOne}. */
+    private void removeAndAddTablesRoundByRound(
+            int parallelism, String savepointDirectory, String... captureAddressTables)
+            throws Exception {
+        // get all expected data
+        List<String> fetchedDataList = new ArrayList<>();
+
+        String finishedSavePointPath = null;
+        // test removing and adding table one by one
+        for (int round = 0; round < captureAddressTables.length; round++) {
+            String captureTableThisRound = captureAddressTables[round];
+            String cityName = captureTableThisRound.split("_")[1];
+            StreamExecutionEnvironment env =
+                    getStreamExecutionEnvironmentFromSavePoint(finishedSavePointPath, parallelism);
+            StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+            String createTableStatement =
+                    getCreateTableStatement(new HashMap<>(), captureTableThisRound);
+            tEnv.executeSql(createTableStatement);
+            tEnv.executeSql(
+                    "CREATE TABLE sink ("
+                            + " TABLE_NAME STRING,"
+                            + " ID BIGINT,"
+                            + " COUNTRY STRING,"
+                            + " CITY STRING,"
+                            + " DETAIL_ADDRESS STRING,"
+                            + " primary key (CITY, ID) not enforced"
+                            + ") WITH ("
+                            + " 'connector' = 'values',"
+                            + " 'sink-insert-only' = 'false'"
+                            + ")");
+            TableResult tableResult = tEnv.executeSql("insert into sink select * from address");
+            JobClient jobClient = tableResult.getJobClient().get();
+
+            // this round's snapshot data
+            fetchedDataList.addAll(
+                    Arrays.asList(
+                            format(
+                                    "+I[%s, 416874195632735147, China, %s, %s West Town address 1]",
+                                    captureTableThisRound, cityName, cityName),
+                            format(
+                                    "+I[%s, 416927583791428523, China, %s, %s West Town address 2]",
+                                    captureTableThisRound, cityName, cityName),
+                            format(
+                                    "+I[%s, 417022095255614379, China, %s, %s West Town address 3]",
+                                    captureTableThisRound, cityName, cityName)));
+            waitForSinkSize("sink", fetchedDataList.size());
+            assertEqualsInAnyOrder(fetchedDataList, TestValuesTableFactory.getRawResults("sink"));
+
+            // step 2: make redo log data for all tables before this round(also includes this
+            // round),
+            // test whether only this round table's data is captured.
+            for (int i = 0; i <= round; i++) {
+                String tableName = captureAddressTables[i];
+                makeRedoLogForAddressTableInRound(tableName, round);
+            }
+            // this round's redo log data
+            fetchedDataList.addAll(
+                    Arrays.asList(
+                            format(
+                                    "+U[%s, 416874195632735147, CHINA_%s, %s, %s West Town address 1]",
+                                    captureTableThisRound, round, cityName, cityName),
+                            format(
+                                    "+I[%s, %d, China, %s, %s West Town address 4]",
+                                    captureTableThisRound,
+                                    417022095255614380L + round,
+                                    cityName,
+                                    cityName)));
+
+            // step 3: assert fetched redo log data in this round
+            waitForSinkSize("sink", fetchedDataList.size());
+
+            assertEqualsInAnyOrder(fetchedDataList, TestValuesTableFactory.getRawResults("sink"));
+            // step 4: trigger savepoint
+            finishedSavePointPath = triggerSavepointWithRetry(jobClient, savepointDirectory);
+            jobClient.cancel().get();
+        }
+    }
+
+    /**
+     * Starts a job capturing all given address tables, then restarts it from a savepoint once
+     * per round with the first remaining table removed, asserting that only the tables still
+     * captured contribute redo-log changes to the sink.
+     *
+     * @param parallelism parallelism passed to the restored execution environment
+     * @param failoverType kind of failover to inject
+     * @param failoverPhase phase (snapshot or redo log) in which the failover is injected
+     * @param captureAddressTables address tables; the table at index {@code round} is dropped
+     *     from the captured set in each round, at least one table always remains
+     * @throws Exception if table setup, job submission, waiting or savepointing fails
+     */
+    private void testRemoveTablesOneByOne(
+            int parallelism,
+            FailoverType failoverType,
+            FailoverPhase failoverPhase,
+            String... captureAddressTables)
+            throws Exception {
+
+        // step 1: create oracle tables with all tables included
+        initialAddressTables(getJdbcConnection(), captureAddressTables);
+
+        final TemporaryFolder temporaryFolder = new TemporaryFolder();
+        temporaryFolder.create();
+        try {
+            final String savepointDirectory = temporaryFolder.newFolder().toURI().toString();
+            removeTablesRoundByRound(
+                    parallelism,
+                    failoverType,
+                    failoverPhase,
+                    savepointDirectory,
+                    captureAddressTables);
+        } finally {
+            // the folder is not managed by a @Rule, so it has to be removed explicitly
+            temporaryFolder.delete();
+        }
+    }
+
+    /** Runs the initial capture and the removal rounds of {@link #testRemoveTablesOneByOne}. */
+    private void removeTablesRoundByRound(
+            int parallelism,
+            FailoverType failoverType,
+            FailoverPhase failoverPhase,
+            String savepointDirectory,
+            String... captureAddressTables)
+            throws Exception {
+        // get all expected data
+        List<String> fetchedDataList = new ArrayList<>();
+        for (String table : captureAddressTables) {
+            String cityName = table.split("_")[1];
+            fetchedDataList.addAll(
+                    Arrays.asList(
+                            format(
+                                    "+I[%s, 416874195632735147, China, %s, %s West Town address 1]",
+                                    table, cityName, cityName),
+                            format(
+                                    "+I[%s, 416927583791428523, China, %s, %s West Town address 2]",
+                                    table, cityName, cityName),
+                            format(
+                                    "+I[%s, 417022095255614379, China, %s, %s West Town address 3]",
+                                    table, cityName, cityName)));
+        }
+
+        String finishedSavePointPath = null;
+        // step 2: execute insert and trigger savepoint with all tables added
+        {
+            StreamExecutionEnvironment env =
+                    getStreamExecutionEnvironmentFromSavePoint(finishedSavePointPath, parallelism);
+            StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+            String createTableStatement =
+                    getCreateTableStatement(new HashMap<>(), captureAddressTables);
+            tEnv.executeSql(createTableStatement);
+            tEnv.executeSql(
+                    "CREATE TABLE sink ("
+                            + " TABLE_NAME STRING,"
+                            + " ID BIGINT,"
+                            + " COUNTRY STRING,"
+                            + " CITY STRING,"
+                            + " DETAIL_ADDRESS STRING,"
+                            + " primary key (CITY, ID) not enforced"
+                            + ") WITH ("
+                            + " 'connector' = 'values',"
+                            + " 'sink-insert-only' = 'false'"
+                            + ")");
+            TableResult tableResult = tEnv.executeSql("insert into sink select * from address");
+            JobClient jobClient = tableResult.getJobClient().get();
+
+            // trigger failover after some snapshot data read finished
+            if (failoverPhase == FailoverPhase.SNAPSHOT) {
+                triggerFailover(
+                        failoverType,
+                        jobClient.getJobID(),
+                        miniClusterResource.getMiniCluster(),
+                        () -> sleepMs(100));
+            }
+            waitForSinkSize("sink", fetchedDataList.size());
+            assertEqualsInAnyOrder(fetchedDataList, TestValuesTableFactory.getRawResults("sink"));
+            finishedSavePointPath = triggerSavepointWithRetry(jobClient, savepointDirectory);
+            jobClient.cancel().get();
+        }
+
+        // test removing table one by one, note that there should be at least one table remaining
+        for (int round = 0; round < captureAddressTables.length - 1; round++) {
+            String[] captureTablesThisRound =
+                    Arrays.asList(captureAddressTables)
+                            .subList(round + 1, captureAddressTables.length)
+                            .toArray(new String[0]);
+
+            StreamExecutionEnvironment env =
+                    getStreamExecutionEnvironmentFromSavePoint(finishedSavePointPath, parallelism);
+            StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+            String createTableStatement =
+                    getCreateTableStatement(new HashMap<>(), captureTablesThisRound);
+            tEnv.executeSql(createTableStatement);
+            tEnv.executeSql(
+                    "CREATE TABLE sink ("
+                            + " TABLE_NAME STRING,"
+                            + " ID BIGINT,"
+                            + " COUNTRY STRING,"
+                            + " CITY STRING,"
+                            + " DETAIL_ADDRESS STRING,"
+                            + " primary key (CITY, ID) not enforced"
+                            + ") WITH ("
+                            + " 'connector' = 'values',"
+                            + " 'sink-insert-only' = 'false'"
+                            + ")");
+            TableResult tableResult = tEnv.executeSql("insert into sink select * from address");
+            JobClient jobClient = tableResult.getJobClient().get();
+
+            waitForSinkSize("sink", fetchedDataList.size());
+            assertEqualsInAnyOrder(fetchedDataList, TestValuesTableFactory.getRawResults("sink"));
+
+            // step 3: make redo log data for all tables
+            List<String> expectedRedoLogDataThisRound = new ArrayList<>();
+
+            for (int i = 0, captureAddressTablesLength = captureAddressTables.length;
+                    i < captureAddressTablesLength;
+                    i++) {
+                String tableName = captureAddressTables[i];
+                makeRedoLogForAddressTableInRound(tableName, round);
+                // tables at index <= round are no longer captured, so their changes are
+                // written to the database but not expected in the sink
+                if (i <= round) {
+                    continue;
+                }
+                String cityName = tableName.split("_")[1];
+
+                expectedRedoLogDataThisRound.addAll(
+                        Arrays.asList(
+                                format(
+                                        "+U[%s, 416874195632735147, CHINA_%s, %s, %s West Town address 1]",
+                                        tableName, round, cityName, cityName),
+                                format(
+                                        "+I[%s, %d, China, %s, %s West Town address 4]",
+                                        tableName,
+                                        417022095255614380L + round,
+                                        cityName,
+                                        cityName)));
+            }
+
+            if (failoverPhase == FailoverPhase.REDO_LOG
+                    && TestValuesTableFactory.getRawResults("sink").size()
+                            > fetchedDataList.size()) {
+                triggerFailover(
+                        failoverType,
+                        jobClient.getJobID(),
+                        miniClusterResource.getMiniCluster(),
+                        () -> sleepMs(100));
+            }
+
+            fetchedDataList.addAll(expectedRedoLogDataThisRound);
+            // step 4: assert fetched redo log data in this round
+            waitForSinkSize("sink", fetchedDataList.size());
+            assertEqualsInAnyOrder(fetchedDataList, TestValuesTableFactory.getRawResults("sink"));
+
+            // step 5: trigger savepoint
+            finishedSavePointPath = triggerSavepointWithRetry(jobClient, savepointDirectory);
+            jobClient.cancel().get();
+        }
+    }
+
+ private void testNewlyAddedTableOneByOne(
+ int parallelism,
+ FailoverType failoverType,
+ FailoverPhase failoverPhase,
+ boolean makeRedoLogBeforeCapture,
+ String... captureAddressTables)
+ throws Exception {
+ testNewlyAddedTableOneByOne(
+ parallelism,
+ new HashMap<>(),
+ failoverType,
+ failoverPhase,
+ makeRedoLogBeforeCapture,
+ captureAddressTables);
+ }
+
+ private void testNewlyAddedTableOneByOne(
+ int parallelism,
+ Map<String, String> sourceOptions,
+ FailoverType failoverType,
+ FailoverPhase failoverPhase,
+ boolean makeRedoLogBeforeCapture,
+ String... captureAddressTables)
+ throws Exception {
+
+ // step 1: create oracle tables with initial data
+ initialAddressTables(getJdbcConnection(), captureAddressTables);
+
+ final TemporaryFolder temporaryFolder = new TemporaryFolder();
+ temporaryFolder.create();
+ final String savepointDirectory =
temporaryFolder.newFolder().toURI().toString();
+
+ // test newly added table one by one
+ String finishedSavePointPath = null;
+ List<String> fetchedDataList = new ArrayList<>();
+ for (int round = 0; round < captureAddressTables.length; round++) {
+ String[] captureTablesThisRound =
+ Arrays.asList(captureAddressTables)
+ .subList(0, round + 1)
+ .toArray(new String[0]);
+ String newlyAddedTable = captureAddressTables[round];
+ if (makeRedoLogBeforeCapture) {
+ makeRedoLogBeforeCaptureForAddressTable(newlyAddedTable);
+ }
+ StreamExecutionEnvironment env =
+
getStreamExecutionEnvironmentFromSavePoint(finishedSavePointPath, parallelism);
+ StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+ String createTableStatement =
+ getCreateTableStatement(sourceOptions,
captureTablesThisRound);
+ tEnv.executeSql(createTableStatement);
+ tEnv.executeSql(
+ "CREATE TABLE sink ("
+ + " TABLE_NAME STRING,"
+ + " ID BIGINT,"
+ + " COUNTRY STRING,"
+ + " CITY STRING,"
+ + " DETAIL_ADDRESS STRING,"
+ + " primary key (CITY, ID) not enforced"
+ + ") WITH ("
+ + " 'connector' = 'values',"
+ + " 'sink-insert-only' = 'false'"
+ + ")");
+ TableResult tableResult = tEnv.executeSql("insert into sink select
* from address");
+ JobClient jobClient = tableResult.getJobClient().get();
+
+ // step 2: assert fetched snapshot data in this round
+ String cityName = newlyAddedTable.split("_")[1];
+ List<String> expectedSnapshotDataThisRound =
+ Arrays.asList(
+ format(
+ "+I[%s, 416874195632735147, China, %s, %s
West Town address 1]",
+ newlyAddedTable, cityName, cityName),
+ format(
+ "+I[%s, 416927583791428523, China, %s, %s
West Town address 2]",
+ newlyAddedTable, cityName, cityName),
+ format(
+ "+I[%s, 417022095255614379, China, %s, %s
West Town address 3]",
+ newlyAddedTable, cityName, cityName));
+ if (makeRedoLogBeforeCapture) {
+ expectedSnapshotDataThisRound =
+ Arrays.asList(
+ format(
+ "+I[%s, 416874195632735147, China, %s,
%s West Town address 1]",
+ newlyAddedTable, cityName, cityName),
+ format(
+ "+I[%s, 416927583791428523, China, %s,
%s West Town address 2]",
+ newlyAddedTable, cityName, cityName),
+ format(
+ "+I[%s, 417022095255614379, China, %s,
%s West Town address 3]",
+ newlyAddedTable, cityName, cityName),
+ format(
+ "+I[%s, 417022095255614381, China, %s,
%s West Town address 5]",
+ newlyAddedTable, cityName, cityName));
+ }
+
+ // trigger failover after some snapshot data read finished
+ if (failoverPhase == FailoverPhase.SNAPSHOT) {
+ triggerFailover(
+ failoverType,
+ jobClient.getJobID(),
+ miniClusterResource.getMiniCluster(),
+ () -> sleepMs(100));
+ }
+ fetchedDataList.addAll(expectedSnapshotDataThisRound);
+ waitForUpsertSinkSize("sink", fetchedDataList.size());
+ assertEqualsInAnyOrder(fetchedDataList,
TestValuesTableFactory.getResults("sink"));
+
+ // step 3: make some redo log data for this round
+ makeFirstPartRedoLogForAddressTable(newlyAddedTable);
+ if (failoverPhase == FailoverPhase.REDO_LOG) {
+ triggerFailover(
+ failoverType,
+ jobClient.getJobID(),
+ miniClusterResource.getMiniCluster(),
+ () -> sleepMs(100));
+ }
+ makeSecondPartRedoLogForAddressTable(newlyAddedTable);
+
+ // step 4: assert fetched redo log data in this round
+ // retract the old data with id 416874195632735147
+ fetchedDataList =
+ fetchedDataList.stream()
+ .filter(
+ r ->
+ !r.contains(
+ format(
+ "%s,
416874195632735147",
+ newlyAddedTable)))
+ .collect(Collectors.toList());
+ List<String> expectedRedoLogUpsertDataThisRound =
+ Arrays.asList(
+ // add the new data with id 416874195632735147
+ format(
+ "+I[%s, 416874195632735147, CHINA, %s, %s
West Town address 1]",
+ newlyAddedTable, cityName, cityName),
+ format(
+ "+I[%s, 417022095255614380, China, %s, %s
West Town address 4]",
+ newlyAddedTable, cityName, cityName));
+
+ // step 5: assert fetched redo log data in this round
+ fetchedDataList.addAll(expectedRedoLogUpsertDataThisRound);
+
+ waitForUpsertSinkSize("sink", fetchedDataList.size());
+ // the result size of sink may arrive fetchedDataList.size() with
old data, wait one
+ // checkpoint to wait retract old record and send new record
+ Thread.sleep(1000);
+ assertEqualsInAnyOrder(fetchedDataList,
TestValuesTableFactory.getResults("sink"));
+
+ // step 6: trigger savepoint
+ if (round != captureAddressTables.length - 1) {
+ finishedSavePointPath = triggerSavepointWithRetry(jobClient,
savepointDirectory);
+ }
+ jobClient.cancel().get();
+ }
+ }
+
+    /**
+     * Creates each of the given address tables under {@code ORACLE_SCHEMA} and inserts three
+     * initial rows into it, committing once after all tables have been processed.
+     *
+     * <p>The city name is the second {@code _}-separated token of the table name, so every name
+     * must contain at least one underscore.
+     *
+     * <p>The supplied connection is always closed before this method returns, whether or not the
+     * statements succeed.
+     *
+     * @param connection open JDBC connection; closed by this method
+     * @param addressTables unqualified names of the tables to create and populate
+     * @throws SQLException if any statement or the commit fails
+     */
+    private void initialAddressTables(Connection connection, String[] addressTables)
+            throws SQLException {
+        try {
+            connection.setAutoCommit(false);
+            // Close the statement deterministically instead of leaving it to Connection#close().
+            try (Statement statement = connection.createStatement()) {
+                for (String tableName : addressTables) {
+                    // make initial data for given table
+                    String tableId = ORACLE_SCHEMA + '.' + tableName;
+                    String cityName = tableName.split("_")[1];
+                    statement.execute(
+                            "CREATE TABLE "
+                                    + tableId
+                                    + "("
+                                    + " ID NUMBER(19) NOT NULL,"
+                                    + " COUNTRY VARCHAR(255) NOT NULL,"
+                                    + " CITY VARCHAR(255) NOT NULL,"
+                                    + " DETAIL_ADDRESS VARCHAR(1024),"
+                                    + " PRIMARY KEY(ID)"
+                                    + ")");
+                    statement.execute(
+                            format(
+                                    "INSERT INTO %s VALUES (416874195632735147, "
+                                            + "'China', '%s', '%s West Town address 1')",
+                                    tableId, cityName, cityName));
+                    statement.execute(
+                            format(
+                                    "INSERT INTO %s VALUES (416927583791428523, "
+                                            + "'China', '%s', '%s West Town address 2')",
+                                    tableId, cityName, cityName));
+                    statement.execute(
+                            format(
+                                    "INSERT INTO %s VALUES (417022095255614379, "
+                                            + "'China', '%s', '%s West Town address 3')",
+                                    tableId, cityName, cityName));
+                }
+            }
+            connection.commit();
+        } finally {
+            connection.close();
+        }
+    }
+
+ private void makeFirstPartRedoLogForAddressTable(String tableName) throws
Exception {
+ String tableId = ORACLE_SCHEMA + '.' + tableName;
+ executeSql(
+ format("UPDATE %s SET COUNTRY = 'CHINA' where ID =
416874195632735147", tableId));
+ }
+
+ private void makeSecondPartRedoLogForAddressTable(String tableName) throws
Exception {
+ String tableId = ORACLE_SCHEMA + '.' + tableName;
+ String cityName = tableName.split("_")[1];
+ executeSql(
+ format(
+ "INSERT INTO %s VALUES(417022095255614380,
'China','%s','%s West Town address 4')",
+ tableId, cityName, cityName));
+ }
+
+ private void makeRedoLogBeforeCaptureForAddressTable(String tableName)
throws Exception {
+ String tableId = ORACLE_SCHEMA + '.' + tableName;
+ String cityName = tableName.split("_")[1];
+ executeSql(
+ format(
+ "INSERT INTO %s VALUES(417022095255614381,
'China','%s','%s West Town address 5')",
+ tableId, cityName, cityName));
+ }
+
+ private void makeRedoLogForAddressTableInRound(String tableName, int
round) throws Exception {
+ String tableId = ORACLE_SCHEMA + '.' + tableName;
+ String cityName = tableName.split("_")[1];
+ executeSql(
+ format(
+ "UPDATE %s SET COUNTRY = 'CHINA_%s' where id =
416874195632735147",
+ tableId, round));
+ executeSql(
+ format(
+ "INSERT INTO %s VALUES(%d, 'China','%s','%s West Town
address 4')",
+ tableId, 417022095255614380L + round, cityName,
cityName));
+ }
+
+    /**
+     * Sleeps for the given number of milliseconds without propagating
+     * {@link InterruptedException}, so it can be used where a {@link Runnable} is required.
+     *
+     * <p>If the thread is interrupted the sleep ends early and the interrupt flag is restored,
+     * so callers further up the stack can still observe the interruption.
+     *
+     * @param millis time to sleep in milliseconds
+     */
+    private void sleepMs(long millis) {
+        try {
+            Thread.sleep(millis);
+        } catch (InterruptedException ignored) {
+            // Do not lose the interruption: re-set the flag that Thread.sleep cleared.
+            Thread.currentThread().interrupt();
+        }
+    }
+
+ private String triggerSavepointWithRetry(JobClient jobClient, String
savepointDirectory)
+ throws ExecutionException, InterruptedException {
+ int retryTimes = 0;
+ // retry 600 times, it takes 100 milliseconds per time, at most retry
1 minute
+ while (retryTimes < 600) {
+ try {
+ return jobClient.triggerSavepoint(savepointDirectory).get();
+ } catch (Exception e) {
+ Optional<CheckpointException> exception =
+ ExceptionUtils.findThrowable(e,
CheckpointException.class);
+ if (exception.isPresent()
+ && exception.get().getMessage().contains("Checkpoint
triggering task")) {
+ Thread.sleep(100);
+ retryTimes++;
+ } else {
+ throw e;
+ }
+ }
+ }
+ return null;
+ }
+
+ private StreamExecutionEnvironment
getStreamExecutionEnvironmentFromSavePoint(
+ String finishedSavePointPath, int parallelism) throws Exception {
+ Configuration configuration = new Configuration();
+ if (finishedSavePointPath != null) {
+ configuration.setString(SavepointConfigOptions.SAVEPOINT_PATH,
finishedSavePointPath);
+ }
+ StreamExecutionEnvironment env =
+
StreamExecutionEnvironment.getExecutionEnvironment(configuration);
+ env.setParallelism(parallelism);
+ env.enableCheckpointing(200L);
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 100L));
+ return env;
+ }
+
+ private String getCreateTableStatement(
+ Map<String, String> otherOptions, String... captureTableNames) {
+ return String.format(
+ "CREATE TABLE address ("
+ + " table_name STRING METADATA VIRTUAL,"
+ + " ID BIGINT NOT NULL,"
+ + " COUNTRY STRING,"
+ + " CITY STRING,"
+ + " DETAIL_ADDRESS STRING,"
+ + " primary key (CITY, ID) not enforced"
+ + ") WITH ("
+ + " 'connector' = 'oracle-cdc',"
+ + " 'scan.incremental.snapshot.enabled' = 'true',"
+ + " 'hostname' = '%s',"
+ + " 'port' = '%s',"
+ + " 'username' = '%s',"
+ + " 'password' = '%s',"
+ + " 'database-name' = '%s',"
+ + " 'schema-name' = '%s',"
+ + " 'table-name' = '%s',"
+ + " 'debezium.log.mining.strategy' = 'online_catalog',"
+ + "
'debezium.database.history.store.only.captured.tables.ddl' = 'true',"
+ + " 'scan.incremental.snapshot.chunk.size' = '2',"
+ + " 'scan.newly-added-table.enabled' = 'true',"
+ + " 'chunk-meta.group.size' = '2'"
+ + " %s"
+ + ")",
+ ORACLE_CONTAINER.getHost(),
+ ORACLE_CONTAINER.getOraclePort(),
+ ORACLE_CONTAINER.getUsername(),
+ ORACLE_CONTAINER.getPassword(),
+ ORACLE_DATABASE,
+ ORACLE_SCHEMA,
+ getTableNameRegex(captureTableNames),
+ otherOptions.isEmpty()
+ ? ""
+ : ","
+ + otherOptions.entrySet().stream()
+ .map(
+ e ->
+ String.format(
+ "'%s'='%s'",
+ e.getKey(),
e.getValue()))
+ .collect(Collectors.joining(",")));
+ }
+
+    /**
+     * Runs a single SQL statement on a connection obtained from {@code getJdbcConnection()};
+     * both the statement and the connection are closed afterwards.
+     *
+     * @param sql the SQL text to execute
+     * @throws Exception if obtaining the connection or executing the statement fails
+     */
+    private void executeSql(String sql) throws Exception {
+        try (Connection jdbcConnection = getJdbcConnection();
+                Statement sqlStatement = jdbcConnection.createStatement()) {
+            sqlStatement.execute(sql);
+        }
+    }
+}
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OracleSourceITCase.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OracleSourceITCase.java
index 3c4a71590..ee5a1c947 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OracleSourceITCase.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OracleSourceITCase.java
@@ -25,9 +25,9 @@ import
org.apache.flink.cdc.connectors.base.options.StartupOptions;
import
org.apache.flink.cdc.connectors.base.source.utils.hooks.SnapshotPhaseHook;
import
org.apache.flink.cdc.connectors.base.source.utils.hooks.SnapshotPhaseHooks;
import
org.apache.flink.cdc.connectors.oracle.source.utils.OracleConnectionUtils;
+import
org.apache.flink.cdc.connectors.oracle.testutils.OracleTestUtils.FailoverPhase;
+import
org.apache.flink.cdc.connectors.oracle.testutils.OracleTestUtils.FailoverType;
import org.apache.flink.cdc.connectors.oracle.testutils.TestTable;
-import
org.apache.flink.runtime.highavailability.nonha.embedded.HaLeadershipControl;
-import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
@@ -58,6 +58,7 @@ import java.util.function.Function;
import java.util.stream.Collectors;
import static java.lang.String.format;
+import static
org.apache.flink.cdc.connectors.oracle.testutils.OracleTestUtils.triggerFailover;
import static org.apache.flink.table.api.DataTypes.BIGINT;
import static org.apache.flink.table.api.DataTypes.STRING;
import static org.apache.flink.table.catalog.Column.physical;
@@ -707,54 +708,4 @@ public class OracleSourceITCase extends
OracleSourceTestBase {
statement.execute(sql);
}
}
-
- // ------------------------------------------------------------------------
- // test utilities
- // ------------------------------------------------------------------------
-
- /** The type of failover. */
- private enum FailoverType {
- TM,
- JM,
- NONE
- }
-
- /** The phase of failover. */
- private enum FailoverPhase {
- SNAPSHOT,
- REDO_LOG,
- NEVER
- }
-
- private static void triggerFailover(
- FailoverType type, JobID jobId, MiniCluster miniCluster, Runnable
afterFailAction)
- throws Exception {
- switch (type) {
- case TM:
- restartTaskManager(miniCluster, afterFailAction);
- break;
- case JM:
- triggerJobManagerFailover(jobId, miniCluster, afterFailAction);
- break;
- case NONE:
- break;
- default:
- throw new IllegalStateException("Unexpected value: " + type);
- }
- }
-
- private static void triggerJobManagerFailover(
- JobID jobId, MiniCluster miniCluster, Runnable afterFailAction)
throws Exception {
- final HaLeadershipControl haLeadershipControl =
miniCluster.getHaLeadershipControl().get();
- haLeadershipControl.revokeJobMasterLeadership(jobId).get();
- afterFailAction.run();
- haLeadershipControl.grantJobMasterLeadership(jobId).get();
- }
-
- private static void restartTaskManager(MiniCluster miniCluster, Runnable
afterFailAction)
- throws Exception {
- miniCluster.terminateTaskManager(0).get();
- afterFailAction.run();
- miniCluster.startTaskManager();
- }
}
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OracleSourceTestBase.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OracleSourceTestBase.java
index 8fb1157f4..7f8b83b49 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OracleSourceTestBase.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OracleSourceTestBase.java
@@ -174,7 +174,7 @@ public class OracleSourceTestBase extends TestLogger {
}
// ------------------ utils -----------------------
- private static List<TableId> listTables(Connection connection) {
+ protected static List<TableId> listTables(Connection connection) {
Set<TableId> tableIdSet = new HashSet<>();
String queryTablesSql =
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/table/OracleTableSourceFactoryTest.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/table/OracleTableSourceFactoryTest.java
index 584c8c16b..4fe043a5e 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/table/OracleTableSourceFactoryTest.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/table/OracleTableSourceFactoryTest.java
@@ -118,7 +118,8 @@ public class OracleTableSourceFactoryTest {
.defaultValue(),
null,
JdbcSourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED.defaultValue(),
-
JdbcSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue());
+
JdbcSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
+
JdbcSourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED.defaultValue());
assertEquals(expectedSource, actualSource);
}
@@ -154,7 +155,8 @@ public class OracleTableSourceFactoryTest {
.defaultValue(),
null,
SourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED.defaultValue(),
-
SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue());
+
SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
+
SourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED.defaultValue());
assertEquals(expectedSource, actualSource);
}
@@ -164,6 +166,7 @@ public class OracleTableSourceFactoryTest {
options.put("port", "1521");
options.put("hostname", MY_LOCALHOST);
options.put("debezium.snapshot.mode", "initial");
+ options.put(SourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED.key(),
"true");
DynamicTableSource actualSource = createTableSource(options);
Properties dbzProperties = new Properties();
@@ -194,7 +197,8 @@ public class OracleTableSourceFactoryTest {
.defaultValue(),
null,
SourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED.defaultValue(),
-
SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue());
+
SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
+ true);
assertEquals(expectedSource, actualSource);
}
@@ -220,6 +224,7 @@ public class OracleTableSourceFactoryTest {
options.put(SourceOptions.SCAN_SNAPSHOT_FETCH_SIZE.key(),
String.valueOf(fetchSize));
options.put(SourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED.key(),
"true");
options.put(SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.key(),
"true");
+ options.put(SourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED.key(),
"true");
options.put(
JdbcSourceOptions.CONNECT_TIMEOUT.key(),
@@ -260,6 +265,7 @@ public class OracleTableSourceFactoryTest {
distributionFactorLower,
null,
true,
+ true,
true);
assertEquals(expectedSource, actualSource);
}
@@ -297,7 +303,8 @@ public class OracleTableSourceFactoryTest {
.defaultValue(),
null,
SourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED.defaultValue(),
-
SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue());
+
SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
+
SourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED.defaultValue());
assertEquals(expectedSource, actualSource);
}
@@ -334,7 +341,8 @@ public class OracleTableSourceFactoryTest {
.defaultValue(),
null,
SourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED.defaultValue(),
-
SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue());
+
SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
+
SourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED.defaultValue());
assertEquals(expectedSource, actualSource);
}
@@ -375,7 +383,8 @@ public class OracleTableSourceFactoryTest {
.defaultValue(),
null,
SourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED.defaultValue(),
-
SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue());
+
SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
+
SourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED.defaultValue());
expectedSource.producedDataType =
SCHEMA_WITH_METADATA.toSourceRowDataType();
expectedSource.metadataKeys =
Arrays.asList("op_ts", "database_name", "table_name",
"schema_name");
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/testutils/OracleTestUtils.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/testutils/OracleTestUtils.java
new file mode 100644
index 000000000..2dd6e3a04
--- /dev/null
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/testutils/OracleTestUtils.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.oracle.testutils;
+
+import org.apache.flink.api.common.JobID;
+import
org.apache.flink.runtime.highavailability.nonha.embedded.HaLeadershipControl;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
+
+import org.apache.commons.lang3.StringUtils;
+
+import static java.lang.String.format;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** Oracle test utilities: failover triggers, sink-size polling and table-name regex helpers. */
+public class OracleTestUtils {
+
+    /** Static helpers only; not meant to be instantiated. */
+    private OracleTestUtils() {}
+
+    /** The type of failover. */
+    public enum FailoverType {
+        TM,
+        JM,
+        NONE
+    }
+
+    /** The phase of failover. */
+    public enum FailoverPhase {
+        SNAPSHOT,
+        REDO_LOG,
+        NEVER
+    }
+
+    /**
+     * Triggers the requested kind of failover on the mini cluster.
+     *
+     * @param type {@code TM} restarts a task manager, {@code JM} revokes and re-grants job
+     *     master leadership, {@code NONE} does nothing
+     * @param jobId job whose job master is affected; only used for {@code JM}
+     * @param miniCluster cluster the job runs on
+     * @param afterFailAction action run between the failure and the recovery step
+     * @throws Exception if the failure or recovery step fails
+     */
+    public static void triggerFailover(
+            FailoverType type, JobID jobId, MiniCluster miniCluster, Runnable afterFailAction)
+            throws Exception {
+        switch (type) {
+            case TM:
+                restartTaskManager(miniCluster, afterFailAction);
+                break;
+            case JM:
+                triggerJobManagerFailover(jobId, miniCluster, afterFailAction);
+                break;
+            case NONE:
+                break;
+            default:
+                throw new IllegalStateException("Unexpected value: " + type);
+        }
+    }
+
+    /**
+     * Revokes job master leadership for the job, runs {@code afterFailAction}, then grants the
+     * leadership again. Both leadership calls are awaited.
+     *
+     * @param jobId job whose job master leadership is toggled
+     * @param miniCluster cluster providing the {@link HaLeadershipControl}
+     * @param afterFailAction action run while leadership is revoked
+     * @throws Exception if the leadership control is unavailable or a leadership call fails
+     */
+    public static void triggerJobManagerFailover(
+            JobID jobId, MiniCluster miniCluster, Runnable afterFailAction) throws Exception {
+        final HaLeadershipControl haLeadershipControl = miniCluster.getHaLeadershipControl().get();
+        haLeadershipControl.revokeJobMasterLeadership(jobId).get();
+        afterFailAction.run();
+        haLeadershipControl.grantJobMasterLeadership(jobId).get();
+    }
+
+    /**
+     * Terminates the task manager with index 0 (awaiting termination), runs {@code
+     * afterFailAction}, then starts a new task manager.
+     *
+     * @param miniCluster cluster whose task manager is restarted
+     * @param afterFailAction action run while the task manager is down
+     * @throws Exception if terminating or starting the task manager fails
+     */
+    public static void restartTaskManager(MiniCluster miniCluster, Runnable afterFailAction)
+            throws Exception {
+        miniCluster.terminateTaskManager(0).get();
+        afterFailAction.run();
+        miniCluster.startTaskManager();
+    }
+
+    /**
+     * Blocks until the raw result count of the sink reaches {@code expectedSize}, polling every
+     * 100 ms. There is no timeout: the call only returns once the size is reached.
+     *
+     * @param sinkName name of the values sink table
+     * @param expectedSize minimum number of raw results to wait for
+     * @throws InterruptedException if interrupted while sleeping between polls
+     */
+    public static void waitForSinkSize(String sinkName, int expectedSize)
+            throws InterruptedException {
+        while (sinkSize(sinkName) < expectedSize) {
+            Thread.sleep(100);
+        }
+    }
+
+    /**
+     * Returns the number of raw results currently held by the sink, or 0 when the lookup throws
+     * {@link IllegalArgumentException}.
+     *
+     * @param sinkName name of the values sink table
+     * @return current raw result count
+     */
+    public static int sinkSize(String sinkName) {
+        synchronized (TestValuesTableFactory.class) {
+            try {
+                return TestValuesTableFactory.getRawResults(sinkName).size();
+            } catch (IllegalArgumentException e) {
+                // job is not started yet
+                return 0;
+            }
+        }
+    }
+
+    /**
+     * Blocks until the result count of the sink (as returned by {@code getResults}) reaches
+     * {@code expectedSize}, polling every 100 ms. There is no timeout.
+     *
+     * @param sinkName name of the values sink table
+     * @param expectedSize minimum number of results to wait for
+     * @throws InterruptedException if interrupted while sleeping between polls
+     */
+    public static void waitForUpsertSinkSize(String sinkName, int expectedSize)
+            throws InterruptedException {
+        while (upsertSinkSize(sinkName) < expectedSize) {
+            Thread.sleep(100);
+        }
+    }
+
+    /**
+     * Returns the number of results currently held by the sink, or 0 when the lookup throws
+     * {@link IllegalArgumentException}.
+     *
+     * @param sinkName name of the values sink table
+     * @return current result count
+     */
+    public static int upsertSinkSize(String sinkName) {
+        synchronized (TestValuesTableFactory.class) {
+            try {
+                return TestValuesTableFactory.getResults(sinkName).size();
+            } catch (IllegalArgumentException e) {
+                // job is not started yet
+                return 0;
+            }
+        }
+    }
+
+    /**
+     * Builds the table-name pattern for the given tables: the name itself for a single table,
+     * otherwise an alternation such as {@code (A|B|C)}.
+     *
+     * @param captureCustomerTables table names to capture; must contain at least one element
+     * @return the name or alternation pattern
+     * @throws IllegalStateException if the array is empty
+     */
+    public static String getTableNameRegex(String[] captureCustomerTables) {
+        checkState(captureCustomerTables.length > 0);
+        if (captureCustomerTables.length == 1) {
+            return captureCustomerTables[0];
+        } else {
+            // pattern that matches multiple tables
+            return format("(%s)", StringUtils.join(captureCustomerTables, "|"));
+        }
+    }
+}