Finalization.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4b78262c Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4b78262c Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4b78262c Branch: refs/heads/ignite-2926 Commit: 4b78262c678c5f12bb6952576b7cdf7c87c7c346 Parents: c8b1bb1 Author: vozerov-gridgain <[email protected]> Authored: Fri Apr 15 14:34:19 2016 +0300 Committer: vozerov-gridgain <[email protected]> Committed: Fri Apr 15 14:34:19 2016 +0300 ---------------------------------------------------------------------- .../GridAbstractNearAtomicUpdateFuture.java | 220 ----------- .../dht/atomic/GridDhtAtomicCache.java | 107 ++++-- .../GridNearAtomicAbstractUpdateFuture.java | 217 +++++++++++ .../GridNearAtomicSingleUpdateFuture.java | 363 +++---------------- .../dht/atomic/GridNearAtomicUpdateFuture.java | 53 +-- 5 files changed, 375 insertions(+), 585 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/4b78262c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridAbstractNearAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridAbstractNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridAbstractNearAtomicUpdateFuture.java deleted file mode 100644 index 3088d05..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridAbstractNearAtomicUpdateFuture.java +++ /dev/null @@ -1,220 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache.distributed.dht.atomic; - -import org.apache.ignite.IgniteLogger; -import org.apache.ignite.cache.CacheWriteSynchronizationMode; -import org.apache.ignite.cluster.ClusterNode; -import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; -import org.apache.ignite.internal.processors.cache.CacheEntryPredicate; -import org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException; -import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager; -import org.apache.ignite.internal.processors.cache.GridCacheAtomicFuture; -import org.apache.ignite.internal.processors.cache.GridCacheContext; -import org.apache.ignite.internal.processors.cache.GridCacheMvccManager; -import org.apache.ignite.internal.processors.cache.GridCacheOperation; -import org.apache.ignite.internal.processors.cache.GridCacheReturn; -import org.apache.ignite.internal.processors.cache.KeyCacheObject; -import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; -import org.apache.ignite.internal.util.future.GridFutureAdapter; -import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.internal.CU; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.lang.IgniteUuid; -import org.jetbrains.annotations.Nullable; - -import javax.cache.expiry.ExpiryPolicy; -import java.util.Collection; -import java.util.Collections; -import java.util.UUID; -import java.util.concurrent.atomic.AtomicReference; - -import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.CLOCK; -import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC; -import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; -import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM; - -/** - * Base for near atomic update futures. - */ -public abstract class GridAbstractNearAtomicUpdateFuture extends GridFutureAdapter<Object> - implements GridCacheAtomicFuture<Object> { - /** Logger reference. */ - private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>(); - - /** Logger. */ - protected static IgniteLogger log; - - /** Cache context. */ - protected final GridCacheContext cctx; - - /** Cache. */ - protected final GridDhtAtomicCache cache; - - /** Write synchronization mode. */ - protected final CacheWriteSynchronizationMode syncMode; - - /** Update operation. */ - protected final GridCacheOperation op; - - /** Return value require flag. */ - protected final boolean retval; - - /** Raw return value flag. */ - protected final boolean rawRetval; - - /** Expiry policy. */ - protected final ExpiryPolicy expiryPlc; - - /** Optional filter. */ - protected final CacheEntryPredicate[] filter; - - /** Subject ID. */ - protected final UUID subjId; - - /** Task name hash. */ - protected final int taskNameHash; - - /** Skip store flag. */ - protected final boolean skipStore; - - /** Keep binary flag. */ - protected final boolean keepBinary; - - /** Wait for topology future flag. */ - protected final boolean waitTopFut; - - /** Fast map flag. */ - protected final boolean fastMap; - - /** Near cache flag. */ - protected final boolean nearEnabled; - - /** Mutex to synchronize state updates. */ - protected final Object mux = new Object(); - - /** Topology locked flag. Set if atomic update is performed inside a TX or explicit lock. */ - protected boolean topLocked; - - /** Remap count. */ - protected int remapCnt; - - /** Current topology version. */ - protected AffinityTopologyVersion topVer = AffinityTopologyVersion.ZERO; - - /** */ - protected GridCacheVersion updVer; - - /** Topology version when got mapping error. */ - protected AffinityTopologyVersion mapErrTopVer; - - /** */ - protected int resCnt; - - /** Error. */ - protected CachePartialUpdateCheckedException err; - - /** Future ID. */ - protected GridCacheVersion futVer; - - /** Completion future for a particular topology version. */ - protected GridFutureAdapter<Void> topCompleteFut; - - /** Operation result. */ - protected GridCacheReturn opRes; - - /** - * Constructor. - */ - protected GridAbstractNearAtomicUpdateFuture( - GridCacheContext cctx, - GridDhtAtomicCache cache, - CacheWriteSynchronizationMode syncMode, - GridCacheOperation op, - boolean retval, - boolean rawRetval, - @Nullable ExpiryPolicy expiryPlc, - CacheEntryPredicate[] filter, - UUID subjId, - int taskNameHash, - boolean skipStore, - boolean keepBinary, - boolean waitTopFut - ) { - if (log == null) - log = U.logger(cctx.kernalContext(), logRef, GridFutureAdapter.class); - - this.cctx = cctx; - this.cache = cache; - this.syncMode = syncMode; - this.op = op; - this.retval = retval; - this.rawRetval = rawRetval; - this.expiryPlc = expiryPlc; - this.filter = filter; - this.subjId = subjId; - this.taskNameHash = taskNameHash; - this.skipStore = skipStore; - this.keepBinary = keepBinary; - this.waitTopFut = waitTopFut; - - fastMap = F.isEmpty(filter) && op != TRANSFORM && cctx.config().getWriteSynchronizationMode() == FULL_SYNC && - cctx.config().getAtomicWriteOrderMode() == CLOCK && - !(cctx.writeThrough() && cctx.config().getInterceptor() != null); - - nearEnabled = CU.isNearEnabled(cctx); - } - - /** {@inheritDoc} */ - @Override public IgniteUuid futureId() { - throw new UnsupportedOperationException(); - } - - /** {@inheritDoc} */ - @Override public boolean trackable() { - return true; - } - - /** {@inheritDoc} */ - @Override public void markNotTrackable() { - // No-op. - } - - /** - * @return {@code True} future is stored by {@link GridCacheMvccManager#addAtomicFuture}. - */ - protected boolean storeFuture() { - return cctx.config().getAtomicWriteOrderMode() == CLOCK || syncMode != FULL_ASYNC; - } - - /** - * Maps key to nodes. If filters are absent and operation is not TRANSFORM, then we can assign version on near - * node and send updates in parallel to all participating nodes. - * - * @param key Key to map. - * @param topVer Topology version to map. - * @return Collection of nodes to which key is mapped. - */ - protected Collection<ClusterNode> mapKey(KeyCacheObject key, AffinityTopologyVersion topVer) { - GridCacheAffinityManager affMgr = cctx.affinity(); - - // If we can send updates in parallel - do it. - return fastMap ? cctx.topology().nodes(affMgr.partition(key), topVer) : - Collections.singletonList(affMgr.primary(key, topVer)); - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/4b78262c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 4f8b32c..013184b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -984,7 +984,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { ctx.checkSecurity(SecurityPermission.CACHE_PUT); - final GridNearAtomicUpdateFuture updateFut = + final GridNearAtomicAbstractUpdateFuture updateFut = createSingleUpdateFuture(key, val, proc, invokeArgs, retval, filter, waitTopFut); return asyncOp(new CO<IgniteInternalFuture<Object>>() { @@ -1016,7 +1016,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { ctx.checkSecurity(SecurityPermission.CACHE_REMOVE); - final GridNearAtomicUpdateFuture updateFut = + final GridNearAtomicAbstractUpdateFuture updateFut = createSingleUpdateFuture(key, null, null, null, retval, filter, true); if (statsEnabled) @@ -1043,7 +1043,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { * @param waitTopFut Whether to wait for topology future. * @return Future. */ - private GridNearAtomicUpdateFuture createSingleUpdateFuture( + private GridNearAtomicAbstractUpdateFuture createSingleUpdateFuture( K key, @Nullable V val, @Nullable EntryProcessor proc, @@ -1055,19 +1055,19 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { CacheOperationContext opCtx = ctx.operationContextPerCall(); GridCacheOperation op; - Collection vals; + Object val0; if (val != null) { op = UPDATE; - vals = Collections.singletonList(val); + val0 = val; } else if (proc != null) { op = TRANSFORM; - vals = Collections.singletonList(proc); + val0 = proc; } else { op = DELETE; - vals = null; + val0 = null; } GridCacheDrInfo conflictPutVal = null; @@ -1081,37 +1081,75 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { if (op == UPDATE) { conflictPutVal = new GridCacheDrInfo(ctx.toCacheObject(val), ctx.versions().next(dcId)); - vals = null; + val0 = null; } else if (op == GridCacheOperation.TRANSFORM) { conflictPutVal = new GridCacheDrInfo(proc, ctx.versions().next(dcId)); - vals = null; + val0 = null; } else conflictRmvVer = ctx.versions().next(dcId); } - return new GridNearAtomicUpdateFuture( - ctx, - this, - ctx.config().getWriteSynchronizationMode(), - op, - Collections.singletonList(key), - vals, - invokeArgs, - conflictPutVal != null ? Collections.singleton(conflictPutVal) : null, - conflictRmvVer != null ? Collections.singleton(conflictRmvVer) : null, - retval, - false, - opCtx != null ? opCtx.expiry() : null, - CU.filterArray(filter), - ctx.subjectIdPerCall(null, opCtx), - ctx.kernalContext().job().currentTaskNameHash(), - opCtx != null && opCtx.skipStore(), - opCtx != null && opCtx.isKeepBinary(), - opCtx != null && opCtx.noRetries() ? 1 : MAX_RETRIES, - waitTopFut); + CacheEntryPredicate[] filters = CU.filterArray(filter); + + if (conflictPutVal == null && conflictRmvVer == null && !isFastMap(filters, op)) { + return new GridNearAtomicSingleUpdateFuture( + ctx, + this, + ctx.config().getWriteSynchronizationMode(), + op, + key, + val0, + invokeArgs, + retval, + false, + opCtx != null ? opCtx.expiry() : null, + filters, + ctx.subjectIdPerCall(null, opCtx), + ctx.kernalContext().job().currentTaskNameHash(), + opCtx != null && opCtx.skipStore(), + opCtx != null && opCtx.isKeepBinary(), + opCtx != null && opCtx.noRetries() ? 1 : MAX_RETRIES, + waitTopFut + ); + } + else { + return new GridNearAtomicUpdateFuture( + ctx, + this, + ctx.config().getWriteSynchronizationMode(), + op, + Collections.singletonList(key), + val0 != null ? Collections.singletonList(val0) : null, + invokeArgs, + conflictPutVal != null ? Collections.singleton(conflictPutVal) : null, + conflictRmvVer != null ? Collections.singleton(conflictRmvVer) : null, + retval, + false, + opCtx != null ? opCtx.expiry() : null, + filters, + ctx.subjectIdPerCall(null, opCtx), + ctx.kernalContext().job().currentTaskNameHash(), + opCtx != null && opCtx.skipStore(), + opCtx != null && opCtx.isKeepBinary(), + opCtx != null && opCtx.noRetries() ? 1 : MAX_RETRIES, + waitTopFut); + } + } + + /** + * Whether this is fast-map operation. + * + * @param filters Filters. + * @param op Operation. + * @return {@code True} if fast-map. + */ + public boolean isFastMap(CacheEntryPredicate[] filters, GridCacheOperation op) { + return F.isEmpty(filters) && op != TRANSFORM && ctx.config().getWriteSynchronizationMode() == FULL_SYNC && + ctx.config().getAtomicWriteOrderMode() == CLOCK && + !(ctx.writeThrough() && ctx.config().getInterceptor() != null); } /** @@ -2893,10 +2931,15 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { res.nodeId(ctx.localNodeId()); - GridNearAtomicUpdateFuture fut = (GridNearAtomicUpdateFuture)ctx.mvcc().atomicFuture(res.futureVersion()); + GridNearAtomicAbstractUpdateFuture fut = + (GridNearAtomicAbstractUpdateFuture)ctx.mvcc().atomicFuture(res.futureVersion()); - if (fut != null) - fut.onResult(nodeId, res, false); + if (fut != null) { + if (fut instanceof GridNearAtomicSingleUpdateFuture) + ((GridNearAtomicSingleUpdateFuture)fut).onResult(nodeId, res, false); + else + ((GridNearAtomicUpdateFuture)fut).onResult(nodeId, res, false); + } else U.warn(log, "Failed to find near update future for update response (will ignore) " + "[nodeId=" + nodeId + ", res=" + res + ']'); http://git-wip-us.apache.org/repos/asf/ignite/blob/4b78262c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java new file mode 100644 index 0000000..0c40969 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht.atomic; + +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.cache.CacheWriteSynchronizationMode; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.CacheEntryPredicate; +import org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException; +import org.apache.ignite.internal.processors.cache.GridCacheAtomicFuture; +import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.GridCacheMvccManager; +import org.apache.ignite.internal.processors.cache.GridCacheOperation; +import org.apache.ignite.internal.processors.cache.GridCacheReturn; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteUuid; +import org.jetbrains.annotations.Nullable; + +import javax.cache.expiry.ExpiryPolicy; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicReference; + +import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.CLOCK; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC; + +/** + * Base for near atomic update futures. + */ +public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapter<Object> + implements GridCacheAtomicFuture<Object> { + /** Logger reference. */ + private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>(); + + /** Logger. */ + protected static IgniteLogger log; + + /** Cache context. */ + protected final GridCacheContext cctx; + + /** Cache. */ + protected final GridDhtAtomicCache cache; + + /** Write synchronization mode. */ + protected final CacheWriteSynchronizationMode syncMode; + + /** Update operation. */ + protected final GridCacheOperation op; + + /** Return value require flag. */ + protected final boolean retval; + + /** Raw return value flag. */ + protected final boolean rawRetval; + + /** Expiry policy. */ + protected final ExpiryPolicy expiryPlc; + + /** Optional filter. */ + protected final CacheEntryPredicate[] filter; + + /** Subject ID. */ + protected final UUID subjId; + + /** Task name hash. */ + protected final int taskNameHash; + + /** Skip store flag. */ + protected final boolean skipStore; + + /** Keep binary flag. */ + protected final boolean keepBinary; + + /** Wait for topology future flag. */ + protected final boolean waitTopFut; + + /** Near cache flag. */ + protected final boolean nearEnabled; + + /** Mutex to synchronize state updates. */ + protected final Object mux = new Object(); + + /** Topology locked flag. Set if atomic update is performed inside a TX or explicit lock. */ + protected boolean topLocked; + + /** Remap count. */ + protected int remapCnt; + + /** Current topology version. */ + protected AffinityTopologyVersion topVer = AffinityTopologyVersion.ZERO; + + /** */ + protected GridCacheVersion updVer; + + /** Topology version when got mapping error. */ + protected AffinityTopologyVersion mapErrTopVer; + + /** */ + protected int resCnt; + + /** Error. */ + protected CachePartialUpdateCheckedException err; + + /** Future ID. */ + protected GridCacheVersion futVer; + + /** Completion future for a particular topology version. */ + protected GridFutureAdapter<Void> topCompleteFut; + + /** Operation result. */ + protected GridCacheReturn opRes; + + /** + * Constructor. + */ + protected GridNearAtomicAbstractUpdateFuture( + GridCacheContext cctx, + GridDhtAtomicCache cache, + CacheWriteSynchronizationMode syncMode, + GridCacheOperation op, + boolean retval, + boolean rawRetval, + @Nullable ExpiryPolicy expiryPlc, + CacheEntryPredicate[] filter, + UUID subjId, + int taskNameHash, + boolean skipStore, + boolean keepBinary, + boolean waitTopFut + ) { + if (log == null) + log = U.logger(cctx.kernalContext(), logRef, GridFutureAdapter.class); + + this.cctx = cctx; + this.cache = cache; + this.syncMode = syncMode; + this.op = op; + this.retval = retval; + this.rawRetval = rawRetval; + this.expiryPlc = expiryPlc; + this.filter = filter; + this.subjId = subjId; + this.taskNameHash = taskNameHash; + this.skipStore = skipStore; + this.keepBinary = keepBinary; + this.waitTopFut = waitTopFut; + + nearEnabled = CU.isNearEnabled(cctx); + } + + /** + * Performs future mapping. + */ + public void map() { + AffinityTopologyVersion topVer = cctx.shared().lockedTopologyVersion(null); + + if (topVer == null) + mapOnTopology(); + else { + topLocked = true; + + // Cannot remap. + remapCnt = 1; + + map(topVer); + } + } + + /** + * @param topVer Topology version. + */ + protected abstract void map(AffinityTopologyVersion topVer); + + /** + * Maps future on ready topology. + */ + protected abstract void mapOnTopology(); + + /** {@inheritDoc} */ + @Override public IgniteUuid futureId() { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public boolean trackable() { + return true; + } + + /** {@inheritDoc} */ + @Override public void markNotTrackable() { + // No-op. + } + + /** + * @return {@code True} future is stored by {@link GridCacheMvccManager#addAtomicFuture}. + */ + protected boolean storeFuture() { + return cctx.config().getAtomicWriteOrderMode() == CLOCK || syncMode != FULL_ASYNC; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/4b78262c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java index ce5b41a..3917936 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java @@ -36,14 +36,12 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearAtom import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; -import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.CI2; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; -import org.apache.ignite.internal.util.typedef.internal.U; import org.jetbrains.annotations.Nullable; import javax.cache.expiry.ExpiryPolicy; @@ -55,13 +53,13 @@ import java.util.UUID; import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.CLOCK; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC; -import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC; import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM; /** * DHT atomic cache near update future. */ -public class GridNearAtomicSingleUpdateFuture extends GridAbstractNearAtomicUpdateFuture { +// TODO: Only for !fastMap, only for !conflicts +public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpdateFuture { /** Keys */ private Object key; @@ -72,12 +70,8 @@ public class GridNearAtomicSingleUpdateFuture extends GridAbstractNearAtomicUpda /** Optional arguments for entry processor. */ private Object[] invokeArgs; - /** Mappings if operations is mapped to more than one node. */ - @GridToStringInclude - private Map<UUID, GridNearAtomicUpdateRequest> mappings; - /** Not null is operation is mapped to single node. */ - private GridNearAtomicUpdateRequest singleReq; + private GridNearAtomicUpdateRequest req; /** * @param cctx Cache context. @@ -144,12 +138,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridAbstractNearAtomicUpda GridNearAtomicUpdateResponse res = null; synchronized (mux) { - GridNearAtomicUpdateRequest req; - - if (singleReq != null) - req = singleReq.nodeId().equals(nodeId) ? singleReq : null; - else - req = mappings != null ? mappings.get(nodeId) : null; + GridNearAtomicUpdateRequest req = this.req.nodeId().equals(nodeId) ? this.req : null; if (req != null && req.response() == null) { res = new GridNearAtomicUpdateResponse(cctx.cacheId(), @@ -172,50 +161,8 @@ public class GridNearAtomicSingleUpdateFuture extends GridAbstractNearAtomicUpda return false; } - /** - * Performs future mapping. - */ - public void map() { - AffinityTopologyVersion topVer = cctx.shared().lockedTopologyVersion(null); - - if (topVer == null) - mapOnTopology(); - else { - topLocked = true; - - // Cannot remap. - remapCnt = 1; - - map(topVer); - } - } - /** {@inheritDoc} */ @Override public IgniteInternalFuture<Void> completeFuture(AffinityTopologyVersion topVer) { - // Wait fast-map near atomic update futures in CLOCK mode. - if (fastMap) { - GridFutureAdapter<Void> fut; - - synchronized (mux) { - if (this.topVer != AffinityTopologyVersion.ZERO && this.topVer.compareTo(topVer) < 0) { - if (topCompleteFut == null) - topCompleteFut = new GridFutureAdapter<>(); - - fut = topCompleteFut; - } - else - fut = null; - } - - if (fut != null && isDone()) { - fut.onDone(); - - return null; - } - - return fut; - } - return null; } @@ -261,41 +208,20 @@ public class GridNearAtomicSingleUpdateFuture extends GridAbstractNearAtomicUpda GridCacheReturn opRes0 = null; CachePartialUpdateCheckedException err0 = null; - boolean rcvAll; - GridFutureAdapter<?> fut0 = null; synchronized (mux) { if (!res.futureVersion().equals(futVer)) return; - if (singleReq != null) { - if (!singleReq.nodeId().equals(nodeId)) - return; - - req = singleReq; - - singleReq = null; + if (!this.req.nodeId().equals(nodeId)) + return; - rcvAll = true; - } - else { - req = mappings != null ? mappings.get(nodeId) : null; + req = this.req; - if (req != null && req.onResponse(res)) { - resCnt++; - - rcvAll = mappings.size() == resCnt; - } - else - return; - } - - assert req != null && req.topologyVersion().equals(topVer) : req; + this.req = null; if (res.remapKeys() != null) { - assert !fastMap || cctx.kernalContext().clientNode(); - if (mapErrTopVer == null || mapErrTopVer.compareTo(req.topologyVersion()) < 0) mapErrTopVer = req.topologyVersion(); } @@ -334,51 +260,49 @@ public class GridNearAtomicSingleUpdateFuture extends GridAbstractNearAtomicUpda } } - if (rcvAll) { - if (res.remapKeys() != null) { - assert mapErrTopVer != null; + if (res.remapKeys() != null) { + assert mapErrTopVer != null; - remapTopVer = cctx.shared().exchange().topologyVersion(); - } - else { - if (err != null && - X.hasCause(err, CachePartialUpdateCheckedException.class) && - X.hasCause(err, ClusterTopologyCheckedException.class) && - storeFuture() && - --remapCnt > 0) { - ClusterTopologyCheckedException topErr = - X.cause(err, ClusterTopologyCheckedException.class); + remapTopVer = cctx.shared().exchange().topologyVersion(); + } + else { + if (err != null && + X.hasCause(err, CachePartialUpdateCheckedException.class) && + X.hasCause(err, ClusterTopologyCheckedException.class) && + storeFuture() && + --remapCnt > 0) { + ClusterTopologyCheckedException topErr = + X.cause(err, ClusterTopologyCheckedException.class); - if (!(topErr instanceof ClusterTopologyServerNotFoundException)) { - CachePartialUpdateCheckedException cause = - X.cause(err, CachePartialUpdateCheckedException.class); + if (!(topErr instanceof ClusterTopologyServerNotFoundException)) { + CachePartialUpdateCheckedException cause = + X.cause(err, CachePartialUpdateCheckedException.class); - assert cause != null && cause.topologyVersion() != null : err; + assert cause != null && cause.topologyVersion() != null : err; - remapTopVer = - new AffinityTopologyVersion(cause.topologyVersion().topologyVersion() + 1); + remapTopVer = + new AffinityTopologyVersion(cause.topologyVersion().topologyVersion() + 1); - err = null; + err = null; - updVer = null; - } + updVer = null; } } + } - if (remapTopVer == null) { - err0 = err; - opRes0 = opRes; - } - else { - fut0 = topCompleteFut; + if (remapTopVer == null) { + err0 = err; + opRes0 = opRes; + } + else { + fut0 = topCompleteFut; - topCompleteFut = null; + topCompleteFut = null; - cctx.mvcc().removeAtomicFuture(futVer); + cctx.mvcc().removeAtomicFuture(futVer); - futVer = null; - topVer = AffinityTopologyVersion.ZERO; - } + futVer = null; + topVer = AffinityTopologyVersion.ZERO; } } @@ -388,18 +312,13 @@ public class GridNearAtomicSingleUpdateFuture extends GridAbstractNearAtomicUpda return; } - if (rcvAll && nearEnabled) { - if (mappings != null) { - for (GridNearAtomicUpdateRequest req0 : mappings.values()) { - GridNearAtomicUpdateResponse res0 = req0.response(); + if (nearEnabled && !nodeErr) { + if (res.remapKeys() != null || !req.hasPrimary()) + return; - assert res0 != null : req0; + GridNearAtomicCache near = (GridNearAtomicCache)cctx.dht().near(); - updateNear(req0, res0); - } - } - else if (!nodeErr) - updateNear(req, res); + near.processNearAtomicUpdateResponse(req, res); } if (remapTopVer != null) { @@ -454,31 +373,11 @@ public class GridNearAtomicSingleUpdateFuture extends GridAbstractNearAtomicUpda return; } - if (rcvAll) - onDone(opRes0, err0); + onDone(opRes0, err0); } - /** - * Updates near cache. - * - * @param req Update request. - * @param res Update response. - */ - private void updateNear(GridNearAtomicUpdateRequest req, GridNearAtomicUpdateResponse res) { - assert nearEnabled; - - if (res.remapKeys() != null || !req.hasPrimary()) - return; - - GridNearAtomicCache near = (GridNearAtomicCache)cctx.dht().near(); - - near.processNearAtomicUpdateResponse(req, res); - } - - /** - * Maps future on ready topology. - */ - private void mapOnTopology() { + /** {@inheritDoc} */ + @Override protected void mapOnTopology() { cache.topology().readLock(); AffinityTopologyVersion topVer = null; @@ -563,50 +462,6 @@ public class GridNearAtomicSingleUpdateFuture extends GridAbstractNearAtomicUpda } /** - * Sends messages to remote nodes and updates local cache. - * - * @param mappings Mappings to send. - */ - private void doUpdate(Map<UUID, GridNearAtomicUpdateRequest> mappings) { - UUID locNodeId = cctx.localNodeId(); - - GridNearAtomicUpdateRequest locUpdate = null; - - // Send messages to remote nodes first, then run local update. - for (GridNearAtomicUpdateRequest req : mappings.values()) { - if (locNodeId.equals(req.nodeId())) { - assert locUpdate == null : "Cannot have more than one local mapping [locUpdate=" + locUpdate + - ", req=" + req + ']'; - - locUpdate = req; - } - else { - try { - if (log.isDebugEnabled()) - log.debug("Sending near atomic update request [nodeId=" + req.nodeId() + ", req=" + req + ']'); - - cctx.io().send(req.nodeId(), req, cctx.ioPolicy()); - } - catch (IgniteCheckedException e) { - onSendError(req, e); - } - } - } - - if (locUpdate != null) { - cache.updateAllAsyncInternal(cctx.localNodeId(), locUpdate, - new CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse>() { - @Override public void apply(GridNearAtomicUpdateRequest req, GridNearAtomicUpdateResponse res) { - onResult(res.nodeId(), res, false); - } - }); - } - - if (syncMode == FULL_ASYNC) - onDone(new GridCacheReturn(cctx, true, true, null, true)); - } - - /** * @param req Request. * @param e Error. */ @@ -623,10 +478,8 @@ public class GridNearAtomicSingleUpdateFuture extends GridAbstractNearAtomicUpda } } - /** - * @param topVer Topology version. - */ - void map(AffinityTopologyVersion topVer) { + /** {@inheritDoc} */ + protected void map(AffinityTopologyVersion topVer) { Collection<ClusterNode> topNodes = CU.affinityNodes(cctx, topVer); if (F.isEmpty(topNodes)) { @@ -638,7 +491,6 @@ public class GridNearAtomicSingleUpdateFuture extends GridAbstractNearAtomicUpda Exception err = null; GridNearAtomicUpdateRequest singleReq0 = null; - Map<UUID, GridNearAtomicUpdateRequest> mappings0 = null; GridCacheVersion futVer = cctx.versions().next(topVer); @@ -659,31 +511,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridAbstractNearAtomicUpda updVer = null; try { - if (!fastMap) - singleReq0 = mapSingleUpdate(topVer, futVer, updVer); - else { - Map<UUID, GridNearAtomicUpdateRequest> pendingMappings = mapUpdate(topNodes, - topVer, - futVer, - updVer); - - if (pendingMappings.size() == 1) - singleReq0 = F.firstValue(pendingMappings); - else { - if (syncMode == PRIMARY_SYNC) { - mappings0 = U.newHashMap(pendingMappings.size()); - - for (GridNearAtomicUpdateRequest req : pendingMappings.values()) { - if (req.hasPrimary()) - mappings0.put(req.nodeId(), req); - } - } - else - mappings0 = pendingMappings; - - assert !mappings0.isEmpty() : this; - } - } + singleReq0 = mapSingleUpdate(topVer, futVer, updVer); synchronized (mux) { assert this.futVer == null : this; @@ -695,8 +523,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridAbstractNearAtomicUpda resCnt = 0; - singleReq = singleReq0; - mappings = mappings0; + req = singleReq0; } } catch (Exception e) { @@ -718,13 +545,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridAbstractNearAtomicUpda } // Optimize mapping for single key. - if (singleReq0 != null) - mapSingle(singleReq0.nodeId(), singleReq0); - else { - assert mappings0 != null; - - doUpdate(mappings0); - } + mapSingle(singleReq0.nodeId(), singleReq0); } /** @@ -752,84 +573,6 @@ public class GridNearAtomicSingleUpdateFuture extends GridAbstractNearAtomicUpda } /** - * @param topNodes Cache nodes. - * @param topVer Topology version. - * @param futVer Future version. - * @param updVer Update version. - * @return Mapping. - * @throws Exception If failed. - */ - private Map<UUID, GridNearAtomicUpdateRequest> mapUpdate(Collection<ClusterNode> topNodes, - AffinityTopologyVersion topVer, - GridCacheVersion futVer, - @Nullable GridCacheVersion updVer) throws Exception { - Map<UUID, GridNearAtomicUpdateRequest> pendingMappings = U.newHashMap(topNodes.size()); - - if (key == null) - throw new NullPointerException("Null key."); - - Object val = this.val; - - if (val == null && op != GridCacheOperation.DELETE) - throw new NullPointerException("Null value."); - - KeyCacheObject cacheKey = cctx.toCacheKeyObject(key); - - if (op != TRANSFORM) - val = cctx.toCacheObject(val); - - Collection<ClusterNode> affNodes = mapKey(cacheKey, topVer); - - if (affNodes.isEmpty()) - throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache " + - "(all partition nodes left the grid)."); - - int i = 0; - - for (ClusterNode affNode : affNodes) { - if (affNode == null) - throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache " + - "(all partition nodes left the grid)."); - - UUID nodeId = affNode.id(); - - GridNearAtomicUpdateRequest mapped = pendingMappings.get(nodeId); - - if (mapped == null) { - mapped = new GridNearAtomicUpdateRequest( - cctx.cacheId(), - nodeId, - futVer, - fastMap, - updVer, - topVer, - topLocked, - syncMode, - op, - retval, - expiryPlc, - invokeArgs, - filter, - subjId, - taskNameHash, - skipStore, - keepBinary, - cctx.kernalContext().clientNode(), - cctx.deploymentEnabled(), - 1); - - pendingMappings.put(nodeId, mapped); - } - - mapped.addUpdateEntry(cacheKey, val, CU.TTL_NOT_CHANGED, CU.EXPIRE_TIME_CALCULATE, null, i == 0); - - i++; - } - - return pendingMappings; - } - - /** * @param topVer Topology version. * @param futVer Future version. * @param updVer Update version. @@ -862,7 +605,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridAbstractNearAtomicUpda cctx.cacheId(), primary.id(), futVer, - fastMap, + false, updVer, topVer, topLocked, http://git-wip-us.apache.org/repos/asf/ignite/blob/4b78262c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index bf7b0e9..009642d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@ -33,6 +33,7 @@ import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheEntryPredicate; import org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException; +import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheOperation; import org.apache.ignite.internal.processors.cache.GridCacheReturn; @@ -62,7 +63,10 @@ import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRA /** * DHT atomic cache near update future. */ -public class GridNearAtomicUpdateFuture extends GridAbstractNearAtomicUpdateFuture { +public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFuture { + /** Fast map flag. */ + private final boolean fastMap; + /** Keys */ private Collection<?> keys; @@ -151,6 +155,8 @@ public class GridNearAtomicUpdateFuture extends GridAbstractNearAtomicUpdateFutu remapCnt = 1; this.remapCnt = remapCnt; + + fastMap = cache.isFastMap(filter, op); } /** {@inheritDoc} */ @@ -193,24 +199,6 @@ public class GridNearAtomicUpdateFuture extends GridAbstractNearAtomicUpdateFutu return false; } - /** - * Performs future mapping. - */ - public void map() { - AffinityTopologyVersion topVer = cctx.shared().lockedTopologyVersion(null); - - if (topVer == null) - mapOnTopology(); - else { - topLocked = true; - - // Cannot remap. - remapCnt = 1; - - map(topVer, null); - } - } - /** {@inheritDoc} */ @Override public IgniteInternalFuture<Void> completeFuture(AffinityTopologyVersion topVer) { // Wait fast-map near atomic update futures in CLOCK mode. @@ -510,10 +498,8 @@ public class GridNearAtomicUpdateFuture extends GridAbstractNearAtomicUpdateFutu near.processNearAtomicUpdateResponse(req, res); } - /** - * Maps future on ready topology. - */ - private void mapOnTopology() { + /** {@inheritDoc} */ + @Override protected void mapOnTopology() { cache.topology().readLock(); AffinityTopologyVersion topVer = null; @@ -658,6 +644,11 @@ public class GridNearAtomicUpdateFuture extends GridAbstractNearAtomicUpdateFutu } } + /** {@inheritDoc} */ + protected void map(AffinityTopologyVersion topVer) { + map(topVer, null); + } + /** * @param topVer Topology version. * @param remapKeys Keys to remap. @@ -1030,6 +1021,22 @@ public class GridNearAtomicUpdateFuture extends GridAbstractNearAtomicUpdateFutu return req; } + /** + * Maps key to nodes. If filters are absent and operation is not TRANSFORM, then we can assign version on near + * node and send updates in parallel to all participating nodes. + * + * @param key Key to map. + * @param topVer Topology version to map. + * @return Collection of nodes to which key is mapped. + */ + private Collection<ClusterNode> mapKey(KeyCacheObject key, AffinityTopologyVersion topVer) { + GridCacheAffinityManager affMgr = cctx.affinity(); + + // If we can send updates in parallel - do it. + return fastMap ? cctx.topology().nodes(affMgr.partition(key), topVer) : + Collections.singletonList(affMgr.primary(key, topVer)); + } + /** {@inheritDoc} */ public String toString() { synchronized (mux) {
