ccaominh commented on a change in pull request #9274: Refactoring some codes
around ingestion
URL: https://github.com/apache/druid/pull/9274#discussion_r372148217
##########
File path:
indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
##########
@@ -598,59 +612,74 @@ public TaskStatus runTask(final TaskToolbox toolbox)
*
* @return a map indicating how many shardSpecs need to be created per interval.
*/
- private Map<Interval, Pair<ShardSpecFactory, Integer>> determineShardSpecs(
+ private PartitionAnalysis determineShardSpecs(
final TaskToolbox toolbox,
final InputSource inputSource,
final File tmpDir,
- final PartitionsSpec nonNullPartitionsSpec
+ @Nonnull final PartitionsSpec partitionsSpec
) throws IOException
{
final ObjectMapper jsonMapper = toolbox.getJsonMapper();
- final IndexTuningConfig tuningConfig = ingestionSchema.getTuningConfig();
- final IndexIOConfig ioConfig = ingestionSchema.getIOConfig();
final GranularitySpec granularitySpec = ingestionSchema.getDataSchema().getGranularitySpec();
// Must determine intervals if unknown, since we acquire all locks before processing any data.
final boolean determineIntervals = !granularitySpec.bucketIntervals().isPresent();
// Must determine partitions if rollup is guaranteed and the user didn't provide a specific value.
- final boolean determineNumPartitions = nonNullPartitionsSpec.needsDeterminePartitions(false);
+ final boolean determineNumPartitions = partitionsSpec.needsDeterminePartitions(false);
// if we were given number of shards per interval and the intervals, we don't need to scan the data
if (!determineNumPartitions && !determineIntervals) {
log.info("Skipping determine partition scan");
- return createShardSpecWithoutInputScan(
- granularitySpec,
- ioConfig,
- tuningConfig,
- nonNullPartitionsSpec
- );
+ if (partitionsSpec.getType() == SecondaryPartitionType.HASH) {
+ return PartialHashSegmentGenerateTask.createHashPartitionAnalysisFromPartitionsSpec(
+ granularitySpec,
+ (HashedPartitionsSpec) partitionsSpec
+ );
+ } else if (partitionsSpec.getType() == SecondaryPartitionType.LINEAR) {
+ return createLinearPartitionAnalysis(granularitySpec, (DynamicPartitionsSpec) partitionsSpec);
+ } else {
+ throw new UOE("%s", partitionsSpec.getClass().getName());
+ }
Review comment:
What do you think about adding a method to the partition spec interface so
that adding new partition spec types in the future does not require modifying
this code?
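
To illustrate the idea, here is a rough sketch. It uses simplified stand-in types rather than Druid's real classes, and the method name `createPartitionAnalysis`, the `HashedSpec`/`DynamicSpec` classes, and the string-based intervals are all hypothetical. The point is only that each spec builds its own analysis, so the caller needs no branch on the partition type:

```java
import java.util.List;

// Simplified, hypothetical stand-ins for illustration; not Druid's actual API.
class PartitionAnalysisSketch {
  interface PartitionAnalysis {
    String describe();
  }

  interface PartitionsSpec {
    // Hypothetical method: each spec knows how to build its own analysis
    // when no input scan is needed.
    PartitionAnalysis createPartitionAnalysis(List<String> intervals);
  }

  static final class HashedSpec implements PartitionsSpec {
    private final int numShards;

    HashedSpec(int numShards) {
      this.numShards = numShards;
    }

    @Override
    public PartitionAnalysis createPartitionAnalysis(List<String> intervals) {
      return () -> "hash:" + intervals.size() + "x" + numShards;
    }
  }

  static final class DynamicSpec implements PartitionsSpec {
    @Override
    public PartitionAnalysis createPartitionAnalysis(List<String> intervals) {
      return () -> "linear:" + intervals.size();
    }
  }

  // The caller delegates to the spec instead of using if/else on getType(),
  // so a new spec type does not require touching this method.
  static PartitionAnalysis determineWithoutScan(PartitionsSpec spec, List<String> intervals) {
    return spec.createPartitionAnalysis(intervals);
  }
}
```

With something along these lines, the `if`/`else if`/`throw new UOE` chain above could collapse to a single call on `partitionsSpec`, and an unsupported spec would be handled by its own implementation rather than by this method.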