http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java index 26eef50..703daf9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java @@ -81,7 +81,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach @Override public void start() throws IgniteCheckedException { super.start(); - preldr = new GridDhtPreloader<>(ctx); + preldr = new GridDhtPreloader(ctx); preldr.start(); @@ -518,7 +518,6 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach return; } - // Group lock can be only started from local node, so we never start group lock transaction on remote node. IgniteInternalFuture<?> f = lockAllAsync(ctx, nearNode, req, null); // Register listener just so we print out errors. @@ -534,8 +533,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach private void processDhtLockResponse(UUID nodeId, GridDhtLockResponse res) { assert nodeId != null; assert res != null; - GridDhtLockFuture<K, V> fut = (GridDhtLockFuture<K, V>)ctx.mvcc().<Boolean>future(res.version(), - res.futureId()); + GridDhtLockFuture fut = (GridDhtLockFuture)ctx.mvcc().<Boolean>future(res.version(), res.futureId()); if (fut == null) { if (log.isDebugEnabled()) @@ -604,7 +602,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach assert tx != null; - GridDhtLockFuture<K, V> fut = new GridDhtLockFuture<>( + GridDhtLockFuture fut = new GridDhtLockFuture( ctx, tx.nearNodeId(), tx.nearXidVersion(), @@ -669,7 +667,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach * @return Future. 
*/ public IgniteInternalFuture<GridNearLockResponse> lockAllAsync( - final GridCacheContext<K, V> cacheCtx, + final GridCacheContext<?, ?> cacheCtx, final ClusterNode nearNode, final GridNearLockRequest req, @Nullable final CacheEntryPredicate[] filter0) { @@ -719,26 +717,57 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach if (filter == null) filter = req.filter(); - GridDhtLockFuture<K, V> fut = null; + GridDhtLockFuture fut = null; if (!req.inTx()) { - fut = new GridDhtLockFuture<>(ctx, - nearNode.id(), - req.version(), - req.topologyVersion(), - cnt, - req.txRead(), - req.needReturnValue(), - req.timeout(), - tx, - req.threadId(), - req.accessTtl(), - filter, - req.skipStore()); + GridDhtPartitionTopology top = null; + + if (req.firstClientRequest()) { + assert CU.clientNode(nearNode); + + top = topology(); + + topology().readLock(); + } + + try { + if (top != null && needRemap(req.topologyVersion(), top.topologyVersion())) { + if (log.isDebugEnabled()) { + log.debug("Client topology version mismatch, need remap lock request [" + + "reqTopVer=" + req.topologyVersion() + + ", locTopVer=" + top.topologyVersion() + + ", req=" + req + ']'); + } + + GridNearLockResponse res = sendClientLockRemapResponse(nearNode, + req, + top.topologyVersion()); + + return new GridFinishedFuture<>(res); + } + + fut = new GridDhtLockFuture(ctx, + nearNode.id(), + req.version(), + req.topologyVersion(), + cnt, + req.txRead(), + req.needReturnValue(), + req.timeout(), + tx, + req.threadId(), + req.accessTtl(), + filter, + req.skipStore()); - // Add before mapping. - if (!ctx.mvcc().addFuture(fut)) - throw new IllegalStateException("Duplicate future ID: " + fut); + // Add before mapping. + if (!ctx.mvcc().addFuture(fut)) + throw new IllegalStateException("Duplicate future ID: " + fut); + } + finally { + if (top != null) + top.readUnlock(); + } } boolean timedout = false; @@ -788,45 +817,76 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach // Handle implicit locks for pessimistic transactions. 
if (req.inTx()) { if (tx == null) { - tx = new GridDhtTxLocal( - ctx.shared(), - nearNode.id(), - req.version(), - req.futureId(), - req.miniId(), - req.threadId(), - req.implicitTx(), - req.implicitSingleTx(), - ctx.systemTx(), - false, - ctx.ioPolicy(), - PESSIMISTIC, - req.isolation(), - req.timeout(), - req.isInvalidate(), - false, - req.txSize(), - null, - req.subjectId(), - req.taskNameHash()); + GridDhtPartitionTopology top = null; - tx.syncCommit(req.syncCommit()); + if (req.firstClientRequest()) { + assert CU.clientNode(nearNode); - tx = ctx.tm().onCreated(null, tx); + top = topology(); - if (tx == null || !tx.init()) { - String msg = "Failed to acquire lock (transaction has been completed): " + - req.version(); + topology().readLock(); + } - U.warn(log, msg); + try { + if (top != null && needRemap(req.topologyVersion(), top.topologyVersion())) { + if (log.isDebugEnabled()) { + log.debug("Client topology version mismatch, need remap lock request [" + + "reqTopVer=" + req.topologyVersion() + + ", locTopVer=" + top.topologyVersion() + + ", req=" + req + ']'); + } - if (tx != null) - tx.rollback(); + GridNearLockResponse res = sendClientLockRemapResponse(nearNode, + req, + top.topologyVersion()); - return new GridDhtFinishedFuture<>(new IgniteCheckedException(msg)); - } + return new GridFinishedFuture<>(res); + } - tx.topologyVersion(req.topologyVersion()); + tx = new GridDhtTxLocal( + ctx.shared(), + nearNode.id(), + req.version(), + req.futureId(), + req.miniId(), + req.threadId(), + req.implicitTx(), + req.implicitSingleTx(), + ctx.systemTx(), + false, + ctx.ioPolicy(), + PESSIMISTIC, + req.isolation(), + req.timeout(), + req.isInvalidate(), + false, + req.txSize(), + null, + req.subjectId(), + req.taskNameHash()); + + tx.syncCommit(req.syncCommit()); + + tx = ctx.tm().onCreated(null, tx); + + if (tx == null || !tx.init()) { + String msg = "Failed to acquire lock (transaction has been completed): " + + req.version(); + + U.warn(log, msg); + + if (tx != null) + tx.rollback(); + + return new GridDhtFinishedFuture<>(new IgniteCheckedException(msg)); + } + + tx.topologyVersion(req.topologyVersion()); + } + finally { + if (top != null) + top.readUnlock(); + } } ctx.tm().txContext(tx); @@ -947,6 +1007,42 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach } /** + * @param nearNode Client node. + * @param req Request. + * @param topVer Remap version. + * @return Response. + */ + private GridNearLockResponse sendClientLockRemapResponse(ClusterNode nearNode, + GridNearLockRequest req, + AffinityTopologyVersion topVer) { + assert topVer != null; + + GridNearLockResponse res = new GridNearLockResponse( + ctx.cacheId(), + req.version(), + req.futureId(), + req.miniId(), + false, + 0, + null, + topVer); + + try { + ctx.io().send(nearNode, res, ctx.ioPolicy()); + } + catch (ClusterTopologyCheckedException e) { + if (log.isDebugEnabled()) + log.debug("Failed to send client lock remap response, client node failed " + + "[node=" + nearNode + ", req=" + req + ']'); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to send client lock remap response [node=" + nearNode + ", req=" + req + ']', e); + } + + return res; + } + + /** * @param nearNode Near node. * @param entries Entries. * @param req Lock request. @@ -968,7 +1064,13 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach try { // Send reply back to originating near node. 
GridNearLockResponse res = new GridNearLockResponse(ctx.cacheId(), - req.version(), req.futureId(), req.miniId(), tx != null && tx.onePhaseCommit(), entries.size(), err); + req.version(), + req.futureId(), + req.miniId(), + tx != null && tx.onePhaseCommit(), + entries.size(), + err, + null); if (err == null) { res.pending(localDhtPendingVersions(entries, mappedVer)); @@ -1077,8 +1179,14 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach U.error(log, "Failed to get value for lock reply message for node [node=" + U.toShortString(nearNode) + ", req=" + req + ']', e); - return new GridNearLockResponse(ctx.cacheId(), req.version(), req.futureId(), req.miniId(), false, - entries.size(), e); + return new GridNearLockResponse(ctx.cacheId(), + req.version(), + req.futureId(), + req.miniId(), + false, + entries.size(), + e, + null); } }
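
For readers scanning the hunks above: the recurring pattern added to both lockAllAsync paths is a guarded topology-version check for the first lock request arriving from a client node. Below is a simplified, self-contained sketch of that control flow only; it is not the Ignite API. The types (TopologySnapshot, LockRequest) and the needRemap comparison are stand-ins for GridDhtPartitionTopology, GridNearLockRequest and the internal needRemap() helper that appear in the diff, and the println stands in for sendClientLockRemapResponse().

    import java.util.concurrent.locks.ReadWriteLock;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    /** Simplified stand-ins for the Ignite-internal types used in the hunks above. */
    class TopologySnapshot {
        final long version;                      // models AffinityTopologyVersion
        final ReadWriteLock lock = new ReentrantReadWriteLock();

        TopologySnapshot(long version) { this.version = version; }
    }

    class LockRequest {
        final long topologyVersion;              // version the near/client node mapped on
        final boolean firstClientRequest;        // models GridNearLockRequest.firstClientRequest()

        LockRequest(long topVer, boolean firstClientReq) {
            topologyVersion = topVer;
            firstClientRequest = firstClientReq;
        }
    }

    public class ClientRemapCheckSketch {
        /** Models needRemap(): the client must remap if it mapped on an older topology. */
        static boolean needRemap(long reqTopVer, long locTopVer) {
            return reqTopVer < locTopVer;
        }

        /**
         * Returns true if the request may proceed on this node, false if a remap
         * response carrying the local topology version was "sent" back to the client.
         */
        static boolean lockAllAsync(TopologySnapshot top, LockRequest req) {
            // Take the topology read lock only for the first request from a client node,
            // as the diff does before creating GridDhtLockFuture / GridDhtTxLocal.
            boolean locked = req.firstClientRequest;

            if (locked)
                top.lock.readLock().lock();

            try {
                if (locked && needRemap(req.topologyVersion, top.version)) {
                    // Real code: sendClientLockRemapResponse(nearNode, req, top.topologyVersion())
                    System.out.println("remap: reqTopVer=" + req.topologyVersion +
                        ", locTopVer=" + top.version);

                    return false;
                }

                // ... create the lock future / DHT transaction and map keys here ...
                return true;
            }
            finally {
                if (locked)
                    top.lock.readLock().unlock();
            }
        }

        public static void main(String[] args) {
            TopologySnapshot top = new TopologySnapshot(5);

            System.out.println(lockAllAsync(top, new LockRequest(5, true)));  // proceeds
            System.out.println(lockAllAsync(top, new LockRequest(4, true)));  // remap response
            System.out.println(lockAllAsync(top, new LockRequest(4, false))); // non-client path, no check
        }
    }

The same guarded check appears twice in this file (explicit-lock path and pessimistic-transaction path), and the client side of the handshake shows up further down in GridDhtColocatedLockFuture, which waits on affinityReadyFuture(res.clientRemapVersion()) and then calls remap() once the newer topology is ready.
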
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java index 54b59b8..90edb0f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java @@ -52,15 +52,13 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { private static final long serialVersionUID = 0L; /** Near mappings. */ - protected Map<UUID, GridDistributedTxMapping> nearMap = - new ConcurrentHashMap8<>(); + protected Map<UUID, GridDistributedTxMapping> nearMap = new ConcurrentHashMap8<>(); /** DHT mappings. */ - protected Map<UUID, GridDistributedTxMapping> dhtMap = - new ConcurrentHashMap8<>(); + protected Map<UUID, GridDistributedTxMapping> dhtMap = new ConcurrentHashMap8<>(); /** Mapped flag. */ - private AtomicBoolean mapped = new AtomicBoolean(); + protected AtomicBoolean mapped = new AtomicBoolean(); /** */ private long dhtThreadId; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java index 293cf95..af0fbdf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java @@ -582,7 +582,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter tx.writeVersion(), tx.invalidPartitions(), ret, - prepErr); + prepErr, + null); if (prepErr == null) { addDhtValues(res); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 8bbfe96..8630421 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -171,7 +171,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { metrics = m; - preldr = new GridDhtPreloader<>(ctx); + preldr = new GridDhtPreloader(ctx); preldr.start(); @@ -737,6 +737,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { @Nullable 
final CacheEntryPredicate[] filter, final boolean waitTopFut ) { + assert ctx.updatesAllowed(); + if (map != null && keyCheck) validateCacheKeys(map.keySet()); @@ -793,6 +795,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { boolean rawRetval, @Nullable final CacheEntryPredicate[] filter ) { + assert ctx.updatesAllowed(); + final boolean statsEnabled = ctx.config().isStatisticsEnabled(); final long start = statsEnabled ? System.nanoTime() : 0L; @@ -1024,9 +1028,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { IgniteCacheExpiryPolicy expiry = null; try { - // If batch store update is enabled, we need to lock all entries. - // First, need to acquire locks on cache entries, then check filter. - List<GridDhtCacheEntry> locked = lockEntries(keys, req.topologyVersion()); + List<GridDhtCacheEntry> locked = null; Collection<IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion>> deleted = null; try { @@ -1043,11 +1045,11 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } // Do not check topology version for CLOCK versioning since - // partition exchange will wait for near update future. + // partition exchange will wait for near update future (if future is on server node). // Also do not check topology version if topology was locked on near node by // external transaction or explicit lock. - if (topology().topologyVersion().equals(req.topologyVersion()) || req.topologyLocked() || - ctx.config().getAtomicWriteOrderMode() == CLOCK) { + if ((req.fastMap() && !req.clientRequest()) || req.topologyLocked() || + !needRemap(req.topologyVersion(), topology().topologyVersion())) { ClusterNode node = ctx.discovery().node(nodeId); if (node == null) { @@ -1056,13 +1058,17 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { return; } + // If batch store update is enabled, we need to lock all entries. + // First, need to acquire locks on cache entries, then check filter. + locked = lockEntries(keys, req.topologyVersion()); + boolean hasNear = ctx.discovery().cacheNearNode(node, name()); GridCacheVersion ver = req.updateVersion(); if (ver == null) { // Assign next version for update inside entries lock. - ver = ctx.versions().next(req.topologyVersion()); + ver = ctx.versions().next(topology().topologyVersion()); if (hasNear) res.nearVersion(ver); @@ -1105,7 +1111,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { retVal = updRes.invokeResults(); } else { - UpdateSingleResult<K, V> updRes = updateSingle(node, + UpdateSingleResult updRes = updateSingle(node, hasNear, req, res, @@ -1144,7 +1150,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { e.printStackTrace(); } finally { - unlockEntries(locked, req.topologyVersion()); + if (locked != null) + unlockEntries(locked, req.topologyVersion()); // Enqueue if necessary after locks release. if (deleted != null) { @@ -1157,7 +1164,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } } catch (GridDhtInvalidPartitionException ignore) { - assert ctx.config().getAtomicWriteOrderMode() == PRIMARY; + assert !req.fastMap() || req.clientRequest() : req; if (log.isDebugEnabled()) log.debug("Caught invalid partition exception for cache entry (will remap update request): " + req); @@ -1605,7 +1612,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { * @return Return value. * @throws GridCacheEntryRemovedException Should be never thrown. 
*/ - private UpdateSingleResult<K, V> updateSingle( + private UpdateSingleResult updateSingle( ClusterNode node, boolean hasNear, GridNearAtomicUpdateRequest req, @@ -1799,7 +1806,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } } - return new UpdateSingleResult<>(retVal, deleted, dhtFut); + return new UpdateSingleResult(retVal, deleted, dhtFut); } /** @@ -2572,7 +2579,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { /** * Result of {@link GridDhtAtomicCache#updateSingle} execution. */ - private static class UpdateSingleResult<K, V> { + private static class UpdateSingleResult { /** */ private final GridCacheReturn retVal; @@ -2772,14 +2779,18 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { /** {@inheritDoc} */ @Override public void onTimeout() { if (guard.compareAndSet(false, true)) { - writeLock().lock(); + ctx.closures().runLocalSafe(new Runnable() { + @Override public void run() { + writeLock().lock(); - try { - finish(); - } - finally { - writeLock().unlock(); - } + try { + finish(); + } + finally { + writeLock().unlock(); + } + } + }); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java index 40ab104..ff8454e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java @@ -86,6 +86,9 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> /** Future keys. */ private Collection<KeyCacheObject> keys; + /** */ + private boolean waitForExchange; + /** * @param cctx Cache context. * @param completionCb Callback to invoke when future is completed. @@ -113,6 +116,10 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> log = U.logger(cctx.kernalContext(), logRef, GridDhtAtomicUpdateFuture.class); keys = new ArrayList<>(updateReq.keys().size()); + + boolean topLocked = updateReq.topologyLocked() || (updateReq.fastMap() && !updateReq.clientRequest()); + + waitForExchange = !topLocked; } /** {@inheritDoc} */ @@ -164,8 +171,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> /** {@inheritDoc} */ @Override public boolean waitForPartitionExchange() { - // Wait dht update futures in PRIMARY mode. 
- return cctx.config().getAtomicWriteOrderMode() == CacheAtomicWriteOrderMode.PRIMARY; + return waitForExchange; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index 76e05e5..07f5ecf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@ -128,6 +128,12 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> /** Fast map flag. */ private final boolean fastMap; + /** */ + private boolean fastMapRemap; + + /** */ + private GridCacheVersion updVer; + /** Near cache flag. */ private final boolean nearEnabled; @@ -304,11 +310,11 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> topVer = cctx.mvcc().lastExplicitLockTopologyVersion(Thread.currentThread().getId()); if (topVer == null) - mapOnTopology(keys, false, null, waitTopFut); + mapOnTopology(null, false, null, waitTopFut); else { topLocked = true; - map0(topVer, keys, false, null); + map0(topVer, null, false, null); } } @@ -343,9 +349,11 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> */ public void onResult(UUID nodeId, GridNearAtomicUpdateResponse res) { if (res.remapKeys() != null) { - assert cctx.config().getAtomicWriteOrderMode() == PRIMARY; + assert !fastMap || cctx.kernalContext().clientNode(); + + Collection<KeyCacheObject> remapKeys = fastMap ? null : res.remapKeys(); - mapOnTopology(res.remapKeys(), true, nodeId, true); + mapOnTopology(remapKeys, true, nodeId, true); return; } @@ -454,9 +462,12 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> else { if (waitTopFut) { fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { - @Override - public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) { - mapOnTopology(keys, remap, oldNodeId, waitTopFut); + @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) { + cctx.kernalContext().closure().runLocalSafe(new Runnable() { + @Override public void run() { + mapOnTopology(keys, remap, oldNodeId, waitTopFut); + } + }); } }); } @@ -476,29 +487,43 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> /** * Checks if future is ready to be completed. 
*/ - private synchronized void checkComplete() { - if ((syncMode == FULL_ASYNC && cctx.config().getAtomicWriteOrderMode() == PRIMARY) || mappings.isEmpty()) { - CachePartialUpdateCheckedException err0 = err; + private void checkComplete() { + boolean remap = false; - if (err0 != null) - onDone(err0); - else - onDone(opRes); + synchronized (this) { + if ((syncMode == FULL_ASYNC && cctx.config().getAtomicWriteOrderMode() == PRIMARY) || mappings.isEmpty()) { + CachePartialUpdateCheckedException err0 = err; + + if (err0 != null) + onDone(err0); + else { + if (fastMapRemap) { + assert cctx.kernalContext().clientNode(); + + remap = true; + } + else + onDone(opRes); + } + } } + + if (remap) + mapOnTopology(null, true, null, true); } /** * @param topVer Topology version. - * @param keys Keys to map. + * @param remapKeys Keys to remap or {@code null} to map all keys. * @param remap Flag indicating if this is partial remap for this future. * @param oldNodeId Old node ID if was remap. */ private void map0( AffinityTopologyVersion topVer, - Collection<?> keys, + @Nullable Collection<?> remapKeys, boolean remap, @Nullable UUID oldNodeId) { - assert oldNodeId == null || remap; + assert oldNodeId == null || remap || fastMapRemap; Collection<ClusterNode> topNodes = CU.affinityNodes(cctx, topVer); @@ -519,12 +544,15 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> CacheConfiguration ccfg = cctx.config(); // Assign version on near node in CLOCK ordering mode even if fastMap is false. - GridCacheVersion updVer = ccfg.getAtomicWriteOrderMode() == CLOCK ? cctx.versions().next(topVer) : null; + if (updVer == null) + updVer = ccfg.getAtomicWriteOrderMode() == CLOCK ? cctx.versions().next(topVer) : null; if (updVer != null && log.isDebugEnabled()) log.debug("Assigned fast-map version for update on near node: " + updVer); if (keys.size() == 1 && !fastMap && (single == null || single)) { + assert remapKeys == null || remapKeys.size() == 1 : remapKeys; + Object key = F.first(keys); Object val; @@ -610,7 +638,8 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> filter, subjId, taskNameHash, - skipStore); + skipStore, + cctx.kernalContext().clientNode()); req.addUpdateEntry(cacheKey, val, @@ -647,9 +676,16 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> // Must do this in synchronized block because we need to atomically remove and add mapping. // Otherwise checkComplete() may see empty intermediate state. synchronized (this) { - if (remap) + if (oldNodeId != null) removeMapping(oldNodeId); + // For fastMap mode wait for all responses before remapping. + if (remap && fastMap && !mappings.isEmpty()) { + fastMapRemap = true; + + return; + } + // Create mappings first, then send messages. 
for (Object key : keys) { if (key == null) { @@ -705,6 +741,9 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> KeyCacheObject cacheKey = cctx.toCacheKeyObject(key); + if (remapKeys != null && !remapKeys.contains(cacheKey)) + continue; + if (op != TRANSFORM) val = cctx.toCacheObject(val); @@ -748,7 +787,8 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> filter, subjId, taskNameHash, - skipStore); + skipStore, + cctx.kernalContext().clientNode()); pendingMappings.put(nodeId, mapped); @@ -763,6 +803,8 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> i++; } } + + fastMapRemap = false; } if ((single == null || single) && pendingMappings.size() == 1) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java index a96a666..86c5ab8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java @@ -132,6 +132,9 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri /** Skip write-through to a persistent storage. */ private boolean skipStore; + /** */ + private boolean clientReq; + /** * Empty constructor required by {@link Externalizable}. */ @@ -148,6 +151,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri * @param fastMap Fast map scheme flag. * @param updateVer Update version set if fast map is performed. * @param topVer Topology version. + * @param topLocked Topology locked flag. * @param syncMode Synchronization mode. * @param op Cache update operation. * @param retval Return value required flag. @@ -157,6 +161,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri * @param subjId Subject ID. * @param taskNameHash Task name hash code. * @param skipStore Skip write-through to a persistent storage. + * @param clientReq Client node request flag. */ public GridNearAtomicUpdateRequest( int cacheId, @@ -174,7 +179,8 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri @Nullable CacheEntryPredicate[] filter, @Nullable UUID subjId, int taskNameHash, - boolean skipStore + boolean skipStore, + boolean clientReq ) { this.cacheId = cacheId; this.nodeId = nodeId; @@ -193,6 +199,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri this.subjId = subjId; this.taskNameHash = taskNameHash; this.skipStore = skipStore; + this.clientReq = clientReq; keys = new ArrayList<>(); } @@ -266,6 +273,13 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri } /** + * @return {@code True} if request sent from client node. + */ + public boolean clientRequest() { + return clientReq; + } + + /** * @return Cache write synchronization mode. 
*/ public CacheWriteSynchronizationMode writeSynchronizationMode() { @@ -574,126 +588,132 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri switch (writer.state()) { case 3: - if (!writer.writeMessage("conflictExpireTimes", conflictExpireTimes)) + if (!writer.writeBoolean("clientReq", clientReq)) return false; writer.incrementState(); case 4: - if (!writer.writeMessage("conflictTtls", conflictTtls)) + if (!writer.writeMessage("conflictExpireTimes", conflictExpireTimes)) return false; writer.incrementState(); case 5: - if (!writer.writeCollection("conflictVers", conflictVers, MessageCollectionItemType.MSG)) + if (!writer.writeMessage("conflictTtls", conflictTtls)) return false; writer.incrementState(); case 6: - if (!writer.writeCollection("entryProcessorsBytes", entryProcessorsBytes, MessageCollectionItemType.BYTE_ARR)) + if (!writer.writeCollection("conflictVers", conflictVers, MessageCollectionItemType.MSG)) return false; writer.incrementState(); case 7: - if (!writer.writeByteArray("expiryPlcBytes", expiryPlcBytes)) + if (!writer.writeCollection("entryProcessorsBytes", entryProcessorsBytes, MessageCollectionItemType.BYTE_ARR)) return false; writer.incrementState(); case 8: - if (!writer.writeBoolean("fastMap", fastMap)) + if (!writer.writeByteArray("expiryPlcBytes", expiryPlcBytes)) return false; writer.incrementState(); case 9: - if (!writer.writeObjectArray("filter", filter, MessageCollectionItemType.MSG)) + if (!writer.writeBoolean("fastMap", fastMap)) return false; writer.incrementState(); case 10: - if (!writer.writeMessage("futVer", futVer)) + if (!writer.writeObjectArray("filter", filter, MessageCollectionItemType.MSG)) return false; writer.incrementState(); case 11: - if (!writer.writeBoolean("hasPrimary", hasPrimary)) + if (!writer.writeMessage("futVer", futVer)) return false; writer.incrementState(); case 12: - if (!writer.writeObjectArray("invokeArgsBytes", invokeArgsBytes, MessageCollectionItemType.BYTE_ARR)) + if (!writer.writeBoolean("hasPrimary", hasPrimary)) return false; writer.incrementState(); case 13: - if (!writer.writeCollection("keys", keys, MessageCollectionItemType.MSG)) + if (!writer.writeObjectArray("invokeArgsBytes", invokeArgsBytes, MessageCollectionItemType.BYTE_ARR)) return false; writer.incrementState(); case 14: - if (!writer.writeByte("op", op != null ? (byte)op.ordinal() : -1)) + if (!writer.writeCollection("keys", keys, MessageCollectionItemType.MSG)) return false; writer.incrementState(); case 15: - if (!writer.writeBoolean("retval", retval)) + if (!writer.writeByte("op", op != null ? (byte)op.ordinal() : -1)) return false; writer.incrementState(); case 16: - if (!writer.writeBoolean("skipStore", skipStore)) + if (!writer.writeBoolean("retval", retval)) return false; writer.incrementState(); case 17: - if (!writer.writeUuid("subjId", subjId)) + if (!writer.writeBoolean("skipStore", skipStore)) return false; writer.incrementState(); case 18: - if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1)) + if (!writer.writeUuid("subjId", subjId)) return false; writer.incrementState(); case 19: - if (!writer.writeInt("taskNameHash", taskNameHash)) + if (!writer.writeByte("syncMode", syncMode != null ? 
(byte)syncMode.ordinal() : -1)) return false; writer.incrementState(); case 20: - if (!writer.writeBoolean("topLocked", topLocked)) + if (!writer.writeInt("taskNameHash", taskNameHash)) return false; writer.incrementState(); case 21: - if (!writer.writeMessage("topVer", topVer)) + if (!writer.writeBoolean("topLocked", topLocked)) return false; writer.incrementState(); case 22: - if (!writer.writeMessage("updateVer", updateVer)) + if (!writer.writeMessage("topVer", topVer)) return false; writer.incrementState(); case 23: + if (!writer.writeMessage("updateVer", updateVer)) + return false; + + writer.incrementState(); + + case 24: if (!writer.writeCollection("vals", vals, MessageCollectionItemType.MSG)) return false; @@ -716,7 +736,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri switch (reader.state()) { case 3: - conflictExpireTimes = reader.readMessage("conflictExpireTimes"); + clientReq = reader.readBoolean("clientReq"); if (!reader.isLastRead()) return false; @@ -724,7 +744,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri reader.incrementState(); case 4: - conflictTtls = reader.readMessage("conflictTtls"); + conflictExpireTimes = reader.readMessage("conflictExpireTimes"); if (!reader.isLastRead()) return false; @@ -732,7 +752,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri reader.incrementState(); case 5: - conflictVers = reader.readCollection("conflictVers", MessageCollectionItemType.MSG); + conflictTtls = reader.readMessage("conflictTtls"); if (!reader.isLastRead()) return false; @@ -740,7 +760,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri reader.incrementState(); case 6: - entryProcessorsBytes = reader.readCollection("entryProcessorsBytes", MessageCollectionItemType.BYTE_ARR); + conflictVers = reader.readCollection("conflictVers", MessageCollectionItemType.MSG); if (!reader.isLastRead()) return false; @@ -748,7 +768,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri reader.incrementState(); case 7: - expiryPlcBytes = reader.readByteArray("expiryPlcBytes"); + entryProcessorsBytes = reader.readCollection("entryProcessorsBytes", MessageCollectionItemType.BYTE_ARR); if (!reader.isLastRead()) return false; @@ -756,7 +776,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri reader.incrementState(); case 8: - fastMap = reader.readBoolean("fastMap"); + expiryPlcBytes = reader.readByteArray("expiryPlcBytes"); if (!reader.isLastRead()) return false; @@ -764,7 +784,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri reader.incrementState(); case 9: - filter = reader.readObjectArray("filter", MessageCollectionItemType.MSG, CacheEntryPredicate.class); + fastMap = reader.readBoolean("fastMap"); if (!reader.isLastRead()) return false; @@ -772,7 +792,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri reader.incrementState(); case 10: - futVer = reader.readMessage("futVer"); + filter = reader.readObjectArray("filter", MessageCollectionItemType.MSG, CacheEntryPredicate.class); if (!reader.isLastRead()) return false; @@ -780,7 +800,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri reader.incrementState(); case 11: - hasPrimary = reader.readBoolean("hasPrimary"); + futVer = reader.readMessage("futVer"); if (!reader.isLastRead()) return false; @@ -788,7 +808,7 @@ public class 
GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri reader.incrementState(); case 12: - invokeArgsBytes = reader.readObjectArray("invokeArgsBytes", MessageCollectionItemType.BYTE_ARR, byte[].class); + hasPrimary = reader.readBoolean("hasPrimary"); if (!reader.isLastRead()) return false; @@ -796,7 +816,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri reader.incrementState(); case 13: - keys = reader.readCollection("keys", MessageCollectionItemType.MSG); + invokeArgsBytes = reader.readObjectArray("invokeArgsBytes", MessageCollectionItemType.BYTE_ARR, byte[].class); if (!reader.isLastRead()) return false; @@ -804,6 +824,14 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri reader.incrementState(); case 14: + keys = reader.readCollection("keys", MessageCollectionItemType.MSG); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 15: byte opOrd; opOrd = reader.readByte("op"); @@ -815,7 +843,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri reader.incrementState(); - case 15: + case 16: retval = reader.readBoolean("retval"); if (!reader.isLastRead()) @@ -823,7 +851,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri reader.incrementState(); - case 16: + case 17: skipStore = reader.readBoolean("skipStore"); if (!reader.isLastRead()) @@ -831,7 +859,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri reader.incrementState(); - case 17: + case 18: subjId = reader.readUuid("subjId"); if (!reader.isLastRead()) @@ -839,7 +867,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri reader.incrementState(); - case 18: + case 19: byte syncModeOrd; syncModeOrd = reader.readByte("syncMode"); @@ -851,7 +879,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri reader.incrementState(); - case 19: + case 20: taskNameHash = reader.readInt("taskNameHash"); if (!reader.isLastRead()) @@ -859,7 +887,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri reader.incrementState(); - case 20: + case 21: topLocked = reader.readBoolean("topLocked"); if (!reader.isLastRead()) @@ -867,7 +895,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri reader.incrementState(); - case 21: + case 22: topVer = reader.readMessage("topVer"); if (!reader.isLastRead()) @@ -875,7 +903,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri reader.incrementState(); - case 22: + case 23: updateVer = reader.readMessage("updateVer"); if (!reader.isLastRead()) @@ -883,7 +911,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri reader.incrementState(); - case 23: + case 24: vals = reader.readCollection("vals", MessageCollectionItemType.MSG); if (!reader.isLastRead()) @@ -903,7 +931,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 24; + return 25; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java ---------------------------------------------------------------------- diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java index 05b3c7b..221b230 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java @@ -362,13 +362,13 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte @Nullable TransactionIsolation isolation, long accessTtl ) { - assert tx == null || tx instanceof GridNearTxLocal; + assert tx == null || tx instanceof GridNearTxLocal : tx; GridNearTxLocal txx = (GridNearTxLocal)tx; CacheOperationContext opCtx = ctx.operationContextPerCall(); - GridDhtColocatedLockFuture<K, V> fut = new GridDhtColocatedLockFuture<>(ctx, + GridDhtColocatedLockFuture fut = new GridDhtColocatedLockFuture(ctx, keys, txx, isRead, @@ -619,7 +619,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte * @return Lock future. */ IgniteInternalFuture<Exception> lockAllAsync( - final GridCacheContext<K, V> cacheCtx, + final GridCacheContext<?, ?> cacheCtx, @Nullable final GridNearTxLocal tx, final long threadId, final GridCacheVersion ver, @@ -700,7 +700,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte * @return Lock future. */ private IgniteInternalFuture<Exception> lockAllAsync0( - GridCacheContext<K, V> cacheCtx, + GridCacheContext<?, ?> cacheCtx, @Nullable final GridNearTxLocal tx, long threadId, final GridCacheVersion ver, @@ -715,7 +715,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte int cnt = keys.size(); if (tx == null) { - GridDhtLockFuture<K, V> fut = new GridDhtLockFuture<>(ctx, + GridDhtLockFuture fut = new GridDhtLockFuture(ctx, ctx.localNodeId(), ver, topVer, @@ -838,7 +838,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte assert nodeId != null; assert res != null; - GridDhtColocatedLockFuture<K, V> fut = (GridDhtColocatedLockFuture<K, V>)ctx.mvcc(). + GridDhtColocatedLockFuture fut = (GridDhtColocatedLockFuture)ctx.mvcc(). <Boolean>future(res.version(), res.futureId()); if (fut != null) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java index 372c517..c784948 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java @@ -46,7 +46,7 @@ import static org.apache.ignite.events.EventType.*; /** * Colocated cache lock future. 
*/ -public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentityFuture<Boolean> +public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture<Boolean> implements GridCacheFuture<Boolean> { /** */ private static final long serialVersionUID = 0L; @@ -59,7 +59,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity /** Cache registry. */ @GridToStringExclude - private GridCacheContext<K, V> cctx; + private GridCacheContext<?, ?> cctx; /** Lock owner thread. */ @GridToStringInclude @@ -121,10 +121,10 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity * @param timeout Lock acquisition timeout. * @param accessTtl TTL for read operation. * @param filter Filter. - * @param skipStore + * @param skipStore Skip store flag. */ public GridDhtColocatedLockFuture( - GridCacheContext<K, V> cctx, + GridCacheContext<?, ?> cctx, Collection<KeyCacheObject> keys, @Nullable GridNearTxLocal tx, boolean read, @@ -326,13 +326,14 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity * Undoes all locks. * * @param dist If {@code true}, then remove locks from remote nodes as well. + * @param rollback {@code True} if should rollback tx. */ - private void undoLocks(boolean dist) { + private void undoLocks(boolean dist, boolean rollback) { // Transactions will undo during rollback. if (dist && tx == null) cctx.colocated().removeLocks(threadId, lockVer, keys); else { - if (tx != null) { + if (rollback && tx != null) { if (tx.setRollbackOnly()) { if (log.isDebugEnabled()) log.debug("Marked transaction as rollback only because locks could not be acquired: " + tx); @@ -346,16 +347,6 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity } /** - * - * @param dist {@code True} if need to distribute lock release. - */ - private void onFailed(boolean dist) { - undoLocks(dist); - - complete(false); - } - - /** * @param success Success flag. */ public void complete(boolean success) { @@ -475,7 +466,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity ", fut=" + this + ']'); if (!success) - undoLocks(distribute); + undoLocks(distribute, true); if (tx != null) cctx.tm().txContext(tx); @@ -550,7 +541,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity // Continue mapping on the same topology version as it was before. this.topVer.compareAndSet(null, topVer); - map(keys); + map(keys, false); markInitialized(); @@ -558,14 +549,17 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity } // Must get topology snapshot and map on that version. - mapOnTopology(); + mapOnTopology(false, null); } /** * Acquires topology future and checks it completeness under the read lock. If it is not complete, * will asynchronously wait for it's completeness and then try again. + * + * @param remap Remap flag. + * @param c Optional closure to run after map. */ - private void mapOnTopology() { + private void mapOnTopology(final boolean remap, @Nullable final Runnable c) { // We must acquire topology snapshot from the topology version future. 
cctx.topology().readLock(); @@ -589,19 +583,30 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity AffinityTopologyVersion topVer = fut.topologyVersion(); - if (tx != null) - tx.topologyVersion(topVer); + if (remap) { + if (tx != null) + tx.onRemap(topVer); + + this.topVer.set(topVer); + } + else { + if (tx != null) + tx.topologyVersion(topVer); + + this.topVer.compareAndSet(null, topVer); + } - this.topVer.compareAndSet(null, topVer); + map(keys, remap); - map(keys); + if (c != null) + c.run(); markInitialized(); } else { fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) { - mapOnTopology(); + mapOnTopology(remap, c); } }); } @@ -617,8 +622,9 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity * groups belonging to one primary node and locks for these groups are acquired sequentially. * * @param keys Keys. + * @param remap Remap flag. */ - private void map(Collection<KeyCacheObject> keys) { + private void map(Collection<KeyCacheObject> keys, boolean remap) { try { AffinityTopologyVersion topVer = this.topVer.get(); @@ -633,8 +639,12 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity return; } + boolean clientNode = cctx.kernalContext().clientNode(); + + assert !remap || (clientNode && (tx == null || !tx.hasRemoteLocks())); + // First assume this node is primary for all keys passed in. - if (mapAsPrimary(keys, topVer)) + if (!clientNode && mapAsPrimary(keys, topVer)) return; Deque<GridNearLockMapping> mappings = new ConcurrentLinkedDeque8<>(); @@ -668,6 +678,8 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity boolean hasRmtNodes = false; + boolean first = true; + // Create mini futures. for (Iterator<GridNearLockMapping> iter = mappings.iterator(); iter.hasNext(); ) { GridNearLockMapping mapping = iter.next(); @@ -736,6 +748,14 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity if (cand != null && !cand.reentry()) { if (req == null) { + boolean clientFirst = false; + + if (first) { + clientFirst = clientNode && (tx == null || !tx.hasRemoteLocks()); + + first = false; + } + req = new GridNearLockRequest( cctx.cacheId(), topVer, @@ -757,7 +777,8 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity inTx() ? tx.subjectId() : null, inTx() ? tx.taskNameHash() : 0, read ? 
accessTtl : -1L, - skipStore); + skipStore, + clientFirst); mapping.request(req); } @@ -815,7 +836,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity if (hasRmtNodes) { trackable = true; - if (!cctx.mvcc().addFuture(this)) + if (!remap && !cctx.mvcc().addFuture(this)) throw new IllegalStateException("Duplicate future ID: " + this); } else @@ -1249,75 +1270,111 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity return; } - int i = 0; + if (res.clientRemapVersion() != null) { + assert cctx.kernalContext().clientNode(); + + IgniteInternalFuture<?> affFut = + cctx.shared().exchange().affinityReadyFuture(res.clientRemapVersion()); + + if (affFut != null && !affFut.isDone()) { + affFut.listen(new CI1<IgniteInternalFuture<?>>() { + @Override public void apply(IgniteInternalFuture<?> fut) { + remap(); + } + }); + } + else + remap(); + } + else { + int i = 0; - for (KeyCacheObject k : keys) { - IgniteBiTuple<GridCacheVersion, CacheObject> oldValTup = valMap.get(k); + for (KeyCacheObject k : keys) { + IgniteBiTuple<GridCacheVersion, CacheObject> oldValTup = valMap.get(k); - CacheObject newVal = res.value(i); + CacheObject newVal = res.value(i); - GridCacheVersion dhtVer = res.dhtVersion(i); + GridCacheVersion dhtVer = res.dhtVersion(i); - if (newVal == null) { - if (oldValTup != null) { - if (oldValTup.get1().equals(dhtVer)) - newVal = oldValTup.get2(); + if (newVal == null) { + if (oldValTup != null) { + if (oldValTup.get1().equals(dhtVer)) + newVal = oldValTup.get2(); + } } - } - if (inTx()) { - IgniteTxEntry txEntry = tx.entry(cctx.txKey(k)); + if (inTx()) { + IgniteTxEntry txEntry = tx.entry(cctx.txKey(k)); + + // In colocated cache we must receive responses only for detached entries. + assert txEntry.cached().detached() : txEntry; - // In colocated cache we must receive responses only for detached entries. - assert txEntry.cached().detached(); + txEntry.markLocked(); - txEntry.markLocked(); + GridDhtDetachedCacheEntry entry = (GridDhtDetachedCacheEntry)txEntry.cached(); - GridDhtDetachedCacheEntry entry = (GridDhtDetachedCacheEntry)txEntry.cached(); + if (res.dhtVersion(i) == null) { + onDone(new IgniteCheckedException("Failed to receive DHT version from remote node " + + "(will fail the lock): " + res)); + + return; + } - if (res.dhtVersion(i) == null) { - onDone(new IgniteCheckedException("Failed to receive DHT version from remote node " + - "(will fail the lock): " + res)); + // Set value to detached entry. + entry.resetFromPrimary(newVal, dhtVer); - return; + tx.hasRemoteLocks(true); + + if (log.isDebugEnabled()) + log.debug("Processed response for entry [res=" + res + ", entry=" + entry + ']'); + } + else + cctx.mvcc().markExplicitOwner(k, threadId); + + if (retval && cctx.events().isRecordable(EVT_CACHE_OBJECT_READ)) { + cctx.events().addEvent(cctx.affinity().partition(k), + k, + tx, + null, + EVT_CACHE_OBJECT_READ, + newVal, + newVal != null, + null, + false, + CU.subjectId(tx, cctx.shared()), + null, + tx == null ? null : tx.resolveTaskName()); } - // Set value to detached entry. 
- entry.resetFromPrimary(newVal, dhtVer); + i++; + } - if (log.isDebugEnabled()) - log.debug("Processed response for entry [res=" + res + ", entry=" + entry + ']'); + try { + proceedMapping(mappings); } - else - cctx.mvcc().markExplicitOwner(k, threadId); - - if (retval && cctx.events().isRecordable(EVT_CACHE_OBJECT_READ)) { - cctx.events().addEvent(cctx.affinity().partition(k), - k, - tx, - null, - EVT_CACHE_OBJECT_READ, - newVal, - newVal != null, - null, - false, - CU.subjectId(tx, cctx.shared()), - null, - tx == null ? null : tx.resolveTaskName()); + catch (IgniteCheckedException e) { + onDone(e); } - i++; + onDone(true); } + } + } - try { - proceedMapping(mappings); - } - catch (IgniteCheckedException e) { - onDone(e); - } + /** + * + */ + private void remap() { + undoLocks(false, false); - onDone(true); - } + for (KeyCacheObject key : GridDhtColocatedLockFuture.this.keys) + cctx.mvcc().removeExplicitLock(threadId, key, lockVer); + + mapOnTopology(true, new Runnable() { + @Override public void run() { + onDone(true); + } + }); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java index 78966d0..1d57ef7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java @@ -80,7 +80,7 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec private IgniteUuid futId = IgniteUuid.randomUuid(); /** Preloader. */ - private GridDhtPreloader<K, V> preloader; + private GridDhtPreloader preloader; /** Trackable flag. */ private boolean trackable; @@ -95,7 +95,7 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec GridCacheContext<K, V> cctx, AffinityTopologyVersion topVer, Collection<KeyCacheObject> keys, - GridDhtPreloader<K, V> preloader + GridDhtPreloader preloader ) { assert topVer.topologyVersion() != 0 : topVer; assert !F.isEmpty(keys) : keys; @@ -208,21 +208,21 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec * @return {@code True} if some mapping was added. */ private boolean map(Iterable<KeyCacheObject> keys, Collection<ClusterNode> exc) { - Map<ClusterNode, Set<KeyCacheObject>> mappings = new HashMap<>(); - - ClusterNode loc = cctx.localNode(); - - int curTopVer = topCntr.get(); + Map<ClusterNode, Set<KeyCacheObject>> mappings = null; for (KeyCacheObject key : keys) - map(key, mappings, exc); + mappings = map(key, mappings, exc); if (isDone()) return false; boolean ret = false; - if (!mappings.isEmpty()) { + if (mappings != null) { + ClusterNode loc = cctx.localNode(); + + int curTopVer = topCntr.get(); + preloader.addFuture(this); trackable = true; @@ -275,22 +275,27 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec * @param key Key. * @param exc Exclude nodes. * @param mappings Mappings. + * @return Mappings. 
*/ - private void map(KeyCacheObject key, Map<ClusterNode, Set<KeyCacheObject>> mappings, Collection<ClusterNode> exc) { + private Map<ClusterNode, Set<KeyCacheObject>> map(KeyCacheObject key, + @Nullable Map<ClusterNode, Set<KeyCacheObject>> mappings, + Collection<ClusterNode> exc) + { ClusterNode loc = cctx.localNode(); - int part = cctx.affinity().partition(key); - GridCacheEntryEx e = cctx.dht().peekEx(key); try { if (e != null && !e.isNewLocked()) { - if (log.isDebugEnabled()) + if (log.isDebugEnabled()) { + int part = cctx.affinity().partition(key); + log.debug("Will not rebalance key (entry is not new) [cacheName=" + cctx.name() + ", key=" + key + ", part=" + part + ", locId=" + cctx.nodeId() + ']'); + } // Key has been rebalanced or retrieved already. - return; + return mappings; } } catch (GridCacheEntryRemovedException ignore) { @@ -299,6 +304,8 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec ", locId=" + cctx.nodeId() + ']'); } + int part = cctx.affinity().partition(key); + List<ClusterNode> owners = F.isEmpty(exc) ? top.owners(part, topVer) : new ArrayList<>(F.view(top.owners(part, topVer), F.notIn(exc))); @@ -308,7 +315,7 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec "topVer=" + topVer + ", locId=" + cctx.nodeId() + ']'); // Key is already rebalanced. - return; + return mappings; } // Create partition. @@ -337,9 +344,12 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec log.debug("Will not rebalance key (no nodes to request from with rebalancing disabled) [key=" + key + ", part=" + part + ", locId=" + cctx.nodeId() + ']'); - return; + return mappings; } + if (mappings == null) + mappings = U.newHashMap(keys.size()); + Collection<KeyCacheObject> mappedKeys = F.addIfAbsent(mappings, pick, F.<KeyCacheObject>newSet()); assert mappedKeys != null; @@ -357,6 +367,8 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec log.debug("Will not rebalance key (local partition is not MOVING) [cacheName=" + cctx.name() + ", key=" + key + ", part=" + locPart + ", locId=" + cctx.nodeId() + ']'); } + + return mappings; } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java index 633f237..a6e6c4d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java @@ -53,12 +53,12 @@ import static org.apache.ignite.internal.processors.dr.GridDrType.*; * and populating local cache. */ @SuppressWarnings("NonConstantFieldWithUpperCaseName") -public class GridDhtPartitionDemandPool<K, V> { +public class GridDhtPartitionDemandPool { /** Dummy message to wake up a blocking queue if a node leaves. 
*/ private final SupplyMessage DUMMY_TOP = new SupplyMessage(); /** */ - private final GridCacheContext<K, V> cctx; + private final GridCacheContext<?, ?> cctx; /** */ private final IgniteLogger log; @@ -99,7 +99,7 @@ public class GridDhtPartitionDemandPool<K, V> { * @param cctx Cache context. * @param busyLock Shutdown lock. */ - public GridDhtPartitionDemandPool(GridCacheContext<K, V> cctx, ReadWriteLock busyLock) { + public GridDhtPartitionDemandPool(GridCacheContext<?, ?> cctx, ReadWriteLock busyLock) { assert cctx != null; assert busyLock != null; @@ -108,9 +108,11 @@ public class GridDhtPartitionDemandPool<K, V> { log = cctx.logger(getClass()); - poolSize = cctx.rebalanceEnabled() ? cctx.config().getRebalanceThreadPoolSize() : 0; + boolean enabled = cctx.rebalanceEnabled() && !cctx.kernalContext().clientNode(); - if (poolSize > 0) { + poolSize = enabled ? cctx.config().getRebalanceThreadPoolSize() : 0; + + if (enabled) { barrier = new CyclicBarrier(poolSize); dmdWorkers = new ArrayList<>(poolSize); @@ -327,7 +329,7 @@ public class GridDhtPartitionDemandPool<K, V> { * @param assigns Assignments. * @param force {@code True} if dummy reassign. */ - void addAssignments(final GridDhtPreloaderAssignments<K, V> assigns, boolean force) { + void addAssignments(final GridDhtPreloaderAssignments assigns, boolean force) { if (log.isDebugEnabled()) log.debug("Adding partition assignments: " + assigns); @@ -399,7 +401,7 @@ public class GridDhtPartitionDemandPool<K, V> { private int id; /** Partition-to-node assignments. */ - private final LinkedBlockingDeque<GridDhtPreloaderAssignments<K, V>> assignQ = new LinkedBlockingDeque<>(); + private final LinkedBlockingDeque<GridDhtPreloaderAssignments> assignQ = new LinkedBlockingDeque<>(); /** Message queue. */ private final LinkedBlockingDeque<SupplyMessage> msgQ = @@ -425,7 +427,7 @@ public class GridDhtPartitionDemandPool<K, V> { /** * @param assigns Assignments. */ - void addAssignments(GridDhtPreloaderAssignments<K, V> assigns) { + void addAssignments(GridDhtPreloaderAssignments assigns) { assert assigns != null; assignQ.offer(assigns); @@ -885,7 +887,7 @@ public class GridDhtPartitionDemandPool<K, V> { } // Sync up all demand threads at this step. - GridDhtPreloaderAssignments<K, V> assigns = null; + GridDhtPreloaderAssignments assigns = null; while (assigns == null) assigns = poll(assignQ, cctx.gridConfig().getNetworkTimeout(), this); @@ -995,12 +997,12 @@ public class GridDhtPartitionDemandPool<K, V> { * @param exchFut Exchange future. * @return Assignments of partitions to nodes. */ - GridDhtPreloaderAssignments<K, V> assign(GridDhtPartitionsExchangeFuture exchFut) { + GridDhtPreloaderAssignments assign(GridDhtPartitionsExchangeFuture exchFut) { // No assignments for disabled preloader. 
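The GridDhtPartitionDemandPool constructor above now sizes the demand pool to zero when rebalancing is disabled or the local node is a client, presumably because a client node holds no partitions and therefore has nothing to demand. A compact sketch of that guard, using invented names (DemandPoolSketch, configuredThreads) rather than the real cache context API:

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CyclicBarrier;

public class DemandPoolSketch {
    private final int poolSize;
    private final CyclicBarrier barrier;
    private final List<Runnable> workers;

    // Assumes configuredThreads >= 1 whenever rebalancing is enabled.
    DemandPoolSketch(boolean rebalanceEnabled, boolean clientNode, int configuredThreads) {
        // Client nodes hold no partitions, so they never demand any.
        boolean enabled = rebalanceEnabled && !clientNode;

        poolSize = enabled ? configuredThreads : 0;

        if (enabled) {
            barrier = new CyclicBarrier(poolSize);
            workers = new ArrayList<>(poolSize);

            for (int i = 0; i < poolSize; i++)
                workers.add(() -> { /* demand worker loop */ });
        }
        else {
            barrier = null;
            workers = Collections.emptyList();
        }
    }
}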
GridDhtPartitionTopology top = cctx.dht().topology(); if (!cctx.rebalanceEnabled()) - return new GridDhtPreloaderAssignments<>(exchFut, top.topologyVersion()); + return new GridDhtPreloaderAssignments(exchFut, top.topologyVersion()); int partCnt = cctx.affinity().partitions(); @@ -1009,7 +1011,7 @@ public class GridDhtPartitionDemandPool<K, V> { "Topology version mismatch [exchId=" + exchFut.exchangeId() + ", topVer=" + top.topologyVersion() + ']'; - GridDhtPreloaderAssignments<K, V> assigns = new GridDhtPreloaderAssignments<>(exchFut, top.topologyVersion()); + GridDhtPreloaderAssignments assigns = new GridDhtPreloaderAssignments(exchFut, top.topologyVersion()); AffinityTopologyVersion topVer = assigns.topologyVersion(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap.java index facf7e3..faa6cf6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap.java @@ -237,7 +237,7 @@ public class GridDhtPartitionMap implements Comparable<GridDhtPartitionMap>, Ext * @return Full string representation. */ public String toFullString() { - return S.toString(GridDhtPartitionMap.class, this, "size", size(), "map", super.toString()); + return S.toString(GridDhtPartitionMap.class, this, "size", size(), "map", map.toString()); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java index 5d9677d..13cfef3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java @@ -43,9 +43,9 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh /** * Thread pool for supplying partitions to demanding nodes. */ -class GridDhtPartitionSupplyPool<K, V> { +class GridDhtPartitionSupplyPool { /** */ - private final GridCacheContext<K, V> cctx; + private final GridCacheContext<?, ?> cctx; /** */ private final IgniteLogger log; @@ -72,7 +72,7 @@ class GridDhtPartitionSupplyPool<K, V> { * @param cctx Cache context. * @param busyLock Shutdown lock. 
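The GridDhtPartitionMap.toFullString() change above swaps super.toString() for map.toString(). Since the class implements Comparable and Externalizable but does not appear to extend a map type, super here is java.lang.Object, so the old code printed an identity string rather than the partition states. A small generic illustration of the difference (FullStringSketch is a made-up class, not the Ignite one):

import java.util.Map;
import java.util.TreeMap;

public class FullStringSketch {
    /** Partition id -> state, kept as a field rather than by extending a map type. */
    private final Map<Integer, String> map = new TreeMap<>();

    public String toFullString() {
        // super.toString() here would be Object.toString(), e.g. "FullStringSketch@1b6d3586",
        // which hides the entries; the field's own toString() shows the actual states.
        return "size=" + map.size() + ", map=" + map.toString();
    }

    public static void main(String[] args) {
        FullStringSketch s = new FullStringSketch();

        s.map.put(0, "OWNING");
        s.map.put(1, "MOVING");

        System.out.println(s.toFullString()); // size=2, map={0=OWNING, 1=MOVING}
    }
}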
*/ - GridDhtPartitionSupplyPool(GridCacheContext<K, V> cctx, ReadWriteLock busyLock) { + GridDhtPartitionSupplyPool(GridCacheContext<?, ?> cctx, ReadWriteLock busyLock) { assert cctx != null; assert busyLock != null; @@ -83,16 +83,18 @@ class GridDhtPartitionSupplyPool<K, V> { top = cctx.dht().topology(); - int poolSize = cctx.rebalanceEnabled() ? cctx.config().getRebalanceThreadPoolSize() : 0; + if (!cctx.kernalContext().clientNode()) { + int poolSize = cctx.rebalanceEnabled() ? cctx.config().getRebalanceThreadPoolSize() : 0; - for (int i = 0; i < poolSize; i++) - workers.add(new SupplyWorker()); + for (int i = 0; i < poolSize; i++) + workers.add(new SupplyWorker()); - cctx.io().addHandler(cctx.cacheId(), GridDhtPartitionDemandMessage.class, new CI2<UUID, GridDhtPartitionDemandMessage>() { - @Override public void apply(UUID id, GridDhtPartitionDemandMessage m) { - processDemandMessage(id, m); - } - }); + cctx.io().addHandler(cctx.cacheId(), GridDhtPartitionDemandMessage.class, new CI2<UUID, GridDhtPartitionDemandMessage>() { + @Override public void apply(UUID id, GridDhtPartitionDemandMessage m) { + processDemandMessage(id, m); + } + }); + } depEnabled = cctx.gridDeploy().enabled(); } @@ -248,11 +250,6 @@ class GridDhtPartitionSupplyPool<K, V> { boolean ack = false; try { - // Partition map exchange is finished which means that all near transactions with given - // topology version are committed. We can wait for local locks here as it will not take - // much time. - cctx.mvcc().finishLocks(d.topologyVersion()).get(); - for (int part : d.partitions()) { GridDhtLocalPartition loc = top.localPartition(part, d.topologyVersion(), false);
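GridDhtPartitionSupplyPool above gets the mirror-image guard: on client nodes neither the supply workers nor the GridDhtPartitionDemandMessage handler are registered, since a client owns no partitions and can never act as a rebalancing supplier. A hedged sketch of that wiring with placeholder types; SupplyPoolSketch, registerDemandHandler and supplyLoop are inventions, not Ignite's io() API.

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.function.BiConsumer;

public class SupplyPoolSketch {
    private final List<Runnable> workers = new ArrayList<>();

    SupplyPoolSketch(boolean clientNode, boolean rebalanceEnabled, int configuredThreads) {
        if (!clientNode) {
            int poolSize = rebalanceEnabled ? configuredThreads : 0;

            for (int i = 0; i < poolSize; i++)
                workers.add(this::supplyLoop);

            // Only server nodes answer demand messages; a client registers no handler,
            // so demand requests are simply never routed to it.
            registerDemandHandler(this::processDemandMessage);
        }
    }

    private void supplyLoop() {
        // Placeholder: drain queued demand messages and stream partition entries back.
    }

    private void registerDemandHandler(BiConsumer<UUID, Object> handler) {
        // Placeholder for per-cache message handler registration.
    }

    private void processDemandMessage(UUID nodeId, Object demandMsg) {
        // Placeholder: reply with entries for the demanded partitions.
    }
}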
