This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new caedeb66cd7 Add API to update compaction engine (#16803)
caedeb66cd7 is described below
commit caedeb66cd77ac143980bbe78924303f49de0b06
Author: Kashif Faraz <[email protected]>
AuthorDate: Fri Jul 26 20:44:51 2024 -0700
Add API to update compaction engine (#16803)
Changes:
- Add API `/druid/coordinator/v1/config/compaction/global` to update
cluster level compaction config
- Add class `CompactionConfigUpdateRequest`
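- Fix bug in `CoordinatorCompactionConfig` which caused the compaction engine
to not be persisted.
Use JSON field name `engine` instead of `compactionEngine` because the JSON
field name must match the name derived from the getter (`getEngine()`);
otherwise the value is serialized under a name the creator does not read.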
- Update MSQ validation error messages
- Complete overhaul of `CoordinatorCompactionConfigResourceTest` to remove
unnecessary mocking
and add more meaningful tests.
- Add `TuningConfigBuilder` to easily build tuning configs for tests.
- Add `DatasourceCompactionConfigBuilder`
---
.../NewestSegmentFirstPolicyBenchmark.java | 21 +-
.../druid/msq/indexing/MSQCompactionRunner.java | 2 +-
.../msq/indexing/MSQCompactionRunnerTest.java | 2 +-
.../common/task/NativeCompactionRunner.java | 2 +-
.../indexing/ClientCompactionRunnerInfo.java | 42 +-
.../CompactionConfigValidationResult.java | 15 +-
.../coordinator/CoordinatorCompactionConfig.java | 17 +-
.../coordinator/DataSourceCompactionConfig.java | 7 +
.../config/DataSourceCompactionConfigBuilder.java | 155 ++++
.../server/http/CompactionConfigUpdateRequest.java | 82 ++
.../http/CoordinatorCompactionConfigsResource.java | 59 +-
.../indexing/ClientCompactionRunnerInfoTest.java | 9 +-
.../CoordinatorCompactionConfigTest.java} | 28 +-
.../DataSourceCompactionConfigTest.java | 372 +++------
.../CoordinatorCompactionConfigsResourceTest.java | 849 +++++++++++----------
15 files changed, 898 insertions(+), 764 deletions(-)
diff --git
a/benchmarks/src/test/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java
b/benchmarks/src/test/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java
index 98c27c4b2b8..87e92ab6fb1 100644
---
a/benchmarks/src/test/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java
+++
b/benchmarks/src/test/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java
@@ -92,21 +92,12 @@ public class NewestSegmentFirstPolicyBenchmark
final String dataSource = DATA_SOURCE_PREFIX + i;
compactionConfigs.put(
dataSource,
- new DataSourceCompactionConfig(
- dataSource,
- 0,
- inputSegmentSizeBytes,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null
- )
+ DataSourceCompactionConfig
+ .builder()
+ .forDataSource(dataSource)
+ .withTaskPriority(0)
+ .withInputSegmentSizeBytes(inputSegmentSizeBytes)
+ .build()
);
}
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java
index fb25097e980..28ec422e2ac 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java
@@ -143,7 +143,7 @@ public class MSQCompactionRunner implements CompactionRunner
return validationResults.stream()
.filter(result -> !result.isValid())
.findFirst()
- .orElse(new CompactionConfigValidationResult(true,
null));
+
.orElse(CompactionConfigValidationResult.success());
}
@Override
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java
index b95243f7783..3f73a9b513e 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java
@@ -211,7 +211,7 @@ public class MSQCompactionRunnerTest
CompactionConfigValidationResult validationResult =
MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask);
Assert.assertFalse(validationResult.isValid());
Assert.assertEquals(
- "Different name[sum_added] and fieldName(s)[[added]] for aggregator
unsupported for MSQ engine.",
+ "MSQ: Different name[sum_added] and fieldName(s)[[added]] for
aggregator",
validationResult.getReason()
);
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NativeCompactionRunner.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NativeCompactionRunner.java
index 30761b674e5..2074d14f0f9 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NativeCompactionRunner.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NativeCompactionRunner.java
@@ -88,7 +88,7 @@ public class NativeCompactionRunner implements
CompactionRunner
CompactionTask compactionTask
)
{
- return new CompactionConfigValidationResult(true, null);
+ return CompactionConfigValidationResult.success();
}
/**
diff --git
a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfo.java
b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfo.java
index ed9e22dfaa2..74504184608 100644
---
a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfo.java
+++
b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfo.java
@@ -92,7 +92,7 @@ public class ClientCompactionRunnerInfo
{
CompactionEngine compactionEngine = newConfig.getEngine() == null ?
defaultCompactionEngine : newConfig.getEngine();
if (compactionEngine == CompactionEngine.NATIVE) {
- return new CompactionConfigValidationResult(true, null);
+ return CompactionConfigValidationResult.success();
} else {
return compactionConfigSupportedByMSQEngine(newConfig);
}
@@ -125,7 +125,7 @@ public class ClientCompactionRunnerInfo
return validationResults.stream()
.filter(result -> !result.isValid())
.findFirst()
- .orElse(new CompactionConfigValidationResult(true,
null));
+
.orElse(CompactionConfigValidationResult.success());
}
/**
@@ -135,22 +135,19 @@ public class ClientCompactionRunnerInfo
{
if (!(partitionsSpec instanceof DimensionRangePartitionsSpec
|| partitionsSpec instanceof DynamicPartitionsSpec)) {
- return new CompactionConfigValidationResult(
- false,
- "Invalid partitionsSpec type[%s] for MSQ engine. Type must be either
'dynamic' or 'range'.",
+ return CompactionConfigValidationResult.failure(
+ "MSQ: Invalid partitioning type[%s]. Must be either 'dynamic' or
'range'",
partitionsSpec.getClass().getSimpleName()
);
}
if (partitionsSpec instanceof DynamicPartitionsSpec
&& ((DynamicPartitionsSpec) partitionsSpec).getMaxTotalRows() != null)
{
- return new CompactionConfigValidationResult(
- false,
- "maxTotalRows[%d] in DynamicPartitionsSpec not supported for MSQ
engine.",
- ((DynamicPartitionsSpec) partitionsSpec).getMaxTotalRows()
+ return CompactionConfigValidationResult.failure(
+ "MSQ: 'maxTotalRows' not supported with 'dynamic' partitioning"
);
}
- return new CompactionConfigValidationResult(true, null);
+ return CompactionConfigValidationResult.success();
}
/**
@@ -162,12 +159,11 @@ public class ClientCompactionRunnerInfo
)
{
if (metricsSpec != null && isRollup != null && !isRollup) {
- return new CompactionConfigValidationResult(
- false,
- "rollup in granularitySpec must be set to True if metricsSpec is
specifed for MSQ engine."
+ return CompactionConfigValidationResult.failure(
+ "MSQ: 'granularitySpec.rollup' must be true if 'metricsSpec' is
specified"
);
}
- return new CompactionConfigValidationResult(true, null);
+ return CompactionConfigValidationResult.success();
}
/**
@@ -179,14 +175,13 @@ public class ClientCompactionRunnerInfo
int maxNumTasks = QueryContext.of(context)
.getInt(ClientMSQContext.CTX_MAX_NUM_TASKS,
ClientMSQContext.DEFAULT_MAX_NUM_TASKS);
if (maxNumTasks < 2) {
- return new CompactionConfigValidationResult(false,
- "MSQ context maxNumTasks
[%,d] cannot be less than 2, "
- + "since at least 1
controller and 1 worker is necessary.",
- maxNumTasks
+ return CompactionConfigValidationResult.failure(
+ "MSQ: Context maxNumTasks[%,d] must be at least 2 (1 controller +
1 worker)",
+ maxNumTasks
);
}
}
- return new CompactionConfigValidationResult(true, null);
+ return CompactionConfigValidationResult.success();
}
/**
@@ -195,7 +190,7 @@ public class ClientCompactionRunnerInfo
public static CompactionConfigValidationResult
validateMetricsSpecForMSQ(AggregatorFactory[] metricsSpec)
{
if (metricsSpec == null) {
- return new CompactionConfigValidationResult(true, null);
+ return CompactionConfigValidationResult.success();
}
return Arrays.stream(metricsSpec)
.filter(aggregatorFactory ->
@@ -206,11 +201,10 @@ public class ClientCompactionRunnerInfo
.equals(aggregatorFactory.getName())))
.findFirst()
.map(aggregatorFactory ->
- new CompactionConfigValidationResult(
- false,
- "Different name[%s] and fieldName(s)[%s] for
aggregator unsupported for MSQ engine.",
+ CompactionConfigValidationResult.failure(
+ "MSQ: Different name[%s] and fieldName(s)[%s]
for aggregator",
aggregatorFactory.getName(),
aggregatorFactory.requiredFields()
- )).orElse(new CompactionConfigValidationResult(true,
null));
+
)).orElse(CompactionConfigValidationResult.success());
}
}
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/CompactionConfigValidationResult.java
b/server/src/main/java/org/apache/druid/server/coordinator/CompactionConfigValidationResult.java
index 88eaa3e923a..d482903e0d6 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/CompactionConfigValidationResult.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/CompactionConfigValidationResult.java
@@ -23,10 +23,23 @@ import org.apache.druid.java.util.common.StringUtils;
public class CompactionConfigValidationResult
{
+ private static final CompactionConfigValidationResult SUCCESS
+ = new CompactionConfigValidationResult(true, null);
+
private final boolean valid;
private final String reason;
- public CompactionConfigValidationResult(boolean valid, String format,
Object... args)
+ public static CompactionConfigValidationResult success()
+ {
+ return SUCCESS;
+ }
+
+ public static CompactionConfigValidationResult failure(String msgFormat,
Object... args)
+ {
+ return new CompactionConfigValidationResult(false, msgFormat, args);
+ }
+
+ private CompactionConfigValidationResult(boolean valid, String format,
Object... args)
{
this.valid = valid;
this.reason = format == null ? null : StringUtils.format(format, args);
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorCompactionConfig.java
b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorCompactionConfig.java
index 036c53121e9..15e19cdbd77 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorCompactionConfig.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorCompactionConfig.java
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import org.apache.druid.common.config.Configs;
import org.apache.druid.indexer.CompactionEngine;
+import org.apache.druid.server.http.CompactionConfigUpdateRequest;
import javax.annotation.Nullable;
import java.util.List;
@@ -54,23 +55,21 @@ public class CoordinatorCompactionConfig
baseConfig.compactionTaskSlotRatio,
baseConfig.maxCompactionTaskSlots,
baseConfig.useAutoScaleSlots,
- null
+ baseConfig.compactionEngine
);
}
public static CoordinatorCompactionConfig from(
CoordinatorCompactionConfig baseConfig,
- @Nullable Double compactionTaskSlotRatio,
- @Nullable Integer maxCompactionTaskSlots,
- @Nullable Boolean useAutoScaleSlots
+ CompactionConfigUpdateRequest update
)
{
return new CoordinatorCompactionConfig(
baseConfig.compactionConfigs,
- compactionTaskSlotRatio == null ? baseConfig.compactionTaskSlotRatio :
compactionTaskSlotRatio,
- maxCompactionTaskSlots == null ? baseConfig.maxCompactionTaskSlots :
maxCompactionTaskSlots,
- useAutoScaleSlots == null ? baseConfig.useAutoScaleSlots :
useAutoScaleSlots,
- null
+ Configs.valueOrDefault(update.getCompactionTaskSlotRatio(),
baseConfig.compactionTaskSlotRatio),
+ Configs.valueOrDefault(update.getMaxCompactionTaskSlots(),
baseConfig.maxCompactionTaskSlots),
+ Configs.valueOrDefault(update.getUseAutoScaleSlots(),
baseConfig.useAutoScaleSlots),
+ Configs.valueOrDefault(update.getCompactionEngine(),
baseConfig.compactionEngine)
);
}
@@ -90,7 +89,7 @@ public class CoordinatorCompactionConfig
@JsonProperty("compactionTaskSlotRatio") @Nullable Double
compactionTaskSlotRatio,
@JsonProperty("maxCompactionTaskSlots") @Nullable Integer
maxCompactionTaskSlots,
@JsonProperty("useAutoScaleSlots") @Nullable Boolean useAutoScaleSlots,
- @JsonProperty("compactionEngine") @Nullable CompactionEngine
compactionEngine
+ @JsonProperty("engine") @Nullable CompactionEngine compactionEngine
)
{
this.compactionConfigs = compactionConfigs;
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java
b/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java
index 767e8218f31..91af8b6267d 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.apache.druid.indexer.CompactionEngine;
import org.apache.druid.query.aggregation.AggregatorFactory;
+import
org.apache.druid.server.coordinator.config.DataSourceCompactionConfigBuilder;
import org.joda.time.Period;
import javax.annotation.Nullable;
@@ -42,6 +43,12 @@ public class DataSourceCompactionConfig
private final String dataSource;
private final int taskPriority;
private final long inputSegmentSizeBytes;
+
+ public static DataSourceCompactionConfigBuilder builder()
+ {
+ return new DataSourceCompactionConfigBuilder();
+ }
+
/**
* The number of input segments is limited because the byte size of a
serialized task spec is limited by
*
org.apache.druid.indexing.overlord.config.RemoteTaskRunnerConfig.maxZnodeBytes.
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/config/DataSourceCompactionConfigBuilder.java
b/server/src/main/java/org/apache/druid/server/coordinator/config/DataSourceCompactionConfigBuilder.java
new file mode 100644
index 00000000000..9cb00dfa614
--- /dev/null
+++
b/server/src/main/java/org/apache/druid/server/coordinator/config/DataSourceCompactionConfigBuilder.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.config;
+
+import org.apache.druid.indexer.CompactionEngine;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
+import org.apache.druid.server.coordinator.UserCompactionTaskDimensionsConfig;
+import org.apache.druid.server.coordinator.UserCompactionTaskGranularityConfig;
+import org.apache.druid.server.coordinator.UserCompactionTaskIOConfig;
+import org.apache.druid.server.coordinator.UserCompactionTaskQueryTuningConfig;
+import org.apache.druid.server.coordinator.UserCompactionTaskTransformConfig;
+import org.joda.time.Period;
+
+import java.util.Map;
+
+public class DataSourceCompactionConfigBuilder
+{
+ private String dataSource;
+ private Integer taskPriority;
+ private Long inputSegmentSizeBytes;
+ private Integer maxRowsPerSegment;
+ private Period skipOffsetFromLatest;
+ private UserCompactionTaskQueryTuningConfig tuningConfig;
+ private UserCompactionTaskGranularityConfig granularitySpec;
+ private UserCompactionTaskDimensionsConfig dimensionsSpec;
+ private AggregatorFactory[] metricsSpec;
+ private UserCompactionTaskTransformConfig transformSpec;
+ private UserCompactionTaskIOConfig ioConfig;
+ private CompactionEngine engine;
+ private Map<String, Object> taskContext;
+
+ public DataSourceCompactionConfig build()
+ {
+ return new DataSourceCompactionConfig(
+ dataSource,
+ taskPriority,
+ inputSegmentSizeBytes,
+ maxRowsPerSegment,
+ skipOffsetFromLatest,
+ tuningConfig,
+ granularitySpec,
+ dimensionsSpec,
+ metricsSpec,
+ transformSpec,
+ ioConfig,
+ engine,
+ taskContext
+ );
+ }
+
+ public DataSourceCompactionConfigBuilder forDataSource(String dataSource)
+ {
+ this.dataSource = dataSource;
+ return this;
+ }
+
+ public DataSourceCompactionConfigBuilder withTaskPriority(Integer
taskPriority)
+ {
+ this.taskPriority = taskPriority;
+ return this;
+ }
+
+ public DataSourceCompactionConfigBuilder withInputSegmentSizeBytes(Long
inputSegmentSizeBytes)
+ {
+ this.inputSegmentSizeBytes = inputSegmentSizeBytes;
+ return this;
+ }
+
+ @Deprecated
+ public DataSourceCompactionConfigBuilder withMaxRowsPerSegment(Integer
maxRowsPerSegment)
+ {
+ this.maxRowsPerSegment = maxRowsPerSegment;
+ return this;
+ }
+
+ public DataSourceCompactionConfigBuilder withSkipOffsetFromLatest(Period
skipOffsetFromLatest)
+ {
+ this.skipOffsetFromLatest = skipOffsetFromLatest;
+ return this;
+ }
+
+ public DataSourceCompactionConfigBuilder withTuningConfig(
+ UserCompactionTaskQueryTuningConfig tuningConfig
+ )
+ {
+ this.tuningConfig = tuningConfig;
+ return this;
+ }
+
+ public DataSourceCompactionConfigBuilder withGranularitySpec(
+ UserCompactionTaskGranularityConfig granularitySpec
+ )
+ {
+ this.granularitySpec = granularitySpec;
+ return this;
+ }
+
+ public DataSourceCompactionConfigBuilder withDimensionsSpec(
+ UserCompactionTaskDimensionsConfig dimensionsSpec
+ )
+ {
+ this.dimensionsSpec = dimensionsSpec;
+ return this;
+ }
+
+ public DataSourceCompactionConfigBuilder withMetricsSpec(AggregatorFactory[]
metricsSpec)
+ {
+ this.metricsSpec = metricsSpec;
+ return this;
+ }
+
+ public DataSourceCompactionConfigBuilder withTransformSpec(
+ UserCompactionTaskTransformConfig transformSpec
+ )
+ {
+ this.transformSpec = transformSpec;
+ return this;
+ }
+
+ public DataSourceCompactionConfigBuilder
withIoConfig(UserCompactionTaskIOConfig ioConfig)
+ {
+ this.ioConfig = ioConfig;
+ return this;
+ }
+
+ public DataSourceCompactionConfigBuilder withEngine(CompactionEngine engine)
+ {
+ this.engine = engine;
+ return this;
+ }
+
+ public DataSourceCompactionConfigBuilder withTaskContext(Map<String, Object>
taskContext)
+ {
+ this.taskContext = taskContext;
+ return this;
+ }
+}
diff --git
a/server/src/main/java/org/apache/druid/server/http/CompactionConfigUpdateRequest.java
b/server/src/main/java/org/apache/druid/server/http/CompactionConfigUpdateRequest.java
new file mode 100644
index 00000000000..72ab23fde8a
--- /dev/null
+++
b/server/src/main/java/org/apache/druid/server/http/CompactionConfigUpdateRequest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.http;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.indexer.CompactionEngine;
+
+import javax.annotation.Nullable;
+
+/**
+ * Payload to update the cluster-level compaction config.
+ * All fields of this class must be nullable. A non-null value indicates that the
+ * corresponding field is being updated.
+ */
+public class CompactionConfigUpdateRequest
+{
+ private final Double compactionTaskSlotRatio;
+ private final Integer maxCompactionTaskSlots;
+ private final Boolean useAutoScaleSlots;
+ private final CompactionEngine compactionEngine;
+
+ @JsonCreator
+ public CompactionConfigUpdateRequest(
+ @JsonProperty("compactionTaskSlotRatio") @Nullable Double
compactionTaskSlotRatio,
+ @JsonProperty("maxCompactionTaskSlots") @Nullable Integer
maxCompactionTaskSlots,
+ @JsonProperty("useAutoScaleSlots") @Nullable Boolean useAutoScaleSlots,
+ @JsonProperty("compactionEngine") @Nullable CompactionEngine
compactionEngine
+ )
+ {
+ this.compactionTaskSlotRatio = compactionTaskSlotRatio;
+ this.maxCompactionTaskSlots = maxCompactionTaskSlots;
+ this.useAutoScaleSlots = useAutoScaleSlots;
+ this.compactionEngine = compactionEngine;
+ }
+
+ @Nullable
+ @JsonProperty
+ public Double getCompactionTaskSlotRatio()
+ {
+ return compactionTaskSlotRatio;
+ }
+
+ @Nullable
+ @JsonProperty
+ public Integer getMaxCompactionTaskSlots()
+ {
+ return maxCompactionTaskSlots;
+ }
+
+ @Nullable
+ @JsonProperty
+ public Boolean getUseAutoScaleSlots()
+ {
+ return useAutoScaleSlots;
+ }
+
+ @Nullable
+ @JsonProperty
+ public CompactionEngine getCompactionEngine()
+ {
+ return compactionEngine;
+ }
+
+}
diff --git
a/server/src/main/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResource.java
b/server/src/main/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResource.java
index 0bba5cf63fa..e8baf825090 100644
---
a/server/src/main/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResource.java
+++
b/server/src/main/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResource.java
@@ -40,6 +40,7 @@ import
org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.server.coordinator.DataSourceCompactionConfigHistory;
import org.apache.druid.server.http.security.ConfigResourceFilter;
import org.apache.druid.server.security.AuthorizationUtils;
+import org.apache.druid.utils.CollectionUtils;
import org.joda.time.Interval;
import javax.servlet.http.HttpServletRequest;
@@ -69,7 +70,7 @@ public class CoordinatorCompactionConfigsResource
{
private static final Logger LOG = new
Logger(CoordinatorCompactionConfigsResource.class);
private static final long UPDATE_RETRY_DELAY = 1000;
- static final int UPDATE_NUM_RETRY = 5;
+ static final int MAX_UPDATE_RETRIES = 5;
private final CoordinatorConfigManager configManager;
private final AuditManager auditManager;
@@ -86,11 +87,46 @@ public class CoordinatorCompactionConfigsResource
@GET
@Produces(MediaType.APPLICATION_JSON)
- public Response getCompactionConfig()
+ public Response getClusterCompactionConfig()
{
return Response.ok(configManager.getCurrentCompactionConfig()).build();
}
+ @POST
+ @Path("/global")
+ @Consumes(MediaType.APPLICATION_JSON)
+ public Response updateClusterCompactionConfig(
+ CompactionConfigUpdateRequest updatePayload,
+ @Context HttpServletRequest req
+ )
+ {
+ UnaryOperator<CoordinatorCompactionConfig> operator = current -> {
+ final CoordinatorCompactionConfig newConfig =
CoordinatorCompactionConfig.from(current, updatePayload);
+
+ final List<DataSourceCompactionConfig> datasourceConfigs =
newConfig.getCompactionConfigs();
+ if (CollectionUtils.isNullOrEmpty(datasourceConfigs)
+ || current.getEngine() == newConfig.getEngine()) {
+ return newConfig;
+ }
+
+ // Validate all the datasource configs against the new engine
+ for (DataSourceCompactionConfig datasourceConfig : datasourceConfigs) {
+ CompactionConfigValidationResult validationResult =
+
ClientCompactionRunnerInfo.validateCompactionConfig(datasourceConfig,
newConfig.getEngine());
+ if (!validationResult.isValid()) {
+ throw InvalidInput.exception(
+ "Cannot update engine to [%s] as it does not support"
+ + " compaction config of DataSource[%s]. Reason[%s].",
+ newConfig.getEngine(), datasourceConfig.getDataSource(),
validationResult.getReason()
+ );
+ }
+ }
+
+ return newConfig;
+ };
+ return updateConfigHelper(operator,
AuthorizationUtils.buildAuditInfo(req));
+ }
+
@POST
@Path("/taskslots")
@Consumes(MediaType.APPLICATION_JSON)
@@ -101,19 +137,20 @@ public class CoordinatorCompactionConfigsResource
@Context HttpServletRequest req
)
{
- UnaryOperator<CoordinatorCompactionConfig> operator =
- current -> CoordinatorCompactionConfig.from(
- current,
+ return updateClusterCompactionConfig(
+ new CompactionConfigUpdateRequest(
compactionTaskSlotRatio,
maxCompactionTaskSlots,
- useAutoScaleSlots
- );
- return updateConfigHelper(operator,
AuthorizationUtils.buildAuditInfo(req));
+ useAutoScaleSlots,
+ null
+ ),
+ req
+ );
}
@POST
@Consumes(MediaType.APPLICATION_JSON)
- public Response addOrUpdateCompactionConfig(
+ public Response addOrUpdateDatasourceCompactionConfig(
final DataSourceCompactionConfig newConfig,
@Context HttpServletRequest req
)
@@ -144,7 +181,7 @@ public class CoordinatorCompactionConfigsResource
@GET
@Path("/{dataSource}")
@Produces(MediaType.APPLICATION_JSON)
- public Response getCompactionConfig(@PathParam("dataSource") String
dataSource)
+ public Response getDatasourceCompactionConfig(@PathParam("dataSource")
String dataSource)
{
final CoordinatorCompactionConfig current =
configManager.getCurrentCompactionConfig();
final Map<String, DataSourceCompactionConfig> configs = current
@@ -233,7 +270,7 @@ public class CoordinatorCompactionConfigsResource
int attemps = 0;
SetResult setResult = null;
try {
- while (attemps < UPDATE_NUM_RETRY) {
+ while (attemps < MAX_UPDATE_RETRIES) {
setResult = configManager.getAndUpdateCompactionConfig(configOperator,
auditInfo);
if (setResult.isOk() || !setResult.isRetryable()) {
break;
diff --git
a/server/src/test/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfoTest.java
b/server/src/test/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfoTest.java
index f6d4a2b6e58..01daa1da4af 100644
---
a/server/src/test/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfoTest.java
+++
b/server/src/test/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfoTest.java
@@ -64,8 +64,7 @@ public class ClientCompactionRunnerInfoTest
);
Assert.assertFalse(validationResult.isValid());
Assert.assertEquals(
- "Invalid partitionsSpec type[HashedPartitionsSpec] for MSQ engine."
- + " Type must be either 'dynamic' or 'range'.",
+ "MSQ: Invalid partitioning type[HashedPartitionsSpec]. Must be either
'dynamic' or 'range'",
validationResult.getReason()
);
}
@@ -85,7 +84,7 @@ public class ClientCompactionRunnerInfoTest
);
Assert.assertFalse(validationResult.isValid());
Assert.assertEquals(
- "maxTotalRows[100] in DynamicPartitionsSpec not supported for MSQ
engine.",
+ "MSQ: 'maxTotalRows' not supported with 'dynamic' partitioning",
validationResult.getReason()
);
}
@@ -144,7 +143,7 @@ public class ClientCompactionRunnerInfoTest
);
Assert.assertFalse(validationResult.isValid());
Assert.assertEquals(
- "rollup in granularitySpec must be set to True if metricsSpec is
specifed for MSQ engine.",
+ "MSQ: 'granularitySpec.rollup' must be true if 'metricsSpec' is
specified",
validationResult.getReason()
);
}
@@ -167,7 +166,7 @@ public class ClientCompactionRunnerInfoTest
);
Assert.assertFalse(validationResult.isValid());
Assert.assertEquals(
- "Different name[sum_added] and fieldName(s)[[added]] for aggregator
unsupported for MSQ engine.",
+ "MSQ: Different name[sum_added] and fieldName(s)[[added]] for
aggregator",
validationResult.getReason()
);
}
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/CompactionConfigValidationResult.java
b/server/src/test/java/org/apache/druid/server/coordinator/CoordinatorCompactionConfigTest.java
similarity index 57%
copy from
server/src/main/java/org/apache/druid/server/coordinator/CompactionConfigValidationResult.java
copy to
server/src/test/java/org/apache/druid/server/coordinator/CoordinatorCompactionConfigTest.java
index 88eaa3e923a..6f39f552791 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/CompactionConfigValidationResult.java
+++
b/server/src/test/java/org/apache/druid/server/coordinator/CoordinatorCompactionConfigTest.java
@@ -19,26 +19,22 @@
package org.apache.druid.server.coordinator;
-import org.apache.druid.java.util.common.StringUtils;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.junit.Assert;
+import org.junit.Test;
-public class CompactionConfigValidationResult
+public class CoordinatorCompactionConfigTest
{
- private final boolean valid;
- private final String reason;
+ private static final ObjectMapper MAPPER = new DefaultObjectMapper();
- public CompactionConfigValidationResult(boolean valid, String format,
Object... args)
+ @Test
+ public void testSerdeDefaultConfig() throws Exception
{
- this.valid = valid;
- this.reason = format == null ? null : StringUtils.format(format, args);
- }
+ final CoordinatorCompactionConfig defaultConfig =
CoordinatorCompactionConfig.empty();
+ final String json = MAPPER.writeValueAsString(defaultConfig);
- public boolean isValid()
- {
- return valid;
- }
-
- public String getReason()
- {
- return reason;
+ CoordinatorCompactionConfig deserialized = MAPPER.readValue(json,
CoordinatorCompactionConfig.class);
+ Assert.assertEquals(defaultConfig, deserialized);
}
}
diff --git
a/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java
b/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java
index a9334f077a4..67f276f7651 100644
---
a/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java
+++
b/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java
@@ -42,9 +42,7 @@ import org.apache.druid.testing.InitializedNullHandlingTest;
import org.joda.time.Duration;
import org.joda.time.Period;
import org.junit.Assert;
-import org.junit.Rule;
import org.junit.Test;
-import org.junit.rules.ExpectedException;
import java.io.IOException;
@@ -52,27 +50,15 @@ public class DataSourceCompactionConfigTest extends
InitializedNullHandlingTest
{
private static final ObjectMapper OBJECT_MAPPER = new DefaultObjectMapper();
- @Rule
- public final ExpectedException expectedException = ExpectedException.none();
-
@Test
public void testSerdeBasic() throws IOException
{
- final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
- "dataSource",
- null,
- null,
- null,
- new Period(3600),
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- ImmutableMap.of("key", "val")
- );
+ final DataSourceCompactionConfig config = DataSourceCompactionConfig
+ .builder()
+ .forDataSource("dataSource")
+ .withSkipOffsetFromLatest(new Period(3600))
+ .withTaskContext(ImmutableMap.of("key", "val"))
+ .build();
final String json = OBJECT_MAPPER.writeValueAsString(config);
final DataSourceCompactionConfig fromJson = OBJECT_MAPPER.readValue(json,
DataSourceCompactionConfig.class);
@@ -90,21 +76,15 @@ public class DataSourceCompactionConfigTest extends
InitializedNullHandlingTest
@Test
public void testSerdeWithMaxRowsPerSegment() throws IOException
{
- final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
- "dataSource",
- null,
- 500L,
- 30,
- new Period(3600),
- null,
- null,
- null,
- null,
- null,
- null,
- CompactionEngine.MSQ,
- ImmutableMap.of("key", "val")
- );
+ final DataSourceCompactionConfig config = DataSourceCompactionConfig
+ .builder()
+ .forDataSource("dataSource")
+ .withInputSegmentSizeBytes(500L)
+ .withMaxRowsPerSegment(30)
+ .withSkipOffsetFromLatest(new Period(3600))
+ .withEngine(CompactionEngine.MSQ)
+ .withTaskContext(ImmutableMap.of("key", "val"))
+ .build();
final String json = OBJECT_MAPPER.writeValueAsString(config);
final DataSourceCompactionConfig fromJson = OBJECT_MAPPER.readValue(json,
DataSourceCompactionConfig.class);
@@ -121,41 +101,14 @@ public class DataSourceCompactionConfigTest extends
InitializedNullHandlingTest
@Test
public void testSerdeWithMaxTotalRows() throws IOException
{
- final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
- "dataSource",
- null,
- 500L,
- null,
- new Period(3600),
- new UserCompactionTaskQueryTuningConfig(
- null,
- null,
- null,
- 10000L,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null
- ),
- null,
- null,
- null,
- null,
- null,
- CompactionEngine.NATIVE,
- ImmutableMap.of("key", "val")
- );
+ final DataSourceCompactionConfig config = DataSourceCompactionConfig
+ .builder()
+ .forDataSource("dataSource")
+ .withInputSegmentSizeBytes(500L)
+ .withSkipOffsetFromLatest(new Period(3600))
+ .withEngine(CompactionEngine.NATIVE)
+ .withTaskContext(ImmutableMap.of("key", "val"))
+ .build();
final String json = OBJECT_MAPPER.writeValueAsString(config);
final DataSourceCompactionConfig fromJson = OBJECT_MAPPER.readValue(json,
DataSourceCompactionConfig.class);
@@ -172,42 +125,14 @@ public class DataSourceCompactionConfigTest extends
InitializedNullHandlingTest
@Test
public void testSerdeMaxTotalRowsWithMaxRowsPerSegment() throws IOException
{
- final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
- "dataSource",
- null,
- 500L,
- 10000,
- new Period(3600),
- new UserCompactionTaskQueryTuningConfig(
- null,
- null,
- null,
- 10000L,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null
- ),
- null,
- null,
- null,
- null,
- null,
- null,
- ImmutableMap.of("key", "val")
- );
-
+ final DataSourceCompactionConfig config = DataSourceCompactionConfig
+ .builder()
+ .forDataSource("dataSource")
+ .withInputSegmentSizeBytes(500L)
+ .withMaxRowsPerSegment(10000)
+ .withSkipOffsetFromLatest(new Period(3600))
+ .withTaskContext(ImmutableMap.of("key", "val"))
+ .build();
final String json = OBJECT_MAPPER.writeValueAsString(config);
final DataSourceCompactionConfig fromJson = OBJECT_MAPPER.readValue(json,
DataSourceCompactionConfig.class);
@@ -301,21 +226,14 @@ public class DataSourceCompactionConfigTest extends
InitializedNullHandlingTest
@Test
public void testSerdeGranularitySpec() throws IOException
{
- final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
- "dataSource",
- null,
- 500L,
- null,
- new Period(3600),
- null,
- new UserCompactionTaskGranularityConfig(Granularities.HOUR, null,
null),
- null,
- null,
- null,
- null,
- null,
- ImmutableMap.of("key", "val")
- );
+ final DataSourceCompactionConfig config = DataSourceCompactionConfig
+ .builder()
+ .forDataSource("dataSource")
+ .withInputSegmentSizeBytes(500L)
+ .withSkipOffsetFromLatest(new Period(3600))
+ .withGranularitySpec(new
UserCompactionTaskGranularityConfig(Granularities.HOUR, null, null))
+ .withTaskContext(ImmutableMap.of("key", "val"))
+ .build();
final String json = OBJECT_MAPPER.writeValueAsString(config);
final DataSourceCompactionConfig fromJson = OBJECT_MAPPER.readValue(json,
DataSourceCompactionConfig.class);
@@ -332,21 +250,14 @@ public class DataSourceCompactionConfigTest extends
InitializedNullHandlingTest
@Test
public void testSerdeGranularitySpecWithQueryGranularity() throws Exception
{
- final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
- "dataSource",
- null,
- 500L,
- null,
- new Period(3600),
- null,
- new UserCompactionTaskGranularityConfig(null, Granularities.YEAR,
null),
- null,
- null,
- null,
- null,
- null,
- ImmutableMap.of("key", "val")
- );
+ final DataSourceCompactionConfig config = DataSourceCompactionConfig
+ .builder()
+ .forDataSource("dataSource")
+ .withInputSegmentSizeBytes(500L)
+ .withSkipOffsetFromLatest(new Period(3600))
+ .withGranularitySpec(new UserCompactionTaskGranularityConfig(null,
Granularities.YEAR, null))
+ .withTaskContext(ImmutableMap.of("key", "val"))
+ .build();
final String json = OBJECT_MAPPER.writeValueAsString(config);
final DataSourceCompactionConfig fromJson = OBJECT_MAPPER.readValue(json,
DataSourceCompactionConfig.class);
@@ -366,21 +277,13 @@ public class DataSourceCompactionConfigTest extends
InitializedNullHandlingTest
@Test
public void testSerdeWithNullGranularitySpec() throws IOException
{
- final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
- "dataSource",
- null,
- 500L,
- null,
- new Period(3600),
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- ImmutableMap.of("key", "val")
- );
+ final DataSourceCompactionConfig config = DataSourceCompactionConfig
+ .builder()
+ .forDataSource("dataSource")
+ .withInputSegmentSizeBytes(500L)
+ .withSkipOffsetFromLatest(new Period(3600))
+ .withTaskContext(ImmutableMap.of("key", "val"))
+ .build();
final String json = OBJECT_MAPPER.writeValueAsString(config);
final DataSourceCompactionConfig fromJson = OBJECT_MAPPER.readValue(json,
DataSourceCompactionConfig.class);
@@ -397,21 +300,14 @@ public class DataSourceCompactionConfigTest extends
InitializedNullHandlingTest
@Test
public void testSerdeGranularitySpecWithNullValues() throws IOException
{
- final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
- "dataSource",
- null,
- 500L,
- null,
- new Period(3600),
- null,
- new UserCompactionTaskGranularityConfig(null, null, null),
- null,
- null,
- null,
- null,
- null,
- ImmutableMap.of("key", "val")
- );
+ final DataSourceCompactionConfig config = DataSourceCompactionConfig
+ .builder()
+ .forDataSource("dataSource")
+ .withInputSegmentSizeBytes(500L)
+ .withSkipOffsetFromLatest(new Period(3600))
+ .withGranularitySpec(new UserCompactionTaskGranularityConfig(null,
null, null))
+ .withTaskContext(ImmutableMap.of("key", "val"))
+ .build();
final String json = OBJECT_MAPPER.writeValueAsString(config);
final DataSourceCompactionConfig fromJson = OBJECT_MAPPER.readValue(json,
DataSourceCompactionConfig.class);
@@ -428,21 +324,14 @@ public class DataSourceCompactionConfigTest extends
InitializedNullHandlingTest
@Test
public void testSerdeGranularitySpecWithRollup() throws IOException
{
- final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
- "dataSource",
- null,
- 500L,
- null,
- new Period(3600),
- null,
- new UserCompactionTaskGranularityConfig(null, null, true),
- null,
- null,
- null,
- null,
- null,
- ImmutableMap.of("key", "val")
- );
+ final DataSourceCompactionConfig config = DataSourceCompactionConfig
+ .builder()
+ .forDataSource("dataSource")
+ .withInputSegmentSizeBytes(500L)
+ .withSkipOffsetFromLatest(new Period(3600))
+ .withGranularitySpec(new UserCompactionTaskGranularityConfig(null,
null, true))
+ .withTaskContext(ImmutableMap.of("key", "val"))
+ .build();
final String json = OBJECT_MAPPER.writeValueAsString(config);
final DataSourceCompactionConfig fromJson = OBJECT_MAPPER.readValue(json,
DataSourceCompactionConfig.class);
@@ -462,21 +351,15 @@ public class DataSourceCompactionConfigTest extends
InitializedNullHandlingTest
@Test
public void testSerdeIOConfigWithNonNullDropExisting() throws IOException
{
- final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
- "dataSource",
- null,
- 500L,
- null,
- new Period(3600),
- null,
- new UserCompactionTaskGranularityConfig(Granularities.HOUR, null,
null),
- null,
- null,
- null,
- new UserCompactionTaskIOConfig(true),
- null,
- ImmutableMap.of("key", "val")
- );
+ final DataSourceCompactionConfig config = DataSourceCompactionConfig
+ .builder()
+ .forDataSource("dataSource")
+ .withInputSegmentSizeBytes(500L)
+ .withSkipOffsetFromLatest(new Period(3600))
+ .withGranularitySpec(new
UserCompactionTaskGranularityConfig(Granularities.HOUR, null, null))
+ .withIoConfig(new UserCompactionTaskIOConfig(true))
+ .withTaskContext(ImmutableMap.of("key", "val"))
+ .build();
final String json = OBJECT_MAPPER.writeValueAsString(config);
final DataSourceCompactionConfig fromJson = OBJECT_MAPPER.readValue(json,
DataSourceCompactionConfig.class);
@@ -494,21 +377,15 @@ public class DataSourceCompactionConfigTest extends
InitializedNullHandlingTest
@Test
public void testSerdeIOConfigWithNullDropExisting() throws IOException
{
- final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
- "dataSource",
- null,
- 500L,
- null,
- new Period(3600),
- null,
- new UserCompactionTaskGranularityConfig(Granularities.HOUR, null,
null),
- null,
- null,
- null,
- new UserCompactionTaskIOConfig(null),
- null,
- ImmutableMap.of("key", "val")
- );
+ final DataSourceCompactionConfig config = DataSourceCompactionConfig
+ .builder()
+ .forDataSource("dataSource")
+ .withInputSegmentSizeBytes(500L)
+ .withSkipOffsetFromLatest(new Period(3600))
+ .withGranularitySpec(new
UserCompactionTaskGranularityConfig(Granularities.HOUR, null, null))
+ .withIoConfig(new UserCompactionTaskIOConfig(null))
+ .withTaskContext(ImmutableMap.of("key", "val"))
+ .build();
final String json = OBJECT_MAPPER.writeValueAsString(config);
final DataSourceCompactionConfig fromJson = OBJECT_MAPPER.readValue(json,
DataSourceCompactionConfig.class);
@@ -526,21 +403,18 @@ public class DataSourceCompactionConfigTest extends
InitializedNullHandlingTest
@Test
public void testSerdeDimensionsSpec() throws IOException
{
- final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
- "dataSource",
- null,
- 500L,
- null,
- new Period(3600),
- null,
- null,
- new
UserCompactionTaskDimensionsConfig(DimensionsSpec.getDefaultSchemas(ImmutableList.of("foo"))),
- null,
- null,
- null,
- null,
- ImmutableMap.of("key", "val")
- );
+ final DataSourceCompactionConfig config = DataSourceCompactionConfig
+ .builder()
+ .forDataSource("dataSource")
+ .withInputSegmentSizeBytes(500L)
+ .withSkipOffsetFromLatest(new Period(3600))
+ .withDimensionsSpec(
+ new UserCompactionTaskDimensionsConfig(
+ DimensionsSpec.getDefaultSchemas(ImmutableList.of("foo"))
+ )
+ )
+ .withTaskContext(ImmutableMap.of("key", "val"))
+ .build();
final String json = OBJECT_MAPPER.writeValueAsString(config);
final DataSourceCompactionConfig fromJson = OBJECT_MAPPER.readValue(json,
DataSourceCompactionConfig.class);
@@ -558,21 +432,14 @@ public class DataSourceCompactionConfigTest extends
InitializedNullHandlingTest
public void testSerdeTransformSpec() throws IOException
{
NullHandling.initializeForTests();
- final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
- "dataSource",
- null,
- 500L,
- null,
- new Period(3600),
- null,
- null,
- null,
- null,
- new UserCompactionTaskTransformConfig(new SelectorDimFilter("dim1",
"foo", null)),
- null,
- null,
- ImmutableMap.of("key", "val")
- );
+ final DataSourceCompactionConfig config = DataSourceCompactionConfig
+ .builder()
+ .forDataSource("dataSource")
+ .withInputSegmentSizeBytes(500L)
+ .withSkipOffsetFromLatest(new Period(3600))
+ .withTransformSpec(new UserCompactionTaskTransformConfig(new
SelectorDimFilter("dim1", "foo", null)))
+ .withTaskContext(ImmutableMap.of("key", "val"))
+ .build();
final String json = OBJECT_MAPPER.writeValueAsString(config);
final DataSourceCompactionConfig fromJson = OBJECT_MAPPER.readValue(json,
DataSourceCompactionConfig.class);
@@ -589,21 +456,14 @@ public class DataSourceCompactionConfigTest extends
InitializedNullHandlingTest
@Test
public void testSerdeMetricsSpec() throws IOException
{
- final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
- "dataSource",
- null,
- 500L,
- null,
- new Period(3600),
- null,
- null,
- null,
- new AggregatorFactory[] {new CountAggregatorFactory("cnt")},
- null,
- null,
- null,
- ImmutableMap.of("key", "val")
- );
+ final DataSourceCompactionConfig config = DataSourceCompactionConfig
+ .builder()
+ .forDataSource("dataSource")
+ .withInputSegmentSizeBytes(500L)
+ .withSkipOffsetFromLatest(new Period(3600))
+ .withMetricsSpec(new AggregatorFactory[]{new
CountAggregatorFactory("cnt")})
+ .withTaskContext(ImmutableMap.of("key", "val"))
+ .build();
final String json = OBJECT_MAPPER.writeValueAsString(config);
final DataSourceCompactionConfig fromJson = OBJECT_MAPPER.readValue(json,
DataSourceCompactionConfig.class);
diff --git
a/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResourceTest.java
b/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResourceTest.java
index 17db2285477..5be533c0e46 100644
---
a/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResourceTest.java
+++
b/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResourceTest.java
@@ -19,528 +19,529 @@
package org.apache.druid.server.http;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Suppliers;
+import org.apache.druid.audit.AuditEntry;
+import org.apache.druid.audit.AuditInfo;
import org.apache.druid.audit.AuditManager;
import org.apache.druid.client.indexing.ClientMSQContext;
import org.apache.druid.common.config.ConfigManager;
import org.apache.druid.common.config.JacksonConfigManager;
-import org.apache.druid.error.DruidException;
+import org.apache.druid.common.config.TestConfigManagerConfig;
import org.apache.druid.error.ErrorResponse;
import org.apache.druid.indexer.CompactionEngine;
-import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.granularity.Granularities;
-import org.apache.druid.metadata.MetadataStorageConnector;
+import org.apache.druid.metadata.MetadataCASUpdate;
import org.apache.druid.metadata.MetadataStorageTablesConfig;
+import org.apache.druid.metadata.TestMetadataStorageConnector;
+import org.apache.druid.metadata.TestMetadataStorageTablesConfig;
import org.apache.druid.server.coordinator.CoordinatorCompactionConfig;
import org.apache.druid.server.coordinator.CoordinatorConfigManager;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
+import
org.apache.druid.server.coordinator.DataSourceCompactionConfigAuditEntry;
import org.apache.druid.server.coordinator.UserCompactionTaskGranularityConfig;
+import
org.apache.druid.server.coordinator.config.DataSourceCompactionConfigBuilder;
+import org.joda.time.Interval;
import org.joda.time.Period;
+import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
-import org.mockito.ArgumentCaptor;
-import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
+import org.skife.jdbi.v2.Handle;
+import javax.annotation.Nullable;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.core.Response;
-import java.util.Collection;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.UnaryOperator;
@RunWith(MockitoJUnitRunner.class)
public class CoordinatorCompactionConfigsResourceTest
{
- private static final DataSourceCompactionConfig OLD_CONFIG = new
DataSourceCompactionConfig(
- "oldDataSource",
- null,
- 500L,
- null,
- new Period(3600),
- null,
- new UserCompactionTaskGranularityConfig(Granularities.HOUR, null, null),
- null,
- null,
- null,
- null,
- null,
- ImmutableMap.of("key", "val")
- );
- private static final DataSourceCompactionConfig NEW_CONFIG = new
DataSourceCompactionConfig(
- "newDataSource",
- null,
- 500L,
- null,
- new Period(1800),
- null,
- new UserCompactionTaskGranularityConfig(Granularities.DAY, null, null),
- null,
- null,
- null,
- null,
- null,
- ImmutableMap.of("key", "val")
- );
- private static final byte[] OLD_CONFIG_IN_BYTES = {1, 2, 3};
-
- private static final CoordinatorCompactionConfig ORIGINAL_CONFIG
- = CoordinatorCompactionConfig.from(ImmutableList.of(OLD_CONFIG));
-
- private static final String DATASOURCE_NOT_EXISTS = "notExists";
-
- @Mock
- private JacksonConfigManager mockJacksonConfigManager;
+ private static final double DELTA = 1e-9;
+ private static final ObjectMapper OBJECT_MAPPER = new DefaultObjectMapper();
@Mock
private HttpServletRequest mockHttpServletRequest;
- @Mock
- private MetadataStorageConnector mockConnector;
-
- @Mock
- private MetadataStorageTablesConfig mockConnectorConfig;
-
- @Mock
- private AuditManager mockAuditManager;
-
- private CoordinatorCompactionConfigsResource
coordinatorCompactionConfigsResource;
+ private TestCoordinatorConfigManager configManager;
+ private CoordinatorCompactionConfigsResource resource;
@Before
public void setup()
{
- Mockito.when(mockConnector.lookup(
- ArgumentMatchers.anyString(),
- ArgumentMatchers.eq("name"),
- ArgumentMatchers.eq("payload"),
-
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY)
- )
- ).thenReturn(OLD_CONFIG_IN_BYTES);
- Mockito.when(mockJacksonConfigManager.convertByteToConfig(
- ArgumentMatchers.eq(OLD_CONFIG_IN_BYTES),
- ArgumentMatchers.eq(CoordinatorCompactionConfig.class),
- ArgumentMatchers.eq(CoordinatorCompactionConfig.empty())
- )
- ).thenReturn(ORIGINAL_CONFIG);
-
Mockito.when(mockConnectorConfig.getConfigTable()).thenReturn("druid_config");
- Mockito.when(mockAuditManager.fetchAuditHistory(
-
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY),
-
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY),
- ArgumentMatchers.any()
- )
- ).thenReturn(ImmutableList.of());
- coordinatorCompactionConfigsResource = new
CoordinatorCompactionConfigsResource(
- new CoordinatorConfigManager(mockJacksonConfigManager, mockConnector,
mockConnectorConfig),
- mockAuditManager
- );
Mockito.when(mockHttpServletRequest.getRemoteAddr()).thenReturn("123");
+ final AuditManager auditManager = new TestAuditManager();
+ configManager = TestCoordinatorConfigManager.create(auditManager);
+ resource = new CoordinatorCompactionConfigsResource(configManager,
auditManager);
+ configManager.delegate.start();
+ }
+
+ @After
+ public void tearDown()
+ {
+ configManager.delegate.stop();
}
@Test
- public void testSetCompactionTaskLimitWithExistingConfig()
+ public void testGetDefaultClusterConfig()
{
- final ArgumentCaptor<byte[]> oldConfigCaptor =
ArgumentCaptor.forClass(byte[].class);
- final ArgumentCaptor<CoordinatorCompactionConfig> newConfigCaptor =
ArgumentCaptor.forClass(
- CoordinatorCompactionConfig.class);
- Mockito.when(mockJacksonConfigManager.set(
-
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY),
- oldConfigCaptor.capture(),
- newConfigCaptor.capture(),
- ArgumentMatchers.any()
- )
- ).thenReturn(ConfigManager.SetResult.ok());
-
- double compactionTaskSlotRatio = 0.5;
- int maxCompactionTaskSlots = 9;
- Response result =
coordinatorCompactionConfigsResource.setCompactionTaskLimit(
- compactionTaskSlotRatio,
- maxCompactionTaskSlots,
- true,
- mockHttpServletRequest
- );
- Assert.assertEquals(Response.Status.OK.getStatusCode(),
result.getStatus());
- Assert.assertNotNull(oldConfigCaptor.getValue());
- Assert.assertEquals(oldConfigCaptor.getValue(), OLD_CONFIG_IN_BYTES);
- Assert.assertNotNull(newConfigCaptor.getValue());
-
Assert.assertEquals(newConfigCaptor.getValue().getMaxCompactionTaskSlots(),
maxCompactionTaskSlots);
- Assert.assertTrue(newConfigCaptor.getValue().isUseAutoScaleSlots());
- Assert.assertEquals(compactionTaskSlotRatio,
newConfigCaptor.getValue().getCompactionTaskSlotRatio(), 0);
+ Response response = resource.getClusterCompactionConfig();
+ final CoordinatorCompactionConfig defaultConfig
+ = verifyAndGetPayload(response, CoordinatorCompactionConfig.class);
+
+ Assert.assertEquals(0.1, defaultConfig.getCompactionTaskSlotRatio(),
DELTA);
+ Assert.assertEquals(Integer.MAX_VALUE,
defaultConfig.getMaxCompactionTaskSlots());
+ Assert.assertFalse(defaultConfig.isUseAutoScaleSlots());
+ Assert.assertTrue(defaultConfig.getCompactionConfigs().isEmpty());
+ Assert.assertEquals(CompactionEngine.NATIVE, defaultConfig.getEngine());
}
@Test
- public void testAddOrUpdateCompactionConfigWithExistingConfig()
+ public void testUpdateGlobalConfig()
{
- final ArgumentCaptor<byte[]> oldConfigCaptor =
ArgumentCaptor.forClass(byte[].class);
- final ArgumentCaptor<CoordinatorCompactionConfig> newConfigCaptor =
ArgumentCaptor.forClass(
- CoordinatorCompactionConfig.class);
- Mockito.when(mockJacksonConfigManager.set(
-
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY),
- oldConfigCaptor.capture(),
- newConfigCaptor.capture(),
- ArgumentMatchers.any()
- )
- ).thenReturn(ConfigManager.SetResult.ok());
-
- final DataSourceCompactionConfig newConfig = new
DataSourceCompactionConfig(
- "dataSource",
- null,
- 500L,
- null,
- new Period(3600),
- null,
- new UserCompactionTaskGranularityConfig(Granularities.HOUR, null,
true),
- null,
- null,
- null,
- null,
- CompactionEngine.NATIVE,
- ImmutableMap.of("key", "val")
- );
- Response result =
coordinatorCompactionConfigsResource.addOrUpdateCompactionConfig(
- newConfig,
+ Response response = resource.updateClusterCompactionConfig(
+ new CompactionConfigUpdateRequest(0.5, 10, true, CompactionEngine.MSQ),
mockHttpServletRequest
);
- Assert.assertEquals(Response.Status.OK.getStatusCode(),
result.getStatus());
- Assert.assertNotNull(oldConfigCaptor.getValue());
- Assert.assertEquals(oldConfigCaptor.getValue(), OLD_CONFIG_IN_BYTES);
- Assert.assertNotNull(newConfigCaptor.getValue());
- Assert.assertEquals(2,
newConfigCaptor.getValue().getCompactionConfigs().size());
- Assert.assertEquals(OLD_CONFIG,
newConfigCaptor.getValue().getCompactionConfigs().get(0));
- Assert.assertEquals(newConfig,
newConfigCaptor.getValue().getCompactionConfigs().get(1));
- Assert.assertEquals(newConfig.getEngine(),
newConfigCaptor.getValue().getEngine());
+ verifyStatus(Response.Status.OK, response);
+
+ final CoordinatorCompactionConfig updatedConfig = verifyAndGetPayload(
+ resource.getClusterCompactionConfig(),
+ CoordinatorCompactionConfig.class
+ );
+
+ Assert.assertNotNull(updatedConfig);
+ Assert.assertEquals(0.5, updatedConfig.getCompactionTaskSlotRatio(),
DELTA);
+ Assert.assertEquals(10, updatedConfig.getMaxCompactionTaskSlots());
+ Assert.assertTrue(updatedConfig.isUseAutoScaleSlots());
+ Assert.assertEquals(CompactionEngine.MSQ, updatedConfig.getEngine());
}
@Test
- public void testDeleteCompactionConfigWithExistingConfig()
+ public void testSetCompactionTaskLimit()
{
- final ArgumentCaptor<byte[]> oldConfigCaptor =
ArgumentCaptor.forClass(byte[].class);
- final ArgumentCaptor<CoordinatorCompactionConfig> newConfigCaptor =
ArgumentCaptor.forClass(
- CoordinatorCompactionConfig.class);
- Mockito.when(mockJacksonConfigManager.set(
-
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY),
- oldConfigCaptor.capture(),
- newConfigCaptor.capture(),
- ArgumentMatchers.any()
- )
- ).thenReturn(ConfigManager.SetResult.ok());
- final String datasourceName = "dataSource";
- final DataSourceCompactionConfig toDelete = new DataSourceCompactionConfig(
- datasourceName,
- null,
- 500L,
- null,
- new Period(3600),
- null,
- new UserCompactionTaskGranularityConfig(Granularities.HOUR, null,
null),
- null,
- null,
- null,
- null,
- null,
- ImmutableMap.of("key", "val")
- );
- final CoordinatorCompactionConfig originalConfig =
CoordinatorCompactionConfig.from(ImmutableList.of(toDelete));
- Mockito.when(mockJacksonConfigManager.convertByteToConfig(
- ArgumentMatchers.eq(OLD_CONFIG_IN_BYTES),
- ArgumentMatchers.eq(CoordinatorCompactionConfig.class),
- ArgumentMatchers.eq(CoordinatorCompactionConfig.empty())
- )
- ).thenReturn(originalConfig);
-
- Response result =
coordinatorCompactionConfigsResource.deleteCompactionConfig(
- datasourceName,
- mockHttpServletRequest
- );
- Assert.assertEquals(Response.Status.OK.getStatusCode(),
result.getStatus());
- Assert.assertNotNull(oldConfigCaptor.getValue());
- Assert.assertEquals(oldConfigCaptor.getValue(), OLD_CONFIG_IN_BYTES);
- Assert.assertNotNull(newConfigCaptor.getValue());
- Assert.assertEquals(0,
newConfigCaptor.getValue().getCompactionConfigs().size());
+ final CoordinatorCompactionConfig defaultConfig
+ = verifyAndGetPayload(resource.getClusterCompactionConfig(),
CoordinatorCompactionConfig.class);
+
+ Response response = resource.setCompactionTaskLimit(0.5, 9, true,
mockHttpServletRequest);
+ verifyStatus(Response.Status.OK, response);
+
+ final CoordinatorCompactionConfig updatedConfig
+ = verifyAndGetPayload(resource.getClusterCompactionConfig(),
CoordinatorCompactionConfig.class);
+
+ // Verify that the task slot fields have been updated
+ Assert.assertEquals(0.5, updatedConfig.getCompactionTaskSlotRatio(),
DELTA);
+ Assert.assertEquals(9, updatedConfig.getMaxCompactionTaskSlots());
+ Assert.assertTrue(updatedConfig.isUseAutoScaleSlots());
+
+ // Verify that the other fields are unchanged
+ Assert.assertEquals(defaultConfig.getCompactionConfigs(),
updatedConfig.getCompactionConfigs());
+ Assert.assertEquals(defaultConfig.getEngine(), updatedConfig.getEngine());
+ }
+
+ @Test
+ public void testGetUnknownDatasourceConfigThrowsNotFound()
+ {
+ Response response = resource.getDatasourceCompactionConfig(DS.WIKI);
+ verifyStatus(Response.Status.NOT_FOUND, response);
+ }
+
+ @Test
+ public void testAddDatasourceConfig()
+ {
+ final DataSourceCompactionConfig newDatasourceConfig
+ = DataSourceCompactionConfig.builder().forDataSource(DS.WIKI).build();
+ Response response =
resource.addOrUpdateDatasourceCompactionConfig(newDatasourceConfig,
mockHttpServletRequest);
+ verifyStatus(Response.Status.OK, response);
+
+ final DataSourceCompactionConfig fetchedDatasourceConfig
+ = verifyAndGetPayload(resource.getDatasourceCompactionConfig(DS.WIKI),
DataSourceCompactionConfig.class);
+ Assert.assertEquals(newDatasourceConfig, fetchedDatasourceConfig);
+
+ final CoordinatorCompactionConfig fullCompactionConfig
+ = verifyAndGetPayload(resource.getClusterCompactionConfig(),
CoordinatorCompactionConfig.class);
+ Assert.assertEquals(1, fullCompactionConfig.getCompactionConfigs().size());
+ Assert.assertEquals(newDatasourceConfig,
fullCompactionConfig.getCompactionConfigs().get(0));
}
@Test
- public void testUpdateShouldRetryIfRetryableException()
+ public void testUpdateDatasourceConfig()
{
- Mockito.when(
- mockJacksonConfigManager.set(
- ArgumentMatchers.anyString(),
- ArgumentMatchers.any(),
- ArgumentMatchers.any(),
- ArgumentMatchers.any()
+ final DataSourceCompactionConfig originalDatasourceConfig =
DataSourceCompactionConfig
+ .builder()
+ .forDataSource(DS.WIKI)
+ .withInputSegmentSizeBytes(500L)
+ .withSkipOffsetFromLatest(Period.hours(1))
+ .withGranularitySpec(
+ new UserCompactionTaskGranularityConfig(Granularities.HOUR, null,
true)
)
- ).thenReturn(ConfigManager.SetResult.retryableFailure(new
ISE("retryable")));
+ .withEngine(CompactionEngine.NATIVE)
+ .build();
- coordinatorCompactionConfigsResource.addOrUpdateCompactionConfig(
- NEW_CONFIG,
+ Response response = resource.addOrUpdateDatasourceCompactionConfig(
+ originalDatasourceConfig,
mockHttpServletRequest
);
+ verifyStatus(Response.Status.OK, response);
+
+ final DataSourceCompactionConfig updatedDatasourceConfig =
DataSourceCompactionConfig
+ .builder()
+ .forDataSource(DS.WIKI)
+ .withInputSegmentSizeBytes(1000L)
+ .withSkipOffsetFromLatest(Period.hours(3))
+ .withGranularitySpec(
+ new UserCompactionTaskGranularityConfig(Granularities.DAY, null,
true)
+ )
+ .withEngine(CompactionEngine.MSQ)
+ .build();
- // Verify that the update is retried upto the max number of retries
- Mockito.verify(
- mockJacksonConfigManager,
- Mockito.times(CoordinatorCompactionConfigsResource.UPDATE_NUM_RETRY)
- ).set(
- ArgumentMatchers.anyString(),
- ArgumentMatchers.any(),
- ArgumentMatchers.any(),
- ArgumentMatchers.any()
- );
+ response =
resource.addOrUpdateDatasourceCompactionConfig(updatedDatasourceConfig,
mockHttpServletRequest);
+ verifyStatus(Response.Status.OK, response);
+
+ final DataSourceCompactionConfig latestDatasourceConfig
+ = verifyAndGetPayload(resource.getDatasourceCompactionConfig(DS.WIKI),
DataSourceCompactionConfig.class);
+ Assert.assertEquals(updatedDatasourceConfig, latestDatasourceConfig);
+
+ final CoordinatorCompactionConfig fullCompactionConfig
+ = verifyAndGetPayload(resource.getClusterCompactionConfig(),
CoordinatorCompactionConfig.class);
+ Assert.assertEquals(1, fullCompactionConfig.getCompactionConfigs().size());
+ Assert.assertEquals(updatedDatasourceConfig,
fullCompactionConfig.getCompactionConfigs().get(0));
}
@Test
- public void testUpdateShouldNotRetryIfNotRetryableException()
+ public void testDeleteDatasourceConfig()
{
- Mockito.when(
- mockJacksonConfigManager.set(
- ArgumentMatchers.anyString(),
- ArgumentMatchers.any(),
- ArgumentMatchers.any(),
- ArgumentMatchers.any()
- )
- ).thenReturn(ConfigManager.SetResult.failure(new ISE("retryable")));
+ final DataSourceCompactionConfig datasourceConfig
+ = DataSourceCompactionConfig.builder().forDataSource(DS.WIKI).build();
+ Response response =
resource.addOrUpdateDatasourceCompactionConfig(datasourceConfig,
mockHttpServletRequest);
+ verifyStatus(Response.Status.OK, response);
- coordinatorCompactionConfigsResource.addOrUpdateCompactionConfig(
- NEW_CONFIG,
+ response = resource.deleteCompactionConfig(DS.WIKI,
mockHttpServletRequest);
+ verifyStatus(Response.Status.OK, response);
+
+ response = resource.getDatasourceCompactionConfig(DS.WIKI);
+ verifyStatus(Response.Status.NOT_FOUND, response);
+ }
+
+ @Test
+ public void testDeleteUnknownDatasourceConfigThrowsNotFound()
+ {
+ Response response = resource.deleteCompactionConfig(DS.WIKI,
mockHttpServletRequest);
+ verifyStatus(Response.Status.NOT_FOUND, response);
+ }
+
+ @Test
+ public void testUpdateIsRetriedIfFailureIsRetryable()
+ {
+ configManager.configUpdateResult
+ = ConfigManager.SetResult.retryableFailure(new Exception("retryable"));
+ resource.addOrUpdateDatasourceCompactionConfig(
+ DataSourceCompactionConfig.builder().forDataSource(DS.WIKI).build(),
mockHttpServletRequest
);
- // Verify that the update is tried only once
- Mockito.verify(mockJacksonConfigManager, Mockito.times(1)).set(
- ArgumentMatchers.anyString(),
- ArgumentMatchers.any(),
- ArgumentMatchers.any(),
- ArgumentMatchers.any()
+ Assert.assertEquals(
+ CoordinatorCompactionConfigsResource.MAX_UPDATE_RETRIES,
+ configManager.numUpdateAttempts
);
}
@Test
- public void testSetCompactionTaskLimitWithoutExistingConfig()
+ public void testUpdateIsNotRetriedIfFailureIsNotRetryable()
{
- Mockito.when(mockConnector.lookup(
- ArgumentMatchers.anyString(),
- ArgumentMatchers.eq("name"),
- ArgumentMatchers.eq("payload"),
-
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY)
- )
- ).thenReturn(null);
- Mockito.when(mockJacksonConfigManager.convertByteToConfig(
- ArgumentMatchers.eq(null),
- ArgumentMatchers.eq(CoordinatorCompactionConfig.class),
- ArgumentMatchers.eq(CoordinatorCompactionConfig.empty())
- )
- ).thenReturn(CoordinatorCompactionConfig.empty());
- final ArgumentCaptor<byte[]> oldConfigCaptor =
ArgumentCaptor.forClass(byte[].class);
- final ArgumentCaptor<CoordinatorCompactionConfig> newConfigCaptor =
ArgumentCaptor.forClass(
- CoordinatorCompactionConfig.class);
- Mockito.when(mockJacksonConfigManager.set(
-
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY),
- oldConfigCaptor.capture(),
- newConfigCaptor.capture(),
- ArgumentMatchers.any()
- )
- ).thenReturn(ConfigManager.SetResult.ok());
-
- double compactionTaskSlotRatio = 0.5;
- int maxCompactionTaskSlots = 9;
- Response result =
coordinatorCompactionConfigsResource.setCompactionTaskLimit(
- compactionTaskSlotRatio,
- maxCompactionTaskSlots,
- true,
+ configManager.configUpdateResult
+ = ConfigManager.SetResult.failure(new Exception("not retryable"));
+ resource.addOrUpdateDatasourceCompactionConfig(
+ DataSourceCompactionConfig.builder().forDataSource(DS.WIKI).build(),
mockHttpServletRequest
);
- Assert.assertEquals(Response.Status.OK.getStatusCode(),
result.getStatus());
- Assert.assertNull(oldConfigCaptor.getValue());
- Assert.assertNotNull(newConfigCaptor.getValue());
-
Assert.assertEquals(newConfigCaptor.getValue().getMaxCompactionTaskSlots(),
maxCompactionTaskSlots);
- Assert.assertTrue(newConfigCaptor.getValue().isUseAutoScaleSlots());
- Assert.assertEquals(compactionTaskSlotRatio,
newConfigCaptor.getValue().getCompactionTaskSlotRatio(), 0);
+
+ Assert.assertEquals(1, configManager.numUpdateAttempts);
}
@Test
- public void testAddOrUpdateCompactionConfigWithoutExistingConfig()
+ public void testGetDatasourceConfigHistory()
{
- Mockito.when(mockConnector.lookup(
- ArgumentMatchers.anyString(),
- ArgumentMatchers.eq("name"),
- ArgumentMatchers.eq("payload"),
-
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY)
- )
- ).thenReturn(null);
- Mockito.when(mockJacksonConfigManager.convertByteToConfig(
- ArgumentMatchers.eq(null),
- ArgumentMatchers.eq(CoordinatorCompactionConfig.class),
- ArgumentMatchers.eq(CoordinatorCompactionConfig.empty())
- )
- ).thenReturn(CoordinatorCompactionConfig.empty());
- final ArgumentCaptor<byte[]> oldConfigCaptor =
ArgumentCaptor.forClass(byte[].class);
- final ArgumentCaptor<CoordinatorCompactionConfig> newConfigCaptor =
ArgumentCaptor.forClass(
- CoordinatorCompactionConfig.class);
- Mockito.when(mockJacksonConfigManager.set(
-
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY),
- oldConfigCaptor.capture(),
- newConfigCaptor.capture(),
- ArgumentMatchers.any()
- )
- ).thenReturn(ConfigManager.SetResult.ok());
-
- final DataSourceCompactionConfig newConfig = new
DataSourceCompactionConfig(
- "dataSource",
- null,
- 500L,
- null,
- new Period(3600),
- null,
- new UserCompactionTaskGranularityConfig(Granularities.HOUR, null,
null),
- null,
- null,
- null,
- null,
- CompactionEngine.MSQ,
- ImmutableMap.of("key", "val")
- );
- String author = "maytas";
- String comment = "hello";
- Response result =
coordinatorCompactionConfigsResource.addOrUpdateCompactionConfig(
- newConfig,
- mockHttpServletRequest
- );
- Assert.assertEquals(Response.Status.OK.getStatusCode(),
result.getStatus());
- Assert.assertNull(oldConfigCaptor.getValue());
- Assert.assertNotNull(newConfigCaptor.getValue());
- Assert.assertEquals(1,
newConfigCaptor.getValue().getCompactionConfigs().size());
- Assert.assertEquals(newConfig,
newConfigCaptor.getValue().getCompactionConfigs().get(0));
- Assert.assertEquals(newConfig.getEngine(),
newConfigCaptor.getValue().getCompactionConfigs().get(0).getEngine());
+ final DataSourceCompactionConfigBuilder builder
+ = DataSourceCompactionConfig.builder().forDataSource(DS.WIKI);
+
+ final DataSourceCompactionConfig configV1 = builder.build();
+ resource.addOrUpdateDatasourceCompactionConfig(configV1,
mockHttpServletRequest);
+
+ final DataSourceCompactionConfig configV2 =
builder.withEngine(CompactionEngine.NATIVE).build();
+ resource.addOrUpdateDatasourceCompactionConfig(configV2,
mockHttpServletRequest);
+
+ final DataSourceCompactionConfig configV3 = builder
+ .withEngine(CompactionEngine.MSQ)
+ .withSkipOffsetFromLatest(Period.hours(1))
+ .build();
+ resource.addOrUpdateDatasourceCompactionConfig(configV3,
mockHttpServletRequest);
+
+ Response response = resource.getCompactionConfigHistory(DS.WIKI, null,
null);
+ verifyStatus(Response.Status.OK, response);
+
+ final List<DataSourceCompactionConfigAuditEntry> history
+ = (List<DataSourceCompactionConfigAuditEntry>) response.getEntity();
+ Assert.assertEquals(3, history.size());
+ Assert.assertEquals(configV1, history.get(0).getCompactionConfig());
+ Assert.assertEquals(configV2, history.get(1).getCompactionConfig());
+ Assert.assertEquals(configV3, history.get(2).getCompactionConfig());
}
@Test
- public void
testAddOrUpdateCompactionConfigWithoutExistingConfigAndEngineAsNull()
+ public void testGetHistoryOfUnknownDatasourceReturnsEmpty()
{
- Mockito.when(mockConnector.lookup(
- ArgumentMatchers.anyString(),
- ArgumentMatchers.eq("name"),
- ArgumentMatchers.eq("payload"),
-
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY)
- )
- ).thenReturn(null);
- Mockito.when(mockJacksonConfigManager.convertByteToConfig(
- ArgumentMatchers.eq(null),
- ArgumentMatchers.eq(CoordinatorCompactionConfig.class),
- ArgumentMatchers.eq(CoordinatorCompactionConfig.empty())
- )
- ).thenReturn(CoordinatorCompactionConfig.empty());
- final ArgumentCaptor<byte[]> oldConfigCaptor =
ArgumentCaptor.forClass(byte[].class);
- final ArgumentCaptor<CoordinatorCompactionConfig> newConfigCaptor =
ArgumentCaptor.forClass(
- CoordinatorCompactionConfig.class);
- Mockito.when(mockJacksonConfigManager.set(
-
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY),
- oldConfigCaptor.capture(),
- newConfigCaptor.capture(),
- ArgumentMatchers.any()
- )
- ).thenReturn(ConfigManager.SetResult.ok());
-
- final DataSourceCompactionConfig newConfig = new
DataSourceCompactionConfig(
- "dataSource",
- null,
- 500L,
- null,
- new Period(3600),
- null,
- new UserCompactionTaskGranularityConfig(Granularities.HOUR, null,
null),
- null,
- null,
- null,
- null,
- null,
- ImmutableMap.of("key", "val")
- );
- coordinatorCompactionConfigsResource.addOrUpdateCompactionConfig(
- newConfig,
- mockHttpServletRequest
- );
- Assert.assertEquals(null,
newConfigCaptor.getValue().getCompactionConfigs().get(0).getEngine());
+ Response response = resource.getCompactionConfigHistory(DS.WIKI, null,
null);
+ verifyStatus(Response.Status.OK, response);
+ Assert.assertTrue(((List<?>) response.getEntity()).isEmpty());
}
@Test
- public void
testAddOrUpdateCompactionConfigWithInvalidMaxNumTasksForMSQEngine()
+ public void testAddInvalidDatasourceConfigThrowsBadRequest()
{
- Mockito.when(mockConnector.lookup(
- ArgumentMatchers.anyString(),
- ArgumentMatchers.eq("name"),
- ArgumentMatchers.eq("payload"),
-
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY)
- )
- ).thenReturn(null);
- Mockito.when(mockJacksonConfigManager.convertByteToConfig(
- ArgumentMatchers.eq(null),
- ArgumentMatchers.eq(CoordinatorCompactionConfig.class),
- ArgumentMatchers.eq(CoordinatorCompactionConfig.empty())
- )
- ).thenReturn(CoordinatorCompactionConfig.empty());
-
- int maxNumTasks = 1;
-
- final DataSourceCompactionConfig newConfig = new
DataSourceCompactionConfig(
- "dataSource",
- null,
- 500L,
- null,
- new Period(3600),
- null,
- new UserCompactionTaskGranularityConfig(Granularities.HOUR, null,
null),
- null,
- null,
- null,
- null,
- CompactionEngine.MSQ,
- ImmutableMap.of(ClientMSQContext.CTX_MAX_NUM_TASKS, maxNumTasks)
- );
- Response response =
coordinatorCompactionConfigsResource.addOrUpdateCompactionConfig(
- newConfig,
- mockHttpServletRequest
- );
-
Assert.assertEquals(DruidException.Category.INVALID_INPUT.getExpectedStatus(),
response.getStatus());
+ final DataSourceCompactionConfig datasourceConfig =
DataSourceCompactionConfig
+ .builder()
+ .forDataSource(DS.WIKI)
+
.withTaskContext(Collections.singletonMap(ClientMSQContext.CTX_MAX_NUM_TASKS,
1))
+ .withEngine(CompactionEngine.MSQ)
+ .build();
+
+ final Response response =
resource.addOrUpdateDatasourceCompactionConfig(datasourceConfig,
mockHttpServletRequest);
+ verifyStatus(Response.Status.BAD_REQUEST, response);
+ Assert.assertTrue(response.getEntity() instanceof ErrorResponse);
Assert.assertEquals(
- "Compaction config not supported. Reason[MSQ context maxNumTasks [1]
cannot be less than 2, "
- + "since at least 1 controller and 1 worker is necessary.].",
+ "Compaction config not supported. Reason[MSQ: Context maxNumTasks[1]"
+ + " must be at least 2 (1 controller + 1 worker)].",
((ErrorResponse)
response.getEntity()).getUnderlyingException().getMessage()
);
}
@Test
- public void
testDeleteCompactionConfigWithoutExistingConfigShouldFailAsDatasourceNotExist()
+ public void
testUpdateEngineToMSQWithInvalidDatasourceConfigThrowsBadRequest()
{
- Mockito.when(mockConnector.lookup(
- ArgumentMatchers.anyString(),
- ArgumentMatchers.eq("name"),
- ArgumentMatchers.eq("payload"),
-
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY)
- )
- ).thenReturn(null);
- Mockito.when(mockJacksonConfigManager.convertByteToConfig(
- ArgumentMatchers.eq(null),
- ArgumentMatchers.eq(CoordinatorCompactionConfig.class),
- ArgumentMatchers.eq(CoordinatorCompactionConfig.empty())
- )
- ).thenReturn(CoordinatorCompactionConfig.empty());
- Response result =
coordinatorCompactionConfigsResource.deleteCompactionConfig(
- DATASOURCE_NOT_EXISTS,
+ final DataSourceCompactionConfig datasourceConfig =
DataSourceCompactionConfig
+ .builder()
+ .forDataSource(DS.WIKI)
+
.withTaskContext(Collections.singletonMap(ClientMSQContext.CTX_MAX_NUM_TASKS,
1))
+ .build();
+ Response response =
resource.addOrUpdateDatasourceCompactionConfig(datasourceConfig,
mockHttpServletRequest);
+ verifyStatus(Response.Status.OK, response);
+
+ response = resource.updateClusterCompactionConfig(
+ new CompactionConfigUpdateRequest(null, null, null,
CompactionEngine.MSQ),
mockHttpServletRequest
);
- Assert.assertEquals(Response.Status.NOT_FOUND.getStatusCode(),
result.getStatus());
+ verifyStatus(Response.Status.BAD_REQUEST, response);
+ Assert.assertTrue(response.getEntity() instanceof ErrorResponse);
+ Assert.assertEquals(
+ "Cannot update engine to [msq] as it does not support compaction
config of DataSource[wiki]."
+ + " Reason[MSQ: Context maxNumTasks[1] must be at least 2 (1
controller + 1 worker)].",
+ ((ErrorResponse)
response.getEntity()).getUnderlyingException().getMessage()
+ );
}
- @Test
- public void
testGetCompactionConfigHistoryForUnknownDataSourceShouldReturnEmptyList()
+ @SuppressWarnings("unchecked")
+ private <T> T verifyAndGetPayload(Response response, Class<T> type)
{
- Response response =
coordinatorCompactionConfigsResource.getCompactionConfigHistory(
- DATASOURCE_NOT_EXISTS,
- null,
- null
- );
Assert.assertEquals(Response.Status.OK.getStatusCode(),
response.getStatus());
- Assert.assertTrue(((Collection) response.getEntity()).isEmpty());
+
+ Assert.assertTrue(type.isInstance(response.getEntity()));
+ return (T) response.getEntity();
+ }
+
+ private void verifyStatus(Response.Status expectedStatus, Response response)
+ {
+ Assert.assertEquals(expectedStatus.getStatusCode(), response.getStatus()); // compares the numeric status codes; fails the calling test on mismatch
+ }
+
+ /**
+ * Test implementation of AuditManager that keeps audit entries in memory.
+ * <p>
+ * Only {@link #doAudit(AuditEntry)} records anything: it appends a copy of the
+ * entry whose payload has been serialized to JSON with {@code OBJECT_MAPPER}.
+ * All {@code fetchAuditHistory} overloads ignore their arguments and return the
+ * complete backing list, so no filtering by key, type, interval or limit happens.
+ */
+ private static class TestAuditManager implements AuditManager
+ {
+ private final List<AuditEntry> audits = new ArrayList<>();
+
+ @Override
+ public void doAudit(AuditEntry event, Handle handle)
+ {
+ // do nothing: entries passed together with a Handle are not recorded
+ }
+
+ @Override
+ public void doAudit(AuditEntry event)
+ {
+ final String json;
+ try {
+ json = OBJECT_MAPPER.writeValueAsString(event.getPayload().raw()); // serialize the raw payload object to a JSON string
+ }
+ catch (JsonProcessingException e) {
+ throw new RuntimeException(e); // rethrown unchecked, keeping the original exception as the cause
+ }
+
+ final AuditEntry eventWithSerializedPayload
+ = AuditEntry.builder()
+ .key(event.getKey())
+ .type(event.getType())
+ .auditInfo(event.getAuditInfo())
+ .auditTime(event.getAuditTime())
+ .request(event.getRequest())
+ .serializedPayload(json)
+ .build();
+ audits.add(eventWithSerializedPayload); // entries are kept in the order in which they were audited
+ }
+
+ @Override
+ public List<AuditEntry> fetchAuditHistory(String key, String type,
Interval interval)
+ {
+ return audits; // key, type and interval are ignored; the backing list itself is returned
+ }
+
+ @Override
+ public List<AuditEntry> fetchAuditHistory(String type, int limit)
+ {
+ return audits; // type and limit are ignored; the backing list itself is returned
+ }
+
+ @Override
+ public List<AuditEntry> fetchAuditHistory(String type, Interval interval)
+ {
+ return audits; // type and interval are ignored; the backing list itself is returned
+ }
+
+ @Override
+ public List<AuditEntry> fetchAuditHistory(String key, String type, int
limit)
+ {
+ return audits; // key, type and limit are ignored; the backing list itself is returned
+ }
+
+ @Override
+ public int removeAuditLogsOlderThan(long timestamp)
+ {
+ return 0; // nothing is ever removed, so the reported count is always zero
+ }
+ }
+
+ /**
+ * Test implementation of CoordinatorConfigManager to track number of update attempts.
+ * <p>
+ * Every call to {@code getAndUpdateCompactionConfig} increments
+ * {@code numUpdateAttempts}. While {@code configUpdateResult} is null the call is
+ * forwarded to the super implementation; once a test assigns a result to that
+ * field, the assigned result is returned instead and the super implementation
+ * is not invoked.
+ */
+ private static class TestCoordinatorConfigManager extends
CoordinatorConfigManager
+ {
+ private final ConfigManager delegate;
+ private int numUpdateAttempts;
+ private ConfigManager.SetResult configUpdateResult;
+
+ static TestCoordinatorConfigManager create(AuditManager auditManager)
+ {
+ final MetadataStorageTablesConfig tablesConfig = new
TestMetadataStorageTablesConfig()
+ {
+ @Override
+ public String getConfigTable()
+ {
+ return "druid_config"; // the config table name is the only thing overridden here
+ }
+ };
+
+ final TestDBConnector dbConnector = new TestDBConnector();
+ final ConfigManager configManager = new ConfigManager(
+ dbConnector,
+ Suppliers.ofInstance(tablesConfig),
+ Suppliers.ofInstance(new TestConfigManagerConfig())
+ );
+
+ return new TestCoordinatorConfigManager(configManager, dbConnector,
tablesConfig, auditManager);
+ }
+
+ TestCoordinatorConfigManager(
+ ConfigManager configManager,
+ TestDBConnector dbConnector,
+ MetadataStorageTablesConfig tablesConfig,
+ AuditManager auditManager
+ )
+ {
+ super(
+ new JacksonConfigManager(configManager, OBJECT_MAPPER, auditManager),
+ dbConnector,
+ tablesConfig
+ );
+ this.delegate = configManager; // NOTE(review): never read inside this class; confirm the enclosing test needs it
+ }
+
+ @Override
+ public ConfigManager.SetResult getAndUpdateCompactionConfig(
+ UnaryOperator<CoordinatorCompactionConfig> operator,
+ AuditInfo auditInfo
+ )
+ {
+ ++numUpdateAttempts; // counted for every call, including those answered with the canned result
+ if (configUpdateResult == null) {
+ return super.getAndUpdateCompactionConfig(operator, auditInfo); // no canned result set: perform the real update
+ } else {
+ return configUpdateResult; // canned result set by a test: operator is never applied
+ }
+ }
+ }
+
+ /**
+ * Test implementation for in-memory insert, lookup and compareAndSwap operations.
+ * <p>
+ * Values live in a {@code HashMap} keyed by the list
+ * {@code [tableName, keyColumn, valueColumn, key]}.
+ * {@code compareAndSwap} applies its updates one by one and returns false at the
+ * first update whose stored value differs from the expected old value; updates
+ * applied earlier in the same call are not rolled back.
+ */
+ private static class TestDBConnector extends TestMetadataStorageConnector
+ {
+ private final Map<List<String>, byte[]> values = new HashMap<>();
+
+ @Override
+ public Void insertOrUpdate(String tableName, String keyColumn, String
valueColumn, String key, byte[] value)
+ {
+ values.put(
+ Arrays.asList(tableName, keyColumn, valueColumn, key),
+ value
+ );
+ return null; // any existing value for the same composite key has been overwritten
+ }
+
+ @Nullable
+ @Override
+ public byte[] lookup(String tableName, String keyColumn, String
valueColumn, String key)
+ {
+ return values.get(Arrays.asList(tableName, keyColumn, valueColumn, key)); // null when nothing is stored for the key
+ }
+
+ @Override
+ public boolean compareAndSwap(List<MetadataCASUpdate> updates)
+ {
+ for (MetadataCASUpdate update : updates) {
+ final List<String> key = Arrays.asList(
+ update.getTableName(),
+ update.getKeyColumn(),
+ update.getValueColumn(),
+ update.getKey()
+ );
+
+ final byte[] currentValue = values.get(key);
+ if (currentValue == null || Arrays.equals(currentValue,
update.getOldValue())) { // swap when nothing is stored yet or the stored bytes equal the expected old value
+ values.put(key, update.getNewValue());
+ } else {
+ return false; // stop at the first mismatch; earlier updates in this call stay applied
+ }
+ }
+
+ return true; // every update was applied (also the case for an empty list)
+ }
+ }
+
+ private static class DS // holder for the datasource name constants referenced by the tests
+ {
+ static final String WIKI = "wiki";
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]