http://git-wip-us.apache.org/repos/asf/ignite/blob/3197f9b4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java index a6a1fc1..24b7f16 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java @@ -31,6 +31,7 @@ import java.util.NoSuchElementException; import java.util.Set; import java.util.UUID; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; import javax.cache.Cache; @@ -100,7 +101,6 @@ import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiPredicate; import org.apache.ignite.lang.IgniteClosure; import org.apache.ignite.lang.IgniteFuture; -import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgniteProductVersion; import org.apache.ignite.mxbean.CacheMetricsMXBean; import org.apache.ignite.plugin.security.SecurityPermission; @@ -122,9 +122,15 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache< private static final IgniteProductVersion CONT_QRY_WITH_TRANSFORMER_SINCE = IgniteProductVersion.fromString("2.5.0"); + /** Cache name. */ + private String cacheName; + /** Context. */ private volatile GridCacheContext<K, V> ctx; + /** Old context. */ + private transient volatile GridCacheContext<K, V> oldContext; + /** Delegate. 
*/ @GridToStringInclude private volatile IgniteInternalCache<K, V> delegate; @@ -185,6 +191,10 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache< assert ctx != null; assert delegate != null; + cacheName = ctx.name(); + + assert cacheName.equals(delegate.name()) : "ctx.name=" + cacheName + ", delegate.name=" + delegate.name(); + this.ctx = ctx; this.delegate = delegate; @@ -203,6 +213,65 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache< * @return Context. */ @Override public GridCacheContext<K, V> context() { + return getContextSafe(); + } + + /** + * @return Context or throw restart exception. + */ + private GridCacheContext<K, V> getContextSafe() { + while (true) { + GridCacheContext<K, V> ctx = this.ctx; + + if (ctx == null) { + checkRestart(); + + if (Thread.currentThread().isInterrupted()) + throw new IgniteException(new InterruptedException()); + } + else + return ctx; + } + } + + /** + * @return Delegate or throw restart exception. + */ + private IgniteInternalCache<K, V> getDelegateSafe() { + while (true) { + IgniteInternalCache<K, V> delegate = this.delegate; + + if (delegate == null) { + checkRestart(); + + if (Thread.currentThread().isInterrupted()) + throw new IgniteException(new InterruptedException()); + } + else + return delegate; + } + } + + /** + * @return Context. 
+ */ + public GridCacheContext<K, V> context0() { + GridCacheContext<K, V> ctx = this.ctx; + + if (ctx == null) { + synchronized (this) { + ctx = this.ctx; + + if (ctx == null) { + GridCacheContext<K, V> context = oldContext; + + assert context != null; + + return context; + } + } + } + return ctx; } @@ -219,36 +288,49 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache< return cachedProxy; cachedProxy = new GatewayProtectedCacheProxy<>(this, new CacheOperationContext(), true); + return cachedProxy; } /** {@inheritDoc} */ @Override public CacheMetrics metrics() { + GridCacheContext<K, V> ctx = getContextSafe(); + return ctx.cache().clusterMetrics(); } /** {@inheritDoc} */ @Override public CacheMetrics metrics(ClusterGroup grp) { + GridCacheContext<K, V> ctx = getContextSafe(); + return ctx.cache().clusterMetrics(grp); } /** {@inheritDoc} */ @Override public CacheMetrics localMetrics() { + GridCacheContext<K, V> ctx = getContextSafe(); + return ctx.cache().localMetrics(); } /** {@inheritDoc} */ @Override public CacheMetricsMXBean mxBean() { + GridCacheContext<K, V> ctx = getContextSafe(); + return ctx.cache().clusterMxBean(); } /** {@inheritDoc} */ @Override public CacheMetricsMXBean localMxBean() { + GridCacheContext<K, V> ctx = getContextSafe(); + return ctx.cache().localMxBean(); } /** {@inheritDoc} */ @Override public <C extends Configuration<K, V>> C getConfiguration(Class<C> clazz) { + GridCacheContext<K, V> ctx = getContextSafe(); + CacheConfiguration cfg = ctx.config(); if (!clazz.isAssignableFrom(cfg.getClass())) @@ -284,6 +366,8 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache< /** {@inheritDoc} */ @Override public void loadCache(@Nullable IgniteBiPredicate<K, V> p, @Nullable Object... 
args) { + GridCacheContext<K, V> ctx = getContextSafe(); + try { if (isAsync()) { if (ctx.cache().isLocal()) @@ -306,6 +390,8 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache< /** {@inheritDoc} */ @Override public IgniteFuture<Void> loadCacheAsync(@Nullable IgniteBiPredicate<K, V> p, @Nullable Object... args) throws CacheException { + GridCacheContext<K, V> ctx = getContextSafe(); + try { if (ctx.cache().isLocal()) return (IgniteFuture<Void>)createFuture(ctx.cache().localLoadCacheAsync(p, args)); @@ -319,6 +405,8 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache< /** {@inheritDoc} */ @Override public void localLoadCache(@Nullable IgniteBiPredicate<K, V> p, @Nullable Object... args) { + IgniteInternalCache<K, V> delegate = getDelegateSafe(); + try { if (isAsync()) setFuture(delegate.localLoadCacheAsync(p, args)); @@ -333,11 +421,15 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache< /** {@inheritDoc} */ @Override public IgniteFuture<Void> localLoadCacheAsync(@Nullable IgniteBiPredicate<K, V> p, @Nullable Object... 
args) throws CacheException { + IgniteInternalCache<K, V> delegate = getDelegateSafe(); + return (IgniteFuture<Void>)createFuture(delegate.localLoadCacheAsync(p, args)); } /** {@inheritDoc} */ @Nullable @Override public V getAndPutIfAbsent(K key, V val) throws CacheException { + IgniteInternalCache<K, V> delegate = getDelegateSafe(); + try { if (isAsync()) { setFuture(delegate.getAndPutIfAbsentAsync(key, val)); @@ -354,6 +446,8 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache< /** {@inheritDoc} */ @Override public IgniteFuture<V> getAndPutIfAbsentAsync(K key, V val) throws CacheException { + IgniteInternalCache<K, V> delegate = getDelegateSafe(); + return createFuture(delegate.getAndPutIfAbsentAsync(key, val)); } @@ -364,6 +458,9 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache< /** {@inheritDoc} */ @Override public Lock lockAll(final Collection<? extends K> keys) { + IgniteInternalCache<K, V> delegate = getDelegateSafe(); + GridCacheContext<K, V> ctx = getContextSafe(); + //TODO: IGNITE-9324: add explicit locks support. MvccUtils.verifyMvccOperationSupport(ctx, "Lock"); @@ -372,6 +469,8 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache< /** {@inheritDoc} */ @Override public boolean isLocalLocked(K key, boolean byCurrThread) { + IgniteInternalCache<K, V> delegate = getDelegateSafe(); + return byCurrThread ? 
delegate.isLockedByThread(key) : delegate.isLocked(key); } @@ -386,8 +485,9 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache< private <T, R> QueryCursor<R> query( final ScanQuery scanQry, @Nullable final IgniteClosure<T, R> transformer, - @Nullable ClusterGroup grp) - throws IgniteCheckedException { + @Nullable ClusterGroup grp + ) throws IgniteCheckedException { + GridCacheContext<K, V> ctx = getContextSafe(); CacheOperationContext opCtxCall = ctx.operationContextPerCall(); @@ -405,7 +505,7 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache< qry.projection(grp); final GridCloseableIterator<R> iter = ctx.kernalContext().query().executeQuery(GridCacheQueryType.SCAN, - ctx.name(), ctx, new IgniteOutClosureX<GridCloseableIterator<R>>() { + cacheName, ctx, new IgniteOutClosureX<GridCloseableIterator<R>>() { @Override public GridCloseableIterator<R> applyx() throws IgniteCheckedException { return qry.executeScanQuery(); } @@ -423,6 +523,8 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache< @SuppressWarnings("unchecked") private QueryCursor<Cache.Entry<K, V>> query(final Query filter, @Nullable ClusterGroup grp) throws IgniteCheckedException { + GridCacheContext<K, V> ctx = getContextSafe(); + final CacheQuery qry; CacheOperationContext opCtxCall = ctx.operationContextPerCall(); @@ -515,11 +617,13 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache< * @return Local node cluster group. 
*/ private ClusterGroup projection(boolean loc) { + GridCacheContext<K, V> ctx = getContextSafe(); + if (loc || ctx.isLocal() || ctx.isReplicatedAffinityNode()) return ctx.kernalContext().grid().cluster().forLocal(); if (ctx.isReplicated()) - return ctx.kernalContext().grid().cluster().forDataNodes(ctx.name()).forRandom(); + return ctx.kernalContext().grid().cluster().forDataNodes(cacheName).forRandom(); return null; } @@ -534,6 +638,8 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache< */ @SuppressWarnings("unchecked") private QueryCursor<Cache.Entry<K, V>> queryContinuous(AbstractContinuousQuery qry, boolean loc, boolean keepBinary) { + GridCacheContext<K, V> ctx = getContextSafe(); + assert qry instanceof ContinuousQuery || qry instanceof ContinuousQueryWithTransformer; if (qry.getInitialQuery() instanceof ContinuousQuery || @@ -638,6 +744,8 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache< /** {@inheritDoc} */ @Override public List<FieldsQueryCursor<List<?>>> queryMultipleStatements(SqlFieldsQuery qry) { + GridCacheContext<K, V> ctx = getContextSafe(); + A.notNull(qry, "qry"); try { ctx.checkSecurity(SecurityPermission.CACHE_READ); @@ -662,6 +770,8 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache< /** {@inheritDoc} */ @Override public <R> QueryCursor<R> query(Query<R> qry) { + GridCacheContext<K, V> ctx = getContextSafe(); + A.notNull(qry, "qry"); try { ctx.checkSecurity(SecurityPermission.CACHE_READ); @@ -699,6 +809,8 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache< /** {@inheritDoc} */ @Override public <T, R> QueryCursor<R> query(Query<T> qry, IgniteClosure<T, R> transformer) { + GridCacheContext<K, V> ctx = getContextSafe(); + A.notNull(qry, "qry"); A.notNull(transformer, "transformer"); @@ -726,6 +838,8 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache< * @param qry Query. 
*/ private void convertToBinary(final Query qry) { + GridCacheContext<K, V> ctx = getContextSafe(); + if (ctx.binaryMarshaller()) { if (qry instanceof SqlQuery) { final SqlQuery sqlQry = (SqlQuery) qry; @@ -754,6 +868,8 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache< if (args == null) return; + GridCacheContext<K, V> ctx = getContextSafe(); + for (int i = 0; i < args.length; i++) args[i] = ctx.cacheObjects().binary().toBinary(args[i]); } @@ -765,10 +881,12 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache< * @throws CacheException If query indexing disabled for sql query. */ private void validate(Query qry) { + GridCacheContext<K, V> ctx = getContextSafe(); + if (!QueryUtils.isEnabled(ctx.config()) && !(qry instanceof ScanQuery) && !(qry instanceof ContinuousQuery) && !(qry instanceof ContinuousQueryWithTransformer) && !(qry instanceof SpiQuery) && !(qry instanceof SqlQuery) && !(qry instanceof SqlFieldsQuery)) - throw new CacheException("Indexing is disabled for cache: " + ctx.cache().name() + + throw new CacheException("Indexing is disabled for cache: " + cacheName + ". Use setIndexedTypes or setTypeMetadata methods on CacheConfiguration to enable."); if (!ctx.kernalContext().query().moduleEnabled() && @@ -779,6 +897,8 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache< /** {@inheritDoc} */ @Override public Iterable<Cache.Entry<K, V>> localEntries(CachePeekMode... 
peekModes) throws CacheException { + IgniteInternalCache<K, V> delegate = getDelegateSafe(); + try { return delegate.localEntries(peekModes); } @@ -789,31 +909,43 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache< /** {@inheritDoc} */ @Override public QueryMetrics queryMetrics() { + IgniteInternalCache<K, V> delegate = getDelegateSafe(); + return delegate.context().queries().metrics(); } /** {@inheritDoc} */ @Override public void resetQueryMetrics() { + IgniteInternalCache<K, V> delegate = getDelegateSafe(); + delegate.context().queries().resetMetrics(); } /** {@inheritDoc} */ @Override public Collection<? extends QueryDetailMetrics> queryDetailMetrics() { + IgniteInternalCache<K, V> delegate = getDelegateSafe(); + return delegate.context().queries().detailMetrics(); } /** {@inheritDoc} */ @Override public void resetQueryDetailMetrics() { + IgniteInternalCache<K, V> delegate = getDelegateSafe(); + delegate.context().queries().resetDetailMetrics(); } /** {@inheritDoc} */ @Override public void localEvict(Collection<? extends K> keys) { + IgniteInternalCache<K, V> delegate = getDelegateSafe(); + delegate.evictAll(keys); } /** {@inheritDoc} */ @Nullable @Override public V localPeek(K key, CachePeekMode... peekModes) { + IgniteInternalCache<K, V> delegate = getDelegateSafe(); + try { return delegate.localPeek(key, peekModes); } @@ -824,6 +956,8 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache< /** {@inheritDoc} */ @Override public int size(CachePeekMode... peekModes) throws CacheException { + IgniteInternalCache<K, V> delegate = getDelegateSafe(); + try { if (isAsync()) { setFuture(delegate.sizeAsync(peekModes)); @@ -840,11 +974,15 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache< /** {@inheritDoc} */ @Override public IgniteFuture<Integer> sizeAsync(CachePeekMode... 
peekModes) throws CacheException { + IgniteInternalCache<K, V> delegate = getDelegateSafe(); + return createFuture(delegate.sizeAsync(peekModes)); } /** {@inheritDoc} */ @Override public long sizeLong(CachePeekMode... peekModes) throws CacheException { + IgniteInternalCache<K, V> delegate = getDelegateSafe(); + try { if (isAsync()) { setFuture(delegate.sizeLongAsync(peekModes)); @@ -861,11 +999,15 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache< /** {@inheritDoc} */ @Override public IgniteFuture<Long> sizeLongAsync(CachePeekMode... peekModes) throws CacheException { + IgniteInternalCache<K, V> delegate = getDelegateSafe(); + return createFuture(delegate.sizeLongAsync(peekModes)); } /** {@inheritDoc} */ @Override public long sizeLong(int part, CachePeekMode... peekModes) throws CacheException { + IgniteInternalCache<K, V> delegate = getDelegateSafe(); + try { if (isAsync()) { setFuture(delegate.sizeLongAsync(part, peekModes)); @@ -882,11 +1024,15 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache< /** {@inheritDoc} */ @Override public IgniteFuture<Long> sizeLongAsync(int part, CachePeekMode... peekModes) throws CacheException { + IgniteInternalCache<K, V> delegate = getDelegateSafe(); + return createFuture(delegate.sizeLongAsync(part, peekModes)); } /** {@inheritDoc} */ @Override public int localSize(CachePeekMode... peekModes) { + IgniteInternalCache<K, V> delegate = getDelegateSafe(); + try { return delegate.localSize(peekModes); } @@ -897,6 +1043,8 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache< /** {@inheritDoc} */ @Override public long localSizeLong(CachePeekMode... 
peekModes) { + IgniteInternalCache<K, V> delegate = getDelegateSafe(); + try { return delegate.localSizeLong(peekModes); } @@ -907,6 +1055,8 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache< /** {@inheritDoc} */ @Override public long localSizeLong(int part, CachePeekMode... peekModes) { + IgniteInternalCache<K, V> delegate = getDelegateSafe(); + try { return delegate.localSizeLong(part, peekModes); } @@ -917,6 +1067,8 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache< /** {@inheritDoc} */ @Override public V get(K key) { + IgniteInternalCache<K, V> delegate = getDelegateSafe(); + try { if (isAsync()) { setFuture(delegate.getAsync(key)); @@ -933,11 +1085,15 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache< /** {@inheritDoc} */ @Override public IgniteFuture<V> getAsync(K key) { + IgniteInternalCache<K, V> delegate = getDelegateSafe(); + return createFuture(delegate.getAsync(key)); } /** {@inheritDoc} */ @Override public CacheEntry<K, V> getEntry(K key) { + IgniteInternalCache<K, V> delegate = getDelegateSafe(); + try { if (isAsync()) { setFuture(delegate.getEntryAsync(key)); @@ -954,11 +1110,15 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache< /** {@inheritDoc} */ @Override public IgniteFuture<CacheEntry<K, V>> getEntryAsync(K key) { + IgniteInternalCache<K, V> delegate = getDelegateSafe(); + return createFuture(delegate.getEntryAsync(key)); } /** {@inheritDoc} */ @Override public Map<K, V> getAll(Set<? extends K> keys) { + IgniteInternalCache<K, V> delegate = getDelegateSafe(); + try { if (isAsync()) { setFuture(delegate.getAllAsync(keys)); @@ -975,11 +1135,15 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache< /** {@inheritDoc} */ @Override public IgniteFuture<Map<K, V>> getAllAsync(Set<? 
extends K> keys) { + IgniteInternalCache<K, V> delegate = getDelegateSafe(); + return createFuture(delegate.getAllAsync(keys)); } /** {@inheritDoc} */ @Override public Collection<CacheEntry<K, V>> getEntries(Set<? extends K> keys) { + IgniteInternalCache<K, V> delegate = getDelegateSafe(); + try { if (isAsync()) { setFuture(delegate.getEntriesAsync(keys)); @@ -996,11 +1160,15 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache< /** {@inheritDoc} */ @Override public IgniteFuture<Collection<CacheEntry<K, V>>> getEntriesAsync(Set<? extends K> keys) { + IgniteInternalCache<K, V> delegate = getDelegateSafe(); + return createFuture(delegate.getEntriesAsync(keys)); } /** {@inheritDoc} */ @Override public Map<K, V> getAllOutTx(Set<? extends K> keys) { + IgniteInternalCache<K, V> delegate = getDelegateSafe(); + try { if (isAsync()) { setFuture(delegate.getAllOutTxAsync(keys)); @@ -1017,6 +1185,8 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache< /** {@inheritDoc} */ @Override public IgniteFuture<Map<K, V>> getAllOutTxAsync(Set<? extends K> keys) { + IgniteInternalCache<K, V> delegate = getDelegateSafe(); + return createFuture(delegate.getAllOutTxAsync(keys)); } @@ -1025,6 +1195,8 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache< * @return Values map. */ public Map<K, V> getAll(Collection<? 
extends K> keys) { + IgniteInternalCache<K, V> delegate = getDelegateSafe(); + try { if (isAsync()) { setFuture(delegate.getAllAsync(keys)); @@ -1041,6 +1213,8 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache< /** {@inheritDoc} */ @Override public boolean containsKey(K key) { + IgniteInternalCache<K, V> delegate = getDelegateSafe(); + if (isAsync()) { setFuture(delegate.containsKeyAsync(key)); @@ -1052,11 +1226,15 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache< /** {@inheritDoc} */ @Override public IgniteFuture<Boolean> containsKeyAsync(K key) { + IgniteInternalCache<K, V> delegate = getDelegateSafe(); + return createFuture(delegate.containsKeyAsync(key)); } /** {@inheritDoc} */ @Override public boolean containsKeys(Set<? extends K> keys) { + IgniteInternalCache<K, V> delegate = getDelegateSafe(); + if (isAsync()) { setFuture(delegate.containsKeysAsync(keys)); @@ -1068,6 +1246,8 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache< /** {@inheritDoc} */ @Override public IgniteFuture<Boolean> containsKeysAsync(Set<? 
extends K> keys) { + IgniteInternalCache<K, V> delegate = getDelegateSafe(); + return createFuture(delegate.containsKeysAsync(keys)); } @@ -1077,6 +1257,8 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache< boolean replaceExisting, @Nullable final CompletionListener completionLsnr ) { + GridCacheContext<K, V> ctx = getContextSafe(); + IgniteInternalFuture<?> fut = ctx.cache().loadAll(keys, replaceExisting); if (completionLsnr != null) { @@ -1097,6 +1279,8 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache< /** {@inheritDoc} */ @Override public void put(K key, V val) { + IgniteInternalCache<K, V> delegate = getDelegateSafe(); + try { if (isAsync()) setFuture(putAsync0(key, val)); @@ -1121,6 +1305,8 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache< * @return Internal future. */ private IgniteInternalFuture<Void> putAsync0(K key, V val) { + IgniteInternalCache<K, V> delegate = getDelegateSafe(); + IgniteInternalFuture<Boolean> fut = delegate.putAsync(key, val); return fut.chain(new CX1<IgniteInternalFuture<Boolean>, Void>() { @@ -1139,6 +1325,8 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache< /** {@inheritDoc} */ @Override public V getAndPut(K key, V val) { + IgniteInternalCache<K, V> delegate = getDelegateSafe(); + try { if (isAsync()) { setFuture(delegate.getAndPutAsync(key, val)); @@ -1155,11 +1343,15 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache< /** {@inheritDoc} */ @Override public IgniteFuture<V> getAndPutAsync(K key, V val) { + IgniteInternalCache<K, V> delegate = getDelegateSafe(); + return createFuture(delegate.getAndPutAsync(key, val)); } /** {@inheritDoc} */ @Override public void putAll(Map<? extends K, ? 
extends V> map) { + IgniteInternalCache<K, V> delegate = getDelegateSafe(); + try { if (isAsync()) setFuture(delegate.putAllAsync(map)); @@ -1173,11 +1365,15 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache< /** {@inheritDoc} */ @Override public IgniteFuture<Void> putAllAsync(Map<? extends K, ? extends V> map) { + IgniteInternalCache<K, V> delegate = getDelegateSafe(); + return (IgniteFuture<Void>)createFuture(delegate.putAllAsync(map)); } /** {@inheritDoc} */ @Override public boolean putIfAbsent(K key, V val) { + IgniteInternalCache<K, V> delegate = getDelegateSafe(); + try { if (isAsync()) { setFuture(delegate.putIfAbsentAsync(key, val)); @@ -1194,11 +1390,15 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache< /** {@inheritDoc} */ @Override public IgniteFuture<Boolean> putIfAbsentAsync(K key, V val) { + IgniteInternalCache<K, V> delegate = getDelegateSafe(); + return createFuture(delegate.putIfAbsentAsync(key, val)); } /** {@inheritDoc} */ @Override public boolean remove(K key) { + IgniteInternalCache<K, V> delegate = getDelegateSafe(); + try { if (isAsync()) { setFuture(delegate.removeAsync(key)); @@ -1215,11 +1415,15 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache< /** {@inheritDoc} */ @Override public IgniteFuture<Boolean> removeAsync(K key) { + IgniteInternalCache<K, V> delegate = getDelegateSafe(); + return createFuture(delegate.removeAsync(key)); } /** {@inheritDoc} */ @Override public boolean remove(K key, V oldVal) { + IgniteInternalCache<K, V> delegate = getDelegateSafe(); + try { if (isAsync()) { setFuture(delegate.removeAsync(key, oldVal)); @@ -1236,11 +1440,15 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache< /** {@inheritDoc} */ @Override public IgniteFuture<Boolean> removeAsync(K key, V oldVal) { + IgniteInternalCache<K, V> delegate = getDelegateSafe(); + return createFuture(delegate.removeAsync(key, oldVal)); } /** 
{@inheritDoc} */ @Override public V getAndRemove(K key) { + IgniteInternalCache<K, V> delegate = getDelegateSafe(); + try { if (isAsync()) { setFuture(delegate.getAndRemoveAsync(key)); @@ -1257,11 +1465,15 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache< /** {@inheritDoc} */ @Override public IgniteFuture<V> getAndRemoveAsync(K key) { + IgniteInternalCache<K, V> delegate = getDelegateSafe(); + return createFuture(delegate.getAndRemoveAsync(key)); } /** {@inheritDoc} */ @Override public boolean replace(K key, V oldVal, V newVal) { + IgniteInternalCache<K, V> delegate = getDelegateSafe(); + try { if (isAsync()) { setFuture(delegate.replaceAsync(key, oldVal, newVal)); @@ -1278,11 +1490,15 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache< /** {@inheritDoc} */ @Override public IgniteFuture<Boolean> replaceAsync(K key, V oldVal, V newVal) { + IgniteInternalCache<K, V> delegate = getDelegateSafe(); + return createFuture(delegate.replaceAsync(key, oldVal, newVal)); } /** {@inheritDoc} */ @Override public boolean replace(K key, V val) { + IgniteInternalCache<K, V> delegate = getDelegateSafe(); + try { if (isAsync()) { setFuture(delegate.replaceAsync(key, val)); @@ -1299,11 +1515,15 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache< /** {@inheritDoc} */ @Override public IgniteFuture<Boolean> replaceAsync(K key, V val) { + IgniteInternalCache<K, V> delegate = getDelegateSafe(); + return createFuture(delegate.replaceAsync(key, val)); } /** {@inheritDoc} */ @Override public V getAndReplace(K key, V val) { + IgniteInternalCache<K, V> delegate = getDelegateSafe(); + try { if (isAsync()) { setFuture(delegate.getAndReplaceAsync(key, val)); @@ -1320,11 +1540,15 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache< /** {@inheritDoc} */ @Override public IgniteFuture<V> getAndReplaceAsync(K key, V val) { + IgniteInternalCache<K, V> delegate = 
getDelegateSafe(); + return createFuture(delegate.getAndReplaceAsync(key, val)); } /** {@inheritDoc} */ @Override public void removeAll(Set<? extends K> keys) { + IgniteInternalCache<K, V> delegate = getDelegateSafe(); + try { if (isAsync()) setFuture(delegate.removeAllAsync(keys)); @@ -1338,11 +1562,15 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache< /** {@inheritDoc} */ @Override public IgniteFuture<Void> removeAllAsync(Set<? extends K> keys) { + IgniteInternalCache<K, V> delegate = getDelegateSafe(); + return (IgniteFuture<Void>)createFuture(delegate.removeAllAsync(keys)); } /** {@inheritDoc} */ @Override public void removeAll() { + IgniteInternalCache<K, V> delegate = getDelegateSafe(); + try { if (isAsync()) setFuture(delegate.removeAllAsync()); @@ -1356,11 +1584,15 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache< /** {@inheritDoc} */ @Override public IgniteFuture<Void> removeAllAsync() { + IgniteInternalCache<K, V> delegate = getDelegateSafe(); + return (IgniteFuture<Void>)createFuture(delegate.removeAllAsync()); } /** {@inheritDoc} */ @Override public void clear(K key) { + IgniteInternalCache<K, V> delegate = getDelegateSafe(); + try { if (isAsync()) setFuture(delegate.clearAsync(key)); @@ -1374,11 +1606,15 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache< /** {@inheritDoc} */ @Override public IgniteFuture<Void> clearAsync(K key) { + IgniteInternalCache<K, V> delegate = getDelegateSafe(); + return (IgniteFuture<Void>)createFuture(delegate.clearAsync(key)); } /** {@inheritDoc} */ @Override public void clearAll(Set<? extends K> keys) { + IgniteInternalCache<K, V> delegate = getDelegateSafe(); + try { if (isAsync()) setFuture(delegate.clearAllAsync(keys)); @@ -1392,11 +1628,15 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache< /** {@inheritDoc} */ @Override public IgniteFuture<Void> clearAllAsync(Set<? 
extends K> keys) { + IgniteInternalCache<K, V> delegate = getDelegateSafe(); + return (IgniteFuture<Void>)createFuture(delegate.clearAllAsync(keys)); } /** {@inheritDoc} */ @Override public void clear() { + IgniteInternalCache<K, V> delegate = getDelegateSafe(); + try { if (isAsync()) setFuture(delegate.clearAsync()); @@ -1410,16 +1650,22 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache< /** {@inheritDoc} */ @Override public IgniteFuture<Void> clearAsync() { + IgniteInternalCache<K, V> delegate = getDelegateSafe(); + return (IgniteFuture<Void>)createFuture(delegate.clearAsync()); } /** {@inheritDoc} */ @Override public void localClear(K key) { + IgniteInternalCache<K, V> delegate = getDelegateSafe(); + delegate.clearLocally(key); } /** {@inheritDoc} */ @Override public void localClearAll(Set<? extends K> keys) { + IgniteInternalCache<K, V> delegate = getDelegateSafe(); + for (K key : keys) delegate.clearLocally(key); } @@ -1427,6 +1673,8 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache< /** {@inheritDoc} */ @Override public <T> T invoke(K key, EntryProcessor<K, V, T> entryProcessor, Object... args) throws EntryProcessorException { + IgniteInternalCache<K, V> delegate = getDelegateSafe(); + try { if (isAsync()) { setFuture(invokeAsync0(key, entryProcessor, args)); @@ -1459,6 +1707,8 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache< * @return Internal future. 
*/ private <T> IgniteInternalFuture<T> invokeAsync0(K key, EntryProcessor<K, V, T> entryProcessor, Object[] args) { + IgniteInternalCache<K, V> delegate = getDelegateSafe(); + IgniteInternalFuture<EntryProcessorResult<T>> fut = delegate.invokeAsync(key, entryProcessor, args); return fut.chain(new CX1<IgniteInternalFuture<EntryProcessorResult<T>>, T>() { @@ -1497,7 +1747,10 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache< public <T> T invoke(@Nullable AffinityTopologyVersion topVer, K key, EntryProcessor<K, V, T> entryProcessor, - Object... args) { + Object... args + ) { + IgniteInternalCache<K, V> delegate = getDelegateSafe(); + try { if (isAsync()) throw new UnsupportedOperationException(); @@ -1515,7 +1768,10 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache< /** {@inheritDoc} */ @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(Set<? extends K> keys, EntryProcessor<K, V, T> entryProcessor, - Object... args) { + Object... args + ) { + IgniteInternalCache<K, V> delegate = getDelegateSafe(); + try { if (isAsync()) { setFuture(delegate.invokeAllAsync(keys, entryProcessor, args)); @@ -1533,13 +1789,19 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache< /** {@inheritDoc} */ @Override public <T> IgniteFuture<Map<K, EntryProcessorResult<T>>> invokeAllAsync(Set<? extends K> keys, EntryProcessor<K, V, T> entryProcessor, Object... args) { + IgniteInternalCache<K, V> delegate = getDelegateSafe(); + return createFuture(delegate.invokeAllAsync(keys, entryProcessor, args)); } /** {@inheritDoc} */ - @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(Set<? extends K> keys, + @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll( + Set<? extends K> keys, CacheEntryProcessor<K, V, T> entryProcessor, - Object... args) { + Object... 
args + ) { + IgniteInternalCache<K, V> delegate = getDelegateSafe(); + try { if (isAsync()) { setFuture(delegate.invokeAllAsync(keys, entryProcessor, args)); @@ -1557,6 +1819,8 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache< /** {@inheritDoc} */ @Override public <T> IgniteFuture<Map<K, EntryProcessorResult<T>>> invokeAllAsync(Set<? extends K> keys, CacheEntryProcessor<K, V, T> entryProcessor, Object... args) { + IgniteInternalCache<K, V> delegate = getDelegateSafe(); + return createFuture(delegate.invokeAllAsync(keys, entryProcessor, args)); } @@ -1564,6 +1828,8 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache< @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll( Map<? extends K, ? extends EntryProcessor<K, V, T>> map, Object... args) { + IgniteInternalCache<K, V> delegate = getDelegateSafe(); + try { if (isAsync()) { setFuture(delegate.invokeAllAsync(map, args)); @@ -1581,12 +1847,14 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache< /** {@inheritDoc} */ @Override public <T> IgniteFuture<Map<K, EntryProcessorResult<T>>> invokeAllAsync( Map<? extends K, ? extends EntryProcessor<K, V, T>> map, Object... 
args) { + IgniteInternalCache<K, V> delegate = getDelegateSafe(); + return createFuture(delegate.invokeAllAsync(map, args)); } /** {@inheritDoc} */ @Override public String getName() { - return delegate.name(); + return cacheName; } /** {@inheritDoc} */ @@ -1608,7 +1876,9 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache< /** {@inheritDoc} */ @Override public IgniteFuture<?> destroyAsync() { - return new IgniteFutureImpl<>(ctx.kernalContext().cache().dynamicDestroyCache(ctx.name(), false, true, false)); + GridCacheContext<K, V> ctx = getContextSafe(); + + return new IgniteFutureImpl<>(ctx.kernalContext().cache().dynamicDestroyCache(cacheName, false, true, false, null)); } /** {@inheritDoc} */ @@ -1618,11 +1888,15 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache< /** {@inheritDoc} */ @Override public IgniteFuture<?> closeAsync() { - return new IgniteFutureImpl<>(ctx.kernalContext().cache().dynamicCloseCache(ctx.name())); + GridCacheContext<K, V> ctx = getContextSafe(); + + return new IgniteFutureImpl<>(ctx.kernalContext().cache().dynamicCloseCache(cacheName)); } /** {@inheritDoc} */ @Override public boolean isClosed() { + GridCacheContext<K, V> ctx = getContextSafe(); + return ctx.kernalContext().cache().context().closed(ctx); } @@ -1630,14 +1904,19 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache< @Override public <T> T unwrap(Class<T> clazz) { if (clazz.isAssignableFrom(getClass())) return (T)this; - else if (clazz.isAssignableFrom(IgniteEx.class)) + else if (clazz.isAssignableFrom(IgniteEx.class)) { + GridCacheContext<K, V> ctx = getContextSafe(); + return (T)ctx.grid(); + } throw new IllegalArgumentException("Unwrapping to class is not supported: " + clazz); } /** {@inheritDoc} */ @Override public void registerCacheEntryListener(CacheEntryListenerConfiguration<K, V> lsnrCfg) { + GridCacheContext<K, V> ctx = getContextSafe(); + try { CacheOperationContext opCtx = 
ctx.operationContextPerCall(); @@ -1650,6 +1929,8 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache< /** {@inheritDoc} */ @Override public void deregisterCacheEntryListener(CacheEntryListenerConfiguration<K, V> lsnrCfg) { + GridCacheContext<K, V> ctx = getContextSafe(); + try { ctx.continuousQueries().cancelJCacheQuery(lsnrCfg); } @@ -1660,6 +1941,8 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache< /** {@inheritDoc} */ @Override public Iterator<Cache.Entry<K, V>> iterator() { + GridCacheContext<K, V> ctx = getContextSafe(); + try { return ctx.cache().igniteIterator(); } @@ -1670,6 +1953,9 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache< /** {@inheritDoc} */ @Override protected IgniteCache<K, V> createAsyncInstance() { + GridCacheContext<K, V> ctx = getContextSafe(); + IgniteInternalCache<K, V> delegate = getDelegateSafe(); + return new IgniteCacheProxyImpl<K, V>( ctx, delegate, @@ -1741,10 +2027,25 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache< private RuntimeException cacheException(Exception e) { GridFutureAdapter<Void> restartFut = this.restartFut.get(); + if (X.hasCause(e, IgniteCacheRestartingException.class)) { + IgniteCacheRestartingException restartingException = X.cause(e, IgniteCacheRestartingException.class); + + if (restartingException.restartFuture() == null) { + if (restartFut == null) + restartFut = suspend(); + + assert restartFut != null; + + throw new IgniteCacheRestartingException(new IgniteFutureImpl<>(restartFut), cacheName); + } + else + throw restartingException; + } + if (restartFut != null) { if (X.hasCause(e, CacheStoppedException.class) || X.hasSuppressed(e, CacheStoppedException.class)) throw new IgniteCacheRestartingException(new IgniteFutureImpl<>(restartFut), "Cache is restarting: " + - ctx.name(), e); + cacheName, e); } if (e instanceof IgniteException && X.hasCause(e, CacheException.class)) @@ 
-1778,7 +2079,8 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache< * @return Internal proxy. */ @Override public GridCacheProxyImpl<K, V> internalProxy() { - checkRestart(); + GridCacheContext<K, V> ctx = getContextSafe(); + IgniteInternalCache<K, V> delegate = getDelegateSafe(); return new GridCacheProxyImpl<>(ctx, delegate, ctx.operationContextPerCall()); } @@ -1799,11 +2101,15 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache< /** {@inheritDoc} */ @Override public Collection<Integer> lostPartitions() { + IgniteInternalCache<K, V> delegate = getDelegateSafe(); + return delegate.lostPartitions(); } /** {@inheritDoc} */ @Override public void enableStatistics(boolean enabled) { + GridCacheContext<K, V> ctx = getContextSafe(); + try { ctx.kernalContext().cache().enableStatistics(Collections.singleton(getName()), enabled); } @@ -1814,6 +2120,8 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache< /** {@inheritDoc} */ @Override public void clearStatistics() { + GridCacheContext<K, V> ctx = getContextSafe(); + try { ctx.kernalContext().cache().clearStatistics(Collections.singleton(getName())); } @@ -1824,6 +2132,8 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache< /** {@inheritDoc} */ @Override public void preloadPartition(int part) { + IgniteInternalCache<K, V> delegate = getDelegateSafe(); + try { delegate.preloadPartition(part); } @@ -1834,6 +2144,8 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache< /** {@inheritDoc} */ @Override public IgniteFuture<Void> preloadPartitionAsync(int part) { + IgniteInternalCache<K, V> delegate = getDelegateSafe(); + try { return (IgniteFuture<Void>)createFuture(delegate.preloadPartitionAsync(part)); } @@ -1844,6 +2156,8 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache< /** {@inheritDoc} */ @Override public boolean localPreloadPartition(int 
part) { + IgniteInternalCache<K, V> delegate = getDelegateSafe(); + try { return delegate.localPreloadPartition(part); } @@ -1864,15 +2178,23 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache< ctx = (GridCacheContext<K, V>)in.readObject(); delegate = (IgniteInternalCache<K, V>)in.readObject(); + + cacheName = ctx.name(); + + assert cacheName.equals(delegate.name()) : "ctx.name=" + cacheName + ", delegate.name=" + delegate.name(); } /** {@inheritDoc} */ @Override public IgniteFuture<Boolean> rebalance() { + GridCacheContext<K, V> ctx = getContextSafe(); + return new IgniteFutureImpl<>(ctx.preloader().forceRebalance()); } /** {@inheritDoc} */ @Override public IgniteFuture<?> indexReadyFuture() { + GridCacheContext<K, V> ctx = getContextSafe(); + IgniteInternalFuture fut = ctx.shared().database().indexRebuildFuture(ctx.cacheId()); if (fut == null) @@ -1885,10 +2207,29 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache< * Throws {@code IgniteCacheRestartingException} if proxy is restarting. */ public void checkRestart() { + checkRestart(false); + } + + /** + * Throws {@code IgniteCacheRestartingException} if proxy is restarting. + */ + public void checkRestart(boolean noWait) { RestartFuture currentFut = restartFut.get(); - if (currentFut != null) - currentFut.checkRestartOrAwait(); + if (currentFut != null) { + try { + if (!noWait) { + currentFut.get(1, TimeUnit.SECONDS); + + return; + } + } + catch (IgniteCheckedException ignore) { + //do nothing + } + + throw new IgniteCacheRestartingException(new IgniteFutureImpl<>(currentFut), cacheName); + } } /** @@ -1899,26 +2240,33 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache< } /** - * Restarts this cache proxy. + * Suspend this cache proxy. + * To make cache proxy active again, it's needed to restart it. 
*/ - public boolean restart() { - RestartFuture restartFut = new RestartFuture(ctx.name()); - - RestartFuture curFut = this.restartFut.get(); - - boolean changed = this.restartFut.compareAndSet(curFut, restartFut); + public RestartFuture suspend() { + while (true) { + RestartFuture curFut = this.restartFut.get(); + + if (curFut == null) { + RestartFuture restartFut = new RestartFuture(cacheName); + + if (this.restartFut.compareAndSet(null, restartFut)) { + synchronized (this) { + if (!restartFut.isDone()) { + if (oldContext == null) { + oldContext = ctx; + delegate = null; + ctx = null; + } + } + } - if (changed && curFut != null) - restartFut.listen(new IgniteInClosure<IgniteInternalFuture<Void>>() { - @Override public void apply(IgniteInternalFuture<Void> fut) { - if (fut.error() != null) - curFut.onDone(fut.error()); - else - curFut.onDone(); + return restartFut; } - }); - - return changed; + } + else + return curFut; + } } /** @@ -1934,19 +2282,30 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache< /** * If proxy is already being restarted, returns future to wait on, else restarts this cache proxy. * - * @return Future to wait on, or null. + * @param cache To use for restart proxy. */ - public GridFutureAdapter<Void> opportunisticRestart() { - RestartFuture restartFut = new RestartFuture(ctx.name()); + public void opportunisticRestart(IgniteInternalCache<K, V> cache) { + RestartFuture restartFut = new RestartFuture(cacheName); while (true) { - if (this.restartFut.compareAndSet(null, restartFut)) - return null; + if (this.restartFut.compareAndSet(null, restartFut)) { + onRestarted(cache.context(), cache.context().cache()); + + return; + } GridFutureAdapter<Void> curFut = this.restartFut.get(); - if (curFut != null) - return curFut; + if (curFut != null) { + try { + curFut.get(); + } + catch (IgniteCheckedException ignore) { + // Do nothing. 
+ } + + return; + } } } @@ -1961,12 +2320,18 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache< assert restartFut != null; - this.ctx = ctx; - this.delegate = delegate; + synchronized (this) { + this.restartFut.compareAndSet(restartFut, null); + + this.ctx = ctx; + oldContext = null; + this.delegate = delegate; - this.restartFut.compareAndSet(restartFut, null); + restartFut.onDone(); + } - restartFut.onDone(); + assert delegate == null || cacheName.equals(delegate.name()) && cacheName.equals(ctx.name()) : + "ctx.name=" + ctx.name() + ", delegate.name=" + delegate.name() + ", cacheName=" + cacheName; } /**
http://git-wip-us.apache.org/repos/asf/ignite/blob/3197f9b4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index ab4287e..447f16f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -31,6 +31,7 @@ import java.util.UUID; import javax.cache.expiry.ExpiryPolicy; import javax.cache.processor.EntryProcessor; import javax.cache.processor.EntryProcessorResult; +import org.apache.ignite.IgniteCacheRestartingException; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; @@ -1761,7 +1762,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { try { if (top.stopping()) { - res.addFailedKeys(req.keys(), new CacheStoppedException(name())); + if (ctx.shared().cache().isCacheRestarting(name())) + res.addFailedKeys(req.keys(), new IgniteCacheRestartingException(name())); + else + res.addFailedKeys(req.keys(), new CacheStoppedException(name())); completionCb.apply(req, res); http://git-wip-us.apache.org/repos/asf/ignite/blob/3197f9b4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java index d4e81e0..264dbe7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java @@ -23,6 +23,7 @@ import java.util.Map; import java.util.UUID; import javax.cache.expiry.ExpiryPolicy; import javax.cache.processor.EntryProcessor; +import org.apache.ignite.IgniteCacheRestartingException; import org.apache.ignite.cache.CacheWriteSynchronizationMode; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.IgniteInternalFuture; @@ -403,8 +404,11 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda AffinityTopologyVersion topVer; if (cache.topology().stopping()) { - completeFuture(null,new CacheStoppedException( - cache.name()), + completeFuture( + null, + cctx.shared().cache().isCacheRestarting(cache.name())? 
+ new IgniteCacheRestartingException(cache.name()): + new CacheStoppedException(cache.name()), null); return; http://git-wip-us.apache.org/repos/asf/ignite/blob/3197f9b4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index 072918e..3835d6a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@ -26,6 +26,7 @@ import java.util.Map; import java.util.UUID; import javax.cache.expiry.ExpiryPolicy; import javax.cache.processor.EntryProcessor; +import org.apache.ignite.IgniteCacheRestartingException; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cache.CacheWriteSynchronizationMode; import org.apache.ignite.cluster.ClusterNode; @@ -623,7 +624,12 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu AffinityTopologyVersion topVer; if (cache.topology().stopping()) { - completeFuture(null,new CacheStoppedException(cache.name()), null); + completeFuture( + null, + cctx.shared().cache().isCacheRestarting(cache.name())? 
+ new IgniteCacheRestartingException(cache.name()): + new CacheStoppedException(cache.name()), + null); return; } http://git-wip-us.apache.org/repos/asf/ignite/blob/3197f9b4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java index 2c81036..fec572e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java @@ -29,6 +29,7 @@ import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicReference; +import org.apache.ignite.IgniteCacheRestartingException; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cluster.ClusterNode; @@ -830,7 +831,10 @@ public final class GridDhtColocatedLockFuture extends GridCacheCompoundIdentityF try { if (cctx.topology().stopping()) { - onDone(new CacheStoppedException(cctx.name())); + onDone( + cctx.shared().cache().isCacheRestarting(cctx.name())? 
+ new IgniteCacheRestartingException(cctx.name()): + new CacheStoppedException(cctx.name())); return; } http://git-wip-us.apache.org/repos/asf/ignite/blob/3197f9b4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java index 3691250..ba882c0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java @@ -30,6 +30,7 @@ import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicReference; +import org.apache.ignite.IgniteCacheRestartingException; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cluster.ClusterNode; @@ -893,7 +894,10 @@ public final class GridNearLockFuture extends GridCacheCompoundIdentityFuture<Bo try { if (cctx.topology().stopping()) { - onDone(new CacheStoppedException(cctx.name())); + onDone( + cctx.shared().cache().isCacheRestarting(cctx.name())? 
+ new IgniteCacheRestartingException(cctx.name()): + new CacheStoppedException(cctx.name())); return; } http://git-wip-us.apache.org/repos/asf/ignite/blob/3197f9b4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxAbstractEnlistFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxAbstractEnlistFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxAbstractEnlistFuture.java index b52d440..27b2fd7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxAbstractEnlistFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxAbstractEnlistFuture.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed.near; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import org.apache.ignite.IgniteCacheRestartingException; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cluster.ClusterNode; @@ -317,7 +318,10 @@ public abstract class GridNearTxAbstractEnlistFuture<T> extends GridCacheCompoun try { if (cctx.topology().stopping()) { - onDone(new CacheStoppedException(cctx.name())); + onDone( + cctx.shared().cache().isCacheRestarting(cctx.name())? 
+ new IgniteCacheRestartingException(cctx.name()): + new CacheStoppedException(cctx.name())); return; } http://git-wip-us.apache.org/repos/asf/ignite/blob/3197f9b4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/TxTopologyVersionFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/TxTopologyVersionFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/TxTopologyVersionFuture.java index c13bf0e..7f8a121 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/TxTopologyVersionFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/TxTopologyVersionFuture.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.processors.cache.distributed.near; +import org.apache.ignite.IgniteCacheRestartingException; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; @@ -111,7 +112,10 @@ public class TxTopologyVersionFuture extends GridFutureAdapter<AffinityTopologyV try { if (cctx.topology().stopping()) { - onDone(new CacheStoppedException(cctx.name())); + onDone( + cctx.shared().cache().isCacheRestarting(cctx.name())? 
+ new IgniteCacheRestartingException(cctx.name()): + new CacheStoppedException(cctx.name())); return; } http://git-wip-us.apache.org/repos/asf/ignite/blob/3197f9b4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java index 5a4eb8b..e4178f7 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java @@ -40,6 +40,7 @@ import java.util.HashMap; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReadWriteLock; import java.util.function.Predicate; import java.util.stream.Collectors; import org.apache.ignite.IgniteCheckedException; @@ -69,6 +70,7 @@ import org.apache.ignite.internal.processors.cache.persistence.filename.PdsFolde import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage; import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteCacheSnapshotManager; import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO; +import org.apache.ignite.internal.util.GridStripedReadWriteLock; import org.apache.ignite.internal.util.IgniteUtils; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.CU; @@ -159,6 +161,10 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen /** */ private final Set<Integer> grpsWithoutIdx = Collections.newSetFromMap(new ConcurrentHashMap<Integer, Boolean>()); + /** */ + 
private final GridStripedReadWriteLock initDirLock = + new GridStripedReadWriteLock(Math.max(Runtime.getRuntime().availableProcessors(), 8)); + /** * @param ctx Kernal context. */ @@ -709,69 +715,78 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen private boolean checkAndInitCacheWorkDir(File cacheWorkDir) throws IgniteCheckedException { boolean dirExisted = false; - if (!Files.exists(cacheWorkDir.toPath())) { - try { - Files.createDirectory(cacheWorkDir.toPath()); - } - catch (IOException e) { - throw new IgniteCheckedException("Failed to initialize cache working directory " + - "(failed to create, make sure the work folder has correct permissions): " + - cacheWorkDir.getAbsolutePath(), e); + ReadWriteLock lock = initDirLock.getLock(cacheWorkDir.getName().hashCode()); + + lock.writeLock().lock(); + + try { + if (!Files.exists(cacheWorkDir.toPath())) { + try { + Files.createDirectory(cacheWorkDir.toPath()); + } + catch (IOException e) { + throw new IgniteCheckedException("Failed to initialize cache working directory " + + "(failed to create, make sure the work folder has correct permissions): " + + cacheWorkDir.getAbsolutePath(), e); + } } - } - else { - if (cacheWorkDir.isFile()) - throw new IgniteCheckedException("Failed to initialize cache working directory " + - "(a file with the same name already exists): " + cacheWorkDir.getAbsolutePath()); + else { + if (cacheWorkDir.isFile()) + throw new IgniteCheckedException("Failed to initialize cache working directory " + + "(a file with the same name already exists): " + cacheWorkDir.getAbsolutePath()); - File lockF = new File(cacheWorkDir, IgniteCacheSnapshotManager.SNAPSHOT_RESTORE_STARTED_LOCK_FILENAME); + File lockF = new File(cacheWorkDir, IgniteCacheSnapshotManager.SNAPSHOT_RESTORE_STARTED_LOCK_FILENAME); - Path cacheWorkDirPath = cacheWorkDir.toPath(); + Path cacheWorkDirPath = cacheWorkDir.toPath(); - Path tmp = cacheWorkDirPath.getParent().resolve(cacheWorkDir.getName() + 
TMP_SUFFIX); + Path tmp = cacheWorkDirPath.getParent().resolve(cacheWorkDir.getName() + TMP_SUFFIX); - if (Files.exists(tmp) && Files.isDirectory(tmp) && + if (Files.exists(tmp) && Files.isDirectory(tmp) && Files.exists(tmp.resolve(IgniteCacheSnapshotManager.TEMP_FILES_COMPLETENESS_MARKER))) { - U.warn(log, "Ignite node crashed during the snapshot restore process " + - "(there is a snapshot restore lock file left for cache). But old version of cache was saved. " + - "Trying to restore it. Cache - [" + cacheWorkDir.getAbsolutePath() + ']'); + U.warn(log, "Ignite node crashed during the snapshot restore process " + + "(there is a snapshot restore lock file left for cache). But old version of cache was saved. " + + "Trying to restore it. Cache - [" + cacheWorkDir.getAbsolutePath() + ']'); - U.delete(cacheWorkDir); + U.delete(cacheWorkDir); - try { - Files.move(tmp, cacheWorkDirPath, StandardCopyOption.ATOMIC_MOVE); + try { + Files.move(tmp, cacheWorkDirPath, StandardCopyOption.ATOMIC_MOVE); - cacheWorkDirPath.resolve(IgniteCacheSnapshotManager.TEMP_FILES_COMPLETENESS_MARKER).toFile().delete(); - } - catch (IOException e) { - throw new IgniteCheckedException(e); + cacheWorkDirPath.resolve(IgniteCacheSnapshotManager.TEMP_FILES_COMPLETENESS_MARKER).toFile().delete(); + } + catch (IOException e) { + throw new IgniteCheckedException(e); + } } - } - else if (lockF.exists()) { - U.warn(log, "Ignite node crashed during the snapshot restore process " + - "(there is a snapshot restore lock file left for cache). Will remove both the lock file and " + - "incomplete cache directory [cacheDir=" + cacheWorkDir.getAbsolutePath() + ']'); + else if (lockF.exists()) { + U.warn(log, "Ignite node crashed during the snapshot restore process " + + "(there is a snapshot restore lock file left for cache). 
Will remove both the lock file and " + + "incomplete cache directory [cacheDir=" + cacheWorkDir.getAbsolutePath() + ']'); - boolean deleted = U.delete(cacheWorkDir); + boolean deleted = U.delete(cacheWorkDir); - if (!deleted) - throw new IgniteCheckedException("Failed to remove obsolete cache working directory " + - "(remove the directory manually and make sure the work folder has correct permissions): " + - cacheWorkDir.getAbsolutePath()); + if (!deleted) + throw new IgniteCheckedException("Failed to remove obsolete cache working directory " + + "(remove the directory manually and make sure the work folder has correct permissions): " + + cacheWorkDir.getAbsolutePath()); - cacheWorkDir.mkdirs(); - } - else - dirExisted = true; + cacheWorkDir.mkdirs(); + } + else + dirExisted = true; - if (!cacheWorkDir.exists()) - throw new IgniteCheckedException("Failed to initialize cache working directory " + - "(failed to create, make sure the work folder has correct permissions): " + - cacheWorkDir.getAbsolutePath()); + if (!cacheWorkDir.exists()) + throw new IgniteCheckedException("Failed to initialize cache working directory " + + "(failed to create, make sure the work folder has correct permissions): " + + cacheWorkDir.getAbsolutePath()); - if (Files.exists(tmp)) - U.delete(tmp); + if (Files.exists(tmp)) + U.delete(tmp); + } + } + finally { + lock.writeLock().unlock(); } return dirExisted; http://git-wip-us.apache.org/repos/asf/ignite/blob/3197f9b4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryEx.java index 9e15c8d..a3842a7 100644 --- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryEx.java @@ -37,7 +37,7 @@ public interface PageMemoryEx extends PageMemory { * @param grpId Group ID. * @param pageId Page ID. * @param page Page pointer. - * @param restore Determines if the page is locked for restore. + * @param restore Determines if the page is locked for restore memory (crash recovery). * @return ByteBuffer for modifying the page. */ long writeLock(int grpId, long pageId, long page, boolean restore); http://git-wip-us.apache.org/repos/asf/ignite/blob/3197f9b4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java index 808b7ca..b1e1b02 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java @@ -23,6 +23,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import org.apache.ignite.IgniteCacheRestartingException; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cache.CacheWriteSynchronizationMode; import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException; @@ -148,7 +149,10 @@ public class IgniteTxImplicitSingleStateImpl extends IgniteTxLocalStateAdapter { cacheCtx.topology().readLock(); if (cacheCtx.topology().stopping()) { - fut.onDone(new 
CacheStoppedException(cacheCtx.name())); + fut.onDone( + cctx.cache().isCacheRestarting(cacheCtx.name())? + new IgniteCacheRestartingException(cacheCtx.name()): + new CacheStoppedException(cacheCtx.name())); return null; } http://git-wip-us.apache.org/repos/asf/ignite/blob/3197f9b4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java index 8a00244..9cbea0f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java @@ -24,6 +24,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; +import org.apache.ignite.IgniteCacheRestartingException; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cache.CacheInterceptor; import org.apache.ignite.cache.CacheWriteSynchronizationMode; @@ -293,7 +294,10 @@ public class IgniteTxStateImpl extends IgniteTxLocalStateAdapter { nonLocCtx.topology().readLock(); if (nonLocCtx.topology().stopping()) { - fut.onDone(new CacheStoppedException(nonLocCtx.name())); + fut.onDone( + cctx.cache().isCacheRestarting(nonLocCtx.name())? 
+ new IgniteCacheRestartingException(nonLocCtx.name()): + new CacheStoppedException(nonLocCtx.name())); return null; } http://git-wip-us.apache.org/repos/asf/ignite/blob/3197f9b4/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/AtomicDataStructureProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/AtomicDataStructureProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/AtomicDataStructureProxy.java index 2c336a0..1767234 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/AtomicDataStructureProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/AtomicDataStructureProxy.java @@ -149,7 +149,7 @@ public abstract class AtomicDataStructureProxy<V extends AtomicDataStructureValu * @return Error. */ private IllegalStateException suspendedError() { - throw new IgniteCacheRestartingException(new IgniteFutureImpl<>(suspendFut), "Underlying cache is restarting: " + ctx.name()); + throw new IgniteCacheRestartingException(new IgniteFutureImpl<>(suspendFut), ctx.name()); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/3197f9b4/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java index ea78f6c..cc799fc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java +++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java @@ -704,6 +704,10 @@ public final class DataStructuresProcessor extends GridProcessorAdapter implemen }); } + /** + * Would suspend calls for this cache if it is an atomics cache. + * @param cacheName To suspend. + */ public void suspend(String cacheName) { for (Map.Entry<GridCacheInternalKey, GridCacheRemovable> e : dsMap.entrySet()) { String cacheName0 = ATOMICS_CACHE_NAME + "@" + e.getKey().groupName(); @@ -713,12 +717,24 @@ public final class DataStructuresProcessor extends GridProcessorAdapter implemen } } - public void restart(IgniteInternalCache cache) { + + /** + * Would return this cache to normal work if it was suspended (and if it is an atomics cache). + * @param cacheName To restart. + */ + public void restart(String cacheName, IgniteInternalCache cache) { for (Map.Entry<GridCacheInternalKey, GridCacheRemovable> e : dsMap.entrySet()) { String cacheName0 = ATOMICS_CACHE_NAME + "@" + e.getKey().groupName(); - if (cacheName0.equals(cache.name())) - e.getValue().restart(cache); + if (cacheName0.equals(cacheName)) { + if (cache != null) + e.getValue().restart(cache); + else { + e.getValue().onRemoved(); + + dsMap.remove(e.getKey(), e.getValue()); + } + } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/3197f9b4/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheRemovable.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheRemovable.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheRemovable.java index d26a153..aa66ae0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheRemovable.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheRemovable.java @@ 
-35,7 +35,14 @@ public interface GridCacheRemovable { */ public void needCheckNotRemoved(); + /** + * Would suspend calls for this object. + */ public void suspend(); + /** + * Would return this object to normal work. + * @param cache To update with. + */ public void restart(IgniteInternalCache cache); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/3197f9b4/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetProxy.java index 729f6eb..541ca30 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetProxy.java @@ -365,7 +365,7 @@ public class GridCacheSetProxy<T> implements IgniteSet<T>, Externalizable { if (delegate.separated()) { IgniteInternalFuture<Boolean> fut = cctx.kernalContext().cache().dynamicDestroyCache( - cctx.cache().name(), false, true, false); + cctx.cache().name(), false, true, false, null); ((GridFutureAdapter)fut).ignoreInterrupts(); http://git-wip-us.apache.org/repos/asf/ignite/blob/3197f9b4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java index 6e83bd3..e0b6a89 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java 
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java @@ -19,11 +19,13 @@ package org.apache.ignite.internal.processors.cache; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -47,8 +49,10 @@ import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgnitePredicate; +import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; @@ -189,7 +193,7 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest { GridTestUtils.runMultiThreaded(new Callable<Object>() { @Override public Object call() throws Exception { - futs.add(kernal.context().cache().dynamicDestroyCache(DYNAMIC_CACHE_NAME, false, true, false)); + futs.add(kernal.context().cache().dynamicDestroyCache(DYNAMIC_CACHE_NAME, false, true, false, null)); return null; } @@ -258,7 +262,7 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest { @Override public Object call() throws Exception { IgniteEx kernal = grid(ThreadLocalRandom.current().nextInt(nodeCount())); - 
futs.add(kernal.context().cache().dynamicDestroyCache(DYNAMIC_CACHE_NAME, false, true, false)); + futs.add(kernal.context().cache().dynamicDestroyCache(DYNAMIC_CACHE_NAME, false, true, false, null)); return null; } @@ -1366,4 +1370,62 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest { fut.get(); } + + /** + * @throws Exception If failed. + */ + @Test + public void testCacheRestartIsAllowedOnlyToItsInititator() throws Exception { + IgniteEx kernal = grid(ThreadLocalRandom.current().nextInt(nodeCount())); + + CacheConfiguration ccfg = new CacheConfiguration("testCacheRestartIsAllowedOnlyToItsInititator"); + + kernal.createCache(ccfg); + + IgniteUuid restartId = IgniteUuid.randomUuid(); + + kernal.context().cache().dynamicDestroyCache(ccfg.getName(), false, true, true, restartId) + .get(getTestTimeout(), TimeUnit.MILLISECONDS); + + try { + kernal.createCache(ccfg); + + fail(); + } + catch (Exception e) { + assertTrue(X.hasCause(e, CacheExistsException.class)); + + System.out.println("User couldn't start new cache with the same name"); + } + + try { + kernal.context().cache().dynamicStartCache(ccfg, ccfg.getName(), null, true, false, true).get(); + + fail(); + } + catch (Exception e) { + assertTrue(X.hasCause(e, CacheExistsException.class)); + + System.out.println("We couldn't start new cache with private API"); + } + + StoredCacheData storedCacheData = new StoredCacheData(ccfg); + + try { + kernal.context().cache().dynamicStartCachesByStoredConf(Collections.singleton(storedCacheData), true, true, false, IgniteUuid.randomUuid()).get(); + + fail(); + } + catch (Exception e) { + assertTrue(X.hasCause(e, CacheExistsException.class)); + + System.out.println("We couldn't start new cache with wrong restart id."); + } + + kernal.context().cache().dynamicStartCachesByStoredConf(Collections.singleton(storedCacheData), true, true, false, restartId).get(); + + System.out.println("We successfully restarted cache with initial restartId."); + + 
kernal.destroyCache(ccfg.getName()); + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/3197f9b4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCacheStartStopWithFreqCheckpointTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCacheStartStopWithFreqCheckpointTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCacheStartStopWithFreqCheckpointTest.java index f58e110..b4dae39 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCacheStartStopWithFreqCheckpointTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCacheStartStopWithFreqCheckpointTest.java @@ -134,7 +134,7 @@ public class IgnitePdsCacheStartStopWithFreqCheckpointTest extends GridCommonAbs try { // Stop cache without destroy. - crd.context().cache().dynamicDestroyCaches(cacheNames, false, false, false).get(); + crd.context().cache().dynamicDestroyCaches(cacheNames, false,false).get(); } catch (IgniteCheckedException e) { throw new IgniteException("Failed to destroy cache", e);