This is an automated email from the ASF dual-hosted git repository.

fjy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 9343cbc  Fix CompactionTask to consider only latest segments (#6429)
9343cbc is described below

commit 9343cbc63aa006a1dadaab2e530e3321ee9e29f9
Author: Jihoon Son <[email protected]>
AuthorDate: Mon Oct 8 21:53:16 2018 -0700

    Fix CompactionTask to consider only latest segments (#6429)
    
    * CompactionTask should consider only latest segments
    
    * fix test
---
 .../druid/indexing/common/task/CompactionTask.java | 30 +++++++++++++++-------
 .../indexing/common/task/CompactionTaskTest.java   |  4 ++-
 2 files changed, 24 insertions(+), 10 deletions(-)

diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
index 4b0da2a..3c69923 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
@@ -71,6 +71,7 @@ import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
 import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
 import org.apache.druid.server.security.AuthorizerMapper;
 import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.TimelineLookup;
 import org.apache.druid.timeline.TimelineObjectHolder;
 import org.apache.druid.timeline.VersionedIntervalTimeline;
 import org.apache.druid.timeline.partition.PartitionChunk;
@@ -92,6 +93,7 @@ import java.util.SortedSet;
 import java.util.TreeSet;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
+import java.util.stream.StreamSupport;
 
 public class CompactionTask extends AbstractTask
 {
@@ -634,19 +636,29 @@ public class CompactionTask extends AbstractTask
 
     List<DataSegment> checkAndGetSegments(TaskToolbox toolbox) throws 
IOException
     {
-      final List<DataSegment> usedSegments = toolbox.getTaskActionClient()
-                                                    .submit(new 
SegmentListUsedAction(dataSource, interval, null));
+      final List<DataSegment> usedSegments = 
toolbox.getTaskActionClient().submit(
+          new SegmentListUsedAction(dataSource, interval, null)
+      );
+      final TimelineLookup<String, DataSegment> timeline = 
VersionedIntervalTimeline.forSegments(usedSegments);
+      final List<DataSegment> latestSegments = timeline
+          .lookup(interval)
+          .stream()
+          .map(TimelineObjectHolder::getObject)
+          .flatMap(partitionHolder -> 
StreamSupport.stream(partitionHolder.spliterator(), false))
+          .map(PartitionChunk::getObject)
+          .collect(Collectors.toList());
+
       if (segments != null) {
-        Collections.sort(usedSegments);
+        Collections.sort(latestSegments);
         Collections.sort(segments);
 
-        if (!usedSegments.equals(segments)) {
+        if (!latestSegments.equals(segments)) {
           final List<DataSegment> unknownSegments = segments.stream()
-                                                            .filter(segment -> 
!usedSegments.contains(segment))
+                                                            .filter(segment -> 
!latestSegments.contains(segment))
                                                             
.collect(Collectors.toList());
-          final List<DataSegment> missingSegments = usedSegments.stream()
-                                                                
.filter(segment -> !segments.contains(segment))
-                                                                
.collect(Collectors.toList());
+          final List<DataSegment> missingSegments = latestSegments.stream()
+                                                                  
.filter(segment -> !segments.contains(segment))
+                                                                  
.collect(Collectors.toList());
           throw new ISE(
               "Specified segments in the spec are different from the current 
used segments. "
               + "There are unknown segments[%s] and missing segments[%s] in 
the spec.",
@@ -655,7 +667,7 @@ public class CompactionTask extends AbstractTask
           );
         }
       }
-      return usedSegments;
+      return latestSegments;
     }
   }
 
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
index 837b33a..eb80011 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
@@ -698,7 +698,9 @@ public class CompactionTaskTest
     expectedException.expectMessage(CoreMatchers.containsString("are different 
from the current used segments"));
 
     final List<DataSegment> segments = new ArrayList<>(SEGMENTS);
-    segments.remove(0);
+    Collections.sort(segments);
+    // Remove one segment in the middle
+    segments.remove(segments.size() / 2);
     CompactionTask.createIngestionSchema(
         toolbox,
         new SegmentProvider(segments),


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to