This is an automated email from the ASF dual-hosted git repository.

ddanielr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit b8869fb72e2aa58736250c52f3366f196ac86c79
Merge: 829634653b fdd73fb8b2
Author: Daniel Roberts <ddani...@gmail.com>
AuthorDate: Wed Dec 27 20:30:47 2023 +0000

    Merge branch '2.1'

 .../spi/compaction/DefaultCompactionPlanner.java   |   6 +-
 .../compaction/CompactionPlannerInitParams.java    |   2 +-
 .../compaction/DefaultCompactionPlannerTest.java   | 343 ++++++---------------
 3 files changed, 106 insertions(+), 245 deletions(-)

diff --cc 
core/src/main/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlanner.java
index b73544143e,9385806831..8f7969519a
--- 
a/core/src/main/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlanner.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlanner.java
@@@ -279,106 -210,105 +280,107 @@@ public class DefaultCompactionPlanner i
      determineMaxFilesToCompact(params);
    }
  
 -  @SuppressWarnings("removal")
++  @SuppressWarnings("deprecation")
    private void determineMaxFilesToCompact(InitParameters params) {
 -    String fqo = params.getFullyQualifiedOption("maxOpen");
 -    if (!params.getServiceEnvironment().getConfiguration().isSet(fqo)
 -        && params.getServiceEnvironment().getConfiguration()
 -            .isSet(Property.TSERV_MAJC_THREAD_MAXOPEN.getKey())) {
 -      log.warn("The property " + Property.TSERV_MAJC_THREAD_MAXOPEN.getKey()
 -          + " was set, it is deprecated.  Set the " + fqo + " option 
instead.");
 -      this.maxFilesToCompact = 
Integer.parseInt(params.getServiceEnvironment().getConfiguration()
 -          .get(Property.TSERV_MAJC_THREAD_MAXOPEN.getKey()));
 -    } else {
 -      this.maxFilesToCompact = 
Integer.parseInt(params.getOptions().getOrDefault("maxOpen",
 -          
Property.TSERV_COMPACTION_SERVICE_DEFAULT_MAX_OPEN.getDefaultValue()));
 +
 +    String maxOpen = params.getOptions().get("maxOpen");
 +    if (maxOpen == null) {
-       maxOpen = "10";
-       log.trace("default maxOpen not set, defaulting to 10");
++      maxOpen = 
Property.TSERV_COMPACTION_SERVICE_DEFAULT_MAX_OPEN.getDefaultValue();
++      log.trace("default maxOpen not set, defaulting to {}", maxOpen);
      }
 +    this.maxFilesToCompact = Integer.parseInt(maxOpen);
    }
  
 -  @Override
 -  public CompactionPlan makePlan(PlanningParameters params) {
 -    try {
 +  private void validateConfig(JsonElement json, List<String> fields, String 
className) {
  
 -      if (params.getCandidates().isEmpty()) {
 -        return params.createPlanBuilder().build();
 -      }
 +    JsonObject jsonObject = GSON.get().fromJson(json, JsonObject.class);
 +
 +    List<String> objectProperties = new ArrayList<>(jsonObject.keySet());
 +    HashSet<String> classFieldNames = new HashSet<>(fields);
 +
 +    if (!classFieldNames.containsAll(objectProperties)) {
 +      objectProperties.removeAll(classFieldNames);
 +      throw new JsonParseException(
 +          "Invalid fields: " + objectProperties + " provided for class: " + 
className);
 +    }
 +  }
  
 -      Set<CompactableFile> filesCopy = new HashSet<>(params.getCandidates());
 +  @Override
 +  public CompactionPlan makePlan(PlanningParameters params) {
 +    if (params.getCandidates().isEmpty()) {
 +      return params.createPlanBuilder().build();
 +    }
  
 -      long maxSizeToCompact = getMaxSizeToCompact(params.getKind());
 +    Set<CompactableFile> filesCopy = new HashSet<>(params.getCandidates());
  
 -      Collection<CompactableFile> group;
 -      if (params.getRunningCompactions().isEmpty()) {
 -        group = findDataFilesToCompact(filesCopy, params.getRatio(), 
maxFilesToCompact,
 -            maxSizeToCompact);
 +    long maxSizeToCompact = getMaxSizeToCompact(params.getKind());
  
 -        if (!group.isEmpty() && group.size() < params.getCandidates().size()
 -            && params.getCandidates().size() <= maxFilesToCompact
 -            && (params.getKind() == CompactionKind.USER
 -                || params.getKind() == CompactionKind.SELECTOR)) {
 -          // USER and SELECTOR compactions must eventually compact all files. 
When a subset of files
 -          // that meets the compaction ratio is selected, look ahead and see 
if the next compaction
 -          // would also meet the compaction ratio. If not then compact 
everything to avoid doing
 -          // more than logarithmic work across multiple compactions.
 +    Collection<CompactableFile> group;
 +    if (params.getRunningCompactions().isEmpty()) {
 +      group =
 +          findDataFilesToCompact(filesCopy, params.getRatio(), 
maxFilesToCompact, maxSizeToCompact);
  
 -          filesCopy.removeAll(group);
 -          filesCopy.add(getExpected(group, 0));
 +      if (!group.isEmpty() && group.size() < params.getCandidates().size()
 +          && params.getCandidates().size() <= maxFilesToCompact
 +          && (params.getKind() == CompactionKind.USER
 +              || params.getKind() == CompactionKind.SELECTOR)) {
 +        // USER and SELECTOR compactions must eventually compact all files. 
When a subset of files
 +        // that meets the compaction ratio is selected, look ahead and see if 
the next compaction
 +        // would also meet the compaction ratio. If not then compact 
everything to avoid doing
 +        // more than logarithmic work across multiple compactions.
  
 -          if (findDataFilesToCompact(filesCopy, params.getRatio(), 
maxFilesToCompact,
 -              maxSizeToCompact).isEmpty()) {
 -            // The next possible compaction does not meet the compaction 
ratio, so compact
 -            // everything.
 -            group = Set.copyOf(params.getCandidates());
 -          }
 +        filesCopy.removeAll(group);
 +        filesCopy.add(getExpected(group, 0));
  
 +        if (findDataFilesToCompact(filesCopy, params.getRatio(), 
maxFilesToCompact,
 +            maxSizeToCompact).isEmpty()) {
 +          // The next possible compaction does not meet the compaction ratio, 
so compact
 +          // everything.
 +          group = Set.copyOf(params.getCandidates());
          }
  
 -      } else if (params.getKind() == CompactionKind.SYSTEM) {
 -        // This code determines if once the files compacting finish would 
they be included in a
 -        // compaction with the files smaller than them? If so, then wait for 
the running compaction
 -        // to complete.
 +      }
  
 -        // The set of files running compactions may produce
 -        var expectedFiles = getExpected(params.getRunningCompactions());
 +    } else if (params.getKind() == CompactionKind.SYSTEM) {
 +      // This code determines if once the files compacting finish would they 
be included in a
 +      // compaction with the files smaller than them? If so, then wait for 
the running compaction
 +      // to complete.
  
 -        if (!Collections.disjoint(filesCopy, expectedFiles)) {
 -          throw new AssertionError();
 -        }
 +      // The set of files running compactions may produce
 +      var expectedFiles = getExpected(params.getRunningCompactions());
  
 -        filesCopy.addAll(expectedFiles);
 +      if (!Collections.disjoint(filesCopy, expectedFiles)) {
 +        throw new AssertionError();
 +      }
  
 -        group = findDataFilesToCompact(filesCopy, params.getRatio(), 
maxFilesToCompact,
 -            maxSizeToCompact);
 +      filesCopy.addAll(expectedFiles);
  
 -        if (!Collections.disjoint(group, expectedFiles)) {
 -          // file produced by running compaction will eventually compact with 
existing files, so
 -          // wait.
 -          group = Set.of();
 -        }
 -      } else {
 +      group =
 +          findDataFilesToCompact(filesCopy, params.getRatio(), 
maxFilesToCompact, maxSizeToCompact);
 +
 +      if (!Collections.disjoint(group, expectedFiles)) {
 +        // file produced by running compaction will eventually compact with 
existing files, so
 +        // wait.
          group = Set.of();
        }
 +    } else {
 +      group = Set.of();
 +    }
  
 -      if (group.isEmpty()
 -          && (params.getKind() == CompactionKind.USER || params.getKind() == 
CompactionKind.SELECTOR
 -              || params.getKind() == CompactionKind.CHOP)
 -          && params.getRunningCompactions().stream()
 -              .noneMatch(job -> job.getKind() == params.getKind())) {
 -        group = findMaximalRequiredSetToCompact(params.getCandidates(), 
maxFilesToCompact);
 -      }
 +    if (group.isEmpty()
 +        && (params.getKind() == CompactionKind.USER || params.getKind() == 
CompactionKind.SELECTOR)
 +        && params.getRunningCompactions().stream()
 +            .noneMatch(job -> job.getKind() == params.getKind())) {
 +      group = findMaximalRequiredSetToCompact(params.getCandidates(), 
maxFilesToCompact);
 +    }
  
 -      if (group.isEmpty()) {
 -        return params.createPlanBuilder().build();
 -      } else {
 -        // determine which executor to use based on the size of the files
 -        var ceid = getExecutor(group);
 +    if (group.isEmpty()) {
 +      return params.createPlanBuilder().build();
 +    } else {
 +      // determine which executor to use based on the size of the files
 +      var ceid = getExecutor(group);
  
 -        return params.createPlanBuilder().addJob(createPriority(params, 
group), ceid, group)
 -            .build();
 -      }
 -    } catch (RuntimeException e) {
 -      throw e;
 +      return params.createPlanBuilder().addJob(createPriority(params, group), 
ceid, group).build();
      }
    }
  
diff --cc 
core/src/main/java/org/apache/accumulo/core/util/compaction/CompactionPlannerInitParams.java
index 3a3f72c699,eb287153d6..86fd8c478a
--- 
a/core/src/main/java/org/apache/accumulo/core/util/compaction/CompactionPlannerInitParams.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/util/compaction/CompactionPlannerInitParams.java
@@@ -61,7 -60,7 +61,7 @@@ public class CompactionPlannerInitParam
  
    @Override
    public String getFullyQualifiedOption(String key) {
-     return prefix + serviceId + ".opts." + key;
 -    return Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() + serviceId + 
".planner.opts." + key;
++    return prefix + serviceId + ".planner.opts." + key;
    }
  
    @Override
diff --cc 
core/src/test/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlannerTest.java
index c043eac841,ab2003841b..bafda93e3e
--- 
a/core/src/test/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlannerTest.java
+++ 
b/core/src/test/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlannerTest.java
@@@ -20,10 -20,8 +20,9 @@@ package org.apache.accumulo.core.spi.co
  
  import static com.google.common.collect.MoreCollectors.onlyElement;
  import static org.junit.jupiter.api.Assertions.assertEquals;
 +import static org.junit.jupiter.api.Assertions.assertNotNull;
  import static org.junit.jupiter.api.Assertions.assertThrows;
  import static org.junit.jupiter.api.Assertions.assertTrue;
- import static org.junit.jupiter.api.Assertions.fail;
  
  import java.net.URI;
  import java.net.URISyntaxException;
@@@ -34,19 -33,24 +34,23 @@@ import java.util.Set
  import java.util.stream.Collectors;
  
  import org.apache.accumulo.core.client.admin.compaction.CompactableFile;
+ import org.apache.accumulo.core.conf.ConfigurationCopy;
  import org.apache.accumulo.core.conf.ConfigurationTypeHelper;
+ import org.apache.accumulo.core.conf.DefaultConfiguration;
  import org.apache.accumulo.core.conf.Property;
 -import org.apache.accumulo.core.conf.SiteConfiguration;
  import org.apache.accumulo.core.data.TableId;
  import org.apache.accumulo.core.spi.common.ServiceEnvironment;
  import org.apache.accumulo.core.spi.common.ServiceEnvironment.Configuration;
  import org.apache.accumulo.core.spi.compaction.CompactionPlan.Builder;
 +import 
org.apache.accumulo.core.spi.compaction.CompactionPlanner.InitParameters;
+ import org.apache.accumulo.core.util.ConfigurationImpl;
  import org.apache.accumulo.core.util.compaction.CompactionExecutorIdImpl;
  import org.apache.accumulo.core.util.compaction.CompactionPlanImpl;
+ import org.apache.accumulo.core.util.compaction.CompactionPlannerInitParams;
 -import org.apache.accumulo.core.util.compaction.CompactionServicesConfig;
  import org.easymock.EasyMock;
  import org.junit.jupiter.api.Test;
 -import org.slf4j.Logger;
 -import org.slf4j.LoggerFactory;
 +
 +import com.google.gson.JsonParseException;
  
  public class DefaultCompactionPlannerTest {
  
@@@ -54,8 -58,14 +58,13 @@@
      return c.stream().collect(onlyElement());
    }
  
+   private static final Configuration defaultConf =
+       new ConfigurationImpl(DefaultConfiguration.getInstance());
+   private static final CompactionServiceId csid = 
CompactionServiceId.of("cs1");
 -
 -  private static final Logger log = 
LoggerFactory.getLogger(DefaultCompactionPlannerTest.class);
++  private static final String prefix = 
Property.COMPACTION_SERVICE_PREFIX.getKey();
+ 
    @Test
 -  public void testFindFilesToCompact() {
 +  public void testFindFilesToCompact() throws Exception {
  
      testFFtC(createCFs("F4", "1M", "F5", "1M", "F6", "1M"),
          createCFs("F1", "100M", "F2", "100M", "F3", "100M", "F4", "1M", "F5", 
"1M", "F6", "1M"),
@@@ -133,8 -143,14 +142,14 @@@
    }
  
    @Test
 -  public void testRunningCompaction() {
 +  public void testRunningCompaction() throws Exception {
-     var planner = createPlanner(true);
+     String executors = "[{'name':'small','type': 
'internal','maxSize':'32M','numThreads':1},"
+         + "{'name':'medium','type': 
'internal','maxSize':'128M','numThreads':2},"
+         + "{'name':'large','type': 
'internal','maxSize':'512M','numThreads':3},"
+         + "{'name':'huge','type': 'internal','numThreads':4}]";
+ 
+     var planner = createPlanner(defaultConf, executors);
+ 
      var all = createCFs("F1", "3M", "F2", "3M", "F3", "11M", "F4", "12M", 
"F5", "13M");
      var candidates = createCFs("F3", "11M", "F4", "12M", "F5", "13M");
      var compacting =
@@@ -156,12 -172,138 +171,21 @@@
      // planner should compact.
      var job = getOnlyElement(plan.getJobs());
      assertEquals(candidates, job.getFiles());
-     assertEquals(CompactionExecutorIdImpl.externalId("medium"), 
job.getExecutor());
+     assertEquals(CompactionExecutorIdImpl.internalId(csid, "medium"), 
job.getExecutor());
    }
  
 -  /**
 -   * Tests that the maxOpen property overrides the deprecated open.max 
property with the default
 -   * service
 -   */
    @Test
 -  @SuppressWarnings("removal")
 -  public void testOverrideMaxOpenDefaultService() {
 -    Map<String,String> overrides = new HashMap<>();
 -    // Set old property and use that for max open files.
 -    overrides.put(Property.TSERV_MAJC_THREAD_MAXOPEN.getKey(), "17");
 -    SiteConfiguration aconf = 
SiteConfiguration.empty().withOverrides(overrides).build();
 -    ConfigurationImpl config = new ConfigurationImpl(aconf);
 -
 -    ServiceEnvironment senv = EasyMock.createMock(ServiceEnvironment.class);
 -    EasyMock.expect(senv.getConfiguration()).andReturn(config).anyTimes();
 -    EasyMock.replay(senv);
 -
 -    // Use the CompactionServicesConfig to create options based on default 
property values
 -    var compactionServices = new CompactionServicesConfig(aconf, log::warn);
 -    var options = compactionServices.getOptions().get("default");
 -
 -    var initParams =
 -        new CompactionPlannerInitParams(CompactionServiceId.of("default"), 
options, senv);
 -
 -    var planner = new DefaultCompactionPlanner();
 -    planner.init(initParams);
 -
 -    var all = createCFs("F1", "10M", "F2", "11M", "F3", "12M", "F4", "13M", 
"F5", "14M", "F6",
 -        "15M", "F7", "16M", "F8", "17M", "F9", "18M", "FA", "19M", "FB", 
"20M", "FC", "21M", "FD",
 -        "22M", "FE", "23M", "FF", "24M", "FG", "25M", "FH", "26M");
 -    Set<CompactionJob> compacting = Set.of();
 -    var params = createPlanningParams(all, all, compacting, 2, 
CompactionKind.USER);
 -    var plan = planner.makePlan(params);
 -    var job = getOnlyElement(plan.getJobs());
 -    assertEquals(all, job.getFiles());
 -    
assertEquals(CompactionExecutorIdImpl.internalId(CompactionServiceId.of("default"),
 "large"),
 -        job.getExecutor());
 -
 -    overrides.put(Property.TSERV_MAJC_THREAD_MAXOPEN.getKey(), "5");
 -    aconf = SiteConfiguration.empty().withOverrides(overrides).build();
 -    config = new ConfigurationImpl(aconf);
 -    senv = EasyMock.createMock(ServiceEnvironment.class);
 -    EasyMock.expect(senv.getConfiguration()).andReturn(config).anyTimes();
 -    EasyMock.replay(senv);
 -
 -    // Create new initParams so executor IDs can be reused
 -    initParams = new 
CompactionPlannerInitParams(CompactionServiceId.of("default"), options, senv);
 -    planner = new DefaultCompactionPlanner();
 -    planner.init(initParams);
 -
 -    params = createPlanningParams(all, all, compacting, 2, 
CompactionKind.USER);
 -    plan = planner.makePlan(params);
 -    job = getOnlyElement(plan.getJobs());
 -    assertEquals(createCFs("F1", "10M", "F2", "11M", "F3", "12M", "F4", 
"13M", "F5", "14M"),
 -        job.getFiles());
 -    
assertEquals(CompactionExecutorIdImpl.internalId(CompactionServiceId.of("default"),
 "medium"),
 -        job.getExecutor());
 -  }
 -
 -  /**
 -   * Tests that the maxOpen property overrides the deprecated open.max 
property
 -   */
 -  @Test
 -  @SuppressWarnings("removal")
 -  public void testOverrideMaxOpen() {
 -    Map<String,String> overrides = new HashMap<>();
 -    // Set old property and use that for max open files.
 -    overrides.put(Property.TSERV_MAJC_THREAD_MAXOPEN.getKey(), "17");
 -    SiteConfiguration aconf = 
SiteConfiguration.empty().withOverrides(overrides).build();
 -    ConfigurationImpl config = new ConfigurationImpl(aconf);
 -
 -    String executors = "[{'name':'small','type': 
'internal','maxSize':'32M','numThreads':1},"
 -        + "{'name':'medium','type': 
'internal','maxSize':'128M','numThreads':2},"
 -        + "{'name':'large','type': 
'internal','maxSize':'512M','numThreads':3},"
 -        + "{'name':'huge','type': 'internal','numThreads':4}]";
 -
 -    var planner = createPlanner(config, executors);
 -    var all = createCFs("F1", "1M", "F2", "2M", "F3", "4M", "F4", "8M", "F5", 
"16M", "F6", "32M",
 -        "F7", "64M", "F8", "128M", "F9", "256M", "FA", "512M", "FB", "1G", 
"FC", "2G", "FD", "4G",
 -        "FE", "8G", "FF", "16G", "FG", "32G", "FH", "64G");
 -    Set<CompactionJob> compacting = Set.of();
 -    var params = createPlanningParams(all, all, compacting, 2, 
CompactionKind.USER);
 -    var plan = planner.makePlan(params);
 -    var job = getOnlyElement(plan.getJobs());
 -    assertEquals(all, job.getFiles());
 -    assertEquals(CompactionExecutorIdImpl.internalId(csid, "huge"), 
job.getExecutor());
 -
 -    // Set new property that overrides the old property.
 -    overrides.put(Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() + 
"cs1.planner.opts.maxOpen",
 -        "15");
 -    aconf = SiteConfiguration.empty().withOverrides(overrides).build();
 -    config = new ConfigurationImpl(aconf);
 -    planner = createPlanner(config, executors);
 -    params = createPlanningParams(all, all, compacting, 2, 
CompactionKind.USER);
 -    plan = planner.makePlan(params);
 -
 -    // 17 files that do not meet the compaction ratio. When max files to 
compact is 15,
 -    // the plan should do 3 files then 15
 -    job = getOnlyElement(plan.getJobs());
 -    assertEquals(createCFs("F1", "1M", "F2", "2M", "F3", "4M"), 
job.getFiles());
 -    assertEquals(CompactionExecutorIdImpl.internalId(csid, "small"), 
job.getExecutor());
 -
 -    overrides.put(Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() + 
"cs1.planner.opts.maxOpen",
 -        "5");
 -    aconf = SiteConfiguration.empty().withOverrides(overrides).build();
 -    // 17 files that do not meet the compaction ratio. When max files to 
compact is 5 should do 5,
 -    // files then another 5, then the final 5.
 -    config = new ConfigurationImpl(aconf);
 -    planner = createPlanner(config, executors);
 -    params = createPlanningParams(all, all, compacting, 2, 
CompactionKind.USER);
 -    plan = planner.makePlan(params);
 -    job = getOnlyElement(plan.getJobs());
 -    assertEquals(createCFs("F4", "8M", "F3", "4M", "F2", "2M", "F1", "1M", 
"F5", "16M"),
 -        job.getFiles());
 -    assertEquals(CompactionExecutorIdImpl.internalId(csid, "small"), 
job.getExecutor());
 -  }
 -
 -  @Test
 -  public void testUserCompaction() {
 +  public void testUserCompaction() throws Exception {
-     var planner = createPlanner(true);
+     ConfigurationCopy aconf = new 
ConfigurationCopy(DefaultConfiguration.getInstance());
 -    aconf.set(Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() + 
"cs1.planner.opts.maxOpen", "15");
++    aconf.set(prefix + "cs1.planner.opts.maxOpen", "15");
+     ConfigurationImpl config = new ConfigurationImpl(aconf);
+ 
+     String executors = "[{'name':'small','type': 
'internal','maxSize':'32M','numThreads':1},"
+         + "{'name':'medium','type': 
'internal','maxSize':'128M','numThreads':2},"
+         + "{'name':'large','type': 
'internal','maxSize':'512M','numThreads':3},"
+         + "{'name':'huge','type': 'internal','numThreads':4}]";
+ 
+     var planner = createPlanner(config, executors);
      var all = createCFs("F1", "3M", "F2", "3M", "F3", "11M", "F4", "12M", 
"F5", "13M");
      var candidates = createCFs("F3", "11M", "F4", "12M", "F5", "13M");
      var compacting =
@@@ -224,8 -366,12 +248,12 @@@
    }
  
    @Test
 -  public void testMaxSize() {
 +  public void testMaxSize() throws Exception {
-     var planner = createPlanner(false);
+     String executors = "[{'name':'small','type': 
'internal','maxSize':'32M','numThreads':1},"
+         + "{'name':'medium','type': 
'internal','maxSize':'128M','numThreads':2},"
+         + "{'name':'large','type': 
'internal','maxSize':'512M','numThreads':3}]";
+ 
+     var planner = createPlanner(defaultConf, executors);
      var all = createCFs("F1", "128M", "F2", "129M", "F3", "130M", "F4", 
"131M", "F5", "132M");
      var params = createPlanningParams(all, all, Set.of(), 2, 
CompactionKind.SYSTEM);
      var plan = planner.makePlan(params);
@@@ -240,77 -386,9 +268,64 @@@
      plan = planner.makePlan(params);
      job = getOnlyElement(plan.getJobs());
      assertEquals(all, job.getFiles());
-     assertEquals(CompactionExecutorIdImpl.externalId("large"), 
job.getExecutor());
+     assertEquals(CompactionExecutorIdImpl.internalId(csid, "large"), 
job.getExecutor());
    }
  
 +  @Test
 +  public void testQueueCreation() throws Exception {
 +    DefaultCompactionPlanner planner = new DefaultCompactionPlanner();
-     Configuration conf = EasyMock.createMock(Configuration.class);
-     
EasyMock.expect(conf.isSet(EasyMock.anyString())).andReturn(false).anyTimes();
- 
-     ServiceEnvironment senv = EasyMock.createMock(ServiceEnvironment.class);
-     EasyMock.expect(senv.getConfiguration()).andReturn(conf).anyTimes();
-     EasyMock.replay(conf, senv);
 +
 +    String queues = "[{\"name\": \"small\", 
\"maxSize\":\"32M\"},{\"name\":\"midsize\"}]";
-     planner.init(getInitParamQueues(senv, queues));
++    planner.init(getInitParamQueues(defaultConf, queues));
 +
 +    var all = createCFs("F1", "1M", "F2", "1M", "F3", "1M", "F4", "1M");
 +    var params = createPlanningParams(all, all, Set.of(), 2, 
CompactionKind.SYSTEM);
 +    var plan = planner.makePlan(params);
 +
 +    var job = getOnlyElement(plan.getJobs());
 +    assertEquals(all, job.getFiles());
 +    assertEquals(CompactionExecutorIdImpl.externalId("small"), 
job.getExecutor());
 +
 +    all = createCFs("F1", "100M", "F2", "100M", "F3", "100M", "F4", "100M");
 +    params = createPlanningParams(all, all, Set.of(), 2, 
CompactionKind.SYSTEM);
 +    plan = planner.makePlan(params);
 +
 +    job = getOnlyElement(plan.getJobs());
 +    assertEquals(all, job.getFiles());
 +    assertEquals(CompactionExecutorIdImpl.externalId("midsize"), 
job.getExecutor());
 +  }
 +
 +  /**
 +   * Tests that additional fields in the JSON objects cause errors to be 
thrown.
 +   */
 +  @Test
 +  public void testErrorAdditionalConfigFields() {
-     Configuration conf = EasyMock.createMock(Configuration.class);
-     
EasyMock.expect(conf.isSet(EasyMock.anyString())).andReturn(false).anyTimes();
- 
-     ServiceEnvironment senv = EasyMock.createMock(ServiceEnvironment.class);
-     EasyMock.expect(senv.getConfiguration()).andReturn(conf).anyTimes();
-     EasyMock.replay(conf, senv);
- 
 +    DefaultCompactionPlanner QueuePlanner = new DefaultCompactionPlanner();
 +
 +    String queues =
 +        "[{\"name\":\"smallQueue\", \"maxSize\":\"32M\"}, 
{\"name\":\"largeQueue\", \"type\":\"internal\", \"foo\":\"bar\", 
\"queue\":\"broken\"}]";
 +
-     final InitParameters queueParams = getInitParamQueues(senv, queues);
++    final InitParameters queueParams = getInitParamQueues(defaultConf, 
queues);
 +    assertNotNull(queueParams);
 +    var e = assertThrows(JsonParseException.class, () -> 
QueuePlanner.init(queueParams),
 +        "Failed to throw error");
 +    assertTrue(e.getMessage().contains("[type, foo, queue]"),
 +        "Error message didn't contain '[type, foo, queue]'");
 +
-     String executors = getExecutors("'type': 
'internal','maxSize':'32M','numThreads':1",
-         "'type': 'internal','maxSize':'128M','numThreads':2, 'foo':'bar'",
-         "'type': 'internal','numThreads':1, 'unexpectedField':'foo'");
++    String executors = "[{'name':'small','type': 
'internal','maxSize':'32M','numThreads':1},"
++        + "{'name':'medium','type': 
'internal','maxSize':'128M','numThreads':2, 'foo':'bar'},"
++        + "{'name':'large','type': 'internal','numThreads':1, 
'unexpectedField':'foo'}]";
 +
-     final InitParameters execParams = getInitParams(senv, executors);
++    final InitParameters execParams = getInitParams(defaultConf, executors);
 +    assertNotNull(execParams);
 +
 +    DefaultCompactionPlanner ExecPlanner = new DefaultCompactionPlanner();
 +    var err = assertThrows(JsonParseException.class, () -> 
ExecPlanner.init(execParams),
 +        "Failed to throw error");
 +    assertTrue(err.getMessage().contains("Invalid fields: [foo]"),
 +        "Error message didn't contain '[foo]'");
 +  }
 +
    /**
     * Tests internal type executor with no numThreads set throws error
     */
@@@ -374,59 -434,6 +371,45 @@@
      assertTrue(e.getMessage().contains("queue"), "Error message didn't 
contain queue");
    }
  
 +  /**
 +   * Tests queue with missing name throws error
 +   */
 +  @Test
 +  public void testErrorQueueNoName() {
 +    DefaultCompactionPlanner planner = new DefaultCompactionPlanner();
-     Configuration conf = EasyMock.createMock(Configuration.class);
-     
EasyMock.expect(conf.isSet(EasyMock.anyString())).andReturn(false).anyTimes();
- 
-     ServiceEnvironment senv = EasyMock.createMock(ServiceEnvironment.class);
-     EasyMock.expect(senv.getConfiguration()).andReturn(conf).anyTimes();
-     EasyMock.replay(conf, senv);
- 
 +    String queues = "[{\"name\":\"smallQueue\", \"maxSize\":\"32M\"}, 
{\"maxSize\":\"120M\"}]";
 +
-     final InitParameters params = getInitParamQueues(senv, queues);
++    final InitParameters params = getInitParamQueues(defaultConf, queues);
 +    assertNotNull(params);
 +
 +    var e = assertThrows(NullPointerException.class, () -> 
planner.init(params),
 +        "Failed to throw error");
 +    assertEquals(e.getMessage(), "'name' must be specified", "Error message 
didn't contain 'name'");
 +  }
 +
 +  /**
 +   * Tests not having executors or queues throws errors
 +   */
 +  @Test
 +  public void testErrorNoExecutors() {
 +    DefaultCompactionPlanner planner = new DefaultCompactionPlanner();
-     Configuration conf = EasyMock.createMock(Configuration.class);
-     
EasyMock.expect(conf.isSet(EasyMock.anyString())).andReturn(false).anyTimes();
- 
-     ServiceEnvironment senv = EasyMock.createMock(ServiceEnvironment.class);
-     EasyMock.expect(senv.getConfiguration()).andReturn(conf).anyTimes();
-     EasyMock.replay(conf, senv);
- 
-     var execParams = getInitParams(senv, "");
++    var execParams = getInitParams(defaultConf, "");
 +    assertNotNull(execParams);
 +
 +    var e = assertThrows(IllegalStateException.class, () -> 
planner.init(execParams),
 +        "Failed to throw error");
 +    assertEquals("No defined executors or queues for this planner", 
e.getMessage(),
 +        "Error message was not equal");
 +
-     var params = getInitParamQueues(senv, "");
++    var params = getInitParamQueues(defaultConf, "");
 +    assertNotNull(params);
 +
 +    var e2 = assertThrows(IllegalStateException.class, () -> 
planner.init(params),
 +        "Failed to throw error");
 +    assertEquals("No defined executors or queues for this planner", 
e2.getMessage(),
 +        "Error message was not equal");
 +  }
 +
    /**
     * Tests executors can only have one without a max size.
     */
@@@ -471,92 -467,11 +443,12 @@@
    private CompactionJob createJob(CompactionKind kind, Set<CompactableFile> 
all,
        Set<CompactableFile> files) {
      return new CompactionPlanImpl.BuilderImpl(kind, all, all)
-         .addJob((short) all.size(), 
CompactionExecutorIdImpl.externalId("small"), files).build()
-         .getJobs().iterator().next();
+         .addJob((short) all.size(), CompactionExecutorIdImpl.internalId(csid, 
"small"), files)
+         .build().getJobs().iterator().next();
    }
  
 -  private static Set<CompactableFile> createCFs(String... namesSizePairs) {
 +  private static Set<CompactableFile> createCFs(String... namesSizePairs)
 +      throws URISyntaxException {
      Set<CompactableFile> files = new HashSet<>();
  
      for (int i = 0; i < namesSizePairs.length; i += 2) {
@@@ -645,80 -564,30 +537,47 @@@
      };
    }
  
-   private static DefaultCompactionPlanner createPlanner(boolean 
withHugeExecutor) {
-     DefaultCompactionPlanner planner = new DefaultCompactionPlanner();
-     Configuration conf = EasyMock.createMock(Configuration.class);
-     
EasyMock.expect(conf.isSet(EasyMock.anyString())).andReturn(false).anyTimes();
++  private static CompactionPlanner.InitParameters 
getInitParamQueues(Configuration conf,
++      String queues) {
 +
-     ServiceEnvironment senv = EasyMock.createMock(ServiceEnvironment.class);
-     EasyMock.expect(senv.getConfiguration()).andReturn(conf).anyTimes();
++    String maxOpen = conf.get(prefix + "cs1.planner.opts.maxOpen");
++    Map<String,String> options = new HashMap<>();
++    options.put("queues", queues.replaceAll("'", "\""));
 +
-     EasyMock.replay(conf, senv);
++    if (maxOpen != null) {
++      options.put("maxOpen", maxOpen);
++    }
 +
-     StringBuilder execBldr =
-         new StringBuilder("[{'name':'small','type': 
'internal','maxSize':'32M','numThreads':1},"
-             + "{'name':'medium','type': 
'internal','maxSize':'128M','numThreads':2},"
-             + "{'name':'large','type': 
'internal','maxSize':'512M','numThreads':3}");
++    ServiceEnvironment senv = EasyMock.createMock(ServiceEnvironment.class);
++    EasyMock.expect(senv.getConfiguration()).andReturn(conf).anyTimes();
++    EasyMock.replay(senv);
 +
-     if (withHugeExecutor) {
-       execBldr.append(",{'name':'huge','type': 'internal','numThreads':4}]");
-     } else {
-       execBldr.append("]");
-     }
++    return new CompactionPlannerInitParams(csid, prefix, options, senv);
++  }
 +
-     String executors = execBldr.toString().replaceAll("'", "\"");
+   private static CompactionPlanner.InitParameters getInitParams(Configuration 
conf,
+       String executors) {
  
-     planner.init(new CompactionPlanner.InitParameters() {
 -    String maxOpen =
 -        conf.get(Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() + 
"cs1.planner.opts.maxOpen");
++    String maxOpen = conf.get(prefix + "cs1.planner.opts.maxOpen");
+     Map<String,String> options = new HashMap<>();
+     options.put("executors", executors.replaceAll("'", "\""));
  
-       @Override
-       public ServiceEnvironment getServiceEnvironment() {
-         return senv;
-       }
+     if (maxOpen != null) {
+       options.put("maxOpen", maxOpen);
+     }
  
-       @Override
-       public Map<String,String> getOptions() {
-         return Map.of("executors", executors, "maxOpen", "15");
-       }
+     ServiceEnvironment senv = EasyMock.createMock(ServiceEnvironment.class);
+     EasyMock.expect(senv.getConfiguration()).andReturn(conf).anyTimes();
+     EasyMock.replay(senv);
  
-       @Override
-       public String getFullyQualifiedOption(String key) {
-         assertEquals("maxOpen", key);
-         return Property.COMPACTION_SERVICE_PREFIX.getKey() + 
"cs1.planner.opts." + key;
-       }
 -    return new CompactionPlannerInitParams(csid, options, senv);
++    return new CompactionPlannerInitParams(csid, prefix, options, senv);
+   }
  
-       @Override
-       public ExecutorManager getExecutorManager() {
-         return new ExecutorManager() {
-           @Override
-           public CompactionExecutorId createExecutor(String name, int 
threads) {
-             switch (name) {
-               case "small":
-                 assertEquals(1, threads);
-                 break;
-               case "medium":
-                 assertEquals(2, threads);
-                 break;
-               case "large":
-                 assertEquals(3, threads);
-                 break;
-               case "huge":
-                 assertEquals(4, threads);
-                 break;
-               default:
-                 fail("Unexpected name " + name);
-                 break;
-             }
-             return CompactionExecutorIdImpl.externalId(name);
-           }
- 
-           @Override
-           public CompactionExecutorId getExternalExecutor(String name) {
-             throw new UnsupportedOperationException();
-           }
-         };
-       }
-     });
+   private static DefaultCompactionPlanner createPlanner(Configuration conf, 
String executors) {
+     DefaultCompactionPlanner planner = new DefaultCompactionPlanner();
+     var initParams = getInitParams(conf, executors);
  
+     planner.init(initParams);
      return planner;
    }
  }

Reply via email to