JNSimba commented on code in PR #60116:
URL: https://github.com/apache/doris/pull/60116#discussion_r2715155832


##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java:
##########
@@ -170,39 +169,14 @@ public SplitReadResult readSplitRecords(JobBaseRecordRequest baseReq) throws Exc
         if (offsetMeta == null || offsetMeta.isEmpty()) {
             throw new RuntimeException("miss meta offset");
         }
-        LOG.info("Job {} read split records with offset: {}", baseReq.getJobId(), offsetMeta);
-
-        //  If there is an active split being consumed, reuse it directly;
-        //  Otherwise, create a new snapshot/binlog split based on offset and start the reader.
-        MySqlSplit split = null;
-        SplitRecords currentSplitRecords = this.getCurrentSplitRecords();
-        if (currentSplitRecords == null) {
-            DebeziumReader<SourceRecords, MySqlSplit> currentReader = this.getCurrentReader();
-            if (baseReq.isReload() || currentReader == null) {
-                LOG.info(
-                        "No current reader or reload {}, create new split reader",
-                        baseReq.isReload());
-                // build split
-                Tuple2<MySqlSplit, Boolean> splitFlag = createMySqlSplit(offsetMeta, baseReq);
-                split = splitFlag.f0;
-                // reset binlog reader
-                // closeBinlogReader();
-                currentSplitRecords = pollSplitRecordsWithSplit(split, baseReq);
-                this.setCurrentSplitRecords(currentSplitRecords);
-                this.setCurrentSplit(split);
-            } else if (currentReader instanceof BinlogSplitReader) {
-                LOG.info("Continue poll records with current binlog reader");
-                // only for binlog reader
-                currentSplitRecords = pollSplitRecordsWithCurrentReader(currentReader);
-                split = this.getCurrentSplit();
-            } else {
-                throw new RuntimeException("Should not happen");
-            }
-        } else {
-            LOG.info(
-                    "Continue read records with current split records, splitId: {}",
-                    currentSplitRecords.getSplitId());
-        }
+        // Create a new snapshot/binlog split based on offset and start the reader.
+        LOG.info("create new split reader for {} with offset {}", baseReq.getJobId(), offsetMeta);
+        // build split
+        Tuple2<MySqlSplit, Boolean> splitFlag = createMySqlSplit(offsetMeta, baseReq);
+        MySqlSplit split = splitFlag.f0;
+        // it's necessary to ensure that the binlog reader is already closed.
+        this.currentSplitRecords = pollSplitRecordsWithSplit(split, baseReq);
+        this.currentSplit = split;

Review Comment:
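   Currently, the reader is closed in finishSplitRecords after each read
   operation, so the other branches here (reusing the current split records
   or continuing with the current binlog reader) are probably never reached
   and can be removed.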
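
   To illustrate the reasoning, here is a minimal standalone sketch. It is
   not the actual MySqlSourceReader code: ReaderLifecycleSketch, its fields,
   and the simplified branching are hypothetical stand-ins, and it assumes
   that finishSplitRecords always clears the reader after a read. Under that
   assumption, every call takes the "create" path.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the source reader; only the reader lifecycle is modeled.
class ReaderLifecycleSketch {
    // Stand-in for the current Debezium reader; null means no active reader.
    private Object currentReader;
    // Records which branch each read took, for inspection.
    private final List<String> branchesTaken = new ArrayList<>();

    // Simplified version of the old branching: reuse the reader if one is still open.
    public void readSplitRecords() {
        if (currentReader == null) {
            branchesTaken.add("create");
            currentReader = new Object();
        } else {
            branchesTaken.add("reuse");
        }
    }

    // Assumption taken from the review comment: the reader is closed after every read.
    public void finishSplitRecords() {
        currentReader = null;
    }

    public List<String> getBranchesTaken() {
        return branchesTaken;
    }

    public static void main(String[] args) {
        ReaderLifecycleSketch reader = new ReaderLifecycleSketch();
        for (int i = 0; i < 3; i++) {
            reader.readSplitRecords();
            reader.finishSplitRecords();
        }
        // The "reuse" branch is never taken when every read is followed by a close.
        System.out.println(reader.getBranchesTaken()); // prints [create, create, create]
    }
}
```

   If some caller skipped finishSplitRecords between reads, the "reuse" path
   would become reachable again, so the simplification depends on that call
   order holding everywhere.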



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

