This is an automated email from the ASF dual-hosted git repository.
JNSimba pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 4337c56e98c [fix](streamingjob) Cap debezium ChangeEventQueue with a heap-adaptive byte limit to avoid OOM (#64511)
4337c56e98c is described below
commit 4337c56e98c18702162995f940ed00538482a84c
Author: wudi <[email protected]>
AuthorDate: Mon Jun 22 10:17:56 2026 +0800
[fix](streamingjob) Cap debezium ChangeEventQueue with a heap-adaptive byte limit to avoid OOM (#64511)
### What problem does this PR solve?
The cdc_client builds debezium's `ChangeEventQueue` with only a
count-based bound (`max.queue.size=8192`); the byte bound
(`max.queue.size.in.bytes`) defaults to `0`, which disables it. With wide
rows (e.g. ~2MB each), the in-memory queue can therefore grow to
`2MB * 8192 ≈ 16GB` and exhaust the heap (OOM). Both the PostgreSQL and
MySQL paths size the queue from `getMaxQueueSizeInBytes()`, so setting
that single property covers both connectors, in both the snapshot and
streaming phases.
### What this PR does
Set a heap-adaptive byte cap on the queue buffer in
`ConfigUtil.getDefaultDebeziumProps()`, which is shared by the Postgres
and MySQL source readers:
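- Default cap is `clamp(heap/16, 64MB, 256MB)`, where `heap` is the JVM's
max heap (`Runtime.maxMemory()`): heap 1G -> 64MB, 2G -> 128MB,
>= 4G -> 256MB; heaps below 1G still get the 64MB floor.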
- The cap is intentionally conservative because a single cdc_client JVM
can run many queues concurrently (one per split, across multiple jobs),
and the real batching/backpressure happens downstream in the sink rather
than in this queue.
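- Escape hatch: `-Dcdc.max.queue.size.in.bytes=<bytes>` overrides the
adaptive value (absolute bytes; `<= 0` disables the byte bound). A
non-integer value (e.g. `32MB`) is logged and ignored, and the adaptive
cap is used instead.
- Byte-sizing the queue relies on debezium's `ObjectSizeCalculator`
reflection, which on JDK 17 needs `--add-opens` for `java.base/java.lang`,
`java.util`, `java.math` and `java.nio`. BE now passes these flags when it
forks cdc-client.jar (`cdc_client_mgr.cpp`). The surefire/failsafe test JVMs
get the same flags via `pom.xml`, because tests run the reader path without
going through that fork.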
Narrow tables are unaffected. As long as rows average under 8KB
(64MB / 8192), a full queue of 8192 rows stays under even the 64MB floor.
The count bound is therefore reached first, and behavior is unchanged.
---
be/src/runtime/cdc_client_mgr.cpp | 5 ++
fs_brokers/cdc_client/pom.xml | 14 +++++
.../apache/doris/cdcclient/utils/ConfigUtil.java | 27 ++++++++++
.../doris/cdcclient/utils/ConfigUtilTest.java | 59 ++++++++++++++++++++++
4 files changed, 105 insertions(+)
diff --git a/be/src/runtime/cdc_client_mgr.cpp
b/be/src/runtime/cdc_client_mgr.cpp
index b37cadc980c..b60c2c60bb1 100644
--- a/be/src/runtime/cdc_client_mgr.cpp
+++ b/be/src/runtime/cdc_client_mgr.cpp
@@ -214,6 +214,11 @@ Status
CdcClientMgr::start_cdc_client(PRequestCdcClientResult* result) {
argv_storage.emplace_back(java_opts);
// OOM safety net (last-wins, user opts cannot disable).
argv_storage.emplace_back("-XX:+ExitOnOutOfMemoryError");
+ // JDK17 opens for debezium ObjectSizeCalculator reflection.
+ argv_storage.emplace_back("--add-opens=java.base/java.lang=ALL-UNNAMED");
+ argv_storage.emplace_back("--add-opens=java.base/java.util=ALL-UNNAMED");
+ argv_storage.emplace_back("--add-opens=java.base/java.math=ALL-UNNAMED");
+ argv_storage.emplace_back("--add-opens=java.base/java.nio=ALL-UNNAMED");
argv_storage.emplace_back("-jar");
argv_storage.emplace_back(cdc_jar_path);
argv_storage.emplace_back(cdc_jar_port);
diff --git a/fs_brokers/cdc_client/pom.xml b/fs_brokers/cdc_client/pom.xml
index a88bddad683..0a0c7355318 100644
--- a/fs_brokers/cdc_client/pom.xml
+++ b/fs_brokers/cdc_client/pom.xml
@@ -76,6 +76,11 @@ under the License.
<assertj.version>3.27.7</assertj.version>
<awaitility.version>4.2.1</awaitility.version>
<maven-failsafe-plugin.version>3.2.5</maven-failsafe-plugin.version>
+ <maven-surefire-plugin.version>3.2.5</maven-surefire-plugin.version>
+ <!-- JDK17 opens for debezium ObjectSizeCalculator reflection
(byte-sized queue).
+ Mirrors the set CdcClientMgr passes when BE forks cdc-client.jar;
needed because
+ surefire/failsafe run the reader path directly without going
through that fork. -->
+ <test.add.opens>--add-opens=java.base/java.lang=ALL-UNNAMED
--add-opens=java.base/java.util=ALL-UNNAMED
--add-opens=java.base/java.math=ALL-UNNAMED
--add-opens=java.base/java.nio=ALL-UNNAMED</test.add.opens>
</properties>
<dependencies>
@@ -258,6 +263,14 @@ under the License.
<target>17</target>
</configuration>
</plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>${maven-surefire-plugin.version}</version>
+ <configuration>
+ <argLine>${test.add.opens}</argLine>
+ </configuration>
+ </plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-failsafe-plugin</artifactId>
@@ -268,6 +281,7 @@ under the License.
</includes>
<forkCount>1</forkCount>
<reuseForks>true</reuseForks>
+ <argLine>${test.add.opens}</argLine>
</configuration>
<executions>
<execution>
diff --git
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/ConfigUtil.java
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/ConfigUtil.java
index a999f532ea9..cc2fa9da68b 100644
---
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/ConfigUtil.java
+++
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/ConfigUtil.java
@@ -37,6 +37,7 @@ import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.mysql.cj.conf.ConnectionUrl;
+import io.debezium.config.CommonConnectorConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -123,9 +124,35 @@ public class ConfigUtil {
return ZoneId.systemDefault();
}
+ public static final String MAX_QUEUE_BYTES_SYS_PROP =
"cdc.max.queue.size.in.bytes";
+
+ // Heap-adaptive byte cap for the debezium ChangeEventQueue buffer.
+ // heap 1G->64MB, 2G->128MB, >=4G->256MB. -D<MAX_QUEUE_BYTES_SYS_PROP>
overrides
+ // (<=0 disables); a malformed override is logged and ignored, falling
back to the cap.
+ private static long resolveMaxQueueSizeInBytes() {
+ String override = System.getProperty(MAX_QUEUE_BYTES_SYS_PROP);
+ if (override != null) {
+ try {
+ long bytes = Long.parseLong(override.trim());
+ return bytes <= 0 ? 0 : bytes;
+ } catch (NumberFormatException e) {
+ LOG.warn(
+ "Ignoring invalid -D{}={}, expected an integer byte
count; "
+ + "falling back to the adaptive cap",
+ MAX_QUEUE_BYTES_SYS_PROP,
+ override);
+ }
+ }
+ long target = Runtime.getRuntime().maxMemory() / 16;
+ return Math.max(64L * 1024 * 1024, Math.min(target, 256L * 1024 *
1024));
+ }
+
/** Optimized debezium parameters */
public static Properties getDefaultDebeziumProps() {
Properties properties = new Properties();
+ properties.put(
+ CommonConnectorConfig.MAX_QUEUE_SIZE_IN_BYTES.name(),
+ String.valueOf(resolveMaxQueueSizeInBytes()));
return properties;
}
diff --git
a/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/utils/ConfigUtilTest.java
b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/utils/ConfigUtilTest.java
index 980f83bdc8d..cf70b6b075c 100644
---
a/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/utils/ConfigUtilTest.java
+++
b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/utils/ConfigUtilTest.java
@@ -19,11 +19,13 @@ package org.apache.doris.cdcclient.utils;
import org.apache.doris.job.cdc.DataSourceConfigKeys;
+import io.debezium.config.CommonConnectorConfig;
import org.apache.flink.cdc.connectors.mysql.source.config.ServerIdRange;
import java.time.ZoneId;
import java.util.HashMap;
import java.util.Map;
+import java.util.Properties;
import java.util.Set;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
@@ -128,6 +130,63 @@ class ConfigUtilTest {
assertEquals(0, result.length);
}
+ // ─── getDefaultDebeziumProps: queue byte cap
──────────────────────────────
+
+ private static long queueBytes(Properties props) {
+ return Long.parseLong(
+
props.getProperty(CommonConnectorConfig.MAX_QUEUE_SIZE_IN_BYTES.name()));
+ }
+
+ @Test
+ void defaultQueueBytesWithinClamp() {
+ long bytes = queueBytes(ConfigUtil.getDefaultDebeziumProps());
+ assertTrue(bytes >= 64L * 1024 * 1024 && bytes <= 256L * 1024 * 1024,
+ "expected clamp to [64MB, 256MB] but got " + bytes);
+ }
+
+ @Test
+ void sysPropOverridesAdaptiveValue() {
+ String prev = System.getProperty(ConfigUtil.MAX_QUEUE_BYTES_SYS_PROP);
+ try {
+ System.setProperty(ConfigUtil.MAX_QUEUE_BYTES_SYS_PROP, "1048576");
+ assertEquals(1048576L,
queueBytes(ConfigUtil.getDefaultDebeziumProps()));
+ } finally {
+ restore(prev);
+ }
+ }
+
+ @Test
+ void negativeSysPropDisablesByteBound() {
+ String prev = System.getProperty(ConfigUtil.MAX_QUEUE_BYTES_SYS_PROP);
+ try {
+ System.setProperty(ConfigUtil.MAX_QUEUE_BYTES_SYS_PROP, "-1");
+ assertEquals(0L, queueBytes(ConfigUtil.getDefaultDebeziumProps()));
+ } finally {
+ restore(prev);
+ }
+ }
+
+ @Test
+ void malformedSysPropFallsBackToClamp() {
+ String prev = System.getProperty(ConfigUtil.MAX_QUEUE_BYTES_SYS_PROP);
+ try {
+ System.setProperty(ConfigUtil.MAX_QUEUE_BYTES_SYS_PROP, "32MB");
+ long bytes = queueBytes(ConfigUtil.getDefaultDebeziumProps());
+ assertTrue(bytes >= 64L * 1024 * 1024 && bytes <= 256L * 1024 *
1024,
+ "malformed override should fall back to [64MB, 256MB] but
got " + bytes);
+ } finally {
+ restore(prev);
+ }
+ }
+
+ private static void restore(String prev) {
+ if (prev == null) {
+ System.clearProperty(ConfigUtil.MAX_QUEUE_BYTES_SYS_PROP);
+ } else {
+ System.setProperty(ConfigUtil.MAX_QUEUE_BYTES_SYS_PROP, prev);
+ }
+ }
+
// ─── server timezone parsing
──────────────────────────────────────────────
@Test
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]