This is an automated email from the ASF dual-hosted git repository.

amatya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 47c32a9d92 Skip ALL granularity compaction  (#13304)
47c32a9d92 is described below

commit 47c32a9d929ac25df14647a971e3dd396a06a277
Author: AmatyaAvadhanula <[email protected]>
AuthorDate: Mon Nov 7 17:55:03 2022 +0530

    Skip ALL granularity compaction  (#13304)
    
    * Skip autocompaction for datasources with ETERNITY segments
---
 docs/data-management/automatic-compaction.md       |  2 +
 .../duty/NewestSegmentFirstIterator.java           |  9 ++
 .../duty/NewestSegmentFirstPolicyTest.java         | 97 ++++++++++++++++++++++
 3 files changed, 108 insertions(+)

diff --git a/docs/data-management/automatic-compaction.md 
b/docs/data-management/automatic-compaction.md
index 9d5ae99c36..5f5a76eebe 100644
--- a/docs/data-management/automatic-compaction.md
+++ b/docs/data-management/automatic-compaction.md
@@ -174,6 +174,8 @@ The following auto-compaction configuration compacts 
existing `HOUR` segments in
 }
 ```
 
+> Auto-compaction skips datasources containing segments with ALL granularity 
(i.e. ETERNITY intervals), regardless of the configured target granularity.
+
 ### Update partitioning scheme
 
 For your `wikipedia` datasource, you want to optimize segment access when 
regularly ingesting data without compromising compute time when querying the 
data. Your ingestion spec for batch append uses [dynamic 
partitioning](../ingestion/native-batch.md#dynamic-partitioning) to optimize 
for write-time operations, while your stream ingestion partitioning is 
configured by the stream service. You want to implement auto-compaction to 
reorganize the data with a suitable read-time partitioning us [...]
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java
index ca694d24c8..d746d91c34 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java
@@ -129,6 +129,11 @@ public class NewestSegmentFirstIterator implements 
CompactionSegmentIterator
             // For example, if the original is interval of 
2020-01-28/2020-02-03 with WEEK granularity
             // and the configuredSegmentGranularity is MONTH, the segment will 
be split to two segments
             // of 2020-01/2020-02 and 2020-02/2020-03.
+            if (Intervals.ETERNITY.equals(segment.getInterval())) {
+              // This is to prevent the coordinator from crashing as raised in 
https://github.com/apache/druid/issues/13208
+              log.warn("Cannot compact datasource[%s] with ALL granularity", 
dataSource);
+              return;
+            }
             for (Interval interval : 
configuredSegmentGranularity.getIterable(segment.getInterval())) {
               intervalToPartitionMap.computeIfAbsent(interval, k -> new 
HashSet<>()).add(segment);
             }
@@ -627,6 +632,10 @@ public class NewestSegmentFirstIterator implements 
CompactionSegmentIterator
     final List<Interval> searchIntervals = new ArrayList<>();
 
     for (Interval lookupInterval : filteredInterval) {
+      if (Intervals.ETERNITY.equals(lookupInterval)) {
+        log.warn("Cannot compact datasource[%s] since interval is ETERNITY.", 
dataSourceName);
+        return Collections.emptyList();
+      }
       final List<DataSegment> segments = timeline
           .findNonOvershadowedObjectsInInterval(lookupInterval, 
Partitions.ONLY_COMPLETE)
           .stream()
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java
index 078a538f25..73abcb8d5e 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java
@@ -69,6 +69,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.TimeZone;
@@ -1584,6 +1585,102 @@ public class NewestSegmentFirstPolicyTest
     Assert.assertFalse(iterator.hasNext());
   }
 
+  @Test
+  public void testSkipAllGranularityToDefault()
+  {
+    CompactionSegmentIterator iterator = policy.reset(
+        ImmutableMap.of(DATA_SOURCE,
+                        createCompactionConfig(10000,
+                                               new Period("P0D"),
+                                               null
+                        )
+        ),
+        ImmutableMap.of(
+            DATA_SOURCE,
+            SegmentTimeline.forSegments(ImmutableSet.of(
+                                            new DataSegment(
+                                                DATA_SOURCE,
+                                                Intervals.ETERNITY,
+                                                "0",
+                                                new HashMap<>(),
+                                                new ArrayList<>(),
+                                                new ArrayList<>(),
+                                                new NumberedShardSpec(0, 0),
+                                                0,
+                                                100)
+                                        )
+            )
+        ),
+        Collections.emptyMap()
+    );
+
+    Assert.assertFalse(iterator.hasNext());
+  }
+
+  @Test
+  public void testSkipAllToAllGranularity()
+  {
+    CompactionSegmentIterator iterator = policy.reset(
+        ImmutableMap.of(DATA_SOURCE,
+                        createCompactionConfig(10000,
+                                               new Period("P0D"),
+                                               new 
UserCompactionTaskGranularityConfig(Granularities.ALL, null, null)
+                        )
+        ),
+        ImmutableMap.of(
+            DATA_SOURCE,
+            SegmentTimeline.forSegments(ImmutableSet.of(
+                                            new DataSegment(
+                                                DATA_SOURCE,
+                                                Intervals.ETERNITY,
+                                                "0",
+                                                new HashMap<>(),
+                                                new ArrayList<>(),
+                                                new ArrayList<>(),
+                                                new NumberedShardSpec(0, 0),
+                                                0,
+                                                100)
+                                        )
+            )
+        ),
+        Collections.emptyMap()
+    );
+
+    Assert.assertFalse(iterator.hasNext());
+  }
+
+  @Test
+  public void testSkipAllToFinerGranularity()
+  {
+    CompactionSegmentIterator iterator = policy.reset(
+        ImmutableMap.of(DATA_SOURCE,
+                        createCompactionConfig(10000,
+                                               new Period("P0D"),
+                                               new 
UserCompactionTaskGranularityConfig(Granularities.DAY, null, null)
+                        )
+        ),
+        ImmutableMap.of(
+            DATA_SOURCE,
+            SegmentTimeline.forSegments(ImmutableSet.of(
+                                            new DataSegment(
+                                                DATA_SOURCE,
+                                                Intervals.ETERNITY,
+                                                "0",
+                                                new HashMap<>(),
+                                                new ArrayList<>(),
+                                                new ArrayList<>(),
+                                                new NumberedShardSpec(0, 0),
+                                                0,
+                                                100)
+                                        )
+            )
+        ),
+        Collections.emptyMap()
+    );
+
+    Assert.assertFalse(iterator.hasNext());
+  }
+
   private static void assertCompactSegmentIntervals(
       CompactionSegmentIterator iterator,
       Period segmentPeriod,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to