This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch 1451-external-compactions-feature
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/1451-external-compactions-feature by this push:
     new 10a295d  closes #2059 - add IT for coordinator restart during compaction
10a295d is described below

commit 10a295d6f8be22cdf5d015bfab58fd6672594393
Author: Dave Marion <dlmar...@apache.org>
AuthorDate: Mon May 3 16:53:50 2021 +0000

    closes #2059 - add IT for coordinator restart during compaction
---
 .../apache/accumulo/test/ExternalCompactionIT.java | 55 ++++++++++++++++++++++
 1 file changed, 55 insertions(+)

diff --git a/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java b/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java
index df34c25..9ab5f55 100644
--- a/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java
@@ -263,6 +263,61 @@ public class ExternalCompactionIT extends ConfigurableMacBase {
   }
 
   @Test
+  public void testCoordinatorRestartsDuringCompaction() throws Exception {
+    try (AccumuloClient client = Accumulo.newClient().from(getClientProperties()).build()) {
+      String table1 = "ectt9";
+      createTable(client, table1, "cs1", 2);
+      writeData(client, table1);
+      cluster.exec(ExternalDoNothingCompactor.class, "-q", "DCQ1");
+      ProcessInfo process = cluster.exec(CompactionCoordinator.class);
+      compact(client, table1, 2, "DCQ1", false);
+      TableId tid = Tables.getTableId(getCluster().getServerContext(), table1);
+      // Wait for the compaction to start by waiting for 1 external compaction column
+      Set<ExternalCompactionId> ecids = new HashSet<>();
+      do {
+        UtilWaitThread.sleep(50);
+        try (TabletsMetadata tm = getCluster().getServerContext().getAmple().readTablets()
+            .forTable(tid).fetch(ColumnType.ECOMP).build()) {
+          tm.stream().flatMap(t -> t.getExternalCompactions().keySet().stream())
+              .forEach(ecids::add);
+        }
+      } while (ecids.isEmpty());
+
+      // Stop the Coordinator
+      Process coord = process.getProcess();
+      if (coord.supportsNormalTermination()) {
+        cluster.stopProcessWithTimeout(coord, 60, TimeUnit.SECONDS);
+      } else {
+        LOG.info("Stopping tserver manually");
+        new ProcessBuilder("kill", Long.toString(coord.pid())).start();
+        coord.waitFor();
+      }
+
+      // Start the TestCompactionCoordinator so that we have
+      // access to the metrics.
+      cluster.exec(TestCompactionCoordinator.class);
+
+      // Wait for coordinator to start
+      ExternalCompactionMetrics metrics = null;
+      while (null == metrics) {
+        try {
+          metrics = getCoordinatorMetrics();
+        } catch (Exception e) {
+          UtilWaitThread.sleep(250);
+        }
+      }
+
+      // wait for failure or test timeout
+      metrics = getCoordinatorMetrics();
+      while (metrics.getRunning() == 0) {
+        UtilWaitThread.sleep(250);
+        metrics = getCoordinatorMetrics();
+      }
+
+    }
+  }
+
+  @Test
   public void testCompactionAndCompactorDies() throws Exception {
     try (AccumuloClient client = Accumulo.newClient().from(getClientProperties()).build()) {
       // Stop the TabletServer so that it does not commit the compaction and remove