This is an automated email from the ASF dual-hosted git repository.
dlmarion pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new c475dbd02f Fixup major compactions on the Monitor (#4970)
c475dbd02f is described below
commit c475dbd02f323aafdcafb2ce37872ed3de2144dc
Author: Dave Marion <[email protected]>
AuthorDate: Tue Oct 15 07:45:36 2024 -0400
Fixup major compactions on the Monitor (#4970)
---
.../java/org/apache/accumulo/monitor/Monitor.java | 84 +++++++++++-----------
.../monitor/rest/compactions/CompactionInfo.java | 7 +-
.../rest/compactions/CompactionsResource.java | 9 +--
.../rest/statistics/StatisticsResource.java | 11 +++
4 files changed, 59 insertions(+), 52 deletions(-)
diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java
index 48ee64dc51..99855f806e 100644
--- a/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java
+++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java
@@ -78,7 +78,6 @@ import org.apache.accumulo.core.tabletserver.thrift.TabletServerClientService.Cl
import org.apache.accumulo.core.trace.TraceUtil;
import org.apache.accumulo.core.util.Halt;
import org.apache.accumulo.core.util.Pair;
-import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil;
import org.apache.accumulo.core.util.threads.Threads;
import
org.apache.accumulo.monitor.rest.compactions.external.ExternalCompactionInfo;
import
org.apache.accumulo.monitor.rest.compactions.external.RunningCompactions;
@@ -90,6 +89,7 @@ import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.problems.ProblemReports;
import org.apache.accumulo.server.problems.ProblemType;
import org.apache.accumulo.server.util.TableInfoUtil;
+import org.apache.thrift.transport.TTransportException;
import org.apache.zookeeper.KeeperException;
import org.eclipse.jetty.servlet.DefaultServlet;
import org.eclipse.jetty.servlet.ServletHolder;
@@ -162,6 +162,7 @@ public class Monitor extends AbstractServer implements HighlyAvailableService {
private final List<Pair<Long,Double>> ingestRateOverTime = newMaxList();
private final List<Pair<Long,Double>> ingestByteRateOverTime = newMaxList();
private final List<Pair<Long,Integer>> minorCompactionsOverTime =
newMaxList();
+ private final List<Pair<Long,Integer>> majorCompactionsOverTime =
newMaxList();
private final List<Pair<Long,Double>> lookupsOverTime = newMaxList();
private final List<Pair<Long,Long>> queryRateOverTime = newMaxList();
private final List<Pair<Long,Long>> scanRateOverTime = newMaxList();
@@ -179,13 +180,9 @@ public class Monitor extends AbstractServer implements HighlyAvailableService {
private Map<TableId,Map<ProblemType,Integer>> problemSummary =
Collections.emptyMap();
private Exception problemException;
private GCStatus gcStatus;
- private Optional<HostAndPort> coordinatorHost = Optional.empty();
- private long coordinatorCheckNanos = 0L;
- private CompactionCoordinatorService.Client coordinatorClient;
+ private volatile Optional<HostAndPort> coordinatorHost = Optional.empty();
private final String coordinatorMissingMsg =
- "Error getting the compaction coordinator. Check that it is running. It
is not "
- + "started automatically with other cluster processes so must be
started by running "
- + "'accumulo compaction-coordinator'.";
+ "Error getting the compaction coordinator client. Check that the Manager
is running.";
private EmbeddedWebServer server;
private int livePort = 0;
@@ -282,11 +279,25 @@ public class Monitor extends AbstractServer implements HighlyAvailableService {
if (client != null) {
mmi = client.getManagerStats(TraceUtil.traceInfo(),
context.rpcCreds());
retry = false;
+ // Now that Manager is up, set the coordinator host
+ Set<ServerId> managers =
context.instanceOperations().getServers(ServerId.Type.MANAGER);
+ if (managers == null || managers.isEmpty()) {
+ throw new IllegalStateException(
+ "io.getServers returned nothing for Manager, but it's up.");
+ }
+ ServerId manager = managers.iterator().next();
+ Optional<HostAndPort> nextCoordinatorHost =
+
Optional.of(HostAndPort.fromString(manager.toHostPortString()));
+ if (coordinatorHost.isEmpty()
+ ||
!coordinatorHost.orElseThrow().equals(nextCoordinatorHost.orElseThrow())) {
+ coordinatorHost = nextCoordinatorHost;
+ }
} else {
mmi = null;
log.error("Unable to get info from Manager");
}
gcStatus = fetchGcStatus();
+
} catch (Exception e) {
mmi = null;
log.info("Error fetching stats: ", e);
@@ -299,6 +310,7 @@ public class Monitor extends AbstractServer implements HighlyAvailableService {
sleepUninterruptibly(1, TimeUnit.SECONDS);
}
}
+
if (mmi != null) {
int minorCompactions = 0;
@@ -364,6 +376,8 @@ public class Monitor extends AbstractServer implements HighlyAvailableService {
loadOverTime.add(new Pair<>(currentTime, totalLoad));
minorCompactionsOverTime.add(new Pair<>(currentTime,
minorCompactions));
+ majorCompactionsOverTime
+ .add(new Pair<>(currentTime,
getRunnningCompactions().running.size()));
lookupsOverTime.add(new Pair<>(currentTime,
lookupRateTracker.calculateRate()));
@@ -386,22 +400,7 @@ public class Monitor extends AbstractServer implements HighlyAvailableService {
this.problemException = e;
}
- // check for compaction coordinator host and only notify its discovery
- Optional<HostAndPort> previousHost;
- if (System.nanoTime() - coordinatorCheckNanos > fetchTimeNanos) {
- previousHost = coordinatorHost;
- coordinatorHost =
ExternalCompactionUtil.findCompactionCoordinator(context);
- coordinatorCheckNanos = System.nanoTime();
- if (previousHost.isEmpty() && coordinatorHost.isPresent()) {
- log.info("External Compaction Coordinator found at {}",
coordinatorHost.orElseThrow());
- }
- }
-
} finally {
- if (coordinatorClient != null) {
- ThriftUtil.returnClient(coordinatorClient, context);
- coordinatorClient = null;
- }
lastRecalc.set(currentTime);
// stop fetching; log an error if this thread wasn't already fetching
if (!fetching.compareAndSet(true, false)) {
@@ -698,15 +697,25 @@ public class Monitor extends AbstractServer implements HighlyAvailableService {
}
var ccHost = coordinatorHost.orElseThrow();
log.info("User initiated fetch of running External Compactions from " +
ccHost);
- var client = getCoordinator(ccHost);
- TExternalCompactionList running;
try {
- running = client.getRunningCompactions(TraceUtil.traceInfo(),
getContext().rpcCreds());
- } catch (Exception e) {
- throw new IllegalStateException("Unable to get running compactions from
" + ccHost, e);
+ CompactionCoordinatorService.Client client =
+ ThriftUtil.getClient(ThriftClientTypes.COORDINATOR, ccHost,
getContext());
+ TExternalCompactionList running;
+ try {
+ running = client.getRunningCompactions(TraceUtil.traceInfo(),
getContext().rpcCreds());
+ return new ExternalCompactionsSnapshot(
+ running.getCompactions() == null ? Map.of() :
running.getCompactions());
+ } catch (Exception e) {
+ throw new IllegalStateException("Unable to get running compactions
from " + ccHost, e);
+ } finally {
+ if (client != null) {
+ ThriftUtil.returnClient(client, getContext());
+ }
+ }
+ } catch (TTransportException e) {
+ log.error("Unable to get Compaction coordinator at {}", ccHost);
+ throw new IllegalStateException(coordinatorMissingMsg, e);
}
-
- return new ExternalCompactionsSnapshot(running.getCompactions());
}
public RunningCompactions getRunnningCompactions() {
@@ -722,19 +731,6 @@ public class Monitor extends AbstractServer implements HighlyAvailableService {
return new RunningCompactorDetails(extCompaction);
}
- private CompactionCoordinatorService.Client getCoordinator(HostAndPort
address) {
- if (coordinatorClient == null) {
- try {
- coordinatorClient =
- ThriftUtil.getClient(ThriftClientTypes.COORDINATOR, address,
getContext());
- } catch (Exception e) {
- log.error("Unable to get Compaction coordinator at {}", address);
- throw new IllegalStateException(coordinatorMissingMsg, e);
- }
- }
- return coordinatorClient;
- }
-
private void fetchScans() {
final ServerContext context = getContext();
final Set<ServerId> servers = new HashSet<>();
@@ -998,6 +994,10 @@ public class Monitor extends AbstractServer implements HighlyAvailableService {
return new ArrayList<>(minorCompactionsOverTime);
}
+ public List<Pair<Long,Integer>> getMajorCompactionsOverTime() {
+ return new ArrayList<>(majorCompactionsOverTime);
+ }
+
public List<Pair<Long,Double>> getLookupsOverTime() {
return new ArrayList<>(lookupsOverTime);
}
diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/CompactionInfo.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/CompactionInfo.java
index 5a71fa65e5..f222bc42ed 100644
--- a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/CompactionInfo.java
+++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/CompactionInfo.java
@@ -18,9 +18,10 @@
*/
package org.apache.accumulo.monitor.rest.compactions;
-import org.apache.accumulo.core.manager.thrift.TabletServerStatus;
import org.apache.accumulo.monitor.Monitor;
+import com.google.common.net.HostAndPort;
+
/**
* Generates a compaction info JSON object
*
@@ -40,8 +41,8 @@ public class CompactionInfo {
/**
* Stores new compaction information
*/
- public CompactionInfo(TabletServerStatus tserverInfo,
Monitor.CompactionStats stats) {
- this.server = tserverInfo.getName();
+ public CompactionInfo(HostAndPort address, Monitor.CompactionStats stats) {
+ this.server = address.toString();
this.fetched = stats.fetched;
this.count = stats.count;
this.oldest = stats.oldest;
diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/CompactionsResource.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/CompactionsResource.java
index 8ceff7f971..b96f9f774d 100644
--- a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/CompactionsResource.java
+++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/CompactionsResource.java
@@ -27,7 +27,6 @@ import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
import org.apache.accumulo.core.manager.thrift.ManagerMonitorInfo;
-import org.apache.accumulo.core.manager.thrift.TabletServerStatus;
import org.apache.accumulo.monitor.Monitor;
import com.google.common.net.HostAndPort;
@@ -59,12 +58,8 @@ public class CompactionsResource {
Map<HostAndPort,Monitor.CompactionStats> entry = monitor.getCompactions();
- for (TabletServerStatus tserverInfo : mmi.getTServerInfo()) {
- var stats = entry.get(HostAndPort.fromString(tserverInfo.name));
- if (stats != null) {
- compactions.addCompaction(new CompactionInfo(tserverInfo, stats));
- }
- }
+ entry.forEach((k, v) -> compactions.addCompaction(new CompactionInfo(k,
v)));
+
return compactions;
}
}
diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/statistics/StatisticsResource.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/statistics/StatisticsResource.java
index 13573379ec..8e43e33567 100644
--- a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/statistics/StatisticsResource.java
+++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/statistics/StatisticsResource.java
@@ -251,6 +251,17 @@ public class StatisticsResource {
return monitor.getMinorCompactionsOverTime();
}
+ /**
+ * Generates a list with the major compactions over time
+ *
+ * @return Major compactions over time
+ */
+ @GET
+ @Path("time/majorCompactions")
+ public List<Pair<Long,Integer>> getMajorCompactions() {
+ return monitor.getMajorCompactionsOverTime();
+ }
+
/**
* Generates a list with the lookups over time
*