This is an automated email from the ASF dual-hosted git repository.
alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 2d67297 IGNITE-15732 Thin client: Fix transaction failure after timeout - Fixes #9504.
2d67297 is described below
commit 2d67297c9284252dc6ea3f46f5cdc7f06177fb3e
Author: Aleksey Plekhanov <[email protected]>
AuthorDate: Wed Oct 27 09:51:48 2021 +0300
IGNITE-15732 Thin client: Fix transaction failure after timeout - Fixes #9504.
Signed-off-by: Aleksey Plekhanov <[email protected]>
---
.../platform/client/tx/ClientTxContext.java | 18 ++++++---
.../org/apache/ignite/client/FunctionalTest.java | 44 ++++++++++++++++++++++
2 files changed, 57 insertions(+), 5 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/tx/ClientTxContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/tx/ClientTxContext.java
index 2c430aa..626c6bd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/tx/ClientTxContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/tx/ClientTxContext.java
@@ -63,13 +63,21 @@ public class ClientTxContext {
*/
public void release(boolean suspendTx) throws IgniteCheckedException {
try {
- if (suspendTx) {
- TransactionState state = tx.state();
+ try {
+ if (suspendTx) {
+ TransactionState state = tx.state();
- if (state != TransactionState.COMMITTED && state !=
TransactionState.ROLLED_BACK)
- tx.suspend();
+ if (state == TransactionState.ACTIVE)
+ tx.suspend();
+ }
}
- } finally {
+ finally {
+ // In some cases thread can still hold the transaction (due to concurrent rollbacks), threadMap should
+ // be forcibly cleared to avoid problems with resuming other transactions in the current worker.
+ tx.context().tm().clearThreadMap(tx);
+ }
+ }
+ finally {
lock.unlock();
}
}
diff --git a/modules/core/src/test/java/org/apache/ignite/client/FunctionalTest.java b/modules/core/src/test/java/org/apache/ignite/client/FunctionalTest.java
index 8a07ebd..673dc69 100644
--- a/modules/core/src/test/java/org/apache/ignite/client/FunctionalTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/client/FunctionalTest.java
@@ -59,6 +59,7 @@ import org.apache.ignite.cache.QueryEntity;
import org.apache.ignite.cache.QueryIndex;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.ClientConfiguration;
+import org.apache.ignite.configuration.ClientConnectorConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
@@ -75,6 +76,7 @@ import org.apache.ignite.mxbean.ClientProcessorMXBean;
import org.apache.ignite.spi.systemview.view.SystemView;
import org.apache.ignite.spi.systemview.view.TransactionView;
import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.transactions.Transaction;
import org.apache.ignite.transactions.TransactionIsolation;
import org.junit.Rule;
import org.junit.Test;
@@ -82,6 +84,7 @@ import org.junit.rules.Timeout;
import static
org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager.TXS_MON_LIST;
import static
org.apache.ignite.testframework.GridTestUtils.assertThrowsAnyCause;
+import static org.apache.ignite.testframework.GridTestUtils.runAsync;
import static
org.apache.ignite.testframework.junits.GridAbstractTest.getMxBean;
import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
import static
org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
@@ -797,6 +800,47 @@ public class FunctionalTest {
}
/**
+ * Test that client-connector worker can process further transactional requests (resume transactions) after
+ * external termination of previous transaction.
+ */
+ @Test
+ public void testTxResumeAfterTxTimeout() throws Exception {
+ IgniteConfiguration cfg =
Config.getServerConfiguration().setClientConnectorConfiguration(
+ new ClientConnectorConfiguration().setThreadPoolSize(1));
+
+ try (Ignite ignite = Ignition.start(cfg); IgniteClient client =
Ignition.startClient(getClientConfiguration())) {
+ String cacheName = "cache";
+
+ IgniteCache<Object, Object> igniteCache = ignite.createCache(new
CacheConfiguration<>(cacheName)
+ .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL));
+
+ try (ClientTransaction clientTx = client.transactions().txStart())
{
+ runAsync(() -> {
+ try (Transaction tx =
ignite.transactions().txStart(PESSIMISTIC, READ_COMMITTED)) {
+ igniteCache.put(0, 0); // Lock key by ignite node.
+
+ try {
+ // Start, but don't close the transaction (to keep it in the threadMap after timeout).
+ client.transactions().txStart(PESSIMISTIC,
READ_COMMITTED, 200L);
+
+ // Wait until transaction interrupted externally by timeout.
+ client.cache(cacheName).put(0, 0);
+
+ fail();
+ }
+ catch (ClientException ignored) {
+ // Expected.
+ }
+ }
+ }).get();
+
+ // Resume tx in the worker with interrupted transaction.
+ assertFalse(client.cache(cacheName).containsKey(0));
+ }
+ }
+ }
+
+ /**
* Test transactions.
*/
@Test