rohangarg commented on code in PR #13706:
URL: https://github.com/apache/druid/pull/13706#discussion_r1106801523


##########
extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java:
##########
@@ -581,6 +582,55 @@ public void testReplaceSegmentsInsertIntoNewTable()
                      .verifyResults();
   }
 
+  @Test
+  public void testInsertCannotReplaceExistingSegmentFault()

Review Comment:
   This name looks like it was carried over from a copy/paste of a deleted test.



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelper.java:
##########
@@ -122,10 +125,105 @@ public List<Interval> computeTombstoneIntervals() throws IOException
     return retVal;
   }
 
-  private DataSegment createTombstoneForTimeChunkInterval(String dataSource, String version, ShardSpec shardSpec, Interval timeChunkInterval)
+  public Set<DataSegment> computeTombstonesForReplace(

Review Comment:
   `computeTombstoneSegmentsForReplace` might be a better name



##########
extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestTaskActionClient.java:
##########
@@ -94,13 +106,32 @@ public <RetType> RetType submit(TaskAction<RetType> taskAction)
           0
       ));
     } else if (taskAction instanceof RetrieveUsedSegmentsAction) {
-      return (RetType) ImmutableSet.of();
+      String dataSource = ((RetrieveUsedSegmentsAction) taskAction).getDataSource();
+      if (!usedIntervals.containsKey(dataSource)) {
+        return (RetType) ImmutableSet.of();
+      } else {
+        return (RetType) usedIntervals.get(dataSource)
+                                      .stream()
+                                      .map(interval -> DataSegment.builder()
+                                                                 .dataSource(dataSource)
+                                                                 .interval(interval)
+                                                                 .version(VERSION)
+                                                                 .size(1)
+                                                                 .build()
+                                     ).collect(Collectors.toSet());
+      }
     } else if (taskAction instanceof SegmentTransactionalInsertAction) {
       // Always OK.
       final Set<DataSegment> segments = ((SegmentTransactionalInsertAction) taskAction).getSegments();
+      publishedSegments.addAll(segments);
       return (RetType) SegmentPublishResult.ok(segments);
     } else {
       return null;
     }
   }
+
+  public Set<DataSegment> getPublishedSegments()

Review Comment:
   Can we maintain this state in `MSQTestBase` instead? The action client can update the state held by the test base. Also, can the `MSQTestSegmentManager` be used to fetch the segments?
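To illustrate the suggestion, here is a minimal sketch using plain Java stand-ins. `TestBaseState` (playing the role of state kept in `MSQTestBase`) and `FakeActionClient` (playing the role of `MSQTestTaskActionClient`) are hypothetical names, and segments are modeled as strings. The client receives a reference to the shared state and records published segments there, so tests read them from the test base rather than from the client.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for state owned by MSQTestBase.
final class TestBaseState {
  private final Set<String> publishedSegments = new HashSet<>();

  void recordPublished(Set<String> segments) {
    publishedSegments.addAll(segments);
  }

  // Read-only view so tests cannot mutate the shared state by accident.
  Set<String> getPublishedSegments() {
    return Collections.unmodifiableSet(publishedSegments);
  }
}

// Hypothetical stand-in for MSQTestTaskActionClient; segments are plain strings here.
final class FakeActionClient {
  private final TestBaseState state;

  FakeActionClient(TestBaseState state) {
    this.state = state;
  }

  // Mimics the "always OK" handling of a transactional insert, but records the
  // segments on the test base instead of in a field of the client.
  Set<String> publish(Set<String> segments) {
    state.recordPublished(segments);
    return segments;
  }
}
```

With this shape the client holds no segment state of its own, so a `getPublishedSegments()` accessor on the client would not be needed.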



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelper.java:
##########
@@ -147,22 +245,30 @@ private DataSegment createTombstoneForTimeChunkInterval(String dataSource, Strin
 
   }
 
-
   /**
    * Helper method to prune required tombstones. Only tombstones that cover used intervals will be created
    * since those that not cover used intervals will be redundant.
+   *
+   * @param inputIntervals   Intervals corresponding to the task
+   * @param dataSource       Datasource corresponding to the task
+   * @param taskActionClient Task action client to fetch the used intervals for the datasource
    * @return Intervals corresponding to used segments that overlap with any of the spec's input intervals
    * @throws IOException If used segments cannot be retrieved
    */
-  public List<Interval> getCondensedUsedIntervals() throws IOException
+  private List<Interval> getCondensedUsedIntervals(

Review Comment:
   Why does this method take `taskActionClient` as a parameter? Isn't it already available as a field of the helper class itself?



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelper.java:
##########
@@ -122,10 +125,105 @@ public List<Interval> computeTombstoneIntervals() throws IOException
     return retVal;
   }
 
-  private DataSegment createTombstoneForTimeChunkInterval(String dataSource, String version, ShardSpec shardSpec, Interval timeChunkInterval)
+  public Set<DataSegment> computeTombstonesForReplace(
+      List<Interval> intervalsToDrop,
+      List<Interval> intervalsToReplace,
+      String dataSource,
+      Granularity replaceGranularity
+  ) throws IOException
+  {
+    Set<Interval> tombstoneIntervals = computeTombstoneIntervalsForReplace(
+        intervalsToReplace,
+        intervalsToDrop,
+        dataSource,
+        replaceGranularity
+    );
+
+    final List<TaskLock> locks = taskActionClient.submit(new LockListAction());
+
+    Set<DataSegment> tombstones = new HashSet<>();
+    for (Interval tombstoneInterval : tombstoneIntervals) {
+      String version = null;
+      for (final TaskLock lock : locks) {
+        if (lock.getInterval().contains(tombstoneInterval)) {
+          version = lock.getVersion();
+        }
+      }
+
+      if (version == null) {
+        // Unable to fetch the version number of the segment
+        throw new ISE("Unable to fetch the version of the segments in use. The lock for the task might "
+                      + "have been revoked");
+      }
+
+      DataSegment tombstone = createTombstoneForTimeChunkInterval(
+          dataSource,
+          version,
+          new TombstoneShardSpec(),
+          tombstoneInterval
+      );
+      tombstones.add(tombstone);
+    }
+    return tombstones;
+  }
+
+  /**
+   * @param intervalsToDrop Empty intervals in the query that need to be dropped
+   * @param intervalsToReplace Intervals in the query which are eligible for replacement with new data
+   * @param dataSource Datasource on which the replace is to be performed
+   * @param replaceGranularity Granularity of the replace query
+   * @return Intervals computed for the tombstones
+   * @throws IOException
+   */
+  public Set<Interval> computeTombstoneIntervalsForReplace(
+      List<Interval> intervalsToReplace,
+      List<Interval> intervalsToDrop,
+      String dataSource,
+      Granularity replaceGranularity
+  ) throws IOException
   {
+    Set<Interval> retVal = new HashSet<>();
+    List<Interval> usedIntervals = getCondensedUsedIntervals(intervalsToReplace, dataSource, taskActionClient);
 
+    for (Interval intervalToDrop : intervalsToDrop) {
+      for (Interval usedInterval : usedIntervals) {
 
+        // Overlap will always be finite (not starting from -Inf or ending at +Inf) and lesser than or
+        // equal to the size of the usedInterval
+        Interval overlap = intervalToDrop.overlap(usedInterval);
+
+        // No overlap of the dropped segment with the used interval due to which we donot need to generate any tombstone
+        if (overlap == null) {
+          continue;
+        }
+
+        // Overlap might not be aligned with the granularity if the used interval is not aligned with the granularity

Review Comment:
   Should we add the whole `intervalToDrop` to the set of intervals? I didn't quite understand the part where the overlap interval is computed and then iterated over; an example would be great if possible.
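One way to read the overlap-plus-iteration logic, inferred from the expectations in `tombstonesCreatedForReplaceWhenThereIsAGapInUsedIntervals` rather than copied from the PR, is as follows. For each interval to drop, only its overlap with each used interval gets tombstones, and that overlap is rounded out to whole granularity buckets. Adding the whole `intervalToDrop` would also cover ranges with no existing data, where a tombstone is redundant, as the `getCondensedUsedIntervals` Javadoc notes. This is a simplified model under several assumptions. It uses half-open `java.time.LocalDate` intervals in place of Joda `Interval` and `ChronoUnit.DAYS`/`MONTHS` in place of Druid `Granularity`. `DateInterval` and `TombstoneSketch` are hypothetical names. It skips the condensing of used intervals, and it requires Java 16+ for the record.

```java
import java.time.LocalDate;
import java.time.temporal.ChronoUnit;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical half-open interval [start, end), standing in for Joda's Interval.
record DateInterval(LocalDate start, LocalDate end) {
  static DateInterval of(String start, String end) {
    return new DateInterval(LocalDate.parse(start), LocalDate.parse(end));
  }

  // Intersection with another interval, or null if they do not overlap.
  DateInterval overlap(DateInterval other) {
    LocalDate s = start.isAfter(other.start) ? start : other.start;
    LocalDate e = end.isBefore(other.end) ? end : other.end;
    return s.isBefore(e) ? new DateInterval(s, e) : null;
  }
}

final class TombstoneSketch {
  // Rounds a date down to the start of its granularity bucket (only DAYS and MONTHS modeled).
  static LocalDate bucketStart(LocalDate date, ChronoUnit granularity) {
    switch (granularity) {
      case DAYS:
        return date;
      case MONTHS:
        return date.withDayOfMonth(1);
      default:
        throw new IllegalArgumentException("Unsupported granularity: " + granularity);
    }
  }

  static Set<DateInterval> tombstoneIntervals(
      List<DateInterval> intervalsToDrop,
      List<DateInterval> usedIntervals,
      ChronoUnit granularity
  ) {
    Set<DateInterval> result = new LinkedHashSet<>();
    for (DateInterval toDrop : intervalsToDrop) {
      for (DateInterval used : usedIntervals) {
        DateInterval overlap = toDrop.overlap(used);
        if (overlap == null) {
          continue; // no existing data here, so a tombstone would be redundant
        }
        // The overlap may start or end mid-bucket when the used interval is not aligned,
        // so walk whole buckets starting from the bucket that contains overlap.start().
        for (LocalDate b = bucketStart(overlap.start(), granularity);
             b.isBefore(overlap.end());
             b = b.plus(1, granularity)) {
          result.add(new DateInterval(b, b.plus(1, granularity)));
        }
      }
    }
    return result;
  }
}
```

For example, with `MONTHS`, a used interval of 2020-02-15/2020-04-10 and an interval to drop of 2020-03-01/2020-05-01 overlap on 2020-03-01/2020-04-10. That overlap is rounded out to the full March and April buckets. Because the intervals to drop are assumed to be granularity-aligned, those buckets stay inside `intervalToDrop`.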



##########
indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelperTest.java:
##########
@@ -115,9 +111,136 @@ public void tombstonesCreatedWhenNoDataInInputIntervalAndExistingSegments() thro
           new SegmentIdWithShardSpec("test", ti, "newVersion", new TombstoneShardSpec())
       );
     }
-    Set<DataSegment> tombstones = tombstoneHelper.computeTombstones(intervalToVersion);
+    Set<DataSegment> tombstones = tombstoneHelper.computeTombstones(dataSchema, intervalToVersion);
     Assert.assertEquals(3, tombstones.size());
     tombstones.forEach(ts -> Assert.assertTrue(ts.isTombstone()));
   }
 
+  @Test
+  public void tombstonesCreatedForReplaceWhenReplaceIsContainedInUsedIntervals() throws Exception
+  {
+    Interval usedInterval = Intervals.of("2020-02-01/2020-04-01");
+    Interval replaceInterval = Intervals.of("2020-03-01/2020-03-31");
+    Interval intervalToDrop = Intervals.of("2020-03-05/2020-03-07");
+    Granularity replaceGranularity = Granularities.DAY;
+
+    DataSegment existingUsedSegment =
+        DataSegment.builder()
+                   .dataSource("test")
+                   .interval(usedInterval)
+                   .version("oldVersion")
+                   .size(100)
+                   .build();
+    Assert.assertFalse(existingUsedSegment.isTombstone());
+    Mockito.when(taskActionClient.submit(any(TaskAction.class)))
+           .thenReturn(Collections.singletonList(existingUsedSegment));
+    TombstoneHelper tombstoneHelper = new TombstoneHelper(taskActionClient);
+
+    Set<Interval> tombstoneIntervals = tombstoneHelper.computeTombstoneIntervalsForReplace(
+        ImmutableList.of(replaceInterval),
+        ImmutableList.of(intervalToDrop),
+        "test",
+        replaceGranularity
+    );
+    Assert.assertEquals(
+        ImmutableSet.of(Intervals.of("2020-03-05/2020-03-06"), Intervals.of("2020-03-06/2020-03-07")),
+        tombstoneIntervals
+    );
+  }
+
+  @Test
+  public void tombstonesCreatedForReplaceWhenThereIsAGapInUsedIntervals() throws Exception
+  {
+    List<Interval> usedIntervals = ImmutableList.of(
+        Intervals.of("2020-02-01/2020-04-01"),
+        Intervals.of("2020-07-01/2020-11-01")
+    );
+    Interval replaceInterval = Intervals.of("2020-01-01/2020-12-01");
+    Interval intervalToDrop = Intervals.of("2020-03-01/2020-09-01");
+    Granularity replaceGranularity = Granularities.MONTH;
+
+    List<DataSegment> existingUsedSegments = usedIntervals.stream().map(
+        usedInterval -> DataSegment.builder()
+                                   .dataSource("test")
+                                   .interval(usedInterval)
+                                   .version("oldVersion")
+                                   .size(100)
+                                   .build()
+    ).collect(Collectors.toList());
+    Mockito.when(taskActionClient.submit(any(TaskAction.class))).thenReturn(existingUsedSegments);
+    TombstoneHelper tombstoneHelper = new TombstoneHelper(taskActionClient);
+
+    Set<Interval> tombstoneIntervals = tombstoneHelper.computeTombstoneIntervalsForReplace(
+        ImmutableList.of(replaceInterval),
+        ImmutableList.of(intervalToDrop),
+        "test",
+        replaceGranularity
+    );
+    Assert.assertEquals(
+        ImmutableSet.of(
+            Intervals.of("2020-03-01/2020-04-01"),
+            Intervals.of("2020-07-01/2020-08-01"),
+            Intervals.of("2020-08-01/2020-09-01")
+        ),
+        tombstoneIntervals
+    );
+  }
+
+  @Test
+  public void tombstonesCreatedForReplaceWhenUsedIntervalsDonotAlign() throws Exception

Review Comment:
   I think these tests should either be about `tombstoneIntervals`, or, if they are about `tombstoneSegments`, they should check the segments as well.
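If the tests are meant to cover tombstone segments rather than only intervals, one option is to assert on each segment's interval, tombstone flag, and version together. The version matters because `computeTombstonesForReplace` takes it from the task locks. This is a minimal sketch that assumes a hypothetical `Seg` record as a stand-in for `DataSegment`, with intervals as strings, and a hypothetical `TombstoneChecks` helper. It throws plain `AssertionError`s where a real test would use JUnit's `Assert`, as the existing `tombstones.forEach(ts -> Assert.assertTrue(ts.isTombstone()))` check does. It requires Java 16+ for the record.

```java
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical stand-in for DataSegment with only the fields these checks need.
record Seg(String interval, String version, boolean tombstone) {}

final class TombstoneChecks {
  // Fails unless the segments cover exactly the expected intervals (one segment each),
  // every segment is a tombstone, and every segment carries the expected version.
  static void assertTombstones(Set<Seg> segments, Set<String> expectedIntervals, String expectedVersion) {
    Set<String> actualIntervals = segments.stream().map(Seg::interval).collect(Collectors.toSet());
    if (!actualIntervals.equals(expectedIntervals) || segments.size() != expectedIntervals.size()) {
      throw new AssertionError("unexpected tombstone intervals: " + actualIntervals);
    }
    for (Seg segment : segments) {
      if (!segment.tombstone()) {
        throw new AssertionError("not a tombstone: " + segment);
      }
      if (!segment.version().equals(expectedVersion)) {
        throw new AssertionError("unexpected version: " + segment);
      }
    }
  }
}
```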



-- 
This is an automated message from the Apache Git Service.