kfaraz commented on code in PR #16291:
URL: https://github.com/apache/druid/pull/16291#discussion_r1672084948
##########
server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java:
##########
@@ -83,6 +84,7 @@ public void testSerdeBasic() throws IOException
Assert.assertEquals(config.getTuningConfig(), fromJson.getTuningConfig());
Assert.assertEquals(config.getTaskContext(), fromJson.getTaskContext());
Assert.assertEquals(config.getGranularitySpec(),
fromJson.getGranularitySpec());
+ Assert.assertEquals(config.getEngine(), fromJson.getEngine());
Review Comment:
There should be another assertion that the engine is actually `native`,
since `null` was passed to the constructor.
##########
server/src/test/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfoTest.java:
##########
@@ -27,117 +27,166 @@
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.java.util.common.HumanReadableBytes;
+import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.data.CompressionFactory;
import org.apache.druid.segment.data.CompressionStrategy;
import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory;
+import org.apache.druid.server.coordinator.CompactionConfigValidationResult;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.server.coordinator.UserCompactionTaskGranularityConfig;
import org.apache.druid.server.coordinator.UserCompactionTaskQueryTuningConfig;
import org.joda.time.Duration;
import org.joda.time.Period;
+import org.junit.Assert;
import org.junit.Test;
import javax.annotation.Nullable;
import java.util.Collections;
import java.util.Map;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
public class ClientCompactionRunnerInfoTest
{
@Test
- public void testHashedPartitionsSpecs()
+ public void testMSQEngineWithHashedPartitionsSpecIsInvalid()
{
DataSourceCompactionConfig compactionConfig = createCompactionConfig(
new HashedPartitionsSpec(100, null, null),
Collections.emptyMap(),
null,
null
);
-
assertFalse(ClientCompactionRunnerInfo.validateCompactionConfig(compactionConfig,
CompactionEngine.NATIVE)
- .isValid());
+ CompactionConfigValidationResult validationResult =
ClientCompactionRunnerInfo.validateCompactionConfig(
+ compactionConfig,
+ CompactionEngine.NATIVE
+ );
+ Assert.assertFalse(validationResult.isValid());
+ Assert.assertEquals(
+ StringUtils.format(
+ "Invalid partitionsSpec type[%s] for MSQ engine."
+ + " Type must be either 'dynamic' or 'range'.",
+ HashedPartitionsSpec.class.getSimpleName()
+ ),
+ validationResult.getReason()
+ );
}
@Test
- public void testrInvalidDynamicPartitionsSpecs()
+ public void testMSQEngineWithMaxTotalRowsIsInvalid()
{
DataSourceCompactionConfig compactionConfig = createCompactionConfig(
new DynamicPartitionsSpec(100, 100L),
Collections.emptyMap(),
null,
null
);
-
assertFalse(ClientCompactionRunnerInfo.validateCompactionConfig(compactionConfig,
CompactionEngine.NATIVE)
- .isValid());
+ CompactionConfigValidationResult validationResult =
ClientCompactionRunnerInfo.validateCompactionConfig(
+ compactionConfig,
+ CompactionEngine.NATIVE
+ );
+ Assert.assertFalse(validationResult.isValid());
+ Assert.assertEquals(StringUtils.format(
Review Comment:
Don't use `StringUtils.format`; compare against the full expected message instead.
##########
server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResourceTest.java:
##########
@@ -407,6 +412,115 @@ public void
testAddOrUpdateCompactionConfigWithoutExistingConfig()
Assert.assertNotNull(newConfigCaptor.getValue());
Assert.assertEquals(1,
newConfigCaptor.getValue().getCompactionConfigs().size());
Assert.assertEquals(newConfig,
newConfigCaptor.getValue().getCompactionConfigs().get(0));
+ Assert.assertEquals(newConfig.getEngine(),
newConfigCaptor.getValue().getCompactionConfigs().get(0).getEngine());
+ }
+
+ @Test
+ public void
testAddOrUpdateCompactionConfigWithoutExistingConfigAndEngineAsNull()
+ {
+ Mockito.when(mockConnector.lookup(
+ ArgumentMatchers.anyString(),
+ ArgumentMatchers.eq("name"),
+ ArgumentMatchers.eq("payload"),
+
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY)
+ )
+ ).thenReturn(null);
+ Mockito.when(mockJacksonConfigManager.convertByteToConfig(
+ ArgumentMatchers.eq(null),
+ ArgumentMatchers.eq(CoordinatorCompactionConfig.class),
+ ArgumentMatchers.eq(CoordinatorCompactionConfig.empty())
+ )
+ ).thenReturn(CoordinatorCompactionConfig.empty());
+ final ArgumentCaptor<byte[]> oldConfigCaptor =
ArgumentCaptor.forClass(byte[].class);
+ final ArgumentCaptor<CoordinatorCompactionConfig> newConfigCaptor =
ArgumentCaptor.forClass(
+ CoordinatorCompactionConfig.class);
+ Mockito.when(mockJacksonConfigManager.set(
+
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY),
+ oldConfigCaptor.capture(),
+ newConfigCaptor.capture(),
+ ArgumentMatchers.any()
+ )
+ ).thenReturn(ConfigManager.SetResult.ok());
+
+ final DataSourceCompactionConfig newConfig = new
DataSourceCompactionConfig(
+ "dataSource",
+ null,
+ 500L,
+ null,
+ new Period(3600),
+ null,
+ new UserCompactionTaskGranularityConfig(Granularities.HOUR, null,
null),
+ null,
+ null,
+ null,
+ null,
+ null,
+ ImmutableMap.of("key", "val")
+ );
+ Response ignore =
coordinatorCompactionConfigsResource.addOrUpdateCompactionConfig(
+ newConfig,
+ mockHttpServletRequest
+ );
+ Assert.assertEquals(null,
newConfigCaptor.getValue().getCompactionConfigs().get(0).getEngine());
+ }
+
+ @Test
+ public void
testAddOrUpdateCompactionConfigWithInvalidMaxNumTasksForMSQEngine()
+ {
+ Mockito.when(mockConnector.lookup(
+ ArgumentMatchers.anyString(),
+ ArgumentMatchers.eq("name"),
+ ArgumentMatchers.eq("payload"),
+
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY)
+ )
+ ).thenReturn(null);
+ Mockito.when(mockJacksonConfigManager.convertByteToConfig(
+ ArgumentMatchers.eq(null),
+ ArgumentMatchers.eq(CoordinatorCompactionConfig.class),
+ ArgumentMatchers.eq(CoordinatorCompactionConfig.empty())
+ )
+ ).thenReturn(CoordinatorCompactionConfig.empty());
+ final ArgumentCaptor<byte[]> oldConfigCaptor =
ArgumentCaptor.forClass(byte[].class);
+ final ArgumentCaptor<CoordinatorCompactionConfig> newConfigCaptor =
ArgumentCaptor.forClass(
+ CoordinatorCompactionConfig.class);
+ Mockito.when(mockJacksonConfigManager.set(
+
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY),
+ oldConfigCaptor.capture(),
+ newConfigCaptor.capture(),
+ ArgumentMatchers.any()
+ )
+ ).thenReturn(ConfigManager.SetResult.ok());
+
+ int maxNumTasks = 1;
+
+ final DataSourceCompactionConfig newConfig = new
DataSourceCompactionConfig(
+ "dataSource",
+ null,
+ 500L,
+ null,
+ new Period(3600),
+ null,
+ new UserCompactionTaskGranularityConfig(Granularities.HOUR, null,
null),
+ null,
+ null,
+ null,
+ null,
+ CompactionEngine.MSQ,
+ ImmutableMap.of(ClientMSQContext.CTX_MAX_NUM_TASKS, maxNumTasks)
+ );
+ Response response =
coordinatorCompactionConfigsResource.addOrUpdateCompactionConfig(
+ newConfig,
+ mockHttpServletRequest
+ );
+
Assert.assertEquals(DruidException.Category.INVALID_INPUT.getExpectedStatus(),
response.getStatus());
+ Assert.assertEquals(
+ StringUtils.format(
Review Comment:
No need to format messages in tests, especially when the format args
(`maxNumTasks`) are coming from the test itself. Compare against the fully
formatted message instead.
##########
integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java:
##########
@@ -456,7 +466,7 @@ public void
testAutoCompactionDutySubmitAndVerifyCompaction() throws Exception
verifySegmentsCount(4);
verifyQuery(INDEX_QUERIES_RESOURCE);
- submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, Period.days(1));
+ submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, Period.days(1),
null);
Review Comment:
It is better to retain the old signature of `submitCompactionConfig`, which
doesn't require an engine, rather than having to pass `null` everywhere.
##########
server/src/test/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfoTest.java:
##########
@@ -27,117 +27,166 @@
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.java.util.common.HumanReadableBytes;
+import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.data.CompressionFactory;
import org.apache.druid.segment.data.CompressionStrategy;
import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory;
+import org.apache.druid.server.coordinator.CompactionConfigValidationResult;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.server.coordinator.UserCompactionTaskGranularityConfig;
import org.apache.druid.server.coordinator.UserCompactionTaskQueryTuningConfig;
import org.joda.time.Duration;
import org.joda.time.Period;
+import org.junit.Assert;
import org.junit.Test;
import javax.annotation.Nullable;
import java.util.Collections;
import java.util.Map;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
public class ClientCompactionRunnerInfoTest
{
@Test
- public void testHashedPartitionsSpecs()
+ public void testMSQEngineWithHashedPartitionsSpecIsInvalid()
{
DataSourceCompactionConfig compactionConfig = createCompactionConfig(
new HashedPartitionsSpec(100, null, null),
Collections.emptyMap(),
null,
null
);
-
assertFalse(ClientCompactionRunnerInfo.validateCompactionConfig(compactionConfig,
CompactionEngine.NATIVE)
- .isValid());
+ CompactionConfigValidationResult validationResult =
ClientCompactionRunnerInfo.validateCompactionConfig(
+ compactionConfig,
+ CompactionEngine.NATIVE
+ );
+ Assert.assertFalse(validationResult.isValid());
+ Assert.assertEquals(
+ StringUtils.format(
+ "Invalid partitionsSpec type[%s] for MSQ engine."
+ + " Type must be either 'dynamic' or 'range'.",
+ HashedPartitionsSpec.class.getSimpleName()
+ ),
+ validationResult.getReason()
+ );
}
@Test
- public void testrInvalidDynamicPartitionsSpecs()
+ public void testMSQEngineWithMaxTotalRowsIsInvalid()
{
DataSourceCompactionConfig compactionConfig = createCompactionConfig(
new DynamicPartitionsSpec(100, 100L),
Collections.emptyMap(),
null,
null
);
-
assertFalse(ClientCompactionRunnerInfo.validateCompactionConfig(compactionConfig,
CompactionEngine.NATIVE)
- .isValid());
+ CompactionConfigValidationResult validationResult =
ClientCompactionRunnerInfo.validateCompactionConfig(
+ compactionConfig,
+ CompactionEngine.NATIVE
+ );
+ Assert.assertFalse(validationResult.isValid());
+ Assert.assertEquals(StringUtils.format(
+ "maxTotalRows[%d] in DynamicPartitionsSpec not supported for MSQ
engine.",
+ 100
+ ), validationResult.getReason());
}
@Test
- public void testDynamicPartitionsSpecs()
+ public void testMSQEngineWithDynamicPartitionsSpecIsValid()
{
DataSourceCompactionConfig compactionConfig = createCompactionConfig(
new DynamicPartitionsSpec(100, null),
Collections.emptyMap(),
null,
null
);
-
assertTrue(ClientCompactionRunnerInfo.validateCompactionConfig(compactionConfig,
CompactionEngine.NATIVE)
+
Assert.assertTrue(ClientCompactionRunnerInfo.validateCompactionConfig(compactionConfig,
CompactionEngine.NATIVE)
.isValid());
}
@Test
- public void testDimensionRangePartitionsSpecs()
+ public void testMSQEngineWithDimensionRangePartitionsSpecIsValid()
{
DataSourceCompactionConfig compactionConfig = createCompactionConfig(
new DimensionRangePartitionsSpec(100, null,
ImmutableList.of("partitionDim"), false),
Collections.emptyMap(),
null,
null
);
-
assertTrue(ClientCompactionRunnerInfo.validateCompactionConfig(compactionConfig,
CompactionEngine.NATIVE)
+
Assert.assertTrue(ClientCompactionRunnerInfo.validateCompactionConfig(compactionConfig,
CompactionEngine.NATIVE)
.isValid());
}
@Test
- public void testQueryGranularityAll()
+ public void testMSQEngineWithQueryGranularityAllIsValid()
{
DataSourceCompactionConfig compactionConfig = createCompactionConfig(
new DynamicPartitionsSpec(3, null),
Collections.emptyMap(),
new UserCompactionTaskGranularityConfig(Granularities.ALL,
Granularities.ALL, false),
null
);
-
assertTrue(ClientCompactionRunnerInfo.validateCompactionConfig(compactionConfig,
CompactionEngine.NATIVE)
+
Assert.assertTrue(ClientCompactionRunnerInfo.validateCompactionConfig(compactionConfig,
CompactionEngine.NATIVE)
.isValid());
}
@Test
- public void testRollupFalseWithMetricsSpec()
+ public void testMSQEngineWithRollupFalseWithMetricsSpecIsInValid()
{
DataSourceCompactionConfig compactionConfig = createCompactionConfig(
new DynamicPartitionsSpec(3, null),
Collections.emptyMap(),
new UserCompactionTaskGranularityConfig(null, null, false),
new AggregatorFactory[]{new LongSumAggregatorFactory("sum", "sum")}
);
-
assertFalse(ClientCompactionRunnerInfo.validateCompactionConfig(compactionConfig,
CompactionEngine.NATIVE)
- .isValid());
+ CompactionConfigValidationResult validationResult =
ClientCompactionRunnerInfo.validateCompactionConfig(
+ compactionConfig,
+ CompactionEngine.NATIVE
+ );
+ Assert.assertFalse(validationResult.isValid());
+ Assert.assertEquals(
+ "rollup in granularitySpec must be set to True if metricsSpec is
specifed for MSQ engine.",
+ validationResult.getReason()
+ );
+ }
+
+ @Test
+ public void testMSQEngineWithUnsupportedMetricsSpecIsInValid()
+ {
+ final String inputColName = "added";
+ final String outputColName = "sum_added";
+ DataSourceCompactionConfig compactionConfig = createCompactionConfig(
+ new DynamicPartitionsSpec(3, null),
+ Collections.emptyMap(),
+ new UserCompactionTaskGranularityConfig(null, null, null),
+ new AggregatorFactory[]{new LongSumAggregatorFactory(outputColName,
inputColName)}
+ );
+ CompactionConfigValidationResult validationResult =
ClientCompactionRunnerInfo.validateCompactionConfig(
+ compactionConfig,
+ CompactionEngine.NATIVE
+ );
+ Assert.assertFalse(validationResult.isValid());
+ Assert.assertEquals(
+ StringUtils.format("Different name[%s] and fieldName(s)[%s] for
aggregator unsupported for MSQ engine.",
Review Comment:
Don't use `StringUtils.format`; type out the full expected message.
##########
server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java:
##########
@@ -640,7 +655,12 @@ public void testRunMultipleCompactionTaskSlots()
final CoordinatorRunStats stats = doCompactSegments(compactSegments, 3);
Assert.assertEquals(3, stats.get(Stats.Compaction.AVAILABLE_SLOTS));
Assert.assertEquals(3, stats.get(Stats.Compaction.MAX_SLOTS));
- Assert.assertEquals(3, stats.get(Stats.Compaction.SUBMITTED_TASKS));
+ // Native takes up 1 task slot by default whereas MSQ takes up all
available upto 5.
Review Comment:
I am not sure this comment properly explains what is being asserted next.
We are asserting 3 submitted tasks for the native engine but 1 for MSQ.
##########
server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResourceTest.java:
##########
@@ -407,6 +412,115 @@ public void
testAddOrUpdateCompactionConfigWithoutExistingConfig()
Assert.assertNotNull(newConfigCaptor.getValue());
Assert.assertEquals(1,
newConfigCaptor.getValue().getCompactionConfigs().size());
Assert.assertEquals(newConfig,
newConfigCaptor.getValue().getCompactionConfigs().get(0));
+ Assert.assertEquals(newConfig.getEngine(),
newConfigCaptor.getValue().getCompactionConfigs().get(0).getEngine());
+ }
+
+ @Test
+ public void
testAddOrUpdateCompactionConfigWithoutExistingConfigAndEngineAsNull()
+ {
+ Mockito.when(mockConnector.lookup(
+ ArgumentMatchers.anyString(),
+ ArgumentMatchers.eq("name"),
+ ArgumentMatchers.eq("payload"),
+
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY)
+ )
+ ).thenReturn(null);
+ Mockito.when(mockJacksonConfigManager.convertByteToConfig(
+ ArgumentMatchers.eq(null),
+ ArgumentMatchers.eq(CoordinatorCompactionConfig.class),
+ ArgumentMatchers.eq(CoordinatorCompactionConfig.empty())
+ )
+ ).thenReturn(CoordinatorCompactionConfig.empty());
+ final ArgumentCaptor<byte[]> oldConfigCaptor =
ArgumentCaptor.forClass(byte[].class);
+ final ArgumentCaptor<CoordinatorCompactionConfig> newConfigCaptor =
ArgumentCaptor.forClass(
+ CoordinatorCompactionConfig.class);
+ Mockito.when(mockJacksonConfigManager.set(
+
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY),
+ oldConfigCaptor.capture(),
+ newConfigCaptor.capture(),
+ ArgumentMatchers.any()
+ )
+ ).thenReturn(ConfigManager.SetResult.ok());
+
+ final DataSourceCompactionConfig newConfig = new
DataSourceCompactionConfig(
+ "dataSource",
+ null,
+ 500L,
+ null,
+ new Period(3600),
+ null,
+ new UserCompactionTaskGranularityConfig(Granularities.HOUR, null,
null),
+ null,
+ null,
+ null,
+ null,
+ null,
+ ImmutableMap.of("key", "val")
+ );
+ Response ignore =
coordinatorCompactionConfigsResource.addOrUpdateCompactionConfig(
Review Comment:
Please remove the `ignore` variable if you don't intend to use it.
##########
server/src/test/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfoTest.java:
##########
@@ -27,117 +27,166 @@
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.java.util.common.HumanReadableBytes;
+import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.data.CompressionFactory;
import org.apache.druid.segment.data.CompressionStrategy;
import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory;
+import org.apache.druid.server.coordinator.CompactionConfigValidationResult;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.server.coordinator.UserCompactionTaskGranularityConfig;
import org.apache.druid.server.coordinator.UserCompactionTaskQueryTuningConfig;
import org.joda.time.Duration;
import org.joda.time.Period;
+import org.junit.Assert;
import org.junit.Test;
import javax.annotation.Nullable;
import java.util.Collections;
import java.util.Map;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
public class ClientCompactionRunnerInfoTest
{
@Test
- public void testHashedPartitionsSpecs()
+ public void testMSQEngineWithHashedPartitionsSpecIsInvalid()
{
DataSourceCompactionConfig compactionConfig = createCompactionConfig(
new HashedPartitionsSpec(100, null, null),
Collections.emptyMap(),
null,
null
);
-
assertFalse(ClientCompactionRunnerInfo.validateCompactionConfig(compactionConfig,
CompactionEngine.NATIVE)
- .isValid());
+ CompactionConfigValidationResult validationResult =
ClientCompactionRunnerInfo.validateCompactionConfig(
+ compactionConfig,
+ CompactionEngine.NATIVE
+ );
+ Assert.assertFalse(validationResult.isValid());
+ Assert.assertEquals(
+ StringUtils.format(
+ "Invalid partitionsSpec type[%s] for MSQ engine."
+ + " Type must be either 'dynamic' or 'range'.",
+ HashedPartitionsSpec.class.getSimpleName()
+ ),
+ validationResult.getReason()
+ );
}
@Test
- public void testrInvalidDynamicPartitionsSpecs()
+ public void testMSQEngineWithMaxTotalRowsIsInvalid()
{
DataSourceCompactionConfig compactionConfig = createCompactionConfig(
new DynamicPartitionsSpec(100, 100L),
Collections.emptyMap(),
null,
null
);
-
assertFalse(ClientCompactionRunnerInfo.validateCompactionConfig(compactionConfig,
CompactionEngine.NATIVE)
- .isValid());
+ CompactionConfigValidationResult validationResult =
ClientCompactionRunnerInfo.validateCompactionConfig(
+ compactionConfig,
+ CompactionEngine.NATIVE
+ );
+ Assert.assertFalse(validationResult.isValid());
+ Assert.assertEquals(StringUtils.format(
+ "maxTotalRows[%d] in DynamicPartitionsSpec not supported for MSQ
engine.",
+ 100
+ ), validationResult.getReason());
}
@Test
- public void testDynamicPartitionsSpecs()
+ public void testMSQEngineWithDynamicPartitionsSpecIsValid()
{
DataSourceCompactionConfig compactionConfig = createCompactionConfig(
new DynamicPartitionsSpec(100, null),
Collections.emptyMap(),
null,
null
);
-
assertTrue(ClientCompactionRunnerInfo.validateCompactionConfig(compactionConfig,
CompactionEngine.NATIVE)
+
Assert.assertTrue(ClientCompactionRunnerInfo.validateCompactionConfig(compactionConfig,
CompactionEngine.NATIVE)
.isValid());
}
@Test
- public void testDimensionRangePartitionsSpecs()
+ public void testMSQEngineWithDimensionRangePartitionsSpecIsValid()
{
DataSourceCompactionConfig compactionConfig = createCompactionConfig(
new DimensionRangePartitionsSpec(100, null,
ImmutableList.of("partitionDim"), false),
Collections.emptyMap(),
null,
null
);
-
assertTrue(ClientCompactionRunnerInfo.validateCompactionConfig(compactionConfig,
CompactionEngine.NATIVE)
+
Assert.assertTrue(ClientCompactionRunnerInfo.validateCompactionConfig(compactionConfig,
CompactionEngine.NATIVE)
.isValid());
}
@Test
- public void testQueryGranularityAll()
+ public void testMSQEngineWithQueryGranularityAllIsValid()
{
DataSourceCompactionConfig compactionConfig = createCompactionConfig(
new DynamicPartitionsSpec(3, null),
Collections.emptyMap(),
new UserCompactionTaskGranularityConfig(Granularities.ALL,
Granularities.ALL, false),
null
);
-
assertTrue(ClientCompactionRunnerInfo.validateCompactionConfig(compactionConfig,
CompactionEngine.NATIVE)
+
Assert.assertTrue(ClientCompactionRunnerInfo.validateCompactionConfig(compactionConfig,
CompactionEngine.NATIVE)
.isValid());
}
@Test
- public void testRollupFalseWithMetricsSpec()
+ public void testMSQEngineWithRollupFalseWithMetricsSpecIsInValid()
{
DataSourceCompactionConfig compactionConfig = createCompactionConfig(
new DynamicPartitionsSpec(3, null),
Collections.emptyMap(),
new UserCompactionTaskGranularityConfig(null, null, false),
new AggregatorFactory[]{new LongSumAggregatorFactory("sum", "sum")}
);
-
assertFalse(ClientCompactionRunnerInfo.validateCompactionConfig(compactionConfig,
CompactionEngine.NATIVE)
- .isValid());
+ CompactionConfigValidationResult validationResult =
ClientCompactionRunnerInfo.validateCompactionConfig(
+ compactionConfig,
+ CompactionEngine.NATIVE
+ );
+ Assert.assertFalse(validationResult.isValid());
+ Assert.assertEquals(
+ "rollup in granularitySpec must be set to True if metricsSpec is
specifed for MSQ engine.",
+ validationResult.getReason()
+ );
+ }
+
+ @Test
+ public void testMSQEngineWithUnsupportedMetricsSpecIsInValid()
Review Comment:
What exactly is the unsupported metrics spec? Either add a comment about it
or rename the test method to indicate that.
```suggestion
public void testMSQEngineWithUnsupportedMetricsSpecIsInvalid()
```
##########
server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java:
##########
@@ -91,9 +90,10 @@ public class CompactSegments implements CoordinatorCustomDuty
private static final Predicate<TaskStatusPlus> IS_COMPACTION_TASK =
status -> null != status &&
COMPACTION_TASK_TYPE.equals(status.getType());
- // An artificial limit imposed just to avoid taking up too many compaction
task slots for a single MSQ compaction
- // task. Can be updated if needed.
- private static final int MAX_TASK_SLOTS_FOR_MSQ_COMPACTION_TASK = 8;
+ /**
+ * Limit to ensure that an MSQ compaction task doesn't take up all task
slots in a cluster.
+ */
+ static final int MAX_TASK_SLOTS_FOR_MSQ_COMPACTION_TASK = 5;
Review Comment:
This constant should be moved to `ClientMSQContext`.
##########
server/src/test/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfoTest.java:
##########
@@ -27,117 +27,166 @@
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.java.util.common.HumanReadableBytes;
+import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.data.CompressionFactory;
import org.apache.druid.segment.data.CompressionStrategy;
import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory;
+import org.apache.druid.server.coordinator.CompactionConfigValidationResult;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.server.coordinator.UserCompactionTaskGranularityConfig;
import org.apache.druid.server.coordinator.UserCompactionTaskQueryTuningConfig;
import org.joda.time.Duration;
import org.joda.time.Period;
+import org.junit.Assert;
import org.junit.Test;
import javax.annotation.Nullable;
import java.util.Collections;
import java.util.Map;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
public class ClientCompactionRunnerInfoTest
{
@Test
- public void testHashedPartitionsSpecs()
+ public void testMSQEngineWithHashedPartitionsSpecIsInvalid()
{
DataSourceCompactionConfig compactionConfig = createCompactionConfig(
new HashedPartitionsSpec(100, null, null),
Collections.emptyMap(),
null,
null
);
-
assertFalse(ClientCompactionRunnerInfo.validateCompactionConfig(compactionConfig,
CompactionEngine.NATIVE)
- .isValid());
+ CompactionConfigValidationResult validationResult =
ClientCompactionRunnerInfo.validateCompactionConfig(
+ compactionConfig,
+ CompactionEngine.NATIVE
+ );
+ Assert.assertFalse(validationResult.isValid());
+ Assert.assertEquals(
+ StringUtils.format(
Review Comment:
Don't use `StringUtils.format`, type out the full expected message.
##########
server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java:
##########
@@ -1918,10 +1956,13 @@ private void addMoreData(String dataSource, int day)
private List<DataSourceCompactionConfig> createCompactionConfigs()
{
- return createCompactionConfigs(null);
+ return createCompactionConfigs(null, null);
}
- private List<DataSourceCompactionConfig> createCompactionConfigs(@Nullable
Integer maxNumConcurrentSubTasks)
+ private List<DataSourceCompactionConfig> createCompactionConfigs(
Review Comment:
Instead of doing this, have two separate methods:
`createCompactionConfigsForNative`, `createCompactionConfigsForMsq`
##########
server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java:
##########
@@ -130,50 +132,63 @@ public class CompactSegmentsTest
private static final int MAXIMUM_CAPACITY_WITH_AUTO_SCALE = 10;
private static final NewestSegmentFirstPolicy SEARCH_POLICY = new
NewestSegmentFirstPolicy(JSON_MAPPER);
- @Parameterized.Parameters(name = "{0}")
+ @Parameterized.Parameters(name = "scenario={0}, engine={2}")
public static Collection<Object[]> constructorFeeder()
{
final MutableInt nextRangePartitionBoundary = new MutableInt(0);
+
+ final DynamicPartitionsSpec dynamicPartitionsSpec = new
DynamicPartitionsSpec(300000, Long.MAX_VALUE);
+ final BiFunction<Integer, Integer, ShardSpec> numberedShardSpecCreator =
NumberedShardSpec::new;
+
+ final HashedPartitionsSpec hashedPartitionsSpec = new
HashedPartitionsSpec(null, 2, ImmutableList.of("dim"));
+ final BiFunction<Integer, Integer, ShardSpec>
hashBasedNumberedShardSpecCreator =
+ (bucketId, numBuckets) -> new HashBasedNumberedShardSpec(
+ bucketId,
+ numBuckets,
+ bucketId,
+ numBuckets,
+ ImmutableList.of("dim"),
+ null,
+ JSON_MAPPER
+ );
+
+ final SingleDimensionPartitionsSpec singleDimensionPartitionsSpec =
+ new SingleDimensionPartitionsSpec(300000, null, "dim", false);
+ final BiFunction<Integer, Integer, ShardSpec>
singleDimensionShardSpeCreator =
Review Comment:
```suggestion
final BiFunction<Integer, Integer, ShardSpec>
singleDimensionShardSpecCreator =
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]