This is an automated email from the ASF dual-hosted git repository.
JNSimba pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 9d55a551627 [improve](streaming-job) Bind the incremental phase to a
fixed BE and reuse the cdc reader (#64423)
9d55a551627 is described below
commit 9d55a551627b859ba602cb0287ef170cb5c2e39c
Author: wudi <[email protected]>
AuthorDate: Tue Jun 23 19:57:17 2026 +0800
[improve](streaming-job) Bind the incremental phase to a fixed BE and reuse
the cdc reader (#64423)
### What problem does this PR solve?
Problem Summary:
For from-to (MySQL/PG CDC) streaming jobs, once a job enters the
incremental (binlog/WAL) phase, two issues hurt throughput:
- On the **FE** side, every polling round (default `max_interval` = 10s)
re-selects a BE via global round-robin, so the task drifts across BEs
with no job→BE affinity.
- On the **cdc_client** side, although per-job reader ownership and a
per-job fixed replication slot already exist, the live reader is not
actually reused: the stream reader is closed and rebuilt on every round.
As a result, every round pays the full cost of rebuilding the reader. For PG
this means reconnecting to the replication slot and re-locating the WAL
position (~15s per round), which, together with large-transaction buffering,
is a major cause of idle/low-throughput stalls in the incremental phase.
---
.../doris/job/cdc/request/WriteRecordRequest.java | 4 +
.../org/apache/doris/job/common/FailureReason.java | 11 +-
.../insert/streaming/AbstractStreamingTask.java | 15 +-
.../insert/streaming/StreamingInsertJob.java | 73 +++++++-
.../insert/streaming/StreamingMultiTblTask.java | 49 ++++-
.../org/apache/doris/job/manager/JobManager.java | 2 +-
.../doris/job/offset/SourceOffsetProvider.java | 3 +
.../job/offset/jdbc/JdbcSourceOffsetProvider.java | 34 +++-
.../apache/doris/job/util/StreamingJobUtils.java | 18 ++
.../apache/doris/cdcclient/common/Constants.java | 12 ++
.../org/apache/doris/cdcclient/common/Env.java | 204 ++++++++++++++++++++-
.../cdcclient/controller/ClientController.java | 28 ++-
.../cdcclient/service/PipelineCoordinator.java | 73 ++++++--
.../source/reader/AbstractCdcSourceReader.java | 9 +
.../source/reader/JdbcIncrementalSourceReader.java | 61 +++++-
.../source/reader/mysql/MySqlSourceReader.java | 48 ++++-
.../reader/postgres/PostgresSourceReader.java | 144 +++++++++++++--
...gres_job_slot_dropped_during_incremental.groovy | 174 ++++++++++++++++++
18 files changed, 891 insertions(+), 71 deletions(-)
diff --git
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/WriteRecordRequest.java
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/WriteRecordRequest.java
index 511e4fcea74..037ae137763 100644
---
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/WriteRecordRequest.java
+++
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/WriteRecordRequest.java
@@ -31,4 +31,8 @@ public class WriteRecordRequest extends JobBaseRecordRequest {
private String token;
private String taskId;
private Map<String, String> streamLoadProps;
+ // previous task ended abnormally, rebuild reader instead of reusing
+ private boolean rebuildReader;
+ // off by default: an old FE omits it, so a new cdc_client falls back to
per-round reader close
+ private boolean reuseReader;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/common/FailureReason.java
b/fe/fe-core/src/main/java/org/apache/doris/job/common/FailureReason.java
index 4280d43bb66..a7c409ae74d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/common/FailureReason.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/common/FailureReason.java
@@ -41,13 +41,22 @@ public class FailureReason implements Writable {
public FailureReason(String msg) {
this.msg = msg;
- if (StringUtils.isNotEmpty(msg) && isTooManyFailureRowsErr(msg)) {
+ if (StringUtils.isEmpty(msg)) {
+ this.code = InternalErrorCode.INTERNAL_ERR;
+ } else if (isReplicationSlotInvalidatedErr(msg)) {
+ // A lost/recreated replication slot cannot be resumed without
data loss; stop auto-resume.
+ this.code = InternalErrorCode.CANNOT_RESUME_ERR;
+ } else if (isTooManyFailureRowsErr(msg)) {
this.code = InternalErrorCode.TOO_MANY_FAILURE_ROWS_ERR;
} else {
this.code = InternalErrorCode.INTERNAL_ERR;
}
}
+ private static boolean isReplicationSlotInvalidatedErr(String msg) {
+ return msg.contains("Replication slot invalidated");
+ }
+
private static boolean isTooManyFailureRowsErr(String msg) {
return msg.contains("Insert has filtered data in strict mode")
|| msg.contains("too many filtered")
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/AbstractStreamingTask.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/AbstractStreamingTask.java
index 62adf21daf6..224fe7c5dbb 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/AbstractStreamingTask.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/AbstractStreamingTask.java
@@ -43,6 +43,8 @@ public abstract class AbstractStreamingTask {
private static final int MAX_RETRY = 3;
private static final String LABEL_SPLITTER = "_";
private int retryCount = 0;
+ // in-place retry would reuse this taskId, breaking ownership-based zombie
isolation
+ protected volatile boolean noRetry;
protected String labelName;
protected Offset runningOffset;
protected UserIdentity userIdentity;
@@ -83,6 +85,14 @@ public abstract class AbstractStreamingTask {
public abstract void closeOrReleaseResources();
+ // Release the remote cdc reader (keep slot). No-op for tasks without a
cdc reader (e.g. TVF).
+ public void releaseRemoteReader() {
+ }
+
+ public long getRunningBackendId() {
+ return -1;
+ }
+
public void execute() throws JobException {
while (retryCount <= MAX_RETRY) {
try {
@@ -96,8 +106,9 @@ public abstract class AbstractStreamingTask {
}
this.errMsg = e.getMessage();
retryCount++;
- if (retryCount > MAX_RETRY) {
- log.error("Task execution failed after {} retries.",
MAX_RETRY, e);
+ if (noRetry || retryCount > MAX_RETRY) {
+ log.error("Task execution failed, job id {}, task id {},
noRetry {}, retry {}.",
+ jobId, taskId, noRetry, retryCount, e);
onFail(e.getMessage());
return;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
index 855ee75183a..b22ccea0b19 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
@@ -77,6 +77,7 @@ import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.ShowResultSetMetaData;
import org.apache.doris.rpc.RpcException;
import org.apache.doris.service.FrontendOptions;
+import org.apache.doris.system.Backend;
import org.apache.doris.tablefunction.S3TableValuedFunction;
import org.apache.doris.thrift.TCell;
import org.apache.doris.thrift.TRow;
@@ -185,6 +186,15 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
@SerializedName("st")
private List<String> syncTables;
+ // Incremental(binlog) phase: bound BE for reader reuse; <=0 = unbound
(replay-safe default).
+ @SerializedName("bbe")
+ private volatile long boundBackendId;
+
+ // previous task ended abnormally (or FE restarted), next dispatch must
rebuild the cdc reader
+ @Getter
+ @Setter
+ private transient volatile boolean needRebuildReader = true;
+
// The sampling window starts at the beginning of the sampling window.
// If the error rate exceeds `max_filter_ratio` within the window, the
sampling fails.
@Setter
@@ -398,6 +408,7 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
provider = new JdbcSourceOffsetProvider(getJobId(),
dataSourceType, jdbcSourceProps);
}
provider.setCloudCluster(this.cloudCluster);
+ provider.setBoundBackendId(boundBackendId);
return provider;
}
@@ -633,8 +644,9 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
} else {
this.runningStreamTask = createStreamingMultiTblTask();
}
-
Env.getCurrentEnv().getJobManager().getStreamingTaskManager().registerTask(runningStreamTask);
+ // Set PENDING before registering, else the scheduler thread may set
RUNNING first and we clobber it.
this.runningStreamTask.setStatus(TaskStatus.PENDING);
+
Env.getCurrentEnv().getJobManager().getStreamingTaskManager().registerTask(runningStreamTask);
log.info("create new streaming insert task for job {}, task {} ",
getJobId(), runningStreamTask.getTaskId());
recordTasks(runningStreamTask);
@@ -651,6 +663,18 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
getCreateUser(), cloudCluster);
}
+ // Binlog phase: prefer the bound BE in selectBackend; rebind + persist on
change.
+ public Backend resolveBoundBackend() throws JobException {
+ Backend selected = StreamingJobUtils.selectBackend(cloudCluster,
boundBackendId);
+ if (selected.getId() != boundBackendId) {
+ log.info("bind job {} to backend {} (was {})", getJobId(),
selected.getId(), boundBackendId);
+ boundBackendId = selected.getId();
+ logUpdateOperation();
+ }
+ offsetProvider.setBoundBackendId(boundBackendId);
+ return selected;
+ }
+
private Map<String, String> getConvertedSourceProperties() throws
JobException {
if (convertedSourceProperties == null) {
this.convertedSourceProperties =
StreamingJobUtils.convertCertFile(getDbId(), sourceProperties);
@@ -787,6 +811,28 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
}
}
+ // Command entry for a manual status change: reset the failure/retry
budget, and on manual pause
+ // release the reader (keep slot). "Manual" is decided by the caller,
never by reading failureReason.
+ public void onManualStatusAltered(JobStatus newStatus, FailureReason
reason) {
+ AbstractStreamingTask taskToRelease = null;
+ lock.writeLock().lock();
+ try {
+ resetFailureInfo(reason);
+ if (JobStatus.PAUSED.equals(newStatus) && runningStreamTask !=
null) {
+ // Force resume to swap in a fresh reader, in case the release
RPC races or fails.
+ this.needRebuildReader = true;
+ taskToRelease = runningStreamTask;
+ }
+ } finally {
+ lock.writeLock().unlock();
+ }
+ // Release outside the write lock: the RPC may block on first brpc
connect and this is
+ // best-effort (needRebuildReader already forces a fresh reader; a
stale release is a no-op).
+ if (taskToRelease != null) {
+ taskToRelease.releaseRemoteReader();
+ }
+ }
+
public boolean hasMoreDataToConsume() {
return offsetProvider.hasMoreDataToConsume();
}
@@ -808,9 +854,14 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
public void onStreamTaskFail(AbstractStreamingTask task) throws
JobException {
try {
+ this.needRebuildReader = true;
failedTaskCount.incrementAndGet();
Env.getCurrentEnv().getJobManager().getStreamingTaskManager().removeRunningTask(task);
- this.failureReason = new FailureReason(task.getErrMsg());
+ // Don't overwrite a manual pause: a late failure callback would
otherwise let auto resume wake it.
+ if (this.getFailureReason() == null
+ ||
!InternalErrorCode.MANUAL_PAUSE_ERR.equals(this.getFailureReason().getCode())) {
+ this.failureReason = new FailureReason(task.getErrMsg());
+ }
if (MetricRepo.isInit) {
MetricRepo.COUNTER_STREAMING_JOB_TASK_FAILED_COUNT.increase(1L);
}
@@ -822,6 +873,7 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
public void onStreamTaskSuccess(AbstractStreamingTask task) throws
JobException {
try {
+ this.needRebuildReader = false;
resetFailureInfo(null);
succeedTaskCount.incrementAndGet();
lastTaskSuccessTime = System.currentTimeMillis();
@@ -997,6 +1049,7 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
setFailedTaskCount(replayJob.getFailedTaskCount());
setCanceledTaskCount(replayJob.getCanceledTaskCount());
setLastTaskSuccessTime(replayJob.getLastTaskSuccessTime());
+ this.boundBackendId = replayJob.boundBackendId;
}
/**
@@ -1419,6 +1472,9 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
if (null == lock) {
this.lock = new ReentrantReadWriteLock(true);
}
+
+ // a stale reader may survive on the bound BE across FE
restart/failover
+ this.needRebuildReader = true;
}
/**
@@ -1493,6 +1549,14 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
getJobId(), this.runningStreamTask.getTaskId(),
offsetRequest.getTaskId());
return;
}
+ // Reject a late commit from an already-failed task (e.g. brpc
timeout) reviving a paused job.
+ if
(!TaskStatus.RUNNING.equals(this.runningStreamTask.getStatus())) {
+ log.info("Streaming multi table job {} skip late commit
offset on non-running task {} "
+ + "(status: {}, actual: {})",
+ getJobId(), this.runningStreamTask.getTaskId(),
+ this.runningStreamTask.getStatus(),
offsetRequest.getTaskId());
+ return;
+ }
if (this.runningStreamTask.getTaskId() !=
offsetRequest.getTaskId()) {
throw new JobException("Task id mismatch when commit
offset. expected: "
+ this.runningStreamTask.getTaskId() + ", actual:
" + offsetRequest.getTaskId());
@@ -1576,6 +1640,8 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
if (offsetProvider != null) {
// when fe restart, offsetProvider.jobId/sourceProperties may be
null
offsetProvider.ensureInitialized(getJobId(), getProviderProps());
+ // replayOnUpdated skips the transient provider; resync routing BE.
+ offsetProvider.setBoundBackendId(boundBackendId);
offsetProvider.replayIfNeed(this);
}
}
@@ -1618,7 +1684,8 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
} catch (JobException ex) {
log.warn("refresh provider props before cleanMeta failed, job
id: {}", getJobId(), ex);
}
- ((JdbcSourceOffsetProvider)
this.offsetProvider).cleanMeta(getJobId());
+ long runtimeBackendId = runningStreamTask != null ?
runningStreamTask.getRunningBackendId() : -1;
+ ((JdbcSourceOffsetProvider)
this.offsetProvider).cleanMeta(getJobId(), runtimeBackendId);
}
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
index 85525beb1f8..bf41d674afc 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
@@ -131,7 +131,7 @@ public class StreamingMultiTblTask extends
AbstractStreamingTask {
}
private void sendWriteRequest() throws JobException {
- Backend backend = StreamingJobUtils.selectBackend(cloudCluster);
+ Backend backend = resolveBackend();
log.info("start to run streaming multi task {} in backend {}/{},
offset is {}",
taskId, backend.getId(), backend.getHost(),
runningOffset.toString());
this.runningBackendId = backend.getId();
@@ -172,13 +172,36 @@ public class StreamingMultiTblTask extends
AbstractStreamingTask {
log.warn("cdc_client RPC timeout api=/api/writeRecords taskId={}
jobId={} backend={}:{} timeout_sec={}",
taskId, getJobId(), backend.getHost(),
backend.getBrpcPort(),
Config.streaming_cdc_heavy_rpc_timeout_sec);
+ // the request may have been dispatched and still running remotely
+ noRetry = true;
throw new JobException("cdc_client RPC timeout: /api/writeRecords
taskId=" + taskId);
} catch (ExecutionException | InterruptedException ex) {
+ if (ex instanceof InterruptedException) {
+ Thread.currentThread().interrupt();
+ }
log.error("Send write request failed: ", ex);
+ noRetry = true;
throw new JobException(ex);
}
}
+ private Backend resolveBackend() throws JobException {
+ // Snapshot phase keeps per-round selection; binlog phase binds to a
fixed BE for reuse.
+ if (((JdbcOffset) runningOffset).snapshotSplit()) {
+ return StreamingJobUtils.selectBackend(cloudCluster);
+ }
+ return getStreamingJob().resolveBoundBackend();
+ }
+
+ // Fail loud on a dropped/wrong-type job rather than return null and risk
a downstream NPE.
+ private StreamingInsertJob getStreamingJob() throws JobException {
+ Job job = Env.getCurrentEnv().getJobManager().getJob(getJobId());
+ if (job == null) {
+ throw new JobException("Streaming job " + getJobId() + " not
found");
+ }
+ return (StreamingInsertJob) job;
+ }
+
private String getToken() throws JobException {
String token = "";
try {
@@ -214,6 +237,9 @@ public class StreamingMultiTblTask extends
AbstractStreamingTask {
request.setFrontendAddress(feAddr);
request.setMaxInterval(jobProperties.getMaxIntervalSecond());
request.setTaskTimeoutMs(getTaskTimeoutMs());
+ request.setRebuildReader(getStreamingJob().isNeedRebuildReader());
+ // Reader reuse applies only to the binlog phase (snapshot
rebinds/closes per split).
+ request.setReuseReader(!offset.snapshotSplit());
if (offsetProvider instanceof JdbcSourceOffsetProvider) {
String schemas = ((JdbcSourceOffsetProvider)
offsetProvider).getTableSchemas();
if (schemas != null) {
@@ -312,28 +338,31 @@ public class StreamingMultiTblTask extends
AbstractStreamingTask {
@Override
protected void onFail(String errMsg) throws JobException {
- // Release this task's reader before reschedule so it stops competing
for the shared slot.
+ // Stop a possibly still-running reader now, so the PG slot frees
before auto-resume re-acquires it.
releaseRemoteReader();
super.onFail(errMsg);
}
@Override
public void cancel(boolean needWaitCancelComplete) {
- // No release here: DROP/STOP/PAUSE clean up via /api/close; releasing
would orphan the engine.
+ // No release here: drop/stop free via /api/close and manual pause via
/api/releaseReader;
+ // releasing in cancel would orphan the reused engine.
super.cancel(needWaitCancelComplete);
}
@Override
public void closeOrReleaseResources() {
- // No-op: reader is shared across tasks; release on reschedule is done
in onFail().
+ // No-op: the reader is async and reused; releasing here
(per-iteration finally) would kill it.
}
- /**
- * Best-effort, onFail reschedule only: stop this job's reader on {@link
#runningBackendId} so a
- * reschedule to another backend never leaves two readers competing for
the same source (e.g. one
- * PG replication slot, which is kept, not dropped). Failures are
swallowed and must not block
- * rescheduling.
- */
+ @Override
+ public long getRunningBackendId() {
+ return runningBackendId;
+ }
+
+ // Best-effort release on runningBackendId (keep slot): on task failure to
stop a stuck/zombie
+ // reader early, and on manual pause so resume can rebind. Failures
swallowed; idle reaper backs up.
+ @Override
public void releaseRemoteReader() {
if (runningBackendId <= 0) {
return;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
index 23f51890da5..8c55856da57 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
@@ -282,7 +282,7 @@ public class JobManager<T extends AbstractJob<?, C>, C>
implements Writable {
checkSameStatus(a, jobStatus);
alterJobStatus(a.getJobId(), jobStatus);
if (a instanceof StreamingInsertJob) {
- ((StreamingInsertJob) a).resetFailureInfo(reason);
+ ((StreamingInsertJob)
a).onManualStatusAltered(jobStatus, reason);
}
} catch (JobException e) {
throw new JobException("Alter job status error, jobName is
%s, errorMsg is %s",
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java
index fe03208bd18..58c83eac632 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java
@@ -94,6 +94,9 @@ public interface SourceOffsetProvider {
*/
default void setCloudCluster(String cloudCluster) {}
+ /** Bind the BE this job is pinned to in the binlog phase, for
reader-reuse heartbeat routing. */
+ default void setBoundBackendId(long boundBackendId) {}
+
/**
* Fetch remote meta information, such as listing files in S3 or getting
latest offsets in Kafka.
*/
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
index f80147e970f..bb8e59326ef 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
@@ -107,6 +107,9 @@ public class JdbcSourceOffsetProvider implements
SourceOffsetProvider {
transient volatile String cloudCluster;
+ // Route fetchEndOffset/compareOffset to the bound BE (synced from job,
not persisted).
+ transient volatile long boundBackendId;
+
/** Split progress (cdc-fetch view), >= committedSplitProgress. Rebuilt on
restart. */
transient SplitProgress cdcSplitProgress = new SplitProgress();
@@ -254,9 +257,14 @@ public class JdbcSourceOffsetProvider implements
SourceOffsetProvider {
this.currentOffset = newOffset;
}
+ @Override
+ public void setBoundBackendId(long boundBackendId) {
+ this.boundBackendId = boundBackendId;
+ }
+
@Override
public void fetchRemoteMeta(Map<String, String> properties) throws
Exception {
- Backend backend = StreamingJobUtils.selectBackend(cloudCluster);
+ Backend backend = StreamingJobUtils.selectBackend(cloudCluster,
boundBackendId);
JobBaseConfig requestParams =
new JobBaseConfig(getJobId().toString(), sourceType.name(),
sourceProperties, getFrontendAddress());
InternalService.PRequestCdcClientRequest request =
InternalService.PRequestCdcClientRequest.newBuilder()
@@ -354,7 +362,7 @@ public class JdbcSourceOffsetProvider implements
SourceOffsetProvider {
private boolean compareOffset(Map<String, String> offsetFirst, Map<String,
String> offsetSecond)
throws JobException {
- Backend backend = StreamingJobUtils.selectBackend(cloudCluster);
+ Backend backend = StreamingJobUtils.selectBackend(cloudCluster,
boundBackendId);
CompareOffsetRequest requestParams =
new CompareOffsetRequest(getJobId(), sourceType.name(),
sourceProperties,
getFrontendAddress(), offsetFirst, offsetSecond);
@@ -999,10 +1007,20 @@ public class JdbcSourceOffsetProvider implements
SourceOffsetProvider {
}
}
- public void cleanMeta(Long jobId) throws JobException {
+ public void cleanMeta(Long jobId, long runtimeBackendId) throws
JobException {
// clean meta table
StreamingJobUtils.deleteJobMeta(jobId);
- Backend backend = StreamingJobUtils.selectBackend(cloudCluster);
+ // Dropping the slot only succeeds on the BE owning the live reader
(it stops its own engine
+ // first, freeing the slot). Prefer the runtime BE (covers the unbound
snapshot phase), then
+ // the bound BE; both may be alive but not load-available, so route by
isAlive. Only when
+ // neither is alive fall back to a random BE to drop the now-inactive
slot.
+ Backend backend = aliveBackend(runtimeBackendId);
+ if (backend == null) {
+ backend = aliveBackend(boundBackendId);
+ }
+ if (backend == null) {
+ backend = StreamingJobUtils.selectBackend(cloudCluster,
boundBackendId);
+ }
JobBaseConfig requestParams =
new JobBaseConfig(getJobId().toString(), sourceType.name(),
sourceProperties, getFrontendAddress());
InternalService.PRequestCdcClientRequest request =
InternalService.PRequestCdcClientRequest.newBuilder()
@@ -1027,6 +1045,14 @@ public class JdbcSourceOffsetProvider implements
SourceOffsetProvider {
}
}
+ private static Backend aliveBackend(long backendId) {
+ if (backendId <= 0) {
+ return null;
+ }
+ Backend be = Env.getCurrentSystemInfo().getBackend(backendId);
+ return be != null && be.isAlive() ? be : null;
+ }
+
private String getFrontendAddress() {
return Env.getCurrentEnv().getMasterHost() + ":" +
Env.getCurrentEnv().getMasterHttpPort();
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java
b/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java
index cff663e0142..ff67d2429e1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java
@@ -274,6 +274,11 @@ public class StreamingJobUtils {
}
public static Backend selectBackend(String cloudCluster) throws
JobException {
+ return selectBackend(cloudCluster, -1);
+ }
+
+ // Prefer preferredBackendId if it is in the cluster's available BEs (also
enforces cloud group).
+ public static Backend selectBackend(String cloudCluster, long
preferredBackendId) throws JobException {
if (Config.isCloudMode() && StringUtils.isNotEmpty(cloudCluster)) {
List<Backend> bes = ((CloudSystemInfoService)
Env.getCurrentSystemInfo())
.getBackendsByClusterName(cloudCluster)
@@ -284,10 +289,23 @@ public class StreamingJobUtils {
throw new
JobException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG
+ ", compute_group: " + cloudCluster);
}
+ if (preferredBackendId > 0) {
+ for (Backend be : bes) {
+ if (be.getId() == preferredBackendId) {
+ return be;
+ }
+ }
+ }
int idx = getLastSelectedBackendIndexAndUpdate();
return bes.get(Math.floorMod(idx, bes.size()));
}
+ if (preferredBackendId > 0) {
+ Backend bound =
Env.getCurrentSystemInfo().getBackend(preferredBackendId);
+ if (bound != null && bound.isLoadAvailable()) {
+ return bound;
+ }
+ }
BeSelectionPolicy policy = new BeSelectionPolicy.Builder()
.setEnableRoundRobin(true).needLoadAvailable().build();
policy.nextRoundRobinIndex = getLastSelectedBackendIndexAndUpdate();
diff --git
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/Constants.java
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/Constants.java
index 953903a8032..a9eea173d4d 100644
---
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/Constants.java
+++
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/Constants.java
@@ -25,4 +25,16 @@ public class Constants {
public static final long DEBEZIUM_HEARTBEAT_INTERVAL_MS = 3000L;
public static final String DORIS_TARGET_DB = "doris_target_db";
+
+ // Background cleanup tick: idle-reader release + retrying deferred slot
drops.
+ public static final long BACKGROUND_CLEANUP_INTERVAL_MS = 15_000L;
+ // Idle from-to reader cleanup: release (keep slot) when idle past
MULTIPLIER * max_interval.
+ public static final int IDLE_READER_TIMEOUT_MULTIPLIER = 10;
+ // Floor the idle timeout: PG reader rebuild is costly, absorb heartbeat
jitter at small
+ // interval.
+ public static final long IDLE_READER_MIN_TIMEOUT_MS = 90_000L;
+
+ // Retry dropping a slot still held by a dead BE until it frees
(wal_sender_timeout) or this
+ // elapses.
+ public static final long SLOT_DROP_RETRY_WINDOW_MS = 300_000L;
}
diff --git
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/Env.java
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/Env.java
index 28da598053b..b39efd01324 100644
---
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/Env.java
+++
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/Env.java
@@ -19,13 +19,18 @@ package org.apache.doris.cdcclient.common;
import org.apache.doris.cdcclient.source.factory.DataSource;
import org.apache.doris.cdcclient.source.factory.SourceReaderFactory;
+import org.apache.doris.cdcclient.source.reader.AbstractCdcSourceReader;
import org.apache.doris.cdcclient.source.reader.SourceReader;
import org.apache.doris.job.cdc.request.JobBaseConfig;
+import org.apache.doris.job.cdc.request.WriteRecordRequest;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -40,6 +45,8 @@ public class Env {
private static volatile Env INSTANCE;
private final Map<String, JobContext> jobContexts;
private final Map<String, Lock> jobLocks;
+ private final Map<String, SlotDropTask> pendingSlotDrops;
+ private final ScheduledExecutorService backgroundCleaner;
@Setter private int backendHttpPort;
@Setter @Getter private String clusterToken;
@Setter @Getter private volatile String feMasterAddress;
@@ -47,6 +54,19 @@ public class Env {
private Env() {
this.jobContexts = new ConcurrentHashMap<>();
this.jobLocks = new ConcurrentHashMap<>();
+ this.pendingSlotDrops = new ConcurrentHashMap<>();
+ this.backgroundCleaner =
+ Executors.newSingleThreadScheduledExecutor(
+ r -> {
+ Thread t = new Thread(r, "cdc-background-cleaner");
+ t.setDaemon(true);
+ return t;
+ });
+ this.backgroundCleaner.scheduleWithFixedDelay(
+ this::runBackgroundCleanup,
+ Constants.BACKGROUND_CLEANUP_INTERVAL_MS,
+ Constants.BACKGROUND_CLEANUP_INTERVAL_MS,
+ TimeUnit.MILLISECONDS);
}
public String getBackendHostPort() {
@@ -79,6 +99,22 @@ public class Env {
return context == null ? null : context.reader;
}
+ /**
+ * Reader for stateless metadata ops (end offset / compare): reuse the
live one if present, else
+ * a throwaway instance. Never create/cache/initialize a heavy reader, so
a metadata RPC for an
+ * idle/absent job can't trigger pub/slot/schema (re)initialization or
leak an unreaped context.
+ */
+ public SourceReader getMetaReader(JobBaseConfig jobConfig) {
+ if (jobConfig.getFrontendAddress() != null &&
!jobConfig.getFrontendAddress().isEmpty()) {
+ this.feMasterAddress = jobConfig.getFrontendAddress();
+ }
+ SourceReader existing = getReaderIfPresent(jobConfig.getJobId());
+ if (existing != null) {
+ return existing;
+ }
+ return
SourceReaderFactory.createSourceReader(resolveDataSource(jobConfig.getDataSource()));
+ }
+
/**
* Get-or-create this job's reader and claim ownership for {@code taskId}
atomically under the
* per-job lock, so a concurrent stale release cannot stop a reader this
task is about to use.
@@ -90,9 +126,24 @@ public class Env {
DataSource ds = resolveDataSource(jobConfig.getDataSource());
String jobId = jobConfig.getJobId();
Lock lock = jobLocks.computeIfAbsent(jobId, k -> new ReentrantLock());
+ SourceReader staleReader = null;
+ JobBaseConfig staleConfig = null;
+ SourceReader reader;
lock.lock();
try {
JobContext context = jobContexts.get(jobId);
+ if (context != null
+ && jobConfig instanceof WriteRecordRequest
+ && ((WriteRecordRequest) jobConfig).isRebuildReader()) {
+ // FE declared the previous task abnormal: swap in a fresh
reader instance so the
+ // old task's thread can never reach the new fetcher.
+ LOG.info(
+ "Rebuild reader for job {} on FE request, discard
current instance", jobId);
+ jobContexts.remove(jobId);
+ staleReader = context.reader;
+ staleConfig = context.jobConfig != null ? context.jobConfig :
jobConfig;
+ context = null;
+ }
if (context == null) {
LOG.info("Creating new reader for job {}, dataSource {}",
jobId, ds);
context = new JobContext(jobId, ds, jobConfig.getConfig());
@@ -100,10 +151,30 @@ public class Env {
jobContexts.put(jobId, context);
}
context.ownerTaskId = taskId;
- return context.getReader(ds);
+ context.jobConfig = jobConfig;
+ if (jobConfig instanceof WriteRecordRequest) {
+ context.maxIntervalMs = ((WriteRecordRequest)
jobConfig).getMaxInterval() * 1000;
+ }
+ context.lastAliveTime = System.currentTimeMillis();
+ reader = context.getReader(ds);
} finally {
lock.unlock();
}
+ if (staleReader != null) {
+ // free the engine/slot connection before the caller submits the
new fetcher
+ try {
+ staleReader.release(staleConfig);
+ } catch (Exception ex) {
+ LOG.warn("Failed to release stale reader for job {}", jobId,
ex);
+ }
+ }
+ return reader;
+ }
+
+ /** Whether {@code taskId} is still the current claimer of this job's
reader. */
+ public boolean isOwner(String jobId, String taskId) {
+ JobContext context = jobContexts.get(jobId);
+ return context != null && Objects.equals(context.ownerTaskId, taskId);
}
/**
@@ -192,12 +263,143 @@ public class Env {
}
}
+ /**
+ * Records liveness for a job (FE heartbeat or active poll) by refreshing its last-alive
+ * timestamp, which the idle-reader reaper compares against its timeout. Does nothing when the
+ * job has no reader context.
+ *
+ * @param jobId job whose last-alive timestamp is refreshed
+ */
+ public void keepAlive(String jobId) {
+ JobContext current = jobContexts.get(jobId);
+ if (current == null) {
+ return;
+ }
+ current.lastAliveTime = System.currentTimeMillis();
+ }
+
+ /**
+ * Releases, while keeping the replication slot, the readers FE no longer drives.
+ *
+ * <p>A job counts as idle once {@code now - lastAliveTime} exceeds
+ * {@code max(IDLE_READER_TIMEOUT_MULTIPLIER * maxIntervalMs, IDLE_READER_MIN_TIMEOUT_MS)}.
+ * Contexts with {@code maxIntervalMs <= 0} (untracked, e.g. TVF) or without a liveness stamp
+ * are skipped, as are jobs whose lock is busy this round (tryLock, never block). A context
+ * with no recorded job config is also left in place: evicting it would drop the reader from
+ * the map with no config to release it with, leaking its connection.
+ */
+ private void releaseIdleReaders() {
+ long now = System.currentTimeMillis();
+ for (String jobId : jobContexts.keySet()) {
+ Lock lock = jobLocks.get(jobId);
+ if (lock == null || !lock.tryLock()) {
+ continue;
+ }
+ SourceReader toRelease = null;
+ JobBaseConfig releaseConfig = null;
+ try {
+ JobContext context = jobContexts.get(jobId);
+ if (context == null || context.lastAliveTime <= 0 || context.maxIntervalMs <= 0) {
+ continue;
+ }
+ JobBaseConfig config = context.jobConfig;
+ if (config == null) {
+ // No config to release with: keep the context instead of orphaning its reader.
+ continue;
+ }
+ long timeout =
+ Math.max(
+ (long) Constants.IDLE_READER_TIMEOUT_MULTIPLIER * context.maxIntervalMs,
+ Constants.IDLE_READER_MIN_TIMEOUT_MS);
+ // Read the volatile stamp once so the logged idle time is the one that was compared.
+ long idleMs = now - context.lastAliveTime;
+ if (idleMs <= timeout) {
+ continue;
+ }
+ LOG.info("Releasing idle reader for job {}, idle {} ms, keep slot", jobId, idleMs);
+ jobContexts.remove(jobId);
+ toRelease = context.reader;
+ releaseConfig = config;
+ } finally {
+ lock.unlock();
+ }
+ // Release outside the lock so blocking IO never stalls getReaderAndClaim/detach.
+ if (toRelease != null) {
+ try {
+ toRelease.release(releaseConfig);
+ } catch (Exception ex) {
+ LOG.warn("Failed to release idle reader for job {}", jobId, ex);
+ }
+ }
+ }
+ }
+
+ /**
+ * Runs the periodic cleanup chores: idle-reader release, then pending slot-drop retries.
+ *
+ * <p>Each chore is guarded independently so one failing never skips the other. Any
+ * {@link Throwable} is caught, not only {@link Exception}, because a throwable escaping a
+ * periodic task would silently cancel the whole periodic task.
+ */
+ private void runBackgroundCleanup() {
+ runCleanupChore("releaseIdleReaders", this::releaseIdleReaders);
+ runCleanupChore("retryPendingSlotDrops", this::retryPendingSlotDrops);
+ }
+
+ /** Runs one cleanup chore, logging "{name} failed" instead of propagating what it throws. */
+ private void runCleanupChore(String name, Runnable chore) {
+ try {
+ chore.run();
+ } catch (Throwable t) {
+ LOG.warn("{} failed", name, t);
+ }
+ }
+
+ /**
+ * Run source-side cleanup; if incomplete (e.g. slot still held by a dead
BE), retry in
+ * background.
+ */
+ public void releaseSourceResourcesOrRetry(SourceReader reader,
JobBaseConfig jobConfig) {
+ if (!releaseSourceResources(reader, jobConfig)) {
+ scheduleSlotDrop(jobConfig);
+ }
+ }
+
+ public void scheduleSlotDrop(JobBaseConfig jobConfig) {
+ long deadline = System.currentTimeMillis() +
Constants.SLOT_DROP_RETRY_WINDOW_MS;
+ pendingSlotDrops.putIfAbsent(jobConfig.getJobId(), new
SlotDropTask(jobConfig, deadline));
+ LOG.info("Scheduled background slot drop for job {}",
jobConfig.getJobId());
+ }
+
+ /**
+ * Drops the source-side resources a job owns through the given reader.
+ *
+ * <p>Only {@link AbstractCdcSourceReader} exposes such cleanup. Any other reader has nothing
+ * to drop, so this reports success for it. Without that check, a blind cast would throw
+ * {@link ClassCastException} on every attempt, and the caller would retry until the retry
+ * window expired.
+ *
+ * @param reader reader performing the cleanup; must not be null
+ * @param jobConfig job whose resources are dropped
+ * @return {@code false} when cleanup is incomplete and should be retried later
+ */
+ private boolean releaseSourceResources(SourceReader reader, JobBaseConfig jobConfig) {
+ Objects.requireNonNull(reader, "reader");
+ if (!(reader instanceof AbstractCdcSourceReader)) {
+ return true;
+ }
+ return ((AbstractCdcSourceReader) reader).releaseSourceResources(jobConfig);
+ }
+
+ /**
+ * Makes one attempt for every queued slot drop. An entry is removed once its drop succeeds,
+ * or once its deadline has passed, in which case a warning notes that manual cleanup may be
+ * needed.
+ */
+ private void retryPendingSlotDrops() {
+ long now = System.currentTimeMillis();
+ for (Map.Entry<String, SlotDropTask> pending : pendingSlotDrops.entrySet()) {
+ String jobId = pending.getKey();
+ SlotDropTask task = pending.getValue();
+ if (attemptSlotDrop(jobId, task)) {
+ pendingSlotDrops.remove(jobId);
+ LOG.info("Background slot drop succeeded for job {}", jobId);
+ continue;
+ }
+ if (now >= task.deadlineMs) {
+ pendingSlotDrops.remove(jobId);
+ LOG.warn(
+ "Gave up dropping replication slot for job {} after retry window; "
+ + "manual cleanup may be needed",
+ jobId);
+ }
+ }
+ }
+
+ /** One drop attempt through a freshly created reader; a failure is logged and reported as false. */
+ private boolean attemptSlotDrop(String jobId, SlotDropTask task) {
+ try {
+ SourceReader dropper =
+ SourceReaderFactory.createSourceReader(
+ resolveDataSource(task.jobConfig.getDataSource()));
+ return releaseSourceResources(dropper, task.jobConfig);
+ } catch (Exception ex) {
+ LOG.warn(
+ "Background slot drop attempt failed for job {}: {}",
+ jobId,
+ ex.getMessage());
+ return false;
+ }
+ }
+
+ /** A queued background slot drop: the job config to drop with and the give-up deadline. */
+ private static final class SlotDropTask {
+ /** Config of the job whose source-side resources should be dropped. */
+ private final JobBaseConfig jobConfig;
+ /** Wall-clock time in epoch millis after which retries stop. */
+ private final long deadlineMs;
+
+ private SlotDropTask(JobBaseConfig config, long deadline) {
+ jobConfig = config;
+ deadlineMs = deadline;
+ }
+ }
+
private static final class JobContext {
private final String jobId;
private volatile SourceReader reader;
private volatile String ownerTaskId;
private volatile Map<String, String> config;
private volatile DataSource dataSource;
+ private volatile JobBaseConfig jobConfig;
+ private volatile long maxIntervalMs;
+ private volatile long lastAliveTime;
private JobContext(String jobId, DataSource dataSource, Map<String,
String> config) {
this.jobId = jobId;
diff --git
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/controller/ClientController.java
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/controller/ClientController.java
index 1ac874ceeaa..b1dbe773844 100644
---
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/controller/ClientController.java
+++
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/controller/ClientController.java
@@ -106,7 +106,8 @@ public class ClientController {
public Object fetchEndOffset(@RequestBody JobBaseConfig jobConfig) {
LOG.info("Fetching end offset for job {}", jobConfig.getJobId());
try {
- SourceReader reader = Env.getCurrentEnv().getReader(jobConfig);
+ SourceReader reader = Env.getCurrentEnv().getMetaReader(jobConfig);
+ Env.getCurrentEnv().keepAlive(jobConfig.getJobId());
return RestResponse.success(reader.getEndOffset(jobConfig));
} catch (Exception ex) {
LOG.error("Failed to fetch end offset, jobId={}",
jobConfig.getJobId(), ex);
@@ -118,7 +119,7 @@ public class ClientController {
@RequestMapping(path = "/api/compareOffset", method = RequestMethod.POST)
public Object compareOffset(@RequestBody CompareOffsetRequest
compareOffsetRequest) {
try {
- SourceReader reader =
Env.getCurrentEnv().getReader(compareOffsetRequest);
+ SourceReader reader =
Env.getCurrentEnv().getMetaReader(compareOffsetRequest);
return
RestResponse.success(reader.compareOffset(compareOffsetRequest));
} catch (Exception ex) {
LOG.error("Failed to compare offset, jobId={}",
compareOffsetRequest.getJobId(), ex);
@@ -129,12 +130,25 @@ public class ClientController {
/** Close job */
@RequestMapping(path = "/api/close", method = RequestMethod.POST)
public Object close(@RequestBody JobBaseConfig jobConfig) {
- LOG.info("Closing job {}", jobConfig.getJobId());
+ String jobId = jobConfig.getJobId();
+ LOG.info("Closing job {}", jobId);
Env env = Env.getCurrentEnv();
- SourceReader reader = env.getReader(jobConfig);
- reader.close(jobConfig);
- env.close(jobConfig.getJobId());
- pipelineCoordinator.closeJobStreamLoad(jobConfig.getJobId());
+ // Don't rebuild a reader to close it; an absent reader (owner BE
gone) just needs its slot
+ // dropped.
+ SourceReader reader = env.getReaderIfPresent(jobId);
+ try {
+ if (reader != null) {
+ reader.release(jobConfig);
+ }
+ SourceReader dropper = reader != null ? reader :
env.getMetaReader(jobConfig);
+ env.releaseSourceResourcesOrRetry(dropper, jobConfig);
+ } catch (Exception ex) {
+ LOG.warn("Close job {} teardown failed: {}", jobId,
ex.getMessage());
+ env.scheduleSlotDrop(jobConfig);
+ } finally {
+ env.close(jobId);
+ pipelineCoordinator.closeJobStreamLoad(jobId);
+ }
return RestResponse.success(true);
}
diff --git
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
index 51871db92b1..735b2b6b815 100644
---
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
+++
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
@@ -383,7 +383,8 @@ public class PipelineCoordinator {
System.currentTimeMillis() - startTime,
fetchRecord.getJobId());
} finally {
- cleanupReaderResources(sourceReader, fetchRecord.getJobId(),
readResult);
+ // Debug fetch path is out of reuse scope: finish the reader each
round.
+ cleanupReaderResources(sourceReader, fetchRecord.getJobId(),
readResult, false);
}
// Extract and set offset metadata
@@ -410,7 +411,13 @@ public class PipelineCoordinator {
writeRecordRequest.getJobId(),
writeRecordRequest.getTaskId());
} catch (Exception ex) {
- closeJobStreamLoad(writeRecordRequest.getJobId());
+ // a displaced task must not close the streamload the
successor is using
+ if (Env.getCurrentEnv()
+ .isOwner(
+ writeRecordRequest.getJobId(),
+ writeRecordRequest.getTaskId())) {
+ closeJobStreamLoad(writeRecordRequest.getJobId());
+ }
String rootCauseMessage =
ExceptionUtils.getRootCauseMessage(ex);
taskErrorMaps.put(writeRecordRequest.getTaskId(),
rootCauseMessage);
taskProgressMap.remove(writeRecordRequest.getTaskId());
@@ -468,6 +475,7 @@ public class PipelineCoordinator {
SplitReadResult readResult = null;
boolean hasExecuteDDL = false;
boolean isSnapshotSplit = false;
+ boolean stillOwner = false;
try {
// 1. submit split async
readResult =
sourceReader.prepareAndSubmitSplit(writeRecordRequest);
@@ -491,6 +499,8 @@ public class PipelineCoordinator {
// 2. poll record
while (!shouldStop) {
+ // Active poll keeps the reader alive so the reaper won't
reclaim it mid-task.
+ Env.getCurrentEnv().keepAlive(writeRecordRequest.getJobId());
Iterator<SourceRecord> recordIterator =
sourceReader.pollRecords();
if (!recordIterator.hasNext()) {
@@ -544,6 +554,21 @@ public class PipelineCoordinator {
}
while (recordIterator.hasNext()) {
+ // streamload backpressure can stall this loop past the
reaper timeout
+
Env.getCurrentEnv().keepAlive(writeRecordRequest.getJobId());
+ // A successor task took over: stop draining into the
shared batchStreamLoad.
+ if (!Env.getCurrentEnv()
+ .isOwner(
+ writeRecordRequest.getJobId(),
+ writeRecordRequest.getTaskId())) {
+ LOG.info(
+ "Task {} displaced mid-write for job {} after
{} rows, stop writing",
+ writeRecordRequest.getTaskId(),
+ writeRecordRequest.getJobId(),
+ scannedRows);
+ shouldStop = true;
+ break;
+ }
SourceRecord element = recordIterator.next();
// Check if this is a heartbeat message
@@ -609,7 +634,25 @@ public class PipelineCoordinator {
writeRecordRequest.getTaskId());
} finally {
- cleanupReaderResources(sourceReader,
writeRecordRequest.getJobId(), readResult);
+ stillOwner =
+ Env.getCurrentEnv()
+ .isOwner(writeRecordRequest.getJobId(),
writeRecordRequest.getTaskId());
+ // A displaced task must not touch the reader (finishSplitRecords
would kill the
+ // successor's fetcher) nor commit anything.
+ if (stillOwner) {
+ cleanupReaderResources(
+ sourceReader,
+ writeRecordRequest.getJobId(),
+ readResult,
+ writeRecordRequest.isReuseReader());
+ }
+ }
+ if (!stillOwner) {
+ LOG.info(
+ "Skip commit for job {} task {}: reader released or taken
over",
+ writeRecordRequest.getJobId(),
+ writeRecordRequest.getTaskId());
+ return;
}
// 3. Extract offset from split state
@@ -618,7 +661,6 @@ public class PipelineCoordinator {
batchStreamLoad.forceFlush();
// 5. request fe api update offset
- String currentTaskId = batchStreamLoad.getCurrentTaskId();
// The offset must be reset before commitOffset to prevent the next
taskId from being create
// by the fe.
batchStreamLoad.resetTaskId();
@@ -632,13 +674,14 @@ public class PipelineCoordinator {
if (hasExecuteDDL || (!isSnapshotSplit && feHadNoSchema)) {
tableSchemas = sourceReader.serializeTableSchemas();
}
+ // own taskId, never the shared currentTaskId: FE rejects it if
another task took over
batchStreamLoad.commitOffset(
- currentTaskId,
+ writeRecordRequest.getTaskId(),
metaResponse,
scannedRows,
batchStreamLoad.getLoadStatistic(),
tableSchemas);
- taskProgressMap.remove(currentTaskId);
+ taskProgressMap.remove(writeRecordRequest.getTaskId());
}
public static boolean isHeartbeatEvent(SourceRecord record) {
@@ -772,7 +815,14 @@ public class PipelineCoordinator {
* @param readResult the read result containing split information
*/
private void cleanupReaderResources(
- SourceReader sourceReader, String jobId, SplitReadResult
readResult) {
+ SourceReader sourceReader,
+ String jobId,
+ SplitReadResult readResult,
+ boolean reuseReader) {
+ boolean isSnapshotSplit =
+ readResult != null
+ && readResult.getSplit() != null
+ && sourceReader.isSnapshotSplit(readResult.getSplit());
try {
// The LSN in the commit is the current offset, which is the
offset from the last
// successful write.
@@ -781,11 +831,10 @@ public class PipelineCoordinator {
sourceReader.commitSourceOffset(jobId, readResult.getSplit());
}
} finally {
- // This must be called after `commitSourceOffset`; otherwise,
- // PG's confirmed lsn will not proceed.
- // This operation must be performed before
`batchStreamLoad.commitOffset`;
- // otherwise, fe might create the next task for this job.
- sourceReader.finishSplitRecords();
+ // Keep the binlog reader alive only when FE asked to reuse it;
else close each round.
+ if (isSnapshotSplit || !reuseReader) {
+ sourceReader.finishSplitRecords();
+ }
}
}
diff --git
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/AbstractCdcSourceReader.java
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/AbstractCdcSourceReader.java
index 74eb534bc5f..bc941663050 100644
---
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/AbstractCdcSourceReader.java
+++
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/AbstractCdcSourceReader.java
@@ -74,6 +74,15 @@ public abstract class AbstractCdcSourceReader implements
SourceReader {
// backend to take over.
LOG.info("Release source reader for job {}", jobConfig.getJobId());
finishSplitRecords();
+ shutdownSnapshotPollExecutor();
+ }
+
+ /** Stop the snapshot-phase poll thread pool; called when this reader
instance is discarded. */
+ protected void shutdownSnapshotPollExecutor() {}
+
+ /** Drop source-side owned resources. Returns false if cleanup is
incomplete (retry later). */
+ public boolean releaseSourceResources(JobBaseConfig jobConfig) {
+ return true;
}
protected abstract Class<?> probeSplitKeyClass(
diff --git
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java
index f0987eb97ee..ea70c432e8e 100644
---
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java
+++
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java
@@ -99,7 +99,7 @@ public abstract class JdbcIncrementalSourceReader extends
AbstractCdcSourceReade
private Set<String> completedSplitIds = ConcurrentHashMap.newKeySet();
// Parallel polling support
- private ExecutorService pollExecutor;
+ private ExecutorService snapshotPollExecutor;
private volatile List<CompletableFuture<PollResult>> activePollFutures;
// Stream/binlog reader (single reader for stream split)
@@ -123,7 +123,7 @@ public abstract class JdbcIncrementalSourceReader extends
AbstractCdcSourceReade
config.getOrDefault(
DataSourceConfigKeys.SNAPSHOT_PARALLELISM,
DataSourceConfigKeys.SNAPSHOT_PARALLELISM_DEFAULT));
- this.pollExecutor =
+ this.snapshotPollExecutor =
Executors.newFixedThreadPool(
parallelism,
r -> {
@@ -358,7 +358,7 @@ public abstract class JdbcIncrementalSourceReader extends
AbstractCdcSourceReade
split.splitId(),
split.getTableId().identifier());
},
- pollExecutor));
+ snapshotPollExecutor));
}
CompletableFuture.allOf(futures.toArray(new
CompletableFuture[0])).join();
@@ -407,11 +407,40 @@ public abstract class JdbcIncrementalSourceReader extends
AbstractCdcSourceReade
baseReq.getJobId());
}
Tuple2<SourceSplitBase, Boolean> splitFlag =
createStreamSplit(offsetMeta, baseReq);
- this.streamSplit = splitFlag.f0.asStreamSplit();
+ StreamSplit newStreamSplit = splitFlag.f0.asStreamSplit();
+
+ // offset guard: reuse only when request start == reader's consumed
position. Compare by
+ // compareTo (LSN), NOT equals -- PG drops lsn_proc/commit so same
position differs by map.
+ if (this.streamReader != null && this.streamSplitState != null) {
+ Offset requestStart = newStreamSplit.getStartingOffset();
+ Offset readerPos = this.streamSplitState.getStartingOffset();
+ if (requestStart != null
+ && readerPos != null
+ && requestStart.compareTo(readerPos) == 0) {
+ LOG.info(
+ "Reuse live stream reader for job {} at offset {}",
+ baseReq.getJobId(),
+ requestStart);
+ // Refresh split so commitSourceOffset advances PG
confirmed_lsn to the FE-committed
+ // offset (== reader pos); poll/offset keep using
streamSplitState.
+ this.streamSplit = newStreamSplit;
+ SplitReadResult reuseResult = new SplitReadResult();
+
reuseResult.setSplits(Collections.singletonList(this.streamSplit));
+ Map<String, Object> reuseStates = new HashMap<>();
+ reuseStates.put(this.streamSplit.splitId(),
this.streamSplitState);
+ reuseResult.setSplitStates(reuseStates);
+ return reuseResult;
+ }
+ LOG.info(
+ "Rebuild stream reader for job {}: requestStart={},
readerPos={}",
+ baseReq.getJobId(),
+ requestStart,
+ readerPos);
+ }
- // Close previous stream reader to release resources (e.g. PG
replication slot)
- // before creating a new one. This prevents connection leaks when a
cancelled
- // task's reader is still active while a new task arrives.
+ this.streamSplit = newStreamSplit;
+ // Close previous stream reader (rebuild path) before creating a new
one. This prevents
+ // connection leaks when a cancelled task's reader is still active
while a new task arrives.
if (this.streamReader != null) {
LOG.info(
"Closing previous stream reader before creating new one
for job {}",
@@ -420,6 +449,10 @@ public abstract class JdbcIncrementalSourceReader extends
AbstractCdcSourceReade
this.streamReader = null;
}
+ // Rebuild path: fail loudly if the source position is gone (e.g. slot
dropped) instead of
+ // silently re-locating from a lost offset.
+ validateStreamSource(offsetMeta, baseReq);
+
this.streamReader = getBinlogSplitReader(baseReq);
LOG.info("Prepare stream split: {}", this.streamSplit.toString());
@@ -443,6 +476,10 @@ public abstract class JdbcIncrementalSourceReader extends
AbstractCdcSourceReade
return result;
}
+ /**
+ * Hook for a source-specific check run before the stream reader is (re)built. The default
+ * accepts every request; subclasses throw to refuse the rebuild.
+ *
+ * @param offsetMeta offset metadata of the request
+ * @param baseReq the incoming record request
+ * @throws Exception to abort the rebuild
+ */
+ protected void validateStreamSource(
+ Map<String, Object> offsetMeta, JobBaseRecordRequest baseReq) throws Exception {
+ // Intentionally empty: there is no source-independent validation.
+ }
+
@Override
public Iterator<SourceRecord> pollRecords() throws Exception {
if (!snapshotReaderContexts.isEmpty()) {
@@ -550,7 +587,7 @@ public abstract class JdbcIncrementalSourceReader extends
AbstractCdcSourceReade
}
return null;
},
- pollExecutor);
+ snapshotPollExecutor);
activePollFutures.add(future);
}
@@ -965,12 +1002,20 @@ public abstract class JdbcIncrementalSourceReader
extends AbstractCdcSourceReade
public synchronized void close(JobBaseConfig jobConfig) {
LOG.info("Close source reader for job {}", jobConfig.getJobId());
finishSplitRecords();
+ shutdownSnapshotPollExecutor();
if (tableSchemas != null) {
tableSchemas.clear();
tableSchemas = null;
}
}
+ /**
+ * Stops the snapshot-phase polling thread pool via {@code shutdownNow}, which also attempts
+ * to interrupt running poll tasks. A no-op when the pool was never created.
+ */
+ @Override
+ protected void shutdownSnapshotPollExecutor() {
+ ExecutorService executor = snapshotPollExecutor;
+ if (executor == null) {
+ return;
+ }
+ executor.shutdownNow();
+ }
+
@Override
public DeserializeResult deserialize(Map<String, String> config,
SourceRecord element)
throws IOException {
diff --git
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
index 72a8338580d..efea979a62c 100644
---
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
+++
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
@@ -127,7 +127,7 @@ public class MySqlSourceReader extends
AbstractCdcSourceReader {
private Set<String> completedSplitIds = ConcurrentHashMap.newKeySet();
// Parallel polling support
- private ExecutorService pollExecutor;
+ private ExecutorService snapshotPollExecutor;
private volatile List<CompletableFuture<PollResult>> activePollFutures;
// Binlog reader (single reader for binlog split)
@@ -149,7 +149,7 @@ public class MySqlSourceReader extends
AbstractCdcSourceReader {
config.getOrDefault(
DataSourceConfigKeys.SNAPSHOT_PARALLELISM,
DataSourceConfigKeys.SNAPSHOT_PARALLELISM_DEFAULT));
- this.pollExecutor =
+ this.snapshotPollExecutor =
Executors.newFixedThreadPool(
parallelism,
r -> {
@@ -404,7 +404,7 @@ public class MySqlSourceReader extends
AbstractCdcSourceReader {
split.splitId(),
split.getTableId().identifier());
},
- pollExecutor));
+ snapshotPollExecutor));
}
CompletableFuture.allOf(futures.toArray(new
CompletableFuture[0])).join();
@@ -434,7 +434,37 @@ public class MySqlSourceReader extends
AbstractCdcSourceReader {
// Load tableSchemas from FE if available (avoids re-discover on
restart)
tryLoadTableSchemasFromRequest(baseReq);
Tuple2<MySqlSplit, Boolean> splitFlag = createBinlogSplit(offsetMeta,
baseReq);
- this.binlogSplit = (MySqlBinlogSplit) splitFlag.f0;
+ MySqlBinlogSplit newBinlogSplit = (MySqlBinlogSplit) splitFlag.f0;
+
+ // offset guard: reuse the live binlog reader only when the request
start offset equals the
+ // reader's real consumed position, so steady-state rounds skip
reconnect + binlog
+ // re-position.
+ if (this.binlogReader != null && this.binlogSplitState != null) {
+ BinlogOffset requestStart = newBinlogSplit.getStartingOffset();
+ BinlogOffset readerPos = this.binlogSplitState.getStartingOffset();
+ if (requestStart != null
+ && readerPos != null
+ && requestStart.compareTo(readerPos) == 0) {
+ LOG.info(
+ "Reuse live binlog reader for job {} at offset {}",
+ baseReq.getJobId(),
+ requestStart);
+ this.binlogSplit = newBinlogSplit;
+ SplitReadResult reuseResult = new SplitReadResult();
+
reuseResult.setSplits(Collections.singletonList(this.binlogSplit));
+ Map<String, Object> reuseStates = new HashMap<>();
+ reuseStates.put(this.binlogSplit.splitId(),
this.binlogSplitState);
+ reuseResult.setSplitStates(reuseStates);
+ return reuseResult;
+ }
+ LOG.info(
+ "Rebuild binlog reader for job {}: requestStart={},
readerPos={}",
+ baseReq.getJobId(),
+ requestStart,
+ readerPos);
+ }
+
+ this.binlogSplit = newBinlogSplit;
// Close previous binlog reader to release resources before creating a
new one.
// This prevents connection leaks when a cancelled task's reader is
still active
@@ -568,7 +598,7 @@ public class MySqlSourceReader extends
AbstractCdcSourceReader {
}
return null;
},
- pollExecutor);
+ snapshotPollExecutor);
activePollFutures.add(future);
}
@@ -1195,12 +1225,20 @@ public class MySqlSourceReader extends
AbstractCdcSourceReader {
public synchronized void close(JobBaseConfig jobConfig) {
LOG.info("Close source reader for job {}", jobConfig.getJobId());
finishSplitRecords();
+ shutdownSnapshotPollExecutor();
if (tableSchemas != null) {
tableSchemas.clear();
tableSchemas = null;
}
}
+ /**
+ * Stops the snapshot-phase polling thread pool via {@code shutdownNow}, which also attempts
+ * to interrupt running poll tasks. A no-op when the pool was never created.
+ */
+ @Override
+ protected void shutdownSnapshotPollExecutor() {
+ ExecutorService executor = snapshotPollExecutor;
+ if (executor == null) {
+ return;
+ }
+ executor.shutdownNow();
+ }
+
@Override
public DeserializeResult deserialize(Map<String, String> config,
SourceRecord element)
throws IOException {
diff --git
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
index 027a4c7af94..fe37d5c392f 100644
---
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
+++
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
@@ -27,6 +27,7 @@ import org.apache.doris.cdcclient.utils.SmallFileMgr;
import org.apache.doris.job.cdc.DataSourceConfigKeys;
import org.apache.doris.job.cdc.request.CompareOffsetRequest;
import org.apache.doris.job.cdc.request.JobBaseConfig;
+import org.apache.doris.job.cdc.request.JobBaseRecordRequest;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
@@ -73,6 +74,7 @@ import
io.debezium.connector.postgresql.PostgresConnectorConfig;
import io.debezium.connector.postgresql.PostgresConnectorConfig.AutoCreateMode;
import io.debezium.connector.postgresql.PostgresOffsetContext;
import io.debezium.connector.postgresql.SourceInfo;
+import io.debezium.connector.postgresql.connection.Lsn;
import io.debezium.connector.postgresql.connection.PostgresConnection;
import
io.debezium.connector.postgresql.connection.PostgresReplicationConnection;
import io.debezium.connector.postgresql.spi.SlotState;
@@ -491,6 +493,59 @@ public class PostgresSourceReader extends
JdbcIncrementalSourceReader {
}
}
+ /**
+ * Refuses to (re)build the stream reader when the replication slot was dropped, or dropped
+ * and recreated, out from under the job while it was paused or retrying. Silently recreating
+ * the slot would resume from a position whose WAL is already gone, which loses data. This
+ * method therefore fails with the fixed "Replication slot invalidated" marker, which FE
+ * classifies as non-resumable.
+ *
+ * @throws CdcClientException when the slot is missing, or when its restart_lsn is ahead of the
+ *     requested (committed) LSN
+ */
+ @Override
+ protected void validateStreamSource(
+ Map<String, Object> offsetMeta, JobBaseRecordRequest baseReq) throws Exception {
+ PostgresDialect dialect = new PostgresDialect(getSourceConfig(baseReq));
+ try (PostgresConnection jdbc = dialect.openJdbcConnection()) {
+ String slotName = dialect.getSlotName();
+ SlotState slot = jdbc.getReplicationSlotState(slotName, dialect.getPluginName());
+ if (slot == null) {
+ throw new CdcClientException(
+ String.format(
+ "Replication slot invalidated for job %s: slot %s not found on the"
+ + " upstream (dropped externally), cannot resume from the"
+ + " committed position without data loss.",
+ baseReq.getJobId(),
+ slotName));
+ }
+ Lsn requested = extractRequestedLsn(offsetMeta);
+ if (requested == null || requested.asLong() <= 0) {
+ // No usable committed position to compare against.
+ return;
+ }
+ Lsn restart = slot.slotRestartLsn();
+ // restart_lsn must not exceed the committed position. If it does, the slot was recreated
+ // and the WAL in between was discarded, so resuming would silently skip data.
+ if (restart != null && restart.compareTo(requested) > 0) {
+ throw new CdcClientException(
+ String.format(
+ "Replication slot invalidated for job %s: slot %s restart_lsn %s is"
+ + " ahead of the committed position %s (slot recreated),"
+ + " cannot resume without data loss.",
+ baseReq.getJobId(),
+ slotName,
+ restart,
+ requested));
+ }
+ }
+ }
+
+ /**
+ * Reads the requested start LSN from offset metadata.
+ *
+ * @param offsetMeta offset metadata, possibly null
+ * @return the LSN parsed from the {@code SourceInfo.LSN_KEY} entry as a decimal long, or
+ *     {@code null} when the metadata or entry is absent or the entry is not a parsable long
+ */
+ private Lsn extractRequestedLsn(Map<String, Object> offsetMeta) {
+ Object raw = offsetMeta == null ? null : offsetMeta.get(SourceInfo.LSN_KEY);
+ if (raw == null) {
+ return null;
+ }
+ try {
+ return Lsn.valueOf(Long.parseLong(String.valueOf(raw)));
+ } catch (NumberFormatException ex) {
+ return null;
+ }
+ }
+
@Override
public int compareOffset(CompareOffsetRequest compareOffsetRequest) {
Map<String, String> offsetFirst =
compareOffsetRequest.getOffsetFirst();
@@ -606,6 +661,15 @@ public class PostgresSourceReader extends
JdbcIncrementalSourceReader {
@Override
public void close(JobBaseConfig jobConfig) {
super.close(jobConfig);
+ releaseSourceResources(jobConfig);
+ }
+
+ /**
+ * Drop the Doris-owned slot/publication. Returns false if either is still
present (e.g. a dead
+ * BE's stale walsender holds the slot until PG reclaims it), so the
caller can retry later.
+ */
+ @Override
+ public boolean releaseSourceResources(JobBaseConfig jobConfig) {
Map<String, String> config = jobConfig.getConfig();
String jobId = jobConfig.getJobId();
String slotName = resolveSlotName(config, jobId);
@@ -618,29 +682,75 @@ public class PostgresSourceReader extends
JdbcIncrementalSourceReader {
slotName,
pubName,
jobId);
- return;
+ return true;
}
- try {
- PostgresSourceConfig sourceConfig = getSourceConfig(jobConfig);
- PostgresDialect dialect = new PostgresDialect(sourceConfig);
- if (dropSlot) {
- LOG.info("Dropping auto-created replication slot {} for job
{}", slotName, jobId);
+ PostgresDialect dialect = new
PostgresDialect(getSourceConfig(jobConfig));
+ boolean cleaned = true;
+ if (dropPub) {
+ LOG.info("Dropping auto-created publication {} for job {}",
pubName, jobId);
+ try (PostgresConnection connection = dialect.openJdbcConnection())
{
+ connection.execute("DROP PUBLICATION IF EXISTS " + pubName);
+ } catch (Exception ex) {
+ LOG.warn(
+ "Failed to drop publication {} for job {}: {}",
+ pubName,
+ jobId,
+ ex.getMessage());
+ }
+ if (publicationExists(dialect, pubName)) {
+ LOG.warn(
+ "Publication {} for job {} still present after drop,
will retry",
+ pubName,
+ jobId);
+ cleaned = false;
+ }
+ }
+ if (dropSlot) {
+ LOG.info("Dropping auto-created replication slot {} for job {}",
slotName, jobId);
+ try {
dialect.removeSlot(slotName);
- } else {
- LOG.info("Skipping drop of user-provided slot {} for job {}",
slotName, jobId);
+ } catch (Exception ex) {
+ LOG.warn(
+ "Drop of replication slot {} for job {} failed: {}",
+ slotName,
+ jobId,
+ ex.getMessage());
}
- if (dropPub) {
- LOG.info("Dropping auto-created publication {} for job {}",
pubName, jobId);
- try (PostgresConnection connection =
dialect.openJdbcConnection()) {
- connection.execute("DROP PUBLICATION IF EXISTS " +
pubName);
- }
- } else {
- LOG.info(
- "Skipping drop of user-provided publication {} for job
{}", pubName, jobId);
+ if (slotExists(dialect, slotName)) {
+ LOG.warn(
+ "Replication slot {} for job {} still present after
drop, will retry",
+ slotName,
+ jobId);
+ cleaned = false;
}
+ }
+ return cleaned;
+ }
+
+ /**
+ * Checks whether a replication slot with the given name exists upstream. The name is bound as
+ * a statement parameter rather than concatenated into the SQL, so a name containing a quote
+ * can neither break nor alter the query.
+ *
+ * @return whether the slot is present; {@code true} when the check itself fails
+ */
+ private boolean slotExists(PostgresDialect dialect, String slotName) {
+ try (PostgresConnection connection = dialect.openJdbcConnection()) {
+ return connection.prepareQueryAndMap(
+ "SELECT 1 FROM pg_replication_slots WHERE slot_name = ?",
+ statement -> statement.setString(1, slotName),
+ rs -> rs.next());
 } catch (Exception ex) {
+ // Can't verify -> treat as present so the bounded retry keeps trying instead of
+ // leaking.
 LOG.warn(
- "Failed to clean up postgres resources for job {}: {}", jobId, ex.getMessage());
+ "Failed to check replication slot {} existence: {}", slotName, ex.getMessage());
+ return true;
+ }
+ }
+
+ /**
+ * Checks whether a publication with the given name exists upstream. The name is bound as a
+ * statement parameter rather than concatenated into the SQL, so a name containing a quote
+ * can neither break nor alter the query.
+ *
+ * @return whether the publication is present; {@code true} when the check itself fails
+ */
+ private boolean publicationExists(PostgresDialect dialect, String pubName) {
+ try (PostgresConnection connection = dialect.openJdbcConnection()) {
+ return connection.prepareQueryAndMap(
+ "SELECT 1 FROM pg_publication WHERE pubname = ?",
+ statement -> statement.setString(1, pubName),
+ rs -> rs.next());
+ } catch (Exception ex) {
+ // Can't verify -> treat as present so the bounded retry keeps trying instead of
+ // leaking.
+ LOG.warn("Failed to check publication {} existence: {}", pubName, ex.getMessage());
+ return true;
 }
 }
}
diff --git
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_slot_dropped_during_incremental.groovy
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_slot_dropped_during_incremental.groovy
new file mode 100644
index 00000000000..bbca6e22830
--- /dev/null
+++
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_slot_dropped_during_incremental.groovy
@@ -0,0 +1,174 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+// When the replication slot is dropped out from under a running incremental job,
+// resuming from the committed position would silently skip the WAL discarded with
+// the slot. On rebuild, validateStreamSource detects the missing slot and fails with
+// the "Replication slot invalidated" marker; FE classifies that as CANNOT_RESUME_ERR,
+// so the job settles in PAUSED and is NOT pulled back to RUNNING by auto-resume.
+//
+// Uses a user-provided slot so (1) the slot name is known up front and (2) Doris does
+// not auto-recreate it (createSlotForGlobalStreamSplit only fires for Doris-owned
+// slots), keeping the "slot not found" branch deterministic.
+//
+// We assert state + error marker only — NOT a hard slot-count check, which is prone
+// to TOCTOU flakiness while the cdc_client is still winding down its connection.
+suite("test_streaming_postgres_job_slot_dropped_during_incremental",
+        "p0,external,pg,external_docker,external_docker_pg,nondatalake") {
+    def jobName = "test_streaming_pg_slot_dropped_job"
+    def currentDb = (sql "select database()")[0][0]
+    def table1 = "slot_dropped_pg_tbl"
+    def pgDB = "postgres"
+    def pgSchema = "cdc_test"
+    def pgUser = "postgres"
+    def pgPassword = "123456"
+    def userSlot = "slot_dropped_user_slot"
+    def userPub = "slot_dropped_user_pub"
+
+    sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+    sql """drop table if exists ${currentDb}.${table1} force"""
+
+    String enabled = context.config.otherConfigs.get("enableJdbcTest")
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        String pg_port = context.config.otherConfigs.get("pg_14_port")
+        String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+        String s3_endpoint = getS3Endpoint()
+        String bucket = getS3BucketName()
+        String driver_url = "https://${bucket}.${s3_endpoint}/regression/jdbc_driver/postgresql-42.5.0.jar"
+        String pgUrl = "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}"
+
+        // Terminates whichever backend currently holds the user slot, then drops the slot.
+        // Returns true once the slot is gone, false if the drop failed (e.g. the slot is
+        // still active) so the caller can poll again.
+        def dropUserSlot = {
+            boolean dropped = false
+            connect("${pgUser}", "${pgPassword}", pgUrl) {
+                sql """SELECT pg_terminate_backend(active_pid) FROM pg_replication_slots
+                        WHERE slot_name = '${userSlot}' AND active_pid IS NOT NULL"""
+                def cnt = sql """SELECT COUNT(1) FROM pg_replication_slots WHERE slot_name = '${userSlot}'"""
+                if (cnt[0][0] == 0) {
+                    dropped = true
+                } else {
+                    try {
+                        sql """SELECT pg_drop_replication_slot('${userSlot}')"""
+                        dropped = true
+                    } catch (Exception e) {
+                        log.info("slot still active, retry drop: " + e.getMessage())
+                    }
+                }
+            }
+            dropped
+        }
+
+        // Best-effort teardown, run from `finally` so that a failed wait or assert never
+        // leaves the job running, or the user slot / publication behind, on the shared PG
+        // instance. Every step logs and continues, so a teardown error cannot mask the
+        // original test failure.
+        def cleanup = {
+            try {
+                sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+            } catch (Exception e) {
+                log.warn("cleanup: failed to drop job ${jobName}: " + e.getMessage())
+            }
+            try {
+                // The job's consumer may still hold the slot while it winds down, so poll.
+                Awaitility.await().atMost(30, SECONDS).pollInterval(2, SECONDS).until({ dropUserSlot() })
+            } catch (Exception e) {
+                log.warn("cleanup: failed to drop replication slot ${userSlot}: " + e.getMessage())
+            }
+            try {
+                connect("${pgUser}", "${pgPassword}", pgUrl) {
+                    sql """DROP PUBLICATION IF EXISTS ${userPub}"""
+                    sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${table1}"""
+                }
+            } catch (Exception e) {
+                log.warn("cleanup: failed to drop PG publication/table: " + e.getMessage())
+            }
+            try {
+                sql """drop table if exists ${currentDb}.${table1} force"""
+            } catch (Exception e) {
+                log.warn("cleanup: failed to drop Doris table ${table1}: " + e.getMessage())
+            }
+        }
+
+        try {
+            connect("${pgUser}", "${pgPassword}", pgUrl) {
+                sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${table1}"""
+                sql """CREATE TABLE ${pgDB}.${pgSchema}.${table1} (
+                      "id" int PRIMARY KEY,
+                      "name" varchar(200)
+                    )"""
+                sql """DROP PUBLICATION IF EXISTS ${userPub}"""
+                sql """CREATE PUBLICATION ${userPub} FOR TABLE ${pgDB}.${pgSchema}.${table1}"""
+                def existing = sql """SELECT COUNT(1) FROM pg_replication_slots WHERE slot_name = '${userSlot}'"""
+                if (existing[0][0] != 0) {
+                    sql """SELECT pg_drop_replication_slot('${userSlot}')"""
+                }
+                sql """SELECT pg_create_logical_replication_slot('${userSlot}', 'pgoutput')"""
+            }
+
+            // offset=latest skips snapshot and goes straight to streaming; max_interval=3
+            // keeps tasks short so the rebuild after the drop happens quickly.
+            sql """CREATE JOB ${jobName}
+                    PROPERTIES ("max_interval" = "3")
+                    ON STREAMING
+                    FROM POSTGRES (
+                        "jdbc_url" = "${pgUrl}",
+                        "driver_url" = "${driver_url}",
+                        "driver_class" = "org.postgresql.Driver",
+                        "user" = "${pgUser}",
+                        "password" = "${pgPassword}",
+                        "database" = "${pgDB}",
+                        "schema" = "${pgSchema}",
+                        "include_tables" = "${table1}",
+                        "slot_name" = "${userSlot}",
+                        "publication_name" = "${userPub}",
+                        "offset" = "latest"
+                    )
+                    TO DATABASE ${currentDb} (
+                        "table.create.properties.replication_num" = "1"
+                    )
+                """
+
+            Awaitility.await().atMost(120, SECONDS).pollInterval(1, SECONDS).until({
+                def st = sql """select status from jobs("type"="insert") where Name='${jobName}'"""
+                st.size() == 1 && st.get(0).get(0) == "RUNNING"
+            })
+
+            // ===== Phase 1: confirm steady-state incremental sync =====
+            connect("${pgUser}", "${pgPassword}", pgUrl) {
+                for (int i = 1; i <= 10; i++) {
+                    sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (${i}, 'name_${i}')"""
+                }
+            }
+            Awaitility.await().atMost(180, SECONDS).pollInterval(2, SECONDS).until({
+                def cnt = sql """SELECT count(*) FROM ${currentDb}.${table1}"""
+                cnt.size() == 1 && cnt.get(0).get(0) >= 10
+            })
+
+            // ===== Phase 2: drop the slot out from under the running job =====
+            // Terminate whatever consumer holds the slot, then drop it. Retry until gone:
+            // an auto-resume task may briefly re-acquire it before we manage to drop.
+            Awaitility.await().atMost(60, SECONDS).pollInterval(2, SECONDS).until({ dropUserSlot() })
+
+            // Generate WAL so the rebuilt reader is forced to locate the now-missing slot.
+            connect("${pgUser}", "${pgPassword}", pgUrl) {
+                for (int i = 100; i < 110; i++) {
+                    sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (${i}, 'after_drop_${i}')"""
+                }
+            }
+
+            // ===== Phase 3: job settles in PAUSED with the slot-invalidated marker =====
+            Awaitility.await().atMost(240, SECONDS).pollInterval(3, SECONDS).until({
+                def r = sql """select status, ErrorMsg from jobs("type"="insert") where Name='${jobName}'"""
+                if (r.size() != 1) {
+                    return false
+                }
+                def status = r.get(0).get(0)
+                def errMsg = r.get(0).get(1)
+                log.info("waiting slot-invalidated PAUSED: status=${status} errMsg=${errMsg}")
+                status == "PAUSED" && errMsg != null && errMsg.toString().contains("Replication slot invalidated")
+            })
+
+            // ===== Phase 4: CANNOT_RESUME — must stay PAUSED, not auto-resumed to RUNNING =====
+            for (int i = 0; i < 8; i++) {
+                sleep(5000)
+                def r = sql """select status from jobs("type"="insert") where Name='${jobName}'"""
+                assert r.size() == 1 && r.get(0).get(0) == "PAUSED" :
+                        "job must stay PAUSED after slot invalidation, got ${r}"
+            }
+        } finally {
+            cleanup()
+        }
+    }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]