clintropolis commented on code in PR #13852:
URL: https://github.com/apache/druid/pull/13852#discussion_r1119616973
##########
server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicy.java:
##########
@@ -20,34 +20,78 @@
package org.apache.druid.server.coordinator.duty;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
import com.google.inject.Inject;
+import org.apache.druid.java.util.common.Pair;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
+import org.apache.druid.server.coordinator.DruidCoordinatorConfig;
import org.apache.druid.timeline.SegmentTimeline;
import org.joda.time.Interval;
+import java.time.Clock;
import java.util.List;
import java.util.Map;
/**
* This policy searches segments for compaction from the newest one to oldest one.
+ * The {@link #resetIfNeeded} functionality is inspired by {@link com.google.common.base.Suppliers.ExpiringMemoizingSupplier}.
*/
public class NewestSegmentFirstPolicy implements CompactionSegmentSearchPolicy
{
private final ObjectMapper objectMapper;
+ private final long durationMillis;
+ private transient volatile NewestSegmentFirstIterator iterator;
+ // The special value 0 means "not yet initialized".
+ private transient volatile long expirationMillis;
+ private final Clock clock;
@Inject
- public NewestSegmentFirstPolicy(ObjectMapper objectMapper)
+ public NewestSegmentFirstPolicy(ObjectMapper objectMapper,
DruidCoordinatorConfig config, Clock clock)
{
this.objectMapper = objectMapper;
+ this.durationMillis =
config.getCompactionSearchPolicyRefreshPeriod().getMillis();
+ this.clock = clock;
+ Preconditions.checkArgument(durationMillis > 0);
}
@Override
- public CompactionSegmentIterator reset(
+ public Pair<CompactionSegmentIterator, Boolean> resetIfNeeded(
Map<String, DataSourceCompactionConfig> compactionConfigs,
Map<String, SegmentTimeline> dataSources,
Map<String, List<Interval>> skipIntervals
)
{
- return new NewestSegmentFirstIterator(objectMapper, compactionConfigs,
dataSources, skipIntervals);
+ long millis = expirationMillis;
+ long now = clock.millis();
+ if (millis == 0 || now - millis >= 0) {
+ synchronized (this) {
Review Comment:
Oops, I can't read, sorry 🙃
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]