suneet-s commented on code in PR #13852:
URL: https://github.com/apache/druid/pull/13852#discussion_r1119598945


##########
server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorConfig.java:
##########
@@ -137,4 +137,7 @@ public boolean getCompactionSkipLockedIntervals()
     return true;
   }
 
+  @Config("druid.coordinator.compaction.searchPolicyRefreshPeriod")
+  @Default("PT5M")

Review Comment:
   I had considered placing the config in the `CoordinatorCompactionConfig` 
instead, but decided against it: that config is dynamic, yet changing the 
value would not necessarily update the expiry time of the iterator, so this 
seemed like a better UX
   
   `druid.coordinator.compaction.searchPolicyRefreshPeriod` could use a better 
name. Its intent is to give operators the ability to choose how often the view 
of the segment interval timeline considered for compaction is rebuilt. Today, 
the view is rebuilt every time the compact segments duty runs, so the property 
does not need to be tied to the `CompactSegments` duty
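
The default value `PT5M` in the diff above is an ISO-8601 duration. As a side note, Druid's `@Config` binding returns a Joda-Time duration, but since the format is standard ISO-8601, a minimal `java.time` sketch (names here are illustrative, not Druid's) shows what the default works out to in milliseconds:

```java
import java.time.Duration;

// Illustrative only: shows how an ISO-8601 refresh period like the
// @Default("PT5M") above translates to milliseconds.
public class RefreshPeriodDemo {
    public static long refreshPeriodMillis(String isoDuration) {
        return Duration.parse(isoDuration).toMillis();
    }

    public static void main(String[] args) {
        // "PT5M" = 5 minutes = 300,000 ms
        System.out.println(refreshPeriodMillis("PT5M")); // 300000
    }
}
```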



##########
server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicy.java:
##########
@@ -20,34 +20,78 @@
 package org.apache.druid.server.coordinator.duty;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
 import com.google.inject.Inject;
+import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
+import org.apache.druid.server.coordinator.DruidCoordinatorConfig;
 import org.apache.druid.timeline.SegmentTimeline;
 import org.joda.time.Interval;
 
+import java.time.Clock;
 import java.util.List;
 import java.util.Map;
 
 /**
  * This policy searches segments for compaction from the newest one to oldest 
one.
+ * The {@link #resetIfNeeded} functionality is inspired by {@link 
com.google.common.base.Suppliers.ExpiringMemoizingSupplier}.
  */
 public class NewestSegmentFirstPolicy implements CompactionSegmentSearchPolicy
 {
   private final ObjectMapper objectMapper;
+  private final long durationMillis;
+  private transient volatile NewestSegmentFirstIterator iterator;
+  // The special value 0 means "not yet initialized".
+  private transient volatile long expirationMillis;
+  private final Clock clock;
 
   @Inject
-  public NewestSegmentFirstPolicy(ObjectMapper objectMapper)
+  public NewestSegmentFirstPolicy(ObjectMapper objectMapper, 
DruidCoordinatorConfig config, Clock clock)
   {
     this.objectMapper = objectMapper;
+    this.durationMillis = 
config.getCompactionSearchPolicyRefreshPeriod().getMillis();
+    this.clock = clock;
+    Preconditions.checkArgument(durationMillis > 0);
   }
 
   @Override
-  public CompactionSegmentIterator reset(
+  public Pair<CompactionSegmentIterator, Boolean> resetIfNeeded(
       Map<String, DataSourceCompactionConfig> compactionConfigs,
       Map<String, SegmentTimeline> dataSources,
       Map<String, List<Interval>> skipIntervals
   )
   {
-    return new NewestSegmentFirstIterator(objectMapper, compactionConfigs, 
dataSources, skipIntervals);
+    long millis = expirationMillis;
+    long now = clock.millis();
+    if (millis == 0 || now - millis >= 0) {
+      synchronized (this) {

Review Comment:
   `reset` does have `synchronized` in its definition.
   
   I will add more comments explaining the thread-safety aspects in the next 
patch. The need primarily arises from the `force compaction` API, which is 
currently marked safe only for testing. With this change, it should be safe to 
call in all scenarios.
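
For reference, the diff follows the double-checked expiry pattern from Guava's `Suppliers.ExpiringMemoizingSupplier`. A hypothetical, standalone sketch of the same pattern (class and field names here are illustrative, not Druid's):

```java
import java.time.Clock;
import java.util.function.Supplier;

// Hypothetical sketch of the double-checked expiry pattern used in the diff,
// modeled on Guava's Suppliers.ExpiringMemoizingSupplier.
public class ExpiringHolder<T> {
    private final Supplier<T> factory;
    private final long durationMillis;
    private final Clock clock;

    private volatile T value;
    // 0 means "not yet initialized", matching the diff above.
    private volatile long expirationMillis;

    public ExpiringHolder(Supplier<T> factory, long durationMillis, Clock clock) {
        if (durationMillis <= 0) {
            throw new IllegalArgumentException("durationMillis must be positive");
        }
        this.factory = factory;
        this.durationMillis = durationMillis;
        this.clock = clock;
    }

    public T get() {
        long expiry = expirationMillis;  // single volatile read
        long now = clock.millis();
        // now - expiry >= 0 is the overflow-safe "has it expired?" check.
        if (expiry == 0 || now - expiry >= 0) {
            synchronized (this) {
                // Re-check: another thread may have refreshed while we blocked.
                if (expiry == expirationMillis) {
                    value = factory.get();
                    // Publish the expiry last: a reader that sees the new
                    // expirationMillis also sees the freshly built value.
                    // (Guava additionally guards the rare case where this
                    // sum is exactly 0.)
                    expirationMillis = now + durationMillis;
                }
            }
        }
        return value;
    }

    public static void main(String[] args) {
        java.util.concurrent.atomic.AtomicInteger builds =
            new java.util.concurrent.atomic.AtomicInteger();
        ExpiringHolder<Integer> h =
            new ExpiringHolder<>(builds::incrementAndGet, 60_000, Clock.systemUTC());
        System.out.println(h.get()); // first call builds the value
        System.out.println(h.get()); // still within the refresh period: cached
    }
}
```

Injecting `Clock` (as the PR does) rather than calling `System.currentTimeMillis()` directly is what makes this expiry logic testable with a controlled clock.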



##########
server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicy.java:
##########
@@ -20,34 +20,78 @@
 package org.apache.druid.server.coordinator.duty;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
 import com.google.inject.Inject;
+import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
+import org.apache.druid.server.coordinator.DruidCoordinatorConfig;
 import org.apache.druid.timeline.SegmentTimeline;
 import org.joda.time.Interval;
 
+import java.time.Clock;
 import java.util.List;
 import java.util.Map;
 
 /**
  * This policy searches segments for compaction from the newest one to oldest 
one.
+ * The {@link #resetIfNeeded} functionality is inspired by {@link 
com.google.common.base.Suppliers.ExpiringMemoizingSupplier}.
  */
 public class NewestSegmentFirstPolicy implements CompactionSegmentSearchPolicy
 {
   private final ObjectMapper objectMapper;
+  private final long durationMillis;
+  private transient volatile NewestSegmentFirstIterator iterator;

Review Comment:
   I think `transient` just means "don't serialize this field, because it is 
computed". This search policy shouldn't be serialized anyway, but this pattern 
is used in Guava's `Suppliers.memoizeWithExpiration`, so I figured I'd stick 
with it since it is battle tested. I meant to add a comment crediting the 
inspiration for this pattern - I'll update the docs
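
That reading of `transient` is right for Java serialization: the field is simply skipped when the object is written, and comes back as its default value on read. A small sketch (the `Memo` class and its fields are illustrative, not Druid's):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Illustrative class: one serialized field, one transient (computed) field.
class Memo implements Serializable {
    final String name;
    // transient: skipped by Java serialization, recomputed after deserialization
    transient volatile long cachedAt;

    Memo(String name, long cachedAt) {
        this.name = name;
        this.cachedAt = cachedAt;
    }
}

public class TransientDemo {
    public static Memo roundTrip(Memo m) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(m);
        }
        try (ObjectInputStream ois =
                new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
            return (Memo) ois.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        Memo copy = roundTrip(new Memo("iterator", 42L));
        System.out.println(copy.name);     // "iterator" survives the round trip
        System.out.println(copy.cachedAt); // 0: transient field reset to default
    }
}
```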



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

