rohangarg commented on code in PR #13706:
URL: https://github.com/apache/druid/pull/13706#discussion_r1097095168
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelper.java:
##########
@@ -147,22 +248,30 @@ private DataSegment createTombstoneForTimeChunkInterval(String dataSource, Strin
}
-
/**
* Helper method to prune required tombstones. Only tombstones that cover used intervals will be created
* since those that not cover used intervals will be redundant.
+ *
+ * @param inputIntervals Intervals corresponding to the task
+ * @param dataSource Datasource corresponding to the task
+ * @param taskActionClient Task action client to fetch the used intervals for the datasource
* @return Intervals corresponding to used segments that overlap with any of the spec's input intervals
* @throws IOException If used segments cannot be retrieved
*/
- public List<Interval> getCondensedUsedIntervals() throws IOException
+ public static List<Interval> getCondensedUsedIntervals(
Review Comment:
Why is this method static?
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelper.java:
##########
@@ -122,10 +125,108 @@ public List<Interval> computeTombstoneIntervals() throws IOException
return retVal;
}
- private DataSegment createTombstoneForTimeChunkInterval(String dataSource, String version, ShardSpec shardSpec, Interval timeChunkInterval)
+ public Set<DataSegment> computeTombstonesForReplace(
+ List<Interval> intervalsToDrop,
+ List<Interval> intervalsToReplace,
+ String dataSource,
+ Granularity replaceGranularity
+ ) throws IOException
{
+ Set<Interval> tombstoneIntervals = computeTombstoneIntervalsForReplace(
+ intervalsToReplace,
+ intervalsToDrop,
+ dataSource,
+ replaceGranularity
+ );
+ Set<DataSegment> tombstones = new HashSet<>();
+ for (Interval tombstoneInterval : tombstoneIntervals) {
+ final List<TaskLock> locks = taskActionClient.submit(new LockListAction());
Review Comment:
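Why do we need to submit a `LockListAction` once per interval? The call does not take the interval as an argument, so could it be made once before the loop?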
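To illustrate the question, here is a minimal, self-contained sketch of fetching the lock list once and reusing it for every interval. It assumes the lock list does not change between iterations. `LockClient`, `CountingClient`, and `describeTombstones` are hypothetical stand-ins written for this example; they are not Druid's API and not the PR's code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class HoistedLockLookup
{
  // Hypothetical stand-in for the task action client; not Druid's API.
  interface LockClient
  {
    List<String> listLocks();
  }

  // Test double that counts how many times the lock list is requested.
  static class CountingClient implements LockClient
  {
    int calls = 0;

    @Override
    public List<String> listLocks()
    {
      calls++;
      return Arrays.asList("lock-a", "lock-b");
    }
  }

  // Fetches the locks once, then reuses the same result for every interval.
  static List<String> describeTombstones(List<String> intervals, LockClient client)
  {
    final List<String> locks = client.listLocks(); // single call, outside the loop
    final List<String> result = new ArrayList<>();
    for (String interval : intervals) {
      result.add(interval + " -> " + locks.size() + " locks");
    }
    return result;
  }

  public static void main(String[] args)
  {
    CountingClient client = new CountingClient();
    List<String> out = describeTombstones(
        Arrays.asList("2000/2001", "2001/2002", "2002/2003"),
        client
    );
    System.out.println(out.size() + " tombstones, " + client.calls + " lock call(s)");
  }
}
```

If the locks can in fact change while the loop runs, the per-interval call may be intentional; in that case a short code comment explaining why would help.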
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java:
##########
@@ -2586,6 +2568,8 @@ static ClusterStatisticsMergeMode finalizeClusterStatisticsMergeMode(
return mergeMode;
}
+
Review Comment:
Extra diff: the blank lines added here look unrelated to the change and can be dropped.
##########
extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java:
##########
@@ -1127,6 +1136,36 @@ public void verifyResults()
Assert.assertTrue(segmentIdVsOutputRowsMap.get(diskSegment).contains(Arrays.asList(row)));
}
}
+ if (!testTaskActionClient.getPublishedSegments().isEmpty()) {
+ Set<SegmentId> expectedPublishedSegmentIds = segmentManager.getAllDataSegments()
+ .stream()
+ .map(DataSegment::getId)
+ .collect(Collectors.toSet());
+ Map<String, Object> tombstoneLoadSpec = new HashMap<>();
+ tombstoneLoadSpec.put("type", DataSegment.TOMBSTONE_LOADSPEC_TYPE);
+ tombstoneLoadSpec.put("path", null); // tombstones do not have any backing file
+
+ if (expectedTombstoneIntervals != null) {
+ expectedPublishedSegmentIds.addAll(expectedTombstoneIntervals.stream()
Review Comment:
The formatting here could be better.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]