http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java index 10caf07..2ac1ba6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java @@ -18,7 +18,6 @@ package org.apache.ignite.internal.processors.cache.distributed.dht; import java.io.Externalizable; -import java.util.AbstractSet; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -26,7 +25,6 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.NoSuchElementException; -import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentMap; import javax.cache.Cache; @@ -34,13 +32,15 @@ import javax.cache.expiry.ExpiryPolicy; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.events.DiscoveryEvent; +import org.apache.ignite.events.Event; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.NodeStoppingException; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; +import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.CacheOperationContext; -import org.apache.ignite.internal.processors.cache.CachePeekModes; import org.apache.ignite.internal.processors.cache.EntryGetResult; import org.apache.ignite.internal.processors.cache.GridCacheAdapter; import org.apache.ignite.internal.processors.cache.GridCacheClearAllRunnable; @@ -59,7 +59,9 @@ import org.apache.ignite.internal.processors.cache.distributed.GridCacheTtlUpdat import org.apache.ignite.internal.processors.cache.distributed.GridDistributedCacheAdapter; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedCacheEntry; import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.GridDhtDetachedCacheEntry; -import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysFuture; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysRequest; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysResponse; import org.apache.ignite.internal.processors.cache.distributed.near.CacheVersionedValue; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest; @@ -70,7 +72,6 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.platform.cache.PlatformCacheEntryFilter; import org.apache.ignite.internal.util.future.GridCompoundFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; -import org.apache.ignite.internal.util.lang.GridIteratorAdapter; import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.CI2; import org.apache.ignite.internal.util.typedef.CI3; @@ -78,6 +79,7 @@ import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteBiInClosure; import org.apache.ignite.lang.IgniteBiPredicate; import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.lang.IgniteUuid; @@ -86,8 +88,11 @@ import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentHashMap8; +import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; +import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; import static org.apache.ignite.internal.processors.dr.GridDrType.DR_LOAD; import static org.apache.ignite.internal.processors.dr.GridDrType.DR_NONE; +import static org.apache.ignite.internal.util.GridConcurrentFactory.newMap; /** * DHT cache adapter. @@ -96,18 +101,36 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap /** */ private static final long serialVersionUID = 0L; - /** Topology. */ - private GridDhtPartitionTopologyImpl top; - - /** Preloader. */ - protected GridCachePreloader preldr; - /** Multi tx future holder. */ private ThreadLocal<IgniteBiTuple<IgniteUuid, GridDhtTopologyFuture>> multiTxHolder = new ThreadLocal<>(); /** Multi tx futures. */ private ConcurrentMap<IgniteUuid, MultiUpdateFuture> multiTxFuts = new ConcurrentHashMap8<>(); + /** Force key futures. */ + private final ConcurrentMap<IgniteUuid, GridDhtForceKeysFuture<?, ?>> forceKeyFuts = newMap(); + + /** */ + private volatile boolean stopping; + + /** Discovery listener. */ + private final GridLocalEventListener discoLsnr = new GridLocalEventListener() { + @Override public void onEvent(Event evt) { + DiscoveryEvent e = (DiscoveryEvent)evt; + + ClusterNode loc = ctx.localNode(); + + assert e.type() == EVT_NODE_LEFT || e.type() == EVT_NODE_FAILED : e; + + final ClusterNode n = e.eventNode(); + + assert !loc.id().equals(n.id()); + + for (GridDhtForceKeysFuture<?, ?> f : forceKeyFuts.values()) + f.onDiscoveryEvent(e); + } + }; + /** * Empty constructor required for {@link Externalizable}. */ @@ -116,6 +139,176 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap } /** + * Adds future to future map. + * + * @param fut Future to add. + * @return {@code False} if node cache is stopping and future was completed with error. + */ + public boolean addFuture(GridDhtForceKeysFuture<?, ?> fut) { + forceKeyFuts.put(fut.futureId(), fut); + + if (stopping) { + fut.onDone(stopError()); + + return false; + } + + return true; + } + + /** + * Removes future from future map. + * + * @param fut Future to remove. + */ + public void removeFuture(GridDhtForceKeysFuture<?, ?> fut) { + forceKeyFuts.remove(fut.futureId(), fut); + } + + /** + * @param node Node. + * @param msg Message. + */ + protected final void processForceKeyResponse(ClusterNode node, GridDhtForceKeysResponse msg) { + GridDhtForceKeysFuture<?, ?> f = forceKeyFuts.get(msg.futureId()); + + if (f != null) + f.onResult(msg); + else if (log.isDebugEnabled()) + log.debug("Receive force key response for unknown future (is it duplicate?) [nodeId=" + node.id() + + ", res=" + msg + ']'); + } + /** + * @param node Node originated request. + * @param msg Force keys message. + */ + protected final void processForceKeysRequest(final ClusterNode node, final GridDhtForceKeysRequest msg) { + IgniteInternalFuture<?> fut = ctx.mvcc().finishKeys(msg.keys(), msg.cacheId(), msg.topologyVersion()); + + if (fut.isDone()) + processForceKeysRequest0(node, msg); + else + fut.listen(new CI1<IgniteInternalFuture<?>>() { + @Override public void apply(IgniteInternalFuture<?> t) { + processForceKeysRequest0(node, msg); + } + }); + } + + /** + * @param node Node originated request. + * @param msg Force keys message. + */ + private void processForceKeysRequest0(ClusterNode node, GridDhtForceKeysRequest msg) { + try { + ClusterNode loc = ctx.localNode(); + + GridDhtForceKeysResponse res = new GridDhtForceKeysResponse( + ctx.cacheId(), + msg.futureId(), + msg.miniId(), + ctx.deploymentEnabled()); + + GridDhtPartitionTopology top = ctx.topology(); + + for (KeyCacheObject k : msg.keys()) { + int p = ctx.affinity().partition(k); + + GridDhtLocalPartition locPart = top.localPartition(p, AffinityTopologyVersion.NONE, false); + + // If this node is no longer an owner. + if (locPart == null && !top.owners(p).contains(loc)) { + res.addMissed(k); + + continue; + } + + GridCacheEntryEx entry; + + while (true) { + try { + entry = ctx.dht().entryEx(k); + + entry.unswap(); + + GridCacheEntryInfo info = entry.info(); + + if (info == null) { + assert entry.obsolete() : entry; + + continue; + } + + if (!info.isNew()) + res.addInfo(info); + + ctx.evicts().touch(entry, msg.topologyVersion()); + + break; + } + catch (GridCacheEntryRemovedException ignore) { + if (log.isDebugEnabled()) + log.debug("Got removed entry: " + k); + } + catch (GridDhtInvalidPartitionException ignore) { + if (log.isDebugEnabled()) + log.debug("Local node is no longer an owner: " + p); + + res.addMissed(k); + + break; + } + } + } + + if (log.isDebugEnabled()) + log.debug("Sending force key response [node=" + node.id() + ", res=" + res + ']'); + + ctx.io().send(node, res, ctx.ioPolicy()); + } + catch (ClusterTopologyCheckedException ignore) { + if (log.isDebugEnabled()) + log.debug("Received force key request form failed node (will ignore) [nodeId=" + node.id() + + ", req=" + msg + ']'); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to reply to force key request [nodeId=" + node.id() + ", req=" + msg + ']', e); + } + } + + /** + * + */ + public void dumpDebugInfo() { + if (!forceKeyFuts.isEmpty()) { + U.warn(log, "Pending force key futures [cache=" + ctx.name() + "]:"); + + for (GridDhtForceKeysFuture fut : forceKeyFuts.values()) + U.warn(log, ">>> " + fut); + } + } + + @Override public void onKernalStop() { + super.onKernalStop(); + + stopping = true; + + IgniteCheckedException err = stopError(); + + for (GridDhtForceKeysFuture fut : forceKeyFuts.values()) + fut.onDone(err); + + ctx.gridEvents().removeLocalEventListener(discoLsnr); + } + + /** + * @return Node stop exception. + */ + private IgniteCheckedException stopError() { + return new NodeStoppingException("Operation has been cancelled (cache or node is stopping)."); + } + + /** * @param nodeId Sender node ID. * @param res Near get response. */ @@ -160,7 +353,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap * @param ctx Context. */ protected GridDhtCacheAdapter(GridCacheContext<K, V> ctx) { - this(ctx, new GridCachePartitionedConcurrentMap(ctx)); + this(ctx, new GridCachePartitionedConcurrentMap(ctx.group())); } /** @@ -174,83 +367,21 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap } /** {@inheritDoc} */ - @Override protected void init() { - super.init(); - - top = new GridDhtPartitionTopologyImpl(ctx, entryFactory()); - } - - /** {@inheritDoc} */ @Override public void start() throws IgniteCheckedException { - super.start(); - - ctx.io().addHandler(ctx.cacheId(), GridCacheTtlUpdateRequest.class, new CI2<UUID, GridCacheTtlUpdateRequest>() { + ctx.io().addCacheHandler(ctx.cacheId(), GridCacheTtlUpdateRequest.class, new CI2<UUID, GridCacheTtlUpdateRequest>() { @Override public void apply(UUID nodeId, GridCacheTtlUpdateRequest req) { processTtlUpdateRequest(req); } }); - } - - /** {@inheritDoc} */ - @Override public void stop() { - super.stop(); - - if (preldr != null) - preldr.stop(); - - // Clean up to help GC. - preldr = null; - top = null; - } - /** {@inheritDoc} */ - @Override public void onReconnected() { - super.onReconnected(); - - ctx.affinity().onReconnected(); - - top.onReconnected(); - - if (preldr != null) - preldr.onReconnected(); - } - - /** {@inheritDoc} */ - @Override public void onKernalStart() throws IgniteCheckedException { - super.onKernalStart(); - - if (preldr != null) - preldr.onKernalStart(); - } - - /** {@inheritDoc} */ - @Override public void onKernalStop() { - super.onKernalStop(); - - if (preldr != null) - preldr.onKernalStop(); + ctx.gridEvents().addLocalEventListener(discoLsnr, EVT_NODE_LEFT, EVT_NODE_FAILED); } /** {@inheritDoc} */ @Override public void printMemoryStats() { super.printMemoryStats(); - top.printMemoryStats(1024); - } - - /** - * @return Cache map entry factory. - */ - @Override protected GridCacheMapEntryFactory entryFactory() { - return new GridCacheMapEntryFactory() { - @Override public GridCacheMapEntry create( - GridCacheContext ctx, - AffinityTopologyVersion topVer, - KeyCacheObject key - ) { - return new GridDhtCacheEntry(ctx, topVer, key); - } - }; + ctx.group().topology().printMemoryStats(1024); } /** @@ -262,21 +393,12 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap * @return Partition topology. */ public GridDhtPartitionTopology topology() { - return top; + return ctx.group().topology(); } /** {@inheritDoc} */ @Override public GridCachePreloader preloader() { - return preldr; - } - - /** - * @return DHT preloader. - */ - public GridDhtPreloader dhtPreloader() { - assert preldr instanceof GridDhtPreloader; - - return (GridDhtPreloader)preldr; + return ctx.group().preloader(); } /** @@ -300,6 +422,8 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap if (tup != null) throw new IgniteCheckedException("Nested multi-update locks are not supported"); + GridDhtPartitionTopology top = ctx.group().topology(); + top.readLock(); GridDhtTopologyFuture topFut; @@ -342,7 +466,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap if (tup == null) throw new IgniteCheckedException("Multi-update was not started or released twice."); - top.readLock(); + ctx.group().topology().readLock(); try { IgniteUuid lockId = tup.get1(); @@ -355,7 +479,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap multiFut.onDone(lockId); } finally { - top.readUnlock(); + ctx.group().topology().readUnlock(); } } @@ -516,7 +640,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap return; try { - GridDhtLocalPartition part = top.localPartition(ctx.affinity().partition(key), + GridDhtLocalPartition part = ctx.group().topology().localPartition(ctx.affinity().partition(key), AffinityTopologyVersion.NONE, true); // Reserve to make sure that partition does not get unloaded. @@ -576,7 +700,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap long sum = 0; for (GridDhtLocalPartition p : topology().currentLocalPartitions()) - sum += p.dataStore().size(); + sum += p.dataStore().cacheSize(ctx.cacheId()); return sum; } @@ -594,7 +718,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap for (GridDhtLocalPartition p : topology().currentLocalPartitions()) { if (p.primary(topVer)) - sum += p.dataStore().size(); + sum += p.dataStore().cacheSize(ctx.cacheId()); } return sum; @@ -809,7 +933,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap res = new GridNearSingleGetResponse(ctx.cacheId(), req.futureId(), - req.topologyVersion(), + null, res0, false, req.addDeploymentInfo()); @@ -818,9 +942,9 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap res.setContainsValue(); } else { - AffinityTopologyVersion topVer = ctx.shared().exchange().readyAffinityVersion(); + AffinityTopologyVersion topVer = ctx.shared().exchange().lastTopologyFuture().topologyVersion(); - assert topVer.compareTo(req.topologyVersion()) >= 0 : "Wrong ready topology version for " + + assert topVer.compareTo(req.topologyVersion()) > 0 : "Wrong ready topology version for " + "invalid partitions response [topVer=" + topVer + ", req=" + req + ']'; res = new GridNearSingleGetResponse(ctx.cacheId(), @@ -908,9 +1032,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap } if (!F.isEmpty(fut.invalidPartitions())) - res.invalidPartitions(fut.invalidPartitions(), ctx.shared().exchange().readyAffinityVersion()); - else - res.invalidPartitions(fut.invalidPartitions(), req.topologyVersion()); + res.invalidPartitions(fut.invalidPartitions(), ctx.shared().exchange().lastTopologyFuture().topologyVersion()); try { ctx.io().send(nodeId, res, ctx.ioPolicy()); @@ -1096,7 +1218,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap false); if (part != null) - part.onDeferredDelete(entry.key(), ver); + part.onDeferredDelete(entry.context().cacheId(), entry.key(), ver); } /** @@ -1108,8 +1230,8 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap if (expVer.equals(curVer)) return false; - Collection<ClusterNode> cacheNodes0 = ctx.discovery().cacheAffinityNodes(ctx.cacheId(), expVer); - Collection<ClusterNode> cacheNodes1 = ctx.discovery().cacheAffinityNodes(ctx.cacheId(), curVer); + Collection<ClusterNode> cacheNodes0 = ctx.discovery().cacheGroupAffinityNodes(ctx.groupId(), expVer); + Collection<ClusterNode> cacheNodes1 = ctx.discovery().cacheGroupAffinityNodes(ctx.groupId(), curVer); if (!cacheNodes0.equals(cacheNodes1) || ctx.affinity().affinityTopologyVersion().compareTo(curVer) < 0) return true; @@ -1147,7 +1269,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap * @param topVer Specified affinity topology version. * @return Local entries iterator. */ - public Iterator<Cache.Entry<K, V>> localEntriesIterator(final boolean primary, + private Iterator<Cache.Entry<K, V>> localEntriesIterator(final boolean primary, final boolean backup, final boolean keepBinary, final AffinityTopologyVersion topVer) { @@ -1161,7 +1283,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap * @param topVer Specified affinity topology version. * @return Local entries iterator. */ - public Iterator<? extends GridCacheEntryEx> localEntriesIteratorEx(final boolean primary, + private Iterator<? extends GridCacheEntryEx> localEntriesIteratorEx(final boolean primary, final boolean backup, final AffinityTopologyVersion topVer) { assert primary || backup; @@ -1208,7 +1330,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap GridDhtLocalPartition part = partIt.next(); if (primary == part.primary(topVer)) { - curIt = part.entries().iterator(); + curIt = part.entries(ctx.cacheId()).iterator(); break; } @@ -1253,4 +1375,35 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap return topVer; } } + + /** + * + */ + protected abstract class MessageHandler<M> implements IgniteBiInClosure<UUID, M> { + /** */ + private static final long serialVersionUID = 0L; + + /** {@inheritDoc} */ + @Override public void apply(UUID nodeId, M msg) { + ClusterNode node = ctx.node(nodeId); + + if (node == null) { + if (log.isDebugEnabled()) + log.debug("Received message from failed node [node=" + nodeId + ", msg=" + msg + ']'); + + return; + } + + if (log.isDebugEnabled()) + log.debug("Received message from node [node=" + nodeId + ", msg=" + msg + ']'); + + onMessage(node, msg); + } + + /** + * @param node Node. + * @param msg Message. + */ + protected abstract void onMessage(ClusterNode node, M msg); + } }
http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java index 8c0b0c2..ebb2cfc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java @@ -93,8 +93,10 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry { } /** {@inheritDoc} */ - @Override protected long nextPartCounter() { - return locPart.nextUpdateCounter(); + @Override protected long nextPartitionCounter(AffinityTopologyVersion topVer, + boolean primary, + @Nullable Long primaryCntr) { + return locPart.nextUpdateCounter(cctx.cacheId(), topVer, primary, primaryCntr); } /** {@inheritDoc} */ @@ -139,7 +141,7 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry { assert !Thread.holdsLock(this); // Remove this entry from partition mapping. - cctx.dht().topology().onRemoved(this); + cctx.topology().onRemoved(this); } /** @@ -715,8 +717,8 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry { /** * @return Cache name. */ - protected String cacheName() { - return cctx.dht().near().name(); + protected final String cacheName() { + return cctx.name(); } /** {@inheritDoc} */ @@ -726,12 +728,12 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry { /** {@inheritDoc} */ @Override protected void incrementMapPublicSize() { - locPart.incrementPublicSize(this); + locPart.incrementPublicSize(null, this); } /** {@inheritDoc} */ @Override protected void decrementMapPublicSize() { - locPart.decrementPublicSize(this); + locPart.decrementPublicSize(null, this); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java index 458bc4a..49922fe 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java @@ -166,7 +166,7 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col * Initializes future. */ void init() { - GridDhtFuture<Object> fut = cctx.dht().dhtPreloader().request(keys.keySet(), topVer); + GridDhtFuture<Object> fut = cctx.group().preloader().request(cctx, keys.keySet(), topVer); if (fut != null) { if (!F.isEmpty(fut.invalidPartitions())) { @@ -292,9 +292,11 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col */ private boolean map(KeyCacheObject key) { try { + int keyPart = cctx.affinity().partition(key); + GridDhtLocalPartition part = topVer.topologyVersion() > 0 ? - cache().topology().localPartition(cctx.affinity().partition(key), topVer, true) : - cache().topology().localPartition(key, false); + cache().topology().localPartition(keyPart, topVer, true) : + cache().topology().localPartition(keyPart); if (part == null) return false; http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java index 9a7cfdc..1a81f6e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java @@ -87,8 +87,8 @@ public final class GridDhtGetSingleFuture<K, V> extends GridFutureAdapter<GridCa /** Topology version .*/ private AffinityTopologyVersion topVer; - /** Retries because ownership changed. */ - private Collection<Integer> retries; + /** Retry because ownership changed. */ + private Integer retry; /** Subject ID. */ private UUID subjId; @@ -194,17 +194,21 @@ public final class GridDhtGetSingleFuture<K, V> extends GridFutureAdapter<GridCa * */ private void map() { - if (cctx.dht().dhtPreloader().needForceKeys()) { - GridDhtFuture<Object> fut = cctx.dht().dhtPreloader().request( + if (cctx.group().preloader().needForceKeys()) { + GridDhtFuture<Object> fut = cctx.group().preloader().request( + cctx, Collections.singleton(key), topVer); if (fut != null) { if (!F.isEmpty(fut.invalidPartitions())) { - if (retries == null) - retries = new HashSet<>(); + assert fut.invalidPartitions().size() == 1 : fut.invalidPartitions(); - retries.addAll(fut.invalidPartitions()); + retry = F.first(fut.invalidPartitions()); + + onDone((GridCacheEntryInfo)null); + + return; } fut.listen( @@ -239,17 +243,14 @@ public final class GridDhtGetSingleFuture<K, V> extends GridFutureAdapter<GridCa * */ private void map0() { - // Assign keys to primary nodes. - int part = cctx.affinity().partition(key); + assert retry == null : retry; - if (retries == null || !retries.contains(part)) { - if (!map(key)) { - retries = Collections.singleton(part); + if (!map(key)) { + retry = cctx.affinity().partition(key); - onDone((GridCacheEntryInfo)null); + onDone((GridCacheEntryInfo)null); - return; - } + return; } getAsync(); @@ -257,7 +258,7 @@ public final class GridDhtGetSingleFuture<K, V> extends GridFutureAdapter<GridCa /** {@inheritDoc} */ @Override public Collection<Integer> invalidPartitions() { - return retries == null ? Collections.<Integer>emptyList() : retries; + return retry == null ? Collections.<Integer>emptyList() : Collections.singletonList(retry); } /** @@ -266,9 +267,11 @@ public final class GridDhtGetSingleFuture<K, V> extends GridFutureAdapter<GridCa */ private boolean map(KeyCacheObject key) { try { + int keyPart = cctx.affinity().partition(key); + GridDhtLocalPartition part = topVer.topologyVersion() > 0 ? - cache().topology().localPartition(cctx.affinity().partition(key), topVer, true) : - cache().topology().localPartition(key, false); + cache().topology().localPartition(keyPart, topVer, true) : + cache().topology().localPartition(keyPart); if (part == null) return false; http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java index a35c168..a53e864 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java @@ -19,7 +19,10 @@ package org.apache.ignite.internal.processors.cache.distributed.dht; import java.util.HashMap; import java.util.Iterator; +import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentNavigableMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CopyOnWriteArrayList; @@ -30,31 +33,33 @@ import java.util.concurrent.locks.ReentrantLock; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; +import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.NodeStoppingException; import org.apache.ignite.internal.pagemem.wal.record.delta.PartitionMetaStateRecord; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.CacheGroupContext; import org.apache.ignite.internal.processors.cache.GridCacheAdapter; import org.apache.ignite.internal.processors.cache.GridCacheConcurrentMapImpl; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import org.apache.ignite.internal.processors.cache.GridCacheMapEntry; import org.apache.ignite.internal.processors.cache.GridCacheMapEntryFactory; +import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.database.CacheDataRow; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader; import org.apache.ignite.internal.processors.cache.extras.GridCacheObsoleteEntryExtras; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; -import org.apache.ignite.internal.processors.query.QueryUtils; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.lang.GridIterator; import org.apache.ignite.internal.util.tostring.GridToStringExclude; -import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteUuid; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; +import org.jsr166.ConcurrentHashMap8; import org.jsr166.ConcurrentLinkedDeque8; import static org.apache.ignite.IgniteSystemProperties.IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE; @@ -71,6 +76,17 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh * Key partition. */ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements Comparable<GridDhtLocalPartition>, GridReservable { + /** */ + private static final GridCacheMapEntryFactory ENTRY_FACTORY = new GridCacheMapEntryFactory() { + @Override public GridCacheMapEntry create( + GridCacheContext ctx, + AffinityTopologyVersion topVer, + KeyCacheObject key + ) { + return new GridDhtCacheEntry(ctx, topVer, key); + } + }; + /** Maximum size for delete queue. */ public static final int MAX_DELETE_QUEUE_SIZE = Integer.getInteger(IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE, 200_000); @@ -101,29 +117,48 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements @GridToStringExclude private final GridFutureAdapter<?> rent; - /** Context. */ - private final GridCacheContext cctx; + /** */ + @GridToStringExclude + private final GridCacheSharedContext ctx; + + /** */ + @GridToStringExclude + private final CacheGroupContext grp; /** Create time. */ @GridToStringExclude private final long createTime = U.currentTimeMillis(); /** Eviction history. */ + @GridToStringExclude private final Map<KeyCacheObject, GridCacheVersion> evictHist = new HashMap<>(); /** Lock. */ + @GridToStringExclude private final ReentrantLock lock = new ReentrantLock(); + /** */ + @GridToStringExclude + private final ConcurrentMap<Integer, CacheMapHolder> cacheMaps; + + /** */ + @GridToStringExclude + private final CacheMapHolder singleCacheEntryMap; + /** Remove queue. */ + @GridToStringExclude private final ConcurrentLinkedDeque8<RemovedEntryHolder> rmvQueue = new ConcurrentLinkedDeque8<>(); /** Group reservations. */ + @GridToStringExclude private final CopyOnWriteArrayList<GridDhtPartitionsReservation> reservations = new CopyOnWriteArrayList<>(); /** */ + @GridToStringExclude private final CacheDataStore store; /** Partition updates. */ + @GridToStringExclude private final ConcurrentNavigableMap<Long, Boolean> updates = new ConcurrentSkipListMap<>(); /** Last applied update. */ @@ -137,21 +172,30 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements private boolean reload; /** - * @param cctx Context. + * @param ctx Context. + * @param grp Cache group. * @param id Partition ID. - * @param entryFactory Entry factory. */ - @SuppressWarnings("ExternalizableWithoutPublicNoArgConstructor") GridDhtLocalPartition( - GridCacheContext cctx, - int id, - GridCacheMapEntryFactory entryFactory - ) { - super(cctx, entryFactory, Math.max(10, GridCacheAdapter.DFLT_START_CACHE_SIZE / cctx.affinity().partitions())); + @SuppressWarnings("ExternalizableWithoutPublicNoArgConstructor") + GridDhtLocalPartition(GridCacheSharedContext ctx, + CacheGroupContext grp, + int id) { + super(ENTRY_FACTORY); this.id = id; - this.cctx = cctx; + this.ctx = ctx; + this.grp = grp; - log = U.logger(cctx.kernalContext(), logRef, this); + log = U.logger(ctx.kernalContext(), logRef, this); + + if (grp.sharedGroup()) { + singleCacheEntryMap = null; + cacheMaps = new ConcurrentHashMap<>(); + } + else { + singleCacheEntryMap = new CacheMapHolder(grp.singleCacheContext(), createEntriesMap()); + cacheMaps = null; + } rent = new GridFutureAdapter<Object>() { @Override public String toString() { @@ -159,15 +203,15 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements } }; - int delQueueSize = CU.isSystemCache(cctx.name()) ? 100 : - Math.max(MAX_DELETE_QUEUE_SIZE / cctx.affinity().partitions(), 20); + int delQueueSize = grp.systemCache() ? 100 : + Math.max(MAX_DELETE_QUEUE_SIZE / grp.affinity().partitions(), 20); rmvQueueMaxSize = U.ceilPow2(delQueueSize); rmvdEntryTtl = Long.getLong(IGNITE_CACHE_REMOVED_ENTRIES_TTL, 10_000); try { - store = cctx.offheap().createCacheDataStore(id); + store = grp.offheap().createCacheDataStore(id); } catch (IgniteCheckedException e) { // TODO ignite-db @@ -176,6 +220,62 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements } /** + * @return Entries map. + */ + private ConcurrentMap<KeyCacheObject, GridCacheMapEntry> createEntriesMap() { + return new ConcurrentHashMap8<>(Math.max(10, GridCacheAdapter.DFLT_START_CACHE_SIZE / grp.affinity().partitions()), + 0.75f, + Runtime.getRuntime().availableProcessors() * 2); + } + + /** {@inheritDoc} */ + @Override public int internalSize() { + if (grp.sharedGroup()) { + int size = 0; + + for (CacheMapHolder hld : cacheMaps.values()) + size += hld.map.size(); + + return size; + } + + return singleCacheEntryMap.map.size(); + } + + /** {@inheritDoc} */ + @Override protected CacheMapHolder entriesMap(GridCacheContext cctx) { + if (grp.sharedGroup()) + return cacheMapHolder(cctx); + + return singleCacheEntryMap; + } + + /** {@inheritDoc} */ + @Nullable @Override protected CacheMapHolder entriesMapIfExists(Integer cacheId) { + return grp.sharedGroup() ? cacheMaps.get(cacheId) : singleCacheEntryMap; + } + + /** + * @param cctx Cache context. + * @return Map holder. + */ + private CacheMapHolder cacheMapHolder(GridCacheContext cctx) { + assert grp.sharedGroup(); + + CacheMapHolder hld = cacheMaps.get(cctx.cacheIdBoxed()); + + if (hld != null) + return hld; + + CacheMapHolder old = cacheMaps.putIfAbsent(cctx.cacheIdBoxed(), hld = new CacheMapHolder(cctx, createEntriesMap())); + + if (old != null) + hld = old; + + return hld; + } + + /** * @return Data store. */ public CacheDataStore dataStore() { @@ -242,10 +342,10 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements * @return {@code True} if partition is empty. */ public boolean isEmpty() { - if (cctx.allowFastEviction()) + if (grp.allowFastEviction()) return internalSize() == 0; - return store.size() == 0 && internalSize() == 0; + return store.fullSize() == 0 && internalSize() == 0; } /** @@ -309,6 +409,20 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements } /** + * @param cacheId Cache ID. + * @param key Key. + * @param ver Version. + */ + private void removeVersionedEntry(int cacheId, KeyCacheObject key, GridCacheVersion ver) { + CacheMapHolder hld = grp.sharedGroup() ? cacheMaps.get(cacheId) : singleCacheEntryMap; + + GridCacheMapEntry entry = hld != null ? hld.map.get(key) : null; + + if (entry != null && entry.markObsoleteVersion(ver)) + removeEntry(entry); + } + + /** * */ public void cleanupRemoveQueue() { @@ -316,10 +430,10 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements RemovedEntryHolder item = rmvQueue.pollFirst(); if (item != null) - cctx.dht().removeVersionedEntry(item.key(), item.version()); + removeVersionedEntry(item.cacheId(), item.key(), item.version()); } - if (!cctx.isDrEnabled()) { + if (!grp.isDrEnabled()) { RemovedEntryHolder item = rmvQueue.peekFirst(); while (item != null && item.expireTime() < U.currentTimeMillis()) { @@ -328,7 +442,7 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements if (item == null) break; - cctx.dht().removeVersionedEntry(item.key(), item.version()); + removeVersionedEntry(item.cacheId(), item.key(), item.version()); item = rmvQueue.peekFirst(); } @@ -336,13 +450,14 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements } /** + * @param cacheId cacheId Cache ID. * @param key Removed key. * @param ver Removed version. */ - public void onDeferredDelete(KeyCacheObject key, GridCacheVersion ver) { + public void onDeferredDelete(int cacheId, KeyCacheObject key, GridCacheVersion ver) { cleanupRemoveQueue(); - rmvQueue.add(new RemovedEntryHolder(key, ver, rmvdEntryTtl)); + rmvQueue.add(new RemovedEntryHolder(cacheId, key, ver, rmvdEntryTtl)); } /** @@ -434,7 +549,10 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements } /** {@inheritDoc} */ - @Override protected void release(int sizeChange, GridCacheEntryEx e) { + @Override protected void release(int sizeChange, CacheMapHolder hld, GridCacheEntryEx e) { + if (grp.sharedGroup() && sizeChange != 0) + hld.size.addAndGet(sizeChange); + release0(sizeChange); } @@ -482,16 +600,16 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements * @return {@code true} if cas succeeds. */ private boolean casState(long state, GridDhtPartitionState toState) { - if (cctx.shared().database().persistenceEnabled()) { + if (ctx.database().persistenceEnabled()) { synchronized (this) { boolean update = this.state.compareAndSet(state, setPartState(state, toState)); if (update) try { - cctx.shared().wal().log(new PartitionMetaStateRecord(cctx.cacheId(), id, toState, updateCounter())); + ctx.wal().log(new PartitionMetaStateRecord(grp.groupId(), id, toState, updateCounter())); } catch (IgniteCheckedException e) { - log.error("Error while writing to log", e); + U.error(log, "Error while writing to log", e); } return update; @@ -624,7 +742,7 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements GridDhtPartitionState partState = getPartState(state); - if (isEmpty() && !QueryUtils.isEnabled(cctx.config()) && getSize(state) == 0 && + if (isEmpty() && !grp.queriesEnabled() && getSize(state) == 0 && partState == RENTING && getReservations(state) == 0 && !groupReserved() && casState(state, EVICTED)) { if (log.isDebugEnabled()) @@ -634,7 +752,7 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements finishDestroy(updateSeq); } else if (partState == RENTING || shouldBeRenting()) - cctx.preloader().evictPartitionAsync(this); + grp.preloader().evictPartitionAsync(this); } /** @@ -710,18 +828,13 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements assert state() == EVICTED : this; assert evictGuard.get() == -1; - if (cctx.isDrEnabled()) - cctx.dr().partitionEvicted(id); - - cctx.continuousQueries().onPartitionEvicted(id); - - cctx.dataStructures().onPartitionEvicted(id); + grp.onPartitionEvicted(id); destroyCacheDataStore(); rent.onDone(); - ((GridDhtPreloader)cctx.preloader()).onPartitionEvicted(this, updateSeq); + ((GridDhtPreloader)grp.preloader()).onPartitionEvicted(this, updateSeq); clearDeferredDeletes(); } @@ -759,7 +872,7 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements */ private void destroyCacheDataStore() { try { - cctx.offheap().destroyCacheDataStore(dataStore()); + grp.offheap().destroyCacheDataStore(dataStore()); } catch (IgniteCheckedException e) { log.error("Unable to destroy cache data store on partition eviction [id=" + id + "]", e); @@ -778,7 +891,9 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements * @return {@code True} if local node is primary for this partition. */ public boolean primary(AffinityTopologyVersion topVer) { - return cctx.affinity().primaryByPartition(cctx.localNode(), id, topVer); + List<ClusterNode> nodes = grp.affinity().cachedAffinity(topVer).get(id); + + return !nodes.isEmpty() && ctx.localNode().equals(nodes.get(0)); } /** @@ -786,14 +901,23 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements * @return {@code True} if local node is backup for this partition. */ public boolean backup(AffinityTopologyVersion topVer) { - return cctx.affinity().backupByPartition(cctx.localNode(), id, topVer); + List<ClusterNode> nodes = grp.affinity().cachedAffinity(topVer).get(id); + + return nodes.indexOf(ctx.localNode()) > 0; } /** + * @param cacheId ID of cache initiated counter update. + * @param topVer Topology version for current operation. * @return Next update index. */ - public long nextUpdateCounter() { - return store.nextUpdateCounter(); + long nextUpdateCounter(int cacheId, AffinityTopologyVersion topVer, boolean primary, @Nullable Long primaryCntr) { + long nextCntr = store.nextUpdateCounter(); + + if (grp.sharedGroup()) + grp.onPartitionCounterUpdate(cacheId, id, primaryCntr != null ? primaryCntr : nextCntr, topVer, primary); + + return nextCntr; } /** @@ -830,40 +954,128 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements * @throws NodeStoppingException If node stopping. */ public void clearAll() throws NodeStoppingException { - GridCacheVersion clearVer = cctx.versions().next(); + GridCacheVersion clearVer = ctx.versions().next(); + + GridCacheObsoleteEntryExtras extras = new GridCacheObsoleteEntryExtras(clearVer); + + boolean rec = grp.eventRecordable(EVT_CACHE_REBALANCE_OBJECT_UNLOADED); - boolean rec = cctx.events().isRecordable(EVT_CACHE_REBALANCE_OBJECT_UNLOADED); + if (grp.sharedGroup()) { + for (CacheMapHolder hld : cacheMaps.values()) + clear(hld.map, extras, rec); + } + else + clear(singleCacheEntryMap.map, extras, rec); - Iterator<GridCacheMapEntry> it = allEntries().iterator(); + if (!grp.allowFastEviction()) { + CacheMapHolder hld = grp.sharedGroup() ? null : singleCacheEntryMap; - GridCacheObsoleteEntryExtras extras = new GridCacheObsoleteEntryExtras(clearVer); + try { + GridIterator<CacheDataRow> it0 = grp.offheap().partitionIterator(id); + + while (it0.hasNext()) { + ctx.database().checkpointReadLock(); + + try { + CacheDataRow row = it0.next(); + + if (grp.sharedGroup() && (hld == null || hld.cctx.cacheId() != row.cacheId())) { + hld = cacheMaps.get(row.cacheId()); + + if (hld == null) + continue; + } + + assert hld != null; + + GridCacheMapEntry cached = putEntryIfObsoleteOrAbsent( + hld, + hld.cctx, + grp.affinity().lastVersion(), + row.key(), + true, + false); + + ctx.database().checkpointReadLock(); + + try {if (cached instanceof GridDhtCacheEntry && ((GridDhtCacheEntry)cached).clearInternal(clearVer, extras)) { + if (rec) { + hld.cctx.events().addEvent(cached.partition(), + cached.key(), + ctx.localNodeId(), + (IgniteUuid)null, + null, + EVT_CACHE_REBALANCE_OBJECT_UNLOADED, + null, + false, + cached.rawGet(), + cached.hasValue(), + null, + null, + null, + false);} + } + } + finally { + ctx.database().checkpointReadUnlock(); + } + } + catch (GridDhtInvalidPartitionException e) { + assert isEmpty() && state() == EVICTED : "Invalid error [e=" + e + ", part=" + this + ']'; + + break; // Partition is already concurrently cleared and evicted. + } + finally { + ctx.database().checkpointReadUnlock(); + } + } + } + catch (NodeStoppingException e) { + if (log.isDebugEnabled()) + log.debug("Failed to get iterator for evicted partition: " + id); + + rent.onDone(e); + + throw e; + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to get iterator for evicted partition: " + id, e); + } + } + } + + /** + * @param map Map to clear. + * @param extras Obsolete extras. + * @param evt Unload event flag. + * @throws NodeStoppingException + */ + private void clear(ConcurrentMap<KeyCacheObject, GridCacheMapEntry> map, + GridCacheObsoleteEntryExtras extras, + boolean evt) throws NodeStoppingException { + Iterator<GridCacheMapEntry> it = map.values().iterator(); while (it.hasNext()) { GridCacheMapEntry cached = null; - cctx.shared().database().checkpointReadLock(); + ctx.database().checkpointReadLock(); try { cached = it.next(); - if (cached instanceof GridDhtCacheEntry && ((GridDhtCacheEntry)cached).clearInternal(clearVer, extras)) { + if (cached instanceof GridDhtCacheEntry && ((GridDhtCacheEntry)cached).clearInternal(extras.obsoleteVersion(), extras)) { removeEntry(cached); if (!cached.isInternal()) { - if (rec) { - cctx.events().addEvent(cached.partition(), + if (evt) { + grp.addCacheEvent(cached.partition(), cached.key(), - cctx.localNodeId(), - (IgniteUuid)null, - null, + ctx.localNodeId(), EVT_CACHE_REBALANCE_OBJECT_UNLOADED, null, false, cached.rawGet(), cached.hasValue(), - null, - null, - null, false); } } @@ -886,66 +1098,7 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements U.error(log, "Failed to clear cache entry for evicted partition: " + cached, e); } finally { - cctx.shared().database().checkpointReadUnlock(); - } - } - - if (!cctx.allowFastEviction()) { - try { - GridIterator<CacheDataRow> it0 = cctx.offheap().iterator(id); - - while (it0.hasNext()) { - try { - CacheDataRow row = it0.next(); - - GridCacheMapEntry cached = putEntryIfObsoleteOrAbsent(cctx.affinity().affinityTopologyVersion(), - row.key(), - true, - false); - - cctx.shared().database().checkpointReadLock(); - - try { - if (cached instanceof GridDhtCacheEntry && ((GridDhtCacheEntry)cached).clearInternal(clearVer, extras)) { - if (rec) { - cctx.events().addEvent(cached.partition(), - cached.key(), - cctx.localNodeId(), - (IgniteUuid)null, - null, - EVT_CACHE_REBALANCE_OBJECT_UNLOADED, - null, - false, - cached.rawGet(), - cached.hasValue(), - null, - null, - null, - false); - } - } - } - finally { - cctx.shared().database().checkpointReadUnlock(); - } - } - catch (GridDhtInvalidPartitionException e) { - assert isEmpty() && state() == EVICTED : "Invalid error [e=" + e + ", part=" + this + ']'; - - break; // Partition is already concurrently cleared and evicted. - } - } - } - catch (NodeStoppingException e) { - if (log.isDebugEnabled()) - log.debug("Failed to get iterator for evicted partition: " + id); - - rent.onDone(e); - - throw e; - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to get iterator for evicted partition: " + id, e); + ctx.database().checkpointReadUnlock(); } } } @@ -955,7 +1108,7 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements */ private void clearDeferredDeletes() { for (RemovedEntryHolder e : rmvQueue) - cctx.dht().removeVersionedEntry(e.key(), e.version()); + removeVersionedEntry(e.cacheId(), e.key(), e.version()); } /** {@inheritDoc} */ @@ -980,6 +1133,7 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements /** {@inheritDoc} */ @Override public String toString() { return S.toString(GridDhtLocalPartition.class, this, + "grp", grp.cacheOrGroupName(), "state", state(), "reservations", reservations(), "empty", isEmpty(), @@ -987,12 +1141,25 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements } /** {@inheritDoc} */ - @Override public int publicSize() { + @Override public int publicSize(int cacheId) { + if (grp.sharedGroup()) { + CacheMapHolder hld = cacheMaps.get(cacheId); + + return hld != null ? hld.size.get() : 0; + } + return getSize(state.get()); } /** {@inheritDoc} */ - @Override public void incrementPublicSize(GridCacheEntryEx e) { + @Override public void incrementPublicSize(@Nullable CacheMapHolder hld, GridCacheEntryEx e) { + if (grp.sharedGroup()) { + if (hld == null) + hld = cacheMapHolder(e.context()); + + hld.size.incrementAndGet(); + } + while (true) { long state = this.state.get(); @@ -1002,7 +1169,14 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements } /** {@inheritDoc} */ - @Override public void decrementPublicSize(GridCacheEntryEx e) { + @Override public void decrementPublicSize(@Nullable CacheMapHolder hld, GridCacheEntryEx e) { + if (grp.sharedGroup()) { + if (hld == null) + hld = cacheMapHolder(e.context()); + + hld.size.decrementAndGet(); + } + while (true) { long state = this.state.get(); @@ -1014,6 +1188,22 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements } /** + * @param cacheId Cache ID. + */ + void onCacheStopped(int cacheId) { + assert grp.sharedGroup() : grp.cacheOrGroupName(); + + for (Iterator<RemovedEntryHolder> it = rmvQueue.iterator(); it.hasNext();) { + RemovedEntryHolder e = it.next(); + + if (e.cacheId() == cacheId) + it.remove(); + } + + cacheMaps.remove(cacheId); + } + + /** * @param state Composite state. * @return Partition state. */ @@ -1068,6 +1258,9 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements * Removed entry holder. */ private static class RemovedEntryHolder { + /** */ + private final int cacheId; + /** Cache key */ private final KeyCacheObject key; @@ -1078,11 +1271,13 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements private final long expireTime; /** + * @param cacheId Cache ID. * @param key Key. * @param ver Entry version. * @param ttl TTL. */ - private RemovedEntryHolder(KeyCacheObject key, GridCacheVersion ver, long ttl) { + private RemovedEntryHolder(int cacheId, KeyCacheObject key, GridCacheVersion ver, long ttl) { + this.cacheId = cacheId; this.key = key; this.ver = ver; @@ -1090,6 +1285,13 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements } /** + * @return Cache ID. + */ + int cacheId() { + return cacheId; + } + + /** * @return Key. */ KeyCacheObject key() { http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockResponse.java index ea6ca06..87abd6c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockResponse.java @@ -174,7 +174,7 @@ public class GridDhtLockResponse extends GridDistributedLockResponse { } if (preloadEntries != null) - marshalInfos(preloadEntries, cctx); + marshalInfos(preloadEntries, cctx.shared(), cctx.cacheObjectContext()); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java index c6715e5..4e0608d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java @@ -88,9 +88,9 @@ public interface GridDhtPartitionTopology { public boolean stopping(); /** - * @return Cache ID. + * @return Cache group ID. */ - public int cacheId(); + public int groupId(); /** * Pre-initializes this topology. @@ -146,13 +146,12 @@ public interface GridDhtPartitionTopology { public void releasePartitions(int... parts); /** - * @param key Cache key. - * @param create If {@code true}, then partition will be created if it's not there. + * @param part Partition number. * @return Local partition. * @throws GridDhtInvalidPartitionException If partition is evicted or absent and * does not belong to this node. */ - @Nullable public GridDhtLocalPartition localPartition(Object key, boolean create) + @Nullable public GridDhtLocalPartition localPartition(int part) throws GridDhtInvalidPartitionException; /**
