This is an automated email from the ASF dual-hosted git repository.
ab pushed a commit to branch branch_9x
in repository https://gitbox.apache.org/repos/asf/solr.git
The following commit(s) were added to refs/heads/branch_9x by this push:
new b1a6b8d92f4 SOLR-17150: Create MemAllowedLimit (#2708) (#2749)
b1a6b8d92f4 is described below
commit b1a6b8d92f4a6479ebbdd3f86c50717386c9d413
Author: Andrzej Białecki <[email protected]>
AuthorDate: Tue Oct 8 14:08:43 2024 +0200
SOLR-17150: Create MemAllowedLimit (#2708) (#2749)
---
solr/CHANGES.txt | 2 +
.../org/apache/solr/search/MemAllowedLimit.java | 171 ++++++++++++++++++
.../java/org/apache/solr/search/QueryLimits.java | 3 +
.../apache/solr/search/TestMemAllowedLimit.java | 194 +++++++++++++++++++++
.../query-guide/pages/common-query-parameters.adoc | 11 +-
.../apache/solr/common/params/CommonParams.java | 6 +
6 files changed, 386 insertions(+), 1 deletion(-)
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index f4f3893ed2f..9d73916329c 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -11,6 +11,8 @@ New Features
It will now be released as a fully supported Solr feature.
This feature closes SIP-13: Cross Data Center Replication. (Mark Miller,
Andrzej Bialecki, Jason Gerlowski, Houston Putman)
+* SOLR-17150: Implement `memAllowed` parameter to limit per-thread memory
allocations during request processing. (Andrzej Bialecki, Gus Heck)
+
Improvements
---------------------
* SOLR-17397: SkipExistingDocumentsProcessor now functions correctly with
child documents. (Tim Owens via Eric Pugh)
diff --git a/solr/core/src/java/org/apache/solr/search/MemAllowedLimit.java
b/solr/core/src/java/org/apache/solr/search/MemAllowedLimit.java
new file mode 100644
index 00000000000..1492b112085
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/search/MemAllowedLimit.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.search;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.lang.invoke.MethodHandles;
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadMXBean;
+import java.lang.reflect.Method;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.solr.common.params.CommonParams;
+import org.apache.solr.request.SolrQueryRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Enforces a memory-based limit on a given SolrQueryRequest, as specified by
the {@code memAllowed}
+ * query parameter.
+ *
+ * <p>This class tracks per-thread memory allocations during a request using
its own ThreadLocal. It
+ * records the current thread allocation when the instance was created
(typically at the start of
+ * SolrQueryRequest processing) as a starting point, and then on every call to
{@link #shouldExit()}
it accumulates the amount of reported allocated memory since the previous
call, and compares the
accumulated amount to the configured threshold, expressed in mebibytes.
+ *
+ * <p>NOTE: this class accesses {@code
+ * com.sun.management.ThreadMXBean#getCurrentThreadAllocatedBytes} using
reflection. On JVM-s where
+ * this implementation is not available an exception will be thrown when
attempting to use the
+ * {@code memAllowed} parameter.
+ */
+public class MemAllowedLimit implements QueryLimit {
+ private static final Logger log =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ private static final double MEBI = 1024.0 * 1024.0;
+ private static final ThreadMXBean threadBean =
ManagementFactory.getThreadMXBean();
+ private static final Method GET_BYTES_METHOD;
+ private static final boolean supported;
+
+ static {
+ boolean testSupported;
+ Method getBytesMethod = null;
+ try {
+ Class<?> sunThreadBeanClz =
Class.forName("com.sun.management.ThreadMXBean");
+ if (sunThreadBeanClz.isAssignableFrom(threadBean.getClass())) {
+ Method m =
sunThreadBeanClz.getMethod("isThreadAllocatedMemorySupported");
+ Boolean supported = (Boolean) m.invoke(threadBean);
+ if (supported) {
+ m = sunThreadBeanClz.getMethod("setThreadAllocatedMemoryEnabled",
boolean.class);
+ m.invoke(threadBean, Boolean.TRUE);
+ testSupported = true;
+ getBytesMethod =
sunThreadBeanClz.getMethod("getCurrentThreadAllocatedBytes");
+ } else {
+ testSupported = false;
+ }
+ } else {
+ testSupported = false;
+ }
+ } catch (Exception e) {
+ testSupported = false;
+ }
+ supported = testSupported;
+ GET_BYTES_METHOD = getBytesMethod;
+ }
+
+ private static final ThreadLocal<AtomicLong> threadLocalMem =
+ ThreadLocal.withInitial(() -> new AtomicLong(-1L));
+
+ private long limitBytes;
+ private final AtomicLong accumulatedMem = new AtomicLong();
+ private long exitedAt = 0;
+
+ public MemAllowedLimit(SolrQueryRequest req) {
+ if (!supported) {
+ throw new IllegalArgumentException(
+ "Per-thread memory allocation monitoring not available in this
JVM.");
+ }
+ float reqMemLimit = req.getParams().getFloat(CommonParams.MEM_ALLOWED,
-1.0f);
+ if (reqMemLimit <= 0.0f) {
+ throw new IllegalArgumentException(
+ "Check for limit with hasMemLimit(req) before creating a
MemAllowedLimit!");
+ }
+ limitBytes = Math.round(reqMemLimit * MEBI);
+ // init the thread-local
+ init();
+ }
+
+ @VisibleForTesting
+ MemAllowedLimit(float memLimit) {
+ if (!supported) {
+ throw new IllegalArgumentException(
+ "Per-thread memory allocation monitoring not available in this
JVM.");
+ }
+ limitBytes = Math.round(memLimit * MEBI);
+ // init the thread-local
+ init();
+ }
+
+ private final void init() {
+ long currentAllocatedBytes;
+ try {
+ currentAllocatedBytes = (Long) GET_BYTES_METHOD.invoke(threadBean);
+ } catch (Exception e) {
+ throw new IllegalArgumentException("Unexpected error checking thread
allocation!", e);
+ }
+ AtomicLong threadMem = threadLocalMem.get();
+ threadMem.compareAndSet(-1L, currentAllocatedBytes);
+ }
+
+ private long getCurrentAllocatedBytes() {
+ try {
+ return (Long) GET_BYTES_METHOD.invoke(threadBean);
+ } catch (Exception e) {
+ throw new IllegalArgumentException("Unexpected error checking thread
allocation!", e);
+ }
+ }
+
+ @VisibleForTesting
+ static boolean isSupported() {
+ return supported;
+ }
+
+ static boolean hasMemLimit(SolrQueryRequest req) {
+ return req.getParams().getFloat(CommonParams.MEM_ALLOWED, -1.0f) > 0.0f;
+ }
+
+ @Override
+ public boolean shouldExit() {
+ if (exitedAt > 0L) {
+ return true;
+ }
+
+ try {
+ long currentAllocatedBytes = getCurrentAllocatedBytes();
+ AtomicLong threadMem = threadLocalMem.get();
+ long lastAllocatedBytes = threadMem.get();
+ accumulatedMem.addAndGet(currentAllocatedBytes - lastAllocatedBytes);
+ threadMem.set(currentAllocatedBytes);
+ if (log.isDebugEnabled()) {
+ log.debug(
+ "mem limit thread {} remaining delta {}",
+ Thread.currentThread().getName(),
+ (limitBytes - accumulatedMem.get()));
+ }
+ if (limitBytes < accumulatedMem.get()) {
+ exitedAt = accumulatedMem.get();
+ return true;
+ }
+ return false;
+ } catch (Exception e) {
+ throw new IllegalArgumentException("Unexpected error checking thread
allocation!", e);
+ }
+ }
+
+ @Override
+ public Object currentValue() {
+ return exitedAt > 0 ? exitedAt : accumulatedMem.get();
+ }
+}
diff --git a/solr/core/src/java/org/apache/solr/search/QueryLimits.java
b/solr/core/src/java/org/apache/solr/search/QueryLimits.java
index 86c7f488de3..c2bd205c41d 100644
--- a/solr/core/src/java/org/apache/solr/search/QueryLimits.java
+++ b/solr/core/src/java/org/apache/solr/search/QueryLimits.java
@@ -68,6 +68,9 @@ public class QueryLimits implements QueryTimeout {
if (hasCpuLimit(req)) {
limits.add(new CpuAllowedLimit(req));
}
+ if (MemAllowedLimit.hasMemLimit(req)) {
+ limits.add(new MemAllowedLimit(req));
+ }
}
// for testing
if (TestInjection.queryTimeout != null) {
diff --git a/solr/core/src/test/org/apache/solr/search/TestMemAllowedLimit.java
b/solr/core/src/test/org/apache/solr/search/TestMemAllowedLimit.java
new file mode 100644
index 00000000000..5dac479eb90
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/search/TestMemAllowedLimit.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.search;
+
+import java.lang.invoke.MethodHandles;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.concurrent.TimeUnit;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.index.NoMergePolicyFactory;
+import org.apache.solr.util.LogLevel;
+import org.apache.solr.util.TestInjection;
+import org.apache.solr.util.ThreadCpuTimer;
+import org.junit.AfterClass;
+import org.junit.Assume;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@LogLevel("org.apache.solr.search.MemAllowedLimit=DEBUG")
+public class TestMemAllowedLimit extends SolrCloudTestCase {
+ private static final Logger log =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ private static final String COLLECTION = "test";
+
+ private static Path createConfigSet() throws Exception {
+ Path configSet = createTempDir();
+ copyMinConf(configSet.toFile());
+ // insert an expensive search component
+ Path solrConfig = configSet.resolve("conf/solrconfig.xml");
+ Files.writeString(
+ solrConfig,
+ Files.readString(solrConfig)
+ .replace(
+ "<requestHandler",
+ "<searchComponent name=\"expensiveSearchComponent\"\n"
+ + "
class=\"org.apache.solr.search.ExpensiveSearchComponent\"/>\n"
+ + "\n"
+ + " <requestHandler")
+ .replace(
+ "class=\"solr.SearchHandler\">",
+ "class=\"solr.SearchHandler\">\n"
+ + " <arr name=\"first-components\">\n"
+ + " <str>expensiveSearchComponent</str>\n"
+ + " </arr>\n"));
+ return configSet.resolve("conf");
+ }
+
+ @BeforeClass
+ public static void setup() throws Exception {
+ // Using NoMergePolicy and 100 commits we should get 100 segments (across
all shards).
+ // At this point of writing MAX_SEGMENTS_PER_SLICE in lucene is 5, so we
should be
+ // ensured that any multithreaded testing will create 20 executable tasks
for the
+ // executor that was provided to index-searcher.
+
systemSetPropertySolrTestsMergePolicyFactory(NoMergePolicyFactory.class.getName());
+ System.setProperty(ThreadCpuTimer.ENABLE_CPU_TIME, "true");
+ System.setProperty("metricsEnabled", "true");
+ Path configset = createConfigSet();
+ configureCluster(1).addConfig("conf", configset).configure();
+ SolrClient solrClient = cluster.getSolrClient();
+ CollectionAdminRequest.Create create =
+ CollectionAdminRequest.createCollection(COLLECTION, "conf", 3, 2);
+ create.process(solrClient);
+ waitForState("active", COLLECTION, clusterShape(3, 6));
+ for (int j = 0; j < 100; j++) {
+ solrClient.add(COLLECTION, sdoc("id", "id-" + j, "val_i", j % 5));
+ solrClient.commit(COLLECTION); // need to commit every doc to create
many segments.
+ }
+ }
+
+ @AfterClass
+ public static void tearDownClass() {
+ TestInjection.cpuTimerDelayInjectedNS = null;
+ systemClearPropertySolrTestsMergePolicyFactory();
+ }
+
+ @Test
+ public void testLimit() throws Exception {
+ Assume.assumeTrue("Thread memory monitoring is not available",
MemAllowedLimit.isSupported());
+ long limitMs = 100000;
+ // 1 MiB
+ MemAllowedLimit memLimit = new MemAllowedLimit(1f);
+ ArrayList<byte[]> data = new ArrayList<>();
+ long startNs = System.nanoTime();
+ int wakeups = 0;
+ while (!memLimit.shouldExit()) {
+ Thread.sleep(100);
+ // allocate memory
+ for (int i = 0; i < 20; i++) {
+ data.add(new byte[5000]);
+ }
+ wakeups++;
+ }
+ long endNs = System.nanoTime();
+ assertTrue(data.size() > 1);
+ long wallTimeDeltaMs = TimeUnit.MILLISECONDS.convert(endNs - startNs,
TimeUnit.NANOSECONDS);
+ log.info(
+ "Time limit: {} ms, elapsed wall-clock: {} ms, wakeups: {}",
+ limitMs,
+ wallTimeDeltaMs,
+ wakeups);
+ assertTrue("Number of wakeups should be smaller than 100 but was " +
wakeups, wakeups < 100);
+ assertTrue(
+ "Elapsed wall-clock time expected much smaller than 100ms but was " +
wallTimeDeltaMs,
+ limitMs > wallTimeDeltaMs);
+ }
+
+ @Test
+ public void testDistribLimit() throws Exception {
+ Assume.assumeTrue("Thread memory monitoring is not available",
MemAllowedLimit.isSupported());
+ SolrClient solrClient = cluster.getSolrClient();
+ // no limits set - should complete
+ long dataSize = 150; // 150 KiB
+ QueryResponse rsp =
+ solrClient.query(
+ COLLECTION,
+ params("q", "id:*", "sort", "id desc", "dataSize",
String.valueOf(dataSize)));
+ assertEquals(rsp.getHeader().get("status"), 0);
+ assertNull("should not have partial results",
rsp.getHeader().get("partialResults"));
+
+ // memAllowed set with large value, should return full results
+ rsp =
+ solrClient.query(
+ COLLECTION,
+ params(
+ "q",
+ "id:*",
+ "sort",
+ "id asc",
+ "memLoadCount",
+ String.valueOf(dataSize),
+ "stages",
+ "prepare,process",
+ "memAllowed",
+ "1.5"));
+ assertNull("should have full results",
rsp.getHeader().get("partialResults"));
+
+ // memAllowed set, should return partial results
+ rsp =
+ solrClient.query(
+ COLLECTION,
+ params(
+ "q",
+ "id:*",
+ "sort",
+ "id asc",
+ "memLoadCount",
+ String.valueOf(dataSize),
+ "stages",
+ "prepare,process",
+ "memAllowed",
+ "0.2"));
+ assertNotNull("should have partial results",
rsp.getHeader().get("partialResults"));
+
+ // multi-threaded search
+ // memAllowed set, should return partial results
+ rsp =
+ solrClient.query(
+ COLLECTION,
+ params(
+ "q",
+ "id:*",
+ "sort",
+ "id asc",
+ "memLoadCount",
+ String.valueOf(dataSize),
+ "stages",
+ "prepare,process",
+ "multiThreaded",
+ "true",
+ "memAllowed",
+ "0.2"));
+ assertNotNull("should have partial results",
rsp.getHeader().get("partialResults"));
+ }
+}
diff --git
a/solr/solr-ref-guide/modules/query-guide/pages/common-query-parameters.adoc
b/solr/solr-ref-guide/modules/query-guide/pages/common-query-parameters.adoc
index ce6d445bea6..7949d045e06 100644
--- a/solr/solr-ref-guide/modules/query-guide/pages/common-query-parameters.adoc
+++ b/solr/solr-ref-guide/modules/query-guide/pages/common-query-parameters.adoc
@@ -382,11 +382,20 @@ This parameter specifies the amount of CPU time, in
milliseconds, allowed for a
In contrast to the `timeAllowed` this parameter monitors the actual CPU usage
by the thread that executes the query. The same CPU usage limit is applied to
the query coordinator as to each replica that participates in the distributed
search (although reaching this limit first in the query coordinator is
unlikely).
Should any replica locally exceed the allowed CPU time the whole distributed
search will be terminated (by canceling requests to other shards).
-Note: the same CPU limit is applied to each stage in the distributed query
processing. Typically this involves two or more stages (e.g. getting top
document id-s, retrieving their fields, additional stages may be required for
faceting, grouping, etc).
+Note: the same CPU limit is applied to each stage in the distributed query
processing. Typically this involves two or more stages where the request is
processed by different
+Solr nodes (e.g. getting top document id-s, retrieving their fields,
additional stages may be required for faceting, grouping, etc).
For example, setting `cpuAllowed=500` gives a limit of at most 500 ms of CPU
time for each of these stages - meaning that the total CPU usage by the query
may reach a multiple of the `cpuAllowed` value depending on the number of
stages.
All other considerations regarding partial results listed for the
`timeAllowed` parameter apply here, too.
+== memAllowed Parameter
+
+This parameter specifies the amount of memory (a float value, in MiB) allowed
for a search thread to allocate
+during query execution. This parameter monitors additional memory allocated in
the thread that executes the
+current query. The same limit is applied both to the query coordinator and to
each replica that participates in
the distributed search. As with `cpuAllowed`, the limit is applied
separately to each stage
of the distributed query processing.
+
== segmentTerminateEarly Parameter
[%autowidth,frame=none]
diff --git
a/solr/solrj/src/java/org/apache/solr/common/params/CommonParams.java
b/solr/solrj/src/java/org/apache/solr/common/params/CommonParams.java
index 198bd7ef79c..3055708d29e 100644
--- a/solr/solrj/src/java/org/apache/solr/common/params/CommonParams.java
+++ b/solr/solrj/src/java/org/apache/solr/common/params/CommonParams.java
@@ -177,6 +177,12 @@ public interface CommonParams {
*/
String CPU_ALLOWED = "cpuAllowed";
+ /**
+ * Max query memory allocation value in mebibytes (float). If not set, or
the value is <= 0.0,
+ * there is no limit.
+ */
+ String MEM_ALLOWED = "memAllowed";
+
/** Is the query cancellable? */
String IS_QUERY_CANCELLABLE = "canCancel";