Performance optimizations.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b02ad0de Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b02ad0de Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b02ad0de Branch: refs/heads/master Commit: b02ad0deaae78424356f9a4b1748fc43b21eac03 Parents: 7cb3e68 Author: yzhdanov <[email protected]> Authored: Fri Feb 17 14:10:20 2017 +0300 Committer: sboikov <[email protected]> Committed: Fri Feb 17 14:10:20 2017 +0300 ---------------------------------------------------------------------- .../ignite/internal/GridKernalGatewayImpl.java | 25 +- .../org/apache/ignite/internal/GridTopic.java | 2 +- .../client/util/GridClientConsistentHash.java | 14 +- .../managers/communication/GridIoManager.java | 29 ++- .../discovery/GridDiscoveryManager.java | 258 +++++-------------- .../eventstorage/GridEventStorageManager.java | 34 ++- .../affinity/GridAffinityAssignmentCache.java | 8 +- .../cache/CacheAffinitySharedManager.java | 2 +- .../cache/GridCacheEvictionManager.java | 60 +++-- .../processors/cache/GridCacheGateway.java | 48 ++-- .../processors/cache/GridCacheIoManager.java | 19 +- .../processors/cache/GridCacheUtils.java | 6 +- .../cache/affinity/GridCacheAffinityImpl.java | 2 +- .../dht/GridClientPartitionTopology.java | 13 +- .../dht/GridDhtAssignmentFetchFuture.java | 5 +- .../distributed/dht/GridDhtCacheAdapter.java | 4 +- .../dht/GridDhtPartitionTopologyImpl.java | 14 +- .../GridDhtAtomicAbstractUpdateFuture.java | 7 +- .../GridDhtAtomicAbstractUpdateRequest.java | 2 +- .../dht/atomic/GridDhtAtomicCache.java | 16 +- .../atomic/GridDhtAtomicSingleUpdateFuture.java | 6 +- .../GridDhtAtomicSingleUpdateRequest.java | 5 +- .../dht/atomic/GridDhtAtomicUpdateFuture.java | 6 +- .../dht/atomic/GridDhtAtomicUpdateRequest.java | 10 +- .../dht/atomic/GridDhtAtomicUpdateResponse.java | 31 ++- .../GridNearAtomicSingleUpdateFuture.java | 7 +- .../dht/atomic/GridNearAtomicUpdateFuture.java | 7 +- .../atomic/GridNearAtomicUpdateResponse.java | 35 ++- .../dht/preloader/GridDhtPreloader.java | 39 ++- .../near/GridNearSingleGetRequest.java | 5 + .../cache/transactions/IgniteTxHandler.java | 4 +- .../cache/version/GridCacheVersion.java | 2 +- .../cache/version/GridCacheVersionManager.java | 2 +- .../clock/GridClockSyncProcessor.java | 2 +- .../ignite/internal/util/GridBusyLock.java | 2 +- .../util/StripedCompositeReadWriteLock.java | 50 +++- .../ignite/internal/util/StripedExecutor.java | 10 +- .../nio/GridAbstractCommunicationClient.java | 37 +-- .../util/nio/GridCommunicationClient.java | 5 - .../communication/tcp/TcpCommunicationSpi.java | 9 + .../org/apache/ignite/thread/IgniteThread.java | 26 +- .../GridDiscoveryManagerAliveCacheSelfTest.java | 55 ---- .../discovery/GridDiscoveryManagerSelfTest.java | 214 --------------- .../testsuites/IgniteKernalSelfTestSuite.java | 5 +- .../ignite/tools/classgen/ClassesGenerator.java | 4 +- 45 files changed, 462 insertions(+), 684 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/b02ad0de/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGatewayImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGatewayImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGatewayImpl.java index fe8c580..7cbf84a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGatewayImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGatewayImpl.java @@ -22,9 +22,11 @@ import java.io.Serializable; import java.io.StringWriter; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteClientDisconnectedException; -import org.apache.ignite.internal.util.GridSpinReadWriteLock; +import org.apache.ignite.internal.util.StripedCompositeReadWriteLock; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.future.IgniteFutureImpl; import org.apache.ignite.internal.util.tostring.GridToStringExclude; @@ -40,7 +42,8 @@ public class GridKernalGatewayImpl implements GridKernalGateway, Serializable { /** */ @GridToStringExclude - private final GridSpinReadWriteLock rwLock = new GridSpinReadWriteLock(); + private final ReadWriteLock rwLock = + new StripedCompositeReadWriteLock(Runtime.getRuntime().availableProcessors()); /** */ @GridToStringExclude @@ -73,13 +76,15 @@ public class GridKernalGatewayImpl implements GridKernalGateway, Serializable { if (stackTrace == null) stackTrace = stackTrace(); - rwLock.readLock(); + Lock lock = rwLock.readLock(); + + lock.lock(); GridKernalState state = this.state.get(); if (state != GridKernalState.STARTED) { // Unlock just acquired lock. - rwLock.readUnlock(); + lock.unlock(); if (state == GridKernalState.DISCONNECTED) { assert reconnectFut != null; @@ -96,7 +101,7 @@ public class GridKernalGatewayImpl implements GridKernalGateway, Serializable { if (stackTrace == null) stackTrace = stackTrace(); - rwLock.readLock(); + rwLock.readLock().lock(); if (state.get() == GridKernalState.DISCONNECTED) throw new IgniteClientDisconnectedException(reconnectFut, "Client node disconnected: " + gridName); @@ -104,7 +109,7 @@ public class GridKernalGatewayImpl implements GridKernalGateway, Serializable { /** {@inheritDoc} */ @Override public void readUnlock() { - rwLock.readUnlock(); + rwLock.readLock().unlock(); } /** {@inheritDoc} */ @@ -118,7 +123,7 @@ public class GridKernalGatewayImpl implements GridKernalGateway, Serializable { // Busy wait is intentional. while (true) try { - if (rwLock.tryWriteLock(200, TimeUnit.MILLISECONDS)) + if (rwLock.writeLock().tryLock(200, TimeUnit.MILLISECONDS)) break; else Thread.sleep(200); @@ -135,7 +140,7 @@ public class GridKernalGatewayImpl implements GridKernalGateway, Serializable { /** {@inheritDoc} */ @Override public boolean tryWriteLock(long timeout) throws InterruptedException { - boolean acquired = rwLock.tryWriteLock(timeout, TimeUnit.MILLISECONDS); + boolean acquired = rwLock.writeLock().tryLock(timeout, TimeUnit.MILLISECONDS); if (acquired) { if (stackTrace == null) @@ -194,7 +199,7 @@ public class GridKernalGatewayImpl implements GridKernalGateway, Serializable { /** {@inheritDoc} */ @Override public void writeUnlock() { - rwLock.writeUnlock(); + rwLock.writeLock().unlock(); } /** {@inheritDoc} */ @@ -222,4 +227,4 @@ public class GridKernalGatewayImpl implements GridKernalGateway, Serializable { @Override public String toString() { return S.toString(GridKernalGatewayImpl.class, this); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/b02ad0de/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java b/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java index 2962540..c2e0452 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java @@ -769,4 +769,4 @@ public enum GridTopic { return S.toString(T8.class, this); } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/b02ad0de/modules/core/src/main/java/org/apache/ignite/internal/client/util/GridClientConsistentHash.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/util/GridClientConsistentHash.java b/modules/core/src/main/java/org/apache/ignite/internal/client/util/GridClientConsistentHash.java index 8134906..0c9a3fb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/client/util/GridClientConsistentHash.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/client/util/GridClientConsistentHash.java @@ -439,13 +439,9 @@ public class GridClientConsistentHash<N> { /** {@inheritDoc} */ @Override public String toString() { - StringBuilder sb = new StringBuilder(getClass().getSimpleName()); - - sb.append(" [affSeed=").append(affSeed). - append(", circle=").append(circle). - append(", nodesComp=").append(nodesComp). - append(", nodes=").append(nodes).append("]"); - - return sb.toString(); + return getClass().getSimpleName() + " [affSeed=" + affSeed + + ", circle=" + circle + + ", nodesComp=" + nodesComp + + ", nodes=" + nodes + "]"; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/b02ad0de/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java index 84b4543..108ecd5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java @@ -35,6 +35,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.ignite.IgniteCheckedException; @@ -57,7 +58,7 @@ import org.apache.ignite.internal.processors.platform.message.PlatformMessageFil import org.apache.ignite.internal.processors.pool.PoolProcessor; import org.apache.ignite.internal.processors.timeout.GridTimeoutObject; import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashSet; -import org.apache.ignite.internal.util.GridSpinReadWriteLock; +import org.apache.ignite.internal.util.StripedCompositeReadWriteLock; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.lang.GridTuple3; @@ -160,7 +161,8 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa private final Marshaller marsh; /** Busy lock. */ - private final GridSpinReadWriteLock busyLock = new GridSpinReadWriteLock(); + private final ReadWriteLock busyLock = + new StripedCompositeReadWriteLock(Runtime.getRuntime().availableProcessors()); /** Lock to sync maps access. */ private final ReadWriteLock lock = new ReentrantReadWriteLock(); @@ -577,7 +579,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa // Busy wait is intentional. while (true) { try { - if (busyLock.tryWriteLock(200, TimeUnit.MILLISECONDS)) + if (busyLock.writeLock().tryLock(200, TimeUnit.MILLISECONDS)) break; else Thread.sleep(200); @@ -601,7 +603,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa stopping = true; } finally { - busyLock.writeUnlock(); + busyLock.writeLock().unlock(); } } @@ -623,7 +625,9 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa assert nodeId != null; assert msg != null; - busyLock.readLock(); + Lock busyLock0 = busyLock.readLock(); + + busyLock0.lock(); try { if (stopping) { @@ -712,7 +716,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa U.error(log, "Failed to process message (will ignore): " + msg, e); } finally { - busyLock.readUnlock(); + busyLock0.unlock(); } } @@ -798,11 +802,12 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa if (msg.topicOrdinal() == TOPIC_IO_TEST.ordinal()) { IgniteIoTestMessage msg0 = (IgniteIoTestMessage)msg.message(); - if (msg0.processFromNioThread()) { + if (msg0.processFromNioThread()) c.run(); + else + ctx.getStripedExecutorService().execute(-1, c); - return; - } + return; } if (ctx.config().getStripedPoolSize() > 0 && @@ -2173,7 +2178,9 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa return; } - busyLock.readLock(); + Lock lock = busyLock.readLock(); + + lock.lock(); try { if (stopping) { @@ -2251,7 +2258,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa } } finally { - busyLock.readUnlock(); + lock.unlock(); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/b02ad0de/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java index 71d8ad9..9ea707d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java @@ -17,7 +17,6 @@ package org.apache.ignite.internal.managers.discovery; -import java.io.Externalizable; import java.io.Serializable; import java.lang.management.GarbageCollectorMXBean; import java.lang.management.ManagementFactory; @@ -74,6 +73,7 @@ import org.apache.ignite.internal.managers.communication.GridIoManager; import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheAffinitySharedManager; +import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor; import org.apache.ignite.internal.processors.cache.GridCacheAdapter; import org.apache.ignite.internal.processors.jobmetrics.GridJobMetrics; import org.apache.ignite.internal.processors.security.SecurityContext; @@ -144,9 +144,6 @@ import static org.apache.ignite.plugin.segmentation.SegmentationPolicy.NOOP; * Discovery SPI manager. */ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { - /** Fake key for {@code null}-named caches. Used inside {@link DiscoCache}. */ - private static final String NULL_CACHE_NAME = UUID.randomUUID().toString(); - /** Metrics update frequency. */ private static final long METRICS_UPDATE_FREQ = 3000; @@ -1577,7 +1574,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { * @return Collection of cache nodes. */ public Collection<ClusterNode> nodes(AffinityTopologyVersion topVer) { - return resolveDiscoCache(null, topVer).allNodes(); + return resolveDiscoCache(CU.cacheId(null), topVer).allNodes(); } /** @@ -1585,7 +1582,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { * @return All server nodes for given topology version. */ public List<ClusterNode> serverNodes(AffinityTopologyVersion topVer) { - return resolveDiscoCache(null, topVer).srvNodes; + return resolveDiscoCache(CU.cacheId(null), topVer).srvNodes; } /** @@ -1596,7 +1593,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { * @return Node. */ public ClusterNode node(AffinityTopologyVersion topVer, UUID id) { - return resolveDiscoCache(null, topVer).node(id); + return resolveDiscoCache(CU.cacheId(null), topVer).node(id); } /** @@ -1607,49 +1604,38 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { * @return Collection of cache nodes. */ public Collection<ClusterNode> cacheNodes(@Nullable String cacheName, AffinityTopologyVersion topVer) { - return resolveDiscoCache(cacheName, topVer).cacheNodes(cacheName, topVer.topologyVersion()); + return resolveDiscoCache(CU.cacheId(cacheName), topVer).cacheNodes(cacheName, topVer.topologyVersion()); } /** - * Gets all nodes with at least one cache configured. + * Gets cache nodes for cache with given ID. * + * @param cacheId Cache ID. * @param topVer Topology version. * @return Collection of cache nodes. */ - public Collection<ClusterNode> cacheNodes(AffinityTopologyVersion topVer) { - return resolveDiscoCache(null, topVer).allNodesWithCaches(topVer.topologyVersion()); + public Collection<ClusterNode> cacheNodes(int cacheId, AffinityTopologyVersion topVer) { + return resolveDiscoCache(cacheId, topVer).cacheNodes(cacheId, topVer.topologyVersion()); } /** - * Gets cache remote nodes for cache with given name. - * - * @param topVer Topology version. - * @return Collection of cache nodes. - */ - public Collection<ClusterNode> remoteCacheNodes(AffinityTopologyVersion topVer) { - return resolveDiscoCache(null, topVer).remoteCacheNodes(topVer.topologyVersion()); - } - - /** - * Gets cache nodes for cache with given name. + * Gets all nodes with at least one cache configured. * - * @param cacheName Cache name. * @param topVer Topology version. * @return Collection of cache nodes. */ - Collection<ClusterNode> aliveCacheNodes(@Nullable String cacheName, AffinityTopologyVersion topVer) { - return resolveDiscoCache(cacheName, topVer).aliveCacheNodes(cacheName, topVer.topologyVersion()); + public Collection<ClusterNode> cacheNodes(AffinityTopologyVersion topVer) { + return resolveDiscoCache(CU.cacheId(null), topVer).allNodesWithCaches(topVer.topologyVersion()); } /** * Gets cache remote nodes for cache with given name. * - * @param cacheName Cache name. * @param topVer Topology version. * @return Collection of cache nodes. */ - Collection<ClusterNode> aliveRemoteCacheNodes(@Nullable String cacheName, AffinityTopologyVersion topVer) { - return resolveDiscoCache(cacheName, topVer).aliveRemoteCacheNodes(cacheName, topVer.topologyVersion()); + public Collection<ClusterNode> remoteCacheNodes(AffinityTopologyVersion topVer) { + return resolveDiscoCache(CU.cacheId(null), topVer).remoteCacheNodes(topVer.topologyVersion()); } /** @@ -1657,7 +1643,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { * @return Oldest alive server nodes with at least one cache configured. */ @Nullable public ClusterNode oldestAliveCacheServerNode(AffinityTopologyVersion topVer) { - DiscoCache cache = resolveDiscoCache(null, topVer); + DiscoCache cache = resolveDiscoCache(CU.cacheId(null), topVer); Map.Entry<ClusterNode, Boolean> e = cache.aliveSrvNodesWithCaches.firstEntry(); @@ -1672,7 +1658,20 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { * @return Collection of cache affinity nodes. */ public Collection<ClusterNode> cacheAffinityNodes(@Nullable String cacheName, AffinityTopologyVersion topVer) { - return resolveDiscoCache(cacheName, topVer).cacheAffinityNodes(cacheName, topVer.topologyVersion()); + int cacheId = CU.cacheId(cacheName); + + return resolveDiscoCache(cacheId, topVer).cacheAffinityNodes(cacheId, topVer.topologyVersion()); + } + + /** + * Gets cache nodes for cache with given ID that participate in affinity calculation. + * + * @param cacheId Cache ID. + * @param topVer Topology version. + * @return Collection of cache affinity nodes. + */ + public Collection<ClusterNode> cacheAffinityNodes(int cacheId, AffinityTopologyVersion topVer) { + return resolveDiscoCache(cacheId, topVer).cacheAffinityNodes(cacheId, topVer.topologyVersion()); } /** @@ -1742,31 +1741,34 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { } /** - * Checks if cache with given name has at least one node with near cache enabled. + * Checks if cache with given ID has at least one node with near cache enabled. * - * @param cacheName Cache name. + * @param cacheId Cache ID. * @param topVer Topology version. * @return {@code True} if cache with given name has at least one node with near cache enabled. */ - public boolean hasNearCache(@Nullable String cacheName, AffinityTopologyVersion topVer) { - return resolveDiscoCache(cacheName, topVer).hasNearCache(cacheName); + public boolean hasNearCache(int cacheId, AffinityTopologyVersion topVer) { + return resolveDiscoCache(cacheId, topVer).hasNearCache(cacheId); } /** * Gets discovery cache for given topology version. * - * @param cacheName Cache name (participates in exception message). + * @param cacheId Cache ID (participates in exception message). * @param topVer Topology version. * @return Discovery cache. */ - private DiscoCache resolveDiscoCache(@Nullable String cacheName, AffinityTopologyVersion topVer) { + private DiscoCache resolveDiscoCache(int cacheId, AffinityTopologyVersion topVer) { Snapshot snap = topSnap.get(); DiscoCache cache = AffinityTopologyVersion.NONE.equals(topVer) || topVer.equals(snap.topVer) ? snap.discoCache : discoCacheHist.get(topVer); if (cache == null) { - throw new IgniteException("Failed to resolve nodes topology [cacheName=" + cacheName + + DynamicCacheDescriptor desc = ctx.cache().cacheDescriptor(cacheId); + + throw new IgniteException("Failed to resolve nodes topology [" + + "cacheName=" + (desc != null ? desc.cacheConfiguration().getName() : "N/A") + ", topVer=" + topVer + ", history=" + discoCacheHist.keySet() + ", snap=" + snap + @@ -2093,19 +2095,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { evts.add(new GridTuple5<>(type, topVer, node, topSnapshot, data)); } - /** - * @param node Node to get a short description for. - * @return Short description for the node to be used in 'quiet' mode. - */ - private String quietNode(ClusterNode node) { - assert node != null; - - return "nodeId8=" + node.id().toString().substring(0, 8) + ", " + - "addrs=" + U.addressesAsString(node) + ", " + - "order=" + node.order() + ", " + - "CPUs=" + node.metrics().getTotalCpus(); - } - /** {@inheritDoc} */ @Override protected void body() throws InterruptedException { while (!isCancelled()) { @@ -2415,11 +2404,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { /** Topology await version. */ private long awaitVer; - /** Empty constructor required by {@link Externalizable}. */ - private DiscoTopologyFuture() { - // No-op. - } - /** * @param ctx Context. * @param awaitVer Await version. @@ -2509,19 +2493,15 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { /** Cache nodes by cache name. */ @GridToStringInclude - private final Map<String, Collection<ClusterNode>> allCacheNodes; - - /** Remote cache nodes by cache name. */ - @GridToStringInclude - private final Map<String, Collection<ClusterNode>> rmtCacheNodes; + private final Map<Integer, Collection<ClusterNode>> allCacheNodes; /** Cache nodes by cache name. */ @GridToStringInclude - private final Map<String, Collection<ClusterNode>> affCacheNodes; + private final Map<Integer, Collection<ClusterNode>> affCacheNodes; /** Caches where at least one node has near cache enabled. */ @GridToStringInclude - private final Set<String> nearEnabledCaches; + private final Set<Integer> nearEnabledCaches; /** Nodes grouped by version. */ private final NavigableMap<IgniteProductVersion, Collection<ClusterNode>> nodesByVer; @@ -2539,18 +2519,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { private final long maxOrder; /** - * Cached alive nodes list. As long as this collection doesn't accept {@code null}s use {@link - * #maskNull(String)} before passing raw cache names to it. - */ - private final ConcurrentMap<String, Collection<ClusterNode>> aliveCacheNodes; - - /** - * Cached alive remote nodes list. As long as this collection doesn't accept {@code null}s use {@link - * #maskNull(String)} before passing raw cache names to it. - */ - private final ConcurrentMap<String, Collection<ClusterNode>> aliveRmtCacheNodes; - - /** * Cached alive server remote nodes with caches. */ private final ConcurrentSkipListMap<ClusterNode, Boolean> aliveSrvNodesWithCaches; @@ -2578,20 +2546,17 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { allNodes = Collections.unmodifiableList(all); - Map<String, Collection<ClusterNode>> cacheMap = new HashMap<>(allNodes.size(), 1.0f); - Map<String, Collection<ClusterNode>> rmtCacheMap = new HashMap<>(allNodes.size(), 1.0f); - Map<String, Collection<ClusterNode>> dhtNodesMap = new HashMap<>(allNodes.size(), 1.0f); - Collection<ClusterNode> nodesWithCaches = new HashSet<>(allNodes.size()); - Collection<ClusterNode> rmtNodesWithCaches = new HashSet<>(allNodes.size()); + Map<Integer, Collection<ClusterNode>> cacheMap = U.newHashMap(allNodes.size()); + Map<Integer, Collection<ClusterNode>> dhtNodesMap = U.newHashMap(allNodes.size()); + Collection<ClusterNode> nodesWithCaches = U.newHashSet(allNodes.size()); + Collection<ClusterNode> rmtNodesWithCaches = U.newHashSet(allNodes.size()); - aliveCacheNodes = new ConcurrentHashMap8<>(allNodes.size(), 1.0f); - aliveRmtCacheNodes = new ConcurrentHashMap8<>(allNodes.size(), 1.0f); aliveSrvNodesWithCaches = new ConcurrentSkipListMap<>(GridNodeOrderComparator.INSTANCE); nodesByVer = new TreeMap<>(); long maxOrder0 = 0; - Set<String> nearEnabledSet = new HashSet<>(); + Set<Integer> nearEnabledSet = new HashSet<>(); List<ClusterNode> srvNodes = new ArrayList<>(); @@ -2620,21 +2585,11 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { addToMap(cacheMap, cacheName, node); - if (alive(node.id())) - addToMap(aliveCacheNodes, maskNull(cacheName), node); - if (filter.dataNode(node)) addToMap(dhtNodesMap, cacheName, node); if (filter.nearNode(node)) - nearEnabledSet.add(cacheName); - - if (!loc.id().equals(node.id())) { - addToMap(rmtCacheMap, cacheName, node); - - if (alive(node.id())) - addToMap(aliveRmtCacheNodes, maskNull(cacheName), node); - } + nearEnabledSet.add(CU.cacheId(cacheName)); hasCaches = true; } @@ -2674,7 +2629,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { maxOrder = maxOrder0; allCacheNodes = Collections.unmodifiableMap(cacheMap); - rmtCacheNodes = Collections.unmodifiableMap(rmtCacheMap); affCacheNodes = Collections.unmodifiableMap(dhtNodesMap); allNodesWithCaches = Collections.unmodifiableCollection(nodesWithCaches); this.rmtNodesWithCaches = Collections.unmodifiableCollection(rmtNodesWithCaches); @@ -2684,7 +2638,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { daemonNodes = Collections.unmodifiableList(new ArrayList<>( F.view(F.concat(false, loc, rmts), F0.not(FILTER_DAEMON)))); - Map<UUID, ClusterNode> nodeMap = new HashMap<>(allNodes().size() + daemonNodes.size(), 1.0f); + Map<UUID, ClusterNode> nodeMap = U.newHashMap(allNodes().size() + daemonNodes.size()); for (ClusterNode n : F.concat(false, allNodes(), daemonNodes())) nodeMap.put(n.id(), n); @@ -2699,13 +2653,13 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { * @param cacheName Cache name. * @param rich Node to add */ - private void addToMap(Map<String, Collection<ClusterNode>> cacheMap, String cacheName, ClusterNode rich) { - Collection<ClusterNode> cacheNodes = cacheMap.get(cacheName); + private void addToMap(Map<Integer, Collection<ClusterNode>> cacheMap, String cacheName, ClusterNode rich) { + Collection<ClusterNode> cacheNodes = cacheMap.get(CU.cacheId(cacheName)); if (cacheNodes == null) { cacheNodes = new ArrayList<>(allNodes.size()); - cacheMap.put(cacheName, cacheNodes); + cacheMap.put(CU.cacheId(cacheName), cacheNodes); } cacheNodes.add(rich); @@ -2727,28 +2681,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { } /** - * Gets collection of nodes which have version equal or greater than {@code ver}. - * - * @param ver Version to check. - * @return Collection of nodes with version equal or greater than {@code ver}. - */ - Collection<ClusterNode> elderNodes(IgniteProductVersion ver) { - Map.Entry<IgniteProductVersion, Collection<ClusterNode>> entry = nodesByVer.ceilingEntry(ver); - - if (entry == null) - return Collections.emptyList(); - - return entry.getValue(); - } - - /** - * @return Versions map. - */ - NavigableMap<IgniteProductVersion, Collection<ClusterNode>> versionsMap() { - return nodesByVer; - } - - /** * Gets collection of nodes with at least one cache configured. * * @param topVer Topology version (maximum allowed node order). @@ -2766,61 +2698,50 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { * @return Collection of nodes. */ Collection<ClusterNode> cacheNodes(@Nullable String cacheName, final long topVer) { - return filter(topVer, allCacheNodes.get(cacheName)); + return filter(topVer, allCacheNodes.get(CU.cacheId(cacheName))); } /** - * Gets all remote nodes that have at least one cache configured. + * Gets all nodes that have cache with given ID. * + * @param cacheId Cache ID. * @param topVer Topology version. * @return Collection of nodes. */ - Collection<ClusterNode> remoteCacheNodes(final long topVer) { - return filter(topVer, rmtNodesWithCaches); - } - - /** - * Gets all nodes that have cache with given name and should participate in affinity calculation. With - * partitioned cache nodes with near-only cache do not participate in affinity node calculation. - * - * @param cacheName Cache name. - * @param topVer Topology version. - * @return Collection of nodes. - */ - Collection<ClusterNode> cacheAffinityNodes(@Nullable String cacheName, final long topVer) { - return filter(topVer, affCacheNodes.get(cacheName)); + Collection<ClusterNode> cacheNodes(Integer cacheId, final long topVer) { + return filter(topVer, allCacheNodes.get(cacheId)); } /** - * Gets all alive nodes that have cache with given name. + * Gets all remote nodes that have at least one cache configured. * - * @param cacheName Cache name. * @param topVer Topology version. * @return Collection of nodes. */ - Collection<ClusterNode> aliveCacheNodes(@Nullable String cacheName, final long topVer) { - return filter(topVer, aliveCacheNodes.get(maskNull(cacheName))); + Collection<ClusterNode> remoteCacheNodes(final long topVer) { + return filter(topVer, rmtNodesWithCaches); } /** - * Gets all alive remote nodes that have cache with given name. + * Gets all nodes that have cache with given ID and should participate in affinity calculation. With + * partitioned cache nodes with near-only cache do not participate in affinity node calculation. * - * @param cacheName Cache name. + * @param cacheId Cache ID. * @param topVer Topology version. * @return Collection of nodes. */ - Collection<ClusterNode> aliveRemoteCacheNodes(@Nullable String cacheName, final long topVer) { - return filter(topVer, aliveRmtCacheNodes.get(maskNull(cacheName))); + Collection<ClusterNode> cacheAffinityNodes(int cacheId, final long topVer) { + return filter(topVer, affCacheNodes.get(cacheId)); } /** - * Checks if cache with given name has at least one node with near cache enabled. + * Checks if cache with given ID has at least one node with near cache enabled. * - * @param cacheName Cache name. + * @param cacheId Cache ID. * @return {@code True} if cache with given name has at least one node with near cache enabled. */ - boolean hasNearCache(@Nullable String cacheName) { - return nearEnabledCaches.contains(cacheName); + boolean hasNearCache(int cacheId) { + return nearEnabledCaches.contains(cacheId); } /** @@ -2832,51 +2753,10 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { if (leftNode.order() > maxOrder) return; - filterNodeMap(aliveCacheNodes, leftNode); - - filterNodeMap(aliveRmtCacheNodes, leftNode); - aliveSrvNodesWithCaches.remove(leftNode); } /** - * Creates a copy of nodes map without the given node. - * - * @param map Map to copy. - * @param exclNode Node to exclude. - */ - private void filterNodeMap(ConcurrentMap<String, Collection<ClusterNode>> map, final ClusterNode exclNode) { - for (String cacheName : registeredCaches.keySet()) { - String maskedName = maskNull(cacheName); - - while (true) { - Collection<ClusterNode> oldNodes = map.get(maskedName); - - if (oldNodes == null || oldNodes.isEmpty()) - break; - - Collection<ClusterNode> newNodes = new ArrayList<>(oldNodes); - - if (!newNodes.remove(exclNode)) - break; - - if (map.replace(maskedName, oldNodes, newNodes)) - break; - } - } - } - - /** - * Replaces {@code null} with {@code NULL_CACHE_NAME}. - * - * @param cacheName Cache name. - * @return Masked name. - */ - private String maskNull(@Nullable String cacheName) { - return cacheName == null ? NULL_CACHE_NAME : cacheName; - } - - /** * @param topVer Topology version. * @param nodes Nodes. * @return Filtered collection (potentially empty, but never {@code null}). http://git-wip-us.apache.org/repos/asf/ignite/blob/b02ad0de/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java index 607bb96..b5d5ee2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java @@ -100,6 +100,9 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi> /** Events of these types should be recorded. */ private volatile int[] inclEvtTypes; + /** */ + private boolean stopped; + /** * Maps event type to boolean ({@code true} for recordable events). * This array is used for listeners notification. It may be wider, @@ -212,7 +215,16 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi> * @return {@code true} if entered to busy state. */ private boolean enterBusy() { - return busyLock.readLock().tryLock(); + if (!busyLock.readLock().tryLock()) + return false; + + if (stopped) { + busyLock.readLock().unlock(); + + return false; + } + + return true; } /** @@ -225,15 +237,23 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi> /** {@inheritDoc} */ @SuppressWarnings({"LockAcquiredButNotSafelyReleased"}) @Override public void onKernalStop0(boolean cancel) { - // Acquire write lock so that any new thread could not be started. busyLock.writeLock().lock(); - if (msgLsnr != null) - ctx.io().removeMessageListener(TOPIC_EVENT, msgLsnr); + try { + if (msgLsnr != null) + ctx.io().removeMessageListener( + TOPIC_EVENT, + msgLsnr); + + msgLsnr = null; - msgLsnr = null; + lsnrs.clear(); - lsnrs.clear(); + stopped = true; + } + finally { + busyLock.writeLock().unlock(); + } } /** {@inheritDoc} */ @@ -1203,4 +1223,4 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi> return lsnr.hashCode(); } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/b02ad0de/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java index a388c7a..144b162 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java @@ -63,7 +63,7 @@ public class GridAffinityAssignmentCache { private final String cacheName; /** */ - private final Integer cacheId; + private final int cacheId; /** Number of backups. */ private final int backups; @@ -169,7 +169,7 @@ public class GridAffinityAssignmentCache { /** * @return Cache ID. */ - public Integer cacheId() { + public int cacheId() { return cacheId; } @@ -266,7 +266,7 @@ public class GridAffinityAssignmentCache { List<ClusterNode> sorted; if (!locCache) { - sorted = new ArrayList<>(ctx.discovery().cacheAffinityNodes(cacheName, topVer)); + sorted = new ArrayList<>(ctx.discovery().cacheAffinityNodes(cacheId(), topVer)); Collections.sort(sorted, GridNodeOrderComparator.INSTANCE); } @@ -617,4 +617,4 @@ public class GridAffinityAssignmentCache { return S.toString(AffinityReadyFuture.class, this); } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/b02ad0de/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java index 2890887..7bf5fd8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java @@ -843,7 +843,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap return true; // If local node did not initiate exchange or local node is the only cache node in grid. - Collection<ClusterNode> affNodes = cctx.discovery().cacheAffinityNodes(aff.cacheName(), fut.topologyVersion()); + Collection<ClusterNode> affNodes = cctx.discovery().cacheAffinityNodes(aff.cacheId(), fut.topologyVersion()); DynamicCacheDescriptor cacheDesc = registeredCaches.get(aff.cacheId()); http://git-wip-us.apache.org/repos/asf/ignite/blob/b02ad0de/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java index f8722d6..9284143 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java @@ -160,6 +160,9 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter { /** Stopping flag. */ private volatile boolean stopping; + /** Stopped flag. */ + private boolean stopped; + /** Current future. */ private final AtomicReference<EvictionFuture> curEvictFut = new AtomicReference<>(); @@ -311,19 +314,28 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter { busyLock.block(); - // Stop backup worker. - if (evictSync && !cctx.isNear() && backupWorker != null) { - backupWorker.cancel(); + try { + // Stop backup worker. + if (evictSync && !cctx.isNear() && backupWorker != null) { + backupWorker.cancel(); - U.join(backupWorkerThread, log); - } + U.join( + backupWorkerThread, + log); + } - // Cancel all active futures. - for (EvictionFuture fut : futs.values()) - fut.cancel(); + // Cancel all active futures. + for (EvictionFuture fut : futs.values()) + fut.cancel(); - if (log.isDebugEnabled()) - log.debug("Eviction manager stopped on node: " + cctx.nodeId()); + if (log.isDebugEnabled()) + log.debug("Eviction manager stopped on node: " + cctx.nodeId()); + } + finally { + stopped = true; + + busyLock.unblock(); + } } /** @@ -345,7 +357,7 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter { log.debug("Processing eviction response [node=" + nodeId + ", localNode=" + cctx.nodeId() + ", res=" + res + ']'); - if (!busyLock.enterBusy()) + if (!enterBusy()) return; try { @@ -363,6 +375,22 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter { } /** + * @return {@code True} if entered busy. + */ + private boolean enterBusy() { + if (!busyLock.enterBusy()) + return false; + + if (stopped) { + busyLock.leaveBusy(); + + return false; + } + + return true; + } + + /** * @param nodeId Sender node ID. * @param req Request. */ @@ -370,7 +398,7 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter { assert nodeId != null; assert req != null; - if (!busyLock.enterBusy()) + if (!enterBusy()) return; try { @@ -811,7 +839,7 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter { if (!cctx.isNear() && evictSync && !cctx.affinity().primaryByPartition(cctx.localNode(), e.partition(), topVer)) return; - if (!busyLock.enterBusy()) + if (!enterBusy()) return; try { @@ -1145,7 +1173,7 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter { fut.listen(new CI1<IgniteInternalFuture<?>>() { @Override public void apply(IgniteInternalFuture<?> f) { - if (!busyLock.enterBusy()) { + if (!enterBusy()) { if (log.isDebugEnabled()) log.debug("Will not notify eviction future completion (grid is stopping): " + f); @@ -1187,7 +1215,7 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter { * @param topVer Topology version on future complete. */ private void onFutureCompleted(EvictionFuture fut, AffinityTopologyVersion topVer) { - if (!busyLock.enterBusy()) + if (!enterBusy()) return; try { @@ -1366,7 +1394,7 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter { if (!evictSyncAgr) return; - if (!busyLock.enterBusy()) + if (!enterBusy()) return; try { http://git-wip-us.apache.org/repos/asf/ignite/blob/b02ad0de/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java index 1562d70..1bf9468 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java @@ -17,13 +17,15 @@ package org.apache.ignite.internal.processors.cache; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; import javax.cache.CacheException; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteClientDisconnectedException; import org.apache.ignite.IgniteException; import org.apache.ignite.internal.IgniteInterruptedCheckedException; -import org.apache.ignite.internal.util.GridSpinReadWriteLock; +import org.apache.ignite.internal.util.StripedCompositeReadWriteLock; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.U; @@ -45,7 +47,8 @@ public class GridCacheGateway<K, V> { private IgniteFuture<?> reconnectFut; /** */ - private GridSpinReadWriteLock rwLock = new GridSpinReadWriteLock(); + private StripedCompositeReadWriteLock rwLock = + new StripedCompositeReadWriteLock(Runtime.getRuntime().availableProcessors()); /** * @param ctx Cache context. @@ -63,7 +66,7 @@ public class GridCacheGateway<K, V> { if (ctx.deploymentEnabled()) ctx.deploy().onEnter(); - rwLock.readLock(); + rwLock.readLock().lock(); checkState(true, true); } @@ -78,7 +81,7 @@ public class GridCacheGateway<K, V> { if (state != State.STARTED) { if (lock) - rwLock.readUnlock(); + rwLock.readLock().unlock(); if (state == State.STOPPED) { if (stopErr) @@ -106,7 +109,7 @@ public class GridCacheGateway<K, V> { onEnter(); // Must unlock in case of unexpected errors to avoid deadlocks during kernal stop. - rwLock.readLock(); + rwLock.readLock().lock(); return checkState(true, false); } @@ -139,10 +142,10 @@ public class GridCacheGateway<K, V> { */ public void leave() { try { - leaveNoLock(); + leaveNoLock(); } finally { - rwLock.readUnlock(); + rwLock.readLock().unlock(); } } @@ -168,7 +171,9 @@ public class GridCacheGateway<K, V> { onEnter(); - rwLock.readLock(); + Lock lock = rwLock.readLock(); + + lock.lock(); checkState(true, true); @@ -178,7 +183,7 @@ public class GridCacheGateway<K, V> { return setOperationContextPerCall(opCtx); } catch (Throwable e) { - rwLock.readUnlock(); + lock.unlock(); throw e; } @@ -219,7 +224,7 @@ public class GridCacheGateway<K, V> { leaveNoLock(prev); } finally { - rwLock.readUnlock(); + rwLock.readLock().unlock(); } } @@ -269,14 +274,14 @@ public class GridCacheGateway<K, V> { * */ public void writeLock(){ - rwLock.writeLock(); + rwLock.writeLock().lock(); } /** * */ public void writeUnlock() { - rwLock.writeUnlock(); + rwLock.writeLock().unlock(); } /** @@ -295,15 +300,14 @@ public class GridCacheGateway<K, V> { boolean interrupted = false; while (true) { - if (rwLock.tryWriteLock()) - break; - else { - try { + try { + if (rwLock.writeLock().tryLock(200, TimeUnit.MILLISECONDS)) + break; + else U.sleep(200); - } - catch (IgniteInterruptedCheckedException ignore) { - interrupted = true; - } + } + catch (IgniteInterruptedCheckedException | InterruptedException ignore) { + interrupted = true; } } @@ -314,7 +318,7 @@ public class GridCacheGateway<K, V> { state.set(State.STOPPED); } finally { - rwLock.writeUnlock(); + rwLock.writeLock().unlock(); } } @@ -331,4 +335,4 @@ public class GridCacheGateway<K, V> { /** */ STOPPED } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/b02ad0de/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java index 924ce79..d20310b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java @@ -26,6 +26,7 @@ import java.util.UUID; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Lock; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; @@ -74,7 +75,7 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxStateAwa import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.F0; import org.apache.ignite.internal.util.GridLeanSet; -import org.apache.ignite.internal.util.GridSpinReadWriteLock; +import org.apache.ignite.internal.util.StripedCompositeReadWriteLock; import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.P1; @@ -120,7 +121,8 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { private boolean stopping; /** Mutex. */ - private final GridSpinReadWriteLock rw = new GridSpinReadWriteLock(); + private final StripedCompositeReadWriteLock rw = + new StripedCompositeReadWriteLock(Runtime.getRuntime().availableProcessors()); /** Deployment enabled. */ private boolean depEnabled; @@ -316,7 +318,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { // Busy wait is intentional. while (true) { try { - if (rw.tryWriteLock(200, TimeUnit.MILLISECONDS)) + if (rw.writeLock().tryLock(200, TimeUnit.MILLISECONDS)) break; else Thread.sleep(200); @@ -335,7 +337,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { stopping = true; } finally { - rw.writeUnlock(); + rw.writeLock().unlock(); } } @@ -347,7 +349,9 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { @SuppressWarnings({"unchecked", "ConstantConditions", "ThrowableResultOfMethodCallIgnored"}) private void onMessage0(final UUID nodeId, final GridCacheMessage cacheMsg, final IgniteBiInClosure<UUID, GridCacheMessage> c) { - rw.readLock(); + Lock lock = rw.readLock(); + + lock.lock(); try { if (stopping) { @@ -378,7 +382,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { if (depEnabled) cctx.deploy().ignoreOwnership(false); - rw.readUnlock(); + lock.unlock(); } } @@ -821,9 +825,6 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { */ private void processMessage(UUID nodeId, GridCacheMessage msg, IgniteBiInClosure<UUID, GridCacheMessage> c) { try { - // We will not end up with storing a bunch of new UUIDs - // in each cache entry, since node ID is stored in NIO session - // on handshake. c.apply(nodeId, msg); if (log.isDebugEnabled()) http://git-wip-us.apache.org/repos/asf/ignite/blob/b02ad0de/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java index e274485..1c59390 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java @@ -454,7 +454,7 @@ public class GridCacheUtils { * that may have already left). */ public static Collection<ClusterNode> allNodes(GridCacheContext ctx, AffinityTopologyVersion topOrder) { - return ctx.discovery().cacheNodes(ctx.namex(), topOrder); + return ctx.discovery().cacheNodes(ctx.cacheId(), topOrder); } /** @@ -487,7 +487,7 @@ public class GridCacheUtils { * @return All nodes on which cache with the same name is started. */ public static Collection<ClusterNode> affinityNodes(final GridCacheContext ctx) { - return ctx.discovery().cacheAffinityNodes(ctx.namex(), AffinityTopologyVersion.NONE); + return ctx.discovery().cacheAffinityNodes(ctx.cacheId(), AffinityTopologyVersion.NONE); } /** @@ -498,7 +498,7 @@ public class GridCacheUtils { * @return Affinity nodes. */ public static Collection<ClusterNode> affinityNodes(GridCacheContext ctx, AffinityTopologyVersion topOrder) { - return ctx.discovery().cacheAffinityNodes(ctx.namex(), topOrder); + return ctx.discovery().cacheAffinityNodes(ctx.cacheId(), topOrder); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/b02ad0de/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/affinity/GridCacheAffinityImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/affinity/GridCacheAffinityImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/affinity/GridCacheAffinityImpl.java index 11361a2..41b3281 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/affinity/GridCacheAffinityImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/affinity/GridCacheAffinityImpl.java @@ -196,7 +196,7 @@ public class GridCacheAffinityImpl<K, V> implements Affinity<K> { int nodesCnt; if (!cctx.isLocal()) - nodesCnt = cctx.discovery().cacheAffinityNodes(cctx.name(), topVer).size(); + nodesCnt = cctx.discovery().cacheAffinityNodes(cctx.cacheId(), topVer).size(); else nodesCnt = 1; http://git-wip-us.apache.org/repos/asf/ignite/blob/b02ad0de/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java index 7c1f760..a1fbd72 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java @@ -88,7 +88,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { private volatile boolean stopping; /** A future that will be completed when topology with version topVer will be ready to use. */ - private GridDhtTopologyFuture topReadyFut; + private volatile GridDhtTopologyFuture topReadyFut; /** */ private final GridAtomicLong updateSeq = new GridAtomicLong(1); @@ -216,16 +216,9 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { /** {@inheritDoc} */ @Override public GridDhtTopologyFuture topologyVersionFuture() { - lock.readLock().lock(); - - try { - assert topReadyFut != null; + assert topReadyFut != null; - return topReadyFut; - } - finally { - lock.readLock().unlock(); - } + return topReadyFut; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/b02ad0de/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java index d1e3780..b5cb5cf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java @@ -78,9 +78,10 @@ public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<GridDhtAffin AffinityTopologyVersion topVer ) { this.ctx = ctx; - this.key = new T2<>(CU.cacheId(cacheName), topVer); + int cacheId = CU.cacheId(cacheName); + this.key = new T2<>(cacheId, topVer); - Collection<ClusterNode> availableNodes = ctx.discovery().cacheAffinityNodes(cacheName, topVer); + Collection<ClusterNode> availableNodes = ctx.discovery().cacheAffinityNodes(cacheId, topVer); LinkedList<ClusterNode> tmp = new LinkedList<>(); http://git-wip-us.apache.org/repos/asf/ignite/blob/b02ad0de/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java index f5865e6..1cd3cfb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java @@ -1199,8 +1199,8 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap if (expVer.equals(curVer)) return false; - Collection<ClusterNode> cacheNodes0 = ctx.discovery().cacheAffinityNodes(ctx.name(), expVer); - Collection<ClusterNode> cacheNodes1 = ctx.discovery().cacheAffinityNodes(ctx.name(), curVer); + Collection<ClusterNode> cacheNodes0 = ctx.discovery().cacheAffinityNodes(ctx.cacheId(), expVer); + Collection<ClusterNode> cacheNodes1 = ctx.discovery().cacheAffinityNodes(ctx.cacheId(), curVer); if (!cacheNodes0.equals(cacheNodes1) || ctx.affinity().affinityTopologyVersion().compareTo(curVer) < 0) return true; http://git-wip-us.apache.org/repos/asf/ignite/blob/b02ad0de/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index 75a275c..966a186 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@ -99,7 +99,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { private volatile boolean stopping; /** A future that will be completed when topology with version topVer will be ready to use. */ - private GridDhtTopologyFuture topReadyFut; + private volatile GridDhtTopologyFuture topReadyFut; /** */ private final GridAtomicLong updateSeq = new GridAtomicLong(1); @@ -311,16 +311,9 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { /** {@inheritDoc} */ @Override public GridDhtTopologyFuture topologyVersionFuture() { - lock.readLock().lock(); - - try { - assert topReadyFut != null; + assert topReadyFut != null; - return topReadyFut; - } - finally { - lock.readLock().unlock(); - } + return topReadyFut; } /** {@inheritDoc} */ @@ -752,6 +745,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { if (part != null) list.add(part); } + return list; } http://git-wip-us.apache.org/repos/asf/ignite/blob/b02ad0de/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java index 1b175d0..4cb113e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java @@ -111,10 +111,13 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb, GridCacheVersion writeVer, GridNearAtomicAbstractUpdateRequest updateReq, - GridNearAtomicUpdateResponse updateRes) { + GridNearAtomicUpdateResponse updateRes + ) { this.cctx = cctx; - futVer = cctx.versions().next(updateReq.topologyVersion()); + this.futVer = cctx.isLocalNode(updateRes.nodeId()) ? + cctx.versions().next(updateReq.topologyVersion()) : // Generate new if request mapped to local. + updateReq.futureVersion(); this.updateReq = updateReq; this.completionCb = completionCb; this.updateRes = updateRes; http://git-wip-us.apache.org/repos/asf/ignite/blob/b02ad0de/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java index f0bea07..deb9ce4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java @@ -134,7 +134,7 @@ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessag boolean addPrevVal, int partId, @Nullable CacheObject prevVal, - @Nullable Long updateCntr + long updateCntr ); /** http://git-wip-us.apache.org/repos/asf/ignite/blob/b02ad0de/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 1b6179e..4745ff7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -23,7 +23,6 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -1775,6 +1774,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(), nodeId, req.futureVersion(), ctx.deploymentEnabled()); + res.partition(req.partition()); + assert !req.returnValue() || (req.operation() == TRANSFORM || req.size() == 1); GridDhtAtomicAbstractUpdateFuture dhtFut = null; @@ -2435,7 +2436,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { AffinityTopologyVersion topVer = req.topologyVersion(); - boolean checkReaders = hasNear || ctx.discovery().hasNearCache(name(), topVer); + boolean checkReaders = hasNear || ctx.discovery().hasNearCache(ctx.cacheId(), topVer); boolean readersOnly = false; @@ -2670,7 +2671,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { AffinityTopologyVersion topVer = req.topologyVersion(); - boolean checkReaders = hasNear || ctx.discovery().hasNearCache(name(), topVer); + boolean checkReaders = hasNear || ctx.discovery().hasNearCache(ctx.cacheId(), topVer); CacheStorePartialUpdateException storeErr = null; @@ -2996,7 +2997,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { for (GridCacheMapEntry entry : locked) { if (entry != null && entry.deleted()) { if (skip == null) - skip = new HashSet<>(locked.size(), 1.0f); + skip = U.newHashSet(locked.size()); skip.add(entry.key()); } @@ -3142,7 +3143,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { AffinityTopologyVersion topVer = updateReq.topologyVersion(); - Collection<ClusterNode> nodes = ctx.kernalContext().discovery().cacheAffinityNodes(name(), topVer); + Collection<ClusterNode> nodes = ctx.kernalContext().discovery().cacheAffinityNodes(ctx.cacheId(), topVer); // We are on primary node for some key. assert !nodes.isEmpty() : "Failed to find affinity nodes [name=" + name() + ", topVer=" + topVer + @@ -3186,7 +3187,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { @SuppressWarnings("unchecked") private void processNearAtomicUpdateResponse(UUID nodeId, GridNearAtomicUpdateResponse res) { if (msgLog.isDebugEnabled()) - msgLog.debug("Received near atomic update response [futId" + res.futureVersion() + ", node=" + nodeId + ']'); + msgLog.debug("Received near atomic update response " + + "[futId=" + res.futureVersion() + ", node=" + nodeId + ']'); res.nodeId(ctx.localNodeId()); @@ -3217,6 +3219,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { GridDhtAtomicUpdateResponse res = new GridDhtAtomicUpdateResponse(ctx.cacheId(), req.futureVersion(), ctx.deploymentEnabled()); + res.partition(req.partition()); + Boolean replicate = ctx.isDrEnabled(); boolean intercept = req.forceTransformBackups() && ctx.config().getInterceptor() != null; http://git-wip-us.apache.org/repos/asf/ignite/blob/b02ad0de/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java index 20d6e90..0dc2754 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java @@ -67,7 +67,11 @@ class GridDhtAtomicSingleUpdateFuture extends GridDhtAtomicAbstractUpdateFuture GridNearAtomicAbstractUpdateRequest updateReq, GridNearAtomicUpdateResponse updateRes ) { - super(cctx, completionCb, writeVer, updateReq, updateRes); + super(cctx, + completionCb, + writeVer, + updateReq, + updateRes); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/b02ad0de/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java index 0af7cf5..a7e6c24 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java @@ -161,7 +161,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat boolean addPrevVal, int partId, @Nullable CacheObject prevVal, - @Nullable Long updateCntr + long updateCntr ) { assert entryProcessor == null; assert ttl <= 0 : ttl; @@ -177,8 +177,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat if (addPrevVal) this.prevVal = prevVal; - if (updateCntr != null) - this.updateCntr = updateCntr; + this.updateCntr = updateCntr; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/b02ad0de/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java index efb35c4..5429adc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java @@ -66,7 +66,11 @@ class GridDhtAtomicUpdateFuture extends GridDhtAtomicAbstractUpdateFuture { GridNearAtomicAbstractUpdateRequest updateReq, GridNearAtomicUpdateResponse updateRes ) { - super(cctx, completionCb, writeVer, updateReq, updateRes); + super(cctx, + completionCb, + writeVer, + updateReq, + updateRes); keys = new ArrayList<>(updateReq.size()); mappings = U.newHashMap(updateReq.size()); http://git-wip-us.apache.org/repos/asf/ignite/blob/b02ad0de/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java index 1854e52..7144963 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java @@ -227,7 +227,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque boolean addPrevVal, int partId, @Nullable CacheObject prevVal, - @Nullable Long updateCntr + long updateCntr ) { keys.add(key); @@ -248,12 +248,10 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque prevVals.add(prevVal); } - if (updateCntr != null) { - if (updateCntrs == null) - updateCntrs = new GridLongList(); + if (updateCntrs == null) + updateCntrs = new GridLongList(); - updateCntrs.add(updateCntr); - } + updateCntrs.add(updateCntr); // In case there is no conflict, do not create the list. if (conflictVer != null) { http://git-wip-us.apache.org/repos/asf/ignite/blob/b02ad0de/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java index ff12af0..c3d3ca9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java @@ -69,6 +69,9 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri @GridDirectCollection(KeyCacheObject.class) private List<KeyCacheObject> nearEvicted; + /** */ + private int partId = -1; + /** * Empty constructor required by {@link Externalizable}. */ @@ -157,6 +160,18 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri nearEvicted.add(key); } + /** + * @param partId Partition ID to set. + */ + public void partition(int partId) { + this.partId = partId; + } + + /** {@inheritDoc} */ + @Override public int partition() { + return partId; + } + /** {@inheritDoc} */ @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException { super.prepareMarshal(ctx); @@ -234,6 +249,12 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri writer.incrementState(); + case 7: + if (!writer.writeInt("partId", partId)) + return false; + + writer.incrementState(); + } return true; @@ -282,6 +303,14 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri reader.incrementState(); + case 7: + partId = reader.readInt("partId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + } return reader.afterMessageRead(GridDhtAtomicUpdateResponse.class); @@ -294,7 +323,7 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 7; + return 8; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/b02ad0de/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java index 891a20c..0a816a7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java @@ -379,12 +379,11 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda /** {@inheritDoc} */ @Override protected void mapOnTopology() { - cache.topology().readLock(); - AffinityTopologyVersion topVer; - GridCacheVersion futVer; + cache.topology().readLock(); + try { if (cache.topology().stopping()) { onDone(new IgniteCheckedException("Failed to perform cache operation (cache is stopped): " + @@ -454,7 +453,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda updVer = this.updVer; if (updVer == null) { - updVer = cctx.versions().next(topVer); + updVer = futVer; if (log.isDebugEnabled()) log.debug("Assigned fast-map version for update on near node: " + updVer); http://git-wip-us.apache.org/repos/asf/ignite/blob/b02ad0de/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index 2315a18..f182ecb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@ -488,12 +488,11 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu /** {@inheritDoc} */ @Override protected void mapOnTopology() { - cache.topology().readLock(); - AffinityTopologyVersion topVer; - GridCacheVersion futVer; + cache.topology().readLock(); + try { if (cache.topology().stopping()) { onDone(new IgniteCheckedException("Failed to perform cache operation (cache is stopped): " + @@ -628,7 +627,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu updVer = this.updVer; if (updVer == null) { - updVer = cctx.versions().next(topVer); + updVer = futVer; if (log.isDebugEnabled()) log.debug("Assigned fast-map version for update on near node: " + updVer); http://git-wip-us.apache.org/repos/asf/ignite/blob/b02ad0de/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java index 2e38733..22e01ae 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java @@ -105,6 +105,9 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr /** Near expire times. */ private GridLongList nearExpireTimes; + /** Partition ID. */ + private int partId = -1; + /** * Empty constructor required by {@link Externalizable}. */ @@ -154,6 +157,13 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr } /** + * @param partId Partition ID for proper striping on near node. + */ + public void partition(int partId) { + this.partId = partId; + } + + /** * Sets update error. * * @param err Error. @@ -431,6 +441,11 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr } /** {@inheritDoc} */ + @Override public int partition() { + return partId; + } + + /** {@inheritDoc} */ @Override public boolean addDeploymentInfo() { return addDepInfo; } @@ -510,12 +525,18 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr writer.incrementState(); case 12: - if (!writer.writeCollection("remapKeys", remapKeys, MessageCollectionItemType.MSG)) + if (!writer.writeInt("partId", partId)) return false; writer.incrementState(); case 13: + if (!writer.writeCollection("remapKeys", remapKeys, MessageCollectionItemType.MSG)) + return false; + + writer.incrementState(); + + case 14: if (!writer.writeMessage("ret", ret)) return false; @@ -610,7 +631,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr reader.incrementState(); case 12: - remapKeys = reader.readCollection("remapKeys", MessageCollectionItemType.MSG); + partId = reader.readInt("partId"); if (!reader.isLastRead()) return false; @@ -618,6 +639,14 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr reader.incrementState(); case 13: + remapKeys = reader.readCollection("remapKeys", MessageCollectionItemType.MSG); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 14: ret = reader.readMessage("ret"); if (!reader.isLastRead()) @@ -637,7 +666,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 14; + return 15; } /** {@inheritDoc} */
