This is an automated email from the ASF dual-hosted git repository.
dlmarion pushed a commit to branch 1451-external-compactions-feature
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to
refs/heads/1451-external-compactions-feature by this push:
new b21af9f ref #2026 ref #1992 ref #1993 exposed metrics and details via
http to support verification in ITs
b21af9f is described below
commit b21af9fea21bff53f3779c057ddc5bc16c730ac9
Author: Dave Marion <[email protected]>
AuthorDate: Mon Apr 26 18:44:21 2021 +0000
ref #2026 ref #1992 ref #1993 exposed metrics and details via http to
support verification in ITs
---
.../org/apache/accumulo/core/conf/Property.java | 4 +-
server/compaction-coordinator/pom.xml | 4 +
.../coordinator/CompactionCoordinator.java | 78 ++++++++++++++++++
.../coordinator/ExternalCompactionMetrics.java | 92 ++++++++++++++++++++++
.../coordinator/CompactionCoordinatorTest.java | 6 ++
.../org/apache/accumulo/compactor/Compactor.java | 8 +-
.../apache/accumulo/test/ExternalCompactionIT.java | 78 +++++++++++++++++-
.../accumulo/test/ExternalDoNothingCompactor.java | 8 ++
test/src/main/resources/log4j2-test.properties | 3 +
9 files changed, 276 insertions(+), 5 deletions(-)
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index f0713c2..aff4398 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -1043,7 +1043,9 @@ public enum Property {
COORDINATOR_PORTSEARCH("coordinator.port.search", "false",
PropertyType.BOOLEAN,
"if the ports above are in use, search higher ports until one is
available"),
COORDINATOR_CLIENTPORT("coordinator.port.client", "9100", PropertyType.PORT,
- "The port used for handling client connections on the compactor
servers"),
+ "The port used for handling Thrift client connections on the compaction
coordinator server"),
+ COORDINATOR_METRICPORT("coordinator.port.metrics", "9099", PropertyType.PORT,
+ "The port used for the metric http server on the compaction coordinator
server"),
COORDINATOR_MINTHREADS("coordinator.server.threads.minimum", "1",
PropertyType.COUNT,
"The minimum number of threads to use to handle incoming requests."),
COORDINATOR_MINTHREADS_TIMEOUT("coordinator.server.threads.timeout", "0s",
diff --git a/server/compaction-coordinator/pom.xml
b/server/compaction-coordinator/pom.xml
index 66f98b6..4ed49e3 100644
--- a/server/compaction-coordinator/pom.xml
+++ b/server/compaction-coordinator/pom.xml
@@ -60,6 +60,10 @@
<artifactId>zookeeper</artifactId>
</dependency>
<dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-server</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
diff --git
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
index e6e7549..bbf23a2 100644
---
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
+++
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
@@ -20,6 +20,7 @@ package org.apache.accumulo.coordinator;
import static
org.apache.accumulo.fate.util.UtilWaitThread.sleepUninterruptibly;
+import java.io.IOException;
import java.net.UnknownHostException;
import java.time.Duration;
import java.util.List;
@@ -29,6 +30,10 @@ import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
+import jakarta.servlet.ServletException;
+import jakarta.servlet.http.HttpServletRequest;
+import jakarta.servlet.http.HttpServletResponse;
+
import org.apache.accumulo.coordinator.QueueSummaries.PrioTserver;
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.AccumuloSecurityException;
@@ -74,13 +79,23 @@ import org.apache.accumulo.server.rpc.TServerUtils;
import org.apache.accumulo.server.rpc.ThriftServerType;
import org.apache.accumulo.server.security.AuditedSecurityOperation;
import org.apache.accumulo.server.security.SecurityOperation;
+import org.apache.http.entity.ContentType;
import org.apache.thrift.TException;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
import org.apache.zookeeper.KeeperException;
+import org.eclipse.jetty.server.Request;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.ServerConnector;
+import org.eclipse.jetty.server.handler.AbstractHandler;
+import org.eclipse.jetty.server.handler.ContextHandler;
+import org.eclipse.jetty.server.handler.ContextHandlerCollection;
+import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.gson.Gson;
+
public class CompactionCoordinator extends AbstractServer
implements
org.apache.accumulo.core.compaction.thrift.CompactionCoordinator.Iface,
LiveTServerSet.Listener {
@@ -93,6 +108,8 @@ public class CompactionCoordinator extends AbstractServer
protected static final QueueSummaries QUEUE_SUMMARIES = new QueueSummaries();
+ private static final Gson GSON = new Gson();
+
/* Map of compactionId to RunningCompactions */
protected static final Map<ExternalCompactionId,RunningCompaction> RUNNING =
new ConcurrentHashMap<>();
@@ -100,6 +117,7 @@ public class CompactionCoordinator extends AbstractServer
/* Map of queue name to last time compactor called to get a compaction job */
private static final Map<String,Long> TIME_COMPACTOR_LAST_CHECKED = new
ConcurrentHashMap<>();
+ private final ExternalCompactionMetrics metrics = new
ExternalCompactionMetrics();
private final GarbageCollectionLogger gcLogger = new
GarbageCollectionLogger();
protected SecurityOperation security;
protected final AccumuloConfiguration aconf;
@@ -221,6 +239,48 @@ public class CompactionCoordinator extends AbstractServer
return sp;
}
+ protected Server startHttpMetricServer() throws Exception {
+ int port =
getContext().getConfiguration().getPortStream(Property.COORDINATOR_METRICPORT)
+ .iterator().next();
+ String hostname = getHostname();
+ Server metricServer = new Server(new QueuedThreadPool(4, 1));
+ ServerConnector c = new ServerConnector(metricServer);
+ c.setHost(hostname);
+ c.setPort(port);
+ metricServer.addConnector(c);
+ ContextHandlerCollection handlers = new ContextHandlerCollection();
+ metricServer.setHandler(handlers);
+ ContextHandler metricContext = new ContextHandler("/metrics");
+ metricContext.setHandler(new AbstractHandler() {
+ @Override
+ public void handle(String target, Request baseRequest,
HttpServletRequest request,
+ HttpServletResponse response) throws IOException, ServletException {
+ baseRequest.setHandled(true);
+ response.setStatus(200);
+ response.setContentType(ContentType.APPLICATION_JSON.toString());
+ response.getWriter().print(metrics.toJson(GSON));
+ }
+ });
+ handlers.addHandler(metricContext);
+
+ ContextHandler detailsContext = new ContextHandler("/details");
+ detailsContext.setHandler(new AbstractHandler() {
+ @Override
+ public void handle(String target, Request baseRequest,
HttpServletRequest request,
+ HttpServletResponse response) throws IOException, ServletException {
+ baseRequest.setHandled(true);
+ response.setStatus(200);
+ response.setContentType(ContentType.APPLICATION_JSON.toString());
+ response.getWriter().print(GSON.toJson(RUNNING));
+ }
+ });
+ handlers.addHandler(detailsContext);
+
+ metricServer.start();
+ LOG.info("Metrics HTTP server listening on {}:{}", hostname, port);
+ return metricServer;
+ }
+
@Override
public void run() {
@@ -232,6 +292,13 @@ public class CompactionCoordinator extends AbstractServer
}
final HostAndPort clientAddress = coordinatorAddress.address;
+ Server metricServer = null;
+ try {
+ metricServer = startHttpMetricServer();
+ } catch (Exception e1) {
+ throw new RuntimeException("Failed to start metric http server", e1);
+ }
+
try {
getCoordinatorLock(clientAddress);
} catch (KeeperException | InterruptedException e) {
@@ -400,7 +467,15 @@ public class CompactionCoordinator extends AbstractServer
UtilWaitThread.sleep(checkInterval - duration);
}
}
+
LOG.info("Shutting down");
+ if (null != metricServer) {
+ try {
+ metricServer.stop();
+ } catch (Exception e) {
+ LOG.error("Error stopping metric server", e);
+ }
+ }
}
protected long getMissingCompactorWarningTime() {
@@ -484,6 +559,7 @@ public class CompactionCoordinator extends AbstractServer
}
RUNNING.put(ExternalCompactionId.of(job.getExternalCompactionId()),
new RunningCompaction(job, compactorAddress, tserver));
+ metrics.incrementStarted();
LOG.debug("Returning external job {} to {}", job.externalCompactionId,
compactorAddress);
result = job;
break;
@@ -632,6 +708,7 @@ public class CompactionCoordinator extends AbstractServer
rc.setCompleted();
compactionFinalizer.commitCompaction(ecid,
KeyExtent.fromThrift(textent), stats.fileSize,
stats.entriesWritten);
+ metrics.incrementCompleted();
} else {
LOG.error(
"Compaction completed called by Compactor for {}, but no running
compaction for that id.",
@@ -655,6 +732,7 @@ public class CompactionCoordinator extends AbstractServer
// CBUG: Should we remove rc from RUNNING here and remove the
isCompactionCompleted method?
rc.setCompleted();
compactionFinalizer.failCompactions(Map.of(ecid,
KeyExtent.fromThrift(extent)));
+ metrics.incrementFailed();
} else {
LOG.error(
"Compaction failed called by Compactor for {}, but no running
compaction for that id.",
diff --git
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/ExternalCompactionMetrics.java
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/ExternalCompactionMetrics.java
new file mode 100644
index 0000000..2343583
--- /dev/null
+++
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/ExternalCompactionMetrics.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.coordinator;
+
+import com.google.gson.Gson;
+
+public class ExternalCompactionMetrics {
+
+ private long started = 0;
+ private long running = 0;
+ private long completed = 0;
+ private long failed = 0;
+
+ public long getStarted() {
+ return started;
+ }
+
+ public void setStarted(long started) {
+ this.started = started;
+ }
+
+ public void incrementStarted() {
+ this.started++;
+ }
+
+ public long getRunning() {
+ return running;
+ }
+
+ public void setRunning(long running) {
+ this.running = running;
+ }
+
+ public void incrementRunning() {
+ this.running++;
+ }
+
+ public long getCompleted() {
+ return completed;
+ }
+
+ public void setCompleted(long completed) {
+ this.completed = completed;
+ }
+
+ public void incrementCompleted() {
+ this.completed++;
+ }
+
+ public long getFailed() {
+ return failed;
+ }
+
+ public void setFailed(long failed) {
+ this.failed = failed;
+ }
+
+ public void incrementFailed() {
+ this.failed++;
+ }
+
+ public String toJson(Gson gson) {
+ return gson.toJson(this);
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder buf = new StringBuilder(); // "name: value" pairs, comma-separated
+ buf.append("started: ").append(started);
+ buf.append(", running: ").append(running);
+ buf.append(", completed: ").append(completed);
+ buf.append(", failed: ").append(failed);
+ return buf.toString();
+ }
+
+}
diff --git
a/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java
b/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java
index e1ec07d..7e1d27c 100644
---
a/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java
+++
b/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java
@@ -61,6 +61,7 @@ import org.apache.thrift.TException;
import org.apache.thrift.transport.TTransportException;
import org.apache.zookeeper.KeeperException;
import org.easymock.EasyMock;
+import org.eclipse.jetty.server.Server;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.easymock.PowerMock;
@@ -184,6 +185,11 @@ public class CompactionCoordinatorTest {
};
}
+ @Override
+ protected Server startHttpMetricServer() throws Exception {
+ return null;
+ }
+
}
@Test
diff --git
a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
index 6f98923..74344b7 100644
---
a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
+++
b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
@@ -68,6 +68,7 @@ import org.apache.accumulo.core.util.ServerServices;
import org.apache.accumulo.core.util.ServerServices.Service;
import org.apache.accumulo.core.util.threads.ThreadPools;
import org.apache.accumulo.core.util.threads.Threads;
+import org.apache.accumulo.fate.util.UtilWaitThread;
import org.apache.accumulo.fate.zookeeper.ServiceLock;
import org.apache.accumulo.fate.zookeeper.ServiceLock.LockLossReason;
import org.apache.accumulo.fate.zookeeper.ServiceLock.LockWatcher;
@@ -174,7 +175,7 @@ public class Compactor extends AbstractServer
TimeUnit.MILLISECONDS);
}
- private void checkIfCanceled() {
+ protected void checkIfCanceled() {
TExternalCompactionJob job = JOB_HOLDER.getJob();
if (job != null) {
try {
@@ -600,6 +601,10 @@ public class Compactor extends AbstractServer
return supplier;
}
+ protected long getWaitTimeBetweenCompactionChecks() {
+ return 3000;
+ }
+
@Override
public void run() {
@@ -631,6 +636,7 @@ public class Compactor extends AbstractServer
job = getNextJob(getNextId());
if (!job.isSetExternalCompactionId()) {
LOG.info("No external compactions in queue {}", this.queueName);
+ UtilWaitThread.sleep(getWaitTimeBetweenCompactionChecks());
continue;
}
if
(!job.getExternalCompactionId().equals(currentCompactionId.get().toString())) {
diff --git
a/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java
b/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java
index 32566b1..a2b7a3c 100644
--- a/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java
@@ -21,6 +21,11 @@ package org.apache.accumulo.test;
import static org.apache.accumulo.minicluster.ServerType.TABLET_SERVER;
import java.io.IOException;
+import java.net.URI;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.net.http.HttpResponse.BodyHandlers;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -33,6 +38,7 @@ import java.util.stream.Stream;
import org.apache.accumulo.compactor.CompactionEnvironment.CompactorIterEnv;
import org.apache.accumulo.compactor.Compactor;
import org.apache.accumulo.coordinator.CompactionCoordinator;
+import org.apache.accumulo.coordinator.ExternalCompactionMetrics;
import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.AccumuloException;
@@ -75,6 +81,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
+import com.google.gson.Gson;
public class ExternalCompactionIT extends ConfigurableMacBase {
@@ -194,6 +201,56 @@ public class ExternalCompactionIT extends
ConfigurableMacBase {
}
@Test
+ public void testUserCompactionCancellation() throws Exception {
+ try (AccumuloClient client =
Accumulo.newClient().from(getClientProperties()).build()) {
+
+ String table1 = "ectt6";
+ createTable(client, table1, "cs1");
+ writeData(client, table1);
+
+ // The ExternalDoNothingCompactor creates a compaction thread that
+ // sleeps for 5 minutes.
+ // Wait for the coordinator to insert the running compaction metadata
+ // entry into the metadata table, then cancel the compaction
+ cluster.exec(ExternalDoNothingCompactor.class, "-q", "DCQ1");
+ cluster.exec(CompactionCoordinator.class);
+
+ compact(client, table1, 2, "DCQ1", false);
+
+ List<TabletMetadata> md = new ArrayList<>();
+ TabletsMetadata tm =
getCluster().getServerContext().getAmple().readTablets()
+ .forLevel(DataLevel.USER).fetch(ColumnType.ECOMP).build();
+ tm.forEach(t -> md.add(t));
+
+ while (md.size() == 0) {
+ tm.close();
+ tm =
getCluster().getServerContext().getAmple().readTablets().forLevel(DataLevel.USER)
+ .fetch(ColumnType.ECOMP).build();
+ tm.forEach(t -> md.add(t));
+ }
+ client.tableOperations().cancelCompaction(table1);
+
+ // ExternalDoNothingCompactor runs the cancel checker every 5s and the
compaction thread
+ // sleeps for 1s between checks to see if it's canceled or not.
+ UtilWaitThread.sleep(8000);
+
+ // The compaction was canceled (the table was not deleted), so the coordinator
+ // should have recorded it as failed rather than completed.
+ // Verify that the compaction failed by looking at the metrics in the
Coordinator.
+ HttpRequest req =
+ HttpRequest.newBuilder().GET().uri(new
URI("http://localhost:9099/metrics")).build();
+ HttpClient hc = HttpClient.newHttpClient();
+ HttpResponse<String> res = hc.send(req, BodyHandlers.ofString());
+ ExternalCompactionMetrics metrics =
+ new Gson().fromJson(res.body(), ExternalCompactionMetrics.class);
+ Assert.assertEquals(1, metrics.getStarted());
+ Assert.assertEquals(0, metrics.getRunning());
+ Assert.assertEquals(0, metrics.getCompleted());
+ Assert.assertEquals(1, metrics.getFailed());
+ }
+ }
+
+ @Test
public void testDeleteTableDuringExternalCompaction() throws Exception {
try (AccumuloClient client =
Accumulo.newClient().from(getClientProperties()).build()) {
@@ -221,14 +278,29 @@ public class ExternalCompactionIT extends
ConfigurableMacBase {
while (md.size() == 0) {
tm.close();
- md.clear();
tm =
getCluster().getServerContext().getAmple().readTablets().forLevel(DataLevel.USER)
.fetch(ColumnType.ECOMP).build();
tm.forEach(t -> md.add(t));
}
client.tableOperations().delete(table1);
- // CBUG: How to verify? Metadata tablets are gone...
- UtilWaitThread.sleep(1000); // to see the logs
+
+ // ExternalDoNothingCompactor runs the cancel checker every 5s and the
compaction thread
+ // sleeps for 1s between checks to see if it's canceled or not.
+ UtilWaitThread.sleep(8000);
+
+ // The metadata tablets will be deleted from the metadata table because
we have deleted the
+ // table
+ // Verify that the compaction failed by looking at the metrics in the
Coordinator.
+ HttpRequest req =
+ HttpRequest.newBuilder().GET().uri(new
URI("http://localhost:9099/metrics")).build();
+ HttpClient hc = HttpClient.newHttpClient();
+ HttpResponse<String> res = hc.send(req, BodyHandlers.ofString());
+ ExternalCompactionMetrics metrics =
+ new Gson().fromJson(res.body(), ExternalCompactionMetrics.class);
+ Assert.assertEquals(1, metrics.getStarted());
+ Assert.assertEquals(0, metrics.getRunning());
+ Assert.assertEquals(0, metrics.getCompleted());
+ Assert.assertEquals(1, metrics.getFailed());
}
}
diff --git
a/test/src/main/java/org/apache/accumulo/test/ExternalDoNothingCompactor.java
b/test/src/main/java/org/apache/accumulo/test/ExternalDoNothingCompactor.java
index b1441ae..9199c44 100644
---
a/test/src/main/java/org/apache/accumulo/test/ExternalDoNothingCompactor.java
+++
b/test/src/main/java/org/apache/accumulo/test/ExternalDoNothingCompactor.java
@@ -19,6 +19,8 @@
package org.apache.accumulo.test;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.LongAdder;
@@ -40,6 +42,12 @@ public class ExternalDoNothingCompactor extends Compactor
implements Iface {
}
@Override
+ protected void startCancelChecker(ScheduledThreadPoolExecutor schedExecutor,
+ long timeBetweenChecks) {
+ schedExecutor.scheduleWithFixedDelay(() -> checkIfCanceled(), 0, 5000,
TimeUnit.MILLISECONDS);
+ }
+
+ @Override
protected Runnable createCompactionJob(TExternalCompactionJob job, LongAdder
totalInputEntries,
LongAdder totalInputBytes, CountDownLatch started, CountDownLatch
stopped,
AtomicReference<Throwable> err) {
diff --git a/test/src/main/resources/log4j2-test.properties
b/test/src/main/resources/log4j2-test.properties
index 35891b8..b61cdc1 100644
--- a/test/src/main/resources/log4j2-test.properties
+++ b/test/src/main/resources/log4j2-test.properties
@@ -143,6 +143,9 @@ logger.35.level = info
logger.36.name = org.apache.thrift.transport.TIOStreamTransport
logger.36.level = error
+logger.37.name = org.eclipse.jetty
+logger.37.level = warn
+
rootLogger.level = debug
rootLogger.appenderRef.console.ref = STDOUT