http://git-wip-us.apache.org/repos/asf/ignite/blob/c56c4b8c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java index 1d7fb9e..26f37a7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java @@ -19,148 +19,46 @@ package org.apache.ignite.internal.processors.cache; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; -import java.util.LinkedList; import java.util.List; import java.util.ListIterator; import java.util.Map; import java.util.Set; -import java.util.UUID; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.IgniteException; -import org.apache.ignite.cache.CacheMode; import org.apache.ignite.cache.eviction.EvictionFilter; import org.apache.ignite.cache.eviction.EvictionPolicy; -import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; -import org.apache.ignite.events.DiscoveryEvent; -import org.apache.ignite.events.Event; -import org.apache.ignite.internal.IgniteFutureCancelledCheckedException; -import org.apache.ignite.internal.IgniteInternalFuture; -import org.apache.ignite.internal.IgniteInterruptedCheckedException; -import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; -import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; -import org.apache.ignite.internal.processors.timeout.GridTimeoutObject; -import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter; -import org.apache.ignite.internal.util.F0; import org.apache.ignite.internal.util.GridBusyLock; -import org.apache.ignite.internal.util.GridConcurrentHashSet; import org.apache.ignite.internal.util.GridUnsafe; -import org.apache.ignite.internal.util.future.GridFutureAdapter; -import org.apache.ignite.internal.util.lang.GridMetadataAwareAdapter; -import org.apache.ignite.internal.util.lang.IgnitePair; -import org.apache.ignite.internal.util.tostring.GridToStringExclude; -import org.apache.ignite.internal.util.typedef.C1; -import org.apache.ignite.internal.util.typedef.CI1; -import org.apache.ignite.internal.util.typedef.CI2; -import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.P1; import org.apache.ignite.internal.util.typedef.X; -import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.internal.util.worker.GridWorker; -import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.lang.IgniteUuid; -import org.apache.ignite.thread.IgniteThread; import org.jetbrains.annotations.Nullable; -import org.jsr166.ConcurrentHashMap8; -import org.jsr166.ConcurrentLinkedDeque8; -import static java.util.concurrent.TimeUnit.MILLISECONDS; -import static org.apache.ignite.cache.CacheMode.LOCAL; -import static org.apache.ignite.cache.CacheMode.PARTITIONED; import static org.apache.ignite.events.EventType.EVT_CACHE_ENTRY_EVICTED; -import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; -import static org.apache.ignite.events.EventType.EVT_NODE_JOINED; -import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; -import static org.apache.ignite.internal.processors.cache.GridCacheUtils.isNearEnabled; -import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.MOVING; -import static org.jsr166.ConcurrentLinkedDeque8.Node; /** - * TODO GG-11140 (old evictions implementation, now created for near cache, evictions to be reconsidered as part of GG-11140). + * */ public class GridCacheEvictionManager extends GridCacheManagerAdapter implements CacheEvictionManager { - /** Attribute name used to queue node in entry metadata. */ - private static final int META_KEY = GridMetadataAwareAdapter.EntryKey.CACHE_EVICTION_MANAGER_KEY.key(); - /** Eviction policy. */ private EvictionPolicy plc; /** Eviction filter. */ private EvictionFilter filter; - /** Eviction buffer. */ - private final ConcurrentLinkedDeque8<EvictionInfo> bufEvictQ = new ConcurrentLinkedDeque8<>(); - - /** Active eviction futures. */ - private final Map<Long, EvictionFuture> futs = new ConcurrentHashMap8<>(); - - /** Futures count modification lock. */ - private final Lock futsCntLock = new ReentrantLock(); - - /** Futures count condition. */ - private final Condition futsCntCond = futsCntLock.newCondition(); - - /** Active futures count. */ - private volatile int activeFutsCnt; - - /** Max active futures count. */ - private int maxActiveFuts; - - /** Generator of future IDs. */ - private final AtomicLong idGen = new AtomicLong(); - - /** Evict backup synchronized flag. */ - private boolean evictSync; - - /** Evict near synchronized flag. */ - private boolean nearSync; - - /** Flag to hold {@code evictSync || nearSync} result. */ - private boolean evictSyncAgr; - /** Policy enabled. */ private boolean plcEnabled; - /** Backup entries worker. */ - private BackupWorker backupWorker; - - /** Backup entries worker thread. */ - private IgniteThread backupWorkerThread; - /** Busy lock. */ private final GridBusyLock busyLock = new GridBusyLock(); - /** Stopping flag. */ - private volatile boolean stopping; - /** Stopped flag. */ private boolean stopped; - /** Current future. */ - private final AtomicReference<EvictionFuture> curEvictFut = new AtomicReference<>(); - /** First eviction flag. */ private volatile boolean firstEvictWarn; @@ -170,157 +68,21 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter implements plc = cctx.isNear() ? cfg.getNearConfiguration().getNearEvictionPolicy() : cfg.getEvictionPolicy(); - plcEnabled = plc != null && cctx.isNear(); + plcEnabled = plc != null; filter = cfg.getEvictionFilter(); - if (cfg.getEvictMaxOverflowRatio() < 0) - throw new IgniteCheckedException("Configuration parameter 'maxEvictOverflowRatio' cannot be negative."); - - if (cfg.getEvictSynchronizedKeyBufferSize() < 0) - throw new IgniteCheckedException("Configuration parameter 'evictSynchronizedKeyBufferSize' cannot be negative."); - - if (!cctx.isLocal()) { - evictSync = cfg.isEvictSynchronized() && !cctx.isNear(); - - nearSync = isNearEnabled(cctx) && !cctx.isNear() && cfg.isEvictSynchronized(); - } - else { - if (cfg.isEvictSynchronized()) - U.warn(log, "Ignored 'evictSynchronized' configuration property for LOCAL cache: " + cctx.namexx()); - - if (cfg.getNearConfiguration() != null && cfg.isEvictSynchronized()) - U.warn(log, "Ignored 'evictNearSynchronized' configuration property for LOCAL cache: " + cctx.namexx()); - } - - if (cctx.isDht() && !nearSync && evictSync && isNearEnabled(cctx)) - throw new IgniteCheckedException("Illegal configuration (may lead to data inconsistency) " + - "[evictSync=true, evictNearSync=false]"); - - reportConfigurationProblems(); - - evictSyncAgr = evictSync || nearSync; - - if (evictSync && !cctx.isNear() && plcEnabled) { - backupWorker = new BackupWorker(); - - cctx.events().addListener( - new GridLocalEventListener() { - @Override public void onEvent(Event evt) { - assert evt.type() == EVT_NODE_FAILED || evt.type() == EVT_NODE_LEFT || - evt.type() == EVT_NODE_JOINED; - - DiscoveryEvent discoEvt = (DiscoveryEvent)evt; - - // Notify backup worker on each topology change. - if (cctx.discovery().cacheAffinityNode(discoEvt.eventNode(), cctx.name())) - backupWorker.addEvent(discoEvt); - } - }, - EVT_NODE_FAILED, EVT_NODE_LEFT, EVT_NODE_JOINED); - } - - if (evictSyncAgr) { - if (cfg.getEvictSynchronizedTimeout() <= 0) - throw new IgniteCheckedException("Configuration parameter 'evictSynchronousTimeout' should be positive."); - - if (cfg.getEvictSynchronizedConcurrencyLevel() <= 0) - throw new IgniteCheckedException("Configuration parameter 'evictSynchronousConcurrencyLevel' " + - "should be positive."); - - maxActiveFuts = cfg.getEvictSynchronizedConcurrencyLevel(); - - cctx.io().addHandler(cctx.cacheId(), GridCacheEvictionRequest.class, new CI2<UUID, GridCacheEvictionRequest>() { - @Override public void apply(UUID nodeId, GridCacheEvictionRequest msg) { - processEvictionRequest(nodeId, msg); - } - }); - - cctx.io().addHandler(cctx.cacheId(), GridCacheEvictionResponse.class, new CI2<UUID, GridCacheEvictionResponse>() { - @Override public void apply(UUID nodeId, GridCacheEvictionResponse msg) { - processEvictionResponse(nodeId, msg); - } - }); - - cctx.events().addListener( - new GridLocalEventListener() { - @Override public void onEvent(Event evt) { - assert evt.type() == EVT_NODE_FAILED || evt.type() == EVT_NODE_LEFT; - - DiscoveryEvent discoEvt = (DiscoveryEvent)evt; - - for (EvictionFuture fut : futs.values()) - fut.onNodeLeft(discoEvt.eventNode().id()); - } - }, - EVT_NODE_FAILED, EVT_NODE_LEFT); - } - if (log.isDebugEnabled()) log.debug("Eviction manager started on node: " + cctx.nodeId()); } - /** - * Outputs warnings if potential configuration problems are detected. - */ - private void reportConfigurationProblems() { - CacheMode mode = cctx.config().getCacheMode(); - - if (plcEnabled && !cctx.isNear() && mode == PARTITIONED) { - if (!evictSync) { - U.warn(log, "Evictions are not synchronized with other nodes in topology " + - "which provides 2x-3x better performance but may cause data inconsistency if cache store " + - "is not configured (consider changing 'evictSynchronized' configuration property).", - "Evictions are not synchronized for cache: " + cctx.namexx()); - } - - if (!nearSync && isNearEnabled(cctx)) { - U.warn(log, "Evictions on primary node are not synchronized with near caches on other nodes " + - "which provides 2x-3x better performance but may cause data inconsistency (consider changing " + - "'nearEvictSynchronized' configuration property).", - "Evictions are not synchronized with near caches on other nodes for cache: " + cctx.namexx()); - } - } - } - - /** {@inheritDoc} */ - @Override protected void onKernalStart0() throws IgniteCheckedException { - super.onKernalStart0(); - - if (plcEnabled && evictSync && !cctx.isNear()) { - // Add dummy event to worker. - DiscoveryEvent evt = cctx.discovery().localJoinEvent(); - - backupWorker.addEvent(evt); - - backupWorkerThread = new IgniteThread(backupWorker); - backupWorkerThread.start(); - } - } - /** {@inheritDoc} */ @Override protected void onKernalStop0(boolean cancel) { super.onKernalStop0(cancel); - // Change stopping first. - stopping = true; - busyLock.block(); try { - // Stop backup worker. - if (evictSync && !cctx.isNear() && backupWorker != null) { - backupWorker.cancel(); - - U.join( - backupWorkerThread, - log); - } - - // Cancel all active futures. - for (EvictionFuture fut : futs.values()) - fut.cancel(); - if (log.isDebugEnabled()) log.debug("Eviction manager stopped on node: " + cctx.nodeId()); } @@ -332,42 +94,6 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter implements } /** - * @return Current size of evict queue. - */ - public int evictQueueSize() { - return bufEvictQ.sizex(); - } - - /** - * @param nodeId Sender node ID. - * @param res Response. - */ - private void processEvictionResponse(UUID nodeId, GridCacheEvictionResponse res) { - assert nodeId != null; - assert res != null; - - if (log.isDebugEnabled()) - log.debug("Processing eviction response [node=" + nodeId + ", localNode=" + cctx.nodeId() + - ", res=" + res + ']'); - - if (!enterBusy()) - return; - - try { - EvictionFuture fut = futs.get(res.futureId()); - - if (fut != null) - fut.onResponse(nodeId, res); - else if (log.isDebugEnabled()) - log.debug("Eviction future for response is not found [res=" + res + ", node=" + nodeId + - ", localNode=" + cctx.nodeId() + ']'); - } - finally { - busyLock.leaveBusy(); - } - } - - /** * @return {@code True} if entered busy. */ private boolean enterBusy() { @@ -384,324 +110,6 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter implements } /** - * @param nodeId Sender node ID. - * @param req Request. - */ - private void processEvictionRequest(UUID nodeId, GridCacheEvictionRequest req) { - assert nodeId != null; - assert req != null; - - if (!enterBusy()) - return; - - try { - if (req.classError() != null) { - if (log.isDebugEnabled()) - log.debug("Class got undeployed during eviction: " + req.classError()); - - sendEvictionResponse(nodeId, new GridCacheEvictionResponse(cctx.cacheId(), req.futureId(), true)); - - return; - } - - AffinityTopologyVersion topVer = lockTopology(); - - try { - if (!topVer.equals(req.topologyVersion())) { - if (log.isDebugEnabled()) - log.debug("Topology version is different [locTopVer=" + topVer + - ", rmtTopVer=" + req.topologyVersion() + ']'); - - sendEvictionResponse(nodeId, - new GridCacheEvictionResponse(cctx.cacheId(), req.futureId(), true)); - - return; - } - - processEvictionRequest0(nodeId, req); - } - finally { - unlockTopology(); - } - } - finally { - busyLock.leaveBusy(); - } - } - - /** - * @param nodeId Sender node ID. - * @param req Request. - */ - private void processEvictionRequest0(UUID nodeId, GridCacheEvictionRequest req) { - if (log.isDebugEnabled()) - log.debug("Processing eviction request [node=" + nodeId + ", localNode=" + cctx.nodeId() + - ", reqSize=" + req.entries().size() + ']'); - - // Partition -> {{Key, Version}, ...}. - // Group DHT and replicated cache entries by their partitions. - Map<Integer, Collection<CacheEvictionEntry>> dhtEntries = new HashMap<>(); - - Collection<CacheEvictionEntry> nearEntries = new LinkedList<>(); - - for (CacheEvictionEntry e : req.entries()) { - boolean near = e.near(); - - if (!near) { - // Lock is required. - Collection<CacheEvictionEntry> col = - F.addIfAbsent(dhtEntries, cctx.affinity().partition(e.key()), - new LinkedList<CacheEvictionEntry>()); - - assert col != null; - - col.add(e); - } - else - nearEntries.add(e); - } - - GridCacheEvictionResponse res = new GridCacheEvictionResponse(cctx.cacheId(), req.futureId()); - - GridCacheVersion obsoleteVer = cctx.versions().next(); - - // DHT and replicated cache entries. - for (Map.Entry<Integer, Collection<CacheEvictionEntry>> e : dhtEntries.entrySet()) { - int part = e.getKey(); - - boolean locked = lockPartition(part); // Will return false if preloading is disabled. - - try { - for (CacheEvictionEntry t : e.getValue()) { - KeyCacheObject key = t.key(); - GridCacheVersion ver = t.version(); - boolean near = t.near(); - - assert !near; - - boolean evicted = evictLocally(key, ver, near, obsoleteVer); - - if (log.isDebugEnabled()) - log.debug("Evicted key [key=" + key + ", ver=" + ver + ", near=" + near + - ", evicted=" + evicted +']'); - - if (locked && evicted) - // Preloading is in progress, we need to save eviction info. - saveEvictionInfo(key, ver, part); - - if (!evicted) - res.addRejected(key); - } - } - finally { - if (locked) - unlockPartition(part); - } - } - - // Near entries. - for (CacheEvictionEntry t : nearEntries) { - KeyCacheObject key = t.key(); - GridCacheVersion ver = t.version(); - boolean near = t.near(); - - assert near; - - boolean evicted = evictLocally(key, ver, near, obsoleteVer); - - if (log.isDebugEnabled()) - log.debug("Evicted key [key=" + key + ", ver=" + ver + ", near=" + near + - ", evicted=" + evicted +']'); - - if (!evicted) - res.addRejected(key); - } - - sendEvictionResponse(nodeId, res); - } - - /** - * @param nodeId Node ID. - * @param res Response. - */ - private void sendEvictionResponse(UUID nodeId, GridCacheEvictionResponse res) { - try { - cctx.io().send(nodeId, res, cctx.ioPolicy()); - - if (log.isDebugEnabled()) - log.debug("Sent eviction response [node=" + nodeId + ", localNode=" + cctx.nodeId() + - ", res" + res + ']'); - } - catch (ClusterTopologyCheckedException ignored) { - if (log.isDebugEnabled()) - log.debug("Failed to send eviction response since initiating node left grid " + - "[node=" + nodeId + ", localNode=" + cctx.nodeId() + ']'); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to send eviction response to node [node=" + nodeId + - ", localNode=" + cctx.nodeId() + ", res" + res + ']', e); - } - } - - /** - * @param key Key. - * @param ver Version. - * @param p Partition ID. - */ - private void saveEvictionInfo(KeyCacheObject key, GridCacheVersion ver, int p) { - assert cctx.rebalanceEnabled(); - - if (!cctx.isNear()) { - try { - GridDhtLocalPartition part = cctx.dht().topology().localPartition(p, - AffinityTopologyVersion.NONE, false); - - assert part != null; - - part.onEntryEvicted(key, ver); - } - catch (GridDhtInvalidPartitionException ignored) { - if (log.isDebugEnabled()) - log.debug("Partition does not belong to local node [part=" + p + - ", nodeId" + cctx.localNode().id() + ']'); - } - } - else - assert false : "Failed to save eviction info: " + cctx.namexx(); - } - - /** - * @param p Partition ID. - * @return {@code True} if partition has been actually locked, - * {@code false} if preloading is finished or disabled and no lock is needed. - */ - private boolean lockPartition(int p) { - if (!cctx.rebalanceEnabled()) - return false; - - if (!cctx.isNear()) { - try { - GridDhtLocalPartition part = cctx.dht().topology().localPartition(p, AffinityTopologyVersion.NONE, - false); - - if (part != null && part.reserve()) { - part.lock(); - - if (part.state() != MOVING) { - part.unlock(); - - part.release(); - - return false; - } - - return true; - } - } - catch (GridDhtInvalidPartitionException ignored) { - if (log.isDebugEnabled()) - log.debug("Partition does not belong to local node [part=" + p + - ", nodeId" + cctx.localNode().id() + ']'); - } - } - - // No lock is needed. - return false; - } - - /** - * @param p Partition ID. - */ - private void unlockPartition(int p) { - if (!cctx.rebalanceEnabled()) - return; - - if (!cctx.isNear()) { - try { - GridDhtLocalPartition part = cctx.dht().topology().localPartition(p, AffinityTopologyVersion.NONE, - false); - - if (part != null) { - part.unlock(); - - part.release(); - } - } - catch (GridDhtInvalidPartitionException ignored) { - if (log.isDebugEnabled()) - log.debug("Partition does not belong to local node [part=" + p + - ", nodeId" + cctx.localNode().id() + ']'); - } - } - } - - /** - * Locks topology (for DHT cache only) and returns its version. - * - * @return Topology version after lock. - */ - private AffinityTopologyVersion lockTopology() { - if (!cctx.isNear()) { - cctx.dht().topology().readLock(); - - return cctx.dht().topology().topologyVersion(); - } - - return AffinityTopologyVersion.ZERO; - } - - /** - * Unlocks topology. - */ - private void unlockTopology() { - if (!cctx.isNear()) - cctx.dht().topology().readUnlock(); - } - - /** - * @param key Key to evict. - * @param ver Entry version on initial node. - * @param near {@code true} if entry should be evicted from near cache. - * @param obsoleteVer Obsolete version. - * @return {@code true} if evicted successfully, {@code false} if could not be evicted. - */ - private boolean evictLocally(KeyCacheObject key, - final GridCacheVersion ver, - boolean near, - GridCacheVersion obsoleteVer) - { - assert key != null; - assert ver != null; - assert obsoleteVer != null; - assert evictSyncAgr; - assert !cctx.isNear() || cctx.isReplicated(); - - if (log.isDebugEnabled()) - log.debug("Evicting key locally [key=" + key + ", ver=" + ver + ", obsoleteVer=" + obsoleteVer + - ", localNode=" + cctx.localNode() + ']'); - - GridCacheAdapter cache = near ? cctx.dht().near() : cctx.cache(); - - GridCacheEntryEx entry = cache.peekEx(key); - - if (entry == null) - return true; - - try { - // If entry should be evicted from near cache it can be done safely - // without any consistency risks. We don't use filter in this case. - // If entry should be evicted from DHT cache, we do not compare versions - // as well because versions may change outside the transaction. - return evict0(cache, entry, obsoleteVer, null, false); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to evict entry on remote node [key=" + key + ", localNode=" + cctx.nodeId() + ']', e); - - return false; - } - } - - /** * @param cache Cache from which to evict entry. * @param entry Entry to evict. * @param obsoleteVer Obsolete version. @@ -762,9 +170,6 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter implements if (!loc) { if (cctx.isNear()) return; - - if (evictSync) - return; } GridCacheEntryEx e = txEntry.cached(); @@ -781,13 +186,10 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter implements } notifyPolicy(e); - - if (evictSyncAgr) - waitForEvictionFutures(); } /** {@inheritDoc} */ - @Override public void touch(GridCacheEntryEx e, AffinityTopologyVersion topVer) { + @Override public void touch(GridCacheEntryEx e, AffinityTopologyVersion topVer) { if (e.detached() || e.isInternal()) return; @@ -802,18 +204,10 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter implements if (!plcEnabled) return; - // Don't track non-primary entries if evicts are synchronized. - if (!cctx.isNear() && evictSync && !cctx.affinity().primaryByPartition(cctx.localNode(), e.partition(), topVer)) - return; - if (!enterBusy()) return; try { - // Wait for futures to finish. - if (evictSyncAgr) - waitForEvictionFutures(); - if (log.isDebugEnabled()) log.debug("Touching entry [entry=" + e + ", localNode=" + cctx.nodeId() + ']'); @@ -825,39 +219,6 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter implements } /** - * @param e Entry for eviction policy notification. - */ - private void touch0(GridCacheEntryEx e) { - assert evictSyncAgr; - assert plcEnabled; - - // Do not wait for futures here since only limited number - // of entries can be passed to this method. - notifyPolicy(e); - } - - /** - * @param entries Entries for eviction policy notification. - */ - private void touchOnTopologyChange(Iterable<? extends GridCacheEntryEx> entries) { - assert evictSync; - assert plcEnabled; - - if (log.isDebugEnabled()) - log.debug("Touching entries [entries=" + entries + ", localNode=" + cctx.nodeId() + ']'); - - for (GridCacheEntryEx e : entries) { - if (e.key() instanceof GridCacheInternal) - // Skip internal entry. - continue; - - // Do not wait for futures here since only limited number - // of entries can be passed to this method. - notifyPolicy(e); - } - } - - /** * Warns on first eviction. */ private void warnFirstEvict() { @@ -870,16 +231,11 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter implements } U.warn(log, "Evictions started (cache may have reached its capacity)." + - " You may wish to increase 'maxSize' on eviction policy being used for cache: " + cctx.name(), + " You may wish to increase 'maxSize' on eviction policy being used for cache: " + cctx.name(), "Evictions started (cache may have reached its capacity): " + cctx.name()); } /** {@inheritDoc} */ - @Override public boolean evictSyncOrNearSync() { - return evictSyncAgr; - } - - /** {@inheritDoc} */ @Override public boolean evict(@Nullable GridCacheEntryEx entry, @Nullable GridCacheVersion obsoleteVer, boolean explicit, @Nullable CacheEntryPredicate[] filter) throws IgniteCheckedException { if (entry == null) @@ -892,51 +248,19 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter implements if (!cctx.isNear() && !explicit && !firstEvictWarn) warnFirstEvict(); - if (evictSyncAgr) { - assert !cctx.isNear(); // Make sure cache is not NEAR. + if (obsoleteVer == null) + obsoleteVer = cctx.versions().next(); - if (cctx.affinity().backupsByKey( - entry.key(), - cctx.topology().topologyVersion()).contains(cctx.localNode()) && - evictSync) - // Do not track backups if evicts are synchronized. - return !explicit; + // Do not touch entry if not evicted: + // 1. If it is call from policy, policy tracks it on its own. + // 2. If it is explicit call, entry is touched on tx commit. + return evict0(cctx.cache(), entry, obsoleteVer, filter, explicit); + } - try { - if (!cctx.isAll(entry, filter)) - return false; - - if (entry.lockedByAny()) - return false; - - // Add entry to eviction queue. - enqueue(entry, filter); - } - catch (GridCacheEntryRemovedException ignored) { - if (log.isDebugEnabled()) - log.debug("Entry got removed while evicting [entry=" + entry + - ", localNode=" + cctx.nodeId() + ']'); - } - } - else { - if (obsoleteVer == null) - obsoleteVer = cctx.versions().next(); - - // Do not touch entry if not evicted: - // 1. If it is call from policy, policy tracks it on its own. - // 2. If it is explicit call, entry is touched on tx commit. - return evict0(cctx.cache(), entry, obsoleteVer, filter, explicit); - } - - return true; - } - - /** {@inheritDoc} */ - @Override public void batchEvict(Collection<?> keys, @Nullable GridCacheVersion obsoleteVer) - throws IgniteCheckedException { - assert !evictSyncAgr; - - List<GridCacheEntryEx> locked = new ArrayList<>(keys.size()); + /** {@inheritDoc} */ + @Override public void batchEvict(Collection<?> keys, @Nullable GridCacheVersion obsoleteVer) + throws IgniteCheckedException { + List<GridCacheEntryEx> locked = new ArrayList<>(keys.size()); Set<GridCacheEntryEx> notRmv = null; @@ -1005,7 +329,7 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter implements } finally { // Unlock entries in reverse order. - for (ListIterator<GridCacheEntryEx> it = locked.listIterator(locked.size()); it.hasPrevious();) { + for (ListIterator<GridCacheEntryEx> it = locked.listIterator(locked.size()); it.hasPrevious(); ) { GridCacheEntryEx e = it.previous(); GridUnsafe.monitorExit(e); @@ -1031,332 +355,6 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter implements } /** - * Enqueues entry for synchronized eviction. - * - * @param entry Entry. - * @param filter Filter. - * @throws GridCacheEntryRemovedException If entry got removed. - */ - private void enqueue(GridCacheEntryEx entry, CacheEntryPredicate[] filter) - throws GridCacheEntryRemovedException { - Node<EvictionInfo> node = entry.meta(META_KEY); - - if (node == null) { - node = bufEvictQ.addLastx(new EvictionInfo(entry, entry.version(), filter)); - - if (entry.putMetaIfAbsent(META_KEY, node) != null) - // Was concurrently added, need to clear it from queue. - bufEvictQ.unlinkx(node); - else if (log.isDebugEnabled()) - log.debug("Added entry to eviction queue: " + entry); - } - } - - /** - * Checks eviction queue. - */ - private void checkEvictionQueue() { - int maxSize = maxQueueSize(); - - int bufSize = bufEvictQ.sizex(); - - if (bufSize >= maxSize) { - if (log.isDebugEnabled()) - log.debug("Processing eviction queue: " + bufSize); - - Collection<EvictionInfo> evictInfos = new ArrayList<>(bufSize); - - for (int i = 0; i < bufSize; i++) { - EvictionInfo info = bufEvictQ.poll(); - - if (info == null) - break; - - evictInfos.add(info); - } - - if (!evictInfos.isEmpty()) - addToCurrentFuture(evictInfos); - } - } - - /** - * @return Max queue size. - */ - private int maxQueueSize() { - int size = (int)(cctx.cache().size() * cctx.config().getEvictMaxOverflowRatio()) / 100; - - if (size <= 0) - size = 500; - - return Math.min(size, cctx.config().getEvictSynchronizedKeyBufferSize()); - } - - /** - * Processes eviction queue (sends required requests, etc.). - * - * @param evictInfos Eviction information to create future with. - */ - private void addToCurrentFuture(Collection<EvictionInfo> evictInfos) { - assert !evictInfos.isEmpty(); - - while (true) { - EvictionFuture fut = curEvictFut.get(); - - if (fut == null) { - curEvictFut.compareAndSet(null, new EvictionFuture()); - - continue; - } - - if (fut.prepareLock()) { - boolean added; - - try { - added = fut.add(evictInfos); - } - finally { - fut.prepareUnlock(); - } - - if (added) { - if (fut.prepare()) { - // Thread that prepares future should remove it and install listener. - curEvictFut.compareAndSet(fut, null); - - fut.listen(new CI1<IgniteInternalFuture<?>>() { - @Override public void apply(IgniteInternalFuture<?> f) { - if (!enterBusy()) { - if (log.isDebugEnabled()) - log.debug("Will not notify eviction future completion (grid is stopping): " + - f); - - return; - } - - try { - AffinityTopologyVersion topVer = lockTopology(); - - try { - onFutureCompleted((EvictionFuture)f, topVer); - } - finally { - unlockTopology(); - } - } - finally { - busyLock.leaveBusy(); - } - } - }); - } - - break; - } - else - // Infos were not added, create another future for next iteration. - curEvictFut.compareAndSet(fut, new EvictionFuture()); - } - else - // Future has not been locked, create another future for next iteration. - curEvictFut.compareAndSet(fut, new EvictionFuture()); - } - } - - /** - * @param fut Completed eviction future. - * @param topVer Topology version on future complete. - */ - private void onFutureCompleted(EvictionFuture fut, AffinityTopologyVersion topVer) { - if (!enterBusy()) - return; - - try { - IgniteBiTuple<Collection<EvictionInfo>, Collection<EvictionInfo>> t; - - try { - t = fut.get(); - } - catch (IgniteFutureCancelledCheckedException ignored) { - assert false : "Future has been cancelled, but manager is not stopping: " + fut; - - return; - } - catch (IgniteCheckedException e) { - U.error(log, "Eviction future finished with error (all entries will be touched): " + fut, e); - - if (plcEnabled) { - for (EvictionInfo info : fut.entries()) - touch0(info.entry()); - } - - return; - } - - // Check if topology version is different. - if (!fut.topologyVersion().equals(topVer)) { - if (log.isDebugEnabled()) - log.debug("Topology has changed, all entries will be touched: " + fut); - - if (plcEnabled) { - for (EvictionInfo info : fut.entries()) - touch0(info.entry()); - } - - return; - } - - // Evict remotely evicted entries. - GridCacheVersion obsoleteVer = null; - - Collection<EvictionInfo> evictedEntries = t.get1(); - - for (EvictionInfo info : evictedEntries) { - GridCacheEntryEx entry = info.entry(); - - try { - // Remove readers on which the entry was evicted. - for (IgniteBiTuple<ClusterNode, Long> r : fut.evictedReaders(entry.key())) { - UUID readerId = r.get1().id(); - Long msgId = r.get2(); - - ((GridDhtCacheEntry)entry).removeReader(readerId, msgId); - } - - if (obsoleteVer == null) - obsoleteVer = cctx.versions().next(); - - // Do not touch primary entries, if not evicted. - // They will be touched within updating transactions. - evict0(cctx.cache(), entry, obsoleteVer, versionFilter(info), false); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to evict entry [entry=" + entry + - ", localNode=" + cctx.nodeId() + ']', e); - } - catch (GridCacheEntryRemovedException ignored) { - if (log.isDebugEnabled()) - log.debug("Entry was concurrently removed while evicting [entry=" + entry + - ", localNode=" + cctx.nodeId() + ']'); - } - } - - Collection<EvictionInfo> rejectedEntries = t.get2(); - - // Touch remotely rejected entries (only if policy is enabled). - if (plcEnabled && !rejectedEntries.isEmpty()) { - for (EvictionInfo info : rejectedEntries) - touch0(info.entry()); - } - } - finally { - busyLock.leaveBusy(); - - // Signal on future completion. - signal(); - } - } - - /** - * This method should be called when eviction future is processed - * and unwind may continue. - */ - private void signal() { - futsCntLock.lock(); - - try { - // Avoid volatile read on assertion. - int cnt = --activeFutsCnt; - - assert cnt >= 0 : "Invalid futures count: " + cnt; - - if (cnt < maxActiveFuts) - futsCntCond.signalAll(); - } - finally { - futsCntLock.unlock(); - } - } - - /** - * @param info Eviction info. - * @return Version aware filter. - */ - private CacheEntryPredicate[] versionFilter(final EvictionInfo info) { - // If version has changed since we started the whole process - // then we should not evict entry. - return new CacheEntryPredicate[]{new CacheEntryPredicateAdapter() { - @Override public boolean apply(GridCacheEntryEx e) { - try { - GridCacheVersion ver = e.version(); - - return info.version().equals(ver) && F.isAll(info.filter()); - } - catch (GridCacheEntryRemovedException ignored) { - return false; - } - } - }}; - } - - /** - * Gets a collection of nodes to send eviction requests to. - * - * - * @param entry Entry. - * @param topVer Topology version. - * @return Tuple of two collections: dht (in case of partitioned cache) nodes - * and readers (empty for replicated cache). - * @throws GridCacheEntryRemovedException If entry got removed during method - * execution. - */ - @SuppressWarnings( {"IfMayBeConditional"}) - private IgniteBiTuple<Collection<ClusterNode>, Collection<ClusterNode>> remoteNodes(GridCacheEntryEx entry, - AffinityTopologyVersion topVer) - throws GridCacheEntryRemovedException { - assert entry != null; - - assert cctx.config().getCacheMode() != LOCAL; - - Collection<ClusterNode> backups; - - if (evictSync) - backups = F.view(cctx.dht().topology().nodes(entry.partition(), topVer), F0.notEqualTo(cctx.localNode())); - else - backups = Collections.emptySet(); - - Collection<ClusterNode> readers; - - if (nearSync) { - readers = F.transform(((GridDhtCacheEntry)entry).readers(), new C1<UUID, ClusterNode>() { - @Nullable @Override public ClusterNode apply(UUID nodeId) { - return cctx.node(nodeId); - } - }); - } - else - readers = Collections.emptySet(); - - return new IgnitePair<>(backups, readers); - } - - /** {@inheritDoc} */ - @Override public void unwind() { - if (!evictSyncAgr) - return; - - if (!enterBusy()) - return; - - try { - checkEvictionQueue(); - } - finally { - busyLock.leaveBusy(); - } - } - - /** * @param e Entry to notify eviction policy. */ @SuppressWarnings({"IfMayBeConditional", "RedundantIfStatement"}) @@ -1372,731 +370,10 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter implements plc.onEntryAccessed(e.obsoleteOrDeleted(), e.wrapEviction()); } - /** - * - */ - @SuppressWarnings("TooBroadScope") - private void waitForEvictionFutures() { - if (activeFutsCnt >= maxActiveFuts) { - boolean interrupted = false; - - futsCntLock.lock(); - - try { - while(!stopping && activeFutsCnt >= maxActiveFuts) { - try { - futsCntCond.await(2000, MILLISECONDS); - } - catch (InterruptedException ignored) { - interrupted = true; - } - } - } - finally { - futsCntLock.unlock(); - - if (interrupted) - Thread.currentThread().interrupt(); - } - } - } - - /** - * Prints out eviction stats. - */ - public void printStats() { - X.println("Eviction stats [igniteInstanceName=" + cctx.igniteInstanceName() + - ", cache=" + cctx.cache().name() + ", buffEvictQ=" + bufEvictQ.sizex() + ']'); - } - /** {@inheritDoc} */ @Override public void printMemoryStats() { X.println(">>> "); X.println(">>> Eviction manager memory stats [igniteInstanceName=" + cctx.igniteInstanceName() + ", cache=" + cctx.name() + ']'); - X.println(">>> buffEvictQ size: " + bufEvictQ.sizex()); - X.println(">>> futsSize: " + futs.size()); - X.println(">>> futsCreated: " + idGen.get()); - } - - /** - * - */ - private class BackupWorker extends GridWorker { - /** */ - private final BlockingQueue<DiscoveryEvent> evts = new LinkedBlockingQueue<>(); - - /** */ - private final Collection<Integer> primaryParts = new HashSet<>(); - - /** - * - */ - private BackupWorker() { - super(cctx.igniteInstanceName(), "cache-eviction-backup-worker", GridCacheEvictionManager.this.log); - - assert plcEnabled; - } - - /** - * @param evt New event. - */ - void addEvent(DiscoveryEvent evt) { - assert evt != null; - - evts.add(evt); - } - - /** {@inheritDoc} */ - @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException { - try { - assert !cctx.isNear() && evictSync; - - ClusterNode loc = cctx.localNode(); - - AffinityTopologyVersion initTopVer = - new AffinityTopologyVersion(cctx.discovery().localJoinEvent().topologyVersion(), 0); - - AffinityTopologyVersion cacheStartVer = cctx.startTopologyVersion(); - - if (cacheStartVer != null && cacheStartVer.compareTo(initTopVer) > 0) - initTopVer = cacheStartVer; - - // Initialize. - primaryParts.addAll(cctx.affinity().primaryPartitions(cctx.localNodeId(), initTopVer)); - - while (!isCancelled()) { - DiscoveryEvent evt = evts.take(); - - if (log.isDebugEnabled()) - log.debug("Processing event: " + evt); - - AffinityTopologyVersion topVer = new AffinityTopologyVersion(evt.topologyVersion()); - - // Remove partitions that are no longer primary. - for (Iterator<Integer> it = primaryParts.iterator(); it.hasNext();) { - if (!evts.isEmpty()) - break; - - if (!cctx.affinity().primaryByPartition(loc, it.next(), topVer)) - it.remove(); - } - - // Move on to next event. - if (!evts.isEmpty()) - continue; - - for (GridDhtLocalPartition part : cctx.topology().localPartitions()) { - if (!evts.isEmpty()) - break; - - if (part.primary(topVer) && primaryParts.add(part.id())) { - if (log.isDebugEnabled()) - log.debug("Touching partition entries: " + part); - - touchOnTopologyChange(part.allEntries()); - } - } - } - } - catch (InterruptedException ignored) { - // No-op. - } - catch (IgniteException e) { - if (!e.hasCause(InterruptedException.class)) - throw e; - } - } - } - - /** - * Wrapper around an entry to be put into queue. - */ - private class EvictionInfo { - /** Cache entry. */ - private GridCacheEntryEx entry; - - /** Start version. */ - private GridCacheVersion ver; - - /** Filter to pass before entry will be evicted. */ - private CacheEntryPredicate[] filter; - - /** - * @param entry Entry. - * @param ver Version. - * @param filter Filter. - */ - EvictionInfo(GridCacheEntryEx entry, GridCacheVersion ver, - CacheEntryPredicate[] filter) { - assert entry != null; - assert ver != null; - - this.entry = entry; - this.ver = ver; - this.filter = filter; - } - - /** - * @return Entry. - */ - GridCacheEntryEx entry() { - return entry; - } - - /** - * @return Version. - */ - GridCacheVersion version() { - return ver; - } - - /** - * @return Filter. - */ - CacheEntryPredicate[] filter() { - return filter; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(EvictionInfo.class, this); - } - } - - /** - * Future for synchronized eviction. Result is a tuple: {evicted entries, rejected entries}. - */ - private class EvictionFuture extends GridFutureAdapter<IgniteBiTuple<Collection<EvictionInfo>, - Collection<EvictionInfo>>> { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - private final long id = idGen.incrementAndGet(); - - /** */ - private ConcurrentLinkedDeque8<EvictionInfo> evictInfos = new ConcurrentLinkedDeque8<>(); - - /** */ - private final ConcurrentMap<KeyCacheObject, EvictionInfo> entries = new ConcurrentHashMap8<>(); - - /** */ - private final ConcurrentMap<KeyCacheObject, Collection<ClusterNode>> readers = - new ConcurrentHashMap8<>(); - - /** */ - private final Collection<EvictionInfo> evictedEntries = new GridConcurrentHashSet<>(); - - /** */ - private final ConcurrentMap<KeyCacheObject, EvictionInfo> rejectedEntries = new ConcurrentHashMap8<>(); - - /** Request map. */ - private final ConcurrentMap<UUID, GridCacheEvictionRequest> reqMap = - new ConcurrentHashMap8<>(); - - /** Response map. */ - private final ConcurrentMap<UUID, GridCacheEvictionResponse> resMap = - new ConcurrentHashMap8<>(); - - /** To make sure that future is completing within a single thread. */ - private final AtomicBoolean finishPrepare = new AtomicBoolean(); - - /** Lock to use when future is being initialized. */ - @GridToStringExclude - private final ReadWriteLock prepareLock = new ReentrantReadWriteLock(); - - /** To make sure that future is completing within a single thread. */ - private final AtomicBoolean completing = new AtomicBoolean(); - - /** Lock to use after future is initialized. */ - @GridToStringExclude - private final ReadWriteLock lock = new ReentrantReadWriteLock(); - - /** Object to force future completion on elapsing eviction timeout. */ - @GridToStringExclude - private GridTimeoutObject timeoutObj; - - /** Topology version future is processed on. */ - private AffinityTopologyVersion topVer = AffinityTopologyVersion.ZERO; - - /** - * @return {@code True} if prepare lock was acquired. - */ - boolean prepareLock() { - return prepareLock.readLock().tryLock(); - } - - /** - * - */ - void prepareUnlock() { - prepareLock.readLock().unlock(); - } - - /** - * @param infos Infos to add. - * @return {@code False} if entries were not added due to capacity restrictions. - */ - boolean add(Collection<EvictionInfo> infos) { - assert infos != null && !infos.isEmpty(); - - if (evictInfos.sizex() > maxQueueSize()) - return false; - - evictInfos.addAll(infos); - - return true; - } - - /** - * @return {@code True} if future has been prepared by this call. - */ - @SuppressWarnings("LockAcquiredButNotSafelyReleased") - boolean prepare() { - if (evictInfos.sizex() >= maxQueueSize() && finishPrepare.compareAndSet(false, true)) { - // Lock will never be released intentionally. - prepareLock.writeLock().lock(); - - futsCntLock.lock(); - - try { - activeFutsCnt++; - } - finally { - futsCntLock.unlock(); - } - - // Put future in map. - futs.put(id, this); - - prepare0(); - - return true; - } - - return false; - } - - /** - * Prepares future (sends all required requests). - */ - private void prepare0() { - if (log.isDebugEnabled()) - log.debug("Preparing eviction future [futId=" + id + ", localNode=" + cctx.nodeId() + - ", infos=" + evictInfos + ']'); - - assert evictInfos != null && !evictInfos.isEmpty(); - - topVer = lockTopology(); - - try { - Collection<EvictionInfo> locals = null; - - for (EvictionInfo info : evictInfos) { - // Queue node may have been stored in entry metadata concurrently, but we don't care - // about it since we are currently processing this entry. - Node<EvictionInfo> queueNode = info.entry().removeMeta(META_KEY); - - if (queueNode != null) - bufEvictQ.unlinkx(queueNode); - - IgniteBiTuple<Collection<ClusterNode>, Collection<ClusterNode>> tup; - - try { - tup = remoteNodes(info.entry(), topVer); - } - catch (GridCacheEntryRemovedException ignored) { - if (log.isDebugEnabled()) - log.debug("Entry got removed while preparing eviction future (ignoring) [entry=" + - info.entry() + ", nodeId=" + cctx.nodeId() + ']'); - - continue; - } - - Collection<ClusterNode> entryReaders = - F.addIfAbsent(readers, info.entry().key(), new GridConcurrentHashSet<ClusterNode>()); - - assert entryReaders != null; - - // Add entry readers so that we could remove them right before local eviction. - entryReaders.addAll(tup.get2()); - - Collection<ClusterNode> nodes = F.concat(true, tup.get1(), tup.get2()); - - if (!nodes.isEmpty()) { - entries.put(info.entry().key(), info); - - // There are remote participants. - for (ClusterNode node : nodes) { - GridCacheEvictionRequest req = F.addIfAbsent(reqMap, node.id(), - new GridCacheEvictionRequest(cctx.cacheId(), id, evictInfos.size(), topVer, - cctx.deploymentEnabled())); - - assert req != null; - - req.addKey(info.entry().key(), info.version(), entryReaders.contains(node)); - } - } - else { - if (locals == null) - locals = new HashSet<>(evictInfos.size(), 1.0f); - - // There are no remote participants, need to keep the entry as local. - locals.add(info); - } - } - - if (locals != null) { - // Evict entries without remote participant nodes immediately. - GridCacheVersion obsoleteVer = cctx.versions().next(); - - for (EvictionInfo info : locals) { - if (log.isDebugEnabled()) - log.debug("Evicting key without remote participant nodes: " + info); - - try { - // Touch primary entry (without backup nodes) if not evicted - // to keep tracking. - if (!evict0(cctx.cache(), info.entry(), obsoleteVer, versionFilter(info), false) && - plcEnabled) - touch0(info.entry()); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to evict entry: " + info.entry(), e); - } - } - } - - // If there were only local entries. - if (entries.isEmpty()) { - complete(false); - - return; - } - } - finally { - unlockTopology(); - } - - // Send eviction requests. - for (Map.Entry<UUID, GridCacheEvictionRequest> e : reqMap.entrySet()) { - UUID nodeId = e.getKey(); - - GridCacheEvictionRequest req = e.getValue(); - - if (log.isDebugEnabled()) - log.debug("Sending eviction request [node=" + nodeId + ", req=" + req + ']'); - - try { - cctx.io().send(nodeId, req, cctx.ioPolicy()); - } - catch (ClusterTopologyCheckedException ignored) { - // Node left the topology. - onNodeLeft(nodeId); - } - catch (IgniteCheckedException ex) { - U.error(log, "Failed to send eviction request to node [node=" + nodeId + ", req=" + req + ']', ex); - - rejectEntries(nodeId); - } - } - - registerTimeoutObject(); - } - - /** - * - */ - private void registerTimeoutObject() { - // Check whether future has not been completed yet. - if (lock.readLock().tryLock()) { - try { - timeoutObj = new GridTimeoutObjectAdapter(cctx.config().getEvictSynchronizedTimeout()) { - @Override public void onTimeout() { - complete(true); - } - }; - - cctx.time().addTimeoutObject(timeoutObj); - } - finally { - lock.readLock().unlock(); - } - } - } - - /** - * @return Future ID. - */ - long id() { - return id; - } - - /** - * @return Topology version. - */ - AffinityTopologyVersion topologyVersion() { - return topVer; - } - - /** - * @return Keys to readers mapping. - */ - Map<KeyCacheObject, Collection<ClusterNode>> readers() { - return readers; - } - - /** - * @return All entries associated with future that should be evicted (or rejected). - */ - Collection<EvictionInfo> entries() { - return entries.values(); - } - - /** - * Reject all entries on behalf of specified node. - * - * @param nodeId Node ID. - */ - private void rejectEntries(UUID nodeId) { - assert nodeId != null; - - if (lock.readLock().tryLock()) { - try { - if (log.isDebugEnabled()) - log.debug("Rejecting entries for node: " + nodeId); - - GridCacheEvictionRequest req = reqMap.remove(nodeId); - - for (CacheEvictionEntry t : req.entries()) { - EvictionInfo info = entries.get(t.key()); - - assert info != null; - - rejectedEntries.put(t.key(), info); - } - } - finally { - lock.readLock().unlock(); - } - } - - checkDone(); - } - - /** - * @param nodeId Node id that left the topology. - */ - void onNodeLeft(UUID nodeId) { - assert nodeId != null; - - if (lock.readLock().tryLock()) { - try { - // Stop waiting response from this node. - reqMap.remove(nodeId); - - resMap.remove(nodeId); - } - finally { - lock.readLock().unlock(); - } - - checkDone(); - } - } - - /** - * @param nodeId Sender node ID. - * @param res Response. - */ - void onResponse(UUID nodeId, GridCacheEvictionResponse res) { - assert nodeId != null; - assert res != null; - - if (lock.readLock().tryLock()) { - try { - if (log.isDebugEnabled()) - log.debug("Entered to eviction future onResponse() [fut=" + this + ", node=" + nodeId + - ", res=" + res + ']'); - - ClusterNode node = cctx.node(nodeId); - - if (node != null) - resMap.put(nodeId, res); - else - // Sender node left grid. - reqMap.remove(nodeId); - } - finally { - lock.readLock().unlock(); - } - - if (res.evictError()) - // Complete future, since there was a class loading error on at least one node. - complete(false); - else - checkDone(); - } - else { - if (log.isDebugEnabled()) - log.debug("Ignored eviction response [fut=" + this + ", node=" + nodeId + ", res=" + res + ']'); - } - } - - /** - * - */ - private void checkDone() { - if (reqMap.isEmpty() || resMap.keySet().containsAll(reqMap.keySet())) - complete(false); - } - - /** - * Completes future. - * - * @param timedOut {@code True} if future is being forcibly completed on timeout. - */ - @SuppressWarnings({"LockAcquiredButNotSafelyReleased"}) - private void complete(boolean timedOut) { - if (completing.compareAndSet(false, true)) { - // Lock will never be released intentionally. - lock.writeLock().lock(); - - futs.remove(id); - - if (timeoutObj != null && !timedOut) - // If future is timed out, corresponding object is already removed. - cctx.time().removeTimeoutObject(timeoutObj); - - if (log.isDebugEnabled()) - log.debug("Building eviction future result [fut=" + this + ", timedOut=" + timedOut + ']'); - - boolean err = false; - - for (GridCacheEvictionResponse res : resMap.values()) { - if (res.evictError()) { - err = true; - - break; - } - } - - if (err) { - Collection<UUID> ids = F.view(resMap.keySet(), new P1<UUID>() { - @Override public boolean apply(UUID e) { - return resMap.get(e).evictError(); - } - }); - - assert !ids.isEmpty(); - - U.warn(log, "Remote node(s) failed to process eviction request " + - "due to topology changes " + - "(some backup or remote values maybe lost): " + ids); - } - - if (timedOut) - U.warn(log, "Timed out waiting for eviction future " + - "(consider changing 'evictSynchronousTimeout' and 'evictSynchronousConcurrencyLevel' " + - "configuration parameters)."); - - if (err || timedOut) { - // Future has not been completed successfully, all entries should be rejected. - assert evictedEntries.isEmpty(); - - rejectedEntries.putAll(entries); - } - else { - // Copy map to filter remotely rejected entries, - // as they will be touched within corresponding txs. - Map<KeyCacheObject, EvictionInfo> rejectedEntries0 = new HashMap<>(rejectedEntries); - - // Future has been completed successfully - build result. - for (EvictionInfo info : entries.values()) { - KeyCacheObject key = info.entry().key(); - - if (rejectedEntries0.containsKey(key)) - // Was already rejected. - continue; - - boolean rejected = false; - - for (GridCacheEvictionResponse res : resMap.values()) { - if (res.rejectedKeys().contains(key)) { - // Modify copied map. - rejectedEntries0.put(key, info); - - rejected = true; - - break; - } - } - - if (!rejected) - evictedEntries.add(info); - } - } - - // Pass entries that were rejected due to topology changes - // or due to timeout or class loading issues. - // Remotely rejected entries will be touched within corresponding txs. - onDone(F.t(evictedEntries, rejectedEntries.values())); - } - } - - /** - * @param key Key. - * @return Reader nodes on which given key was evicted. - */ - Collection<IgniteBiTuple<ClusterNode, Long>> evictedReaders(KeyCacheObject key) { - Collection<ClusterNode> mappedReaders = readers.get(key); - - if (mappedReaders == null) - return Collections.emptyList(); - - Collection<IgniteBiTuple<ClusterNode, Long>> col = new LinkedList<>(); - - for (Map.Entry<UUID, GridCacheEvictionResponse> e : resMap.entrySet()) { - ClusterNode node = cctx.node(e.getKey()); - - // If node has left or response did not arrive from near node - // then just skip it. - if (node == null || !mappedReaders.contains(node)) - continue; - - GridCacheEvictionResponse res = e.getValue(); - - if (!res.rejectedKeys().contains(key)) - col.add(F.t(node, res.messageId())); - } - - return col; - } - - /** {@inheritDoc} */ - @SuppressWarnings("LockAcquiredButNotSafelyReleased") - @Override public boolean cancel() { - if (completing.compareAndSet(false, true)) { - // Lock will never be released intentionally. - lock.writeLock().lock(); - - if (timeoutObj != null) - cctx.time().removeTimeoutObject(timeoutObj); - - boolean b = onCancelled(); - - assert b; - - return true; - } - - return false; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(EvictionFuture.class, this); - } } }
http://git-wip-us.apache.org/repos/asf/ignite/blob/c56c4b8c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionRequest.java deleted file mode 100644 index cb454f8..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionRequest.java +++ /dev/null @@ -1,248 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache; - -import java.io.Externalizable; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Collection; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.internal.GridDirectCollection; -import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; -import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; -import org.apache.ignite.internal.util.tostring.GridToStringInclude; -import org.apache.ignite.internal.util.typedef.internal.S; -import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; -import org.apache.ignite.plugin.extensions.communication.MessageReader; -import org.apache.ignite.plugin.extensions.communication.MessageWriter; -import org.jetbrains.annotations.NotNull; - -/** - * Cache eviction request. - */ -public class GridCacheEvictionRequest extends GridCacheMessage implements GridCacheDeployable { - /** */ - private static final long serialVersionUID = 0L; - - /** Future id. */ - private long futId; - - /** Entries to clear from near and backup nodes. */ - @GridToStringInclude - @GridDirectCollection(CacheEvictionEntry.class) - private Collection<CacheEvictionEntry> entries; - - /** Topology version. */ - private AffinityTopologyVersion topVer; - - /** - * Required by {@link Externalizable}. - */ - public GridCacheEvictionRequest() { - // No-op. - } - - /** - * @param cacheId Cache ID. - * @param futId Future id. - * @param size Size. - * @param topVer Topology version. - * @param addDepInfo Deployment info flag. - */ - GridCacheEvictionRequest(int cacheId, long futId, int size, @NotNull AffinityTopologyVersion topVer, - boolean addDepInfo) { - assert futId > 0; - assert size > 0; - assert topVer.topologyVersion() > 0; - - this.cacheId = cacheId; - this.futId = futId; - this.addDepInfo = addDepInfo; - - entries = new ArrayList<>(size); - - this.topVer = topVer; - } - - /** {@inheritDoc} - * @param ctx*/ - @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException { - super.prepareMarshal(ctx); - - if (entries != null) { - GridCacheContext cctx = ctx.cacheContext(cacheId); - - for (CacheEvictionEntry e : entries) { - e.prepareMarshal(cctx); - - if (addDepInfo) - prepareObject(e.key().value(cctx.cacheObjectContext(), false), cctx); - } - } - } - - /** {@inheritDoc} */ - @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException { - super.finishUnmarshal(ctx, ldr); - - if (entries != null) { - GridCacheContext cctx = ctx.cacheContext(cacheId); - - for (CacheEvictionEntry e : entries) - e.finishUnmarshal(cctx, ldr); - } - } - - /** {@inheritDoc} */ - @Override public boolean addDeploymentInfo() { - return addDepInfo; - } - - /** - * @return Future id. - */ - long futureId() { - return futId; - } - - /** - * @return Entries - {{Key, Version, Boolean (near or not)}, ...}. - */ - Collection<CacheEvictionEntry> entries() { - return entries; - } - - /** - * @return Topology version. - */ - @Override public AffinityTopologyVersion topologyVersion() { - return topVer; - } - - /** - * Add key to request. - * - * @param key Key to evict. - * @param ver Entry version. - * @param near {@code true} if key should be evicted from near cache. - */ - void addKey(KeyCacheObject key, GridCacheVersion ver, boolean near) { - assert key != null; - assert ver != null; - - entries.add(new CacheEvictionEntry(key, ver, near)); - } - - /** {@inheritDoc} */ - @Override public boolean ignoreClassErrors() { - return true; - } - - /** {@inheritDoc} */ - @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { - writer.setBuffer(buf); - - if (!super.writeTo(buf, writer)) - return false; - - if (!writer.isHeaderWritten()) { - if (!writer.writeHeader(directType(), fieldsCount())) - return false; - - writer.onHeaderWritten(); - } - - switch (writer.state()) { - case 3: - if (!writer.writeCollection("entries", entries, MessageCollectionItemType.MSG)) - return false; - - writer.incrementState(); - - case 4: - if (!writer.writeLong("futId", futId)) - return false; - - writer.incrementState(); - - case 5: - if (!writer.writeMessage("topVer", topVer)) - return false; - - writer.incrementState(); - - } - - return true; - } - - /** {@inheritDoc} */ - @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { - reader.setBuffer(buf); - - if (!reader.beforeMessageRead()) - return false; - - if (!super.readFrom(buf, reader)) - return false; - - switch (reader.state()) { - case 3: - entries = reader.readCollection("entries", MessageCollectionItemType.MSG); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 4: - futId = reader.readLong("futId"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 5: - topVer = reader.readMessage("topVer"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - } - - return reader.afterMessageRead(GridCacheEvictionRequest.class); - } - - /** {@inheritDoc} */ - @Override public short directType() { - return 14; - } - - /** {@inheritDoc} */ - @Override public byte fieldsCount() { - return 6; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridCacheEvictionRequest.class, this); - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/c56c4b8c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionResponse.java deleted file mode 100644 index 69ec09f..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionResponse.java +++ /dev/null @@ -1,225 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache; - -import java.io.Externalizable; -import java.nio.ByteBuffer; -import java.util.Collection; -import java.util.HashSet; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.internal.GridDirectCollection; -import org.apache.ignite.internal.util.tostring.GridToStringInclude; -import org.apache.ignite.internal.util.typedef.internal.S; -import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; -import org.apache.ignite.plugin.extensions.communication.MessageReader; -import org.apache.ignite.plugin.extensions.communication.MessageWriter; - -/** - * Cache eviction response. - */ -public class GridCacheEvictionResponse extends GridCacheMessage { - /** */ - private static final long serialVersionUID = 0L; - - /** Future ID. */ - private long futId; - - /** Rejected keys. */ - @GridToStringInclude - @GridDirectCollection(KeyCacheObject.class) - private Collection<KeyCacheObject> rejectedKeys = new HashSet<>(); - - /** Flag to indicate whether request processing has finished with error. */ - private boolean err; - - /** - * Required by {@link Externalizable}. - */ - public GridCacheEvictionResponse() { - // No-op. - } - - /** - * @param cacheId Cache ID. - * @param futId Future ID. - */ - GridCacheEvictionResponse(int cacheId, long futId) { - this(cacheId, futId, false); - } - - /** - * @param cacheId Cache ID. - * @param futId Future ID. - * @param err {@code True} if request processing has finished with error. - */ - GridCacheEvictionResponse(int cacheId, long futId, boolean err) { - this.cacheId = cacheId; - this.futId = futId; - this.err = err; - } - - /** {@inheritDoc} - * @param ctx*/ - @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException { - super.prepareMarshal(ctx); - - prepareMarshalCacheObjects(rejectedKeys, ctx.cacheContext(cacheId)); - } - - /** {@inheritDoc} */ - @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException { - super.finishUnmarshal(ctx, ldr); - - finishUnmarshalCacheObjects(rejectedKeys, ctx.cacheContext(cacheId), ldr); - } - - /** {@inheritDoc} */ - @Override public boolean addDeploymentInfo() { - return false; - } - - /** - * @return Future ID. - */ - long futureId() { - return futId; - } - - /** - * @return Rejected keys. - */ - Collection<KeyCacheObject> rejectedKeys() { - return rejectedKeys; - } - - /** - * Add rejected key to response. - * - * @param key Evicted key. - */ - void addRejected(KeyCacheObject key) { - assert key != null; - - rejectedKeys.add(key); - } - - /** - * @return {@code True} if request processing has finished with error. - */ - boolean evictError() { - return err; - } - - /** {@inheritDoc} */ - @Override public boolean ignoreClassErrors() { - return true; - } - - /** {@inheritDoc} */ - @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { - writer.setBuffer(buf); - - if (!super.writeTo(buf, writer)) - return false; - - if (!writer.isHeaderWritten()) { - if (!writer.writeHeader(directType(), fieldsCount())) - return false; - - writer.onHeaderWritten(); - } - - switch (writer.state()) { - case 3: - if (!writer.writeBoolean("err", err)) - return false; - - writer.incrementState(); - - case 4: - if (!writer.writeLong("futId", futId)) - return false; - - writer.incrementState(); - - case 5: - if (!writer.writeCollection("rejectedKeys", rejectedKeys, MessageCollectionItemType.MSG)) - return false; - - writer.incrementState(); - - } - - return true; - } - - /** {@inheritDoc} */ - @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { - reader.setBuffer(buf); - - if (!reader.beforeMessageRead()) - return false; - - if (!super.readFrom(buf, reader)) - return false; - - switch (reader.state()) { - case 3: - err = reader.readBoolean("err"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 4: - futId = reader.readLong("futId"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 5: - rejectedKeys = reader.readCollection("rejectedKeys", MessageCollectionItemType.MSG); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - } - - return reader.afterMessageRead(GridCacheEvictionResponse.class); - } - - /** {@inheritDoc} */ - @Override public short directType() { - return 15; - } - - /** {@inheritDoc} */ - @Override public byte fieldsCount() { - return 6; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridCacheEvictionResponse.class, this); - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/c56c4b8c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java index 99878ec..fdd29e4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java @@ -523,20 +523,6 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { GridCacheContext ctx = cctx.cacheContext(msg.cacheId()); switch (msg.directType()) { - case 14: { - GridCacheEvictionRequest req = (GridCacheEvictionRequest)msg; - - GridCacheEvictionResponse res = new GridCacheEvictionResponse( - ctx.cacheId(), - req.futureId(), - req.classError() != null - ); - - sendResponseOnFailedMessage(nodeId, res, cctx, ctx.ioPolicy()); - } - - break; - case 30: { GridDhtLockRequest req = (GridDhtLockRequest)msg;
