Repository: ignite Updated Branches: refs/heads/master 186c86049 -> e833eb2d3
IGNITE-2926: Implemented special version of single-update future for ATOMIC cache. Used for PRIMARY mode and only in case of a single key-value pair update. Gives 3% garbage reduction. Not much, though, the main goal for this change is not perofmance, but rather infrastructure for further improvements. Namely, for single-update request which will be initiated only form this future. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e833eb2d Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e833eb2d Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e833eb2d Branch: refs/heads/master Commit: e833eb2d3d09112ddca994c17b73441df20524a3 Parents: 186c860 Author: vozerov-gridgain <[email protected]> Authored: Wed Apr 20 10:06:55 2016 +0300 Committer: vozerov-gridgain <[email protected]> Committed: Wed Apr 20 10:06:55 2016 +0300 ---------------------------------------------------------------------- .../processors/cache/CacheOperationFilter.java | 61 ++ .../processors/cache/GridCacheAtomicFuture.java | 5 - .../dht/atomic/GridDhtAtomicCache.java | 107 ++- .../dht/atomic/GridDhtAtomicUpdateFuture.java | 5 - .../GridNearAtomicAbstractUpdateFuture.java | 244 +++++++ .../GridNearAtomicSingleUpdateFuture.java | 645 +++++++++++++++++++ .../dht/atomic/GridNearAtomicUpdateFuture.java | 319 ++------- 7 files changed, 1085 insertions(+), 301 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/e833eb2d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOperationFilter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOperationFilter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOperationFilter.java new file mode 100644 index 0000000..7fdfaac --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOperationFilter.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.jetbrains.annotations.Nullable; + +/** + * Cache operation filter. + */ +public enum CacheOperationFilter { + /** Always pass. */ + ALWAYS, + + /** No value. */ + NO_VAL, + + /** Has value. */ + HAS_VAL, + + /** Equals to value. */ + EQUALS_VAL; + + /** + * Creare predicate from operation filter. + * + * @param val Optional value. + * @return Predicate. + */ + @Nullable public CacheEntryPredicate createPredicate(@Nullable CacheObject val) { + switch (this) { + case ALWAYS: + return null; + + case NO_VAL: + return new CacheEntryPredicateNoValue(); + + case HAS_VAL: + return new CacheEntryPredicateHasValue(); + + default: + assert this == EQUALS_VAL; + + return new CacheEntryPredicateContainsValue(val); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/e833eb2d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java index 359909e..c96d00f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java @@ -38,9 +38,4 @@ public interface GridCacheAtomicFuture<R> extends GridCacheFuture<R> { * @return Future or {@code null} if no need to wait. */ public IgniteInternalFuture<Void> completeFuture(AffinityTopologyVersion topVer); - - /** - * @return Future keys. - */ - public Collection<?> keys(); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/e833eb2d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 4f8b32c..013184b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -984,7 +984,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { ctx.checkSecurity(SecurityPermission.CACHE_PUT); - final GridNearAtomicUpdateFuture updateFut = + final GridNearAtomicAbstractUpdateFuture updateFut = createSingleUpdateFuture(key, val, proc, invokeArgs, retval, filter, waitTopFut); return asyncOp(new CO<IgniteInternalFuture<Object>>() { @@ -1016,7 +1016,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { ctx.checkSecurity(SecurityPermission.CACHE_REMOVE); - final GridNearAtomicUpdateFuture updateFut = + final GridNearAtomicAbstractUpdateFuture updateFut = createSingleUpdateFuture(key, null, null, null, retval, filter, true); if (statsEnabled) @@ -1043,7 +1043,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { * @param waitTopFut Whether to wait for topology future. * @return Future. */ - private GridNearAtomicUpdateFuture createSingleUpdateFuture( + private GridNearAtomicAbstractUpdateFuture createSingleUpdateFuture( K key, @Nullable V val, @Nullable EntryProcessor proc, @@ -1055,19 +1055,19 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { CacheOperationContext opCtx = ctx.operationContextPerCall(); GridCacheOperation op; - Collection vals; + Object val0; if (val != null) { op = UPDATE; - vals = Collections.singletonList(val); + val0 = val; } else if (proc != null) { op = TRANSFORM; - vals = Collections.singletonList(proc); + val0 = proc; } else { op = DELETE; - vals = null; + val0 = null; } GridCacheDrInfo conflictPutVal = null; @@ -1081,37 +1081,75 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { if (op == UPDATE) { conflictPutVal = new GridCacheDrInfo(ctx.toCacheObject(val), ctx.versions().next(dcId)); - vals = null; + val0 = null; } else if (op == GridCacheOperation.TRANSFORM) { conflictPutVal = new GridCacheDrInfo(proc, ctx.versions().next(dcId)); - vals = null; + val0 = null; } else conflictRmvVer = ctx.versions().next(dcId); } - return new GridNearAtomicUpdateFuture( - ctx, - this, - ctx.config().getWriteSynchronizationMode(), - op, - Collections.singletonList(key), - vals, - invokeArgs, - conflictPutVal != null ? Collections.singleton(conflictPutVal) : null, - conflictRmvVer != null ? Collections.singleton(conflictRmvVer) : null, - retval, - false, - opCtx != null ? opCtx.expiry() : null, - CU.filterArray(filter), - ctx.subjectIdPerCall(null, opCtx), - ctx.kernalContext().job().currentTaskNameHash(), - opCtx != null && opCtx.skipStore(), - opCtx != null && opCtx.isKeepBinary(), - opCtx != null && opCtx.noRetries() ? 1 : MAX_RETRIES, - waitTopFut); + CacheEntryPredicate[] filters = CU.filterArray(filter); + + if (conflictPutVal == null && conflictRmvVer == null && !isFastMap(filters, op)) { + return new GridNearAtomicSingleUpdateFuture( + ctx, + this, + ctx.config().getWriteSynchronizationMode(), + op, + key, + val0, + invokeArgs, + retval, + false, + opCtx != null ? opCtx.expiry() : null, + filters, + ctx.subjectIdPerCall(null, opCtx), + ctx.kernalContext().job().currentTaskNameHash(), + opCtx != null && opCtx.skipStore(), + opCtx != null && opCtx.isKeepBinary(), + opCtx != null && opCtx.noRetries() ? 1 : MAX_RETRIES, + waitTopFut + ); + } + else { + return new GridNearAtomicUpdateFuture( + ctx, + this, + ctx.config().getWriteSynchronizationMode(), + op, + Collections.singletonList(key), + val0 != null ? Collections.singletonList(val0) : null, + invokeArgs, + conflictPutVal != null ? Collections.singleton(conflictPutVal) : null, + conflictRmvVer != null ? Collections.singleton(conflictRmvVer) : null, + retval, + false, + opCtx != null ? opCtx.expiry() : null, + filters, + ctx.subjectIdPerCall(null, opCtx), + ctx.kernalContext().job().currentTaskNameHash(), + opCtx != null && opCtx.skipStore(), + opCtx != null && opCtx.isKeepBinary(), + opCtx != null && opCtx.noRetries() ? 1 : MAX_RETRIES, + waitTopFut); + } + } + + /** + * Whether this is fast-map operation. + * + * @param filters Filters. + * @param op Operation. + * @return {@code True} if fast-map. + */ + public boolean isFastMap(CacheEntryPredicate[] filters, GridCacheOperation op) { + return F.isEmpty(filters) && op != TRANSFORM && ctx.config().getWriteSynchronizationMode() == FULL_SYNC && + ctx.config().getAtomicWriteOrderMode() == CLOCK && + !(ctx.writeThrough() && ctx.config().getInterceptor() != null); } /** @@ -2893,10 +2931,15 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { res.nodeId(ctx.localNodeId()); - GridNearAtomicUpdateFuture fut = (GridNearAtomicUpdateFuture)ctx.mvcc().atomicFuture(res.futureVersion()); + GridNearAtomicAbstractUpdateFuture fut = + (GridNearAtomicAbstractUpdateFuture)ctx.mvcc().atomicFuture(res.futureVersion()); - if (fut != null) - fut.onResult(nodeId, res, false); + if (fut != null) { + if (fut instanceof GridNearAtomicSingleUpdateFuture) + ((GridNearAtomicSingleUpdateFuture)fut).onResult(nodeId, res, false); + else + ((GridNearAtomicUpdateFuture)fut).onResult(nodeId, res, false); + } else U.warn(log, "Failed to find near update future for update response (will ignore) " + "[nodeId=" + nodeId + ", res=" + res + ']'); http://git-wip-us.apache.org/repos/asf/ignite/blob/e833eb2d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java index 9f52658..4721d6e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java @@ -212,11 +212,6 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> return null; } - /** {@inheritDoc} */ - @Override public Collection<KeyCacheObject> keys() { - return keys; - } - /** * @param entry Entry to map. * @param val Value to write. http://git-wip-us.apache.org/repos/asf/ignite/blob/e833eb2d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java new file mode 100644 index 0000000..7f52299 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht.atomic; + +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.cache.CacheWriteSynchronizationMode; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.CacheEntryPredicate; +import org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException; +import org.apache.ignite.internal.processors.cache.GridCacheAtomicFuture; +import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.GridCacheMvccManager; +import org.apache.ignite.internal.processors.cache.GridCacheOperation; +import org.apache.ignite.internal.processors.cache.GridCacheReturn; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteUuid; +import org.jetbrains.annotations.Nullable; + +import javax.cache.expiry.ExpiryPolicy; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicReference; + +import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.CLOCK; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC; + +/** + * Base for near atomic update futures. + */ +public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapter<Object> + implements GridCacheAtomicFuture<Object> { + /** Logger reference. */ + private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>(); + + /** Logger. */ + protected static IgniteLogger log; + + /** Cache context. */ + protected final GridCacheContext cctx; + + /** Cache. */ + protected final GridDhtAtomicCache cache; + + /** Write synchronization mode. */ + protected final CacheWriteSynchronizationMode syncMode; + + /** Update operation. */ + protected final GridCacheOperation op; + + /** Optional arguments for entry processor. */ + protected final Object[] invokeArgs; + + /** Return value require flag. */ + protected final boolean retval; + + /** Raw return value flag. */ + protected final boolean rawRetval; + + /** Expiry policy. */ + protected final ExpiryPolicy expiryPlc; + + /** Optional filter. */ + protected final CacheEntryPredicate[] filter; + + /** Subject ID. */ + protected final UUID subjId; + + /** Task name hash. */ + protected final int taskNameHash; + + /** Skip store flag. */ + protected final boolean skipStore; + + /** Keep binary flag. */ + protected final boolean keepBinary; + + /** Wait for topology future flag. */ + protected final boolean waitTopFut; + + /** Near cache flag. */ + protected final boolean nearEnabled; + + /** Mutex to synchronize state updates. */ + protected final Object mux = new Object(); + + /** Topology locked flag. Set if atomic update is performed inside a TX or explicit lock. */ + protected boolean topLocked; + + /** Remap count. */ + protected int remapCnt; + + /** Current topology version. */ + protected AffinityTopologyVersion topVer = AffinityTopologyVersion.ZERO; + + /** */ + protected GridCacheVersion updVer; + + /** Topology version when got mapping error. */ + protected AffinityTopologyVersion mapErrTopVer; + + /** */ + protected int resCnt; + + /** Error. */ + protected CachePartialUpdateCheckedException err; + + /** Future ID. */ + protected GridCacheVersion futVer; + + /** Completion future for a particular topology version. */ + protected GridFutureAdapter<Void> topCompleteFut; + + /** Operation result. */ + protected GridCacheReturn opRes; + + /** + * Constructor. + * + * @param cctx Cache context. + * @param cache Cache. + * @param syncMode Synchronization mode. + * @param op Operation. + * @param invokeArgs Invoke arguments. + * @param retval Return value flag. + * @param rawRetval Raw return value flag. + * @param expiryPlc Expiry policy. + * @param filter Filter. + * @param subjId Subject ID. + * @param taskNameHash Task name hash. + * @param skipStore Skip store flag. + * @param keepBinary Keep binary flag. + * @param remapCnt Remap count. + * @param waitTopFut Wait topology future flag. + */ + protected GridNearAtomicAbstractUpdateFuture( + GridCacheContext cctx, + GridDhtAtomicCache cache, + CacheWriteSynchronizationMode syncMode, + GridCacheOperation op, + @Nullable Object[] invokeArgs, + boolean retval, + boolean rawRetval, + @Nullable ExpiryPolicy expiryPlc, + CacheEntryPredicate[] filter, + UUID subjId, + int taskNameHash, + boolean skipStore, + boolean keepBinary, + int remapCnt, + boolean waitTopFut + ) { + if (log == null) + log = U.logger(cctx.kernalContext(), logRef, GridFutureAdapter.class); + + this.cctx = cctx; + this.cache = cache; + this.syncMode = syncMode; + this.op = op; + this.invokeArgs = invokeArgs; + this.retval = retval; + this.rawRetval = rawRetval; + this.expiryPlc = expiryPlc; + this.filter = filter; + this.subjId = subjId; + this.taskNameHash = taskNameHash; + this.skipStore = skipStore; + this.keepBinary = keepBinary; + this.waitTopFut = waitTopFut; + + nearEnabled = CU.isNearEnabled(cctx); + + if (!waitTopFut) + remapCnt = 1; + + this.remapCnt = remapCnt; + } + + /** + * Performs future mapping. + */ + public void map() { + AffinityTopologyVersion topVer = cctx.shared().lockedTopologyVersion(null); + + if (topVer == null) + mapOnTopology(); + else { + topLocked = true; + + // Cannot remap. + remapCnt = 1; + + map(topVer); + } + } + + /** + * @param topVer Topology version. + */ + protected abstract void map(AffinityTopologyVersion topVer); + + /** + * Maps future on ready topology. + */ + protected abstract void mapOnTopology(); + + /** {@inheritDoc} */ + @Override public IgniteUuid futureId() { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public boolean trackable() { + return true; + } + + /** {@inheritDoc} */ + @Override public void markNotTrackable() { + // No-op. + } + + /** + * @return {@code True} future is stored by {@link GridCacheMvccManager#addAtomicFuture}. + */ + protected boolean storeFuture() { + return cctx.config().getAtomicWriteOrderMode() == CLOCK || syncMode != FULL_ASYNC; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/e833eb2d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java new file mode 100644 index 0000000..abfc5c9 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java @@ -0,0 +1,645 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht.atomic; + +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.cache.CacheWriteSynchronizationMode; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; +import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.CacheEntryPredicate; +import org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException; +import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.GridCacheOperation; +import org.apache.ignite.internal.processors.cache.GridCacheReturn; +import org.apache.ignite.internal.processors.cache.GridCacheTryPutFailedException; +import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearAtomicCache; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.util.future.GridFinishedFuture; +import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.typedef.CI1; +import org.apache.ignite.internal.util.typedef.CI2; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.X; +import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.jetbrains.annotations.Nullable; + +import javax.cache.expiry.ExpiryPolicy; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Map; +import java.util.UUID; + +import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.CLOCK; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC; +import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM; + +/** + * DHT atomic cache near update future. + */ +public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpdateFuture { + /** Keys */ + private Object key; + + /** Values. */ + @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"}) + private Object val; + + /** Not null is operation is mapped to single node. */ + private GridNearAtomicUpdateRequest req; + + /** + * @param cctx Cache context. + * @param cache Cache instance. + * @param syncMode Write synchronization mode. + * @param op Update operation. + * @param key Keys to update. + * @param val Values or transform closure. + * @param invokeArgs Optional arguments for entry processor. + * @param retval Return value require flag. + * @param rawRetval {@code True} if should return {@code GridCacheReturn} as future result. + * @param expiryPlc Expiry policy explicitly specified for cache operation. + * @param filter Entry filter. + * @param subjId Subject ID. + * @param taskNameHash Task name hash code. + * @param skipStore Skip store flag. + * @param keepBinary Keep binary flag. + * @param remapCnt Maximum number of retries. + * @param waitTopFut If {@code false} does not wait for affinity change future. + */ + public GridNearAtomicSingleUpdateFuture( + GridCacheContext cctx, + GridDhtAtomicCache cache, + CacheWriteSynchronizationMode syncMode, + GridCacheOperation op, + Object key, + @Nullable Object val, + @Nullable Object[] invokeArgs, + final boolean retval, + final boolean rawRetval, + @Nullable ExpiryPolicy expiryPlc, + final CacheEntryPredicate[] filter, + UUID subjId, + int taskNameHash, + boolean skipStore, + boolean keepBinary, + int remapCnt, + boolean waitTopFut + ) { + super(cctx, cache, syncMode, op, invokeArgs, retval, rawRetval, expiryPlc, filter, subjId, taskNameHash, + skipStore, keepBinary, remapCnt, waitTopFut); + + assert subjId != null; + + this.key = key; + this.val = val; + } + + /** {@inheritDoc} */ + @Override public GridCacheVersion version() { + synchronized (mux) { + return futVer; + } + } + + /** {@inheritDoc} */ + @Override public boolean onNodeLeft(UUID nodeId) { + GridNearAtomicUpdateResponse res = null; + + synchronized (mux) { + GridNearAtomicUpdateRequest req = this.req != null && this.req.nodeId().equals(nodeId) ? + this.req : null; + + if (req != null && req.response() == null) { + res = new GridNearAtomicUpdateResponse(cctx.cacheId(), + nodeId, + req.futureVersion(), + cctx.deploymentEnabled()); + + ClusterTopologyCheckedException e = new ClusterTopologyCheckedException("Primary node left grid " + + "before response is received: " + nodeId); + + e.retryReadyFuture(cctx.shared().nextAffinityReadyFuture(req.topologyVersion())); + + res.addFailedKeys(req.keys(), e); + } + } + + if (res != null) + onResult(nodeId, res, true); + + return false; + } + + /** {@inheritDoc} */ + @Override public IgniteInternalFuture<Void> completeFuture(AffinityTopologyVersion topVer) { + return null; + } + + /** {@inheritDoc} */ + @SuppressWarnings("ConstantConditions") + @Override public boolean onDone(@Nullable Object res, @Nullable Throwable err) { + assert res == null || res instanceof GridCacheReturn; + + GridCacheReturn ret = (GridCacheReturn)res; + + Object retval = + res == null ? null : rawRetval ? ret : (this.retval || op == TRANSFORM) ? + cctx.unwrapBinaryIfNeeded(ret.value(), keepBinary) : ret.success(); + + if (op == TRANSFORM && retval == null) + retval = Collections.emptyMap(); + + if (super.onDone(retval, err)) { + GridCacheVersion futVer = onFutureDone(); + + if (futVer != null) + cctx.mvcc().removeAtomicFuture(futVer); + + return true; + } + + return false; + } + + /** + * Response callback. + * + * @param nodeId Node ID. + * @param res Update response. + * @param nodeErr {@code True} if response was created on node failure. + */ + @SuppressWarnings({"unchecked", "ThrowableResultOfMethodCallIgnored"}) + public void onResult(UUID nodeId, GridNearAtomicUpdateResponse res, boolean nodeErr) { + GridNearAtomicUpdateRequest req; + + AffinityTopologyVersion remapTopVer = null; + + GridCacheReturn opRes0 = null; + CachePartialUpdateCheckedException err0 = null; + + GridFutureAdapter<?> fut0 = null; + + synchronized (mux) { + if (!res.futureVersion().equals(futVer)) + return; + + if (!this.req.nodeId().equals(nodeId)) + return; + + req = this.req; + + this.req = null; + + boolean remapKey = !F.isEmpty(res.remapKeys()); + + if (remapKey) { + if (mapErrTopVer == null || mapErrTopVer.compareTo(req.topologyVersion()) < 0) + mapErrTopVer = req.topologyVersion(); + } + else if (res.error() != null) { + if (res.failedKeys() != null) { + if (err == null) + err = new CachePartialUpdateCheckedException( + "Failed to update keys (retry update if possible)."); + + Collection<Object> keys = new ArrayList<>(res.failedKeys().size()); + + for (KeyCacheObject key : res.failedKeys()) + keys.add(cctx.cacheObjectContext().unwrapBinaryIfNeeded(key, keepBinary, false)); + + err.add(keys, res.error(), req.topologyVersion()); + } + } + else { + if (!req.fastMap() || req.hasPrimary()) { + GridCacheReturn ret = res.returnValue(); + + if (op == TRANSFORM) { + if (ret != null) { + assert ret.value() == null || ret.value() instanceof Map : ret.value(); + + if (ret.value() != null) { + if (opRes != null) + opRes.mergeEntryProcessResults(ret); + else + opRes = ret; + } + } + } + else + opRes = ret; + } + } + + if (remapKey) { + assert mapErrTopVer != null; + + remapTopVer = cctx.shared().exchange().topologyVersion(); + } + else { + if (err != null && + X.hasCause(err, CachePartialUpdateCheckedException.class) && + X.hasCause(err, ClusterTopologyCheckedException.class) && + storeFuture() && + --remapCnt > 0) { + ClusterTopologyCheckedException topErr = + X.cause(err, ClusterTopologyCheckedException.class); + + if (!(topErr instanceof ClusterTopologyServerNotFoundException)) { + CachePartialUpdateCheckedException cause = + X.cause(err, CachePartialUpdateCheckedException.class); + + assert cause != null && cause.topologyVersion() != null : err; + + remapTopVer = + new AffinityTopologyVersion(cause.topologyVersion().topologyVersion() + 1); + + err = null; + updVer = null; + } + } + } + + if (remapTopVer == null) { + err0 = err; + opRes0 = opRes; + } + else { + fut0 = topCompleteFut; + + topCompleteFut = null; + + cctx.mvcc().removeAtomicFuture(futVer); + + futVer = null; + topVer = AffinityTopologyVersion.ZERO; + } + } + + if (res.error() != null && res.failedKeys() == null) { + onDone(res.error()); + + return; + } + + if (nearEnabled && !nodeErr) + updateNear(req, res); + + if (remapTopVer != null) { + if (fut0 != null) + fut0.onDone(); + + if (!waitTopFut) { + onDone(new GridCacheTryPutFailedException()); + + return; + } + + if (topLocked) { + CachePartialUpdateCheckedException e = + new CachePartialUpdateCheckedException("Failed to update keys (retry update if possible)."); + + ClusterTopologyCheckedException cause = new ClusterTopologyCheckedException( + "Failed to update keys, topology changed while execute atomic update inside transaction."); + + cause.retryReadyFuture(cctx.affinity().affinityReadyFuture(remapTopVer)); + + e.add(Collections.singleton(cctx.toCacheKeyObject(key)), cause); + + onDone(e); + + return; + } + + IgniteInternalFuture<AffinityTopologyVersion> fut = + cctx.shared().exchange().affinityReadyFuture(remapTopVer); + + if (fut == null) + fut = new GridFinishedFuture<>(remapTopVer); + + fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { + @Override public void apply(final IgniteInternalFuture<AffinityTopologyVersion> fut) { + cctx.kernalContext().closure().runLocalSafe(new Runnable() { + @Override public void run() { + try { + AffinityTopologyVersion topVer = fut.get(); + + map(topVer); + } + catch (IgniteCheckedException e) { + onDone(e); + } + } + }); + } + }); + + return; + } + + onDone(opRes0, err0); + } + + /** + * Updates near cache. + * + * @param req Update request. + * @param res Update response. + */ + private void updateNear(GridNearAtomicUpdateRequest req, GridNearAtomicUpdateResponse res) { + assert nearEnabled; + + if (res.remapKeys() != null || !req.hasPrimary()) + return; + + GridNearAtomicCache near = (GridNearAtomicCache)cctx.dht().near(); + + near.processNearAtomicUpdateResponse(req, res); + } + + /** {@inheritDoc} */ + @Override protected void mapOnTopology() { + cache.topology().readLock(); + + AffinityTopologyVersion topVer = null; + + try { + if (cache.topology().stopping()) { + onDone(new IgniteCheckedException("Failed to perform cache operation (cache is stopped): " + + cache.name())); + + return; + } + + GridDhtTopologyFuture fut = cache.topology().topologyVersionFuture(); + + if (fut.isDone()) { + Throwable err = fut.validateCache(cctx); + + if (err != null) { + onDone(err); + + return; + } + + topVer = fut.topologyVersion(); + } + else { + if (waitTopFut) { + assert !topLocked : this; + + fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { + @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) { + cctx.kernalContext().closure().runLocalSafe(new Runnable() { + @Override public void run() { + mapOnTopology(); + } + }); + } + }); + } + else + onDone(new GridCacheTryPutFailedException()); + + return; + } + } + finally { + cache.topology().readUnlock(); + } + + map(topVer); + } + + /** + * Maps future to single node. + * + * @param nodeId Node ID. + * @param req Request. + */ + private void mapSingle(UUID nodeId, GridNearAtomicUpdateRequest req) { + if (cctx.localNodeId().equals(nodeId)) { + cache.updateAllAsyncInternal(nodeId, req, + new CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse>() { + @Override public void apply(GridNearAtomicUpdateRequest req, GridNearAtomicUpdateResponse res) { + onResult(res.nodeId(), res, false); + } + }); + } + else { + try { + if (log.isDebugEnabled()) + log.debug("Sending near atomic update request [nodeId=" + req.nodeId() + ", req=" + req + ']'); + + cctx.io().send(req.nodeId(), req, cctx.ioPolicy()); + + if (syncMode == FULL_ASYNC) + onDone(new GridCacheReturn(cctx, true, true, null, true)); + } + catch (IgniteCheckedException e) { + onSendError(req, e); + } + } + } + + /** + * @param req Request. + * @param e Error. + */ + void onSendError(GridNearAtomicUpdateRequest req, IgniteCheckedException e) { + synchronized (mux) { + GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(cctx.cacheId(), + req.nodeId(), + req.futureVersion(), + cctx.deploymentEnabled()); + + res.addFailedKeys(req.keys(), e); + + onResult(req.nodeId(), res, true); + } + } + + /** {@inheritDoc} */ + protected void map(AffinityTopologyVersion topVer) { + Collection<ClusterNode> topNodes = CU.affinityNodes(cctx, topVer); + + if (F.isEmpty(topNodes)) { + onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all partition nodes " + + "left the grid).")); + + return; + } + + Exception err = null; + GridNearAtomicUpdateRequest singleReq0 = null; + + GridCacheVersion futVer = cctx.versions().next(topVer); + + GridCacheVersion updVer; + + // Assign version on near node in CLOCK ordering mode even if fastMap is false. + if (cctx.config().getAtomicWriteOrderMode() == CLOCK) { + updVer = this.updVer; + + if (updVer == null) { + updVer = cctx.versions().next(topVer); + + if (log.isDebugEnabled()) + log.debug("Assigned fast-map version for update on near node: " + updVer); + } + } + else + updVer = null; + + try { + singleReq0 = mapSingleUpdate(topVer, futVer, updVer); + + synchronized (mux) { + assert this.futVer == null : this; + assert this.topVer == AffinityTopologyVersion.ZERO : this; + + this.topVer = topVer; + this.updVer = updVer; + this.futVer = futVer; + + resCnt = 0; + + req = singleReq0; + } + } + catch (Exception e) { + err = e; + } + + if (err != null) { + onDone(err); + + return; + } + + if (storeFuture()) { + if (!cctx.mvcc().addAtomicFuture(futVer, this)) { + assert isDone() : this; + + return; + } + } + + // Optimize mapping for single key. + mapSingle(singleReq0.nodeId(), singleReq0); + } + + /** + * @return Future version. + */ + GridCacheVersion onFutureDone() { + GridCacheVersion ver0; + + GridFutureAdapter<Void> fut0; + + synchronized (mux) { + fut0 = topCompleteFut; + + topCompleteFut = null; + + ver0 = futVer; + + futVer = null; + } + + if (fut0 != null) + fut0.onDone(); + + return ver0; + } + + /** + * @param topVer Topology version. + * @param futVer Future version. + * @param updVer Update version. + * @return Request. + * @throws Exception If failed. + */ + private GridNearAtomicUpdateRequest mapSingleUpdate(AffinityTopologyVersion topVer, + GridCacheVersion futVer, + @Nullable GridCacheVersion updVer) throws Exception { + if (key == null) + throw new NullPointerException("Null key."); + + Object val = this.val; + + if (val == null && op != GridCacheOperation.DELETE) + throw new NullPointerException("Null value."); + + KeyCacheObject cacheKey = cctx.toCacheKeyObject(key); + + if (op != TRANSFORM) + val = cctx.toCacheObject(val); + + ClusterNode primary = cctx.affinity().primary(cacheKey, topVer); + + if (primary == null) + throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all partition nodes " + + "left the grid)."); + + GridNearAtomicUpdateRequest req = new GridNearAtomicUpdateRequest( + cctx.cacheId(), + primary.id(), + futVer, + false, + updVer, + topVer, + topLocked, + syncMode, + op, + retval, + expiryPlc, + invokeArgs, + filter, + subjId, + taskNameHash, + skipStore, + keepBinary, + cctx.kernalContext().clientNode(), + cctx.deploymentEnabled(), + 1); + + req.addUpdateEntry(cacheKey, + val, + CU.TTL_NOT_CHANGED, + CU.EXPIRE_TIME_CALCULATE, + null, + true); + + return req; + } + + /** {@inheritDoc} */ + public String toString() { + synchronized (mux) { + return S.toString(GridNearAtomicSingleUpdateFuture.class, this, super.toString()); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/e833eb2d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index 9955df7..edebd8c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@ -23,10 +23,8 @@ import java.util.Collections; import java.util.Iterator; import java.util.Map; import java.util.UUID; -import java.util.concurrent.atomic.AtomicReference; import javax.cache.expiry.ExpiryPolicy; import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.IgniteLogger; import org.apache.ignite.cache.CacheWriteSynchronizationMode; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.IgniteInternalFuture; @@ -36,9 +34,7 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheEntryPredicate; import org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException; import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager; -import org.apache.ignite.internal.processors.cache.GridCacheAtomicFuture; import org.apache.ignite.internal.processors.cache.GridCacheContext; -import org.apache.ignite.internal.processors.cache.GridCacheMvccManager; import org.apache.ignite.internal.processors.cache.GridCacheOperation; import org.apache.ignite.internal.processors.cache.GridCacheReturn; import org.apache.ignite.internal.processors.cache.GridCacheTryPutFailedException; @@ -57,34 +53,19 @@ import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.lang.IgniteUuid; import org.jetbrains.annotations.Nullable; import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.CLOCK; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC; -import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC; import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM; /** * DHT atomic cache near update future. */ -public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> - implements GridCacheAtomicFuture<Object>{ - /** Logger reference. */ - private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>(); - - /** Logger. */ - protected static IgniteLogger log; - - /** Cache context. */ - private final GridCacheContext cctx; - - /** Cache. */ - private GridDhtAtomicCache cache; - - /** Update operation. */ - private final GridCacheOperation op; +public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFuture { + /** Fast map flag. */ + private final boolean fastMap; /** Keys */ private Collection<?> keys; @@ -93,9 +74,6 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"}) private Collection<?> vals; - /** Optional arguments for entry processor. */ - private Object[] invokeArgs; - /** Conflict put values. */ @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"}) private Collection<GridCacheDrInfo> conflictPutVals; @@ -104,85 +82,16 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"}) private Collection<GridCacheVersion> conflictRmvVals; - /** Return value require flag. */ - private final boolean retval; - - /** Expiry policy. */ - private final ExpiryPolicy expiryPlc; - - /** Optional filter. */ - private final CacheEntryPredicate[] filter; - - /** Write synchronization mode. */ - private final CacheWriteSynchronizationMode syncMode; - - /** Raw return value flag. */ - private final boolean rawRetval; - - /** Fast map flag. */ - private final boolean fastMap; - - /** Near cache flag. */ - private final boolean nearEnabled; - - /** Subject ID. */ - private final UUID subjId; - - /** Task name hash. */ - private final int taskNameHash; - - /** Topology locked flag. Set if atomic update is performed inside a TX or explicit lock. */ - private boolean topLocked; - - /** Skip store flag. */ - private final boolean skipStore; - - /** */ - private final boolean keepBinary; - - /** Wait for topology future flag. */ - private final boolean waitTopFut; - - /** Remap count. */ - private int remapCnt; - - /** Mutex to synchronize state updates. */ - private final Object mux = new Object(); - - /** Current topology version. */ - private AffinityTopologyVersion topVer = AffinityTopologyVersion.ZERO; - - /** */ - private GridCacheVersion updVer; - - /** Topology version when got mapping error. */ - private AffinityTopologyVersion mapErrTopVer; - /** Mappings if operations is mapped to more than one node. */ @GridToStringInclude private Map<UUID, GridNearAtomicUpdateRequest> mappings; - /** */ - private int resCnt; - - /** Error. */ - private CachePartialUpdateCheckedException err; - - /** Future ID. */ - private GridCacheVersion futVer; - - /** Completion future for a particular topology version. */ - private GridFutureAdapter<Void> topCompleteFut; - /** Keys to remap. */ private Collection<KeyCacheObject> remapKeys; /** Not null is operation is mapped to single node. */ private GridNearAtomicUpdateRequest singleReq; - /** Operation result. */ - private GridCacheReturn opRes; - /** * @param cctx Cache context. * @param cache Cache instance. @@ -225,49 +134,20 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> int remapCnt, boolean waitTopFut ) { - this.rawRetval = rawRetval; + super(cctx, cache, syncMode, op, invokeArgs, retval, rawRetval, expiryPlc, filter, subjId, taskNameHash, + skipStore, keepBinary, remapCnt, waitTopFut); assert vals == null || vals.size() == keys.size(); assert conflictPutVals == null || conflictPutVals.size() == keys.size(); assert conflictRmvVals == null || conflictRmvVals.size() == keys.size(); assert subjId != null; - this.cctx = cctx; - this.cache = cache; - this.syncMode = syncMode; - this.op = op; this.keys = keys; this.vals = vals; - this.invokeArgs = invokeArgs; this.conflictPutVals = conflictPutVals; this.conflictRmvVals = conflictRmvVals; - this.retval = retval; - this.expiryPlc = expiryPlc; - this.filter = filter; - this.subjId = subjId; - this.taskNameHash = taskNameHash; - this.skipStore = skipStore; - this.keepBinary = keepBinary; - this.waitTopFut = waitTopFut; - - if (log == null) - log = U.logger(cctx.kernalContext(), logRef, GridFutureAdapter.class); - fastMap = F.isEmpty(filter) && op != TRANSFORM && cctx.config().getWriteSynchronizationMode() == FULL_SYNC && - cctx.config().getAtomicWriteOrderMode() == CLOCK && - !(cctx.writeThrough() && cctx.config().getInterceptor() != null); - - nearEnabled = CU.isNearEnabled(cctx); - - if (!waitTopFut) - remapCnt = 1; - - this.remapCnt = remapCnt; - } - - /** {@inheritDoc} */ - @Override public IgniteUuid futureId() { - throw new UnsupportedOperationException(); + fastMap = cache.isFastMap(filter, op); } /** {@inheritDoc} */ @@ -277,19 +157,6 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> } } - /** - * @return {@code True} if this future should block partition map exchange. - */ - private boolean waitForPartitionExchange() { - // Wait fast-map near atomic update futures in CLOCK mode. - return fastMap; - } - - /** {@inheritDoc} */ - @Override public Collection<?> keys() { - return keys; - } - /** {@inheritDoc} */ @Override public boolean onNodeLeft(UUID nodeId) { GridNearAtomicUpdateResponse res = null; @@ -324,37 +191,21 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> } /** {@inheritDoc} */ - @Override public boolean trackable() { - return true; - } - - /** {@inheritDoc} */ - @Override public void markNotTrackable() { - // No-op. - } - - /** - * Performs future mapping. - */ - public void map() { - AffinityTopologyVersion topVer = cctx.shared().lockedTopologyVersion(null); - - if (topVer == null) - mapOnTopology(); - else { - topLocked = true; - - // Cannot remap. - remapCnt = 1; + @Override public IgniteInternalFuture<Void> completeFuture(AffinityTopologyVersion topVer) { + // Wait fast-map near atomic update futures in CLOCK mode. + if (fastMap) { + GridFutureAdapter<Void> fut; - map(topVer, null); - } - } + synchronized (mux) { + if (this.topVer != AffinityTopologyVersion.ZERO && this.topVer.compareTo(topVer) < 0) { + if (topCompleteFut == null) + topCompleteFut = new GridFutureAdapter<>(); - /** {@inheritDoc} */ - @Override public IgniteInternalFuture<Void> completeFuture(AffinityTopologyVersion topVer) { - if (waitForPartitionExchange()) { - GridFutureAdapter<Void> fut = completeFuture0(topVer); + fut = topCompleteFut; + } + else + fut = null; + } if (fut != null && isDone()) { fut.onDone(); @@ -454,16 +305,34 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> mapErrTopVer = req.topologyVersion(); } else if (res.error() != null) { - if (res.failedKeys() != null) - addFailedKeys(res.failedKeys(), req.topologyVersion(), res.error()); + if (res.failedKeys() != null) { + if (err == null) + err = new CachePartialUpdateCheckedException( + "Failed to update keys (retry update if possible)."); + + Collection<Object> keys = new ArrayList<>(res.failedKeys().size()); + + for (KeyCacheObject key : res.failedKeys()) + keys.add(cctx.cacheObjectContext().unwrapBinaryIfNeeded(key, keepBinary, false)); + + err.add(keys, res.error(), req.topologyVersion()); + } } else { if (!req.fastMap() || req.hasPrimary()) { GridCacheReturn ret = res.returnValue(); if (op == TRANSFORM) { - if (ret != null) - addInvokeResults(ret); + if (ret != null) { + assert ret.value() == null || ret.value() instanceof Map : ret.value(); + + if (ret.value() != null) { + if (opRes != null) + opRes.mergeEntryProcessResults(ret); + else + opRes = ret; + } + } } else opRes = ret; @@ -620,10 +489,8 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> near.processNearAtomicUpdateResponse(req, res); } - /** - * Maps future on ready topology. - */ - private void mapOnTopology() { + /** {@inheritDoc} */ + @Override protected void mapOnTopology() { cache.topology().readLock(); AffinityTopologyVersion topVer = null; @@ -677,35 +544,6 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> } /** - * @return {@code True} future is stored by {@link GridCacheMvccManager#addAtomicFuture}. - */ - private boolean storeFuture() { - return cctx.config().getAtomicWriteOrderMode() == CLOCK || syncMode != FULL_ASYNC; - } - - /** - * Maps key to nodes. If filters are absent and operation is not TRANSFORM, then we can assign version on near - * node and send updates in parallel to all participating nodes. - * - * @param key Key to map. - * @param topVer Topology version to map. - * @param fastMap Flag indicating whether mapping is performed for fast-circuit update. - * @return Collection of nodes to which key is mapped. - */ - private Collection<ClusterNode> mapKey( - KeyCacheObject key, - AffinityTopologyVersion topVer, - boolean fastMap - ) { - GridCacheAffinityManager affMgr = cctx.affinity(); - - // If we can send updates in parallel - do it. - return fastMap ? - cctx.topology().nodes(affMgr.partition(key), topVer) : - Collections.singletonList(affMgr.primary(key, topVer)); - } - - /** * Maps future to single node. * * @param nodeId Node ID. @@ -797,6 +635,11 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> } } + /** {@inheritDoc} */ + protected void map(AffinityTopologyVersion topVer) { + map(topVer, null); + } + /** * @param topVer Topology version. * @param remapKeys Keys to remap. @@ -914,26 +757,6 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> } /** - * @param topVer Topology version. - * @return Future. - */ - @Nullable GridFutureAdapter<Void> completeFuture0(AffinityTopologyVersion topVer) { - synchronized (mux) { - if (this.topVer == AffinityTopologyVersion.ZERO) - return null; - - if (this.topVer.compareTo(topVer) < 0) { - if (topCompleteFut == null) - topCompleteFut = new GridFutureAdapter<>(); - - return topCompleteFut; - } - - return null; - } - } - - /** * @return Future version. */ GridCacheVersion onFutureDone() { @@ -1039,7 +862,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> if (op != TRANSFORM) val = cctx.toCacheObject(val); - Collection<ClusterNode> affNodes = mapKey(cacheKey, topVer, fastMap); + Collection<ClusterNode> affNodes = mapKey(cacheKey, topVer); if (affNodes.isEmpty()) throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache " + @@ -1190,41 +1013,19 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> } /** - * @param ret Result from single node. - */ - @SuppressWarnings("unchecked") - private void addInvokeResults(GridCacheReturn ret) { - assert op == TRANSFORM : op; - assert ret.value() == null || ret.value() instanceof Map : ret.value(); - - if (ret.value() != null) { - if (opRes != null) - opRes.mergeEntryProcessResults(ret); - else - opRes = ret; - } - } - - /** - * @param failedKeys Failed keys. - * @param topVer Topology version for failed update. - * @param err Error cause. + * Maps key to nodes. If filters are absent and operation is not TRANSFORM, then we can assign version on near + * node and send updates in parallel to all participating nodes. + * + * @param key Key to map. + * @param topVer Topology version to map. + * @return Collection of nodes to which key is mapped. */ - private void addFailedKeys(Collection<KeyCacheObject> failedKeys, - AffinityTopologyVersion topVer, - Throwable err) { - CachePartialUpdateCheckedException err0 = this.err; - - if (err0 == null) - err0 = this.err = - new CachePartialUpdateCheckedException("Failed to update keys (retry update if possible)."); - - Collection<Object> keys = new ArrayList<>(failedKeys.size()); - - for (KeyCacheObject key : failedKeys) - keys.add(cctx.cacheObjectContext().unwrapBinaryIfNeeded(key, keepBinary, false)); + private Collection<ClusterNode> mapKey(KeyCacheObject key, AffinityTopologyVersion topVer) { + GridCacheAffinityManager affMgr = cctx.affinity(); - err0.add(keys, err, topVer); + // If we can send updates in parallel - do it. + return fastMap ? cctx.topology().nodes(affMgr.partition(key), topVer) : + Collections.singletonList(affMgr.primary(key, topVer)); } /** {@inheritDoc} */
