ignite-4851 : Made partition state change atomic with size check.

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/89c2ebb6
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/89c2ebb6
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/89c2ebb6

Branch: refs/heads/ignite-3477-master
Commit: 89c2ebb6d5a28d715b2352553409ab0001cb9563
Parents: 07ecddc
Author: Ilya Lantukh <[email protected]>
Authored: Mon Apr 3 18:57:01 2017 +0300
Committer: Ilya Lantukh <[email protected]>
Committed: Mon Apr 3 18:57:01 2017 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheAdapter.java      |  2 +-
 .../cache/GridCacheConcurrentMapImpl.java       | 28 +++-------
 .../cache/GridCacheLocalConcurrentMap.java      | 54 ++++++++++++++++++++
 .../distributed/dht/GridDhtLocalPartition.java  | 47 ++++++++++++++---
 .../distributed/near/GridNearCacheAdapter.java  |  3 +-
 5 files changed, 104 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/89c2ebb6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 93daeda..9a6ff11 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -563,7 +563,7 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
             if (!isLocal())
                 initSize /= ctx.affinity().partitions();
 
-            map = new GridCacheConcurrentMapImpl(ctx, entryFactory(), 
initSize);
+            map = new GridCacheLocalConcurrentMap(ctx, entryFactory(), 
initSize);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/89c2ebb6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMapImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMapImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMapImpl.java
index 10f5ca3..c1dbd0c 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMapImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMapImpl.java
@@ -37,7 +37,7 @@ import static 
org.apache.ignite.events.EventType.EVT_CACHE_ENTRY_DESTROYED;
 /**
  * Implementation of concurrent cache map.
  */
-public class GridCacheConcurrentMapImpl implements GridCacheConcurrentMap {
+public abstract class GridCacheConcurrentMapImpl implements 
GridCacheConcurrentMap {
     /** Default load factor. */
     private static final float DFLT_LOAD_FACTOR = 0.75f;
 
@@ -53,9 +53,6 @@ public class GridCacheConcurrentMapImpl implements 
GridCacheConcurrentMap {
     /** Cache context. */
     private final GridCacheContext ctx;
 
-    /** Public size counter. */
-    private final AtomicInteger pubSize = new AtomicInteger();
-
     /**
      * Creates a new, empty map with the specified initial
      * capacity.
@@ -211,8 +208,12 @@ public class GridCacheConcurrentMapImpl implements 
GridCacheConcurrentMap {
                     topVer);
         }
 
-        if (sizeChange != 0)
-            pubSize.addAndGet(sizeChange);
+        assert Math.abs(sizeChange) <= 1;
+
+        if (sizeChange == -1)
+            decrementPublicSize(cur);
+        else if (sizeChange == 1)
+            incrementPublicSize(cur);
 
         return cur;
     }
@@ -242,21 +243,6 @@ public class GridCacheConcurrentMapImpl implements 
GridCacheConcurrentMap {
     }
 
     /** {@inheritDoc} */
-    @Override public int publicSize() {
-        return pubSize.get();
-    }
-
-    /** {@inheritDoc} */
-    @Override public void incrementPublicSize(GridCacheEntryEx e) {
-        pubSize.incrementAndGet();
-    }
-
-    /** {@inheritDoc} */
-    @Override public void decrementPublicSize(GridCacheEntryEx e) {
-        pubSize.decrementAndGet();
-    }
-
-    /** {@inheritDoc} */
     @Override public Set<KeyCacheObject> keySet(final CacheEntryPredicate... 
filter) {
         final IgnitePredicate<KeyCacheObject> p = new 
IgnitePredicate<KeyCacheObject>() {
             @Override public boolean apply(KeyCacheObject key) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/89c2ebb6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheLocalConcurrentMap.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheLocalConcurrentMap.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheLocalConcurrentMap.java
new file mode 100644
index 0000000..db99272
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheLocalConcurrentMap.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * GridCacheConcurrentMap implementation for local and near caches.
+ */
+public class GridCacheLocalConcurrentMap extends GridCacheConcurrentMapImpl {
+    /** */
+    private final AtomicInteger pubSize = new AtomicInteger();
+
+    public GridCacheLocalConcurrentMap(GridCacheContext ctx,
+        GridCacheMapEntryFactory factory, int initialCapacity) {
+        super(ctx, factory, initialCapacity);
+    }
+
+    public GridCacheLocalConcurrentMap(GridCacheContext ctx,
+        GridCacheMapEntryFactory factory, int initialCapacity, float 
loadFactor, int concurrencyLevel) {
+        super(ctx, factory, initialCapacity, loadFactor, concurrencyLevel);
+    }
+
+    /** {@inheritDoc} */
+    @Override public int publicSize() {
+        return pubSize.get();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void incrementPublicSize(GridCacheEntryEx e) {
+        pubSize.incrementAndGet();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void decrementPublicSize(GridCacheEntryEx e) {
+        pubSize.decrementAndGet();
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/89c2ebb6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
index 1c7db68..d3ec2af 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
@@ -512,7 +512,7 @@ public class GridDhtLocalPartition extends 
GridCacheConcurrentMapImpl implements
             if (partState == OWNING)
                 return true;
 
-            assert partState== MOVING || partState == LOST;
+            assert partState == MOVING || partState == LOST;
 
             if (casState(state, OWNING)) {
                 if (log.isDebugEnabled())
@@ -608,7 +608,7 @@ public class GridDhtLocalPartition extends 
GridCacheConcurrentMapImpl implements
 
         GridDhtPartitionState partState = getPartState(state);
 
-        if (isEmpty() && !QueryUtils.isEnabled(cctx.config()) &&
+        if (isEmpty() && !QueryUtils.isEnabled(cctx.config()) && 
getSize(state) == 0 &&
             partState == RENTING && getReservations(state) == 0 && 
!groupReserved() &&
             casState(state, EVICTED)) {
             if (log.isDebugEnabled())
@@ -652,7 +652,7 @@ public class GridDhtLocalPartition extends 
GridCacheConcurrentMapImpl implements
      *
      */
     private void clearEvicting() {
-       boolean free;
+        boolean free;
 
         while (true) {
             int cnt = evictGuard.get();
@@ -726,7 +726,7 @@ public class GridDhtLocalPartition extends 
GridCacheConcurrentMapImpl implements
                 // Attempt to evict partition entries from cache.
                 clearAll();
 
-                if (isEmpty() && casState(state, EVICTED)) {
+                if (isEmpty() && getSize(state) == 0 && casState(state, 
EVICTED)) {
                     if (log.isDebugEnabled())
                         log.debug("Evicted partition: " + this);
                     // finishDestroy() will be initiated by clearEvicting().
@@ -808,6 +808,9 @@ public class GridDhtLocalPartition extends 
GridCacheConcurrentMapImpl implements
         store.updateCounter(val);
     }
 
+    /**
+     * @param val Initial update index value.
+     */
     public void initialUpdateCounter(long val) {
         store.updateInitialCounter(val);
     }
@@ -970,8 +973,38 @@ public class GridDhtLocalPartition extends 
GridCacheConcurrentMapImpl implements
             "createTime", U.format(createTime));
     }
 
+    /** {@inheritDoc} */
+    @Override public int publicSize() {
+        return getSize(state.get());
+    }
+
+    /** {@inheritDoc} */
+    @Override public void incrementPublicSize(GridCacheEntryEx e) {
+        while (true) {
+            long state = this.state.get();
+
+            assert getPartState(state) != EVICTED;
+
+            if (this.state.compareAndSet(state, setSize(state, getSize(state) 
+ 1)))
+                return;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void decrementPublicSize(GridCacheEntryEx e) {
+        while (true) {
+            long state = this.state.get();
+
+            assert getPartState(state) != EVICTED;
+            assert getSize(state) > 0;
+
+            if (this.state.compareAndSet(state, setSize(state, getSize(state) 
- 1)))
+                return;
+        }
+    }
+
     private static GridDhtPartitionState getPartState(long state) {
-        return GridDhtPartitionState.fromOrdinal((int) (state & 
(0x0000000000000007L)));
+        return GridDhtPartitionState.fromOrdinal((int)(state & 
(0x0000000000000007L)));
     }
 
     private static long setPartState(long state, GridDhtPartitionState 
partState) {
@@ -979,7 +1012,7 @@ public class GridDhtLocalPartition extends 
GridCacheConcurrentMapImpl implements
     }
 
     private static int getReservations(long state) {
-        return (int) ((state & 0x00000000FFFF0000L) >> 16);
+        return (int)((state & 0x00000000FFFF0000L) >> 16);
     }
 
     private static long setReservations(long state, int reservations) {
@@ -991,7 +1024,7 @@ public class GridDhtLocalPartition extends 
GridCacheConcurrentMapImpl implements
     }
 
     private static long setSize(long state, int size) {
-        return (state & (~0xFFFFFFFF00000000L)) | ((long) size << 32);
+        return (state & (~0xFFFFFFFF00000000L)) | ((long)size << 32);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/89c2ebb6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
index 7c1c38b..0d62985 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
@@ -43,6 +43,7 @@ import 
org.apache.ignite.internal.processors.cache.GridCacheConcurrentMapImpl;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
 import 
org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
+import org.apache.ignite.internal.processors.cache.GridCacheLocalConcurrentMap;
 import org.apache.ignite.internal.processors.cache.GridCacheMapEntry;
 import org.apache.ignite.internal.processors.cache.GridCacheMapEntryFactory;
 import org.apache.ignite.internal.processors.cache.GridCachePreloader;
@@ -118,7 +119,7 @@ public abstract class GridNearCacheAdapter<K, V> extends 
GridDistributedCacheAda
 
     /** {@inheritDoc} */
     @Override public void onReconnected() {
-        map = new GridCacheConcurrentMapImpl(
+        map = new GridCacheLocalConcurrentMap(
             ctx,
             entryFactory(),
             ctx.config().getNearConfiguration().getNearStartSize());

Reply via email to