jon-wei commented on a change in pull request #7547: Add support for minor
compaction with segment locking
URL: https://github.com/apache/incubator-druid/pull/7547#discussion_r306596112
##########
File path:
indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
##########
@@ -180,198 +233,130 @@ private boolean tryTimeChunkLock(TaskActionClient
client, List<Interval> interva
return true;
}
- boolean tryLockWithSegments(TaskActionClient client, List<DataSegment>
segments, boolean isInitialRequest)
- throws IOException
+ @Nullable
+ public static Granularity findGranularityFromSegments(List<DataSegment>
segments)
+ {
+ if (segments.isEmpty()) {
+ return null;
+ }
+ final Period firstSegmentPeriod = segments.get(0).getInterval().toPeriod();
+ final boolean allHasSameGranularity = segments
+ .stream()
+ .allMatch(segment ->
firstSegmentPeriod.equals(segment.getInterval().toPeriod()));
+ if (allHasSameGranularity) {
+ return
GranularityType.fromPeriod(firstSegmentPeriod).getDefaultGranularity();
+ } else {
+ return null;
+ }
+ }
+
+ LockGranularityDeterminResult determineSegmentGranularity(List<DataSegment>
segments)
{
if (segments.isEmpty()) {
- changeSegmentGranularity = false;
- allInputSegments = Collections.emptySet();
- overwritingRootGenPartitions = Collections.emptyMap();
- return true;
+ log.info("Using segment lock for empty segments");
+ // Set useSegmentLock even though we don't get any locks.
+    // Note that we should get any lock before data ingestion if we are
supposed to use timeChunk lock.
+ return new LockGranularityDeterminResult(LockGranularity.SEGMENT, null,
Collections.emptyList());
}
- if (requireLockInputSegments()) {
- // Create a timeline to find latest segments only
+ if (requireLockExistingSegments()) {
+ final Granularity granularityFromSegments =
findGranularityFromSegments(segments);
+ @Nullable
+ final Granularity segmentGranularityFromSpec = getSegmentGranularity();
final List<Interval> intervals =
segments.stream().map(DataSegment::getInterval).collect(Collectors.toList());
- changeSegmentGranularity = checkIfChangeSegmentGranularity(intervals);
- if (changeSegmentGranularity) {
- return tryTimeChunkLock(client, intervals);
+ if (granularityFromSegments == null
+ || segmentGranularityFromSpec != null
+ && (!granularityFromSegments.equals(segmentGranularityFromSpec)
+ || segments.stream().anyMatch(segment ->
!segmentGranularityFromSpec.isAligned(segment.getInterval())))) {
+ // This case is one of the followings:
+ // 1) Segments have different granularities.
+      // 2) Segment granularity in ingestion spec is different from the one
of existing segments.
+ // 3) Some existing segments are not aligned with the segment
granularity in the ingestion spec.
+ log.info("Detected segmentGranularity change. Using timeChunk lock");
+ return new LockGranularityDeterminResult(LockGranularity.TIME_CHUNK,
intervals, null);
} else {
- final List<DataSegment> segmentsToLock;
+ // Use segment lock
+ // Create a timeline to find latest segments only
final VersionedIntervalTimeline<String, DataSegment> timeline =
VersionedIntervalTimeline.forSegments(
segments
);
- segmentsToLock = timeline.lookup(JodaUtils.umbrellaInterval(intervals))
- .stream()
- .map(TimelineObjectHolder::getObject)
- .flatMap(partitionHolder ->
StreamSupport.stream(
- partitionHolder.spliterator(),
- false
- ))
- .map(PartitionChunk::getObject)
- .collect(Collectors.toList());
- if (allInputSegments == null) {
- allInputSegments = new HashSet<>(segmentsToLock);
- overwritingRootGenPartitions = new HashMap<>();
- }
-
- final Map<Interval, List<DataSegment>> intervalToSegments = new
HashMap<>();
- for (DataSegment segment : segmentsToLock) {
- intervalToSegments.computeIfAbsent(segment.getInterval(), k -> new
ArrayList<>()).add(segment);
- }
- intervalToSegments.values().forEach(
- segmentsToCheck ->
verifyAndFindRootPartitionRangeAndMinorVersion(segmentsToCheck,
isInitialRequest)
- );
- final Closer lockCloserOnError = Closer.create();
- for (Entry<Interval, List<DataSegment>> entry :
intervalToSegments.entrySet()) {
- final Interval interval = entry.getKey();
- final Set<Integer> partitionIds = entry.getValue().stream()
- .map(s ->
s.getShardSpec().getPartitionNum())
- .collect(Collectors.toSet());
- final List<LockResult> lockResults = client.submit(
- new SegmentLockTryAcquireAction(
- TaskLockType.EXCLUSIVE,
- interval,
- entry.getValue().get(0).getMajorVersion(),
- partitionIds
- )
- );
-
- lockResults.stream()
- .filter(LockResult::isOk)
- .map(result -> (SegmentLock) result.getTaskLock())
- .forEach(segmentLock -> lockCloserOnError.register(() ->
client.submit(
- new
SegmentLockReleaseAction(segmentLock.getInterval(),
segmentLock.getPartitionId())
- )));
-
- if (isInitialRequest && (lockResults.isEmpty() ||
lockResults.stream().anyMatch(result -> !result.isOk()))) {
- lockCloserOnError.close();
- return false;
- }
- }
- return true;
+ final List<DataSegment> segmentsToLock = timeline
+ .lookup(JodaUtils.umbrellaInterval(intervals))
+ .stream()
+ .map(TimelineObjectHolder::getObject)
+ .flatMap(partitionHolder ->
StreamSupport.stream(partitionHolder.spliterator(), false))
+ .map(PartitionChunk::getObject)
+ .collect(Collectors.toList());
+ log.info("No segmentGranularity change detected and it's not perfect
rollup. Using segment lock");
+ return new LockGranularityDeterminResult(LockGranularity.SEGMENT,
null, segmentsToLock);
}
} else {
- changeSegmentGranularity = false;
- allInputSegments = Collections.emptySet();
- overwritingRootGenPartitions = Collections.emptyMap();
- return true;
+ // Set useSegmentLock even though we don't get any locks.
+      // Note that we should get any lock before data ingestion if we are
supposed to use timeChunk lock.
+ log.info("Using segment lock since we don't have to lock existing
segments");
+ return new LockGranularityDeterminResult(LockGranularity.SEGMENT, null,
Collections.emptyList());
}
}
- /**
- * This method is called when the task overwrites existing segments with
segment locks. It verifies the input segments
- * can be locked together, so that output segments can overshadow existing
ones properly.
- * <p>
- * This method checks two things:
- * <p>
- * - Are rootPartition range of inputSegments adjacent? Two rootPartition
ranges are adjacent if they are consecutive.
- * - All atomicUpdateGroups of inputSegments must be full. (See {@code
AtomicUpdateGroup#isFull()}).
- */
- private void
verifyAndFindRootPartitionRangeAndMinorVersion(List<DataSegment> inputSegments,
boolean isInitialRequest)
+ protected static List<DataSegment> findInputSegments(
+ String dataSource,
+ TaskActionClient actionClient,
+ List<Interval> intervalsToFind, // TODO: must be checked somewhere?
probably?
+ FirehoseFactory firehoseFactory
+ ) throws IOException
{
- if (inputSegments.isEmpty()) {
- return;
- }
+ if (firehoseFactory instanceof IngestSegmentFirehoseFactory) {
+ final List<WindowedSegmentId> inputSegments =
((IngestSegmentFirehoseFactory) firehoseFactory).getSegments();
+ if (inputSegments == null) {
+ final Interval inputInterval = Preconditions.checkNotNull(
+ ((IngestSegmentFirehoseFactory) firehoseFactory).getInterval(),
+ "input interval"
+ );
- final List<DataSegment> sortedSegments = new ArrayList<>(inputSegments);
- sortedSegments.sort((s1, s2) -> {
- if (s1.getStartRootPartitionId() != s2.getStartRootPartitionId()) {
- return Integer.compare(s1.getStartRootPartitionId(),
s2.getStartRootPartitionId());
+ return actionClient.submit(
+ new SegmentListUsedAction(dataSource, null,
Collections.singletonList(inputInterval))
+ );
} else {
- return Integer.compare(s1.getEndRootPartitionId(),
s2.getEndRootPartitionId());
+ final List<String> inputSegmentIds = inputSegments.stream()
+
.map(WindowedSegmentId::getSegmentId)
+
.collect(Collectors.toList());
+ final List<DataSegment> dataSegmentsInIntervals = actionClient.submit(
+ new SegmentListUsedAction(
+ dataSource,
+ null,
+ inputSegments.stream()
+ .flatMap(windowedSegmentId ->
windowedSegmentId.getIntervals().stream())
+ .collect(Collectors.toSet())
+ )
+ );
+ return dataSegmentsInIntervals.stream()
+ .filter(segment ->
inputSegmentIds.contains(segment.getId().toString()))
+ .collect(Collectors.toList());
}
- });
- if (isInitialRequest) {
- verifyRootPartitionIsAdjacentAndAtomicUpdateGroupIsFull(sortedSegments);
+ } else {
+ return actionClient.submit(new SegmentListUsedAction(dataSource, null,
intervalsToFind));
}
- final Interval interval = sortedSegments.get(0).getInterval();
- final short prevMaxMinorVersion = (short) sortedSegments
- .stream()
- .mapToInt(DataSegment::getMinorVersion)
- .max()
- .orElseThrow(() -> new ISE("Empty inputSegments"));
-
- overwritingRootGenPartitions.put(
- interval,
- new OverwritingRootGenerationPartitions(
- sortedSegments.get(0).getStartRootPartitionId(),
- sortedSegments.get(sortedSegments.size() -
1).getEndRootPartitionId(),
- prevMaxMinorVersion
- )
- );
- }
-
- public Set<DataSegment> getAllInputSegments()
- {
- return Preconditions.checkNotNull(allInputSegments, "allInputSegments is
not initialized");
- }
-
- Map<Interval, OverwritingRootGenerationPartitions>
getAllOverwritingSegmentMeta()
- {
- Preconditions.checkNotNull(overwritingRootGenPartitions,
"overwritingRootGenPartitions is not initialized");
- return Collections.unmodifiableMap(overwritingRootGenPartitions);
- }
-
- public boolean isChangeSegmentGranularity()
- {
- return Preconditions.checkNotNull(changeSegmentGranularity,
"changeSegmentGranularity is not initialized");
- }
-
- public boolean hasInputSegments()
- {
- Preconditions.checkNotNull(overwritingRootGenPartitions,
"overwritingRootGenPartitions is not initialized");
- return !overwritingRootGenPartitions.isEmpty();
}
- @Nullable
- public OverwritingRootGenerationPartitions
getOverwritingSegmentMeta(Interval interval)
+ private static class LockGranularityDeterminResult
Review comment:
LockGranularityDeterminResult -> LockGranularityDetermineResult
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]