This is an automated email from the ASF dual-hosted git repository. amashenkov pushed a commit to branch ignite-13082 in repository https://gitbox.apache.org/repos/asf/ignite.git
commit 1f023f3b0c304c290d76ceab5f99b503d96779c2 Author: Alexey Goncharuk <[email protected]> AuthorDate: Mon Oct 28 15:16:19 2019 +0300 IGNITE-13082 Fix deadlock between topology update and CQ registration. --- .../continuous/CacheContinuousQueryHandler.java | 14 +++++++++++ .../continuous/CacheContinuousQueryListener.java | 12 +++++++++- .../continuous/CacheContinuousQueryManager.java | 28 +++++++++++++++------- 3 files changed, 44 insertions(+), 10 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java index 970116c..cc672c6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java @@ -392,6 +392,20 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler log = ctx.log(CU.CONTINUOUS_QRY_LOG_CATEGORY); CacheContinuousQueryListener<K, V> lsnr = new CacheContinuousQueryListener<K, V>() { + @Override public void onBeforeRegister() { + GridCacheContext<K, V> cctx = cacheContext(ctx); + + if (cctx != null && !cctx.isLocal()) + cctx.topology().readLock(); + } + + @Override public void onAfterRegister() { + GridCacheContext<K, V> cctx = cacheContext(ctx); + + if (cctx != null && !cctx.isLocal()) + cctx.topology().readUnlock(); + } + @Override public void onRegister() { GridCacheContext<K, V> cctx = cacheContext(ctx); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java index e534fdd..0c13672 100644 --- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java @@ -41,8 +41,18 @@ public interface CacheContinuousQueryListener<K, V> { boolean recordIgniteEvt, @Nullable GridDhtAtomicAbstractUpdateFuture fut); /** + * Callback invoked before listener registration, prior to acquiring the {@link CacheGroupContext#listenerLock()} write lock. + */ + public void onBeforeRegister(); + + /** + * Callback invoked after the registration attempt, once the {@link CacheGroupContext#listenerLock()} write lock has been released. + */ + public void onAfterRegister(); + + /** * Listener registration callback. - * NOTE: This method should be called under the {@link CacheGroupContext#listenerLock} write lock held. + * NOTE: This method should be called under the {@link CacheGroupContext#listenerLock()} write lock held. */ public void onRegister(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java index ecb32ea..5ce9945 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java @@ -57,6 +57,7 @@ import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.NodeStoppingException; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.CacheGroupContext; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; @@ -982,24 +983,33 @@ public class CacheContinuousQueryManager<K, V> extends GridCacheManagerAdapter<K intLsnrCnt.incrementAndGet(); } else { - 
cctx.group().listenerLock().writeLock().lock(); + lsnr.onBeforeRegister(); try { - added = lsnrs.putIfAbsent(lsnrId, lsnr) == null; + CacheGroupContext grp = cctx.group(); + + grp.listenerLock().writeLock().lock(); + + try { + added = lsnrs.putIfAbsent(lsnrId, lsnr) == null; - if (added) { - lsnrCnt.incrementAndGet(); + if (added) { + lsnrCnt.incrementAndGet(); - lsnr.onRegister(); + lsnr.onRegister(); - if (lsnrCnt.get() == 1) { - if (cctx.group().sharedGroup() && !cctx.isLocal()) - cctx.group().addCacheWithContinuousQuery(cctx); + if (lsnrCnt.get() == 1) { + if (grp.sharedGroup() && !cctx.isLocal()) + grp.addCacheWithContinuousQuery(cctx); + } } } + finally { + grp.listenerLock().writeLock().unlock(); + } } finally { - cctx.group().listenerLock().writeLock().unlock(); + lsnr.onAfterRegister(); } }
