This is an automated email from the ASF dual-hosted git repository.
mmiller pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new 2bce893 Create page in Monitor for external compactions (#2358)
2bce893 is described below
commit 2bce8939145b49a765f998f5dbe2b6242a64e3e6
Author: Mike Miller <[email protected]>
AuthorDate: Fri Nov 19 10:42:48 2021 -0500
Create page in Monitor for external compactions (#2358)
* Create multiple new classes for displaying 3 different tables of data
in the new external compaction page in the monitor
* Create 3 new ajax endpoints in ECResource
* Modify Compactor and ExternalCompactionUtil to return Optional for the
compaction coordinator instead of null
* Add check for compaction coordinator to Monitor.fetchData()
* New ExternalCompactionProgressIT for testing progress
* Use new bootstrap panel and badges for coordinator info
* Closes #2290
Co-authored-by: Dom G. <[email protected]>
---
.../util/compaction/ExternalCompactionUtil.java | 10 +-
.../org/apache/accumulo/compactor/Compactor.java | 8 +-
.../java/org/apache/accumulo/monitor/Monitor.java | 89 +++++++++
.../compactions/external/CompactionInputFile.java | 38 ++++
.../rest/compactions/external/CompactorInfo.java | 33 ++++
.../rest/compactions/external/Compactors.java | 40 ++++
.../rest/compactions/external/CoordinatorInfo.java | 41 +++++
.../rest/compactions/external/ECResource.java | 62 +++++++
.../external/ExternalCompactionInfo.java | 60 ++++++
.../compactions/external/RunningCompactions.java | 39 ++++
.../compactions/external/RunningCompactorInfo.java | 133 ++++++++++++++
.../org/apache/accumulo/monitor/view/WebViews.java | 25 +++
.../org/apache/accumulo/monitor/resources/js/ec.js | 201 +++++++++++++++++++++
.../org/apache/accumulo/monitor/templates/ec.ftl | 80 ++++++++
.../apache/accumulo/monitor/templates/navbar.ftl | 1 +
.../compaction/ExternalCompactionProgressIT.java | 166 +++++++++++++++++
.../compaction/ExternalCompactionTestUtils.java | 42 +++--
17 files changed, 1044 insertions(+), 24 deletions(-)
diff --git
a/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java
b/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java
index a5538b6..24dff44 100644
---
a/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java
+++
b/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java
@@ -24,6 +24,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
@@ -93,19 +94,18 @@ public class ExternalCompactionUtil {
/**
*
- * @return null if Coordinator node not found, else HostAndPort
+ * @return Optional HostAndPort of Coordinator node if found
*/
- public static HostAndPort findCompactionCoordinator(ClientContext context) {
+ public static Optional<HostAndPort> findCompactionCoordinator(ClientContext
context) {
final String lockPath = context.getZooKeeperRoot() +
Constants.ZCOORDINATOR_LOCK;
try {
var zk = ZooSession.getAnonymousSession(context.getZooKeepers(),
context.getZooKeepersSessionTimeOut());
byte[] address = ServiceLock.getLockData(zk, ServiceLock.path(lockPath));
if (null == address) {
- return null;
+ return Optional.empty();
}
- String coordinatorAddress = new String(address);
- return HostAndPort.fromString(coordinatorAddress);
+ return Optional.of(HostAndPort.fromString(new String(address)));
} catch (KeeperException | InterruptedException e) {
throw new RuntimeException(e);
}
diff --git
a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
index 898177e..6565280 100644
---
a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
+++
b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
@@ -474,12 +474,12 @@ public class Compactor extends AbstractServer implements
CompactorService.Iface
* when unable to get client
*/
protected CompactionCoordinatorService.Client getCoordinatorClient() throws
TTransportException {
- HostAndPort coordinatorHost =
ExternalCompactionUtil.findCompactionCoordinator(getContext());
- if (null == coordinatorHost) {
+ var coordinatorHost =
ExternalCompactionUtil.findCompactionCoordinator(getContext());
+ if (coordinatorHost.isEmpty()) {
throw new TTransportException("Unable to get CompactionCoordinator
address from ZooKeeper");
}
- LOG.trace("CompactionCoordinator address is: {}", coordinatorHost);
- return ThriftUtil.getClient(COORDINATOR_CLIENT_FACTORY, coordinatorHost,
getContext());
+ LOG.trace("CompactionCoordinator address is: {}", coordinatorHost.get());
+ return ThriftUtil.getClient(COORDINATOR_CLIENT_FACTORY,
coordinatorHost.get(), getContext());
}
/**
diff --git
a/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java
b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java
index 3fca610..dcd7b37 100644
--- a/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java
+++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java
@@ -34,6 +34,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
@@ -44,6 +45,9 @@ import jakarta.inject.Singleton;
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.clientImpl.ManagerClient;
+import org.apache.accumulo.core.compaction.thrift.CompactionCoordinatorService;
+import org.apache.accumulo.core.compaction.thrift.TExternalCompaction;
+import org.apache.accumulo.core.compaction.thrift.TExternalCompactionList;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.gc.thrift.GCMonitorService;
@@ -62,12 +66,14 @@ import org.apache.accumulo.core.util.HostAndPort;
import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.core.util.ServerServices;
import org.apache.accumulo.core.util.ServerServices.Service;
+import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil;
import org.apache.accumulo.core.util.threads.Threads;
import org.apache.accumulo.fate.zookeeper.ServiceLock;
import org.apache.accumulo.fate.zookeeper.ServiceLock.LockLossReason;
import org.apache.accumulo.fate.zookeeper.ZooReaderWriter;
import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
+import
org.apache.accumulo.monitor.rest.compactions.external.ExternalCompactionInfo;
import org.apache.accumulo.monitor.util.logging.RecentLogs;
import org.apache.accumulo.server.AbstractServer;
import org.apache.accumulo.server.HighlyAvailableService;
@@ -163,6 +169,12 @@ public class Monitor extends AbstractServer implements
HighlyAvailableService {
private Map<TableId,Map<ProblemType,Integer>> problemSummary =
Collections.emptyMap();
private Exception problemException;
private GCStatus gcStatus;
+ private Optional<HostAndPort> coordinatorHost = Optional.empty();
+ private CompactionCoordinatorService.Client coordinatorClient;
+ private final String coordinatorMissingMsg =
+ "Error getting the compaction coordinator. Check that it is running. It
is not "
+ + "started automatically with other cluster processes so must be
started by running "
+ + "'accumulo compaction-coordinator'.";
private EmbeddedWebServer server;
@@ -365,7 +377,17 @@ public class Monitor extends AbstractServer implements
HighlyAvailableService {
this.problemException = e;
}
+ if (coordinatorHost.isEmpty()) {
+ coordinatorHost =
ExternalCompactionUtil.findCompactionCoordinator(context);
+ } else {
+ log.info("External Compaction Coordinator found at {}",
coordinatorHost.get());
+ }
+
} finally {
+ if (coordinatorClient != null) {
+ ThriftUtil.returnClient(coordinatorClient, context);
+ coordinatorClient = null;
+ }
lastRecalc.set(currentTime);
// stop fetching; log an error if this thread wasn't already fetching
if (!fetching.compareAndSet(true, false)) {
@@ -564,8 +586,11 @@ public class Monitor extends AbstractServer implements
HighlyAvailableService {
private final Map<HostAndPort,ScanStats> allScans = new HashMap<>();
private final Map<HostAndPort,CompactionStats> allCompactions = new
HashMap<>();
private final RecentLogs recentLogs = new RecentLogs();
+ private final ExternalCompactionInfo ecInfo = new ExternalCompactionInfo();
+ private final Map<String,TExternalCompaction> ecRunningMap = new HashMap<>();
private long scansFetchedNanos = 0L;
private long compactsFetchedNanos = 0L;
+ private long ecInfoFetchedNanos = 0L;
private final long fetchTimeNanos = TimeUnit.MINUTES.toNanos(1);
private final long ageOffEntriesMillis = TimeUnit.MINUTES.toMillis(15);
@@ -591,6 +616,67 @@ public class Monitor extends AbstractServer implements
HighlyAvailableService {
return Map.copyOf(allCompactions);
}
+ public synchronized ExternalCompactionInfo getCompactorsInfo() {
+ if (coordinatorHost.isEmpty()) {
+ throw new IllegalStateException("Tried fetching from compaction
coordinator that's missing");
+ }
+ if (System.nanoTime() - ecInfoFetchedNanos > fetchTimeNanos) {
+ log.info("User initiated fetch of External Compaction info");
+ Map<String,List<HostAndPort>> compactors =
+ ExternalCompactionUtil.getCompactorAddrs(getContext());
+ log.debug("Found compactors: " + compactors);
+ ecInfo.setFetchedTimeMillis(System.currentTimeMillis());
+ ecInfo.setCompactors(compactors);
+ ecInfo.setCoordinatorHost(coordinatorHost);
+
+ ecInfoFetchedNanos = System.nanoTime();
+ }
+ return ecInfo;
+ }
+
+ /**
+ * Fetch running compactions from Compaction Coordinator. Chose not to
restrict the frequency of
+ * user fetches since RPC calls are going to the coordinator. This allows
for fine grain updates
+ * of external compaction progress.
+ */
+ public synchronized Map<String,TExternalCompaction> getRunningInfo() {
+ if (coordinatorHost.isEmpty()) {
+ throw new IllegalStateException(coordinatorMissingMsg);
+ }
+ var ccHost = coordinatorHost.get();
+ log.info("User initiated fetch of running External Compactions from " +
ccHost);
+ var client = getCoordinator(ccHost);
+ TExternalCompactionList running;
+ try {
+ running = client.getRunningCompactions(TraceUtil.traceInfo(),
getContext().rpcCreds());
+ } catch (Exception e) {
+ throw new IllegalStateException("Unable to get running compactions from
" + ccHost, e);
+ }
+
+ ecRunningMap.clear();
+ if (running.getCompactions() != null) {
+ running.getCompactions().forEach((queue, ec) -> {
+ log.trace("Found Compactions running on queue {} -> {}", queue, ec);
+ ecRunningMap.put(queue, ec);
+ });
+ }
+
+ return ecRunningMap;
+ }
+
+ private CompactionCoordinatorService.Client getCoordinator(HostAndPort
address) {
+ if (coordinatorClient == null) {
+ try {
+ coordinatorClient = ThriftUtil.getClient(new
CompactionCoordinatorService.Client.Factory(),
+ address, getContext());
+ } catch (Exception e) {
+ log.error("Unable to get Compaction coordinator at {}", address);
+ throw new IllegalStateException(coordinatorMissingMsg, e);
+ }
+ }
+ return coordinatorClient;
+ }
+
private void fetchScans() {
ServerContext context = getContext();
for (String server : context.instanceOperations().getTabletServers()) {
@@ -869,4 +955,7 @@ public class Monitor extends AbstractServer implements
HighlyAvailableService {
return recentLogs;
}
+ public Optional<HostAndPort> getCoordinatorHost() {
+ return coordinatorHost;
+ }
}
diff --git
a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/CompactionInputFile.java
b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/CompactionInputFile.java
new file mode 100644
index 0000000..4ae5e23
--- /dev/null
+++
b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/CompactionInputFile.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.monitor.rest.compactions.external;
+
+/**
+ * Class for displaying input files
+ */
+public class CompactionInputFile {
+
+ // Variable names become JSON keys
+ public String metadataFileEntry;
+ public long size;
+ public long entries;
+ public long timestamp;
+
+ public CompactionInputFile(String metadataFileEntry, long size, long
entries, long timestamp) {
+ this.metadataFileEntry = metadataFileEntry;
+ this.size = size;
+ this.entries = entries;
+ this.timestamp = timestamp;
+ }
+}
diff --git
a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/CompactorInfo.java
b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/CompactorInfo.java
new file mode 100644
index 0000000..1363ece
--- /dev/null
+++
b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/CompactorInfo.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.monitor.rest.compactions.external;
+
+public class CompactorInfo {
+
+ // Variable names become JSON keys
+ public long lastContact;
+ public String server;
+ public String queueName;
+
+ public CompactorInfo(long fetchedTimeMillis, String queue, String
hostAndPort) {
+ lastContact = System.currentTimeMillis() - fetchedTimeMillis;
+ queueName = queue;
+ server = hostAndPort;
+ }
+}
diff --git
a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/Compactors.java
b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/Compactors.java
new file mode 100644
index 0000000..089368e
--- /dev/null
+++
b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/Compactors.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.monitor.rest.compactions.external;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * JSON Object for displaying External Compactions. Variable names become JSON
Keys.
+ */
+public class Compactors {
+
+ // Variable names become JSON keys
+ public final int numCompactors;
+ public final List<CompactorInfo> compactors = new ArrayList<>();
+
+ public Compactors(ExternalCompactionInfo ecInfo) {
+ ecInfo.getCompactors().forEach((q, c) -> {
+ var fetchedTime = ecInfo.getFetchedTimeMillis();
+ c.forEach(hp -> compactors.add(new CompactorInfo(fetchedTime, q,
hp.toString())));
+ });
+ numCompactors = compactors.size();
+ }
+}
diff --git
a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/CoordinatorInfo.java
b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/CoordinatorInfo.java
new file mode 100644
index 0000000..45d10d8
--- /dev/null
+++
b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/CoordinatorInfo.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.monitor.rest.compactions.external;
+
+import java.util.List;
+import java.util.Optional;
+
+import org.apache.accumulo.core.util.HostAndPort;
+
+public class CoordinatorInfo {
+
+ // Variable names become JSON keys
+ public long lastContact;
+ public String server;
+ public int numQueues;
+ public int numCompactors;
+
+ public CoordinatorInfo(Optional<HostAndPort> serverOpt,
ExternalCompactionInfo ecInfo) {
+ server = serverOpt.map(HostAndPort::toString).orElse("none");
+ var queueToCompactors = ecInfo.getCompactors();
+ numQueues = queueToCompactors.size();
+ numCompactors =
queueToCompactors.values().stream().mapToInt(List::size).sum();
+ lastContact = System.currentTimeMillis() - ecInfo.getFetchedTimeMillis();
+ }
+}
diff --git
a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/ECResource.java
b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/ECResource.java
new file mode 100644
index 0000000..6c1a050
--- /dev/null
+++
b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/ECResource.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.monitor.rest.compactions.external;
+
+import jakarta.inject.Inject;
+import jakarta.ws.rs.GET;
+import jakarta.ws.rs.Path;
+import jakarta.ws.rs.Produces;
+import jakarta.ws.rs.core.MediaType;
+
+import org.apache.accumulo.monitor.Monitor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Generate a new External compactions resource
+ *
+ * @since 2.1.0
+ */
+@Path("/ec")
+@Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
+public class ECResource {
+ private static Logger log = LoggerFactory.getLogger(ECResource.class);
+
+ @Inject
+ private Monitor monitor;
+
+ @GET
+ public CoordinatorInfo getCoordinator() {
+ var cc = monitor.getCompactorsInfo();
+ log.info("Got coordinator from monitor = {}", cc);
+ return new CoordinatorInfo(cc.getCoordinatorHost(), cc);
+ }
+
+ @Path("compactors")
+ @GET
+ public Compactors getCompactors() {
+ return new Compactors(monitor.getCompactorsInfo());
+ }
+
+ @Path("running")
+ @GET
+ public RunningCompactions getRunning() {
+ return new RunningCompactions(monitor.getRunningInfo());
+ }
+}
diff --git
a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/ExternalCompactionInfo.java
b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/ExternalCompactionInfo.java
new file mode 100644
index 0000000..3706f13
--- /dev/null
+++
b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/ExternalCompactionInfo.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.monitor.rest.compactions.external;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import org.apache.accumulo.core.util.HostAndPort;
+
+/**
+ * Bag of everything going on with external compactions.
+ */
+public class ExternalCompactionInfo {
+
+ private Optional<HostAndPort> coordinatorHost;
+ private Map<String,List<HostAndPort>> compactors = new HashMap<>();
+ private long fetchedTimeMillis;
+
+ public void setCoordinatorHost(Optional<HostAndPort> coordinatorHost) {
+ this.coordinatorHost = coordinatorHost;
+ }
+
+ public Optional<HostAndPort> getCoordinatorHost() {
+ return coordinatorHost;
+ }
+
+ public Map<String,List<HostAndPort>> getCompactors() {
+ return compactors;
+ }
+
+ public void setCompactors(Map<String,List<HostAndPort>> compactors) {
+ this.compactors = compactors;
+ }
+
+ public long getFetchedTimeMillis() {
+ return fetchedTimeMillis;
+ }
+
+ public void setFetchedTimeMillis(long fetchedTimeMillis) {
+ this.fetchedTimeMillis = fetchedTimeMillis;
+ }
+}
diff --git
a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/RunningCompactions.java
b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/RunningCompactions.java
new file mode 100644
index 0000000..294b91c
--- /dev/null
+++
b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/RunningCompactions.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.monitor.rest.compactions.external;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.accumulo.core.compaction.thrift.TExternalCompaction;
+
+public class RunningCompactions {
+
+ public final List<RunningCompactorInfo> running = new ArrayList<>();
+
+ public RunningCompactions(Map<String,TExternalCompaction> rMap) {
+ if (rMap != null) {
+ var fetchedTime = System.currentTimeMillis();
+ for (var entry : rMap.entrySet()) {
+ running.add(new RunningCompactorInfo(fetchedTime, entry.getKey(),
entry.getValue()));
+ }
+ }
+ }
+}
diff --git
a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/RunningCompactorInfo.java
b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/RunningCompactorInfo.java
new file mode 100644
index 0000000..711c135
--- /dev/null
+++
b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/RunningCompactorInfo.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.monitor.rest.compactions.external;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.compaction.thrift.TCompactionStatusUpdate;
+import org.apache.accumulo.core.compaction.thrift.TExternalCompaction;
+import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.tabletserver.thrift.InputFile;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RunningCompactorInfo extends CompactorInfo {
+ private static Logger log =
LoggerFactory.getLogger(RunningCompactorInfo.class);
+
+ // Variable names become JSON keys
+ public String ecid;
+ public String kind;
+ public String tableId;
+ public List<CompactionInputFile> inputFiles;
+ public int numFiles;
+ public String outputFile;
+ public float progress = 0f;
+ public long duration;
+ public String status;
+ public long lastUpdate;
+
+ public RunningCompactorInfo(long fetchedTime, String ecid,
TExternalCompaction ec) {
+ super(fetchedTime, ec.getQueueName(), ec.getCompactor());
+ this.ecid = ecid;
+ var updates = ec.getUpdates();
+ var job = ec.getJob();
+ kind = job.getKind().name();
+ tableId = KeyExtent.fromThrift(job.extent).tableId().canonical();
+ inputFiles = convertInputFiles(job.files);
+ numFiles = inputFiles.size();
+ outputFile = job.outputFile;
+ updateProgress(updates);
+ log.debug("Parsed running compaction {} for {} with progress = {}%",
status, ecid, progress);
+ }
+
+ private List<CompactionInputFile> convertInputFiles(List<InputFile> files) {
+ List<CompactionInputFile> list = new ArrayList<>();
+ files.forEach(f -> list
+ .add(new CompactionInputFile(f.metadataFileEntry, f.size, f.entries,
f.timestamp)));
+ // sorted largest to smallest
+ list.sort((o1, o2) -> Long.compare(o2.size, o1.size));
+ return list;
+ }
+
+ /**
+ * Calculate progress: the percentage of bytesRead out of bytesToBeCompacted
of the last update.
+ * Also update the status.
+ */
+ private void updateProgress(Map<Long,TCompactionStatusUpdate> updates) {
+ if (updates.isEmpty()) {
+ progress = 0f;
+ status = "na";
+ }
+ long nowMillis = System.currentTimeMillis();
+ long startedMillis = nowMillis;
+ long updateMillis;
+ TCompactionStatusUpdate last;
+
+ // sort updates by key, which is a timestamp
+ TreeMap<Long,TCompactionStatusUpdate> sorted = new TreeMap<>(updates);
+ var firstEntry = sorted.firstEntry();
+ var lastEntry = sorted.lastEntry();
+ if (firstEntry != null) {
+ startedMillis = firstEntry.getKey();
+ }
+ duration = nowMillis - startedMillis;
+ long durationMinutes = TimeUnit.MILLISECONDS.toMinutes(duration);
+ if (durationMinutes > 15) {
+ log.warn("Compaction {} has been running for {} minutes", ecid,
durationMinutes);
+ }
+
+ // last entry is all we care about so bail if null
+ if (lastEntry != null) {
+ last = lastEntry.getValue();
+ updateMillis = lastEntry.getKey();
+ } else {
+ log.debug("No updates found for {}", ecid);
+ return;
+ }
+
+ long sinceLastUpdateSeconds = TimeUnit.MILLISECONDS.toSeconds(nowMillis -
updateMillis);
+ log.debug("Time since Last update {} - {} = {} seconds", nowMillis,
updateMillis,
+ sinceLastUpdateSeconds);
+ if (sinceLastUpdateSeconds > 30) {
+ log.debug("Compaction hasn't progressed from {} in {} seconds.",
progress,
+ sinceLastUpdateSeconds);
+ }
+
+ float percent;
+ var total = last.getEntriesToBeCompacted();
+ if (total <= 0) {
+ percent = 0f;
+ } else {
+ percent = (last.getEntriesRead() / (float) total) * 100;
+ }
+
+ lastUpdate = nowMillis - updateMillis;
+ status = last.state.name();
+ progress = percent;
+ }
+
+ @Override
+ public String toString() {
+ return ecid + ": " + status + " progress: " + progress;
+ }
+}
diff --git
a/server/monitor/src/main/java/org/apache/accumulo/monitor/view/WebViews.java
b/server/monitor/src/main/java/org/apache/accumulo/monitor/view/WebViews.java
index dbbb6dc..c545da2 100644
---
a/server/monitor/src/main/java/org/apache/accumulo/monitor/view/WebViews.java
+++
b/server/monitor/src/main/java/org/apache/accumulo/monitor/view/WebViews.java
@@ -210,6 +210,31 @@ public class WebViews {
}
/**
+ * Returns the compactions template
+ *
+ * @return Scans model
+ */
+ @GET
+ @Path("ec")
+ @Template(name = "/default.ftl")
+ public Map<String,Object> getExternalCompactions() {
+ var ccHost = monitor.getCoordinatorHost();
+
+ Map<String,Object> model = getModel();
+ model.put("title", "External Compactions");
+ model.put("template", "ec.ftl");
+
+ if (ccHost.isPresent()) {
+ model.put("coordinatorRunning", true);
+ model.put("js", "ec.js");
+ } else {
+ model.put("coordinatorRunning", false);
+ }
+
+ return model;
+ }
+
+ /**
* Returns the bulk import template
*
* @return Bulk Import model
diff --git
a/server/monitor/src/main/resources/org/apache/accumulo/monitor/resources/js/ec.js
b/server/monitor/src/main/resources/org/apache/accumulo/monitor/resources/js/ec.js
new file mode 100644
index 0000000..201edbd
--- /dev/null
+++
b/server/monitor/src/main/resources/org/apache/accumulo/monitor/resources/js/ec.js
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+ var coordinatorTable;
+ var compactorsTable;
+ var compactorsTableData;
+ var runningTable;
+ var runningTableData;
+
+ /**
+ * Creates active compactions table
+ */
+ $(document).ready(function() {
+ compactorsTable = $('#compactorsTable').DataTable({
+ "ajax": {
+ "url": '/rest/ec/compactors',
+ "dataSrc": "compactors"
+ },
+ "stateSave": true,
+ "dom": 't<"align-left"l>p',
+ "columnDefs": [
+ { "targets": "duration",
+ "render": function ( data, type, row ) {
+ if(type === 'display') data = timeDuration(data);
+ return data;
+ }
+ },
+ { "targets": "date",
+ "render": function ( data, type, row ) {
+ if(type === 'display') data = dateFormat(data);
+ return data;
+ }
+ }
+ ],
+ "columns": [
+ { "data": "server" },
+ { "data": "queueName"},
+ { "data": "lastContact"}
+ ]
+ });
+
+ // Create a table for compactors
+ runningTable = $('#runningTable').DataTable({
+ "ajax": {
+ "url": '/rest/ec/running',
+ "dataSrc": "running"
+ },
+ "stateSave": true,
+ "dom": 't<"align-left"l>p',
+ "columnDefs": [
+ { "targets": "duration",
+ "render": function ( data, type, row ) {
+ if(type === 'display') data = timeDuration(data);
+ return data;
+ }
+ },
+ { "targets": "date",
+ "render": function ( data, type, row ) {
+ if(type === 'display') data = dateFormat(data);
+ return data;
+ }
+ }
+ ],
+ "columns": [
+ { "data": "server" },
+ { "data": "kind" },
+ { "data": "status" },
+ { "data": "queueName" },
+ { "data": "tableId" },
+ { "data": "numFiles" },
+ { "data": "progress",
+ "type": "html",
+ "render": function ( data, type, row, meta ) {
+ if(type === 'display') {
+ if (row.progress < 0) {
+ data = '--';
+ } else {
+ var p = Math.round(Number(row.progress));
+ console.log("Compaction progress = %" + p);
+ data = '<div class="progress"><div class="progress-bar"
role="progressbar" style="min-width: 2em; width:' +
+ p + '%;">' + p + '%</div></div>';
+ }
+ }
+ return data;
+ }
+ },
+ { "data": "lastUpdate"},
+ { "data": "duration"},
+ { // more column settings
+ "class": "details-control",
+ "orderable": false,
+ "data": null,
+ "defaultContent": ""
+ }
+ ]
+ });
+
+ // Array to track the ids of the details displayed rows
+ var detailRows = [];
+ $("#runningTable tbody").on( 'click', 'tr td.details-control', function
() {
+ var tr = $(this).closest('tr');
+ var row = runningTable.row( tr );
+ var idx = $.inArray( tr.attr('id'), detailRows );
+
+ if ( row.child.isShown() ) {
+ tr.removeClass( 'details' );
+ row.child.hide();
+
+ // Remove from the 'open' array
+ detailRows.splice( idx, 1 );
+ }
+ else {
+ var rci = row.data();
+ tr.addClass( 'details' );
+ // put all the information into html for a single row
+ var htmlRow = "<table class='table table-bordered table-striped
table-condensed'>"
+ htmlRow += "<thead><tr><th>#</th><th>Input
Files</th><th>Size</th><th>Entries</th></tr></thead>";
+ $.each( rci.inputFiles, function( key, value ) {
+ htmlRow += "<tr><td>" + key + "</td>";
+ htmlRow += "<td>" + value.metadataFileEntry + "</td>";
+ htmlRow += "<td>" + bigNumberForSize(value.size) + "</td>";
+ htmlRow += "<td>" + bigNumberForQuantity(value.entries) +
"</td></tr>";
+ });
+ htmlRow += "</table>";
+ htmlRow += "Output File: " + rci.outputFile + "<br>";
+ htmlRow += rci.ecid;
+ row.child(htmlRow).show();
+
+ // Add to the 'open' array
+ if ( idx === -1 ) {
+ detailRows.push( tr.attr('id') );
+ }
+ }
+ });
+ refreshECTables();
+ });
+
+ /**
+ * Used to redraw the page
+ */
+ function refresh() {
+ refreshECTables();
+ }
+
+ /**
+ * Generates the compactions table
+ */
+ function refreshECTables() {
+ getCompactionCoordinator();
+ var ecInfo = JSON.parse(sessionStorage.ecInfo);
+ var ccAddress = ecInfo.server;
+ var numCompactors = ecInfo.numCompactors;
+ var lastContactTime = timeDuration(ecInfo.lastContact);
+ console.log("compaction coordinator = " + ccAddress);
+ console.log("numCompactors = " + numCompactors);
+ $('#ccHostname').text(ccAddress);
+ $('#ccNumQueues').text(ecInfo.numQueues);
+ $('#ccNumCompactors').text(numCompactors);
+ $('#ccLastContact').html(lastContactTime);
+
+ // user paging is not reset on reload
+ if(compactorsTable) compactorsTable.ajax.reload(null, false );
+ if(runningTable) runningTable.ajax.reload(null, false );
+ }
+
+ /**
+ * Get address of the compaction coordinator info
+ */
+ function getCompactionCoordinator() {
+ $.getJSON('/rest/ec', function(data) {
+ sessionStorage.ecInfo = JSON.stringify(data);
+ });
+ }
+
+ function refreshCompactors() {
+ console.log("Refresh compactors table.");
+ // user paging is not reset on reload
+ if(compactorsTable) compactorsTable.ajax.reload(null, false );
+ }
+
+ function refreshRunning() {
+ console.log("Refresh running compactions table.");
+ // user paging is not reset on reload
+ if(runningTable) runningTable.ajax.reload(null, false );
+ }
diff --git
a/server/monitor/src/main/resources/org/apache/accumulo/monitor/templates/ec.ftl
b/server/monitor/src/main/resources/org/apache/accumulo/monitor/templates/ec.ftl
new file mode 100644
index 0000000..ec95a68
--- /dev/null
+++
b/server/monitor/src/main/resources/org/apache/accumulo/monitor/templates/ec.ftl
@@ -0,0 +1,80 @@
+<#--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+ <div class="row">
+ <div class="col-xs-12">
+ <h3>${title}</h3>
+ </div>
+ </div>
+ <#if coordinatorRunning == true>
+ <div id="ecDiv">
+ <div class="row">
+ <div class="col-xs-12">
+ <div class="panel panel-primary">
+ <div
class="panel-heading">Compaction Coordinator running on: <span
id="ccHostname" title="The hostname of the compaction coordinator
server"></span></div>
+ <div class="panel-body">
+ Queues <span id="ccNumQueues" class="badge" title="Number
of queues configured">0</span></span>
+ Compactors <span id="ccNumCompactors" class="badge"
title="Number of compactors running">0</span>
+ Last Contact <span id="ccLastContact" class="badge"
title="Last time data was fetched. Server fetches on refresh, at most every
minute."></span>
+ </div>
+ </div>
+ </div>
+ </div>
+ <div class="row">
+ <div class="col-xs-12">
+ <table id="compactorsTable" class="table table-bordered table-striped
table-condensed">
+ <caption><span class="table-caption">Compactors</span>
+ <a href="javascript:refreshCompactors();"><span class="glyphicon
glyphicon-refresh"/></a></caption>
+ <thead>
+ <tr>
+ <th class="firstcell" title="The hostname the compactor is
running on.">Server</th>
+ <th title="The name of the queue this compactor is
assigned.">Queue</th>
+ <th class="duration" title="Last time data was fetched. Server
fetches on refresh, at most every minute.">Last Contact</th>
+ </tr>
+ </thead>
+ </table>
+ </div>
+ <div class="row">
+ <div class="col-xs-12">
+ <table id="runningTable" class="table table-bordered table-striped
table-condensed">
+ <caption><span class="table-caption">Running Compactions</span>
+ <a href="javascript:refreshRunning();"><span class="glyphicon
glyphicon-refresh"/></a></caption>
+ <thead>
+ <tr>
+ <th class="firstcell" title="The hostname the compactor is
running on.">Server Hostname</th>
+ <th title="The type of compaction.">Kind</th>
+ <th title="The status returned by the last
update.">Status</th>
+ <th title="The name of the queue this compactor is
assigned.">Queue</th>
+ <th title="The ID of the table being compacted.">Table
ID</th>
+ <th title="The number of files being compacted."># of
Files</th>
+ <th title="The progress of the compaction."
class="progBar">Progress</th>
+ <th class="duration" title="The time of the last update for
the compaction">Last Update</th>
+ <th class="duration" title="How long compaction has been
running">Duration</th>
+ <th class="details-control">More</th>
+ </tr>
+ </thead>
+ <tbody></tbody>
+ </table>
+ </div>
+ </div>
+ </div>
+ <#else>
+ <div id="ccBanner"><div class="alert alert-danger" role="alert">Compaction
Coordinator Not Running</div></div>
+ </#if>
diff --git
a/server/monitor/src/main/resources/org/apache/accumulo/monitor/templates/navbar.ftl
b/server/monitor/src/main/resources/org/apache/accumulo/monitor/templates/navbar.ftl
index f180b6d..a9cca59 100644
---
a/server/monitor/src/main/resources/org/apache/accumulo/monitor/templates/navbar.ftl
+++
b/server/monitor/src/main/resources/org/apache/accumulo/monitor/templates/navbar.ftl
@@ -54,6 +54,7 @@
<li><a href="/compactions">Active Compactions</a></li>
<li><a href="/scans">Active Scans</a></li>
<li><a href="/bulkImports">Bulk Imports</a></li>
+ <li><a href="/ec">External Compactions</a></li>
<li><a href="/replication">Replication</a></li>
</ul>
</li>
diff --git
a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java
b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java
new file mode 100644
index 0000000..305344c
--- /dev/null
+++
b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.compaction;
+
+import static
org.apache.accumulo.fate.util.UtilWaitThread.sleepUninterruptibly;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.accumulo.compactor.Compactor;
+import org.apache.accumulo.coordinator.CompactionCoordinator;
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.compaction.thrift.TCompactionState;
+import org.apache.accumulo.core.iterators.IteratorUtil;
+import org.apache.accumulo.core.util.threads.Threads;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloClusterImpl;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import
org.apache.accumulo.monitor.rest.compactions.external.RunningCompactorInfo;
+import org.apache.accumulo.test.functional.SlowIterator;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.thrift.TException;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tests that external compactions report progress from start to finish. To
prevent flaky test
+ * failures, we only measure progress in quarter segments: STARTED, QUARTER,
HALF, THREE_QUARTERS.
+ * We can detect if the compaction finished without errors but the coordinator
will never report
+ * 100% progress since it will remove the ECID upon completion. The {@link
SlowIterator} is used to
+ * control the length of time it takes to complete the compaction.
+ */
+public class ExternalCompactionProgressIT extends AccumuloClusterHarness {
+ private static final Logger log =
LoggerFactory.getLogger(ExternalCompactionProgressIT.class);
+ private static final int ROWS = 10_000;
+
+ enum EC_PROGRESS {
+ STARTED, QUARTER, HALF, THREE_QUARTERS
+ }
+
+ Map<String,RunningCompactorInfo> runningMap = new HashMap<>();
+ List<EC_PROGRESS> progressList = new ArrayList<>();
+
+ private final AtomicBoolean compactionFinished = new AtomicBoolean(false);
+
+ @Override
+ public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration
coreSite) {
+ ExternalCompactionTestUtils.configureMiniCluster(cfg, coreSite);
+ }
+
+ @Test
+ public void testProgress() throws Exception {
+ MiniAccumuloClusterImpl.ProcessInfo c1 = null, coord = null;
+ String table1 = this.getUniqueNames(1)[0];
+ try (AccumuloClient client =
+ Accumulo.newClient().from(getCluster().getClientProperties()).build())
{
+ ExternalCompactionTestUtils.createTable(client, table1, "cs1");
+ ExternalCompactionTestUtils.writeData(client, table1, ROWS);
+ c1 = ((MiniAccumuloClusterImpl) getCluster()).exec(Compactor.class,
"-q", "DCQ1");
+ coord =
ExternalCompactionTestUtils.startCoordinator(((MiniAccumuloClusterImpl)
getCluster()),
+ CompactionCoordinator.class, getCluster().getServerContext());
+
+ Thread checkerThread = startChecker();
+ checkerThread.start();
+
+ IteratorSetting setting = new IteratorSetting(50, "Slow",
SlowIterator.class);
+ SlowIterator.setSleepTime(setting, 1);
+ client.tableOperations().attachIterator(table1, setting,
+ EnumSet.of(IteratorUtil.IteratorScope.majc));
+ log.info("Compacting table");
+ ExternalCompactionTestUtils.compact(client, table1, 2, "DCQ1", true);
+ ExternalCompactionTestUtils.verify(client, table1, 2, ROWS);
+
+ log.info("Done Compacting table");
+ compactionFinished.set(true);
+ checkerThread.join();
+
+ verifyProgress();
+ } finally {
+ ExternalCompactionTestUtils.stopProcesses(c1, coord);
+ }
+ }
+
+ public Thread startChecker() {
+ return Threads.createThread("RC checker", () -> {
+ try {
+ while (!compactionFinished.get()) {
+ checkRunning();
+ sleepUninterruptibly(1000, TimeUnit.MILLISECONDS);
+ }
+ } catch (TException e) {
+ log.warn("{}", e.getMessage(), e);
+ }
+ });
+ }
+
+ /**
+ * Check running compaction progress.
+ */
+ private void checkRunning() throws TException {
+ var ecList =
ExternalCompactionTestUtils.getRunningCompactions(getCluster().getServerContext());
+ var ecMap = ecList.getCompactions();
+ if (ecMap != null) {
+ ecMap.forEach((ecid, ec) -> {
+ // returns null if it's a new mapping
+ RunningCompactorInfo rci = new
RunningCompactorInfo(System.currentTimeMillis(), ecid, ec);
+ RunningCompactorInfo previousRci = runningMap.put(ecid, rci);
+ if (previousRci == null) {
+ log.debug("New ECID {} with inputFiles: {}", ecid, rci.inputFiles);
+ } else {
+ if (rci.progress <= previousRci.progress) {
+ log.warn("{} did not progress. It went from {} to {}", ecid,
previousRci.progress,
+ rci.progress);
+ } else {
+ log.debug("{} progressed from {} to {}", ecid,
previousRci.progress, rci.progress);
+ if (rci.progress > 0 && rci.progress <= 25)
+ progressList.add(EC_PROGRESS.STARTED);
+ else if (rci.progress > 25 && rci.progress <= 50)
+ progressList.add(EC_PROGRESS.QUARTER);
+ else if (rci.progress > 50 && rci.progress <= 75)
+ progressList.add(EC_PROGRESS.HALF);
+ else if (rci.progress > 75 && rci.progress <= 100)
+ progressList.add(EC_PROGRESS.THREE_QUARTERS);
+ }
+ if (!rci.status.equals(TCompactionState.IN_PROGRESS.name())) {
+ log.debug("Saw status other than IN_PROGRESS: {}", rci.status);
+ }
+ }
+ });
+ }
+ }
+
+ private void verifyProgress() {
+ log.info("Verify Progress.");
+ assertTrue("Missing start of progress",
progressList.contains(EC_PROGRESS.STARTED));
+ assertTrue("Missing quarter progress",
progressList.contains(EC_PROGRESS.QUARTER));
+ assertTrue("Missing half progress",
progressList.contains(EC_PROGRESS.HALF));
+ assertTrue("Missing three quarters progress",
+ progressList.contains(EC_PROGRESS.THREE_QUARTERS));
+ }
+}
diff --git
a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionTestUtils.java
b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionTestUtils.java
index 7f4199a..509704e 100644
---
a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionTestUtils.java
+++
b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionTestUtils.java
@@ -29,6 +29,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Optional;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
@@ -41,7 +42,6 @@ import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.IteratorSetting;
-import org.apache.accumulo.core.client.MutationsRejectedException;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.admin.CompactionConfig;
@@ -76,6 +76,7 @@ import
org.apache.accumulo.test.compaction.ExternalCompaction_1_IT.TestFilter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.hadoop.io.Text;
+import org.apache.thrift.TException;
import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -144,11 +145,10 @@ public class ExternalCompactionTestUtils {
}
- public static void writeData(AccumuloClient client, String table1)
- throws MutationsRejectedException, TableNotFoundException,
AccumuloException,
- AccumuloSecurityException {
+ public static void writeData(AccumuloClient client, String table1, int rows)
+ throws TableNotFoundException, AccumuloException,
AccumuloSecurityException {
try (BatchWriter bw = client.createBatchWriter(table1)) {
- for (int i = 0; i < MAX_DATA; i++) {
+ for (int i = 0; i < rows; i++) {
Mutation m = new Mutation(row(i));
m.put("", "", "" + i);
bw.addMutation(m);
@@ -158,8 +158,18 @@ public class ExternalCompactionTestUtils {
client.tableOperations().flush(table1);
}
+ public static void writeData(AccumuloClient client, String table1)
+ throws TableNotFoundException, AccumuloException,
AccumuloSecurityException {
+ writeData(client, table1, MAX_DATA);
+ }
+
public static void verify(AccumuloClient client, String table1, int modulus)
throws TableNotFoundException, AccumuloSecurityException,
AccumuloException {
+ verify(client, table1, modulus, MAX_DATA);
+ }
+
+ public static void verify(AccumuloClient client, String table1, int modulus,
int rows)
+ throws TableNotFoundException, AccumuloSecurityException,
AccumuloException {
try (Scanner scanner = client.createScanner(table1)) {
int count = 0;
for (Entry<Key,Value> entry : scanner) {
@@ -169,7 +179,7 @@ public class ExternalCompactionTestUtils {
}
int expectedCount = 0;
- for (int i = 0; i < MAX_DATA; i++) {
+ for (int i = 0; i < rows; i++) {
if (i % modulus == 0)
expectedCount++;
}
@@ -237,13 +247,14 @@ public class ExternalCompactionTestUtils {
}
public static TExternalCompactionList getRunningCompactions(ClientContext
context)
- throws Exception {
- HostAndPort coordinatorHost =
ExternalCompactionUtil.findCompactionCoordinator(context);
- if (null == coordinatorHost) {
+ throws TException {
+ Optional<HostAndPort> coordinatorHost =
+ ExternalCompactionUtil.findCompactionCoordinator(context);
+ if (coordinatorHost.isEmpty()) {
throw new TTransportException("Unable to get CompactionCoordinator
address from ZooKeeper");
}
- CompactionCoordinatorService.Client client = ThriftUtil
- .getClient(new CompactionCoordinatorService.Client.Factory(),
coordinatorHost, context);
+ CompactionCoordinatorService.Client client = ThriftUtil.getClient(
+ new CompactionCoordinatorService.Client.Factory(),
coordinatorHost.get(), context);
try {
TExternalCompactionList running =
client.getRunningCompactions(TraceUtil.traceInfo(),
context.rpcCreds());
@@ -255,12 +266,13 @@ public class ExternalCompactionTestUtils {
private static TExternalCompactionList getCompletedCompactions(ClientContext
context)
throws Exception {
- HostAndPort coordinatorHost =
ExternalCompactionUtil.findCompactionCoordinator(context);
- if (null == coordinatorHost) {
+ Optional<HostAndPort> coordinatorHost =
+ ExternalCompactionUtil.findCompactionCoordinator(context);
+ if (coordinatorHost.isEmpty()) {
throw new TTransportException("Unable to get CompactionCoordinator
address from ZooKeeper");
}
- CompactionCoordinatorService.Client client = ThriftUtil
- .getClient(new CompactionCoordinatorService.Client.Factory(),
coordinatorHost, context);
+ CompactionCoordinatorService.Client client = ThriftUtil.getClient(
+ new CompactionCoordinatorService.Client.Factory(),
coordinatorHost.get(), context);
try {
TExternalCompactionList completed =
client.getCompletedCompactions(TraceUtil.traceInfo(),
context.rpcCreds());