This is an automated email from the ASF dual-hosted git repository.
dlmarion pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new 3101fc2bc8 Modified Monitor Scans page to use new v2 api (#6393)
3101fc2bc8 is described below
commit 3101fc2bc824fab64751f012c71c1bebd133486c
Author: Dave Marion <[email protected]>
AuthorDate: Thu May 28 15:19:04 2026 -0400
Modified Monitor Scans page to use new v2 api (#6393)
---
.../apache/accumulo/monitor/next/Endpoints.java | 9 ++
.../accumulo/monitor/next/InformationFetcher.java | 103 ++++++++++++++++++---
.../accumulo/monitor/next/SystemInformation.java | 26 +++++-
.../accumulo/monitor/resources/js/functions.js | 3 +-
.../apache/accumulo/monitor/resources/js/scans.js | 54 ++++++++---
.../apache/accumulo/monitor/templates/scans.ftl | 23 +++--
6 files changed, 177 insertions(+), 41 deletions(-)
diff --git
a/server/monitor/src/main/java/org/apache/accumulo/monitor/next/Endpoints.java
b/server/monitor/src/main/java/org/apache/accumulo/monitor/next/Endpoints.java
index d3bbd20147..23c6211a38 100644
---
a/server/monitor/src/main/java/org/apache/accumulo/monitor/next/Endpoints.java
+++
b/server/monitor/src/main/java/org/apache/accumulo/monitor/next/Endpoints.java
@@ -63,6 +63,7 @@ import
org.apache.accumulo.monitor.next.SystemInformation.CompactionTableSummary
import org.apache.accumulo.monitor.next.SystemInformation.FateTransaction;
import org.apache.accumulo.monitor.next.SystemInformation.FetchCycleTimes;
import org.apache.accumulo.monitor.next.SystemInformation.RecoveryInformation;
+import org.apache.accumulo.monitor.next.SystemInformation.Scan;
import org.apache.accumulo.monitor.next.SystemInformation.TableSummary;
import
org.apache.accumulo.monitor.next.SystemInformation.TimeOrderedRunningCompactionSet;
import org.apache.accumulo.monitor.next.deployment.DeploymentOverview;
@@ -237,6 +238,14 @@ public class Endpoints {
return
monitor.getInformationFetcher().getSummaryForEndpoint().getCompactorAllMetricSummary();
}
+  /**
+   * Returns the active scans held by the summary that the information fetcher currently
+   * exposes to endpoints.
+   *
+   * @return the set of active scans from the current endpoint summary
+   */
+  @GET
+  @Path("scans")
+  @Produces(MediaType.APPLICATION_JSON)
+  @Description("Returns a list of active scans")
+  public Set<Scan> getScans() {
+    final var endpointSummary = monitor.getInformationFetcher().getSummaryForEndpoint();
+    return endpointSummary.getActiveScans();
+  }
+
@GET
@Path("sservers/detail/{" + GROUP_PARAM_KEY + "}")
@Produces(MediaType.APPLICATION_JSON)
diff --git
a/server/monitor/src/main/java/org/apache/accumulo/monitor/next/InformationFetcher.java
b/server/monitor/src/main/java/org/apache/accumulo/monitor/next/InformationFetcher.java
index b606a738b8..3014993ac0 100644
---
a/server/monitor/src/main/java/org/apache/accumulo/monitor/next/InformationFetcher.java
+++
b/server/monitor/src/main/java/org/apache/accumulo/monitor/next/InformationFetcher.java
@@ -75,6 +75,8 @@ import org.apache.accumulo.core.process.thrift.MetricResponse;
import org.apache.accumulo.core.process.thrift.ServerProcessService.Client;
import org.apache.accumulo.core.rpc.ThriftUtil;
import org.apache.accumulo.core.rpc.clients.ThriftClientTypes;
+import org.apache.accumulo.core.tabletscan.thrift.ActiveScan;
+import org.apache.accumulo.core.tabletscan.thrift.TabletScanClientService;
import org.apache.accumulo.core.trace.TraceUtil;
import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil;
@@ -182,7 +184,7 @@ public class InformationFetcher implements
RemovalListener<ServerId,MetricRespon
}
enum UpdateType {
- COMPACTION, COMPACTION_RGS, FATE, METRIC, TABLE;
+ COMPACTION, COMPACTION_RGS, FATE, METRIC, SCANS, TABLE;
}
interface UpdateTask<T extends Object> extends Runnable,
Comparable<UpdateTask<T>> {
@@ -200,14 +202,11 @@ public class InformationFetcher implements
RemovalListener<ServerId,MetricRespon
private final ServerContext ctx;
private final ServerId server;
private final SystemInformation summary;
- private final UpdateTasks tasks;
- private MetricFetcher(ServerContext ctx, ServerId server,
SystemInformation summary,
- UpdateTasks tasks) {
+ private MetricFetcher(ServerContext ctx, ServerId server,
SystemInformation summary) {
this.ctx = ctx;
this.server = server;
this.summary = summary;
- this.tasks = tasks;
}
@Override
@@ -267,7 +266,7 @@ public class InformationFetcher implements
RemovalListener<ServerId,MetricRespon
try {
MetricResponse response =
metricsClient.getMetrics(TraceUtil.traceInfo(), ctx.rpcCreds());
retainedProblemServers.invalidate(server);
- summary.processResponse(server, response, tasks);
+ summary.processResponse(server, response);
} finally {
ThriftUtil.returnClient(metricsClient, ctx);
}
@@ -550,6 +549,80 @@ public class InformationFetcher implements
RemovalListener<ServerId,MetricRespon
}
+  /**
+   * Update task that asks a single server for its currently active scans and hands the result
+   * to the {@link SystemInformation} being populated.
+   *
+   * <p>
+   * Equality and hash code are based only on {@link #getType()} and {@link #getResource()}.
+   */
+  class ActiveScansFetcher implements UpdateTask<ServerId> {
+
+    private final ServerContext ctx;
+    private final ServerId server;
+    private final SystemInformation summary;
+
+    /**
+     * @param ctx context used to obtain the Thrift client and the RPC credentials
+     * @param server the server to query for active scans
+     * @param summary receives the scans reported by {@code server}
+     */
+    private ActiveScansFetcher(ServerContext ctx, ServerId server, SystemInformation summary) {
+      this.ctx = ctx;
+      this.server = server;
+      this.summary = summary;
+    }
+
+    @Override
+    public int hashCode() {
+      final int prime = 31;
+      int result = 1;
+      result = prime * result + Objects.hash(getType());
+      result = prime * result + Objects.hash(getResource());
+      return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj) {
+        return true;
+      }
+      if (obj == null) {
+        return false;
+      }
+      if (getClass() != obj.getClass()) {
+        return false;
+      }
+      ActiveScansFetcher other = (ActiveScansFetcher) obj;
+      return Objects.equals(getType(), other.getType())
+          && Objects.equals(getResource(), other.getResource());
+    }
+
+    /**
+     * Fetches the active scans from the server and passes them to
+     * {@link SystemInformation#processActiveScans}. Any exception is logged and not rethrown,
+     * so the task itself always completes normally.
+     */
+    @Override
+    public void run() {
+      final HostAndPort parsedServer = HostAndPort.fromString(server.toHostPortString());
+      TabletScanClientService.Client client = null;
+      try {
+        client = ThriftUtil.getClient(ThriftClientTypes.TABLET_SCAN, parsedServer, ctx);
+        // Send real trace info, as MetricFetcher does for getMetrics, instead of a null TInfo
+        List<ActiveScan> activeScans =
+            client.getActiveScans(TraceUtil.traceInfo(), ctx.rpcCreds());
+        summary.processActiveScans(server, activeScans);
+      } catch (Exception ex) {
+        LOG.error("Failed to get active scans from {}", server, ex);
+      } finally {
+        // NOTE(review): client is still null here if getClient threw; confirm that
+        // ThriftUtil.returnClient tolerates a null client
+        ThriftUtil.returnClient(client, ctx);
+      }
+    }
+
+    /**
+     * Imposes no ordering: every task compares as equal to this one.
+     */
+    @Override
+    public int compareTo(UpdateTask<ServerId> o) {
+      return 0;
+    }
+
+    @Override
+    public UpdateType getType() {
+      return UpdateType.SCANS;
+    }
+
+    /**
+     * @return the server this task queries
+     */
+    @Override
+    public ServerId getResource() {
+      return server;
+    }
+
+    @Override
+    public String getFailureMessage() {
+      return "Failed to get active scans from server: " + server;
+    }
+
+  }
+
private final String poolName = "MonitorMetricsThreadPool";
private final ThreadPoolExecutor pool = ThreadPools.getServerThreadPools()
.getPoolBuilder(poolName).numCoreThreads(10).withTimeOut(30,
SECONDS).build();
@@ -744,10 +817,6 @@ public class InformationFetcher implements
RemovalListener<ServerId,MetricRespon
final UpdateTasks futures = new UpdateTasks();
final SystemInformation summary = new SystemInformation(allMetrics,
this.ctx);
- // Fetch set of registered compactors
- Set<ServerId> compactors =
this.ctx.instanceOperations().getServers(Type.COMPACTOR);
- summary.processExternalCompactionInventory(compactors);
-
// Fetch Fate transaction information
FateTransactionFetcher fateFetcher = new FateTransactionFetcher(summary);
Future<?> fff = this.pool.submit(fateFetcher);
@@ -759,10 +828,20 @@ public class InformationFetcher implements
RemovalListener<ServerId,MetricRespon
if (type == Type.MONITOR) {
continue;
}
- for (ServerId server : this.ctx.instanceOperations().getServers(type))
{
- MetricFetcher mf = new MetricFetcher(this.ctx, server, summary,
futures);
+ Set<ServerId> servers = this.ctx.instanceOperations().getServers(type);
+ if (type == Type.COMPACTOR) {
+ summary.processExternalCompactionInventory(servers);
+ }
+ for (ServerId server : servers) {
+ MetricFetcher mf = new MetricFetcher(this.ctx, server, summary);
Future<?> mff = this.pool.submit(mf);
futures.add(new UpdateTaskFuture(mff, mf));
+
+ if (server.getType() == Type.SCAN_SERVER || server.getType() ==
Type.TABLET_SERVER) {
+ ActiveScansFetcher asf = new ActiveScansFetcher(this.ctx, server,
summary);
+ Future<?> asff = this.pool.submit(asf);
+ futures.add(new UpdateTaskFuture(asff, asf));
+ }
}
}
diff --git
a/server/monitor/src/main/java/org/apache/accumulo/monitor/next/SystemInformation.java
b/server/monitor/src/main/java/org/apache/accumulo/monitor/next/SystemInformation.java
index 49e586e5fb..efd7ab0590 100644
---
a/server/monitor/src/main/java/org/apache/accumulo/monitor/next/SystemInformation.java
+++
b/server/monitor/src/main/java/org/apache/accumulo/monitor/next/SystemInformation.java
@@ -82,11 +82,11 @@ import org.apache.accumulo.core.metrics.flatbuffers.FMetric;
import org.apache.accumulo.core.metrics.flatbuffers.FTag;
import org.apache.accumulo.core.process.thrift.MetricResponse;
import org.apache.accumulo.core.spi.balancer.TableLoadBalancer;
+import org.apache.accumulo.core.tabletscan.thrift.ActiveScan;
import org.apache.accumulo.core.util.compaction.RunningCompactionInfo;
import org.apache.accumulo.monitor.next.InformationFetcher.MetricFetcher;
import
org.apache.accumulo.monitor.next.InformationFetcher.TableInformationFetcher;
import org.apache.accumulo.monitor.next.InformationFetcher.UpdateTaskFuture;
-import org.apache.accumulo.monitor.next.InformationFetcher.UpdateTasks;
import org.apache.accumulo.monitor.next.deployment.DeploymentOverview;
import org.apache.accumulo.monitor.next.views.ColumnFactory;
import org.apache.accumulo.monitor.next.views.Status;
@@ -469,6 +469,11 @@ public class SystemInformation {
long created, List<String> heldLocks, List<String> waitingLocks,
LockRangeType lockRange) {
}
+  /**
+   * A single active scan reported by a server, as returned by the scans endpoint.
+   *
+   * @param server host and port string of the reporting server
+   * @param type name of the reporting server's type
+   * @param resourceGroup canonical name of the reporting server's resource group
+   * @param tableId id of the table being scanned
+   * @param sessionId scan id reported by the server
+   * @param client client reported for the scan
+   * @param user user reported for the scan
+   * @param state name of the scan state
+   * @param scanType name of the scan type
+   * @param age age reported for the scan (presumably milliseconds; TODO confirm)
+   * @param idleTime idle time reported for the scan (presumably milliseconds; TODO confirm)
+   */
+  public record Scan(
+      String server,
+      String type,
+      String resourceGroup,
+      String tableId,
+      long sessionId,
+      String client,
+      String user,
+      String state,
+      String scanType,
+      long age,
+      long idleTime) {}
+
public record FetchCycleTimes(long durationMs, long finishTime) {
}
@@ -530,6 +535,9 @@ public class SystemInformation {
private final Map<TableId,List<TabletInformation>> tablets = new
ConcurrentHashMap<>();
private final RecoveryInformation recoveries = new RecoveryInformation();
+ // Scan Information
+ private final Set<Scan> activeScans = ConcurrentHashMap.newKeySet();
+
// Deployment Overview
private final Map<ResourceGroupId,Map<ServerId.Type,ProcessSummary>>
deployment =
new ConcurrentHashMap<>();
@@ -590,6 +598,7 @@ public class SystemInformation {
managerGoalState = null;
serverMetricsView.clear();
fateTransactions.clear();
+ activeScans.clear();
resetAlertCounts();
}
@@ -799,8 +808,7 @@ public class SystemInformation {
return TableDataFactory.forColumns(Set.of(), Map.of(), cols);
}
- public void processResponse(final ServerId server, final MetricResponse
response,
- final UpdateTasks callback) {
+ public void processResponse(final ServerId server, final MetricResponse
response) {
problemHosts.remove(server);
metricProblemHosts.remove(server);
retainedProblemHosts.remove(server);
@@ -968,6 +976,14 @@ public class SystemInformation {
allMetrics.invalidate(server);
}
+  /**
+   * Adds one {@link Scan} entry to the active scan set for each scan reported by a server.
+   *
+   * @param server the server that reported the scans
+   * @param scans the active scans returned by {@code server}
+   */
+  public void processActiveScans(ServerId server, List<ActiveScan> scans) {
+    for (ActiveScan reported : scans) {
+      // Server attributes are read per scan, so nothing is evaluated for an empty list
+      final String address = server.toHostPortString();
+      final String serverType = server.getType().name();
+      final String group = server.getResourceGroup().canonical();
+      final Scan entry = new Scan(address, serverType, group, reported.getTableId(),
+          reported.getScanId(), reported.getClient(), reported.getUser(),
+          reported.getState().name(), reported.getType().name(), reported.getAge(),
+          reported.getIdleTime());
+      activeScans.add(entry);
+    }
+  }
+
public void retainProblemServer(ServerId server) {
problemHosts.add(server);
metricProblemHosts.add(server);
@@ -1256,6 +1272,10 @@ public class SystemInformation {
timing = new FetchCycleTimes((fetchCycleFinish - fetchCycleStart),
fetchCycleFinish);
}
+  /**
+   * @return the set of active scans collected so far; this is the backing set, not a copy
+   */
+  public Set<Scan> getActiveScans() {
+    return activeScans;
+  }
+
public Set<String> getResourceGroups() {
return this.resourceGroups;
}
diff --git
a/server/monitor/src/main/resources/org/apache/accumulo/monitor/resources/js/functions.js
b/server/monitor/src/main/resources/org/apache/accumulo/monitor/resources/js/functions.js
index 416a14cd58..b8c265ed4e 100644
---
a/server/monitor/src/main/resources/org/apache/accumulo/monitor/resources/js/functions.js
+++
b/server/monitor/src/main/resources/org/apache/accumulo/monitor/resources/js/functions.js
@@ -43,6 +43,7 @@ const ALERT_CATEGORIES = 'alertCategories';
const ALERTS = 'alerts';
const ALERT_COUNTS = 'alertCounts';
const RECOVERY = 'recovery';
+const SCANS = 'scans';
const LAST_UPDATE = 'lastUpdate';
// Override Length Menu options for dataTables
@@ -488,7 +489,7 @@ function getTServer(server) {
* REST GET call for the scans, stores it on a sessionStorage variable
*/
function getScans() {
- return getJSONForTable(contextPath + 'rest/scans', 'scans');
+ return getJSONForTable(REST_V2_PREFIX + '/scans', SCANS);
}
/**
diff --git
a/server/monitor/src/main/resources/org/apache/accumulo/monitor/resources/js/scans.js
b/server/monitor/src/main/resources/org/apache/accumulo/monitor/resources/js/scans.js
index d7f69b0064..002afcbdca 100644
---
a/server/monitor/src/main/resources/org/apache/accumulo/monitor/resources/js/scans.js
+++
b/server/monitor/src/main/resources/org/apache/accumulo/monitor/resources/js/scans.js
@@ -24,15 +24,23 @@ var scansList;
* Creates scans initial table
*/
$(function () {
+
+ sessionStorage[SCANS] = JSON.stringify([]);
+
// Create a table for scans list
scansList = $('#scansList').DataTable({
- "ajax": {
- "url": contextPath + 'rest/scans',
- "dataSrc": "scans"
+ "ajax": function (data, callback) {
+ callback({
+ data: getStoredArray(SCANS)
+ });
},
"stateSave": true,
- "dom": 't<"align-left"l>p',
+ "colReorder": true,
"columnDefs": [{
+ targets: '_all',
+ defaultContent: '—'
+ },
+ {
"targets": "duration",
"render": function (data, type, row) {
if (type === 'display') data = timeDuration(data);
@@ -49,23 +57,37 @@ $(function () {
],
"columns": [{
"data": "server",
- "type": "html",
- "render": function (data, type, row, meta) {
- if (type === 'display') {
- data = '<a class="link-body-emphasis" href="tservers?s=' +
row.server + '">' + row.server + '</a>';
- }
- return data;
- }
},
{
- "data": "scanCount"
+ "data": "type"
},
{
- "data": "oldestScan"
+ "data": "resourceGroup"
},
{
- "data": "fetched"
+ "data": "tableId"
},
+ {
+ "data": "sessionId"
+ },
+ {
+ "data": "client"
+ },
+ {
+ "data": "user"
+ },
+ {
+ "data": "state"
+ },
+ {
+ "data": "scanType"
+ },
+ {
+ "data": "age"
+ },
+ {
+ "data": "idleTime"
+ }
]
});
});
@@ -75,7 +97,9 @@ $(function () {
* Used to redraw the page
*/
function refresh() {
- refreshScansTable();
+ getScans().then(function () {
+ refreshScansTable();
+ });
}
/**
diff --git
a/server/monitor/src/main/resources/org/apache/accumulo/monitor/templates/scans.ftl
b/server/monitor/src/main/resources/org/apache/accumulo/monitor/templates/scans.ftl
index 27e9b94f75..0609885d9c 100644
---
a/server/monitor/src/main/resources/org/apache/accumulo/monitor/templates/scans.ftl
+++
b/server/monitor/src/main/resources/org/apache/accumulo/monitor/templates/scans.ftl
@@ -20,19 +20,22 @@
-->
<div class="row">
<div class="col-xs-12">
- <h3>${title}</h3>
- </div>
- </div>
- <div class="row">
- <div class="col-xs-12">
+ <span class="table-caption">Scans</span>
+ <br />
<table id="scansList" class="table caption-top table-bordered
table-striped table-condensed">
- <caption><span class="table-caption">Scans</span><br /></caption>
<thead>
<tr>
- <th class="firstcell">Server </th>
- <th title="Number of scans presently running"># </th>
- <th class="duration" title="The age of the oldest scan on this
server.">Oldest Age </th>
- <th class="date" title="Last time data was fetched. Server
fetches on page refresh, at most every minute.">Fetched </th>
+ <th class="firstcell">Server</th>
+ <th>Server Type</th>
+ <th>Resource Group</th>
+ <th>Table ID</th>
+ <th>Session ID</th>
+ <th>Client</th>
+ <th>User</th>
+ <th>Scan State</th>
+ <th>Scan Type</th>
+ <#-- the "duration" class is the columnDefs target in scans.js that renders values with timeDuration -->
+ <th class="duration">Age</th>
+ <th class="duration">Idle Time</th>
</tr>
</thead>
<tbody></tbody>