morningman commented on a change in pull request #3369:
URL: https://github.com/apache/incubator-doris/pull/3369#discussion_r422612957
##########
File path:
fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
##########
@@ -1346,81 +359,71 @@ public TransactionIdGenerator
getTransactionIDGenerator() {
@Override
public void write(DataOutput out) throws IOException {
- int numTransactions = idToTransactionState.size();
+ int numTransactions = getTransactionNum();
out.writeInt(numTransactions);
- for (Map.Entry<Long, TransactionState> entry :
idToTransactionState.entrySet()) {
- entry.getValue().write(out);
+ for (DatabaseTransactionMgr dbTransactionMgr :
dbIdToDatabaseTransactionMgrs.values()) {
+ dbTransactionMgr.unprotectWriteAllTransactionStates(out);
}
idGenerator.write(out);
}
public void readFields(DataInput in) throws IOException {
- int numTransactions = in.readInt();
- for (int i = 0; i < numTransactions; ++i) {
- TransactionState transactionState = new TransactionState();
- transactionState.readFields(in);
- TransactionState preTxnState =
idToTransactionState.get(transactionState.getTransactionId());
- idToTransactionState.put(transactionState.getTransactionId(),
transactionState);
- updateTxnLabels(transactionState);
- updateDbRunningTxnNum(preTxnState == null ? null :
preTxnState.getTransactionStatus(),
- transactionState);
+ try {
+ int numTransactions = in.readInt();
+ for (int i = 0; i < numTransactions; ++i) {
+ TransactionState transactionState = new TransactionState();
+ transactionState.readFields(in);
+ DatabaseTransactionMgr dbTransactionMgr =
getDatabaseTransactioMgr(transactionState.getDbId());
+
dbTransactionMgr.unprotectUpsertTransactionState(transactionState, true);
+ }
+ idGenerator.readFields(in);
+ } catch (AnalysisException e) {
+ throw new IOException("Read transaction states failed", e);
}
- idGenerator.readFields(in);
+
}
- public TransactionState getTransactionStateByCallbackIdAndStatus(long
callbackId, Set<TransactionStatus> status) {
- readLock();
+ public TransactionState getTransactionStateByCallbackIdAndStatus(long
dbId, long callbackId, Set<TransactionStatus> status) {
try {
- for (TransactionState txn : idToTransactionState.values()) {
- if (txn.getCallbackId() == callbackId &&
status.contains(txn.getTransactionStatus())) {
- return txn;
- }
- }
- } finally {
- readUnlock();
+ DatabaseTransactionMgr dbTransactionMgr =
getDatabaseTransactioMgr(dbId);
+ return
dbTransactionMgr.getTransactionStateByCallbackIdAndStatus(callbackId, status);
+ } catch (AnalysisException e) {
+ LOG.warn("Get transaction by callbackId and status failed", e);
+ return null;
}
- return null;
}
- public TransactionState getTransactionStateByCallbackId(long callbackId) {
- readLock();
+ public TransactionState getTransactionStateByCallbackId(long dbId, long
callbackId) {
try {
- for (TransactionState txn : idToTransactionState.values()) {
- if (txn.getCallbackId() == callbackId) {
- return txn;
- }
- }
- } finally {
- readUnlock();
+ DatabaseTransactionMgr dbTransactionMgr =
getDatabaseTransactioMgr(dbId);
+ return
dbTransactionMgr.getTransactionStateByCallbackId(callbackId);
+ } catch (AnalysisException e) {
+ LOG.warn("Get transaction by callbackId failed", e);
+ return null;
}
- return null;
}
- public List<Long> getTransactionIdByCoordinateBe(String coordinateHost,
int limit) {
- ArrayList<Long> txnIds = new ArrayList<>();
- readLock();
- try {
- idToTransactionState.values().stream()
- .filter(t -> (t.getCoordinator().sourceType ==
TransactionState.TxnSourceType.BE
- && t.getCoordinator().ip.equals(coordinateHost)
- && (!t.getTransactionStatus().isFinalStatus())))
- .limit(limit)
- .forEach(t -> txnIds.add(t.getTransactionId()));
- } finally {
- readUnlock();
+ public List<Pair<Long, Long>> getTransactionIdByCoordinateBe(String
coordinateHost, int limit) {
+ ArrayList<Pair<Long, Long>> txnInfos = new ArrayList<>();
+ for (DatabaseTransactionMgr databaseTransactionMgr :
dbIdToDatabaseTransactionMgrs.values()) {
+
txnInfos.addAll(databaseTransactionMgr.getTransactionIdByCoordinateBe(coordinateHost,
limit));
+ if (txnInfos.size() > limit) {
+ break;
+ }
}
- return txnIds;
+ return txnInfos.size() > limit ? new ArrayList<>(txnInfos.subList(0,
limit)) : txnInfos;
Review comment:
I think it's OK, cause no one will modify that list.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]