Merge with master - WIP.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/30633571 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/30633571 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/30633571 Branch: refs/heads/ignite-3477 Commit: 30633571f878f794c5ba73beb9ecfd9c9f52aa7e Parents: 9dc652d Author: Ilya Lantukh <[email protected]> Authored: Thu Dec 22 17:28:04 2016 +0300 Committer: Ilya Lantukh <[email protected]> Committed: Thu Dec 22 17:28:04 2016 +0300 ---------------------------------------------------------------------- .../internal/client/GridClientClusterState.java | 33 +++++++++ .../router/impl/GridRouterClientImpl.java | 2 +- .../GridCachePartitionExchangeManager.java | 4 +- .../dht/GridClientPartitionTopology.java | 70 ++++++++++---------- .../dht/GridDhtPartitionTopology.java | 18 +++-- .../dht/GridDhtPartitionTopologyImpl.java | 54 ++++++++++++--- .../GridNearAtomicAbstractUpdateRequest.java | 2 - .../GridDhtPartitionsExchangeFuture.java | 2 +- .../IgniteCacheFullApiSelfTestSuite.java | 2 - 9 files changed, 127 insertions(+), 60 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/30633571/modules/core/src/main/java/org/apache/ignite/internal/client/GridClientClusterState.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/GridClientClusterState.java b/modules/core/src/main/java/org/apache/ignite/internal/client/GridClientClusterState.java new file mode 100644 index 0000000..4fa25ce --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/client/GridClientClusterState.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.client; + +/** + * Interface for manage state of grid cluster. + */ +public interface GridClientClusterState { + /** + * @param active {@code True} activate, {@code False} deactivate. + */ + public void active(boolean active) throws GridClientException; + + /** + * @return {@code Boolean} - Current cluster state. {@code True} active, {@code False} inactive. + */ + public boolean active() throws GridClientException; +} http://git-wip-us.apache.org/repos/asf/ignite/blob/30633571/modules/core/src/main/java/org/apache/ignite/internal/client/router/impl/GridRouterClientImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/router/impl/GridRouterClientImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/client/router/impl/GridRouterClientImpl.java index 1dd366b..12e2cc8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/client/router/impl/GridRouterClientImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/client/router/impl/GridRouterClientImpl.java @@ -22,11 +22,11 @@ import java.util.UUID; import java.util.concurrent.ConcurrentMap; import org.apache.ignite.internal.client.GridClient; import org.apache.ignite.internal.client.GridClientClosedException; +import org.apache.ignite.internal.client.GridClientClusterState; import org.apache.ignite.internal.client.GridClientCompute; import org.apache.ignite.internal.client.GridClientConfiguration; import org.apache.ignite.internal.client.GridClientData; import org.apache.ignite.internal.client.GridClientException; -import org.apache.ignite.internal.client.GridClientClusterState; import org.apache.ignite.internal.client.GridClientNode; import org.apache.ignite.internal.client.GridClientPredicate; import org.apache.ignite.internal.client.GridClientProtocol; http://git-wip-us.apache.org/repos/asf/ignite/blob/30633571/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 6a9ea1b..c63da75 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -1243,7 +1243,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana top = cacheCtx.topology(); if (top != null) - updated |= top.update(null, entry.getValue(), null); + updated |= top.update(null, entry.getValue(), null) != null; } if (!cctx.kernalContext().clientNode() && updated) @@ -1292,7 +1292,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana top = cacheCtx.topology(); if (top != null) { - updated |= top.update(null, entry.getValue(), null, true); + updated |= top.update(null, entry.getValue(), null) != null; cctx.affinity().checkRebalanceState(top, cacheId); } http://git-wip-us.apache.org/repos/asf/ignite/blob/30633571/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java index 33de577..ca71f51 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java @@ -550,7 +550,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { /** {@inheritDoc} */ @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"}) - @Nullable @Override public boolean update(@Nullable GridDhtPartitionExchangeId exchId, + @Nullable @Override public GridDhtPartitionMap2 update(@Nullable GridDhtPartitionExchangeId exchId, GridDhtPartitionFullMap partMap, Map<Integer, T2<Long, Long>> cntrMap) { if (log.isDebugEnabled()) @@ -564,7 +564,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { log.debug("Stale exchange id for full partition map update (will ignore) [lastExchId=" + lastExchangeId + ", exchId=" + exchId + ']'); - return false; + return null; } if (node2part != null && node2part.compareTo(partMap) >= 0) { @@ -572,7 +572,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { log.debug("Stale partition map for full partition map update (will ignore) [lastExchId=" + lastExchangeId + ", exchId=" + exchId + ", curMap=" + node2part + ", newMap=" + partMap + ']'); - return false; + return null; } updateSeq.incrementAndGet(); @@ -635,7 +635,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { if (log.isDebugEnabled()) log.debug("Partition map after full update: " + fullMapString()); - return false; + return null; } finally { lock.writeLock().unlock(); @@ -643,10 +643,10 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { } /** {@inheritDoc} */ - @Nullable @Override public boolean update(@Nullable GridDhtPartitionExchangeId exchId, + @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"}) + @Nullable @Override public GridDhtPartitionMap2 update(@Nullable GridDhtPartitionExchangeId exchId, GridDhtPartitionMap2 parts, - Map<Integer, T2<Long, Long>> cntrMap, - boolean checkEvictions) { + Map<Integer, T2<Long, Long>> cntrMap) { if (log.isDebugEnabled()) log.debug("Updating single partition map [exchId=" + exchId + ", parts=" + mapString(parts) + ']'); @@ -655,21 +655,21 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { log.debug("Received partition update for non-existing node (will ignore) [exchId=" + exchId + ", parts=" + parts + ']'); - return false; + return null; } lock.writeLock().lock(); try { if (stopping) - return false; + return null; if (lastExchangeId != null && exchId != null && lastExchangeId.compareTo(exchId) > 0) { if (log.isDebugEnabled()) log.debug("Stale exchange id for single partition map update (will ignore) [lastExchId=" + lastExchangeId + ", exchId=" + exchId + ']'); - return false; + return null; } if (exchId != null) @@ -686,45 +686,43 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { log.debug("Stale update sequence for single partition map update (will ignore) [exchId=" + exchId + ", curSeq=" + cur.updateSequence() + ", newSeq=" + parts.updateSequence() + ']'); - return false; + return null; } long updateSeq = this.updateSeq.incrementAndGet(); - node2part.updateSequence(updateSeq); + node2part = new GridDhtPartitionFullMap(node2part, updateSeq); - boolean changed = cur == null || !cur.equals(parts); + boolean changed = false; - if (changed) { - node2part.put(parts.nodeId(), parts); + if (cur == null || !cur.equals(parts)) + changed = true; - // Add new mappings. - for (Integer p : parts.keySet()) { - Set<UUID> ids = part2node.get(p); + node2part.put(parts.nodeId(), parts); - if (ids == null) - // Initialize HashSet to size 3 in anticipation that there won't be - // more than 3 nodes per partition. - part2node.put(p, ids = U.newHashSet(3)); + part2node = new HashMap<>(part2node); - ids.add(parts.nodeId()); - } + // Add new mappings. + for (Integer p : parts.keySet()) { + Set<UUID> ids = part2node.get(p); + + if (ids == null) + // Initialize HashSet to size 3 in anticipation that there won't be + // more than 3 nodes per partition. + part2node.put(p, ids = U.newHashSet(3)); - // Remove obsolete mappings. - if (cur != null) { - for (Integer p : cur.keySet()) { - if (parts.containsKey(p)) - continue; + changed |= ids.add(parts.nodeId()); + } - Set<UUID> ids = part2node.get(p); + // Remove obsolete mappings. + if (cur != null) { + for (Integer p : F.view(cur.keySet(), F0.notIn(parts.keySet()))) { + Set<UUID> ids = part2node.get(p); - if (ids != null) - ids.remove(parts.nodeId()); - } + if (ids != null) + changed |= ids.remove(parts.nodeId()); } } - else - cur.updateSequence(parts.updateSequence(), parts.topologyVersion()); if (cntrMap != null) { for (Map.Entry<Integer, T2<Long, Long>> e : cntrMap.entrySet()) { @@ -740,7 +738,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { if (log.isDebugEnabled()) log.debug("Partition map after single update: " + fullMapString()); - return changed; + return changed ? localPartitionMap() : null; } finally { lock.writeLock().unlock(); http://git-wip-us.apache.org/repos/asf/ignite/blob/30633571/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java index 5918da8..ac3e2c8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java @@ -216,9 +216,9 @@ public interface GridDhtPartitionTopology { * @param exchId Exchange ID. * @param partMap Update partition map. * @param cntrMap Partition update counters. - * @return {@code True} if topology state changed. + * @return Local partition map if there were evictions or {@code null} otherwise. */ - public boolean update(@Nullable GridDhtPartitionExchangeId exchId, + public GridDhtPartitionMap2 update(@Nullable GridDhtPartitionExchangeId exchId, GridDhtPartitionFullMap partMap, @Nullable Map<Integer, T2<Long, Long>> cntrMap); @@ -226,13 +226,11 @@ public interface GridDhtPartitionTopology { * @param exchId Exchange ID. * @param parts Partitions. * @param cntrMap Partition update counters. - * @param checkEvictions Check evictions flag. - * @return {@code True} if topology state changed. + * @return Local partition map if there were evictions or {@code null} otherwise. */ - @Nullable public boolean update(@Nullable GridDhtPartitionExchangeId exchId, + @Nullable public GridDhtPartitionMap2 update(@Nullable GridDhtPartitionExchangeId exchId, GridDhtPartitionMap2 parts, - @Nullable Map<Integer, T2<Long, Long>> cntrMap, - boolean checkEvictions); + @Nullable Map<Integer, T2<Long, Long>> cntrMap); /** * Checks if there is at least one owner for each partition in the cache topology. @@ -279,6 +277,12 @@ public interface GridDhtPartitionTopology { public void onEvicted(GridDhtLocalPartition part, boolean updateSeq); /** + * @param nodeId Node to get partitions for. + * @return Partitions for node. + */ + @Nullable public GridDhtPartitionMap2 partitions(UUID nodeId); + + /** * Prints memory stats. * * @param threshold Threshold for number of entries. http://git-wip-us.apache.org/repos/asf/ignite/blob/30633571/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index ea3b7c6..f22c263 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@ -1429,7 +1429,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh boolean marked = plc == PartitionLossPolicy.IGNORE ? locPart.own() : locPart.markLost(); if (marked) - updateLocal(locPart.id(), cctx.localNodeId(), locPart.state(), updSeq); + updateLocal(locPart.id(), locPart.state(), updSeq); changed |= marked; } @@ -1494,7 +1494,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh boolean marked = locPart.own(); if (marked) - updateLocal(locPart.id(), cctx.localNodeId(), locPart.state(), updSeq); + updateLocal(locPart.id(), locPart.state(), updSeq); } for (UUID nodeId : nodeIds) { @@ -1582,6 +1582,42 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh /** * @param updateSeq Update sequence. + * @return {@code True} if state changed. + */ + private boolean checkEvictions(long updateSeq) { + AffinityTopologyVersion affVer = cctx.affinity().affinityTopologyVersion(); + + boolean changed = false; + + if (!affVer.equals(AffinityTopologyVersion.NONE) && affVer.compareTo(topVer) >= 0) { + List<List<ClusterNode>> aff = cctx.affinity().assignments(topVer); + + changed = checkEvictions(updateSeq, aff); + + updateRebalanceVersion(aff); + } + + return changed; + } + + /** {@inheritDoc} */ + @Override public void checkEvictions() { + lock.writeLock().lock(); + + try { + long updateSeq = this.updateSeq.incrementAndGet(); + + node2part.newUpdateSequence(updateSeq); + + checkEvictions(updateSeq); + } + finally { + lock.writeLock().unlock(); + } + } + + /** + * @param updateSeq Update sequence. * @param aff Affinity assignments. * @return Checks if any of the local partitions need to be evicted. */ @@ -1824,15 +1860,15 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh /** {@inheritDoc} */ @Nullable @Override public GridDhtPartitionMap2 partitions(UUID nodeId) { - lock.readLock().lock(); + lock.readLock().lock(); - try { - return node2part.get(nodeId); - } - finally { - lock.readLock().unlock(); - } + try { + return node2part.get(nodeId); } + finally { + lock.readLock().unlock(); + } + } /** {@inheritDoc} */ @Override public Map<Integer, T2<Long, Long>> updateCounters(boolean skipZeros) { http://git-wip-us.apache.org/repos/asf/ignite/blob/30633571/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java index d5f8b64..b933186 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java @@ -133,8 +133,6 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa */ public abstract boolean hasPrimary(); - public abstract boolean recovery(); - /** * @param res Response. * @return {@code True} if current response was {@code null}. http://git-wip-us.apache.org/repos/asf/ignite/blob/30633571/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 5d90e35..5bb7536 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -1746,7 +1746,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT if (cacheCtx != null) cacheCtx.topology().update(exchId, entry.getValue(), cntrMap); else { - ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx, AffinityTopologyVersion.NONE); + ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(AffinityTopologyVersion.NONE); if (oldest != null && oldest.isLocal()) cctx.exchange().clientTopology(cacheId, this).update(exchId, entry.getValue(), cntrMap); http://git-wip-us.apache.org/repos/asf/ignite/blob/30633571/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFullApiSelfTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFullApiSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFullApiSelfTestSuite.java index 17757ab..7d4fee1 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFullApiSelfTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFullApiSelfTestSuite.java @@ -57,7 +57,6 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridCacheAto import org.apache.ignite.internal.processors.cache.distributed.near.GridCacheAtomicPrimaryWriteOrderFairAffinityMultiNodeFullApiSelfTest; import org.apache.ignite.internal.processors.cache.distributed.near.GridCacheAtomicPrimaryWriteOrderMultiNodeFullApiSelfTest; import org.apache.ignite.internal.processors.cache.distributed.near.GridCacheAtomicPrimaryWriteOrderMultiNodeP2PDisabledFullApiSelfTest; -import org.apache.ignite.internal.processors.cache.distributed.near.GridCacheAtomicPrimaryWriteOrderNoStripedPoolMultiNodeFullApiSelfTest; import org.apache.ignite.internal.processors.cache.distributed.near.GridCacheAtomicPrimaryWriteOrderOffHeapFullApiSelfTest; import org.apache.ignite.internal.processors.cache.distributed.near.GridCacheAtomicPrimaryWriteOrderOffHeapTieredFullApiSelfTest; import org.apache.ignite.internal.processors.cache.distributed.near.GridCacheAtomicPrimaryWrityOrderOffHeapMultiNodeFullApiSelfTest; @@ -231,7 +230,6 @@ public class IgniteCacheFullApiSelfTestSuite extends TestSuite { suite.addTestSuite(GridCachePartitionedFullApiMultithreadedSelfTest.class); // Disabled striped pool. - suite.addTestSuite(GridCacheAtomicPrimaryWriteOrderNoStripedPoolMultiNodeFullApiSelfTest.class); suite.addTestSuite(GridCachePartitionedNoStripedPoolMultiNodeFullApiSelfTest.class); // Other.
