This is an automated email from the ASF dual-hosted git repository.
JNSimba pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 7c4dfe9f289 [improve](streaming-job) support user-specified mysql
server_id with per-reader assignment (#63490)
7c4dfe9f289 is described below
commit 7c4dfe9f2892284792c5df61994ea82e3e3add97
Author: wudi <[email protected]>
AuthorDate: Thu May 28 15:11:02 2026 +0800
[improve](streaming-job) support user-specified mysql server_id with
per-reader assignment (#63490)
## Summary
- Add an optional `server_id` source property for MySQL CDC streaming
jobs. Accepts a single value (e.g. `5400`) or a range (e.g.
`5400-5408`). When unset, the value is derived from the jobId hash so
existing jobs keep their current server_id when `snapshot_parallelism =
1`.
- Fix a latent collision: when `snapshot_parallelism > 1` and
source-side DML happens during snapshot, all parallel
`SnapshotSplitReader` instances previously shared the same server_id and
their backfill BinaryLogClient connections kicked each other out of
MySQL's dump-thread slot, dropping binlog events between low and high
watermark. Each subtask now gets a distinct server_id from the resolved
range; the single binlog reader uses the range start.
- Cross-field check: reject `server_id` range width smaller than
`snapshot_parallelism` at job startup with a clear fix-it suggestion.
---
.../apache/doris/job/cdc/DataSourceConfigKeys.java | 2 +
.../streaming/DataSourceConfigValidator.java | 88 +++++++++++++-
.../CdcStreamTableValuedFunction.java | 6 +
.../streaming/DataSourceConfigValidatorTest.java | 122 +++++++++++++++++++
.../source/reader/mysql/MySqlSourceReader.java | 46 ++++---
.../apache/doris/cdcclient/utils/ConfigUtil.java | 29 ++++-
.../doris/cdcclient/utils/ConfigUtilTest.java | 61 ++++++++--
.../cdc/test_streaming_mysql_job_server_id.groovy | 133 +++++++++++++++++++++
8 files changed, 456 insertions(+), 31 deletions(-)
diff --git
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java
index 72322da2668..3708e8dc6a3 100644
---
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java
+++
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java
@@ -38,6 +38,8 @@ public class DataSourceConfigKeys {
public static final String SNAPSHOT_SPLIT_KEY = "snapshot_split_key";
public static final String SNAPSHOT_PARALLELISM = "snapshot_parallelism";
public static final String SNAPSHOT_PARALLELISM_DEFAULT = "1";
+ // MySQL CDC client identity. Single value "5400" or range "5400-5408".
+ public static final String SERVER_ID = "server_id";
public static final String SSL_MODE = "ssl_mode";
public static final String SSL_ROOTCERT = "ssl_rootcert";
// PG-style spelling; MySQL normalizes to underscore form.
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java
index 1b633605d71..4ca1e605ef5 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java
@@ -53,7 +53,8 @@ public class DataSourceConfigValidator {
DataSourceConfigKeys.SSL_MODE,
DataSourceConfigKeys.SSL_ROOTCERT,
DataSourceConfigKeys.SLOT_NAME,
- DataSourceConfigKeys.PUBLICATION_NAME
+ DataSourceConfigKeys.PUBLICATION_NAME,
+ DataSourceConfigKeys.SERVER_ID
);
private static final Set<String> ALLOW_SSL_MODES = Sets.newHashSet(
@@ -122,6 +123,45 @@ public class DataSourceConfigValidator {
"ssl_mode '" + DataSourceConfigKeys.SSL_MODE_VERIFY_CA
+ "' requires ssl_rootcert to be set");
}
+
+ validateServerIdConfig(input);
+ }
+
+    /**
+     * Validates the optional {@code server_id} property together with
+     * {@code snapshot_parallelism}.
+     *
+     * <p>Shared by validateSource and the cdc_stream TVF entrypoint so both reject a malformed
+     * server_id at SQL-analysis time, not as a cdc_client runtime error.
+     *
+     * @param input source properties; when the {@code server_id} key is absent nothing is checked
+     * @throws IllegalArgumentException if server_id is malformed, snapshot_parallelism is not a
+     *         positive integer, or the range holds fewer ids than snapshot_parallelism
+     */
+    public static void validateServerIdConfig(Map<String, String> input)
+            throws IllegalArgumentException {
+        String serverIdValue = input.get(DataSourceConfigKeys.SERVER_ID);
+        if (serverIdValue == null) {
+            return;
+        }
+        int[] range = parseServerIdRange(serverIdValue);
+        if (range == null) {
+            throw new IllegalArgumentException(
+                    "Invalid value for key '" + DataSourceConfigKeys.SERVER_ID + "': "
+                            + serverIdValue
+                            + ". Expected a single value (e.g. '5400') or range (e.g. '5400-5408')"
+                            + " with start >= 1 and start <= end.");
+        }
+        String parallelismValue = input.getOrDefault(
+                DataSourceConfigKeys.SNAPSHOT_PARALLELISM,
+                DataSourceConfigKeys.SNAPSHOT_PARALLELISM_DEFAULT);
+        Integer parallelism = parsePositiveInt(parallelismValue);
+        if (parallelism == null) {
+            throw new IllegalArgumentException(
+                    "Invalid value for key '" + DataSourceConfigKeys.SNAPSHOT_PARALLELISM
+                            + "': " + parallelismValue + ". Expected a positive integer.");
+        }
+        // parseServerIdRange guarantees 1 <= start <= end, so this cannot overflow int.
+        int width = range[1] - range[0] + 1;
+        // Range must cover every parallel SnapshotSplitReader; cdc_client throws otherwise.
+        if (width < parallelism) {
+            // Build the suggested range in long arithmetic: start + parallelism - 1 can exceed
+            // Integer.MAX_VALUE when the configured start is close to the int limit. In that
+            // case slide the suggestion down so that it ends at Integer.MAX_VALUE.
+            long suggestedStart = range[0];
+            long suggestedEnd = suggestedStart + parallelism - 1;
+            if (suggestedEnd > Integer.MAX_VALUE) {
+                suggestedEnd = Integer.MAX_VALUE;
+                suggestedStart = suggestedEnd - parallelism + 1;
+            }
+            throw new IllegalArgumentException(
+                    "server_id range size " + width
+                            + " must be >= snapshot_parallelism " + parallelism
+                            + ". Widen the range (e.g. '" + suggestedStart + "-"
+                            + suggestedEnd
+                            + "') or reduce parallelism.");
+        }
+    }
public static void validateTarget(Map<String, String> input) throws
IllegalArgumentException {
@@ -168,6 +208,9 @@ public class DataSourceConfigValidator {
|| key.equals(DataSourceConfigKeys.SNAPSHOT_PARALLELISM)) {
return isPositiveInt(value);
}
+ if (key.equals(DataSourceConfigKeys.SERVER_ID)) {
+ return parseServerIdRange(value) != null;
+ }
return true;
}
@@ -195,6 +238,49 @@ public class DataSourceConfigValidator {
return ALLOW_SSL_MODES.contains(value);
}
+    // Turns "5400" or "5400-5408" into an inclusive {start, end} pair; any malformed input
+    // yields null. The start must be >= 1 because MySQL server_id=0 disables replication.
+    static int[] parseServerIdRange(String value) {
+        String text = value == null ? "" : value.trim();
+        if (text.isEmpty()) {
+            return null;
+        }
+        // Without a dash both bounds are read from the same text (single-value form).
+        int sep = text.indexOf('-');
+        String lower = sep < 0 ? text : text.substring(0, sep).trim();
+        String upper = sep < 0 ? text : text.substring(sep + 1).trim();
+        if (lower.isEmpty() || upper.isEmpty()) {
+            return null;
+        }
+        int first;
+        int last;
+        try {
+            first = Integer.parseInt(lower);
+            last = Integer.parseInt(upper);
+        } catch (NumberFormatException e) {
+            return null;
+        }
+        boolean ordered = first >= 1 && first <= last;
+        return ordered ? new int[] {first, last} : null;
+    }
+
+    /**
+     * Parses a positive integer.
+     *
+     * @param value text to parse; surrounding whitespace is ignored
+     * @return the parsed value, or {@code null} when the text is null, is not an integer, or is
+     *         less than 1
+     */
+    private static Integer parsePositiveInt(String value) {
+        // A property map may hold an explicit null for the key; report it as invalid instead
+        // of failing with a NullPointerException on trim().
+        if (value == null) {
+            return null;
+        }
+        try {
+            int n = Integer.parseInt(value.trim());
+            return n >= 1 ? n : null;
+        } catch (NumberFormatException e) {
+            return null;
+        }
+    }
+
/**
* Check if the offset value is valid for the given data source type.
* Supported: initial, snapshot, latest, JSON binlog/lsn position.
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/CdcStreamTableValuedFunction.java
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/CdcStreamTableValuedFunction.java
index 885a3ec5a33..aa60924601c 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/CdcStreamTableValuedFunction.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/CdcStreamTableValuedFunction.java
@@ -156,6 +156,12 @@ public class CdcStreamTableValuedFunction extends
ExternalFileTableValuedFunctio
}
validatePositiveIntIfPresent(properties,
DataSourceConfigKeys.SNAPSHOT_SPLIT_SIZE);
validatePositiveIntIfPresent(properties,
DataSourceConfigKeys.SNAPSHOT_PARALLELISM);
+ // TVF entrypoint shares server_id checks with the from-to path's
validateSource.
+ try {
+ DataSourceConfigValidator.validateServerIdConfig(properties);
+ } catch (IllegalArgumentException e) {
+ throw new AnalysisException(e.getMessage());
+ }
}
private static void validatePositiveIntIfPresent(Map<String, String>
properties, String key)
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidatorTest.java
b/fe/fe-core/src/test/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidatorTest.java
index 769517d9f65..84cc3bdf5e0 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidatorTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidatorTest.java
@@ -276,4 +276,126 @@ public class DataSourceConfigValidatorTest {
assertReject(props);
}
}
+
+ // ─── server_id
────────────────────────────────────────────────────────────
+
+ private static Map<String, String> serverIdInput(String value) {
+ Map<String, String> input = new HashMap<>();
+ input.put(DataSourceConfigKeys.SERVER_ID, value);
+ return input;
+ }
+
+ @Test
+ public void testServerIdAcceptsSingleValue() {
+ DataSourceConfigValidator.validateSource(serverIdInput("5400"),
DataSourceType.MYSQL.name());
+ }
+
+ @Test
+ public void testServerIdAcceptsRange() {
+ DataSourceConfigValidator.validateSource(
+ serverIdInput("5400-5408"), DataSourceType.MYSQL.name());
+ }
+
+ @Test
+ public void testServerIdAcceptsRangeWithSpaces() {
+ DataSourceConfigValidator.validateSource(
+ serverIdInput(" 5400 - 5408 "), DataSourceType.MYSQL.name());
+ }
+
+ @Test
+ public void testServerIdRejectsMalformed() {
+ // Covers parseServerIdRange's reject paths: blank text, an empty bound, non-numeric text.
+ String[] invalids = {
+ "abc", // not numeric
+ "5400-", // missing end
+ "-5408", // missing start
+ "5400--", // end part is "-", which is not a number
+ "5400-abc", // non-numeric end
+ " ", // blank
+ "" // empty
+ };
+ for (String invalid : invalids) {
+ try {
+ DataSourceConfigValidator.validateSource(
+ serverIdInput(invalid), DataSourceType.MYSQL.name());
+ Assert.fail("Expected IllegalArgumentException for
server_id='" + invalid + "'");
+ } catch (IllegalArgumentException expected) {
+ Assert.assertTrue(
+ "Error message should reference server_id, got: " +
expected.getMessage(),
+ expected.getMessage().contains("server_id"));
+ }
+ }
+ }
+
+ @Test
+ public void testServerIdRejectsZero() {
+ try {
+ DataSourceConfigValidator.validateSource(serverIdInput("0"),
DataSourceType.MYSQL.name());
+ Assert.fail("Expected IllegalArgumentException for server_id='0'");
+ } catch (IllegalArgumentException expected) {
+ Assert.assertTrue(expected.getMessage().contains("server_id"));
+ }
+ }
+
+ @Test
+ public void testServerIdRejectsBackwardRange() {
+ try {
+ DataSourceConfigValidator.validateSource(
+ serverIdInput("5408-5400"), DataSourceType.MYSQL.name());
+ Assert.fail("Expected IllegalArgumentException for
server_id='5408-5400'");
+ } catch (IllegalArgumentException expected) {
+ Assert.assertTrue(expected.getMessage().contains("server_id"));
+ }
+ }
+
+ @Test
+ public void testServerIdRejectsNegative() {
+ try {
+ DataSourceConfigValidator.validateSource(serverIdInput("-5"),
DataSourceType.MYSQL.name());
+ Assert.fail("Expected IllegalArgumentException for
server_id='-5'");
+ } catch (IllegalArgumentException expected) {
+ Assert.assertTrue(expected.getMessage().contains("server_id"));
+ }
+ }
+
+ @Test
+ public void testServerIdCrossFieldWidthRejected() {
+ Map<String, String> props = new HashMap<>();
+ props.put(DataSourceConfigKeys.SERVER_ID, "5400-5402");
+ props.put(DataSourceConfigKeys.SNAPSHOT_PARALLELISM, "8");
+ try {
+ DataSourceConfigValidator.validateSource(props,
DataSourceType.MYSQL.name());
+ Assert.fail("Expected IllegalArgumentException for range size 3 <
parallelism 8");
+ } catch (IllegalArgumentException expected) {
+ String msg = expected.getMessage();
+ Assert.assertTrue("Message should reference snapshot_parallelism:
" + msg,
+ msg.contains("snapshot_parallelism"));
+ Assert.assertTrue("Message should reference server_id: " + msg,
+ msg.contains("server_id"));
+ }
+ }
+
+ @Test
+ public void testServerIdCrossFieldWidthSinglePassesWhenParallelismOne() {
+ Map<String, String> props = new HashMap<>();
+ props.put(DataSourceConfigKeys.SERVER_ID, "5400");
+ props.put(DataSourceConfigKeys.SNAPSHOT_PARALLELISM, "1");
+ DataSourceConfigValidator.validateSource(props,
DataSourceType.MYSQL.name());
+ }
+
+ @Test
+ public void testServerIdCrossFieldUsesDefaultParallelism() {
+ // No snapshot_parallelism set -> default ("1"). Single-value
server_id passes.
+ Map<String, String> props = new HashMap<>();
+ props.put(DataSourceConfigKeys.SERVER_ID, "5400");
+ DataSourceConfigValidator.validateSource(props,
DataSourceType.MYSQL.name());
+ }
+
+ @Test
+ public void testServerIdOptional() {
+ // server_id is optional; absence must not trip cross-field check.
+ Map<String, String> props = new HashMap<>();
+ props.put(DataSourceConfigKeys.SNAPSHOT_PARALLELISM, "4");
+ DataSourceConfigValidator.validateSource(props,
DataSourceType.MYSQL.name());
+ }
}
diff --git
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
index 99ac1e0355b..e5115d1c51a 100644
---
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
+++
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
@@ -49,6 +49,7 @@ import
org.apache.flink.cdc.connectors.mysql.source.assigners.MySqlChunkSplitter
import
org.apache.flink.cdc.connectors.mysql.source.assigners.state.ChunkSplitterState;
import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig;
import
org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory;
+import org.apache.flink.cdc.connectors.mysql.source.config.ServerIdRange;
import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset;
import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffsetKind;
import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffsetUtils;
@@ -143,7 +144,6 @@ public class MySqlSourceReader extends
AbstractCdcSourceReader {
public void initialize(String jobId, DataSource dataSource, Map<String,
String> config) {
this.serializer.init(config);
- // Initialize thread pool for parallel polling
int parallelism =
Integer.parseInt(
config.getOrDefault(
@@ -815,39 +815,49 @@ public class MySqlSourceReader extends
AbstractCdcSourceReader {
}
private SnapshotSplitReader getSnapshotSplitReader(JobBaseConfig config,
int subtaskId) {
- MySqlSourceConfig sourceConfig = getSourceConfig(config);
+ MySqlSourceConfig sourceConfig = getSourceConfig(config, subtaskId);
+ LOG.info(
+ "MySQL CDC snapshot reader[{}] for job {} using server_id={}",
+ subtaskId,
+ config.getJobId(),
+ sourceConfig.getServerIdRange().getServerId(subtaskId));
final MySqlConnection jdbcConnection =
DebeziumUtils.createMySqlConnection(sourceConfig);
final BinaryLogClient binaryLogClient =
DebeziumUtils.createBinaryClient(sourceConfig.getDbzConfiguration());
final StatefulTaskContext statefulTaskContext =
new StatefulTaskContext(sourceConfig, binaryLogClient,
jdbcConnection);
- SnapshotSplitReader snapshotReader =
- new SnapshotSplitReader(statefulTaskContext, subtaskId);
- return snapshotReader;
+ return new SnapshotSplitReader(statefulTaskContext, subtaskId);
}
private BinlogSplitReader getBinlogSplitReader(JobBaseConfig config) {
- MySqlSourceConfig sourceConfig = getSourceConfig(config);
+ MySqlSourceConfig sourceConfig = getSourceConfig(config, 0);
+ LOG.info(
+ "MySQL CDC binlog reader for job {} using server_id={}",
+ config.getJobId(),
+ sourceConfig.getServerIdRange().getStartServerId());
final MySqlConnection jdbcConnection =
DebeziumUtils.createMySqlConnection(sourceConfig);
final BinaryLogClient binaryLogClient =
DebeziumUtils.createBinaryClient(sourceConfig.getDbzConfiguration());
final StatefulTaskContext statefulTaskContext =
new StatefulTaskContext(sourceConfig, binaryLogClient,
jdbcConnection);
- BinlogSplitReader binlogReader = new
BinlogSplitReader(statefulTaskContext, 0);
- return binlogReader;
+ return new BinlogSplitReader(statefulTaskContext, 0);
}
private MySqlSourceConfig getSourceConfig(JobBaseConfig config) {
return generateMySqlConfig(config);
}
- /** Generate MySQL source config from JobBaseConfig */
+ private MySqlSourceConfig getSourceConfig(JobBaseConfig config, int
subtaskId) {
+ return generateMySqlConfig(config.getConfig(), config.getJobId(),
subtaskId);
+ }
+
private MySqlSourceConfig generateMySqlConfig(JobBaseConfig config) {
- return generateMySqlConfig(config.getConfig(),
ConfigUtil.getServerId(config.getJobId()));
+ return generateMySqlConfig(config.getConfig(), config.getJobId(), 0);
}
- /** Generate MySQL source config from Map config */
- private MySqlSourceConfig generateMySqlConfig(Map<String, String>
cdcConfig, String serverId) {
+ // Per-subtask config so each reader binds a distinct server_id from the
resolved range.
+ private MySqlSourceConfig generateMySqlConfig(
+ Map<String, String> cdcConfig, String jobId, int subtaskId) {
MySqlSourceConfigFactory configFactory = new
MySqlSourceConfigFactory();
ConnectionUrl cu =
ConnectionUrl.getConnectionUrlInstance(
@@ -858,7 +868,15 @@ public class MySqlSourceReader extends
AbstractCdcSourceReader {
configFactory.password(cdcConfig.get(DataSourceConfigKeys.PASSWORD));
String databaseName = cdcConfig.get(DataSourceConfigKeys.DATABASE);
configFactory.databaseList(databaseName);
- configFactory.serverId(serverId);
+ int parallelism =
+ Integer.parseInt(
+ cdcConfig.getOrDefault(
+ DataSourceConfigKeys.SNAPSHOT_PARALLELISM,
+
DataSourceConfigKeys.SNAPSHOT_PARALLELISM_DEFAULT));
+ ServerIdRange serverIdRange =
+ ConfigUtil.resolveServerIdRange(
+ jobId, parallelism,
cdcConfig.get(DataSourceConfigKeys.SERVER_ID));
+ configFactory.serverId(serverIdRange.toString());
configFactory.serverTimeZone(
ConfigUtil.getTimeZoneFromProps(cu.getOriginalProperties()).toString());
@@ -974,7 +992,7 @@ public class MySqlSourceReader extends
AbstractCdcSourceReader {
objectPath,
cdcConfig.get(DataSourceConfigKeys.SNAPSHOT_SPLIT_KEY));
}
- return configFactory.createConfig(0);
+ return configFactory.createConfig(subtaskId);
}
private BinlogOffset initializeEffectiveOffset(
diff --git
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/ConfigUtil.java
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/ConfigUtil.java
index 5aa46753a26..a999f532ea9 100644
---
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/ConfigUtil.java
+++
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/ConfigUtil.java
@@ -20,6 +20,7 @@ package org.apache.doris.cdcclient.utils;
import org.apache.doris.job.cdc.DataSourceConfigKeys;
import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.cdc.connectors.mysql.source.config.ServerIdRange;
import java.time.ZoneId;
import java.util.Arrays;
@@ -43,11 +44,29 @@ public class ConfigUtil {
private static ObjectMapper objectMapper = new ObjectMapper();
private static final Logger LOG =
LoggerFactory.getLogger(ConfigUtil.class);
- public static String getServerId(String jobId) {
- // Use bitwise AND with Integer.MAX_VALUE to strip the sign bit,
- // which avoids the edge case where Math.abs(Integer.MIN_VALUE)
returns MIN_VALUE
- // (negative).
- return String.valueOf(jobId.hashCode() & Integer.MAX_VALUE);
+    // Picks the server_id range for a job: the user-supplied range when one is configured,
+    // otherwise a range of width snapshotParallelism derived from the jobId hash.
+    // Value validation is done by FE DataSourceConfigValidator; the input is trusted here.
+    public static ServerIdRange resolveServerIdRange(
+            String jobId, int snapshotParallelism, String userInput) {
+        if (userInput != null) {
+            ServerIdRange configured = ServerIdRange.from(userInput.trim());
+            if (configured != null) {
+                // Every parallel snapshot reader needs its own id from the range.
+                if (configured.getNumberOfServerIds() < snapshotParallelism) {
+                    throw new IllegalArgumentException(
+                            "server_id range size "
+                                    + configured.getNumberOfServerIds()
+                                    + " must be >= snapshot_parallelism "
+                                    + snapshotParallelism);
+                }
+                return configured;
+            }
+        }
+        // Masking with MAX_VALUE clears the sign bit, so the hash is never negative.
+        int hashed = jobId.hashCode() & Integer.MAX_VALUE;
+        // Largest start for which start + snapshotParallelism - 1 still fits in an int.
+        int highestStart = Integer.MAX_VALUE - snapshotParallelism + 1;
+        // Strict `>` keeps hash == MAX_VALUE untouched when parallelism is 1 (back-compat).
+        int start = hashed;
+        if (hashed > highestStart) {
+            start = hashed % highestStart;
+        }
+        // Never hand out 0 as a server_id; use 1 instead.
+        if (start == 0) {
+            start = 1;
+        }
+        return new ServerIdRange(start, start + snapshotParallelism - 1);
     }
public static ZoneId getServerTimeZoneFromJdbcUrl(String jdbcUrl) {
diff --git
a/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/utils/ConfigUtilTest.java
b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/utils/ConfigUtilTest.java
index 66d2a76d7c2..9fd6a61cdce 100644
---
a/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/utils/ConfigUtilTest.java
+++
b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/utils/ConfigUtilTest.java
@@ -19,6 +19,7 @@ package org.apache.doris.cdcclient.utils;
import org.apache.doris.job.cdc.DataSourceConfigKeys;
+import org.apache.flink.cdc.connectors.mysql.source.config.ServerIdRange;
import org.junit.jupiter.api.Test;
import java.util.HashMap;
@@ -32,22 +33,60 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
/** Unit tests for {@link ConfigUtil}. */
class ConfigUtilTest {
- // ─── getServerId
──────────────────────────────────────────────────────────
+ // ─── resolveServerIdRange
─────────────────────────────────────────────────
+
+ // Value/format validation lives in FE DataSourceConfigValidator; here we
only verify the
+ // derivation algorithm and that valid input is passed through to
flink-cdc's ServerIdRange.
+
+ @Test
+ void resolveDefaultDeriveSingle() {
+ ServerIdRange range = ConfigUtil.resolveServerIdRange("12345", 1,
null);
+ assertEquals(1, range.getNumberOfServerIds());
+ assertTrue(range.getStartServerId() >= 1);
+ }
+
+ @Test
+ void resolveDefaultDeriveExpandsToParallelism() {
+ ServerIdRange range = ConfigUtil.resolveServerIdRange("12345", 4,
null);
+ assertEquals(4, range.getNumberOfServerIds());
+ assertEquals(range.getStartServerId() + 3, range.getEndServerId());
+ }
+
+ @Test
+ void resolveDefaultDeriveHandlesMinHashCode() {
+ // "polygenelubricants" hashCode() == Integer.MIN_VALUE; & MAX_VALUE
strips sign bit.
+ ServerIdRange range =
+ ConfigUtil.resolveServerIdRange("polygenelubricants", 4, null);
+ assertTrue(range.getStartServerId() >= 1);
+ assertTrue(range.getEndServerId() <= Integer.MAX_VALUE);
+ }
+
+ @Test
+ void resolveDefaultDeriveBumpsZeroHashToOne() {
+ // Empty string hashCode() == 0; bump to 1 because MySQL server_id=0
disables replication.
+ ServerIdRange range = ConfigUtil.resolveServerIdRange("", 1, null);
+ assertEquals(1, range.getStartServerId());
+ }
+
+ @Test
+ void resolveUserSingleValue() {
+ ServerIdRange range = ConfigUtil.resolveServerIdRange("anyjob", 1,
"5400");
+ assertEquals(5400L, range.getStartServerId());
+ assertEquals(5400L, range.getEndServerId());
+ }
@Test
- void serverIdIsNonNegative() {
- // Any jobId hash should produce a non-negative result (bitwise AND
strips sign bit).
- String result = ConfigUtil.getServerId("12345");
- assertTrue(Long.parseLong(result) >= 0, "serverId must be
non-negative");
+ void resolveUserRange() {
+ ServerIdRange range = ConfigUtil.resolveServerIdRange("anyjob", 4,
"5400-5408");
+ assertEquals(5400L, range.getStartServerId());
+ assertEquals(5408L, range.getEndServerId());
}
@Test
- void serverIdHandlesMinHashCode() {
- // Find a string whose hashCode() == Integer.MIN_VALUE to exercise the
edge case
- // where Math.abs(Integer.MIN_VALUE) would return a negative number.
- // "polygenelubricants" is a well-known such string.
- String result = ConfigUtil.getServerId("polygenelubricants");
- assertTrue(Long.parseLong(result) >= 0, "serverId must be non-negative
for MIN_VALUE hash");
+ void resolveRejectsWidthLessThanParallelism() {
+ IllegalArgumentException ex =
assertThrows(IllegalArgumentException.class,
+ () -> ConfigUtil.resolveServerIdRange("anyjob", 4, "5400"));
+ assertTrue(ex.getMessage().contains("snapshot_parallelism"));
}
// ─── getTableList
─────────────────────────────────────────────────────────
diff --git
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_server_id.groovy
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_server_id.groovy
new file mode 100644
index 00000000000..d6fcbd21978
--- /dev/null
+++
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_server_id.groovy
@@ -0,0 +1,133 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+suite("test_streaming_mysql_job_server_id",
"p0,external,mysql,external_docker,external_docker_mysql,nondatalake") {
+ String enabled = context.config.otherConfigs.get("enableJdbcTest")
+ if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+ return
+ }
+
+ def mysql_port = context.config.otherConfigs.get("mysql_57_port")
+ def externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+ def s3_endpoint = getS3Endpoint()
+ def bucket = getS3BucketName()
+ def driver_url =
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/mysql-connector-j-8.4.0.jar"
+
+ def currentDb = (sql "select database()")[0][0]
+ def mysqlDb = "test_server_id_db"
+ def srcTable = "user_server_id"
+
+ // Prepare source schema + initial data once
+ connect("root", "123456", "jdbc:mysql://${externalEnvIp}:${mysql_port}") {
+ sql """CREATE DATABASE IF NOT EXISTS ${mysqlDb}"""
+ sql """DROP TABLE IF EXISTS ${mysqlDb}.${srcTable}"""
+ sql """CREATE TABLE ${mysqlDb}.${srcTable} (
+ `name` varchar(200) NOT NULL,
+ `age` int DEFAULT NULL,
+ PRIMARY KEY (`name`)
+ ) ENGINE=InnoDB"""
+ sql """INSERT INTO ${mysqlDb}.${srcTable} VALUES ('A', 1), ('B', 2)"""
+ }
+
+ def buildCreateJob = { String jobName, String extraProps ->
+ """CREATE JOB ${jobName}
+ ON STREAMING
+ FROM MYSQL (
+ "jdbc_url" = "jdbc:mysql://${externalEnvIp}:${mysql_port}",
+ "driver_url" = "${driver_url}",
+ "driver_class" = "com.mysql.cj.jdbc.Driver",
+ "user" = "root",
+ "password" = "123456",
+ "database" = "${mysqlDb}",
+ "include_tables" = "${srcTable}",
+ ${extraProps}
+ )
+ TO DATABASE ${currentDb} (
+ "table.create.properties.replication_num" = "1"
+ )
+ """
+ }
+
+ def assertCreateFails = { String jobName, String extraProps, String
expectFragment ->
+ sql "DROP JOB IF EXISTS where jobname = '${jobName}'"
+ Exception thrown = null
+ try {
+ sql buildCreateJob(jobName, extraProps)
+ } catch (Exception ex) {
+ thrown = ex
+ }
+ assert thrown != null, "CREATE JOB ${jobName} should have failed"
+ def msg = String.valueOf(thrown.message).toLowerCase()
+ assert msg.contains(expectFragment.toLowerCase()),
+ "${jobName}: expected error containing '${expectFragment}',
got: ${thrown.message}"
+ }
+
+ def runHappyPath = { String jobName, String extraProps, Closure
binlogCheck = null ->
+ sql "DROP JOB IF EXISTS where jobname = '${jobName}'"
+ sql "DROP TABLE IF EXISTS ${currentDb}.${srcTable} FORCE"
+ try {
+ sql buildCreateJob(jobName, extraProps + ', "offset" = "initial"')
+ Awaitility.await().atMost(180, SECONDS).pollInterval(2,
SECONDS).until({
+ def cnt = sql """
+ select SucceedTaskCount from jobs("type"="insert")
+ where Name = '${jobName}' and ExecuteType='STREAMING'
+ """
+ cnt.size() == 1 &&
Integer.parseInt(cnt.get(0).get(0).toString()) >= 1
+ })
+ def rows = sql """SELECT name FROM ${currentDb}.${srcTable} ORDER
BY name"""
+ assert rows.size() == 2, "${jobName}: expected 2 rows, got
${rows.size()}"
+ if (binlogCheck != null) {
+ binlogCheck()
+ }
+ } finally {
+ sql "DROP JOB IF EXISTS where jobname = '${jobName}'"
+ }
+ }
+
+ // ─── Section 1: FE validator rejects bad server_id at CREATE
─────────────
+ assertCreateFails("test_serverid_reject_format",
+ '"offset" = "initial", "server_id" = "abc"', "server_id")
+ assertCreateFails("test_serverid_reject_zero",
+ '"offset" = "initial", "server_id" = "0"', "server_id")
+ assertCreateFails("test_serverid_reject_backward",
+ '"offset" = "initial", "server_id" = "5408-5400"', "server_id")
+ assertCreateFails("test_serverid_reject_width",
+ '"offset" = "initial", "server_id" = "99500",
"snapshot_parallelism" = "2"',
+ "snapshot_parallelism")
+
+ // ─── Section 2: happy path — job runs, data syncs
────────────────────────
+ // single value + binlog increment covers BinlogSplitReader binding
startServerId.
+ runHappyPath("test_serverid_single", '"server_id" = "99001"') {
+ connect("root", "123456",
"jdbc:mysql://${externalEnvIp}:${mysql_port}") {
+ sql "INSERT INTO ${mysqlDb}.${srcTable} VALUES ('C', 3)"
+ sql "UPDATE ${mysqlDb}.${srcTable} SET age = 99 WHERE name = 'A'"
+ sql "DELETE FROM ${mysqlDb}.${srcTable} WHERE name = 'B'"
+ }
+ Awaitility.await().atMost(120, SECONDS).pollInterval(2,
SECONDS).until({
+ def after = sql "SELECT name, age FROM ${currentDb}.${srcTable}
ORDER BY name"
+ after.size() == 2 && after[0][0] == 'A' && after[0][1] == 99 &&
after[1][0] == 'C'
+ })
+ }
+ runHappyPath("test_serverid_range",
+ '"server_id" = "99100-99103", "snapshot_parallelism" = "4",
"snapshot_split_size" = "1"')
+ runHappyPath("test_serverid_default", '"snapshot_parallelism" = "2"')
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]