This is an automated email from the ASF dual-hosted git repository. zhaoc pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push: new 11872d5 Sending clear txn task explicitly after transaction being aborted (#2182) 11872d5 is described below commit 11872d5cf60afa9a165cfd0f19a55f8863e3aaae Author: Mingyu Chen <morning...@163.com> AuthorDate: Wed Nov 13 11:22:45 2019 +0800 Sending clear txn task explicitly after transaction being aborted (#2182) --- be/src/agent/task_worker_pool.cpp | 9 ++++- be/src/common/config.h | 5 +++ be/src/olap/delta_writer.cpp | 2 +- be/src/olap/olap_define.h | 1 + be/src/olap/push_handler.cpp | 9 +++-- be/src/olap/storage_engine.cpp | 9 ++++- be/src/olap/storage_engine.h | 3 +- be/src/olap/txn_manager.cpp | 44 +++++++++++++++++++++- be/src/olap/txn_manager.h | 15 ++++++++ be/test/olap/txn_manager_test.cpp | 1 + .../java/org/apache/doris/master/MasterImpl.java | 12 ++---- .../org/apache/doris/master/ReportHandler.java | 4 +- .../java/org/apache/doris/task/ClearAlterTask.java | 9 ----- .../apache/doris/task/ClearTransactionTask.java | 25 +++--------- .../doris/transaction/GlobalTransactionMgr.java | 43 +++++++++++++++++++++ 15 files changed, 141 insertions(+), 50 deletions(-) diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp index 9e51d17..dbb60fd 100644 --- a/be/src/agent/task_worker_pool.cpp +++ b/be/src/agent/task_worker_pool.cpp @@ -893,8 +893,13 @@ void* TaskWorkerPool::_clear_transaction_task_worker_thread_callback(void* arg_t // transaction_id should be greater than zero. // If it is not greater than zero, no need to execute // the following clear_transaction_task() function. 
- worker_pool_this->_env->storage_engine()->clear_transaction_task( - clear_transaction_task_req.transaction_id, clear_transaction_task_req.partition_id); + if (clear_transaction_task_req.partition_id.empty()) { + worker_pool_this->_env->storage_engine()->clear_transaction_task( + clear_transaction_task_req.transaction_id, clear_transaction_task_req.partition_id); + } else { + worker_pool_this->_env->storage_engine()->clear_transaction_task( + clear_transaction_task_req.transaction_id); + } LOG(INFO) << "finish to clear transaction task. signature:" << agent_task_req.signature << ", transaction_id:" << clear_transaction_task_req.transaction_id; } else { diff --git a/be/src/common/config.h b/be/src/common/config.h index 9aa7d2b..e335000 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -491,6 +491,11 @@ namespace config { // brpc config CONF_Int64(brpc_max_body_size, "67108864") + + // max number of txns in txn manager + // this is a self protection to avoid too many txns saving in manager + CONF_Int64(max_runnings_transactions, "1000"); + } // namespace config } // namespace doris diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp index 711d50c..57179b7 100644 --- a/be/src/olap/delta_writer.cpp +++ b/be/src/olap/delta_writer.cpp @@ -123,7 +123,7 @@ OLAPStatus DeltaWriter::init() { if (!new_migration_rlock.own_lock()) { return OLAP_ERR_RWLOCK_ERROR; } - _storage_engine->txn_manager()->prepare_txn(_req.partition_id, _new_tablet, _req.txn_id, _req.load_id); + RETURN_NOT_OK(_storage_engine->txn_manager()->prepare_txn(_req.partition_id, _new_tablet, _req.txn_id, _req.load_id)); } } } diff --git a/be/src/olap/olap_define.h b/be/src/olap/olap_define.h index f4b40f1..f1efdb0 100644 --- a/be/src/olap/olap_define.h +++ b/be/src/olap/olap_define.h @@ -166,6 +166,7 @@ enum OLAPStatus { OLAP_ERR_VERSION_ALREADY_MERGED = -230, OLAP_ERR_LZO_DISABLED = -231, OLAP_ERR_DISK_REACH_CAPACITY_LIMIT = -232, + OLAP_ERR_TOO_MANY_TRANSACTIONS = -233, 
// CommandExecutor // [-300, -400) diff --git a/be/src/olap/push_handler.cpp b/be/src/olap/push_handler.cpp index 298634d..614b366 100644 --- a/be/src/olap/push_handler.cpp +++ b/be/src/olap/push_handler.cpp @@ -93,8 +93,8 @@ OLAPStatus PushHandler::_do_streaming_ingestion( PUniqueId load_id; load_id.set_hi(0); load_id.set_lo(0); - OLAPStatus res = StorageEngine::instance()->txn_manager()->prepare_txn(request.partition_id, - tablet, request.transaction_id, load_id); + RETURN_NOT_OK(StorageEngine::instance()->txn_manager()->prepare_txn(request.partition_id, + tablet, request.transaction_id, load_id)); // prepare txn will be always successful // if current tablet is under schema change, origin tablet is successful and @@ -140,8 +140,8 @@ OLAPStatus PushHandler::_do_streaming_ingestion( PUniqueId load_id; load_id.set_hi(0); load_id.set_lo(0); - res = StorageEngine::instance()->txn_manager()->prepare_txn(request.partition_id, - related_tablet, request.transaction_id, load_id); + RETURN_NOT_OK(StorageEngine::instance()->txn_manager()->prepare_txn(request.partition_id, + related_tablet, request.transaction_id, load_id)); // prepare txn will always be successful tablet_vars->push_back(TabletVars()); TabletVars& new_item = tablet_vars->back(); @@ -158,6 +158,7 @@ OLAPStatus PushHandler::_do_streaming_ingestion( // not call validate request here, because realtime load does not // contain version info + OLAPStatus res; // check delete condition if push for delete std::queue<DeletePredicatePB> del_preds; if (push_type == PUSH_FOR_DELETE) { diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp index 99cb877..4479451 100644 --- a/be/src/olap/storage_engine.cpp +++ b/be/src/olap/storage_engine.cpp @@ -477,8 +477,15 @@ OLAPStatus StorageEngine::clear() { return OLAP_SUCCESS; } +void StorageEngine::clear_transaction_task(const TTransactionId transaction_id) { + // a clear transaction task may not contain partition ids, so we get the partition ids from the txn
manager. + std::vector<int64_t> partition_ids; + StorageEngine::instance()->txn_manager()->get_partition_ids(transaction_id, &partition_ids); + clear_transaction_task(transaction_id, partition_ids); +} + void StorageEngine::clear_transaction_task(const TTransactionId transaction_id, - const vector<TPartitionId> partition_ids) { + const vector<TPartitionId>& partition_ids) { LOG(INFO) << "begin to clear transaction task. transaction_id=" << transaction_id; for (const TPartitionId& partition_id : partition_ids) { diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h index c6634b0..a360f22 100644 --- a/be/src/olap/storage_engine.h +++ b/be/src/olap/storage_engine.h @@ -86,8 +86,9 @@ public: const bool is_schema_change_tablet, const TabletSharedPtr ref_tablet); + void clear_transaction_task(const TTransactionId transaction_id); void clear_transaction_task(const TTransactionId transaction_id, - const std::vector<TPartitionId> partition_ids); + const std::vector<TPartitionId>& partition_ids); // Instance should be inited from create_instance // MUST NOT be called in other circumstances. diff --git a/be/src/olap/txn_manager.cpp b/be/src/olap/txn_manager.cpp index e103b27..44fe9b2 100755 --- a/be/src/olap/txn_manager.cpp +++ b/be/src/olap/txn_manager.cpp @@ -134,11 +134,21 @@ OLAPStatus TxnManager::prepare_txn( } } } + + // check whether there are too many running transactions. + // if so, reject the request.
+ if (_txn_partition_map.size() > config::max_runnings_transactions) { + LOG(WARNING) << "too many transactions: " << _txn_tablet_map.size() << ", limit: " << config::max_runnings_transactions; + return OLAP_ERR_TOO_MANY_TRANSACTIONS; + } + // not found load id // case 1: user start a new txn, rowset_ptr = null // case 2: loading txn from meta env TabletTxnInfo load_info(load_id, nullptr); _txn_tablet_map[key][tablet_info] = load_info; + _insert_txn_partition_map_unlocked(transaction_id, partition_id); + LOG(INFO) << "add transaction to engine successfully." << "partition_id: " << key.first << ", transaction_id: " << key.second @@ -225,6 +235,7 @@ OLAPStatus TxnManager::commit_txn( WriteLock wrlock(&_txn_map_lock); TabletTxnInfo load_info(load_id, rowset_ptr); _txn_tablet_map[key][tablet_info] = load_info; + _insert_txn_partition_map_unlocked(transaction_id, partition_id); LOG(INFO) << "commit transaction to engine successfully." << " partition_id: " << key.first << ", transaction_id: " << key.second @@ -289,6 +300,7 @@ OLAPStatus TxnManager::publish_txn(OlapMeta* meta, TPartitionId partition_id, TT _txn_tablet_map.erase(it); } } + _clear_txn_partition_map_unlocked(transaction_id, partition_id); return OLAP_SUCCESS; } } @@ -324,7 +336,7 @@ OLAPStatus TxnManager::rollback_txn(TPartitionId partition_id, TTransactionId tr if (it->second.empty()) { _txn_tablet_map.erase(it); } - return OLAP_SUCCESS; + _clear_txn_partition_map_unlocked(transaction_id, partition_id); } return OLAP_SUCCESS; } @@ -374,6 +386,7 @@ OLAPStatus TxnManager::delete_txn(OlapMeta* meta, TPartitionId partition_id, TTr if (it->second.empty()) { _txn_tablet_map.erase(it); } + _clear_txn_partition_map_unlocked(transaction_id, partition_id); return OLAP_SUCCESS; } @@ -423,6 +436,7 @@ void TxnManager::force_rollback_tablet_related_txns(OlapMeta* meta, TTabletId ta if (it.second.empty()) { _txn_tablet_map.erase(it.first); } + _clear_txn_partition_map_unlocked(it.first.second, it.first.first); } } @@ 
-500,4 +514,32 @@ bool TxnManager::get_expire_txns(TTabletId tablet_id, SchemaHash schema_hash, Ta return true; } +void TxnManager::get_partition_ids(const TTransactionId transaction_id, std::vector<TPartitionId>* partition_ids) { + ReadLock txn_rdlock(&_txn_map_lock); + auto it = _txn_partition_map.find(transaction_id); + if (it != _txn_partition_map.end()) { + for (int64_t partition_id : it->second) { + partition_ids->push_back(partition_id); + } + } +} + +void TxnManager::_insert_txn_partition_map_unlocked(int64_t transaction_id, int64_t partition_id) { + auto find = _txn_partition_map.find(transaction_id); + if (find == _txn_partition_map.end()) { + _txn_partition_map[transaction_id] = std::unordered_set<int64_t>(); + } + _txn_partition_map[transaction_id].insert(partition_id); +} + +void TxnManager::_clear_txn_partition_map_unlocked(int64_t transaction_id, int64_t partition_id) { + auto it = _txn_partition_map.find(transaction_id); + if (it != _txn_partition_map.end()) { + it->second.erase(partition_id); + if (it->second.empty()) { + _txn_partition_map.erase(it); + } + } +} + } // namespace doris diff --git a/be/src/olap/txn_manager.h b/be/src/olap/txn_manager.h index ef7f46e..8ea816e 100755 --- a/be/src/olap/txn_manager.h +++ b/be/src/olap/txn_manager.h @@ -27,6 +27,8 @@ #include <string> #include <vector> #include <thread> +#include <unordered_map> +#include <unordered_set> #include <rapidjson/document.h> #include <pthread.h> @@ -70,6 +72,7 @@ public: ~TxnManager() { _txn_tablet_map.clear(); + _txn_partition_map.clear(); _txn_locks.clear(); } @@ -132,16 +135,28 @@ public: bool get_expire_txns(TTabletId tablet_id, SchemaHash schema_hash, TabletUid tablet_uid, std::vector<int64_t>* transaction_ids); void force_rollback_tablet_related_txns(OlapMeta* meta, TTabletId tablet_id, SchemaHash schema_hash, TabletUid tablet_uid); + + void get_partition_ids(const TTransactionId transaction_id, std::vector<TPartitionId>* partition_ids); private: RWMutex* 
_get_txn_lock(TTransactionId txn_id) { return _txn_locks[txn_id % _txn_lock_num].get(); } + // insert (transaction_id, partition_id) into, or remove it from, _txn_partition_map + // the caller must hold _txn_map_lock + void _insert_txn_partition_map_unlocked(int64_t transaction_id, int64_t partition_id); + void _clear_txn_partition_map_unlocked(int64_t transaction_id, int64_t partition_id); + private: RWMutex _txn_map_lock; using TxnKey = std::pair<int64_t, int64_t>; // partition_id, transaction_id; std::map<TxnKey, std::map<TabletInfo, TabletTxnInfo>> _txn_tablet_map; + // transaction_id -> corresponding partition ids + // This is mainly for the clear txn task received from FE, which may carry only a transaction id, + // so we need this map to find out which partitions correspond to a transaction id. + // This map must be built, destroyed and modified together with '_txn_tablet_map' + std::unordered_map<int64_t, std::unordered_set<int64_t>> _txn_partition_map; const int32_t _txn_lock_num = 100; std::map<int32_t, std::shared_ptr<RWMutex>> _txn_locks; diff --git a/be/test/olap/txn_manager_test.cpp b/be/test/olap/txn_manager_test.cpp index be11224..0c93e5a 100644 --- a/be/test/olap/txn_manager_test.cpp +++ b/be/test/olap/txn_manager_test.cpp @@ -92,6 +92,7 @@ public: } virtual void SetUp() { + config::max_runnings_transactions = 1000; std::vector<StorePath> paths; paths.emplace_back("_engine_data_path", -1); diff --git a/fe/src/main/java/org/apache/doris/master/MasterImpl.java b/fe/src/main/java/org/apache/doris/master/MasterImpl.java index 1393fd6..db92f77 100644 --- a/fe/src/main/java/org/apache/doris/master/MasterImpl.java +++ b/fe/src/main/java/org/apache/doris/master/MasterImpl.java @@ -558,18 +558,14 @@ public class MasterImpl { private void finishClearAlterTask(AgentTask task, TFinishTaskRequest request) { ClearAlterTask clearAlterTask = (ClearAlterTask) task; - clearAlterTask.setFinished(); - AgentTaskQueue.removeTask(task.getBackendId(), - task.getTaskType(),
- task.getSignature()); + clearAlterTask.setFinished(true); + AgentTaskQueue.removeTask(task.getBackendId(), task.getTaskType(), task.getSignature()); } private void finishClearTransactionTask(AgentTask task, TFinishTaskRequest request) { ClearTransactionTask clearTransactionTask = (ClearTransactionTask) task; - clearTransactionTask.setFinished(); - AgentTaskQueue.removeTask(task.getBackendId(), - task.getTaskType(), - task.getSignature()); + clearTransactionTask.setFinished(true); + AgentTaskQueue.removeTask(task.getBackendId(), task.getTaskType(), task.getSignature()); } private void finishPublishVersion(AgentTask task, TFinishTaskRequest request) { diff --git a/fe/src/main/java/org/apache/doris/master/ReportHandler.java b/fe/src/main/java/org/apache/doris/master/ReportHandler.java index 2a40961..213da10 100644 --- a/fe/src/main/java/org/apache/doris/master/ReportHandler.java +++ b/fe/src/main/java/org/apache/doris/master/ReportHandler.java @@ -81,7 +81,6 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.thrift.TException; -import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -896,8 +895,7 @@ public class ReportHandler extends Daemon { AgentBatchTask batchTask = new AgentBatchTask(); for (Long transactionId : transactionsToClear.keySet()) { ClearTransactionTask clearTransactionTask = new ClearTransactionTask(backendId, - transactionId, - new ArrayList<Long>(transactionsToClear.get(transactionId))); + transactionId, transactionsToClear.get(transactionId)); batchTask.addTask(clearTransactionTask); AgentTaskQueue.addTask(clearTransactionTask); } diff --git a/fe/src/main/java/org/apache/doris/task/ClearAlterTask.java b/fe/src/main/java/org/apache/doris/task/ClearAlterTask.java index 53e8015..1417861 100644 --- a/fe/src/main/java/org/apache/doris/task/ClearAlterTask.java +++ b/fe/src/main/java/org/apache/doris/task/ClearAlterTask.java @@ -22,7 +22,6 @@ import 
org.apache.doris.thrift.TTaskType; public class ClearAlterTask extends AgentTask { private int schemaHash; - private boolean isFinished; public ClearAlterTask(long backendId, long dbId, long tableId, long partitionId, long indexId, long tabletId, int schemaHash) { @@ -40,12 +39,4 @@ public class ClearAlterTask extends AgentTask { public int getSchemaHash() { return schemaHash; } - - public void setFinished() { - this.isFinished = true; - } - - public boolean isFinished() { - return isFinished; - } } diff --git a/fe/src/main/java/org/apache/doris/task/ClearTransactionTask.java b/fe/src/main/java/org/apache/doris/task/ClearTransactionTask.java index 7b1caea..6f07898 100644 --- a/fe/src/main/java/org/apache/doris/task/ClearTransactionTask.java +++ b/fe/src/main/java/org/apache/doris/task/ClearTransactionTask.java @@ -17,24 +17,17 @@ package org.apache.doris.task; -import java.util.List; - -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - import org.apache.doris.thrift.TClearTransactionTaskRequest; import org.apache.doris.thrift.TTaskType; -public class ClearTransactionTask extends AgentTask { +import java.util.List; - private static final Logger LOG = LogManager.getLogger(ClearTransactionTask.class); +public class ClearTransactionTask extends AgentTask { private long transactionId; private List<Long> partitionIds; - private boolean isFinished; - public ClearTransactionTask(long backendId, long transactionId, - List<Long> partitionIds) { + public ClearTransactionTask(long backendId, long transactionId, List<Long> partitionIds) { super(null, backendId, TTaskType.CLEAR_TRANSACTION_TASK, -1L, -1L, -1L, -1L, -1L, transactionId); this.transactionId = transactionId; this.partitionIds = partitionIds; @@ -42,16 +35,8 @@ public class ClearTransactionTask extends AgentTask { } public TClearTransactionTaskRequest toThrift() { - TClearTransactionTaskRequest clearTransactionTaskRequest = new TClearTransactionTaskRequest(transactionId, - 
partitionIds); + TClearTransactionTaskRequest clearTransactionTaskRequest = new TClearTransactionTaskRequest( + transactionId, partitionIds); return clearTransactionTaskRequest; } - - public void setFinished() { - this.isFinished = true; - } - - public boolean isFinished() { - return this.isFinished; - } } diff --git a/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java b/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java index 5908d18..15d3e19 100644 --- a/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java +++ b/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java @@ -39,7 +39,10 @@ import org.apache.doris.common.util.TimeUtils; import org.apache.doris.common.util.Util; import org.apache.doris.metric.MetricRepo; import org.apache.doris.persist.EditLog; +import org.apache.doris.task.AgentBatchTask; +import org.apache.doris.task.AgentTaskExecutor; import org.apache.doris.task.AgentTaskQueue; +import org.apache.doris.task.ClearTransactionTask; import org.apache.doris.task.PublishVersionTask; import org.apache.doris.thrift.TTaskType; import org.apache.doris.thrift.TUniqueId; @@ -95,6 +98,8 @@ public class GlobalTransactionMgr { private Catalog catalog; + private List<ClearTransactionTask> clearTransactionTasks = Lists.newArrayList(); + public GlobalTransactionMgr(Catalog catalog) { this.catalog = catalog; } @@ -484,6 +489,7 @@ public class GlobalTransactionMgr { return transactionState.getTransactionStatus() == TransactionStatus.VISIBLE; } + // for http cancel stream load api public void abortTransaction(Long dbId, String label, String reason) throws UserException { Preconditions.checkNotNull(label); Long transactionId = null; @@ -532,9 +538,46 @@ public class GlobalTransactionMgr { writeUnlock(); transactionState.afterStateTransform(TransactionStatus.ABORTED, txnOperated, reason); } + + // send clear txn task to BE to clear the transactions on BE. 
+ // This is because parts of a txn may succeed on some BEs, and those parts must be cleared + // explicitly, or they will remain on the BEs forever + // (The report process also diffs txns and sends clear txn tasks to BEs, but that is only our + // last line of defense) + if (txnOperated && transactionState.getTransactionStatus() == TransactionStatus.ABORTED) { + clearBackendTransactions(transactionState); + } + return; } + private void clearBackendTransactions(TransactionState transactionState) { + Preconditions.checkState(transactionState.getTransactionStatus() == TransactionStatus.ABORTED); + // for an aborted transaction, we don't know which backends are involved, so we have to send a clear task + // to all backends. + List<Long> allBeIds = Catalog.getCurrentSystemInfo().getBackendIds(false); + AgentBatchTask batchTask = null; + synchronized (clearTransactionTasks) { + for (Long beId : allBeIds) { + ClearTransactionTask task = new ClearTransactionTask(beId, transactionState.getTransactionId(), Lists.newArrayList()); + clearTransactionTasks.add(task); + } + + // batch the tasks instead of sending them every time a txn is aborted, to avoid too many task RPCs. + if (clearTransactionTasks.size() > allBeIds.size() * 2) { + batchTask = new AgentBatchTask(); + for (ClearTransactionTask clearTransactionTask : clearTransactionTasks) { + batchTask.addTask(clearTransactionTask); + } + clearTransactionTasks.clear(); + } + } + + if (batchTask != null) { + AgentTaskExecutor.submit(batchTask); + } + } + /* * get all txns which is ready to publish * a ready-to-publish txn's partition's visible version should be ONE less than txn's commit version. --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org