Repository: ignite Updated Branches: refs/heads/ignite-462-2 cd486cbe4 -> 70c9562ca
IGNITE-426 Fixed tests. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/70c9562c Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/70c9562c Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/70c9562c Branch: refs/heads/ignite-462-2 Commit: 70c9562cadbd51dbaedf7566a7d922eef1df06f1 Parents: cd486cb Author: Tikhonov Nikolay <[email protected]> Authored: Wed Nov 4 20:57:20 2015 +0300 Committer: Tikhonov Nikolay <[email protected]> Committed: Wed Nov 4 20:57:20 2015 +0300 ---------------------------------------------------------------------- .../GridCachePartitionExchangeManager.java | 2 - .../dht/GridDhtPartitionTopologyImpl.java | 69 +++++++++++--------- ...ContinuousQueryFailoverAbstractSelfTest.java | 7 +- ...FailoverAtomicPrimaryWriteOrderSelfTest.java | 3 +- 4 files changed, 42 insertions(+), 39 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/70c9562c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 0065403..ceeb0b9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -23,10 +23,8 @@ import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Queue; import java.util.UUID; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.atomic.AtomicBoolean; http://git-wip-us.apache.org/repos/asf/ignite/blob/70c9562c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index 6d64bd2..b3e60ae 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@ -605,6 +605,11 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { if (log.isDebugEnabled()) log.debug("Created local partition: " + loc); } + + Long cntr = this.cntrMap.get(p); + + if (cntr != null) + loc.updateCounter(cntr); } finally { lock.writeLock().unlock(); @@ -861,6 +866,22 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { if (stopping) return null; + if (cntrMap != null) { + for (Map.Entry<Integer, Long> e : cntrMap.entrySet()) { + Long cntr = this.cntrMap.get(e.getKey()); + + if (cntr == null || cntr < e.getValue()) + this.cntrMap.put(e.getKey(), e.getValue()); + } + + for (GridDhtLocalPartition part : locParts.values()) { + Long cntr = this.cntrMap.get(part.id()); + + if (cntr != null) + part.updateCounter(cntr); + } + } + if (exchId != null && lastExchangeId != null && lastExchangeId.compareTo(exchId) >= 0) { if (log.isDebugEnabled()) log.debug("Stale exchange id for full partition map update (will ignore) [lastExchId=" + @@ -929,22 +950,6 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { part2node = p2n; - if (cntrMap != null) { - for (Map.Entry<Integer, Long> e : cntrMap.entrySet()) { - Long cntr = this.cntrMap.get(e.getKey()); - - if (cntr == null || cntr < e.getValue()) - this.cntrMap.put(e.getKey(), e.getValue()); - } - - for (GridDhtLocalPartition part : locParts.values()) { - Long cntr = cntrMap.get(part.id()); - - if (cntr != null) - part.updateCounter(cntr); - } - } - boolean changed = checkEvictions(updateSeq); updateRebalanceVersion(); @@ -982,6 +987,22 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { if (stopping) return null; + if (cntrMap != null) { + for (Map.Entry<Integer, Long> e : cntrMap.entrySet()) { + Long cntr = this.cntrMap.get(e.getKey()); + + if (cntr == null || cntr < e.getValue()) + this.cntrMap.put(e.getKey(), e.getValue()); + } + + for (GridDhtLocalPartition part : locParts.values()) { + Long cntr = this.cntrMap.get(part.id()); + + if (cntr != null) + part.updateCounter(cntr); + } + } + if (lastExchangeId != null && exchId != null && lastExchangeId.compareTo(exchId) > 0) { if (log.isDebugEnabled()) log.debug("Stale exchange id for single partition map update (will ignore) [lastExchId=" + @@ -1042,22 +1063,6 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { } } - if (cntrMap != null) { - for (Map.Entry<Integer, Long> e : cntrMap.entrySet()) { - Long cntr = this.cntrMap.get(e.getKey()); - - if (cntr == null || cntr < e.getValue()) - this.cntrMap.put(e.getKey(), e.getValue()); - } - - for (GridDhtLocalPartition part : locParts.values()) { - Long cntr = cntrMap.get(part.id()); - - if (cntr != null) - part.updateCounter(cntr); - } - } - changed |= checkEvictions(updateSeq); updateRebalanceVersion(); http://git-wip-us.apache.org/repos/asf/ignite/blob/70c9562c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java index a0b1878..dc0d1ed 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java @@ -394,10 +394,10 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC /** * @param nodes Count nodes. * @param killedNodeIdx Killed node index. - * @param updCntrs Update counters. + * @param expCntrs Update counters. * @return {@code True} if counters matches. */ - private boolean checkPartCounter(int nodes, int killedNodeIdx, Map<Integer, Long> updCntrs) { + private boolean checkPartCounter(int nodes, int killedNodeIdx, Map<Integer, Long> expCntrs) throws Exception { for (int i = 0; i < nodes; i++) { if (i == killedNodeIdx) continue; @@ -406,10 +406,9 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC Map<Integer, Long> act = grid(i).cachex(null).context().topology().updateCounters(); - for (Map.Entry<Integer, Long> e : updCntrs.entrySet()) { + for (Map.Entry<Integer, Long> e : expCntrs.entrySet()) if (aff.mapPartitionToPrimaryAndBackups(e.getKey()).contains(grid(i).localNode())) assertEquals(e.getValue(), act.get(e.getKey())); - } } return true; http://git-wip-us.apache.org/repos/asf/ignite/blob/70c9562c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicPrimaryWriteOrderSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicPrimaryWriteOrderSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicPrimaryWriteOrderSelfTest.java index e33db45..5d33cbf 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicPrimaryWriteOrderSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicPrimaryWriteOrderSelfTest.java @@ -26,7 +26,8 @@ import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY; /** * */ -public class CacheContinuousQueryFailoverAtomicPrimaryWriteOrderSelfTest extends CacheContinuousQueryFailoverAbstractSelfTest { +public class CacheContinuousQueryFailoverAtomicPrimaryWriteOrderSelfTest extends + CacheContinuousQueryFailoverAbstractSelfTest { /** {@inheritDoc} */ @Override protected CacheAtomicWriteOrderMode writeOrderMode() { return PRIMARY;
