cryptoe commented on code in PR #13706:
URL: https://github.com/apache/druid/pull/13706#discussion_r1097290912
##########
extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQFaultsTest.java:
##########
@@ -114,30 +111,6 @@ public void testInsertCannotOrderByDescendingFault()
.verifyResults();
}
- @Test
- public void testInsertCannotReplaceExistingSegmentFault()
- {
Review Comment:
We should have a test case that exercises tombstone segments. That would give
us more confidence in the PR.
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelper.java:
##########
@@ -122,10 +125,105 @@ public List<Interval> computeTombstoneIntervals() throws IOException
return retVal;
}
- private DataSegment createTombstoneForTimeChunkInterval(String dataSource, String version, ShardSpec shardSpec, Interval timeChunkInterval)
+ public Set<DataSegment> computeTombstonesForReplace(
+ List<Interval> intervalsToDrop,
+ List<Interval> intervalsToReplace,
+ String dataSource,
+ Granularity replaceGranularity
+ ) throws IOException
+ {
+ Set<Interval> tombstoneIntervals = computeTombstoneIntervalsForReplace(
+ intervalsToReplace,
+ intervalsToDrop,
+ dataSource,
+ replaceGranularity
+ );
+
+ final List<TaskLock> locks = taskActionClient.submit(new LockListAction());
+
+ Set<DataSegment> tombstones = new HashSet<>();
+ for (Interval tombstoneInterval : tombstoneIntervals) {
+ String version = null;
+ for (final TaskLock lock : locks) {
+ if (lock.getInterval().contains(tombstoneInterval)) {
Review Comment:
Shouldn't we also filter the locks by data source here?
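A minimal sketch of the kind of filter I mean. The `Lock` class and `findVersion` helper are hypothetical stand-ins (plain `long` bounds instead of Joda intervals), not the real `TaskLock` API; the point is only that a lock on another data source covering the same interval should not supply the version.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

public class LockVersionSketch
{
  // Hypothetical stand-in for a task lock; interval is half-open [start, end).
  static final class Lock
  {
    final String dataSource;
    final long start;
    final long end;
    final String version;

    Lock(String dataSource, long start, long end, String version)
    {
      this.dataSource = dataSource;
      this.start = start;
      this.end = end;
      this.version = version;
    }

    // True when [otherStart, otherEnd) lies entirely inside this lock's interval.
    boolean contains(long otherStart, long otherEnd)
    {
      return start <= otherStart && otherEnd <= end;
    }
  }

  // Returns the version of the first lock that matches BOTH the data source
  // and the tombstone interval, or empty if no such lock exists.
  static Optional<String> findVersion(List<Lock> locks, String dataSource, long start, long end)
  {
    return locks.stream()
                .filter(lock -> lock.dataSource.equals(dataSource))
                .filter(lock -> lock.contains(start, end))
                .map(lock -> lock.version)
                .findFirst();
  }

  public static void main(String[] args)
  {
    List<Lock> locks = Arrays.asList(
        new Lock("foo2", 0L, 100L, "v-foo2"),
        new Lock("foo", 0L, 100L, "v-foo")
    );
    // Without the data source filter, the "foo2" lock would match first.
    System.out.println(findVersion(locks, "foo", 10L, 20L).orElse("none"));
  }
}
```

Returning an `Optional` also makes the "no lock found" case explicit instead of leaving `version` as `null`.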
##########
extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java:
##########
@@ -413,12 +413,7 @@ public void testReplaceWhereClauseLargerThanData()
new Object[]{946771200000L, 2.0f}
)
)
- .setExpectedSegment(ImmutableSet.of(SegmentId.of(
- "foo",
- Intervals.of("2000-01-01T/P1M"),
- "test",
- 0
- )))
+ .setExpectedTombstoneIntervals(ImmutableSet.of(Intervals.of("2001-01-01/2001-02-01")))
Review Comment:
How is this a tombstone, given that we are generating data for this interval?
##########
extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java:
##########
@@ -1127,6 +1136,35 @@ public void verifyResults()
Assert.assertTrue(segmentIdVsOutputRowsMap.get(diskSegment).contains(Arrays.asList(row)));
}
}
+ if (!testTaskActionClient.getPublishedSegments().isEmpty()) {
+ Set<SegmentId> expectedPublishedSegmentIds = segmentManager.getAllDataSegments()
Review Comment:
Shouldn't we subtract the segments that are present in the keys of
segmentIdVsOutputRowsMap when we are asserting against tombstone segments?
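Roughly what I have in mind, as a sketch only. Plain strings stand in for `SegmentId`, and `expectedTombstones` is a hypothetical helper name, not something in the test base today: whatever was published but has no output rows is what should be compared against the expected tombstones.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class TombstoneAssertSketch
{
  // Published segments minus segments that carry rows leaves the tombstone candidates.
  // Copies the input so the caller's set is not modified.
  static Set<String> expectedTombstones(Set<String> publishedIds, Set<String> idsWithRows)
  {
    Set<String> result = new HashSet<>(publishedIds);
    result.removeAll(idsWithRows);
    return result;
  }

  public static void main(String[] args)
  {
    Set<String> published = new HashSet<>(Arrays.asList("seg-data", "seg-tombstone"));
    Set<String> withRows = new HashSet<>(Arrays.asList("seg-data"));
    System.out.println(expectedTombstones(published, withRows));
  }
}
```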
##########
extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestTaskActionClient.java:
##########
@@ -40,18 +41,29 @@
import org.apache.druid.timeline.SegmentId;
import org.joda.time.Interval;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
public class MSQTestTaskActionClient implements TaskActionClient
{
- private static final String VERSION = "test";
+ public static final String VERSION = "test";
private final ObjectMapper mapper;
private final ConcurrentHashMap<SegmentId, AtomicInteger> segmentIdPartitionIdMap = new ConcurrentHashMap<>();
+ private final Map<String, List<Interval>> usedIntervals = ImmutableMap.of(
+ "foo", ImmutableList.of(Intervals.of("2001-01-01/2001-01-04"), Intervals.of("2000-01-01/2000-01-04")),
+ "foo2", ImmutableList.of(Intervals.of("2000-01-01/P1D"))
+ );
+ private final Set<DataSegment> publishedSegments = new HashSet<>();
- public MSQTestTaskActionClient(ObjectMapper mapper)
+ public MSQTestTaskActionClient(
Review Comment:
Looks like this client is now stateful. We might want to mention that
somewhere. Does it work with the calciteTests for MSQ?
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java:
##########
@@ -1247,48 +1244,33 @@ private void postResultPartitionBoundariesForStage(
/**
* Publish the list of segments. Additionally, if {@link DataSourceMSQDestination#isReplaceTimeChunks()},
* also drop all other segments within the replacement intervals.
- * <p>
- * If any existing segments cannot be dropped because their intervals are not wholly contained within the
- * replacement parameter, throws a {@link MSQException} with {@link InsertCannotReplaceExistingSegmentFault}.
*/
private void publishAllSegments(final Set<DataSegment> segments) throws IOException
{
final DataSourceMSQDestination destination =
(DataSourceMSQDestination) task.getQuerySpec().getDestination();
- final Set<DataSegment> segmentsToDrop;
+ Set<DataSegment> segmentsWithTombstones = new HashSet<>(segments);
Review Comment:
Nit: we can still make this final.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]