imply-cheddar commented on code in PR #18402: URL: https://github.com/apache/druid/pull/18402#discussion_r2344651752
########## indexing-service/src/main/java/org/apache/druid/indexing/compact/CatalogCompactionJobTemplate.java: ########## @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.compact; + +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.catalog.MetadataCatalog; +import org.apache.druid.catalog.model.ResolvedTable; +import org.apache.druid.catalog.model.TableId; +import org.apache.druid.catalog.model.table.IndexingTemplateDefn; +import org.apache.druid.error.InvalidInput; +import org.apache.druid.indexing.input.DruidInputSource; +import org.apache.druid.indexing.template.BatchIndexingJobTemplate; +import org.apache.druid.java.util.common.granularity.Granularity; + +import javax.annotation.Nullable; +import java.util.List; + +/** + * Compaction template that delegates job creation to a template stored in the + * Druid catalog. 
+ */ +public class CatalogCompactionJobTemplate implements CompactionJobTemplate +{ + public static final String TYPE = "compactCatalog"; + + private final String templateId; + + private final TableId tableId; + private final MetadataCatalog catalog; + + @JsonCreator + public CatalogCompactionJobTemplate( + @JsonProperty("templateId") String templateId, + @JacksonInject MetadataCatalog catalog + ) + { + this.templateId = templateId; + this.catalog = catalog; + this.tableId = TableId.of(TableId.INDEXING_TEMPLATE_SCHEMA, templateId); + } + + @JsonProperty + public String getTemplateId() + { + return templateId; + } + + @Nullable + @Override + public Granularity getSegmentGranularity() + { + final CompactionJobTemplate delegate = getDelegate(); + return delegate == null ? null : delegate.getSegmentGranularity(); + } + + @Override + public List<CompactionJob> createCompactionJobs( + DruidInputSource source, + CompactionJobParams params + ) + { + final CompactionJobTemplate delegate = getDelegate(); + if (delegate == null) { + return List.of(); Review Comment: This would mean that the table doesn't actually exist right? Should we offer some sort of indication that there is a reference to a table that doesn't exist? an exception? a log line? Something that can help figure out that there's an issue with how things are setup? ########## indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionJobParams.java: ########## @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.compact; + +import org.apache.druid.indexing.template.JobParams; +import org.apache.druid.server.compaction.CompactionSnapshotBuilder; +import org.apache.druid.server.coordinator.ClusterCompactionConfig; +import org.apache.druid.timeline.SegmentTimeline; +import org.joda.time.DateTime; + +/** + * Parameters used while creating a {@link CompactionJob} using a {@link CompactionJobTemplate}. + */ +public class CompactionJobParams implements JobParams Review Comment: In some other code, you are adjusting the input with a searchInterval in order to limit the time interval seen. When I initially read that, I wondered "isn't that a param? why isn't it on the param object". After seeing this class, I still don't have a good answer, why isn't that a param on the param object? ########## indexing-service/src/main/java/org/apache/druid/indexing/compact/CatalogCompactionJobTemplate.java: ########## @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.compact; + +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.catalog.MetadataCatalog; +import org.apache.druid.catalog.model.ResolvedTable; +import org.apache.druid.catalog.model.TableId; +import org.apache.druid.catalog.model.table.IndexingTemplateDefn; +import org.apache.druid.error.InvalidInput; +import org.apache.druid.indexing.input.DruidInputSource; +import org.apache.druid.indexing.template.BatchIndexingJobTemplate; +import org.apache.druid.java.util.common.granularity.Granularity; + +import javax.annotation.Nullable; +import java.util.List; + +/** + * Compaction template that delegates job creation to a template stored in the + * Druid catalog. 
+ */ +public class CatalogCompactionJobTemplate implements CompactionJobTemplate +{ + public static final String TYPE = "compactCatalog"; + + private final String templateId; + + private final TableId tableId; + private final MetadataCatalog catalog; + + @JsonCreator + public CatalogCompactionJobTemplate( + @JsonProperty("templateId") String templateId, + @JacksonInject MetadataCatalog catalog + ) + { + this.templateId = templateId; + this.catalog = catalog; + this.tableId = TableId.of(TableId.INDEXING_TEMPLATE_SCHEMA, templateId); + } + + @JsonProperty + public String getTemplateId() + { + return templateId; + } + + @Nullable + @Override + public Granularity getSegmentGranularity() + { + final CompactionJobTemplate delegate = getDelegate(); + return delegate == null ? null : delegate.getSegmentGranularity(); + } + + @Override + public List<CompactionJob> createCompactionJobs( + DruidInputSource source, + CompactionJobParams params + ) + { + final CompactionJobTemplate delegate = getDelegate(); + if (delegate == null) { + return List.of(); + } else { + return delegate.createCompactionJobs(source, params); + } + } + + @Nullable + private CompactionJobTemplate getDelegate() + { + final ResolvedTable resolvedTable = catalog.resolveTable(tableId); + if (resolvedTable == null) { + return null; + } + + final BatchIndexingJobTemplate delegate + = resolvedTable.decodeProperty(IndexingTemplateDefn.PROPERTY_PAYLOAD); + if (delegate instanceof CompactionJobTemplate) { + return (CompactionJobTemplate) delegate; + } else { Review Comment: Perhaps this is an established pattern. But I would never have thought that you are supposed to get the table from the catalog and then pass a magical parameter to "decodeProperty" in order to get it to become an object that is a template. Why isn't there a method that's like `asJobTemplate()` or `as()` or something like that? What's `PROPERTY_PAYLOAD` and why is it special? Will a decoded property always generate a `BatchIndexingJobTemplate`? 
What does "decode property" have to do with generating a `BatchIndexingJobTemplate`? Maybe as I read more of the code I'll understand, but only reading the usage-side of this is not very intuitive. ########## indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionJobQueue.java: ########## @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.indexing.compact; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.druid.client.DataSourcesSnapshot; +import org.apache.druid.client.broker.BrokerClient; +import org.apache.druid.client.indexing.ClientCompactionTaskQuery; +import org.apache.druid.client.indexing.ClientTaskQuery; +import org.apache.druid.common.guava.FutureUtils; +import org.apache.druid.indexing.common.actions.TaskActionClientFactory; +import org.apache.druid.indexing.common.task.Task; +import org.apache.druid.indexing.input.DruidDatasourceDestination; +import org.apache.druid.indexing.input.DruidInputSource; +import org.apache.druid.indexing.overlord.GlobalTaskLockbox; +import org.apache.druid.indexing.template.BatchIndexingJob; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.rpc.indexing.OverlordClient; +import org.apache.druid.server.compaction.CompactionCandidate; +import org.apache.druid.server.compaction.CompactionCandidateSearchPolicy; +import org.apache.druid.server.compaction.CompactionSlotManager; +import org.apache.druid.server.compaction.CompactionSnapshotBuilder; +import org.apache.druid.server.compaction.CompactionStatus; +import org.apache.druid.server.compaction.CompactionStatusTracker; +import org.apache.druid.server.coordinator.AutoCompactionSnapshot; +import org.apache.druid.server.coordinator.ClusterCompactionConfig; +import org.apache.druid.server.coordinator.CompactionConfigValidationResult; +import org.apache.druid.server.coordinator.stats.CoordinatorRunStats; +import org.apache.druid.server.coordinator.stats.Dimension; +import org.apache.druid.server.coordinator.stats.RowKey; +import org.apache.druid.server.coordinator.stats.Stats; + +import javax.annotation.Nullable; +import java.util.Map; +import java.util.PriorityQueue; + +/** + * Iterates over all eligible compaction jobs in order of their priority. 
+ * A fresh instance of this class must be used in every run of the + * {@link CompactionScheduler}. + */ Review Comment: What's the scale of compaction jobs? Like, how many do we expect this to be iterating at any point in time? ########## indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionConfigBasedJobTemplate.java: ########## @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.indexing.compact; + +import org.apache.druid.client.indexing.ClientCompactionTaskQuery; +import org.apache.druid.error.InvalidInput; +import org.apache.druid.indexing.input.DruidDatasourceDestination; +import org.apache.druid.indexing.input.DruidInputSource; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.granularity.Granularity; +import org.apache.druid.server.compaction.CompactionCandidate; +import org.apache.druid.server.compaction.CompactionSlotManager; +import org.apache.druid.server.compaction.DataSourceCompactibleSegmentIterator; +import org.apache.druid.server.compaction.NewestSegmentFirstPolicy; +import org.apache.druid.server.coordinator.DataSourceCompactionConfig; +import org.apache.druid.server.coordinator.InlineSchemaDataSourceCompactionConfig; +import org.apache.druid.server.coordinator.duty.CompactSegments; +import org.apache.druid.timeline.SegmentTimeline; +import org.joda.time.Interval; +import org.joda.time.Period; + +import javax.annotation.Nullable; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; + +/** + * This template never needs to be deserialized as a {@code BatchIndexingJobTemplate}. + * It is just a delegating template that uses a {@link DataSourceCompactionConfig} + * to create compaction jobs. 
+ */ +public class CompactionConfigBasedJobTemplate implements CompactionJobTemplate +{ + private final DataSourceCompactionConfig config; + + public CompactionConfigBasedJobTemplate(DataSourceCompactionConfig config) + { + this.config = config; + } + + public static CompactionConfigBasedJobTemplate create(String dataSource, CompactionStateMatcher stateMatcher) + { + return new CompactionConfigBasedJobTemplate( + InlineSchemaDataSourceCompactionConfig + .builder() + .forDataSource(dataSource) + .withSkipOffsetFromLatest(Period.ZERO) + .withTransformSpec(stateMatcher.getTransformSpec()) + .withProjections(stateMatcher.getProjections()) + .withMetricsSpec(stateMatcher.getMetricsSpec()) + .withGranularitySpec(stateMatcher.getGranularitySpec()) + .build() + ); + } + + @Nullable + @Override + public Granularity getSegmentGranularity() + { + return config.getSegmentGranularity(); + } + + @Override + public List<CompactionJob> createCompactionJobs( + DruidInputSource source, + CompactionJobParams params + ) + { + final DataSourceCompactibleSegmentIterator segmentIterator = getCompactibleCandidates(source, params); + + final List<CompactionJob> jobs = new ArrayList<>(); + + // Create a job for each CompactionCandidate + while (segmentIterator.hasNext()) { + final CompactionCandidate candidate = segmentIterator.next(); + + ClientCompactionTaskQuery taskPayload + = CompactSegments.createCompactionTask(candidate, config, params.getClusterCompactionConfig().getEngine()); + jobs.add( + new CompactionJob( + taskPayload, + candidate, + CompactionSlotManager.getMaxTaskSlotsForNativeCompactionTask(taskPayload.getTuningConfig()) + ) + ); + } + + return jobs; + } + + @Override + public String getType() + { + throw new UnsupportedOperationException("This template type cannot be serialized"); + } Review Comment: What is someone supposed to do if this exception gets thrown? Why would it have happened? 
Are there any hints that can be provided to the developer who sees this and needs to fix it? Also, it should probably be either a `DruidException.defensive()` or a `NotYetImplemented` exception. ########## indexing-service/src/main/java/org/apache/druid/indexing/compact/OverlordCompactionScheduler.java: ########## @@ -107,43 +121,57 @@ public class OverlordCompactionScheduler implements CompactionScheduler private final AtomicBoolean isLeader = new AtomicBoolean(false); private final AtomicBoolean started = new AtomicBoolean(false); - private final CompactSegments duty; /** * The scheduler should enable/disable polling of segments only if the Overlord * is running in standalone mode, otherwise this is handled by the DruidCoordinator * class itself. */ private final boolean shouldPollSegments; + private final long schedulePeriodMillis; private final Stopwatch sinceStatsEmitted = Stopwatch.createUnstarted(); @Inject public OverlordCompactionScheduler( TaskMaster taskMaster, + GlobalTaskLockbox taskLockbox, TaskQueryTool taskQueryTool, SegmentsMetadataManager segmentManager, + SegmentsMetadataManagerConfig segmentManagerConfig, Supplier<DruidCompactionConfig> compactionConfigSupplier, CompactionStatusTracker statusTracker, CoordinatorOverlordServiceConfig coordinatorOverlordServiceConfig, + TaskActionClientFactory taskActionClientFactory, + DruidInputSourceFactory druidInputSourceFactory, ScheduledExecutorFactory executorFactory, + BrokerClient brokerClient, ServiceEmitter emitter, ObjectMapper objectMapper ) { + final long segmentPollPeriodSeconds = + segmentManagerConfig.getPollDuration().toStandardDuration().getMillis(); + this.schedulePeriodMillis = Math.min(5_000, segmentPollPeriodSeconds); Review Comment: A 5 second poll is pretty rapid, what's the logic behind the need for such a rapid poll and why it won't be a significant resource burden? 
########## indexing-service/src/main/java/org/apache/druid/indexing/compact/CascadingCompactionTemplate.java: ########## @@ -0,0 +1,257 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.compact; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.data.input.impl.AggregateProjectionSpec; +import org.apache.druid.error.InvalidInput; +import org.apache.druid.indexer.CompactionEngine; +import org.apache.druid.indexing.input.DruidInputSource; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.granularity.Granularity; +import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.segment.transform.CompactionTransformSpec; +import org.apache.druid.server.coordinator.DataSourceCompactionConfig; +import org.apache.druid.server.coordinator.UserCompactionTaskDimensionsConfig; +import org.apache.druid.server.coordinator.UserCompactionTaskGranularityConfig; +import org.apache.druid.server.coordinator.UserCompactionTaskIOConfig; +import org.apache.druid.server.coordinator.UserCompactionTaskQueryTuningConfig; +import 
org.joda.time.DateTime; +import org.joda.time.Interval; +import org.joda.time.Period; + +import javax.annotation.Nullable; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +/** + * Template to perform period-based cascading compaction. Contains a list of + * {@link CompactionRule} which divide the segment timeline into compactible + * intervals. Each rule specifies a period relative to the current time which is + * used to determine its applicable interval: + * <ul> + * <li>Rule 1: range = [now - p1, +inf)</li> + * <li>Rule 2: range = [now - p2, now - p1)</li> + * <li>...</li> + * <li>Rule n: range = (-inf, now - p(n - 1))</li> + * </ul> + * + * If two adjacent rules explicitly specify a segment granularity, the boundary + * between them may be {@linkplain CompactionRule#computeStartTime adjusted} + * to ensure that there are no uncompacted gaps in the timeline. + * <p> + * This template never needs to be deserialized as a {@code BatchIndexingJobTemplate}, + * only as a {@link DataSourceCompactionConfig} in {@link CompactionSupervisorSpec}. 
+ */ +public class CascadingCompactionTemplate implements CompactionJobTemplate, DataSourceCompactionConfig +{ + public static final String TYPE = "compactCascade"; + + private final String dataSource; + private final List<CompactionRule> rules; + + @JsonCreator + public CascadingCompactionTemplate( + @JsonProperty("dataSource") String dataSource, + @JsonProperty("rules") List<CompactionRule> rules + ) + { + this.rules = rules; + this.dataSource = Objects.requireNonNull(dataSource, "'dataSource' cannot be null"); + + InvalidInput.conditionalException(rules != null && !rules.isEmpty(), "'rules' cannot be empty"); + } + + @Override + @JsonProperty + public String getDataSource() + { + return dataSource; + } + + @JsonProperty + public List<CompactionRule> getRules() + { + return rules; + } + + @Override + public List<CompactionJob> createCompactionJobs( + DruidInputSource source, + CompactionJobParams jobParams + ) + { + final List<CompactionJob> allJobs = new ArrayList<>(); + + // Include future dates in the first rule + final DateTime currentTime = jobParams.getScheduleStartTime(); + DateTime previousRuleStartTime = DateTimes.MAX; + for (int i = 0; i < rules.size() - 1; ++i) { + final CompactionRule rule = rules.get(i); + final DateTime ruleStartTime = rule.computeStartTime(currentTime, rules.get(i + 1)); + final Interval ruleInterval = new Interval(ruleStartTime, previousRuleStartTime); + + allJobs.addAll( + createJobsForSearchInterval(rule.getTemplate(), ruleInterval, source, jobParams) + ); + + previousRuleStartTime = ruleStartTime; + } + + // Include past dates in the last rule + final CompactionRule lastRule = rules.get(rules.size() - 1); + final Interval lastRuleInterval = new Interval(DateTimes.MIN, previousRuleStartTime); + allJobs.addAll( + createJobsForSearchInterval(lastRule.getTemplate(), lastRuleInterval, source, jobParams) + ); + + return allJobs; + } + + private List<CompactionJob> createJobsForSearchInterval( + CompactionJobTemplate template, + 
Interval searchInterval, + DruidInputSource inputSource, + CompactionJobParams jobParams + ) + { + final List<CompactionJob> allJobs = template.createCompactionJobs( + inputSource.withInterval(searchInterval), + jobParams + ); + + // Filter out jobs if they are outside the search interval + final List<CompactionJob> validJobs = new ArrayList<>(); + for (CompactionJob job : allJobs) { + final Interval compactionInterval = job.getCandidate().getCompactionInterval(); + if (searchInterval.contains(compactionInterval)) { + validJobs.add(job); + } + } Review Comment: Why would you get jobs outside of the interval? That seems like a problem with the contract or the specific implementation rather than a concern that everybody who ever calls the method needs to apply? ########## server/src/main/java/org/apache/druid/indexing/template/JobParams.java: ########## @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.template; + +import org.joda.time.DateTime; + +/** + * Provides parameters required to create a {@link BatchIndexingJob}. + */ +public interface JobParams Review Comment: Why interface instead of class? 
It seems like this interface is very likely to only ever carry `getters` and it's unclear to me why it's important that different classes can implement this instead of just having a reference to one of these lying around. ########## indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionJobParams.java: ########## @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.compact; + +import org.apache.druid.indexing.template.JobParams; +import org.apache.druid.server.compaction.CompactionSnapshotBuilder; +import org.apache.druid.server.coordinator.ClusterCompactionConfig; +import org.apache.druid.timeline.SegmentTimeline; +import org.joda.time.DateTime; + +/** + * Parameters used while creating a {@link CompactionJob} using a {@link CompactionJobTemplate}. 
+ */ +public class CompactionJobParams implements JobParams +{ + private final DateTime scheduleStartTime; + private final TimelineProvider timelineProvider; + private final ClusterCompactionConfig clusterCompactionConfig; + private final CompactionSnapshotBuilder snapshotBuilder; + + public CompactionJobParams( + DateTime scheduleStartTime, + ClusterCompactionConfig clusterCompactionConfig, + TimelineProvider timelineProvider, + CompactionSnapshotBuilder snapshotBuilder + ) + { + this.scheduleStartTime = scheduleStartTime; + this.clusterCompactionConfig = clusterCompactionConfig; + this.timelineProvider = timelineProvider; + this.snapshotBuilder = snapshotBuilder; + } + + @Override + public DateTime getScheduleStartTime() + { + return scheduleStartTime; + } + + public ClusterCompactionConfig getClusterCompactionConfig() + { + return clusterCompactionConfig; + } + + public SegmentTimeline getTimeline(String dataSource) + { + return timelineProvider.getTimelineForDataSource(dataSource); + } + + public CompactionSnapshotBuilder getSnapshotBuilder() + { + return snapshotBuilder; + } Review Comment: Where is the right place to find the contract of what these objects are and what they do and why they exist? If I'm just a lowly developer trying to create a new CompactionTask thingie and I'm passing in a `CompactionJobParams` how do I go about figuring out what the semantics of the things that were given to me are and what I need to do with them? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
