Repository: ignite Updated Branches: refs/heads/ignite-zk 74526d19e -> be7ae489b
Minor changes in datastructures - Fixes #3129. Signed-off-by: Alexey Goncharuk <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f08c9d3a Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f08c9d3a Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f08c9d3a Branch: refs/heads/ignite-zk Commit: f08c9d3ada4139df6835188f327a4181a0470b61 Parents: 6101fde Author: Ilya Lantukh <[email protected]> Authored: Mon Dec 4 11:04:41 2017 +0300 Committer: Alexey Goncharuk <[email protected]> Committed: Mon Dec 4 11:04:41 2017 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheProcessor.java | 34 +++- .../AtomicDataStructureProxy.java | 189 +++++++++++++++++++ .../datastructures/DataStructuresProcessor.java | 20 ++ .../datastructures/GridCacheAtomicLongImpl.java | 120 ++---------- .../GridCacheAtomicReferenceImpl.java | 114 ++--------- .../GridCacheAtomicSequenceImpl.java | 113 ++--------- .../GridCacheAtomicStampedImpl.java | 115 ++--------- .../GridCacheCountDownLatchImpl.java | 73 +------ .../datastructures/GridCacheLockImpl.java | 91 ++------- .../datastructures/GridCacheRemovable.java | 6 + .../datastructures/GridCacheSemaphoreImpl.java | 85 ++------- .../AtomicCacheAffinityConfigurationTest.java | 7 +- 12 files changed, 348 insertions(+), 619 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/f08c9d3a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index 99ff4cb..dcc3d13 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -1676,6 +1676,26 @@ public class GridCacheProcessor extends GridProcessorAdapter { } /** + * Gets a collection of currently started public cache names. + * + * @return Collection of currently started public cache names + */ + public Collection<String> publicAndDsCacheNames() { + return F.viewReadOnly(cacheDescriptors().values(), + new IgniteClosure<DynamicCacheDescriptor, String>() { + @Override public String apply(DynamicCacheDescriptor desc) { + return desc.cacheConfiguration().getName(); + } + }, + new IgnitePredicate<DynamicCacheDescriptor>() { + @Override public boolean apply(DynamicCacheDescriptor desc) { + return desc.cacheType().userCache() || desc.cacheType() == CacheType.DATA_STRUCTURES; + } + } + ); + } + + /** * Gets cache mode. * * @param cacheName Cache name to check. @@ -1852,8 +1872,12 @@ public class GridCacheProcessor extends GridProcessorAdapter { IgniteCacheProxyImpl<?, ?> proxy = jCacheProxies.get(ccfg.getName()); - if (!disabledAfterStart && proxy != null && proxy.isRestarting()) + if (!disabledAfterStart && proxy != null && proxy.isRestarting()) { proxy.onRestarted(cacheCtx, cache); + + if (cacheCtx.dataStructuresCache()) + ctx.dataStructures().restart(proxy.internalProxy()); + } } /** @@ -1874,6 +1898,9 @@ public class GridCacheProcessor extends GridProcessorAdapter { caches.get(proxy.getName()).active(true); proxy.onRestarted(cacheCtx, cacheCtx.cache()); + + if (cacheCtx.dataStructuresCache()) + ctx.dataStructures().restart(proxy.internalProxy()); } } } @@ -1977,6 +2004,9 @@ public class GridCacheProcessor extends GridProcessorAdapter { // Break the proxy before exchange future is done. if (req.restart()) { + if (DataStructuresProcessor.isDataStructureCache(req.cacheName())) + ctx.dataStructures().suspend(req.cacheName()); + GridCacheAdapter<?, ?> cache = caches.get(req.cacheName()); if (cache != null) @@ -2668,6 +2698,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { ct = CacheType.UTILITY; else if (internalCaches.contains(ccfg.getName())) ct = CacheType.INTERNAL; + else if (DataStructuresProcessor.isDataStructureCache(ccfg.getName())) + ct = CacheType.DATA_STRUCTURES; else ct = CacheType.USER; } http://git-wip-us.apache.org/repos/asf/ignite/blob/f08c9d3a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/AtomicDataStructureProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/AtomicDataStructureProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/AtomicDataStructureProxy.java new file mode 100644 index 0000000..214672a --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/AtomicDataStructureProxy.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.ignite.internal.processors.datastructures; + +import java.io.Externalizable; +import org.apache.ignite.IgniteCacheRestartingException; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.IgniteInternalCache; +import org.apache.ignite.internal.processors.cluster.IgniteChangeGlobalStateSupport; +import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.future.IgniteFutureImpl; +import org.apache.ignite.internal.util.typedef.internal.U; + +public abstract class AtomicDataStructureProxy<V extends AtomicDataStructureValue> + implements GridCacheRemovable,IgniteChangeGlobalStateSupport { + /** Logger. */ + protected IgniteLogger log; + + /** Removed flag. */ + protected volatile boolean rmvd; + + /** Suspended future. */ + private volatile GridFutureAdapter<Void> suspendFut; + + /** Check removed flag. */ + private boolean rmvCheck; + + /** Structure name. */ + protected String name; + + /** Structure key. */ + protected GridCacheInternalKey key; + + /** Structure projection. */ + protected IgniteInternalCache<GridCacheInternalKey, V> cacheView; + + /** Cache context. */ + protected volatile GridCacheContext<GridCacheInternalKey, V> ctx; + + /** + * Empty constructor required by {@link Externalizable}. + */ + public AtomicDataStructureProxy() { + // No-op. + } + + /** + * Default constructor. + * + * @param name Structure name. + * @param key Structure key. + * @param cacheView Cache projection. + */ + public AtomicDataStructureProxy(String name, + GridCacheInternalKey key, + IgniteInternalCache<GridCacheInternalKey, V> cacheView) + { + assert key != null; + assert cacheView != null; + + this.ctx = cacheView.context(); + this.key = key; + this.cacheView = cacheView; + this.name = name; + + log = ctx.logger(getClass()); + } + + /** {@inheritDoc} */ + public String name() { + return name; + } + + /** {@inheritDoc} */ + public GridCacheInternalKey key() { + return key; + } + + /** {@inheritDoc} */ + public boolean removed() { + return rmvd; + } + + /** + * Check removed status. + * + * @throws IllegalStateException If removed. + */ + protected void checkRemoved() throws IllegalStateException { + if (rmvd) + throw removedError(); + + GridFutureAdapter<Void> suspendFut0 = suspendFut; + + if (suspendFut0 != null && !suspendFut0.isDone()) + throw suspendedError(); + + if (rmvCheck) { + try { + rmvd = cacheView.get(key) == null; + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + + rmvCheck = false; + + if (rmvd) { + ctx.kernalContext().dataStructures().onRemoved(key, this); + + throw removedError(); + } + } + } + + /** + * @return Error. + */ + private IllegalStateException removedError() { + return new IllegalStateException("Sequence was removed from cache: " + name); + } + + /** + * @return Error. + */ + private IllegalStateException suspendedError() { + throw new IgniteCacheRestartingException(new IgniteFutureImpl<>(suspendFut), "Underlying cache is restarting: " + ctx.name()); + } + + /** {@inheritDoc} */ + @Override public boolean onRemoved() { + return rmvd = true; + } + + /** {@inheritDoc} */ + @Override public void needCheckNotRemoved() { + rmvCheck = true; + } + + /** {@inheritDoc} */ + @Override public void suspend() { + suspendFut = new GridFutureAdapter<>(); + } + + /** {@inheritDoc} */ + @Override public void restart(IgniteInternalCache cache) { + invalidateLocalState(); + + cacheView = cache; + ctx = cache.context(); + rmvCheck = true; + suspendFut.onDone(); + } + + /** {@inheritDoc} */ + @Override public void onActivate(GridKernalContext kctx) throws IgniteCheckedException { + this.ctx = kctx.cache().<GridCacheInternalKey, V>context().cacheContext(ctx.cacheId()); + this.cacheView = ctx.cache(); + } + + /** {@inheritDoc} */ + @Override public void onDeActivate(GridKernalContext kctx) { + // No-op. + } + + protected void invalidateLocalState() { + // No-op + } + +} http://git-wip-us.apache.org/repos/asf/ignite/blob/f08c9d3a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java index b26acdd..acd8c11 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.processors.datastructures; +import java.util.Collection; import java.util.HashMap; import java.util.Iterator; import java.util.Map; @@ -73,6 +74,7 @@ import org.apache.ignite.internal.util.typedef.CX1; import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.A; +import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.GPR; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; @@ -663,6 +665,24 @@ public final class DataStructuresProcessor extends GridProcessorAdapter implemen }); } + public void suspend(String cacheName) { + for (Map.Entry<GridCacheInternalKey, GridCacheRemovable> e : dsMap.entrySet()) { + String cacheName0 = ATOMICS_CACHE_NAME + "@" + e.getKey().groupName(); + + if (cacheName0.equals(cacheName)) + e.getValue().suspend(); + } + } + + public void restart(IgniteInternalCache cache) { + for (Map.Entry<GridCacheInternalKey, GridCacheRemovable> e : dsMap.entrySet()) { + String cacheName0 = ATOMICS_CACHE_NAME + "@" + e.getKey().groupName(); + + if (cacheName0.equals(cache.name())) + e.getValue().restart(cache); + } + } + /** * Gets an atomic reference from cache or creates one if it's not cached. * http://git-wip-us.apache.org/repos/asf/ignite/blob/f08c9d3a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java index 0bc0c63..8e6f913 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java @@ -40,7 +40,8 @@ import org.apache.ignite.lang.IgniteBiTuple; /** * Cache atomic long implementation. */ -public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, IgniteChangeGlobalStateSupport, Externalizable { +public final class GridCacheAtomicLongImpl extends AtomicDataStructureProxy<GridCacheAtomicLongValue> + implements GridCacheAtomicLongEx, IgniteChangeGlobalStateSupport, Externalizable { /** */ private static final long serialVersionUID = 0L; @@ -52,24 +53,6 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ign } }; - /** Atomic long name. */ - private String name; - - /** Removed flag.*/ - private volatile boolean rmvd; - - /** Check removed flag. */ - private boolean rmvCheck; - - /** Atomic long key. */ - private GridCacheInternalKey key; - - /** Atomic long projection. */ - private IgniteInternalCache<GridCacheInternalKey, GridCacheAtomicLongValue> atomicView; - - /** Cache context. */ - private GridCacheContext<GridCacheInternalKey, GridCacheAtomicLongValue> ctx; - /** * Empty constructor required by {@link Externalizable}. */ @@ -87,19 +70,7 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ign public GridCacheAtomicLongImpl(String name, GridCacheInternalKey key, IgniteInternalCache<GridCacheInternalKey, GridCacheAtomicLongValue> atomicView) { - assert key != null; - assert atomicView != null; - assert name != null; - - this.ctx = atomicView.context(); - this.key = key; - this.atomicView = atomicView; - this.name = name; - } - - /** {@inheritDoc} */ - @Override public String name() { - return name; + super(name, key, atomicView); } /** {@inheritDoc} */ @@ -107,7 +78,7 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ign checkRemoved(); try { - GridCacheAtomicLongValue val = atomicView.get(key); + GridCacheAtomicLongValue val = cacheView.get(key); if (val == null) throw new IgniteException("Failed to find atomic long: " + name); @@ -124,7 +95,7 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ign checkRemoved(); try{ - EntryProcessorResult<Long> res = atomicView.invoke(key, IncrementAndGetProcessor.INSTANCE); + EntryProcessorResult<Long> res = cacheView.invoke(key, IncrementAndGetProcessor.INSTANCE); assert res != null && res.get() != null : res; @@ -143,7 +114,7 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ign checkRemoved(); try { - EntryProcessorResult<Long> res = atomicView.invoke(key, GetAndIncrementProcessor.INSTANCE); + EntryProcessorResult<Long> res = cacheView.invoke(key, GetAndIncrementProcessor.INSTANCE); assert res != null && res.get() != null : res; @@ -162,7 +133,7 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ign checkRemoved(); try { - EntryProcessorResult<Long> res = atomicView.invoke(key, new AddAndGetProcessor(l)); + EntryProcessorResult<Long> res = cacheView.invoke(key, new AddAndGetProcessor(l)); assert res != null && res.get() != null : res; @@ -181,7 +152,7 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ign checkRemoved(); try { - EntryProcessorResult<Long> res = atomicView.invoke(key, new GetAndAddProcessor(l)); + EntryProcessorResult<Long> res = cacheView.invoke(key, new GetAndAddProcessor(l)); assert res != null && res.get() != null : res; @@ -200,7 +171,7 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ign checkRemoved(); try { - EntryProcessorResult<Long> res = atomicView.invoke(key, DecrementAndGetProcessor.INSTANCE); + EntryProcessorResult<Long> res = cacheView.invoke(key, DecrementAndGetProcessor.INSTANCE); assert res != null && res.get() != null : res; @@ -219,7 +190,7 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ign checkRemoved(); try { - EntryProcessorResult<Long> res = atomicView.invoke(key, GetAndDecrementProcessor.INSTANCE); + EntryProcessorResult<Long> res = cacheView.invoke(key, GetAndDecrementProcessor.INSTANCE); assert res != null && res.get() != null : res; @@ -238,7 +209,7 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ign checkRemoved(); try { - EntryProcessorResult<Long> res = atomicView.invoke(key, new GetAndSetProcessor(l)); + EntryProcessorResult<Long> res = cacheView.invoke(key, new GetAndSetProcessor(l)); assert res != null && res.get() != null : res; @@ -257,7 +228,7 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ign checkRemoved(); try { - EntryProcessorResult<Long> res = atomicView.invoke(key, new CompareAndSetProcessor(expVal, newVal)); + EntryProcessorResult<Long> res = cacheView.invoke(key, new CompareAndSetProcessor(expVal, newVal)); assert res != null && res.get() != null : res; @@ -280,7 +251,7 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ign checkRemoved(); try { - EntryProcessorResult<Long> res = atomicView.invoke(key, new CompareAndSetProcessor(expVal, newVal)); + EntryProcessorResult<Long> res = cacheView.invoke(key, new CompareAndSetProcessor(expVal, newVal)); assert res != null && res.get() != null : res; @@ -294,60 +265,6 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ign } } - /** - * Check removed flag. - * - * @throws IllegalStateException If removed. - */ - private void checkRemoved() throws IllegalStateException { - if (rmvd) - throw removedError(); - - if (rmvCheck) { - try { - rmvd = atomicView.get(key) == null; - } - catch (IgniteCheckedException e) { - throw U.convertException(e); - } - - rmvCheck = false; - - if (rmvd) { - ctx.kernalContext().dataStructures().onRemoved(key, this); - - throw removedError(); - } - } - } - - /** - * @return Error. - */ - private IllegalStateException removedError() { - return new IllegalStateException("Atomic long was removed from cache: " + name); - } - - /** {@inheritDoc} */ - @Override public boolean onRemoved() { - return rmvd = true; - } - - /** {@inheritDoc} */ - @Override public void needCheckNotRemoved() { - rmvCheck = true; - } - - /** {@inheritDoc} */ - @Override public GridCacheInternalKey key() { - return key; - } - - /** {@inheritDoc} */ - @Override public boolean removed() { - return rmvd; - } - /** {@inheritDoc} */ @Override public void close() { if (rmvd) @@ -362,17 +279,6 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ign } /** {@inheritDoc} */ - @Override public void onActivate(GridKernalContext kctx) throws IgniteCheckedException { - this.ctx = kctx.cache().<GridCacheInternalKey, GridCacheAtomicLongValue>context().cacheContext(ctx.cacheId()); - this.atomicView = ctx.cache(); - } - - /** {@inheritDoc} */ - @Override public void onDeActivate(GridKernalContext kctx) { - // No-op. - } - - /** {@inheritDoc} */ @Override public void writeExternal(ObjectOutput out) throws IOException { out.writeObject(ctx.kernalContext()); out.writeUTF(name); http://git-wip-us.apache.org/repos/asf/ignite/blob/f08c9d3a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java index 42f16f2..df126d9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java @@ -31,7 +31,6 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.cache.CacheEntryProcessor; import org.apache.ignite.internal.GridKernalContext; -import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.IgniteInternalCache; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; import org.apache.ignite.internal.processors.cluster.IgniteChangeGlobalStateSupport; @@ -47,7 +46,7 @@ import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_REA /** * Cache atomic reference implementation. */ -public final class GridCacheAtomicReferenceImpl<T> implements GridCacheAtomicReferenceEx<T>, +public final class GridCacheAtomicReferenceImpl<T> extends AtomicDataStructureProxy<GridCacheAtomicReferenceValue<T>> implements GridCacheAtomicReferenceEx<T>, IgniteChangeGlobalStateSupport, Externalizable { /** */ private static final long serialVersionUID = 0L; @@ -60,24 +59,6 @@ public final class GridCacheAtomicReferenceImpl<T> implements GridCacheAtomicRef } }; - /** Atomic reference name. */ - private String name; - - /** Status.*/ - private volatile boolean rmvd; - - /** Check removed flag. */ - private boolean rmvCheck; - - /** Atomic reference key. */ - private GridCacheInternalKey key; - - /** Atomic reference projection. */ - private IgniteInternalCache<GridCacheInternalKey, GridCacheAtomicReferenceValue<T>> atomicView; - - /** Cache context. */ - private GridCacheContext<GridCacheInternalKey, GridCacheAtomicReferenceValue<T>> ctx; - /** * Empty constructor required by {@link Externalizable}. */ @@ -95,14 +76,7 @@ public final class GridCacheAtomicReferenceImpl<T> implements GridCacheAtomicRef public GridCacheAtomicReferenceImpl(String name, GridCacheInternalKey key, IgniteInternalCache<GridCacheInternalKey, GridCacheAtomicReferenceValue<T>> atomicView) { - assert key != null; - assert atomicView != null; - assert name != null; - - this.ctx = atomicView.context(); - this.key = key; - this.atomicView = atomicView; - this.name = name; + super(name, key, atomicView); } /** {@inheritDoc} */ @@ -115,7 +89,7 @@ public final class GridCacheAtomicReferenceImpl<T> implements GridCacheAtomicRef checkRemoved(); try { - GridCacheAtomicReferenceValue<T> ref = atomicView.get(key); + GridCacheAtomicReferenceValue<T> ref = cacheView.get(key); if (ref == null) throw new IgniteCheckedException("Failed to find atomic reference with given name: " + name); @@ -133,17 +107,17 @@ public final class GridCacheAtomicReferenceImpl<T> implements GridCacheAtomicRef try { if (ctx.dataStructures().knownType(val)) - atomicView.invoke(key, new ReferenceSetEntryProcessor<>(val)); + cacheView.invoke(key, new ReferenceSetEntryProcessor<>(val)); else { CU.retryTopologySafe(new Callable<Void>() { @Override public Void call() throws Exception { - try (GridNearTxLocal tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) { - GridCacheAtomicReferenceValue<T> ref = atomicView.get(key); + try (GridNearTxLocal tx = CU.txStartInternal(ctx, cacheView, PESSIMISTIC, REPEATABLE_READ)) { + GridCacheAtomicReferenceValue<T> ref = cacheView.get(key); if (ref == null) throw new IgniteException("Failed to find atomic reference with given name: " + name); - atomicView.put(key, new GridCacheAtomicReferenceValue<>(val)); + cacheView.put(key, new GridCacheAtomicReferenceValue<>(val)); tx.commit(); } @@ -168,7 +142,7 @@ public final class GridCacheAtomicReferenceImpl<T> implements GridCacheAtomicRef try { if (ctx.dataStructures().knownType(expVal) && ctx.dataStructures().knownType(newVal)) { EntryProcessorResult<Boolean> res = - atomicView.invoke(key, new ReferenceCompareAndSetEntryProcessor<>(expVal, newVal)); + cacheView.invoke(key, new ReferenceCompareAndSetEntryProcessor<>(expVal, newVal)); assert res != null && res.get() != null : res; @@ -177,8 +151,8 @@ public final class GridCacheAtomicReferenceImpl<T> implements GridCacheAtomicRef else { return CU.retryTopologySafe(new Callable<Boolean>() { @Override public Boolean call() throws Exception { - try (GridNearTxLocal tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) { - GridCacheAtomicReferenceValue<T> ref = atomicView.get(key); + try (GridNearTxLocal tx = CU.txStartInternal(ctx, cacheView, PESSIMISTIC, REPEATABLE_READ)) { + GridCacheAtomicReferenceValue<T> ref = cacheView.get(key); if (ref == null) throw new IgniteException("Failed to find atomic reference with given name: " + name); @@ -188,7 +162,7 @@ public final class GridCacheAtomicReferenceImpl<T> implements GridCacheAtomicRef if (!F.eq(expVal, curVal)) return false; else { - atomicView.put(key, new GridCacheAtomicReferenceValue<>(newVal)); + cacheView.put(key, new GridCacheAtomicReferenceValue<>(newVal)); tx.commit(); @@ -220,7 +194,7 @@ public final class GridCacheAtomicReferenceImpl<T> implements GridCacheAtomicRef try { if (ctx.dataStructures().knownType(expVal) && ctx.dataStructures().knownType(newVal)) { EntryProcessorResult<T> res = - atomicView.invoke(key, new ReferenceCompareAndSetAndGetEntryProcessor<T>(expVal, newVal)); + cacheView.invoke(key, new ReferenceCompareAndSetAndGetEntryProcessor<T>(expVal, newVal)); assert res != null; @@ -229,8 +203,8 @@ public final class GridCacheAtomicReferenceImpl<T> implements GridCacheAtomicRef else { return CU.retryTopologySafe(new Callable<T>() { @Override public T call() throws Exception { - try (GridNearTxLocal tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) { - GridCacheAtomicReferenceValue<T> ref = atomicView.get(key); + try (GridNearTxLocal tx = CU.txStartInternal(ctx, cacheView, PESSIMISTIC, REPEATABLE_READ)) { + GridCacheAtomicReferenceValue<T> ref = cacheView.get(key); if (ref == null) throw new IgniteException("Failed to find atomic reference with given name: " + name); @@ -240,7 +214,7 @@ public final class GridCacheAtomicReferenceImpl<T> implements GridCacheAtomicRef if (!F.eq(expVal, curVal)) return curVal; else { - atomicView.put(key, new GridCacheAtomicReferenceValue<>(newVal)); + cacheView.put(key, new GridCacheAtomicReferenceValue<>(newVal)); tx.commit(); @@ -260,26 +234,6 @@ public final class GridCacheAtomicReferenceImpl<T> implements GridCacheAtomicRef } /** {@inheritDoc} */ - @Override public boolean onRemoved() { - return rmvd = true; - } - - /** {@inheritDoc} */ - @Override public void needCheckNotRemoved() { - rmvCheck = true; - } - - /** {@inheritDoc} */ - @Override public GridCacheInternalKey key() { - return key; - } - - /** {@inheritDoc} */ - @Override public boolean removed() { - return rmvd; - } - - /** {@inheritDoc} */ @Override public void close() { if (rmvd) return; @@ -292,44 +246,6 @@ public final class GridCacheAtomicReferenceImpl<T> implements GridCacheAtomicRef } } - /** {@inheritDoc} */ - @Override public void onActivate(GridKernalContext kctx) throws IgniteCheckedException { - this.ctx = kctx.cache().<GridCacheInternalKey, GridCacheAtomicReferenceValue<T>>context().cacheContext(ctx.cacheId()); - this.atomicView = ctx.cache(); - } - - /** {@inheritDoc} */ - @Override public void onDeActivate(GridKernalContext kctx) { - // No-op. - } - - /** - * Check removed status. - * - * @throws IllegalStateException If removed. - */ - private void checkRemoved() throws IllegalStateException { - if (rmvd) - throw removedError(); - - if (rmvCheck) { - try { - rmvd = atomicView.get(key) == null; - } - catch (IgniteCheckedException e) { - throw U.convertException(e); - } - - rmvCheck = false; - - if (rmvd) { - ctx.kernalContext().dataStructures().onRemoved(key, this); - - throw removedError(); - } - } - } - /** * @return Error. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/f08c9d3a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java index 019de3c..fd4db4a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java @@ -26,6 +26,7 @@ import java.io.ObjectStreamException; import java.util.concurrent.Callable; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import org.apache.ignite.IgniteCacheRestartingException; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; @@ -34,6 +35,8 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.IgniteInternalCache; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; +import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.future.IgniteFutureImpl; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.A; import org.apache.ignite.internal.util.typedef.internal.CU; @@ -49,7 +52,8 @@ import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_REA /** * Cache sequence implementation. */ -public final class GridCacheAtomicSequenceImpl implements GridCacheAtomicSequenceEx, IgniteChangeGlobalStateSupport, Externalizable { +public final class GridCacheAtomicSequenceImpl extends AtomicDataStructureProxy<GridCacheAtomicSequenceValue> + implements GridCacheAtomicSequenceEx, IgniteChangeGlobalStateSupport, Externalizable { /** */ private static final long serialVersionUID = 0L; @@ -61,27 +65,6 @@ public final class GridCacheAtomicSequenceImpl implements GridCacheAtomicSequenc } }; - /** Logger. */ - private IgniteLogger log; - - /** Sequence name. */ - private String name; - - /** Removed flag. */ - private volatile boolean rmvd; - - /** Check removed flag. */ - private boolean rmvCheck; - - /** Sequence key. */ - private GridCacheInternalKey key; - - /** Sequence projection. */ - private IgniteInternalCache<GridCacheInternalKey, GridCacheAtomicSequenceValue> seqView; - - /** Cache context. */ - private volatile GridCacheContext<GridCacheInternalKey, GridCacheAtomicSequenceValue> ctx; - /** Local value of sequence. */ @GridToStringInclude(sensitive = true) private volatile long locVal; @@ -131,24 +114,13 @@ public final class GridCacheAtomicSequenceImpl implements GridCacheAtomicSequenc long locVal, long upBound) { - assert key != null; - assert seqView != null; + super(name, key, seqView); + assert locVal <= upBound; this.batchSize = batchSize; - this.ctx = seqView.context(); - this.key = key; - this.seqView = seqView; this.upBound = upBound; this.locVal = locVal; - this.name = name; - - log = ctx.logger(getClass()); - } - - /** {@inheritDoc} */ - @Override public String name() { - return name; } /** {@inheritDoc} */ @@ -291,58 +263,10 @@ public final class GridCacheAtomicSequenceImpl implements GridCacheAtomicSequenc } } - /** - * Check removed status. - * - * @throws IllegalStateException If removed. - */ - private void checkRemoved() throws IllegalStateException { - if (rmvd) - throw removedError(); - - if (rmvCheck) { - try { - rmvd = seqView.get(key) == null; - } - catch (IgniteCheckedException e) { - throw U.convertException(e); - } - - rmvCheck = false; - - if (rmvd) { - ctx.kernalContext().dataStructures().onRemoved(key, this); - - throw removedError(); - } - } - } - - /** - * @return Error. - */ - private IllegalStateException removedError() { - return new IllegalStateException("Sequence was removed from cache: " + name); - } - - /** {@inheritDoc} */ - @Override public boolean onRemoved() { - return rmvd = true; - } - /** {@inheritDoc} */ - @Override public void needCheckNotRemoved() { - rmvCheck = true; - } - - /** {@inheritDoc} */ - @Override public GridCacheInternalKey key() { - return key; - } - - /** {@inheritDoc} */ - @Override public boolean removed() { - return rmvd; + @Override protected void invalidateLocalState() { + locVal = 0; + upBound = -1; } /** {@inheritDoc} */ @@ -371,8 +295,8 @@ public final class GridCacheAtomicSequenceImpl implements GridCacheAtomicSequenc @Override public Long call() throws Exception { assert distUpdateFreeTop.isHeldByCurrentThread() || distUpdateLockedTop.isHeldByCurrentThread(); - try (GridNearTxLocal tx = CU.txStartInternal(ctx, seqView, PESSIMISTIC, REPEATABLE_READ)) { - GridCacheAtomicSequenceValue seq = seqView.get(key); + try (GridNearTxLocal tx = CU.txStartInternal(ctx, cacheView, PESSIMISTIC, REPEATABLE_READ)) { + GridCacheAtomicSequenceValue seq = cacheView.get(key); checkRemoved(); @@ -428,7 +352,7 @@ public final class GridCacheAtomicSequenceImpl implements GridCacheAtomicSequenc // Global counter must be more than reserved upper bound. seq.set(newUpBound + 1); - seqView.put(key, seq); + cacheView.put(key, seq); tx.commit(); @@ -444,17 +368,6 @@ public final class GridCacheAtomicSequenceImpl implements GridCacheAtomicSequenc } /** {@inheritDoc} */ - @Override public void onActivate(GridKernalContext kctx) { - ctx = kctx.cache().<GridCacheInternalKey, GridCacheAtomicSequenceValue>context().cacheContext(ctx.cacheId()); - seqView = ctx.cache(); - } - - /** {@inheritDoc} */ - @Override public void onDeActivate(GridKernalContext kctx) { - // No-op. - } - - /** {@inheritDoc} */ @Override public void writeExternal(ObjectOutput out) throws IOException { out.writeObject(ctx.kernalContext()); out.writeUTF(name); http://git-wip-us.apache.org/repos/asf/ignite/blob/f08c9d3a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java index ed7a225..70f3b48 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java @@ -47,7 +47,8 @@ import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_REA /** * Cache atomic stamped implementation. */ -public final class GridCacheAtomicStampedImpl<T, S> implements GridCacheAtomicStampedEx<T, S>, IgniteChangeGlobalStateSupport, Externalizable { +public final class GridCacheAtomicStampedImpl<T, S> extends AtomicDataStructureProxy<GridCacheAtomicStampedValue<T, S>> + implements GridCacheAtomicStampedEx<T, S>, IgniteChangeGlobalStateSupport, Externalizable { /** */ private static final long serialVersionUID = 0L; @@ -59,24 +60,6 @@ public final class GridCacheAtomicStampedImpl<T, S> implements GridCacheAtomicSt } }; - /** Atomic stamped name. */ - private String name; - - /** Removed flag.*/ - private volatile boolean rmvd; - - /** Check removed flag. */ - private boolean rmvCheck; - - /** Atomic stamped key. */ - private GridCacheInternalKey key; - - /** Atomic stamped projection. */ - private IgniteInternalCache<GridCacheInternalKey, GridCacheAtomicStampedValue<T, S>> atomicView; - - /** Cache context. */ - private GridCacheContext<GridCacheInternalKey, GridCacheAtomicStampedValue<T, S>> ctx; - /** * Empty constructor required by {@link Externalizable}. */ @@ -94,19 +77,7 @@ public final class GridCacheAtomicStampedImpl<T, S> implements GridCacheAtomicSt public GridCacheAtomicStampedImpl(String name, GridCacheInternalKey key, IgniteInternalCache<GridCacheInternalKey, GridCacheAtomicStampedValue<T, S>> atomicView) { - assert key != null; - assert atomicView != null; - assert name != null; - - this.ctx = atomicView.context(); - this.key = key; - this.atomicView = atomicView; - this.name = name; - } - - /** {@inheritDoc} */ - @Override public String name() { - return name; + super(name, key, atomicView); } /** {@inheritDoc} */ @@ -114,7 +85,7 @@ public final class GridCacheAtomicStampedImpl<T, S> implements GridCacheAtomicSt checkRemoved(); try { - GridCacheAtomicStampedValue<T, S> stmp = atomicView.get(key); + GridCacheAtomicStampedValue<T, S> stmp = cacheView.get(key); if (stmp == null) throw new IgniteCheckedException("Failed to find atomic stamped with given name: " + name); @@ -132,17 +103,17 @@ public final class GridCacheAtomicStampedImpl<T, S> implements GridCacheAtomicSt try { if (ctx.dataStructures().knownType(val) && ctx.dataStructures().knownType(stamp)) - atomicView.invoke(key, new StampedSetEntryProcessor<>(val, stamp)); + cacheView.invoke(key, new StampedSetEntryProcessor<>(val, stamp)); else { CU.retryTopologySafe(new Callable<Void>() { @Override public Void call() throws Exception { - try (GridNearTxLocal tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) { - GridCacheAtomicStampedValue<T, S> ref = atomicView.get(key); + try (GridNearTxLocal tx = CU.txStartInternal(ctx, cacheView, PESSIMISTIC, REPEATABLE_READ)) { + GridCacheAtomicStampedValue<T, S> ref = cacheView.get(key); if (ref == null) throw new IgniteException("Failed to find atomic stamped with given name: " + name); - atomicView.put(key, new GridCacheAtomicStampedValue<>(val, stamp)); + cacheView.put(key, new GridCacheAtomicStampedValue<>(val, stamp)); tx.commit(); } @@ -170,7 +141,7 @@ public final class GridCacheAtomicStampedImpl<T, S> implements GridCacheAtomicSt ctx.dataStructures().knownType(expStamp) && ctx.dataStructures().knownType(newStamp)) { EntryProcessorResult<Boolean> res = - atomicView.invoke(key, new StampedCompareAndSetEntryProcessor<>(expVal, expStamp, newVal, newStamp)); + cacheView.invoke(key, new StampedCompareAndSetEntryProcessor<>(expVal, expStamp, newVal, newStamp)); assert res != null && res.get() != null : res; @@ -179,14 +150,14 @@ public final class GridCacheAtomicStampedImpl<T, S> implements GridCacheAtomicSt else { return CU.retryTopologySafe(new Callable<Boolean>() { @Override public Boolean call() throws Exception { - try (GridNearTxLocal tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) { - GridCacheAtomicStampedValue<T, S> val = atomicView.get(key); + try (GridNearTxLocal tx = CU.txStartInternal(ctx, cacheView, PESSIMISTIC, REPEATABLE_READ)) { + GridCacheAtomicStampedValue<T, S> val = cacheView.get(key); if (val == null) throw new IgniteException("Failed to find atomic stamped with given name: " + name); if (F.eq(expVal, val.value()) && F.eq(expStamp, val.stamp())) { - atomicView.put(key, new GridCacheAtomicStampedValue<>(newVal, newStamp)); + cacheView.put(key, new GridCacheAtomicStampedValue<>(newVal, newStamp)); tx.commit(); @@ -212,7 +183,7 @@ public final class GridCacheAtomicStampedImpl<T, S> implements GridCacheAtomicSt checkRemoved(); try { - GridCacheAtomicStampedValue<T, S> stmp = atomicView.get(key); + GridCacheAtomicStampedValue<T, S> stmp = cacheView.get(key); if (stmp == null) throw new IgniteCheckedException("Failed to find atomic stamped with given name: " + name); @@ -229,7 +200,7 @@ public final class GridCacheAtomicStampedImpl<T, S> implements GridCacheAtomicSt checkRemoved(); try { - GridCacheAtomicStampedValue<T, S> stmp = atomicView.get(key); + GridCacheAtomicStampedValue<T, S> stmp = cacheView.get(key); if (stmp == null) throw new IgniteCheckedException("Failed to find atomic stamped with given name: " + name); @@ -242,26 +213,6 @@ public final class GridCacheAtomicStampedImpl<T, S> implements GridCacheAtomicSt } /** {@inheritDoc} */ - @Override public boolean onRemoved() { - return rmvd = true; - } - - /** {@inheritDoc} */ - @Override public void needCheckNotRemoved() { - rmvCheck = true; - } - - /** {@inheritDoc} */ - @Override public GridCacheInternalKey key() { - return key; - } - - /** {@inheritDoc} */ - @Override public boolean removed() { - return rmvd; - } - - /** {@inheritDoc} */ @Override public void close() { if (rmvd) return; @@ -310,44 +261,6 @@ public final class GridCacheAtomicStampedImpl<T, S> implements GridCacheAtomicSt } /** - * Check removed status. - * - * @throws IllegalStateException If removed. - */ - private void checkRemoved() throws IllegalStateException { - if (rmvd) - throw removedError(); - - if (rmvCheck) { - try { - rmvd = atomicView.get(key) == null; - } - catch (IgniteCheckedException e) { - throw U.convertException(e); - } - - rmvCheck = false; - - if (rmvd) { - ctx.kernalContext().dataStructures().onRemoved(key, this); - - throw removedError(); - } - } - } - - /** {@inheritDoc} */ - @Override public void onActivate(GridKernalContext kctx) throws IgniteCheckedException { - this.ctx = kctx.cache().<GridCacheInternalKey, GridCacheAtomicStampedValue<T, S>>context().cacheContext(ctx.cacheId()); - this.atomicView = ctx.cache(); - } - - /** {@inheritDoc} */ - @Override public void onDeActivate(GridKernalContext kctx) { - // No-op. - } - - /** * @return Error. */ private IllegalStateException removedError() { http://git-wip-us.apache.org/repos/asf/ignite/blob/f08c9d3a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java index 7f331c3..72311c6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java @@ -47,7 +47,8 @@ import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_REA /** * Cache count down latch implementation. */ -public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatchEx, IgniteChangeGlobalStateSupport, Externalizable { +public final class GridCacheCountDownLatchImpl extends AtomicDataStructureProxy<GridCacheCountDownLatchValue> + implements GridCacheCountDownLatchEx, IgniteChangeGlobalStateSupport, Externalizable { /** */ private static final long serialVersionUID = 0L; @@ -68,24 +69,6 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc } }; - /** Logger. */ - private IgniteLogger log; - - /** Latch name. */ - private String name; - - /** Removed flag.*/ - private volatile boolean rmvd; - - /** Latch key. */ - private GridCacheInternalKey key; - - /** Latch projection. */ - private IgniteInternalCache<GridCacheInternalKey, GridCacheCountDownLatchValue> latchView; - - /** Cache context. */ - private GridCacheContext<GridCacheInternalKey, GridCacheCountDownLatchValue> ctx; - /** Initial count. */ private int initCnt; @@ -126,30 +109,20 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc GridCacheInternalKey key, IgniteInternalCache<GridCacheInternalKey, GridCacheCountDownLatchValue> latchView) { + super(name, key, latchView); + assert name != null; - assert initCnt >= 0; assert key != null; assert latchView != null; - this.name = name; this.initCnt = initCnt; this.autoDel = autoDel; - this.key = key; - this.latchView = latchView; - this.ctx = latchView.context(); - - log = ctx.logger(getClass()); - } - - /** {@inheritDoc} */ - @Override public String name() { - return name; } /** {@inheritDoc} */ @Override public int count() { try { - GridCacheCountDownLatchValue latchVal = latchView.get(key); + GridCacheCountDownLatchValue latchVal = cacheView.get(key); return latchVal == null ? 0 : latchVal.get(); } @@ -225,26 +198,11 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc } /** {@inheritDoc} */ - @Override public boolean onRemoved() { - return rmvd = true; - } - - /** {@inheritDoc} */ @Override public void needCheckNotRemoved() { // No-op. } /** {@inheritDoc} */ - @Override public GridCacheInternalKey key() { - return key; - } - - /** {@inheritDoc} */ - @Override public boolean removed() { - return rmvd; - } - - /** {@inheritDoc} */ @Override public void onUpdate(int cnt) { assert cnt >= 0; @@ -280,8 +238,8 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc try { internalLatch = retryTopologySafe(new Callable<CountDownLatch>() { @Override public CountDownLatch call() throws Exception { - try (GridNearTxLocal tx = CU.txStartInternal(ctx, latchView, PESSIMISTIC, REPEATABLE_READ)) { - GridCacheCountDownLatchValue val = latchView.get(key); + try (GridNearTxLocal tx = CU.txStartInternal(ctx, cacheView, PESSIMISTIC, REPEATABLE_READ)) { + GridCacheCountDownLatchValue val = cacheView.get(key); if (val == null) { if (log.isDebugEnabled()) @@ -334,17 +292,6 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc } /** {@inheritDoc} */ - @Override public void onActivate(GridKernalContext kctx) throws IgniteCheckedException { - this.ctx = kctx.cache().<GridCacheInternalKey, GridCacheCountDownLatchValue>context().cacheContext(ctx.cacheId()); - this.latchView = ctx.cache(); - } - - /** {@inheritDoc} */ - @Override public void onDeActivate(GridKernalContext kctx) { - // No-op. - } - - /** {@inheritDoc} */ @Override public void writeExternal(ObjectOutput out) throws IOException { out.writeObject(ctx.kernalContext()); out.writeUTF(name); @@ -402,8 +349,8 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc /** {@inheritDoc} */ @Override public Integer call() throws Exception { - try (GridNearTxLocal tx = CU.txStartInternal(ctx, latchView, PESSIMISTIC, REPEATABLE_READ)) { - GridCacheCountDownLatchValue latchVal = latchView.get(key); + try (GridNearTxLocal tx = CU.txStartInternal(ctx, cacheView, PESSIMISTIC, REPEATABLE_READ)) { + GridCacheCountDownLatchValue latchVal = cacheView.get(key); if (latchVal == null) { if (log.isDebugEnabled()) @@ -425,7 +372,7 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc latchVal.set(retVal); - latchView.put(key, latchVal); + cacheView.put(key, latchVal); tx.commit(); http://git-wip-us.apache.org/repos/asf/ignite/blob/f08c9d3a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockImpl.java index fac7eaf..f677ff5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockImpl.java @@ -67,31 +67,14 @@ import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_REA /** * Cache reentrant lock implementation based on AbstractQueuedSynchronizer. */ -public final class GridCacheLockImpl implements GridCacheLockEx, IgniteChangeGlobalStateSupport, Externalizable { +public final class GridCacheLockImpl extends AtomicDataStructureProxy<GridCacheLockState> + implements GridCacheLockEx, IgniteChangeGlobalStateSupport, Externalizable { /** */ private static final long serialVersionUID = 0L; /** Deserialization stash. */ private static final ThreadLocal<String> stash = new ThreadLocal<>(); - /** Logger. */ - private IgniteLogger log; - - /** Reentrant lock name. */ - private String name; - - /** Removed flag. */ - private volatile boolean rmvd; - - /** Reentrant lock key. */ - private GridCacheInternalKey key; - - /** Reentrant lock projection. */ - private IgniteInternalCache<GridCacheInternalKey, GridCacheLockState> lockView; - - /** Cache context. */ - private GridCacheContext<GridCacheInternalKey, GridCacheLockState> ctx; - /** Initialization guard. */ private final AtomicBoolean initGuard = new AtomicBoolean(); @@ -513,7 +496,7 @@ public final class GridCacheLockImpl implements GridCacheLockEx, IgniteChangeGlo } final boolean isLocked() throws IgniteCheckedException { - return getState() != 0 || lockView.get(key).get() != 0; + return getState() != 0 || cacheView.get(key).get() != 0; } /** @@ -524,8 +507,8 @@ public final class GridCacheLockImpl implements GridCacheLockEx, IgniteChangeGlo try { return retryTopologySafe(new Callable<Boolean>() { @Override public Boolean call() throws Exception { - try (GridNearTxLocal tx = CU.txStartInternal(ctx, lockView, PESSIMISTIC, REPEATABLE_READ)) { - GridCacheLockState val = lockView.get(key); + try (GridNearTxLocal tx = CU.txStartInternal(ctx, cacheView, PESSIMISTIC, REPEATABLE_READ)) { + GridCacheLockState val = cacheView.get(key); if (val == null) throw new IgniteCheckedException("Failed to find reentrant lock with given name: " + name); @@ -555,7 +538,7 @@ public final class GridCacheLockImpl implements GridCacheLockEx, IgniteChangeGlo val.setChanged(true); - lockView.put(key, val); + cacheView.put(key, val); tx.commit(); @@ -609,8 +592,8 @@ public final class GridCacheLockImpl implements GridCacheLockEx, IgniteChangeGlo try { return retryTopologySafe(new Callable<Boolean>() { @Override public Boolean call() throws Exception { - try (GridNearTxLocal tx = CU.txStartInternal(ctx, lockView, PESSIMISTIC, REPEATABLE_READ)) { - GridCacheLockState val = lockView.get(key); + try (GridNearTxLocal tx = CU.txStartInternal(ctx, cacheView, PESSIMISTIC, REPEATABLE_READ)) { + GridCacheLockState val = cacheView.get(key); if (val == null) throw new IgniteCheckedException("Failed to find reentrant lock with given name: " + name); @@ -622,7 +605,7 @@ public final class GridCacheLockImpl implements GridCacheLockEx, IgniteChangeGlo val.setChanged(false); - lockView.put(key, val); + cacheView.put(key, val); tx.commit(); @@ -640,7 +623,7 @@ public final class GridCacheLockImpl implements GridCacheLockEx, IgniteChangeGlo nodes.removeLastOccurrence(thisNode); - lockView.put(key, val); + cacheView.put(key, val); tx.commit(); @@ -705,8 +688,8 @@ public final class GridCacheLockImpl implements GridCacheLockEx, IgniteChangeGlo try { return retryTopologySafe(new Callable<Boolean>() { @Override public Boolean call() throws Exception { - try (GridNearTxLocal tx = CU.txStartInternal(ctx, lockView, PESSIMISTIC, REPEATABLE_READ)) { - GridCacheLockState val = lockView.get(key); + try (GridNearTxLocal tx = CU.txStartInternal(ctx, cacheView, PESSIMISTIC, REPEATABLE_READ)) { + GridCacheLockState val = cacheView.get(key); if (val == null) throw new IgniteCheckedException("Failed to find reentrant lock with given name: " + name); @@ -797,7 +780,7 @@ public final class GridCacheLockImpl implements GridCacheLockEx, IgniteChangeGlo val.setConditionMap(condMap); - lockView.put(key, val); + cacheView.put(key, val); tx.commit(); @@ -1054,16 +1037,7 @@ public final class GridCacheLockImpl implements GridCacheLockEx, IgniteChangeGlo public GridCacheLockImpl(String name, GridCacheInternalKey key, IgniteInternalCache<GridCacheInternalKey, GridCacheLockState> lockView) { - assert name != null; - assert key != null; - assert lockView != null; - - this.name = name; - this.key = key; - this.lockView = lockView; - this.ctx = lockView.context(); - - log = ctx.logger(getClass()); + super(name, key, lockView); } /** @@ -1074,8 +1048,8 @@ public final class GridCacheLockImpl implements GridCacheLockEx, IgniteChangeGlo try { sync = retryTopologySafe(new Callable<Sync>() { @Override public Sync call() throws Exception { - try (GridNearTxLocal tx = CU.txStartInternal(ctx, lockView, PESSIMISTIC, REPEATABLE_READ)) { - GridCacheLockState val = lockView.get(key); + try (GridNearTxLocal tx = CU.txStartInternal(ctx, cacheView, PESSIMISTIC, REPEATABLE_READ)) { + GridCacheLockState val = cacheView.get(key); if (val == null) { if (log.isDebugEnabled()) @@ -1184,11 +1158,6 @@ public final class GridCacheLockImpl implements GridCacheLockEx, IgniteChangeGlo } /** {@inheritDoc} */ - @Override public String name() { - return name; - } - - /** {@inheritDoc} */ @Override public void lock() { ctx.kernalContext().gateway().readLock(); @@ -1454,34 +1423,8 @@ public final class GridCacheLockImpl implements GridCacheLockEx, IgniteChangeGlo } /** {@inheritDoc} */ - @Override public GridCacheInternalKey key() { - return key; - } - - /** {@inheritDoc} */ - @Override public boolean removed() { - return rmvd; - } - - /** {@inheritDoc} */ - @Override public boolean onRemoved() { - return rmvd = true; - } - - /** {@inheritDoc} */ @Override public void needCheckNotRemoved() { - - } - - /** {@inheritDoc} */ - @Override public void onActivate(GridKernalContext kctx) throws IgniteCheckedException { - this.ctx = kctx.cache().<GridCacheInternalKey, GridCacheLockState>context().cacheContext(ctx.cacheId()); - this.lockView = ctx.cache(); - } - - /** {@inheritDoc} */ - @Override public void onDeActivate(GridKernalContext kctx) { - // No-op. + // no-op } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/f08c9d3a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheRemovable.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheRemovable.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheRemovable.java index e222e57..d26a153 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheRemovable.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheRemovable.java @@ -17,6 +17,8 @@ package org.apache.ignite.internal.processors.datastructures; +import org.apache.ignite.internal.processors.cache.IgniteInternalCache; + /** * Provides callback for marking object as removed. */ @@ -32,4 +34,8 @@ public interface GridCacheRemovable { * */ public void needCheckNotRemoved(); + + public void suspend(); + + public void restart(IgniteInternalCache cache); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/f08c9d3a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java index 4abefc9..9502a6f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java @@ -59,7 +59,8 @@ import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_REA * acquired by the failing node. In case this parameter is false, IgniteInterruptedException is called on every node * waiting on this semaphore. */ -public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, IgniteChangeGlobalStateSupport, Externalizable { +public final class GridCacheSemaphoreImpl extends AtomicDataStructureProxy<GridCacheSemaphoreState> + implements GridCacheSemaphoreEx, IgniteChangeGlobalStateSupport, Externalizable { /** */ private static final long serialVersionUID = 0L; @@ -71,24 +72,6 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Ignit } }; - /** Logger. */ - private IgniteLogger log; - - /** Semaphore name. */ - private String name; - - /** Removed flag. */ - private volatile boolean rmvd; - - /** Semaphore key. */ - private GridCacheInternalKey key; - - /** Semaphore projection. */ - private IgniteInternalCache<GridCacheInternalKey, GridCacheSemaphoreState> semView; - - /** Cache context. */ - private GridCacheContext<GridCacheInternalKey, GridCacheSemaphoreState> ctx; - /** Initialization guard. */ private final AtomicBoolean initGuard = new AtomicBoolean(); @@ -290,10 +273,10 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Ignit return retryTopologySafe(new Callable<Boolean>() { @Override public Boolean call() throws Exception { try (GridNearTxLocal tx = CU.txStartInternal(ctx, - semView, + cacheView, PESSIMISTIC, REPEATABLE_READ) ) { - GridCacheSemaphoreState val = semView.get(key); + GridCacheSemaphoreState val = cacheView.get(key); if (val == null) throw new IgniteCheckedException("Failed to find semaphore with given name: " + @@ -328,7 +311,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Ignit val.setCount(newVal); - semView.put(key, val); + cacheView.put(key, val); tx.commit(); } @@ -370,10 +353,10 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Ignit @Override public Boolean call() throws Exception { try ( GridNearTxLocal tx = CU.txStartInternal(ctx, - semView, + cacheView, PESSIMISTIC, REPEATABLE_READ) ) { - GridCacheSemaphoreState val = semView.get(key); + GridCacheSemaphoreState val = cacheView.get(key); if (val == null) throw new IgniteCheckedException("Failed to find semaphore with given name: " + @@ -391,7 +374,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Ignit if (broken) { val.setBroken(true); - semView.put(key, val); + cacheView.put(key, val); tx.commit(); @@ -415,7 +398,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Ignit val.setWaiters(map); - semView.put(key, val); + cacheView.put(key, val); sync.nodeMap = map; @@ -457,16 +440,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Ignit GridCacheInternalKey key, IgniteInternalCache<GridCacheInternalKey, GridCacheSemaphoreState> semView ) { - assert name != null; - assert key != null; - assert semView != null; - - this.name = name; - this.key = key; - this.semView = semView; - this.ctx = semView.context(); - - log = ctx.logger(getClass()); + super(name, key, semView); } /** @@ -478,8 +452,8 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Ignit sync = retryTopologySafe(new Callable<Sync>() { @Override public Sync call() throws Exception { try (GridNearTxLocal tx = CU.txStartInternal(ctx, - semView, PESSIMISTIC, REPEATABLE_READ)) { - GridCacheSemaphoreState val = semView.get(key); + cacheView, PESSIMISTIC, REPEATABLE_READ)) { + GridCacheSemaphoreState val = cacheView.get(key); if (val == null) { if (log.isDebugEnabled()) @@ -521,26 +495,6 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Ignit } /** {@inheritDoc} */ - @Override public String name() { - return name; - } - - /** {@inheritDoc} */ - @Override public GridCacheInternalKey key() { - return key; - } - - /** {@inheritDoc} */ - @Override public boolean removed() { - return rmvd; - } - - /** {@inheritDoc} */ - @Override public boolean onRemoved() { - return rmvd = true; - } - - /** {@inheritDoc} */ @Override public void onUpdate(GridCacheSemaphoreState val) { if (sync == null) return; @@ -722,9 +676,9 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Ignit @Override public Integer call() throws Exception { try ( GridNearTxLocal tx = CU.txStartInternal(ctx, - semView, PESSIMISTIC, REPEATABLE_READ) + cacheView, PESSIMISTIC, REPEATABLE_READ) ) { - GridCacheSemaphoreState val = semView.get(key); + GridCacheSemaphoreState val = cacheView.get(key); if (val == null) throw new IgniteException("Failed to find semaphore with given name: " + name); @@ -962,17 +916,6 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Ignit } /** {@inheritDoc} */ - @Override public void onActivate(GridKernalContext kctx) throws IgniteCheckedException { - this.ctx = kctx.cache().<GridCacheInternalKey, GridCacheSemaphoreState>context().cacheContext(ctx.cacheId()); - this.semView = ctx.cache(); - } - - /** {@inheritDoc} */ - @Override public void onDeActivate(GridKernalContext kctx) { - // No-op. - } - - /** {@inheritDoc} */ @Override public void writeExternal(ObjectOutput out) throws IOException { out.writeObject(ctx.kernalContext()); out.writeUTF(name); http://git-wip-us.apache.org/repos/asf/ignite/blob/f08c9d3a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/AtomicCacheAffinityConfigurationTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/AtomicCacheAffinityConfigurationTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/AtomicCacheAffinityConfigurationTest.java index 623b076..3fd4e1f 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/AtomicCacheAffinityConfigurationTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/AtomicCacheAffinityConfigurationTest.java @@ -26,6 +26,7 @@ import org.apache.ignite.configuration.AtomicConfiguration; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.processors.datastructures.AtomicDataStructureProxy; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; @@ -58,7 +59,7 @@ public class AtomicCacheAffinityConfigurationTest extends GridCommonAbstractTest IgniteAtomicLong atomic = igniteEx.atomicLong("test", 0, true); - GridCacheContext cctx = GridTestUtils.getFieldValue(atomic, "ctx"); + GridCacheContext cctx = GridTestUtils.getFieldValue(atomic, AtomicDataStructureProxy.class, "ctx"); AffinityFunction aff = cctx.config().getAffinity(); @@ -90,7 +91,7 @@ public class AtomicCacheAffinityConfigurationTest extends GridCommonAbstractTest IgniteAtomicLong atomic = igniteEx.atomicLong("test", 0, true); - GridCacheContext cctx = GridTestUtils.getFieldValue(atomic, "ctx"); + GridCacheContext cctx = GridTestUtils.getFieldValue(atomic, AtomicDataStructureProxy.class, "ctx"); TestAffinityFunction aff = (TestAffinityFunction) cctx.config().getAffinity(); @@ -122,7 +123,7 @@ public class AtomicCacheAffinityConfigurationTest extends GridCommonAbstractTest IgniteAtomicLong atomic = igniteEx.atomicLong("test", 0, true); - GridCacheContext cctx = GridTestUtils.getFieldValue(atomic, "ctx"); + GridCacheContext cctx = GridTestUtils.getFieldValue(atomic, AtomicDataStructureProxy.class, "ctx"); AffinityFunction aff = cctx.config().getAffinity();
