ayushtkn commented on code in PR #6561:
URL: https://github.com/apache/hive/pull/6561#discussion_r3480941222
##########
llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapTaskSchedulerInfo.java:
##########
@@ -38,7 +38,8 @@ public enum LlapTaskSchedulerInfo implements MetricsInfo {
SchedulerRunningTaskCount("Total number of running tasks"),
SchedulerPendingPreemptionTaskCount("Total number of tasks pending for
pre-emption"),
SchedulerPreemptedTaskCount("Total number of tasks pre-empted"),
- SchedulerCompletedDagCount("Number of DAGs completed");
+ SchedulerCompletedDagCount("Number of DAGs completed"),
Review Comment:
don't change the name here, it would be incompatible
##########
packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/autoscaling/TezAmZkDeregistrar.java:
##########
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.kubernetes.operator.autoscaling;
+
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.hive.kubernetes.operator.util.ConfigUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Deletes idle TezAM ZooKeeper registration nodes before a scale-down is
applied.
+ * <p>
+ * Each TezAM registers at {@code
/tez-external-sessions/<llapName>/<applicationId>}.
+ * When we delete these nodes, every HS2 instance (including all replicas in an
+ * active-active HA setup) sees a {@code CHILD_REMOVED} ZK event and
immediately
+ * stops routing new sessions to those AMs. This prevents new DAGs from
arriving
+ * on a pod that is about to be terminated.
+ * <p>
+ * Only the registration node is deleted — the ephemeral claim node
+ * ({@code /tez-external-sessions/<llapName>-claims/<applicationId>}) is not
touched;
+ * it belongs to the HS2 ZK session and disappears naturally when HS2 releases
it.
+ * <p>
+ * ZK connection parameters are read from the cluster's HS2 configOverrides
using the
+ * same keys that {@code ZookeeperExternalSessionsRegistryClient} reads from
HiveConf:
+ * <ul>
+ * <li>{@code hive.zookeeper.connection.timeout} (default: 15s → 15000
ms)</li>
+ * <li>{@code hive.zookeeper.session.timeout} (default: 120000ms)</li>
+ * <li>{@code hive.zookeeper.connection.basesleeptime} (default: 1000ms)</li>
+ * <li>{@code hive.zookeeper.connection.max.retries} (default: 3)</li>
+ * </ul>
+ */
+public final class TezAmZkDeregistrar {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(TezAmZkDeregistrar.class);
+ private static final String PATH_SEPARATOR = "/";
+
+ private TezAmZkDeregistrar() {}
+
+ /**
+ * Deletes ZK registration nodes for the given idle TezAM pods.
+ * Failures are logged as warnings and do not block the scale-down — the
+ * preStop drain on the pod provides a safety net.
+ *
+ * @param zkQuorum ZooKeeper connection string
+ * @param llapName LLAP cluster name (e.g. "llap0"), used as the ZK
namespace
+ * @param idlePodNames pod names that are idle and about to be removed
+ * @param hiveSiteConfig HS2 configOverrides map — used to read ZK
connection settings
+ * using the same keys as {@code
ZookeeperExternalSessionsRegistryClient}
+ */
+ public static void deregisterIdlePods(String zkQuorum, String llapName,
+ List<String> idlePodNames, Map<String, String> hiveSiteConfig) {
+ if (idlePodNames.isEmpty()) {
+ return;
+ }
+ int connTimeoutMs = getTimeMs(hiveSiteConfig,
"hive.zookeeper.connection.timeout", 15000);
+ int sessionTimeoutMs = getTimeMs(hiveSiteConfig,
"hive.zookeeper.session.timeout", 120000);
+ int baseSleepMs = getTimeMs(hiveSiteConfig,
"hive.zookeeper.connection.basesleeptime", 1000);
+ int maxRetries = getInt(hiveSiteConfig,
"hive.zookeeper.connection.max.retries", 3);
Review Comment:
Define these configs in `ConfigUtils`
##########
packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/autoscaling/TezAmZkDeregistrar.java:
##########
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.kubernetes.operator.autoscaling;
+
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.hive.kubernetes.operator.util.ConfigUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Deletes idle TezAM ZooKeeper registration nodes before a scale-down is
applied.
+ * <p>
+ * Each TezAM registers at {@code
/tez-external-sessions/<llapName>/<applicationId>}.
+ * When we delete these nodes, every HS2 instance (including all replicas in an
+ * active-active HA setup) sees a {@code CHILD_REMOVED} ZK event and
immediately
+ * stops routing new sessions to those AMs. This prevents new DAGs from
arriving
+ * on a pod that is about to be terminated.
+ * <p>
+ * Only the registration node is deleted — the ephemeral claim node
+ * ({@code /tez-external-sessions/<llapName>-claims/<applicationId>}) is not
touched;
+ * it belongs to the HS2 ZK session and disappears naturally when HS2 releases
it.
+ * <p>
+ * ZK connection parameters are read from the cluster's HS2 configOverrides
using the
+ * same keys that {@code ZookeeperExternalSessionsRegistryClient} reads from
HiveConf:
+ * <ul>
+ * <li>{@code hive.zookeeper.connection.timeout} (default: 15s → 15000
ms)</li>
+ * <li>{@code hive.zookeeper.session.timeout} (default: 120000ms)</li>
+ * <li>{@code hive.zookeeper.connection.basesleeptime} (default: 1000ms)</li>
+ * <li>{@code hive.zookeeper.connection.max.retries} (default: 3)</li>
+ * </ul>
+ */
+public final class TezAmZkDeregistrar {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(TezAmZkDeregistrar.class);
+ private static final String PATH_SEPARATOR = "/";
+
+ private TezAmZkDeregistrar() {}
+
+ /**
+ * Deletes ZK registration nodes for the given idle TezAM pods.
+ * Failures are logged as warnings and do not block the scale-down — the
+ * preStop drain on the pod provides a safety net.
+ *
+ * @param zkQuorum ZooKeeper connection string
+ * @param llapName LLAP cluster name (e.g. "llap0"), used as the ZK
namespace
+ * @param idlePodNames pod names that are idle and about to be removed
+ * @param hiveSiteConfig HS2 configOverrides map — used to read ZK
connection settings
+ * using the same keys as {@code
ZookeeperExternalSessionsRegistryClient}
+ */
+ public static void deregisterIdlePods(String zkQuorum, String llapName,
+ List<String> idlePodNames, Map<String, String> hiveSiteConfig) {
+ if (idlePodNames.isEmpty()) {
+ return;
+ }
+ int connTimeoutMs = getTimeMs(hiveSiteConfig,
"hive.zookeeper.connection.timeout", 15000);
+ int sessionTimeoutMs = getTimeMs(hiveSiteConfig,
"hive.zookeeper.session.timeout", 120000);
+ int baseSleepMs = getTimeMs(hiveSiteConfig,
"hive.zookeeper.connection.basesleeptime", 1000);
+ int maxRetries = getInt(hiveSiteConfig,
"hive.zookeeper.connection.max.retries", 3);
+
+ String registryPath = ConfigUtils.TEZ_EXTERNAL_SESSIONS_ZK_PREFIX +
PATH_SEPARATOR + llapName;
+ CuratorFramework client = CuratorFrameworkFactory.builder()
+ .connectString(zkQuorum)
+ .connectionTimeoutMs(connTimeoutMs)
+ .sessionTimeoutMs(sessionTimeoutMs)
+ .retryPolicy(new ExponentialBackoffRetry(baseSleepMs, maxRetries))
+ .build();
+ try {
+ client.start();
+ if (!client.blockUntilConnected(connTimeoutMs, TimeUnit.MILLISECONDS)) {
+ LOG.warn("[tezam-{}] ZK connect timeout — skipping deregistration",
llapName);
+ return;
+ }
+ if (client.checkExists().forPath(registryPath) == null) {
+ return;
+ }
+ for (String appId : client.getChildren().forPath(registryPath)) {
+ String nodePath = registryPath + PATH_SEPARATOR + appId;
+ byte[] data = client.getData().forPath(nodePath);
+ if (data == null || data.length == 0) {
+ continue;
+ }
+ String hostName = extractHostName(new String(data,
StandardCharsets.UTF_8));
+ if (hostName != null) {
+ // hostName = "<podName>.<svcName>.<ns>.svc.cluster.local"
Review Comment:
I am not sure this is always true; I believe it is a Docker Desktop thing,
or it can be changed.
##########
llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapTaskSchedulerInfo.java:
##########
@@ -38,7 +38,8 @@ public enum LlapTaskSchedulerInfo implements MetricsInfo {
SchedulerRunningTaskCount("Total number of running tasks"),
SchedulerPendingPreemptionTaskCount("Total number of tasks pending for
pre-emption"),
SchedulerPreemptedTaskCount("Total number of tasks pre-empted"),
- SchedulerCompletedDagCount("Number of DAGs completed");
+ SchedulerCompletedDagCount("Number of DAGs completed"),
+ SchedulerDagRunning("Binary that represents if the AM is idle or running a
DAG");
Review Comment:
Why `Binary`? Only we would know what 1 means and what 0 means. Also, isn't
`Binary that represents if the AM is idle or running a DAG` too long?
Maybe `DAG Status` should be the name.
##########
packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/autoscaling/TezAmBusyMetrics.java:
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.kubernetes.operator.autoscaling;
+
+import java.util.Map;
+
+/**
+ * Interprets TezAM JMX Exporter metrics to determine whether an AM is busy
executing
+ * a DAG or idle and safe to remove during scale-down.
+ * <p>
+ * Signal: {@code tez_am_dag_running} (SchedulerDagRunning gauge in
LlapTaskSchedulerMetrics).
+ * Set to 1 when the AM receives its first task for a new DAG, cleared to 0 in
dagComplete().
+ */
+public final class TezAmBusyMetrics {
Review Comment:
Why is this required? Why aren't we using `PodMetrics`, which we use for all
services?
##########
packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/dependent/LlapResourceBuilder.java:
##########
@@ -193,7 +203,7 @@ public static PodDisruptionBudget buildPdb(HiveCluster hc,
LlapSpec llap) {
// --- TezAM resource builders (one TezAM per LLAP cluster) ---
- /** TezAM StatefulSet name for a specific LLAP cluster. */
+ /** TezAM Deployment/Service name for a specific LLAP cluster. */
Review Comment:
I know what a `Deployment` is, but what does Service mean here? Isn't the
Service TezAM?
##########
llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java:
##########
@@ -1080,6 +1080,9 @@ public void dagComplete() {
writeLock.lock();
try {
dagRunning = false;
+ if (metrics != null) {
+ metrics.setDagRunning(false);
+ }
Review Comment:
I am curious: if the DAG doesn't complete but fails for whatever reason,
who sets this `DagRunning` back to `false` after it was set to `true`?
##########
packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/autoscaling/TezAmZkDeregistrar.java:
##########
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.kubernetes.operator.autoscaling;
+
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.hive.kubernetes.operator.util.ConfigUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Deletes idle TezAM ZooKeeper registration nodes before a scale-down is
applied.
+ * <p>
+ * Each TezAM registers at {@code
/tez-external-sessions/<llapName>/<applicationId>}.
+ * When we delete these nodes, every HS2 instance (including all replicas in an
+ * active-active HA setup) sees a {@code CHILD_REMOVED} ZK event and
immediately
+ * stops routing new sessions to those AMs. This prevents new DAGs from
arriving
+ * on a pod that is about to be terminated.
+ * <p>
+ * Only the registration node is deleted — the ephemeral claim node
+ * ({@code /tez-external-sessions/<llapName>-claims/<applicationId>}) is not
touched;
+ * it belongs to the HS2 ZK session and disappears naturally when HS2 releases
it.
+ * <p>
+ * ZK connection parameters are read from the cluster's HS2 configOverrides
using the
+ * same keys that {@code ZookeeperExternalSessionsRegistryClient} reads from
HiveConf:
+ * <ul>
+ * <li>{@code hive.zookeeper.connection.timeout} (default: 15s → 15000
ms)</li>
+ * <li>{@code hive.zookeeper.session.timeout} (default: 120000ms)</li>
+ * <li>{@code hive.zookeeper.connection.basesleeptime} (default: 1000ms)</li>
+ * <li>{@code hive.zookeeper.connection.max.retries} (default: 3)</li>
+ * </ul>
+ */
+public final class TezAmZkDeregistrar {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(TezAmZkDeregistrar.class);
+ private static final String PATH_SEPARATOR = "/";
+
+ private TezAmZkDeregistrar() {}
+
+ /**
+ * Deletes ZK registration nodes for the given idle TezAM pods.
+ * Failures are logged as warnings and do not block the scale-down — the
+ * preStop drain on the pod provides a safety net.
+ *
+ * @param zkQuorum ZooKeeper connection string
+ * @param llapName LLAP cluster name (e.g. "llap0"), used as the ZK
namespace
+ * @param idlePodNames pod names that are idle and about to be removed
+ * @param hiveSiteConfig HS2 configOverrides map — used to read ZK
connection settings
+ * using the same keys as {@code
ZookeeperExternalSessionsRegistryClient}
+ */
+ public static void deregisterIdlePods(String zkQuorum, String llapName,
+ List<String> idlePodNames, Map<String, String> hiveSiteConfig) {
+ if (idlePodNames.isEmpty()) {
+ return;
+ }
+ int connTimeoutMs = getTimeMs(hiveSiteConfig,
"hive.zookeeper.connection.timeout", 15000);
+ int sessionTimeoutMs = getTimeMs(hiveSiteConfig,
"hive.zookeeper.session.timeout", 120000);
+ int baseSleepMs = getTimeMs(hiveSiteConfig,
"hive.zookeeper.connection.basesleeptime", 1000);
+ int maxRetries = getInt(hiveSiteConfig,
"hive.zookeeper.connection.max.retries", 3);
+
+ String registryPath = ConfigUtils.TEZ_EXTERNAL_SESSIONS_ZK_PREFIX +
PATH_SEPARATOR + llapName;
+ CuratorFramework client = CuratorFrameworkFactory.builder()
+ .connectString(zkQuorum)
+ .connectionTimeoutMs(connTimeoutMs)
+ .sessionTimeoutMs(sessionTimeoutMs)
+ .retryPolicy(new ExponentialBackoffRetry(baseSleepMs, maxRetries))
+ .build();
+ try {
+ client.start();
Review Comment:
A new `CuratorFramework` client is built, started, and closed every time
`deregisterIdlePods` is called. Because the `autoscaler` evaluates state
periodically (likely every few seconds), this will cause significant connection
churn and overhead on the ZooKeeper quorum. Can we consider caching the client
per HiveCluster/ZK-quorum inside the operator, reusing the connection pool, and
only closing it when the cluster is deleted or reconfigured?
##########
packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/autoscaling/TezAmZkDeregistrar.java:
##########
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.kubernetes.operator.autoscaling;
+
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.hive.kubernetes.operator.util.ConfigUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Deletes idle TezAM ZooKeeper registration nodes before a scale-down is
applied.
+ * <p>
+ * Each TezAM registers at {@code
/tez-external-sessions/<llapName>/<applicationId>}.
+ * When we delete these nodes, every HS2 instance (including all replicas in an
+ * active-active HA setup) sees a {@code CHILD_REMOVED} ZK event and
immediately
+ * stops routing new sessions to those AMs. This prevents new DAGs from
arriving
+ * on a pod that is about to be terminated.
+ * <p>
+ * Only the registration node is deleted — the ephemeral claim node
+ * ({@code /tez-external-sessions/<llapName>-claims/<applicationId>}) is not
touched;
+ * it belongs to the HS2 ZK session and disappears naturally when HS2 releases
it.
+ * <p>
+ * ZK connection parameters are read from the cluster's HS2 configOverrides
using the
+ * same keys that {@code ZookeeperExternalSessionsRegistryClient} reads from
HiveConf:
+ * <ul>
+ * <li>{@code hive.zookeeper.connection.timeout} (default: 15s → 15000
ms)</li>
+ * <li>{@code hive.zookeeper.session.timeout} (default: 120000ms)</li>
+ * <li>{@code hive.zookeeper.connection.basesleeptime} (default: 1000ms)</li>
+ * <li>{@code hive.zookeeper.connection.max.retries} (default: 3)</li>
+ * </ul>
+ */
+public final class TezAmZkDeregistrar {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(TezAmZkDeregistrar.class);
+ private static final String PATH_SEPARATOR = "/";
+
+ private TezAmZkDeregistrar() {}
+
+ /**
+ * Deletes ZK registration nodes for the given idle TezAM pods.
+ * Failures are logged as warnings and do not block the scale-down — the
+ * preStop drain on the pod provides a safety net.
+ *
+ * @param zkQuorum ZooKeeper connection string
+ * @param llapName LLAP cluster name (e.g. "llap0"), used as the ZK
namespace
+ * @param idlePodNames pod names that are idle and about to be removed
+ * @param hiveSiteConfig HS2 configOverrides map — used to read ZK
connection settings
+ * using the same keys as {@code
ZookeeperExternalSessionsRegistryClient}
+ */
+ public static void deregisterIdlePods(String zkQuorum, String llapName,
+ List<String> idlePodNames, Map<String, String> hiveSiteConfig) {
+ if (idlePodNames.isEmpty()) {
+ return;
+ }
+ int connTimeoutMs = getTimeMs(hiveSiteConfig,
"hive.zookeeper.connection.timeout", 15000);
+ int sessionTimeoutMs = getTimeMs(hiveSiteConfig,
"hive.zookeeper.session.timeout", 120000);
+ int baseSleepMs = getTimeMs(hiveSiteConfig,
"hive.zookeeper.connection.basesleeptime", 1000);
+ int maxRetries = getInt(hiveSiteConfig,
"hive.zookeeper.connection.max.retries", 3);
+
+ String registryPath = ConfigUtils.TEZ_EXTERNAL_SESSIONS_ZK_PREFIX +
PATH_SEPARATOR + llapName;
+ CuratorFramework client = CuratorFrameworkFactory.builder()
+ .connectString(zkQuorum)
+ .connectionTimeoutMs(connTimeoutMs)
+ .sessionTimeoutMs(sessionTimeoutMs)
+ .retryPolicy(new ExponentialBackoffRetry(baseSleepMs, maxRetries))
+ .build();
+ try {
+ client.start();
+ if (!client.blockUntilConnected(connTimeoutMs, TimeUnit.MILLISECONDS)) {
+ LOG.warn("[tezam-{}] ZK connect timeout — skipping deregistration",
llapName);
+ return;
+ }
+ if (client.checkExists().forPath(registryPath) == null) {
+ return;
+ }
+ for (String appId : client.getChildren().forPath(registryPath)) {
+ String nodePath = registryPath + PATH_SEPARATOR + appId;
+ byte[] data = client.getData().forPath(nodePath);
+ if (data == null || data.length == 0) {
+ continue;
+ }
+ String hostName = extractHostName(new String(data,
StandardCharsets.UTF_8));
+ if (hostName != null) {
+ // hostName = "<podName>.<svcName>.<ns>.svc.cluster.local"
+ String podName = hostName.contains(".") ? hostName.substring(0,
hostName.indexOf('.')) : hostName;
+ if (idlePodNames.contains(podName)) {
+ deleteRegistrationNode(client, llapName, nodePath, podName, appId);
+ }
+ }
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOG.warn("[tezam-{}] ZK deregistration interrupted: {}", llapName,
e.getMessage());
+ } catch (Exception e) {
+ LOG.warn("[tezam-{}] ZK deregistration error: {}", llapName,
e.getMessage());
+ } finally {
+ client.close();
+ }
+ }
+
+ private static void deleteRegistrationNode(CuratorFramework client, String
llapName,
+ String nodePath, String podName, String appId) {
+ try {
+ client.delete().forPath(nodePath);
+ LOG.info("[tezam-{}] Deregistered pod {} (appId={}) from ZK — HS2 will
stop routing to it",
+ llapName, podName, appId);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOG.warn("[tezam-{}] Failed to delete ZK node for pod {} (interrupted):
{}",
+ llapName, podName, e.getMessage());
+ } catch (Exception e) {
+ LOG.warn("[tezam-{}] Failed to delete ZK node for pod {}: {}", llapName,
podName, e.getMessage());
+ }
+ }
+
+ /**
+ * Reads a time value (in milliseconds) from the config map.
+ * If the key is absent or un-parseable the defaultMs value is returned.
+ */
+ static int getTimeMs(Map<String, String> config, String key, int defaultMs) {
+ if (config == null) {
+ return defaultMs;
+ }
+ String val = config.get(key);
+ if (val == null) {
+ return defaultMs;
+ }
+ val = val.trim();
+ try {
+ if (val.endsWith("ms")) {
+ return Integer.parseInt(val.substring(0, val.length() - 2).trim());
+ }
+ if (val.endsWith("s")) {
+ return (int) (Double.parseDouble(val.substring(0, val.length() -
1).trim()) * 1_000);
+ }
+ if (val.endsWith("m")) {
+ return (int) (Double.parseDouble(val.substring(0, val.length() -
1).trim()) * 60_000);
+ }
+ return Integer.parseInt(val);
+ } catch (NumberFormatException e) {
+ LOG.debug("Unparseable ZK config '{}' = '{}', using default {}ms", key,
val, defaultMs);
+ return defaultMs;
+ }
+ }
Review Comment:
move to `ConfigUtils`
##########
packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/autoscaling/TezAmZkDeregistrar.java:
##########
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.kubernetes.operator.autoscaling;
+
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.hive.kubernetes.operator.util.ConfigUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Deletes idle TezAM ZooKeeper registration nodes before a scale-down is
applied.
+ * <p>
+ * Each TezAM registers at {@code
/tez-external-sessions/<llapName>/<applicationId>}.
+ * When we delete these nodes, every HS2 instance (including all replicas in an
+ * active-active HA setup) sees a {@code CHILD_REMOVED} ZK event and
immediately
+ * stops routing new sessions to those AMs. This prevents new DAGs from
arriving
+ * on a pod that is about to be terminated.
+ * <p>
+ * Only the registration node is deleted — the ephemeral claim node
+ * ({@code /tez-external-sessions/<llapName>-claims/<applicationId>}) is not
touched;
+ * it belongs to the HS2 ZK session and disappears naturally when HS2 releases
it.
+ * <p>
+ * ZK connection parameters are read from the cluster's HS2 configOverrides
using the
+ * same keys that {@code ZookeeperExternalSessionsRegistryClient} reads from
HiveConf:
+ * <ul>
+ * <li>{@code hive.zookeeper.connection.timeout} (default: 15s → 15000
ms)</li>
+ * <li>{@code hive.zookeeper.session.timeout} (default: 120000ms)</li>
+ * <li>{@code hive.zookeeper.connection.basesleeptime} (default: 1000ms)</li>
+ * <li>{@code hive.zookeeper.connection.max.retries} (default: 3)</li>
+ * </ul>
+ */
+public final class TezAmZkDeregistrar {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(TezAmZkDeregistrar.class);
+ private static final String PATH_SEPARATOR = "/";
+
+ private TezAmZkDeregistrar() {}
+
+ /**
+ * Deletes ZK registration nodes for the given idle TezAM pods.
+ * Failures are logged as warnings and do not block the scale-down — the
+ * preStop drain on the pod provides a safety net.
+ *
+ * @param zkQuorum ZooKeeper connection string
+ * @param llapName LLAP cluster name (e.g. "llap0"), used as the ZK
namespace
+ * @param idlePodNames pod names that are idle and about to be removed
+ * @param hiveSiteConfig HS2 configOverrides map — used to read ZK
connection settings
+ * using the same keys as {@code
ZookeeperExternalSessionsRegistryClient}
+ */
+ public static void deregisterIdlePods(String zkQuorum, String llapName,
+ List<String> idlePodNames, Map<String, String> hiveSiteConfig) {
+ if (idlePodNames.isEmpty()) {
+ return;
+ }
+ int connTimeoutMs = getTimeMs(hiveSiteConfig,
"hive.zookeeper.connection.timeout", 15000);
+ int sessionTimeoutMs = getTimeMs(hiveSiteConfig,
"hive.zookeeper.session.timeout", 120000);
+ int baseSleepMs = getTimeMs(hiveSiteConfig,
"hive.zookeeper.connection.basesleeptime", 1000);
+ int maxRetries = getInt(hiveSiteConfig,
"hive.zookeeper.connection.max.retries", 3);
+
+ String registryPath = ConfigUtils.TEZ_EXTERNAL_SESSIONS_ZK_PREFIX +
PATH_SEPARATOR + llapName;
+ CuratorFramework client = CuratorFrameworkFactory.builder()
+ .connectString(zkQuorum)
+ .connectionTimeoutMs(connTimeoutMs)
+ .sessionTimeoutMs(sessionTimeoutMs)
+ .retryPolicy(new ExponentialBackoffRetry(baseSleepMs, maxRetries))
+ .build();
+ try {
+ client.start();
+ if (!client.blockUntilConnected(connTimeoutMs, TimeUnit.MILLISECONDS)) {
+ LOG.warn("[tezam-{}] ZK connect timeout — skipping deregistration",
llapName);
+ return;
+ }
+ if (client.checkExists().forPath(registryPath) == null) {
+ return;
+ }
+ for (String appId : client.getChildren().forPath(registryPath)) {
+ String nodePath = registryPath + PATH_SEPARATOR + appId;
+ byte[] data = client.getData().forPath(nodePath);
+ if (data == null || data.length == 0) {
+ continue;
+ }
+ String hostName = extractHostName(new String(data,
StandardCharsets.UTF_8));
+ if (hostName != null) {
+ // hostName = "<podName>.<svcName>.<ns>.svc.cluster.local"
+ String podName = hostName.contains(".") ? hostName.substring(0,
hostName.indexOf('.')) : hostName;
+ if (idlePodNames.contains(podName)) {
+ deleteRegistrationNode(client, llapName, nodePath, podName, appId);
+ }
+ }
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOG.warn("[tezam-{}] ZK deregistration interrupted: {}", llapName,
e.getMessage());
+ } catch (Exception e) {
+ LOG.warn("[tezam-{}] ZK deregistration error: {}", llapName,
e.getMessage());
+ } finally {
+ client.close();
+ }
+ }
+
+ private static void deleteRegistrationNode(CuratorFramework client, String
llapName,
+ String nodePath, String podName, String appId) {
+ try {
+ client.delete().forPath(nodePath);
+ LOG.info("[tezam-{}] Deregistered pod {} (appId={}) from ZK — HS2 will
stop routing to it",
+ llapName, podName, appId);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOG.warn("[tezam-{}] Failed to delete ZK node for pod {} (interrupted):
{}",
+ llapName, podName, e.getMessage());
+ } catch (Exception e) {
+ LOG.warn("[tezam-{}] Failed to delete ZK node for pod {}: {}", llapName,
podName, e.getMessage());
+ }
+ }
+
+ /**
+ * Reads a time value (in milliseconds) from the config map.
+ * If the key is absent or un-parseable the defaultMs value is returned.
+ */
+ static int getTimeMs(Map<String, String> config, String key, int defaultMs) {
+ if (config == null) {
+ return defaultMs;
+ }
+ String val = config.get(key);
+ if (val == null) {
+ return defaultMs;
+ }
+ val = val.trim();
+ try {
+ if (val.endsWith("ms")) {
+ return Integer.parseInt(val.substring(0, val.length() - 2).trim());
+ }
+ if (val.endsWith("s")) {
+ return (int) (Double.parseDouble(val.substring(0, val.length() -
1).trim()) * 1_000);
+ }
+ if (val.endsWith("m")) {
+ return (int) (Double.parseDouble(val.substring(0, val.length() -
1).trim()) * 60_000);
+ }
+ return Integer.parseInt(val);
+ } catch (NumberFormatException e) {
+ LOG.debug("Unparseable ZK config '{}' = '{}', using default {}ms", key,
val, defaultMs);
+ return defaultMs;
+ }
+ }
+
+ static int getInt(Map<String, String> config, String key, int defaultVal) {
+ if (config == null) {
+ return defaultVal;
+ }
+ String val = config.get(key);
+ if (val == null) {
+ return defaultVal;
+ }
+ try {
+ return Integer.parseInt(val.trim());
+ } catch (NumberFormatException e) {
+ LOG.debug("Unparseable ZK config '{}' = '{}', using default {}", key,
val, defaultVal);
+ return defaultVal;
+ }
+ }
+
+ static String extractHostName(String json) {
+ String marker = "\"hostName\":\"";
+ int start = json.indexOf(marker);
+ if (start < 0) {
+ return null;
+ }
+ start += marker.length();
+ int end = json.indexOf('"', start);
+ return end > start ? json.substring(start, end) : null;
+ }
Review Comment:
Move it to a utils class. I am not sure this is the correct way to parse
JSON; wouldn't a space or something similar break your parsing? `Jackson` is
already on the classpath.
##########
packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/autoscaling/TezAmZkDeregistrar.java:
##########
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.kubernetes.operator.autoscaling;
+
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.hive.kubernetes.operator.util.ConfigUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Deletes idle TezAM ZooKeeper registration nodes before a scale-down is
applied.
+ * <p>
+ * Each TezAM registers at {@code
/tez-external-sessions/<llapName>/<applicationId>}.
+ * When we delete these nodes, every HS2 instance (including all replicas in an
+ * active-active HA setup) sees a {@code CHILD_REMOVED} ZK event and
immediately
+ * stops routing new sessions to those AMs. This prevents new DAGs from
arriving
+ * on a pod that is about to be terminated.
+ * <p>
+ * Only the registration node is deleted — the ephemeral claim node
+ * ({@code /tez-external-sessions/<llapName>-claims/<applicationId>}) is not
touched;
+ * it belongs to the HS2 ZK session and disappears naturally when HS2 releases
it.
+ * <p>
+ * ZK connection parameters are read from the cluster's HS2 configOverrides
using the
+ * same keys that {@code ZookeeperExternalSessionsRegistryClient} reads from
HiveConf:
+ * <ul>
+ * <li>{@code hive.zookeeper.connection.timeout} (default: 15s → 15000
ms)</li>
+ * <li>{@code hive.zookeeper.session.timeout} (default: 120000ms)</li>
+ * <li>{@code hive.zookeeper.connection.basesleeptime} (default: 1000ms)</li>
+ * <li>{@code hive.zookeeper.connection.max.retries} (default: 3)</li>
+ * </ul>
+ */
+public final class TezAmZkDeregistrar {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(TezAmZkDeregistrar.class);
+ private static final String PATH_SEPARATOR = "/";
+
+ private TezAmZkDeregistrar() {}
+
+ /**
+ * Deletes ZK registration nodes for the given idle TezAM pods.
+ * Failures are logged as warnings and do not block the scale-down — the
+ * preStop drain on the pod provides a safety net.
+ *
+ * @param zkQuorum ZooKeeper connection string
+ * @param llapName LLAP cluster name (e.g. "llap0"), used as the ZK
namespace
+ * @param idlePodNames pod names that are idle and about to be removed
+ * @param hiveSiteConfig HS2 configOverrides map — used to read ZK
connection settings
+ * using the same keys as {@code
ZookeeperExternalSessionsRegistryClient}
+ */
+ public static void deregisterIdlePods(String zkQuorum, String llapName,
+ List<String> idlePodNames, Map<String, String> hiveSiteConfig) {
+ if (idlePodNames.isEmpty()) {
+ return;
+ }
+ int connTimeoutMs = getTimeMs(hiveSiteConfig,
"hive.zookeeper.connection.timeout", 15000);
+ int sessionTimeoutMs = getTimeMs(hiveSiteConfig,
"hive.zookeeper.session.timeout", 120000);
+ int baseSleepMs = getTimeMs(hiveSiteConfig,
"hive.zookeeper.connection.basesleeptime", 1000);
+ int maxRetries = getInt(hiveSiteConfig,
"hive.zookeeper.connection.max.retries", 3);
+
+ String registryPath = ConfigUtils.TEZ_EXTERNAL_SESSIONS_ZK_PREFIX +
PATH_SEPARATOR + llapName;
+ CuratorFramework client = CuratorFrameworkFactory.builder()
+ .connectString(zkQuorum)
+ .connectionTimeoutMs(connTimeoutMs)
+ .sessionTimeoutMs(sessionTimeoutMs)
+ .retryPolicy(new ExponentialBackoffRetry(baseSleepMs, maxRetries))
+ .build();
+ try {
+ client.start();
+ if (!client.blockUntilConnected(connTimeoutMs, TimeUnit.MILLISECONDS)) {
+ LOG.warn("[tezam-{}] ZK connect timeout — skipping deregistration",
llapName);
+ return;
+ }
+ if (client.checkExists().forPath(registryPath) == null) {
+ return;
+ }
+ for (String appId : client.getChildren().forPath(registryPath)) {
+ String nodePath = registryPath + PATH_SEPARATOR + appId;
+ byte[] data = client.getData().forPath(nodePath);
+ if (data == null || data.length == 0) {
+ continue;
+ }
+ String hostName = extractHostName(new String(data,
StandardCharsets.UTF_8));
+ if (hostName != null) {
+ // hostName = "<podName>.<svcName>.<ns>.svc.cluster.local"
+ String podName = hostName.contains(".") ? hostName.substring(0,
hostName.indexOf('.')) : hostName;
+ if (idlePodNames.contains(podName)) {
+ deleteRegistrationNode(client, llapName, nodePath, podName, appId);
+ }
+ }
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOG.warn("[tezam-{}] ZK deregistration interrupted: {}", llapName,
e.getMessage());
+ } catch (Exception e) {
+ LOG.warn("[tezam-{}] ZK deregistration error: {}", llapName,
e.getMessage());
+ } finally {
+ client.close();
+ }
+ }
+
+ private static void deleteRegistrationNode(CuratorFramework client, String
llapName,
+ String nodePath, String podName, String appId) {
+ try {
+ client.delete().forPath(nodePath);
+ LOG.info("[tezam-{}] Deregistered pod {} (appId={}) from ZK — HS2 will
stop routing to it",
+ llapName, podName, appId);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOG.warn("[tezam-{}] Failed to delete ZK node for pod {} (interrupted):
{}",
+ llapName, podName, e.getMessage());
+ } catch (Exception e) {
+ LOG.warn("[tezam-{}] Failed to delete ZK node for pod {}: {}", llapName,
podName, e.getMessage());
+ }
+ }
+
+ /**
+ * Reads a time value (in milliseconds) from the config map.
+ * If the key is absent or un-parseable the defaultMs value is returned.
+ */
+ static int getTimeMs(Map<String, String> config, String key, int defaultMs) {
+ if (config == null) {
+ return defaultMs;
+ }
+ String val = config.get(key);
+ if (val == null) {
+ return defaultMs;
+ }
+ val = val.trim();
+ try {
+ if (val.endsWith("ms")) {
+ return Integer.parseInt(val.substring(0, val.length() - 2).trim());
+ }
+ if (val.endsWith("s")) {
+ return (int) (Double.parseDouble(val.substring(0, val.length() -
1).trim()) * 1_000);
+ }
+ if (val.endsWith("m")) {
+ return (int) (Double.parseDouble(val.substring(0, val.length() -
1).trim()) * 60_000);
+ }
+ return Integer.parseInt(val);
+ } catch (NumberFormatException e) {
+ LOG.debug("Unparseable ZK config '{}' = '{}', using default {}ms", key,
val, defaultMs);
+ return defaultMs;
+ }
+ }
+
+ static int getInt(Map<String, String> config, String key, int defaultVal) {
+ if (config == null) {
+ return defaultVal;
+ }
+ String val = config.get(key);
+ if (val == null) {
+ return defaultVal;
+ }
+ try {
+ return Integer.parseInt(val.trim());
+ } catch (NumberFormatException e) {
+ LOG.debug("Unparseable ZK config '{}' = '{}', using default {}", key,
val, defaultVal);
+ return defaultVal;
+ }
Review Comment:
I believe `ConfigUtils` already has this:
```
public static int getInt(Map<String, String> overrides,
String key, String altKey, int defaultVal) {
```
##########
packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/autoscaling/TezAmZkDeregistrar.java:
##########
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.kubernetes.operator.autoscaling;
+
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.hive.kubernetes.operator.util.ConfigUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Deletes idle TezAM ZooKeeper registration nodes before a scale-down is
applied.
+ * <p>
+ * Each TezAM registers at {@code
/tez-external-sessions/<llapName>/<applicationId>}.
+ * When we delete these nodes, every HS2 instance (including all replicas in an
+ * active-active HA setup) sees a {@code CHILD_REMOVED} ZK event and
immediately
+ * stops routing new sessions to those AMs. This prevents new DAGs from
arriving
+ * on a pod that is about to be terminated.
+ * <p>
+ * Only the registration node is deleted — the ephemeral claim node
+ * ({@code /tez-external-sessions/<llapName>-claims/<applicationId>}) is not
touched;
+ * it belongs to the HS2 ZK session and disappears naturally when HS2 releases
it.
+ * <p>
+ * ZK connection parameters are read from the cluster's HS2 configOverrides
using the
+ * same keys that {@code ZookeeperExternalSessionsRegistryClient} reads from
HiveConf:
+ * <ul>
+ * <li>{@code hive.zookeeper.connection.timeout} (default: 15s → 15000
ms)</li>
+ * <li>{@code hive.zookeeper.session.timeout} (default: 120000ms)</li>
+ * <li>{@code hive.zookeeper.connection.basesleeptime} (default: 1000ms)</li>
+ * <li>{@code hive.zookeeper.connection.max.retries} (default: 3)</li>
+ * </ul>
+ */
+public final class TezAmZkDeregistrar {
Review Comment:
We need a separate class for deregistering TezAM :-)
##########
packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/dependent/LlapResourceBuilder.java:
##########
@@ -229,9 +239,102 @@ public static PodDisruptionBudget
buildTezAmPdb(HiveCluster hc, LlapSpec llap) {
.build();
}
- /** Builds the TezAM StatefulSet for a specific LLAP cluster. */
- public static StatefulSet buildTezAmStatefulSet(HiveCluster hc, LlapSpec
llap, Integer replicas) {
- return INSTANCE.doBuildTezAmStatefulSet(hc, llap, replicas);
+ /** Builds the TezAM Deployment for a specific LLAP cluster. */
+ public static Deployment buildTezAmDeployment(HiveCluster hc, LlapSpec llap,
Integer replicas) {
+ return INSTANCE.doBuildTezAmDeployment(hc, llap, replicas);
+ }
+
+ /**
+ * Name for the operator-managed EndpointSlice that provides per-pod DNS for
TezAM.
+ * CoreDNS creates {@code <pod-name>.<svc>.<ns>.svc.cluster.local} A-records
using it.
+ */
+ public static String tezAmEndpointSliceName(HiveCluster hc, LlapSpec llap) {
+ return tezAmResourceName(hc, llap) + "-hostnames";
+ }
+
+ /**
+ * Builds a custom EndpointSlice for the TezAM headless Service.
+ * <p>
+ * Kubernetes only creates per-pod DNS records ({@code
<pod>.<svc>.<ns>.svc.cluster.local})
+ * when the Endpoints/EndpointSlice has a {@code hostname} field for each
address. The
+ * default EndpointSlice controller omits {@code hostname} for Deployment
pods (it only
+ * sets it automatically for StatefulSet pods). This operator-managed
EndpointSlice fills
+ * that gap, giving every ready TezAM pod a resolvable FQDN.
+ *
+ * @param pods list of TezAM pods
+ * @return the EndpointSlice, or {@code null} if there are no pod IPs yet or
if pods have mixed IPv4/IPv6 addresses
+ */
+ public static EndpointSlice buildTezAmEndpointSlice(HiveCluster hc, LlapSpec
llap, List<Pod> pods) {
+ String ns = hc.getMetadata().getNamespace();
+ String svcName = tezAmResourceName(hc, llap);
+ Map<String, String> labels = new HashMap<>(Map.of(
+ "kubernetes.io/service-name", svcName,
+ "endpointslice.kubernetes.io/managed-by", "hive-kubernetes-operator",
+ Labels.MANAGED_BY, Labels.MANAGED_BY_VALUE,
+ Labels.APP_INSTANCE, hc.getMetadata().getName(),
+ Labels.APP_COMPONENT, ConfigUtils.COMPONENT_TEZAM));
+
+ List<Endpoint> endpoints = new ArrayList<>();
+ String addressType = null;
+ for (var pod : pods) {
+ String ip = pod.getStatus() != null ? pod.getStatus().getPodIP() : null;
+ if (ip == null || ip.isEmpty()) {
+ continue;
+ }
+ String ipFamily = ipAddressType(ip);
+ if (addressType == null) {
+ addressType = ipFamily;
+ } else if (!addressType.equals(ipFamily)) {
+ return null;
+ }
+ boolean ready = isPodReady(pod);
+ String hostname = pod.getMetadata().getName();
+ if (hostname.length() > 63) {
+ hostname = StringUtils.stripEnd(hostname.substring(0, 63), "-");
+ }
+ endpoints.add(new EndpointBuilder()
+ .withHostname(hostname)
+ .withAddresses(ip)
+ .withNewConditions()
+ .withReady(ready)
+ .withServing(ready)
+ .withTerminating(false)
+ .endConditions()
+ .withNewTargetRef()
+ .withKind("Pod")
+ .withNamespace(ns)
+ .withName(pod.getMetadata().getName())
+ .endTargetRef()
+ .build());
+ }
+
+ if (endpoints.isEmpty()) {
+ return null;
+ }
+
+ return new EndpointSliceBuilder()
+ .withNewMetadata()
+ .withName(tezAmEndpointSliceName(hc, llap))
+ .withNamespace(ns)
+ .withLabels(labels)
+ .withOwnerReferences(ownerRef(hc))
+ .endMetadata()
+ .withAddressType(addressType)
+ .withEndpoints(endpoints)
+ .build();
+ }
+
+ /** Returns {@code IPv4} or {@code IPv6} for a pod IP string. */
+ private static String ipAddressType(String ip) {
+ return ip.indexOf(':') >= 0 ? "IPv6" : "IPv4";
+ }
+
+ private static boolean isPodReady(io.fabric8.kubernetes.api.model.Pod pod) {
Review Comment:
The metrics scraper already has this method; we can reuse it. We can also
import it rather than using the `io.fabric8.kubernetes.api.model.` prefix.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]