This is an automated email from the ASF dual-hosted git repository.

li4wang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zookeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 6aaffd5ee ZOOKEEPER-4956: Provide a HostProvider that uses DNS SRV 
record for dynamic server discovery (#2320)
6aaffd5ee is described below

commit 6aaffd5ee007e8b53772c02111207cdc0cc17235
Author: li4wang <[email protected]>
AuthorDate: Tue Jan 20 16:10:39 2026 -0800

    ZOOKEEPER-4956: Provide a HostProvider that uses DNS SRV record for dynamic 
server discovery (#2320)
    
    Author: Li Wang <[email protected]>
    
    Co-authored-by: liwang <[email protected]>
---
 pom.xml                                            |   6 +
 .../resources/markdown/zookeeperProgrammers.md     |   5 +
 zookeeper-server/pom.xml                           |   4 +
 .../main/java/org/apache/zookeeper/ZooKeeper.java  |   8 +-
 .../zookeeper/client/ConnectStringParser.java      |  94 ++++--
 .../zookeeper/client/DnsSrvHostProvider.java       | 329 ++++++++++++++++++++
 ...ostProvider.java => HostConnectionManager.java} | 341 +++++++++++----------
 .../org/apache/zookeeper/client/HostProvider.java  |  10 +-
 .../zookeeper/client/HostProviderFactory.java      |  66 ++++
 .../zookeeper/client/StaticHostProvider.java       | 329 ++------------------
 .../apache/zookeeper/client/ZKClientConfig.java    |   8 +
 .../zookeeper/HostProviderSelectionTest.java       |  95 ++++++
 .../zookeeper/client/DnsSrvHostProviderTest.java   | 214 +++++++++++++
 .../zookeeper/test/ConnectStringParserTest.java    |  90 ++++++
 14 files changed, 1109 insertions(+), 490 deletions(-)

diff --git a/pom.xml b/pom.xml
index 52f023482..552659bc7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -573,6 +573,7 @@
     <enforcer.version>3.0.0-M3</enforcer.version>
     <commons-io.version>2.17.0</commons-io.version>
     <burningwave.mockdns.version>0.25.4</burningwave.mockdns.version>
+    <dnsjava.version>3.5.1</dnsjava.version>
     <clover-maven-plugin.version>4.4.1</clover-maven-plugin.version>
     <sonar-maven-plugin.version>3.7.0.1746</sonar-maven-plugin.version>
 
@@ -774,6 +775,11 @@
         <artifactId>tools</artifactId>
         <version>${burningwave.mockdns.version}</version>
       </dependency>
+      <dependency>
+        <groupId>dnsjava</groupId>
+        <artifactId>dnsjava</artifactId>
+        <version>${dnsjava.version}</version>
+      </dependency>
     </dependencies>
   </dependencyManagement>
 
diff --git a/zookeeper-docs/src/main/resources/markdown/zookeeperProgrammers.md 
b/zookeeper-docs/src/main/resources/markdown/zookeeperProgrammers.md
index 59008215c..a9b7fd711 100644
--- a/zookeeper-docs/src/main/resources/markdown/zookeeperProgrammers.md
+++ b/zookeeper-docs/src/main/resources/markdown/zookeeperProgrammers.md
@@ -1396,6 +1396,11 @@ and [SASL authentication for 
ZooKeeper](https://cwiki.apache.org/confluence/disp
   you want to randomize that.
   Default: false
 
+* *zookeeper.hostProvider.dnsSrvRefreshIntervalSeconds* :
+  **New in 3.10.0:**
+  Specifies the refresh interval in seconds for DNS SRV record lookups when 
using DnsSrvHostProvider.
+  A value of 0 disables periodic refresh.
+  Default: 60 seconds
 
 <a name="C+Binding"></a>
 
diff --git a/zookeeper-server/pom.xml b/zookeeper-server/pom.xml
index b2ecf1902..f4b76095d 100644
--- a/zookeeper-server/pom.xml
+++ b/zookeeper-server/pom.xml
@@ -190,6 +190,10 @@
       <artifactId>commons-io</artifactId>
       <scope>compile</scope>
     </dependency>
+    <dependency>
+      <groupId>dnsjava</groupId>
+      <artifactId>dnsjava</artifactId>
+    </dependency>
   </dependencies>
 
   <build>
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/ZooKeeper.java 
b/zookeeper-server/src/main/java/org/apache/zookeeper/ZooKeeper.java
index 7533e01a9..32893d46a 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/ZooKeeper.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/ZooKeeper.java
@@ -46,6 +46,7 @@
 import org.apache.zookeeper.client.Chroot;
 import org.apache.zookeeper.client.ConnectStringParser;
 import org.apache.zookeeper.client.HostProvider;
+import org.apache.zookeeper.client.HostProviderFactory;
 import org.apache.zookeeper.client.StaticHostProvider;
 import org.apache.zookeeper.client.ZKClientConfig;
 import org.apache.zookeeper.client.ZooKeeperBuilder;
@@ -1142,10 +1143,9 @@ public ZooKeeper(ZooKeeperOptions options) throws 
IOException {
         if (options.getHostProvider() != null) {
             hostProvider = 
options.getHostProvider().apply(connectStringParser.getServerAddresses());
         } else {
-            hostProvider = new 
StaticHostProvider(connectStringParser.getServerAddresses(), clientConfig);
+            hostProvider = HostProviderFactory.create(connectStringParser, 
clientConfig);
         }
         this.hostProvider = hostProvider;
-
         chroot = Chroot.ofNullable(connectStringParser.getChrootPath());
         cnxn = createConnection(
             hostProvider,
@@ -1332,6 +1332,10 @@ public synchronized void close() throws 
InterruptedException {
             LOG.debug("Ignoring unexpected exception during close", e);
         }
 
+        if (hostProvider != null) {
+            hostProvider.close();
+        }
+
         LOG.info("Session: 0x{} closed", Long.toHexString(getSessionId()));
     }
 
diff --git 
a/zookeeper-server/src/main/java/org/apache/zookeeper/client/ConnectStringParser.java
 
b/zookeeper-server/src/main/java/org/apache/zookeeper/client/ConnectStringParser.java
index 5d2928196..6c2c7ec6b 100644
--- 
a/zookeeper-server/src/main/java/org/apache/zookeeper/client/ConnectStringParser.java
+++ 
b/zookeeper-server/src/main/java/org/apache/zookeeper/client/ConnectStringParser.java
@@ -24,6 +24,7 @@
 import java.util.List;
 import org.apache.zookeeper.common.NetUtils;
 import org.apache.zookeeper.common.PathUtils;
+import org.apache.zookeeper.common.StringUtils;
 
 /**
  * A parser for ZooKeeper Client connect strings.
@@ -38,38 +39,88 @@
 public final class ConnectStringParser {
 
     private static final int DEFAULT_PORT = 2181;
+    private static final String DNS_SRV_PREFIX = "dns-srv://";
 
-    private final String chrootPath;
+    public enum ConnectionType {
+        DNS_SRV,
+        HOST_PORT
+    }
 
+    private String chrootPath;
     private final ArrayList<InetSocketAddress> serverAddresses = new 
ArrayList<>();
+    private final ConnectionType connectionType;
+    private final String connectString;
 
     /**
-     * Parse host and port by splitting client connectString
-     * with support for IPv6 literals
-     * @throws IllegalArgumentException
-     *             for an invalid chroot path.
+     * Constructs a ConnectStringParser with given connect string.
+     *
+     * <p>Supports two connection string formats:</p>
+     * <ul>
+     * <li><strong>Host:Port format:</strong> 
"host1:2181,host2:2181/chroot"</li>
+     * <li><strong>DNS SRV format:</strong> 
"dns-srv://service.domain.com/chroot"</li>
+     * </ul>
+     *
+     * @param connectString the connect string to parse
+     * @throws IllegalArgumentException if connectString is null/empty or 
contains invalid chroot path
      */
-    public ConnectStringParser(String connectString) {
+    public ConnectStringParser(final String connectString) {
+        if (StringUtils.isBlank(connectString)) {
+            throw new IllegalArgumentException("Connect string cannot be null 
or empty");
+        }
+
+        if (connectString.startsWith(DNS_SRV_PREFIX)) {
+            connectionType = ConnectionType.DNS_SRV;
+            parseDnsSrvFormat(connectString);
+        } else {
+            connectionType = ConnectionType.HOST_PORT;
+            parseHostPortFormat(connectString);
+        }
+        this.connectString = connectString;
+    }
+
+    public String getChrootPath() {
+        return chrootPath;
+    }
+
+    public ArrayList<InetSocketAddress> getServerAddresses() {
+        return serverAddresses;
+    }
+
+    public ConnectionType getConnectionType() {
+        return connectionType;
+    }
+
+    public String getConnectString() {
+        return connectString;
+    }
+
+    private String[] parseConnectString(final String connectString) {
+        String serverPart = connectString;
+        String chrootPath = null;
+
         // parse out chroot, if any
-        int off = connectString.indexOf('/');
+        final int off = connectString.indexOf('/');
         if (off >= 0) {
-            String chrootPath = connectString.substring(off);
+            chrootPath = connectString.substring(off);
             // ignore "/" chroot spec, same as null
             if (chrootPath.length() == 1) {
-                this.chrootPath = null;
+                chrootPath = null;
             } else {
                 PathUtils.validatePath(chrootPath);
-                this.chrootPath = chrootPath;
             }
-            connectString = connectString.substring(0, off);
-        } else {
-            this.chrootPath = null;
+            serverPart = connectString.substring(0, off);
         }
+        return new String[]{serverPart, chrootPath};
+    }
+
+    private void parseHostPortFormat(final String connectString) {
+        final String[] parts = parseConnectString(connectString);
+        chrootPath = parts[1];
 
-        List<String> hostsList = split(connectString, ",");
+        final List<String> hostsList = split(parts[0], ",");
         for (String host : hostsList) {
             int port = DEFAULT_PORT;
-            String[] hostAndPort = NetUtils.getIPV6HostAndPort(host);
+            final String[] hostAndPort = NetUtils.getIPV6HostAndPort(host);
             if (hostAndPort.length != 0) {
                 host = hostAndPort[0];
                 if (hostAndPort.length == 2) {
@@ -89,12 +140,11 @@ public ConnectStringParser(String connectString) {
         }
     }
 
-    public String getChrootPath() {
-        return chrootPath;
+    private void parseDnsSrvFormat(final String connectString) {
+        final String[] parts = 
parseConnectString(connectString.substring(DNS_SRV_PREFIX.length()));
+        chrootPath = parts[1];
+        // The DNS service name is stored as a placeholder address
+        // The actual resolution will be handled by DnsSrvHostProvider
+        serverAddresses.add(InetSocketAddress.createUnresolved(parts[0], 
DEFAULT_PORT));
     }
-
-    public ArrayList<InetSocketAddress> getServerAddresses() {
-        return serverAddresses;
-    }
-
 }
diff --git 
a/zookeeper-server/src/main/java/org/apache/zookeeper/client/DnsSrvHostProvider.java
 
b/zookeeper-server/src/main/java/org/apache/zookeeper/client/DnsSrvHostProvider.java
new file mode 100644
index 000000000..55cdfcd8e
--- /dev/null
+++ 
b/zookeeper-server/src/main/java/org/apache/zookeeper/client/DnsSrvHostProvider.java
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.client;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.zookeeper.common.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.xbill.DNS.Lookup;
+import org.xbill.DNS.Record;
+import org.xbill.DNS.SRVRecord;
+import org.xbill.DNS.Type;
+
+/**
+ * DNS SRV-based HostProvider that dynamically resolves host port names from 
DNS SRV records.
+ *
+ * <p>This implementation periodically refreshes the server list by querying 
DNS SRV records
+ * and uses HostConnectionManager for all connection management and 
reconfiguration logic.</p>
+ *
+ * <p><strong>Two-Phase Update Strategy:</strong></p>
+ * <ul>
+ * <li><strong>Phase 1 (Background):</strong> Timer thread detects DNS changes 
without caching results</li>
+ * <li><strong>Phase 2 (Connection-time):</strong> Fresh DNS lookup during 
connect attempt if changes detected</li>
+ * </ul>
+ */
[email protected]
+public final class DnsSrvHostProvider implements HostProvider {
+    private static final Logger LOG = 
LoggerFactory.getLogger(DnsSrvHostProvider.class);
+
+    public interface DnsSrvResolver {
+        SRVRecord[] lookupSrvRecords(String dnsSrvName) throws IOException;
+    }
+
+    private final String dnsSrvName;
+    private final DnsSrvResolver dnsResolver;
+
+    private final HostConnectionManager connectionManager;
+    // Track the previous server list to detect changes
+    private volatile Set<InetSocketAddress> previousServerSet;
+    private Timer dnsRefreshTimer;
+
+    // Track the current connected host for accurate load balancing decisions
+    private final AtomicReference<InetSocketAddress> currentConnectedHost = 
new AtomicReference<>();
+    private final AtomicBoolean serverListChanged = new AtomicBoolean(false);
+
+    /**
+     * Constructs a DnsSrvHostProvider with the given DNS name
+     *
+     * @param dnsSrvName the DNS name to query for SRV records
+     * @throws IllegalArgumentException if dnsSrvName is null or empty or 
invalid
+     *                                  or if no SRV records are found for the 
DNS name
+     *                                  or if DNS lookup fails
+     */
+    public DnsSrvHostProvider(final String dnsSrvName) {
+        this(dnsSrvName, null);
+    }
+
+
+    /**
+     * Constructs a DnsSrvHostProvider with the given DNS name and 
ZKClientConfig
+     *
+     * @param dnsSrvName the DNS name to query for SRV records
+     * @param clientConfig ZooKeeper client configuration
+     * @throws IllegalArgumentException if dnsSrvName is null or empty or 
invalid
+     *                                  or if no SRV records are found for the 
DNS name
+     *                                  or if DNS lookup fails
+     */
+    public DnsSrvHostProvider(final String dnsSrvName, final ZKClientConfig 
clientConfig) {
+        this(dnsSrvName, System.currentTimeMillis() ^ dnsSrvName.hashCode(), 
clientConfig);
+    }
+
+    /**
+     * Constructs a DnsSrvHostProvider with the given DNS name, randomness 
seed and ZKClientConfig
+     *
+     * @param dnsSrvName the DNS name to query for SRV records
+     * @param randomnessSeed seed for randomization
+     * @param clientConfig ZooKeeper client configuration
+     * @throws IllegalArgumentException if dnsSrvName is null or empty or 
invalid
+     *                                  or if no SRV records are found for the 
DNS name
+     *                                  or if DNS lookup fails
+     */
+    public DnsSrvHostProvider(final String dnsSrvName, final long 
randomnessSeed, final ZKClientConfig clientConfig) {
+        this(dnsSrvName, randomnessSeed, new DefaultDnsResolver(), 
clientConfig);
+    }
+
+    /**
+     * Constructs a DnsSrvHostProvider with the given DNS name, randomization 
seed, DNS resolver and ZKClientConfig
+     *
+     * @param dnsSrvName the DNS name to query for SRV records
+     * @param randomnessSeed seed for randomization
+     * @param dnsResolver custom DNS resolver
+     * @param clientConfig ZooKeeper client configuration
+     * @throws IllegalArgumentException if dnsSrvName is null or empty or 
invalid
+     *                                  or if no SRV records are found for the 
DNS name
+     *                                  or if DNS lookup fails
+     */
+    public DnsSrvHostProvider(final String dnsSrvName, final long 
randomnessSeed, final DnsSrvResolver dnsResolver, final ZKClientConfig 
clientConfig) {
+        if (StringUtils.isBlank(dnsSrvName)) {
+            throw new IllegalArgumentException("DNS name cannot be null or 
empty");
+        }
+
+        this.dnsSrvName = dnsSrvName;
+        this.dnsResolver = dnsResolver;
+        try {
+            final List<InetSocketAddress> serverAddresses = 
lookupDnsSrvRecords();
+            if (serverAddresses.isEmpty()) {
+                LOG.error("No SRV records found for DNS name: {}", dnsSrvName);
+                throw new IllegalArgumentException("No SRV records found for 
DNS name: " + dnsSrvName);
+            }
+
+            this.connectionManager = new 
HostConnectionManager(serverAddresses, randomnessSeed, clientConfig);
+            this.previousServerSet = new HashSet<>(serverAddresses);
+
+
+            final long refreshIntervalInSeconds = getRefreshInterval();
+            if (refreshIntervalInSeconds > 0) {
+                dnsRefreshTimer = new Timer("DnsSrvRefresh-" + dnsSrvName, 
true);
+                dnsRefreshTimer.scheduleAtFixedRate(new TimerTask() {
+                                                        @Override
+                                                        public void run() {
+                                                            
refreshServerListInBackground();
+                                                        }
+                                                    },
+                        refreshIntervalInSeconds * 1000,
+                        refreshIntervalInSeconds * 1000);
+            }
+            LOG.info("DnsSrvHostProvider initialized with {} servers from DNS 
name: {} with refresh interval: {} seconds",
+                    serverAddresses.size(), dnsSrvName, 
refreshIntervalInSeconds);
+        } catch (final Exception e) {
+            LOG.error("Failed to initialize DnsSrvHostProvider for DNS name: 
{}", dnsSrvName, e);
+
+            if (dnsRefreshTimer != null) {
+                dnsRefreshTimer.cancel();
+            }
+
+            if (e instanceof IllegalArgumentException) {
+                throw e;
+            } else {
+                throw new IllegalArgumentException("Failed to initialize 
DnsSrvHostProvider for DNS name: " + dnsSrvName, e);
+            }
+        }
+    }
+
+    @Override
+    public int size() {
+        return connectionManager.size();
+    }
+
+    @Override
+    public InetSocketAddress next(long spinDelay) {
+        applyServerListUpdate();
+        return connectionManager.next(spinDelay);
+    }
+
+    @Override
+    public void onConnected() {
+        currentConnectedHost.set(connectionManager.getServerAtCurrentIndex());
+        connectionManager.onConnected();
+    }
+
+    @Override
+    public boolean updateServerList(Collection<InetSocketAddress> 
serverAddresses, InetSocketAddress currentHost) {
+        return connectionManager.updateServerList(serverAddresses, 
currentHost);
+    }
+
+    @Override
+    public void close() {
+        if (dnsRefreshTimer != null) {
+            dnsRefreshTimer.cancel();
+        }
+    }
+
+    private long getRefreshInterval() {
+        final long defaultInterval = 
ZKClientConfig.DNS_SRV_REFRESH_INTERVAL_SECONDS_DEFAULT;
+        final long interval = 
Long.getLong(ZKClientConfig.DNS_SRV_REFRESH_INTERVAL_SECONDS, defaultInterval);
+        if (interval < 0) {
+            LOG.error("Invalid DNS SRV refresh interval {} seconds", interval);
+            throw new IllegalArgumentException("Invalid DNS SRV refresh 
interval: " + interval);
+        }
+        if (interval == 0) {
+            LOG.info("DNS SRV refresh disabled (interval = 0) for {}", 
dnsSrvName);
+        }
+        return interval;
+    }
+
+    private List<InetSocketAddress> lookupDnsSrvRecords() {
+        final List<InetSocketAddress> addresses = new ArrayList<>();
+
+        try {
+            final SRVRecord[] srvRecords = 
dnsResolver.lookupSrvRecords(dnsSrvName);
+            for (final SRVRecord srvRecord : srvRecords) {
+                final InetSocketAddress address = 
createAddressFromSrvRecord(srvRecord);
+                if (address != null) {
+                    addresses.add(address);
+                }
+            }
+        } catch (final Exception e) {
+            LOG.error("DNS SRV lookup failed for {}", dnsSrvName, e);
+            throw new RuntimeException("DNS SRV lookup failed for " + 
dnsSrvName, e);
+        }
+        return addresses;
+    }
+
+    private InetSocketAddress createAddressFromSrvRecord(final SRVRecord 
srvRecord) {
+        if (srvRecord == null) {
+            LOG.error("Null SRV record encountered from DnsSrvResolver 
implementation");
+            return null;
+        }
+
+        try {
+            final String target = srvRecord.getTarget().toString(true);
+            final int port = srvRecord.getPort();
+
+            if (port <= 0 || port > 65535) {
+                LOG.error("Invalid port {} in SRV record for target {}", port, 
target);
+                return null;
+            }
+
+            if (StringUtils.isBlank(target)) {
+                LOG.error("Empty or blank target in SRV record {}", srvRecord);
+                return null;
+            }
+
+            return new InetSocketAddress(target, port);
+        } catch (final Exception e) {
+            LOG.error("Failed to create InetSocketAddress from SRV record {}", 
srvRecord, e);
+            return null;
+        }
+    }
+
+    /**
+     * Performs background DNS refresh to detect server list changes without 
caching DNS responses.
+     *
+     * <p><strong>Important:</strong> This method only detects changes, the 
actual fresh DNS query happens
+     * in {@link #applyServerListUpdate()} when needed.</p>
+     */
+    private void refreshServerListInBackground() {
+        try {
+            // Refresh the server list
+            final List<InetSocketAddress> newAddresses = lookupDnsSrvRecords();
+            if (newAddresses.isEmpty()) {
+                LOG.warn("DNS SRV lookup returned no records for {}, will 
retry on next refresh", dnsSrvName);
+                return;
+            }
+
+            // Check if server list has changed
+            final Set<InetSocketAddress> newServerSet = new 
HashSet<>(newAddresses);
+            if (!Objects.equals(previousServerSet, newServerSet)) {
+                serverListChanged.set(true);
+                LOG.info("Server list change detected from DNS SRV: {} 
servers", newAddresses.size());
+            }
+        } catch (final Exception e) {
+            LOG.warn("Failed to refresh server list from DNS SRV records for 
{}: {}", dnsSrvName, e.getMessage());
+        }
+    }
+
+    /**
+     * Apply pending server list updates when connection is actually needed.
+     * It is called from next() to ensure server list changes are applied with 
proper connection context.
+     */
+    private synchronized void applyServerListUpdate() {
+        if (serverListChanged.get()) {
+            try {
+                // Query DNS to get the updated server list
+                final List<InetSocketAddress> latestServerList = 
lookupDnsSrvRecords();
+                if (latestServerList.isEmpty()) {
+                    LOG.warn("DNS SRV lookup returned no records for {}, will 
use the existing ones", dnsSrvName);
+                    return;
+                }
+
+                final boolean needReconnect = 
connectionManager.updateServerList(latestServerList, 
currentConnectedHost.get());
+                previousServerSet = new HashSet<>(latestServerList);
+                serverListChanged.set(false);
+
+                LOG.info("Applied server list update during connection 
attempt. servers size: {}, need reconnection: {}",
+                        latestServerList.size(), needReconnect);
+            } catch (final Exception e) {
+                LOG.warn("Failed to apply server list update", e);
+            }
+        }
+    }
+
+    private static class DefaultDnsResolver implements DnsSrvResolver {
+        @Override
+        public SRVRecord[] lookupSrvRecords(final String dnsSrvName) throws 
IOException {
+            final Lookup lookup = new Lookup(dnsSrvName, Type.SRV);
+            final Record[] records = lookup.run();
+            if (lookup.getResult() != Lookup.SUCCESSFUL) {
+                final String errorMsg = lookup.getErrorString();
+                throw new IOException("DNS SRV lookup failed for " + 
dnsSrvName + ": " + errorMsg);
+            }
+
+            if (records == null) {
+                return new SRVRecord[0];
+            }
+            return 
Arrays.stream(records).map(SRVRecord.class::cast).toArray(SRVRecord[]::new);
+        }
+    }
+}
+
diff --git 
a/zookeeper-server/src/main/java/org/apache/zookeeper/client/StaticHostProvider.java
 
b/zookeeper-server/src/main/java/org/apache/zookeeper/client/HostConnectionManager.java
similarity index 71%
copy from 
zookeeper-server/src/main/java/org/apache/zookeeper/client/StaticHostProvider.java
copy to 
zookeeper-server/src/main/java/org/apache/zookeeper/client/HostConnectionManager.java
index e07754c35..b278014a3 100644
--- 
a/zookeeper-server/src/main/java/org/apache/zookeeper/client/StaticHostProvider.java
+++ 
b/zookeeper-server/src/main/java/org/apache/zookeeper/client/HostConnectionManager.java
@@ -32,159 +32,99 @@
 import org.slf4j.LoggerFactory;
 
 /**
- * Most simple HostProvider, resolves on every next() call.
+ * Manages ZooKeeper server connections with round-robin selection and load 
balancing during
+ * cluster reconfiguration.
  *
- * Please be aware that although this class doesn't do any DNS caching, 
there're multiple levels of caching already
- * present across the stack like in JVM, OS level, hardware, etc. The best we 
could do here is to get the most recent
- * address from the underlying system which is considered up-to-date.
+ * <p><strong>Key Features:</strong></p>
+ * <ul>
+ * <li>Round-robin server selection with IP address resolution</li>
+ * <li>Migrates clients to balance load when servers are added or removed</li>
+ * <li>Probabilistic client migration during server list updates</li>
+ * </ul>
  *
+ * <p><strong>Reconfiguration Behavior:</strong></p>
+ * When server list changes, enters "reconfigMode" and calculates migration 
probabilities
+ * to balance load between old and new servers.
+ *
+ * <p>See <a 
href="https://issues.apache.org/jira/browse/ZOOKEEPER-1355">ZOOKEEPER-1355</a>
+ * for more details.</p>
  */
[email protected]
-public final class StaticHostProvider implements HostProvider {
[email protected]
+public final class HostConnectionManager {
 
+    /**
+     * Interface for address resolution to support testing and different 
resolution strategies.
+     */
     public interface Resolver {
-
         InetAddress[] getAllByName(String name) throws UnknownHostException;
-
     }
 
-    private static final Logger LOG = 
LoggerFactory.getLogger(StaticHostProvider.class);
-
-    private ZKClientConfig clientConfig = null;
-
-    private List<InetSocketAddress> serverAddresses = new ArrayList<>(5);
+    private static final Logger LOG = 
LoggerFactory.getLogger(HostConnectionManager.class);
 
-    private Random sourceOfRandomness;
-    private int lastIndex = -1;
+    private List<InetSocketAddress> serverAddresses;
+    private final Random sourceOfRandomness;
+    private final Resolver resolver;
+    private final ZKClientConfig clientConfig;
 
-    private int currentIndex = -1;
+    private int lastIndex;
+    private int currentIndex;
 
     /**
      * The following fields are used to migrate clients during reconfiguration
      */
     private boolean reconfigMode = false;
-
     private final List<InetSocketAddress> oldServers = new ArrayList<>(5);
-
     private final List<InetSocketAddress> newServers = new ArrayList<>(5);
-
     private int currentIndexOld = -1;
     private int currentIndexNew = -1;
-
     private float pOld, pNew;
 
-    private Resolver resolver;
-
     /**
-     * Constructs a SimpleHostSet.
+     * Constructs a HostConnectionManager with default resolver.
      *
      * @param serverAddresses
-     *            possibly unresolved ZooKeeper server addresses
-     * @throws IllegalArgumentException
-     *             if serverAddresses is empty or resolves to an empty list
-     */
-    public StaticHostProvider(Collection<InetSocketAddress> serverAddresses) {
-        init(serverAddresses, System.currentTimeMillis() ^ this.hashCode(), 
new Resolver() {
-            @Override
-            public InetAddress[] getAllByName(String name) throws 
UnknownHostException {
-                return InetAddress.getAllByName(name);
-            }
-        });
-    }
-
-    /**
-     * Constructs a SimpleHostSet with ZKClientConfig.
-     *
-     * Introduced this new overload in 3.10.0 in order to take advantage of 
some newly introduced feature flags. Like
-     * the shuffle (old) / not to shuffle (new) behavior of DNS resolution.
-     *
-     * @param serverAddresses
-     *            possibly unresolved ZooKeeper server addresses
+     *              possibly unresolved ZooKeeper server addresses
+     *  @param randomnessSeed
+     *              a seed used to initialize sourceOfRandomness
      * @param clientConfig
-     *            ZooKeeper client configuration
+     *              ZooKeeper client configuration
+     * @throws IllegalArgumentException
+     *              if serverAddresses is empty
      */
-    public StaticHostProvider(Collection<InetSocketAddress> serverAddresses, 
ZKClientConfig clientConfig) {
-        init(serverAddresses, System.currentTimeMillis() ^ this.hashCode(), 
new Resolver() {
-            @Override
-            public InetAddress[] getAllByName(String name) throws 
UnknownHostException {
-                return InetAddress.getAllByName(name);
-            }
-        }, clientConfig);
+    public HostConnectionManager(Collection<InetSocketAddress> 
serverAddresses, long randomnessSeed, ZKClientConfig clientConfig) {
+        this(serverAddresses, randomnessSeed, null, clientConfig);
     }
 
     /**
-     * Constructs a SimpleHostSet.
-     *
-     * Introduced for testing purposes. getAllByName() is a static method of 
InetAddress, therefore cannot be easily mocked.
-     * By abstraction of Resolver interface we can easily inject a mocked 
implementation in tests.
+     * Constructs a HostConnectionManager with custom resolver.
      *
      * @param serverAddresses
      *              possibly unresolved ZooKeeper server addresses
+     * @param randomnessSeed
+     *              a seed used to initialize sourceOfRandomness
      * @param resolver
      *              custom resolver implementation
-     */
-    public StaticHostProvider(Collection<InetSocketAddress> serverAddresses, 
Resolver resolver) {
-        init(serverAddresses, System.currentTimeMillis() ^ this.hashCode(), 
resolver);
-    }
-
-    /**
-     * Constructs a SimpleHostSet. This constructor is used from 
StaticHostProviderTest to produce deterministic test results
-     * by initializing sourceOfRandomness with the same seed
-     *
-     * @param serverAddresses
-     *            possibly unresolved ZooKeeper server addresses
-     * @param randomnessSeed a seed used to initialize sourceOfRandomnes
+     * @param clientConfig
+     *              ZooKeeper client configuration
      * @throws IllegalArgumentException
-     *             if serverAddresses is empty or resolves to an empty list
+     *              if serverAddresses is empty
      */
-    public StaticHostProvider(Collection<InetSocketAddress> serverAddresses, 
long randomnessSeed) {
-        init(serverAddresses, randomnessSeed, new Resolver() {
-            @Override
-            public InetAddress[] getAllByName(String name) throws 
UnknownHostException {
-                return InetAddress.getAllByName(name);
-            }
-        });
-    }
-
-    private void init(Collection<InetSocketAddress> serverAddresses, long 
randomnessSeed, Resolver resolver) {
-        init(serverAddresses, randomnessSeed, resolver, null);
-    }
-
-    private void init(Collection<InetSocketAddress> serverAddresses, long 
randomnessSeed, Resolver resolver,
-                      ZKClientConfig clientConfig) {
-        this.clientConfig = clientConfig == null ? new ZKClientConfig() : 
clientConfig;
-        this.sourceOfRandomness = new Random(randomnessSeed);
-        this.resolver = resolver;
+    public HostConnectionManager(Collection<InetSocketAddress> serverAddresses,
+                                long randomnessSeed,
+                                Resolver resolver,
+                                ZKClientConfig clientConfig) {
         if (serverAddresses.isEmpty()) {
             throw new IllegalArgumentException("A HostProvider may not be 
empty!");
         }
+        this.sourceOfRandomness = new Random(randomnessSeed);
         this.serverAddresses = shuffle(serverAddresses);
+        this.resolver = resolver == null ? InetAddress::getAllByName : 
resolver;
+        this.clientConfig = clientConfig == null ? new ZKClientConfig() : 
clientConfig;
+
         currentIndex = -1;
         lastIndex = -1;
-    }
 
-    private InetSocketAddress resolve(InetSocketAddress address) {
-        try {
-            String curHostString = address.getHostString();
-            List<InetAddress> resolvedAddresses = new 
ArrayList<>(Arrays.asList(this.resolver.getAllByName(curHostString)));
-            if (resolvedAddresses.isEmpty()) {
-                return address;
-            }
-            if (clientConfig.isShuffleDnsResponseEnabled()) {
-                Collections.shuffle(resolvedAddresses);
-            }
-            return new InetSocketAddress(resolvedAddresses.get(0), 
address.getPort());
-        } catch (UnknownHostException e) {
-            LOG.error("Unable to resolve address: {}", address.toString(), e);
-            return address;
-        }
-    }
-
-    private List<InetSocketAddress> shuffle(Collection<InetSocketAddress> 
serverAddresses) {
-        List<InetSocketAddress> tmpList = new 
ArrayList<>(serverAddresses.size());
-        tmpList.addAll(serverAddresses);
-        Collections.shuffle(tmpList, sourceOfRandomness);
-        return tmpList;
+        LOG.info("HostConnectionManager initialized with {} servers", 
serverAddresses.size());
     }
 
     /**
@@ -197,29 +137,30 @@ private List<InetSocketAddress> 
shuffle(Collection<InetSocketAddress> serverAddr
      *    this client migrates or not (i.e., whether true or false is 
returned) is probabilistic so that the expected
      *    number of clients connected to each server is the same.
      *
-     * If true is returned, the function sets pOld and pNew that correspond to 
the probability to migrate to ones of the
+     * If true is returned, the function sets pOld and pNew that correspond to 
the probability to migrate to one of the
      * new servers in serverAddresses or one of the old servers (migrating to 
one of the old servers is done only
      * if our client's currentHost is not in serverAddresses). See 
nextHostInReconfigMode for the selection logic.
      *
      * See <a 
href="https://issues.apache.org/jira/browse/ZOOKEEPER-1355">ZOOKEEPER-1355</a>
-     * for the protocol and its evaluation, and StaticHostProviderTest for the 
tests that illustrate how load balancing
-     * works with this policy.
+     * for the protocol and its evaluation.
      *
-     * @param serverAddresses new host list
-     * @param currentHost the host to which this client is currently connected
+     * @param serverAddresses
+     *              new host list
+     * @param currentHost
+     *              the host to which this client is currently connected
      * @return true if changing connections is necessary for load-balancing, 
false otherwise
      */
-    @Override
-    public synchronized boolean updateServerList(
-        Collection<InetSocketAddress> serverAddresses,
-        InetSocketAddress currentHost) {
+    synchronized boolean updateServerList(Collection<InetSocketAddress> 
serverAddresses, InetSocketAddress currentHost) {
         List<InetSocketAddress> shuffledList = shuffle(serverAddresses);
         if (shuffledList.isEmpty()) {
-            throw new IllegalArgumentException("A HostProvider may not be 
empty!");
+            throw new IllegalArgumentException("The server list may not be 
empty!");
         }
+
+        int oldSize = this.serverAddresses.size();
+        int newSize = shuffledList.size();
+
         // Check if client's current server is in the new list of servers
         boolean myServerInNewConfig = false;
-
         InetSocketAddress myServer = currentHost;
 
         // choose "current" server according to the client rebalancing 
algorithm
@@ -308,24 +249,133 @@ public synchronized boolean updateServerList(
         currentIndexOld = -1;
         currentIndexNew = -1;
         lastIndex = currentIndex;
+
+        LOG.info("Server list updated: oldSize={}, newSize={}, 
reconfigMode={}", oldSize, newSize, reconfigMode);
+
         return reconfigMode;
     }
 
-    public synchronized InetSocketAddress getServerAtIndex(int i) {
+    /**
+     * Get the next server to connect to.
+     *
+     * @param spinDelay milliseconds to wait if all hosts have been tried once
+     * @return the next server address to connect to
+     */
+    InetSocketAddress next(long spinDelay) {
+        boolean needToSleep = false;
+        InetSocketAddress addr;
+
+        synchronized (this) {
+            if (reconfigMode) {
+                addr = nextHostInReconfigMode();
+                if (addr != null) {
+                    currentIndex = serverAddresses.indexOf(addr);
+                    return resolve(addr);
+                }
+                //tried all servers and couldn't connect
+                reconfigMode = false;
+                needToSleep = (spinDelay > 0);
+            }
+            ++currentIndex;
+            if (currentIndex == serverAddresses.size()) {
+                currentIndex = 0;
+            }
+            addr = serverAddresses.get(currentIndex);
+            needToSleep = needToSleep || (currentIndex == lastIndex && 
spinDelay > 0);
+            if (lastIndex == -1) {
+                // We don't want to sleep on the first ever connect attempt.
+                lastIndex = 0;
+            }
+        }
+
+        if (needToSleep) {
+            try {
+                Thread.sleep(spinDelay);
+            } catch (InterruptedException e) {
+                LOG.warn("Unexpected exception", e);
+            }
+        }
+
+        return resolve(addr);
+    }
+
+    /**
+     * Notify that a connection has been established successfully.
+     */
+    synchronized void onConnected() {
+        lastIndex = currentIndex;
+        reconfigMode = false;
+    }
+
+    /**
+     * Get the server at the specified index.
+     *
+     * @param i the index
+     * @return the server address at the index, or null if index is out of 
bounds
+     */
+    synchronized InetSocketAddress getServerAtIndex(int i) {
         if (i < 0 || i >= serverAddresses.size()) {
             return null;
         }
         return serverAddresses.get(i);
     }
 
-    public synchronized InetSocketAddress getServerAtCurrentIndex() {
+    /**
+     * Get the server at the current index.
+     *
+     * @return the server address at the current index
+     */
+    synchronized InetSocketAddress getServerAtCurrentIndex() {
         return getServerAtIndex(currentIndex);
     }
 
-    public synchronized int size() {
+    /**
+     * Get the number of servers in the list.
+     *
+     * @return the number of servers
+     */
+    synchronized int size() {
         return serverAddresses.size();
     }
 
+    /**
+     * Resolves an InetSocketAddress to a concrete address.
+     *
+     * @param address
+     *              the address to resolve
+     * @return resolved address or original address if resolution fails
+     */
+    private InetSocketAddress resolve(InetSocketAddress address) {
+        try {
+            String curHostString = address.getHostString();
+            List<InetAddress> resolvedAddresses = new 
ArrayList<>(Arrays.asList(this.resolver.getAllByName(curHostString)));
+            if (resolvedAddresses.isEmpty()) {
+                return address;
+            }
+            if (clientConfig.isShuffleDnsResponseEnabled()) {
+                Collections.shuffle(resolvedAddresses);
+            }
+            return new InetSocketAddress(resolvedAddresses.get(0), 
address.getPort());
+        } catch (UnknownHostException e) {
+            LOG.error("Unable to resolve address: {}", address, e);
+            return address;
+        }
+    }
+
+    /**
+     * Shuffles the server addresses using the internal random source.
+     *
+     * @param serverAddresses
+     *              collection of server addresses to shuffle
+     * @return shuffled list of server addresses
+     */
+    private List<InetSocketAddress> shuffle(Collection<InetSocketAddress> 
serverAddresses) {
+        List<InetSocketAddress> tmpList = new 
ArrayList<>(serverAddresses.size());
+        tmpList.addAll(serverAddresses);
+        Collections.shuffle(tmpList, sourceOfRandomness);
+        return tmpList;
+    }
+
     /**
      * Get the next server to connect to, when in "reconfigMode", which means 
that
      * you've just updated the server list, and now trying to find some server 
to connect to.
@@ -360,47 +410,4 @@ private InetSocketAddress nextHostInReconfigMode() {
 
         return null;
     }
-
-    public InetSocketAddress next(long spinDelay) {
-        boolean needToSleep = false;
-        InetSocketAddress addr;
-
-        synchronized (this) {
-            if (reconfigMode) {
-                addr = nextHostInReconfigMode();
-                if (addr != null) {
-                    currentIndex = serverAddresses.indexOf(addr);
-                    return resolve(addr);
-                }
-                //tried all servers and couldn't connect
-                reconfigMode = false;
-                needToSleep = (spinDelay > 0);
-            }
-            ++currentIndex;
-            if (currentIndex == serverAddresses.size()) {
-                currentIndex = 0;
-            }
-            addr = serverAddresses.get(currentIndex);
-            needToSleep = needToSleep || (currentIndex == lastIndex && 
spinDelay > 0);
-            if (lastIndex == -1) {
-                // We don't want to sleep on the first ever connect attempt.
-                lastIndex = 0;
-            }
-        }
-        if (needToSleep) {
-            try {
-                Thread.sleep(spinDelay);
-            } catch (InterruptedException e) {
-                LOG.warn("Unexpected exception", e);
-            }
-        }
-
-        return resolve(addr);
-    }
-
-    public synchronized void onConnected() {
-        lastIndex = currentIndex;
-        reconfigMode = false;
-    }
-
 }
diff --git 
a/zookeeper-server/src/main/java/org/apache/zookeeper/client/HostProvider.java 
b/zookeeper-server/src/main/java/org/apache/zookeeper/client/HostProvider.java
index 73a102f1a..f154f080b 100644
--- 
a/zookeeper-server/src/main/java/org/apache/zookeeper/client/HostProvider.java
+++ 
b/zookeeper-server/src/main/java/org/apache/zookeeper/client/HostProvider.java
@@ -43,7 +43,7 @@
  * * A HostProvider that prefers nearby hosts.
  */
 @InterfaceAudience.Public
-public interface HostProvider {
+public interface HostProvider extends AutoCloseable {
 
     int size();
 
@@ -72,4 +72,12 @@ public interface HostProvider {
      */
     boolean updateServerList(Collection<InetSocketAddress> serverAddresses, 
InetSocketAddress currentHost);
 
+    /**
+     * Close the HostProvider and release any resources.
+     *
+     * Default implementation does nothing for backward compatibility
+     */
+    @Override
+    default void close() {
+    }
 }
diff --git 
a/zookeeper-server/src/main/java/org/apache/zookeeper/client/HostProviderFactory.java
 
b/zookeeper-server/src/main/java/org/apache/zookeeper/client/HostProviderFactory.java
new file mode 100644
index 000000000..4f3f3a374
--- /dev/null
+++ 
b/zookeeper-server/src/main/java/org/apache/zookeeper/client/HostProviderFactory.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.client;
+
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Factory for creating appropriate HostProvider instances based on connect 
string format.
+ * This factory enables zero-code-change migration by automatically detecting 
the connect
+ * string format and creating the appropriate HostProvider implementation.
+ *
+ * Supported formats:
+ * - Host:Port: "host1:port1,host2:port2,host3:port3" (StaticHostProvider)
+ * - DNS SRV: "dns-srv://service.domain.com"  (DnsSrvHostProvider)
+ *
+ * - Future formats can be easily added by extending the factory
+ */
+@InterfaceAudience.Public
+public class HostProviderFactory {
+    /**
+     * Creates a HostProvider based on the connect string format.
+     * This is a convenience method for zero-code-change migration.
+     *
+     * @param connectString the connect string
+     * @return appropriate HostProvider
+     */
+    public static HostProvider create(final String connectString) {
+        final ConnectStringParser connectStringParser = new 
ConnectStringParser(connectString);
+        return create(connectStringParser, null);
+    }
+
+    /**
+     * Creates a HostProvider based on the connect string format.
+     *
+     * @param connectStringParser the connect string parser
+     * @param clientConfig ZooKeeper client configuration
+     *
+     * @return appropriate HostProvider
+     */
+    @InterfaceAudience.Private
+    public static HostProvider create(final ConnectStringParser 
connectStringParser, final ZKClientConfig clientConfig) {
+        switch (connectStringParser.getConnectionType()) {
+            case DNS_SRV:
+                final String dnsSrvName = 
connectStringParser.getServerAddresses().get(0).getHostString();
+                return new DnsSrvHostProvider(dnsSrvName, clientConfig);
+            default:
+                return new 
StaticHostProvider(connectStringParser.getServerAddresses(), clientConfig);
+        }
+    }
+}
diff --git 
a/zookeeper-server/src/main/java/org/apache/zookeeper/client/StaticHostProvider.java
 
b/zookeeper-server/src/main/java/org/apache/zookeeper/client/StaticHostProvider.java
index e07754c35..6cc45a1a0 100644
--- 
a/zookeeper-server/src/main/java/org/apache/zookeeper/client/StaticHostProvider.java
+++ 
b/zookeeper-server/src/main/java/org/apache/zookeeper/client/StaticHostProvider.java
@@ -21,15 +21,8 @@
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Random;
 import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * Most simple HostProvider, resolves on every next() call.
@@ -38,6 +31,7 @@
  * present across the stack like in JVM, OS level, hardware, etc. The best we 
could do here is to get the most recent
  * address from the underlying system which is considered up-to-date.
  *
+ * It uses HostConnectionManager for all connection management and 
reconfiguration logic.
  */
 @InterfaceAudience.Public
 public final class StaticHostProvider implements HostProvider {
@@ -48,32 +42,7 @@ public interface Resolver {
 
     }
 
-    private static final Logger LOG = 
LoggerFactory.getLogger(StaticHostProvider.class);
-
-    private ZKClientConfig clientConfig = null;
-
-    private List<InetSocketAddress> serverAddresses = new ArrayList<>(5);
-
-    private Random sourceOfRandomness;
-    private int lastIndex = -1;
-
-    private int currentIndex = -1;
-
-    /**
-     * The following fields are used to migrate clients during reconfiguration
-     */
-    private boolean reconfigMode = false;
-
-    private final List<InetSocketAddress> oldServers = new ArrayList<>(5);
-
-    private final List<InetSocketAddress> newServers = new ArrayList<>(5);
-
-    private int currentIndexOld = -1;
-    private int currentIndexNew = -1;
-
-    private float pOld, pNew;
-
-    private Resolver resolver;
+    private final HostConnectionManager connectionManager;
 
     /**
      * Constructs a SimpleHostSet.
@@ -84,12 +53,7 @@ public interface Resolver {
      *             if serverAddresses is empty or resolves to an empty list
      */
     public StaticHostProvider(Collection<InetSocketAddress> serverAddresses) {
-        init(serverAddresses, System.currentTimeMillis() ^ this.hashCode(), 
new Resolver() {
-            @Override
-            public InetAddress[] getAllByName(String name) throws 
UnknownHostException {
-                return InetAddress.getAllByName(name);
-            }
-        });
+        connectionManager = new HostConnectionManager(serverAddresses, 
System.currentTimeMillis() ^ this.hashCode(), null);
     }
 
     /**
@@ -104,12 +68,10 @@ public InetAddress[] getAllByName(String name) throws 
UnknownHostException {
      *            ZooKeeper client configuration
      */
     public StaticHostProvider(Collection<InetSocketAddress> serverAddresses, 
ZKClientConfig clientConfig) {
-        init(serverAddresses, System.currentTimeMillis() ^ this.hashCode(), 
new Resolver() {
-            @Override
-            public InetAddress[] getAllByName(String name) throws 
UnknownHostException {
-                return InetAddress.getAllByName(name);
-            }
-        }, clientConfig);
+        connectionManager = new HostConnectionManager(serverAddresses,
+                                                        
System.currentTimeMillis() ^ this.hashCode(),
+                                                        null,
+                                                        clientConfig);
     }
 
     /**
@@ -124,7 +86,10 @@ public InetAddress[] getAllByName(String name) throws 
UnknownHostException {
      *              custom resolver implementation
      */
     public StaticHostProvider(Collection<InetSocketAddress> serverAddresses, 
Resolver resolver) {
-        init(serverAddresses, System.currentTimeMillis() ^ this.hashCode(), 
resolver);
+        this.connectionManager = new HostConnectionManager(serverAddresses,
+                                                    System.currentTimeMillis() 
^ this.hashCode(),
+                                                    resolver::getAllByName,
+                                                    null);
     }
 
     /**
@@ -133,274 +98,42 @@ public StaticHostProvider(Collection<InetSocketAddress> 
serverAddresses, Resolve
      *
      * @param serverAddresses
      *            possibly unresolved ZooKeeper server addresses
-     * @param randomnessSeed a seed used to initialize sourceOfRandomnes
+     * @param randomnessSeed a seed used to initialize sourceOfRandomness
      * @throws IllegalArgumentException
      *             if serverAddresses is empty or resolves to an empty list
      */
     public StaticHostProvider(Collection<InetSocketAddress> serverAddresses, 
long randomnessSeed) {
-        init(serverAddresses, randomnessSeed, new Resolver() {
-            @Override
-            public InetAddress[] getAllByName(String name) throws 
UnknownHostException {
-                return InetAddress.getAllByName(name);
-            }
-        });
-    }
-
-    private void init(Collection<InetSocketAddress> serverAddresses, long 
randomnessSeed, Resolver resolver) {
-        init(serverAddresses, randomnessSeed, resolver, null);
-    }
-
-    private void init(Collection<InetSocketAddress> serverAddresses, long 
randomnessSeed, Resolver resolver,
-                      ZKClientConfig clientConfig) {
-        this.clientConfig = clientConfig == null ? new ZKClientConfig() : 
clientConfig;
-        this.sourceOfRandomness = new Random(randomnessSeed);
-        this.resolver = resolver;
-        if (serverAddresses.isEmpty()) {
-            throw new IllegalArgumentException("A HostProvider may not be 
empty!");
-        }
-        this.serverAddresses = shuffle(serverAddresses);
-        currentIndex = -1;
-        lastIndex = -1;
-    }
-
-    private InetSocketAddress resolve(InetSocketAddress address) {
-        try {
-            String curHostString = address.getHostString();
-            List<InetAddress> resolvedAddresses = new 
ArrayList<>(Arrays.asList(this.resolver.getAllByName(curHostString)));
-            if (resolvedAddresses.isEmpty()) {
-                return address;
-            }
-            if (clientConfig.isShuffleDnsResponseEnabled()) {
-                Collections.shuffle(resolvedAddresses);
-            }
-            return new InetSocketAddress(resolvedAddresses.get(0), 
address.getPort());
-        } catch (UnknownHostException e) {
-            LOG.error("Unable to resolve address: {}", address.toString(), e);
-            return address;
-        }
-    }
-
-    private List<InetSocketAddress> shuffle(Collection<InetSocketAddress> 
serverAddresses) {
-        List<InetSocketAddress> tmpList = new 
ArrayList<>(serverAddresses.size());
-        tmpList.addAll(serverAddresses);
-        Collections.shuffle(tmpList, sourceOfRandomness);
-        return tmpList;
+        this.connectionManager = new HostConnectionManager(serverAddresses,
+                                                            randomnessSeed,
+                                                            null,
+                                                            null);
     }
 
-    /**
-     * Update the list of servers. This returns true if changing connections 
is necessary for load-balancing, false
-     * otherwise. Changing connections is necessary if one of the following 
holds:
-     * a) the host to which this client is currently connected is not in 
serverAddresses.
-     *    Otherwise (if currentHost is in the new list serverAddresses):
-     * b) the number of servers in the cluster is increasing - in this case 
the load on currentHost should decrease,
-     *    which means that SOME of the clients connected to it will migrate to 
the new servers. The decision whether
-     *    this client migrates or not (i.e., whether true or false is 
returned) is probabilistic so that the expected
-     *    number of clients connected to each server is the same.
-     *
-     * If true is returned, the function sets pOld and pNew that correspond to 
the probability to migrate to ones of the
-     * new servers in serverAddresses or one of the old servers (migrating to 
one of the old servers is done only
-     * if our client's currentHost is not in serverAddresses). See 
nextHostInReconfigMode for the selection logic.
-     *
-     * See <a 
href="https://issues.apache.org/jira/browse/ZOOKEEPER-1355">ZOOKEEPER-1355</a>
-     * for the protocol and its evaluation, and StaticHostProviderTest for the 
tests that illustrate how load balancing
-     * works with this policy.
-     *
-     * @param serverAddresses new host list
-     * @param currentHost the host to which this client is currently connected
-     * @return true if changing connections is necessary for load-balancing, 
false otherwise
-     */
     @Override
-    public synchronized boolean updateServerList(
-        Collection<InetSocketAddress> serverAddresses,
-        InetSocketAddress currentHost) {
-        List<InetSocketAddress> shuffledList = shuffle(serverAddresses);
-        if (shuffledList.isEmpty()) {
-            throw new IllegalArgumentException("A HostProvider may not be 
empty!");
-        }
-        // Check if client's current server is in the new list of servers
-        boolean myServerInNewConfig = false;
-
-        InetSocketAddress myServer = currentHost;
-
-        // choose "current" server according to the client rebalancing 
algorithm
-        if (reconfigMode) {
-            myServer = next(0);
-        }
-
-        // if the client is not currently connected to any server
-        if (myServer == null) {
-            // reconfigMode = false (next shouldn't return null).
-            if (lastIndex >= 0) {
-                // take the last server to which we were connected
-                myServer = this.serverAddresses.get(lastIndex);
-            } else {
-                // take the first server on the list
-                myServer = this.serverAddresses.get(0);
-            }
-        }
-
-        for (InetSocketAddress addr : shuffledList) {
-            if (addr.getPort() == myServer.getPort()
-                && ((addr.getAddress() != null
-                     && myServer.getAddress() != null
-                     && addr.getAddress().equals(myServer.getAddress()))
-                    || addr.getHostString().equals(myServer.getHostString()))) 
{
-                myServerInNewConfig = true;
-                break;
-            }
-        }
-
-        reconfigMode = true;
-
-        newServers.clear();
-        oldServers.clear();
-        // Divide the new servers into oldServers that were in the previous 
list
-        // and newServers that were not in the previous list
-        for (InetSocketAddress address : shuffledList) {
-            if (this.serverAddresses.contains(address)) {
-                oldServers.add(address);
-            } else {
-                newServers.add(address);
-            }
-        }
-
-        int numOld = oldServers.size();
-        int numNew = newServers.size();
-
-        // number of servers increased
-        if (numOld + numNew > this.serverAddresses.size()) {
-            if (myServerInNewConfig) {
-                // my server is in new config, but load should be decreased.
-                // Need to decide if this client
-                // is moving to one of the new servers
-                if (sourceOfRandomness.nextFloat() <= (1 - ((float) 
this.serverAddresses.size()) / (numOld + numNew))) {
-                    pNew = 1;
-                    pOld = 0;
-                } else {
-                    // do nothing special - stay with the current server
-                    reconfigMode = false;
-                }
-            } else {
-                // my server is not in new config, and load on old servers must
-                // be decreased, so connect to
-                // one of the new servers
-                pNew = 1;
-                pOld = 0;
-            }
-        } else { // number of servers stayed the same or decreased
-            if (myServerInNewConfig) {
-                // my server is in new config, and load should be increased, so
-                // stay with this server and do nothing special
-                reconfigMode = false;
-            } else {
-                pOld = ((float) (numOld * (this.serverAddresses.size() - 
(numOld + numNew))))
-                       / ((numOld + numNew) * (this.serverAddresses.size() - 
numOld));
-                pNew = 1 - pOld;
-            }
-        }
-
-        if (!reconfigMode) {
-            currentIndex = shuffledList.indexOf(getServerAtCurrentIndex());
-        } else {
-            currentIndex = -1;
-        }
-        this.serverAddresses = shuffledList;
-        currentIndexOld = -1;
-        currentIndexNew = -1;
-        lastIndex = currentIndex;
-        return reconfigMode;
-    }
-
-    public synchronized InetSocketAddress getServerAtIndex(int i) {
-        if (i < 0 || i >= serverAddresses.size()) {
-            return null;
-        }
-        return serverAddresses.get(i);
+    public boolean updateServerList(Collection<InetSocketAddress> 
serverAddresses, InetSocketAddress currentHost) {
+        return connectionManager.updateServerList(serverAddresses, 
currentHost);
     }
 
-    public synchronized InetSocketAddress getServerAtCurrentIndex() {
-        return getServerAtIndex(currentIndex);
+    @Override
+    public int size() {
+        return connectionManager.size();
     }
 
-    public synchronized int size() {
-        return serverAddresses.size();
+    @Override
+    public InetSocketAddress next(long spinDelay) {
+        return connectionManager.next(spinDelay);
     }
 
-    /**
-     * Get the next server to connect to, when in "reconfigMode", which means 
that
-     * you've just updated the server list, and now trying to find some server 
to connect to.
-     * Once onConnected() is called, reconfigMode is set to false. Similarly, 
if we tried to connect
-     * to all servers in new config and failed, reconfigMode is set to false.
-     *
-     * While in reconfigMode, we should connect to a server in newServers with 
probability pNew and to servers in
-     * oldServers with probability pOld (which is just 1-pNew). If we tried 
out all servers in either oldServers
-     * or newServers we continue to try servers from the other set, regardless 
of pNew or pOld. If we tried all servers
-     * we give up and go back to the normal round robin mode
-     *
-     * When called, this should be protected by synchronized(this)
-     */
-    private InetSocketAddress nextHostInReconfigMode() {
-        boolean takeNew = (sourceOfRandomness.nextFloat() <= pNew);
-
-        // take one of the new servers if it is possible (there are still such
-        // servers we didn't try),
-        // and either the probability tells us to connect to one of the new
-        // servers or if we already
-        // tried all the old servers
-        if (((currentIndexNew + 1) < newServers.size()) && (takeNew || 
(currentIndexOld + 1) >= oldServers.size())) {
-            ++currentIndexNew;
-            return newServers.get(currentIndexNew);
-        }
-
-        // start taking old servers
-        if ((currentIndexOld + 1) < oldServers.size()) {
-            ++currentIndexOld;
-            return oldServers.get(currentIndexOld);
-        }
-
-        return null;
+    @Override
+    public void onConnected() {
+        connectionManager.onConnected();
     }
 
-    public InetSocketAddress next(long spinDelay) {
-        boolean needToSleep = false;
-        InetSocketAddress addr;
-
-        synchronized (this) {
-            if (reconfigMode) {
-                addr = nextHostInReconfigMode();
-                if (addr != null) {
-                    currentIndex = serverAddresses.indexOf(addr);
-                    return resolve(addr);
-                }
-                //tried all servers and couldn't connect
-                reconfigMode = false;
-                needToSleep = (spinDelay > 0);
-            }
-            ++currentIndex;
-            if (currentIndex == serverAddresses.size()) {
-                currentIndex = 0;
-            }
-            addr = serverAddresses.get(currentIndex);
-            needToSleep = needToSleep || (currentIndex == lastIndex && 
spinDelay > 0);
-            if (lastIndex == -1) {
-                // We don't want to sleep on the first ever connect attempt.
-                lastIndex = 0;
-            }
-        }
-        if (needToSleep) {
-            try {
-                Thread.sleep(spinDelay);
-            } catch (InterruptedException e) {
-                LOG.warn("Unexpected exception", e);
-            }
-        }
-
-        return resolve(addr);
+    public InetSocketAddress getServerAtIndex(int i) {
+        return connectionManager.getServerAtIndex(i);
     }
 
-    public synchronized void onConnected() {
-        lastIndex = currentIndex;
-        reconfigMode = false;
+    public InetSocketAddress getServerAtCurrentIndex() {
+        return connectionManager.getServerAtCurrentIndex();
     }
-
 }
diff --git 
a/zookeeper-server/src/main/java/org/apache/zookeeper/client/ZKClientConfig.java
 
b/zookeeper-server/src/main/java/org/apache/zookeeper/client/ZKClientConfig.java
index 91a390dda..a429d4838 100644
--- 
a/zookeeper-server/src/main/java/org/apache/zookeeper/client/ZKClientConfig.java
+++ 
b/zookeeper-server/src/main/java/org/apache/zookeeper/client/ZKClientConfig.java
@@ -69,6 +69,13 @@ public class ZKClientConfig extends ZKConfig {
     public static final String ZOOKEEPER_SHUFFLE_DNS_RESPONSE = 
"zookeeper.shuffleDnsResponse";
     public static final boolean ZOOKEEPER_SHUFFLE_DNS_RESPONSE_DEFAULT = false;
 
+    /**
+     * DNS SRV refresh interval in seconds for DnsSrvHostProvider.
+     * A value of 0 disables periodic refresh.
+     */
+    public static final String DNS_SRV_REFRESH_INTERVAL_SECONDS = 
"zookeeper.hostProvider.dnsSrvRefreshIntervalSeconds";
+    public static final long DNS_SRV_REFRESH_INTERVAL_SECONDS_DEFAULT = 60;
+
     public ZKClientConfig() {
         super();
         initFromJavaSystemProperties();
@@ -129,6 +136,7 @@ protected void handleBackwardCompatibility() {
         setProperty(ZOOKEEPER_CLIENT_CNXN_SOCKET, 
System.getProperty(ZOOKEEPER_CLIENT_CNXN_SOCKET));
         setProperty(SECURE_CLIENT, System.getProperty(SECURE_CLIENT));
         setProperty(ZK_SASL_CLIENT_ALLOW_REVERSE_DNS, 
System.getProperty(ZK_SASL_CLIENT_ALLOW_REVERSE_DNS));
+        setProperty(DNS_SRV_REFRESH_INTERVAL_SECONDS, 
System.getProperty(DNS_SRV_REFRESH_INTERVAL_SECONDS));
     }
 
     /**
diff --git 
a/zookeeper-server/src/test/java/org/apache/zookeeper/HostProviderSelectionTest.java
 
b/zookeeper-server/src/test/java/org/apache/zookeeper/HostProviderSelectionTest.java
new file mode 100644
index 000000000..386cb9d5a
--- /dev/null
+++ 
b/zookeeper-server/src/test/java/org/apache/zookeeper/HostProviderSelectionTest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import org.apache.zookeeper.client.ZKClientConfig;
+import org.apache.zookeeper.test.ClientBase;
+import org.junit.jupiter.api.Test;
+
+public class HostProviderSelectionTest extends ZKTestCase { // constructs ZooKeeper clients from host:port and dns-srv:// connect strings
+    @Test // every host:port style connect string must yield a client, with and without a ZKClientConfig
+    public void testStaticHostProviderSelection() throws Exception {
+        final String[] staticFormats = {
+                "localhost:2181",
+                "zk1:2181,zk2:2181,zk3:2181",
+                "zk1:2181,zk2:2181/myapp",
+                "[::1]:2181,[2001:db8::1]:2181"
+        };
+
+        for (final String connectString : staticFormats) {
+            // Test without config; try-with-resources closes the client after the assertion
+            try (final ZooKeeper zk = new ZooKeeper(connectString,
+                    ClientBase.CONNECTION_TIMEOUT, DummyWatcher.INSTANCE)) {
+                assertNotNull(zk);
+            }
+
+            // Test with config: same connect string plus a default-constructed ZKClientConfig
+            final ZKClientConfig config = new ZKClientConfig();
+            try (final ZooKeeper zk = new ZooKeeper(connectString,
+                    ClientBase.CONNECTION_TIMEOUT, DummyWatcher.INSTANCE, 
config)) {
+                assertNotNull(zk);
+            }
+        }
+    }
+
+    @Test // these dns-srv:// connect strings must fail construction with the DnsSrvHostProvider initialization error
+    public void testDnsSrvHostProviderSelection() {
+        final String[] dnsSrvFormats = {
+                "dns-srv://nonexistent.test.local",
+                "dns-srv://nonexistent.test.local/myapp"
+        };
+
+        for (final String connectString : dnsSrvFormats) {
+            // Test without config; NOTE(review): presumably relies on the name having no SRV records - confirm
+            final IllegalArgumentException exception1 = 
assertThrows(IllegalArgumentException.class, () ->
+                    new ZooKeeper(connectString, 
ClientBase.CONNECTION_TIMEOUT, DummyWatcher.INSTANCE));
+            validateDnsSrvError(exception1);
+
+            // Test with config: same expectation when a default-constructed ZKClientConfig is passed
+            final ZKClientConfig config = new ZKClientConfig();
+            final IllegalArgumentException exception2 = 
assertThrows(IllegalArgumentException.class, () ->
+                    new ZooKeeper(connectString, 
ClientBase.CONNECTION_TIMEOUT, DummyWatcher.INSTANCE, config));
+            validateDnsSrvError(exception2);
+        }
+    }
+
+    @Test // an empty connect string and a scheme-only dns-srv:// string must both be rejected
+    public void testInvalidFormats() {
+        final String[] invalidFormats = {
+                "",
+                "dns-srv://"
+        };
+
+        for (final String connectString : invalidFormats) {
+            final IllegalArgumentException exception = 
assertThrows(IllegalArgumentException.class, () ->
+                    new ZooKeeper(connectString, 
ClientBase.CONNECTION_TIMEOUT, DummyWatcher.INSTANCE));
+            final String message = exception.getMessage(); // either of the two validation messages below is accepted
+            assertTrue(message.contains("Connect string cannot be null or 
empty")
+                || message.contains("DNS name cannot be null or empty"));
+        }
+    }
+
+    private void validateDnsSrvError(final IllegalArgumentException exception) 
{ // asserts that the exception message carries the DnsSrvHostProvider initialization prefix
+        final String message = exception.getMessage();
+        assertTrue(message.contains("Failed to initialize DnsSrvHostProvider 
for DNS name:"));
+    }
+}
diff --git 
a/zookeeper-server/src/test/java/org/apache/zookeeper/client/DnsSrvHostProviderTest.java
 
b/zookeeper-server/src/test/java/org/apache/zookeeper/client/DnsSrvHostProviderTest.java
new file mode 100644
index 000000000..136b6e47a
--- /dev/null
+++ 
b/zookeeper-server/src/test/java/org/apache/zookeeper/client/DnsSrvHostProviderTest.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.client;
+
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import java.net.InetSocketAddress;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.zookeeper.client.DnsSrvHostProvider.DnsSrvResolver;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.xbill.DNS.Name;
+import org.xbill.DNS.SRVRecord;
+
+public class DnsSrvHostProviderTest { // unit tests for DnsSrvHostProvider driven by a Mockito-mocked DnsSrvResolver
+
+    private static final String TEST_DNS_NAME = "_zookeeper._tcp.example.com.";
+    private static final long TEST_SEED = 12345L; // second constructor argument in every test; presumably the shuffle seed - confirm
+
+    private DnsSrvResolver mockDnsSrvResolver; // replaced with a fresh mock before each test by setUp()
+
+    @BeforeEach
+    public void setUp() {
+        mockDnsSrvResolver = mock(DnsSrvResolver.class);
+    }
+
+    @AfterEach // clears the refresh-interval system property that the refresh tests set
+    public void tearDown() {
+        System.clearProperty(ZKClientConfig.DNS_SRV_REFRESH_INTERVAL_SECONDS);
+    }
+
+    @Test // three SRV records from the resolver give a provider of size 3 whose next(0) is non-null
+    public void testBasic() throws Exception {
+        final SRVRecord[] srvRecords = createMockSrvRecords();
+        
when(mockDnsSrvResolver.lookupSrvRecords(TEST_DNS_NAME)).thenReturn(srvRecords);
+
+        try (final DnsSrvHostProvider hostProvider = new 
DnsSrvHostProvider(TEST_DNS_NAME, TEST_SEED, mockDnsSrvResolver, null)) {
+            assertEquals(3, hostProvider.size());
+            assertNotNull(hostProvider.next(0));
+        }
+    }
+
+    @Test // three next(0) calls cover all three servers; a fourth call returns one of them again
+    public void testServerIteration() throws Exception {
+        final SRVRecord[] srvRecords = createMockSrvRecords();
+        
when(mockDnsSrvResolver.lookupSrvRecords(TEST_DNS_NAME)).thenReturn(srvRecords);
+
+        try (final DnsSrvHostProvider hostProvider = new 
DnsSrvHostProvider(TEST_DNS_NAME, TEST_SEED, mockDnsSrvResolver, null)) {
+            final InetSocketAddress addr1 = hostProvider.next(0);
+            final InetSocketAddress addr2 = hostProvider.next(0);
+            final InetSocketAddress addr3 = hostProvider.next(0);
+
+            final Set<InetSocketAddress> actualAddresses = new HashSet<>(); // compared as a set, so the order of servers is not asserted
+            actualAddresses.add(addr1);
+            actualAddresses.add(addr2);
+            actualAddresses.add(addr3);
+
+            final Set<InetSocketAddress> expectedAddresses = new HashSet<>();
+            expectedAddresses.add(new InetSocketAddress("server1.example.com", 
2181));
+            expectedAddresses.add(new InetSocketAddress("server2.example.com", 
2181));
+            expectedAddresses.add(new InetSocketAddress("server3.example.com", 
2181));
+
+            assertEquals(expectedAddresses, actualAddresses);
+
+            // cycle back: the fourth address must again be one of the three expected servers
+            final InetSocketAddress addr4 = hostProvider.next(0);
+            assertTrue(expectedAddresses.contains(addr4));
+        }
+    }
+
+    @Test // empty, null and whitespace-only DNS names are rejected with IllegalArgumentException
+    public void testEmptyDnsName() {
+        assertThrows(IllegalArgumentException.class,
+            () -> new DnsSrvHostProvider("", TEST_SEED, mockDnsSrvResolver, 
null));
+
+        assertThrows(IllegalArgumentException.class,
+            () -> new DnsSrvHostProvider(null, TEST_SEED, mockDnsSrvResolver, 
null));
+
+        assertThrows(IllegalArgumentException.class,
+            () -> new DnsSrvHostProvider("   ", TEST_SEED, mockDnsSrvResolver, 
null));
+    }
+
+    @Test // an empty SRV lookup result makes construction throw IllegalArgumentException
+    public void testNoSrvRecords() throws Exception {
+        
when(mockDnsSrvResolver.lookupSrvRecords(TEST_DNS_NAME)).thenReturn(new 
SRVRecord[0]);
+
+        assertThrows(IllegalArgumentException.class,
+            () -> new DnsSrvHostProvider(TEST_DNS_NAME, TEST_SEED, 
mockDnsSrvResolver, null));
+    }
+
+    @Test // an IOException thrown by the resolver must surface as IllegalArgumentException from the constructor
+    public void testDnsLookupFailure() throws Exception {
+        when(mockDnsSrvResolver.lookupSrvRecords(TEST_DNS_NAME))
+                .thenThrow(new java.io.IOException("DNS lookup failed"));
+
+        assertThrows(IllegalArgumentException.class,
+                () -> new DnsSrvHostProvider(TEST_DNS_NAME, TEST_SEED, 
mockDnsSrvResolver, null));
+    }
+
+    @Test // a lookup that returns only a port-0 record must make construction throw IllegalArgumentException
+    public void testInvalidPortFiltering() throws Exception {
+        // Create SRV record with invalid port (0)
+        final SRVRecord invalidPortRecord = 
createMockSrvRecord("server1.example.com.", 0);
+        final SRVRecord[] srvRecords = new SRVRecord[]{invalidPortRecord};
+
+        
when(mockDnsSrvResolver.lookupSrvRecords(TEST_DNS_NAME)).thenReturn(srvRecords);
+
+        assertThrows(IllegalArgumentException.class,
+            () -> new DnsSrvHostProvider(TEST_DNS_NAME, TEST_SEED, 
mockDnsSrvResolver, null));
+    }
+
+    @Test // the trailing dot of the SRV target must be absent from the returned host string
+    public void testTrailingDotRemoval() throws Exception {
+        final SRVRecord recordWithDot = 
createMockSrvRecord("server1.example.com.", 2181);
+        final SRVRecord[] srvRecords = new SRVRecord[]{recordWithDot};
+
+        
when(mockDnsSrvResolver.lookupSrvRecords(TEST_DNS_NAME)).thenReturn(srvRecords);
+
+        try (final DnsSrvHostProvider hostProvider = new 
DnsSrvHostProvider(TEST_DNS_NAME, TEST_SEED, mockDnsSrvResolver, null)) {
+            assertEquals(1, hostProvider.size());
+            final InetSocketAddress addr = hostProvider.next(0);
+
+            // validate trailing dot is removed
+            assertEquals("server1.example.com", addr.getHostString());
+        }
+    }
+
+    @Test // with the interval property set to 0 the resolver must be queried exactly once, even after a one second wait
+    public void testRefreshIntervalZeroDisablesPeriodicRefresh() throws 
Exception {
+        // Set system property to disable refresh
+        System.setProperty(ZKClientConfig.DNS_SRV_REFRESH_INTERVAL_SECONDS, 
"0");
+
+        final SRVRecord[] srvRecords = createMockSrvRecords();
+        
when(mockDnsSrvResolver.lookupSrvRecords(TEST_DNS_NAME)).thenReturn(srvRecords);
+
+        try (final DnsSrvHostProvider hostProvider = new 
DnsSrvHostProvider(TEST_DNS_NAME, TEST_SEED, mockDnsSrvResolver, null)) {
+            // Verify initial setup works
+            assertEquals(3, hostProvider.size());
+
+            // Wait to ensure no background refresh occurs
+            Thread.sleep(1000);
+
+            // Verify DNS resolver was only called once during initialization 
(no periodic refresh)
+            verify(mockDnsSrvResolver, 
times(1)).lookupSrvRecords(TEST_DNS_NAME);
+
+            // Verify host provider still works normally
+            assertNotNull(hostProvider.next(0));
+
+            // Test multiple next() calls to ensure functionality is not 
affected
+            for (int i = 0; i < 5; i++) {
+                assertNotNull(hostProvider.next(0));
+            }
+
+            // Verify no additional DNS calls were made
+            verify(mockDnsSrvResolver, 
times(1)).lookupSrvRecords(TEST_DNS_NAME);
+        }
+    }
+
+    @Test // an interval of -1 must be rejected by the constructor with a fixed error message
+    public void testRefreshIntervalNegative() throws Exception {
+        System.setProperty(ZKClientConfig.DNS_SRV_REFRESH_INTERVAL_SECONDS, 
"-1");
+
+        final SRVRecord[] srvRecords = createMockSrvRecords();
+        
when(mockDnsSrvResolver.lookupSrvRecords(TEST_DNS_NAME)).thenReturn(srvRecords);
+
+        final IllegalArgumentException exception = 
assertThrows(IllegalArgumentException.class,
+                () -> new DnsSrvHostProvider(TEST_DNS_NAME, TEST_SEED, 
mockDnsSrvResolver, null));
+
+        assertEquals("Invalid DNS SRV refresh interval: -1", 
exception.getMessage());
+    }
+
+    private SRVRecord[] createMockSrvRecords() { // three records for server1, server2 and server3 under example.com., all on port 2181
+        return new SRVRecord[]{
+            createMockSrvRecord("server1.example.com.", 2181),
+            createMockSrvRecord("server2.example.com.", 2181),
+            createMockSrvRecord("server3.example.com.", 2181)
+        };
+    }
+
+    private SRVRecord createMockSrvRecord(final String target, final int port) 
{ // builds an SRVRecord instance (not a Mockito mock) owned by TEST_DNS_NAME for the given target and port
+        try {
+            final Name targetName = Name.fromString(target);
+            final Name serviceName = Name.fromString(TEST_DNS_NAME);
+            return new SRVRecord(serviceName, 1, 300, 1, 1, port, targetName); // NOTE(review): literals presumably mean class IN, TTL 300, priority 1, weight 1 - confirm against dnsjava
+        } catch (final Exception e) {
+            throw new RuntimeException("Failed to create mock SRV record", e); // any failure is rethrown unchecked with the cause kept
+        }
+    }
+}
diff --git 
a/zookeeper-server/src/test/java/org/apache/zookeeper/test/ConnectStringParserTest.java
 
b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ConnectStringParserTest.java
index fc30e6992..d7d55a4b1 100644
--- 
a/zookeeper-server/src/test/java/org/apache/zookeeper/test/ConnectStringParserTest.java
+++ 
b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ConnectStringParserTest.java
@@ -19,6 +19,11 @@
 package org.apache.zookeeper.test;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import java.util.HashMap;
+import java.util.Map;
 import org.apache.zookeeper.ZKTestCase;
 import org.apache.zookeeper.client.ConnectStringParser;
 import org.junit.jupiter.api.Test;
@@ -106,4 +111,89 @@ public void testParseIPV6ConnectionString() {
         assertEquals(2183, parser.getServerAddresses().get(2).getPort());
     }
 
+    @Test // dns-srv:// string without a path: host name is parsed and the chroot is null
+    public void testDnsSrvFormatNoChroot() {
+        testDnsSrvFormat("dns-srv://zookeeper.myapp.com", 
"zookeeper.myapp.com", null);
+    }
+
+    @Test // dns-srv:// string with a single path segment: that segment becomes the chroot /myapp
+    public void testDnsSrvFormatWithChroot() {
+        testDnsSrvFormat("dns-srv://zookeeper.myapp.com/myapp", 
"zookeeper.myapp.com", "/myapp");
+    }
+
+    @Test // dns-srv:// string with a two-level path: the whole path /services/auth is kept as the chroot
+    public void testDnsSrvFormatWithNestedChroot() {
+        testDnsSrvFormat("dns-srv://zookeeper.shared.com/services/auth", 
"zookeeper.shared.com", "/services/auth");
+    }
+
+    @Test // dns-srv:// string ending in a bare slash: the root path is expected to give a null chroot
+    public void testDnsSrvFormatWithRootChroot() {
+        testDnsSrvFormat("dns-srv://zookeeper.myapp.com/", 
"zookeeper.myapp.com", null);
+    }
+
+    @Test // dns-srv:// string with a multi-label host: the full name zk.prod.myapp.com is kept as the host
+    public void testDnsSrvFormatWithSubdomain() {
+        testDnsSrvFormat("dns-srv://zk.prod.myapp.com/production", 
"zk.prod.myapp.com", "/production");
+    }
+
+    @Test // a chroot path with a trailing slash must make the parser throw IllegalArgumentException
+    public void testDnsSrvFormatInvalidChroot() {
+        final String connectString = "dns-srv://zookeeper.myapp.com/invalid/";
+        assertThrows(IllegalArgumentException.class, () -> new 
ConnectStringParser(connectString));
+    }
+
+    @Test // parses one host:port string and one dns-srv:// string with the same chroot and compares the results
+    public void testMixedFormatsComparison() {
+        final String hostPortFormat = "zk1:2181,zk2:2181/myapp";
+        final String dnsSrvFormat = "dns-srv://zookeeper.myapp.com/myapp";
+
+        final ConnectStringParser hostPortParser = new 
ConnectStringParser(hostPortFormat);
+        final ConnectStringParser dnsSrvParser = new 
ConnectStringParser(dnsSrvFormat);
+
+        // Both should have the same chroot path
+        assertEquals("/myapp", hostPortParser.getChrootPath());
+        assertEquals("/myapp", dnsSrvParser.getChrootPath());
+
+        // Host:Port format should have multiple server addresses
+        assertEquals(2, hostPortParser.getServerAddresses().size());
+        assertEquals("zk1", 
hostPortParser.getServerAddresses().get(0).getHostString());
+        assertEquals("zk2", 
hostPortParser.getServerAddresses().get(1).getHostString());
+
+        // DNS SRV format should have single DNS service name; it is exposed as the only server address entry
+        assertEquals(1, dnsSrvParser.getServerAddresses().size());
+        assertEquals("zookeeper.myapp.com", 
dnsSrvParser.getServerAddresses().get(0).getHostString());
+    }
+
+    @Test // host:port connect strings (IPv4 names and bracketed IPv6) must still parse to the expected number of servers
+    public void testBackwardCompatibility() {
+        final Map<String, Integer> testCases = new HashMap<>(); // connect string mapped to expected server count
+
+        testCases.put("localhost:2181", 1);
+        testCases.put("zk1:2181,zk2:2181,zk3:2181", 3);
+        testCases.put("zk1:2181,zk2:2181/myapp", 2);
+        testCases.put("[::1]:2181", 1);
+        testCases.put("[2001:db8::1]:2181,[2001:db8::2]:2181/test", 2);
+
+        for (final Map.Entry<String, Integer> testCase : testCases.entrySet()) 
{
+            final String connectString = testCase.getKey();
+            final int expectedSize = testCase.getValue();
+
+            final ConnectStringParser parser = new 
ConnectStringParser(connectString);
+            assertNotNull(parser.getServerAddresses());
+            assertEquals(expectedSize, parser.getServerAddresses().size());
+        }
+    }
+
+    private void testDnsSrvFormat(final String connectString, final String 
expectedHostName, final String expectedChrootPath) { // shared assertions for the dns-srv:// tests; a null expectedChrootPath means no chroot
+        final ConnectStringParser parser = new 
ConnectStringParser(connectString);
+
+        if (expectedChrootPath == null) {
+            assertNull(parser.getChrootPath());
+        } else {
+            assertEquals(expectedChrootPath, parser.getChrootPath());
+        }
+        assertEquals(1, parser.getServerAddresses().size()); // the DNS name is expected as the single address entry
+        assertEquals(expectedHostName, 
parser.getServerAddresses().get(0).getHostString());
+        assertEquals(2181, parser.getServerAddresses().get(0).getPort()); // port 2181 is expected although the callers pass no port
+    }
 }

Reply via email to