abhishekagarwal87 commented on code in PR #16768:
URL: https://github.com/apache/druid/pull/16768#discussion_r1713350238


##########
indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionScheduler.java:
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.compact;
+
+import org.apache.druid.server.compaction.CompactionSimulateResult;
+import org.apache.druid.server.coordinator.AutoCompactionSnapshot;
+import org.apache.druid.server.coordinator.ClusterCompactionConfig;
+import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
+
+import java.util.Map;
+
+public interface CompactionScheduler
+{
+  void start();
+
+  void stop();
+
+  boolean isRunning();
+
+  void startCompaction(String dataSourceName, DataSourceCompactionConfig compactionConfig);
+
+  void stopCompaction(String dataSourceName);
+
+  Map<String, AutoCompactionSnapshot> getAllCompactionSnapshots();
+
+  AutoCompactionSnapshot getCompactionSnapshot(String dataSource);
+
+  Long getSegmentBytesAwaitingCompaction(String dataSource);

Review Comment:
   Instead of a special method just for segment bytes awaiting compaction, you could create a class that holds this and other metrics and return that here.
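   For illustration, a rough sketch of what such a holder could look like. The class and field names (`CompactionProgressStats` etc.) are hypothetical, not part of this PR, and modifiers are trimmed so the sketch stays self-contained:

```java
// Hypothetical holder for per-datasource compaction progress metrics.
// Names are illustrative only; not part of this PR.
class CompactionProgressStats
{
  private final long bytesAwaitingCompaction;
  private final long segmentsAwaitingCompaction;

  CompactionProgressStats(long bytesAwaitingCompaction, long segmentsAwaitingCompaction)
  {
    this.bytesAwaitingCompaction = bytesAwaitingCompaction;
    this.segmentsAwaitingCompaction = segmentsAwaitingCompaction;
  }

  long getBytesAwaitingCompaction()
  {
    return bytesAwaitingCompaction;
  }

  long getSegmentsAwaitingCompaction()
  {
    return segmentsAwaitingCompaction;
  }
}
```

   The interface method could then be something like `CompactionProgressStats getCompactionProgress(String dataSource);` instead of the `Long`-returning special case.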



##########
indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionSupervisor.java:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.compact;
+
+import org.apache.druid.indexing.overlord.DataSourceMetadata;
+import org.apache.druid.indexing.overlord.supervisor.Supervisor;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorReport;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager;
+import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.server.coordinator.AutoCompactionSnapshot;
+
+/**
+ * Supervisor for compaction of a single datasource.
+ */
+public class CompactionSupervisor implements Supervisor
+{
+  private static final Logger log = new Logger(CompactionSupervisor.class);
+
+  private final CompactionScheduler scheduler;
+  private final CompactionSupervisorSpec supervisorSpec;
+
+  public CompactionSupervisor(
+      CompactionSupervisorSpec supervisorSpec,
+      CompactionScheduler scheduler
+  )
+  {
+    this.supervisorSpec = supervisorSpec;
+    this.scheduler = scheduler;
+  }
+
+  @Override
+  public void start()
+  {
+    final String dataSource = getDataSource();
+    if (supervisorSpec.isSuspended()) {
+      log.info("Suspending compaction for dataSource[%s].", dataSource);
+      scheduler.stopCompaction(dataSource);
+    } else {
+      log.info("Starting compaction for dataSource[%s].", dataSource);
+      scheduler.startCompaction(dataSource, supervisorSpec.getSpec());
+    }
+  }
+
+  @Override
+  public void stop(boolean stopGracefully)
+  {
+    final String dataSource = getDataSource();
+    log.info("Stopping compaction for dataSource[%s].", dataSource);
+    scheduler.stopCompaction(dataSource);
+  }
+
+  @Override
+  public SupervisorReport<AutoCompactionSnapshot> getStatus()
+  {
+    final AutoCompactionSnapshot snapshot;
+    if (supervisorSpec.isSuspended()) {
+      snapshot = AutoCompactionSnapshot.builder(getDataSource())
+                                       .withStatus(AutoCompactionSnapshot.AutoCompactionScheduleStatus.NOT_ENABLED)
+                                       .build();
+    } else {
+      snapshot = scheduler.getCompactionSnapshot(getDataSource());
+    }
+
+    return new SupervisorReport<>(supervisorSpec.getId(), DateTimes.nowUtc(), snapshot);
+  }
+
+  @Override
+  public SupervisorStateManager.State getState()
+  {
+    if (!scheduler.isRunning()) {
+      return State.SCHEDULER_STOPPED;
+    } else if (supervisorSpec.isSuspended()) {
+      return State.SUSPENDED;
+    } else {

Review Comment:
   What's the difference between stopped and suspended? Is `stopped` just a temporary state that goes away after the JVM bootstraps?



##########
server/src/main/java/org/apache/druid/server/compaction/PriorityBasedSegmentSearchPolicy.java:
##########


Review Comment:
   How does this interface allow for policies that operate at the interval level, e.g. compacting the intervals with the least compacted data first?



##########
server/src/main/java/org/apache/druid/server/http/CompactionResource.java:
##########
@@ -100,4 +114,31 @@ public Response getCompactionSnapshotForDataSource(
     }
     return Response.ok(ImmutableMap.of("latestStatus", snapshots)).build();
   }
+
+  @POST
+  @Path("/simulate")
+  @Consumes(MediaType.APPLICATION_JSON)
+  public Response simulateClusterCompactionConfigUpdate(
+      ClusterCompactionConfig updatePayload
+  )
+  {
+    if (compactionSupervisorsConfig.isEnabled()) {
+      return buildErrorResponseWhenRunningAsSupervisor();
+    }
+
+    return Response.ok().entity(
+        coordinator.simulateRunWithConfigUpdate(updatePayload)
+    ).build();
+  }
+
+  private Response buildErrorResponseWhenRunningAsSupervisor()
+  {
+    return Response.status(Response.Status.SERVICE_UNAVAILABLE).entity(
+        ImmutableMap.of(
+            "error",
+            "Compaction has been disabled on the Coordinator."
+            + " Use Overlord APIs to fetch compaction status."
+        )
+    ).build();
+  }

Review Comment:
   This will be a problem during rolling config changes when going from Overlord-based compaction to Coordinator-based compaction. The Router will start issuing requests to the Coordinator, but the Coordinator will reject them until it has been restarted with the property. Maybe we can return a redirect code (302) so that the Router can send the request to the Overlord?



##########
indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionStatusReport.java:
##########
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.compact;
+
+/**
+ * This can contain stats and progress and stuff.

Review Comment:
   The javadocs are a bit unclear. It's not clear how the simulation relates to this class.



##########
indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionSupervisor.java:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.compact;
+
+import org.apache.druid.indexing.overlord.DataSourceMetadata;
+import org.apache.druid.indexing.overlord.supervisor.Supervisor;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorReport;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager;
+import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.server.coordinator.AutoCompactionSnapshot;
+
+/**
+ * Supervisor for compaction of a single datasource.
+ */
+public class CompactionSupervisor implements Supervisor
+{
+  private static final Logger log = new Logger(CompactionSupervisor.class);
+
+  private final CompactionScheduler scheduler;
+  private final CompactionSupervisorSpec supervisorSpec;
+
+  public CompactionSupervisor(
+      CompactionSupervisorSpec supervisorSpec,
+      CompactionScheduler scheduler
+  )
+  {
+    this.supervisorSpec = supervisorSpec;
+    this.scheduler = scheduler;
+  }
+
+  @Override
+  public void start()
+  {
+    final String dataSource = getDataSource();

Review Comment:
   Nit: it might be simpler to initialize `dataSource` as a field to avoid repeated `getDataSource()` calls.
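   A minimal sketch of the nit, using stand-in types since the real spec classes live in Druid; the point is only the cached final field:

```java
// Stand-in for the spec classes; only getDataSource() matters for this sketch.
class SpecSketch
{
  private final String dataSource;

  SpecSketch(String dataSource)
  {
    this.dataSource = dataSource;
  }

  String getDataSource()
  {
    return dataSource;
  }
}

// Sketch: derive the datasource name once in the constructor and keep it in a
// final field, instead of re-deriving it via getSpec().getDataSource() in
// every method.
class SupervisorSketch
{
  private final SpecSketch spec;
  private final String dataSource;

  SupervisorSketch(SpecSketch spec)
  {
    this.spec = spec;
    this.dataSource = spec.getDataSource();
  }

  String getDataSource()
  {
    return dataSource;
  }
}
```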



##########
indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionScheduler.java:
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.compact;
+
+import org.apache.druid.server.compaction.CompactionSimulateResult;
+import org.apache.druid.server.coordinator.AutoCompactionSnapshot;
+import org.apache.druid.server.coordinator.ClusterCompactionConfig;
+import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
+
+import java.util.Map;
+
+public interface CompactionScheduler

Review Comment:
   nit: please add javadocs.



##########
indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionSupervisor.java:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.compact;
+
+import org.apache.druid.indexing.overlord.DataSourceMetadata;
+import org.apache.druid.indexing.overlord.supervisor.Supervisor;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorReport;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager;
+import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.server.coordinator.AutoCompactionSnapshot;
+
+/**
+ * Supervisor for compaction of a single datasource.
+ */
+public class CompactionSupervisor implements Supervisor
+{
+  private static final Logger log = new Logger(CompactionSupervisor.class);
+
+  private final CompactionScheduler scheduler;
+  private final CompactionSupervisorSpec supervisorSpec;
+
+  public CompactionSupervisor(
+      CompactionSupervisorSpec supervisorSpec,
+      CompactionScheduler scheduler
+  )
+  {
+    this.supervisorSpec = supervisorSpec;
+    this.scheduler = scheduler;
+  }
+
+  @Override
+  public void start()
+  {
+    final String dataSource = getDataSource();
+    if (supervisorSpec.isSuspended()) {
+      log.info("Suspending compaction for dataSource[%s].", dataSource);
+      scheduler.stopCompaction(dataSource);
+    } else {
+      log.info("Starting compaction for dataSource[%s].", dataSource);
+      scheduler.startCompaction(dataSource, supervisorSpec.getSpec());
+    }
+  }
+
+  @Override
+  public void stop(boolean stopGracefully)
+  {
+    final String dataSource = getDataSource();
+    log.info("Stopping compaction for dataSource[%s].", dataSource);
+    scheduler.stopCompaction(dataSource);
+  }
+
+  @Override
+  public SupervisorReport<AutoCompactionSnapshot> getStatus()
+  {
+    final AutoCompactionSnapshot snapshot;
+    if (supervisorSpec.isSuspended()) {
+      snapshot = AutoCompactionSnapshot.builder(getDataSource())
+                                       .withStatus(AutoCompactionSnapshot.AutoCompactionScheduleStatus.NOT_ENABLED)
+                                       .build();
+    } else {
+      snapshot = scheduler.getCompactionSnapshot(getDataSource());
+    }
+
+    return new SupervisorReport<>(supervisorSpec.getId(), DateTimes.nowUtc(), snapshot);
+  }
+
+  @Override
+  public SupervisorStateManager.State getState()
+  {
+    if (!scheduler.isRunning()) {
+      return State.SCHEDULER_STOPPED;
+    } else if (supervisorSpec.isSuspended()) {
+      return State.SUSPENDED;
+    } else {
+      return State.RUNNING;
+    }
+  }
+
+  private String getDataSource()
+  {
+    return supervisorSpec.getSpec().getDataSource();
+  }
+
+  // Un-implemented methods used only by streaming supervisors
+
+  @Override
+  public void reset(DataSourceMetadata dataSourceMetadata)
+  {
+    // Do nothing
+  }
+
+  @Override
+  public void resetOffsets(DataSourceMetadata resetDataSourceMetadata)
+  {
+    // Do nothing
+  }
+
+  @Override
+  public void checkpoint(int taskGroupId, DataSourceMetadata checkpointMetadata)
+  {
+    // Do nothing
+  }
+
+  @Override
+  public LagStats computeLagStats()
+  {
+    return new LagStats(0L, 0L, 0L);
+  }
+
+  @Override
+  public int getActiveTaskGroupsCount()
+  {
+    return 0;

Review Comment:
   Can you add a comment explaining why this is 0?



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordCompactionResource.java:
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.overlord.http;
+
+import com.google.inject.Inject;
+import com.sun.jersey.spi.container.ResourceFilters;
+import org.apache.druid.indexing.compact.CompactionScheduler;
+import org.apache.druid.server.coordinator.AutoCompactionSnapshot;
+import org.apache.druid.server.coordinator.ClusterCompactionConfig;
+import org.apache.druid.server.http.security.StateResourceFilter;
+
+import javax.ws.rs.Consumes;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * Contains the same logic as {@code CompactionResource} but the APIs are served
+ * by {@link CompactionScheduler} instead of {@code DruidCoordinator}.
+ */
+@Path("/druid/indexer/v1/compaction")
+public class OverlordCompactionResource
+{
+  private final CompactionScheduler scheduler;
+
+  @Inject
+  public OverlordCompactionResource(
+      CompactionScheduler scheduler
+  )
+  {
+    this.scheduler = scheduler;
+  }
+
+  @GET

Review Comment:
   Can you add a nicer error message if the query param is not passed? I am assuming that case won't work as-is.
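   A hedged sketch of the validation being asked for, written as a plain helper so the idea stands alone; the method name and message are assumptions, and in the resource it would return a 400 `Response` rather than a `Map`:

```java
import java.util.Collections;
import java.util.Map;

class QueryParamValidation
{
  // Returns an error payload when the required "dataSource" query param is
  // missing or blank; returns null when the param is present and non-empty.
  static Map<String, String> validateDataSourceParam(String dataSource)
  {
    if (dataSource == null || dataSource.trim().isEmpty()) {
      return Collections.singletonMap(
          "error",
          "Query parameter [dataSource] must be specified and non-empty"
      );
    }
    return null;
  }
}
```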



##########
indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionScheduler.java:
##########


Review Comment:
   Do we need this interface? 



##########
indexing-service/src/test/java/org/apache/druid/indexing/compact/CompactionSchedulerTest.java:
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.compact;
+
+import org.junit.Test;
+
+/**
+ * Tests the scheduling behaviour of the Compaction Scheduler and not the
+ * compaction of segments itself. There are other tests which already verify
+ * the compaction of segments with different configs and datasources.
+ */
+public class CompactionSchedulerTest
+{
+  // what are the different aspects we would like to test
+  // task status updates
+  //
+  // config
+  //
+
+  // there should also be a test where we can do some sort of comparison of the two things
+
+  // CompactSegmentsTest is not the right place for that because the entry point is CompactSegments
+
+  // For us, the entry point is DruidCoordinator (i.e. sim) vs Compaction Scheduler.

Review Comment:
   leftover file? 



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordCompactionResource.java:
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.overlord.http;
+
+import com.google.inject.Inject;
+import com.sun.jersey.spi.container.ResourceFilters;
+import org.apache.druid.indexing.compact.CompactionScheduler;
+import org.apache.druid.server.coordinator.AutoCompactionSnapshot;
+import org.apache.druid.server.coordinator.ClusterCompactionConfig;
+import org.apache.druid.server.http.security.StateResourceFilter;
+
+import javax.ws.rs.Consumes;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * Contains the same logic as {@code CompactionResource} but the APIs are served
+ * by {@link CompactionScheduler} instead of {@code DruidCoordinator}.
+ */
+@Path("/druid/indexer/v1/compaction")
+public class OverlordCompactionResource

Review Comment:
   Can you rename the other one to `CoordinatorCompactionResource`?



##########
indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionSupervisor.java:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.compact;
+
+import org.apache.druid.indexing.overlord.DataSourceMetadata;
+import org.apache.druid.indexing.overlord.supervisor.Supervisor;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorReport;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager;
+import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.server.coordinator.AutoCompactionSnapshot;
+
+/**
+ * Supervisor for compaction of a single datasource.
+ */
+public class CompactionSupervisor implements Supervisor
+{
+  private static final Logger log = new Logger(CompactionSupervisor.class);
+
+  private final CompactionScheduler scheduler;
+  private final CompactionSupervisorSpec supervisorSpec;
+
+  public CompactionSupervisor(
+      CompactionSupervisorSpec supervisorSpec,
+      CompactionScheduler scheduler
+  )
+  {
+    this.supervisorSpec = supervisorSpec;
+    this.scheduler = scheduler;
+  }
+
+  @Override
+  public void start()
+  {
+    final String dataSource = getDataSource();
+    if (supervisorSpec.isSuspended()) {
+      log.info("Suspending compaction for dataSource[%s].", dataSource);
+      scheduler.stopCompaction(dataSource);
+    } else {
+      log.info("Starting compaction for dataSource[%s].", dataSource);
+      scheduler.startCompaction(dataSource, supervisorSpec.getSpec());
+    }
+  }
+
+  @Override
+  public void stop(boolean stopGracefully)
+  {
+    final String dataSource = getDataSource();
+    log.info("Stopping compaction for dataSource[%s].", dataSource);
+    scheduler.stopCompaction(dataSource);
+  }
+
+  @Override
+  public SupervisorReport<AutoCompactionSnapshot> getStatus()
+  {
+    final AutoCompactionSnapshot snapshot;
+    if (supervisorSpec.isSuspended()) {
+      snapshot = AutoCompactionSnapshot.builder(getDataSource())
+                                       .withStatus(AutoCompactionSnapshot.AutoCompactionScheduleStatus.NOT_ENABLED)
+                                       .build();
+    } else {
+      snapshot = scheduler.getCompactionSnapshot(getDataSource());
+    }
+
+    return new SupervisorReport<>(supervisorSpec.getId(), DateTimes.nowUtc(), snapshot);
+  }
+
+  @Override
+  public SupervisorStateManager.State getState()
+  {
+    if (!scheduler.isRunning()) {
+      return State.SCHEDULER_STOPPED;
+    } else if (supervisorSpec.isSuspended()) {
+      return State.SUSPENDED;
+    } else {
+      return State.RUNNING;
+    }
+  }
+
+  private String getDataSource()
+  {
+    return supervisorSpec.getSpec().getDataSource();
+  }
+
+  // Un-implemented methods used only by streaming supervisors
+
+  @Override
+  public void reset(DataSourceMetadata dataSourceMetadata)
+  {
+    // Do nothing
+  }
+
+  @Override
+  public void resetOffsets(DataSourceMetadata resetDataSourceMetadata)
+  {
+    // Do nothing
+  }
+
+  @Override
+  public void checkpoint(int taskGroupId, DataSourceMetadata checkpointMetadata)
+  {
+    // Do nothing
+  }
+
+  @Override
+  public LagStats computeLagStats()
+  {
+    return new LagStats(0L, 0L, 0L);
+  }
+
+  @Override
+  public int getActiveTaskGroupsCount()
+  {
+    return 0;
+  }

Review Comment:
   Might be worth moving these to a `BatchJobSupervisor` abstract class. Also, shouldn't these throw an exception, so that we find out if they are ever invoked, since they are not supposed to be used at all?
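   A hedged sketch of the suggested `BatchJobSupervisor`, using a stand-in for Druid's `DataSourceMetadata` so it stays self-contained; the class name and fail-fast behavior are the reviewer's suggestion, not part of this PR:

```java
// Stand-in for Druid's DataSourceMetadata.
class DataSourceMetadataSketch
{
}

// Sketch: an abstract base for non-streaming supervisors that fails fast on
// streaming-only operations instead of silently doing nothing, so accidental
// use is surfaced immediately.
abstract class BatchJobSupervisor
{
  public void reset(DataSourceMetadataSketch dataSourceMetadata)
  {
    throw new UnsupportedOperationException("reset is not supported for batch supervisors");
  }

  public void checkpoint(int taskGroupId, DataSourceMetadataSketch checkpointMetadata)
  {
    throw new UnsupportedOperationException("checkpoint is not supported for batch supervisors");
  }

  public int getActiveTaskGroupsCount()
  {
    throw new UnsupportedOperationException("task groups do not apply to batch supervisors");
  }
}
```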



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordCompactionResource.java:
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.overlord.http;
+
+import com.google.inject.Inject;
+import com.sun.jersey.spi.container.ResourceFilters;
+import org.apache.druid.indexing.compact.CompactionScheduler;
+import org.apache.druid.server.coordinator.AutoCompactionSnapshot;
+import org.apache.druid.server.coordinator.ClusterCompactionConfig;
+import org.apache.druid.server.http.security.StateResourceFilter;
+
+import javax.ws.rs.Consumes;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * Contains the same logic as {@code CompactionResource} but the APIs are served
+ * by {@link CompactionScheduler} instead of {@code DruidCoordinator}.
+ */
+@Path("/druid/indexer/v1/compaction")
+public class OverlordCompactionResource
+{
+  private final CompactionScheduler scheduler;
+
+  @Inject
+  public OverlordCompactionResource(
+      CompactionScheduler scheduler
+  )
+  {
+    this.scheduler = scheduler;
+  }
+
+  @GET
+  @Path("/progress")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ResourceFilters(StateResourceFilter.class)
+  public Response getCompactionProgress(
+      @QueryParam("dataSource") String dataSource
+  )
+  {
+    final Long notCompactedSegmentSizeBytes = scheduler.getSegmentBytesAwaitingCompaction(dataSource);
+    if (notCompactedSegmentSizeBytes == null) {
+      return Response.status(Response.Status.NOT_FOUND)
+                     .entity(Collections.singletonMap("error", "Unknown DataSource"))
+                     .build();
+    } else {
+      return Response.ok(Collections.singletonMap("remainingSegmentSize", notCompactedSegmentSizeBytes))
+                     .build();
+    }
+  }
+
+  @GET
+  @Path("/status")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ResourceFilters(StateResourceFilter.class)
+  public Response getCompactionSnapshotForDataSource(
+      @QueryParam("dataSource") String dataSource
+  )
+  {
+    final Collection<AutoCompactionSnapshot> snapshots;
+    if (dataSource == null || dataSource.isEmpty()) {
+      snapshots = scheduler.getAllCompactionSnapshots().values();
+    } else {
+      AutoCompactionSnapshot autoCompactionSnapshot = scheduler.getCompactionSnapshot(dataSource);
+      if (autoCompactionSnapshot == null) {
+        return Response.status(Response.Status.NOT_FOUND)
+                       .entity(Collections.singletonMap("error", "Unknown DataSource"))
+                       .build();
+      }
+      snapshots = Collections.singleton(autoCompactionSnapshot);
+    }
+    return Response.ok(Collections.singletonMap("latestStatus", snapshots)).build();
+  }
+
+  @POST
+  @Path("/simulate")
+  @Consumes(MediaType.APPLICATION_JSON)
+  public Response simulateClusterCompactionConfigUpdate(

Review Comment:
   This API too can be restricted like the other APIs, even though it doesn't change anything.



##########
server/src/main/java/org/apache/druid/server/AsyncManagementForwardingServlet.java:
##########
@@ -99,7 +109,15 @@ protected void service(HttpServletRequest request, HttpServletResponse response)
   {
     String currentLeader;
     String requestURI = StringUtils.toLowerCase(request.getRequestURI());
-    if (requestURI.startsWith(STANDARD_COORDINATOR_BASE_PATH)) {
+    if (compactionSupervisorsConfig.isEnabled()
+        && requestURI.startsWith(COMPACTION_COORDINATOR_PATH)) {
+      // If Compaction Scheduler is enabled, compaction APIs must be forwarded to the Overlord
+      currentLeader = overlordLeaderSelector.getCurrentLeader();
+      request.setAttribute(
+          MODIFIED_PATH_ATTRIBUTE,
+          StringUtils.replace(request.getRequestURI(), COMPACTION_COORDINATOR_PATH, COMPACTION_OVERLORD_PATH)
+      );
+    } else if (requestURI.startsWith(STANDARD_COORDINATOR_BASE_PATH)) {

Review Comment:
   what does a gradual shift of compaction from the Coordinator to the Overlord look like?



##########
indexing-service/src/main/java/org/apache/druid/indexing/compact/OverlordCompactionScheduler.java:
##########
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.compact;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Supplier;
+import com.google.inject.Inject;
+import org.apache.druid.indexing.overlord.TaskMaster;
+import org.apache.druid.indexing.overlord.TaskQueryTool;
+import org.apache.druid.java.util.common.Stopwatch;
+import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.metadata.SegmentsMetadataManager;
+import org.apache.druid.rpc.indexing.OverlordClient;
+import org.apache.druid.server.compaction.CompactionRunSimulator;
+import org.apache.druid.server.compaction.CompactionSimulateResult;
+import org.apache.druid.server.compaction.CompactionStatusTracker;
+import org.apache.druid.server.coordinator.AutoCompactionSnapshot;
+import org.apache.druid.server.coordinator.ClusterCompactionConfig;
+import org.apache.druid.server.coordinator.CompactionSupervisorsConfig;
+import org.apache.druid.server.coordinator.CoordinatorOverlordServiceConfig;
+import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
+import org.apache.druid.server.coordinator.DruidCompactionConfig;
+import org.apache.druid.server.coordinator.duty.CompactSegments;
+import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
+import org.apache.druid.server.coordinator.stats.CoordinatorStat;
+import org.apache.druid.server.coordinator.stats.Dimension;
+import org.apache.druid.timeline.SegmentTimeline;
+import org.joda.time.Duration;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Compaction scheduler that runs on the Overlord if {@link CompactionSupervisorsConfig}
+ * is enabled.
+ * <p>
+ * Usage:
+ * <ul>
+ * <li>When an active {@link CompactionSupervisor} starts, it should register
+ * itself by calling {@link #startCompaction}.</li>
+ * <li>When a suspended {@link CompactionSupervisor} starts, it should stop
+ * compaction by calling {@link #stopCompaction}.</li>
+ * <li>When stopping, any {@link CompactionSupervisor} (active or suspended)
+ * should call {@link #stopCompaction}.</li>
+ * </ul>
+ */
+public class OverlordCompactionScheduler implements CompactionScheduler
+{
+  private static final Logger log = new Logger(OverlordCompactionScheduler.class);
+
+  private static final long SCHEDULE_PERIOD_SECONDS = 5;
+  private static final Duration METRIC_EMISSION_PERIOD = Duration.standardMinutes(5);
+
+  private final SegmentsMetadataManager segmentManager;
+  private final OverlordClient overlordClient;
+  private final ServiceEmitter emitter;
+
+  private final CompactionSupervisorsConfig schedulerConfig;
+  private final Supplier<DruidCompactionConfig> compactionConfigSupplier;
+  private final ConcurrentHashMap<String, DataSourceCompactionConfig> activeDatasourceConfigs;
+
+  /**
+   * Single-threaded executor to process the compaction queue.
+   */
+  private final ScheduledExecutorService executor;
+
+  private final CompactionStatusTracker statusTracker;
+  private final AtomicBoolean started = new AtomicBoolean(false);
+  private final CompactSegments duty;
+
+  /**
+   * The scheduler should enable/disable polling of segments only if the Overlord
+   * is running in standalone mode, otherwise this is handled by the DruidCoordinator
+   * class itself.
+   */
+  private final boolean shouldPollSegments;
+
+  private final Stopwatch sinceStatsEmitted = Stopwatch.createUnstarted();
+
+  @Inject
+  public OverlordCompactionScheduler(
+      TaskMaster taskMaster,
+      TaskQueryTool taskQueryTool,
+      SegmentsMetadataManager segmentManager,
+      Supplier<DruidCompactionConfig> compactionConfigSupplier,
+      CompactionStatusTracker statusTracker,
+      CompactionSupervisorsConfig schedulerConfig,
+      CoordinatorOverlordServiceConfig coordinatorOverlordServiceConfig,
+      ScheduledExecutorFactory executorFactory,
+      ServiceEmitter emitter,
+      ObjectMapper objectMapper
+  )
+  {
+    this.segmentManager = segmentManager;
+    this.emitter = emitter;
+    this.schedulerConfig = schedulerConfig;
+    this.compactionConfigSupplier = compactionConfigSupplier;
+
+    this.executor = executorFactory.create(1, "CompactionScheduler-%s");
+    this.statusTracker = statusTracker;
+    this.shouldPollSegments = segmentManager != null
+                              && !coordinatorOverlordServiceConfig.isEnabled();
+    this.overlordClient = new LocalOverlordClient(taskMaster, taskQueryTool, objectMapper);
+    this.duty = new CompactSegments(this.statusTracker, overlordClient);
+    this.activeDatasourceConfigs = new ConcurrentHashMap<>();
+  }
+
+  @Override
+  public void start()
+  {
+    if (isEnabled() && started.compareAndSet(false, true)) {
+      log.info("Starting compaction scheduler.");
+      initState();
+      scheduleOnExecutor(this::checkSchedulingStatus, SCHEDULE_PERIOD_SECONDS);
+    }
+  }
+
+  @Override
+  public void stop()
+  {
+    if (isEnabled() && started.compareAndSet(true, false)) {
+      log.info("Stopping compaction scheduler.");
+      cleanupState();
+    }
+  }
+
+  @Override
+  public boolean isRunning()
+  {
+    return isEnabled() && started.get();
+  }
+
+  @Override
+  public void startCompaction(String dataSourceName, DataSourceCompactionConfig config)
+  {
+    activeDatasourceConfigs.put(dataSourceName, config);

Review Comment:
   there should be some check here to make sure that the compaction scheduler has been started.
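
   The guard the reviewer asks for could look like the following. This is an illustrative, self-contained sketch, not the PR's actual code: `SchedulerSketch` and the `String` config stand in for `OverlordCompactionScheduler` and `DataSourceCompactionConfig`.

   ```java
   import java.util.Map;
   import java.util.concurrent.ConcurrentHashMap;
   import java.util.concurrent.atomic.AtomicBoolean;

   class SchedulerSketch
   {
     private final AtomicBoolean started = new AtomicBoolean(false);
     private final Map<String, String> activeConfigs = new ConcurrentHashMap<>();

     void start()
     {
       started.set(true);
     }

     void startCompaction(String dataSource, String config)
     {
       // The check the reviewer asks for: reject registration before start().
       if (!started.get()) {
         throw new IllegalStateException("Compaction scheduler has not been started");
       }
       activeConfigs.put(dataSource, config);
     }

     boolean isRegistered(String dataSource)
     {
       return activeConfigs.containsKey(dataSource);
     }
   }
   ```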



##########
indexing-service/src/test/java/org/apache/druid/indexing/compact/OverlordCompactionSchedulerTest.java:
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.compact;
+
+import com.fasterxml.jackson.databind.InjectableValues;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.guice.IndexingServiceTuningConfigModule;
+import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
+import org.apache.druid.indexing.common.config.TaskStorageConfig;
+import org.apache.druid.indexing.common.task.CompactionTask;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.overlord.HeapMemoryTaskStorage;
+import org.apache.druid.indexing.overlord.TaskLockbox;
+import org.apache.druid.indexing.overlord.TaskMaster;
+import org.apache.druid.indexing.overlord.TaskQueryTool;
+import org.apache.druid.indexing.overlord.TaskQueue;
+import org.apache.druid.indexing.overlord.TaskRunner;
+import org.apache.druid.indexing.overlord.setup.DefaultWorkerBehaviorConfig;
+import org.apache.druid.indexing.overlord.setup.WorkerBehaviorConfig;
+import org.apache.druid.indexing.test.TestIndexerMetadataStorageCoordinator;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.metrics.StubServiceEmitter;
+import org.apache.druid.segment.TestIndex;
+import org.apache.druid.server.compaction.CompactionStatistics;
+import org.apache.druid.server.compaction.CompactionStatusTracker;
+import org.apache.druid.server.coordinator.AutoCompactionSnapshot;
+import org.apache.druid.server.coordinator.CompactionSupervisorsConfig;
+import org.apache.druid.server.coordinator.CoordinatorOverlordServiceConfig;
+import org.apache.druid.server.coordinator.CreateDataSegments;
+import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
+import org.apache.druid.server.coordinator.DruidCompactionConfig;
+import org.apache.druid.server.coordinator.simulate.BlockingExecutorService;
+import org.apache.druid.server.coordinator.simulate.TestSegmentsMetadataManager;
+import org.apache.druid.server.coordinator.simulate.WrappingScheduledExecutorService;
+import org.apache.druid.server.coordinator.stats.Stats;
+import org.apache.druid.timeline.DataSegment;
+import org.joda.time.Period;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mockito;
+
+import java.util.Collections;
+import java.util.List;
+
+public class OverlordCompactionSchedulerTest
+{
+  private static final ObjectMapper OBJECT_MAPPER;
+
+  static {
+    OBJECT_MAPPER = new DefaultObjectMapper();
+    OBJECT_MAPPER.registerModules(new IndexingServiceTuningConfigModule().getJacksonModules());
+    OBJECT_MAPPER.setInjectableValues(
+        new InjectableValues
+            .Std()
+            .addValue(
+                SegmentCacheManagerFactory.class,
+                new SegmentCacheManagerFactory(TestIndex.INDEX_IO, OBJECT_MAPPER)
+            )
+    );
+  }
+
+  private CompactionSupervisorsConfig schedulerConfig;
+  private DruidCompactionConfig compactionConfig;
+  private CoordinatorOverlordServiceConfig coordinatorOverlordServiceConfig;
+
+  private TaskMaster taskMaster;
+  private TaskRunner taskRunner;
+  private TaskQueue taskQueue;
+  private BlockingExecutorService executor;
+
+  private HeapMemoryTaskStorage taskStorage;
+  private TestSegmentsMetadataManager segmentsMetadataManager;
+  private StubServiceEmitter serviceEmitter;
+
+  private OverlordCompactionScheduler scheduler;
+
+  @Before
+  public void setUp()
+  {
+    taskRunner = Mockito.mock(TaskRunner.class);
+    taskQueue = Mockito.mock(TaskQueue.class);
+
+    taskMaster = new TaskMaster(null, null);
+    taskMaster.becomeLeader(taskRunner, taskQueue);
+
+    taskStorage = new HeapMemoryTaskStorage(new TaskStorageConfig(null));
+
+    executor = new BlockingExecutorService("test");
+    serviceEmitter = new StubServiceEmitter();
+    segmentsMetadataManager = new TestSegmentsMetadataManager();
+
+    schedulerConfig = new CompactionSupervisorsConfig(true);
+    compactionConfig = DruidCompactionConfig.empty();
+    coordinatorOverlordServiceConfig = new CoordinatorOverlordServiceConfig(false, null);
+
+    initScheduler();
+  }
+
+  private void initScheduler()
+  {
+    TaskLockbox taskLockbox = new TaskLockbox(taskStorage, new TestIndexerMetadataStorageCoordinator());
+    WorkerBehaviorConfig defaultWorkerConfig
+        = new DefaultWorkerBehaviorConfig(WorkerBehaviorConfig.DEFAULT_STRATEGY, null);
+    scheduler = new OverlordCompactionScheduler(
+        taskMaster,
+        new TaskQueryTool(taskStorage, taskLockbox, taskMaster, null, () -> defaultWorkerConfig),
+        segmentsMetadataManager,
+        () -> compactionConfig,
+        new CompactionStatusTracker(OBJECT_MAPPER),
+        schedulerConfig,
+        coordinatorOverlordServiceConfig,
+        (nameFormat, numThreads) -> new WrappingScheduledExecutorService("test", executor, false),
+        serviceEmitter,
+        OBJECT_MAPPER
+    );
+  }
+
+  @After
+  public void tearDown()
+  {
+
+  }

Review Comment:
   can be removed



##########
server/src/main/java/org/apache/druid/server/http/CompactionResource.java:
##########
@@ -100,4 +114,31 @@ public Response getCompactionSnapshotForDataSource(
     }
     return Response.ok(ImmutableMap.of("latestStatus", snapshots)).build();
   }
+
+  @POST
+  @Path("/simulate")
+  @Consumes(MediaType.APPLICATION_JSON)
+  public Response simulateClusterCompactionConfigUpdate(
+      ClusterCompactionConfig updatePayload
+  )
+  {
+    if (compactionSupervisorsConfig.isEnabled()) {
+      return buildErrorResponseWhenRunningAsSupervisor();
+    }
+
+    return Response.ok().entity(
+        coordinator.simulateRunWithConfigUpdate(updatePayload)
+    ).build();
+  }
+
+  private Response buildErrorResponseWhenRunningAsSupervisor()
+  {
+    return Response.status(Response.Status.SERVICE_UNAVAILABLE).entity(
+        ImmutableMap.of(
+            "error",
+            "Compaction has been disabled on the Coordinator."
+            + " Use Overlord APIs to fetch compaction status."
+        )
+    ).build();
+  }

Review Comment:
   Or we don't return an error but log a warning? 



##########
server/src/main/java/org/apache/druid/server/compaction/CompactionStatusTracker.java:
##########
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.compaction;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.inject.Inject;
+import org.apache.druid.client.indexing.ClientCompactionTaskQuery;
+import org.apache.druid.indexer.TaskState;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
+import org.apache.druid.server.coordinator.DruidCompactionConfig;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Tracks status of recently submitted compaction tasks. Can be used by a segment
+ * search policy to skip an interval if it has been recently compacted or if it
+ * keeps failing repeatedly.
+ */
+public class CompactionStatusTracker

Review Comment:
   there should be a way to forget the state. That is, don't compact an interval if it's been failing, but try it again after X minutes.
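
   A minimal, self-contained sketch of the cooldown the reviewer describes. `FailureCooldown` and its method names are assumptions for illustration, not Druid's `CompactionStatusTracker` API; passing `Instant now` explicitly keeps the sketch testable.

   ```java
   import java.time.Duration;
   import java.time.Instant;
   import java.util.Map;
   import java.util.concurrent.ConcurrentHashMap;

   class FailureCooldown
   {
     private final Duration retryAfter;
     private final Map<String, Instant> lastFailureTime = new ConcurrentHashMap<>();

     FailureCooldown(Duration retryAfter)
     {
       this.retryAfter = retryAfter;
     }

     void recordFailure(String interval, Instant now)
     {
       lastFailureTime.put(interval, now);
     }

     // Skip an interval only while it is inside the cooldown window;
     // once retryAfter has elapsed, the state is forgotten and we retry.
     boolean shouldSkip(String interval, Instant now)
     {
       Instant failedAt = lastFailureTime.get(interval);
       if (failedAt == null) {
         return false;
       }
       if (now.isBefore(failedAt.plus(retryAfter))) {
         return true;
       }
       lastFailureTime.remove(interval);
       return false;
     }
   }
   ```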



##########
services/src/main/java/org/apache/druid/cli/CliRouter.java:
##########
@@ -101,6 +102,7 @@ protected List<? extends Module> getModules()
+          JsonConfigProvider.bind(binder, "druid.router", TieredBrokerConfig.class);
+          JsonConfigProvider.bind(binder, "druid.router.avatica.balancer", AvaticaConnectionBalancer.class);
+          JsonConfigProvider.bind(binder, "druid.router.managementProxy", ManagementProxyConfig.class);
+          JsonConfigProvider.bind(binder, "druid.compaction.supervisors", CompactionSupervisorsConfig.class);

Review Comment:
   can `druid.compaction.supervisors` be declared as a constant somewhere, or this binding moved into its own module for reuse?
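
   One way to take the suggestion: hoist the property prefix into a constant that each module's binding reuses. The class and field names below are assumptions for illustration, not the PR's actual code.

   ```java
   // Hypothetical holder for the shared config key; name is assumed.
   final class CompactionSupervisorsConfigKeys
   {
     static final String CONFIG_PREFIX = "druid.compaction.supervisors";

     private CompactionSupervisorsConfigKeys()
     {
       // utility class; no instances
     }
   }
   ```

   Each module would then bind with `JsonConfigProvider.bind(binder, CompactionSupervisorsConfigKeys.CONFIG_PREFIX, CompactionSupervisorsConfig.class)` instead of repeating the string literal.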



##########
server/src/main/java/org/apache/druid/server/compaction/Table.java:
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.compaction;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * A simple table POJO with any number of rows and specified column names.
+ */
+public class Table

Review Comment:
   what is this used for?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

