This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 39af044ac3f [opt](editlog) batch flush editlog (#52971)
39af044ac3f is described below
commit 39af044ac3ff6f47d3d578ddfb2c3563dced28e2
Author: Yongqiang YANG <[email protected]>
AuthorDate: Tue Jul 15 16:23:58 2025 +0800
[opt](editlog) batch flush editlog (#52971)
### What problem does this PR solve?
1. Group edit logs produced by multiple threads into batches.
2. Multiple threads publish their edit logs to the group concurrently.
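The core pattern is group commit: caller threads enqueue their edit-log entries and block, while a single flusher thread drains the queue and persists many entries with one journal write before waking the callers. Below is a minimal, self-contained sketch of that pattern; the names (GroupCommitSketch, Item, append, flushOnce) are illustrative, not this PR's API.
```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class GroupCommitSketch {
    static final class Item {
        final byte[] payload;
        long logId = -1;
        boolean finished = false;
        Item(byte[] payload) { this.payload = payload; }
    }

    private final BlockingQueue<Item> queue = new LinkedBlockingQueue<>();
    private long nextLogId = 0; // only touched by the single flusher thread

    // Producer side: enqueue, then block until the flusher marks the item done.
    public long append(byte[] payload) throws InterruptedException {
        Item item = new Item(payload);
        queue.put(item);
        synchronized (item) {
            while (!item.finished) {
                item.wait();
            }
        }
        return item.logId;
    }

    // Flusher side: run in a loop by a dedicated daemon thread.
    public void flushOnce() throws InterruptedException {
        List<Item> batch = new ArrayList<>();
        Item first = queue.poll(100, TimeUnit.MILLISECONDS);
        if (first == null) {
            return; // nothing arrived within the poll window
        }
        batch.add(first);
        queue.drainTo(batch, 99); // cap the batch, like batch_edit_log_max_item_num
        // In the real code this is a single journal.write(journalBatch) call;
        // here consecutive ids stand in for the batched durable write.
        for (Item item : batch) {
            synchronized (item) {
                item.logId = nextLogId++;
                item.finished = true;
                item.notifyAll();
            }
        }
    }
}
```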
Tested with the script below.
```
for i in `seq 1 1000`; do
    echo "Starting Sysbench for db$i"
    nohup sysbench --db-driver=mysql --mysql-host=127.0.0.1 --mysql-port=9030 \
        --mysql-user=root --mysql-db=paralldb${i} --tables=1 --threads=1 \
        --time=180 --report-interval=10 oltp_insert run >result_${i} 2>&1 &
done
wait
```
Each insert run lasted 180s.
Without batch:
result_99: total number of events: 14
result_99: events (avg/stddev): 14.0000/0.00
result_990: total number of events: 14
result_990: events (avg/stddev): 14.0000/0.00
result_991: total number of events: 14
result_991: events (avg/stddev): 14.0000/0.00
result_992: total number of events: 14
result_992: events (avg/stddev): 14.0000/0.00
result_993: total number of events: 14
result_993: events (avg/stddev): 14.0000/0.00
result_994: total number of events: 14
result_994: events (avg/stddev): 14.0000/0.00
result_995: total number of events: 14
result_995: events (avg/stddev): 14.0000/0.00
result_996: total number of events: 14
result_996: events (avg/stddev): 14.0000/0.00
result_997: total number of events: 14
result_997: events (avg/stddev): 14.0000/0.00
result_998: total number of events: 14
result_998: events (avg/stddev): 14.0000/0.00
result_999: total number of events: 14
result_999: events (avg/stddev): 14.0000/0.00
With batch enabled:
result_99: total number of events: 263
result_99: events (avg/stddev): 263.0000/0.00
result_990: total number of events: 276
result_990: events (avg/stddev): 276.0000/0.00
result_991: total number of events: 275
result_991: events (avg/stddev): 275.0000/0.00
result_992: total number of events: 279
result_992: events (avg/stddev): 279.0000/0.00
result_993: total number of events: 274
result_993: events (avg/stddev): 274.0000/0.00
result_994: total number of events: 269
result_994: events (avg/stddev): 269.0000/0.00
result_995: total number of events: 272
result_995: events (avg/stddev): 272.0000/0.00
result_996: total number of events: 272
result_996: events (avg/stddev): 272.0000/0.00
result_997: total number of events: 271
result_997: events (avg/stddev): 271.0000/0.00
result_998: total number of events: 255
result_998: events (avg/stddev): 255.0000/0.00
result_999: total number of events: 258
result_999: events (avg/stddev): 258.0000/0.00
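Per database this is roughly 255-279 inserts in 180s instead of 14, about a 19x throughput improvement.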
doris_fe_journal_write_batch_data_size{quantile="0.75"} 39172.0
doris_fe_journal_write_batch_data_size{quantile="0.95"} 67803.0
doris_fe_journal_write_batch_data_size{quantile="0.98"} 72049.0
doris_fe_journal_write_batch_data_size{quantile="0.99"} 72707.0
doris_fe_journal_write_batch_data_size{quantile="0.999"} 79449.0
doris_fe_journal_write_batch_data_size_sum {} 1.3869330432422793E9
doris_fe_journal_write_batch_data_size_count {} 53715
doris_fe_journal_write_batch_size{quantile="0.75"} 73.0
doris_fe_journal_write_batch_size{quantile="0.95"} 100.0
doris_fe_journal_write_batch_size{quantile="0.98"} 100.0
doris_fe_journal_write_batch_size{quantile="0.99"} 100.0
doris_fe_journal_write_batch_size{quantile="0.999"} 100.0
doris_fe_journal_write_batch_size_sum {} 2379961.017209651
doris_fe_journal_write_batch_size_count {} 53715
doris_fe_journal_write_latency_ms{quantile="0.75"} 6.0
doris_fe_journal_write_latency_ms{quantile="0.95"} 10.0
doris_fe_journal_write_latency_ms{quantile="0.98"} 21.0
doris_fe_journal_write_latency_ms{quantile="0.99"} 54.0
doris_fe_journal_write_latency_ms{quantile="0.999"} 175.0
doris_fe_journal_write_latency_ms_sum {} 329275.8826554282
doris_fe_journal_write_latency_ms_count {} 53715
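From these counters, an average journal write carried about 2379961 / 53715 ≈ 44 entries and 1.387e9 bytes / 53715 ≈ 25.8 KB of data.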
---------
Co-authored-by: Yongqiang YANG <[email protected]>
---
.../main/java/org/apache/doris/common/Config.java | 16 ++
.../org/apache/doris/journal/JournalBatch.java | 5 +
.../apache/doris/journal/bdbje/BDBJEJournal.java | 10 +-
.../java/org/apache/doris/metric/MetricRepo.java | 3 +
.../java/org/apache/doris/persist/EditLog.java | 187 ++++++++++++++++++---
.../doris/transaction/PublishVersionDaemon.java | 159 +++++++++++++-----
6 files changed, 308 insertions(+), 72 deletions(-)
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 0eb9d46a729..71ab35692a6 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -251,6 +251,10 @@ public class Config extends ConfigBase {
+ "Indicates the writting count before a rest"})
public static long batch_edit_log_continuous_count_for_rest = 1000;
+ @ConfField(description = {
+ "攒批写 EditLog。", "Batch EditLog writing"})
+ public static boolean enable_batch_editlog = false;
+
@ConfField(description = {"元数据同步的容忍延迟时间,单位为秒。如果元数据的延迟超过这个值,非主 FE 会停止提供服务",
"The toleration delay time of meta data synchronization, in
seconds. "
+ "If the delay of meta data exceeds this value,
non-master FE will stop offering service"})
@@ -546,6 +550,18 @@ public class Config extends ConfigBase {
"the upper limit of failure logs of PUBLISH_VERSION task"})
public static long publish_version_task_failed_log_threshold = 80;
+ @ConfField(masterOnly = true, description = {"Publish 线程池的数目",
+ "Num of thread to handle publish task"})
+ public static int publish_thread_pool_num = 128;
+
+ @ConfField(masterOnly = true, description = {"Publish 线程池的队列大小",
+ "Queue size to store publish task in publish thread pool"})
+ public static int publish_queue_size = 128;
+
+ @ConfField(mutable = true, description = {"是否启用并行发布版本",
+ "Whether to enable parallel publish version"})
+ public static boolean enable_parallel_publish_version = false;
+
@ConfField(mutable = true, masterOnly = true, description = {"提交事务的最大超时时间,单位是秒。"
+ "该参数仅用于事务型 insert 操作中。",
"Maximal waiting time for all data inserted before one transaction to be committed, in seconds. "
diff --git a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalBatch.java b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalBatch.java
index e56bf34dfe5..1236a08d0db 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalBatch.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalBatch.java
@@ -17,6 +17,7 @@
package org.apache.doris.journal;
+import org.apache.doris.common.Config;
import org.apache.doris.common.io.DataOutputBuffer;
import org.apache.doris.common.io.Writable;
import org.apache.doris.persist.OperationType;
@@ -70,6 +71,10 @@ public class JournalBatch {
return entities;
}
+ public boolean shouldFlush() {
+ return size >= Config.batch_edit_log_max_byte_size || entities.size() >= Config.batch_edit_log_max_item_num;
+ }
+
public static class Entity {
short op;
DataOutputBuffer data;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java b/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java
index f03fac85ddb..281826505e4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java
@@ -146,10 +146,9 @@ public class BDBJEJournal implements Journal { // CHECKSTYLE IGNORE THIS LINE: B
DatabaseEntry theData = new DatabaseEntry(entity.getBinaryData());
currentJournalDB.put(txn, theKey, theData); // Put with overwrite, it always success
dataSize += theData.getSize();
- if (i == 0) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("opCode = {}, journal size = {}",
entity.getOpCode(), theData.getSize());
- }
+ if (i == 0 && LOG.isDebugEnabled()) {
+ LOG.debug("opCode = {}, journal size = {}, batchNum =
{}", entity.getOpCode(),
+ theData.getSize(), entitySize);
}
}
@@ -212,6 +211,9 @@ public class BDBJEJournal implements Journal { // CHECKSTYLE IGNORE THIS LINE: B
LOG.warn("write bdb is too slow, cost {}ms, the first
journal id, batch size {}, data size{}",
watch.getTime(), firstId, entitySize, dataSize);
}
+ if (MetricRepo.isInit) {
+ MetricRepo.HISTO_JOURNAL_WRITE_LATENCY.update(watch.getTime());
+ }
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java b/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java
index ce69d6f993c..ee39f8e90f4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java
@@ -115,6 +115,7 @@ public final class MetricRepo {
public static LongCounterMetric COUNTER_LARGE_EDIT_LOG;
public static Histogram HISTO_EDIT_LOG_WRITE_LATENCY;
+ public static Histogram HISTO_JOURNAL_WRITE_LATENCY;
public static Histogram HISTO_JOURNAL_BATCH_SIZE;
public static Histogram HISTO_JOURNAL_BATCH_DATA_SIZE;
public static Histogram HISTO_HTTP_COPY_INTO_UPLOAD_LATENCY;
@@ -455,6 +456,8 @@ public final class MetricRepo {
HISTO_EDIT_LOG_WRITE_LATENCY = METRIC_REGISTER.histogram(
MetricRegistry.name("editlog", "write", "latency", "ms"));
+ HISTO_JOURNAL_WRITE_LATENCY = METRIC_REGISTER.histogram(
+ MetricRegistry.name("journal", "write", "latency", "ms"));
HISTO_JOURNAL_BATCH_SIZE = METRIC_REGISTER.histogram(
MetricRegistry.name("journal", "write", "batch_size"));
HISTO_JOURNAL_BATCH_DATA_SIZE = METRIC_REGISTER.histogram(
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
index e91b13e32aa..cba5cfd0296 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
@@ -109,8 +109,13 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
/**
* EditLog maintains a log of the memory modifications.
@@ -119,13 +124,30 @@ import java.util.Map;
public class EditLog {
public static final Logger LOG = LogManager.getLogger(EditLog.class);
+ // Helper class to hold log edit requests
+ private static class EditLogItem {
+ final short op;
+ final Writable writable;
+ final Object lock = new Object();
+ boolean finished = false;
+ long logId = -1;
+
+ EditLogItem(short op, Writable writable) {
+ this.op = op;
+ this.writable = writable;
+ }
+ }
+
+ private final BlockingQueue<EditLogItem> logEditQueue = new LinkedBlockingQueue<>();
+ private final Thread flushThread;
+
private EditLogOutputStream editStream = null;
private long txId = 0;
- private long numTransactions;
- private long totalTimeTransactions;
+ private AtomicLong numTransactions = new AtomicLong(0);
+ private AtomicLong totalTimeTransactions = new AtomicLong(0);
private Journal journal;
/**
@@ -140,6 +162,84 @@ public class EditLog {
} else {
throw new IllegalArgumentException("Unknown edit log type: " + journalType);
}
+
+ // Flush thread initialization block
+ flushThread = new Thread(() -> {
+ while (true) {
+ flushEditLog();
+ }
+ }, "EditLog-Flusher");
+ flushThread.setDaemon(true);
+ flushThread.start();
+ }
+
+ private void flushEditLog() {
+ List<EditLogItem> batch = new ArrayList<>();
+ try {
+ batch.clear();
+ EditLogItem first = logEditQueue.poll(100, TimeUnit.MILLISECONDS);
+ if (first == null) {
+ return;
+ }
+ batch.add(first);
+ logEditQueue.drainTo(batch, Config.batch_edit_log_max_item_num - 1);
+
+ int itemNum = Math.max(1, Math.min(Config.batch_edit_log_max_item_num, batch.size()));
+ JournalBatch journalBatch = new JournalBatch(itemNum);
+
+ // Array to record pairs of logId and num
+ List<long[]> logIdNumPairs = new ArrayList<>();
+ for (EditLogItem req : batch) {
+ journalBatch.addJournal(req.op, req.writable);
+ if (journalBatch.shouldFlush()) {
+ long logId = journal.write(journalBatch);
+ logIdNumPairs.add(new long[]{logId, journalBatch.getJournalEntities().size()});
+ journalBatch = new JournalBatch(itemNum);
+ }
+ }
+ // Write any remaining entries in the batch
+ if (!journalBatch.getJournalEntities().isEmpty()) {
+ long logId = journal.write(journalBatch);
+ logIdNumPairs.add(new long[]{logId, journalBatch.getJournalEntities().size()});
+ }
+
+ // Notify all producers
+ // For batch with index, assign logId to each request according to the batch flushes
+ int reqIndex = 0;
+ for (long[] pair : logIdNumPairs) {
+ long logId = pair[0];
+ int num = (int) pair[1];
+ for (int i = 0; i < num && reqIndex < batch.size(); i++, reqIndex++) {
+ EditLogItem req = batch.get(reqIndex);
+ req.logId = logId + i;
+ synchronized (req.lock) {
+ req.finished = true;
+ req.lock.notifyAll();
+ }
+ }
+ }
+
+ } catch (Throwable t) {
+ // Throwable contains all Exception and Error, such as IOException and
+ // OutOfMemoryError
+ if (journal instanceof BDBJEJournal) {
+ LOG.error("BDBJE stats : {}", ((BDBJEJournal)
journal).getBDBStats());
+ }
+ LOG.error("Fatal Error : write stream Exception", t);
+ System.exit(-1);
+ }
+
+ txId += batch.size();
+ // update statistics, etc. (optional, can be added as needed)
+ if (txId >= Config.edit_log_roll_num) {
+ LOG.info("txId {} is equal to or larger than edit_log_roll_num {},
will roll edit.", txId,
+ Config.edit_log_roll_num);
+ rollEditLog();
+ txId = 0;
+ }
+ if (MetricRepo.isInit) {
+ MetricRepo.COUNTER_EDIT_LOG_WRITE.increase(Long.valueOf(batch.size()));
+ }
}
public long getMaxJournalId() {
@@ -1354,8 +1454,7 @@ public class EditLog {
JournalBatch batch = new JournalBatch(itemNum);
long batchCount = 0;
for (T entry : entries) {
- if (batch.getJournalEntities().size() >= Config.batch_edit_log_max_item_num
- || batch.getSize() >= Config.batch_edit_log_max_byte_size) {
+ if (batch.shouldFlush()) {
journal.write(batch);
batch = new JournalBatch(itemNum);
@@ -1376,18 +1475,43 @@ public class EditLog {
if (!batch.getJournalEntities().isEmpty()) {
journal.write(batch);
}
+ txId += entries.size();
}
/**
- * Write an operation to the edit log. Do not sync to persistent store yet.
+ * Asynchronously log an edit by putting it into a blocking queue and waiting for completion.
+ * This method blocks until the log is written and returns the logId.
*/
- private synchronized long logEdit(short op, Writable writable) {
- if (this.getNumEditStreams() == 0) {
- LOG.error("Fatal Error : no editLog stream", new Exception());
- throw new Error("Fatal Error : no editLog stream");
+ public long logEditWithQueue(short op, Writable writable) {
+ EditLogItem req = new EditLogItem(op, writable);
+ while (true) {
+ try {
+ logEditQueue.put(req);
+ break;
+ } catch (InterruptedException e) {
+ LOG.warn("Interrupted during put, will sleep and retry.");
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException ex) {
+ LOG.warn(" interrupted during sleep, will retry.", ex);
+ }
+ }
+ }
+ synchronized (req.lock) {
+ while (!req.finished) {
+ try {
+ req.lock.wait();
+ } catch (InterruptedException e) {
+ LOG.error("Fatal Error : write stream Exception");
+ System.exit(-1);
+ }
+ }
}
- long start = System.currentTimeMillis();
+ return req.logId;
+ }
+
+ private synchronized long logEditDirectly(short op, Writable writable) {
long logId = -1;
try {
logId = journal.write(op, writable);
@@ -1404,13 +1528,41 @@ public class EditLog {
// get a new transactionId
txId++;
+ if (txId >= Config.edit_log_roll_num) {
+ LOG.info("txId {} is equal to or larger than edit_log_roll_num {},
will roll edit.", txId,
+ Config.edit_log_roll_num);
+ rollEditLog();
+ txId = 0;
+ }
+
+ return logId;
+ }
+
+ /**
+ * Write an operation to the edit log. Do not sync to persistent store yet.
+ */
+ private long logEdit(short op, Writable writable) {
+ if (this.getNumEditStreams() == 0) {
+ LOG.error("Fatal Error : no editLog stream", new Exception());
+ throw new Error("Fatal Error : no editLog stream");
+ }
+
+ long start = System.currentTimeMillis();
+ long logId = -1;
+ if (Config.enable_batch_editlog && op != OperationType.OP_TIMESTAMP) {
+ logId = logEditWithQueue(op, writable);
+ } else {
+ logId = logEditDirectly(op, writable);
+ }
+
// update statistics
long end = System.currentTimeMillis();
- numTransactions++;
- totalTimeTransactions += (end - start);
+ numTransactions.incrementAndGet();
+ totalTimeTransactions.addAndGet(end - start);
if (MetricRepo.isInit) {
MetricRepo.HISTO_EDIT_LOG_WRITE_LATENCY.update((end - start));
MetricRepo.COUNTER_EDIT_LOG_CURRENT.increase(1L);
+ MetricRepo.COUNTER_EDIT_LOG_WRITE.increase(1L);
}
if (LOG.isDebugEnabled()) {
@@ -1418,17 +1570,6 @@ public class EditLog {
txId, numTransactions, totalTimeTransactions, op, end - start);
}
- if (txId >= Config.edit_log_roll_num) {
- LOG.info("txId {} is equal to or larger than edit_log_roll_num {},
will roll edit.", txId,
- Config.edit_log_roll_num);
- rollEditLog();
- txId = 0;
- }
-
- if (MetricRepo.isInit) {
- MetricRepo.COUNTER_EDIT_LOG_WRITE.increase(1L);
- }
-
return logId;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java
index 11219e7267f..46241169101 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java
@@ -24,6 +24,8 @@ import org.apache.doris.catalog.Partition;
import org.apache.doris.common.Config;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.Pair;
+import org.apache.doris.common.ThreadPoolManager;
+import org.apache.doris.common.lock.MonitoredReentrantReadWriteLock;
import org.apache.doris.common.util.DebugPointUtil;
import org.apache.doris.common.util.MasterDaemon;
import org.apache.doris.metric.MetricRepo;
@@ -43,6 +45,7 @@ import org.apache.commons.collections.CollectionUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
@@ -50,6 +53,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
@@ -57,24 +61,33 @@ public class PublishVersionDaemon extends MasterDaemon {
private static final Logger LOG = LogManager.getLogger(PublishVersionDaemon.class);
+ private static ArrayList<ExecutorService> dbExecutors = new ArrayList(Config.publish_thread_pool_num);
+
+ private Set<Long> publishingTxnIds = Sets.newConcurrentHashSet();
+
+ private final MonitoredReentrantReadWriteLock visibleVersionsLock = new MonitoredReentrantReadWriteLock(true);
+ private Map<Long, Long> partitionVisibleVersions = Maps.newHashMap();
+ private Map<Long, Set<Long>> backendPartitions = Maps.newHashMap();
+
public PublishVersionDaemon() {
super("PUBLISH_VERSION", Config.publish_version_interval_ms);
+ for (int i = 0; i < Config.publish_thread_pool_num; i++) {
+ dbExecutors.add(ThreadPoolManager.newDaemonFixedThreadPool(1, Config.publish_queue_size,
+ "PUBLISH_VERSION_EXEC-" + i, true));
+ }
}
@Override
protected void runAfterCatalogReady() {
- Map<Long, Long> partitionVisibleVersions = Maps.newHashMap();
- Map<Long, Set<Long>> backendPartitions = Maps.newHashMap();
-
try {
- publishVersion(partitionVisibleVersions, backendPartitions);
- sendBackendVisibleVersion(partitionVisibleVersions, backendPartitions);
+ publishVersion();
+ sendBackendVisibleVersion();
} catch (Throwable t) {
LOG.error("errors while publish version to all backends", t);
}
}
- private void publishVersion(Map<Long, Long> partitionVisibleVersions, Map<Long, Set<Long>> backendPartitions) {
+ private void publishVersion() {
if (DebugPointUtil.isEnable("PublishVersionDaemon.stop_publish")) {
return;
}
@@ -93,8 +106,7 @@ public class PublishVersionDaemon extends MasterDaemon {
return;
}
traverseReadyTxnAndDispatchPublishVersionTask(readyTransactionStates, allBackends);
- tryFinishTxn(readyTransactionStates, infoService, globalTransactionMgr,
- partitionVisibleVersions, backendPartitions);
+ tryFinishTxn(readyTransactionStates, infoService, globalTransactionMgr);
}
private void traverseReadyTxnAndDispatchPublishVersionTask(List<TransactionState> readyTransactionStates,
@@ -159,13 +171,11 @@ public class PublishVersionDaemon extends MasterDaemon {
}
private void tryFinishTxn(List<TransactionState> readyTransactionStates,
- SystemInfoService infoService, GlobalTransactionMgrIface globalTransactionMgr,
- Map<Long, Long> partitionVisibleVersions, Map<Long, Set<Long>> backendPartitions) {
+ SystemInfoService infoService, GlobalTransactionMgrIface globalTransactionMgr) {
for (TransactionState transactionState : readyTransactionStates) {
try {
// try to finish the transaction, if failed just retry in next loop
- tryFinishOneTxn(transactionState, infoService, globalTransactionMgr, partitionVisibleVersions,
- backendPartitions);
+ tryFinishOneTxn(transactionState, infoService, globalTransactionMgr);
} catch (Throwable t) {
LOG.error("errors while finish transaction: {}, publish tasks:
{}", transactionState,
transactionState.getPublishVersionTasks(), t);
@@ -174,8 +184,7 @@ public class PublishVersionDaemon extends MasterDaemon {
}
private void tryFinishOneTxn(TransactionState transactionState, SystemInfoService infoService,
- GlobalTransactionMgrIface globalTransactionMgr,
- Map<Long, Long> partitionVisibleVersions, Map<Long, Set<Long>> backendPartitions) {
+ GlobalTransactionMgrIface globalTransactionMgr) {
Map<Long, Map<Long, Long>> tableIdToTabletDeltaRows = Maps.newHashMap();
AtomicBoolean hasBackendAliveAndUnfinishedTask = new AtomicBoolean(false);
Set<Long> notFinishTaskBe = Sets.newHashSet();
@@ -219,37 +228,90 @@ public class PublishVersionDaemon extends MasterDaemon {
|| isPublishSlow
|| DebugPointUtil.isEnable("PublishVersionDaemon.not_wait_unfinished_tasks");
if (shouldFinishTxn) {
- try {
- // one transaction exception should not affect other transaction
- globalTransactionMgr.finishTransaction(transactionState.getDbId(),
- transactionState.getTransactionId(), partitionVisibleVersions, backendPartitions);
- } catch (Exception e) {
- LOG.warn("error happens when finish transaction {}",
transactionState.getTransactionId(), e);
+ if (Config.enable_parallel_publish_version) {
+ tryFinishTxnAsync(transactionState, globalTransactionMgr);
+ } else {
+ tryFinishTxnSync(transactionState, globalTransactionMgr);
}
- if (transactionState.getTransactionStatus() != TransactionStatus.VISIBLE) {
- // if finish transaction state failed, then update publish version time, should check
- // to finish after some interval
- transactionState.updateSendTaskTime();
- if (LOG.isDebugEnabled()) {
- LOG.debug("publish version for transaction {} failed",
transactionState);
- }
+ }
+ }
+
+ private void tryFinishTxnAsync(TransactionState transactionState, GlobalTransactionMgrIface globalTransactionMgr) {
+ if (publishingTxnIds.contains(transactionState.getTransactionId())) {
+ return;
+ }
+
+ publishingTxnIds.add(transactionState.getTransactionId());
+ LOG.info("try to finish transaction {}, dbId: {}, txnId: {}",
+ transactionState.getTransactionId(), transactionState.getDbId(), transactionState.getTransactionId());
+ try {
+ dbExecutors.get((int) (transactionState.getDbId() % Config.publish_thread_pool_num)).execute(() -> {
+ tryFinishTxnSync(transactionState, globalTransactionMgr);
+ publishingTxnIds.remove(transactionState.getTransactionId());
+ });
+ } catch (Throwable e) {
+ LOG.warn("failed to finish transaction {}, dbId: {}, txnId: {},
exception: {}",
+ transactionState.getTransactionId(),
transactionState.getDbId(),
+ transactionState.getTransactionId(), e);
+ publishingTxnIds.remove(transactionState.getTransactionId());
+ }
+ }
+
+ private void tryFinishTxnSync(TransactionState transactionState, GlobalTransactionMgrIface globalTransactionMgr) {
+ try {
+ partitionVisibleVersions = Maps.newHashMap();
+ backendPartitions = Maps.newHashMap();
+ // one transaction exception should not affect other transaction
+ globalTransactionMgr.finishTransaction(transactionState.getDbId(),
+ transactionState.getTransactionId(), partitionVisibleVersions, backendPartitions);
+ addBackendVisibleVersions(partitionVisibleVersions, backendPartitions);
+ } catch (Exception e) {
+ LOG.warn("error happens when finish transaction {}",
transactionState.getTransactionId(), e);
+ }
+ if (transactionState.getTransactionStatus() != TransactionStatus.VISIBLE) {
+ // if finish transaction state failed, then update publish version time, should check
+ // to finish after some interval
+ transactionState.updateSendTaskTime();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("publish version for transaction {} failed",
transactionState);
}
}
if (transactionState.getTransactionStatus() == TransactionStatus.VISIBLE) {
transactionState.getPublishVersionTasks().values().forEach(tasks -> {
for (PublishVersionTask task : tasks) {
- AgentTaskQueue.removeTask(task.getBackendId(), TTaskType.PUBLISH_VERSION, task.getSignature());
+ AgentTaskQueue.removeTask(task.getBackendId(), TTaskType.PUBLISH_VERSION,
+ task.getSignature());
}
});
transactionState.pruneAfterVisible();
if (MetricRepo.isInit) {
- long publishTime = transactionState.getLastPublishVersionTime() - transactionState.getCommitTime();
+ long publishTime = transactionState.getLastPublishVersionTime()
+ - transactionState.getCommitTime();
MetricRepo.HISTO_TXN_PUBLISH_LATENCY.update(publishTime);
}
}
}
+ private void addBackendVisibleVersions(Map<Long, Long> partitionVisibleVersions,
+ Map<Long, Set<Long>> backendPartitions) {
+ visibleVersionsLock.writeLock().lock();
+ try {
+ this.partitionVisibleVersions.putAll(partitionVisibleVersions);
+ // merge backend partitions if exists merge value as set else add a new set
+ for (Entry<Long, Set<Long>> entry : backendPartitions.entrySet()) {
+ this.backendPartitions.computeIfPresent(entry.getKey(),
+ (backendId, existingPartitions) -> {
+ existingPartitions.addAll(entry.getValue());
+ return existingPartitions;
+ });
+ this.backendPartitions.putIfAbsent(entry.getKey(), entry.getValue());
+ }
+ } finally {
+ visibleVersionsLock.writeLock().unlock();
+ }
+ }
+
// Merge task tablets update rows to tableToTabletsDelta.
private void calculateTaskUpdateRows(Map<Long, Map<Long, Long>> tableIdToTabletDeltaRows, PublishVersionTask task) {
if (CollectionUtils.isEmpty(task.getErrorTablets())) {
@@ -296,23 +358,30 @@ public class PublishVersionDaemon extends MasterDaemon {
Collectors.mapping(p -> p.second, Collectors.toSet())));
}
- private void sendBackendVisibleVersion(Map<Long, Long> partitionVisibleVersions,
- Map<Long, Set<Long>> backendPartitions) {
- if (partitionVisibleVersions.isEmpty() || backendPartitions.isEmpty()) {
- return;
- }
+ private void sendBackendVisibleVersion() {
+ visibleVersionsLock.writeLock().lock();
+ try {
+ if (partitionVisibleVersions.isEmpty() || backendPartitions.isEmpty()) {
+ return;
+ }
- long createTime = System.currentTimeMillis();
- AgentBatchTask batchTask = new AgentBatchTask();
- backendPartitions.forEach((backendId, partitionIds) -> {
- Map<Long, Long> backendPartitionVersions = partitionVisibleVersions.entrySet().stream()
- .filter(entry -> partitionIds.contains(entry.getKey()))
- .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
- UpdateVisibleVersionTask task = new UpdateVisibleVersionTask(backendId, backendPartitionVersions,
- createTime);
- batchTask.addTask(task);
- });
- AgentTaskExecutor.submit(batchTask);
+ long createTime = System.currentTimeMillis();
+ AgentBatchTask batchTask = new AgentBatchTask();
+ backendPartitions.forEach((backendId, partitionIds) -> {
+ Map<Long, Long> backendPartitionVersions = partitionVisibleVersions.entrySet().stream()
+ .filter(entry -> partitionIds.contains(entry.getKey()))
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+ UpdateVisibleVersionTask task = new UpdateVisibleVersionTask(backendId, backendPartitionVersions,
+ createTime);
+ batchTask.addTask(task);
+ });
+ AgentTaskExecutor.submit(batchTask);
+
+ this.partitionVisibleVersions.clear();
+ this.backendPartitions.clear();
+ } finally {
+ visibleVersionsLock.writeLock().unlock();
+ }
}
private List<TPartitionVersionInfo> generatePartitionVersionInfos(Collection<TableCommitInfo> tableCommitInfos,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]