This is an automated email from the ASF dual-hosted git repository.

ddanielr pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new 67fd967415 Fix bug in compaction props (#4092)
67fd967415 is described below

commit 67fd967415612ab0770b422a5b24430655275946
Author: Daniel Roberts <ddani...@gmail.com>
AuthorDate: Fri Dec 22 00:16:17 2023 -0500

    Fix bug in compaction props (#4092)
    
    * Add tests for maxOpen to override open.max
    
    Adds a test to ensure that setting the maxOpen option for a compaction
    service will override the deprecated `open.max` property if set.
    
    Condenses helper methods in planner tests
    
    Uses CompactionPlannerInitParams for tests instead of custom test code
    
    Modifies the ConfigurationCopy class to accept a parent conf to allow
      for testing user-set properties vs default values in test cases
    
    Adds a test case for the default compaction service used with the
      deprecated property
    
    Replaces the hardcoded maxOpen value with a reference to the default property value
    
    Modifies the getFullyQualifiedOption to return the correct path for the
    `<service>.planner.opts.` properties.
---
 .../accumulo/core/conf/ConfigurationCopy.java      |  24 +-
 .../spi/compaction/DefaultCompactionPlanner.java   |   3 +-
 .../compaction/CompactionPlannerInitParams.java    |   2 +-
 .../compaction/DefaultCompactionPlannerTest.java   | 361 +++++++++++----------
 4 files changed, 214 insertions(+), 176 deletions(-)

diff --git 
a/core/src/main/java/org/apache/accumulo/core/conf/ConfigurationCopy.java 
b/core/src/main/java/org/apache/accumulo/core/conf/ConfigurationCopy.java
index 14424c196a..e487e2840a 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/ConfigurationCopy.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/ConfigurationCopy.java
@@ -30,6 +30,8 @@ import java.util.stream.Stream;
  * configuration
  */
 public class ConfigurationCopy extends AccumuloConfiguration {
+
+  AccumuloConfiguration parent = null;
   private long updateCount = 0;
   final Map<String,String> copy = Collections.synchronizedMap(new HashMap<>());
 
@@ -42,6 +44,17 @@ public class ConfigurationCopy extends AccumuloConfiguration 
{
     this(config.entrySet());
   }
 
+  /**
+   * Creates a new configuration
+   *
+   * @param config - configuration property key/value pairs to copy
+   * @param parent - Higher level accumulo config to allow for property 
overrides
+   */
+  public ConfigurationCopy(Map<String,String> config, AccumuloConfiguration 
parent) {
+    this(config.entrySet());
+    this.parent = parent;
+  }
+
   /**
    * Creates a new configuration.
    *
@@ -69,11 +82,20 @@ public class ConfigurationCopy extends 
AccumuloConfiguration {
 
   @Override
   public String get(Property property) {
-    return copy.get(property.getKey());
+    if (copy.containsKey(property.getKey())) {
+      return copy.get(property.getKey());
+    } else if (parent != null) {
+      return parent.get(property);
+    } else {
+      return null;
+    }
   }
 
   @Override
   public void getProperties(Map<String,String> props, Predicate<String> 
filter) {
+    if (parent != null) {
+      parent.getProperties(props, filter);
+    }
     for (Entry<String,String> entry : copy.entrySet()) {
       if (filter.test(entry.getKey())) {
         props.put(entry.getKey(), entry.getValue());
diff --git 
a/core/src/main/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlanner.java
 
b/core/src/main/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlanner.java
index be8e25299b..9385806831 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlanner.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlanner.java
@@ -221,7 +221,8 @@ public class DefaultCompactionPlanner implements 
CompactionPlanner {
       this.maxFilesToCompact = 
Integer.parseInt(params.getServiceEnvironment().getConfiguration()
           .get(Property.TSERV_MAJC_THREAD_MAXOPEN.getKey()));
     } else {
-      this.maxFilesToCompact = 
Integer.parseInt(params.getOptions().getOrDefault("maxOpen", "10"));
+      this.maxFilesToCompact = 
Integer.parseInt(params.getOptions().getOrDefault("maxOpen",
+          
Property.TSERV_COMPACTION_SERVICE_DEFAULT_MAX_OPEN.getDefaultValue()));
     }
   }
 
diff --git 
a/core/src/main/java/org/apache/accumulo/core/util/compaction/CompactionPlannerInitParams.java
 
b/core/src/main/java/org/apache/accumulo/core/util/compaction/CompactionPlannerInitParams.java
index 0f79ce4df0..eb287153d6 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/util/compaction/CompactionPlannerInitParams.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/util/compaction/CompactionPlannerInitParams.java
@@ -60,7 +60,7 @@ public class CompactionPlannerInitParams implements 
CompactionPlanner.InitParame
 
   @Override
   public String getFullyQualifiedOption(String key) {
-    return Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() + serviceId + 
".opts." + key;
+    return Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() + serviceId + 
".planner.opts." + key;
   }
 
   @Override
diff --git 
a/core/src/test/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlannerTest.java
 
b/core/src/test/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlannerTest.java
index 568b57cd2d..c6262f1e02 100644
--- 
a/core/src/test/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlannerTest.java
+++ 
b/core/src/test/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlannerTest.java
@@ -22,27 +22,34 @@ import static 
com.google.common.collect.MoreCollectors.onlyElement;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.junit.jupiter.api.Assertions.fail;
 
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
 import org.apache.accumulo.core.client.admin.compaction.CompactableFile;
+import org.apache.accumulo.core.conf.ConfigurationCopy;
 import org.apache.accumulo.core.conf.ConfigurationTypeHelper;
+import org.apache.accumulo.core.conf.DefaultConfiguration;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.TableId;
 import org.apache.accumulo.core.spi.common.ServiceEnvironment;
 import org.apache.accumulo.core.spi.common.ServiceEnvironment.Configuration;
 import org.apache.accumulo.core.spi.compaction.CompactionPlan.Builder;
+import org.apache.accumulo.core.util.ConfigurationImpl;
 import org.apache.accumulo.core.util.compaction.CompactionExecutorIdImpl;
 import org.apache.accumulo.core.util.compaction.CompactionPlanImpl;
+import org.apache.accumulo.core.util.compaction.CompactionPlannerInitParams;
+import org.apache.accumulo.core.util.compaction.CompactionServicesConfig;
 import org.easymock.EasyMock;
 import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class DefaultCompactionPlannerTest {
 
@@ -50,6 +57,12 @@ public class DefaultCompactionPlannerTest {
     return c.stream().collect(onlyElement());
   }
 
+  private static final Configuration defaultConf =
+      new ConfigurationImpl(DefaultConfiguration.getInstance());
+  private static final CompactionServiceId csid = 
CompactionServiceId.of("cs1");
+
+  private static final Logger log = 
LoggerFactory.getLogger(DefaultCompactionPlannerTest.class);
+
   @Test
   public void testFindFilesToCompact() {
 
@@ -130,7 +143,13 @@ public class DefaultCompactionPlannerTest {
 
   @Test
   public void testRunningCompaction() {
-    var planner = createPlanner(true);
+    String executors = "[{'name':'small','type': 
'internal','maxSize':'32M','numThreads':1},"
+        + "{'name':'medium','type': 
'internal','maxSize':'128M','numThreads':2},"
+        + "{'name':'large','type': 
'internal','maxSize':'512M','numThreads':3},"
+        + "{'name':'huge','type': 'internal','numThreads':4}]";
+
+    var planner = createPlanner(defaultConf, executors);
+
     var all = createCFs("F1", "3M", "F2", "3M", "F3", "11M", "F4", "12M", 
"F5", "13M");
     var candidates = createCFs("F3", "11M", "F4", "12M", "F5", "13M");
     var compacting =
@@ -152,12 +171,126 @@ public class DefaultCompactionPlannerTest {
     // planner should compact.
     var job = getOnlyElement(plan.getJobs());
     assertEquals(candidates, job.getFiles());
-    assertEquals(CompactionExecutorIdImpl.externalId("medium"), 
job.getExecutor());
+    assertEquals(CompactionExecutorIdImpl.internalId(csid, "medium"), 
job.getExecutor());
+  }
+
+  /**
+   * Tests that the maxOpen property overrides the deprecated open.max 
property with the default
+   * service
+   */
+  @Test
+  @SuppressWarnings("removal")
+  public void testOverrideMaxOpenDefaultService() {
+    // Set old property and use that for max open files.
+    ConfigurationCopy aconf = new ConfigurationCopy(Map.of(), 
DefaultConfiguration.getInstance());
+    aconf.set(Property.TSERV_MAJC_THREAD_MAXOPEN.getKey(), "17");
+    ConfigurationImpl config = new ConfigurationImpl(aconf);
+
+    ServiceEnvironment senv = EasyMock.createMock(ServiceEnvironment.class);
+    EasyMock.expect(senv.getConfiguration()).andReturn(config).anyTimes();
+    EasyMock.replay(senv);
+
+    // Use the CompactionServicesConfig to create options based on default 
property values
+    var compactionServices = new CompactionServicesConfig(aconf, log::warn);
+    var options = compactionServices.getOptions().get("default");
+
+    var initParams =
+        new CompactionPlannerInitParams(CompactionServiceId.of("default"), 
options, senv);
+
+    var planner = new DefaultCompactionPlanner();
+    planner.init(initParams);
+
+    var all = createCFs("F1", "10M", "F2", "11M", "F3", "12M", "F4", "13M", 
"F5", "14M", "F6",
+        "15M", "F7", "16M", "F8", "17M", "F9", "18M", "FA", "19M", "FB", 
"20M", "FC", "21M", "FD",
+        "22M", "FE", "23M", "FF", "24M", "FG", "25M", "FH", "26M");
+    Set<CompactionJob> compacting = Set.of();
+    var params = createPlanningParams(all, all, compacting, 2, 
CompactionKind.USER);
+    var plan = planner.makePlan(params);
+    var job = getOnlyElement(plan.getJobs());
+    assertEquals(all, job.getFiles());
+    
assertEquals(CompactionExecutorIdImpl.internalId(CompactionServiceId.of("default"),
 "large"),
+        job.getExecutor());
+
+    aconf.set(Property.TSERV_MAJC_THREAD_MAXOPEN.getKey(), "5");
+    // Create new initParams so executor IDs can be reused
+    initParams = new 
CompactionPlannerInitParams(CompactionServiceId.of("default"), options, senv);
+    planner = new DefaultCompactionPlanner();
+    planner.init(initParams);
+
+    params = createPlanningParams(all, all, compacting, 2, 
CompactionKind.USER);
+    plan = planner.makePlan(params);
+    job = getOnlyElement(plan.getJobs());
+    assertEquals(createCFs("F1", "10M", "F2", "11M", "F3", "12M", "F4", "13M", 
"F5", "14M"),
+        job.getFiles());
+    
assertEquals(CompactionExecutorIdImpl.internalId(CompactionServiceId.of("default"),
 "medium"),
+        job.getExecutor());
+  }
+
+  /**
+   * Tests that the maxOpen property overrides the deprecated open.max property
+   */
+  @Test
+  @SuppressWarnings("removal")
+  public void testOverrideMaxOpen() {
+    ConfigurationCopy aconf = new ConfigurationCopy(Map.of(), 
DefaultConfiguration.getInstance());
+    // Set old property and use that for max open files.
+    aconf.set(Property.TSERV_MAJC_THREAD_MAXOPEN.getKey(), "17");
+    ConfigurationImpl config = new ConfigurationImpl(aconf);
+
+    String executors = "[{'name':'small','type': 
'internal','maxSize':'32M','numThreads':1},"
+        + "{'name':'medium','type': 
'internal','maxSize':'128M','numThreads':2},"
+        + "{'name':'large','type': 
'internal','maxSize':'512M','numThreads':3},"
+        + "{'name':'huge','type': 'internal','numThreads':4}]";
+
+    var planner = createPlanner(config, executors);
+    var all = createCFs("F1", "1M", "F2", "2M", "F3", "4M", "F4", "8M", "F5", 
"16M", "F6", "32M",
+        "F7", "64M", "F8", "128M", "F9", "256M", "FA", "512M", "FB", "1G", 
"FC", "2G", "FD", "4G",
+        "FE", "8G", "FF", "16G", "FG", "32G", "FH", "64G");
+    Set<CompactionJob> compacting = Set.of();
+    var params = createPlanningParams(all, all, compacting, 2, 
CompactionKind.USER);
+    var plan = planner.makePlan(params);
+    var job = getOnlyElement(plan.getJobs());
+    assertEquals(all, job.getFiles());
+    assertEquals(CompactionExecutorIdImpl.internalId(csid, "huge"), 
job.getExecutor());
+
+    // Set new property that overrides the old property.
+    aconf.set(Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() + 
"cs1.planner.opts.maxOpen", "15");
+    config = new ConfigurationImpl(aconf);
+    planner = createPlanner(config, executors);
+    params = createPlanningParams(all, all, compacting, 2, 
CompactionKind.USER);
+    plan = planner.makePlan(params);
+
+    // 17 files that do not meet the compaction ratio. When max files to 
compact is 15,
+    // the plan should do 3 files then 15
+    job = getOnlyElement(plan.getJobs());
+    assertEquals(createCFs("F1", "1M", "F2", "2M", "F3", "4M"), 
job.getFiles());
+    assertEquals(CompactionExecutorIdImpl.internalId(csid, "small"), 
job.getExecutor());
+
+    aconf.set(Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() + 
"cs1.planner.opts.maxOpen", "5");
+    // 17 files that do not meet the compaction ratio. When max files to 
compact is 5 should do 5,
+    // files then another 5, then the final 5.
+    config = new ConfigurationImpl(aconf);
+    planner = createPlanner(config, executors);
+    params = createPlanningParams(all, all, compacting, 2, 
CompactionKind.USER);
+    plan = planner.makePlan(params);
+    job = getOnlyElement(plan.getJobs());
+    assertEquals(createCFs("F4", "8M", "F3", "4M", "F2", "2M", "F1", "1M", 
"F5", "16M"),
+        job.getFiles());
+    assertEquals(CompactionExecutorIdImpl.internalId(csid, "small"), 
job.getExecutor());
   }
 
   @Test
   public void testUserCompaction() {
-    var planner = createPlanner(true);
+    ConfigurationCopy aconf = new 
ConfigurationCopy(DefaultConfiguration.getInstance());
+    aconf.set(Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() + 
"cs1.planner.opts.maxOpen", "15");
+    ConfigurationImpl config = new ConfigurationImpl(aconf);
+
+    String executors = "[{'name':'small','type': 
'internal','maxSize':'32M','numThreads':1},"
+        + "{'name':'medium','type': 
'internal','maxSize':'128M','numThreads':2},"
+        + "{'name':'large','type': 
'internal','maxSize':'512M','numThreads':3},"
+        + "{'name':'huge','type': 'internal','numThreads':4}]";
+
+    var planner = createPlanner(config, executors);
     var all = createCFs("F1", "3M", "F2", "3M", "F3", "11M", "F4", "12M", 
"F5", "13M");
     var candidates = createCFs("F3", "11M", "F4", "12M", "F5", "13M");
     var compacting =
@@ -168,7 +301,7 @@ public class DefaultCompactionPlannerTest {
     // a running non-user compaction should not prevent a user compaction
     var job = getOnlyElement(plan.getJobs());
     assertEquals(candidates, job.getFiles());
-    assertEquals(CompactionExecutorIdImpl.externalId("medium"), 
job.getExecutor());
+    assertEquals(CompactionExecutorIdImpl.internalId(csid, "medium"), 
job.getExecutor());
 
     // should only run one user compaction at a time
     compacting = Set.of(createJob(CompactionKind.USER, all, createCFs("F1", 
"3M", "F2", "3M")));
@@ -186,7 +319,7 @@ public class DefaultCompactionPlannerTest {
     plan = planner.makePlan(params);
     job = getOnlyElement(plan.getJobs());
     assertEquals(createCFs("F1", "1M", "F2", "2M", "F3", "4M"), 
job.getFiles());
-    assertEquals(CompactionExecutorIdImpl.externalId("small"), 
job.getExecutor());
+    assertEquals(CompactionExecutorIdImpl.internalId(csid, "small"), 
job.getExecutor());
 
     // should compact all 15
     all = createCFs("FI", "7M", "F4", "8M", "F5", "16M", "F6", "32M", "F7", 
"64M", "F8", "128M",
@@ -196,7 +329,7 @@ public class DefaultCompactionPlannerTest {
     plan = planner.makePlan(params);
     job = getOnlyElement(plan.getJobs());
     assertEquals(all, job.getFiles());
-    assertEquals(CompactionExecutorIdImpl.externalId("huge"), 
job.getExecutor());
+    assertEquals(CompactionExecutorIdImpl.internalId(csid, "huge"), 
job.getExecutor());
 
     // For user compaction, can compact a subset that meets the compaction 
ratio if there is also a
     // larger set of files the meets the compaction ratio
@@ -206,7 +339,7 @@ public class DefaultCompactionPlannerTest {
     plan = planner.makePlan(params);
     job = getOnlyElement(plan.getJobs());
     assertEquals(createCFs("F1", "3M", "F2", "4M", "F3", "5M", "F4", "6M"), 
job.getFiles());
-    assertEquals(CompactionExecutorIdImpl.externalId("small"), 
job.getExecutor());
+    assertEquals(CompactionExecutorIdImpl.internalId(csid, "small"), 
job.getExecutor());
 
     // There is a subset of small files that meets the compaction ratio, but 
the larger set does not
     // so compact everything to avoid doing more than logarithmic work
@@ -215,13 +348,17 @@ public class DefaultCompactionPlannerTest {
     plan = planner.makePlan(params);
     job = getOnlyElement(plan.getJobs());
     assertEquals(all, job.getFiles());
-    assertEquals(CompactionExecutorIdImpl.externalId("medium"), 
job.getExecutor());
+    assertEquals(CompactionExecutorIdImpl.internalId(csid, "medium"), 
job.getExecutor());
 
   }
 
   @Test
   public void testMaxSize() {
-    var planner = createPlanner(false);
+    String executors = "[{'name':'small','type': 
'internal','maxSize':'32M','numThreads':1},"
+        + "{'name':'medium','type': 
'internal','maxSize':'128M','numThreads':2},"
+        + "{'name':'large','type': 
'internal','maxSize':'512M','numThreads':3}]";
+
+    var planner = createPlanner(defaultConf, executors);
     var all = createCFs("F1", "128M", "F2", "129M", "F3", "130M", "F4", 
"131M", "F5", "132M");
     var params = createPlanningParams(all, all, Set.of(), 2, 
CompactionKind.SYSTEM);
     var plan = planner.makePlan(params);
@@ -229,14 +366,14 @@ public class DefaultCompactionPlannerTest {
     // should only compact files less than max size
     var job = getOnlyElement(plan.getJobs());
     assertEquals(createCFs("F1", "128M", "F2", "129M", "F3", "130M"), 
job.getFiles());
-    assertEquals(CompactionExecutorIdImpl.externalId("large"), 
job.getExecutor());
+    assertEquals(CompactionExecutorIdImpl.internalId(csid, "large"), 
job.getExecutor());
 
     // user compaction can exceed the max size
     params = createPlanningParams(all, all, Set.of(), 2, CompactionKind.USER);
     plan = planner.makePlan(params);
     job = getOnlyElement(plan.getJobs());
     assertEquals(all, job.getFiles());
-    assertEquals(CompactionExecutorIdImpl.externalId("large"), 
job.getExecutor());
+    assertEquals(CompactionExecutorIdImpl.internalId(csid, "large"), 
job.getExecutor());
   }
 
   /**
@@ -245,18 +382,12 @@ public class DefaultCompactionPlannerTest {
   @Test
   public void testErrorInternalTypeNoNumThreads() {
     DefaultCompactionPlanner planner = new DefaultCompactionPlanner();
-    Configuration conf = EasyMock.createMock(Configuration.class);
-    
EasyMock.expect(conf.isSet(EasyMock.anyString())).andReturn(false).anyTimes();
+    String executors = "[{'name':'small','type':'internal','maxSize':'32M'},"
+        + 
"{'name':'medium','type':'internal','maxSize':'128M','numThreads':2},"
+        + 
"{'name':'large','type':'internal','maxSize':'512M','numThreads':3}]";
 
-    ServiceEnvironment senv = EasyMock.createMock(ServiceEnvironment.class);
-    EasyMock.expect(senv.getConfiguration()).andReturn(conf).anyTimes();
-    EasyMock.replay(conf, senv);
-
-    String executors = getExecutors("'type': 'internal','maxSize':'32M'",
-        "'type': 'internal','maxSize':'128M','numThreads':2",
-        "'type': 'internal','maxSize':'512M','numThreads':3");
     var e = assertThrows(NullPointerException.class,
-        () -> planner.init(getInitParams(senv, executors)), "Failed to throw 
error");
+        () -> planner.init(getInitParams(defaultConf, executors)), "Failed to 
throw error");
     assertTrue(e.getMessage().contains("numThreads"), "Error message didn't 
contain numThreads");
   }
 
@@ -266,18 +397,12 @@ public class DefaultCompactionPlannerTest {
   @Test
   public void testErrorExternalTypeNumThreads() {
     DefaultCompactionPlanner planner = new DefaultCompactionPlanner();
-    Configuration conf = EasyMock.createMock(Configuration.class);
-    
EasyMock.expect(conf.isSet(EasyMock.anyString())).andReturn(false).anyTimes();
-
-    ServiceEnvironment senv = EasyMock.createMock(ServiceEnvironment.class);
-    EasyMock.expect(senv.getConfiguration()).andReturn(conf).anyTimes();
-    EasyMock.replay(conf, senv);
+    String executors = "[{'name':'small','type':'internal','maxSize':'32M', 
'numThreads':1},"
+        + 
"{'name':'medium','type':'internal','maxSize':'128M','numThreads':2},"
+        + 
"{'name':'large','type':'external','maxSize':'512M','numThreads':3}]";
 
-    String executors = getExecutors("'type': 
'internal','maxSize':'32M','numThreads':1",
-        "'type': 'internal','maxSize':'128M','numThreads':2",
-        "'type': 'external','maxSize':'512M','numThreads':3");
     var e = assertThrows(IllegalArgumentException.class,
-        () -> planner.init(getInitParams(senv, executors)), "Failed to throw 
error");
+        () -> planner.init(getInitParams(defaultConf, executors)), "Failed to 
throw error");
     assertTrue(e.getMessage().contains("numThreads"), "Error message didn't 
contain numThreads");
   }
 
@@ -287,18 +412,12 @@ public class DefaultCompactionPlannerTest {
   @Test
   public void testErrorExternalNoQueue() {
     DefaultCompactionPlanner planner = new DefaultCompactionPlanner();
-    Configuration conf = EasyMock.createMock(Configuration.class);
-    
EasyMock.expect(conf.isSet(EasyMock.anyString())).andReturn(false).anyTimes();
+    String executors = "[{'name':'small','type':'internal','maxSize':'32M', 
'numThreads':1},"
+        + 
"{'name':'medium','type':'internal','maxSize':'128M','numThreads':2},"
+        + "{'name':'large','type':'external','maxSize':'512M'}]";
 
-    ServiceEnvironment senv = EasyMock.createMock(ServiceEnvironment.class);
-    EasyMock.expect(senv.getConfiguration()).andReturn(conf).anyTimes();
-    EasyMock.replay(conf, senv);
-
-    String executors = getExecutors("'type': 
'internal','maxSize':'32M','numThreads':1",
-        "'type': 'internal','maxSize':'128M','numThreads':2",
-        "'type': 'external','maxSize':'512M'");
     var e = assertThrows(NullPointerException.class,
-        () -> planner.init(getInitParams(senv, executors)), "Failed to throw 
error");
+        () -> planner.init(getInitParams(defaultConf, executors)), "Failed to 
throw error");
     assertTrue(e.getMessage().contains("queue"), "Error message didn't contain 
queue");
   }
 
@@ -308,17 +427,12 @@ public class DefaultCompactionPlannerTest {
   @Test
   public void testErrorOnlyOneMaxSize() {
     DefaultCompactionPlanner planner = new DefaultCompactionPlanner();
-    Configuration conf = EasyMock.createMock(Configuration.class);
-    
EasyMock.expect(conf.isSet(EasyMock.anyString())).andReturn(false).anyTimes();
+    String executors = "[{'name':'small','type':'internal','maxSize':'32M', 
'numThreads':1},"
+        + "{'name':'medium','type':'internal','numThreads':2},"
+        + "{'name':'large','type':'external','queue':'q1'}]";
 
-    ServiceEnvironment senv = EasyMock.createMock(ServiceEnvironment.class);
-    EasyMock.expect(senv.getConfiguration()).andReturn(conf).anyTimes();
-    EasyMock.replay(conf, senv);
-
-    String executors = getExecutors("'type': 
'internal','maxSize':'32M','numThreads':1",
-        "'type': 'internal','numThreads':2", "'type': 
'external','queue':'q1'");
     var e = assertThrows(IllegalArgumentException.class,
-        () -> planner.init(getInitParams(senv, executors)), "Failed to throw 
error");
+        () -> planner.init(getInitParams(defaultConf, executors)), "Failed to 
throw error");
     assertTrue(e.getMessage().contains("maxSize"), "Error message didn't 
contain maxSize");
   }
 
@@ -328,69 +442,20 @@ public class DefaultCompactionPlannerTest {
   @Test
   public void testErrorDuplicateMaxSize() {
     DefaultCompactionPlanner planner = new DefaultCompactionPlanner();
-    Configuration conf = EasyMock.createMock(Configuration.class);
-    
EasyMock.expect(conf.isSet(EasyMock.anyString())).andReturn(false).anyTimes();
+    String executors = "[{'name':'small','type':'internal','maxSize':'32M', 
'numThreads':1},"
+        + 
"{'name':'medium','type':'internal','maxSize':'128M','numThreads':2},"
+        + "{'name':'large','type':'external','maxSize':'128M','queue':'q1'}]";
 
-    ServiceEnvironment senv = EasyMock.createMock(ServiceEnvironment.class);
-    EasyMock.expect(senv.getConfiguration()).andReturn(conf).anyTimes();
-    EasyMock.replay(conf, senv);
-
-    String executors = getExecutors("'type': 
'internal','maxSize':'32M','numThreads':1",
-        "'type': 'internal','maxSize':'128M','numThreads':2",
-        "'type': 'external','maxSize':'128M','queue':'q1'");
     var e = assertThrows(IllegalArgumentException.class,
-        () -> planner.init(getInitParams(senv, executors)), "Failed to throw 
error");
+        () -> planner.init(getInitParams(defaultConf, executors)), "Failed to 
throw error");
     assertTrue(e.getMessage().contains("maxSize"), "Error message didn't 
contain maxSize");
   }
 
-  private CompactionPlanner.InitParameters getInitParams(ServiceEnvironment 
senv,
-      String executors) {
-    return new CompactionPlanner.InitParameters() {
-
-      @Override
-      public ServiceEnvironment getServiceEnvironment() {
-        return senv;
-      }
-
-      @Override
-      public Map<String,String> getOptions() {
-        return Map.of("executors", executors, "maxOpen", "15");
-      }
-
-      @Override
-      public String getFullyQualifiedOption(String key) {
-        assertEquals("maxOpen", key);
-        return Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() + 
"cs1.planner.opts." + key;
-      }
-
-      @Override
-      public ExecutorManager getExecutorManager() {
-        return new ExecutorManager() {
-          @Override
-          public CompactionExecutorId createExecutor(String name, int threads) 
{
-            return CompactionExecutorIdImpl.externalId(name);
-          }
-
-          @Override
-          public CompactionExecutorId getExternalExecutor(String name) {
-            return CompactionExecutorIdImpl.externalId(name);
-          }
-        };
-      }
-    };
-  }
-
-  private String getExecutors(String small, String medium, String large) {
-    String execBldr = "[{'name':'small'," + small + "},{'name':'medium'," + 
medium + "},"
-        + "{'name':'large'," + large + "}]";
-    return execBldr.replaceAll("'", "\"");
-  }
-
   private CompactionJob createJob(CompactionKind kind, Set<CompactableFile> 
all,
       Set<CompactableFile> files) {
     return new CompactionPlanImpl.BuilderImpl(kind, all, all)
-        .addJob((short) all.size(), 
CompactionExecutorIdImpl.externalId("small"), files).build()
-        .getJobs().iterator().next();
+        .addJob((short) all.size(), CompactionExecutorIdImpl.internalId(csid, 
"small"), files)
+        .build().getJobs().iterator().next();
   }
 
   private static Set<CompactableFile> createCFs(String... namesSizePairs) {
@@ -486,80 +551,30 @@ public class DefaultCompactionPlannerTest {
     };
   }
 
-  private static DefaultCompactionPlanner createPlanner(boolean 
withHugeExecutor) {
-    DefaultCompactionPlanner planner = new DefaultCompactionPlanner();
-    Configuration conf = EasyMock.createMock(Configuration.class);
-    
EasyMock.expect(conf.isSet(EasyMock.anyString())).andReturn(false).anyTimes();
-
-    ServiceEnvironment senv = EasyMock.createMock(ServiceEnvironment.class);
-    EasyMock.expect(senv.getConfiguration()).andReturn(conf).anyTimes();
-
-    EasyMock.replay(conf, senv);
+  private static CompactionPlanner.InitParameters getInitParams(Configuration 
conf,
+      String executors) {
 
-    StringBuilder execBldr =
-        new StringBuilder("[{'name':'small','type': 
'internal','maxSize':'32M','numThreads':1},"
-            + "{'name':'medium','type': 
'internal','maxSize':'128M','numThreads':2},"
-            + "{'name':'large','type': 
'internal','maxSize':'512M','numThreads':3}");
+    String maxOpen =
+        conf.get(Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() + 
"cs1.planner.opts.maxOpen");
+    Map<String,String> options = new HashMap<>();
+    options.put("executors", executors.replaceAll("'", "\""));
 
-    if (withHugeExecutor) {
-      execBldr.append(",{'name':'huge','type': 'internal','numThreads':4}]");
-    } else {
-      execBldr.append("]");
+    if (maxOpen != null) {
+      options.put("maxOpen", maxOpen);
     }
 
-    String executors = execBldr.toString().replaceAll("'", "\"");
-
-    planner.init(new CompactionPlanner.InitParameters() {
-
-      @Override
-      public ServiceEnvironment getServiceEnvironment() {
-        return senv;
-      }
-
-      @Override
-      public Map<String,String> getOptions() {
-        return Map.of("executors", executors, "maxOpen", "15");
-      }
+    ServiceEnvironment senv = EasyMock.createMock(ServiceEnvironment.class);
+    EasyMock.expect(senv.getConfiguration()).andReturn(conf).anyTimes();
+    EasyMock.replay(senv);
 
-      @Override
-      public String getFullyQualifiedOption(String key) {
-        assertEquals("maxOpen", key);
-        return Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() + 
"cs1.planner.opts." + key;
-      }
+    return new CompactionPlannerInitParams(csid, options, senv);
+  }
 
-      @Override
-      public ExecutorManager getExecutorManager() {
-        return new ExecutorManager() {
-          @Override
-          public CompactionExecutorId createExecutor(String name, int threads) 
{
-            switch (name) {
-              case "small":
-                assertEquals(1, threads);
-                break;
-              case "medium":
-                assertEquals(2, threads);
-                break;
-              case "large":
-                assertEquals(3, threads);
-                break;
-              case "huge":
-                assertEquals(4, threads);
-                break;
-              default:
-                fail("Unexpected name " + name);
-                break;
-            }
-            return CompactionExecutorIdImpl.externalId(name);
-          }
-
-          @Override
-          public CompactionExecutorId getExternalExecutor(String name) {
-            throw new UnsupportedOperationException();
-          }
-        };
-      }
-    });
+  private static DefaultCompactionPlanner createPlanner(Configuration conf, 
String executors) {
+    DefaultCompactionPlanner planner = new DefaultCompactionPlanner();
+    var initParams = getInitParams(conf, executors);
 
+    planner.init(initParams);
     return planner;
   }
 }

Reply via email to