This is an automated email from the ASF dual-hosted git repository.
jonwei pushed a commit to branch 0.17.0
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/0.17.0 by this push:
new 466f6dc Support both IndexTuningConfig and ParallelIndexTuningConfig
for compaction task (#9222) (#9237)
466f6dc is described below
commit 466f6dc7d99270787eae146ca331ed073bc929d4
Author: Jihoon Son <[email protected]>
AuthorDate: Tue Jan 21 17:12:16 2020 -0800
Support both IndexTuningConfig and ParallelIndexTuningConfig for compaction
task (#9222) (#9237)
* Support both IndexTuningConfig and ParallelIndexTuningConfig for
compaction task
* tuningConfig module
* fix tests
---
.../guice/IndexingServiceTuningConfigModule.java | 51 ++++
.../druid/indexing/common/task/CompactionTask.java | 57 ++++-
.../druid/indexing/common/task/IndexTask.java | 1 -
.../batch/parallel/ParallelIndexTuningConfig.java | 2 -
.../common/task/ClientCompactQuerySerdeTest.java | 3 +
.../indexing/common/task/CompactionTaskTest.java | 257 +++++++++++++++++++++
.../druid/indexing/common/task/TaskSerdeTest.java | 6 +
.../main/java/org/apache/druid/cli/CliIndexer.java | 2 +
.../org/apache/druid/cli/CliMiddleManager.java | 2 +
.../java/org/apache/druid/cli/CliOverlord.java | 2 +
.../main/java/org/apache/druid/cli/CliPeon.java | 2 +
11 files changed, 377 insertions(+), 8 deletions(-)
diff --git
a/indexing-service/src/main/java/org/apache/druid/guice/IndexingServiceTuningConfigModule.java
b/indexing-service/src/main/java/org/apache/druid/guice/IndexingServiceTuningConfigModule.java
new file mode 100644
index 0000000..4799f2a
--- /dev/null
+++
b/indexing-service/src/main/java/org/apache/druid/guice/IndexingServiceTuningConfigModule.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.guice;
+
+import com.fasterxml.jackson.databind.Module;
+import com.fasterxml.jackson.databind.jsontype.NamedType;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.google.common.collect.ImmutableList;
+import com.google.inject.Binder;
+import org.apache.druid.indexing.common.task.IndexTask.IndexTuningConfig;
+import
org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig;
+import org.apache.druid.initialization.DruidModule;
+
+import java.util.List;
+
+public class IndexingServiceTuningConfigModule implements DruidModule
+{
+ @Override
+ public List<? extends Module> getJacksonModules()
+ {
+ return ImmutableList.of(
+ new
SimpleModule(IndexingServiceTuningConfigModule.class.getSimpleName())
+ .registerSubtypes(
+ new NamedType(IndexTuningConfig.class, "index"),
+ new NamedType(ParallelIndexTuningConfig.class,
"index_parallel")
+ )
+ );
+ }
+
+ @Override
+ public void configure(Binder binder)
+ {
+ }
+}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
index e85e57c..679e6c5 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
@@ -51,6 +51,7 @@ import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
+import org.apache.druid.indexing.common.task.IndexTask.IndexTuningConfig;
import
org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIOConfig;
import
org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIngestionSpec;
import
org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask;
@@ -74,6 +75,7 @@ import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.indexing.TuningConfig;
import org.apache.druid.segment.indexing.granularity.GranularitySpec;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.loading.SegmentLoadingException;
@@ -177,7 +179,7 @@ public class CompactionTask extends AbstractBatchIndexTask
@JsonProperty("dimensionsSpec") @Nullable final DimensionsSpec
dimensionsSpec,
@JsonProperty("metricsSpec") @Nullable final AggregatorFactory[]
metricsSpec,
@JsonProperty("segmentGranularity") @Nullable final Granularity
segmentGranularity,
- @JsonProperty("tuningConfig") @Nullable final ParallelIndexTuningConfig
tuningConfig,
+ @JsonProperty("tuningConfig") @Nullable final TuningConfig tuningConfig,
@JsonProperty("context") @Nullable final Map<String, Object> context,
@JacksonInject ObjectMapper jsonMapper,
@JacksonInject AuthorizerMapper authorizerMapper,
@@ -213,10 +215,10 @@ public class CompactionTask extends AbstractBatchIndexTask
this.dimensionsSpec = dimensionsSpec == null ? dimensions : dimensionsSpec;
this.metricsSpec = metricsSpec;
this.segmentGranularity = segmentGranularity;
- this.tuningConfig = tuningConfig;
+ this.tuningConfig = tuningConfig != null ? getTuningConfig(tuningConfig) :
null;
this.jsonMapper = jsonMapper;
this.segmentProvider = new SegmentProvider(dataSource,
this.ioConfig.getInputSpec());
- this.partitionConfigurationManager = new
PartitionConfigurationManager(tuningConfig);
+ this.partitionConfigurationManager = new
PartitionConfigurationManager(this.tuningConfig);
this.authorizerMapper = authorizerMapper;
this.chatHandlerProvider = chatHandlerProvider;
this.rowIngestionMetersFactory = rowIngestionMetersFactory;
@@ -227,6 +229,51 @@ public class CompactionTask extends AbstractBatchIndexTask
this.appenderatorsManager = appenderatorsManager;
}
+ @VisibleForTesting
+ static ParallelIndexTuningConfig getTuningConfig(TuningConfig tuningConfig)
+ {
+ if (tuningConfig instanceof ParallelIndexTuningConfig) {
+ return (ParallelIndexTuningConfig) tuningConfig;
+ } else if (tuningConfig instanceof IndexTuningConfig) {
+ final IndexTuningConfig indexTuningConfig = (IndexTuningConfig)
tuningConfig;
+ return new ParallelIndexTuningConfig(
+ null,
+ indexTuningConfig.getMaxRowsPerSegment(),
+ indexTuningConfig.getMaxRowsPerSegment(),
+ indexTuningConfig.getMaxBytesInMemory(),
+ indexTuningConfig.getMaxTotalRows(),
+ indexTuningConfig.getNumShards(),
+ null,
+ indexTuningConfig.getPartitionsSpec(),
+ indexTuningConfig.getIndexSpec(),
+ indexTuningConfig.getIndexSpecForIntermediatePersists(),
+ indexTuningConfig.getMaxPendingPersists(),
+ indexTuningConfig.isForceGuaranteedRollup(),
+ indexTuningConfig.isReportParseExceptions(),
+ indexTuningConfig.getPushTimeout(),
+ indexTuningConfig.getSegmentWriteOutMediumFactory(),
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ indexTuningConfig.isLogParseExceptions(),
+ indexTuningConfig.getMaxParseExceptions(),
+ indexTuningConfig.getMaxSavedParseExceptions()
+ );
+ } else {
+ throw new ISE(
+ "Unknown tuningConfig type: [%s], Must be either [%s] or [%s]",
+ tuningConfig.getClass().getName(),
+ ParallelIndexTuningConfig.class.getName(),
+ IndexTuningConfig.class.getName()
+ );
+ }
+ }
+
@JsonProperty
public CompactionIOConfig getIoConfig()
{
@@ -848,7 +895,7 @@ public class CompactionTask extends AbstractBatchIndexTask
@Nullable
private Granularity segmentGranularity;
@Nullable
- private ParallelIndexTuningConfig tuningConfig;
+ private TuningConfig tuningConfig;
@Nullable
private Map<String, Object> context;
@@ -911,7 +958,7 @@ public class CompactionTask extends AbstractBatchIndexTask
return this;
}
- public Builder tuningConfig(ParallelIndexTuningConfig tuningConfig)
+ public Builder tuningConfig(TuningConfig tuningConfig)
{
this.tuningConfig = tuningConfig;
return this;
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
index d0733d2..4d39df4 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
@@ -1171,7 +1171,6 @@ public class IndexTask extends AbstractBatchIndexTask
implements ChatHandler
}
}
- @JsonTypeName("index")
public static class IndexTuningConfig implements TuningConfig,
AppenderatorConfig
{
private static final IndexSpec DEFAULT_INDEX_SPEC = new IndexSpec();
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java
index dbfc73d..feed00f 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java
@@ -21,7 +21,6 @@ package org.apache.druid.indexing.common.task.batch.parallel;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.base.Preconditions;
import org.apache.druid.data.input.SplitHintSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
@@ -35,7 +34,6 @@ import org.joda.time.Period;
import javax.annotation.Nullable;
import java.util.Objects;
-@JsonTypeName("index_parallel")
public class ParallelIndexTuningConfig extends IndexTuningConfig
{
private static final int DEFAULT_MAX_NUM_CONCURRENT_SUB_TASKS = 1;
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactQuerySerdeTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactQuerySerdeTest.java
index a4ac767..a67128e 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactQuerySerdeTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactQuerySerdeTest.java
@@ -21,6 +21,7 @@ package org.apache.druid.indexing.common.task;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.introspect.AnnotationIntrospectorPair;
+import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.google.common.collect.ImmutableList;
import org.apache.druid.client.coordinator.CoordinatorClient;
import org.apache.druid.client.indexing.ClientCompactQuery;
@@ -36,6 +37,7 @@ import org.apache.druid.guice.GuiceInjectors;
import org.apache.druid.indexing.common.SegmentLoaderFactory;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
+import
org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.segment.IndexSpec;
@@ -168,6 +170,7 @@ public class ClientCompactQuerySerdeTest
)
);
objectMapper.setInjectableValues(injectableValues);
+ objectMapper.registerSubtypes(new
NamedType(ParallelIndexTuningConfig.class, "index_parallel"));
return objectMapper;
}
}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
index c72f5fe..1289944 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
@@ -19,7 +19,11 @@
package org.apache.druid.indexing.common.task;
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.exc.ValueInstantiationException;
import com.fasterxml.jackson.databind.introspect.AnnotationIntrospectorPair;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
@@ -44,6 +48,8 @@ import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.guice.GuiceAnnotationIntrospector;
import org.apache.druid.guice.GuiceInjectableValues;
import org.apache.druid.guice.GuiceInjectors;
+import org.apache.druid.guice.IndexingServiceTuningConfigModule;
+import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
import org.apache.druid.indexing.common.RetryPolicyConfig;
import org.apache.druid.indexing.common.RetryPolicyFactory;
@@ -53,10 +59,12 @@ import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction;
import org.apache.druid.indexing.common.actions.TaskAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
+import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
import org.apache.druid.indexing.common.task.CompactionTask.Builder;
import
org.apache.druid.indexing.common.task.CompactionTask.PartitionConfigurationManager;
import org.apache.druid.indexing.common.task.CompactionTask.SegmentProvider;
+import org.apache.druid.indexing.common.task.IndexTask.IndexTuningConfig;
import
org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIOConfig;
import
org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIngestionSpec;
import
org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig;
@@ -97,6 +105,8 @@ import org.apache.druid.segment.data.ListIndexed;
import org.apache.druid.segment.data.RoaringBitmapSerdeFactory;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.indexing.RealtimeTuningConfig;
+import org.apache.druid.segment.indexing.TuningConfig;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.loading.SegmentLoadingException;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
@@ -118,6 +128,7 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
+import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
@@ -262,6 +273,7 @@ public class CompactionTaskTest
objectMapper.registerModule(
new SimpleModule().registerSubtypes(new
NamedType(NumberedShardSpec.class, "NumberedShardSpec"))
);
+ objectMapper.registerModules(new
IndexingServiceTuningConfigModule().getJacksonModules());
return objectMapper;
}
@@ -423,6 +435,125 @@ public class CompactionTaskTest
assertEquals(task, fromJson);
}
+ @Test
+ public void testSerdeWithOldTuningConfigSuccessfullyDeserializeToNewOne()
throws IOException
+ {
+ final OldCompactionTaskWithAnyTuningConfigType oldTask = new
OldCompactionTaskWithAnyTuningConfigType(
+ null,
+ null,
+ DATA_SOURCE,
+ null,
+ SEGMENTS,
+ null,
+ null,
+ null,
+ null,
+ null,
+ new IndexTuningConfig(
+ null,
+ null, // null to compute maxRowsPerSegment automatically
+ 500000,
+ 1000000L,
+ null,
+ null,
+ null,
+ null,
+ null,
+ new IndexSpec(
+ new RoaringBitmapSerdeFactory(true),
+ CompressionStrategy.LZ4,
+ CompressionStrategy.LZF,
+ LongEncodingStrategy.LONGS
+ ),
+ null,
+ null,
+ true,
+ false,
+ 5000L,
+ null,
+ null,
+ null,
+ null,
+ null
+ ),
+ null,
+ OBJECT_MAPPER,
+ AuthTestUtils.TEST_AUTHORIZER_MAPPER,
+ null,
+ ROW_INGESTION_METERS_FACTORY,
+ COORDINATOR_CLIENT,
+ segmentLoaderFactory,
+ RETRY_POLICY_FACTORY,
+ APPENDERATORS_MANAGER
+ );
+
+ final Builder builder = new Builder(
+ DATA_SOURCE,
+ OBJECT_MAPPER,
+ AuthTestUtils.TEST_AUTHORIZER_MAPPER,
+ null,
+ ROW_INGESTION_METERS_FACTORY,
+ INDEXING_SERVICE_CLIENT,
+ COORDINATOR_CLIENT,
+ segmentLoaderFactory,
+ RETRY_POLICY_FACTORY,
+ APPENDERATORS_MANAGER
+ );
+
+ final CompactionTask expectedFromJson = builder
+ .segments(SEGMENTS)
+
.tuningConfig(CompactionTask.getTuningConfig(oldTask.getTuningConfig()))
+ .build();
+
+ final ObjectMapper mapper = new DefaultObjectMapper((DefaultObjectMapper)
OBJECT_MAPPER);
+ mapper.registerSubtypes(new
NamedType(OldCompactionTaskWithAnyTuningConfigType.class, "compact"));
+ final byte[] bytes = mapper.writeValueAsBytes(oldTask);
+ final CompactionTask fromJson = mapper.readValue(bytes,
CompactionTask.class);
+ assertEquals(expectedFromJson, fromJson);
+ }
+
+ @Test
+ public void testSerdeWithUnknownTuningConfigThrowingError() throws
IOException
+ {
+ final OldCompactionTaskWithAnyTuningConfigType taskWithUnknownTuningConfig
=
+ new OldCompactionTaskWithAnyTuningConfigType(
+ null,
+ null,
+ DATA_SOURCE,
+ null,
+ SEGMENTS,
+ null,
+ null,
+ null,
+ null,
+ null,
+ RealtimeTuningConfig.makeDefaultTuningConfig(null),
+ null,
+ OBJECT_MAPPER,
+ AuthTestUtils.TEST_AUTHORIZER_MAPPER,
+ null,
+ ROW_INGESTION_METERS_FACTORY,
+ COORDINATOR_CLIENT,
+ segmentLoaderFactory,
+ RETRY_POLICY_FACTORY,
+ APPENDERATORS_MANAGER
+ );
+
+ final ObjectMapper mapper = new DefaultObjectMapper((DefaultObjectMapper)
OBJECT_MAPPER);
+ mapper.registerSubtypes(
+ new NamedType(OldCompactionTaskWithAnyTuningConfigType.class,
"compact"),
+ new NamedType(RealtimeTuningConfig.class, "realtime")
+ );
+ final byte[] bytes = mapper.writeValueAsBytes(taskWithUnknownTuningConfig);
+
+ expectedException.expect(ValueInstantiationException.class);
+
expectedException.expectCause(CoreMatchers.instanceOf(IllegalStateException.class));
+ expectedException.expectMessage(
+ "Unknown tuningConfig type:
[org.apache.druid.segment.indexing.RealtimeTuningConfig]"
+ );
+ mapper.readValue(bytes, CompactionTask.class);
+ }
+
private static void assertEquals(CompactionTask expected, CompactionTask
actual)
{
Assert.assertEquals(expected.getType(), actual.getType());
@@ -1348,4 +1479,130 @@ public class CompactionTaskTest
return null;
}
}
+
+ /**
+ * The compaction task spec in 0.16.0 except for the tuningConfig.
+ * The original spec accepts only {@link IndexTuningConfig}, but this class
acceps any type of tuningConfig for
+ * testing.
+ */
+ private static class OldCompactionTaskWithAnyTuningConfigType extends
AbstractTask
+ {
+ private final Interval interval;
+ private final List<DataSegment> segments;
+ @Nullable
+ private final DimensionsSpec dimensionsSpec;
+ @Nullable
+ private final AggregatorFactory[] metricsSpec;
+ @Nullable
+ private final Granularity segmentGranularity;
+ @Nullable
+ private final Long targetCompactionSizeBytes;
+ @Nullable
+ private final TuningConfig tuningConfig;
+
+ @JsonCreator
+ public OldCompactionTaskWithAnyTuningConfigType(
+ @JsonProperty("id") final String id,
+ @JsonProperty("resource") final TaskResource taskResource,
+ @JsonProperty("dataSource") final String dataSource,
+ @JsonProperty("interval") @Nullable final Interval interval,
+ @JsonProperty("segments") @Nullable final List<DataSegment> segments,
+ @JsonProperty("dimensions") @Nullable final DimensionsSpec dimensions,
+ @JsonProperty("dimensionsSpec") @Nullable final DimensionsSpec
dimensionsSpec,
+ @JsonProperty("metricsSpec") @Nullable final AggregatorFactory[]
metricsSpec,
+ @JsonProperty("segmentGranularity") @Nullable final Granularity
segmentGranularity,
+ @JsonProperty("targetCompactionSizeBytes") @Nullable final Long
targetCompactionSizeBytes,
+ @JsonProperty("tuningConfig") @Nullable final TuningConfig
tuningConfig,
+ @JsonProperty("context") @Nullable final Map<String, Object> context,
+ @JacksonInject ObjectMapper jsonMapper,
+ @JacksonInject AuthorizerMapper authorizerMapper,
+ @JacksonInject ChatHandlerProvider chatHandlerProvider,
+ @JacksonInject RowIngestionMetersFactory rowIngestionMetersFactory,
+ @JacksonInject CoordinatorClient coordinatorClient,
+ @JacksonInject SegmentLoaderFactory segmentLoaderFactory,
+ @JacksonInject RetryPolicyFactory retryPolicyFactory,
+ @JacksonInject AppenderatorsManager appenderatorsManager
+ )
+ {
+ super(getOrMakeId(id, "compact", dataSource), null, taskResource,
dataSource, context);
+ this.interval = interval;
+ this.segments = segments;
+ this.dimensionsSpec = dimensionsSpec;
+ this.metricsSpec = metricsSpec;
+ this.segmentGranularity = segmentGranularity;
+ this.targetCompactionSizeBytes = targetCompactionSizeBytes;
+ this.tuningConfig = tuningConfig;
+ }
+
+ @Override
+ public String getType()
+ {
+ return "compact";
+ }
+
+ @JsonProperty
+ public Interval getInterval()
+ {
+ return interval;
+ }
+
+ @JsonProperty
+ public List<DataSegment> getSegments()
+ {
+ return segments;
+ }
+
+ @JsonProperty
+ @Nullable
+ public DimensionsSpec getDimensionsSpec()
+ {
+ return dimensionsSpec;
+ }
+
+ @JsonProperty
+ @Nullable
+ public AggregatorFactory[] getMetricsSpec()
+ {
+ return metricsSpec;
+ }
+
+ @JsonProperty
+ @Nullable
+ public Granularity getSegmentGranularity()
+ {
+ return segmentGranularity;
+ }
+
+ @Nullable
+ @JsonProperty
+ public Long getTargetCompactionSizeBytes()
+ {
+ return targetCompactionSizeBytes;
+ }
+
+ @Nullable
+ @JsonProperty
+ public TuningConfig getTuningConfig()
+ {
+ return tuningConfig;
+ }
+
+ @Override
+ public boolean isReady(TaskActionClient taskActionClient)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void stopGracefully(TaskConfig taskConfig)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public TaskStatus run(TaskToolbox toolbox)
+ {
+ throw new UnsupportedOperationException();
+ }
+ }
}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java
index c5841ea..b8ae69e 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java
@@ -21,6 +21,7 @@ package org.apache.druid.indexing.common.task;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.client.indexing.ClientKillQuery;
@@ -38,6 +39,7 @@ import
org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
import org.apache.druid.indexing.common.task.IndexTask.IndexIOConfig;
import org.apache.druid.indexing.common.task.IndexTask.IndexIngestionSpec;
import org.apache.druid.indexing.common.task.IndexTask.IndexTuningConfig;
+import
org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.query.aggregation.AggregatorFactory;
@@ -78,6 +80,10 @@ public class TaskSerdeTest
for (final Module jacksonModule : new
FirehoseModule().getJacksonModules()) {
jsonMapper.registerModule(jacksonModule);
}
+ jsonMapper.registerSubtypes(
+ new NamedType(ParallelIndexTuningConfig.class, "index_parallel"),
+ new NamedType(IndexTuningConfig.class, "index")
+ );
}
@Test
diff --git a/services/src/main/java/org/apache/druid/cli/CliIndexer.java
b/services/src/main/java/org/apache/druid/cli/CliIndexer.java
index 5ffd2a8..bf955a9 100644
--- a/services/src/main/java/org/apache/druid/cli/CliIndexer.java
+++ b/services/src/main/java/org/apache/druid/cli/CliIndexer.java
@@ -37,6 +37,7 @@ import org.apache.druid.guice.IndexingServiceFirehoseModule;
import org.apache.druid.guice.IndexingServiceInputSourceModule;
import org.apache.druid.guice.IndexingServiceModuleHelper;
import org.apache.druid.guice.IndexingServiceTaskLogsModule;
+import org.apache.druid.guice.IndexingServiceTuningConfigModule;
import org.apache.druid.guice.Jerseys;
import org.apache.druid.guice.JsonConfigProvider;
import org.apache.druid.guice.LazySingleton;
@@ -201,6 +202,7 @@ public class CliIndexer extends ServerRunnable
new IndexingServiceFirehoseModule(),
new IndexingServiceInputSourceModule(),
new IndexingServiceTaskLogsModule(),
+ new IndexingServiceTuningConfigModule(),
new QueryablePeonModule(),
new CliIndexerServerModule(properties),
new LookupModule()
diff --git a/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java
b/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java
index 0eeb548..0d350eb 100644
--- a/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java
+++ b/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java
@@ -37,6 +37,7 @@ import org.apache.druid.guice.IndexingServiceFirehoseModule;
import org.apache.druid.guice.IndexingServiceInputSourceModule;
import org.apache.druid.guice.IndexingServiceModuleHelper;
import org.apache.druid.guice.IndexingServiceTaskLogsModule;
+import org.apache.druid.guice.IndexingServiceTuningConfigModule;
import org.apache.druid.guice.Jerseys;
import org.apache.druid.guice.JsonConfigProvider;
import org.apache.druid.guice.LazySingleton;
@@ -182,6 +183,7 @@ public class CliMiddleManager extends ServerRunnable
new IndexingServiceFirehoseModule(),
new IndexingServiceInputSourceModule(),
new IndexingServiceTaskLogsModule(),
+ new IndexingServiceTuningConfigModule(),
new LookupSerdeModule()
);
}
diff --git a/services/src/main/java/org/apache/druid/cli/CliOverlord.java
b/services/src/main/java/org/apache/druid/cli/CliOverlord.java
index f84a5e1..f63c68e 100644
--- a/services/src/main/java/org/apache/druid/cli/CliOverlord.java
+++ b/services/src/main/java/org/apache/druid/cli/CliOverlord.java
@@ -43,6 +43,7 @@ import org.apache.druid.guice.IndexingServiceFirehoseModule;
import org.apache.druid.guice.IndexingServiceInputSourceModule;
import org.apache.druid.guice.IndexingServiceModuleHelper;
import org.apache.druid.guice.IndexingServiceTaskLogsModule;
+import org.apache.druid.guice.IndexingServiceTuningConfigModule;
import org.apache.druid.guice.JacksonConfigProvider;
import org.apache.druid.guice.Jerseys;
import org.apache.druid.guice.JsonConfigProvider;
@@ -344,6 +345,7 @@ public class CliOverlord extends ServerRunnable
new IndexingServiceFirehoseModule(),
new IndexingServiceInputSourceModule(),
new IndexingServiceTaskLogsModule(),
+ new IndexingServiceTuningConfigModule(),
new SupervisorModule(),
new LookupSerdeModule(),
new SamplerModule()
diff --git a/services/src/main/java/org/apache/druid/cli/CliPeon.java
b/services/src/main/java/org/apache/druid/cli/CliPeon.java
index 1e82c6d..3707b82 100644
--- a/services/src/main/java/org/apache/druid/cli/CliPeon.java
+++ b/services/src/main/java/org/apache/druid/cli/CliPeon.java
@@ -46,6 +46,7 @@ import org.apache.druid.guice.CacheModule;
import org.apache.druid.guice.DruidProcessingModule;
import org.apache.druid.guice.IndexingServiceFirehoseModule;
import org.apache.druid.guice.IndexingServiceInputSourceModule;
+import org.apache.druid.guice.IndexingServiceTuningConfigModule;
import org.apache.druid.guice.Jerseys;
import org.apache.druid.guice.JsonConfigProvider;
import org.apache.druid.guice.LazySingleton;
@@ -263,6 +264,7 @@ public class CliPeon extends GuiceRunnable
new QueryablePeonModule(),
new IndexingServiceFirehoseModule(),
new IndexingServiceInputSourceModule(),
+ new IndexingServiceTuningConfigModule(),
new ChatHandlerServerModule(properties),
new LookupModule()
);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]