Repository: ignite Updated Branches: refs/heads/ignite-4984 [created] 8c7566241
ignite-4984 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8c756624 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8c756624 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8c756624 Branch: refs/heads/ignite-4984 Commit: 8c756624137806b964ef32e45113e9b7333b7f58 Parents: 130b1fd Author: sboikov <sboi...@gridgain.com> Authored: Wed Apr 19 19:51:19 2017 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Wed Apr 19 19:51:19 2017 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheAdapter.java | 6 - .../processors/cache/GridCacheAtomicFuture.java | 5 - .../processors/cache/GridCacheMvccManager.java | 10 +- .../processors/cache/GridCacheProxyImpl.java | 12 -- .../cache/GridCacheTryPutFailedException.java | 28 ----- .../processors/cache/IgniteInternalCache.java | 11 -- .../GridDhtAtomicAbstractUpdateFuture.java | 6 +- .../dht/atomic/GridDhtAtomicCache.java | 54 ++------ .../GridNearAtomicAbstractUpdateFuture.java | 20 +-- .../GridNearAtomicSingleUpdateFuture.java | 125 +++++++++---------- .../dht/atomic/GridNearAtomicUpdateFuture.java | 71 +++++------ .../distributed/near/GridNearAtomicCache.java | 5 - .../cache/hibernate/HibernateCacheProxy.java | 5 - .../cache/hibernate/HibernateCacheProxy.java | 5 - 14 files changed, 115 insertions(+), 248 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/8c756624/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index a3d4c81..fdf4b89 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -2708,12 +2708,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V } /** {@inheritDoc} */ - @Nullable @Override public V tryGetAndPut(K key, V val) throws IgniteCheckedException { - // Supported only in ATOMIC cache. - throw new UnsupportedOperationException(); - } - - /** {@inheritDoc} */ @Nullable @Override public final V getAndPutIfAbsent(final K key, final V val) throws IgniteCheckedException { return getAndPut(key, val, ctx.noVal()); } http://git-wip-us.apache.org/repos/asf/ignite/blob/8c756624/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java index 87ae29c..35e49c4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java @@ -25,11 +25,6 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; */ public interface GridCacheAtomicFuture<R> extends GridCacheFuture<R> { /** - * @return Future ID. - */ - public long id(); - - /** * Gets future that will be completed when it is safe when update is finished on the given version of topology. * * @param topVer Topology version to finish. http://git-wip-us.apache.org/repos/asf/ignite/blob/8c756624/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java index 712d136..3ae9f92 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java @@ -262,16 +262,8 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { for (GridCacheFuture<?> fut : activeFutures()) fut.onNodeLeft(discoEvt.eventNode().id()); - for (GridCacheAtomicFuture<?> cacheFut : atomicFuts.values()) { + for (GridCacheAtomicFuture<?> cacheFut : atomicFuts.values()) cacheFut.onNodeLeft(discoEvt.eventNode().id()); - - if (cacheFut.isCancelled() || cacheFut.isDone()) { - long futId = cacheFut.id(); - - if (futId > 0) - atomicFuts.remove(futId, cacheFut); - } - } } }; http://git-wip-us.apache.org/repos/asf/ignite/blob/8c756624/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java index eaa448f..2979a57 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java @@ -1274,18 +1274,6 @@ public class GridCacheProxyImpl<K, V> implements IgniteInternalCache<K, V>, Exte } /** {@inheritDoc} */ - @Nullable @Override public V tryGetAndPut(K key, V val) throws IgniteCheckedException { - CacheOperationContext prev = gate.enter(opCtx); - - try { - return delegate.tryGetAndPut(key, val); - } - finally { - gate.leave(prev); - } - } - - /** {@inheritDoc} */ @Nullable @Override public <T> EntryProcessorResult<T> invoke( AffinityTopologyVersion topVer, K key, http://git-wip-us.apache.org/repos/asf/ignite/blob/8c756624/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTryPutFailedException.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTryPutFailedException.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTryPutFailedException.java deleted file mode 100644 index e4217e3..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTryPutFailedException.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache; - -import org.apache.ignite.IgniteCheckedException; - -/** - * Try put failed exception. - */ -public class GridCacheTryPutFailedException extends IgniteCheckedException { - /** */ - private static final long serialVersionUID = 0L; -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/8c756624/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java index ca8c7fc..ce65fd2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java @@ -1887,17 +1887,6 @@ public interface IgniteInternalCache<K, V> extends Iterable<Cache.Entry<K, V>> { public V getTopologySafe(K key) throws IgniteCheckedException; /** - * Tries to get and put value in cache. Will fail with {@link GridCacheTryPutFailedException} - * if topology exchange is in progress. - * - * @param key Key. - * @param val value. - * @return Old value. - * @throws IgniteCheckedException In case of error. - */ - @Nullable public V tryGetAndPut(K key, V val) throws IgniteCheckedException; - - /** * @param topVer Locked topology version. * @param key Key. * @param entryProcessor Entry processor. http://git-wip-us.apache.org/repos/asf/ignite/blob/8c756624/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java index 039cb99..87ef091 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java @@ -294,8 +294,10 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridCacheFutureA throw new UnsupportedOperationException(); } - /** {@inheritDoc} */ - @Override public final long id() { + /** + * @return Future ID. + */ + final long id() { return futId; } http://git-wip-us.apache.org/repos/asf/ignite/blob/8c756624/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 5bbfe14..e477592 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -643,7 +643,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { null, true, filter, - true, false).get(); } @@ -656,7 +655,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { null, false, filter, - true, false).get(); assert res != null; @@ -674,7 +672,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { null, true, filter, - true, true); } @@ -688,26 +685,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { null, false, filter, - true, true); } /** {@inheritDoc} */ - @Override public V tryGetAndPut(K key, V val) throws IgniteCheckedException { - A.notNull(key, "key", val, "val"); - - return (V)update0( - key, - val, - null, - null, - true, - null, - false, - false).get(); - } - - /** {@inheritDoc} */ @Override protected void putAll0(Map<? extends K, ? extends V> m) throws IgniteCheckedException { updateAll0(m, null, @@ -716,7 +697,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { null, false, false, - true, UPDATE, false).get(); } @@ -730,7 +710,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { null, false, false, - true, UPDATE, true).chain(RET2NULL); } @@ -752,7 +731,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { null, false, false, - true, UPDATE, true); } @@ -897,7 +875,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { args, false, null, - true, async); return fut.chain(new CX1<IgniteInternalFuture<Map<K, EntryProcessorResult<T>>>, EntryProcessorResult<T>>() { @@ -968,7 +945,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { null, false, false, - true, TRANSFORM, async); @@ -1000,7 +976,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { null, false, false, - true, TRANSFORM, false).get(); } @@ -1022,7 +997,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { null, false, false, - true, TRANSFORM, true); } @@ -1037,7 +1011,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { * @param conflictRmvMap Conflict remove map. * @param retval Return value required flag. * @param rawRetval Return {@code GridCacheReturn} instance. - * @param waitTopFut Whether to wait for topology future. * @param async Async operation flag. * @return Completion future. */ @@ -1050,7 +1023,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { @Nullable Map<KeyCacheObject, GridCacheVersion> conflictRmvMap, final boolean retval, final boolean rawRetval, - final boolean waitTopFut, final GridCacheOperation op, boolean async ) { @@ -1127,8 +1099,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { opCtx != null && opCtx.skipStore(), opCtx != null && opCtx.isKeepBinary(), opCtx != null && opCtx.recovery(), - opCtx != null && opCtx.noRetries() ? 1 : MAX_RETRIES, - waitTopFut); + opCtx != null && opCtx.noRetries() ? 1 : MAX_RETRIES); if (async) { return asyncOp(new CO<IgniteInternalFuture<Object>>() { @@ -1155,7 +1126,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { * @param invokeArgs Invoke arguments. * @param retval Return value flag. * @param filter Filter. - * @param waitTopFut Whether to wait for topology future. * @param async Async operation flag. * @return Future. */ @@ -1166,7 +1136,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { @Nullable Object[] invokeArgs, final boolean retval, @Nullable final CacheEntryPredicate filter, - final boolean waitTopFut, boolean async ) { assert val == null || proc == null; @@ -1178,7 +1147,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { ctx.checkSecurity(SecurityPermission.CACHE_PUT); final GridNearAtomicAbstractUpdateFuture updateFut = - createSingleUpdateFuture(key, val, proc, invokeArgs, retval, filter, waitTopFut); + createSingleUpdateFuture(key, val, proc, invokeArgs, retval, filter); if (async) { return asyncOp(new CO<IgniteInternalFuture<Object>>() { @@ -1217,8 +1186,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { null, null, retval, - filter, - true); + filter); if (async) { return asyncOp(new CO<IgniteInternalFuture<Object>>() { @@ -1245,7 +1213,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { * @param invokeArgs Invoke arguments. * @param retval Return value flag. * @param filter Filter. - * @param waitTopFut Whether to wait for topology future. * @return Future. */ private GridNearAtomicAbstractUpdateFuture createSingleUpdateFuture( @@ -1254,8 +1221,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { @Nullable EntryProcessor proc, @Nullable Object[] invokeArgs, boolean retval, - @Nullable CacheEntryPredicate filter, - boolean waitTopFut + @Nullable CacheEntryPredicate filter ) { CacheOperationContext opCtx = ctx.operationContextPerCall(); @@ -1317,8 +1283,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { opCtx != null && opCtx.skipStore(), opCtx != null && opCtx.isKeepBinary(), opCtx != null && opCtx.recovery(), - opCtx != null && opCtx.noRetries() ? 1 : MAX_RETRIES, - waitTopFut + opCtx != null && opCtx.noRetries() ? 1 : MAX_RETRIES ); } else { @@ -1341,8 +1306,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { opCtx != null && opCtx.skipStore(), opCtx != null && opCtx.isKeepBinary(), opCtx != null && opCtx.recovery(), - opCtx != null && opCtx.noRetries() ? 1 : MAX_RETRIES, - waitTopFut); + opCtx != null && opCtx.noRetries() ? 1 : MAX_RETRIES); } } @@ -1408,8 +1372,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { opCtx != null && opCtx.skipStore(), opCtx != null && opCtx.isKeepBinary(), opCtx != null && opCtx.recovery(), - opCtx != null && opCtx.noRetries() ? 1 : MAX_RETRIES, - true); + opCtx != null && opCtx.noRetries() ? 1 : MAX_RETRIES); if (async) { return asyncOp(new CO<IgniteInternalFuture<Object>>() { @@ -3018,8 +2981,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { req.skipStore(), req.keepBinary(), req.recovery(), - MAX_RETRIES, - true); + MAX_RETRIES); updateFut.map(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/8c756624/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java index cf5aa19..480a866 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java @@ -112,9 +112,6 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridCacheFuture /** Recovery flag. */ protected final boolean recovery; - /** Wait for topology future flag. */ - protected final boolean waitTopFut; - /** Near cache flag. */ protected final boolean nearEnabled; @@ -179,8 +176,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridCacheFuture boolean skipStore, boolean keepBinary, boolean recovery, - int remapCnt, - boolean waitTopFut + int remapCnt ) { if (log == null) { msgLog = cctx.shared().atomicMessageLogger(); @@ -201,16 +197,20 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridCacheFuture this.skipStore = skipStore; this.keepBinary = keepBinary; this.recovery = recovery; - this.waitTopFut = waitTopFut; nearEnabled = CU.isNearEnabled(cctx); - if (!waitTopFut) - remapCnt = 1; - this.remapCnt = remapCnt; } + final boolean futureMapped() { + return topVer != AffinityTopologyVersion.ZERO; + } + + final boolean checkFutureId(long futId) { + return topVer != AffinityTopologyVersion.ZERO && this.futId == futId; + } + /** {@inheritDoc} */ @Override public final IgniteInternalFuture<Void> completeFuture(AffinityTopologyVersion topVer) { return null; @@ -533,7 +533,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridCacheFuture * @return Request if need process primary fail response, {@code null} otherwise. */ @Nullable GridNearAtomicAbstractUpdateRequest onPrimaryFail() { - if (finished()) + if (finished() || req.nodeFailedResponse()) return null; /* http://git-wip-us.apache.org/repos/asf/ignite/blob/8c756624/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java index 1a8ae1c..c14b104 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java @@ -36,7 +36,6 @@ import org.apache.ignite.internal.processors.cache.EntryProcessorResourceInjecto import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheOperation; import org.apache.ignite.internal.processors.cache.GridCacheReturn; -import org.apache.ignite.internal.processors.cache.GridCacheTryPutFailedException; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearAtomicCache; @@ -82,7 +81,6 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda * @param keepBinary Keep binary flag. * @param recovery {@code True} if cache operation is called in recovery mode. * @param remapCnt Maximum number of retries. - * @param waitTopFut If {@code false} does not wait for affinity change future. */ public GridNearAtomicSingleUpdateFuture( GridCacheContext cctx, @@ -101,8 +99,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda boolean skipStore, boolean keepBinary, boolean recovery, - int remapCnt, - boolean waitTopFut + int remapCnt ) { super(cctx, cache, @@ -118,8 +115,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda skipStore, keepBinary, recovery, - remapCnt, - waitTopFut); + remapCnt); assert subjId != null; @@ -128,11 +124,6 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda } /** {@inheritDoc} */ - @Override public long id() { - return futId; - } - - /** {@inheritDoc} */ @Override public boolean onNodeLeft(UUID nodeId) { GridCacheReturn opRes0 = null; CachePartialUpdateCheckedException err0 = null; @@ -142,8 +133,10 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda boolean rcvAll = false; + long futId = 0; + synchronized (this) { - if (reqState == null) + if (!futureMapped()) return false; if (reqState.req.nodeId.equals(nodeId)) { @@ -174,35 +167,46 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda opRes0 = opRes; err0 = err; remapTopVer0 = onAllReceived(); + futId = this.futId; } } if (checkReq != null) sendCheckUpdateRequest(checkReq); else if (rcvAll) - finishUpdateFuture(opRes0, err0, remapTopVer0); + finishUpdateFuture(opRes0, err0, remapTopVer0, futId); return false; } - /** {@inheritDoc} */ - @SuppressWarnings("ConstantConditions") - @Override public boolean onDone(@Nullable Object res, @Nullable Throwable err) { - assert res == null || res instanceof GridCacheReturn; - - GridCacheReturn ret = (GridCacheReturn)res; - - Object retval = res == null ? null : rawRetval ? ret : (this.retval || op == TRANSFORM) ? + private void completeFuture(GridCacheReturn ret, Throwable err, Long futId) { + Object retval = ret == null ? null : rawRetval ? ret : (this.retval || op == TRANSFORM) ? cctx.unwrapBinaryIfNeeded(ret.value(), keepBinary) : ret.success(); if (op == TRANSFORM && retval == null) retval = Collections.emptyMap(); - if (super.onDone(retval, err)) { - Long futVer = onFutureDone(); + if (futId != null) + cctx.mvcc().removeAtomicFuture(futId); + + super.onDone(retval, err); + } + + /** {@inheritDoc} */ + @SuppressWarnings("ConstantConditions") + @Override public boolean onDone(@Nullable Object res, @Nullable Throwable err) { + assert err != null; - if (futVer != null) - cctx.mvcc().removeAtomicFuture(futVer); + Long futId = null; + + synchronized (this) { + if (futureMapped()) + futId = this.futId; + } + + if (super.onDone(null, err)) { + if (futId != null) + cctx.mvcc().removeAtomicFuture(futId); return true; } @@ -217,7 +221,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda AffinityTopologyVersion remapTopVer0; synchronized (this) { - if (futId == 0 || futId != res.futureId()) + if (!checkFutureId(res.futureId())) return; assert reqState != null; @@ -240,12 +244,12 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda if (errors != null) { assert errors.error() != null; - onDone(errors.error()); + completeFuture(null, errors.error(), res.futureId()); return; } - finishUpdateFuture(opRes0, err0, remapTopVer0); + finishUpdateFuture(opRes0, err0, remapTopVer0, res.futureId()); } /** {@inheritDoc} */ @@ -259,7 +263,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda CachePartialUpdateCheckedException err0 = null; synchronized (this) { - if (futId == 0 || futId != res.futureId()) + if (!checkFutureId(res.futureId())) return; req = reqState.processPrimaryResponse(nodeId, res); @@ -311,7 +315,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda } if (res.error() != null && res.failedKeys() == null) { - onDone(res.error()); + completeFuture(null, res.error(), res.futureId()); return; } @@ -325,7 +329,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda if (nearEnabled && !nodeErr) updateNear(req, res); - onDone(opRes0, err0); + completeFuture(opRes0, err0, res.futureId()); } /** @@ -333,7 +337,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda */ private AffinityTopologyVersion onAllReceived() { assert Thread.holdsLock(this); - assert futId > 0; + assert futureMapped() : this; AffinityTopologyVersion remapTopVer0 = null; @@ -364,7 +368,6 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda cctx.mvcc().removeAtomicFuture(futId); reqState = null; - futId = 0; topVer = AffinityTopologyVersion.ZERO; remapTopVer = null; @@ -377,12 +380,6 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda * @param remapTopVer New topology version. */ private void waitAndRemap(AffinityTopologyVersion remapTopVer) { - if (!waitTopFut) { - onDone(new GridCacheTryPutFailedException()); - - return; - } - if (topLocked) { CachePartialUpdateCheckedException e = new CachePartialUpdateCheckedException("Failed to update keys (retry update if possible)."); @@ -394,7 +391,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda e.add(Collections.singleton(cctx.toCacheKeyObject(key)), cause); - onDone(e); + completeFuture(null, e, null); return; } @@ -437,8 +434,9 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda AffinityTopologyVersion topVer; if (cache.topology().stopping()) { - onDone(new IgniteCheckedException("Failed to perform cache operation (cache is stopped): " + - cache.name())); + completeFuture(null, + new IgniteCheckedException("Failed to perform cache operation (cache is stopped): " + cache.name()), + null); return; } @@ -449,7 +447,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda Throwable err = fut.validateCache(cctx, recovery, /*read*/false, key, null); if (err != null) { - onDone(err); + completeFuture(null, err, null); return; } @@ -457,21 +455,17 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda topVer = fut.topologyVersion(); } else { - if (waitTopFut) { - assert !topLocked : this; - - fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { - @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) { - cctx.kernalContext().closure().runLocalSafe(new Runnable() { - @Override public void run() { - mapOnTopology(); - } - }); - } - }); - } - else - onDone(new GridCacheTryPutFailedException()); + assert !topLocked : this; + + fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { + @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) { + cctx.kernalContext().closure().runLocalSafe(new Runnable() { + @Override public void run() { + mapOnTopology(); + } + }); + } + }); return; } @@ -490,7 +484,6 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda reqState0 = mapSingleUpdate(topVer, futId); synchronized (this) { - assert this.futId == 0 : this; assert this.topVer == AffinityTopologyVersion.ZERO : this; this.topVer = topVer; @@ -510,7 +503,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda } if (err != null) { - onDone(err); + completeFuture(null, err, futId); return; } @@ -519,7 +512,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda sendSingleRequest(reqState0.req.nodeId(), reqState0.req); if (syncMode == FULL_ASYNC) { - onDone(new GridCacheReturn(cctx, true, true, null, true)); + completeFuture(new GridCacheReturn(cctx, true, true, null, true), null, null); return; } @@ -539,7 +532,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda GridNearAtomicCheckUpdateRequest checkReq = null; synchronized (this) { - if (this.futId == 0 || this.futId != futId) + if (!checkFutureId(futId)) return; assert reqState != null; @@ -560,7 +553,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda if (checkReq != null) sendCheckUpdateRequest(checkReq); else - finishUpdateFuture(opRes0, err0, remapTopVer0); + finishUpdateFuture(opRes0, err0, remapTopVer0, futId); } /** @@ -712,10 +705,12 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda * @param opRes Operation result. * @param err Operation error. * @param remapTopVer Not-null topology version if need remap update. + * @param futId Future ID. */ private void finishUpdateFuture(GridCacheReturn opRes, CachePartialUpdateCheckedException err, - @Nullable AffinityTopologyVersion remapTopVer) { + @Nullable AffinityTopologyVersion remapTopVer, + long futId) { if (remapTopVer != null) { waitAndRemap(remapTopVer); @@ -728,7 +723,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda updateNear(reqState.req, reqState.req.response()); } - onDone(opRes, err); + completeFuture(opRes, err, futId); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/8c756624/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index 82af2a6..6ebb1de 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@ -39,7 +39,6 @@ import org.apache.ignite.internal.processors.cache.EntryProcessorResourceInjecto import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheOperation; import org.apache.ignite.internal.processors.cache.GridCacheReturn; -import org.apache.ignite.internal.processors.cache.GridCacheTryPutFailedException; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearAtomicCache; @@ -112,7 +111,6 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu * @param skipStore Skip store flag. * @param keepBinary Keep binary flag. * @param remapCnt Maximum number of retries. - * @param waitTopFut If {@code false} does not wait for affinity change future. */ public GridNearAtomicUpdateFuture( GridCacheContext cctx, @@ -133,11 +131,23 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu boolean skipStore, boolean keepBinary, boolean recovery, - int remapCnt, - boolean waitTopFut + int remapCnt ) { - super(cctx, cache, syncMode, op, invokeArgs, retval, rawRetval, expiryPlc, filter, subjId, taskNameHash, - skipStore, keepBinary, recovery, remapCnt, waitTopFut); + super(cctx, + cache, + syncMode, + op, + invokeArgs, + retval, + rawRetval, + expiryPlc, + filter, + subjId, + taskNameHash, + skipStore, + keepBinary, + recovery, + remapCnt); assert vals == null || vals.size() == keys.size(); assert conflictPutVals == null || conflictPutVals.size() == keys.size(); @@ -151,11 +161,6 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu } /** {@inheritDoc} */ - @Override public long id() { - return futId; - } - - /** {@inheritDoc} */ @Override public boolean onNodeLeft(UUID nodeId) { GridCacheReturn opRes0 = null; CachePartialUpdateCheckedException err0 = null; @@ -166,7 +171,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu List<GridNearAtomicCheckUpdateRequest> checkReqs = null; synchronized (this) { - if (futId == 0) + if (!futureMapped()) return false; if (singleReq != null) { @@ -299,7 +304,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu AffinityTopologyVersion remapTopVer0; synchronized (this) { - if (futId == 0 || futId != res.futureId()) + if (!checkFutureId(res.futureId())) return; PrimaryRequestState reqState; @@ -372,7 +377,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu boolean rcvAll; synchronized (this) { - if (futId == 0 || futId != res.futureId()) + if (!checkFutureId(res.futureId())) return; if (singleReq != null) { @@ -489,12 +494,6 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu private void waitAndRemap(AffinityTopologyVersion remapTopVer) { assert remapTopVer != null; - if (!waitTopFut) { - onDone(new GridCacheTryPutFailedException()); - - return; - } - if (topLocked) { assert !F.isEmpty(remapKeys) : remapKeys; @@ -534,7 +533,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu */ @Nullable private AffinityTopologyVersion onAllReceived() { assert Thread.holdsLock(this); - assert futId > 0; + assert futureMapped() : this; AffinityTopologyVersion remapTopVer0 = null; @@ -577,7 +576,6 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu if (remapTopVer0 != null) { cctx.mvcc().removeAtomicFuture(futId); - futId = 0; topVer = AffinityTopologyVersion.ZERO; remapTopVer = null; @@ -663,21 +661,17 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu topVer = fut.topologyVersion(); } else { - if (waitTopFut) { - assert !topLocked : this; - - fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { - @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) { - cctx.kernalContext().closure().runLocalSafe(new Runnable() { - @Override public void run() { - mapOnTopology(); - } - }); - } - }); - } - else - onDone(new GridCacheTryPutFailedException()); + assert !topLocked : this; + + fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { + @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) { + cctx.kernalContext().closure().runLocalSafe(new Runnable() { + @Override public void run() { + mapOnTopology(); + } + }); + } + }); return; } @@ -801,7 +795,6 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu } synchronized (this) { - assert this.futId == 0 : this; assert this.topVer == AffinityTopologyVersion.ZERO : this; this.topVer = topVer; @@ -866,7 +859,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu boolean rcvAll = false; synchronized (this) { - if (this.futId == 0 || this.futId != futId) + if (!checkFutureId(futId)) return; if (singleReq != null) { http://git-wip-us.apache.org/repos/asf/ignite/blob/8c756624/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java index 422a3fc..2d5c8a5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java @@ -457,11 +457,6 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { } /** {@inheritDoc} */ - @Nullable @Override public V tryGetAndPut(K key, V val) throws IgniteCheckedException { - return dht.tryGetAndPut(key, val); - } - - /** {@inheritDoc} */ @Override public void putAll(Map<? extends K, ? extends V> m) throws IgniteCheckedException { dht.putAll(m); http://git-wip-us.apache.org/repos/asf/ignite/blob/8c756624/modules/hibernate/src/main/java/org/apache/ignite/cache/hibernate/HibernateCacheProxy.java ---------------------------------------------------------------------- diff --git a/modules/hibernate/src/main/java/org/apache/ignite/cache/hibernate/HibernateCacheProxy.java b/modules/hibernate/src/main/java/org/apache/ignite/cache/hibernate/HibernateCacheProxy.java index c814f9a..48fc1f8 100644 --- a/modules/hibernate/src/main/java/org/apache/ignite/cache/hibernate/HibernateCacheProxy.java +++ b/modules/hibernate/src/main/java/org/apache/ignite/cache/hibernate/HibernateCacheProxy.java @@ -683,11 +683,6 @@ public class HibernateCacheProxy implements IgniteInternalCache<Object, Object> } /** {@inheritDoc} */ - @Nullable @Override public Object tryGetAndPut(Object key, Object val) throws IgniteCheckedException { - return delegate.tryGetAndPut(keyTransformer.transform(key), val); - } - - /** {@inheritDoc} */ @Override public Collection<Integer> lostPartitions() { return delegate.lostPartitions(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/8c756624/modules/hibernate5/src/main/java/org/apache/ignite/cache/hibernate/HibernateCacheProxy.java ---------------------------------------------------------------------- diff --git a/modules/hibernate5/src/main/java/org/apache/ignite/cache/hibernate/HibernateCacheProxy.java b/modules/hibernate5/src/main/java/org/apache/ignite/cache/hibernate/HibernateCacheProxy.java index c814f9a..48fc1f8 100644 --- a/modules/hibernate5/src/main/java/org/apache/ignite/cache/hibernate/HibernateCacheProxy.java +++ b/modules/hibernate5/src/main/java/org/apache/ignite/cache/hibernate/HibernateCacheProxy.java @@ -683,11 +683,6 @@ public class HibernateCacheProxy implements IgniteInternalCache<Object, Object> } /** {@inheritDoc} */ - @Nullable @Override public Object tryGetAndPut(Object key, Object val) throws IgniteCheckedException { - return delegate.tryGetAndPut(keyTransformer.transform(key), val); - } - - /** {@inheritDoc} */ @Override public Collection<Integer> lostPartitions() { return delegate.lostPartitions(); }