This is an automated email from the ASF dual-hosted git repository. isapego pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new 0675e2a IGNITE-14204 Fix C++ thin transactions 0675e2a is described below commit 0675e2a7e800730c9c8230332b82809754ddae5a Author: Igor Sapego <isap...@apache.org> AuthorDate: Mon Mar 1 21:03:54 2021 +0300 IGNITE-14204 Fix C++ thin transactions This closes #8836 --- .../platforms/cpp/thin-client-test/src/tx_test.cpp | 85 ++++++++---- modules/platforms/cpp/thin-client/CMakeLists.txt | 1 + .../cpp/thin-client/project/vs/thin-client.vcxproj | 1 + .../project/vs/thin-client.vcxproj.filters | 24 ++++ .../src/impl/cache/cache_client_impl.cpp | 145 ++++++++------------- .../thin-client/src/impl/cache/cache_client_impl.h | 37 +++++- .../cpp/thin-client/src/impl/data_router.h | 10 ++ .../platforms/cpp/thin-client/src/impl/message.h | 18 +-- ...{transactions_impl.cpp => transaction_impl.cpp} | 94 +++---------- .../src/impl/transactions/transaction_impl.h | 50 +++++-- .../src/impl/transactions/transactions_impl.cpp | 135 +------------------ .../src/impl/transactions/transactions_impl.h | 48 +------ 12 files changed, 252 insertions(+), 396 deletions(-) diff --git a/modules/platforms/cpp/thin-client-test/src/tx_test.cpp b/modules/platforms/cpp/thin-client-test/src/tx_test.cpp index 19757a2..04422ad 100644 --- a/modules/platforms/cpp/thin-client-test/src/tx_test.cpp +++ b/modules/platforms/cpp/thin-client-test/src/tx_test.cpp @@ -39,7 +39,8 @@ class IgniteTxTestSuiteFixture public: IgniteTxTestSuiteFixture() { - serverNode = ignite_test::StartCrossPlatformServerNode("cache.xml", "ServerNode"); + node1 = ignite_test::StartCrossPlatformServerNode("cache.xml", "node1"); + node2 = ignite_test::StartCrossPlatformServerNode("cache.xml", "node2"); } ~IgniteTxTestSuiteFixture() @@ -47,9 +48,24 @@ public: ignite::Ignition::StopAll(false); } + /** + * Start client. + */ + static IgniteClient StartClient() + { + IgniteClientConfiguration cfg; + + cfg.SetEndPoints("127.0.0.1:11110,127.0.0.1:11111"); + + return IgniteClient::Start(cfg); + } + private: - /** Server node. */ - ignite::Ignite serverNode; + /** Server node #1. */ + ignite::Ignite node1; + + /** Server node #2. */ + ignite::Ignite node2; }; BOOST_FIXTURE_TEST_SUITE(IgniteTxTestSuite, IgniteTxTestSuiteFixture) @@ -75,11 +91,7 @@ bool checkTxTimeoutMessage(const ignite::IgniteError& ex) BOOST_AUTO_TEST_CASE(TestCacheOpsWithTx) { - IgniteClientConfiguration cfg; - - cfg.SetEndPoints("127.0.0.1:11110"); - - IgniteClient client = IgniteClient::Start(cfg); + IgniteClient client = StartClient(); cache::CacheClient<int, int> cache = client.GetCache<int, int>("partitioned"); @@ -190,7 +202,7 @@ BOOST_AUTO_TEST_CASE(TestCacheOpsWithTx) tx.Rollback(); - BOOST_CHECK_EQUAL(cache.GetSize(cache::CachePeekMode::ALL), 1); + BOOST_CHECK_EQUAL(cache.GetSize(cache::CachePeekMode::PRIMARY), 1); //--- @@ -241,11 +253,7 @@ BOOST_AUTO_TEST_CASE(TestCacheOpsWithTx) void startAnotherClientAndTx(SharedPointer<SingleLatch>& l) { - IgniteClientConfiguration cfg; - - cfg.SetEndPoints("127.0.0.1:11110"); - - IgniteClient client = IgniteClient::Start(cfg); + IgniteClient client = IgniteTxTestSuiteFixture::StartClient(); cache::CacheClient<int, int> cache = client.GetCache<int, int>("partitioned"); @@ -263,11 +271,7 @@ void startAnotherClientAndTx(SharedPointer<SingleLatch>& l) BOOST_AUTO_TEST_CASE(TestTxOps) { - IgniteClientConfiguration cfg; - - cfg.SetEndPoints("127.0.0.1:11110"); - - IgniteClient client = IgniteClient::Start(cfg); + IgniteClient client = StartClient(); cache::CacheClient<int, int> cache = client.GetCache<int, int>("partitioned"); @@ -347,11 +351,7 @@ bool checkTxLabel1Message(const ignite::IgniteError& ex) BOOST_AUTO_TEST_CASE(TestTxWithLabel) { - IgniteClientConfiguration cfg; - - cfg.SetEndPoints("127.0.0.1:11110"); - - IgniteClient client = IgniteClient::Start(cfg); + IgniteClient client = StartClient(); cache::CacheClient<int, int> cache = client.GetCache<int, int>("partitioned"); @@ -397,4 +397,41 @@ BOOST_AUTO_TEST_CASE(TestTxWithLabel) tx.Close(); } +BOOST_AUTO_TEST_CASE(ManyTransactions) +{ + IgniteClient client = StartClient(); + + cache::CacheClient<int, int> cache = + client.GetCache<int, int>("partitioned"); + + transactions::ClientTransactions transactions = client.ClientTransactions(); + const int32_t key = 42; + + for (int32_t val = 0; val < 100; ++val) { + transactions::ClientTransaction tx = transactions.TxStart(); + + cache.Put(key, val); + + tx.Commit(); + + BOOST_CHECK_EQUAL(val, cache.Get(key)); + } + + const int32_t expected = -42; + + cache.Put(key, expected); + + BOOST_CHECK_EQUAL(expected, cache.Get(key)); + + for (int32_t val = 0; val < 100; ++val) { + transactions::ClientTransaction tx = transactions.TxStart(); + + cache.Put(key, val); + + tx.Rollback(); + + BOOST_CHECK_EQUAL(expected, cache.Get(key)); + } +} + BOOST_AUTO_TEST_SUITE_END() diff --git a/modules/platforms/cpp/thin-client/CMakeLists.txt b/modules/platforms/cpp/thin-client/CMakeLists.txt index 059b012..bc1fbeb 100644 --- a/modules/platforms/cpp/thin-client/CMakeLists.txt +++ b/modules/platforms/cpp/thin-client/CMakeLists.txt @@ -34,6 +34,7 @@ set(SOURCES src/impl/data_channel.cpp src/impl/message.cpp src/impl/cache/cache_client_proxy.cpp src/impl/cache/cache_client_impl.cpp + src/impl/transactions/transaction_impl.cpp src/impl/transactions/transactions_impl.cpp src/impl/transactions/transactions_proxy.cpp src/ignite_client.cpp diff --git a/modules/platforms/cpp/thin-client/project/vs/thin-client.vcxproj b/modules/platforms/cpp/thin-client/project/vs/thin-client.vcxproj index 4d60182..d5cdc6f 100644 --- a/modules/platforms/cpp/thin-client/project/vs/thin-client.vcxproj +++ b/modules/platforms/cpp/thin-client/project/vs/thin-client.vcxproj @@ -169,6 +169,7 @@ <ClCompile Include="..\..\src\impl\protocol_version.cpp" /> <ClCompile Include="..\..\src\impl\remote_type_updater.cpp" /> <ClCompile Include="..\..\src\impl\utility.cpp" /> + <ClCompile Include="..\..\src\impl\transactions\transaction_impl.cpp" /> <ClCompile Include="..\..\src\impl\transactions\transactions_impl.cpp" /> <ClCompile Include="..\..\src\impl\transactions\transactions_proxy.cpp" /> </ItemGroup> diff --git a/modules/platforms/cpp/thin-client/project/vs/thin-client.vcxproj.filters b/modules/platforms/cpp/thin-client/project/vs/thin-client.vcxproj.filters index 263325c..c9a092a 100644 --- a/modules/platforms/cpp/thin-client/project/vs/thin-client.vcxproj.filters +++ b/modules/platforms/cpp/thin-client/project/vs/thin-client.vcxproj.filters @@ -67,6 +67,15 @@ <ClCompile Include="..\..\src\impl\affinity\affinity_manager.cpp"> <Filter>Code\impl\affinity</Filter> </ClCompile> + <ClCompile Include="..\..\src\impl\transactions\transaction_impl.cpp"> + <Filter>Code\impl\transactions</Filter> + </ClCompile> + <ClCompile Include="..\..\src\impl\transactions\transactions_impl.cpp"> + <Filter>Code\impl\transactions</Filter> + </ClCompile> + <ClCompile Include="..\..\src\impl\transactions\transactions_proxy.cpp"> + <Filter>Code\impl\transactions</Filter> + </ClCompile> </ItemGroup> <ItemGroup> <ClInclude Include="..\..\include\ignite\thin\ignite_client.h"> @@ -174,5 +183,20 @@ <ClInclude Include="..\..\src\impl\affinity\partition_awareness_group.h"> <Filter>Code\impl\affinity</Filter> </ClInclude> + <ClInclude Include="..\..\include\ignite\thin\transactions\transactions.h"> + <Filter>Code\transactions</Filter> + </ClInclude> + <ClInclude Include="..\..\include\ignite\thin\transactions\transaction.h"> + <Filter>Code\transactions</Filter> + </ClInclude> + <ClInclude Include="..\..\include\ignite\thin\transactions\transaction_consts.h"> + <Filter>Code\transactions</Filter> + </ClInclude> + <ClInclude Include="..\..\include\ignite\thin\transactions\transactions_proxy.h"> + <Filter>Code\transactions</Filter> + </ClInclude> + <ClInclude Include="..\..\src\impl\transactions\transactions_impl.h"> + <Filter>Code\impl\transactions</Filter> + </ClInclude> </ItemGroup> </Project> \ No newline at end of file diff --git a/modules/platforms/cpp/thin-client/src/impl/cache/cache_client_impl.cpp b/modules/platforms/cpp/thin-client/src/impl/cache/cache_client_impl.cpp index 51b0c28..d66b9e6 100644 --- a/modules/platforms/cpp/thin-client/src/impl/cache/cache_client_impl.cpp +++ b/modules/platforms/cpp/thin-client/src/impl/cache/cache_client_impl.cpp @@ -101,71 +101,79 @@ namespace ignite return channel; } - template<typename ReqT> - void CacheClientImpl::checkTransactional(ReqT& req) + template<typename ReqT, typename RspT> + void CacheClientImpl::TransactionalSyncCacheKeyMessage(const WritableKey &key, ReqT &req, + RspT &rsp) + { + if (!TryProcessTransactional(req, rsp)) + SyncCacheKeyMessage(key, req, rsp); + } + + template<typename ReqT, typename RspT> + void CacheClientImpl::TransactionalSyncMessage(ReqT &req, RspT &rsp) { - SP_TransactionImpl activeTx = tx.Get()->GetCurrent(); + if (!TryProcessTransactional(req, rsp)) + SyncMessage(req, rsp); + } - bool isUnderTx = activeTx.IsValid(); + template<typename ReqT, typename RspT> + bool CacheClientImpl::TryProcessTransactional(ReqT& req, RspT& rsp) + { + TransactionImpl* activeTx = tx.Get()->GetCurrent().Get(); - int32_t txId = isUnderTx ? activeTx.Get()->TxId() : 0; + if (!activeTx) + return false; - req.activeTx(isUnderTx, txId); + req.activeTx(true, activeTx->TxId()); + + SP_DataChannel channel = activeTx->GetChannel(); + + channel.Get()->SyncMessage(req, rsp, router.Get()->GetIoTimeout()); + + if (rsp.GetStatus() != ResponseStatus::SUCCESS) + throw IgniteError(IgniteError::IGNITE_ERR_CACHE, rsp.GetError().c_str()); + + return true; } void CacheClientImpl::Put(const WritableKey& key, const Writable& value) { Cache2ValueRequest<RequestType::CACHE_PUT> req(id, binary, key, value); - - checkTransactional(req); - Response rsp; - SyncCacheKeyMessage(key, req, rsp); + TransactionalSyncCacheKeyMessage(key, req, rsp); } void CacheClientImpl::Get(const WritableKey& key, Readable& value) { CacheValueRequest<RequestType::CACHE_GET> req(id, binary, key); - - checkTransactional(req); - CacheValueResponse rsp(value); - SyncCacheKeyMessage(key, req, rsp); + TransactionalSyncCacheKeyMessage(key, req, rsp); } void CacheClientImpl::PutAll(const Writable & pairs) { CacheValueRequest<RequestType::CACHE_PUT_ALL> req(id, binary, pairs); - - checkTransactional(req); - Response rsp; - SyncMessage(req, rsp); + TransactionalSyncMessage(req, rsp); } void CacheClientImpl::GetAll(const Writable& keys, Readable& pairs) { CacheValueRequest<RequestType::CACHE_GET_ALL> req(id, binary, keys); - - checkTransactional(req); - CacheValueResponse rsp(pairs); - SyncMessage(req, rsp); + TransactionalSyncMessage(req, rsp); } bool CacheClientImpl::Replace(const WritableKey& key, const Writable& value) { Cache2ValueRequest<RequestType::CACHE_REPLACE> req(id, binary, key, value); - - checkTransactional(req); - BoolResponse rsp; - SyncCacheKeyMessage(key, req, rsp); + TransactionalSyncCacheKeyMessage(key, req, rsp); return rsp.GetValue(); } @@ -173,12 +181,9 @@ namespace ignite bool CacheClientImpl::ContainsKey(const WritableKey& key) { CacheValueRequest<RequestType::CACHE_CONTAINS_KEY> req(id, binary, key); - - checkTransactional(req); - BoolResponse rsp; - SyncCacheKeyMessage(key, req, rsp); + TransactionalSyncCacheKeyMessage(key, req, rsp); return rsp.GetValue(); } @@ -186,12 +191,9 @@ namespace ignite bool CacheClientImpl::ContainsKeys(const Writable& keys) { CacheValueRequest<RequestType::CACHE_CONTAINS_KEYS> req(id, binary, keys); - - checkTransactional(req); - BoolResponse rsp; - SyncMessage(req, rsp); + TransactionalSyncMessage(req, rsp); return rsp.GetValue(); } @@ -199,12 +201,9 @@ namespace ignite int64_t CacheClientImpl::GetSize(int32_t peekModes) { CacheGetSizeRequest req(id, binary, peekModes); - - checkTransactional(req); - Int64Response rsp; - SyncMessage(req, rsp); + TransactionalSyncMessage(req, rsp); return rsp.GetValue(); } @@ -212,12 +211,9 @@ namespace ignite bool CacheClientImpl::Remove(const WritableKey& key) { CacheValueRequest<RequestType::CACHE_REMOVE_KEY> req(id, binary, key); - - checkTransactional(req); - BoolResponse rsp; - SyncCacheKeyMessage(key, req, rsp); + TransactionalSyncCacheKeyMessage(key, req, rsp); return rsp.GetValue(); } @@ -225,12 +221,9 @@ namespace ignite bool CacheClientImpl::Remove(const WritableKey& key, const Writable& val) { Cache2ValueRequest<RequestType::CACHE_REMOVE_IF_EQUALS> req(id, binary, key, val); - - checkTransactional(req); - BoolResponse rsp; - SyncCacheKeyMessage(key, req, rsp); + TransactionalSyncCacheKeyMessage(key, req, rsp); return rsp.GetValue(); } @@ -238,78 +231,57 @@ namespace ignite void CacheClientImpl::RemoveAll(const Writable& keys) { CacheValueRequest<RequestType::CACHE_REMOVE_KEYS> req(id, binary, keys); - - checkTransactional(req); - Response rsp; - SyncMessage(req, rsp); + TransactionalSyncMessage(req, rsp); } void CacheClientImpl::RemoveAll() { CacheRequest<RequestType::CACHE_REMOVE_ALL> req(id, binary); - - checkTransactional(req); - Response rsp; - SyncMessage(req, rsp); + TransactionalSyncMessage(req, rsp); } void CacheClientImpl::Clear(const WritableKey& key) { CacheValueRequest<RequestType::CACHE_CLEAR_KEY> req(id, binary, key); - - checkTransactional(req); - Response rsp; - SyncCacheKeyMessage(key, req, rsp); + TransactionalSyncCacheKeyMessage(key, req, rsp); } void CacheClientImpl::Clear() { CacheRequest<RequestType::CACHE_CLEAR> req(id, binary); - - checkTransactional(req); - Response rsp; - SyncMessage(req, rsp); + TransactionalSyncMessage(req, rsp); } void CacheClientImpl::ClearAll(const Writable& keys) { CacheValueRequest<RequestType::CACHE_CLEAR_KEYS> req(id, binary, keys); - - checkTransactional(req); - Response rsp; - SyncMessage(req, rsp); + TransactionalSyncMessage(req, rsp); } void CacheClientImpl::LocalPeek(const WritableKey& key, Readable& value) { CacheValueRequest<RequestType::CACHE_LOCAL_PEEK> req(id, binary, key); - - checkTransactional(req); - CacheValueResponse rsp(value); - SyncCacheKeyMessage(key, req, rsp); + TransactionalSyncCacheKeyMessage(key, req, rsp); } bool CacheClientImpl::Replace(const WritableKey& key, const Writable& oldVal, const Writable& newVal) { Cache3ValueRequest<RequestType::CACHE_REPLACE_IF_EQUALS> req(id, binary, key, oldVal, newVal); - - checkTransactional(req); - BoolResponse rsp; - SyncCacheKeyMessage(key, req, rsp); + TransactionalSyncCacheKeyMessage(key, req, rsp); return rsp.GetValue(); } @@ -317,45 +289,33 @@ namespace ignite void CacheClientImpl::GetAndPut(const WritableKey& key, const Writable& valIn, Readable& valOut) { Cache2ValueRequest<RequestType::CACHE_GET_AND_PUT> req(id, binary, key, valIn); - - checkTransactional(req); - CacheValueResponse rsp(valOut); - SyncCacheKeyMessage(key, req, rsp); + TransactionalSyncCacheKeyMessage(key, req, rsp); } void CacheClientImpl::GetAndRemove(const WritableKey& key, Readable& valOut) { CacheValueRequest<RequestType::CACHE_GET_AND_REMOVE> req(id, binary, key); - - checkTransactional(req); - CacheValueResponse rsp(valOut); - SyncCacheKeyMessage(key, req, rsp); + TransactionalSyncCacheKeyMessage(key, req, rsp); } void CacheClientImpl::GetAndReplace(const WritableKey& key, const Writable& valIn, Readable& valOut) { Cache2ValueRequest<RequestType::CACHE_GET_AND_REPLACE> req(id, binary, key, valIn); - - checkTransactional(req); - CacheValueResponse rsp(valOut); - SyncCacheKeyMessage(key, req, rsp); + TransactionalSyncCacheKeyMessage(key, req, rsp); } bool CacheClientImpl::PutIfAbsent(const WritableKey& key, const Writable& val) { Cache2ValueRequest<RequestType::CACHE_PUT_IF_ABSENT> req(id, binary, key, val); - - checkTransactional(req); - BoolResponse rsp; - SyncCacheKeyMessage(key, req, rsp); + TransactionalSyncCacheKeyMessage(key, req, rsp); return rsp.GetValue(); } @@ -363,12 +323,9 @@ namespace ignite void CacheClientImpl::GetAndPutIfAbsent(const WritableKey& key, const Writable& valIn, Readable& valOut) { Cache2ValueRequest<RequestType::CACHE_GET_AND_PUT_IF_ABSENT> req(id, binary, key, valIn); - - checkTransactional(req); - CacheValueResponse rsp(valOut); - SyncCacheKeyMessage(key, req, rsp); + TransactionalSyncCacheKeyMessage(key, req, rsp); } query::SP_QueryFieldsCursorImpl CacheClientImpl::Query( diff --git a/modules/platforms/cpp/thin-client/src/impl/cache/cache_client_impl.h b/modules/platforms/cpp/thin-client/src/impl/cache/cache_client_impl.h index d74ad29..4efefd7 100644 --- a/modules/platforms/cpp/thin-client/src/impl/cache/cache_client_impl.h +++ b/modules/platforms/cpp/thin-client/src/impl/cache/cache_client_impl.h @@ -310,9 +310,6 @@ namespace ignite template<typename ReqT, typename RspT> void SyncCacheKeyMessage(const WritableKey& key, const ReqT& req, RspT& rsp); - template<typename ReqT> - void checkTransactional(ReqT& req); - /** * Synchronously send message and receive response. * @@ -324,6 +321,40 @@ namespace ignite template<typename ReqT, typename RspT> SP_DataChannel SyncMessage(const ReqT& req, RspT& rsp); + /** + * Synchronously send request message and receive response taking in account that it can be + * transactional. + * + * @param key Key. + * @param req Request message. + * @param rsp Response message. + * @throw IgniteError on error. + */ + template<typename ReqT, typename RspT> + void TransactionalSyncCacheKeyMessage(const WritableKey& key, ReqT& req, RspT& rsp); + + /** + * Synchronously send message and receive response taking in account that it can be transactional. + * + * @param req Request message. + * @param rsp Response message. + * @return Channel that was used for request. + * @throw IgniteError on error. + */ + template<typename ReqT, typename RspT> + void TransactionalSyncMessage(ReqT& req, RspT& rsp); + + /*** + * Check whether request is transactional and process it if it is. + * @tparam ReqT Request type. + * @tparam RspT Response type. + * @param req Request. + * @param rsp Response. + * @return @c true if processed and false otherwise. + */ + template<typename ReqT, typename RspT> + bool TryProcessTransactional(ReqT& req, RspT& rsp); + /** Data router. */ SP_DataRouter router; diff --git a/modules/platforms/cpp/thin-client/src/impl/data_router.h b/modules/platforms/cpp/thin-client/src/impl/data_router.h index baa74ac..701f710 100644 --- a/modules/platforms/cpp/thin-client/src/impl/data_router.h +++ b/modules/platforms/cpp/thin-client/src/impl/data_router.h @@ -208,6 +208,16 @@ namespace ignite */ affinity::SP_AffinityAssignment GetAffinityAssignment(int32_t cacheId) const; + /** + * Get IO timeout. + * + * @return IO timeout. + */ + int32_t GetIoTimeout() + { + return ioTimeout; + } + private: IGNITE_NO_COPY_ASSIGNMENT(DataRouter); diff --git a/modules/platforms/cpp/thin-client/src/impl/message.h b/modules/platforms/cpp/thin-client/src/impl/message.h index 80e5437..a7220d8 100644 --- a/modules/platforms/cpp/thin-client/src/impl/message.h +++ b/modules/platforms/cpp/thin-client/src/impl/message.h @@ -363,7 +363,8 @@ namespace ignite CacheRequest(int32_t cacheId, bool binary) : cacheId(cacheId), binary(binary), - actTx(false) + actTx(false), + txId(0) { // No-op. } @@ -536,15 +537,6 @@ namespace ignite } /** - * Sets transaction active flag and appropriate txId. - * @param active Transaction activity flag. - * @param id Transaction id. - */ - void activeTx(bool active, int32_t id) { - CacheRequest<OpCode>::activeTx(active, id); - } - - /** * Write request using provided writer. * @param writer Writer. * @param ver Version. @@ -691,11 +683,11 @@ namespace ignite * Constructor. * * @param id Transaction id. - * @param comm Need to commit flag. + * @param commit Need to commit flag. */ - TxEndRequest(int32_t id, bool comm) : + TxEndRequest(int32_t id, bool commit) : txId(id), - commited(comm) + commited(commit) { // No-op. } diff --git a/modules/platforms/cpp/thin-client/src/impl/transactions/transactions_impl.cpp b/modules/platforms/cpp/thin-client/src/impl/transactions/transaction_impl.cpp similarity index 64% copy from modules/platforms/cpp/thin-client/src/impl/transactions/transactions_impl.cpp copy to modules/platforms/cpp/thin-client/src/impl/transactions/transaction_impl.cpp index d785932..6fd9115 100644 --- a/modules/platforms/cpp/thin-client/src/impl/transactions/transactions_impl.cpp +++ b/modules/platforms/cpp/thin-client/src/impl/transactions/transaction_impl.cpp @@ -16,8 +16,8 @@ */ #include "impl/message.h" -#include "impl/transactions/transactions_impl.h" #include "impl/transactions/transaction_impl.h" +#include "impl/transactions/transactions_impl.h" #include "impl/response_status.h" using namespace ignite::common::concurrent; @@ -32,35 +32,18 @@ namespace ignite { namespace transactions { - TransactionsImpl::TransactionsImpl(const SP_DataRouter& router) : - router(router) - { - // No-op. - } - template<typename ReqT, typename RspT> - void TransactionsImpl::SendTxMessage(const ReqT& req, RspT& rsp) + void TransactionImpl::SendTxMessage(const ReqT& req, RspT& rsp) { - router.Get()->SyncMessage(req, rsp); + channel.Get()->SyncMessage(req, rsp, static_cast<int32_t>(timeout / 1000) + ioTimeout); if (rsp.GetStatus() != ResponseStatus::SUCCESS) throw IgniteError(IgniteError::IGNITE_ERR_TX, rsp.GetError().c_str()); } - SharedPointer<TransactionImpl> TransactionsImpl::TxStart( - TransactionConcurrency::Type concurrency, - TransactionIsolation::Type isolation, - int64_t timeout, - int32_t txSize, - SharedPointer<common::FixedSizeArray<char> > label) - { - SP_TransactionImpl tx = TransactionImpl::Create(*this, concurrency, isolation, timeout, txSize, label); - - return tx; - } - SP_TransactionImpl TransactionImpl::Create( TransactionsImpl& txs, + SP_DataRouter& router, TransactionConcurrency::Type concurrency, TransactionIsolation::Type isolation, int64_t timeout, @@ -72,19 +55,21 @@ namespace ignite TransactionImpl* ptr = tx.Get(); if (ptr && !ptr->IsClosed()) - { throw IgniteError(IgniteError::IGNITE_ERR_TX_THIS_THREAD, TX_ALREADY_STARTED); - } TxStartRequest req(concurrency, isolation, timeout, label); Int32Response rsp; - txs.SendTxMessage(req, rsp); + SP_DataChannel channel = router.Get()->SyncMessage(req, rsp); + + if (rsp.GetStatus() != ResponseStatus::SUCCESS) + throw IgniteError(IgniteError::IGNITE_ERR_TX, rsp.GetError().c_str()); int32_t curTxId = rsp.GetValue(); - tx = SP_TransactionImpl(new TransactionImpl(txs, curTxId, concurrency, isolation, timeout, txSize)); + tx = SP_TransactionImpl(new TransactionImpl(txs, channel, curTxId, concurrency, + isolation, timeout, router.Get()->GetIoTimeout(), txSize)); txs.SetCurrent(tx); @@ -96,74 +81,29 @@ namespace ignite return closed; } - SP_TransactionImpl TransactionsImpl::GetCurrent() - { - SP_TransactionImpl tx = threadTx.Get(); - - TransactionImpl* ptr = tx.Get(); - - if (ptr && ptr->IsClosed()) - { - threadTx.Remove(); - - tx = SP_TransactionImpl(); - } - - return tx; - } - - void TransactionsImpl::SetCurrent(const SP_TransactionImpl& impl) - { - threadTx.Set(impl); - } - - void TransactionsImpl::ResetCurrent() + void TransactionImpl::Commit() { - threadTx.Remove(); - } + ThreadCheck(); - int32_t TransactionsImpl::TxCommit(int32_t txId) - { TxEndRequest req(txId, true); Response rsp; SendTxMessage(req, rsp); - return rsp.GetStatus(); + ThreadEnd(); } - int32_t TransactionsImpl::TxRollback(int32_t txId) + void TransactionImpl::Rollback() { + ThreadCheck(); + TxEndRequest req(txId, false); Response rsp; SendTxMessage(req, rsp); - return rsp.GetStatus(); - } - - int32_t TransactionsImpl::TxClose(int32_t txId) - { - return TxRollback(txId); - } - - void TransactionImpl::Commit() - { - ThreadCheck(); - - txs.TxCommit(txId); - - ThreadEnd(); - } - - void TransactionImpl::Rollback() - { - ThreadCheck(); - - txs.TxRollback(txId); - ThreadEnd(); } @@ -176,7 +116,7 @@ namespace ignite return; } - txs.TxClose(txId); + Rollback(); ThreadEnd(); } diff --git a/modules/platforms/cpp/thin-client/src/impl/transactions/transaction_impl.h b/modules/platforms/cpp/thin-client/src/impl/transactions/transaction_impl.h index b6f9aa1..e395b3b 100644 --- a/modules/platforms/cpp/thin-client/src/impl/transactions/transaction_impl.h +++ b/modules/platforms/cpp/thin-client/src/impl/transactions/transaction_impl.h @@ -18,10 +18,10 @@ #ifndef _IGNITE_IMPL_THIN_TRANSACTION_IMPL #define _IGNITE_IMPL_THIN_TRANSACTION_IMPL -#include "impl/data_router.h" #include <ignite/common/fixed_size_array.h> -#include "ignite/thin/transactions/transaction_consts.h" -#include "impl/transactions/transactions_impl.h" +#include <ignite/thin/transactions/transaction_consts.h> + +#include "impl/data_router.h" namespace ignite { @@ -51,24 +51,30 @@ namespace ignite * Constructor. * * @param txImpl Transactions implementation. - * @param txid Transaction Id. + * @param channel Channel linked to transaction. + * @param txId Transaction Id. * @param concurrency Transaction concurrency. * @param isolation Transaction isolation. * @param timeout Transaction timeout. + * @param ioTimeout IO timeout for channel. * @param size Number of entries participating in transaction (may be approximate). */ TransactionImpl( TransactionsImpl& txImpl, - int32_t txid, + SP_DataChannel channel, + int32_t txId, ignite::thin::transactions::TransactionConcurrency::Type concurrency, ignite::thin::transactions::TransactionIsolation::Type isolation, int64_t timeout, + int32_t ioTimeout, int32_t size) : + channel(channel), txs(txImpl), - txId(txid), + txId(txId), concurrency(concurrency), isolation(isolation), timeout(timeout), + ioTimeout(ioTimeout), txSize(size), closed(false) { @@ -123,6 +129,7 @@ namespace ignite * Starts transaction. * * @param txs Transactions implementation. + * @param router Router to use to start transaction. * @param concurrency Transaction concurrency. * @param isolation Transaction isolation. * @param timeout Transaction timeout. @@ -131,19 +138,43 @@ namespace ignite */ static SP_TransactionImpl Create( TransactionsImpl& txs, + SP_DataRouter& router, ignite::thin::transactions::TransactionConcurrency::Type concurrency, ignite::thin::transactions::TransactionIsolation::Type isolation, int64_t timeout, int32_t txSize, ignite::common::concurrent::SharedPointer<common::FixedSizeArray<char> > label); - protected: + + /** + * Get channel for the transaction. + * + * @return Channel. + */ + SP_DataChannel GetChannel() + { + return channel; + } + + private: /** Checks current thread state. */ void ThreadCheck(); /** Completes tc and clear state from storage. */ void ThreadEnd(); - private: + /** + * Synchronously send message and receive response. + * + * @param req Request message. + * @param rsp Response message. + * @throw IgniteError on error. + */ + template<typename ReqT, typename RspT> + void SendTxMessage(const ReqT& req, RspT& rsp); + + /** Data channel to use. */ + SP_DataChannel channel; + /** Transactions implementation. */ TransactionsImpl& txs; @@ -159,6 +190,9 @@ namespace ignite /** Timeout in milliseconds. */ int64_t timeout; + /** Channel io timeout. */ + int32_t ioTimeout; + /** Transaction size. */ int32_t txSize; diff --git a/modules/platforms/cpp/thin-client/src/impl/transactions/transactions_impl.cpp b/modules/platforms/cpp/thin-client/src/impl/transactions/transactions_impl.cpp index d785932..6227add 100644 --- a/modules/platforms/cpp/thin-client/src/impl/transactions/transactions_impl.cpp +++ b/modules/platforms/cpp/thin-client/src/impl/transactions/transactions_impl.cpp @@ -15,10 +15,7 @@ * limitations under the License. */ -#include "impl/message.h" #include "impl/transactions/transactions_impl.h" -#include "impl/transactions/transaction_impl.h" -#include "impl/response_status.h" using namespace ignite::common::concurrent; using namespace ignite::impl::thin; @@ -38,15 +35,6 @@ namespace ignite // No-op. } - template<typename ReqT, typename RspT> - void TransactionsImpl::SendTxMessage(const ReqT& req, RspT& rsp) - { - router.Get()->SyncMessage(req, rsp); - - if (rsp.GetStatus() != ResponseStatus::SUCCESS) - throw IgniteError(IgniteError::IGNITE_ERR_TX, rsp.GetError().c_str()); - } - SharedPointer<TransactionImpl> TransactionsImpl::TxStart( TransactionConcurrency::Type concurrency, TransactionIsolation::Type isolation, @@ -54,48 +42,11 @@ namespace ignite int32_t txSize, SharedPointer<common::FixedSizeArray<char> > label) { - SP_TransactionImpl tx = TransactionImpl::Create(*this, concurrency, isolation, timeout, txSize, label); + SP_TransactionImpl tx = TransactionImpl::Create(*this, router, concurrency, isolation, timeout, txSize, label); return tx; } - SP_TransactionImpl TransactionImpl::Create( - TransactionsImpl& txs, - TransactionConcurrency::Type concurrency, - TransactionIsolation::Type isolation, - int64_t timeout, - int32_t txSize, - SharedPointer<common::FixedSizeArray<char> > label) - { - SP_TransactionImpl tx = txs.GetCurrent(); - - TransactionImpl* ptr = tx.Get(); - - if (ptr && !ptr->IsClosed()) - { - throw IgniteError(IgniteError::IGNITE_ERR_TX_THIS_THREAD, TX_ALREADY_STARTED); - } - - TxStartRequest req(concurrency, isolation, timeout, label); - - Int32Response rsp; - - txs.SendTxMessage(req, rsp); - - int32_t curTxId = rsp.GetValue(); - - tx = SP_TransactionImpl(new TransactionImpl(txs, curTxId, concurrency, isolation, timeout, txSize)); - - txs.SetCurrent(tx); - - return tx; - } - - bool TransactionImpl::IsClosed() const - { - return closed; - } - SP_TransactionImpl TransactionsImpl::GetCurrent() { SP_TransactionImpl tx = threadTx.Get(); @@ -121,90 +72,6 @@ namespace ignite { threadTx.Remove(); } - - int32_t TransactionsImpl::TxCommit(int32_t txId) - { - TxEndRequest req(txId, true); - - Response rsp; - - SendTxMessage(req, rsp); - - return rsp.GetStatus(); - } - - int32_t TransactionsImpl::TxRollback(int32_t txId) - { - TxEndRequest req(txId, false); - - Response rsp; - - SendTxMessage(req, rsp); - - return rsp.GetStatus(); - } - - int32_t TransactionsImpl::TxClose(int32_t txId) - { - return TxRollback(txId); - } - - void TransactionImpl::Commit() - { - ThreadCheck(); - - txs.TxCommit(txId); - - ThreadEnd(); - } - - void TransactionImpl::Rollback() - { - ThreadCheck(); - - txs.TxRollback(txId); - - ThreadEnd(); - } - - void TransactionImpl::Close() - { - ThreadCheck(); - - if (IsClosed()) - { - return; - } - - txs.TxClose(txId); - - ThreadEnd(); - } - - void TransactionImpl::SetClosed() - { - closed = true; - } - - void TransactionImpl::ThreadEnd() - { - this->SetClosed(); - - txs.ResetCurrent(); - } - - void TransactionImpl::ThreadCheck() - { - SP_TransactionImpl tx = txs.GetCurrent(); - - TransactionImpl* ptr = tx.Get(); - - if (!ptr) - throw IgniteError(IgniteError::IGNITE_ERR_TX_THIS_THREAD, TX_ALREADY_CLOSED); - - if (ptr->TxId() != this->TxId()) - throw IgniteError(IgniteError::IGNITE_ERR_TX_THIS_THREAD, TX_DIFFERENT_THREAD); - } } } } diff --git a/modules/platforms/cpp/thin-client/src/impl/transactions/transactions_impl.h b/modules/platforms/cpp/thin-client/src/impl/transactions/transactions_impl.h index 278545d..a0879fb 100644 --- a/modules/platforms/cpp/thin-client/src/impl/transactions/transactions_impl.h +++ b/modules/platforms/cpp/thin-client/src/impl/transactions/transactions_impl.h @@ -18,9 +18,10 @@ #ifndef _IGNITE_IMPL_THIN_TRANSACTIONS_IMPL #define _IGNITE_IMPL_THIN_TRANSACTIONS_IMPL -#include "impl/data_router.h" #include <ignite/common/fixed_size_array.h> -#include "ignite/thin/transactions/transaction_consts.h" +#include <ignite/thin/transactions/transaction_consts.h> + +#include "impl/data_router.h" #include "impl/transactions/transaction_impl.h" namespace ignite @@ -31,11 +32,6 @@ namespace ignite { namespace transactions { - class TransactionsImpl; - - typedef ignite::common::concurrent::SharedPointer<TransactionImpl> SP_TransactionImpl; - typedef ignite::common::concurrent::SharedPointer<TransactionsImpl> SP_TransactionsImpl; - /** * Thin client transaction. */ @@ -72,32 +68,6 @@ namespace ignite ignite::common::concurrent::SharedPointer<common::FixedSizeArray<char> > label); /** - * Commit Transaction. - * - * @param id Transaction ID. - * @return Resulting state. - */ - int32_t TxCommit(int32_t id); - - /** - * Rollback Transaction. - * - * @param id Transaction ID. - * @return Resulting state. - */ - int32_t TxRollback(int32_t id); - - - /** - * Close the transaction. - * - * This method should only be used on the valid instance. - * - * @param id Transaction ID. - */ - int32_t TxClose(int32_t id); - - /** * Get active transaction for the current thread. * * @return Active transaction implementation for current thread @@ -118,16 +88,6 @@ namespace ignite */ void ResetCurrent(); - /** - * Synchronously send message and receive response. - * - * @param req Request message. - * @param rsp Response message. - * @throw IgniteError on error. - */ - template<typename ReqT, typename RspT> - void SendTxMessage(const ReqT& req, RspT& rsp); - private: /** Data router. */ SP_DataRouter router; @@ -137,6 +97,8 @@ namespace ignite IGNITE_NO_COPY_ASSIGNMENT(TransactionsImpl); }; + + typedef ignite::common::concurrent::SharedPointer<TransactionsImpl> SP_TransactionsImpl; } } }