Repository: ignite Updated Branches: refs/heads/master 1334e77a7 -> 18fb46e2b
ignite-1787 Added synchronization in CacheConfiguration for cache entry listeners collection Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/18fb46e2 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/18fb46e2 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/18fb46e2 Branch: refs/heads/master Commit: 18fb46e2bdb5c75dae0d567f1db68bd593aa0355 Parents: 1334e77 Author: sboikov <sboi...@gridgain.com> Authored: Tue Oct 27 12:21:46 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Tue Oct 27 12:21:46 2015 +0300 ---------------------------------------------------------------------- .../configuration/CacheConfiguration.java | 28 ++++++++++- .../IgniteCacheEntryListenerAbstractTest.java | 50 +++++++++++++++++++- 2 files changed, 76 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/18fb46e2/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java index 6ac2b64..374743f 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java @@ -21,6 +21,7 @@ import java.io.Serializable; import java.util.Collection; import java.util.HashSet; import javax.cache.Cache; +import javax.cache.configuration.CacheEntryListenerConfiguration; import javax.cache.configuration.CompleteConfiguration; import javax.cache.configuration.Factory; import javax.cache.configuration.MutableConfiguration; @@ -336,7 +337,9 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> { /* No-op. */ } - /** Cache name. */ + /** + * @param name Cache name. + */ public CacheConfiguration(String name) { this.name = name; } @@ -1800,6 +1803,29 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> { return this; } + /** {@inheritDoc} */ + @Override public Iterable<CacheEntryListenerConfiguration<K, V>> getCacheEntryListenerConfigurations() { + synchronized (this) { + return new HashSet<>(listenerConfigurations); + } + } + + /** {@inheritDoc} */ + @Override public MutableConfiguration<K, V> addCacheEntryListenerConfiguration( + CacheEntryListenerConfiguration<K, V> cacheEntryLsnrCfg) { + synchronized (this) { + return super.addCacheEntryListenerConfiguration(cacheEntryLsnrCfg); + } + } + + /** {@inheritDoc} */ + @Override public MutableConfiguration<K, V> removeCacheEntryListenerConfiguration( + CacheEntryListenerConfiguration<K, V> cacheEntryLsnrCfg) { + synchronized (this) { + return super.removeCacheEntryListenerConfiguration(cacheEntryLsnrCfg); + } + } + /** * Creates a copy of current configuration and removes all cache entry listeners. * They are executed only locally and should never be sent to remote nodes. http://git-wip-us.apache.org/repos/asf/ignite/blob/18fb46e2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java index 3fdd7fc..8a3d756 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java @@ -33,6 +33,7 @@ import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; import java.util.concurrent.atomic.AtomicBoolean; import javax.cache.configuration.CacheEntryListenerConfiguration; import javax.cache.configuration.Factory; @@ -366,6 +367,42 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb /** * @throws Exception If failed. */ + public void testConcurrentRegisterDeregister() throws Exception { + final int THREADS = 10; + + final CyclicBarrier barrier = new CyclicBarrier(THREADS); + + final IgniteCache<Integer, Integer> cache = jcache(0); + + GridTestUtils.runMultiThreadedAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + CacheEntryListenerConfiguration<Integer, Integer> cfg = new MutableCacheEntryListenerConfiguration<>( + new Factory<CacheEntryListener<Integer, Integer>>() { + @Override public CacheEntryListener<Integer, Integer> create() { + return new CreateUpdateRemoveExpireListener(); + } + }, + null, + true, + false + ); + + barrier.await(); + + for (int i = 0; i < 200; i++) { + cache.registerCacheEntryListener(cfg); + + cache.deregisterCacheEntryListener(cfg); + } + + return null; + } + }, THREADS, "register-thread").get(); + } + + /** + * @throws Exception If failed. + */ public void testSerialization() throws Exception { if (cacheMode() == LOCAL) return; @@ -951,6 +988,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb * */ private static class CreateUpdateRemoveExpireListenerFactory implements Factory<CacheEntryListener<Integer, Integer>> { + /** {@inheritDoc} */ @Override public CacheEntryListener<Integer, Integer> create() { return new CreateUpdateRemoveExpireListener(); } @@ -960,6 +998,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb * */ private static class NoOpCreateUpdateListenerFactory implements Factory<CacheEntryListener<Integer, Integer>> { + /** {@inheritDoc} */ @Override public CacheEntryListener<Integer, Integer> create() { return new NoOpCreateUpdateListener(); } @@ -969,6 +1008,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb * */ private static class CreateUpdateListenerFactory implements Factory<CacheEntryListener<Integer, Integer>> { + /** {@inheritDoc} */ @Override public CacheEntryListener<Integer, Integer> create() { return new CreateUpdateListener(); } @@ -978,6 +1018,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb * */ private static class CreateListenerFactory implements Factory<CacheEntryListener<Integer, Integer>> { + /** {@inheritDoc} */ @Override public CacheEntryListener<Integer, Integer> create() { return new CreateListener(); } @@ -987,6 +1028,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb * */ private static class RemoveListenerFactory implements Factory<CacheEntryListener<Integer, Integer>> { + /** {@inheritDoc} */ @Override public CacheEntryListener<Integer, Integer> create() { return new RemoveListener(); } @@ -996,6 +1038,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb * */ private static class UpdateListenerFactory implements Factory<CacheEntryListener<Integer, Integer>> { + /** {@inheritDoc} */ @Override public CacheEntryListener<Integer, Integer> create() { return new UpdateListener(); } @@ -1005,6 +1048,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb * */ private static class ExpireListenerFactory implements Factory<CacheEntryListener<Integer, Integer>> { + /** {@inheritDoc} */ @Override public CacheEntryListener<Integer, Integer> create() { return new ExpireListener(); } @@ -1024,6 +1068,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb * */ private static class CreateListener implements CacheEntryCreatedListener<Integer, Integer> { + /** {@inheritDoc} */ @Override public void onCreated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) { for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts) onEvent(evt); @@ -1034,6 +1079,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb * */ private static class UpdateListener implements CacheEntryUpdatedListener<Integer, Integer> { + /** {@inheritDoc} */ @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) { for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts) onEvent(evt); @@ -1044,6 +1090,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb * */ private static class RemoveListener implements CacheEntryRemovedListener<Integer, Integer> { + /** {@inheritDoc} */ @Override public void onRemoved(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) { for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts) onEvent(evt); @@ -1054,6 +1101,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb * */ private static class ExpireListener implements CacheEntryExpiredListener<Integer, Integer> { + /** {@inheritDoc} */ @Override public void onExpired(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) { for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts) onEvent(evt); @@ -1238,7 +1286,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb } /** {@inheritDoc} */ - @Override public void onCreated(Iterable<CacheEntryEvent<? extends Object, ? extends Object>> evts) + @Override public void onCreated(Iterable<CacheEntryEvent<?, ?>> evts) throws CacheEntryListenerException { // No-op. }