This is an automated email from the ASF dual-hosted git repository.

caiconghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 6af35a8eb7 [fix](TabletInvertedIndex) fix potential deadlock between 
ForkJoinPool and TabletInvertedIndex (#11365)
6af35a8eb7 is described below

commit 6af35a8eb71f21bc16552cf369c709df4a574469
Author: caiconghui <[email protected]>
AuthorDate: Tue Aug 2 10:08:05 2022 +0800

    [fix](TabletInvertedIndex) fix potential deadlock between ForkJoinPool and 
TabletInvertedIndex (#11365)
    
    * [fix](TabletInvertedIndex) fix potential deadlock between ForkJoinPool 
and TabletInvertedIndex
    
    The common ForkJoinPool is shared by all parallel streams by default. We 
obtain the read lock outside the ForkJoinPool in TabletInvertedIndex, while we 
obtain the read lock inside the same ForkJoinPool in TabletStatMgr, which may 
cause a deadlock.
---
 .../org/apache/doris/analysis/LoadColumnsInfo.java |   6 +-
 .../apache/doris/catalog/TabletInvertedIndex.java  | 216 +++++++++++----------
 .../org/apache/doris/catalog/TabletStatMgr.java    |  46 +++--
 .../org/apache/doris/load/loadv2/LoadManager.java  |   2 +-
 .../doris/load/routineload/RoutineLoadManager.java |   2 +-
 .../planner/external/ExternalHiveScanProvider.java |   2 +-
 6 files changed, 143 insertions(+), 131 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadColumnsInfo.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadColumnsInfo.java
index c93b05d8bb..ab949add28 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadColumnsInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadColumnsInfo.java
@@ -66,10 +66,10 @@ public class LoadColumnsInfo implements ParseNode {
         sb.append(Joiner.on(",").join(columnNames));
         sb.append(")");
 
-        if (columnMappingList != null || columnMappingList.size() != 0) {
+        if (columnMappingList != null && !columnMappingList.isEmpty()) {
             sb.append(" SET (");
-            sb.append(Joiner.on(",").join(columnMappingList.parallelStream()
-                                                  .map(entity -> 
entity.toSql()).collect(Collectors.toList())));
+            sb.append(Joiner.on(",").join(columnMappingList.stream()
+                                                  
.map(Expr::toSql).collect(Collectors.toList())));
             sb.append(")");
         }
         return sb.toString();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java
index d725a14dcb..621e2a1627 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java
@@ -49,6 +49,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.stream.Collectors;
 
@@ -93,6 +94,8 @@ public class TabletInvertedIndex {
 
     private volatile ImmutableSet<Long> partitionIdInMemorySet = 
ImmutableSet.of();
 
+    private ForkJoinPool taskPool = new 
ForkJoinPool(Runtime.getRuntime().availableProcessors());
+
     public TabletInvertedIndex() {
     }
 
@@ -128,128 +131,131 @@ public class TabletInvertedIndex {
             LOG.info("begin to do tablet diff with backend[{}]. num: {}", 
backendId, backendTablets.size());
             Map<Long, Replica> replicaMetaWithBackend = 
backingReplicaMetaTable.row(backendId);
             if (replicaMetaWithBackend != null) {
-                // traverse replicas in meta with this backend
-                
replicaMetaWithBackend.entrySet().parallelStream().forEach(entry -> {
-                    long tabletId = entry.getKey();
-                    
Preconditions.checkState(tabletMetaMap.containsKey(tabletId));
-                    TabletMeta tabletMeta = tabletMetaMap.get(tabletId);
-
-                    if (backendTablets.containsKey(tabletId)) {
-                        TTablet backendTablet = backendTablets.get(tabletId);
-                        Replica replica = entry.getValue();
-                        tabletFoundInMeta.add(tabletId);
-                        TTabletInfo backendTabletInfo = 
backendTablet.getTabletInfos().get(0);
-                        if (partitionIdInMemorySet.contains(
-                                backendTabletInfo.getPartitionId()) != 
backendTabletInfo.isIsInMemory()) {
-                            synchronized (tabletToInMemory) {
-                                tabletToInMemory.add(new 
ImmutableTriple<>(tabletId, backendTabletInfo.getSchemaHash(),
-                                        !backendTabletInfo.isIsInMemory()));
+                taskPool.submit(() -> {
+                    // traverse replicas in meta with this backend
+                    
replicaMetaWithBackend.entrySet().parallelStream().forEach(entry -> {
+                        long tabletId = entry.getKey();
+                        
Preconditions.checkState(tabletMetaMap.containsKey(tabletId));
+                        TabletMeta tabletMeta = tabletMetaMap.get(tabletId);
+
+                        if (backendTablets.containsKey(tabletId)) {
+                            TTablet backendTablet = 
backendTablets.get(tabletId);
+                            Replica replica = entry.getValue();
+                            tabletFoundInMeta.add(tabletId);
+                            TTabletInfo backendTabletInfo = 
backendTablet.getTabletInfos().get(0);
+                            if (partitionIdInMemorySet.contains(
+                                    backendTabletInfo.getPartitionId()) != 
backendTabletInfo.isIsInMemory()) {
+                                synchronized (tabletToInMemory) {
+                                    tabletToInMemory.add(new 
ImmutableTriple<>(tabletId,
+                                            backendTabletInfo.getSchemaHash(), 
!backendTabletInfo.isIsInMemory()));
+                                }
                             }
-                        }
-                        // 1. (intersection)
-                        if (needSync(replica, backendTabletInfo)) {
-                            // need sync
-                            synchronized (tabletSyncMap) {
-                                tabletSyncMap.put(tabletMeta.getDbId(), 
tabletId);
+                            // 1. (intersection)
+                            if (needSync(replica, backendTabletInfo)) {
+                                // need sync
+                                synchronized (tabletSyncMap) {
+                                    tabletSyncMap.put(tabletMeta.getDbId(), 
tabletId);
+                                }
                             }
-                        }
 
-                        // check and set path
-                        // path info of replica is only saved in Master FE
-                        if (backendTabletInfo.isSetPathHash()
-                                && replica.getPathHash() != 
backendTabletInfo.getPathHash()) {
-                            
replica.setPathHash(backendTabletInfo.getPathHash());
-                        }
+                            // check and set path
+                            // path info of replica is only saved in Master FE
+                            if (backendTabletInfo.isSetPathHash()
+                                    && replica.getPathHash() != 
backendTabletInfo.getPathHash()) {
+                                
replica.setPathHash(backendTabletInfo.getPathHash());
+                            }
 
-                        if (backendTabletInfo.isSetSchemaHash() && 
replica.getState() == ReplicaState.NORMAL
-                                && replica.getSchemaHash() != 
backendTabletInfo.getSchemaHash()) {
-                            // update the schema hash only when replica is 
normal
-                            
replica.setSchemaHash(backendTabletInfo.getSchemaHash());
-                        }
+                            if (backendTabletInfo.isSetSchemaHash() && 
replica.getState() == ReplicaState.NORMAL
+                                    && replica.getSchemaHash() != 
backendTabletInfo.getSchemaHash()) {
+                                // update the schema hash only when replica is 
normal
+                                
replica.setSchemaHash(backendTabletInfo.getSchemaHash());
+                            }
 
-                        if (needRecover(replica, 
tabletMeta.getOldSchemaHash(), backendTabletInfo)) {
-                            LOG.warn("replica {} of tablet {} on backend {} 
need recovery. "
-                                            + "replica in FE: {}, report 
version {}, report schema hash: {},"
-                                            + " is bad: {}, is version 
missing: {}",
-                                    replica.getId(), tabletId, backendId, 
replica,
-                                    backendTabletInfo.getVersion(),
-                                    backendTabletInfo.getSchemaHash(),
-                                    backendTabletInfo.isSetUsed() ? 
!backendTabletInfo.isUsed() : "false",
-                                    backendTabletInfo.isSetVersionMiss() ? 
backendTabletInfo.isVersionMiss() : "unset");
-                            synchronized (tabletRecoveryMap) {
-                                tabletRecoveryMap.put(tabletMeta.getDbId(), 
tabletId);
+                            if (needRecover(replica, 
tabletMeta.getOldSchemaHash(), backendTabletInfo)) {
+                                LOG.warn("replica {} of tablet {} on backend 
{} need recovery. "
+                                                + "replica in FE: {}, report 
version {}, report schema hash: {},"
+                                                + " is bad: {}, is version 
missing: {}",
+                                        replica.getId(), tabletId, backendId, 
replica,
+                                        backendTabletInfo.getVersion(),
+                                        backendTabletInfo.getSchemaHash(),
+                                        backendTabletInfo.isSetUsed() ? 
!backendTabletInfo.isUsed() : "false",
+                                        backendTabletInfo.isSetVersionMiss() ? 
backendTabletInfo.isVersionMiss() :
+                                                "unset");
+                                synchronized (tabletRecoveryMap) {
+                                    
tabletRecoveryMap.put(tabletMeta.getDbId(), tabletId);
+                                }
                             }
-                        }
 
-                        long partitionId = tabletMeta.getPartitionId();
-                        if (!Config.disable_storage_medium_check) {
-                            // check if need migration
-                            TStorageMedium storageMedium = 
storageMediumMap.get(partitionId);
-                            if (storageMedium != null && 
backendTabletInfo.isSetStorageMedium()
-                                    && isLocal(storageMedium) && 
isLocal(backendTabletInfo.getStorageMedium())
-                                    && isLocal(tabletMeta.getStorageMedium())) 
{
-                                if (storageMedium != 
backendTabletInfo.getStorageMedium()) {
-                                    synchronized (tabletMigrationMap) {
-                                        tabletMigrationMap.put(storageMedium, 
tabletId);
+                            long partitionId = tabletMeta.getPartitionId();
+                            if (!Config.disable_storage_medium_check) {
+                                // check if need migration
+                                TStorageMedium storageMedium = 
storageMediumMap.get(partitionId);
+                                if (storageMedium != null && 
backendTabletInfo.isSetStorageMedium()
+                                        && isLocal(storageMedium) && 
isLocal(backendTabletInfo.getStorageMedium())
+                                        && 
isLocal(tabletMeta.getStorageMedium())) {
+                                    if (storageMedium != 
backendTabletInfo.getStorageMedium()) {
+                                        synchronized (tabletMigrationMap) {
+                                            
tabletMigrationMap.put(storageMedium, tabletId);
+                                        }
+                                    }
+                                    if (storageMedium != 
tabletMeta.getStorageMedium()) {
+                                        
tabletMeta.setStorageMedium(storageMedium);
                                     }
-                                }
-                                if (storageMedium != 
tabletMeta.getStorageMedium()) {
-                                    tabletMeta.setStorageMedium(storageMedium);
                                 }
                             }
-                        }
 
-                        // check if should clear transactions
-                        if (backendTabletInfo.isSetTransactionIds()) {
-                            List<Long> transactionIds = 
backendTabletInfo.getTransactionIds();
-                            GlobalTransactionMgr transactionMgr = 
Env.getCurrentGlobalTransactionMgr();
-                            for (Long transactionId : transactionIds) {
-                                TransactionState transactionState
-                                        = 
transactionMgr.getTransactionState(tabletMeta.getDbId(), transactionId);
-                                if (transactionState == null
-                                        || 
transactionState.getTransactionStatus() == TransactionStatus.ABORTED) {
-                                    synchronized (transactionsToClear) {
-                                        transactionsToClear.put(transactionId, 
tabletMeta.getPartitionId());
-                                    }
-                                    LOG.debug("transaction id [{}] is not 
valid any more, "
-                                            + "clear it from backend [{}]", 
transactionId, backendId);
-                                } else if 
(transactionState.getTransactionStatus() == TransactionStatus.VISIBLE) {
-                                    TableCommitInfo tableCommitInfo
-                                            = 
transactionState.getTableCommitInfo(tabletMeta.getTableId());
-                                    PartitionCommitInfo partitionCommitInfo = 
tableCommitInfo == null
-                                            ? null : 
tableCommitInfo.getPartitionCommitInfo(partitionId);
-                                    if (partitionCommitInfo != null) {
-                                        TPartitionVersionInfo versionInfo
-                                                = new 
TPartitionVersionInfo(tabletMeta.getPartitionId(),
-                                                
partitionCommitInfo.getVersion(), 0);
-                                        synchronized (transactionsToPublish) {
-                                            ListMultimap<Long, 
TPartitionVersionInfo> map
-                                                    = 
transactionsToPublish.get(transactionState.getDbId());
-                                            if (map == null) {
-                                                map = 
ArrayListMultimap.create();
-                                                
transactionsToPublish.put(transactionState.getDbId(), map);
+                            // check if should clear transactions
+                            if (backendTabletInfo.isSetTransactionIds()) {
+                                List<Long> transactionIds = 
backendTabletInfo.getTransactionIds();
+                                GlobalTransactionMgr transactionMgr = 
Env.getCurrentGlobalTransactionMgr();
+                                for (Long transactionId : transactionIds) {
+                                    TransactionState transactionState
+                                            = 
transactionMgr.getTransactionState(tabletMeta.getDbId(), transactionId);
+                                    if (transactionState == null
+                                            || 
transactionState.getTransactionStatus() == TransactionStatus.ABORTED) {
+                                        synchronized (transactionsToClear) {
+                                            
transactionsToClear.put(transactionId, tabletMeta.getPartitionId());
+                                        }
+                                        LOG.debug("transaction id [{}] is not 
valid any more, "
+                                                + "clear it from backend 
[{}]", transactionId, backendId);
+                                    } else if 
(transactionState.getTransactionStatus() == TransactionStatus.VISIBLE) {
+                                        TableCommitInfo tableCommitInfo
+                                                = 
transactionState.getTableCommitInfo(tabletMeta.getTableId());
+                                        PartitionCommitInfo 
partitionCommitInfo = tableCommitInfo == null
+                                                ? null : 
tableCommitInfo.getPartitionCommitInfo(partitionId);
+                                        if (partitionCommitInfo != null) {
+                                            TPartitionVersionInfo versionInfo
+                                                    = new 
TPartitionVersionInfo(tabletMeta.getPartitionId(),
+                                                    
partitionCommitInfo.getVersion(), 0);
+                                            synchronized 
(transactionsToPublish) {
+                                                ListMultimap<Long, 
TPartitionVersionInfo> map
+                                                        = 
transactionsToPublish.get(transactionState.getDbId());
+                                                if (map == null) {
+                                                    map = 
ArrayListMultimap.create();
+                                                    
transactionsToPublish.put(transactionState.getDbId(), map);
+                                                }
+                                                map.put(transactionId, 
versionInfo);
                                             }
-                                            map.put(transactionId, 
versionInfo);
                                         }
                                     }
                                 }
-                            }
-                        } // end for txn id
+                            } // end for txn id
 
-                        // update replicase's version count
-                        // no need to write log, and no need to get db lock.
-                        if (backendTabletInfo.isSetVersionCount()) {
-                            
replica.setVersionCount(backendTabletInfo.getVersionCount());
-                        }
-                    } else {
-                        // 2. (meta - be)
-                        // may need delete from meta
-                        LOG.debug("backend[{}] does not report tablet[{}-{}]", 
backendId, tabletId, tabletMeta);
-                        synchronized (tabletDeleteFromMeta) {
-                            tabletDeleteFromMeta.put(tabletMeta.getDbId(), 
tabletId);
+                            // update replicase's version count
+                            // no need to write log, and no need to get db 
lock.
+                            if (backendTabletInfo.isSetVersionCount()) {
+                                
replica.setVersionCount(backendTabletInfo.getVersionCount());
+                            }
+                        } else {
+                            // 2. (meta - be)
+                            // may need delete from meta
+                            LOG.debug("backend[{}] does not report 
tablet[{}-{}]", backendId, tabletId, tabletMeta);
+                            synchronized (tabletDeleteFromMeta) {
+                                tabletDeleteFromMeta.put(tabletMeta.getDbId(), 
tabletId);
+                            }
                         }
-                    }
-                });
+                    });
+                }).join();
             }
         } finally {
             readUnlock();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java
index 75625ef6f7..c541ee5e12 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java
@@ -34,6 +34,7 @@ import org.apache.logging.log4j.Logger;
 
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ForkJoinPool;
 
 /*
  * TabletStatMgr is for collecting tablet(replica) statistics from backends.
@@ -42,6 +43,8 @@ import java.util.Map;
 public class TabletStatMgr extends MasterDaemon {
     private static final Logger LOG = 
LogManager.getLogger(TabletStatMgr.class);
 
+    private ForkJoinPool taskPool = new 
ForkJoinPool(Runtime.getRuntime().availableProcessors());
+
     public TabletStatMgr() {
         super("tablet stat mgr", Config.tablet_stat_update_interval_second * 
1000);
     }
@@ -50,27 +53,30 @@ public class TabletStatMgr extends MasterDaemon {
     protected void runAfterCatalogReady() {
         ImmutableMap<Long, Backend> backends = 
Env.getCurrentSystemInfo().getIdToBackend();
         long start = System.currentTimeMillis();
-        backends.values().parallelStream().forEach(backend -> {
-            BackendService.Client client = null;
-            TNetworkAddress address = null;
-            boolean ok = false;
-            try {
-                address = new TNetworkAddress(backend.getHost(), 
backend.getBePort());
-                client = ClientPool.backendPool.borrowObject(address);
-                TTabletStatResult result = client.getTabletStat();
-                LOG.debug("get tablet stat from backend: {}, num: {}", 
backend.getId(), result.getTabletsStatsSize());
-                updateTabletStat(backend.getId(), result);
-                ok = true;
-            } catch (Exception e) {
-                LOG.warn("task exec error. backend[{}]", backend.getId(), e);
-            } finally {
-                if (ok) {
-                    ClientPool.backendPool.returnObject(address, client);
-                } else {
-                    ClientPool.backendPool.invalidateObject(address, client);
+        taskPool.submit(() -> {
+            backends.values().parallelStream().forEach(backend -> {
+                BackendService.Client client = null;
+                TNetworkAddress address = null;
+                boolean ok = false;
+                try {
+                    address = new TNetworkAddress(backend.getHost(), 
backend.getBePort());
+                    client = ClientPool.backendPool.borrowObject(address);
+                    TTabletStatResult result = client.getTabletStat();
+                    LOG.debug("get tablet stat from backend: {}, num: {}", 
backend.getId(),
+                            result.getTabletsStatsSize());
+                    updateTabletStat(backend.getId(), result);
+                    ok = true;
+                } catch (Exception e) {
+                    LOG.warn("task exec error. backend[{}]", backend.getId(), 
e);
+                } finally {
+                    if (ok) {
+                        ClientPool.backendPool.returnObject(address, client);
+                    } else {
+                        ClientPool.backendPool.invalidateObject(address, 
client);
+                    }
                 }
-            }
-        });
+            });
+        }).join();
         LOG.debug("finished to get tablet stat of all backends. cost: {} ms",
                 (System.currentTimeMillis() - start));
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
index 92ec49ddbf..16b2843593 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
@@ -131,7 +131,7 @@ public class LoadManager implements Writable {
     }
 
     private long unprotectedGetUnfinishedJobNum() {
-        return idToLoadJob.values().parallelStream()
+        return idToLoadJob.values().stream()
                 .filter(j -> (j.getState() != JobState.FINISHED && 
j.getState() != JobState.CANCELLED)).count();
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
index 3e9f4e5e59..f05defe11d 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
@@ -204,7 +204,7 @@ public class RoutineLoadManager implements Writable {
             Map<String, List<RoutineLoadJob>> labelToRoutineLoadJob = 
dbToNameToRoutineLoadJob.get(dbId);
             if (labelToRoutineLoadJob.containsKey(name)) {
                 List<RoutineLoadJob> routineLoadJobList = 
labelToRoutineLoadJob.get(name);
-                Optional<RoutineLoadJob> optional = 
routineLoadJobList.parallelStream()
+                Optional<RoutineLoadJob> optional = routineLoadJobList.stream()
                         .filter(entity -> entity.getName().equals(name))
                         .filter(entity -> 
!entity.getState().isFinalState()).findFirst();
                 return optional.isPresent();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalHiveScanProvider.java
 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalHiveScanProvider.java
index 22b0436d75..39ac68773a 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalHiveScanProvider.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalHiveScanProvider.java
@@ -112,7 +112,7 @@ public class ExternalHiveScanProvider implements 
ExternalFileScanProvider {
         InputFormat<?, ?> inputFormat = HiveUtil.getInputFormat(configuration, 
inputFormatName, false);
         List<InputSplit> splits;
         if (!hivePartitions.isEmpty()) {
-            splits = hivePartitions.parallelStream()
+            splits = hivePartitions.stream()
                     .flatMap(x -> getSplitsByPath(inputFormat, configuration, 
x.getSd().getLocation()).stream())
                     .collect(Collectors.toList());
         } else {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to