This is an automated email from the ASF dual-hosted git repository. ddanielr pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit b8869fb72e2aa58736250c52f3366f196ac86c79 Merge: 829634653b fdd73fb8b2 Author: Daniel Roberts <ddani...@gmail.com> AuthorDate: Wed Dec 27 20:30:47 2023 +0000 Merge branch '2.1' .../spi/compaction/DefaultCompactionPlanner.java | 6 +- .../compaction/CompactionPlannerInitParams.java | 2 +- .../compaction/DefaultCompactionPlannerTest.java | 343 ++++++--------------- 3 files changed, 106 insertions(+), 245 deletions(-) diff --cc core/src/main/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlanner.java index b73544143e,9385806831..8f7969519a --- a/core/src/main/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlanner.java +++ b/core/src/main/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlanner.java @@@ -279,106 -210,105 +280,107 @@@ public class DefaultCompactionPlanner i determineMaxFilesToCompact(params); } - @SuppressWarnings("removal") ++ @SuppressWarnings("deprecation") private void determineMaxFilesToCompact(InitParameters params) { - String fqo = params.getFullyQualifiedOption("maxOpen"); - if (!params.getServiceEnvironment().getConfiguration().isSet(fqo) - && params.getServiceEnvironment().getConfiguration() - .isSet(Property.TSERV_MAJC_THREAD_MAXOPEN.getKey())) { - log.warn("The property " + Property.TSERV_MAJC_THREAD_MAXOPEN.getKey() - + " was set, it is deprecated. Set the " + fqo + " option instead."); - this.maxFilesToCompact = Integer.parseInt(params.getServiceEnvironment().getConfiguration() - .get(Property.TSERV_MAJC_THREAD_MAXOPEN.getKey())); - } else { - this.maxFilesToCompact = Integer.parseInt(params.getOptions().getOrDefault("maxOpen", - Property.TSERV_COMPACTION_SERVICE_DEFAULT_MAX_OPEN.getDefaultValue())); + + String maxOpen = params.getOptions().get("maxOpen"); + if (maxOpen == null) { - maxOpen = "10"; - log.trace("default maxOpen not set, defaulting to 10"); ++ maxOpen = Property.TSERV_COMPACTION_SERVICE_DEFAULT_MAX_OPEN.getDefaultValue(); ++ log.trace("default maxOpen not set, defaulting to {}", maxOpen); } + this.maxFilesToCompact = Integer.parseInt(maxOpen); } - @Override - public CompactionPlan makePlan(PlanningParameters params) { - try { + private void validateConfig(JsonElement json, List<String> fields, String className) { - if (params.getCandidates().isEmpty()) { - return params.createPlanBuilder().build(); - } + JsonObject jsonObject = GSON.get().fromJson(json, JsonObject.class); + + List<String> objectProperties = new ArrayList<>(jsonObject.keySet()); + HashSet<String> classFieldNames = new HashSet<>(fields); + + if (!classFieldNames.containsAll(objectProperties)) { + objectProperties.removeAll(classFieldNames); + throw new JsonParseException( + "Invalid fields: " + objectProperties + " provided for class: " + className); + } + } - Set<CompactableFile> filesCopy = new HashSet<>(params.getCandidates()); + @Override + public CompactionPlan makePlan(PlanningParameters params) { + if (params.getCandidates().isEmpty()) { + return params.createPlanBuilder().build(); + } - long maxSizeToCompact = getMaxSizeToCompact(params.getKind()); + Set<CompactableFile> filesCopy = new HashSet<>(params.getCandidates()); - Collection<CompactableFile> group; - if (params.getRunningCompactions().isEmpty()) { - group = findDataFilesToCompact(filesCopy, params.getRatio(), maxFilesToCompact, - maxSizeToCompact); + long maxSizeToCompact = getMaxSizeToCompact(params.getKind()); - if (!group.isEmpty() && group.size() < params.getCandidates().size() - && params.getCandidates().size() <= maxFilesToCompact - && (params.getKind() == CompactionKind.USER - || params.getKind() == CompactionKind.SELECTOR)) { - // USER and SELECTOR compactions must eventually compact all files. When a subset of files - // that meets the compaction ratio is selected, look ahead and see if the next compaction - // would also meet the compaction ratio. If not then compact everything to avoid doing - // more than logarithmic work across multiple comapctions. + Collection<CompactableFile> group; + if (params.getRunningCompactions().isEmpty()) { + group = + findDataFilesToCompact(filesCopy, params.getRatio(), maxFilesToCompact, maxSizeToCompact); - filesCopy.removeAll(group); - filesCopy.add(getExpected(group, 0)); + if (!group.isEmpty() && group.size() < params.getCandidates().size() + && params.getCandidates().size() <= maxFilesToCompact + && (params.getKind() == CompactionKind.USER + || params.getKind() == CompactionKind.SELECTOR)) { + // USER and SELECTOR compactions must eventually compact all files. When a subset of files + // that meets the compaction ratio is selected, look ahead and see if the next compaction + // would also meet the compaction ratio. If not then compact everything to avoid doing + // more than logarithmic work across multiple comapctions. - if (findDataFilesToCompact(filesCopy, params.getRatio(), maxFilesToCompact, - maxSizeToCompact).isEmpty()) { - // The next possible compaction does not meet the compaction ratio, so compact - // everything. - group = Set.copyOf(params.getCandidates()); - } + filesCopy.removeAll(group); + filesCopy.add(getExpected(group, 0)); + if (findDataFilesToCompact(filesCopy, params.getRatio(), maxFilesToCompact, + maxSizeToCompact).isEmpty()) { + // The next possible compaction does not meet the compaction ratio, so compact + // everything. + group = Set.copyOf(params.getCandidates()); } - } else if (params.getKind() == CompactionKind.SYSTEM) { - // This code determines if once the files compacting finish would they be included in a - // compaction with the files smaller than them? If so, then wait for the running compaction - // to complete. + } - // The set of files running compactions may produce - var expectedFiles = getExpected(params.getRunningCompactions()); + } else if (params.getKind() == CompactionKind.SYSTEM) { + // This code determines if once the files compacting finish would they be included in a + // compaction with the files smaller than them? If so, then wait for the running compaction + // to complete. - if (!Collections.disjoint(filesCopy, expectedFiles)) { - throw new AssertionError(); - } + // The set of files running compactions may produce + var expectedFiles = getExpected(params.getRunningCompactions()); - filesCopy.addAll(expectedFiles); + if (!Collections.disjoint(filesCopy, expectedFiles)) { + throw new AssertionError(); + } - group = findDataFilesToCompact(filesCopy, params.getRatio(), maxFilesToCompact, - maxSizeToCompact); + filesCopy.addAll(expectedFiles); - if (!Collections.disjoint(group, expectedFiles)) { - // file produced by running compaction will eventually compact with existing files, so - // wait. - group = Set.of(); - } - } else { + group = + findDataFilesToCompact(filesCopy, params.getRatio(), maxFilesToCompact, maxSizeToCompact); + + if (!Collections.disjoint(group, expectedFiles)) { + // file produced by running compaction will eventually compact with existing files, so + // wait. group = Set.of(); } + } else { + group = Set.of(); + } - if (group.isEmpty() - && (params.getKind() == CompactionKind.USER || params.getKind() == CompactionKind.SELECTOR - || params.getKind() == CompactionKind.CHOP) - && params.getRunningCompactions().stream() - .noneMatch(job -> job.getKind() == params.getKind())) { - group = findMaximalRequiredSetToCompact(params.getCandidates(), maxFilesToCompact); - } + if (group.isEmpty() + && (params.getKind() == CompactionKind.USER || params.getKind() == CompactionKind.SELECTOR) + && params.getRunningCompactions().stream() + .noneMatch(job -> job.getKind() == params.getKind())) { + group = findMaximalRequiredSetToCompact(params.getCandidates(), maxFilesToCompact); + } - if (group.isEmpty()) { - return params.createPlanBuilder().build(); - } else { - // determine which executor to use based on the size of the files - var ceid = getExecutor(group); + if (group.isEmpty()) { + return params.createPlanBuilder().build(); + } else { + // determine which executor to use based on the size of the files + var ceid = getExecutor(group); - return params.createPlanBuilder().addJob(createPriority(params, group), ceid, group) - .build(); - } - } catch (RuntimeException e) { - throw e; + return params.createPlanBuilder().addJob(createPriority(params, group), ceid, group).build(); } } diff --cc core/src/main/java/org/apache/accumulo/core/util/compaction/CompactionPlannerInitParams.java index 3a3f72c699,eb287153d6..86fd8c478a --- a/core/src/main/java/org/apache/accumulo/core/util/compaction/CompactionPlannerInitParams.java +++ b/core/src/main/java/org/apache/accumulo/core/util/compaction/CompactionPlannerInitParams.java @@@ -61,7 -60,7 +61,7 @@@ public class CompactionPlannerInitParam @Override public String getFullyQualifiedOption(String key) { - return prefix + serviceId + ".opts." + key; - return Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() + serviceId + ".planner.opts." + key; ++ return prefix + serviceId + ".planner.opts." + key; } @Override diff --cc core/src/test/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlannerTest.java index c043eac841,ab2003841b..bafda93e3e --- a/core/src/test/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlannerTest.java +++ b/core/src/test/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlannerTest.java @@@ -20,10 -20,8 +20,9 @@@ package org.apache.accumulo.core.spi.co import static com.google.common.collect.MoreCollectors.onlyElement; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; - import static org.junit.jupiter.api.Assertions.fail; import java.net.URI; import java.net.URISyntaxException; @@@ -34,19 -33,24 +34,23 @@@ import java.util.Set import java.util.stream.Collectors; import org.apache.accumulo.core.client.admin.compaction.CompactableFile; + import org.apache.accumulo.core.conf.ConfigurationCopy; import org.apache.accumulo.core.conf.ConfigurationTypeHelper; + import org.apache.accumulo.core.conf.DefaultConfiguration; import org.apache.accumulo.core.conf.Property; -import org.apache.accumulo.core.conf.SiteConfiguration; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.spi.common.ServiceEnvironment; import org.apache.accumulo.core.spi.common.ServiceEnvironment.Configuration; import org.apache.accumulo.core.spi.compaction.CompactionPlan.Builder; +import org.apache.accumulo.core.spi.compaction.CompactionPlanner.InitParameters; + import org.apache.accumulo.core.util.ConfigurationImpl; import org.apache.accumulo.core.util.compaction.CompactionExecutorIdImpl; import org.apache.accumulo.core.util.compaction.CompactionPlanImpl; + import org.apache.accumulo.core.util.compaction.CompactionPlannerInitParams; -import org.apache.accumulo.core.util.compaction.CompactionServicesConfig; import org.easymock.EasyMock; import org.junit.jupiter.api.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; + +import com.google.gson.JsonParseException; public class DefaultCompactionPlannerTest { @@@ -54,8 -58,14 +58,13 @@@ return c.stream().collect(onlyElement()); } + private static final Configuration defaultConf = + new ConfigurationImpl(DefaultConfiguration.getInstance()); + private static final CompactionServiceId csid = CompactionServiceId.of("cs1"); - - private static final Logger log = LoggerFactory.getLogger(DefaultCompactionPlannerTest.class); ++ private static final String prefix = Property.COMPACTION_SERVICE_PREFIX.getKey(); + @Test - public void testFindFilesToCompact() { + public void testFindFilesToCompact() throws Exception { testFFtC(createCFs("F4", "1M", "F5", "1M", "F6", "1M"), createCFs("F1", "100M", "F2", "100M", "F3", "100M", "F4", "1M", "F5", "1M", "F6", "1M"), @@@ -133,8 -143,14 +142,14 @@@ } @Test - public void testRunningCompaction() { + public void testRunningCompaction() throws Exception { - var planner = createPlanner(true); + String executors = "[{'name':'small','type': 'internal','maxSize':'32M','numThreads':1}," + + "{'name':'medium','type': 'internal','maxSize':'128M','numThreads':2}," + + "{'name':'large','type': 'internal','maxSize':'512M','numThreads':3}," + + "{'name':'huge','type': 'internal','numThreads':4}]"; + + var planner = createPlanner(defaultConf, executors); + var all = createCFs("F1", "3M", "F2", "3M", "F3", "11M", "F4", "12M", "F5", "13M"); var candidates = createCFs("F3", "11M", "F4", "12M", "F5", "13M"); var compacting = @@@ -156,12 -172,138 +171,21 @@@ // planner should compact. var job = getOnlyElement(plan.getJobs()); assertEquals(candidates, job.getFiles()); - assertEquals(CompactionExecutorIdImpl.externalId("medium"), job.getExecutor()); + assertEquals(CompactionExecutorIdImpl.internalId(csid, "medium"), job.getExecutor()); } - /** - * Tests that the maxOpen property overrides the deprecated open.max property with the default - * service - */ @Test - @SuppressWarnings("removal") - public void testOverrideMaxOpenDefaultService() { - Map<String,String> overrides = new HashMap<>(); - // Set old property and use that for max open files. - overrides.put(Property.TSERV_MAJC_THREAD_MAXOPEN.getKey(), "17"); - SiteConfiguration aconf = SiteConfiguration.empty().withOverrides(overrides).build(); - ConfigurationImpl config = new ConfigurationImpl(aconf); - - ServiceEnvironment senv = EasyMock.createMock(ServiceEnvironment.class); - EasyMock.expect(senv.getConfiguration()).andReturn(config).anyTimes(); - EasyMock.replay(senv); - - // Use the CompactionServicesConfig to create options based on default property values - var compactionServices = new CompactionServicesConfig(aconf, log::warn); - var options = compactionServices.getOptions().get("default"); - - var initParams = - new CompactionPlannerInitParams(CompactionServiceId.of("default"), options, senv); - - var planner = new DefaultCompactionPlanner(); - planner.init(initParams); - - var all = createCFs("F1", "10M", "F2", "11M", "F3", "12M", "F4", "13M", "F5", "14M", "F6", - "15M", "F7", "16M", "F8", "17M", "F9", "18M", "FA", "19M", "FB", "20M", "FC", "21M", "FD", - "22M", "FE", "23M", "FF", "24M", "FG", "25M", "FH", "26M"); - Set<CompactionJob> compacting = Set.of(); - var params = createPlanningParams(all, all, compacting, 2, CompactionKind.USER); - var plan = planner.makePlan(params); - var job = getOnlyElement(plan.getJobs()); - assertEquals(all, job.getFiles()); - assertEquals(CompactionExecutorIdImpl.internalId(CompactionServiceId.of("default"), "large"), - job.getExecutor()); - - overrides.put(Property.TSERV_MAJC_THREAD_MAXOPEN.getKey(), "5"); - aconf = SiteConfiguration.empty().withOverrides(overrides).build(); - config = new ConfigurationImpl(aconf); - senv = EasyMock.createMock(ServiceEnvironment.class); - EasyMock.expect(senv.getConfiguration()).andReturn(config).anyTimes(); - EasyMock.replay(senv); - - // Create new initParams so executor IDs can be reused - initParams = new CompactionPlannerInitParams(CompactionServiceId.of("default"), options, senv); - planner = new DefaultCompactionPlanner(); - planner.init(initParams); - - params = createPlanningParams(all, all, compacting, 2, CompactionKind.USER); - plan = planner.makePlan(params); - job = getOnlyElement(plan.getJobs()); - assertEquals(createCFs("F1", "10M", "F2", "11M", "F3", "12M", "F4", "13M", "F5", "14M"), - job.getFiles()); - assertEquals(CompactionExecutorIdImpl.internalId(CompactionServiceId.of("default"), "medium"), - job.getExecutor()); - } - - /** - * Tests that the maxOpen property overrides the deprecated open.max property - */ - @Test - @SuppressWarnings("removal") - public void testOverrideMaxOpen() { - Map<String,String> overrides = new HashMap<>(); - // Set old property and use that for max open files. - overrides.put(Property.TSERV_MAJC_THREAD_MAXOPEN.getKey(), "17"); - SiteConfiguration aconf = SiteConfiguration.empty().withOverrides(overrides).build(); - ConfigurationImpl config = new ConfigurationImpl(aconf); - - String executors = "[{'name':'small','type': 'internal','maxSize':'32M','numThreads':1}," - + "{'name':'medium','type': 'internal','maxSize':'128M','numThreads':2}," - + "{'name':'large','type': 'internal','maxSize':'512M','numThreads':3}," - + "{'name':'huge','type': 'internal','numThreads':4}]"; - - var planner = createPlanner(config, executors); - var all = createCFs("F1", "1M", "F2", "2M", "F3", "4M", "F4", "8M", "F5", "16M", "F6", "32M", - "F7", "64M", "F8", "128M", "F9", "256M", "FA", "512M", "FB", "1G", "FC", "2G", "FD", "4G", - "FE", "8G", "FF", "16G", "FG", "32G", "FH", "64G"); - Set<CompactionJob> compacting = Set.of(); - var params = createPlanningParams(all, all, compacting, 2, CompactionKind.USER); - var plan = planner.makePlan(params); - var job = getOnlyElement(plan.getJobs()); - assertEquals(all, job.getFiles()); - assertEquals(CompactionExecutorIdImpl.internalId(csid, "huge"), job.getExecutor()); - - // Set new property that overrides the old property. - overrides.put(Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() + "cs1.planner.opts.maxOpen", - "15"); - aconf = SiteConfiguration.empty().withOverrides(overrides).build(); - config = new ConfigurationImpl(aconf); - planner = createPlanner(config, executors); - params = createPlanningParams(all, all, compacting, 2, CompactionKind.USER); - plan = planner.makePlan(params); - - // 17 files that do not meet the compaction ratio. When max files to compact is 15, - // the plan should do 3 files then 15 - job = getOnlyElement(plan.getJobs()); - assertEquals(createCFs("F1", "1M", "F2", "2M", "F3", "4M"), job.getFiles()); - assertEquals(CompactionExecutorIdImpl.internalId(csid, "small"), job.getExecutor()); - - overrides.put(Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() + "cs1.planner.opts.maxOpen", - "5"); - aconf = SiteConfiguration.empty().withOverrides(overrides).build(); - // 17 files that do not meet the compaction ratio. When max files to compact is 5 should do 5, - // files then another 5, then the final 5. - config = new ConfigurationImpl(aconf); - planner = createPlanner(config, executors); - params = createPlanningParams(all, all, compacting, 2, CompactionKind.USER); - plan = planner.makePlan(params); - job = getOnlyElement(plan.getJobs()); - assertEquals(createCFs("F4", "8M", "F3", "4M", "F2", "2M", "F1", "1M", "F5", "16M"), - job.getFiles()); - assertEquals(CompactionExecutorIdImpl.internalId(csid, "small"), job.getExecutor()); - } - - @Test - public void testUserCompaction() { + public void testUserCompaction() throws Exception { - var planner = createPlanner(true); + ConfigurationCopy aconf = new ConfigurationCopy(DefaultConfiguration.getInstance()); - aconf.set(Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() + "cs1.planner.opts.maxOpen", "15"); ++ aconf.set(prefix + "cs1.planner.opts.maxOpen", "15"); + ConfigurationImpl config = new ConfigurationImpl(aconf); + + String executors = "[{'name':'small','type': 'internal','maxSize':'32M','numThreads':1}," + + "{'name':'medium','type': 'internal','maxSize':'128M','numThreads':2}," + + "{'name':'large','type': 'internal','maxSize':'512M','numThreads':3}," + + "{'name':'huge','type': 'internal','numThreads':4}]"; + + var planner = createPlanner(config, executors); var all = createCFs("F1", "3M", "F2", "3M", "F3", "11M", "F4", "12M", "F5", "13M"); var candidates = createCFs("F3", "11M", "F4", "12M", "F5", "13M"); var compacting = @@@ -224,8 -366,12 +248,12 @@@ } @Test - public void testMaxSize() { + public void testMaxSize() throws Exception { - var planner = createPlanner(false); + String executors = "[{'name':'small','type': 'internal','maxSize':'32M','numThreads':1}," + + "{'name':'medium','type': 'internal','maxSize':'128M','numThreads':2}," + + "{'name':'large','type': 'internal','maxSize':'512M','numThreads':3}]"; + + var planner = createPlanner(defaultConf, executors); var all = createCFs("F1", "128M", "F2", "129M", "F3", "130M", "F4", "131M", "F5", "132M"); var params = createPlanningParams(all, all, Set.of(), 2, CompactionKind.SYSTEM); var plan = planner.makePlan(params); @@@ -240,77 -386,9 +268,64 @@@ plan = planner.makePlan(params); job = getOnlyElement(plan.getJobs()); assertEquals(all, job.getFiles()); - assertEquals(CompactionExecutorIdImpl.externalId("large"), job.getExecutor()); + assertEquals(CompactionExecutorIdImpl.internalId(csid, "large"), job.getExecutor()); } + @Test + public void testQueueCreation() throws Exception { + DefaultCompactionPlanner planner = new DefaultCompactionPlanner(); - Configuration conf = EasyMock.createMock(Configuration.class); - EasyMock.expect(conf.isSet(EasyMock.anyString())).andReturn(false).anyTimes(); - - ServiceEnvironment senv = EasyMock.createMock(ServiceEnvironment.class); - EasyMock.expect(senv.getConfiguration()).andReturn(conf).anyTimes(); - EasyMock.replay(conf, senv); + + String queues = "[{\"name\": \"small\", \"maxSize\":\"32M\"},{\"name\":\"midsize\"}]"; - planner.init(getInitParamQueues(senv, queues)); ++ planner.init(getInitParamQueues(defaultConf, queues)); + + var all = createCFs("F1", "1M", "F2", "1M", "F3", "1M", "F4", "1M"); + var params = createPlanningParams(all, all, Set.of(), 2, CompactionKind.SYSTEM); + var plan = planner.makePlan(params); + + var job = getOnlyElement(plan.getJobs()); + assertEquals(all, job.getFiles()); + assertEquals(CompactionExecutorIdImpl.externalId("small"), job.getExecutor()); + + all = createCFs("F1", "100M", "F2", "100M", "F3", "100M", "F4", "100M"); + params = createPlanningParams(all, all, Set.of(), 2, CompactionKind.SYSTEM); + plan = planner.makePlan(params); + + job = getOnlyElement(plan.getJobs()); + assertEquals(all, job.getFiles()); + assertEquals(CompactionExecutorIdImpl.externalId("midsize"), job.getExecutor()); + } + + /** + * Tests that additional fields in the JSON objects cause errors to be thrown. + */ + @Test + public void testErrorAdditionalConfigFields() { - Configuration conf = EasyMock.createMock(Configuration.class); - EasyMock.expect(conf.isSet(EasyMock.anyString())).andReturn(false).anyTimes(); - - ServiceEnvironment senv = EasyMock.createMock(ServiceEnvironment.class); - EasyMock.expect(senv.getConfiguration()).andReturn(conf).anyTimes(); - EasyMock.replay(conf, senv); - + DefaultCompactionPlanner QueuePlanner = new DefaultCompactionPlanner(); + + String queues = + "[{\"name\":\"smallQueue\", \"maxSize\":\"32M\"}, {\"name\":\"largeQueue\", \"type\":\"internal\", \"foo\":\"bar\", \"queue\":\"broken\"}]"; + - final InitParameters queueParams = getInitParamQueues(senv, queues); ++ final InitParameters queueParams = getInitParamQueues(defaultConf, queues); + assertNotNull(queueParams); + var e = assertThrows(JsonParseException.class, () -> QueuePlanner.init(queueParams), + "Failed to throw error"); + assertTrue(e.getMessage().contains("[type, foo, queue]"), + "Error message didn't contain '[type, foo, queue]'"); + - String executors = getExecutors("'type': 'internal','maxSize':'32M','numThreads':1", - "'type': 'internal','maxSize':'128M','numThreads':2, 'foo':'bar'", - "'type': 'internal','numThreads':1, 'unexpectedField':'foo'"); ++ String executors = "[{'name':'small','type': 'internal','maxSize':'32M','numThreads':1}," ++ + "{'name':'medium','type': 'internal','maxSize':'128M','numThreads':2, 'foo':'bar'}," ++ + "{'name':'large','type': 'internal','numThreads':1, 'unexpectedField':'foo'}]"; + - final InitParameters execParams = getInitParams(senv, executors); ++ final InitParameters execParams = getInitParams(defaultConf, executors); + assertNotNull(execParams); + + DefaultCompactionPlanner ExecPlanner = new DefaultCompactionPlanner(); + var err = assertThrows(JsonParseException.class, () -> ExecPlanner.init(execParams), + "Failed to throw error"); + assertTrue(err.getMessage().contains("Invalid fields: [foo]"), + "Error message didn't contain '[foo]'"); + } + /** * Tests internal type executor with no numThreads set throws error */ @@@ -374,59 -434,6 +371,45 @@@ assertTrue(e.getMessage().contains("queue"), "Error message didn't contain queue"); } + /** + * Tests queue with missing name throws error + */ + @Test + public void testErrorQueueNoName() { + DefaultCompactionPlanner planner = new DefaultCompactionPlanner(); - Configuration conf = EasyMock.createMock(Configuration.class); - EasyMock.expect(conf.isSet(EasyMock.anyString())).andReturn(false).anyTimes(); - - ServiceEnvironment senv = EasyMock.createMock(ServiceEnvironment.class); - EasyMock.expect(senv.getConfiguration()).andReturn(conf).anyTimes(); - EasyMock.replay(conf, senv); - + String queues = "[{\"name\":\"smallQueue\", \"maxSize\":\"32M\"}, {\"maxSize\":\"120M\"}]"; + - final InitParameters params = getInitParamQueues(senv, queues); ++ final InitParameters params = getInitParamQueues(defaultConf, queues); + assertNotNull(params); + + var e = assertThrows(NullPointerException.class, () -> planner.init(params), + "Failed to throw error"); + assertEquals(e.getMessage(), "'name' must be specified", "Error message didn't contain 'name'"); + } + + /** + * Tests not having executors or queues throws errors + */ + @Test + public void testErrorNoExecutors() { + DefaultCompactionPlanner planner = new DefaultCompactionPlanner(); - Configuration conf = EasyMock.createMock(Configuration.class); - EasyMock.expect(conf.isSet(EasyMock.anyString())).andReturn(false).anyTimes(); - - ServiceEnvironment senv = EasyMock.createMock(ServiceEnvironment.class); - EasyMock.expect(senv.getConfiguration()).andReturn(conf).anyTimes(); - EasyMock.replay(conf, senv); - - var execParams = getInitParams(senv, ""); ++ var execParams = getInitParams(defaultConf, ""); + assertNotNull(execParams); + + var e = assertThrows(IllegalStateException.class, () -> planner.init(execParams), + "Failed to throw error"); + assertEquals("No defined executors or queues for this planner", e.getMessage(), + "Error message was not equal"); + - var params = getInitParamQueues(senv, ""); ++ var params = getInitParamQueues(defaultConf, ""); + assertNotNull(params); + + var e2 = assertThrows(IllegalStateException.class, () -> planner.init(params), + "Failed to throw error"); + assertEquals("No defined executors or queues for this planner", e2.getMessage(), + "Error message was not equal"); + } + /** * Tests executors can only have one without a max size. */ @@@ -471,92 -467,11 +443,12 @@@ private CompactionJob createJob(CompactionKind kind, Set<CompactableFile> all, Set<CompactableFile> files) { return new CompactionPlanImpl.BuilderImpl(kind, all, all) - .addJob((short) all.size(), CompactionExecutorIdImpl.externalId("small"), files).build() - .getJobs().iterator().next(); + .addJob((short) all.size(), CompactionExecutorIdImpl.internalId(csid, "small"), files) + .build().getJobs().iterator().next(); } - private static Set<CompactableFile> createCFs(String... namesSizePairs) { + private static Set<CompactableFile> createCFs(String... namesSizePairs) + throws URISyntaxException { Set<CompactableFile> files = new HashSet<>(); for (int i = 0; i < namesSizePairs.length; i += 2) { @@@ -645,80 -564,30 +537,47 @@@ }; } - private static DefaultCompactionPlanner createPlanner(boolean withHugeExecutor) { - DefaultCompactionPlanner planner = new DefaultCompactionPlanner(); - Configuration conf = EasyMock.createMock(Configuration.class); - EasyMock.expect(conf.isSet(EasyMock.anyString())).andReturn(false).anyTimes(); ++ private static CompactionPlanner.InitParameters getInitParamQueues(Configuration conf, ++ String queues) { + - ServiceEnvironment senv = EasyMock.createMock(ServiceEnvironment.class); - EasyMock.expect(senv.getConfiguration()).andReturn(conf).anyTimes(); ++ String maxOpen = conf.get(prefix + "cs1.planner.opts.maxOpen"); ++ Map<String,String> options = new HashMap<>(); ++ options.put("queues", queues.replaceAll("'", "\"")); + - EasyMock.replay(conf, senv); ++ if (maxOpen != null) { ++ options.put("maxOpen", maxOpen); ++ } + - StringBuilder execBldr = - new StringBuilder("[{'name':'small','type': 'internal','maxSize':'32M','numThreads':1}," - + "{'name':'medium','type': 'internal','maxSize':'128M','numThreads':2}," - + "{'name':'large','type': 'internal','maxSize':'512M','numThreads':3}"); ++ ServiceEnvironment senv = EasyMock.createMock(ServiceEnvironment.class); ++ EasyMock.expect(senv.getConfiguration()).andReturn(conf).anyTimes(); ++ EasyMock.replay(senv); + - if (withHugeExecutor) { - execBldr.append(",{'name':'huge','type': 'internal','numThreads':4}]"); - } else { - execBldr.append("]"); - } ++ return new CompactionPlannerInitParams(csid, prefix, options, senv); ++ } + - String executors = execBldr.toString().replaceAll("'", "\""); + private static CompactionPlanner.InitParameters getInitParams(Configuration conf, + String executors) { - planner.init(new CompactionPlanner.InitParameters() { - String maxOpen = - conf.get(Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() + "cs1.planner.opts.maxOpen"); ++ String maxOpen = conf.get(prefix + "cs1.planner.opts.maxOpen"); + Map<String,String> options = new HashMap<>(); + options.put("executors", executors.replaceAll("'", "\"")); - @Override - public ServiceEnvironment getServiceEnvironment() { - return senv; - } + if (maxOpen != null) { + options.put("maxOpen", maxOpen); + } - @Override - public Map<String,String> getOptions() { - return Map.of("executors", executors, "maxOpen", "15"); - } + ServiceEnvironment senv = EasyMock.createMock(ServiceEnvironment.class); + EasyMock.expect(senv.getConfiguration()).andReturn(conf).anyTimes(); + EasyMock.replay(senv); - @Override - public String getFullyQualifiedOption(String key) { - assertEquals("maxOpen", key); - return Property.COMPACTION_SERVICE_PREFIX.getKey() + "cs1.planner.opts." + key; - } - return new CompactionPlannerInitParams(csid, options, senv); ++ return new CompactionPlannerInitParams(csid, prefix, options, senv); + } - @Override - public ExecutorManager getExecutorManager() { - return new ExecutorManager() { - @Override - public CompactionExecutorId createExecutor(String name, int threads) { - switch (name) { - case "small": - assertEquals(1, threads); - break; - case "medium": - assertEquals(2, threads); - break; - case "large": - assertEquals(3, threads); - break; - case "huge": - assertEquals(4, threads); - break; - default: - fail("Unexpected name " + name); - break; - } - return CompactionExecutorIdImpl.externalId(name); - } - - @Override - public CompactionExecutorId getExternalExecutor(String name) { - throw new UnsupportedOperationException(); - } - }; - } - }); + private static DefaultCompactionPlanner createPlanner(Configuration conf, String executors) { + DefaultCompactionPlanner planner = new DefaultCompactionPlanner(); + var initParams = getInitParams(conf, executors); + planner.init(initParams); return planner; } }