Repository: ignite Updated Branches: refs/heads/ignite-2523-remap-issue-2 [created] f8aabbf3a
Revert "IGNITE-2926: Implemented special version of single-update future for ATOMIC cache. Used for PRIMARY mode and only in case of a single key-value pair update. Gives 3% garbage reduction. Not much, though, the main goal for this change is not perofmance, but rather infrastructure for further improvements. Namely, for single-update request which will be initiated only form this future." This reverts commit e833eb2d3d09112ddca994c17b73441df20524a3. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f8aabbf3 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f8aabbf3 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f8aabbf3 Branch: refs/heads/ignite-2523-remap-issue-2 Commit: f8aabbf3ab0a20d8d126b105986bc2c42b669a3d Parents: 97cf2b3 Author: vozerov-gridgain <[email protected]> Authored: Mon Apr 25 15:27:44 2016 +0300 Committer: vozerov-gridgain <[email protected]> Committed: Mon Apr 25 15:27:44 2016 +0300 ---------------------------------------------------------------------- .../processors/cache/CacheOperationFilter.java | 61 -- .../processors/cache/GridCacheAtomicFuture.java | 5 + .../dht/atomic/GridDhtAtomicCache.java | 107 +-- .../dht/atomic/GridDhtAtomicUpdateFuture.java | 5 + .../GridNearAtomicAbstractUpdateFuture.java | 244 ------- .../GridNearAtomicSingleUpdateFuture.java | 645 ------------------- .../dht/atomic/GridNearAtomicUpdateFuture.java | 319 +++++++-- 7 files changed, 301 insertions(+), 1085 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/f8aabbf3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOperationFilter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOperationFilter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOperationFilter.java deleted file mode 100644 index 7fdfaac..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOperationFilter.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache; - -import org.jetbrains.annotations.Nullable; - -/** - * Cache operation filter. - */ -public enum CacheOperationFilter { - /** Always pass. */ - ALWAYS, - - /** No value. */ - NO_VAL, - - /** Has value. */ - HAS_VAL, - - /** Equals to value. */ - EQUALS_VAL; - - /** - * Creare predicate from operation filter. - * - * @param val Optional value. - * @return Predicate. - */ - @Nullable public CacheEntryPredicate createPredicate(@Nullable CacheObject val) { - switch (this) { - case ALWAYS: - return null; - - case NO_VAL: - return new CacheEntryPredicateNoValue(); - - case HAS_VAL: - return new CacheEntryPredicateHasValue(); - - default: - assert this == EQUALS_VAL; - - return new CacheEntryPredicateContainsValue(val); - } - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/f8aabbf3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java index c96d00f..359909e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java @@ -38,4 +38,9 @@ public interface GridCacheAtomicFuture<R> extends GridCacheFuture<R> { * @return Future or {@code null} if no need to wait. */ public IgniteInternalFuture<Void> completeFuture(AffinityTopologyVersion topVer); + + /** + * @return Future keys. + */ + public Collection<?> keys(); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/f8aabbf3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index d28aaaa..d8a0782 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -983,7 +983,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { ctx.checkSecurity(SecurityPermission.CACHE_PUT); - final GridNearAtomicAbstractUpdateFuture updateFut = + final GridNearAtomicUpdateFuture updateFut = createSingleUpdateFuture(key, val, proc, invokeArgs, retval, filter, waitTopFut); return asyncOp(new CO<IgniteInternalFuture<Object>>() { @@ -1015,7 +1015,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { ctx.checkSecurity(SecurityPermission.CACHE_REMOVE); - final GridNearAtomicAbstractUpdateFuture updateFut = + final GridNearAtomicUpdateFuture updateFut = createSingleUpdateFuture(key, null, null, null, retval, filter, true); if (statsEnabled) @@ -1042,7 +1042,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { * @param waitTopFut Whether to wait for topology future. * @return Future. */ - private GridNearAtomicAbstractUpdateFuture createSingleUpdateFuture( + private GridNearAtomicUpdateFuture createSingleUpdateFuture( K key, @Nullable V val, @Nullable EntryProcessor proc, @@ -1054,19 +1054,19 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { CacheOperationContext opCtx = ctx.operationContextPerCall(); GridCacheOperation op; - Object val0; + Collection vals; if (val != null) { op = UPDATE; - val0 = val; + vals = Collections.singletonList(val); } else if (proc != null) { op = TRANSFORM; - val0 = proc; + vals = Collections.singletonList(proc); } else { op = DELETE; - val0 = null; + vals = null; } GridCacheDrInfo conflictPutVal = null; @@ -1080,75 +1080,37 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { if (op == UPDATE) { conflictPutVal = new GridCacheDrInfo(ctx.toCacheObject(val), ctx.versions().next(dcId)); - val0 = null; + vals = null; } else if (op == GridCacheOperation.TRANSFORM) { conflictPutVal = new GridCacheDrInfo(proc, ctx.versions().next(dcId)); - val0 = null; + vals = null; } else conflictRmvVer = ctx.versions().next(dcId); } - CacheEntryPredicate[] filters = CU.filterArray(filter); - - if (conflictPutVal == null && conflictRmvVer == null && !isFastMap(filters, op)) { - return new GridNearAtomicSingleUpdateFuture( - ctx, - this, - ctx.config().getWriteSynchronizationMode(), - op, - key, - val0, - invokeArgs, - retval, - false, - opCtx != null ? opCtx.expiry() : null, - filters, - ctx.subjectIdPerCall(null, opCtx), - ctx.kernalContext().job().currentTaskNameHash(), - opCtx != null && opCtx.skipStore(), - opCtx != null && opCtx.isKeepBinary(), - opCtx != null && opCtx.noRetries() ? 1 : MAX_RETRIES, - waitTopFut - ); - } - else { - return new GridNearAtomicUpdateFuture( - ctx, - this, - ctx.config().getWriteSynchronizationMode(), - op, - Collections.singletonList(key), - val0 != null ? Collections.singletonList(val0) : null, - invokeArgs, - conflictPutVal != null ? Collections.singleton(conflictPutVal) : null, - conflictRmvVer != null ? Collections.singleton(conflictRmvVer) : null, - retval, - false, - opCtx != null ? opCtx.expiry() : null, - filters, - ctx.subjectIdPerCall(null, opCtx), - ctx.kernalContext().job().currentTaskNameHash(), - opCtx != null && opCtx.skipStore(), - opCtx != null && opCtx.isKeepBinary(), - opCtx != null && opCtx.noRetries() ? 1 : MAX_RETRIES, - waitTopFut); - } - } - - /** - * Whether this is fast-map operation. - * - * @param filters Filters. - * @param op Operation. - * @return {@code True} if fast-map. - */ - public boolean isFastMap(CacheEntryPredicate[] filters, GridCacheOperation op) { - return F.isEmpty(filters) && op != TRANSFORM && ctx.config().getWriteSynchronizationMode() == FULL_SYNC && - ctx.config().getAtomicWriteOrderMode() == CLOCK && - !(ctx.writeThrough() && ctx.config().getInterceptor() != null); + return new GridNearAtomicUpdateFuture( + ctx, + this, + ctx.config().getWriteSynchronizationMode(), + op, + Collections.singletonList(key), + vals, + invokeArgs, + conflictPutVal != null ? Collections.singleton(conflictPutVal) : null, + conflictRmvVer != null ? Collections.singleton(conflictRmvVer) : null, + retval, + false, + opCtx != null ? opCtx.expiry() : null, + CU.filterArray(filter), + ctx.subjectIdPerCall(null, opCtx), + ctx.kernalContext().job().currentTaskNameHash(), + opCtx != null && opCtx.skipStore(), + opCtx != null && opCtx.isKeepBinary(), + opCtx != null && opCtx.noRetries() ? 1 : MAX_RETRIES, + waitTopFut); } /** @@ -2880,15 +2842,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { res.nodeId(ctx.localNodeId()); - GridNearAtomicAbstractUpdateFuture fut = - (GridNearAtomicAbstractUpdateFuture)ctx.mvcc().atomicFuture(res.futureVersion()); + GridNearAtomicUpdateFuture fut = (GridNearAtomicUpdateFuture)ctx.mvcc().atomicFuture(res.futureVersion()); - if (fut != null) { - if (fut instanceof GridNearAtomicSingleUpdateFuture) - ((GridNearAtomicSingleUpdateFuture)fut).onResult(nodeId, res, false); - else - ((GridNearAtomicUpdateFuture)fut).onResult(nodeId, res, false); - } + if (fut != null) + fut.onResult(nodeId, res, false); else U.warn(log, "Failed to find near update future for update response (will ignore) " + "[nodeId=" + nodeId + ", res=" + res + ']'); http://git-wip-us.apache.org/repos/asf/ignite/blob/f8aabbf3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java index 5760596..8e91272 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java @@ -205,6 +205,11 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> return null; } + /** {@inheritDoc} */ + @Override public Collection<KeyCacheObject> keys() { + return keys; + } + /** * @param entry Entry to map. * @param val Value to write. http://git-wip-us.apache.org/repos/asf/ignite/blob/f8aabbf3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java deleted file mode 100644 index 7f52299..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java +++ /dev/null @@ -1,244 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache.distributed.dht.atomic; - -import org.apache.ignite.IgniteLogger; -import org.apache.ignite.cache.CacheWriteSynchronizationMode; -import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; -import org.apache.ignite.internal.processors.cache.CacheEntryPredicate; -import org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException; -import org.apache.ignite.internal.processors.cache.GridCacheAtomicFuture; -import org.apache.ignite.internal.processors.cache.GridCacheContext; -import org.apache.ignite.internal.processors.cache.GridCacheMvccManager; -import org.apache.ignite.internal.processors.cache.GridCacheOperation; -import org.apache.ignite.internal.processors.cache.GridCacheReturn; -import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; -import org.apache.ignite.internal.util.future.GridFutureAdapter; -import org.apache.ignite.internal.util.typedef.internal.CU; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.lang.IgniteUuid; -import org.jetbrains.annotations.Nullable; - -import javax.cache.expiry.ExpiryPolicy; -import java.util.UUID; -import java.util.concurrent.atomic.AtomicReference; - -import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.CLOCK; -import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC; - -/** - * Base for near atomic update futures. - */ -public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapter<Object> - implements GridCacheAtomicFuture<Object> { - /** Logger reference. */ - private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>(); - - /** Logger. */ - protected static IgniteLogger log; - - /** Cache context. */ - protected final GridCacheContext cctx; - - /** Cache. */ - protected final GridDhtAtomicCache cache; - - /** Write synchronization mode. */ - protected final CacheWriteSynchronizationMode syncMode; - - /** Update operation. */ - protected final GridCacheOperation op; - - /** Optional arguments for entry processor. */ - protected final Object[] invokeArgs; - - /** Return value require flag. */ - protected final boolean retval; - - /** Raw return value flag. */ - protected final boolean rawRetval; - - /** Expiry policy. */ - protected final ExpiryPolicy expiryPlc; - - /** Optional filter. */ - protected final CacheEntryPredicate[] filter; - - /** Subject ID. */ - protected final UUID subjId; - - /** Task name hash. */ - protected final int taskNameHash; - - /** Skip store flag. */ - protected final boolean skipStore; - - /** Keep binary flag. */ - protected final boolean keepBinary; - - /** Wait for topology future flag. */ - protected final boolean waitTopFut; - - /** Near cache flag. */ - protected final boolean nearEnabled; - - /** Mutex to synchronize state updates. */ - protected final Object mux = new Object(); - - /** Topology locked flag. Set if atomic update is performed inside a TX or explicit lock. */ - protected boolean topLocked; - - /** Remap count. */ - protected int remapCnt; - - /** Current topology version. */ - protected AffinityTopologyVersion topVer = AffinityTopologyVersion.ZERO; - - /** */ - protected GridCacheVersion updVer; - - /** Topology version when got mapping error. */ - protected AffinityTopologyVersion mapErrTopVer; - - /** */ - protected int resCnt; - - /** Error. */ - protected CachePartialUpdateCheckedException err; - - /** Future ID. */ - protected GridCacheVersion futVer; - - /** Completion future for a particular topology version. */ - protected GridFutureAdapter<Void> topCompleteFut; - - /** Operation result. */ - protected GridCacheReturn opRes; - - /** - * Constructor. - * - * @param cctx Cache context. - * @param cache Cache. - * @param syncMode Synchronization mode. - * @param op Operation. - * @param invokeArgs Invoke arguments. - * @param retval Return value flag. - * @param rawRetval Raw return value flag. - * @param expiryPlc Expiry policy. - * @param filter Filter. - * @param subjId Subject ID. - * @param taskNameHash Task name hash. - * @param skipStore Skip store flag. - * @param keepBinary Keep binary flag. - * @param remapCnt Remap count. - * @param waitTopFut Wait topology future flag. - */ - protected GridNearAtomicAbstractUpdateFuture( - GridCacheContext cctx, - GridDhtAtomicCache cache, - CacheWriteSynchronizationMode syncMode, - GridCacheOperation op, - @Nullable Object[] invokeArgs, - boolean retval, - boolean rawRetval, - @Nullable ExpiryPolicy expiryPlc, - CacheEntryPredicate[] filter, - UUID subjId, - int taskNameHash, - boolean skipStore, - boolean keepBinary, - int remapCnt, - boolean waitTopFut - ) { - if (log == null) - log = U.logger(cctx.kernalContext(), logRef, GridFutureAdapter.class); - - this.cctx = cctx; - this.cache = cache; - this.syncMode = syncMode; - this.op = op; - this.invokeArgs = invokeArgs; - this.retval = retval; - this.rawRetval = rawRetval; - this.expiryPlc = expiryPlc; - this.filter = filter; - this.subjId = subjId; - this.taskNameHash = taskNameHash; - this.skipStore = skipStore; - this.keepBinary = keepBinary; - this.waitTopFut = waitTopFut; - - nearEnabled = CU.isNearEnabled(cctx); - - if (!waitTopFut) - remapCnt = 1; - - this.remapCnt = remapCnt; - } - - /** - * Performs future mapping. - */ - public void map() { - AffinityTopologyVersion topVer = cctx.shared().lockedTopologyVersion(null); - - if (topVer == null) - mapOnTopology(); - else { - topLocked = true; - - // Cannot remap. - remapCnt = 1; - - map(topVer); - } - } - - /** - * @param topVer Topology version. - */ - protected abstract void map(AffinityTopologyVersion topVer); - - /** - * Maps future on ready topology. - */ - protected abstract void mapOnTopology(); - - /** {@inheritDoc} */ - @Override public IgniteUuid futureId() { - throw new UnsupportedOperationException(); - } - - /** {@inheritDoc} */ - @Override public boolean trackable() { - return true; - } - - /** {@inheritDoc} */ - @Override public void markNotTrackable() { - // No-op. - } - - /** - * @return {@code True} future is stored by {@link GridCacheMvccManager#addAtomicFuture}. - */ - protected boolean storeFuture() { - return cctx.config().getAtomicWriteOrderMode() == CLOCK || syncMode != FULL_ASYNC; - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/f8aabbf3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java deleted file mode 100644 index abfc5c9..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java +++ /dev/null @@ -1,645 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache.distributed.dht.atomic; - -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.cache.CacheWriteSynchronizationMode; -import org.apache.ignite.cluster.ClusterNode; -import org.apache.ignite.internal.IgniteInternalFuture; -import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; -import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException; -import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; -import org.apache.ignite.internal.processors.cache.CacheEntryPredicate; -import org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException; -import org.apache.ignite.internal.processors.cache.GridCacheContext; -import org.apache.ignite.internal.processors.cache.GridCacheOperation; -import org.apache.ignite.internal.processors.cache.GridCacheReturn; -import org.apache.ignite.internal.processors.cache.GridCacheTryPutFailedException; -import org.apache.ignite.internal.processors.cache.KeyCacheObject; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture; -import org.apache.ignite.internal.processors.cache.distributed.near.GridNearAtomicCache; -import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; -import org.apache.ignite.internal.util.future.GridFinishedFuture; -import org.apache.ignite.internal.util.future.GridFutureAdapter; -import org.apache.ignite.internal.util.typedef.CI1; -import org.apache.ignite.internal.util.typedef.CI2; -import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.X; -import org.apache.ignite.internal.util.typedef.internal.CU; -import org.apache.ignite.internal.util.typedef.internal.S; -import org.jetbrains.annotations.Nullable; - -import javax.cache.expiry.ExpiryPolicy; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Map; -import java.util.UUID; - -import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.CLOCK; -import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC; -import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM; - -/** - * DHT atomic cache near update future. - */ -public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpdateFuture { - /** Keys */ - private Object key; - - /** Values. */ - @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"}) - private Object val; - - /** Not null is operation is mapped to single node. */ - private GridNearAtomicUpdateRequest req; - - /** - * @param cctx Cache context. - * @param cache Cache instance. - * @param syncMode Write synchronization mode. - * @param op Update operation. - * @param key Keys to update. - * @param val Values or transform closure. - * @param invokeArgs Optional arguments for entry processor. - * @param retval Return value require flag. - * @param rawRetval {@code True} if should return {@code GridCacheReturn} as future result. - * @param expiryPlc Expiry policy explicitly specified for cache operation. - * @param filter Entry filter. - * @param subjId Subject ID. - * @param taskNameHash Task name hash code. - * @param skipStore Skip store flag. - * @param keepBinary Keep binary flag. - * @param remapCnt Maximum number of retries. - * @param waitTopFut If {@code false} does not wait for affinity change future. - */ - public GridNearAtomicSingleUpdateFuture( - GridCacheContext cctx, - GridDhtAtomicCache cache, - CacheWriteSynchronizationMode syncMode, - GridCacheOperation op, - Object key, - @Nullable Object val, - @Nullable Object[] invokeArgs, - final boolean retval, - final boolean rawRetval, - @Nullable ExpiryPolicy expiryPlc, - final CacheEntryPredicate[] filter, - UUID subjId, - int taskNameHash, - boolean skipStore, - boolean keepBinary, - int remapCnt, - boolean waitTopFut - ) { - super(cctx, cache, syncMode, op, invokeArgs, retval, rawRetval, expiryPlc, filter, subjId, taskNameHash, - skipStore, keepBinary, remapCnt, waitTopFut); - - assert subjId != null; - - this.key = key; - this.val = val; - } - - /** {@inheritDoc} */ - @Override public GridCacheVersion version() { - synchronized (mux) { - return futVer; - } - } - - /** {@inheritDoc} */ - @Override public boolean onNodeLeft(UUID nodeId) { - GridNearAtomicUpdateResponse res = null; - - synchronized (mux) { - GridNearAtomicUpdateRequest req = this.req != null && this.req.nodeId().equals(nodeId) ? - this.req : null; - - if (req != null && req.response() == null) { - res = new GridNearAtomicUpdateResponse(cctx.cacheId(), - nodeId, - req.futureVersion(), - cctx.deploymentEnabled()); - - ClusterTopologyCheckedException e = new ClusterTopologyCheckedException("Primary node left grid " + - "before response is received: " + nodeId); - - e.retryReadyFuture(cctx.shared().nextAffinityReadyFuture(req.topologyVersion())); - - res.addFailedKeys(req.keys(), e); - } - } - - if (res != null) - onResult(nodeId, res, true); - - return false; - } - - /** {@inheritDoc} */ - @Override public IgniteInternalFuture<Void> completeFuture(AffinityTopologyVersion topVer) { - return null; - } - - /** {@inheritDoc} */ - @SuppressWarnings("ConstantConditions") - @Override public boolean onDone(@Nullable Object res, @Nullable Throwable err) { - assert res == null || res instanceof GridCacheReturn; - - GridCacheReturn ret = (GridCacheReturn)res; - - Object retval = - res == null ? null : rawRetval ? ret : (this.retval || op == TRANSFORM) ? - cctx.unwrapBinaryIfNeeded(ret.value(), keepBinary) : ret.success(); - - if (op == TRANSFORM && retval == null) - retval = Collections.emptyMap(); - - if (super.onDone(retval, err)) { - GridCacheVersion futVer = onFutureDone(); - - if (futVer != null) - cctx.mvcc().removeAtomicFuture(futVer); - - return true; - } - - return false; - } - - /** - * Response callback. - * - * @param nodeId Node ID. - * @param res Update response. - * @param nodeErr {@code True} if response was created on node failure. - */ - @SuppressWarnings({"unchecked", "ThrowableResultOfMethodCallIgnored"}) - public void onResult(UUID nodeId, GridNearAtomicUpdateResponse res, boolean nodeErr) { - GridNearAtomicUpdateRequest req; - - AffinityTopologyVersion remapTopVer = null; - - GridCacheReturn opRes0 = null; - CachePartialUpdateCheckedException err0 = null; - - GridFutureAdapter<?> fut0 = null; - - synchronized (mux) { - if (!res.futureVersion().equals(futVer)) - return; - - if (!this.req.nodeId().equals(nodeId)) - return; - - req = this.req; - - this.req = null; - - boolean remapKey = !F.isEmpty(res.remapKeys()); - - if (remapKey) { - if (mapErrTopVer == null || mapErrTopVer.compareTo(req.topologyVersion()) < 0) - mapErrTopVer = req.topologyVersion(); - } - else if (res.error() != null) { - if (res.failedKeys() != null) { - if (err == null) - err = new CachePartialUpdateCheckedException( - "Failed to update keys (retry update if possible)."); - - Collection<Object> keys = new ArrayList<>(res.failedKeys().size()); - - for (KeyCacheObject key : res.failedKeys()) - keys.add(cctx.cacheObjectContext().unwrapBinaryIfNeeded(key, keepBinary, false)); - - err.add(keys, res.error(), req.topologyVersion()); - } - } - else { - if (!req.fastMap() || req.hasPrimary()) { - GridCacheReturn ret = res.returnValue(); - - if (op == TRANSFORM) { - if (ret != null) { - assert ret.value() == null || ret.value() instanceof Map : ret.value(); - - if (ret.value() != null) { - if (opRes != null) - opRes.mergeEntryProcessResults(ret); - else - opRes = ret; - } - } - } - else - opRes = ret; - } - } - - if (remapKey) { - assert mapErrTopVer != null; - - remapTopVer = cctx.shared().exchange().topologyVersion(); - } - else { - if (err != null && - X.hasCause(err, CachePartialUpdateCheckedException.class) && - X.hasCause(err, ClusterTopologyCheckedException.class) && - storeFuture() && - --remapCnt > 0) { - ClusterTopologyCheckedException topErr = - X.cause(err, ClusterTopologyCheckedException.class); - - if (!(topErr instanceof ClusterTopologyServerNotFoundException)) { - CachePartialUpdateCheckedException cause = - X.cause(err, CachePartialUpdateCheckedException.class); - - assert cause != null && cause.topologyVersion() != null : err; - - remapTopVer = - new AffinityTopologyVersion(cause.topologyVersion().topologyVersion() + 1); - - err = null; - updVer = null; - } - } - } - - if (remapTopVer == null) { - err0 = err; - opRes0 = opRes; - } - else { - fut0 = topCompleteFut; - - topCompleteFut = null; - - cctx.mvcc().removeAtomicFuture(futVer); - - futVer = null; - topVer = AffinityTopologyVersion.ZERO; - } - } - - if (res.error() != null && res.failedKeys() == null) { - onDone(res.error()); - - return; - } - - if (nearEnabled && !nodeErr) - updateNear(req, res); - - if (remapTopVer != null) { - if (fut0 != null) - fut0.onDone(); - - if (!waitTopFut) { - onDone(new GridCacheTryPutFailedException()); - - return; - } - - if (topLocked) { - CachePartialUpdateCheckedException e = - new CachePartialUpdateCheckedException("Failed to update keys (retry update if possible)."); - - ClusterTopologyCheckedException cause = new ClusterTopologyCheckedException( - "Failed to update keys, topology changed while execute atomic update inside transaction."); - - cause.retryReadyFuture(cctx.affinity().affinityReadyFuture(remapTopVer)); - - e.add(Collections.singleton(cctx.toCacheKeyObject(key)), cause); - - onDone(e); - - return; - } - - IgniteInternalFuture<AffinityTopologyVersion> fut = - cctx.shared().exchange().affinityReadyFuture(remapTopVer); - - if (fut == null) - fut = new GridFinishedFuture<>(remapTopVer); - - fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { - @Override public void apply(final IgniteInternalFuture<AffinityTopologyVersion> fut) { - cctx.kernalContext().closure().runLocalSafe(new Runnable() { - @Override public void run() { - try { - AffinityTopologyVersion topVer = fut.get(); - - map(topVer); - } - catch (IgniteCheckedException e) { - onDone(e); - } - } - }); - } - }); - - return; - } - - onDone(opRes0, err0); - } - - /** - * Updates near cache. - * - * @param req Update request. - * @param res Update response. - */ - private void updateNear(GridNearAtomicUpdateRequest req, GridNearAtomicUpdateResponse res) { - assert nearEnabled; - - if (res.remapKeys() != null || !req.hasPrimary()) - return; - - GridNearAtomicCache near = (GridNearAtomicCache)cctx.dht().near(); - - near.processNearAtomicUpdateResponse(req, res); - } - - /** {@inheritDoc} */ - @Override protected void mapOnTopology() { - cache.topology().readLock(); - - AffinityTopologyVersion topVer = null; - - try { - if (cache.topology().stopping()) { - onDone(new IgniteCheckedException("Failed to perform cache operation (cache is stopped): " + - cache.name())); - - return; - } - - GridDhtTopologyFuture fut = cache.topology().topologyVersionFuture(); - - if (fut.isDone()) { - Throwable err = fut.validateCache(cctx); - - if (err != null) { - onDone(err); - - return; - } - - topVer = fut.topologyVersion(); - } - else { - if (waitTopFut) { - assert !topLocked : this; - - fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { - @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) { - cctx.kernalContext().closure().runLocalSafe(new Runnable() { - @Override public void run() { - mapOnTopology(); - } - }); - } - }); - } - else - onDone(new GridCacheTryPutFailedException()); - - return; - } - } - finally { - cache.topology().readUnlock(); - } - - map(topVer); - } - - /** - * Maps future to single node. - * - * @param nodeId Node ID. - * @param req Request. - */ - private void mapSingle(UUID nodeId, GridNearAtomicUpdateRequest req) { - if (cctx.localNodeId().equals(nodeId)) { - cache.updateAllAsyncInternal(nodeId, req, - new CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse>() { - @Override public void apply(GridNearAtomicUpdateRequest req, GridNearAtomicUpdateResponse res) { - onResult(res.nodeId(), res, false); - } - }); - } - else { - try { - if (log.isDebugEnabled()) - log.debug("Sending near atomic update request [nodeId=" + req.nodeId() + ", req=" + req + ']'); - - cctx.io().send(req.nodeId(), req, cctx.ioPolicy()); - - if (syncMode == FULL_ASYNC) - onDone(new GridCacheReturn(cctx, true, true, null, true)); - } - catch (IgniteCheckedException e) { - onSendError(req, e); - } - } - } - - /** - * @param req Request. - * @param e Error. - */ - void onSendError(GridNearAtomicUpdateRequest req, IgniteCheckedException e) { - synchronized (mux) { - GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(cctx.cacheId(), - req.nodeId(), - req.futureVersion(), - cctx.deploymentEnabled()); - - res.addFailedKeys(req.keys(), e); - - onResult(req.nodeId(), res, true); - } - } - - /** {@inheritDoc} */ - protected void map(AffinityTopologyVersion topVer) { - Collection<ClusterNode> topNodes = CU.affinityNodes(cctx, topVer); - - if (F.isEmpty(topNodes)) { - onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all partition nodes " + - "left the grid).")); - - return; - } - - Exception err = null; - GridNearAtomicUpdateRequest singleReq0 = null; - - GridCacheVersion futVer = cctx.versions().next(topVer); - - GridCacheVersion updVer; - - // Assign version on near node in CLOCK ordering mode even if fastMap is false. - if (cctx.config().getAtomicWriteOrderMode() == CLOCK) { - updVer = this.updVer; - - if (updVer == null) { - updVer = cctx.versions().next(topVer); - - if (log.isDebugEnabled()) - log.debug("Assigned fast-map version for update on near node: " + updVer); - } - } - else - updVer = null; - - try { - singleReq0 = mapSingleUpdate(topVer, futVer, updVer); - - synchronized (mux) { - assert this.futVer == null : this; - assert this.topVer == AffinityTopologyVersion.ZERO : this; - - this.topVer = topVer; - this.updVer = updVer; - this.futVer = futVer; - - resCnt = 0; - - req = singleReq0; - } - } - catch (Exception e) { - err = e; - } - - if (err != null) { - onDone(err); - - return; - } - - if (storeFuture()) { - if (!cctx.mvcc().addAtomicFuture(futVer, this)) { - assert isDone() : this; - - return; - } - } - - // Optimize mapping for single key. - mapSingle(singleReq0.nodeId(), singleReq0); - } - - /** - * @return Future version. - */ - GridCacheVersion onFutureDone() { - GridCacheVersion ver0; - - GridFutureAdapter<Void> fut0; - - synchronized (mux) { - fut0 = topCompleteFut; - - topCompleteFut = null; - - ver0 = futVer; - - futVer = null; - } - - if (fut0 != null) - fut0.onDone(); - - return ver0; - } - - /** - * @param topVer Topology version. - * @param futVer Future version. - * @param updVer Update version. - * @return Request. - * @throws Exception If failed. - */ - private GridNearAtomicUpdateRequest mapSingleUpdate(AffinityTopologyVersion topVer, - GridCacheVersion futVer, - @Nullable GridCacheVersion updVer) throws Exception { - if (key == null) - throw new NullPointerException("Null key."); - - Object val = this.val; - - if (val == null && op != GridCacheOperation.DELETE) - throw new NullPointerException("Null value."); - - KeyCacheObject cacheKey = cctx.toCacheKeyObject(key); - - if (op != TRANSFORM) - val = cctx.toCacheObject(val); - - ClusterNode primary = cctx.affinity().primary(cacheKey, topVer); - - if (primary == null) - throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all partition nodes " + - "left the grid)."); - - GridNearAtomicUpdateRequest req = new GridNearAtomicUpdateRequest( - cctx.cacheId(), - primary.id(), - futVer, - false, - updVer, - topVer, - topLocked, - syncMode, - op, - retval, - expiryPlc, - invokeArgs, - filter, - subjId, - taskNameHash, - skipStore, - keepBinary, - cctx.kernalContext().clientNode(), - cctx.deploymentEnabled(), - 1); - - req.addUpdateEntry(cacheKey, - val, - CU.TTL_NOT_CHANGED, - CU.EXPIRE_TIME_CALCULATE, - null, - true); - - return req; - } - - /** {@inheritDoc} */ - public String toString() { - synchronized (mux) { - return S.toString(GridNearAtomicSingleUpdateFuture.class, this, super.toString()); - } - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/f8aabbf3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index edebd8c..9955df7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@ -23,8 +23,10 @@ import java.util.Collections; import java.util.Iterator; import java.util.Map; import java.util.UUID; +import java.util.concurrent.atomic.AtomicReference; import javax.cache.expiry.ExpiryPolicy; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteLogger; import org.apache.ignite.cache.CacheWriteSynchronizationMode; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.IgniteInternalFuture; @@ -34,7 +36,9 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheEntryPredicate; import org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException; import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager; +import org.apache.ignite.internal.processors.cache.GridCacheAtomicFuture; import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.GridCacheMvccManager; import org.apache.ignite.internal.processors.cache.GridCacheOperation; import org.apache.ignite.internal.processors.cache.GridCacheReturn; import org.apache.ignite.internal.processors.cache.GridCacheTryPutFailedException; @@ -53,19 +57,34 @@ import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteUuid; import org.jetbrains.annotations.Nullable; import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.CLOCK; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC; import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM; /** * DHT atomic cache near update future. */ -public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFuture { - /** Fast map flag. */ - private final boolean fastMap; +public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> + implements GridCacheAtomicFuture<Object>{ + /** Logger reference. */ + private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>(); + + /** Logger. */ + protected static IgniteLogger log; + + /** Cache context. */ + private final GridCacheContext cctx; + + /** Cache. */ + private GridDhtAtomicCache cache; + + /** Update operation. */ + private final GridCacheOperation op; /** Keys */ private Collection<?> keys; @@ -74,6 +93,9 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"}) private Collection<?> vals; + /** Optional arguments for entry processor. */ + private Object[] invokeArgs; + /** Conflict put values. */ @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"}) private Collection<GridCacheDrInfo> conflictPutVals; @@ -82,16 +104,85 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"}) private Collection<GridCacheVersion> conflictRmvVals; + /** Return value require flag. */ + private final boolean retval; + + /** Expiry policy. */ + private final ExpiryPolicy expiryPlc; + + /** Optional filter. */ + private final CacheEntryPredicate[] filter; + + /** Write synchronization mode. */ + private final CacheWriteSynchronizationMode syncMode; + + /** Raw return value flag. */ + private final boolean rawRetval; + + /** Fast map flag. */ + private final boolean fastMap; + + /** Near cache flag. */ + private final boolean nearEnabled; + + /** Subject ID. */ + private final UUID subjId; + + /** Task name hash. */ + private final int taskNameHash; + + /** Topology locked flag. Set if atomic update is performed inside a TX or explicit lock. */ + private boolean topLocked; + + /** Skip store flag. */ + private final boolean skipStore; + + /** */ + private final boolean keepBinary; + + /** Wait for topology future flag. */ + private final boolean waitTopFut; + + /** Remap count. */ + private int remapCnt; + + /** Mutex to synchronize state updates. */ + private final Object mux = new Object(); + + /** Current topology version. */ + private AffinityTopologyVersion topVer = AffinityTopologyVersion.ZERO; + + /** */ + private GridCacheVersion updVer; + + /** Topology version when got mapping error. */ + private AffinityTopologyVersion mapErrTopVer; + /** Mappings if operations is mapped to more than one node. */ @GridToStringInclude private Map<UUID, GridNearAtomicUpdateRequest> mappings; + /** */ + private int resCnt; + + /** Error. */ + private CachePartialUpdateCheckedException err; + + /** Future ID. */ + private GridCacheVersion futVer; + + /** Completion future for a particular topology version. */ + private GridFutureAdapter<Void> topCompleteFut; + /** Keys to remap. */ private Collection<KeyCacheObject> remapKeys; /** Not null is operation is mapped to single node. */ private GridNearAtomicUpdateRequest singleReq; + /** Operation result. */ + private GridCacheReturn opRes; + /** * @param cctx Cache context. * @param cache Cache instance. @@ -134,20 +225,49 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu int remapCnt, boolean waitTopFut ) { - super(cctx, cache, syncMode, op, invokeArgs, retval, rawRetval, expiryPlc, filter, subjId, taskNameHash, - skipStore, keepBinary, remapCnt, waitTopFut); + this.rawRetval = rawRetval; assert vals == null || vals.size() == keys.size(); assert conflictPutVals == null || conflictPutVals.size() == keys.size(); assert conflictRmvVals == null || conflictRmvVals.size() == keys.size(); assert subjId != null; + this.cctx = cctx; + this.cache = cache; + this.syncMode = syncMode; + this.op = op; this.keys = keys; this.vals = vals; + this.invokeArgs = invokeArgs; this.conflictPutVals = conflictPutVals; this.conflictRmvVals = conflictRmvVals; + this.retval = retval; + this.expiryPlc = expiryPlc; + this.filter = filter; + this.subjId = subjId; + this.taskNameHash = taskNameHash; + this.skipStore = skipStore; + this.keepBinary = keepBinary; + this.waitTopFut = waitTopFut; + + if (log == null) + log = U.logger(cctx.kernalContext(), logRef, GridFutureAdapter.class); - fastMap = cache.isFastMap(filter, op); + fastMap = F.isEmpty(filter) && op != TRANSFORM && cctx.config().getWriteSynchronizationMode() == FULL_SYNC && + cctx.config().getAtomicWriteOrderMode() == CLOCK && + !(cctx.writeThrough() && cctx.config().getInterceptor() != null); + + nearEnabled = CU.isNearEnabled(cctx); + + if (!waitTopFut) + remapCnt = 1; + + this.remapCnt = remapCnt; + } + + /** {@inheritDoc} */ + @Override public IgniteUuid futureId() { + throw new UnsupportedOperationException(); } /** {@inheritDoc} */ @@ -157,6 +277,19 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu } } + /** + * @return {@code True} if this future should block partition map exchange. + */ + private boolean waitForPartitionExchange() { + // Wait fast-map near atomic update futures in CLOCK mode. + return fastMap; + } + + /** {@inheritDoc} */ + @Override public Collection<?> keys() { + return keys; + } + /** {@inheritDoc} */ @Override public boolean onNodeLeft(UUID nodeId) { GridNearAtomicUpdateResponse res = null; @@ -191,21 +324,37 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu } /** {@inheritDoc} */ - @Override public IgniteInternalFuture<Void> completeFuture(AffinityTopologyVersion topVer) { - // Wait fast-map near atomic update futures in CLOCK mode. - if (fastMap) { - GridFutureAdapter<Void> fut; + @Override public boolean trackable() { + return true; + } - synchronized (mux) { - if (this.topVer != AffinityTopologyVersion.ZERO && this.topVer.compareTo(topVer) < 0) { - if (topCompleteFut == null) - topCompleteFut = new GridFutureAdapter<>(); + /** {@inheritDoc} */ + @Override public void markNotTrackable() { + // No-op. + } - fut = topCompleteFut; - } - else - fut = null; - } + /** + * Performs future mapping. + */ + public void map() { + AffinityTopologyVersion topVer = cctx.shared().lockedTopologyVersion(null); + + if (topVer == null) + mapOnTopology(); + else { + topLocked = true; + + // Cannot remap. + remapCnt = 1; + + map(topVer, null); + } + } + + /** {@inheritDoc} */ + @Override public IgniteInternalFuture<Void> completeFuture(AffinityTopologyVersion topVer) { + if (waitForPartitionExchange()) { + GridFutureAdapter<Void> fut = completeFuture0(topVer); if (fut != null && isDone()) { fut.onDone(); @@ -305,34 +454,16 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu mapErrTopVer = req.topologyVersion(); } else if (res.error() != null) { - if (res.failedKeys() != null) { - if (err == null) - err = new CachePartialUpdateCheckedException( - "Failed to update keys (retry update if possible)."); - - Collection<Object> keys = new ArrayList<>(res.failedKeys().size()); - - for (KeyCacheObject key : res.failedKeys()) - keys.add(cctx.cacheObjectContext().unwrapBinaryIfNeeded(key, keepBinary, false)); - - err.add(keys, res.error(), req.topologyVersion()); - } + if (res.failedKeys() != null) + addFailedKeys(res.failedKeys(), req.topologyVersion(), res.error()); } else { if (!req.fastMap() || req.hasPrimary()) { GridCacheReturn ret = res.returnValue(); if (op == TRANSFORM) { - if (ret != null) { - assert ret.value() == null || ret.value() instanceof Map : ret.value(); - - if (ret.value() != null) { - if (opRes != null) - opRes.mergeEntryProcessResults(ret); - else - opRes = ret; - } - } + if (ret != null) + addInvokeResults(ret); } else opRes = ret; @@ -489,8 +620,10 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu near.processNearAtomicUpdateResponse(req, res); } - /** {@inheritDoc} */ - @Override protected void mapOnTopology() { + /** + * Maps future on ready topology. + */ + private void mapOnTopology() { cache.topology().readLock(); AffinityTopologyVersion topVer = null; @@ -544,6 +677,35 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu } /** + * @return {@code True} future is stored by {@link GridCacheMvccManager#addAtomicFuture}. + */ + private boolean storeFuture() { + return cctx.config().getAtomicWriteOrderMode() == CLOCK || syncMode != FULL_ASYNC; + } + + /** + * Maps key to nodes. If filters are absent and operation is not TRANSFORM, then we can assign version on near + * node and send updates in parallel to all participating nodes. + * + * @param key Key to map. + * @param topVer Topology version to map. + * @param fastMap Flag indicating whether mapping is performed for fast-circuit update. + * @return Collection of nodes to which key is mapped. + */ + private Collection<ClusterNode> mapKey( + KeyCacheObject key, + AffinityTopologyVersion topVer, + boolean fastMap + ) { + GridCacheAffinityManager affMgr = cctx.affinity(); + + // If we can send updates in parallel - do it. + return fastMap ? + cctx.topology().nodes(affMgr.partition(key), topVer) : + Collections.singletonList(affMgr.primary(key, topVer)); + } + + /** * Maps future to single node. * * @param nodeId Node ID. @@ -635,11 +797,6 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu } } - /** {@inheritDoc} */ - protected void map(AffinityTopologyVersion topVer) { - map(topVer, null); - } - /** * @param topVer Topology version. * @param remapKeys Keys to remap. @@ -757,6 +914,26 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu } /** + * @param topVer Topology version. + * @return Future. + */ + @Nullable GridFutureAdapter<Void> completeFuture0(AffinityTopologyVersion topVer) { + synchronized (mux) { + if (this.topVer == AffinityTopologyVersion.ZERO) + return null; + + if (this.topVer.compareTo(topVer) < 0) { + if (topCompleteFut == null) + topCompleteFut = new GridFutureAdapter<>(); + + return topCompleteFut; + } + + return null; + } + } + + /** * @return Future version. */ GridCacheVersion onFutureDone() { @@ -862,7 +1039,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu if (op != TRANSFORM) val = cctx.toCacheObject(val); - Collection<ClusterNode> affNodes = mapKey(cacheKey, topVer); + Collection<ClusterNode> affNodes = mapKey(cacheKey, topVer, fastMap); if (affNodes.isEmpty()) throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache " + @@ -1013,19 +1190,41 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu } /** - * Maps key to nodes. If filters are absent and operation is not TRANSFORM, then we can assign version on near - * node and send updates in parallel to all participating nodes. - * - * @param key Key to map. - * @param topVer Topology version to map. - * @return Collection of nodes to which key is mapped. + * @param ret Result from single node. */ - private Collection<ClusterNode> mapKey(KeyCacheObject key, AffinityTopologyVersion topVer) { - GridCacheAffinityManager affMgr = cctx.affinity(); + @SuppressWarnings("unchecked") + private void addInvokeResults(GridCacheReturn ret) { + assert op == TRANSFORM : op; + assert ret.value() == null || ret.value() instanceof Map : ret.value(); + + if (ret.value() != null) { + if (opRes != null) + opRes.mergeEntryProcessResults(ret); + else + opRes = ret; + } + } - // If we can send updates in parallel - do it. - return fastMap ? cctx.topology().nodes(affMgr.partition(key), topVer) : - Collections.singletonList(affMgr.primary(key, topVer)); + /** + * @param failedKeys Failed keys. + * @param topVer Topology version for failed update. + * @param err Error cause. + */ + private void addFailedKeys(Collection<KeyCacheObject> failedKeys, + AffinityTopologyVersion topVer, + Throwable err) { + CachePartialUpdateCheckedException err0 = this.err; + + if (err0 == null) + err0 = this.err = + new CachePartialUpdateCheckedException("Failed to update keys (retry update if possible)."); + + Collection<Object> keys = new ArrayList<>(failedKeys.size()); + + for (KeyCacheObject key : failedKeys) + keys.add(cctx.cacheObjectContext().unwrapBinaryIfNeeded(key, keepBinary, false)); + + err0.add(keys, err, topVer); } /** {@inheritDoc} */
