morningman commented on a change in pull request #3369:
URL: https://github.com/apache/incubator-doris/pull/3369#discussion_r421537636
##########
File path: fe/src/main/java/org/apache/doris/catalog/Catalog.java
##########
@@ -2639,6 +2641,7 @@ public void dropDb(DropDbStmt stmt) throws DdlException {
fullNameToDb.remove(db.getFullName());
final Cluster cluster = nameToCluster.get(db.getClusterName());
cluster.removeDb(dbName, db.getId());
+ globalTransactionMgr.removeDatabaseTransactionMgr(db.getId());
Review comment:
Actually, the `DropDb` operation just puts the database into
CatalogRecycleBin; it does not actually drop it, and the database can be
recovered by the `Recover` operation.
So you should not call `removeDatabaseTransactionMgr()` here. Instead, it
should be called in `CatalogRecycleBin.eraseDatabase()`.
##########
File path: fe/src/main/java/org/apache/doris/catalog/Catalog.java
##########
@@ -2686,6 +2689,7 @@ public void replayDropDb(String dbName) throws
DdlException {
idToDb.remove(db.getId());
final Cluster cluster = nameToCluster.get(db.getClusterName());
cluster.removeDb(dbName, db.getId());
+ globalTransactionMgr.removeDatabaseTransactionMgr(db.getId());
Review comment:
Same as `dropDb`: this should be called in
`CatalogRecycleBin.replayEraseDatabase()`.
##########
File path:
fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
##########
@@ -123,6 +70,22 @@ public TxnStateCallbackFactory getCallbackFactory() {
return callbackFactory;
}
+ public DatabaseTransactionMgr getDatabaseTransactioMgr(long dbId) throws
AnalysisException {
Review comment:
```suggestion
public DatabaseTransactionMgr getDatabaseTransactionMgr(long dbId)
throws AnalysisException {
```
##########
File path:
fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
##########
@@ -123,6 +70,22 @@ public TxnStateCallbackFactory getCallbackFactory() {
return callbackFactory;
}
+ public DatabaseTransactionMgr getDatabaseTransactioMgr(long dbId) throws
AnalysisException {
+ DatabaseTransactionMgr dbTransactionMgr =
dbIdToDatabaseTransactionMgrs.get(dbId);
+ if (dbTransactionMgr == null) {
+ throw new AnalysisException("databaseTransactionMgr[" + dbId + "]
does not exist");
Review comment:
`AnalysisException` is not suitable here,
but this can be changed in a follow-up.
##########
File path:
fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
##########
@@ -156,114 +119,20 @@ public long beginTransaction(long dbId, List<Long>
tableIdList, String label, TU
+ Config.min_load_timeout_second + " and " +
Config.max_load_timeout_second
+ " seconds");
}
-
- writeLock();
- try {
- Preconditions.checkNotNull(coordinator);
- Preconditions.checkNotNull(label);
- FeNameFormat.checkLabel(label);
-
- /*
- * Check if label already used, by following steps
- * 1. get all existing transactions
- * 2. if there is a PREPARE transaction, check if this is a retry
request. If yes, return the
- * existing txn id.
- * 3. if there is a non-aborted transaction, throw label already
used exception.
- */
- Set<Long> existingTxnIds = dbIdToTxnLabels.get(dbId, label);
- if (existingTxnIds != null && !existingTxnIds.isEmpty()) {
- List<TransactionState> notAbortedTxns = Lists.newArrayList();
- for (long txnId : existingTxnIds) {
- TransactionState txn = idToTransactionState.get(txnId);
- Preconditions.checkNotNull(txn);
- if (txn.getTransactionStatus() !=
TransactionStatus.ABORTED) {
- notAbortedTxns.add(txn);
- }
- }
- // there should be at most 1 txn in PREPARE/COMMITTED/VISIBLE
status
- Preconditions.checkState(notAbortedTxns.size() <= 1,
notAbortedTxns);
- if (!notAbortedTxns.isEmpty()) {
- TransactionState notAbortedTxn = notAbortedTxns.get(0);
- if (requestId != null &&
notAbortedTxn.getTransactionStatus() == TransactionStatus.PREPARE
- && notAbortedTxn.getRequsetId() != null &&
notAbortedTxn.getRequsetId().equals(requestId)) {
- // this may be a retry request for same job, just
return existing txn id.
- throw new
DuplicatedRequestException(DebugUtil.printId(requestId),
- notAbortedTxn.getTransactionId(), "");
- }
- throw new LabelAlreadyUsedException(label,
notAbortedTxn.getTransactionStatus());
- }
- }
- checkRunningTxnExceedLimit(dbId, sourceType);
-
- long tid = idGenerator.getNextTransactionId();
- LOG.info("begin transaction: txn id {} with label {} from
coordinator {}", tid, label, coordinator);
- TransactionState transactionState = new TransactionState(dbId,
tableIdList, tid, label, requestId, sourceType,
- coordinator, listenerId, timeoutSecond * 1000);
- transactionState.setPrepareTime(System.currentTimeMillis());
- unprotectUpsertTransactionState(transactionState);
-
- if (MetricRepo.isInit.get()) {
- MetricRepo.COUNTER_TXN_BEGIN.increase(1L);
- }
-
- return tid;
- } catch (DuplicatedRequestException e) {
- throw e;
- } catch (Exception e) {
- if (MetricRepo.isInit.get()) {
- MetricRepo.COUNTER_TXN_REJECT.increase(1L);
- }
- throw e;
- } finally {
- writeUnlock();
- }
- }
-
- private void checkRunningTxnExceedLimit(long dbId, LoadJobSourceType
sourceType) throws BeginTransactionException {
- switch (sourceType) {
- case ROUTINE_LOAD_TASK:
- // no need to check limit for routine load task:
- // 1. the number of running routine load tasks is limited by
Config.max_routine_load_task_num_per_be
- // 2. if we add routine load txn to runningTxnNums,
runningTxnNums will always be occupied by routine load,
- // and other txn may not be able to submitted.
- break;
- default:
- if (runningTxnNums.getOrDefault(dbId, 0) >=
Config.max_running_txn_num_per_db) {
- throw new BeginTransactionException("current running txns
on db " + dbId + " is "
- + runningTxnNums.get(dbId) + ", larger than limit
" + Config.max_running_txn_num_per_db);
- }
- break;
- }
+ DatabaseTransactionMgr dbTransactionMgr =
getDatabaseTransactioMgr(dbId);
+ return dbTransactionMgr.beginTransaction(tableIdList, label,
requestId, coordinator, sourceType, listenerId, timeoutSecond);
}
public TransactionStatus getLabelState(long dbId, String label) {
- readLock();
try {
- Set<Long> existingTxnIds = dbIdToTxnLabels.get(dbId, label);
- if (existingTxnIds == null || existingTxnIds.isEmpty()) {
- return TransactionStatus.UNKNOWN;
- }
- // find the latest txn (which id is largest)
- long maxTxnId =
existingTxnIds.stream().max(Comparator.comparingLong(Long::valueOf)).get();
- return idToTransactionState.get(maxTxnId).getTransactionStatus();
- } finally {
- readUnlock();
- }
- }
-
- public void deleteTransaction(long transactionId) {
- writeLock();
- try {
- TransactionState state = idToTransactionState.get(transactionId);
- if (state == null) {
- return;
- }
- replayDeleteTransactionState(state);
- editLog.logDeleteTransactionState(state);
- } finally {
- writeUnlock();
+ DatabaseTransactionMgr dbTransactionMgr =
getDatabaseTransactioMgr(dbId);
+ return dbTransactionMgr.getLabelState(label);
+ } catch (AnalysisException e) {
+ LOG.warn("Get transaction status by label " + label + " failed",
e);
+ return null;
Review comment:
```suggestion
return TransactionStatus.UNKNOWN;
```
##########
File path:
fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
##########
@@ -851,493 +243,114 @@ public boolean isIntersectionNotEmpty(List<Long>
sourceTableIdList, List<Long> t
*/
public void removeExpiredAndTimeoutTxns() {
long currentMillis = System.currentTimeMillis();
-
- List<Long> timeoutTxns = Lists.newArrayList();
- List<Long> expiredTxns = Lists.newArrayList();
- readLock();
- try {
- for (TransactionState transactionState :
idToTransactionState.values()) {
- if (transactionState.isExpired(currentMillis)) {
- // remove the txn which labels are expired
- expiredTxns.add(transactionState.getTransactionId());
- } else if (transactionState.isTimeout(currentMillis)) {
- // txn is running but timeout, abort it.
- timeoutTxns.add(transactionState.getTransactionId());
+ for (DatabaseTransactionMgr dbTransactionMgr :
dbIdToDatabaseTransactionMgrs.values()) {
+ dbTransactionMgr.removeExpiredTxns();
+ List<Long> timeoutTxns =
dbTransactionMgr.getTimeoutTxns(currentMillis);
+ // abort timeout txns
+ for (Long txnId : timeoutTxns) {
Review comment:
This logic (removing the timeout txns) can also be put into
`DatabaseTransactionMgr`, e.g.
`dbTransactionMgr.removeExpiredAndTimeoutTxns();`
##########
File path:
fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
##########
@@ -851,493 +243,114 @@ public boolean isIntersectionNotEmpty(List<Long>
sourceTableIdList, List<Long> t
*/
public void removeExpiredAndTimeoutTxns() {
long currentMillis = System.currentTimeMillis();
-
- List<Long> timeoutTxns = Lists.newArrayList();
- List<Long> expiredTxns = Lists.newArrayList();
- readLock();
- try {
- for (TransactionState transactionState :
idToTransactionState.values()) {
- if (transactionState.isExpired(currentMillis)) {
- // remove the txn which labels are expired
- expiredTxns.add(transactionState.getTransactionId());
- } else if (transactionState.isTimeout(currentMillis)) {
- // txn is running but timeout, abort it.
- timeoutTxns.add(transactionState.getTransactionId());
+ for (DatabaseTransactionMgr dbTransactionMgr :
dbIdToDatabaseTransactionMgrs.values()) {
+ dbTransactionMgr.removeExpiredTxns();
+ List<Long> timeoutTxns =
dbTransactionMgr.getTimeoutTxns(currentMillis);
+ // abort timeout txns
+ for (Long txnId : timeoutTxns) {
+ try {
+ dbTransactionMgr.abortTransaction(txnId, "timeout by txn
manager", null);
+ LOG.info("transaction [" + txnId + "] is timeout, abort it
by transaction manager");
+ } catch (UserException e) {
+ // abort may be failed. it is acceptable. just print a log
+ LOG.warn("abort timeout txn {} failed. msg: {}", txnId,
e.getMessage());
}
}
- } finally {
- readUnlock();
- }
- // delete expired txns
- for (Long txnId : expiredTxns) {
- deleteTransaction(txnId);
- LOG.info("transaction [" + txnId + "] is expired, remove it from
transaction manager");
- }
-
- // abort timeout txns
- for (Long txnId : timeoutTxns) {
- try {
- abortTransaction(txnId, "timeout by txn manager");
- LOG.info("transaction [" + txnId + "] is timeout, abort it by
transaction manager");
- } catch (UserException e) {
- // abort may be failed. it is acceptable. just print a log
- LOG.warn("abort timeout txn {} failed. msg: {}", txnId,
e.getMessage());
- }
}
}
- public TransactionState getTransactionState(long transactionId) {
- readLock();
- try {
- return idToTransactionState.get(transactionId);
- } finally {
- readUnlock();
- }
+ public TransactionState getTransactionState(long dbId, long transactionId)
{
+ DatabaseTransactionMgr dbTransactionMgr =
dbIdToDatabaseTransactionMgrs.get(dbId);
+ return dbTransactionMgr.getTransactionState(transactionId);
}
public void setEditLog(EditLog editLog) {
- this.editLog = editLog;
this.idGenerator.setEditLog(editLog);
}
-
- private void readLock() {
- this.transactionLock.readLock().lock();
- }
-
- private void readUnlock() {
- this.transactionLock.readLock().unlock();
- }
-
- private void writeLock() {
- this.transactionLock.writeLock().lock();
- }
-
- private void writeUnlock() {
- this.transactionLock.writeLock().unlock();
- }
-
- // for add/update/delete TransactionState
- private void unprotectUpsertTransactionState(TransactionState
transactionState) {
- if (transactionState.getTransactionStatus() !=
TransactionStatus.PREPARE
- || transactionState.getSourceType() ==
LoadJobSourceType.FRONTEND) {
- // if this is a prepare txn, and load source type is not FRONTEND
- // no need to persist it. if prepare txn lost, the following
commit will just be failed.
- // user only need to retry this txn.
- // The FRONTEND type txn is committed and running asynchronously,
so we have to persist it.
- editLog.logInsertTransactionState(transactionState);
- }
- idToTransactionState.put(transactionState.getTransactionId(),
transactionState);
- updateTxnLabels(transactionState);
- updateDbRunningTxnNum(transactionState.getPreStatus(),
transactionState);
- }
-
- private void unprotectedCommitTransaction(TransactionState
transactionState, Set<Long> errorReplicaIds,
- Map<Long, Set<Long>>
tableToPartition, Set<Long> totalInvolvedBackends,
- Database db) {
- // transaction state is modified during check if the transaction could
committed
- if (transactionState.getTransactionStatus() !=
TransactionStatus.PREPARE) {
- return;
- }
- // update transaction state version
- transactionState.setCommitTime(System.currentTimeMillis());
- transactionState.setTransactionStatus(TransactionStatus.COMMITTED);
- transactionState.setErrorReplicas(errorReplicaIds);
- for (long tableId : tableToPartition.keySet()) {
- TableCommitInfo tableCommitInfo = new TableCommitInfo(tableId);
- for (long partitionId : tableToPartition.get(tableId)) {
- OlapTable table = (OlapTable) db.getTable(tableId);
- Partition partition = table.getPartition(partitionId);
- PartitionCommitInfo partitionCommitInfo = new
PartitionCommitInfo(partitionId,
-
partition.getNextVersion(),
-
partition.getNextVersionHash());
- tableCommitInfo.addPartitionCommitInfo(partitionCommitInfo);
- }
- transactionState.putIdToTableCommitInfo(tableId, tableCommitInfo);
- }
- // persist transactionState
- unprotectUpsertTransactionState(transactionState);
-
- // add publish version tasks. set task to null as a placeholder.
- // tasks will be created when publishing version.
- for (long backendId : totalInvolvedBackends) {
- transactionState.addPublishVersionTask(backendId, null);
- }
- }
- private boolean unprotectAbortTransaction(long transactionId, String
reason)
- throws UserException {
- TransactionState transactionState =
idToTransactionState.get(transactionId);
- if (transactionState == null) {
- throw new UserException("transaction not found");
- }
- if (transactionState.getTransactionStatus() ==
TransactionStatus.ABORTED) {
- return false;
- }
- if (transactionState.getTransactionStatus() ==
TransactionStatus.COMMITTED
- || transactionState.getTransactionStatus() ==
TransactionStatus.VISIBLE) {
- throw new UserException("transaction's state is already "
- + transactionState.getTransactionStatus() + ", could not
abort");
- }
- transactionState.setFinishTime(System.currentTimeMillis());
- transactionState.setReason(reason);
- transactionState.setTransactionStatus(TransactionStatus.ABORTED);
- unprotectUpsertTransactionState(transactionState);
- for (PublishVersionTask task :
transactionState.getPublishVersionTasks().values()) {
- AgentTaskQueue.removeTask(task.getBackendId(),
TTaskType.PUBLISH_VERSION, task.getSignature());
- }
- return true;
- }
-
// for replay idToTransactionState
// check point also run transaction cleaner, the cleaner maybe
concurrently modify id to
public void replayUpsertTransactionState(TransactionState
transactionState) {
- writeLock();
try {
- // set transaction status will call txn state change listener
- transactionState.replaySetTransactionStatus();
- Database db = catalog.getDb(transactionState.getDbId());
- if (transactionState.getTransactionStatus() ==
TransactionStatus.COMMITTED) {
- LOG.info("replay a committed transaction {}",
transactionState);
- updateCatalogAfterCommitted(transactionState, db);
- } else if (transactionState.getTransactionStatus() ==
TransactionStatus.VISIBLE) {
- LOG.info("replay a visible transaction {}", transactionState);
- updateCatalogAfterVisible(transactionState, db);
- }
- TransactionState preTxnState =
idToTransactionState.get(transactionState.getTransactionId());
- idToTransactionState.put(transactionState.getTransactionId(),
transactionState);
- updateTxnLabels(transactionState);
- updateDbRunningTxnNum(preTxnState == null ? null :
preTxnState.getTransactionStatus(),
- transactionState);
- } finally {
- writeUnlock();
+ DatabaseTransactionMgr dbTransactionMgr =
getDatabaseTransactioMgr(transactionState.getDbId());
+ dbTransactionMgr.replayUpsertTransactionState(transactionState);
+ } catch (AnalysisException e) {
+ LOG.warn("replay upsert transaction failed", e);
Review comment:
Add the transaction's id to the log message, for easier debugging.
##########
File path:
fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
##########
@@ -1346,81 +359,71 @@ public TransactionIdGenerator
getTransactionIDGenerator() {
@Override
public void write(DataOutput out) throws IOException {
- int numTransactions = idToTransactionState.size();
+ int numTransactions = getTransactionNum();
out.writeInt(numTransactions);
- for (Map.Entry<Long, TransactionState> entry :
idToTransactionState.entrySet()) {
- entry.getValue().write(out);
+ for (DatabaseTransactionMgr dbTransactionMgr :
dbIdToDatabaseTransactionMgrs.values()) {
+ dbTransactionMgr.unprotectWriteAllTransactionStates(out);
}
idGenerator.write(out);
}
public void readFields(DataInput in) throws IOException {
- int numTransactions = in.readInt();
- for (int i = 0; i < numTransactions; ++i) {
- TransactionState transactionState = new TransactionState();
- transactionState.readFields(in);
- TransactionState preTxnState =
idToTransactionState.get(transactionState.getTransactionId());
- idToTransactionState.put(transactionState.getTransactionId(),
transactionState);
- updateTxnLabels(transactionState);
- updateDbRunningTxnNum(preTxnState == null ? null :
preTxnState.getTransactionStatus(),
- transactionState);
+ try {
+ int numTransactions = in.readInt();
+ for (int i = 0; i < numTransactions; ++i) {
+ TransactionState transactionState = new TransactionState();
+ transactionState.readFields(in);
+ DatabaseTransactionMgr dbTransactionMgr =
getDatabaseTransactioMgr(transactionState.getDbId());
+
dbTransactionMgr.unprotectUpsertTransactionState(transactionState, true);
+ }
+ idGenerator.readFields(in);
+ } catch (AnalysisException e) {
+ throw new IOException("Read transaction states failed", e);
}
- idGenerator.readFields(in);
+
}
- public TransactionState getTransactionStateByCallbackIdAndStatus(long
callbackId, Set<TransactionStatus> status) {
- readLock();
+ public TransactionState getTransactionStateByCallbackIdAndStatus(long
dbId, long callbackId, Set<TransactionStatus> status) {
try {
- for (TransactionState txn : idToTransactionState.values()) {
- if (txn.getCallbackId() == callbackId &&
status.contains(txn.getTransactionStatus())) {
- return txn;
- }
- }
- } finally {
- readUnlock();
+ DatabaseTransactionMgr dbTransactionMgr =
getDatabaseTransactioMgr(dbId);
+ return
dbTransactionMgr.getTransactionStateByCallbackIdAndStatus(callbackId, status);
+ } catch (AnalysisException e) {
+ LOG.warn("Get transaction by callbackId and status failed", e);
+ return null;
}
- return null;
}
- public TransactionState getTransactionStateByCallbackId(long callbackId) {
- readLock();
+ public TransactionState getTransactionStateByCallbackId(long dbId, long
callbackId) {
try {
- for (TransactionState txn : idToTransactionState.values()) {
- if (txn.getCallbackId() == callbackId) {
- return txn;
- }
- }
- } finally {
- readUnlock();
+ DatabaseTransactionMgr dbTransactionMgr =
getDatabaseTransactioMgr(dbId);
+ return
dbTransactionMgr.getTransactionStateByCallbackId(callbackId);
+ } catch (AnalysisException e) {
+ LOG.warn("Get transaction by callbackId failed", e);
+ return null;
}
- return null;
}
- public List<Long> getTransactionIdByCoordinateBe(String coordinateHost,
int limit) {
- ArrayList<Long> txnIds = new ArrayList<>();
- readLock();
- try {
- idToTransactionState.values().stream()
- .filter(t -> (t.getCoordinator().sourceType ==
TransactionState.TxnSourceType.BE
- && t.getCoordinator().ip.equals(coordinateHost)
- && (!t.getTransactionStatus().isFinalStatus())))
- .limit(limit)
- .forEach(t -> txnIds.add(t.getTransactionId()));
- } finally {
- readUnlock();
+ public List<Pair<Long, Long>> getTransactionIdByCoordinateBe(String
coordinateHost, int limit) {
+ ArrayList<Pair<Long, Long>> txnInfos = new ArrayList<>();
+ for (DatabaseTransactionMgr databaseTransactionMgr :
dbIdToDatabaseTransactionMgrs.values()) {
+
txnInfos.addAll(databaseTransactionMgr.getTransactionIdByCoordinateBe(coordinateHost,
limit));
+ if (txnInfos.size() > limit) {
+ break;
+ }
}
- return txnIds;
+ return txnInfos.size() > limit ? new ArrayList<>(txnInfos.subList(0,
limit)) : txnInfos;
Review comment:
```suggestion
return txnInfos.size() > limit ? txnInfos.subList(0, limit) :
txnInfos;
```
##########
File path:
fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
##########
@@ -851,493 +243,114 @@ public boolean isIntersectionNotEmpty(List<Long>
sourceTableIdList, List<Long> t
*/
public void removeExpiredAndTimeoutTxns() {
long currentMillis = System.currentTimeMillis();
-
- List<Long> timeoutTxns = Lists.newArrayList();
- List<Long> expiredTxns = Lists.newArrayList();
- readLock();
- try {
- for (TransactionState transactionState :
idToTransactionState.values()) {
- if (transactionState.isExpired(currentMillis)) {
- // remove the txn which labels are expired
- expiredTxns.add(transactionState.getTransactionId());
- } else if (transactionState.isTimeout(currentMillis)) {
- // txn is running but timeout, abort it.
- timeoutTxns.add(transactionState.getTransactionId());
+ for (DatabaseTransactionMgr dbTransactionMgr :
dbIdToDatabaseTransactionMgrs.values()) {
+ dbTransactionMgr.removeExpiredTxns();
+ List<Long> timeoutTxns =
dbTransactionMgr.getTimeoutTxns(currentMillis);
+ // abort timeout txns
+ for (Long txnId : timeoutTxns) {
+ try {
+ dbTransactionMgr.abortTransaction(txnId, "timeout by txn
manager", null);
+ LOG.info("transaction [" + txnId + "] is timeout, abort it
by transaction manager");
+ } catch (UserException e) {
+ // abort may be failed. it is acceptable. just print a log
+ LOG.warn("abort timeout txn {} failed. msg: {}", txnId,
e.getMessage());
}
}
- } finally {
- readUnlock();
- }
- // delete expired txns
- for (Long txnId : expiredTxns) {
- deleteTransaction(txnId);
- LOG.info("transaction [" + txnId + "] is expired, remove it from
transaction manager");
- }
-
- // abort timeout txns
- for (Long txnId : timeoutTxns) {
- try {
- abortTransaction(txnId, "timeout by txn manager");
- LOG.info("transaction [" + txnId + "] is timeout, abort it by
transaction manager");
- } catch (UserException e) {
- // abort may be failed. it is acceptable. just print a log
- LOG.warn("abort timeout txn {} failed. msg: {}", txnId,
e.getMessage());
- }
}
}
- public TransactionState getTransactionState(long transactionId) {
- readLock();
- try {
- return idToTransactionState.get(transactionId);
- } finally {
- readUnlock();
- }
+ public TransactionState getTransactionState(long dbId, long transactionId)
{
+ DatabaseTransactionMgr dbTransactionMgr =
dbIdToDatabaseTransactionMgrs.get(dbId);
Review comment:
No exception is thrown here? `dbTransactionMgr` could be `null`.
##########
File path:
fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
##########
@@ -851,493 +243,114 @@ public boolean isIntersectionNotEmpty(List<Long>
sourceTableIdList, List<Long> t
*/
public void removeExpiredAndTimeoutTxns() {
long currentMillis = System.currentTimeMillis();
-
- List<Long> timeoutTxns = Lists.newArrayList();
- List<Long> expiredTxns = Lists.newArrayList();
- readLock();
- try {
- for (TransactionState transactionState :
idToTransactionState.values()) {
- if (transactionState.isExpired(currentMillis)) {
- // remove the txn which labels are expired
- expiredTxns.add(transactionState.getTransactionId());
- } else if (transactionState.isTimeout(currentMillis)) {
- // txn is running but timeout, abort it.
- timeoutTxns.add(transactionState.getTransactionId());
+ for (DatabaseTransactionMgr dbTransactionMgr :
dbIdToDatabaseTransactionMgrs.values()) {
+ dbTransactionMgr.removeExpiredTxns();
+ List<Long> timeoutTxns =
dbTransactionMgr.getTimeoutTxns(currentMillis);
+ // abort timeout txns
+ for (Long txnId : timeoutTxns) {
+ try {
+ dbTransactionMgr.abortTransaction(txnId, "timeout by txn
manager", null);
+ LOG.info("transaction [" + txnId + "] is timeout, abort it
by transaction manager");
+ } catch (UserException e) {
+ // abort may be failed. it is acceptable. just print a log
+ LOG.warn("abort timeout txn {} failed. msg: {}", txnId,
e.getMessage());
}
}
- } finally {
- readUnlock();
- }
- // delete expired txns
- for (Long txnId : expiredTxns) {
- deleteTransaction(txnId);
- LOG.info("transaction [" + txnId + "] is expired, remove it from
transaction manager");
- }
-
- // abort timeout txns
- for (Long txnId : timeoutTxns) {
- try {
- abortTransaction(txnId, "timeout by txn manager");
- LOG.info("transaction [" + txnId + "] is timeout, abort it by
transaction manager");
- } catch (UserException e) {
- // abort may be failed. it is acceptable. just print a log
- LOG.warn("abort timeout txn {} failed. msg: {}", txnId,
e.getMessage());
- }
}
}
- public TransactionState getTransactionState(long transactionId) {
- readLock();
- try {
- return idToTransactionState.get(transactionId);
- } finally {
- readUnlock();
- }
+ public TransactionState getTransactionState(long dbId, long transactionId)
{
+ DatabaseTransactionMgr dbTransactionMgr =
dbIdToDatabaseTransactionMgrs.get(dbId);
+ return dbTransactionMgr.getTransactionState(transactionId);
}
public void setEditLog(EditLog editLog) {
- this.editLog = editLog;
this.idGenerator.setEditLog(editLog);
}
-
- private void readLock() {
- this.transactionLock.readLock().lock();
- }
-
- private void readUnlock() {
- this.transactionLock.readLock().unlock();
- }
-
- private void writeLock() {
- this.transactionLock.writeLock().lock();
- }
-
- private void writeUnlock() {
- this.transactionLock.writeLock().unlock();
- }
-
- // for add/update/delete TransactionState
- private void unprotectUpsertTransactionState(TransactionState
transactionState) {
- if (transactionState.getTransactionStatus() !=
TransactionStatus.PREPARE
- || transactionState.getSourceType() ==
LoadJobSourceType.FRONTEND) {
- // if this is a prepare txn, and load source type is not FRONTEND
- // no need to persist it. if prepare txn lost, the following
commit will just be failed.
- // user only need to retry this txn.
- // The FRONTEND type txn is committed and running asynchronously,
so we have to persist it.
- editLog.logInsertTransactionState(transactionState);
- }
- idToTransactionState.put(transactionState.getTransactionId(),
transactionState);
- updateTxnLabels(transactionState);
- updateDbRunningTxnNum(transactionState.getPreStatus(),
transactionState);
- }
-
- private void unprotectedCommitTransaction(TransactionState
transactionState, Set<Long> errorReplicaIds,
- Map<Long, Set<Long>>
tableToPartition, Set<Long> totalInvolvedBackends,
- Database db) {
- // transaction state is modified during check if the transaction could
committed
- if (transactionState.getTransactionStatus() !=
TransactionStatus.PREPARE) {
- return;
- }
- // update transaction state version
- transactionState.setCommitTime(System.currentTimeMillis());
- transactionState.setTransactionStatus(TransactionStatus.COMMITTED);
- transactionState.setErrorReplicas(errorReplicaIds);
- for (long tableId : tableToPartition.keySet()) {
- TableCommitInfo tableCommitInfo = new TableCommitInfo(tableId);
- for (long partitionId : tableToPartition.get(tableId)) {
- OlapTable table = (OlapTable) db.getTable(tableId);
- Partition partition = table.getPartition(partitionId);
- PartitionCommitInfo partitionCommitInfo = new
PartitionCommitInfo(partitionId,
-
partition.getNextVersion(),
-
partition.getNextVersionHash());
- tableCommitInfo.addPartitionCommitInfo(partitionCommitInfo);
- }
- transactionState.putIdToTableCommitInfo(tableId, tableCommitInfo);
- }
- // persist transactionState
- unprotectUpsertTransactionState(transactionState);
-
- // add publish version tasks. set task to null as a placeholder.
- // tasks will be created when publishing version.
- for (long backendId : totalInvolvedBackends) {
- transactionState.addPublishVersionTask(backendId, null);
- }
- }
- private boolean unprotectAbortTransaction(long transactionId, String
reason)
- throws UserException {
- TransactionState transactionState =
idToTransactionState.get(transactionId);
- if (transactionState == null) {
- throw new UserException("transaction not found");
- }
- if (transactionState.getTransactionStatus() ==
TransactionStatus.ABORTED) {
- return false;
- }
- if (transactionState.getTransactionStatus() ==
TransactionStatus.COMMITTED
- || transactionState.getTransactionStatus() ==
TransactionStatus.VISIBLE) {
- throw new UserException("transaction's state is already "
- + transactionState.getTransactionStatus() + ", could not
abort");
- }
- transactionState.setFinishTime(System.currentTimeMillis());
- transactionState.setReason(reason);
- transactionState.setTransactionStatus(TransactionStatus.ABORTED);
- unprotectUpsertTransactionState(transactionState);
- for (PublishVersionTask task :
transactionState.getPublishVersionTasks().values()) {
- AgentTaskQueue.removeTask(task.getBackendId(),
TTaskType.PUBLISH_VERSION, task.getSignature());
- }
- return true;
- }
-
// for replay idToTransactionState
// check point also run transaction cleaner, the cleaner maybe
concurrently modify id to
public void replayUpsertTransactionState(TransactionState
transactionState) {
- writeLock();
try {
- // set transaction status will call txn state change listener
- transactionState.replaySetTransactionStatus();
- Database db = catalog.getDb(transactionState.getDbId());
- if (transactionState.getTransactionStatus() ==
TransactionStatus.COMMITTED) {
- LOG.info("replay a committed transaction {}",
transactionState);
- updateCatalogAfterCommitted(transactionState, db);
- } else if (transactionState.getTransactionStatus() ==
TransactionStatus.VISIBLE) {
- LOG.info("replay a visible transaction {}", transactionState);
- updateCatalogAfterVisible(transactionState, db);
- }
- TransactionState preTxnState =
idToTransactionState.get(transactionState.getTransactionId());
- idToTransactionState.put(transactionState.getTransactionId(),
transactionState);
- updateTxnLabels(transactionState);
- updateDbRunningTxnNum(preTxnState == null ? null :
preTxnState.getTransactionStatus(),
- transactionState);
- } finally {
- writeUnlock();
+ DatabaseTransactionMgr dbTransactionMgr =
getDatabaseTransactioMgr(transactionState.getDbId());
+ dbTransactionMgr.replayUpsertTransactionState(transactionState);
+ } catch (AnalysisException e) {
+ LOG.warn("replay upsert transaction failed", e);
}
+
}
public void replayDeleteTransactionState(TransactionState
transactionState) {
- writeLock();
try {
- idToTransactionState.remove(transactionState.getTransactionId());
- Set<Long> txnIds = dbIdToTxnLabels.get(transactionState.getDbId(),
transactionState.getLabel());
- txnIds.remove(transactionState.getTransactionId());
- if (txnIds.isEmpty()) {
- dbIdToTxnLabels.remove(transactionState.getDbId(),
transactionState.getLabel());
- }
- } finally {
- writeUnlock();
- }
- }
-
- private void updateCatalogAfterCommitted(TransactionState
transactionState, Database db) {
- Set<Long> errorReplicaIds = transactionState.getErrorReplicas();
- for (TableCommitInfo tableCommitInfo :
transactionState.getIdToTableCommitInfos().values()) {
- long tableId = tableCommitInfo.getTableId();
- OlapTable table = (OlapTable) db.getTable(tableId);
- for (PartitionCommitInfo partitionCommitInfo :
tableCommitInfo.getIdToPartitionCommitInfo().values()) {
- long partitionId = partitionCommitInfo.getPartitionId();
- Partition partition = table.getPartition(partitionId);
- List<MaterializedIndex> allIndices =
partition.getMaterializedIndices(IndexExtState.ALL);
- for (MaterializedIndex index : allIndices) {
- List<Tablet> tablets = index.getTablets();
- for (Tablet tablet : tablets) {
- for (Replica replica : tablet.getReplicas()) {
- if (errorReplicaIds.contains(replica.getId())) {
- // should not use partition.getNextVersion and
partition.getNextVersionHash because partition's next version hash is generated
locally
- // should get from transaction state
-
replica.updateLastFailedVersion(partitionCommitInfo.getVersion(),
-
partitionCommitInfo.getVersionHash());
- }
- }
- }
- }
- partition.setNextVersion(partition.getNextVersion() + 1);
- // Although committed version(hash) is not visible to user,
- // but they need to be synchronized among Frontends.
- // because we use committed version(hash) to create clone
task, if the first Master FE
- // send clone task with committed version hash X, and than
Master changed, the new Master FE
- // received the clone task report with version hash X, which
not equals to it own committed
- // version hash, than the clone task is failed.
- partition.setNextVersionHash(Util.generateVersionHash() /*
next version hash */,
-
partitionCommitInfo.getVersionHash() /* committed version hash*/);
- }
+ DatabaseTransactionMgr dbTransactionMgr =
getDatabaseTransactioMgr(transactionState.getDbId());
+ dbTransactionMgr.deleteTransaction(transactionState);
+ } catch (AnalysisException e) {
+ LOG.warn("replay delete transaction failed", e);
}
}
-
- private boolean updateCatalogAfterVisible(TransactionState
transactionState, Database db) {
- Set<Long> errorReplicaIds = transactionState.getErrorReplicas();
- for (TableCommitInfo tableCommitInfo :
transactionState.getIdToTableCommitInfos().values()) {
- long tableId = tableCommitInfo.getTableId();
- OlapTable table = (OlapTable) db.getTable(tableId);
- for (PartitionCommitInfo partitionCommitInfo :
tableCommitInfo.getIdToPartitionCommitInfo().values()) {
- long partitionId = partitionCommitInfo.getPartitionId();
- long newCommitVersion = partitionCommitInfo.getVersion();
- long newCommitVersionHash =
partitionCommitInfo.getVersionHash();
- Partition partition = table.getPartition(partitionId);
- List<MaterializedIndex> allIndices =
partition.getMaterializedIndices(IndexExtState.ALL);
- for (MaterializedIndex index : allIndices) {
- for (Tablet tablet : index.getTablets()) {
- for (Replica replica : tablet.getReplicas()) {
- long lastFailedVersion =
replica.getLastFailedVersion();
- long lastFailedVersionHash =
replica.getLastFailedVersionHash();
- long newVersion = newCommitVersion;
- long newVersionHash = newCommitVersionHash;
- long lastSucessVersion =
replica.getLastSuccessVersion();
- long lastSuccessVersionHash =
replica.getLastSuccessVersionHash();
- if (!errorReplicaIds.contains(replica.getId())) {
- if (replica.getLastFailedVersion() > 0) {
- // if the replica is a failed replica,
then not changing version and version hash
- newVersion = replica.getVersion();
- newVersionHash = replica.getVersionHash();
- } else if
(!replica.checkVersionCatchUp(partition.getVisibleVersion(),
- partition.getVisibleVersionHash(),
true)) {
- // this means the replica has error in the
past, but we did not observe it
- // during upgrade, one job maybe in quorum
finished state, for example, A,B,C 3 replica
- // A,B 's version is 10, C's version is 10
but C' 10 is abnormal should be rollback
- // then we will detect this and set C's
last failed version to 10 and last success version to 11
- // this logic has to be replayed in
checkpoint thread
- lastFailedVersion =
partition.getVisibleVersion();
- lastFailedVersionHash =
partition.getVisibleVersionHash();
- newVersion = replica.getVersion();
- newVersionHash = replica.getVersionHash();
- }
- // success version always move forward
- lastSucessVersion = newCommitVersion;
- lastSuccessVersionHash = newCommitVersionHash;
- } else {
- // for example, A,B,C 3 replicas, B,C failed
during publish version, then B C will be set abnormal
- // all loading will failed, B,C will have to
recovery by clone, it is very inefficient and maybe lost data
- // Using this method, B,C will publish failed,
and fe will publish again, not update their last failed version
- // if B is publish successfully in next turn,
then B is normal and C will be set abnormal so that quorum is maintained
- // and loading will go on.
- newVersion = replica.getVersion();
- newVersionHash = replica.getVersionHash();
- if (newCommitVersion > lastFailedVersion) {
- lastFailedVersion = newCommitVersion;
- lastFailedVersionHash =
newCommitVersionHash;
- }
- }
- replica.updateVersionInfo(newVersion,
newVersionHash, lastFailedVersion, lastFailedVersionHash, lastSucessVersion,
lastSuccessVersionHash);
- }
- }
- } // end for indices
- long version = partitionCommitInfo.getVersion();
- long versionHash = partitionCommitInfo.getVersionHash();
- partition.updateVisibleVersionAndVersionHash(version,
versionHash);
- if (LOG.isDebugEnabled()) {
- LOG.debug("transaction state {} set partition {}'s version
to [{}] and version hash to [{}]",
- transactionState, partition.getId(), version,
versionHash);
- }
- }
- }
- return true;
- }
-
- private void updateTxnLabels(TransactionState transactionState) {
- Set<Long> txnIds = dbIdToTxnLabels.get(transactionState.getDbId(),
transactionState.getLabel());
- if (txnIds == null) {
- txnIds = Sets.newHashSet();
- dbIdToTxnLabels.put(transactionState.getDbId(),
transactionState.getLabel(), txnIds);
- }
- txnIds.add(transactionState.getTransactionId());
- }
-
- private void updateDbRunningTxnNum(TransactionStatus preStatus,
TransactionState curTxnState) {
- Map<Long, Integer> txnNumMap = null;
- if (curTxnState.getSourceType() ==
LoadJobSourceType.ROUTINE_LOAD_TASK) {
- txnNumMap = runningRoutineLoadTxnNums;
- } else {
- txnNumMap = runningTxnNums;
- }
-
- int txnNum = txnNumMap.getOrDefault(curTxnState.getDbId(), 0);
- if (preStatus == null
- && (curTxnState.getTransactionStatus() ==
TransactionStatus.PREPARE
- || curTxnState.getTransactionStatus() ==
TransactionStatus.COMMITTED)) {
- ++txnNum;
- } else if ((preStatus == TransactionStatus.PREPARE
- || preStatus == TransactionStatus.COMMITTED)
- && (curTxnState.getTransactionStatus() ==
TransactionStatus.VISIBLE
- || curTxnState.getTransactionStatus() ==
TransactionStatus.ABORTED)) {
- --txnNum;
- }
-
- if (txnNum < 1) {
- txnNumMap.remove(curTxnState.getDbId());
- } else {
- txnNumMap.put(curTxnState.getDbId(), txnNum);
- }
- }
-
public List<List<Comparable>> getDbInfo() {
List<List<Comparable>> infos = new ArrayList<List<Comparable>>();
- readLock();
- try {
- Set<Long> dbIds = new HashSet<>();
- for (TransactionState transactionState :
idToTransactionState.values()) {
- dbIds.add(transactionState.getDbId());
- }
- for (long dbId : dbIds) {
- List<Comparable> info = new ArrayList<Comparable>();
- info.add(dbId);
- Database db = Catalog.getInstance().getDb(dbId);
- if (db == null) {
- continue;
- }
- info.add(db.getFullName());
- infos.add(info);
+ List<Long> dbIds = Lists.newArrayList();
Review comment:
```suggestion
List<Long> dbIds =
Lists.newArrayList(dbIdToDatabaseTransactionMgrs.keySet());
```
##########
File path:
fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
##########
@@ -851,493 +243,114 @@ public boolean isIntersectionNotEmpty(List<Long>
sourceTableIdList, List<Long> t
*/
public void removeExpiredAndTimeoutTxns() {
long currentMillis = System.currentTimeMillis();
-
- List<Long> timeoutTxns = Lists.newArrayList();
- List<Long> expiredTxns = Lists.newArrayList();
- readLock();
- try {
- for (TransactionState transactionState :
idToTransactionState.values()) {
- if (transactionState.isExpired(currentMillis)) {
- // remove the txn which labels are expired
- expiredTxns.add(transactionState.getTransactionId());
- } else if (transactionState.isTimeout(currentMillis)) {
- // txn is running but timeout, abort it.
- timeoutTxns.add(transactionState.getTransactionId());
+ for (DatabaseTransactionMgr dbTransactionMgr :
dbIdToDatabaseTransactionMgrs.values()) {
+ dbTransactionMgr.removeExpiredTxns();
+ List<Long> timeoutTxns =
dbTransactionMgr.getTimeoutTxns(currentMillis);
+ // abort timeout txns
+ for (Long txnId : timeoutTxns) {
+ try {
+ dbTransactionMgr.abortTransaction(txnId, "timeout by txn
manager", null);
+ LOG.info("transaction [" + txnId + "] is timeout, abort it
by transaction manager");
+ } catch (UserException e) {
+ // abort may be failed. it is acceptable. just print a log
+ LOG.warn("abort timeout txn {} failed. msg: {}", txnId,
e.getMessage());
}
}
- } finally {
- readUnlock();
- }
- // delete expired txns
- for (Long txnId : expiredTxns) {
- deleteTransaction(txnId);
- LOG.info("transaction [" + txnId + "] is expired, remove it from
transaction manager");
- }
-
- // abort timeout txns
- for (Long txnId : timeoutTxns) {
- try {
- abortTransaction(txnId, "timeout by txn manager");
- LOG.info("transaction [" + txnId + "] is timeout, abort it by
transaction manager");
- } catch (UserException e) {
- // abort may be failed. it is acceptable. just print a log
- LOG.warn("abort timeout txn {} failed. msg: {}", txnId,
e.getMessage());
- }
}
}
- public TransactionState getTransactionState(long transactionId) {
- readLock();
- try {
- return idToTransactionState.get(transactionId);
- } finally {
- readUnlock();
- }
+ public TransactionState getTransactionState(long dbId, long transactionId)
{
+ DatabaseTransactionMgr dbTransactionMgr =
dbIdToDatabaseTransactionMgrs.get(dbId);
+ return dbTransactionMgr.getTransactionState(transactionId);
}
public void setEditLog(EditLog editLog) {
- this.editLog = editLog;
this.idGenerator.setEditLog(editLog);
}
-
- private void readLock() {
- this.transactionLock.readLock().lock();
- }
-
- private void readUnlock() {
- this.transactionLock.readLock().unlock();
- }
-
- private void writeLock() {
- this.transactionLock.writeLock().lock();
- }
-
- private void writeUnlock() {
- this.transactionLock.writeLock().unlock();
- }
-
- // for add/update/delete TransactionState
- private void unprotectUpsertTransactionState(TransactionState
transactionState) {
- if (transactionState.getTransactionStatus() !=
TransactionStatus.PREPARE
- || transactionState.getSourceType() ==
LoadJobSourceType.FRONTEND) {
- // if this is a prepare txn, and load source type is not FRONTEND
- // no need to persist it. if prepare txn lost, the following
commit will just be failed.
- // user only need to retry this txn.
- // The FRONTEND type txn is committed and running asynchronously,
so we have to persist it.
- editLog.logInsertTransactionState(transactionState);
- }
- idToTransactionState.put(transactionState.getTransactionId(),
transactionState);
- updateTxnLabels(transactionState);
- updateDbRunningTxnNum(transactionState.getPreStatus(),
transactionState);
- }
-
- private void unprotectedCommitTransaction(TransactionState
transactionState, Set<Long> errorReplicaIds,
- Map<Long, Set<Long>>
tableToPartition, Set<Long> totalInvolvedBackends,
- Database db) {
- // transaction state is modified during check if the transaction could
committed
- if (transactionState.getTransactionStatus() !=
TransactionStatus.PREPARE) {
- return;
- }
- // update transaction state version
- transactionState.setCommitTime(System.currentTimeMillis());
- transactionState.setTransactionStatus(TransactionStatus.COMMITTED);
- transactionState.setErrorReplicas(errorReplicaIds);
- for (long tableId : tableToPartition.keySet()) {
- TableCommitInfo tableCommitInfo = new TableCommitInfo(tableId);
- for (long partitionId : tableToPartition.get(tableId)) {
- OlapTable table = (OlapTable) db.getTable(tableId);
- Partition partition = table.getPartition(partitionId);
- PartitionCommitInfo partitionCommitInfo = new
PartitionCommitInfo(partitionId,
-
partition.getNextVersion(),
-
partition.getNextVersionHash());
- tableCommitInfo.addPartitionCommitInfo(partitionCommitInfo);
- }
- transactionState.putIdToTableCommitInfo(tableId, tableCommitInfo);
- }
- // persist transactionState
- unprotectUpsertTransactionState(transactionState);
-
- // add publish version tasks. set task to null as a placeholder.
- // tasks will be created when publishing version.
- for (long backendId : totalInvolvedBackends) {
- transactionState.addPublishVersionTask(backendId, null);
- }
- }
- private boolean unprotectAbortTransaction(long transactionId, String
reason)
- throws UserException {
- TransactionState transactionState =
idToTransactionState.get(transactionId);
- if (transactionState == null) {
- throw new UserException("transaction not found");
- }
- if (transactionState.getTransactionStatus() ==
TransactionStatus.ABORTED) {
- return false;
- }
- if (transactionState.getTransactionStatus() ==
TransactionStatus.COMMITTED
- || transactionState.getTransactionStatus() ==
TransactionStatus.VISIBLE) {
- throw new UserException("transaction's state is already "
- + transactionState.getTransactionStatus() + ", could not
abort");
- }
- transactionState.setFinishTime(System.currentTimeMillis());
- transactionState.setReason(reason);
- transactionState.setTransactionStatus(TransactionStatus.ABORTED);
- unprotectUpsertTransactionState(transactionState);
- for (PublishVersionTask task :
transactionState.getPublishVersionTasks().values()) {
- AgentTaskQueue.removeTask(task.getBackendId(),
TTaskType.PUBLISH_VERSION, task.getSignature());
- }
- return true;
- }
-
// for replay idToTransactionState
// check point also run transaction cleaner, the cleaner maybe
concurrently modify id to
public void replayUpsertTransactionState(TransactionState
transactionState) {
- writeLock();
try {
- // set transaction status will call txn state change listener
- transactionState.replaySetTransactionStatus();
- Database db = catalog.getDb(transactionState.getDbId());
- if (transactionState.getTransactionStatus() ==
TransactionStatus.COMMITTED) {
- LOG.info("replay a committed transaction {}",
transactionState);
- updateCatalogAfterCommitted(transactionState, db);
- } else if (transactionState.getTransactionStatus() ==
TransactionStatus.VISIBLE) {
- LOG.info("replay a visible transaction {}", transactionState);
- updateCatalogAfterVisible(transactionState, db);
- }
- TransactionState preTxnState =
idToTransactionState.get(transactionState.getTransactionId());
- idToTransactionState.put(transactionState.getTransactionId(),
transactionState);
- updateTxnLabels(transactionState);
- updateDbRunningTxnNum(preTxnState == null ? null :
preTxnState.getTransactionStatus(),
- transactionState);
- } finally {
- writeUnlock();
+ DatabaseTransactionMgr dbTransactionMgr =
getDatabaseTransactioMgr(transactionState.getDbId());
+ dbTransactionMgr.replayUpsertTransactionState(transactionState);
+ } catch (AnalysisException e) {
+ LOG.warn("replay upsert transaction failed", e);
}
+
}
public void replayDeleteTransactionState(TransactionState
transactionState) {
- writeLock();
try {
- idToTransactionState.remove(transactionState.getTransactionId());
- Set<Long> txnIds = dbIdToTxnLabels.get(transactionState.getDbId(),
transactionState.getLabel());
- txnIds.remove(transactionState.getTransactionId());
- if (txnIds.isEmpty()) {
- dbIdToTxnLabels.remove(transactionState.getDbId(),
transactionState.getLabel());
- }
- } finally {
- writeUnlock();
- }
- }
-
- private void updateCatalogAfterCommitted(TransactionState
transactionState, Database db) {
- Set<Long> errorReplicaIds = transactionState.getErrorReplicas();
- for (TableCommitInfo tableCommitInfo :
transactionState.getIdToTableCommitInfos().values()) {
- long tableId = tableCommitInfo.getTableId();
- OlapTable table = (OlapTable) db.getTable(tableId);
- for (PartitionCommitInfo partitionCommitInfo :
tableCommitInfo.getIdToPartitionCommitInfo().values()) {
- long partitionId = partitionCommitInfo.getPartitionId();
- Partition partition = table.getPartition(partitionId);
- List<MaterializedIndex> allIndices =
partition.getMaterializedIndices(IndexExtState.ALL);
- for (MaterializedIndex index : allIndices) {
- List<Tablet> tablets = index.getTablets();
- for (Tablet tablet : tablets) {
- for (Replica replica : tablet.getReplicas()) {
- if (errorReplicaIds.contains(replica.getId())) {
- // should not use partition.getNextVersion and
partition.getNextVersionHash because partition's next version hash is generated
locally
- // should get from transaction state
-
replica.updateLastFailedVersion(partitionCommitInfo.getVersion(),
-
partitionCommitInfo.getVersionHash());
- }
- }
- }
- }
- partition.setNextVersion(partition.getNextVersion() + 1);
- // Although committed version(hash) is not visible to user,
- // but they need to be synchronized among Frontends.
- // because we use committed version(hash) to create clone
task, if the first Master FE
- // send clone task with committed version hash X, and than
Master changed, the new Master FE
- // received the clone task report with version hash X, which
not equals to it own committed
- // version hash, than the clone task is failed.
- partition.setNextVersionHash(Util.generateVersionHash() /*
next version hash */,
-
partitionCommitInfo.getVersionHash() /* committed version hash*/);
- }
+ DatabaseTransactionMgr dbTransactionMgr =
getDatabaseTransactioMgr(transactionState.getDbId());
+ dbTransactionMgr.deleteTransaction(transactionState);
+ } catch (AnalysisException e) {
+ LOG.warn("replay delete transaction failed", e);
Review comment:
Please include the transaction id in this log message, so that the transaction whose replay failed can be identified.
##########
File path:
fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
##########
@@ -17,69 +17,33 @@
package org.apache.doris.transaction;
+import org.apache.commons.lang3.tuple.Pair;
Review comment:
You can use `org.apache.doris.common.Pair` here instead of `org.apache.commons.lang3.tuple.Pair`.
##########
File path:
fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
##########
@@ -851,493 +243,114 @@ public boolean isIntersectionNotEmpty(List<Long>
sourceTableIdList, List<Long> t
*/
public void removeExpiredAndTimeoutTxns() {
long currentMillis = System.currentTimeMillis();
-
- List<Long> timeoutTxns = Lists.newArrayList();
- List<Long> expiredTxns = Lists.newArrayList();
- readLock();
- try {
- for (TransactionState transactionState :
idToTransactionState.values()) {
- if (transactionState.isExpired(currentMillis)) {
- // remove the txn which labels are expired
- expiredTxns.add(transactionState.getTransactionId());
- } else if (transactionState.isTimeout(currentMillis)) {
- // txn is running but timeout, abort it.
- timeoutTxns.add(transactionState.getTransactionId());
+ for (DatabaseTransactionMgr dbTransactionMgr :
dbIdToDatabaseTransactionMgrs.values()) {
+ dbTransactionMgr.removeExpiredTxns();
+ List<Long> timeoutTxns =
dbTransactionMgr.getTimeoutTxns(currentMillis);
+ // abort timeout txns
+ for (Long txnId : timeoutTxns) {
+ try {
+ dbTransactionMgr.abortTransaction(txnId, "timeout by txn
manager", null);
+ LOG.info("transaction [" + txnId + "] is timeout, abort it
by transaction manager");
+ } catch (UserException e) {
+ // abort may be failed. it is acceptable. just print a log
+ LOG.warn("abort timeout txn {} failed. msg: {}", txnId,
e.getMessage());
}
}
- } finally {
- readUnlock();
- }
- // delete expired txns
- for (Long txnId : expiredTxns) {
- deleteTransaction(txnId);
- LOG.info("transaction [" + txnId + "] is expired, remove it from
transaction manager");
- }
-
- // abort timeout txns
- for (Long txnId : timeoutTxns) {
- try {
- abortTransaction(txnId, "timeout by txn manager");
- LOG.info("transaction [" + txnId + "] is timeout, abort it by
transaction manager");
- } catch (UserException e) {
- // abort may be failed. it is acceptable. just print a log
- LOG.warn("abort timeout txn {} failed. msg: {}", txnId,
e.getMessage());
- }
}
}
- public TransactionState getTransactionState(long transactionId) {
- readLock();
- try {
- return idToTransactionState.get(transactionId);
- } finally {
- readUnlock();
- }
+ public TransactionState getTransactionState(long dbId, long transactionId)
{
+ DatabaseTransactionMgr dbTransactionMgr =
dbIdToDatabaseTransactionMgrs.get(dbId);
+ return dbTransactionMgr.getTransactionState(transactionId);
}
public void setEditLog(EditLog editLog) {
- this.editLog = editLog;
this.idGenerator.setEditLog(editLog);
}
-
- private void readLock() {
- this.transactionLock.readLock().lock();
- }
-
- private void readUnlock() {
- this.transactionLock.readLock().unlock();
- }
-
- private void writeLock() {
- this.transactionLock.writeLock().lock();
- }
-
- private void writeUnlock() {
- this.transactionLock.writeLock().unlock();
- }
-
- // for add/update/delete TransactionState
- private void unprotectUpsertTransactionState(TransactionState
transactionState) {
- if (transactionState.getTransactionStatus() !=
TransactionStatus.PREPARE
- || transactionState.getSourceType() ==
LoadJobSourceType.FRONTEND) {
- // if this is a prepare txn, and load source type is not FRONTEND
- // no need to persist it. if prepare txn lost, the following
commit will just be failed.
- // user only need to retry this txn.
- // The FRONTEND type txn is committed and running asynchronously,
so we have to persist it.
- editLog.logInsertTransactionState(transactionState);
- }
- idToTransactionState.put(transactionState.getTransactionId(),
transactionState);
- updateTxnLabels(transactionState);
- updateDbRunningTxnNum(transactionState.getPreStatus(),
transactionState);
- }
-
- private void unprotectedCommitTransaction(TransactionState
transactionState, Set<Long> errorReplicaIds,
- Map<Long, Set<Long>>
tableToPartition, Set<Long> totalInvolvedBackends,
- Database db) {
- // transaction state is modified during check if the transaction could
committed
- if (transactionState.getTransactionStatus() !=
TransactionStatus.PREPARE) {
- return;
- }
- // update transaction state version
- transactionState.setCommitTime(System.currentTimeMillis());
- transactionState.setTransactionStatus(TransactionStatus.COMMITTED);
- transactionState.setErrorReplicas(errorReplicaIds);
- for (long tableId : tableToPartition.keySet()) {
- TableCommitInfo tableCommitInfo = new TableCommitInfo(tableId);
- for (long partitionId : tableToPartition.get(tableId)) {
- OlapTable table = (OlapTable) db.getTable(tableId);
- Partition partition = table.getPartition(partitionId);
- PartitionCommitInfo partitionCommitInfo = new
PartitionCommitInfo(partitionId,
-
partition.getNextVersion(),
-
partition.getNextVersionHash());
- tableCommitInfo.addPartitionCommitInfo(partitionCommitInfo);
- }
- transactionState.putIdToTableCommitInfo(tableId, tableCommitInfo);
- }
- // persist transactionState
- unprotectUpsertTransactionState(transactionState);
-
- // add publish version tasks. set task to null as a placeholder.
- // tasks will be created when publishing version.
- for (long backendId : totalInvolvedBackends) {
- transactionState.addPublishVersionTask(backendId, null);
- }
- }
- private boolean unprotectAbortTransaction(long transactionId, String
reason)
- throws UserException {
- TransactionState transactionState =
idToTransactionState.get(transactionId);
- if (transactionState == null) {
- throw new UserException("transaction not found");
- }
- if (transactionState.getTransactionStatus() ==
TransactionStatus.ABORTED) {
- return false;
- }
- if (transactionState.getTransactionStatus() ==
TransactionStatus.COMMITTED
- || transactionState.getTransactionStatus() ==
TransactionStatus.VISIBLE) {
- throw new UserException("transaction's state is already "
- + transactionState.getTransactionStatus() + ", could not
abort");
- }
- transactionState.setFinishTime(System.currentTimeMillis());
- transactionState.setReason(reason);
- transactionState.setTransactionStatus(TransactionStatus.ABORTED);
- unprotectUpsertTransactionState(transactionState);
- for (PublishVersionTask task :
transactionState.getPublishVersionTasks().values()) {
- AgentTaskQueue.removeTask(task.getBackendId(),
TTaskType.PUBLISH_VERSION, task.getSignature());
- }
- return true;
- }
-
// for replay idToTransactionState
// check point also run transaction cleaner, the cleaner maybe
concurrently modify id to
public void replayUpsertTransactionState(TransactionState
transactionState) {
- writeLock();
try {
- // set transaction status will call txn state change listener
- transactionState.replaySetTransactionStatus();
- Database db = catalog.getDb(transactionState.getDbId());
- if (transactionState.getTransactionStatus() ==
TransactionStatus.COMMITTED) {
- LOG.info("replay a committed transaction {}",
transactionState);
- updateCatalogAfterCommitted(transactionState, db);
- } else if (transactionState.getTransactionStatus() ==
TransactionStatus.VISIBLE) {
- LOG.info("replay a visible transaction {}", transactionState);
- updateCatalogAfterVisible(transactionState, db);
- }
- TransactionState preTxnState =
idToTransactionState.get(transactionState.getTransactionId());
- idToTransactionState.put(transactionState.getTransactionId(),
transactionState);
- updateTxnLabels(transactionState);
- updateDbRunningTxnNum(preTxnState == null ? null :
preTxnState.getTransactionStatus(),
- transactionState);
- } finally {
- writeUnlock();
+ DatabaseTransactionMgr dbTransactionMgr =
getDatabaseTransactioMgr(transactionState.getDbId());
+ dbTransactionMgr.replayUpsertTransactionState(transactionState);
+ } catch (AnalysisException e) {
+ LOG.warn("replay upsert transaction failed", e);
}
+
}
public void replayDeleteTransactionState(TransactionState
transactionState) {
- writeLock();
try {
- idToTransactionState.remove(transactionState.getTransactionId());
- Set<Long> txnIds = dbIdToTxnLabels.get(transactionState.getDbId(),
transactionState.getLabel());
- txnIds.remove(transactionState.getTransactionId());
- if (txnIds.isEmpty()) {
- dbIdToTxnLabels.remove(transactionState.getDbId(),
transactionState.getLabel());
- }
- } finally {
- writeUnlock();
- }
- }
-
- private void updateCatalogAfterCommitted(TransactionState
transactionState, Database db) {
- Set<Long> errorReplicaIds = transactionState.getErrorReplicas();
- for (TableCommitInfo tableCommitInfo :
transactionState.getIdToTableCommitInfos().values()) {
- long tableId = tableCommitInfo.getTableId();
- OlapTable table = (OlapTable) db.getTable(tableId);
- for (PartitionCommitInfo partitionCommitInfo :
tableCommitInfo.getIdToPartitionCommitInfo().values()) {
- long partitionId = partitionCommitInfo.getPartitionId();
- Partition partition = table.getPartition(partitionId);
- List<MaterializedIndex> allIndices =
partition.getMaterializedIndices(IndexExtState.ALL);
- for (MaterializedIndex index : allIndices) {
- List<Tablet> tablets = index.getTablets();
- for (Tablet tablet : tablets) {
- for (Replica replica : tablet.getReplicas()) {
- if (errorReplicaIds.contains(replica.getId())) {
- // should not use partition.getNextVersion and
partition.getNextVersionHash because partition's next version hash is generated
locally
- // should get from transaction state
-
replica.updateLastFailedVersion(partitionCommitInfo.getVersion(),
-
partitionCommitInfo.getVersionHash());
- }
- }
- }
- }
- partition.setNextVersion(partition.getNextVersion() + 1);
- // Although committed version(hash) is not visible to user,
- // but they need to be synchronized among Frontends.
- // because we use committed version(hash) to create clone
task, if the first Master FE
- // send clone task with committed version hash X, and than
Master changed, the new Master FE
- // received the clone task report with version hash X, which
not equals to it own committed
- // version hash, than the clone task is failed.
- partition.setNextVersionHash(Util.generateVersionHash() /*
next version hash */,
-
partitionCommitInfo.getVersionHash() /* committed version hash*/);
- }
+ DatabaseTransactionMgr dbTransactionMgr =
getDatabaseTransactioMgr(transactionState.getDbId());
+ dbTransactionMgr.deleteTransaction(transactionState);
+ } catch (AnalysisException e) {
+ LOG.warn("replay delete transaction failed", e);
}
}
-
- private boolean updateCatalogAfterVisible(TransactionState
transactionState, Database db) {
- Set<Long> errorReplicaIds = transactionState.getErrorReplicas();
- for (TableCommitInfo tableCommitInfo :
transactionState.getIdToTableCommitInfos().values()) {
- long tableId = tableCommitInfo.getTableId();
- OlapTable table = (OlapTable) db.getTable(tableId);
- for (PartitionCommitInfo partitionCommitInfo :
tableCommitInfo.getIdToPartitionCommitInfo().values()) {
- long partitionId = partitionCommitInfo.getPartitionId();
- long newCommitVersion = partitionCommitInfo.getVersion();
- long newCommitVersionHash =
partitionCommitInfo.getVersionHash();
- Partition partition = table.getPartition(partitionId);
- List<MaterializedIndex> allIndices =
partition.getMaterializedIndices(IndexExtState.ALL);
- for (MaterializedIndex index : allIndices) {
- for (Tablet tablet : index.getTablets()) {
- for (Replica replica : tablet.getReplicas()) {
- long lastFailedVersion =
replica.getLastFailedVersion();
- long lastFailedVersionHash =
replica.getLastFailedVersionHash();
- long newVersion = newCommitVersion;
- long newVersionHash = newCommitVersionHash;
- long lastSucessVersion =
replica.getLastSuccessVersion();
- long lastSuccessVersionHash =
replica.getLastSuccessVersionHash();
- if (!errorReplicaIds.contains(replica.getId())) {
- if (replica.getLastFailedVersion() > 0) {
- // if the replica is a failed replica,
then not changing version and version hash
- newVersion = replica.getVersion();
- newVersionHash = replica.getVersionHash();
- } else if
(!replica.checkVersionCatchUp(partition.getVisibleVersion(),
- partition.getVisibleVersionHash(),
true)) {
- // this means the replica has error in the
past, but we did not observe it
- // during upgrade, one job maybe in quorum
finished state, for example, A,B,C 3 replica
- // A,B 's version is 10, C's version is 10
but C' 10 is abnormal should be rollback
- // then we will detect this and set C's
last failed version to 10 and last success version to 11
- // this logic has to be replayed in
checkpoint thread
- lastFailedVersion =
partition.getVisibleVersion();
- lastFailedVersionHash =
partition.getVisibleVersionHash();
- newVersion = replica.getVersion();
- newVersionHash = replica.getVersionHash();
- }
- // success version always move forward
- lastSucessVersion = newCommitVersion;
- lastSuccessVersionHash = newCommitVersionHash;
- } else {
- // for example, A,B,C 3 replicas, B,C failed
during publish version, then B C will be set abnormal
- // all loading will failed, B,C will have to
recovery by clone, it is very inefficient and maybe lost data
- // Using this method, B,C will publish failed,
and fe will publish again, not update their last failed version
- // if B is publish successfully in next turn,
then B is normal and C will be set abnormal so that quorum is maintained
- // and loading will go on.
- newVersion = replica.getVersion();
- newVersionHash = replica.getVersionHash();
- if (newCommitVersion > lastFailedVersion) {
- lastFailedVersion = newCommitVersion;
- lastFailedVersionHash =
newCommitVersionHash;
- }
- }
- replica.updateVersionInfo(newVersion,
newVersionHash, lastFailedVersion, lastFailedVersionHash, lastSucessVersion,
lastSuccessVersionHash);
- }
- }
- } // end for indices
- long version = partitionCommitInfo.getVersion();
- long versionHash = partitionCommitInfo.getVersionHash();
- partition.updateVisibleVersionAndVersionHash(version,
versionHash);
- if (LOG.isDebugEnabled()) {
- LOG.debug("transaction state {} set partition {}'s version
to [{}] and version hash to [{}]",
- transactionState, partition.getId(), version,
versionHash);
- }
- }
- }
- return true;
- }
-
- private void updateTxnLabels(TransactionState transactionState) {
- Set<Long> txnIds = dbIdToTxnLabels.get(transactionState.getDbId(),
transactionState.getLabel());
- if (txnIds == null) {
- txnIds = Sets.newHashSet();
- dbIdToTxnLabels.put(transactionState.getDbId(),
transactionState.getLabel(), txnIds);
- }
- txnIds.add(transactionState.getTransactionId());
- }
-
- private void updateDbRunningTxnNum(TransactionStatus preStatus,
TransactionState curTxnState) {
- Map<Long, Integer> txnNumMap = null;
- if (curTxnState.getSourceType() ==
LoadJobSourceType.ROUTINE_LOAD_TASK) {
- txnNumMap = runningRoutineLoadTxnNums;
- } else {
- txnNumMap = runningTxnNums;
- }
-
- int txnNum = txnNumMap.getOrDefault(curTxnState.getDbId(), 0);
- if (preStatus == null
- && (curTxnState.getTransactionStatus() ==
TransactionStatus.PREPARE
- || curTxnState.getTransactionStatus() ==
TransactionStatus.COMMITTED)) {
- ++txnNum;
- } else if ((preStatus == TransactionStatus.PREPARE
- || preStatus == TransactionStatus.COMMITTED)
- && (curTxnState.getTransactionStatus() ==
TransactionStatus.VISIBLE
- || curTxnState.getTransactionStatus() ==
TransactionStatus.ABORTED)) {
- --txnNum;
- }
-
- if (txnNum < 1) {
- txnNumMap.remove(curTxnState.getDbId());
- } else {
- txnNumMap.put(curTxnState.getDbId(), txnNum);
- }
- }
-
public List<List<Comparable>> getDbInfo() {
List<List<Comparable>> infos = new ArrayList<List<Comparable>>();
- readLock();
- try {
- Set<Long> dbIds = new HashSet<>();
- for (TransactionState transactionState :
idToTransactionState.values()) {
- dbIds.add(transactionState.getDbId());
- }
- for (long dbId : dbIds) {
- List<Comparable> info = new ArrayList<Comparable>();
- info.add(dbId);
- Database db = Catalog.getInstance().getDb(dbId);
- if (db == null) {
- continue;
- }
- info.add(db.getFullName());
- infos.add(info);
+ List<Long> dbIds = Lists.newArrayList();
+ for (Long dbId : dbIdToDatabaseTransactionMgrs.keySet()) {
+ dbIds.add(dbId);
+ }
+
+ for (long dbId : dbIds) {
+ List<Comparable> info = new ArrayList<Comparable>();
+ info.add(dbId);
+ Database db = Catalog.getInstance().getDb(dbId);
+ if (db == null) {
+ continue;
}
- } finally {
- readUnlock();
+ info.add(db.getFullName());
+ infos.add(info);
}
return infos;
}
public List<List<String>> getDbTransStateInfo(long dbId) {
- List<List<String>> infos = Lists.newArrayList();
- readLock();
try {
- infos.add(Lists.newArrayList("running", String.valueOf(
- runningTxnNums.getOrDefault(dbId, 0) +
runningRoutineLoadTxnNums.getOrDefault(dbId, 0))));
- long finishedNum = idToTransactionState.values().stream().filter(
- t -> (t.getDbId() == dbId &&
t.getTransactionStatus().isFinalStatus())).count();
- infos.add(Lists.newArrayList("finished",
String.valueOf(finishedNum)));
- } finally {
- readUnlock();
+ DatabaseTransactionMgr dbTransactionMgr =
getDatabaseTransactioMgr(dbId);
+ return dbTransactionMgr.getDbTransStateInfo();
+ } catch (AnalysisException e) {
+ LOG.warn("Get db transaction state info failed", e);
+ return Lists.newArrayList();
}
- return infos;
}
public List<List<String>> getDbTransInfo(long dbId, boolean running, int
limit) throws AnalysisException {
- List<List<String>> infos = new ArrayList<>();
- readLock();
- try {
- Database db = Catalog.getInstance().getDb(dbId);
- if (db == null) {
- throw new AnalysisException("Database[" + dbId + "] does not
exist");
- }
-
- // get transaction order by txn id desc limit 'limit'
- idToTransactionState.values().stream()
- .filter(t -> (t.getDbId() == dbId && (running !=
t.getTransactionStatus().isFinalStatus())))
- .sorted(TransactionState.TXN_ID_COMPARATOR)
- .limit(limit)
- .forEach(t -> {
- List<String> info = Lists.newArrayList();
- getTxnStateInfo(t, info);
- infos.add(info);
- });
- } finally {
- readUnlock();
- }
- return infos;
+ DatabaseTransactionMgr dbTransactionMgr =
getDatabaseTransactioMgr(dbId);
+ return dbTransactionMgr.getTxnStateInfoList(running, limit);
}
// get show info of a specified txnId
public List<List<String>> getSingleTranInfo(long dbId, long txnId) throws
AnalysisException {
- List<List<String>> infos = new ArrayList<List<String>>();
- readLock();
- try {
- Database db = Catalog.getInstance().getDb(dbId);
- if (db == null) {
- throw new AnalysisException("Database[" + dbId + "] does not
exist");
- }
-
- TransactionState txnState = idToTransactionState.get(txnId);
- if (txnState == null) {
- throw new AnalysisException("transaction with id " + txnId + "
does not exist");
- }
-
- if (ConnectContext.get() != null) {
- // check auth
- Set<Long> tblIds = txnState.getIdToTableCommitInfos().keySet();
- for (Long tblId : tblIds) {
- Table tbl = db.getTable(tblId);
- if (tbl != null) {
- if
(!Catalog.getCurrentCatalog().getAuth().checkTblPriv(ConnectContext.get(),
db.getFullName(),
- tbl.getName(), PrivPredicate.SHOW)) {
-
ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR,
- "SHOW TRANSACTION",
- ConnectContext.get().getQualifiedUser(),
- ConnectContext.get().getRemoteIP(),
- tbl.getName());
- }
- }
- }
- }
-
- List<String> info = Lists.newArrayList();
- getTxnStateInfo(txnState, info);
- infos.add(info);
- } finally {
- readUnlock();
- }
- return infos;
+ DatabaseTransactionMgr dbTransactionMgr =
getDatabaseTransactioMgr(dbId);
+ return dbTransactionMgr.getSingleTranInfo(dbId, txnId);
}
-
- private void getTxnStateInfo(TransactionState txnState, List<String> info)
{
- info.add(String.valueOf(txnState.getTransactionId()));
- info.add(txnState.getLabel());
- info.add(txnState.getCoordinator().toString());
- info.add(txnState.getTransactionStatus().name());
- info.add(txnState.getSourceType().name());
- info.add(TimeUtils.longToTimeString(txnState.getPrepareTime()));
- info.add(TimeUtils.longToTimeString(txnState.getCommitTime()));
- info.add(TimeUtils.longToTimeString(txnState.getFinishTime()));
- info.add(txnState.getReason());
- info.add(String.valueOf(txnState.getErrorReplicas().size()));
- info.add(String.valueOf(txnState.getCallbackId()));
- info.add(String.valueOf(txnState.getTimeoutMs()));
- }
-
- public List<List<Comparable>> getTableTransInfo(long txnId) throws
AnalysisException {
- List<List<Comparable>> tableInfos = new ArrayList<>();
- readLock();
- try {
- TransactionState transactionState =
idToTransactionState.get(txnId);
- if (null == transactionState) {
- throw new AnalysisException("Transaction[" + txnId + "] does
not exist.");
- }
- for (Map.Entry<Long, TableCommitInfo> entry :
transactionState.getIdToTableCommitInfos().entrySet()) {
- List<Comparable> tableInfo = new ArrayList<>();
- tableInfo.add(entry.getKey());
- tableInfo.add(Joiner.on(",
").join(entry.getValue().getIdToPartitionCommitInfo().values().stream().map(
-
PartitionCommitInfo::getPartitionId).collect(Collectors.toList())));
- tableInfos.add(tableInfo);
- }
- } finally {
- readUnlock();
- }
- return tableInfos;
+ public List<List<Comparable>> getTableTransInfo(long dbId, long txnId)
throws AnalysisException {
+ DatabaseTransactionMgr dbTransactionMgr =
getDatabaseTransactioMgr(dbId);
+ return dbTransactionMgr.getTableTransInfo(txnId);
}
- public List<List<Comparable>> getPartitionTransInfo(long tid, long tableId)
+ public List<List<Comparable>> getPartitionTransInfo(long dbId, long tid,
long tableId)
throws AnalysisException {
- List<List<Comparable>> partitionInfos = new
ArrayList<List<Comparable>>();
- readLock();
- try {
- TransactionState transactionState = idToTransactionState.get(tid);
- if (null == transactionState) {
- throw new AnalysisException("Transaction[" + tid + "] does not
exist.");
- }
- TableCommitInfo tableCommitInfo =
transactionState.getIdToTableCommitInfos().get(tableId);
- Map<Long, PartitionCommitInfo> idToPartitionCommitInfo =
tableCommitInfo.getIdToPartitionCommitInfo();
- for (Map.Entry<Long, PartitionCommitInfo> entry :
idToPartitionCommitInfo.entrySet()) {
- List<Comparable> partitionInfo = new ArrayList<Comparable>();
- partitionInfo.add(entry.getKey());
- partitionInfo.add(entry.getValue().getVersion());
- partitionInfo.add(entry.getValue().getVersionHash());
- partitionInfos.add(partitionInfo);
- }
- } finally {
- readUnlock();
- }
- return partitionInfos;
+ DatabaseTransactionMgr dbTransactionMgr =
getDatabaseTransactioMgr(dbId);
+ return dbTransactionMgr.getPartitionTransInfo(tid, tableId);
}
-
+
+ /**
+ * It is a non thread safe method, only invoked by checkpoint thread
without any lock or image dump thread with db lock
+ */
public int getTransactionNum() {
- return this.idToTransactionState.size();
+ int txnNum = 0;
+ for (DatabaseTransactionMgr dbTransactionMgr :
dbIdToDatabaseTransactionMgrs.values()) {
+ txnNum = txnNum + dbTransactionMgr.getTransactionNum();
Review comment:
```suggestion
txnNum += dbTransactionMgr.getTransactionNum();
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]