This is an automated email from the ASF dual-hosted git repository.
JNSimba pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new a159972fb12 [fix](streaming-job) cdc client PostgreSQL snapshot honors
scan.snapshot.fetch.size to avoid wide-table OOM (#64938)
a159972fb12 is described below
commit a159972fb12923fcb42fc41ffe66cf7255d7fa0e
Author: wudi <[email protected]>
AuthorDate: Thu Jul 2 10:40:35 2026 +0800
[fix](streaming-job) cdc client PostgreSQL snapshot honors
scan.snapshot.fetch.size to avoid wide-table OOM (#64938)
## Proposed changes
This PR fixes three PostgreSQL streaming job issues:
- Honor `scan.snapshot.fetch.size` during snapshot reads to avoid loading a
  large snapshot chunk into memory at once.
- Drain the current CDC batch before stopping at a heartbeat. Records after
  the heartbeat have already been dequeued and could otherwise be lost when
  the reader is reused.
- Use a dedicated non-pooled connection for publication and replication-slot
  cleanup, so background cleanup still works when the reader pool is
  exhausted.
---
.../cdcclient/service/PipelineCoordinator.java | 8 +--
.../reader/postgres/PostgresSourceReader.java | 26 ++++++----
.../source/fetch/PostgresScanFetchTask.java | 10 ++--
.../reader/postgres/PostgresSourceReaderTest.java | 57 ++++++++++++++++++++++
4 files changed, 84 insertions(+), 17 deletions(-)
diff --git
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
index 56d64546c69..4a88dfcc02b 100644
---
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
+++
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
@@ -589,13 +589,13 @@ public class PipelineCoordinator {
&& maxIntervalMillis > 0
&& elapsedTime >= maxIntervalMillis;
- if (!isSnapshotSplit && timeoutReached) {
+ if (!isSnapshotSplit && timeoutReached && !shouldStop)
{
LOG.info(
- "Binlog split max interval reached and
heartbeat received, stopping data reading");
+ "Binlog split max interval reached;
draining current batch before stopping");
shouldStop = true;
- break;
}
- // Skip heartbeat messages during normal processing
+ // Drain the rest of this batch instead of breaking:
records after the
+ // heartbeat are already dequeued and the reused
reader won't re-read them.
continue;
}
diff --git
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
index 20b5694fd76..2574b53a196 100644
---
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
+++
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
@@ -78,6 +78,7 @@ import io.debezium.connector.postgresql.connection.Lsn;
import io.debezium.connector.postgresql.connection.PostgresConnection;
import
io.debezium.connector.postgresql.connection.PostgresReplicationConnection;
import io.debezium.connector.postgresql.spi.SlotState;
+import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.Column;
import io.debezium.relational.TableId;
@@ -690,11 +691,12 @@ public class PostgresSourceReader extends
JdbcIncrementalSourceReader {
jobId);
return true;
}
- PostgresDialect dialect = new
PostgresDialect(getSourceConfig(jobConfig));
+ JdbcConfiguration jdbcConfig =
+
getSourceConfig(jobConfig).getDbzConnectorConfig().getJdbcConfig();
boolean cleaned = true;
if (dropPub) {
LOG.info("Dropping auto-created publication {} for job {}",
pubName, jobId);
- try (PostgresConnection connection = dialect.openJdbcConnection())
{
+ try (PostgresConnection connection =
createCleanupConnection(jdbcConfig)) {
connection.execute("DROP PUBLICATION IF EXISTS " + pubName);
} catch (Exception ex) {
LOG.warn(
@@ -703,7 +705,7 @@ public class PostgresSourceReader extends
JdbcIncrementalSourceReader {
jobId,
ex.getMessage());
}
- if (publicationExists(dialect, pubName)) {
+ if (publicationExists(jdbcConfig, pubName)) {
LOG.warn(
"Publication {} for job {} still present after drop,
will retry",
pubName,
@@ -713,8 +715,8 @@ public class PostgresSourceReader extends
JdbcIncrementalSourceReader {
}
if (dropSlot) {
LOG.info("Dropping auto-created replication slot {} for job {}",
slotName, jobId);
- try {
- dialect.removeSlot(slotName);
+ try (PostgresConnection connection =
createCleanupConnection(jdbcConfig)) {
+ connection.dropReplicationSlot(slotName);
} catch (Exception ex) {
LOG.warn(
"Drop of replication slot {} for job {} failed: {}",
@@ -722,7 +724,7 @@ public class PostgresSourceReader extends
JdbcIncrementalSourceReader {
jobId,
ex.getMessage());
}
- if (slotExists(dialect, slotName)) {
+ if (slotExists(jdbcConfig, slotName)) {
LOG.warn(
"Replication slot {} for job {} still present after
drop, will retry",
slotName,
@@ -733,8 +735,12 @@ public class PostgresSourceReader extends
JdbcIncrementalSourceReader {
return cleaned;
}
- private boolean slotExists(PostgresDialect dialect, String slotName) {
- try (PostgresConnection connection = dialect.openJdbcConnection()) {
+ static PostgresConnection createCleanupConnection(JdbcConfiguration
jdbcConfig) {
+ return new PostgresConnection(jdbcConfig,
PostgresConnection.CONNECTION_GENERAL);
+ }
+
+ private boolean slotExists(JdbcConfiguration jdbcConfig, String slotName) {
+ try (PostgresConnection connection =
createCleanupConnection(jdbcConfig)) {
return connection.queryAndMap(
"SELECT 1 FROM pg_replication_slots WHERE slot_name = '" +
slotName + "'",
rs -> rs.next());
@@ -747,8 +753,8 @@ public class PostgresSourceReader extends
JdbcIncrementalSourceReader {
}
}
- private boolean publicationExists(PostgresDialect dialect, String pubName)
{
- try (PostgresConnection connection = dialect.openJdbcConnection()) {
+ private boolean publicationExists(JdbcConfiguration jdbcConfig, String
pubName) {
+ try (PostgresConnection connection =
createCleanupConnection(jdbcConfig)) {
return connection.queryAndMap(
"SELECT 1 FROM pg_publication WHERE pubname = '" + pubName
+ "'",
rs -> rs.next());
diff --git
a/fs_brokers/cdc_client/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTask.java
b/fs_brokers/cdc_client/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTask.java
index 6c26cf4e74a..0462cb9d1ed 100644
---
a/fs_brokers/cdc_client/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTask.java
+++
b/fs_brokers/cdc_client/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTask.java
@@ -65,6 +65,8 @@ import static
io.debezium.connector.postgresql.Utils.refreshSchema;
* Copied from Flink Cdc 3.6.0
*
* <p>Line 333~336: modified createDataEventsForTable to fix FLINK-39748.
+ *
+ * <p>Line 326: use sourceConfig.getFetchSize() for the snapshot fetch size to
fix FLINK-40007.
*/
public class PostgresScanFetchTask extends AbstractScanFetchTask {
@@ -103,6 +105,7 @@ public class PostgresScanFetchTask extends
AbstractScanFetchTask {
PostgresSnapshotSplitReadTask snapshotSplitReadTask =
new PostgresSnapshotSplitReadTask(
+ (PostgresSourceConfig) ctx.getSourceConfig(),
ctx.getConnection(),
ctx.getDbzConnectorConfig(),
ctx.getDatabaseSchema(),
@@ -219,7 +222,7 @@ public class PostgresScanFetchTask extends
AbstractScanFetchTask {
LoggerFactory.getLogger(PostgresSnapshotSplitReadTask.class);
private final PostgresConnection jdbcConnection;
- private final PostgresConnectorConfig connectorConfig;
+ private final PostgresSourceConfig sourceConfig;
private final PostgresEventDispatcher<TableId> eventDispatcher;
private final SnapshotSplit snapshotSplit;
private final PostgresOffsetContext offsetContext;
@@ -228,6 +231,7 @@ public class PostgresScanFetchTask extends
AbstractScanFetchTask {
private final Clock clock;
public PostgresSnapshotSplitReadTask(
+ PostgresSourceConfig sourceConfig,
PostgresConnection jdbcConnection,
PostgresConnectorConfig connectorConfig,
PostgresSchema databaseSchema,
@@ -237,7 +241,7 @@ public class PostgresScanFetchTask extends
AbstractScanFetchTask {
SnapshotSplit snapshotSplit) {
super(connectorConfig, snapshotProgressListener);
this.jdbcConnection = jdbcConnection;
- this.connectorConfig = connectorConfig;
+ this.sourceConfig = sourceConfig;
this.snapshotProgressListener = snapshotProgressListener;
this.databaseSchema = databaseSchema;
this.eventDispatcher = eventDispatcher;
@@ -319,7 +323,7 @@ public class PostgresScanFetchTask extends
AbstractScanFetchTask {
snapshotSplit.getSplitStart(),
snapshotSplit.getSplitEnd(),
snapshotSplit.getSplitKeyType().getFieldCount(),
- connectorConfig.getSnapshotFetchSize());
+ sourceConfig.getFetchSize());
ResultSet rs = selectStatement.executeQuery()) {
ColumnUtils.ColumnArray columnArray = ColumnUtils.toArray(rs,
table);
diff --git
a/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReaderTest.java
b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReaderTest.java
new file mode 100644
index 00000000000..617013ce4d7
--- /dev/null
+++
b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReaderTest.java
@@ -0,0 +1,57 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cdcclient.source.reader.postgres;
+
+import static org.junit.jupiter.api.Assertions.assertSame;
+
+import io.debezium.config.Configuration;
+import io.debezium.connector.postgresql.connection.PostgresConnection;
+import io.debezium.jdbc.JdbcConfiguration;
+import io.debezium.jdbc.JdbcConnection;
+import org.junit.jupiter.api.Test;
+
+import java.lang.reflect.Field;
+
+class PostgresSourceReaderTest {
+
+ @Test
+ void cleanupConnectionUsesDebeziumDirectConnectionFactory() throws
Exception {
+ JdbcConfiguration config =
+ JdbcConfiguration.adapt(
+ Configuration.create()
+ .with(JdbcConfiguration.HOSTNAME, "localhost")
+ .with(JdbcConfiguration.PORT, 5432)
+ .with(JdbcConfiguration.DATABASE, "postgres")
+ .with(JdbcConfiguration.USER, "user")
+ .with(JdbcConfiguration.PASSWORD, "password")
+ .build());
+
+ try (PostgresConnection expected =
+ new PostgresConnection(config,
PostgresConnection.CONNECTION_GENERAL);
+ PostgresConnection actual =
+ PostgresSourceReader.createCleanupConnection(config)) {
+ assertSame(connectionFactory(expected), connectionFactory(actual));
+ }
+ }
+
+ private static Object connectionFactory(JdbcConnection connection) throws
Exception {
+ Field field = JdbcConnection.class.getDeclaredField("factory");
+ field.setAccessible(true);
+ return field.get(connection);
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]