This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch 1451-external-compactions-feature
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/1451-external-compactions-feature by this push:
     new 10a295d  closes #2059 - add IT for coordinator restart during compaction
10a295d is described below

commit 10a295d6f8be22cdf5d015bfab58fd6672594393
Author: Dave Marion <dlmar...@apache.org>
AuthorDate: Mon May 3 16:53:50 2021 +0000

    closes #2059 - add IT for coordinator restart during compaction
---
 .../apache/accumulo/test/ExternalCompactionIT.java | 55 ++++++++++++++++++++++
 1 file changed, 55 insertions(+)

diff --git a/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java b/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java
index df34c25..9ab5f55 100644
--- a/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java
@@ -263,6 +263,61 @@ public class ExternalCompactionIT extends ConfigurableMacBase {
   }
 
   @Test
+  public void testCoordinatorRestartsDuringCompaction() throws Exception {
+    // Starts an external compaction, stops the CompactionCoordinator process while the
+    // compaction is recorded in the tablet metadata, then starts a
+    // TestCompactionCoordinator and waits until its metrics report a non-zero running
+    // count. None of the polling loops below has its own deadline; a hang is only caught
+    // by the test timeout.
+    try (AccumuloClient client = Accumulo.newClient().from(getClientProperties()).build()) {
+      String table1 = "ectt9";
+      createTable(client, table1, "cs1", 2);
+      writeData(client, table1);
+      cluster.exec(ExternalDoNothingCompactor.class, "-q", "DCQ1");
+      // Keep the handle of the coordinator process so it can be stopped below.
+      ProcessInfo process = cluster.exec(CompactionCoordinator.class);
+      compact(client, table1, 2, "DCQ1", false);
+      TableId tid = Tables.getTableId(getCluster().getServerContext(), table1);
+      // Wait for the compaction to start by waiting for 1 external compaction column
+      Set<ExternalCompactionId> ecids = new HashSet<>();
+      do {
+        UtilWaitThread.sleep(50);
+        try (TabletsMetadata tm = getCluster().getServerContext().getAmple().readTablets()
+            .forTable(tid).fetch(ColumnType.ECOMP).build()) {
+          tm.stream().flatMap(t -> t.getExternalCompactions().keySet().stream())
+              .forEach(ecids::add);
+        }
+      } while (ecids.isEmpty());
+
+      // Stop the Coordinator
+      Process coord = process.getProcess();
+      if (coord.supportsNormalTermination()) {
+        cluster.stopProcessWithTimeout(coord, 60, TimeUnit.SECONDS);
+      } else {
+        // Normal termination is not supported for this process, so signal it by pid and
+        // block until it has exited.
+        LOG.info("Stopping coordinator manually");
+        new ProcessBuilder("kill", Long.toString(coord.pid())).start();
+        coord.waitFor();
+      }
+
+      // Start the TestCompactionCoordinator so that we have
+      // access to the metrics.
+      cluster.exec(TestCompactionCoordinator.class);
+
+      // Wait for coordinator to start
+      ExternalCompactionMetrics metrics = null;
+      while (null == metrics) {
+        try {
+          metrics = getCoordinatorMetrics();
+        } catch (Exception ignored) {
+          // Presumably the new coordinator is not serving metrics yet; retry after a
+          // short pause.
+          UtilWaitThread.sleep(250);
+        }
+      }
+
+      // Wait until the restarted coordinator reports a non-zero running count, or until the
+      // test times out. The metrics fetched above are checked first, so no extra fetch is
+      // needed before entering the loop.
+      while (metrics.getRunning() == 0) {
+        UtilWaitThread.sleep(250);
+        metrics = getCoordinatorMetrics();
+      }
+    }
+  }
+
+  @Test
   public void testCompactionAndCompactorDies() throws Exception {
     try (AccumuloClient client = 
Accumulo.newClient().from(getClientProperties()).build()) {
       // Stop the TabletServer so that it does not commit the compaction and 
remove

Reply via email to