architjainjain commented on code in PR #6501:
URL: https://github.com/apache/hive/pull/6501#discussion_r3457507399


##########
ql/src/java/org/apache/hadoop/hive/ql/exec/tez/YarnQueueMetricsCollector.java:
##########
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.tez;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.yarn.api.records.QueueInfo;
+import org.apache.hadoop.yarn.api.records.QueueStatistics;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Random;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Collects YARN queue resource metrics in the background using a scheduled 
executor.
+ * Provides thread-safe access to the latest metrics snapshot.
+ */
+public class YarnQueueMetricsCollector {
+  private static final Logger LOG = 
LoggerFactory.getLogger(YarnQueueMetricsCollector.class);
+  private static final Random RANDOM = new Random();
+
+  private final YarnClient yarnClient;
+  private final String queueName;
+  private final ScheduledExecutorService executorService;
+  private final AtomicReference<QueueMetricsSnapshot> snapshotRef;
+  private final AtomicBoolean isShutdown;
+
+  // Circuit breaker for handling repeated failures
+  private int consecutiveFailures = 0;
+  private static final int MAX_CONSECUTIVE_FAILURES = 5;
+  private static final int BACKOFF_THRESHOLD = 3;
+
+  /**
+   * Creates a new metrics collector that immediately begins collecting queue 
metrics.
+   *
+   * @param yarnClient The YarnClient to use for querying queue info
+   * @param queueName The queue name to monitor
+   * @param refreshIntervalMs How often to refresh metrics in milliseconds
+   * @param queryId The query ID for thread naming
+   * @throws IllegalArgumentException if yarnClient or queueName is null
+   */
+  public YarnQueueMetricsCollector(YarnClient yarnClient, String queueName,
+                                    long refreshIntervalMs, String queryId) {
+    if (yarnClient == null) {
+      throw new IllegalArgumentException("YarnClient cannot be null");
+    }
+    if (queueName == null) {
+      throw new IllegalArgumentException("Queue name cannot be null");
+    }
+
+    this.yarnClient = yarnClient;
+    this.queueName = queueName;
+    this.snapshotRef = new AtomicReference<>(null);
+    this.isShutdown = new AtomicBoolean(false);
+
+    // Create named daemon thread for metrics collection
+    this.executorService = Executors.newSingleThreadScheduledExecutor(
+        new ThreadFactoryBuilder()
+            .setNameFormat("yarn-queue-metrics-collector-" + queryId)
+            .setDaemon(true)
+            .build()
+    );
+
+    try {
+      // Perform eager initial collection
+      collectMetrics();
+
+      // Add random jitter (0–20 % of refresh interval) to prevent thundering 
herd:
+      // when many queries start simultaneously they would otherwise all hit 
YARN RM
+      // at the same fixed intervals, causing load spikes.
+      // RANDOM.nextLong(1, 100) returns [1, 99], divide by 100 to get 
percentage, then multiply by 0.2 for 20% max.
+      long jitter = (long) (refreshIntervalMs * RANDOM.nextLong(1, 100) / 
100.0 * 0.2);
+      long initialDelay = refreshIntervalMs + jitter;
+
+      // Schedule periodic collection with jittered initial delay
+      executorService.scheduleWithFixedDelay(
+              () -> {
+                try {
+                  collectMetrics();
+                } catch (Exception e) {
+                  LOG.error("Unexpected error in scheduled metrics collection 
for queue {}: {}",
+                          queueName, e.getMessage(), e);
+                }
+              },
+              initialDelay,
+              refreshIntervalMs,
+              TimeUnit.MILLISECONDS
+      );
+
+      LOG.info("Started YARN queue metrics collector for queue: {}, refresh 
interval: {}ms, initial delay: {}ms",
+          queueName, refreshIntervalMs, initialDelay);
+    } catch (IllegalArgumentException e) {
+      // scheduleWithFixedDelay rejects a zero or negative period; clean up and
+      // rethrow with context so the caller can log and skip gracefully.
+      executorService.shutdownNow();
+      throw new IllegalArgumentException(
+          "Invalid refresh interval " + refreshIntervalMs + "ms for queue: " + 
queueName, e);
+    } catch (RuntimeException e) {
+      // Any other runtime failure during initialisation — clean up to prevent 
thread leak.
+      LOG.error("Failed to initialize metrics collector for queue {}, shutting 
down executor",
+          queueName, e);
+      executorService.shutdownNow();
+      throw new IllegalStateException(
+          "Failed to initialize YARN queue metrics collector for queue: " + 
queueName, e);
+    }
+  }
+
+  /**
+   * Checks if an exception is or was caused by an InterruptedException.
+   *
+   * @param e The exception to check
+   * @return true if the exception is an InterruptedException or has one as 
its cause
+   */
+  private boolean isInterruptedException(Exception e) {
+    return e instanceof InterruptedException || e.getCause() instanceof 
InterruptedException;
+  }
+
+  /**
+   * Collects queue metrics and updates the snapshot.
+   * Handles all exceptions gracefully by setting snapshot to null.
+   * Implements circuit breaker pattern to back off on repeated failures.
+   */
+  private void collectMetrics() {
+    // Circuit breaker: Skip collection if too many consecutive failures
+    // This prevents hammering a struggling YARN ResourceManager
+    if (consecutiveFailures >= MAX_CONSECUTIVE_FAILURES) {
+      if (consecutiveFailures == MAX_CONSECUTIVE_FAILURES) {
+        LOG.warn("Queue metrics collection has failed {} times consecutively 
for queue {}. " +
+                "Temporarily reducing collection attempts to avoid overloading 
YARN RM. " +
+                "Will retry periodically.", MAX_CONSECUTIVE_FAILURES, 
queueName);
+        consecutiveFailures++; // Increment to avoid repeated logging
+      }
+      // Still attempt collection occasionally, but skip most attempts
+      if (RANDOM.nextDouble() > 0.1) {  // Only try 10% of the time
+        return;
+      }
+    }
+
+    try {
+      QueueInfo queueInfo = yarnClient.getQueueInfo(queueName);
+      if (queueInfo != null) {
+        QueueMetricsSnapshot snapshot = new QueueMetricsSnapshot(queueInfo);
+        snapshotRef.set(snapshot);
+
+        // Success - reset circuit breaker
+        if (consecutiveFailures > 0) {
+          LOG.info("Queue metrics collection recovered for queue {} after {} 
failures",
+                   queueName, consecutiveFailures);
+          consecutiveFailures = 0;
+        }
+
+        LOG.debug("Collected queue metrics for {}: memory={}/{} GB, 
vCores={}/{}",
+            queueName, snapshot.memoryUsedGB, snapshot.memoryTotalGB,
+            snapshot.vCoresUsed, snapshot.vCoresTotal);
+      } else {
+        LOG.warn("QueueInfo is null for queue: {}", queueName);
+        consecutiveFailures++;
+        snapshotRef.set(null);

Review Comment:
   Done: a null QueueInfo no longer increments `consecutiveFailures`. The hunk quoted above shows the revision before this change.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to