This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new caedeb66cd7 Add API to update compaction engine (#16803)
caedeb66cd7 is described below

commit caedeb66cd77ac143980bbe78924303f49de0b06
Author: Kashif Faraz <[email protected]>
AuthorDate: Fri Jul 26 20:44:51 2024 -0700

    Add API to update compaction engine (#16803)
    
    Changes:
    - Add API `/druid/coordinator/v1/config/compaction/global` to update the
      cluster-level compaction config
    - Add class `CompactionConfigUpdateRequest`
    - Fix bug in `CoordinatorCompactionConfig` which caused the compaction engine
      to not be persisted. Use JSON field name `engine` instead of
      `compactionEngine` because JSON field names must align with the getter name.
    - Update MSQ validation error messages
    - Complete overhaul of `CoordinatorCompactionConfigsResourceTest` to remove
      unnecessary mocking and add more meaningful tests.
    - Add `TuningConfigBuilder` to easily build tuning configs for tests.
    - Add `DataSourceCompactionConfigBuilder`
---
 .../NewestSegmentFirstPolicyBenchmark.java         |  21 +-
 .../druid/msq/indexing/MSQCompactionRunner.java    |   2 +-
 .../msq/indexing/MSQCompactionRunnerTest.java      |   2 +-
 .../common/task/NativeCompactionRunner.java        |   2 +-
 .../indexing/ClientCompactionRunnerInfo.java       |  42 +-
 .../CompactionConfigValidationResult.java          |  15 +-
 .../coordinator/CoordinatorCompactionConfig.java   |  17 +-
 .../coordinator/DataSourceCompactionConfig.java    |   7 +
 .../config/DataSourceCompactionConfigBuilder.java  | 155 ++++
 .../server/http/CompactionConfigUpdateRequest.java |  82 ++
 .../http/CoordinatorCompactionConfigsResource.java |  59 +-
 .../indexing/ClientCompactionRunnerInfoTest.java   |   9 +-
 .../CoordinatorCompactionConfigTest.java}          |  28 +-
 .../DataSourceCompactionConfigTest.java            | 372 +++------
 .../CoordinatorCompactionConfigsResourceTest.java  | 849 +++++++++++----------
 15 files changed, 898 insertions(+), 764 deletions(-)

diff --git 
a/benchmarks/src/test/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java
 
b/benchmarks/src/test/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java
index 98c27c4b2b8..87e92ab6fb1 100644
--- 
a/benchmarks/src/test/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java
+++ 
b/benchmarks/src/test/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java
@@ -92,21 +92,12 @@ public class NewestSegmentFirstPolicyBenchmark
       final String dataSource = DATA_SOURCE_PREFIX + i;
       compactionConfigs.put(
           dataSource,
-          new DataSourceCompactionConfig(
-              dataSource,
-              0,
-              inputSegmentSizeBytes,
-              null,
-              null,
-              null,
-              null,
-              null,
-              null,
-              null,
-              null,
-              null,
-              null
-          )
+          DataSourceCompactionConfig
+              .builder()
+              .forDataSource(dataSource)
+              .withTaskPriority(0)
+              .withInputSegmentSizeBytes(inputSegmentSizeBytes)
+              .build()
       );
     }
 
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java
index fb25097e980..28ec422e2ac 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java
@@ -143,7 +143,7 @@ public class MSQCompactionRunner implements CompactionRunner
     return validationResults.stream()
                             .filter(result -> !result.isValid())
                             .findFirst()
-                            .orElse(new CompactionConfigValidationResult(true, 
null));
+                            
.orElse(CompactionConfigValidationResult.success());
   }
 
   @Override
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java
index b95243f7783..3f73a9b513e 100644
--- 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java
@@ -211,7 +211,7 @@ public class MSQCompactionRunnerTest
     CompactionConfigValidationResult validationResult = 
MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask);
     Assert.assertFalse(validationResult.isValid());
     Assert.assertEquals(
-        "Different name[sum_added] and fieldName(s)[[added]] for aggregator 
unsupported for MSQ engine.",
+        "MSQ: Different name[sum_added] and fieldName(s)[[added]] for 
aggregator",
         validationResult.getReason()
     );
   }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NativeCompactionRunner.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NativeCompactionRunner.java
index 30761b674e5..2074d14f0f9 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NativeCompactionRunner.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NativeCompactionRunner.java
@@ -88,7 +88,7 @@ public class NativeCompactionRunner implements 
CompactionRunner
       CompactionTask compactionTask
   )
   {
-    return new CompactionConfigValidationResult(true, null);
+    return CompactionConfigValidationResult.success();
   }
 
   /**
diff --git 
a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfo.java
 
b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfo.java
index ed9e22dfaa2..74504184608 100644
--- 
a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfo.java
+++ 
b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfo.java
@@ -92,7 +92,7 @@ public class ClientCompactionRunnerInfo
   {
     CompactionEngine compactionEngine = newConfig.getEngine() == null ? 
defaultCompactionEngine : newConfig.getEngine();
     if (compactionEngine == CompactionEngine.NATIVE) {
-      return new CompactionConfigValidationResult(true, null);
+      return CompactionConfigValidationResult.success();
     } else {
       return compactionConfigSupportedByMSQEngine(newConfig);
     }
@@ -125,7 +125,7 @@ public class ClientCompactionRunnerInfo
     return validationResults.stream()
                             .filter(result -> !result.isValid())
                             .findFirst()
-                            .orElse(new CompactionConfigValidationResult(true, 
null));
+                            
.orElse(CompactionConfigValidationResult.success());
   }
 
   /**
@@ -135,22 +135,19 @@ public class ClientCompactionRunnerInfo
   {
     if (!(partitionsSpec instanceof DimensionRangePartitionsSpec
           || partitionsSpec instanceof DynamicPartitionsSpec)) {
-      return new CompactionConfigValidationResult(
-          false,
-          "Invalid partitionsSpec type[%s] for MSQ engine. Type must be either 
'dynamic' or 'range'.",
+      return CompactionConfigValidationResult.failure(
+          "MSQ: Invalid partitioning type[%s]. Must be either 'dynamic' or 
'range'",
           partitionsSpec.getClass().getSimpleName()
 
       );
     }
     if (partitionsSpec instanceof DynamicPartitionsSpec
         && ((DynamicPartitionsSpec) partitionsSpec).getMaxTotalRows() != null) 
{
-      return new CompactionConfigValidationResult(
-          false,
-          "maxTotalRows[%d] in DynamicPartitionsSpec not supported for MSQ 
engine.",
-          ((DynamicPartitionsSpec) partitionsSpec).getMaxTotalRows()
+      return CompactionConfigValidationResult.failure(
+          "MSQ: 'maxTotalRows' not supported with 'dynamic' partitioning"
       );
     }
-    return new CompactionConfigValidationResult(true, null);
+    return CompactionConfigValidationResult.success();
   }
 
   /**
@@ -162,12 +159,11 @@ public class ClientCompactionRunnerInfo
   )
   {
     if (metricsSpec != null && isRollup != null && !isRollup) {
-      return new CompactionConfigValidationResult(
-          false,
-          "rollup in granularitySpec must be set to True if metricsSpec is 
specifed for MSQ engine."
+      return CompactionConfigValidationResult.failure(
+          "MSQ: 'granularitySpec.rollup' must be true if 'metricsSpec' is 
specified"
       );
     }
-    return new CompactionConfigValidationResult(true, null);
+    return CompactionConfigValidationResult.success();
   }
 
   /**
@@ -179,14 +175,13 @@ public class ClientCompactionRunnerInfo
       int maxNumTasks = QueryContext.of(context)
                                     
.getInt(ClientMSQContext.CTX_MAX_NUM_TASKS, 
ClientMSQContext.DEFAULT_MAX_NUM_TASKS);
       if (maxNumTasks < 2) {
-        return new CompactionConfigValidationResult(false,
-                                                    "MSQ context maxNumTasks 
[%,d] cannot be less than 2, "
-                                                    + "since at least 1 
controller and 1 worker is necessary.",
-                                                    maxNumTasks
+        return CompactionConfigValidationResult.failure(
+            "MSQ: Context maxNumTasks[%,d] must be at least 2 (1 controller + 
1 worker)",
+            maxNumTasks
         );
       }
     }
-    return new CompactionConfigValidationResult(true, null);
+    return CompactionConfigValidationResult.success();
   }
 
   /**
@@ -195,7 +190,7 @@ public class ClientCompactionRunnerInfo
   public static CompactionConfigValidationResult 
validateMetricsSpecForMSQ(AggregatorFactory[] metricsSpec)
   {
     if (metricsSpec == null) {
-      return new CompactionConfigValidationResult(true, null);
+      return CompactionConfigValidationResult.success();
     }
     return Arrays.stream(metricsSpec)
                  .filter(aggregatorFactory ->
@@ -206,11 +201,10 @@ public class ClientCompactionRunnerInfo
                                                       
.equals(aggregatorFactory.getName())))
                  .findFirst()
                  .map(aggregatorFactory ->
-                          new CompactionConfigValidationResult(
-                              false,
-                              "Different name[%s] and fieldName(s)[%s] for 
aggregator unsupported for MSQ engine.",
+                          CompactionConfigValidationResult.failure(
+                              "MSQ: Different name[%s] and fieldName(s)[%s] 
for aggregator",
                               aggregatorFactory.getName(),
                               aggregatorFactory.requiredFields()
-                          )).orElse(new CompactionConfigValidationResult(true, 
null));
+                          
)).orElse(CompactionConfigValidationResult.success());
   }
 }
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/CompactionConfigValidationResult.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/CompactionConfigValidationResult.java
index 88eaa3e923a..d482903e0d6 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/CompactionConfigValidationResult.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/CompactionConfigValidationResult.java
@@ -23,10 +23,23 @@ import org.apache.druid.java.util.common.StringUtils;
 
 public class CompactionConfigValidationResult
 {
+  private static final CompactionConfigValidationResult SUCCESS
+      = new CompactionConfigValidationResult(true, null);
+
   private final boolean valid;
   private final String reason;
 
-  public CompactionConfigValidationResult(boolean valid, String format, 
Object... args)
+  public static CompactionConfigValidationResult success()
+  {
+    return SUCCESS;
+  }
+
+  public static CompactionConfigValidationResult failure(String msgFormat, 
Object... args)
+  {
+    return new CompactionConfigValidationResult(false, msgFormat, args);
+  }
+
+  private CompactionConfigValidationResult(boolean valid, String format, 
Object... args)
   {
     this.valid = valid;
     this.reason = format == null ? null : StringUtils.format(format, args);
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorCompactionConfig.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorCompactionConfig.java
index 036c53121e9..15e19cdbd77 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorCompactionConfig.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorCompactionConfig.java
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.collect.ImmutableList;
 import org.apache.druid.common.config.Configs;
 import org.apache.druid.indexer.CompactionEngine;
+import org.apache.druid.server.http.CompactionConfigUpdateRequest;
 
 import javax.annotation.Nullable;
 import java.util.List;
@@ -54,23 +55,21 @@ public class CoordinatorCompactionConfig
         baseConfig.compactionTaskSlotRatio,
         baseConfig.maxCompactionTaskSlots,
         baseConfig.useAutoScaleSlots,
-        null
+        baseConfig.compactionEngine
     );
   }
 
   public static CoordinatorCompactionConfig from(
       CoordinatorCompactionConfig baseConfig,
-      @Nullable Double compactionTaskSlotRatio,
-      @Nullable Integer maxCompactionTaskSlots,
-      @Nullable Boolean useAutoScaleSlots
+      CompactionConfigUpdateRequest update
   )
   {
     return new CoordinatorCompactionConfig(
         baseConfig.compactionConfigs,
-        compactionTaskSlotRatio == null ? baseConfig.compactionTaskSlotRatio : 
compactionTaskSlotRatio,
-        maxCompactionTaskSlots == null ? baseConfig.maxCompactionTaskSlots : 
maxCompactionTaskSlots,
-        useAutoScaleSlots == null ? baseConfig.useAutoScaleSlots : 
useAutoScaleSlots,
-        null
+        Configs.valueOrDefault(update.getCompactionTaskSlotRatio(), 
baseConfig.compactionTaskSlotRatio),
+        Configs.valueOrDefault(update.getMaxCompactionTaskSlots(), 
baseConfig.maxCompactionTaskSlots),
+        Configs.valueOrDefault(update.getUseAutoScaleSlots(), 
baseConfig.useAutoScaleSlots),
+        Configs.valueOrDefault(update.getCompactionEngine(), 
baseConfig.compactionEngine)
     );
   }
 
@@ -90,7 +89,7 @@ public class CoordinatorCompactionConfig
       @JsonProperty("compactionTaskSlotRatio") @Nullable Double 
compactionTaskSlotRatio,
       @JsonProperty("maxCompactionTaskSlots") @Nullable Integer 
maxCompactionTaskSlots,
       @JsonProperty("useAutoScaleSlots") @Nullable Boolean useAutoScaleSlots,
-      @JsonProperty("compactionEngine") @Nullable CompactionEngine 
compactionEngine
+      @JsonProperty("engine") @Nullable CompactionEngine compactionEngine
   )
   {
     this.compactionConfigs = compactionConfigs;
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java
index 767e8218f31..91af8b6267d 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
 import org.apache.druid.indexer.CompactionEngine;
 import org.apache.druid.query.aggregation.AggregatorFactory;
+import 
org.apache.druid.server.coordinator.config.DataSourceCompactionConfigBuilder;
 import org.joda.time.Period;
 
 import javax.annotation.Nullable;
@@ -42,6 +43,12 @@ public class DataSourceCompactionConfig
   private final String dataSource;
   private final int taskPriority;
   private final long inputSegmentSizeBytes;
+
+  public static DataSourceCompactionConfigBuilder builder()
+  {
+    return new DataSourceCompactionConfigBuilder();
+  }
+
   /**
    * The number of input segments is limited because the byte size of a 
serialized task spec is limited by
    * 
org.apache.druid.indexing.overlord.config.RemoteTaskRunnerConfig.maxZnodeBytes.
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/config/DataSourceCompactionConfigBuilder.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/config/DataSourceCompactionConfigBuilder.java
new file mode 100644
index 00000000000..9cb00dfa614
--- /dev/null
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/config/DataSourceCompactionConfigBuilder.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.config;
+
+import org.apache.druid.indexer.CompactionEngine;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
+import org.apache.druid.server.coordinator.UserCompactionTaskDimensionsConfig;
+import org.apache.druid.server.coordinator.UserCompactionTaskGranularityConfig;
+import org.apache.druid.server.coordinator.UserCompactionTaskIOConfig;
+import org.apache.druid.server.coordinator.UserCompactionTaskQueryTuningConfig;
+import org.apache.druid.server.coordinator.UserCompactionTaskTransformConfig;
+import org.joda.time.Period;
+
+import java.util.Map;
+
+public class DataSourceCompactionConfigBuilder
+{
+  private String dataSource;
+  private Integer taskPriority;
+  private Long inputSegmentSizeBytes;
+  private Integer maxRowsPerSegment;
+  private Period skipOffsetFromLatest;
+  private UserCompactionTaskQueryTuningConfig tuningConfig;
+  private UserCompactionTaskGranularityConfig granularitySpec;
+  private UserCompactionTaskDimensionsConfig dimensionsSpec;
+  private AggregatorFactory[] metricsSpec;
+  private UserCompactionTaskTransformConfig transformSpec;
+  private UserCompactionTaskIOConfig ioConfig;
+  private CompactionEngine engine;
+  private Map<String, Object> taskContext;
+
+  public DataSourceCompactionConfig build()
+  {
+    return new DataSourceCompactionConfig(
+        dataSource,
+        taskPriority,
+        inputSegmentSizeBytes,
+        maxRowsPerSegment,
+        skipOffsetFromLatest,
+        tuningConfig,
+        granularitySpec,
+        dimensionsSpec,
+        metricsSpec,
+        transformSpec,
+        ioConfig,
+        engine,
+        taskContext
+    );
+  }
+
+  public DataSourceCompactionConfigBuilder forDataSource(String dataSource)
+  {
+    this.dataSource = dataSource;
+    return this;
+  }
+
+  public DataSourceCompactionConfigBuilder withTaskPriority(Integer 
taskPriority)
+  {
+    this.taskPriority = taskPriority;
+    return this;
+  }
+
+  public DataSourceCompactionConfigBuilder withInputSegmentSizeBytes(Long 
inputSegmentSizeBytes)
+  {
+    this.inputSegmentSizeBytes = inputSegmentSizeBytes;
+    return this;
+  }
+
+  @Deprecated
+  public DataSourceCompactionConfigBuilder withMaxRowsPerSegment(Integer 
maxRowsPerSegment)
+  {
+    this.maxRowsPerSegment = maxRowsPerSegment;
+    return this;
+  }
+
+  public DataSourceCompactionConfigBuilder withSkipOffsetFromLatest(Period 
skipOffsetFromLatest)
+  {
+    this.skipOffsetFromLatest = skipOffsetFromLatest;
+    return this;
+  }
+
+  public DataSourceCompactionConfigBuilder withTuningConfig(
+      UserCompactionTaskQueryTuningConfig tuningConfig
+  )
+  {
+    this.tuningConfig = tuningConfig;
+    return this;
+  }
+
+  public DataSourceCompactionConfigBuilder withGranularitySpec(
+      UserCompactionTaskGranularityConfig granularitySpec
+  )
+  {
+    this.granularitySpec = granularitySpec;
+    return this;
+  }
+
+  public DataSourceCompactionConfigBuilder withDimensionsSpec(
+      UserCompactionTaskDimensionsConfig dimensionsSpec
+  )
+  {
+    this.dimensionsSpec = dimensionsSpec;
+    return this;
+  }
+
+  public DataSourceCompactionConfigBuilder withMetricsSpec(AggregatorFactory[] 
metricsSpec)
+  {
+    this.metricsSpec = metricsSpec;
+    return this;
+  }
+
+  public DataSourceCompactionConfigBuilder withTransformSpec(
+      UserCompactionTaskTransformConfig transformSpec
+  )
+  {
+    this.transformSpec = transformSpec;
+    return this;
+  }
+
+  public DataSourceCompactionConfigBuilder 
withIoConfig(UserCompactionTaskIOConfig ioConfig)
+  {
+    this.ioConfig = ioConfig;
+    return this;
+  }
+
+  public DataSourceCompactionConfigBuilder withEngine(CompactionEngine engine)
+  {
+    this.engine = engine;
+    return this;
+  }
+
+  public DataSourceCompactionConfigBuilder withTaskContext(Map<String, Object> 
taskContext)
+  {
+    this.taskContext = taskContext;
+    return this;
+  }
+}
diff --git 
a/server/src/main/java/org/apache/druid/server/http/CompactionConfigUpdateRequest.java
 
b/server/src/main/java/org/apache/druid/server/http/CompactionConfigUpdateRequest.java
new file mode 100644
index 00000000000..72ab23fde8a
--- /dev/null
+++ 
b/server/src/main/java/org/apache/druid/server/http/CompactionConfigUpdateRequest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.http;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.indexer.CompactionEngine;
+
+import javax.annotation.Nullable;
+
+/**
+ * Payload to update the cluster-level compaction config.
+ * All fields of this class must be nullable. A non-null value indicates that the
+ * corresponding field is being updated.
+ */
+public class CompactionConfigUpdateRequest
+{
+  private final Double compactionTaskSlotRatio;
+  private final Integer maxCompactionTaskSlots;
+  private final Boolean useAutoScaleSlots;
+  private final CompactionEngine compactionEngine;
+
+  @JsonCreator
+  public CompactionConfigUpdateRequest(
+      @JsonProperty("compactionTaskSlotRatio") @Nullable Double 
compactionTaskSlotRatio,
+      @JsonProperty("maxCompactionTaskSlots") @Nullable Integer 
maxCompactionTaskSlots,
+      @JsonProperty("useAutoScaleSlots") @Nullable Boolean useAutoScaleSlots,
+      @JsonProperty("compactionEngine") @Nullable CompactionEngine 
compactionEngine
+  )
+  {
+    this.compactionTaskSlotRatio = compactionTaskSlotRatio;
+    this.maxCompactionTaskSlots = maxCompactionTaskSlots;
+    this.useAutoScaleSlots = useAutoScaleSlots;
+    this.compactionEngine = compactionEngine;
+  }
+
+  @Nullable
+  @JsonProperty
+  public Double getCompactionTaskSlotRatio()
+  {
+    return compactionTaskSlotRatio;
+  }
+
+  @Nullable
+  @JsonProperty
+  public Integer getMaxCompactionTaskSlots()
+  {
+    return maxCompactionTaskSlots;
+  }
+
+  @Nullable
+  @JsonProperty
+  public Boolean getUseAutoScaleSlots()
+  {
+    return useAutoScaleSlots;
+  }
+
+  @Nullable
+  @JsonProperty
+  public CompactionEngine getCompactionEngine()
+  {
+    return compactionEngine;
+  }
+
+}
diff --git 
a/server/src/main/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResource.java
 
b/server/src/main/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResource.java
index 0bba5cf63fa..e8baf825090 100644
--- 
a/server/src/main/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResource.java
+++ 
b/server/src/main/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResource.java
@@ -40,6 +40,7 @@ import 
org.apache.druid.server.coordinator.DataSourceCompactionConfig;
 import org.apache.druid.server.coordinator.DataSourceCompactionConfigHistory;
 import org.apache.druid.server.http.security.ConfigResourceFilter;
 import org.apache.druid.server.security.AuthorizationUtils;
+import org.apache.druid.utils.CollectionUtils;
 import org.joda.time.Interval;
 
 import javax.servlet.http.HttpServletRequest;
@@ -69,7 +70,7 @@ public class CoordinatorCompactionConfigsResource
 {
   private static final Logger LOG = new 
Logger(CoordinatorCompactionConfigsResource.class);
   private static final long UPDATE_RETRY_DELAY = 1000;
-  static final int UPDATE_NUM_RETRY = 5;
+  static final int MAX_UPDATE_RETRIES = 5;
 
   private final CoordinatorConfigManager configManager;
   private final AuditManager auditManager;
@@ -86,11 +87,46 @@ public class CoordinatorCompactionConfigsResource
 
   @GET
   @Produces(MediaType.APPLICATION_JSON)
-  public Response getCompactionConfig()
+  public Response getClusterCompactionConfig()
   {
     return Response.ok(configManager.getCurrentCompactionConfig()).build();
   }
 
+  @POST
+  @Path("/global")
+  @Consumes(MediaType.APPLICATION_JSON)
+  public Response updateClusterCompactionConfig(
+      CompactionConfigUpdateRequest updatePayload,
+      @Context HttpServletRequest req
+  )
+  {
+    UnaryOperator<CoordinatorCompactionConfig> operator = current -> {
+      final CoordinatorCompactionConfig newConfig = 
CoordinatorCompactionConfig.from(current, updatePayload);
+
+      final List<DataSourceCompactionConfig> datasourceConfigs = 
newConfig.getCompactionConfigs();
+      if (CollectionUtils.isNullOrEmpty(datasourceConfigs)
+          || current.getEngine() == newConfig.getEngine()) {
+        return newConfig;
+      }
+
+      // Validate all the datasource configs against the new engine
+      for (DataSourceCompactionConfig datasourceConfig : datasourceConfigs) {
+        CompactionConfigValidationResult validationResult =
+            
ClientCompactionRunnerInfo.validateCompactionConfig(datasourceConfig, 
newConfig.getEngine());
+        if (!validationResult.isValid()) {
+          throw InvalidInput.exception(
+              "Cannot update engine to [%s] as it does not support"
+              + " compaction config of DataSource[%s]. Reason[%s].",
+              newConfig.getEngine(), datasourceConfig.getDataSource(), 
validationResult.getReason()
+          );
+        }
+      }
+
+      return newConfig;
+    };
+    return updateConfigHelper(operator, 
AuthorizationUtils.buildAuditInfo(req));
+  }
+
   @POST
   @Path("/taskslots")
   @Consumes(MediaType.APPLICATION_JSON)
@@ -101,19 +137,20 @@ public class CoordinatorCompactionConfigsResource
       @Context HttpServletRequest req
   )
   {
-    UnaryOperator<CoordinatorCompactionConfig> operator =
-        current -> CoordinatorCompactionConfig.from(
-            current,
+    return updateClusterCompactionConfig(
+        new CompactionConfigUpdateRequest(
             compactionTaskSlotRatio,
             maxCompactionTaskSlots,
-            useAutoScaleSlots
-        );
-    return updateConfigHelper(operator, 
AuthorizationUtils.buildAuditInfo(req));
+            useAutoScaleSlots,
+            null
+        ),
+        req
+    );
   }
 
   @POST
   @Consumes(MediaType.APPLICATION_JSON)
-  public Response addOrUpdateCompactionConfig(
+  public Response addOrUpdateDatasourceCompactionConfig(
       final DataSourceCompactionConfig newConfig,
       @Context HttpServletRequest req
   )
@@ -144,7 +181,7 @@ public class CoordinatorCompactionConfigsResource
   @GET
   @Path("/{dataSource}")
   @Produces(MediaType.APPLICATION_JSON)
-  public Response getCompactionConfig(@PathParam("dataSource") String 
dataSource)
+  public Response getDatasourceCompactionConfig(@PathParam("dataSource") 
String dataSource)
   {
     final CoordinatorCompactionConfig current = 
configManager.getCurrentCompactionConfig();
     final Map<String, DataSourceCompactionConfig> configs = current
@@ -233,7 +270,7 @@ public class CoordinatorCompactionConfigsResource
     int attemps = 0;
     SetResult setResult = null;
     try {
-      while (attemps < UPDATE_NUM_RETRY) {
+      while (attemps < MAX_UPDATE_RETRIES) {
         setResult = configManager.getAndUpdateCompactionConfig(configOperator, 
auditInfo);
         if (setResult.isOk() || !setResult.isRetryable()) {
           break;
diff --git 
a/server/src/test/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfoTest.java
 
b/server/src/test/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfoTest.java
index f6d4a2b6e58..01daa1da4af 100644
--- 
a/server/src/test/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfoTest.java
+++ 
b/server/src/test/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfoTest.java
@@ -64,8 +64,7 @@ public class ClientCompactionRunnerInfoTest
     );
     Assert.assertFalse(validationResult.isValid());
     Assert.assertEquals(
-        "Invalid partitionsSpec type[HashedPartitionsSpec] for MSQ engine."
-        + " Type must be either 'dynamic' or 'range'.",
+        "MSQ: Invalid partitioning type[HashedPartitionsSpec]. Must be either 
'dynamic' or 'range'",
         validationResult.getReason()
     );
   }
@@ -85,7 +84,7 @@ public class ClientCompactionRunnerInfoTest
     );
     Assert.assertFalse(validationResult.isValid());
     Assert.assertEquals(
-        "maxTotalRows[100] in DynamicPartitionsSpec not supported for MSQ 
engine.",
+        "MSQ: 'maxTotalRows' not supported with 'dynamic' partitioning",
         validationResult.getReason()
     );
   }
@@ -144,7 +143,7 @@ public class ClientCompactionRunnerInfoTest
     );
     Assert.assertFalse(validationResult.isValid());
     Assert.assertEquals(
-        "rollup in granularitySpec must be set to True if metricsSpec is 
specifed for MSQ engine.",
+        "MSQ: 'granularitySpec.rollup' must be true if 'metricsSpec' is 
specified",
         validationResult.getReason()
     );
   }
@@ -167,7 +166,7 @@ public class ClientCompactionRunnerInfoTest
     );
     Assert.assertFalse(validationResult.isValid());
     Assert.assertEquals(
-        "Different name[sum_added] and fieldName(s)[[added]] for aggregator 
unsupported for MSQ engine.",
+        "MSQ: Different name[sum_added] and fieldName(s)[[added]] for 
aggregator",
         validationResult.getReason()
     );
   }
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/CompactionConfigValidationResult.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/CoordinatorCompactionConfigTest.java
similarity index 57%
copy from 
server/src/main/java/org/apache/druid/server/coordinator/CompactionConfigValidationResult.java
copy to 
server/src/test/java/org/apache/druid/server/coordinator/CoordinatorCompactionConfigTest.java
index 88eaa3e923a..6f39f552791 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/CompactionConfigValidationResult.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/CoordinatorCompactionConfigTest.java
@@ -19,26 +19,22 @@
 
 package org.apache.druid.server.coordinator;
 
-import org.apache.druid.java.util.common.StringUtils;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.junit.Assert;
+import org.junit.Test;
 
-public class CompactionConfigValidationResult
+public class CoordinatorCompactionConfigTest
 {
-  private final boolean valid;
-  private final String reason;
+  private static final ObjectMapper MAPPER = new DefaultObjectMapper();
 
-  public CompactionConfigValidationResult(boolean valid, String format, 
Object... args)
+  @Test
+  public void testSerdeDefaultConfig() throws Exception
   {
-    this.valid = valid;
-    this.reason = format == null ? null : StringUtils.format(format, args);
-  }
+    final CoordinatorCompactionConfig defaultConfig = 
CoordinatorCompactionConfig.empty();
+    final String json = MAPPER.writeValueAsString(defaultConfig);
 
-  public boolean isValid()
-  {
-    return valid;
-  }
-
-  public String getReason()
-  {
-    return reason;
+    CoordinatorCompactionConfig deserialized = MAPPER.readValue(json, 
CoordinatorCompactionConfig.class);
+    Assert.assertEquals(defaultConfig, deserialized);
   }
 }
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java
index a9334f077a4..67f276f7651 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java
@@ -42,9 +42,7 @@ import org.apache.druid.testing.InitializedNullHandlingTest;
 import org.joda.time.Duration;
 import org.joda.time.Period;
 import org.junit.Assert;
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.ExpectedException;
 
 import java.io.IOException;
 
@@ -52,27 +50,15 @@ public class DataSourceCompactionConfigTest extends 
InitializedNullHandlingTest
 {
   private static final ObjectMapper OBJECT_MAPPER = new DefaultObjectMapper();
 
-  @Rule
-  public final ExpectedException expectedException = ExpectedException.none();
-
   @Test
   public void testSerdeBasic() throws IOException
   {
-    final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
-        "dataSource",
-        null,
-        null,
-        null,
-        new Period(3600),
-        null,
-        null,
-        null,
-        null,
-        null,
-        null,
-        null,
-        ImmutableMap.of("key", "val")
-    );
+    final DataSourceCompactionConfig config = DataSourceCompactionConfig
+        .builder()
+        .forDataSource("dataSource")
+        .withSkipOffsetFromLatest(new Period(3600))
+        .withTaskContext(ImmutableMap.of("key", "val"))
+        .build();
     final String json = OBJECT_MAPPER.writeValueAsString(config);
     final DataSourceCompactionConfig fromJson = OBJECT_MAPPER.readValue(json, 
DataSourceCompactionConfig.class);
 
@@ -90,21 +76,15 @@ public class DataSourceCompactionConfigTest extends 
InitializedNullHandlingTest
   @Test
   public void testSerdeWithMaxRowsPerSegment() throws IOException
   {
-    final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
-        "dataSource",
-        null,
-        500L,
-        30,
-        new Period(3600),
-        null,
-        null,
-        null,
-        null,
-        null,
-        null,
-        CompactionEngine.MSQ,
-        ImmutableMap.of("key", "val")
-    );
+    final DataSourceCompactionConfig config = DataSourceCompactionConfig
+        .builder()
+        .forDataSource("dataSource")
+        .withInputSegmentSizeBytes(500L)
+        .withMaxRowsPerSegment(30)
+        .withSkipOffsetFromLatest(new Period(3600))
+        .withEngine(CompactionEngine.MSQ)
+        .withTaskContext(ImmutableMap.of("key", "val"))
+        .build();
     final String json = OBJECT_MAPPER.writeValueAsString(config);
     final DataSourceCompactionConfig fromJson = OBJECT_MAPPER.readValue(json, 
DataSourceCompactionConfig.class);
 
@@ -121,41 +101,14 @@ public class DataSourceCompactionConfigTest extends 
InitializedNullHandlingTest
   @Test
   public void testSerdeWithMaxTotalRows() throws IOException
   {
-    final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
-        "dataSource",
-        null,
-        500L,
-        null,
-        new Period(3600),
-        new UserCompactionTaskQueryTuningConfig(
-            null,
-            null,
-            null,
-            10000L,
-            null,
-            null,
-            null,
-            null,
-            null,
-            null,
-            null,
-            null,
-            null,
-            null,
-            null,
-            null,
-            null,
-            null,
-            null
-        ),
-        null,
-        null,
-        null,
-        null,
-        null,
-        CompactionEngine.NATIVE,
-        ImmutableMap.of("key", "val")
-    );
+    final DataSourceCompactionConfig config = DataSourceCompactionConfig
+        .builder()
+        .forDataSource("dataSource")
+        .withInputSegmentSizeBytes(500L)
+        .withSkipOffsetFromLatest(new Period(3600))
+        .withEngine(CompactionEngine.NATIVE)
+        .withTaskContext(ImmutableMap.of("key", "val"))
+        .build();
     final String json = OBJECT_MAPPER.writeValueAsString(config);
     final DataSourceCompactionConfig fromJson = OBJECT_MAPPER.readValue(json, 
DataSourceCompactionConfig.class);
 
@@ -172,42 +125,14 @@ public class DataSourceCompactionConfigTest extends 
InitializedNullHandlingTest
   @Test
   public void testSerdeMaxTotalRowsWithMaxRowsPerSegment() throws IOException
   {
-    final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
-        "dataSource",
-        null,
-        500L,
-        10000,
-        new Period(3600),
-        new UserCompactionTaskQueryTuningConfig(
-            null,
-            null,
-            null,
-            10000L,
-            null,
-            null,
-            null,
-            null,
-            null,
-            null,
-            null,
-            null,
-            null,
-            null,
-            null,
-            null,
-            null,
-            null,
-            null
-        ),
-        null,
-        null,
-        null,
-        null,
-        null,
-        null,
-        ImmutableMap.of("key", "val")
-    );
-
+    final DataSourceCompactionConfig config = DataSourceCompactionConfig
+        .builder()
+        .forDataSource("dataSource")
+        .withInputSegmentSizeBytes(500L)
+        .withMaxRowsPerSegment(10000)
+        .withSkipOffsetFromLatest(new Period(3600))
+        .withTaskContext(ImmutableMap.of("key", "val"))
+        .build();
     final String json = OBJECT_MAPPER.writeValueAsString(config);
     final DataSourceCompactionConfig fromJson = OBJECT_MAPPER.readValue(json, 
DataSourceCompactionConfig.class);
 
@@ -301,21 +226,14 @@ public class DataSourceCompactionConfigTest extends 
InitializedNullHandlingTest
   @Test
   public void testSerdeGranularitySpec() throws IOException
   {
-    final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
-        "dataSource",
-        null,
-        500L,
-        null,
-        new Period(3600),
-        null,
-        new UserCompactionTaskGranularityConfig(Granularities.HOUR, null, 
null),
-        null,
-        null,
-        null,
-        null,
-        null,
-        ImmutableMap.of("key", "val")
-    );
+    final DataSourceCompactionConfig config = DataSourceCompactionConfig
+        .builder()
+        .forDataSource("dataSource")
+        .withInputSegmentSizeBytes(500L)
+        .withSkipOffsetFromLatest(new Period(3600))
+        .withGranularitySpec(new 
UserCompactionTaskGranularityConfig(Granularities.HOUR, null, null))
+        .withTaskContext(ImmutableMap.of("key", "val"))
+        .build();
     final String json = OBJECT_MAPPER.writeValueAsString(config);
     final DataSourceCompactionConfig fromJson = OBJECT_MAPPER.readValue(json, 
DataSourceCompactionConfig.class);
 
@@ -332,21 +250,14 @@ public class DataSourceCompactionConfigTest extends 
InitializedNullHandlingTest
   @Test
   public void testSerdeGranularitySpecWithQueryGranularity() throws Exception
   {
-    final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
-        "dataSource",
-        null,
-        500L,
-        null,
-        new Period(3600),
-        null,
-        new UserCompactionTaskGranularityConfig(null, Granularities.YEAR, 
null),
-        null,
-        null,
-        null,
-        null,
-        null,
-        ImmutableMap.of("key", "val")
-    );
+    final DataSourceCompactionConfig config = DataSourceCompactionConfig
+        .builder()
+        .forDataSource("dataSource")
+        .withInputSegmentSizeBytes(500L)
+        .withSkipOffsetFromLatest(new Period(3600))
+        .withGranularitySpec(new UserCompactionTaskGranularityConfig(null, 
Granularities.YEAR, null))
+        .withTaskContext(ImmutableMap.of("key", "val"))
+        .build();
     final String json = OBJECT_MAPPER.writeValueAsString(config);
     final DataSourceCompactionConfig fromJson = OBJECT_MAPPER.readValue(json, 
DataSourceCompactionConfig.class);
 
@@ -366,21 +277,13 @@ public class DataSourceCompactionConfigTest extends 
InitializedNullHandlingTest
   @Test
   public void testSerdeWithNullGranularitySpec() throws IOException
   {
-    final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
-        "dataSource",
-        null,
-        500L,
-        null,
-        new Period(3600),
-        null,
-        null,
-        null,
-        null,
-        null,
-        null,
-        null,
-        ImmutableMap.of("key", "val")
-    );
+    final DataSourceCompactionConfig config = DataSourceCompactionConfig
+        .builder()
+        .forDataSource("dataSource")
+        .withInputSegmentSizeBytes(500L)
+        .withSkipOffsetFromLatest(new Period(3600))
+        .withTaskContext(ImmutableMap.of("key", "val"))
+        .build();
     final String json = OBJECT_MAPPER.writeValueAsString(config);
     final DataSourceCompactionConfig fromJson = OBJECT_MAPPER.readValue(json, 
DataSourceCompactionConfig.class);
 
@@ -397,21 +300,14 @@ public class DataSourceCompactionConfigTest extends 
InitializedNullHandlingTest
   @Test
   public void testSerdeGranularitySpecWithNullValues() throws IOException
   {
-    final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
-        "dataSource",
-        null,
-        500L,
-        null,
-        new Period(3600),
-        null,
-        new UserCompactionTaskGranularityConfig(null, null, null),
-        null,
-        null,
-        null,
-        null,
-        null,
-        ImmutableMap.of("key", "val")
-    );
+    final DataSourceCompactionConfig config = DataSourceCompactionConfig
+        .builder()
+        .forDataSource("dataSource")
+        .withInputSegmentSizeBytes(500L)
+        .withSkipOffsetFromLatest(new Period(3600))
+        .withGranularitySpec(new UserCompactionTaskGranularityConfig(null, 
null, null))
+        .withTaskContext(ImmutableMap.of("key", "val"))
+        .build();
     final String json = OBJECT_MAPPER.writeValueAsString(config);
     final DataSourceCompactionConfig fromJson = OBJECT_MAPPER.readValue(json, 
DataSourceCompactionConfig.class);
 
@@ -428,21 +324,14 @@ public class DataSourceCompactionConfigTest extends 
InitializedNullHandlingTest
   @Test
   public void testSerdeGranularitySpecWithRollup() throws IOException
   {
-    final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
-        "dataSource",
-        null,
-        500L,
-        null,
-        new Period(3600),
-        null,
-        new UserCompactionTaskGranularityConfig(null, null, true),
-        null,
-        null,
-        null,
-        null,
-        null,
-        ImmutableMap.of("key", "val")
-    );
+    final DataSourceCompactionConfig config = DataSourceCompactionConfig
+        .builder()
+        .forDataSource("dataSource")
+        .withInputSegmentSizeBytes(500L)
+        .withSkipOffsetFromLatest(new Period(3600))
+        .withGranularitySpec(new UserCompactionTaskGranularityConfig(null, 
null, true))
+        .withTaskContext(ImmutableMap.of("key", "val"))
+        .build();
     final String json = OBJECT_MAPPER.writeValueAsString(config);
     final DataSourceCompactionConfig fromJson = OBJECT_MAPPER.readValue(json, 
DataSourceCompactionConfig.class);
 
@@ -462,21 +351,15 @@ public class DataSourceCompactionConfigTest extends 
InitializedNullHandlingTest
   @Test
   public void testSerdeIOConfigWithNonNullDropExisting() throws IOException
   {
-    final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
-        "dataSource",
-        null,
-        500L,
-        null,
-        new Period(3600),
-        null,
-        new UserCompactionTaskGranularityConfig(Granularities.HOUR, null, 
null),
-        null,
-        null,
-        null,
-        new UserCompactionTaskIOConfig(true),
-        null,
-        ImmutableMap.of("key", "val")
-    );
+    final DataSourceCompactionConfig config = DataSourceCompactionConfig
+        .builder()
+        .forDataSource("dataSource")
+        .withInputSegmentSizeBytes(500L)
+        .withSkipOffsetFromLatest(new Period(3600))
+        .withGranularitySpec(new 
UserCompactionTaskGranularityConfig(Granularities.HOUR, null, null))
+        .withIoConfig(new UserCompactionTaskIOConfig(true))
+        .withTaskContext(ImmutableMap.of("key", "val"))
+        .build();
     final String json = OBJECT_MAPPER.writeValueAsString(config);
     final DataSourceCompactionConfig fromJson = OBJECT_MAPPER.readValue(json, 
DataSourceCompactionConfig.class);
 
@@ -494,21 +377,15 @@ public class DataSourceCompactionConfigTest extends 
InitializedNullHandlingTest
   @Test
   public void testSerdeIOConfigWithNullDropExisting() throws IOException
   {
-    final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
-        "dataSource",
-        null,
-        500L,
-        null,
-        new Period(3600),
-        null,
-        new UserCompactionTaskGranularityConfig(Granularities.HOUR, null, 
null),
-        null,
-        null,
-        null,
-        new UserCompactionTaskIOConfig(null),
-        null,
-        ImmutableMap.of("key", "val")
-    );
+    final DataSourceCompactionConfig config = DataSourceCompactionConfig
+        .builder()
+        .forDataSource("dataSource")
+        .withInputSegmentSizeBytes(500L)
+        .withSkipOffsetFromLatest(new Period(3600))
+        .withGranularitySpec(new 
UserCompactionTaskGranularityConfig(Granularities.HOUR, null, null))
+        .withIoConfig(new UserCompactionTaskIOConfig(null))
+        .withTaskContext(ImmutableMap.of("key", "val"))
+        .build();
     final String json = OBJECT_MAPPER.writeValueAsString(config);
     final DataSourceCompactionConfig fromJson = OBJECT_MAPPER.readValue(json, 
DataSourceCompactionConfig.class);
 
@@ -526,21 +403,18 @@ public class DataSourceCompactionConfigTest extends 
InitializedNullHandlingTest
   @Test
   public void testSerdeDimensionsSpec() throws IOException
   {
-    final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
-        "dataSource",
-        null,
-        500L,
-        null,
-        new Period(3600),
-        null,
-        null,
-        new 
UserCompactionTaskDimensionsConfig(DimensionsSpec.getDefaultSchemas(ImmutableList.of("foo"))),
-        null,
-        null,
-        null,
-        null,
-        ImmutableMap.of("key", "val")
-    );
+    final DataSourceCompactionConfig config = DataSourceCompactionConfig
+        .builder()
+        .forDataSource("dataSource")
+        .withInputSegmentSizeBytes(500L)
+        .withSkipOffsetFromLatest(new Period(3600))
+        .withDimensionsSpec(
+            new UserCompactionTaskDimensionsConfig(
+                DimensionsSpec.getDefaultSchemas(ImmutableList.of("foo"))
+            )
+        )
+        .withTaskContext(ImmutableMap.of("key", "val"))
+        .build();
     final String json = OBJECT_MAPPER.writeValueAsString(config);
     final DataSourceCompactionConfig fromJson = OBJECT_MAPPER.readValue(json, 
DataSourceCompactionConfig.class);
 
@@ -558,21 +432,14 @@ public class DataSourceCompactionConfigTest extends 
InitializedNullHandlingTest
   public void testSerdeTransformSpec() throws IOException
   {
     NullHandling.initializeForTests();
-    final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
-        "dataSource",
-        null,
-        500L,
-        null,
-        new Period(3600),
-        null,
-        null,
-        null,
-        null,
-        new UserCompactionTaskTransformConfig(new SelectorDimFilter("dim1", 
"foo", null)),
-        null,
-        null,
-        ImmutableMap.of("key", "val")
-    );
+    final DataSourceCompactionConfig config = DataSourceCompactionConfig
+        .builder()
+        .forDataSource("dataSource")
+        .withInputSegmentSizeBytes(500L)
+        .withSkipOffsetFromLatest(new Period(3600))
+        .withTransformSpec(new UserCompactionTaskTransformConfig(new 
SelectorDimFilter("dim1", "foo", null)))
+        .withTaskContext(ImmutableMap.of("key", "val"))
+        .build();
     final String json = OBJECT_MAPPER.writeValueAsString(config);
     final DataSourceCompactionConfig fromJson = OBJECT_MAPPER.readValue(json, 
DataSourceCompactionConfig.class);
 
@@ -589,21 +456,14 @@ public class DataSourceCompactionConfigTest extends 
InitializedNullHandlingTest
   @Test
   public void testSerdeMetricsSpec() throws IOException
   {
-    final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
-        "dataSource",
-        null,
-        500L,
-        null,
-        new Period(3600),
-        null,
-        null,
-        null,
-        new AggregatorFactory[] {new CountAggregatorFactory("cnt")},
-        null,
-        null,
-        null,
-        ImmutableMap.of("key", "val")
-    );
+    final DataSourceCompactionConfig config = DataSourceCompactionConfig
+        .builder()
+        .forDataSource("dataSource")
+        .withInputSegmentSizeBytes(500L)
+        .withSkipOffsetFromLatest(new Period(3600))
+        .withMetricsSpec(new AggregatorFactory[]{new 
CountAggregatorFactory("cnt")})
+        .withTaskContext(ImmutableMap.of("key", "val"))
+        .build();
     final String json = OBJECT_MAPPER.writeValueAsString(config);
     final DataSourceCompactionConfig fromJson = OBJECT_MAPPER.readValue(json, 
DataSourceCompactionConfig.class);
 
diff --git 
a/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResourceTest.java
 
b/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResourceTest.java
index 17db2285477..5be533c0e46 100644
--- 
a/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResourceTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResourceTest.java
@@ -19,528 +19,529 @@
 
 package org.apache.druid.server.http;
 
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Suppliers;
+import org.apache.druid.audit.AuditEntry;
+import org.apache.druid.audit.AuditInfo;
 import org.apache.druid.audit.AuditManager;
 import org.apache.druid.client.indexing.ClientMSQContext;
 import org.apache.druid.common.config.ConfigManager;
 import org.apache.druid.common.config.JacksonConfigManager;
-import org.apache.druid.error.DruidException;
+import org.apache.druid.common.config.TestConfigManagerConfig;
 import org.apache.druid.error.ErrorResponse;
 import org.apache.druid.indexer.CompactionEngine;
-import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.common.granularity.Granularities;
-import org.apache.druid.metadata.MetadataStorageConnector;
+import org.apache.druid.metadata.MetadataCASUpdate;
 import org.apache.druid.metadata.MetadataStorageTablesConfig;
+import org.apache.druid.metadata.TestMetadataStorageConnector;
+import org.apache.druid.metadata.TestMetadataStorageTablesConfig;
 import org.apache.druid.server.coordinator.CoordinatorCompactionConfig;
 import org.apache.druid.server.coordinator.CoordinatorConfigManager;
 import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
+import 
org.apache.druid.server.coordinator.DataSourceCompactionConfigAuditEntry;
 import org.apache.druid.server.coordinator.UserCompactionTaskGranularityConfig;
+import 
org.apache.druid.server.coordinator.config.DataSourceCompactionConfigBuilder;
+import org.joda.time.Interval;
 import org.joda.time.Period;
+import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
-import org.mockito.ArgumentCaptor;
-import org.mockito.ArgumentMatchers;
 import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.junit.MockitoJUnitRunner;
+import org.skife.jdbi.v2.Handle;
 
+import javax.annotation.Nullable;
 import javax.servlet.http.HttpServletRequest;
 import javax.ws.rs.core.Response;
-import java.util.Collection;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.UnaryOperator;
 
 @RunWith(MockitoJUnitRunner.class)
 public class CoordinatorCompactionConfigsResourceTest
 {
-  private static final DataSourceCompactionConfig OLD_CONFIG = new 
DataSourceCompactionConfig(
-      "oldDataSource",
-      null,
-      500L,
-      null,
-      new Period(3600),
-      null,
-      new UserCompactionTaskGranularityConfig(Granularities.HOUR, null, null),
-      null,
-      null,
-      null,
-      null,
-      null,
-      ImmutableMap.of("key", "val")
-  );
-  private static final DataSourceCompactionConfig NEW_CONFIG = new 
DataSourceCompactionConfig(
-      "newDataSource",
-      null,
-      500L,
-      null,
-      new Period(1800),
-      null,
-      new UserCompactionTaskGranularityConfig(Granularities.DAY, null, null),
-      null,
-      null,
-      null,
-      null,
-      null,
-      ImmutableMap.of("key", "val")
-  );
-  private static final byte[] OLD_CONFIG_IN_BYTES = {1, 2, 3};
-
-  private static final CoordinatorCompactionConfig ORIGINAL_CONFIG
-      = CoordinatorCompactionConfig.from(ImmutableList.of(OLD_CONFIG));
-
-  private static final String DATASOURCE_NOT_EXISTS = "notExists";
-
-  @Mock
-  private JacksonConfigManager mockJacksonConfigManager;
+  private static final double DELTA = 1e-9;
+  private static final ObjectMapper OBJECT_MAPPER = new DefaultObjectMapper();
 
   @Mock
   private HttpServletRequest mockHttpServletRequest;
 
-  @Mock
-  private MetadataStorageConnector mockConnector;
-
-  @Mock
-  private MetadataStorageTablesConfig mockConnectorConfig;
-
-  @Mock
-  private AuditManager mockAuditManager;
-
-  private CoordinatorCompactionConfigsResource 
coordinatorCompactionConfigsResource;
+  private TestCoordinatorConfigManager configManager;
+  private CoordinatorCompactionConfigsResource resource;
 
   @Before
   public void setup()
   {
-    Mockito.when(mockConnector.lookup(
-                     ArgumentMatchers.anyString(),
-                     ArgumentMatchers.eq("name"),
-                     ArgumentMatchers.eq("payload"),
-                     
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY)
-                 )
-    ).thenReturn(OLD_CONFIG_IN_BYTES);
-    Mockito.when(mockJacksonConfigManager.convertByteToConfig(
-                     ArgumentMatchers.eq(OLD_CONFIG_IN_BYTES),
-                     ArgumentMatchers.eq(CoordinatorCompactionConfig.class),
-                     ArgumentMatchers.eq(CoordinatorCompactionConfig.empty())
-                 )
-    ).thenReturn(ORIGINAL_CONFIG);
-    
Mockito.when(mockConnectorConfig.getConfigTable()).thenReturn("druid_config");
-    Mockito.when(mockAuditManager.fetchAuditHistory(
-                     
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY),
-                     
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY),
-                     ArgumentMatchers.any()
-                 )
-    ).thenReturn(ImmutableList.of());
-    coordinatorCompactionConfigsResource = new 
CoordinatorCompactionConfigsResource(
-        new CoordinatorConfigManager(mockJacksonConfigManager, mockConnector, 
mockConnectorConfig),
-        mockAuditManager
-    );
     Mockito.when(mockHttpServletRequest.getRemoteAddr()).thenReturn("123");
+    final AuditManager auditManager = new TestAuditManager();
+    configManager = TestCoordinatorConfigManager.create(auditManager);
+    resource = new CoordinatorCompactionConfigsResource(configManager, 
auditManager);
+    configManager.delegate.start();
+  }
+
+  @After
+  public void tearDown()
+  {
+    configManager.delegate.stop();
   }
 
   @Test
-  public void testSetCompactionTaskLimitWithExistingConfig()
+  public void testGetDefaultClusterConfig()
   {
-    final ArgumentCaptor<byte[]> oldConfigCaptor = 
ArgumentCaptor.forClass(byte[].class);
-    final ArgumentCaptor<CoordinatorCompactionConfig> newConfigCaptor = 
ArgumentCaptor.forClass(
-        CoordinatorCompactionConfig.class);
-    Mockito.when(mockJacksonConfigManager.set(
-                     
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY),
-                     oldConfigCaptor.capture(),
-                     newConfigCaptor.capture(),
-                     ArgumentMatchers.any()
-                 )
-    ).thenReturn(ConfigManager.SetResult.ok());
-
-    double compactionTaskSlotRatio = 0.5;
-    int maxCompactionTaskSlots = 9;
-    Response result = 
coordinatorCompactionConfigsResource.setCompactionTaskLimit(
-        compactionTaskSlotRatio,
-        maxCompactionTaskSlots,
-        true,
-        mockHttpServletRequest
-    );
-    Assert.assertEquals(Response.Status.OK.getStatusCode(), 
result.getStatus());
-    Assert.assertNotNull(oldConfigCaptor.getValue());
-    Assert.assertEquals(oldConfigCaptor.getValue(), OLD_CONFIG_IN_BYTES);
-    Assert.assertNotNull(newConfigCaptor.getValue());
-    
Assert.assertEquals(newConfigCaptor.getValue().getMaxCompactionTaskSlots(), 
maxCompactionTaskSlots);
-    Assert.assertTrue(newConfigCaptor.getValue().isUseAutoScaleSlots());
-    Assert.assertEquals(compactionTaskSlotRatio, 
newConfigCaptor.getValue().getCompactionTaskSlotRatio(), 0);
+    Response response = resource.getClusterCompactionConfig();
+    final CoordinatorCompactionConfig defaultConfig
+        = verifyAndGetPayload(response, CoordinatorCompactionConfig.class);
+
+    Assert.assertEquals(0.1, defaultConfig.getCompactionTaskSlotRatio(), 
DELTA);
+    Assert.assertEquals(Integer.MAX_VALUE, 
defaultConfig.getMaxCompactionTaskSlots());
+    Assert.assertFalse(defaultConfig.isUseAutoScaleSlots());
+    Assert.assertTrue(defaultConfig.getCompactionConfigs().isEmpty());
+    Assert.assertEquals(CompactionEngine.NATIVE, defaultConfig.getEngine());
   }
 
   @Test
-  public void testAddOrUpdateCompactionConfigWithExistingConfig()
+  public void testUpdateGlobalConfig()
   {
-    final ArgumentCaptor<byte[]> oldConfigCaptor = 
ArgumentCaptor.forClass(byte[].class);
-    final ArgumentCaptor<CoordinatorCompactionConfig> newConfigCaptor = 
ArgumentCaptor.forClass(
-        CoordinatorCompactionConfig.class);
-    Mockito.when(mockJacksonConfigManager.set(
-                     
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY),
-                     oldConfigCaptor.capture(),
-                     newConfigCaptor.capture(),
-                     ArgumentMatchers.any()
-                 )
-    ).thenReturn(ConfigManager.SetResult.ok());
-
-    final DataSourceCompactionConfig newConfig = new 
DataSourceCompactionConfig(
-        "dataSource",
-        null,
-        500L,
-        null,
-        new Period(3600),
-        null,
-        new UserCompactionTaskGranularityConfig(Granularities.HOUR, null, 
true),
-        null,
-        null,
-        null,
-        null,
-        CompactionEngine.NATIVE,
-        ImmutableMap.of("key", "val")
-    );
-    Response result = 
coordinatorCompactionConfigsResource.addOrUpdateCompactionConfig(
-        newConfig,
+    Response response = resource.updateClusterCompactionConfig(
+        new CompactionConfigUpdateRequest(0.5, 10, true, CompactionEngine.MSQ),
         mockHttpServletRequest
     );
-    Assert.assertEquals(Response.Status.OK.getStatusCode(), 
result.getStatus());
-    Assert.assertNotNull(oldConfigCaptor.getValue());
-    Assert.assertEquals(oldConfigCaptor.getValue(), OLD_CONFIG_IN_BYTES);
-    Assert.assertNotNull(newConfigCaptor.getValue());
-    Assert.assertEquals(2, 
newConfigCaptor.getValue().getCompactionConfigs().size());
-    Assert.assertEquals(OLD_CONFIG, 
newConfigCaptor.getValue().getCompactionConfigs().get(0));
-    Assert.assertEquals(newConfig, 
newConfigCaptor.getValue().getCompactionConfigs().get(1));
-    Assert.assertEquals(newConfig.getEngine(), 
newConfigCaptor.getValue().getEngine());
+    verifyStatus(Response.Status.OK, response);
+
+    final CoordinatorCompactionConfig updatedConfig = verifyAndGetPayload(
+        resource.getClusterCompactionConfig(),
+        CoordinatorCompactionConfig.class
+    );
+
+    Assert.assertNotNull(updatedConfig);
+    Assert.assertEquals(0.5, updatedConfig.getCompactionTaskSlotRatio(), 
DELTA);
+    Assert.assertEquals(10, updatedConfig.getMaxCompactionTaskSlots());
+    Assert.assertTrue(updatedConfig.isUseAutoScaleSlots());
+    Assert.assertEquals(CompactionEngine.MSQ, updatedConfig.getEngine());
   }
 
   @Test
-  public void testDeleteCompactionConfigWithExistingConfig()
+  public void testSetCompactionTaskLimit()
   {
-    final ArgumentCaptor<byte[]> oldConfigCaptor = 
ArgumentCaptor.forClass(byte[].class);
-    final ArgumentCaptor<CoordinatorCompactionConfig> newConfigCaptor = 
ArgumentCaptor.forClass(
-        CoordinatorCompactionConfig.class);
-    Mockito.when(mockJacksonConfigManager.set(
-                     
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY),
-                     oldConfigCaptor.capture(),
-                     newConfigCaptor.capture(),
-                     ArgumentMatchers.any()
-                 )
-    ).thenReturn(ConfigManager.SetResult.ok());
-    final String datasourceName = "dataSource";
-    final DataSourceCompactionConfig toDelete = new DataSourceCompactionConfig(
-        datasourceName,
-        null,
-        500L,
-        null,
-        new Period(3600),
-        null,
-        new UserCompactionTaskGranularityConfig(Granularities.HOUR, null, 
null),
-        null,
-        null,
-        null,
-        null,
-        null,
-        ImmutableMap.of("key", "val")
-    );
-    final CoordinatorCompactionConfig originalConfig = 
CoordinatorCompactionConfig.from(ImmutableList.of(toDelete));
-    Mockito.when(mockJacksonConfigManager.convertByteToConfig(
-                     ArgumentMatchers.eq(OLD_CONFIG_IN_BYTES),
-                     ArgumentMatchers.eq(CoordinatorCompactionConfig.class),
-                     ArgumentMatchers.eq(CoordinatorCompactionConfig.empty())
-                 )
-    ).thenReturn(originalConfig);
-
-    Response result = 
coordinatorCompactionConfigsResource.deleteCompactionConfig(
-        datasourceName,
-        mockHttpServletRequest
-    );
-    Assert.assertEquals(Response.Status.OK.getStatusCode(), 
result.getStatus());
-    Assert.assertNotNull(oldConfigCaptor.getValue());
-    Assert.assertEquals(oldConfigCaptor.getValue(), OLD_CONFIG_IN_BYTES);
-    Assert.assertNotNull(newConfigCaptor.getValue());
-    Assert.assertEquals(0, 
newConfigCaptor.getValue().getCompactionConfigs().size());
+    final CoordinatorCompactionConfig defaultConfig
+        = verifyAndGetPayload(resource.getClusterCompactionConfig(), 
CoordinatorCompactionConfig.class);
+
+    Response response = resource.setCompactionTaskLimit(0.5, 9, true, 
mockHttpServletRequest);
+    verifyStatus(Response.Status.OK, response);
+
+    final CoordinatorCompactionConfig updatedConfig
+        = verifyAndGetPayload(resource.getClusterCompactionConfig(), 
CoordinatorCompactionConfig.class);
+
+    // Verify that the task slot fields have been updated
+    Assert.assertEquals(0.5, updatedConfig.getCompactionTaskSlotRatio(), 
DELTA);
+    Assert.assertEquals(9, updatedConfig.getMaxCompactionTaskSlots());
+    Assert.assertTrue(updatedConfig.isUseAutoScaleSlots());
+
+    // Verify that the other fields are unchanged
+    Assert.assertEquals(defaultConfig.getCompactionConfigs(), 
updatedConfig.getCompactionConfigs());
+    Assert.assertEquals(defaultConfig.getEngine(), updatedConfig.getEngine());
+  }
+
+  @Test
+  public void testGetUnknownDatasourceConfigThrowsNotFound()
+  {
+    Response response = resource.getDatasourceCompactionConfig(DS.WIKI); // this test adds no config for DS.WIKI before the lookup
+    verifyStatus(Response.Status.NOT_FOUND, response); // the lookup must answer NOT_FOUND
+  }
+
+  @Test
+  public void testAddDatasourceConfig()
+  {
+    final DataSourceCompactionConfig newDatasourceConfig
+        = DataSourceCompactionConfig.builder().forDataSource(DS.WIKI).build(); // only the datasource name is set on the builder
+    Response response = 
resource.addOrUpdateDatasourceCompactionConfig(newDatasourceConfig, 
mockHttpServletRequest);
+    verifyStatus(Response.Status.OK, response); // the add must succeed
+
+    final DataSourceCompactionConfig fetchedDatasourceConfig
+        = verifyAndGetPayload(resource.getDatasourceCompactionConfig(DS.WIKI), 
DataSourceCompactionConfig.class);
+    Assert.assertEquals(newDatasourceConfig, fetchedDatasourceConfig); // per-datasource read returns what was submitted
+
+    final CoordinatorCompactionConfig fullCompactionConfig
+        = verifyAndGetPayload(resource.getClusterCompactionConfig(), 
CoordinatorCompactionConfig.class);
+    Assert.assertEquals(1, fullCompactionConfig.getCompactionConfigs().size()); // cluster config holds exactly the one added entry
+    Assert.assertEquals(newDatasourceConfig, 
fullCompactionConfig.getCompactionConfigs().get(0));
   }
 
   @Test
-  public void testUpdateShouldRetryIfRetryableException()
+  public void testUpdateDatasourceConfig()
   {
-    Mockito.when(
-        mockJacksonConfigManager.set(
-            ArgumentMatchers.anyString(),
-            ArgumentMatchers.any(),
-            ArgumentMatchers.any(),
-            ArgumentMatchers.any()
+    final DataSourceCompactionConfig originalDatasourceConfig = 
DataSourceCompactionConfig
+        .builder()
+        .forDataSource(DS.WIKI)
+        .withInputSegmentSizeBytes(500L)
+        .withSkipOffsetFromLatest(Period.hours(1))
+        .withGranularitySpec(
+            new UserCompactionTaskGranularityConfig(Granularities.HOUR, null, 
true)
         )
-    ).thenReturn(ConfigManager.SetResult.retryableFailure(new 
ISE("retryable")));
+        .withEngine(CompactionEngine.NATIVE)
+        .build();
 
-    coordinatorCompactionConfigsResource.addOrUpdateCompactionConfig(
-        NEW_CONFIG,
+    Response response = resource.addOrUpdateDatasourceCompactionConfig(
+        originalDatasourceConfig,
         mockHttpServletRequest
     );
+    verifyStatus(Response.Status.OK, response);
+
+    final DataSourceCompactionConfig updatedDatasourceConfig = 
DataSourceCompactionConfig
+        .builder()
+        .forDataSource(DS.WIKI)
+        .withInputSegmentSizeBytes(1000L)
+        .withSkipOffsetFromLatest(Period.hours(3))
+        .withGranularitySpec(
+            new UserCompactionTaskGranularityConfig(Granularities.DAY, null, 
true)
+        )
+        .withEngine(CompactionEngine.MSQ)
+        .build();
 
-    // Verify that the update is retried upto the max number of retries
-    Mockito.verify(
-        mockJacksonConfigManager,
-        Mockito.times(CoordinatorCompactionConfigsResource.UPDATE_NUM_RETRY)
-    ).set(
-        ArgumentMatchers.anyString(),
-        ArgumentMatchers.any(),
-        ArgumentMatchers.any(),
-        ArgumentMatchers.any()
-    );
+    response = 
resource.addOrUpdateDatasourceCompactionConfig(updatedDatasourceConfig, 
mockHttpServletRequest);
+    verifyStatus(Response.Status.OK, response);
+
+    final DataSourceCompactionConfig latestDatasourceConfig
+        = verifyAndGetPayload(resource.getDatasourceCompactionConfig(DS.WIKI), 
DataSourceCompactionConfig.class);
+    Assert.assertEquals(updatedDatasourceConfig, latestDatasourceConfig);
+
+    final CoordinatorCompactionConfig fullCompactionConfig
+        = verifyAndGetPayload(resource.getClusterCompactionConfig(), 
CoordinatorCompactionConfig.class);
+    Assert.assertEquals(1, fullCompactionConfig.getCompactionConfigs().size());
+    Assert.assertEquals(updatedDatasourceConfig, 
fullCompactionConfig.getCompactionConfigs().get(0));
   }
 
   @Test
-  public void testUpdateShouldNotRetryIfNotRetryableException()
+  public void testDeleteDatasourceConfig()
   {
-    Mockito.when(
-        mockJacksonConfigManager.set(
-            ArgumentMatchers.anyString(),
-            ArgumentMatchers.any(),
-            ArgumentMatchers.any(),
-            ArgumentMatchers.any()
-        )
-    ).thenReturn(ConfigManager.SetResult.failure(new ISE("retryable")));
+    final DataSourceCompactionConfig datasourceConfig
+        = DataSourceCompactionConfig.builder().forDataSource(DS.WIKI).build();
+    Response response = 
resource.addOrUpdateDatasourceCompactionConfig(datasourceConfig, 
mockHttpServletRequest);
+    verifyStatus(Response.Status.OK, response);
 
-    coordinatorCompactionConfigsResource.addOrUpdateCompactionConfig(
-        NEW_CONFIG,
+    response = resource.deleteCompactionConfig(DS.WIKI, 
mockHttpServletRequest);
+    verifyStatus(Response.Status.OK, response);
+
+    response = resource.getDatasourceCompactionConfig(DS.WIKI);
+    verifyStatus(Response.Status.NOT_FOUND, response);
+  }
+
+  @Test
+  public void testDeleteUnknownDatasourceConfigThrowsNotFound()
+  {
+    Response response = resource.deleteCompactionConfig(DS.WIKI, 
mockHttpServletRequest);
+    verifyStatus(Response.Status.NOT_FOUND, response); // this test adds no config for DS.WIKI, so the delete must answer NOT_FOUND
+  }
+
+  @Test
+  public void testUpdateIsRetriedIfFailureIsRetryable()
+  {
+    configManager.configUpdateResult
+        = ConfigManager.SetResult.retryableFailure(new Exception("retryable"));
+    resource.addOrUpdateDatasourceCompactionConfig(
+        DataSourceCompactionConfig.builder().forDataSource(DS.WIKI).build(),
         mockHttpServletRequest
     );
 
-    // Verify that the update is tried only once
-    Mockito.verify(mockJacksonConfigManager, Mockito.times(1)).set(
-        ArgumentMatchers.anyString(),
-        ArgumentMatchers.any(),
-        ArgumentMatchers.any(),
-        ArgumentMatchers.any()
+    Assert.assertEquals(
+        CoordinatorCompactionConfigsResource.MAX_UPDATE_RETRIES,
+        configManager.numUpdateAttempts
     );
   }
 
   @Test
-  public void testSetCompactionTaskLimitWithoutExistingConfig()
+  public void testUpdateIsNotRetriedIfFailureIsNotRetryable()
   {
-    Mockito.when(mockConnector.lookup(
-                     ArgumentMatchers.anyString(),
-                     ArgumentMatchers.eq("name"),
-                     ArgumentMatchers.eq("payload"),
-                     
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY)
-                 )
-    ).thenReturn(null);
-    Mockito.when(mockJacksonConfigManager.convertByteToConfig(
-                     ArgumentMatchers.eq(null),
-                     ArgumentMatchers.eq(CoordinatorCompactionConfig.class),
-                     ArgumentMatchers.eq(CoordinatorCompactionConfig.empty())
-                 )
-    ).thenReturn(CoordinatorCompactionConfig.empty());
-    final ArgumentCaptor<byte[]> oldConfigCaptor = 
ArgumentCaptor.forClass(byte[].class);
-    final ArgumentCaptor<CoordinatorCompactionConfig> newConfigCaptor = 
ArgumentCaptor.forClass(
-        CoordinatorCompactionConfig.class);
-    Mockito.when(mockJacksonConfigManager.set(
-                     
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY),
-                     oldConfigCaptor.capture(),
-                     newConfigCaptor.capture(),
-                     ArgumentMatchers.any()
-                 )
-    ).thenReturn(ConfigManager.SetResult.ok());
-
-    double compactionTaskSlotRatio = 0.5;
-    int maxCompactionTaskSlots = 9;
-    Response result = 
coordinatorCompactionConfigsResource.setCompactionTaskLimit(
-        compactionTaskSlotRatio,
-        maxCompactionTaskSlots,
-        true,
+    configManager.configUpdateResult
+        = ConfigManager.SetResult.failure(new Exception("not retryable"));
+    resource.addOrUpdateDatasourceCompactionConfig(
+        DataSourceCompactionConfig.builder().forDataSource(DS.WIKI).build(),
         mockHttpServletRequest
     );
-    Assert.assertEquals(Response.Status.OK.getStatusCode(), 
result.getStatus());
-    Assert.assertNull(oldConfigCaptor.getValue());
-    Assert.assertNotNull(newConfigCaptor.getValue());
-    
Assert.assertEquals(newConfigCaptor.getValue().getMaxCompactionTaskSlots(), 
maxCompactionTaskSlots);
-    Assert.assertTrue(newConfigCaptor.getValue().isUseAutoScaleSlots());
-    Assert.assertEquals(compactionTaskSlotRatio, 
newConfigCaptor.getValue().getCompactionTaskSlotRatio(), 0);
+
+    Assert.assertEquals(1, configManager.numUpdateAttempts);
   }
 
   @Test
-  public void testAddOrUpdateCompactionConfigWithoutExistingConfig()
+  public void testGetDatasourceConfigHistory()
   {
-    Mockito.when(mockConnector.lookup(
-                     ArgumentMatchers.anyString(),
-                     ArgumentMatchers.eq("name"),
-                     ArgumentMatchers.eq("payload"),
-                     
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY)
-                 )
-    ).thenReturn(null);
-    Mockito.when(mockJacksonConfigManager.convertByteToConfig(
-                     ArgumentMatchers.eq(null),
-                     ArgumentMatchers.eq(CoordinatorCompactionConfig.class),
-                     ArgumentMatchers.eq(CoordinatorCompactionConfig.empty())
-                 )
-    ).thenReturn(CoordinatorCompactionConfig.empty());
-    final ArgumentCaptor<byte[]> oldConfigCaptor = 
ArgumentCaptor.forClass(byte[].class);
-    final ArgumentCaptor<CoordinatorCompactionConfig> newConfigCaptor = 
ArgumentCaptor.forClass(
-        CoordinatorCompactionConfig.class);
-    Mockito.when(mockJacksonConfigManager.set(
-                     
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY),
-                     oldConfigCaptor.capture(),
-                     newConfigCaptor.capture(),
-                     ArgumentMatchers.any()
-                 )
-    ).thenReturn(ConfigManager.SetResult.ok());
-
-    final DataSourceCompactionConfig newConfig = new 
DataSourceCompactionConfig(
-        "dataSource",
-        null,
-        500L,
-        null,
-        new Period(3600),
-        null,
-        new UserCompactionTaskGranularityConfig(Granularities.HOUR, null, 
null),
-        null,
-        null,
-        null,
-        null,
-        CompactionEngine.MSQ,
-        ImmutableMap.of("key", "val")
-    );
-    String author = "maytas";
-    String comment = "hello";
-    Response result = 
coordinatorCompactionConfigsResource.addOrUpdateCompactionConfig(
-        newConfig,
-        mockHttpServletRequest
-    );
-    Assert.assertEquals(Response.Status.OK.getStatusCode(), 
result.getStatus());
-    Assert.assertNull(oldConfigCaptor.getValue());
-    Assert.assertNotNull(newConfigCaptor.getValue());
-    Assert.assertEquals(1, 
newConfigCaptor.getValue().getCompactionConfigs().size());
-    Assert.assertEquals(newConfig, 
newConfigCaptor.getValue().getCompactionConfigs().get(0));
-    Assert.assertEquals(newConfig.getEngine(), 
newConfigCaptor.getValue().getCompactionConfigs().get(0).getEngine());
+    final DataSourceCompactionConfigBuilder builder
+        = DataSourceCompactionConfig.builder().forDataSource(DS.WIKI);
+
+    final DataSourceCompactionConfig configV1 = builder.build();
+    resource.addOrUpdateDatasourceCompactionConfig(configV1, 
mockHttpServletRequest);
+
+    final DataSourceCompactionConfig configV2 = 
builder.withEngine(CompactionEngine.NATIVE).build();
+    resource.addOrUpdateDatasourceCompactionConfig(configV2, 
mockHttpServletRequest);
+
+    final DataSourceCompactionConfig configV3 = builder
+        .withEngine(CompactionEngine.MSQ)
+        .withSkipOffsetFromLatest(Period.hours(1))
+        .build();
+    resource.addOrUpdateDatasourceCompactionConfig(configV3, 
mockHttpServletRequest);
+
+    Response response = resource.getCompactionConfigHistory(DS.WIKI, null, 
null);
+    verifyStatus(Response.Status.OK, response);
+
+    final List<DataSourceCompactionConfigAuditEntry> history
+        = (List<DataSourceCompactionConfigAuditEntry>) response.getEntity();
+    Assert.assertEquals(3, history.size());
+    Assert.assertEquals(configV1, history.get(0).getCompactionConfig());
+    Assert.assertEquals(configV2, history.get(1).getCompactionConfig());
+    Assert.assertEquals(configV3, history.get(2).getCompactionConfig());
   }
 
   @Test
-  public void 
testAddOrUpdateCompactionConfigWithoutExistingConfigAndEngineAsNull()
+  public void testGetHistoryOfUnknownDatasourceReturnsEmpty()
   {
-    Mockito.when(mockConnector.lookup(
-                     ArgumentMatchers.anyString(),
-                     ArgumentMatchers.eq("name"),
-                     ArgumentMatchers.eq("payload"),
-                     
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY)
-                 )
-    ).thenReturn(null);
-    Mockito.when(mockJacksonConfigManager.convertByteToConfig(
-                     ArgumentMatchers.eq(null),
-                     ArgumentMatchers.eq(CoordinatorCompactionConfig.class),
-                     ArgumentMatchers.eq(CoordinatorCompactionConfig.empty())
-                 )
-    ).thenReturn(CoordinatorCompactionConfig.empty());
-    final ArgumentCaptor<byte[]> oldConfigCaptor = 
ArgumentCaptor.forClass(byte[].class);
-    final ArgumentCaptor<CoordinatorCompactionConfig> newConfigCaptor = 
ArgumentCaptor.forClass(
-        CoordinatorCompactionConfig.class);
-    Mockito.when(mockJacksonConfigManager.set(
-                     
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY),
-                     oldConfigCaptor.capture(),
-                     newConfigCaptor.capture(),
-                     ArgumentMatchers.any()
-                 )
-    ).thenReturn(ConfigManager.SetResult.ok());
-
-    final DataSourceCompactionConfig newConfig = new 
DataSourceCompactionConfig(
-        "dataSource",
-        null,
-        500L,
-        null,
-        new Period(3600),
-        null,
-        new UserCompactionTaskGranularityConfig(Granularities.HOUR, null, 
null),
-        null,
-        null,
-        null,
-        null,
-        null,
-        ImmutableMap.of("key", "val")
-    );
-    coordinatorCompactionConfigsResource.addOrUpdateCompactionConfig(
-        newConfig,
-        mockHttpServletRequest
-    );
-    Assert.assertEquals(null, 
newConfigCaptor.getValue().getCompactionConfigs().get(0).getEngine());
+    Response response = resource.getCompactionConfigHistory(DS.WIKI, null, 
null);
+    verifyStatus(Response.Status.OK, response);
+    Assert.assertTrue(((List<?>) response.getEntity()).isEmpty());
   }
 
   @Test
-  public void 
testAddOrUpdateCompactionConfigWithInvalidMaxNumTasksForMSQEngine()
+  public void testAddInvalidDatasourceConfigThrowsBadRequest()
   {
-    Mockito.when(mockConnector.lookup(
-                     ArgumentMatchers.anyString(),
-                     ArgumentMatchers.eq("name"),
-                     ArgumentMatchers.eq("payload"),
-                     
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY)
-                 )
-    ).thenReturn(null);
-    Mockito.when(mockJacksonConfigManager.convertByteToConfig(
-                     ArgumentMatchers.eq(null),
-                     ArgumentMatchers.eq(CoordinatorCompactionConfig.class),
-                     ArgumentMatchers.eq(CoordinatorCompactionConfig.empty())
-                 )
-    ).thenReturn(CoordinatorCompactionConfig.empty());
-
-    int maxNumTasks = 1;
-
-    final DataSourceCompactionConfig newConfig = new 
DataSourceCompactionConfig(
-        "dataSource",
-        null,
-        500L,
-        null,
-        new Period(3600),
-        null,
-        new UserCompactionTaskGranularityConfig(Granularities.HOUR, null, 
null),
-        null,
-        null,
-        null,
-        null,
-        CompactionEngine.MSQ,
-        ImmutableMap.of(ClientMSQContext.CTX_MAX_NUM_TASKS, maxNumTasks)
-    );
-    Response response = 
coordinatorCompactionConfigsResource.addOrUpdateCompactionConfig(
-        newConfig,
-        mockHttpServletRequest
-    );
-    
Assert.assertEquals(DruidException.Category.INVALID_INPUT.getExpectedStatus(), 
response.getStatus());
+    final DataSourceCompactionConfig datasourceConfig = 
DataSourceCompactionConfig
+        .builder()
+        .forDataSource(DS.WIKI)
+        
.withTaskContext(Collections.singletonMap(ClientMSQContext.CTX_MAX_NUM_TASKS, 
1))
+        .withEngine(CompactionEngine.MSQ)
+        .build();
+
+    final Response response = 
resource.addOrUpdateDatasourceCompactionConfig(datasourceConfig, 
mockHttpServletRequest);
+    verifyStatus(Response.Status.BAD_REQUEST, response);
+    Assert.assertTrue(response.getEntity() instanceof ErrorResponse);
     Assert.assertEquals(
-        "Compaction config not supported. Reason[MSQ context maxNumTasks [1] 
cannot be less than 2, "
-        + "since at least 1 controller and 1 worker is necessary.].",
+        "Compaction config not supported. Reason[MSQ: Context maxNumTasks[1]"
+        + " must be at least 2 (1 controller + 1 worker)].",
         ((ErrorResponse) 
response.getEntity()).getUnderlyingException().getMessage()
     );
   }
 
   @Test
-  public void 
testDeleteCompactionConfigWithoutExistingConfigShouldFailAsDatasourceNotExist()
+  public void 
testUpdateEngineToMSQWithInvalidDatasourceConfigThrowsBadRequest()
   {
-    Mockito.when(mockConnector.lookup(
-                     ArgumentMatchers.anyString(),
-                     ArgumentMatchers.eq("name"),
-                     ArgumentMatchers.eq("payload"),
-                     
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY)
-                 )
-    ).thenReturn(null);
-    Mockito.when(mockJacksonConfigManager.convertByteToConfig(
-                     ArgumentMatchers.eq(null),
-                     ArgumentMatchers.eq(CoordinatorCompactionConfig.class),
-                     ArgumentMatchers.eq(CoordinatorCompactionConfig.empty())
-                 )
-    ).thenReturn(CoordinatorCompactionConfig.empty());
-    Response result = 
coordinatorCompactionConfigsResource.deleteCompactionConfig(
-        DATASOURCE_NOT_EXISTS,
+    final DataSourceCompactionConfig datasourceConfig = 
DataSourceCompactionConfig
+        .builder()
+        .forDataSource(DS.WIKI)
+        
.withTaskContext(Collections.singletonMap(ClientMSQContext.CTX_MAX_NUM_TASKS, 
1))
+        .build();
+    Response response = 
resource.addOrUpdateDatasourceCompactionConfig(datasourceConfig, 
mockHttpServletRequest);
+    verifyStatus(Response.Status.OK, response);
+
+    response = resource.updateClusterCompactionConfig(
+        new CompactionConfigUpdateRequest(null, null, null, 
CompactionEngine.MSQ),
         mockHttpServletRequest
     );
-    Assert.assertEquals(Response.Status.NOT_FOUND.getStatusCode(), 
result.getStatus());
+    verifyStatus(Response.Status.BAD_REQUEST, response);
+    Assert.assertTrue(response.getEntity() instanceof ErrorResponse);
+    Assert.assertEquals(
+        "Cannot update engine to [msq] as it does not support compaction 
config of DataSource[wiki]."
+        + " Reason[MSQ: Context maxNumTasks[1] must be at least 2 (1 
controller + 1 worker)].",
+        ((ErrorResponse) 
response.getEntity()).getUnderlyingException().getMessage()
+    );
   }
 
-  @Test
-  public void 
testGetCompactionConfigHistoryForUnknownDataSourceShouldReturnEmptyList()
+  @SuppressWarnings("unchecked")
+  private <T> T verifyAndGetPayload(Response response, Class<T> type)
   {
-    Response response = 
coordinatorCompactionConfigsResource.getCompactionConfigHistory(
-        DATASOURCE_NOT_EXISTS,
-        null,
-        null
-    );
     Assert.assertEquals(Response.Status.OK.getStatusCode(), 
response.getStatus());
-    Assert.assertTrue(((Collection) response.getEntity()).isEmpty());
+
+    Assert.assertTrue(type.isInstance(response.getEntity()));
+    return (T) response.getEntity();
+  }
+
+  private void verifyStatus(Response.Status expectedStatus, Response response)
+  { // Fails the calling test unless the response carries the expected HTTP status
+    Assert.assertEquals(expectedStatus.getStatusCode(), response.getStatus()); // compared as numeric status codes
+  }
+
+  /**
+   * Test implementation of AuditManager that keeps audit entries in memory.
+   * <p>
+   * Only {@code doAudit(AuditEntry)} records anything: it serializes the raw
+   * payload of the given entry to JSON with {@code OBJECT_MAPPER} and appends a
+   * copy of the entry (same key, type, audit info, audit time and request) that
+   * carries this serialized payload. The overload taking a {@code Handle} is a
+   * no-op.
+   * <p>
+   * Every {@code fetchAuditHistory} overload returns the backing list itself,
+   * i.e. all recorded entries in insertion order; the key, type, interval and
+   * limit arguments are ignored. {@code removeAuditLogsOlderThan} removes
+   * nothing and always returns 0.
+   */
+  private static class TestAuditManager implements AuditManager
+  {
+    private final List<AuditEntry> audits = new ArrayList<>();
+
+    @Override
+    public void doAudit(AuditEntry event, Handle handle)
+    {
+      // Intentionally a no-op: entries passed together with a Handle are not recorded
+    }
+
+    @Override
+    public void doAudit(AuditEntry event)
+    {
+      final String json;
+      try {
+        json = OBJECT_MAPPER.writeValueAsString(event.getPayload().raw());
+      }
+      catch (JsonProcessingException e) {
+        throw new RuntimeException(e); // surface serialization problems as a test failure
+      }
+
+      final AuditEntry eventWithSerializedPayload
+          = AuditEntry.builder()
+                      .key(event.getKey())
+                      .type(event.getType())
+                      .auditInfo(event.getAuditInfo())
+                      .auditTime(event.getAuditTime())
+                      .request(event.getRequest())
+                      .serializedPayload(json)
+                      .build();
+      audits.add(eventWithSerializedPayload);
+    }
+
+    @Override
+    public List<AuditEntry> fetchAuditHistory(String key, String type, 
Interval interval)
+    {
+      return audits; // key, type and interval are not used for filtering
+    }
+
+    @Override
+    public List<AuditEntry> fetchAuditHistory(String type, int limit)
+    {
+      return audits; // type and limit are not applied
+    }
+
+    @Override
+    public List<AuditEntry> fetchAuditHistory(String type, Interval interval)
+    {
+      return audits; // type and interval are not used for filtering
+    }
+
+    @Override
+    public List<AuditEntry> fetchAuditHistory(String key, String type, int 
limit)
+    {
+      return audits; // key, type and limit are not applied
+    }
+
+    @Override
+    public int removeAuditLogsOlderThan(long timestamp)
+    {
+      return 0; // nothing is ever removed from the in-memory list
+    }
+  }
+
+  /**
+   * Test implementation of CoordinatorConfigManager to track number of update
+   * attempts.
+   * <p>
+   * Every call to {@code getAndUpdateCompactionConfig} increments
+   * {@code numUpdateAttempts}. While {@code configUpdateResult} is null the call
+   * is forwarded to the superclass; once a result has been assigned to that
+   * field, it is returned as-is and the superclass is not invoked.
+   * <p>
+   * {@code create(AuditManager)} wires the instance to a {@code ConfigManager}
+   * that is backed by an in-memory {@code TestDBConnector} and a tables config
+   * whose config table is named "druid_config".
+   */
+  private static class TestCoordinatorConfigManager extends 
CoordinatorConfigManager
+  {
+    private final ConfigManager delegate; // the ConfigManager wrapped by the JacksonConfigManager handed to super
+    private int numUpdateAttempts; // incremented on every getAndUpdateCompactionConfig call
+    private ConfigManager.SetResult configUpdateResult; // when non-null, returned instead of performing an update
+
+    static TestCoordinatorConfigManager create(AuditManager auditManager)
+    {
+      final MetadataStorageTablesConfig tablesConfig = new 
TestMetadataStorageTablesConfig()
+      {
+        @Override
+        public String getConfigTable()
+        {
+          return "druid_config";
+        }
+      };
+
+      final TestDBConnector dbConnector = new TestDBConnector();
+      final ConfigManager configManager = new ConfigManager(
+          dbConnector,
+          Suppliers.ofInstance(tablesConfig),
+          Suppliers.ofInstance(new TestConfigManagerConfig())
+      );
+
+      return new TestCoordinatorConfigManager(configManager, dbConnector, 
tablesConfig, auditManager);
+    }
+
+    TestCoordinatorConfigManager(
+        ConfigManager configManager,
+        TestDBConnector dbConnector,
+        MetadataStorageTablesConfig tablesConfig,
+        AuditManager auditManager
+    )
+    {
+      super(
+          new JacksonConfigManager(configManager, OBJECT_MAPPER, auditManager),
+          dbConnector,
+          tablesConfig
+      );
+      this.delegate = configManager;
+    }
+
+    @Override
+    public ConfigManager.SetResult getAndUpdateCompactionConfig(
+        UnaryOperator<CoordinatorCompactionConfig> operator,
+        AuditInfo auditInfo
+    )
+    {
+      ++numUpdateAttempts; // counted before deciding whether to delegate
+      if (configUpdateResult == null) {
+        return super.getAndUpdateCompactionConfig(operator, auditInfo);
+      } else {
+        return configUpdateResult; // canned result; the operator is never applied
+      }
+    }
+  }
+
+  /**
+   * Test implementation for in-memory insert, lookup and compareAndSwap
+   * operations.
+   * <p>
+   * Values live in a HashMap keyed by the list
+   * (tableName, keyColumn, valueColumn, key).
+   * <p>
+   * {@code compareAndSwap} processes the given updates one at a time: an update
+   * is written when no value is stored for its key yet, or when the stored bytes
+   * equal the update's old value. On the first mismatch the method returns false
+   * immediately; updates already written earlier in the same call are not
+   * rolled back.
+   */
+  private static class TestDBConnector extends TestMetadataStorageConnector
+  {
+    private final Map<List<String>, byte[]> values = new HashMap<>();
+
+    @Override
+    public Void insertOrUpdate(String tableName, String keyColumn, String 
valueColumn, String key, byte[] value)
+    {
+      values.put(
+          Arrays.asList(tableName, keyColumn, valueColumn, key),
+          value
+      );
+      return null; // Void return type: there is no value to hand back
+    }
+
+    @Nullable
+    @Override
+    public byte[] lookup(String tableName, String keyColumn, String 
valueColumn, String key)
+    {
+      return values.get(Arrays.asList(tableName, keyColumn, valueColumn, key));
+    }
+
+    @Override
+    public boolean compareAndSwap(List<MetadataCASUpdate> updates)
+    {
+      for (MetadataCASUpdate update : updates) {
+        final List<String> key = Arrays.asList(
+            update.getTableName(),
+            update.getKeyColumn(),
+            update.getValueColumn(),
+            update.getKey()
+        );
+
+        final byte[] currentValue = values.get(key);
+        if (currentValue == null || Arrays.equals(currentValue, 
update.getOldValue())) {
+          values.put(key, update.getNewValue());
+        } else {
+          return false; // stored bytes differ from the expected old value
+        }
+      }
+
+      return true;
+    }
+  }
+
+  private static class DS // holder for datasource name constants
+  {
+    static final String WIKI = "wiki"; // referenced as DS.WIKI by the tests in this class
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to