This is an automated email from the ASF dual-hosted git repository.
kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new a41423e00 [FLINK-36631][source-connector][oracle] Add oracle read data
by specific offset (#3675)
a41423e00 is described below
commit a41423e0082ad5e9eac936e07f87c6c52f9b8b1a
Author: wudi <[email protected]>
AuthorDate: Sun Oct 5 19:40:59 2025 +0800
[FLINK-36631][source-connector][oracle] Add oracle read data by specific
offset (#3675)
---
.../docs/connectors/flink-sources/oracle-cdc.md | 10 ++-
.../docs/connectors/flink-sources/oracle-cdc.md | 10 ++-
.../base/config/JdbcSourceConfigFactory.java | 1 +
.../connectors/base/options/StartupOptions.java | 41 +++++++---
.../base/source/assigner/StreamSplitAssigner.java | 13 ++-
.../oracle/table/OracleTableSourceFactory.java | 34 +++++++-
.../oracle/source/OracleSourceITCase.java | 93 ++++++++++++++++++++++
7 files changed, 180 insertions(+), 22 deletions(-)
diff --git a/docs/content.zh/docs/connectors/flink-sources/oracle-cdc.md
b/docs/content.zh/docs/connectors/flink-sources/oracle-cdc.md
index 596e20c1d..2e5c1dd60 100644
--- a/docs/content.zh/docs/connectors/flink-sources/oracle-cdc.md
+++ b/docs/content.zh/docs/connectors/flink-sources/oracle-cdc.md
@@ -351,9 +351,16 @@ Connector Options
<td style="word-wrap: break-word;">initial</td>
<td>String</td>
<td>Optional startup mode for Oracle CDC consumer, valid enumerations
are "initial"
- and "latest-offset".
+ , "latest-offset" , specific-offset.
Please see <a href="#startup-reading-position">Startup Reading
Position</a> section for more detailed information.</td>
</tr>
+ <tr>
+ <td>scan.startup.specific-offset.scn</td>
+ <td>optional</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>Long</td>
+ <td>Optional SCN used in case of "specific-offset" startup mode</td>
+ </tr>
<tr>
<td>scan.incremental.snapshot.enabled</td>
<td>optional</td>
@@ -543,6 +550,7 @@ The config option `scan.startup.mode` specifies the startup
mode for Oracle CDC
- `initial` (default): Performs an initial snapshot on the monitored database
tables upon first startup, and continue to read the latest redo log.
- `latest-offset`: Never to perform a snapshot on the monitored database
tables upon first startup, just read from
the change since the connector was started.
+- `specific-offset`: Skip snapshot phase and start reading redo log from a
specific offset with scn.
_Note: the mechanism of `scan.startup.mode` option relying on Debezium's
`snapshot.mode` configuration. So please do not use them together. If you
specific both `scan.startup.mode` and `debezium.snapshot.mode` options in the
table DDL, it may make `scan.startup.mode` doesn't work._
diff --git a/docs/content/docs/connectors/flink-sources/oracle-cdc.md
b/docs/content/docs/connectors/flink-sources/oracle-cdc.md
index 596c908b4..260e9635a 100644
--- a/docs/content/docs/connectors/flink-sources/oracle-cdc.md
+++ b/docs/content/docs/connectors/flink-sources/oracle-cdc.md
@@ -352,9 +352,16 @@ Connector Options
<td style="word-wrap: break-word;">initial</td>
<td>String</td>
<td>Optional startup mode for Oracle CDC consumer, valid enumerations
are "initial"
- and "latest-offset".
+ , "latest-offset" , specific-offset.
Please see <a href="#startup-reading-position">Startup Reading
Position</a> section for more detailed information.</td>
</tr>
+ <tr>
+ <td>scan.startup.specific-offset.scn</td>
+ <td>optional</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>Long</td>
+ <td>Optional SCN used in case of "specific-offset" startup mode</td>
+ </tr>
<tr>
<td>scan.incremental.snapshot.enabled</td>
<td>optional</td>
@@ -544,6 +551,7 @@ The config option `scan.startup.mode` specifies the startup
mode for Oracle CDC
- `initial` (default): Performs an initial snapshot on the monitored database
tables upon first startup, and continue to read the latest redo log.
- `latest-offset`: Never to perform a snapshot on the monitored database
tables upon first startup, just read from
the change since the connector was started.
+- `specific-offset`: Skip snapshot phase and start reading redo log from a
specific offset with scn.
_Note: the mechanism of `scan.startup.mode` option relying on Debezium's
`snapshot.mode` configuration. So please do not use them together. If you
specific both `scan.startup.mode` and `debezium.snapshot.mode` options in the
table DDL, it may make `scan.startup.mode` doesn't work._
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/config/JdbcSourceConfigFactory.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/config/JdbcSourceConfigFactory.java
index 82034f488..1c21a8796 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/config/JdbcSourceConfigFactory.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/config/JdbcSourceConfigFactory.java
@@ -209,6 +209,7 @@ public abstract class JdbcSourceConfigFactory implements
Factory<JdbcSourceConfi
case INITIAL:
case SNAPSHOT:
case LATEST_OFFSET:
+ case SPECIFIC_OFFSETS:
case COMMITTED_OFFSETS:
break;
default:
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/options/StartupOptions.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/options/StartupOptions.java
index c397a8188..21fd23b45 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/options/StartupOptions.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/options/StartupOptions.java
@@ -17,7 +17,10 @@
package org.apache.flink.cdc.connectors.base.options;
+import org.apache.flink.util.CollectionUtil;
+
import java.io.Serializable;
+import java.util.Map;
import java.util.Objects;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -30,13 +33,14 @@ public final class StartupOptions implements Serializable {
public final String specificOffsetFile;
public final Integer specificOffsetPos;
public final Long startupTimestampMillis;
+ public final Map<String, String> offset;
/**
* Performs an initial snapshot on the monitored database tables upon
first startup, and
* continue to read the latest change log.
*/
public static StartupOptions initial() {
- return new StartupOptions(StartupMode.INITIAL, null, null, null);
+ return new StartupOptions(StartupMode.INITIAL, null, null, null, null);
}
/**
@@ -44,7 +48,7 @@ public final class StartupOptions implements Serializable {
* read the change log anymore .
*/
public static StartupOptions snapshot() {
- return new StartupOptions(StartupMode.SNAPSHOT, null, null, null);
+ return new StartupOptions(StartupMode.SNAPSHOT, null, null, null,
null);
}
/**
@@ -53,7 +57,7 @@ public final class StartupOptions implements Serializable {
* change log is guaranteed to contain the entire history of the database.
*/
public static StartupOptions earliest() {
- return new StartupOptions(StartupMode.EARLIEST_OFFSET, null, null,
null);
+ return new StartupOptions(StartupMode.EARLIEST_OFFSET, null, null,
null, null);
}
/**
@@ -61,7 +65,7 @@ public final class StartupOptions implements Serializable {
* the end of the change log which means only have the changes since the
connector was started.
*/
public static StartupOptions latest() {
- return new StartupOptions(StartupMode.LATEST_OFFSET, null, null, null);
+ return new StartupOptions(StartupMode.LATEST_OFFSET, null, null, null,
null);
}
/**
@@ -70,7 +74,7 @@ public final class StartupOptions implements Serializable {
* the connector last stopped.
*/
public static StartupOptions committed() {
- return new StartupOptions(StartupMode.COMMITTED_OFFSETS, null, null,
null);
+ return new StartupOptions(StartupMode.COMMITTED_OFFSETS, null, null,
null, null);
}
/**
@@ -79,7 +83,11 @@ public final class StartupOptions implements Serializable {
*/
public static StartupOptions specificOffset(String specificOffsetFile, int
specificOffsetPos) {
return new StartupOptions(
- StartupMode.SPECIFIC_OFFSETS, specificOffsetFile,
specificOffsetPos, null);
+ StartupMode.SPECIFIC_OFFSETS, specificOffsetFile,
specificOffsetPos, null, null);
+ }
+
+ public static StartupOptions specificOffset(Map<String, String> offset) {
+ return new StartupOptions(StartupMode.SPECIFIC_OFFSETS, null, null,
null, offset);
}
/**
@@ -92,18 +100,20 @@ public final class StartupOptions implements Serializable {
* @param startupTimestampMillis timestamp for the startup offsets, as
milliseconds from epoch.
*/
public static StartupOptions timestamp(long startupTimestampMillis) {
- return new StartupOptions(StartupMode.TIMESTAMP, null, null,
startupTimestampMillis);
+ return new StartupOptions(StartupMode.TIMESTAMP, null, null,
startupTimestampMillis, null);
}
private StartupOptions(
StartupMode startupMode,
String specificOffsetFile,
Integer specificOffsetPos,
- Long startupTimestampMillis) {
+ Long startupTimestampMillis,
+ Map<String, String> offset) {
this.startupMode = startupMode;
this.specificOffsetFile = specificOffsetFile;
this.specificOffsetPos = specificOffsetPos;
this.startupTimestampMillis = startupTimestampMillis;
+ this.offset = offset;
switch (startupMode) {
case INITIAL:
@@ -113,8 +123,10 @@ public final class StartupOptions implements Serializable {
case COMMITTED_OFFSETS:
break;
case SPECIFIC_OFFSETS:
- checkNotNull(specificOffsetFile, "specificOffsetFile shouldn't
be null");
- checkNotNull(specificOffsetPos, "specificOffsetPos shouldn't
be null");
+ if (CollectionUtil.isNullOrEmpty(offset)) {
+ checkNotNull(specificOffsetFile, "specificOffsetFile
shouldn't be null");
+ checkNotNull(specificOffsetPos, "specificOffsetPos
shouldn't be null");
+ }
break;
case TIMESTAMP:
checkNotNull(startupTimestampMillis, "startupTimestampMillis
shouldn't be null");
@@ -124,6 +136,10 @@ public final class StartupOptions implements Serializable {
}
}
+ public Map<String, String> getOffset() {
+ return offset;
+ }
+
public boolean isStreamOnly() {
return startupMode == StartupMode.EARLIEST_OFFSET
|| startupMode == StartupMode.LATEST_OFFSET
@@ -148,12 +164,13 @@ public final class StartupOptions implements Serializable
{
return startupMode == that.startupMode
&& Objects.equals(specificOffsetFile, that.specificOffsetFile)
&& Objects.equals(specificOffsetPos, that.specificOffsetPos)
- && Objects.equals(startupTimestampMillis,
that.startupTimestampMillis);
+ && Objects.equals(startupTimestampMillis,
that.startupTimestampMillis)
+ && Objects.equals(offset, that.offset);
}
@Override
public int hashCode() {
return Objects.hash(
- startupMode, specificOffsetFile, specificOffsetPos,
startupTimestampMillis);
+ startupMode, specificOffsetFile, specificOffsetPos,
startupTimestampMillis, offset);
}
}
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/StreamSplitAssigner.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/StreamSplitAssigner.java
index 758538811..81860b2cd 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/StreamSplitAssigner.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/StreamSplitAssigner.java
@@ -30,6 +30,7 @@ import
org.apache.flink.cdc.connectors.base.source.meta.split.FinishedSnapshotSp
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
import org.apache.flink.cdc.connectors.base.source.meta.split.StreamSplit;
import
org.apache.flink.cdc.connectors.base.source.metrics.SourceEnumeratorMetrics;
+import org.apache.flink.util.CollectionUtil;
import java.io.IOException;
import java.util.ArrayList;
@@ -182,10 +183,14 @@ public class StreamSplitAssigner implements SplitAssigner
{
offsetFactory.createTimestampOffset(startupOptions.startupTimestampMillis);
break;
case SPECIFIC_OFFSETS:
- startingOffset =
- offsetFactory.newOffset(
- startupOptions.specificOffsetFile,
- startupOptions.specificOffsetPos.longValue());
+ if (CollectionUtil.isNullOrEmpty(startupOptions.getOffset())) {
+ startingOffset =
+ offsetFactory.newOffset(
+ startupOptions.specificOffsetFile,
+
startupOptions.specificOffsetPos.longValue());
+ } else {
+ startingOffset =
offsetFactory.newOffset(startupOptions.getOffset());
+ }
break;
case COMMITTED_OFFSETS:
startingOffset = dialect.displayCommittedOffset(sourceConfig);
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/table/OracleTableSourceFactory.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/table/OracleTableSourceFactory.java
index 31df24ed8..9f3455749 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/table/OracleTableSourceFactory.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/table/OracleTableSourceFactory.java
@@ -30,7 +30,9 @@ import
org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
import java.time.Duration;
+import java.util.HashMap;
import java.util.HashSet;
+import java.util.Map;
import java.util.Set;
import static
org.apache.flink.cdc.connectors.base.options.JdbcSourceOptions.CONNECTION_POOL_SIZE;
@@ -70,7 +72,9 @@ public class OracleTableSourceFactory implements
DynamicTableSourceFactory {
public DynamicTableSource createDynamicTableSource(Context context) {
final FactoryUtil.TableFactoryHelper helper =
FactoryUtil.createTableFactoryHelper(this, context);
- helper.validateExcept(DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX);
+ helper.validateExcept(
+ DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX,
+ SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS_PREFIX);
final ReadableConfig config = helper.getOptions();
String url = config.get(URL);
@@ -91,7 +95,9 @@ public class OracleTableSourceFactory implements
DynamicTableSourceFactory {
String tableName = config.get(TABLE_NAME);
String schemaName = config.get(SCHEMA_NAME);
int port = config.get(PORT);
- StartupOptions startupOptions = getStartupOptions(config);
+
+ StartupOptions startupOptions =
+ getStartupOptions(config,
context.getCatalogTable().getOptions());
ResolvedSchema physicalSchema =
context.getCatalogTable().getResolvedSchema();
boolean enableParallelRead =
config.get(SCAN_INCREMENTAL_SNAPSHOT_ENABLED);
@@ -195,8 +201,12 @@ public class OracleTableSourceFactory implements
DynamicTableSourceFactory {
private static final String SCAN_STARTUP_MODE_VALUE_INITIAL = "initial";
private static final String SCAN_STARTUP_MODE_VALUE_SNAPSHOT = "snapshot";
private static final String SCAN_STARTUP_MODE_VALUE_LATEST =
"latest-offset";
+ private static final String SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS =
"specific-offset";
+ private static final String
SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS_PREFIX =
+ "scan.startup.specific-offset.";
- private static StartupOptions getStartupOptions(ReadableConfig config) {
+ private static StartupOptions getStartupOptions(
+ ReadableConfig config, Map<String, String> options) {
String modeString = config.get(SCAN_STARTUP_MODE);
switch (modeString.toLowerCase()) {
@@ -206,7 +216,9 @@ public class OracleTableSourceFactory implements
DynamicTableSourceFactory {
return StartupOptions.snapshot();
case SCAN_STARTUP_MODE_VALUE_LATEST:
return StartupOptions.latest();
-
+ case SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS:
+ Map<String, String> offsetMap = getSpecificOffsetMap(options);
+ return StartupOptions.specificOffset(offsetMap);
default:
throw new ValidationException(
String.format(
@@ -219,6 +231,20 @@ public class OracleTableSourceFactory implements
DynamicTableSourceFactory {
}
}
+ private static Map<String, String> getSpecificOffsetMap(Map<String,
String> options) {
+ Map<String, String> offset = new HashMap<>();
+ for (Map.Entry<String, String> entry : options.entrySet()) {
+ if
(entry.getKey().startsWith(SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS_PREFIX)) {
+ String subKey =
+ entry.getKey()
+ .substring(
+
SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS_PREFIX.length());
+ offset.put(subKey, entry.getValue());
+ }
+ }
+ return offset;
+ }
+
/** Checks the value of given integer option is valid. */
private void validateIntegerOption(
ConfigOption<Integer> option, int optionValue, int exclusiveMin) {
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OracleSourceITCase.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OracleSourceITCase.java
index e61c433d4..f06dd187c 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OracleSourceITCase.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OracleSourceITCase.java
@@ -34,6 +34,7 @@ import
org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.UniqueConstraint;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
@@ -46,6 +47,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
+import java.sql.ResultSet;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
@@ -58,6 +60,7 @@ import java.util.stream.Collectors;
import static java.lang.String.format;
import static
org.apache.flink.cdc.connectors.oracle.testutils.OracleTestUtils.triggerFailover;
+import static
org.apache.flink.cdc.connectors.oracle.testutils.OracleTestUtils.waitForSinkSize;
import static org.apache.flink.table.api.DataTypes.BIGINT;
import static org.apache.flink.table.api.DataTypes.STRING;
import static org.apache.flink.table.catalog.Column.physical;
@@ -395,6 +398,96 @@ public class OracleSourceITCase extends
OracleSourceTestBase {
// since `scan.incremental.snapshot.chunk.key-column` is set, an
exception should not occur.
}
+ @Test
+ public void testTableWithSpecificOffset() throws Exception {
+ createAndInitialize("customer.sql");
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+ env.setParallelism(1);
+ env.enableCheckpointing(5000L);
+ env.setRestartStrategy(RestartStrategies.noRestart());
+
+ long currentScn = 0L;
+ try (Connection connection = getJdbcConnectionAsDBA();
+ Statement statement = connection.createStatement()) {
+
+ // get current scn
+ ResultSet rs = statement.executeQuery("SELECT CURRENT_SCN FROM
V$DATABASE");
+ while (rs.next()) {
+ currentScn = rs.getLong("CURRENT_SCN");
+ }
+ LOG.info("Current Scn is {}", currentScn);
+ }
+
+ try (Connection connection = getJdbcConnection();
+ Statement statement = connection.createStatement()) {
+ // mock incremental data
+ statement.execute(
+ String.format(
+ "INSERT INTO %s.%s VALUES (9999, 'user_offset',
'Shanghai', '123567891234')",
+ ORACLE_SCHEMA, "CUSTOMERS"));
+ LOG.info("mock incremental data success");
+ }
+
+ String sourceDDL =
+ format(
+ "CREATE TABLE customers ("
+ + " ID INT NOT NULL,"
+ + " NAME STRING,"
+ + " ADDRESS STRING,"
+ + " PHONE_NUMBER STRING,"
+ + " primary key (ID) not enforced"
+ + ") WITH ("
+ + " 'connector' = 'oracle-cdc',"
+ + " 'hostname' = '%s',"
+ + " 'port' = '%s',"
+ + " 'username' = '%s',"
+ + " 'password' = '%s',"
+ + " 'database-name' = '%s',"
+ + " 'schema-name' = '%s',"
+ + " 'table-name' = 'CUSTOMERS',"
+ + " 'scan.incremental.snapshot.enabled' =
'true',"
+ + " 'debezium.log.mining.strategy' =
'online_catalog',"
+ + "
'debezium.database.history.store.only.captured.tables.ddl' = 'true',"
+ + " 'scan.startup.mode' = 'specific-offset',"
+ + " 'scan.startup.specific-offset.scn' = '%s'"
+ + ")",
+ ORACLE_CONTAINER.getHost(),
+ ORACLE_CONTAINER.getOraclePort(),
+ ORACLE_CONTAINER.getUsername(),
+ ORACLE_CONTAINER.getPassword(),
+ ORACLE_DATABASE,
+ ORACLE_SCHEMA,
+ currentScn);
+
+ String sinkDDL =
+ "CREATE TABLE sink ("
+ + " ID INT NOT NULL,"
+ + " NAME STRING,"
+ + " ADDRESS STRING,"
+ + " PHONE_NUMBER STRING,"
+ + " primary key (ID) not enforced"
+ + ") WITH ("
+ + " 'connector' = 'values',"
+ + " 'sink-insert-only' = 'false',"
+ + " 'sink-expected-messages-num' = '1'"
+ + ")";
+
+ tEnv.executeSql(sourceDDL);
+ tEnv.executeSql(sinkDDL);
+ // async submit job
+ TableResult result =
+ tEnv.executeSql(
+ "INSERT INTO sink SELECT ID,NAME, ADDRESS,PHONE_NUMBER
FROM customers");
+ waitForSinkSize("sink", 1);
+
+ List<String> actual =
TestValuesTableFactory.getResultsAsStrings("sink");
+
+ String[] expected = new String[] {"+I[9999, user_offset, Shanghai,
123567891234]"};
+ assertEqualsInAnyOrder(Arrays.asList(expected), actual);
+ result.getJobClient().get().cancel().get();
+ }
+
private List<String> testBackfillWhenWritingEvents(
boolean skipSnapshotBackfill,
int fetchSize,