deardeng commented on code in PR #52971:
URL: https://github.com/apache/doris/pull/52971#discussion_r2194236868


##########
fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java:
##########
@@ -1367,18 +1464,43 @@ private <T extends Writable> void logEdit(short op, 
List<T> entries) throws IOEx
         if (!batch.getJournalEntities().isEmpty()) {
             journal.write(batch);
         }
+        txId += entries.size();
     }
 
     /**
-     * Write an operation to the edit log. Do not sync to persistent store yet.
+     * Asynchronously log an edit by putting it into a blocking queue and 
waiting for completion.
+     * This method blocks until the log is written and returns the logId.
      */
-    private synchronized long logEdit(short op, Writable writable) {
-        if (this.getNumEditStreams() == 0) {
-            LOG.error("Fatal Error : no editLog stream", new Exception());
-            throw new Error("Fatal Error : no editLog stream");
+    public long logEditWithFlush(short op, Writable writable) {
+        LogEditRequest req = new LogEditRequest(op, writable);
+        while (true) {
+            try {
+                logEditQueue.put(req);
+                break;
+            } catch (InterruptedException e) {
+                LOG.warn("Interrupted during put, will sleep and retry.");
+                try {
+                    Thread.sleep(100); // 等一会儿再试

Review Comment:
   english



##########
fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java:
##########
@@ -139,6 +159,84 @@ public EditLog(String nodeName) {
         } else {
             throw new IllegalArgumentException("Unknown edit log type: " + 
journalType);
         }
+
+        // Flush thread initialization block
+        flushThread = new Thread(() -> {
+            while (true) {
+                flushEditLog();
+            }
+        }, "EditLog-Flusher");
+        flushThread.setDaemon(true);
+        flushThread.start();
+    }
+
+    private void flushEditLog() {
+        java.util.List<LogEditRequest> batch = new java.util.ArrayList<>();

Review Comment:
   java.util这些包结构前缀,应该可以删掉



##########
fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java:
##########
@@ -139,6 +159,84 @@ public EditLog(String nodeName) {
         } else {
             throw new IllegalArgumentException("Unknown edit log type: " + 
journalType);
         }
+
+        // Flush thread initialization block
+        flushThread = new Thread(() -> {
+            while (true) {
+                flushEditLog();
+            }
+        }, "EditLog-Flusher");
+        flushThread.setDaemon(true);
+        flushThread.start();
+    }
+
+    private void flushEditLog() {
+        java.util.List<LogEditRequest> batch = new java.util.ArrayList<>();
+        try {
+            batch.clear();
+            LogEditRequest first = logEditQueue.poll(100, 
java.util.concurrent.TimeUnit.MILLISECONDS);
+            if (first == null) {
+                return;
+            }
+            batch.add(first);
+            logEditQueue.drainTo(batch, Config.batch_edit_log_max_item_num - 
1);
+
+            int itemNum = Math.max(1, 
Math.min(Config.batch_edit_log_max_item_num, batch.size()));

Review Comment:
   这里是不是 不用Math.max了? 181行已经add了一个,batch.size这里至少>=1



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to