Repository: ignite
Updated Branches:
  refs/heads/ignite-3478 30421e399 -> 54b871422


ignite-6149


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/54b87142
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/54b87142
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/54b87142

Branch: refs/heads/ignite-3478
Commit: 54b871422625636a54a14efffe39a49c192af071
Parents: 30421e3
Author: sboikov <[email protected]>
Authored: Mon Sep 18 17:31:37 2017 +0300
Committer: sboikov <[email protected]>
Committed: Mon Sep 18 17:31:37 2017 +0300

----------------------------------------------------------------------
 .../distributed/dht/GridDhtTxPrepareFuture.java | 16 ++++-
 .../GridNearPessimisticTxPrepareFuture.java     | 15 ++++-
 .../mvcc/CacheCoordinatorsSharedManager.java    | 38 +++++++-----
 .../cache/mvcc/MvccResponseListener.java        | 29 +++++++++
 .../cache/mvcc/CacheMvccTransactionsTest.java   | 64 ++++++++++++++++++++
 5 files changed, 144 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/54b87142/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 42c2914..0fe17a8 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -61,6 +61,8 @@ import 
org.apache.ignite.internal.processors.cache.distributed.GridDistributedTx
 import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter;
 import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest;
 import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareResponse;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccResponseListener;
 import 
org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
@@ -104,7 +106,7 @@ import static 
org.apache.ignite.transactions.TransactionState.PREPARED;
  */
 @SuppressWarnings("unchecked")
 public final class GridDhtTxPrepareFuture extends 
GridCacheCompoundFuture<IgniteInternalTx, GridNearTxPrepareResponse>
-    implements GridCacheVersionedFuture<GridNearTxPrepareResponse>, 
IgniteDiagnosticAware {
+    implements GridCacheVersionedFuture<GridNearTxPrepareResponse>, 
IgniteDiagnosticAware, MvccResponseListener {
     /** */
     private static final long serialVersionUID = 0L;
 
@@ -1239,7 +1241,7 @@ public final class GridDhtTxPrepareFuture extends 
GridCacheCompoundFuture<Ignite
                 if (crd.isLocal())
                     
tx.mvccCoordinatorVersion(cctx.coordinators().requestTxCounterOnCoordinator(tx));
                 else {
-                    IgniteInternalFuture<Long> crdCntrFut = 
cctx.coordinators().requestTxCounter(crd, tx);
+                    IgniteInternalFuture<Long> crdCntrFut = 
cctx.coordinators().requestTxCounter(crd, this, tx.nearXidVersion());
 
                     if (tx.onePhaseCommit())
                         waitCrdCntrFut = crdCntrFut;
@@ -1299,6 +1301,16 @@ public final class GridDhtTxPrepareFuture extends 
GridCacheCompoundFuture<Ignite
         }
     }
 
+    /** {@inheritDoc} */
+    @Override public void onMvccResponse(MvccCoordinatorVersion res) {
+        tx.mvccCoordinatorVersion(res);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onMvccError(IgniteCheckedException e) {
+        // TODO IGNITE-3478.
+    }
+
     /**
      *
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/54b87142/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
index f55bb28..8247b46 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
@@ -38,6 +38,7 @@ import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartit
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxMapping;
 import 
org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsSharedManager;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccResponseListener;
 import 
org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
 import 
org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
@@ -57,7 +58,7 @@ import static 
org.apache.ignite.transactions.TransactionState.PREPARING;
 /**
  *
  */
-public class GridNearPessimisticTxPrepareFuture extends 
GridNearTxPrepareFutureAdapter {
+public class GridNearPessimisticTxPrepareFuture extends 
GridNearTxPrepareFutureAdapter implements MvccResponseListener {
     /**
      * @param cctx Context.
      * @param tx Transaction.
@@ -432,7 +433,7 @@ public class GridNearPessimisticTxPrepareFuture extends 
GridNearTxPrepareFutureA
                 tx.mvccCoordinatorVersion(mvccVer);
             }
             else {
-                IgniteInternalFuture<Long> cntrFut = 
cctx.coordinators().requestTxCounter(mvccCrd, tx);
+                IgniteInternalFuture<Long> cntrFut = 
cctx.coordinators().requestTxCounter(mvccCrd, this, tx.nearXidVersion());
 
                 add((IgniteInternalFuture)cntrFut);
             }
@@ -442,6 +443,16 @@ public class GridNearPessimisticTxPrepareFuture extends 
GridNearTxPrepareFutureA
     }
 
     /** {@inheritDoc} */
+    @Override public void onMvccResponse(MvccCoordinatorVersion res) {
+        tx.mvccCoordinatorVersion(res);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onMvccError(IgniteCheckedException e) {
+        ERR_UPD.compareAndSet(GridNearPessimisticTxPrepareFuture.this, null, 
e);
+    }
+
+    /** {@inheritDoc} */
     @Override public boolean onOwnerChanged(GridCacheEntryEx entry, 
GridCacheMvccCandidate owner) {
         return false;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/54b87142/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java
index b3cf54e..0f7e71e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java
@@ -149,27 +149,26 @@ public class CacheCoordinatorsSharedManager<K, V> extends 
GridCacheSharedManager
 
     /**
      * @param crd Coordinator.
-     * @param tx Transaction.
+     * @param lsnr Response listener.
      * @return Counter request future.
      */
-    public IgniteInternalFuture<MvccCoordinatorVersion> 
requestTxCounter(ClusterNode crd, GridDhtTxLocalAdapter tx) {
+    public IgniteInternalFuture<MvccCoordinatorVersion> 
requestTxCounter(ClusterNode crd, MvccResponseListener lsnr, GridCacheVersion 
txVer) {
         assert !crd.isLocal() : crd;
 
         MvccVersionFuture fut = new 
MvccVersionFuture(futIdCntr.incrementAndGet(),
             crd,
-            tx);
+            lsnr);
 
         verFuts.put(fut.id, fut);
 
         try {
             cctx.gridIO().sendToGridTopic(crd,
                 MSG_TOPIC,
-                new CoordinatorTxCounterRequest(fut.id, tx.nearXidVersion()),
+                new CoordinatorTxCounterRequest(fut.id, txVer),
                 MSG_POLICY);
         }
         catch (IgniteCheckedException e) {
-            if (verFuts.remove(fut.id) != null)
-                fut.onDone(e);
+            fut.onError(e);
         }
 
         return fut;
@@ -679,7 +678,7 @@ public class CacheCoordinatorsSharedManager<K, V> extends 
GridCacheSharedManager
         private final Long id;
 
         /** */
-        private GridDhtTxLocalAdapter tx;
+        private MvccResponseListener lsnr;
 
         /** */
         public final ClusterNode crd;
@@ -691,10 +690,10 @@ public class CacheCoordinatorsSharedManager<K, V> extends 
GridCacheSharedManager
          * @param id Future ID.
          * @param crd Coordinator.
          */
-        MvccVersionFuture(Long id, ClusterNode crd, @Nullable 
GridDhtTxLocalAdapter tx) {
+        MvccVersionFuture(Long id, ClusterNode crd, @Nullable 
MvccResponseListener lsnr) {
             this.id = id;
             this.crd = crd;
-            this.tx = tx;
+            this.lsnr = lsnr;
 
             if (STAT_CNTRS)
                 startTime = System.nanoTime();
@@ -706,19 +705,30 @@ public class CacheCoordinatorsSharedManager<K, V> extends 
GridCacheSharedManager
         void onResponse(MvccCoordinatorVersionResponse res) {
             assert res.counter() != COUNTER_NA;
 
-            if (tx != null)
-                tx.mvccCoordinatorVersion(res);
+            if (lsnr != null)
+                lsnr.onMvccResponse(res);
 
             onDone(res);
         }
 
+        void onError(IgniteCheckedException err) {
+            if (verFuts.remove(id) != null) {
+                if (lsnr != null)
+                    lsnr.onMvccError(err);
+
+                onDone(err);
+            }
+        }
+
         /**
          * @param nodeId Failed node ID.
          */
         void onNodeLeft(UUID nodeId) {
-            if (crd.id().equals(nodeId) && verFuts.remove(id) != null) {
-                onDone(new ClusterTopologyCheckedException("Failed to request 
coordinator version, " +
-                    "coordinator failed: " + nodeId));
+            if (crd.id().equals(nodeId)) {
+                ClusterTopologyCheckedException err = new 
ClusterTopologyCheckedException("Failed to request coordinator version, " +
+                    "coordinator failed: " + nodeId);
+
+                onError(err);
             }
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/54b87142/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccResponseListener.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccResponseListener.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccResponseListener.java
new file mode 100644
index 0000000..11d0da0
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccResponseListener.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import org.apache.ignite.IgniteCheckedException;
+
+/**
+ *
+ */
+public interface MvccResponseListener {
+    public void onMvccResponse(MvccCoordinatorVersion res);
+
+    public void onMvccError(IgniteCheckedException e);
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/54b87142/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
index f724afb..11980a9 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
@@ -38,6 +38,7 @@ import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
 import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.cluster.ClusterTopologyException;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteInternalFuture;
@@ -51,6 +52,7 @@ import org.apache.ignite.internal.util.lang.GridInClosure3;
 import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.PA;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiPredicate;
@@ -1352,6 +1354,68 @@ public class CacheMvccTransactionsTest extends 
GridCommonAbstractTest {
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    public void testCoordinatorFailurePessimisticTx() throws Exception {
+        testSpi = true;
+
+        startGrids(3);
+
+        client = true;
+
+        final Ignite client = startGrid(3);
+
+        final IgniteCache cache = client.createCache(
+            cacheConfiguration(PARTITIONED, FULL_SYNC, 0, 
DFLT_PARTITION_COUNT));
+
+        final Integer key1 = primaryKey(jcache(1));
+        final Integer key2 = primaryKey(jcache(2));
+
+        TestRecordingCommunicationSpi crdSpi = 
TestRecordingCommunicationSpi.spi(ignite(0));
+
+        crdSpi.blockMessages(MvccCoordinatorVersionResponse.class, 
client.name());
+
+        IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable() {
+            @Override public Object call() throws Exception {
+                try {
+                    try (Transaction tx = 
client.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                        cache.put(key1, 1);
+                        cache.put(key2, 2);
+
+                        tx.commit();
+                    }
+
+                    fail();
+                }
+                catch (ClusterTopologyException e) {
+                    info("Expected exception: " + e);
+                }
+
+                return null;
+            }
+        }, "tx-thread");
+
+        crdSpi.waitForBlocked();
+
+        stopGrid(0);
+
+        fut.get();
+
+        assertNull(cache.get(key1));
+        assertNull(cache.get(key2));
+
+        try (Transaction tx = client.transactions().txStart(PESSIMISTIC, 
REPEATABLE_READ)) {
+            cache.put(key1, 1);
+            cache.put(key2, 2);
+
+            tx.commit();
+        }
+
+        assertEquals(1, cache.get(key1));
+        assertEquals(2, cache.get(key2));
+    }
+
+    /**
      * @param N Number of object to update in single transaction.
      * @param srvs Number of server nodes.
      * @param clients Number of client nodes.

Reply via email to