fjy closed pull request #6429: Fix CompactionTask to consider only latest segments
URL: https://github.com/apache/incubator-druid/pull/6429
This is a PR merged from a forked repository.
Because GitHub hides the original diff on merge, the diff is displayed below
for the sake of provenance.
As this is a foreign pull request (from a fork), the diff is supplied
below, since GitHub would not otherwise show it:
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
index e577161904b..710ceaaf2eb 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
@@ -71,6 +71,7 @@
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.TimelineLookup;
import org.apache.druid.timeline.TimelineObjectHolder;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.apache.druid.timeline.partition.PartitionChunk;
@@ -92,6 +93,7 @@
import java.util.TreeSet;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
+import java.util.stream.StreamSupport;
public class CompactionTask extends AbstractTask
{
@@ -629,19 +631,29 @@ private static DimensionSchema createDimensionSchema(
List<DataSegment> checkAndGetSegments(TaskToolbox toolbox) throws
IOException
{
- final List<DataSegment> usedSegments = toolbox.getTaskActionClient()
- .submit(new
SegmentListUsedAction(dataSource, interval, null));
+ final List<DataSegment> usedSegments =
toolbox.getTaskActionClient().submit(
+ new SegmentListUsedAction(dataSource, interval, null)
+ );
+ final TimelineLookup<String, DataSegment> timeline =
VersionedIntervalTimeline.forSegments(usedSegments);
+ final List<DataSegment> latestSegments = timeline
+ .lookup(interval)
+ .stream()
+ .map(TimelineObjectHolder::getObject)
+ .flatMap(partitionHolder ->
StreamSupport.stream(partitionHolder.spliterator(), false))
+ .map(PartitionChunk::getObject)
+ .collect(Collectors.toList());
+
if (segments != null) {
- Collections.sort(usedSegments);
+ Collections.sort(latestSegments);
Collections.sort(segments);
- if (!usedSegments.equals(segments)) {
+ if (!latestSegments.equals(segments)) {
final List<DataSegment> unknownSegments = segments.stream()
- .filter(segment ->
!usedSegments.contains(segment))
+ .filter(segment ->
!latestSegments.contains(segment))
.collect(Collectors.toList());
- final List<DataSegment> missingSegments = usedSegments.stream()
-
.filter(segment -> !segments.contains(segment))
-
.collect(Collectors.toList());
+ final List<DataSegment> missingSegments = latestSegments.stream()
+
.filter(segment -> !segments.contains(segment))
+
.collect(Collectors.toList());
throw new ISE(
"Specified segments in the spec are different from the current
used segments. "
+ "There are unknown segments[%s] and missing segments[%s] in
the spec.",
@@ -650,7 +662,7 @@ private static DimensionSchema createDimensionSchema(
);
}
}
- return usedSegments;
+ return latestSegments;
}
}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
index 837b33a50a0..eb80011bc1c 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
@@ -698,7 +698,9 @@ public void
testCreateIngestionSchemaWithDifferentSegmentSet() throws IOExceptio
expectedException.expectMessage(CoreMatchers.containsString("are different
from the current used segments"));
final List<DataSegment> segments = new ArrayList<>(SEGMENTS);
- segments.remove(0);
+ Collections.sort(segments);
+ // Remove one segment in the middle
+ segments.remove(segments.size() / 2);
CompactionTask.createIngestionSchema(
toolbox,
new SegmentProvider(segments),
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log in to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]