Repository: ignite
Updated Branches:
  refs/heads/master 72335ae9b -> 6a09c4e20


IGNITE-5960 Fixed race on continuous query registration and entry update. Fixes 
#2728.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6a09c4e2
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6a09c4e2
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6a09c4e2

Branch: refs/heads/master
Commit: 6a09c4e204de2c88580249ca43be3c01fb379163
Parents: 72335ae
Author: Sunny Chan <[email protected]>
Authored: Mon Sep 10 12:53:28 2018 +0300
Committer: Alexey Goncharuk <[email protected]>
Committed: Mon Sep 10 12:53:28 2018 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheMapEntry.java     | 36 ++++++++++++++++++
 .../continuous/CacheContinuousQueryManager.java | 39 +++++++++++++++++---
 2 files changed, 69 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/6a09c4e2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 714d4a0..4806733 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -24,6 +24,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import javax.cache.Cache;
 import javax.cache.expiry.ExpiryPolicy;
@@ -184,6 +185,10 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
     @GridToStringExclude
     private final ReentrantLock lock = new ReentrantLock();
 
+    /** Read Lock for continuous query listener */
+    @GridToStringExclude
+    private final Lock listenerLock;
+
     /**
      * Flags:
      * <ul>
@@ -212,6 +217,7 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
         this.key = key;
         this.hash = key.hashCode();
         this.cctx = cctx;
+        this.listenerLock = cctx.continuousQueries().getListenerReadLock();
 
         ver = GridCacheVersionManager.START_VER;
     }
@@ -1371,6 +1377,7 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
 
         GridLongList mvccWaitTxs = null;
 
+        lockListenerReadLock();
         lockEntry();
 
         try {
@@ -1524,6 +1531,7 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
         }
         finally {
             unlockEntry();
+            unlockListenerReadLock();
         }
 
         onUpdateFinished(updateCntr0);
@@ -1602,6 +1610,7 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
 
         GridLongList mvccWaitTxs = null;
 
+        lockListenerReadLock();
         lockEntry();
 
         try {
@@ -1756,6 +1765,7 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
         }
         finally {
             unlockEntry();
+            unlockListenerReadLock();
         }
 
         if (deferred)
@@ -1818,6 +1828,7 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
 
         EntryProcessorResult<Object> invokeRes = null;
 
+        lockListenerReadLock();
         lockEntry();
 
         try {
@@ -2133,6 +2144,7 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
         }
         finally {
             unlockEntry();
+            unlockListenerReadLock();
         }
 
         return new GridTuple3<>(res,
@@ -2182,6 +2194,7 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
         if (!primary && !isNear())
             ensureFreeSpace();
 
+        lockListenerReadLock();
         lockEntry();
 
         try {
@@ -2443,6 +2456,7 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
         }
         finally {
             unlockEntry();
+            unlockListenerReadLock();
         }
 
         onUpdateFinished(c.updateRes.updateCounter());
@@ -3197,6 +3211,7 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
 
         GridCacheVersion oldVer = null;
 
+        lockListenerReadLock();
         lockEntry();
 
         try {
@@ -3380,6 +3395,7 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
         }
         finally {
             unlockEntry();
+            unlockListenerReadLock();
 
             // It is necessary to execute these callbacks outside of lock to 
avoid deadlocks.
 
@@ -4852,6 +4868,26 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
         lock.unlock();
     }
 
+    /**
+     * This method would obtain read lock for continuous query listener setup. 
This
+     * is to prevent race condition between entry update and continuous query 
setup.
+     * You should make sure you obtain this read lock first before locking the 
entry
+     * in order to ensure that the entry update is completed and existing 
continuous
+     * query notified before the next cache listener update
+     */
+    private void lockListenerReadLock() {
+        listenerLock.lock();
+    }
+
+    /**
+     * unlock the listener read lock
+     *
+     * @see #lockListenerReadLock()
+     */
+    private void unlockListenerReadLock() {
+        listenerLock.unlock();
+    }
+
     /** {@inheritDoc} */
     @Override public boolean lockedByCurrentThread() {
         return lock.isHeldByCurrentThread();

http://git-wip-us.apache.org/repos/asf/ignite/blob/6a09c4e2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index 996e7f4..0470fad 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -33,6 +33,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
 import javax.cache.Cache;
 import javax.cache.configuration.CacheEntryListenerConfiguration;
 import javax.cache.configuration.Factory;
@@ -64,6 +65,7 @@ import 
org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateFuture;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import org.apache.ignite.internal.processors.continuous.GridContinuousHandler;
+import org.apache.ignite.internal.util.StripedCompositeReadWriteLock;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.CI2;
@@ -126,6 +128,9 @@ public class CacheContinuousQueryManager extends 
GridCacheManagerAdapter {
     /** Ordered topic prefix. */
     private String topicPrefix;
 
+    /** ReadWriteLock to control the continuous query setup - this is to 
prevent the race between cache update and listener setup */
+    private final StripedCompositeReadWriteLock listenerLock = new 
StripedCompositeReadWriteLock(Runtime.getRuntime().availableProcessors()) ;
+
     /** Cancelable future task for backup cleaner */
     private GridTimeoutProcessor.CancelableTask cancelableTask;
 
@@ -195,6 +200,16 @@ public class CacheContinuousQueryManager extends 
GridCacheManagerAdapter {
     }
 
     /**
+     * Obtain the listener read lock, which must be held if any component need 
to
+     * read the list listener (generally caller to updateListener).
+     *
+     * @return Read lock for the listener update
+     */
+    public Lock getListenerReadLock() {
+        return listenerLock.readLock();
+    }
+
+    /**
      * @param lsnrs Listeners to notify.
      * @param key Entry key.
      * @param partId Partition id.
@@ -884,9 +899,11 @@ public class CacheContinuousQueryManager extends 
GridCacheManagerAdapter {
      * @param internal Internal flag.
      * @return Whether listener was actually registered.
      */
-    GridContinuousHandler.RegisterStatus registerListener(UUID lsnrId,
+    GridContinuousHandler.RegisterStatus registerListener(
+        UUID lsnrId,
         CacheContinuousQueryListener lsnr,
-        boolean internal) {
+        boolean internal
+    ) {
         boolean added;
 
         if (internal) {
@@ -896,7 +913,9 @@ public class CacheContinuousQueryManager extends 
GridCacheManagerAdapter {
                 intLsnrCnt.incrementAndGet();
         }
         else {
-            synchronized (this) {
+            listenerLock.writeLock().lock();
+
+            try {
                 if (lsnrCnt.get() == 0) {
                     if (cctx.group().sharedGroup() && !cctx.isLocal())
                         cctx.group().addCacheWithContinuousQuery(cctx);
@@ -907,13 +926,16 @@ public class CacheContinuousQueryManager extends 
GridCacheManagerAdapter {
                 if (added)
                     lsnrCnt.incrementAndGet();
             }
+            finally {
+                listenerLock.writeLock().unlock();
+            }
 
             if (added)
                 lsnr.onExecution();
         }
 
-        return added ? GridContinuousHandler.RegisterStatus.REGISTERED :
-            GridContinuousHandler.RegisterStatus.NOT_REGISTERED;
+        return added ? GridContinuousHandler.RegisterStatus.REGISTERED
+            : GridContinuousHandler.RegisterStatus.NOT_REGISTERED;
     }
 
     /**
@@ -931,7 +953,9 @@ public class CacheContinuousQueryManager extends 
GridCacheManagerAdapter {
             }
         }
         else {
-            synchronized (this) {
+            listenerLock.writeLock().lock();
+
+            try {
                 if ((lsnr = lsnrs.remove(id)) != null) {
                     int cnt = lsnrCnt.decrementAndGet();
 
@@ -939,6 +963,9 @@ public class CacheContinuousQueryManager extends 
GridCacheManagerAdapter {
                         cctx.group().removeCacheWithContinuousQuery(cctx);
                 }
             }
+            finally {
+                listenerLock.writeLock().unlock();
+            }
 
             if (lsnr != null)
                 lsnr.onUnregister();

Reply via email to