This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 463044a71fc branch-3.1: [fix](tablet report)Replace tablet report
with ForkJoinPool #57382 (#57927)
463044a71fc is described below
commit 463044a71fcda9a0f4847a50c41e4b2b21dcb865
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Nov 12 14:07:48 2025 +0800
branch-3.1: [fix](tablet report)Replace tablet report with ForkJoinPool
#57382 (#57927)
Cherry-picked from #57382
Co-authored-by: deardeng <[email protected]>
---
.../main/java/org/apache/doris/common/Config.java | 9 +
.../apache/doris/catalog/TabletInvertedIndex.java | 518 +++++++++++++++------
2 files changed, 375 insertions(+), 152 deletions(-)
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index a86c0b10e4f..da115c285c1 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -552,6 +552,15 @@ public class Config extends ConfigBase {
"Whether to enable parallel publish version"})
public static boolean enable_parallel_publish_version = true;
+
+ @ConfField(masterOnly = true, description = {"Tablet report 线程池的数目",
+ "Num of thread to handle tablet report task"})
+ public static int tablet_report_thread_pool_num = 10;
+
+ @ConfField(masterOnly = true, description = {"Tablet report 线程池的队列大小",
+ "Queue size to store tablet report task in publish thread pool"})
+ public static int tablet_report_queue_size = 1024;
+
@ConfField(mutable = true, masterOnly = true, description =
{"提交事务的最大超时时间,单位是秒。"
+ "该参数仅用于事务型 insert 操作中。",
"Maximal waiting time for all data inserted before one transaction
to be committed, in seconds. "
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java
index a1faada3fea..b774f4615d0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java
@@ -56,7 +56,11 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.StampedLock;
import java.util.stream.Collectors;
@@ -107,7 +111,14 @@ public class TabletInvertedIndex {
// Notice only none-cloud use it for be reporting tablets. This map is
empty in cloud mode.
private volatile ImmutableMap<Long, PartitionCollectInfo>
partitionCollectInfoMap = ImmutableMap.of();
- private ForkJoinPool taskPool = new
ForkJoinPool(Runtime.getRuntime().availableProcessors());
+ private final ExecutorService taskPool = new ThreadPoolExecutor(
+ Config.tablet_report_thread_pool_num,
+ 2 * Config.tablet_report_thread_pool_num,
+ // keep-alive time for idle threads; by default a tablet report task runs once every 60s
+ 120L,
+ TimeUnit.SECONDS,
+ new LinkedBlockingQueue<>(Config.tablet_report_queue_size),
+ new ThreadPoolExecutor.DiscardOldestPolicy());
public TabletInvertedIndex() {
}
@@ -146,178 +157,381 @@ public class TabletInvertedIndex {
long feTabletNum = 0;
long stamp = readLock();
long start = System.currentTimeMillis();
+
try {
if (LOG.isDebugEnabled()) {
LOG.debug("begin to do tablet diff with backend[{}]. num: {}",
backendId, backendTablets.size());
}
+
Map<Long, Replica> replicaMetaWithBackend =
backingReplicaMetaTable.row(backendId);
if (replicaMetaWithBackend != null) {
feTabletNum = replicaMetaWithBackend.size();
- taskPool.submit(() -> {
- // traverse replicas in meta with this backend
-
replicaMetaWithBackend.entrySet().parallelStream().forEach(entry -> {
- long tabletId = entry.getKey();
-
Preconditions.checkState(tabletMetaMap.containsKey(tabletId),
- "tablet " + tabletId + " not exists, backend "
+ backendId);
- TabletMeta tabletMeta = tabletMetaMap.get(tabletId);
-
- if (backendTablets.containsKey(tabletId)) {
- TTablet backendTablet =
backendTablets.get(tabletId);
- Replica replica = entry.getValue();
- tabletFoundInMeta.add(tabletId);
- TTabletInfo backendTabletInfo =
backendTablet.getTabletInfos().get(0);
- TTabletMetaInfo tabletMetaInfo = null;
- if (backendTabletInfo.getReplicaId() !=
replica.getId()
- && replica.getState() !=
ReplicaState.CLONE) {
- // Need to update replica id in BE
- tabletMetaInfo = new TTabletMetaInfo();
- tabletMetaInfo.setReplicaId(replica.getId());
- }
- PartitionCollectInfo partitionCollectInfo =
-
partitionCollectInfoMap.get(backendTabletInfo.getPartitionId());
- boolean isInMemory = partitionCollectInfo != null
&& partitionCollectInfo.isInMemory();
- if (isInMemory !=
backendTabletInfo.isIsInMemory()) {
- if (tabletMetaInfo == null) {
- tabletMetaInfo = new TTabletMetaInfo();
- tabletMetaInfo.setIsInMemory(isInMemory);
- }
- }
- if (Config.fix_tablet_partition_id_eq_0
- && tabletMeta.getPartitionId() > 0
- && backendTabletInfo.getPartitionId() ==
0) {
- LOG.warn("be report tablet partition id not eq
fe, in be {} but in fe {}",
- backendTabletInfo, tabletMeta);
- // Need to update partition id in BE
- tabletMetaInfo = new TTabletMetaInfo();
-
tabletMetaInfo.setPartitionId(tabletMeta.getPartitionId());
- }
- // 1. (intersection)
- if (needSync(replica, backendTabletInfo)) {
- // need sync
- synchronized (tabletSyncMap) {
- tabletSyncMap.put(tabletMeta.getDbId(),
tabletId);
- }
- }
+ processTabletReportAsync(backendId, backendTablets,
backendPartitionsVersion, storageMediumMap,
+ tabletSyncMap, tabletDeleteFromMeta,
tabletFoundInMeta, tabletMigrationMap,
+ partitionVersionSyncMap, transactionsToPublish,
transactionsToClear, tabletRecoveryMap,
+ tabletToUpdate, cooldownTablets,
replicaMetaWithBackend);
+ }
+ } finally {
+ readUnlock(stamp);
+ }
- // check and set path
- // path info of replica is only saved in Master FE
- if (backendTabletInfo.isSetPathHash()
- && replica.getPathHash() !=
backendTabletInfo.getPathHash()) {
-
replica.setPathHash(backendTabletInfo.getPathHash());
- }
+ // Process cooldown configs outside of read lock to avoid deadlock
+ cooldownTablets.forEach(p -> handleCooldownConf(p.first, p.second,
cooldownConfToPush, cooldownConfToUpdate));
- if (backendTabletInfo.isSetSchemaHash() &&
replica.getState() == ReplicaState.NORMAL
- && replica.getSchemaHash() !=
backendTabletInfo.getSchemaHash()) {
- // update the schema hash only when replica is
normal
-
replica.setSchemaHash(backendTabletInfo.getSchemaHash());
- }
+ logTabletReportSummary(backendId, feTabletNum, backendTablets,
backendPartitionsVersion,
+ tabletSyncMap, tabletDeleteFromMeta, tabletFoundInMeta,
tabletMigrationMap,
+ partitionVersionSyncMap, transactionsToPublish,
transactionsToClear,
+ tabletToUpdate, tabletRecoveryMap, start);
+ }
- if (needRecover(replica,
tabletMeta.getOldSchemaHash(), backendTabletInfo)) {
- LOG.warn("replica {} of tablet {} on backend
{} need recovery. "
- + "replica in FE: {}, report
version {}, report schema hash: {},"
- + " is bad: {}, is version
missing: {}",
- replica.getId(), tabletId, backendId,
replica,
- backendTabletInfo.getVersion(),
- backendTabletInfo.getSchemaHash(),
- backendTabletInfo.isSetUsed() ?
!backendTabletInfo.isUsed() : "false",
- backendTabletInfo.isSetVersionMiss() ?
backendTabletInfo.isVersionMiss() :
- "unset");
- synchronized (tabletRecoveryMap) {
-
tabletRecoveryMap.put(tabletMeta.getDbId(), tabletId);
- }
- }
+ /**
+ * Process a tablet report in parallel on the thread pool, blocking until all submitted tasks have finished
+ */
+ private void processTabletReportAsync(long backendId, Map<Long, TTablet>
backendTablets,
+ Map<Long, Long>
backendPartitionsVersion,
+ HashMap<Long, TStorageMedium>
storageMediumMap,
+ ListMultimap<Long, Long>
tabletSyncMap,
+ ListMultimap<Long, Long>
tabletDeleteFromMeta,
+ Set<Long> tabletFoundInMeta,
+ ListMultimap<TStorageMedium, Long>
tabletMigrationMap,
+ Map<Long, Long>
partitionVersionSyncMap,
+ Map<Long, SetMultimap<Long,
TPartitionVersionInfo>> transactionsToPublish,
+ SetMultimap<Long, Long>
transactionsToClear,
+ ListMultimap<Long, Long>
tabletRecoveryMap,
+ List<TTabletMetaInfo> tabletToUpdate,
+ List<Pair<TabletMeta, TTabletInfo>>
cooldownTablets,
+ Map<Long, Replica>
replicaMetaWithBackend) {
+ // Calculate optimal chunk size to balance task granularity and
concurrency
+ // For large tablet counts (400K-500K), we want smaller chunks to
maximize parallelism
+ // Target: create at least threadPoolSize * 4 tasks for better load
balancing
+ int totalTablets = replicaMetaWithBackend.size();
+ int threadPoolSize = Config.tablet_report_thread_pool_num;
+ int targetTasks = threadPoolSize * 4; // Create 4x tasks as threads
for better load balancing
+ int chunkSize = Math.max(500, totalTablets / targetTasks);
+
+ // Cap chunk size to avoid overly large tasks,
+ // so the thread pool queue is not filled up by a few large tasks
+ int maxChunkSize = 10000;
+ chunkSize = Math.min(chunkSize, maxChunkSize);
+ List<Map.Entry<Long, Replica>> entries = new
ArrayList<>(replicaMetaWithBackend.entrySet());
+ List<CompletableFuture<Void>> tabletFutures = new ArrayList<>();
+ int estimatedTasks = (totalTablets + chunkSize - 1) / chunkSize;
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Processing tablet report for backend[{}]: total
tablets={}, chunkSize={}, estimated tasks={}",
+ backendId, totalTablets, chunkSize, estimatedTasks);
+ }
- if (Config.enable_storage_policy &&
backendTabletInfo.isSetCooldownTerm()) {
- // Place tablet info in a container and
process it outside of read lock to avoid
- // deadlock with OlapTable lock
- synchronized (cooldownTablets) {
- cooldownTablets.add(Pair.of(tabletMeta,
backendTabletInfo));
- }
-
replica.setCooldownMetaId(backendTabletInfo.getCooldownMetaId());
-
replica.setCooldownTerm(backendTabletInfo.getCooldownTerm());
- }
+ for (int i = 0; i < entries.size(); i += chunkSize) {
+ final int start = i;
+ final int end = Math.min(i + chunkSize, entries.size());
+
+ CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
+ for (int j = start; j < end; j++) {
+ Map.Entry<Long, Replica> entry = entries.get(j);
+ processTabletEntry(backendId, backendTablets,
storageMediumMap, tabletSyncMap,
+ tabletDeleteFromMeta, tabletFoundInMeta,
tabletMigrationMap,
+ transactionsToPublish, transactionsToClear,
tabletRecoveryMap,
+ tabletToUpdate, cooldownTablets, entry);
+ }
+ }, taskPool);
- long partitionId = tabletMeta.getPartitionId();
- if (!Config.disable_storage_medium_check) {
- // check if need migration
- TStorageMedium storageMedium =
storageMediumMap.get(partitionId);
- if (storageMedium != null &&
backendTabletInfo.isSetStorageMedium()
- && isLocal(storageMedium) &&
isLocal(backendTabletInfo.getStorageMedium())
- &&
isLocal(tabletMeta.getStorageMedium())) {
- if (storageMedium !=
backendTabletInfo.getStorageMedium()) {
- synchronized (tabletMigrationMap) {
-
tabletMigrationMap.put(storageMedium, tabletId);
- }
- }
- if (storageMedium !=
tabletMeta.getStorageMedium()) {
-
tabletMeta.setStorageMedium(storageMedium);
- }
- }
- }
+ tabletFutures.add(future);
+ }
- // check if should clear transactions
- if (backendTabletInfo.isSetTransactionIds()) {
- handleBackendTransactions(backendId,
backendTabletInfo.getTransactionIds(), tabletId,
- tabletMeta, transactionsToPublish,
transactionsToClear);
- } // end for txn id
-
- // update replicase's version count
- // no need to write log, and no need to get db
lock.
- if (backendTabletInfo.isSetTotalVersionCount()) {
-
replica.setTotalVersionCount(backendTabletInfo.getTotalVersionCount());
-
replica.setVisibleVersionCount(backendTabletInfo.isSetVisibleVersionCount()
- ?
backendTabletInfo.getVisibleVersionCount()
- :
backendTabletInfo.getTotalVersionCount());
- }
- if (tabletMetaInfo != null) {
- tabletMetaInfo.setTabletId(tabletId);
- synchronized (tabletToUpdate) {
- tabletToUpdate.add(tabletMetaInfo);
- }
- }
- } else {
- // 2. (meta - be)
- // may need delete from meta
- if (LOG.isDebugEnabled()) {
- LOG.debug("backend[{}] does not report
tablet[{}-{}]", backendId, tabletId, tabletMeta);
- }
- synchronized (tabletDeleteFromMeta) {
- tabletDeleteFromMeta.put(tabletMeta.getDbId(),
tabletId);
- }
- }
- });
-
-
backendPartitionsVersion.entrySet().parallelStream().forEach(entry -> {
- long partitionId = entry.getKey();
- long backendVersion = entry.getValue();
- PartitionCollectInfo partitionInfo =
partitionCollectInfoMap.get(partitionId);
- if (partitionInfo != null &&
partitionInfo.getVisibleVersion() > backendVersion) {
- partitionVersionSyncMap.put(partitionId,
partitionInfo.getVisibleVersion());
- }
- });
- }).join();
+ // Process partition versions as one more pool task, concurrently with the tablet chunks
+ CompletableFuture<Void> partitionFuture =
CompletableFuture.runAsync(() -> {
+ processPartitionVersions(backendPartitionsVersion,
partitionVersionSyncMap);
+ }, taskPool);
+
+ // Wait for all tasks to complete
+ CompletableFuture.allOf(tabletFutures.toArray(new
CompletableFuture[0])).join();
+ partitionFuture.join();
+ }
+
+ /**
+ * Process a single tablet entry from backend report
+ */
+ private void processTabletEntry(long backendId, Map<Long, TTablet>
backendTablets,
+ HashMap<Long, TStorageMedium>
storageMediumMap,
+ ListMultimap<Long, Long> tabletSyncMap,
+ ListMultimap<Long, Long>
tabletDeleteFromMeta,
+ Set<Long> tabletFoundInMeta,
+ ListMultimap<TStorageMedium, Long>
tabletMigrationMap,
+ Map<Long, SetMultimap<Long,
TPartitionVersionInfo>> transactionsToPublish,
+ SetMultimap<Long, Long>
transactionsToClear,
+ ListMultimap<Long, Long>
tabletRecoveryMap,
+ List<TTabletMetaInfo> tabletToUpdate,
+ List<Pair<TabletMeta, TTabletInfo>>
cooldownTablets,
+ Map.Entry<Long, Replica> entry) {
+ long tabletId = entry.getKey();
+ Replica replica = entry.getValue();
+
+ Preconditions.checkState(tabletMetaMap.containsKey(tabletId),
+ "tablet " + tabletId + " not exists, backend " + backendId);
+ TabletMeta tabletMeta = tabletMetaMap.get(tabletId);
+
+ if (backendTablets.containsKey(tabletId)) {
+ // Tablet exists in both FE and BE
+ TTablet backendTablet = backendTablets.get(tabletId);
+ TTabletInfo backendTabletInfo =
backendTablet.getTabletInfos().get(0);
+
+ tabletFoundInMeta.add(tabletId);
+
+ processExistingTablet(backendId, tabletId, replica, tabletMeta,
backendTabletInfo,
+ storageMediumMap, tabletSyncMap, tabletMigrationMap,
transactionsToPublish,
+ transactionsToClear, tabletRecoveryMap, tabletToUpdate,
cooldownTablets);
+ } else {
+ // Tablet exists in FE but not in BE - may need deletion
+ processDeletedTablet(backendId, tabletId, tabletMeta,
tabletDeleteFromMeta);
+ }
+ }
+
+ /**
+ * Process tablet that exists in both FE and BE
+ */
+ private void processExistingTablet(long backendId, long tabletId, Replica
replica,
+ TabletMeta tabletMeta, TTabletInfo
backendTabletInfo,
+ HashMap<Long, TStorageMedium>
storageMediumMap,
+ ListMultimap<Long, Long> tabletSyncMap,
+ ListMultimap<TStorageMedium, Long>
tabletMigrationMap,
+ Map<Long, SetMultimap<Long,
TPartitionVersionInfo>> transactionsToPublish,
+ SetMultimap<Long, Long>
transactionsToClear,
+ ListMultimap<Long, Long>
tabletRecoveryMap,
+ List<TTabletMetaInfo> tabletToUpdate,
+ List<Pair<TabletMeta, TTabletInfo>>
cooldownTablets) {
+ // Check and prepare tablet meta info update
+ TTabletMetaInfo tabletMetaInfo = prepareTabletMetaInfo(replica,
tabletMeta, backendTabletInfo);
+
+ // Check if version sync is needed
+ if (needSync(replica, backendTabletInfo)) {
+ synchronized (tabletSyncMap) {
+ tabletSyncMap.put(tabletMeta.getDbId(), tabletId);
}
- } finally {
- readUnlock(stamp);
}
- cooldownTablets.forEach(p -> handleCooldownConf(p.first, p.second,
cooldownConfToPush, cooldownConfToUpdate));
- long end = System.currentTimeMillis();
+ // Update replica path and schema hash
+ updateReplicaBasicInfo(replica, backendTabletInfo);
+
+ // Check if replica needs recovery
+ if (needRecover(replica, tabletMeta.getOldSchemaHash(),
backendTabletInfo)) {
+ logReplicaRecovery(replica, tabletId, backendId,
backendTabletInfo);
+ synchronized (tabletRecoveryMap) {
+ tabletRecoveryMap.put(tabletMeta.getDbId(), tabletId);
+ }
+ }
+
+ // Handle cooldown policy
+ if (Config.enable_storage_policy &&
backendTabletInfo.isSetCooldownTerm()) {
+ synchronized (cooldownTablets) {
+ cooldownTablets.add(Pair.of(tabletMeta, backendTabletInfo));
+ }
+ replica.setCooldownMetaId(backendTabletInfo.getCooldownMetaId());
+ replica.setCooldownTerm(backendTabletInfo.getCooldownTerm());
+ }
+
+ // Check storage medium migration
+ checkStorageMediumMigration(tabletId, tabletMeta, backendTabletInfo,
+ storageMediumMap, tabletMigrationMap);
+
+ // Handle transactions
+ if (backendTabletInfo.isSetTransactionIds()) {
+ handleBackendTransactions(backendId,
backendTabletInfo.getTransactionIds(), tabletId,
+ tabletMeta, transactionsToPublish, transactionsToClear);
+ }
+
+ // Update replica version count
+ updateReplicaVersionCount(replica, backendTabletInfo);
+
+ // Add tablet meta info to update list if needed
+ if (tabletMetaInfo != null) {
+ tabletMetaInfo.setTabletId(tabletId);
+ synchronized (tabletToUpdate) {
+ tabletToUpdate.add(tabletMetaInfo);
+ }
+ }
+ }
+
+ /**
+ * Prepare tablet meta info for BE update if needed
+ */
+ private TTabletMetaInfo prepareTabletMetaInfo(Replica replica, TabletMeta
tabletMeta,
+ TTabletInfo
backendTabletInfo) {
+ TTabletMetaInfo tabletMetaInfo = null;
+
+ // Check replica id mismatch
+ if (backendTabletInfo.getReplicaId() != replica.getId()
+ && replica.getState() != ReplicaState.CLONE) {
+ tabletMetaInfo = new TTabletMetaInfo();
+ tabletMetaInfo.setReplicaId(replica.getId());
+ }
+
+ // Check in-memory flag
+ PartitionCollectInfo partitionCollectInfo =
+
partitionCollectInfoMap.get(backendTabletInfo.getPartitionId());
+ boolean isInMemory = partitionCollectInfo != null &&
partitionCollectInfo.isInMemory();
+ if (isInMemory != backendTabletInfo.isIsInMemory()) {
+ if (tabletMetaInfo == null) {
+ tabletMetaInfo = new TTabletMetaInfo();
+ }
+ tabletMetaInfo.setIsInMemory(isInMemory);
+ }
+
+ // Check partition id mismatch
+ if (Config.fix_tablet_partition_id_eq_0
+ && tabletMeta.getPartitionId() > 0
+ && backendTabletInfo.getPartitionId() == 0) {
+ LOG.warn("be report tablet partition id not eq fe, in be {} but in
fe {}",
+ backendTabletInfo, tabletMeta);
+ if (tabletMetaInfo == null) {
+ tabletMetaInfo = new TTabletMetaInfo();
+ }
+ tabletMetaInfo.setPartitionId(tabletMeta.getPartitionId());
+ }
+
+ return tabletMetaInfo;
+ }
+
+ /**
+ * Update replica's basic info like path hash and schema hash
+ */
+ private void updateReplicaBasicInfo(Replica replica, TTabletInfo
backendTabletInfo) {
+ // Update path hash
+ if (backendTabletInfo.isSetPathHash()
+ && replica.getPathHash() != backendTabletInfo.getPathHash()) {
+ replica.setPathHash(backendTabletInfo.getPathHash());
+ }
+
+ // Update schema hash
+ if (backendTabletInfo.isSetSchemaHash()
+ && replica.getState() == ReplicaState.NORMAL
+ && replica.getSchemaHash() !=
backendTabletInfo.getSchemaHash()) {
+ replica.setSchemaHash(backendTabletInfo.getSchemaHash());
+ }
+ }
+
+ /**
+ * Log replica recovery information
+ */
+ private void logReplicaRecovery(Replica replica, long tabletId, long
backendId,
+ TTabletInfo backendTabletInfo) {
+ LOG.warn("replica {} of tablet {} on backend {} need recovery. "
+ + "replica in FE: {}, report version {}, report schema
hash: {}, "
+ + "is bad: {}, is version missing: {}",
+ replica.getId(), tabletId, backendId, replica,
+ backendTabletInfo.getVersion(),
+ backendTabletInfo.getSchemaHash(),
+ backendTabletInfo.isSetUsed() ? !backendTabletInfo.isUsed() :
"false",
+ backendTabletInfo.isSetVersionMiss() ?
backendTabletInfo.isVersionMiss() : "unset");
+ }
+
+ /**
+ * Check if storage medium migration is needed
+ */
+ private void checkStorageMediumMigration(long tabletId, TabletMeta
tabletMeta,
+ TTabletInfo backendTabletInfo,
+ HashMap<Long, TStorageMedium>
storageMediumMap,
+ ListMultimap<TStorageMedium,
Long> tabletMigrationMap) {
+ if (Config.disable_storage_medium_check) {
+ return;
+ }
+
+ long partitionId = tabletMeta.getPartitionId();
+ TStorageMedium storageMedium = storageMediumMap.get(partitionId);
+
+ if (storageMedium != null && backendTabletInfo.isSetStorageMedium()
+ && isLocal(storageMedium)
+ && isLocal(backendTabletInfo.getStorageMedium())
+ && isLocal(tabletMeta.getStorageMedium())) {
+
+ if (storageMedium != backendTabletInfo.getStorageMedium()) {
+ synchronized (tabletMigrationMap) {
+ tabletMigrationMap.put(storageMedium, tabletId);
+ }
+ }
+
+ if (storageMedium != tabletMeta.getStorageMedium()) {
+ tabletMeta.setStorageMedium(storageMedium);
+ }
+ }
+ }
+
+ /**
+ * Update replica's version count
+ */
+ private void updateReplicaVersionCount(Replica replica, TTabletInfo
backendTabletInfo) {
+ if (backendTabletInfo.isSetTotalVersionCount()) {
+
replica.setTotalVersionCount(backendTabletInfo.getTotalVersionCount());
+
replica.setVisibleVersionCount(backendTabletInfo.isSetVisibleVersionCount()
+ ? backendTabletInfo.getVisibleVersionCount()
+ : backendTabletInfo.getTotalVersionCount());
+ }
+ }
+
+ /**
+ * Process tablet that exists in FE but not reported by BE
+ */
+ private void processDeletedTablet(long backendId, long tabletId,
TabletMeta tabletMeta,
+ ListMultimap<Long, Long>
tabletDeleteFromMeta) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("backend[{}] does not report tablet[{}-{}]", backendId,
tabletId, tabletMeta);
+ }
+ synchronized (tabletDeleteFromMeta) {
+ tabletDeleteFromMeta.put(tabletMeta.getDbId(), tabletId);
+ }
+ }
+
+ /**
+ * Process partition versions reported by BE
+ */
+ private void processPartitionVersions(Map<Long, Long>
backendPartitionsVersion,
+ Map<Long, Long>
partitionVersionSyncMap) {
+ for (Map.Entry<Long, Long> entry :
backendPartitionsVersion.entrySet()) {
+ long partitionId = entry.getKey();
+ long backendVersion = entry.getValue();
+ PartitionCollectInfo partitionInfo =
partitionCollectInfoMap.get(partitionId);
+
+ if (partitionInfo != null && partitionInfo.getVisibleVersion() >
backendVersion) {
+ partitionVersionSyncMap.put(partitionId,
partitionInfo.getVisibleVersion());
+ }
+ }
+ }
+
+ /**
+ * Log tablet report summary
+ */
+ private void logTabletReportSummary(long backendId, long feTabletNum,
+ Map<Long, TTablet> backendTablets,
+ Map<Long, Long>
backendPartitionsVersion,
+ ListMultimap<Long, Long>
tabletSyncMap,
+ ListMultimap<Long, Long>
tabletDeleteFromMeta,
+ Set<Long> tabletFoundInMeta,
+ ListMultimap<TStorageMedium, Long>
tabletMigrationMap,
+ Map<Long, Long>
partitionVersionSyncMap,
+ Map<Long, SetMultimap<Long,
TPartitionVersionInfo>> transactionsToPublish,
+ SetMultimap<Long, Long>
transactionsToClear,
+ List<TTabletMetaInfo> tabletToUpdate,
+ ListMultimap<Long, Long>
tabletRecoveryMap,
+ long startTime) {
+ long endTime = System.currentTimeMillis();
long toClearTransactionsNum = transactionsToClear.keySet().size();
long toClearTransactionsPartitions =
transactionsToClear.values().size();
long toPublishTransactionsNum = transactionsToPublish.values().stream()
- .mapToLong(m ->
m.keySet().size()).sum();
+ .mapToLong(m -> m.keySet().size()).sum();
long toPublishTransactionsPartitions =
transactionsToPublish.values().stream()
- .mapToLong(m ->
m.values().size()).sum();
- LOG.info("finished to do tablet diff with backend[{}]. fe tablet num:
{}, backend tablet num: {}. sync: {}."
- + " metaDel: {}. foundInMeta: {}. migration: {}.
backend partition num: {}, backend need "
- + "update: {}. found invalid transactions
{}(partitions: {}). found republish "
- + "transactions {}(partitions: {}). tabletToUpdate:
{}. need recovery: {}. cost: {} ms",
+ .mapToLong(m -> m.values().size()).sum();
+
+ LOG.info("finished to do tablet diff with backend[{}]. fe tablet num:
{}, backend tablet num: {}. "
+ + "sync: {}, metaDel: {}, foundInMeta: {}, migration:
{}, "
+ + "backend partition num: {}, backend need update: {},
"
+ + "found invalid transactions {}(partitions: {}), "
+ + "found republish transactions {}(partitions: {}), "
+ + "tabletToUpdate: {}, need recovery: {}, cost: {} ms",
backendId, feTabletNum, backendTablets.size(),
tabletSyncMap.size(),
tabletDeleteFromMeta.size(), tabletFoundInMeta.size(),
tabletMigrationMap.size(),
- backendPartitionsVersion.size(),
partitionVersionSyncMap.size(), toClearTransactionsNum,
- toClearTransactionsPartitions, toPublishTransactionsNum,
toPublishTransactionsPartitions,
- tabletToUpdate.size(), tabletRecoveryMap.size(), (end -
start));
+ backendPartitionsVersion.size(),
partitionVersionSyncMap.size(),
+ toClearTransactionsNum, toClearTransactionsPartitions,
+ toPublishTransactionsNum, toPublishTransactionsPartitions,
+ tabletToUpdate.size(), tabletRecoveryMap.size(), (endTime -
startTime));
}
private void handleBackendTransactions(long backendId, List<Long>
transactionIds, long tabletId,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]