github-actions[bot] commented on code in PR #64728:
URL: https://github.com/apache/doris/pull/64728#discussion_r3459078826


##########
fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java:
##########
@@ -635,21 +617,23 @@ public String getPersistInfo() {
     // ============ Async split progress (driven by scheduler each tick) ============
 
     /**
-     * One-time setup at CREATE.
-     * - initial/snapshot mode: init split progress; scheduler will drive advanceSplits() each tick.
-     * - latest mode (and other non-splitting modes): open the remote reader (e.g. PG slot) so the
-     *   binlog phase can start immediately; no snapshot splitting will happen.
+     * One-time setup at CREATE. Opens the remote reader for every mode (see initSourceReader) so
+     * source-side problems fail fast, then:
+     * - initial/snapshot mode: also init split progress; scheduler will drive advanceSplits() each tick.
+     * - latest mode (and other non-splitting modes): nothing further; the binlog phase can start
+     *   immediately and no snapshot splitting will happen.
      */
     @Override
     public void initOnCreate(List<String> syncTables) throws JobException {
-        if (!checkNeedSplitChunks(sourceProperties)) {
-            initSourceReader();
-            return;
-        }
-        synchronized (splitsLock) {
-            this.cachedSyncTables = syncTables;
-            this.committedSplitProgress = new SplitProgress();
-            this.cdcSplitProgress = new SplitProgress();

Review Comment:
   This creates a BE-local CDC reader during CREATE on whichever backend 
`selectBackend()` returns, but the provider never records that backend. The 
later split fetch/write paths and `cleanMeta()` each call `selectBackend()` 
again, so in a multi-BE cluster `initOnCreate()` can initialize the reader on 
BE A, the job can run on BE B, and DROP/cleanup can send `/api/close` to BE C. 
The CDC client keeps `Env.jobContexts` per BE, and `/api/close` closes only the 
context on the addressed BE, so the CREATE-time reader on BE A is left behind 
with its executor/config. Before this change, initial/snapshot jobs did not 
create that extra BE-local reader at CREATE time. Please do one of the 
following: pin/record the backend chosen here and use it for close/release; 
close all possible BE-side contexts; or use a validation endpoint that does 
not create a persistent reader context.
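
A minimal sketch of the first option (pin the backend chosen at CREATE and 
reuse it for close). Every class here (`Backend`, `Selector`, `Provider`) is a 
hypothetical stand-in written for illustration, not a Doris class; `Selector` 
is a simple round-robin that imitates how repeated `selectBackend()` calls can 
return different BEs.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// All names below are hypothetical stand-ins, not Doris classes.
class PinnedBackendSketch {

    /** Stand-in for one BE that holds per-job CDC reader contexts. */
    static final class Backend {
        final String name;
        final Set<Long> jobContexts = new HashSet<>();

        Backend(String name) {
            this.name = name;
        }

        void initReader(long jobId) {
            jobContexts.add(jobId);
        }

        // Mirrors the behavior described in the review: close only affects this BE.
        void close(long jobId) {
            jobContexts.remove(jobId);
        }
    }

    /** Stand-in for selectBackend(): round-robin, so successive calls differ. */
    static final class Selector {
        private final List<Backend> backends;
        private int next = 0;

        Selector(List<Backend> backends) {
            this.backends = new ArrayList<>(backends);
        }

        Backend select() {
            Backend chosen = backends.get(next % backends.size());
            next++;
            return chosen;
        }
    }

    /** Records the backend chosen at CREATE and reuses it for cleanup. */
    static final class Provider {
        private final long jobId;
        private final Selector selector;
        private Backend pinned; // assumption: a real provider would also persist this

        Provider(long jobId, Selector selector) {
            this.jobId = jobId;
            this.selector = selector;
        }

        void initOnCreate() {
            pinned = selector.select();
            pinned.initReader(jobId);
        }

        void cleanMeta() {
            if (pinned != null) {
                pinned.close(jobId); // close on the recorded BE, not a fresh selection
                pinned = null;
            }
        }
    }

    public static void main(String[] args) {
        Backend a = new Backend("A");
        Backend b = new Backend("B");
        Selector selector = new Selector(List.of(a, b));
        Provider provider = new Provider(1L, selector);
        provider.initOnCreate(); // reader context created on A
        selector.select();       // another code path moves the selector on to B
        provider.cleanMeta();    // still targets A because A was recorded
        System.out.println("contexts left on A: " + a.jobContexts.size());
    }
}
```

In a real provider the pinned backend would presumably have to survive FE 
restarts and handle the case where that BE goes away, which this sketch does 
not attempt.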



##########
fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java:
##########
@@ -864,17 +848,35 @@ protected List<SnapshotSplit> rpcFetchSplitsBatch(String table, Object[] nextSpl
             if (code != TStatusCode.OK) {
                 throw new JobException("fetchSplits backend error: " + result.getStatus().getErrorMsgs(0));
             }
-            ResponseBody<List<SnapshotSplit>> resp = objectMapper.readValue(
-                    result.getResponse(),
-                    new TypeReference<ResponseBody<List<SnapshotSplit>>>() {});
-            return resp.getData();
+            return parseCdcResponseData(
+                    result.getResponse(), new TypeReference<List<SnapshotSplit>>() {});
         } catch (TimeoutException te) {
             throw new JobException("fetchSplits RPC timeout: jobId=" + getJobId() + " table=" + table);
         } catch (Exception ex) {
             throw new JobException("fetchSplits failed: " + ex.getMessage());
         }
     }
 
+    /**
+     * Decode a remote response envelope. A failure is returned as {@code {code:1, data:"<message>"}}
+     * over HTTP 200, while success carries the typed payload in {@code data}. Decode the envelope
+     * with a lenient {@link JsonNode} data field first so a failure throws the raw response (which
+     * carries the real error in {@code data}) instead of a misleading type-mismatch from forcing the
+     * success type onto an error string. Package-private for unit testing.
+     */
+    <T> T parseCdcResponseData(String response, TypeReference<T> dataType) throws JobException {
+        ResponseBody<JsonNode> body;
+        try {
+            body = objectMapper.readValue(response, new TypeReference<ResponseBody<JsonNode>>() {});
+        } catch (JsonProcessingException e) {
+            throw new JobException(response);

Review Comment:
   `convertValue` is outside the normalization boundary. If the CDC client 
returns a successful envelope with an unexpected data shape, e.g. 
`{"code":0,"data":"not-a-map"}` for `fetchEndOffset`, Jackson throws an 
unchecked `IllegalArgumentException` here instead of the helper's 
`JobException` with the raw response. That reintroduces the misleading 
parse-error path this helper is meant to avoid for 
`fetchRemoteMeta`/`compareOffset`. Please catch `IllegalArgumentException` 
around the conversion, wrap it as a `JobException` carrying the raw response 
or parse context, and add a unit test for `code=0` with incompatible `data`.
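
The requested wrapping could look roughly like the sketch below. To stay 
self-contained it does not use Jackson: `convertToMap` is a hypothetical 
stand-in that, like the `convertValue` call described above, signals a shape 
mismatch with an unchecked `IllegalArgumentException`. `JobException` and 
`parseData` are likewise illustrative stand-ins, not the PR's actual code.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-ins; the real helper would call Jackson's convertValue.
class ConversionBoundarySketch {

    /** Checked exception standing in for the job-level exception type. */
    static final class JobException extends Exception {
        JobException(String message) {
            super(message);
        }

        JobException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    /** Stand-in converter: rejects a wrong shape with an unchecked exception. */
    static Map<String, String> convertToMap(Object data) {
        if (!(data instanceof Map)) {
            throw new IllegalArgumentException("expected a map but got: " + data);
        }
        Map<String, String> out = new LinkedHashMap<>();
        for (Map.Entry<?, ?> e : ((Map<?, ?>) data).entrySet()) {
            out.put(String.valueOf(e.getKey()), String.valueOf(e.getValue()));
        }
        return out;
    }

    /** Keeps the conversion inside the boundary that normalizes failures. */
    static Map<String, String> parseData(String rawResponse, int code, Object data)
            throws JobException {
        if (code != 0) {
            // Failure envelope: surface the raw response, which carries the error.
            throw new JobException(rawResponse);
        }
        try {
            return convertToMap(data);
        } catch (IllegalArgumentException e) {
            // Success code but incompatible data: wrap instead of leaking unchecked.
            throw new JobException("unexpected data shape in response: " + rawResponse, e);
        }
    }

    public static void main(String[] args) {
        String raw = "{\"code\":0,\"data\":\"not-a-map\"}";
        try {
            parseData(raw, 0, "not-a-map");
        } catch (JobException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

A unit test for the `code=0` case would then assert that a `JobException` 
(not an `IllegalArgumentException`) is thrown and that its message contains 
the raw response.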



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

