Repository: ignite Updated Branches: refs/heads/ignite-1.5 15877a8ba -> 07f5a62ec
http://git-wip-us.apache.org/repos/asf/ignite/blob/5e6d0ffe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java index 76f2fbe..6f92204 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java @@ -17,13 +17,14 @@ package org.apache.ignite.internal.processors.cache.distributed.near; +import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collection; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Queue; import java.util.UUID; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; @@ -57,17 +58,14 @@ import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.C2; import org.apache.ignite.internal.util.typedef.CI1; -import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiTuple; -import org.apache.ignite.lang.IgniteClosure; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.transactions.TransactionIsolation; import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentHashMap8; -import org.jsr166.ConcurrentLinkedDeque8; import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ; @@ -109,7 +107,7 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean private boolean retval; /** Error. */ - private AtomicReference<Throwable> err = new AtomicReference<>(null); + private volatile Throwable err; /** Timed out flag. */ private volatile boolean timedOut; @@ -129,8 +127,7 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean private GridNearTxLocal tx; /** Topology snapshot to operate on. */ - private AtomicReference<AffinityTopologyVersion> topVer = - new AtomicReference<>(); + private volatile AffinityTopologyVersion topVer; /** Map of current values. */ private Map<KeyCacheObject, IgniteBiTuple<GridCacheVersion, CacheObject>> valMap; @@ -138,9 +135,6 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean /** Trackable flag. */ private boolean trackable = true; - /** Mutex. */ - private final Object mux = new Object(); - /** Keys locked so far. */ @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"}) @GridToStringExclude @@ -152,6 +146,10 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean /** Skip store flag. */ private final boolean skipStore; + /** Mappings to proceed. */ + @GridToStringExclude + private Queue<GridNearLockMapping> mappings; + /** * @param cctx Registry. * @param keys Keys to lock. @@ -206,7 +204,7 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean cctx.time().addTimeoutObject(timeoutObj); } - valMap = new ConcurrentHashMap8<>(keys.size(), 1f); + valMap = new ConcurrentHashMap8<>(); } /** {@inheritDoc} */ @@ -217,10 +215,8 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean /** * @return Entries. */ - public List<GridDistributedCacheEntry> entriesCopy() { - synchronized (mux) { - return new ArrayList<>(entries); - } + public synchronized List<GridDistributedCacheEntry> entriesCopy() { + return new ArrayList<>(entries); } /** @@ -313,6 +309,8 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean GridNearCacheEntry entry, UUID dhtNodeId ) throws GridCacheEntryRemovedException { + assert Thread.holdsLock(this); + // Check if lock acquisition is timed out. if (timedOut) return null; @@ -335,9 +333,7 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean txEntry.cached(entry); } - synchronized (mux) { - entries.add(entry); - } + entries.add(entry); if (c == null && timeout < 0) { if (log.isDebugEnabled()) @@ -525,7 +521,10 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean * @param t Error. */ private void onError(Throwable t) { - err.compareAndSet(null, t instanceof GridCacheLockTimeoutException ? null : t); + synchronized (this) { + if (err == null) + err = t; + } } /** @@ -572,35 +571,39 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean */ private boolean checkLocks() { if (!isDone() && initialized() && !hasPending()) { - for (int i = 0; i < entries.size(); i++) { - while (true) { - GridCacheEntryEx cached = entries.get(i); + synchronized (this) { + for (int i = 0; i < entries.size(); i++) { + while (true) { + GridCacheEntryEx cached = entries.get(i); - try { - if (!locked(cached)) { - if (log.isDebugEnabled()) - log.debug("Lock is still not acquired for entry (will keep waiting) [entry=" + - cached + ", fut=" + this + ']'); + try { + if (!locked(cached)) { + if (log.isDebugEnabled()) + log.debug("Lock is still not acquired for entry (will keep waiting) [entry=" + + cached + ", fut=" + this + ']'); - return false; - } + return false; + } - break; - } - // Possible in concurrent cases, when owner is changed after locks - // have been released or cancelled. - catch (GridCacheEntryRemovedException ignore) { - if (log.isDebugEnabled()) - log.debug("Got removed entry in onOwnerChanged method (will retry): " + cached); + break; + } + // Possible in concurrent cases, when owner is changed after locks + // have been released or cancelled. + catch (GridCacheEntryRemovedException ignore) { + if (log.isDebugEnabled()) + log.debug("Got removed entry in onOwnerChanged method (will retry): " + cached); - // Replace old entry with new one. - entries.set(i, (GridDistributedCacheEntry)cctx.cache().entryEx(cached.key())); + // Replace old entry with new one. + entries.set( + i, + (GridDistributedCacheEntry)cctx.cache().entryEx(cached.key())); + } } } - } - if (log.isDebugEnabled()) - log.debug("Local lock acquired for entries [fut=" + this + ", entries=" + entries + "]"); + if (log.isDebugEnabled()) + log.debug("Local lock acquired for entries [fut=" + this + ", entries=" + entries + "]"); + } onComplete(true, true); @@ -627,7 +630,8 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean if (isDone() || (err == null && success && !checkLocks())) return false; - this.err.compareAndSet(null, err instanceof GridCacheLockTimeoutException ? null : err); + if (err != null && !(err instanceof GridCacheLockTimeoutException)) + onError(err); if (err != null) success = false; @@ -653,7 +657,7 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean if (tx != null) cctx.tm().txContext(tx); - if (super.onDone(success, err.get())) { + if (super.onDone(success, err)) { if (log.isDebugEnabled()) log.debug("Completing future: " + this); @@ -730,7 +734,8 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean } // Continue mapping on the same topology version as it was before. - this.topVer.compareAndSet(null, topVer); + if (this.topVer == null) + this.topVer = topVer; map(keys, false, true); @@ -749,7 +754,7 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean * * @param remap Remap flag. */ - void mapOnTopology(final boolean remap) { + synchronized void mapOnTopology(final boolean remap) { // We must acquire topology snapshot from the topology version future. cctx.topology().readLock(); @@ -778,13 +783,14 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean if (tx != null) tx.onRemap(topVer); - this.topVer.set(topVer); + this.topVer = topVer; } else { if (tx != null) tx.topologyVersion(topVer); - this.topVer.compareAndSet(null, topVer); + if (this.topVer == null) + this.topVer = topVer; } map(keys, remap, false); @@ -825,7 +831,7 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean */ private void map(Iterable<KeyCacheObject> keys, boolean remap, boolean topLocked) { try { - AffinityTopologyVersion topVer = this.topVer.get(); + AffinityTopologyVersion topVer = this.topVer; assert topVer != null; @@ -842,204 +848,227 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean assert !remap || (clientNode && (tx == null || !tx.hasRemoteLocks())); - ConcurrentLinkedDeque8<GridNearLockMapping> mappings = new ConcurrentLinkedDeque8<>(); + synchronized (this) { + mappings = new ArrayDeque<>(); - // Assign keys to primary nodes. - GridNearLockMapping map = null; + // Assign keys to primary nodes. + GridNearLockMapping map = null; - for (KeyCacheObject key : keys) { - GridNearLockMapping updated = map(key, map, topVer); + for (KeyCacheObject key : keys) { + GridNearLockMapping updated = map( + key, + map, + topVer); - // If new mapping was created, add to collection. - if (updated != map) { - mappings.add(updated); + // If new mapping was created, add to collection. + if (updated != map) { + mappings.add(updated); - if (tx != null && updated.node().isLocal()) - tx.nearLocallyMapped(true); + if (tx != null && updated.node().isLocal()) + tx.nearLocallyMapped(true); + } + + map = updated; } - map = updated; - } + if (isDone()) { + if (log.isDebugEnabled()) + log.debug("Abandoning (re)map because future is done: " + this); - if (isDone()) { - if (log.isDebugEnabled()) - log.debug("Abandoning (re)map because future is done: " + this); + return; + } - return; - } + if (log.isDebugEnabled()) + log.debug("Starting (re)map for mappings [mappings=" + mappings + ", fut=" + this + ']'); - if (log.isDebugEnabled()) - log.debug("Starting (re)map for mappings [mappings=" + mappings + ", fut=" + this + ']'); + boolean first = true; - boolean first = true; + // Create mini futures. + for (Iterator<GridNearLockMapping> iter = mappings.iterator(); iter.hasNext(); ) { + GridNearLockMapping mapping = iter.next(); - // Create mini futures. - for (Iterator<GridNearLockMapping> iter = mappings.iterator(); iter.hasNext(); ) { - GridNearLockMapping mapping = iter.next(); + ClusterNode node = mapping.node(); + Collection<KeyCacheObject> mappedKeys = mapping.mappedKeys(); - ClusterNode node = mapping.node(); - Collection<KeyCacheObject> mappedKeys = mapping.mappedKeys(); + assert !mappedKeys.isEmpty(); - assert !mappedKeys.isEmpty(); + GridNearLockRequest req = null; - GridNearLockRequest req = null; + Collection<KeyCacheObject> distributedKeys = new ArrayList<>(mappedKeys.size()); - Collection<KeyCacheObject> distributedKeys = new ArrayList<>(mappedKeys.size()); + boolean explicit = false; - boolean explicit = false; + for (KeyCacheObject key : mappedKeys) { + IgniteTxKey txKey = cctx.txKey(key); - for (KeyCacheObject key : mappedKeys) { - IgniteTxKey txKey = cctx.txKey(key); + while (true) { + GridNearCacheEntry entry = null; - while (true) { - GridNearCacheEntry entry = null; + try { + entry = cctx.near().entryExx( + key, + topVer); - try { - entry = cctx.near().entryExx(key, topVer); + if (!cctx.isAll( + entry, + filter)) { + if (log.isDebugEnabled()) + log.debug("Entry being locked did not pass filter (will not lock): " + entry); - if (!cctx.isAll(entry, filter)) { - if (log.isDebugEnabled()) - log.debug("Entry being locked did not pass filter (will not lock): " + entry); + onComplete( + false, + false); - onComplete(false, false); + return; + } - return; - } + // Removed exception may be thrown here. + GridCacheMvccCandidate cand = addEntry( + topVer, + entry, + node.id()); - // Removed exception may be thrown here. - GridCacheMvccCandidate cand = addEntry(topVer, entry, node.id()); + if (isDone()) { + if (log.isDebugEnabled()) + log.debug("Abandoning (re)map because future is done after addEntry attempt " + + "[fut=" + this + ", entry=" + entry + ']'); - if (isDone()) { - if (log.isDebugEnabled()) - log.debug("Abandoning (re)map because future is done after addEntry attempt " + - "[fut=" + this + ", entry=" + entry + ']'); + return; + } - return; - } + if (cand != null) { + if (tx == null && !cand.reentry()) + cctx.mvcc().addExplicitLock( + threadId, + cand, + topVer); - if (cand != null) { - if (tx == null && !cand.reentry()) - cctx.mvcc().addExplicitLock(threadId, cand, topVer); + IgniteBiTuple<GridCacheVersion, CacheObject> val = entry.versionedValue(); - IgniteBiTuple<GridCacheVersion, CacheObject> val = entry.versionedValue(); + if (val == null) { + GridDhtCacheEntry dhtEntry = dht().peekExx(key); - if (val == null) { - GridDhtCacheEntry dhtEntry = dht().peekExx(key); + try { + if (dhtEntry != null) + val = dhtEntry.versionedValue(topVer); + } + catch (GridCacheEntryRemovedException ignored) { + assert dhtEntry.obsolete() : dhtEntry; - try { - if (dhtEntry != null) - val = dhtEntry.versionedValue(topVer); + if (log.isDebugEnabled()) + log.debug("Got removed exception for DHT entry in map (will ignore): " + + dhtEntry); + } } - catch (GridCacheEntryRemovedException ignored) { - assert dhtEntry.obsolete() : " Got removed exception for non-obsolete entry: " - + dhtEntry; - if (log.isDebugEnabled()) - log.debug("Got removed exception for DHT entry in map (will ignore): " - + dhtEntry); - } - } + GridCacheVersion dhtVer = null; - GridCacheVersion dhtVer = null; + if (val != null) { + dhtVer = val.get1(); - if (val != null) { - dhtVer = val.get1(); + valMap.put( + key, + val); + } - valMap.put(key, val); - } + if (!cand.reentry()) { + if (req == null) { + boolean clientFirst = false; - if (!cand.reentry()) { - if (req == null) { - boolean clientFirst = false; + if (first) { + clientFirst = clientNode && + !topLocked && + (tx == null || !tx.hasRemoteLocks()); - if (first) { - clientFirst = clientNode && - !topLocked && - (tx == null || !tx.hasRemoteLocks()); + first = false; + } - first = false; + req = new GridNearLockRequest( + cctx.cacheId(), + topVer, + cctx.nodeId(), + threadId, + futId, + lockVer, + inTx(), + implicitTx(), + implicitSingleTx(), + read, + retval, + isolation(), + isInvalidate(), + timeout, + mappedKeys.size(), + inTx() ? tx.size() : mappedKeys.size(), + inTx() && tx.syncCommit(), + inTx() ? tx.subjectId() : null, + inTx() ? tx.taskNameHash() : 0, + read ? accessTtl : -1L, + skipStore, + clientFirst, + cctx.deploymentEnabled()); + + mapping.request(req); } - req = new GridNearLockRequest( - cctx.cacheId(), - topVer, - cctx.nodeId(), - threadId, - futId, - lockVer, - inTx(), - implicitTx(), - implicitSingleTx(), - read, - retval, - isolation(), - isInvalidate(), - timeout, - mappedKeys.size(), - inTx() ? tx.size() : mappedKeys.size(), - inTx() && tx.syncCommit(), - inTx() ? tx.subjectId() : null, - inTx() ? tx.taskNameHash() : 0, - read ? accessTtl : -1L, - skipStore, - clientFirst, - cctx.deploymentEnabled()); - - mapping.request(req); - } + distributedKeys.add(key); - distributedKeys.add(key); + if (tx != null) + tx.addKeyMapping( + txKey, + mapping.node()); - if (tx != null) - tx.addKeyMapping(txKey, mapping.node()); + req.addKeyBytes( + key, + retval && dhtVer == null, + dhtVer, + // Include DHT version to match remote DHT entry. + cctx); + } - req.addKeyBytes( - key, - retval && dhtVer == null, - dhtVer, // Include DHT version to match remote DHT entry. - cctx); + if (cand.reentry()) + explicit = tx != null && !entry.hasLockCandidate(tx.xidVersion()); } - - if (cand.reentry()) + else + // Ignore reentries within transactions. explicit = tx != null && !entry.hasLockCandidate(tx.xidVersion()); - } - else - // Ignore reentries within transactions. - explicit = tx != null && !entry.hasLockCandidate(tx.xidVersion()); - if (explicit) - tx.addKeyMapping(txKey, mapping.node()); + if (explicit) + tx.addKeyMapping( + txKey, + mapping.node()); - break; - } - catch (GridCacheEntryRemovedException ignored) { - assert entry.obsolete() : "Got removed exception on non-obsolete entry: " + entry; + break; + } + catch (GridCacheEntryRemovedException ignored) { + assert entry.obsolete() : "Got removed exception on non-obsolete entry: " + entry; - if (log.isDebugEnabled()) - log.debug("Got removed entry in lockAsync(..) method (will retry): " + entry); + if (log.isDebugEnabled()) + log.debug("Got removed entry in lockAsync(..) method (will retry): " + entry); + } } - } - // Mark mapping explicit lock flag. - if (explicit) { - boolean marked = tx != null && tx.markExplicit(node.id()); + // Mark mapping explicit lock flag. + if (explicit) { + boolean marked = tx != null && tx.markExplicit(node.id()); - assert tx == null || marked; + assert tx == null || marked; + } } - } - if (!distributedKeys.isEmpty()) - mapping.distributedKeys(distributedKeys); - else { - assert mapping.request() == null; + if (!distributedKeys.isEmpty()) + mapping.distributedKeys(distributedKeys); + else { + assert mapping.request() == null; - iter.remove(); + iter.remove(); + } } } cctx.mvcc().recheckPendingLocks(); - proceedMapping(mappings); + proceedMapping(); } catch (IgniteCheckedException ex) { onError(ex); @@ -1050,13 +1079,16 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean * Gets next near lock mapping and either acquires dht locks locally or sends near lock request to * remote primary node. * - * @param mappings Queue of mappings. * @throws IgniteCheckedException If mapping can not be completed. */ @SuppressWarnings("unchecked") - private void proceedMapping(final ConcurrentLinkedDeque8<GridNearLockMapping> mappings) + private void proceedMapping() throws IgniteCheckedException { - GridNearLockMapping map = mappings.poll(); + GridNearLockMapping map; + + synchronized (this) { + map = mappings.poll(); + } // If there are no more mappings to process, complete the future. if (map == null) @@ -1139,7 +1171,7 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean // Lock is held at this point, so we can set the // returned value if any. - entry.resetFromPrimary(newVal, lockVer, dhtVer, node.id(), topVer.get()); + entry.resetFromPrimary(newVal, lockVer, dhtVer, node.id(), topVer); entry.readyNearLock(lockVer, mappedVer, res.committedVersions(), res.rolledbackVersions(), res.pending()); @@ -1181,9 +1213,11 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean log.debug("Failed to add candidates because entry was " + "removed (will renew)."); - // Replace old entry with new one. - entries.set(i, (GridDistributedCacheEntry) - cctx.cache().entryEx(entry.key())); + synchronized (GridNearLockFuture.this) { + // Replace old entry with new one. + entries.set(i, + (GridDistributedCacheEntry)cctx.cache().entryEx(entry.key())); + } } } @@ -1191,7 +1225,7 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean } // Proceed and add new future (if any) before completing embedded future. - proceedMapping(mappings); + proceedMapping(); } catch (IgniteCheckedException ex) { onError(ex); @@ -1205,7 +1239,7 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean fut)); } else { - final MiniFuture fut = new MiniFuture(node, mappedKeys, mappings); + final MiniFuture fut = new MiniFuture(node, mappedKeys); req.miniId(fut.futureId()); @@ -1302,7 +1336,7 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean ClusterTopologyCheckedException topEx = new ClusterTopologyCheckedException("Failed to acquire lock for keys " + "(primary node left grid, retry transaction if possible) [keys=" + keys + ", node=" + nodeId + ']', nested); - topEx.retryReadyFuture(cctx.shared().nextAffinityReadyFuture(topVer.get())); + topEx.retryReadyFuture(cctx.shared().nextAffinityReadyFuture(topVer)); return topEx; } @@ -1354,23 +1388,19 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean @GridToStringInclude private Collection<KeyCacheObject> keys; - /** Mappings to proceed. */ - @GridToStringExclude - private ConcurrentLinkedDeque8<GridNearLockMapping> mappings; - /** */ - private AtomicBoolean rcvRes = new AtomicBoolean(false); + private boolean rcvRes; /** * @param node Node. * @param keys Keys. - * @param mappings Mappings to proceed. */ - MiniFuture(ClusterNode node, Collection<KeyCacheObject> keys, - ConcurrentLinkedDeque8<GridNearLockMapping> mappings) { + MiniFuture( + ClusterNode node, + Collection<KeyCacheObject> keys + ) { this.node = node; this.keys = keys; - this.mappings = mappings; } /** @@ -1395,197 +1425,194 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean } /** - * @param e Error. - */ - void onResult(Throwable e) { - if (rcvRes.compareAndSet(false, true)) { - if (log.isDebugEnabled()) - log.debug("Failed to get future result [fut=" + this + ", err=" + e + ']'); - - // Fail. - onDone(e); - } - else - U.warn(log, "Received error after another result has been processed [fut=" + GridNearLockFuture.this + - ", mini=" + this + ']', e); - } - - /** * @param e Node left exception. */ void onResult(ClusterTopologyCheckedException e) { if (isDone()) return; - if (rcvRes.compareAndSet(false, true)) { - if (log.isDebugEnabled()) - log.debug("Remote node left grid while sending or waiting for reply (will fail): " + this); + synchronized (this) { + if (!rcvRes) + rcvRes = true; + else + return; + } - if (tx != null) - tx.removeMapping(node.id()); + if (log.isDebugEnabled()) + log.debug("Remote node left grid while sending or waiting for reply (will fail): " + this); - // Primary node left the grid, so fail the future. - GridNearLockFuture.this.onDone(newTopologyException(e, node.id())); + if (tx != null) + tx.removeMapping(node.id()); - onDone(true); - } + // Primary node left the grid, so fail the future. + GridNearLockFuture.this.onDone(newTopologyException(e, node.id())); + + onDone(true); } /** * @param res Result callback. */ void onResult(GridNearLockResponse res) { - if (rcvRes.compareAndSet(false, true)) { - if (res.error() != null) { - if (log.isDebugEnabled()) - log.debug("Finishing mini future with an error due to error in response [miniFut=" + this + - ", res=" + res + ']'); + synchronized (this) { + if (!rcvRes) + rcvRes = true; + else + return; + } - // Fail. - if (res.error() instanceof GridCacheLockTimeoutException) - onDone(false); - else - onDone(res.error()); + if (res.error() != null) { + if (log.isDebugEnabled()) + log.debug("Finishing mini future with an error due to error in response [miniFut=" + this + + ", res=" + res + ']'); - return; - } + // Fail. + if (res.error() instanceof GridCacheLockTimeoutException) + onDone(false); + else + onDone(res.error()); - if (res.clientRemapVersion() != null) { - assert cctx.kernalContext().clientNode(); + return; + } - IgniteInternalFuture<?> affFut = - cctx.shared().exchange().affinityReadyFuture(res.clientRemapVersion()); + if (res.clientRemapVersion() != null) { + assert cctx.kernalContext().clientNode(); - if (affFut != null && !affFut.isDone()) { - affFut.listen(new CI1<IgniteInternalFuture<?>>() { - @Override public void apply(IgniteInternalFuture<?> fut) { - try { - fut.get(); + IgniteInternalFuture<?> affFut = + cctx.shared().exchange().affinityReadyFuture(res.clientRemapVersion()); - remap(); - } - catch (IgniteCheckedException e) { - onDone(e); - } - finally { - cctx.shared().txContextReset(); - } + if (affFut != null && !affFut.isDone()) { + affFut.listen(new CI1<IgniteInternalFuture<?>>() { + @Override public void apply(IgniteInternalFuture<?> fut) { + try { + fut.get(); + + remap(); } - }); - } - else - remap(); + catch (IgniteCheckedException e) { + onDone(e); + } + finally { + cctx.shared().txContextReset(); + } + } + }); } - else { - int i = 0; + else + remap(); + } + else { + int i = 0; - AffinityTopologyVersion topVer = GridNearLockFuture.this.topVer.get(); + AffinityTopologyVersion topVer = GridNearLockFuture.this.topVer; - for (KeyCacheObject k : keys) { - while (true) { - GridNearCacheEntry entry = cctx.near().entryExx(k, topVer); + for (KeyCacheObject k : keys) { + while (true) { + GridNearCacheEntry entry = cctx.near().entryExx(k, topVer); - try { - if (res.dhtVersion(i) == null) { - onDone(new IgniteCheckedException("Failed to receive DHT version from remote node " + - "(will fail the lock): " + res)); + try { + if (res.dhtVersion(i) == null) { + onDone(new IgniteCheckedException("Failed to receive DHT version from remote node " + + "(will fail the lock): " + res)); - return; - } + return; + } - IgniteBiTuple<GridCacheVersion, CacheObject> oldValTup = valMap.get(entry.key()); + IgniteBiTuple<GridCacheVersion, CacheObject> oldValTup = valMap.get(entry.key()); - CacheObject oldVal = entry.rawGet(); - boolean hasOldVal = false; - CacheObject newVal = res.value(i); + CacheObject oldVal = entry.rawGet(); + boolean hasOldVal = false; + CacheObject newVal = res.value(i); - boolean readRecordable = false; + boolean readRecordable = false; - if (retval) { - readRecordable = cctx.events().isRecordable(EVT_CACHE_OBJECT_READ); + if (retval) { + readRecordable = cctx.events().isRecordable(EVT_CACHE_OBJECT_READ); - if (readRecordable) - hasOldVal = entry.hasValue(); - } + if (readRecordable) + hasOldVal = entry.hasValue(); + } - GridCacheVersion dhtVer = res.dhtVersion(i); - GridCacheVersion mappedVer = res.mappedVersion(i); + GridCacheVersion dhtVer = res.dhtVersion(i); + GridCacheVersion mappedVer = res.mappedVersion(i); - if (newVal == null) { - if (oldValTup != null) { - if (oldValTup.get1().equals(dhtVer)) - newVal = oldValTup.get2(); + if (newVal == null) { + if (oldValTup != null) { + if (oldValTup.get1().equals(dhtVer)) + newVal = oldValTup.get2(); - oldVal = oldValTup.get2(); - } + oldVal = oldValTup.get2(); } + } - // Lock is held at this point, so we can set the - // returned value if any. - entry.resetFromPrimary(newVal, lockVer, dhtVer, node.id(), topVer); + // Lock is held at this point, so we can set the + // returned value if any. + entry.resetFromPrimary(newVal, lockVer, dhtVer, node.id(), topVer); - if (inTx()) { - tx.hasRemoteLocks(true); + if (inTx()) { + tx.hasRemoteLocks(true); - if (implicitTx() && tx.onePhaseCommit()) { - boolean pass = res.filterResult(i); + if (implicitTx() && tx.onePhaseCommit()) { + boolean pass = res.filterResult(i); - tx.entry(cctx.txKey(k)).filters(pass ? CU.empty0() : CU.alwaysFalse0Arr()); - } + tx.entry(cctx.txKey(k)).filters(pass ? CU.empty0() : CU.alwaysFalse0Arr()); } + } - entry.readyNearLock(lockVer, - mappedVer, - res.committedVersions(), - res.rolledbackVersions(), - res.pending()); - - if (retval) { - if (readRecordable) - cctx.events().addEvent( - entry.partition(), - entry.key(), - tx, - null, - EVT_CACHE_OBJECT_READ, - newVal, - newVal != null, - oldVal, - hasOldVal, - CU.subjectId(tx, cctx.shared()), - null, - inTx() ? tx.resolveTaskName() : null); - - if (cctx.cache().configuration().isStatisticsEnabled()) - cctx.cache().metrics0().onRead(false); - } + entry.readyNearLock(lockVer, + mappedVer, + res.committedVersions(), + res.rolledbackVersions(), + res.pending()); + + if (retval) { + if (readRecordable) + cctx.events().addEvent( + entry.partition(), + entry.key(), + tx, + null, + EVT_CACHE_OBJECT_READ, + newVal, + newVal != null, + oldVal, + hasOldVal, + CU.subjectId(tx, cctx.shared()), + null, + inTx() ? tx.resolveTaskName() : null); + + if (cctx.cache().configuration().isStatisticsEnabled()) + cctx.cache().metrics0().onRead(false); + } - if (log.isDebugEnabled()) - log.debug("Processed response for entry [res=" + res + ", entry=" + entry + ']'); + if (log.isDebugEnabled()) + log.debug("Processed response for entry [res=" + res + ", entry=" + entry + ']'); - break; // Inner while loop. - } - catch (GridCacheEntryRemovedException ignored) { - if (log.isDebugEnabled()) - log.debug("Failed to add candidates because entry was removed (will renew)."); + break; // Inner while loop. + } + catch (GridCacheEntryRemovedException ignored) { + if (log.isDebugEnabled()) + log.debug("Failed to add candidates because entry was removed (will renew)."); + synchronized (GridNearLockFuture.this) { // Replace old entry with new one. - entries.set(i, (GridDistributedCacheEntry)cctx.cache().entryEx(entry.key())); + entries.set(i, + (GridDistributedCacheEntry)cctx.cache().entryEx(entry.key())); } } - - i++; } - try { - proceedMapping(mappings); - } - catch (IgniteCheckedException e) { - onDone(e); - } + i++; + } - onDone(true); + try { + proceedMapping(); + } + catch (IgniteCheckedException e) { + onDone(e); } + + onDone(true); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/5e6d0ffe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockMapping.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockMapping.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockMapping.java index b4f689c..6c8e388 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockMapping.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockMapping.java @@ -17,8 +17,8 @@ package org.apache.ignite.internal.processors.cache.distributed.near; +import java.util.ArrayList; import java.util.Collection; -import java.util.LinkedList; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.util.tostring.GridToStringExclude; @@ -35,7 +35,7 @@ public class GridNearLockMapping { /** Collection of mapped keys. */ @GridToStringInclude - private Collection<KeyCacheObject> mappedKeys = new LinkedList<>(); + private final Collection<KeyCacheObject> mappedKeys = new ArrayList<>(); /** Near lock request. */ @GridToStringExclude @@ -115,4 +115,4 @@ public class GridNearLockMapping { public String toString() { return S.toString(GridNearLockMapping.class, this); } -} \ No newline at end of file +}
