IGNITE-2926: WIP.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/17080659 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/17080659 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/17080659 Branch: refs/heads/ignite-2523-1 Commit: 17080659b28ea02f6b95e63fb00dac7bd19c7add Parents: 143ce34 Author: vozerov-gridgain <[email protected]> Authored: Thu Apr 14 16:15:37 2016 +0300 Committer: vozerov-gridgain <[email protected]> Committed: Thu Apr 14 16:15:37 2016 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheAtomicFuture.java | 5 - .../GridAbstractNearAtomicUpdateFuture.java | 187 ++++++++++++++ .../dht/atomic/GridDhtAtomicUpdateFuture.java | 5 - .../dht/atomic/GridNearAtomicUpdateFuture.java | 247 +++---------------- 4 files changed, 227 insertions(+), 217 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/17080659/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java index 359909e..c96d00f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java @@ -38,9 +38,4 @@ public interface GridCacheAtomicFuture<R> extends GridCacheFuture<R> { * @return Future or {@code null} if no need to wait. */ public IgniteInternalFuture<Void> completeFuture(AffinityTopologyVersion topVer); - - /** - * @return Future keys. - */ - public Collection<?> keys(); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/17080659/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridAbstractNearAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridAbstractNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridAbstractNearAtomicUpdateFuture.java new file mode 100644 index 0000000..e40e8ca --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridAbstractNearAtomicUpdateFuture.java @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht.atomic; + +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.cache.CacheWriteSynchronizationMode; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.CacheEntryPredicate; +import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager; +import org.apache.ignite.internal.processors.cache.GridCacheAtomicFuture; +import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.GridCacheMvccManager; +import org.apache.ignite.internal.processors.cache.GridCacheOperation; +import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteUuid; +import org.jetbrains.annotations.Nullable; + +import javax.cache.expiry.ExpiryPolicy; +import java.util.Collection; +import java.util.Collections; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicReference; + +import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.CLOCK; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; +import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM; + +/** + * Base for near atomic update futures. + */ +public abstract class GridAbstractNearAtomicUpdateFuture extends GridFutureAdapter<Object> + implements GridCacheAtomicFuture<Object> { + /** Logger reference. */ + private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>(); + + /** Logger. */ + protected static IgniteLogger log; + + /** Cache context. */ + protected final GridCacheContext cctx; + + /** Cache. */ + protected final GridDhtAtomicCache cache; + + /** Write synchronization mode. */ + protected final CacheWriteSynchronizationMode syncMode; + + /** Update operation. */ + protected final GridCacheOperation op; + + /** Return value require flag. */ + protected final boolean retval; + + /** Raw return value flag. */ + protected final boolean rawRetval; + + /** Expiry policy. */ + protected final ExpiryPolicy expiryPlc; + + /** Optional filter. */ + protected final CacheEntryPredicate[] filter; + + /** Subject ID. */ + protected final UUID subjId; + + /** Task name hash. */ + protected final int taskNameHash; + + /** Skip store flag. */ + protected final boolean skipStore; + + /** Keep binary flag. */ + protected final boolean keepBinary; + + /** Wait for topology future flag. */ + protected final boolean waitTopFut; + + /** Fast map flag. */ + protected final boolean fastMap; + + /** Near cache flag. */ + protected final boolean nearEnabled; + + /** Mutex to synchronize state updates. */ + protected final Object mux = new Object(); + + /** + * Constructor. + */ + protected GridAbstractNearAtomicUpdateFuture( + GridCacheContext cctx, + GridDhtAtomicCache cache, + CacheWriteSynchronizationMode syncMode, + GridCacheOperation op, + boolean retval, + boolean rawRetval, + @Nullable ExpiryPolicy expiryPlc, + CacheEntryPredicate[] filter, + UUID subjId, + int taskNameHash, + boolean skipStore, + boolean keepBinary, + boolean waitTopFut + ) { + if (log == null) + log = U.logger(cctx.kernalContext(), logRef, GridFutureAdapter.class); + + this.cctx = cctx; + this.cache = cache; + this.syncMode = syncMode; + this.op = op; + this.retval = retval; + this.rawRetval = rawRetval; + this.expiryPlc = expiryPlc; + this.filter = filter; + this.subjId = subjId; + this.taskNameHash = taskNameHash; + this.skipStore = skipStore; + this.keepBinary = keepBinary; + this.waitTopFut = waitTopFut; + + fastMap = F.isEmpty(filter) && op != TRANSFORM && cctx.config().getWriteSynchronizationMode() == FULL_SYNC && + cctx.config().getAtomicWriteOrderMode() == CLOCK && + !(cctx.writeThrough() && cctx.config().getInterceptor() != null); + + nearEnabled = CU.isNearEnabled(cctx); + } + + /** {@inheritDoc} */ + @Override public IgniteUuid futureId() { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public boolean trackable() { + return true; + } + + /** {@inheritDoc} */ + @Override public void markNotTrackable() { + // No-op. + } + + /** + * @return {@code True} future is stored by {@link GridCacheMvccManager#addAtomicFuture}. + */ + protected boolean storeFuture() { + return cctx.config().getAtomicWriteOrderMode() == CLOCK || syncMode != FULL_ASYNC; + } + + /** + * Maps key to nodes. If filters are absent and operation is not TRANSFORM, then we can assign version on near + * node and send updates in parallel to all participating nodes. + * + * @param key Key to map. + * @param topVer Topology version to map. + * @return Collection of nodes to which key is mapped. + */ + protected Collection<ClusterNode> mapKey(KeyCacheObject key, AffinityTopologyVersion topVer) { + GridCacheAffinityManager affMgr = cctx.affinity(); + + // If we can send updates in parallel - do it. + return fastMap ? cctx.topology().nodes(affMgr.partition(key), topVer) : + Collections.singletonList(affMgr.primary(key, topVer)); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/17080659/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java index 9f52658..4721d6e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java @@ -212,11 +212,6 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> return null; } - /** {@inheritDoc} */ - @Override public Collection<KeyCacheObject> keys() { - return keys; - } - /** * @param entry Entry to map. * @param val Value to write. http://git-wip-us.apache.org/repos/asf/ignite/blob/17080659/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index 9955df7..d68b4ef 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@ -23,10 +23,8 @@ import java.util.Collections; import java.util.Iterator; import java.util.Map; import java.util.UUID; -import java.util.concurrent.atomic.AtomicReference; import javax.cache.expiry.ExpiryPolicy; import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.IgniteLogger; import org.apache.ignite.cache.CacheWriteSynchronizationMode; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.IgniteInternalFuture; @@ -35,10 +33,7 @@ import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheEntryPredicate; import org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException; -import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager; -import org.apache.ignite.internal.processors.cache.GridCacheAtomicFuture; import org.apache.ignite.internal.processors.cache.GridCacheContext; -import org.apache.ignite.internal.processors.cache.GridCacheMvccManager; import org.apache.ignite.internal.processors.cache.GridCacheOperation; import org.apache.ignite.internal.processors.cache.GridCacheReturn; import org.apache.ignite.internal.processors.cache.GridCacheTryPutFailedException; @@ -57,35 +52,17 @@ import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.lang.IgniteUuid; import org.jetbrains.annotations.Nullable; import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.CLOCK; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC; -import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC; import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM; /** * DHT atomic cache near update future. */ -public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> - implements GridCacheAtomicFuture<Object>{ - /** Logger reference. */ - private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>(); - - /** Logger. */ - protected static IgniteLogger log; - - /** Cache context. */ - private final GridCacheContext cctx; - - /** Cache. */ - private GridDhtAtomicCache cache; - - /** Update operation. */ - private final GridCacheOperation op; - +public class GridNearAtomicUpdateFuture extends GridAbstractNearAtomicUpdateFuture { /** Keys */ private Collection<?> keys; @@ -104,51 +81,12 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"}) private Collection<GridCacheVersion> conflictRmvVals; - /** Return value require flag. */ - private final boolean retval; - - /** Expiry policy. */ - private final ExpiryPolicy expiryPlc; - - /** Optional filter. */ - private final CacheEntryPredicate[] filter; - - /** Write synchronization mode. */ - private final CacheWriteSynchronizationMode syncMode; - - /** Raw return value flag. */ - private final boolean rawRetval; - - /** Fast map flag. */ - private final boolean fastMap; - - /** Near cache flag. */ - private final boolean nearEnabled; - - /** Subject ID. */ - private final UUID subjId; - - /** Task name hash. */ - private final int taskNameHash; - /** Topology locked flag. Set if atomic update is performed inside a TX or explicit lock. */ private boolean topLocked; - /** Skip store flag. */ - private final boolean skipStore; - - /** */ - private final boolean keepBinary; - - /** Wait for topology future flag. */ - private final boolean waitTopFut; - /** Remap count. */ private int remapCnt; - /** Mutex to synchronize state updates. */ - private final Object mux = new Object(); - /** Current topology version. */ private AffinityTopologyVersion topVer = AffinityTopologyVersion.ZERO; @@ -225,39 +163,19 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> int remapCnt, boolean waitTopFut ) { - this.rawRetval = rawRetval; + super(cctx, cache, syncMode, op, retval, rawRetval, expiryPlc, filter, subjId, taskNameHash, skipStore, + keepBinary, waitTopFut); assert vals == null || vals.size() == keys.size(); assert conflictPutVals == null || conflictPutVals.size() == keys.size(); assert conflictRmvVals == null || conflictRmvVals.size() == keys.size(); assert subjId != null; - this.cctx = cctx; - this.cache = cache; - this.syncMode = syncMode; - this.op = op; this.keys = keys; this.vals = vals; this.invokeArgs = invokeArgs; this.conflictPutVals = conflictPutVals; this.conflictRmvVals = conflictRmvVals; - this.retval = retval; - this.expiryPlc = expiryPlc; - this.filter = filter; - this.subjId = subjId; - this.taskNameHash = taskNameHash; - this.skipStore = skipStore; - this.keepBinary = keepBinary; - this.waitTopFut = waitTopFut; - - if (log == null) - log = U.logger(cctx.kernalContext(), logRef, GridFutureAdapter.class); - - fastMap = F.isEmpty(filter) && op != TRANSFORM && cctx.config().getWriteSynchronizationMode() == FULL_SYNC && - cctx.config().getAtomicWriteOrderMode() == CLOCK && - !(cctx.writeThrough() && cctx.config().getInterceptor() != null); - - nearEnabled = CU.isNearEnabled(cctx); if (!waitTopFut) remapCnt = 1; @@ -266,30 +184,12 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> } /** {@inheritDoc} */ - @Override public IgniteUuid futureId() { - throw new UnsupportedOperationException(); - } - - /** {@inheritDoc} */ @Override public GridCacheVersion version() { synchronized (mux) { return futVer; } } - /** - * @return {@code True} if this future should block partition map exchange. - */ - private boolean waitForPartitionExchange() { - // Wait fast-map near atomic update futures in CLOCK mode. - return fastMap; - } - - /** {@inheritDoc} */ - @Override public Collection<?> keys() { - return keys; - } - /** {@inheritDoc} */ @Override public boolean onNodeLeft(UUID nodeId) { GridNearAtomicUpdateResponse res = null; @@ -323,16 +223,6 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> return false; } - /** {@inheritDoc} */ - @Override public boolean trackable() { - return true; - } - - /** {@inheritDoc} */ - @Override public void markNotTrackable() { - // No-op. - } - /** * Performs future mapping. */ @@ -353,8 +243,20 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> /** {@inheritDoc} */ @Override public IgniteInternalFuture<Void> completeFuture(AffinityTopologyVersion topVer) { - if (waitForPartitionExchange()) { - GridFutureAdapter<Void> fut = completeFuture0(topVer); + // Wait fast-map near atomic update futures in CLOCK mode. + if (fastMap) { + GridFutureAdapter<Void> fut; + + synchronized (mux) { + if (this.topVer != AffinityTopologyVersion.ZERO && this.topVer.compareTo(topVer) < 0) { + if (topCompleteFut == null) + topCompleteFut = new GridFutureAdapter<>(); + + fut = topCompleteFut; + } + else + fut = null; + } if (fut != null && isDone()) { fut.onDone(); @@ -454,16 +356,34 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> mapErrTopVer = req.topologyVersion(); } else if (res.error() != null) { - if (res.failedKeys() != null) - addFailedKeys(res.failedKeys(), req.topologyVersion(), res.error()); + if (res.failedKeys() != null) { + if (err == null) + err = new CachePartialUpdateCheckedException( + "Failed to update keys (retry update if possible)."); + + Collection<Object> keys = new ArrayList<>(res.failedKeys().size()); + + for (KeyCacheObject key : res.failedKeys()) + keys.add(cctx.cacheObjectContext().unwrapBinaryIfNeeded(key, keepBinary, false)); + + err.add(keys, res.error(), req.topologyVersion()); + } } else { if (!req.fastMap() || req.hasPrimary()) { GridCacheReturn ret = res.returnValue(); if (op == TRANSFORM) { - if (ret != null) - addInvokeResults(ret); + if (ret != null) { + assert ret.value() == null || ret.value() instanceof Map : ret.value(); + + if (ret.value() != null) { + if (opRes != null) + opRes.mergeEntryProcessResults(ret); + else + opRes = ret; + } + } } else opRes = ret; @@ -677,35 +597,6 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> } /** - * @return {@code True} future is stored by {@link GridCacheMvccManager#addAtomicFuture}. - */ - private boolean storeFuture() { - return cctx.config().getAtomicWriteOrderMode() == CLOCK || syncMode != FULL_ASYNC; - } - - /** - * Maps key to nodes. If filters are absent and operation is not TRANSFORM, then we can assign version on near - * node and send updates in parallel to all participating nodes. - * - * @param key Key to map. - * @param topVer Topology version to map. - * @param fastMap Flag indicating whether mapping is performed for fast-circuit update. - * @return Collection of nodes to which key is mapped. - */ - private Collection<ClusterNode> mapKey( - KeyCacheObject key, - AffinityTopologyVersion topVer, - boolean fastMap - ) { - GridCacheAffinityManager affMgr = cctx.affinity(); - - // If we can send updates in parallel - do it. - return fastMap ? - cctx.topology().nodes(affMgr.partition(key), topVer) : - Collections.singletonList(affMgr.primary(key, topVer)); - } - - /** * Maps future to single node. * * @param nodeId Node ID. @@ -914,26 +805,6 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> } /** - * @param topVer Topology version. - * @return Future. - */ - @Nullable GridFutureAdapter<Void> completeFuture0(AffinityTopologyVersion topVer) { - synchronized (mux) { - if (this.topVer == AffinityTopologyVersion.ZERO) - return null; - - if (this.topVer.compareTo(topVer) < 0) { - if (topCompleteFut == null) - topCompleteFut = new GridFutureAdapter<>(); - - return topCompleteFut; - } - - return null; - } - } - - /** * @return Future version. */ GridCacheVersion onFutureDone() { @@ -1039,7 +910,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> if (op != TRANSFORM) val = cctx.toCacheObject(val); - Collection<ClusterNode> affNodes = mapKey(cacheKey, topVer, fastMap); + Collection<ClusterNode> affNodes = mapKey(cacheKey, topVer); if (affNodes.isEmpty()) throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache " + @@ -1189,44 +1060,6 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> return req; } - /** - * @param ret Result from single node. - */ - @SuppressWarnings("unchecked") - private void addInvokeResults(GridCacheReturn ret) { - assert op == TRANSFORM : op; - assert ret.value() == null || ret.value() instanceof Map : ret.value(); - - if (ret.value() != null) { - if (opRes != null) - opRes.mergeEntryProcessResults(ret); - else - opRes = ret; - } - } - - /** - * @param failedKeys Failed keys. - * @param topVer Topology version for failed update. - * @param err Error cause. - */ - private void addFailedKeys(Collection<KeyCacheObject> failedKeys, - AffinityTopologyVersion topVer, - Throwable err) { - CachePartialUpdateCheckedException err0 = this.err; - - if (err0 == null) - err0 = this.err = - new CachePartialUpdateCheckedException("Failed to update keys (retry update if possible)."); - - Collection<Object> keys = new ArrayList<>(failedKeys.size()); - - for (KeyCacheObject key : failedKeys) - keys.add(cctx.cacheObjectContext().unwrapBinaryIfNeeded(key, keepBinary, false)); - - err0.add(keys, err, topVer); - } - /** {@inheritDoc} */ public String toString() { synchronized (mux) {
