Jackie-Jiang commented on code in PR #16409:
URL: https://github.com/apache/pinot/pull/16409#discussion_r2430572598


##########
pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java:
##########
@@ -210,6 +210,16 @@ public enum ServerMeter implements AbstractMetrics.Meter {
   // reingestion metrics
   SEGMENT_REINGESTION_FAILURE("segments", false),
 
+  // ThrottleOnCriticalHeapUsageExecutor meters
+  THROTTLE_EXECUTOR_QUEUED_TASKS_TOTAL("count", true,

Review Comment:
   (minor) Suggest removing the `_TOTAL` suffix and updating the description, given that 
it is a meter; the same applies to the other meters.
   ```suggestion
     THROTTLE_EXECUTOR_QUEUED_TASKS("count", true, "Number of tasks that have 
been queued in the throttle executor")
   ```



##########
pinot-core/src/main/java/org/apache/pinot/core/executor/ThrottleOnCriticalHeapUsageExecutor.java:
##########
@@ -0,0 +1,493 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.executor;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.pinot.common.metrics.ServerGauge;
+import org.apache.pinot.common.metrics.ServerMeter;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.spi.accounting.ThreadAccountant;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.exception.QueryErrorCode;
+import org.apache.pinot.spi.executor.DecoratorExecutorService;
+import org.apache.pinot.spi.query.QueryThreadContext;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * An Executor that queues tasks when the heap usage is critical instead of 
rejecting them.
+ * Heap Usage level is obtained from {@link 
ThreadAccountant#throttleQuerySubmission()}.
+ *
+ * Features:
+ * - Tasks are queued when heap usage is critical
+ * - Queued tasks are processed when heap usage drops below critical level
+ * - Configurable queue size and timeout (global default or per-task)
+ * - Background monitoring of heap usage to process queued tasks
+ */
+public class ThrottleOnCriticalHeapUsageExecutor extends 
DecoratorExecutorService {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ThrottleOnCriticalHeapUsageExecutor.class);
+
+  // Default configuration values
+  // Defaults are kept near config in CommonConstants; no local defaults 
needed here
+  private static final long LOG_THROTTLE_INTERVAL_MS = 30000; // Log queue 
status every 30 seconds
+
+  private final BlockingQueue<QueuedTask<?>> _taskQueue;
+  private final int _maxQueueSize;
+  private final long _defaultQueueTimeoutMs;
+  private final ScheduledExecutorService _monitorExecutor;
+  private final AtomicBoolean _isShutdown = new AtomicBoolean(false);
+  private long _lastQueueStatusLogTime = 0;
+
+  public ThrottleOnCriticalHeapUsageExecutor(ExecutorService executorService,
+      int maxQueueSize, long defaultQueueTimeoutMs, long monitorIntervalMs) {
+    super(executorService);
+    _maxQueueSize = maxQueueSize;
+    _defaultQueueTimeoutMs = defaultQueueTimeoutMs;
+    // If maxQueueSize <= 0, use an unbounded queue; otherwise, use bounded 
queue
+    _taskQueue = (maxQueueSize <= 0) ? new LinkedBlockingQueue<>() : new 
LinkedBlockingQueue<>(maxQueueSize);
+
+    // Create a single-threaded scheduler for monitoring heap usage
+    _monitorExecutor = Executors.newSingleThreadScheduledExecutor(r -> {
+      Thread t = new Thread(r, "throttle-heap-monitor");
+      t.setDaemon(true);
+      return t;
+    });
+
+    // Start the monitoring task
+    _monitorExecutor.scheduleWithFixedDelay(this::processQueuedTasks,
+        monitorIntervalMs, monitorIntervalMs, TimeUnit.MILLISECONDS);
+
+    // Register metrics and gauge for queue activity tracking
+    ServerMetrics metrics = ServerMetrics.get();
+    metrics.setOrUpdateGlobalGauge(ServerGauge.THROTTLE_EXECUTOR_QUEUE_SIZE, 
() -> (long) _taskQueue.size());
+
+    LOGGER.info(
+        "ThrottleOnCriticalHeapUsageExecutor initialized with queue size: {}, 
default timeout: {}ms, "
+            + "monitor interval: {}ms (maxQueueSize<=0 => unbounded)", 
maxQueueSize, defaultQueueTimeoutMs,
+        monitorIntervalMs);
+  }
+
+  /**
+   * Check if a task should be queued due to critical heap usage
+   * @return true if the task should be queued, false if it can be executed 
immediately
+   */
+  protected boolean shouldQueueTask() {
+    return QueryThreadContext.get().getAccountant().throttleQuerySubmission();
+  }
+
+  /**
+   * Process queued tasks when heap usage is below critical level
+   */
+  private void processQueuedTasks() {
+    if (_isShutdown.get()) {
+      return;
+    }
+
+    try {
+      int initialQueueSize = _taskQueue.size();
+      long currentTime = System.currentTimeMillis();
+
+      // Log queue size for monitoring if there are queued tasks (throttled to 
prevent log flooding)
+      if (initialQueueSize > 0 && (currentTime - _lastQueueStatusLogTime) > 
LOG_THROTTLE_INTERVAL_MS) {
+        LOGGER.info("Processing queued tasks. Current queue size: {}", 
initialQueueSize);
+        _lastQueueStatusLogTime = currentTime;
+        // Metrics are exported via ServerMetrics global gauges registered by 
callers
+      }
+
+      // Process tasks while heap usage is not critical and queue is not empty
+      while (!shouldQueueTask() && !_taskQueue.isEmpty()) {
+        QueuedTask<?> queuedTask = _taskQueue.poll();
+        if (queuedTask != null) {
+          long queueTime = System.currentTimeMillis() - 
queuedTask.getQueueTime();
+
+          if (queueTime > queuedTask.getTimeoutMs()) {
+            // Task has timed out in queue
+            queuedTask.timeout();
+            
ServerMetrics.get().addMeteredGlobalValue(ServerMeter.THROTTLE_EXECUTOR_TIMED_OUT_TASKS_TOTAL,
 1);
+            LOGGER.warn("Task timed out after {}ms in queue (timeout: {}ms)", 
queueTime, queuedTask.getTimeoutMs());
+          } else {
+            // Submit the task for execution on the underlying executor (avoid 
double decoration)
+            super.execute(new FromQueueRunnable(() -> {

Review Comment:
   I think an easier way would be to call `_executorService.execute()` directly 
(change its access to protected in `DecoratorExecutorService`). Adding another check 
in `decorate()` is not intuitive.



##########
pinot-core/src/main/java/org/apache/pinot/core/executor/ThrottleOnCriticalHeapUsageExecutor.java:
##########
@@ -0,0 +1,493 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.executor;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.pinot.common.metrics.ServerGauge;
+import org.apache.pinot.common.metrics.ServerMeter;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.spi.accounting.ThreadAccountant;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.exception.QueryErrorCode;
+import org.apache.pinot.spi.executor.DecoratorExecutorService;
+import org.apache.pinot.spi.query.QueryThreadContext;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * An Executor that queues tasks when the heap usage is critical instead of 
rejecting them.
+ * Heap Usage level is obtained from {@link 
ThreadAccountant#throttleQuerySubmission()}.
+ *
+ * Features:
+ * - Tasks are queued when heap usage is critical
+ * - Queued tasks are processed when heap usage drops below critical level
+ * - Configurable queue size and timeout (global default or per-task)
+ * - Background monitoring of heap usage to process queued tasks
+ */
+public class ThrottleOnCriticalHeapUsageExecutor extends 
DecoratorExecutorService {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ThrottleOnCriticalHeapUsageExecutor.class);
+
+  // Default configuration values
+  // Defaults are kept near config in CommonConstants; no local defaults 
needed here
+  private static final long LOG_THROTTLE_INTERVAL_MS = 30000; // Log queue 
status every 30 seconds
+
+  private final BlockingQueue<QueuedTask<?>> _taskQueue;
+  private final int _maxQueueSize;
+  private final long _defaultQueueTimeoutMs;
+  private final ScheduledExecutorService _monitorExecutor;
+  private final AtomicBoolean _isShutdown = new AtomicBoolean(false);
+  private long _lastQueueStatusLogTime = 0;
+
+  public ThrottleOnCriticalHeapUsageExecutor(ExecutorService executorService,
+      int maxQueueSize, long defaultQueueTimeoutMs, long monitorIntervalMs) {
+    super(executorService);
+    _maxQueueSize = maxQueueSize;
+    _defaultQueueTimeoutMs = defaultQueueTimeoutMs;
+    // If maxQueueSize <= 0, use an unbounded queue; otherwise, use bounded 
queue
+    _taskQueue = (maxQueueSize <= 0) ? new LinkedBlockingQueue<>() : new 
LinkedBlockingQueue<>(maxQueueSize);
+
+    // Create a single-threaded scheduler for monitoring heap usage
+    _monitorExecutor = Executors.newSingleThreadScheduledExecutor(r -> {
+      Thread t = new Thread(r, "throttle-heap-monitor");
+      t.setDaemon(true);
+      return t;
+    });
+
+    // Start the monitoring task
+    _monitorExecutor.scheduleWithFixedDelay(this::processQueuedTasks,
+        monitorIntervalMs, monitorIntervalMs, TimeUnit.MILLISECONDS);
+
+    // Register metrics and gauge for queue activity tracking
+    ServerMetrics metrics = ServerMetrics.get();

Review Comment:
   (minor) Cache the `ServerMetrics.get()` result as a final member variable.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to