jon-wei commented on a change in pull request #7547: Add support minor 
compaction with segment locking
URL: https://github.com/apache/incubator-druid/pull/7547#discussion_r306596112
 
 

 ##########
 File path: 
indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
 ##########
 @@ -180,198 +233,130 @@ private boolean tryTimeChunkLock(TaskActionClient 
client, List<Interval> interva
     return true;
   }
 
-  boolean tryLockWithSegments(TaskActionClient client, List<DataSegment> 
segments, boolean isInitialRequest)
-      throws IOException
+  @Nullable
+  public static Granularity findGranularityFromSegments(List<DataSegment> 
segments)
+  {
+    if (segments.isEmpty()) {
+      return null;
+    }
+    final Period firstSegmentPeriod = segments.get(0).getInterval().toPeriod();
+    final boolean allHasSameGranularity = segments
+        .stream()
+        .allMatch(segment -> 
firstSegmentPeriod.equals(segment.getInterval().toPeriod()));
+    if (allHasSameGranularity) {
+      return 
GranularityType.fromPeriod(firstSegmentPeriod).getDefaultGranularity();
+    } else {
+      return null;
+    }
+  }
+
+  LockGranularityDeterminResult determineSegmentGranularity(List<DataSegment> 
segments)
   {
     if (segments.isEmpty()) {
-      changeSegmentGranularity = false;
-      allInputSegments = Collections.emptySet();
-      overwritingRootGenPartitions = Collections.emptyMap();
-      return true;
+      log.info("Using segment lock for empty segments");
+      // Set useSegmentLock even though we don't get any locks.
+      // Note that we should get any lock before data ingestion if we are 
supposed to use timeChunk lock.
+      return new LockGranularityDeterminResult(LockGranularity.SEGMENT, null, 
Collections.emptyList());
     }
 
-    if (requireLockInputSegments()) {
-      // Create a timeline to find latest segments only
+    if (requireLockExistingSegments()) {
+      final Granularity granularityFromSegments = 
findGranularityFromSegments(segments);
+      @Nullable
+      final Granularity segmentGranularityFromSpec = getSegmentGranularity();
       final List<Interval> intervals = 
segments.stream().map(DataSegment::getInterval).collect(Collectors.toList());
 
-      changeSegmentGranularity = checkIfChangeSegmentGranularity(intervals);
-      if (changeSegmentGranularity) {
-        return tryTimeChunkLock(client, intervals);
+      if (granularityFromSegments == null
+          || segmentGranularityFromSpec != null
+             && (!granularityFromSegments.equals(segmentGranularityFromSpec)
+                 || segments.stream().anyMatch(segment -> 
!segmentGranularityFromSpec.isAligned(segment.getInterval())))) {
+        // This case is one of the following:
+        // 1) Segments have different granularities.
+        // 2) Segment granularity in ingestion spec is different from the one 
of existing segments.
+        // 3) Some existing segments are not aligned with the segment 
granularity in the ingestion spec.
+        log.info("Detected segmentGranularity change. Using timeChunk lock");
+        return new LockGranularityDeterminResult(LockGranularity.TIME_CHUNK, 
intervals, null);
       } else {
-        final List<DataSegment> segmentsToLock;
+        // Use segment lock
+        // Create a timeline to find latest segments only
         final VersionedIntervalTimeline<String, DataSegment> timeline = 
VersionedIntervalTimeline.forSegments(
             segments
         );
-        segmentsToLock = timeline.lookup(JodaUtils.umbrellaInterval(intervals))
-                                 .stream()
-                                 .map(TimelineObjectHolder::getObject)
-                                 .flatMap(partitionHolder -> 
StreamSupport.stream(
-                                     partitionHolder.spliterator(),
-                                     false
-                                 ))
-                                 .map(PartitionChunk::getObject)
-                                 .collect(Collectors.toList());
 
-        if (allInputSegments == null) {
-          allInputSegments = new HashSet<>(segmentsToLock);
-          overwritingRootGenPartitions = new HashMap<>();
-        }
-
-        final Map<Interval, List<DataSegment>> intervalToSegments = new 
HashMap<>();
-        for (DataSegment segment : segmentsToLock) {
-          intervalToSegments.computeIfAbsent(segment.getInterval(), k -> new 
ArrayList<>()).add(segment);
-        }
-        intervalToSegments.values().forEach(
-            segmentsToCheck -> 
verifyAndFindRootPartitionRangeAndMinorVersion(segmentsToCheck, 
isInitialRequest)
-        );
-        final Closer lockCloserOnError = Closer.create();
-        for (Entry<Interval, List<DataSegment>> entry : 
intervalToSegments.entrySet()) {
-          final Interval interval = entry.getKey();
-          final Set<Integer> partitionIds = entry.getValue().stream()
-                                                 .map(s -> 
s.getShardSpec().getPartitionNum())
-                                                 .collect(Collectors.toSet());
-          final List<LockResult> lockResults = client.submit(
-              new SegmentLockTryAcquireAction(
-                  TaskLockType.EXCLUSIVE,
-                  interval,
-                  entry.getValue().get(0).getMajorVersion(),
-                  partitionIds
-              )
-          );
-
-          lockResults.stream()
-                     .filter(LockResult::isOk)
-                     .map(result -> (SegmentLock) result.getTaskLock())
-                     .forEach(segmentLock -> lockCloserOnError.register(() -> 
client.submit(
-                         new 
SegmentLockReleaseAction(segmentLock.getInterval(), 
segmentLock.getPartitionId())
-                     )));
-
-          if (isInitialRequest && (lockResults.isEmpty() || 
lockResults.stream().anyMatch(result -> !result.isOk()))) {
-            lockCloserOnError.close();
-            return false;
-          }
-        }
-        return true;
+        final List<DataSegment> segmentsToLock = timeline
+            .lookup(JodaUtils.umbrellaInterval(intervals))
+            .stream()
+            .map(TimelineObjectHolder::getObject)
+            .flatMap(partitionHolder -> 
StreamSupport.stream(partitionHolder.spliterator(), false))
+            .map(PartitionChunk::getObject)
+            .collect(Collectors.toList());
+        log.info("No segmentGranularity change detected and it's not perfect 
rollup. Using segment lock");
+        return new LockGranularityDeterminResult(LockGranularity.SEGMENT, 
null, segmentsToLock);
       }
     } else {
-      changeSegmentGranularity = false;
-      allInputSegments = Collections.emptySet();
-      overwritingRootGenPartitions = Collections.emptyMap();
-      return true;
+      // Set useSegmentLock even though we don't get any locks.
+      // Note that we should get any lock before data ingestion if we are 
supposed to use timeChunk lock.
+      log.info("Using segment lock since we don't have to lock existing 
segments");
+      return new LockGranularityDeterminResult(LockGranularity.SEGMENT, null, 
Collections.emptyList());
     }
   }
 
-  /**
-   * This method is called when the task overwrites existing segments with 
segment locks. It verifies the input segments
-   * can be locked together, so that output segments can overshadow existing 
ones properly.
-   * <p>
-   * This method checks two things:
-   * <p>
-   * - Are rootPartition range of inputSegments adjacent? Two rootPartition 
ranges are adjacent if they are consecutive.
-   * - All atomicUpdateGroups of inputSegments must be full. (See {@code 
AtomicUpdateGroup#isFull()}).
-   */
-  private void 
verifyAndFindRootPartitionRangeAndMinorVersion(List<DataSegment> inputSegments, 
boolean isInitialRequest)
+  protected static List<DataSegment> findInputSegments(
+      String dataSource,
+      TaskActionClient actionClient,
+      List<Interval> intervalsToFind, // TODO: must be checked somewhere? 
probably?
+      FirehoseFactory firehoseFactory
+  ) throws IOException
   {
-    if (inputSegments.isEmpty()) {
-      return;
-    }
+    if (firehoseFactory instanceof IngestSegmentFirehoseFactory) {
+      final List<WindowedSegmentId> inputSegments = 
((IngestSegmentFirehoseFactory) firehoseFactory).getSegments();
+      if (inputSegments == null) {
+        final Interval inputInterval = Preconditions.checkNotNull(
+            ((IngestSegmentFirehoseFactory) firehoseFactory).getInterval(),
+            "input interval"
+        );
 
-    final List<DataSegment> sortedSegments = new ArrayList<>(inputSegments);
-    sortedSegments.sort((s1, s2) -> {
-      if (s1.getStartRootPartitionId() != s2.getStartRootPartitionId()) {
-        return Integer.compare(s1.getStartRootPartitionId(), 
s2.getStartRootPartitionId());
+        return actionClient.submit(
+            new SegmentListUsedAction(dataSource, null, 
Collections.singletonList(inputInterval))
+        );
       } else {
-        return Integer.compare(s1.getEndRootPartitionId(), 
s2.getEndRootPartitionId());
+        final List<String> inputSegmentIds = inputSegments.stream()
+                                                          
.map(WindowedSegmentId::getSegmentId)
+                                                          
.collect(Collectors.toList());
+        final List<DataSegment> dataSegmentsInIntervals = actionClient.submit(
+            new SegmentListUsedAction(
+                dataSource,
+                null,
+                inputSegments.stream()
+                             .flatMap(windowedSegmentId -> 
windowedSegmentId.getIntervals().stream())
+                             .collect(Collectors.toSet())
+            )
+        );
+        return dataSegmentsInIntervals.stream()
+                                      .filter(segment -> 
inputSegmentIds.contains(segment.getId().toString()))
+                                      .collect(Collectors.toList());
       }
-    });
-    if (isInitialRequest) {
-      verifyRootPartitionIsAdjacentAndAtomicUpdateGroupIsFull(sortedSegments);
+    } else {
+      return actionClient.submit(new SegmentListUsedAction(dataSource, null, 
intervalsToFind));
     }
-    final Interval interval = sortedSegments.get(0).getInterval();
-    final short prevMaxMinorVersion = (short) sortedSegments
-        .stream()
-        .mapToInt(DataSegment::getMinorVersion)
-        .max()
-        .orElseThrow(() -> new ISE("Empty inputSegments"));
-
-    overwritingRootGenPartitions.put(
-        interval,
-        new OverwritingRootGenerationPartitions(
-            sortedSegments.get(0).getStartRootPartitionId(),
-            sortedSegments.get(sortedSegments.size() - 
1).getEndRootPartitionId(),
-            prevMaxMinorVersion
-        )
-    );
-  }
-
-  public Set<DataSegment> getAllInputSegments()
-  {
-    return Preconditions.checkNotNull(allInputSegments, "allInputSegments is 
not initialized");
-  }
-
-  Map<Interval, OverwritingRootGenerationPartitions> 
getAllOverwritingSegmentMeta()
-  {
-    Preconditions.checkNotNull(overwritingRootGenPartitions, 
"overwritingRootGenPartitions is not initialized");
-    return Collections.unmodifiableMap(overwritingRootGenPartitions);
-  }
-
-  public boolean isChangeSegmentGranularity()
-  {
-    return Preconditions.checkNotNull(changeSegmentGranularity, 
"changeSegmentGranularity is not initialized");
-  }
-
-  public boolean hasInputSegments()
-  {
-    Preconditions.checkNotNull(overwritingRootGenPartitions, 
"overwritingRootGenPartitions is not initialized");
-    return !overwritingRootGenPartitions.isEmpty();
   }
 
-  @Nullable
-  public OverwritingRootGenerationPartitions 
getOverwritingSegmentMeta(Interval interval)
+  private static class LockGranularityDeterminResult
 
 Review comment:
   LockGranularityDeterminResult -> LockGranularityDetermineResult

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to