This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch 1451-external-compactions-feature in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/1451-external-compactions-feature by this push: new b21af9f ref #2026 ref #1992 ref #1993 exposed metrics and details via http to support verification in ITs b21af9f is described below commit b21af9fea21bff53f3779c057ddc5bc16c730ac9 Author: Dave Marion <dlmar...@apache.org> AuthorDate: Mon Apr 26 18:44:21 2021 +0000 ref #2026 ref #1992 ref #1993 exposed metrics and details via http to support verification in ITs --- .../org/apache/accumulo/core/conf/Property.java | 4 +- server/compaction-coordinator/pom.xml | 4 + .../coordinator/CompactionCoordinator.java | 78 ++++++++++++++++++ .../coordinator/ExternalCompactionMetrics.java | 92 ++++++++++++++++++++++ .../coordinator/CompactionCoordinatorTest.java | 6 ++ .../org/apache/accumulo/compactor/Compactor.java | 8 +- .../apache/accumulo/test/ExternalCompactionIT.java | 78 +++++++++++++++++- .../accumulo/test/ExternalDoNothingCompactor.java | 8 ++ test/src/main/resources/log4j2-test.properties | 3 + 9 files changed, 276 insertions(+), 5 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java index f0713c2..aff4398 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@ -1043,7 +1043,9 @@ public enum Property { COORDINATOR_PORTSEARCH("coordinator.port.search", "false", PropertyType.BOOLEAN, "if the ports above are in use, search higher ports until one is available"), COORDINATOR_CLIENTPORT("coordinator.port.client", "9100", PropertyType.PORT, - "The port used for handling client connections on the compactor servers"), + "The port used for handling Thrift client connections on the compaction coordinator server"), + COORDINATOR_METRICPORT("coordinator.port.metrics", "9099", PropertyType.PORT, + "The port used for the metric http server on the compaction coordinator server"), 
COORDINATOR_MINTHREADS("coordinator.server.threads.minimum", "1", PropertyType.COUNT, "The minimum number of threads to use to handle incoming requests."), COORDINATOR_MINTHREADS_TIMEOUT("coordinator.server.threads.timeout", "0s", diff --git a/server/compaction-coordinator/pom.xml b/server/compaction-coordinator/pom.xml index 66f98b6..4ed49e3 100644 --- a/server/compaction-coordinator/pom.xml +++ b/server/compaction-coordinator/pom.xml @@ -60,6 +60,10 @@ <artifactId>zookeeper</artifactId> </dependency> <dependency> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-server</artifactId> + </dependency> + <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> </dependency> diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java index e6e7549..bbf23a2 100644 --- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java +++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java @@ -20,6 +20,7 @@ package org.apache.accumulo.coordinator; import static org.apache.accumulo.fate.util.UtilWaitThread.sleepUninterruptibly; +import java.io.IOException; import java.net.UnknownHostException; import java.time.Duration; import java.util.List; @@ -29,6 +30,10 @@ import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; +import jakarta.servlet.ServletException; +import jakarta.servlet.http.HttpServletRequest; +import jakarta.servlet.http.HttpServletResponse; + import org.apache.accumulo.coordinator.QueueSummaries.PrioTserver; import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.client.AccumuloSecurityException; @@ -74,13 +79,23 @@ import org.apache.accumulo.server.rpc.TServerUtils; import 
org.apache.accumulo.server.rpc.ThriftServerType; import org.apache.accumulo.server.security.AuditedSecurityOperation; import org.apache.accumulo.server.security.SecurityOperation; +import org.apache.http.entity.ContentType; import org.apache.thrift.TException; import org.apache.thrift.transport.TTransport; import org.apache.thrift.transport.TTransportException; import org.apache.zookeeper.KeeperException; +import org.eclipse.jetty.server.Request; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.ServerConnector; +import org.eclipse.jetty.server.handler.AbstractHandler; +import org.eclipse.jetty.server.handler.ContextHandler; +import org.eclipse.jetty.server.handler.ContextHandlerCollection; +import org.eclipse.jetty.util.thread.QueuedThreadPool; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.gson.Gson; + public class CompactionCoordinator extends AbstractServer implements org.apache.accumulo.core.compaction.thrift.CompactionCoordinator.Iface, LiveTServerSet.Listener { @@ -93,6 +108,8 @@ public class CompactionCoordinator extends AbstractServer protected static final QueueSummaries QUEUE_SUMMARIES = new QueueSummaries(); + private static final Gson GSON = new Gson(); + /* Map of compactionId to RunningCompactions */ protected static final Map<ExternalCompactionId,RunningCompaction> RUNNING = new ConcurrentHashMap<>(); @@ -100,6 +117,7 @@ public class CompactionCoordinator extends AbstractServer /* Map of queue name to last time compactor called to get a compaction job */ private static final Map<String,Long> TIME_COMPACTOR_LAST_CHECKED = new ConcurrentHashMap<>(); + private final ExternalCompactionMetrics metrics = new ExternalCompactionMetrics(); private final GarbageCollectionLogger gcLogger = new GarbageCollectionLogger(); protected SecurityOperation security; protected final AccumuloConfiguration aconf; @@ -221,6 +239,48 @@ public class CompactionCoordinator extends AbstractServer return sp; } + protected 
Server startHttpMetricServer() throws Exception { + int port = getContext().getConfiguration().getPortStream(Property.COORDINATOR_METRICPORT) + .iterator().next(); + String hostname = getHostname(); + Server metricServer = new Server(new QueuedThreadPool(4, 1)); + ServerConnector c = new ServerConnector(metricServer); + c.setHost(hostname); + c.setPort(port); + metricServer.addConnector(c); + ContextHandlerCollection handlers = new ContextHandlerCollection(); + metricServer.setHandler(handlers); + ContextHandler metricContext = new ContextHandler("/metrics"); + metricContext.setHandler(new AbstractHandler() { + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, + HttpServletResponse response) throws IOException, ServletException { + baseRequest.setHandled(true); + response.setStatus(200); + response.setContentType(ContentType.APPLICATION_JSON.toString()); + response.getWriter().print(metrics.toJson(GSON)); + } + }); + handlers.addHandler(metricContext); + + ContextHandler detailsContext = new ContextHandler("/details"); + detailsContext.setHandler(new AbstractHandler() { + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, + HttpServletResponse response) throws IOException, ServletException { + baseRequest.setHandled(true); + response.setStatus(200); + response.setContentType(ContentType.APPLICATION_JSON.toString()); + response.getWriter().print(GSON.toJson(RUNNING)); + } + }); + handlers.addHandler(detailsContext); + + metricServer.start(); + LOG.info("Metrics HTTP server listening on {}:{}", hostname, port); + return metricServer; + } + @Override public void run() { @@ -232,6 +292,13 @@ public class CompactionCoordinator extends AbstractServer } final HostAndPort clientAddress = coordinatorAddress.address; + Server metricServer = null; + try { + metricServer = startHttpMetricServer(); + } catch (Exception e1) { + throw new RuntimeException("Failed to start metric http server", 
e1); + } + try { getCoordinatorLock(clientAddress); } catch (KeeperException | InterruptedException e) { @@ -400,7 +467,15 @@ public class CompactionCoordinator extends AbstractServer UtilWaitThread.sleep(checkInterval - duration); } } + LOG.info("Shutting down"); + if (null != metricServer) { + try { + metricServer.stop(); + } catch (Exception e) { + LOG.error("Error stopping metric server", e); + } + } } protected long getMissingCompactorWarningTime() { @@ -484,6 +559,7 @@ public class CompactionCoordinator extends AbstractServer } RUNNING.put(ExternalCompactionId.of(job.getExternalCompactionId()), new RunningCompaction(job, compactorAddress, tserver)); + metrics.incrementStarted(); LOG.debug("Returning external job {} to {}", job.externalCompactionId, compactorAddress); result = job; break; @@ -632,6 +708,7 @@ public class CompactionCoordinator extends AbstractServer rc.setCompleted(); compactionFinalizer.commitCompaction(ecid, KeyExtent.fromThrift(textent), stats.fileSize, stats.entriesWritten); + metrics.incrementCompleted(); } else { LOG.error( "Compaction completed called by Compactor for {}, but no running compaction for that id.", @@ -655,6 +732,7 @@ public class CompactionCoordinator extends AbstractServer // CBUG: Should we remove rc from RUNNING here and remove the isCompactionCompleted method? 
rc.setCompleted(); compactionFinalizer.failCompactions(Map.of(ecid, KeyExtent.fromThrift(extent))); + metrics.incrementFailed(); } else { LOG.error( "Compaction failed called by Compactor for {}, but no running compaction for that id.", diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/ExternalCompactionMetrics.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/ExternalCompactionMetrics.java new file mode 100644 index 0000000..2343583 --- /dev/null +++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/ExternalCompactionMetrics.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.accumulo.coordinator; + +import com.google.gson.Gson; + +public class ExternalCompactionMetrics { + + private long started = 0; + private long running = 0; + private long completed = 0; + private long failed = 0; + + public long getStarted() { + return started; + } + + public void setStarted(long started) { + this.started = started; + } + + public void incrementStarted() { + this.started++; + } + + public long getRunning() { + return running; + } + + public void setRunning(long running) { + this.running = running; + } + + public void incrementRunning() { + this.running++; + } + + public long getCompleted() { + return completed; + } + + public void setCompleted(long completed) { + this.completed = completed; + } + + public void incrementCompleted() { + this.completed++; + } + + public long getFailed() { + return failed; + } + + public void setFailed(long failed) { + this.failed = failed; + } + + public void incrementFailed() { + this.failed++; + } + + public String toJson(Gson gson) { + return gson.toJson(this); + } + + @Override + public String toString() { + StringBuilder buf = new StringBuilder(); + buf.append("started: ").append(started); + buf.append("running: ").append(running); + buf.append("completed: ").append(completed); + buf.append("failed: ").append(failed); + return buf.toString(); + } + +} diff --git a/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java b/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java index e1ec07d..7e1d27c 100644 --- a/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java +++ b/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java @@ -61,6 +61,7 @@ import org.apache.thrift.TException; import org.apache.thrift.transport.TTransportException; import org.apache.zookeeper.KeeperException; import 
org.easymock.EasyMock; +import org.eclipse.jetty.server.Server; import org.junit.Test; import org.junit.runner.RunWith; import org.powermock.api.easymock.PowerMock; @@ -184,6 +185,11 @@ public class CompactionCoordinatorTest { }; } + @Override + protected Server startHttpMetricServer() throws Exception { + return null; + } + } @Test diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java index 6f98923..74344b7 100644 --- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java +++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java @@ -68,6 +68,7 @@ import org.apache.accumulo.core.util.ServerServices; import org.apache.accumulo.core.util.ServerServices.Service; import org.apache.accumulo.core.util.threads.ThreadPools; import org.apache.accumulo.core.util.threads.Threads; +import org.apache.accumulo.fate.util.UtilWaitThread; import org.apache.accumulo.fate.zookeeper.ServiceLock; import org.apache.accumulo.fate.zookeeper.ServiceLock.LockLossReason; import org.apache.accumulo.fate.zookeeper.ServiceLock.LockWatcher; @@ -174,7 +175,7 @@ public class Compactor extends AbstractServer TimeUnit.MILLISECONDS); } - private void checkIfCanceled() { + protected void checkIfCanceled() { TExternalCompactionJob job = JOB_HOLDER.getJob(); if (job != null) { try { @@ -600,6 +601,10 @@ public class Compactor extends AbstractServer return supplier; } + protected long getWaitTimeBetweenCompactionChecks() { + return 3000; + } + @Override public void run() { @@ -631,6 +636,7 @@ public class Compactor extends AbstractServer job = getNextJob(getNextId()); if (!job.isSetExternalCompactionId()) { LOG.info("No external compactions in queue {}", this.queueName); + UtilWaitThread.sleep(getWaitTimeBetweenCompactionChecks()); continue; } if (!job.getExternalCompactionId().equals(currentCompactionId.get().toString())) { diff --git 
a/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java b/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java index 32566b1..a2b7a3c 100644 --- a/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java +++ b/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java @@ -21,6 +21,11 @@ package org.apache.accumulo.test; import static org.apache.accumulo.minicluster.ServerType.TABLET_SERVER; import java.io.IOException; +import java.net.URI; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.net.http.HttpResponse.BodyHandlers; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -33,6 +38,7 @@ import java.util.stream.Stream; import org.apache.accumulo.compactor.CompactionEnvironment.CompactorIterEnv; import org.apache.accumulo.compactor.Compactor; import org.apache.accumulo.coordinator.CompactionCoordinator; +import org.apache.accumulo.coordinator.ExternalCompactionMetrics; import org.apache.accumulo.core.client.Accumulo; import org.apache.accumulo.core.client.AccumuloClient; import org.apache.accumulo.core.client.AccumuloException; @@ -75,6 +81,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.base.Preconditions; +import com.google.gson.Gson; public class ExternalCompactionIT extends ConfigurableMacBase { @@ -194,6 +201,56 @@ public class ExternalCompactionIT extends ConfigurableMacBase { } @Test + public void testUserCompactionCancellation() throws Exception { + try (AccumuloClient client = Accumulo.newClient().from(getClientProperties()).build()) { + + String table1 = "ectt6"; + createTable(client, table1, "cs1"); + writeData(client, table1); + + // The ExternalDoNothingCompactor creates a compaction thread that + // sleeps for 5 minutes. 
+ // Wait for the coordinator to insert the running compaction metadata + // entry into the metadata table, then cancel the compaction + cluster.exec(ExternalDoNothingCompactor.class, "-q", "DCQ1"); + cluster.exec(CompactionCoordinator.class); + + compact(client, table1, 2, "DCQ1", false); + + List<TabletMetadata> md = new ArrayList<>(); + TabletsMetadata tm = getCluster().getServerContext().getAmple().readTablets() + .forLevel(DataLevel.USER).fetch(ColumnType.ECOMP).build(); + tm.forEach(t -> md.add(t)); + + while (md.size() == 0) { + tm.close(); + tm = getCluster().getServerContext().getAmple().readTablets().forLevel(DataLevel.USER) + .fetch(ColumnType.ECOMP).build(); + tm.forEach(t -> md.add(t)); + } + client.tableOperations().cancelCompaction(table1); + + // ExternalDoNothingCompactor runs the cancel checker every 5s and the compaction thread + // sleeps for 1s between checks to see if it's canceled or not. + UtilWaitThread.sleep(8000); + + // Only the compaction was canceled above (via cancelCompaction); the table itself has not + // been deleted. + // Verify that the compaction failed by looking at the metrics in the Coordinator. 
+ HttpRequest req = + HttpRequest.newBuilder().GET().uri(new URI("http://localhost:9099/metrics")).build(); + HttpClient hc = HttpClient.newHttpClient(); + HttpResponse<String> res = hc.send(req, BodyHandlers.ofString()); + ExternalCompactionMetrics metrics = + new Gson().fromJson(res.body(), ExternalCompactionMetrics.class); + Assert.assertEquals(1, metrics.getStarted()); + Assert.assertEquals(0, metrics.getRunning()); + Assert.assertEquals(0, metrics.getCompleted()); + Assert.assertEquals(1, metrics.getFailed()); + } + } + + @Test public void testDeleteTableDuringExternalCompaction() throws Exception { try (AccumuloClient client = Accumulo.newClient().from(getClientProperties()).build()) { @@ -221,14 +278,29 @@ public class ExternalCompactionIT extends ConfigurableMacBase { while (md.size() == 0) { tm.close(); - md.clear(); tm = getCluster().getServerContext().getAmple().readTablets().forLevel(DataLevel.USER) .fetch(ColumnType.ECOMP).build(); tm.forEach(t -> md.add(t)); } client.tableOperations().delete(table1); - // CBUG: How to verify? Metadata tablets are gone... - UtilWaitThread.sleep(1000); // to see the logs + + // ExternalDoNothingCompactor runs the cancel checker every 5s and the compaction thread + // sleeps for 1s between checks to see if it's canceled or not. + UtilWaitThread.sleep(8000); + + // The metadata tablets will be deleted from the metadata table because we have deleted the + // table + // Verify that the compaction failed by looking at the metrics in the Coordinator. 
+ HttpRequest req = + HttpRequest.newBuilder().GET().uri(new URI("http://localhost:9099/metrics")).build(); + HttpClient hc = HttpClient.newHttpClient(); + HttpResponse<String> res = hc.send(req, BodyHandlers.ofString()); + ExternalCompactionMetrics metrics = + new Gson().fromJson(res.body(), ExternalCompactionMetrics.class); + Assert.assertEquals(1, metrics.getStarted()); + Assert.assertEquals(0, metrics.getRunning()); + Assert.assertEquals(0, metrics.getCompleted()); + Assert.assertEquals(1, metrics.getFailed()); } } diff --git a/test/src/main/java/org/apache/accumulo/test/ExternalDoNothingCompactor.java b/test/src/main/java/org/apache/accumulo/test/ExternalDoNothingCompactor.java index b1441ae..9199c44 100644 --- a/test/src/main/java/org/apache/accumulo/test/ExternalDoNothingCompactor.java +++ b/test/src/main/java/org/apache/accumulo/test/ExternalDoNothingCompactor.java @@ -19,6 +19,8 @@ package org.apache.accumulo.test; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.LongAdder; @@ -40,6 +42,12 @@ public class ExternalDoNothingCompactor extends Compactor implements Iface { } @Override + protected void startCancelChecker(ScheduledThreadPoolExecutor schedExecutor, + long timeBetweenChecks) { + schedExecutor.scheduleWithFixedDelay(() -> checkIfCanceled(), 0, 5000, TimeUnit.MILLISECONDS); + } + + @Override protected Runnable createCompactionJob(TExternalCompactionJob job, LongAdder totalInputEntries, LongAdder totalInputBytes, CountDownLatch started, CountDownLatch stopped, AtomicReference<Throwable> err) { diff --git a/test/src/main/resources/log4j2-test.properties b/test/src/main/resources/log4j2-test.properties index 35891b8..b61cdc1 100644 --- a/test/src/main/resources/log4j2-test.properties +++ b/test/src/main/resources/log4j2-test.properties @@ -143,6 +143,9 @@ logger.35.level = info 
logger.36.name = org.apache.thrift.transport.TIOStreamTransport logger.36.level = error +logger.37.name = org.eclipse.jetty +logger.37.level = warn + rootLogger.level = debug rootLogger.appenderRef.console.ref = STDOUT