Repository: ignite Updated Branches: refs/heads/ignite-4154-2 e35b8a582 -> 5f3ddc5c4
ignite-4154 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5f3ddc5c Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5f3ddc5c Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5f3ddc5c Branch: refs/heads/ignite-4154-2 Commit: 5f3ddc5c4130f63940570e27feae7d077b5b963a Parents: e35b8a5 Author: sboikov <[email protected]> Authored: Mon Nov 7 15:43:11 2016 +0300 Committer: sboikov <[email protected]> Committed: Mon Nov 7 16:39:25 2016 +0300 ---------------------------------------------------------------------- .../GridCachePartitionExchangeManager.java | 6 +- .../dht/GridClientPartitionTopology.java | 2 +- .../dht/GridDhtPartitionTopology.java | 3 +- .../dht/GridDhtPartitionTopologyImpl.java | 28 +- .../GridDhtPartitionsExchangeFuture.java | 4 +- .../continuous/GridContinuousProcessor.java | 4 +- .../ignite/spi/discovery/tcp/ClientImpl.java | 26 +- ...CacheExchangeMessageDuplicatedStateTest.java | 386 +++++++++++++++++++ ...ContinuousQueryFailoverAbstractSelfTest.java | 2 +- .../testsuites/IgniteCacheTestSuite2.java | 3 + 10 files changed, 432 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/5f3ddc5c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 953ab8d..94a42a2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -870,7 +870,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana cacheCtx.affinity().affinityCache().similarAffinityKey()); if (exchId != null) - m.addPartitionUpdateCounters(cacheCtx.cacheId(), cacheCtx.topology().updateCounters()); + m.addPartitionUpdateCounters(cacheCtx.cacheId(), cacheCtx.topology().updateCounters(true)); } } } @@ -887,7 +887,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana top.similarAffinityKey()); if (exchId != null) - m.addPartitionUpdateCounters(top.cacheId(), top.updateCounters()); + m.addPartitionUpdateCounters(top.cacheId(), top.updateCounters(true)); } return m; @@ -994,7 +994,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana cacheCtx.affinity().affinityCache().similarAffinityKey()); if (sndCounters) - m.partitionUpdateCounters(cacheCtx.cacheId(), cacheCtx.topology().updateCounters()); + m.partitionUpdateCounters(cacheCtx.cacheId(), cacheCtx.topology().updateCounters(true)); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/5f3ddc5c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java index 4418b11..1ebbc51 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java @@ -886,7 +886,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { } /** {@inheritDoc} */ - @Override public Map<Integer, Long> updateCounters() { + @Override public Map<Integer, Long> updateCounters(boolean skipZeros) { lock.readLock().lock(); try { http://git-wip-us.apache.org/repos/asf/ignite/blob/5f3ddc5c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java index 6e9b907..4ae4e47 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java @@ -220,9 +220,10 @@ public interface GridDhtPartitionTopology { @Nullable Map<Integer, Long> cntrMap); /** + * @param skipZeros If {@code true} then filters out zero counters. * @return Partition update counters. */ - public Map<Integer, Long> updateCounters(); + public Map<Integer, Long> updateCounters(boolean skipZeros); /** * @param part Partition to own. http://git-wip-us.apache.org/repos/asf/ignite/blob/5f3ddc5c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index 08a1c89..f3751ac 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@ -1032,7 +1032,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { for (Map.Entry<Integer, Long> e : cntrMap.entrySet()) { Long cntr = this.cntrMap.get(e.getKey()); - if ((cntr == null || cntr < e.getValue()) && !e.getValue().equals(ZERO)) + if (cntr == null || cntr < e.getValue()) this.cntrMap.put(e.getKey(), e.getValue()); } @@ -1172,7 +1172,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { for (Map.Entry<Integer, Long> e : cntrMap.entrySet()) { Long cntr = this.cntrMap.get(e.getKey()); - if ((cntr == null || cntr < e.getValue()) && !e.getValue().equals(ZERO)) + if (cntr == null || cntr < e.getValue()) this.cntrMap.put(e.getKey(), e.getValue()); } @@ -1503,11 +1503,26 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { } /** {@inheritDoc} */ - @Override public Map<Integer, Long> updateCounters() { + @Override public Map<Integer, Long> updateCounters(boolean skipZeros) { lock.readLock().lock(); try { - Map<Integer, Long> res = new HashMap<>(cntrMap); + Map<Integer, Long> res; + + if (skipZeros) { + res = U.newHashMap(cntrMap.size()); + + for (Map.Entry<Integer, Long> e : cntrMap.entrySet()) { + Long cntr = e.getValue(); + + if (ZERO.equals(cntr)) + continue; + + res.put(e.getKey(), cntr); + } + } + else + res = new HashMap<>(cntrMap); for (int i = 0; i < locParts.length; i++) { GridDhtLocalPartition part = locParts[i]; @@ -1518,7 +1533,10 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { Long cntr0 = res.get(part.id()); long cntr1 = part.updateCounter(); - if ((cntr0 == null || cntr1 > cntr0) && cntr1 != 0L) + if (skipZeros && cntr1 == 0L) + continue; + + if (cntr0 == null || cntr1 > cntr0) res.put(part.id(), cntr1); } http://git-wip-us.apache.org/repos/asf/ignite/blob/5f3ddc5c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index e5b4c2d..a79aba3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -546,7 +546,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT exchId.topologyVersion().equals(cacheCtx.startTopologyVersion()); if (updateTop && clientTop != null) - cacheCtx.topology().update(exchId, clientTop.partitionMap(true), clientTop.updateCounters()); + cacheCtx.topology().update(exchId, clientTop.partitionMap(true), clientTop.updateCounters(false)); } top.updateTopologyVersion(exchId, this, updSeq, stopping(cacheCtx.cacheId())); @@ -670,7 +670,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT if (top.cacheId() == cacheCtx.cacheId()) { cacheCtx.topology().update(exchId, top.partitionMap(true), - top.updateCounters()); + top.updateCounters(false)); break; } http://git-wip-us.apache.org/repos/asf/ignite/blob/5f3ddc5c/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java index 3a559e7..9fd9b6d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java @@ -239,7 +239,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter { GridCacheContext cctx = interCache != null ? interCache.context() : null; if (cctx != null && cntrsPerNode != null && !cctx.isLocal() && cctx.affinityNode()) - cntrsPerNode.put(ctx.localNodeId(), cctx.topology().updateCounters()); + cntrsPerNode.put(ctx.localNodeId(), cctx.topology().updateCounters(false)); routine.handler().updateCounters(topVer, cntrsPerNode, cntrs); } @@ -1049,7 +1049,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter { GridCacheAdapter cache = ctx.cache().internalCache(hnd0.cacheName()); if (cache != null && !cache.isLocal() && cache.context().userCache()) - req.addUpdateCounters(ctx.localNodeId(), cache.context().topology().updateCounters()); + req.addUpdateCounters(ctx.localNodeId(), cache.context().topology().updateCounters(false)); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/5f3ddc5c/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java index f929121..f8e38d1 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java @@ -1049,7 +1049,7 @@ class ClientImpl extends TcpDiscoveryImpl { /** {@inheritDoc} */ @Override protected void body() throws InterruptedException { - TcpDiscoveryAbstractMessage msg = null; + TcpDiscoveryAbstractMessage msg; while (!Thread.currentThread().isInterrupted()) { Socket sock; @@ -1063,8 +1063,7 @@ class ClientImpl extends TcpDiscoveryImpl { continue; } - if (msg == null) - msg = queue.poll(); + msg = queue.poll(); if (msg == null) { mux.wait(); @@ -1121,19 +1120,7 @@ class ClientImpl extends TcpDiscoveryImpl { } } } - catch (IOException e) { - if (log.isDebugEnabled()) - U.error(log, "Failed to send node left message (will stop anyway) " + - "[sock=" + sock + ", msg=" + msg + ']', e); - - U.closeQuiet(sock); - - synchronized (mux) { - if (sock == this.sock) - this.sock = null; // Connection has dead. - } - } - catch (IgniteCheckedException e) { + catch (Exception e) { if (spi.getSpiContext().isStopping()) { if (log.isDebugEnabled()) log.debug("Failed to send message, node is stopping [msg=" + msg + ", err=" + e + ']'); @@ -1141,7 +1128,12 @@ class ClientImpl extends TcpDiscoveryImpl { else U.error(log, "Failed to send message: " + msg, e); - msg = null; + U.closeQuiet(sock); + + synchronized (mux) { + if (sock == this.sock) + this.sock = null; // Connection has dead. + } } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/5f3ddc5c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheExchangeMessageDuplicatedStateTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheExchangeMessageDuplicatedStateTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheExchangeMessageDuplicatedStateTest.java new file mode 100644 index 0000000..d07fdd3 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheExchangeMessageDuplicatedStateTest.java @@ -0,0 +1,386 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import org.apache.ignite.Ignite; +import org.apache.ignite.Ignition; +import org.apache.ignite.cache.affinity.fair.FairAffinityFunction; +import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.TestRecordingCommunicationSpi; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap2; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage; +import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.lang.IgnitePredicate; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +/** + * + */ +public class CacheExchangeMessageDuplicatedStateTest extends GridCommonAbstractTest { + /** */ + private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static final String AFF1_CACHE1 = "a1c1"; + + /** */ + private static final String AFF1_CACHE2 = "a1c2"; + + /** */ + private static final String AFF2_CACHE1 = "a2c1"; + + /** */ + private static final String AFF2_CACHE2 = "a2c2"; + + /** */ + private static final String AFF3_CACHE1 = "a3c1"; + + /** */ + private static final String AFF4_FILTER_CACHE1 = "a4c1"; + + /** */ + private static final String AFF4_FILTER_CACHE2 = "a4c2"; + + /** */ + private static final String AFF5_FILTER_CACHE1 = "a5c1"; + + /** */ + private boolean client; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder); + + cfg.setClientMode(client); + + TestRecordingCommunicationSpi commSpi = new TestRecordingCommunicationSpi(); + + commSpi.record(GridDhtPartitionsSingleMessage.class, GridDhtPartitionsFullMessage.class); + + cfg.setCommunicationSpi(commSpi); + + List<CacheConfiguration> ccfgs = new ArrayList<>(); + + { + CacheConfiguration ccfg = new CacheConfiguration(); + ccfg.setName(AFF1_CACHE1); + ccfg.setAffinity(new RendezvousAffinityFunction()); + ccfgs.add(ccfg); + } + { + CacheConfiguration ccfg = new CacheConfiguration(); + ccfg.setName(AFF1_CACHE2); + ccfg.setAffinity(new RendezvousAffinityFunction()); + ccfgs.add(ccfg); + } + { + CacheConfiguration ccfg = new CacheConfiguration(); + ccfg.setName(AFF2_CACHE1); + ccfg.setAffinity(new FairAffinityFunction()); + ccfgs.add(ccfg); + } + { + CacheConfiguration ccfg = new CacheConfiguration(); + ccfg.setName(AFF2_CACHE2); + ccfg.setAffinity(new FairAffinityFunction()); + ccfgs.add(ccfg); + } + { + CacheConfiguration ccfg = new CacheConfiguration(); + ccfg.setName(AFF3_CACHE1); + ccfg.setBackups(3); + + RendezvousAffinityFunction aff = new RendezvousAffinityFunction(false, 64); + ccfg.setAffinity(aff); + + ccfgs.add(ccfg); + } + { + CacheConfiguration ccfg = new CacheConfiguration(); + ccfg.setName(AFF4_FILTER_CACHE1); + ccfg.setNodeFilter(new TestNodeFilter()); + ccfg.setAffinity(new RendezvousAffinityFunction()); + ccfgs.add(ccfg); + } + { + CacheConfiguration ccfg = new CacheConfiguration(); + ccfg.setName(AFF4_FILTER_CACHE2); + ccfg.setNodeFilter(new TestNodeFilter()); + ccfg.setAffinity(new RendezvousAffinityFunction()); + ccfgs.add(ccfg); + } + { + CacheConfiguration ccfg = new CacheConfiguration(); + ccfg.setName(AFF5_FILTER_CACHE1); + ccfg.setNodeFilter(new TestNodeFilter()); + ccfg.setAffinity(new FairAffinityFunction()); + ccfgs.add(ccfg); + } + + cfg.setCacheConfiguration(ccfgs.toArray(new CacheConfiguration[ccfgs.size()])); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + startGrid(0); + } + + /** + * @throws Exception If failed. + */ + public void testExchangeMessages() throws Exception { + ignite(0); + + startGrid(1); + + awaitPartitionMapExchange(); + + checkMessages(0, true); + + startGrid(2); + + awaitPartitionMapExchange(); + + checkMessages(0, true); + + client = true; + + startGrid(3); + + awaitPartitionMapExchange(); + + checkMessages(0, false); + + stopGrid(0); + + awaitPartitionMapExchange(); + + checkMessages(1, true); + } + + /** + * @param crdIdx Coordinator node index. + * @param checkSingle {@code True} if need check single messages. + */ + private void checkMessages(int crdIdx, boolean checkSingle) { + checkFullMessages(crdIdx); + + if (checkSingle) + checkSingleMessages(crdIdx); + } + + /** + * @param crdIdx Coordinator node index. + */ + private void checkFullMessages(int crdIdx) { + TestRecordingCommunicationSpi commSpi0 = + (TestRecordingCommunicationSpi)ignite(crdIdx).configuration().getCommunicationSpi(); + + List<Object> msgs = commSpi0.recordedMessages(false); + + assertTrue(msgs.size() > 0); + + for (Object msg : msgs) { + assertTrue("Unexpected messages: " + msg, msg instanceof GridDhtPartitionsFullMessage); + + checkFullMessage((GridDhtPartitionsFullMessage)msg); + } + } + + /** + * @param crdIdx Coordinator node index. + */ + private void checkSingleMessages(int crdIdx) { + int cnt = 0; + + for (Ignite ignite : Ignition.allGrids()) { + if (getTestGridName(crdIdx).equals(ignite.name()) || ignite.configuration().isClientMode()) + continue; + + TestRecordingCommunicationSpi commSpi0 = + (TestRecordingCommunicationSpi)ignite.configuration().getCommunicationSpi(); + + List<Object> msgs = commSpi0.recordedMessages(false); + + assertTrue(msgs.size() > 0); + + for (Object msg : msgs) { + assertTrue("Unexpected messages: " + msg, msg instanceof GridDhtPartitionsSingleMessage); + + checkSingleMessage((GridDhtPartitionsSingleMessage)msg); + } + + cnt++; + } + + assertTrue(cnt > 0); + } + + /** + * @param msg Message. + */ + private void checkFullMessage(GridDhtPartitionsFullMessage msg) { + Map<Integer, Integer> dupPartsData = GridTestUtils.getFieldValue(msg, "dupPartsData"); + + assertNotNull(dupPartsData); + + checkFullMessage(AFF1_CACHE1, AFF1_CACHE2, dupPartsData, msg); + checkFullMessage(AFF2_CACHE1, AFF2_CACHE2, dupPartsData, msg); + checkFullMessage(AFF4_FILTER_CACHE1, AFF4_FILTER_CACHE2, dupPartsData, msg); + + assertFalse(dupPartsData.containsKey(CU.cacheId(AFF3_CACHE1))); + assertFalse(dupPartsData.containsKey(CU.cacheId(AFF5_FILTER_CACHE1))); + + Map<Integer, Map<Integer, Long>> partCntrs = GridTestUtils.getFieldValue(msg, "partCntrs"); + + if (partCntrs != null) { + for (Map<Integer, Long> cntrs : partCntrs.values()) + assertTrue(cntrs.isEmpty()); + } + } + + /** + * @param msg Message. + */ + private void checkSingleMessage(GridDhtPartitionsSingleMessage msg) { + Map<Integer, Integer> dupPartsData = GridTestUtils.getFieldValue(msg, "dupPartsData"); + + assertNotNull(dupPartsData); + + checkSingleMessage(AFF1_CACHE1, AFF1_CACHE2, dupPartsData, msg); + checkSingleMessage(AFF2_CACHE1, AFF2_CACHE2, dupPartsData, msg); + checkSingleMessage(AFF4_FILTER_CACHE1, AFF4_FILTER_CACHE2, dupPartsData, msg); + + assertFalse(dupPartsData.containsKey(CU.cacheId(AFF3_CACHE1))); + assertFalse(dupPartsData.containsKey(CU.cacheId(AFF5_FILTER_CACHE1))); + + Map<Integer, Map<Integer, Long>> partCntrs = GridTestUtils.getFieldValue(msg, "partCntrs"); + + if (partCntrs != null) { + for (Map<Integer, Long> cntrs : partCntrs.values()) + assertTrue(cntrs.isEmpty()); + } + } + + /** + * @param cache1 Cache 1. + * @param cache2 Cache 2. + * @param dupPartsData Duplicated data map. + * @param msg Message. + */ + private void checkFullMessage(String cache1, + String cache2, + Map<Integer, Integer> dupPartsData, + GridDhtPartitionsFullMessage msg) + { + Integer cacheId; + Integer dupCacheId; + + if (dupPartsData.containsKey(CU.cacheId(cache1))) { + cacheId = CU.cacheId(cache1); + dupCacheId = CU.cacheId(cache2); + } + else { + cacheId = CU.cacheId(cache2); + dupCacheId = CU.cacheId(cache1); + } + + assertTrue(dupPartsData.containsKey(cacheId)); + assertEquals(dupCacheId, dupPartsData.get(cacheId)); + assertFalse(dupPartsData.containsKey(dupCacheId)); + + Map<Integer, GridDhtPartitionFullMap> parts = msg.partitions(); + + GridDhtPartitionFullMap emptyFullMap = parts.get(cacheId); + + for (GridDhtPartitionMap2 map : emptyFullMap.values()) + assertEquals(0, map.map().size()); + + GridDhtPartitionFullMap fullMap = parts.get(dupCacheId); + + for (GridDhtPartitionMap2 map : fullMap.values()) + assertFalse(map.map().isEmpty()); + } + + /** + * @param cache1 Cache 1. + * @param cache2 Cache 2. + * @param dupPartsData Duplicated data map. + * @param msg Message. + */ + private void checkSingleMessage(String cache1, + String cache2, + Map<Integer, Integer> dupPartsData, + GridDhtPartitionsSingleMessage msg) + { + Integer cacheId; + Integer dupCacheId; + + if (dupPartsData.containsKey(CU.cacheId(cache1))) { + cacheId = CU.cacheId(cache1); + dupCacheId = CU.cacheId(cache2); + } + else { + cacheId = CU.cacheId(cache2); + dupCacheId = CU.cacheId(cache1); + } + + assertTrue(dupPartsData.containsKey(cacheId)); + assertEquals(dupCacheId, dupPartsData.get(cacheId)); + assertFalse(dupPartsData.containsKey(dupCacheId)); + + Map<Integer, GridDhtPartitionMap2> parts = msg.partitions(); + + GridDhtPartitionMap2 emptyMap = parts.get(cacheId); + + assertEquals(0, emptyMap.map().size()); + + GridDhtPartitionMap2 map = parts.get(dupCacheId); + + assertFalse(map.map().isEmpty()); + } + + /** + * + */ + private static class TestNodeFilter implements IgnitePredicate<ClusterNode> { + /** {@inheritDoc} */ + @Override public boolean apply(ClusterNode node) { + // Do not start cache on coordinator. + return node.order() > 1; + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5f3ddc5c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java index 1b7fe2b..d2cb710 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java @@ -537,7 +537,7 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC Affinity<Object> aff = grid(i).affinity(null); - Map<Integer, Long> act = grid(i).cachex(null).context().topology().updateCounters(); + Map<Integer, Long> act = grid(i).cachex(null).context().topology().updateCounters(false); for (Map.Entry<Integer, Long> e : updCntrs.entrySet()) { if (aff.mapPartitionToPrimaryAndBackups(e.getKey()).contains(grid(i).localNode())) http://git-wip-us.apache.org/repos/asf/ignite/blob/5f3ddc5c/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java index dc412a9..ffb0539 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java @@ -26,6 +26,7 @@ import org.apache.ignite.internal.processors.cache.CacheConfigurationLeakTest; import org.apache.ignite.internal.processors.cache.CacheDhtLocalPartitionAfterRemoveSelfTest; import org.apache.ignite.internal.processors.cache.CacheEnumOperationsSingleNodeTest; import org.apache.ignite.internal.processors.cache.CacheEnumOperationsTest; +import org.apache.ignite.internal.processors.cache.CacheExchangeMessageDuplicatedStateTest; import org.apache.ignite.internal.processors.cache.CrossCacheTxRandomOperationsTest; import org.apache.ignite.internal.processors.cache.GridCacheAtomicMessageCountSelfTest; import org.apache.ignite.internal.processors.cache.GridCacheFinishPartitionsSelfTest; @@ -262,6 +263,8 @@ public class IgniteCacheTestSuite2 extends TestSuite { suite.addTest(new TestSuite(IgniteNoCustomEventsOnNodeStart.class)); + suite.addTest(new TestSuite(CacheExchangeMessageDuplicatedStateTest.class)); + return suite; } }
