gianm commented on code in PR #13280:
URL: https://github.com/apache/druid/pull/13280#discussion_r1013179707
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java:
##########
@@ -725,44 +728,45 @@ private static ParallelIndexIOConfig createIoConfig(
);
}
- private static NonnullPair<Map<DataSegment, File>, List<TimelineObjectHolder<String, DataSegment>>> prepareSegments(
+ private static List<TimelineObjectHolder<String, DataSegment>> retrieveRelevantTimelineHolders(
TaskToolbox toolbox,
SegmentProvider segmentProvider,
LockGranularity lockGranularityInUse
- ) throws IOException, SegmentLoadingException
+ ) throws IOException
{
final List<DataSegment> usedSegments =
segmentProvider.findSegments(toolbox.getTaskActionClient());
segmentProvider.checkSegments(lockGranularityInUse, usedSegments);
- final Map<DataSegment, File> segmentFileMap = toolbox.fetchSegments(usedSegments);
final List<TimelineObjectHolder<String, DataSegment>> timelineSegments =
VersionedIntervalTimeline
.forSegments(usedSegments)
.lookup(segmentProvider.interval);
- return new NonnullPair<>(segmentFileMap, timelineSegments);
+ return timelineSegments;
}
private static DataSchema createDataSchema(
String dataSource,
- List<Pair<QueryableIndex, DataSegment>> queryableIndexAndSegments,
+ Interval totalInterval,
+ Iterable<Pair<DataSegment, Supplier<ResourceHolder<QueryableIndex>>>> segments,
@Nullable DimensionsSpec dimensionsSpec,
@Nullable ClientCompactionTaskTransformSpec transformSpec,
@Nullable AggregatorFactory[] metricsSpec,
@Nonnull ClientCompactionTaskGranularitySpec granularitySpec
)
{
- // check index metadata &
- // Decide which values to propagate (i.e. carry over) for rollup & queryGranularity
- final SettableSupplier<Boolean> rollup = new SettableSupplier<>();
- final SettableSupplier<Granularity> queryGranularity = new SettableSupplier<>();
- decideRollupAndQueryGranularityCarryOver(rollup, queryGranularity, queryableIndexAndSegments);
-
- final Interval totalInterval = JodaUtils.umbrellaInterval(
- queryableIndexAndSegments.stream().map(p -> p.rhs.getInterval()).collect(Collectors.toList())
+ // Check index metadata & decide which values to propagate (i.e. carry over) for rollup & queryGranularity
+ final ExistingSegmentAnalyzer existingSegmentAnalyzer = new ExistingSegmentAnalyzer(
+ segments,
+ granularitySpec.isRollup() == null,
Review Comment:
"ExistingSegmentAnalyzer only has to deal with the specs of the downloaded
segments" is what I was thinking. I wanted to pass in the minimal amount of
information to ExistingSegmentAnalyzer, in order to keep its logic as simple as
possible.
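A minimal sketch of the "minimal inputs" idea from the comment, under stated assumptions: the caller interprets the user's spec and passes only a boolean into the analyzer (analogous to `granularitySpec.isRollup() == null` in the diff), so the analyzer only ever looks at segment metadata. Every name below (`MinimalAnalyzerSketch`, `SegmentInfo`, `RollupAnalyzer`, `resolveRollup`) is hypothetical, and the carry-over rule (rollup stays true only if every segment recorded `rollup = true`) is an assumption for illustration. This is not Druid's `ExistingSegmentAnalyzer`.

```java
import java.util.List;

// Hypothetical sketch, not Druid code: the analyzer receives only per-segment
// metadata plus a flag saying whether the caller still needs a rollup value.
// It never sees the user-supplied spec.
public class MinimalAnalyzerSketch {

  // Hypothetical stand-in for metadata read from one downloaded segment;
  // rollup is null when the segment did not record a value.
  public static final class SegmentInfo {
    final Boolean rollup;

    public SegmentInfo(Boolean rollup) {
      this.rollup = rollup;
    }
  }

  static final class RollupAnalyzer {
    private final boolean needRollup;
    private final boolean rollup;

    // needRollup plays the role of "granularitySpec.isRollup() == null" in the
    // diff: the caller decides whether analysis is needed, not the analyzer.
    RollupAnalyzer(List<SegmentInfo> segments, boolean needRollup) {
      this.needRollup = needRollup;
      this.rollup = needRollup && allSegmentsRolledUp(segments);
    }

    // Assumed carry-over rule, for illustration only: rollup is carried over
    // as true only if there is at least one segment and every one recorded true.
    private static boolean allSegmentsRolledUp(List<SegmentInfo> segments) {
      if (segments.isEmpty()) {
        return false;
      }
      for (SegmentInfo segment : segments) {
        if (!Boolean.TRUE.equals(segment.rollup)) {
          return false;
        }
      }
      return true;
    }

    boolean getRollup() {
      if (!needRollup) {
        throw new IllegalStateException("rollup was not requested from the analyzer");
      }
      return rollup;
    }
  }

  // Caller side: the spec is interpreted here, and only a boolean crosses into
  // the analyzer. An explicit spec value always wins over the segments.
  public static boolean resolveRollup(Boolean specRollup, List<SegmentInfo> segments) {
    RollupAnalyzer analyzer = new RollupAnalyzer(segments, specRollup == null);
    return specRollup != null ? specRollup : analyzer.getRollup();
  }

  public static void main(String[] args) {
    List<SegmentInfo> segments = List.of(new SegmentInfo(true), new SegmentInfo(false));
    System.out.println(resolveRollup(null, segments)); // false: segments disagree
    System.out.println(resolveRollup(true, segments)); // true: spec wins
  }
}
```

The design point is that `RollupAnalyzer` has no code path that reads the spec, so testing it only requires lists of segment metadata, and the precedence between the spec and the segments lives in a single place (`resolveRollup`).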
--
This is an automated message from the Apache Git Service.