This is an automated email from the ASF dual-hosted git repository.

kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new a41423e00 [FLINK-36631][source-connector][oracle] Add oracle read data by specific offset (#3675)
a41423e00 is described below

commit a41423e0082ad5e9eac936e07f87c6c52f9b8b1a
Author: wudi <[email protected]>
AuthorDate: Sun Oct 5 19:40:59 2025 +0800

    [FLINK-36631][source-connector][oracle] Add oracle read data by specific offset (#3675)
---
 .../docs/connectors/flink-sources/oracle-cdc.md    | 10 ++-
 .../docs/connectors/flink-sources/oracle-cdc.md    | 10 ++-
 .../base/config/JdbcSourceConfigFactory.java       |  1 +
 .../connectors/base/options/StartupOptions.java    | 41 +++++++---
 .../base/source/assigner/StreamSplitAssigner.java  | 13 ++-
 .../oracle/table/OracleTableSourceFactory.java     | 34 +++++++-
 .../oracle/source/OracleSourceITCase.java          | 93 ++++++++++++++++++++++
 7 files changed, 180 insertions(+), 22 deletions(-)

diff --git a/docs/content.zh/docs/connectors/flink-sources/oracle-cdc.md b/docs/content.zh/docs/connectors/flink-sources/oracle-cdc.md
index 596e20c1d..2e5c1dd60 100644
--- a/docs/content.zh/docs/connectors/flink-sources/oracle-cdc.md
+++ b/docs/content.zh/docs/connectors/flink-sources/oracle-cdc.md
@@ -351,9 +351,16 @@ Connector Options
       <td style="word-wrap: break-word;">initial</td>
       <td>String</td>
       <td>Optional startup mode for Oracle CDC consumer, valid enumerations are "initial"
-           and "latest-offset".
+           , "latest-offset" and "specific-offset".
            Please see <a href="#startup-reading-position">Startup Reading Position</a> section for more detailed information.</td>
     </tr>
+    <tr>
+      <td>scan.startup.specific-offset.scn</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>Long</td>
+      <td>The SCN (system change number) to start reading the redo log from when the "specific-offset" startup mode is used.</td>
+    </tr>
     <tr>
           <td>scan.incremental.snapshot.enabled</td>
           <td>optional</td>
@@ -543,6 +550,7 @@ The config option `scan.startup.mode` specifies the startup mode for Oracle CDC
 - `initial` (default): Performs an initial snapshot on the monitored database tables upon first startup, and continues to read the latest redo log.
 - `latest-offset`: Never performs a snapshot on the monitored database tables upon first startup, just reads from
   the changes since the connector was started.
+- `specific-offset`: Skip the snapshot phase and start reading the redo log from a specific SCN.
 
 _Note: the mechanism of the `scan.startup.mode` option relies on Debezium's `snapshot.mode` configuration, so please do not use them together. If you specify both the `scan.startup.mode` and `debezium.snapshot.mode` options in the table DDL, `scan.startup.mode` may not work._
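
A concrete SCN for the new option can be read from the database beforehand, as the integration test added in this commit does. A minimal JDBC sketch (the connection URL and credentials are placeholders; querying V$DATABASE requires suitable privileges, e.g. a DBA user):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;

    public class CurrentScnQuery {
        public static void main(String[] args) throws Exception {
            // Placeholder connection details; replace with your Oracle instance.
            try (Connection connection =
                            DriverManager.getConnection(
                                    "jdbc:oracle:thin:@localhost:1521/ORCLCDB", "dbauser", "password");
                    Statement statement = connection.createStatement();
                    // V$DATABASE exposes the database's current system change number.
                    ResultSet rs = statement.executeQuery("SELECT CURRENT_SCN FROM V$DATABASE")) {
                if (rs.next()) {
                    // Feed this value into 'scan.startup.specific-offset.scn'.
                    System.out.println("Current SCN: " + rs.getLong("CURRENT_SCN"));
                }
            }
        }
    }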
 
diff --git a/docs/content/docs/connectors/flink-sources/oracle-cdc.md b/docs/content/docs/connectors/flink-sources/oracle-cdc.md
index 596c908b4..260e9635a 100644
--- a/docs/content/docs/connectors/flink-sources/oracle-cdc.md
+++ b/docs/content/docs/connectors/flink-sources/oracle-cdc.md
@@ -352,9 +352,16 @@ Connector Options
       <td style="word-wrap: break-word;">initial</td>
       <td>String</td>
       <td>Optional startup mode for Oracle CDC consumer, valid enumerations are "initial"
-           and "latest-offset".
+           , "latest-offset" and "specific-offset".
            Please see <a href="#startup-reading-position">Startup Reading Position</a> section for more detailed information.</td>
     </tr>
+    <tr>
+      <td>scan.startup.specific-offset.scn</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>Long</td>
+      <td>The SCN (system change number) to start reading the redo log from when the "specific-offset" startup mode is used.</td>
+    </tr>
     <tr>
           <td>scan.incremental.snapshot.enabled</td>
           <td>optional</td>
@@ -544,6 +551,7 @@ The config option `scan.startup.mode` specifies the startup mode for Oracle CDC
 - `initial` (default): Performs an initial snapshot on the monitored database tables upon first startup, and continues to read the latest redo log.
 - `latest-offset`: Never performs a snapshot on the monitored database tables upon first startup, just reads from
   the changes since the connector was started.
+- `specific-offset`: Skip the snapshot phase and start reading the redo log from a specific SCN.
 
 _Note: the mechanism of the `scan.startup.mode` option relies on Debezium's `snapshot.mode` configuration, so please do not use them together. If you specify both the `scan.startup.mode` and `debezium.snapshot.mode` options in the table DDL, `scan.startup.mode` may not work._
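
For illustration, a minimal Java/Table API sketch of the new startup mode, modeled on the integration test added by this commit; hostname, credentials, database, schema, and the SCN value are placeholders:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class SpecificOffsetExample {
        public static void main(String[] args) {
            TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
            tEnv.executeSql(
                    "CREATE TABLE customers ("
                            + " ID INT NOT NULL,"
                            + " NAME STRING,"
                            + " PRIMARY KEY (ID) NOT ENFORCED"
                            + ") WITH ("
                            + " 'connector' = 'oracle-cdc',"
                            + " 'hostname' = 'localhost',"
                            + " 'port' = '1521',"
                            + " 'username' = 'flinkuser',"
                            + " 'password' = 'flinkpw',"
                            + " 'database-name' = 'ORCLCDB',"
                            + " 'schema-name' = 'DEBEZIUM',"
                            + " 'table-name' = 'CUSTOMERS',"
                            // Skip the snapshot and resume the redo log at this SCN.
                            + " 'scan.startup.mode' = 'specific-offset',"
                            + " 'scan.startup.specific-offset.scn' = '2684895'"
                            + ")");
        }
    }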
 
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/config/JdbcSourceConfigFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/config/JdbcSourceConfigFactory.java
index 82034f488..1c21a8796 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/config/JdbcSourceConfigFactory.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/config/JdbcSourceConfigFactory.java
@@ -209,6 +209,7 @@ public abstract class JdbcSourceConfigFactory implements Factory<JdbcSourceConfi
             case INITIAL:
             case SNAPSHOT:
             case LATEST_OFFSET:
+            case SPECIFIC_OFFSETS:
             case COMMITTED_OFFSETS:
                 break;
             default:
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/options/StartupOptions.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/options/StartupOptions.java
index c397a8188..21fd23b45 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/options/StartupOptions.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/options/StartupOptions.java
@@ -17,7 +17,10 @@
 
 package org.apache.flink.cdc.connectors.base.options;
 
+import org.apache.flink.util.CollectionUtil;
+
 import java.io.Serializable;
+import java.util.Map;
 import java.util.Objects;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -30,13 +33,14 @@ public final class StartupOptions implements Serializable {
     public final String specificOffsetFile;
     public final Integer specificOffsetPos;
     public final Long startupTimestampMillis;
+    public final Map<String, String> offset;
 
     /**
      * Performs an initial snapshot on the monitored database tables upon first startup, and
      * continue to read the latest change log.
      */
     public static StartupOptions initial() {
-        return new StartupOptions(StartupMode.INITIAL, null, null, null);
+        return new StartupOptions(StartupMode.INITIAL, null, null, null, null);
     }
 
     /**
@@ -44,7 +48,7 @@ public final class StartupOptions implements Serializable {
      * read the change log anymore .
      */
     public static StartupOptions snapshot() {
-        return new StartupOptions(StartupMode.SNAPSHOT, null, null, null);
+        return new StartupOptions(StartupMode.SNAPSHOT, null, null, null, null);
     }
 
     /**
@@ -53,7 +57,7 @@ public final class StartupOptions implements Serializable {
      * change log is guaranteed to contain the entire history of the database.
      */
     public static StartupOptions earliest() {
-        return new StartupOptions(StartupMode.EARLIEST_OFFSET, null, null, null);
+        return new StartupOptions(StartupMode.EARLIEST_OFFSET, null, null, null, null);
     }
 
     /**
@@ -61,7 +65,7 @@ public final class StartupOptions implements Serializable {
      * the end of the change log which means only have the changes since the connector was started.
      */
     public static StartupOptions latest() {
-        return new StartupOptions(StartupMode.LATEST_OFFSET, null, null, null);
+        return new StartupOptions(StartupMode.LATEST_OFFSET, null, null, null, null);
     }
 
     /**
@@ -70,7 +74,7 @@ public final class StartupOptions implements Serializable {
      * the connector last stopped.
      */
     public static StartupOptions committed() {
-        return new StartupOptions(StartupMode.COMMITTED_OFFSETS, null, null, null);
+        return new StartupOptions(StartupMode.COMMITTED_OFFSETS, null, null, null, null);
     }
 
     /**
@@ -79,7 +83,11 @@ public final class StartupOptions implements Serializable {
      */
     public static StartupOptions specificOffset(String specificOffsetFile, int specificOffsetPos) {
         return new StartupOptions(
-                StartupMode.SPECIFIC_OFFSETS, specificOffsetFile, specificOffsetPos, null);
+                StartupMode.SPECIFIC_OFFSETS, specificOffsetFile, specificOffsetPos, null, null);
+    }
+
+    public static StartupOptions specificOffset(Map<String, String> offset) {
+        return new StartupOptions(StartupMode.SPECIFIC_OFFSETS, null, null, null, offset);
     }
 
     /**
@@ -92,18 +100,20 @@ public final class StartupOptions implements Serializable {
      * @param startupTimestampMillis timestamp for the startup offsets, as milliseconds from epoch.
      */
     public static StartupOptions timestamp(long startupTimestampMillis) {
-        return new StartupOptions(StartupMode.TIMESTAMP, null, null, startupTimestampMillis);
+        return new StartupOptions(StartupMode.TIMESTAMP, null, null, startupTimestampMillis, null);
     }
 
     private StartupOptions(
             StartupMode startupMode,
             String specificOffsetFile,
             Integer specificOffsetPos,
-            Long startupTimestampMillis) {
+            Long startupTimestampMillis,
+            Map<String, String> offset) {
         this.startupMode = startupMode;
         this.specificOffsetFile = specificOffsetFile;
         this.specificOffsetPos = specificOffsetPos;
         this.startupTimestampMillis = startupTimestampMillis;
+        this.offset = offset;
 
         switch (startupMode) {
             case INITIAL:
@@ -113,8 +123,10 @@ public final class StartupOptions implements Serializable {
             case COMMITTED_OFFSETS:
                 break;
             case SPECIFIC_OFFSETS:
-                checkNotNull(specificOffsetFile, "specificOffsetFile shouldn't be null");
-                checkNotNull(specificOffsetPos, "specificOffsetPos shouldn't be null");
+                if (CollectionUtil.isNullOrEmpty(offset)) {
+                    checkNotNull(specificOffsetFile, "specificOffsetFile shouldn't be null");
+                    checkNotNull(specificOffsetPos, "specificOffsetPos shouldn't be null");
+                }
                 break;
             case TIMESTAMP:
                 checkNotNull(startupTimestampMillis, "startupTimestampMillis shouldn't be null");
@@ -124,6 +136,10 @@ public final class StartupOptions implements Serializable {
         }
     }
 
+    public Map<String, String> getOffset() {
+        return offset;
+    }
+
     public boolean isStreamOnly() {
         return startupMode == StartupMode.EARLIEST_OFFSET
                 || startupMode == StartupMode.LATEST_OFFSET
@@ -148,12 +164,13 @@ public final class StartupOptions implements Serializable {
         return startupMode == that.startupMode
                 && Objects.equals(specificOffsetFile, that.specificOffsetFile)
                 && Objects.equals(specificOffsetPos, that.specificOffsetPos)
-                && Objects.equals(startupTimestampMillis, that.startupTimestampMillis);
+                && Objects.equals(startupTimestampMillis, that.startupTimestampMillis)
+                && Objects.equals(offset, that.offset);
     }
 
     @Override
     public int hashCode() {
         return Objects.hash(
-                startupMode, specificOffsetFile, specificOffsetPos, startupTimestampMillis);
+                startupMode, specificOffsetFile, specificOffsetPos, startupTimestampMillis, offset);
     }
 }
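
Since the constructor now accepts either the file/position pair or the generic offset map, both entry points remain valid. A minimal sketch of the two `specificOffset` overloads added/kept above (the "scn" key and all values are illustrative):

    import java.util.Collections;
    import java.util.Map;

    import org.apache.flink.cdc.connectors.base.options.StartupOptions;

    public class StartupOptionsExample {
        public static void main(String[] args) {
            // Map-based offset: the file/position null checks are skipped
            // because the map is non-empty.
            Map<String, String> offset = Collections.singletonMap("scn", "2684895");
            StartupOptions fromScn = StartupOptions.specificOffset(offset);

            // The existing file/position overload (e.g. a MySQL binlog position)
            // still enforces its null checks, since no map is supplied.
            StartupOptions fromFile = StartupOptions.specificOffset("mysql-bin.000003", 4);
        }
    }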
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/StreamSplitAssigner.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/StreamSplitAssigner.java
index 758538811..81860b2cd 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/StreamSplitAssigner.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/StreamSplitAssigner.java
@@ -30,6 +30,7 @@ import org.apache.flink.cdc.connectors.base.source.meta.split.FinishedSnapshotSp
 import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
 import org.apache.flink.cdc.connectors.base.source.meta.split.StreamSplit;
 import org.apache.flink.cdc.connectors.base.source.metrics.SourceEnumeratorMetrics;
+import org.apache.flink.util.CollectionUtil;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -182,10 +183,14 @@ public class StreamSplitAssigner implements SplitAssigner {
                         offsetFactory.createTimestampOffset(startupOptions.startupTimestampMillis);
                 break;
             case SPECIFIC_OFFSETS:
-                startingOffset =
-                        offsetFactory.newOffset(
-                                startupOptions.specificOffsetFile,
-                                startupOptions.specificOffsetPos.longValue());
+                if (CollectionUtil.isNullOrEmpty(startupOptions.getOffset())) {
+                    startingOffset =
+                            offsetFactory.newOffset(
+                                    startupOptions.specificOffsetFile,
+                                    startupOptions.specificOffsetPos.longValue());
+                } else {
+                    startingOffset = offsetFactory.newOffset(startupOptions.getOffset());
+                }
                 break;
             case COMMITTED_OFFSETS:
                 startingOffset = dialect.displayCommittedOffset(sourceConfig);
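
When the offset map is present, interpretation of its keys is left to the dialect's `OffsetFactory.newOffset(Map)` implementation. The Oracle-side factory is not part of this diff, so the following is a hypothetical, self-contained stand-in that only illustrates the shape of that translation (all names here are assumptions):

    import java.util.Collections;
    import java.util.Map;

    // Hypothetical sketch: not the connector's real OffsetFactory.
    public class ScnOffsetSketch {
        // Stand-in for the connector's Offset type.
        static class RedoLogOffsetLike {
            final long scn;
            RedoLogOffsetLike(long scn) {
                this.scn = scn;
            }
        }

        static RedoLogOffsetLike newOffset(Map<String, String> offset) {
            // The "scn" entry carries the starting system change number.
            return new RedoLogOffsetLike(Long.parseLong(offset.get("scn")));
        }

        public static void main(String[] args) {
            RedoLogOffsetLike o = newOffset(Collections.singletonMap("scn", "2684895"));
            System.out.println(o.scn); // prints 2684895
        }
    }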
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/table/OracleTableSourceFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/table/OracleTableSourceFactory.java
index 31df24ed8..9f3455749 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/table/OracleTableSourceFactory.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/table/OracleTableSourceFactory.java
@@ -30,7 +30,9 @@ import org.apache.flink.table.factories.DynamicTableSourceFactory;
 import org.apache.flink.table.factories.FactoryUtil;
 
 import java.time.Duration;
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Map;
 import java.util.Set;
 
 import static org.apache.flink.cdc.connectors.base.options.JdbcSourceOptions.CONNECTION_POOL_SIZE;
@@ -70,7 +72,9 @@ public class OracleTableSourceFactory implements DynamicTableSourceFactory {
     public DynamicTableSource createDynamicTableSource(Context context) {
         final FactoryUtil.TableFactoryHelper helper =
                 FactoryUtil.createTableFactoryHelper(this, context);
-        helper.validateExcept(DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX);
+        helper.validateExcept(
+                DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX,
+                SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS_PREFIX);
 
         final ReadableConfig config = helper.getOptions();
         String url = config.get(URL);
@@ -91,7 +95,9 @@ public class OracleTableSourceFactory implements DynamicTableSourceFactory {
         String tableName = config.get(TABLE_NAME);
         String schemaName = config.get(SCHEMA_NAME);
         int port = config.get(PORT);
-        StartupOptions startupOptions = getStartupOptions(config);
+
+        StartupOptions startupOptions =
+                getStartupOptions(config, context.getCatalogTable().getOptions());
         ResolvedSchema physicalSchema = context.getCatalogTable().getResolvedSchema();
 
         boolean enableParallelRead = config.get(SCAN_INCREMENTAL_SNAPSHOT_ENABLED);
@@ -195,8 +201,12 @@ public class OracleTableSourceFactory implements DynamicTableSourceFactory {
     private static final String SCAN_STARTUP_MODE_VALUE_INITIAL = "initial";
     private static final String SCAN_STARTUP_MODE_VALUE_SNAPSHOT = "snapshot";
     private static final String SCAN_STARTUP_MODE_VALUE_LATEST = "latest-offset";
+    private static final String SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS = "specific-offset";
+    private static final String SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS_PREFIX =
+            "scan.startup.specific-offset.";
 
-    private static StartupOptions getStartupOptions(ReadableConfig config) {
+    private static StartupOptions getStartupOptions(
+            ReadableConfig config, Map<String, String> options) {
         String modeString = config.get(SCAN_STARTUP_MODE);
 
         switch (modeString.toLowerCase()) {
@@ -206,7 +216,9 @@ public class OracleTableSourceFactory implements DynamicTableSourceFactory {
                 return StartupOptions.snapshot();
             case SCAN_STARTUP_MODE_VALUE_LATEST:
                 return StartupOptions.latest();
-
+            case SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS:
+                Map<String, String> offsetMap = getSpecificOffsetMap(options);
+                return StartupOptions.specificOffset(offsetMap);
             default:
                 throw new ValidationException(
                         String.format(
@@ -219,6 +231,20 @@ public class OracleTableSourceFactory implements DynamicTableSourceFactory {
         }
     }
 
+    private static Map<String, String> getSpecificOffsetMap(Map<String, String> options) {
+        Map<String, String> offset = new HashMap<>();
+        for (Map.Entry<String, String> entry : options.entrySet()) {
+            if (entry.getKey().startsWith(SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS_PREFIX)) {
+                String subKey =
+                        entry.getKey()
+                                .substring(
+                                        SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS_PREFIX.length());
+                offset.put(subKey, entry.getValue());
+            }
+        }
+        return offset;
+    }
+
     /** Checks the value of given integer option is valid. */
     private void validateIntegerOption(
             ConfigOption<Integer> option, int optionValue, int exclusiveMin) {
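
`getSpecificOffsetMap` collects every table option carrying the `scan.startup.specific-offset.` prefix and strips the prefix, so `scan.startup.specific-offset.scn` surfaces as the key "scn". A standalone sketch of the same transformation (class and variable names are illustrative, not the private method itself):

    import java.util.HashMap;
    import java.util.Map;

    public class SpecificOffsetMapDemo {
        private static final String PREFIX = "scan.startup.specific-offset.";

        public static void main(String[] args) {
            Map<String, String> tableOptions = new HashMap<>();
            tableOptions.put("connector", "oracle-cdc");
            tableOptions.put("scan.startup.mode", "specific-offset");
            tableOptions.put("scan.startup.specific-offset.scn", "2684895");

            Map<String, String> offset = new HashMap<>();
            for (Map.Entry<String, String> entry : tableOptions.entrySet()) {
                // Keep only prefixed keys, dropping the prefix itself.
                if (entry.getKey().startsWith(PREFIX)) {
                    offset.put(entry.getKey().substring(PREFIX.length()), entry.getValue());
                }
            }
            System.out.println(offset); // prints {scn=2684895}
        }
    }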
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OracleSourceITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OracleSourceITCase.java
index e61c433d4..f06dd187c 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OracleSourceITCase.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OracleSourceITCase.java
@@ -34,6 +34,7 @@ import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.catalog.UniqueConstraint;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.CloseableIterator;
 
@@ -46,6 +47,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.sql.Connection;
+import java.sql.ResultSet;
 import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -58,6 +60,7 @@ import java.util.stream.Collectors;
 
 import static java.lang.String.format;
 import static org.apache.flink.cdc.connectors.oracle.testutils.OracleTestUtils.triggerFailover;
+import static org.apache.flink.cdc.connectors.oracle.testutils.OracleTestUtils.waitForSinkSize;
 import static org.apache.flink.table.api.DataTypes.BIGINT;
 import static org.apache.flink.table.api.DataTypes.STRING;
 import static org.apache.flink.table.catalog.Column.physical;
@@ -395,6 +398,96 @@ public class OracleSourceITCase extends OracleSourceTestBase {
         // since `scan.incremental.snapshot.chunk.key-column` is set, an exception should not occur.
     }
 
+    @Test
+    public void testTableWithSpecificOffset() throws Exception {
+        createAndInitialize("customer.sql");
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+        env.setParallelism(1);
+        env.enableCheckpointing(5000L);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+
+        long currentScn = 0L;
+        try (Connection connection = getJdbcConnectionAsDBA();
+                Statement statement = connection.createStatement()) {
+
+            // get current scn
+            ResultSet rs = statement.executeQuery("SELECT CURRENT_SCN FROM V$DATABASE");
+            while (rs.next()) {
+                currentScn = rs.getLong("CURRENT_SCN");
+            }
+            LOG.info("Current Scn is {}", currentScn);
+        }
+
+        try (Connection connection = getJdbcConnection();
+                Statement statement = connection.createStatement()) {
+            // mock incremental data
+            statement.execute(
+                    String.format(
+                            "INSERT INTO %s.%s VALUES (9999, 'user_offset', 'Shanghai', '123567891234')",
+                            ORACLE_SCHEMA, "CUSTOMERS"));
+            LOG.info("mock incremental data success");
+        }
+
+        String sourceDDL =
+                format(
+                        "CREATE TABLE customers ("
+                                + " ID INT NOT NULL,"
+                                + " NAME STRING,"
+                                + " ADDRESS STRING,"
+                                + " PHONE_NUMBER STRING,"
+                                + " primary key (ID) not enforced"
+                                + ") WITH ("
+                                + " 'connector' = 'oracle-cdc',"
+                                + " 'hostname' = '%s',"
+                                + " 'port' = '%s',"
+                                + " 'username' = '%s',"
+                                + " 'password' = '%s',"
+                                + " 'database-name' = '%s',"
+                                + " 'schema-name' = '%s',"
+                                + " 'table-name' = 'CUSTOMERS',"
+                                + " 'scan.incremental.snapshot.enabled' = 'true',"
+                                + " 'debezium.log.mining.strategy' = 'online_catalog',"
+                                + " 'debezium.database.history.store.only.captured.tables.ddl' = 'true',"
+                                + " 'scan.startup.mode' = 'specific-offset',"
+                                + " 'scan.startup.specific-offset.scn' = '%s'"
+                                + ")",
+                        ORACLE_CONTAINER.getHost(),
+                        ORACLE_CONTAINER.getOraclePort(),
+                        ORACLE_CONTAINER.getUsername(),
+                        ORACLE_CONTAINER.getPassword(),
+                        ORACLE_DATABASE,
+                        ORACLE_SCHEMA,
+                        currentScn);
+
+        String sinkDDL =
+                "CREATE TABLE sink ("
+                        + " ID INT NOT NULL,"
+                        + " NAME STRING,"
+                        + " ADDRESS STRING,"
+                        + " PHONE_NUMBER STRING,"
+                        + " primary key (ID) not enforced"
+                        + ") WITH ("
+                        + " 'connector' = 'values',"
+                        + " 'sink-insert-only' = 'false',"
+                        + " 'sink-expected-messages-num' = '1'"
+                        + ")";
+
+        tEnv.executeSql(sourceDDL);
+        tEnv.executeSql(sinkDDL);
+        // async submit job
+        TableResult result =
+                tEnv.executeSql(
+                        "INSERT INTO sink SELECT ID, NAME, ADDRESS, PHONE_NUMBER FROM customers");
+        waitForSinkSize("sink", 1);
+
+        List<String> actual = TestValuesTableFactory.getResultsAsStrings("sink");
+
+        String[] expected = new String[] {"+I[9999, user_offset, Shanghai, 123567891234]"};
+        assertEqualsInAnyOrder(Arrays.asList(expected), actual);
+        result.getJobClient().get().cancel().get();
+    }
+
     private List<String> testBackfillWhenWritingEvents(
             boolean skipSnapshotBackfill,
             int fetchSize,
