gianm commented on code in PR #18886:
URL: https://github.com/apache/druid/pull/18886#discussion_r2671559005


##########
multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerRegistry.java:
##########
@@ -19,54 +19,237 @@
 
 package org.apache.druid.msq.dart.controller;
 
+import com.google.errorprone.annotations.concurrent.GuardedBy;
+import com.google.inject.Inject;
 import org.apache.druid.error.DruidException;
+import org.apache.druid.guice.ManageLifecycle;
+import org.apache.druid.indexer.report.TaskReport;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
+import org.apache.druid.msq.dart.controller.http.DartQueryInfo;
+import org.apache.druid.msq.dart.guice.DartControllerConfig;
 import org.apache.druid.msq.exec.Controller;
+import org.joda.time.Period;
 
 import javax.annotation.Nullable;
 import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 /**
- * Registry for actively-running {@link Controller}.
+ * Registry for actively-running {@link Controller} and recently-completed 
{@link TaskReport}.
  */
+@ManageLifecycle
 public class DartControllerRegistry
 {
+  /**
+   * Minimum interval between checks of whether {@link #cleanupExpiredReports()} needs 
to be run.
+   */
+  private static final long MIN_CLEANUP_CHECK_MILLIS = 10_000;
+
+  private final DartControllerConfig config;
+
+  /**
+   * Map of Dart query ID -> controller for currently-running queries.
+   */
   private final ConcurrentHashMap<String, ControllerHolder> controllerMap = 
new ConcurrentHashMap<>();
 
+  /**
+   * Map of Dart query ID -> timestamped report for completed queries.
+   */
+  @GuardedBy("completeReports")
+  private final LinkedHashMap<String, QueryInfoAndReport> completeReports = 
new LinkedHashMap<>();
+
+  /**
+   * Map of SQL query ID -> Dart query ID. Used by {@link 
#getQueryInfoAndReportBySqlQueryId(String)}. Contains an
+   * entry for every query in either {@link #controllerMap} or {@link 
#completeReports}.
+   *
+   * It is possible for the same SQL query ID to map to multiple Dart query 
IDs, because SQL query IDs can be set
+   * by the user, and uniqueness is not required. If this occurs, we go 
with the first one encountered
+   * and ignore the others.
+   */
+  private final ConcurrentHashMap<String, String> sqlQueryIdToDartQueryId = 
new ConcurrentHashMap<>();
+
+  /**
+   * Executor for cleaning up reports older than {@link 
DartControllerConfig#getMaxRetainedReportDuration()}.
+   */
+  private ScheduledExecutorService cleanupExec;
+
+  @Inject
+  public DartControllerRegistry(final DartControllerConfig config)
+  {
+    this.config = config;
+  }
+
+  @LifecycleStart
+  public void start()
+  {
+    // Schedule periodic cleanup of expired reports.
+    if (!config.getMaxRetainedReportDuration().equals(Period.ZERO)) {
+      final String threadNameFormat = 
StringUtils.format("%s-ReportCleanupExec-%%s", getClass().getSimpleName());
+      final long cleanupPeriodMs = Math.max(

Review Comment:
   I was thinking that in a production scenario, an admin probably wants to 
think about things in terms of time-based expiration (i.e. I'm going to have 
reports available for 5 minutes). I was thinking the count-based retention is 
more just to protect against the broker going OOM if there is a burst of 
queries. It would be better for the limit to be footprint-based, but the code 
for that is more complex :)



##########
multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerRegistry.java:
##########
@@ -19,54 +19,237 @@
 
 package org.apache.druid.msq.dart.controller;
 
+import com.google.errorprone.annotations.concurrent.GuardedBy;
+import com.google.inject.Inject;
 import org.apache.druid.error.DruidException;
+import org.apache.druid.guice.ManageLifecycle;
+import org.apache.druid.indexer.report.TaskReport;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
+import org.apache.druid.msq.dart.controller.http.DartQueryInfo;
+import org.apache.druid.msq.dart.guice.DartControllerConfig;
 import org.apache.druid.msq.exec.Controller;
+import org.joda.time.Period;
 
 import javax.annotation.Nullable;
 import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 /**
- * Registry for actively-running {@link Controller}.
+ * Registry for actively-running {@link Controller} and recently-completed 
{@link TaskReport}.
  */
+@ManageLifecycle
 public class DartControllerRegistry
 {
+  /**
+   * Minimum interval between checks of whether {@link #cleanupExpiredReports()} needs 
to be run.
+   */
+  private static final long MIN_CLEANUP_CHECK_MILLIS = 10_000;
+
+  private final DartControllerConfig config;
+
+  /**
+   * Map of Dart query ID -> controller for currently-running queries.
+   */
   private final ConcurrentHashMap<String, ControllerHolder> controllerMap = 
new ConcurrentHashMap<>();
 
+  /**
+   * Map of Dart query ID -> timestamped report for completed queries.
+   */
+  @GuardedBy("completeReports")
+  private final LinkedHashMap<String, QueryInfoAndReport> completeReports = 
new LinkedHashMap<>();
+
+  /**
+   * Map of SQL query ID -> Dart query ID. Used by {@link 
#getQueryInfoAndReportBySqlQueryId(String)}. Contains an
+   * entry for every query in either {@link #controllerMap} or {@link 
#completeReports}.
+   *
+   * It is possible for the same SQL query ID to map to multiple Dart query 
IDs, because SQL query IDs can be set
+   * by the user, and uniqueness is not required. If this occurs, we go 
with the first one encountered
+   * and ignore the others.
+   */
+  private final ConcurrentHashMap<String, String> sqlQueryIdToDartQueryId = 
new ConcurrentHashMap<>();
+
+  /**
+   * Executor for cleaning up reports older than {@link 
DartControllerConfig#getMaxRetainedReportDuration()}.
+   */
+  private ScheduledExecutorService cleanupExec;
+
+  @Inject
+  public DartControllerRegistry(final DartControllerConfig config)
+  {
+    this.config = config;
+  }
+
+  @LifecycleStart
+  public void start()
+  {
+    // Schedule periodic cleanup of expired reports.
+    if (!config.getMaxRetainedReportDuration().equals(Period.ZERO)) {
+      final String threadNameFormat = 
StringUtils.format("%s-ReportCleanupExec-%%s", getClass().getSimpleName());
+      final long cleanupPeriodMs = Math.max(

Review Comment:
   I was thinking that in a production scenario, an admin probably wants to 
think about things in terms of time-based expiration (i.e. I'm going to have 
reports available for 5 minutes). I was thinking the count-based retention is 
more just to protect against the broker going OOM if there is a burst of 
queries. It would be better for the limit to be footprint-based, but the code 
for that is more complex :|



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to