This is an automated email from the ASF dual-hosted git repository.
caiconghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 6af35a8eb7 [fix](TabletInvertedIndex) fix potential deadlock between
ForkJoinPool and TabletInvertedIndex (#11365)
6af35a8eb7 is described below
commit 6af35a8eb71f21bc16552cf369c709df4a574469
Author: caiconghui <[email protected]>
AuthorDate: Tue Aug 2 10:08:05 2022 +0800
[fix](TabletInvertedIndex) fix potential deadlock between ForkJoinPool and
TabletInvertedIndex (#11365)
* [fix](TabletInvertedIndex) fix potential deadlock between ForkJoinPool
and TabletInvertedIndex
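By default, every parallelStream() shares the common ForkJoinPool. TabletInvertedIndex
acquires its read lock outside the ForkJoinPool, while TabletStatMgr acquires the same
read lock inside a task running on that shared pool. This may cause a deadlock: the lock
holder waits for pool workers that are themselves blocked on the lock (e.g. behind a
waiting writer).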
---
.../org/apache/doris/analysis/LoadColumnsInfo.java | 6 +-
.../apache/doris/catalog/TabletInvertedIndex.java | 216 +++++++++++----------
.../org/apache/doris/catalog/TabletStatMgr.java | 46 +++--
.../org/apache/doris/load/loadv2/LoadManager.java | 2 +-
.../doris/load/routineload/RoutineLoadManager.java | 2 +-
.../planner/external/ExternalHiveScanProvider.java | 2 +-
6 files changed, 143 insertions(+), 131 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadColumnsInfo.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadColumnsInfo.java
index c93b05d8bb..ab949add28 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadColumnsInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadColumnsInfo.java
@@ -66,10 +66,10 @@ public class LoadColumnsInfo implements ParseNode {
sb.append(Joiner.on(",").join(columnNames));
sb.append(")");
- if (columnMappingList != null || columnMappingList.size() != 0) {
+ if (columnMappingList != null && !columnMappingList.isEmpty()) {
sb.append(" SET (");
- sb.append(Joiner.on(",").join(columnMappingList.parallelStream()
- .map(entity ->
entity.toSql()).collect(Collectors.toList())));
+ sb.append(Joiner.on(",").join(columnMappingList.stream()
+
.map(Expr::toSql).collect(Collectors.toList())));
sb.append(")");
}
return sb.toString();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java
index d725a14dcb..621e2a1627 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java
@@ -49,6 +49,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
@@ -93,6 +94,8 @@ public class TabletInvertedIndex {
private volatile ImmutableSet<Long> partitionIdInMemorySet =
ImmutableSet.of();
+ private ForkJoinPool taskPool = new
ForkJoinPool(Runtime.getRuntime().availableProcessors());
+
public TabletInvertedIndex() {
}
@@ -128,128 +131,131 @@ public class TabletInvertedIndex {
LOG.info("begin to do tablet diff with backend[{}]. num: {}",
backendId, backendTablets.size());
Map<Long, Replica> replicaMetaWithBackend =
backingReplicaMetaTable.row(backendId);
if (replicaMetaWithBackend != null) {
- // traverse replicas in meta with this backend
-
replicaMetaWithBackend.entrySet().parallelStream().forEach(entry -> {
- long tabletId = entry.getKey();
-
Preconditions.checkState(tabletMetaMap.containsKey(tabletId));
- TabletMeta tabletMeta = tabletMetaMap.get(tabletId);
-
- if (backendTablets.containsKey(tabletId)) {
- TTablet backendTablet = backendTablets.get(tabletId);
- Replica replica = entry.getValue();
- tabletFoundInMeta.add(tabletId);
- TTabletInfo backendTabletInfo =
backendTablet.getTabletInfos().get(0);
- if (partitionIdInMemorySet.contains(
- backendTabletInfo.getPartitionId()) !=
backendTabletInfo.isIsInMemory()) {
- synchronized (tabletToInMemory) {
- tabletToInMemory.add(new
ImmutableTriple<>(tabletId, backendTabletInfo.getSchemaHash(),
- !backendTabletInfo.isIsInMemory()));
+ taskPool.submit(() -> {
+ // traverse replicas in meta with this backend
+
replicaMetaWithBackend.entrySet().parallelStream().forEach(entry -> {
+ long tabletId = entry.getKey();
+
Preconditions.checkState(tabletMetaMap.containsKey(tabletId));
+ TabletMeta tabletMeta = tabletMetaMap.get(tabletId);
+
+ if (backendTablets.containsKey(tabletId)) {
+ TTablet backendTablet =
backendTablets.get(tabletId);
+ Replica replica = entry.getValue();
+ tabletFoundInMeta.add(tabletId);
+ TTabletInfo backendTabletInfo =
backendTablet.getTabletInfos().get(0);
+ if (partitionIdInMemorySet.contains(
+ backendTabletInfo.getPartitionId()) !=
backendTabletInfo.isIsInMemory()) {
+ synchronized (tabletToInMemory) {
+ tabletToInMemory.add(new
ImmutableTriple<>(tabletId,
+ backendTabletInfo.getSchemaHash(),
!backendTabletInfo.isIsInMemory()));
+ }
}
- }
- // 1. (intersection)
- if (needSync(replica, backendTabletInfo)) {
- // need sync
- synchronized (tabletSyncMap) {
- tabletSyncMap.put(tabletMeta.getDbId(),
tabletId);
+ // 1. (intersection)
+ if (needSync(replica, backendTabletInfo)) {
+ // need sync
+ synchronized (tabletSyncMap) {
+ tabletSyncMap.put(tabletMeta.getDbId(),
tabletId);
+ }
}
- }
- // check and set path
- // path info of replica is only saved in Master FE
- if (backendTabletInfo.isSetPathHash()
- && replica.getPathHash() !=
backendTabletInfo.getPathHash()) {
-
replica.setPathHash(backendTabletInfo.getPathHash());
- }
+ // check and set path
+ // path info of replica is only saved in Master FE
+ if (backendTabletInfo.isSetPathHash()
+ && replica.getPathHash() !=
backendTabletInfo.getPathHash()) {
+
replica.setPathHash(backendTabletInfo.getPathHash());
+ }
- if (backendTabletInfo.isSetSchemaHash() &&
replica.getState() == ReplicaState.NORMAL
- && replica.getSchemaHash() !=
backendTabletInfo.getSchemaHash()) {
- // update the schema hash only when replica is
normal
-
replica.setSchemaHash(backendTabletInfo.getSchemaHash());
- }
+ if (backendTabletInfo.isSetSchemaHash() &&
replica.getState() == ReplicaState.NORMAL
+ && replica.getSchemaHash() !=
backendTabletInfo.getSchemaHash()) {
+ // update the schema hash only when replica is
normal
+
replica.setSchemaHash(backendTabletInfo.getSchemaHash());
+ }
- if (needRecover(replica,
tabletMeta.getOldSchemaHash(), backendTabletInfo)) {
- LOG.warn("replica {} of tablet {} on backend {}
need recovery. "
- + "replica in FE: {}, report
version {}, report schema hash: {},"
- + " is bad: {}, is version
missing: {}",
- replica.getId(), tabletId, backendId,
replica,
- backendTabletInfo.getVersion(),
- backendTabletInfo.getSchemaHash(),
- backendTabletInfo.isSetUsed() ?
!backendTabletInfo.isUsed() : "false",
- backendTabletInfo.isSetVersionMiss() ?
backendTabletInfo.isVersionMiss() : "unset");
- synchronized (tabletRecoveryMap) {
- tabletRecoveryMap.put(tabletMeta.getDbId(),
tabletId);
+ if (needRecover(replica,
tabletMeta.getOldSchemaHash(), backendTabletInfo)) {
+ LOG.warn("replica {} of tablet {} on backend
{} need recovery. "
+ + "replica in FE: {}, report
version {}, report schema hash: {},"
+ + " is bad: {}, is version
missing: {}",
+ replica.getId(), tabletId, backendId,
replica,
+ backendTabletInfo.getVersion(),
+ backendTabletInfo.getSchemaHash(),
+ backendTabletInfo.isSetUsed() ?
!backendTabletInfo.isUsed() : "false",
+ backendTabletInfo.isSetVersionMiss() ?
backendTabletInfo.isVersionMiss() :
+ "unset");
+ synchronized (tabletRecoveryMap) {
+
tabletRecoveryMap.put(tabletMeta.getDbId(), tabletId);
+ }
}
- }
- long partitionId = tabletMeta.getPartitionId();
- if (!Config.disable_storage_medium_check) {
- // check if need migration
- TStorageMedium storageMedium =
storageMediumMap.get(partitionId);
- if (storageMedium != null &&
backendTabletInfo.isSetStorageMedium()
- && isLocal(storageMedium) &&
isLocal(backendTabletInfo.getStorageMedium())
- && isLocal(tabletMeta.getStorageMedium()))
{
- if (storageMedium !=
backendTabletInfo.getStorageMedium()) {
- synchronized (tabletMigrationMap) {
- tabletMigrationMap.put(storageMedium,
tabletId);
+ long partitionId = tabletMeta.getPartitionId();
+ if (!Config.disable_storage_medium_check) {
+ // check if need migration
+ TStorageMedium storageMedium =
storageMediumMap.get(partitionId);
+ if (storageMedium != null &&
backendTabletInfo.isSetStorageMedium()
+ && isLocal(storageMedium) &&
isLocal(backendTabletInfo.getStorageMedium())
+ &&
isLocal(tabletMeta.getStorageMedium())) {
+ if (storageMedium !=
backendTabletInfo.getStorageMedium()) {
+ synchronized (tabletMigrationMap) {
+
tabletMigrationMap.put(storageMedium, tabletId);
+ }
+ }
+ if (storageMedium !=
tabletMeta.getStorageMedium()) {
+
tabletMeta.setStorageMedium(storageMedium);
}
- }
- if (storageMedium !=
tabletMeta.getStorageMedium()) {
- tabletMeta.setStorageMedium(storageMedium);
}
}
- }
- // check if should clear transactions
- if (backendTabletInfo.isSetTransactionIds()) {
- List<Long> transactionIds =
backendTabletInfo.getTransactionIds();
- GlobalTransactionMgr transactionMgr =
Env.getCurrentGlobalTransactionMgr();
- for (Long transactionId : transactionIds) {
- TransactionState transactionState
- =
transactionMgr.getTransactionState(tabletMeta.getDbId(), transactionId);
- if (transactionState == null
- ||
transactionState.getTransactionStatus() == TransactionStatus.ABORTED) {
- synchronized (transactionsToClear) {
- transactionsToClear.put(transactionId,
tabletMeta.getPartitionId());
- }
- LOG.debug("transaction id [{}] is not
valid any more, "
- + "clear it from backend [{}]",
transactionId, backendId);
- } else if
(transactionState.getTransactionStatus() == TransactionStatus.VISIBLE) {
- TableCommitInfo tableCommitInfo
- =
transactionState.getTableCommitInfo(tabletMeta.getTableId());
- PartitionCommitInfo partitionCommitInfo =
tableCommitInfo == null
- ? null :
tableCommitInfo.getPartitionCommitInfo(partitionId);
- if (partitionCommitInfo != null) {
- TPartitionVersionInfo versionInfo
- = new
TPartitionVersionInfo(tabletMeta.getPartitionId(),
-
partitionCommitInfo.getVersion(), 0);
- synchronized (transactionsToPublish) {
- ListMultimap<Long,
TPartitionVersionInfo> map
- =
transactionsToPublish.get(transactionState.getDbId());
- if (map == null) {
- map =
ArrayListMultimap.create();
-
transactionsToPublish.put(transactionState.getDbId(), map);
+ // check if should clear transactions
+ if (backendTabletInfo.isSetTransactionIds()) {
+ List<Long> transactionIds =
backendTabletInfo.getTransactionIds();
+ GlobalTransactionMgr transactionMgr =
Env.getCurrentGlobalTransactionMgr();
+ for (Long transactionId : transactionIds) {
+ TransactionState transactionState
+ =
transactionMgr.getTransactionState(tabletMeta.getDbId(), transactionId);
+ if (transactionState == null
+ ||
transactionState.getTransactionStatus() == TransactionStatus.ABORTED) {
+ synchronized (transactionsToClear) {
+
transactionsToClear.put(transactionId, tabletMeta.getPartitionId());
+ }
+ LOG.debug("transaction id [{}] is not
valid any more, "
+ + "clear it from backend
[{}]", transactionId, backendId);
+ } else if
(transactionState.getTransactionStatus() == TransactionStatus.VISIBLE) {
+ TableCommitInfo tableCommitInfo
+ =
transactionState.getTableCommitInfo(tabletMeta.getTableId());
+ PartitionCommitInfo
partitionCommitInfo = tableCommitInfo == null
+ ? null :
tableCommitInfo.getPartitionCommitInfo(partitionId);
+ if (partitionCommitInfo != null) {
+ TPartitionVersionInfo versionInfo
+ = new
TPartitionVersionInfo(tabletMeta.getPartitionId(),
+
partitionCommitInfo.getVersion(), 0);
+ synchronized
(transactionsToPublish) {
+ ListMultimap<Long,
TPartitionVersionInfo> map
+ =
transactionsToPublish.get(transactionState.getDbId());
+ if (map == null) {
+ map =
ArrayListMultimap.create();
+
transactionsToPublish.put(transactionState.getDbId(), map);
+ }
+ map.put(transactionId,
versionInfo);
}
- map.put(transactionId,
versionInfo);
}
}
}
- }
- } // end for txn id
+ } // end for txn id
- // update replicase's version count
- // no need to write log, and no need to get db lock.
- if (backendTabletInfo.isSetVersionCount()) {
-
replica.setVersionCount(backendTabletInfo.getVersionCount());
- }
- } else {
- // 2. (meta - be)
- // may need delete from meta
- LOG.debug("backend[{}] does not report tablet[{}-{}]",
backendId, tabletId, tabletMeta);
- synchronized (tabletDeleteFromMeta) {
- tabletDeleteFromMeta.put(tabletMeta.getDbId(),
tabletId);
+ // update replicase's version count
+ // no need to write log, and no need to get db
lock.
+ if (backendTabletInfo.isSetVersionCount()) {
+
replica.setVersionCount(backendTabletInfo.getVersionCount());
+ }
+ } else {
+ // 2. (meta - be)
+ // may need delete from meta
+ LOG.debug("backend[{}] does not report
tablet[{}-{}]", backendId, tabletId, tabletMeta);
+ synchronized (tabletDeleteFromMeta) {
+ tabletDeleteFromMeta.put(tabletMeta.getDbId(),
tabletId);
+ }
}
- }
- });
+ });
+ }).join();
}
} finally {
readUnlock();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java
index 75625ef6f7..c541ee5e12 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java
@@ -34,6 +34,7 @@ import org.apache.logging.log4j.Logger;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ForkJoinPool;
/*
* TabletStatMgr is for collecting tablet(replica) statistics from backends.
@@ -42,6 +43,8 @@ import java.util.Map;
public class TabletStatMgr extends MasterDaemon {
private static final Logger LOG =
LogManager.getLogger(TabletStatMgr.class);
+ private ForkJoinPool taskPool = new
ForkJoinPool(Runtime.getRuntime().availableProcessors());
+
public TabletStatMgr() {
super("tablet stat mgr", Config.tablet_stat_update_interval_second *
1000);
}
@@ -50,27 +53,30 @@ public class TabletStatMgr extends MasterDaemon {
protected void runAfterCatalogReady() {
ImmutableMap<Long, Backend> backends =
Env.getCurrentSystemInfo().getIdToBackend();
long start = System.currentTimeMillis();
- backends.values().parallelStream().forEach(backend -> {
- BackendService.Client client = null;
- TNetworkAddress address = null;
- boolean ok = false;
- try {
- address = new TNetworkAddress(backend.getHost(),
backend.getBePort());
- client = ClientPool.backendPool.borrowObject(address);
- TTabletStatResult result = client.getTabletStat();
- LOG.debug("get tablet stat from backend: {}, num: {}",
backend.getId(), result.getTabletsStatsSize());
- updateTabletStat(backend.getId(), result);
- ok = true;
- } catch (Exception e) {
- LOG.warn("task exec error. backend[{}]", backend.getId(), e);
- } finally {
- if (ok) {
- ClientPool.backendPool.returnObject(address, client);
- } else {
- ClientPool.backendPool.invalidateObject(address, client);
+ taskPool.submit(() -> {
+ backends.values().parallelStream().forEach(backend -> {
+ BackendService.Client client = null;
+ TNetworkAddress address = null;
+ boolean ok = false;
+ try {
+ address = new TNetworkAddress(backend.getHost(),
backend.getBePort());
+ client = ClientPool.backendPool.borrowObject(address);
+ TTabletStatResult result = client.getTabletStat();
+ LOG.debug("get tablet stat from backend: {}, num: {}",
backend.getId(),
+ result.getTabletsStatsSize());
+ updateTabletStat(backend.getId(), result);
+ ok = true;
+ } catch (Exception e) {
+ LOG.warn("task exec error. backend[{}]", backend.getId(),
e);
+ } finally {
+ if (ok) {
+ ClientPool.backendPool.returnObject(address, client);
+ } else {
+ ClientPool.backendPool.invalidateObject(address,
client);
+ }
}
- }
- });
+ });
+ }).join();
LOG.debug("finished to get tablet stat of all backends. cost: {} ms",
(System.currentTimeMillis() - start));
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
index 92ec49ddbf..16b2843593 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
@@ -131,7 +131,7 @@ public class LoadManager implements Writable {
}
private long unprotectedGetUnfinishedJobNum() {
- return idToLoadJob.values().parallelStream()
+ return idToLoadJob.values().stream()
.filter(j -> (j.getState() != JobState.FINISHED &&
j.getState() != JobState.CANCELLED)).count();
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
index 3e9f4e5e59..f05defe11d 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
@@ -204,7 +204,7 @@ public class RoutineLoadManager implements Writable {
Map<String, List<RoutineLoadJob>> labelToRoutineLoadJob =
dbToNameToRoutineLoadJob.get(dbId);
if (labelToRoutineLoadJob.containsKey(name)) {
List<RoutineLoadJob> routineLoadJobList =
labelToRoutineLoadJob.get(name);
- Optional<RoutineLoadJob> optional =
routineLoadJobList.parallelStream()
+ Optional<RoutineLoadJob> optional = routineLoadJobList.stream()
.filter(entity -> entity.getName().equals(name))
.filter(entity ->
!entity.getState().isFinalState()).findFirst();
return optional.isPresent();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalHiveScanProvider.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalHiveScanProvider.java
index 22b0436d75..39ac68773a 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalHiveScanProvider.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalHiveScanProvider.java
@@ -112,7 +112,7 @@ public class ExternalHiveScanProvider implements
ExternalFileScanProvider {
InputFormat<?, ?> inputFormat = HiveUtil.getInputFormat(configuration,
inputFormatName, false);
List<InputSplit> splits;
if (!hivePartitions.isEmpty()) {
- splits = hivePartitions.parallelStream()
+ splits = hivePartitions.stream()
.flatMap(x -> getSplitsByPath(inputFormat, configuration,
x.getSd().getLocation()).stream())
.collect(Collectors.toList());
} else {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]