jon-wei commented on a change in pull request #7547: Add support minor
compaction with segment locking
URL: https://github.com/apache/incubator-druid/pull/7547#discussion_r297856978
##########
File path:
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSubTask.java
##########
@@ -195,63 +205,155 @@ public TaskStatus run(final TaskToolbox toolbox) throws Exception
ingestionSchema.getTuningConfig().getChatHandlerTimeout(),
ingestionSchema.getTuningConfig().getChatHandlerNumRetries()
);
- final List<DataSegment> pushedSegments = generateAndPushSegments(
+ final Set<DataSegment> pushedSegments = generateAndPushSegments(
toolbox,
taskClient,
firehoseFactory,
firehoseTempDir
);
- taskClient.report(supervisorTaskId, pushedSegments);
+
+ // Find inputSegments overshadowed by pushedSegments
+ final Set<DataSegment> allSegments = new HashSet<>(getAllInputSegments());
+ allSegments.addAll(pushedSegments);
+ final VersionedIntervalTimeline<String, DataSegment> timeline = VersionedIntervalTimeline.forSegments(allSegments);
+ final Set<DataSegment> oldSegments = timeline.findFullyOvershadowed()
+ .stream()
+ .flatMap(holder -> holder.getObject().stream())
+ .map(PartitionChunk::getObject)
+ .collect(Collectors.toSet());
+ taskClient.report(supervisorTaskId, oldSegments, pushedSegments);
return TaskStatus.success(getId());
}
- private void tryAcquireExclusiveSurrogateLocks(
- TaskActionClient client,
- SortedSet<Interval> intervals
- )
+ @Override
+ public boolean requireLockInputSegments()
+ {
+ return !ingestionSchema.getIOConfig().isAppendToExisting();
+ }
+
+ @Override
+ public List<DataSegment> findInputSegments(TaskActionClient taskActionClient, List<Interval> intervals)
throws IOException
{
- for (Interval interval : Tasks.computeCompactIntervals(intervals)) {
- Preconditions.checkNotNull(
- client.submit(
- new SurrogateAction<>(supervisorTaskId, new LockTryAcquireAction(TaskLockType.EXCLUSIVE, interval))
- ),
- "Cannot acquire a lock for interval[%s]", interval
- );
- }
+ return IndexTask.findInputSegments(
+ getDataSource(),
+ taskActionClient,
+ intervals,
+ ingestionSchema.getIOConfig().getFirehoseFactory()
+ );
}
- private SegmentAllocator createSegmentAllocator(
- TaskToolbox toolbox,
- ParallelIndexTaskClient taskClient,
- ParallelIndexIngestionSpec ingestionSchema
- )
+ @Override
+ public boolean checkIfChangeSegmentGranularity(List<Interval> intervalOfExistingSegments)
{
- final DataSchema dataSchema = ingestionSchema.getDataSchema();
- final ParallelIndexIOConfig ioConfig = ingestionSchema.getIOConfig();
- if (ioConfig.isAppendToExisting()) {
- return new ActionBasedSegmentAllocator(
- toolbox.getTaskActionClient(),
- dataSchema,
- (schema, row, sequenceName, previousSegmentId, skipSegmentLineageCheck) -> new SurrogateAction<>(
- supervisorTaskId,
- new SegmentAllocateAction(
- schema.getDataSource(),
- row.getTimestamp(),
- schema.getGranularitySpec().getQueryGranularity(),
- schema.getGranularitySpec().getSegmentGranularity(),
- sequenceName,
- previousSegmentId,
- skipSegmentLineageCheck
- )
- )
- );
+ final Granularity segmentGranularity = ingestionSchema.getDataSchema().getGranularitySpec().getSegmentGranularity();
+ return intervalOfExistingSegments.stream().anyMatch(interval -> !segmentGranularity.match(interval));
+ }
+
+ @Override
+ public boolean isPerfectRollup()
+ {
+ return false;
+ }
+
+ @Nullable
+ @Override
+ public Granularity getSegmentGranularity(Interval interval)
+ {
+ final GranularitySpec granularitySpec = ingestionSchema.getDataSchema().getGranularitySpec();
+ if (granularitySpec instanceof ArbitraryGranularitySpec) {
+ return null;
} else {
- return (row, sequenceName, previousSegmentId, skipSegmentLineageCheck) -> taskClient.allocateSegment(
- supervisorTaskId,
- row.getTimestamp()
- );
+ return granularitySpec.getSegmentGranularity();
+ }
+ }
+
+ @VisibleForTesting
+ SegmentAllocator createSegmentAllocator(TaskToolbox toolbox, ParallelIndexTaskClient taskClient)
+ {
+ return new WrappingSegmentAllocator(toolbox, taskClient);
+ }
+
+ private class WrappingSegmentAllocator implements SegmentAllocator
Review comment:
Suggest calling this `LazyInitializationSegmentAllocator` instead
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]