This is an automated email from the ASF dual-hosted git repository.
mcmellawatt pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new aab0198 GEODE-5993: Eliminate race in monitorQueryThread() (#2818)
aab0198 is described below
commit aab0198e8478d4246042b2eb889c8ce7e28bb52e
Author: Bill Burcham <[email protected]>
AuthorDate: Fri Nov 9 15:12:03 2018 -0800
GEODE-5993: Eliminate race in monitorQueryThread() (#2818)
A race existed between setting low memory in the heap monitor thread,
and checking the low memory state in query monitoring thread.
The cancelation executor was shut down and no longer accepting new
tasks when this race occurred, causing a RejectedExecutionException.
This commit solves that problem by encapsulating the scheduling
behavior using the state design pattern.
Co-authored-by: Ryan McMahon <[email protected]>
Co-authored-by: Bill Burcham <[email protected]>
---
.../internal/QueryMonitorIntegrationTest.java | 10 +-
.../apache/geode/codeAnalysis/excludedClasses.txt | 5 +-
.../MonitorQueryUnderContentionBenchmark.java | 2 +-
.../cache/query/internal/DefaultQueryService.java | 2 +-
.../geode/cache/query/internal/QueryMonitor.java | 349 +++++++++++++--------
.../geode/internal/cache/GemFireCacheImpl.java | 2 +-
.../internal/cache/partitioned/QueryMessage.java | 6 +-
.../cache/query/internal/QueryMonitorTest.java | 8 +-
8 files changed, 234 insertions(+), 150 deletions(-)
diff --git
a/geode-core/src/integrationTest/java/org/apache/geode/cache/query/internal/QueryMonitorIntegrationTest.java
b/geode-core/src/integrationTest/java/org/apache/geode/cache/query/internal/QueryMonitorIntegrationTest.java
index ff7035a..41add0e 100644
---
a/geode-core/src/integrationTest/java/org/apache/geode/cache/query/internal/QueryMonitorIntegrationTest.java
+++
b/geode-core/src/integrationTest/java/org/apache/geode/cache/query/internal/QueryMonitorIntegrationTest.java
@@ -19,6 +19,8 @@ import static
org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import java.util.concurrent.ScheduledThreadPoolExecutor;
@@ -27,6 +29,7 @@ import org.junit.Test;
import org.mockito.stubbing.Answer;
import org.apache.geode.cache.CacheRuntimeException;
+import org.apache.geode.cache.query.QueryExecutionLowMemoryException;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.test.awaitility.GeodeAwaitility;
@@ -66,7 +69,7 @@ public class QueryMonitorIntegrationTest {
try {
queryMonitor = new QueryMonitor(
- () -> scheduledThreadPoolExecutor,
+ scheduledThreadPoolExecutor,
cache,
NEVER_EXPIRE_MILLIS);
@@ -74,6 +77,9 @@ public class QueryMonitorIntegrationTest {
queryMonitor.setLowMemory(true, 1);
+ verify(query, times(1))
+
.setQueryCanceledException(any(QueryExecutionLowMemoryException.class));
+
assertThatThrownBy(QueryMonitor::throwExceptionIfQueryOnCurrentThreadIsCanceled,
"Expected setLowMemory(true,_) to cancel query immediately, but it
didn't.",
QueryExecutionCanceledException.class);
@@ -95,7 +101,7 @@ public class QueryMonitorIntegrationTest {
public void
monitorQueryThreadCancelsLongRunningQueriesAndSetsExceptionAndThrowsException()
{
QueryMonitor queryMonitor = new QueryMonitor(
- () -> new ScheduledThreadPoolExecutor(1),
+ new ScheduledThreadPoolExecutor(1),
cache,
EXPIRE_QUICK_MILLIS);
diff --git
a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt
b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt
index f3c3476..f67da94 100644
---
a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt
+++
b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt
@@ -85,6 +85,10 @@
org/apache/geode/distributed/internal/RuntimeDistributionConfigImpl
org/apache/geode/pdx/internal/PdxInstanceEnum
org/apache/geode/pdx/internal/PdxInstanceImpl
org/apache/geode/pdx/internal/WritablePdxInstanceImpl
+org/apache/geode/cache/query/internal/QueryMonitor$MemoryStateImpl
+org/apache/geode/cache/query/internal/QueryMonitor$MemoryStateImpl$1
+org/apache/geode/cache/query/internal/QueryMonitor$MemoryStateImpl$2
+org/apache/geode/cache/query/internal/parse/ASTArithmeticOp
org/apache/geode/cache/query/internal/types/TypeUtils$ComparisonStrategy
org/apache/geode/cache/query/internal/types/TypeUtils$ComparisonStrategy$1
org/apache/geode/cache/query/internal/types/TypeUtils$ComparisonStrategy$2
@@ -92,6 +96,5 @@
org/apache/geode/cache/query/internal/types/TypeUtils$ComparisonStrategy$3
org/apache/geode/cache/query/internal/types/TypeUtils$ComparisonStrategy$4
org/apache/geode/cache/query/internal/types/TypeUtils$ComparisonStrategy$5
org/apache/geode/cache/client/internal/pooling/ConnectionManagerImpl$ClosedPoolConnectionList
-org/apache/geode/cache/query/internal/parse/ASTArithmeticOp
org/apache/geode/internal/cache/partitioned/ManageBackupBucketMessage$ReplyType
org/apache/geode/internal/cache/AfterCompletion$Action
diff --git
a/geode-core/src/jmh/java/org/apache/geode/cache/query/internal/MonitorQueryUnderContentionBenchmark.java
b/geode-core/src/jmh/java/org/apache/geode/cache/query/internal/MonitorQueryUnderContentionBenchmark.java
index b6d12dd..f65b801 100644
---
a/geode-core/src/jmh/java/org/apache/geode/cache/query/internal/MonitorQueryUnderContentionBenchmark.java
+++
b/geode-core/src/jmh/java/org/apache/geode/cache/query/internal/MonitorQueryUnderContentionBenchmark.java
@@ -97,7 +97,7 @@ public class MonitorQueryUnderContentionBenchmark {
LogService.setBaseLogLevel(org.apache.logging.log4j.Level.OFF);
queryMonitor =
- new QueryMonitor(() -> (ScheduledThreadPoolExecutor)
Executors.newScheduledThreadPool(1),
+ new QueryMonitor((ScheduledThreadPoolExecutor)
Executors.newScheduledThreadPool(1),
mock(InternalCache.class), QUERY_MAX_EXECUTION_TIME);
final int numberOfThreads =
diff --git
a/geode-core/src/main/java/org/apache/geode/cache/query/internal/DefaultQueryService.java
b/geode-core/src/main/java/org/apache/geode/cache/query/internal/DefaultQueryService.java
index b5d85bb..7b107ae 100644
---
a/geode-core/src/main/java/org/apache/geode/cache/query/internal/DefaultQueryService.java
+++
b/geode-core/src/main/java/org/apache/geode/cache/query/internal/DefaultQueryService.java
@@ -140,7 +140,7 @@ public class DefaultQueryService implements
InternalQueryService {
if (QueryMonitor.isLowMemory()) {
String reason = String.format(
"Query execution canceled due to memory threshold crossed in system,
memory used: %s bytes.",
- QueryMonitor.getMemoryUsedDuringLowMemory());
+ QueryMonitor.getMemoryUsedBytes());
throw new QueryExecutionLowMemoryException(reason);
}
if (queryString == null)
diff --git
a/geode-core/src/main/java/org/apache/geode/cache/query/internal/QueryMonitor.java
b/geode-core/src/main/java/org/apache/geode/cache/query/internal/QueryMonitor.java
index 7103006..94b90ab 100755
---
a/geode-core/src/main/java/org/apache/geode/cache/query/internal/QueryMonitor.java
+++
b/geode-core/src/main/java/org/apache/geode/cache/query/internal/QueryMonitor.java
@@ -16,6 +16,7 @@ package org.apache.geode.cache.query.internal;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -32,32 +33,33 @@ import org.apache.geode.internal.logging.LogService;
/**
* {@link QueryMonitor} class, monitors the query execution time. In typical
usage, the maximum
- * query execution time might be set (upon construction) via the system
property
- * {@link GemFireCacheImpl#MAX_QUERY_EXECUTION_TIME}. The number of threads
allocated to query
- * monitoring is determined by the instance of {@link
ScheduledThreadPoolExecutorFactory} passed
- * to the constructor.
+ * query execution time might be set (upon construction) via the system
property {@link
+ * GemFireCacheImpl#MAX_QUERY_EXECUTION_TIME}. The number of threads allocated
to query monitoring
+ * is determined by the instance of {@link ScheduledThreadPoolExecutor} passed
to the
+ * constructor.
*
- * This class supports a low-memory mode, established by {@link
#setLowMemory(boolean, long)}. \
- * In that mode, any attempt to monitor a (new) query will throw an exception.
+ * This class supports a low-memory mode, established by {@link
#setLowMemory(boolean, long)}
+ * with {@code isLowMemory=true}. In that mode, any attempt to monitor a (new)
query will
+ * throw an exception.
*
- * The {@link #monitorQueryThread(DefaultQuery)} method initiates monitoring
of a query.
- * {@link #stopMonitoringQueryThread(DefaultQuery)} stops monitoring a query.
+ * The {@link #monitorQueryThread(DefaultQuery)} method initiates monitoring
of a query. {@link
+ * #stopMonitoringQueryThread(DefaultQuery)} stops monitoring a query.
*
- * If the {@link QueryMonitor} determines a query needs to be canceled: either
because it is
- * taking too long, or because memory is running low, it does two things:
+ * If the {@link QueryMonitor} determines a query needs to be canceled: either
because it is taking
+ * too long, or because memory is running low, it does two things:
*
* <ul>
* <li>registers an exception on the query via
* {@link DefaultQuery#setQueryCanceledException(CacheRuntimeException)}</li>
* <li>sets the {@link DefaultQuery#queryCanceled} thread-local variable to
{@code true}
- * so that subsequent calls to {@link
#throwExceptionIfQueryOnCurrentThreadIsCanceled()}
- * will throw an exception</li>
+ * so that subsequent calls to {@link
#throwExceptionIfQueryOnCurrentThreadIsCanceled()} will throw
+ * an exception</li>
* </ul>
*
- * Code outside this class, that wishes to participate in cooperative
cancelation of queries
- * calls {@link #throwExceptionIfQueryOnCurrentThreadIsCanceled()} at various
yield points.
- * In catch blocks, {@link DefaultQuery#getQueryCanceledException()} is
interrogated to learn
- * the cancelation cause.
+ * Code outside this class, that wishes to participate in cooperative
cancelation of queries calls
+ * {@link #throwExceptionIfQueryOnCurrentThreadIsCanceled()} at various yield
points. In catch
+ * blocks, {@link DefaultQuery#getQueryCanceledException()} is interrogated to
learn the cancelation
+ * cause.
*
* @since GemFire 6.0
*/
@@ -68,72 +70,59 @@ public class QueryMonitor {
private final long defaultMaxQueryExecutionTime;
- private final ScheduledThreadPoolExecutorFactory executorFactory;
+ private final ScheduledThreadPoolExecutor executor;
- private volatile ScheduledThreadPoolExecutor executor;
+ private static volatile MemoryState memoryState =
MemoryStateImpl.HEAP_AVAILABLE;
- private volatile boolean cancelingDueToLowMemory;
-
- private static volatile Boolean LOW_MEMORY = Boolean.FALSE;
-
- private static volatile long LOW_MEMORY_USED_BYTES = 0;
-
- @FunctionalInterface
- public interface ScheduledThreadPoolExecutorFactory {
- ScheduledThreadPoolExecutor create();
- }
+ private static volatile long memoryUsedBytes = 0;
/**
- * This class will call {@link
ScheduledThreadPoolExecutor#setRemoveOnCancelPolicy(boolean)}
- * on {@link ScheduledThreadPoolExecutor} instances returned by the
- * {@link ScheduledThreadPoolExecutorFactory} to set that property to {@code
true}.
+ * This class will call {@link
ScheduledThreadPoolExecutor#setRemoveOnCancelPolicy(boolean)} on
+ * {@code executor} to set that property to {@code true}.
*
- * The default behavior of a {@link ScheduledThreadPoolExecutor} is to keep
canceled
- * tasks in the queue, relying on the timeout processing loop to remove them
- * when their time is up. That behaviour would result in tasks for completed
- * queries to remain in the queue until their timeout deadline was reached,
- * resulting in queue growth.
+ * The default behavior of a {@link ScheduledThreadPoolExecutor} is to keep
canceled tasks in the
+ * queue, relying on the timeout processing loop to remove them when their
time is up. That
+ * behaviour would cause tasks for completed queries to remain in the queue
until their
+ * timeout deadline was reached, resulting in queue growth.
*
- * Setting the remove-on-cancel-policy to {@code true} changes that behavior
so tasks are
- * removed immediately upon cancelation (via {@link
#stopMonitoringQueryThread(DefaultQuery)}).
+ * Setting the remove-on-cancel-policy to {@code true} changes that behavior
so tasks are removed
+ * immediately upon cancelation (via {@link
#stopMonitoringQueryThread(DefaultQuery)}).
*
- * @param executorFactory is called to construct the initial executor. It's
called subsequently
- * every time the QueryMonitor moves out of the low-memory state, to
create a new executor.
+ * @param executor is responsible for processing scheduled cancelation tasks
* @param cache is interrogated via {@link
InternalCache#isQueryMonitorDisabledForLowMemory} at
* each low-memory state change
- * @param defaultMaxQueryExecutionTime is the maximum time, in milliseconds,
that any query
- * is allowed to run
+ * @param defaultMaxQueryExecutionTime is the maximum time, in milliseconds,
that any query is
+ * allowed to run
*/
- public QueryMonitor(final ScheduledThreadPoolExecutorFactory executorFactory,
+ public QueryMonitor(final ScheduledThreadPoolExecutor executor,
final InternalCache cache,
final long defaultMaxQueryExecutionTime) {
- Objects.requireNonNull(executorFactory);
+ Objects.requireNonNull(executor);
Objects.requireNonNull(cache);
this.cache = cache;
this.defaultMaxQueryExecutionTime = defaultMaxQueryExecutionTime;
- this.executorFactory = executorFactory;
- this.executor = executorFactory.create();
+ this.executor = executor;
this.executor.setRemoveOnCancelPolicy(true);
}
/**
- * Add query to be monitored.
+ * Start monitoring the query.
*
- * Must not be called from a thread that is not the query thread,
- * because this class uses a ThreadLocal on the query thread!
+ * Must not be called from a thread that is not the query thread, because
this class uses a
+ * ThreadLocal on the query thread!
*/
public void monitorQueryThread(final DefaultQuery query) {
monitorQueryThread(query, defaultMaxQueryExecutionTime);
}
/**
- * Each query can have a different maxQueryExecution time. Make this method
public to
- * expose that feature to callers.
+ * Each query can have a different maxQueryExecution time. Make this method
public to expose that
+ * feature to callers.
*
- * Must not be called from a thread that is not the query thread,
- * because this class uses a ThreadLocal on the query thread!
+ * Must not be called from a thread that is not the query thread, because
this class uses a
+ * ThreadLocal on the query thread!
*/
private void monitorQueryThread(final DefaultQuery query,
final long maxQueryExecutionTime) {
@@ -143,12 +132,6 @@ public class QueryMonitor {
return;
}
- if (LOW_MEMORY) {
- final QueryExecutionLowMemoryException lowMemoryException =
createLowMemoryException();
- query.setQueryCanceledException(lowMemoryException);
- throw lowMemoryException;
- }
-
query.setCancelationTask(scheduleCancelationTask(query,
maxQueryExecutionTime));
if (logger.isDebugEnabled()) {
@@ -157,10 +140,10 @@ public class QueryMonitor {
}
/**
- * Stops monitoring the query.
+ * Stop monitoring the query.
*
- * Must not be called from a thread that is not the query thread,
- * because this class uses a ThreadLocal on the query thread!
+ * Must not be called from a thread that is not the query thread, because
this class uses a
+ * ThreadLocal on the query thread!
*/
public void stopMonitoringQueryThread(final DefaultQuery query) {
query.getCancelationTask().ifPresent(task -> task.cancel(false));
@@ -171,9 +154,9 @@ public class QueryMonitor {
}
/**
- * Throw an exception if the query has been canceled. The {@link
QueryMonitor} cancels the
- * query if it takes more than the max query execution time or in low memory
situations where
- * critical heap percentage has been set on the resource manager.
+ * Throw an exception if the query has been canceled. The {@link
QueryMonitor} cancels the query
+ * if it takes more than the max query execution time or in low memory
situations where critical
+ * heap percentage has been set on the resource manager.
*
* @throws QueryExecutionCanceledException if the query has been canceled
*/
@@ -190,71 +173,174 @@ public class QueryMonitor {
executor.shutdownNow();
}
+ public static boolean isLowMemory() {
+ return memoryState.isLowMemory();
+ }
+
+ public static long getMemoryUsedBytes() {
+ return memoryUsedBytes;
+ }
+
/**
- * Assumes LOW_MEMORY will only be set if query monitor is enabled
+ * Caller must not call this method concurrently from multiple threads.
+ * In addition to causing data inconsistency, concurrent calls will result in
+ * lost updates e.g. transitions to low-memory status could be missed,
+ * resulting in a failure to cancel queries.
*/
- public static boolean isLowMemory() {
- return LOW_MEMORY;
+ public void setLowMemory(final boolean isLowMemory, final long usedBytes) {
+ memoryState.setLowMemory(executor, isLowMemory, usedBytes, cache);
}
- public static long getMemoryUsedDuringLowMemory() {
- return LOW_MEMORY_USED_BYTES;
+ /**
+ * This interface plays the role of the "State" interface in the GoF "State"
design pattern.
+ * Its implementations embodied in the {@link MemoryStateImpl} enum (an
abstract base class,
+ * or ABC) and its enum constants (subclasses of the ABC) play the role of
"ConcreteState"
+ * classes in that design pattern.
+ *
+ * The "Context" role is fulfilled by the melange of behavior
+ * and state embodied in the (static) {@link #isLowMemory()} and
+ * {@link #getMemoryUsedBytes()} methods and the {@link
#setLowMemory(boolean, long)}
+ * method and the static fields they manipulate.
+ */
+ private interface MemoryState {
+ void setLowMemory(ScheduledThreadPoolExecutor executor,
+ boolean isLowMemory,
+ long usedBytes,
+ InternalCache cache);
+
+ ScheduledFuture<?> schedule(Runnable command,
+ long delay,
+ TimeUnit unit,
+ ScheduledExecutorService scheduledExecutorService,
+ DefaultQuery query);
+
+ boolean isLowMemory();
+
+ CacheRuntimeException createCancelationException(long timeLimitMillis,
+ DefaultQuery query);
}
/**
- * Caller should not call this method concurrently from multiple threads.
Doing so can
- * result in lost low memory state updates due to lock unfairness.
+ * This enum (an abstract base class or ABC) and its enum constants
(subclasses of the ABC)
+ * play the role of "ConcreteState" classes in the GoF "State" pattern.
+ *
+ * See {@link MemoryState} for details.
*/
- public synchronized void setLowMemory(final boolean isLowMemory, final long
usedBytes) {
- if (!cache.isQueryMonitorDisabledForLowMemory()) {
- QueryMonitor.LOW_MEMORY_USED_BYTES = usedBytes;
- final boolean memoryStateChanged = isLowMemory !=
QueryMonitor.LOW_MEMORY;
- if (memoryStateChanged) {
+ private enum MemoryStateImpl implements MemoryState {
+ HEAP_AVAILABLE {
+ @Override
+ public void _setLowMemory(final ScheduledThreadPoolExecutor executor,
+ final boolean isLowMemory,
+ final long usedBytes,
+ final InternalCache cache) {
if (isLowMemory) {
- cancelAllQueriesDueToMemory();
- } else {
+ memoryState = HEAP_EXHAUSTED;
+
/*
- * Executor was shut down and made permanently unusable when we went
into
- * the low-memory state. We have to make a new executor now that
we're monitoring
- * queries again.
+ * We need to already be in the HEAP_EXHAUSTED state because we want
the
+ * cancelation behavior associated with that state.
*/
- executor = executorFactory.create();
+ cancelAllQueries(executor);
}
+ // Otherwise, no state change
}
- QueryMonitor.LOW_MEMORY = isLowMemory;
- }
- }
- /**
- * Stop accepting new monitoring requests. Run all cancelation tasks with
- * {@link #cancelingDueToLowMemory} set. Leave the executor's task queue
empty.
- */
- private synchronized void cancelAllQueriesDueToMemory() {
+ @Override
+ public boolean isLowMemory() {
+ return false;
+ }
- /*
- * A cancelation task is dual-purpose. Its primary purpose is to cancel
- * a query if the query runs too long. Alternately, if this flag
- * {@link #cancelingDueToLowMemory} is set, the cancelation task will
cancel the query
- * due to low memory.
- */
- cancelingDueToLowMemory = true;
-
- try {
- /*
- * It's tempting to try to process the list of tasks returned from
shutdownNow().
- * Unfortunately, that call leaves the executor in a state that causes
the task's
- * run() to cancel the task, instead of actually running it. By calling
shutdown()
- * we block new task additions and put the executor in a state that
allows the
- * task's run() to actually run the task logic.
+ @Override
+ public ScheduledFuture<?> schedule(final Runnable command, final long
delay,
+ final TimeUnit unit,
+ final ScheduledExecutorService scheduledExecutorService,
+ final DefaultQuery query) {
+ return scheduledExecutorService.schedule(command, delay, unit);
+ }
+
+ @Override
+ public CacheRuntimeException createCancelationException(final long
timeLimitMillis,
+ final DefaultQuery query) {
+ final String message = String.format(
+ "Query execution canceled after exceeding max execution time
%sms.",
+ timeLimitMillis);
+ if (logger.isInfoEnabled()) {
+ logger.info(String.format("%s %s", message, query));
+ }
+ return new QueryExecutionTimeoutException(message);
+ }
+
+ /**
+ * Run all cancelation tasks. Leave the executor's task queue empty.
*/
- executor.shutdown(); // executor won't accept new work ever again
- final BlockingQueue<Runnable> expirationTaskQueue = executor.getQueue();
- for (final Runnable cancelationTask : expirationTaskQueue) {
- cancelationTask.run();
+ private void cancelAllQueries(final ScheduledThreadPoolExecutor
executor) {
+ final BlockingQueue<Runnable> expirationTaskQueue =
executor.getQueue();
+ for (final Runnable cancelationTask : expirationTaskQueue) {
+ if (expirationTaskQueue.remove(cancelationTask)) {
+ cancelationTask.run();
+ }
+ }
+ }
+
+ },
+ HEAP_EXHAUSTED {
+ @Override
+ public void _setLowMemory(final ScheduledThreadPoolExecutor executor,
+ final boolean isLowMemory,
+ final long usedBytes,
+ final InternalCache cache) {
+ if (!isLowMemory) {
+ memoryState = HEAP_AVAILABLE;
+ }
+ // Otherwise, no state change
+ }
+
+ @Override
+ public boolean isLowMemory() {
+ return true;
+ }
+
+ @Override
+ public ScheduledFuture<?> schedule(final Runnable command, final long
timeLimitMillis,
+ final TimeUnit unit,
+ final ScheduledExecutorService scheduledExecutorService,
+ final DefaultQuery query) {
+ final CacheRuntimeException lowMemoryException =
+ createCancelationException(timeLimitMillis, query);
+ query.setQueryCanceledException(lowMemoryException);
+ throw lowMemoryException;
}
- expirationTaskQueue.clear();
- } finally {
- cancelingDueToLowMemory = false;
+
+ @Override
+ public CacheRuntimeException createCancelationException(final long
timeLimitMillis,
+ final DefaultQuery query) {
+ return new QueryExecutionLowMemoryException(
+ String.format(
+ "Query execution canceled due to memory threshold crossed in
system, memory used: %s bytes.",
+ memoryUsedBytes));
+ }
+
+ };
+
+ @Override
+ public void setLowMemory(final ScheduledThreadPoolExecutor executor,
+ final boolean isLowMemory,
+ final long usedBytes,
+ final InternalCache cache) {
+ if (cache.isQueryMonitorDisabledForLowMemory()) {
+ return;
+ }
+
+ memoryUsedBytes = usedBytes;
+
+ _setLowMemory(executor, isLowMemory, usedBytes, cache);
+ }
+
+ void _setLowMemory(final ScheduledThreadPoolExecutor executor,
+ final boolean isLowMemory,
+ final long usedBytes,
+ final InternalCache cache) {
+ throw new IllegalStateException("subclass must override");
}
}
@@ -266,31 +352,26 @@ public class QueryMonitor {
final AtomicBoolean queryCanceledThreadLocal =
DefaultQuery.queryCanceled.get();
- return executor.schedule(() -> {
- final CacheRuntimeException exception = cancelingDueToLowMemory ?
createLowMemoryException()
- : createExpirationException(timeLimitMillis);
+ /*
+ * This is where the GoF "State" design pattern comes home to roost.
+ *
+ * memoryState.schedule() is going to either schedule or throw an
exception depending on what
+ * state we are _currently_ in. Remember the switching of that state
(reference) happens
+ * in a separate thread, up in the setLowMemory() method, generally called
by the
+ * HeapMemoryMonitor.
+ *
+ * The first line of the lambda/closure, when it _eventually_ runs (in yet
another thread--
+ * a thread from the executor), will access what is _then_ the current
state, through
+ * memoryState, to createCancelationException().
+ */
+ return memoryState.schedule(() -> {
+ final CacheRuntimeException exception = memoryState
+ .createCancelationException(timeLimitMillis, query);
query.setQueryCanceledException(exception);
queryCanceledThreadLocal.set(true);
- if (logger.isInfoEnabled() && !cancelingDueToLowMemory) {
- logger.info(String.format("%s %s", exception.getMessage(), query));
- }
- }, timeLimitMillis, TimeUnit.MILLISECONDS);
- }
-
- private QueryExecutionTimeoutException createExpirationException(final long
timeLimitMillis) {
- return new QueryExecutionTimeoutException(
- String.format(
- "Query execution canceled after exceeding max execution time
%sms.",
- timeLimitMillis));
- }
-
- private QueryExecutionLowMemoryException createLowMemoryException() {
- return new QueryExecutionLowMemoryException(
- String.format(
- "Query execution canceled due to memory threshold crossed in
system, memory used: %s bytes.",
- LOW_MEMORY_USED_BYTES));
+ }, timeLimitMillis, TimeUnit.MILLISECONDS, executor, query);
}
private void logDebug(final DefaultQuery query, final String message) {
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
index 069adc1..e4fa2c3 100755
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
@@ -4468,7 +4468,7 @@ public class GemFireCacheImpl implements InternalCache,
InternalClientCache, Has
}
this.queryMonitor =
- new QueryMonitor(() -> (ScheduledThreadPoolExecutor)
Executors.newScheduledThreadPool(
+ new QueryMonitor((ScheduledThreadPoolExecutor)
Executors.newScheduledThreadPool(
QUERY_MONITOR_THREAD_POOL_SIZE,
(runnable) -> new LoggingThread("QueryMonitor Thread",
runnable)),
this,
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/QueryMessage.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/QueryMessage.java
index da999e1..21f1c9a 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/QueryMessage.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/QueryMessage.java
@@ -97,7 +97,7 @@ public class QueryMessage extends
StreamingPartitionOperation.StreamingPartition
if (QueryMonitor.isLowMemory()) {
String reason = String.format(
"Query execution canceled due to memory threshold crossed in system,
memory used: %s bytes.",
- QueryMonitor.getMemoryUsedDuringLowMemory());
+ QueryMonitor.getMemoryUsedBytes());
throw new QueryExecutionLowMemoryException(reason);
}
if (Thread.interrupted()) {
@@ -162,7 +162,7 @@ public class QueryMessage extends
StreamingPartitionOperation.StreamingPartition
if (QueryMonitor.isLowMemory()) {
String reason = String.format(
"Query execution canceled due to memory threshold crossed in system,
memory used: %s bytes.",
- QueryMonitor.getMemoryUsedDuringLowMemory());
+ QueryMonitor.getMemoryUsedBytes());
// throw query exception to piggyback on existing error handling as
qp.executeQuery also
// throws the same error for low memory
throw new QueryExecutionLowMemoryException(reason);
@@ -246,7 +246,7 @@ public class QueryMessage extends
StreamingPartitionOperation.StreamingPartition
if (QueryMonitor.isLowMemory()) {
String reason = String.format(
"Query execution canceled due to memory threshold crossed in
system, memory used: %s bytes.",
- QueryMonitor.getMemoryUsedDuringLowMemory());
+ QueryMonitor.getMemoryUsedBytes());
throw new QueryExecutionLowMemoryException(reason);
} else if (query.isCanceled()) {
throw query.getQueryCanceledException();
diff --git
a/geode-core/src/test/java/org/apache/geode/cache/query/internal/QueryMonitorTest.java
b/geode-core/src/test/java/org/apache/geode/cache/query/internal/QueryMonitorTest.java
index 1681509..36716b9 100644
---
a/geode-core/src/test/java/org/apache/geode/cache/query/internal/QueryMonitorTest.java
+++
b/geode-core/src/test/java/org/apache/geode/cache/query/internal/QueryMonitorTest.java
@@ -55,7 +55,7 @@ public class QueryMonitorTest {
public void setUp() {
scheduledThreadPoolExecutor = mock(ScheduledThreadPoolExecutor.class);
when(scheduledThreadPoolExecutor.getQueue()).thenReturn(new
ArrayBlockingQueue<>(1));
- monitor = new QueryMonitor(() -> scheduledThreadPoolExecutor,
mock(InternalCache.class),
+ monitor = new QueryMonitor(scheduledThreadPoolExecutor,
mock(InternalCache.class),
max_execution_time);
captor = ArgumentCaptor.forClass(Runnable.class);
}
@@ -105,12 +105,6 @@ public class QueryMonitorTest {
}
@Test
- public void setLowMemoryTrueShutsDownExecutor() {
- monitor.setLowMemory(true, 1);
- Mockito.verify(scheduledThreadPoolExecutor, times(1)).shutdown();
- }
-
- @Test
public void setLowMemoryTrueThenFalseAllowsSubsequentMonitoring() {
monitor.setLowMemory(true, 1);
monitor.setLowMemory(false, 1);