http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index f182ecb..a44ccf9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@ -36,7 +36,6 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheEntryPredicate; import org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException; import org.apache.ignite.internal.processors.cache.EntryProcessorResourceInjectorProxy; -import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheOperation; import org.apache.ignite.internal.processors.cache.GridCacheReturn; @@ -47,10 +46,8 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearAtom import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.future.GridFinishedFuture; -import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.CI1; -import org.apache.ignite.internal.util.typedef.CI2; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.CU; @@ -58,18 +55,14 @@ import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.jetbrains.annotations.Nullable; -import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.CLOCK; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC; -import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM; /** * DHT atomic cache near update future. */ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFuture { - /** Fast map flag. */ - private final boolean fastMap; - /** Keys */ private Collection<?> keys; @@ -87,13 +80,18 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu /** Mappings if operations is mapped to more than one node. */ @GridToStringInclude - private Map<UUID, GridNearAtomicFullUpdateRequest> mappings; + private Map<UUID, PrimaryRequestState> mappings; /** Keys to remap. */ + @GridToStringInclude private Collection<KeyCacheObject> remapKeys; /** Not null is operation is mapped to single node. */ - private GridNearAtomicFullUpdateRequest singleReq; + @GridToStringInclude + private PrimaryRequestState singleReq; + + /** */ + private int resCnt; /** * @param cctx Cache context. @@ -149,84 +147,124 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu this.vals = vals; this.conflictPutVals = conflictPutVals; this.conflictRmvVals = conflictRmvVals; - - fastMap = cache.isFastMap(filter, op); } /** {@inheritDoc} */ - @Override public GridCacheVersion version() { + @Override public Long id() { synchronized (mux) { - return futVer; + return futId; } } /** {@inheritDoc} */ @Override public boolean onNodeLeft(UUID nodeId) { - GridNearAtomicUpdateResponse res = null; + GridCacheReturn opRes0 = null; + CachePartialUpdateCheckedException err0 = null; + AffinityTopologyVersion remapTopVer0 = null; + + boolean rcvAll = false; - GridNearAtomicFullUpdateRequest req; + List<GridNearAtomicCheckUpdateRequest> checkReqs = null; synchronized (mux) { - if (singleReq != null) - req = singleReq.nodeId().equals(nodeId) ? singleReq : null; - else - req = mappings != null ? mappings.get(nodeId) : null; + if (futId == null) + return false; - if (req != null && req.response() == null) { - res = new GridNearAtomicUpdateResponse(cctx.cacheId(), - nodeId, - req.futureVersion(), - cctx.deploymentEnabled()); + if (singleReq != null) { + if (singleReq.req.nodeId.equals(nodeId)) { + GridNearAtomicAbstractUpdateRequest req = singleReq.onPrimaryFail(); - ClusterTopologyCheckedException e = new ClusterTopologyCheckedException("Primary node left grid " + - "before response is received: " + nodeId); + if (req != null) { + rcvAll = true; - e.retryReadyFuture(cctx.shared().nextAffinityReadyFuture(req.topologyVersion())); + GridNearAtomicUpdateResponse res = primaryFailedResponse(req); - res.addFailedKeys(req.keys(), e); - } - } + singleReq.onPrimaryResponse(res, cctx); + + onPrimaryError(req, res); + } + } + else { + DhtLeftResult res = singleReq.onDhtNodeLeft(nodeId); + + if (res == DhtLeftResult.DONE) + rcvAll = true; + else if (res == DhtLeftResult.ALL_RCVD_CHECK_PRIMARY) + checkReqs = Collections.singletonList(new GridNearAtomicCheckUpdateRequest(singleReq.req)); + } - if (res != null) { - if (msgLog.isDebugEnabled()) { - msgLog.debug("Near update fut, node left [futId=" + req.futureVersion() + - ", writeVer=" + req.updateVersion() + - ", node=" + nodeId + ']'); + if (rcvAll) { + opRes0 = opRes; + err0 = err; + remapTopVer0 = onAllReceived(); + } } + else { + if (mappings == null) + return false; - onResult(nodeId, res, true); - } + for (Map.Entry<UUID, PrimaryRequestState> e : mappings.entrySet()) { + assert e.getKey().equals(e.getValue().req.nodeId()); - return false; - } + PrimaryRequestState reqState = e.getValue(); - /** {@inheritDoc} */ - @Override public IgniteInternalFuture<Void> completeFuture(AffinityTopologyVersion topVer) { - // Wait fast-map near atomic update futures in CLOCK mode. - if (fastMap) { - GridFutureAdapter<Void> fut; + boolean reqDone = false; - synchronized (mux) { - if (this.topVer != AffinityTopologyVersion.ZERO && this.topVer.compareTo(topVer) < 0) { - if (topCompleteFut == null) - topCompleteFut = new GridFutureAdapter<>(); + if (e.getKey().equals(nodeId)) { + GridNearAtomicAbstractUpdateRequest req = reqState.onPrimaryFail(); - fut = topCompleteFut; - } - else - fut = null; - } + if (req != null) { + reqDone = true; + + GridNearAtomicUpdateResponse res = primaryFailedResponse(req); + + reqState.onPrimaryResponse(res, cctx); + + onPrimaryError(req, res); + } + } + else { + DhtLeftResult res = reqState.onDhtNodeLeft(nodeId); + + if (res == DhtLeftResult.DONE) + reqDone = true; + else if (res == DhtLeftResult.ALL_RCVD_CHECK_PRIMARY) { + if (checkReqs == null) + checkReqs = new ArrayList<>(); - if (fut != null && isDone()) { - fut.onDone(); + checkReqs.add(new GridNearAtomicCheckUpdateRequest(reqState.req)); + } + } + + if (reqDone) { + assert mappings.size() > resCnt : "[mappings=" + mappings.size() + ", cnt=" + resCnt + ']'; + + resCnt++; + + if (mappings.size() == resCnt) { + rcvAll = true; - return null; + opRes0 = opRes; + err0 = err; + remapTopVer0 = onAllReceived(); + + break; + } + } + } } + } - return fut; + if (checkReqs != null) { + assert !rcvAll; + + for (int i = 0; i < checkReqs.size(); i++) + sendCheckUpdateRequest(checkReqs.get(i)); } + else if (rcvAll) + finishUpdateFuture(opRes0, err0, remapTopVer0); - return null; + return false; } /** {@inheritDoc} */ @@ -244,10 +282,10 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu retval = Collections.emptyMap(); if (super.onDone(retval, err)) { - GridCacheVersion futVer = onFutureDone(); + Long futId = onFutureDone(); - if (futVer != null) - cctx.mvcc().removeAtomicFuture(futVer); + if (futId != null) + cctx.mvcc().removeAtomicFuture(futId); return true; } @@ -256,145 +294,166 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu } /** {@inheritDoc} */ - @SuppressWarnings({"unchecked", "ThrowableResultOfMethodCallIgnored"}) - @Override public void onResult(UUID nodeId, GridNearAtomicUpdateResponse res, boolean nodeErr) { - GridNearAtomicFullUpdateRequest req; - - AffinityTopologyVersion remapTopVer = null; - - GridCacheReturn opRes0 = null; - CachePartialUpdateCheckedException err0 = null; - - boolean rcvAll; - - GridFutureAdapter<?> fut0 = null; + @Override public void onDhtResponse(UUID nodeId, GridDhtAtomicNearResponse res) { + GridCacheReturn opRes0; + CachePartialUpdateCheckedException err0; + AffinityTopologyVersion remapTopVer0; synchronized (mux) { - if (!res.futureVersion().equals(futVer)) + if (futId == null || futId != res.futureId()) return; - if (singleReq != null) { - if (!singleReq.nodeId().equals(nodeId)) - return; + PrimaryRequestState reqState; - req = singleReq; + if (singleReq != null) { + assert singleReq.req.nodeId().equals(res.primaryId()); - singleReq = null; + if (opRes == null && res.hasResult()) + opRes = res.result(); - rcvAll = true; + if (singleReq.onDhtResponse(nodeId, res)) { + opRes0 = opRes; + err0 = err; + remapTopVer0 = onAllReceived(); + } + else + return; } else { - req = mappings != null ? mappings.get(nodeId) : null; + reqState = mappings != null ? mappings.get(res.primaryId()) : null; - if (req != null && req.onResponse(res)) { - resCnt++; + if (reqState != null) { + if (opRes == null && res.hasResult()) + opRes = res.result(); - rcvAll = mappings.size() == resCnt; + if (reqState.onDhtResponse(nodeId, res)) { + assert mappings.size() > resCnt : "[mappings=" + mappings.size() + ", cnt=" + resCnt + ']'; + + resCnt++; + + if (mappings.size() == resCnt) { + opRes0 = opRes; + err0 = err; + remapTopVer0 = onAllReceived(); + } + else + return; + } + else + return; } else return; } + } - assert req != null && req.topologyVersion().equals(topVer) : req; + UpdateErrors errors = res.errors(); - if (res.remapKeys() != null) { - assert !fastMap || cctx.kernalContext().clientNode(); + if (errors != null) { + assert errors.error() != null; - if (remapKeys == null) - remapKeys = U.newHashSet(res.remapKeys().size()); + onDone(errors.error()); + + return; + } - remapKeys.addAll(res.remapKeys()); + finishUpdateFuture(opRes0, err0, remapTopVer0); + } - if (mapErrTopVer == null || mapErrTopVer.compareTo(req.topologyVersion()) < 0) - mapErrTopVer = req.topologyVersion(); - } - else if (res.error() != null) { - if (res.failedKeys() != null) { - if (err == null) - err = new CachePartialUpdateCheckedException( - "Failed to update keys (retry update if possible)."); + /** {@inheritDoc} */ + @SuppressWarnings({"unchecked", "ThrowableResultOfMethodCallIgnored"}) + @Override public void onPrimaryResponse(UUID nodeId, GridNearAtomicUpdateResponse res, boolean nodeErr) { + GridNearAtomicAbstractUpdateRequest req; - Collection<Object> keys = new ArrayList<>(res.failedKeys().size()); + AffinityTopologyVersion remapTopVer0 = null; - for (KeyCacheObject key : res.failedKeys()) - keys.add(cctx.cacheObjectContext().unwrapBinaryIfNeeded(key, keepBinary, false)); + GridCacheReturn opRes0 = null; + CachePartialUpdateCheckedException err0 = null; - err.add(keys, res.error(), req.topologyVersion()); - } + boolean rcvAll; + + synchronized (mux) { + if (futId == null || futId != res.futureId()) + return; + + if (singleReq != null) { + req = singleReq.processPrimaryResponse(nodeId, res); + + if (req == null) + return; + + rcvAll = singleReq.onPrimaryResponse(res, cctx); } else { - if (!req.fastMap() || req.hasPrimary()) { - GridCacheReturn ret = res.returnValue(); - - if (op == TRANSFORM) { - if (ret != null) { - assert ret.value() == null || ret.value() instanceof Map : ret.value(); - - if (ret.value() != null) { - if (opRes != null) - opRes.mergeEntryProcessResults(ret); - else - opRes = ret; - } - } - } - else - opRes = ret; - } - } + if (mappings == null) + return; - if (rcvAll) { - if (remapKeys != null) { - assert mapErrTopVer != null; + PrimaryRequestState reqState = mappings.get(nodeId); - remapTopVer = cctx.shared().exchange().topologyVersion(); - } - else { - if (err != null && - X.hasCause(err, CachePartialUpdateCheckedException.class) && - X.hasCause(err, ClusterTopologyCheckedException.class) && - storeFuture() && - --remapCnt > 0) { - ClusterTopologyCheckedException topErr = - X.cause(err, ClusterTopologyCheckedException.class); + if (reqState == null) + return; - if (!(topErr instanceof ClusterTopologyServerNotFoundException)) { - CachePartialUpdateCheckedException cause = - X.cause(err, CachePartialUpdateCheckedException.class); + req = reqState.processPrimaryResponse(nodeId, res); + + if (req != null) { + if (reqState.onPrimaryResponse(res, cctx)) { + assert mappings.size() > resCnt : "[mappings=" + mappings.size() + ", cnt=" + resCnt + ']'; + + resCnt++; + + rcvAll = mappings.size() == resCnt; + } + else { + assert mappings.size() > resCnt : "[mappings=" + mappings.size() + ", cnt=" + resCnt + ']'; + + rcvAll = false; + } + } + else + return; + } - assert cause != null && cause.topologyVersion() != null : err; + assert req.topologyVersion().equals(topVer) : req; - remapTopVer = - new AffinityTopologyVersion(cause.topologyVersion().topologyVersion() + 1); + if (res.remapTopologyVersion() != null) { + assert !req.topologyVersion().equals(res.remapTopologyVersion()); - err = null; + if (remapKeys == null) + remapKeys = U.newHashSet(req.size()); - Collection<Object> failedKeys = cause.failedKeys(); + remapKeys.addAll(req.keys()); - remapKeys = new ArrayList<>(failedKeys.size()); + if (remapTopVer == null || remapTopVer.compareTo(res.remapTopologyVersion()) < 0) + remapTopVer = req.topologyVersion(); + } + else if (res.error() != null) + onPrimaryError(req, res); + else { + GridCacheReturn ret = res.returnValue(); - for (Object key : failedKeys) - remapKeys.add(cctx.toCacheKeyObject(key)); + if (op == TRANSFORM) { + if (ret != null) { + assert ret.value() == null || ret.value() instanceof Map : ret.value(); - updVer = null; + if (ret.value() != null) { + if (opRes != null) + opRes.mergeEntryProcessResults(ret); + else + opRes = ret; } } } + else + opRes = ret; + } + + if (rcvAll) { + remapTopVer0 = onAllReceived(); - if (remapTopVer == null) { + if (remapTopVer0 == null) { err0 = err; opRes0 = opRes; } - else { - fut0 = topCompleteFut; - - topCompleteFut = null; - - cctx.mvcc().removeAtomicFuture(futVer); - - futVer = null; - topVer = AffinityTopologyVersion.ZERO; - } } } @@ -406,67 +465,160 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu if (rcvAll && nearEnabled) { if (mappings != null) { - for (GridNearAtomicFullUpdateRequest req0 : mappings.values()) { - GridNearAtomicUpdateResponse res0 = req0.response(); + for (PrimaryRequestState reqState : mappings.values()) { + GridNearAtomicUpdateResponse res0 = reqState.req.response(); - assert res0 != null : req0; + assert res0 != null : reqState; - updateNear(req0, res0); + updateNear(reqState.req, res0); } } else if (!nodeErr) updateNear(req, res); } - if (remapTopVer != null) { - if (fut0 != null) - fut0.onDone(); + if (remapTopVer0 != null) { + waitAndRemap(remapTopVer0); - if (!waitTopFut) { - onDone(new GridCacheTryPutFailedException()); + return; + } - return; + if (rcvAll) + onDone(opRes0, err0); + } + + private void waitAndRemap(AffinityTopologyVersion remapTopVer) { + assert remapTopVer != null; + + if (!waitTopFut) { + onDone(new GridCacheTryPutFailedException()); + + return; + } + + if (topLocked) { + assert !F.isEmpty(remapKeys) : remapKeys; + + CachePartialUpdateCheckedException e = + new CachePartialUpdateCheckedException("Failed to update keys (retry update if possible)."); + + ClusterTopologyCheckedException cause = new ClusterTopologyCheckedException( + "Failed to update keys, topology changed while execute atomic update inside transaction."); + + cause.retryReadyFuture(cctx.affinity().affinityReadyFuture(remapTopVer)); + + e.add(remapKeys, cause); + + onDone(e); + + return; + } + + IgniteInternalFuture<AffinityTopologyVersion> fut = cctx.shared().exchange().affinityReadyFuture(remapTopVer); + + if (fut == null) + fut = new GridFinishedFuture<>(remapTopVer); + + fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { + @Override public void apply(final IgniteInternalFuture<AffinityTopologyVersion> fut) { + cctx.kernalContext().closure().runLocalSafe(new Runnable() { + @Override public void run() { + mapOnTopology(); + } + }); } + }); + } + + /** + * @return Non null topology version if update should be remapped. + */ + @Nullable private AffinityTopologyVersion onAllReceived() { + assert futId != null; - if (topLocked) { - assert !F.isEmpty(remapKeys) : remapKeys; + AffinityTopologyVersion remapTopVer0 = null; - CachePartialUpdateCheckedException e = - new CachePartialUpdateCheckedException("Failed to update keys (retry update if possible)."); + if (remapKeys != null) { + assert remapTopVer != null; - ClusterTopologyCheckedException cause = new ClusterTopologyCheckedException( - "Failed to update keys, topology changed while execute atomic update inside transaction."); + remapTopVer0 = remapTopVer; + } + else { + if (err != null && + X.hasCause(err, CachePartialUpdateCheckedException.class) && + X.hasCause(err, ClusterTopologyCheckedException.class) && + storeFuture() && + --remapCnt > 0) { + ClusterTopologyCheckedException topErr = X.cause(err, ClusterTopologyCheckedException.class); - cause.retryReadyFuture(cctx.affinity().affinityReadyFuture(remapTopVer)); + if (!(topErr instanceof ClusterTopologyServerNotFoundException)) { + CachePartialUpdateCheckedException cause = + X.cause(err, CachePartialUpdateCheckedException.class); - e.add(remapKeys, cause); + assert cause != null && cause.topologyVersion() != null : err; + assert remapKeys == null; + assert remapTopVer == null; - onDone(e); + remapTopVer = remapTopVer0 = + new AffinityTopologyVersion(cause.topologyVersion().topologyVersion() + 1); - return; + err = null; + + Collection<Object> failedKeys = cause.failedKeys(); + + remapKeys = new ArrayList<>(failedKeys.size()); + + for (Object key : failedKeys) + remapKeys.add(cctx.toCacheKeyObject(key)); + } } + } - IgniteInternalFuture<AffinityTopologyVersion> fut = - cctx.shared().exchange().affinityReadyFuture(remapTopVer); + if (remapTopVer0 != null) { + cctx.mvcc().removeAtomicFuture(futId); - if (fut == null) - fut = new GridFinishedFuture<>(remapTopVer); + futId = null; + topVer = AffinityTopologyVersion.ZERO; - fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { - @Override public void apply(final IgniteInternalFuture<AffinityTopologyVersion> fut) { - cctx.kernalContext().closure().runLocalSafe(new Runnable() { - @Override public void run() { - mapOnTopology(); - } - }); + remapTopVer = null; + } + + return remapTopVer0; + } + + /** + * @param opRes Operation result. + * @param err Operation error. + */ + private void finishUpdateFuture(GridCacheReturn opRes, + CachePartialUpdateCheckedException err, + @Nullable AffinityTopologyVersion remapTopVer) { + if (nearEnabled) { + if (mappings != null) { + for (PrimaryRequestState reqState : mappings.values()) { + GridNearAtomicUpdateResponse res0 = reqState.req.response(); + + assert res0 != null : reqState; + + updateNear(reqState.req, res0); } - }); + } + else { + assert singleReq != null && singleReq.req.response() != null; + + updateNear(singleReq.req, singleReq.req.response()); + } + } + + if (remapTopVer != null) { + assert !F.isEmpty(remapKeys); + + waitAndRemap(remapTopVer); return; } - if (rcvAll) - onDone(opRes0, err0); + onDone(opRes, err); } /** @@ -475,10 +627,10 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu * @param req Update request. * @param res Update response. */ - private void updateNear(GridNearAtomicFullUpdateRequest req, GridNearAtomicUpdateResponse res) { + private void updateNear(GridNearAtomicAbstractUpdateRequest req, GridNearAtomicUpdateResponse res) { assert nearEnabled; - if (res.remapKeys() != null || !req.hasPrimary()) + if (res.remapTopologyVersion() != null) return; GridNearAtomicCache near = (GridNearAtomicCache)cctx.dht().near(); @@ -489,59 +641,48 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu /** {@inheritDoc} */ @Override protected void mapOnTopology() { AffinityTopologyVersion topVer; - GridCacheVersion futVer; - - cache.topology().readLock(); - - try { - if (cache.topology().stopping()) { - onDone(new IgniteCheckedException("Failed to perform cache operation (cache is stopped): " + - cache.name())); - return; - } + if (cache.topology().stopping()) { + onDone(new IgniteCheckedException("Failed to perform cache operation (cache is stopped): " + + cache.name())); - GridDhtTopologyFuture fut = cache.topology().topologyVersionFuture(); - - if (fut.isDone()) { - Throwable err = fut.validateCache(cctx); + return; + } - if (err != null) { - onDone(err); + GridDhtTopologyFuture fut = cache.topology().topologyVersionFuture(); - return; - } + if (fut.isDone()) { + Throwable err = fut.validateCache(cctx); - topVer = fut.topologyVersion(); - - futVer = addAtomicFuture(topVer); - } - else { - if (waitTopFut) { - assert !topLocked : this; - - fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { - @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) { - cctx.kernalContext().closure().runLocalSafe(new Runnable() { - @Override public void run() { - mapOnTopology(); - } - }); - } - }); - } - else - onDone(new GridCacheTryPutFailedException()); + if (err != null) { + onDone(err); return; } + + topVer = fut.topologyVersion(); } - finally { - cache.topology().readUnlock(); + else { + if (waitTopFut) { + assert !topLocked : this; + + fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { + @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) { + cctx.kernalContext().closure().runLocalSafe(new Runnable() { + @Override public void run() { + mapOnTopology(); + } + }); + } + }); + } + else + onDone(new GridCacheTryPutFailedException()); + + return; } - if (futVer != null) - map(topVer, futVer, remapKeys); + map(topVer, remapKeys); } /** @@ -549,13 +690,15 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu * * @param mappings Mappings to send. */ - private void doUpdate(Map<UUID, GridNearAtomicFullUpdateRequest> mappings) { + private void sendUpdateRequests(Map<UUID, PrimaryRequestState> mappings) { UUID locNodeId = cctx.localNodeId(); - GridNearAtomicFullUpdateRequest locUpdate = null; + GridNearAtomicAbstractUpdateRequest locUpdate = null; // Send messages to remote nodes first, then run local update. - for (GridNearAtomicFullUpdateRequest req : mappings.values()) { + for (PrimaryRequestState reqState : mappings.values()) { + GridNearAtomicAbstractUpdateRequest req = reqState.req; + if (locNodeId.equals(req.nodeId())) { assert locUpdate == null : "Cannot have more than one local mapping [locUpdate=" + locUpdate + ", req=" + req + ']'; @@ -564,18 +707,22 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu } else { try { + if (req.initMappingLocally() && reqState.dhtNodes.isEmpty()) { + reqState.dhtNodes = null; + + req.needPrimaryResponse(true); + } + cctx.io().send(req.nodeId(), req, cctx.ioPolicy()); if (msgLog.isDebugEnabled()) { - msgLog.debug("Near update fut, sent request [futId=" + req.futureVersion() + - ", writeVer=" + req.updateVersion() + + msgLog.debug("Near update fut, sent request [futId=" + req.futureId() + ", node=" + req.nodeId() + ']'); } } catch (IgniteCheckedException e) { if (msgLog.isDebugEnabled()) { - msgLog.debug("Near update fut, failed to send request [futId=" + req.futureVersion() + - ", writeVer=" + req.updateVersion() + + msgLog.debug("Near update fut, failed to send request [futId=" + req.futureId() + ", node=" + req.nodeId() + ", err=" + e + ']'); } @@ -587,9 +734,12 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu if (locUpdate != null) { cache.updateAllAsyncInternal(cctx.localNodeId(), locUpdate, - new CI2<GridNearAtomicFullUpdateRequest, GridNearAtomicUpdateResponse>() { - @Override public void apply(GridNearAtomicFullUpdateRequest req, GridNearAtomicUpdateResponse res) { - onResult(res.nodeId(), res, false); + new GridDhtAtomicCache.UpdateReplyClosure() { + @Override public void apply(GridNearAtomicAbstractUpdateRequest req, GridNearAtomicUpdateResponse res) { + if (syncMode != FULL_ASYNC) + onPrimaryResponse(res.nodeId(), res, false); + else if (res.remapTopologyVersion() != null) + ((GridDhtAtomicCache)cctx.cache()).remapToNewPrimary(req); } }); } @@ -599,18 +749,15 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu } /** {@inheritDoc} */ - @Override protected void map(AffinityTopologyVersion topVer, GridCacheVersion futVer) { - map(topVer, futVer, null); + @Override protected void map(AffinityTopologyVersion topVer) { + map(topVer, null); } /** * @param topVer Topology version. - * @param futVer Future ID. * @param remapKeys Keys to remap. */ - void map(AffinityTopologyVersion topVer, - GridCacheVersion futVer, - @Nullable Collection<KeyCacheObject> remapKeys) { + void map(AffinityTopologyVersion topVer, @Nullable Collection<KeyCacheObject> remapKeys) { Collection<ClusterNode> topNodes = CU.affinityNodes(cctx, topVer); if (F.isEmpty(topNodes)) { @@ -620,64 +767,45 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu return; } - GridCacheVersion updVer; - - // Assign version on near node in CLOCK ordering mode even if fastMap is false. - if (cctx.config().getAtomicWriteOrderMode() == CLOCK) { - updVer = this.updVer; - - if (updVer == null) { - updVer = futVer; - - if (log.isDebugEnabled()) - log.debug("Assigned fast-map version for update on near node: " + updVer); - } - } - else - updVer = null; + Long futId = cctx.mvcc().atomicFutureId(); Exception err = null; - GridNearAtomicFullUpdateRequest singleReq0 = null; - Map<UUID, GridNearAtomicFullUpdateRequest> mappings0 = null; + PrimaryRequestState singleReq0 = null; + Map<UUID, PrimaryRequestState> mappings0 = null; int size = keys.size(); + boolean mappingKnown = cctx.topology().rebalanceFinished(topVer) && + !cctx.discovery().hasNearCache(cctx.cacheId(), topVer); + try { - if (size == 1 && !fastMap) { + if (size == 1) { assert remapKeys == null || remapKeys.size() == 1; - singleReq0 = mapSingleUpdate(topVer, futVer, updVer); + singleReq0 = mapSingleUpdate(topVer, futId, mappingKnown); } else { - Map<UUID, GridNearAtomicFullUpdateRequest> pendingMappings = mapUpdate(topNodes, + Map<UUID, PrimaryRequestState> pendingMappings = mapUpdate(topNodes, topVer, - futVer, - updVer, - remapKeys); + futId, + remapKeys, + mappingKnown); if (pendingMappings.size() == 1) singleReq0 = F.firstValue(pendingMappings); else { - if (syncMode == PRIMARY_SYNC) { - mappings0 = U.newHashMap(pendingMappings.size()); - - for (GridNearAtomicFullUpdateRequest req : pendingMappings.values()) { - if (req.hasPrimary()) - mappings0.put(req.nodeId(), req); - } - } - else - mappings0 = pendingMappings; + mappings0 = pendingMappings; assert !mappings0.isEmpty() || size == 0 : this; } } synchronized (mux) { - assert this.futVer == futVer || (this.isDone() && this.error() != null); - assert this.topVer == topVer; + assert this.futId == null : this; + assert this.topVer == AffinityTopologyVersion.ZERO : this; - this.updVer = updVer; + this.topVer = topVer; + this.futId = futId; resCnt = 0; @@ -686,6 +814,12 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu this.remapKeys = null; } + + if (storeFuture() && !cctx.mvcc().addAtomicFuture(futId, this)) { + assert isDone(); + + return; + } } catch (Exception e) { err = e; @@ -699,56 +833,133 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu // Optimize mapping for single key. if (singleReq0 != null) - mapSingle(singleReq0.nodeId(), singleReq0); + sendSingleRequest(singleReq0.req.nodeId(), singleReq0.req); else { assert mappings0 != null; - if (size == 0) + if (size == 0) { onDone(new GridCacheReturn(cctx, true, true, null, true)); + + return; + } else - doUpdate(mappings0); + sendUpdateRequests(mappings0); } + + if (syncMode == FULL_ASYNC) { + onDone(new GridCacheReturn(cctx, true, true, null, true)); + + return; + } + + if (mappingKnown && syncMode == FULL_SYNC && cctx.discovery().topologyVersion() != topVer.topologyVersion()) + checkDhtNodes(futId); } - /** - * @return Future version. - */ - private GridCacheVersion onFutureDone() { - GridCacheVersion ver0; + private void checkDhtNodes(Long futId) { + GridCacheReturn opRes0 = null; + CachePartialUpdateCheckedException err0 = null; + AffinityTopologyVersion remapTopVer0 = null; + + List<GridNearAtomicCheckUpdateRequest> checkReqs = null; - GridFutureAdapter<Void> fut0; + boolean rcvAll = false; synchronized (mux) { - fut0 = topCompleteFut; + if (this.futId == null || !this.futId.equals(futId)) + return; + + if (singleReq != null) { + if (!singleReq.req.initMappingLocally()) + return; + + DhtLeftResult res = singleReq.checkDhtNodes(cctx); + + if (res == DhtLeftResult.DONE) { + opRes0 = opRes; + err0 = err; + remapTopVer0 = onAllReceived(); + } + else if (res == DhtLeftResult.ALL_RCVD_CHECK_PRIMARY) + checkReqs = Collections.singletonList(new GridNearAtomicCheckUpdateRequest(singleReq.req)); + else + return; + } + else { + if (mappings != null) { + for (PrimaryRequestState reqState : mappings.values()) { + if (!reqState.req.initMappingLocally()) + continue; + + DhtLeftResult res = reqState.checkDhtNodes(cctx); + + if (res == DhtLeftResult.DONE) { + assert mappings.size() > resCnt : "[mappings=" + mappings.size() + ", cnt=" + resCnt + ']'; + + resCnt++; - topCompleteFut = null; + if (mappings.size() == resCnt) { + rcvAll = true; - ver0 = futVer; + opRes0 = opRes; + err0 = err; + remapTopVer0 = onAllReceived(); - futVer = null; + break; + } + } + else if (res == DhtLeftResult.ALL_RCVD_CHECK_PRIMARY) { + if (checkReqs == null) + checkReqs = new ArrayList<>(mappings.size()); + + checkReqs.add(new GridNearAtomicCheckUpdateRequest(reqState.req)); + } + } + } + else + return; + } } - if (fut0 != null) - fut0.onDone(); + if (checkReqs != null) { + assert !rcvAll; + + for (int i = 0; i < checkReqs.size(); i++) + sendCheckUpdateRequest(checkReqs.get(i)); + } + else if (rcvAll) + finishUpdateFuture(opRes0, err0, remapTopVer0); + } + + /** + * @return Future version. + */ + private Long onFutureDone() { + Long id0; + + synchronized (mux) { + id0 = futId; + + futId = null; + } - return ver0; + return id0; } /** * @param topNodes Cache nodes. * @param topVer Topology version. - * @param futVer Future version. - * @param updVer Update version. + * @param futId Future ID. * @param remapKeys Keys to remap. * @return Mapping. * @throws Exception If failed. */ @SuppressWarnings("ForLoopReplaceableByForEach") - private Map<UUID, GridNearAtomicFullUpdateRequest> mapUpdate(Collection<ClusterNode> topNodes, + private Map<UUID, PrimaryRequestState> mapUpdate(Collection<ClusterNode> topNodes, AffinityTopologyVersion topVer, - GridCacheVersion futVer, - @Nullable GridCacheVersion updVer, - @Nullable Collection<KeyCacheObject> remapKeys) throws Exception { + Long futId, + @Nullable Collection<KeyCacheObject> remapKeys, + boolean mappingKnown) throws Exception { Iterator<?> it = null; if (vals != null) @@ -764,7 +975,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu if (conflictRmvVals != null) conflictRmvValsIt = conflictRmvVals.iterator(); - Map<UUID, GridNearAtomicFullUpdateRequest> pendingMappings = U.newHashMap(topNodes.size()); + Map<UUID, PrimaryRequestState> pendingMappings = U.newHashMap(topNodes.size()); // Create mappings first, then send messages. for (Object key : keys) { @@ -819,55 +1030,50 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu else val = EntryProcessorResourceInjectorProxy.wrap(cctx.kernalContext(), (EntryProcessor)val); - List<ClusterNode> affNodes = mapKey(cacheKey, topVer); + List<ClusterNode> nodes = cctx.affinity().nodesByKey(cacheKey, topVer); - if (affNodes.isEmpty()) + if (F.isEmpty(nodes)) throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache " + "(all partition nodes left the grid)."); - int i = 0; - - for (int n = 0; n < affNodes.size(); n++) { - ClusterNode affNode = affNodes.get(n); - - if (affNode == null) - throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache " + - "(all partition nodes left the grid)."); - - UUID nodeId = affNode.id(); - - GridNearAtomicFullUpdateRequest mapped = pendingMappings.get(nodeId); - - if (mapped == null) { - mapped = new GridNearAtomicFullUpdateRequest( - cctx.cacheId(), - nodeId, - futVer, - fastMap, - updVer, - topVer, - topLocked, - syncMode, - op, - retval, - expiryPlc, - invokeArgs, - filter, - subjId, - taskNameHash, - skipStore, - keepBinary, - cctx.kernalContext().clientNode(), - cctx.deploymentEnabled(), - keys.size()); - - pendingMappings.put(nodeId, mapped); - } + ClusterNode primary = nodes.get(0); + + boolean needPrimaryRes = !mappingKnown || primary.isLocal(); + + UUID nodeId = primary.id(); - mapped.addUpdateEntry(cacheKey, val, conflictTtl, conflictExpireTime, conflictVer, i == 0); + PrimaryRequestState mapped = pendingMappings.get(nodeId); - i++; + if (mapped == null) { + GridNearAtomicFullUpdateRequest req = new GridNearAtomicFullUpdateRequest( + cctx.cacheId(), + nodeId, + futId, + topVer, + topLocked, + syncMode, + op, + retval, + expiryPlc, + invokeArgs, + filter, + subjId, + taskNameHash, + needPrimaryRes, + skipStore, + keepBinary, + cctx.deploymentEnabled(), + keys.size()); + + mapped = new PrimaryRequestState(req, nodes, false); + + pendingMappings.put(nodeId, mapped); } + + if (mapped.req.initMappingLocally()) + mapped.addMapping(nodes); + + mapped.req.addUpdateEntry(cacheKey, val, conflictTtl, conflictExpireTime, conflictVer); } return pendingMappings; @@ -875,14 +1081,13 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu /** * @param topVer Topology version. - * @param futVer Future version. - * @param updVer Update version. + * @param futId Future ID. + * @param mappingKnown {@code True} if update mapping is known locally. * @return Request. * @throws Exception If failed. */ - private GridNearAtomicFullUpdateRequest mapSingleUpdate(AffinityTopologyVersion topVer, - GridCacheVersion futVer, - @Nullable GridCacheVersion updVer) throws Exception { + private PrimaryRequestState mapSingleUpdate(AffinityTopologyVersion topVer, Long futId, boolean mappingKnown) + throws Exception { Object key = F.first(keys); Object val; @@ -935,18 +1140,20 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu else val = EntryProcessorResourceInjectorProxy.wrap(cctx.kernalContext(), (EntryProcessor)val); - ClusterNode primary = cctx.affinity().primaryByPartition(cacheKey.partition(), topVer); + List<ClusterNode> nodes = cctx.affinity().nodesByKey(cacheKey, topVer); - if (primary == null) - throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all partition nodes " + - "left the grid)."); + if (F.isEmpty(nodes)) + throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache " + + "(all partition nodes left the grid)."); + + ClusterNode primary = nodes.get(0); + + boolean needPrimaryRes = !mappingKnown || primary.isLocal() || nodes.size() == 1; GridNearAtomicFullUpdateRequest req = new GridNearAtomicFullUpdateRequest( cctx.cacheId(), primary.id(), - futVer, - fastMap, - updVer, + futId, topVer, topLocked, syncMode, @@ -957,9 +1164,9 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu filter, subjId, taskNameHash, + needPrimaryRes, skipStore, keepBinary, - cctx.kernalContext().clientNode(), cctx.deploymentEnabled(), 1); @@ -967,26 +1174,9 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu val, conflictTtl, conflictExpireTime, - conflictVer, - true); - - return req; - } - - /** - * Maps key to nodes. If filters are absent and operation is not TRANSFORM, then we can assign version on near - * node and send updates in parallel to all participating nodes. - * - * @param key Key to map. - * @param topVer Topology version to map. - * @return Collection of nodes to which key is mapped. - */ - private List<ClusterNode> mapKey(KeyCacheObject key, AffinityTopologyVersion topVer) { - GridCacheAffinityManager affMgr = cctx.affinity(); + conflictVer); - // If we can send updates in parallel - do it. - return fastMap ? cctx.topology().nodes(affMgr.partition(key), topVer) : - Collections.singletonList(affMgr.primaryByKey(key, topVer)); + return new PrimaryRequestState(req, nodes, true); } /** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java index 22e01ae..4e20fc7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java @@ -23,11 +23,11 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.UUID; -import java.util.concurrent.ConcurrentLinkedQueue; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.GridDirectCollection; import org.apache.ignite.internal.GridDirectTransient; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheDeployable; @@ -39,7 +39,6 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.GridLongList; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.S; -import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; import org.apache.ignite.plugin.extensions.communication.MessageReader; import org.apache.ignite.plugin.extensions.communication.MessageWriter; @@ -59,29 +58,18 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr @GridDirectTransient private UUID nodeId; - /** Future version. */ - private GridCacheVersion futVer; + /** Future ID. */ + private long futId; - /** Update error. */ - @GridDirectTransient - private volatile IgniteCheckedException err; - - /** Serialized error. */ - private byte[] errBytes; + /** */ + private UpdateErrors errs; /** Return value. */ @GridToStringInclude private GridCacheReturn ret; - /** Failed keys. */ - @GridToStringInclude - @GridDirectCollection(KeyCacheObject.class) - private volatile Collection<KeyCacheObject> failedKeys; - - /** Keys that should be remapped. */ - @GridToStringInclude - @GridDirectCollection(KeyCacheObject.class) - private List<KeyCacheObject> remapKeys; + /** */ + private AffinityTopologyVersion remapTopVer; /** Indexes of keys for which values were generated on primary node (used if originating node has near cache). */ @GridDirectCollection(int.class) @@ -108,6 +96,15 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr /** Partition ID. */ private int partId = -1; + /** */ + @GridDirectCollection(UUID.class) + @GridToStringInclude + private List<UUID> dhtNodes; + + /** */ + @GridDirectTransient + private boolean nodeLeft; + /** * Empty constructor required by {@link Externalizable}. */ @@ -118,24 +115,52 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr /** * @param cacheId Cache ID. * @param nodeId Node ID this reply should be sent to. - * @param futVer Future version. + * @param futId Future ID. + * @param partId Partition. + * @param nodeLeft {@code True} if primary node failed. * @param addDepInfo Deployment info flag. */ - public GridNearAtomicUpdateResponse(int cacheId, UUID nodeId, GridCacheVersion futVer, boolean addDepInfo) { - assert futVer != null; - + public GridNearAtomicUpdateResponse(int cacheId, + UUID nodeId, + long futId, + int partId, + boolean nodeLeft, + boolean addDepInfo) { this.cacheId = cacheId; this.nodeId = nodeId; - this.futVer = futVer; + this.futId = futId; + this.partId = partId; + this.nodeLeft = nodeLeft; this.addDepInfo = addDepInfo; } + /** + * @return {@code True} if primary node failed. + */ + public boolean nodeLeftResponse() { + return nodeLeft; + } + /** {@inheritDoc} */ @Override public int lookupIndex() { return CACHE_MSG_IDX; } /** + * @param dhtNodes DHT nodes. + */ + public void dhtNodes(List<UUID> dhtNodes) { + this.dhtNodes = dhtNodes; + } + + /** + * @return DHT nodes. + */ + @Nullable public List<UUID> dhtNodes() { + return dhtNodes; + } + + /** * @return Node ID this response should be sent to. */ public UUID nodeId() { @@ -150,17 +175,10 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr } /** - * @return Future version. + * @return Future ID. */ - public GridCacheVersion futureVersion() { - return futVer; - } - - /** - * @param partId Partition ID for proper striping on near node. - */ - public void partition(int partId) { - this.partId = partId; + public long futureId() { + return futId; } /** @@ -169,19 +187,22 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr * @param err Error. */ public void error(IgniteCheckedException err){ - this.err = err; + if (errs == null) + errs = new UpdateErrors(); + + errs.onError(err); } /** {@inheritDoc} */ @Override public IgniteCheckedException error() { - return err; + return errs != null ? errs.error() : null; } /** * @return Collection of failed keys. */ public Collection<KeyCacheObject> failedKeys() { - return failedKeys; + return errs != null ? errs.failedKeys() : null; } /** @@ -200,17 +221,17 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr } /** - * @param remapKeys Remap keys. + * @param remapTopVer Topology version to remap update. */ - public void remapKeys(List<KeyCacheObject> remapKeys) { - this.remapKeys = remapKeys; + void remapTopologyVersion(AffinityTopologyVersion remapTopVer) { + this.remapTopVer = remapTopVer; } /** - * @return Remap keys. + * @return Topology version if update should be remapped. */ - public Collection<KeyCacheObject> remapKeys() { - return remapKeys; + @Nullable AffinityTopologyVersion remapTopologyVersion() { + return remapTopVer; } /** @@ -221,7 +242,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr * @param ttl TTL for near cache update. * @param expireTime Expire time for near cache update. */ - public void addNearValue(int keyIdx, + void addNearValue(int keyIdx, @Nullable CacheObject val, long ttl, long expireTime) { @@ -242,7 +263,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr * @param expireTime Expire time for near cache update. */ @SuppressWarnings("ForLoopReplaceableByForEach") - public void addNearTtl(int keyIdx, long ttl, long expireTime) { + void addNearTtl(int keyIdx, long ttl, long expireTime) { if (ttl >= 0) { if (nearTtls == null) { nearTtls = new GridLongList(16); @@ -299,7 +320,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr /** * @param nearVer Version generated on primary node to be used for originating node's near cache update. */ - public void nearVersion(GridCacheVersion nearVer) { + void nearVersion(GridCacheVersion nearVer) { this.nearVer = nearVer; } @@ -313,7 +334,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr /** * @param keyIdx Index of key for which update was skipped */ - public void addSkippedIndex(int keyIdx) { + void addSkippedIndex(int keyIdx) { if (nearSkipIdxs == null) nearSkipIdxs = new ArrayList<>(); @@ -351,35 +372,10 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr * @param e Error cause. */ public synchronized void addFailedKey(KeyCacheObject key, Throwable e) { - if (failedKeys == null) - failedKeys = new ConcurrentLinkedQueue<>(); - - failedKeys.add(key); - - if (err == null) - err = new IgniteCheckedException("Failed to update keys on primary node."); - - err.addSuppressed(e); - } - - /** - * Adds keys to collection of failed keys. - * - * @param keys Key to add. - * @param e Error cause. - */ - public synchronized void addFailedKeys(Collection<KeyCacheObject> keys, Throwable e) { - if (keys != null) { - if (failedKeys == null) - failedKeys = new ArrayList<>(keys.size()); - - failedKeys.addAll(keys); - } - - if (err == null) - err = new IgniteCheckedException("Failed to update keys on primary node."); + if (errs == null) + errs = new UpdateErrors(); - err.addSuppressed(e); + errs.addFailedKey(key, e); } /** @@ -387,18 +383,12 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr * * @param keys Key to add. * @param e Error cause. - * @param ctx Context. */ - public synchronized void addFailedKeys(Collection<KeyCacheObject> keys, Throwable e, GridCacheContext ctx) { - if (failedKeys == null) - failedKeys = new ArrayList<>(keys.size()); + synchronized void addFailedKeys(Collection<KeyCacheObject> keys, Throwable e) { + if (errs == null) + errs = new UpdateErrors(); - failedKeys.addAll(keys); - - if (err == null) - err = new IgniteCheckedException("Failed to update keys on primary node."); - - err.addSuppressed(e); + errs.addFailedKeys(keys, e); } /** {@inheritDoc} @@ -406,14 +396,10 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException { super.prepareMarshal(ctx); - if (err != null && errBytes == null) - errBytes = U.marshal(ctx, err); - GridCacheContext cctx = ctx.cacheContext(cacheId); - prepareMarshalCacheObjects(failedKeys, cctx); - - prepareMarshalCacheObjects(remapKeys, cctx); + if (errs != null) + errs.prepareMarshal(this, cctx); prepareMarshalCacheObjects(nearVals, cctx); @@ -425,14 +411,10 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException { super.finishUnmarshal(ctx, ldr); - if (errBytes != null && err == null) - err = U.unmarshal(ctx, errBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); - GridCacheContext cctx = ctx.cacheContext(cacheId); - finishUnmarshalCacheObjects(failedKeys, cctx, ldr); - - finishUnmarshalCacheObjects(remapKeys, cctx, ldr); + if (errs != null) + errs.finishUnmarshal(this, cctx, ldr); finishUnmarshalCacheObjects(nearVals, cctx, ldr); @@ -471,19 +453,19 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr switch (writer.state()) { case 3: - if (!writer.writeByteArray("errBytes", errBytes)) + if (!writer.writeCollection("dhtNodes", dhtNodes, MessageCollectionItemType.UUID)) return false; writer.incrementState(); case 4: - if (!writer.writeCollection("failedKeys", failedKeys, MessageCollectionItemType.MSG)) + if (!writer.writeMessage("errs", errs)) return false; writer.incrementState(); case 5: - if (!writer.writeMessage("futVer", futVer)) + if (!writer.writeLong("futId", futId)) return false; writer.incrementState(); @@ -531,7 +513,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr writer.incrementState(); case 13: - if (!writer.writeCollection("remapKeys", remapKeys, MessageCollectionItemType.MSG)) + if (!writer.writeMessage("remapTopVer", remapTopVer)) return false; writer.incrementState(); @@ -559,7 +541,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr switch (reader.state()) { case 3: - errBytes = reader.readByteArray("errBytes"); + dhtNodes = reader.readCollection("dhtNodes", MessageCollectionItemType.UUID); if (!reader.isLastRead()) return false; @@ -567,7 +549,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr reader.incrementState(); case 4: - failedKeys = reader.readCollection("failedKeys", MessageCollectionItemType.MSG); + errs = reader.readMessage("errs"); if (!reader.isLastRead()) return false; @@ -575,7 +557,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr reader.incrementState(); case 5: - futVer = reader.readMessage("futVer"); + futId = reader.readLong("futId"); if (!reader.isLastRead()) return false; @@ -639,7 +621,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr reader.incrementState(); case 13: - remapKeys = reader.readCollection("remapKeys", MessageCollectionItemType.MSG); + remapTopVer = reader.readMessage("remapTopVer"); if (!reader.isLastRead()) return false; http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/UpdateErrors.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/UpdateErrors.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/UpdateErrors.java new file mode 100644 index 0000000..1d415c8 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/UpdateErrors.java @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht.atomic; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.GridDirectCollection; +import org.apache.ignite.internal.GridDirectTransient; +import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.GridCacheMessage; +import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; + +/** + * + */ +public class UpdateErrors implements Message { + /** */ + private static final long serialVersionUID = 0L; + + /** Failed keys. */ + @GridToStringInclude + @GridDirectCollection(KeyCacheObject.class) + private List<KeyCacheObject> failedKeys; + + /** Update error. */ + @GridDirectTransient + @GridToStringInclude + private IgniteCheckedException err; + + /** Serialized update error. */ + private byte[] errBytes; + + /** + * + */ + public UpdateErrors() { + // No-op. + } + + /** + * @param err Error. + */ + public UpdateErrors(IgniteCheckedException err) { + assert err != null; + + this.err = err; + } + + /** + * @param err Error. + */ + public void onError(IgniteCheckedException err){ + this.err = err; + } + + /** + * @return Error. + */ + public IgniteCheckedException error() { + return err; + } + + /** + * @return Failed keys. + */ + public Collection<KeyCacheObject> failedKeys() { + return failedKeys; + } + + /** + * Adds key to collection of failed keys. + * + * @param key Key to add. + * @param e Error cause. + */ + void addFailedKey(KeyCacheObject key, Throwable e) { + if (failedKeys == null) + failedKeys = new ArrayList<>(); + + failedKeys.add(key); + + if (err == null) + err = new IgniteCheckedException("Failed to update keys."); + + err.addSuppressed(e); + } + + /** + * @param keys Keys. + * @param e Error. + */ + void addFailedKeys(Collection<KeyCacheObject> keys, Throwable e) { + if (failedKeys == null) + failedKeys = new ArrayList<>(keys.size()); + + failedKeys.addAll(keys); + + if (err == null) + err = new IgniteCheckedException("Failed to update keys on primary node."); + + err.addSuppressed(e); + } + + /** {@inheritDoc} */ + void prepareMarshal(GridCacheMessage msg, GridCacheContext cctx) throws IgniteCheckedException { + msg.prepareMarshalCacheObjects(failedKeys, cctx); + + if (errBytes == null) + errBytes = U.marshal(cctx.marshaller(), err); + } + + /** {@inheritDoc} */ + void finishUnmarshal(GridCacheMessage msg, GridCacheContext cctx, ClassLoader ldr) throws IgniteCheckedException { + msg.finishUnmarshalCacheObjects(failedKeys, cctx, ldr); + + if (errBytes != null && err == null) + err = U.unmarshal(cctx.marshaller(), errBytes, U.resolveClassLoader(ldr, cctx.gridConfig())); + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 0: + if (!writer.writeByteArray("errBytes", errBytes)) + return false; + + writer.incrementState(); + + case 1: + if (!writer.writeCollection("failedKeys", failedKeys, MessageCollectionItemType.MSG)) + return false; + + writer.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + switch (reader.state()) { + case 0: + errBytes = reader.readByteArray("errBytes"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 1: + failedKeys = reader.readCollection("failedKeys", MessageCollectionItemType.MSG); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + } + + return reader.afterMessageRead(UpdateErrors.class); + } + + /** {@inheritDoc} */ + @Override public byte directType() { + return -46; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 2; + } + + /** {@inheritDoc} */ + @Override public void onAckReceived() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(UpdateErrors.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java index 41632ef..62aecd1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.cache.distributed.near; import java.io.Externalizable; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; @@ -43,7 +44,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheA import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache; -import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateResponse; +import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicNearResponse; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse; import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo; @@ -141,10 +142,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { List<Integer> nearValsIdxs = res.nearValuesIndexes(); List<Integer> skipped = res.skippedIndexes(); - GridCacheVersion ver = req.updateVersion(); - - if (ver == null) - ver = res.nearVersion(); + GridCacheVersion ver = res.nearVersion(); assert ver != null : "Failed to find version [req=" + req + ", res=" + res + ']'; @@ -194,7 +192,6 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { processNearAtomicUpdateResponse(ver, key, val, - null, ttl, expireTime, req.keepBinary(), @@ -212,7 +209,6 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { * @param ver Version. * @param key Key. * @param val Value. - * @param valBytes Value bytes. * @param ttl TTL. * @param expireTime Expire time. * @param nodeId Node ID. @@ -224,7 +220,6 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { GridCacheVersion ver, KeyCacheObject key, @Nullable CacheObject val, - @Nullable byte[] valBytes, long ttl, long expireTime, boolean keepBinary, @@ -241,7 +236,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { try { entry = entryEx(key, topVer); - GridCacheOperation op = (val != null || valBytes != null) ? UPDATE : DELETE; + GridCacheOperation op = val != null ? UPDATE : DELETE; GridCacheUpdateAtomicResult updRes = entry.innerUpdate( ver, @@ -299,11 +294,12 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { * @param nodeId Sender node ID. * @param req Dht atomic update request. * @param res Dht atomic update response. + * @return Evicted near keys (if any). */ - public void processDhtAtomicUpdateRequest( + @Nullable public List<KeyCacheObject> processDhtAtomicUpdateRequest( UUID nodeId, GridDhtAtomicAbstractUpdateRequest req, - GridDhtAtomicUpdateResponse res + GridDhtAtomicNearResponse res ) { GridCacheVersion ver = req.writeVersion(); @@ -313,6 +309,8 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { String taskName = ctx.kernalContext().task().resolveTaskName(req.taskNameHash()); + List<KeyCacheObject> nearEvicted = null; + for (int i = 0; i < req.nearSize(); i++) { KeyCacheObject key = req.nearKey(i); @@ -322,7 +320,10 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { GridCacheEntryEx entry = peekEx(key); if (entry == null) { - res.addNearEvicted(key); + if (nearEvicted == null) + nearEvicted = new ArrayList<>(); + + nearEvicted.add(key); break; } @@ -388,6 +389,8 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { res.addFailedKey(key, new IgniteCheckedException("Failed to update near cache key: " + key, e)); } } + + return nearEvicted; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java index b3f0684..485059f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java @@ -425,7 +425,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler onEntryUpdate(evt, notify, loc, recordIgniteEvt); } - }); + }, sync); } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java index 5ca3da8..35fbe11 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java @@ -140,7 +140,10 @@ public interface IgniteCacheObjectProcessor extends GridProcessor { * before stored in cache. * @return Cache key object. */ - public KeyCacheObject toCacheKeyObject(CacheObjectContext ctx, @Nullable GridCacheContext cctx, Object obj, boolean userObj); + public KeyCacheObject toCacheKeyObject(CacheObjectContext ctx, + @Nullable GridCacheContext cctx, + Object obj, + boolean userObj); /** * @param ctx Cache context. http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java index ff7c4ba..e0549fb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java @@ -231,8 +231,8 @@ public class IgniteCacheObjectProcessorImpl extends GridProcessorAdapter impleme cctx.affinity().partition(obj, false) : ctx.kernalContext().affinity().partition0(ctx.cacheName(), obj, null); } - catch (IgniteCheckedException ignored) { - U.error(log, "Failed to get partition"); + catch (IgniteCheckedException e) { + U.error(log, "Failed to get partition", e); return -1; } @@ -327,13 +327,7 @@ public class IgniteCacheObjectProcessorImpl extends GridProcessorAdapter impleme /** * @param key Key. - */ - UserKeyCacheObjectImpl(Object key) { - this(key, -1); - } - - /** - * @param key Key. + * @param part Partition. */ UserKeyCacheObjectImpl(Object key, int part) { super(key, null, part); @@ -341,6 +335,8 @@ public class IgniteCacheObjectProcessorImpl extends GridProcessorAdapter impleme /** * @param key Key. + * @param valBytes Marshalled key. + * @param part Partition. */ UserKeyCacheObjectImpl(Object key, byte[] valBytes, int part) { super(key, valBytes, part); @@ -366,10 +362,10 @@ public class IgniteCacheObjectProcessorImpl extends GridProcessorAdapter impleme Object val = ctx.processor().unmarshal(ctx, valBytes, ldr); - return new KeyCacheObjectImpl(val, valBytes); + return new KeyCacheObjectImpl(val, valBytes, partition()); } - return new KeyCacheObjectImpl(val, valBytes); + return new KeyCacheObjectImpl(val, valBytes, partition()); } catch (IgniteCheckedException e) { throw new IgniteException("Failed to marshal object: " + val, e); http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java index 53096ab..6c85b32 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java @@ -56,7 +56,7 @@ public class StripedExecutor implements ExecutorService { /** * @param cnt Count. - * @param gridName Node name. + * @param igniteInstanceName Node name. * @param poolName Pool name. * @param log Logger. */ @@ -435,7 +435,11 @@ public class StripedExecutor implements ExecutorService { * Starts the stripe. */ void start() { - thread = new IgniteThread(igniteInstanceName, poolName + "-stripe-" + idx, this); + thread = new IgniteThread(igniteInstanceName, + poolName + "-stripe-" + idx, + this, + IgniteThread.GRP_IDX_UNASSIGNED, + idx); thread.start(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java index 7abd367..96f3797 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java @@ -18,8 +18,10 @@ package org.apache.ignite.internal.util.future; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.IgniteFutureCancelledCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.NodeStoppingException; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.transactions.IgniteTxOptimisticCheckedException; import org.apache.ignite.internal.util.tostring.GridToStringInclude; @@ -118,7 +120,14 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> implements Ig } catch (IgniteCheckedException e) { if (!ignoreFailure(e)) { - U.error(null, "Failed to execute compound future reducer: " + this, e); + if (e instanceof NodeStoppingException) { + IgniteLogger log = logger(); + + if (log != null && log.isDebugEnabled()) + log.debug("Failed to execute compound future reducer, node stopped."); + } + else + U.error(null, "Failed to execute compound future reducer: " + this, e); onDone(e); }
