This is an automated email from the ASF dual-hosted git repository. ddanielr pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push: new 5edcccc277 Enables CompactionExecutorIT tests (#4080) 5edcccc277 is described below commit 5edcccc277a3ad2773551860d7f38b3470bbefb1 Author: Daniel Roberts <ddani...@gmail.com> AuthorDate: Mon Dec 18 22:26:42 2023 -0500 Enables CompactionExecutorIT tests (#4080) * Adds additional method to stop compactors by resource group names. * Remove failing compaction selector test --------- Co-authored-by: Keith Turner <ktur...@apache.org> --- .../MiniAccumuloClusterControl.java | 20 +++ .../test/compaction/CompactionExecutorIT.java | 165 ++++++++++++++------- 2 files changed, 134 insertions(+), 51 deletions(-) diff --git a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java index 6d3b36026e..b3977dab00 100644 --- a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java +++ b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java @@ -254,6 +254,26 @@ public class MiniAccumuloClusterControl implements ClusterControl { stop(server, null); } + public void stopCompactorGroup(String compactorResourceGroup) { + synchronized (compactorProcesses) { + var group = compactorProcesses.get(compactorResourceGroup); + if (group == null) { + return; + } + group.forEach(process -> { + try { + cluster.stopProcessWithTimeout(process, 30, TimeUnit.SECONDS); + } catch (ExecutionException | TimeoutException e) { + log.warn("Compactor did not fully stop after 30 seconds", e); + throw new RuntimeException(e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }); + compactorProcesses.remove(compactorResourceGroup); + } + } + @Override public synchronized void stop(ServerType server, String hostname) throws IOException { switch (server) { diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/CompactionExecutorIT.java b/test/src/main/java/org/apache/accumulo/test/compaction/CompactionExecutorIT.java index a7309cb693..19f6d0bb30 100644 --- a/test/src/main/java/org/apache/accumulo/test/compaction/CompactionExecutorIT.java +++ b/test/src/main/java/org/apache/accumulo/test/compaction/CompactionExecutorIT.java @@ -18,6 +18,7 @@ */ package org.apache.accumulo.test.compaction; +import static org.apache.accumulo.core.util.LazySingletons.GSON; import static org.apache.accumulo.core.util.LazySingletons.RANDOM; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -29,7 +30,7 @@ import java.io.UncheckedIOException; import java.util.ArrayList; import java.util.Arrays; import java.util.EnumSet; -import java.util.HashMap; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -38,6 +39,7 @@ import java.util.SortedSet; import java.util.TreeMap; import java.util.TreeSet; import java.util.stream.Collectors; +import java.util.stream.Stream; import org.apache.accumulo.core.client.Accumulo; import org.apache.accumulo.core.client.AccumuloClient; @@ -67,36 +69,50 @@ import org.apache.accumulo.core.spi.compaction.CompactionExecutorId; import org.apache.accumulo.core.spi.compaction.CompactionKind; import org.apache.accumulo.core.spi.compaction.CompactionPlan; import org.apache.accumulo.core.spi.compaction.CompactionPlanner; +import org.apache.accumulo.harness.MiniClusterConfigurationCallback; import org.apache.accumulo.harness.SharedMiniClusterBase; +import org.apache.accumulo.minicluster.ServerType; +import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.RawLocalFileSystem; import org.apache.hadoop.io.Text; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.gson.JsonArray; +import com.google.gson.JsonElement; -@Disabled // ELASTICITY_TODO public class CompactionExecutorIT extends SharedMiniClusterBase { + public static final List<String> compactionGroups = new LinkedList<>(); + public static final Logger log = LoggerFactory.getLogger(CompactionExecutorIT.class); public static class TestPlanner implements CompactionPlanner { + private static class ExecutorConfig { + String group; + } + private int filesPerCompaction; private List<CompactionExecutorId> executorIds; private EnumSet<CompactionKind> kindsToProcess = EnumSet.noneOf(CompactionKind.class); @Override public void init(InitParameters params) { - var executors = Integer.parseInt(params.getOptions().get("executors")); + var executors = params.getOptions().get("executors"); this.filesPerCompaction = Integer.parseInt(params.getOptions().get("filesPerCompaction")); this.executorIds = new ArrayList<>(); for (String kind : params.getOptions().get("process").split(",")) { kindsToProcess.add(CompactionKind.valueOf(kind.toUpperCase())); } - for (int i = 0; i < executors; i++) { - var ceid = params.getExecutorManager().createExecutor("e" + i, 2); + for (JsonElement element : GSON.get().fromJson(executors, JsonArray.class)) { + ExecutorConfig executorConfig = GSON.get().fromJson(element, ExecutorConfig.class); + var ceid = params.getExecutorManager().getExternalExecutor(executorConfig.group); executorIds.add(ceid); } @@ -137,40 +153,57 @@ public class CompactionExecutorIT extends SharedMiniClusterBase { } } - @BeforeAll - public static void setup() throws Exception { - SharedMiniClusterBase.startMiniClusterWithConfig((miniCfg, coreSite) -> { - Map<String,String> siteCfg = new HashMap<>(); - + public static class CompactionExecutorITConfig implements MiniClusterConfigurationCallback { + @Override + public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration conf) { var csp = Property.COMPACTION_SERVICE_PREFIX.getKey(); - siteCfg.put(csp + "cs1.planner", TestPlanner.class.getName()); - siteCfg.put(csp + "cs1.planner.opts.executors", "3"); - siteCfg.put(csp + "cs1.planner.opts.filesPerCompaction", "5"); - siteCfg.put(csp + "cs1.planner.opts.process", "SYSTEM"); - - siteCfg.put(csp + "cs2.planner", TestPlanner.class.getName()); - siteCfg.put(csp + "cs2.planner.opts.executors", "2"); - siteCfg.put(csp + "cs2.planner.opts.filesPerCompaction", "7"); - siteCfg.put(csp + "cs2.planner.opts.process", "SYSTEM"); - - siteCfg.put(csp + "cs3.planner", TestPlanner.class.getName()); - siteCfg.put(csp + "cs3.planner.opts.executors", "1"); - siteCfg.put(csp + "cs3.planner.opts.filesPerCompaction", "3"); - siteCfg.put(csp + "cs3.planner.opts.process", "USER"); - - siteCfg.put(csp + "cs4.planner", TestPlanner.class.getName()); - siteCfg.put(csp + "cs4.planner.opts.executors", "2"); - siteCfg.put(csp + "cs4.planner.opts.filesPerCompaction", "11"); - siteCfg.put(csp + "cs4.planner.opts.process", "USER"); + cfg.setProperty(csp + "cs1.planner", TestPlanner.class.getName()); + cfg.setProperty(csp + "cs1.planner.opts.executors", + "[{'name':'one', 'type':'external', 'group':'e1'}," + + "{'name':'two', 'type':'external', 'group':'e2'}, {'name':'three', 'type':'external','group':'e3'}]"); + cfg.setProperty(csp + "cs1.planner.opts.filesPerCompaction", "5"); + cfg.setProperty(csp + "cs1.planner.opts.process", "SYSTEM"); + + cfg.setProperty(csp + "cs2.planner", TestPlanner.class.getName()); + cfg.setProperty(csp + "cs2.planner.opts.executors", + "[{'name':'one', 'type':'external', 'group':'f1'}," + + "{'name':'two', 'type':'external', 'group':'f2'}]"); + cfg.setProperty(csp + "cs2.planner.opts.filesPerCompaction", "7"); + cfg.setProperty(csp + "cs2.planner.opts.process", "SYSTEM"); + + cfg.setProperty(csp + "cs3.planner", TestPlanner.class.getName()); + cfg.setProperty(csp + "cs3.planner.opts.executors", + "[{'name':'one', 'type':'external', 'group':'g1'}]"); + cfg.setProperty(csp + "cs3.planner.opts.filesPerCompaction", "3"); + cfg.setProperty(csp + "cs3.planner.opts.process", "USER"); + + cfg.setProperty(csp + "cs4.planner", TestPlanner.class.getName()); + cfg.setProperty(csp + "cs4.planner.opts.executors", + "[{'name':'one', 'type':'external', 'group':'h1'}," + + "{'name':'two', 'type':'external', 'group':'h2'}]"); + cfg.setProperty(csp + "cs4.planner.opts.filesPerCompaction", "11"); + cfg.setProperty(csp + "cs4.planner.opts.process", "USER"); // this is meant to be dynamically reconfigured - siteCfg.put(csp + "recfg.planner", TestPlanner.class.getName()); - siteCfg.put(csp + "recfg.planner.opts.executors", "2"); - siteCfg.put(csp + "recfg.planner.opts.filesPerCompaction", "11"); - siteCfg.put(csp + "recfg.planner.opts.process", "SYSTEM"); + cfg.setProperty(csp + "recfg.planner", TestPlanner.class.getName()); + cfg.setProperty(csp + "recfg.planner.opts.executors", + "[{'name':'one', 'type':'external', 'group':'i1'}," + + "{'name':'two', 'type':'external', 'group':'i2'}]"); + cfg.setProperty(csp + "recfg.planner.opts.filesPerCompaction", "11"); + cfg.setProperty(csp + "recfg.planner.opts.process", "SYSTEM"); + + Stream.of("e1", "e2", "e3", "f1", "f2", "g1", "h1", "h2", "i1", "i2") + .forEach(s -> cfg.getClusterServerConfiguration().addCompactorResourceGroup(s, 0)); + + // use raw local file system + conf.set("fs.file.impl", RawLocalFileSystem.class.getName()); + } + } - miniCfg.setSiteConfig(siteCfg); - }); + @BeforeAll + public static void setup() throws Exception { + SharedMiniClusterBase + .startMiniClusterWithConfig(new CompactionExecutorIT.CompactionExecutorITConfig()); } @AfterAll @@ -180,6 +213,18 @@ public class CompactionExecutorIT extends SharedMiniClusterBase { @AfterEach public void cleanup() { + var iter = compactionGroups.iterator(); + while (iter.hasNext()) { + var group = iter.next(); + getCluster().getConfig().getClusterServerConfiguration().addCompactorResourceGroup(group, 0); + try { + getCluster().getClusterControl().stopCompactorGroup(group); + } catch (Exception e) { + log.warn("Compaction group: {} failed to fully stop", group, e); + } finally { + iter.remove(); + } + } try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { client.tableOperations().list().stream() .filter( @@ -196,6 +241,12 @@ public class CompactionExecutorIT extends SharedMiniClusterBase { @Test public void testReconfigureCompactionService() throws Exception { + Stream.of("i1", "i2").forEach(g -> { + getCluster().getConfig().getClusterServerConfiguration().addCompactorResourceGroup(g, 1); + compactionGroups.add(g); + }); + getCluster().getClusterControl().start(ServerType.COMPACTOR); + try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { createTable(client, "rctt", "recfg"); @@ -211,7 +262,12 @@ public class CompactionExecutorIT extends SharedMiniClusterBase { Property.COMPACTION_SERVICE_PREFIX.getKey() + "recfg.planner.opts.filesPerCompaction", "5"); client.instanceOperations().setProperty( - Property.COMPACTION_SERVICE_PREFIX.getKey() + "recfg.planner.opts.executors", "1"); + Property.COMPACTION_SERVICE_PREFIX.getKey() + "recfg.planner.opts.executors", + "[{'name':'small', 'type':'external', 'group':'group1'}]"); + getCluster().getConfig().getClusterServerConfiguration().addCompactorResourceGroup("group1", + 1); + compactionGroups.add("group1"); + getCluster().getClusterControl().start(ServerType.COMPACTOR); addFiles(client, "rctt", 10); @@ -232,11 +288,19 @@ public class CompactionExecutorIT extends SharedMiniClusterBase { client.instanceOperations().setProperty( Property.COMPACTION_SERVICE_PREFIX.getKey() + "newcs.planner.opts.process", "SYSTEM"); client.instanceOperations().setProperty( - Property.COMPACTION_SERVICE_PREFIX.getKey() + "newcs.planner.opts.executors", "3"); + Property.COMPACTION_SERVICE_PREFIX.getKey() + "newcs.planner.opts.executors", + "[{'name':'one', 'type':'external', 'group':'add1'}," + + "{'name':'two', 'type':'external', 'group':'add2'}, {'name':'three', 'type':'external','group':'add3'}]"); client.instanceOperations().setProperty( Property.COMPACTION_SERVICE_PREFIX.getKey() + "newcs.planner", TestPlanner.class.getName()); + Stream.of("add1", "add2", "add3").forEach(s -> { + getCluster().getConfig().getClusterServerConfiguration().addCompactorResourceGroup(s, 1); + compactionGroups.add(s); + }); + getCluster().getClusterControl().start(ServerType.COMPACTOR); + createTable(client, "acst", "newcs"); addFiles(client, "acst", 42); @@ -256,6 +320,12 @@ public class CompactionExecutorIT extends SharedMiniClusterBase { */ @Test public void testDispatchSystem() throws Exception { + Stream.of("e1", "e2", "e3", "f1", "f2").forEach(s -> { + getCluster().getConfig().getClusterServerConfiguration().addCompactorResourceGroup(s, 1); + compactionGroups.add(s); + }); + getCluster().getClusterControl().start(ServerType.COMPACTOR); + try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { createTable(client, "dst1", "cs1"); createTable(client, "dst2", "cs2"); @@ -280,6 +350,12 @@ public class CompactionExecutorIT extends SharedMiniClusterBase { @Test public void testDispatchUser() throws Exception { + Stream.of("h1", "h2", "g1").forEach(s -> { + getCluster().getConfig().getClusterServerConfiguration().addCompactorResourceGroup(s, 1); + compactionGroups.add(s); + }); + getCluster().getClusterControl().start(ServerType.COMPACTOR); + try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { createTable(client, "dut1", "cs3"); createTable(client, "dut2", "cs3", "special", "cs4"); @@ -327,14 +403,8 @@ public class CompactionExecutorIT extends SharedMiniClusterBase { @Test public void testTooManyDeletes() throws Exception { try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { - Map<String, - String> props = Map.of(Property.TABLE_COMPACTION_SELECTOR.getKey(), - TooManyDeletesSelector.class.getName(), - Property.TABLE_COMPACTION_SELECTOR_OPTS.getKey() + "threshold", ".4"); var deleteSummarizerCfg = SummarizerConfiguration.builder(DeletesSummarizer.class.getName()).build(); - client.tableOperations().create("tmd_selector", new NewTableConfiguration() - .setProperties(props).enableSummarization(deleteSummarizerCfg)); client.tableOperations().create("tmd_control1", new NewTableConfiguration().enableSummarization(deleteSummarizerCfg)); client.tableOperations().create("tmd_control2", @@ -342,9 +412,6 @@ public class CompactionExecutorIT extends SharedMiniClusterBase { client.tableOperations().create("tmd_control3", new NewTableConfiguration().enableSummarization(deleteSummarizerCfg)); - addFile(client, "tmd_selector", 1, 1000, false); - addFile(client, "tmd_selector", 1, 1000, true); - addFile(client, "tmd_control1", 1, 1000, false); addFile(client, "tmd_control1", 1, 1000, true); @@ -358,10 +425,6 @@ public class CompactionExecutorIT extends SharedMiniClusterBase { assertEquals(2, getFiles(client, "tmd_control2").size()); assertEquals(2, getFiles(client, "tmd_control3").size()); - while (getFiles(client, "tmd_selector").size() != 0) { - Thread.sleep(100); - } - assertEquals(2, getFiles(client, "tmd_control1").size()); assertEquals(2, getFiles(client, "tmd_control2").size()); assertEquals(2, getFiles(client, "tmd_control3").size());