jihoonson commented on a change in pull request #9274: Refactoring some codes around ingestion
URL: https://github.com/apache/druid/pull/9274#discussion_r373924993
##########
File path:
indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
##########
@@ -598,59 +612,74 @@ public TaskStatus runTask(final TaskToolbox toolbox)
    *
    * @return a map indicating how many shardSpecs need to be created per interval.
    */
-  private Map<Interval, Pair<ShardSpecFactory, Integer>> determineShardSpecs(
+  private PartitionAnalysis determineShardSpecs(
       final TaskToolbox toolbox,
       final InputSource inputSource,
       final File tmpDir,
-      final PartitionsSpec nonNullPartitionsSpec
+      @Nonnull final PartitionsSpec partitionsSpec
   ) throws IOException
   {
     final ObjectMapper jsonMapper = toolbox.getJsonMapper();
-    final IndexTuningConfig tuningConfig = ingestionSchema.getTuningConfig();
-    final IndexIOConfig ioConfig = ingestionSchema.getIOConfig();
     final GranularitySpec granularitySpec = ingestionSchema.getDataSchema().getGranularitySpec();
     // Must determine intervals if unknown, since we acquire all locks before processing any data.
     final boolean determineIntervals = !granularitySpec.bucketIntervals().isPresent();
     // Must determine partitions if rollup is guaranteed and the user didn't provide a specific value.
-    final boolean determineNumPartitions = nonNullPartitionsSpec.needsDeterminePartitions(false);
+    final boolean determineNumPartitions = partitionsSpec.needsDeterminePartitions(false);
     // if we were given number of shards per interval and the intervals, we don't need to scan the data
     if (!determineNumPartitions && !determineIntervals) {
       log.info("Skipping determine partition scan");
-      return createShardSpecWithoutInputScan(
-          granularitySpec,
-          ioConfig,
-          tuningConfig,
-          nonNullPartitionsSpec
-      );
+      if (partitionsSpec.getType() == SecondaryPartitionType.HASH) {
+        return PartialHashSegmentGenerateTask.createHashPartitionAnalysisFromPartitionsSpec(
+            granularitySpec,
+            (HashedPartitionsSpec) partitionsSpec
+        );
+      } else if (partitionsSpec.getType() == SecondaryPartitionType.LINEAR) {
+        return createLinearPartitionAnalysis(granularitySpec, (DynamicPartitionsSpec) partitionsSpec);
+      } else {
+        throw new UOE("%s", partitionsSpec.getClass().getName());
+      }
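The new branch above skips the input scan only when both the bucket intervals and the partition count are already known, then dispatches on the secondary partition type and rejects anything else. A minimal, self-contained sketch of that control flow follows. It assumes simplified stand-in types: `PartitionType`, `Spec`, `Analysis`, `ShardSpecPlanner` and their methods are hypothetical names for illustration, not Druid's API, and the `needsDeterminePartitions` rule here is a simplification rather than Druid's exact logic.

```java
import java.util.List;

// Hypothetical stand-in for Druid's SecondaryPartitionType (simplified).
enum PartitionType { HASH, LINEAR, RANGE }

// Hypothetical, simplified partitions spec carrying only what this sketch needs.
final class Spec {
  final PartitionType type;
  final Integer numShards; // null means the user did not specify a shard count

  Spec(PartitionType type, Integer numShards) {
    this.type = type;
    this.numShards = numShards;
  }

  // Simplified rule (an assumption, not Druid's exact logic): hash needs a scan
  // only without a shard count, range always does, linear never does.
  boolean needsDeterminePartitions() {
    switch (type) {
      case HASH:
        return numShards == null;
      case RANGE:
        return true;
      default:
        return false;
    }
  }
}

// Hypothetical result type standing in for PartitionAnalysis.
final class Analysis {
  final String description;
  final List<String> intervals;

  Analysis(String description, List<String> intervals) {
    this.description = description;
    this.intervals = intervals;
  }
}

final class ShardSpecPlanner {
  // knownIntervals == null plays the role of an absent bucketIntervals().
  static Analysis determine(Spec spec, List<String> knownIntervals) {
    final boolean determineIntervals = knownIntervals == null;
    final boolean determineNumPartitions = spec.needsDeterminePartitions();
    if (!determineNumPartitions && !determineIntervals) {
      // Everything is known up front, so the input scan can be skipped.
      return analyzeWithoutScan(spec, knownIntervals);
    }
    // Placeholder for the input scan, which this sketch does not model.
    return new Analysis("scan required", knownIntervals);
  }

  static Analysis analyzeWithoutScan(Spec spec, List<String> intervals) {
    switch (spec.type) {
      case HASH:
        return new Analysis("hash, " + spec.numShards + " shards per interval", intervals);
      case LINEAR:
        return new Analysis("linear", intervals);
      default:
        // Same idea as throwing UOE for an unsupported partition type.
        throw new UnsupportedOperationException(spec.type.name());
    }
  }
}
```

Using a `switch` on the enum instead of an `if`/`else if` chain is a design choice of this sketch; either way, the fall-through case throws so that a newly added partition type fails loudly instead of silently producing an empty analysis.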
Review comment:
I think it's a good idea. Unfortunately, `CompletePartitionAnalysis` needs `TaskToolbox`, which lives in the `indexing-service` module, so the partition analysis classes can't move to the `core` module yet.
However, I think I can remove the `TaskToolbox` dependency from `CompletePartitionAnalysis` in my next PR, so I'll do it then.
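One common way to break a dependency like this is to have the analysis class depend on a narrow interface that it owns, and let the `indexing-service` side adapt `TaskToolbox` to that interface. The sketch below illustrates that pattern only; `VersionFinder`, `PartitionPlan`, `segmentIds`, and the segment-id string format are hypothetical and are not what `CompletePartitionAnalysis` actually contains.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical narrow dependency that a core-module class could own.
interface VersionFinder {
  String findVersion(String interval);
}

// Hypothetical "core" class: it depends only on VersionFinder, not on TaskToolbox.
final class PartitionPlan {
  // Sorted by interval string so the output order is deterministic.
  private final Map<String, Integer> numBucketsPerInterval = new TreeMap<>();

  void addInterval(String interval, int numBuckets) {
    numBucketsPerInterval.put(interval, numBuckets);
  }

  // Builds illustrative ids of the form dataSource_interval_version_partitionId.
  List<String> segmentIds(String dataSource, VersionFinder versionFinder) {
    List<String> ids = new ArrayList<>();
    for (Map.Entry<String, Integer> e : numBucketsPerInterval.entrySet()) {
      String version = versionFinder.findVersion(e.getKey());
      for (int partitionId = 0; partitionId < e.getValue(); partitionId++) {
        ids.add(dataSource + "_" + e.getKey() + "_" + version + "_" + partitionId);
      }
    }
    return ids;
  }
}
```

On the `indexing-service` side, the caller would pass a lambda backed by whatever the task already has (for example, versions from acquired locks), so `PartitionPlan` never sees `TaskToolbox` and could live in `core` without creating a dependency from `core` back onto `indexing-service`.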