Repository: ignite
Updated Branches:
  refs/heads/ignite-3479 d92cfa435 -> b201a9647


ignite-3479


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b201a964
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b201a964
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b201a964

Branch: refs/heads/ignite-3479
Commit: b201a9647bb24692114a02aa2768dda844b82791
Parents: d92cfa4
Author: sboikov <[email protected]>
Authored: Fri Sep 29 11:29:31 2017 +0300
Committer: sboikov <[email protected]>
Committed: Fri Sep 29 13:10:01 2017 +0300

----------------------------------------------------------------------
 .../managers/communication/GridIoManager.java   |  3 +
 .../dht/GridPartitionedGetFuture.java           |  6 +-
 .../cache/mvcc/CacheCoordinatorsProcessor.java  |  2 +-
 .../processors/cache/mvcc/MvccQueryTracker.java | 34 ++++++---
 .../cache/mvcc/PreviousCoordinatorQueries.java  |  2 +
 .../cache/mvcc/CacheMvccTransactionsTest.java   | 76 +++++++++++---------
 .../testframework/junits/GridAbstractTest.java  |  2 +-
 7 files changed, 76 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/b201a964/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 8f03911..adce492 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -1659,6 +1659,9 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
                 if (e.getCause() instanceof ClusterTopologyCheckedException)
                     throw (ClusterTopologyCheckedException)e.getCause();
 
+                if (!ctx.discovery().alive(node))
+                    throw new ClusterTopologyCheckedException("Failed to send 
message, node left: " + node.id());
+
                 throw new IgniteCheckedException("Failed to send message (node 
may have left the grid or " +
                     "TCP connection cannot be established due to firewall 
issues) " +
                     "[node=" + node + ", topic=" + topic +

http://git-wip-us.apache.org/repos/asf/ignite/blob/b201a964/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
index 4bfd0fe..68bc705 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
@@ -133,9 +133,11 @@ public class GridPartitionedGetFuture<K, V> extends 
CacheDistributedGetFutureAda
         if (!cctx.mvccEnabled())
             return null;
 
-        assert mvccTracker != null;
+        MvccCoordinatorVersion ver = mvccTracker.mvccVersion();
 
-        return mvccTracker.mvccVersion();
+        assert ver != null : "[fut=" + this + ", mvccTracker=" + mvccTracker + 
"]";
+
+        return ver;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/b201a964/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
index cfd6c4a..753ee33 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
@@ -210,7 +210,7 @@ public class CacheCoordinatorsProcessor extends 
GridProcessorAdapter {
                     MvccCoordinator(crdNode.id(), topVer, new 
AffinityTopologyVersion(topVer, 0)) : null;
 
                 if (crd != null)
-                    log.info("Assigned mvcc coordinator: " + crd);
+                    log.info("Assigned mvcc coordinator [crd=" + crd + ", 
crdNode=" + crdNode +']');
                 else
                     U.warn(log, "New mvcc coordinator was not assigned 
[topVer=" + topVer + ']');
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/b201a964/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java
index a460820..360af4c 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java
@@ -29,7 +29,7 @@ import org.apache.ignite.lang.IgniteInClosure;
 import org.jetbrains.annotations.Nullable;
 
 /**
- *
+ * TODO IGNITE-3478: make sure clean up is called when related future is 
forcibly finished, i.e. on cache stop
  */
 public class MvccQueryTracker {
     /** */
@@ -75,12 +75,18 @@ public class MvccQueryTracker {
     @Nullable public MvccCoordinatorVersion 
onMvccCoordinatorChange(MvccCoordinator newCrd) {
         synchronized (this) {
             if (mvccVer != null) {
-                mvccCrd = newCrd;
+                assert mvccCrd != null : this;
+
+                if (!mvccCrd.equals(newCrd)) {
+                    mvccCrd = newCrd; // Need notify new coordinator.
 
-                return mvccVer;
+                    return mvccVer;
+                }
+                else
+                    return null;
             }
             else if (mvccCrd != null)
-                mvccCrd = null;
+                mvccCrd = null; // Mark for remap.
 
             return null;
         }
@@ -100,7 +106,7 @@ public class MvccQueryTracker {
                 mvccCrd0 = mvccCrd;
                 mvccVer0 = mvccVer;
 
-                mvccVer = null;
+                mvccVer = null; // Mark as finished.
             }
         }
 
@@ -134,12 +140,15 @@ public class MvccQueryTracker {
 
                 return;
             }
-            else
+            else {
                 waitNextTopology(topVer);
+
+                return;
+            }
         }
 
         IgniteInternalFuture<MvccCoordinatorVersion> cntrFut =
-            cctx.shared().coordinators().requestQueryCounter(mvccCrd);
+            cctx.shared().coordinators().requestQueryCounter(mvccCrd0);
 
         cntrFut.listen(new 
IgniteInClosure<IgniteInternalFuture<MvccCoordinatorVersion>>() {
             @Override public void 
apply(IgniteInternalFuture<MvccCoordinatorVersion> fut) {
@@ -151,8 +160,13 @@ public class MvccQueryTracker {
                     boolean needRemap = false;
 
                     synchronized (MvccQueryTracker.this) {
-                        if (mvccCrd != null)
+                        assert mvccVer == null : "[this=" + 
MvccQueryTracker.this +
+                            ", ver=" + mvccVer +
+                            ", rcvdVer=" + rcvdVer + "]";
+
+                        if (mvccCrd != null) {
                             mvccVer = rcvdVer;
+                        }
                         else
                             needRemap = true;
                     }
@@ -167,7 +181,7 @@ public class MvccQueryTracker {
                     IgniteLogger log = cctx.logger(MvccQueryTracker.class);
 
                     if (log.isDebugEnabled())
-                        log.debug("Mvcc coordinator failed: " + e);
+                        log.debug("Mvcc coordinator failed, need remap: " + e);
                 }
                 catch (IgniteCheckedException e) {
                     lsnr.onMvccVersionError(e);
@@ -175,7 +189,7 @@ public class MvccQueryTracker {
                     return;
                 }
 
-                // Coordinator failed on reassigned, need remap.
+                // Coordinator failed or reassigned, need remap.
                 if (canRemap)
                     waitNextTopology(topVer);
                 else {

http://git-wip-us.apache.org/repos/asf/ignite/blob/b201a964/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/PreviousCoordinatorQueries.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/PreviousCoordinatorQueries.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/PreviousCoordinatorQueries.java
index 0810b0f..700b27d 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/PreviousCoordinatorQueries.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/PreviousCoordinatorQueries.java
@@ -183,6 +183,8 @@ class PreviousCoordinatorQueries {
                         prevQueriesDone = activeQueries.isEmpty();
                 }
             }
+            else
+                nodeQueries.put(cntr, newQryCnt);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/b201a964/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
index 074c4f8..487f2d0 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
@@ -1677,40 +1677,61 @@ public class CacheMvccTransactionsTest extends 
GridCommonAbstractTest {
      * @throws Exception If failed.
      */
     public void testReadInProgressCoordinatorFailsSimple_FromServer() throws 
Exception {
-        readInProgressCoordinatorFailsSimple(false);
+        for (int i = 1; i <= 3; i++) {
+            readInProgressCoordinatorFailsSimple(false, i);
+
+            afterTest();
+        }
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testReadInProgressCoordinatorFailsSimple_FromClient() throws 
Exception {
-        readInProgressCoordinatorFailsSimple(true);
+        for (int i = 1; i <= 3; i++) {
+            readInProgressCoordinatorFailsSimple(true, i);
+
+            afterTest();
+        }
     }
 
     /**
      * @param fromClient {@code True} if read from client node, otherwise from 
server node.
+     * @param crdChangeCnt Number of coordinator changes.
      * @throws Exception If failed.
      */
-    private void readInProgressCoordinatorFailsSimple(boolean fromClient) 
throws Exception {
+    private void readInProgressCoordinatorFailsSimple(boolean fromClient, int 
crdChangeCnt) throws Exception {
+        info("readInProgressCoordinatorFailsSimple [fromClient=" + fromClient 
+ ", crdChangeCnt=" + crdChangeCnt + ']');
+
         testSpi = true;
 
-        startGrids(4);
+        client = false;
+
+        final int SRVS = 3;
+        final int COORDS = crdChangeCnt + 1;
+
+        startGrids(SRVS + COORDS);
 
         client = true;
 
-        assertTrue(startGrid(4).configuration().isClientMode());
+        assertTrue(startGrid(SRVS + COORDS).configuration().isClientMode());
 
-        final Ignite getNode = fromClient ? ignite(4) : ignite(1);
+        final Ignite getNode = fromClient ? ignite(SRVS + COORDS) : 
ignite(COORDS);
+
+        String[] excludeNodes = new String[COORDS];
+
+        for (int i = 0; i < COORDS; i++)
+            excludeNodes[i] = testNodeName(i);
 
         final IgniteCache cache = 
getNode.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 0, 
DFLT_PARTITION_COUNT).
-            setNodeFilter(new 
TestCacheNodeExcludingFilter(getTestIgniteInstanceName(0), 
getTestIgniteInstanceName(1))));
+            setNodeFilter(new TestCacheNodeExcludingFilter(excludeNodes)));
 
         final Set<Integer> keys = new HashSet<>();
 
-        List<Integer> keys1 = primaryKeys(jcache(2), 10);
+        List<Integer> keys1 = primaryKeys(jcache(COORDS), 10);
 
         keys.addAll(keys1);
-        keys.addAll(primaryKeys(jcache(3), 10));
+        keys.addAll(primaryKeys(jcache(COORDS + 1), 10));
 
         Map<Integer, Integer> vals = new HashMap();
 
@@ -1754,17 +1775,8 @@ public class CacheMvccTransactionsTest extends 
GridCommonAbstractTest {
 
         getNodeSpi.waitForBlocked();
 
-        final IgniteInternalFuture releaseWaitFut = GridTestUtils.runAsync(new 
Callable() {
-            @Override public Object call() throws Exception {
-                Thread.sleep(3000);
-
-                getNodeSpi.stopBlock(true);
-
-                return null;
-            }
-        }, "stop-block");
-
-        stopGrid(0);
+        for (int i = 0; i < crdChangeCnt; i++)
+            stopGrid(i);
 
         for (int i = 0; i < 10; i++) {
             vals = new HashMap();
@@ -1779,7 +1791,8 @@ public class CacheMvccTransactionsTest extends 
GridCommonAbstractTest {
             }
         }
 
-        releaseWaitFut.get();
+        getNodeSpi.stopBlock(true);
+
         getFut.get();
 
         for (Ignite node : G.allGrids())
@@ -1792,6 +1805,7 @@ public class CacheMvccTransactionsTest extends 
GridCommonAbstractTest {
     public void testReadInProgressCoordinatorFails() throws Exception {
         readInProgressCoordinatorFails(false);
     }
+
     /**
      * @throws Exception If failed.
      */
@@ -2573,7 +2587,7 @@ public class CacheMvccTransactionsTest extends 
GridCommonAbstractTest {
     private void checkActiveQueriesCleanup(Ignite node) throws Exception {
         final CacheCoordinatorsProcessor crd = 
((IgniteKernal)node).context().cache().context().coordinators();
 
-        assertTrue("Active queries not cleared", 
GridTestUtils.waitForCondition(
+        assertTrue("Active queries not cleared: " + node.name(), 
GridTestUtils.waitForCondition(
             new GridAbsPredicate() {
                 @Override public boolean apply() {
                     Object activeQueries = GridTestUtils.getFieldValue(crd, 
"activeQueries");
@@ -2601,28 +2615,20 @@ public class CacheMvccTransactionsTest extends 
GridCommonAbstractTest {
                 }
             }, 8_000)
         );
-        assertTrue("Previous coordinator queries not empty", 
GridTestUtils.waitForCondition(
+
+        assertTrue("Previous coordinator queries not empty: " + node.name(), 
GridTestUtils.waitForCondition(
             new GridAbsPredicate() {
                 @Override public boolean apply() {
                     Map queries = GridTestUtils.getFieldValue(crd, 
"prevCrdQueries", "activeQueries");
+                    Boolean prevDone = GridTestUtils.getFieldValue(crd, 
"prevCrdQueries", "prevQueriesDone");
 
-                    if (!queries.isEmpty())
-                        log.info("Previous coordinator queries: " + queries);
+                    if (!queries.isEmpty() || !prevDone)
+                        log.info("Previous coordinator state [prevDone=" + 
prevDone + ", queries=" + queries + ']');
 
                     return queries.isEmpty();
                 }
             }, 8_000)
         );
-
-        if 
(node.cluster().localNode().id().equals(crd.currentCoordinatorId())) {
-            assertTrue("prevQueriesDone flag is not set", 
GridTestUtils.waitForCondition(
-                new GridAbsPredicate() {
-                    @Override public boolean apply() {
-                        return GridTestUtils.getFieldValue(crd, 
"prevCrdQueries", "prevQueriesDone");
-                    }
-                }, 8_000)
-            );
-        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/b201a964/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
 
b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
index 0dcd65e..094d14c 100755
--- 
a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
@@ -1456,7 +1456,7 @@ public abstract class GridAbstractTest extends TestCase {
      * @param idx Index of the Ignite instance.
      * @return Indexed Ignite instance name.
      */
-    public String testNodeName(int idx) {
+    protected String testNodeName(int idx) {
         return getTestIgniteInstanceName(idx);
     }
 

Reply via email to