This is an automated email from the ASF dual-hosted git repository.

alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 7711fc9  IGNITE-12292 Java thin client: Fixed transaction issue in case of txId intersection - Fixes #6979.
7711fc9 is described below

commit 7711fc9e37c622e358de97d8e0d8b98c23959965
Author: Aleksey Plekhanov <[email protected]>
AuthorDate: Wed Oct 23 14:00:28 2019 +0300

    IGNITE-12292 Java thin client: Fixed transaction issue in case of txId intersection - Fixes #6979.
---
 .../client/thin/TcpClientTransactions.java         | 42 +++++++------
 .../org/apache/ignite/client/ReliabilityTest.java  | 70 ++++++++++++++++++++++
 2 files changed, 95 insertions(+), 17 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientTransactions.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientTransactions.java
index 341dad1..b117e9f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientTransactions.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientTransactions.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.client.thin;
 
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
 import org.apache.ignite.client.ClientException;
 import org.apache.ignite.client.ClientTransaction;
 import org.apache.ignite.client.ClientTransactions;
@@ -43,11 +44,14 @@ class TcpClientTransactions implements ClientTransactions {
     /** Marshaller. */
     private final ClientBinaryMarshaller marsh;
 
-    /** Current thread transaction id. */
-    private final ThreadLocal<Integer> threadLocTxId = new ThreadLocal<>();
+    /** Tx counter (used to generate tx UID). */
+    private final AtomicLong txCnt = new AtomicLong();
 
-    /** Tx map. */
-    private final Map<Integer, TcpClientTransaction> txMap = new ConcurrentHashMap<>();
+    /** Current thread transaction UID. */
+    private final ThreadLocal<Long> threadLocTxUid = new ThreadLocal<>();
+
+    /** Tx map (Tx UID to Tx). */
+    private final Map<Long, TcpClientTransaction> txMap = new ConcurrentHashMap<>();
 
     /** Tx config. */
     private final ClientTransactionConfiguration txCfg;
@@ -103,9 +107,9 @@ class TcpClientTransactions implements ClientTransactions {
             res -> new TcpClientTransaction(res.in().readInt(), res.clientChannel())
         );
 
-        threadLocTxId.set(tx0.txId);
+        threadLocTxUid.set(tx0.txUid);
 
-        txMap.put(tx0.txId, tx0);
+        txMap.put(tx0.txUid, tx0);
 
         return tx0;
     }
@@ -126,12 +130,12 @@ class TcpClientTransactions implements ClientTransactions {
      * Current thread transaction.
      */
     TcpClientTransaction tx() {
-        Integer txId = threadLocTxId.get();
+        Long txUid = threadLocTxUid.get();
 
-        if (txId == null)
+        if (txUid == null)
             return null;
 
-        TcpClientTransaction tx0 = txMap.get(txId);
+        TcpClientTransaction tx0 = txMap.get(txUid);
 
         // Also check isClosed() flag, since transaction can be closed by another thread.
         return tx0 == null || tx0.isClosed() ? null : tx0;
@@ -141,7 +145,10 @@ class TcpClientTransactions implements ClientTransactions {
      *
      */
     class TcpClientTransaction implements ClientTransaction {
-        /** Transaction id. */
+        /** Unique client-side transaction id. */
+        private final long txUid;
+
+        /** Server-side transaction id. */
         private final int txId;
 
         /** Client channel. */
@@ -155,18 +162,19 @@ class TcpClientTransactions implements ClientTransactions {
          * @param clientCh Client channel.
          */
         private TcpClientTransaction(int id, ClientChannel clientCh) {
+            txUid = txCnt.incrementAndGet();
             txId = id;
             this.clientCh = clientCh;
         }
 
         /** {@inheritDoc} */
         @Override public void commit() {
-            Integer threadTxId;
+            Long threadTxUid;
 
-            if (closed || (threadTxId = threadLocTxId.get()) == null)
+            if (closed || (threadTxUid = threadLocTxUid.get()) == null)
                 throw new ClientException("The transaction is already closed");
 
-            if (txId != threadTxId)
+            if (txUid != threadTxUid)
                 throw new ClientException("You can commit transaction only from the thread it was started");
 
             endTx(true);
@@ -202,14 +210,14 @@ class TcpClientTransactions implements ClientTransactions {
                     }, null);
             }
             finally {
-                txMap.remove(txId);
+                txMap.remove(txUid);
 
                 closed = true;
 
-                Integer threadTxId = threadLocTxId.get();
+                Long threadTxUid = threadLocTxUid.get();
 
-                if (threadTxId != null && txId == threadTxId)
-                    threadLocTxId.set(null);
+                if (threadTxUid != null && txUid == threadTxUid)
+                    threadLocTxUid.set(null);
             }
         }
 
diff --git a/modules/core/src/test/java/org/apache/ignite/client/ReliabilityTest.java b/modules/core/src/test/java/org/apache/ignite/client/ReliabilityTest.java
index 646c52b..1bb3698 100644
--- a/modules/core/src/test/java/org/apache/ignite/client/ReliabilityTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/client/ReliabilityTest.java
@@ -22,8 +22,10 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
@@ -33,6 +35,7 @@ import javax.management.MBeanServerInvocationHandler;
 import javax.management.ObjectName;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.Ignition;
+import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cache.query.Query;
 import org.apache.ignite.cache.query.QueryCursor;
@@ -41,6 +44,7 @@ import org.apache.ignite.configuration.ClientConfiguration;
 import org.apache.ignite.internal.processors.odbc.ClientListenerProcessor;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.mxbean.ClientProcessorMXBean;
+import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.junit.Test;
 
@@ -192,6 +196,72 @@ public class ReliabilityTest extends GridCommonAbstractTest {
     }
 
     /**
+     * Test that client works properly with servers txId intersection.
+     */
+    @Test
+    @SuppressWarnings("ThrowableNotThrown")
+    public void testTxWithIdIntersection() throws Exception {
+        int CLUSTER_SIZE = 2;
+
+        try (LocalIgniteCluster cluster = LocalIgniteCluster.start(CLUSTER_SIZE);
+             IgniteClient client = Ignition.startClient(new ClientConfiguration()
+                 .setAddresses(cluster.clientAddresses().toArray(new String[CLUSTER_SIZE])))
+        ) {
+            ClientCache<Integer, Integer> cache = client.createCache(new ClientCacheConfiguration().setName("cache")
+                .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL));
+
+            CyclicBarrier barrier = new CyclicBarrier(2);
+
+            GridTestUtils.runAsync(() -> {
+                try {
+                    // Another thread starts transaction here.
+                    barrier.await(1, TimeUnit.SECONDS);
+
+                    for (int i = 0; i < CLUSTER_SIZE; i++)
+                        dropAllThinClientConnections(Ignition.allGrids().get(i));
+
+                    ClientTransaction tx = client.transactions().txStart();
+
+                    barrier.await(1, TimeUnit.SECONDS);
+
+                    // Another thread puts to cache here.
+                    barrier.await(1, TimeUnit.SECONDS);
+
+                    tx.commit();
+
+                    barrier.await(1, TimeUnit.SECONDS);
+                }
+                catch (Exception e) {
+                    log.error("Unexpected error", e);
+                }
+            });
+
+            ClientTransaction tx = client.transactions().txStart();
+
+            barrier.await(1, TimeUnit.SECONDS);
+
+            // Another thread drops connections and create new transaction here, which started on another node with the
+            // same transaction id as we started in this thread.
+            barrier.await(1, TimeUnit.SECONDS);
+
+            GridTestUtils.assertThrows(null, () -> {
+                cache.put(0, 0);
+
+                return null;
+            }, ClientException.class, "Transaction context has been lost due to connection errors");
+
+            tx.close();
+
+            barrier.await(1, TimeUnit.SECONDS);
+
+            // Another thread commit transaction here.
+            barrier.await(1, TimeUnit.SECONDS);
+
+            assertFalse(cache.containsKey(0));
+        }
+    }
+
+    /**
      * Drop all thin client connections on given Ignite instance.
      *
      * @param ignite Ignite.

Reply via email to