This is an automated email from the ASF dual-hosted git repository.

ddanielr pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/elasticity by this push:
     new 5edcccc277 Enables CompactionExecutorIT tests (#4080)
5edcccc277 is described below

commit 5edcccc277a3ad2773551860d7f38b3470bbefb1
Author: Daniel Roberts <ddani...@gmail.com>
AuthorDate: Mon Dec 18 22:26:42 2023 -0500

    Enables CompactionExecutorIT tests (#4080)
    
    * Adds additional method to stop compactors by resource group names.
    * Remove failing compaction selector test
    
    ---------
    
    Co-authored-by: Keith Turner <ktur...@apache.org>
---
 .../MiniAccumuloClusterControl.java                |  20 +++
 .../test/compaction/CompactionExecutorIT.java      | 165 ++++++++++++++-------
 2 files changed, 134 insertions(+), 51 deletions(-)

diff --git a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java
index 6d3b36026e..b3977dab00 100644
--- a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java
+++ b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java
@@ -254,6 +254,26 @@ public class MiniAccumuloClusterControl implements ClusterControl {
     stop(server, null);
   }
 
+  public void stopCompactorGroup(String compactorResourceGroup) {
+    synchronized (compactorProcesses) {
+      var group = compactorProcesses.get(compactorResourceGroup);
+      if (group == null) {
+        return;
+      }
+      group.forEach(process -> {
+        try {
+          cluster.stopProcessWithTimeout(process, 30, TimeUnit.SECONDS);
+        } catch (ExecutionException | TimeoutException e) {
+          log.warn("Compactor did not fully stop after 30 seconds", e);
+          throw new RuntimeException(e);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+        }
+      });
+      compactorProcesses.remove(compactorResourceGroup);
+    }
+  }
+
   @Override
   public synchronized void stop(ServerType server, String hostname) throws 
IOException {
     switch (server) {
diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/CompactionExecutorIT.java b/test/src/main/java/org/apache/accumulo/test/compaction/CompactionExecutorIT.java
index a7309cb693..19f6d0bb30 100644
--- a/test/src/main/java/org/apache/accumulo/test/compaction/CompactionExecutorIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/compaction/CompactionExecutorIT.java
@@ -18,6 +18,7 @@
  */
 package org.apache.accumulo.test.compaction;
 
+import static org.apache.accumulo.core.util.LazySingletons.GSON;
 import static org.apache.accumulo.core.util.LazySingletons.RANDOM;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -29,7 +30,7 @@ import java.io.UncheckedIOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.EnumSet;
-import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -38,6 +39,7 @@ import java.util.SortedSet;
 import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import org.apache.accumulo.core.client.Accumulo;
 import org.apache.accumulo.core.client.AccumuloClient;
@@ -67,36 +69,50 @@ import org.apache.accumulo.core.spi.compaction.CompactionExecutorId;
 import org.apache.accumulo.core.spi.compaction.CompactionKind;
 import org.apache.accumulo.core.spi.compaction.CompactionPlan;
 import org.apache.accumulo.core.spi.compaction.CompactionPlanner;
+import org.apache.accumulo.harness.MiniClusterConfigurationCallback;
 import org.apache.accumulo.harness.SharedMiniClusterBase;
+import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.RawLocalFileSystem;
 import org.apache.hadoop.io.Text;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
 
-@Disabled // ELASTICITY_TODO
 public class CompactionExecutorIT extends SharedMiniClusterBase {
+  public static final List<String> compactionGroups = new LinkedList<>();
+  public static final Logger log = 
LoggerFactory.getLogger(CompactionExecutorIT.class);
 
   public static class TestPlanner implements CompactionPlanner {
 
+    private static class ExecutorConfig {
+      String group;
+    }
+
     private int filesPerCompaction;
     private List<CompactionExecutorId> executorIds;
     private EnumSet<CompactionKind> kindsToProcess = 
EnumSet.noneOf(CompactionKind.class);
 
     @Override
     public void init(InitParameters params) {
-      var executors = Integer.parseInt(params.getOptions().get("executors"));
+      var executors = params.getOptions().get("executors");
       this.filesPerCompaction = 
Integer.parseInt(params.getOptions().get("filesPerCompaction"));
       this.executorIds = new ArrayList<>();
       for (String kind : params.getOptions().get("process").split(",")) {
         kindsToProcess.add(CompactionKind.valueOf(kind.toUpperCase()));
       }
 
-      for (int i = 0; i < executors; i++) {
-        var ceid = params.getExecutorManager().createExecutor("e" + i, 2);
+      for (JsonElement element : GSON.get().fromJson(executors, 
JsonArray.class)) {
+        ExecutorConfig executorConfig = GSON.get().fromJson(element, 
ExecutorConfig.class);
+        var ceid = 
params.getExecutorManager().getExternalExecutor(executorConfig.group);
         executorIds.add(ceid);
       }
 
@@ -137,40 +153,57 @@ public class CompactionExecutorIT extends SharedMiniClusterBase {
     }
   }
 
-  @BeforeAll
-  public static void setup() throws Exception {
-    SharedMiniClusterBase.startMiniClusterWithConfig((miniCfg, coreSite) -> {
-      Map<String,String> siteCfg = new HashMap<>();
-
+  public static class CompactionExecutorITConfig implements 
MiniClusterConfigurationCallback {
+    @Override
+    public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration 
conf) {
       var csp = Property.COMPACTION_SERVICE_PREFIX.getKey();
-      siteCfg.put(csp + "cs1.planner", TestPlanner.class.getName());
-      siteCfg.put(csp + "cs1.planner.opts.executors", "3");
-      siteCfg.put(csp + "cs1.planner.opts.filesPerCompaction", "5");
-      siteCfg.put(csp + "cs1.planner.opts.process", "SYSTEM");
-
-      siteCfg.put(csp + "cs2.planner", TestPlanner.class.getName());
-      siteCfg.put(csp + "cs2.planner.opts.executors", "2");
-      siteCfg.put(csp + "cs2.planner.opts.filesPerCompaction", "7");
-      siteCfg.put(csp + "cs2.planner.opts.process", "SYSTEM");
-
-      siteCfg.put(csp + "cs3.planner", TestPlanner.class.getName());
-      siteCfg.put(csp + "cs3.planner.opts.executors", "1");
-      siteCfg.put(csp + "cs3.planner.opts.filesPerCompaction", "3");
-      siteCfg.put(csp + "cs3.planner.opts.process", "USER");
-
-      siteCfg.put(csp + "cs4.planner", TestPlanner.class.getName());
-      siteCfg.put(csp + "cs4.planner.opts.executors", "2");
-      siteCfg.put(csp + "cs4.planner.opts.filesPerCompaction", "11");
-      siteCfg.put(csp + "cs4.planner.opts.process", "USER");
+      cfg.setProperty(csp + "cs1.planner", TestPlanner.class.getName());
+      cfg.setProperty(csp + "cs1.planner.opts.executors",
+          "[{'name':'one', 'type':'external', 'group':'e1'},"
+              + "{'name':'two', 'type':'external', 'group':'e2'}, 
{'name':'three', 'type':'external','group':'e3'}]");
+      cfg.setProperty(csp + "cs1.planner.opts.filesPerCompaction", "5");
+      cfg.setProperty(csp + "cs1.planner.opts.process", "SYSTEM");
+
+      cfg.setProperty(csp + "cs2.planner", TestPlanner.class.getName());
+      cfg.setProperty(csp + "cs2.planner.opts.executors",
+          "[{'name':'one', 'type':'external', 'group':'f1'},"
+              + "{'name':'two', 'type':'external', 'group':'f2'}]");
+      cfg.setProperty(csp + "cs2.planner.opts.filesPerCompaction", "7");
+      cfg.setProperty(csp + "cs2.planner.opts.process", "SYSTEM");
+
+      cfg.setProperty(csp + "cs3.planner", TestPlanner.class.getName());
+      cfg.setProperty(csp + "cs3.planner.opts.executors",
+          "[{'name':'one', 'type':'external', 'group':'g1'}]");
+      cfg.setProperty(csp + "cs3.planner.opts.filesPerCompaction", "3");
+      cfg.setProperty(csp + "cs3.planner.opts.process", "USER");
+
+      cfg.setProperty(csp + "cs4.planner", TestPlanner.class.getName());
+      cfg.setProperty(csp + "cs4.planner.opts.executors",
+          "[{'name':'one', 'type':'external', 'group':'h1'},"
+              + "{'name':'two', 'type':'external', 'group':'h2'}]");
+      cfg.setProperty(csp + "cs4.planner.opts.filesPerCompaction", "11");
+      cfg.setProperty(csp + "cs4.planner.opts.process", "USER");
 
       // this is meant to be dynamically reconfigured
-      siteCfg.put(csp + "recfg.planner", TestPlanner.class.getName());
-      siteCfg.put(csp + "recfg.planner.opts.executors", "2");
-      siteCfg.put(csp + "recfg.planner.opts.filesPerCompaction", "11");
-      siteCfg.put(csp + "recfg.planner.opts.process", "SYSTEM");
+      cfg.setProperty(csp + "recfg.planner", TestPlanner.class.getName());
+      cfg.setProperty(csp + "recfg.planner.opts.executors",
+          "[{'name':'one', 'type':'external', 'group':'i1'},"
+              + "{'name':'two', 'type':'external', 'group':'i2'}]");
+      cfg.setProperty(csp + "recfg.planner.opts.filesPerCompaction", "11");
+      cfg.setProperty(csp + "recfg.planner.opts.process", "SYSTEM");
+
+      Stream.of("e1", "e2", "e3", "f1", "f2", "g1", "h1", "h2", "i1", "i2")
+          .forEach(s -> 
cfg.getClusterServerConfiguration().addCompactorResourceGroup(s, 0));
+
+      // use raw local file system
+      conf.set("fs.file.impl", RawLocalFileSystem.class.getName());
+    }
+  }
 
-      miniCfg.setSiteConfig(siteCfg);
-    });
+  @BeforeAll
+  public static void setup() throws Exception {
+    SharedMiniClusterBase
+        .startMiniClusterWithConfig(new 
CompactionExecutorIT.CompactionExecutorITConfig());
   }
 
   @AfterAll
@@ -180,6 +213,18 @@ public class CompactionExecutorIT extends SharedMiniClusterBase {
 
   @AfterEach
   public void cleanup() {
+    var iter = compactionGroups.iterator();
+    while (iter.hasNext()) {
+      var group = iter.next();
+      
getCluster().getConfig().getClusterServerConfiguration().addCompactorResourceGroup(group,
 0);
+      try {
+        getCluster().getClusterControl().stopCompactorGroup(group);
+      } catch (Exception e) {
+        log.warn("Compaction group: {} failed to fully stop", group, e);
+      } finally {
+        iter.remove();
+      }
+    }
     try (AccumuloClient client = 
Accumulo.newClient().from(getClientProps()).build()) {
       client.tableOperations().list().stream()
           .filter(
@@ -196,6 +241,12 @@ public class CompactionExecutorIT extends SharedMiniClusterBase {
 
   @Test
   public void testReconfigureCompactionService() throws Exception {
+    Stream.of("i1", "i2").forEach(g -> {
+      
getCluster().getConfig().getClusterServerConfiguration().addCompactorResourceGroup(g,
 1);
+      compactionGroups.add(g);
+    });
+    getCluster().getClusterControl().start(ServerType.COMPACTOR);
+
     try (AccumuloClient client = 
Accumulo.newClient().from(getClientProps()).build()) {
       createTable(client, "rctt", "recfg");
 
@@ -211,7 +262,12 @@ public class CompactionExecutorIT extends SharedMiniClusterBase {
           Property.COMPACTION_SERVICE_PREFIX.getKey() + 
"recfg.planner.opts.filesPerCompaction",
           "5");
       client.instanceOperations().setProperty(
-          Property.COMPACTION_SERVICE_PREFIX.getKey() + 
"recfg.planner.opts.executors", "1");
+          Property.COMPACTION_SERVICE_PREFIX.getKey() + 
"recfg.planner.opts.executors",
+          "[{'name':'small', 'type':'external', 'group':'group1'}]");
+      
getCluster().getConfig().getClusterServerConfiguration().addCompactorResourceGroup("group1",
+          1);
+      compactionGroups.add("group1");
+      getCluster().getClusterControl().start(ServerType.COMPACTOR);
 
       addFiles(client, "rctt", 10);
 
@@ -232,11 +288,19 @@ public class CompactionExecutorIT extends SharedMiniClusterBase {
       client.instanceOperations().setProperty(
           Property.COMPACTION_SERVICE_PREFIX.getKey() + 
"newcs.planner.opts.process", "SYSTEM");
       client.instanceOperations().setProperty(
-          Property.COMPACTION_SERVICE_PREFIX.getKey() + 
"newcs.planner.opts.executors", "3");
+          Property.COMPACTION_SERVICE_PREFIX.getKey() + 
"newcs.planner.opts.executors",
+          "[{'name':'one', 'type':'external', 'group':'add1'},"
+              + "{'name':'two', 'type':'external', 'group':'add2'}, 
{'name':'three', 'type':'external','group':'add3'}]");
       client.instanceOperations().setProperty(
           Property.COMPACTION_SERVICE_PREFIX.getKey() + "newcs.planner",
           TestPlanner.class.getName());
 
+      Stream.of("add1", "add2", "add3").forEach(s -> {
+        
getCluster().getConfig().getClusterServerConfiguration().addCompactorResourceGroup(s,
 1);
+        compactionGroups.add(s);
+      });
+      getCluster().getClusterControl().start(ServerType.COMPACTOR);
+
       createTable(client, "acst", "newcs");
 
       addFiles(client, "acst", 42);
@@ -256,6 +320,12 @@ public class CompactionExecutorIT extends SharedMiniClusterBase {
    */
   @Test
   public void testDispatchSystem() throws Exception {
+    Stream.of("e1", "e2", "e3", "f1", "f2").forEach(s -> {
+      
getCluster().getConfig().getClusterServerConfiguration().addCompactorResourceGroup(s,
 1);
+      compactionGroups.add(s);
+    });
+    getCluster().getClusterControl().start(ServerType.COMPACTOR);
+
     try (AccumuloClient client = 
Accumulo.newClient().from(getClientProps()).build()) {
       createTable(client, "dst1", "cs1");
       createTable(client, "dst2", "cs2");
@@ -280,6 +350,12 @@ public class CompactionExecutorIT extends SharedMiniClusterBase {
 
   @Test
   public void testDispatchUser() throws Exception {
+    Stream.of("h1", "h2", "g1").forEach(s -> {
+      
getCluster().getConfig().getClusterServerConfiguration().addCompactorResourceGroup(s,
 1);
+      compactionGroups.add(s);
+    });
+    getCluster().getClusterControl().start(ServerType.COMPACTOR);
+
     try (AccumuloClient client = 
Accumulo.newClient().from(getClientProps()).build()) {
       createTable(client, "dut1", "cs3");
       createTable(client, "dut2", "cs3", "special", "cs4");
@@ -327,14 +403,8 @@ public class CompactionExecutorIT extends SharedMiniClusterBase {
   @Test
   public void testTooManyDeletes() throws Exception {
     try (AccumuloClient client = 
Accumulo.newClient().from(getClientProps()).build()) {
-      Map<String,
-          String> props = Map.of(Property.TABLE_COMPACTION_SELECTOR.getKey(),
-              TooManyDeletesSelector.class.getName(),
-              Property.TABLE_COMPACTION_SELECTOR_OPTS.getKey() + "threshold", 
".4");
       var deleteSummarizerCfg =
           
SummarizerConfiguration.builder(DeletesSummarizer.class.getName()).build();
-      client.tableOperations().create("tmd_selector", new 
NewTableConfiguration()
-          .setProperties(props).enableSummarization(deleteSummarizerCfg));
       client.tableOperations().create("tmd_control1",
           new 
NewTableConfiguration().enableSummarization(deleteSummarizerCfg));
       client.tableOperations().create("tmd_control2",
@@ -342,9 +412,6 @@ public class CompactionExecutorIT extends SharedMiniClusterBase {
       client.tableOperations().create("tmd_control3",
           new 
NewTableConfiguration().enableSummarization(deleteSummarizerCfg));
 
-      addFile(client, "tmd_selector", 1, 1000, false);
-      addFile(client, "tmd_selector", 1, 1000, true);
-
       addFile(client, "tmd_control1", 1, 1000, false);
       addFile(client, "tmd_control1", 1, 1000, true);
 
@@ -358,10 +425,6 @@ public class CompactionExecutorIT extends SharedMiniClusterBase {
       assertEquals(2, getFiles(client, "tmd_control2").size());
       assertEquals(2, getFiles(client, "tmd_control3").size());
 
-      while (getFiles(client, "tmd_selector").size() != 0) {
-        Thread.sleep(100);
-      }
-
       assertEquals(2, getFiles(client, "tmd_control1").size());
       assertEquals(2, getFiles(client, "tmd_control2").size());
       assertEquals(2, getFiles(client, "tmd_control3").size());

Reply via email to