Repository: ignite Updated Branches: refs/heads/ignite-1452 89d2cb7f7 -> 16729a964
ignite-1452 Avoid cache operations hang on node stop Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/16729a96 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/16729a96 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/16729a96 Branch: refs/heads/ignite-1452 Commit: 16729a964e183ef7d68e760a461d42782664a6cd Parents: 89d2cb7 Author: sboikov <[email protected]> Authored: Mon Sep 14 14:47:44 2015 +0300 Committer: sboikov <[email protected]> Committed: Mon Sep 14 14:52:19 2015 +0300 ---------------------------------------------------------------------- .../ignite/internal/MarshallerContextImpl.java | 12 +++++----- .../processors/cache/GridCacheAdapter.java | 22 ++++------------- .../processors/cache/GridCacheMvccManager.java | 25 ++++---------------- .../processors/cache/GridCacheProcessor.java | 8 +++++-- .../processors/cache/GridCacheProxyImpl.java | 24 +++++++++++++++++++ .../processors/cache/IgniteInternalCache.java | 20 ++++++++++++++++ .../preloader/GridDhtPartitionDemandPool.java | 2 +- .../IgniteCacheQuerySelfTestSuite.java | 23 ++++++++++-------- 8 files changed, 79 insertions(+), 57 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/16729a96/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java index 5c9b54f..f15567c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java @@ -20,8 +20,8 @@ package org.apache.ignite.internal; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException; -import org.apache.ignite.internal.processors.cache.GridCacheAdapter; import org.apache.ignite.internal.processors.cache.GridCacheTryPutFailedException; +import org.apache.ignite.internal.processors.cache.IgniteInternalCache; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.plugin.PluginProvider; @@ -52,7 +52,7 @@ public class MarshallerContextImpl extends MarshallerContextAdapter { private IgniteLogger log; /** */ - private volatile GridCacheAdapter<Integer, String> cache; + private volatile IgniteInternalCache<Integer, String> cache; /** Non-volatile on purpose. */ private int failedCnt; @@ -103,7 +103,7 @@ public class MarshallerContextImpl extends MarshallerContextAdapter { /** {@inheritDoc} */ @Override protected boolean registerClassName(int id, String clsName) throws IgniteCheckedException { - GridCacheAdapter<Integer, String> cache0 = cache; + IgniteInternalCache<Integer, String> cache0 = cache; if (cache0 == null) return false; @@ -137,7 +137,7 @@ public class MarshallerContextImpl extends MarshallerContextAdapter { /** {@inheritDoc} */ @Override protected String className(int id) throws IgniteCheckedException { - GridCacheAdapter<Integer, String> cache0 = cache; + IgniteInternalCache<Integer, String> cache0 = cache; if (cache0 == null) { U.awaitQuiet(latch); @@ -187,9 +187,9 @@ public class MarshallerContextImpl extends MarshallerContextAdapter { } /** {@inheritDoc} */ - @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends String>> events) + @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends String>> evts) throws CacheEntryListenerException { - for (CacheEntryEvent<? extends Integer, ? extends String> evt : events) { + for (CacheEntryEvent<? extends Integer, ? extends String> evt : evts) { assert evt.getOldValue() == null || F.eq(evt.getOldValue(), evt.getValue()): "Received cache entry update for system marshaller cache: " + evt; http://git-wip-us.apache.org/repos/asf/ignite/blob/16729a96/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index 4460a2a..7ea22e9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -1332,14 +1332,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V }); } - /** - * Gets value without waiting for toplogy changes. - * - * @param key Key. - * @return Value. - * @throws IgniteCheckedException If failed. - */ - public V getTopologySafe(K key) throws IgniteCheckedException { + /** {@inheritDoc} */ + @Override public V getTopologySafe(K key) throws IgniteCheckedException { String taskName = ctx.kernalContext().job().currentTaskName(); return getAllAsync( @@ -2453,16 +2447,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V }); } - /** - * Tries to put value in cache. Will fail with {@link GridCacheTryPutFailedException} - * if topology exchange is in progress. - * - * @param key Key. - * @param val value. - * @return Old value. - * @throws IgniteCheckedException In case of error. - */ - @Nullable public V tryPutIfAbsent(K key, V val) throws IgniteCheckedException { + /** {@inheritDoc} */ + @Override @Nullable public V tryPutIfAbsent(K key, V val) throws IgniteCheckedException { // Supported only in ATOMIC cache. throw new UnsupportedOperationException(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/16729a96/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java index 77d47f7..555bbda 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java @@ -326,14 +326,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { * Cancels all client futures. */ public void cancelClientFutures() { - cancelClientFutures(stopError()); - } - - /** - * @return Local node stop error. - */ - private IgniteCheckedException stopError() { - return new IgniteCheckedException("Operation has been cancelled (node is stopping)."); + cancelClientFutures(new IgniteCheckedException("Operation has been cancelled (node is stopping).")); } /** {@inheritDoc} */ @@ -392,7 +385,8 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { assert old == null : "Old future is not null [futVer=" + futVer + ", fut=" + fut + ", old=" + old + ']'; - onFutureAdded(fut); + if (cctx.kernalContext().clientDisconnected()) + ((GridFutureAdapter)fut).onDone(disconnectedError(null)); } /** @@ -513,7 +507,8 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { fut.onNodeLeft(n.id()); } - onFutureAdded(fut); + if (cctx.kernalContext().clientDisconnected()) + ((GridFutureAdapter)fut).onDone(disconnectedError(null)); // Just in case if future was completed before it was added. if (fut.isDone()) @@ -523,16 +518,6 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { } /** - * @param fut Future. - */ - private void onFutureAdded(IgniteInternalFuture<?> fut) { - if (cctx.kernalContext().isStopping()) - ((GridFutureAdapter)fut).onDone(stopError()); - else if (cctx.kernalContext().clientDisconnected()) - ((GridFutureAdapter)fut).onDone(disconnectedError(null)); - } - - /** * @param fut Future to remove. * @return {@code True} if removed. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/16729a96/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index 75d4c43..a3e1553 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -2877,8 +2877,12 @@ public class GridCacheProcessor extends GridProcessorAdapter { /** * @return Marshaller system cache. */ - public GridCacheAdapter<Integer, String> marshallerCache() { - return internalCache(CU.MARSH_CACHE_NAME); + public IgniteInternalCache<Integer, String> marshallerCache() { + IgniteCacheProxy cache = jCacheProxies.get(CU.MARSH_CACHE_NAME); + + assert cache != null : jCacheProxies; + + return cache.internalProxy(); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/16729a96/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java index 4d26bd8..8f3ba21 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java @@ -1514,6 +1514,30 @@ public class GridCacheProxyImpl<K, V> implements IgniteInternalCache<K, V>, Exte } /** {@inheritDoc} */ + @Nullable @Override public V tryPutIfAbsent(K key, V val) throws IgniteCheckedException { + CacheOperationContext prev = gate.enter(opCtx); + + try { + return delegate.tryPutIfAbsent(key, val); + } + finally { + gate.leave(prev); + } + } + + /** {@inheritDoc} */ + @Override public V getTopologySafe(K key) throws IgniteCheckedException { + CacheOperationContext prev = gate.enter(opCtx); + + try { + return delegate.getTopologySafe(key); + } + finally { + gate.leave(prev); + } + } + + /** {@inheritDoc} */ @Override public void writeExternal(ObjectOutput out) throws IOException { out.writeObject(ctx); out.writeObject(delegate); http://git-wip-us.apache.org/repos/asf/ignite/blob/16729a96/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java index 07650da..a062803 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java @@ -1802,4 +1802,24 @@ public interface IgniteInternalCache<K, V> extends Iterable<Cache.Entry<K, V>> { * @return Future to be completed whenever loading completes. */ public IgniteInternalFuture<?> localLoadCacheAsync(@Nullable IgniteBiPredicate<K, V> p, @Nullable Object... args); + + /** + * Tries to put value in cache. Will fail with {@link GridCacheTryPutFailedException} + * if topology exchange is in progress. + * + * @param key Key. + * @param val value. + * @return Old value. + * @throws IgniteCheckedException In case of error. + */ + @Nullable public V tryPutIfAbsent(K key, V val) throws IgniteCheckedException; + + /** + * Gets value without waiting for toplogy changes. + * + * @param key Key. + * @return Value. + * @throws IgniteCheckedException If failed. + */ + public V getTopologySafe(K key) throws IgniteCheckedException; } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/16729a96/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java index e993a88..90a6f2b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java @@ -857,7 +857,7 @@ public class GridDhtPartitionDemandPool { log.debug("Waiting for marshaller cache preload [cacheName=" + cctx.name() + ']'); try { - cctx.kernalContext().cache().marshallerCache().preloader().syncFuture().get(); + cctx.kernalContext().cache().internalCache(CU.MARSH_CACHE_NAME).preloader().syncFuture().get(); } catch (IgniteInterruptedCheckedException ignored) { if (log.isDebugEnabled()) http://git-wip-us.apache.org/repos/asf/ignite/blob/16729a96/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java index b9cee62..629d761 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java @@ -97,25 +97,28 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite { public static TestSuite suite() throws Exception { TestSuite suite = new TestSuite("Ignite Cache Queries Test Suite"); - suite.addTestSuite(IgniteCacheQueryNodeRestartSelfTest.class); suite.addTestSuite(IgniteCacheQueryNodeRestartSelfTest2.class); - suite.addTestSuite(IgniteCacheQueryNodeRestartSelfTest.class); suite.addTestSuite(IgniteCacheQueryNodeRestartSelfTest2.class); - suite.addTestSuite(IgniteCacheQueryNodeRestartSelfTest.class); suite.addTestSuite(IgniteCacheQueryNodeRestartSelfTest2.class); - suite.addTestSuite(IgniteCacheQueryNodeRestartSelfTest.class); suite.addTestSuite(IgniteCacheQueryNodeRestartSelfTest2.class); - suite.addTestSuite(IgniteCacheQueryNodeRestartSelfTest.class); suite.addTestSuite(IgniteCacheQueryNodeRestartSelfTest2.class); - suite.addTestSuite(IgniteCacheQueryNodeRestartSelfTest.class); suite.addTestSuite(IgniteCacheQueryNodeRestartSelfTest2.class); - suite.addTestSuite(IgniteCacheQueryNodeRestartSelfTest.class); suite.addTestSuite(IgniteCacheQueryNodeRestartSelfTest2.class); - suite.addTestSuite(IgniteCacheQueryNodeRestartSelfTest.class); suite.addTestSuite(IgniteCacheQueryNodeRestartSelfTest2.class); - suite.addTestSuite(IgniteCacheQueryNodeRestartSelfTest.class); suite.addTestSuite(IgniteCacheQueryNodeRestartSelfTest2.class); - suite.addTestSuite(IgniteCacheQueryNodeRestartSelfTest.class); + suite.addTestSuite(IgniteCacheQueryNodeRestartSelfTest2.class); + suite.addTestSuite(IgniteCacheQueryNodeRestartSelfTest2.class); + suite.addTestSuite(IgniteCacheQueryNodeRestartSelfTest2.class); + suite.addTestSuite(IgniteCacheQueryNodeRestartSelfTest2.class); + suite.addTestSuite(IgniteCacheQueryNodeRestartSelfTest2.class); + suite.addTestSuite(IgniteCacheQueryNodeRestartSelfTest2.class); + suite.addTestSuite(IgniteCacheQueryNodeRestartSelfTest2.class); + suite.addTestSuite(IgniteCacheQueryNodeRestartSelfTest2.class); + suite.addTestSuite(IgniteCacheQueryNodeRestartSelfTest2.class); + suite.addTestSuite(IgniteCacheQueryNodeRestartSelfTest2.class); + suite.addTestSuite(IgniteCacheQueryNodeRestartSelfTest2.class); + suite.addTestSuite(IgniteCacheQueryNodeRestartSelfTest2.class); + suite.addTestSuite(IgniteCacheQueryNodeRestartSelfTest2.class); suite.addTestSuite(IgniteCacheQueryNodeRestartSelfTest2.class); return suite;
