This is an automated email from the ASF dual-hosted git repository.
JNSimba pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new e025050cdb0 [fix](streaming-job) streaming CDC reliability fixes
(scheduling latch, fetch-meta reason, PG db name, flaky case) (#64310)
e025050cdb0 is described below
commit e025050cdb01305c6080fb48c1bd1562565de996
Author: wudi <[email protected]>
AuthorDate: Wed Jun 17 10:46:57 2026 +0800
[fix](streaming-job) streaming CDC reliability fixes (scheduling latch,
fetch-meta reason, PG db name, flaky case) (#64310)
## Proposed changes
Several independent reliability fixes for streaming CDC jobs.
1. **Drop the 0-row latch in
`JdbcSourceOffsetProvider.onTaskCommitted`.** Setting `hasMoreData =
false` on a 0-row/0-byte commit was wrong: a 0-row task does not mean
the source is caught up (e.g. a large upstream transaction is still
buffered on the cdc_client side). The latch made the scheduler stop
dispatching tasks and could freeze streaming. The interface default is a
no-op, so the override is simply removed; `hasMoreData` is still
advanced by `fetchRemoteMeta()` and `hasMoreDataToConsume()`.
2. **Set the fetch-meta failure reason after pausing.** `fetchMeta()`
set the failure reason and then paused. A concurrent task-success
callback (which clears the reason under the job write lock) could wipe
the freshly-set reason, leaving the job `PAUSED` with an empty reason
and auto-resume disabled. Pausing first orders the reason update after
any in-flight success callback, so the reason survives.
3. **Reject an over-long PostgreSQL database name at CREATE JOB.**
PostgreSQL truncates a database name longer than its identifier limit,
but the replication-slot lookup compares the full configured name, so an
existing slot looks missing and the job then fails creating a duplicate
slot. The connection itself still succeeds (PG truncates at connect
time), so no connectivity check catches it. A new config
`streaming_pg_max_identifier_length` (default 63, adjustable for a
larger NAMEDATALEN build) gates a fail-fast check.
4. **Stabilize `test_streaming_postgres_job_special_offset`.** PAUSE
cancels the FE task but the in-flight cdc_client reader keeps polling
(up to `max_interval`) and could stream-load rows inserted right after
the pause, defeating the ALTER-offset reposition. The test now
deterministically drains the lingering reader before inserting the
before-mark rows.
---
.../main/java/org/apache/doris/common/Config.java | 4 ++
.../streaming/PostgresResourceValidator.java | 17 ++++++++
.../insert/streaming/StreamingInsertJob.java | 6 +--
.../job/offset/jdbc/JdbcSourceOffsetProvider.java | 7 ----
.../streaming/PostgresResourceValidatorTest.java | 45 ++++++++++++++++++++++
...st_streaming_postgres_job_special_offset.groovy | 31 +++++++++++++++
6 files changed, 100 insertions(+), 10 deletions(-)
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 92ba0b7d5ca..33432425a53 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -1199,6 +1199,10 @@ public class Config extends ConfigBase {
@ConfField(mutable = true, masterOnly = true)
public static int streaming_cdc_heavy_rpc_timeout_sec = 600;
+ // Max byte length of a PG database name for a CDC job; raise only for a
larger NAMEDATALEN build.
+ @ConfField(mutable = true, masterOnly = true)
+ public static int streaming_pg_max_identifier_length = 63;
+
@ConfField(mutable = true, masterOnly = true)
public static int streaming_cdc_fetch_splits_batch_size = 100;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/PostgresResourceValidator.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/PostgresResourceValidator.java
index d35cda9fe41..ae6277b3028 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/PostgresResourceValidator.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/PostgresResourceValidator.java
@@ -17,6 +17,7 @@
package org.apache.doris.job.extensions.insert.streaming;
+import org.apache.doris.common.Config;
import org.apache.doris.datasource.jdbc.client.JdbcClient;
import org.apache.doris.job.cdc.DataSourceConfigKeys;
import org.apache.doris.job.common.DataSourceType;
@@ -25,6 +26,7 @@ import org.apache.doris.job.util.StreamingJobUtils;
import org.apache.commons.lang3.StringUtils;
+import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
@@ -44,6 +46,8 @@ public class PostgresResourceValidator {
public static void validate(Map<String, String> sourceProperties, String
jobId, List<String> tableNames)
throws JobException {
+ // PG truncates an over-long db name, so the slot lookup never matches
it; reject up front.
+
checkDatabaseNameLength(sourceProperties.get(DataSourceConfigKeys.DATABASE));
String slotName = resolveSlotName(sourceProperties, jobId);
String publicationName = resolvePublicationName(sourceProperties,
jobId);
// Pattern-match ownership: name equals the default = Doris-owned
(auto); otherwise user.
@@ -123,6 +127,19 @@ public class PostgresResourceValidator {
return StringUtils.isNotBlank(name) ? name :
DataSourceConfigKeys.defaultPublicationName(jobId);
}
+ private static void checkDatabaseNameLength(String database) throws
JobException {
+ if (StringUtils.isBlank(database)) {
+ return;
+ }
+ // PG measures the identifier limit in bytes (NAMEDATALEN-1), so
compare encoded bytes.
+ int bytes = database.getBytes(StandardCharsets.UTF_8).length;
+ if (bytes > Config.streaming_pg_max_identifier_length) {
+ throw new JobException("database name '" + database + "' is " +
bytes + " bytes, exceeding "
+ + Config.streaming_pg_max_identifier_length + ";
PostgreSQL truncates it and the"
+ + " replication-slot lookup would fail.");
+ }
+ }
+
private static boolean publicationExists(Connection conn, String
publicationName) throws Exception {
try (PreparedStatement ps = conn.prepareStatement("SELECT 1 FROM
pg_publication WHERE pubname = ?")) {
ps.setString(1, publicationName);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
index 09090ba3ad1..be14f79fa34 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
@@ -709,12 +709,12 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
||
!InternalErrorCode.MANUAL_PAUSE_ERR.equals(this.getFailureReason().getCode())) {
// When a job is manually paused, it does not need to be set
again,
// otherwise, it may be woken up by auto resume.
+ // Pause before setting the reason: updateJobStatus's
writeLock orders this after any
+ // task-success callback that clears failureReason, so a
success can't wipe the reason.
+ this.updateJobStatus(JobStatus.PAUSED);
this.setFailureReason(
new
FailureReason(InternalErrorCode.GET_REMOTE_DATA_ERROR,
"Failed to fetch meta, " + ex.getMessage()));
- // If fetching meta fails, the job is paused
- // and auto resume will automatically wake it up.
- this.updateJobStatus(JobStatus.PAUSED);
if (MetricRepo.isInit) {
MetricRepo.COUNTER_STREAMING_JOB_GET_META_FAIL_COUNT.increase(1L);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
index 9b7d364895b..3cfe6734564 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
@@ -924,13 +924,6 @@ public class JdbcSourceOffsetProvider implements
SourceOffsetProvider {
return -1;
}
- @Override
- public void onTaskCommitted(long scannedRows, long loadBytes) {
- if (scannedRows == 0 && loadBytes == 0) {
- hasMoreData = false;
- }
- }
-
@Override
public boolean hasReachedEnd() {
if (!isSnapshotOnlyMode()) {
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/job/extensions/insert/streaming/PostgresResourceValidatorTest.java
b/fe/fe-core/src/test/java/org/apache/doris/job/extensions/insert/streaming/PostgresResourceValidatorTest.java
new file mode 100644
index 00000000000..27e3b04b2cd
--- /dev/null
+++
b/fe/fe-core/src/test/java/org/apache/doris/job/extensions/insert/streaming/PostgresResourceValidatorTest.java
@@ -0,0 +1,45 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.job.extensions.insert.streaming;
+
+import org.apache.doris.job.cdc.DataSourceConfigKeys;
+import org.apache.doris.job.exception.JobException;
+
+import org.apache.commons.lang3.StringUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+public class PostgresResourceValidatorTest {
+
+ // A 22-char CJK database name is length()==22 but 66 bytes in UTF-8; PG
truncates it to 63 bytes.
+ // The byte-based check must reject it before connecting (validate fails
on the very first line).
+ @Test
+ public void testRejectMultibyteOverLongDatabaseName() {
+ String dbName = StringUtils.repeat("库", 22);
+ Assert.assertEquals(22, dbName.length());
+ Map<String, String> props = new HashMap<>();
+ props.put(DataSourceConfigKeys.DATABASE, dbName);
+ JobException e = Assert.assertThrows(JobException.class,
+ () -> PostgresResourceValidator.validate(props, "1",
Collections.emptyList()));
+ Assert.assertTrue(e.getMessage(), e.getMessage().contains("bytes"));
+ }
+}
diff --git
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_special_offset.groovy
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_special_offset.groovy
index c4214145151..d9cf5886bcc 100644
---
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_special_offset.groovy
+++
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_special_offset.groovy
@@ -143,6 +143,37 @@ suite("test_streaming_postgres_job_special_offset",
"p0,external,pg,external_doc
def jobStatus = sql """select status from jobs("type"="insert")
where Name='${jobName}'"""
return jobStatus[0][0] == "PAUSED"
})
+
+ // PAUSE cancels the FE task but does NOT stop the in-flight
cdc_client reader started before
+ // the pause: it keeps polling up to max_interval (default 10s) and
would stream-load whatever
+ // rows we insert next, defeating the ALTER-offset reposition (this is
the historical source of
+ // flakiness). No new reader is dispatched while PAUSED, so drain the
lingering one
+ // deterministically: insert a disposable probe row, then wait until
the row count stays stable
+ // for a window longer than max_interval. An active reader would have
loaded the probe within
+ // max_interval, so a stable window proves no reader is consuming
anymore. Finally delete the
+ // probe from both sides so it never pollutes the before/after-mark
assertions or the .out.
+ def probeId = 10
+ connect("${pgUser}", "${pgPassword}",
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES
(${probeId}, 'drain_probe')"""
+ }
+ int stablePolls = 0
+ long lastCnt = -1L
+ Awaitility.await().atMost(120, SECONDS).pollInterval(3,
SECONDS).until({
+ long c = sql("""SELECT count(*) FROM
${currentDb}.${table1}""")[0][0] as long
+ if (c == lastCnt) {
+ stablePolls++
+ } else {
+ stablePolls = 0
+ lastCnt = c
+ }
+ // 5 * 3s = 15s stable > max_interval (10s) => the lingering
reader has fully drained.
+ return stablePolls >= 5
+ })
+ connect("${pgUser}", "${pgPassword}",
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+ sql """DELETE FROM ${pgDB}.${pgSchema}.${table1} WHERE id =
${probeId}"""
+ }
+ sql """DELETE FROM ${currentDb}.${table1} WHERE id = ${probeId}"""
+
def alterLsn = ""
connect("${pgUser}", "${pgPassword}",
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
// insert data BEFORE the LSN mark
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]