This is an automated email from the ASF dual-hosted git repository.
li4wang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zookeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 6aaffd5ee ZOOKEEPER-4956: Provide a HostProvider that uses DNS SRV
record for dynamic server discovery (#2320)
6aaffd5ee is described below
commit 6aaffd5ee007e8b53772c02111207cdc0cc17235
Author: li4wang <[email protected]>
AuthorDate: Tue Jan 20 16:10:39 2026 -0800
ZOOKEEPER-4956: Provide a HostProvider that uses DNS SRV record for dynamic
server discovery (#2320)
Author: Li Wang <[email protected]>
Co-authored-by: liwang <[email protected]>
---
pom.xml | 6 +
.../resources/markdown/zookeeperProgrammers.md | 5 +
zookeeper-server/pom.xml | 4 +
.../main/java/org/apache/zookeeper/ZooKeeper.java | 8 +-
.../zookeeper/client/ConnectStringParser.java | 94 ++++--
.../zookeeper/client/DnsSrvHostProvider.java | 329 ++++++++++++++++++++
...ostProvider.java => HostConnectionManager.java} | 341 +++++++++++----------
.../org/apache/zookeeper/client/HostProvider.java | 10 +-
.../zookeeper/client/HostProviderFactory.java | 66 ++++
.../zookeeper/client/StaticHostProvider.java | 329 ++------------------
.../apache/zookeeper/client/ZKClientConfig.java | 8 +
.../zookeeper/HostProviderSelectionTest.java | 95 ++++++
.../zookeeper/client/DnsSrvHostProviderTest.java | 214 +++++++++++++
.../zookeeper/test/ConnectStringParserTest.java | 90 ++++++
14 files changed, 1109 insertions(+), 490 deletions(-)
diff --git a/pom.xml b/pom.xml
index 52f023482..552659bc7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -573,6 +573,7 @@
<enforcer.version>3.0.0-M3</enforcer.version>
<commons-io.version>2.17.0</commons-io.version>
<burningwave.mockdns.version>0.25.4</burningwave.mockdns.version>
+ <dnsjava.version>3.5.1</dnsjava.version>
<clover-maven-plugin.version>4.4.1</clover-maven-plugin.version>
<sonar-maven-plugin.version>3.7.0.1746</sonar-maven-plugin.version>
@@ -774,6 +775,11 @@
<artifactId>tools</artifactId>
<version>${burningwave.mockdns.version}</version>
</dependency>
+ <dependency>
+ <groupId>dnsjava</groupId>
+ <artifactId>dnsjava</artifactId>
+ <version>${dnsjava.version}</version>
+ </dependency>
</dependencies>
</dependencyManagement>
diff --git a/zookeeper-docs/src/main/resources/markdown/zookeeperProgrammers.md
b/zookeeper-docs/src/main/resources/markdown/zookeeperProgrammers.md
index 59008215c..a9b7fd711 100644
--- a/zookeeper-docs/src/main/resources/markdown/zookeeperProgrammers.md
+++ b/zookeeper-docs/src/main/resources/markdown/zookeeperProgrammers.md
@@ -1396,6 +1396,11 @@ and [SASL authentication for
ZooKeeper](https://cwiki.apache.org/confluence/disp
you want to randomize that.
Default: false
+* *zookeeper.hostProvider.dnsSrvRefreshIntervalSeconds* :
+ **New in 3.10.0:**
+ Specifies the refresh interval in seconds for DNS SRV record lookups when using DnsSrvHostProvider.
+ A value of 0 disables periodic refresh.
+ Default: 60 seconds
<a name="C+Binding"></a>
diff --git a/zookeeper-server/pom.xml b/zookeeper-server/pom.xml
index b2ecf1902..f4b76095d 100644
--- a/zookeeper-server/pom.xml
+++ b/zookeeper-server/pom.xml
@@ -190,6 +190,10 @@
<artifactId>commons-io</artifactId>
<scope>compile</scope>
</dependency>
+ <dependency>
+ <groupId>dnsjava</groupId>
+ <artifactId>dnsjava</artifactId>
+ </dependency>
</dependencies>
<build>
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/ZooKeeper.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/ZooKeeper.java
index 7533e01a9..32893d46a 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/ZooKeeper.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/ZooKeeper.java
@@ -46,6 +46,7 @@
import org.apache.zookeeper.client.Chroot;
import org.apache.zookeeper.client.ConnectStringParser;
import org.apache.zookeeper.client.HostProvider;
+import org.apache.zookeeper.client.HostProviderFactory;
import org.apache.zookeeper.client.StaticHostProvider;
import org.apache.zookeeper.client.ZKClientConfig;
import org.apache.zookeeper.client.ZooKeeperBuilder;
@@ -1142,10 +1143,9 @@ public ZooKeeper(ZooKeeperOptions options) throws
IOException {
if (options.getHostProvider() != null) {
hostProvider =
options.getHostProvider().apply(connectStringParser.getServerAddresses());
} else {
- hostProvider = new
StaticHostProvider(connectStringParser.getServerAddresses(), clientConfig);
+ hostProvider = HostProviderFactory.create(connectStringParser,
clientConfig);
}
this.hostProvider = hostProvider;
-
chroot = Chroot.ofNullable(connectStringParser.getChrootPath());
cnxn = createConnection(
hostProvider,
@@ -1332,6 +1332,10 @@ public synchronized void close() throws
InterruptedException {
LOG.debug("Ignoring unexpected exception during close", e);
}
+ if (hostProvider != null) {
+ hostProvider.close();
+ }
+
LOG.info("Session: 0x{} closed", Long.toHexString(getSessionId()));
}
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/client/ConnectStringParser.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/client/ConnectStringParser.java
index 5d2928196..6c2c7ec6b 100644
---
a/zookeeper-server/src/main/java/org/apache/zookeeper/client/ConnectStringParser.java
+++
b/zookeeper-server/src/main/java/org/apache/zookeeper/client/ConnectStringParser.java
@@ -24,6 +24,7 @@
import java.util.List;
import org.apache.zookeeper.common.NetUtils;
import org.apache.zookeeper.common.PathUtils;
+import org.apache.zookeeper.common.StringUtils;
/**
* A parser for ZooKeeper Client connect strings.
@@ -38,38 +39,88 @@
public final class ConnectStringParser {
private static final int DEFAULT_PORT = 2181;
+ private static final String DNS_SRV_PREFIX = "dns-srv://";
- private final String chrootPath;
+ public enum ConnectionType {
+ DNS_SRV,
+ HOST_PORT
+ }
+ private String chrootPath;
private final ArrayList<InetSocketAddress> serverAddresses = new
ArrayList<>();
+ private final ConnectionType connectionType;
+ private final String connectString;
/**
- * Parse host and port by splitting client connectString
- * with support for IPv6 literals
- * @throws IllegalArgumentException
- * for an invalid chroot path.
+ * Constructs a ConnectStringParser with given connect string.
+ *
+ * <p>Supports two connection string formats:</p>
+ * <ul>
+ * <li><strong>Host:Port format:</strong>
"host1:2181,host2:2181/chroot"</li>
+ * <li><strong>DNS SRV format:</strong>
"dns-srv://service.domain.com/chroot"</li>
+ * </ul>
+ *
+ * @param connectString the connect string to parse
+ * @throws IllegalArgumentException if connectString is null/empty or
contains invalid chroot path
*/
- public ConnectStringParser(String connectString) {
+ public ConnectStringParser(final String connectString) {
+ if (StringUtils.isBlank(connectString)) {
+ throw new IllegalArgumentException("Connect string cannot be null
or empty");
+ }
+
+ if (connectString.startsWith(DNS_SRV_PREFIX)) {
+ connectionType = ConnectionType.DNS_SRV;
+ parseDnsSrvFormat(connectString);
+ } else {
+ connectionType = ConnectionType.HOST_PORT;
+ parseHostPortFormat(connectString);
+ }
+ this.connectString = connectString;
+ }
+
+ public String getChrootPath() {
+ return chrootPath;
+ }
+
+ public ArrayList<InetSocketAddress> getServerAddresses() {
+ return serverAddresses;
+ }
+
+ public ConnectionType getConnectionType() {
+ return connectionType;
+ }
+
+ public String getConnectString() {
+ return connectString;
+ }
+
+ private String[] parseConnectString(final String connectString) {
+ String serverPart = connectString;
+ String chrootPath = null;
+
// parse out chroot, if any
- int off = connectString.indexOf('/');
+ final int off = connectString.indexOf('/');
if (off >= 0) {
- String chrootPath = connectString.substring(off);
+ chrootPath = connectString.substring(off);
// ignore "/" chroot spec, same as null
if (chrootPath.length() == 1) {
- this.chrootPath = null;
+ chrootPath = null;
} else {
PathUtils.validatePath(chrootPath);
- this.chrootPath = chrootPath;
}
- connectString = connectString.substring(0, off);
- } else {
- this.chrootPath = null;
+ serverPart = connectString.substring(0, off);
}
+ return new String[]{serverPart, chrootPath};
+ }
+
+ private void parseHostPortFormat(final String connectString) {
+ final String[] parts = parseConnectString(connectString);
+ chrootPath = parts[1];
- List<String> hostsList = split(connectString, ",");
+ final List<String> hostsList = split(parts[0], ",");
for (String host : hostsList) {
int port = DEFAULT_PORT;
- String[] hostAndPort = NetUtils.getIPV6HostAndPort(host);
+ final String[] hostAndPort = NetUtils.getIPV6HostAndPort(host);
if (hostAndPort.length != 0) {
host = hostAndPort[0];
if (hostAndPort.length == 2) {
@@ -89,12 +140,11 @@ public ConnectStringParser(String connectString) {
}
}
- public String getChrootPath() {
- return chrootPath;
+ private void parseDnsSrvFormat(final String connectString) {
+ final String[] parts =
parseConnectString(connectString.substring(DNS_SRV_PREFIX.length()));
+ chrootPath = parts[1];
+ // The DNS service name is stored as a placeholder address
+ // The actual resolution will be handled by DnsSrvHostProvider
+ serverAddresses.add(InetSocketAddress.createUnresolved(parts[0],
DEFAULT_PORT));
}
-
- public ArrayList<InetSocketAddress> getServerAddresses() {
- return serverAddresses;
- }
-
}
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/client/DnsSrvHostProvider.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/client/DnsSrvHostProvider.java
new file mode 100644
index 000000000..55cdfcd8e
--- /dev/null
+++
b/zookeeper-server/src/main/java/org/apache/zookeeper/client/DnsSrvHostProvider.java
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.client;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.zookeeper.common.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.xbill.DNS.Lookup;
+import org.xbill.DNS.Record;
+import org.xbill.DNS.SRVRecord;
+import org.xbill.DNS.Type;
+
+/**
+ * DNS SRV-based HostProvider that dynamically resolves host port names from
DNS SRV records.
+ *
+ * <p>This implementation periodically refreshes the server list by querying
DNS SRV records
+ * and uses HostConnectionManager for all connection management and
reconfiguration logic.</p>
+ *
+ * <p><strong>Two-Phase Update Strategy:</strong></p>
+ * <ul>
+ * <li><strong>Phase 1 (Background):</strong> Timer thread detects DNS changes
without caching results</li>
+ * <li><strong>Phase 2 (Connection-time):</strong> Fresh DNS lookup during
connect attempt if changes detected</li>
+ * </ul>
+ */
+@InterfaceAudience.Public
+public final class DnsSrvHostProvider implements HostProvider {
+ private static final Logger LOG =
LoggerFactory.getLogger(DnsSrvHostProvider.class);
+
+ public interface DnsSrvResolver {
+ SRVRecord[] lookupSrvRecords(String dnsSrvName) throws IOException;
+ }
+
+ private final String dnsSrvName;
+ private final DnsSrvResolver dnsResolver;
+
+ private final HostConnectionManager connectionManager;
+ // Track the previous server list to detect changes
+ private volatile Set<InetSocketAddress> previousServerSet;
+ private Timer dnsRefreshTimer;
+
+ // Track the current connected host for accurate load balancing decisions
+ private final AtomicReference<InetSocketAddress> currentConnectedHost =
new AtomicReference<>();
+ private final AtomicBoolean serverListChanged = new AtomicBoolean(false);
+
+ /**
+ * Constructs a DnsSrvHostProvider with the given DNS name
+ *
+ * @param dnsSrvName the DNS name to query for SRV records
+ * @throws IllegalArgumentException if dnsSrvName is null or empty or
invalid
+ * or if no SRV records are found for the
DNS name
+ * or if DNS lookup fails
+ */
+ public DnsSrvHostProvider(final String dnsSrvName) {
+ this(dnsSrvName, null);
+ }
+
+
+ /**
+ * Constructs a DnsSrvHostProvider with the given DNS name and
ZKClientConfig
+ *
+ * @param dnsSrvName the DNS name to query for SRV records
+ * @param clientConfig ZooKeeper client configuration
+ * @throws IllegalArgumentException if dnsSrvName is null or empty or
invalid
+ * or if no SRV records are found for the
DNS name
+ * or if DNS lookup fails
+ */
+ public DnsSrvHostProvider(final String dnsSrvName, final ZKClientConfig
clientConfig) {
+ this(dnsSrvName, System.currentTimeMillis() ^ dnsSrvName.hashCode(),
clientConfig);
+ }
+
+ /**
+ * Constructs a DnsSrvHostProvider with the given DNS name, randomness
seed and ZKClientConfig
+ *
+ * @param dnsSrvName the DNS name to query for SRV records
+ * @param randomnessSeed seed for randomization
+ * @param clientConfig ZooKeeper client configuration
+ * @throws IllegalArgumentException if dnsSrvName is null or empty or
invalid
+ * or if no SRV records are found for the
DNS name
+ * or if DNS lookup fails
+ */
+ public DnsSrvHostProvider(final String dnsSrvName, final long
randomnessSeed, final ZKClientConfig clientConfig) {
+ this(dnsSrvName, randomnessSeed, new DefaultDnsResolver(),
clientConfig);
+ }
+
+ /**
+ * Constructs a DnsSrvHostProvider with the given DNS name, randomization
seed, DNS resolver and ZKClientConfig
+ *
+ * @param dnsSrvName the DNS name to query for SRV records
+ * @param randomnessSeed seed for randomization
+ * @param dnsResolver custom DNS resolver
+ * @param clientConfig ZooKeeper client configuration
+ * @throws IllegalArgumentException if dnsSrvName is null or empty or
invalid
+ * or if no SRV records are found for the
DNS name
+ * or if DNS lookup fails
+ */
+ public DnsSrvHostProvider(final String dnsSrvName, final long
randomnessSeed, final DnsSrvResolver dnsResolver, final ZKClientConfig
clientConfig) {
+ if (StringUtils.isBlank(dnsSrvName)) {
+ throw new IllegalArgumentException("DNS name cannot be null or
empty");
+ }
+
+ this.dnsSrvName = dnsSrvName;
+ this.dnsResolver = dnsResolver;
+ try {
+ final List<InetSocketAddress> serverAddresses =
lookupDnsSrvRecords();
+ if (serverAddresses.isEmpty()) {
+ LOG.error("No SRV records found for DNS name: {}", dnsSrvName);
+ throw new IllegalArgumentException("No SRV records found for
DNS name: " + dnsSrvName);
+ }
+
+ this.connectionManager = new
HostConnectionManager(serverAddresses, randomnessSeed, clientConfig);
+ this.previousServerSet = new HashSet<>(serverAddresses);
+
+
+ final long refreshIntervalInSeconds = getRefreshInterval();
+ if (refreshIntervalInSeconds > 0) {
+ dnsRefreshTimer = new Timer("DnsSrvRefresh-" + dnsSrvName,
true);
+ dnsRefreshTimer.scheduleAtFixedRate(new TimerTask() {
+ @Override
+ public void run() {
+ refreshServerListInBackground();
+ }
+ },
+ refreshIntervalInSeconds * 1000,
+ refreshIntervalInSeconds * 1000);
+ }
+ LOG.info("DnsSrvHostProvider initialized with {} servers from DNS
name: {} with refresh interval: {} seconds",
+ serverAddresses.size(), dnsSrvName,
refreshIntervalInSeconds);
+ } catch (final Exception e) {
+ LOG.error("Failed to initialize DnsSrvHostProvider for DNS name:
{}", dnsSrvName, e);
+
+ if (dnsRefreshTimer != null) {
+ dnsRefreshTimer.cancel();
+ }
+
+ if (e instanceof IllegalArgumentException) {
+ throw e;
+ } else {
+ throw new IllegalArgumentException("Failed to initialize
DnsSrvHostProvider for DNS name: " + dnsSrvName, e);
+ }
+ }
+ }
+
+ @Override
+ public int size() {
+ return connectionManager.size();
+ }
+
+ @Override
+ public InetSocketAddress next(long spinDelay) {
+ applyServerListUpdate();
+ return connectionManager.next(spinDelay);
+ }
+
+ @Override
+ public void onConnected() {
+ currentConnectedHost.set(connectionManager.getServerAtCurrentIndex());
+ connectionManager.onConnected();
+ }
+
+ @Override
+ public boolean updateServerList(Collection<InetSocketAddress>
serverAddresses, InetSocketAddress currentHost) {
+ return connectionManager.updateServerList(serverAddresses,
currentHost);
+ }
+
+ @Override
+ public void close() {
+ if (dnsRefreshTimer != null) {
+ dnsRefreshTimer.cancel();
+ }
+ }
+
+ private long getRefreshInterval() {
+ final long defaultInterval =
ZKClientConfig.DNS_SRV_REFRESH_INTERVAL_SECONDS_DEFAULT;
+ final long interval =
Long.getLong(ZKClientConfig.DNS_SRV_REFRESH_INTERVAL_SECONDS, defaultInterval);
+ if (interval < 0) {
+ LOG.error("Invalid DNS SRV refresh interval {} seconds", interval);
+ throw new IllegalArgumentException("Invalid DNS SRV refresh
interval: " + interval);
+ }
+ if (interval == 0) {
+ LOG.info("DNS SRV refresh disabled (interval = 0) for {}",
dnsSrvName);
+ }
+ return interval;
+ }
+
+ private List<InetSocketAddress> lookupDnsSrvRecords() {
+ final List<InetSocketAddress> addresses = new ArrayList<>();
+
+ try {
+ final SRVRecord[] srvRecords =
dnsResolver.lookupSrvRecords(dnsSrvName);
+ for (final SRVRecord srvRecord : srvRecords) {
+ final InetSocketAddress address =
createAddressFromSrvRecord(srvRecord);
+ if (address != null) {
+ addresses.add(address);
+ }
+ }
+ } catch (final Exception e) {
+ LOG.error("DNS SRV lookup failed for {}", dnsSrvName, e);
+ throw new RuntimeException("DNS SRV lookup failed for " +
dnsSrvName, e);
+ }
+ return addresses;
+ }
+
+ private InetSocketAddress createAddressFromSrvRecord(final SRVRecord
srvRecord) {
+ if (srvRecord == null) {
+ LOG.error("Null SRV record encountered from DnsSrvResolver
implementation");
+ return null;
+ }
+
+ try {
+ final String target = srvRecord.getTarget().toString(true);
+ final int port = srvRecord.getPort();
+
+ if (port <= 0 || port > 65535) {
+ LOG.error("Invalid port {} in SRV record for target {}", port,
target);
+ return null;
+ }
+
+ if (StringUtils.isBlank(target)) {
+ LOG.error("Empty or blank target in SRV record {}", srvRecord);
+ return null;
+ }
+
+ return new InetSocketAddress(target, port);
+ } catch (final Exception e) {
+ LOG.error("Failed to create InetSocketAddress from SRV record {}",
srvRecord, e);
+ return null;
+ }
+ }
+
+ /**
+ * Performs background DNS refresh to detect server list changes without
caching DNS responses.
+ *
+ * <p><strong>Important:</strong> This method only detects changes, the
actual fresh DNS query happens
+ * in {@link #applyServerListUpdate()} when needed.</p>
+ */
+ private void refreshServerListInBackground() {
+ try {
+ // Refresh the server list
+ final List<InetSocketAddress> newAddresses = lookupDnsSrvRecords();
+ if (newAddresses.isEmpty()) {
+ LOG.warn("DNS SRV lookup returned no records for {}, will
retry on next refresh", dnsSrvName);
+ return;
+ }
+
+ // Check if server list has changed
+ final Set<InetSocketAddress> newServerSet = new
HashSet<>(newAddresses);
+ if (!Objects.equals(previousServerSet, newServerSet)) {
+ serverListChanged.set(true);
+ LOG.info("Server list change detected from DNS SRV: {}
servers", newAddresses.size());
+ }
+ } catch (final Exception e) {
+ LOG.warn("Failed to refresh server list from DNS SRV records for
{}: {}", dnsSrvName, e.getMessage());
+ }
+ }
+
+ /**
+ * Apply pending server list updates when connection is actually needed.
+ * It is called from next() to ensure server list changes are applied with
proper connection context.
+ */
+ private synchronized void applyServerListUpdate() {
+ if (serverListChanged.get()) {
+ try {
+ // Query DNS to get the updated server list
+ final List<InetSocketAddress> latestServerList =
lookupDnsSrvRecords();
+ if (latestServerList.isEmpty()) {
+ LOG.warn("DNS SRV lookup returned no records for {}, will
use the existing ones", dnsSrvName);
+ return;
+ }
+
+ final boolean needReconnect =
connectionManager.updateServerList(latestServerList,
currentConnectedHost.get());
+ previousServerSet = new HashSet<>(latestServerList);
+ serverListChanged.set(false);
+
+ LOG.info("Applied server list update during connection
attempt. servers size: {}, need reconnection: {}",
+ latestServerList.size(), needReconnect);
+ } catch (final Exception e) {
+ LOG.warn("Failed to apply server list update", e);
+ }
+ }
+ }
+
+ private static class DefaultDnsResolver implements DnsSrvResolver {
+ @Override
+ public SRVRecord[] lookupSrvRecords(final String dnsSrvName) throws
IOException {
+ final Lookup lookup = new Lookup(dnsSrvName, Type.SRV);
+ final Record[] records = lookup.run();
+ if (lookup.getResult() != Lookup.SUCCESSFUL) {
+ final String errorMsg = lookup.getErrorString();
+ throw new IOException("DNS SRV lookup failed for " +
dnsSrvName + ": " + errorMsg);
+ }
+
+ if (records == null) {
+ return new SRVRecord[0];
+ }
+ return
Arrays.stream(records).map(SRVRecord.class::cast).toArray(SRVRecord[]::new);
+ }
+ }
+}
+
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/client/StaticHostProvider.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/client/HostConnectionManager.java
similarity index 71%
copy from
zookeeper-server/src/main/java/org/apache/zookeeper/client/StaticHostProvider.java
copy to
zookeeper-server/src/main/java/org/apache/zookeeper/client/HostConnectionManager.java
index e07754c35..b278014a3 100644
---
a/zookeeper-server/src/main/java/org/apache/zookeeper/client/StaticHostProvider.java
+++
b/zookeeper-server/src/main/java/org/apache/zookeeper/client/HostConnectionManager.java
@@ -32,159 +32,99 @@
import org.slf4j.LoggerFactory;
/**
- * Most simple HostProvider, resolves on every next() call.
+ * Manages ZooKeeper server connections with round-robin selection and load
balancing during
+ * cluster reconfiguration.
*
- * Please be aware that although this class doesn't do any DNS caching,
there're multiple levels of caching already
- * present across the stack like in JVM, OS level, hardware, etc. The best we
could do here is to get the most recent
- * address from the underlying system which is considered up-to-date.
+ * <p><strong>Key Features:</strong></p>
+ * <ul>
+ * <li>Round-robin server selection with IP address resolution</li>
+ * <li>Migrates clients to balance load when servers are added or removed</li>
+ * <li>Probabilistic client migration during server list updates</li>
+ * </ul>
*
+ * <p><strong>Reconfiguration Behavior:</strong></p>
+ * When server list changes, enters "reconfigMode" and calculates migration
probabilities
+ * to balance load between old and new servers.
+ *
+ * <p>See <a
href="https://issues.apache.org/jira/browse/ZOOKEEPER-1355">ZOOKEEPER-1355</a>
+ * for more details.</p>
*/
-@InterfaceAudience.Public
-public final class StaticHostProvider implements HostProvider {
+@InterfaceAudience.Private
+public final class HostConnectionManager {
+ /**
+ * Interface for address resolution to support testing and different
resolution strategies.
+ */
public interface Resolver {
-
InetAddress[] getAllByName(String name) throws UnknownHostException;
-
}
- private static final Logger LOG =
LoggerFactory.getLogger(StaticHostProvider.class);
-
- private ZKClientConfig clientConfig = null;
-
- private List<InetSocketAddress> serverAddresses = new ArrayList<>(5);
+ private static final Logger LOG =
LoggerFactory.getLogger(HostConnectionManager.class);
- private Random sourceOfRandomness;
- private int lastIndex = -1;
+ private List<InetSocketAddress> serverAddresses;
+ private final Random sourceOfRandomness;
+ private final Resolver resolver;
+ private final ZKClientConfig clientConfig;
- private int currentIndex = -1;
+ private int lastIndex;
+ private int currentIndex;
/**
* The following fields are used to migrate clients during reconfiguration
*/
private boolean reconfigMode = false;
-
private final List<InetSocketAddress> oldServers = new ArrayList<>(5);
-
private final List<InetSocketAddress> newServers = new ArrayList<>(5);
-
private int currentIndexOld = -1;
private int currentIndexNew = -1;
-
private float pOld, pNew;
- private Resolver resolver;
-
/**
- * Constructs a SimpleHostSet.
+ * Constructs a HostConnectionManager with default resolver.
*
* @param serverAddresses
- * possibly unresolved ZooKeeper server addresses
- * @throws IllegalArgumentException
- * if serverAddresses is empty or resolves to an empty list
- */
- public StaticHostProvider(Collection<InetSocketAddress> serverAddresses) {
- init(serverAddresses, System.currentTimeMillis() ^ this.hashCode(),
new Resolver() {
- @Override
- public InetAddress[] getAllByName(String name) throws
UnknownHostException {
- return InetAddress.getAllByName(name);
- }
- });
- }
-
- /**
- * Constructs a SimpleHostSet with ZKClientConfig.
- *
- * Introduced this new overload in 3.10.0 in order to take advantage of
some newly introduced feature flags. Like
- * the shuffle (old) / not to shuffle (new) behavior of DNS resolution.
- *
- * @param serverAddresses
- * possibly unresolved ZooKeeper server addresses
+ * possibly unresolved ZooKeeper server addresses
+ * @param randomnessSeed
+ * a seed used to initialize sourceOfRandomness
* @param clientConfig
- * ZooKeeper client configuration
+ * ZooKeeper client configuration
+ * @throws IllegalArgumentException
+ * if serverAddresses is empty
*/
- public StaticHostProvider(Collection<InetSocketAddress> serverAddresses,
ZKClientConfig clientConfig) {
- init(serverAddresses, System.currentTimeMillis() ^ this.hashCode(),
new Resolver() {
- @Override
- public InetAddress[] getAllByName(String name) throws
UnknownHostException {
- return InetAddress.getAllByName(name);
- }
- }, clientConfig);
+ public HostConnectionManager(Collection<InetSocketAddress>
serverAddresses, long randomnessSeed, ZKClientConfig clientConfig) {
+ this(serverAddresses, randomnessSeed, null, clientConfig);
}
/**
- * Constructs a SimpleHostSet.
- *
- * Introduced for testing purposes. getAllByName() is a static method of
InetAddress, therefore cannot be easily mocked.
- * By abstraction of Resolver interface we can easily inject a mocked
implementation in tests.
+ * Constructs a HostConnectionManager with custom resolver.
*
* @param serverAddresses
* possibly unresolved ZooKeeper server addresses
+ * @param randomnessSeed
+ * a seed used to initialize sourceOfRandomness
* @param resolver
* custom resolver implementation
- */
- public StaticHostProvider(Collection<InetSocketAddress> serverAddresses,
Resolver resolver) {
- init(serverAddresses, System.currentTimeMillis() ^ this.hashCode(),
resolver);
- }
-
- /**
- * Constructs a SimpleHostSet. This constructor is used from
StaticHostProviderTest to produce deterministic test results
- * by initializing sourceOfRandomness with the same seed
- *
- * @param serverAddresses
- * possibly unresolved ZooKeeper server addresses
- * @param randomnessSeed a seed used to initialize sourceOfRandomnes
+ * @param clientConfig
+ * ZooKeeper client configuration
* @throws IllegalArgumentException
- * if serverAddresses is empty or resolves to an empty list
+ * if serverAddresses is empty
*/
- public StaticHostProvider(Collection<InetSocketAddress> serverAddresses,
long randomnessSeed) {
- init(serverAddresses, randomnessSeed, new Resolver() {
- @Override
- public InetAddress[] getAllByName(String name) throws
UnknownHostException {
- return InetAddress.getAllByName(name);
- }
- });
- }
-
- private void init(Collection<InetSocketAddress> serverAddresses, long
randomnessSeed, Resolver resolver) {
- init(serverAddresses, randomnessSeed, resolver, null);
- }
-
- private void init(Collection<InetSocketAddress> serverAddresses, long
randomnessSeed, Resolver resolver,
- ZKClientConfig clientConfig) {
- this.clientConfig = clientConfig == null ? new ZKClientConfig() :
clientConfig;
- this.sourceOfRandomness = new Random(randomnessSeed);
- this.resolver = resolver;
+ public HostConnectionManager(Collection<InetSocketAddress> serverAddresses,
+ long randomnessSeed,
+ Resolver resolver,
+ ZKClientConfig clientConfig) {
if (serverAddresses.isEmpty()) {
throw new IllegalArgumentException("A HostProvider may not be
empty!");
}
+ this.sourceOfRandomness = new Random(randomnessSeed);
this.serverAddresses = shuffle(serverAddresses);
+ this.resolver = resolver == null ? InetAddress::getAllByName :
resolver;
+ this.clientConfig = clientConfig == null ? new ZKClientConfig() :
clientConfig;
+
currentIndex = -1;
lastIndex = -1;
- }
- private InetSocketAddress resolve(InetSocketAddress address) {
- try {
- String curHostString = address.getHostString();
- List<InetAddress> resolvedAddresses = new
ArrayList<>(Arrays.asList(this.resolver.getAllByName(curHostString)));
- if (resolvedAddresses.isEmpty()) {
- return address;
- }
- if (clientConfig.isShuffleDnsResponseEnabled()) {
- Collections.shuffle(resolvedAddresses);
- }
- return new InetSocketAddress(resolvedAddresses.get(0),
address.getPort());
- } catch (UnknownHostException e) {
- LOG.error("Unable to resolve address: {}", address.toString(), e);
- return address;
- }
- }
-
- private List<InetSocketAddress> shuffle(Collection<InetSocketAddress>
serverAddresses) {
- List<InetSocketAddress> tmpList = new
ArrayList<>(serverAddresses.size());
- tmpList.addAll(serverAddresses);
- Collections.shuffle(tmpList, sourceOfRandomness);
- return tmpList;
+ LOG.info("HostConnectionManager initialized with {} servers",
serverAddresses.size());
}
/**
@@ -197,29 +137,30 @@ private List<InetSocketAddress>
shuffle(Collection<InetSocketAddress> serverAddr
* this client migrates or not (i.e., whether true or false is
returned) is probabilistic so that the expected
* number of clients connected to each server is the same.
*
- * If true is returned, the function sets pOld and pNew that correspond to
the probability to migrate to ones of the
+ * If true is returned, the function sets pOld and pNew that correspond to
the probability to migrate to one of the
* new servers in serverAddresses or one of the old servers (migrating to
one of the old servers is done only
* if our client's currentHost is not in serverAddresses). See
nextHostInReconfigMode for the selection logic.
*
* See <a
href="https://issues.apache.org/jira/browse/ZOOKEEPER-1355">ZOOKEEPER-1355</a>
- * for the protocol and its evaluation, and StaticHostProviderTest for the
tests that illustrate how load balancing
- * works with this policy.
+ * for the protocol and its evaluation.
*
- * @param serverAddresses new host list
- * @param currentHost the host to which this client is currently connected
+ * @param serverAddresses
+ * new host list
+ * @param currentHost
+ * the host to which this client is currently connected
* @return true if changing connections is necessary for load-balancing,
false otherwise
*/
- @Override
- public synchronized boolean updateServerList(
- Collection<InetSocketAddress> serverAddresses,
- InetSocketAddress currentHost) {
+ synchronized boolean updateServerList(Collection<InetSocketAddress>
serverAddresses, InetSocketAddress currentHost) {
List<InetSocketAddress> shuffledList = shuffle(serverAddresses);
if (shuffledList.isEmpty()) {
- throw new IllegalArgumentException("A HostProvider may not be
empty!");
+ throw new IllegalArgumentException("The server list may not be
empty!");
}
+
+ int oldSize = this.serverAddresses.size();
+ int newSize = shuffledList.size();
+
// Check if client's current server is in the new list of servers
boolean myServerInNewConfig = false;
-
InetSocketAddress myServer = currentHost;
// choose "current" server according to the client rebalancing
algorithm
@@ -308,24 +249,133 @@ public synchronized boolean updateServerList(
currentIndexOld = -1;
currentIndexNew = -1;
lastIndex = currentIndex;
+
+ LOG.info("Server list updated: oldSize={}, newSize={}, reconfigMode={}", oldSize, newSize, reconfigMode);
+
return reconfigMode;
}
- public synchronized InetSocketAddress getServerAtIndex(int i) {
+ /**
+ * Get the next server to connect to.
+ *
+ * @param spinDelay milliseconds to wait if all hosts have been tried once
+ * @return the next server address to connect to
+ */
+ InetSocketAddress next(long spinDelay) {
+ boolean needToSleep = false;
+ InetSocketAddress addr;
+
+ synchronized (this) {
+ if (reconfigMode) {
+ addr = nextHostInReconfigMode();
+ if (addr != null) {
+ currentIndex = serverAddresses.indexOf(addr);
+ return resolve(addr);
+ }
+ //tried all servers and couldn't connect
+ reconfigMode = false;
+ needToSleep = (spinDelay > 0);
+ }
+ ++currentIndex;
+ if (currentIndex == serverAddresses.size()) {
+ currentIndex = 0;
+ }
+ addr = serverAddresses.get(currentIndex);
+ needToSleep = needToSleep || (currentIndex == lastIndex &&
spinDelay > 0);
+ if (lastIndex == -1) {
+ // We don't want to sleep on the first ever connect attempt.
+ lastIndex = 0;
+ }
+ }
+
+ if (needToSleep) {
+ try {
+ Thread.sleep(spinDelay);
+ } catch (InterruptedException e) {
+ LOG.warn("Unexpected exception", e);
+ }
+ }
+
+ return resolve(addr);
+ }
+
+ /**
+ * Notify that a connection has been established successfully.
+ */
+ synchronized void onConnected() {
+ lastIndex = currentIndex;
+ reconfigMode = false;
+ }
+
+ /**
+ * Get the server at the specified index.
+ *
+ * @param i the index
+ * @return the server address at the index, or null if index is out of
bounds
+ */
+ synchronized InetSocketAddress getServerAtIndex(int i) {
if (i < 0 || i >= serverAddresses.size()) {
return null;
}
return serverAddresses.get(i);
}
- public synchronized InetSocketAddress getServerAtCurrentIndex() {
+ /**
+ * Get the server at the current index.
+ *
+ * @return the server address at the current index
+ */
+ synchronized InetSocketAddress getServerAtCurrentIndex() {
return getServerAtIndex(currentIndex);
}
- public synchronized int size() {
+ /**
+ * Get the number of servers in the list.
+ *
+ * @return the number of servers
+ */
+ synchronized int size() {
return serverAddresses.size();
}
+ /**
+ * Resolves an InetSocketAddress to a concrete address.
+ *
+ * @param address
+ * the address to resolve
+ * @return resolved address or original address if resolution fails
+ */
+ private InetSocketAddress resolve(InetSocketAddress address) {
+ try {
+ String curHostString = address.getHostString();
+ List<InetAddress> resolvedAddresses = new
ArrayList<>(Arrays.asList(this.resolver.getAllByName(curHostString)));
+ if (resolvedAddresses.isEmpty()) {
+ return address;
+ }
+ if (clientConfig.isShuffleDnsResponseEnabled()) {
+ Collections.shuffle(resolvedAddresses);
+ }
+ return new InetSocketAddress(resolvedAddresses.get(0),
address.getPort());
+ } catch (UnknownHostException e) {
+ LOG.error("Unable to resolve address: {}", address, e);
+ return address;
+ }
+ }
+
+ /**
+ * Shuffles the server addresses using the internal random source.
+ *
+ * @param serverAddresses
+ * collection of server addresses to shuffle
+ * @return shuffled list of server addresses
+ */
+ private List<InetSocketAddress> shuffle(Collection<InetSocketAddress>
serverAddresses) {
+ List<InetSocketAddress> tmpList = new
ArrayList<>(serverAddresses.size());
+ tmpList.addAll(serverAddresses);
+ Collections.shuffle(tmpList, sourceOfRandomness);
+ return tmpList;
+ }
+
/**
* Get the next server to connect to, when in "reconfigMode", which means
that
* you've just updated the server list, and now trying to find some server
to connect to.
@@ -360,47 +410,4 @@ private InetSocketAddress nextHostInReconfigMode() {
return null;
}
-
- public InetSocketAddress next(long spinDelay) {
- boolean needToSleep = false;
- InetSocketAddress addr;
-
- synchronized (this) {
- if (reconfigMode) {
- addr = nextHostInReconfigMode();
- if (addr != null) {
- currentIndex = serverAddresses.indexOf(addr);
- return resolve(addr);
- }
- //tried all servers and couldn't connect
- reconfigMode = false;
- needToSleep = (spinDelay > 0);
- }
- ++currentIndex;
- if (currentIndex == serverAddresses.size()) {
- currentIndex = 0;
- }
- addr = serverAddresses.get(currentIndex);
- needToSleep = needToSleep || (currentIndex == lastIndex &&
spinDelay > 0);
- if (lastIndex == -1) {
- // We don't want to sleep on the first ever connect attempt.
- lastIndex = 0;
- }
- }
- if (needToSleep) {
- try {
- Thread.sleep(spinDelay);
- } catch (InterruptedException e) {
- LOG.warn("Unexpected exception", e);
- }
- }
-
- return resolve(addr);
- }
-
- public synchronized void onConnected() {
- lastIndex = currentIndex;
- reconfigMode = false;
- }
-
}
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/client/HostProvider.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/client/HostProvider.java
index 73a102f1a..f154f080b 100644
---
a/zookeeper-server/src/main/java/org/apache/zookeeper/client/HostProvider.java
+++
b/zookeeper-server/src/main/java/org/apache/zookeeper/client/HostProvider.java
@@ -43,7 +43,7 @@
* * A HostProvider that prefers nearby hosts.
*/
@InterfaceAudience.Public
-public interface HostProvider {
+public interface HostProvider extends AutoCloseable {
int size();
@@ -72,4 +72,12 @@ public interface HostProvider {
*/
boolean updateServerList(Collection<InetSocketAddress> serverAddresses,
InetSocketAddress currentHost);
+ /**
+ * Close the HostProvider and release any resources.
+ *
+ * Default implementation does nothing for backward compatibility
+ */
+ @Override
+ default void close() {
+ }
}
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/client/HostProviderFactory.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/client/HostProviderFactory.java
new file mode 100644
index 000000000..4f3f3a374
--- /dev/null
+++
b/zookeeper-server/src/main/java/org/apache/zookeeper/client/HostProviderFactory.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.client;
+
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Factory for creating appropriate HostProvider instances based on connect
string format.
+ * This factory enables zero-code-change migration by automatically detecting
the connect
+ * string format and creating the appropriate HostProvider implementation.
+ *
+ * Supported formats:
+ * - Host:Port: "host1:port1,host2:port2,host3:port3" (StaticHostProvider)
+ * - DNS SRV: "dns-srv://service.domain.com" (DnsSrvHostProvider)
+ *
+ * - Future formats can be easily added by extending the factory
+ */
+@InterfaceAudience.Public
+public class HostProviderFactory {
+ /**
+ * Creates a HostProvider based on the connect string format.
+ * This is a convenience method for zero-code-change migration.
+ *
+ * @param connectString the connect string
+ * @return appropriate HostProvider
+ */
+ public static HostProvider create(final String connectString) {
+ final ConnectStringParser connectStringParser = new
ConnectStringParser(connectString);
+ return create(connectStringParser, null);
+ }
+
+ /**
+ * Creates a HostProvider based on the connect string format.
+ *
+ * @param connectStringParser the connect string parser
+ * @param clientConfig ZooKeeper client configuration
+ *
+ * @return appropriate HostProvider
+ */
+ @InterfaceAudience.Private
+ public static HostProvider create(final ConnectStringParser
connectStringParser, final ZKClientConfig clientConfig) {
+ switch (connectStringParser.getConnectionType()) {
+ case DNS_SRV:
+ final String dnsSrvName =
connectStringParser.getServerAddresses().get(0).getHostString();
+ return new DnsSrvHostProvider(dnsSrvName, clientConfig);
+ default:
+ return new
StaticHostProvider(connectStringParser.getServerAddresses(), clientConfig);
+ }
+ }
+}
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/client/StaticHostProvider.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/client/StaticHostProvider.java
index e07754c35..6cc45a1a0 100644
---
a/zookeeper-server/src/main/java/org/apache/zookeeper/client/StaticHostProvider.java
+++
b/zookeeper-server/src/main/java/org/apache/zookeeper/client/StaticHostProvider.java
@@ -21,15 +21,8 @@
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Random;
import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* Most simple HostProvider, resolves on every next() call.
@@ -38,6 +31,7 @@
* present across the stack like in JVM, OS level, hardware, etc. The best we
could do here is to get the most recent
* address from the underlying system which is considered up-to-date.
*
+ * It uses HostConnectionManager for all connection management and
reconfiguration logic.
*/
@InterfaceAudience.Public
public final class StaticHostProvider implements HostProvider {
@@ -48,32 +42,7 @@ public interface Resolver {
}
- private static final Logger LOG =
LoggerFactory.getLogger(StaticHostProvider.class);
-
- private ZKClientConfig clientConfig = null;
-
- private List<InetSocketAddress> serverAddresses = new ArrayList<>(5);
-
- private Random sourceOfRandomness;
- private int lastIndex = -1;
-
- private int currentIndex = -1;
-
- /**
- * The following fields are used to migrate clients during reconfiguration
- */
- private boolean reconfigMode = false;
-
- private final List<InetSocketAddress> oldServers = new ArrayList<>(5);
-
- private final List<InetSocketAddress> newServers = new ArrayList<>(5);
-
- private int currentIndexOld = -1;
- private int currentIndexNew = -1;
-
- private float pOld, pNew;
-
- private Resolver resolver;
+ private final HostConnectionManager connectionManager;
/**
* Constructs a SimpleHostSet.
@@ -84,12 +53,7 @@ public interface Resolver {
* if serverAddresses is empty or resolves to an empty list
*/
public StaticHostProvider(Collection<InetSocketAddress> serverAddresses) {
- init(serverAddresses, System.currentTimeMillis() ^ this.hashCode(),
new Resolver() {
- @Override
- public InetAddress[] getAllByName(String name) throws
UnknownHostException {
- return InetAddress.getAllByName(name);
- }
- });
+ connectionManager = new HostConnectionManager(serverAddresses,
System.currentTimeMillis() ^ this.hashCode(), null);
}
/**
@@ -104,12 +68,10 @@ public InetAddress[] getAllByName(String name) throws
UnknownHostException {
* ZooKeeper client configuration
*/
public StaticHostProvider(Collection<InetSocketAddress> serverAddresses,
ZKClientConfig clientConfig) {
- init(serverAddresses, System.currentTimeMillis() ^ this.hashCode(),
new Resolver() {
- @Override
- public InetAddress[] getAllByName(String name) throws
UnknownHostException {
- return InetAddress.getAllByName(name);
- }
- }, clientConfig);
+ connectionManager = new HostConnectionManager(serverAddresses,
+
System.currentTimeMillis() ^ this.hashCode(),
+ null,
+ clientConfig);
}
/**
@@ -124,7 +86,10 @@ public InetAddress[] getAllByName(String name) throws
UnknownHostException {
* custom resolver implementation
*/
public StaticHostProvider(Collection<InetSocketAddress> serverAddresses,
Resolver resolver) {
- init(serverAddresses, System.currentTimeMillis() ^ this.hashCode(),
resolver);
+ this.connectionManager = new HostConnectionManager(serverAddresses,
+ System.currentTimeMillis()
^ this.hashCode(),
+ resolver::getAllByName,
+ null);
}
/**
@@ -133,274 +98,42 @@ public StaticHostProvider(Collection<InetSocketAddress>
serverAddresses, Resolve
*
* @param serverAddresses
* possibly unresolved ZooKeeper server addresses
- * @param randomnessSeed a seed used to initialize sourceOfRandomnes
+ * @param randomnessSeed a seed used to initialize sourceOfRandomness
* @throws IllegalArgumentException
* if serverAddresses is empty or resolves to an empty list
*/
public StaticHostProvider(Collection<InetSocketAddress> serverAddresses,
long randomnessSeed) {
- init(serverAddresses, randomnessSeed, new Resolver() {
- @Override
- public InetAddress[] getAllByName(String name) throws
UnknownHostException {
- return InetAddress.getAllByName(name);
- }
- });
- }
-
- private void init(Collection<InetSocketAddress> serverAddresses, long
randomnessSeed, Resolver resolver) {
- init(serverAddresses, randomnessSeed, resolver, null);
- }
-
- private void init(Collection<InetSocketAddress> serverAddresses, long
randomnessSeed, Resolver resolver,
- ZKClientConfig clientConfig) {
- this.clientConfig = clientConfig == null ? new ZKClientConfig() :
clientConfig;
- this.sourceOfRandomness = new Random(randomnessSeed);
- this.resolver = resolver;
- if (serverAddresses.isEmpty()) {
- throw new IllegalArgumentException("A HostProvider may not be
empty!");
- }
- this.serverAddresses = shuffle(serverAddresses);
- currentIndex = -1;
- lastIndex = -1;
- }
-
- private InetSocketAddress resolve(InetSocketAddress address) {
- try {
- String curHostString = address.getHostString();
- List<InetAddress> resolvedAddresses = new
ArrayList<>(Arrays.asList(this.resolver.getAllByName(curHostString)));
- if (resolvedAddresses.isEmpty()) {
- return address;
- }
- if (clientConfig.isShuffleDnsResponseEnabled()) {
- Collections.shuffle(resolvedAddresses);
- }
- return new InetSocketAddress(resolvedAddresses.get(0),
address.getPort());
- } catch (UnknownHostException e) {
- LOG.error("Unable to resolve address: {}", address.toString(), e);
- return address;
- }
- }
-
- private List<InetSocketAddress> shuffle(Collection<InetSocketAddress>
serverAddresses) {
- List<InetSocketAddress> tmpList = new
ArrayList<>(serverAddresses.size());
- tmpList.addAll(serverAddresses);
- Collections.shuffle(tmpList, sourceOfRandomness);
- return tmpList;
+ this.connectionManager = new HostConnectionManager(serverAddresses,
+ randomnessSeed,
+ null,
+ null);
}
- /**
- * Update the list of servers. This returns true if changing connections
is necessary for load-balancing, false
- * otherwise. Changing connections is necessary if one of the following
holds:
- * a) the host to which this client is currently connected is not in
serverAddresses.
- * Otherwise (if currentHost is in the new list serverAddresses):
- * b) the number of servers in the cluster is increasing - in this case
the load on currentHost should decrease,
- * which means that SOME of the clients connected to it will migrate to
the new servers. The decision whether
- * this client migrates or not (i.e., whether true or false is
returned) is probabilistic so that the expected
- * number of clients connected to each server is the same.
- *
- * If true is returned, the function sets pOld and pNew that correspond to
the probability to migrate to ones of the
- * new servers in serverAddresses or one of the old servers (migrating to
one of the old servers is done only
- * if our client's currentHost is not in serverAddresses). See
nextHostInReconfigMode for the selection logic.
- *
- * See <a
href="https://issues.apache.org/jira/browse/ZOOKEEPER-1355">ZOOKEEPER-1355</a>
- * for the protocol and its evaluation, and StaticHostProviderTest for the
tests that illustrate how load balancing
- * works with this policy.
- *
- * @param serverAddresses new host list
- * @param currentHost the host to which this client is currently connected
- * @return true if changing connections is necessary for load-balancing,
false otherwise
- */
@Override
- public synchronized boolean updateServerList(
- Collection<InetSocketAddress> serverAddresses,
- InetSocketAddress currentHost) {
- List<InetSocketAddress> shuffledList = shuffle(serverAddresses);
- if (shuffledList.isEmpty()) {
- throw new IllegalArgumentException("A HostProvider may not be
empty!");
- }
- // Check if client's current server is in the new list of servers
- boolean myServerInNewConfig = false;
-
- InetSocketAddress myServer = currentHost;
-
- // choose "current" server according to the client rebalancing
algorithm
- if (reconfigMode) {
- myServer = next(0);
- }
-
- // if the client is not currently connected to any server
- if (myServer == null) {
- // reconfigMode = false (next shouldn't return null).
- if (lastIndex >= 0) {
- // take the last server to which we were connected
- myServer = this.serverAddresses.get(lastIndex);
- } else {
- // take the first server on the list
- myServer = this.serverAddresses.get(0);
- }
- }
-
- for (InetSocketAddress addr : shuffledList) {
- if (addr.getPort() == myServer.getPort()
- && ((addr.getAddress() != null
- && myServer.getAddress() != null
- && addr.getAddress().equals(myServer.getAddress()))
- || addr.getHostString().equals(myServer.getHostString())))
{
- myServerInNewConfig = true;
- break;
- }
- }
-
- reconfigMode = true;
-
- newServers.clear();
- oldServers.clear();
- // Divide the new servers into oldServers that were in the previous
list
- // and newServers that were not in the previous list
- for (InetSocketAddress address : shuffledList) {
- if (this.serverAddresses.contains(address)) {
- oldServers.add(address);
- } else {
- newServers.add(address);
- }
- }
-
- int numOld = oldServers.size();
- int numNew = newServers.size();
-
- // number of servers increased
- if (numOld + numNew > this.serverAddresses.size()) {
- if (myServerInNewConfig) {
- // my server is in new config, but load should be decreased.
- // Need to decide if this client
- // is moving to one of the new servers
- if (sourceOfRandomness.nextFloat() <= (1 - ((float)
this.serverAddresses.size()) / (numOld + numNew))) {
- pNew = 1;
- pOld = 0;
- } else {
- // do nothing special - stay with the current server
- reconfigMode = false;
- }
- } else {
- // my server is not in new config, and load on old servers must
- // be decreased, so connect to
- // one of the new servers
- pNew = 1;
- pOld = 0;
- }
- } else { // number of servers stayed the same or decreased
- if (myServerInNewConfig) {
- // my server is in new config, and load should be increased, so
- // stay with this server and do nothing special
- reconfigMode = false;
- } else {
- pOld = ((float) (numOld * (this.serverAddresses.size() -
(numOld + numNew))))
- / ((numOld + numNew) * (this.serverAddresses.size() -
numOld));
- pNew = 1 - pOld;
- }
- }
-
- if (!reconfigMode) {
- currentIndex = shuffledList.indexOf(getServerAtCurrentIndex());
- } else {
- currentIndex = -1;
- }
- this.serverAddresses = shuffledList;
- currentIndexOld = -1;
- currentIndexNew = -1;
- lastIndex = currentIndex;
- return reconfigMode;
- }
-
- public synchronized InetSocketAddress getServerAtIndex(int i) {
- if (i < 0 || i >= serverAddresses.size()) {
- return null;
- }
- return serverAddresses.get(i);
+ public boolean updateServerList(Collection<InetSocketAddress>
serverAddresses, InetSocketAddress currentHost) {
+ return connectionManager.updateServerList(serverAddresses,
currentHost);
}
- public synchronized InetSocketAddress getServerAtCurrentIndex() {
- return getServerAtIndex(currentIndex);
+ @Override
+ public int size() {
+ return connectionManager.size();
}
- public synchronized int size() {
- return serverAddresses.size();
+ @Override
+ public InetSocketAddress next(long spinDelay) {
+ return connectionManager.next(spinDelay);
}
- /**
- * Get the next server to connect to, when in "reconfigMode", which means
that
- * you've just updated the server list, and now trying to find some server
to connect to.
- * Once onConnected() is called, reconfigMode is set to false. Similarly,
if we tried to connect
- * to all servers in new config and failed, reconfigMode is set to false.
- *
- * While in reconfigMode, we should connect to a server in newServers with
probability pNew and to servers in
- * oldServers with probability pOld (which is just 1-pNew). If we tried
out all servers in either oldServers
- * or newServers we continue to try servers from the other set, regardless
of pNew or pOld. If we tried all servers
- * we give up and go back to the normal round robin mode
- *
- * When called, this should be protected by synchronized(this)
- */
- private InetSocketAddress nextHostInReconfigMode() {
- boolean takeNew = (sourceOfRandomness.nextFloat() <= pNew);
-
- // take one of the new servers if it is possible (there are still such
- // servers we didn't try),
- // and either the probability tells us to connect to one of the new
- // servers or if we already
- // tried all the old servers
- if (((currentIndexNew + 1) < newServers.size()) && (takeNew ||
(currentIndexOld + 1) >= oldServers.size())) {
- ++currentIndexNew;
- return newServers.get(currentIndexNew);
- }
-
- // start taking old servers
- if ((currentIndexOld + 1) < oldServers.size()) {
- ++currentIndexOld;
- return oldServers.get(currentIndexOld);
- }
-
- return null;
+ @Override
+ public void onConnected() {
+ connectionManager.onConnected();
}
- public InetSocketAddress next(long spinDelay) {
- boolean needToSleep = false;
- InetSocketAddress addr;
-
- synchronized (this) {
- if (reconfigMode) {
- addr = nextHostInReconfigMode();
- if (addr != null) {
- currentIndex = serverAddresses.indexOf(addr);
- return resolve(addr);
- }
- //tried all servers and couldn't connect
- reconfigMode = false;
- needToSleep = (spinDelay > 0);
- }
- ++currentIndex;
- if (currentIndex == serverAddresses.size()) {
- currentIndex = 0;
- }
- addr = serverAddresses.get(currentIndex);
- needToSleep = needToSleep || (currentIndex == lastIndex &&
spinDelay > 0);
- if (lastIndex == -1) {
- // We don't want to sleep on the first ever connect attempt.
- lastIndex = 0;
- }
- }
- if (needToSleep) {
- try {
- Thread.sleep(spinDelay);
- } catch (InterruptedException e) {
- LOG.warn("Unexpected exception", e);
- }
- }
-
- return resolve(addr);
+ public InetSocketAddress getServerAtIndex(int i) {
+ return connectionManager.getServerAtIndex(i);
}
- public synchronized void onConnected() {
- lastIndex = currentIndex;
- reconfigMode = false;
+ public InetSocketAddress getServerAtCurrentIndex() {
+ return connectionManager.getServerAtCurrentIndex();
}
-
}
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/client/ZKClientConfig.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/client/ZKClientConfig.java
index 91a390dda..a429d4838 100644
---
a/zookeeper-server/src/main/java/org/apache/zookeeper/client/ZKClientConfig.java
+++
b/zookeeper-server/src/main/java/org/apache/zookeeper/client/ZKClientConfig.java
@@ -69,6 +69,13 @@ public class ZKClientConfig extends ZKConfig {
public static final String ZOOKEEPER_SHUFFLE_DNS_RESPONSE =
"zookeeper.shuffleDnsResponse";
public static final boolean ZOOKEEPER_SHUFFLE_DNS_RESPONSE_DEFAULT = false;
+ /**
+ * DNS SRV refresh interval in seconds for DnsSrvHostProvider.
+ * A value of 0 disables periodic refresh.
+ */
+ public static final String DNS_SRV_REFRESH_INTERVAL_SECONDS =
"zookeeper.hostProvider.dnsSrvRefreshIntervalSeconds";
+ public static final long DNS_SRV_REFRESH_INTERVAL_SECONDS_DEFAULT = 60;
+
public ZKClientConfig() {
super();
initFromJavaSystemProperties();
@@ -129,6 +136,7 @@ protected void handleBackwardCompatibility() {
setProperty(ZOOKEEPER_CLIENT_CNXN_SOCKET,
System.getProperty(ZOOKEEPER_CLIENT_CNXN_SOCKET));
setProperty(SECURE_CLIENT, System.getProperty(SECURE_CLIENT));
setProperty(ZK_SASL_CLIENT_ALLOW_REVERSE_DNS,
System.getProperty(ZK_SASL_CLIENT_ALLOW_REVERSE_DNS));
+ setProperty(DNS_SRV_REFRESH_INTERVAL_SECONDS,
System.getProperty(DNS_SRV_REFRESH_INTERVAL_SECONDS));
}
/**
diff --git
a/zookeeper-server/src/test/java/org/apache/zookeeper/HostProviderSelectionTest.java
b/zookeeper-server/src/test/java/org/apache/zookeeper/HostProviderSelectionTest.java
new file mode 100644
index 000000000..386cb9d5a
--- /dev/null
+++
b/zookeeper-server/src/test/java/org/apache/zookeeper/HostProviderSelectionTest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import org.apache.zookeeper.client.ZKClientConfig;
+import org.apache.zookeeper.test.ClientBase;
+import org.junit.jupiter.api.Test;
+
+public class HostProviderSelectionTest extends ZKTestCase {
+ @Test
+ public void testStaticHostProviderSelection() throws Exception {
+ final String[] staticFormats = {
+ "localhost:2181",
+ "zk1:2181,zk2:2181,zk3:2181",
+ "zk1:2181,zk2:2181/myapp",
+ "[::1]:2181,[2001:db8::1]:2181"
+ };
+
+ for (final String connectString : staticFormats) {
+ // Test without config
+ try (final ZooKeeper zk = new ZooKeeper(connectString,
+ ClientBase.CONNECTION_TIMEOUT, DummyWatcher.INSTANCE)) {
+ assertNotNull(zk);
+ }
+
+ // Test with config
+ final ZKClientConfig config = new ZKClientConfig();
+ try (final ZooKeeper zk = new ZooKeeper(connectString,
+ ClientBase.CONNECTION_TIMEOUT, DummyWatcher.INSTANCE,
config)) {
+ assertNotNull(zk);
+ }
+ }
+ }
+
+ @Test
+ public void testDnsSrvHostProviderSelection() {
+ final String[] dnsSrvFormats = {
+ "dns-srv://nonexistent.test.local",
+ "dns-srv://nonexistent.test.local/myapp"
+ };
+
+ for (final String connectString : dnsSrvFormats) {
+ // Test without config
+ final IllegalArgumentException exception1 =
assertThrows(IllegalArgumentException.class, () ->
+ new ZooKeeper(connectString,
ClientBase.CONNECTION_TIMEOUT, DummyWatcher.INSTANCE));
+ validateDnsSrvError(exception1);
+
+ // Test with config
+ final ZKClientConfig config = new ZKClientConfig();
+ final IllegalArgumentException exception2 =
assertThrows(IllegalArgumentException.class, () ->
+ new ZooKeeper(connectString,
ClientBase.CONNECTION_TIMEOUT, DummyWatcher.INSTANCE, config));
+ validateDnsSrvError(exception2);
+ }
+ }
+
+ @Test
+ public void testInvalidFormats() {
+ final String[] invalidFormats = {
+ "",
+ "dns-srv://"
+ };
+
+ for (final String connectString : invalidFormats) {
+ final IllegalArgumentException exception =
assertThrows(IllegalArgumentException.class, () ->
+ new ZooKeeper(connectString,
ClientBase.CONNECTION_TIMEOUT, DummyWatcher.INSTANCE));
+ final String message = exception.getMessage();
+ assertTrue(message.contains("Connect string cannot be null or empty")
+ || message.contains("DNS name cannot be null or empty"));
+ }
+ }
+
+ private void validateDnsSrvError(final IllegalArgumentException exception)
{
+ final String message = exception.getMessage();
+ assertTrue(message.contains("Failed to initialize DnsSrvHostProvider for DNS name:"));
+ }
+}
diff --git
a/zookeeper-server/src/test/java/org/apache/zookeeper/client/DnsSrvHostProviderTest.java
b/zookeeper-server/src/test/java/org/apache/zookeeper/client/DnsSrvHostProviderTest.java
new file mode 100644
index 000000000..136b6e47a
--- /dev/null
+++
b/zookeeper-server/src/test/java/org/apache/zookeeper/client/DnsSrvHostProviderTest.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.client;
+
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import java.net.InetSocketAddress;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.zookeeper.client.DnsSrvHostProvider.DnsSrvResolver;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.xbill.DNS.Name;
+import org.xbill.DNS.SRVRecord;
+
+/**
+ * Unit tests for {@link DnsSrvHostProvider}. SRV record lookups are stubbed on a Mockito
+ * mock of {@link DnsSrvResolver} that is passed to each provider under test.
+ */
+public class DnsSrvHostProviderTest {
+
+    private static final String TEST_DNS_NAME = "_zookeeper._tcp.example.com.";
+    private static final long TEST_SEED = 12345L;
+
+    /** Resolver mock handed to every provider under test; recreated before each test. */
+    private DnsSrvResolver mockDnsSrvResolver;
+
+    @BeforeEach
+    public void setUp() {
+        mockDnsSrvResolver = mock(DnsSrvResolver.class);
+    }
+
+    /** Clears the refresh-interval system property that some tests set. */
+    @AfterEach
+    public void tearDown() {
+        System.clearProperty(ZKClientConfig.DNS_SRV_REFRESH_INTERVAL_SECONDS);
+    }
+
+    /** Three stubbed SRV records give a provider of size three that hands out an address. */
+    @Test
+    public void testBasic() throws Exception {
+        stubSrvLookup(createMockSrvRecords());
+
+        try (final DnsSrvHostProvider provider = newProvider(TEST_DNS_NAME)) {
+            assertEquals(3, provider.size());
+            assertNotNull(provider.next(0));
+        }
+    }
+
+    /** Three consecutive {@code next(0)} calls cover all stubbed servers; a fourth wraps around. */
+    @Test
+    public void testServerIteration() throws Exception {
+        stubSrvLookup(createMockSrvRecords());
+
+        try (final DnsSrvHostProvider provider = newProvider(TEST_DNS_NAME)) {
+            final Set<InetSocketAddress> seen = new HashSet<>();
+            for (int call = 0; call < 3; call++) {
+                seen.add(provider.next(0));
+            }
+
+            final Set<InetSocketAddress> expected = new HashSet<>();
+            for (int n = 1; n <= 3; n++) {
+                expected.add(new InetSocketAddress("server" + n + ".example.com", 2181));
+            }
+
+            assertEquals(expected, seen);
+
+            // cycle back
+            assertTrue(expected.contains(provider.next(0)));
+        }
+    }
+
+    /** Empty, {@code null} and blank DNS names are each rejected at construction time. */
+    @Test
+    public void testEmptyDnsName() {
+        final String[] rejectedNames = {"", null, " "};
+        for (final String rejectedName : rejectedNames) {
+            assertThrows(IllegalArgumentException.class, () -> newProvider(rejectedName));
+        }
+    }
+
+    /** A lookup that returns an empty record array makes construction fail. */
+    @Test
+    public void testNoSrvRecords() throws Exception {
+        stubSrvLookup(new SRVRecord[0]);
+
+        assertThrows(IllegalArgumentException.class, () -> newProvider(TEST_DNS_NAME));
+    }
+
+    /** An {@code IOException} from the resolver surfaces as an {@link IllegalArgumentException}. */
+    @Test
+    public void testDnsLookupFailure() throws Exception {
+        when(mockDnsSrvResolver.lookupSrvRecords(TEST_DNS_NAME))
+            .thenThrow(new java.io.IOException("DNS lookup failed"));
+
+        assertThrows(IllegalArgumentException.class, () -> newProvider(TEST_DNS_NAME));
+    }
+
+    /** A lookup whose only record carries port 0 makes construction fail. */
+    @Test
+    public void testInvalidPortFiltering() throws Exception {
+        stubSrvLookup(createMockSrvRecord("server1.example.com.", 0));
+
+        assertThrows(IllegalArgumentException.class, () -> newProvider(TEST_DNS_NAME));
+    }
+
+    /** The trailing dot of an SRV target name is absent from the host string handed out. */
+    @Test
+    public void testTrailingDotRemoval() throws Exception {
+        stubSrvLookup(createMockSrvRecord("server1.example.com.", 2181));
+
+        try (final DnsSrvHostProvider provider = newProvider(TEST_DNS_NAME)) {
+            assertEquals(1, provider.size());
+
+            final InetSocketAddress only = provider.next(0);
+            assertEquals("server1.example.com", only.getHostString());
+        }
+    }
+
+    /**
+     * With the refresh interval property set to {@code "0"}, the resolver is expected to be
+     * queried exactly once (at construction), even after waiting and iterating over hosts.
+     */
+    @Test
+    public void testRefreshIntervalZeroDisablesPeriodicRefresh() throws Exception {
+        System.setProperty(ZKClientConfig.DNS_SRV_REFRESH_INTERVAL_SECONDS, "0");
+        stubSrvLookup(createMockSrvRecords());
+
+        try (final DnsSrvHostProvider provider = newProvider(TEST_DNS_NAME)) {
+            // Initial setup still loads the stubbed records.
+            assertEquals(3, provider.size());
+
+            // Give any unexpected background refresh a chance to run.
+            Thread.sleep(1000);
+
+            // Only the lookup issued during initialization may have happened so far.
+            verify(mockDnsSrvResolver, times(1)).lookupSrvRecords(TEST_DNS_NAME);
+
+            // The provider keeps handing out hosts across repeated calls.
+            for (int call = 0; call < 6; call++) {
+                assertNotNull(provider.next(0));
+            }
+
+            // Iterating must not have triggered further lookups.
+            verify(mockDnsSrvResolver, times(1)).lookupSrvRecords(TEST_DNS_NAME);
+        }
+    }
+
+    /** A refresh interval of {@code "-1"} is rejected with a message quoting the value. */
+    @Test
+    public void testRefreshIntervalNegative() throws Exception {
+        System.setProperty(ZKClientConfig.DNS_SRV_REFRESH_INTERVAL_SECONDS, "-1");
+        stubSrvLookup(createMockSrvRecords());
+
+        final IllegalArgumentException thrown =
+            assertThrows(IllegalArgumentException.class, () -> newProvider(TEST_DNS_NAME));
+
+        assertEquals("Invalid DNS SRV refresh interval: -1", thrown.getMessage());
+    }
+
+    /** Makes the mock resolver answer lookups of {@link #TEST_DNS_NAME} with the given records. */
+    private void stubSrvLookup(final SRVRecord... records) throws Exception {
+        when(mockDnsSrvResolver.lookupSrvRecords(TEST_DNS_NAME)).thenReturn(records);
+    }
+
+    /** Builds a provider for {@code dnsName} using the test seed, the mock resolver and {@code null}. */
+    private DnsSrvHostProvider newProvider(final String dnsName) throws Exception {
+        return new DnsSrvHostProvider(dnsName, TEST_SEED, mockDnsSrvResolver, null);
+    }
+
+    /** Returns records for server1..server3 under {@code example.com.}, all on port 2181. */
+    private SRVRecord[] createMockSrvRecords() {
+        final SRVRecord[] records = new SRVRecord[3];
+        for (int i = 0; i < records.length; i++) {
+            records[i] = createMockSrvRecord("server" + (i + 1) + ".example.com.", 2181);
+        }
+        return records;
+    }
+
+    /**
+     * Creates a real {@link SRVRecord} owned by {@link #TEST_DNS_NAME} that points at
+     * {@code target} on {@code port}. Any failure while building it is rethrown unchecked.
+     */
+    private SRVRecord createMockSrvRecord(final String target, final int port) {
+        try {
+            final Name host = Name.fromString(target);
+            final Name owner = Name.fromString(TEST_DNS_NAME);
+            // Arguments per dnsjava's SRVRecord constructor:
+            // owner, class, TTL, priority, weight, port, target.
+            return new SRVRecord(owner, 1, 300, 1, 1, port, host);
+        } catch (final Exception e) {
+            throw new RuntimeException("Failed to create mock SRV record", e);
+        }
+    }
+}
diff --git
a/zookeeper-server/src/test/java/org/apache/zookeeper/test/ConnectStringParserTest.java
b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ConnectStringParserTest.java
index fc30e6992..d7d55a4b1 100644
---
a/zookeeper-server/src/test/java/org/apache/zookeeper/test/ConnectStringParserTest.java
+++
b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ConnectStringParserTest.java
@@ -19,6 +19,11 @@
package org.apache.zookeeper.test;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import java.util.HashMap;
+import java.util.Map;
import org.apache.zookeeper.ZKTestCase;
import org.apache.zookeeper.client.ConnectStringParser;
import org.junit.jupiter.api.Test;
@@ -106,4 +111,89 @@ public void testParseIPV6ConnectionString() {
assertEquals(2183, parser.getServerAddresses().get(2).getPort());
}
+    /** A dns-srv connect string with no path is expected to yield the DNS name and no chroot. */
+    @Test
+    public void testDnsSrvFormatNoChroot() {
+        final String connectString = "dns-srv://zookeeper.myapp.com";
+        final String expectedHost = "zookeeper.myapp.com";
+        testDnsSrvFormat(connectString, expectedHost, null);
+    }
+
+    /** A single-segment path after the DNS name is expected to become the chroot. */
+    @Test
+    public void testDnsSrvFormatWithChroot() {
+        final String connectString = "dns-srv://zookeeper.myapp.com/myapp";
+        final String expectedHost = "zookeeper.myapp.com";
+        final String expectedChroot = "/myapp";
+        testDnsSrvFormat(connectString, expectedHost, expectedChroot);
+    }
+
+    /** A multi-segment path after the DNS name is expected to be kept whole as the chroot. */
+    @Test
+    public void testDnsSrvFormatWithNestedChroot() {
+        final String connectString = "dns-srv://zookeeper.shared.com/services/auth";
+        final String expectedHost = "zookeeper.shared.com";
+        final String expectedChroot = "/services/auth";
+        testDnsSrvFormat(connectString, expectedHost, expectedChroot);
+    }
+
+    /** A bare trailing slash after the DNS name is expected to produce no chroot. */
+    @Test
+    public void testDnsSrvFormatWithRootChroot() {
+        final String connectString = "dns-srv://zookeeper.myapp.com/";
+        final String expectedHost = "zookeeper.myapp.com";
+        testDnsSrvFormat(connectString, expectedHost, null);
+    }
+
+    /** A DNS name with several labels is expected to be kept whole, alongside its chroot. */
+    @Test
+    public void testDnsSrvFormatWithSubdomain() {
+        final String connectString = "dns-srv://zk.prod.myapp.com/production";
+        final String expectedHost = "zk.prod.myapp.com";
+        final String expectedChroot = "/production";
+        testDnsSrvFormat(connectString, expectedHost, expectedChroot);
+    }
+
+    /** A chroot path ending in a slash is expected to be rejected by the parser. */
+    @Test
+    public void testDnsSrvFormatInvalidChroot() {
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> new ConnectStringParser("dns-srv://zookeeper.myapp.com/invalid/"));
+    }
+
+    /**
+     * Parses a host:port connect string and a dns-srv connect string that share the chroot
+     * {@code /myapp}, and checks that the chroot matches for both while the address lists
+     * differ: two listed hosts for host:port, one entry holding the DNS name for dns-srv.
+     */
+    @Test
+    public void testMixedFormatsComparison() {
+        final ConnectStringParser hostPort = new ConnectStringParser("zk1:2181,zk2:2181/myapp");
+        final ConnectStringParser dnsSrv = new ConnectStringParser("dns-srv://zookeeper.myapp.com/myapp");
+
+        // Same chroot regardless of format.
+        assertEquals("/myapp", hostPort.getChrootPath());
+        assertEquals("/myapp", dnsSrv.getChrootPath());
+
+        // host:port format: one address per listed server, in listing order.
+        final String[] expectedHosts = {"zk1", "zk2"};
+        assertEquals(2, hostPort.getServerAddresses().size());
+        for (int i = 0; i < expectedHosts.length; i++) {
+            assertEquals(expectedHosts[i], hostPort.getServerAddresses().get(i).getHostString());
+        }
+
+        // dns-srv format: a single entry holding the DNS service name.
+        assertEquals(1, dnsSrv.getServerAddresses().size());
+        assertEquals("zookeeper.myapp.com", dnsSrv.getServerAddresses().get(0).getHostString());
+    }
+
+    /**
+     * Checks that host:port connect strings (single host, multiple hosts, with chroot,
+     * bracketed IPv6) parse into the expected number of server addresses.
+     *
+     * <p>The cases are held in a {@link HashMap}, so they run in no particular order; each
+     * assertion therefore names the connect string it was checking so a failure can be
+     * attributed to a specific case.
+     */
+    @Test
+    public void testBackwardCompatibility() {
+        // connect string -> expected number of parsed server addresses
+        final Map<String, Integer> testCases = new HashMap<>();
+
+        testCases.put("localhost:2181", 1);
+        testCases.put("zk1:2181,zk2:2181,zk3:2181", 3);
+        testCases.put("zk1:2181,zk2:2181/myapp", 2);
+        testCases.put("[::1]:2181", 1);
+        testCases.put("[2001:db8::1]:2181,[2001:db8::2]:2181/test", 2);
+
+        for (final Map.Entry<String, Integer> testCase : testCases.entrySet()) {
+            final String connectString = testCase.getKey();
+            final int expectedSize = testCase.getValue();
+
+            final ConnectStringParser parser = new ConnectStringParser(connectString);
+            assertNotNull(parser.getServerAddresses(),
+                "Server addresses must not be null for connect string: " + connectString);
+            assertEquals(expectedSize, parser.getServerAddresses().size(),
+                "Unexpected number of server addresses for connect string: " + connectString);
+        }
+    }
+
+    /**
+     * Parses {@code connectString} and asserts the outcome expected of a dns-srv connect
+     * string: the given chroot, and exactly one server address whose host string is the
+     * DNS name and whose port is 2181.
+     *
+     * @param connectString      the connect string to parse
+     * @param expectedHostName   host string expected on the single parsed address
+     * @param expectedChrootPath expected chroot, or {@code null} if none must be present
+     */
+    private void testDnsSrvFormat(final String connectString, final String expectedHostName,
+                                  final String expectedChrootPath) {
+        final ConnectStringParser parsed = new ConnectStringParser(connectString);
+
+        if (expectedChrootPath != null) {
+            assertEquals(expectedChrootPath, parsed.getChrootPath());
+        } else {
+            assertNull(parsed.getChrootPath());
+        }
+
+        assertEquals(1, parsed.getServerAddresses().size());
+        assertEquals(expectedHostName, parsed.getServerAddresses().get(0).getHostString());
+        assertEquals(2181, parsed.getServerAddresses().get(0).getPort());
+    }
}