This is an automated email from the ASF dual-hosted git repository.
jihoonson pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/master by this push:
new 122caec Add support targetCompactionSizeBytes for compactionTask
(#6203)
122caec is described below
commit 122caec7b187820a0a7fc89e85eb8216dd57df21
Author: Jihoon Son <[email protected]>
AuthorDate: Fri Sep 28 11:16:35 2018 -0700
Add support targetCompactionSizeBytes for compactionTask (#6203)
* Add support targetCompactionSizeBytes for compactionTask
* fix test
* fix a bug in keepSegmentGranularity
* fix wrong noinspection comment
* address comments
---
.../druid/timeline/partition/PartitionHolder.java | 7 +
.../druid/indexing/common/task/CompactionTask.java | 205 +++++++++--
.../druid/indexing/common/task/IndexTask.java | 182 +++++++---
.../task/batch/parallel/ParallelIndexSubTask.java | 20 +-
.../indexing/common/task/CompactionTaskTest.java | 387 +++++++++++++++++++--
.../druid/indexing/common/task/TaskSerdeTest.java | 23 +-
.../coordinator/DataSourceCompactionConfig.java | 3 +-
7 files changed, 693 insertions(+), 134 deletions(-)
diff --git
a/common/src/main/java/org/apache/druid/timeline/partition/PartitionHolder.java
b/common/src/main/java/org/apache/druid/timeline/partition/PartitionHolder.java
index 1ed0ae6..5e5f676 100644
---
a/common/src/main/java/org/apache/druid/timeline/partition/PartitionHolder.java
+++
b/common/src/main/java/org/apache/druid/timeline/partition/PartitionHolder.java
@@ -26,6 +26,7 @@ import com.google.common.collect.Sets;
import java.util.Iterator;
import java.util.List;
import java.util.SortedSet;
+import java.util.Spliterator;
import java.util.TreeSet;
/**
@@ -130,6 +131,12 @@ public class PartitionHolder<T> implements
Iterable<PartitionChunk<T>>
return holderSet.iterator();
}
+ @Override
+ public Spliterator<PartitionChunk<T>> spliterator()
+ {
+ return holderSet.spliterator();
+ }
+
public Iterable<T> payloads()
{
return Iterables.transform(this, PartitionChunk::getObject);
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
index ea1186a..cce1a74 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
@@ -68,6 +68,7 @@ import
org.apache.druid.segment.indexing.granularity.ArbitraryGranularitySpec;
import org.apache.druid.segment.indexing.granularity.GranularitySpec;
import org.apache.druid.segment.loading.SegmentLoadingException;
import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
+import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.TimelineObjectHolder;
@@ -86,6 +87,7 @@ import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.stream.Collectors;
@@ -101,13 +103,15 @@ public class CompactionTask extends AbstractTask
private final List<DataSegment> segments;
private final DimensionsSpec dimensionsSpec;
private final boolean keepSegmentGranularity;
+ @Nullable
+ private final Long targetCompactionSizeBytes;
+ @Nullable
private final IndexTuningConfig tuningConfig;
private final ObjectMapper jsonMapper;
@JsonIgnore
private final SegmentProvider segmentProvider;
-
@JsonIgnore
- private List<IndexTask> indexTaskSpecs;
+ private final PartitionConfigurationManager partitionConfigurationManager;
@JsonIgnore
private final AuthorizerMapper authorizerMapper;
@@ -118,6 +122,9 @@ public class CompactionTask extends AbstractTask
@JsonIgnore
private final RowIngestionMetersFactory rowIngestionMetersFactory;
+ @JsonIgnore
+ private List<IndexTask> indexTaskSpecs;
+
@JsonCreator
public CompactionTask(
@JsonProperty("id") final String id,
@@ -127,6 +134,7 @@ public class CompactionTask extends AbstractTask
@Nullable @JsonProperty("segments") final List<DataSegment> segments,
@Nullable @JsonProperty("dimensions") final DimensionsSpec
dimensionsSpec,
@Nullable @JsonProperty("keepSegmentGranularity") final Boolean
keepSegmentGranularity,
+ @Nullable @JsonProperty("targetCompactionSizeBytes") final Long
targetCompactionSizeBytes,
@Nullable @JsonProperty("tuningConfig") final IndexTuningConfig
tuningConfig,
@Nullable @JsonProperty("context") final Map<String, Object> context,
@JacksonInject ObjectMapper jsonMapper,
@@ -149,9 +157,11 @@ public class CompactionTask extends AbstractTask
this.keepSegmentGranularity = keepSegmentGranularity == null
? DEFAULT_KEEP_SEGMENT_GRANULARITY
: keepSegmentGranularity;
+ this.targetCompactionSizeBytes = targetCompactionSizeBytes;
this.tuningConfig = tuningConfig;
this.jsonMapper = jsonMapper;
this.segmentProvider = segments == null ? new SegmentProvider(dataSource,
interval) : new SegmentProvider(segments);
+ this.partitionConfigurationManager = new
PartitionConfigurationManager(targetCompactionSizeBytes, tuningConfig);
this.authorizerMapper = authorizerMapper;
this.chatHandlerProvider = chatHandlerProvider;
this.rowIngestionMetersFactory = rowIngestionMetersFactory;
@@ -181,6 +191,14 @@ public class CompactionTask extends AbstractTask
return keepSegmentGranularity;
}
+ @Nullable
+ @JsonProperty
+ public Long getTargetCompactionSizeBytes()
+ {
+ return targetCompactionSizeBytes;
+ }
+
+ @Nullable
@JsonProperty
public IndexTuningConfig getTuningConfig()
{
@@ -220,9 +238,9 @@ public class CompactionTask extends AbstractTask
indexTaskSpecs = createIngestionSchema(
toolbox,
segmentProvider,
+ partitionConfigurationManager,
dimensionsSpec,
keepSegmentGranularity,
- tuningConfig,
jsonMapper
).stream()
.map(spec -> new IndexTask(
@@ -271,12 +289,12 @@ public class CompactionTask extends AbstractTask
*/
@VisibleForTesting
static List<IndexIngestionSpec> createIngestionSchema(
- TaskToolbox toolbox,
- SegmentProvider segmentProvider,
- DimensionsSpec dimensionsSpec,
- boolean keepSegmentGranularity,
- IndexTuningConfig tuningConfig,
- ObjectMapper jsonMapper
+ final TaskToolbox toolbox,
+ final SegmentProvider segmentProvider,
+ final PartitionConfigurationManager partitionConfigurationManager,
+ final DimensionsSpec dimensionsSpec,
+ final boolean keepSegmentGranularity,
+ final ObjectMapper jsonMapper
) throws IOException, SegmentLoadingException
{
Pair<Map<DataSegment, File>, List<TimelineObjectHolder<String,
DataSegment>>> pair = prepareSegments(
@@ -290,26 +308,52 @@ public class CompactionTask extends AbstractTask
return Collections.emptyList();
}
+ // find metadata for interval
+ final List<Pair<QueryableIndex, DataSegment>> queryableIndexAndSegments =
loadSegments(
+ timelineSegments,
+ segmentFileMap,
+ toolbox.getIndexIO()
+ );
+
+ final IndexTuningConfig compactionTuningConfig =
partitionConfigurationManager.computeTuningConfig(
+ queryableIndexAndSegments
+ );
+
if (keepSegmentGranularity) {
- // if keepSegmentGranularity = true, create indexIngestionSpec per
segment interval, so that we can run an index
+ // If keepSegmentGranularity = true, create indexIngestionSpec per
segment interval, so that we can run an index
// task per segment interval.
- final List<IndexIngestionSpec> specs = new
ArrayList<>(timelineSegments.size());
- for (TimelineObjectHolder<String, DataSegment> holder :
timelineSegments) {
+
+ //noinspection unchecked,ConstantConditions
+ final Map<Interval, List<Pair<QueryableIndex, DataSegment>>>
intervalToSegments = queryableIndexAndSegments
+ .stream()
+ .collect(
+ Collectors.toMap(
+ // rhs can't be null here so we skip null checking and
suppress the warning with the above comment
+ p -> p.rhs.getInterval(),
+ Lists::newArrayList,
+ (l1, l2) -> {
+ l1.addAll(l2);
+ return l1;
+ }
+ )
+ );
+ final List<IndexIngestionSpec> specs = new
ArrayList<>(intervalToSegments.size());
+ for (Entry<Interval, List<Pair<QueryableIndex, DataSegment>>> entry :
intervalToSegments.entrySet()) {
+ final Interval interval = entry.getKey();
+ final List<Pair<QueryableIndex, DataSegment>> segmentsToCompact =
entry.getValue();
final DataSchema dataSchema = createDataSchema(
segmentProvider.dataSource,
- holder.getInterval(),
- Collections.singletonList(holder),
+ interval,
+ segmentsToCompact,
dimensionsSpec,
- toolbox.getIndexIO(),
- jsonMapper,
- segmentFileMap
+ jsonMapper
);
specs.add(
new IndexIngestionSpec(
dataSchema,
- createIoConfig(toolbox, dataSchema, holder.getInterval()),
- tuningConfig
+ createIoConfig(toolbox, dataSchema, interval),
+ compactionTuningConfig
)
);
}
@@ -319,18 +363,16 @@ public class CompactionTask extends AbstractTask
final DataSchema dataSchema = createDataSchema(
segmentProvider.dataSource,
segmentProvider.interval,
- timelineSegments,
+ queryableIndexAndSegments,
dimensionsSpec,
- toolbox.getIndexIO(),
- jsonMapper,
- segmentFileMap
+ jsonMapper
);
return Collections.singletonList(
new IndexIngestionSpec(
dataSchema,
createIoConfig(toolbox, dataSchema, segmentProvider.interval),
- tuningConfig
+ compactionTuningConfig
)
);
}
@@ -368,21 +410,11 @@ public class CompactionTask extends AbstractTask
private static DataSchema createDataSchema(
String dataSource,
Interval totalInterval,
- List<TimelineObjectHolder<String, DataSegment>> timelineObjectHolder,
+ List<Pair<QueryableIndex, DataSegment>> queryableIndexAndSegments,
DimensionsSpec dimensionsSpec,
- IndexIO indexIO,
- ObjectMapper jsonMapper,
- Map<DataSegment, File> segmentFileMap
+ ObjectMapper jsonMapper
)
- throws IOException
{
- // find metadata for interval
- final List<Pair<QueryableIndex, DataSegment>> queryableIndexAndSegments =
loadSegments(
- timelineObjectHolder,
- segmentFileMap,
- indexIO
- );
-
// find merged aggregators
for (Pair<QueryableIndex, DataSegment> pair : queryableIndexAndSegments) {
final QueryableIndex index = pair.lhs;
@@ -621,4 +653,105 @@ public class CompactionTask extends AbstractTask
return usedSegments;
}
}
+
+ @VisibleForTesting
+ static class PartitionConfigurationManager
+ {
+ @Nullable
+ private final Long targetCompactionSizeBytes;
+ @Nullable
+ private final IndexTuningConfig tuningConfig;
+
+ PartitionConfigurationManager(@Nullable Long targetCompactionSizeBytes,
@Nullable IndexTuningConfig tuningConfig)
+ {
+ this.targetCompactionSizeBytes =
getValidTargetCompactionSizeBytes(targetCompactionSizeBytes, tuningConfig);
+ this.tuningConfig = tuningConfig;
+ }
+
+ @Nullable
+ IndexTuningConfig computeTuningConfig(List<Pair<QueryableIndex,
DataSegment>> queryableIndexAndSegments)
+ {
+ if (!hasPartitionConfig(tuningConfig)) {
+ final long nonNullTargetCompactionSizeBytes =
Preconditions.checkNotNull(
+ targetCompactionSizeBytes,
+ "targetCompactionSizeBytes"
+ );
+ // Find IndexTuningConfig.targetPartitionSize which is the number of
rows per segment.
+ // Assume that the segment size is proportional to the number of rows.
We can improve this later.
+ final long totalNumRows = queryableIndexAndSegments
+ .stream()
+ .mapToLong(queryableIndexAndDataSegment ->
queryableIndexAndDataSegment.lhs.getNumRows())
+ .sum();
+ final long totalSizeBytes = queryableIndexAndSegments
+ .stream()
+ .mapToLong(queryableIndexAndDataSegment ->
queryableIndexAndDataSegment.rhs.getSize())
+ .sum();
+
+ if (totalSizeBytes == 0L) {
+ throw new ISE("Total input segment size is 0 byte");
+ }
+
+ final double avgRowsPerByte = totalNumRows / (double) totalSizeBytes;
+ final int targetPartitionSize =
Math.toIntExact(Math.round(avgRowsPerByte * nonNullTargetCompactionSizeBytes));
+ Preconditions.checkState(targetPartitionSize > 0, "Negative
targetPartitionSize[%s]", targetPartitionSize);
+
+ log.info(
+ "Estimated targetPartitionSize[%d] = avgRowsPerByte[%f] *
targetCompactionSizeBytes[%d]",
+ targetPartitionSize,
+ avgRowsPerByte,
+ nonNullTargetCompactionSizeBytes
+ );
+ return (tuningConfig == null ? IndexTuningConfig.createDefault() :
tuningConfig)
+ .withTargetPartitionSize(targetPartitionSize);
+ } else {
+ return tuningConfig;
+ }
+ }
+
+ /**
+ * Check the validity of {@link #targetCompactionSizeBytes} and return a
valid value. Note that
+ * targetCompactionSizeBytes cannot be used with {@link
IndexTuningConfig#targetPartitionSize},
+ * {@link IndexTuningConfig#maxTotalRows}, or {@link
IndexTuningConfig#numShards} together.
+ * {@link #hasPartitionConfig} checks whether one of those configs is set.
+ *
+ * This throws an {@link IllegalArgumentException} if
targetCompactionSizeBytes is set and hasPartitionConfig
+ * returns true. If targetCompactionSizeBytes is not set, this returns
null or
+ * {@link DataSourceCompactionConfig#DEFAULT_TARGET_COMPACTION_SIZE_BYTES}
according to the result of
+ * hasPartitionConfig.
+ */
+ @Nullable
+ private static Long getValidTargetCompactionSizeBytes(
+ @Nullable Long targetCompactionSizeBytes,
+ @Nullable IndexTuningConfig tuningConfig
+ )
+ {
+ if (targetCompactionSizeBytes != null) {
+ Preconditions.checkArgument(
+ !hasPartitionConfig(tuningConfig),
+ "targetCompactionSizeBytes[%s] cannot be used with
targetPartitionSize[%s], maxTotalRows[%s],"
+ + " or numShards[%s] of tuningConfig",
+ targetCompactionSizeBytes,
+ tuningConfig == null ? null :
tuningConfig.getTargetPartitionSize(),
+ tuningConfig == null ? null : tuningConfig.getMaxTotalRows(),
+ tuningConfig == null ? null : tuningConfig.getNumShards()
+ );
+ return targetCompactionSizeBytes;
+ } else {
+ return hasPartitionConfig(tuningConfig)
+ ? null
+ :
DataSourceCompactionConfig.DEFAULT_TARGET_COMPACTION_SIZE_BYTES;
+ }
+ }
+
+ private static boolean hasPartitionConfig(@Nullable IndexTuningConfig
tuningConfig)
+ {
+ if (tuningConfig != null) {
+ return tuningConfig.getTargetPartitionSize() != null
+ || tuningConfig.getMaxTotalRows() != null
+ || tuningConfig.getNumShards() != null;
+ } else {
+ return false;
+ }
+ }
+ }
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
index 5d6bef1..f00cd0a 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
@@ -427,7 +427,12 @@ public class IndexTask extends AbstractTask implements
ChatHandler
FileUtils.forceMkdir(firehoseTempDir);
ingestionState = IngestionState.DETERMINE_PARTITIONS;
- final ShardSpecs shardSpecs = determineShardSpecs(toolbox,
firehoseFactory, firehoseTempDir);
+
+ // Initialize maxRowsPerSegment and maxTotalRows lazily
+ final IndexTuningConfig tuningConfig = ingestionSchema.tuningConfig;
+ @Nullable final Integer targetPartitionSize =
getValidTargetPartitionSize(tuningConfig);
+ @Nullable final Long maxTotalRows = getValidMaxTotalRows(tuningConfig);
+ final ShardSpecs shardSpecs = determineShardSpecs(toolbox,
firehoseFactory, firehoseTempDir, targetPartitionSize);
final DataSchema dataSchema;
final Map<Interval, String> versions;
if (determineIntervals) {
@@ -457,7 +462,16 @@ public class IndexTask extends AbstractTask implements
ChatHandler
}
ingestionState = IngestionState.BUILD_SEGMENTS;
- return generateAndPublishSegments(toolbox, dataSchema, shardSpecs,
versions, firehoseFactory, firehoseTempDir);
+ return generateAndPublishSegments(
+ toolbox,
+ dataSchema,
+ shardSpecs,
+ versions,
+ firehoseFactory,
+ firehoseTempDir,
+ targetPartitionSize,
+ maxTotalRows
+ );
}
catch (Exception e) {
log.error(e, "Encountered exception in %s.", ingestionState);
@@ -583,7 +597,8 @@ public class IndexTask extends AbstractTask implements
ChatHandler
private ShardSpecs determineShardSpecs(
final TaskToolbox toolbox,
final FirehoseFactory firehoseFactory,
- final File firehoseTempDir
+ final File firehoseTempDir,
+ @Nullable final Integer targetPartitionSize
) throws IOException
{
final ObjectMapper jsonMapper = toolbox.getObjectMapper();
@@ -618,7 +633,8 @@ public class IndexTask extends AbstractTask implements
ChatHandler
granularitySpec,
tuningConfig,
determineIntervals,
- determineNumPartitions
+ determineNumPartitions,
+ targetPartitionSize
);
}
}
@@ -666,7 +682,8 @@ public class IndexTask extends AbstractTask implements
ChatHandler
GranularitySpec granularitySpec,
IndexTuningConfig tuningConfig,
boolean determineIntervals,
- boolean determineNumPartitions
+ boolean determineNumPartitions,
+ @Nullable Integer targetPartitionSize
) throws IOException
{
log.info("Determining intervals and shardSpecs");
@@ -690,8 +707,10 @@ public class IndexTask extends AbstractTask implements
ChatHandler
final int numShards;
if (determineNumPartitions) {
- final long numRows = collector.estimateCardinalityRound();
- numShards = (int) Math.ceil((double) numRows /
tuningConfig.getTargetPartitionSize());
+ final long numRows = Preconditions.checkNotNull(collector, "HLL
collector").estimateCardinalityRound();
+ numShards = (int) Math.ceil(
+ (double) numRows / Preconditions.checkNotNull(targetPartitionSize,
"targetPartitionSize")
+ );
log.info("Estimated [%,d] rows of data for interval [%s], creating
[%,d] shards", numRows, interval, numShards);
} else {
numShards = defaultNumShards;
@@ -856,7 +875,9 @@ public class IndexTask extends AbstractTask implements
ChatHandler
final ShardSpecs shardSpecs,
final Map<Interval, String> versions,
final FirehoseFactory firehoseFactory,
- final File firehoseTempDir
+ final File firehoseTempDir,
+ @Nullable final Integer targetPartitionSize,
+ @Nullable final Long maxTotalRows
) throws IOException, InterruptedException
{
final GranularitySpec granularitySpec = dataSchema.getGranularitySpec();
@@ -1004,8 +1025,8 @@ public class IndexTask extends AbstractTask implements
ChatHandler
if (addResult.isOk()) {
// incremental segment publishment is allowed only when rollup
don't have to be perfect.
if (!isGuaranteedRollup &&
- (exceedMaxRowsInSegment(addResult.getNumRowsInSegment(),
tuningConfig) ||
-
exceedMaxRowsInAppenderator(addResult.getTotalNumRowsInAppenderator(),
tuningConfig))) {
+ (exceedMaxRowsInSegment(targetPartitionSize,
addResult.getNumRowsInSegment()) ||
+ exceedMaxRowsInAppenderator(maxTotalRows,
addResult.getTotalNumRowsInAppenderator()))) {
// There can be some segments waiting for being published even
though any rows won't be added to them.
// If those segments are not published here, the available space
in appenderator will be kept to be small
// which makes the size of segments smaller.
@@ -1069,6 +1090,40 @@ public class IndexTask extends AbstractTask implements
ChatHandler
}
}
+ /**
+ * Return the valid target partition size. If {@link
IndexTuningConfig#numShards} is valid, this returns null.
+ * Otherwise, this returns {@link
IndexTuningConfig#DEFAULT_TARGET_PARTITION_SIZE} or the given
+ * {@link IndexTuningConfig#targetPartitionSize}.
+ */
+ public static Integer getValidTargetPartitionSize(IndexTuningConfig
tuningConfig)
+ {
+ @Nullable final Integer numShards = tuningConfig.numShards;
+ @Nullable final Integer targetPartitionSize =
tuningConfig.targetPartitionSize;
+ if (numShards == null || numShards == -1) {
+ return targetPartitionSize == null || targetPartitionSize.equals(-1)
+ ? IndexTuningConfig.DEFAULT_TARGET_PARTITION_SIZE
+ : targetPartitionSize;
+ } else {
+ return null;
+ }
+ }
+
+ /**
+ * Return the valid max total rows. If {@link
IndexTuningConfig#numShards} is valid, this returns null.
+ * Otherwise, this returns {@link IndexTuningConfig#DEFAULT_MAX_TOTAL_ROWS}
or the given
+ * {@link IndexTuningConfig#maxTotalRows}.
+ */
+ public static Long getValidMaxTotalRows(IndexTuningConfig tuningConfig)
+ {
+ @Nullable final Integer numShards = tuningConfig.numShards;
+ @Nullable final Long maxTotalRows = tuningConfig.maxTotalRows;
+ if (numShards == null || numShards == -1) {
+ return maxTotalRows == null ? IndexTuningConfig.DEFAULT_MAX_TOTAL_ROWS :
maxTotalRows;
+ } else {
+ return null;
+ }
+ }
+
private void handleParseException(ParseException e)
{
if (e.isFromPartiallyValidRow()) {
@@ -1092,17 +1147,20 @@ public class IndexTask extends AbstractTask implements
ChatHandler
}
}
- private static boolean exceedMaxRowsInSegment(int numRowsInSegment,
IndexTuningConfig indexTuningConfig)
+ private static boolean exceedMaxRowsInSegment(
+ @Nullable Integer maxRowsInSegment, // maxRowsInSegment can be null if
numShards is set in indexTuningConfig
+ int numRowsInSegment
+ )
{
- // maxRowsInSegment should be null if numShards is set in indexTuningConfig
- final Integer maxRowsInSegment =
indexTuningConfig.getTargetPartitionSize();
return maxRowsInSegment != null && maxRowsInSegment <= numRowsInSegment;
}
- private static boolean exceedMaxRowsInAppenderator(long
numRowsInAppenderator, IndexTuningConfig indexTuningConfig)
+ private static boolean exceedMaxRowsInAppenderator(
+ // maxRowsInAppenderator can be null if numShards is set in
indexTuningConfig
+ @Nullable final Long maxRowsInAppenderator,
+ long numRowsInAppenderator
+ )
{
- // maxRowsInAppenderator should be null if numShards is set in
indexTuningConfig
- final Long maxRowsInAppenderator = indexTuningConfig.getMaxTotalRows();
return maxRowsInAppenderator != null && maxRowsInAppenderator <=
numRowsInAppenderator;
}
@@ -1271,7 +1329,9 @@ public class IndexTask extends AbstractTask implements
ChatHandler
@JsonTypeName("index")
public static class IndexTuningConfig implements TuningConfig,
AppenderatorConfig
{
- private static final int DEFAULT_MAX_TOTAL_ROWS = 20_000_000;
+ static final int DEFAULT_TARGET_PARTITION_SIZE = 5_000_000;
+ static final int DEFAULT_MAX_TOTAL_ROWS = 20_000_000;
+
private static final IndexSpec DEFAULT_INDEX_SPEC = new IndexSpec();
private static final int DEFAULT_MAX_PENDING_PERSISTS = 0;
private static final boolean DEFAULT_FORCE_EXTENDABLE_SHARD_SPECS = false;
@@ -1279,12 +1339,13 @@ public class IndexTask extends AbstractTask implements
ChatHandler
private static final boolean DEFAULT_REPORT_PARSE_EXCEPTIONS = false;
private static final long DEFAULT_PUSH_TIMEOUT = 0;
- static final int DEFAULT_TARGET_PARTITION_SIZE = 5000000;
-
+ @Nullable
private final Integer targetPartitionSize;
private final int maxRowsInMemory;
private final long maxBytesInMemory;
+ @Nullable
private final Long maxTotalRows;
+ @Nullable
private final Integer numShards;
private final IndexSpec indexSpec;
private final File basePersistDirectory;
@@ -1312,6 +1373,11 @@ public class IndexTask extends AbstractTask implements
ChatHandler
@Nullable
private final SegmentWriteOutMediumFactory segmentWriteOutMediumFactory;
+ public static IndexTuningConfig createDefault()
+ {
+ return new IndexTuningConfig();
+ }
+
@JsonCreator
public IndexTuningConfig(
@JsonProperty("targetPartitionSize") @Nullable Integer
targetPartitionSize,
@@ -1385,12 +1451,14 @@ public class IndexTask extends AbstractTask implements
ChatHandler
"targetPartitionSize and numShards cannot both be set"
);
- this.targetPartitionSize = initializeTargetPartitionSize(numShards,
targetPartitionSize);
+ this.targetPartitionSize = (targetPartitionSize != null &&
targetPartitionSize == -1)
+ ? null
+ : targetPartitionSize;
this.maxRowsInMemory = maxRowsInMemory == null ?
TuningConfig.DEFAULT_MAX_ROWS_IN_MEMORY : maxRowsInMemory;
// initializing this to 0, it will be lazily initialized to a value
// @see
server.src.main.java.org.apache.druid.segment.indexing.TuningConfigs#getMaxBytesInMemoryOrDefault(long)
this.maxBytesInMemory = maxBytesInMemory == null ? 0 : maxBytesInMemory;
- this.maxTotalRows = initializeMaxTotalRows(numShards, maxTotalRows);
+ this.maxTotalRows = maxTotalRows;
this.numShards = numShards == null || numShards.equals(-1) ? null :
numShards;
this.indexSpec = indexSpec == null ? DEFAULT_INDEX_SPEC : indexSpec;
this.maxPendingPersists = maxPendingPersists == null ?
DEFAULT_MAX_PENDING_PERSISTS : maxPendingPersists;
@@ -1422,27 +1490,29 @@ public class IndexTask extends AbstractTask implements
ChatHandler
: logParseExceptions;
}
- private static Integer initializeTargetPartitionSize(Integer numShards,
Integer targetPartitionSize)
- {
- if (numShards == null || numShards == -1) {
- return targetPartitionSize == null || targetPartitionSize.equals(-1)
- ? DEFAULT_TARGET_PARTITION_SIZE
- : targetPartitionSize;
- } else {
- return null;
- }
- }
-
- private static Long initializeMaxTotalRows(Integer numShards, Long
maxTotalRows)
+ public IndexTuningConfig withBasePersistDirectory(File dir)
{
- if (numShards == null || numShards == -1) {
- return maxTotalRows == null ? DEFAULT_MAX_TOTAL_ROWS : maxTotalRows;
- } else {
- return null;
- }
+ return new IndexTuningConfig(
+ targetPartitionSize,
+ maxRowsInMemory,
+ maxBytesInMemory,
+ maxTotalRows,
+ numShards,
+ indexSpec,
+ maxPendingPersists,
+ forceExtendableShardSpecs,
+ forceGuaranteedRollup,
+ reportParseExceptions,
+ pushTimeout,
+ dir,
+ segmentWriteOutMediumFactory,
+ logParseExceptions,
+ maxParseExceptions,
+ maxSavedParseExceptions
+ );
}
- public IndexTuningConfig withBasePersistDirectory(File dir)
+ public IndexTuningConfig withTargetPartitionSize(int targetPartitionSize)
{
return new IndexTuningConfig(
targetPartitionSize,
@@ -1456,7 +1526,7 @@ public class IndexTask extends AbstractTask implements
ChatHandler
forceGuaranteedRollup,
reportParseExceptions,
pushTimeout,
- dir,
+ basePersistDirectory,
segmentWriteOutMediumFactory,
logParseExceptions,
maxParseExceptions,
@@ -1464,6 +1534,11 @@ public class IndexTask extends AbstractTask implements
ChatHandler
);
}
+ /**
+ * Return the target number of rows per segment. This returns null if it's
not specified in tuningConfig.
+ * Please use {@link IndexTask#getValidTargetPartitionSize} instead to get
the valid value.
+ */
+ @Nullable
@JsonProperty
public Integer getTargetPartitionSize()
{
@@ -1484,6 +1559,10 @@ public class IndexTask extends AbstractTask implements
ChatHandler
return maxBytesInMemory;
}
+ /**
+ * Return the max number of total rows in appenderator. This returns null
if it's not specified in tuningConfig.
+ * Please use {@link IndexTask#getValidMaxTotalRows} instead to get the
valid value.
+ */
@JsonProperty
@Override
@Nullable
@@ -1633,5 +1712,28 @@ public class IndexTask extends AbstractTask implements
ChatHandler
maxSavedParseExceptions
);
}
+
+ @Override
+ public String toString()
+ {
+ return "IndexTuningConfig{" +
+ "targetPartitionSize=" + targetPartitionSize +
+ ", maxRowsInMemory=" + maxRowsInMemory +
+ ", maxBytesInMemory=" + maxBytesInMemory +
+ ", maxTotalRows=" + maxTotalRows +
+ ", numShards=" + numShards +
+ ", indexSpec=" + indexSpec +
+ ", basePersistDirectory=" + basePersistDirectory +
+ ", maxPendingPersists=" + maxPendingPersists +
+ ", forceExtendableShardSpecs=" + forceExtendableShardSpecs +
+ ", forceGuaranteedRollup=" + forceGuaranteedRollup +
+ ", reportParseExceptions=" + reportParseExceptions +
+ ", pushTimeout=" + pushTimeout +
+ ", logParseExceptions=" + logParseExceptions +
+ ", maxParseExceptions=" + maxParseExceptions +
+ ", maxSavedParseExceptions=" + maxSavedParseExceptions +
+ ", segmentWriteOutMediumFactory=" + segmentWriteOutMediumFactory +
+ '}';
+ }
}
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSubTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSubTask.java
index 46cf03e..20871af 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSubTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSubTask.java
@@ -303,7 +303,10 @@ public class ParallelIndexSubTask extends AbstractTask
);
}
+ // Initialize maxRowsPerSegment and maxTotalRows lazily
final ParallelIndexTuningConfig tuningConfig =
ingestionSchema.getTuningConfig();
+ @Nullable final Integer targetPartitionSize =
IndexTask.getValidTargetPartitionSize(tuningConfig);
+ @Nullable final Long maxTotalRows =
IndexTask.getValidMaxTotalRows(tuningConfig);
final long pushTimeout = tuningConfig.getPushTimeout();
final boolean explicitIntervals =
granularitySpec.bucketIntervals().isPresent();
final SegmentAllocator segmentAllocator = createSegmentAllocator(toolbox,
taskClient, ingestionSchema);
@@ -348,8 +351,8 @@ public class ParallelIndexSubTask extends AbstractTask
final AppenderatorDriverAddResult addResult = driver.add(inputRow,
sequenceName);
if (addResult.isOk()) {
- if (exceedMaxRowsInSegment(addResult.getNumRowsInSegment(),
tuningConfig) ||
-
exceedMaxRowsInAppenderator(addResult.getTotalNumRowsInAppenderator(),
tuningConfig)) {
+ if (exceedMaxRowsInSegment(targetPartitionSize,
addResult.getNumRowsInSegment()) ||
+ exceedMaxRowsInAppenderator(maxTotalRows,
addResult.getTotalNumRowsInAppenderator())) {
// There can be some segments waiting for being published even
though any rows won't be added to them.
// If those segments are not published here, the available space
in appenderator will be kept to be small
// which makes the size of segments smaller.
@@ -384,22 +387,19 @@ public class ParallelIndexSubTask extends AbstractTask
}
private static boolean exceedMaxRowsInSegment(
- int numRowsInSegment,
- ParallelIndexTuningConfig indexTuningConfig
+ @Nullable Integer maxRowsInSegment, // maxRowsInSegment can be null if
numShards is set in indexTuningConfig
+ int numRowsInSegment
)
{
- // maxRowsInSegment should be null if numShards is set in indexTuningConfig
- final Integer maxRowsInSegment =
indexTuningConfig.getTargetPartitionSize();
return maxRowsInSegment != null && maxRowsInSegment <= numRowsInSegment;
}
private static boolean exceedMaxRowsInAppenderator(
- long numRowsInAppenderator,
- ParallelIndexTuningConfig indexTuningConfig
+ // maxRowsInAppenderator can be null if numShards is set in
indexTuningConfig
+ @Nullable Long maxRowsInAppenderator,
+ long numRowsInAppenderator
)
{
- // maxRowsInAppenderator should be null if numShards is set in
indexTuningConfig
- final Long maxRowsInAppenderator = indexTuningConfig.getMaxTotalRows();
return maxRowsInAppenderator != null && maxRowsInAppenderator <=
numRowsInAppenderator;
}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
index 381b51c..5cd5f1f 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
@@ -28,8 +28,6 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
-import com.google.inject.Binder;
-import com.google.inject.Module;
import org.apache.druid.data.input.FirehoseFactory;
import org.apache.druid.data.input.impl.DimensionSchema;
import org.apache.druid.data.input.impl.DimensionsSpec;
@@ -49,6 +47,7 @@ import
org.apache.druid.indexing.common.actions.SegmentListUsedAction;
import org.apache.druid.indexing.common.actions.TaskAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
+import
org.apache.druid.indexing.common.task.CompactionTask.PartitionConfigurationManager;
import org.apache.druid.indexing.common.task.CompactionTask.SegmentProvider;
import org.apache.druid.indexing.common.task.IndexTask.IndexIOConfig;
import org.apache.druid.indexing.common.task.IndexTask.IndexIngestionSpec;
@@ -59,6 +58,7 @@ import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.LongMaxAggregatorFactory;
@@ -71,8 +71,14 @@ import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.Metadata;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.SimpleQueryableIndex;
+import org.apache.druid.segment.column.BitmapIndex;
import org.apache.druid.segment.column.Column;
-import org.apache.druid.segment.column.ColumnBuilder;
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.column.ColumnCapabilitiesImpl;
+import org.apache.druid.segment.column.ComplexColumn;
+import org.apache.druid.segment.column.DictionaryEncodedColumn;
+import org.apache.druid.segment.column.GenericColumn;
+import org.apache.druid.segment.column.SpatialIndex;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.data.CompressionFactory.LongEncodingStrategy;
import org.apache.druid.segment.data.CompressionStrategy;
@@ -84,6 +90,7 @@ import
org.apache.druid.segment.indexing.granularity.ArbitraryGranularitySpec;
import org.apache.druid.segment.loading.SegmentLoadingException;
import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider;
+import org.apache.druid.segment.selector.settable.SettableColumnValueSelector;
import org.apache.druid.segment.transform.TransformingInputRowParser;
import
org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.server.security.AuthTestUtils;
@@ -120,6 +127,8 @@ import java.util.stream.IntStream;
@RunWith(Parameterized.class)
public class CompactionTaskTest
{
+ private static final long SEGMENT_SIZE_BYTES = 100;
+ private static final int NUM_ROWS_PER_SEGMENT = 10;
private static final String DATA_SOURCE = "dataSource";
private static final String TIMESTAMP_COLUMN = "timestamp";
private static final String MIXED_TYPE_COLUMN = "string_to_double";
@@ -204,7 +213,7 @@ public class CompactionTaskTest
new ArrayList<>(AGGREGATORS.keySet()),
new NumberedShardSpec(0, 1),
0,
- 1
+ SEGMENT_SIZE_BYTES
),
new File("file_" + i)
);
@@ -225,16 +234,11 @@ public class CompactionTaskTest
);
GuiceInjectableValues injectableValues = new GuiceInjectableValues(
GuiceInjectors.makeStartupInjectorWithModules(
- ImmutableList.<Module>of(
- new Module()
- {
- @Override
- public void configure(Binder binder)
- {
-
binder.bind(AuthorizerMapper.class).toInstance(AuthTestUtils.TEST_AUTHORIZER_MAPPER);
- binder.bind(ChatHandlerProvider.class).toInstance(new
NoopChatHandlerProvider());
-
binder.bind(RowIngestionMetersFactory.class).toInstance(rowIngestionMetersFactory);
- }
+ ImmutableList.of(
+ binder -> {
+
binder.bind(AuthorizerMapper.class).toInstance(AuthTestUtils.TEST_AUTHORIZER_MAPPER);
+ binder.bind(ChatHandlerProvider.class).toInstance(new
NoopChatHandlerProvider());
+
binder.bind(RowIngestionMetersFactory.class).toInstance(rowIngestionMetersFactory);
}
)
)
@@ -265,7 +269,7 @@ public class CompactionTaskTest
private static IndexTuningConfig createTuningConfig()
{
return new IndexTuningConfig(
- 5000000,
+ null, // null to compute targetPartitionSize automatically
500000,
1000000L,
null,
@@ -329,6 +333,7 @@ public class CompactionTaskTest
null,
null,
null,
+ null,
createTuningConfig(),
ImmutableMap.of("testKey", "testContext"),
objectMapper,
@@ -359,6 +364,7 @@ public class CompactionTaskTest
SEGMENTS,
null,
null,
+ null,
createTuningConfig(),
ImmutableMap.of("testKey", "testContext"),
objectMapper,
@@ -373,19 +379,21 @@ public class CompactionTaskTest
Assert.assertEquals(task.getInterval(), fromJson.getInterval());
Assert.assertEquals(task.getSegments(), fromJson.getSegments());
Assert.assertEquals(task.getDimensionsSpec(),
fromJson.getDimensionsSpec());
+ Assert.assertEquals(task.isKeepSegmentGranularity(),
fromJson.isKeepSegmentGranularity());
+ Assert.assertEquals(task.getTargetCompactionSizeBytes(),
fromJson.getTargetCompactionSizeBytes());
Assert.assertEquals(task.getTuningConfig(), fromJson.getTuningConfig());
Assert.assertEquals(task.getContext(), fromJson.getContext());
}
@Test
- public void testCreateIngestionSchemaWithKeepSegmentGranularity() throws
IOException, SegmentLoadingException
+ public void testCreateIngestionSchema() throws IOException,
SegmentLoadingException
{
final List<IndexIngestionSpec> ingestionSpecs =
CompactionTask.createIngestionSchema(
toolbox,
new SegmentProvider(DATA_SOURCE, COMPACTION_INTERVAL),
+ new PartitionConfigurationManager(null, TUNING_CONFIG),
null,
keepSegmentGranularity,
- TUNING_CONFIG,
objectMapper
);
final List<DimensionsSpec> expectedDimensionsSpec =
getExpectedDimensionsSpecForAutoGeneration(
@@ -393,6 +401,12 @@ public class CompactionTaskTest
);
if (keepSegmentGranularity) {
+ ingestionSpecs.sort(
+ (s1, s2) -> Comparators.intervalsByStartThenEnd().compare(
+ s1.getDataSchema().getGranularitySpec().inputIntervals().get(0),
+ s2.getDataSchema().getGranularitySpec().inputIntervals().get(0)
+ )
+ );
Assert.assertEquals(5, ingestionSpecs.size());
assertIngestionSchema(ingestionSpecs, expectedDimensionsSpec,
SEGMENT_INTERVALS);
} else {
@@ -402,14 +416,39 @@ public class CompactionTaskTest
}
@Test
- public void testCreateIngestionSchemaWithIgnoreSegmentGranularity() throws
IOException, SegmentLoadingException
+ public void testCreateIngestionSchemaWithTargetPartitionSize() throws
IOException, SegmentLoadingException
{
+ final IndexTuningConfig tuningConfig = new IndexTuningConfig(
+ 5,
+ 500000,
+ 1000000L,
+ null,
+ null,
+ null,
+ new IndexSpec(
+ new RoaringBitmapSerdeFactory(true),
+ CompressionStrategy.LZ4,
+ CompressionStrategy.LZF,
+ LongEncodingStrategy.LONGS
+ ),
+ 5000,
+ true,
+ false,
+ true,
+ false,
+ null,
+ 100L,
+ null,
+ null,
+ null,
+ null
+ );
final List<IndexIngestionSpec> ingestionSpecs =
CompactionTask.createIngestionSchema(
toolbox,
new SegmentProvider(DATA_SOURCE, COMPACTION_INTERVAL),
+ new PartitionConfigurationManager(null, tuningConfig),
null,
keepSegmentGranularity,
- TUNING_CONFIG,
objectMapper
);
final List<DimensionsSpec> expectedDimensionsSpec =
getExpectedDimensionsSpecForAutoGeneration(
@@ -417,11 +456,142 @@ public class CompactionTaskTest
);
if (keepSegmentGranularity) {
+ ingestionSpecs.sort(
+ (s1, s2) -> Comparators.intervalsByStartThenEnd().compare(
+ s1.getDataSchema().getGranularitySpec().inputIntervals().get(0),
+ s2.getDataSchema().getGranularitySpec().inputIntervals().get(0)
+ )
+ );
Assert.assertEquals(5, ingestionSpecs.size());
- assertIngestionSchema(ingestionSpecs, expectedDimensionsSpec,
SEGMENT_INTERVALS);
+ assertIngestionSchema(ingestionSpecs, expectedDimensionsSpec,
SEGMENT_INTERVALS, tuningConfig);
} else {
Assert.assertEquals(1, ingestionSpecs.size());
- assertIngestionSchema(ingestionSpecs, expectedDimensionsSpec,
Collections.singletonList(COMPACTION_INTERVAL));
+ assertIngestionSchema(
+ ingestionSpecs,
+ expectedDimensionsSpec,
+ Collections.singletonList(COMPACTION_INTERVAL),
+ tuningConfig
+ );
+ }
+ }
+
+ @Test
+ public void testCreateIngestionSchemaWithMaxTotalRows() throws IOException,
SegmentLoadingException
+ {
+ final IndexTuningConfig tuningConfig = new IndexTuningConfig(
+ null,
+ 500000,
+ 1000000L,
+ 5L,
+ null,
+ null,
+ new IndexSpec(
+ new RoaringBitmapSerdeFactory(true),
+ CompressionStrategy.LZ4,
+ CompressionStrategy.LZF,
+ LongEncodingStrategy.LONGS
+ ),
+ 5000,
+ true,
+ false,
+ true,
+ false,
+ null,
+ 100L,
+ null,
+ null,
+ null,
+ null
+ );
+ final List<IndexIngestionSpec> ingestionSpecs =
CompactionTask.createIngestionSchema(
+ toolbox,
+ new SegmentProvider(DATA_SOURCE, COMPACTION_INTERVAL),
+ new PartitionConfigurationManager(null, tuningConfig),
+ null,
+ keepSegmentGranularity,
+ objectMapper
+ );
+ final List<DimensionsSpec> expectedDimensionsSpec =
getExpectedDimensionsSpecForAutoGeneration(
+ keepSegmentGranularity
+ );
+
+ if (keepSegmentGranularity) {
+ ingestionSpecs.sort(
+ (s1, s2) -> Comparators.intervalsByStartThenEnd().compare(
+ s1.getDataSchema().getGranularitySpec().inputIntervals().get(0),
+ s2.getDataSchema().getGranularitySpec().inputIntervals().get(0)
+ )
+ );
+ Assert.assertEquals(5, ingestionSpecs.size());
+ assertIngestionSchema(ingestionSpecs, expectedDimensionsSpec,
SEGMENT_INTERVALS, tuningConfig);
+ } else {
+ Assert.assertEquals(1, ingestionSpecs.size());
+ assertIngestionSchema(
+ ingestionSpecs,
+ expectedDimensionsSpec,
+ Collections.singletonList(COMPACTION_INTERVAL),
+ tuningConfig
+ );
+ }
+ }
+
+ @Test
+ public void testCreateIngestionSchemaWithNumShards() throws IOException,
SegmentLoadingException
+ {
+ final IndexTuningConfig tuningConfig = new IndexTuningConfig(
+ null,
+ 500000,
+ 1000000L,
+ null,
+ null,
+ 3,
+ new IndexSpec(
+ new RoaringBitmapSerdeFactory(true),
+ CompressionStrategy.LZ4,
+ CompressionStrategy.LZF,
+ LongEncodingStrategy.LONGS
+ ),
+ 5000,
+ true,
+ false,
+ true,
+ false,
+ null,
+ 100L,
+ null,
+ null,
+ null,
+ null
+ );
+ final List<IndexIngestionSpec> ingestionSpecs =
CompactionTask.createIngestionSchema(
+ toolbox,
+ new SegmentProvider(DATA_SOURCE, COMPACTION_INTERVAL),
+ new PartitionConfigurationManager(null, tuningConfig),
+ null,
+ keepSegmentGranularity,
+ objectMapper
+ );
+ final List<DimensionsSpec> expectedDimensionsSpec =
getExpectedDimensionsSpecForAutoGeneration(
+ keepSegmentGranularity
+ );
+
+ if (keepSegmentGranularity) {
+ ingestionSpecs.sort(
+ (s1, s2) -> Comparators.intervalsByStartThenEnd().compare(
+ s1.getDataSchema().getGranularitySpec().inputIntervals().get(0),
+ s2.getDataSchema().getGranularitySpec().inputIntervals().get(0)
+ )
+ );
+ Assert.assertEquals(5, ingestionSpecs.size());
+ assertIngestionSchema(ingestionSpecs, expectedDimensionsSpec,
SEGMENT_INTERVALS, tuningConfig);
+ } else {
+ Assert.assertEquals(1, ingestionSpecs.size());
+ assertIngestionSchema(
+ ingestionSpecs,
+ expectedDimensionsSpec,
+ Collections.singletonList(COMPACTION_INTERVAL),
+ tuningConfig
+ );
}
}
@@ -458,13 +628,19 @@ public class CompactionTaskTest
final List<IndexIngestionSpec> ingestionSpecs =
CompactionTask.createIngestionSchema(
toolbox,
new SegmentProvider(DATA_SOURCE, COMPACTION_INTERVAL),
+ new PartitionConfigurationManager(null, TUNING_CONFIG),
customSpec,
keepSegmentGranularity,
- TUNING_CONFIG,
objectMapper
);
if (keepSegmentGranularity) {
+ ingestionSpecs.sort(
+ (s1, s2) -> Comparators.intervalsByStartThenEnd().compare(
+ s1.getDataSchema().getGranularitySpec().inputIntervals().get(0),
+ s2.getDataSchema().getGranularitySpec().inputIntervals().get(0)
+ )
+ );
Assert.assertEquals(5, ingestionSpecs.size());
final List<DimensionsSpec> dimensionsSpecs = new ArrayList<>(5);
IntStream.range(0, 5).forEach(i -> dimensionsSpecs.add(customSpec));
@@ -489,9 +665,9 @@ public class CompactionTaskTest
final List<IndexIngestionSpec> ingestionSpecs =
CompactionTask.createIngestionSchema(
toolbox,
new SegmentProvider(SEGMENTS),
+ new PartitionConfigurationManager(null, TUNING_CONFIG),
null,
keepSegmentGranularity,
- TUNING_CONFIG,
objectMapper
);
final List<DimensionsSpec> expectedDimensionsSpec =
getExpectedDimensionsSpecForAutoGeneration(
@@ -499,6 +675,12 @@ public class CompactionTaskTest
);
if (keepSegmentGranularity) {
+ ingestionSpecs.sort(
+ (s1, s2) -> Comparators.intervalsByStartThenEnd().compare(
+ s1.getDataSchema().getGranularitySpec().inputIntervals().get(0),
+ s2.getDataSchema().getGranularitySpec().inputIntervals().get(0)
+ )
+ );
Assert.assertEquals(5, ingestionSpecs.size());
assertIngestionSchema(ingestionSpecs, expectedDimensionsSpec,
SEGMENT_INTERVALS);
} else {
@@ -518,9 +700,9 @@ public class CompactionTaskTest
CompactionTask.createIngestionSchema(
toolbox,
new SegmentProvider(segments),
+ new PartitionConfigurationManager(null, TUNING_CONFIG),
null,
keepSegmentGranularity,
- TUNING_CONFIG,
objectMapper
);
}
@@ -537,9 +719,9 @@ public class CompactionTaskTest
CompactionTask.createIngestionSchema(
toolbox,
new SegmentProvider(segments),
+ new PartitionConfigurationManager(null, TUNING_CONFIG),
null,
keepSegmentGranularity,
- TUNING_CONFIG,
objectMapper
);
}
@@ -560,6 +742,7 @@ public class CompactionTaskTest
null,
null,
null,
+ null,
objectMapper,
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
new NoopChatHandlerProvider(),
@@ -567,6 +750,46 @@ public class CompactionTaskTest
);
}
+ @Test
+ public void testTargetPartitionSizeWithPartitionConfig() throws IOException,
SegmentLoadingException
+ {
+ final IndexTuningConfig tuningConfig = new IndexTuningConfig(
+ 5,
+ 500000,
+ 1000000L,
+ null,
+ null,
+ null,
+ new IndexSpec(
+ new RoaringBitmapSerdeFactory(true),
+ CompressionStrategy.LZ4,
+ CompressionStrategy.LZF,
+ LongEncodingStrategy.LONGS
+ ),
+ 5000,
+ true,
+ false,
+ true,
+ false,
+ null,
+ 100L,
+ null,
+ null,
+ null,
+ null
+ );
+ expectedException.expect(IllegalArgumentException.class);
+ expectedException.expectMessage("targetCompactionSizeBytes[5] cannot be used with");
+ final List<IndexIngestionSpec> ingestionSpecs =
CompactionTask.createIngestionSchema(
+ toolbox,
+ new SegmentProvider(DATA_SOURCE, COMPACTION_INTERVAL),
+ new PartitionConfigurationManager(5L, tuningConfig),
+ null,
+ keepSegmentGranularity,
+ objectMapper
+ );
+ }
+
private static List<DimensionsSpec>
getExpectedDimensionsSpecForAutoGeneration(boolean keepSegmentGranularity)
{
if (keepSegmentGranularity) {
@@ -618,6 +841,45 @@ public class CompactionTaskTest
List<Interval> expectedSegmentIntervals
)
{
+ assertIngestionSchema(
+ ingestionSchemas,
+ expectedDimensionsSpecs,
+ expectedSegmentIntervals,
+ new IndexTuningConfig(
+ 41943040, // automatically computed targetPartitionSize
+ 500000,
+ 1000000L,
+ null,
+ null,
+ null,
+ new IndexSpec(
+ new RoaringBitmapSerdeFactory(true),
+ CompressionStrategy.LZ4,
+ CompressionStrategy.LZF,
+ LongEncodingStrategy.LONGS
+ ),
+ 5000,
+ true,
+ false,
+ true,
+ false,
+ null,
+ 100L,
+ null,
+ null,
+ null,
+ null
+ )
+ );
+ }
+
+ private static void assertIngestionSchema(
+ List<IndexIngestionSpec> ingestionSchemas,
+ List<DimensionsSpec> expectedDimensionsSpecs,
+ List<Interval> expectedSegmentIntervals,
+ IndexTuningConfig expectedTuningConfig
+ )
+ {
Preconditions.checkArgument(
ingestionSchemas.size() == expectedDimensionsSpecs.size(),
"ingesionSchemas.size()[%s] should be same with expectedDimensionsSpecs.size()[%s]",
@@ -677,7 +939,7 @@ public class CompactionTaskTest
);
// assert tuningConfig
- Assert.assertEquals(createTuningConfig(),
ingestionSchema.getTuningConfig());
+ Assert.assertEquals(expectedTuningConfig,
ingestionSchema.getTuningConfig());
}
}
@@ -841,17 +1103,74 @@ public class CompactionTaskTest
private static Column createColumn(DimensionSchema dimensionSchema)
{
- return new ColumnBuilder()
- .setType(IncrementalIndex.TYPE_MAP.get(dimensionSchema.getValueType()))
- .setDictionaryEncodedColumn(() -> null)
- .setBitmapIndex(() -> null)
- .build();
+ return new
TestColumn(IncrementalIndex.TYPE_MAP.get(dimensionSchema.getValueType()));
}
private static Column createColumn(AggregatorFactory aggregatorFactory)
{
- return new ColumnBuilder()
- .setType(ValueType.fromString(aggregatorFactory.getTypeName()))
- .build();
+ return new
TestColumn(ValueType.fromString(aggregatorFactory.getTypeName()));
+ }
+
+ private static class TestColumn implements Column
+ {
+ private final ColumnCapabilities columnCapabilities;
+
+ TestColumn(ValueType type)
+ {
+ columnCapabilities = new ColumnCapabilitiesImpl()
+ .setType(type)
+ .setDictionaryEncoded(type == ValueType.STRING) // set a fake value to make string columns
+ .setHasBitmapIndexes(type == ValueType.STRING)
+ .setHasSpatialIndexes(false)
+ .setHasMultipleValues(false);
+ }
+
+ @Override
+ public ColumnCapabilities getCapabilities()
+ {
+ return columnCapabilities;
+ }
+
+ @Override
+ public int getLength()
+ {
+ return NUM_ROWS_PER_SEGMENT;
+ }
+
+ @Override
+ public DictionaryEncodedColumn getDictionaryEncoding()
+ {
+ return null;
+ }
+
+ @Override
+ public GenericColumn getGenericColumn()
+ {
+ return null;
+ }
+
+ @Override
+ public ComplexColumn getComplexColumn()
+ {
+ return null;
+ }
+
+ @Override
+ public BitmapIndex getBitmapIndex()
+ {
+ return null;
+ }
+
+ @Override
+ public SpatialIndex getSpatialIndex()
+ {
+ return null;
+ }
+
+ @Override
+ public SettableColumnValueSelector makeSettableColumnValueSelector()
+ {
+ return null;
+ }
}
}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java
index a2c742e..31e2516 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java
@@ -106,14 +106,14 @@ public class TaskSerdeTest
IndexTask.IndexTuningConfig.class
);
- Assert.assertEquals(false, tuningConfig.isForceExtendableShardSpecs());
- Assert.assertEquals(false, tuningConfig.isReportParseExceptions());
+ Assert.assertFalse(tuningConfig.isForceExtendableShardSpecs());
+ Assert.assertFalse(tuningConfig.isReportParseExceptions());
Assert.assertEquals(new IndexSpec(), tuningConfig.getIndexSpec());
Assert.assertEquals(new Period(Integer.MAX_VALUE),
tuningConfig.getIntermediatePersistPeriod());
Assert.assertEquals(0, tuningConfig.getMaxPendingPersists());
Assert.assertEquals(1000000, tuningConfig.getMaxRowsInMemory());
- Assert.assertEquals(null, tuningConfig.getNumShards());
- Assert.assertEquals(5000000, (int) tuningConfig.getTargetPartitionSize());
+ Assert.assertNull(tuningConfig.getNumShards());
+ Assert.assertNull(tuningConfig.getTargetPartitionSize());
}
@Test
@@ -125,14 +125,14 @@ public class TaskSerdeTest
);
Assert.assertEquals(10, (int) tuningConfig.getTargetPartitionSize());
- Assert.assertEquals(null, tuningConfig.getNumShards());
+ Assert.assertNull(tuningConfig.getNumShards());
tuningConfig = jsonMapper.readValue(
"{\"type\":\"index\", \"numShards\":10}",
IndexTask.IndexTuningConfig.class
);
- Assert.assertEquals(null, tuningConfig.getTargetPartitionSize());
+ Assert.assertNull(tuningConfig.getTargetPartitionSize());
Assert.assertEquals(10, (int) tuningConfig.getNumShards());
tuningConfig = jsonMapper.readValue(
@@ -140,7 +140,7 @@ public class TaskSerdeTest
IndexTask.IndexTuningConfig.class
);
- Assert.assertEquals(null, tuningConfig.getTargetPartitionSize());
+ Assert.assertNull(tuningConfig.getTargetPartitionSize());
Assert.assertEquals(10, (int) tuningConfig.getNumShards());
tuningConfig = jsonMapper.readValue(
@@ -148,7 +148,7 @@ public class TaskSerdeTest
IndexTask.IndexTuningConfig.class
);
- Assert.assertEquals(null, tuningConfig.getNumShards());
+ Assert.assertNull(tuningConfig.getNumShards());
Assert.assertEquals(10, (int) tuningConfig.getTargetPartitionSize());
tuningConfig = jsonMapper.readValue(
@@ -156,11 +156,8 @@ public class TaskSerdeTest
IndexTask.IndexTuningConfig.class
);
- Assert.assertEquals(null, tuningConfig.getNumShards());
- Assert.assertEquals(
- IndexTask.IndexTuningConfig.DEFAULT_TARGET_PARTITION_SIZE,
- (int) tuningConfig.getTargetPartitionSize()
- );
+ Assert.assertNull(tuningConfig.getNumShards());
+ Assert.assertNull(tuningConfig.getTargetPartitionSize());
}
@Test
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java
b/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java
index 1ddcc9e..6bc1336 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java
@@ -31,9 +31,10 @@ import java.util.Objects;
public class DataSourceCompactionConfig
{
+ public static final long DEFAULT_TARGET_COMPACTION_SIZE_BYTES = 400 * 1024 * 1024; // 400MB
+
// should be synchronized with Tasks.DEFAULT_MERGE_TASK_PRIORITY
private static final int DEFAULT_COMPACTION_TASK_PRIORITY = 25;
- private static final long DEFAULT_TARGET_COMPACTION_SIZE_BYTES = 400 * 1024 * 1024; // 400MB
private static final int DEFAULT_NUM_TARGET_COMPACTION_SEGMENTS = 150;
private static final Period DEFAULT_SKIP_OFFSET_FROM_LATEST = new
Period("P1D");
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]