This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 463044a71fc branch-3.1:  [fix](tablet report)Replace tablet report with ForkJoinPool #57382 (#57927)
463044a71fc is described below

commit 463044a71fcda9a0f4847a50c41e4b2b21dcb865
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Nov 12 14:07:48 2025 +0800

    branch-3.1:  [fix](tablet report)Replace tablet report with ForkJoinPool #57382 (#57927)
    
    Cherry-picked from #57382
    
    Co-authored-by: deardeng <[email protected]>
---
 .../main/java/org/apache/doris/common/Config.java  |   9 +
 .../apache/doris/catalog/TabletInvertedIndex.java  | 518 +++++++++++++++------
 2 files changed, 375 insertions(+), 152 deletions(-)

diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java 
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index a86c0b10e4f..da115c285c1 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -552,6 +552,15 @@ public class Config extends ConfigBase {
             "Whether to enable parallel publish version"})
     public static boolean enable_parallel_publish_version = true;
 
+
+    @ConfField(masterOnly = true, description = {"Tablet report 线程池的数目",
+        "Num of thread to handle tablet report task"})
+    public static int tablet_report_thread_pool_num = 10;
+
+    @ConfField(masterOnly = true, description = {"Tablet report 线程池的队列大小",
+        "Queue size to store tablet report task in publish thread pool"})
+    public static int tablet_report_queue_size = 1024;
+
     @ConfField(mutable = true, masterOnly = true, description = 
{"提交事务的最大超时时间,单位是秒。"
             + "该参数仅用于事务型 insert 操作中。",
             "Maximal waiting time for all data inserted before one transaction 
to be committed, in seconds. "
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java
index a1faada3fea..b774f4615d0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java
@@ -56,7 +56,11 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.StampedLock;
 import java.util.stream.Collectors;
 
@@ -107,7 +111,14 @@ public class TabletInvertedIndex {
     // Notice only none-cloud use it for be reporting tablets. This map is 
empty in cloud mode.
     private volatile ImmutableMap<Long, PartitionCollectInfo> 
partitionCollectInfoMap = ImmutableMap.of();
 
-    private ForkJoinPool taskPool = new 
ForkJoinPool(Runtime.getRuntime().availableProcessors());
+    private final ExecutorService taskPool = new ThreadPoolExecutor(
+            Config.tablet_report_thread_pool_num,
+            2 * Config.tablet_report_thread_pool_num,
+            // tablet report task default 60s once
+            120L,
+            TimeUnit.SECONDS,
+            new LinkedBlockingQueue<>(Config.tablet_report_queue_size),
+            new ThreadPoolExecutor.DiscardOldestPolicy());
 
     public TabletInvertedIndex() {
     }
@@ -146,178 +157,381 @@ public class TabletInvertedIndex {
         long feTabletNum = 0;
         long stamp = readLock();
         long start = System.currentTimeMillis();
+
         try {
             if (LOG.isDebugEnabled()) {
                 LOG.debug("begin to do tablet diff with backend[{}]. num: {}", 
backendId, backendTablets.size());
             }
+
             Map<Long, Replica> replicaMetaWithBackend = 
backingReplicaMetaTable.row(backendId);
             if (replicaMetaWithBackend != null) {
                 feTabletNum = replicaMetaWithBackend.size();
-                taskPool.submit(() -> {
-                    // traverse replicas in meta with this backend
-                    
replicaMetaWithBackend.entrySet().parallelStream().forEach(entry -> {
-                        long tabletId = entry.getKey();
-                        
Preconditions.checkState(tabletMetaMap.containsKey(tabletId),
-                                "tablet " + tabletId + " not exists, backend " 
+ backendId);
-                        TabletMeta tabletMeta = tabletMetaMap.get(tabletId);
-
-                        if (backendTablets.containsKey(tabletId)) {
-                            TTablet backendTablet = 
backendTablets.get(tabletId);
-                            Replica replica = entry.getValue();
-                            tabletFoundInMeta.add(tabletId);
-                            TTabletInfo backendTabletInfo = 
backendTablet.getTabletInfos().get(0);
-                            TTabletMetaInfo tabletMetaInfo = null;
-                            if (backendTabletInfo.getReplicaId() != 
replica.getId()
-                                    && replica.getState() != 
ReplicaState.CLONE) {
-                                // Need to update replica id in BE
-                                tabletMetaInfo = new TTabletMetaInfo();
-                                tabletMetaInfo.setReplicaId(replica.getId());
-                            }
-                            PartitionCollectInfo partitionCollectInfo =
-                                    
partitionCollectInfoMap.get(backendTabletInfo.getPartitionId());
-                            boolean isInMemory = partitionCollectInfo != null 
&& partitionCollectInfo.isInMemory();
-                            if (isInMemory != 
backendTabletInfo.isIsInMemory()) {
-                                if (tabletMetaInfo == null) {
-                                    tabletMetaInfo = new TTabletMetaInfo();
-                                    tabletMetaInfo.setIsInMemory(isInMemory);
-                                }
-                            }
-                            if (Config.fix_tablet_partition_id_eq_0
-                                    && tabletMeta.getPartitionId() > 0
-                                    && backendTabletInfo.getPartitionId() == 
0) {
-                                LOG.warn("be report tablet partition id not eq 
fe, in be {} but in fe {}",
-                                        backendTabletInfo, tabletMeta);
-                                // Need to update partition id in BE
-                                tabletMetaInfo = new TTabletMetaInfo();
-                                
tabletMetaInfo.setPartitionId(tabletMeta.getPartitionId());
-                            }
-                            // 1. (intersection)
-                            if (needSync(replica, backendTabletInfo)) {
-                                // need sync
-                                synchronized (tabletSyncMap) {
-                                    tabletSyncMap.put(tabletMeta.getDbId(), 
tabletId);
-                                }
-                            }
+                processTabletReportAsync(backendId, backendTablets, 
backendPartitionsVersion, storageMediumMap,
+                        tabletSyncMap, tabletDeleteFromMeta, 
tabletFoundInMeta, tabletMigrationMap,
+                        partitionVersionSyncMap, transactionsToPublish, 
transactionsToClear, tabletRecoveryMap,
+                        tabletToUpdate, cooldownTablets, 
replicaMetaWithBackend);
+            }
+        } finally {
+            readUnlock(stamp);
+        }
 
-                            // check and set path
-                            // path info of replica is only saved in Master FE
-                            if (backendTabletInfo.isSetPathHash()
-                                    && replica.getPathHash() != 
backendTabletInfo.getPathHash()) {
-                                
replica.setPathHash(backendTabletInfo.getPathHash());
-                            }
+        // Process cooldown configs outside of read lock to avoid deadlock
+        cooldownTablets.forEach(p -> handleCooldownConf(p.first, p.second, 
cooldownConfToPush, cooldownConfToUpdate));
 
-                            if (backendTabletInfo.isSetSchemaHash() && 
replica.getState() == ReplicaState.NORMAL
-                                    && replica.getSchemaHash() != 
backendTabletInfo.getSchemaHash()) {
-                                // update the schema hash only when replica is 
normal
-                                
replica.setSchemaHash(backendTabletInfo.getSchemaHash());
-                            }
+        logTabletReportSummary(backendId, feTabletNum, backendTablets, 
backendPartitionsVersion,
+                tabletSyncMap, tabletDeleteFromMeta, tabletFoundInMeta, 
tabletMigrationMap,
+                partitionVersionSyncMap, transactionsToPublish, 
transactionsToClear,
+                tabletToUpdate, tabletRecoveryMap, start);
+    }
 
-                            if (needRecover(replica, 
tabletMeta.getOldSchemaHash(), backendTabletInfo)) {
-                                LOG.warn("replica {} of tablet {} on backend 
{} need recovery. "
-                                                + "replica in FE: {}, report 
version {}, report schema hash: {},"
-                                                + " is bad: {}, is version 
missing: {}",
-                                        replica.getId(), tabletId, backendId, 
replica,
-                                        backendTabletInfo.getVersion(),
-                                        backendTabletInfo.getSchemaHash(),
-                                        backendTabletInfo.isSetUsed() ? 
!backendTabletInfo.isUsed() : "false",
-                                        backendTabletInfo.isSetVersionMiss() ? 
backendTabletInfo.isVersionMiss() :
-                                                "unset");
-                                synchronized (tabletRecoveryMap) {
-                                    
tabletRecoveryMap.put(tabletMeta.getDbId(), tabletId);
-                                }
-                            }
+    /**
+     * Process tablet report asynchronously using thread pool
+     */
+    private void processTabletReportAsync(long backendId, Map<Long, TTablet> 
backendTablets,
+                                          Map<Long, Long> 
backendPartitionsVersion,
+                                          HashMap<Long, TStorageMedium> 
storageMediumMap,
+                                          ListMultimap<Long, Long> 
tabletSyncMap,
+                                          ListMultimap<Long, Long> 
tabletDeleteFromMeta,
+                                          Set<Long> tabletFoundInMeta,
+                                          ListMultimap<TStorageMedium, Long> 
tabletMigrationMap,
+                                          Map<Long, Long> 
partitionVersionSyncMap,
+                                          Map<Long, SetMultimap<Long, 
TPartitionVersionInfo>> transactionsToPublish,
+                                          SetMultimap<Long, Long> 
transactionsToClear,
+                                          ListMultimap<Long, Long> 
tabletRecoveryMap,
+                                          List<TTabletMetaInfo> tabletToUpdate,
+                                          List<Pair<TabletMeta, TTabletInfo>> 
cooldownTablets,
+                                          Map<Long, Replica> 
replicaMetaWithBackend) {
+        // Calculate optimal chunk size to balance task granularity and 
concurrency
+        // For large tablet counts (40W-50W), we want smaller chunks to 
maximize parallelism
+        // Target: create at least threadPoolSize * 4 tasks for better load 
balancing
+        int totalTablets = replicaMetaWithBackend.size();
+        int threadPoolSize = Config.tablet_report_thread_pool_num;
+        int targetTasks = threadPoolSize * 4; // Create 4x tasks as threads 
for better load balancing
+        int chunkSize = Math.max(500, totalTablets / targetTasks);
+
+        // Cap chunk size to avoid too large tasks
+        // so thread pool queue will not be fulled with few large tasks
+        int maxChunkSize = 10000;
+        chunkSize = Math.min(chunkSize, maxChunkSize);
+        List<Map.Entry<Long, Replica>> entries = new 
ArrayList<>(replicaMetaWithBackend.entrySet());
+        List<CompletableFuture<Void>> tabletFutures = new ArrayList<>();
+        int estimatedTasks = (totalTablets + chunkSize - 1) / chunkSize;
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Processing tablet report for backend[{}]: total 
tablets={}, chunkSize={}, estimated tasks={}",
+                    backendId, totalTablets, chunkSize, estimatedTasks);
+        }
 
-                            if (Config.enable_storage_policy && 
backendTabletInfo.isSetCooldownTerm()) {
-                                // Place tablet info in a container and 
process it outside of read lock to avoid
-                                // deadlock with OlapTable lock
-                                synchronized (cooldownTablets) {
-                                    cooldownTablets.add(Pair.of(tabletMeta, 
backendTabletInfo));
-                                }
-                                
replica.setCooldownMetaId(backendTabletInfo.getCooldownMetaId());
-                                
replica.setCooldownTerm(backendTabletInfo.getCooldownTerm());
-                            }
+        for (int i = 0; i < entries.size(); i += chunkSize) {
+            final int start = i;
+            final int end = Math.min(i + chunkSize, entries.size());
+
+            CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
+                for (int j = start; j < end; j++) {
+                    Map.Entry<Long, Replica> entry = entries.get(j);
+                    processTabletEntry(backendId, backendTablets, 
storageMediumMap, tabletSyncMap,
+                            tabletDeleteFromMeta, tabletFoundInMeta, 
tabletMigrationMap,
+                            transactionsToPublish, transactionsToClear, 
tabletRecoveryMap,
+                            tabletToUpdate, cooldownTablets, entry);
+                }
+            }, taskPool);
 
-                            long partitionId = tabletMeta.getPartitionId();
-                            if (!Config.disable_storage_medium_check) {
-                                // check if need migration
-                                TStorageMedium storageMedium = 
storageMediumMap.get(partitionId);
-                                if (storageMedium != null && 
backendTabletInfo.isSetStorageMedium()
-                                        && isLocal(storageMedium) && 
isLocal(backendTabletInfo.getStorageMedium())
-                                        && 
isLocal(tabletMeta.getStorageMedium())) {
-                                    if (storageMedium != 
backendTabletInfo.getStorageMedium()) {
-                                        synchronized (tabletMigrationMap) {
-                                            
tabletMigrationMap.put(storageMedium, tabletId);
-                                        }
-                                    }
-                                    if (storageMedium != 
tabletMeta.getStorageMedium()) {
-                                        
tabletMeta.setStorageMedium(storageMedium);
-                                    }
-                                }
-                            }
+            tabletFutures.add(future);
+        }
 
-                            // check if should clear transactions
-                            if (backendTabletInfo.isSetTransactionIds()) {
-                                handleBackendTransactions(backendId, 
backendTabletInfo.getTransactionIds(), tabletId,
-                                        tabletMeta, transactionsToPublish, 
transactionsToClear);
-                            } // end for txn id
-
-                            // update replicase's version count
-                            // no need to write log, and no need to get db 
lock.
-                            if (backendTabletInfo.isSetTotalVersionCount()) {
-                                
replica.setTotalVersionCount(backendTabletInfo.getTotalVersionCount());
-                                
replica.setVisibleVersionCount(backendTabletInfo.isSetVisibleVersionCount()
-                                        ? 
backendTabletInfo.getVisibleVersionCount()
-                                                : 
backendTabletInfo.getTotalVersionCount());
-                            }
-                            if (tabletMetaInfo != null) {
-                                tabletMetaInfo.setTabletId(tabletId);
-                                synchronized (tabletToUpdate) {
-                                    tabletToUpdate.add(tabletMetaInfo);
-                                }
-                            }
-                        } else {
-                            // 2. (meta - be)
-                            // may need delete from meta
-                            if (LOG.isDebugEnabled()) {
-                                LOG.debug("backend[{}] does not report 
tablet[{}-{}]", backendId, tabletId, tabletMeta);
-                            }
-                            synchronized (tabletDeleteFromMeta) {
-                                tabletDeleteFromMeta.put(tabletMeta.getDbId(), 
tabletId);
-                            }
-                        }
-                    });
-
-                    
backendPartitionsVersion.entrySet().parallelStream().forEach(entry -> {
-                        long partitionId = entry.getKey();
-                        long backendVersion = entry.getValue();
-                        PartitionCollectInfo partitionInfo = 
partitionCollectInfoMap.get(partitionId);
-                        if (partitionInfo != null && 
partitionInfo.getVisibleVersion() > backendVersion) {
-                            partitionVersionSyncMap.put(partitionId, 
partitionInfo.getVisibleVersion());
-                        }
-                    });
-                }).join();
+        // Process partition versions in parallel
+        CompletableFuture<Void> partitionFuture = 
CompletableFuture.runAsync(() -> {
+            processPartitionVersions(backendPartitionsVersion, 
partitionVersionSyncMap);
+        }, taskPool);
+
+        // Wait for all tasks to complete
+        CompletableFuture.allOf(tabletFutures.toArray(new 
CompletableFuture[0])).join();
+        partitionFuture.join();
+    }
+
+    /**
+     * Process a single tablet entry from backend report
+     */
+    private void processTabletEntry(long backendId, Map<Long, TTablet> 
backendTablets,
+                                     HashMap<Long, TStorageMedium> 
storageMediumMap,
+                                     ListMultimap<Long, Long> tabletSyncMap,
+                                     ListMultimap<Long, Long> 
tabletDeleteFromMeta,
+                                     Set<Long> tabletFoundInMeta,
+                                     ListMultimap<TStorageMedium, Long> 
tabletMigrationMap,
+                                     Map<Long, SetMultimap<Long, 
TPartitionVersionInfo>> transactionsToPublish,
+                                     SetMultimap<Long, Long> 
transactionsToClear,
+                                     ListMultimap<Long, Long> 
tabletRecoveryMap,
+                                     List<TTabletMetaInfo> tabletToUpdate,
+                                     List<Pair<TabletMeta, TTabletInfo>> 
cooldownTablets,
+                                     Map.Entry<Long, Replica> entry) {
+        long tabletId = entry.getKey();
+        Replica replica = entry.getValue();
+
+        Preconditions.checkState(tabletMetaMap.containsKey(tabletId),
+                "tablet " + tabletId + " not exists, backend " + backendId);
+        TabletMeta tabletMeta = tabletMetaMap.get(tabletId);
+
+        if (backendTablets.containsKey(tabletId)) {
+            // Tablet exists in both FE and BE
+            TTablet backendTablet = backendTablets.get(tabletId);
+            TTabletInfo backendTabletInfo = 
backendTablet.getTabletInfos().get(0);
+
+            tabletFoundInMeta.add(tabletId);
+
+            processExistingTablet(backendId, tabletId, replica, tabletMeta, 
backendTabletInfo,
+                    storageMediumMap, tabletSyncMap, tabletMigrationMap, 
transactionsToPublish,
+                    transactionsToClear, tabletRecoveryMap, tabletToUpdate, 
cooldownTablets);
+        } else {
+            // Tablet exists in FE but not in BE - may need deletion
+            processDeletedTablet(backendId, tabletId, tabletMeta, 
tabletDeleteFromMeta);
+        }
+    }
+
+    /**
+     * Process tablet that exists in both FE and BE
+     */
+    private void processExistingTablet(long backendId, long tabletId, Replica 
replica,
+                                        TabletMeta tabletMeta, TTabletInfo 
backendTabletInfo,
+                                        HashMap<Long, TStorageMedium> 
storageMediumMap,
+                                        ListMultimap<Long, Long> tabletSyncMap,
+                                        ListMultimap<TStorageMedium, Long> 
tabletMigrationMap,
+                                        Map<Long, SetMultimap<Long, 
TPartitionVersionInfo>> transactionsToPublish,
+                                        SetMultimap<Long, Long> 
transactionsToClear,
+                                        ListMultimap<Long, Long> 
tabletRecoveryMap,
+                                        List<TTabletMetaInfo> tabletToUpdate,
+                                        List<Pair<TabletMeta, TTabletInfo>> 
cooldownTablets) {
+        // Check and prepare tablet meta info update
+        TTabletMetaInfo tabletMetaInfo = prepareTabletMetaInfo(replica, 
tabletMeta, backendTabletInfo);
+
+        // Check if version sync is needed
+        if (needSync(replica, backendTabletInfo)) {
+            synchronized (tabletSyncMap) {
+                tabletSyncMap.put(tabletMeta.getDbId(), tabletId);
             }
-        } finally {
-            readUnlock(stamp);
         }
-        cooldownTablets.forEach(p -> handleCooldownConf(p.first, p.second, 
cooldownConfToPush, cooldownConfToUpdate));
 
-        long end = System.currentTimeMillis();
+        // Update replica path and schema hash
+        updateReplicaBasicInfo(replica, backendTabletInfo);
+
+        // Check if replica needs recovery
+        if (needRecover(replica, tabletMeta.getOldSchemaHash(), 
backendTabletInfo)) {
+            logReplicaRecovery(replica, tabletId, backendId, 
backendTabletInfo);
+            synchronized (tabletRecoveryMap) {
+                tabletRecoveryMap.put(tabletMeta.getDbId(), tabletId);
+            }
+        }
+
+        // Handle cooldown policy
+        if (Config.enable_storage_policy && 
backendTabletInfo.isSetCooldownTerm()) {
+            synchronized (cooldownTablets) {
+                cooldownTablets.add(Pair.of(tabletMeta, backendTabletInfo));
+            }
+            replica.setCooldownMetaId(backendTabletInfo.getCooldownMetaId());
+            replica.setCooldownTerm(backendTabletInfo.getCooldownTerm());
+        }
+
+        // Check storage medium migration
+        checkStorageMediumMigration(tabletId, tabletMeta, backendTabletInfo,
+                storageMediumMap, tabletMigrationMap);
+
+        // Handle transactions
+        if (backendTabletInfo.isSetTransactionIds()) {
+            handleBackendTransactions(backendId, 
backendTabletInfo.getTransactionIds(), tabletId,
+                    tabletMeta, transactionsToPublish, transactionsToClear);
+        }
+
+        // Update replica version count
+        updateReplicaVersionCount(replica, backendTabletInfo);
+
+        // Add tablet meta info to update list if needed
+        if (tabletMetaInfo != null) {
+            tabletMetaInfo.setTabletId(tabletId);
+            synchronized (tabletToUpdate) {
+                tabletToUpdate.add(tabletMetaInfo);
+            }
+        }
+    }
+
+    /**
+     * Prepare tablet meta info for BE update if needed
+     */
+    private TTabletMetaInfo prepareTabletMetaInfo(Replica replica, TabletMeta 
tabletMeta,
+                                                   TTabletInfo 
backendTabletInfo) {
+        TTabletMetaInfo tabletMetaInfo = null;
+
+        // Check replica id mismatch
+        if (backendTabletInfo.getReplicaId() != replica.getId()
+                && replica.getState() != ReplicaState.CLONE) {
+            tabletMetaInfo = new TTabletMetaInfo();
+            tabletMetaInfo.setReplicaId(replica.getId());
+        }
+
+        // Check in-memory flag
+        PartitionCollectInfo partitionCollectInfo =
+                
partitionCollectInfoMap.get(backendTabletInfo.getPartitionId());
+        boolean isInMemory = partitionCollectInfo != null && 
partitionCollectInfo.isInMemory();
+        if (isInMemory != backendTabletInfo.isIsInMemory()) {
+            if (tabletMetaInfo == null) {
+                tabletMetaInfo = new TTabletMetaInfo();
+            }
+            tabletMetaInfo.setIsInMemory(isInMemory);
+        }
+
+        // Check partition id mismatch
+        if (Config.fix_tablet_partition_id_eq_0
+                && tabletMeta.getPartitionId() > 0
+                && backendTabletInfo.getPartitionId() == 0) {
+            LOG.warn("be report tablet partition id not eq fe, in be {} but in 
fe {}",
+                    backendTabletInfo, tabletMeta);
+            if (tabletMetaInfo == null) {
+                tabletMetaInfo = new TTabletMetaInfo();
+            }
+            tabletMetaInfo.setPartitionId(tabletMeta.getPartitionId());
+        }
+
+        return tabletMetaInfo;
+    }
+
+    /**
+     * Update replica's basic info like path hash and schema hash
+     */
+    private void updateReplicaBasicInfo(Replica replica, TTabletInfo 
backendTabletInfo) {
+        // Update path hash
+        if (backendTabletInfo.isSetPathHash()
+                && replica.getPathHash() != backendTabletInfo.getPathHash()) {
+            replica.setPathHash(backendTabletInfo.getPathHash());
+        }
+
+        // Update schema hash
+        if (backendTabletInfo.isSetSchemaHash()
+                && replica.getState() == ReplicaState.NORMAL
+                && replica.getSchemaHash() != 
backendTabletInfo.getSchemaHash()) {
+            replica.setSchemaHash(backendTabletInfo.getSchemaHash());
+        }
+    }
+
+    /**
+     * Log replica recovery information
+     */
+    private void logReplicaRecovery(Replica replica, long tabletId, long 
backendId,
+                                     TTabletInfo backendTabletInfo) {
+        LOG.warn("replica {} of tablet {} on backend {} need recovery. "
+                        + "replica in FE: {}, report version {}, report schema 
hash: {}, "
+                        + "is bad: {}, is version missing: {}",
+                replica.getId(), tabletId, backendId, replica,
+                backendTabletInfo.getVersion(),
+                backendTabletInfo.getSchemaHash(),
+                backendTabletInfo.isSetUsed() ? !backendTabletInfo.isUsed() : 
"false",
+                backendTabletInfo.isSetVersionMiss() ? 
backendTabletInfo.isVersionMiss() : "unset");
+    }
+
+    /**
+     * Check if storage medium migration is needed
+     */
+    private void checkStorageMediumMigration(long tabletId, TabletMeta 
tabletMeta,
+                                              TTabletInfo backendTabletInfo,
+                                              HashMap<Long, TStorageMedium> 
storageMediumMap,
+                                              ListMultimap<TStorageMedium, 
Long> tabletMigrationMap) {
+        if (Config.disable_storage_medium_check) {
+            return;
+        }
+
+        long partitionId = tabletMeta.getPartitionId();
+        TStorageMedium storageMedium = storageMediumMap.get(partitionId);
+
+        if (storageMedium != null && backendTabletInfo.isSetStorageMedium()
+                && isLocal(storageMedium)
+                && isLocal(backendTabletInfo.getStorageMedium())
+                && isLocal(tabletMeta.getStorageMedium())) {
+
+            if (storageMedium != backendTabletInfo.getStorageMedium()) {
+                synchronized (tabletMigrationMap) {
+                    tabletMigrationMap.put(storageMedium, tabletId);
+                }
+            }
+
+            if (storageMedium != tabletMeta.getStorageMedium()) {
+                tabletMeta.setStorageMedium(storageMedium);
+            }
+        }
+    }
+
+    /**
+     * Update replica's version count
+     */
+    private void updateReplicaVersionCount(Replica replica, TTabletInfo 
backendTabletInfo) {
+        if (backendTabletInfo.isSetTotalVersionCount()) {
+            
replica.setTotalVersionCount(backendTabletInfo.getTotalVersionCount());
+            
replica.setVisibleVersionCount(backendTabletInfo.isSetVisibleVersionCount()
+                    ? backendTabletInfo.getVisibleVersionCount()
+                    : backendTabletInfo.getTotalVersionCount());
+        }
+    }
+
+    /**
+     * Process tablet that exists in FE but not reported by BE
+     */
+    private void processDeletedTablet(long backendId, long tabletId, 
TabletMeta tabletMeta,
+                                       ListMultimap<Long, Long> 
tabletDeleteFromMeta) {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("backend[{}] does not report tablet[{}-{}]", backendId, 
tabletId, tabletMeta);
+        }
+        synchronized (tabletDeleteFromMeta) {
+            tabletDeleteFromMeta.put(tabletMeta.getDbId(), tabletId);
+        }
+    }
+
+    /**
+     * Process partition versions reported by BE
+     */
+    private void processPartitionVersions(Map<Long, Long> 
backendPartitionsVersion,
+                                           Map<Long, Long> 
partitionVersionSyncMap) {
+        for (Map.Entry<Long, Long> entry : 
backendPartitionsVersion.entrySet()) {
+            long partitionId = entry.getKey();
+            long backendVersion = entry.getValue();
+            PartitionCollectInfo partitionInfo = 
partitionCollectInfoMap.get(partitionId);
+
+            if (partitionInfo != null && partitionInfo.getVisibleVersion() > 
backendVersion) {
+                partitionVersionSyncMap.put(partitionId, 
partitionInfo.getVisibleVersion());
+            }
+        }
+    }
+
+    /**
+     * Log tablet report summary
+     */
+    private void logTabletReportSummary(long backendId, long feTabletNum,
+                                         Map<Long, TTablet> backendTablets,
+                                         Map<Long, Long> 
backendPartitionsVersion,
+                                         ListMultimap<Long, Long> 
tabletSyncMap,
+                                         ListMultimap<Long, Long> 
tabletDeleteFromMeta,
+                                         Set<Long> tabletFoundInMeta,
+                                         ListMultimap<TStorageMedium, Long> 
tabletMigrationMap,
+                                         Map<Long, Long> 
partitionVersionSyncMap,
+                                         Map<Long, SetMultimap<Long, 
TPartitionVersionInfo>> transactionsToPublish,
+                                         SetMultimap<Long, Long> 
transactionsToClear,
+                                         List<TTabletMetaInfo> tabletToUpdate,
+                                         ListMultimap<Long, Long> 
tabletRecoveryMap,
+                                         long startTime) {
+        long endTime = System.currentTimeMillis();
         long toClearTransactionsNum = transactionsToClear.keySet().size();
         long toClearTransactionsPartitions = 
transactionsToClear.values().size();
         long toPublishTransactionsNum = transactionsToPublish.values().stream()
-                                        .mapToLong(m -> 
m.keySet().size()).sum();
+                .mapToLong(m -> m.keySet().size()).sum();
         long toPublishTransactionsPartitions = 
transactionsToPublish.values().stream()
-                                               .mapToLong(m -> 
m.values().size()).sum();
-        LOG.info("finished to do tablet diff with backend[{}]. fe tablet num: 
{}, backend tablet num: {}. sync: {}."
-                        + " metaDel: {}. foundInMeta: {}. migration: {}. 
backend partition num: {}, backend need "
-                        + "update: {}. found invalid transactions 
{}(partitions: {}). found republish "
-                        + "transactions {}(partitions: {}). tabletToUpdate: 
{}. need recovery: {}. cost: {} ms",
+                .mapToLong(m -> m.values().size()).sum();
+
+        LOG.info("finished to do tablet diff with backend[{}]. fe tablet num: 
{}, backend tablet num: {}. "
+                        + "sync: {}, metaDel: {}, foundInMeta: {}, migration: 
{}, "
+                        + "backend partition num: {}, backend need update: {}, 
"
+                        + "found invalid transactions {}(partitions: {}), "
+                        + "found republish transactions {}(partitions: {}), "
+                        + "tabletToUpdate: {}, need recovery: {}, cost: {} ms",
                 backendId, feTabletNum, backendTablets.size(), 
tabletSyncMap.size(),
                 tabletDeleteFromMeta.size(), tabletFoundInMeta.size(), 
tabletMigrationMap.size(),
-                backendPartitionsVersion.size(), 
partitionVersionSyncMap.size(), toClearTransactionsNum,
-                toClearTransactionsPartitions, toPublishTransactionsNum, 
toPublishTransactionsPartitions,
-                tabletToUpdate.size(), tabletRecoveryMap.size(), (end - 
start));
+                backendPartitionsVersion.size(), 
partitionVersionSyncMap.size(),
+                toClearTransactionsNum, toClearTransactionsPartitions,
+                toPublishTransactionsNum, toPublishTransactionsPartitions,
+                tabletToUpdate.size(), tabletRecoveryMap.size(), (endTime - 
startTime));
     }
 
     private void handleBackendTransactions(long backendId, List<Long> 
transactionIds, long tabletId,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to