Repository: ignite
Updated Branches:
  refs/heads/ignite-3477 d139e6476 -> c66bb4f8f
- fixed lock order for interrupt lock/topology write lock - GridCacheTtlManager: disable near cache eviction logic for local cache Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c66bb4f8 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c66bb4f8 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c66bb4f8 Branch: refs/heads/ignite-3477 Commit: c66bb4f8f6a85607b262e863eb66489a56905b8c Parents: d139e64 Author: sboikov <[email protected]> Authored: Fri Dec 30 13:11:34 2016 +0300 Committer: sboikov <[email protected]> Committed: Fri Dec 30 13:11:34 2016 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheTtlManager.java | 10 +- .../dht/GridDhtPartitionTopologyImpl.java | 126 +++++++++---------- .../GridDhtPartitionsExchangeFuture.java | 38 ++++-- 3 files changed, 91 insertions(+), 83 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/c66bb4f8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java index a336a80..bc09066 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java @@ -17,12 +17,10 @@ package org.apache.ignite.internal.processors.cache; -import java.util.concurrent.atomic.AtomicLongFieldUpdater; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.configuration.CacheConfiguration; import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter; -import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.GridConcurrentSkipListSet; @@ -31,8 +29,6 @@ import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.internal.util.worker.GridWorker; -import org.apache.ignite.thread.IgniteThread; import org.jetbrains.annotations.Nullable; import org.jsr166.LongAdder8; @@ -40,7 +36,6 @@ import org.jsr166.LongAdder8; * Eagerly removes expired entries from cache when * {@link CacheConfiguration#isEagerTtl()} flag is set. */ -@SuppressWarnings("NakedNotify") public class GridCacheTtlManager extends GridCacheManagerAdapter { /** Entries pending removal. */ private GridConcurrentSkipListSetEx pendingEntries; @@ -87,7 +82,7 @@ public class GridCacheTtlManager extends GridCacheManagerAdapter { cctx.shared().ttl().register(this); - pendingEntries = cctx.config().getNearConfiguration() != null ? new GridConcurrentSkipListSetEx() : null; + pendingEntries = (!cctx.isLocal() && cctx.config().getNearConfiguration() != null) ? 
new GridConcurrentSkipListSetEx() : null; } /** {@inheritDoc} */ @@ -156,7 +151,6 @@ public class GridCacheTtlManager extends GridCacheManagerAdapter { public boolean expire(int amount) { long now = U.currentTimeMillis(); - try { if (pendingEntries != null) { GridNearCacheAdapter nearCache = cctx.near(); @@ -186,7 +180,7 @@ public class GridCacheTtlManager extends GridCacheManagerAdapter { boolean more = cctx.offheap().expire(expireC, amount); if (more) - return more; + return true; if (amount != -1 && pendingEntries != null) { EntryWrapper e = pendingEntries.firstx(); http://git-wip-us.apache.org/repos/asf/ignite/blob/c66bb4f8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index f22c263..0dd836d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@ -70,7 +70,8 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh /** * Partition topology. */ -@GridToStringExclude class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { +@GridToStringExclude +class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { /** If true, then check consistency. 
*/ private static final boolean CONSISTENCY_CHECK = false; @@ -123,7 +124,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh private volatile AffinityTopologyVersion rebalancedTopVer = AffinityTopologyVersion.NONE; /** */ - private volatile boolean treatAllPartitionAsLocal; + private volatile boolean treatAllPartAsLoc; /** * @param cctx Context. @@ -490,7 +491,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh ClusterState newState = exchFut.newClusterState(); - treatAllPartitionAsLocal = (newState != null && newState == ClusterState.ACTIVE) + treatAllPartAsLoc = (newState != null && newState == ClusterState.ACTIVE) || (cctx.kernalContext().state().active() && discoEvt.type() == EventType.EVT_NODE_JOINED && discoEvt.eventNode().isLocal() @@ -504,64 +505,64 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh cctx.shared().database().checkpointReadLock(); - try { - U.writeLock(lock); - } - catch (IgniteInterruptedCheckedException e) { - cctx.shared().database().checkpointReadUnlock(); + synchronized (cctx.shared().exchange().interruptLock()) { + if (Thread.currentThread().isInterrupted()) + throw new IgniteCheckedException("Thread is interrupted: " + Thread.currentThread()); - throw e; - } + try { + U.writeLock(lock); + } + catch (IgniteInterruptedCheckedException e) { + cctx.shared().database().checkpointReadUnlock(); - try { - GridDhtPartitionExchangeId exchId = exchFut.exchangeId(); + throw e; + } - if (stopping) - return; + try { + GridDhtPartitionExchangeId exchId = exchFut.exchangeId(); - assert topVer.equals(exchId.topologyVersion()) : "Invalid topology version [topVer=" + - topVer + ", exchId=" + exchId + ']'; + if (stopping) + return; - if (exchId.isLeft()) - removeNode(exchId.nodeId()); + assert topVer.equals(exchId.topologyVersion()) : "Invalid topology version [topVer=" + + topVer + ", exchId=" + exchId + ']'; - ClusterNode oldest = currentCoordinator(); 
+ if (exchId.isLeft()) + removeNode(exchId.nodeId()); - if (log.isDebugEnabled()) - log.debug("Partition map beforeExchange [exchId=" + exchId + ", fullMap=" + fullMapString() + ']'); + ClusterNode oldest = currentCoordinator(); - long updateSeq = this.updateSeq.incrementAndGet(); + if (log.isDebugEnabled()) + log.debug("Partition map beforeExchange [exchId=" + exchId + ", fullMap=" + fullMapString() + ']'); - cntrMap.clear(); + long updateSeq = this.updateSeq.incrementAndGet(); - // If this is the oldest node. - if (oldest != null && (loc.equals(oldest) || exchFut.isCacheAdded(cctx.cacheId(), exchId.topologyVersion()))) { - if (node2part == null) { - node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq); + cntrMap.clear(); - if (log.isDebugEnabled()) - log.debug("Created brand new full topology map on oldest node [exchId=" + - exchId + ", fullMap=" + fullMapString() + ']'); - } - else if (!node2part.valid()) { - node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq, node2part, false); + // If this is the oldest node. 
+ if (oldest != null && (loc.equals(oldest) || exchFut.isCacheAdded(cctx.cacheId(), exchId.topologyVersion()))) { + if (node2part == null) { + node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq); - if (log.isDebugEnabled()) - log.debug("Created new full topology map on oldest node [exchId=" + exchId + ", fullMap=" + - node2part + ']'); - } - else if (!node2part.nodeId().equals(loc.id())) { - node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq, node2part, false); + if (log.isDebugEnabled()) + log.debug("Created brand new full topology map on oldest node [exchId=" + + exchId + ", fullMap=" + fullMapString() + ']'); + } + else if (!node2part.valid()) { + node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq, node2part, false); - if (log.isDebugEnabled()) - log.debug("Copied old map into new map on oldest node (previous oldest node left) [exchId=" + - exchId + ", fullMap=" + fullMapString() + ']'); - } - } + if (log.isDebugEnabled()) + log.debug("Created new full topology map on oldest node [exchId=" + exchId + ", fullMap=" + + node2part + ']'); + } + else if (!node2part.nodeId().equals(loc.id())) { + node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq, node2part, false); - synchronized (cctx.shared().exchange().interruptLock()) { - if (Thread.currentThread().isInterrupted()) - throw new IgniteCheckedException("Thread is interrupted: " + Thread.currentThread()); + if (log.isDebugEnabled()) + log.debug("Copied old map into new map on oldest node (previous oldest node left) [exchId=" + + exchId + ", fullMap=" + fullMapString() + ']'); + } + } if (affReady) initPartitions0(exchFut, updateSeq); @@ -570,18 +571,18 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh createPartitions(aff, updateSeq); } - } - consistencyCheck(); + consistencyCheck(); - if (log.isDebugEnabled()) - log.debug("Partition map after beforeExchange [exchId=" + 
exchId + ", fullMap=" + - fullMapString() + ']'); - } - finally { - lock.writeLock().unlock(); + if (log.isDebugEnabled()) + log.debug("Partition map after beforeExchange [exchId=" + exchId + ", fullMap=" + + fullMapString() + ']'); + } + finally { + lock.writeLock().unlock(); - cctx.shared().database().checkpointReadUnlock(); + cctx.shared().database().checkpointReadUnlock(); + } } // Wait for evictions. @@ -590,7 +591,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh /** {@inheritDoc} */ @Override public boolean afterExchange(GridDhtPartitionsExchangeFuture exchFut) throws IgniteCheckedException { - treatAllPartitionAsLocal = false; + treatAllPartAsLoc = false; boolean changed = waitForRent(); @@ -767,17 +768,16 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh if (loc != null && state == EVICTED) { locParts.set(p, loc = null); - if (!treatAllPartitionAsLocal && !belongs) + if (!treatAllPartAsLoc && !belongs) throw new GridDhtInvalidPartitionException(p, "Adding entry to evicted partition " + "(often may be caused by inconsistent 'key.hashCode()' implementation) " + "[part=" + p + ", topVer=" + topVer + ", this.topVer=" + this.topVer + ']'); } - else if (loc != null && state == RENTING && cctx.allowFastEviction()) { + else if (loc != null && state == RENTING && cctx.allowFastEviction()) throw new GridDhtInvalidPartitionException(p, "Adding entry to partition that is concurrently evicted."); - } if (loc == null) { - if (!treatAllPartitionAsLocal && !belongs) + if (!treatAllPartAsLoc && !belongs) throw new GridDhtInvalidPartitionException(p, "Creating partition which does not belong to " + "local node (often may be caused by inconsistent 'key.hashCode()' implementation) " + "[part=" + p + ", topVer=" + topVer + ", this.topVer=" + this.topVer + ']'); @@ -989,7 +989,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh List<ClusterNode> nodes = new ArrayList<>(size); 
for (UUID id : nodeIds) { - if (topVer.topologyVersion() > 0 && !allIds.contains(id)) + if (topVer.topologyVersion() > 0 && !F.contains(allIds, id)) continue; if (hasState(p, id, state, states)) { @@ -1905,7 +1905,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh continue; if (cntr0 == null || cntr1 > cntr0.get1()) - res.put(part.id(), new T2<Long, Long>(cntr1, part.updateCounter())); + res.put(part.id(), new T2<>(cntr1, part.updateCounter())); } return res; http://git-wip-us.apache.org/repos/asf/ignite/blob/c66bb4f8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index eb98fa6..53c2dbc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -1491,18 +1491,6 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT } } - private static class CounterWithNodes { - private final long cnt; - - private final Set<UUID> nodes = new HashSet<>(); - - private CounterWithNodes(long cnt, UUID firstNode) { - this.cnt = cnt; - - nodes.add(firstNode); - } - } - /** * Detect lost partitions. 
*/ @@ -2048,4 +2036,30 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT "srvNodes", srvNodes, "super", super.toString()); } + + /** + * + */ + private static class CounterWithNodes { + /** */ + private final long cnt; + + /** */ + private final Set<UUID> nodes = new HashSet<>(); + + /** + * @param cnt Count. + * @param firstNode Node ID. + */ + private CounterWithNodes(long cnt, UUID firstNode) { + this.cnt = cnt; + + nodes.add(firstNode); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(CounterWithNodes.class, this); + } + } }
