ignite-4984 Fixed potential double processing of node failed event. Removed unused 'tryPut' operation.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0c1db677 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0c1db677 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0c1db677 Branch: refs/heads/ignite-5024 Commit: 0c1db6777bf0ef49b1219e90e2e2b932d17dfa82 Parents: 6e2c51d Author: sboikov <[email protected]> Authored: Fri Apr 21 17:35:23 2017 +0300 Committer: sboikov <[email protected]> Committed: Fri Apr 21 17:35:23 2017 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheAdapter.java | 6 - .../processors/cache/GridCacheAtomicFuture.java | 5 - .../processors/cache/GridCacheMvccManager.java | 10 +- .../processors/cache/GridCacheProxyImpl.java | 12 -- .../cache/GridCacheTryPutFailedException.java | 28 ---- .../processors/cache/IgniteInternalCache.java | 11 -- .../dht/GridPartitionedSingleGetFuture.java | 3 - .../GridDhtAtomicAbstractUpdateFuture.java | 9 +- .../dht/atomic/GridDhtAtomicCache.java | 54 +------ .../atomic/GridDhtAtomicSingleUpdateFuture.java | 3 - .../dht/atomic/GridDhtAtomicUpdateFuture.java | 3 - .../GridNearAtomicAbstractUpdateFuture.java | 79 +++++++-- .../GridNearAtomicSingleUpdateFuture.java | 134 +++++----------- .../dht/atomic/GridNearAtomicUpdateFuture.java | 159 +++++++------------ .../distributed/near/GridNearAtomicCache.java | 5 - .../cache/local/GridLocalLockFuture.java | 3 - .../cache/hibernate/HibernateCacheProxy.java | 5 - .../cache/hibernate/HibernateCacheProxy.java | 5 - 18 files changed, 180 insertions(+), 354 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/0c1db677/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index 5438163..c9f7430 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -2708,12 +2708,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V } /** {@inheritDoc} */ - @Nullable @Override public V tryGetAndPut(K key, V val) throws IgniteCheckedException { - // Supported only in ATOMIC cache. - throw new UnsupportedOperationException(); - } - - /** {@inheritDoc} */ @Nullable @Override public final V getAndPutIfAbsent(final K key, final V val) throws IgniteCheckedException { return getAndPut(key, val, ctx.noVal()); } http://git-wip-us.apache.org/repos/asf/ignite/blob/0c1db677/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java index 87ae29c..35e49c4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java @@ -25,11 +25,6 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; */ public interface GridCacheAtomicFuture<R> extends GridCacheFuture<R> { /** - * @return Future ID. - */ - public long id(); - - /** * Gets future that will be completed when it is safe when update is finished on the given version of topology. * * @param topVer Topology version to finish. http://git-wip-us.apache.org/repos/asf/ignite/blob/0c1db677/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java index 712d136..3ae9f92 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java @@ -262,16 +262,8 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { for (GridCacheFuture<?> fut : activeFutures()) fut.onNodeLeft(discoEvt.eventNode().id()); - for (GridCacheAtomicFuture<?> cacheFut : atomicFuts.values()) { + for (GridCacheAtomicFuture<?> cacheFut : atomicFuts.values()) cacheFut.onNodeLeft(discoEvt.eventNode().id()); - - if (cacheFut.isCancelled() || cacheFut.isDone()) { - long futId = cacheFut.id(); - - if (futId > 0) - atomicFuts.remove(futId, cacheFut); - } - } } }; http://git-wip-us.apache.org/repos/asf/ignite/blob/0c1db677/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java index eaa448f..2979a57 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java @@ -1274,18 +1274,6 @@ public class GridCacheProxyImpl<K, V> implements IgniteInternalCache<K, V>, Exte } /** {@inheritDoc} */ - @Nullable @Override public V tryGetAndPut(K key, V val) throws IgniteCheckedException { - CacheOperationContext prev = gate.enter(opCtx); - - try { - return delegate.tryGetAndPut(key, val); - } - finally { - gate.leave(prev); - } - } - - /** {@inheritDoc} */ @Nullable @Override public <T> EntryProcessorResult<T> invoke( AffinityTopologyVersion topVer, K key, http://git-wip-us.apache.org/repos/asf/ignite/blob/0c1db677/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTryPutFailedException.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTryPutFailedException.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTryPutFailedException.java deleted file mode 100644 index e4217e3..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTryPutFailedException.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache; - -import org.apache.ignite.IgniteCheckedException; - -/** - * Try put failed exception. - */ -public class GridCacheTryPutFailedException extends IgniteCheckedException { - /** */ - private static final long serialVersionUID = 0L; -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/0c1db677/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java index ca8c7fc..ce65fd2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java @@ -1887,17 +1887,6 @@ public interface IgniteInternalCache<K, V> extends Iterable<Cache.Entry<K, V>> { public V getTopologySafe(K key) throws IgniteCheckedException; /** - * Tries to get and put value in cache. Will fail with {@link GridCacheTryPutFailedException} - * if topology exchange is in progress. - * - * @param key Key. - * @param val value. - * @return Old value. - * @throws IgniteCheckedException In case of error. - */ - @Nullable public V tryGetAndPut(K key, V val) throws IgniteCheckedException; - - /** * @param topVer Locked topology version. * @param key Key. * @param entryProcessor Entry processor. http://git-wip-us.apache.org/repos/asf/ignite/blob/0c1db677/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java index 3dfae6f..dbf1fe1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java @@ -63,9 +63,6 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh */ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Object> implements GridCacheFuture<Object>, CacheGetFuture { - /** */ - private static final long serialVersionUID = 0L; - /** Logger reference. */ private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>(); http://git-wip-us.apache.org/repos/asf/ignite/blob/0c1db677/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java index 039cb99..5c7c027 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java @@ -60,9 +60,6 @@ import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC */ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridCacheFutureAdapter<Void> implements GridCacheAtomicFuture<Void> { - /** */ - private static final long serialVersionUID = 0L; - /** Logger. */ protected static IgniteLogger log; @@ -294,8 +291,10 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridCacheFutureA throw new UnsupportedOperationException(); } - /** {@inheritDoc} */ - @Override public final long id() { + /** + * @return Future ID. + */ + final long id() { return futId; } http://git-wip-us.apache.org/repos/asf/ignite/blob/0c1db677/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 5bbfe14..e477592 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -643,7 +643,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { null, true, filter, - true, false).get(); } @@ -656,7 +655,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { null, false, filter, - true, false).get(); assert res != null; @@ -674,7 +672,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { null, true, filter, - true, true); } @@ -688,26 +685,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { null, false, filter, - true, true); } /** {@inheritDoc} */ - @Override public V tryGetAndPut(K key, V val) throws IgniteCheckedException { - A.notNull(key, "key", val, "val"); - - return (V)update0( - key, - val, - null, - null, - true, - null, - false, - false).get(); - } - - /** {@inheritDoc} */ @Override protected void putAll0(Map<? extends K, ? extends V> m) throws IgniteCheckedException { updateAll0(m, null, @@ -716,7 +697,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { null, false, false, - true, UPDATE, false).get(); } @@ -730,7 +710,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { null, false, false, - true, UPDATE, true).chain(RET2NULL); } @@ -752,7 +731,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { null, false, false, - true, UPDATE, true); } @@ -897,7 +875,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { args, false, null, - true, async); return fut.chain(new CX1<IgniteInternalFuture<Map<K, EntryProcessorResult<T>>>, EntryProcessorResult<T>>() { @@ -968,7 +945,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { null, false, false, - true, TRANSFORM, async); @@ -1000,7 +976,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { null, false, false, - true, TRANSFORM, false).get(); } @@ -1022,7 +997,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { null, false, false, - true, TRANSFORM, true); } @@ -1037,7 +1011,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { * @param conflictRmvMap Conflict remove map. * @param retval Return value required flag. * @param rawRetval Return {@code GridCacheReturn} instance. - * @param waitTopFut Whether to wait for topology future. * @param async Async operation flag. * @return Completion future. */ @@ -1050,7 +1023,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { @Nullable Map<KeyCacheObject, GridCacheVersion> conflictRmvMap, final boolean retval, final boolean rawRetval, - final boolean waitTopFut, final GridCacheOperation op, boolean async ) { @@ -1127,8 +1099,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { opCtx != null && opCtx.skipStore(), opCtx != null && opCtx.isKeepBinary(), opCtx != null && opCtx.recovery(), - opCtx != null && opCtx.noRetries() ? 1 : MAX_RETRIES, - waitTopFut); + opCtx != null && opCtx.noRetries() ? 1 : MAX_RETRIES); if (async) { return asyncOp(new CO<IgniteInternalFuture<Object>>() { @@ -1155,7 +1126,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { * @param invokeArgs Invoke arguments. * @param retval Return value flag. * @param filter Filter. - * @param waitTopFut Whether to wait for topology future. * @param async Async operation flag. * @return Future. */ @@ -1166,7 +1136,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { @Nullable Object[] invokeArgs, final boolean retval, @Nullable final CacheEntryPredicate filter, - final boolean waitTopFut, boolean async ) { assert val == null || proc == null; @@ -1178,7 +1147,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { ctx.checkSecurity(SecurityPermission.CACHE_PUT); final GridNearAtomicAbstractUpdateFuture updateFut = - createSingleUpdateFuture(key, val, proc, invokeArgs, retval, filter, waitTopFut); + createSingleUpdateFuture(key, val, proc, invokeArgs, retval, filter); if (async) { return asyncOp(new CO<IgniteInternalFuture<Object>>() { @@ -1217,8 +1186,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { null, null, retval, - filter, - true); + filter); if (async) { return asyncOp(new CO<IgniteInternalFuture<Object>>() { @@ -1245,7 +1213,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { * @param invokeArgs Invoke arguments. * @param retval Return value flag. * @param filter Filter. - * @param waitTopFut Whether to wait for topology future. * @return Future. */ private GridNearAtomicAbstractUpdateFuture createSingleUpdateFuture( @@ -1254,8 +1221,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { @Nullable EntryProcessor proc, @Nullable Object[] invokeArgs, boolean retval, - @Nullable CacheEntryPredicate filter, - boolean waitTopFut + @Nullable CacheEntryPredicate filter ) { CacheOperationContext opCtx = ctx.operationContextPerCall(); @@ -1317,8 +1283,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { opCtx != null && opCtx.skipStore(), opCtx != null && opCtx.isKeepBinary(), opCtx != null && opCtx.recovery(), - opCtx != null && opCtx.noRetries() ? 1 : MAX_RETRIES, - waitTopFut + opCtx != null && opCtx.noRetries() ? 1 : MAX_RETRIES ); } else { @@ -1341,8 +1306,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { opCtx != null && opCtx.skipStore(), opCtx != null && opCtx.isKeepBinary(), opCtx != null && opCtx.recovery(), - opCtx != null && opCtx.noRetries() ? 1 : MAX_RETRIES, - waitTopFut); + opCtx != null && opCtx.noRetries() ? 1 : MAX_RETRIES); } } @@ -1408,8 +1372,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { opCtx != null && opCtx.skipStore(), opCtx != null && opCtx.isKeepBinary(), opCtx != null && opCtx.recovery(), - opCtx != null && opCtx.noRetries() ? 1 : MAX_RETRIES, - true); + opCtx != null && opCtx.noRetries() ? 1 : MAX_RETRIES); if (async) { return asyncOp(new CO<IgniteInternalFuture<Object>>() { @@ -3018,8 +2981,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { req.skipStore(), req.keepBinary(), req.recovery(), - MAX_RETRIES, - true); + MAX_RETRIES); updateFut.map(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/0c1db677/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java index 8ebe9c3..f053d21 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java @@ -37,9 +37,6 @@ import org.jetbrains.annotations.Nullable; */ class GridDhtAtomicSingleUpdateFuture extends GridDhtAtomicAbstractUpdateFuture { /** */ - private static final long serialVersionUID = 0L; - - /** */ private boolean allUpdated; /** http://git-wip-us.apache.org/repos/asf/ignite/blob/0c1db677/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java index 5d5ddf0..2a84445 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java @@ -36,9 +36,6 @@ import org.jetbrains.annotations.Nullable; */ class GridDhtAtomicUpdateFuture extends GridDhtAtomicAbstractUpdateFuture { /** */ - private static final long serialVersionUID = 0L; - - /** */ private int updateCntr; /** http://git-wip-us.apache.org/repos/asf/ignite/blob/0c1db677/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java index cf5aa19..6969971 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java @@ -55,6 +55,7 @@ import org.jetbrains.annotations.Nullable; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC; +import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM; /** * Base for near atomic update futures. @@ -112,9 +113,6 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridCacheFuture /** Recovery flag. */ protected final boolean recovery; - /** Wait for topology future flag. */ - protected final boolean waitTopFut; - /** Near cache flag. */ protected final boolean nearEnabled; @@ -137,9 +135,9 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridCacheFuture @GridToStringInclude protected CachePartialUpdateCheckedException err; - /** Future ID. */ + /** Future ID, changes when operation is remapped. */ @GridToStringInclude - protected volatile long futId; + protected long futId; /** Operation result. */ protected GridCacheReturn opRes; @@ -162,7 +160,6 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridCacheFuture * @param keepBinary Keep binary flag. * @param recovery {@code True} if cache operation is called in recovery mode. * @param remapCnt Remap count. - * @param waitTopFut Wait topology future flag. */ protected GridNearAtomicAbstractUpdateFuture( GridCacheContext cctx, @@ -179,8 +176,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridCacheFuture boolean skipStore, boolean keepBinary, boolean recovery, - int remapCnt, - boolean waitTopFut + int remapCnt ) { if (log == null) { msgLog = cctx.shared().atomicMessageLogger(); @@ -201,16 +197,27 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridCacheFuture this.skipStore = skipStore; this.keepBinary = keepBinary; this.recovery = recovery; - this.waitTopFut = waitTopFut; nearEnabled = CU.isNearEnabled(cctx); - if (!waitTopFut) - remapCnt = 1; - this.remapCnt = remapCnt; } + /** + * @return {@code True} if future was initialized and waits for responses. + */ + final boolean futureMapped() { + return topVer != AffinityTopologyVersion.ZERO; + } + + /** + * @param futId Expected future ID. + * @return {@code True} if future was initialized with the same ID. + */ + final boolean checkFutureId(long futId) { + return topVer != AffinityTopologyVersion.ZERO && this.futId == futId; + } + /** {@inheritDoc} */ @Override public final IgniteInternalFuture<Void> completeFuture(AffinityTopologyVersion topVer) { return null; @@ -227,7 +234,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridCacheFuture onSendError(req, e); } catch (IgniteCheckedException e) { - onDone(e); + completeFuture(null, e, req.futureId()); } } @@ -336,6 +343,50 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridCacheFuture public abstract void onDhtResponse(UUID nodeId, GridDhtAtomicNearResponse res); /** + * @param ret Result. + * @param err Error. + * @param futId Not null ID if need remove future. + */ + final void completeFuture(@Nullable GridCacheReturn ret, Throwable err, @Nullable Long futId) { + Object retval = ret == null ? null : rawRetval ? ret : (this.retval || op == TRANSFORM) ? + cctx.unwrapBinaryIfNeeded(ret.value(), keepBinary) : ret.success(); + + if (op == TRANSFORM && retval == null) + retval = Collections.emptyMap(); + + if (futId != null) + cctx.mvcc().removeAtomicFuture(futId); + + super.onDone(retval, err); + } + + /** {@inheritDoc} */ + @SuppressWarnings("ConstantConditions") + @Override public final boolean onDone(@Nullable Object res, @Nullable Throwable err) { + assert err != null : "onDone should be called only to finish future with error on cache/node stop"; + + Long futId = null; + + synchronized (this) { + if (futureMapped()) { + futId = this.futId; + + topVer = AffinityTopologyVersion.ZERO; + this.futId = 0; + } + } + + if (super.onDone(null, err)) { + if (futId != null) + cctx.mvcc().removeAtomicFuture(futId); + + return true; + } + + return false; + } + + /** * @param req Request. * @param res Response. */ @@ -533,7 +584,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridCacheFuture * @return Request if need process primary fail response, {@code null} otherwise. */ @Nullable GridNearAtomicAbstractUpdateRequest onPrimaryFail() { - if (finished()) + if (finished() || req.nodeFailedResponse()) return null; /* http://git-wip-us.apache.org/repos/asf/ignite/blob/0c1db677/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java index 1a8ae1c..6ffa373 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java @@ -36,7 +36,6 @@ import org.apache.ignite.internal.processors.cache.EntryProcessorResourceInjecto import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheOperation; import org.apache.ignite.internal.processors.cache.GridCacheReturn; -import org.apache.ignite.internal.processors.cache.GridCacheTryPutFailedException; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearAtomicCache; @@ -82,7 +81,6 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda * @param keepBinary Keep binary flag. * @param recovery {@code True} if cache operation is called in recovery mode. * @param remapCnt Maximum number of retries. - * @param waitTopFut If {@code false} does not wait for affinity change future. */ public GridNearAtomicSingleUpdateFuture( GridCacheContext cctx, @@ -101,8 +99,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda boolean skipStore, boolean keepBinary, boolean recovery, - int remapCnt, - boolean waitTopFut + int remapCnt ) { super(cctx, cache, @@ -118,8 +115,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda skipStore, keepBinary, recovery, - remapCnt, - waitTopFut); + remapCnt); assert subjId != null; @@ -128,11 +124,6 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda } /** {@inheritDoc} */ - @Override public long id() { - return futId; - } - - /** {@inheritDoc} */ @Override public boolean onNodeLeft(UUID nodeId) { GridCacheReturn opRes0 = null; CachePartialUpdateCheckedException err0 = null; @@ -142,10 +133,14 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda boolean rcvAll = false; + long futId; + synchronized (this) { - if (reqState == null) + if (!futureMapped()) return false; + futId = this.futId; + if (reqState.req.nodeId.equals(nodeId)) { GridNearAtomicAbstractUpdateRequest req = reqState.onPrimaryFail(); @@ -180,32 +175,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda if (checkReq != null) sendCheckUpdateRequest(checkReq); else if (rcvAll) - finishUpdateFuture(opRes0, err0, remapTopVer0); - - return false; - } - - /** {@inheritDoc} */ - @SuppressWarnings("ConstantConditions") - @Override public boolean onDone(@Nullable Object res, @Nullable Throwable err) { - assert res == null || res instanceof GridCacheReturn; - - GridCacheReturn ret = (GridCacheReturn)res; - - Object retval = res == null ? null : rawRetval ? ret : (this.retval || op == TRANSFORM) ? - cctx.unwrapBinaryIfNeeded(ret.value(), keepBinary) : ret.success(); - - if (op == TRANSFORM && retval == null) - retval = Collections.emptyMap(); - - if (super.onDone(retval, err)) { - Long futVer = onFutureDone(); - - if (futVer != null) - cctx.mvcc().removeAtomicFuture(futVer); - - return true; - } + finishUpdateFuture(opRes0, err0, remapTopVer0, futId); return false; } @@ -217,7 +187,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda AffinityTopologyVersion remapTopVer0; synchronized (this) { - if (futId == 0 || futId != res.futureId()) + if (!checkFutureId(res.futureId())) return; assert reqState != null; @@ -240,12 +210,12 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda if (errors != null) { assert errors.error() != null; - onDone(errors.error()); + completeFuture(null, errors.error(), res.futureId()); return; } - finishUpdateFuture(opRes0, err0, remapTopVer0); + finishUpdateFuture(opRes0, err0, remapTopVer0, res.futureId()); } /** {@inheritDoc} */ @@ -259,7 +229,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda CachePartialUpdateCheckedException err0 = null; synchronized (this) { - if (futId == 0 || futId != res.futureId()) + if (!checkFutureId(res.futureId())) return; req = reqState.processPrimaryResponse(nodeId, res); @@ -311,7 +281,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda } if (res.error() != null && res.failedKeys() == null) { - onDone(res.error()); + completeFuture(null, res.error(), res.futureId()); return; } @@ -325,7 +295,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda if (nearEnabled && !nodeErr) updateNear(req, res); - onDone(opRes0, err0); + completeFuture(opRes0, err0, res.futureId()); } /** @@ -333,7 +303,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda */ private AffinityTopologyVersion onAllReceived() { assert Thread.holdsLock(this); - assert futId > 0; + assert futureMapped() : this; AffinityTopologyVersion remapTopVer0 = null; @@ -364,8 +334,8 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda cctx.mvcc().removeAtomicFuture(futId); reqState = null; - futId = 0; topVer = AffinityTopologyVersion.ZERO; + futId = 0; remapTopVer = null; } @@ -377,12 +347,6 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda * @param remapTopVer New topology version. */ private void waitAndRemap(AffinityTopologyVersion remapTopVer) { - if (!waitTopFut) { - onDone(new GridCacheTryPutFailedException()); - - return; - } - if (topLocked) { CachePartialUpdateCheckedException e = new CachePartialUpdateCheckedException("Failed to update keys (retry update if possible)."); @@ -394,7 +358,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda e.add(Collections.singleton(cctx.toCacheKeyObject(key)), cause); - onDone(e); + completeFuture(null, e, null); return; } @@ -437,8 +401,9 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda AffinityTopologyVersion topVer; if (cache.topology().stopping()) { - onDone(new IgniteCheckedException("Failed to perform cache operation (cache is stopped): " + - cache.name())); + completeFuture(null, + new IgniteCheckedException("Failed to perform cache operation (cache is stopped): " + cache.name()), + null); return; } @@ -449,7 +414,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda Throwable err = fut.validateCache(cctx, recovery, /*read*/false, key, null); if (err != null) { - onDone(err); + completeFuture(null, err, null); return; } @@ -457,21 +422,17 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda topVer = fut.topologyVersion(); } else { - if (waitTopFut) { - assert !topLocked : this; - - fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { - @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) { - cctx.kernalContext().closure().runLocalSafe(new Runnable() { - @Override public void run() { - mapOnTopology(); - } - }); - } - }); - } - else - onDone(new GridCacheTryPutFailedException()); + assert !topLocked : this; + + fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { + @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) { + cctx.kernalContext().closure().runLocalSafe(new Runnable() { + @Override public void run() { + mapOnTopology(); + } + }); + } + }); return; } @@ -490,7 +451,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda reqState0 = mapSingleUpdate(topVer, futId); synchronized (this) { - assert this.futId == 0 : this; + assert topVer.topologyVersion() > 0 : topVer; assert this.topVer == AffinityTopologyVersion.ZERO : this; this.topVer = topVer; @@ -510,7 +471,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda } if (err != null) { - onDone(err); + completeFuture(null, err, futId); return; } @@ -519,7 +480,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda sendSingleRequest(reqState0.req.nodeId(), reqState0.req); if (syncMode == FULL_ASYNC) { - onDone(new GridCacheReturn(cctx, true, true, null, true)); + completeFuture(new GridCacheReturn(cctx, true, true, null, true), null, null); return; } @@ -539,7 +500,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda GridNearAtomicCheckUpdateRequest checkReq = null; synchronized (this) { - if (this.futId == 0 || this.futId != futId) + if (!checkFutureId(futId)) return; assert reqState != null; @@ -560,22 +521,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda if (checkReq != null) sendCheckUpdateRequest(checkReq); else - finishUpdateFuture(opRes0, err0, remapTopVer0); - } - - /** - * @return Future ID. - */ - private Long onFutureDone() { - Long id0; - - synchronized (this) { - id0 = futId; - - futId = 0; - } - - return id0; + finishUpdateFuture(opRes0, err0, remapTopVer0, futId); } /** @@ -712,10 +658,12 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda * @param opRes Operation result. * @param err Operation error. * @param remapTopVer Not-null topology version if need remap update. + * @param futId Future ID. */ private void finishUpdateFuture(GridCacheReturn opRes, CachePartialUpdateCheckedException err, - @Nullable AffinityTopologyVersion remapTopVer) { + @Nullable AffinityTopologyVersion remapTopVer, + long futId) { if (remapTopVer != null) { waitAndRemap(remapTopVer); @@ -728,7 +676,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda updateNear(reqState.req, reqState.req.response()); } - onDone(opRes, err); + completeFuture(opRes, err, futId); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/0c1db677/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index 82af2a6..46a3c34 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@ -39,7 +39,6 @@ import org.apache.ignite.internal.processors.cache.EntryProcessorResourceInjecto import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheOperation; import org.apache.ignite.internal.processors.cache.GridCacheReturn; -import org.apache.ignite.internal.processors.cache.GridCacheTryPutFailedException; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearAtomicCache; @@ -112,7 +111,6 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu * @param skipStore Skip store flag. * @param keepBinary Keep binary flag. * @param remapCnt Maximum number of retries. - * @param waitTopFut If {@code false} does not wait for affinity change future. */ public GridNearAtomicUpdateFuture( GridCacheContext cctx, @@ -133,11 +131,23 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu boolean skipStore, boolean keepBinary, boolean recovery, - int remapCnt, - boolean waitTopFut + int remapCnt ) { - super(cctx, cache, syncMode, op, invokeArgs, retval, rawRetval, expiryPlc, filter, subjId, taskNameHash, - skipStore, keepBinary, recovery, remapCnt, waitTopFut); + super(cctx, + cache, + syncMode, + op, + invokeArgs, + retval, + rawRetval, + expiryPlc, + filter, + subjId, + taskNameHash, + skipStore, + keepBinary, + recovery, + remapCnt); assert vals == null || vals.size() == keys.size(); assert conflictPutVals == null || conflictPutVals.size() == keys.size(); @@ -151,11 +161,6 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu } /** {@inheritDoc} */ - @Override public long id() { - return futId; - } - - /** {@inheritDoc} */ @Override public boolean onNodeLeft(UUID nodeId) { GridCacheReturn opRes0 = null; CachePartialUpdateCheckedException err0 = null; @@ -165,10 +170,14 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu List<GridNearAtomicCheckUpdateRequest> checkReqs = null; + long futId; + synchronized (this) { - if (futId == 0) + if (!futureMapped()) return false; + futId = this.futId; + if (singleReq != null) { if (singleReq.req.nodeId.equals(nodeId)) { GridNearAtomicAbstractUpdateRequest req = singleReq.onPrimaryFail(); @@ -261,33 +270,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu sendCheckUpdateRequest(checkReqs.get(i)); } else if (rcvAll) - finishUpdateFuture(opRes0, err0, remapTopVer0); - - return false; - } - - /** {@inheritDoc} */ - @SuppressWarnings("ConstantConditions") - @Override public boolean onDone(@Nullable Object res, @Nullable Throwable err) { - assert res == null || res instanceof GridCacheReturn; - - GridCacheReturn ret = (GridCacheReturn)res; - - Object retval = - res == null ? null : rawRetval ? ret : (this.retval || op == TRANSFORM) ? - cctx.unwrapBinaryIfNeeded(ret.value(), keepBinary) : ret.success(); - - if (op == TRANSFORM && retval == null) - retval = Collections.emptyMap(); - - if (super.onDone(retval, err)) { - Long futId = onFutureDone(); - - if (futId != null) - cctx.mvcc().removeAtomicFuture(futId); - - return true; - } + finishUpdateFuture(opRes0, err0, remapTopVer0, futId); return false; } @@ -299,7 +282,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu AffinityTopologyVersion remapTopVer0; synchronized (this) { - if (futId == 0 || futId != res.futureId()) + if (!checkFutureId(res.futureId())) return; PrimaryRequestState reqState; @@ -351,12 +334,12 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu if (errors != null) { assert errors.error() != null; - onDone(errors.error()); + completeFuture(null, errors.error(), res.futureId()); return; } - finishUpdateFuture(opRes0, err0, remapTopVer0); + finishUpdateFuture(opRes0, err0, remapTopVer0, res.futureId()); } /** {@inheritDoc} */ @@ -372,7 +355,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu boolean rcvAll; synchronized (this) { - if (futId == 0 || futId != res.futureId()) + if (!checkFutureId(res.futureId())) return; if (singleReq != null) { @@ -457,7 +440,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu } if (res.error() != null && res.failedKeys() == null) { - onDone(res.error()); + completeFuture(null, res.error(), res.futureId()); return; } @@ -483,18 +466,12 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu } if (rcvAll) - onDone(opRes0, err0); + completeFuture(opRes0, err0, res.futureId()); } private void waitAndRemap(AffinityTopologyVersion remapTopVer) { assert remapTopVer != null; - if (!waitTopFut) { - onDone(new GridCacheTryPutFailedException()); - - return; - } - if (topLocked) { assert !F.isEmpty(remapKeys) : remapKeys; @@ -508,7 +485,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu e.add(remapKeys, cause); - onDone(e); + completeFuture(null, e, null); return; } @@ -534,7 +511,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu */ @Nullable private AffinityTopologyVersion onAllReceived() { assert Thread.holdsLock(this); - assert futId > 0; + assert futureMapped() : this; AffinityTopologyVersion remapTopVer0 = null; @@ -577,8 +554,8 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu if (remapTopVer0 != null) { cctx.mvcc().removeAtomicFuture(futId); - futId = 0; topVer = AffinityTopologyVersion.ZERO; + futId = 0; remapTopVer = null; } @@ -589,10 +566,13 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu /** * @param opRes Operation result. * @param err Operation error. + * @param remapTopVer Not-null topology version if need remap update. + * @param futId Future ID. */ private void finishUpdateFuture(GridCacheReturn opRes, CachePartialUpdateCheckedException err, - @Nullable AffinityTopologyVersion remapTopVer) { + @Nullable AffinityTopologyVersion remapTopVer, + long futId) { if (nearEnabled) { if (mappings != null) { for (PrimaryRequestState reqState : mappings.values()) { @@ -618,7 +598,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu return; } - onDone(opRes, err); + completeFuture(opRes, err, futId); } /** @@ -643,8 +623,9 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu AffinityTopologyVersion topVer; if (cache.topology().stopping()) { - onDone(new IgniteCheckedException("Failed to perform cache operation (cache is stopped): " + - cache.name())); + completeFuture(null, + new IgniteCheckedException("Failed to perform cache operation (cache is stopped): " + cache.name()), + null); return; } @@ -655,7 +636,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu Throwable err = fut.validateCache(cctx, recovery, false, null, keys); if (err != null) { - onDone(err); + completeFuture(null, err, null); return; } @@ -663,21 +644,17 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu topVer = fut.topologyVersion(); } else { - if (waitTopFut) { - assert !topLocked : this; - - fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { - @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) { - cctx.kernalContext().closure().runLocalSafe(new Runnable() { - @Override public void run() { - mapOnTopology(); - } - }); - } - }); - } - else - onDone(new GridCacheTryPutFailedException()); + assert !topLocked : this; + + fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { + @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) { + cctx.kernalContext().closure().runLocalSafe(new Runnable() { + @Override public void run() { + mapOnTopology(); + } + }); + } + }); return; } @@ -745,7 +722,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu } if (syncMode == FULL_ASYNC) - onDone(new GridCacheReturn(cctx, true, true, null, true)); + completeFuture(new GridCacheReturn(cctx, true, true, null, true), null, null); } /** {@inheritDoc} */ @@ -761,8 +738,9 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu Collection<ClusterNode> topNodes = CU.affinityNodes(cctx, topVer); if (F.isEmpty(topNodes)) { - onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all partition nodes " + - "left the grid).")); + completeFuture(null, + new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all partition nodes left the grid)."), + null); return; } @@ -801,7 +779,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu } synchronized (this) { - assert this.futId == 0 : this; + assert topVer.topologyVersion() > 0 : topVer; assert this.topVer == AffinityTopologyVersion.ZERO : this; this.topVer = topVer; @@ -826,7 +804,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu } if (err != null) { - onDone(err); + completeFuture(null, err, futId); return; } @@ -838,7 +816,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu assert mappings0 != null; if (size == 0) { - onDone(new GridCacheReturn(cctx, true, true, null, true)); + completeFuture(new GridCacheReturn(cctx, true, true, null, true), null, futId); return; } @@ -847,7 +825,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu } if (syncMode == FULL_ASYNC) { - onDone(new GridCacheReturn(cctx, true, true, null, true)); + completeFuture(new GridCacheReturn(cctx, true, true, null, true), null, futId); return; } @@ -866,7 +844,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu boolean rcvAll = false; synchronized (this) { - if (this.futId == 0 || this.futId != futId) + if (!checkFutureId(futId)) return; if (singleReq != null) { @@ -928,22 +906,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu sendCheckUpdateRequest(checkReqs.get(i)); } else if (rcvAll) - finishUpdateFuture(opRes0, err0, remapTopVer0); - } - - /** - * @return Future version. - */ - private Long onFutureDone() { - Long id0; - - synchronized (this) { - id0 = futId; - - futId = 0; - } - - return id0; + finishUpdateFuture(opRes0, err0, remapTopVer0, futId); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/0c1db677/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java index 422a3fc..2d5c8a5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java @@ -457,11 +457,6 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { } /** {@inheritDoc} */ - @Nullable @Override public V tryGetAndPut(K key, V val) throws IgniteCheckedException { - return dht.tryGetAndPut(key, val); - } - - /** {@inheritDoc} */ @Override public void putAll(Map<? extends K, ? extends V> m) throws IgniteCheckedException { dht.putAll(m); http://git-wip-us.apache.org/repos/asf/ignite/blob/0c1db677/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java index d8e95b9..59d0adb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java @@ -55,9 +55,6 @@ import org.jetbrains.annotations.Nullable; */ public final class GridLocalLockFuture<K, V> extends GridCacheFutureAdapter<Boolean> implements GridCacheMvccFuture<Boolean> { - /** */ - private static final long serialVersionUID = 0L; - /** Logger reference. */ private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>(); http://git-wip-us.apache.org/repos/asf/ignite/blob/0c1db677/modules/hibernate/src/main/java/org/apache/ignite/cache/hibernate/HibernateCacheProxy.java ---------------------------------------------------------------------- diff --git a/modules/hibernate/src/main/java/org/apache/ignite/cache/hibernate/HibernateCacheProxy.java b/modules/hibernate/src/main/java/org/apache/ignite/cache/hibernate/HibernateCacheProxy.java index c814f9a..48fc1f8 100644 --- a/modules/hibernate/src/main/java/org/apache/ignite/cache/hibernate/HibernateCacheProxy.java +++ b/modules/hibernate/src/main/java/org/apache/ignite/cache/hibernate/HibernateCacheProxy.java @@ -683,11 +683,6 @@ public class HibernateCacheProxy implements IgniteInternalCache<Object, Object> } /** {@inheritDoc} */ - @Nullable @Override public Object tryGetAndPut(Object key, Object val) throws IgniteCheckedException { - return delegate.tryGetAndPut(keyTransformer.transform(key), val); - } - - /** {@inheritDoc} */ @Override public Collection<Integer> lostPartitions() { return delegate.lostPartitions(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/0c1db677/modules/hibernate5/src/main/java/org/apache/ignite/cache/hibernate/HibernateCacheProxy.java ---------------------------------------------------------------------- diff --git a/modules/hibernate5/src/main/java/org/apache/ignite/cache/hibernate/HibernateCacheProxy.java b/modules/hibernate5/src/main/java/org/apache/ignite/cache/hibernate/HibernateCacheProxy.java index c814f9a..48fc1f8 100644 --- a/modules/hibernate5/src/main/java/org/apache/ignite/cache/hibernate/HibernateCacheProxy.java +++ b/modules/hibernate5/src/main/java/org/apache/ignite/cache/hibernate/HibernateCacheProxy.java @@ -683,11 +683,6 @@ public class HibernateCacheProxy implements IgniteInternalCache<Object, Object> } /** {@inheritDoc} */ - @Nullable @Override public Object tryGetAndPut(Object key, Object val) throws IgniteCheckedException { - return delegate.tryGetAndPut(keyTransformer.transform(key), val); - } - - /** {@inheritDoc} */ @Override public Collection<Integer> lostPartitions() { return delegate.lostPartitions(); }
