Repository: ignite
Updated Branches:
  refs/heads/ignite-3477 d139e6476 -> c66bb4f8f


- fixed lock order for interrupt lock/topology write lock
- GridCacheTtlManager: disable near cache eviction logic for local cache


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c66bb4f8
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c66bb4f8
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c66bb4f8

Branch: refs/heads/ignite-3477
Commit: c66bb4f8f6a85607b262e863eb66489a56905b8c
Parents: d139e64
Author: sboikov <[email protected]>
Authored: Fri Dec 30 13:11:34 2016 +0300
Committer: sboikov <[email protected]>
Committed: Fri Dec 30 13:11:34 2016 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheTtlManager.java   |  10 +-
 .../dht/GridDhtPartitionTopologyImpl.java       | 126 +++++++++----------
 .../GridDhtPartitionsExchangeFuture.java        |  38 ++++--
 3 files changed, 91 insertions(+), 83 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/c66bb4f8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
index a336a80..bc09066 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
@@ -17,12 +17,10 @@
 
 package org.apache.ignite.internal.processors.cache;
 
-import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.configuration.CacheConfiguration;
 import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter;
-import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.GridConcurrentSkipListSet;
@@ -31,8 +29,6 @@ import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.internal.util.worker.GridWorker;
-import org.apache.ignite.thread.IgniteThread;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.LongAdder8;
 
@@ -40,7 +36,6 @@ import org.jsr166.LongAdder8;
  * Eagerly removes expired entries from cache when
  * {@link CacheConfiguration#isEagerTtl()} flag is set.
  */
-@SuppressWarnings("NakedNotify")
 public class GridCacheTtlManager extends GridCacheManagerAdapter {
     /** Entries pending removal. */
     private  GridConcurrentSkipListSetEx pendingEntries;
@@ -87,7 +82,7 @@ public class GridCacheTtlManager extends 
GridCacheManagerAdapter {
 
         cctx.shared().ttl().register(this);
 
-        pendingEntries = cctx.config().getNearConfiguration() != null ? new GridConcurrentSkipListSetEx() : null;
+        pendingEntries = (!cctx.isLocal() && cctx.config().getNearConfiguration() != null) ? new GridConcurrentSkipListSetEx() : null;
     }
 
     /** {@inheritDoc} */
@@ -156,7 +151,6 @@ public class GridCacheTtlManager extends 
GridCacheManagerAdapter {
     public boolean expire(int amount) {
         long now = U.currentTimeMillis();
 
-
         try {
             if (pendingEntries != null) {
                 GridNearCacheAdapter nearCache = cctx.near();
@@ -186,7 +180,7 @@ public class GridCacheTtlManager extends 
GridCacheManagerAdapter {
             boolean more = cctx.offheap().expire(expireC, amount);
 
             if (more)
-                return more;
+                return true;
 
             if (amount != -1 && pendingEntries != null) {
                 EntryWrapper e = pendingEntries.firstx();

http://git-wip-us.apache.org/repos/asf/ignite/blob/c66bb4f8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index f22c263..0dd836d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -70,7 +70,8 @@ import static 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
 /**
  * Partition topology.
  */
-@GridToStringExclude class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
+@GridToStringExclude
+class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
     /** If true, then check consistency. */
     private static final boolean CONSISTENCY_CHECK = false;
 
@@ -123,7 +124,7 @@ import static 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
     private volatile AffinityTopologyVersion rebalancedTopVer = 
AffinityTopologyVersion.NONE;
 
     /** */
-    private volatile boolean treatAllPartitionAsLocal;
+    private volatile boolean treatAllPartAsLoc;
 
     /**
      * @param cctx Context.
@@ -490,7 +491,7 @@ import static 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
 
         ClusterState newState = exchFut.newClusterState();
 
-        treatAllPartitionAsLocal = (newState != null && newState == 
ClusterState.ACTIVE)
+        treatAllPartAsLoc = (newState != null && newState == 
ClusterState.ACTIVE)
             || (cctx.kernalContext().state().active()
             && discoEvt.type() == EventType.EVT_NODE_JOINED
             && discoEvt.eventNode().isLocal()
@@ -504,64 +505,64 @@ import static 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
 
         cctx.shared().database().checkpointReadLock();
 
-        try {
-            U.writeLock(lock);
-        }
-        catch (IgniteInterruptedCheckedException e) {
-            cctx.shared().database().checkpointReadUnlock();
+        synchronized (cctx.shared().exchange().interruptLock()) {
+            if (Thread.currentThread().isInterrupted())
+                throw new IgniteCheckedException("Thread is interrupted: " + 
Thread.currentThread());
 
-            throw e;
-        }
+            try {
+                U.writeLock(lock);
+            }
+            catch (IgniteInterruptedCheckedException e) {
+                cctx.shared().database().checkpointReadUnlock();
 
-        try {
-            GridDhtPartitionExchangeId exchId = exchFut.exchangeId();
+                throw e;
+            }
 
-            if (stopping)
-                return;
+            try {
+                GridDhtPartitionExchangeId exchId = exchFut.exchangeId();
 
-            assert topVer.equals(exchId.topologyVersion()) : "Invalid topology 
version [topVer=" +
-                topVer + ", exchId=" + exchId + ']';
+                if (stopping)
+                    return;
 
-            if (exchId.isLeft())
-                removeNode(exchId.nodeId());
+                assert topVer.equals(exchId.topologyVersion()) : "Invalid 
topology version [topVer=" +
+                    topVer + ", exchId=" + exchId + ']';
 
-            ClusterNode oldest = currentCoordinator();
+                if (exchId.isLeft())
+                    removeNode(exchId.nodeId());
 
-            if (log.isDebugEnabled())
-                log.debug("Partition map beforeExchange [exchId=" + exchId + 
", fullMap=" + fullMapString() + ']');
+                ClusterNode oldest = currentCoordinator();
 
-            long updateSeq = this.updateSeq.incrementAndGet();
+                if (log.isDebugEnabled())
+                    log.debug("Partition map beforeExchange [exchId=" + exchId 
+ ", fullMap=" + fullMapString() + ']');
 
-            cntrMap.clear();
+                long updateSeq = this.updateSeq.incrementAndGet();
 
-            // If this is the oldest node.
-            if (oldest != null && (loc.equals(oldest) || 
exchFut.isCacheAdded(cctx.cacheId(), exchId.topologyVersion()))) {
-                if (node2part == null) {
-                    node2part = new GridDhtPartitionFullMap(oldest.id(), 
oldest.order(), updateSeq);
+                cntrMap.clear();
 
-                    if (log.isDebugEnabled())
-                        log.debug("Created brand new full topology map on 
oldest node [exchId=" +
-                            exchId + ", fullMap=" + fullMapString() + ']');
-                }
-                else if (!node2part.valid()) {
-                    node2part = new GridDhtPartitionFullMap(oldest.id(), 
oldest.order(), updateSeq, node2part, false);
+                // If this is the oldest node.
+                if (oldest != null && (loc.equals(oldest) || 
exchFut.isCacheAdded(cctx.cacheId(), exchId.topologyVersion()))) {
+                    if (node2part == null) {
+                        node2part = new GridDhtPartitionFullMap(oldest.id(), 
oldest.order(), updateSeq);
 
-                    if (log.isDebugEnabled())
-                        log.debug("Created new full topology map on oldest 
node [exchId=" + exchId + ", fullMap=" +
-                            node2part + ']');
-                }
-                else if (!node2part.nodeId().equals(loc.id())) {
-                    node2part = new GridDhtPartitionFullMap(oldest.id(), 
oldest.order(), updateSeq, node2part, false);
+                        if (log.isDebugEnabled())
+                            log.debug("Created brand new full topology map on 
oldest node [exchId=" +
+                                exchId + ", fullMap=" + fullMapString() + ']');
+                    }
+                    else if (!node2part.valid()) {
+                        node2part = new GridDhtPartitionFullMap(oldest.id(), 
oldest.order(), updateSeq, node2part, false);
 
-                    if (log.isDebugEnabled())
-                        log.debug("Copied old map into new map on oldest node 
(previous oldest node left) [exchId=" +
-                            exchId + ", fullMap=" + fullMapString() + ']');
-                }
-            }
+                        if (log.isDebugEnabled())
+                            log.debug("Created new full topology map on oldest 
node [exchId=" + exchId + ", fullMap=" +
+                                node2part + ']');
+                    }
+                    else if (!node2part.nodeId().equals(loc.id())) {
+                        node2part = new GridDhtPartitionFullMap(oldest.id(), 
oldest.order(), updateSeq, node2part, false);
 
-            synchronized (cctx.shared().exchange().interruptLock()) {
-                if (Thread.currentThread().isInterrupted())
-                    throw new IgniteCheckedException("Thread is interrupted: " 
+ Thread.currentThread());
+                        if (log.isDebugEnabled())
+                            log.debug("Copied old map into new map on oldest 
node (previous oldest node left) [exchId=" +
+                                exchId + ", fullMap=" + fullMapString() + ']');
+                    }
+                }
 
                 if (affReady)
                     initPartitions0(exchFut, updateSeq);
@@ -570,18 +571,18 @@ import static 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
 
                     createPartitions(aff, updateSeq);
                 }
-            }
 
-            consistencyCheck();
+                consistencyCheck();
 
-            if (log.isDebugEnabled())
-                log.debug("Partition map after beforeExchange [exchId=" + 
exchId + ", fullMap=" +
-                    fullMapString() + ']');
-        }
-        finally {
-            lock.writeLock().unlock();
+                if (log.isDebugEnabled())
+                    log.debug("Partition map after beforeExchange [exchId=" + 
exchId + ", fullMap=" +
+                        fullMapString() + ']');
+            }
+            finally {
+                lock.writeLock().unlock();
 
-            cctx.shared().database().checkpointReadUnlock();
+                cctx.shared().database().checkpointReadUnlock();
+            }
         }
 
         // Wait for evictions.
@@ -590,7 +591,7 @@ import static 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
 
     /** {@inheritDoc} */
     @Override public boolean afterExchange(GridDhtPartitionsExchangeFuture 
exchFut) throws IgniteCheckedException {
-        treatAllPartitionAsLocal = false;
+        treatAllPartAsLoc = false;
 
         boolean changed = waitForRent();
 
@@ -767,17 +768,16 @@ import static 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
             if (loc != null && state == EVICTED) {
                 locParts.set(p, loc = null);
 
-                if (!treatAllPartitionAsLocal && !belongs)
+                if (!treatAllPartAsLoc && !belongs)
                     throw new GridDhtInvalidPartitionException(p, "Adding 
entry to evicted partition " +
                         "(often may be caused by inconsistent 'key.hashCode()' 
implementation) " +
                         "[part=" + p + ", topVer=" + topVer + ", this.topVer=" 
+ this.topVer + ']');
             }
-            else if (loc != null && state == RENTING && 
cctx.allowFastEviction()) {
+            else if (loc != null && state == RENTING && 
cctx.allowFastEviction())
                 throw new GridDhtInvalidPartitionException(p, "Adding entry to 
partition that is concurrently evicted.");
-            }
 
             if (loc == null) {
-                if (!treatAllPartitionAsLocal && !belongs)
+                if (!treatAllPartAsLoc && !belongs)
                     throw new GridDhtInvalidPartitionException(p, "Creating 
partition which does not belong to " +
                         "local node (often may be caused by inconsistent 
'key.hashCode()' implementation) " +
                         "[part=" + p + ", topVer=" + topVer + ", this.topVer=" 
+ this.topVer + ']');
@@ -989,7 +989,7 @@ import static 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
             List<ClusterNode> nodes = new ArrayList<>(size);
 
             for (UUID id : nodeIds) {
-                if (topVer.topologyVersion() > 0 && !allIds.contains(id))
+                if (topVer.topologyVersion() > 0 && !F.contains(allIds, id))
                     continue;
 
                 if (hasState(p, id, state, states)) {
@@ -1905,7 +1905,7 @@ import static 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
                     continue;
 
                 if (cntr0 == null || cntr1 > cntr0.get1())
-                    res.put(part.id(), new T2<Long, Long>(cntr1, 
part.updateCounter()));
+                    res.put(part.id(), new T2<>(cntr1, part.updateCounter()));
             }
 
             return res;

http://git-wip-us.apache.org/repos/asf/ignite/blob/c66bb4f8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index eb98fa6..53c2dbc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -1491,18 +1491,6 @@ public class GridDhtPartitionsExchangeFuture extends 
GridFutureAdapter<AffinityT
         }
     }
 
-    private static class CounterWithNodes {
-        private final long cnt;
-
-        private final Set<UUID> nodes = new HashSet<>();
-
-        private CounterWithNodes(long cnt, UUID firstNode) {
-            this.cnt = cnt;
-
-            nodes.add(firstNode);
-        }
-    }
-
     /**
      * Detect lost partitions.
      */
@@ -2048,4 +2036,30 @@ public class GridDhtPartitionsExchangeFuture extends 
GridFutureAdapter<AffinityT
             "srvNodes", srvNodes,
             "super", super.toString());
     }
+
+    /**
+     *
+     */
+    private static class CounterWithNodes {
+        /** */
+        private final long cnt;
+
+        /** */
+        private final Set<UUID> nodes = new HashSet<>();
+
+        /**
+         * @param cnt Count.
+         * @param firstNode Node ID.
+         */
+        private CounterWithNodes(long cnt, UUID firstNode) {
+            this.cnt = cnt;
+
+            nodes.add(firstNode);
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(CounterWithNodes.class, this);
+        }
+    }
 }

Reply via email to