This is an automated email from the ASF dual-hosted git repository.

ayushtkn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new bb9cbf1da52 HIVE-29679: Update Tez AM K8s Operator Auto-Scaling to scale down idle AMs (#6561)
bb9cbf1da52 is described below

commit bb9cbf1da52ec11cd15250bd2db1041581080822
Author: Tanishq Chugh <[email protected]>
AuthorDate: Fri Jul 3 17:33:26 2026 +0530

    HIVE-29679: Update Tez AM K8s Operator Auto-Scaling to scale down idle AMs (#6561)
---
 .../llap/tezplugins/LlapTaskSchedulerService.java  |   9 ++
 .../tezplugins/metrics/LlapTaskSchedulerInfo.java  |   3 +-
 .../metrics/LlapTaskSchedulerMetrics.java          |   8 ++
 packaging/src/kubernetes/README.md                 |  16 +--
 .../src/kubernetes/helm/hive-operator/Chart.yaml   |   2 +-
 .../crds/hiveclusters.hive.apache.org-v1.yml       |   4 +
 .../helm/hive-operator/templates/clusterrole.yaml  |   4 +
 .../helm/hive-operator/templates/hivecluster.yaml  |   4 +-
 .../src/kubernetes/helm/hive-operator/values.yaml  |   5 +-
 packaging/src/kubernetes/pom.xml                   |   9 +-
 .../autoscaling/HiveClusterAutoscaler.java         |  80 +++++++++--
 .../operator/autoscaling/MetricsScraper.java       |   2 +-
 .../operator/autoscaling/PrometheusTextParser.java |   4 +-
 .../operator/autoscaling/TezAmScalingStrategy.java |  37 +++++
 .../operator/autoscaling/TezAmZkDeregister.java    | 160 +++++++++++++++++++++
 .../operator/dependent/HiveDependentResource.java  |   8 +-
 .../dependent/HiveServer2DeploymentDependent.java  |   1 +
 .../operator/dependent/LlapResourceBuilder.java    | 148 +++++++++++++++++--
 .../dependent/MetastoreDeploymentDependent.java    |   1 +
 .../operator/dependent/SchemaInitJobDependent.java |   1 +
 .../kubernetes/operator/model/HiveClusterSpec.java |   3 +
 .../operator/reconciler/HiveClusterReconciler.java |  67 ++++++---
 .../hive/kubernetes/operator/util/ConfigUtils.java |  58 ++++++++
 .../operator/util/HiveConfigBuilder.java           |  10 ++
 24 files changed, 579 insertions(+), 65 deletions(-)

diff --git 
a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
 
b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
index 9ac108728fb..c75800c5546 100644
--- 
a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
+++ 
b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
@@ -1080,6 +1080,9 @@ public void dagComplete() {
     writeLock.lock();
     try {
       dagRunning = false;
+      if (metrics != null) {
+        metrics.setDagRunning(false);
+      }
       dagStats = new StatsPerDag();
       int pendingCount = 0;
       for (Entry<Priority, List<TaskInfo>> entry : pendingTasks.entrySet()) {
@@ -1173,6 +1176,9 @@ public void allocateTask(Object task, Resource 
capability, String[] hosts, Strin
         metrics.setDagId(id.getDAGID().toString());
       }
       dagRunning = true;
+      if (metrics != null) {
+        metrics.setDagRunning(true);
+      }
     }
     dagStats.registerTaskRequest(hosts, racks);
     addPendingTask(taskInfo);
@@ -1194,6 +1200,9 @@ public void allocateTask(Object task, Resource 
capability, ContainerId container
         metrics.setDagId(id.getDAGID().toString());
       }
       dagRunning = true;
+      if (metrics != null) {
+        metrics.setDagRunning(true);
+      }
     }
     dagStats.registerTaskRequest(null, null);
     addPendingTask(taskInfo);
diff --git 
a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapTaskSchedulerInfo.java
 
b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapTaskSchedulerInfo.java
index 0750dc6aa7b..10878ea3149 100644
--- 
a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapTaskSchedulerInfo.java
+++ 
b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapTaskSchedulerInfo.java
@@ -38,7 +38,8 @@ public enum LlapTaskSchedulerInfo implements MetricsInfo {
   SchedulerRunningTaskCount("Total number of running tasks"),
   SchedulerPendingPreemptionTaskCount("Total number of tasks pending for 
pre-emption"),
   SchedulerPreemptedTaskCount("Total number of tasks pre-empted"),
-  SchedulerCompletedDagCount("Number of DAGs completed");
+  SchedulerCompletedDagCount("Number of DAGs completed"),
+  SchedulerDagStatus("Current AM operational DAG status: 0 for IDLE, 1 for 
RUNNING");
 
   private final String desc;
 
diff --git 
a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapTaskSchedulerMetrics.java
 
b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapTaskSchedulerMetrics.java
index 0dd645da82c..5c94d0b0270 100644
--- 
a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapTaskSchedulerMetrics.java
+++ 
b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapTaskSchedulerMetrics.java
@@ -25,6 +25,7 @@
 import static 
org.apache.hadoop.hive.llap.tezplugins.metrics.LlapTaskSchedulerInfo.SchedulerPendingPreemptionTaskCount;
 import static 
org.apache.hadoop.hive.llap.tezplugins.metrics.LlapTaskSchedulerInfo.SchedulerPendingTaskCount;
 import static 
org.apache.hadoop.hive.llap.tezplugins.metrics.LlapTaskSchedulerInfo.SchedulerPreemptedTaskCount;
+import static 
org.apache.hadoop.hive.llap.tezplugins.metrics.LlapTaskSchedulerInfo.SchedulerDagStatus;
 import static 
org.apache.hadoop.hive.llap.tezplugins.metrics.LlapTaskSchedulerInfo.SchedulerRunningTaskCount;
 import static 
org.apache.hadoop.hive.llap.tezplugins.metrics.LlapTaskSchedulerInfo.SchedulerSchedulableTaskCount;
 import static 
org.apache.hadoop.hive.llap.tezplugins.metrics.LlapTaskSchedulerInfo.SchedulerSuccessfulTaskCount;
@@ -84,6 +85,8 @@ public class LlapTaskSchedulerMetrics implements 
MetricsSource {
   @Metric
   MutableCounterInt completedDagcount;
   @Metric
+  MutableGaugeInt dagRunning;
+  @Metric
   MutableCounterInt pendingPreemptionTasksCount;
   @Metric
   MutableGaugeInt wmUnusedGuaranteedCount;
@@ -276,6 +279,7 @@ private void getTaskSchedulerStats(MetricsRecordBuilder rb) 
{
         .addGauge(SchedulerMemoryPerInstance, memoryPerInstance.value())
         .addGauge(SchedulerCpuCoresPerInstance, cpuCoresPerInstance.value())
         .addGauge(SchedulerDisabledNodeCount, disabledNodeCount.value())
+        .addGauge(SchedulerDagStatus, dagRunning.value())
         .addCounter(SchedulerPendingTaskCount, pendingTasksCount.value())
         .addCounter(SchedulerSchedulableTaskCount, 
schedulableTasksCount.value())
         .addCounter(SchedulerRunningTaskCount, runningTasksCount.value())
@@ -285,6 +289,10 @@ private void getTaskSchedulerStats(MetricsRecordBuilder 
rb) {
         .addCounter(SchedulerCompletedDagCount, completedDagcount.value());
   }
 
+  public void setDagRunning(boolean running) {
+    dagRunning.set(running ? 1 : 0);
+  }
+
   public JvmMetrics getJvmMetrics() {
     return jvmMetrics;
   }
diff --git a/packaging/src/kubernetes/README.md 
b/packaging/src/kubernetes/README.md
index 7a9aac5aa12..2202885d459 100644
--- a/packaging/src/kubernetes/README.md
+++ b/packaging/src/kubernetes/README.md
@@ -520,7 +520,7 @@ HS2 routes sessions to clusters server-side based on 
admin-defined user/group ru
 
 Each LLAP cluster is fully isolated:
 - **Separate LLAP daemon StatefulSet** with independent executor count, 
memory, and replicas
-- **Separate TezAM StatefulSet** (one per LLAP cluster) with its own ZooKeeper 
registration
+- **Separate TezAM Deployment** (one per LLAP cluster) with its own ZooKeeper 
registration
 - **Separate autoscaling** — each cluster scales independently based on its 
own metrics
 - **Shared scratch PVC** (ReadWriteMany) for HS2 ↔ TezAM coordination files
 
@@ -618,11 +618,11 @@ For the above configuration, the operator creates:
 | Resource | Name | Purpose |
 |----------|------|---------|
 | StatefulSet | `hive-production` | LLAP daemons for production cluster |
-| StatefulSet | `hive-tezam-production` | TezAM for production cluster |
+| Deployment | `hive-tezam-production` | TezAM for production cluster |
 | StatefulSet | `hive-analytics` | LLAP daemons for analytics cluster |
-| StatefulSet | `hive-tezam-analytics` | TezAM for analytics cluster |
+| Deployment | `hive-tezam-analytics` | TezAM for analytics cluster |
 | StatefulSet | `hive-dev` | LLAP daemons for dev cluster |
-| StatefulSet | `hive-tezam-dev` | TezAM for dev cluster |
+| Deployment | `hive-tezam-dev` | TezAM for dev cluster |
 | Service (headless) | `hive-production`, `hive-analytics`, `hive-dev` | LLAP 
daemon discovery |
 | Service (headless) | `hive-tezam-production`, `hive-tezam-analytics`, 
`hive-tezam-dev` | TezAM discovery |
 | ConfigMap | `hive-production-config`, etc. | `llap-daemon-site.xml` per 
cluster |
@@ -1391,7 +1391,7 @@ setup is needed — simply connect to HS2 and the operator 
wakes LLAP/TezAM as n
 
 LLAP is configured as an array (`llapClusters`) to support multi-tenant 
deployments with
 independent scaling. Each entry creates a separate LLAP StatefulSet, Service, 
ConfigMap,
-and a paired TezAM StatefulSet (when `tezAm.enabled: true`).
+and a paired TezAM Deployment (when `tezAm.enabled: true`).
 
 | Value | Default | Description |
 |-------|---------|-------------|
@@ -1419,7 +1419,7 @@ Clients connect with just their identity — no 
cluster-specific JDBC URL params
 
 ### Tez AM
 
-TezAM is deployed as one StatefulSet per LLAP cluster. The global `tezAm` 
section
+TezAM is deployed as one Deployment per LLAP cluster. The global `tezAm` 
section
 controls shared settings (enabled flag, scratch PVC). Per-LLAP TezAM settings
 (replicas, autoscaling) can be overridden in each `llapClusters[].tezAm` entry.
 
@@ -1587,14 +1587,14 @@ HiveClusterReconciler
   |
   +-- [Imperative] Per-LLAP-Cluster Resources (for each llapClusters[] entry):
         +-- LLAP StatefulSet + headless Service + ConfigMap + PDB
-        +-- TezAM StatefulSet + headless Service + ConfigMap (one TezAM per 
LLAP cluster)
+        +-- TezAM Deployment + headless Service + ConfigMap (one TezAM per 
LLAP cluster)
 ```
 
 LLAP clusters and their paired TezAM instances are managed imperatively by the 
reconciler
 (not via JOSDK workflow dependents) because the number of clusters is dynamic 
— determined
 at runtime from the CR spec. Each `llapClusters[]` entry produces:
 - **LLAP**: StatefulSet (`{cluster}-{name}`), headless Service, ConfigMap 
(`llap-daemon-site.xml`), PDB
-- **TezAM**: StatefulSet (`{cluster}-tezam-{name}`), headless Service, 
ConfigMap (`tez-site.xml`)
+- **TezAM**: Deployment (`{cluster}-tezam-{name}`), headless Service, 
ConfigMap (`tez-site.xml`)
 
 All imperative resources are applied via `serverSideApply()`. Removed LLAP 
clusters (and
 their TezAMs) are garbage-collected automatically using label-based discovery.
diff --git a/packaging/src/kubernetes/helm/hive-operator/Chart.yaml 
b/packaging/src/kubernetes/helm/hive-operator/Chart.yaml
index b1e8104b155..8aea42bdca5 100644
--- a/packaging/src/kubernetes/helm/hive-operator/Chart.yaml
+++ b/packaging/src/kubernetes/helm/hive-operator/Chart.yaml
@@ -19,7 +19,7 @@ description: Apache Hive Kubernetes Operator - deploys and 
manages Hive clusters
 type: application
 version: "4.3.0-SNAPSHOT"
 appVersion: "4.3.0-SNAPSHOT"
-kubeVersion: ">=1.25.0"
+kubeVersion: ">=1.25.0-0"
 keywords:
   - hive
   - hadoop
diff --git 
a/packaging/src/kubernetes/helm/hive-operator/crds/hiveclusters.hive.apache.org-v1.yml
 
b/packaging/src/kubernetes/helm/hive-operator/crds/hiveclusters.hive.apache.org-v1.yml
index 81947c1c491..9947f44b75b 100644
--- 
a/packaging/src/kubernetes/helm/hive-operator/crds/hiveclusters.hive.apache.org-v1.yml
+++ 
b/packaging/src/kubernetes/helm/hive-operator/crds/hiveclusters.hive.apache.org-v1.yml
@@ -692,6 +692,10 @@ spec:
                     type: string
                 type: object
                 x-kubernetes-preserve-unknown-fields: true
+              serviceAccountName:
+                description: "Kubernetes ServiceAccount name for all component 
pods.\
+                  \ If not specified, pods use the namespace default service 
account."
+                type: string
               suspend:
                 description: "When true, the cluster is immediately suspended 
(all\
                   \ components scaled to 0). Set to false to wake a suspended 
cluster."
diff --git 
a/packaging/src/kubernetes/helm/hive-operator/templates/clusterrole.yaml 
b/packaging/src/kubernetes/helm/hive-operator/templates/clusterrole.yaml
index 3b0eb0e8e40..3316ce11054 100644
--- a/packaging/src/kubernetes/helm/hive-operator/templates/clusterrole.yaml
+++ b/packaging/src/kubernetes/helm/hive-operator/templates/clusterrole.yaml
@@ -58,3 +58,7 @@ rules:
   - apiGroups: ["policy"]
     resources: ["poddisruptionbudgets"]
     verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
+  # EndpointSlices: operator manages a custom per-pod-hostname slice for TezAM 
DNS
+  - apiGroups: ["discovery.k8s.io"]
+    resources: ["endpointslices"]
+    verbs: ["get", "list", "create", "patch", "delete"]
diff --git 
a/packaging/src/kubernetes/helm/hive-operator/templates/hivecluster.yaml 
b/packaging/src/kubernetes/helm/hive-operator/templates/hivecluster.yaml
index 67ec6c168fb..278ae40e8df 100644
--- a/packaging/src/kubernetes/helm/hive-operator/templates/hivecluster.yaml
+++ b/packaging/src/kubernetes/helm/hive-operator/templates/hivecluster.yaml
@@ -26,6 +26,9 @@ metadata:
 spec:
   image: {{ .Values.cluster.image }}
   imagePullPolicy: {{ .Values.cluster.imagePullPolicy }}
+  {{- if .Values.cluster.serviceAccountName }}
+  serviceAccountName: {{ .Values.cluster.serviceAccountName }}
+  {{- end }}
 
   metastore:
     enabled: {{ .Values.cluster.metastore.enabled }}
@@ -178,7 +181,6 @@ spec:
   tezAm:
     enabled: {{ .Values.cluster.tezAm.enabled }}
     {{- if .Values.cluster.tezAm.enabled }}
-    replicas: {{ .Values.cluster.tezAm.replicas }}
     scratchStorageSize: {{ .Values.cluster.tezAm.scratchStorageSize | quote }}
     {{- if .Values.cluster.tezAm.scratchStorageClassName }}
     scratchStorageClassName: {{ .Values.cluster.tezAm.scratchStorageClassName 
| quote }}
diff --git a/packaging/src/kubernetes/helm/hive-operator/values.yaml 
b/packaging/src/kubernetes/helm/hive-operator/values.yaml
index 26f7eeb5af5..c16f7240ea2 100644
--- a/packaging/src/kubernetes/helm/hive-operator/values.yaml
+++ b/packaging/src/kubernetes/helm/hive-operator/values.yaml
@@ -47,6 +47,10 @@ cluster:
   image: "apache/hive:4.3.0-SNAPSHOT"
   imagePullPolicy: IfNotPresent
 
+  # ServiceAccount name for all component pods (HS2, Metastore, LLAP, TezAM, 
schema-init).
+  # If empty, pods use the namespace default service account.
+  serviceAccountName: ""
+
   # ---------------------------------------------------------------------------
   # DATABASE (Required) — RDBMS for the Hive Metastore backend
   # ---------------------------------------------------------------------------
@@ -207,7 +211,6 @@ cluster:
   # ---------------------------------------------------------------------------
   tezAm:
     enabled: true
-    replicas: 2
     scratchStorageSize: "1Gi"
     scratchStorageClassName: ""
     resources: {}
diff --git a/packaging/src/kubernetes/pom.xml b/packaging/src/kubernetes/pom.xml
index f9e7bd046de..eea5fe87921 100644
--- a/packaging/src/kubernetes/pom.xml
+++ b/packaging/src/kubernetes/pom.xml
@@ -52,10 +52,6 @@
       <artifactId>kubernetes-httpclient-vertx</artifactId>
       <version>${fabric8.version}</version>
     </dependency>
-    <dependency>
-      <groupId>org.apache.commons</groupId>
-      <artifactId>commons-lang3</artifactId>
-    </dependency>
     <dependency>
       <groupId>io.github.java-diff-utils</groupId>
       <artifactId>java-diff-utils</artifactId>
@@ -73,6 +69,11 @@
       <version>${fabric8.version}</version>
       <scope>provided</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.curator</groupId>
+      <artifactId>curator-framework</artifactId>
+      <version>${curator.version}</version>
+    </dependency>
     <dependency>
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-api</artifactId>
diff --git 
a/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/autoscaling/HiveClusterAutoscaler.java
 
b/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/autoscaling/HiveClusterAutoscaler.java
index df46ced674c..953052318fd 100644
--- 
a/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/autoscaling/HiveClusterAutoscaler.java
+++ 
b/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/autoscaling/HiveClusterAutoscaler.java
@@ -24,6 +24,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.ToIntFunction;
 
 import io.fabric8.kubernetes.client.KubernetesClient;
 import org.apache.hive.kubernetes.operator.model.HiveCluster;
@@ -79,7 +80,7 @@ public static void setManagedReplicas(String namespace, 
String clusterName,
     MANAGED_REPLICAS.put(cacheKey(namespace, clusterName, component), 
replicas);
   }
 
-  private record PendingScaleDown(int targetReplicas, Instant annotatedAt) {}
+  private record PendingScaleDown(int targetReplicas, Instant annotatedAt, 
List<String> podsToDeregister) {}
 
   private final BackgroundMetricsScraper bgScraper;
   private final MetricsCache metricsCache;
@@ -112,6 +113,7 @@ public void cleanupCluster(String namespace, String 
clusterName) {
     autoscalers.keySet().removeIf(k -> k.startsWith(prefix));
     lastScaleTimes.keySet().removeIf(k -> k.startsWith(prefix));
     pendingScaleDowns.keySet().removeIf(k -> k.startsWith(prefix));
+    TezAmZkDeregister.cleanupCluster(namespace, clusterName);
     LOG.info("Cleaned up autoscaler state for {}/{}", namespace, clusterName);
   }
 
@@ -178,8 +180,9 @@ public AutoscalingEvaluation evaluate(HiveCluster cluster, 
KubernetesClient clie
       } else {
         // Pod deletion cost only applies to Deployments (ReplicaSet 
controller).
         // StatefulSets always scale down by highest ordinal regardless of this
-        // annotation. LLAP/TezAM graceful drain is handled by preStop hooks.
-        updateDeploymentPodDeletionCost(client, namespace, hs2Metrics, 
"hs2_open_sessions");
+        // annotation. LLAP graceful drain is handled by preStop hooks.
+        updateDeploymentPodDeletionCost(client, namespace, hs2Metrics,
+            pm -> pm.metrics().getOrDefault("hs2_open_sessions", 
0.0).intValue());
 
         Map<String, Integer> hs2Patches = new HashMap<>();
         evaluateComponent(cluster, client, namespace, clusterName,
@@ -190,7 +193,7 @@ public AutoscalingEvaluation evaluate(HiveCluster cluster, 
KubernetesClient clie
         int currentReplicas = getCurrentReplicas(client, namespace, 
clusterName, ConfigUtils.COMPONENT_HIVESERVER2);
         if (hs2Patch != null && hs2Patch < currentReplicas) {
           // Scale-down: defer to allow deletion-cost annotations to propagate
-          pendingScaleDowns.put(hs2Key, new PendingScaleDown(hs2Patch, 
Instant.now()));
+          pendingScaleDowns.put(hs2Key, new PendingScaleDown(hs2Patch, 
Instant.now(), null));
           LOG.info("[hiveserver2] Deferring scale-down to {} (waiting for 
deletion-cost propagation)",
               hs2Patch);
         } else if (hs2Patch != null) {
@@ -251,9 +254,54 @@ public AutoscalingEvaluation evaluate(HiveCluster cluster, 
KubernetesClient clie
             tezAuto.metricsPort(), tezAuto.metricsScrapeIntervalSeconds());
         String tezKey = cacheKey(namespace, clusterName, tezAmComponentKey);
         List<PodMetrics> tezMetrics = metricsCache.getOrEmpty(tezKey, 
tezAuto.metricsScrapeIntervalSeconds() * 3);
-        evaluateComponent(cluster, client, namespace, clusterName,
-            tezAmComponentKey, tezAuto,
-            perLlapTezAm.replicas(), patches, statuses, tezMetrics);
+
+        int currentTezReplicas = getCurrentReplicas(client, namespace, 
clusterName, tezAmComponentKey);
+        PendingScaleDown pending = pendingScaleDowns.get(tezKey);
+        if (pending != null) {
+          Integer appliedTarget = null;
+          if (Duration.between(pending.annotatedAt(), 
Instant.now()).toSeconds() >= 2) {
+            TezAmZkDeregister.deregisterIdlePods(namespace, clusterName,
+                cluster.getSpec().zookeeper().quorum(), llapSpec.name(), 
pending.podsToDeregister(),
+                cluster.getSpec().hiveServer2().configOverrides());
+            appliedTarget = pending.targetReplicas();
+            patches.put(tezAmComponentKey, appliedTarget);
+            MANAGED_REPLICAS.put(tezKey, appliedTarget);
+            lastScaleTimes.put(tezKey, Instant.now().toString());
+            pendingScaleDowns.remove(tezKey);
+            LOG.info("[{}] Applying deferred scale-down to {} replicas", 
tezAmComponentKey, appliedTarget);
+          }
+          evaluateComponent(cluster, client, namespace, clusterName,
+              tezAmComponentKey, tezAuto, perLlapTezAm.replicas(), new 
HashMap<>(), statuses, tezMetrics);
+          if (pendingScaleDowns.containsKey(tezKey)) {
+            MANAGED_REPLICAS.put(tezKey, currentTezReplicas);
+          } else if (appliedTarget != null) {
+            MANAGED_REPLICAS.put(tezKey, appliedTarget);
+          }
+        } else {
+          Map<String, Integer> tezCosts = 
TezAmScalingStrategy.deletionCostsByPod(tezMetrics);
+          updateDeploymentPodDeletionCost(client, namespace, tezMetrics, pm -> 
tezCosts.get(pm.podName()));
+          
+          Map<String, Integer> tezPatches = new HashMap<>();
+          evaluateComponent(cluster, client, namespace, clusterName,
+              tezAmComponentKey, tezAuto, perLlapTezAm.replicas(), tezPatches, 
statuses, tezMetrics);
+
+          Integer tezPatch = tezPatches.get(tezAmComponentKey);
+          if (tezPatch != null && tezPatch < currentTezReplicas) {
+            // Scale-down: defer to allow deletion-cost annotations to 
propagate
+            int busyCount = countBusyPods(tezMetrics);
+            int effectivePatch = Math.max(tezPatch, busyCount);
+            int removeCount = currentTezReplicas - effectivePatch;
+            List<String> podsToDeregister = 
TezAmScalingStrategy.podsToRemove(tezMetrics, tezCosts, removeCount);
+            pendingScaleDowns.put(tezKey, new PendingScaleDown(effectivePatch, 
Instant.now(), podsToDeregister));
+            MANAGED_REPLICAS.put(tezKey, currentTezReplicas);
+            LOG.info("[{}] Deferring scale-down to {} (waiting for 
deletion-cost propagation)",
+                tezAmComponentKey, effectivePatch);
+          } else if (tezPatch != null) {
+            // Scale-up: apply immediately
+            patches.put(tezAmComponentKey, tezPatch);
+            MANAGED_REPLICAS.put(tezKey, tezPatch);
+          }
+        }
       }
     }
 
@@ -362,8 +410,7 @@ private int getCurrentReplicas(KubernetesClient client, 
String namespace,
     } else {
       workloadName = clusterName + "-" + component;
     }
-    if (component.startsWith(ConfigUtils.COMPONENT_LLAP + "-")
-        || component.startsWith(ConfigUtils.COMPONENT_TEZAM + "-")) {
+    if (component.startsWith(ConfigUtils.COMPONENT_LLAP + "-")) {
       var ss = client.apps().statefulSets()
           .inNamespace(namespace).withName(workloadName).get();
       return ss != null && ss.getSpec().getReplicas() != null ? 
ss.getSpec().getReplicas() : 0;
@@ -375,17 +422,24 @@ private int getCurrentReplicas(KubernetesClient client, 
String namespace,
     }
   }
 
+  /** Counts TezAM pods with active DAG work. */
+  private int countBusyPods(List<PodMetrics> tezMetrics) {
+    return (int) tezMetrics.stream()
+        .filter(pm -> TezAmScalingStrategy.hasActiveDag(pm.metrics()))
+        .count();
+  }
+
   /**
    * Patches each pod's deletion cost annotation based on its active session 
count.
    * Kubernetes uses this during scale-down to kill idle pods first (lower 
cost = killed first).
    * <p>
-   * Only meaningful for Deployments (HS2, Metastore) — the ReplicaSet 
controller
+   * Only meaningful for Deployments (HS2, Metastore, TezAM) — the ReplicaSet 
controller
    * respects this annotation. StatefulSets ignore it and always terminate by 
ordinal.
    */
   private void updateDeploymentPodDeletionCost(KubernetesClient client, String 
namespace,
-      List<PodMetrics> metrics, String metricName) {
+      List<PodMetrics> metrics, ToIntFunction<PodMetrics> costFunction) {
     for (PodMetrics pm : metrics) {
-      int sessions = pm.metrics().getOrDefault(metricName, 0.0).intValue();
+      int cost = costFunction.applyAsInt(pm);
       try {
         client.pods().inNamespace(namespace).withName(pm.podName())
             .edit(pod -> {
@@ -393,7 +447,7 @@ private void 
updateDeploymentPodDeletionCost(KubernetesClient client, String nam
                 pod.getMetadata().setAnnotations(new java.util.HashMap<>());
               }
               pod.getMetadata().getAnnotations()
-                  .put("controller.kubernetes.io/pod-deletion-cost", 
String.valueOf(sessions));
+                  .put("controller.kubernetes.io/pod-deletion-cost", 
String.valueOf(cost));
               return pod;
             });
       } catch (Exception e) {
diff --git 
a/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/autoscaling/MetricsScraper.java
 
b/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/autoscaling/MetricsScraper.java
index c3aa8aa82ca..20d6474354e 100644
--- 
a/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/autoscaling/MetricsScraper.java
+++ 
b/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/autoscaling/MetricsScraper.java
@@ -132,7 +132,7 @@ private CompletableFuture<String> fetchMetricsAsync(String 
podIp, int metricsPor
         });
   }
 
-  private static boolean isPodReady(Pod pod) {
+  public static boolean isPodReady(Pod pod) {
     if (pod.getStatus() == null || pod.getStatus().getConditions() == null) {
       return false;
     }
diff --git 
a/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/autoscaling/PrometheusTextParser.java
 
b/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/autoscaling/PrometheusTextParser.java
index babee17aa77..8940a80b193 100644
--- 
a/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/autoscaling/PrometheusTextParser.java
+++ 
b/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/autoscaling/PrometheusTextParser.java
@@ -24,8 +24,6 @@
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.commons.lang3.StringUtils;
-
 /**
  * Parses Prometheus text exposition format (from JMX Exporter /metrics).
  * Only extracts metric name → value pairs; labels are stripped.
@@ -55,7 +53,7 @@ public static Map<String, Double> parseWithLabels(String 
body) {
 
   private static Map<String, Double> doParse(String body, boolean keepLabels) {
     Map<String, Double> result = new HashMap<>();
-    if (StringUtils.isEmpty(body)) {
+    if (body == null || body.isEmpty()) {
       return result;
     }
     try (BufferedReader reader = new BufferedReader(new StringReader(body))) {
diff --git 
a/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/autoscaling/TezAmScalingStrategy.java
 
b/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/autoscaling/TezAmScalingStrategy.java
index bc43905a1b3..64a6f0d8306 100644
--- 
a/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/autoscaling/TezAmScalingStrategy.java
+++ 
b/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/autoscaling/TezAmScalingStrategy.java
@@ -18,7 +18,10 @@
 
 package org.apache.hive.kubernetes.operator.autoscaling;
 
+import java.util.Comparator;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.hive.kubernetes.operator.model.HiveCluster;
 import org.apache.hive.kubernetes.operator.model.spec.AutoscalingSpec;
@@ -32,10 +35,15 @@
  * <p>
  * Uses the per-target session metric from HS2: 
hs2_llap_target_sessions_{llapName}.
  * Falls back to hs2_open_sessions if per-target metrics are not available.
+ * <p>
+ * Scale-down uses {@link #METRIC_DAG_RUNNING} from each TezAM pod's scraped
+ * {@link PodMetrics} to rank idle AMs for safe termination.
  */
 public class TezAmScalingStrategy implements ScalingStrategy {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(TezAmScalingStrategy.class);
+  public static final String METRIC_DAG_RUNNING = "tez_am_dag_running";
+  public static final int BUSY_DELETION_COST = Integer.MAX_VALUE;
 
   private final HiveClusterAutoscaler orchestrator;
   private final HiveCluster cluster;
@@ -115,4 +123,33 @@ public int lastMetricValue() {
   public boolean usesScaleUpThreshold() {
     return false;
   }
+
+  public static boolean hasActiveDag(Map<String, Double> metrics) {
+    return metrics.getOrDefault(METRIC_DAG_RUNNING, 0.0) > 0;
+  }
+
+  public static Map<String, Integer> deletionCostsByPod(List<PodMetrics> 
metrics) {
+    Map<String, Integer> costs = new HashMap<>();
+    int idleCost = 0;
+    for (PodMetrics pm : metrics) {
+      if (hasActiveDag(pm.metrics())) {
+        costs.put(pm.podName(), BUSY_DELETION_COST);
+      } else {
+        costs.put(pm.podName(), idleCost++);
+      }
+    }
+    return costs;
+  }
+
+  public static List<String> podsToRemove(List<PodMetrics> metrics,
+      Map<String, Integer> costs, int removeCount) {
+    if (removeCount <= 0) {
+      return List.of();
+    }
+    return metrics.stream()
+        .sorted(Comparator.comparingInt(pm -> costs.get(pm.podName())))
+        .limit(removeCount)
+        .map(PodMetrics::podName)
+        .toList();
+  }
 }
diff --git 
a/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/autoscaling/TezAmZkDeregister.java
 
b/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/autoscaling/TezAmZkDeregister.java
new file mode 100644
index 00000000000..353b7fc8a9a
--- /dev/null
+++ 
b/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/autoscaling/TezAmZkDeregister.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.kubernetes.operator.autoscaling;
+
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.hive.kubernetes.operator.util.ConfigUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Deletes idle TezAM ZooKeeper registration nodes before a scale-down is applied.
+ * <p>
+ * A {@link CuratorFramework} client is cached per HiveCluster and reused across
+ * scale-downs; it is closed when the cluster is deleted or ZK config changes.
+ * Every change to the client cache is made while holding the {@code ZK_CLIENTS} monitor.
+ */
+public final class TezAmZkDeregister {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TezAmZkDeregister.class);
+  private static final String PATH_SEPARATOR = "/";
+
+  /** A cached client together with the settings it was built from, used to detect config changes. */
+  private record ZkClientRecord(String zkQuorum, int connTimeoutMs, int sessionTimeoutMs,
+      int baseSleepMs, int maxRetries, CuratorFramework client) {}
+
+  /** Cached clients keyed by {@code <namespace>/<clusterName>}. */
+  private static final ConcurrentHashMap<String, ZkClientRecord> ZK_CLIENTS = new ConcurrentHashMap<>();
+
+  private TezAmZkDeregister() {}
+
+  /**
+   * Deletes the ZooKeeper registration nodes of the given idle TezAM pods.
+   * <p>
+   * This is best-effort: connection timeouts and ZooKeeper errors are logged and never
+   * thrown. A node that cannot be read or deleted is skipped so that the remaining
+   * nodes are still processed.
+   *
+   * @param namespace namespace of the HiveCluster, part of the client cache key
+   * @param clusterName name of the HiveCluster, part of the client cache key
+   * @param zkQuorum ZooKeeper connect string
+   * @param llapName LLAP cluster name; registrations are looked up under
+   *                 {@code ConfigUtils.TEZ_EXTERNAL_SESSIONS_ZK_PREFIX/<llapName>}
+   * @param idlePodNames names of the pods to deregister; {@code null} or empty is a no-op
+   * @param hiveSiteConfig hive-site properties supplying the ZooKeeper timeout and retry settings
+   */
+  public static void deregisterIdlePods(String namespace, String clusterName, String zkQuorum, String llapName,
+      List<String> idlePodNames, Map<String, String> hiveSiteConfig) {
+    if (idlePodNames == null || idlePodNames.isEmpty()) {
+      return;
+    }
+    ZkClientRecord zkRecord = getZKRecord(namespace, clusterName, zkQuorum, hiveSiteConfig);
+    CuratorFramework client = zkRecord.client();
+    try {
+      if (!client.blockUntilConnected(zkRecord.connTimeoutMs(), TimeUnit.MILLISECONDS)) {
+        LOG.warn("[tezam-{}] ZK connect timeout — skipping deregistration", llapName);
+        return;
+      }
+      String registryPath = ConfigUtils.TEZ_EXTERNAL_SESSIONS_ZK_PREFIX + PATH_SEPARATOR + llapName;
+      if (client.checkExists().forPath(registryPath) == null) {
+        return;
+      }
+      for (String appId : client.getChildren().forPath(registryPath)) {
+        String nodePath = registryPath + PATH_SEPARATOR + appId;
+        String podName = readRegisteredPodName(client, llapName, nodePath);
+        if (podName != null && idlePodNames.contains(podName)) {
+          deleteRegistrationNode(client, llapName, nodePath, podName, appId);
+        }
+      }
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      LOG.warn("[tezam-{}] ZK deregistration interrupted: {}", llapName, e.getMessage());
+    } catch (Exception e) {
+      LOG.warn("[tezam-{}] ZK deregistration error: {}", llapName, e.getMessage());
+    }
+  }
+
+  /**
+   * Reads one registration node and extracts the pod name from its {@code hostName} field.
+   * <p>
+   * Read failures are logged and reported as {@code null} instead of being propagated, so a
+   * node that disappears between listing and reading does not abort the whole pass.
+   *
+   * @return the first DNS label of {@code hostName}, or {@code null} when the node is empty,
+   *         has no {@code hostName}, or could not be read
+   * @throws InterruptedException if the calling thread is interrupted during the read
+   */
+  private static String readRegisteredPodName(CuratorFramework client, String llapName, String nodePath)
+      throws InterruptedException {
+    try {
+      byte[] data = client.getData().forPath(nodePath);
+      if (data == null || data.length == 0) {
+        return null;
+      }
+      String hostName = ConfigUtils.getJsonStringField(new String(data, StandardCharsets.UTF_8), "hostName");
+      if (hostName == null) {
+        return null;
+      }
+      // hostName = "<podName>.<svcName>.<ns>.svc.cluster.local"
+      int firstDot = hostName.indexOf('.');
+      return firstDot >= 0 ? hostName.substring(0, firstDot) : hostName;
+    } catch (InterruptedException e) {
+      throw e;
+    } catch (Exception e) {
+      LOG.warn("[tezam-{}] Skipping ZK node {}: {}", llapName, nodePath, e.getMessage());
+      return null;
+    }
+  }
+
+  /**
+   * Returns the cached client for the cluster, creating it on first use and recreating it
+   * when the quorum or any connection setting differs from the cached one.
+   */
+  private static ZkClientRecord getZKRecord(String namespace, String clusterName,
+      String zkQuorum, Map<String, String> hiveSiteConfig) {
+    int connTimeoutMs = ConfigUtils.getTimeMs(hiveSiteConfig, ConfigUtils.HIVE_ZOOKEEPER_CONNECTION_TIMEOUT_KEY,
+        ConfigUtils.HIVE_ZOOKEEPER_CONNECTION_TIMEOUT_DEFAULT_MS);
+    int sessionTimeoutMs = ConfigUtils.getTimeMs(hiveSiteConfig, ConfigUtils.HIVE_ZOOKEEPER_SESSION_TIMEOUT_KEY,
+        ConfigUtils.HIVE_ZOOKEEPER_SESSION_TIMEOUT_DEFAULT_MS);
+    int baseSleepMs = ConfigUtils.getTimeMs(hiveSiteConfig, ConfigUtils.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME_KEY,
+        ConfigUtils.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME_DEFAULT_MS);
+    int maxRetries = ConfigUtils.getInt(hiveSiteConfig, ConfigUtils.HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES_KEY,
+        null, ConfigUtils.HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES_DEFAULT);
+
+    // The lookup, the close of a stale client and the replacement must happen as one step.
+    synchronized (ZK_CLIENTS) {
+      String key = namespace + PATH_SEPARATOR + clusterName;
+      ZkClientRecord cachedRecord = ZK_CLIENTS.get(key);
+      if (cachedRecord != null && cachedRecord.zkQuorum().equals(zkQuorum)
+          && cachedRecord.connTimeoutMs() == connTimeoutMs
+          && cachedRecord.sessionTimeoutMs() == sessionTimeoutMs
+          && cachedRecord.baseSleepMs() == baseSleepMs
+          && cachedRecord.maxRetries() == maxRetries) {
+        return cachedRecord;
+      }
+      if (cachedRecord != null) {
+        closeClient(cachedRecord.client());
+        LOG.info("Recreating cached ZK client for {} (quorum or connection settings changed)", key);
+      }
+      CuratorFramework client = CuratorFrameworkFactory.builder()
+          .connectString(zkQuorum)
+          .connectionTimeoutMs(connTimeoutMs)
+          .sessionTimeoutMs(sessionTimeoutMs)
+          .retryPolicy(new ExponentialBackoffRetry(baseSleepMs, maxRetries))
+          .build();
+      client.start();
+      ZkClientRecord zkRecord = new ZkClientRecord(zkQuorum, connTimeoutMs, sessionTimeoutMs,
+          baseSleepMs, maxRetries, client);
+      ZK_CLIENTS.put(key, zkRecord);
+      return zkRecord;
+    }
+  }
+
+  /** Deletes a single registration node; failures are logged and not propagated. */
+  private static void deleteRegistrationNode(CuratorFramework client, String llapName,
+      String nodePath, String podName, String appId) {
+    try {
+      client.delete().forPath(nodePath);
+      LOG.info("[tezam-{}] Deregistered pod {} (appId={}) from ZK — HS2 will stop routing to it",
+          llapName, podName, appId);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      LOG.warn("[tezam-{}] Failed to delete ZK node for pod {} (interrupted): {}",
+          llapName, podName, e.getMessage());
+    } catch (Exception e) {
+      LOG.warn("[tezam-{}] Failed to delete ZK node for pod {}: {}", llapName, podName, e.getMessage());
+    }
+  }
+
+  /** Closes a client, logging any close failure at debug level. */
+  private static void closeClient(CuratorFramework client) {
+    try {
+      client.close();
+    } catch (Exception e) {
+      LOG.debug("Error closing ZK client: {}", e.getMessage());
+    }
+  }
+
+  /**
+   * Drops and closes the cached client of a cluster, if one exists.
+   *
+   * @param namespace namespace of the HiveCluster
+   * @param clusterName name of the HiveCluster
+   */
+  public static void cleanupCluster(String namespace, String clusterName) {
+    ZkClientRecord zkRecord;
+    // Same monitor as getZKRecord: without it, a removal could slip between that method's
+    // lookup and its put, leaving a freshly created client cached after the cluster is gone.
+    synchronized (ZK_CLIENTS) {
+      zkRecord = ZK_CLIENTS.remove(namespace + PATH_SEPARATOR + clusterName);
+    }
+    if (zkRecord != null) {
+      closeClient(zkRecord.client());
+    }
+  }
+}
diff --git 
a/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/dependent/HiveDependentResource.java
 
b/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/dependent/HiveDependentResource.java
index 2315b455d76..8133f369e66 100644
--- 
a/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/dependent/HiveDependentResource.java
+++ 
b/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/dependent/HiveDependentResource.java
@@ -855,9 +855,11 @@ private static String buildJmxExporterConfig(String 
component) {
       sb.append("  type: GAUGE\n");
       break;
     case ConfigUtils.COMPONENT_TEZAM:
-      // TezAM DAG execution metrics
-      sb.append("- pattern: 'Hadoop<service=TezAppMaster, 
name=TezAppMaster><>(.+)'\n");
-      sb.append("  name: tez_am_$1\n");
+      // LlapMetricsSystem registers its beans under the LlapTaskScheduler service.
+      // SchedulerDagStatus tracks whether the AM is running a DAG or is idle, so it is
+      // exported as a GAUGE.
+      String schedulerBean = "Hadoop<service=LlapTaskScheduler, 
name=LlapTaskSchedulerMetrics.+><>";
+      sb.append("- pattern: 
'").append(schedulerBean).append("SchedulerDagStatus'\n");
+      sb.append("  name: tez_am_dag_running\n");
       sb.append("  type: GAUGE\n");
       break;
     default:
diff --git 
a/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/dependent/HiveServer2DeploymentDependent.java
 
b/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/dependent/HiveServer2DeploymentDependent.java
index 3afb0af118d..4ef8a21693e 100644
--- 
a/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/dependent/HiveServer2DeploymentDependent.java
+++ 
b/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/dependent/HiveServer2DeploymentDependent.java
@@ -251,6 +251,7 @@ protected Deployment desired(HiveCluster hiveCluster,
               .addToAnnotations("hive.apache.org/config-hash", configHash)
             .endMetadata()
             .withNewSpec()
+              .withServiceAccountName(spec.serviceAccountName())
               .withInitContainers(initContainers)
               .addNewContainer()
                 .withName(COMPONENT)
diff --git 
a/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/dependent/LlapResourceBuilder.java
 
b/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/dependent/LlapResourceBuilder.java
index 50490119716..1af7df3669b 100644
--- 
a/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/dependent/LlapResourceBuilder.java
+++ 
b/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/dependent/LlapResourceBuilder.java
@@ -19,6 +19,7 @@
 package org.apache.hive.kubernetes.operator.dependent;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -31,16 +32,25 @@
 import io.fabric8.kubernetes.api.model.OwnerReference;
 import io.fabric8.kubernetes.api.model.OwnerReferenceBuilder;
 import io.fabric8.kubernetes.api.model.IntOrString;
+import io.fabric8.kubernetes.api.model.Pod;
 import io.fabric8.kubernetes.api.model.Probe;
 import io.fabric8.kubernetes.api.model.Service;
 import io.fabric8.kubernetes.api.model.ServiceBuilder;
 import io.fabric8.kubernetes.api.model.Volume;
 import io.fabric8.kubernetes.api.model.VolumeMount;
+import io.fabric8.kubernetes.api.model.apps.Deployment;
+import io.fabric8.kubernetes.api.model.apps.DeploymentBuilder;
 import io.fabric8.kubernetes.api.model.apps.StatefulSet;
 import io.fabric8.kubernetes.api.model.apps.StatefulSetBuilder;
+import io.fabric8.kubernetes.api.model.discovery.v1.Endpoint;
+import io.fabric8.kubernetes.api.model.discovery.v1.EndpointBuilder;
+import io.fabric8.kubernetes.api.model.discovery.v1.EndpointSlice;
+import io.fabric8.kubernetes.api.model.discovery.v1.EndpointSliceBuilder;
 import io.fabric8.kubernetes.api.model.policy.v1.PodDisruptionBudget;
 import io.fabric8.kubernetes.api.model.policy.v1.PodDisruptionBudgetBuilder;
 import io.javaoperatorsdk.operator.api.reconciler.Context;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hive.kubernetes.operator.autoscaling.TezAmScalingStrategy;
 import org.apache.hive.kubernetes.operator.model.HiveCluster;
 import org.apache.hive.kubernetes.operator.model.HiveClusterSpec;
 import org.apache.hive.kubernetes.operator.model.spec.AutoscalingSpec;
@@ -50,6 +60,8 @@
 import org.apache.hive.kubernetes.operator.util.HiveConfigBuilder;
 import org.apache.hive.kubernetes.operator.util.Labels;
 
+import static 
org.apache.hive.kubernetes.operator.autoscaling.MetricsScraper.isPodReady;
+
 /**
  * Static builder methods for LLAP Kubernetes resources.
  * Used by the reconciler to imperatively manage multiple LLAP clusters.
@@ -193,7 +205,7 @@ public static PodDisruptionBudget buildPdb(HiveCluster hc, 
LlapSpec llap) {
 
   // --- TezAM resource builders (one TezAM per LLAP cluster) ---
 
-  /** TezAM StatefulSet name for a specific LLAP cluster. */
+  /** TezAM Deployment/Service name for a specific LLAP cluster. */
   public static String tezAmResourceName(HiveCluster hc, LlapSpec llap) {
     return hc.getMetadata().getName() + TEZAM_INFIX + llap.name();
   }
@@ -229,9 +241,94 @@ public static PodDisruptionBudget 
buildTezAmPdb(HiveCluster hc, LlapSpec llap) {
         .build();
   }
 
-  /** Builds the TezAM StatefulSet for a specific LLAP cluster. */
-  public static StatefulSet buildTezAmStatefulSet(HiveCluster hc, LlapSpec 
llap, Integer replicas) {
-    return INSTANCE.doBuildTezAmStatefulSet(hc, llap, replicas);
+  /**
+   * Builds the TezAM Deployment for a specific LLAP cluster.
+   *
+   * @param hc the owning HiveCluster resource
+   * @param llap the LLAP cluster the TezAM belongs to
+   * @param replicas replica count set on the Deployment spec
+   * @return the TezAM Deployment
+   */
+  public static Deployment buildTezAmDeployment(HiveCluster hc, LlapSpec llap, Integer replicas) {
+    Deployment tezAmDeployment = INSTANCE.doBuildTezAmDeployment(hc, llap, replicas);
+    return tezAmDeployment;
+  }
+
+  /**
+   * Returns the name of the operator-managed EndpointSlice that provides per-pod DNS for
+   * TezAM: the TezAM resource name followed by {@code -hostnames}.
+   * CoreDNS creates {@code <pod-name>.<svc>.<ns>.svc.cluster.local} A-records using it.
+   *
+   * @param hc the owning HiveCluster resource
+   * @param llap the LLAP cluster the TezAM belongs to
+   * @return the EndpointSlice name
+   */
+  public static String tezAmEndpointSliceName(HiveCluster hc, LlapSpec llap) {
+    String tezAmName = tezAmResourceName(hc, llap);
+    return tezAmName + "-hostnames";
+  }
+
+  /**
+   * Builds a custom EndpointSlice for the TezAM headless Service.
+   * <p>
+   * Kubernetes only creates per-pod DNS records ({@code <pod>.<svc>.<ns>.svc.cluster.local})
+   * when the Endpoints/EndpointSlice has a {@code hostname} field for each address. The
+   * default EndpointSlice controller omits {@code hostname} for Deployment pods (it only
+   * sets it automatically for StatefulSet pods). This operator-managed EndpointSlice fills
+   * that gap, giving every ready TezAM pod a resolvable FQDN.
+   * <p>
+   * Pods without an IP are skipped. Every other pod becomes one endpoint whose hostname is
+   * the pod name (cut to 63 characters, trailing dashes removed) and whose ready/serving
+   * conditions both mirror {@code isPodReady}.
+   *
+   * @param hc the owning HiveCluster resource
+   * @param llap the LLAP cluster the TezAM belongs to
+   * @param pods list of TezAM pods
+   * @return the EndpointSlice, or {@code null} if there are no pod IPs yet or if pods have mixed IPv4/IPv6 addresses
+   */
+  public static EndpointSlice buildTezAmEndpointSlice(HiveCluster hc, LlapSpec llap, List<Pod> pods) {
+    String ns = hc.getMetadata().getNamespace();
+    String svcName = tezAmResourceName(hc, llap);
+    Map<String, String> labels = new HashMap<>(Map.of(
+        "kubernetes.io/service-name", svcName,
+        "endpointslice.kubernetes.io/managed-by", "hive-kubernetes-operator",
+        Labels.MANAGED_BY, Labels.MANAGED_BY_VALUE,
+        Labels.APP_INSTANCE, hc.getMetadata().getName(),
+        Labels.APP_COMPONENT, ConfigUtils.COMPONENT_TEZAM));
+
+    List<Endpoint> endpoints = new ArrayList<>();
+    String addressType = null;
+    for (Pod pod : pods) {
+      String ip = pod.getStatus() == null ? null : pod.getStatus().getPodIP();
+      if (StringUtils.isEmpty(ip)) {
+        continue;
+      }
+      // A slice carries a single address type, so pods of different IP families cannot share it.
+      String ipFamily = ipAddressType(ip);
+      if (addressType != null && !addressType.equals(ipFamily)) {
+        return null;
+      }
+      addressType = ipFamily;
+
+      boolean ready = isPodReady(pod);
+      String podName = pod.getMetadata().getName();
+      // The hostname is limited to 63 characters and must not end in a dash after the cut.
+      String hostname = podName.length() > 63
+          ? StringUtils.stripEnd(podName.substring(0, 63), "-")
+          : podName;
+      Endpoint endpoint = new EndpointBuilder()
+          .withHostname(hostname)
+          .withAddresses(ip)
+          .withNewConditions()
+            .withReady(ready)
+            .withServing(ready)
+            .withTerminating(false)
+          .endConditions()
+          .withNewTargetRef()
+            .withKind("Pod")
+            .withNamespace(ns)
+            .withName(podName)
+          .endTargetRef()
+          .build();
+      endpoints.add(endpoint);
+    }
+
+    if (endpoints.isEmpty()) {
+      return null;
+    }
+
+    return new EndpointSliceBuilder()
+        .withNewMetadata()
+          .withName(tezAmEndpointSliceName(hc, llap))
+          .withNamespace(ns)
+          .withLabels(labels)
+          .withOwnerReferences(ownerRef(hc))
+        .endMetadata()
+        .withAddressType(addressType)
+        .withEndpoints(endpoints)
+        .build();
+  }
+
+  /** Returns {@code IPv6} when the pod IP string contains a colon, otherwise {@code IPv4}. */
+  private static String ipAddressType(String ip) {
+    boolean hasColon = ip.contains(":");
+    return hasColon ? "IPv6" : "IPv4";
   }
 
   /** Builds the headless Service for a TezAM cluster. */
@@ -274,11 +371,11 @@ public static ConfigMap buildTezAmConfigMap(HiveCluster 
hc, LlapSpec llap) {
 
   // --- Private instance methods that use protected helpers from 
HiveDependentResource ---
 
-  private StatefulSet doBuildTezAmStatefulSet(HiveCluster hiveCluster, 
LlapSpec llap,
+  private Deployment doBuildTezAmDeployment(HiveCluster hiveCluster, LlapSpec 
llap,
       Integer replicas) {
     HiveClusterSpec spec = hiveCluster.getSpec();
     String ns = hiveCluster.getMetadata().getNamespace();
-    String ssName = tezAmResourceName(hiveCluster, llap);
+    String deployName = tezAmResourceName(hiveCluster, llap);
     Map<String, String> allLabels = Labels.forTezAmCluster(hiveCluster, 
llap.name());
     Map<String, String> selectorLabels = 
Labels.selectorForTezAmCluster(hiveCluster, llap.name());
 
@@ -323,22 +420,26 @@ private StatefulSet doBuildTezAmStatefulSet(HiveCluster 
hiveCluster, LlapSpec ll
     replaceConfMountWithSubPaths(volumeMounts, HIVE_CONFIG_VOLUME,
         "hive-site.xml", "tez-site.xml", "core-site.xml");
 
+    AutoscalingSpec tezAutoscaling = llap.tezAm().autoscaling();
+    if (tezAutoscaling.isEnabled()) {
+      addJmxExporter(spec.image(), ConfigUtils.COMPONENT_TEZAM, 
tezAutoscaling.metricsPort(),
+          initContainers, volumeMounts, volumes, envVars, ports);
+    }
+
     String configHash = sha256(
         
HadoopXmlBuilder.buildXml(HiveConfigBuilder.getHiveServer2HiveSite(hiveCluster, 
spec)),
         HadoopXmlBuilder.buildXml(HiveConfigBuilder.getTezSite(spec, llap)),
         HadoopXmlBuilder.buildXml(HiveConfigBuilder.getHadoopCoreSite(spec)));
 
-    StatefulSet statefulSet = new StatefulSetBuilder()
+    Deployment deployment = new DeploymentBuilder()
         .withNewMetadata()
-          .withName(ssName)
+          .withName(deployName)
           .withNamespace(ns)
           .withLabels(allLabels)
           .withOwnerReferences(ownerRef(hiveCluster))
         .endMetadata()
         .withNewSpec()
           .withReplicas(replicas)
-          .withPodManagementPolicy("Parallel")
-          .withServiceName(ssName)
           .withNewSelector()
             .withMatchLabels(selectorLabels)
           .endSelector()
@@ -350,6 +451,8 @@ private StatefulSet doBuildTezAmStatefulSet(HiveCluster 
hiveCluster, LlapSpec ll
               .addToAnnotations("hive.apache.org/config-hash", configHash)
             .endMetadata()
             .withNewSpec()
+              .withServiceAccountName(spec.serviceAccountName())
+              .withSubdomain(deployName)
               .withInitContainers(initContainers)
               .addNewContainer()
                 .withName(ConfigUtils.COMPONENT_TEZAM)
@@ -367,13 +470,31 @@ private StatefulSet doBuildTezAmStatefulSet(HiveCluster 
hiveCluster, LlapSpec ll
         .build();
 
     applySpreadAffinityIfAbsent(
-        statefulSet.getSpec().getTemplate().getSpec(), selectorLabels);
+        deployment.getSpec().getTemplate().getSpec(), selectorLabels);
 
-    appendUserVolumes(statefulSet.getSpec().getTemplate().getSpec(),
+    appendUserVolumes(deployment.getSpec().getTemplate().getSpec(),
         spec.volumes(), spec.volumeMounts(),
         spec.tezAm().extraVolumes(), spec.tezAm().extraVolumeMounts());
 
-    return statefulSet;
+    // When autoscaling is enabled, add a preStop hook that waits for any in-flight DAG
+    // to complete before exiting. The operator has already deleted the ZK registration
+    // node (see HiveClusterAutoscaler), so no new DAGs can arrive. If a DAG arrived
+    // during the brief race window before the ZK delete, we wait for it to finish.
+    // Once tez_am_dag_running reaches 0, the hook exits and Kubernetes terminates the pod.
+    if (tezAutoscaling.isEnabled()) {
+      String preStopScript = buildDrainScript(
+          "Waiting for active DAG to complete",
+          TezAmScalingStrategy.METRIC_DAG_RUNNING, "DAG",
+          "TezAM is idle. preStop complete, K8s will terminate pod.",
+          10, 6, null, tezAutoscaling.metricsPort());
+      applyAutoscalingLifecycle(
+          deployment.getSpec().getTemplate().getSpec(),
+          deployment.getSpec().getTemplate().getMetadata(),
+          preStopScript, tezAutoscaling.gracePeriodSeconds(),
+          tezAutoscaling.metricsScrapeIntervalSeconds());
+    }
+
+    return deployment;
   }
 
   private StatefulSet doBuildStatefulSet(HiveCluster hiveCluster, LlapSpec 
llap, Integer replicas) {
@@ -478,6 +599,7 @@ private StatefulSet doBuildStatefulSet(HiveCluster 
hiveCluster, LlapSpec llap, I
               .addToAnnotations("hive.apache.org/config-hash", configHash)
             .endMetadata()
             .withNewSpec()
+              .withServiceAccountName(spec.serviceAccountName())
               .withInitContainers(initContainers)
               .addNewContainer()
                 .withName(ConfigUtils.COMPONENT_LLAP)
diff --git 
a/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/dependent/MetastoreDeploymentDependent.java
 
b/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/dependent/MetastoreDeploymentDependent.java
index ff19afd5c02..7a8616c3559 100644
--- 
a/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/dependent/MetastoreDeploymentDependent.java
+++ 
b/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/dependent/MetastoreDeploymentDependent.java
@@ -161,6 +161,7 @@ protected Deployment desired(HiveCluster hiveCluster,
               .addToAnnotations("hive.apache.org/config-hash", configHash)
             .endMetadata()
             .withNewSpec()
+              .withServiceAccountName(spec.serviceAccountName())
               .withInitContainers(initContainers)
               .addNewContainer()
                 .withName(COMPONENT)
diff --git 
a/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/dependent/SchemaInitJobDependent.java
 
b/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/dependent/SchemaInitJobDependent.java
index fb4b588401c..d68af96e4f0 100644
--- 
a/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/dependent/SchemaInitJobDependent.java
+++ 
b/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/dependent/SchemaInitJobDependent.java
@@ -128,6 +128,7 @@ protected Job desired(HiveCluster hiveCluster,
                   hiveCluster, COMPONENT))
             .endMetadata()
             .withNewSpec()
+              .withServiceAccountName(spec.serviceAccountName())
               .withRestartPolicy("OnFailure")
               .withInitContainers(initContainers)
               .addNewContainer()
diff --git 
a/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/model/HiveClusterSpec.java
 
b/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/model/HiveClusterSpec.java
index b9c0faf42c5..cb1428826bb 100644
--- 
a/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/model/HiveClusterSpec.java
+++ 
b/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/model/HiveClusterSpec.java
@@ -82,6 +82,9 @@ public record HiveClusterSpec(
         + "(e.g., mounting a GCS key file at /etc/gcs/key.json)")
     @SchemaFrom(type = Object[].class) @PreserveUnknownFields
     List<VolumeMount> volumeMounts,
+    @JsonPropertyDescription("Kubernetes ServiceAccount name for all component 
pods. "
+        + "If not specified, pods use the namespace default service account.")
+    String serviceAccountName,
     @JsonPropertyDescription("Auto-suspend configuration. When enabled and all 
components "
         + "are idle for the configured timeout, the cluster scales to 0 
replicas.")
     AutoSuspendSpec autoSuspend,
diff --git 
a/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/reconciler/HiveClusterReconciler.java
 
b/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/reconciler/HiveClusterReconciler.java
index 1bdc6a4ebe2..1afb813aad4 100644
--- 
a/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/reconciler/HiveClusterReconciler.java
+++ 
b/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/reconciler/HiveClusterReconciler.java
@@ -30,9 +30,11 @@
 
 import io.fabric8.kubernetes.api.model.Condition;
 import io.fabric8.kubernetes.api.model.HasMetadata;
+import io.fabric8.kubernetes.api.model.Pod;
 import io.fabric8.kubernetes.api.model.apps.Deployment;
 import io.fabric8.kubernetes.api.model.apps.StatefulSet;
 import io.fabric8.kubernetes.api.model.batch.v1.Job;
+import io.fabric8.kubernetes.api.model.discovery.v1.EndpointSlice;
 import io.fabric8.kubernetes.client.KubernetesClient;
 import io.javaoperatorsdk.operator.api.reconciler.Context;
 import io.javaoperatorsdk.operator.api.reconciler.Cleaner;
@@ -312,7 +314,7 @@ private HiveClusterStatus buildStatus(HiveCluster resource,
             ? perLlapTezAm.autoscaling().minReplicas()
             : perLlapTezAm.replicas();
         tezAmStatuses.put(llapSpec.name(),
-            buildComponentStatus(context, StatefulSet.class, tezAmSsName,
+            buildComponentStatus(context, Deployment.class, tezAmSsName,
                 perLlapTezAm.replicas(), tezAmMin));
       }
       status.setTezAmClusters(tezAmStatuses);
@@ -562,8 +564,7 @@ private void patchReplicas(KubernetesClient client, 
HiveCluster resource,
       workloadName = resource.getMetadata().getName() + "-" + component;
     }
     try {
-      if (component.startsWith(ConfigUtils.COMPONENT_LLAP + "-")
-          || component.startsWith(ConfigUtils.COMPONENT_TEZAM + "-")) {
+      if (component.startsWith(ConfigUtils.COMPONENT_LLAP + "-")) {
         
client.apps().statefulSets().inNamespace(namespace).withName(workloadName).scale(replicas);
       } else {
         
client.apps().deployments().inNamespace(namespace).withName(workloadName).scale(replicas);
@@ -586,7 +587,8 @@ private void patchSuspendSpec(KubernetesClient client, 
HiveCluster resource, boo
               oldSpec.hiveServer2(), oldSpec.llapClusters(), 
oldSpec.llapClusterRouting(),
               oldSpec.tezAm(), oldSpec.zookeeper(),
               oldSpec.hadoop(), oldSpec.envVars(), oldSpec.externalJars(),
-              oldSpec.volumes(), oldSpec.volumeMounts(), 
oldSpec.autoSuspend(), suspend);
+              oldSpec.volumes(), oldSpec.volumeMounts(), 
oldSpec.serviceAccountName(),
+              oldSpec.autoSuspend(), suspend);
           hc.setSpec(newSpec);
           return hc;
         });
@@ -642,10 +644,11 @@ private void reconcileLlapClusters(HiveCluster resource, 
KubernetesClient client
         client.services().inNamespace(ns)
             .resource(LlapResourceBuilder.buildTezAmService(resource, 
llapSpec))
             .serverSideApply();
-        client.apps().statefulSets().inNamespace(ns)
-            .resource(LlapResourceBuilder.buildTezAmStatefulSet(resource, 
llapSpec, tezAmReplicas))
+        client.apps().deployments().inNamespace(ns)
+            .resource(LlapResourceBuilder.buildTezAmDeployment(resource, 
llapSpec, tezAmReplicas))
             .forceConflicts()
             .serverSideApply();
+        reconcileTezAmEndpointSlice(resource, client, llapSpec);
         if (llapSpec.tezAm().autoscaling().isEnabled()) {
           client.policy().v1().podDisruptionBudget().inNamespace(ns)
               .resource(LlapResourceBuilder.buildTezAmPdb(resource, llapSpec))
@@ -743,24 +746,56 @@ private void garbageCollectLlapResources(KubernetesClient 
client, String ns,
         Labels.APP_INSTANCE, clusterName,
         Labels.APP_COMPONENT, ConfigUtils.COMPONENT_TEZAM);
 
-    
client.apps().statefulSets().inNamespace(ns).withLabels(tezamSelector).list().getItems()
+    
client.apps().deployments().inNamespace(ns).withLabels(tezamSelector).list().getItems()
         .stream()
-        .filter(ss -> {
-          String llapName = 
ss.getMetadata().getLabels().get(Labels.LLAP_CLUSTER);
+        .filter(d -> {
+          String llapName = 
d.getMetadata().getLabels().get(Labels.LLAP_CLUSTER);
           return llapName != null && !desiredNames.contains(llapName);
         })
-        .forEach(ss -> {
-          String llapName = 
ss.getMetadata().getLabels().get(Labels.LLAP_CLUSTER);
+        .forEach(d -> {
+          String llapName = 
d.getMetadata().getLabels().get(Labels.LLAP_CLUSTER);
           LOG.info("Garbage-collecting TezAM for LLAP cluster '{}' in {}/{}", 
llapName, ns, clusterName);
-          
client.apps().statefulSets().inNamespace(ns).withName(ss.getMetadata().getName()).delete();
-          
client.services().inNamespace(ns).withName(ss.getMetadata().getName()).delete();
+          
client.apps().deployments().inNamespace(ns).withName(d.getMetadata().getName()).delete();
+          
client.services().inNamespace(ns).withName(d.getMetadata().getName()).delete();
           client.configMaps().inNamespace(ns)
-              .withName(ss.getMetadata().getName() + "-config").delete();
+              .withName(d.getMetadata().getName() + "-config").delete();
           client.policy().v1().podDisruptionBudget().inNamespace(ns)
-              .withName(ss.getMetadata().getName() + "-pdb").delete();
+              .withName(d.getMetadata().getName() + "-pdb").delete();
+          client.discovery().v1().endpointSlices().inNamespace(ns)
+              .withName(d.getMetadata().getName() + "-hostnames").delete();
         });
   }
 
+  /**
+   * Maintains an operator-managed EndpointSlice for the TezAM headless Service.
+   * The default EndpointSlice controller does not set the {@code hostname} field for
+   * Deployment pods, so per-pod DNS records are not created by CoreDNS. This method
+   * creates/updates an EndpointSlice (with managed-by=hive-kubernetes-operator)
+   * that includes {@code hostname} for the TezAM pods, giving CoreDNS the data
+   * it needs to serve {@code <pod>.<svc>.<ns>.svc.cluster.local} A-records.
+   * <p>
+   * When no slice can be built the existing one is deleted. When the address type
+   * differs from the existing slice, that slice is deleted before the new one is applied.
+   *
+   * @param resource the HiveCluster being reconciled
+   * @param client Kubernetes client used to list pods and manage the slice
+   * @param llapSpec the LLAP cluster whose TezAM pods are published
+   */
+  private void reconcileTezAmEndpointSlice(HiveCluster resource, KubernetesClient client, LlapSpec llapSpec) {
+    String ns = resource.getMetadata().getNamespace();
+    Map<String, String> selector = Labels.selectorForTezAmCluster(resource, llapSpec.name());
+    List<Pod> pods = client.pods().inNamespace(ns).withLabels(selector).list().getItems();
+    EndpointSlice slice = LlapResourceBuilder.buildTezAmEndpointSlice(resource, llapSpec, pods);
+    String sliceName = LlapResourceBuilder.tezAmEndpointSliceName(resource, llapSpec);
+
+    var slices = client.discovery().v1().endpointSlices().inNamespace(ns);
+    var namedSlice = slices.withName(sliceName);
+    if (slice == null) {
+      namedSlice.delete();
+      return;
+    }
+
+    EndpointSlice current = namedSlice.get();
+    boolean addressTypeChanged = current != null
+        && current.getAddressType() != null
+        && !current.getAddressType().equals(slice.getAddressType());
+    if (addressTypeChanged) {
+      // NOTE(review): presumably required because addressType cannot be changed in place.
+      namedSlice.withGracePeriod(0L).delete();
+    }
+
+    slices.resource(slice)
+        .forceConflicts()
+        .serverSideApply();
+  }
+
   // --- Auto-Suspend / Wake ---
 
   enum SuspendAction { RUNNING, IDLE_START, IDLE_WAITING, SUSPEND_NOW, 
STAY_SUSPENDED, WAKE }
@@ -829,7 +864,7 @@ private boolean isClusterIdle(HiveCluster resource, 
KubernetesClient client) {
     if (spec.tezAm().isEnabled()) {
       for (var llap : spec.llapClusters()) {
         if (llap.isEnabled()
-            && !isAtMinReplicas(client, ns, name + "-tezam-" + llap.name(), 
true,
+            && !isAtMinReplicas(client, ns, name + "-tezam-" + llap.name(), 
false,
                 llap.tezAm().autoscaling().minReplicas())) {
           return false;
         }
diff --git 
a/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/util/ConfigUtils.java
 
b/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/util/ConfigUtils.java
index 6dbc38f9b67..a86a4fb9c50 100644
--- 
a/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/util/ConfigUtils.java
+++ 
b/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/util/ConfigUtils.java
@@ -20,8 +20,14 @@
 
 import java.util.Map;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
 public final class ConfigUtils {
 
+  // Single Jackson mapper instance reused by getJsonStringField for every parse.
+  private static final ObjectMapper JSON_MAPPER = new ObjectMapper();
+
   private ConfigUtils() {
   }
 
@@ -71,6 +77,9 @@ public static String tezAmComponentKey(String llapName) {
 
   public static final String HIVE_SERVER2_TEZ_USE_EXTERNAL_SESSIONS_KEY = 
"hive.server2.tez.use.external.sessions";
 
+  // HiveServer2 property name for default Tez session initialization.
+  public static final String HIVE_SERVER2_TEZ_INITIALIZE_DEFAULT_SESSIONS_KEY =
+      "hive.server2.tez.initialize.default.sessions";
+
   public static final String HIVE_SERVER2_TEZ_EXTERNAL_SESSIONS_NAMESPACE_KEY =
       "hive.server2.tez.external.sessions.namespace";
 
@@ -85,6 +94,18 @@ public static String tezAmComponentKey(String llapName) {
 
   public static final String HIVE_ZOOKEEPER_QUORUM_KEY = 
"hive.zookeeper.quorum";
 
+  // ZooKeeper client settings. Each *_KEY is a Hive property name and is paired with a
+  // fallback constant; presumably the fallback applies when the property is not
+  // overridden (callers are outside this view - confirm).
+
+  // Connection timeout: fallback of 15000 ms.
+  public static final String HIVE_ZOOKEEPER_CONNECTION_TIMEOUT_KEY = 
"hive.zookeeper.connection.timeout";
+  public static final int HIVE_ZOOKEEPER_CONNECTION_TIMEOUT_DEFAULT_MS = 15000;
+
+  // Session timeout: fallback of 120000 ms.
+  public static final String HIVE_ZOOKEEPER_SESSION_TIMEOUT_KEY = 
"hive.zookeeper.session.timeout";
+  public static final int HIVE_ZOOKEEPER_SESSION_TIMEOUT_DEFAULT_MS = 120000;
+
+  // Base sleep time between connection retries: fallback of 1000 ms.
+  public static final String HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME_KEY = 
"hive.zookeeper.connection.basesleeptime";
+  public static final int HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME_DEFAULT_MS = 
1000;
+
+  // Maximum number of connection retries: fallback of 3 (a count, not a duration).
+  public static final String HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES_KEY = 
"hive.zookeeper.connection.max.retries";
+  public static final int HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES_DEFAULT = 3;
+
   public static final String HIVE_EXECUTION_MODE_KEY = "hive.execution.mode";
 
   public static final String HIVE_LLAP_EXECUTION_MODE_KEY = 
"hive.llap.execution.mode";
@@ -198,6 +219,31 @@ public static int getInt(Map<String, String> overrides,
     return defaultVal;
   }
 
+  public static int getTimeMs(Map<String, String> overrides, String key, int 
defaultMs) {
+    if (overrides == null) {
+      return defaultMs;
+    }
+    String val = overrides.get(key);
+    if (val == null) {
+      return defaultMs;
+    }
+    val = val.trim();
+    try {
+      if (val.endsWith("ms")) {
+        return Integer.parseInt(val.substring(0, val.length() - 2).trim());
+      }
+      if (val.endsWith("s")) {
+        return (int) (Double.parseDouble(val.substring(0, val.length() - 
1).trim()) * 1000);
+      }
+      if (val.endsWith("m")) {
+        return (int) (Double.parseDouble(val.substring(0, val.length() - 
1).trim()) * 60000);
+      }
+      return Integer.parseInt(val);
+    } catch (NumberFormatException e) {
+      return defaultMs;
+    }
+  }
+
   public static boolean getBoolean(Map<String, String> overrides,
       String key, boolean defaultVal) {
     if (overrides != null) {
@@ -208,4 +254,16 @@ public static boolean getBoolean(Map<String, String> 
overrides,
     }
     return defaultVal;
   }
+
+  /**
+   * Extracts a top-level string field from a JSON document.
+   *
+   * @param json      JSON text to parse; may be {@code null} or blank
+   * @param fieldName name of the field to read from the parsed root node
+   * @return the field's text when it is present and textual; {@code null} when the input
+   *         is {@code null}/blank, cannot be parsed, or the field is missing or not a string
+   */
+  public static String getJsonStringField(String json, String fieldName) {
+    if (json == null || json.isBlank()) {
+      return null;
+    }
+    final JsonNode node;
+    try {
+      node = JSON_MAPPER.readTree(json).get(fieldName);
+    } catch (JsonProcessingException e) {
+      // Malformed JSON is reported the same way as a missing field.
+      return null;
+    }
+    if (node == null || !node.isTextual()) {
+      return null;
+    }
+    return node.asText();
+  }
 }
diff --git 
a/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/util/HiveConfigBuilder.java
 
b/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/util/HiveConfigBuilder.java
index cfcfbacc888..5969d6b9e25 100644
--- 
a/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/util/HiveConfigBuilder.java
+++ 
b/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/util/HiveConfigBuilder.java
@@ -146,6 +146,16 @@ public static Map<String, String> getHiveServer2HiveSite(
     if (spec.hiveServer2().configOverrides() != null) {
       props.putAll(spec.hiveServer2().configOverrides());
     }
+
+    // Default Tez sessions are not supported with multi-tenancy (more than one enabled LLAP
+    // cluster), so any configured value for that setting is overridden to "false" below.
+    // With a single enabled LLAP cluster, default Tez sessions remain supported.
+    long enabledLlapClusters = spec.llapClusters().stream()
+        .filter(LlapSpec::isEnabled)
+        .count();
+    if (enabledLlapClusters > 1) {
+      props.put(ConfigUtils.HIVE_SERVER2_TEZ_INITIALIZE_DEFAULT_SESSIONS_KEY, 
"false");
+    }
+
     return props;
   }
 

Reply via email to