gianm commented on code in PR #18886:
URL: https://github.com/apache/druid/pull/18886#discussion_r2671562564


##########
multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerRegistry.java:
##########
@@ -19,54 +19,237 @@
 
 package org.apache.druid.msq.dart.controller;
 
+import com.google.errorprone.annotations.concurrent.GuardedBy;
+import com.google.inject.Inject;
 import org.apache.druid.error.DruidException;
+import org.apache.druid.guice.ManageLifecycle;
+import org.apache.druid.indexer.report.TaskReport;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
+import org.apache.druid.msq.dart.controller.http.DartQueryInfo;
+import org.apache.druid.msq.dart.guice.DartControllerConfig;
 import org.apache.druid.msq.exec.Controller;
+import org.joda.time.Period;
 
 import javax.annotation.Nullable;
 import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 /**
- * Registry for actively-running {@link Controller}.
+ * Registry for actively-running {@link Controller} and recently-completed 
{@link TaskReport}.
  */
+@ManageLifecycle
 public class DartControllerRegistry
 {
+  /**
+   * Minimum frequency for checking if {@link #cleanupExpiredReports()} needs 
to be run.
+   */
+  private static final long MIN_CLEANUP_CHECK_MILLIS = 10_000;
+
+  private final DartControllerConfig config;
+
+  /**
+   * Map of Dart query ID -> controller for currently-running queries.
+   */
   private final ConcurrentHashMap<String, ControllerHolder> controllerMap = 
new ConcurrentHashMap<>();
 
+  /**
+   * Map of Dart query ID -> timestamped report for completed queries.
+   */
+  @GuardedBy("completeReports")
+  private final LinkedHashMap<String, QueryInfoAndReport> completeReports = 
new LinkedHashMap<>();
+
+  /**
+   * Map of SQL query ID -> Dart query ID. Used by {@link 
#getQueryInfoAndReportBySqlQueryId(String)}. Contains an
+   * entry for every query in either {@link #controllerMap} or {@link 
#completeReports}.
+   *
+   * It is possible for the same SQL query ID to map to multiple Dart query 
IDs, because SQL query IDs can be set
+   * by the user, and uniqueness is not required. If this occurs, we go 
with the first one encountered
+   * and ignore the others.
+   */
+  private final ConcurrentHashMap<String, String> sqlQueryIdToDartQueryId = 
new ConcurrentHashMap<>();
+
+  /**
+   * Executor for cleaning up reports older than {@link 
DartControllerConfig#getMaxRetainedReportDuration()}.
+   */
+  private ScheduledExecutorService cleanupExec;
+
+  @Inject
+  public DartControllerRegistry(final DartControllerConfig config)
+  {
+    this.config = config;
+  }
+
+  @LifecycleStart
+  public void start()
+  {
+    // Schedule periodic cleanup of expired reports.
+    if (!config.getMaxRetainedReportDuration().equals(Period.ZERO)) {
+      final String threadNameFormat = 
StringUtils.format("%s-ReportCleanupExec-%%s", getClass().getSimpleName());
+      final long cleanupPeriodMs = Math.max(
+          MIN_CLEANUP_CHECK_MILLIS,
+          
config.getMaxRetainedReportDuration().toStandardDuration().getMillis() / 10
+      );
+      cleanupExec = Execs.scheduledSingleThreaded(threadNameFormat);
+      cleanupExec.scheduleAtFixedRate(
+          this::cleanupExpiredReports,
+          cleanupPeriodMs,
+          cleanupPeriodMs,
+          TimeUnit.MILLISECONDS
+      );
+    }
+  }
+
+  @LifecycleStop
+  public void stop()
+  {
+    if (cleanupExec != null) {
+      cleanupExec.shutdown();
+    }
+  }
+
   /**
    * Add a controller. Throws {@link DruidException} if a controller with the 
same {@link Controller#queryId()} is
    * already registered.
    */
   public void register(ControllerHolder holder)
   {
-    if (controllerMap.putIfAbsent(holder.getController().queryId(), holder) != 
null) {
-      throw DruidException.defensive("Controller[%s] already registered", 
holder.getController().queryId());
+    final String dartQueryId = holder.getController().queryId();
+    if (controllerMap.putIfAbsent(dartQueryId, holder) != null) {
+      throw DruidException.defensive("Controller[%s] already registered", 
dartQueryId);
     }
+    sqlQueryIdToDartQueryId.putIfAbsent(holder.getSqlQueryId(), dartQueryId);
   }
 
   /**
-   * Remove a controller from the registry.
+   * Remove a controller from the registry. Optionally registers a report that 
will be available for some
+   * time afterwards, based on {@link 
DartControllerConfig#getMaxRetainedReportCount()} and
+   * {@link DartControllerConfig#getMaxRetainedReportDuration()}.
    */
-  public void deregister(ControllerHolder holder)
+  public void deregister(ControllerHolder holder, @Nullable 
TaskReport.ReportMap completeReport)
   {
+    final String dartQueryId = holder.getController().queryId();
+
     // Remove only if the current mapping for the queryId is this specific 
controller.
-    controllerMap.remove(holder.getController().queryId(), holder);
+    final boolean didRemove = controllerMap.remove(dartQueryId, holder);
+
+    // Add completeReport to completeReports, if present, and if we actually 
did deregister this specific controller.
+    if (didRemove && completeReport != null && 
config.getMaxRetainedReportCount() > 0) {
+      synchronized (completeReports) {
+        // Remove reports if size is greater than maxRetainedReportCount - 1.
+        int reportsToRemove = completeReports.size() - 
config.getMaxRetainedReportCount() + 1;

Review Comment:
   I think so too, just being defensive.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to