http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index ef8150c,c20ed48..bc1c584 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@@ -38,7 -38,7 +38,8 @@@ import org.apache.ignite.cluster.Cluste import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.NodeStoppingException; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; +import org.apache.ignite.internal.pagemem.wal.StorageException; + import org.apache.ignite.internal.processors.affinity.AffinityAssignment; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheEntryPredicate; import org.apache.ignite.internal.processors.cache.CacheInvokeEntry; @@@ -2699,24 -2600,12 +2618,15 @@@ public class GridDhtAtomicCache<K, V> e GridCacheOperation op; if (putMap != null) { - // If fast mapping, filter primary keys for write to store. - Map<KeyCacheObject, CacheObject> storeMap = req.fastMap() ? - F.view(putMap, new P1<CacheObject>() { - @Override public boolean apply(CacheObject key) { - return ctx.affinity().primaryByKey(ctx.localNode(), key, req.topologyVersion()); - } - }) : - putMap; - try { - Map<? extends KeyCacheObject, IgniteBiTuple<? extends CacheObject, GridCacheVersion>> view = F.viewReadOnly(storeMap, - ctx.store().putAll(null, F.viewReadOnly(putMap, new C1<CacheObject, IgniteBiTuple<CacheObject, GridCacheVersion>>() { - @Override public IgniteBiTuple<CacheObject, GridCacheVersion> apply(CacheObject v) { - return F.t(v, ver); - } - })); ++ Map<? extends KeyCacheObject, IgniteBiTuple<? extends CacheObject, GridCacheVersion>> view = F.viewReadOnly(putMap, + new C1<CacheObject, IgniteBiTuple<? extends CacheObject, GridCacheVersion>>() { + @Override public IgniteBiTuple<? extends CacheObject, GridCacheVersion> apply(CacheObject val) { + return F.t(val, ver); + } + }); + + ctx.store().putAll(null, view); } catch (CacheStorePartialUpdateException e) { storeErr = e; @@@ -3212,11 -3028,10 +3054,10 @@@ * @param nodeId Sender node ID. * @param res Near atomic update response. */ - @SuppressWarnings("unchecked") private void processNearAtomicUpdateResponse(UUID nodeId, GridNearAtomicUpdateResponse res) { if (msgLog.isDebugEnabled()) - msgLog.debug("Received near atomic update response " + - "[futId=" + res.futureVersion() + - msgLog.debug("Received near atomic update response [futId" + res.futureId() + ", node=" + nodeId + ']'); ++ msgLog.debug("Received near atomic update response [futId" + res.futureId() + + ", node=" + nodeId + ']'); res.nodeId(ctx.localNodeId()); @@@ -3254,126 -3100,209 +3126,235 @@@ String taskName = ctx.kernalContext().task().resolveTaskName(req.taskNameHash()); - for (int i = 0; i < req.size(); i++) { - KeyCacheObject key = req.key(i); + ctx.shared().database().checkpointReadLock(); - try { - while (true) { - GridDhtCacheEntry entry = null; + try { + for (int i = 0; i < req.size(); i++) { + KeyCacheObject key = req.key(i); - try { - entry = entryExx(key); + try { + while (true) { + GridDhtCacheEntry entry = null; - CacheObject val = req.value(i); - CacheObject prevVal = req.previousValue(i); + try { + entry = entryExx(key); - EntryProcessor<Object, Object, Object> entryProcessor = req.entryProcessor(i); - Long updateIdx = req.updateCounter(i); + CacheObject val = req.value(i); + CacheObject prevVal = req.previousValue(i); - GridCacheOperation op = entryProcessor != null ? TRANSFORM : - (val != null) ? UPDATE : DELETE; + EntryProcessor<Object, Object, Object> entryProcessor = req.entryProcessor(i); + Long updateIdx = req.updateCounter(i); - long ttl = req.ttl(i); - long expireTime = req.conflictExpireTime(i); + GridCacheOperation op = entryProcessor != null ? TRANSFORM : + (val != null) ? UPDATE : DELETE; - GridCacheUpdateAtomicResult updRes = entry.innerUpdate( - ver, - nodeId, - nodeId, - op, - op == TRANSFORM ? entryProcessor : val, - op == TRANSFORM ? req.invokeArguments() : null, - /*write-through*/(ctx.store().isLocal() && !ctx.shared().localStorePrimaryOnly()) - && writeThrough() && !req.skipStore(), - /*read-through*/false, - /*retval*/false, - req.keepBinary(), - /*expiry policy*/null, - /*event*/true, - /*metrics*/true, - /*primary*/false, - /*check version*/!req.forceTransformBackups(), - req.topologyVersion(), - CU.empty0(), - replicate ? DR_BACKUP : DR_NONE, - ttl, - expireTime, - req.conflictVersion(i), - false, - intercept, - req.subjectId(), - taskName, - prevVal, - updateIdx, - null); + long ttl = req.ttl(i); + long expireTime = req.conflictExpireTime(i); + + GridCacheUpdateAtomicResult updRes = entry.innerUpdate( + ver, + nodeId, + nodeId, + op, + op == TRANSFORM ? entryProcessor : val, + op == TRANSFORM ? req.invokeArguments() : null, + /*write-through*/(ctx.store().isLocal() && !ctx.shared().localStorePrimaryOnly()) + && writeThrough() && !req.skipStore(), + /*read-through*/false, + /*retval*/false, + req.keepBinary(), + /*expiry policy*/null, + /*event*/true, + /*metrics*/true, + /*primary*/false, + /*check version*/!req.forceTransformBackups(), + req.topologyVersion(), + CU.empty0(), + replicate ? DR_BACKUP : DR_NONE, + ttl, + expireTime, + req.conflictVersion(i), + false, + intercept, + req.subjectId(), + taskName, + prevVal, + updateIdx, + null); - if (updRes.removeVersion() != null) - ctx.onDeferredDelete(entry, updRes.removeVersion()); + if (updRes.removeVersion() != null) + ctx.onDeferredDelete(entry, updRes.removeVersion()); - entry.onUnlock(); + entry.onUnlock(); - break; // While. - } - catch (GridCacheEntryRemovedException ignored) { - if (log.isDebugEnabled()) - log.debug("Got removed entry while updating backup value (will retry): " + key); + break; // While. + } + catch (GridCacheEntryRemovedException ignored) { + if (log.isDebugEnabled()) + log.debug("Got removed entry while updating backup value (will retry): " + key); - entry = null; - } - finally { - if (entry != null) - ctx.evicts().touch(entry, req.topologyVersion()); - } + entry = null; + } + finally { + if (entry != null) + ctx.evicts().touch(entry, req.topologyVersion()); } } - catch (GridDhtInvalidPartitionException ignored) { - // Ignore. - } - catch (IgniteCheckedException e) { - res.addFailedKey(key, new IgniteCheckedException("Failed to update key on backup node: " + key, e)); - } + } + catch (GridDhtInvalidPartitionException ignored) { + // Ignore. + } + catch (IgniteCheckedException e) { - IgniteCheckedException err = - new IgniteCheckedException("Failed to update key on backup node: " + key, e); ++ IgniteCheckedException err = new IgniteCheckedException("Failed to update key on backup node: " + key, e); + + if (nearRes != null) + nearRes.addFailedKey(key, err); + - U.error(log, "Failed to update key on backup node: " + key, e); ++ U.error(log, "Failed to update key on backup node: " + key, e);} } } + finally { + ctx.shared().database().checkpointReadUnlock(); + } - if (isNearEnabled(cacheCfg)) - ((GridNearAtomicCache<K, V>)near()).processDhtAtomicUpdateRequest(nodeId, req, res); + GridDhtAtomicUpdateResponse dhtRes = null; + + if (isNearEnabled(cacheCfg)) { + List<KeyCacheObject> nearEvicted = + ((GridNearAtomicCache<K, V>)near()).processDhtAtomicUpdateRequest(nodeId, req, nearRes); + + if (nearEvicted != null) { + dhtRes = new GridDhtAtomicUpdateResponse(ctx.cacheId(), + req.partition(), + req.futureId(), + ctx.deploymentEnabled()); + + dhtRes.nearEvicted(nearEvicted); + } + } + try { + // TODO handle failure: probably drop the node from topology + // TODO fire events only after successful fsync + if (ctx.shared().wal() != null) + ctx.shared().wal().fsync(null); + } + catch (StorageException e) { - res.onError(new IgniteCheckedException(e)); ++ if (dhtRes != null) ++ dhtRes.onError(new IgniteCheckedException(e)); ++ ++ if (nearRes != null) ++ nearRes.onClassError(e); + } + catch (IgniteCheckedException e) { - res.onError(e); ++ if (dhtRes != null) ++ dhtRes.onError(e); ++ ++ if (nearRes != null) ++ nearRes.onClassError(e); ++ } ++ + if (nearRes != null) + sendDhtNearResponse(req, nearRes); + + if (dhtRes == null && req.replyWithoutDelay()) { + dhtRes = new GridDhtAtomicUpdateResponse(ctx.cacheId(), + req.partition(), + req.futureId(), + ctx.deploymentEnabled()); } + if (dhtRes != null) + sendDhtPrimaryResponse(nodeId, req, dhtRes); + else + sendDeferredUpdateResponse(req.partition(), nodeId, req.futureId()); + } + + /** + * @param nodeId Primary node ID. + * @param req Request. + * @param dhtRes Response to send. + */ + private void sendDhtPrimaryResponse(UUID nodeId, + GridDhtAtomicAbstractUpdateRequest req, + GridDhtAtomicUpdateResponse dhtRes) { try { - if (res.failedKeys() != null || res.nearEvicted() != null || req.writeSynchronizationMode() == FULL_SYNC) { - ctx.io().send(nodeId, res, ctx.ioPolicy()); + ctx.io().send(nodeId, dhtRes, ctx.ioPolicy()); - if (msgLog.isDebugEnabled()) { - msgLog.debug("Sent DHT atomic update response [futId=" + req.futureVersion() + - ", writeVer=" + req.writeVersion() + ", node=" + nodeId + ']'); - } + if (msgLog.isDebugEnabled()) { + msgLog.debug("Sent DHT response [futId=" + req.futureId() + + ", nearFutId=" + req.nearFutureId() + + ", writeVer=" + req.writeVersion() + + ", node=" + nodeId + ']'); } - else { - if (msgLog.isDebugEnabled()) { - msgLog.debug("Will send deferred DHT atomic update response [futId=" + req.futureVersion() + - ", writeVer=" + req.writeVersion() + ", node=" + nodeId + ']'); - } + } + catch (ClusterTopologyCheckedException ignored) { + U.warn(msgLog, "Failed to send DHT response, node left [futId=" + req.futureId() + + ", nearFutId=" + req.nearFutureId() + + ", node=" + nodeId + ']'); + } + catch (IgniteCheckedException e) { + U.error(msgLog, "Failed to send DHT near response [futId=" + req.futureId() + + ", nearFutId=" + req.nearFutureId() + + ", node=" + nodeId + + ", res=" + dhtRes + ']', e); + } + } + + /** + * @param part Partition. + * @param primaryId Primary ID. + * @param futId Future ID. + */ + private void sendDeferredUpdateResponse(int part, UUID primaryId, long futId) { + Map<UUID, GridDhtAtomicDeferredUpdateResponse> resMap = defRes.get(); + + GridDhtAtomicDeferredUpdateResponse msg = resMap.get(primaryId); + + if (msg == null) { + msg = new GridDhtAtomicDeferredUpdateResponse(ctx.cacheId(), + new GridLongList(DEFERRED_UPDATE_RESPONSE_BUFFER_SIZE)); + + if (DEFERRED_UPDATE_RESPONSE_TIMEOUT > 0) { + GridTimeoutObject timeoutSnd = new DeferredUpdateTimeout(part, primaryId); + + msg.timeoutSender(timeoutSnd); + + ctx.time().addTimeoutObject(timeoutSnd); + } + + resMap.put(primaryId, msg); + } + + GridLongList futIds = msg.futureIds(); + + assert futIds.size() < DEFERRED_UPDATE_RESPONSE_BUFFER_SIZE : futIds.size(); + + futIds.add(futId); - // No failed keys and sync mode is not FULL_SYNC, thus sending deferred response. - sendDeferredUpdateResponse(nodeId, req.futureVersion()); + if (futIds.size() >= DEFERRED_UPDATE_RESPONSE_BUFFER_SIZE) { + resMap.remove(primaryId); + + sendDeferredUpdateResponse(primaryId, msg); + } + } + + /** + * @param primaryId Primary ID. + * @param msg Message. + */ + private void sendDeferredUpdateResponse(UUID primaryId, GridDhtAtomicDeferredUpdateResponse msg) { + try { + GridTimeoutObject timeoutSnd = msg.timeoutSender(); + + if (timeoutSnd != null) + ctx.time().removeTimeoutObject(timeoutSnd); + + ctx.io().send(primaryId, msg, ctx.ioPolicy()); + + if (msgLog.isDebugEnabled()) { + msgLog.debug("Sent deferred DHT update response [futIds=" + msg.futureIds() + + ", node=" + primaryId + ']'); } } catch (ClusterTopologyCheckedException ignored) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractSingleUpdateRequest.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractSingleUpdateRequest.java index 9160865,6811236..9887f55 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractSingleUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractSingleUpdateRequest.java @@@ -140,93 -73,25 +73,27 @@@ public abstract class GridNearAtomicAbs boolean retval, @Nullable UUID subjId, int taskNameHash, + boolean mappingKnown, boolean skipStore, boolean keepBinary, + boolean recovery, - boolean clientReq, boolean addDepInfo ) { - assert futVer != null; - - this.cacheId = cacheId; - this.nodeId = nodeId; - this.futVer = futVer; - this.updateVer = updateVer; - this.topVer = topVer; - this.syncMode = syncMode; - this.op = op; - this.subjId = subjId; - this.taskNameHash = taskNameHash; - this.addDepInfo = addDepInfo; - - fastMap(fastMap); - topologyLocked(topLocked); - returnValue(retval); - skipStore(skipStore); - keepBinary(keepBinary); - clientRequest(clientReq); - recovery(recovery); - } - - /** {@inheritDoc} */ - @Override public int lookupIndex() { - return CACHE_MSG_IDX; - } - - /** - * @return Mapped node ID. - */ - @Override public UUID nodeId() { - return nodeId; - } - - /** - * @param nodeId Node ID. - */ - @Override public void nodeId(UUID nodeId) { - this.nodeId = nodeId; - } - - /** - * @return Subject ID. - */ - @Override public UUID subjectId() { - return subjId; - } - - /** - * @return Task name hash. - */ - @Override public int taskNameHash() { - return taskNameHash; - } - - /** - * @return Future version. - */ - @Override public GridCacheVersion futureVersion() { - return futVer; - } - - /** - * @return Update version for fast-map request. - */ - @Override public GridCacheVersion updateVersion() { - return updateVer; - } - - /** - * @return Topology version. - */ - @Override public AffinityTopologyVersion topologyVersion() { - return topVer; - } - - /** - * @return Cache write synchronization mode. - */ - @Override public CacheWriteSynchronizationMode writeSynchronizationMode() { - return syncMode; + super(cacheId, + nodeId, + futId, + topVer, + topLocked, + syncMode, + op, + retval, + subjId, + taskNameHash, + mappingKnown, + skipStore, + keepBinary, ++ recovery, + addDepInfo); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java index b933186,a43bfb0..4b3ea5bc --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java @@@ -38,15 -48,177 +48,183 @@@ public abstract class GridNearAtomicAbs /** Message index. */ public static final int CACHE_MSG_IDX = nextIndexId(); + /** . */ + private static final int NEED_PRIMARY_RES_FLAG_MASK = 0x01; + + /** Topology locked flag. Set if atomic update is performed inside TX or explicit lock. */ + private static final int TOP_LOCKED_FLAG_MASK = 0x02; + + /** Skip write-through to a persistent storage. */ + private static final int SKIP_STORE_FLAG_MASK = 0x04; + + /** Keep binary flag. */ + private static final int KEEP_BINARY_FLAG_MASK = 0x08; + + /** Return value flag. */ + private static final int RET_VAL_FLAG_MASK = 0x10; + ++ /** Recovery value flag. */ ++ private static final int RECOVERY_FLAG_MASK = 0x20; ++ + /** Target node ID. */ + @GridDirectTransient + protected UUID nodeId; + + /** Future version. */ + protected long futId; + + /** Topology version. */ + protected AffinityTopologyVersion topVer; + + /** Write synchronization mode. */ + protected CacheWriteSynchronizationMode syncMode; + + /** Update operation. */ + protected GridCacheOperation op; + + /** Subject ID. */ + protected UUID subjId; + + /** Task name hash. */ + protected int taskNameHash; + + /** Compressed boolean flags. Make sure 'toString' is updated when add new flag. */ + @GridToStringExclude + protected byte flags; + + /** */ + @GridDirectTransient + private GridNearAtomicUpdateResponse res; + /** - * @return Mapped node ID. + * */ - public abstract UUID nodeId(); + public GridNearAtomicAbstractUpdateRequest() { + // No-op. + } /** + * Constructor. + * + * @param cacheId Cache ID. * @param nodeId Node ID. + * @param futId Future ID. + * @param topVer Topology version. + * @param topLocked Topology locked flag. + * @param syncMode Synchronization mode. + * @param op Cache update operation. + * @param retval Return value required flag. + * @param subjId Subject ID. + * @param taskNameHash Task name hash code. + * @param needPrimaryRes {@code True} if near node waits for primary response. + * @param skipStore Skip write-through to a persistent storage. + * @param keepBinary Keep binary flag. + * @param addDepInfo Deployment info flag. + */ + protected GridNearAtomicAbstractUpdateRequest( + int cacheId, + UUID nodeId, + long futId, + @NotNull AffinityTopologyVersion topVer, + boolean topLocked, + CacheWriteSynchronizationMode syncMode, + GridCacheOperation op, + boolean retval, + @Nullable UUID subjId, + int taskNameHash, + boolean needPrimaryRes, + boolean skipStore, + boolean keepBinary, ++ boolean recovery, + boolean addDepInfo + ) { + this.cacheId = cacheId; + this.nodeId = nodeId; + this.futId = futId; + this.topVer = topVer; + this.syncMode = syncMode; + this.op = op; + this.subjId = subjId; + this.taskNameHash = taskNameHash; + this.addDepInfo = addDepInfo; + + if (needPrimaryRes) + needPrimaryResponse(true); + if (topLocked) + topologyLocked(true); + if (retval) + returnValue(true); + if (skipStore) + skipStore(true); + if (keepBinary) + keepBinary(true); ++ if (recovery) ++ recovery(true); + } + + /** {@inheritDoc} */ + @Override public final AffinityTopologyVersion topologyVersion() { + return topVer; + } + + /** {@inheritDoc} */ + @Override public final int lookupIndex() { + return CACHE_MSG_IDX; + } + + /** {@inheritDoc} */ + @Override public final boolean addDeploymentInfo() { + return addDepInfo; + } + + /** {@inheritDoc} */ + @Override public final IgniteLogger messageLogger(GridCacheSharedContext ctx) { + return ctx.atomicMessageLogger(); + } + + /** + * @return {@code True} if near node is able to initialize update mapping locally. + */ + boolean initMappingLocally() { + return !needPrimaryResponse() && fullSync(); + } + + /** + * @return {@code True} if near node waits for primary response. + */ + boolean needPrimaryResponse() { + return isFlag(NEED_PRIMARY_RES_FLAG_MASK); + } + + /** + * @param needRes {@code True} if near node waits for primary response. + */ + void needPrimaryResponse(boolean needRes) { + setFlag(needRes, NEED_PRIMARY_RES_FLAG_MASK); + } + + /** + * @return {@code True} if update is processed in {@link CacheWriteSynchronizationMode#FULL_SYNC} mode. + */ + boolean fullSync() { + assert syncMode != null; + + return syncMode == CacheWriteSynchronizationMode.FULL_SYNC; + } + + /** + * @return Task name hash code. + */ + public int taskNameHash() { + return taskNameHash; + } + + /** + * @return Update opreation. */ - public abstract void nodeId(UUID nodeId); + public GridCacheOperation operation() { + return op; + } /** * @return Subject ID. @@@ -111,38 -328,51 +334,65 @@@ /** * @return Keep binary flag. */ - public abstract boolean keepBinary(); + public final boolean keepBinary() { + return isFlag(KEEP_BINARY_FLAG_MASK); + } /** - * @return Recovery flag. + * @param val Keep binary flag. */ - public abstract boolean recovery(); + public void keepBinary(boolean val) { + setFlag(val, KEEP_BINARY_FLAG_MASK); + } /** - * @return Update operation. ++ * @return Keep binary flag. + */ - public abstract GridCacheOperation operation(); ++ public final boolean recovery() { ++ return isFlag(RECOVERY_FLAG_MASK); ++ } + + /** - * @return Optional arguments for entry processor. ++ * @param val Keep binary flag. + */ - @Nullable public abstract Object[] invokeArguments(); ++ public void recovery(boolean val) { ++ setFlag(val, RECOVERY_FLAG_MASK); ++ } + + /** - * @return Flag indicating whether this request contains primary keys. + * Sets flag mask. + * + * @param flag Set or clear. + * @param mask Mask. */ - public abstract boolean hasPrimary(); + private void setFlag(boolean flag, int mask) { + flags = flag ? (byte)(flags | mask) : (byte)(flags & ~mask); + } /** - * @param res Response. - * @return {@code True} if current response was {@code null}. + * Reads flag mask. + * + * @param mask Mask to read. + * @return Flag value. */ - public abstract boolean onResponse(GridNearAtomicUpdateResponse res); + private boolean isFlag(int mask) { + return (flags & mask) != 0; + } /** - * @return Response. + * @return Expiry policy. + */ + public abstract ExpiryPolicy expiry(); + + /** + * @return Filter. + */ + @Nullable public abstract CacheEntryPredicate[] filter(); + + /** + * @return Optional arguments for entry processor. */ - @Nullable public abstract GridNearAtomicUpdateResponse response(); + @Nullable public abstract Object[] invokeArguments(); /** * @param key Key to add. http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java index dcaf246,ade9976..4a94c22 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java @@@ -212,26 -152,26 +151,28 @@@ public class GridNearAtomicFullUpdateRe @Nullable CacheEntryPredicate[] filter, @Nullable UUID subjId, int taskNameHash, + boolean needPrimaryRes, boolean skipStore, boolean keepBinary, + boolean recovery, - boolean clientReq, boolean addDepInfo, int maxEntryCnt ) { - assert futVer != null; - - this.cacheId = cacheId; - this.nodeId = nodeId; - this.futVer = futVer; - this.fastMap = fastMap; - this.updateVer = updateVer; - - this.topVer = topVer; - this.topLocked = topLocked; - this.syncMode = syncMode; - this.op = op; - this.retval = retval; + super(cacheId, + nodeId, + futId, + topVer, + topLocked, + syncMode, + op, + retval, + subjId, + taskNameHash, + needPrimaryRes, + skipStore, + keepBinary, ++ recovery, + addDepInfo); this.expiryPlc = expiryPlc; this.invokeArgs = invokeArgs; this.filter = filter; http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFilterRequest.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFilterRequest.java index 39b6ab2,c32501a..30197cd --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFilterRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFilterRequest.java @@@ -88,10 -84,9 +84,10 @@@ public class GridNearAtomicSingleUpdate @Nullable CacheEntryPredicate[] filter, @Nullable UUID subjId, int taskNameHash, + boolean needPrimaryRes, boolean skipStore, boolean keepBinary, + boolean recovery, - boolean clientReq, boolean addDepInfo ) { super( @@@ -107,10 -100,9 +101,10 @@@ retval, subjId, taskNameHash, + needPrimaryRes, skipStore, keepBinary, + recovery, - clientReq, addDepInfo ); http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java index dbff89a,c2372d1..6401fbd --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java @@@ -108,8 -102,21 +104,22 @@@ public class GridNearAtomicSingleUpdate int remapCnt, boolean waitTopFut ) { - super(cctx, cache, syncMode, op, invokeArgs, retval, rawRetval, expiryPlc, filter, subjId, taskNameHash, - skipStore, keepBinary, recovery, remapCnt, waitTopFut); + super(cctx, + cache, + syncMode, + op, + invokeArgs, + retval, + rawRetval, + expiryPlc, + filter, + subjId, + taskNameHash, + skipStore, + keepBinary, ++ recovery, + remapCnt, + waitTopFut); assert subjId != null; @@@ -378,49 -433,21 +436,21 @@@ /** {@inheritDoc} */ @Override protected void mapOnTopology() { AffinityTopologyVersion topVer; - GridCacheVersion futVer; - - cache.topology().readLock(); - try { - if (cache.topology().stopping()) { - onDone(new IgniteCheckedException("Failed to perform cache operation (cache is stopped): " + - cache.name())); + if (cache.topology().stopping()) { + onDone(new IgniteCheckedException("Failed to perform cache operation (cache is stopped): " + + cache.name())); - return; - } - - GridDhtTopologyFuture fut = cache.topology().topologyVersionFuture(); - - if (fut.isDone()) { - Throwable err = fut.validateCache(cctx, recovery, /*read*/false, key, null); - - if (err != null) { - onDone(err); + return; + } - return; - } + GridDhtTopologyFuture fut = cache.topology().topologyVersionFuture(); - topVer = fut.topologyVersion(); + if (fut.isDone()) { - Throwable err = fut.validateCache(cctx); ++ Throwable err = fut.validateCache(cctx, recovery, /*read*/false, key, null); - futVer = addAtomicFuture(topVer); - } - else { - if (waitTopFut) { - assert !topLocked : this; - - fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { - @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) { - cctx.kernalContext().closure().runLocalSafe(new Runnable() { - @Override public void run() { - mapOnTopology(); - } - }); - } - }); - } - else - onDone(new GridCacheTryPutFailedException()); + if (err != null) { + onDone(err); return; } @@@ -564,10 -628,9 +631,10 @@@ invokeArgs, subjId, taskNameHash, + needPrimaryRes, skipStore, keepBinary, + recovery, - cctx.kernalContext().clientNode(), cctx.deploymentEnabled()); } else { @@@ -585,10 -646,9 +650,10 @@@ retval, subjId, taskNameHash, + needPrimaryRes, skipStore, keepBinary, + recovery, - cctx.kernalContext().clientNode(), cctx.deploymentEnabled()); } else { @@@ -606,10 -664,9 +669,10 @@@ filter, subjId, taskNameHash, + needPrimaryRes, skipStore, keepBinary, + recovery, - cctx.kernalContext().clientNode(), cctx.deploymentEnabled()); } } @@@ -631,10 -686,9 +692,10 @@@ filter, subjId, taskNameHash, + needPrimaryRes, skipStore, keepBinary, + recovery, - cctx.kernalContext().clientNode(), cctx.deploymentEnabled(), 1); } http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateInvokeRequest.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateInvokeRequest.java index 02cfd91,298ea05..bbcad12 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateInvokeRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateInvokeRequest.java @@@ -106,10 -102,9 +102,10 @@@ public class GridNearAtomicSingleUpdate @Nullable Object[] invokeArgs, @Nullable UUID subjId, int taskNameHash, + boolean needPrimaryRes, boolean skipStore, boolean keepBinary, + boolean recovery, - boolean clientReq, boolean addDepInfo ) { super( @@@ -125,15 -118,15 +119,16 @@@ retval, subjId, taskNameHash, + needPrimaryRes, skipStore, keepBinary, + recovery, - clientReq, addDepInfo ); - this.invokeArgs = invokeArgs; assert op == TRANSFORM : op; + + this.invokeArgs = invokeArgs; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java index 18b6118,14c70aa..94373c4 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java @@@ -100,18 -93,14 +93,15 @@@ public class GridNearAtomicSingleUpdate boolean retval, @Nullable UUID subjId, int taskNameHash, + boolean needPrimaryRes, boolean skipStore, boolean keepBinary, + boolean recovery, - boolean clientReq, boolean addDepInfo ) { - super( - cacheId, + super(cacheId, nodeId, - futVer, - fastMap, - updateVer, + futId, topVer, topLocked, syncMode, @@@ -119,12 -108,10 +109,12 @@@ retval, subjId, taskNameHash, + needPrimaryRes, skipStore, keepBinary, - addDepInfo); + recovery, - clientReq, + addDepInfo + ); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index f24a9b1,a44ccf9..c5824d5 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@@ -490,49 -641,21 +642,21 @@@ public class GridNearAtomicUpdateFutur /** {@inheritDoc} */ @Override protected void mapOnTopology() { AffinityTopologyVersion topVer; - GridCacheVersion futVer; - - cache.topology().readLock(); - - try { - if (cache.topology().stopping()) { - onDone(new IgniteCheckedException("Failed to perform cache operation (cache is stopped): " + - cache.name())); - return; - } + if (cache.topology().stopping()) { + onDone(new IgniteCheckedException("Failed to perform cache operation (cache is stopped): " + + cache.name())); - GridDhtTopologyFuture fut = cache.topology().topologyVersionFuture(); - - if (fut.isDone()) { - Throwable err = fut.validateCache(cctx, recovery, false, null, keys); + return; + } - if (err != null) { - onDone(err); + GridDhtTopologyFuture fut = cache.topology().topologyVersionFuture(); - return; - } + if (fut.isDone()) { - Throwable err = fut.validateCache(cctx); ++ Throwable err = fut.validateCache(cctx, recovery, false, null, keys); - topVer = fut.topologyVersion(); - - futVer = addAtomicFuture(topVer); - } - else { - if (waitTopFut) { - assert !topLocked : this; - - fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { - @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) { - cctx.kernalContext().closure().runLocalSafe(new Runnable() { - @Override public void run() { - mapOnTopology(); - } - }); - } - }); - } - else - onDone(new GridCacheTryPutFailedException()); + if (err != null) { + onDone(err); return; } @@@ -826,50 -1036,44 +1037,45 @@@ throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache " + "(all partition nodes left the grid)."); - int i = 0; - - for (int n = 0; n < affNodes.size(); n++) { - ClusterNode affNode = affNodes.get(n); - - if (affNode == null) - throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache " + - "(all partition nodes left the grid)."); - - UUID nodeId = affNode.id(); - - GridNearAtomicFullUpdateRequest mapped = pendingMappings.get(nodeId); - - if (mapped == null) { - mapped = new GridNearAtomicFullUpdateRequest( - cctx.cacheId(), - nodeId, - futVer, - fastMap, - updVer, - topVer, - topLocked, - syncMode, - op, - retval, - expiryPlc, - invokeArgs, - filter, - subjId, - taskNameHash, - skipStore, - keepBinary, - recovery, - cctx.kernalContext().clientNode(), - cctx.deploymentEnabled(), - keys.size()); - - pendingMappings.put(nodeId, mapped); - } + ClusterNode primary = nodes.get(0); + + boolean needPrimaryRes = !mappingKnown || primary.isLocal(); + + UUID nodeId = primary.id(); - mapped.addUpdateEntry(cacheKey, val, conflictTtl, conflictExpireTime, conflictVer, i == 0); + PrimaryRequestState mapped = pendingMappings.get(nodeId); - i++; + if (mapped == null) { + GridNearAtomicFullUpdateRequest req = new GridNearAtomicFullUpdateRequest( + cctx.cacheId(), + nodeId, + futId, + topVer, + topLocked, + syncMode, + op, + retval, + expiryPlc, + invokeArgs, + filter, + subjId, + taskNameHash, + needPrimaryRes, + skipStore, + keepBinary, ++ recovery, + cctx.deploymentEnabled(), + keys.size()); + + mapped = new PrimaryRequestState(req, nodes, false); + + pendingMappings.put(nodeId, mapped); } + + if (mapped.req.initMappingLocally()) + mapped.addMapping(nodes); + + mapped.req.addUpdateEntry(cacheKey, val, conflictTtl, conflictExpireTime, conflictVer); } return pendingMappings; @@@ -959,10 -1164,9 +1166,10 @@@ filter, subjId, taskNameHash, + needPrimaryRes, skipStore, keepBinary, + recovery, - cctx.kernalContext().clientNode(), cctx.deploymentEnabled(), 1); http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java index 955b8ba,8b52ba8..1c761c8 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java @@@ -351,18 -320,10 +320,13 @@@ public class GridNearAtomicUpdateRespon * @param e Error cause. */ public synchronized void addFailedKey(KeyCacheObject key, Throwable e) { + assert key != null; + assert e != null; + - if (failedKeys == null) - failedKeys = new ConcurrentLinkedQueue<>(); - - failedKeys.add(key); + if (errs == null) + errs = new UpdateErrors(); - if (err == null) - err = new IgniteCheckedException("Failed to update keys on primary node."); - - err.addSuppressed(e); + errs.addFailedKey(key, e); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java index d86dc91,03bbfe0..f922d09 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java @@@ -208,9 -211,9 +208,11 @@@ public class GridDhtColocatedCache<K, V final CacheOperationContext opCtx = ctx.operationContextPerCall(); ++ final boolean recovery = opCtx != null && opCtx.recovery(); ++ if (tx != null && !tx.implicit() && !skipTx) { return asyncOp(tx, new AsyncOp<V>() { - @Override public IgniteInternalFuture<V> op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) { + @Override public IgniteInternalFuture<V> op(GridNearTxLocal tx, AffinityTopologyVersion readyTopVer) { IgniteInternalFuture<Map<Object, Object>> fut = tx.getAllAsync(ctx, readyTopVer, Collections.singleton(ctx.toCacheKeyObject(key)), @@@ -218,6 -221,6 +220,7 @@@ skipVals, false, opCtx != null && opCtx.skipStore(), ++ recovery, needVer); return fut.chain(new CX1<IgniteInternalFuture<Map<Object, Object>>, V>() { @@@ -275,7 -277,6 +278,7 @@@ @Nullable UUID subjId, String taskName, final boolean deserializeBinary, - boolean recovery, ++ final boolean recovery, final boolean skipVals, boolean canRemap, final boolean needVer @@@ -302,6 -303,6 +305,7 @@@ skipVals, false, opCtx != null && opCtx.skipStore(), ++ recovery, needVer); } }, opCtx); http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java index 31cff03,79c15fb..56dc322 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java @@@ -161,8 -161,8 +161,11 @@@ public final class GridDhtColocatedLock private final boolean keepBinary; /** */ + private final boolean recovery; + ++ /** */ + private int miniId; + /** * @param cctx Registry. * @param keys Keys to lock. @@@ -917,38 -914,38 +920,36 @@@ !topLocked && (tx == null || !tx.hasRemoteLocks()); - first = false; - } - - assert !implicitTx() && !implicitSingleTx() : tx; + first = false; + } - req = new GridNearLockRequest( - cctx.cacheId(), - topVer, - cctx.nodeId(), - threadId, - futId, - lockVer, - inTx(), - implicitTx(), - implicitSingleTx(), - read, - retval, - isolation(), - isInvalidate(), - timeout, - mappedKeys.size(), - inTx() ? tx.size() : mappedKeys.size(), - inTx() && tx.syncMode() == FULL_SYNC, - inTx() ? tx.subjectId() : null, - inTx() ? tx.taskNameHash() : 0, - read ? createTtl : -1L, - req = new GridNearLockRequest( ++ assert !implicitTx() && !implicitSingleTx() : tx;req = new GridNearLockRequest( + cctx.cacheId(), + topVer, + cctx.nodeId(), + threadId, + futId, + lockVer, + inTx(), + read, + retval, + isolation(), + isInvalidate(), + timeout, + mappedKeys.size(), + inTx() ? tx.size() : mappedKeys.size(), + inTx() && tx.syncMode() == FULL_SYNC, + inTx() ? tx.subjectId() : null, + inTx() ? tx.taskNameHash() : 0, + read ? createTtl : -1L, read ? accessTtl : -1L, - skipStore, - keepBinary, - clientFirst, - cctx.deploymentEnabled()); + skipStore, + keepBinary, + clientFirst, + cctx.deploymentEnabled()); - mapping.request(req); - } + mapping.request(req); + } distributedKeys.add(key); http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index c33dc7b,5eacc36..829b29d --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@@ -33,9 -33,9 +34,10 @@@ import java.util.concurrent.atomic.Atom import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReadWriteLock; import org.apache.ignite.IgniteCheckedException; + import org.apache.ignite.internal.IgniteNeedReconnectException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.IgniteSystemProperties; +import org.apache.ignite.cache.PartitionLossPolicy; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.events.CacheEvent; import org.apache.ignite.events.DiscoveryEvent; @@@ -52,9 -51,7 +55,10 @@@ import org.apache.ignite.internal.pagem import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache; import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage; +import org.apache.ignite.internal.processors.cache.CacheInvalidStateException; +import org.apache.ignite.internal.processors.cache.ClusterState; +import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch; + import org.apache.ignite.internal.processors.cache.CachePartitionExchangeWorkerTask; import org.apache.ignite.internal.processors.cache.DynamicCacheChangeRequest; import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor; import org.apache.ignite.internal.processors.cache.GridCacheContext; @@@ -74,7 -68,7 +78,8 @@@ import org.apache.ignite.internal.util. import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.T2; + import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.LT; import org.apache.ignite.internal.util.typedef.internal.S; @@@ -98,9 -88,8 +103,9 @@@ import static org.apache.ignite.interna /** * Future for exchanging partition maps. */ +@SuppressWarnings({"TypeMayBeWeakened", "unchecked"}) public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityTopologyVersion> - implements Comparable<GridDhtPartitionsExchangeFuture>, GridDhtTopologyFuture { + implements Comparable<GridDhtPartitionsExchangeFuture>, GridDhtTopologyFuture, CachePartitionExchangeWorkerTask { /** */ public static final int DUMP_PENDING_OBJECTS_THRESHOLD = IgniteSystemProperties.getInteger(IgniteSystemProperties.IGNITE_DUMP_PENDING_OBJECTS_THRESHOLD, 10); http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java index 12b5204,79c71b3..6aa7441 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java @@@ -391,43 -360,37 +372,43 @@@ public class GridNearGetRequest extend writer.incrementState(); - case 11: + case 10: - if (!writer.writeBoolean("reload", reload)) + if (!writer.writeBoolean("recovery", recovery)) return false; writer.incrementState(); - case 12: + case 11: - if (!writer.writeBoolean("skipVals", skipVals)) + if (!writer.writeBoolean("reload", reload)) return false; writer.incrementState(); - case 13: + case 12: - if (!writer.writeUuid("subjId", subjId)) + if (!writer.writeBoolean("skipVals", skipVals)) return false; writer.incrementState(); - case 14: + case 13: - if (!writer.writeInt("taskNameHash", taskNameHash)) + if (!writer.writeUuid("subjId", subjId)) return false; writer.incrementState(); - case 15: + case 14: - if (!writer.writeMessage("topVer", topVer)) + if (!writer.writeInt("taskNameHash", taskNameHash)) return false; writer.incrementState(); - case 16: + case 15: + if (!writer.writeMessage("topVer", topVer)) + return false; + + writer.incrementState(); + + case 17: if (!writer.writeMessage("ver", ver)) return false; @@@ -513,55 -468,47 +486,55 @@@ reader.incrementState(); - case 11: + case 10: - reload = reader.readBoolean("reload"); + recovery = reader.readBoolean("recovery"); if (!reader.isLastRead()) return false; reader.incrementState(); - case 12: + case 11: - skipVals = reader.readBoolean("skipVals"); + reload = reader.readBoolean("reload"); if (!reader.isLastRead()) return false; reader.incrementState(); - case 13: + case 12: - subjId = reader.readUuid("subjId"); + skipVals = reader.readBoolean("skipVals"); if (!reader.isLastRead()) return false; reader.incrementState(); - case 14: + case 13: - taskNameHash = reader.readInt("taskNameHash"); + subjId = reader.readUuid("subjId"); if (!reader.isLastRead()) return false; reader.incrementState(); - case 15: + case 14: - topVer = reader.readMessage("topVer"); + taskNameHash = reader.readInt("taskNameHash"); if (!reader.isLastRead()) return false; reader.incrementState(); - case 16: + case 15: + topVer = reader.readMessage("topVer"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 17: ver = reader.readMessage("ver"); if (!reader.isLastRead()) http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java index d8f9222,1948df0..0900bac --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java @@@ -164,9 -164,9 +164,12 @@@ public final class GridNearLockFuture e /** Keep binary context flag. */ private final boolean keepBinary; + /** Recovery mode context flag. */ + private final boolean recovery; + + /** */ + private int miniId; + /** * @param cctx Registry. * @param keys Keys to lock. http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java index ec4b9e5,48b508b..e519707 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java @@@ -398,67 -376,31 +376,31 @@@ public class GridNearLockRequest extend writer.incrementState(); - case 24: + case 25: - if (!writer.writeBoolean("firstClientReq", firstClientReq)) + if (!writer.writeByte("flags", flags)) return false; writer.incrementState(); - case 25: + case 26: - if (!writer.writeBoolean("hasTransforms", hasTransforms)) + if (!writer.writeInt("miniId", miniId)) return false; writer.incrementState(); - case 26: + case 27: - if (!writer.writeBoolean("implicitSingleTx", implicitSingleTx)) - return false; - - writer.incrementState(); - - case 28: - if (!writer.writeBoolean("implicitTx", implicitTx)) - return false; - - writer.incrementState(); - - case 29: - if (!writer.writeIgniteUuid("miniId", miniId)) - return false; - - writer.incrementState(); - - case 30: - if (!writer.writeBoolean("onePhaseCommit", onePhaseCommit)) - return false; - - writer.incrementState(); - - case 31: - if (!writer.writeBoolean("retVal", retVal)) - return false; - - writer.incrementState(); - - case 32: if (!writer.writeUuid("subjId", subjId)) return false; writer.incrementState(); - case 33: - if (!writer.writeBoolean("syncCommit", syncCommit)) - return false; - - writer.incrementState(); - - case 34: - case 27: ++ case 28: if (!writer.writeInt("taskNameHash", taskNameHash)) return false; writer.incrementState(); - case 35: - case 28: ++ case 29: if (!writer.writeMessage("topVer", topVer)) return false; @@@ -512,63 -454,23 +454,23 @@@ reader.incrementState(); - case 24: + case 25: - firstClientReq = reader.readBoolean("firstClientReq"); + flags = reader.readByte("flags"); if (!reader.isLastRead()) return false; reader.incrementState(); - case 25: + case 26: - hasTransforms = reader.readBoolean("hasTransforms"); + miniId = reader.readInt("miniId"); if (!reader.isLastRead()) return false; reader.incrementState(); - case 26: + case 27: - implicitSingleTx = reader.readBoolean("implicitSingleTx"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 28: - implicitTx = reader.readBoolean("implicitTx"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 29: - miniId = reader.readIgniteUuid("miniId"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 30: - onePhaseCommit = reader.readBoolean("onePhaseCommit"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 31: - retVal = reader.readBoolean("retVal"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 32: subjId = reader.readUuid("subjId"); if (!reader.isLastRead()) @@@ -576,15 -478,7 +478,7 @@@ reader.incrementState(); - case 33: - syncCommit = reader.readBoolean("syncCommit"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 34: - case 27: ++ case 28: taskNameHash = reader.readInt("taskNameHash"); if (!reader.isLastRead()) @@@ -592,7 -486,7 +486,7 @@@ reader.incrementState(); - case 35: - case 28: ++ case 29: topVer = reader.readMessage("topVer"); if (!reader.isLastRead()) http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java index f09b6c8,976f05f..1d610c7 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java @@@ -62,6 -62,19 +62,19 @@@ public abstract class GridNearOptimisti } if (topVer != null) { + try { - IgniteCheckedException err = tx.txState().validateTopology(cctx, topologyReadLock()); ++ IgniteCheckedException err = tx.txState().validateTopology(cctx, false, topologyReadLock()); + + if (err != null) { + onDone(err); + + return; + } + } + finally { + topologyReadUnlock(); + } + tx.topologyVersion(topVer); cctx.mvcc().addFuture(this); http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetRequest.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetRequest.java index cd6e275,994172b..de69b21 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetRequest.java @@@ -44,23 -42,20 +42,23 @@@ public class GridNearSingleGetRequest e private static final long serialVersionUID = 0L; /** */ - public static final int READ_THROUGH_FLAG_MASK = 0x01; + private static final int READ_THROUGH_FLAG_MASK = 0x01; /** */ - public static final int SKIP_VALS_FLAG_MASK = 0x02; + private static final int SKIP_VALS_FLAG_MASK = 0x02; /** */ - public static final int ADD_READER_FLAG_MASK = 0x04; + private static final int ADD_READER_FLAG_MASK = 0x04; /** */ - public static final int NEED_VER_FLAG_MASK = 0x08; + private static final int NEED_VER_FLAG_MASK = 0x08; /** */ - public static final int NEED_ENTRY_INFO_FLAG_MASK = 0x10; + private static final int NEED_ENTRY_INFO_FLAG_MASK = 0x10; + /** */ + public static final int RECOVERY_FLAG_MASK = 0x20; + /** Future ID. */ private long futId; http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java index 1bb39e2,5ad05b0..1468e8a --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java @@@ -43,7 -43,7 +43,6 @@@ import org.apache.ignite.internal.proce import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLockRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtUnlockRequest; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; --import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalAdapter; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException; @@@ -121,7 -121,6 +120,7 @@@ public class GridNearTransactionalCache @Nullable UUID subjId, String taskName, final boolean deserializeBinary, - boolean recovery, ++ final boolean recovery, final boolean skipVals, boolean canRemap, final boolean needVer @@@ -150,6 -149,6 +149,7 @@@ skipVals, false, skipStore, ++ recovery, needVer); } }, opCtx);
