Repository: ignite Updated Branches: refs/heads/ignite-5960 a7ee714e6 -> de730823b
Minor Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/de730823 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/de730823 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/de730823 Branch: refs/heads/ignite-5960 Commit: de730823bfb91e948fc35ac2fb0e3191b3b27ee9 Parents: a7ee714 Author: Alexey Goncharuk <[email protected]> Authored: Fri Sep 7 16:51:19 2018 +0300 Committer: Alexey Goncharuk <[email protected]> Committed: Fri Sep 7 16:51:19 2018 +0300 ---------------------------------------------------------------------- .../continuous/CacheContinuousQueryManager.java | 23 ++++++++++++++------ 1 file changed, 16 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/de730823/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java index 50c54d6..0470fad 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java @@ -899,8 +899,11 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { * @param internal Internal flag. * @return Whether listener was actually registered. */ - GridContinuousHandler.RegisterStatus registerListener(UUID lsnrId, CacheContinuousQueryListener lsnr, - boolean internal) { + GridContinuousHandler.RegisterStatus registerListener( + UUID lsnrId, + CacheContinuousQueryListener lsnr, + boolean internal + ) { boolean added; if (internal) { @@ -908,8 +911,10 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { if (added) intLsnrCnt.incrementAndGet(); - } else { + } + else { listenerLock.writeLock().lock(); + try { if (lsnrCnt.get() == 0) { if (cctx.group().sharedGroup() && !cctx.isLocal()) @@ -920,7 +925,8 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { if (added) lsnrCnt.incrementAndGet(); - } finally { + } + finally { listenerLock.writeLock().unlock(); } @@ -929,7 +935,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { } return added ? GridContinuousHandler.RegisterStatus.REGISTERED - : GridContinuousHandler.RegisterStatus.NOT_REGISTERED; + : GridContinuousHandler.RegisterStatus.NOT_REGISTERED; } /** @@ -945,8 +951,10 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { lsnr.onUnregister(); } - } else { + } + else { listenerLock.writeLock().lock(); + try { if ((lsnr = lsnrs.remove(id)) != null) { int cnt = lsnrCnt.decrementAndGet(); @@ -954,7 +962,8 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { if (cctx.group().sharedGroup() && cnt == 0 && !cctx.isLocal()) cctx.group().removeCacheWithContinuousQuery(cctx); } - } finally { + } + finally { listenerLock.writeLock().unlock(); }
