cecemei commented on code in PR #19511:
URL: https://github.com/apache/druid/pull/19511#discussion_r3337535391
##########
indexing-service/src/main/java/org/apache/druid/indexing/compact/OverlordCompactionScheduler.java:
##########
@@ -499,6 +502,42 @@ public CompactionSimulateResult
simulateRunWithConfigUpdate(ClusterCompactionCon
}
}
+ @Override
+ public ReindexingTimelineView previewReindexingTimeline(SupervisorSpec spec,
DateTime referenceTime)
+ {
+ if (!(spec instanceof CompactionSupervisorSpec)) {
+ throw DruidException.forPersona(DruidException.Persona.USER)
+ .ofCategory(DruidException.Category.INVALID_INPUT)
+ .build(
+ "Supervisor[%s] is not a compaction supervisor",
+ spec.getId()
+ );
+ }
+ final CompactionSupervisorSpec compactionSpec = (CompactionSupervisorSpec)
spec;
+ final CompactionJobTemplate template = compactionSpec.getTemplate();
+ if (!(template instanceof CascadingReindexingTemplate)) {
Review Comment:
nit: `cascadingTemplate` can go into this line
##########
indexing-service/src/main/java/org/apache/druid/indexing/compact/ReindexingPlanner.java:
##########
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.compact;
+
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.server.compaction.IntervalPartitioningInfo;
+import
org.apache.druid.server.coordinator.InlineSchemaDataSourceCompactionConfig;
+import org.apache.druid.timeline.SegmentTimeline;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+import org.joda.time.Period;
+
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Builds a {@link ReindexingPlan} from a {@link CascadingReindexingTemplate}
for a given reference
+ * time and the live segment timeline. The plan is the single source of truth
for both
+ * compaction-job creation and the API timeline view, ensuring the two cannot
drift.
+ *
+ * <p>The data range bounds the planned intervals (no-overlap intervals become
+ * {@link ReindexingPlan.IntervalDisposition#SKIPPED_NO_DATA}), {@code
skipOffsetFromLatest} is
+ * anchored on the live data, and intervals that extend past the skip boundary
are truncated or
+ * marked {@link ReindexingPlan.IntervalDisposition#SKIPPED_BEYOND_BOUNDARY}.
+ *
+ * <p>Validation failures (misconfigured granularity ordering, no rules) are
caught and surfaced
+ * on the returned plan as a {@link ReindexingPlan.PlanValidationError} so
HTTP callers can render
+ * them in the response rather than as 500s.
+ */
+final class ReindexingPlanner
+{
+ private static final Logger LOG = new Logger(ReindexingPlanner.class);
+
+ private final CascadingReindexingTemplate template;
+
+ ReindexingPlanner(CascadingReindexingTemplate template)
+ {
+ this.template = template;
+ }
+
+ /**
+ * Build a plan for the given reference time. The plan reflects exactly what
+ * {@link CascadingReindexingTemplate#createCompactionJobs} would do given
the same
+ * {@code timeline} and reference time.
+ *
+ * @param timeline live segment timeline for the datasource; must be
non-null and non-empty —
+ * callers are responsible for short-circuiting when no data
exists rather than
+ * calling this with a fabricated empty timeline.
+ */
+ ReindexingPlan plan(DateTime referenceTime, SegmentTimeline timeline)
+ {
+ final String dataSource = template.getDataSource();
+
+ if (!template.getReindexingRuleProvider().isReady()) {
+ LOG.info(
+ "Rule provider [%s] is not ready for dataSource[%s], returning empty
plan",
+ template.getReindexingRuleProvider().getType(),
+ dataSource
+ );
+ return new ReindexingPlan(dataSource, referenceTime, null,
Collections.emptyList(), null);
+ }
+
+ final List<IntervalPartitioningInfo> searchIntervals;
+ try {
+ searchIntervals = template.generateAlignedSearchIntervals(referenceTime);
+ }
+ catch (SegmentGranularityTimelineValidationException e) {
+ LOG.warn(e, "Granularity timeline validation failed for dataSource[%s]",
dataSource);
+ return new ReindexingPlan(
+ dataSource,
+ referenceTime,
+ null,
+ Collections.emptyList(),
+ new ReindexingPlan.PlanValidationError(
+
ReindexingPlan.PlanValidationError.Type.INVALID_GRANULARITY_TIMELINE,
+ e.getMessage(),
+ e.getOlderInterval(),
+ e.getOlderGranularity(),
+ e.getNewerInterval(),
+ e.getNewerGranularity()
+ )
+ );
+ }
+ catch (IAE e) {
+ LOG.warn(e, "Validation failed while planning timeline for
dataSource[%s]", dataSource);
+ return new ReindexingPlan(
+ dataSource,
+ referenceTime,
+ null,
+ Collections.emptyList(),
+ new ReindexingPlan.PlanValidationError(
+ ReindexingPlan.PlanValidationError.Type.VALIDATION_ERROR,
+ e.getMessage()
+ )
+ );
+ }
+
+ if (searchIntervals.isEmpty()) {
+ LOG.debug("No search intervals generated for dataSource[%s]",
dataSource);
+ return new ReindexingPlan(dataSource, referenceTime, null,
Collections.emptyList(), null);
+ }
+
+ final ReindexingPlan.SkipOffsetResolution skipOffsetResolution =
buildSkipOffsetResolution(timeline, referenceTime);
+ final Interval dataRangeWithSkipOffset =
computeDataRangeWithSkipOffset(timeline, referenceTime);
Review Comment:
it seems like this could be derived from skipOffsetResolution?
##########
indexing-service/src/main/java/org/apache/druid/indexing/compact/OverlordCompactionScheduler.java:
##########
@@ -499,6 +502,42 @@ public CompactionSimulateResult
simulateRunWithConfigUpdate(ClusterCompactionCon
}
}
+ @Override
+ public ReindexingTimelineView previewReindexingTimeline(SupervisorSpec spec,
DateTime referenceTime)
+ {
+ if (!(spec instanceof CompactionSupervisorSpec)) {
Review Comment:
nit: `compactionSpec` can go into this line
##########
indexing-service/src/main/java/org/apache/druid/indexing/compact/ReindexingTimelineView.java:
##########
@@ -0,0 +1,460 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.compact;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import org.apache.druid.server.compaction.ReindexingDataSchemaRule;
+import org.apache.druid.server.compaction.ReindexingDeletionRule;
+import org.apache.druid.server.compaction.ReindexingIndexSpecRule;
+import org.apache.druid.server.compaction.ReindexingPartitioningRule;
+import org.apache.druid.server.compaction.ReindexingRule;
+import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+import org.joda.time.Period;
+
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Public API DTO describing the timeline of search intervals and their
associated reindexing
+ * configurations for a cascading reindexing supervisor. Surfaced through the
+ * {@code /supervisor/{id}/reindexingTimeline} endpoint so operators (and the
web console)
+ * can visualize how rules are applied across time.
+ *
+ * <p>Always produced via {@link #fromPlan(ReindexingPlan)} so the API view is
a pure projection
+ * of the same plan that drives job creation — the two cannot drift.
+ */
+public class ReindexingTimelineView
+{
+ private final String dataSource;
+ private final DateTime referenceTime;
+ @Nullable
+ private final SkipOffsetInfo skipOffset;
+ private final List<IntervalConfig> intervals;
+ @Nullable
+ private final ValidationError validationError;
+
+ @JsonCreator
+ public ReindexingTimelineView(
+ @JsonProperty("dataSource") String dataSource,
+ @JsonProperty("referenceTime") DateTime referenceTime,
+ @JsonProperty("skipOffset") @Nullable SkipOffsetInfo skipOffset,
+ @JsonProperty("intervals") List<IntervalConfig> intervals,
+ @JsonProperty("validationError") @Nullable ValidationError
validationError
+ )
+ {
+ this.dataSource = dataSource;
+ this.referenceTime = referenceTime;
+ this.skipOffset = skipOffset;
+ this.intervals = Collections.unmodifiableList(intervals);
+ this.validationError = validationError;
+ }
+
+ /**
+ * Projects a {@link ReindexingPlan} into the API view. Intervals that have
rules applied
+ * are emitted with their resolved config; intervals fully eclipsed by a
skip-offset boundary
+ * are emitted with zero rules so the UI can render the skipped span.
Intervals with no data
+ * overlap (a job-path concern) are not surfaced — operators see only ranges
that would
+ * actually be compacted or that are explicitly skipped.
+ */
+ static ReindexingTimelineView fromPlan(ReindexingPlan plan)
+ {
+ final List<IntervalConfig> intervalConfigs = new ArrayList<>();
+ for (ReindexingPlan.PlannedInterval planned : plan.getIntervals()) {
+ switch (planned.getDisposition()) {
+ case INCLUDED:
+ case TRUNCATED:
+ if (planned.getRuleCount() > 0) {
+ intervalConfigs.add(new IntervalConfig(
+ planned.getInterval(),
+ planned.getRuleCount(),
+ planned.getConfig(),
+ planned.getAppliedRules()
+ ));
+ }
+ break;
+ case SKIPPED_BEYOND_BOUNDARY:
+ intervalConfigs.add(new IntervalConfig(
+ planned.getInterval(),
+ 0,
+ null,
+ Collections.emptyList()
+ ));
+ break;
+ case SKIPPED_NO_DATA:
+ // Operator-visible timeline only shows ranges that would be
compacted or are
+ // explicitly skipped by configuration. Ranges with no underlying
data are omitted.
+ break;
+ }
+ }
+
+ return new ReindexingTimelineView(
+ plan.getDataSource(),
+ plan.getReferenceTime(),
+ SkipOffsetInfo.fromPlan(plan.getSkipOffset()),
+ intervalConfigs,
+ ValidationError.fromPlan(plan.getValidationError())
+ );
+ }
+
+ @JsonProperty
+ public String getDataSource()
+ {
+ return dataSource;
+ }
+
+ @JsonProperty
+ public DateTime getReferenceTime()
+ {
+ return referenceTime;
+ }
+
+ @JsonProperty
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ @Nullable
+ public SkipOffsetInfo getSkipOffset()
+ {
+ return skipOffset;
+ }
+
+ @JsonProperty
+ public List<IntervalConfig> getIntervals()
+ {
+ return intervals;
+ }
+
+ @JsonProperty
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ @Nullable
+ public ValidationError getValidationError()
+ {
+ return validationError;
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ ReindexingTimelineView that = (ReindexingTimelineView) o;
+ return Objects.equals(dataSource, that.dataSource)
+ && Objects.equals(referenceTime, that.referenceTime)
+ && Objects.equals(skipOffset, that.skipOffset)
+ && Objects.equals(intervals, that.intervals)
+ && Objects.equals(validationError, that.validationError);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(dataSource, referenceTime, skipOffset, intervals,
validationError);
+ }
+
+ /**
+ * Describes a granularity-ordering or configuration error encountered while
building the timeline.
+ * Surfaced in the response body rather than as a 5xx so clients can render
meaningful messages.
+ */
+ public static class ValidationError
+ {
+ private final String errorType;
+ private final String message;
+ @Nullable
+ private final String olderInterval;
+ @Nullable
+ private final String olderGranularity;
+ @Nullable
+ private final String newerInterval;
+ @Nullable
+ private final String newerGranularity;
+
+ @JsonCreator
+ public ValidationError(
+ @JsonProperty("errorType") String errorType,
+ @JsonProperty("message") String message,
+ @JsonProperty("olderInterval") @Nullable String olderInterval,
+ @JsonProperty("olderGranularity") @Nullable String olderGranularity,
+ @JsonProperty("newerInterval") @Nullable String newerInterval,
+ @JsonProperty("newerGranularity") @Nullable String newerGranularity
+ )
+ {
+ this.errorType = errorType;
+ this.message = message;
+ this.olderInterval = olderInterval;
+ this.olderGranularity = olderGranularity;
+ this.newerInterval = newerInterval;
+ this.newerGranularity = newerGranularity;
+ }
+
+ @Nullable
+ static ValidationError fromPlan(@Nullable
ReindexingPlan.PlanValidationError error)
Review Comment:
this is almost identical to `ReindexingPlan.PlanValidationError`, can we
merge them?
##########
indexing-service/src/main/java/org/apache/druid/indexing/compact/ReindexingPlanner.java:
##########
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.compact;
+
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.server.compaction.IntervalPartitioningInfo;
+import
org.apache.druid.server.coordinator.InlineSchemaDataSourceCompactionConfig;
+import org.apache.druid.timeline.SegmentTimeline;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+import org.joda.time.Period;
+
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Builds a {@link ReindexingPlan} from a {@link CascadingReindexingTemplate}
for a given reference
+ * time and the live segment timeline. The plan is the single source of truth
for both
+ * compaction-job creation and the API timeline view, ensuring the two cannot
drift.
+ *
+ * <p>The data range bounds the planned intervals (no-overlap intervals become
+ * {@link ReindexingPlan.IntervalDisposition#SKIPPED_NO_DATA}), {@code
skipOffsetFromLatest} is
+ * anchored on the live data, and intervals that extend past the skip boundary
are truncated or
+ * marked {@link ReindexingPlan.IntervalDisposition#SKIPPED_BEYOND_BOUNDARY}.
+ *
+ * <p>Validation failures (misconfigured granularity ordering, no rules) are
caught and surfaced
+ * on the returned plan as a {@link ReindexingPlan.PlanValidationError} so
HTTP callers can render
+ * them in the response rather than as 500s.
+ */
+final class ReindexingPlanner
+{
+ private static final Logger LOG = new Logger(ReindexingPlanner.class);
+
+ private final CascadingReindexingTemplate template;
+
+ ReindexingPlanner(CascadingReindexingTemplate template)
+ {
+ this.template = template;
+ }
+
+ /**
+ * Build a plan for the given reference time. The plan reflects exactly what
+ * {@link CascadingReindexingTemplate#createCompactionJobs} would do given
the same
+ * {@code timeline} and reference time.
+ *
+ * @param timeline live segment timeline for the datasource; must be
non-null and non-empty —
+ * callers are responsible for short-circuiting when no data
exists rather than
+ * calling this with a fabricated empty timeline.
+ */
+ ReindexingPlan plan(DateTime referenceTime, SegmentTimeline timeline)
+ {
+ final String dataSource = template.getDataSource();
+
+ if (!template.getReindexingRuleProvider().isReady()) {
+ LOG.info(
+ "Rule provider [%s] is not ready for dataSource[%s], returning empty
plan",
+ template.getReindexingRuleProvider().getType(),
+ dataSource
+ );
+ return new ReindexingPlan(dataSource, referenceTime, null,
Collections.emptyList(), null);
+ }
+
+ final List<IntervalPartitioningInfo> searchIntervals;
+ try {
+ searchIntervals = template.generateAlignedSearchIntervals(referenceTime);
+ }
+ catch (SegmentGranularityTimelineValidationException e) {
+ LOG.warn(e, "Granularity timeline validation failed for dataSource[%s]",
dataSource);
+ return new ReindexingPlan(
+ dataSource,
+ referenceTime,
+ null,
+ Collections.emptyList(),
+ new ReindexingPlan.PlanValidationError(
+
ReindexingPlan.PlanValidationError.Type.INVALID_GRANULARITY_TIMELINE,
+ e.getMessage(),
+ e.getOlderInterval(),
+ e.getOlderGranularity(),
+ e.getNewerInterval(),
+ e.getNewerGranularity()
+ )
+ );
+ }
+ catch (IAE e) {
+ LOG.warn(e, "Validation failed while planning timeline for
dataSource[%s]", dataSource);
+ return new ReindexingPlan(
+ dataSource,
+ referenceTime,
+ null,
+ Collections.emptyList(),
+ new ReindexingPlan.PlanValidationError(
+ ReindexingPlan.PlanValidationError.Type.VALIDATION_ERROR,
+ e.getMessage()
+ )
+ );
+ }
+
+ if (searchIntervals.isEmpty()) {
+ LOG.debug("No search intervals generated for dataSource[%s]",
dataSource);
+ return new ReindexingPlan(dataSource, referenceTime, null,
Collections.emptyList(), null);
+ }
+
+ final ReindexingPlan.SkipOffsetResolution skipOffsetResolution =
buildSkipOffsetResolution(timeline, referenceTime);
+ final Interval dataRangeWithSkipOffset =
computeDataRangeWithSkipOffset(timeline, referenceTime);
+ if (dataRangeWithSkipOffset == null) {
+ LOG.debug("All data for dataSource[%s] is within skip offsets; no
intervals will be planned", dataSource);
+ return new ReindexingPlan(dataSource, referenceTime,
skipOffsetResolution, Collections.emptyList(), null);
+ }
+
+ final List<ReindexingPlan.PlannedInterval> planned = new
ArrayList<>(searchIntervals.size());
+ for (int i = 0; i < searchIntervals.size(); i++) {
+ final IntervalPartitioningInfo originalInfo = searchIntervals.get(i);
+ final Interval originalInterval = originalInfo.getInterval();
+
+ if (!originalInterval.overlaps(dataRangeWithSkipOffset)) {
+ planned.add(noRuleEntry(originalInterval, originalInterval,
ReindexingPlan.IntervalDisposition.SKIPPED_NO_DATA, originalInfo));
+ continue;
+ }
+
+ // When a skip offset is configured, intervals extending past the
adjusted data-range end
+ // must be truncated (or skipped entirely if the truncation eats the
whole interval).
+ final DateTime truncationBoundary =
+ (template.getSkipOffsetFromNowOrNull() != null ||
template.getSkipOffsetFromLatestOrNull() != null)
+ ? dataRangeWithSkipOffset.getEnd()
+ : null;
+
+ Interval effectiveInterval = originalInterval;
+ ReindexingPlan.IntervalDisposition disposition =
ReindexingPlan.IntervalDisposition.INCLUDED;
+
+ if (truncationBoundary != null &&
originalInterval.getEnd().isAfter(truncationBoundary)) {
+ final DateTime alignedEnd =
originalInfo.getGranularity().bucketStart(truncationBoundary);
+ if (!alignedEnd.isAfter(originalInterval.getStart())) {
+ planned.add(noRuleEntry(originalInterval, originalInterval,
ReindexingPlan.IntervalDisposition.SKIPPED_BEYOND_BOUNDARY, originalInfo));
+ continue;
+ }
+ effectiveInterval = new Interval(originalInterval.getStart(),
alignedEnd);
+ disposition = ReindexingPlan.IntervalDisposition.TRUNCATED;
+ // Replace the entry so the synthetic-timeline lookup in
ReindexingConfigBuilder matches the truncated interval.
+ searchIntervals.set(
+ i,
+ new IntervalPartitioningInfo(effectiveInterval,
originalInfo.getSourceRule(), originalInfo.isRuleSynthetic())
+ );
+ }
+
+ final InlineSchemaDataSourceCompactionConfig.Builder builder =
template.createBaseConfigBuilder();
+ final ReindexingConfigBuilder configBuilder = new
ReindexingConfigBuilder(
+ template.getReindexingRuleProvider(),
+ effectiveInterval,
+ referenceTime,
+ searchIntervals,
+ template.getTuningConfig()
+ );
+ final ReindexingConfigBuilder.BuildResult buildResult =
configBuilder.applyToWithDetails(builder);
+
+ if (buildResult.getRuleCount() > 0) {
+ planned.add(new ReindexingPlan.PlannedInterval(
+ effectiveInterval,
+ originalInterval,
+ disposition,
+ originalInfo.getSourceRule(),
+ originalInfo.isRuleSynthetic(),
+ originalInfo.getGranularity(),
+ buildResult.getRuleCount(),
+ builder.build(),
+ buildResult.getAppliedRules()
+ ));
+ } else {
+ // Interval is geometrically valid but no rules apply to it. We retain
INCLUDED/TRUNCATED disposition
+ // (the geometry didn't skip it) but mark config null so it won't
generate a job and the view can
+ // choose to hide it.
+ planned.add(new ReindexingPlan.PlannedInterval(
+ effectiveInterval,
+ originalInterval,
+ disposition,
+ originalInfo.getSourceRule(),
+ originalInfo.isRuleSynthetic(),
+ originalInfo.getGranularity(),
+ 0,
+ null,
+ Collections.emptyList()
+ ));
+ }
+ }
+
+ return new ReindexingPlan(dataSource, referenceTime, skipOffsetResolution,
planned, null);
+ }
+
+ /**
+ * Computes the data-range interval clipped by skip offsets. Returns null
when the entire data
+ * range falls within skip offsets and nothing remains to plan.
+ */
+ @Nullable
+ private Interval computeDataRangeWithSkipOffset(SegmentTimeline timeline,
DateTime referenceTime)
+ {
+ final Interval dataBounds = new Interval(
+ timeline.first().getInterval().getStart(),
+ timeline.last().getInterval().getEnd()
+ );
+
+ final Period skipFromNow = template.getSkipOffsetFromNowOrNull();
+ final Period skipFromLatest = template.getSkipOffsetFromLatestOrNull();
+
+ DateTime end = dataBounds.getEnd();
Review Comment:
so start is always the timeline start, but end depends on the offsets in the
template? also, what if the data end is before the skip offset, would we want a
smaller interval?
##########
indexing-service/src/main/java/org/apache/druid/indexing/compact/ReindexingTimelineView.java:
##########
@@ -0,0 +1,460 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.compact;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import org.apache.druid.server.compaction.ReindexingDataSchemaRule;
+import org.apache.druid.server.compaction.ReindexingDeletionRule;
+import org.apache.druid.server.compaction.ReindexingIndexSpecRule;
+import org.apache.druid.server.compaction.ReindexingPartitioningRule;
+import org.apache.druid.server.compaction.ReindexingRule;
+import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+import org.joda.time.Period;
+
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Public API DTO describing the timeline of search intervals and their
associated reindexing
+ * configurations for a cascading reindexing supervisor. Surfaced through the
+ * {@code /supervisor/{id}/reindexingTimeline} endpoint so operators (and the
web console)
+ * can visualize how rules are applied across time.
+ *
+ * <p>Always produced via {@link #fromPlan(ReindexingPlan)} so the API view is
a pure projection
+ * of the same plan that drives job creation — the two cannot drift.
+ */
+public class ReindexingTimelineView
+{
+ private final String dataSource;
+ private final DateTime referenceTime;
+ @Nullable
+ private final SkipOffsetInfo skipOffset;
+ private final List<IntervalConfig> intervals;
+ @Nullable
+ private final ValidationError validationError;
+
+ @JsonCreator
+ public ReindexingTimelineView(
+ @JsonProperty("dataSource") String dataSource,
+ @JsonProperty("referenceTime") DateTime referenceTime,
+ @JsonProperty("skipOffset") @Nullable SkipOffsetInfo skipOffset,
+ @JsonProperty("intervals") List<IntervalConfig> intervals,
+ @JsonProperty("validationError") @Nullable ValidationError
validationError
+ )
+ {
+ this.dataSource = dataSource;
+ this.referenceTime = referenceTime;
+ this.skipOffset = skipOffset;
+ this.intervals = Collections.unmodifiableList(intervals);
+ this.validationError = validationError;
+ }
+
+ /**
+ * Projects a {@link ReindexingPlan} into the API view. Intervals that have
rules applied
+ * are emitted with their resolved config; intervals fully eclipsed by a
skip-offset boundary
+ * are emitted with zero rules so the UI can render the skipped span.
Intervals with no data
+ * overlap (a job-path concern) are not surfaced — operators see only ranges
that would
+ * actually be compacted or that are explicitly skipped.
+ */
+ static ReindexingTimelineView fromPlan(ReindexingPlan plan)
+ {
+ final List<IntervalConfig> intervalConfigs = new ArrayList<>();
+ for (ReindexingPlan.PlannedInterval planned : plan.getIntervals()) {
+ switch (planned.getDisposition()) {
+ case INCLUDED:
+ case TRUNCATED:
+ if (planned.getRuleCount() > 0) {
+ intervalConfigs.add(new IntervalConfig(
+ planned.getInterval(),
+ planned.getRuleCount(),
+ planned.getConfig(),
+ planned.getAppliedRules()
+ ));
+ }
+ break;
+ case SKIPPED_BEYOND_BOUNDARY:
+ intervalConfigs.add(new IntervalConfig(
+ planned.getInterval(),
+ 0,
+ null,
+ Collections.emptyList()
+ ));
+ break;
+ case SKIPPED_NO_DATA:
+ // Operator-visible timeline only shows ranges that would be
compacted or are
+ // explicitly skipped by configuration. Ranges with no underlying
data are omitted.
+ break;
+ }
+ }
+
+ return new ReindexingTimelineView(
+ plan.getDataSource(),
+ plan.getReferenceTime(),
+ SkipOffsetInfo.fromPlan(plan.getSkipOffset()),
+ intervalConfigs,
+ ValidationError.fromPlan(plan.getValidationError())
+ );
+ }
+
+ @JsonProperty
+ public String getDataSource()
+ {
+ return dataSource;
+ }
+
+ @JsonProperty
+ public DateTime getReferenceTime()
+ {
+ return referenceTime;
+ }
+
+ @JsonProperty
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ @Nullable
+ public SkipOffsetInfo getSkipOffset()
+ {
+ return skipOffset;
+ }
+
+ @JsonProperty
+ public List<IntervalConfig> getIntervals()
+ {
+ return intervals;
+ }
+
+ @JsonProperty
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ @Nullable
+ public ValidationError getValidationError()
+ {
+ return validationError;
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ ReindexingTimelineView that = (ReindexingTimelineView) o;
+ return Objects.equals(dataSource, that.dataSource)
+ && Objects.equals(referenceTime, that.referenceTime)
+ && Objects.equals(skipOffset, that.skipOffset)
+ && Objects.equals(intervals, that.intervals)
+ && Objects.equals(validationError, that.validationError);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(dataSource, referenceTime, skipOffset, intervals,
validationError);
+ }
+
+ /**
+ * Describes a granularity-ordering or configuration error encountered while
building the timeline.
+ * Surfaced in the response body rather than as a 5xx so clients can render
meaningful messages.
+ */
+ public static class ValidationError
+ {
+ private final String errorType;
+ private final String message;
+ @Nullable
+ private final String olderInterval;
+ @Nullable
+ private final String olderGranularity;
+ @Nullable
+ private final String newerInterval;
+ @Nullable
+ private final String newerGranularity;
+
+ @JsonCreator
+ public ValidationError(
+ @JsonProperty("errorType") String errorType,
+ @JsonProperty("message") String message,
+ @JsonProperty("olderInterval") @Nullable String olderInterval,
+ @JsonProperty("olderGranularity") @Nullable String olderGranularity,
+ @JsonProperty("newerInterval") @Nullable String newerInterval,
+ @JsonProperty("newerGranularity") @Nullable String newerGranularity
+ )
+ {
+ this.errorType = errorType;
+ this.message = message;
+ this.olderInterval = olderInterval;
+ this.olderGranularity = olderGranularity;
+ this.newerInterval = newerInterval;
+ this.newerGranularity = newerGranularity;
+ }
+
+ @Nullable
+ static ValidationError fromPlan(@Nullable
ReindexingPlan.PlanValidationError error)
+ {
+ if (error == null) {
+ return null;
+ }
+ return new ValidationError(
+ error.getType().name(),
+ error.getMessage(),
+ error.getOlderInterval() == null ? null :
error.getOlderInterval().toString(),
+ error.getOlderGranularity() == null ? null :
error.getOlderGranularity().toString(),
+ error.getNewerInterval() == null ? null :
error.getNewerInterval().toString(),
+ error.getNewerGranularity() == null ? null :
error.getNewerGranularity().toString()
+ );
+ }
+
+ @JsonProperty
+ public String getErrorType()
+ {
+ return errorType;
+ }
+
+ @JsonProperty
+ public String getMessage()
+ {
+ return message;
+ }
+
+ @JsonProperty
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ @Nullable
+ public String getOlderInterval()
+ {
+ return olderInterval;
+ }
+
+ @JsonProperty
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ @Nullable
+ public String getOlderGranularity()
+ {
+ return olderGranularity;
+ }
+
+ @JsonProperty
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ @Nullable
+ public String getNewerInterval()
+ {
+ return newerInterval;
+ }
+
+ @JsonProperty
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ @Nullable
+ public String getNewerGranularity()
+ {
+ return newerGranularity;
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ ValidationError that = (ValidationError) o;
+ return Objects.equals(errorType, that.errorType)
+ && Objects.equals(message, that.message)
+ && Objects.equals(olderInterval, that.olderInterval)
+ && Objects.equals(olderGranularity, that.olderGranularity)
+ && Objects.equals(newerInterval, that.newerInterval)
+ && Objects.equals(newerGranularity, that.newerGranularity);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(errorType, message, olderInterval, olderGranularity,
newerInterval, newerGranularity);
+ }
+ }
+
+ /**
+ * Describes the configured skip offset that was applied to this view. Only
present in the
+ * response when the configured skip offset was actually resolved against
the live segment
+ * timeline — operators don't see a "configured but not applied" state
because the scheduler
+ * short-circuits to an empty timeline when no segments exist.
+ */
+ public static class SkipOffsetInfo
Review Comment:
this is almost identical to `SkipOffsetResolution`, can we merge them?
##########
indexing-service/src/main/java/org/apache/druid/indexing/compact/ReindexingTimelineView.java:
##########
@@ -0,0 +1,460 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.compact;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import org.apache.druid.server.compaction.ReindexingDataSchemaRule;
+import org.apache.druid.server.compaction.ReindexingDeletionRule;
+import org.apache.druid.server.compaction.ReindexingIndexSpecRule;
+import org.apache.druid.server.compaction.ReindexingPartitioningRule;
+import org.apache.druid.server.compaction.ReindexingRule;
+import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+import org.joda.time.Period;
+
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Public API DTO describing the timeline of search intervals and their
associated reindexing
+ * configurations for a cascading reindexing supervisor. Surfaced through the
+ * {@code /supervisor/{id}/reindexingTimeline} endpoint so operators (and the
web console)
+ * can visualize how rules are applied across time.
+ *
+ * <p>Always produced via {@link #fromPlan(ReindexingPlan)} so the API view is
a pure projection
+ * of the same plan that drives job creation — the two cannot drift.
+ */
+public class ReindexingTimelineView
+{
+ private final String dataSource;
+ private final DateTime referenceTime;
+ @Nullable
+ private final SkipOffsetInfo skipOffset;
+ private final List<IntervalConfig> intervals;
+ @Nullable
+ private final ValidationError validationError;
+
+ @JsonCreator
+ public ReindexingTimelineView(
Review Comment:
`ReindexingTimelineView` sounds ambiguous; it's not like `SegmentTimeline`
or `ServerView`. Maybe `ReindexingPlanResponse`, WDYT?
##########
indexing-service/src/main/java/org/apache/druid/indexing/compact/ReindexingPlanner.java:
##########
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.compact;
+
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.server.compaction.IntervalPartitioningInfo;
+import
org.apache.druid.server.coordinator.InlineSchemaDataSourceCompactionConfig;
+import org.apache.druid.timeline.SegmentTimeline;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+import org.joda.time.Period;
+
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Builds a {@link ReindexingPlan} from a {@link CascadingReindexingTemplate}
for a given reference
+ * time and the live segment timeline. The plan is the single source of truth
for both
+ * compaction-job creation and the API timeline view, ensuring the two cannot
drift.
+ *
+ * <p>The data range bounds the planned intervals (no-overlap intervals become
+ * {@link ReindexingPlan.IntervalDisposition#SKIPPED_NO_DATA}), {@code
skipOffsetFromLatest} is
+ * anchored on the live data, and intervals that extend past the skip boundary
are truncated or
+ * marked {@link ReindexingPlan.IntervalDisposition#SKIPPED_BEYOND_BOUNDARY}.
+ *
+ * <p>Validation failures (misconfigured granularity ordering, no rules) are
caught and surfaced
+ * on the returned plan as a {@link ReindexingPlan.PlanValidationError} so
HTTP callers can render
+ * them in the response rather than as 500s.
+ */
+final class ReindexingPlanner
+{
+ private static final Logger LOG = new Logger(ReindexingPlanner.class);
+
+ private final CascadingReindexingTemplate template;
+
+ ReindexingPlanner(CascadingReindexingTemplate template)
+ {
+ this.template = template;
+ }
+
+ /**
+ * Build a plan for the given reference time. The plan reflects exactly what
+ * {@link CascadingReindexingTemplate#createCompactionJobs} would do given
the same
+ * {@code timeline} and reference time.
+ *
+ * @param timeline live segment timeline for the datasource; must be
non-null and non-empty —
+ * callers are responsible for short-circuiting when no data
exists rather than
+ * calling this with a fabricated empty timeline.
+ */
+ ReindexingPlan plan(DateTime referenceTime, SegmentTimeline timeline)
+ {
+ final String dataSource = template.getDataSource();
+
+ if (!template.getReindexingRuleProvider().isReady()) {
+ LOG.info(
+ "Rule provider [%s] is not ready for dataSource[%s], returning empty
plan",
+ template.getReindexingRuleProvider().getType(),
+ dataSource
+ );
+ return new ReindexingPlan(dataSource, referenceTime, null,
Collections.emptyList(), null);
+ }
+
+ final List<IntervalPartitioningInfo> searchIntervals;
+ try {
+ searchIntervals = template.generateAlignedSearchIntervals(referenceTime);
+ }
+ catch (SegmentGranularityTimelineValidationException e) {
+ LOG.warn(e, "Granularity timeline validation failed for dataSource[%s]",
dataSource);
+ return new ReindexingPlan(
+ dataSource,
+ referenceTime,
+ null,
+ Collections.emptyList(),
+ new ReindexingPlan.PlanValidationError(
+
ReindexingPlan.PlanValidationError.Type.INVALID_GRANULARITY_TIMELINE,
+ e.getMessage(),
+ e.getOlderInterval(),
+ e.getOlderGranularity(),
+ e.getNewerInterval(),
+ e.getNewerGranularity()
+ )
+ );
+ }
+ catch (IAE e) {
+ LOG.warn(e, "Validation failed while planning timeline for
dataSource[%s]", dataSource);
+ return new ReindexingPlan(
+ dataSource,
+ referenceTime,
+ null,
+ Collections.emptyList(),
+ new ReindexingPlan.PlanValidationError(
+ ReindexingPlan.PlanValidationError.Type.VALIDATION_ERROR,
+ e.getMessage()
+ )
+ );
+ }
+
+ if (searchIntervals.isEmpty()) {
+ LOG.debug("No search intervals generated for dataSource[%s]",
dataSource);
+ return new ReindexingPlan(dataSource, referenceTime, null,
Collections.emptyList(), null);
+ }
+
+ final ReindexingPlan.SkipOffsetResolution skipOffsetResolution =
buildSkipOffsetResolution(timeline, referenceTime);
+ final Interval dataRangeWithSkipOffset =
computeDataRangeWithSkipOffset(timeline, referenceTime);
+ if (dataRangeWithSkipOffset == null) {
+ LOG.debug("All data for dataSource[%s] is within skip offsets; no
intervals will be planned", dataSource);
+ return new ReindexingPlan(dataSource, referenceTime,
skipOffsetResolution, Collections.emptyList(), null);
+ }
+
+ final List<ReindexingPlan.PlannedInterval> planned = new
ArrayList<>(searchIntervals.size());
+ for (int i = 0; i < searchIntervals.size(); i++) {
+ final IntervalPartitioningInfo originalInfo = searchIntervals.get(i);
+ final Interval originalInterval = originalInfo.getInterval();
+
+ if (!originalInterval.overlaps(dataRangeWithSkipOffset)) {
Review Comment:
This seems valid: what if `dataRangeWithSkipOffset` is [DAY0, DAY10] and
`searchInterval` is [DAY12, DAY13]?
I also wonder what would happen if this just started a reindex task anyway: if
the interval is before `dataRange`, is anything horrible going to happen? Even if
the interval is within `dataRange`, it's also possible that the timeline is empty
in that specific interval.
I feel it'd be simpler if there were only the skip offset, and search intervals
based on rules.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]