This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 39af044ac3f [opt](editlog) batch flush editlog (#52971)
39af044ac3f is described below

commit 39af044ac3ff6f47d3d578ddfb2c3563dced28e2
Author: Yongqiang YANG <[email protected]>
AuthorDate: Tue Jul 15 16:23:58 2025 +0800

    [opt](editlog) batch flush editlog (#52971)
    
    ### What problem does this PR solve?
    1.  Group edit log entries from multiple threads into batched writes
        (see the sketch below).
    2.  Publish versions from multiple threads, so edit logs are fed into the
        group in parallel.
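    
    A minimal, self-contained sketch of the group-commit pattern this patch
    adds, simplified from the EditLog changes in the diff below (the journal
    write is stubbed out; GroupCommitSketch, Item and nextLogId are
    illustrative names, not the real classes):
    
    ```java
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.TimeUnit;
    
    public class GroupCommitSketch {
        // One pending edit: the producer blocks on `lock` until the flusher
        // assigns a logId and marks the item finished.
        static class Item {
            final Object lock = new Object();
            boolean finished = false;
            long logId = -1;
        }
    
        private final BlockingQueue<Item> queue = new LinkedBlockingQueue<>();
        private long nextLogId = 0; // stand-in for the journal's id counter
    
        // Producer side (any worker thread): enqueue, then wait for the flusher.
        long logEdit() throws InterruptedException {
            Item req = new Item();
            queue.put(req);
            synchronized (req.lock) {
                while (!req.finished) {
                    req.lock.wait();
                }
            }
            return req.logId;
        }
    
        // Flusher side (single daemon thread): drain whatever has accumulated,
        // write it as one batch, then wake every waiting producer.
        void flushOnce() throws InterruptedException {
            Item first = queue.poll(100, TimeUnit.MILLISECONDS);
            if (first == null) {
                return; // nothing to flush in this interval
            }
            List<Item> batch = new ArrayList<>();
            batch.add(first);
            queue.drainTo(batch, 99); // cap, like batch_edit_log_max_item_num - 1
    
            long firstId = nextLogId; // the real code calls journal.write(...) here
            nextLogId += batch.size();
    
            for (int i = 0; i < batch.size(); i++) {
                Item req = batch.get(i);
                synchronized (req.lock) {
                    req.logId = firstId + i; // ids are contiguous within a batch
                    req.finished = true;
                    req.lock.notifyAll();
                }
            }
        }
    }
    ```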
    
    Tested with the script below:
    
    ```
    for i in `seq 1 1000`; do
        echo "Starting Sysbench for db$i"
        nohup sysbench --db-driver=mysql --mysql-host=127.0.0.1 \
            --mysql-port=9030 --mysql-user=root --mysql-db=paralldb${i} \
            --tables=1 --threads=1 --time=180 --report-interval=10 \
            oltp_insert run >result_${i} 2>&1 &
    done
    wait
    ```
    
    Each insert run lasted 180 seconds.
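    
    The "Enable batch" runs below presumably had the new switches turned on
    in fe.conf; the flag names come from this patch, but setting both to true
    is an assumption about how the test was run:
    
    ```
    enable_batch_editlog = true
    enable_parallel_publish_version = true
    ```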
    
    Without batch:
    
    result_99:    total number of events:              14
    result_99:    events (avg/stddev):           14.0000/0.00
    result_990:    total number of events:              14
    result_990:    events (avg/stddev):           14.0000/0.00
    result_991:    total number of events:              14
    result_991:    events (avg/stddev):           14.0000/0.00
    result_992:    total number of events:              14
    result_992:    events (avg/stddev):           14.0000/0.00
    result_993:    total number of events:              14
    result_993:    events (avg/stddev):           14.0000/0.00
    result_994:    total number of events:              14
    result_994:    events (avg/stddev):           14.0000/0.00
    result_995:    total number of events:              14
    result_995:    events (avg/stddev):           14.0000/0.00
    result_996:    total number of events:              14
    result_996:    events (avg/stddev):           14.0000/0.00
    result_997:    total number of events:              14
    result_997:    events (avg/stddev):           14.0000/0.00
    result_998:    total number of events:              14
    result_998:    events (avg/stddev):           14.0000/0.00
    result_999:    total number of events:              14
    result_999:    events (avg/stddev):           14.0000/0.00
    
    With batch enabled:
    
    result_99:    events (avg/stddev):           263.0000/0.00
    result_990:    total number of events:              276
    result_990:    events (avg/stddev):           276.0000/0.00
    result_991:    total number of events:              275
    result_991:    events (avg/stddev):           275.0000/0.00
    result_992:    total number of events:              279
    result_992:    events (avg/stddev):           279.0000/0.00
    result_993:    total number of events:              274
    result_993:    events (avg/stddev):           274.0000/0.00
    result_994:    total number of events:              269
    result_994:    events (avg/stddev):           269.0000/0.00
    result_995:    total number of events:              272
    result_995:    events (avg/stddev):           272.0000/0.00
    result_996:    total number of events:              272
    result_996:    events (avg/stddev):           272.0000/0.00
    result_997:    total number of events:              271
    result_997:    events (avg/stddev):           271.0000/0.00
    result_998:    total number of events:              255
    result_998:    events (avg/stddev):           255.0000/0.00
    result_999:    total number of events:              258
    result_999:    events (avg/stddev):           258.0000/0.00
    
    doris_fe_journal_write_batch_data_size{quantile="0.75"} 39172.0
    doris_fe_journal_write_batch_data_size{quantile="0.95"} 67803.0
    doris_fe_journal_write_batch_data_size{quantile="0.98"} 72049.0
    doris_fe_journal_write_batch_data_size{quantile="0.99"} 72707.0
    doris_fe_journal_write_batch_data_size{quantile="0.999"} 79449.0
    doris_fe_journal_write_batch_data_size_sum {} 1.3869330432422793E9
    doris_fe_journal_write_batch_data_size_count {} 53715
    
    doris_fe_journal_write_batch_size{quantile="0.75"} 73.0
    doris_fe_journal_write_batch_size{quantile="0.95"} 100.0
    doris_fe_journal_write_batch_size{quantile="0.98"} 100.0
    doris_fe_journal_write_batch_size{quantile="0.99"} 100.0
    doris_fe_journal_write_batch_size{quantile="0.999"} 100.0
    doris_fe_journal_write_batch_size_sum {} 2379961.017209651
    doris_fe_journal_write_batch_size_count {} 53715
    
    doris_fe_journal_write_latency_ms{quantile="0.75"} 6.0
    doris_fe_journal_write_latency_ms{quantile="0.95"} 10.0
    doris_fe_journal_write_latency_ms{quantile="0.98"} 21.0
    doris_fe_journal_write_latency_ms{quantile="0.99"} 54.0
    doris_fe_journal_write_latency_ms{quantile="0.999"} 175.0
    doris_fe_journal_write_latency_ms_sum {} 329275.8826554282
    doris_fe_journal_write_latency_ms_count {} 53715
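    
    For scale, the sums and counts above work out to roughly 2379961 / 53715
    ≈ 44 entries and 1.387e9 / 53715 ≈ 25.8 KB of data per batched journal
    write, at an average write latency of about 329276 / 53715 ≈ 6.1 ms.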
    
    ---------
    
    Co-authored-by: Yongqiang YANG <[email protected]>
---
 .../main/java/org/apache/doris/common/Config.java  |  16 ++
 .../org/apache/doris/journal/JournalBatch.java     |   5 +
 .../apache/doris/journal/bdbje/BDBJEJournal.java   |  10 +-
 .../java/org/apache/doris/metric/MetricRepo.java   |   3 +
 .../java/org/apache/doris/persist/EditLog.java     | 187 ++++++++++++++++++---
 .../doris/transaction/PublishVersionDaemon.java    | 159 +++++++++++++-----
 6 files changed, 308 insertions(+), 72 deletions(-)

diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 0eb9d46a729..71ab35692a6 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -251,6 +251,10 @@ public class Config extends ConfigBase {
                     + "Indicates the writting count before a rest"})
     public static long batch_edit_log_continuous_count_for_rest = 1000;
 
+    @ConfField(description = {
+            "攒批写 EditLog。", "Batch EditLog writing"})
+    public static boolean enable_batch_editlog = false;
+
     @ConfField(description = {"元数据同步的容忍延迟时间,单位为秒。如果元数据的延迟超过这个值,非主 FE 会停止提供服务",
             "The toleration delay time of meta data synchronization, in 
seconds. "
                     + "If the delay of meta data exceeds this value, 
non-master FE will stop offering service"})
@@ -546,6 +550,18 @@ public class Config extends ConfigBase {
             "the upper limit of failure logs of PUBLISH_VERSION task"})
     public static long publish_version_task_failed_log_threshold = 80;
 
+    @ConfField(masterOnly = true, description = {"Publish 线程池的数目",
+            "Num of thread to handle publish task"})
+    public static int publish_thread_pool_num = 128;
+
+    @ConfField(masterOnly = true, description = {"Publish 线程池的队列大小",
+            "Queue size to store publish task in publish thread pool"})
+    public static int publish_queue_size = 128;
+
+    @ConfField(mutable = true, description = {"是否启用并行发布版本",
+            "Whether to enable parallel publish version"})
+    public static boolean enable_parallel_publish_version = false;
+
     @ConfField(mutable = true, masterOnly = true, description = {"提交事务的最大超时时间,单位是秒。"
             + "该参数仅用于事务型 insert 操作中。",
             "Maximal waiting time for all data inserted before one transaction to be committed, in seconds. "
diff --git a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalBatch.java b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalBatch.java
index e56bf34dfe5..1236a08d0db 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalBatch.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalBatch.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.journal;
 
+import org.apache.doris.common.Config;
 import org.apache.doris.common.io.DataOutputBuffer;
 import org.apache.doris.common.io.Writable;
 import org.apache.doris.persist.OperationType;
@@ -70,6 +71,10 @@ public class JournalBatch {
         return entities;
     }
 
+    public boolean shouldFlush() {
+        return size >= Config.batch_edit_log_max_byte_size || entities.size() >= Config.batch_edit_log_max_item_num;
+    }
+
     public static class Entity {
         short op;
         DataOutputBuffer data;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java b/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java
index f03fac85ddb..281826505e4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java
@@ -146,10 +146,9 @@ public class BDBJEJournal implements Journal { // CHECKSTYLE IGNORE THIS LINE: B
                     DatabaseEntry theData = new DatabaseEntry(entity.getBinaryData());
                     currentJournalDB.put(txn, theKey, theData);  // Put with overwrite, it always success
                     dataSize += theData.getSize();
-                    if (i == 0) {
-                        if (LOG.isDebugEnabled()) {
-                            LOG.debug("opCode = {}, journal size = {}", entity.getOpCode(), theData.getSize());
-                        }
+                    if (i == 0 && LOG.isDebugEnabled()) {
+                        LOG.debug("opCode = {}, journal size = {}, batchNum = {}", entity.getOpCode(),
+                                theData.getSize(), entitySize);
                     }
                 }
 
@@ -212,6 +211,9 @@ public class BDBJEJournal implements Journal { // CHECKSTYLE IGNORE THIS LINE: B
                     LOG.warn("write bdb is too slow, cost {}ms, the first journal id, batch size {}, data size{}",
                             watch.getTime(), firstId, entitySize, dataSize);
                 }
+                if (MetricRepo.isInit) {
+                    MetricRepo.HISTO_JOURNAL_WRITE_LATENCY.update(watch.getTime());
+                }
             }
         }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java b/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java
index ce69d6f993c..ee39f8e90f4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java
@@ -115,6 +115,7 @@ public final class MetricRepo {
     public static LongCounterMetric COUNTER_LARGE_EDIT_LOG;
 
     public static Histogram HISTO_EDIT_LOG_WRITE_LATENCY;
+    public static Histogram HISTO_JOURNAL_WRITE_LATENCY;
     public static Histogram HISTO_JOURNAL_BATCH_SIZE;
     public static Histogram HISTO_JOURNAL_BATCH_DATA_SIZE;
     public static Histogram HISTO_HTTP_COPY_INTO_UPLOAD_LATENCY;
@@ -455,6 +456,8 @@ public final class MetricRepo {
 
         HISTO_EDIT_LOG_WRITE_LATENCY = METRIC_REGISTER.histogram(
                 MetricRegistry.name("editlog", "write", "latency", "ms"));
+        HISTO_JOURNAL_WRITE_LATENCY = METRIC_REGISTER.histogram(
+                MetricRegistry.name("journal", "write", "latency", "ms"));
         HISTO_JOURNAL_BATCH_SIZE = METRIC_REGISTER.histogram(
                 MetricRegistry.name("journal", "write", "batch_size"));
         HISTO_JOURNAL_BATCH_DATA_SIZE = METRIC_REGISTER.histogram(
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
index e91b13e32aa..cba5cfd0296 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
@@ -109,8 +109,13 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * EditLog maintains a log of the memory modifications.
@@ -119,13 +124,30 @@ import java.util.Map;
 public class EditLog {
     public static final Logger LOG = LogManager.getLogger(EditLog.class);
 
+    // Helper class to hold log edit requests
+    private static class EditLogItem {
+        final short op;
+        final Writable writable;
+        final Object lock = new Object();
+        boolean finished = false;
+        long logId = -1;
+
+        EditLogItem(short op, Writable writable) {
+            this.op = op;
+            this.writable = writable;
+        }
+    }
+
+    private final BlockingQueue<EditLogItem> logEditQueue = new LinkedBlockingQueue<>();
+    private final Thread flushThread;
+
     private EditLogOutputStream editStream = null;
 
     private long txId = 0;
 
-    private long numTransactions;
-    private long totalTimeTransactions;
 
+    private AtomicLong numTransactions = new AtomicLong(0);
+    private AtomicLong totalTimeTransactions = new AtomicLong(0);
     private Journal journal;
 
     /**
@@ -140,6 +162,84 @@ public class EditLog {
         } else {
             throw new IllegalArgumentException("Unknown edit log type: " + journalType);
         }
+
+        // Flush thread initialization block
+        flushThread = new Thread(() -> {
+            while (true) {
+                flushEditLog();
+            }
+        }, "EditLog-Flusher");
+        flushThread.setDaemon(true);
+        flushThread.start();
+    }
+
+    private void flushEditLog() {
+        List<EditLogItem> batch = new ArrayList<>();
+        try {
+            batch.clear();
+            EditLogItem first = logEditQueue.poll(100, TimeUnit.MILLISECONDS);
+            if (first == null) {
+                return;
+            }
+            batch.add(first);
+            logEditQueue.drainTo(batch, Config.batch_edit_log_max_item_num - 1);
+
+            int itemNum = Math.max(1, Math.min(Config.batch_edit_log_max_item_num, batch.size()));
+            JournalBatch journalBatch = new JournalBatch(itemNum);
+
+            // Array to record pairs of logId and num
+            List<long[]> logIdNumPairs = new ArrayList<>();
+            for (EditLogItem req : batch) {
+                journalBatch.addJournal(req.op, req.writable);
+                if (journalBatch.shouldFlush()) {
+                    long logId = journal.write(journalBatch);
+                    logIdNumPairs.add(new long[]{logId, journalBatch.getJournalEntities().size()});
+                    journalBatch = new JournalBatch(itemNum);
+                }
+            }
+            // Write any remaining entries in the batch
+            if (!journalBatch.getJournalEntities().isEmpty()) {
+                long logId = journal.write(journalBatch);
+                logIdNumPairs.add(new long[]{logId, journalBatch.getJournalEntities().size()});
+            }
+
+            // Notify all producers
+            // For batch with index, assign logId to each request according to the batch flushes
+            int reqIndex = 0;
+            for (long[] pair : logIdNumPairs) {
+                long logId = pair[0];
+                int num = (int) pair[1];
+                for (int i = 0; i < num && reqIndex < batch.size(); i++, reqIndex++) {
+                    EditLogItem req = batch.get(reqIndex);
+                    req.logId = logId + i;
+                    synchronized (req.lock) {
+                        req.finished = true;
+                        req.lock.notifyAll();
+                    }
+                }
+            }
+
+        } catch (Throwable t) {
+            // Throwable contains all Exception and Error, such as IOException and
+            // OutOfMemoryError
+            if (journal instanceof BDBJEJournal) {
+                LOG.error("BDBJE stats : {}", ((BDBJEJournal) 
journal).getBDBStats());
+            }
+            LOG.error("Fatal Error : write stream Exception", t);
+            System.exit(-1);
+        }
+
+        txId += batch.size();
+        // update statistics, etc. (optional, can be added as needed)
+        if (txId >= Config.edit_log_roll_num) {
+            LOG.info("txId {} is equal to or larger than edit_log_roll_num {}, 
will roll edit.", txId,
+                    Config.edit_log_roll_num);
+            rollEditLog();
+            txId = 0;
+        }
+        if (MetricRepo.isInit) {
+            MetricRepo.COUNTER_EDIT_LOG_WRITE.increase(Long.valueOf(batch.size()));
+        }
     }
 
     public long getMaxJournalId() {
@@ -1354,8 +1454,7 @@ public class EditLog {
         JournalBatch batch = new JournalBatch(itemNum);
         long batchCount = 0;
         for (T entry : entries) {
-            if (batch.getJournalEntities().size() >= Config.batch_edit_log_max_item_num
-                    || batch.getSize() >= Config.batch_edit_log_max_byte_size) {
+            if (batch.shouldFlush()) {
                 journal.write(batch);
                 batch = new JournalBatch(itemNum);
 
@@ -1376,18 +1475,43 @@ public class EditLog {
         if (!batch.getJournalEntities().isEmpty()) {
             journal.write(batch);
         }
+        txId += entries.size();
     }
 
     /**
-     * Write an operation to the edit log. Do not sync to persistent store yet.
+     * Asynchronously log an edit by putting it into a blocking queue and waiting for completion.
+     * This method blocks until the log is written and returns the logId.
      */
-    private synchronized long logEdit(short op, Writable writable) {
-        if (this.getNumEditStreams() == 0) {
-            LOG.error("Fatal Error : no editLog stream", new Exception());
-            throw new Error("Fatal Error : no editLog stream");
+    public long logEditWithQueue(short op, Writable writable) {
+        EditLogItem req = new EditLogItem(op, writable);
+        while (true) {
+            try {
+                logEditQueue.put(req);
+                break;
+            } catch (InterruptedException e) {
+                LOG.warn("Interrupted during put, will sleep and retry.");
+                try {
+                    Thread.sleep(100);
+                } catch (InterruptedException ex) {
+                    LOG.warn(" interrupted during sleep, will retry.", ex);
+                }
+            }
+        }
+        synchronized (req.lock) {
+            while (!req.finished) {
+                try {
+                    req.lock.wait();
+                } catch (InterruptedException e) {
+                    LOG.error("Fatal Error : write stream Exception");
+                    System.exit(-1);
+                }
+            }
         }
 
-        long start = System.currentTimeMillis();
+        return req.logId;
+    }
+
+    private synchronized long logEditDirectly(short op, Writable writable) {
         long logId = -1;
         try {
             logId = journal.write(op, writable);
@@ -1404,13 +1528,41 @@ public class EditLog {
         // get a new transactionId
         txId++;
 
+        if (txId >= Config.edit_log_roll_num) {
+            LOG.info("txId {} is equal to or larger than edit_log_roll_num {}, 
will roll edit.", txId,
+                    Config.edit_log_roll_num);
+            rollEditLog();
+            txId = 0;
+        }
+
+        return logId;
+    }
+
+    /**
+     * Write an operation to the edit log. Do not sync to persistent store yet.
+     */
+    private long logEdit(short op, Writable writable) {
+        if (this.getNumEditStreams() == 0) {
+            LOG.error("Fatal Error : no editLog stream", new Exception());
+            throw new Error("Fatal Error : no editLog stream");
+        }
+
+        long start = System.currentTimeMillis();
+        long logId = -1;
+        if (Config.enable_batch_editlog && op != OperationType.OP_TIMESTAMP) {
+            logId = logEditWithQueue(op, writable);
+        } else {
+            logId = logEditDirectly(op, writable);
+        }
+
         // update statistics
         long end = System.currentTimeMillis();
-        numTransactions++;
-        totalTimeTransactions += (end - start);
+        numTransactions.incrementAndGet();
+        totalTimeTransactions.addAndGet(end - start);
         if (MetricRepo.isInit) {
             MetricRepo.HISTO_EDIT_LOG_WRITE_LATENCY.update((end - start));
             MetricRepo.COUNTER_EDIT_LOG_CURRENT.increase(1L);
+            MetricRepo.COUNTER_EDIT_LOG_WRITE.increase(1L);
         }
 
         if (LOG.isDebugEnabled()) {
@@ -1418,17 +1570,6 @@ public class EditLog {
                     txId, numTransactions, totalTimeTransactions, op, end - start);
         }
 
-        if (txId >= Config.edit_log_roll_num) {
-            LOG.info("txId {} is equal to or larger than edit_log_roll_num {}, 
will roll edit.", txId,
-                    Config.edit_log_roll_num);
-            rollEditLog();
-            txId = 0;
-        }
-
-        if (MetricRepo.isInit) {
-            MetricRepo.COUNTER_EDIT_LOG_WRITE.increase(1L);
-        }
-
         return logId;
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java
index 11219e7267f..46241169101 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java
@@ -24,6 +24,8 @@ import org.apache.doris.catalog.Partition;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.MetaNotFoundException;
 import org.apache.doris.common.Pair;
+import org.apache.doris.common.ThreadPoolManager;
+import org.apache.doris.common.lock.MonitoredReentrantReadWriteLock;
 import org.apache.doris.common.util.DebugPointUtil;
 import org.apache.doris.common.util.MasterDaemon;
 import org.apache.doris.metric.MetricRepo;
@@ -43,6 +45,7 @@ import org.apache.commons.collections.CollectionUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
@@ -50,6 +53,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 
@@ -57,24 +61,33 @@ public class PublishVersionDaemon extends MasterDaemon {
 
     private static final Logger LOG = LogManager.getLogger(PublishVersionDaemon.class);
 
+    private static ArrayList<ExecutorService> dbExecutors = new ArrayList(Config.publish_thread_pool_num);
+
+    private Set<Long> publishingTxnIds = Sets.newConcurrentHashSet();
+
+    private final MonitoredReentrantReadWriteLock visibleVersionsLock = new MonitoredReentrantReadWriteLock(true);
+    private Map<Long, Long> partitionVisibleVersions = Maps.newHashMap();
+    private Map<Long, Set<Long>> backendPartitions = Maps.newHashMap();
+
     public PublishVersionDaemon() {
         super("PUBLISH_VERSION", Config.publish_version_interval_ms);
+        for (int i = 0; i < Config.publish_thread_pool_num; i++) {
+            dbExecutors.add(ThreadPoolManager.newDaemonFixedThreadPool(1, Config.publish_queue_size,
+                    "PUBLISH_VERSION_EXEC-" + i, true));
+        }
     }
 
     @Override
     protected void runAfterCatalogReady() {
-        Map<Long, Long> partitionVisibleVersions = Maps.newHashMap();
-        Map<Long, Set<Long>> backendPartitions = Maps.newHashMap();
-
         try {
-            publishVersion(partitionVisibleVersions, backendPartitions);
-            sendBackendVisibleVersion(partitionVisibleVersions, backendPartitions);
+            publishVersion();
+            sendBackendVisibleVersion();
         } catch (Throwable t) {
             LOG.error("errors while publish version to all backends", t);
         }
     }
 
-    private void publishVersion(Map<Long, Long> partitionVisibleVersions, Map<Long, Set<Long>> backendPartitions) {
+    private void publishVersion() {
         if (DebugPointUtil.isEnable("PublishVersionDaemon.stop_publish")) {
             return;
         }
@@ -93,8 +106,7 @@ public class PublishVersionDaemon extends MasterDaemon {
             return;
         }
         traverseReadyTxnAndDispatchPublishVersionTask(readyTransactionStates, allBackends);
-        tryFinishTxn(readyTransactionStates, infoService, globalTransactionMgr,
-                partitionVisibleVersions, backendPartitions);
+        tryFinishTxn(readyTransactionStates, infoService, globalTransactionMgr);
     }
 
     private void traverseReadyTxnAndDispatchPublishVersionTask(List<TransactionState> readyTransactionStates,
@@ -159,13 +171,11 @@ public class PublishVersionDaemon extends MasterDaemon {
     }
 
     private void tryFinishTxn(List<TransactionState> readyTransactionStates,
-                                     SystemInfoService infoService, GlobalTransactionMgrIface globalTransactionMgr,
-                                     Map<Long, Long> partitionVisibleVersions, Map<Long, Set<Long>> backendPartitions) {
+            SystemInfoService infoService, GlobalTransactionMgrIface globalTransactionMgr) {
         for (TransactionState transactionState : readyTransactionStates) {
             try {
                 // try to finish the transaction, if failed just retry in next loop
-                tryFinishOneTxn(transactionState, infoService, globalTransactionMgr, partitionVisibleVersions,
-                        backendPartitions);
+                tryFinishOneTxn(transactionState, infoService, globalTransactionMgr);
             } catch (Throwable t) {
                 LOG.error("errors while finish transaction: {}, publish tasks: 
{}", transactionState,
                         transactionState.getPublishVersionTasks(), t);
@@ -174,8 +184,7 @@ public class PublishVersionDaemon extends MasterDaemon {
     }
 
     private void tryFinishOneTxn(TransactionState transactionState, SystemInfoService infoService,
-            GlobalTransactionMgrIface globalTransactionMgr,
-            Map<Long, Long> partitionVisibleVersions, Map<Long, Set<Long>> backendPartitions) {
+            GlobalTransactionMgrIface globalTransactionMgr) {
         Map<Long, Map<Long, Long>> tableIdToTabletDeltaRows = Maps.newHashMap();
         AtomicBoolean hasBackendAliveAndUnfinishedTask = new AtomicBoolean(false);
         Set<Long> notFinishTaskBe = Sets.newHashSet();
@@ -219,37 +228,90 @@ public class PublishVersionDaemon extends MasterDaemon {
                 || isPublishSlow
                 || DebugPointUtil.isEnable("PublishVersionDaemon.not_wait_unfinished_tasks");
         if (shouldFinishTxn) {
-            try {
-                // one transaction exception should not affect other transaction
-                globalTransactionMgr.finishTransaction(transactionState.getDbId(),
-                        transactionState.getTransactionId(), partitionVisibleVersions, backendPartitions);
-            } catch (Exception e) {
-                LOG.warn("error happens when finish transaction {}", transactionState.getTransactionId(), e);
+            if (Config.enable_parallel_publish_version) {
+                tryFinishTxnAsync(transactionState, globalTransactionMgr);
+            } else {
+                tryFinishTxnSync(transactionState, globalTransactionMgr);
             }
-            if (transactionState.getTransactionStatus() != TransactionStatus.VISIBLE) {
-                // if finish transaction state failed, then update publish version time, should check
-                // to finish after some interval
-                transactionState.updateSendTaskTime();
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("publish version for transaction {} failed", transactionState);
-                }
+        }
+    }
+
+    private void tryFinishTxnAsync(TransactionState transactionState, GlobalTransactionMgrIface globalTransactionMgr) {
+        if (publishingTxnIds.contains(transactionState.getTransactionId())) {
+            return;
+        }
+
+        publishingTxnIds.add(transactionState.getTransactionId());
+        LOG.info("try to finish transaction {}, dbId: {}, txnId: {}",
+                transactionState.getTransactionId(), transactionState.getDbId(), transactionState.getTransactionId());
+        try {
+            dbExecutors.get((int) (transactionState.getDbId() % Config.publish_thread_pool_num)).execute(() -> {
+                tryFinishTxnSync(transactionState, globalTransactionMgr);
+                publishingTxnIds.remove(transactionState.getTransactionId());
+            });
+        } catch (Throwable e) {
+            LOG.warn("failed to finish transaction {}, dbId: {}, txnId: {}, 
exception: {}",
+                    transactionState.getTransactionId(), 
transactionState.getDbId(),
+                    transactionState.getTransactionId(), e);
+            publishingTxnIds.remove(transactionState.getTransactionId());
+        }
+    }
+
+    private void tryFinishTxnSync(TransactionState transactionState, GlobalTransactionMgrIface globalTransactionMgr) {
+        try {
+            partitionVisibleVersions = Maps.newHashMap();
+            backendPartitions = Maps.newHashMap();
+            // one transaction exception should not affect other transaction
+            globalTransactionMgr.finishTransaction(transactionState.getDbId(),
+                    transactionState.getTransactionId(), partitionVisibleVersions, backendPartitions);
+            addBackendVisibleVersions(partitionVisibleVersions, backendPartitions);
+        } catch (Exception e) {
+            LOG.warn("error happens when finish transaction {}", transactionState.getTransactionId(), e);
+        }
+        if (transactionState.getTransactionStatus() != TransactionStatus.VISIBLE) {
+            // if finish transaction state failed, then update publish version time, should check
+            // to finish after some interval
+            transactionState.updateSendTaskTime();
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("publish version for transaction {} failed", 
transactionState);
             }
         }
 
         if (transactionState.getTransactionStatus() == TransactionStatus.VISIBLE) {
             transactionState.getPublishVersionTasks().values().forEach(tasks -> {
                 for (PublishVersionTask task : tasks) {
-                    AgentTaskQueue.removeTask(task.getBackendId(), TTaskType.PUBLISH_VERSION, task.getSignature());
+                    AgentTaskQueue.removeTask(task.getBackendId(), TTaskType.PUBLISH_VERSION,
+                            task.getSignature());
                 }
             });
             transactionState.pruneAfterVisible();
             if (MetricRepo.isInit) {
-                long publishTime = transactionState.getLastPublishVersionTime() - transactionState.getCommitTime();
+                long publishTime = transactionState.getLastPublishVersionTime()
+                        - transactionState.getCommitTime();
                 MetricRepo.HISTO_TXN_PUBLISH_LATENCY.update(publishTime);
             }
         }
     }
 
+    private void addBackendVisibleVersions(Map<Long, Long> partitionVisibleVersions,
+            Map<Long, Set<Long>> backendPartitions) {
+        visibleVersionsLock.writeLock().lock();
+        try {
+            this.partitionVisibleVersions.putAll(partitionVisibleVersions);
+            // merge backend partitions if exists merge value as set else add a new set
+            for (Entry<Long, Set<Long>> entry : backendPartitions.entrySet()) {
+                this.backendPartitions.computeIfPresent(entry.getKey(),
+                        (backendId, existingPartitions) -> {
+                            existingPartitions.addAll(entry.getValue());
+                            return existingPartitions;
+                        });
+                this.backendPartitions.putIfAbsent(entry.getKey(), entry.getValue());
+            }
+        } finally {
+            visibleVersionsLock.writeLock().unlock();
+        }
+    }
+
     // Merge task tablets update rows to tableToTabletsDelta.
     private void calculateTaskUpdateRows(Map<Long, Map<Long, Long>> tableIdToTabletDeltaRows, PublishVersionTask task) {
         if (CollectionUtils.isEmpty(task.getErrorTablets())) {
@@ -296,23 +358,30 @@ public class PublishVersionDaemon extends MasterDaemon {
                         Collectors.mapping(p -> p.second, Collectors.toSet())));
     }
 
-    private void sendBackendVisibleVersion(Map<Long, Long> partitionVisibleVersions,
-            Map<Long, Set<Long>> backendPartitions) {
-        if (partitionVisibleVersions.isEmpty() || backendPartitions.isEmpty()) {
-            return;
-        }
+    private void sendBackendVisibleVersion() {
+        visibleVersionsLock.writeLock().lock();
+        try {
+            if (partitionVisibleVersions.isEmpty() || backendPartitions.isEmpty()) {
+                return;
+            }
 
-        long createTime = System.currentTimeMillis();
-        AgentBatchTask batchTask = new AgentBatchTask();
-        backendPartitions.forEach((backendId, partitionIds) -> {
-            Map<Long, Long> backendPartitionVersions = partitionVisibleVersions.entrySet().stream()
-                    .filter(entry -> partitionIds.contains(entry.getKey()))
-                    .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
-            UpdateVisibleVersionTask task = new UpdateVisibleVersionTask(backendId, backendPartitionVersions,
-                    createTime);
-            batchTask.addTask(task);
-        });
-        AgentTaskExecutor.submit(batchTask);
+            long createTime = System.currentTimeMillis();
+            AgentBatchTask batchTask = new AgentBatchTask();
+            backendPartitions.forEach((backendId, partitionIds) -> {
+                Map<Long, Long> backendPartitionVersions = partitionVisibleVersions.entrySet().stream()
+                        .filter(entry -> partitionIds.contains(entry.getKey()))
+                        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+                UpdateVisibleVersionTask task = new UpdateVisibleVersionTask(backendId, backendPartitionVersions,
+                        createTime);
+                batchTask.addTask(task);
+            });
+            AgentTaskExecutor.submit(batchTask);
+
+            this.partitionVisibleVersions.clear();
+            this.backendPartitions.clear();
+        } finally {
+            visibleVersionsLock.writeLock().unlock();
+        }
     }
 
     private List<TPartitionVersionInfo> generatePartitionVersionInfos(Collection<TableCommitInfo> tableCommitInfos,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

