This is an automated email from the ASF dual-hosted git repository.

jjramos pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 9e3c473  GEODE-7863: Reduce ServerCQImpl Contention (#4798)
9e3c473 is described below

commit 9e3c473e9cf6d72126d0e84319b1192a5e2b6fdb
Author: Juan José Ramos <[email protected]>
AuthorDate: Mon Mar 16 10:33:52 2020 +0000

    GEODE-7863: Reduce ServerCQImpl Contention (#4798)
    
    We don't need to lock the entire internal cache for Partitioned
    regions, so the implementation is now split by region type; this will
    allow us to improve/change them independently in the future.
    
    - Removed redundant checks.
    - Keep current behavior for Replicate Regions.
    - Use ConcurrentMap instead of locking the entire internal cache on
    every operation for Partition Regions.
    - Keep the lock on ServerCQImpl instance only while executing the
    query and leave stats operations outside of the synchronized block.
---
 .../cache/query/internal/cq/CqServiceProvider.java |   2 +-
 .../geode/cache/query/internal/cq/ServerCQ.java    |   6 +
 .../geode/cache/query/cq/internal/CqQueryImpl.java |   2 -
 .../cache/query/cq/internal/CqServiceImpl.java     |  38 ++---
 .../cache/query/cq/internal/ServerCQImpl.java      | 176 +++++----------------
 .../query/cq/internal/ServerCQResultsCache.java    |  48 ++++++
 .../cq/internal/ServerCQResultsCacheNoOpImpl.java  |  68 ++++++++
 .../ServerCQResultsCachePartitionRegionImpl.java   | 133 ++++++++++++++++
 .../ServerCQResultsCacheReplicateRegionImpl.java   | 176 +++++++++++++++++++++
 .../query/cq/internal/command/ExecuteCQ61.java     |   2 +-
 .../cache/query/cq/internal/ServerCQImplTest.java  |   4 +-
 11 files changed, 493 insertions(+), 162 deletions(-)

diff --git 
a/geode-core/src/main/java/org/apache/geode/cache/query/internal/cq/CqServiceProvider.java
 
b/geode-core/src/main/java/org/apache/geode/cache/query/internal/cq/CqServiceProvider.java
index 28ecabb..e61c27b 100644
--- 
a/geode-core/src/main/java/org/apache/geode/cache/query/internal/cq/CqServiceProvider.java
+++ 
b/geode-core/src/main/java/org/apache/geode/cache/query/internal/cq/CqServiceProvider.java
@@ -36,7 +36,7 @@ public class CqServiceProvider {
    */
   @MutableForTesting
   public static boolean MAINTAIN_KEYS = Boolean
-      .valueOf(System.getProperty(GeodeGlossary.GEMFIRE_PREFIX + 
"cq.MAINTAIN_KEYS", "true"));
+      .parseBoolean(System.getProperty(GeodeGlossary.GEMFIRE_PREFIX + 
"cq.MAINTAIN_KEYS", "true"));
 
   /**
    * A debug flag used for testing vMotion during CQ registration
diff --git 
a/geode-core/src/main/java/org/apache/geode/cache/query/internal/cq/ServerCQ.java
 
b/geode-core/src/main/java/org/apache/geode/cache/query/internal/cq/ServerCQ.java
index c36cd4b..08bfba3 100644
--- 
a/geode-core/src/main/java/org/apache/geode/cache/query/internal/cq/ServerCQ.java
+++ 
b/geode-core/src/main/java/org/apache/geode/cache/query/internal/cq/ServerCQ.java
@@ -66,6 +66,12 @@ public interface ServerCQ extends InternalCqQuery {
   void setCqResultsCacheInitialized();
 
   /**
+   *
+   * @return true if the CQ Results key cache is initialized.
+   */
+  boolean isCqResultsCacheInitialized();
+
+  /**
    * Returns true if old value is required for query processing.
    */
   boolean isOldValueRequiredForQueryProcessing(Object key);
diff --git 
a/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/CqQueryImpl.java
 
b/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/CqQueryImpl.java
index eed5568..01c3691 100644
--- 
a/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/CqQueryImpl.java
+++ 
b/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/CqQueryImpl.java
@@ -56,8 +56,6 @@ public abstract class CqQueryImpl implements InternalCqQuery {
 
   protected String queryString;
 
-  static final Object TOKEN = new Object();
-
   LocalRegion cqBaseRegion;
 
   protected Query query = null;
diff --git 
a/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/CqServiceImpl.java
 
b/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/CqServiceImpl.java
index cb54c99..e93ed98 100644
--- 
a/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/CqServiceImpl.java
+++ 
b/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/CqServiceImpl.java
@@ -1347,15 +1347,16 @@ public class CqServiceImpl implements CqService {
           boolean error = false;
           {
             try {
-              synchronized (cQuery) {
-                // Apply query on new value.
-                if (!cqUnfilteredEventsSet_newValue.isEmpty()) {
-                  executionStartTime = this.stats.startCqQueryExecution();
+              // Apply query on new value.
+              if (!cqUnfilteredEventsSet_newValue.isEmpty()) {
+                executionStartTime = this.stats.startCqQueryExecution();
 
+                synchronized (cQuery) {
                   b_cqResults_newValue =
                       evaluateQuery(cQuery, new Object[] 
{cqUnfilteredEventsSet_newValue});
-                  this.stats.endCqQueryExecution(executionStartTime);
                 }
+
+                this.stats.endCqQueryExecution(executionStartTime);
               }
 
               // In case of Update, destroy and invalidate.
@@ -1365,7 +1366,7 @@ public class CqServiceImpl implements CqService {
                 // value. Currently the CQ Results are not cached for the
                 // Partitioned Regions. Once this is added remove the check
                 // with PR region.
-                if (cQuery.cqResultKeysInitialized) {
+                if (cQuery.isCqResultsCacheInitialized()) {
                   b_cqResults_oldValue = cQuery.isPartOfCqResult(eventKey);
                   // For PR if not found in cache, apply the query on old 
value.
                   // Also apply if the query was not executed during cq execute
@@ -1390,23 +1391,24 @@ public class CqServiceImpl implements CqService {
                     }
                   }
 
-                  synchronized (cQuery) {
-                    // Apply query on old value.
-                    if (!cqUnfilteredEventsSet_oldValue.isEmpty()) {
-                      executionStartTime = this.stats.startCqQueryExecution();
+                  // Apply query on old value.
+                  if (!cqUnfilteredEventsSet_oldValue.isEmpty()) {
+                    executionStartTime = this.stats.startCqQueryExecution();
+
+                    synchronized (cQuery) {
                       b_cqResults_oldValue =
                           evaluateQuery(cQuery, new Object[] 
{cqUnfilteredEventsSet_oldValue});
-                      this.stats.endCqQueryExecution(executionStartTime);
-                    } else {
-                      if (isDebugEnabled) {
-                        logger.debug(
-                            "old value for event with key {} is null - query 
execution not performed",
-                            eventKey);
-                      }
+                    }
+
+                    this.stats.endCqQueryExecution(executionStartTime);
+                  } else {
+                    if (isDebugEnabled) {
+                      logger.debug(
+                          "old value for event with key {} is null - query 
execution not performed",
+                          eventKey);
                     }
                   }
                 } // Query oldValue
-
               }
             } catch (Exception ex) {
               // Any exception in running the query should be caught here and
diff --git 
a/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/ServerCQImpl.java
 
b/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/ServerCQImpl.java
index 5e614a2..e48ec2b 100644
--- 
a/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/ServerCQImpl.java
+++ 
b/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/ServerCQImpl.java
@@ -17,15 +17,13 @@ package org.apache.geode.cache.query.cq.internal;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Set;
 
 import org.apache.logging.log4j.Logger;
 
 import org.apache.geode.DataSerializable;
 import org.apache.geode.DataSerializer;
+import org.apache.geode.annotations.VisibleForTesting;
 import org.apache.geode.cache.DataPolicy;
 import org.apache.geode.cache.EvictionAction;
 import org.apache.geode.cache.query.CqAttributes;
@@ -35,7 +33,6 @@ import org.apache.geode.cache.query.CqException;
 import org.apache.geode.cache.query.CqExistsException;
 import org.apache.geode.cache.query.CqResults;
 import org.apache.geode.cache.query.Query;
-import org.apache.geode.cache.query.QueryException;
 import org.apache.geode.cache.query.RegionNotFoundException;
 import org.apache.geode.cache.query.internal.CompiledBindArgument;
 import org.apache.geode.cache.query.internal.CompiledIteratorDef;
@@ -47,7 +44,6 @@ import 
org.apache.geode.cache.query.internal.cq.CqServiceProvider;
 import org.apache.geode.cache.query.internal.cq.ServerCQ;
 import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.cache.LocalRegion;
-import org.apache.geode.internal.cache.Token;
 import org.apache.geode.internal.cache.tier.sockets.CacheClientNotifier;
 import org.apache.geode.internal.cache.tier.sockets.CacheClientProxy;
 import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
@@ -55,26 +51,14 @@ import 
org.apache.geode.logging.internal.log4j.api.LogService;
 
 public class ServerCQImpl extends CqQueryImpl implements DataSerializable, 
ServerCQ {
   private static final Logger logger = LogService.getLogger();
+  private static final ServerCQResultsCache NO_OP_CACHE = new 
ServerCQResultsCacheNoOpImpl();
 
   /**
-   * This holds the keys that are part of the CQ query results. Using this CQ 
engine can determine
-   * whether to execute query on old value from EntryEvent, which is an 
expensive operation.
-   *
-   * NOTE: In case of RR this map is populated and used as intended. In case 
of PR this map will not
-   * be populated. If executeCQ happens after update operations this map will 
remain empty.
-   */
-  private volatile HashMap<Object, Object> cqResultKeys;
-
-  /**
-   * This maintains the keys that are destroyed while the Results Cache is 
getting constructed. This
-   * avoids any keys that are destroyed (after query execution) but is still 
part of the CQs result.
-   */
-  private HashSet<Object> destroysWhileCqResultsInProgress;
-
-  /**
-   * To indicate if the CQ results key cache is initialized.
+   * NOTE: In case of Replicated Regions this cache is populated and used as 
intended. In case of
+   * Partition Regions this cache will not be populated. If executeCQ happens 
after update
+   * operations this cache will remain empty.
    */
-  public volatile boolean cqResultKeysInitialized = false;
+  private volatile ServerCQResultsCache serverCQResultsCache = NO_OP_CACHE;
 
   /** Boolean flag to see if the CQ is on Partitioned Region */
   volatile boolean isPR = false;
@@ -155,6 +139,7 @@ public class ServerCQImpl extends CqQueryImpl implements 
DataSerializable, Serve
         if (isDebugEnabled) {
           logger.debug(s, t);
         }
+
         throw new CqException(s);
       }
     }
@@ -165,8 +150,8 @@ public class ServerCQImpl extends CqQueryImpl implements 
DataSerializable, Serve
     this.cqBaseRegion = (LocalRegion) 
cqService.getCache().getRegion(regionName);
     if (this.cqBaseRegion == null) {
       throw new RegionNotFoundException(
-          String.format("Region : %s specified with cq not found. CqName: %s",
-              new Object[] {regionName, this.cqName}));
+          String.format("Region : %s specified with cq not found. CqName: %s", 
regionName,
+              this.cqName));
     }
 
     // Make sure that the region is partitioned or
@@ -232,7 +217,6 @@ public class ServerCQImpl extends CqQueryImpl implements 
DataSerializable, Serve
 
     // Initialize CQ results (key) cache.
     if (CqServiceProvider.MAINTAIN_KEYS) {
-      this.cqResultKeys = new HashMap<>();
       // Currently the CQ Result keys are not cached for the Partitioned
       // Regions. Supporting this with PR needs more work like forcing
       // query execution on primary buckets only; and handling the bucket
@@ -240,9 +224,10 @@ public class ServerCQImpl extends CqQueryImpl implements 
DataSerializable, Serve
       // Only the events which are seen during event processing is
       // added to the results cache (not from the CQ Results).
       if (this.isPR) {
-        this.setCqResultsCacheInitialized();
+        serverCQResultsCache = new ServerCQResultsCachePartitionRegionImpl();
+        setCqResultsCacheInitialized();
       } else {
-        this.destroysWhileCqResultsInProgress = new HashSet<>();
+        serverCQResultsCache = new ServerCQResultsCacheReplicateRegionImpl();
       }
     }
 
@@ -251,9 +236,10 @@ public class ServerCQImpl extends CqQueryImpl implements 
DataSerializable, Serve
         cqService.addToCqMap(this);
       } catch (CqExistsException cqe) {
         // Should not happen.
-        throw new CqException(String.format("Unable to create cq %s Error : 
%s",
-            new Object[] {cqName, cqe.getMessage()}));
+        throw new CqException(
+            String.format("Unable to create cq %s Error : %s", cqName, 
cqe.getMessage()));
       }
+
       this.cqBaseRegion.getFilterProfile().registerCq(this);
     }
   }
@@ -263,14 +249,9 @@ public class ServerCQImpl extends CqQueryImpl implements 
DataSerializable, Serve
    *
    * @return CQ Results Cache.
    */
+  @VisibleForTesting
   public Set<Object> getCqResultKeyCache() {
-    if (this.cqResultKeys != null) {
-      synchronized (this.cqResultKeys) {
-        return Collections.synchronizedSet(new 
HashSet<>(this.cqResultKeys.keySet()));
-      }
-    } else {
-      return null;
-    }
+    return serverCQResultsCache.getKeys();
   }
 
   /**
@@ -280,7 +261,7 @@ public class ServerCQImpl extends CqQueryImpl implements 
DataSerializable, Serve
    *
    * @return String modified query.
    */
-  Query constructServerSideQuery() throws QueryException {
+  Query constructServerSideQuery() {
     InternalCache cache = cqService.getInternalCache();
     DefaultQuery locQuery = (DefaultQuery) 
cache.getLocalQueryService().newQuery(this.queryString);
     CompiledSelect select = locQuery.getSimpleSelect();
@@ -292,6 +273,7 @@ public class ServerCQImpl extends CqQueryImpl implements 
DataSerializable, Serve
     // CompiledRegion
     this.regionName = ((CompiledRegion) 
from.getCollectionExpr()).getRegionPath();
     from.setCollectionExpr(new CompiledBindArgument(1));
+
     return locQuery;
   }
 
@@ -302,108 +284,39 @@ public class ServerCQImpl extends CqQueryImpl implements 
DataSerializable, Serve
    * @return true if key is in the Results Cache.
    */
   public boolean isPartOfCqResult(Object key) {
-    // Handle events that may have been deleted,
-    // but added by result caching.
-    if (this.cqResultKeys == null) {
-      logger.warn(
-          "The CQ Result key cache is Null. This should not happen as the call 
to isPartOfCqResult() is based on the condition cqResultsCacheInitialized.");
-      return false;
-    }
-
-    synchronized (this.cqResultKeys) {
-      if (this.destroysWhileCqResultsInProgress != null) {
-        // this.logger.fine("Removing keys from Destroy Cache For CQ :" +
-        // this.cqName + " Keys :" + this.destroysWhileCqResultsInProgress);
-        for (Object k : this.destroysWhileCqResultsInProgress) {
-          this.cqResultKeys.remove(k);
-        }
-        this.destroysWhileCqResultsInProgress = null;
-      }
-      return this.cqResultKeys.containsKey(key);
-    }
+    return serverCQResultsCache.contains(key);
   }
 
   @Override
   public void addToCqResultKeys(Object key) {
-    if (!CqServiceProvider.MAINTAIN_KEYS) {
-      return;
-    }
-
-    if (this.cqResultKeys != null) {
-      synchronized (this.cqResultKeys) {
-        this.cqResultKeys.put(key, TOKEN);
-        if (!this.cqResultKeysInitialized) {
-          // This key could be coming after add, destroy.
-          // Remove this from destroy queue.
-          if (this.destroysWhileCqResultsInProgress != null) {
-            this.destroysWhileCqResultsInProgress.remove(key);
-          }
-        }
-      }
-    }
+    serverCQResultsCache.add(key);
   }
 
   @Override
   public void removeFromCqResultKeys(Object key, boolean isTokenMode) {
-    if (!CqServiceProvider.MAINTAIN_KEYS) {
-      return;
-    }
-    if (this.cqResultKeys != null) {
-      synchronized (this.cqResultKeys) {
-        if (isTokenMode && this.cqResultKeys.get(key) != Token.DESTROYED) {
-          return;
-        }
-        this.cqResultKeys.remove(key);
-        if (!this.cqResultKeysInitialized) {
-          if (this.destroysWhileCqResultsInProgress != null) {
-            this.destroysWhileCqResultsInProgress.add(key);
-          }
-        }
-      }
-    }
+    serverCQResultsCache.remove(key, isTokenMode);
   }
 
   @Override
   public void invalidateCqResultKeys() {
-    if (!CqServiceProvider.MAINTAIN_KEYS) {
-      return;
-    }
-
-    if (this.cqResultKeys != null) {
-      synchronized (this.cqResultKeys) {
-        this.cqResultKeys.clear();
-        this.cqResultKeysInitialized = false;
-      }
-    }
+    serverCQResultsCache.invalidate();
   }
 
   /**
    * Marks the key as destroyed in the CQ Results key cache.
    */
   void markAsDestroyedInCqResultKeys(Object key) {
-    if (!CqServiceProvider.MAINTAIN_KEYS) {
-      return;
-    }
-
-    if (this.cqResultKeys != null) {
-      synchronized (this.cqResultKeys) {
-        this.cqResultKeys.put(key, Token.DESTROYED);
-        if (!this.cqResultKeysInitialized) {
-          // this.logger.fine("Adding key to Destroy Cache For CQ :" +
-          // this.cqName + " key :" + key);
-          if (this.destroysWhileCqResultsInProgress != null) {
-            this.destroysWhileCqResultsInProgress.add(key);
-          }
-        }
-      }
-    }
+    serverCQResultsCache.markAsDestroyed(key);
   }
 
   @Override
   public void setCqResultsCacheInitialized() {
-    if (CqServiceProvider.MAINTAIN_KEYS) {
-      this.cqResultKeysInitialized = true;
-    }
+    serverCQResultsCache.setInitialized();
+  }
+
+  @Override
+  public boolean isCqResultsCacheInitialized() {
+    return serverCQResultsCache.isInitialized();
   }
 
   /**
@@ -412,17 +325,12 @@ public class ServerCQImpl extends CqQueryImpl implements 
DataSerializable, Serve
    * @return size of CQ Result key cache.
    */
   public int getCqResultKeysSize() {
-    if (this.cqResultKeys == null) {
-      return 0;
-    }
-    synchronized (this.cqResultKeys) {
-      return this.cqResultKeys.size();
-    }
+    return serverCQResultsCache.size();
   }
 
   @Override
   public boolean isOldValueRequiredForQueryProcessing(Object key) {
-    return !this.cqResultKeysInitialized || !this.isPartOfCqResult(key);
+    return serverCQResultsCache.isOldValueRequiredForQueryProcessing(key);
   }
 
   /**
@@ -454,7 +362,6 @@ public class ServerCQImpl extends CqQueryImpl implements 
DataSerializable, Serve
 
       int stateBeforeClosing = this.cqState.getState();
       this.cqState.setState(CqStateImpl.CLOSING);
-      boolean isClosed = false;
 
       // Cleanup the resource used by cq.
       this.removeFromCqMap();
@@ -467,11 +374,7 @@ public class ServerCQImpl extends CqQueryImpl implements 
DataSerializable, Serve
       }
 
       // Clean-up the CQ Results Cache.
-      if (this.cqResultKeys != null) {
-        synchronized (this.cqResultKeys) {
-          this.cqResultKeys.clear();
-        }
-      }
+      serverCQResultsCache.clear();
 
       // Set the state to close, and update stats
       this.cqState.setState(CqStateImpl.CLOSED);
@@ -503,7 +406,7 @@ public class ServerCQImpl extends CqQueryImpl implements 
DataSerializable, Serve
    * Clears the resource used by CQ.
    */
   @Override
-  protected void cleanup() throws CqException {
+  protected void cleanup() {
     // CqBaseRegion
     try {
       if (this.cqBaseRegion != null && !this.cqBaseRegion.isDestroyed()) {
@@ -526,8 +429,7 @@ public class ServerCQImpl extends CqQueryImpl implements 
DataSerializable, Serve
    * Stop or pause executing the query.
    */
   @Override
-  public void stop() throws CqClosedException, CqException {
-    boolean isStopped = false;
+  public void stop() throws CqClosedException {
     synchronized (this.cqState) {
       if (this.isClosed()) {
         throw new CqClosedException(
@@ -551,7 +453,7 @@ public class ServerCQImpl extends CqQueryImpl implements 
DataSerializable, Serve
   }
 
   @Override
-  public void fromData(DataInput in) throws IOException, 
ClassNotFoundException {
+  public void fromData(DataInput in) throws IOException {
     synchronized (cqState) {
       this.cqState.setState(DataSerializer.readInteger(in));
     }
@@ -584,14 +486,12 @@ public class ServerCQImpl extends CqQueryImpl implements 
DataSerializable, Serve
   }
 
   @Override
-  public <E> CqResults<E> executeWithInitialResults()
-      throws CqClosedException, RegionNotFoundException, CqException {
+  public <E> CqResults<E> executeWithInitialResults() throws CqClosedException 
{
     throw new IllegalStateException("Execute cannot be called on a CQ on the 
server");
   }
 
   @Override
-  public void execute() throws CqClosedException, RegionNotFoundException, 
CqException {
+  public void execute() throws CqClosedException {
     throw new IllegalStateException("Execute cannot be called on a CQ on the 
server");
   }
-
 }
diff --git 
a/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/ServerCQResultsCache.java
 
b/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/ServerCQResultsCache.java
new file mode 100644
index 0000000..87c9693
--- /dev/null
+++ 
b/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/ServerCQResultsCache.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.cache.query.cq.internal;
+
+import java.util.Set;
+
+/**
+ * Holds the keys that are part of the CQ query results.
+ * Using this, the CQ engine can determine whether to execute query on an old 
value from EntryEvent
+ * or not,which is an expensive operation.
+ */
+interface ServerCQResultsCache {
+  Object TOKEN = new Object();
+
+  void setInitialized();
+
+  boolean isInitialized();
+
+  void add(Object key);
+
+  void remove(Object key, boolean isTokenMode);
+
+  void invalidate();
+
+  boolean contains(Object key);
+
+  void markAsDestroyed(Object key);
+
+  int size();
+
+  Set<Object> getKeys();
+
+  boolean isOldValueRequiredForQueryProcessing(Object key);
+
+  void clear();
+}
diff --git 
a/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/ServerCQResultsCacheNoOpImpl.java
 
b/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/ServerCQResultsCacheNoOpImpl.java
new file mode 100644
index 0000000..4f0f1cf
--- /dev/null
+++ 
b/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/ServerCQResultsCacheNoOpImpl.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.cache.query.cq.internal;
+
+import java.util.Collections;
+import java.util.Set;
+
+/**
+ * No op implementation, used when the property 
CqServiceProvider.MAINTAIN_KEYS is set as false.
+ */
+class ServerCQResultsCacheNoOpImpl implements ServerCQResultsCache {
+  private static final Set<Object> EMPTY_CACHE = Collections.emptySet();
+
+  @Override
+  public void setInitialized() {}
+
+  @Override
+  public boolean isInitialized() {
+    return false;
+  }
+
+  @Override
+  public void add(Object key) {}
+
+  @Override
+  public void remove(Object key, boolean isTokenMode) {}
+
+  @Override
+  public void invalidate() {}
+
+  @Override
+  public boolean contains(Object key) {
+    return false;
+  }
+
+  @Override
+  public void markAsDestroyed(Object key) {}
+
+  @Override
+  public int size() {
+    return 0;
+  }
+
+  @Override
+  public Set<Object> getKeys() {
+    return EMPTY_CACHE;
+  }
+
+  @Override
+  public boolean isOldValueRequiredForQueryProcessing(Object key) {
+    return true;
+  }
+
+  @Override
+  public void clear() {}
+}
diff --git 
a/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/ServerCQResultsCachePartitionRegionImpl.java
 
b/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/ServerCQResultsCachePartitionRegionImpl.java
new file mode 100644
index 0000000..882a923
--- /dev/null
+++ 
b/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/ServerCQResultsCachePartitionRegionImpl.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.cache.query.cq.internal;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.logging.log4j.Logger;
+
+import org.apache.geode.internal.cache.Token;
+import org.apache.geode.logging.internal.log4j.api.LogService;
+
+/**
+ * Internal CQ cache implementation for CQs operating on partitioned regions.
+ */
+class ServerCQResultsCachePartitionRegionImpl implements ServerCQResultsCache {
+  private static final Logger logger = LogService.getLogger();
+
+  /**
+   * To indicate if the CQ results key cache is initialized.
+   */
+  public volatile boolean cqResultKeysInitialized = false;
+
+  /**
+   * This holds the keys that are part of the CQ query results. Using this CQ 
engine can determine
+   * whether to execute query on old value from EntryEvent, which is an 
expensive operation.
+   *
+   * NOTE: In case of RR this map is populated and used as intended. In case 
of PR this map will not
+   * be populated. If executeCQ happens after update operations this map will 
remain empty.
+   */
+  private final ConcurrentMap<Object, Object> cqResultKeys;
+
+  public ServerCQResultsCachePartitionRegionImpl() {
+    cqResultKeys = new ConcurrentHashMap<>();
+  }
+
+  @Override
+  public void setInitialized() {
+    cqResultKeysInitialized = true;
+  }
+
+  @Override
+  public boolean isInitialized() {
+    return cqResultKeysInitialized;
+  }
+
+  @Override
+  public void add(Object key) {
+    cqResultKeys.put(key, TOKEN);
+  }
+
+  @Override
+  public void remove(Object key, boolean isTokenMode) {
+    if (isTokenMode && cqResultKeys.get(key) != Token.DESTROYED) {
+      return;
+    }
+
+    cqResultKeys.remove(key);
+  }
+
+  @Override
+  public void invalidate() {
+    cqResultKeys.clear();
+    cqResultKeysInitialized = false;
+  }
+
+  /**
+   * Returns if the passed key is part of the CQs result set. This method 
needs to be called once
+   * the CQ result key caching is completed (cqResultsCacheInitialized is 
true).
+   *
+   * @return true if key is in the Results Cache.
+   */
+  @Override
+  public boolean contains(Object key) {
+    // Handle events that may have been deleted,
+    // but added by result caching.
+    if (!isInitialized()) {
+      logger.warn(
+          "The CQ Result key cache is not initialized. This should not happen 
as the call to isPartOfCqResult() is based on the condition 
cqResultsCacheInitialized.");
+      return false;
+    }
+
+    return cqResultKeys.containsKey(key);
+  }
+
+  /**
+   * Marks the key as destroyed in the CQ Results key cache.
+   */
+  @Override
+  public void markAsDestroyed(Object key) {
+    cqResultKeys.put(key, Token.DESTROYED);
+  }
+
+  @Override
+  public int size() {
+    return cqResultKeys.size();
+  }
+
+  /**
+   * For Test use only.
+   *
+   * @return CQ Results Cache.
+   */
+  @Override
+  public Set<Object> getKeys() {
+    return Collections.synchronizedSet(new HashSet<>(cqResultKeys.keySet()));
+  }
+
+  @Override
+  public boolean isOldValueRequiredForQueryProcessing(Object key) {
+    return !isInitialized() || !contains(key);
+  }
+
+  @Override
+  public void clear() {
+    cqResultKeys.clear();
+  }
+}
diff --git 
a/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/ServerCQResultsCacheReplicateRegionImpl.java
 
b/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/ServerCQResultsCacheReplicateRegionImpl.java
new file mode 100644
index 0000000..3d987b8
--- /dev/null
+++ 
b/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/ServerCQResultsCacheReplicateRegionImpl.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.cache.query.cq.internal;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.logging.log4j.Logger;
+
+import org.apache.geode.internal.cache.Token;
+import org.apache.geode.logging.internal.log4j.api.LogService;
+
+/**
+ * Internal CQ cache implementation for CQs operating on replicate regions.
+ *
+ * <p>
+ * All accesses to {@code cqResultKeys} and {@code destroysWhileCqResultsInProgress} are made
+ * while holding {@code LOCK}, because both are plain, non-thread-safe collections. The
+ * initialized flag is volatile and can be read without the lock.
+ */
+class ServerCQResultsCacheReplicateRegionImpl implements ServerCQResultsCache {
+  private static final Logger logger = LogService.getLogger();
+
+  /**
+   * To indicate if the CQ results key cache is initialized.
+   */
+  public volatile boolean cqResultKeysInitialized = false;
+
+  /**
+   * This holds the keys that are part of the CQ query results. Using this CQ engine can determine
+   * whether to execute query on old value from EntryEvent, which is an expensive operation.
+   *
+   * NOTE: In case of RR this map is populated and used as intended. In case of PR this map will not
+   * be populated. If executeCQ happens after update operations this map will remain empty.
+   */
+  private final Map<Object, Object> cqResultKeys;
+
+  /**
+   * This maintains the keys that are destroyed while the Results Cache is getting constructed. This
+   * avoids any keys that are destroyed (after query execution) but is still part of the CQs result.
+   */
+  private final Set<Object> destroysWhileCqResultsInProgress;
+
+  // Synchronize operations on cqResultKeys & destroysWhileCqResultsInProgress
+  private final Object LOCK = new Object();
+
+  public ServerCQResultsCacheReplicateRegionImpl() {
+    cqResultKeys = new HashMap<>();
+    destroysWhileCqResultsInProgress = new HashSet<>();
+  }
+
+  @Override
+  public void setInitialized() {
+    cqResultKeysInitialized = true;
+  }
+
+  @Override
+  public boolean isInitialized() {
+    return cqResultKeysInitialized;
+  }
+
+  /**
+   * Adds the key to the CQ Results key cache. While the cache is not yet initialized the key is
+   * also taken out of the pending-destroy set, so a later purge does not drop it.
+   */
+  @Override
+  public void add(Object key) {
+    synchronized (LOCK) {
+      cqResultKeys.put(key, TOKEN);
+
+      if (!isInitialized()) {
+        // This key could be coming after add, destroy.
+        // Remove this from destroy queue.
+        destroysWhileCqResultsInProgress.remove(key);
+      }
+    }
+  }
+
+  /**
+   * Removes the key from the CQ Results key cache. In token mode the key is only removed when it
+   * is currently mapped to {@code Token.DESTROYED}; otherwise the call is a no-op. While the cache
+   * is not yet initialized, a removed key is also recorded in the pending-destroy set.
+   */
+  @Override
+  public void remove(Object key, boolean isTokenMode) {
+    synchronized (LOCK) {
+      if (isTokenMode && cqResultKeys.get(key) != Token.DESTROYED) {
+        return;
+      }
+
+      cqResultKeys.remove(key);
+      if (!isInitialized()) {
+        destroysWhileCqResultsInProgress.add(key);
+      }
+    }
+  }
+
+  /**
+   * Empties the CQ Results key cache and flags it as not initialized.
+   */
+  @Override
+  public void invalidate() {
+    synchronized (LOCK) {
+      cqResultKeys.clear();
+      cqResultKeysInitialized = false;
+    }
+  }
+
+  /**
+   * Returns if the passed key is part of the CQs result set. This method needs to be called once
+   * the CQ result key caching is completed (cqResultsCacheInitialized is true).
+   *
+   * @param key the key to look up in the Results Cache.
+   * @return true if key is in the Results Cache; false if it is absent or if the cache is not
+   *         initialized yet (a warning is logged in that case).
+   */
+  @Override
+  public boolean contains(Object key) {
+    // Handle events that may have been deleted,
+    // but added by result caching.
+    if (!isInitialized()) {
+      logger.warn(
+          "The CQ Result key cache is not initialized. This should not happen as the call to isPartOfCqResult() is based on the condition cqResultsCacheInitialized.");
+      return false;
+    }
+
+    synchronized (LOCK) {
+      // Purge the keys destroyed while the cache was being built, then answer under the same
+      // lock: cqResultKeys is a plain HashMap and must not be read while another thread, holding
+      // LOCK, is modifying it.
+      destroysWhileCqResultsInProgress.forEach(cqResultKeys::remove);
+      destroysWhileCqResultsInProgress.clear();
+      return cqResultKeys.containsKey(key);
+    }
+  }
+
+  /**
+   * Marks the key as destroyed in the CQ Results key cache. While the cache is not yet
+   * initialized the key is also recorded in the pending-destroy set.
+   */
+  @Override
+  public void markAsDestroyed(Object key) {
+    synchronized (LOCK) {
+      cqResultKeys.put(key, Token.DESTROYED);
+
+      if (!isInitialized()) {
+        destroysWhileCqResultsInProgress.add(key);
+      }
+    }
+  }
+
+  @Override
+  public int size() {
+    synchronized (LOCK) {
+      return cqResultKeys.size();
+    }
+  }
+
+  /**
+   * For Test use only.
+   *
+   * @return CQ Results Cache.
+   */
+  @Override
+  public Set<Object> getKeys() {
+    synchronized (LOCK) {
+      return Collections.synchronizedSet(new HashSet<>(cqResultKeys.keySet()));
+    }
+  }
+
+  @Override
+  public boolean isOldValueRequiredForQueryProcessing(Object key) {
+    return !isInitialized() || !contains(key);
+  }
+
+  @Override
+  public void clear() {
+    // Clean-up the CQ Results Cache.
+    synchronized (LOCK) {
+      cqResultKeys.clear();
+    }
+  }
+}
diff --git a/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/command/ExecuteCQ61.java b/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/command/ExecuteCQ61.java
index e3d0c60..7fa064f 100755
--- a/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/command/ExecuteCQ61.java
+++ b/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/command/ExecuteCQ61.java
@@ -202,7 +202,7 @@ public class ExecuteCQ61 extends BaseCQCommand {
     } else {
       // Don't execute query for cq.execute and
       // if it is a PR query with execute query and maintain keys flags not set
-      cqQuery.cqResultKeysInitialized = true;
+      cqQuery.setCqResultsCacheInitialized();
       successQuery = true;
     }
 
diff --git a/geode-cq/src/test/java/org/apache/geode/cache/query/cq/internal/ServerCQImplTest.java b/geode-cq/src/test/java/org/apache/geode/cache/query/cq/internal/ServerCQImplTest.java
index 3fdf818..2e186db 100644
--- a/geode-cq/src/test/java/org/apache/geode/cache/query/cq/internal/ServerCQImplTest.java
+++ b/geode-cq/src/test/java/org/apache/geode/cache/query/cq/internal/ServerCQImplTest.java
@@ -65,12 +65,12 @@ public class ServerCQImplTest {
     // Initialize cache
     serverCq.addToCqResultKeys("key1");
     serverCq.setCqResultsCacheInitialized();
-    assertThat(serverCq.cqResultKeysInitialized).isTrue();
+    assertThat(serverCq.isCqResultsCacheInitialized()).isTrue();
     assertThat(serverCq.isPartOfCqResult("key1")).isTrue();
 
     // Invalidate and assert results
     serverCq.invalidateCqResultKeys();
-    assertThat(serverCq.cqResultKeysInitialized).isFalse();
+    assertThat(serverCq.isCqResultsCacheInitialized()).isFalse();
     assertThat(serverCq.isPartOfCqResult("key1")).isFalse();
   }
 }

Reply via email to