LakshSingla commented on code in PR #13706:
URL: https://github.com/apache/druid/pull/13706#discussion_r1116545976


##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelper.java:
##########
@@ -122,10 +125,105 @@ public List<Interval> computeTombstoneIntervals() throws 
IOException
     return retVal;
   }
 
-  private DataSegment createTombstoneForTimeChunkInterval(String dataSource, 
String version, ShardSpec shardSpec, Interval timeChunkInterval)
+  public Set<DataSegment> computeTombstonesForReplace(
+      List<Interval> intervalsToDrop,
+      List<Interval> intervalsToReplace,
+      String dataSource,
+      Granularity replaceGranularity
+  ) throws IOException
+  {
+    Set<Interval> tombstoneIntervals = computeTombstoneIntervalsForReplace(
+        intervalsToReplace,
+        intervalsToDrop,
+        dataSource,
+        replaceGranularity
+    );
+
+    final List<TaskLock> locks = taskActionClient.submit(new LockListAction());
+
+    Set<DataSegment> tombstones = new HashSet<>();
+    for (Interval tombstoneInterval : tombstoneIntervals) {
+      String version = null;
+      for (final TaskLock lock : locks) {
+        if (lock.getInterval().contains(tombstoneInterval)) {
+          version = lock.getVersion();
+        }
+      }
+
+      if (version == null) {
+        // Unable to fetch the version number of the segment
+        throw new ISE("Unable to fetch the version of the segments in use. The 
lock for the task might "
+                      + "have been revoked");
+      }
+
+      DataSegment tombstone = createTombstoneForTimeChunkInterval(
+          dataSource,
+          version,
+          new TombstoneShardSpec(),
+          tombstoneInterval
+      );
+      tombstones.add(tombstone);
+    }
+    return tombstones;
+  }
+
+  /**
+   * @param intervalsToDrop Empty intervals in the query that need to be 
dropped
+   * @param intervalsToReplace Intervals in the query which are eligible for 
replacement with new data
+   * @param dataSource Datasource on which the replace is to be performed
+   * @param replaceGranularity Granularity of the replace query
+   * @return Intervals computed for the tombstones
+   * @throws IOException
+   */
+  public Set<Interval> computeTombstoneIntervalsForReplace(
+      List<Interval> intervalsToReplace,
+      List<Interval> intervalsToDrop,
+      String dataSource,
+      Granularity replaceGranularity
+  ) throws IOException
   {
+    Set<Interval> retVal = new HashSet<>();
+    List<Interval> usedIntervals = 
getCondensedUsedIntervals(intervalsToReplace, dataSource, taskActionClient);
 
+    for (Interval intervalToDrop : intervalsToDrop) {
+      for (Interval usedInterval : usedIntervals) {
 
+        // Overlap will always be finite (not starting from -Inf or ending at 
+Inf) and less than or
+        // equal to the size of the usedInterval
+        Interval overlap = intervalToDrop.overlap(usedInterval);
+
+        // No overlap of the dropped segment with the used interval, due to 
which we do not need to generate any tombstone
+        if (overlap == null) {
+          continue;
+        }
+
+        // Overlap might not be aligned with the granularity if the used 
interval is not aligned with the granularity

Review Comment:
   Updated with the comment in the code



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to