clintropolis commented on code in PR #18939:
URL: https://github.com/apache/druid/pull/18939#discussion_r2826041028


##########
indexing-service/src/main/java/org/apache/druid/indexing/compact/CascadingReindexingTemplate.java:
##########
@@ -0,0 +1,803 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.compact;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.druid.data.input.impl.AggregateProjectionSpec;
+import org.apache.druid.indexer.CompactionEngine;
+import org.apache.druid.indexing.input.DruidInputSource;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.segment.transform.CompactionTransformSpec;
+import org.apache.druid.server.compaction.IntervalGranularityInfo;
+import org.apache.druid.server.compaction.ReindexingRule;
+import org.apache.druid.server.compaction.ReindexingRuleProvider;
+import org.apache.druid.server.compaction.ReindexingSegmentGranularityRule;
+import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
+import org.apache.druid.server.coordinator.InlineSchemaDataSourceCompactionConfig;
+import org.apache.druid.server.coordinator.UserCompactionTaskDimensionsConfig;
+import org.apache.druid.server.coordinator.UserCompactionTaskGranularityConfig;
+import org.apache.druid.server.coordinator.UserCompactionTaskIOConfig;
+import org.apache.druid.server.coordinator.UserCompactionTaskQueryTuningConfig;
+import org.apache.druid.timeline.CompactionState;
+import org.apache.druid.timeline.SegmentTimeline;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+import org.joda.time.Period;
+
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/**
+ * Template to perform period-based cascading reindexing. {@link ReindexingRule}s are provided by a {@link ReindexingRuleProvider}.
+ * Each rule specifies a period relative to the current time, which is used to determine its applicable interval.
+ * A timeline is constructed from a condensed set of these periods, and tasks are created for each search interval in
+ * the timeline with the rules applicable to said interval.
+ * <p>
+ * For example, if you had the following rules:
+ * <ul>
+ *   <li>Rule A: period = 1 day</li>
+ *   <li>Rule B: period = 7 days</li>
+ *   <li>Rule C: period = 30 days</li>
+ *   <li>Rule D: period = 7 days</li>
+ * </ul>
+ *
+ * You would end up with the following search intervals (assuming current time is T):
+ * <ul>
+ *   <li>Interval 1: [T-7days, T-1day)</li>
+ *   <li>Interval 2: [T-30days, T-7days)</li>
+ *   <li>Interval 3: [-inf, T-30days)</li>
+ * </ul>
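+ * Note that rules B and D share the same period (7 days), so their thresholds condense into a single boundary,
+ * which is why four rules yield three search intervals.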
+ * <p>
+ * This template never needs to be deserialized as a {@code BatchIndexingJobTemplate}.
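+ * <p>
+ * A sketch of how this template might be configured as JSON (field values are illustrative and the
+ * ruleProvider payload is elided; the type name comes from {@link #TYPE}):
+ * <pre>
+ * {
+ *   "type": "reindexCascade",
+ *   "dataSource": "wikipedia",
+ *   "defaultSegmentGranularity": "MONTH",
+ *   "skipOffsetFromLatest": "P1D",
+ *   "ruleProvider": { ... }
+ * }
+ * </pre>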
+ */
+public class CascadingReindexingTemplate implements CompactionJobTemplate, DataSourceCompactionConfig
+{
+  private static final Logger LOG = new Logger(CascadingReindexingTemplate.class);
+  private static final ReindexingConfigOptimizer DELETION_RULE_OPTIMIZER = new ReindexingDeletionRuleOptimizer();
+
+  public static final String TYPE = "reindexCascade";
+
+  private final String dataSource;
+  private final ReindexingRuleProvider ruleProvider;
+  @Nullable
+  private final Map<String, Object> taskContext;
+  @Nullable
+  private final CompactionEngine engine;
+  private final int taskPriority;
+  private final long inputSegmentSizeBytes;
+  private final Period skipOffsetFromLatest;
+  private final Period skipOffsetFromNow;
+  private final Granularity defaultSegmentGranularity;
+
+  @JsonCreator
+  public CascadingReindexingTemplate(
+      @JsonProperty("dataSource") String dataSource,
+      @JsonProperty("taskPriority") @Nullable Integer taskPriority,
+      @JsonProperty("inputSegmentSizeBytes") @Nullable Long inputSegmentSizeBytes,
+      @JsonProperty("ruleProvider") ReindexingRuleProvider ruleProvider,
+      @JsonProperty("engine") @Nullable CompactionEngine engine,
+      @JsonProperty("taskContext") @Nullable Map<String, Object> taskContext,
+      @JsonProperty("skipOffsetFromLatest") @Nullable Period skipOffsetFromLatest,
+      @JsonProperty("skipOffsetFromNow") @Nullable Period skipOffsetFromNow,
+      @JsonProperty("defaultSegmentGranularity") Granularity defaultSegmentGranularity
+  )
+  {
+    this.dataSource = Objects.requireNonNull(dataSource, "'dataSource' cannot be null");
+    this.ruleProvider = Objects.requireNonNull(ruleProvider, "'ruleProvider' cannot be null");
+    this.engine = engine;
+    this.taskContext = taskContext;
+    this.taskPriority = Objects.requireNonNullElse(taskPriority, DEFAULT_COMPACTION_TASK_PRIORITY);
+    this.inputSegmentSizeBytes = Objects.requireNonNullElse(inputSegmentSizeBytes, DEFAULT_INPUT_SEGMENT_SIZE_BYTES);
+    this.defaultSegmentGranularity = Objects.requireNonNull(
+        defaultSegmentGranularity,
+        "'defaultSegmentGranularity' cannot be null"
+    );
+
+    if (skipOffsetFromNow != null && skipOffsetFromLatest != null) {
+      throw new IAE("Cannot set both skipOffsetFromNow and skipOffsetFromLatest");
+    }
+    this.skipOffsetFromNow = skipOffsetFromNow;
+    this.skipOffsetFromLatest = skipOffsetFromLatest;
+  }
+
+  @Override
+  @JsonProperty
+  public String getDataSource()
+  {
+    return dataSource;
+  }
+
+  @JsonProperty
+  @Nullable
+  @Override
+  public Map<String, Object> getTaskContext()
+  {
+    return taskContext;
+  }
+
+  @JsonProperty
+  @Nullable
+  @Override
+  public CompactionEngine getEngine()
+  {
+    return engine;
+  }
+
+  @JsonProperty
+  @Override
+  public int getTaskPriority()
+  {
+    return taskPriority;
+  }
+
+  @JsonProperty
+  @Override
+  public long getInputSegmentSizeBytes()
+  {
+    return inputSegmentSizeBytes;
+  }
+
+  @Override
+  public String getType()
+  {
+    return TYPE;
+  }
+
+  @JsonProperty
+  private ReindexingRuleProvider getRuleProvider()
+  {
+    return ruleProvider;
+  }
+
+  @Override
+  @JsonProperty
+  @Nullable
+  public Period getSkipOffsetFromLatest()
+  {
+    return skipOffsetFromLatest;
+  }
+
+  @JsonProperty
+  @Nullable
+  private Period getSkipOffsetFromNow()
+  {
+    return skipOffsetFromNow;
+  }
+
+  @JsonProperty
+  public Granularity getDefaultSegmentGranularity()
+  {
+    return defaultSegmentGranularity;
+  }
+
+  /**
+   * Checks if the given interval's end time is after the specified boundary.
+   * Used to determine if intervals should be skipped based on skip offset configuration.
+   *
+   * @param interval the interval to check
+   * @param boundary the boundary time to compare against
+   * @return true if the interval ends after the boundary
+   */
+  private static boolean intervalEndsAfter(Interval interval, DateTime boundary)
+  {
+    return interval.getEnd().isAfter(boundary);
+  }
+
+  @Override
+  public List<CompactionJob> createCompactionJobs(
+      DruidInputSource source,
+      CompactionJobParams jobParams
+  )
+  {
+    // Check if the rule provider is ready before attempting to create jobs
+    if (!ruleProvider.isReady()) {
+      LOG.info(
+          "Rule provider [%s] is not ready, skipping reindexing job creation 
for dataSource[%s]",
+          ruleProvider.getType(),
+          dataSource
+      );
+      return Collections.emptyList();
+    }
+
+    final List<CompactionJob> allJobs = new ArrayList<>();
+    final DateTime currentTime = jobParams.getScheduleStartTime();
+
+    SegmentTimeline timeline = jobParams.getTimeline(dataSource);
+
+    if (timeline == null || timeline.isEmpty()) {
+      LOG.warn("Segment timeline null or empty for [%s] skipping creating 
compaction jobs.", dataSource);
+      return Collections.emptyList();
+    }
+
+    List<IntervalGranularityInfo> searchIntervals = generateAlignedSearchIntervals(currentTime);
+    if (searchIntervals.isEmpty()) {
+      LOG.warn("No search intervals generated for dataSource[%s], no reindexing jobs will be created", dataSource);
+      return Collections.emptyList();
+    }
+
+    // Adjust the timeline interval by applying the user-defined skip offset (if any exists)
+    Interval adjustedTimelineInterval = applySkipOffset(
+        new Interval(timeline.first().getInterval().getStart(), timeline.last().getInterval().getEnd()),
+        jobParams.getScheduleStartTime()
+    );
+    if (adjustedTimelineInterval == null) {
+      LOG.warn("All data for dataSource[%s] is within skip offsets, no reindexing jobs will be created", dataSource);
+      return Collections.emptyList();
+    }
+
+    for (IntervalGranularityInfo intervalInfo : searchIntervals) {
+      Interval reindexingInterval = intervalInfo.getInterval();
+
+      if (!reindexingInterval.overlaps(adjustedTimelineInterval)) {
+        // No underlying data exists to reindex for this interval
+        LOG.debug("Search interval[%s] does not overlap with data range[%s], 
skipping", reindexingInterval, adjustedTimelineInterval);
+        continue;
+      }
+
+      // Skip intervals that extend past the skip offset boundary (not just the data boundary).
+      // This preserves granularity alignment and ensures intervals exist in the synthetic timeline.
+      // Only apply this when a skip offset is actually configured.
+      if ((skipOffsetFromNow != null || skipOffsetFromLatest != null) &&
+          intervalEndsAfter(reindexingInterval, adjustedTimelineInterval.getEnd())) {
+        LOG.debug("Search interval[%s] extends past skip offset boundary[%s], skipping to preserve alignment",
+                  reindexingInterval, adjustedTimelineInterval.getEnd());
+        continue;
+      }
+
+      InlineSchemaDataSourceCompactionConfig.Builder builder = createBaseBuilder();
+
+      ReindexingConfigBuilder configBuilder = new ReindexingConfigBuilder(
+          ruleProvider,
+          reindexingInterval,
+          currentTime,
+          searchIntervals
+      );
+      int ruleCount = configBuilder.applyTo(builder);
+
+      if (ruleCount > 0) {
+        LOG.debug("Creating reindexing jobs for interval[%s] with [%d] rules 
selected", reindexingInterval, ruleCount);
+        allJobs.addAll(
+            createJobsForSearchInterval(
+                createJobTemplateForInterval(builder.build()),
+                reindexingInterval,
+                source,
+                jobParams
+            )
+        );
+      } else {
+        LOG.debug("No applicable reindexing rules found for interval[%s]", 
reindexingInterval);
+      }
+    }
+    return allJobs;
+  }
+
+  @VisibleForTesting
+  protected CompactionJobTemplate createJobTemplateForInterval(
+      InlineSchemaDataSourceCompactionConfig config
+  )
+  {
+    return new CompactionConfigBasedJobTemplate(config, DELETION_RULE_OPTIMIZER);
+  }
+
+  /**
+   * Applies the configured skip offset to an interval by adjusting its end time. Uses either
+   * skipOffsetFromNow (relative to reference time) or skipOffsetFromLatest (relative to interval end).
+   * Returns null if the adjusted end would be before the interval start.
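+   * For example, with skipOffsetFromLatest = P1D and an interval ending at 2024-01-10, the adjusted end becomes 2024-01-09.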
+   *
+   * @param interval the interval to adjust
+   * @param skipFromNowReferenceTime the reference time for skipOffsetFromNow calculation
+   * @return the interval with adjusted end time, or null if the result would be invalid
+   */
+  @Nullable
+  private Interval applySkipOffset(
+      Interval interval,
+      DateTime skipFromNowReferenceTime
+  )
+  {
+    DateTime maybeAdjustedEnd = interval.getEnd();
+    if (skipOffsetFromNow != null) {
+      maybeAdjustedEnd = skipFromNowReferenceTime.minus(skipOffsetFromNow);
+    } else if (skipOffsetFromLatest != null) {
+      maybeAdjustedEnd = maybeAdjustedEnd.minus(skipOffsetFromLatest);
+    }
+    if (maybeAdjustedEnd.isBefore(interval.getStart())) {
+      return null;
+    } else {
+      return new Interval(interval.getStart(), maybeAdjustedEnd);
+    }
+  }
+
+  private InlineSchemaDataSourceCompactionConfig.Builder createBaseBuilder()
+  {
+    return InlineSchemaDataSourceCompactionConfig.builder()
+        .forDataSource(dataSource)
+        .withTaskPriority(taskPriority)
+        .withInputSegmentSizeBytes(inputSegmentSizeBytes)
+        .withEngine(engine)
+        .withTaskContext(taskContext)
+        .withSkipOffsetFromLatest(Period.ZERO); // Skip offsets are handled at the timeline level; we want to cover the entire interval
+  }
+
+  /**
+   * Generates granularity-aligned search intervals based on segment granularity rules,
+   * then splits them at non-segment-granularity rule thresholds where safe to do so.
+   * <p>
+   * Algorithm:
+   * <ol>
+   *   <li>Generate a base timeline from segment granularity rules, with interval boundaries aligned to the segment granularity of the underlying rules</li>
+   *   <li>Collect olderThan application thresholds from all non-segment-granularity rules</li>
+   *   <li>For each interval in the base timeline:
+   *   <ul>
+   *     <li>Find olderThan thresholds for non-segment-granularity rules that fall within it</li>
+   *     <li>Align those thresholds to the interval's targeted segment granularity using bucketStart on the threshold date</li>
+   *     <li>Split base intervals at the granularity-aligned thresholds found inside them</li>
+   *   </ul>
+   *   </li>
+   *   <li>Return the timeline of non-overlapping intervals, split for the most precise possible rule application (due to segment granularity alignment, some rules will be applied later than their explicitly defined period)</li>
+   * </ol>
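+   * <p>
+   * For example, if a base interval targets MONTH granularity and a non-segment-granularity rule's threshold falls
+   * mid-month, the threshold is aligned down to the start of that month before the interval is split there.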
+   *
+   * @param referenceTime the reference time for calculating period thresholds
+   * @return list of split and aligned intervals with their granularities and source rules, ordered from oldest to newest
+   * @throws IAE if no reindexing rules are configured
+   * @throws SegmentGranularityTimelineValidationException if granularities become coarser over time
+   */
+  List<IntervalGranularityInfo> generateAlignedSearchIntervals(DateTime referenceTime)
+  {
+    List<IntervalGranularityInfo> baseTimeline = generateBaseSegmentGranularityAlignedTimeline(referenceTime);
+    List<DateTime> nonSegmentGranThresholds = collectNonSegmentGranularityThresholds(referenceTime);
+
+    List<IntervalGranularityInfo> finalIntervals = new ArrayList<>();
+    for (IntervalGranularityInfo baseInterval : baseTimeline) {
+      List<DateTime> splitPoints = findGranularityAlignedSplitPoints(baseInterval, nonSegmentGranThresholds);
+      finalIntervals.addAll(splitIntervalAtPoints(baseInterval, splitPoints));
+    }
+
+    return finalIntervals;
+  }
+
+  /**
+   * Finds split points within a base interval by aligning non-segment-granularity thresholds
+   * to the interval's segment granularity. Only includes thresholds that fall strictly inside
+   * the interval (not at boundaries, which would create zero-length intervals).
+   *
+   * @param baseInterval the interval to find split points for
+   * @param nonSegmentGranThresholds thresholds from non-segment-granularity rules
+   * @return sorted, distinct list of aligned split points that fall inside the interval
+   */
+  private List<DateTime> findGranularityAlignedSplitPoints(
+      IntervalGranularityInfo baseInterval,
+      List<DateTime> nonSegmentGranThresholds
+  )
+  {
+    List<DateTime> splitPoints = new ArrayList<>();
+
+    for (DateTime threshold : nonSegmentGranThresholds) {
+      // Check if threshold falls inside this interval
+      if (threshold.isAfter(baseInterval.getInterval().getStart()) &&
+          threshold.isBefore(baseInterval.getInterval().getEnd())) {
+
+        // Align threshold to this interval's segment granularity
+        DateTime alignedThreshold = baseInterval.getGranularity().bucketStart(threshold);
+
+        // Only add if it's not at the boundaries (would create a zero-length interval)
+        if (alignedThreshold.isAfter(baseInterval.getInterval().getStart()) &&
+            alignedThreshold.isBefore(baseInterval.getInterval().getEnd())) {
+          splitPoints.add(alignedThreshold);
+        }
+      }
+    }
+
+    // Remove duplicates and sort
+    return splitPoints.stream()
+        .distinct()
+        .sorted()
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Splits a base interval at the given split points, preserving the interval's granularity
+   * and source rule. If no split points exist, returns the original interval unchanged.
+   *
+   * @param baseInterval the interval to split
+   * @param splitPoints sorted list of points to split at (must be inside the interval)
+   * @return list of split intervals, or a singleton list with the original interval if there are no splits
+   */
+  private List<IntervalGranularityInfo> splitIntervalAtPoints(
+      IntervalGranularityInfo baseInterval,
+      List<DateTime> splitPoints
+  )
+  {
+    if (splitPoints.isEmpty()) {
+      LOG.debug("No splits for interval [%s]", baseInterval.getInterval());
+      return Collections.singletonList(baseInterval);
+    }
+
+    LOG.debug("Splitting interval [%s] at [%d] points", 
baseInterval.getInterval(), splitPoints.size());
+
+    List<IntervalGranularityInfo> result = new ArrayList<>();
+    DateTime start = baseInterval.getInterval().getStart();
+
+    for (DateTime splitPoint : splitPoints) {
+      result.add(new IntervalGranularityInfo(
+          new Interval(start, splitPoint),
+          baseInterval.getGranularity(),
+          baseInterval.getSourceRule()  // Preserve source rule from base interval
+      ));
+      start = splitPoint;
+    }
+
+    // Add final interval from last split point to end
+    result.add(new IntervalGranularityInfo(
+        new Interval(start, baseInterval.getInterval().getEnd()),
+        baseInterval.getGranularity(),
+        baseInterval.getSourceRule()  // Preserve source rule from base interval
+    ));
+
+    return result;
+  }
+
+  /**
+   * Generates a base timeline aligned to segment granularities found in segment granularity rules and, if necessary,
+   * the default granularity for the supervisor.
+   * <p>
+   * Algorithm:
+   * <ol>
+   *   <li>If no segment granularity rules exist:
+   *     <ol type="a">
+   *       <li>Find the most recent threshold from non-segment-granularity rules</li>
+   *       <li>Use the default granularity to align an interval from [-inf, most recent threshold)</li>
+   *     </ol>
+   *   </li>
+   *   <li>If segment granularity rules exist:
+   *     <ol type="a">
+   *       <li>Sort rules by period from longest to shortest (oldest to most recent threshold)</li>
+   *       <li>Create intervals for each rule, adjusting the interval end to be aligned to the rule's segment granularity</li>
+   *       <li>If non-segment-granularity thresholds exist that are more recent than the most recent segment granularity rule's end:
+   *         <ol type="i">
+   *           <li>Prepend an interval from [most recent segment granularity rule interval end, most recent non-segment-granularity threshold)</li>
+   *         </ol>
+   *       </li>
+   *     </ol>
+   *   </li>
+   * </ol>
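+   * <p>
+   * For example, a rule with MONTH segment granularity and olderThan = P30D, evaluated at reference time T,
+   * produces an interval whose end is bucketStart(T - 30 days) under MONTH granularity, i.e. the start of
+   * the month containing T - 30 days.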
+   *
+   * @param referenceTime the reference time for calculating period thresholds
+   * @return base timeline with granularity-aligned intervals, ordered from oldest to newest
+   * @throws IAE if no reindexing rules are configured
+   * @throws SegmentGranularityTimelineValidationException if granularities become coarser over time
+   */
+  private List<IntervalGranularityInfo> generateBaseSegmentGranularityAlignedTimeline(DateTime referenceTime)
+  {
+    List<ReindexingSegmentGranularityRule> segmentGranRules = ruleProvider.getSegmentGranularityRules();
+    List<DateTime> nonSegmentGranThresholds = collectNonSegmentGranularityThresholds(referenceTime);
+
+    List<IntervalGranularityInfo> baseTimeline;
+
+    if (segmentGranRules.isEmpty()) {
+      baseTimeline = createDefaultGranularityTimeline(nonSegmentGranThresholds);
+    } else {
+      baseTimeline = createSegmentGranularityTimeline(segmentGranRules, referenceTime);
+      baseTimeline = maybePrependRecentInterval(baseTimeline, nonSegmentGranThresholds);
+    }
+
+    validateSegmentGranularityTimeline(baseTimeline);
+    return baseTimeline;
+  }
+
+  /**
+   * Creates a timeline using the default segment granularity when no segment granularity rules exist.
+   * Uses the most recent threshold from non-segment-granularity rules to determine the end boundary.
+   *
+   * @param nonSegmentGranThresholds thresholds from non-segment-granularity rules
+   * @return single-interval timeline from MIN to the most recent threshold, aligned to the default granularity
+   * @throws IAE if no non-segment-granularity rules exist either
+   */
+  private List<IntervalGranularityInfo> createDefaultGranularityTimeline(List<DateTime> nonSegmentGranThresholds)
+  {
+    if (nonSegmentGranThresholds.isEmpty()) {
+      throw new IAE(
+          "CascadingReindexingTemplate requires at least one reindexing rule "
+          + "(segment granularity or other type)"
+      );
+    }
+
+    // Find the smallest period (most recent threshold = largest DateTime value)
+    DateTime mostRecentThreshold = Collections.max(nonSegmentGranThresholds);
+    DateTime alignedEnd = defaultSegmentGranularity.bucketStart(mostRecentThreshold);
+
+    LOG.debug(
+        "No segment granularity rules found for cascading supervisor[%s]. 
Creating base interval with "
+        + "default granularity [%s] and threshold [%s] (aligned: [%s])",
+        dataSource,
+        defaultSegmentGranularity,
+        mostRecentThreshold,
+        alignedEnd
+    );
+
+    return Collections.singletonList(new IntervalGranularityInfo(
+        new Interval(DateTimes.MIN, alignedEnd),
+        defaultSegmentGranularity,
+        null  // No source rule when using default granularity
+    ));
+  }
+
+  /**
+   * Creates a timeline by processing segment granularity rules in chronological order (oldest to newest).
+   * Each rule defines an interval with its specific segment granularity, with boundaries aligned to that granularity.
+   *
+   * @param segmentGranRules segment granularity rules to process
+   * @param referenceTime reference time for computing rule thresholds
+   * @return timeline of intervals with their granularities, ordered from oldest to newest
+   */
+  private List<IntervalGranularityInfo> createSegmentGranularityTimeline(
+      List<ReindexingSegmentGranularityRule> segmentGranRules,
+      DateTime referenceTime
+  )
+  {
+    // Sort rules by period from longest to shortest (oldest to most recent threshold)
+    List<ReindexingSegmentGranularityRule> sortedRules = segmentGranRules.stream()
+        .sorted(Comparator.comparingLong(rule -> {
+          DateTime threshold = referenceTime.minus(rule.getOlderThan());
+          return threshold.getMillis();
+        }))
+        .collect(Collectors.toList());
+
+    // Build base timeline with granularities tracked
+    List<IntervalGranularityInfo> baseTimeline = new ArrayList<>();
+    DateTime previousAlignedEnd = null;
+
+    for (ReindexingSegmentGranularityRule rule : sortedRules) {
+      DateTime rawEnd = referenceTime.minus(rule.getOlderThan());
+      DateTime alignedEnd = rule.getSegmentGranularity().bucketStart(rawEnd);
+      DateTime alignedStart = (previousAlignedEnd != null) ? previousAlignedEnd : DateTimes.MIN;

Review Comment:
   I think it would be fine to start out super restrictive; we can always revisit this part later. It's probably easier to do that than to start out super permissive and try to rein it in later.
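
   For reference, a minimal sketch of what a restrictive check could look like (illustrative only: validateStrictlyFiner is a hypothetical name, and it assumes that comparing the durations of each granularity's bucket at a fixed reference instant is an acceptable proxy for "coarser"):

       // Reject any timeline whose granularity gets coarser from oldest to newest.
       private static void validateStrictlyFiner(List<IntervalGranularityInfo> timeline, DateTime referenceTime)
       {
         long previousBucketMillis = Long.MAX_VALUE;
         for (IntervalGranularityInfo info : timeline) {
           // Approximate the "size" of a granularity by the duration of its bucket at the reference instant
           long bucketMillis = info.getGranularity().bucket(referenceTime).toDurationMillis();
           if (bucketMillis > previousBucketMillis) {
             throw new IAE(
                 "Segment granularity[%s] for interval[%s] is coarser than the preceding interval's granularity",
                 info.getGranularity(),
                 info.getInterval()
             );
           }
           previousBucketMillis = bucketMillis;
         }
       }

   Starting with a check this strict and loosening it later keeps the failure mode loud while the template is new.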



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

