This is an automated email from the ASF dual-hosted git repository. mmiller pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push: new 02de510 Improvements to monitor external compactions page (#2385) 02de510 is described below commit 02de5104c4c115d913e5547e1dd6f0b6d26390d1 Author: Mike Miller <mmil...@apache.org> AuthorDate: Wed Dec 15 17:14:50 2021 -0500 Improvements to monitor external compactions page (#2385) * Add endpoint to monitor for external compactions details and modify page to make ajax calls when getting the details of a running compaction * Cache up to 50 of the details JSON objects in session storage * Use ConcurrentHashMap for storing running compactions in Monitor --- .../java/org/apache/accumulo/monitor/Monitor.java | 14 ++-- .../rest/compactions/external/CompactorInfo.java | 8 +- .../rest/compactions/external/ECResource.java | 28 ++++++- .../external/RunningCompactorDetails.java | 51 ++++++++++++ .../compactions/external/RunningCompactorInfo.java | 28 +++---- .../org/apache/accumulo/monitor/resources/js/ec.js | 91 ++++++++++++++++++---- .../compaction/ExternalCompactionProgressIT.java | 2 +- 7 files changed, 178 insertions(+), 44 deletions(-) diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java index dcd7b37..25e6fbd 100644 --- a/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java +++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java @@ -37,6 +37,7 @@ import java.util.Map.Entry; import java.util.Optional; import java.util.Set; import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -587,7 +588,7 @@ public class Monitor extends AbstractServer implements HighlyAvailableService { private final Map<HostAndPort,CompactionStats> allCompactions = new HashMap<>(); private final RecentLogs recentLogs = new RecentLogs(); private final ExternalCompactionInfo ecInfo = new ExternalCompactionInfo(); - private final Map<String,TExternalCompaction> ecRunningMap = new HashMap<>(); + private final Map<String,TExternalCompaction> ecRunningMap = new ConcurrentHashMap<>(); private long scansFetchedNanos = 0L; private long compactsFetchedNanos = 0L; private long ecInfoFetchedNanos = 0L; @@ -639,7 +640,7 @@ public class Monitor extends AbstractServer implements HighlyAvailableService { * user fetches since RPC calls are going to the coordinator. This allows for fine grain updates * of external compaction progress. */ - public synchronized Map<String,TExternalCompaction> getRunningInfo() { + public synchronized Map<String,TExternalCompaction> fetchRunningInfo() { if (coordinatorHost.isEmpty()) { throw new IllegalStateException(coordinatorMissingMsg); } @@ -655,15 +656,16 @@ public class Monitor extends AbstractServer implements HighlyAvailableService { ecRunningMap.clear(); if (running.getCompactions() != null) { - running.getCompactions().forEach((queue, ec) -> { - log.trace("Found Compactions running on queue {} -> {}", queue, ec); - ecRunningMap.put(queue, ec); - }); + ecRunningMap.putAll(running.getCompactions()); } return ecRunningMap; } + public Map<String,TExternalCompaction> getEcRunningMap() { + return ecRunningMap; + } + private CompactionCoordinatorService.Client getCoordinator(HostAndPort address) { if (coordinatorClient == null) { try { diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/CompactorInfo.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/CompactorInfo.java index 1363ece..a7dc56a 100644 --- a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/CompactorInfo.java +++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/CompactorInfo.java @@ -21,9 +21,11 @@ package org.apache.accumulo.monitor.rest.compactions.external; public class CompactorInfo { // Variable names become JSON keys - public long lastContact; - public String server; - public String queueName; + public long lastContact = 0L; + public String server = ""; + public String queueName = ""; + + public CompactorInfo() {} public CompactorInfo(long fetchedTimeMillis, String queue, String hostAndPort) { lastContact = System.currentTimeMillis() - fetchedTimeMillis; diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/ECResource.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/ECResource.java index 6c1a050..d7035d6 100644 --- a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/ECResource.java +++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/ECResource.java @@ -19,9 +19,11 @@ package org.apache.accumulo.monitor.rest.compactions.external; import jakarta.inject.Inject; +import jakarta.validation.constraints.NotNull; import jakarta.ws.rs.GET; import jakarta.ws.rs.Path; import jakarta.ws.rs.Produces; +import jakarta.ws.rs.QueryParam; import jakarta.ws.rs.core.MediaType; import org.apache.accumulo.monitor.Monitor; @@ -36,7 +38,7 @@ import org.slf4j.LoggerFactory; @Path("/ec") @Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML}) public class ECResource { - private static Logger log = LoggerFactory.getLogger(ECResource.class); + private static final Logger log = LoggerFactory.getLogger(ECResource.class); @Inject private Monitor monitor; @@ -57,6 +59,28 @@ public class ECResource { @Path("running") @GET public RunningCompactions getRunning() { - return new RunningCompactions(monitor.getRunningInfo()); + return new RunningCompactions(monitor.fetchRunningInfo()); + } + + @Path("details") + @GET + public RunningCompactorDetails getDetails(@QueryParam("ecid") @NotNull String ecid) { + // make parameter more user-friendly by ensuring the ecid prefix is present + ecid = ecid.replace("ecid:", "ECID:"); + if (!ecid.startsWith("ECID:")) + ecid = "ECID:" + ecid; + + var ecMap = monitor.getEcRunningMap(); + var externalCompaction = ecMap.get(ecid); + if (externalCompaction == null) { + // map could be old so fetch all running compactions and try again + ecMap = monitor.fetchRunningInfo(); + externalCompaction = ecMap.get(ecid); + if (externalCompaction == null) { + log.warn("Failed to find details for ECID: {}", ecid); + return new RunningCompactorDetails(); + } + } + return new RunningCompactorDetails(System.currentTimeMillis(), ecid, externalCompaction); } } diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/RunningCompactorDetails.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/RunningCompactorDetails.java new file mode 100644 index 0000000..c6c3af2 --- /dev/null +++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/RunningCompactorDetails.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.monitor.rest.compactions.external; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.accumulo.core.compaction.thrift.TExternalCompaction; +import org.apache.accumulo.core.tabletserver.thrift.InputFile; + +public class RunningCompactorDetails extends RunningCompactorInfo { + // Variable names become JSON keys + public List<CompactionInputFile> inputFiles = new ArrayList<>(); + public String outputFile; + + public RunningCompactorDetails() { + super(); + } + + public RunningCompactorDetails(long fetchedTime, String ecid, TExternalCompaction ec) { + super(fetchedTime, ecid, ec); + var job = ec.getJob(); + inputFiles = convertInputFiles(job.files); + outputFile = job.outputFile; + } + + private List<CompactionInputFile> convertInputFiles(List<InputFile> files) { + List<CompactionInputFile> list = new ArrayList<>(); + files.forEach(f -> list + .add(new CompactionInputFile(f.metadataFileEntry, f.size, f.entries, f.timestamp))); + // sorted largest to smallest + list.sort((o1, o2) -> Long.compare(o2.size, o1.size)); + return list; + } +} diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/RunningCompactorInfo.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/RunningCompactorInfo.java index 711c135..7ef070f 100644 --- a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/RunningCompactorInfo.java +++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/RunningCompactorInfo.java @@ -18,57 +18,47 @@ */ package org.apache.accumulo.monitor.rest.compactions.external; -import java.util.ArrayList; -import java.util.List; import java.util.Map; import java.util.TreeMap; import java.util.concurrent.TimeUnit; +import jakarta.validation.constraints.NotNull; + import org.apache.accumulo.core.compaction.thrift.TCompactionStatusUpdate; import org.apache.accumulo.core.compaction.thrift.TExternalCompaction; import org.apache.accumulo.core.dataImpl.KeyExtent; -import org.apache.accumulo.core.tabletserver.thrift.InputFile; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class RunningCompactorInfo extends CompactorInfo { - private static Logger log = LoggerFactory.getLogger(RunningCompactorInfo.class); + private static final Logger log = LoggerFactory.getLogger(RunningCompactorInfo.class); // Variable names become JSON keys public String ecid; public String kind; public String tableId; - public List<CompactionInputFile> inputFiles; public int numFiles; - public String outputFile; public float progress = 0f; public long duration; public String status; public long lastUpdate; - public RunningCompactorInfo(long fetchedTime, String ecid, TExternalCompaction ec) { + public RunningCompactorInfo() { + super(); + } + + public RunningCompactorInfo(long fetchedTime, String ecid, @NotNull TExternalCompaction ec) { super(fetchedTime, ec.getQueueName(), ec.getCompactor()); this.ecid = ecid; var updates = ec.getUpdates(); var job = ec.getJob(); kind = job.getKind().name(); tableId = KeyExtent.fromThrift(job.extent).tableId().canonical(); - inputFiles = convertInputFiles(job.files); - numFiles = inputFiles.size(); - outputFile = job.outputFile; + numFiles = job.files.size(); updateProgress(updates); log.debug("Parsed running compaction {} for {} with progress = {}%", status, ecid, progress); } - private List<CompactionInputFile> convertInputFiles(List<InputFile> files) { - List<CompactionInputFile> list = new ArrayList<>(); - files.forEach(f -> list - .add(new CompactionInputFile(f.metadataFileEntry, f.size, f.entries, f.timestamp))); - // sorted largest to smallest - list.sort((o1, o2) -> Long.compare(o2.size, o1.size)); - return list; - } - /** * Calculate progress: the percentage of bytesRead out of bytesToBeCompacted of the last update. * Also update the status. diff --git a/server/monitor/src/main/resources/org/apache/accumulo/monitor/resources/js/ec.js b/server/monitor/src/main/resources/org/apache/accumulo/monitor/resources/js/ec.js index 201edbd..dac238d 100644 --- a/server/monitor/src/main/resources/org/apache/accumulo/monitor/resources/js/ec.js +++ b/server/monitor/src/main/resources/org/apache/accumulo/monitor/resources/js/ec.js @@ -27,6 +27,9 @@ * Creates active compactions table */ $(document).ready(function() { + if (sessionStorage.ecDetailsJSON === undefined) { + sessionStorage.ecDetailsJSON = JSON.stringify([]); + } compactorsTable = $('#compactorsTable').DataTable({ "ajax": { "url": '/rest/ec/compactors', @@ -124,23 +127,26 @@ // Remove from the 'open' array detailRows.splice( idx, 1 ); - } - else { + } else { var rci = row.data(); + var ecid = rci.ecid; + var idSuffix = ecid.substring(ecid.length-5, ecid.length); tr.addClass( 'details' ); // put all the information into html for a single row - var htmlRow = "<table class='table table-bordered table-striped table-condensed'>" + var htmlRow = "<table class='table table-bordered table-striped table-condensed' id='table"+idSuffix+"'>" htmlRow += "<thead><tr><th>#</th><th>Input Files</th><th>Size</th><th>Entries</th></tr></thead>"; - $.each( rci.inputFiles, function( key, value ) { - htmlRow += "<tr><td>" + key + "</td>"; - htmlRow += "<td>" + value.metadataFileEntry + "</td>"; - htmlRow += "<td>" + bigNumberForSize(value.size) + "</td>"; - htmlRow += "<td>" + bigNumberForQuantity(value.entries) + "</td></tr>"; - }); - htmlRow += "</table>"; - htmlRow += "Output File: " + rci.outputFile + "<br>"; - htmlRow += rci.ecid; + htmlRow += "<tbody></tbody></table>"; + htmlRow += "Output File: <span id='outputFile" + idSuffix + "'></span><br>"; + htmlRow += ecid; row.child(htmlRow).show(); + // show the row then populate the table + var ecDetails = getDetailsFromStorage(idSuffix); + if (ecDetails.length === 0) { + getRunningDetails(ecid, idSuffix); + } else { + console.log("Got cached details for " + idSuffix); + populateDetails(ecDetails, idSuffix); + } // Add to the 'open' array if ( idx === -1 ) { @@ -163,7 +169,11 @@ */ function refreshECTables() { getCompactionCoordinator(); - var ecInfo = JSON.parse(sessionStorage.ecInfo); + var ecInfo = sessionStorage.ecInfo === undefined ? [] : + JSON.parse(sessionStorage.ecInfo); + if (ecInfo.length === 0) { + return; + } var ccAddress = ecInfo.server; var numCompactors = ecInfo.numCompactors; var lastContactTime = timeDuration(ecInfo.lastContact); @@ -188,6 +198,61 @@ }); } + function getRunningDetails(ecid, idSuffix) { + var ajaxUrl = '/rest/ec/details?ecid=' + ecid; + console.log("Ajax call to " + ajaxUrl); + $.getJSON(ajaxUrl, function(data) { + populateDetails(data, idSuffix); + var detailsJSON = JSON.parse(sessionStorage.ecDetailsJSON); + if (detailsJSON === undefined) { + detailsJSON = []; + } else if (detailsJSON.length >= 50) { + // drop the oldest 25 from the sessionStorage to limit size of the cache + var newDetailsJSON = []; + $.each( detailsJSON, function( num, val ) { + if (num > 24) { + newDetailsJSON.push(val); + } + }); + detailsJSON = newDetailsJSON; + } + detailsJSON.push({ key : idSuffix, value : data }); + sessionStorage.ecDetailsJSON = JSON.stringify(detailsJSON); + }); + } + + function getDetailsFromStorage(idSuffix) { + var details = []; + var detailsJSON = JSON.parse(sessionStorage.ecDetailsJSON); + if (detailsJSON.length === 0) { + return details; + } else { + // details are stored as key value pairs in the JSON val + $.each( detailsJSON, function( num, val ) { + if (val.key === idSuffix) { + details = val.value; + } + }); + return details; + } + } + + function populateDetails(data, idSuffix) { + var tableId = 'table' + idSuffix; + clearTableBody(tableId); + $.each( data.inputFiles, function( key, value ) { + var items = []; + items.push(createCenterCell(key, key)); + items.push(createCenterCell(value.metadataFileEntry, value.metadataFileEntry)); + items.push(createCenterCell(value.size, bigNumberForSize(value.size))); + items.push(createCenterCell(value.entries, bigNumberForQuantity(value.entries))); + $('<tr/>', { + html: items.join('') + }).appendTo('#' + tableId + ' tbody'); + }); + $('#outputFile' + idSuffix).text(data.outputFile); + } + function refreshCompactors() { console.log("Refresh compactors table."); // user paging is not reset on reload diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java index 305344c..20f974c 100644 --- a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java +++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java @@ -131,7 +131,7 @@ public class ExternalCompactionProgressIT extends AccumuloClusterHarness { RunningCompactorInfo rci = new RunningCompactorInfo(System.currentTimeMillis(), ecid, ec); RunningCompactorInfo previousRci = runningMap.put(ecid, rci); if (previousRci == null) { - log.debug("New ECID {} with inputFiles: {}", ecid, rci.inputFiles); + log.debug("New ECID {} with inputFiles: {}", ecid, rci.numFiles); } else { if (rci.progress <= previousRci.progress) { log.warn("{} did not progress. It went from {} to {}", ecid, previousRci.progress,