ignite-1.5 Fixed hang on metadata update inside put in atomic cache when topology read lock is held.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a4ae7ed7 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a4ae7ed7 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a4ae7ed7 Branch: refs/heads/ignite-1537 Commit: a4ae7ed750b736a0c8580388f9bada0bc47c7f12 Parents: 02140e6 Author: sboikov <[email protected]> Authored: Mon Dec 21 14:03:59 2015 +0300 Committer: sboikov <[email protected]> Committed: Mon Dec 21 14:03:59 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheAdapter.java | 41 +++++++++++++------- .../processors/cache/GridCacheProxyImpl.java | 26 +++++++++++++ .../cache/GridCacheSharedContext.java | 5 +-- .../processors/cache/IgniteCacheProxy.java | 32 +++++++++++++++ .../processors/cache/IgniteInternalCache.java | 24 ++++++++++++ .../binary/CacheObjectBinaryProcessorImpl.java | 2 +- .../GridDistributedTxRemoteAdapter.java | 2 +- .../dht/GridDhtTransactionalCacheAdapter.java | 2 +- .../cache/distributed/dht/GridDhtTxLocal.java | 2 +- .../distributed/dht/GridDhtTxPrepareFuture.java | 2 +- ...arOptimisticSerializableTxPrepareFuture.java | 7 +++- .../near/GridNearOptimisticTxPrepareFuture.java | 7 +++- ...ridNearOptimisticTxPrepareFutureAdapter.java | 22 ++++++++++- .../cache/distributed/near/GridNearTxLocal.java | 16 ++++++-- .../cache/transactions/IgniteInternalTx.java | 3 +- .../cache/transactions/IgniteTxAdapter.java | 2 +- .../cache/transactions/IgniteTxHandler.java | 2 +- .../transactions/IgniteTxLocalAdapter.java | 22 +++++++---- .../cache/transactions/IgniteTxLocalEx.java | 1 + .../cache/transactions/IgniteTxManager.java | 5 ++- ...niteBinaryMetadataUpdateNodeRestartTest.java | 13 ++++++- .../IgniteCacheRestartTestSuite2.java | 3 ++ .../cache/websession/WebSessionListener.java | 24 ++++++++++-- 23 files changed, 215 insertions(+), 50 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/a4ae7ed7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index cc4e962..9bb3f55 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -2077,8 +2077,31 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V } /** {@inheritDoc} */ + @Nullable @Override public <T> EntryProcessorResult<T> tryInvoke(K key, + EntryProcessor<K, V, T> entryProcessor, + Object... args) throws IgniteCheckedException { + return invoke0(false, key, entryProcessor, args); + } + + /** {@inheritDoc} */ @Override public <T> EntryProcessorResult<T> invoke(final K key, final EntryProcessor<K, V, T> entryProcessor, + final Object... args) throws IgniteCheckedException { + return invoke0(true, key, entryProcessor, args); + } + + /** + * @param waitTopFut If {@code false} does not wait for affinity change future. + * @param key Key. + * @param entryProcessor Entry processor. + * @param args Entry processor arguments. + * @return Invoke result. + * @throws IgniteCheckedException If failed. + */ + private <T> EntryProcessorResult<T> invoke0( + final boolean waitTopFut, + final K key, + final EntryProcessor<K, V, T> entryProcessor, final Object... args) throws IgniteCheckedException { A.notNull(key, "key", entryProcessor, "entryProcessor"); @@ -2090,7 +2113,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V @Nullable @Override public EntryProcessorResult<T> op(IgniteTxLocalAdapter tx) throws IgniteCheckedException { IgniteInternalFuture<GridCacheReturn> fut = - tx.invokeAsync(ctx, key, (EntryProcessor<K, V, Object>)entryProcessor, args); + tx.invokeAsync(ctx, waitTopFut, key, (EntryProcessor<K, V, Object>)entryProcessor, args); Map<K, EntryProcessorResult<T>> resMap = fut.get().value(); @@ -2324,16 +2347,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V }); } - /** - * Tries to put value in cache. Will fail with {@link GridCacheTryPutFailedException} - * if topology exchange is in progress. - * - * @param key Key. - * @param val value. - * @return Old value. - * @throws IgniteCheckedException In case of error. - */ - @Nullable public V tryPutIfAbsent(K key, V val) throws IgniteCheckedException { + /** {@inheritDoc} */ + @Nullable @Override public V tryPutIfAbsent(K key, V val) throws IgniteCheckedException { // Supported only in ATOMIC cache. throw new UnsupportedOperationException(); } @@ -3969,7 +3984,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V IgniteInternalFuture<IgniteInternalTx> f = new GridEmbeddedFuture<>(fut, new C2<Object, Exception, IgniteInternalFuture<IgniteInternalTx>>() { @Override public IgniteInternalFuture<IgniteInternalTx> apply(Object o, Exception e) { - return tx.commitAsync(); + return tx.commitAsync(true); } }); @@ -3978,7 +3993,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V return f; } - IgniteInternalFuture<IgniteInternalTx> f = tx.commitAsync(); + IgniteInternalFuture<IgniteInternalTx> f = tx.commitAsync(true); saveFuture(holder, f); http://git-wip-us.apache.org/repos/asf/ignite/blob/a4ae7ed7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java index d1d93d8..cbdb5b4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java @@ -1231,6 +1231,32 @@ public class GridCacheProxyImpl<K, V> implements IgniteInternalCache<K, V>, Exte } /** {@inheritDoc} */ + @Nullable @Override public V tryPutIfAbsent(K key, V val) throws IgniteCheckedException { + CacheOperationContext prev = gate.enter(opCtx); + + try { + return delegate.tryPutIfAbsent(key, val); + } + finally { + gate.leave(prev); + } + } + + /** {@inheritDoc} */ + @Nullable @Override public <T> EntryProcessorResult<T> tryInvoke(K key, + EntryProcessor<K, V, T> entryProcessor, + Object... args) throws IgniteCheckedException { + CacheOperationContext prev = gate.enter(opCtx); + + try { + return delegate.tryInvoke(key, entryProcessor, args); + } + finally { + gate.leave(prev); + } + } + + /** {@inheritDoc} */ @Override public void removeAll() throws IgniteCheckedException { CacheOperationContext prev = gate.enter(opCtx); http://git-wip-us.apache.org/repos/asf/ignite/blob/a4ae7ed7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java index 5ed1df9..0a03494 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java @@ -91,9 +91,6 @@ public class GridCacheSharedContext<K, V> { /** Tx metrics. */ private volatile TransactionMetricsAdapter txMetrics; - /** Preloaders start future. */ - private IgniteInternalFuture<Object> preloadersStartFut; - /** Store session listeners. */ private Collection<CacheStoreSessionListener> storeSesLsnrs; @@ -621,7 +618,7 @@ public class GridCacheSharedContext<K, V> { if (ctx == null) { tx.txState().awaitLastFut(this); - return tx.commitAsync(); + return tx.commitAsync(true); } else return ctx.cache().commitTxAsync(tx); http://git-wip-us.apache.org/repos/asf/ignite/blob/a4ae7ed7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java index 1768ecf..e935191 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java @@ -1483,6 +1483,38 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V return invoke(key, (EntryProcessor<K, V, T>)entryProcessor, args); } + /** + * Tries to execute invoke operation. Fails if topology exchange is in progress. + * + * @param key Key. + * @param entryProcessor Entry processor. + * @param args Arguments. + * @return Invoke result. + */ + public <T> T tryInvoke(K key, EntryProcessor<K, V, T> entryProcessor, Object... args) { + try { + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + if (isAsync()) + throw new UnsupportedOperationException(); + else { + EntryProcessorResult<T> res = delegate.tryInvoke(key, entryProcessor, args); + + return res != null ? res.get() : null; + } + } + finally { + onLeave(gate, prev); + } + } + catch (IgniteCheckedException e) { + throw cacheException(e); + } + } + /** {@inheritDoc} */ @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(Set<? extends K> keys, EntryProcessor<K, V, T> entryProcessor, http://git-wip-us.apache.org/repos/asf/ignite/blob/a4ae7ed7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java index 186de68..fcba9c4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java @@ -1863,4 +1863,28 @@ public interface IgniteInternalCache<K, V> extends Iterable<Cache.Entry<K, V>> { * @throws IgniteCheckedException If failed. */ public V getTopologySafe(K key) throws IgniteCheckedException; + + /** + * Tries to put value in cache. Will fail with {@link GridCacheTryPutFailedException} + * if topology exchange is in progress. + * + * @param key Key. + * @param val value. + * @return Old value. + * @throws IgniteCheckedException In case of error. + */ + @Nullable public V tryPutIfAbsent(K key, V val) throws IgniteCheckedException; + + /** + * Tries to execute invoke operation. Will fail if topology exchange is in progress. + * + * @param key Key. + * @param entryProcessor Entry processor. + * @param args Arguments. + * @return Invoke result. + * @throws IgniteCheckedException If failed. + */ + @Nullable public <T> EntryProcessorResult<T> tryInvoke(K key, + EntryProcessor<K, V, T> entryProcessor, + Object... args) throws IgniteCheckedException; } http://git-wip-us.apache.org/repos/asf/ignite/blob/a4ae7ed7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java index 6aee7a9..615b1a5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java @@ -486,7 +486,7 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm BinaryMetadata oldMeta = metaDataCache.localPeek(key); BinaryMetadata mergedMeta = BinaryUtils.mergeMetadata(oldMeta, newMeta0); - BinaryObjectException err = metaDataCache.invoke(key, new MetadataProcessor(mergedMeta)); + BinaryObjectException err = metaDataCache.tryInvoke(key, new MetadataProcessor(mergedMeta)); if (err != null) throw err; http://git-wip-us.apache.org/repos/asf/ignite/blob/a4ae7ed7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java index 1fd0b2e..56dc684 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java @@ -728,7 +728,7 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter } /** {@inheritDoc} */ - @Override public IgniteInternalFuture<IgniteInternalTx> commitAsync() { + @Override public IgniteInternalFuture<IgniteInternalTx> commitAsync(boolean waitTopFut) { try { commit(); http://git-wip-us.apache.org/repos/asf/ignite/blob/a4ae7ed7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java index ae24ed1..b7bca06 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java @@ -950,7 +950,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach if (resp.error() == null && t.onePhaseCommit()) { assert t.implicit(); - return t.commitAsync().chain( + return t.commitAsync(true).chain( new C1<IgniteInternalFuture<IgniteInternalTx>, GridNearLockResponse>() { @Override public GridNearLockResponse apply(IgniteInternalFuture<IgniteInternalTx> f) { try { http://git-wip-us.apache.org/repos/asf/ignite/blob/a4ae7ed7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java index f344d48..621281c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java @@ -480,7 +480,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa /** {@inheritDoc} */ @SuppressWarnings({"ThrowableInstanceNeverThrown"}) - @Override public IgniteInternalFuture<IgniteInternalTx> commitAsync() { + @Override public IgniteInternalFuture<IgniteInternalTx> commitAsync(boolean waitTopFut) { if (log.isDebugEnabled()) log.debug("Committing dht local tx: " + this); http://git-wip-us.apache.org/repos/asf/ignite/blob/a4ae7ed7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java index 40399b4..334cee7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java @@ -643,7 +643,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter if (prepErr == null) { try { - fut = tx.commitAsync(); + fut = tx.commitAsync(true); } catch (RuntimeException | Error e) { Exception hEx = new IgniteTxHeuristicCheckedException("Commit produced a runtime " + http://git-wip-us.apache.org/repos/asf/ignite/blob/a4ae7ed7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java index f52b3fc..7a1789d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java @@ -83,9 +83,12 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim /** * @param cctx Context. * @param tx Transaction. + * @param waitTopFut If {@code false} does not wait for affinity change future. */ - public GridNearOptimisticSerializableTxPrepareFuture(GridCacheSharedContext cctx, GridNearTxLocal tx) { - super(cctx, tx); + public GridNearOptimisticSerializableTxPrepareFuture(GridCacheSharedContext cctx, + GridNearTxLocal tx, + boolean waitTopFut) { + super(cctx, tx, waitTopFut); assert tx.optimistic() && tx.serializable() : tx; http://git-wip-us.apache.org/repos/asf/ignite/blob/a4ae7ed7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java index 2ce14af..706f82c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java @@ -71,9 +71,12 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa /** * @param cctx Context. * @param tx Transaction. + * @param waitTopFut If {@code false} does not wait for affinity change future. */ - public GridNearOptimisticTxPrepareFuture(GridCacheSharedContext cctx, GridNearTxLocal tx) { - super(cctx, tx); + public GridNearOptimisticTxPrepareFuture(GridCacheSharedContext cctx, + GridNearTxLocal tx, + boolean waitTopFut) { + super(cctx, tx, waitTopFut); assert tx.optimistic() && !tx.serializable() : tx; } http://git-wip-us.apache.org/repos/asf/ignite/blob/a4ae7ed7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java index b3eab34..f836166 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.distributed.near; import java.util.Collection; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture; @@ -37,14 +38,22 @@ import org.jetbrains.annotations.Nullable; * */ public abstract class GridNearOptimisticTxPrepareFutureAdapter extends GridNearTxPrepareFutureAdapter { + /** */ + private final boolean waitTopFut; + /** * @param cctx Context. * @param tx Transaction. + * @param waitTopFut If {@code false} does not wait for affinity change future. */ - public GridNearOptimisticTxPrepareFutureAdapter(GridCacheSharedContext cctx, GridNearTxLocal tx) { + public GridNearOptimisticTxPrepareFutureAdapter(GridCacheSharedContext cctx, + GridNearTxLocal tx, + boolean waitTopFut) { super(cctx, tx); assert tx.optimistic() : tx; + + this.waitTopFut = waitTopFut; } /** {@inheritDoc} */ @@ -138,6 +147,17 @@ public abstract class GridNearOptimisticTxPrepareFutureAdapter extends GridNearT c.run(); } else { + if (!waitTopFut) { + ClusterTopologyCheckedException err = new ClusterTopologyCheckedException("Failed to execute update, " + + "cluster topology changed."); + + err.retryReadyFuture(topFut); + + onDone(err); + + return; + } + topFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { @Override public void apply(final IgniteInternalFuture<AffinityTopologyVersion> fut) { cctx.kernalContext().closure().runLocalSafe(new GridPlainRunnable() { http://git-wip-us.apache.org/repos/asf/ignite/blob/a4ae7ed7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java index ae4972e..e5ccad1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java @@ -784,14 +784,22 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { /** {@inheritDoc} */ @Override public IgniteInternalFuture<?> prepareAsync() { + return prepareAsync0(true); + } + + /** + * @param waitTopFut If {@code false} does not wait for affinity change future. + * @return Prepare future. + */ + private IgniteInternalFuture<?> prepareAsync0(boolean waitTopFut) { GridNearTxPrepareFutureAdapter fut = (GridNearTxPrepareFutureAdapter)prepFut.get(); if (fut == null) { // Future must be created before any exception can be thrown. if (optimistic()) { fut = serializable() ? - new GridNearOptimisticSerializableTxPrepareFuture(cctx, this) : - new GridNearOptimisticTxPrepareFuture(cctx, this); + new GridNearOptimisticSerializableTxPrepareFuture(cctx, this, waitTopFut) : + new GridNearOptimisticTxPrepareFuture(cctx, this, waitTopFut); } else fut = new GridNearPessimisticTxPrepareFuture(cctx, this); @@ -812,11 +820,11 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { /** {@inheritDoc} */ @SuppressWarnings({"ThrowableInstanceNeverThrown"}) - @Override public IgniteInternalFuture<IgniteInternalTx> commitAsync() { + @Override public IgniteInternalFuture<IgniteInternalTx> commitAsync(boolean waitTopFut) { if (log.isDebugEnabled()) log.debug("Committing near local tx: " + this); - prepareAsync(); + prepareAsync0(waitTopFut); GridNearTxFinishFuture fut = commitFut.get(); http://git-wip-us.apache.org/repos/asf/ignite/blob/a4ae7ed7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java index f5f99f5..7d75b2c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java @@ -619,9 +619,10 @@ public interface IgniteInternalTx extends AutoCloseable, GridTimeoutObject { /** * Asynchronously commits this transaction by initiating {@code two-phase-commit} process. * + * @param waitTopFut If {@code false} does not wait for affinity change future. * @return Future for commit operation. */ - public IgniteInternalFuture<IgniteInternalTx> commitAsync(); + public IgniteInternalFuture<IgniteInternalTx> commitAsync(boolean waitTopFut); /** * Callback invoked whenever there is a lock that has been acquired http://git-wip-us.apache.org/repos/asf/ignite/blob/a4ae7ed7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java index 53f4f56..f2ada64 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java @@ -2063,7 +2063,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter } /** {@inheritDoc} */ - @Override public IgniteInternalFuture<IgniteInternalTx> commitAsync() { + @Override public IgniteInternalFuture<IgniteInternalTx> commitAsync(boolean waitTopFut) { return null; } http://git-wip-us.apache.org/repos/asf/ignite/blob/a4ae7ed7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java index b25baf8..81c8a3c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java @@ -691,7 +691,7 @@ public class IgniteTxHandler { tx.nearFinishFutureId(req.futureId()); tx.nearFinishMiniId(req.miniId()); - IgniteInternalFuture<IgniteInternalTx> commitFut = tx.commitAsync(); + IgniteInternalFuture<IgniteInternalTx> commitFut = tx.commitAsync(true); // Only for error logging. commitFut.listen(CU.errorLogger(log)); http://git-wip-us.apache.org/repos/asf/ignite/blob/a4ae7ed7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index 720832e..08e564e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -580,7 +580,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter /** {@inheritDoc} */ @Override public void commit() throws IgniteCheckedException { try { - commitAsync().get(); + commitAsync(true).get(); } finally { cctx.tm().resetContext(); @@ -1953,15 +1953,16 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter V val, boolean retval, CacheEntryPredicate[] filter) { - return putAsync0(cacheCtx, key, val, null, null, retval, filter); + return putAsync0(cacheCtx, true, key, val, null, null, retval, filter); } /** {@inheritDoc} */ @Override public <K, V> IgniteInternalFuture<GridCacheReturn> invokeAsync(GridCacheContext cacheCtx, + boolean waitTopFut, K key, EntryProcessor<K, V, Object> entryProcessor, Object... invokeArgs) { - return (IgniteInternalFuture)putAsync0(cacheCtx, key, null, entryProcessor, invokeArgs, true, null); + return (IgniteInternalFuture)putAsync0(cacheCtx, waitTopFut, key, null, entryProcessor, invokeArgs, true, null); } /** {@inheritDoc} */ @@ -2914,6 +2915,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter */ private <K, V> IgniteInternalFuture putAsync0( final GridCacheContext cacheCtx, + boolean waitTopFut, K key, @Nullable V val, @Nullable EntryProcessor<K, V, Object> entryProcessor, @@ -3014,7 +3016,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter } } else - return optimisticPutFuture(cacheCtx, loadFut, ret, keepBinary); + return optimisticPutFuture(cacheCtx, waitTopFut, loadFut, ret, keepBinary); } catch (IgniteCheckedException e) { return new GridFinishedFuture(e); @@ -3193,7 +3195,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter } } else - return optimisticPutFuture(cacheCtx, loadFut, ret, keepBinary); + return optimisticPutFuture(cacheCtx, true, loadFut, ret, keepBinary); } catch (RuntimeException e) { onException(); @@ -3203,12 +3205,16 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter } /** + * @param cacheCtx Cache context. + * @param waitTopFut If {@code false} does not wait for affinity change future. * @param loadFut Missing keys load future. * @param ret Future result. + * @param keepBinary Keep binary flag. * @return Future. */ private IgniteInternalFuture optimisticPutFuture( final GridCacheContext cacheCtx, + boolean waitTopFut, IgniteInternalFuture<Void> loadFut, final GridCacheReturn ret, final boolean keepBinary @@ -3225,7 +3231,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter return new GridFinishedFuture<>(e); } - return nonInterruptable(commitAsync().chain( + return nonInterruptable(commitAsync(waitTopFut).chain( new CX1<IgniteInternalFuture<IgniteInternalTx>, GridCacheReturn>() { @Override public GridCacheReturn applyx(IgniteInternalFuture<IgniteInternalTx> txFut) throws IgniteCheckedException { @@ -3466,7 +3472,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter // with prepare response, if required. assert loadFut.isDone(); - return nonInterruptable(commitAsync().chain(new CX1<IgniteInternalFuture<IgniteInternalTx>, GridCacheReturn>() { + return nonInterruptable(commitAsync(true).chain(new CX1<IgniteInternalFuture<IgniteInternalTx>, GridCacheReturn>() { @Override public GridCacheReturn applyx(IgniteInternalFuture<IgniteInternalTx> txFut) throws IgniteCheckedException { try { @@ -3961,7 +3967,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter if (commit && commitAfterLock()) { rollback = false; - return commitAsync().chain(new CX1<IgniteInternalFuture<IgniteInternalTx>, T>() { + return commitAsync(true).chain(new CX1<IgniteInternalFuture<IgniteInternalTx>, T>() { @Override public T applyx(IgniteInternalFuture<IgniteInternalTx> f) throws IgniteCheckedException { f.get(); http://git-wip-us.apache.org/repos/asf/ignite/blob/a4ae7ed7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java index a5d3373..e1b73cc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java @@ -115,6 +115,7 @@ public interface IgniteTxLocalEx extends IgniteInternalTx { */ public <K, V> IgniteInternalFuture<GridCacheReturn> invokeAsync( GridCacheContext cacheCtx, + boolean waitTopFut, K key, EntryProcessor<K, V, Object> entryProcessor, Object... invokeArgs); http://git-wip-us.apache.org/repos/asf/ignite/blob/a4ae7ed7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java index d2b803a..205be49 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java @@ -630,8 +630,9 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { /** * @param tx Transaction. + * @return {@code True} if topology hint was set. */ - public boolean setTxTopologyHint(IgniteInternalTx tx) { + public boolean setTxTopologyHint(@Nullable IgniteInternalTx tx) { if (tx == null) txTopology.remove(); else { @@ -1761,7 +1762,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { } if (commit) - tx.commitAsync().listen(new CommitListener(tx)); + tx.commitAsync(true).listen(new CommitListener(tx)); else tx.rollbackAsync(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/a4ae7ed7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteBinaryMetadataUpdateNodeRestartTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteBinaryMetadataUpdateNodeRestartTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteBinaryMetadataUpdateNodeRestartTest.java index b06ec35..8aa75cc 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteBinaryMetadataUpdateNodeRestartTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteBinaryMetadataUpdateNodeRestartTest.java @@ -27,9 +27,11 @@ import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.cache.CacheAtomicityMode; import org.apache.ignite.cache.CacheEntryProcessor; +import org.apache.ignite.cluster.ClusterTopologyException; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; @@ -113,7 +115,7 @@ public class IgniteBinaryMetadataUpdateNodeRestartTest extends GridCommonAbstrac * @throws Exception If failed. */ public void testNodeRestart() throws Exception { - for (int i = 0; i < 3; i++) { + for (int i = 0; i < 5; i++) { log.info("Iteration: " + i); client = false; @@ -170,6 +172,13 @@ public class IgniteBinaryMetadataUpdateNodeRestartTest extends GridCommonAbstrac } catch (CacheException e) { log.info("Error: " + e); + + if (X.hasCause(e, ClusterTopologyException.class)) { + ClusterTopologyException cause = X.cause(e, ClusterTopologyException.class); + + if (cause.retryReadyFuture() != null) + cause.retryReadyFuture().get(); + } } } @@ -177,7 +186,7 @@ public class IgniteBinaryMetadataUpdateNodeRestartTest extends GridCommonAbstrac } }, 10, "update-thread"); - U.sleep(10_000); + U.sleep(5_000); stop.set(true); http://git-wip-us.apache.org/repos/asf/ignite/blob/a4ae7ed7/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite2.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite2.java index c9e9467..de87e99 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite2.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite2.java @@ -21,6 +21,7 @@ import junit.framework.TestSuite; import org.apache.ignite.internal.processors.cache.GridCachePutAllFailoverSelfTest; import org.apache.ignite.internal.processors.cache.IgniteCacheAtomicPutAllFailoverSelfTest; import org.apache.ignite.internal.processors.cache.IgniteCachePutAllRestartTest; +import org.apache.ignite.internal.processors.cache.distributed.IgniteBinaryMetadataUpdateNodeRestartTest; import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheAtomicNodeRestartTest; import org.apache.ignite.internal.processors.cache.distributed.replicated.IgniteCacheAtomicReplicatedNodeRestartSelfTest; @@ -42,6 +43,8 @@ public class IgniteCacheRestartTestSuite2 extends TestSuite { suite.addTestSuite(IgniteCachePutAllRestartTest.class); suite.addTestSuite(GridCachePutAllFailoverSelfTest.class); + suite.addTestSuite(IgniteBinaryMetadataUpdateNodeRestartTest.class); + return suite; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/a4ae7ed7/modules/web/src/main/java/org/apache/ignite/cache/websession/WebSessionListener.java ---------------------------------------------------------------------- diff --git a/modules/web/src/main/java/org/apache/ignite/cache/websession/WebSessionListener.java b/modules/web/src/main/java/org/apache/ignite/cache/websession/WebSessionListener.java index 82f1633..89e56bd 100644 --- a/modules/web/src/main/java/org/apache/ignite/cache/websession/WebSessionListener.java +++ b/modules/web/src/main/java/org/apache/ignite/cache/websession/WebSessionListener.java @@ -30,12 +30,15 @@ import javax.cache.processor.EntryProcessor; import javax.cache.processor.MutableEntry; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; -import org.apache.ignite.cache.CachePartialUpdateException; +import org.apache.ignite.cluster.ClusterTopologyException; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.util.typedef.T2; +import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteFuture; import static java.util.concurrent.TimeUnit.MILLISECONDS; @@ -117,7 +120,7 @@ class WebSessionListener { break; } - catch (CachePartialUpdateException ignored) { + catch (CacheException e) { if (i == retries - 1) { U.warn(log, "Failed to apply updates for session (maximum number of retries exceeded) [sesId=" + sesId + ", retries=" + retries + ']'); @@ -125,12 +128,25 @@ class WebSessionListener { else { U.warn(log, "Failed to apply updates for session (will retry): " + sesId); - U.sleep(RETRY_DELAY); + IgniteFuture<?> retryFut = null; + + if (X.hasCause(e, ClusterTopologyException.class)) { + ClusterTopologyException cause = X.cause(e, ClusterTopologyException.class); + + assert cause != null : e; + + retryFut = cause.retryReadyFuture(); + } + + if (retryFut != null) + retryFut.get(); + else + U.sleep(RETRY_DELAY); } } } } - catch (CacheException | IgniteInterruptedCheckedException e) { + catch (CacheException | IgniteException | IgniteInterruptedCheckedException e) { U.error(log, "Failed to update session attributes [id=" + sesId + ']', e); } }
