Repository: ignite
Updated Branches:
  refs/heads/ignite-5075 542c2c906 -> 8e5ec813e


ignite-5075


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8e5ec813
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8e5ec813
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8e5ec813

Branch: refs/heads/ignite-5075
Commit: 8e5ec813e7250c67ac94c1b0d9c6b8bfb740bdbb
Parents: 542c2c9
Author: sboikov <[email protected]>
Authored: Fri May 12 14:39:10 2017 +0300
Committer: sboikov <[email protected]>
Committed: Fri May 12 15:08:29 2017 +0300

----------------------------------------------------------------------
 .../cache/CacheGroupInfrastructure.java         | 78 ++++++++++++++++----
 .../processors/cache/GridCacheProcessor.java    | 25 ++-----
 .../processors/cache/GridCacheTtlManager.java   | 17 ++++-
 .../cache/IgniteCacheOffheapManager.java        |  8 +-
 .../cache/IgniteCacheOffheapManagerImpl.java    | 47 ++++++------
 .../distributed/dht/GridDhtLocalPartition.java  | 13 +---
 .../dht/preloader/GridDhtPartitionDemander.java |  2 +-
 7 files changed, 120 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/8e5ec813/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java
index ec33685..e638eb0 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.internal.processors.cache;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.UUID;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
@@ -110,8 +112,7 @@ public class CacheGroupInfrastructure {
     private boolean needsRecovery;
 
     /** */
-    private GridCacheContext singleCacheCtx;
-
+    private final List<GridCacheContext> caches;
 
     /**
      * @param grpId Group ID.
@@ -152,6 +153,8 @@ public class CacheGroupInfrastructure {
             (sharedGroup() || memPlc.config().getPageEvictionMode() != 
DataPageEvictionMode.DISABLED);
 
         log = ctx.kernalContext().log(getClass());
+
+        caches = new ArrayList<>();
     }
 
     /**
@@ -187,21 +190,44 @@ public class CacheGroupInfrastructure {
     }
 
     /**
-     * @param singleCacheCtx Cache context if group contains single cache.
+     * @param cctx Cache context.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void onCacheStarted(GridCacheContext cctx) throws 
IgniteCheckedException {
+        addCacheContext(cctx);
+
+        offheapMgr.onCacheStarted(cctx);
+    }
+
+    /**
+     * @param cctx Cache context.
+     */
+    private void addCacheContext(GridCacheContext cctx) {
+        assert sharedGroup() || caches.isEmpty();
+
+        boolean add = caches.add(cctx);
+
+        assert add : cctx.name();
+    }
+
+    /**
+     * @param cctx Cache context.
      */
-    public void cacheContext(GridCacheContext singleCacheCtx) {
-        assert !sharedGroup();
+    private void removeCacheContext(GridCacheContext cctx) {
+        assert sharedGroup() || caches.size() == 1 : caches.size();
 
-        this.singleCacheCtx = singleCacheCtx;
+        boolean rmv = caches.remove(cctx);
+
+        assert rmv : cctx.name();
     }
 
     /**
      * @return Cache context if group contains single cache.
      */
-    public GridCacheContext cacheContext() {
-        assert !sharedGroup();
+    public GridCacheContext singleCacheContext() {
+        assert !sharedGroup() && caches.size() == 1;
 
-        return singleCacheCtx;
+        return caches.get(0);
     }
 
     // TODO IGNITE-5075: need separate caches with/without queries?
@@ -360,17 +386,20 @@ public class CacheGroupInfrastructure {
      *
      */
     public void onKernalStop() {
-        preldr.onKernalStop();
+        if (preldr != null) // null for LOCAL cache.
+            preldr.onKernalStop();
 
         offheapMgr.onKernalStop();
     }
 
     /**
-     * @param cacheId Cache ID.
+     * @param cctx Cache context.
      * @param destroy Destroy flag.
      */
-    void stopCache(int cacheId, boolean destroy) {
-        offheapMgr.stopCache(cacheId, destroy);
+    void stopCache(GridCacheContext cctx, boolean destroy) {
+        offheapMgr.stopCache(cctx.cacheId(), destroy);
+
+        removeCacheContext(cctx);
     }
 
     /**
@@ -383,6 +412,29 @@ public class CacheGroupInfrastructure {
     }
 
     /**
+     * @return {@code True} if group contains caches.
+     */
+    boolean hasCaches() {
+        return !caches.isEmpty();
+    }
+
+    /**
+     * @param part Partition ID.
+     */
+    public void onPartitionEvicted(int part) {
+        for (int i = 0; i < caches.size(); i++) {
+            GridCacheContext cctx = caches.get(i);
+
+            if (cctx.isDrEnabled())
+                cctx.dr().partitionEvicted(part);
+
+            cctx.continuousQueries().onPartitionEvicted(part);
+
+            cctx.dataStructures().onPartitionEvicted(part);
+        }
+    }
+
+    /**
      * @throws IgniteCheckedException If failed.
      */
     public void start() throws IgniteCheckedException {

http://git-wip-us.apache.org/repos/asf/ignite/blob/8e5ec813/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 776f2c8..3e9c6ed 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -1253,7 +1253,7 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
                 mgr.stop(cancel, destroy);
         }
 
-        ctx.group().stopCache(ctx.cacheId(), destroy);
+        ctx.group().stopCache(ctx, destroy);
 
         ctx.kernalContext().continuous().onCacheStop(ctx);
 
@@ -1888,9 +1888,6 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
             affNode,
             true);
 
-        if (!grp.sharedGroup())
-            grp.cacheContext(cacheCtx);
-
         cacheCtx.dynamicDeploymentId(desc.deploymentId());
 
         GridCacheAdapter cache = cacheCtx.cache();
@@ -1901,6 +1898,8 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
 
         startCache(cache, schema != null ? schema : new QuerySchema());
 
+        grp.onCacheStarted(cacheCtx);
+
         onKernalStart(cache);
     }
 
@@ -2050,22 +2049,8 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
 
                         CacheGroupInfrastructure grp = 
prepareCacheStop(req.request());
 
-                        if (grp != null) {
-                            boolean stopGrp = true;
-
-                            if (!grp.sharedGroup()) {
-                                for (GridCacheContext cctx : 
sharedCtx.cacheContexts()) {
-                                    if (cctx.group() == grp) {
-                                        stopGrp = false;
-
-                                        break;
-                                    }
-                                }
-                            }
-
-                            if (stopGrp)
-                                stopCacheGroup(grp.groupId());
-                        }
+                        if (grp != null && !grp.hasCaches())
+                            stopCacheGroup(grp.groupId());
                     }
                 }
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/8e5ec813/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
index 614b3e3..ad6aa5a 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
@@ -38,7 +38,10 @@ import org.jsr166.LongAdder8;
  */
 public class GridCacheTtlManager extends GridCacheManagerAdapter {
     /** Entries pending removal. */
-    private  GridConcurrentSkipListSetEx pendingEntries;
+    private GridConcurrentSkipListSetEx pendingEntries;
+
+    /** */
+    private boolean eagerTtlEnabled;
 
     /** */
     private final IgniteInClosure2X<GridCacheEntryEx, GridCacheVersion> 
expireC =
@@ -79,11 +82,20 @@ public class GridCacheTtlManager extends 
GridCacheManagerAdapter {
         if (cleanupDisabled)
             return;
 
+        eagerTtlEnabled = true;
+
         cctx.shared().ttl().register(this);
 
         pendingEntries = (!cctx.isLocal() && 
cctx.config().getNearConfiguration() != null) ? new 
GridConcurrentSkipListSetEx() : null;
     }
 
+    /**
+     * @return {@code True} if eager ttl is enabled for cache.
+     */
+    boolean eagerTtlEnabled() {
+        return eagerTtlEnabled;
+    }
+
     /** {@inheritDoc} */
     @Override protected void onKernalStop0(boolean cancel) {
         if (pendingEntries != null)
@@ -153,7 +165,6 @@ public class GridCacheTtlManager extends 
GridCacheManagerAdapter {
 
         try {
             if (pendingEntries != null) {
-                //todo may be not only for near? may be for local too.
                 GridNearCacheAdapter nearCache = cctx.near();
 
                 GridCacheVersion obsoleteVer = null;
@@ -178,7 +189,7 @@ public class GridCacheTtlManager extends 
GridCacheManagerAdapter {
                 }
             }
 
-            boolean more = cctx.offheap().expire(expireC, amount);
+            boolean more = cctx.offheap().expire(cctx, expireC, amount);
 
             if (more)
                 return true;

http://git-wip-us.apache.org/repos/asf/ignite/blob/8e5ec813/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
index 9e1729b..95556f8 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
@@ -47,6 +47,12 @@ public interface IgniteCacheOffheapManager {
     public void start(GridCacheSharedContext ctx, CacheGroupInfrastructure 
grp) throws IgniteCheckedException;;
 
     /**
+     * @param cctx Cache context.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void onCacheStarted(GridCacheContext cctx) throws 
IgniteCheckedException;
+
+    /**
      *
      */
     public void onKernalStop();
@@ -134,7 +140,7 @@ public interface IgniteCacheOffheapManager {
      * @param c Closure.
      * @throws IgniteCheckedException If failed.
      */
-    public boolean expire(IgniteInClosure2X<GridCacheEntryEx, 
GridCacheVersion> c, int amount) throws IgniteCheckedException;
+    public boolean expire(GridCacheContext cctx, 
IgniteInClosure2X<GridCacheEntryEx, GridCacheVersion> c, int amount) throws 
IgniteCheckedException;
 
     /**
      * Gets the number of entries pending expire.

http://git-wip-us.apache.org/repos/asf/ignite/blob/8e5ec813/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index eb2b1b5..e037572 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -29,10 +29,8 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.configuration.DataPageEvictionMode;
 import org.apache.ignite.internal.NodeStoppingException;
 import org.apache.ignite.internal.pagemem.FullPageId;
-import org.apache.ignite.internal.pagemem.PageIdUtils;
 import org.apache.ignite.internal.pagemem.PageMemory;
 import org.apache.ignite.internal.pagemem.PageUtils;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
@@ -52,10 +50,8 @@ import 
org.apache.ignite.internal.processors.cache.database.tree.io.IOVersions;
 import 
org.apache.ignite.internal.processors.cache.database.tree.reuse.ReuseList;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
-import org.apache.ignite.internal.processors.cache.local.GridLocalCache;
 import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.processors.query.QueryUtils;
 import org.apache.ignite.internal.util.GridAtomicLong;
 import org.apache.ignite.internal.util.GridCloseableIteratorAdapter;
 import org.apache.ignite.internal.util.GridEmptyCloseableIterator;
@@ -90,7 +86,7 @@ public class IgniteCacheOffheapManagerImpl implements 
IgniteCacheOffheapManager
     protected CacheGroupInfrastructure grp;
 
     /** */
-    private IgniteLogger log;
+    protected IgniteLogger log;
 
     /** */
     // TODO GG-11208 need restore size after restart.
@@ -148,11 +144,9 @@ public class IgniteCacheOffheapManagerImpl implements 
IgniteCacheOffheapManager
         }
     }
 
-    /**
-     * @throws IgniteCheckedException If failed.
-     */
-    protected void initDataStructures() throws IgniteCheckedException {
-        if (ctx.ttl().eagerTtlEnabled()) {
+    /** {@inheritDoc} */
+    public void onCacheStarted(GridCacheContext cctx) throws 
IgniteCheckedException{
+        if (cctx.affinityNode() && cctx.ttl().eagerTtlEnabled() && 
pendingEntries == null) {
             String name = "PendingEntries";
 
             long rootPage = allocateForTree();
@@ -167,6 +161,13 @@ public class IgniteCacheOffheapManagerImpl implements 
IgniteCacheOffheapManager
         }
     }
 
+    /**
+     * @throws IgniteCheckedException If failed.
+     */
+    protected void initDataStructures() throws IgniteCheckedException {
+        // No-op.
+    }
+
     /** {@inheritDoc} */
     @Override public void stopCache(int cacheId, final boolean destroy) {
         if (destroy && grp.affinityNode())
@@ -819,9 +820,11 @@ public class IgniteCacheOffheapManagerImpl implements 
IgniteCacheOffheapManager
 
     /** {@inheritDoc} */
     @Override public boolean expire(
+        GridCacheContext cctx,
         IgniteInClosure2X<GridCacheEntryEx, GridCacheVersion> c,
         int amount
     ) throws IgniteCheckedException {
+        // TODO IGNITE-5075 filter by cache ID if needed.
         if (hasPendingEntries && pendingEntries != null) {
             GridCacheVersion obsoleteVer = null;
 
@@ -836,18 +839,18 @@ public class IgniteCacheOffheapManagerImpl implements 
IgniteCacheOffheapManager
 
                 if (amount != -1 && cleared > amount)
                     return true;
-// TODO: IGNITE-5075.
-//                if (row.key.partition() == -1)
-//                    row.key.partition(cctx.affinity().partition(row.key));
-//
-//                assert row.key != null && row.link != 0 && row.expireTime != 
0 : row;
-//
-//                if (pendingEntries.remove(row) != null) {
-//                    if (obsoleteVer == null)
-//                        obsoleteVer = ctx.versions().next();
-//
-//                    c.apply(cctx.cache().entryEx(row.key), obsoleteVer);
-//                }
+
+                if (row.key.partition() == -1)
+                    row.key.partition(cctx.affinity().partition(row.key));
+
+                assert row.key != null && row.link != 0 && row.expireTime != 0 
: row;
+
+                if (pendingEntries.remove(row) != null) {
+                    if (obsoleteVer == null)
+                        obsoleteVer = ctx.versions().next();
+
+                    c.apply(cctx.cache().entryEx(row.key), obsoleteVer);
+                }
 
                 cleared++;
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/8e5ec813/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
index dc1ad31..3119b52 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
@@ -163,8 +163,7 @@ public class GridDhtLocalPartition extends 
GridCacheConcurrentMapImpl implements
             }
         };
 
-        // TODO IGNITE-5075.
-        int delQueueSize = CU.isSystemCache(grp.name()) ? 100 :
+        int delQueueSize = CU.isSystemCache(grp.config().getName()) ? 100 :
             Math.max(MAX_DELETE_QUEUE_SIZE / grp.affinity().partitions(), 20);
 
         rmvQueueMaxSize = U.ceilPow2(delQueueSize);
@@ -726,13 +725,7 @@ public class GridDhtLocalPartition extends 
GridCacheConcurrentMapImpl implements
         assert state() == EVICTED : this;
         assert evictGuard.get() == -1;
 
-// TODO IGNITE-5075.
-//        if (cctx.isDrEnabled())
-//            cctx.dr().partitionEvicted(id);
-//
-//        cctx.continuousQueries().onPartitionEvicted(id);
-//
-//        cctx.dataStructures().onPartitionEvicted(id);
+        grp.onPartitionEvicted(id);
 
         destroyCacheDataStore();
 
@@ -921,7 +914,7 @@ public class GridDhtLocalPartition extends 
GridCacheConcurrentMapImpl implements
         }
 
         if (!grp.allowFastEviction()) {
-            GridCacheContext cctx = grp.sharedGroup() ? null : 
grp.cacheContext();
+            GridCacheContext cctx = grp.sharedGroup() ? null : 
grp.singleCacheContext();
 
             try {
                 GridIterator<CacheDataRow> it0 = 
grp.offheap().partitionIterator(id);

http://git-wip-us.apache.org/repos/asf/ignite/blob/8e5ec813/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index 8a3b1de..1a9eb68 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -751,7 +751,7 @@ public class GridDhtPartitionDemander {
             GridCacheEntryEx cached = null;
 
             try {
-                GridCacheContext cctx = grp.sharedGroup() ? 
ctx.cacheContext(entry.cacheId()) : grp.cacheContext();
+                GridCacheContext cctx = grp.sharedGroup() ? 
ctx.cacheContext(entry.cacheId()) : grp.singleCacheContext();
 
                 cached = cctx.dht().entryEx(entry.key());
 

Reply via email to