github-actions[bot] commented on code in PR #64423:
URL: https://github.com/apache/doris/pull/64423#discussion_r3401480106
##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/Env.java:
##########
@@ -192,12 +244,68 @@ public void close(String jobId) {
}
}
+ /** Liveness evidence (FE heartbeat or active poll): keep this job's reader alive. */
+ public void keepAlive(String jobId) {
+ JobContext context = jobContexts.get(jobId);
+ if (context != null) {
+ context.lastAliveTime = System.currentTimeMillis();
+ }
+ }
+
+ // Release (keep slot) readers FE no longer drives; maxIntervalMs<=0 = untracked (e.g. TVF),
+ // skip.
+ private void releaseIdleReaders() {
+ long now = System.currentTimeMillis();
+ for (String jobId : jobContexts.keySet()) {
+ Lock lock = jobLocks.get(jobId);
+ if (lock == null || !lock.tryLock()) {
+ continue;
+ }
+ SourceReader toRelease = null;
+ JobBaseConfig releaseConfig = null;
+ try {
+ JobContext context = jobContexts.get(jobId);
+ if (context == null || context.lastAliveTime <= 0 || context.maxIntervalMs <= 0) {
+ continue;
+ }
+ long timeout =
+ Math.max(
+ (long) Constants.IDLE_READER_TIMEOUT_MULTIPLIER
+ * context.maxIntervalMs,
+ Constants.IDLE_READER_MIN_TIMEOUT_MS);
+ if (now - context.lastAliveTime <= timeout) {
+ continue;
+ }
+ LOG.info(
+ "Releasing idle reader for job {}, idle {} ms, keep slot",
+ jobId,
+ now - context.lastAliveTime);
+ jobContexts.remove(jobId);
+ toRelease = context.reader;
+ releaseConfig = context.jobConfig;
+ } finally {
+ lock.unlock();
+ }
+ // Release outside the lock so blocking IO never stalls getReaderAndClaim/detach.
+ if (toRelease != null && releaseConfig != null) {
+ try {
+ toRelease.release(releaseConfig);
+ } catch (Exception ex) {
Review Comment:
The idle reaper has the same untrack-before-release problem on a different
path. It removes the job's entry from `jobContexts` before the keep-slot
release has succeeded. If `toRelease.release()` then fails, the still-live
reader is no longer reachable by `isOwner()`, `detachReaderIfOwner()`, or the
next reaper pass, and the next `/api/writeRecords` will create another reader
for the same source. Please remove the context only after release succeeds, or
keep a tombstone/failed-release state that prevents a new claim from creating a
second live reader.
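
One possible shape for the tombstone approach is sketched below. It is only an illustration under stated assumptions: `IdleReaperSketch`, `Ctx`, `State`, and `canCreateReader` are hypothetical names, not the real `Env`/`JobContext` API. The context stays in the map while the release runs outside the lock, and it is removed only once the release has returned normally.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch; none of these types exist in cdc_client.
class IdleReaperSketch {
    interface Reader {
        void release() throws Exception;
    }

    enum State { ACTIVE, RELEASING, RELEASE_FAILED }

    static final class Ctx {
        final Reader reader;
        State state = State.ACTIVE;

        Ctx(Reader reader) {
            this.reader = reader;
        }
    }

    final Map<String, Ctx> contexts = new ConcurrentHashMap<>();

    /** Returns true only if the reader was released and then untracked. */
    boolean reap(String jobId) {
        Ctx ctx = contexts.get(jobId);
        if (ctx == null) {
            return false;
        }
        synchronized (ctx) {
            if (ctx.state == State.RELEASING) {
                return false; // another pass is already releasing this reader
            }
            ctx.state = State.RELEASING; // tombstone: still tracked, not claimable
        }
        try {
            ctx.reader.release(); // blocking IO runs outside the monitor
        } catch (Exception ex) {
            synchronized (ctx) {
                ctx.state = State.RELEASE_FAILED; // stays tracked for the next pass
            }
            return false;
        }
        contexts.remove(jobId, ctx); // untrack only after a successful release
        return true;
    }

    /** A claim may create a reader only when no context is tracked at all. */
    boolean canCreateReader(String jobId) {
        return !contexts.containsKey(jobId);
    }
}
```

With this ordering a failed release leaves the context visible, so a later reaper pass can retry it and a claim arriving in between sees that a reader is still tracked.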
##########
fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java:
##########
@@ -1002,7 +1010,8 @@ private void initSourceReader() throws JobException {
public void cleanMeta(Long jobId) throws JobException {
// clean meta table
StreamingJobUtils.deleteJobMeta(jobId);
- Backend backend = StreamingJobUtils.selectBackend(cloudCluster);
+ // Route to the bound BE so close tears down its live reader; dead/unbound falls back to random.
+ Backend backend = StreamingJobUtils.selectBackend(cloudCluster, boundBackendId);
JobBaseConfig requestParams =
Review Comment:
For final cleanup this fallback can send `/api/close` to a random
load-available BE instead of the BE that owns the live reader.
`selectBackend(cloudCluster, boundBackendId)` deliberately ignores the bound BE
when `isLoadAvailable()` is false, but that BE's cdc_client process can still
be alive and holding the reused PG slot/MySQL reader. On DROP/cleanMeta,
closing a random BE is a no-op for that live reader, and the idle reaper only
calls `release()` (keep slot), so source-side state can be leaked after the job
metadata is deleted. This is a separate instance from the rebind thread because
this is final job cleanup; please address the remembered bound BE directly when
it exists and is reachable, or fail/retry cleanup instead of silently falling
back.
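
A minimal sketch of the routing rule requested above, assuming a simplified backend model: `CleanupRoutingSketch`, `Be`, and its `alive`/`loadAvailable` fields are hypothetical stand-ins, not the real `Backend` or `StreamingJobUtils` API. The policy of throwing whenever a bound BE cannot be used is likewise an assumption; a real fix might distinguish a BE that has been permanently dropped from the cluster.

```java
import java.util.List;

// Hypothetical sketch; not the real Backend/StreamingJobUtils API.
class CleanupRoutingSketch {
    static final class Be {
        final long id;
        final boolean alive;         // assumed: process reachable
        final boolean loadAvailable; // assumed: eligible for new load work

        Be(long id, boolean alive, boolean loadAvailable) {
            this.id = id;
            this.alive = alive;
            this.loadAvailable = loadAvailable;
        }
    }

    /** Picks the BE that should receive the final close request. */
    static Be selectForCleanup(List<Be> backends, Long boundId) {
        if (boundId != null) {
            for (Be be : backends) {
                // Prefer the owner even when it is not load-available:
                // it may still hold the live reader and the source slot.
                if (be.id == boundId && be.alive) {
                    return be;
                }
            }
            // Fail loudly so the caller retries instead of closing a bystander.
            throw new IllegalStateException(
                    "bound BE " + boundId + " unreachable; retry cleanup");
        }
        // Never bound: no live reader to target, any load-available BE will do.
        for (Be be : backends) {
            if (be.loadAvailable) {
                return be;
            }
        }
        throw new IllegalStateException("no backend available");
    }
}
```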
##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/Env.java:
##########
@@ -90,20 +107,55 @@ public SourceReader getReaderAndClaim(JobBaseConfig jobConfig, String taskId) {
DataSource ds = resolveDataSource(jobConfig.getDataSource());
String jobId = jobConfig.getJobId();
Lock lock = jobLocks.computeIfAbsent(jobId, k -> new ReentrantLock());
+ SourceReader staleReader = null;
+ JobBaseConfig staleConfig = null;
+ SourceReader reader;
lock.lock();
try {
JobContext context = jobContexts.get(jobId);
+ if (context != null
+ && jobConfig instanceof WriteRecordRequest
+ && ((WriteRecordRequest) jobConfig).isRebuildReader()) {
+ // FE declared the previous task abnormal: swap in a fresh reader instance so the
+ // old task's thread can never reach the new fetcher.
+ LOG.info(
+ "Rebuild reader for job {} on FE request, discard current instance", jobId);
+ jobContexts.remove(jobId);
+ staleReader = context.reader;
+ staleConfig = context.jobConfig != null ? context.jobConfig : jobConfig;
+ context = null;
+ }
if (context == null) {
LOG.info("Creating new reader for job {}, dataSource {}", jobId, ds);
context = new JobContext(jobId, ds, jobConfig.getConfig());
context.initializeReader();
jobContexts.put(jobId, context);
}
context.ownerTaskId = taskId;
- return context.getReader(ds);
+ context.jobConfig = jobConfig;
+ if (jobConfig instanceof WriteRecordRequest) {
+ context.maxIntervalMs = ((WriteRecordRequest) jobConfig).getMaxInterval() * 1000;
+ }
+ context.lastAliveTime = System.currentTimeMillis();
+ reader = context.getReader(ds);
} finally {
lock.unlock();
}
+ if (staleReader != null) {
+ // free the engine/slot connection before the caller submits the new fetcher
+ try {
+ staleReader.release(staleConfig);
+ } catch (Exception ex) {
Review Comment:
This rebuild path removes the old context and installs a new reader before
`staleReader.release()` is known to have stopped the previous engine. If
`release()` throws (for example from `finishSplitRecords()` / reader close) or
leaves the old Debezium fetcher alive, the exception is only logged and this
method still returns the fresh reader, so the caller can submit a second binlog
stream for the same job/slot while the stale task continues polling. This is
distinct from the existing ownership-gate threads: those prevent stale commits,
but they do not prevent duplicate source consumption. Please make the rebuild
fail/retry, or keep the old reader tracked, until the old engine is
deterministically stopped.
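
The "fail/retry" option could look roughly like the sketch below. All names (`RebuildSketch`, `Reader`, `rebuild`, `current`) are hypothetical, and unlike the PR this version releases the old reader while holding the lock; that trade-off is an assumption made to keep the example short. The old reader stays tracked until its release returns normally, and a failed release aborts the rebuild instead of handing out a second reader.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Hypothetical sketch; not the real Env.getReaderAndClaim implementation.
class RebuildSketch {
    interface Reader {
        void release() throws Exception;
    }

    private final Map<String, Reader> readers = new ConcurrentHashMap<>();

    /** Replaces the job's reader only after the old one is known to be stopped. */
    synchronized Reader rebuild(String jobId, Supplier<Reader> factory) {
        Reader old = readers.get(jobId);
        if (old != null) {
            try {
                old.release();
            } catch (Exception ex) {
                // Old reader stays tracked; the caller must retry the rebuild.
                throw new IllegalStateException(
                        "old reader for job " + jobId + " not stopped; retry rebuild", ex);
            }
        }
        Reader fresh = factory.get();
        readers.put(jobId, fresh); // swap only after a clean stop
        return fresh;
    }

    Reader current(String jobId) {
        return readers.get(jobId);
    }
}
```

Holding the lock across `release()` can stall other callers on blocking IO, which is the cost the PR avoids by releasing outside the lock; a production fix would more likely combine the out-of-lock release with an intermediate "releasing" state.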
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]