Repository: ignite
Updated Branches:
  refs/heads/ignite-4154-2 e35b8a582 -> 5f3ddc5c4


ignite-4154


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5f3ddc5c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5f3ddc5c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5f3ddc5c

Branch: refs/heads/ignite-4154-2
Commit: 5f3ddc5c4130f63940570e27feae7d077b5b963a
Parents: e35b8a5
Author: sboikov <[email protected]>
Authored: Mon Nov 7 15:43:11 2016 +0300
Committer: sboikov <[email protected]>
Committed: Mon Nov 7 16:39:25 2016 +0300

----------------------------------------------------------------------
 .../GridCachePartitionExchangeManager.java      |   6 +-
 .../dht/GridClientPartitionTopology.java        |   2 +-
 .../dht/GridDhtPartitionTopology.java           |   3 +-
 .../dht/GridDhtPartitionTopologyImpl.java       |  28 +-
 .../GridDhtPartitionsExchangeFuture.java        |   4 +-
 .../continuous/GridContinuousProcessor.java     |   4 +-
 .../ignite/spi/discovery/tcp/ClientImpl.java    |  26 +-
 ...CacheExchangeMessageDuplicatedStateTest.java | 386 +++++++++++++++++++
 ...ContinuousQueryFailoverAbstractSelfTest.java |   2 +-
 .../testsuites/IgniteCacheTestSuite2.java       |   3 +
 10 files changed, 432 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/5f3ddc5c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 953ab8d..94a42a2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -870,7 +870,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                         
cacheCtx.affinity().affinityCache().similarAffinityKey());
 
                     if (exchId != null)
-                        m.addPartitionUpdateCounters(cacheCtx.cacheId(), 
cacheCtx.topology().updateCounters());
+                        m.addPartitionUpdateCounters(cacheCtx.cacheId(), 
cacheCtx.topology().updateCounters(true));
                 }
             }
         }
@@ -887,7 +887,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                 top.similarAffinityKey());
 
             if (exchId != null)
-                m.addPartitionUpdateCounters(top.cacheId(), 
top.updateCounters());
+                m.addPartitionUpdateCounters(top.cacheId(), 
top.updateCounters(true));
         }
 
         return m;
@@ -994,7 +994,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                     cacheCtx.affinity().affinityCache().similarAffinityKey());
 
                 if (sndCounters)
-                    m.partitionUpdateCounters(cacheCtx.cacheId(), 
cacheCtx.topology().updateCounters());
+                    m.partitionUpdateCounters(cacheCtx.cacheId(), 
cacheCtx.topology().updateCounters(true));
             }
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/5f3ddc5c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index 4418b11..1ebbc51 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -886,7 +886,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
     }
 
     /** {@inheritDoc} */
-    @Override public Map<Integer, Long> updateCounters() {
+    @Override public Map<Integer, Long> updateCounters(boolean skipZeros) {
         lock.readLock().lock();
 
         try {

http://git-wip-us.apache.org/repos/asf/ignite/blob/5f3ddc5c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
index 6e9b907..4ae4e47 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
@@ -220,9 +220,10 @@ public interface GridDhtPartitionTopology {
         @Nullable Map<Integer, Long> cntrMap);
 
     /**
+     * @param skipZeros If {@code true} then filters out zero counters.
      * @return Partition update counters.
      */
-    public Map<Integer, Long> updateCounters();
+    public Map<Integer, Long> updateCounters(boolean skipZeros);
 
     /**
      * @param part Partition to own.

http://git-wip-us.apache.org/repos/asf/ignite/blob/5f3ddc5c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 08a1c89..f3751ac 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -1032,7 +1032,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                 for (Map.Entry<Integer, Long> e : cntrMap.entrySet()) {
                     Long cntr = this.cntrMap.get(e.getKey());
 
-                    if ((cntr == null || cntr < e.getValue()) && 
!e.getValue().equals(ZERO))
+                    if (cntr == null || cntr < e.getValue())
                         this.cntrMap.put(e.getKey(), e.getValue());
                 }
 
@@ -1172,7 +1172,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                 for (Map.Entry<Integer, Long> e : cntrMap.entrySet()) {
                     Long cntr = this.cntrMap.get(e.getKey());
 
-                    if ((cntr == null || cntr < e.getValue()) && 
!e.getValue().equals(ZERO))
+                    if (cntr == null || cntr < e.getValue())
                         this.cntrMap.put(e.getKey(), e.getValue());
                 }
 
@@ -1503,11 +1503,26 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
     }
 
     /** {@inheritDoc} */
-    @Override public Map<Integer, Long> updateCounters() {
+    @Override public Map<Integer, Long> updateCounters(boolean skipZeros) {
         lock.readLock().lock();
 
         try {
-            Map<Integer, Long> res = new HashMap<>(cntrMap);
+            Map<Integer, Long> res;
+
+            if (skipZeros) {
+                res = U.newHashMap(cntrMap.size());
+
+                for (Map.Entry<Integer, Long> e : cntrMap.entrySet()) {
+                    Long cntr = e.getValue();
+
+                    if (ZERO.equals(cntr))
+                        continue;
+
+                    res.put(e.getKey(), cntr);
+                }
+            }
+            else
+                res = new HashMap<>(cntrMap);
 
             for (int i = 0; i < locParts.length; i++) {
                 GridDhtLocalPartition part = locParts[i];
@@ -1518,7 +1533,10 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                 Long cntr0 = res.get(part.id());
                 long cntr1 = part.updateCounter();
 
-                if ((cntr0 == null || cntr1 > cntr0) && cntr1 != 0L)
+                if (skipZeros && cntr1 == 0L)
+                    continue;
+
+                if (cntr0 == null || cntr1 > cntr0)
                     res.put(part.id(), cntr1);
             }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/5f3ddc5c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index e5b4c2d..a79aba3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -546,7 +546,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
                     
exchId.topologyVersion().equals(cacheCtx.startTopologyVersion());
 
                 if (updateTop && clientTop != null)
-                    cacheCtx.topology().update(exchId, 
clientTop.partitionMap(true), clientTop.updateCounters());
+                    cacheCtx.topology().update(exchId, 
clientTop.partitionMap(true), clientTop.updateCounters(false));
             }
 
             top.updateTopologyVersion(exchId, this, updSeq, 
stopping(cacheCtx.cacheId()));
@@ -670,7 +670,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
                             if (top.cacheId() == cacheCtx.cacheId()) {
                                 cacheCtx.topology().update(exchId,
                                     top.partitionMap(true),
-                                    top.updateCounters());
+                                    top.updateCounters(false));
 
                                 break;
                             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/5f3ddc5c/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index 3a559e7..9fd9b6d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -239,7 +239,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
                                 GridCacheContext cctx = interCache != null ? 
interCache.context() : null;
 
                                 if (cctx != null && cntrsPerNode != null && 
!cctx.isLocal() && cctx.affinityNode())
-                                    cntrsPerNode.put(ctx.localNodeId(), 
cctx.topology().updateCounters());
+                                    cntrsPerNode.put(ctx.localNodeId(), 
cctx.topology().updateCounters(false));
 
                                 routine.handler().updateCounters(topVer, 
cntrsPerNode, cntrs);
                             }
@@ -1049,7 +1049,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
                 GridCacheAdapter cache = 
ctx.cache().internalCache(hnd0.cacheName());
 
                 if (cache != null && !cache.isLocal() && 
cache.context().userCache())
-                    req.addUpdateCounters(ctx.localNodeId(), 
cache.context().topology().updateCounters());
+                    req.addUpdateCounters(ctx.localNodeId(), 
cache.context().topology().updateCounters(false));
             }
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/5f3ddc5c/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index f929121..f8e38d1 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -1049,7 +1049,7 @@ class ClientImpl extends TcpDiscoveryImpl {
 
         /** {@inheritDoc} */
         @Override protected void body() throws InterruptedException {
-            TcpDiscoveryAbstractMessage msg = null;
+            TcpDiscoveryAbstractMessage msg;
 
             while (!Thread.currentThread().isInterrupted()) {
                 Socket sock;
@@ -1063,8 +1063,7 @@ class ClientImpl extends TcpDiscoveryImpl {
                         continue;
                     }
 
-                    if (msg == null)
-                        msg = queue.poll();
+                    msg = queue.poll();
 
                     if (msg == null) {
                         mux.wait();
@@ -1121,19 +1120,7 @@ class ClientImpl extends TcpDiscoveryImpl {
                         }
                     }
                 }
-                catch (IOException e) {
-                    if (log.isDebugEnabled())
-                        U.error(log, "Failed to send node left message (will 
stop anyway) " +
-                            "[sock=" + sock + ", msg=" + msg + ']', e);
-
-                    U.closeQuiet(sock);
-
-                    synchronized (mux) {
-                        if (sock == this.sock)
-                            this.sock = null; // Connection has dead.
-                    }
-                }
-                catch (IgniteCheckedException e) {
+                catch (Exception e) {
                     if (spi.getSpiContext().isStopping()) {
                         if (log.isDebugEnabled())
                             log.debug("Failed to send message, node is 
stopping [msg=" + msg + ", err=" + e + ']');
@@ -1141,7 +1128,12 @@ class ClientImpl extends TcpDiscoveryImpl {
                     else
                         U.error(log, "Failed to send message: " + msg, e);
 
-                    msg = null;
+                    U.closeQuiet(sock);
+
+                    synchronized (mux) {
+                        if (sock == this.sock)
+                            this.sock = null; // Connection has dead.
+                    }
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/5f3ddc5c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheExchangeMessageDuplicatedStateTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheExchangeMessageDuplicatedStateTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheExchangeMessageDuplicatedStateTest.java
new file mode 100644
index 0000000..d07fdd3
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheExchangeMessageDuplicatedStateTest.java
@@ -0,0 +1,386 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.cache.affinity.fair.FairAffinityFunction;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap2;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ *
+ */
+public class CacheExchangeMessageDuplicatedStateTest extends 
GridCommonAbstractTest {
+    /** */
+    private static TcpDiscoveryIpFinder ipFinder = new 
TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    private static final String AFF1_CACHE1 = "a1c1";
+
+    /** */
+    private static final String AFF1_CACHE2 = "a1c2";
+
+    /** */
+    private static final String AFF2_CACHE1 = "a2c1";
+
+    /** */
+    private static final String AFF2_CACHE2 = "a2c2";
+
+    /** */
+    private static final String AFF3_CACHE1 = "a3c1";
+
+    /** */
+    private static final String AFF4_FILTER_CACHE1 = "a4c1";
+
+    /** */
+    private static final String AFF4_FILTER_CACHE2 = "a4c2";
+
+    /** */
+    private static final String AFF5_FILTER_CACHE1 = "a5c1";
+
+    /** */
+    private boolean client;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) 
throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
+        cfg.setClientMode(client);
+
+        TestRecordingCommunicationSpi commSpi = new 
TestRecordingCommunicationSpi();
+
+        commSpi.record(GridDhtPartitionsSingleMessage.class, 
GridDhtPartitionsFullMessage.class);
+
+        cfg.setCommunicationSpi(commSpi);
+
+        List<CacheConfiguration> ccfgs = new ArrayList<>();
+
+        {
+            CacheConfiguration ccfg = new CacheConfiguration();
+            ccfg.setName(AFF1_CACHE1);
+            ccfg.setAffinity(new RendezvousAffinityFunction());
+            ccfgs.add(ccfg);
+        }
+        {
+            CacheConfiguration ccfg = new CacheConfiguration();
+            ccfg.setName(AFF1_CACHE2);
+            ccfg.setAffinity(new RendezvousAffinityFunction());
+            ccfgs.add(ccfg);
+        }
+        {
+            CacheConfiguration ccfg = new CacheConfiguration();
+            ccfg.setName(AFF2_CACHE1);
+            ccfg.setAffinity(new FairAffinityFunction());
+            ccfgs.add(ccfg);
+        }
+        {
+            CacheConfiguration ccfg = new CacheConfiguration();
+            ccfg.setName(AFF2_CACHE2);
+            ccfg.setAffinity(new FairAffinityFunction());
+            ccfgs.add(ccfg);
+        }
+        {
+            CacheConfiguration ccfg = new CacheConfiguration();
+            ccfg.setName(AFF3_CACHE1);
+            ccfg.setBackups(3);
+
+            RendezvousAffinityFunction aff = new 
RendezvousAffinityFunction(false, 64);
+            ccfg.setAffinity(aff);
+
+            ccfgs.add(ccfg);
+        }
+        {
+            CacheConfiguration ccfg = new CacheConfiguration();
+            ccfg.setName(AFF4_FILTER_CACHE1);
+            ccfg.setNodeFilter(new TestNodeFilter());
+            ccfg.setAffinity(new RendezvousAffinityFunction());
+            ccfgs.add(ccfg);
+        }
+        {
+            CacheConfiguration ccfg = new CacheConfiguration();
+            ccfg.setName(AFF4_FILTER_CACHE2);
+            ccfg.setNodeFilter(new TestNodeFilter());
+            ccfg.setAffinity(new RendezvousAffinityFunction());
+            ccfgs.add(ccfg);
+        }
+        {
+            CacheConfiguration ccfg = new CacheConfiguration();
+            ccfg.setName(AFF5_FILTER_CACHE1);
+            ccfg.setNodeFilter(new TestNodeFilter());
+            ccfg.setAffinity(new FairAffinityFunction());
+            ccfgs.add(ccfg);
+        }
+
+        cfg.setCacheConfiguration(ccfgs.toArray(new 
CacheConfiguration[ccfgs.size()]));
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        startGrid(0);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testExchangeMessages() throws Exception {
+        ignite(0);
+
+        startGrid(1);
+
+        awaitPartitionMapExchange();
+
+        checkMessages(0, true);
+
+        startGrid(2);
+
+        awaitPartitionMapExchange();
+
+        checkMessages(0, true);
+
+        client = true;
+
+        startGrid(3);
+
+        awaitPartitionMapExchange();
+
+        checkMessages(0, false);
+
+        stopGrid(0);
+
+        awaitPartitionMapExchange();
+
+        checkMessages(1, true);
+    }
+
+    /**
+     * @param crdIdx Coordinator node index.
+     * @param checkSingle {@code True} if need check single messages.
+     */
+    private void checkMessages(int crdIdx, boolean checkSingle) {
+        checkFullMessages(crdIdx);
+
+        if (checkSingle)
+            checkSingleMessages(crdIdx);
+    }
+
+    /**
+     * @param crdIdx Coordinator node index.
+     */
+    private void checkFullMessages(int crdIdx) {
+        TestRecordingCommunicationSpi commSpi0 =
+            
(TestRecordingCommunicationSpi)ignite(crdIdx).configuration().getCommunicationSpi();
+
+        List<Object> msgs = commSpi0.recordedMessages(false);
+
+        assertTrue(msgs.size() > 0);
+
+        for (Object msg : msgs) {
+            assertTrue("Unexpected messages: " + msg, msg instanceof 
GridDhtPartitionsFullMessage);
+
+            checkFullMessage((GridDhtPartitionsFullMessage)msg);
+        }
+    }
+
+    /**
+     * @param crdIdx Coordinator node index.
+     */
+    private void checkSingleMessages(int crdIdx) {
+        int cnt = 0;
+
+        for (Ignite ignite : Ignition.allGrids()) {
+            if (getTestGridName(crdIdx).equals(ignite.name()) || 
ignite.configuration().isClientMode())
+                continue;
+
+            TestRecordingCommunicationSpi commSpi0 =
+                
(TestRecordingCommunicationSpi)ignite.configuration().getCommunicationSpi();
+
+            List<Object> msgs = commSpi0.recordedMessages(false);
+
+            assertTrue(msgs.size() > 0);
+
+            for (Object msg : msgs) {
+                assertTrue("Unexpected messages: " + msg, msg instanceof 
GridDhtPartitionsSingleMessage);
+
+                checkSingleMessage((GridDhtPartitionsSingleMessage)msg);
+            }
+
+            cnt++;
+        }
+
+        assertTrue(cnt > 0);
+    }
+
+    /**
+     * @param msg Message.
+     */
+    private void checkFullMessage(GridDhtPartitionsFullMessage msg) {
+        Map<Integer, Integer> dupPartsData = GridTestUtils.getFieldValue(msg, 
"dupPartsData");
+
+        assertNotNull(dupPartsData);
+
+        checkFullMessage(AFF1_CACHE1, AFF1_CACHE2, dupPartsData, msg);
+        checkFullMessage(AFF2_CACHE1, AFF2_CACHE2, dupPartsData, msg);
+        checkFullMessage(AFF4_FILTER_CACHE1, AFF4_FILTER_CACHE2, dupPartsData, 
msg);
+
+        assertFalse(dupPartsData.containsKey(CU.cacheId(AFF3_CACHE1)));
+        assertFalse(dupPartsData.containsKey(CU.cacheId(AFF5_FILTER_CACHE1)));
+
+        Map<Integer, Map<Integer, Long>> partCntrs = 
GridTestUtils.getFieldValue(msg, "partCntrs");
+
+        if (partCntrs != null) {
+            for (Map<Integer, Long> cntrs : partCntrs.values())
+                assertTrue(cntrs.isEmpty());
+        }
+    }
+
+    /**
+     * @param msg Message.
+     */
+    private void checkSingleMessage(GridDhtPartitionsSingleMessage msg) {
+        Map<Integer, Integer> dupPartsData = GridTestUtils.getFieldValue(msg, 
"dupPartsData");
+
+        assertNotNull(dupPartsData);
+
+        checkSingleMessage(AFF1_CACHE1, AFF1_CACHE2, dupPartsData, msg);
+        checkSingleMessage(AFF2_CACHE1, AFF2_CACHE2, dupPartsData, msg);
+        checkSingleMessage(AFF4_FILTER_CACHE1, AFF4_FILTER_CACHE2, 
dupPartsData, msg);
+
+        assertFalse(dupPartsData.containsKey(CU.cacheId(AFF3_CACHE1)));
+        assertFalse(dupPartsData.containsKey(CU.cacheId(AFF5_FILTER_CACHE1)));
+
+        Map<Integer, Map<Integer, Long>> partCntrs = 
GridTestUtils.getFieldValue(msg, "partCntrs");
+
+        if (partCntrs != null) {
+            for (Map<Integer, Long> cntrs : partCntrs.values())
+                assertTrue(cntrs.isEmpty());
+        }
+    }
+
+    /**
+     * @param cache1 Cache 1.
+     * @param cache2 Cache 2.
+     * @param dupPartsData Duplicated data map.
+     * @param msg Message.
+     */
+    private void checkFullMessage(String cache1,
+        String cache2,
+        Map<Integer, Integer> dupPartsData,
+        GridDhtPartitionsFullMessage msg)
+    {
+        Integer cacheId;
+        Integer dupCacheId;
+
+        if (dupPartsData.containsKey(CU.cacheId(cache1))) {
+            cacheId = CU.cacheId(cache1);
+            dupCacheId = CU.cacheId(cache2);
+        }
+        else {
+            cacheId = CU.cacheId(cache2);
+            dupCacheId = CU.cacheId(cache1);
+        }
+
+        assertTrue(dupPartsData.containsKey(cacheId));
+        assertEquals(dupCacheId, dupPartsData.get(cacheId));
+        assertFalse(dupPartsData.containsKey(dupCacheId));
+
+        Map<Integer, GridDhtPartitionFullMap> parts = msg.partitions();
+
+        GridDhtPartitionFullMap emptyFullMap = parts.get(cacheId);
+
+        for (GridDhtPartitionMap2 map : emptyFullMap.values())
+            assertEquals(0, map.map().size());
+
+        GridDhtPartitionFullMap fullMap = parts.get(dupCacheId);
+
+        for (GridDhtPartitionMap2 map : fullMap.values())
+            assertFalse(map.map().isEmpty());
+    }
+
+    /**
+     * @param cache1 Cache 1.
+     * @param cache2 Cache 2.
+     * @param dupPartsData Duplicated data map.
+     * @param msg Message.
+     */
+    private void checkSingleMessage(String cache1,
+        String cache2,
+        Map<Integer, Integer> dupPartsData,
+        GridDhtPartitionsSingleMessage msg)
+    {
+        Integer cacheId;
+        Integer dupCacheId;
+
+        if (dupPartsData.containsKey(CU.cacheId(cache1))) {
+            cacheId = CU.cacheId(cache1);
+            dupCacheId = CU.cacheId(cache2);
+        }
+        else {
+            cacheId = CU.cacheId(cache2);
+            dupCacheId = CU.cacheId(cache1);
+        }
+
+        assertTrue(dupPartsData.containsKey(cacheId));
+        assertEquals(dupCacheId, dupPartsData.get(cacheId));
+        assertFalse(dupPartsData.containsKey(dupCacheId));
+
+        Map<Integer, GridDhtPartitionMap2> parts = msg.partitions();
+
+        GridDhtPartitionMap2 emptyMap = parts.get(cacheId);
+
+        assertEquals(0, emptyMap.map().size());
+
+        GridDhtPartitionMap2 map = parts.get(dupCacheId);
+
+        assertFalse(map.map().isEmpty());
+    }
+
+    /**
+     *
+     */
+    private static class TestNodeFilter implements 
IgnitePredicate<ClusterNode> {
+        /** {@inheritDoc} */
+        @Override public boolean apply(ClusterNode node) {
+            // Do not start cache on coordinator.
+            return node.order() > 1;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5f3ddc5c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
index 1b7fe2b..d2cb710 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
@@ -537,7 +537,7 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
 
             Affinity<Object> aff = grid(i).affinity(null);
 
-            Map<Integer, Long> act = 
grid(i).cachex(null).context().topology().updateCounters();
+            Map<Integer, Long> act = 
grid(i).cachex(null).context().topology().updateCounters(false);
 
             for (Map.Entry<Integer, Long> e : updCntrs.entrySet()) {
                 if 
(aff.mapPartitionToPrimaryAndBackups(e.getKey()).contains(grid(i).localNode()))

http://git-wip-us.apache.org/repos/asf/ignite/blob/5f3ddc5c/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
index dc412a9..ffb0539 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
@@ -26,6 +26,7 @@ import org.apache.ignite.internal.processors.cache.CacheConfigurationLeakTest;
 import 
org.apache.ignite.internal.processors.cache.CacheDhtLocalPartitionAfterRemoveSelfTest;
 import 
org.apache.ignite.internal.processors.cache.CacheEnumOperationsSingleNodeTest;
 import org.apache.ignite.internal.processors.cache.CacheEnumOperationsTest;
+import 
org.apache.ignite.internal.processors.cache.CacheExchangeMessageDuplicatedStateTest;
 import 
org.apache.ignite.internal.processors.cache.CrossCacheTxRandomOperationsTest;
 import 
org.apache.ignite.internal.processors.cache.GridCacheAtomicMessageCountSelfTest;
 import 
org.apache.ignite.internal.processors.cache.GridCacheFinishPartitionsSelfTest;
@@ -262,6 +263,8 @@ public class IgniteCacheTestSuite2 extends TestSuite {
 
         suite.addTest(new TestSuite(IgniteNoCustomEventsOnNodeStart.class));
 
+        suite.addTest(new 
TestSuite(CacheExchangeMessageDuplicatedStateTest.class));
+
         return suite;
     }
 }

Reply via email to