This is an automated email from the ASF dual-hosted git repository.
jihoonson pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 7462b0b Allow missing intervals for Parallel task with hash/range
partitioning (#10592)
7462b0b is described below
commit 7462b0b953e87890a683a0bee7b7480465450342
Author: Jihoon Son <[email protected]>
AuthorDate: Wed Nov 25 14:50:22 2020 -0800
Allow missing intervals for Parallel task with hash/range partitioning (#10592)
* Allow missing intervals for Parallel task
* fix row filter
* fix tests
* fix log
---
.../common/actions/SurrogateTaskActionClient.java | 45 ++++++
.../task/OverlordCoordinatingSegmentAllocator.java | 48 +++---
.../batch/parallel/ParallelIndexIngestionSpec.java | 5 +
.../parallel/ParallelIndexSupervisorTask.java | 180 +++++++++++++--------
.../parallel/PartialDimensionCardinalityTask.java | 31 ++--
.../parallel/PartialDimensionDistributionTask.java | 31 ++--
...HashSegmentGenerateParallelIndexTaskRunner.java | 11 +-
.../parallel/PartialHashSegmentGenerateTask.java | 47 ++++--
.../parallel/PartialRangeSegmentGenerateTask.java | 8 +-
.../batch/parallel/PartialSegmentGenerateTask.java | 5 +
.../batch/parallel/PartialSegmentMergeTask.java | 4 +
.../batch/parallel/PerfectRollupWorkerTask.java | 7 +-
.../task/batch/parallel/SinglePhaseSubTask.java | 6 +-
.../AbstractMultiPhaseParallelIndexingTest.java | 4 +-
.../parallel/DimensionCardinalityReportTest.java | 60 +++++--
...ashPartitionMultiPhaseParallelIndexingTest.java | 99 +++++++++---
.../ParallelIndexSupervisorTaskSerdeTest.java | 19 ---
.../PartialDimensionCardinalityTaskTest.java | 13 --
.../PartialDimensionDistributionTaskTest.java | 13 --
.../PartialGenericSegmentMergeTaskTest.java | 28 ++++
.../PartialHashSegmentGenerateTaskTest.java | 65 ++++++++
.../parallel/PerfectRollupWorkerTaskTest.java | 11 --
...ngePartitionMultiPhaseParallelIndexingTest.java | 23 +--
23 files changed, 521 insertions(+), 242 deletions(-)
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SurrogateTaskActionClient.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SurrogateTaskActionClient.java
new file mode 100644
index 0000000..deafd20
--- /dev/null
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SurrogateTaskActionClient.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.actions;
+
+import java.io.IOException;
+
+/**
+ * A {@link TaskActionClient} that wraps a given {@link TaskAction} with {@link SurrogateAction}.
+ * All subtasks of {@link org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask} must
+ * use this client or wrap taskActions manually.
+ */
+public class SurrogateTaskActionClient implements TaskActionClient
+{
+ private final String supervisorTaskId;
+ private final TaskActionClient delegate;
+
+ public SurrogateTaskActionClient(String supervisorTaskId, TaskActionClient
delegate)
+ {
+ this.supervisorTaskId = supervisorTaskId;
+ this.delegate = delegate;
+ }
+
+ @Override
+ public <RetType> RetType submit(TaskAction<RetType> taskAction) throws
IOException
+ {
+ return delegate.submit(new SurrogateAction<>(supervisorTaskId,
taskAction));
+ }
+}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/OverlordCoordinatingSegmentAllocator.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/OverlordCoordinatingSegmentAllocator.java
index 87daaa8..6efd39a 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/OverlordCoordinatingSegmentAllocator.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/OverlordCoordinatingSegmentAllocator.java
@@ -25,7 +25,8 @@ import
org.apache.druid.indexer.partitions.SecondaryPartitionType;
import org.apache.druid.indexing.appenderator.ActionBasedSegmentAllocator;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.SegmentAllocateAction;
-import org.apache.druid.indexing.common.actions.SurrogateAction;
+import org.apache.druid.indexing.common.actions.SurrogateTaskActionClient;
+import org.apache.druid.indexing.common.actions.TaskActionClient;
import
org.apache.druid.indexing.common.task.TaskLockHelper.OverwritingRootGenerationPartitions;
import
org.apache.druid.indexing.common.task.batch.parallel.SupervisorTaskAccess;
import org.apache.druid.java.util.common.ISE;
@@ -58,8 +59,12 @@ public class OverlordCoordinatingSegmentAllocator implements
SegmentAllocatorFor
final PartitionsSpec partitionsSpec
)
{
+ final TaskActionClient taskActionClient =
+ supervisorTaskAccess == null
+ ? toolbox.getTaskActionClient()
+ : new
SurrogateTaskActionClient(supervisorTaskAccess.getSupervisorTaskId(),
toolbox.getTaskActionClient());
this.internalAllocator = new ActionBasedSegmentAllocator(
- toolbox.getTaskActionClient(),
+ taskActionClient,
dataSchema,
(schema, row, sequenceName, previousSegmentId,
skipSegmentLineageCheck) -> {
final GranularitySpec granularitySpec = schema.getGranularitySpec();
@@ -72,34 +77,17 @@ public class OverlordCoordinatingSegmentAllocator
implements SegmentAllocatorFor
taskLockHelper,
interval
);
- if (supervisorTaskAccess != null) {
- return new SurrogateAction<>(
- supervisorTaskAccess.getSupervisorTaskId(),
- new SegmentAllocateAction(
- schema.getDataSource(),
- row.getTimestamp(),
- schema.getGranularitySpec().getQueryGranularity(),
- schema.getGranularitySpec().getSegmentGranularity(),
- sequenceName,
- previousSegmentId,
- skipSegmentLineageCheck,
- partialShardSpec,
- taskLockHelper.getLockGranularityToUse()
- )
- );
- } else {
- return new SegmentAllocateAction(
- schema.getDataSource(),
- row.getTimestamp(),
- schema.getGranularitySpec().getQueryGranularity(),
- schema.getGranularitySpec().getSegmentGranularity(),
- sequenceName,
- previousSegmentId,
- skipSegmentLineageCheck,
- partialShardSpec,
- taskLockHelper.getLockGranularityToUse()
- );
- }
+ return new SegmentAllocateAction(
+ schema.getDataSource(),
+ row.getTimestamp(),
+ schema.getGranularitySpec().getQueryGranularity(),
+ schema.getGranularitySpec().getSegmentGranularity(),
+ sequenceName,
+ previousSegmentId,
+ skipSegmentLineageCheck,
+ partialShardSpec,
+ taskLockHelper.getLockGranularityToUse()
+ );
}
);
this.sequenceNameFunction = new
LinearlyPartitionedSequenceNameFunction(taskId);
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexIngestionSpec.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexIngestionSpec.java
index 06baae4..5d057cd 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexIngestionSpec.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexIngestionSpec.java
@@ -63,6 +63,11 @@ public class ParallelIndexIngestionSpec extends
IngestionSpec<ParallelIndexIOCon
this.tuningConfig = tuningConfig == null ?
ParallelIndexTuningConfig.defaultConfig() : tuningConfig;
}
+ public ParallelIndexIngestionSpec withDataSchema(DataSchema dataSchema)
+ {
+ return new ParallelIndexIngestionSpec(dataSchema, ioConfig, tuningConfig);
+ }
+
@Override
@JsonProperty("dataSchema")
public DataSchema getDataSchema()
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
index 5cbfc01..3d272b4 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
@@ -188,10 +188,6 @@ public class ParallelIndexSupervisorTask extends
AbstractBatchIndexTask implemen
if (isGuaranteedRollup(ingestionSchema.getIOConfig(),
ingestionSchema.getTuningConfig())) {
checkPartitionsSpecForForceGuaranteedRollup(ingestionSchema.getTuningConfig().getGivenOrDefaultPartitionsSpec());
-
- if
(ingestionSchema.getDataSchema().getGranularitySpec().inputIntervals().isEmpty())
{
- throw new ISE("forceGuaranteedRollup is set but intervals is missing
in granularitySpec");
- }
}
this.baseInputSource = ingestionSchema.getIOConfig().getNonNullInputSource(
@@ -290,7 +286,8 @@ public class ParallelIndexSupervisorTask extends
AbstractBatchIndexTask implemen
@VisibleForTesting
PartialHashSegmentGenerateParallelIndexTaskRunner
createPartialHashSegmentGenerateRunner(
TaskToolbox toolbox,
- Integer numShardsOverride
+ ParallelIndexIngestionSpec ingestionSchema,
+ @Nullable Map<Interval, Integer> intervalToNumShardsOverride
)
{
return new PartialHashSegmentGenerateParallelIndexTaskRunner(
@@ -299,7 +296,7 @@ public class ParallelIndexSupervisorTask extends
AbstractBatchIndexTask implemen
getGroupId(),
ingestionSchema,
getContext(),
- numShardsOverride
+ intervalToNumShardsOverride
);
}
@@ -318,7 +315,8 @@ public class ParallelIndexSupervisorTask extends
AbstractBatchIndexTask implemen
@VisibleForTesting
PartialRangeSegmentGenerateParallelIndexTaskRunner
createPartialRangeSegmentGenerateRunner(
TaskToolbox toolbox,
- Map<Interval, PartitionBoundaries> intervalToPartitions
+ Map<Interval, PartitionBoundaries> intervalToPartitions,
+ ParallelIndexIngestionSpec ingestionSchema
)
{
return new PartialRangeSegmentGenerateParallelIndexTaskRunner(
@@ -334,16 +332,17 @@ public class ParallelIndexSupervisorTask extends
AbstractBatchIndexTask implemen
@VisibleForTesting
PartialGenericSegmentMergeParallelIndexTaskRunner
createPartialGenericSegmentMergeRunner(
TaskToolbox toolbox,
- List<PartialGenericSegmentMergeIOConfig> ioConfigs
+ List<PartialGenericSegmentMergeIOConfig> ioConfigs,
+ ParallelIndexIngestionSpec ingestionSchema
)
{
return new PartialGenericSegmentMergeParallelIndexTaskRunner(
toolbox,
getId(),
getGroupId(),
- getIngestionSchema().getDataSchema(),
+ ingestionSchema.getDataSchema(),
ioConfigs,
- getIngestionSchema().getTuningConfig(),
+ ingestionSchema.getTuningConfig(),
getContext()
);
}
@@ -529,9 +528,30 @@ public class ParallelIndexSupervisorTask extends
AbstractBatchIndexTask implemen
: runHashPartitionMultiPhaseParallel(toolbox);
}
+ private static ParallelIndexIngestionSpec
rewriteIngestionSpecWithIntervalsIfMissing(
+ ParallelIndexIngestionSpec ingestionSchema,
+ Collection<Interval> intervals
+ )
+ {
+ if
(ingestionSchema.getDataSchema().getGranularitySpec().inputIntervals().isEmpty())
{
+ return ingestionSchema
+ .withDataSchema(
+ ingestionSchema.getDataSchema().withGranularitySpec(
+ ingestionSchema
+ .getDataSchema()
+ .getGranularitySpec()
+ .withIntervals(new ArrayList<>(intervals))
+ )
+ );
+ } else {
+ return ingestionSchema;
+ }
+ }
+
private TaskStatus runHashPartitionMultiPhaseParallel(TaskToolbox toolbox)
throws Exception
{
TaskState state;
+ ParallelIndexIngestionSpec ingestionSchemaToUse = ingestionSchema;
if (!(ingestionSchema.getTuningConfig().getPartitionsSpec() instanceof
HashedPartitionsSpec)) {
// only range and hash partitioning is supported for multiphase parallel ingestion, see runMultiPhaseParallel()
@@ -541,49 +561,64 @@ public class ParallelIndexSupervisorTask extends
AbstractBatchIndexTask implemen
);
}
- final Integer numShardsOverride;
+ final Map<Interval, Integer> intervalToNumShards;
HashedPartitionsSpec partitionsSpec = (HashedPartitionsSpec)
ingestionSchema.getTuningConfig().getPartitionsSpec();
- if (partitionsSpec.getNumShards() == null) {
- // 0. need to determine numShards by scanning the data
- LOG.info("numShards is unspecified, beginning %s phase.",
PartialDimensionCardinalityTask.TYPE);
+ final boolean needsInputSampling =
+ partitionsSpec.getNumShards() == null
+ ||
ingestionSchemaToUse.getDataSchema().getGranularitySpec().inputIntervals().isEmpty();
+ if (needsInputSampling) {
+ // 0. need to determine intervals and numShards by scanning the data
+ LOG.info("Needs to determine intervals or numShards, beginning %s
phase.", PartialDimensionCardinalityTask.TYPE);
ParallelIndexTaskRunner<PartialDimensionCardinalityTask,
DimensionCardinalityReport> cardinalityRunner =
createRunner(
toolbox,
this::createPartialDimensionCardinalityRunner
);
- if (cardinalityRunner == null) {
- throw new ISE("Could not create cardinality runner for hash
partitioning.");
- }
-
state = runNextPhase(cardinalityRunner);
if (state.isFailure()) {
return TaskStatus.failure(getId());
}
- int effectiveMaxRowsPerSegment = partitionsSpec.getMaxRowsPerSegment()
== null
- ?
PartitionsSpec.DEFAULT_MAX_ROWS_PER_SEGMENT
- : partitionsSpec.getMaxRowsPerSegment();
- LOG.info("effective maxRowsPerSegment is: " +
effectiveMaxRowsPerSegment);
+ if (cardinalityRunner.getReports().isEmpty()) {
+ String msg = "No valid rows for hash partitioning."
+ + " All rows may have invalid timestamps or have been
filtered out.";
+ LOG.warn(msg);
+ return TaskStatus.success(getId(), msg);
+ }
+
+ if (partitionsSpec.getNumShards() == null) {
+ int effectiveMaxRowsPerSegment = partitionsSpec.getMaxRowsPerSegment()
== null
+ ?
PartitionsSpec.DEFAULT_MAX_ROWS_PER_SEGMENT
+ :
partitionsSpec.getMaxRowsPerSegment();
+ LOG.info("effective maxRowsPerSegment is: " +
effectiveMaxRowsPerSegment);
- if (cardinalityRunner.getReports() == null) {
- throw new ISE("Could not determine cardinalities for hash
partitioning.");
+ intervalToNumShards = determineNumShardsFromCardinalityReport(
+ cardinalityRunner.getReports().values(),
+ effectiveMaxRowsPerSegment
+ );
+ } else {
+ intervalToNumShards = CollectionUtils.mapValues(
+ mergeCardinalityReports(cardinalityRunner.getReports().values()),
+ k -> partitionsSpec.getNumShards()
+ );
}
- numShardsOverride = determineNumShardsFromCardinalityReport(
- cardinalityRunner.getReports().values(),
- effectiveMaxRowsPerSegment
- );
- LOG.info("Automatically determined numShards: " + numShardsOverride);
+ ingestionSchemaToUse = rewriteIngestionSpecWithIntervalsIfMissing(
+ ingestionSchemaToUse,
+ intervalToNumShards.keySet()
+ );
} else {
- numShardsOverride = null;
+ // numShards will be determined in PartialHashSegmentGenerateTask
+ intervalToNumShards = null;
}
// 1. Partial segment generation phase
+ final ParallelIndexIngestionSpec segmentCreateIngestionSpec =
ingestionSchemaToUse;
ParallelIndexTaskRunner<PartialHashSegmentGenerateTask,
GeneratedPartitionsReport<GenericPartitionStat>> indexingRunner =
createRunner(
toolbox,
- f -> createPartialHashSegmentGenerateRunner(toolbox,
numShardsOverride)
+ f -> createPartialHashSegmentGenerateRunner(toolbox,
segmentCreateIngestionSpec, intervalToNumShards)
);
state = runNextPhase(indexingRunner);
@@ -600,9 +635,10 @@ public class ParallelIndexSupervisorTask extends
AbstractBatchIndexTask implemen
partitionToLocations
);
+ final ParallelIndexIngestionSpec segmentMergeIngestionSpec =
ingestionSchemaToUse;
final ParallelIndexTaskRunner<PartialGenericSegmentMergeTask,
PushedSegmentsReport> mergeRunner = createRunner(
toolbox,
- tb -> createPartialGenericSegmentMergeRunner(tb, ioConfigs)
+ tb -> createPartialGenericSegmentMergeRunner(tb, ioConfigs,
segmentMergeIngestionSpec)
);
state = runNextPhase(mergeRunner);
if (state.isSuccess()) {
@@ -615,6 +651,7 @@ public class ParallelIndexSupervisorTask extends
AbstractBatchIndexTask implemen
private TaskStatus runRangePartitionMultiPhaseParallel(TaskToolbox toolbox)
throws Exception
{
+ ParallelIndexIngestionSpec ingestionSchemaToUse = ingestionSchema;
ParallelIndexTaskRunner<PartialDimensionDistributionTask,
DimensionDistributionReport> distributionRunner =
createRunner(
toolbox,
@@ -631,13 +668,22 @@ public class ParallelIndexSupervisorTask extends
AbstractBatchIndexTask implemen
if (intervalToPartitions.isEmpty()) {
String msg = "No valid rows for single dimension partitioning."
- + " All rows may have invalid timestamps or multiple dimension
values.";
+ + " All rows may have invalid timestamps or multiple
dimension values.";
LOG.warn(msg);
return TaskStatus.success(getId(), msg);
}
+ ingestionSchemaToUse = rewriteIngestionSpecWithIntervalsIfMissing(
+ ingestionSchemaToUse,
+ intervalToPartitions.keySet()
+ );
+
+ final ParallelIndexIngestionSpec segmentCreateIngestionSpec =
ingestionSchemaToUse;
ParallelIndexTaskRunner<PartialRangeSegmentGenerateTask,
GeneratedPartitionsReport<GenericPartitionStat>> indexingRunner =
- createRunner(toolbox, tb ->
createPartialRangeSegmentGenerateRunner(tb, intervalToPartitions));
+ createRunner(
+ toolbox,
+ tb -> createPartialRangeSegmentGenerateRunner(tb,
intervalToPartitions, segmentCreateIngestionSpec)
+ );
TaskState indexingState = runNextPhase(indexingRunner);
if (indexingState.isFailure()) {
@@ -652,9 +698,10 @@ public class ParallelIndexSupervisorTask extends
AbstractBatchIndexTask implemen
partitionToLocations
);
+ final ParallelIndexIngestionSpec segmentMergeIngestionSpec =
ingestionSchemaToUse;
ParallelIndexTaskRunner<PartialGenericSegmentMergeTask,
PushedSegmentsReport> mergeRunner = createRunner(
toolbox,
- tb -> createPartialGenericSegmentMergeRunner(tb, ioConfigs)
+ tb -> createPartialGenericSegmentMergeRunner(tb, ioConfigs,
segmentMergeIngestionSpec)
);
TaskState mergeState = runNextPhase(mergeRunner);
if (mergeState.isSuccess()) {
@@ -664,48 +711,45 @@ public class ParallelIndexSupervisorTask extends
AbstractBatchIndexTask implemen
return TaskStatus.fromCode(getId(), mergeState);
}
- @VisibleForTesting
- public static int determineNumShardsFromCardinalityReport(
- Collection<DimensionCardinalityReport> reports,
- int maxRowsPerSegment
- )
+ private static Map<Interval, Union>
mergeCardinalityReports(Collection<DimensionCardinalityReport> reports)
{
- // aggregate all the sub-reports
Map<Interval, Union> finalCollectors = new HashMap<>();
reports.forEach(report -> {
Map<Interval, byte[]> intervalToCardinality =
report.getIntervalToCardinalities();
for (Map.Entry<Interval, byte[]> entry :
intervalToCardinality.entrySet()) {
- Union union = finalCollectors.computeIfAbsent(
- entry.getKey(),
- (key) -> {
- return new Union(DimensionCardinalityReport.HLL_SKETCH_LOG_K);
- }
- );
HllSketch entryHll = HllSketch.wrap(Memory.wrap(entry.getValue()));
- union.update(entryHll);
+ finalCollectors.computeIfAbsent(
+ entry.getKey(),
+ k -> new Union(DimensionCardinalityReport.HLL_SKETCH_LOG_K)
+ ).update(entryHll);
}
});
+ return finalCollectors;
+ }
- // determine the highest cardinality in any interval
- long maxCardinality = 0;
- for (Union union : finalCollectors.values()) {
- maxCardinality = Math.max(maxCardinality, (long) union.getEstimate());
- }
-
- LOG.info("Estimated max cardinality: " + maxCardinality);
-
- // determine numShards based on maxRowsPerSegment and the highest per-interval cardinality
- long numShards = maxCardinality / maxRowsPerSegment;
- if (maxCardinality % maxRowsPerSegment != 0) {
- // if there's a remainder add 1 so we stay under maxRowsPerSegment
- numShards += 1;
- }
- try {
- return Math.toIntExact(numShards);
- }
- catch (ArithmeticException ae) {
- throw new ISE("Estimated numShards [%s] exceeds integer bounds.",
numShards);
- }
+ @VisibleForTesting
+ public static Map<Interval, Integer> determineNumShardsFromCardinalityReport(
+ Collection<DimensionCardinalityReport> reports,
+ int maxRowsPerSegment
+ )
+ {
+ // aggregate all the sub-reports
+ Map<Interval, Union> finalCollectors = mergeCardinalityReports(reports);
+
+ return CollectionUtils.mapValues(
+ finalCollectors,
+ union -> {
+ final double estimatedCardinality = union.getEstimate();
+ // determine numShards based on maxRowsPerSegment and the cardinality
+ final long estimatedNumShards = Math.round(estimatedCardinality /
maxRowsPerSegment);
+ try {
+ return Math.max(Math.toIntExact(estimatedNumShards), 1);
+ }
+ catch (ArithmeticException ae) {
+ throw new ISE("Estimated numShards [%s] exceeds integer bounds.",
estimatedNumShards);
+ }
+ }
+ );
}
private Map<Interval, PartitionBoundaries>
determineAllRangePartitions(Collection<DimensionDistributionReport> reports)
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionCardinalityTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionCardinalityTask.java
index f6759ec..48be3e7 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionCardinalityTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionCardinalityTask.java
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import org.apache.datasketches.hll.HllSketch;
import org.apache.druid.data.input.InputFormat;
@@ -32,12 +33,12 @@ import org.apache.druid.data.input.InputSource;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.actions.SurrogateTaskActionClient;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.task.AbstractBatchIndexTask;
import org.apache.druid.indexing.common.task.ClientBasedTaskInfoProvider;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.java.util.common.granularity.Granularity;
-import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.segment.incremental.ParseExceptionHandler;
import org.apache.druid.segment.incremental.RowIngestionMeters;
@@ -52,11 +53,11 @@ import javax.annotation.Nullable;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
public class PartialDimensionCardinalityTask extends PerfectRollupWorkerTask
{
public static final String TYPE = "partial_dimension_cardinality";
- private static final Logger LOG = new
Logger(PartialDimensionCardinalityTask.class);
private final int numAttempts;
private final ParallelIndexIngestionSpec ingestionSchema;
@@ -125,10 +126,14 @@ public class PartialDimensionCardinalityTask extends
PerfectRollupWorkerTask
@Override
public boolean isReady(TaskActionClient taskActionClient) throws Exception
{
- return tryTimeChunkLock(
- taskActionClient,
-
getIngestionSchema().getDataSchema().getGranularitySpec().inputIntervals()
- );
+ if
(!getIngestionSchema().getDataSchema().getGranularitySpec().inputIntervals().isEmpty())
{
+ return tryTimeChunkLock(
+ new SurrogateTaskActionClient(supervisorTaskId, taskActionClient),
+
getIngestionSchema().getDataSchema().getGranularitySpec().inputIntervals()
+ );
+ } else {
+ return true;
+ }
}
@Override
@@ -159,6 +164,7 @@ public class PartialDimensionCardinalityTask extends
PerfectRollupWorkerTask
tuningConfig.getMaxParseExceptions(),
tuningConfig.getMaxSavedParseExceptions()
);
+ final boolean determineIntervals =
granularitySpec.inputIntervals().isEmpty();
try (
final CloseableIterator<InputRow> inputRowIterator =
AbstractBatchIndexTask.inputSourceReader(
@@ -166,7 +172,7 @@ public class PartialDimensionCardinalityTask extends
PerfectRollupWorkerTask
dataSchema,
inputSource,
inputFormat,
- AbstractBatchIndexTask.defaultRowFilter(granularitySpec),
+ determineIntervals ? Objects::nonNull :
AbstractBatchIndexTask.defaultRowFilter(granularitySpec),
buildSegmentsMeters,
parseExceptionHandler
);
@@ -197,8 +203,15 @@ public class PartialDimensionCardinalityTask extends
PerfectRollupWorkerTask
InputRow inputRow = inputRowIterator.next();
// null rows are filtered out by FilteringCloseableInputRowIterator
DateTime timestamp = inputRow.getTimestamp();
- //noinspection OptionalGetWithoutIsPresent (InputRowIterator returns rows with present intervals)
- Interval interval = granularitySpec.bucketInterval(timestamp).get();
+ final Interval interval;
+ if (granularitySpec.inputIntervals().isEmpty()) {
+ interval =
granularitySpec.getSegmentGranularity().bucket(inputRow.getTimestamp());
+ } else {
+ final Optional<Interval> optInterval =
granularitySpec.bucketInterval(inputRow.getTimestamp());
+ // this interval must exist since it passed the rowFilter
+ assert optInterval.isPresent();
+ interval = optInterval.get();
+ }
Granularity queryGranularity = granularitySpec.getQueryGranularity();
HllSketch hllSketch = intervalToCardinalities.computeIfAbsent(
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTask.java
index 53019dd..6bec35d 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTask.java
@@ -22,6 +22,7 @@ package org.apache.druid.indexing.common.task.batch.parallel;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import com.google.common.hash.BloomFilter;
@@ -34,6 +35,7 @@ import org.apache.druid.data.input.Rows;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.actions.SurrogateTaskActionClient;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.task.AbstractBatchIndexTask;
import org.apache.druid.indexing.common.task.ClientBasedTaskInfoProvider;
@@ -55,6 +57,7 @@ import javax.annotation.Nullable;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.function.Supplier;
/**
@@ -163,10 +166,14 @@ public class PartialDimensionDistributionTask extends
PerfectRollupWorkerTask
@Override
public boolean isReady(TaskActionClient taskActionClient) throws Exception
{
- return tryTimeChunkLock(
- taskActionClient,
-
getIngestionSchema().getDataSchema().getGranularitySpec().inputIntervals()
- );
+ if
(!getIngestionSchema().getDataSchema().getGranularitySpec().inputIntervals().isEmpty())
{
+ return tryTimeChunkLock(
+ new SurrogateTaskActionClient(supervisorTaskId, taskActionClient),
+
getIngestionSchema().getDataSchema().getGranularitySpec().inputIntervals()
+ );
+ } else {
+ return true;
+ }
}
@Override
@@ -195,6 +202,7 @@ public class PartialDimensionDistributionTask extends
PerfectRollupWorkerTask
tuningConfig.getMaxParseExceptions(),
tuningConfig.getMaxSavedParseExceptions()
);
+ final boolean determineIntervals =
granularitySpec.inputIntervals().isEmpty();
try (
final CloseableIterator<InputRow> inputRowIterator =
AbstractBatchIndexTask.inputSourceReader(
@@ -202,7 +210,7 @@ public class PartialDimensionDistributionTask extends
PerfectRollupWorkerTask
dataSchema,
inputSource,
inputFormat,
- AbstractBatchIndexTask.defaultRowFilter(granularitySpec),
+ determineIntervals ? Objects::nonNull :
AbstractBatchIndexTask.defaultRowFilter(granularitySpec),
buildSegmentsMeters,
parseExceptionHandler
);
@@ -243,10 +251,15 @@ public class PartialDimensionDistributionTask extends
PerfectRollupWorkerTask
continue;
}
- DateTime timestamp = inputRow.getTimestamp();
-
- //noinspection OptionalGetWithoutIsPresent (InputRowIterator returns rows with present intervals)
- Interval interval = granularitySpec.bucketInterval(timestamp).get();
+ final Interval interval;
+ if (granularitySpec.inputIntervals().isEmpty()) {
+ interval =
granularitySpec.getSegmentGranularity().bucket(inputRow.getTimestamp());
+ } else {
+ final Optional<Interval> optInterval =
granularitySpec.bucketInterval(inputRow.getTimestamp());
+ // this interval must exist since it passed the rowFilter
+ assert optInterval.isPresent();
+ interval = optInterval.get();
+ }
String partitionDimensionValue =
Iterables.getOnlyElement(inputRow.getDimension(partitionDimension));
if (inputRowFilter.accept(interval, partitionDimensionValue, inputRow)) {
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateParallelIndexTaskRunner.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateParallelIndexTaskRunner.java
index 39024a1..8358606 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateParallelIndexTaskRunner.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateParallelIndexTaskRunner.java
@@ -21,7 +21,9 @@ package org.apache.druid.indexing.common.task.batch.parallel;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.indexing.common.TaskToolbox;
+import org.joda.time.Interval;
+import javax.annotation.Nullable;
import java.util.Map;
/**
@@ -32,7 +34,8 @@ class PartialHashSegmentGenerateParallelIndexTaskRunner
{
private static final String PHASE_NAME = "partial segment generation";
- private Integer numShardsOverride;
+ @Nullable
+ private final Map<Interval, Integer> intervalToNumShardsOverride;
PartialHashSegmentGenerateParallelIndexTaskRunner(
TaskToolbox toolbox,
@@ -40,11 +43,11 @@ class PartialHashSegmentGenerateParallelIndexTaskRunner
String groupId,
ParallelIndexIngestionSpec ingestionSchema,
Map<String, Object> context,
- Integer numShardsOverride
+ @Nullable Map<Interval, Integer> intervalToNumShardsOverride
)
{
super(toolbox, taskId, groupId, ingestionSchema, context);
- this.numShardsOverride = numShardsOverride;
+ this.intervalToNumShardsOverride = intervalToNumShardsOverride;
}
@Override
@@ -82,7 +85,7 @@ class PartialHashSegmentGenerateParallelIndexTaskRunner
numAttempts,
subTaskIngestionSpec,
context,
- numShardsOverride
+ intervalToNumShardsOverride
);
}
};
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java
index 3b1f7ba..b252e5d 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.actions.SurrogateTaskActionClient;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.task.SegmentAllocatorForBatch;
import org.apache.druid.indexing.common.task.SegmentAllocators;
@@ -57,7 +58,8 @@ public class PartialHashSegmentGenerateTask extends
PartialSegmentGenerateTask<G
private final int numAttempts;
private final ParallelIndexIngestionSpec ingestionSchema;
private final String supervisorTaskId;
- private final Integer numShardsOverride;
+ @Nullable
+ private final Map<Interval, Integer> intervalToNumShardsOverride;
@JsonCreator
public PartialHashSegmentGenerateTask(
@@ -69,7 +71,7 @@ public class PartialHashSegmentGenerateTask extends
PartialSegmentGenerateTask<G
@JsonProperty("numAttempts") final int numAttempts, // zero-based counting
@JsonProperty(PROP_SPEC) final ParallelIndexIngestionSpec
ingestionSchema,
@JsonProperty("context") final Map<String, Object> context,
- @Nullable @JsonProperty("numShardsOverride") final Integer
numShardsOverride
+ @JsonProperty("intervalToNumShardsOverride") @Nullable final
Map<Interval, Integer> intervalToNumShardsOverride
)
{
super(
@@ -85,7 +87,7 @@ public class PartialHashSegmentGenerateTask extends
PartialSegmentGenerateTask<G
this.numAttempts = numAttempts;
this.ingestionSchema = ingestionSchema;
this.supervisorTaskId = supervisorTaskId;
- this.numShardsOverride = numShardsOverride;
+ this.intervalToNumShardsOverride = intervalToNumShardsOverride;
}
@JsonProperty
@@ -106,6 +108,13 @@ public class PartialHashSegmentGenerateTask extends
PartialSegmentGenerateTask<G
return supervisorTaskId;
}
+ @Nullable
+ @JsonProperty
+ public Map<Interval, Integer> getIntervalToNumShardsOverride()
+ {
+ return intervalToNumShardsOverride;
+ }
+
@Override
public String getType()
{
@@ -116,7 +125,7 @@ public class PartialHashSegmentGenerateTask extends
PartialSegmentGenerateTask<G
public boolean isReady(TaskActionClient taskActionClient) throws Exception
{
return tryTimeChunkLock(
- taskActionClient,
+ new SurrogateTaskActionClient(supervisorTaskId, taskActionClient),
getIngestionSchema().getDataSchema().getGranularitySpec().inputIntervals()
);
}
@@ -134,7 +143,11 @@ public class PartialHashSegmentGenerateTask extends
PartialSegmentGenerateTask<G
getId(),
granularitySpec,
new SupervisorTaskAccess(supervisorTaskId, taskClient),
- createHashPartitionAnalysisFromPartitionsSpec(granularitySpec,
partitionsSpec, numShardsOverride)
+ createHashPartitionAnalysisFromPartitionsSpec(
+ granularitySpec,
+ partitionsSpec,
+ intervalToNumShardsOverride
+ )
);
}
@@ -170,22 +183,24 @@ public class PartialHashSegmentGenerateTask extends
PartialSegmentGenerateTask<G
public static HashPartitionAnalysis
createHashPartitionAnalysisFromPartitionsSpec(
GranularitySpec granularitySpec,
@Nonnull HashedPartitionsSpec partitionsSpec,
- @Nullable Integer numShardsOverride
+ @Nullable Map<Interval, Integer> intervalToNumShardsOverride
)
{
- final SortedSet<Interval> intervals =
granularitySpec.bucketIntervals().get();
+ final HashPartitionAnalysis partitionAnalysis = new
HashPartitionAnalysis(partitionsSpec);
- final int numBucketsPerInterval;
- if (numShardsOverride != null) {
- numBucketsPerInterval = numShardsOverride;
+ if (intervalToNumShardsOverride != null) {
+ // Some intervals populated from granularitySpec can be missing in intervalToNumShardsOverride
+ // because intervalToNumShardsOverride contains only the intervals which exist in input data.
+ // We only care about the intervals in intervalToNumShardsOverride here.
+ intervalToNumShardsOverride.forEach(partitionAnalysis::updateBucket);
} else {
- numBucketsPerInterval = partitionsSpec.getNumShards() == null
- ? 1
- : partitionsSpec.getNumShards();
- }
+ final SortedSet<Interval> intervals =
granularitySpec.bucketIntervals().get();
+ final int numBucketsPerInterval = partitionsSpec.getNumShards() == null
+ ? 1
+ : partitionsSpec.getNumShards();
- final HashPartitionAnalysis partitionAnalysis = new
HashPartitionAnalysis(partitionsSpec);
- intervals.forEach(interval -> partitionAnalysis.updateBucket(interval,
numBucketsPerInterval));
+ intervals.forEach(interval -> partitionAnalysis.updateBucket(interval,
numBucketsPerInterval));
+ }
return partitionAnalysis;
}
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTask.java
index 57978f4..98c84cf 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTask.java
@@ -25,6 +25,7 @@ import com.google.common.base.Preconditions;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.actions.SurrogateTaskActionClient;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.task.SegmentAllocatorForBatch;
import org.apache.druid.indexing.common.task.SegmentAllocators;
@@ -135,9 +136,12 @@ public class PartialRangeSegmentGenerateTask extends
PartialSegmentGenerateTask<
}
@Override
- public boolean isReady(TaskActionClient taskActionClient)
+ public boolean isReady(TaskActionClient taskActionClient) throws IOException
{
- return true;
+ return tryTimeChunkLock(
+ new SurrogateTaskActionClient(supervisorTaskId, taskActionClient),
+
getIngestionSchema().getDataSchema().getGranularitySpec().inputIntervals()
+ );
}
@Override
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java
index ec8530b..a69827c 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java
@@ -19,6 +19,7 @@
package org.apache.druid.indexing.common.task.batch.parallel;
+import com.google.common.base.Preconditions;
import org.apache.druid.data.input.InputSource;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.partitions.PartitionsSpec;
@@ -81,6 +82,10 @@ abstract class PartialSegmentGenerateTask<T extends
GeneratedPartitionsReport> e
context
);
+ Preconditions.checkArgument(
+
!ingestionSchema.getDataSchema().getGranularitySpec().inputIntervals().isEmpty(),
+ "Missing intervals in granularitySpec"
+ );
this.ingestionSchema = ingestionSchema;
this.supervisorTaskId = supervisorTaskId;
this.inputRowIteratorBuilder = inputRowIteratorBuilder;
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeTask.java
index 099b24a..fad7b07 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeTask.java
@@ -101,6 +101,10 @@ abstract class PartialSegmentMergeTask<S extends
ShardSpec, P extends PartitionL
context
);
+ Preconditions.checkArgument(
+ !dataSchema.getGranularitySpec().inputIntervals().isEmpty(),
+ "Missing intervals in granularitySpec"
+ );
this.ioConfig = ioConfig;
this.numAttempts = numAttempts;
this.supervisorTaskId = supervisorTaskId;
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PerfectRollupWorkerTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PerfectRollupWorkerTask.java
index 7cce1d3..792182d 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PerfectRollupWorkerTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PerfectRollupWorkerTask.java
@@ -62,12 +62,7 @@ abstract class PerfectRollupWorkerTask extends
AbstractBatchIndexTask
checkPartitionsSpec(tuningConfig.getGivenOrDefaultPartitionsSpec());
- granularitySpec = dataSchema.getGranularitySpec();
- Preconditions.checkArgument(
- !granularitySpec.inputIntervals().isEmpty(),
- "Missing intervals in granularitySpec"
- );
-
+ this.granularitySpec = dataSchema.getGranularitySpec();
this.dataSchema = dataSchema;
this.tuningConfig = tuningConfig;
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
index 543de61..2cc9f72 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
@@ -28,6 +28,7 @@ import org.apache.druid.data.input.InputSource;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.actions.SurrogateTaskActionClient;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.task.AbstractBatchIndexTask;
import org.apache.druid.indexing.common.task.BatchAppenderators;
@@ -149,7 +150,10 @@ public class SinglePhaseSubTask extends
AbstractBatchIndexTask
@Override
public boolean isReady(TaskActionClient taskActionClient) throws IOException
{
- return determineLockGranularityAndTryLock(taskActionClient,
ingestionSchema.getDataSchema().getGranularitySpec());
+ return determineLockGranularityAndTryLock(
+ new SurrogateTaskActionClient(supervisorTaskId, taskActionClient),
+ ingestionSchema.getDataSchema().getGranularitySpec()
+ );
}
@JsonProperty
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java
index a6e2806..b57cede 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java
@@ -35,6 +35,7 @@ import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.indexing.input.DruidInputSource;
import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.query.DefaultGenericQueryMetricsFactory;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner;
@@ -71,6 +72,7 @@ import java.util.Set;
abstract class AbstractMultiPhaseParallelIndexingTest extends
AbstractParallelIndexSupervisorTaskTest
{
protected static final String DATASOURCE = "dataSource";
+ protected static final Granularity SEGMENT_GRANULARITY = Granularities.DAY;
private static final ScanQueryRunnerFactory SCAN_QUERY_RUNNER_FACTORY = new
ScanQueryRunnerFactory(
new ScanQueryQueryToolChest(
@@ -176,7 +178,7 @@ abstract class AbstractMultiPhaseParallelIndexingTest
extends AbstractParallelIn
)
{
GranularitySpec granularitySpec = new UniformGranularitySpec(
- Granularities.DAY,
+ SEGMENT_GRANULARITY,
Granularities.MINUTE,
interval == null ? null : Collections.singletonList(interval)
);
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/DimensionCardinalityReportTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/DimensionCardinalityReportTest.java
index 102b5f8..29bbe5c 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/DimensionCardinalityReportTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/DimensionCardinalityReportTest.java
@@ -109,34 +109,74 @@ public class DimensionCardinalityReportTest
reports.add(report3);
// first interval in test has cardinality 4
- int numShards =
ParallelIndexSupervisorTask.determineNumShardsFromCardinalityReport(
+ Map<Interval, Integer> intervalToNumShards =
ParallelIndexSupervisorTask.determineNumShardsFromCardinalityReport(
reports,
1
);
- Assert.assertEquals(4L, numShards);
+ Assert.assertEquals(
+ ImmutableMap.of(
+ Intervals.of("1970-01-01/P1D"),
+ 4,
+ Intervals.of("1970-01-02/P1D"),
+ 1
+ ),
+ intervalToNumShards
+ );
- numShards =
ParallelIndexSupervisorTask.determineNumShardsFromCardinalityReport(
+ intervalToNumShards =
ParallelIndexSupervisorTask.determineNumShardsFromCardinalityReport(
reports,
2
);
- Assert.assertEquals(2L, numShards);
+ Assert.assertEquals(
+ ImmutableMap.of(
+ Intervals.of("1970-01-01/P1D"),
+ 2,
+ Intervals.of("1970-01-02/P1D"),
+ 1
+ ),
+ intervalToNumShards
+ );
- numShards =
ParallelIndexSupervisorTask.determineNumShardsFromCardinalityReport(
+ intervalToNumShards =
ParallelIndexSupervisorTask.determineNumShardsFromCardinalityReport(
reports,
3
);
- Assert.assertEquals(2L, numShards);
+ Assert.assertEquals(
+ ImmutableMap.of(
+ Intervals.of("1970-01-01/P1D"),
+ 1,
+ Intervals.of("1970-01-02/P1D"),
+ 1
+ ),
+ intervalToNumShards
+ );
- numShards =
ParallelIndexSupervisorTask.determineNumShardsFromCardinalityReport(
+ intervalToNumShards =
ParallelIndexSupervisorTask.determineNumShardsFromCardinalityReport(
reports,
4
);
- Assert.assertEquals(1L, numShards);
+ Assert.assertEquals(
+ ImmutableMap.of(
+ Intervals.of("1970-01-01/P1D"),
+ 1,
+ Intervals.of("1970-01-02/P1D"),
+ 1
+ ),
+ intervalToNumShards
+ );
- numShards =
ParallelIndexSupervisorTask.determineNumShardsFromCardinalityReport(
+ intervalToNumShards =
ParallelIndexSupervisorTask.determineNumShardsFromCardinalityReport(
reports,
5
);
- Assert.assertEquals(1L, numShards);
+ Assert.assertEquals(
+ ImmutableMap.of(
+ Intervals.of("1970-01-01/P1D"),
+ 1,
+ Intervals.of("1970-01-02/P1D"),
+ 1
+ ),
+ intervalToNumShards
+ );
}
}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionMultiPhaseParallelIndexingTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionMultiPhaseParallelIndexingTest.java
index 2689584..e725ab9 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionMultiPhaseParallelIndexingTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionMultiPhaseParallelIndexingTest.java
@@ -31,8 +31,10 @@ import
org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexing.common.LockGranularity;
+import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.query.scan.ScanResultValue;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
@@ -86,35 +88,44 @@ public class HashPartitionMultiPhaseParallelIndexingTest
extends AbstractMultiPh
private static final Interval INTERVAL_TO_INDEX =
Intervals.of("2017-12/P1M");
@Parameterized.Parameters(
- name = "lockGranularity={0}, useInputFormatApi={1}, maxNumConcurrentSubTasks={2}, numShards={3}"
+ name = "lockGranularity={0}, useInputFormatApi={1}, maxNumConcurrentSubTasks={2}, intervalToIndex={3}, numShards={4}"
)
public static Iterable<Object[]> constructorFeeder()
{
return ImmutableList.of(
- new Object[]{LockGranularity.TIME_CHUNK, false, 2, 2},
- new Object[]{LockGranularity.TIME_CHUNK, true, 2, 2},
- new Object[]{LockGranularity.TIME_CHUNK, true, 1, 2},
- new Object[]{LockGranularity.SEGMENT, true, 2, 2},
- new Object[]{LockGranularity.TIME_CHUNK, true, 2, null},
- new Object[]{LockGranularity.TIME_CHUNK, true, 1, null},
- new Object[]{LockGranularity.SEGMENT, true, 2, null}
+ new Object[]{LockGranularity.TIME_CHUNK, false, 2, INTERVAL_TO_INDEX,
2},
+ new Object[]{LockGranularity.TIME_CHUNK, true, 2, INTERVAL_TO_INDEX,
2},
+ new Object[]{LockGranularity.TIME_CHUNK, true, 2, null, 2},
+ new Object[]{LockGranularity.TIME_CHUNK, true, 1, INTERVAL_TO_INDEX,
2},
+ new Object[]{LockGranularity.SEGMENT, true, 2, INTERVAL_TO_INDEX, 2},
+ new Object[]{LockGranularity.TIME_CHUNK, true, 2, INTERVAL_TO_INDEX,
null},
+ new Object[]{LockGranularity.TIME_CHUNK, true, 2, null, null},
+ new Object[]{LockGranularity.TIME_CHUNK, true, 1, INTERVAL_TO_INDEX,
null},
+ new Object[]{LockGranularity.SEGMENT, true, 2, INTERVAL_TO_INDEX, null}
);
}
private final int maxNumConcurrentSubTasks;
+ @Nullable
+ private final Interval intervalToIndex;
+ @Nullable
private final Integer numShards;
private File inputDir;
+ // sorted input intervals
+ private List<Interval> inputIntervals;
public HashPartitionMultiPhaseParallelIndexingTest(
LockGranularity lockGranularity,
boolean useInputFormatApi,
int maxNumConcurrentSubTasks,
+ @Nullable Interval intervalToIndex,
@Nullable Integer numShards
)
{
super(lockGranularity, useInputFormatApi);
this.maxNumConcurrentSubTasks = maxNumConcurrentSubTasks;
+ this.intervalToIndex = intervalToIndex;
this.numShards = numShards;
}
@@ -122,6 +133,7 @@ public class HashPartitionMultiPhaseParallelIndexingTest
extends AbstractMultiPh
public void setup() throws IOException
{
inputDir = temporaryFolder.newFolder("data");
+ final Set<Interval> intervals = new HashSet<>();
// set up data
for (int i = 0; i < 10; i++) {
try (final Writer writer =
@@ -129,6 +141,8 @@ public class HashPartitionMultiPhaseParallelIndexingTest
extends AbstractMultiPh
for (int j = 0; j < 10; j++) {
writer.write(StringUtils.format("2017-12-%d,%d,%d th test file\n", j + 1, i + 10, i));
writer.write(StringUtils.format("2017-12-%d,%d,%d th test file\n", j + 2, i + 11, i));
+ intervals.add(SEGMENT_GRANULARITY.bucket(DateTimes.of(StringUtils.format("2017-12-%d", j + 1))));
+ intervals.add(SEGMENT_GRANULARITY.bucket(DateTimes.of(StringUtils.format("2017-12-%d", j + 2))));
}
}
}
@@ -139,33 +153,70 @@ public class HashPartitionMultiPhaseParallelIndexingTest
extends AbstractMultiPh
writer.write(StringUtils.format("2017-12-%d,%d,%d th test file\n", i + 1, i + 10, i));
}
}
+ inputIntervals = new ArrayList<>(intervals);
+ inputIntervals.sort(Comparators.intervalsByStartThenEnd());
}
@Test
public void testRun() throws Exception
{
+ final Integer maxRowsPerSegment = numShards == null ? 10 : null;
final Set<DataSegment> publishedSegments = runTestTask(
- new HashedPartitionsSpec(null, numShards, ImmutableList.of("dim1",
"dim2")),
+ new HashedPartitionsSpec(
+ maxRowsPerSegment,
+ numShards,
+ ImmutableList.of("dim1", "dim2")
+ ),
TaskState.SUCCESS,
false
);
- // we don't specify maxRowsPerSegment so it defaults to DEFAULT_MAX_ROWS_PER_SEGMENT,
- // which is 5 million, so assume that there will only be 1 shard if numShards is not set.
- int expectedSegmentCount = numShards != null ? numShards : 1;
-
- assertHashedPartition(publishedSegments, expectedSegmentCount);
+ final Map<Interval, Integer> expectedIntervalToNumSegments =
computeExpectedIntervalToNumSegments(
+ maxRowsPerSegment,
+ numShards
+ );
+ assertHashedPartition(publishedSegments, expectedIntervalToNumSegments);
}
@Test
public void testRunWithHashPartitionFunction() throws Exception
{
+ final Integer maxRowsPerSegment = numShards == null ? 10 : null;
final Set<DataSegment> publishedSegments = runTestTask(
- new HashedPartitionsSpec(null, 2, ImmutableList.of("dim1", "dim2"),
HashPartitionFunction.MURMUR3_32_ABS),
+ new HashedPartitionsSpec(
+ maxRowsPerSegment,
+ numShards,
+ ImmutableList.of("dim1", "dim2"),
+ HashPartitionFunction.MURMUR3_32_ABS
+ ),
TaskState.SUCCESS,
false
);
- assertHashedPartition(publishedSegments, 2);
+ final Map<Interval, Integer> expectedIntervalToNumSegments =
computeExpectedIntervalToNumSegments(
+ maxRowsPerSegment,
+ numShards
+ );
+ assertHashedPartition(publishedSegments, expectedIntervalToNumSegments);
+ }
+
+ private Map<Interval, Integer> computeExpectedIntervalToNumSegments(
+ @Nullable Integer maxRowsPerSegment,
+ @Nullable Integer numShards
+ )
+ {
+ final Map<Interval, Integer> expectedIntervalToNumSegments = new
HashMap<>();
+ for (int i = 0; i < inputIntervals.size(); i++) {
+ if (numShards == null) {
+ if (i == 0 || i == inputIntervals.size() - 1) {
+ expectedIntervalToNumSegments.put(inputIntervals.get(i),
Math.round((float) 10 / maxRowsPerSegment));
+ } else {
+ expectedIntervalToNumSegments.put(inputIntervals.get(i),
Math.round((float) 20 / maxRowsPerSegment));
+ }
+ } else {
+ expectedIntervalToNumSegments.put(inputIntervals.get(i), numShards);
+ }
+ }
+ return expectedIntervalToNumSegments;
}
@Test
@@ -236,7 +287,7 @@ public class HashPartitionMultiPhaseParallelIndexingTest
extends AbstractMultiPh
DIMENSIONS_SPEC,
INPUT_FORMAT,
null,
- INTERVAL_TO_INDEX,
+ intervalToIndex,
inputDir,
"test_*",
partitionsSpec,
@@ -250,7 +301,7 @@ public class HashPartitionMultiPhaseParallelIndexingTest
extends AbstractMultiPh
null,
null,
PARSE_SPEC,
- INTERVAL_TO_INDEX,
+ intervalToIndex,
inputDir,
"test_*",
partitionsSpec,
@@ -261,15 +312,21 @@ public class HashPartitionMultiPhaseParallelIndexingTest
extends AbstractMultiPh
}
}
- private void assertHashedPartition(Set<DataSegment> publishedSegments, int
expectedNumSegments) throws IOException
+ private void assertHashedPartition(
+ Set<DataSegment> publishedSegments,
+ Map<Interval, Integer> expectedIntervalToNumSegments
+ ) throws IOException
{
final Map<Interval, List<DataSegment>> intervalToSegments = new
HashMap<>();
publishedSegments.forEach(
segment -> intervalToSegments.computeIfAbsent(segment.getInterval(), k -> new ArrayList<>()).add(segment)
);
+ Assert.assertEquals(new HashSet<>(inputIntervals),
intervalToSegments.keySet());
final File tempSegmentDir = temporaryFolder.newFolder();
- for (List<DataSegment> segmentsInInterval : intervalToSegments.values()) {
- Assert.assertEquals(expectedNumSegments, segmentsInInterval.size());
+ for (Entry<Interval, List<DataSegment>> entry :
intervalToSegments.entrySet()) {
+ Interval interval = entry.getKey();
+ List<DataSegment> segmentsInInterval = entry.getValue();
+
Assert.assertEquals(expectedIntervalToNumSegments.get(interval).intValue(),
segmentsInInterval.size());
for (DataSegment segment : segmentsInInterval) {
Assert.assertSame(HashBasedNumberedShardSpec.class,
segment.getShardSpec().getClass());
final HashBasedNumberedShardSpec shardSpec =
(HashBasedNumberedShardSpec) segment.getShardSpec();
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskSerdeTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskSerdeTest.java
index a80e3a8..b4a7125 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskSerdeTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskSerdeTest.java
@@ -92,25 +92,6 @@ public class ParallelIndexSupervisorTaskSerdeTest
}
@Test
- public void forceGuaranteedRollupWithMissingIntervals()
- {
- expectedException.expect(IllegalStateException.class);
- expectedException.expectMessage(
- "forceGuaranteedRollup is set but intervals is missing in granularitySpec"
- );
-
- Integer numShards = 2;
- new ParallelIndexSupervisorTaskBuilder()
- .ingestionSpec(
- new ParallelIndexIngestionSpecBuilder()
- .forceGuaranteedRollup(true)
- .partitionsSpec(new HashedPartitionsSpec(null, numShards,
null))
- .build()
- )
- .build();
- }
-
- @Test
public void forceGuaranteedRollupWithHashPartitionsMissingNumShards()
{
Integer numShards = null;
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionCardinalityTaskTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionCardinalityTaskTest.java
index 0ad4dde..0d65f86 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionCardinalityTaskTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionCardinalityTaskTest.java
@@ -109,19 +109,6 @@ public class PartialDimensionCardinalityTaskTest
}
@Test
- public void requiresGranularitySpecInputIntervals()
- {
- exception.expect(IllegalArgumentException.class);
- exception.expectMessage("Missing intervals in granularitySpec");
-
- DataSchema dataSchema =
ParallelIndexTestingFactory.createDataSchema(Collections.emptyList());
-
- new PartialDimensionCardinalityTaskBuilder()
- .dataSchema(dataSchema)
- .build();
- }
-
- @Test
public void serializesDeserializes()
{
PartialDimensionCardinalityTask task = new
PartialDimensionCardinalityTaskBuilder()
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTaskTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTaskTest.java
index 87de8c2..7dbb6e3 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTaskTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTaskTest.java
@@ -112,19 +112,6 @@ public class PartialDimensionDistributionTaskTest
}
@Test
- public void requiresGranularitySpecInputIntervals()
- {
- exception.expect(IllegalArgumentException.class);
- exception.expectMessage("Missing intervals in granularitySpec");
-
- DataSchema dataSchema =
ParallelIndexTestingFactory.createDataSchema(Collections.emptyList());
-
- new PartialDimensionDistributionTaskBuilder()
- .dataSchema(dataSchema)
- .build();
- }
-
- @Test
public void serializesDeserializes()
{
PartialDimensionDistributionTask task = new
PartialDimensionDistributionTaskBuilder()
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialGenericSegmentMergeTaskTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialGenericSegmentMergeTaskTest.java
index 71a4858..a579c18 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialGenericSegmentMergeTaskTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialGenericSegmentMergeTaskTest.java
@@ -24,7 +24,9 @@ import org.apache.druid.segment.TestHelper;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Before;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.ExpectedException;
import java.util.Collections;
@@ -54,6 +56,9 @@ public class PartialGenericSegmentMergeTaskTest extends
AbstractParallelIndexSup
.build()
);
+ @Rule
+ public ExpectedException exception = ExpectedException.none();
+
private PartialGenericSegmentMergeTask target;
@Before
@@ -82,4 +87,27 @@ public class PartialGenericSegmentMergeTaskTest extends
AbstractParallelIndexSup
String id = target.getId();
Assert.assertThat(id,
Matchers.startsWith(PartialGenericSegmentMergeTask.TYPE));
}
+
+ @Test
+ public void requiresGranularitySpecInputIntervals()
+ {
+ exception.expect(IllegalArgumentException.class);
+ exception.expectMessage("Missing intervals in granularitySpec");
+
+ new PartialGenericSegmentMergeTask(
+ ParallelIndexTestingFactory.AUTOMATIC_ID,
+ ParallelIndexTestingFactory.GROUP_ID,
+ ParallelIndexTestingFactory.TASK_RESOURCE,
+ ParallelIndexTestingFactory.SUPERVISOR_TASK_ID,
+ ParallelIndexTestingFactory.NUM_ATTEMPTS,
+ new PartialGenericSegmentMergeIngestionSpec(
+ ParallelIndexTestingFactory.createDataSchema(null),
+ IO_CONFIG,
+ new ParallelIndexTestingFactory.TuningConfigBuilder()
+ .partitionsSpec(PARTITIONS_SPEC)
+ .build()
+ ),
+ ParallelIndexTestingFactory.CONTEXT
+ );
+ }
}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTaskTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTaskTest.java
index ac32c81..845fc44 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTaskTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTaskTest.java
@@ -21,6 +21,7 @@ package org.apache.druid.indexing.common.task.batch.parallel;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import org.apache.druid.data.input.impl.JsonInputFormat;
import org.apache.druid.data.input.impl.LocalInputSource;
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
@@ -33,10 +34,13 @@ import org.hamcrest.Matchers;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Before;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.ExpectedException;
import java.io.File;
import java.util.List;
+import java.util.Map;
public class PartialHashSegmentGenerateTaskTest
{
@@ -48,6 +52,9 @@ public class PartialHashSegmentGenerateTaskTest
ParallelIndexTestingFactory.createDataSchema(ParallelIndexTestingFactory.INPUT_INTERVALS)
);
+ @Rule
+ public ExpectedException expectedException = ExpectedException.none();
+
private PartialHashSegmentGenerateTask target;
@Before
@@ -102,4 +109,62 @@ public class PartialHashSegmentGenerateTaskTest
Assert.assertEquals(expectedNumBuckets,
partitionAnalysis.getBucketAnalysis(interval).intValue());
}
}
+
+ @Test
+ public void
testCreateHashPartitionAnalysisFromPartitionsSpecWithNumShardsMap()
+ {
+ final List<Interval> intervals = ImmutableList.of(
+ Intervals.of("2020-01-01/2020-01-02"),
+ Intervals.of("2020-01-02/2020-01-03"),
+ Intervals.of("2020-01-03/2020-01-04")
+ );
+ final Map<Interval, Integer> intervalToNumShards = ImmutableMap.of(
+ Intervals.of("2020-01-01/2020-01-02"),
+ 1,
+ Intervals.of("2020-01-02/2020-01-03"),
+ 2,
+ Intervals.of("2020-01-03/2020-01-04"),
+ 3
+ );
+ final HashPartitionAnalysis partitionAnalysis =
PartialHashSegmentGenerateTask
+ .createHashPartitionAnalysisFromPartitionsSpec(
+ new UniformGranularitySpec(
+ Granularities.DAY,
+ Granularities.NONE,
+ intervals
+ ),
+ new HashedPartitionsSpec(null, null, null),
+ intervalToNumShards
+ );
+ Assert.assertEquals(intervals.size(),
partitionAnalysis.getNumTimePartitions());
+ for (Interval interval : intervals) {
+ Assert.assertEquals(
+ intervalToNumShards.get(interval).intValue(),
+ partitionAnalysis.getBucketAnalysis(interval).intValue()
+ );
+ }
+ }
+
+ @Test
+ public void requiresGranularitySpecInputIntervals()
+ {
+ expectedException.expect(IllegalArgumentException.class);
+ expectedException.expectMessage("Missing intervals in granularitySpec");
+
+ new PartialHashSegmentGenerateTask(
+ ParallelIndexTestingFactory.AUTOMATIC_ID,
+ ParallelIndexTestingFactory.GROUP_ID,
+ ParallelIndexTestingFactory.TASK_RESOURCE,
+ ParallelIndexTestingFactory.SUPERVISOR_TASK_ID,
+ ParallelIndexTestingFactory.NUM_ATTEMPTS,
+ ParallelIndexTestingFactory.createIngestionSpec(
+ new LocalInputSource(new File("baseDir"), "filer"),
+ new JsonInputFormat(null, null, null),
+ new ParallelIndexTestingFactory.TuningConfigBuilder().build(),
+ ParallelIndexTestingFactory.createDataSchema(null)
+ ),
+ ParallelIndexTestingFactory.CONTEXT,
+ null
+ );
+ }
}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PerfectRollupWorkerTaskTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PerfectRollupWorkerTaskTest.java
index af4cdf3..783aadc 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PerfectRollupWorkerTaskTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PerfectRollupWorkerTaskTest.java
@@ -64,17 +64,6 @@ public class PerfectRollupWorkerTaskTest
}
@Test
- public void requiresGranularitySpecInputIntervals()
- {
- exception.expect(IllegalArgumentException.class);
- exception.expectMessage("Missing intervals in granularitySpec");
-
- new PerfectRollupWorkerTaskBuilder()
- .granularitySpecInputIntervals(Collections.emptyList())
- .build();
- }
-
- @Test
public void succeedsWithValidPartitionsSpec()
{
new PerfectRollupWorkerTaskBuilder().build();
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/RangePartitionMultiPhaseParallelIndexingTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/RangePartitionMultiPhaseParallelIndexingTest.java
index 20e4f34..598aed9 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/RangePartitionMultiPhaseParallelIndexingTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/RangePartitionMultiPhaseParallelIndexingTest.java
@@ -108,15 +108,16 @@ public class RangePartitionMultiPhaseParallelIndexingTest extends AbstractMultiP
0
);
- @Parameterized.Parameters(name = "{0}, useInputFormatApi={1}, maxNumConcurrentSubTasks={2}, useMultiValueDim={3}")
+ @Parameterized.Parameters(name = "{0}, useInputFormatApi={1}, maxNumConcurrentSubTasks={2}, useMultiValueDim={3}, intervalToIndex={4}")
public static Iterable<Object[]> constructorFeeder()
{
return ImmutableList.of(
- new Object[]{LockGranularity.TIME_CHUNK, !USE_INPUT_FORMAT_API, 2, !USE_MULTIVALUE_DIM},
- new Object[]{LockGranularity.TIME_CHUNK, USE_INPUT_FORMAT_API, 2, !USE_MULTIVALUE_DIM},
- new Object[]{LockGranularity.SEGMENT, USE_INPUT_FORMAT_API, 2, !USE_MULTIVALUE_DIM},
- new Object[]{LockGranularity.SEGMENT, USE_INPUT_FORMAT_API, 1, !USE_MULTIVALUE_DIM}, // will spawn subtask
- new Object[]{LockGranularity.SEGMENT, USE_INPUT_FORMAT_API, 2, USE_MULTIVALUE_DIM} // expected to fail
+ new Object[]{LockGranularity.TIME_CHUNK, !USE_INPUT_FORMAT_API, 2, !USE_MULTIVALUE_DIM, INTERVAL_TO_INDEX},
+ new Object[]{LockGranularity.TIME_CHUNK, USE_INPUT_FORMAT_API, 2, !USE_MULTIVALUE_DIM, INTERVAL_TO_INDEX},
+ new Object[]{LockGranularity.TIME_CHUNK, USE_INPUT_FORMAT_API, 2, !USE_MULTIVALUE_DIM, null},
+ new Object[]{LockGranularity.SEGMENT, USE_INPUT_FORMAT_API, 2, !USE_MULTIVALUE_DIM, INTERVAL_TO_INDEX},
+ new Object[]{LockGranularity.SEGMENT, USE_INPUT_FORMAT_API, 1, !USE_MULTIVALUE_DIM, INTERVAL_TO_INDEX}, // will spawn subtask
+ new Object[]{LockGranularity.SEGMENT, USE_INPUT_FORMAT_API, 2, USE_MULTIVALUE_DIM, INTERVAL_TO_INDEX} // expected to fail
);
}
@@ -132,17 +133,21 @@ public class RangePartitionMultiPhaseParallelIndexingTest extends AbstractMultiP
private final int maxNumConcurrentSubTasks;
private final boolean useMultivalueDim;
+ @Nullable
+ private final Interval intervalToIndex;
public RangePartitionMultiPhaseParallelIndexingTest(
LockGranularity lockGranularity,
boolean useInputFormatApi,
int maxNumConcurrentSubTasks,
- boolean useMultivalueDim
+ boolean useMultivalueDim,
+ @Nullable Interval intervalToIndex
)
{
super(lockGranularity, useInputFormatApi);
this.maxNumConcurrentSubTasks = maxNumConcurrentSubTasks;
this.useMultivalueDim = useMultivalueDim;
+ this.intervalToIndex = intervalToIndex;
}
@Before
@@ -309,7 +314,7 @@ public class RangePartitionMultiPhaseParallelIndexingTest extends AbstractMultiP
DIMENSIONS_SPEC,
INPUT_FORMAT,
null,
- INTERVAL_TO_INDEX,
+ intervalToIndex,
inputDir,
TEST_FILE_NAME_PREFIX + "*",
partitionsSpec,
@@ -323,7 +328,7 @@ public class RangePartitionMultiPhaseParallelIndexingTest extends AbstractMultiP
null,
null,
PARSE_SPEC,
- INTERVAL_TO_INDEX,
+ intervalToIndex,
inputDir,
TEST_FILE_NAME_PREFIX + "*",
partitionsSpec,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]