somu-imply commented on code in PR #12443:
URL: https://github.com/apache/druid/pull/12443#discussion_r869455415
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java:
##########
@@ -901,13 +906,43 @@ public static Map<Interval, Integer>
determineNumShardsFromCardinalityReport(
{
// aggregate all the sub-reports
Map<Interval, Union> finalCollectors = mergeCardinalityReports(reports);
+ return computeIntervalToNumShards(maxRowsPerSegment, finalCollectors);
+ }
+ @Nonnull
+ @VisibleForTesting
+ static Map<Interval, Integer> computeIntervalToNumShards(
+ int maxRowsPerSegment,
+ Map<Interval, Union> finalCollectors
+ )
+ {
return CollectionUtils.mapValues(
finalCollectors,
union -> {
final double estimatedCardinality = union.getEstimate();
- // determine numShards based on maxRowsPerSegment and the cardinality
- final long estimatedNumShards = Math.round(estimatedCardinality /
maxRowsPerSegment);
+ final long estimatedNumShards;
+ if (estimatedCardinality <= 0) {
+ // I don't think we can use the estimate in any way being
negative, seven sounds like a nice prime number
+ // it is ok if we end up not filling them all, the ingestion code
handles that
+ // Seven on the other hand will at least create some shards rather
than potentially a single huge one
+ estimatedNumShards = 7L;
Review Comment:
instead of setting this 7 here, we should move this as a final static
variable up top. Something like `DEFAULT_NUM_SHARDS`
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java:
##########
@@ -901,13 +906,43 @@ public static Map<Interval, Integer>
determineNumShardsFromCardinalityReport(
{
// aggregate all the sub-reports
Map<Interval, Union> finalCollectors = mergeCardinalityReports(reports);
+ return computeIntervalToNumShards(maxRowsPerSegment, finalCollectors);
+ }
+ @Nonnull
+ @VisibleForTesting
+ static Map<Interval, Integer> computeIntervalToNumShards(
+ int maxRowsPerSegment,
+ Map<Interval, Union> finalCollectors
+ )
+ {
return CollectionUtils.mapValues(
finalCollectors,
union -> {
final double estimatedCardinality = union.getEstimate();
- // determine numShards based on maxRowsPerSegment and the cardinality
- final long estimatedNumShards = Math.round(estimatedCardinality /
maxRowsPerSegment);
+ final long estimatedNumShards;
+ if (estimatedCardinality <= 0) {
+ // I don't think we can use the estimate in any way being
negative, seven sounds like a nice prime number
+ // it is ok if we end up not filling them all, the ingestion code
handles that
+ // Seven on the other hand will at least create some shards rather
than potentially a single huge one
+ estimatedNumShards = 7L;
Review Comment:
Just a question, why we went with 7 and say why not 3 ? Any rationale behind
it ?
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java:
##########
@@ -703,6 +704,10 @@ TaskStatus runHashPartitionMultiPhaseParallel(TaskToolbox
toolbox) throws Except
cardinalityRunner.getReports().values(),
effectiveMaxRowsPerSegment
);
+
+ // This is for potential debugging in case we suspect bad estimation
of cardinalities etc,
+ LOG.debug("intervalToNumShards: %s", intervalToNumShards.toString());
+
Review Comment:
Apart from putting it in the log, do we need a metric around it too ?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]