Repository: ignite Updated Branches: refs/heads/ignite-5075 dc8e10259 -> 36b7037ab
ignite-5075 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/36b7037a Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/36b7037a Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/36b7037a Branch: refs/heads/ignite-5075 Commit: 36b7037ab64ec2ca64e2e994c6af0753256a62fa Parents: dc8e102 Author: sboikov <[email protected]> Authored: Wed May 17 15:38:19 2017 +0300 Committer: sboikov <[email protected]> Committed: Wed May 17 15:38:19 2017 +0300 ---------------------------------------------------------------------- .../distributed/dht/GridDhtCacheAdapter.java | 242 +++++++++++++++- .../dht/GridDhtTransactionalCacheAdapter.java | 17 +- .../dht/atomic/GridDhtAtomicCache.java | 17 +- .../dht/preloader/GridDhtForceKeysFuture.java | 14 +- .../dht/preloader/GridDhtPreloader.java | 280 +------------------ 5 files changed, 278 insertions(+), 292 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/36b7037a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java index 7789673..d2d59b7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java @@ -18,7 +18,6 @@ package org.apache.ignite.internal.processors.cache.distributed.dht; import java.io.Externalizable; -import java.util.AbstractSet; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -26,7 +25,6 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.NoSuchElementException; -import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentMap; import javax.cache.Cache; @@ -34,13 +32,15 @@ import javax.cache.expiry.ExpiryPolicy; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.events.DiscoveryEvent; +import org.apache.ignite.events.Event; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.NodeStoppingException; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; +import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.CacheOperationContext; -import org.apache.ignite.internal.processors.cache.CachePeekModes; import org.apache.ignite.internal.processors.cache.EntryGetResult; import org.apache.ignite.internal.processors.cache.GridCacheAdapter; import org.apache.ignite.internal.processors.cache.GridCacheClearAllRunnable; @@ -59,7 +59,9 @@ import org.apache.ignite.internal.processors.cache.distributed.GridCacheTtlUpdat import org.apache.ignite.internal.processors.cache.distributed.GridDistributedCacheAdapter; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedCacheEntry; import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.GridDhtDetachedCacheEntry; -import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysFuture; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysRequest; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysResponse; import org.apache.ignite.internal.processors.cache.distributed.near.CacheVersionedValue; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest; @@ -70,7 +72,6 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.platform.cache.PlatformCacheEntryFilter; import org.apache.ignite.internal.util.future.GridCompoundFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; -import org.apache.ignite.internal.util.lang.GridIteratorAdapter; import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.CI2; import org.apache.ignite.internal.util.typedef.CI3; @@ -78,6 +79,7 @@ import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteBiInClosure; import org.apache.ignite.lang.IgniteBiPredicate; import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.lang.IgniteUuid; @@ -86,8 +88,11 @@ import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentHashMap8; +import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; +import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; import static org.apache.ignite.internal.processors.dr.GridDrType.DR_LOAD; import static org.apache.ignite.internal.processors.dr.GridDrType.DR_NONE; +import static org.apache.ignite.internal.util.GridConcurrentFactory.newMap; /** * DHT cache adapter. @@ -102,6 +107,30 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap /** Multi tx futures. */ private ConcurrentMap<IgniteUuid, MultiUpdateFuture> multiTxFuts = new ConcurrentHashMap8<>(); + /** Force key futures. */ + private final ConcurrentMap<IgniteUuid, GridDhtForceKeysFuture<?, ?>> forceKeyFuts = newMap(); + + /** */ + private volatile boolean stopping; + + /** Discovery listener. */ + private final GridLocalEventListener discoLsnr = new GridLocalEventListener() { + @Override public void onEvent(Event evt) { + DiscoveryEvent e = (DiscoveryEvent)evt; + + ClusterNode loc = ctx.localNode(); + + assert e.type() == EVT_NODE_LEFT || e.type() == EVT_NODE_FAILED : e; + + final ClusterNode n = e.eventNode(); + + assert !loc.id().equals(n.id()); + + for (GridDhtForceKeysFuture<?, ?> f : forceKeyFuts.values()) + f.onDiscoveryEvent(e); + } + }; + /** * Empty constructor required for {@link Externalizable}. */ @@ -110,6 +139,176 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap } /** + * Adds future to future map. + * + * @param fut Future to add. + * @return {@code False} if node cache is stopping and future was completed with error. + */ + public boolean addFuture(GridDhtForceKeysFuture<?, ?> fut) { + forceKeyFuts.put(fut.futureId(), fut); + + if (stopping) { + fut.onDone(stopError()); + + return false; + } + + return true; + } + + /** + * Removes future from future map. + * + * @param fut Future to remove. + */ + public void removeFuture(GridDhtForceKeysFuture<?, ?> fut) { + forceKeyFuts.remove(fut.futureId(), fut); + } + + /** + * @param node Node. + * @param msg Message. + */ + protected final void processForceKeyResponse(ClusterNode node, GridDhtForceKeysResponse msg) { + GridDhtForceKeysFuture<?, ?> f = forceKeyFuts.get(msg.futureId()); + + if (f != null) + f.onResult(msg); + else if (log.isDebugEnabled()) + log.debug("Receive force key response for unknown future (is it duplicate?) [nodeId=" + node.id() + + ", res=" + msg + ']'); + } + /** + * @param node Node originated request. + * @param msg Force keys message. + */ + protected final void processForceKeysRequest(final ClusterNode node, final GridDhtForceKeysRequest msg) { + IgniteInternalFuture<?> fut = ctx.mvcc().finishKeys(msg.keys(), msg.cacheId(), msg.topologyVersion()); + + if (fut.isDone()) + processForceKeysRequest0(node, msg); + else + fut.listen(new CI1<IgniteInternalFuture<?>>() { + @Override public void apply(IgniteInternalFuture<?> t) { + processForceKeysRequest0(node, msg); + } + }); + } + + /** + * @param node Node originated request. + * @param msg Force keys message. + */ + private void processForceKeysRequest0(ClusterNode node, GridDhtForceKeysRequest msg) { + try { + ClusterNode loc = ctx.localNode(); + + GridDhtForceKeysResponse res = new GridDhtForceKeysResponse( + ctx.cacheId(), + msg.futureId(), + msg.miniId(), + ctx.deploymentEnabled()); + + GridDhtPartitionTopology top = ctx.topology(); + + for (KeyCacheObject k : msg.keys()) { + int p = ctx.affinity().partition(k); + + GridDhtLocalPartition locPart = top.localPartition(p, AffinityTopologyVersion.NONE, false); + + // If this node is no longer an owner. + if (locPart == null && !top.owners(p).contains(loc)) { + res.addMissed(k); + + continue; + } + + GridCacheEntryEx entry; + + while (true) { + try { + entry = ctx.dht().entryEx(k); + + entry.unswap(); + + GridCacheEntryInfo info = entry.info(); + + if (info == null) { + assert entry.obsolete() : entry; + + continue; + } + + if (!info.isNew()) + res.addInfo(info); + + ctx.evicts().touch(entry, msg.topologyVersion()); + + break; + } + catch (GridCacheEntryRemovedException ignore) { + if (log.isDebugEnabled()) + log.debug("Got removed entry: " + k); + } + catch (GridDhtInvalidPartitionException ignore) { + if (log.isDebugEnabled()) + log.debug("Local node is no longer an owner: " + p); + + res.addMissed(k); + + break; + } + } + } + + if (log.isDebugEnabled()) + log.debug("Sending force key response [node=" + node.id() + ", res=" + res + ']'); + + ctx.io().send(node, res, ctx.ioPolicy()); + } + catch (ClusterTopologyCheckedException ignore) { + if (log.isDebugEnabled()) + log.debug("Received force key request form failed node (will ignore) [nodeId=" + node.id() + + ", req=" + msg + ']'); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to reply to force key request [nodeId=" + node.id() + ", req=" + msg + ']', e); + } + } + + /** + * + */ + public void dumpDebugInfo() { + if (!forceKeyFuts.isEmpty()) { + U.warn(log, "Pending force key futures [cache=" + ctx.name() + "]:"); + + for (GridDhtForceKeysFuture fut : forceKeyFuts.values()) + U.warn(log, ">>> " + fut); + } + } + + @Override public void onKernalStop() { + super.onKernalStop(); + + stopping = true; + + IgniteCheckedException err = stopError(); + + for (GridDhtForceKeysFuture fut : forceKeyFuts.values()) + fut.onDone(err); + + ctx.gridEvents().removeLocalEventListener(discoLsnr); + } + + /** + * @return Node stop exception. + */ + private IgniteCheckedException stopError() { + return new NodeStoppingException("Operation has been cancelled (cache or node is stopping)."); + } + + /** * @param nodeId Sender node ID. * @param res Near get response. */ @@ -174,6 +373,8 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap processTtlUpdateRequest(req); } }); + + ctx.gridEvents().addLocalEventListener(discoLsnr, EVT_NODE_LEFT, EVT_NODE_FAILED); } /** {@inheritDoc} */ @@ -1194,4 +1395,35 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap return topVer; } } + + /** + * + */ + protected abstract class MessageHandler<M> implements IgniteBiInClosure<UUID, M> { + /** */ + private static final long serialVersionUID = 0L; + + /** {@inheritDoc} */ + @Override public void apply(UUID nodeId, M msg) { + ClusterNode node = ctx.node(nodeId); + + if (node == null) { + if (log.isDebugEnabled()) + log.debug("Received message from failed node [node=" + nodeId + ", msg=" + msg + ']'); + + return; + } + + if (log.isDebugEnabled()) + log.debug("Received message from node [node=" + nodeId + ", msg=" + msg + ']'); + + onMessage(node, msg); + } + + /** + * @param node Node. + * @param msg Message. + */ + protected abstract void onMessage(ClusterNode node, M msg); + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/36b7037a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java index 8f46f89..5fd3111 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java @@ -47,7 +47,8 @@ import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedCacheEntry; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedLockCancelledException; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedUnlockRequest; -import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysRequest; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysResponse; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockRequest; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockResponse; @@ -159,6 +160,20 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach processDhtUnlockRequest(nodeId, req); } }); + + ctx.io().addHandler(false, ctx.cacheId(), GridDhtForceKeysRequest.class, + new MessageHandler<GridDhtForceKeysRequest>() { + @Override public void onMessage(ClusterNode node, GridDhtForceKeysRequest msg) { + processForceKeysRequest(node, msg); + } + }); + + ctx.io().addHandler(false, ctx.cacheId(), GridDhtForceKeysResponse.class, + new MessageHandler<GridDhtForceKeysResponse>() { + @Override public void onMessage(ClusterNode node, GridDhtForceKeysResponse msg) { + processForceKeyResponse(node, msg); + } + }); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/36b7037a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index c080470..c171514 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -69,7 +69,8 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvali import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology; import org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedGetFuture; import org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedSingleGetFuture; -import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysRequest; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysResponse; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearAtomicCache; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest; @@ -398,6 +399,20 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } }); + ctx.io().addHandler(false, ctx.cacheId(), GridDhtForceKeysRequest.class, + new MessageHandler<GridDhtForceKeysRequest>() { + @Override public void onMessage(ClusterNode node, GridDhtForceKeysRequest msg) { + processForceKeysRequest(node, msg); + } + }); + + ctx.io().addHandler(false, ctx.cacheId(), GridDhtForceKeysResponse.class, + new MessageHandler<GridDhtForceKeysResponse>() { + @Override public void onMessage(ClusterNode node, GridDhtForceKeysResponse msg) { + processForceKeyResponse(node, msg); + } + }); + if (near == null) { ctx.io().addHandler( false, http://git-wip-us.apache.org/repos/asf/ignite/blob/36b7037a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java index 0f39081..845619d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java @@ -102,9 +102,6 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec /** Future ID. */ private IgniteUuid futId = IgniteUuid.randomUuid(); - /** Preloader. */ - private GridDhtPreloader preloader; - /** Trackable flag. */ private boolean trackable; @@ -112,13 +109,11 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec * @param cctx Cache context. * @param topVer Topology version. * @param keys Keys. - * @param preloader Preloader. */ public GridDhtForceKeysFuture( GridCacheContext<K, V> cctx, AffinityTopologyVersion topVer, - Collection<KeyCacheObject> keys, - GridDhtPreloader preloader + Collection<KeyCacheObject> keys ) { assert topVer.topologyVersion() != 0 : topVer; assert !F.isEmpty(keys) : keys; @@ -126,7 +121,6 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec this.cctx = cctx; this.keys = keys; this.topVer = topVer; - this.preloader = preloader; top = cctx.dht().topology(); @@ -158,7 +152,7 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec @Override public boolean onDone(@Nullable Collection<K> res, @Nullable Throwable err) { if (super.onDone(res, err)) { if (trackable) - preloader.remoteFuture(this); + cctx.dht().removeFuture(this); return true; } @@ -170,7 +164,7 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec * @param evt Discovery event. */ @SuppressWarnings( {"unchecked"}) - void onDiscoveryEvent(DiscoveryEvent evt) { + public void onDiscoveryEvent(DiscoveryEvent evt) { topCntr.incrementAndGet(); int type = evt.type(); @@ -244,7 +238,7 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec int curTopVer = topCntr.get(); - if (!preloader.addFuture(this)) { + if (!cctx.dht().addFuture(this)) { assert isDone() : this; return false; http://git-wip-us.apache.org/repos/asf/ignite/blob/36b7037a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java index afde2cc..52c1600 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java @@ -23,29 +23,20 @@ import java.util.Collections; import java.util.List; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; -import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.events.DiscoveryEvent; -import org.apache.ignite.events.Event; import org.apache.ignite.internal.IgniteInternalFuture; -import org.apache.ignite.internal.NodeStoppingException; -import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; -import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; import org.apache.ignite.internal.processors.affinity.AffinityAssignment; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheGroupInfrastructure; import org.apache.ignite.internal.processors.cache.GridCacheContext; -import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo; -import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; import org.apache.ignite.internal.processors.cache.GridCachePreloaderAdapter; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtFuture; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology; @@ -59,22 +50,14 @@ import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.GPC; import org.apache.ignite.internal.util.typedef.internal.LT; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.lang.IgniteBiInClosure; import org.apache.ignite.lang.IgnitePredicate; -import org.apache.ignite.lang.IgniteUuid; import org.jetbrains.annotations.Nullable; import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST; import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_UNLOADED; -import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; -import static org.apache.ignite.events.EventType.EVT_NODE_JOINED; -import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; -import static org.apache.ignite.internal.managers.communication.GridIoPolicy.AFFINITY_POOL; import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.MOVING; import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING; import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.RENTING; -import static org.apache.ignite.internal.util.GridConcurrentFactory.newMap; /** * DHT cache preloader. @@ -86,9 +69,6 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { /** */ private GridDhtPartitionTopology top; - /** Force key futures. */ - private final ConcurrentMap<IgniteUuid, GridDhtForceKeysFuture<?, ?>> forceKeyFuts = newMap(); - /** Partition suppliers. */ private GridDhtPartitionSupplier supplier; @@ -111,40 +91,8 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { private final AtomicInteger partsEvictOwning = new AtomicInteger(); /** */ - private volatile boolean stopping; - - /** */ private boolean stopped; - /** Discovery listener. */ - private final GridLocalEventListener discoLsnr = new GridLocalEventListener() { - @Override public void onEvent(Event evt) { - if (!enterBusy()) - return; - - DiscoveryEvent e = (DiscoveryEvent)evt; - - try { - ClusterNode loc = ctx.localNode(); - - assert e.type() == EVT_NODE_JOINED || e.type() == EVT_NODE_LEFT || e.type() == EVT_NODE_FAILED; - - final ClusterNode n = e.eventNode(); - - assert !loc.id().equals(n.id()); - - for (GridDhtForceKeysFuture<?, ?> f : forceKeyFuts.values()) - f.onDiscoveryEvent(e); - - assert e.type() != EVT_NODE_JOINED || n.order() > loc.order() : "Node joined with smaller-than-local " + - "order [newOrder=" + n.order() + ", locOrder=" + loc.order() + ']'; - } - finally { - leaveBusy(); - } - } - }; - /** * @param grp Cache group. */ @@ -161,26 +109,10 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { if (log.isDebugEnabled()) log.debug("Starting DHT rebalancer..."); - ctx.io().addHandler(true, grp.groupId(), GridDhtForceKeysRequest.class, - new MessageHandler<GridDhtForceKeysRequest>() { - @Override public void onMessage(ClusterNode node, GridDhtForceKeysRequest msg) { - processForceKeysRequest(node, msg); - } - }); - - ctx.io().addHandler(true, grp.groupId(), GridDhtForceKeysResponse.class, - new MessageHandler<GridDhtForceKeysResponse>() { - @Override public void onMessage(ClusterNode node, GridDhtForceKeysResponse msg) { - processForceKeyResponse(node, msg); - } - }); - supplier = new GridDhtPartitionSupplier(grp); demander = new GridDhtPartitionDemander(grp); demander.start(); - - ctx.gridEvents().addLocalEventListener(discoLsnr, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED); } /** {@inheritDoc} */ @@ -199,10 +131,6 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { if (log.isDebugEnabled()) log.debug("DHT rebalancer onKernalStop callback."); - stopping = true; - - ctx.gridEvents().removeLocalEventListener(discoLsnr); - // Acquire write busy lock. busyLock.writeLock().lock(); @@ -213,11 +141,6 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { if (demander != null) demander.stop(); - IgniteCheckedException err = stopError(); - - for (GridDhtForceKeysFuture fut : forceKeyFuts.values()) - fut.onDone(err); - top = null; stopped = true; @@ -226,12 +149,6 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { busyLock.writeLock().unlock(); } } - /** - * @return Node stop exception. - */ - private IgniteCheckedException stopError() { - return new NodeStoppingException("Operation has been cancelled (cache or node is stopping)."); - } /** {@inheritDoc} */ @Override public void onInitialExchangeComplete(@Nullable Throwable err) { @@ -457,132 +374,6 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { } /** - * @param node Node originated request. - * @param msg Force keys message. - */ - private void processForceKeysRequest(final ClusterNode node, final GridDhtForceKeysRequest msg) { - IgniteInternalFuture<?> fut = ctx.mvcc().finishKeys(msg.keys(), msg.cacheId(), msg.topologyVersion()); - - if (fut.isDone()) - processForceKeysRequest0(node, msg); - else - fut.listen(new CI1<IgniteInternalFuture<?>>() { - @Override public void apply(IgniteInternalFuture<?> t) { - processForceKeysRequest0(node, msg); - } - }); - } - - /** - * @param node Node originated request. - * @param msg Force keys message. - */ - private void processForceKeysRequest0(ClusterNode node, GridDhtForceKeysRequest msg) { - if (!enterBusy()) - return; - - try { - GridCacheContext cctx = ctx.cacheContext(msg.cacheId()); - - ClusterNode loc = cctx.localNode(); - - GridDhtForceKeysResponse res = new GridDhtForceKeysResponse( - cctx.cacheId(), - msg.futureId(), - msg.miniId(), - cctx.deploymentEnabled()); - - for (KeyCacheObject k : msg.keys()) { - int p = cctx.affinity().partition(k); - - GridDhtLocalPartition locPart = top.localPartition(p, AffinityTopologyVersion.NONE, false); - - // If this node is no longer an owner. - if (locPart == null && !top.owners(p).contains(loc)) { - res.addMissed(k); - - continue; - } - - GridCacheEntryEx entry; - - while (true) { - try { - entry = cctx.dht().entryEx(k); - - entry.unswap(); - - GridCacheEntryInfo info = entry.info(); - - if (info == null) { - assert entry.obsolete() : entry; - - continue; - } - - if (!info.isNew()) - res.addInfo(info); - - cctx.evicts().touch(entry, msg.topologyVersion()); - - break; - } - catch (GridCacheEntryRemovedException ignore) { - if (log.isDebugEnabled()) - log.debug("Got removed entry: " + k); - } - catch (GridDhtInvalidPartitionException ignore) { - if (log.isDebugEnabled()) - log.debug("Local node is no longer an owner: " + p); - - res.addMissed(k); - - break; - } - } - } - - if (log.isDebugEnabled()) - log.debug("Sending force key response [node=" + node.id() + ", res=" + res + ']'); - - cctx.io().send(node, res, cctx.ioPolicy()); - } - catch (ClusterTopologyCheckedException ignore) { - if (log.isDebugEnabled()) - log.debug("Received force key request form failed node (will ignore) [nodeId=" + node.id() + - ", req=" + msg + ']'); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to reply to force key request [nodeId=" + node.id() + ", req=" + msg + ']', e); - } - finally { - leaveBusy(); - } - } - - /** - * @param node Node. - * @param msg Message. - */ - private void processForceKeyResponse(ClusterNode node, GridDhtForceKeysResponse msg) { - if (!enterBusy()) - return; - - try { - GridDhtForceKeysFuture<?, ?> f = forceKeyFuts.get(msg.futureId()); - - if (f != null) - f.onResult(msg); - else if (log.isDebugEnabled()) - log.debug("Receive force key response for unknown future (is it duplicate?) [nodeId=" + node.id() + - ", res=" + msg + ']'); - } - finally { - leaveBusy(); - } - } - - /** * Resends partitions on partition evict within configured timeout. * * @param part Evicted partition. @@ -643,13 +434,17 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { } /** + * @param cctx Cache context. * @param keys Keys to request. * @param topVer Topology version. * @return Future for request. */ @SuppressWarnings({"unchecked", "RedundantCast"}) private GridDhtFuture<Object> request0(GridCacheContext cctx, Collection<KeyCacheObject> keys, AffinityTopologyVersion topVer) { - final GridDhtForceKeysFuture<?, ?> fut = new GridDhtForceKeysFuture<>(cctx, topVer, keys, this); + if (cctx.isNear()) + cctx = cctx.near().dht().context(); + + final GridDhtForceKeysFuture<?, ?> fut = new GridDhtForceKeysFuture<>(cctx, topVer, keys); IgniteInternalFuture<?> topReadyFut = cctx.affinity().affinityReadyFuturex(topVer); @@ -703,33 +498,6 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { } } - /** - * Adds future to future map. - * - * @param fut Future to add. - * @return {@code False} if node cache is stopping and future was completed with error. - */ - boolean addFuture(GridDhtForceKeysFuture<?, ?> fut) { - forceKeyFuts.put(fut.futureId(), fut); - - if (stopping) { - fut.onDone(stopError()); - - return false; - } - - return true; - } - - /** - * Removes future from future map. - * - * @param fut Future to remove. - */ - void remoteFuture(GridDhtForceKeysFuture<?, ?> fut) { - forceKeyFuts.remove(fut.futureId(), fut); - } - /** {@inheritDoc} */ @Override public void evictPartitionAsync(GridDhtLocalPartition part) { partsToEvict.putIfAbsent(part.id(), part); @@ -791,44 +559,6 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { /** {@inheritDoc} */ @Override public void dumpDebugInfo() { - if (!forceKeyFuts.isEmpty()) { - U.warn(log, "Pending force key futures [grp=" + grp.name() + "]:"); - - for (GridDhtForceKeysFuture fut : forceKeyFuts.values()) - U.warn(log, ">>> " + fut); - } - supplier.dumpDebugInfo(); } - - /** - * - */ - private abstract class MessageHandler<M> implements IgniteBiInClosure<UUID, M> { - /** */ - private static final long serialVersionUID = 0L; - - /** {@inheritDoc} */ - @Override public void apply(UUID nodeId, M msg) { - ClusterNode node = ctx.node(nodeId); - - if (node == null) { - if (log.isDebugEnabled()) - log.debug("Received message from failed node [node=" + nodeId + ", msg=" + msg + ']'); - - return; - } - - if (log.isDebugEnabled()) - log.debug("Received message from node [node=" + nodeId + ", msg=" + msg + ']'); - - onMessage(node, msg); - } - - /** - * @param node Node. - * @param msg Message. - */ - protected abstract void onMessage(ClusterNode node, M msg); - } }
