This is an automated email from the ASF dual-hosted git repository.
jjramos pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new 9e3c473 GEODE-7863: Reduce ServerCQImpl Contention (#4798)
9e3c473 is described below
commit 9e3c473e9cf6d72126d0e84319b1192a5e2b6fdb
Author: Juan José Ramos <[email protected]>
AuthorDate: Mon Mar 16 10:33:52 2020 +0000
GEODE-7863: Reduce ServerCQImpl Contention (#4798)
We don't need to lock the entire internal cache for Partitioned
regions, so the implementation is now split by region type; this will
allow us to improve/change them independently in the future.
- Removed redundant checks.
- Keep current behavior for Replicate Regions.
- Use ConcurrentMap instead of locking the entire internal cache on
every operation for Partition Regions.
- Keep the lock on ServerCQImpl instance only while executing the
query and leave stats operations outside of the synchronized block.
---
.../cache/query/internal/cq/CqServiceProvider.java | 2 +-
.../geode/cache/query/internal/cq/ServerCQ.java | 6 +
.../geode/cache/query/cq/internal/CqQueryImpl.java | 2 -
.../cache/query/cq/internal/CqServiceImpl.java | 38 ++---
.../cache/query/cq/internal/ServerCQImpl.java | 176 +++++----------------
.../query/cq/internal/ServerCQResultsCache.java | 48 ++++++
.../cq/internal/ServerCQResultsCacheNoOpImpl.java | 68 ++++++++
.../ServerCQResultsCachePartitionRegionImpl.java | 133 ++++++++++++++++
.../ServerCQResultsCacheReplicateRegionImpl.java | 176 +++++++++++++++++++++
.../query/cq/internal/command/ExecuteCQ61.java | 2 +-
.../cache/query/cq/internal/ServerCQImplTest.java | 4 +-
11 files changed, 493 insertions(+), 162 deletions(-)
diff --git
a/geode-core/src/main/java/org/apache/geode/cache/query/internal/cq/CqServiceProvider.java
b/geode-core/src/main/java/org/apache/geode/cache/query/internal/cq/CqServiceProvider.java
index 28ecabb..e61c27b 100644
---
a/geode-core/src/main/java/org/apache/geode/cache/query/internal/cq/CqServiceProvider.java
+++
b/geode-core/src/main/java/org/apache/geode/cache/query/internal/cq/CqServiceProvider.java
@@ -36,7 +36,7 @@ public class CqServiceProvider {
*/
@MutableForTesting
public static boolean MAINTAIN_KEYS = Boolean
- .valueOf(System.getProperty(GeodeGlossary.GEMFIRE_PREFIX +
"cq.MAINTAIN_KEYS", "true"));
+ .parseBoolean(System.getProperty(GeodeGlossary.GEMFIRE_PREFIX +
"cq.MAINTAIN_KEYS", "true"));
/**
* A debug flag used for testing vMotion during CQ registration
diff --git
a/geode-core/src/main/java/org/apache/geode/cache/query/internal/cq/ServerCQ.java
b/geode-core/src/main/java/org/apache/geode/cache/query/internal/cq/ServerCQ.java
index c36cd4b..08bfba3 100644
---
a/geode-core/src/main/java/org/apache/geode/cache/query/internal/cq/ServerCQ.java
+++
b/geode-core/src/main/java/org/apache/geode/cache/query/internal/cq/ServerCQ.java
@@ -66,6 +66,12 @@ public interface ServerCQ extends InternalCqQuery {
void setCqResultsCacheInitialized();
/**
+ *
+ * @return true if the CQ Results key cache is initialized.
+ */
+ boolean isCqResultsCacheInitialized();
+
+ /**
* Returns true if old value is required for query processing.
*/
boolean isOldValueRequiredForQueryProcessing(Object key);
diff --git
a/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/CqQueryImpl.java
b/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/CqQueryImpl.java
index eed5568..01c3691 100644
---
a/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/CqQueryImpl.java
+++
b/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/CqQueryImpl.java
@@ -56,8 +56,6 @@ public abstract class CqQueryImpl implements InternalCqQuery {
protected String queryString;
- static final Object TOKEN = new Object();
-
LocalRegion cqBaseRegion;
protected Query query = null;
diff --git
a/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/CqServiceImpl.java
b/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/CqServiceImpl.java
index cb54c99..e93ed98 100644
---
a/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/CqServiceImpl.java
+++
b/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/CqServiceImpl.java
@@ -1347,15 +1347,16 @@ public class CqServiceImpl implements CqService {
boolean error = false;
{
try {
- synchronized (cQuery) {
- // Apply query on new value.
- if (!cqUnfilteredEventsSet_newValue.isEmpty()) {
- executionStartTime = this.stats.startCqQueryExecution();
+ // Apply query on new value.
+ if (!cqUnfilteredEventsSet_newValue.isEmpty()) {
+ executionStartTime = this.stats.startCqQueryExecution();
+ synchronized (cQuery) {
b_cqResults_newValue =
evaluateQuery(cQuery, new Object[]
{cqUnfilteredEventsSet_newValue});
- this.stats.endCqQueryExecution(executionStartTime);
}
+
+ this.stats.endCqQueryExecution(executionStartTime);
}
// In case of Update, destroy and invalidate.
@@ -1365,7 +1366,7 @@ public class CqServiceImpl implements CqService {
// value. Currently the CQ Results are not cached for the
// Partitioned Regions. Once this is added remove the check
// with PR region.
- if (cQuery.cqResultKeysInitialized) {
+ if (cQuery.isCqResultsCacheInitialized()) {
b_cqResults_oldValue = cQuery.isPartOfCqResult(eventKey);
// For PR if not found in cache, apply the query on old
value.
// Also apply if the query was not executed during cq execute
@@ -1390,23 +1391,24 @@ public class CqServiceImpl implements CqService {
}
}
- synchronized (cQuery) {
- // Apply query on old value.
- if (!cqUnfilteredEventsSet_oldValue.isEmpty()) {
- executionStartTime = this.stats.startCqQueryExecution();
+ // Apply query on old value.
+ if (!cqUnfilteredEventsSet_oldValue.isEmpty()) {
+ executionStartTime = this.stats.startCqQueryExecution();
+
+ synchronized (cQuery) {
b_cqResults_oldValue =
evaluateQuery(cQuery, new Object[]
{cqUnfilteredEventsSet_oldValue});
- this.stats.endCqQueryExecution(executionStartTime);
- } else {
- if (isDebugEnabled) {
- logger.debug(
- "old value for event with key {} is null - query
execution not performed",
- eventKey);
- }
+ }
+
+ this.stats.endCqQueryExecution(executionStartTime);
+ } else {
+ if (isDebugEnabled) {
+ logger.debug(
+ "old value for event with key {} is null - query
execution not performed",
+ eventKey);
}
}
} // Query oldValue
-
}
} catch (Exception ex) {
// Any exception in running the query should be caught here and
diff --git
a/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/ServerCQImpl.java
b/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/ServerCQImpl.java
index 5e614a2..e48ec2b 100644
---
a/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/ServerCQImpl.java
+++
b/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/ServerCQImpl.java
@@ -17,15 +17,13 @@ package org.apache.geode.cache.query.cq.internal;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
import java.util.Set;
import org.apache.logging.log4j.Logger;
import org.apache.geode.DataSerializable;
import org.apache.geode.DataSerializer;
+import org.apache.geode.annotations.VisibleForTesting;
import org.apache.geode.cache.DataPolicy;
import org.apache.geode.cache.EvictionAction;
import org.apache.geode.cache.query.CqAttributes;
@@ -35,7 +33,6 @@ import org.apache.geode.cache.query.CqException;
import org.apache.geode.cache.query.CqExistsException;
import org.apache.geode.cache.query.CqResults;
import org.apache.geode.cache.query.Query;
-import org.apache.geode.cache.query.QueryException;
import org.apache.geode.cache.query.RegionNotFoundException;
import org.apache.geode.cache.query.internal.CompiledBindArgument;
import org.apache.geode.cache.query.internal.CompiledIteratorDef;
@@ -47,7 +44,6 @@ import
org.apache.geode.cache.query.internal.cq.CqServiceProvider;
import org.apache.geode.cache.query.internal.cq.ServerCQ;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.LocalRegion;
-import org.apache.geode.internal.cache.Token;
import org.apache.geode.internal.cache.tier.sockets.CacheClientNotifier;
import org.apache.geode.internal.cache.tier.sockets.CacheClientProxy;
import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
@@ -55,26 +51,14 @@ import
org.apache.geode.logging.internal.log4j.api.LogService;
public class ServerCQImpl extends CqQueryImpl implements DataSerializable,
ServerCQ {
private static final Logger logger = LogService.getLogger();
+ private static final ServerCQResultsCache NO_OP_CACHE = new
ServerCQResultsCacheNoOpImpl();
/**
- * This holds the keys that are part of the CQ query results. Using this CQ
engine can determine
- * whether to execute query on old value from EntryEvent, which is an
expensive operation.
- *
- * NOTE: In case of RR this map is populated and used as intended. In case
of PR this map will not
- * be populated. If executeCQ happens after update operations this map will
remain empty.
- */
- private volatile HashMap<Object, Object> cqResultKeys;
-
- /**
- * This maintains the keys that are destroyed while the Results Cache is
getting constructed. This
- * avoids any keys that are destroyed (after query execution) but is still
part of the CQs result.
- */
- private HashSet<Object> destroysWhileCqResultsInProgress;
-
- /**
- * To indicate if the CQ results key cache is initialized.
+ * NOTE: In case of Replicated Regions this cache is populated and used as
intended. In case of
+ * Partition Regions this cache will not be populated. If executeCQ happens
after update
+ * operations this cache will remain empty.
*/
- public volatile boolean cqResultKeysInitialized = false;
+ private volatile ServerCQResultsCache serverCQResultsCache = NO_OP_CACHE;
/** Boolean flag to see if the CQ is on Partitioned Region */
volatile boolean isPR = false;
@@ -155,6 +139,7 @@ public class ServerCQImpl extends CqQueryImpl implements
DataSerializable, Serve
if (isDebugEnabled) {
logger.debug(s, t);
}
+
throw new CqException(s);
}
}
@@ -165,8 +150,8 @@ public class ServerCQImpl extends CqQueryImpl implements
DataSerializable, Serve
this.cqBaseRegion = (LocalRegion)
cqService.getCache().getRegion(regionName);
if (this.cqBaseRegion == null) {
throw new RegionNotFoundException(
- String.format("Region : %s specified with cq not found. CqName: %s",
- new Object[] {regionName, this.cqName}));
+ String.format("Region : %s specified with cq not found. CqName: %s",
regionName,
+ this.cqName));
}
// Make sure that the region is partitioned or
@@ -232,7 +217,6 @@ public class ServerCQImpl extends CqQueryImpl implements
DataSerializable, Serve
// Initialize CQ results (key) cache.
if (CqServiceProvider.MAINTAIN_KEYS) {
- this.cqResultKeys = new HashMap<>();
// Currently the CQ Result keys are not cached for the Partitioned
// Regions. Supporting this with PR needs more work like forcing
// query execution on primary buckets only; and handling the bucket
@@ -240,9 +224,10 @@ public class ServerCQImpl extends CqQueryImpl implements
DataSerializable, Serve
// Only the events which are seen during event processing is
// added to the results cache (not from the CQ Results).
if (this.isPR) {
- this.setCqResultsCacheInitialized();
+ serverCQResultsCache = new ServerCQResultsCachePartitionRegionImpl();
+ setCqResultsCacheInitialized();
} else {
- this.destroysWhileCqResultsInProgress = new HashSet<>();
+ serverCQResultsCache = new ServerCQResultsCacheReplicateRegionImpl();
}
}
@@ -251,9 +236,10 @@ public class ServerCQImpl extends CqQueryImpl implements
DataSerializable, Serve
cqService.addToCqMap(this);
} catch (CqExistsException cqe) {
// Should not happen.
- throw new CqException(String.format("Unable to create cq %s Error :
%s",
- new Object[] {cqName, cqe.getMessage()}));
+ throw new CqException(
+ String.format("Unable to create cq %s Error : %s", cqName,
cqe.getMessage()));
}
+
this.cqBaseRegion.getFilterProfile().registerCq(this);
}
}
@@ -263,14 +249,9 @@ public class ServerCQImpl extends CqQueryImpl implements
DataSerializable, Serve
*
* @return CQ Results Cache.
*/
+ @VisibleForTesting
public Set<Object> getCqResultKeyCache() {
- if (this.cqResultKeys != null) {
- synchronized (this.cqResultKeys) {
- return Collections.synchronizedSet(new
HashSet<>(this.cqResultKeys.keySet()));
- }
- } else {
- return null;
- }
+ return serverCQResultsCache.getKeys();
}
/**
@@ -280,7 +261,7 @@ public class ServerCQImpl extends CqQueryImpl implements
DataSerializable, Serve
*
* @return String modified query.
*/
- Query constructServerSideQuery() throws QueryException {
+ Query constructServerSideQuery() {
InternalCache cache = cqService.getInternalCache();
DefaultQuery locQuery = (DefaultQuery)
cache.getLocalQueryService().newQuery(this.queryString);
CompiledSelect select = locQuery.getSimpleSelect();
@@ -292,6 +273,7 @@ public class ServerCQImpl extends CqQueryImpl implements
DataSerializable, Serve
// CompiledRegion
this.regionName = ((CompiledRegion)
from.getCollectionExpr()).getRegionPath();
from.setCollectionExpr(new CompiledBindArgument(1));
+
return locQuery;
}
@@ -302,108 +284,39 @@ public class ServerCQImpl extends CqQueryImpl implements
DataSerializable, Serve
* @return true if key is in the Results Cache.
*/
public boolean isPartOfCqResult(Object key) {
- // Handle events that may have been deleted,
- // but added by result caching.
- if (this.cqResultKeys == null) {
- logger.warn(
- "The CQ Result key cache is Null. This should not happen as the call
to isPartOfCqResult() is based on the condition cqResultsCacheInitialized.");
- return false;
- }
-
- synchronized (this.cqResultKeys) {
- if (this.destroysWhileCqResultsInProgress != null) {
- // this.logger.fine("Removing keys from Destroy Cache For CQ :" +
- // this.cqName + " Keys :" + this.destroysWhileCqResultsInProgress);
- for (Object k : this.destroysWhileCqResultsInProgress) {
- this.cqResultKeys.remove(k);
- }
- this.destroysWhileCqResultsInProgress = null;
- }
- return this.cqResultKeys.containsKey(key);
- }
+ return serverCQResultsCache.contains(key);
}
@Override
public void addToCqResultKeys(Object key) {
- if (!CqServiceProvider.MAINTAIN_KEYS) {
- return;
- }
-
- if (this.cqResultKeys != null) {
- synchronized (this.cqResultKeys) {
- this.cqResultKeys.put(key, TOKEN);
- if (!this.cqResultKeysInitialized) {
- // This key could be coming after add, destroy.
- // Remove this from destroy queue.
- if (this.destroysWhileCqResultsInProgress != null) {
- this.destroysWhileCqResultsInProgress.remove(key);
- }
- }
- }
- }
+ serverCQResultsCache.add(key);
}
@Override
public void removeFromCqResultKeys(Object key, boolean isTokenMode) {
- if (!CqServiceProvider.MAINTAIN_KEYS) {
- return;
- }
- if (this.cqResultKeys != null) {
- synchronized (this.cqResultKeys) {
- if (isTokenMode && this.cqResultKeys.get(key) != Token.DESTROYED) {
- return;
- }
- this.cqResultKeys.remove(key);
- if (!this.cqResultKeysInitialized) {
- if (this.destroysWhileCqResultsInProgress != null) {
- this.destroysWhileCqResultsInProgress.add(key);
- }
- }
- }
- }
+ serverCQResultsCache.remove(key, isTokenMode);
}
@Override
public void invalidateCqResultKeys() {
- if (!CqServiceProvider.MAINTAIN_KEYS) {
- return;
- }
-
- if (this.cqResultKeys != null) {
- synchronized (this.cqResultKeys) {
- this.cqResultKeys.clear();
- this.cqResultKeysInitialized = false;
- }
- }
+ serverCQResultsCache.invalidate();
}
/**
* Marks the key as destroyed in the CQ Results key cache.
*/
void markAsDestroyedInCqResultKeys(Object key) {
- if (!CqServiceProvider.MAINTAIN_KEYS) {
- return;
- }
-
- if (this.cqResultKeys != null) {
- synchronized (this.cqResultKeys) {
- this.cqResultKeys.put(key, Token.DESTROYED);
- if (!this.cqResultKeysInitialized) {
- // this.logger.fine("Adding key to Destroy Cache For CQ :" +
- // this.cqName + " key :" + key);
- if (this.destroysWhileCqResultsInProgress != null) {
- this.destroysWhileCqResultsInProgress.add(key);
- }
- }
- }
- }
+ serverCQResultsCache.markAsDestroyed(key);
}
@Override
public void setCqResultsCacheInitialized() {
- if (CqServiceProvider.MAINTAIN_KEYS) {
- this.cqResultKeysInitialized = true;
- }
+ serverCQResultsCache.setInitialized();
+ }
+
+ @Override
+ public boolean isCqResultsCacheInitialized() {
+ return serverCQResultsCache.isInitialized();
}
/**
@@ -412,17 +325,12 @@ public class ServerCQImpl extends CqQueryImpl implements
DataSerializable, Serve
* @return size of CQ Result key cache.
*/
public int getCqResultKeysSize() {
- if (this.cqResultKeys == null) {
- return 0;
- }
- synchronized (this.cqResultKeys) {
- return this.cqResultKeys.size();
- }
+ return serverCQResultsCache.size();
}
@Override
public boolean isOldValueRequiredForQueryProcessing(Object key) {
- return !this.cqResultKeysInitialized || !this.isPartOfCqResult(key);
+ return serverCQResultsCache.isOldValueRequiredForQueryProcessing(key);
}
/**
@@ -454,7 +362,6 @@ public class ServerCQImpl extends CqQueryImpl implements
DataSerializable, Serve
int stateBeforeClosing = this.cqState.getState();
this.cqState.setState(CqStateImpl.CLOSING);
- boolean isClosed = false;
// Cleanup the resource used by cq.
this.removeFromCqMap();
@@ -467,11 +374,7 @@ public class ServerCQImpl extends CqQueryImpl implements
DataSerializable, Serve
}
// Clean-up the CQ Results Cache.
- if (this.cqResultKeys != null) {
- synchronized (this.cqResultKeys) {
- this.cqResultKeys.clear();
- }
- }
+ serverCQResultsCache.clear();
// Set the state to close, and update stats
this.cqState.setState(CqStateImpl.CLOSED);
@@ -503,7 +406,7 @@ public class ServerCQImpl extends CqQueryImpl implements
DataSerializable, Serve
* Clears the resource used by CQ.
*/
@Override
- protected void cleanup() throws CqException {
+ protected void cleanup() {
// CqBaseRegion
try {
if (this.cqBaseRegion != null && !this.cqBaseRegion.isDestroyed()) {
@@ -526,8 +429,7 @@ public class ServerCQImpl extends CqQueryImpl implements
DataSerializable, Serve
* Stop or pause executing the query.
*/
@Override
- public void stop() throws CqClosedException, CqException {
- boolean isStopped = false;
+ public void stop() throws CqClosedException {
synchronized (this.cqState) {
if (this.isClosed()) {
throw new CqClosedException(
@@ -551,7 +453,7 @@ public class ServerCQImpl extends CqQueryImpl implements
DataSerializable, Serve
}
@Override
- public void fromData(DataInput in) throws IOException,
ClassNotFoundException {
+ public void fromData(DataInput in) throws IOException {
synchronized (cqState) {
this.cqState.setState(DataSerializer.readInteger(in));
}
@@ -584,14 +486,12 @@ public class ServerCQImpl extends CqQueryImpl implements
DataSerializable, Serve
}
@Override
- public <E> CqResults<E> executeWithInitialResults()
- throws CqClosedException, RegionNotFoundException, CqException {
+ public <E> CqResults<E> executeWithInitialResults() throws CqClosedException
{
throw new IllegalStateException("Execute cannot be called on a CQ on the
server");
}
@Override
- public void execute() throws CqClosedException, RegionNotFoundException,
CqException {
+ public void execute() throws CqClosedException {
throw new IllegalStateException("Execute cannot be called on a CQ on the
server");
}
-
}
diff --git
a/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/ServerCQResultsCache.java
b/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/ServerCQResultsCache.java
new file mode 100644
index 0000000..87c9693
--- /dev/null
+++
b/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/ServerCQResultsCache.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+package org.apache.geode.cache.query.cq.internal;
+
+import java.util.Set;
+
+/**
+ * Holds the keys that are part of the CQ query results.
+ * Using this, the CQ engine can determine whether to execute query on an old
value from EntryEvent
+ * or not, which is an expensive operation.
+ */
+interface ServerCQResultsCache {
+ Object TOKEN = new Object();
+
+ void setInitialized();
+
+ boolean isInitialized();
+
+ void add(Object key);
+
+ void remove(Object key, boolean isTokenMode);
+
+ void invalidate();
+
+ boolean contains(Object key);
+
+ void markAsDestroyed(Object key);
+
+ int size();
+
+ Set<Object> getKeys();
+
+ boolean isOldValueRequiredForQueryProcessing(Object key);
+
+ void clear();
+}
diff --git
a/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/ServerCQResultsCacheNoOpImpl.java
b/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/ServerCQResultsCacheNoOpImpl.java
new file mode 100644
index 0000000..4f0f1cf
--- /dev/null
+++
b/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/ServerCQResultsCacheNoOpImpl.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+package org.apache.geode.cache.query.cq.internal;
+
+import java.util.Collections;
+import java.util.Set;
+
+/**
+ * No op implementation, used when the property
CqServiceProvider.MAINTAIN_KEYS is set as false.
+ */
+class ServerCQResultsCacheNoOpImpl implements ServerCQResultsCache {
+ private static final Set<Object> EMPTY_CACHE = Collections.emptySet();
+
+ @Override
+ public void setInitialized() {}
+
+ @Override
+ public boolean isInitialized() {
+ return false;
+ }
+
+ @Override
+ public void add(Object key) {}
+
+ @Override
+ public void remove(Object key, boolean isTokenMode) {}
+
+ @Override
+ public void invalidate() {}
+
+ @Override
+ public boolean contains(Object key) {
+ return false;
+ }
+
+ @Override
+ public void markAsDestroyed(Object key) {}
+
+ @Override
+ public int size() {
+ return 0;
+ }
+
+ @Override
+ public Set<Object> getKeys() {
+ return EMPTY_CACHE;
+ }
+
+ @Override
+ public boolean isOldValueRequiredForQueryProcessing(Object key) {
+ return true;
+ }
+
+ @Override
+ public void clear() {}
+}
diff --git
a/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/ServerCQResultsCachePartitionRegionImpl.java
b/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/ServerCQResultsCachePartitionRegionImpl.java
new file mode 100644
index 0000000..882a923
--- /dev/null
+++
b/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/ServerCQResultsCachePartitionRegionImpl.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+package org.apache.geode.cache.query.cq.internal;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.logging.log4j.Logger;
+
+import org.apache.geode.internal.cache.Token;
+import org.apache.geode.logging.internal.log4j.api.LogService;
+
+/**
+ * Internal CQ cache implementation for CQs operating on partitioned regions.
+ */
+class ServerCQResultsCachePartitionRegionImpl implements ServerCQResultsCache {
+ private static final Logger logger = LogService.getLogger();
+
+ /**
+ * To indicate if the CQ results key cache is initialized.
+ */
+ public volatile boolean cqResultKeysInitialized = false;
+
+ /**
+ * This holds the keys that are part of the CQ query results. Using this CQ
engine can determine
+ * whether to execute query on old value from EntryEvent, which is an
expensive operation.
+ *
+ * NOTE: In case of RR this map is populated and used as intended. In case
of PR this map will not
+ * be populated. If executeCQ happens after update operations this map will
remain empty.
+ */
+ private final ConcurrentMap<Object, Object> cqResultKeys;
+
+ public ServerCQResultsCachePartitionRegionImpl() {
+ cqResultKeys = new ConcurrentHashMap<>();
+ }
+
+ @Override
+ public void setInitialized() {
+ cqResultKeysInitialized = true;
+ }
+
+ @Override
+ public boolean isInitialized() {
+ return cqResultKeysInitialized;
+ }
+
+ @Override
+ public void add(Object key) {
+ cqResultKeys.put(key, TOKEN);
+ }
+
+ @Override
+ public void remove(Object key, boolean isTokenMode) {
+ if (isTokenMode && cqResultKeys.get(key) != Token.DESTROYED) {
+ return;
+ }
+
+ cqResultKeys.remove(key);
+ }
+
+ @Override
+ public void invalidate() {
+ cqResultKeys.clear();
+ cqResultKeysInitialized = false;
+ }
+
+ /**
+ * Returns if the passed key is part of the CQs result set. This method
needs to be called once
+ * the CQ result key caching is completed (cqResultsCacheInitialized is
true).
+ *
+ * @return true if key is in the Results Cache.
+ */
+ @Override
+ public boolean contains(Object key) {
+ // Handle events that may have been deleted,
+ // but added by result caching.
+ if (!isInitialized()) {
+ logger.warn(
+ "The CQ Result key cache is not initialized. This should not happen
as the call to isPartOfCqResult() is based on the condition
cqResultsCacheInitialized.");
+ return false;
+ }
+
+ return cqResultKeys.containsKey(key);
+ }
+
+ /**
+ * Marks the key as destroyed in the CQ Results key cache.
+ */
+ @Override
+ public void markAsDestroyed(Object key) {
+ cqResultKeys.put(key, Token.DESTROYED);
+ }
+
+ @Override
+ public int size() {
+ return cqResultKeys.size();
+ }
+
+ /**
+ * For Test use only.
+ *
+ * @return CQ Results Cache.
+ */
+ @Override
+ public Set<Object> getKeys() {
+ return Collections.synchronizedSet(new HashSet<>(cqResultKeys.keySet()));
+ }
+
+ @Override
+ public boolean isOldValueRequiredForQueryProcessing(Object key) {
+ return !isInitialized() || !contains(key);
+ }
+
+ @Override
+ public void clear() {
+ cqResultKeys.clear();
+ }
+}
diff --git
a/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/ServerCQResultsCacheReplicateRegionImpl.java
b/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/ServerCQResultsCacheReplicateRegionImpl.java
new file mode 100644
index 0000000..3d987b8
--- /dev/null
+++
b/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/ServerCQResultsCacheReplicateRegionImpl.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+package org.apache.geode.cache.query.cq.internal;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.logging.log4j.Logger;
+
+import org.apache.geode.internal.cache.Token;
+import org.apache.geode.logging.internal.log4j.api.LogService;
+
+/**
+ * Internal CQ cache implementation for CQs operating on replicate regions.
+ */
+class ServerCQResultsCacheReplicateRegionImpl implements ServerCQResultsCache {
+ private static final Logger logger = LogService.getLogger();
+
+ /**
+ * To indicate if the CQ results key cache is initialized.
+ */
+ public volatile boolean cqResultKeysInitialized = false;
+
+ /**
+ * This holds the keys that are part of the CQ query results. Using this CQ
engine can determine
+ * whether to execute query on old value from EntryEvent, which is an
expensive operation.
+ *
+ * NOTE: In case of RR this map is populated and used as intended. In case
of PR this map will not
+ * be populated. If executeCQ happens after update operations this map will
remain empty.
+ */
+ private final Map<Object, Object> cqResultKeys;
+
+ /**
+ * This maintains the keys that are destroyed while the Results Cache is
getting constructed. This
+ * avoids any keys that are destroyed (after query execution) but is still
part of the CQs result.
+ */
+ private final Set<Object> destroysWhileCqResultsInProgress;
+
+ // Synchronize operations on cqResultKeys & destroysWhileCqResultsInProgress
+ private final Object LOCK = new Object();
+
+ public ServerCQResultsCacheReplicateRegionImpl() {
+ cqResultKeys = new HashMap<>();
+ destroysWhileCqResultsInProgress = new HashSet<>();
+ }
+
+ @Override
+ public void setInitialized() {
+ cqResultKeysInitialized = true;
+ }
+
+ @Override
+ public boolean isInitialized() {
+ return cqResultKeysInitialized;
+ }
+
+ @Override
+ public void add(Object key) {
+ synchronized (LOCK) {
+ cqResultKeys.put(key, TOKEN);
+
+ if (!isInitialized()) {
+ // This key could be coming after add, destroy.
+ // Remove this from destroy queue.
+ destroysWhileCqResultsInProgress.remove(key);
+ }
+ }
+ }
+
+ @Override
+ public void remove(Object key, boolean isTokenMode) {
+ synchronized (LOCK) {
+ if (isTokenMode && cqResultKeys.get(key) != Token.DESTROYED) {
+ return;
+ }
+
+ cqResultKeys.remove(key);
+ if (!isInitialized()) {
+ destroysWhileCqResultsInProgress.add(key);
+ }
+ }
+ }
+
+ @Override
+ public void invalidate() {
+ synchronized (LOCK) {
+ cqResultKeys.clear();
+ cqResultKeysInitialized = false;
+ }
+ }
+
+ /**
+ * Returns if the passed key is part of the CQs result set. This method
needs to be called once
+ * the CQ result key caching is completed (cqResultsCacheInitialized is
true).
+ *
+ * @return true if key is in the Results Cache.
+ */
+ @Override
+ public boolean contains(Object key) {
+ // Handle events that may have been deleted,
+ // but added by result caching.
+ if (!isInitialized()) {
+ logger.warn(
+ "The CQ Result key cache is not initialized. This should not happen
as the call to isPartOfCqResult() is based on the condition
cqResultsCacheInitialized.");
+ return false;
+ }
+
+ synchronized (LOCK) {
+ destroysWhileCqResultsInProgress.forEach(cqResultKeys::remove);
+ destroysWhileCqResultsInProgress.clear();
+ }
+
+ return cqResultKeys.containsKey(key);
+ }
+
+ /**
+ * Marks the key as destroyed in the CQ Results key cache.
+ */
+ @Override
+ public void markAsDestroyed(Object key) {
+ synchronized (LOCK) {
+ cqResultKeys.put(key, Token.DESTROYED);
+
+ if (!isInitialized()) {
+ destroysWhileCqResultsInProgress.add(key);
+ }
+ }
+ }
+
+ @Override
+ public int size() {
+ synchronized (LOCK) {
+ return cqResultKeys.size();
+ }
+ }
+
+ /**
+ * For Test use only.
+ *
+ * @return CQ Results Cache.
+ */
+ @Override
+ public Set<Object> getKeys() {
+ synchronized (LOCK) {
+ return Collections.synchronizedSet(new HashSet<>(cqResultKeys.keySet()));
+ }
+ }
+
+ @Override
+ public boolean isOldValueRequiredForQueryProcessing(Object key) {
+ return !isInitialized() || !contains(key);
+ }
+
+ @Override
+ public void clear() {
+ // Clean-up the CQ Results Cache.
+ synchronized (LOCK) {
+ cqResultKeys.clear();
+ }
+ }
+}
diff --git
a/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/command/ExecuteCQ61.java
b/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/command/ExecuteCQ61.java
index e3d0c60..7fa064f 100755
---
a/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/command/ExecuteCQ61.java
+++
b/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/command/ExecuteCQ61.java
@@ -202,7 +202,7 @@ public class ExecuteCQ61 extends BaseCQCommand {
} else {
// Don't execute query for cq.execute and
// if it is a PR query with execute query and maintain keys flags not set
- cqQuery.cqResultKeysInitialized = true;
+ cqQuery.setCqResultsCacheInitialized();
successQuery = true;
}
diff --git
a/geode-cq/src/test/java/org/apache/geode/cache/query/cq/internal/ServerCQImplTest.java
b/geode-cq/src/test/java/org/apache/geode/cache/query/cq/internal/ServerCQImplTest.java
index 3fdf818..2e186db 100644
---
a/geode-cq/src/test/java/org/apache/geode/cache/query/cq/internal/ServerCQImplTest.java
+++
b/geode-cq/src/test/java/org/apache/geode/cache/query/cq/internal/ServerCQImplTest.java
@@ -65,12 +65,12 @@ public class ServerCQImplTest {
// Initialize cache
serverCq.addToCqResultKeys("key1");
serverCq.setCqResultsCacheInitialized();
- assertThat(serverCq.cqResultKeysInitialized).isTrue();
+ assertThat(serverCq.isCqResultsCacheInitialized()).isTrue();
assertThat(serverCq.isPartOfCqResult("key1")).isTrue();
// Invalidate and assert results
serverCq.invalidateCqResultKeys();
- assertThat(serverCq.cqResultKeysInitialized).isFalse();
+ assertThat(serverCq.isCqResultsCacheInitialized()).isFalse();
assertThat(serverCq.isPartOfCqResult("key1")).isFalse();
}
}