This is an automated email from the ASF dual-hosted git repository.
ayushtkn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new bb9cbf1da52 HIVE-29679: Update Tez AM K8s Operator Auto-Scaling to
scale down idle AMs (#6561)
bb9cbf1da52 is described below
commit bb9cbf1da52ec11cd15250bd2db1041581080822
Author: Tanishq Chugh <[email protected]>
AuthorDate: Fri Jul 3 17:33:26 2026 +0530
HIVE-29679: Update Tez AM K8s Operator Auto-Scaling to scale down idle AMs
(#6561)
---
.../llap/tezplugins/LlapTaskSchedulerService.java | 9 ++
.../tezplugins/metrics/LlapTaskSchedulerInfo.java | 3 +-
.../metrics/LlapTaskSchedulerMetrics.java | 8 ++
packaging/src/kubernetes/README.md | 16 +--
.../src/kubernetes/helm/hive-operator/Chart.yaml | 2 +-
.../crds/hiveclusters.hive.apache.org-v1.yml | 4 +
.../helm/hive-operator/templates/clusterrole.yaml | 4 +
.../helm/hive-operator/templates/hivecluster.yaml | 4 +-
.../src/kubernetes/helm/hive-operator/values.yaml | 5 +-
packaging/src/kubernetes/pom.xml | 9 +-
.../autoscaling/HiveClusterAutoscaler.java | 80 +++++++++--
.../operator/autoscaling/MetricsScraper.java | 2 +-
.../operator/autoscaling/PrometheusTextParser.java | 4 +-
.../operator/autoscaling/TezAmScalingStrategy.java | 37 +++++
.../operator/autoscaling/TezAmZkDeregister.java | 160 +++++++++++++++++++++
.../operator/dependent/HiveDependentResource.java | 8 +-
.../dependent/HiveServer2DeploymentDependent.java | 1 +
.../operator/dependent/LlapResourceBuilder.java | 148 +++++++++++++++++--
.../dependent/MetastoreDeploymentDependent.java | 1 +
.../operator/dependent/SchemaInitJobDependent.java | 1 +
.../kubernetes/operator/model/HiveClusterSpec.java | 3 +
.../operator/reconciler/HiveClusterReconciler.java | 67 ++++++---
.../hive/kubernetes/operator/util/ConfigUtils.java | 58 ++++++++
.../operator/util/HiveConfigBuilder.java | 10 ++
24 files changed, 579 insertions(+), 65 deletions(-)
diff --git
a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
index 9ac108728fb..c75800c5546 100644
---
a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
+++
b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
@@ -1080,6 +1080,9 @@ public void dagComplete() {
writeLock.lock();
try {
dagRunning = false;
+ if (metrics != null) {
+ metrics.setDagRunning(false);
+ }
dagStats = new StatsPerDag();
int pendingCount = 0;
for (Entry<Priority, List<TaskInfo>> entry : pendingTasks.entrySet()) {
@@ -1173,6 +1176,9 @@ public void allocateTask(Object task, Resource
capability, String[] hosts, Strin
metrics.setDagId(id.getDAGID().toString());
}
dagRunning = true;
+ if (metrics != null) {
+ metrics.setDagRunning(true);
+ }
}
dagStats.registerTaskRequest(hosts, racks);
addPendingTask(taskInfo);
@@ -1194,6 +1200,9 @@ public void allocateTask(Object task, Resource
capability, ContainerId container
metrics.setDagId(id.getDAGID().toString());
}
dagRunning = true;
+ if (metrics != null) {
+ metrics.setDagRunning(true);
+ }
}
dagStats.registerTaskRequest(null, null);
addPendingTask(taskInfo);
diff --git
a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapTaskSchedulerInfo.java
b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapTaskSchedulerInfo.java
index 0750dc6aa7b..10878ea3149 100644
---
a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapTaskSchedulerInfo.java
+++
b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapTaskSchedulerInfo.java
@@ -38,7 +38,8 @@ public enum LlapTaskSchedulerInfo implements MetricsInfo {
SchedulerRunningTaskCount("Total number of running tasks"),
SchedulerPendingPreemptionTaskCount("Total number of tasks pending for
pre-emption"),
SchedulerPreemptedTaskCount("Total number of tasks pre-empted"),
- SchedulerCompletedDagCount("Number of DAGs completed");
+ SchedulerCompletedDagCount("Number of DAGs completed"),
+ SchedulerDagStatus("Current AM operational DAG status: 0 for IDLE, 1 for
RUNNING");
private final String desc;
diff --git
a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapTaskSchedulerMetrics.java
b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapTaskSchedulerMetrics.java
index 0dd645da82c..5c94d0b0270 100644
---
a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapTaskSchedulerMetrics.java
+++
b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapTaskSchedulerMetrics.java
@@ -25,6 +25,7 @@
import static
org.apache.hadoop.hive.llap.tezplugins.metrics.LlapTaskSchedulerInfo.SchedulerPendingPreemptionTaskCount;
import static
org.apache.hadoop.hive.llap.tezplugins.metrics.LlapTaskSchedulerInfo.SchedulerPendingTaskCount;
import static
org.apache.hadoop.hive.llap.tezplugins.metrics.LlapTaskSchedulerInfo.SchedulerPreemptedTaskCount;
+import static
org.apache.hadoop.hive.llap.tezplugins.metrics.LlapTaskSchedulerInfo.SchedulerDagStatus;
import static
org.apache.hadoop.hive.llap.tezplugins.metrics.LlapTaskSchedulerInfo.SchedulerRunningTaskCount;
import static
org.apache.hadoop.hive.llap.tezplugins.metrics.LlapTaskSchedulerInfo.SchedulerSchedulableTaskCount;
import static
org.apache.hadoop.hive.llap.tezplugins.metrics.LlapTaskSchedulerInfo.SchedulerSuccessfulTaskCount;
@@ -84,6 +85,8 @@ public class LlapTaskSchedulerMetrics implements
MetricsSource {
@Metric
MutableCounterInt completedDagcount;
@Metric
+ MutableGaugeInt dagRunning;
+ @Metric
MutableCounterInt pendingPreemptionTasksCount;
@Metric
MutableGaugeInt wmUnusedGuaranteedCount;
@@ -276,6 +279,7 @@ private void getTaskSchedulerStats(MetricsRecordBuilder rb)
{
.addGauge(SchedulerMemoryPerInstance, memoryPerInstance.value())
.addGauge(SchedulerCpuCoresPerInstance, cpuCoresPerInstance.value())
.addGauge(SchedulerDisabledNodeCount, disabledNodeCount.value())
+ .addGauge(SchedulerDagStatus, dagRunning.value())
.addCounter(SchedulerPendingTaskCount, pendingTasksCount.value())
.addCounter(SchedulerSchedulableTaskCount,
schedulableTasksCount.value())
.addCounter(SchedulerRunningTaskCount, runningTasksCount.value())
@@ -285,6 +289,10 @@ private void getTaskSchedulerStats(MetricsRecordBuilder
rb) {
.addCounter(SchedulerCompletedDagCount, completedDagcount.value());
}
+ public void setDagRunning(boolean running) {
+ dagRunning.set(running ? 1 : 0);
+ }
+
public JvmMetrics getJvmMetrics() {
return jvmMetrics;
}
diff --git a/packaging/src/kubernetes/README.md
b/packaging/src/kubernetes/README.md
index 7a9aac5aa12..2202885d459 100644
--- a/packaging/src/kubernetes/README.md
+++ b/packaging/src/kubernetes/README.md
@@ -520,7 +520,7 @@ HS2 routes sessions to clusters server-side based on
admin-defined user/group ru
Each LLAP cluster is fully isolated:
- **Separate LLAP daemon StatefulSet** with independent executor count,
memory, and replicas
-- **Separate TezAM StatefulSet** (one per LLAP cluster) with its own ZooKeeper
registration
+- **Separate TezAM Deployment** (one per LLAP cluster) with its own ZooKeeper
registration
- **Separate autoscaling** — each cluster scales independently based on its
own metrics
- **Shared scratch PVC** (ReadWriteMany) for HS2 ↔ TezAM coordination files
@@ -618,11 +618,11 @@ For the above configuration, the operator creates:
| Resource | Name | Purpose |
|----------|------|---------|
| StatefulSet | `hive-production` | LLAP daemons for production cluster |
-| StatefulSet | `hive-tezam-production` | TezAM for production cluster |
+| Deployment | `hive-tezam-production` | TezAM for production cluster |
| StatefulSet | `hive-analytics` | LLAP daemons for analytics cluster |
-| StatefulSet | `hive-tezam-analytics` | TezAM for analytics cluster |
+| Deployment | `hive-tezam-analytics` | TezAM for analytics cluster |
| StatefulSet | `hive-dev` | LLAP daemons for dev cluster |
-| StatefulSet | `hive-tezam-dev` | TezAM for dev cluster |
+| Deployment | `hive-tezam-dev` | TezAM for dev cluster |
| Service (headless) | `hive-production`, `hive-analytics`, `hive-dev` | LLAP
daemon discovery |
| Service (headless) | `hive-tezam-production`, `hive-tezam-analytics`,
`hive-tezam-dev` | TezAM discovery |
| ConfigMap | `hive-production-config`, etc. | `llap-daemon-site.xml` per
cluster |
@@ -1391,7 +1391,7 @@ setup is needed — simply connect to HS2 and the operator
wakes LLAP/TezAM as n
LLAP is configured as an array (`llapClusters`) to support multi-tenant
deployments with
independent scaling. Each entry creates a separate LLAP StatefulSet, Service,
ConfigMap,
-and a paired TezAM StatefulSet (when `tezAm.enabled: true`).
+and a paired TezAM Deployment (when `tezAm.enabled: true`).
| Value | Default | Description |
|-------|---------|-------------|
@@ -1419,7 +1419,7 @@ Clients connect with just their identity — no
cluster-specific JDBC URL params
### Tez AM
-TezAM is deployed as one StatefulSet per LLAP cluster. The global `tezAm`
section
+TezAM is deployed as one Deployment per LLAP cluster. The global `tezAm`
section
controls shared settings (enabled flag, scratch PVC). Per-LLAP TezAM settings
(replicas, autoscaling) can be overridden in each `llapClusters[].tezAm` entry.
@@ -1587,14 +1587,14 @@ HiveClusterReconciler
|
+-- [Imperative] Per-LLAP-Cluster Resources (for each llapClusters[] entry):
+-- LLAP StatefulSet + headless Service + ConfigMap + PDB
- +-- TezAM StatefulSet + headless Service + ConfigMap (one TezAM per
LLAP cluster)
+ +-- TezAM Deployment + headless Service + ConfigMap (one TezAM per
LLAP cluster)
```
LLAP clusters and their paired TezAM instances are managed imperatively by the
reconciler
(not via JOSDK workflow dependents) because the number of clusters is dynamic
— determined
at runtime from the CR spec. Each `llapClusters[]` entry produces:
- **LLAP**: StatefulSet (`{cluster}-{name}`), headless Service, ConfigMap
(`llap-daemon-site.xml`), PDB
-- **TezAM**: StatefulSet (`{cluster}-tezam-{name}`), headless Service,
ConfigMap (`tez-site.xml`)
+- **TezAM**: Deployment (`{cluster}-tezam-{name}`), headless Service,
ConfigMap (`tez-site.xml`)
All imperative resources are applied via `serverSideApply()`. Removed LLAP
clusters (and
their TezAMs) are garbage-collected automatically using label-based discovery.
diff --git a/packaging/src/kubernetes/helm/hive-operator/Chart.yaml
b/packaging/src/kubernetes/helm/hive-operator/Chart.yaml
index b1e8104b155..8aea42bdca5 100644
--- a/packaging/src/kubernetes/helm/hive-operator/Chart.yaml
+++ b/packaging/src/kubernetes/helm/hive-operator/Chart.yaml
@@ -19,7 +19,7 @@ description: Apache Hive Kubernetes Operator - deploys and
manages Hive clusters
type: application
version: "4.3.0-SNAPSHOT"
appVersion: "4.3.0-SNAPSHOT"
-kubeVersion: ">=1.25.0"
+kubeVersion: ">=1.25.0-0"
keywords:
- hive
- hadoop
diff --git
a/packaging/src/kubernetes/helm/hive-operator/crds/hiveclusters.hive.apache.org-v1.yml
b/packaging/src/kubernetes/helm/hive-operator/crds/hiveclusters.hive.apache.org-v1.yml
index 81947c1c491..9947f44b75b 100644
---
a/packaging/src/kubernetes/helm/hive-operator/crds/hiveclusters.hive.apache.org-v1.yml
+++
b/packaging/src/kubernetes/helm/hive-operator/crds/hiveclusters.hive.apache.org-v1.yml
@@ -692,6 +692,10 @@ spec:
type: string
type: object
x-kubernetes-preserve-unknown-fields: true
+ serviceAccountName:
+ description: "Kubernetes ServiceAccount name for all component
pods.\
+ \ If not specified, pods use the namespace default service
account."
+ type: string
suspend:
description: "When true, the cluster is immediately suspended
(all\
\ components scaled to 0). Set to false to wake a suspended
cluster."
diff --git
a/packaging/src/kubernetes/helm/hive-operator/templates/clusterrole.yaml
b/packaging/src/kubernetes/helm/hive-operator/templates/clusterrole.yaml
index 3b0eb0e8e40..3316ce11054 100644
--- a/packaging/src/kubernetes/helm/hive-operator/templates/clusterrole.yaml
+++ b/packaging/src/kubernetes/helm/hive-operator/templates/clusterrole.yaml
@@ -58,3 +58,7 @@ rules:
- apiGroups: ["policy"]
resources: ["poddisruptionbudgets"]
verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
+ # EndpointSlices: operator manages a custom per-pod-hostname slice for TezAM
DNS
+ - apiGroups: ["discovery.k8s.io"]
+ resources: ["endpointslices"]
+ verbs: ["get", "list", "create", "patch", "delete"]
diff --git
a/packaging/src/kubernetes/helm/hive-operator/templates/hivecluster.yaml
b/packaging/src/kubernetes/helm/hive-operator/templates/hivecluster.yaml
index 67ec6c168fb..278ae40e8df 100644
--- a/packaging/src/kubernetes/helm/hive-operator/templates/hivecluster.yaml
+++ b/packaging/src/kubernetes/helm/hive-operator/templates/hivecluster.yaml
@@ -26,6 +26,9 @@ metadata:
spec:
image: {{ .Values.cluster.image }}
imagePullPolicy: {{ .Values.cluster.imagePullPolicy }}
+ {{- if .Values.cluster.serviceAccountName }}
+ serviceAccountName: {{ .Values.cluster.serviceAccountName }}
+ {{- end }}
metastore:
enabled: {{ .Values.cluster.metastore.enabled }}
@@ -178,7 +181,6 @@ spec:
tezAm:
enabled: {{ .Values.cluster.tezAm.enabled }}
{{- if .Values.cluster.tezAm.enabled }}
- replicas: {{ .Values.cluster.tezAm.replicas }}
scratchStorageSize: {{ .Values.cluster.tezAm.scratchStorageSize | quote }}
{{- if .Values.cluster.tezAm.scratchStorageClassName }}
scratchStorageClassName: {{ .Values.cluster.tezAm.scratchStorageClassName
| quote }}
diff --git a/packaging/src/kubernetes/helm/hive-operator/values.yaml
b/packaging/src/kubernetes/helm/hive-operator/values.yaml
index 26f7eeb5af5..c16f7240ea2 100644
--- a/packaging/src/kubernetes/helm/hive-operator/values.yaml
+++ b/packaging/src/kubernetes/helm/hive-operator/values.yaml
@@ -47,6 +47,10 @@ cluster:
image: "apache/hive:4.3.0-SNAPSHOT"
imagePullPolicy: IfNotPresent
+ # ServiceAccount name for all component pods (HS2, Metastore, LLAP, TezAM,
schema-init).
+ # If empty, pods use the namespace default service account.
+ serviceAccountName: ""
+
# ---------------------------------------------------------------------------
# DATABASE (Required) — RDBMS for the Hive Metastore backend
# ---------------------------------------------------------------------------
@@ -207,7 +211,6 @@ cluster:
# ---------------------------------------------------------------------------
tezAm:
enabled: true
- replicas: 2
scratchStorageSize: "1Gi"
scratchStorageClassName: ""
resources: {}
diff --git a/packaging/src/kubernetes/pom.xml b/packaging/src/kubernetes/pom.xml
index f9e7bd046de..eea5fe87921 100644
--- a/packaging/src/kubernetes/pom.xml
+++ b/packaging/src/kubernetes/pom.xml
@@ -52,10 +52,6 @@
<artifactId>kubernetes-httpclient-vertx</artifactId>
<version>${fabric8.version}</version>
</dependency>
- <dependency>
- <groupId>org.apache.commons</groupId>
- <artifactId>commons-lang3</artifactId>
- </dependency>
<dependency>
<groupId>io.github.java-diff-utils</groupId>
<artifactId>java-diff-utils</artifactId>
@@ -73,6 +69,11 @@
<version>${fabric8.version}</version>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-framework</artifactId>
+ <version>${curator.version}</version>
+ </dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
diff --git
a/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/autoscaling/HiveClusterAutoscaler.java
b/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/autoscaling/HiveClusterAutoscaler.java
index df46ced674c..953052318fd 100644
---
a/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/autoscaling/HiveClusterAutoscaler.java
+++
b/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/autoscaling/HiveClusterAutoscaler.java
@@ -24,6 +24,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.ToIntFunction;
import io.fabric8.kubernetes.client.KubernetesClient;
import org.apache.hive.kubernetes.operator.model.HiveCluster;
@@ -79,7 +80,7 @@ public static void setManagedReplicas(String namespace,
String clusterName,
MANAGED_REPLICAS.put(cacheKey(namespace, clusterName, component),
replicas);
}
- private record PendingScaleDown(int targetReplicas, Instant annotatedAt) {}
+ private record PendingScaleDown(int targetReplicas, Instant annotatedAt,
List<String> podsToDeregister) {}
private final BackgroundMetricsScraper bgScraper;
private final MetricsCache metricsCache;
@@ -112,6 +113,7 @@ public void cleanupCluster(String namespace, String
clusterName) {
autoscalers.keySet().removeIf(k -> k.startsWith(prefix));
lastScaleTimes.keySet().removeIf(k -> k.startsWith(prefix));
pendingScaleDowns.keySet().removeIf(k -> k.startsWith(prefix));
+ TezAmZkDeregister.cleanupCluster(namespace, clusterName);
LOG.info("Cleaned up autoscaler state for {}/{}", namespace, clusterName);
}
@@ -178,8 +180,9 @@ public AutoscalingEvaluation evaluate(HiveCluster cluster,
KubernetesClient clie
} else {
// Pod deletion cost only applies to Deployments (ReplicaSet
controller).
// StatefulSets always scale down by highest ordinal regardless of this
- // annotation. LLAP/TezAM graceful drain is handled by preStop hooks.
- updateDeploymentPodDeletionCost(client, namespace, hs2Metrics,
"hs2_open_sessions");
+ // annotation. LLAP graceful drain is handled by preStop hooks.
+ updateDeploymentPodDeletionCost(client, namespace, hs2Metrics,
+ pm -> pm.metrics().getOrDefault("hs2_open_sessions",
0.0).intValue());
Map<String, Integer> hs2Patches = new HashMap<>();
evaluateComponent(cluster, client, namespace, clusterName,
@@ -190,7 +193,7 @@ public AutoscalingEvaluation evaluate(HiveCluster cluster,
KubernetesClient clie
int currentReplicas = getCurrentReplicas(client, namespace,
clusterName, ConfigUtils.COMPONENT_HIVESERVER2);
if (hs2Patch != null && hs2Patch < currentReplicas) {
// Scale-down: defer to allow deletion-cost annotations to propagate
- pendingScaleDowns.put(hs2Key, new PendingScaleDown(hs2Patch,
Instant.now()));
+ pendingScaleDowns.put(hs2Key, new PendingScaleDown(hs2Patch,
Instant.now(), null));
LOG.info("[hiveserver2] Deferring scale-down to {} (waiting for
deletion-cost propagation)",
hs2Patch);
} else if (hs2Patch != null) {
@@ -251,9 +254,54 @@ public AutoscalingEvaluation evaluate(HiveCluster cluster,
KubernetesClient clie
tezAuto.metricsPort(), tezAuto.metricsScrapeIntervalSeconds());
String tezKey = cacheKey(namespace, clusterName, tezAmComponentKey);
List<PodMetrics> tezMetrics = metricsCache.getOrEmpty(tezKey,
tezAuto.metricsScrapeIntervalSeconds() * 3);
- evaluateComponent(cluster, client, namespace, clusterName,
- tezAmComponentKey, tezAuto,
- perLlapTezAm.replicas(), patches, statuses, tezMetrics);
+
+ int currentTezReplicas = getCurrentReplicas(client, namespace,
clusterName, tezAmComponentKey);
+ PendingScaleDown pending = pendingScaleDowns.get(tezKey);
+ if (pending != null) {
+ Integer appliedTarget = null;
+ if (Duration.between(pending.annotatedAt(),
Instant.now()).toSeconds() >= 2) {
+ TezAmZkDeregister.deregisterIdlePods(namespace, clusterName,
+ cluster.getSpec().zookeeper().quorum(), llapSpec.name(),
pending.podsToDeregister(),
+ cluster.getSpec().hiveServer2().configOverrides());
+ appliedTarget = pending.targetReplicas();
+ patches.put(tezAmComponentKey, appliedTarget);
+ MANAGED_REPLICAS.put(tezKey, appliedTarget);
+ lastScaleTimes.put(tezKey, Instant.now().toString());
+ pendingScaleDowns.remove(tezKey);
+ LOG.info("[{}] Applying deferred scale-down to {} replicas",
tezAmComponentKey, appliedTarget);
+ }
+ evaluateComponent(cluster, client, namespace, clusterName,
+ tezAmComponentKey, tezAuto, perLlapTezAm.replicas(), new
HashMap<>(), statuses, tezMetrics);
+ if (pendingScaleDowns.containsKey(tezKey)) {
+ MANAGED_REPLICAS.put(tezKey, currentTezReplicas);
+ } else if (appliedTarget != null) {
+ MANAGED_REPLICAS.put(tezKey, appliedTarget);
+ }
+ } else {
+ Map<String, Integer> tezCosts =
TezAmScalingStrategy.deletionCostsByPod(tezMetrics);
+ updateDeploymentPodDeletionCost(client, namespace, tezMetrics, pm ->
tezCosts.get(pm.podName()));
+
+ Map<String, Integer> tezPatches = new HashMap<>();
+ evaluateComponent(cluster, client, namespace, clusterName,
+ tezAmComponentKey, tezAuto, perLlapTezAm.replicas(), tezPatches,
statuses, tezMetrics);
+
+ Integer tezPatch = tezPatches.get(tezAmComponentKey);
+ if (tezPatch != null && tezPatch < currentTezReplicas) {
+ // Scale-down: defer to allow deletion-cost annotations to
propagate
+ int busyCount = countBusyPods(tezMetrics);
+ int effectivePatch = Math.max(tezPatch, busyCount);
+ int removeCount = currentTezReplicas - effectivePatch;
+ List<String> podsToDeregister =
TezAmScalingStrategy.podsToRemove(tezMetrics, tezCosts, removeCount);
+ pendingScaleDowns.put(tezKey, new PendingScaleDown(effectivePatch,
Instant.now(), podsToDeregister));
+ MANAGED_REPLICAS.put(tezKey, currentTezReplicas);
+ LOG.info("[{}] Deferring scale-down to {} (waiting for
deletion-cost propagation)",
+ tezAmComponentKey, effectivePatch);
+ } else if (tezPatch != null) {
+ // Scale-up: apply immediately
+ patches.put(tezAmComponentKey, tezPatch);
+ MANAGED_REPLICAS.put(tezKey, tezPatch);
+ }
+ }
}
}
@@ -362,8 +410,7 @@ private int getCurrentReplicas(KubernetesClient client,
String namespace,
} else {
workloadName = clusterName + "-" + component;
}
- if (component.startsWith(ConfigUtils.COMPONENT_LLAP + "-")
- || component.startsWith(ConfigUtils.COMPONENT_TEZAM + "-")) {
+ if (component.startsWith(ConfigUtils.COMPONENT_LLAP + "-")) {
var ss = client.apps().statefulSets()
.inNamespace(namespace).withName(workloadName).get();
return ss != null && ss.getSpec().getReplicas() != null ?
ss.getSpec().getReplicas() : 0;
@@ -375,17 +422,24 @@ private int getCurrentReplicas(KubernetesClient client,
String namespace,
}
}
+ /** Counts TezAM pods with active DAG work. */
+ private int countBusyPods(List<PodMetrics> tezMetrics) {
+ return (int) tezMetrics.stream()
+ .filter(pm -> TezAmScalingStrategy.hasActiveDag(pm.metrics()))
+ .count();
+ }
+
/**
* Patches each pod's deletion cost annotation based on its active session
count.
* Kubernetes uses this during scale-down to kill idle pods first (lower
cost = killed first).
* <p>
- * Only meaningful for Deployments (HS2, Metastore) — the ReplicaSet
controller
+ * Only meaningful for Deployments (HS2, Metastore, TezAM) — the ReplicaSet
controller
* respects this annotation. StatefulSets ignore it and always terminate by
ordinal.
*/
private void updateDeploymentPodDeletionCost(KubernetesClient client, String
namespace,
- List<PodMetrics> metrics, String metricName) {
+ List<PodMetrics> metrics, ToIntFunction<PodMetrics> costFunction) {
for (PodMetrics pm : metrics) {
- int sessions = pm.metrics().getOrDefault(metricName, 0.0).intValue();
+ int cost = costFunction.applyAsInt(pm);
try {
client.pods().inNamespace(namespace).withName(pm.podName())
.edit(pod -> {
@@ -393,7 +447,7 @@ private void
updateDeploymentPodDeletionCost(KubernetesClient client, String nam
pod.getMetadata().setAnnotations(new java.util.HashMap<>());
}
pod.getMetadata().getAnnotations()
- .put("controller.kubernetes.io/pod-deletion-cost",
String.valueOf(sessions));
+ .put("controller.kubernetes.io/pod-deletion-cost",
String.valueOf(cost));
return pod;
});
} catch (Exception e) {
diff --git
a/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/autoscaling/MetricsScraper.java
b/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/autoscaling/MetricsScraper.java
index c3aa8aa82ca..20d6474354e 100644
---
a/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/autoscaling/MetricsScraper.java
+++
b/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/autoscaling/MetricsScraper.java
@@ -132,7 +132,7 @@ private CompletableFuture<String> fetchMetricsAsync(String
podIp, int metricsPor
});
}
- private static boolean isPodReady(Pod pod) {
+ public static boolean isPodReady(Pod pod) {
if (pod.getStatus() == null || pod.getStatus().getConditions() == null) {
return false;
}
diff --git
a/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/autoscaling/PrometheusTextParser.java
b/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/autoscaling/PrometheusTextParser.java
index babee17aa77..8940a80b193 100644
---
a/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/autoscaling/PrometheusTextParser.java
+++
b/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/autoscaling/PrometheusTextParser.java
@@ -24,8 +24,6 @@
import java.util.HashMap;
import java.util.Map;
-import org.apache.commons.lang3.StringUtils;
-
/**
* Parses Prometheus text exposition format (from JMX Exporter /metrics).
* Only extracts metric name → value pairs; labels are stripped.
@@ -55,7 +53,7 @@ public static Map<String, Double> parseWithLabels(String
body) {
private static Map<String, Double> doParse(String body, boolean keepLabels) {
Map<String, Double> result = new HashMap<>();
- if (StringUtils.isEmpty(body)) {
+ if (body == null || body.isEmpty()) {
return result;
}
try (BufferedReader reader = new BufferedReader(new StringReader(body))) {
diff --git
a/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/autoscaling/TezAmScalingStrategy.java
b/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/autoscaling/TezAmScalingStrategy.java
index bc43905a1b3..64a6f0d8306 100644
---
a/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/autoscaling/TezAmScalingStrategy.java
+++
b/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/autoscaling/TezAmScalingStrategy.java
@@ -18,7 +18,10 @@
package org.apache.hive.kubernetes.operator.autoscaling;
+import java.util.Comparator;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import org.apache.hive.kubernetes.operator.model.HiveCluster;
import org.apache.hive.kubernetes.operator.model.spec.AutoscalingSpec;
@@ -32,10 +35,15 @@
* <p>
* Uses the per-target session metric from HS2:
hs2_llap_target_sessions_{llapName}.
* Falls back to hs2_open_sessions if per-target metrics are not available.
+ * <p>
+ * Scale-down uses {@link #METRIC_DAG_RUNNING} from each TezAM pod's scraped
+ * {@link PodMetrics} to rank idle AMs for safe termination.
*/
public class TezAmScalingStrategy implements ScalingStrategy {
private static final Logger LOG =
LoggerFactory.getLogger(TezAmScalingStrategy.class);
+ public static final String METRIC_DAG_RUNNING = "tez_am_dag_running";
+ public static final int BUSY_DELETION_COST = Integer.MAX_VALUE;
private final HiveClusterAutoscaler orchestrator;
private final HiveCluster cluster;
@@ -115,4 +123,33 @@ public int lastMetricValue() {
public boolean usesScaleUpThreshold() {
return false;
}
+
+ public static boolean hasActiveDag(Map<String, Double> metrics) {
+ return metrics.getOrDefault(METRIC_DAG_RUNNING, 0.0) > 0;
+ }
+
+ public static Map<String, Integer> deletionCostsByPod(List<PodMetrics>
metrics) {
+ Map<String, Integer> costs = new HashMap<>();
+ int idleCost = 0;
+ for (PodMetrics pm : metrics) {
+ if (hasActiveDag(pm.metrics())) {
+ costs.put(pm.podName(), BUSY_DELETION_COST);
+ } else {
+ costs.put(pm.podName(), idleCost++);
+ }
+ }
+ return costs;
+ }
+
+ public static List<String> podsToRemove(List<PodMetrics> metrics,
+ Map<String, Integer> costs, int removeCount) {
+ if (removeCount <= 0) {
+ return List.of();
+ }
+ return metrics.stream()
+ .sorted(Comparator.comparingInt(pm -> costs.get(pm.podName())))
+ .limit(removeCount)
+ .map(PodMetrics::podName)
+ .toList();
+ }
}
diff --git
a/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/autoscaling/TezAmZkDeregister.java
b/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/autoscaling/TezAmZkDeregister.java
new file mode 100644
index 00000000000..353b7fc8a9a
--- /dev/null
+++
b/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/autoscaling/TezAmZkDeregister.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.kubernetes.operator.autoscaling;
+
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.hive.kubernetes.operator.util.ConfigUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Deletes idle TezAM ZooKeeper registration nodes before a scale-down is
applied.
+ * <p>
+ * A {@link CuratorFramework} client is cached per HiveCluster and reused
across
+ * scale-downs; it is closed when the cluster is deleted or ZK config changes.
+ */
+public final class TezAmZkDeregister {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(TezAmZkDeregister.class);
+ private static final String PATH_SEPARATOR = "/";
+
+ private record ZkClientRecord(String zkQuorum, int connTimeoutMs, int
sessionTimeoutMs,
+ int baseSleepMs, int maxRetries, CuratorFramework client) {}
+
+ private static final ConcurrentHashMap<String, ZkClientRecord> ZK_CLIENTS =
new ConcurrentHashMap<>();
+
+ private TezAmZkDeregister() {}
+
+ public static void deregisterIdlePods(String namespace, String clusterName,
String zkQuorum, String llapName,
+ List<String> idlePodNames, Map<String, String> hiveSiteConfig) {
+ if (idlePodNames == null || idlePodNames.isEmpty()) {
+ return;
+ }
+ ZkClientRecord zkRecord = getZKRecord(namespace, clusterName, zkQuorum,
hiveSiteConfig);
+ CuratorFramework client = zkRecord.client();
+ try {
+ if (!client.blockUntilConnected(zkRecord.connTimeoutMs(),
TimeUnit.MILLISECONDS)) {
+ LOG.warn("[tezam-{}] ZK connect timeout — skipping deregistration",
llapName);
+ return;
+ }
+ String registryPath = ConfigUtils.TEZ_EXTERNAL_SESSIONS_ZK_PREFIX +
PATH_SEPARATOR + llapName;
+ if (client.checkExists().forPath(registryPath) == null) {
+ return;
+ }
+ for (String appId : client.getChildren().forPath(registryPath)) {
+ String nodePath = registryPath + PATH_SEPARATOR + appId;
+ byte[] data = client.getData().forPath(nodePath);
+ if (data == null || data.length == 0) {
+ continue;
+ }
+ String hostName = ConfigUtils.getJsonStringField(new String(data,
StandardCharsets.UTF_8), "hostName");
+ if (hostName != null) {
+ // hostName = "<podName>.<svcName>.<ns>.svc.cluster.local"
+ String podName = hostName.contains(".") ? hostName.substring(0,
hostName.indexOf('.')) : hostName;
+ if (idlePodNames.contains(podName)) {
+ deleteRegistrationNode(client, llapName, nodePath, podName, appId);
+ }
+ }
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOG.warn("[tezam-{}] ZK deregistration interrupted: {}", llapName,
e.getMessage());
+ } catch (Exception e) {
+ LOG.warn("[tezam-{}] ZK deregistration error: {}", llapName,
e.getMessage());
+ }
+ }
+
+ private static ZkClientRecord getZKRecord(String namespace, String
clusterName,
+ String zkQuorum, Map<String, String> hiveSiteConfig) {
+ int connTimeoutMs = ConfigUtils.getTimeMs(hiveSiteConfig,
ConfigUtils.HIVE_ZOOKEEPER_CONNECTION_TIMEOUT_KEY,
+ ConfigUtils.HIVE_ZOOKEEPER_CONNECTION_TIMEOUT_DEFAULT_MS);
+ int sessionTimeoutMs = ConfigUtils.getTimeMs(hiveSiteConfig,
ConfigUtils.HIVE_ZOOKEEPER_SESSION_TIMEOUT_KEY,
+ ConfigUtils.HIVE_ZOOKEEPER_SESSION_TIMEOUT_DEFAULT_MS);
+ int baseSleepMs = ConfigUtils.getTimeMs(hiveSiteConfig,
ConfigUtils.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME_KEY,
+ ConfigUtils.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME_DEFAULT_MS);
+ int maxRetries = ConfigUtils.getInt(hiveSiteConfig,
ConfigUtils.HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES_KEY,
+ null, ConfigUtils.HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES_DEFAULT);
+
+ synchronized (ZK_CLIENTS) {
+ String key = namespace + PATH_SEPARATOR + clusterName;
+ ZkClientRecord cachedRecord = ZK_CLIENTS.get(key);
+ if (cachedRecord != null && cachedRecord.zkQuorum().equals(zkQuorum)
+ && cachedRecord.connTimeoutMs() == connTimeoutMs
+ && cachedRecord.sessionTimeoutMs() == sessionTimeoutMs
+ && cachedRecord.baseSleepMs() == baseSleepMs
+ && cachedRecord.maxRetries() == maxRetries) {
+ return cachedRecord;
+ }
+ if (cachedRecord != null) {
+ closeClient(cachedRecord.client());
+ LOG.info("Recreating cached ZK client for {} (quorum or connection
settings changed)", key);
+ }
+ CuratorFramework client = CuratorFrameworkFactory.builder()
+ .connectString(zkQuorum)
+ .connectionTimeoutMs(connTimeoutMs)
+ .sessionTimeoutMs(sessionTimeoutMs)
+ .retryPolicy(new ExponentialBackoffRetry(baseSleepMs, maxRetries))
+ .build();
+ client.start();
+ ZkClientRecord zkRecord = new ZkClientRecord(zkQuorum, connTimeoutMs,
sessionTimeoutMs,
+ baseSleepMs, maxRetries, client);
+ ZK_CLIENTS.put(key, zkRecord);
+ return zkRecord;
+ }
+ }
+
+
+ private static void deleteRegistrationNode(CuratorFramework client, String
llapName,
+ String nodePath, String podName, String appId) {
+ try {
+ client.delete().forPath(nodePath);
+ LOG.info("[tezam-{}] Deregistered pod {} (appId={}) from ZK — HS2 will
stop routing to it",
+ llapName, podName, appId);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOG.warn("[tezam-{}] Failed to delete ZK node for pod {} (interrupted):
{}",
+ llapName, podName, e.getMessage());
+ } catch (Exception e) {
+ LOG.warn("[tezam-{}] Failed to delete ZK node for pod {}: {}", llapName,
podName, e.getMessage());
+ }
+ }
+
+ private static void closeClient(CuratorFramework client) {
+ try {
+ client.close();
+ } catch (Exception e) {
+ LOG.debug("Error closing ZK client: {}", e.getMessage());
+ }
+ }
+
+ public static void cleanupCluster(String namespace, String clusterName) {
+ ZkClientRecord zkRecord = ZK_CLIENTS.remove(namespace + PATH_SEPARATOR +
clusterName);
+ if (zkRecord != null) {
+ closeClient(zkRecord.client());
+ }
+ }
+}
diff --git
a/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/dependent/HiveDependentResource.java
b/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/dependent/HiveDependentResource.java
index 2315b455d76..8133f369e66 100644
---
a/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/dependent/HiveDependentResource.java
+++
b/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/dependent/HiveDependentResource.java
@@ -855,9 +855,11 @@ private static String buildJmxExporterConfig(String
component) {
sb.append(" type: GAUGE\n");
break;
case ConfigUtils.COMPONENT_TEZAM:
- // TezAM DAG execution metrics
- sb.append("- pattern: 'Hadoop<service=TezAppMaster,
name=TezAppMaster><>(.+)'\n");
- sb.append(" name: tez_am_$1\n");
+ // LlapMetricsSystem registers its beans under the LlapTaskScheduler service.
+ // SchedulerDagStatus tracks whether the AM is running a DAG or is idle, so it is
+ // exported as a GAUGE.
+ String schedulerBean = "Hadoop<service=LlapTaskScheduler,
name=LlapTaskSchedulerMetrics.+><>";
+ sb.append("- pattern:
'").append(schedulerBean).append("SchedulerDagStatus'\n");
+ sb.append(" name: tez_am_dag_running\n");
sb.append(" type: GAUGE\n");
break;
default:
diff --git
a/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/dependent/HiveServer2DeploymentDependent.java
b/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/dependent/HiveServer2DeploymentDependent.java
index 3afb0af118d..4ef8a21693e 100644
---
a/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/dependent/HiveServer2DeploymentDependent.java
+++
b/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/dependent/HiveServer2DeploymentDependent.java
@@ -251,6 +251,7 @@ protected Deployment desired(HiveCluster hiveCluster,
.addToAnnotations("hive.apache.org/config-hash", configHash)
.endMetadata()
.withNewSpec()
+ .withServiceAccountName(spec.serviceAccountName())
.withInitContainers(initContainers)
.addNewContainer()
.withName(COMPONENT)
diff --git
a/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/dependent/LlapResourceBuilder.java
b/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/dependent/LlapResourceBuilder.java
index 50490119716..1af7df3669b 100644
---
a/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/dependent/LlapResourceBuilder.java
+++
b/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/dependent/LlapResourceBuilder.java
@@ -19,6 +19,7 @@
package org.apache.hive.kubernetes.operator.dependent;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -31,16 +32,25 @@
import io.fabric8.kubernetes.api.model.OwnerReference;
import io.fabric8.kubernetes.api.model.OwnerReferenceBuilder;
import io.fabric8.kubernetes.api.model.IntOrString;
+import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.Probe;
import io.fabric8.kubernetes.api.model.Service;
import io.fabric8.kubernetes.api.model.ServiceBuilder;
import io.fabric8.kubernetes.api.model.Volume;
import io.fabric8.kubernetes.api.model.VolumeMount;
+import io.fabric8.kubernetes.api.model.apps.Deployment;
+import io.fabric8.kubernetes.api.model.apps.DeploymentBuilder;
import io.fabric8.kubernetes.api.model.apps.StatefulSet;
import io.fabric8.kubernetes.api.model.apps.StatefulSetBuilder;
+import io.fabric8.kubernetes.api.model.discovery.v1.Endpoint;
+import io.fabric8.kubernetes.api.model.discovery.v1.EndpointBuilder;
+import io.fabric8.kubernetes.api.model.discovery.v1.EndpointSlice;
+import io.fabric8.kubernetes.api.model.discovery.v1.EndpointSliceBuilder;
import io.fabric8.kubernetes.api.model.policy.v1.PodDisruptionBudget;
import io.fabric8.kubernetes.api.model.policy.v1.PodDisruptionBudgetBuilder;
import io.javaoperatorsdk.operator.api.reconciler.Context;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hive.kubernetes.operator.autoscaling.TezAmScalingStrategy;
import org.apache.hive.kubernetes.operator.model.HiveCluster;
import org.apache.hive.kubernetes.operator.model.HiveClusterSpec;
import org.apache.hive.kubernetes.operator.model.spec.AutoscalingSpec;
@@ -50,6 +60,8 @@
import org.apache.hive.kubernetes.operator.util.HiveConfigBuilder;
import org.apache.hive.kubernetes.operator.util.Labels;
+import static
org.apache.hive.kubernetes.operator.autoscaling.MetricsScraper.isPodReady;
+
/**
* Static builder methods for LLAP Kubernetes resources.
* Used by the reconciler to imperatively manage multiple LLAP clusters.
@@ -193,7 +205,7 @@ public static PodDisruptionBudget buildPdb(HiveCluster hc,
LlapSpec llap) {
// --- TezAM resource builders (one TezAM per LLAP cluster) ---
- /** TezAM StatefulSet name for a specific LLAP cluster. */
+ /** TezAM Deployment/Service name for a specific LLAP cluster. */
public static String tezAmResourceName(HiveCluster hc, LlapSpec llap) {
return hc.getMetadata().getName() + TEZAM_INFIX + llap.name();
}
@@ -229,9 +241,94 @@ public static PodDisruptionBudget
buildTezAmPdb(HiveCluster hc, LlapSpec llap) {
.build();
}
- /** Builds the TezAM StatefulSet for a specific LLAP cluster. */
- public static StatefulSet buildTezAmStatefulSet(HiveCluster hc, LlapSpec
llap, Integer replicas) {
- return INSTANCE.doBuildTezAmStatefulSet(hc, llap, replicas);
+ /**
+  * Builds the TezAM Deployment for a specific LLAP cluster.
+  * Delegates to the shared builder instance.
+  *
+  * @param hc the HiveCluster the Deployment is built for
+  * @param llap the LLAP cluster this TezAM belongs to
+  * @param replicas replica count passed through to the Deployment builder
+  * @return the TezAM Deployment
+  */
+ public static Deployment buildTezAmDeployment(HiveCluster hc, LlapSpec llap,
Integer replicas) {
+ return INSTANCE.doBuildTezAmDeployment(hc, llap, replicas);
+ }
+
+ /**
+  * Returns the name of the operator-managed EndpointSlice that provides per-pod DNS for TezAM:
+  * the TezAM resource name with a {@code -hostnames} suffix.
+  * CoreDNS uses this slice to create {@code <pod-name>.<svc>.<ns>.svc.cluster.local} A-records.
+  *
+  * @param hc the HiveCluster the TezAM belongs to
+  * @param llap the LLAP cluster the TezAM belongs to
+  * @return the EndpointSlice name
+  */
+ public static String tezAmEndpointSliceName(HiveCluster hc, LlapSpec llap) {
+ return tezAmResourceName(hc, llap) + "-hostnames";
+ }
+
+  /**
+   * Builds a custom EndpointSlice for the TezAM headless Service.
+   * <p>
+   * Kubernetes only creates per-pod DNS records
+   * ({@code <pod>.<svc>.<ns>.svc.cluster.local}) when the Endpoints/EndpointSlice carries a
+   * {@code hostname} for each address. The default EndpointSlice controller omits
+   * {@code hostname} for Deployment pods (it only sets it automatically for StatefulSet pods),
+   * so this operator-managed slice fills that gap.
+   * <p>
+   * Every pod that reports a pod IP contributes one endpoint; pods without an IP are skipped.
+   * The slice's address type is that of the first pod IP encountered.
+   *
+   * @param hc the HiveCluster the slice is built for (namespace, labels, owner reference)
+   * @param llap the LLAP cluster whose TezAM Service the slice is attached to
+   * @param pods list of TezAM pods
+   * @return the EndpointSlice, or {@code null} if there are no pod IPs yet or if pods have
+   *     mixed IPv4/IPv6 addresses
+   */
+  public static EndpointSlice buildTezAmEndpointSlice(HiveCluster hc, LlapSpec llap,
+      List<Pod> pods) {
+    String namespace = hc.getMetadata().getNamespace();
+    String serviceName = tezAmResourceName(hc, llap);
+    Map<String, String> sliceLabels = new HashMap<>(Map.of(
+        "kubernetes.io/service-name", serviceName,
+        "endpointslice.kubernetes.io/managed-by", "hive-kubernetes-operator",
+        Labels.MANAGED_BY, Labels.MANAGED_BY_VALUE,
+        Labels.APP_INSTANCE, hc.getMetadata().getName(),
+        Labels.APP_COMPONENT, ConfigUtils.COMPONENT_TEZAM));
+
+    String sliceFamily = null;
+    List<Endpoint> members = new ArrayList<>();
+    for (Pod pod : pods) {
+      String podIp = pod.getStatus() == null ? null : pod.getStatus().getPodIP();
+      if (podIp == null || podIp.isEmpty()) {
+        continue;
+      }
+      // An EndpointSlice holds a single address family; give up on a mixed set.
+      String family = ipAddressType(podIp);
+      if (sliceFamily != null && !sliceFamily.equals(family)) {
+        return null;
+      }
+      sliceFamily = family;
+      members.add(toTezAmEndpoint(pod, podIp, namespace));
+    }
+
+    if (members.isEmpty()) {
+      return null;
+    }
+
+    return new EndpointSliceBuilder()
+        .withNewMetadata()
+          .withName(tezAmEndpointSliceName(hc, llap))
+          .withNamespace(namespace)
+          .withLabels(sliceLabels)
+          .withOwnerReferences(ownerRef(hc))
+        .endMetadata()
+        .withAddressType(sliceFamily)
+        .withEndpoints(members)
+        .build();
+  }
+
+  /**
+   * Converts one TezAM pod with a known IP into an endpoint entry. The hostname is the pod
+   * name, cut to 63 characters with trailing dashes stripped when it is longer than that.
+   */
+  private static Endpoint toTezAmEndpoint(Pod pod, String podIp, String namespace) {
+    boolean ready = isPodReady(pod);
+    String podName = pod.getMetadata().getName();
+    String dnsLabel = podName.length() <= 63
+        ? podName
+        : StringUtils.stripEnd(podName.substring(0, 63), "-");
+    return new EndpointBuilder()
+        .withHostname(dnsLabel)
+        .withAddresses(podIp)
+        .withNewConditions()
+          .withReady(ready)
+          .withServing(ready)
+          .withTerminating(false)
+        .endConditions()
+        .withNewTargetRef()
+          .withKind("Pod")
+          .withNamespace(namespace)
+          .withName(pod.getMetadata().getName())
+        .endTargetRef()
+        .build();
+  }
+
+  /**
+   * Classifies a pod IP string by address family.
+   *
+   * @param ip pod IP string
+   * @return {@code IPv6} if the string contains a colon, otherwise {@code IPv4}
+   */
+  private static String ipAddressType(String ip) {
+    return ip.contains(":") ? "IPv6" : "IPv4";
   }
/** Builds the headless Service for a TezAM cluster. */
@@ -274,11 +371,11 @@ public static ConfigMap buildTezAmConfigMap(HiveCluster
hc, LlapSpec llap) {
// --- Private instance methods that use protected helpers from
HiveDependentResource ---
- private StatefulSet doBuildTezAmStatefulSet(HiveCluster hiveCluster,
LlapSpec llap,
+ private Deployment doBuildTezAmDeployment(HiveCluster hiveCluster, LlapSpec
llap,
Integer replicas) {
HiveClusterSpec spec = hiveCluster.getSpec();
String ns = hiveCluster.getMetadata().getNamespace();
- String ssName = tezAmResourceName(hiveCluster, llap);
+ String deployName = tezAmResourceName(hiveCluster, llap);
Map<String, String> allLabels = Labels.forTezAmCluster(hiveCluster,
llap.name());
Map<String, String> selectorLabels =
Labels.selectorForTezAmCluster(hiveCluster, llap.name());
@@ -323,22 +420,26 @@ private StatefulSet doBuildTezAmStatefulSet(HiveCluster
hiveCluster, LlapSpec ll
replaceConfMountWithSubPaths(volumeMounts, HIVE_CONFIG_VOLUME,
"hive-site.xml", "tez-site.xml", "core-site.xml");
+ AutoscalingSpec tezAutoscaling = llap.tezAm().autoscaling();
+ if (tezAutoscaling.isEnabled()) {
+ addJmxExporter(spec.image(), ConfigUtils.COMPONENT_TEZAM,
tezAutoscaling.metricsPort(),
+ initContainers, volumeMounts, volumes, envVars, ports);
+ }
+
String configHash = sha256(
HadoopXmlBuilder.buildXml(HiveConfigBuilder.getHiveServer2HiveSite(hiveCluster,
spec)),
HadoopXmlBuilder.buildXml(HiveConfigBuilder.getTezSite(spec, llap)),
HadoopXmlBuilder.buildXml(HiveConfigBuilder.getHadoopCoreSite(spec)));
- StatefulSet statefulSet = new StatefulSetBuilder()
+ Deployment deployment = new DeploymentBuilder()
.withNewMetadata()
- .withName(ssName)
+ .withName(deployName)
.withNamespace(ns)
.withLabels(allLabels)
.withOwnerReferences(ownerRef(hiveCluster))
.endMetadata()
.withNewSpec()
.withReplicas(replicas)
- .withPodManagementPolicy("Parallel")
- .withServiceName(ssName)
.withNewSelector()
.withMatchLabels(selectorLabels)
.endSelector()
@@ -350,6 +451,8 @@ private StatefulSet doBuildTezAmStatefulSet(HiveCluster
hiveCluster, LlapSpec ll
.addToAnnotations("hive.apache.org/config-hash", configHash)
.endMetadata()
.withNewSpec()
+ .withServiceAccountName(spec.serviceAccountName())
+ .withSubdomain(deployName)
.withInitContainers(initContainers)
.addNewContainer()
.withName(ConfigUtils.COMPONENT_TEZAM)
@@ -367,13 +470,31 @@ private StatefulSet doBuildTezAmStatefulSet(HiveCluster
hiveCluster, LlapSpec ll
.build();
applySpreadAffinityIfAbsent(
- statefulSet.getSpec().getTemplate().getSpec(), selectorLabels);
+ deployment.getSpec().getTemplate().getSpec(), selectorLabels);
- appendUserVolumes(statefulSet.getSpec().getTemplate().getSpec(),
+ appendUserVolumes(deployment.getSpec().getTemplate().getSpec(),
spec.volumes(), spec.volumeMounts(),
spec.tezAm().extraVolumes(), spec.tezAm().extraVolumeMounts());
- return statefulSet;
+ // When autoscaling is enabled, add a preStop hook that waits for any in-flight DAG
+ // to complete before exiting. The operator has already deleted the ZK registration
+ // node (see HiveClusterAutoscaler), so no new DAGs can arrive. If a DAG slips in
+ // during the brief race window before the ZK delete, we wait for it to finish.
+ // Once tez_am_dag_running reaches 0, the hook exits and Kubernetes terminates the pod.
+ if (tezAutoscaling.isEnabled()) {
+ String preStopScript = buildDrainScript(
+ "Waiting for active DAG to complete",
+ TezAmScalingStrategy.METRIC_DAG_RUNNING, "DAG",
+ "TezAM is idle. preStop complete, K8s will terminate pod.",
+ 10, 6, null, tezAutoscaling.metricsPort());
+ applyAutoscalingLifecycle(
+ deployment.getSpec().getTemplate().getSpec(),
+ deployment.getSpec().getTemplate().getMetadata(),
+ preStopScript, tezAutoscaling.gracePeriodSeconds(),
+ tezAutoscaling.metricsScrapeIntervalSeconds());
+ }
+
+ return deployment;
}
private StatefulSet doBuildStatefulSet(HiveCluster hiveCluster, LlapSpec
llap, Integer replicas) {
@@ -478,6 +599,7 @@ private StatefulSet doBuildStatefulSet(HiveCluster
hiveCluster, LlapSpec llap, I
.addToAnnotations("hive.apache.org/config-hash", configHash)
.endMetadata()
.withNewSpec()
+ .withServiceAccountName(spec.serviceAccountName())
.withInitContainers(initContainers)
.addNewContainer()
.withName(ConfigUtils.COMPONENT_LLAP)
diff --git
a/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/dependent/MetastoreDeploymentDependent.java
b/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/dependent/MetastoreDeploymentDependent.java
index ff19afd5c02..7a8616c3559 100644
---
a/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/dependent/MetastoreDeploymentDependent.java
+++
b/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/dependent/MetastoreDeploymentDependent.java
@@ -161,6 +161,7 @@ protected Deployment desired(HiveCluster hiveCluster,
.addToAnnotations("hive.apache.org/config-hash", configHash)
.endMetadata()
.withNewSpec()
+ .withServiceAccountName(spec.serviceAccountName())
.withInitContainers(initContainers)
.addNewContainer()
.withName(COMPONENT)
diff --git
a/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/dependent/SchemaInitJobDependent.java
b/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/dependent/SchemaInitJobDependent.java
index fb4b588401c..d68af96e4f0 100644
---
a/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/dependent/SchemaInitJobDependent.java
+++
b/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/dependent/SchemaInitJobDependent.java
@@ -128,6 +128,7 @@ protected Job desired(HiveCluster hiveCluster,
hiveCluster, COMPONENT))
.endMetadata()
.withNewSpec()
+ .withServiceAccountName(spec.serviceAccountName())
.withRestartPolicy("OnFailure")
.withInitContainers(initContainers)
.addNewContainer()
diff --git
a/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/model/HiveClusterSpec.java
b/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/model/HiveClusterSpec.java
index b9c0faf42c5..cb1428826bb 100644
---
a/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/model/HiveClusterSpec.java
+++
b/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/model/HiveClusterSpec.java
@@ -82,6 +82,9 @@ public record HiveClusterSpec(
+ "(e.g., mounting a GCS key file at /etc/gcs/key.json)")
@SchemaFrom(type = Object[].class) @PreserveUnknownFields
List<VolumeMount> volumeMounts,
+ @JsonPropertyDescription("Kubernetes ServiceAccount name for all component
pods. "
+ + "If not specified, pods use the namespace default service account.")
+ String serviceAccountName,
@JsonPropertyDescription("Auto-suspend configuration. When enabled and all
components "
+ "are idle for the configured timeout, the cluster scales to 0
replicas.")
AutoSuspendSpec autoSuspend,
diff --git
a/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/reconciler/HiveClusterReconciler.java
b/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/reconciler/HiveClusterReconciler.java
index 1bdc6a4ebe2..1afb813aad4 100644
---
a/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/reconciler/HiveClusterReconciler.java
+++
b/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/reconciler/HiveClusterReconciler.java
@@ -30,9 +30,11 @@
import io.fabric8.kubernetes.api.model.Condition;
import io.fabric8.kubernetes.api.model.HasMetadata;
+import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.apps.Deployment;
import io.fabric8.kubernetes.api.model.apps.StatefulSet;
import io.fabric8.kubernetes.api.model.batch.v1.Job;
+import io.fabric8.kubernetes.api.model.discovery.v1.EndpointSlice;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.javaoperatorsdk.operator.api.reconciler.Context;
import io.javaoperatorsdk.operator.api.reconciler.Cleaner;
@@ -312,7 +314,7 @@ private HiveClusterStatus buildStatus(HiveCluster resource,
? perLlapTezAm.autoscaling().minReplicas()
: perLlapTezAm.replicas();
tezAmStatuses.put(llapSpec.name(),
- buildComponentStatus(context, StatefulSet.class, tezAmSsName,
+ buildComponentStatus(context, Deployment.class, tezAmSsName,
perLlapTezAm.replicas(), tezAmMin));
}
status.setTezAmClusters(tezAmStatuses);
@@ -562,8 +564,7 @@ private void patchReplicas(KubernetesClient client,
HiveCluster resource,
workloadName = resource.getMetadata().getName() + "-" + component;
}
try {
- if (component.startsWith(ConfigUtils.COMPONENT_LLAP + "-")
- || component.startsWith(ConfigUtils.COMPONENT_TEZAM + "-")) {
+ if (component.startsWith(ConfigUtils.COMPONENT_LLAP + "-")) {
client.apps().statefulSets().inNamespace(namespace).withName(workloadName).scale(replicas);
} else {
client.apps().deployments().inNamespace(namespace).withName(workloadName).scale(replicas);
@@ -586,7 +587,8 @@ private void patchSuspendSpec(KubernetesClient client,
HiveCluster resource, boo
oldSpec.hiveServer2(), oldSpec.llapClusters(),
oldSpec.llapClusterRouting(),
oldSpec.tezAm(), oldSpec.zookeeper(),
oldSpec.hadoop(), oldSpec.envVars(), oldSpec.externalJars(),
- oldSpec.volumes(), oldSpec.volumeMounts(),
oldSpec.autoSuspend(), suspend);
+ oldSpec.volumes(), oldSpec.volumeMounts(),
oldSpec.serviceAccountName(),
+ oldSpec.autoSuspend(), suspend);
hc.setSpec(newSpec);
return hc;
});
@@ -642,10 +644,11 @@ private void reconcileLlapClusters(HiveCluster resource,
KubernetesClient client
client.services().inNamespace(ns)
.resource(LlapResourceBuilder.buildTezAmService(resource,
llapSpec))
.serverSideApply();
- client.apps().statefulSets().inNamespace(ns)
- .resource(LlapResourceBuilder.buildTezAmStatefulSet(resource,
llapSpec, tezAmReplicas))
+ client.apps().deployments().inNamespace(ns)
+ .resource(LlapResourceBuilder.buildTezAmDeployment(resource,
llapSpec, tezAmReplicas))
.forceConflicts()
.serverSideApply();
+ reconcileTezAmEndpointSlice(resource, client, llapSpec);
if (llapSpec.tezAm().autoscaling().isEnabled()) {
client.policy().v1().podDisruptionBudget().inNamespace(ns)
.resource(LlapResourceBuilder.buildTezAmPdb(resource, llapSpec))
@@ -743,24 +746,56 @@ private void garbageCollectLlapResources(KubernetesClient
client, String ns,
Labels.APP_INSTANCE, clusterName,
Labels.APP_COMPONENT, ConfigUtils.COMPONENT_TEZAM);
-
client.apps().statefulSets().inNamespace(ns).withLabels(tezamSelector).list().getItems()
+
client.apps().deployments().inNamespace(ns).withLabels(tezamSelector).list().getItems()
.stream()
- .filter(ss -> {
- String llapName =
ss.getMetadata().getLabels().get(Labels.LLAP_CLUSTER);
+ .filter(d -> {
+ String llapName =
d.getMetadata().getLabels().get(Labels.LLAP_CLUSTER);
return llapName != null && !desiredNames.contains(llapName);
})
- .forEach(ss -> {
- String llapName =
ss.getMetadata().getLabels().get(Labels.LLAP_CLUSTER);
+ .forEach(d -> {
+ String llapName =
d.getMetadata().getLabels().get(Labels.LLAP_CLUSTER);
LOG.info("Garbage-collecting TezAM for LLAP cluster '{}' in {}/{}",
llapName, ns, clusterName);
-
client.apps().statefulSets().inNamespace(ns).withName(ss.getMetadata().getName()).delete();
-
client.services().inNamespace(ns).withName(ss.getMetadata().getName()).delete();
+
client.apps().deployments().inNamespace(ns).withName(d.getMetadata().getName()).delete();
+
client.services().inNamespace(ns).withName(d.getMetadata().getName()).delete();
client.configMaps().inNamespace(ns)
- .withName(ss.getMetadata().getName() + "-config").delete();
+ .withName(d.getMetadata().getName() + "-config").delete();
client.policy().v1().podDisruptionBudget().inNamespace(ns)
- .withName(ss.getMetadata().getName() + "-pdb").delete();
+ .withName(d.getMetadata().getName() + "-pdb").delete();
+ client.discovery().v1().endpointSlices().inNamespace(ns)
+ .withName(d.getMetadata().getName() + "-hostnames").delete();
});
}
+  /**
+   * Maintains an operator-managed EndpointSlice for the TezAM headless Service.
+   * <p>
+   * The default EndpointSlice controller does not set the {@code hostname} field for
+   * Deployment pods, so CoreDNS creates no per-pod DNS records. This method lists the TezAM
+   * pods of the given LLAP cluster and applies a slice (managed-by=hive-kubernetes-operator)
+   * built from them, giving CoreDNS the data it needs to serve
+   * {@code <pod>.<svc>.<ns>.svc.cluster.local} A-records.
+   * <p>
+   * When no slice can be built, any existing slice is deleted. When the existing slice has a
+   * different address type from the desired one, it is deleted before the new one is applied.
+   */
+  private void reconcileTezAmEndpointSlice(HiveCluster resource, KubernetesClient client,
+      LlapSpec llapSpec) {
+    String ns = resource.getMetadata().getNamespace();
+    List<Pod> tezAmPods = client.pods().inNamespace(ns)
+        .withLabels(Labels.selectorForTezAmCluster(resource, llapSpec.name()))
+        .list()
+        .getItems();
+    EndpointSlice desired = LlapResourceBuilder.buildTezAmEndpointSlice(resource, llapSpec, tezAmPods);
+    String sliceName = LlapResourceBuilder.tezAmEndpointSliceName(resource, llapSpec);
+
+    if (desired == null) {
+      client.discovery().v1().endpointSlices().inNamespace(ns).withName(sliceName).delete();
+      return;
+    }
+
+    EndpointSlice current =
+        client.discovery().v1().endpointSlices().inNamespace(ns).withName(sliceName).get();
+    boolean addressTypeChanged = current != null
+        && current.getAddressType() != null
+        && !current.getAddressType().equals(desired.getAddressType());
+    if (addressTypeChanged) {
+      client.discovery().v1().endpointSlices().inNamespace(ns)
+          .withName(sliceName)
+          .withGracePeriod(0L)
+          .delete();
+    }
+
+    client.discovery().v1().endpointSlices().inNamespace(ns)
+        .resource(desired)
+        .forceConflicts()
+        .serverSideApply();
+  }
+
// --- Auto-Suspend / Wake ---
enum SuspendAction { RUNNING, IDLE_START, IDLE_WAITING, SUSPEND_NOW,
STAY_SUSPENDED, WAKE }
@@ -829,7 +864,7 @@ private boolean isClusterIdle(HiveCluster resource,
KubernetesClient client) {
if (spec.tezAm().isEnabled()) {
for (var llap : spec.llapClusters()) {
if (llap.isEnabled()
- && !isAtMinReplicas(client, ns, name + "-tezam-" + llap.name(),
true,
+ && !isAtMinReplicas(client, ns, name + "-tezam-" + llap.name(),
false,
llap.tezAm().autoscaling().minReplicas())) {
return false;
}
diff --git
a/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/util/ConfigUtils.java
b/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/util/ConfigUtils.java
index 6dbc38f9b67..a86a4fb9c50 100644
---
a/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/util/ConfigUtils.java
+++
b/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/util/ConfigUtils.java
@@ -20,8 +20,14 @@
import java.util.Map;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
public final class ConfigUtils {
+ private static final ObjectMapper JSON_MAPPER = new ObjectMapper();
+
private ConfigUtils() {
}
@@ -71,6 +77,9 @@ public static String tezAmComponentKey(String llapName) {
public static final String HIVE_SERVER2_TEZ_USE_EXTERNAL_SESSIONS_KEY =
"hive.server2.tez.use.external.sessions";
+ public static final String HIVE_SERVER2_TEZ_INITIALIZE_DEFAULT_SESSIONS_KEY =
+ "hive.server2.tez.initialize.default.sessions";
+
public static final String HIVE_SERVER2_TEZ_EXTERNAL_SESSIONS_NAMESPACE_KEY =
"hive.server2.tez.external.sessions.namespace";
@@ -85,6 +94,18 @@ public static String tezAmComponentKey(String llapName) {
public static final String HIVE_ZOOKEEPER_QUORUM_KEY =
"hive.zookeeper.quorum";
+ public static final String HIVE_ZOOKEEPER_CONNECTION_TIMEOUT_KEY =
"hive.zookeeper.connection.timeout";
+ public static final int HIVE_ZOOKEEPER_CONNECTION_TIMEOUT_DEFAULT_MS = 15000;
+
+ public static final String HIVE_ZOOKEEPER_SESSION_TIMEOUT_KEY =
"hive.zookeeper.session.timeout";
+ public static final int HIVE_ZOOKEEPER_SESSION_TIMEOUT_DEFAULT_MS = 120000;
+
+ public static final String HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME_KEY =
"hive.zookeeper.connection.basesleeptime";
+ public static final int HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME_DEFAULT_MS =
1000;
+
+ public static final String HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES_KEY =
"hive.zookeeper.connection.max.retries";
+ public static final int HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES_DEFAULT = 3;
+
public static final String HIVE_EXECUTION_MODE_KEY = "hive.execution.mode";
public static final String HIVE_LLAP_EXECUTION_MODE_KEY =
"hive.llap.execution.mode";
@@ -198,6 +219,31 @@ public static int getInt(Map<String, String> overrides,
return defaultVal;
}
+  /**
+   * Reads a duration from {@code overrides} and converts it to milliseconds.
+   * <p>
+   * The value is a number optionally followed by a case-insensitive unit. Recognized units
+   * follow Hive's time-unit spellings: {@code d}/{@code day*}, {@code h}/{@code hour*},
+   * {@code m}/{@code min*}, {@code s}/{@code sec*}, {@code ms}/{@code msec*},
+   * {@code us}/{@code usec*} and {@code ns}/{@code nsec*}. A bare number is taken as
+   * milliseconds. Bare and millisecond values must be integers; the other units also accept
+   * decimals, and the converted result is truncated toward zero.
+   *
+   * @param overrides configuration overrides to read from; may be {@code null}
+   * @param key configuration key to look up
+   * @param defaultMs value returned when the key is absent or its value cannot be parsed
+   * @return the configured duration in milliseconds, or {@code defaultMs}
+   */
+  public static int getTimeMs(Map<String, String> overrides, String key, int defaultMs) {
+    String raw = overrides == null ? null : overrides.get(key);
+    if (raw == null) {
+      return defaultMs;
+    }
+    String val = raw.trim().toLowerCase(java.util.Locale.ROOT);
+
+    // Split "<number><unit>": the unit is the trailing run of letters, if any.
+    int unitStart = val.length();
+    while (unitStart > 0 && Character.isLetter(val.charAt(unitStart - 1))) {
+      unitStart--;
+    }
+    String number = val.substring(0, unitStart).trim();
+    String unit = val.substring(unitStart);
+
+    try {
+      if (unit.isEmpty() || unit.equals("ms") || unit.startsWith("msec")) {
+        return Integer.parseInt(number);
+      }
+      double millis = timeUnitToMillis(Double.parseDouble(number), unit);
+      // NaN marks an unrecognized unit; fall back rather than guess.
+      return Double.isNaN(millis) ? defaultMs : (int) millis;
+    } catch (NumberFormatException e) {
+      return defaultMs;
+    }
+  }
+
+  /** Converts {@code amount} of the lower-case {@code unit} to milliseconds; NaN if unknown. */
+  private static double timeUnitToMillis(double amount, String unit) {
+    if (unit.equals("s") || unit.startsWith("sec")) {
+      return amount * 1000;
+    }
+    if (unit.equals("m") || unit.startsWith("min")) {
+      return amount * 60000;
+    }
+    if (unit.equals("h") || unit.startsWith("hour")) {
+      return amount * 3_600_000;
+    }
+    if (unit.equals("d") || unit.startsWith("day")) {
+      return amount * 86_400_000;
+    }
+    if (unit.equals("us") || unit.startsWith("usec")) {
+      return amount / 1000;
+    }
+    if (unit.equals("ns") || unit.startsWith("nsec")) {
+      return amount / 1_000_000;
+    }
+    return Double.NaN;
+  }
+
public static boolean getBoolean(Map<String, String> overrides,
String key, boolean defaultVal) {
if (overrides != null) {
@@ -208,4 +254,16 @@ public static boolean getBoolean(Map<String, String>
overrides,
}
return defaultVal;
}
+
+  /**
+   * Extracts a top-level string field from a JSON document.
+   *
+   * @param json JSON text; may be {@code null} or blank
+   * @param fieldName name of the field to read
+   * @return the field's text, or {@code null} when the input is null/blank, cannot be parsed,
+   *     or the field is missing or not a JSON string
+   */
+  public static String getJsonStringField(String json, String fieldName) {
+    if (json == null || json.isBlank()) {
+      return null;
+    }
+    JsonNode value;
+    try {
+      value = JSON_MAPPER.readTree(json).get(fieldName);
+    } catch (JsonProcessingException parseFailure) {
+      return null;
+    }
+    if (value == null || !value.isTextual()) {
+      return null;
+    }
+    return value.asText();
+  }
}
diff --git
a/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/util/HiveConfigBuilder.java
b/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/util/HiveConfigBuilder.java
index cfcfbacc888..5969d6b9e25 100644
---
a/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/util/HiveConfigBuilder.java
+++
b/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/util/HiveConfigBuilder.java
@@ -146,6 +146,16 @@ public static Map<String, String> getHiveServer2HiveSite(
if (spec.hiveServer2().configOverrides() != null) {
props.putAll(spec.hiveServer2().configOverrides());
}
+
+ // Default Tez sessions are not supported with multi-tenancy (more than one enabled
+ // LLAP cluster), so the corresponding config is overridden to false in that case.
+ // With a single LLAP cluster, default Tez sessions remain supported.
+ long enabledLlapClusters = spec.llapClusters().stream()
+ .filter(LlapSpec::isEnabled)
+ .count();
+ if (enabledLlapClusters > 1) {
+ props.put(ConfigUtils.HIVE_SERVER2_TEZ_INITIALIZE_DEFAULT_SESSIONS_KEY,
"false");
+ }
+
return props;
}