architjainjain commented on code in PR #6501:
URL: https://github.com/apache/hive/pull/6501#discussion_r3457173405


##########
ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java:
##########
@@ -795,6 +808,20 @@ public String getSessionId() {
 
   protected final void setTezClient(TezClient session) {
     this.session = session;
+
+    // Initialize YarnClient for queue metrics collection
+    if (session != null && yarnClient == null) {
+      try {
+        yarnClient = YarnClient.createYarnClient();
+        yarnClient.init(conf);
+        yarnClient.start();
+        LOG.info("YarnClient initialized for session: {}", sessionId);
+      } catch (Exception e) {
+        LOG.warn("Failed to initialize YarnClient for metrics collection: {}", e.getMessage());
+        LOG.debug("Full exception for YarnClient initialization failure", e);

Review Comment:
   done



##########
ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java:
##########
@@ -144,6 +150,53 @@ private RenderStrategy.UpdateFunction updateFunction() {
         : new RenderStrategy.LogToFileFunction(this, perfLogger);
   }
 
+  private YarnQueueMetricsCollector initializeMetricsCollector() {
+    // Get refresh interval - controls whether the feature is enabled.
+    // interval <= 0 means disabled (default is 0s = disabled).
+    long refreshInterval = HiveConf.getTimeVar(hiveConf,
+        ConfVars.HIVE_TEZ_QUEUE_METRICS_REFRESH_INTERVAL, TimeUnit.MILLISECONDS);
+
+    if (refreshInterval <= 0) {
+      LOG.debug("Queue metrics collection disabled (refresh interval: {}ms)", refreshInterval);
+      return null;
+    }
+
+    try {
+      // Get YarnClient from session
+      YarnClient yarnClient = session.getYarnClient();
+      if (yarnClient == null) {
+        LOG.warn("YarnClient not available, skipping queue metrics collection");
+        return null;
+      }
+
+      // Get queue name, default to "default" if not specified
+      String queueName = session.getQueueName();
+      if (queueName == null || queueName.trim().isEmpty()) {
+        queueName = "default";
+        LOG.info("Queue name not specified, using default queue");
+      }
+
+      // Validate minimum refresh interval (at least 1 second)
+      if (refreshInterval < 1000) {
+        LOG.warn("Queue metrics refresh interval {}ms is less than minimum 1000ms, using 1000ms",
+            refreshInterval);
+        refreshInterval = 1000;
+      }
+
+      // Get query ID from DAG name
+      String queryId = dag.getName();
+
+      LOG.info("Initializing YARN queue metrics collector for queue: {}, refresh interval: {}ms",
+          queueName, refreshInterval);
+
+      return new YarnQueueMetricsCollector(yarnClient, queueName, refreshInterval, queryId);
+    } catch (Exception e) {
+      LOG.warn("Unable to initialize YARN queue metrics collector: {}", e.getMessage());
+      LOG.debug("Full exception for queue metrics initialization failure", e);

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to