kfaraz commented on code in PR #19179:
URL: https://github.com/apache/druid/pull/19179#discussion_r2958381474


##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordCompactionResource.java:
##########
@@ -368,6 +372,16 @@ public Response simulateRunWithConfigUpdate(
     ).build();
   }
 
+  @POST
+  @Path("/dryRun")

Review Comment:
   The `/simulate` API is already meant to do a dry run. Please don't add 
another API for the same purpose.
   We should fix the simulator instead.
   It is okay to break backward compatibility with the `/simulate` API since it 
is experimental, undocumented, and not used anywhere.



##########
indexing-service/src/main/java/org/apache/druid/indexing/compact/OverlordCompactionScheduler.java:
##########
@@ -370,45 +378,56 @@ private synchronized void scheduledRun()
   }
 
   /**
-   * Creates and launches eligible compaction jobs.
+   * Creates a new compaction job queue and enqueues eligible jobs.
+   * In dry run mode, no jobs are launched and no metrics are emitted but detailed stats are collected.
    */
-  private synchronized void resetCompactionJobQueue()
+  private synchronized void resetCompactionJobQueue(
+      boolean dryRun,
+      ClusterCompactionConfig clusterCompactionConfig,
+      @Nullable CompactionStatusDetailedStats detailedStats
+  )
   {
-    // Remove the old queue so that no more jobs are added to it
-    latestJobQueue.set(null);
-
-    final Stopwatch runDuration = Stopwatch.createStarted();
-    final DataSourcesSnapshot dataSourcesSnapshot = getDatasourceSnapshot();
-    final CompactionJobQueue queue = new CompactionJobQueue(
-        dataSourcesSnapshot,
-        getLatestClusterConfig(),
-        statusTracker,
-        taskActionClientFactory,
-        taskLockbox,
-        overlordClient,
-        brokerClient,
-        objectMapper,
-        indexingStateStorage,
-        indexingStateCache
-    );
-    latestJobQueue.set(queue);
+    synchronized (lock) {

Review Comment:
   Please don't add another lock; this method is already `synchronized`.
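
   To illustrate the point, here is a minimal sketch using a hypothetical class (`SynchronizedReentrancyDemo`, not the actual scheduler). It assumes every other access to the guarded state also goes through `synchronized` methods on the same instance. Under that assumption, the method body already runs while holding the monitor of `this`, so the nested `lock` adds no extra protection:

```java
public class SynchronizedReentrancyDemo {
    // Hypothetical extra lock, mirroring the kind of field added in the diff above.
    private final Object lock = new Object();
    private int resets = 0;

    // A synchronized instance method holds the monitor of `this` for its whole body.
    public synchronized boolean resetWithExtraLock() {
        synchronized (lock) {
            resets++;
            // Both monitors are held here; the outer one already serializes callers.
            return Thread.holdsLock(this) && Thread.holdsLock(lock);
        }
    }

    // Same behavior without the nested lock.
    public synchronized boolean resetWithoutExtraLock() {
        resets++;
        return Thread.holdsLock(this);
    }

    public synchronized int getResets() {
        return resets;
    }

    public static void main(String[] args) {
        SynchronizedReentrancyDemo demo = new SynchronizedReentrancyDemo();
        System.out.println(demo.resetWithExtraLock());
        System.out.println(demo.resetWithoutExtraLock());
        System.out.println(demo.getResets());
    }
}
```

   A second lock would only help if some caller touched the same state without synchronizing on the instance, which is not the case for a `private synchronized` method.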



##########
embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionSupervisorTest.java:
##########
@@ -266,17 +309,40 @@ public void test_minorCompactionWithMSQ(PartitionsSpec partitionsSpec) throws Ex
             .withIoConfig(new UserCompactionTaskIOConfig(true))
             .withTuningConfig(UserCompactionTaskQueryTuningConfig.builder().partitionsSpec(partitionsSpec).build())
             .build();
-
-    runCompactionWithSpec(dayGranularityConfig);
+    String supervisorId = enableSupervisor(dayGranularityConfig);
+
+    Map<CompactionStatus.State, Table> result1 = dryRun(CompactionEngine.MSQ, policy).getCompactionStates();
+    // Expect dry run to return 1 compaction job with 2 segments
+    System.out.println("--- result1: " + result1);

Review Comment:
   Note: please remove this before committing.



##########
embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionSupervisorTest.java:
##########
@@ -784,12 +872,12 @@ private void verifyCompactedSegmentsHaveFingerprints(DataSourceCompactionConfig
         });
   }
 
-  private void runCompactionWithSpec(DataSourceCompactionConfig config)
+  private String enableSupervisor(DataSourceCompactionConfig config)
   {
-    cluster.callApi().postSupervisor(new CompactionSupervisorSpec(config, false, null));
+    return cluster.callApi().postSupervisor(new CompactionSupervisorSpec(config, false, null));
   }
 
-  private void pauseCompaction(DataSourceCompactionConfig config)
+  private void terminateSupervisor(String supervisorId)

Review Comment:
   The method name and argument have changed, but the method body hasn't.



##########
indexing-service/src/main/java/org/apache/druid/indexing/compact/OverlordCompactionScheduler.java:
##########
@@ -370,45 +378,56 @@ private synchronized void scheduledRun()
   }
 
   /**
-   * Creates and launches eligible compaction jobs.
+   * Creates a new compaction job queue and enqueues eligible jobs.
+   * In dry run mode, no jobs are launched and no metrics are emitted but detailed stats are collected.
    */
-  private synchronized void resetCompactionJobQueue()
+  private synchronized void resetCompactionJobQueue(

Review Comment:
   Don't update this method. Instead, construct a separate 
`CompactionJobQueue` in the simulate method, if needed.



##########
indexing-service/src/main/java/org/apache/druid/indexing/compact/OverlordCompactionScheduler.java:
##########
@@ -503,6 +526,16 @@ public CompactionSimulateResult simulateRunWithConfigUpdate(ClusterCompactionCon
     }
   }
 
+  @Override
+  public CompactionStatusDetailedStats dryRunWithConfig(ClusterCompactionConfig config)

Review Comment:
   Remove this method. In the `simulateRunWithConfigUpdate` method, create a 
fresh test-only instance of `CompactionJobQueue`. None of the actual run 
methods, such as `resetCompactionJobQueue`, should be called in the simulate 
flow. The actual `CompactionStatusTracker` should be used only in a read-only 
capacity, same as in `CompactionRunSimulator`.
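
   A rough sketch of the shape being suggested, using toy stand-in types only. `ReadOnlyStatusView`, `StatusTracker`, `JobQueue`, and `simulate` below are all hypothetical names for illustration and do not reflect the real signatures of `CompactionJobQueue` or `CompactionStatusTracker`. The idea is that each simulation builds its own throwaway queue and only reads from the long-lived tracker:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SimulateFlowSketch {

    /** Hypothetical read-only view of the status tracker (not a Druid interface). */
    interface ReadOnlyStatusView {
        String latestStatus(String interval);
    }

    /** Hypothetical stand-in for the real, long-lived status tracker. */
    static final class StatusTracker implements ReadOnlyStatusView {
        private final Map<String, String> statuses = new HashMap<>();

        void onTaskSubmitted(String interval) {
            statuses.put(interval, "RUNNING");
        }

        @Override
        public String latestStatus(String interval) {
            return statuses.getOrDefault(interval, "PENDING");
        }

        int numTrackedIntervals() {
            return statuses.size();
        }
    }

    /** Hypothetical stand-in for a job queue that only reads statuses. */
    static final class JobQueue {
        private final ReadOnlyStatusView statusView;
        private final List<String> queuedIntervals = new ArrayList<>();

        JobQueue(ReadOnlyStatusView statusView) {
            this.statusView = statusView;
        }

        void enqueueIfEligible(String interval) {
            // Skip intervals that already have a running task.
            if (!"RUNNING".equals(statusView.latestStatus(interval))) {
                queuedIntervals.add(interval);
            }
        }

        List<String> getQueuedIntervals() {
            return Collections.unmodifiableList(queuedIntervals);
        }
    }

    /** Builds a throwaway queue per call; nothing is launched and the tracker is only read. */
    static List<String> simulate(ReadOnlyStatusView statusView, List<String> intervals) {
        final JobQueue simulationQueue = new JobQueue(statusView);
        for (String interval : intervals) {
            simulationQueue.enqueueIfEligible(interval);
        }
        return simulationQueue.getQueuedIntervals();
    }

    public static void main(String[] args) {
        StatusTracker tracker = new StatusTracker();
        tracker.onTaskSubmitted("2024-01-01/2024-01-02");

        List<String> wouldRun = simulate(
            tracker,
            Arrays.asList("2024-01-01/2024-01-02", "2024-01-02/2024-01-03")
        );
        System.out.println(wouldRun);
        System.out.println(tracker.numTrackedIntervals());
    }
}
```

   Because the simulation never touches the queue used by real runs, the run path needs no `dryRun` branches at all.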



##########
server/src/main/java/org/apache/druid/server/compaction/CompactionStatusDetailedStats.java:
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.compaction;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import org.apache.druid.common.guava.GuavaUtils;
+import org.apache.druid.error.DruidException;
+
+import javax.annotation.Nullable;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Collects detailed compaction statistics in table format during dry run mode in overlord.
+ */
+public class CompactionStatusDetailedStats

Review Comment:
   Instead of adding a new class, update the `CompactionSimulateResult`.



##########
indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionJobQueue.java:
##########
@@ -107,6 +107,7 @@ public CompactionJobQueue(
       DataSourcesSnapshot dataSourcesSnapshot,
       ClusterCompactionConfig clusterCompactionConfig,
       CompactionStatusTracker statusTracker,
+      boolean dryRun,

Review Comment:
   Please call this flag `simulate` instead.



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordCompactionResource.java:
##########
@@ -354,6 +354,10 @@ public Response getDatasourceCompactionConfigHistory(
     }
   }
 
+  /**
+   * @deprecated Use {@link #dryRun(ClusterCompactionConfig)} instead
+   */
+  @Deprecated

Review Comment:
   Let's not deprecate this API.



##########
indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionConfigBasedJobTemplate.java:
##########
@@ -99,19 +104,12 @@ public List<CompactionJob> createCompactionJobs(
                 .getCompactionPolicy()
                 .checkEligibilityForCompaction(candidate, params.getLatestTaskStatus(candidate));
       if (!eligibility.isEligible()) {
+        params.collectCompactionStatus(candidate, eligibility.getReason());
         continue;
       }
-      final CompactionCandidate finalCandidate;
       switch (eligibility.getMode()) {

Review Comment:
   We don't need this switch anymore.



##########
server/src/main/java/org/apache/druid/server/compaction/CompactionCandidate.java:
##########
@@ -187,7 +192,13 @@ public CompactionStatus getCurrentStatus()
    */
   public CompactionCandidate withCurrentStatus(CompactionStatus status)
   {
-    return new CompactionCandidate(segments, umbrellaInterval, compactionInterval, numIntervals, status);
+    return new CompactionCandidate(
+        segments,
+        umbrellaInterval,
+        compactionInterval,
+        Math.toIntExact(compactionStatistics.getNumIntervals()),

Review Comment:
   Instead of converting `long` to `int` here, change the 
`CompactionStatistics.numIntervals` field to be an `int`. It should 
have been an `int` in the first place.
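
   For context, a small self-contained sketch of the difference. `LongBackedStats` and `IntBackedStats` are hypothetical stand-ins, not the real `CompactionStatistics` class. `Math.toIntExact` throws an `ArithmeticException` when the value does not fit in an `int`, so every call site that narrows carries a potential runtime failure that an `int` field avoids entirely:

```java
public class NumIntervalsNarrowingDemo {

    /** Hypothetical stats holder that keeps the count as a long. */
    static final class LongBackedStats {
        private final long numIntervals;

        LongBackedStats(long numIntervals) {
            this.numIntervals = numIntervals;
        }

        long getNumIntervals() {
            return numIntervals;
        }
    }

    /** Hypothetical stats holder that keeps the count as an int from the start. */
    static final class IntBackedStats {
        private final int numIntervals;

        IntBackedStats(int numIntervals) {
            this.numIntervals = numIntervals;
        }

        int getNumIntervals() {
            return numIntervals;
        }
    }

    /** Returns true if narrowing the given long with Math.toIntExact fails. */
    static boolean narrowingFails(long value) {
        try {
            Math.toIntExact(value);
            return false;
        } catch (ArithmeticException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // With a long field, each consumer that needs an int has to narrow explicitly.
        LongBackedStats longStats = new LongBackedStats(3L);
        int narrowed = Math.toIntExact(longStats.getNumIntervals());

        // With an int field, no conversion is needed at the call site.
        IntBackedStats intStats = new IntBackedStats(3);
        System.out.println(narrowed == intStats.getNumIntervals());

        // The narrowing call fails at runtime for values outside the int range.
        System.out.println(narrowingFails(Integer.MAX_VALUE + 1L));
    }
}
```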



##########
indexing-service/src/main/java/org/apache/druid/indexing/compact/OverlordCompactionScheduler.java:
##########
@@ -459,6 +478,9 @@ private void onTaskFinished(String taskId, TaskStatus taskStatus)
 
   private void updateCompactionSnapshots(CompactionJobQueue queue)
   {
+    if (queue.isDryRun()) {

Review Comment:
   This check shouldn't be needed if the simulate flow never calls the actual 
run methods.



##########
server/src/main/java/org/apache/druid/server/compaction/CompactionCandidate.java:
##########
@@ -42,9 +42,8 @@ public class CompactionCandidate
   private final Interval umbrellaInterval;
   private final Interval compactionInterval;
   private final String dataSource;
-  private final long totalBytes;
-  private final int numIntervals;
 
+  private final CompactionStatistics compactionStatistics;

Review Comment:
   Super Nit: Maybe rename this field to `stats` to align with the getter 
`getStats()`. Also, `stats` just seems nicer 🙂 



##########
indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionJobParams.java:
##########
@@ -24,7 +24,9 @@
 import org.apache.druid.server.compaction.CompactionSnapshotBuilder;
 import org.apache.druid.server.compaction.CompactionTaskStatus;
 import org.apache.druid.server.coordinator.ClusterCompactionConfig;
+import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
 import org.apache.druid.timeline.SegmentTimeline;
+import org.checkerframework.checker.nullness.qual.Nullable;

Review Comment:
   Use `javax.annotation.Nullable` instead.



##########
indexing-service/src/main/java/org/apache/druid/indexing/compact/OverlordCompactionScheduler.java:
##########
@@ -111,10 +113,16 @@ public class OverlordCompactionScheduler implements CompactionScheduler
   private final AtomicBoolean shouldRecomputeJobsForAnyDatasource = new AtomicBoolean(false);
 
   /**
-   * Compaction job queue built in the last invocation of {@link #resetCompactionJobQueue()}.
+   * Compaction job queue built in the last invocation of {@link #resetCompactionJobQueue}.
    */
   private final AtomicReference<CompactionJobQueue> latestJobQueue;
 
+  /**
+   * Lock used to synchronize access when resetting the compaction job queue.
+   * Ensures thread-safe updates to the queue and related state during queue resets.
+   */
+  private final Object lock = new Object();

Review Comment:
   This lock shouldn't be needed, since `resetCompactionJobQueue` is already 
`synchronized`.



##########
indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionScheduler.java:
##########
@@ -83,7 +84,15 @@ public interface CompactionScheduler
    * Simulates a compaction run with the given cluster config.
    *
    * @return Result of the simulation
+   * @deprecated Use {@link #dryRunWithConfig(ClusterCompactionConfig)} instead
    */
+  @Deprecated
   CompactionSimulateResult simulateRunWithConfigUpdate(ClusterCompactionConfig updateRequest);
 
+  /**
+   * Dry run with the given config to view compaction status without submitting jobs.
+   *
+   * @return Detailed compaction statistics
+   */
+  CompactionStatusDetailedStats dryRunWithConfig(ClusterCompactionConfig config);

Review Comment:
   Please do not add this new method; use the existing 
`simulateRunWithConfigUpdate` method.



##########
indexing-service/src/main/java/org/apache/druid/indexing/compact/OverlordCompactionScheduler.java:
##########
@@ -539,9 +571,9 @@ private DataSourcesSnapshot getDatasourceSnapshot()
     return segmentManager.getRecentDataSourcesSnapshot();
   }
 
-  private void scheduleOnExecutor(Runnable runnable, long delayMillis)
+  private ScheduledFuture<?> scheduleOnExecutor(Runnable runnable, long delayMillis)

Review Comment:
   Where is the future used?


