This is an automated email from the ASF dual-hosted git repository.
domgarguilo pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push:
new d6f823c24f Improve/standardize rate limiting logic in Monitor (#4894)
d6f823c24f is described below
commit d6f823c24f4ecfcdc4a0584c72ef9911de1c6b6c
Author: Dom G. <[email protected]>
AuthorDate: Wed Jan 15 15:45:02 2025 -0500
Improve/standardize rate limiting logic in Monitor (#4894)
* Improve/standardize rate limiting logic in monitor
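    Context for the diff below: the Monitor previously rate limited each fetch by
    hand, comparing System.nanoTime() against a stored timestamp and aging entries
    off manually. This change standardizes on Suppliers.memoizeWithExpiration()
    (Guava; the Suppliers import already exists in Monitor.java and is not shown in
    this diff), which wraps a fetch method so that all callers inside the expiration
    window share one cached result. The sketch below is illustrative only — the
    class and method names are hypothetical, not part of this commit — and assumes a
    recent Guava on the classpath:

        import java.util.Map;
        import java.util.concurrent.TimeUnit;
        import java.util.function.Supplier;

        import com.google.common.base.Suppliers;

        class CachedFetchExample {

          // Hypothetical expensive fetch, standing in for methods such as
          // fetchTServerScans() or fetchCompactions() in the diff below.
          private Map<String,Long> fetchStats() {
            return Map.of("example-server", System.nanoTime());
          }

          // Every call within the same one-minute window returns the same cached
          // map; the fetch runs again only after the window expires.
          private final Supplier<Map<String,Long>> statsSupplier =
              Suppliers.memoizeWithExpiration(this::fetchStats, 1, TimeUnit.MINUTES);

          Map<String,Long> getStats() {
            return statsSupplier.get();
          }
        }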
---
.../java/org/apache/accumulo/monitor/Monitor.java | 261 +++++++++------------
.../rest/compactions/external/ECResource.java | 2 +-
2 files changed, 117 insertions(+), 146 deletions(-)
diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java
index 17ed173714..548d2a985c 100644
--- a/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java
+++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java
@@ -20,6 +20,7 @@ package org.apache.accumulo.monitor;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.concurrent.TimeUnit.HOURS;
+import static java.util.concurrent.TimeUnit.MINUTES;
import static org.apache.accumulo.core.util.UtilWaitThread.sleepUninterruptibly;
import java.net.InetAddress;
@@ -27,10 +28,10 @@ import java.net.URL;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -75,6 +76,7 @@ import org.apache.accumulo.core.util.HostAndPort;
import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.core.util.ServerServices;
import org.apache.accumulo.core.util.ServerServices.Service;
+import org.apache.accumulo.core.util.Timer;
import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil;
import org.apache.accumulo.core.util.threads.Threads;
import org.apache.accumulo.monitor.rest.compactions.external.ExternalCompactionInfo;
@@ -178,7 +180,7 @@ public class Monitor extends AbstractServer implements HighlyAvailableService {
private Exception problemException;
private GCStatus gcStatus;
private Optional<HostAndPort> coordinatorHost = Optional.empty();
- private long coordinatorCheckNanos = 0L;
+ private Timer coordinatorCheck = null;
private CompactionCoordinatorService.Client coordinatorClient;
private final String coordinatorMissingMsg =
"Error getting the compaction coordinator. Check that it is running. It
is not "
@@ -388,11 +390,10 @@ public class Monitor extends AbstractServer implements HighlyAvailableService {
}
// check for compaction coordinator host and only notify its discovery
- Optional<HostAndPort> previousHost;
- if (System.nanoTime() - coordinatorCheckNanos > fetchTimeNanos) {
- previousHost = coordinatorHost;
+ if (coordinatorCheck == null || coordinatorCheck.hasElapsed(expirationTimeMinutes, MINUTES)) {
+ Optional<HostAndPort> previousHost = coordinatorHost;
coordinatorHost = ExternalCompactionUtil.findCompactionCoordinator(context);
- coordinatorCheckNanos = System.nanoTime();
+ coordinatorCheck = Timer.startNew();
if (previousHost.isEmpty() && coordinatorHost.isPresent()) {
log.info("External Compaction Coordinator found at {}",
coordinatorHost.orElseThrow());
}
@@ -611,112 +612,78 @@ public class Monitor extends AbstractServer implements HighlyAvailableService {
}
}
- private final Map<HostAndPort,ScanStats> tserverScans = new HashMap<>();
- private final Map<HostAndPort,ScanStats> sserverScans = new HashMap<>();
- private final Map<HostAndPort,CompactionStats> allCompactions = new HashMap<>();
+ private final long expirationTimeMinutes = 1;
+
+ // Use Suppliers.memoizeWithExpiration() to cache the results of expensive fetch operations. This
+ // avoids unnecessary repeated fetches within the expiration period and ensures that multiple
+ // requests around the same time use the same cached data.
+ private final Supplier<Map<HostAndPort,ScanStats>> tserverScansSupplier =
+ Suppliers.memoizeWithExpiration(this::fetchTServerScans, expirationTimeMinutes, MINUTES);
+
+ private final Supplier<Map<HostAndPort,ScanStats>> sserverScansSupplier =
+ Suppliers.memoizeWithExpiration(this::fetchSServerScans, expirationTimeMinutes, MINUTES);
+
+ private final Supplier<Map<HostAndPort,CompactionStats>> compactionsSupplier =
+ Suppliers.memoizeWithExpiration(this::fetchCompactions, expirationTimeMinutes, MINUTES);
+
+ private final Supplier<ExternalCompactionInfo> compactorInfoSupplier =
+ Suppliers.memoizeWithExpiration(this::fetchCompactorsInfo, expirationTimeMinutes, MINUTES);
+
+ private final Supplier<ExternalCompactionsSnapshot> externalCompactionsSupplier =
+ Suppliers.memoizeWithExpiration(this::computeExternalCompactionsSnapshot,
+ expirationTimeMinutes, MINUTES);
+
private final RecentLogs recentLogs = new RecentLogs();
- private final ExternalCompactionInfo ecInfo = new ExternalCompactionInfo();
-
- private long scansFetchedNanos = System.nanoTime();
- private long compactsFetchedNanos = System.nanoTime();
- private long ecInfoFetchedNanos = System.nanoTime();
- private final long fetchTimeNanos = TimeUnit.MINUTES.toNanos(1);
- private final long ageOffEntriesMillis = TimeUnit.MINUTES.toMillis(15);
- // When there are a large amount of external compactions running the list of external compactions
- // could consume a lot of memory. The purpose of this memoizing supplier is to try to avoid
- // creating the list of running external compactions in memory per web request. If multiple
- // request come in around the same time they should use the same list. It is still possible to
- // have multiple list in memory if one request obtains a copy and then another request comes in
- // after the timeout and the supplier recomputes the list. The longer the timeout on the supplier
- // is the less likely we are to have multiple list of external compactions in memory, however
- // increasing the timeout will make the monitor less responsive.
- private final Supplier<ExternalCompactionsSnapshot> extCompactionSnapshot =
- Suppliers.memoizeWithExpiration(() -> computeExternalCompactionsSnapshot(), fetchTimeNanos,
- TimeUnit.NANOSECONDS);
/**
- * Fetch the active scans but only if fetchTimeNanos has elapsed.
+ * @return active tablet server scans. Values are cached and refresh after
+ * {@link #expirationTimeMinutes}.
*/
- public synchronized Map<HostAndPort,ScanStats> getScans() {
- if (System.nanoTime() - scansFetchedNanos > fetchTimeNanos) {
- log.info("User initiated fetch of Active TabletServer Scans");
- fetchScans();
- }
- return Map.copyOf(tserverScans);
+ public Map<HostAndPort,ScanStats> getScans() {
+ return tserverScansSupplier.get();
}
- public synchronized Map<HostAndPort,ScanStats> getScanServerScans() {
- if (System.nanoTime() - scansFetchedNanos > fetchTimeNanos) {
- log.info("User initiated fetch of Active ScanServer Scans");
- fetchScans();
- }
- return Map.copyOf(sserverScans);
+ /**
+ * @return active scan server scans. Values are cached and refresh after
+ * {@link #expirationTimeMinutes}.
+ */
+ public Map<HostAndPort,ScanStats> getScanServerScans() {
+ return sserverScansSupplier.get();
}
/**
- * Fetch the active compactions but only if fetchTimeNanos has elapsed.
+ * @return active compactions. Values are cached and refresh after {@link #expirationTimeMinutes}.
*/
- public synchronized Map<HostAndPort,CompactionStats> getCompactions() {
- if (System.nanoTime() - compactsFetchedNanos > fetchTimeNanos) {
- log.info("User initiated fetch of Active Compactions");
- fetchCompactions();
- }
- return Map.copyOf(allCompactions);
+ public Map<HostAndPort,CompactionStats> getCompactions() {
+ return compactionsSupplier.get();
}
- public synchronized ExternalCompactionInfo getCompactorsInfo() {
+ /**
+ * @return external compaction information. Values are cached and refresh after
+ * {@link #expirationTimeMinutes}.
+ */
+ public ExternalCompactionInfo getCompactorsInfo() {
if (coordinatorHost.isEmpty()) {
throw new IllegalStateException("Tried fetching from compaction
coordinator that's missing");
}
- if (System.nanoTime() - ecInfoFetchedNanos > fetchTimeNanos) {
- log.info("User initiated fetch of External Compaction info");
- Map<String,List<HostAndPort>> compactors =
- ExternalCompactionUtil.getCompactorAddrs(getContext());
- log.debug("Found compactors: " + compactors);
- ecInfo.setFetchedTimeMillis(System.currentTimeMillis());
- ecInfo.setCompactors(compactors);
- ecInfo.setCoordinatorHost(coordinatorHost);
-
- ecInfoFetchedNanos = System.nanoTime();
- }
- return ecInfo;
+ return compactorInfoSupplier.get();
}
- private static class ExternalCompactionsSnapshot {
- public final RunningCompactions runningCompactions;
- public final Map<String,TExternalCompaction> ecRunningMap;
-
- private ExternalCompactionsSnapshot(Optional<Map<String,TExternalCompaction>> ecRunningMapOpt) {
- this.ecRunningMap =
- ecRunningMapOpt.map(Collections::unmodifiableMap).orElse(Collections.emptyMap());
- this.runningCompactions = new RunningCompactions(this.ecRunningMap);
- }
- }
-
- private ExternalCompactionsSnapshot computeExternalCompactionsSnapshot() {
- if (coordinatorHost.isEmpty()) {
- throw new IllegalStateException(coordinatorMissingMsg);
- }
- var ccHost = coordinatorHost.orElseThrow();
- log.info("User initiated fetch of running External Compactions from " +
ccHost);
- var client = getCoordinator(ccHost);
- TExternalCompactionList running;
- try {
- running = client.getRunningCompactions(TraceUtil.traceInfo(), getContext().rpcCreds());
- } catch (Exception e) {
- throw new IllegalStateException("Unable to get running compactions from
" + ccHost, e);
- }
-
- return new ExternalCompactionsSnapshot(Optional.ofNullable(running.getCompactions()));
- }
-
- public RunningCompactions getRunnningCompactions() {
- return extCompactionSnapshot.get().runningCompactions;
+ /**
+ * @return running compactions. Values are cached and refresh after
+ * {@link #expirationTimeMinutes}.
+ */
+ public RunningCompactions getRunningCompactions() {
+ return externalCompactionsSupplier.get().runningCompactions;
}
+ /**
+ * @return running compactor details. Values are cached and refresh after
+ * {@link #expirationTimeMinutes}.
+ */
public RunningCompactorDetails getRunningCompactorDetails(ExternalCompactionId ecid) {
TExternalCompaction extCompaction =
- extCompactionSnapshot.get().ecRunningMap.get(ecid.canonical());
+ externalCompactionsSupplier.get().ecRunningMap.get(ecid.canonical());
if (extCompaction == null) {
return null;
}
@@ -736,61 +703,36 @@ public class Monitor extends AbstractServer implements HighlyAvailableService {
return coordinatorClient;
}
- private void fetchScans() {
+ private Map<HostAndPort,ScanStats> fetchScans(Collection<String> servers) {
ServerContext context = getContext();
- for (String server : context.instanceOperations().getTabletServers()) {
+ Map<HostAndPort,ScanStats> scans = new HashMap<>();
+ for (String server : servers) {
final HostAndPort parsedServer = HostAndPort.fromString(server);
- TabletScanClientService.Client tserver = null;
+ TabletScanClientService.Client client = null;
try {
- tserver = ThriftUtil.getClient(ThriftClientTypes.TABLET_SCAN, parsedServer, context);
- List<ActiveScan> scans = tserver.getActiveScans(null, context.rpcCreds());
- tserverScans.put(parsedServer, new ScanStats(scans));
- scansFetchedNanos = System.nanoTime();
+ client = ThriftUtil.getClient(ThriftClientTypes.TABLET_SCAN, parsedServer, context);
+ List<ActiveScan> activeScans = client.getActiveScans(null, context.rpcCreds());
+ scans.put(parsedServer, new ScanStats(activeScans));
} catch (Exception ex) {
log.error("Failed to get active scans from {}", server, ex);
} finally {
- ThriftUtil.returnClient(tserver, context);
- }
- }
- // Age off old scan information
- Iterator<Entry<HostAndPort,ScanStats>> tserverIter = tserverScans.entrySet().iterator();
- // clock time used for fetched for date friendly display
- long now = System.currentTimeMillis();
- while (tserverIter.hasNext()) {
- Entry<HostAndPort,ScanStats> entry = tserverIter.next();
- if (now - entry.getValue().fetched > ageOffEntriesMillis) {
- tserverIter.remove();
- }
- }
- // Scan Servers
- for (String server : context.instanceOperations().getScanServers()) {
- final HostAndPort parsedServer = HostAndPort.fromString(server);
- TabletScanClientService.Client sserver = null;
- try {
- sserver = ThriftUtil.getClient(ThriftClientTypes.TABLET_SCAN, parsedServer, context);
- List<ActiveScan> scans = sserver.getActiveScans(null, context.rpcCreds());
- sserverScans.put(parsedServer, new ScanStats(scans));
- scansFetchedNanos = System.nanoTime();
- } catch (Exception ex) {
- log.error("Failed to get active scans from {}", server, ex);
- } finally {
- ThriftUtil.returnClient(sserver, context);
- }
- }
- // Age off old scan information
- Iterator<Entry<HostAndPort,ScanStats>> sserverIter = sserverScans.entrySet().iterator();
- // clock time used for fetched for date friendly display
- now = System.currentTimeMillis();
- while (sserverIter.hasNext()) {
- Entry<HostAndPort,ScanStats> entry = sserverIter.next();
- if (now - entry.getValue().fetched > ageOffEntriesMillis) {
- sserverIter.remove();
+ ThriftUtil.returnClient(client, context);
}
}
+ return Collections.unmodifiableMap(scans);
}
- private void fetchCompactions() {
+ private Map<HostAndPort,ScanStats> fetchTServerScans() {
+ return fetchScans(getContext().instanceOperations().getTabletServers());
+ }
+
+ private Map<HostAndPort,ScanStats> fetchSServerScans() {
+ return fetchScans(getContext().instanceOperations().getScanServers());
+ }
+
+ private Map<HostAndPort,CompactionStats> fetchCompactions() {
ServerContext context = getContext();
+ Map<HostAndPort,CompactionStats> allCompactions = new HashMap<>();
for (String server : context.instanceOperations().getTabletServers()) {
final HostAndPort parsedServer = HostAndPort.fromString(server);
Client tserver = null;
@@ -798,23 +740,52 @@ public class Monitor extends AbstractServer implements HighlyAvailableService {
tserver = ThriftUtil.getClient(ThriftClientTypes.TABLET_SERVER, parsedServer, context);
var compacts = tserver.getActiveCompactions(null, context.rpcCreds());
allCompactions.put(parsedServer, new CompactionStats(compacts));
- compactsFetchedNanos = System.nanoTime();
} catch (Exception ex) {
log.debug("Failed to get active compactions from {}", server, ex);
} finally {
ThriftUtil.returnClient(tserver, context);
}
}
- // Age off old compaction information
- var entryIter = allCompactions.entrySet().iterator();
- // clock time used for fetched for date friendly display
- long now = System.currentTimeMillis();
- while (entryIter.hasNext()) {
- var entry = entryIter.next();
- if (now - entry.getValue().fetched > ageOffEntriesMillis) {
- entryIter.remove();
- }
+ return Collections.unmodifiableMap(allCompactions);
+ }
+
+ private ExternalCompactionInfo fetchCompactorsInfo() {
+ ServerContext context = getContext();
+ Map<String,List<HostAndPort>> compactors = ExternalCompactionUtil.getCompactorAddrs(context);
+ log.debug("Found compactors: {}", compactors);
+ ExternalCompactionInfo ecInfo = new ExternalCompactionInfo();
+ ecInfo.setFetchedTimeMillis(System.currentTimeMillis());
+ ecInfo.setCompactors(compactors);
+ ecInfo.setCoordinatorHost(coordinatorHost);
+ return ecInfo;
+ }
+
+ private static class ExternalCompactionsSnapshot {
+ public final RunningCompactions runningCompactions;
+ public final Map<String,TExternalCompaction> ecRunningMap;
+
+ private ExternalCompactionsSnapshot(Optional<Map<String,TExternalCompaction>> ecRunningMapOpt) {
+ this.ecRunningMap =
+ ecRunningMapOpt.map(Collections::unmodifiableMap).orElse(Collections.emptyMap());
+ this.runningCompactions = new RunningCompactions(this.ecRunningMap);
+ }
+ }
+
+ private ExternalCompactionsSnapshot computeExternalCompactionsSnapshot() {
+ if (coordinatorHost.isEmpty()) {
+ throw new IllegalStateException(coordinatorMissingMsg);
+ }
+ var ccHost = coordinatorHost.orElseThrow();
+ log.info("User initiated fetch of running External Compactions from " +
ccHost);
+ var client = getCoordinator(ccHost);
+ TExternalCompactionList running;
+ try {
+ running = client.getRunningCompactions(TraceUtil.traceInfo(), getContext().rpcCreds());
+ } catch (Exception e) {
+ throw new IllegalStateException("Unable to get running compactions from
" + ccHost, e);
}
+
+ return new ExternalCompactionsSnapshot(Optional.ofNullable(running.getCompactions()));
}
/**
diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/ECResource.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/ECResource.java
index 72d54d70a4..5fcecef349 100644
--- a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/ECResource.java
+++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/ECResource.java
@@ -60,7 +60,7 @@ public class ECResource {
@Path("running")
@GET
public RunningCompactions getRunning() {
- return monitor.getRunnningCompactions();
+ return monitor.getRunningCompactions();
}
@Path("details")