http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index 1937a5f,8c3c5d1..fc1d9e3 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@@ -768,108 -765,92 +770,108 @@@ public abstract class GridCacheAdapter< PeekModes modes = parsePeekModes(peekModes, false); - try { - KeyCacheObject cacheKey = ctx.toCacheKeyObject(key); + KeyCacheObject cacheKey = ctx.toCacheKeyObject(key); + + CacheObject cacheVal = null; - CacheObject cacheVal = null; + if (!ctx.isLocal()) { + AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion(); - if (!ctx.isLocal()) { - AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion(); + int part = ctx.affinity().partition(cacheKey); - int part = ctx.affinity().partition(cacheKey); + boolean nearKey; - boolean nearKey; + if (!(modes.near && modes.primary && modes.backup)) { - boolean keyPrimary = ctx.affinity().primary(ctx.localNode(), part, topVer); ++ boolean keyPrimary = ctx.affinity().primaryByPartition(ctx.localNode(), part, topVer); + + if (keyPrimary) { + if (!modes.primary) + return null; - if (!(modes.near && modes.primary && modes.backup)) { - boolean keyPrimary = ctx.affinity().primaryByPartition(ctx.localNode(), part, topVer); + nearKey = false; + } + else { - boolean keyBackup = ctx.affinity().belongs(ctx.localNode(), part, topVer); ++ boolean keyBackup = ctx.affinity().partitionBelongs(ctx.localNode(), part, topVer); - if (keyPrimary) { - if (!modes.primary) + if (keyBackup) { + if (!modes.backup) return null; nearKey = false; } else { - boolean keyBackup = ctx.affinity().partitionBelongs(ctx.localNode(), part, topVer); - - if (keyBackup) { - if (!modes.backup) - return null; - - nearKey = false; - } - else { - if (!modes.near) - return null; - - nearKey = true; + if (!modes.near) + return null; - // Swap and offheap are disabled for near cache. - modes.offheap = false; - modes.swap = false; - } - } - } - else { - nearKey = !ctx.affinity().partitionBelongs(ctx.localNode(), part, topVer); + nearKey = true; - if (nearKey) { // Swap and offheap are disabled for near cache. modes.offheap = false; - modes.swap = false; } } + } + else { - nearKey = !ctx.affinity().belongs(ctx.localNode(), part, topVer); ++ nearKey = !ctx.affinity().partitionBelongs(ctx.localNode(), part, topVer); - if (nearKey && !ctx.isNear()) - return null; + if (nearKey) { + // Swap and offheap are disabled for near cache. + modes.offheap = false; + } + } - if (modes.heap) { - GridCacheEntryEx e = nearKey ? peekEx(cacheKey) : - (ctx.isNear() ? ctx.near().dht().peekEx(cacheKey) : peekEx(cacheKey)); + if (nearKey && !ctx.isNear()) + return null; - if (e != null) { - cacheVal = e.peek(modes.heap, modes.offheap, modes.swap, topVer, plc); + GridCacheEntryEx e; + GridCacheContext ctx0; - modes.offheap = false; - modes.swap = false; - } + while (true) { + if (nearKey) { + ctx0 = context(); + e = peekEx(key); + } + else { + ctx0 = ctx.isNear() ? ctx.near().dht().context() : ctx; + e = modes.offheap ? ctx0.cache().entryEx(key) : ctx0.cache().peekEx(key); } - if (modes.offheap || modes.swap) { - GridCacheSwapManager swapMgr = ctx.isNear() ? ctx.near().dht().context().swap() : ctx.swap(); + if (e != null) { + try { + cacheVal = e.peek(modes.heap, modes.offheap, topVer, plc); + } + catch (GridCacheEntryRemovedException ignore) { + if (log.isDebugEnabled()) + log.debug("Got removed entry during 'peek': " + key); - cacheVal = swapMgr.readValue(cacheKey, modes.offheap, modes.swap); + continue; + } + finally { + ctx0.evicts().touch(e, null); + } } + + break; } - else - cacheVal = localCachePeek0(cacheKey, modes.heap, modes.offheap, modes.swap, plc); + } + else { + while (true) { + try { + cacheVal = localCachePeek0(cacheKey, modes.heap, modes.offheap, plc); - Object val = ctx.unwrapBinaryIfNeeded(cacheVal, ctx.keepBinary(), false); + break; + } + catch (GridCacheEntryRemovedException ignore) { + if (log.isDebugEnabled()) + log.debug("Got removed entry during 'peek': " + key); - return (V)val; + // continue + } + } } - catch (GridCacheEntryRemovedException ignore) { - if (log.isDebugEnabled()) - log.debug("Got removed entry during 'peek': " + key); - return null; - } + Object val = ctx.unwrapBinaryIfNeeded(cacheVal, ctx.keepBinary(), false); + + return (V)val; } /** @@@ -1520,31 -1482,31 +1523,31 @@@ final boolean intercept = ctx.config().getInterceptor() != null; IgniteInternalFuture<CacheEntry<K, V>> fr = fut.chain( - new CX1<IgniteInternalFuture<T2<V, GridCacheVersion>>, CacheEntry<K, V>>() { - @Override public CacheEntry<K, V> applyx(IgniteInternalFuture<T2<V, GridCacheVersion>> f) + new CX1<IgniteInternalFuture<EntryGetResult>, CacheEntry<K, V>>() { + @Override public CacheEntry<K, V> applyx(IgniteInternalFuture<EntryGetResult> f) throws IgniteCheckedException { - T2<V, GridCacheVersion> t = f.get(); + EntryGetResult t = f.get(); - K key = keepBinary ? (K)ctx.unwrapBinaryIfNeeded(key0, true, false) : key0; + K key = keepBinary ? (K)ctx.unwrapBinaryIfNeeded(key0, true, false) : key0; - CacheEntry val = t != null ? new CacheEntryImplEx<>( - key, - t.value(), - t.version()) - : null; + CacheEntry val = t != null ? new CacheEntryImplEx<>( + key, - t.get1(), - t.get2()) ++ t.value(), ++ t.version()) + : null; - if (intercept) { - V val0 = (V)ctx.config().getInterceptor().onGet(key, t != null ? val.getValue() : null); + if (intercept) { + V val0 = (V)ctx.config().getInterceptor().onGet(key, t != null ? val.getValue() : null); - return val0 != null ? new CacheEntryImplEx(key, val0, t != null ? t.get2() : null) : null; - return val0 != null ? new CacheEntryImplEx(key, val0, t != null ? t.version() : null) : null; ++ return val0 != null ? new CacheEntryImplEx(key, val0, t != null ? t.version() : null) : null; + } + else + return val; } - else - return val; - } - }); + }); if (statsEnabled) - fut.listen(new UpdateGetTimeStatClosure<T2<V, GridCacheVersion>>(metrics0(), start)); + fut.listen(new UpdateGetTimeStatClosure<EntryGetResult>(metrics0(), start)); return fr; } @@@ -1639,22 -1587,9 +1642,22 @@@ final long start = statsEnabled ? System.nanoTime() : 0L; + CacheOperationContext opCtx = ctx.operationContextPerCall(); + + String taskName = ctx.kernalContext().job().currentTaskName(); + - IgniteInternalFuture<Map<K, T2<V, GridCacheVersion>>> fut = - (IgniteInternalFuture<Map<K, T2<V, GridCacheVersion>>>)((IgniteInternalFuture)getAllAsync( + IgniteInternalFuture<Map<K, EntryGetResult>> fut = - (IgniteInternalFuture<Map<K, EntryGetResult>>) - ((IgniteInternalFuture)getAllAsync(keys, !ctx.keepBinary(), true)); ++ (IgniteInternalFuture<Map<K, EntryGetResult>>)((IgniteInternalFuture)getAllAsync( + keys, + !ctx.config().isReadFromBackup(), + /*skip tx*/false, + opCtx != null ? opCtx.subjectId() : null, + taskName, + !(opCtx != null && opCtx.isKeepBinary()), + opCtx != null && opCtx.recovery(), + /*skip vals*/false, + /*can remap*/true, + /*need ver*/true)); final boolean intercept = ctx.config().getInterceptor() != null; @@@ -1732,7 -1667,9 +1735,7 @@@ */ @SuppressWarnings("IfMayBeConditional") private Collection<CacheEntry<K, V>> interceptGetEntries( - @Nullable Collection<? extends K> keys, Map<K, T2<V, GridCacheVersion>> map) { + @Nullable Collection<? extends K> keys, Map<K, EntryGetResult> map) { - Map<K, CacheEntry<K, V>> res; - if (F.isEmpty(keys)) { assert map.isEmpty(); @@@ -1957,23 -1890,14 +1966,23 @@@ } if (tx == null || tx.implicit()) { - Map<KeyCacheObject, GridCacheVersion> misses = null; - try { - final AffinityTopologyVersion topVer = tx == null ? - (canRemap ? - ctx.affinity().affinityTopologyVersion() : ctx.shared().exchange().readyAffinityVersion()) : - tx.topologyVersion(); ++ Map<KeyCacheObject, EntryGetResult> misses = null; + final AffinityTopologyVersion topVer = tx == null ? + (canRemap ? + ctx.affinity().affinityTopologyVersion() : ctx.shared().exchange().readyAffinityVersion()) : + tx.topologyVersion(); + + try { int keysSize = keys.size(); + GridDhtTopologyFuture topFut = ctx.shared().exchange().lastFinishedFuture(); + + Throwable ex = topFut != null ? topFut.validateCache(ctx, recovery, /*read*/true, null, keys) : null; + + if (ex != null) + return new GridFinishedFuture<>(ex); + final Map<K1, V1> map = keysSize == 1 ? (Map<K1, V1>)new IgniteBiTuple<>() : U.<K1, V1>newHashMap(keysSize); @@@ -1992,33 -1920,55 +2001,53 @@@ } try { - T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned( - null, - null, - /*update-metrics*/!skipVals, - /*event*/!skipVals, - subjId, - null, - taskName, - expiry, - !deserializeBinary); - - if (res == null) { - if (storeEnabled) { - GridCacheVersion ver = entry.version(); + EntryGetResult res; + + boolean evt = !skipVals; + boolean updateMetrics = !skipVals; + + if (storeEnabled) { - res = entry.innerGetAndReserveForLoad(ctx.isSwapOrOffheapEnabled(), - updateMetrics, ++ res = entry.innerGetAndReserveForLoad(updateMetrics, + evt, + subjId, + taskName, + expiry, + !deserializeBinary, + readerArgs); + + assert res != null; + if (res.value() == null) { if (misses == null) misses = new HashMap<>(); - misses.put(key, ver); + misses.put(key, res); + + res = null; } - else - ctx.evicts().touch(entry, topVer); } else { + res = entry.innerGetVersioned( + null, + null, - ctx.isSwapOrOffheapEnabled(), + /*unmarshal*/true, + updateMetrics, + evt, + subjId, + null, + taskName, + expiry, + !deserializeBinary, + readerArgs); + + if (res == null) + ctx.evicts().touch(entry, topVer); + } + + if (res != null) { ctx.addResult(map, key, - res.get1(), + res, skipVals, keepCacheObjects, deserializeBinary,
http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheDeploymentManager.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java index 26e4ed3,f47e9f3..8e6a9ec --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java @@@ -305,9 -324,11 +306,9 @@@ public interface GridCacheEntryEx * @throws IgniteCheckedException If loading value failed. * @throws GridCacheEntryRemovedException If entry was removed. */ - @Nullable public T2<CacheObject, GridCacheVersion> innerGetVersioned( + public EntryGetResult innerGetVersioned( @Nullable GridCacheVersion ver, IgniteInternalTx tx, - boolean readSwap, - boolean unmarshal, boolean updateMetrics, boolean evt, UUID subjId, http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheManagerAdapter.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index d28ea25,2237e22..846c633 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@@ -103,6 -99,15 +102,9 @@@ public abstract class GridCacheMapEntr private static final byte IS_UNSWAPPED_MASK = 0x02; /** */ - private static final byte IS_OFFHEAP_PTR_MASK = 0x04; - - /** */ - private static final byte IS_SWAPPING_REQUIRED = 0x08; - - /** */ - private static final byte IS_EVICT_DISABLED = 0x10; ++ private static final byte IS_EVICT_DISABLED = 0x04; + + /** */ public static final GridCacheAtomicVersionComparator ATOMIC_VER_COMPARATOR = new GridCacheAtomicVersionComparator(); /** @@@ -479,24 -776,56 +481,51 @@@ taskName, expirePlc, false, - keepBinary); + keepBinary, + false, + null); + } + + /** {@inheritDoc} */ - @Override public EntryGetResult innerGetAndReserveForLoad(boolean readSwap, - boolean updateMetrics, ++ @Override public EntryGetResult innerGetAndReserveForLoad(boolean updateMetrics, + boolean evt, + UUID subjId, + String taskName, + @Nullable IgniteCacheExpiryPolicy expiryPlc, + boolean keepBinary, + @Nullable ReaderArguments readerArgs) throws IgniteCheckedException, GridCacheEntryRemovedException { + return (EntryGetResult)innerGet0( + /*ver*/null, + /*tx*/null, - readSwap, + /*readThrough*/false, + evt, + updateMetrics, - /*tmp*/false, + subjId, + /*transformClo*/null, + taskName, + expiryPlc, + true, + keepBinary, + /*reserve*/true, + readerArgs); } /** {@inheritDoc} */ - @Nullable @Override public T2<CacheObject, GridCacheVersion> innerGetVersioned( + @Override public EntryGetResult innerGetVersioned( @Nullable GridCacheVersion ver, IgniteInternalTx tx, - boolean readSwap, - boolean unmarshal, boolean updateMetrics, boolean evt, UUID subjId, Object transformClo, String taskName, @Nullable IgniteCacheExpiryPolicy expiryPlc, - boolean keepBinary) + boolean keepBinary, + @Nullable ReaderArguments readerArgs) throws IgniteCheckedException, GridCacheEntryRemovedException { - return (T2<CacheObject, GridCacheVersion>)innerGet0( - return (EntryGetResult)innerGet0(ver, ++ return (EntryGetResult)innerGet0( + ver, tx, - readSwap, false, evt, updateMetrics, @@@ -624,14 -965,28 +658,27 @@@ // Cache version for optimistic check. startVer = ver; - } - if (ret != null) { - assert !obsolete; - assert !deferred; + addReaderIfNeed(readerArgs); + + if (ret != null) { - assert tmp || !(ret instanceof BinaryObjectOffheapImpl); + assert !obsolete; + assert !deferred; + + // If return value is consistent, then done. + res = retVer ? entryGetResult(ret, resVer, false) : ret; + } + else if (reserveForLoad && !obsolete) { + assert !readThrough; + assert retVer; + + boolean reserve = !evictionDisabled(); + + if (reserve) + flags |= IS_EVICT_DISABLED; - // If return value is consistent, then done. - return retVer ? new T2<>(ret, resVer) : ret; + res = entryGetResult(null, resVer, reserve); + } } if (obsolete) { @@@ -682,8 -1046,20 +734,10 @@@ update(ret, expTime, ttl, nextVer, true); - if (hadValPtr && cctx.offheapTiered()) { - if (log.isTraceEnabled()) { - log.trace("innerGet removeOffheap [key=" + key + - ", entry=" + System.identityHashCode(this) + - ", ptr=" + offHeapPointer() + ']'); - } - - cctx.swap().removeOffheap(key); - } - if (cctx.deferredDelete() && deletedUnlocked() && !isInternal() && !detached()) deletedUnlocked(false); + + assert readerArgs == null; } if (evt && cctx.events().isRecordable(EVT_CACHE_OBJECT_READ)) { @@@ -2170,8 -3077,21 +2238,9 @@@ value(null); ver = newVer; + flags &= ~IS_EVICT_DISABLED; - if (log.isTraceEnabled()) { - log.trace("invalidate releaseSwap [key=" + key + - ", entry=" + System.identityHashCode(this) + - ", val=" + val + - ", ptr=" + offHeapPointer() + - ']'); - } - - releaseSwap(); - - clearIndex(val); + removeValue(); onInvalidate(); } @@@ -2249,9 -3168,10 +2318,10 @@@ ttlAndExpireTimeExtras(ttl, expireTime); this.ver = ver; + flags &= ~IS_EVICT_DISABLED; - if (addTracked && expireTime != 0 && (expireTime != oldExpireTime || isStartVersion()) && cctx.config().isEagerTtl()) - cctx.ttl().addTrackedEntry(this); + if (trackNear && expireTime != 0 && (expireTime != oldExpireTime || isStartVersion())) + cctx.ttl().addTrackedEntry((GridNearCacheEntry)this); } /** @@@ -2421,18 -3394,8 +2491,18 @@@ } } + /** {@inheritDoc} */ + @Nullable @Override public CacheObject peek(@Nullable IgniteCacheExpiryPolicy plc) + throws GridCacheEntryRemovedException, IgniteCheckedException { + IgniteInternalTx tx = cctx.tm().localTxx(); + + AffinityTopologyVersion topVer = tx != null ? tx.topologyVersion() : cctx.affinity().affinityTopologyVersion(); + + return peek(true, false, topVer, plc); + } + /** - * TODO: GG-4009: do we need to generate event and invalidate value? + * TODO: IGNITE-3500: do we need to generate event and invalidate value? * * @return {@code true} if expired. * @throws IgniteCheckedException In case of failure. http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 46a20ab,86dd4ea..f339f46 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@@ -57,8 -54,8 +56,9 @@@ import org.apache.ignite.internal.Ignit import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.events.DiscoveryCustomEvent; - import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; + import org.apache.ignite.internal.managers.discovery.DiscoCache; + import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener; +import org.apache.ignite.internal.pagemem.snapshot.StartFullSnapshotAckDiscoveryMessage; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache; import org.apache.ignite.internal.processors.cache.distributed.dht.GridClientPartitionTopology; @@@ -222,16 -210,19 +220,18 @@@ public class GridCachePartitionExchange } } - assert e.type() != EVT_NODE_JOINED || n.order() > loc.order() : - assert - evt.type() != EVT_NODE_JOINED || n.order() > loc.order() : ++ assert evt.type() != EVT_NODE_JOINED || n.order() > loc.order() : "Node joined with smaller-than-local " + -- "order [newOrder=" + n.order() + ", locOrder=" + loc.order() + ']'; ++ "order [newOrder=" + n.order() + ", locOrder=" + loc.order() + ']'; - exchId = exchangeId(n.id(), affinityTopologyVersion(e), e.type()); + exchId = exchangeId(n.id(), + affinityTopologyVersion(evt), + evt.type()); - exchFut = exchangeFuture(exchId, e, null, null); + exchFut = exchangeFuture(exchId, evt, cache,null, null); } else { - DiscoveryCustomEvent customEvt = (DiscoveryCustomEvent)e; + DiscoveryCustomEvent customEvt = (DiscoveryCustomEvent)evt; if (customEvt.customMessage() instanceof DynamicCacheChangeBatch) { DynamicCacheChangeBatch batch = (DynamicCacheChangeBatch)customEvt.customMessage(); @@@ -260,11 -251,10 +260,10 @@@ } } - //todo think about refactoring - if (!F.isEmpty(valid)) { + if (!F.isEmpty(valid) && !(valid.size() == 1 && valid.iterator().next().globalStateChange())) { - exchId = exchangeId(n.id(), affinityTopologyVersion(e), e.type()); + exchId = exchangeId(n.id(), affinityTopologyVersion(evt), evt.type()); - exchFut = exchangeFuture(exchId, e, valid, null); + exchFut = exchangeFuture(exchId, evt, cache, valid, null); } } else if (customEvt.customMessage() instanceof CacheAffinityChangeMessage) { @@@ -278,14 -268,8 +277,14 @@@ } } else - exchangeFuture(msg.exchangeId(), null, null, null).onAffinityChangeMessage(customEvt.eventNode(), msg); + exchangeFuture(msg.exchangeId(), null, null, null, null).onAffinityChangeMessage(customEvt.eventNode(), msg); } + else if (customEvt.customMessage() instanceof StartFullSnapshotAckDiscoveryMessage + && !((StartFullSnapshotAckDiscoveryMessage)customEvt.customMessage()).hasError()) { - exchId = exchangeId(n.id(), affinityTopologyVersion(e), e.type()); ++ exchId = exchangeId(n.id(), affinityTopologyVersion(evt), evt.type()); + - exchFut = exchangeFuture(exchId, e, null, null); ++ exchFut = exchangeFuture(exchId, evt, null, null, null); + } } if (exchId != null) { @@@ -1303,8 -1291,11 +1307,12 @@@ } else { if (msg.client()) { - final GridDhtPartitionsExchangeFuture exchFut = exchangeFuture(msg.exchangeId(), + final GridDhtPartitionsExchangeFuture exchFut = exchangeFuture( - msg.exchangeId(), null, null, null); ++ msg.exchangeId(), + null, + null, + null, + null); exchFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) { http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index 987d696,b016883..c18dbcf mode 100644,100755..100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@@ -880,48 -857,33 +891,59 @@@ public class GridCacheProcessor extend assert caches.containsKey(CU.MARSH_CACHE_NAME) : "Marshaller cache should be started"; assert ctx.config().isDaemon() || caches.containsKey(CU.UTILITY_CACHE_NAME) : "Utility cache should be started"; + + if (!ctx.clientNode() && !ctx.isDaemon()) + addRemovedItemsCleanupTask(Long.getLong(IGNITE_CACHE_REMOVED_ENTRIES_TTL, 10_000)); + + } + + /** + * @param timeout Cleanup timeout. + */ + private void addRemovedItemsCleanupTask(long timeout) { + ctx.timeout().addTimeoutObject(new RemovedItemsCleanupTask(timeout)); } - /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override public void stop(boolean cancel) throws IgniteCheckedException { - for (String cacheName : stopSeq) { - GridCacheAdapter<?, ?> cache = stoppedCaches.remove(maskNull(cacheName)); + /** + * + */ + private void checkConsistency() throws IgniteCheckedException { + if (!ctx.config().isDaemon() && !getBoolean(IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK)) { + for (ClusterNode n : ctx.discovery().remoteNodes()) { + if (n.attribute(ATTR_CONSISTENCY_CHECK_SKIPPED)) + continue; - if (cache != null) - stopCache(cache, cancel); - } + checkTransactionConfiguration(n); - for (GridCacheAdapter<?, ?> cache : stoppedCaches.values()) { - if (cache == stoppedCaches.remove(maskNull(cache.name()))) - stopCache(cache, cancel); + DeploymentMode locDepMode = ctx.config().getDeploymentMode(); + DeploymentMode rmtDepMode = n.attribute(IgniteNodeAttributes.ATTR_DEPLOYMENT_MODE); + + CU.checkAttributeMismatch( + log, null, n.id(), "deploymentMode", "Deployment mode", + locDepMode, rmtDepMode, true); + + for (DynamicCacheDescriptor desc : registeredCaches.values()) { + CacheConfiguration rmtCfg = desc.remoteConfiguration(n.id()); + + if (rmtCfg != null) { + CacheConfiguration locCfg = desc.cacheConfiguration(); + + checkCache(locCfg, rmtCfg, n); + + // Check plugin cache configurations. + CachePluginManager pluginMgr = desc.pluginManager(); + + pluginMgr.validateRemotes(rmtCfg, n); + } + } + } } + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public void stop(boolean cancel) throws IgniteCheckedException { + stopCaches(cancel); List<? extends GridCacheSharedManager<?, ?>> mgrs = sharedCtx.managers(); http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java index ca71f51,9c4e4ef..eb23c43 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java @@@ -30,8 -30,8 +30,9 @@@ import java.util.concurrent.locks.Reent import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.internal.IgniteInterruptedCheckedException; + import org.apache.ignite.internal.managers.discovery.DiscoCache; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId; @@@ -41,10 -41,7 +42,8 @@@ import org.apache.ignite.internal.proce import org.apache.ignite.internal.util.GridAtomicLong; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.X; - import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.U; import org.jetbrains.annotations.Nullable; http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java index fba1877,58dbbcf..ac6eee3 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java @@@ -650,10 -657,10 +654,11 @@@ public abstract class GridDhtCacheAdapt String taskName, @Nullable IgniteCacheExpiryPolicy expiry, boolean skipVals, - boolean canRemap + boolean canRemap, + boolean recovery ) { return getAllAsync0(keys, + readerArgs, readThrough, /*don't check local tx. */false, subjId, http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java index 2c3435d,d0d801a..5fe0ef4 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java @@@ -392,32 -390,22 +395,23 @@@ public final class GridDhtGetFuture<K, txFut.markInitialized(); } - IgniteInternalFuture<Map<KeyCacheObject, T2<CacheObject, GridCacheVersion>>> fut; + IgniteInternalFuture<Map<KeyCacheObject, EntryGetResult>> fut; if (txFut == null || txFut.isDone()) { - if (tx == null) { - fut = cache().getDhtAllAsync( - keys.keySet(), - readThrough, - subjId, - taskName, - expiryPlc, - skipVals, - /*can remap*/true, - recovery); - } - else { - fut = tx.getAllAsync(cctx, - null, - keys.keySet(), - /*deserialize binary*/false, - skipVals, - /*keep cache objects*/true, - /*skip store*/!readThrough, - false); - } + fut = cache().getDhtAllAsync( + keys.keySet(), + readerArgs, + readThrough, + subjId, + taskName, + expiryPlc, + skipVals, - /*can remap*/true); ++ /*can remap*/true, ++ recovery); } else { + final ReaderArguments args = readerArgs; + // If we are here, then there were active transactions for some entries // when we were adding the reader. In that case we must wait for those // transactions to complete. @@@ -428,27 -416,15 +422,16 @@@ if (e != null) throw new GridClosureException(e); - if (tx == null) { - return cache().getDhtAllAsync( - keys.keySet(), - readThrough, - subjId, - taskName, - expiryPlc, - skipVals, - /*can remap*/true, - recovery); - } - else { - return tx.getAllAsync(cctx, - null, - keys.keySet(), - /*deserialize binary*/false, - skipVals, - /*keep cache objects*/true, - /*skip store*/!readThrough, - false); - } + return cache().getDhtAllAsync( + keys.keySet(), + args, + readThrough, + subjId, + taskName, + expiryPlc, + skipVals, - /*can remap*/true); ++ /*can remap*/true, ++ recovery); } } ); http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java index 81d2570,33f4661..9cc69b5 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java @@@ -350,32 -348,22 +353,23 @@@ public final class GridDhtGetSingleFutu } } - IgniteInternalFuture<Map<KeyCacheObject, T2<CacheObject, GridCacheVersion>>> fut; + IgniteInternalFuture<Map<KeyCacheObject, EntryGetResult>> fut; if (rdrFut == null || rdrFut.isDone()) { - if (tx == null) { - fut = cache().getDhtAllAsync( - Collections.singleton(key), - readThrough, - subjId, - taskName, - expiryPlc, - skipVals, - /*can remap*/true, - recovery); - } - else { - fut = tx.getAllAsync(cctx, - null, - Collections.singleton(key), - /*deserialize binary*/false, - skipVals, - /*keep cache objects*/true, - /*skip store*/!readThrough, - false); - } + fut = cache().getDhtAllAsync( + Collections.singleton(key), + readerArgs, + readThrough, + subjId, + taskName, + expiryPlc, + skipVals, - /*can remap*/true); ++ /*can remap*/true, ++ recovery); } else { + final ReaderArguments args = readerArgs; + rdrFut.listen( new IgniteInClosure<IgniteInternalFuture<Boolean>>() { @Override public void apply(IgniteInternalFuture<Boolean> fut) { @@@ -397,20 -384,7 +390,8 @@@ taskName, expiryPlc, skipVals, - /*can remap*/true); + /*can remap*/true, + recovery); - } - else { - fut0 = tx.getAllAsync(cctx, - null, - Collections.singleton(key), - /*deserialize binary*/false, - skipVals, - /*keep cache objects*/true, - /*skip store*/!readThrough, - false - ); - } fut0.listen(createGetFutureListener()); } http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java index 9b30593,9f8498a..b1fe6ec --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java @@@ -49,24 -44,23 +49,28 @@@ import org.apache.ignite.internal.proce import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader; import org.apache.ignite.internal.processors.cache.extras.GridCacheObsoleteEntryExtras; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.util.GridCircularBuffer; + import org.apache.ignite.internal.processors.query.GridQueryProcessor; import org.apache.ignite.internal.util.future.GridFutureAdapter; -import org.apache.ignite.internal.util.lang.GridCloseableIterator; +import org.apache.ignite.internal.util.lang.GridIterator; import org.apache.ignite.internal.util.tostring.GridToStringExclude; +import org.apache.ignite.internal.util.typedef.CI1; +import org.apache.ignite.internal.util.typedef.T2; + import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteUuid; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; + import org.jsr166.ConcurrentLinkedDeque8; import static org.apache.ignite.IgniteSystemProperties.IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE; + import static org.apache.ignite.IgniteSystemProperties.IGNITE_CACHE_REMOVED_ENTRIES_TTL; import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_OBJECT_UNLOADED; +import static org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManager.CacheDataStore; import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.EVICTED; +import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.LOST; import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.MOVING; import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING; import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.RENTING; @@@ -160,22 -149,9 +169,24 @@@ public class GridDhtLocalPartition impl int delQueueSize = CU.isSystemCache(cctx.name()) ? 100 : Math.max(MAX_DELETE_QUEUE_SIZE / cctx.affinity().partitions(), 20); - rmvQueue = new GridCircularBuffer<>(U.ceilPow2(delQueueSize)); + rmvQueueMaxSize = U.ceilPow2(delQueueSize); + + rmvdEntryTtl = Long.getLong(IGNITE_CACHE_REMOVED_ENTRIES_TTL, 10_000); + + try { + store = cctx.offheap().createCacheDataStore(id); + } + catch (IgniteCheckedException e) { + // TODO ignite-db + throw new IgniteException(e); + } + } + + /** + * @return Data store. + */ + public CacheDataStore dataStore() { + return store; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java index ac3e2c8,bdd84b0..4f8de4e --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java @@@ -17,10 -17,8 +17,9 @@@ package org.apache.ignite.internal.processors.cache.distributed.dht; - import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.UUID; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cluster.ClusterNode; http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index 322bbe3,84ff96b..04188fd --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@@ -30,18 -30,15 +30,19 @@@ import java.util.Set import java.util.UUID; import java.util.concurrent.atomic.AtomicReferenceArray; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.IgniteSystemProperties; +import org.apache.ignite.cache.PartitionLossPolicy; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.events.DiscoveryEvent; +import org.apache.ignite.events.EventType; import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException; import org.apache.ignite.internal.IgniteInterruptedCheckedException; + import org.apache.ignite.internal.managers.discovery.DiscoCache; import org.apache.ignite.internal.processors.affinity.AffinityAssignment; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.ClusterState; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheMapEntryFactory; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId; @@@ -503,86 -468,70 +504,86 @@@ class GridDhtPartitionTopologyImpl impl ClusterNode loc = cctx.localNode(); - U.writeLock(lock); + cctx.shared().database().checkpointReadLock(); - try { - GridDhtPartitionExchangeId exchId = exchFut.exchangeId(); + synchronized (cctx.shared().exchange().interruptLock()) { + if (Thread.currentThread().isInterrupted()) + throw new IgniteInterruptedCheckedException("Thread is interrupted: " + Thread.currentThread()); - if (stopping) - return; + try { + U.writeLock(lock); + } + catch (IgniteInterruptedCheckedException e) { + cctx.shared().database().checkpointReadUnlock(); - assert topVer.equals(exchId.topologyVersion()) : "Invalid topology version [topVer=" + - topVer + ", exchId=" + exchId + ']'; + throw e; + } - if (exchId.isLeft()) - removeNode(exchId.nodeId()); + try { + GridDhtPartitionExchangeId exchId = exchFut.exchangeId(); - ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache(); + if (stopping) + return; - if (log.isDebugEnabled()) - log.debug("Partition map beforeExchange [exchId=" + exchId + ", fullMap=" + fullMapString() + ']'); + assert topVer.equals(exchId.topologyVersion()) : "Invalid topology version [topVer=" + + topVer + ", exchId=" + exchId + ']'; - long updateSeq = this.updateSeq.incrementAndGet(); + if (exchId.isLeft()) + removeNode(exchId.nodeId()); - ClusterNode oldest = currentCoordinator(); - cntrMap.clear(); ++ ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache(); - // If this is the oldest node. - if (oldest != null && (loc.equals(oldest) || exchFut.isCacheAdded(cctx.cacheId(), exchId.topologyVersion()))) { - if (node2part == null) { - node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq); + if (log.isDebugEnabled()) + log.debug("Partition map beforeExchange [exchId=" + exchId + ", fullMap=" + fullMapString() + ']'); - if (log.isDebugEnabled()) - log.debug("Created brand new full topology map on oldest node [exchId=" + - exchId + ", fullMap=" + fullMapString() + ']'); - } - else if (!node2part.valid()) { - node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq, node2part, false); + long updateSeq = this.updateSeq.incrementAndGet(); - if (log.isDebugEnabled()) - log.debug("Created new full topology map on oldest node [exchId=" + exchId + ", fullMap=" + - node2part + ']'); + cntrMap.clear(); + + // If this is the oldest node. + if (oldest != null && (loc.equals(oldest) || exchFut.isCacheAdded(cctx.cacheId(), exchId.topologyVersion()))) { + if (node2part == null) { + node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq); + + if (log.isDebugEnabled()) + log.debug("Created brand new full topology map on oldest node [exchId=" + + exchId + ", fullMap=" + fullMapString() + ']'); + } + else if (!node2part.valid()) { + node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq, node2part, false); + + if (log.isDebugEnabled()) + log.debug("Created new full topology map on oldest node [exchId=" + exchId + ", fullMap=" + + node2part + ']'); + } + else if (!node2part.nodeId().equals(loc.id())) { + node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq, node2part, false); + + if (log.isDebugEnabled()) + log.debug("Copied old map into new map on oldest node (previous oldest node left) [exchId=" + + exchId + ", fullMap=" + fullMapString() + ']'); + } } - else if (!node2part.nodeId().equals(loc.id())) { - node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq, node2part, false); - if (log.isDebugEnabled()) - log.debug("Copied old map into new map on oldest node (previous oldest node left) [exchId=" + - exchId + ", fullMap=" + fullMapString() + ']'); + if (affReady) + initPartitions0(exchFut, updateSeq); + else { + List<List<ClusterNode>> aff = cctx.affinity().idealAssignment(); + + createPartitions(aff, updateSeq); } - } - if (affReady) - initPartitions0(exchFut, updateSeq); - else { - List<List<ClusterNode>> aff = cctx.affinity().idealAssignment(); + consistencyCheck(); - createPartitions(aff, updateSeq); + if (log.isDebugEnabled()) + log.debug("Partition map after beforeExchange [exchId=" + exchId + ", fullMap=" + + fullMapString() + ']'); } + finally { + lock.writeLock().unlock(); - consistencyCheck(); - - if (log.isDebugEnabled()) - log.debug("Partition map after beforeExchange [exchId=" + exchId + ", fullMap=" + - fullMapString() + ']'); - } - finally { - lock.writeLock().unlock(); + cctx.shared().database().checkpointReadUnlock(); + } } // Wait for evictions. @@@ -761,14 -692,12 +762,14 @@@ try { loc = locParts.get(p); + state = loc != null ? loc.state() : null; + - boolean belongs = cctx.affinity().localNode(p, topVer); + boolean belongs = cctx.affinity().partitionLocalNode(p, topVer); - if (loc != null && loc.state() == EVICTED) { + if (loc != null && state == EVICTED) { locParts.set(p, loc = null); - if (!belongs) + if (!treatAllPartAsLoc && !belongs) throw new GridDhtInvalidPartitionException(p, "Adding entry to evicted partition " + "(often may be caused by inconsistent 'key.hashCode()' implementation) " + "[part=" + p + ", topVer=" + topVer + ", this.topVer=" + this.topVer + ']'); @@@ -835,9 -750,10 +836,10 @@@ for (int i = 0; i < locParts.length(); i++) { GridDhtLocalPartition part = locParts.get(i); - if (part != null) + if (part != null && part.state().active()) list.add(part); } + return list; } @@@ -1065,11 -981,9 +1067,11 @@@ /** {@inheritDoc} */ @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"}) - @Nullable @Override public GridDhtPartitionMap2 update( - @Override public boolean update(@Nullable GridDhtPartitionExchangeId exchId, ++ @Override public GridDhtPartitionMap2 update( + @Nullable GridDhtPartitionExchangeId exchId, GridDhtPartitionFullMap partMap, - @Nullable Map<Integer, Long> cntrMap) { + @Nullable Map<Integer, T2<Long, Long>> cntrMap + ) { if (log.isDebugEnabled()) log.debug("Updating full partition map [exchId=" + exchId + ", parts=" + fullMapString() + ']'); http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java index 6c4da68,519239a..0ef8bb8 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java @@@ -353,10 -341,10 +354,11 @@@ public class GridPartitionedGetFuture<K topVer, subjId, taskName == null ? 0 : taskName.hashCode(), + expiryPlc != null ? expiryPlc.forCreate() : -1L, expiryPlc != null ? expiryPlc.forAccess() : -1L, skipVals, - cctx.deploymentEnabled()); + cctx.deploymentEnabled(), + recovery); add(fut); // Append new future. @@@ -461,9 -452,11 +464,9 @@@ GridCacheVersion ver = null; if (needVer) { - T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned( + getRes = entry.innerGetVersioned( null, null, - /*swap*/true, - /*unmarshal*/true, /**update-metrics*/false, /*event*/!skipVals, subjId, http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java index ea69743,a3f6b72..0da3a44 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java @@@ -316,10 -300,10 +317,11 @@@ public class GridPartitionedSingleGetFu topVer, subjId, taskName == null ? 0 : taskName.hashCode(), + expiryPlc != null ? expiryPlc.forCreate() : -1L, expiryPlc != null ? expiryPlc.forAccess() : -1L, skipVals, - cctx.deploymentEnabled()); + cctx.deploymentEnabled(), + recovery); } try { @@@ -388,9 -375,11 +390,9 @@@ GridCacheVersion ver = null; if (needVer) { - T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned( + EntryGetResult res = entry.innerGetVersioned( null, null, - /*swap*/true, - /*unmarshal*/true, /**update-metrics*/false, /*event*/!skipVals, subjId, http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 72489fd,cebf4ae..ef8150c --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@@ -555,11 -547,8 +555,10 @@@ public class GridDhtAtomicCache<K, V> e /** {@inheritDoc} */ @Override protected Map<K, V> getAll0(Collection<? extends K> keys, boolean deserializeBinary, boolean needVer) throws IgniteCheckedException { + CacheOperationContext opCtx = ctx.operationContextPerCall(); + return getAllAsyncInternal(keys, !ctx.config().isReadFromBackup(), - true, null, ctx.kernalContext().job().currentTaskName(), deserializeBinary, @@@ -1607,10 -1584,12 +1605,10 @@@ GridCacheVersion ver = null; if (needVer) { - T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned( + getRes = entry.innerGetVersioned( null, null, - /*swap*/true, - /*unmarshal*/true, - /**update-metrics*/false, + /*update-metrics*/false, /*event*/!skipVals, subjId, null, @@@ -3209,10 -3186,11 +3212,11 @@@ * @param nodeId Sender node ID. * @param res Near atomic update response. */ - @SuppressWarnings("unchecked") private void processNearAtomicUpdateResponse(UUID nodeId, GridNearAtomicUpdateResponse res) { if (msgLog.isDebugEnabled()) - msgLog.debug("Received near atomic update response [futId" + res.futureVersion() + + msgLog.debug("Received near atomic update response " + - "[futId=" + res.futureVersion() + ", node=" + nodeId + ']'); ++ "[futId=" + res.futureVersion() + + ", node=" + nodeId + ']'); res.nodeId(ctx.localNodeId()); http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java index d5e8389,e1e0ec2..d86dc91 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java @@@ -476,9 -473,11 +478,9 @@@ public class GridDhtColocatedCache<K, V GridCacheVersion ver = null; if (needVer) { - T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned( + getRes = entry.innerGetVersioned( null, null, - /*swap*/true, - /*unmarshal*/true, /**update-metrics*/false, /*event*/!skipVals, subjId, http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java index 358ec8f,79ca108..31cff03 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java @@@ -911,37 -911,38 +917,38 @@@ public final class GridDhtColocatedLock !topLocked && (tx == null || !tx.hasRemoteLocks()); - first = false; - } - - req = new GridNearLockRequest( - cctx.cacheId(), - topVer, - cctx.nodeId(), - threadId, - futId, - lockVer, - inTx(), - implicitTx(), - implicitSingleTx(), - read, - retval, - isolation(), - isInvalidate(), - timeout, - mappedKeys.size(), - inTx() ? tx.size() : mappedKeys.size(), - inTx() && tx.syncMode() == FULL_SYNC, - inTx() ? tx.subjectId() : null, - inTx() ? tx.taskNameHash() : 0, - read ? createTtl : -1L, + first = false; + } + + req = new GridNearLockRequest( + cctx.cacheId(), + topVer, + cctx.nodeId(), + threadId, + futId, + lockVer, + inTx(), + implicitTx(), + implicitSingleTx(), + read, + retval, + isolation(), + isInvalidate(), + timeout, + mappedKeys.size(), + inTx() ? tx.size() : mappedKeys.size(), + inTx() && tx.syncMode() == FULL_SYNC, + inTx() ? tx.subjectId() : null, + inTx() ? tx.taskNameHash() : 0, - read ? accessTtl : -1L, ++ read ? createTtl : -1L, + read ? accessTtl : -1L, - skipStore, - keepBinary, - clientFirst, - cctx.deploymentEnabled()); + skipStore, + keepBinary, + clientFirst, + cctx.deploymentEnabled()); - mapping.request(req); - } + mapping.request(req); + } distributedKeys.add(key); http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap2.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java index 97d768a,9942423..b80ad04 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java @@@ -272,24 -288,22 +272,24 @@@ class GridDhtPartitionSupplier boolean partMissing = false; if (phase == SupplyContextPhase.NEW) - phase = SupplyContextPhase.ONHEAP; + phase = SupplyContextPhase.OFFHEAP; + + if (phase == SupplyContextPhase.OFFHEAP) { + IgniteRebalanceIterator iter; + + if (sctx == null || sctx.entryIt == null) { + iter = cctx.offheap().rebalanceIterator(part, d.topologyVersion(), d.partitionCounter(part)); - if (phase == SupplyContextPhase.ONHEAP) { - Iterator<GridCacheMapEntry> entIt = sctx != null ? - (Iterator<GridCacheMapEntry>)sctx.entryIt : loc.allEntries().iterator(); + if (!iter.historical()) + s.clean(part); + } + else + iter = (IgniteRebalanceIterator)sctx.entryIt; - while (entIt.hasNext()) { + while (iter.hasNext()) { - if (!cctx.affinity().belongs(node, part, d.topologyVersion())) { + if (!cctx.affinity().partitionBelongs(node, part, d.topologyVersion())) { - // Demander no longer needs this partition, so we send '-1' partition and move on. + // Demander no longer needs this partition, + // so we send '-1' partition and move on. s.missed(part); if (log.isDebugEnabled()) http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index af0085d,46fb144..c33dc7b --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@@ -46,9 -44,7 +46,9 @@@ import org.apache.ignite.internal.Ignit import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.events.DiscoveryCustomEvent; +import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; - import org.apache.ignite.internal.managers.discovery.GridDiscoveryTopologySnapshot; + import org.apache.ignite.internal.managers.discovery.DiscoCache; +import org.apache.ignite.internal.pagemem.snapshot.StartFullSnapshotAckDiscoveryMessage; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache; import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage; @@@ -488,9 -455,9 +502,11 @@@ public class GridDhtPartitionsExchangeF assert !dummy && !forcePreload : this; try { + discoCache.updateAlives(cctx.discovery()); + + AffinityTopologyVersion topVer = topologyVersion(); + - srvNodes = new ArrayList<>(cctx.discovery().serverNodes(topVer)); + srvNodes = new ArrayList<>(discoCache.serverNodes()); remaining.addAll(F.nodeIds(F.view(srvNodes, F.remoteNodes(cctx.localNodeId())))); @@@ -1324,13 -1118,10 +1340,12 @@@ * Cleans up resources to avoid excessive memory usage. */ public void cleanUp() { - topSnapshot.set(null); singleMsgs.clear(); fullMsgs.clear(); + changeGlobalStateExceptions.clear(); crd = null; partReleaseFut = null; + changeGlobalStateE = null; } /** @@@ -1402,11 -1194,10 +1418,13 @@@ if (crd.isLocal()) { if (remaining.remove(node.id())) { - updatePartitionSingleMap(node, msg); + updateSingleMap = true; + + pendingSingleUpdates++; + if (exchangeOnChangeGlobalState && msg.getException() != null) + changeGlobalStateExceptions.put(node.id(), msg.getException()); + allReceived = remaining.isEmpty(); } } @@@ -1414,8 -1205,42 +1432,42 @@@ singleMsgs.put(node, msg); } - if (allReceived) + if (updateSingleMap) { + try { - updatePartitionSingleMap(msg); ++ updatePartitionSingleMap(node, msg); + } + finally { + synchronized (mux) { + assert pendingSingleUpdates > 0; + + pendingSingleUpdates--; + + if (pendingSingleUpdates == 0) + mux.notifyAll(); + } + } + } + + if (allReceived) { + awaitSingleMapUpdates(); + onAllReceived(); + } + } + + /** + * + */ + private void awaitSingleMapUpdates() { + synchronized (mux) { + try { + while (pendingSingleUpdates > 0) + U.wait(mux); + } + catch (IgniteInterruptedCheckedException e) { + U.warn(log, "Failed to wait for partition map updates, thread was interrupted: " + e); + } + } } /** @@@ -1541,10 -1272,10 +1593,10 @@@ try { assert crd.isLocal(); - if (!crd.equals(cctx.discovery().serverNodes(topologyVersion()).get(0))) { + if (!crd.equals(discoCache.serverNodes().get(0))) { for (GridCacheContext cacheCtx : cctx.cacheContexts()) { if (!cacheCtx.isLocal()) - cacheCtx.topology().beforeExchange(GridDhtPartitionsExchangeFuture.this, !centralizedAff); + cacheCtx.topology().beforeExchange(this, !centralizedAff); } } @@@ -1937,10 -1626,9 +1991,12 @@@ } if (crd0.isLocal()) { + if (exchangeOnChangeGlobalState && changeGlobalStateE !=null) + changeGlobalStateExceptions.put(crd0.id(), changeGlobalStateE); + if (allReceived) { + awaitSingleMapUpdates(); + onAllReceived(); return; http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java index ff4e838,8b74ae6..7cabbd4 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java @@@ -377,10 -374,10 +377,11 @@@ public final class GridNearGetFuture<K topVer, subjId, taskName == null ? 0 : taskName.hashCode(), + expiryPlc != null ? expiryPlc.forCreate() : -1L, expiryPlc != null ? expiryPlc.forAccess() : -1L, skipVals, - cctx.deploymentEnabled()); + cctx.deploymentEnabled(), + recovery); add(fut); // Append new future. @@@ -441,9 -438,11 +442,9 @@@ // First we peek into near cache. if (isNear) { if (needVer) { - T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned( + EntryGetResult res = entry.innerGetVersioned( null, null, - /*swap*/true, - /*unmarshal*/true, /**update-metrics*/true, /*event*/!skipVals, subjId, @@@ -577,9 -579,11 +579,9 @@@ boolean isNew = dhtEntry.isNewLocked() || !dhtEntry.valid(topVer); if (needVer) { - T2<CacheObject, GridCacheVersion> res = dhtEntry.innerGetVersioned( + EntryGetResult res = dhtEntry.innerGetVersioned( null, null, - /*swap*/true, - /*unmarshal*/true, /**update-metrics*/false, /*event*/!nearRead && !skipVals, subjId, http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java index b096d5d,7ca2635..dbf8391 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java @@@ -137,10 -138,10 +141,11 @@@ public class GridNearGetRequest extend @NotNull AffinityTopologyVersion topVer, UUID subjId, int taskNameHash, + long createTtl, long accessTtl, boolean skipVals, - boolean addDepInfo + boolean addDepInfo, + boolean recovery ) { assert futId != null; assert miniId != null; http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetRequest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/91210eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java ----------------------------------------------------------------------