This is an automated email from the ASF dual-hosted git repository.
alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 7711fc9 IGNITE-12292 Java thin client: Fixed transaction issue in
case of txId intersection - Fixes #6979.
7711fc9 is described below
commit 7711fc9e37c622e358de97d8e0d8b98c23959965
Author: Aleksey Plekhanov <[email protected]>
AuthorDate: Wed Oct 23 14:00:28 2019 +0300
IGNITE-12292 Java thin client: Fixed transaction issue in case of txId
intersection - Fixes #6979.
---
.../client/thin/TcpClientTransactions.java | 42 +++++++------
.../org/apache/ignite/client/ReliabilityTest.java | 70 ++++++++++++++++++++++
2 files changed, 95 insertions(+), 17 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientTransactions.java
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientTransactions.java
index 341dad1..b117e9f 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientTransactions.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientTransactions.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.client.thin;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.ignite.client.ClientException;
import org.apache.ignite.client.ClientTransaction;
import org.apache.ignite.client.ClientTransactions;
@@ -43,11 +44,14 @@ class TcpClientTransactions implements ClientTransactions {
/** Marshaller. */
private final ClientBinaryMarshaller marsh;
- /** Current thread transaction id. */
- private final ThreadLocal<Integer> threadLocTxId = new ThreadLocal<>();
+ /** Tx counter (used to generate tx UID). */
+ private final AtomicLong txCnt = new AtomicLong();
- /** Tx map. */
- private final Map<Integer, TcpClientTransaction> txMap = new
ConcurrentHashMap<>();
+ /** Current thread transaction UID. */
+ private final ThreadLocal<Long> threadLocTxUid = new ThreadLocal<>();
+
+ /** Tx map (Tx UID to Tx). */
+ private final Map<Long, TcpClientTransaction> txMap = new
ConcurrentHashMap<>();
/** Tx config. */
private final ClientTransactionConfiguration txCfg;
@@ -103,9 +107,9 @@ class TcpClientTransactions implements ClientTransactions {
res -> new TcpClientTransaction(res.in().readInt(),
res.clientChannel())
);
- threadLocTxId.set(tx0.txId);
+ threadLocTxUid.set(tx0.txUid);
- txMap.put(tx0.txId, tx0);
+ txMap.put(tx0.txUid, tx0);
return tx0;
}
@@ -126,12 +130,12 @@ class TcpClientTransactions implements ClientTransactions
{
* Current thread transaction.
*/
TcpClientTransaction tx() {
- Integer txId = threadLocTxId.get();
+ Long txUid = threadLocTxUid.get();
- if (txId == null)
+ if (txUid == null)
return null;
- TcpClientTransaction tx0 = txMap.get(txId);
+ TcpClientTransaction tx0 = txMap.get(txUid);
// Also check isClosed() flag, since transaction can be closed by
another thread.
return tx0 == null || tx0.isClosed() ? null : tx0;
@@ -141,7 +145,10 @@ class TcpClientTransactions implements ClientTransactions {
*
*/
class TcpClientTransaction implements ClientTransaction {
- /** Transaction id. */
+ /** Unique client-side transaction id. */
+ private final long txUid;
+
+ /** Server-side transaction id. */
private final int txId;
/** Client channel. */
@@ -155,18 +162,19 @@ class TcpClientTransactions implements ClientTransactions
{
* @param clientCh Client channel.
*/
private TcpClientTransaction(int id, ClientChannel clientCh) {
+ txUid = txCnt.incrementAndGet();
txId = id;
this.clientCh = clientCh;
}
/** {@inheritDoc} */
@Override public void commit() {
- Integer threadTxId;
+ Long threadTxUid;
- if (closed || (threadTxId = threadLocTxId.get()) == null)
+ if (closed || (threadTxUid = threadLocTxUid.get()) == null)
throw new ClientException("The transaction is already closed");
- if (txId != threadTxId)
+ if (txUid != threadTxUid)
throw new ClientException("You can commit transaction only
from the thread it was started");
endTx(true);
@@ -202,14 +210,14 @@ class TcpClientTransactions implements ClientTransactions
{
}, null);
}
finally {
- txMap.remove(txId);
+ txMap.remove(txUid);
closed = true;
- Integer threadTxId = threadLocTxId.get();
+ Long threadTxUid = threadLocTxUid.get();
- if (threadTxId != null && txId == threadTxId)
- threadLocTxId.set(null);
+ if (threadTxUid != null && txUid == threadTxUid)
+ threadLocTxUid.set(null);
}
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/client/ReliabilityTest.java
b/modules/core/src/test/java/org/apache/ignite/client/ReliabilityTest.java
index 646c52b..1bb3698 100644
--- a/modules/core/src/test/java/org/apache/ignite/client/ReliabilityTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/client/ReliabilityTest.java
@@ -22,8 +22,10 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
+import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -33,6 +35,7 @@ import javax.management.MBeanServerInvocationHandler;
import javax.management.ObjectName;
import org.apache.ignite.Ignite;
import org.apache.ignite.Ignition;
+import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.query.Query;
import org.apache.ignite.cache.query.QueryCursor;
@@ -41,6 +44,7 @@ import org.apache.ignite.configuration.ClientConfiguration;
import org.apache.ignite.internal.processors.odbc.ClientListenerProcessor;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.mxbean.ClientProcessorMXBean;
+import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
@@ -192,6 +196,72 @@ public class ReliabilityTest extends
GridCommonAbstractTest {
}
/**
+ * Test that client works properly with servers txId intersection.
+ */
+ @Test
+ @SuppressWarnings("ThrowableNotThrown")
+ public void testTxWithIdIntersection() throws Exception {
+ int CLUSTER_SIZE = 2;
+
+ try (LocalIgniteCluster cluster =
LocalIgniteCluster.start(CLUSTER_SIZE);
+ IgniteClient client = Ignition.startClient(new
ClientConfiguration()
+ .setAddresses(cluster.clientAddresses().toArray(new
String[CLUSTER_SIZE])))
+ ) {
+ ClientCache<Integer, Integer> cache = client.createCache(new
ClientCacheConfiguration().setName("cache")
+ .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL));
+
+ CyclicBarrier barrier = new CyclicBarrier(2);
+
+ GridTestUtils.runAsync(() -> {
+ try {
+ // Another thread starts transaction here.
+ barrier.await(1, TimeUnit.SECONDS);
+
+ for (int i = 0; i < CLUSTER_SIZE; i++)
+
dropAllThinClientConnections(Ignition.allGrids().get(i));
+
+ ClientTransaction tx = client.transactions().txStart();
+
+ barrier.await(1, TimeUnit.SECONDS);
+
+ // Another thread puts to cache here.
+ barrier.await(1, TimeUnit.SECONDS);
+
+ tx.commit();
+
+ barrier.await(1, TimeUnit.SECONDS);
+ }
+ catch (Exception e) {
+ log.error("Unexpected error", e);
+ }
+ });
+
+ ClientTransaction tx = client.transactions().txStart();
+
+ barrier.await(1, TimeUnit.SECONDS);
+
+ // Another thread drops connections and create new transaction
here, which started on another node with the
+ // same transaction id as we started in this thread.
+ barrier.await(1, TimeUnit.SECONDS);
+
+ GridTestUtils.assertThrows(null, () -> {
+ cache.put(0, 0);
+
+ return null;
+ }, ClientException.class, "Transaction context has been lost due
to connection errors");
+
+ tx.close();
+
+ barrier.await(1, TimeUnit.SECONDS);
+
+ // Another thread commit transaction here.
+ barrier.await(1, TimeUnit.SECONDS);
+
+ assertFalse(cache.containsKey(0));
+ }
+ }
+
+ /**
* Drop all thin client connections on given Ignite instance.
*
* @param ignite Ignite.