Copilot commented on code in PR #18843:
URL: https://github.com/apache/druid/pull/18843#discussion_r2762482489


##########
extensions-contrib/consul-extensions/src/main/java/org/apache/druid/consul/discovery/ConsulDiscoveryConfig.java:
##########
@@ -0,0 +1,770 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.consul.discovery;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.joda.time.Duration;
+
+import javax.annotation.Nullable;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Configuration for Consul-based service discovery.
+ */
+public class ConsulDiscoveryConfig
+{
+  private static final Logger LOGGER = new Logger(ConsulDiscoveryConfig.class);
+  private static final long MIN_LEADER_SESSION_TTL_SECONDS = 10;
+
+  @JsonProperty("connection")
+  private final ConnectionConfig connection;
+
+  @JsonProperty("auth")
+  private final AuthConfig auth;
+
+  @JsonProperty("service")
+  private final ServiceConfig service;
+
+  @JsonProperty("leader")
+  private final LeaderElectionConfig leader;
+
+  @JsonProperty("watch")
+  private final WatchConfig watch;
+
+  @JsonCreator
+  public static ConsulDiscoveryConfig create(
+      @JsonProperty("connection") @Nullable ConnectionConfig connection,
+      @JsonProperty("auth") @Nullable AuthConfig auth,
+      @JsonProperty("service") ServiceConfig service,
+      @JsonProperty("leader") @Nullable LeaderElectionConfig leader,
+      @JsonProperty("watch") @Nullable WatchConfig watch
+  )
+  {
+    if (service == null) {
+      throw new IAE("service cannot be null");
+    }
+
+    LeaderElectionConfig finalLeader = computeLeaderElectionConfig(leader, 
service.getHealthCheckInterval());
+    return new ConsulDiscoveryConfig(connection, auth, service, finalLeader, 
watch);
+  }
+
+  private static LeaderElectionConfig computeLeaderElectionConfig(
+      @Nullable LeaderElectionConfig leader,
+      Duration healthCheckInterval
+  )
+  {
+    if (leader != null) {
+      // Compute default TTL based on health check interval when not 
explicitly set
+      if (leader.getLeaderSessionTtl() == null) {
+        return new LeaderElectionConfig(
+            leader.getCoordinatorLeaderLockPath(),
+            leader.getOverlordLeaderLockPath(),
+            null,
+            leader.getLeaderMaxErrorRetries(),
+            leader.getLeaderRetryBackoffMax(),
+            healthCheckInterval
+        );
+      } else {
+        return leader;
+      }
+    } else {
+      return new LeaderElectionConfig(null, null, null, null, null, 
healthCheckInterval);
+    }

Review Comment:
   `leader.getLeaderSessionTtl()` can never be null here because 
`LeaderElectionConfig` always computes a non-null TTL in its constructor, so the 
`== null` branch in `computeLeaderElectionConfig` is dead code. As a result, 
when users provide a `leader` block but omit `leaderSessionTtl`, the TTL 
silently defaults as if `healthCheckInterval` were null (45s) instead of using 
`max(45s, 3 * healthCheckInterval)`. Fix this either by moving the "default TTL 
from `healthCheckInterval`" computation out of `LeaderElectionConfig` (keeping 
the raw TTL nullable), or by adding an explicit `ttlWasProvided` flag so that 
`ConsulDiscoveryConfig` can detect the omission and recompute the default 
correctly.



##########
extensions-contrib/consul-extensions/src/main/java/org/apache/druid/consul/discovery/ConsulDruidNodeDiscoveryProvider.java:
##########
@@ -0,0 +1,444 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.consul.discovery;
+
+import com.google.common.base.Preconditions;
+import com.google.inject.Inject;
+import org.apache.druid.concurrent.LifecycleLock;
+import org.apache.druid.discovery.BaseNodeRoleWatcher;
+import org.apache.druid.discovery.DiscoveryDruidNode;
+import org.apache.druid.discovery.DruidNodeDiscovery;
+import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
+import org.apache.druid.discovery.NodeRole;
+import org.apache.druid.guice.ManageLifecycle;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.server.DruidNode;
+
+import javax.annotation.Nullable;
+import java.net.SocketTimeoutException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BooleanSupplier;
+
+/**
+ * Consul-based implementation of {@link DruidNodeDiscoveryProvider}.
+ *
+ * <p>Each {@link NodeRoleWatcher} performs synchronous (potentially blocking) 
Consul queries on its own single-thread
+ * executor. Listener callbacks are dispatched via a shared single-thread 
executor to preserve callback ordering.
+ *
+ * <p>Consul queries can block up to {@code watchSeconds}; avoid invoking 
lifecycle methods from time-critical threads.
+ */
+@ManageLifecycle
+public class ConsulDruidNodeDiscoveryProvider extends 
DruidNodeDiscoveryProvider
+{
+  private static final Logger LOGGER = new 
Logger(ConsulDruidNodeDiscoveryProvider.class);
+
+  private final ConsulApiClient consulApiClient;
+  private final ConsulDiscoveryConfig config;
+
+  @Inject(optional = true)
+  @Nullable
+  private ServiceEmitter emitter;
+
+  private ScheduledExecutorService listenerExecutor;
+
+  private final ConcurrentHashMap<NodeRole, NodeRoleWatcher> nodeRoleWatchers 
= new ConcurrentHashMap<>();
+
+  private final LifecycleLock lifecycleLock = new LifecycleLock();
+
+  @Inject
+  public ConsulDruidNodeDiscoveryProvider(
+      ConsulApiClient consulApiClient,
+      ConsulDiscoveryConfig config
+  )
+  {
+    this.consulApiClient = Preconditions.checkNotNull(consulApiClient, 
"consulApiClient");
+    this.config = Preconditions.checkNotNull(config, "config");
+  }
+
+  @Override
+  public BooleanSupplier getForNode(DruidNode node, NodeRole nodeRole)
+  {
+    return () -> {
+      try {
+        List<DiscoveryDruidNode> nodes = 
consulApiClient.getHealthyServices(nodeRole);
+        return nodes.stream()
+                    .anyMatch(n -> 
n.getDruidNode().getHostAndPortToUse().equals(node.getHostAndPortToUse()));
+      }
+      catch (Exception e) {
+        LOGGER.error(e, "Error checking for node [%s] with role [%s]", node, 
nodeRole);
+        return false;
+      }
+    };
+  }
+
+  @Override
+  public DruidNodeDiscovery getForNodeRole(NodeRole nodeRole)
+  {
+    Preconditions.checkState(lifecycleLock.awaitStarted(1, 
TimeUnit.MILLISECONDS));
+
+    return nodeRoleWatchers.computeIfAbsent(
+        nodeRole,
+        role -> {
+          LOGGER.info("Creating NodeRoleWatcher for role[%s].", role);
+          NodeRoleWatcher watcher = new NodeRoleWatcher(
+              listenerExecutor,
+              role,
+              consulApiClient,
+              config,
+              emitter
+          );
+          watcher.start();
+          LOGGER.info("Created NodeRoleWatcher for role[%s].", role);
+          return watcher;
+        }
+    );
+  }
+
+  @LifecycleStart
+  public void start()
+  {
+    if (!lifecycleLock.canStart()) {
+      throw new ISE("can't start.");
+    }
+
+    try {
+      LOGGER.info("Starting ConsulDruidNodeDiscoveryProvider");
+
+      // Single-threaded executor ensures listener callbacks execute in-order, 
preventing race conditions
+      listenerExecutor = 
Execs.scheduledSingleThreaded("ConsulDruidNodeDiscoveryProvider-ListenerExecutor");
+
+      LOGGER.info("Started ConsulDruidNodeDiscoveryProvider");
+
+      lifecycleLock.started();
+    }
+    finally {
+      lifecycleLock.exitStart();
+    }
+  }
+
+  @LifecycleStop
+  public void stop()
+  {
+    if (!lifecycleLock.canStop()) {
+      throw new ISE("can't stop.");
+    }
+
+    LOGGER.info("Stopping ConsulDruidNodeDiscoveryProvider");
+
+    for (NodeRoleWatcher watcher : nodeRoleWatchers.values()) {
+      watcher.stop();
+    }
+    nodeRoleWatchers.clear();
+
+    // Watcher threads must finish before shutting down listener executor to 
avoid RejectedExecutionException
+    try {
+      listenerExecutor.shutdown();
+      if (!listenerExecutor.awaitTermination(10, TimeUnit.SECONDS)) {
+        LOGGER.warn("Listener executor did not terminate in time");
+        listenerExecutor.shutdownNow();
+      }
+    }
+    catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      LOGGER.warn("Interrupted while waiting for listener executor 
termination");
+      listenerExecutor.shutdownNow();
+    }
+
+    LOGGER.info("Stopped ConsulDruidNodeDiscoveryProvider");
+    lifecycleLock.exitStopAndReset();
+  }
+
+  static class NodeRoleWatcher implements DruidNodeDiscovery
+  {
+    private static final Logger LOGGER = new Logger(NodeRoleWatcher.class);
+
+    private final ConsulApiClient consulApiClient;
+    private final ConsulDiscoveryConfig config;
+    @Nullable
+    private final ServiceEmitter emitter;
+
+    private ExecutorService watchExecutor;
+
+    private final LifecycleLock lifecycleLock = new LifecycleLock();
+
+    private final NodeRole nodeRole;
+    private final BaseNodeRoleWatcher baseNodeRoleWatcher;
+
+    private final AtomicLong retryCount = new AtomicLong(0);
+
+    /**
+     * Creates a watcher for a single {@link NodeRole}. Consul calls are 
performed on the watch executor. Listener
+     * callbacks are dispatched via {@code listenerExecutor}.
+     */
+    NodeRoleWatcher(
+        ScheduledExecutorService listenerExecutor,
+        NodeRole nodeRole,
+        ConsulApiClient consulApiClient,
+        ConsulDiscoveryConfig config,
+        @Nullable ServiceEmitter emitter
+    )
+    {
+      this.nodeRole = nodeRole;
+      this.consulApiClient = consulApiClient;
+      this.config = config;
+      this.emitter = emitter;
+      this.baseNodeRoleWatcher = BaseNodeRoleWatcher.create(listenerExecutor, 
nodeRole);
+    }
+
+    private void watch()
+    {
+      boolean cacheInitialized = false;
+      long consulIndex = 0;
+
+      if (!lifecycleLock.awaitStarted()) {
+        LOGGER.error("Lifecycle not started, Exited Watch for role[%s].", 
nodeRole);
+        return;
+      }
+
+      while (lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS)) {
+        try {
+          if (!cacheInitialized) {
+            List<DiscoveryDruidNode> nodes = 
consulApiClient.getHealthyServices(nodeRole);
+            Map<String, DiscoveryDruidNode> nodeMap = new HashMap<>();
+            for (DiscoveryDruidNode node : nodes) {
+              nodeMap.put(node.getDruidNode().getHostAndPortToUse(), node);
+            }
+            baseNodeRoleWatcher.resetNodes(nodeMap);
+            baseNodeRoleWatcher.cacheInitialized();
+            cacheInitialized = true;
+
+            LOGGER.info("Cache initialized for role[%s] with [%d] nodes", 
nodeRole, nodes.size());
+          }
+
+          long watchStart = System.nanoTime();
+
+          long watchSeconds = 
config.getWatch().getWatchSeconds().getStandardSeconds();
+          ConsulApiClient.ConsulWatchResult watchResult = 
consulApiClient.watchServices(
+              nodeRole,
+              consulIndex,
+              watchSeconds
+          );

Review Comment:
   `Duration.getStandardSeconds()` truncates toward zero, so `watchSeconds` 
becomes `0` for any sub-second duration (e.g. `PT0.5S`), and fractional values 
are rounded down (e.g. `PT1.5S` becomes `1`). A zero wait can turn the blocking 
watch into a tight, non-blocking loop, causing high CPU usage and extra load on 
Consul. Either validate `watch.watchSeconds >= PT1S` in `ConsulDiscoveryConfig`, 
or convert with ceiling rounding instead (e.g. 
`Math.max(1, (watchMillis + 999) / 1000)`).



##########
extensions-contrib/consul-extensions/src/main/java/org/apache/druid/consul/discovery/DefaultConsulApiClient.java:
##########
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.consul.discovery;
+
+import com.ecwid.consul.v1.ConsulClient;
+import com.ecwid.consul.v1.QueryParams;
+import com.ecwid.consul.v1.Response;
+import com.ecwid.consul.v1.agent.model.NewService;
+import com.ecwid.consul.v1.health.HealthServicesRequest;
+import com.ecwid.consul.v1.health.model.HealthService;
+import com.ecwid.consul.v1.kv.model.GetValue;
+import com.ecwid.consul.v1.kv.model.PutParams;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import org.apache.druid.discovery.DiscoveryDruidNode;
+import org.apache.druid.discovery.NodeRole;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.logger.Logger;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Default implementation of {@link ConsulApiClient} using the Ecwid Consul 
client library.
+ */
+public class DefaultConsulApiClient implements ConsulApiClient
+{
+  private static final Logger LOGGER = new 
Logger(DefaultConsulApiClient.class);
+
+  // Consul service metadata has a limit of 512 characters per value
+  // Use a safe limit to avoid edge cases (450 chars leaves room for key name 
overhead)
+  private static final int MAX_METADATA_VALUE_SIZE = 450;
+  private static final long MIN_SESSION_TTL_SECONDS = 30;
+  private static final long MIN_HEALTH_CHECK_INTERVAL_SECONDS = 1;
+
+  private final ConsulClient consulClient;
+  private final ConsulDiscoveryConfig config;
+  private final ObjectMapper jsonMapper;
+
+  public DefaultConsulApiClient(
+      ConsulClient consulClient,
+      ConsulDiscoveryConfig config,
+      ObjectMapper jsonMapper
+  )
+  {
+    this.consulClient = Preconditions.checkNotNull(consulClient, 
"consulClient");
+    this.config = Preconditions.checkNotNull(config, "config");
+    this.jsonMapper = Preconditions.checkNotNull(jsonMapper, "jsonMapper");
+
+    LOGGER.info(
+        "Created DefaultConsulApiClient for [%s:%d] with service prefix [%s]",
+        config.getConnection().getHost(),
+        config.getConnection().getPort(),
+        config.getService().getServicePrefix()
+    );
+  }
+
+  @Override
+  public void registerService(DiscoveryDruidNode node) throws Exception
+  {
+    String serviceId = ConsulServiceIds.serviceId(config, node);
+    String serviceName = ConsulServiceIds.serviceName(config, 
node.getNodeRole());
+
+    NewService service = new NewService();
+    service.setId(serviceId);
+    service.setName(serviceName);
+    service.setAddress(node.getDruidNode().getHost());
+    service.setPort(node.getDruidNode().getPortToUse());
+
+    List<String> tags = new ArrayList<>();
+    tags.add("druid");
+    tags.add("role:" + node.getNodeRole().getJsonName());
+    if (config.getService().getServiceTags() != null) {
+      for (Map.Entry<String, String> e : 
config.getService().getServiceTags().entrySet()) {
+        if (e.getKey() != null && e.getValue() != null) {
+          tags.add(e.getKey() + ":" + e.getValue());
+        }
+      }
+    }
+    service.setTags(tags);
+
+    // Serialize the full DiscoveryDruidNode as metadata
+    String nodeJson = jsonMapper.writeValueAsString(node);
+
+    // Consul service metadata has a 512 character limit per value
+    // If the JSON is too large, store it in Consul KV and reference it from 
metadata
+    Map<String, String> meta = new HashMap<>();
+    if (nodeJson.length() <= MAX_METADATA_VALUE_SIZE) {
+      // Small enough - store directly in metadata
+      meta.put("druid_node", nodeJson);
+    } else {

Review Comment:
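   Consul's limit applies to the encoded byte length of the value (Consul 
validates metadata values with Go's `len()`, which counts bytes), but this check 
uses `nodeJson.length()`, which counts UTF-16 code units. For non-ASCII content 
that undercounts the UTF-8 size, so a value that passes this check can still 
exceed Consul's limit unexpectedly. Use 
`nodeJson.getBytes(StandardCharsets.UTF_8).length` for the size check and define 
`MAX_METADATA_VALUE_SIZE` as a byte limit accordingly.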



##########
extensions-contrib/consul-extensions/src/main/java/org/apache/druid/consul/discovery/ConsulDruidNodeAnnouncer.java:
##########
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.consul.discovery;
+
+import com.google.common.base.Preconditions;
+import com.google.inject.Inject;
+import org.apache.druid.concurrent.LifecycleLock;
+import org.apache.druid.discovery.DiscoveryDruidNode;
+import org.apache.druid.discovery.DruidNodeAnnouncer;
+import org.apache.druid.guice.ManageLifecycle;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+
+import javax.annotation.Nullable;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Announces Druid nodes to Consul and maintains their health status via TTL 
checks.
+ */
+@ManageLifecycle
+public class ConsulDruidNodeAnnouncer implements DruidNodeAnnouncer
+{
+  private static final Logger LOGGER = new 
Logger(ConsulDruidNodeAnnouncer.class);
+
+  private final ConsulApiClient consulApiClient;
+  private final ConsulDiscoveryConfig config;
+  private final ConcurrentMap<String, DiscoveryDruidNode> announcedNodes = new 
ConcurrentHashMap<>();
+  private final Set<String> registeringNodes = ConcurrentHashMap.newKeySet();
+  private final ScheduledExecutorService healthCheckExecutor;
+  
+  private static final int MAX_FAILURES_BEFORE_REREGISTER = 3;
+  private static final long EXECUTOR_TERMINATION_TIMEOUT_SECONDS = 10;
+  private final ConcurrentMap<String, AtomicInteger> consecutiveFailures = new 
ConcurrentHashMap<>();
+
+  @Inject(optional = true)
+  @Nullable
+  private ServiceEmitter emitter;
+  private final LifecycleLock lifecycleLock = new LifecycleLock();
+
+  @Inject
+  public ConsulDruidNodeAnnouncer(
+      ConsulApiClient consulApiClient,
+      ConsulDiscoveryConfig config
+  )
+  {
+    this.consulApiClient = Preconditions.checkNotNull(consulApiClient, 
"consulApiClient");
+    this.config = Preconditions.checkNotNull(config, "config");
+    this.healthCheckExecutor = 
Execs.scheduledSingleThreaded("ConsulHealthCheck-%d");
+  }
+
+  @LifecycleStart
+  public void start()
+  {
+    if (!lifecycleLock.canStart()) {
+      throw new ISE("can't start");
+    }
+
+    try {
+      LOGGER.info("Starting ConsulDruidNodeAnnouncer");
+
+      long intervalMs = 
config.getService().getHealthCheckInterval().getMillis();
+      healthCheckExecutor.scheduleAtFixedRate(
+          this::updateHealthChecks,
+          0L,
+          intervalMs,
+          TimeUnit.MILLISECONDS
+      );
+      lifecycleLock.started();
+    }
+    finally {
+      lifecycleLock.exitStart();
+    }
+  }
+
+  @LifecycleStop
+  public void stop()
+  {
+    if (!lifecycleLock.canStop()) {
+      throw new ISE("can't stop");
+    }
+
+    LOGGER.info("Stopping ConsulDruidNodeAnnouncer");
+
+    healthCheckExecutor.shutdownNow();
+
+    // Wait for health check to finish so we don't deregister while health 
check is in progress
+    try {
+      if 
(!healthCheckExecutor.awaitTermination(EXECUTOR_TERMINATION_TIMEOUT_SECONDS, 
TimeUnit.SECONDS)) {
+        LOGGER.warn("Health check executor did not terminate in time");
+      }
+    }
+    catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      LOGGER.warn("Interrupted while waiting for health check termination");
+    }
+
+    for (String serviceId : announcedNodes.keySet()) {
+      try {
+        consulApiClient.deregisterService(serviceId);
+      }
+      catch (Exception e) {
+        LOGGER.error(e, "Failed to deregister service [%s] during shutdown", 
serviceId);
+      }
+    }
+
+    announcedNodes.clear();
+    lifecycleLock.exitStop();
+  }
+
+  @Override
+  public void announce(DiscoveryDruidNode discoveryDruidNode)
+  {
+    if (!lifecycleLock.awaitStarted(1, TimeUnit.SECONDS)) {
+      throw new ISE("Announcer not started");
+    }
+
+    String serviceId = ConsulServiceIds.serviceId(config, discoveryDruidNode);
+
+    // Prevent concurrent duplicate registrations for the same serviceId
+    if (!registeringNodes.add(serviceId)) {
+      LOGGER.warn("Registration already in progress for serviceId [%s]", 
serviceId);
+      return;
+    }
+
+    try {
+      // If already announced, skip duplicate registration
+      if (announcedNodes.containsKey(serviceId)) {
+        LOGGER.warn("ServiceId [%s] already announced, skipping", serviceId);
+        return;
+      }
+
+      long registerStart = System.nanoTime();
+
+      // Register in Consul, then track locally atomically in this block
+      consulApiClient.registerService(discoveryDruidNode);
+      announcedNodes.put(serviceId, discoveryDruidNode);
+
+      long registerLatency = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - 
registerStart);
+      ConsulMetrics.emitTimer(emitter, "consul/register/latency", 
registerLatency,
+          "role", discoveryDruidNode.getNodeRole().getJsonName());
+
+      LOGGER.info("Successfully announced serviceId [%s]", serviceId);
+      ConsulMetrics.emitCount(
+          emitter,
+          "consul/announce/success",
+          "role",
+          discoveryDruidNode.getNodeRole().getJsonName()
+      );
+    }
+    catch (Exception e) {
+      // Cleanup partial registration if Consul was updated before failure
+      try {
+        consulApiClient.deregisterService(serviceId);
+      }
+      catch (Exception cleanup) {
+        LOGGER.debug(cleanup, "Cleanup deregister failed for serviceId [%s] 
after announce error", serviceId);
+      }
+
+      LOGGER.error(e, "Exception during announce for DiscoveryDruidNode[%s]", 
discoveryDruidNode);
+      ConsulMetrics.emitCount(
+          emitter,
+          "consul/announce/failure",
+          "role",
+          discoveryDruidNode.getNodeRole().getJsonName()
+      );
+      throw new RuntimeException("Failed to announce serviceId [" + serviceId 
+ "]", e);
+    }
+    finally {
+      registeringNodes.remove(serviceId);
+    }
+  }
+
+  @Override
+  public void unannounce(DiscoveryDruidNode discoveryDruidNode)
+  {
+    if (!lifecycleLock.awaitStarted(1, TimeUnit.SECONDS)) {
+      throw new ISE("Announcer not started");
+    }
+
+    LOGGER.info("Unannouncing DiscoveryDruidNode[%s]", discoveryDruidNode);
+
+    try {
+      String serviceId = ConsulServiceIds.serviceId(config, 
discoveryDruidNode);
+      consulApiClient.deregisterService(serviceId);
+      announcedNodes.remove(serviceId);
+
+      LOGGER.info("Successfully unannounced DiscoveryDruidNode[%s]", 
discoveryDruidNode);
+      ConsulMetrics.emitCount(emitter, "consul/unannounce/success",
+          "role", discoveryDruidNode.getNodeRole().getJsonName());
+    }
+    catch (Exception e) {
+      // Unannouncement happens during shutdown, don't throw
+      if (e instanceof InterruptedException) {
+        Thread.currentThread().interrupt();
+      }
+
+      LOGGER.error(e, "Failed to unannounce DiscoveryDruidNode[%s]", 
discoveryDruidNode);
+      ConsulMetrics.emitCount(emitter, "consul/unannounce/failure",
+          "role", discoveryDruidNode.getNodeRole().getJsonName());
+    }
+  }
+
+  private void updateHealthChecks()
+  {
+    int nodeCount = announcedNodes.size();
+    if (nodeCount == 0) {
+      return; // Silent when nothing to do
+    }
+
+    LOGGER.debug("Updating health checks for %d nodes", nodeCount);
+
+    int successCount = 0;
+    int failureCount = 0;
+
+    for (Map.Entry<String, DiscoveryDruidNode> entry : 
announcedNodes.entrySet()) {
+      String serviceId = entry.getKey();
+      try {
+        long healthCheckStart = System.nanoTime();
+
+        consulApiClient.passTtlCheck(serviceId, "Druid node is healthy");
+
+        long healthCheckLatency = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - healthCheckStart);
+        ConsulMetrics.emitTimer(emitter, "consul/healthcheck/latency", 
healthCheckLatency,
+            "serviceId", serviceId);
+
+        successCount++;
+
+        consecutiveFailures.remove(serviceId);
+      }
+      catch (Exception e) {
+        failureCount++;
+
+        int failures = consecutiveFailures
+            .computeIfAbsent(serviceId, k -> new AtomicInteger(0))
+            .incrementAndGet();
+
+        // Keep WARN for failures - these matter
+        LOGGER.warn(e, "Health check failed [%d/%d] for [%s]",
+                    failures, MAX_FAILURES_BEFORE_REREGISTER, serviceId);
+        ConsulMetrics.emitCount(emitter, "consul/healthcheck/failure",
+            "serviceId", serviceId, "consecutiveFailures", 
String.valueOf(failures));
+
+        if (failures >= MAX_FAILURES_BEFORE_REREGISTER) {
+          // Keep WARN for recovery actions - these are important state changes
+          LOGGER.warn("Re-registering [%s] after %d failures", serviceId, 
failures);
+          try {
+            DiscoveryDruidNode node = announcedNodes.get(serviceId);
+            consulApiClient.registerService(node);
+            consulApiClient.passTtlCheck(serviceId, "Re-registered");
+            consecutiveFailures.remove(serviceId);
+            ConsulMetrics.emitCount(emitter, "consul/healthcheck/reregister",
+                "serviceId", serviceId, "totalFailures", 
String.valueOf(failures));
+            LOGGER.info("Successfully re-registered [%s]", serviceId);

Review Comment:
   `announcedNodes.get(serviceId)` can return null if the node is unannounced 
concurrently or removed during shutdown. That null is then passed into 
`registerService`, which dereferences it (e.g. `node.getNodeRole()`), so the 
re-registration attempt fails with an NPE. Add a null check and skip 
re-registration when the node is no longer announced. Also clear 
`consecutiveFailures` for that serviceId so that a stale failure count does not 
linger, or carry over if the same serviceId is announced again.
   ```suggestion
               if (node == null) {
                 // Node may have been concurrently unannounced or removed 
during shutdown.
                 // Skip re-registration and clear failure tracking for this 
service.
                 LOGGER.info("Skipping re-registration for [%s] because it is 
no longer announced", serviceId);
                 consecutiveFailures.remove(serviceId);
               } else {
                 consulApiClient.registerService(node);
                 consulApiClient.passTtlCheck(serviceId, "Re-registered");
                 consecutiveFailures.remove(serviceId);
                 ConsulMetrics.emitCount(emitter, 
"consul/healthcheck/reregister",
                     "serviceId", serviceId, "totalFailures", 
String.valueOf(failures));
                 LOGGER.info("Successfully re-registered [%s]", serviceId);
               }
   ```



##########
extensions-contrib/consul-extensions/src/main/java/org/apache/druid/consul/discovery/ConsulDiscoveryConfig.java:
##########
@@ -0,0 +1,770 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.consul.discovery;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.joda.time.Duration;
+
+import javax.annotation.Nullable;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Configuration for Consul-based service discovery.
+ */
+public class ConsulDiscoveryConfig
+{
+  private static final Logger LOGGER = new Logger(ConsulDiscoveryConfig.class);
+  private static final long MIN_LEADER_SESSION_TTL_SECONDS = 10;
+
+  @JsonProperty("connection")
+  private final ConnectionConfig connection;
+
+  @JsonProperty("auth")
+  private final AuthConfig auth;
+
+  @JsonProperty("service")
+  private final ServiceConfig service;
+
+  @JsonProperty("leader")
+  private final LeaderElectionConfig leader;
+
+  @JsonProperty("watch")
+  private final WatchConfig watch;
+
+  @JsonCreator
+  public static ConsulDiscoveryConfig create(
+      @JsonProperty("connection") @Nullable ConnectionConfig connection,
+      @JsonProperty("auth") @Nullable AuthConfig auth,
+      @JsonProperty("service") ServiceConfig service,
+      @JsonProperty("leader") @Nullable LeaderElectionConfig leader,
+      @JsonProperty("watch") @Nullable WatchConfig watch
+  )
+  {
+    if (service == null) {
+      throw new IAE("service cannot be null");
+    }
+
+    LeaderElectionConfig finalLeader = computeLeaderElectionConfig(leader, 
service.getHealthCheckInterval());
+    return new ConsulDiscoveryConfig(connection, auth, service, finalLeader, 
watch);
+  }
+
+  private static LeaderElectionConfig computeLeaderElectionConfig(
+      @Nullable LeaderElectionConfig leader,
+      Duration healthCheckInterval
+  )
+  {
+    if (leader != null) {
+      // Compute default TTL based on health check interval when not explicitly set
+      if (leader.getLeaderSessionTtl() == null) {
+        return new LeaderElectionConfig(
+            leader.getCoordinatorLeaderLockPath(),
+            leader.getOverlordLeaderLockPath(),
+            null,
+            leader.getLeaderMaxErrorRetries(),
+            leader.getLeaderRetryBackoffMax(),
+            healthCheckInterval
+        );
+      } else {
+        return leader;
+      }
+    } else {
+      return new LeaderElectionConfig(null, null, null, null, null, 
healthCheckInterval);
+    }
+  }
+
+  private ConsulDiscoveryConfig(
+      ConnectionConfig connection,
+      AuthConfig auth,
+      ServiceConfig service,
+      LeaderElectionConfig leader,
+      WatchConfig watch
+  )
+  {
+    this.connection = connection == null ? new ConnectionConfig(null, null, 
null, null, null, null, null) : connection;
+    this.auth = auth == null ? new AuthConfig(null, null, null, null) : auth;
+    this.service = service;
+    this.leader = leader;
+    this.watch = watch == null ? new WatchConfig(null, null, null, null) : 
watch;
+
+    validateCrossFieldConstraints();
+  }
+
+  private void validateCrossFieldConstraints()
+  {
+    // Socket timeout must exceed watch timeout to avoid premature disconnects
+    if (connection.getSocketTimeout().compareTo(watch.getWatchSeconds()) <= 0) 
{
+      throw new IAE(
+          StringUtils.format(
+              "socketTimeout [%s] must be greater than watchSeconds [%s]",
+              connection.getSocketTimeout(),
+              watch.getWatchSeconds()
+          )
+      );
+    }
+
+    long serviceTtlSeconds = Math.max(30, 
service.getHealthCheckInterval().getStandardSeconds() * 3);
+    if (service.getDeregisterAfter().getStandardSeconds() < serviceTtlSeconds) 
{
+      throw new IAE(
+          StringUtils.format(
+              "deregisterAfter (%ds) must be >= service TTL (%ds = 3 × healthCheckInterval)",
+              service.getDeregisterAfter().getStandardSeconds(),
+              serviceTtlSeconds
+          )
+      );
+    }
+
+    // Large watchSeconds relative to session TTL can delay failure detection
+    if (watch.getWatchSeconds().getStandardSeconds() > 
leader.getLeaderSessionTtl().getStandardSeconds() * 2) {
+      LOGGER.warn(
+          "watchSeconds (%ds) is much larger than leaderSessionTtl (%ds): delayed failure detection possible",
+          watch.getWatchSeconds().getStandardSeconds(),
+          leader.getLeaderSessionTtl().getStandardSeconds()
+      );
+    }
+  }
+
+  public ConnectionConfig getConnection()
+  {
+    return connection;
+  }
+
+  public AuthConfig getAuth()
+  {
+    return auth;
+  }
+
+  public ServiceConfig getService()
+  {
+    return service;
+  }
+
+  public LeaderElectionConfig getLeader()
+  {
+    return leader;
+  }
+
+  public WatchConfig getWatch()
+  {
+    return watch;
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    ConsulDiscoveryConfig that = (ConsulDiscoveryConfig) o;
+    return Objects.equals(connection, that.connection) &&
+           Objects.equals(auth, that.auth) &&
+           Objects.equals(service, that.service) &&
+           Objects.equals(leader, that.leader) &&
+           Objects.equals(watch, that.watch);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(connection, auth, service, leader, watch);
+  }
+
+  @Override
+  public String toString()
+  {
+    return "ConsulDiscoveryConfig{" +
+           "connection=" + connection +
+           ", auth=" + auth +
+           ", service=" + service +
+           ", leader=" + leader +
+           ", watch=" + watch +
+           '}';
+  }
+
+  public static class ConnectionConfig
+  {
+    private static final long DEFAULT_CONNECT_TIMEOUT_MS = 10_000;
+    private static final long DEFAULT_SOCKET_TIMEOUT_MS = 75_000;
+    private static final int DEFAULT_MAX_TOTAL_CONNECTIONS = 50;
+    private static final int DEFAULT_MAX_CONNECTIONS_PER_ROUTE = 20;
+
+    private final String host;
+    private final int port;
+    private final Duration connectTimeout;
+    private final Duration socketTimeout;
+    @Nullable
+    private final ConsulSSLConfig sslClientConfig;
+    private final int maxTotalConnections;
+    private final int maxConnectionsPerRoute;
+
+    @JsonCreator
+    public ConnectionConfig(
+        @JsonProperty("host") @Nullable String host,
+        @JsonProperty("port") @Nullable Integer port,
+        @JsonProperty("connectTimeout") @Nullable Duration connectTimeout,
+        @JsonProperty("socketTimeout") @Nullable Duration socketTimeout,
+        @JsonProperty("sslClientConfig") @Nullable ConsulSSLConfig 
sslClientConfig,
+        @JsonProperty("maxTotalConnections") @Nullable Integer 
maxTotalConnections,
+        @JsonProperty("maxConnectionsPerRoute") @Nullable Integer 
maxConnectionsPerRoute
+    )
+    {
+      this.host = host == null ? "localhost" : host;
+      this.port = validatePort(port);
+      this.connectTimeout = validatePositive(connectTimeout, 
DEFAULT_CONNECT_TIMEOUT_MS, "connectTimeout");
+      this.socketTimeout = validatePositive(socketTimeout, 
DEFAULT_SOCKET_TIMEOUT_MS, "socketTimeout");
+      this.sslClientConfig = sslClientConfig;
+      this.maxTotalConnections = validateConnectionPoolSize(
+          maxTotalConnections,
+          DEFAULT_MAX_TOTAL_CONNECTIONS,
+          "maxTotalConnections"
+      );
+      this.maxConnectionsPerRoute = validateConnectionPoolSize(
+          maxConnectionsPerRoute,
+          DEFAULT_MAX_CONNECTIONS_PER_ROUTE,
+          "maxConnectionsPerRoute"
+      );
+    }
+
+    private static int validatePort(Integer port)
+    {
+      int portValue = port == null ? 8500 : port;
+      if (portValue < 1 || portValue > 65535) {
+        throw new IllegalArgumentException("port must be between 1 and 65535");
+      }
+      return portValue;
+    }
+
+    private static int validateConnectionPoolSize(Integer value, int 
defaultValue, String name)
+    {
+      int result = value == null ? defaultValue : value;
+      if (result <= 0) {
+        throw new IAE(name + " must be positive");
+      }
+      return result;
+    }
+
+    @JsonProperty
+    public String getHost()
+    {
+      return host;
+    }
+
+    @JsonProperty
+    public int getPort()
+    {
+      return port;
+    }
+
+    @JsonProperty
+    public Duration getConnectTimeout()
+    {
+      return connectTimeout;
+    }
+
+    @JsonProperty
+    public Duration getSocketTimeout()
+    {
+      return socketTimeout;
+    }
+
+    @JsonProperty
+    @Nullable
+    public ConsulSSLConfig getSslClientConfig()
+    {
+      return sslClientConfig;
+    }
+
+    @JsonProperty
+    public int getMaxTotalConnections()
+    {
+      return maxTotalConnections;
+    }
+
+    @JsonProperty
+    public int getMaxConnectionsPerRoute()
+    {
+      return maxConnectionsPerRoute;
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      ConnectionConfig that = (ConnectionConfig) o;
+      return port == that.getPort() &&
+             maxTotalConnections == that.getMaxTotalConnections() &&
+             maxConnectionsPerRoute == that.getMaxConnectionsPerRoute() &&
+             Objects.equals(host, that.getHost()) &&
+             Objects.equals(connectTimeout, that.getConnectTimeout()) &&
+             Objects.equals(socketTimeout, that.getSocketTimeout()) &&
+             Objects.equals(sslClientConfig, that.getSslClientConfig());
+    }
+
+    @Override
+    public int hashCode()
+    {
+      return Objects.hash(host, port, connectTimeout, socketTimeout, 
sslClientConfig, maxTotalConnections, maxConnectionsPerRoute);
+    }
+
+    @Override
+    public String toString()
+    {
+      return "ConnectionConfig{host='" + host + "', port=" + port +
+             ", connectTimeout=" + connectTimeout + ", socketTimeout=" + 
socketTimeout +
+             ", maxTotalConnections=" + maxTotalConnections + ", 
maxConnectionsPerRoute=" + maxConnectionsPerRoute + '}';
+    }
+  }
+
+  public static class AuthConfig
+  {
+    @Nullable
+    private final String aclToken;
+    @Nullable
+    private final String basicAuthUser;
+    @Nullable
+    private final String basicAuthPassword;
+    private final boolean allowBasicAuthOverHttp;
+
+    @JsonCreator
+    public AuthConfig(
+        @JsonProperty("aclToken") @Nullable String aclToken,
+        @JsonProperty("basicAuthUser") @Nullable String basicAuthUser,
+        @JsonProperty("basicAuthPassword") @Nullable String basicAuthPassword,
+        @JsonProperty("allowBasicAuthOverHttp") @Nullable Boolean 
allowBasicAuthOverHttp
+    )
+    {
+      this.aclToken = aclToken;
+      this.basicAuthUser = basicAuthUser;
+      this.basicAuthPassword = basicAuthPassword;
+      this.allowBasicAuthOverHttp = allowBasicAuthOverHttp != null ? 
allowBasicAuthOverHttp : false;
+    }
+
+    @JsonProperty
+    @Nullable
+    public String getAclToken()
+    {
+      return aclToken;
+    }
+
+    @JsonProperty
+    @Nullable
+    public String getBasicAuthUser()
+    {
+      return basicAuthUser;
+    }
+
+    @JsonProperty
+    @Nullable
+    public String getBasicAuthPassword()
+    {
+      return basicAuthPassword;
+    }
+
+    @JsonProperty
+    public boolean getAllowBasicAuthOverHttp()
+    {
+      return allowBasicAuthOverHttp;
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      AuthConfig that = (AuthConfig) o;
+      return allowBasicAuthOverHttp == that.getAllowBasicAuthOverHttp() &&
+             Objects.equals(aclToken, that.getAclToken()) &&
+             Objects.equals(basicAuthUser, that.getBasicAuthUser()) &&
+             Objects.equals(basicAuthPassword, that.getBasicAuthPassword());
+    }
+
+    @Override
+    public int hashCode()
+    {
+      return Objects.hash(aclToken, basicAuthUser, basicAuthPassword, 
allowBasicAuthOverHttp);
+    }
+
+    @Override
+    public String toString()
+    {
+      return "AuthConfig{aclToken=" + mask(aclToken) +
+             ", basicAuthUser=" + mask(basicAuthUser) +
+             ", basicAuthPassword=" + mask(basicAuthPassword) +
+             ", allowBasicAuthOverHttp=" + allowBasicAuthOverHttp + '}';
+    }
+
+    private static String mask(String value)
+    {
+      if (value == null) {
+        return String.valueOf(value);
+      }
+      return "*****";
+    }
+  }
+
+  public static class ServiceConfig
+  {
+    private static final long DEFAULT_HEALTH_CHECK_INTERVAL_MS = 10_000;
+    private static final long DEFAULT_DEREGISTER_AFTER_MS = 90_000;
+
+    private final String servicePrefix;
+    @Nullable
+    private final String datacenter;
+    @Nullable
+    private final Map<String, String> serviceTags;
+    private final Duration healthCheckInterval;
+    private final Duration deregisterAfter;
+
+    @JsonCreator
+    public ServiceConfig(
+        @JsonProperty("servicePrefix") String servicePrefix,
+        @JsonProperty("datacenter") @Nullable String datacenter,
+        @JsonProperty("serviceTags") @Nullable Map<String, String> serviceTags,
+        @JsonProperty("healthCheckInterval") @Nullable Duration 
healthCheckInterval,
+        @JsonProperty("deregisterAfter") @Nullable Duration deregisterAfter
+    )
+    {
+      if (servicePrefix == null || servicePrefix.isEmpty()) {
+        throw new IAE("servicePrefix cannot be null or empty");
+      }
+      this.servicePrefix = servicePrefix;
+      this.datacenter = datacenter;
+      this.serviceTags = serviceTags == null
+                         ? null
+                         : Collections.unmodifiableMap(new 
LinkedHashMap<>(serviceTags));
+      this.healthCheckInterval = validatePositive(healthCheckInterval, 
DEFAULT_HEALTH_CHECK_INTERVAL_MS, "healthCheckInterval");
+      this.deregisterAfter = validateNonNegative(deregisterAfter, 
DEFAULT_DEREGISTER_AFTER_MS, "deregisterAfter");
+    }
+
+    @JsonProperty
+    public String getServicePrefix()
+    {
+      return servicePrefix;
+    }
+
+    @JsonProperty
+    @Nullable
+    public String getDatacenter()
+    {
+      return datacenter;
+    }
+
+    @JsonProperty
+    @Nullable
+    public Map<String, String> getServiceTags()
+    {
+      return serviceTags == null ? null : 
Collections.unmodifiableMap(serviceTags);
+    }
+
+    @JsonProperty
+    public Duration getHealthCheckInterval()
+    {
+      return healthCheckInterval;
+    }
+
+    @JsonProperty
+    public Duration getDeregisterAfter()
+    {
+      return deregisterAfter;
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      ServiceConfig that = (ServiceConfig) o;
+      return Objects.equals(servicePrefix, that.getServicePrefix()) &&
+             Objects.equals(datacenter, that.getDatacenter()) &&
+             Objects.equals(serviceTags, that.getServiceTags()) &&
+             Objects.equals(healthCheckInterval, 
that.getHealthCheckInterval()) &&
+             Objects.equals(deregisterAfter, that.getDeregisterAfter());
+    }
+
+    @Override
+    public int hashCode()
+    {
+      return Objects.hash(servicePrefix, datacenter, serviceTags, 
healthCheckInterval, deregisterAfter);
+    }
+
+    @Override
+    public String toString()
+    {
+      return "ServiceConfig{servicePrefix='" + servicePrefix + "', 
datacenter='" + datacenter +
+             "', healthCheckInterval=" + healthCheckInterval + ", 
deregisterAfter=" + deregisterAfter + '}';
+    }
+  }
+
+  public static class LeaderElectionConfig
+  {
+    private static final long DEFAULT_LEADER_RETRY_BACKOFF_MAX_MS = 300_000;
+    private static final long DEFAULT_LEADER_MAX_ERROR_RETRIES = 20;
+
+    private final String coordinatorLeaderLockPath;
+    private final String overlordLeaderLockPath;
+    private final Duration leaderSessionTtl;
+    private final long leaderMaxErrorRetries;
+    private final Duration leaderRetryBackoffMax;
+
+    @JsonCreator
+    public LeaderElectionConfig(
+        @JsonProperty("coordinatorLeaderLockPath") @Nullable String 
coordinatorLeaderLockPath,
+        @JsonProperty("overlordLeaderLockPath") @Nullable String 
overlordLeaderLockPath,
+        @JsonProperty("leaderSessionTtl") @Nullable Duration leaderSessionTtl,
+        @JsonProperty("leaderMaxErrorRetries") @Nullable Long 
leaderMaxErrorRetries,
+        @JsonProperty("leaderRetryBackoffMax") @Nullable Duration 
leaderRetryBackoffMax,
+        @JsonProperty("healthCheckInterval") @Nullable Duration 
healthCheckInterval
+    )
+    {
+      this.coordinatorLeaderLockPath = coordinatorLeaderLockPath != null
+          ? coordinatorLeaderLockPath
+          : "druid/leader/coordinator";
+      this.overlordLeaderLockPath = overlordLeaderLockPath != null
+          ? overlordLeaderLockPath
+          : "druid/leader/overlord";
+      this.leaderSessionTtl = computeLeaderSessionTtl(leaderSessionTtl, 
healthCheckInterval);
+      this.leaderMaxErrorRetries = (leaderMaxErrorRetries == null || leaderMaxErrorRetries <= 0)
+          ? DEFAULT_LEADER_MAX_ERROR_RETRIES
+          : leaderMaxErrorRetries;

Review Comment:
   The docs for this PR state that `leaderMaxErrorRetries` supports `-1` for unlimited retries, but the implementation treats any value `<= 0` as “use the default (20)”, so `-1` behaves differently than documented. There are two ways to fix this:
   - Align with `WatchConfig` by mapping `leaderMaxErrorRetries <= 0` to `Long.MAX_VALUE`.
   - Treat negative values such as `-1` as unlimited and `0` as the default.
   Whichever you choose, the code and the docs must agree. The suggestion below takes the second approach.
   ```suggestion
         if (leaderMaxErrorRetries == null || leaderMaxErrorRetries == 0) {
           this.leaderMaxErrorRetries = DEFAULT_LEADER_MAX_ERROR_RETRIES;
         } else if (leaderMaxErrorRetries < 0) {
           // Treat -1 (any negative value) as unlimited retries, matching the documentation and WatchConfig's handling of negative values.
           this.leaderMaxErrorRetries = Long.MAX_VALUE;
         } else {
           this.leaderMaxErrorRetries = leaderMaxErrorRetries;
         }
   ```



##########
extensions-contrib/consul-extensions/src/main/java/org/apache/druid/consul/discovery/ConsulLeaderSelector.java:
##########
@@ -0,0 +1,580 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.consul.discovery;
+
+import com.ecwid.consul.v1.ConsulClient;
+import com.ecwid.consul.v1.QueryParams;
+import com.ecwid.consul.v1.Response;
+import com.ecwid.consul.v1.kv.model.GetValue;
+import com.ecwid.consul.v1.kv.model.PutParams;
+import com.ecwid.consul.v1.session.model.NewSession;
+import com.ecwid.consul.v1.session.model.Session;
+import com.google.common.base.Preconditions;
+import com.google.inject.Inject;
+import org.apache.druid.concurrent.LifecycleLock;
+import org.apache.druid.discovery.DruidLeaderSelector;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.server.DruidNode;
+import org.joda.time.Duration;
+
+import javax.annotation.Nullable;
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Consul-based implementation of {@link DruidLeaderSelector} using Consul sessions and KV locks.
+ *
+ * <p>{@link #registerListener(Listener)} starts background executors. Leader election runs on one single-thread
+ * executor and invokes {@link Listener} callbacks; session renewal runs on another single-thread executor.
+ * {@link #unregisterListener()} may invoke {@link Listener#stopBeingLeader()} on the calling thread.
+ *
+ * <p>Consul RPCs and retry backoffs are performed on the background threads; avoid invoking lifecycle methods from
+ * time-sensitive threads.
+ */
+public class ConsulLeaderSelector implements DruidLeaderSelector
+{
+  private static final Logger LOGGER = new Logger(ConsulLeaderSelector.class);
+
+  private final LifecycleLock lifecycleLock = new LifecycleLock();
+  private final DruidNode self;
+  private final String lockKey;
+  private final ConsulDiscoveryConfig config;
+  private final ConsulClient consulClient;
+  @Inject(optional = true)
+  @Nullable
+  private ServiceEmitter emitter = null;
+
+  private volatile DruidLeaderSelector.Listener listener = null;
+  private final AtomicBoolean leader = new AtomicBoolean(false);
+  private final AtomicInteger term = new AtomicInteger(0);
+
+  private ScheduledExecutorService executorService;
+  private ScheduledExecutorService sessionKeeperService;
+  private volatile String sessionId;
+  private volatile boolean stopping = false;
+  private long errorRetryCount = 0;
+
+  public ConsulLeaderSelector(
+      DruidNode self,
+      String lockKey,
+      ConsulDiscoveryConfig config,
+      ConsulClient consulClient
+  )
+  {
+    this.self = Preconditions.checkNotNull(self, "self");
+    this.lockKey = Preconditions.checkNotNull(lockKey, "lockKey");
+    this.config = Preconditions.checkNotNull(config, "config");
+    this.consulClient = Preconditions.checkNotNull(consulClient, 
"consulClient");
+
+    if (config.getLeader().getLeaderSessionTtl().getStandardSeconds() > 120) {
+      LOGGER.warn("leaderSessionTtl is %s; leader failover may take up to %s",
+                  config.getLeader().getLeaderSessionTtl(),
+                  
Duration.standardSeconds(config.getLeader().getLeaderSessionTtl().getStandardSeconds()
 * 2));
+    }
+  }
+
+  @Nullable
+  @Override
+  public String getCurrentLeader()
+  {
+    try {
+      Response<GetValue> response = consulClient.getKVValue(
+          lockKey,
+          config.getAuth().getAclToken(),
+          buildQueryParams()
+      );
+      if (response != null && response.getValue() != null && 
response.getValue().getValue() != null) {
+        return new 
String(Base64.getDecoder().decode(response.getValue().getValue()), 
StandardCharsets.UTF_8);
+      }
+      return null;
+    }
+    catch (Exception e) {
+      LOGGER.error(e, "Failed to get current leader from Consul");
+      return null;
+    }
+  }
+
+  @Override
+  public boolean isLeader()
+  {
+    return leader.get();
+  }
+
+  @Override
+  public int localTerm()
+  {
+    return term.get();
+  }
+
+  @Override
+  public void registerListener(Listener listener)
+  {
+    Preconditions.checkArgument(listener != null, "listener is null");
+
+    if (!lifecycleLock.canStart()) {
+      throw new ISE("can't start");
+    }
+
+    try {
+      this.listener = listener;
+      this.executorService = 
Execs.scheduledSingleThreaded("ConsulLeaderSelector-%d");
+      this.sessionKeeperService = 
Execs.scheduledSingleThreaded("ConsulSessionKeeper-%d");
+
+      startLeaderElection();
+
+      lifecycleLock.started();
+    }
+    catch (Exception ex) {
+      throw new RuntimeException(ex);
+    }
+    finally {
+      lifecycleLock.exitStart();
+    }
+  }
+
+  @Override
+  public void unregisterListener()
+  {
+    if (!lifecycleLock.canStop()) {
+      throw new ISE("can't stop");
+    }
+
+    LOGGER.info("Unregistering leader selector for [%s]", lockKey);
+    stopping = true;
+
+    try {
+      if (leader.get()) {
+        try {
+          listener.stopBeingLeader();
+        }
+        catch (Exception e) {
+          LOGGER.error(e, "Exception while stopping being leader");
+        }
+        leader.set(false);
+      }
+
+      // Destroying session releases the Consul lock, allowing another node to become leader
+      if (sessionId != null) {
+        try {
+          consulClient.sessionDestroy(sessionId, buildQueryParams(), 
config.getAuth().getAclToken());
+        }
+        catch (Exception e) {
+          LOGGER.error(e, "Failed to destroy Consul session");
+        }
+        sessionId = null;
+      }
+
+      if (executorService != null) {
+        executorService.shutdownNow();
+        try {
+          if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
+            LOGGER.warn("Leader selector executor did not terminate in time");
+          }
+        }
+        catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+        }
+      }
+
+      if (sessionKeeperService != null) {
+        sessionKeeperService.shutdownNow();
+        try {
+          if (!sessionKeeperService.awaitTermination(5, TimeUnit.SECONDS)) {
+            LOGGER.warn("Session keeper service did not terminate in time");
+          }
+        }
+        catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+        }
+      }
+    }
+    finally {
+      lifecycleLock.exitStop();
+    }
+  }
+
+  private void startLeaderElection()
+  {
+    executorService.submit(this::leaderElectionLoop);
+    sessionKeeperService.submit(this::sessionKeeperLoop);
+  }
+
+  private void leaderElectionLoop()
+  {
+    LOGGER.info("Starting leader election loop for [%s]", lockKey);
+    ConsulMetrics.emitCount(emitter, "consul/leader/loop", "lock", lockKey, 
"state", "start");
+
+    while (!stopping && !Thread.currentThread().isInterrupted()) {
+      try {
+        if (sessionId == null) {
+          sessionId = createSession();
+        }
+
+        if (sessionId != null && !leader.get()) {
+          if (!isSessionValid(sessionId)) {
+            LOGGER.info("Follower session [%s] expired or invalid, recreating", shortSessionId(sessionId));
+            sessionId = null;
+            continue;
+          }
+        }
+

Review Comment:
   If `createSession()` fails and returns null (or the Consul response carries no value), `tryAcquireLock(sessionId)` is invoked with `sessionId == null`. That passes null into `putParams.setAcquireSession(sessionId)`, which can cause avoidable exceptions, log spam and a tight retry loop. Either add a null guard before attempting to acquire the lock (if `sessionId` is null, back off and then retry session creation), or make `createSession()` throw when it cannot create a session, or both.
   ```suggestion
   
           if (sessionId == null) {
             LOGGER.warn(
                 "Session is null for [%s]; backing off before retrying session creation",
                 lockKey
             );
             Thread.sleep(config.getService().getHealthCheckInterval().getMillis());
             continue;
           }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to