Repository: ignite Updated Branches: refs/heads/ignite-2926 [created] b2e170501
WIP. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/2776cca1 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/2776cca1 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/2776cca1 Branch: refs/heads/ignite-2926 Commit: 2776cca164b716339caec14a0a66d613d7eb877c Parents: b93026f Author: vozerov-gridgain <[email protected]> Authored: Thu Mar 31 16:59:15 2016 +0300 Committer: vozerov-gridgain <[email protected]> Committed: Thu Mar 31 16:59:15 2016 +0300 ---------------------------------------------------------------------- .../dht/atomic/GridDhtAtomicCache.java | 24 +- .../GridNearAtomicSingleUpdateFuture.java | 1186 ++++++++++++++++++ .../dht/atomic/GridNearAtomicUpdateFuture.java | 3 +- 3 files changed, 1201 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/2776cca1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index f85862d..00680ec 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -994,7 +994,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { ctx.checkSecurity(SecurityPermission.CACHE_PUT); - final GridNearAtomicUpdateFuture updateFut = + final GridNearAtomicSingleUpdateFuture updateFut = createSingleUpdateFuture(key, val, proc, invokeArgs, retval, filter, waitTopFut); return asyncOp(new CO<IgniteInternalFuture<Object>>() { @@ -1026,7 +1026,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { ctx.checkSecurity(SecurityPermission.CACHE_REMOVE); - final GridNearAtomicUpdateFuture updateFut = + final GridNearAtomicSingleUpdateFuture updateFut = createSingleUpdateFuture(key, null, null, null, retval, filter, true); if (statsEnabled) @@ -1053,7 +1053,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { * @param waitTopFut Whether to wait for topology future. * @return Future. */ - private GridNearAtomicUpdateFuture createSingleUpdateFuture( + private GridNearAtomicSingleUpdateFuture createSingleUpdateFuture( K key, @Nullable V val, @Nullable EntryProcessor proc, @@ -1080,7 +1080,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { CacheOperationContext opCtx = ctx.operationContextPerCall(); - return new GridNearAtomicUpdateFuture( + return new GridNearAtomicSingleUpdateFuture( ctx, this, ctx.config().getWriteSynchronizationMode(), @@ -1088,10 +1088,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { Collections.singletonList(key), vals, invokeArgs, - null, - null, retval, - false, opCtx != null ? opCtx.expiry() : null, filter, ctx.subjectIdPerCall(null, opCtx), @@ -2878,10 +2875,17 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { res.nodeId(ctx.localNodeId()); - GridNearAtomicUpdateFuture fut = (GridNearAtomicUpdateFuture)ctx.mvcc().atomicFuture(res.futureVersion()); + IgniteInternalFuture fut = ctx.mvcc().atomicFuture(res.futureVersion()); - if (fut != null) - fut.onResult(nodeId, res); + if (fut != null) { + if (fut instanceof GridNearAtomicUpdateFuture) + ((GridNearAtomicUpdateFuture)fut).onResult(nodeId, res); + else { + assert fut instanceof GridNearAtomicSingleUpdateFuture; + + ((GridNearAtomicSingleUpdateFuture)fut).onResult(nodeId, res); + } + } else U.warn(log, "Failed to find near update future for update response (will ignore) " + "[nodeId=" + nodeId + ", res=" + res + ']'); http://git-wip-us.apache.org/repos/asf/ignite/blob/2776cca1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java new file mode 100644 index 0000000..4112308 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java @@ -0,0 +1,1186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht.atomic; + +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.cache.CacheWriteSynchronizationMode; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; +import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.CacheEntryPredicate; +import org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException; +import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager; +import org.apache.ignite.internal.processors.cache.GridCacheAtomicFuture; +import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.GridCacheMvccManager; +import org.apache.ignite.internal.processors.cache.GridCacheOperation; +import org.apache.ignite.internal.processors.cache.GridCacheReturn; +import org.apache.ignite.internal.processors.cache.GridCacheTryPutFailedException; +import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearAtomicCache; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.util.future.GridFinishedFuture; +import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.CI1; +import org.apache.ignite.internal.util.typedef.CI2; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.X; +import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteUuid; +import org.jetbrains.annotations.Nullable; + +import javax.cache.expiry.ExpiryPolicy; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicReference; + +import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.CLOCK; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC; +import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM; + +/** + * DHT atomic cache near single update future. + */ +public class GridNearAtomicSingleUpdateFuture extends GridFutureAdapter<Object> + implements GridCacheAtomicFuture<Object> { + /** Logger reference. */ + private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>(); + + /** Logger. */ + protected static IgniteLogger log; + + /** Cache context. */ + private final GridCacheContext cctx; + + /** Cache. */ + private GridDhtAtomicCache cache; + + /** Update operation. */ + private final GridCacheOperation op; + + /** Keys */ + private Collection<?> keys; + + /** Values. */ + @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"}) + private Collection<?> vals; + + /** Optional arguments for entry processor. */ + private Object[] invokeArgs; + + /** Return value require flag. */ + private final boolean retval; + + /** Expiry policy. */ + private final ExpiryPolicy expiryPlc; + + /** Optional filter. */ + private final CacheEntryPredicate[] filter; + + /** Write synchronization mode. */ + private final CacheWriteSynchronizationMode syncMode; + + /** Fast map flag. */ + private final boolean fastMap; + + /** Near cache flag. */ + private final boolean nearEnabled; + + /** Subject ID. */ + private final UUID subjId; + + /** Task name hash. */ + private final int taskNameHash; + + /** Topology locked flag. Set if atomic update is performed inside a TX or explicit lock. */ + private boolean topLocked; + + /** Skip store flag. */ + private final boolean skipStore; + + /** */ + private final boolean keepBinary; + + /** Wait for topology future flag. */ + private final boolean waitTopFut; + + /** Remap count. */ + private int remapCnt; + + /** State. */ + private final UpdateState state; + + /** + * @param cctx Cache context. + * @param cache Cache instance. + * @param syncMode Write synchronization mode. + * @param op Update operation. + * @param keys Keys to update. + * @param vals Values or transform closure. + * @param invokeArgs Optional arguments for entry processor. + * @param retval Return value require flag. + * @param expiryPlc Expiry policy explicitly specified for cache operation. + * @param filter Entry filter. + * @param subjId Subject ID. + * @param taskNameHash Task name hash code. + * @param skipStore Skip store flag. + * @param keepBinary Keep binary flag. + * @param remapCnt Maximum number of retries. + * @param waitTopFut If {@code false} does not wait for affinity change future. + */ + public GridNearAtomicSingleUpdateFuture( + GridCacheContext cctx, + GridDhtAtomicCache cache, + CacheWriteSynchronizationMode syncMode, + GridCacheOperation op, + Collection<?> keys, + @Nullable Collection<?> vals, + @Nullable Object[] invokeArgs, + final boolean retval, + @Nullable ExpiryPolicy expiryPlc, + final CacheEntryPredicate[] filter, + UUID subjId, + int taskNameHash, + boolean skipStore, + boolean keepBinary, + int remapCnt, + boolean waitTopFut + ) { + assert vals == null || vals.size() == keys.size(); + assert subjId != null; + + this.cctx = cctx; + this.cache = cache; + this.syncMode = syncMode; + this.op = op; + this.keys = keys; + this.vals = vals; + this.invokeArgs = invokeArgs; + this.retval = retval; + this.expiryPlc = expiryPlc; + this.filter = filter; + this.subjId = subjId; + this.taskNameHash = taskNameHash; + this.skipStore = skipStore; + this.keepBinary = keepBinary; + this.waitTopFut = waitTopFut; + + if (log == null) + log = U.logger(cctx.kernalContext(), logRef, GridFutureAdapter.class); + + fastMap = F.isEmpty(filter) && op != TRANSFORM && cctx.config().getWriteSynchronizationMode() == FULL_SYNC && + cctx.config().getAtomicWriteOrderMode() == CLOCK && + !(cctx.writeThrough() && cctx.config().getInterceptor() != null); + + nearEnabled = CU.isNearEnabled(cctx); + + if (!waitTopFut) + remapCnt = 1; + + this.remapCnt = remapCnt; + + state = new UpdateState(); + } + + /** {@inheritDoc} */ + @Override public IgniteUuid futureId() { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public GridCacheVersion version() { + return state.futureVersion(); + } + + /** + * @return {@code True} if this future should block partition map exchange. + */ + private boolean waitForPartitionExchange() { + // Wait fast-map near atomic update futures in CLOCK mode. + return fastMap; + } + + /** {@inheritDoc} */ + @Override public Collection<?> keys() { + return keys; + } + + /** {@inheritDoc} */ + @Override public boolean onNodeLeft(UUID nodeId) { + state.onNodeLeft(nodeId); + + return false; + } + + /** {@inheritDoc} */ + @Override public boolean trackable() { + return true; + } + + /** {@inheritDoc} */ + @Override public void markNotTrackable() { + // No-op. + } + + /** + * Performs future mapping. + */ + public void map() { + AffinityTopologyVersion topVer = cctx.shared().lockedTopologyVersion(null); + + if (topVer == null) + mapOnTopology(); + else { + topLocked = true; + + // Cannot remap. + remapCnt = 1; + + state.map(topVer, null); + } + } + + /** {@inheritDoc} */ + @Override public IgniteInternalFuture<Void> completeFuture(AffinityTopologyVersion topVer) { + if (waitForPartitionExchange()) { + GridFutureAdapter<Void> fut = state.completeFuture(topVer); + + if (fut != null && isDone()) { + fut.onDone(); + + return null; + } + + return fut; + } + + return null; + } + + /** {@inheritDoc} */ + @SuppressWarnings("ConstantConditions") + @Override public boolean onDone(@Nullable Object res, @Nullable Throwable err) { + assert res == null || res instanceof GridCacheReturn; + + GridCacheReturn ret = (GridCacheReturn)res; + + Object retval = + res == null ? null : (this.retval || op == TRANSFORM) ? + cctx.unwrapBinaryIfNeeded(ret.value(), keepBinary) : ret.success(); + + if (op == TRANSFORM && retval == null) + retval = Collections.emptyMap(); + + if (super.onDone(retval, err)) { + GridCacheVersion futVer = state.onFutureDone(); + + if (futVer != null) + cctx.mvcc().removeAtomicFuture(futVer); + + return true; + } + + return false; + } + + /** + * Response callback. + * + * @param nodeId Node ID. + * @param res Update response. + */ + public void onResult(UUID nodeId, GridNearAtomicUpdateResponse res) { + state.onResult(nodeId, res, false); + } + + /** + * Updates near cache. + * + * @param req Update request. + * @param res Update response. + */ + private void updateNear(GridNearAtomicUpdateRequest req, GridNearAtomicUpdateResponse res) { + assert nearEnabled; + + if (res.remapKeys() != null || !req.hasPrimary()) + return; + + GridNearAtomicCache near = (GridNearAtomicCache)cctx.dht().near(); + + near.processNearAtomicUpdateResponse(req, res); + } + + /** + * Maps future on ready topology. + */ + private void mapOnTopology() { + cache.topology().readLock(); + + AffinityTopologyVersion topVer = null; + + try { + if (cache.topology().stopping()) { + onDone(new IgniteCheckedException("Failed to perform cache operation (cache is stopped): " + + cache.name())); + + return; + } + + GridDhtTopologyFuture fut = cache.topology().topologyVersionFuture(); + + if (fut.isDone()) { + Throwable err = fut.validateCache(cctx); + + if (err != null) { + onDone(err); + + return; + } + + topVer = fut.topologyVersion(); + } + else { + if (waitTopFut) { + assert !topLocked : this; + + fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { + @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) { + cctx.kernalContext().closure().runLocalSafe(new Runnable() { + @Override public void run() { + mapOnTopology(); + } + }); + } + }); + } + else + onDone(new GridCacheTryPutFailedException()); + + return; + } + } + finally { + cache.topology().readUnlock(); + } + + state.map(topVer, null); + } + + /** + * @return {@code True} future is stored by {@link GridCacheMvccManager#addAtomicFuture}. + */ + private boolean storeFuture() { + return cctx.config().getAtomicWriteOrderMode() == CLOCK || syncMode != FULL_ASYNC; + } + + /** + * Maps key to nodes. If filters are absent and operation is not TRANSFORM, then we can assign version on near + * node and send updates in parallel to all participating nodes. + * + * @param key Key to map. + * @param topVer Topology version to map. + * @param fastMap Flag indicating whether mapping is performed for fast-circuit update. + * @return Collection of nodes to which key is mapped. + */ + private Collection<ClusterNode> mapKey( + KeyCacheObject key, + AffinityTopologyVersion topVer, + boolean fastMap + ) { + GridCacheAffinityManager affMgr = cctx.affinity(); + + // If we can send updates in parallel - do it. + return fastMap ? + cctx.topology().nodes(affMgr.partition(key), topVer) : + Collections.singletonList(affMgr.primary(key, topVer)); + } + + /** + * Maps future to single node. + * + * @param nodeId Node ID. + * @param req Request. + */ + private void mapSingle(UUID nodeId, GridNearAtomicUpdateRequest req) { + if (cctx.localNodeId().equals(nodeId)) { + cache.updateAllAsyncInternal(nodeId, req, + new CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse>() { + @Override public void apply(GridNearAtomicUpdateRequest req, GridNearAtomicUpdateResponse res) { + onResult(res.nodeId(), res); + } + }); + } + else { + try { + if (log.isDebugEnabled()) + log.debug("Sending near atomic update request [nodeId=" + req.nodeId() + ", req=" + req + ']'); + + cctx.io().send(req.nodeId(), req, cctx.ioPolicy()); + + if (syncMode == FULL_ASYNC) + onDone(new GridCacheReturn(cctx, true, true, null, true)); + } + catch (IgniteCheckedException e) { + state.onSendError(req, e); + } + } + } + + /** + * Sends messages to remote nodes and updates local cache. + * + * @param mappings Mappings to send. + */ + private void doUpdate(Map<UUID, GridNearAtomicUpdateRequest> mappings) { + UUID locNodeId = cctx.localNodeId(); + + GridNearAtomicUpdateRequest locUpdate = null; + + // Send messages to remote nodes first, then run local update. + for (GridNearAtomicUpdateRequest req : mappings.values()) { + if (locNodeId.equals(req.nodeId())) { + assert locUpdate == null : "Cannot have more than one local mapping [locUpdate=" + locUpdate + + ", req=" + req + ']'; + + locUpdate = req; + } + else { + try { + if (log.isDebugEnabled()) + log.debug("Sending near atomic update request [nodeId=" + req.nodeId() + ", req=" + req + ']'); + + cctx.io().send(req.nodeId(), req, cctx.ioPolicy()); + } + catch (IgniteCheckedException e) { + state.onSendError(req, e); + } + } + } + + if (locUpdate != null) { + cache.updateAllAsyncInternal(cctx.localNodeId(), locUpdate, + new CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse>() { + @Override public void apply(GridNearAtomicUpdateRequest req, GridNearAtomicUpdateResponse res) { + onResult(res.nodeId(), res); + } + }); + } + + if (syncMode == FULL_ASYNC) + onDone(new GridCacheReturn(cctx, true, true, null, true)); + } + + /** + * + */ + private class UpdateState { + /** Current topology version. */ + private AffinityTopologyVersion topVer = AffinityTopologyVersion.ZERO; + + /** */ + private GridCacheVersion updVer; + + /** Topology version when got mapping error. */ + private AffinityTopologyVersion mapErrTopVer; + + /** Mappings if operations is mapped to more than one node. */ + @GridToStringInclude + private Map<UUID, GridNearAtomicUpdateRequest> mappings; + + /** */ + private int resCnt; + + /** Error. */ + private CachePartialUpdateCheckedException err; + + /** Future ID. */ + private GridCacheVersion futVer; + + /** Completion future for a particular topology version. */ + private GridFutureAdapter<Void> topCompleteFut; + + /** Keys to remap. */ + private Collection<KeyCacheObject> remapKeys; + + /** Not null is operation is mapped to single node. */ + private GridNearAtomicUpdateRequest singleReq; + + /** Operation result. */ + private GridCacheReturn opRes; + + /** + * @return Future version. + */ + @Nullable synchronized GridCacheVersion futureVersion() { + return futVer; + } + + /** + * @param nodeId Left node ID. + */ + void onNodeLeft(UUID nodeId) { + GridNearAtomicUpdateResponse res = null; + + synchronized (this) { + GridNearAtomicUpdateRequest req; + + if (singleReq != null) + req = singleReq.nodeId().equals(nodeId) ? singleReq : null; + else + req = mappings != null ? mappings.get(nodeId) : null; + + if (req != null && req.response() == null) { + res = new GridNearAtomicUpdateResponse(cctx.cacheId(), + nodeId, + req.futureVersion(), + cctx.deploymentEnabled()); + + ClusterTopologyCheckedException e = new ClusterTopologyCheckedException("Primary node left grid " + + "before response is received: " + nodeId); + + e.retryReadyFuture(cctx.shared().nextAffinityReadyFuture(req.topologyVersion())); + + res.addFailedKeys(req.keys(), e); + } + } + + if (res != null) + onResult(nodeId, res, true); + } + + /** + * @param nodeId Node ID. + * @param res Response. + * @param nodeErr {@code True} if response was created on node failure. + */ + @SuppressWarnings({"unchecked", "ThrowableResultOfMethodCallIgnored"}) + void onResult(UUID nodeId, GridNearAtomicUpdateResponse res, boolean nodeErr) { + GridNearAtomicUpdateRequest req; + + AffinityTopologyVersion remapTopVer = null; + + GridCacheReturn opRes0 = null; + CachePartialUpdateCheckedException err0 = null; + + boolean rcvAll; + + GridFutureAdapter<?> fut0 = null; + + synchronized (this) { + if (!res.futureVersion().equals(futVer)) + return; + + if (singleReq != null) { + if (!singleReq.nodeId().equals(nodeId)) + return; + + req = singleReq; + + singleReq = null; + + rcvAll = true; + } + else { + req = mappings != null ? mappings.get(nodeId) : null; + + if (req != null && req.onResponse(res)) { + resCnt++; + + rcvAll = mappings.size() == resCnt; + } + else + return; + } + + assert req != null && req.topologyVersion().equals(topVer) : req; + + if (res.remapKeys() != null) { + assert !fastMap || cctx.kernalContext().clientNode(); + + if (remapKeys == null) + remapKeys = U.newHashSet(res.remapKeys().size()); + + remapKeys.addAll(res.remapKeys()); + + if (mapErrTopVer == null || mapErrTopVer.compareTo(req.topologyVersion()) < 0) + mapErrTopVer = req.topologyVersion(); + } + else if (res.error() != null) { + if (res.failedKeys() != null) + addFailedKeys(res.failedKeys(), req.topologyVersion(), res.error()); + } + else { + if (!req.fastMap() || req.hasPrimary()) { + GridCacheReturn ret = res.returnValue(); + + if (op == TRANSFORM) { + if (ret != null) + addInvokeResults(ret); + } + else + opRes = ret; + } + } + + if (rcvAll) { + if (remapKeys != null) { + assert mapErrTopVer != null; + + remapTopVer = new AffinityTopologyVersion(mapErrTopVer.topologyVersion() + 1); + } + else { + if (err != null && + X.hasCause(err, CachePartialUpdateCheckedException.class) && + X.hasCause(err, ClusterTopologyCheckedException.class) && + storeFuture() && + --remapCnt > 0) { + ClusterTopologyCheckedException topErr = + X.cause(err, ClusterTopologyCheckedException.class); + + if (!(topErr instanceof ClusterTopologyServerNotFoundException)) { + CachePartialUpdateCheckedException cause = + X.cause(err, CachePartialUpdateCheckedException.class); + + assert cause != null && cause.topologyVersion() != null : err; + + remapTopVer = + new AffinityTopologyVersion(cause.topologyVersion().topologyVersion() + 1); + + err = null; + + Collection<Object> failedKeys = cause.failedKeys(); + + remapKeys = new ArrayList<>(failedKeys.size()); + + for (Object key : failedKeys) + remapKeys.add(cctx.toCacheKeyObject(key)); + + updVer = null; + } + } + } + + if (remapTopVer == null) { + err0 = err; + opRes0 = opRes; + } + else { + fut0 = topCompleteFut; + + topCompleteFut = null; + + cctx.mvcc().removeAtomicFuture(futVer); + + futVer = null; + topVer = AffinityTopologyVersion.ZERO; + } + } + } + + if (res.error() != null && res.failedKeys() == null) { + onDone(res.error()); + + return; + } + + if (rcvAll && nearEnabled) { + if (mappings != null) { + for (GridNearAtomicUpdateRequest req0 : mappings.values()) { + GridNearAtomicUpdateResponse res0 = req0.response(); + + assert res0 != null : req0; + + updateNear(req0, res0); + } + } + else if (!nodeErr) + updateNear(req, res); + } + + if (remapTopVer != null) { + if (fut0 != null) + fut0.onDone(); + + if (!waitTopFut) { + onDone(new GridCacheTryPutFailedException()); + + return; + } + + if (topLocked) { + assert !F.isEmpty(remapKeys) : remapKeys; + + CachePartialUpdateCheckedException e = + new CachePartialUpdateCheckedException("Failed to update keys (retry update if possible)."); + + ClusterTopologyCheckedException cause = new ClusterTopologyCheckedException( + "Failed to update keys, topology changed while execute atomic update inside transaction."); + + cause.retryReadyFuture(cctx.affinity().affinityReadyFuture(remapTopVer)); + + e.add(remapKeys, cause); + + onDone(e); + + return; + } + + IgniteInternalFuture<AffinityTopologyVersion> fut = + cctx.shared().exchange().affinityReadyFuture(remapTopVer); + + if (fut == null) + fut = new GridFinishedFuture<>(remapTopVer); + + fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { + @Override public void apply(final IgniteInternalFuture<AffinityTopologyVersion> fut) { + cctx.kernalContext().closure().runLocalSafe(new Runnable() { + @Override public void run() { + try { + AffinityTopologyVersion topVer = fut.get(); + + map(topVer, remapKeys); + } + catch (IgniteCheckedException e) { + onDone(e); + } + } + }); + } + }); + + return; + } + + if (rcvAll) + onDone(opRes0, err0); + } + + /** + * @param req Request. + * @param e Error. + */ + void onSendError(GridNearAtomicUpdateRequest req, IgniteCheckedException e) { + synchronized (this) { + GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(cctx.cacheId(), + req.nodeId(), + req.futureVersion(), + cctx.deploymentEnabled()); + + res.addFailedKeys(req.keys(), e); + + onResult(req.nodeId(), res, true); + } + } + + /** + * @param topVer Topology version. + * @param remapKeys Keys to remap. + */ + void map(AffinityTopologyVersion topVer, @Nullable Collection<KeyCacheObject> remapKeys) { + Collection<ClusterNode> topNodes = CU.affinityNodes(cctx, topVer); + + if (F.isEmpty(topNodes)) { + onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all partition nodes " + + "left the grid).")); + + return; + } + + Exception err = null; + GridNearAtomicUpdateRequest singleReq0 = null; + Map<UUID, GridNearAtomicUpdateRequest> mappings0 = null; + + int size = keys.size(); + + GridCacheVersion futVer = cctx.versions().next(topVer); + + GridCacheVersion updVer; + + // Assign version on near node in CLOCK ordering mode even if fastMap is false. + if (cctx.config().getAtomicWriteOrderMode() == CLOCK) { + updVer = this.updVer; + + if (updVer == null) { + updVer = cctx.versions().next(topVer); + + if (log.isDebugEnabled()) + log.debug("Assigned fast-map version for update on near node: " + updVer); + } + } + else + updVer = null; + + try { + if (size == 1 && !fastMap) { + assert remapKeys == null || remapKeys.size() == 1; + + singleReq0 = mapSingleUpdate(topVer, futVer, updVer); + } + else { + Map<UUID, GridNearAtomicUpdateRequest> pendingMappings = mapUpdate(topNodes, + topVer, + futVer, + updVer, + remapKeys); + + if (pendingMappings.size() == 1) + singleReq0 = F.firstValue(pendingMappings); + else { + if (syncMode == PRIMARY_SYNC) { + mappings0 = U.newHashMap(pendingMappings.size()); + + for (GridNearAtomicUpdateRequest req : pendingMappings.values()) { + if (req.hasPrimary()) + mappings0.put(req.nodeId(), req); + } + } + else + mappings0 = pendingMappings; + + assert !mappings0.isEmpty() || size == 0 : GridNearAtomicSingleUpdateFuture.this; + } + } + + synchronized (this) { + assert this.futVer == null : this; + assert this.topVer == AffinityTopologyVersion.ZERO : this; + + this.topVer = topVer; + this.updVer = updVer; + this.futVer = futVer; + + resCnt = 0; + + singleReq = singleReq0; + mappings = mappings0; + + this.remapKeys = null; + } + } + catch (Exception e) { + err = e; + } + + if (err != null) { + onDone(err); + + return; + } + + if (storeFuture()) { + if (!cctx.mvcc().addAtomicFuture(futVer, GridNearAtomicSingleUpdateFuture.this)) { + assert isDone() : GridNearAtomicSingleUpdateFuture.this; + + return; + } + } + + // Optimize mapping for single key. + if (singleReq0 != null) + mapSingle(singleReq0.nodeId(), singleReq0); + else { + assert mappings0 != null; + + if (size == 0) + onDone(new GridCacheReturn(cctx, true, true, null, true)); + else + doUpdate(mappings0); + } + } + + /** + * @param topVer Topology version. + * @return Future. + */ + @Nullable synchronized GridFutureAdapter<Void> completeFuture(AffinityTopologyVersion topVer) { + if (this.topVer == AffinityTopologyVersion.ZERO) + return null; + + if (this.topVer.compareTo(topVer) < 0) { + if (topCompleteFut == null) + topCompleteFut = new GridFutureAdapter<>(); + + return topCompleteFut; + } + + return null; + } + + /** + * @return Future version. + */ + GridCacheVersion onFutureDone() { + GridCacheVersion ver0; + + GridFutureAdapter<Void> fut0; + + synchronized (this) { + fut0 = topCompleteFut; + + topCompleteFut = null; + + ver0 = futVer; + + futVer = null; + } + + if (fut0 != null) + fut0.onDone(); + + return ver0; + } + + /** + * @param topNodes Cache nodes. + * @param topVer Topology version. + * @param futVer Future version. + * @param updVer Update version. + * @param remapKeys Keys to remap. + * @return Mapping. + * @throws Exception If failed. + */ + private Map<UUID, GridNearAtomicUpdateRequest> mapUpdate(Collection<ClusterNode> topNodes, + AffinityTopologyVersion topVer, + GridCacheVersion futVer, + @Nullable GridCacheVersion updVer, + @Nullable Collection<KeyCacheObject> remapKeys) throws Exception { + Iterator<?> it = null; + + if (vals != null) + it = vals.iterator(); + + Map<UUID, GridNearAtomicUpdateRequest> pendingMappings = U.newHashMap(topNodes.size()); + + // Create mappings first, then send messages. + for (Object key : keys) { + if (key == null) + throw new NullPointerException("Null key."); + + Object val; + + if (vals != null) { + val = it.next(); + + if (val == null) + throw new NullPointerException("Null value."); + } + else { + val = null; + } + + if (val == null && op != GridCacheOperation.DELETE) + continue; + + KeyCacheObject cacheKey = cctx.toCacheKeyObject(key); + + if (remapKeys != null && !remapKeys.contains(cacheKey)) + continue; + + if (op != TRANSFORM) + val = cctx.toCacheObject(val); + + Collection<ClusterNode> affNodes = mapKey(cacheKey, topVer, fastMap); + + if (affNodes.isEmpty()) + throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache " + + "(all partition nodes left the grid)."); + + int i = 0; + + for (ClusterNode affNode : affNodes) { + if (affNode == null) + throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache " + + "(all partition nodes left the grid)."); + + UUID nodeId = affNode.id(); + + GridNearAtomicUpdateRequest mapped = pendingMappings.get(nodeId); + + if (mapped == null) { + mapped = new GridNearAtomicUpdateRequest( + cctx.cacheId(), + nodeId, + futVer, + fastMap, + updVer, + topVer, + topLocked, + syncMode, + op, + retval, + expiryPlc, + invokeArgs, + filter, + subjId, + taskNameHash, + skipStore, + keepBinary, + cctx.kernalContext().clientNode(), + cctx.deploymentEnabled(), + keys.size()); + + pendingMappings.put(nodeId, mapped); + } + + mapped.addUpdateEntry(cacheKey, + val, + CU.TTL_NOT_CHANGED, + CU.EXPIRE_TIME_CALCULATE, + null, + i == 0 + ); + + i++; + } + } + + return pendingMappings; + } + + /** + * @param topVer Topology version. + * @param futVer Future version. + * @param updVer Update version. + * @return Request. + * @throws Exception If failed. + */ + private GridNearAtomicUpdateRequest mapSingleUpdate(AffinityTopologyVersion topVer, + GridCacheVersion futVer, + @Nullable GridCacheVersion updVer) throws Exception { + Object key = F.first(keys); + + Object val; + + if (vals != null) + // Regular PUT. + val = F.first(vals); + else + // Regular REMOVE. + val = null; + + // We still can get here if user pass map with single element. + if (key == null) + throw new NullPointerException("Null key."); + + if (val == null && op != GridCacheOperation.DELETE) + throw new NullPointerException("Null value."); + + KeyCacheObject cacheKey = cctx.toCacheKeyObject(key); + + if (op != TRANSFORM) + val = cctx.toCacheObject(val); + + ClusterNode primary = cctx.affinity().primary(cacheKey, topVer); + + if (primary == null) + throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all partition nodes " + + "left the grid)."); + + GridNearAtomicUpdateRequest req = new GridNearAtomicUpdateRequest( + cctx.cacheId(), + primary.id(), + futVer, + fastMap, + updVer, + topVer, + topLocked, + syncMode, + op, + retval, + expiryPlc, + invokeArgs, + filter, + subjId, + taskNameHash, + skipStore, + keepBinary, + cctx.kernalContext().clientNode(), + cctx.deploymentEnabled(), + 1); + + req.addUpdateEntry(cacheKey, + val, + CU.TTL_NOT_CHANGED, + CU.EXPIRE_TIME_CALCULATE, + null, + true); + + return req; + } + + /** + * @param ret Result from single node. + */ + @SuppressWarnings("unchecked") + private void addInvokeResults(GridCacheReturn ret) { + assert op == TRANSFORM : op; + assert ret.value() == null || ret.value() instanceof Map : ret.value(); + + if (ret.value() != null) { + if (opRes != null) + opRes.mergeEntryProcessResults(ret); + else + opRes = ret; + } + } + + /** + * @param failedKeys Failed keys. + * @param topVer Topology version for failed update. + * @param err Error cause. + */ + private void addFailedKeys(Collection<KeyCacheObject> failedKeys, + AffinityTopologyVersion topVer, + Throwable err) { + CachePartialUpdateCheckedException err0 = this.err; + + if (err0 == null) + err0 = this.err = new CachePartialUpdateCheckedException("Failed to update keys (retry update if possible)."); + + Collection<Object> keys = new ArrayList<>(failedKeys.size()); + + for (KeyCacheObject key : failedKeys) + keys.add(cctx.cacheObjectContext().unwrapBinaryIfNeeded(key, keepBinary, false)); + + err0.add(keys, err, topVer); + } + + /** {@inheritDoc} */ + @Override public synchronized String toString() { + return S.toString(UpdateState.class, this); + } + } + + /** {@inheritDoc} */ + public String toString() { + return S.toString(GridNearAtomicSingleUpdateFuture.class, this, super.toString()); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/2776cca1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index 519df17..69e6274 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@ -69,8 +69,7 @@ import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRA /** * DHT atomic cache near update future. */ -public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> - implements GridCacheAtomicFuture<Object>{ +public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implements GridCacheAtomicFuture<Object> { /** Logger reference. */ private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
