This is an automated email from the ASF dual-hosted git repository.
andor pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/zookeeper.git
The following commit(s) were added to refs/heads/branch-3.5 by this push:
new eb2de50 ZOOKEEPER-3320: backport to branch-3.5): Leader election port
stop listen when hostname unresolvable for some time
eb2de50 is described below
commit eb2de507105c94e5ff0b83e2a4794bc21bf25a82
Author: Igor Skokov <[email protected]>
AuthorDate: Tue Aug 13 13:36:15 2019 +0200
ZOOKEEPER-3320: backport to branch-3.5): Leader election port stop listen
when hostname unresolvable for some time
Backport of #1033 to branch-3.5
Author: Igor Skokov <[email protected]>
Reviewers: [email protected], [email protected]
Closes #1053 from Lagrang/ZOOKEEPER-3320-BACKPORT35
---
.../src/main/resources/markdown/zookeeperAdmin.md | 13 ++
.../{server/ExitCode.java => common/NetUtils.java} | 26 +++-
.../java/org/apache/zookeeper/server/ExitCode.java | 43 +++++-
.../zookeeper/server/ZooKeeperCriticalThread.java | 2 +-
.../zookeeper/server/quorum/QuorumCnxManager.java | 165 +++++++++++++--------
.../org/apache/zookeeper/test/CnxManagerTest.java | 32 ++++
6 files changed, 211 insertions(+), 70 deletions(-)
diff --git a/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
b/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
index f96faa6..fd0f8ee 100644
--- a/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
+++ b/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
@@ -862,6 +862,19 @@ As an example, this will enable all four letter word
commands:
properly, check your operating system's options regarding TCP
keepalive for more information. Defaults to
**false**.
+
+* *electionPortBindRetry* :
+ (Java system property only: **zookeeper.electionPortBindRetry**)
+ Sets the maximum number of attempts the ZooKeeper server makes to bind
+ the leader election port. Bind errors can be temporary and recoverable,
+ such as the DNS issue described in
+ [ZOOKEEPER-3320](https://issues.apache.org/jira/projects/ZOOKEEPER/issues/ZOOKEEPER-3320),
+ or non-retryable, such as the port already being in use.
+ In case of transient errors, this property can improve the availability
+ of the ZooKeeper server and help it recover on its own.
+ The default value is 3. Once all attempts are exhausted, if the last
+ failure was a socket error, the server process exits with code 14.
+ In container environments, especially Kubernetes, this value should be
+ increased or set to 0 (unlimited retries) to overcome issues related to
+ DNS name resolution.
+
<a name="sc_authOptions"></a>
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ExitCode.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/common/NetUtils.java
similarity index 53%
copy from
zookeeper-server/src/main/java/org/apache/zookeeper/server/ExitCode.java
copy to zookeeper-server/src/main/java/org/apache/zookeeper/common/NetUtils.java
index 02d96cb..4779003 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ExitCode.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/common/NetUtils.java
@@ -15,13 +15,29 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.zookeeper.server;
+
+package org.apache.zookeeper.common;
+
+import java.net.Inet6Address;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
/**
- * Exit code used to exit server
+ * This class contains common utilities for netstuff. Like printing IPv6
literals correctly
*/
-public class ExitCode {
+public class NetUtils {
+
+ public static String formatInetAddr(InetSocketAddress addr) {
+ InetAddress ia = addr.getAddress();
+
+ if (ia == null) {
+ return String.format("%s:%s", addr.getHostString(),
addr.getPort());
+ }
- /* Represents unexpected error */
- public final static int UNEXPECTED_ERROR = 1;
+ if (ia instanceof Inet6Address) {
+ return String.format("[%s]:%s", ia.getHostAddress(),
addr.getPort());
+ } else {
+ return String.format("%s:%s", ia.getHostAddress(), addr.getPort());
+ }
+ }
}
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ExitCode.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ExitCode.java
index 02d96cb..e4d2624 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ExitCode.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ExitCode.java
@@ -20,8 +20,43 @@ package org.apache.zookeeper.server;
/**
* Exit code used to exit server
*/
-public class ExitCode {
+public enum ExitCode {
- /* Represents unexpected error */
- public final static int UNEXPECTED_ERROR = 1;
-}
+ /** Execution finished normally */
+ EXECUTION_FINISHED(0),
+
+ /** Unexpected errors like IO Exceptions */
+ UNEXPECTED_ERROR(1),
+
+ /** Invalid arguments during invocations */
+ INVALID_INVOCATION(2),
+
+ /** Cannot access datadir when trying to replicate server */
+ UNABLE_TO_ACCESS_DATADIR(3),
+
+ /** Unable to start admin server at ZooKeeper startup */
+ ERROR_STARTING_ADMIN_SERVER(4),
+
+ /** Severe error during snapshot IO */
+ TXNLOG_ERROR_TAKING_SNAPSHOT(10),
+
+ /** zxid from COMMIT does not match the one from pendingTxns queue */
+ UNMATCHED_TXN_COMMIT(12),
+
+ /** Unexpected packet from leader, or unable to truncate log on
Leader.TRUNC */
+ QUORUM_PACKET_ERROR(13),
+
+ /** Unable to bind to the quorum (election) port after multiple retry */
+ UNABLE_TO_BIND_QUORUM_PORT(14);
+
+ private final int value;
+
+ ExitCode(final int newValue) {
+ value = newValue;
+ }
+
+ public int getValue() {
+ return value;
+ }
+
+}
\ No newline at end of file
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperCriticalThread.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperCriticalThread.java
index 4fefbd3..d52d01d 100644
---
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperCriticalThread.java
+++
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperCriticalThread.java
@@ -46,6 +46,6 @@ public class ZooKeeperCriticalThread extends ZooKeeperThread {
@Override
protected void handleException(String threadName, Throwable e) {
LOG.error("Severe unrecoverable error, from thread : {}", threadName,
e);
- listener.notifyStopping(threadName, ExitCode.UNEXPECTED_ERROR);
+ listener.notifyStopping(threadName,
ExitCode.UNEXPECTED_ERROR.getValue());
}
}
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
index 2727f01..4dcab9b 100644
---
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
+++
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
@@ -18,6 +18,8 @@
package org.apache.zookeeper.server.quorum;
+import static org.apache.zookeeper.common.NetUtils.formatInetAddr;
+
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
@@ -27,6 +29,7 @@ import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
+import java.net.SocketTimeoutException;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.nio.channels.UnresolvedAddressException;
@@ -34,7 +37,6 @@ import java.util.Collections;
import java.util.Enumeration;
import java.util.HashSet;
import java.util.Map;
-import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
@@ -42,12 +44,12 @@ import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
-
import javax.net.ssl.SSLSocket;
-
import org.apache.zookeeper.common.X509Exception;
+import org.apache.zookeeper.server.ExitCode;
import org.apache.zookeeper.server.ZooKeeperThread;
import org.apache.zookeeper.server.quorum.auth.QuorumAuthLearner;
import org.apache.zookeeper.server.quorum.auth.QuorumAuthServer;
@@ -60,11 +62,11 @@ import org.slf4j.LoggerFactory;
* maintains one connection for every pair of servers. The tricky part is to
* guarantee that there is exactly one connection for every pair of servers
that
* are operating correctly and that can communicate over the network.
- *
+ *
* If two servers try to start a connection concurrently, then the connection
* manager uses a very simple tie-breaking mechanism to decide which connection
- * to drop based on the IP addressed of the two parties.
- *
+ * to drop based on the IP addressed of the two parties.
+ *
* For every peer, the manager maintains a queue of messages to send. If the
* connection to any particular peer drops, then the sender thread puts the
* message back on the list. As this implementation currently uses a queue
@@ -72,7 +74,7 @@ import org.slf4j.LoggerFactory;
* message to the tail of the queue, thus changing the order of messages.
* Although this is not a problem for the leader election, it could be a
problem
* when consolidating peer communication. This is to be verified, though.
- *
+ *
*/
public class QuorumCnxManager {
@@ -87,7 +89,7 @@ public class QuorumCnxManager {
static final int SEND_CAPACITY = 1;
static final int PACKETMAXSIZE = 1024 * 512;
-
+
/*
* Negative counter for observer server ids.
*/
@@ -105,9 +107,9 @@ public class QuorumCnxManager {
static public final int maxBuffer = 2048;
/*
- * Connection time out value in milliseconds
+ * Connection time out value in milliseconds
*/
-
+
private int cnxTO = 5000;
final QuorumPeer self;
@@ -256,7 +258,7 @@ public class QuorumCnxManager {
this.queueSendMap = new ConcurrentHashMap<Long,
ArrayBlockingQueue<ByteBuffer>>();
this.senderWorkerMap = new ConcurrentHashMap<Long, SendWorker>();
this.lastMessageSent = new ConcurrentHashMap<Long, ByteBuffer>();
-
+
String cnxToValue = System.getProperty("zookeeper.cnxTimeout");
if(cnxToValue != null){
this.cnxTO = Integer.parseInt(cnxToValue);
@@ -314,7 +316,7 @@ public class QuorumCnxManager {
/**
* Invokes initiateConnection for testing purposes
- *
+ *
* @param sid
*/
public void testInitiateConnection(long sid) throws Exception {
@@ -590,7 +592,7 @@ public class QuorumCnxManager {
}
/**
- * Processes invoke this message to queue a message to send. Currently,
+ * Processes invoke this message to queue a message to send. Currently,
* only leader election uses it.
*/
public void toSend(Long sid, ByteBuffer b) {
@@ -616,13 +618,13 @@ public class QuorumCnxManager {
addToSendQueue(bq, b);
}
connectOne(sid);
-
+
}
}
-
+
/**
* Try to establish a connection to server with id sid using its
electionAddr.
- *
+ *
* @param sid server id
* @return boolean success indication
*/
@@ -681,10 +683,10 @@ public class QuorumCnxManager {
return false;
}
}
-
+
/**
* Try to establish a connection to server with id sid.
- *
+ *
* @param sid server id
*/
synchronized void connectOne(long sid){
@@ -718,22 +720,22 @@ public class QuorumCnxManager {
}
}
}
-
-
+
+
/**
* Try to establish a connection with each server if one
* doesn't exist.
*/
-
+
public void connectAll(){
long sid;
for(Enumeration<Long> en = queueSendMap.keys();
en.hasMoreElements();){
sid = en.nextElement();
connectOne(sid);
- }
+ }
}
-
+
/**
* Check if all queues are empty, indicating that all messages have been
delivered.
@@ -756,7 +758,7 @@ public class QuorumCnxManager {
shutdown = true;
LOG.debug("Halting listener");
listener.halt();
-
+
// Wait for the listener to terminate.
try {
listener.join();
@@ -772,7 +774,7 @@ public class QuorumCnxManager {
inprogressConnections.clear();
resetConnectionThreadCount();
}
-
+
/**
* A soft halt simply finishes workers.
*/
@@ -785,7 +787,7 @@ public class QuorumCnxManager {
/**
* Helper method to set socket options.
- *
+ *
* @param sock
* Reference to socket
*/
@@ -797,7 +799,7 @@ public class QuorumCnxManager {
/**
* Helper method to close a socket.
- *
+ *
* @param sock
* Reference to socket
*/
@@ -839,12 +841,39 @@ public class QuorumCnxManager {
*/
public class Listener extends ZooKeeperThread {
+ private static final String ELECTION_PORT_BIND_RETRY =
"zookeeper.electionPortBindRetry";
+ private static final int DEFAULT_PORT_BIND_MAX_RETRY = 3;
+
+ private final int portBindMaxRetry;
+ private Runnable socketBindErrorHandler = () ->
System.exit(ExitCode.UNABLE_TO_BIND_QUORUM_PORT.getValue());
volatile ServerSocket ss = null;
public Listener() {
// During startup of thread, thread name will be overridden to
// specific election address
super("ListenerThread");
+
+ // maximum retry count while trying to bind to election port
+ // see ZOOKEEPER-3320 for more details
+ final Integer maxRetry =
Integer.getInteger(ELECTION_PORT_BIND_RETRY,
+
DEFAULT_PORT_BIND_MAX_RETRY);
+ if (maxRetry >= 0) {
+ LOG.info("Election port bind maximum retries is {}",
+ maxRetry == 0 ? "infinite" : maxRetry);
+ portBindMaxRetry = maxRetry;
+ } else {
+ LOG.info("'{}' contains invalid value: {}(must be >= 0). "
+ + "Use default value of {} instead.",
+ ELECTION_PORT_BIND_RETRY, maxRetry,
DEFAULT_PORT_BIND_MAX_RETRY);
+ portBindMaxRetry = DEFAULT_PORT_BIND_MAX_RETRY;
+ }
+ }
+
+ /**
+ * Change socket bind error handler. Used for testing.
+ */
+ public void setSocketBindErrorHandler(Runnable errorHandler) {
+ this.socketBindErrorHandler = errorHandler;
}
/**
@@ -855,7 +884,8 @@ public class QuorumCnxManager {
int numRetries = 0;
InetSocketAddress addr;
Socket client = null;
- while((!shutdown) && (numRetries < 3)){
+ Exception exitException = null;
+ while ((!shutdown) && (portBindMaxRetry == 0 || numRetries <
portBindMaxRetry)) {
try {
if (self.shouldUsePortUnification()) {
LOG.info("Creating TLS-enabled quorum server socket");
@@ -882,28 +912,34 @@ public class QuorumCnxManager {
setName(addr.toString());
ss.bind(addr);
while (!shutdown) {
- client = ss.accept();
-
- setSockOpts(client);
- LOG.info("Received connection request "
- + client.getRemoteSocketAddress());
- // Receive and handle the connection request
- // asynchronously if the quorum sasl authentication is
- // enabled. This is required because sasl server
- // authentication process may take few seconds to
finish,
- // this may delay next peer connection requests.
- if (quorumSaslAuthEnabled) {
- receiveConnectionAsync(client);
- } else {
- receiveConnection(client);
+ try {
+ client = ss.accept();
+ setSockOpts(client);
+ LOG.info("Received connection request "
+ +
formatInetAddr((InetSocketAddress)client.getRemoteSocketAddress()));
+ // Receive and handle the connection request
+ // asynchronously if the quorum sasl
authentication is
+ // enabled. This is required because sasl server
+ // authentication process may take few seconds to
finish,
+ // this may delay next peer connection requests.
+ if (quorumSaslAuthEnabled) {
+ receiveConnectionAsync(client);
+ } else {
+ receiveConnection(client);
+ }
+ numRetries = 0;
+ } catch (SocketTimeoutException e) {
+ LOG.warn("The socket is listening for the election
accepted "
+ + "and it timed out unexpectedly, but
will retry."
+ + "see ZOOKEEPER-2836");
}
- numRetries = 0;
}
} catch (IOException e) {
if (shutdown) {
break;
}
LOG.error("Exception while listening", e);
+ exitException = e;
numRetries++;
try {
ss.close();
@@ -919,10 +955,19 @@ public class QuorumCnxManager {
}
LOG.info("Leaving listener");
if (!shutdown) {
- LOG.error("As I'm leaving the listener thread, "
- + "I won't be able to participate in leader "
- + "election any longer: "
- + self.getElectionAddress());
+ LOG.error("As I'm leaving the listener thread after "
+ + numRetries + " errors. "
+ + "I won't be able to participate in leader "
+ + "election any longer: "
+ + formatInetAddr(self.getElectionAddress())
+ + ". Use " + ELECTION_PORT_BIND_RETRY + " property
to "
+ + "increase retry count.");
+ if (exitException instanceof SocketException) {
+ // After leaving listener thread, the host cannot join the
+ // quorum anymore, this is a severe error that we cannot
+ // recover from, so we need to exit
+ socketBindErrorHandler.run();
+ }
} else if (ss != null) {
// Clean up for shutdown.
try {
@@ -966,7 +1011,7 @@ public class QuorumCnxManager {
/**
* An instance of this thread receives messages to send
* through a queue and sends them to the server sid.
- *
+ *
* @param sock
* Socket to remote peer
* @param sid
@@ -993,23 +1038,23 @@ public class QuorumCnxManager {
/**
* Returns RecvWorker that pairs up with this SendWorker.
- *
- * @return RecvWorker
+ *
+ * @return RecvWorker
*/
synchronized RecvWorker getRecvWorker(){
return recvWorker;
}
-
+
synchronized boolean finish() {
LOG.debug("Calling finish for " + sid);
-
+
if(!running){
/*
- * Avoids running finish() twice.
+ * Avoids running finish() twice.
*/
return running;
}
-
+
running = false;
closeSocket(sock);
@@ -1024,7 +1069,7 @@ public class QuorumCnxManager {
threadCnt.decrementAndGet();
return running;
}
-
+
synchronized void send(ByteBuffer b) throws IOException {
byte[] msgBytes = new byte[b.capacity()];
try {
@@ -1068,7 +1113,7 @@ public class QuorumCnxManager {
LOG.error("Failed to send last message. Shutting down
thread.", e);
this.finish();
}
-
+
try {
while (running && !shutdown && sock != null) {
@@ -1129,20 +1174,20 @@ public class QuorumCnxManager {
running = false;
}
}
-
+
/**
* Shuts down this worker
- *
+ *
* @return boolean Value of variable running
*/
synchronized boolean finish() {
if(!running){
/*
- * Avoids running finish() twice.
+ * Avoids running finish() twice.
*/
return running;
}
- running = false;
+ running = false;
this.interrupt();
threadCnt.decrementAndGet();
diff --git
a/zookeeper-server/src/test/java/org/apache/zookeeper/test/CnxManagerTest.java
b/zookeeper-server/src/test/java/org/apache/zookeeper/test/CnxManagerTest.java
index 1a9bb6b..ea16961 100644
---
a/zookeeper-server/src/test/java/org/apache/zookeeper/test/CnxManagerTest.java
+++
b/zookeeper-server/src/test/java/org/apache/zookeeper/test/CnxManagerTest.java
@@ -29,9 +29,11 @@ import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
+import java.util.Map;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.net.Socket;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.zookeeper.common.Time;
import org.slf4j.Logger;
@@ -281,6 +283,36 @@ public class CnxManagerTest extends ZKTestCase {
}
/**
+ * Test for bug described in {@link
https://issues.apache.org/jira/browse/ZOOKEEPER-3320}.
+ * Test create peer with address which contains unresolvable DNS name,
+ * leader election listener thread should stop after N errors.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testCnxManagerListenerThreadConfigurableRetry() throws
Exception {
+ final Map<Long,QuorumServer> unresolvablePeers = new HashMap<>();
+ final long myid = 1L;
+ unresolvablePeers.put(myid, new QuorumServer(myid,
"unresolvable-domain.org:2182:2183;2181"));
+ final QuorumPeer peer = new QuorumPeer(unresolvablePeers,
+ ClientBase.createTmpDir(),
+ ClientBase.createTmpDir(),
+ 2181, 3, myid, 1000, 2, 2);
+ final QuorumCnxManager cnxManager = peer.createCnxnManager();
+ final QuorumCnxManager.Listener listener = cnxManager.listener;
+ final AtomicBoolean errorHappend = new AtomicBoolean();
+ listener.setSocketBindErrorHandler(() -> errorHappend.set(true));
+ listener.start();
+ // listener thread should stop and throws error which notify
QuorumPeer about error.
+ // QuorumPeer should start shutdown process
+ listener.join(15000); // set wait time, if listener contains bug and
thread not stops.
+ Assert.assertFalse(listener.isAlive());
+ Assert.assertTrue(errorHappend.get());
+ Assert.assertFalse(QuorumPeer.class.getSimpleName() + " not stopped
after "
+ + "listener thread death", listener.isAlive());
+ }
+
+ /**
* Tests a bug in QuorumCnxManager that causes a NPE when a 3.4.6
* observer connects to a 3.5.0 server.
* {@link https://issues.apache.org/jira/browse/ZOOKEEPER-1789}