kfaraz commented on code in PR #17033:
URL: https://github.com/apache/druid/pull/17033#discussion_r1770732466
##########
indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionSupervisor.java:
##########
@@ -55,6 +56,13 @@ public void start()
if (supervisorSpec.isSuspended()) {
log.info("Suspending compaction for dataSource[%s].", dataSource);
scheduler.stopCompaction(dataSource);
+ } else if (!supervisorSpec.getValidationResult().isValid()) {
+ log.error(
+ "Failed to start compaction supervisor for datasource[%s] due to
invalid compaction supervisor spec. "
Review Comment:
```suggestion
"Cannot start compaction supervisor for datasource[%s] since the
compaction supervisor spec is invalid. "
```
##########
indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionSupervisor.java:
##########
@@ -76,6 +84,11 @@ public SupervisorReport<AutoCompactionSnapshot> getStatus()
snapshot = AutoCompactionSnapshot.builder(dataSource)
.withStatus(AutoCompactionSnapshot.AutoCompactionScheduleStatus.NOT_ENABLED)
.build();
+ } else if (!supervisorSpec.getValidationResult().isValid()) {
+ throw InvalidInput.exception(
+ "Compaction supervisor spec is invalid. Reason[%s].",
+ supervisorSpec.getValidationResult().getReason()
+ );
Review Comment:
Let's not throw an exception here. We can either add new fields to
`AutoCompactionSnapshot` or add a new class that is returned by this method.
##########
indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionSupervisor.java:
##########
@@ -55,6 +56,13 @@ public void start()
if (supervisorSpec.isSuspended()) {
log.info("Suspending compaction for dataSource[%s].", dataSource);
scheduler.stopCompaction(dataSource);
+ } else if (!supervisorSpec.getValidationResult().isValid()) {
+ log.error(
Review Comment:
```suggestion
log.warn(
```
##########
server/src/main/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResource.java:
##########
@@ -161,12 +135,11 @@ public Response addOrUpdateDatasourceCompactionConfig(
)
{
UnaryOperator<DruidCompactionConfig> callable = current -> {
- CompactionConfigValidationResult validationResult =
- ClientCompactionRunnerInfo.validateCompactionConfig(newConfig,
current.getEngine());
- if (validationResult.isValid()) {
- return current.withDatasourceConfig(newConfig);
+ if (newConfig.getEngine() == CompactionEngine.MSQ) {
+ throw InvalidInput.exception(
+ "MSQ engine in compaction config only supported with
supervisor-based compaction on the Overlord.");
Review Comment:
```suggestion
"MSQ-based compaction is supported only with Compaction
Supervisors.");
```
##########
server/src/main/java/org/apache/druid/server/coordinator/CompactionSupervisorConfig.java:
##########
@@ -33,10 +34,12 @@
*/
public class CompactionSupervisorConfig
{
- private static final CompactionSupervisorConfig DEFAULT = new
CompactionSupervisorConfig(null);
+ private static final CompactionSupervisorConfig DEFAULT = new
CompactionSupervisorConfig(null, null);
@JsonProperty
private final boolean enabled;
+ @JsonProperty
+ private final CompactionEngine defaultEngine;
Review Comment:
Nit: Maybe we should call this property just `engine`.
##########
indexing-service/src/test/java/org/apache/druid/indexing/compact/OverlordCompactionSchedulerTest.java:
##########
@@ -146,10 +148,23 @@ private void initScheduler()
);
}
+ @Test
+ public void testCompactionSupervisorConfigSerde() throws
JsonProcessingException
+ {
+ boolean enabled = true;
+ CompactionEngine defaultEngine = CompactionEngine.MSQ;
Review Comment:
Nit: Can be final.
##########
indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionSupervisorSpec.java:
##########
@@ -77,6 +73,11 @@ public String getId()
return ID_PREFIX + spec.getDataSource();
}
+ public CompactionConfigValidationResult getValidationResult()
Review Comment:
Please confirm that this doesn't get serialized out. If it does, let's add a
`@JsonIgnore`.
##########
server/src/main/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfo.java:
##########
@@ -135,9 +135,8 @@ public static CompactionConfigValidationResult
validatePartitionsSpecForMSQ(Part
if (!(partitionsSpec instanceof DimensionRangePartitionsSpec
|| partitionsSpec instanceof DynamicPartitionsSpec)) {
return CompactionConfigValidationResult.failure(
- "MSQ: Invalid partitioning type[%s]. Must be either 'dynamic' or
'range'",
+ "MSQ: Invalid partitioning type[%s]. Must be either dynamic or
range",
Review Comment:
I think having the quotes was better. You can even use square brackets
instead of quotes.
##########
indexing-service/src/test/java/org/apache/druid/indexing/compact/CompactionSupervisorSpecTest.java:
##########
@@ -82,20 +82,32 @@ public void testSerdeOfSuspendedSpec()
}
@Test
- public void testInvalidSpecThrowsException()
+ public void testSupervisorWithInvalidSpecThrowsExceptionForStatus()
{
Mockito.when(scheduler.validateCompactionConfig(ArgumentMatchers.any()))
.thenReturn(CompactionConfigValidationResult.failure("bad spec"));
final DruidException exception = Assert.assertThrows(
DruidException.class,
- () -> new CompactionSupervisorSpec(null, false, scheduler)
+ () -> new CompactionSupervisorSpec(
+ new
DataSourceCompactionConfig.Builder().forDataSource("datasource").build(),
+ false,
+ scheduler
+ ).createSupervisor().getStatus()
);
Assert.assertEquals(
- "Compaction supervisor 'spec' is invalid. Reason[bad spec].",
+ "Compaction supervisor spec is invalid. Reason[bad spec].",
exception.getMessage()
);
}
+ @Test
+ public void testInvalidSpecReturnsInvalid()
+ {
+ Mockito.when(scheduler.validateCompactionConfig(ArgumentMatchers.any()))
+ .thenReturn(CompactionConfigValidationResult.failure("bad spec"));
+ Assert.assertFalse(new CompactionSupervisorSpec(null, false,
scheduler).getValidationResult().isValid());
Review Comment:
Rather than just checking `isValid()`, verify the whole validation result.
##########
indexing-service/src/test/java/org/apache/druid/indexing/compact/CompactionSupervisorSpecTest.java:
##########
@@ -82,20 +82,32 @@ public void testSerdeOfSuspendedSpec()
}
@Test
- public void testInvalidSpecThrowsException()
+ public void testSupervisorWithInvalidSpecThrowsExceptionForStatus()
{
Mockito.when(scheduler.validateCompactionConfig(ArgumentMatchers.any()))
.thenReturn(CompactionConfigValidationResult.failure("bad spec"));
final DruidException exception = Assert.assertThrows(
DruidException.class,
- () -> new CompactionSupervisorSpec(null, false, scheduler)
+ () -> new CompactionSupervisorSpec(
+ new
DataSourceCompactionConfig.Builder().forDataSource("datasource").build(),
+ false,
+ scheduler
+ ).createSupervisor().getStatus()
);
Assert.assertEquals(
- "Compaction supervisor 'spec' is invalid. Reason[bad spec].",
+ "Compaction supervisor spec is invalid. Reason[bad spec].",
exception.getMessage()
);
}
+ @Test
+ public void testInvalidSpecReturnsInvalid()
Review Comment:
```suggestion
public void testGetValidationResultForInvalidSpec()
```
##########
indexing-service/src/test/java/org/apache/druid/indexing/compact/CompactionSupervisorSpecTest.java:
##########
@@ -82,20 +82,32 @@ public void testSerdeOfSuspendedSpec()
}
@Test
- public void testInvalidSpecThrowsException()
+ public void testSupervisorWithInvalidSpecThrowsExceptionForStatus()
Review Comment:
This test might have to change a little as we shouldn't throw an exception
while getting state/status even if the spec is invalid.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]