Repository: ignite Updated Branches: refs/heads/master 72335ae9b -> 6a09c4e20
IGNITE-5960 Fixed race on continuous query registration and entry update. Fixes #2728. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6a09c4e2 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6a09c4e2 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6a09c4e2 Branch: refs/heads/master Commit: 6a09c4e204de2c88580249ca43be3c01fb379163 Parents: 72335ae Author: Sunny Chan <[email protected]> Authored: Mon Sep 10 12:53:28 2018 +0300 Committer: Alexey Goncharuk <[email protected]> Committed: Mon Sep 10 12:53:28 2018 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheMapEntry.java | 36 ++++++++++++++++++ .../continuous/CacheContinuousQueryManager.java | 39 +++++++++++++++++--- 2 files changed, 69 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/6a09c4e2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index 714d4a0..4806733 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import javax.cache.Cache; import javax.cache.expiry.ExpiryPolicy; @@ -184,6 +185,10 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme @GridToStringExclude private final ReentrantLock lock = new ReentrantLock(); + /** Read Lock for continuous query listener */ + @GridToStringExclude + private final Lock listenerLock; + /** * Flags: * <ul> @@ -212,6 +217,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme this.key = key; this.hash = key.hashCode(); this.cctx = cctx; + this.listenerLock = cctx.continuousQueries().getListenerReadLock(); ver = GridCacheVersionManager.START_VER; } @@ -1371,6 +1377,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme GridLongList mvccWaitTxs = null; + lockListenerReadLock(); lockEntry(); try { @@ -1524,6 +1531,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme } finally { unlockEntry(); + unlockListenerReadLock(); } onUpdateFinished(updateCntr0); @@ -1602,6 +1610,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme GridLongList mvccWaitTxs = null; + lockListenerReadLock(); lockEntry(); try { @@ -1756,6 +1765,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme } finally { unlockEntry(); + unlockListenerReadLock(); } if (deferred) @@ -1818,6 +1828,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme EntryProcessorResult<Object> invokeRes = null; + lockListenerReadLock(); lockEntry(); try { @@ -2133,6 +2144,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme } finally { unlockEntry(); + unlockListenerReadLock(); } return new GridTuple3<>(res, @@ -2182,6 +2194,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme if (!primary && !isNear()) ensureFreeSpace(); + lockListenerReadLock(); lockEntry(); try { @@ -2443,6 +2456,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme } finally { unlockEntry(); + unlockListenerReadLock(); } onUpdateFinished(c.updateRes.updateCounter()); @@ -3197,6 +3211,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme GridCacheVersion oldVer = null; + lockListenerReadLock(); lockEntry(); try { @@ -3380,6 +3395,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme } finally { unlockEntry(); + unlockListenerReadLock(); // It is necessary to execute these callbacks outside of lock to avoid deadlocks. @@ -4852,6 +4868,26 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme lock.unlock(); } + /** + * This method would obtain read lock for continuous query listener setup. This + * is to prevent race condition between entry update and continuous query setup. + * You should make sure you obtain this read lock first before locking the entry + * in order to ensure that the entry update is completed and existing continuous + * query notified before the next cache listener update + */ + private void lockListenerReadLock() { + listenerLock.lock(); + } + + /** + * unlock the listener read lock + * + * @see #lockListenerReadLock() + */ + private void unlockListenerReadLock() { + listenerLock.unlock(); + } + /** {@inheritDoc} */ @Override public boolean lockedByCurrentThread() { return lock.isHeldByCurrentThread(); http://git-wip-us.apache.org/repos/asf/ignite/blob/6a09c4e2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java index 996e7f4..0470fad 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java @@ -33,6 +33,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Lock; import javax.cache.Cache; import javax.cache.configuration.CacheEntryListenerConfiguration; import javax.cache.configuration.Factory; @@ -64,6 +65,7 @@ import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateFuture; import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; import org.apache.ignite.internal.processors.continuous.GridContinuousHandler; +import org.apache.ignite.internal.util.StripedCompositeReadWriteLock; import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.CI2; @@ -126,6 +128,9 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { /** Ordered topic prefix. */ private String topicPrefix; + /** ReadWriteLock to control the continuous query setup - this is to prevent the race between cache update and listener setup */ + private final StripedCompositeReadWriteLock listenerLock = new StripedCompositeReadWriteLock(Runtime.getRuntime().availableProcessors()) ; + /** Cancelable future task for backup cleaner */ private GridTimeoutProcessor.CancelableTask cancelableTask; @@ -195,6 +200,16 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { } /** + * Obtain the listener read lock, which must be held if any component need to + * read the list listener (generally caller to updateListener). + * + * @return Read lock for the listener update + */ + public Lock getListenerReadLock() { + return listenerLock.readLock(); + } + + /** * @param lsnrs Listeners to notify. * @param key Entry key. * @param partId Partition id. @@ -884,9 +899,11 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { * @param internal Internal flag. * @return Whether listener was actually registered. */ - GridContinuousHandler.RegisterStatus registerListener(UUID lsnrId, + GridContinuousHandler.RegisterStatus registerListener( + UUID lsnrId, CacheContinuousQueryListener lsnr, - boolean internal) { + boolean internal + ) { boolean added; if (internal) { @@ -896,7 +913,9 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { intLsnrCnt.incrementAndGet(); } else { - synchronized (this) { + listenerLock.writeLock().lock(); + + try { if (lsnrCnt.get() == 0) { if (cctx.group().sharedGroup() && !cctx.isLocal()) cctx.group().addCacheWithContinuousQuery(cctx); @@ -907,13 +926,16 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { if (added) lsnrCnt.incrementAndGet(); } + finally { + listenerLock.writeLock().unlock(); + } if (added) lsnr.onExecution(); } - return added ? GridContinuousHandler.RegisterStatus.REGISTERED : - GridContinuousHandler.RegisterStatus.NOT_REGISTERED; + return added ? GridContinuousHandler.RegisterStatus.REGISTERED + : GridContinuousHandler.RegisterStatus.NOT_REGISTERED; } /** @@ -931,7 +953,9 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { } } else { - synchronized (this) { + listenerLock.writeLock().lock(); + + try { if ((lsnr = lsnrs.remove(id)) != null) { int cnt = lsnrCnt.decrementAndGet(); @@ -939,6 +963,9 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { cctx.group().removeCacheWithContinuousQuery(cctx); } } + finally { + listenerLock.writeLock().unlock(); + } if (lsnr != null) lsnr.onUnregister();
