This is an automated email from the ASF dual-hosted git repository.
fjy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/master by this push:
new 9343cbc Fix CompactionTask to consider only latest segments (#6429)
9343cbc is described below
commit 9343cbc63aa006a1dadaab2e530e3321ee9e29f9
Author: Jihoon Son <[email protected]>
AuthorDate: Mon Oct 8 21:53:16 2018 -0700
Fix CompactionTask to consider only latest segments (#6429)
* CompactionTask should consider only latest segments
* fix test
---
.../druid/indexing/common/task/CompactionTask.java | 30 +++++++++++++++-------
.../indexing/common/task/CompactionTaskTest.java | 4 ++-
2 files changed, 24 insertions(+), 10 deletions(-)
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
index 4b0da2a..3c69923 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
@@ -71,6 +71,7 @@ import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.TimelineLookup;
import org.apache.druid.timeline.TimelineObjectHolder;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.apache.druid.timeline.partition.PartitionChunk;
@@ -92,6 +93,7 @@ import java.util.SortedSet;
import java.util.TreeSet;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
+import java.util.stream.StreamSupport;
public class CompactionTask extends AbstractTask
{
@@ -634,19 +636,29 @@ public class CompactionTask extends AbstractTask
List<DataSegment> checkAndGetSegments(TaskToolbox toolbox) throws IOException
{
- final List<DataSegment> usedSegments = toolbox.getTaskActionClient()
- .submit(new SegmentListUsedAction(dataSource, interval, null));
+ final List<DataSegment> usedSegments = toolbox.getTaskActionClient().submit(
+ new SegmentListUsedAction(dataSource, interval, null)
+ );
+ final TimelineLookup<String, DataSegment> timeline = VersionedIntervalTimeline.forSegments(usedSegments);
+ final List<DataSegment> latestSegments = timeline
+ .lookup(interval)
+ .stream()
+ .map(TimelineObjectHolder::getObject)
+ .flatMap(partitionHolder -> StreamSupport.stream(partitionHolder.spliterator(), false))
+ .map(PartitionChunk::getObject)
+ .collect(Collectors.toList());
+
if (segments != null) {
- Collections.sort(usedSegments);
+ Collections.sort(latestSegments);
Collections.sort(segments);
- if (!usedSegments.equals(segments)) {
+ if (!latestSegments.equals(segments)) {
final List<DataSegment> unknownSegments = segments.stream()
- .filter(segment -> !usedSegments.contains(segment))
+ .filter(segment -> !latestSegments.contains(segment))
.collect(Collectors.toList());
- final List<DataSegment> missingSegments = usedSegments.stream()
- .filter(segment -> !segments.contains(segment))
- .collect(Collectors.toList());
+ final List<DataSegment> missingSegments = latestSegments.stream()
+ .filter(segment -> !segments.contains(segment))
+ .collect(Collectors.toList());
throw new ISE(
"Specified segments in the spec are different from the current used segments. "
+ "There are unknown segments[%s] and missing segments[%s] in the spec.",
@@ -655,7 +667,7 @@ public class CompactionTask extends AbstractTask
);
}
}
- return usedSegments;
+ return latestSegments;
}
}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
index 837b33a..eb80011 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
@@ -698,7 +698,9 @@ public class CompactionTaskTest
expectedException.expectMessage(CoreMatchers.containsString("are different from the current used segments"));
final List<DataSegment> segments = new ArrayList<>(SEGMENTS);
- segments.remove(0);
+ Collections.sort(segments);
+ // Remove one segment in the middle
+ segments.remove(segments.size() / 2);
CompactionTask.createIngestionSchema(
toolbox,
new SegmentProvider(segments),
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]