This is an automated email from the ASF dual-hosted git repository.
hanm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zookeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 908c448 ZOOKEEPER-3774: Close quorum socket asynchronously on the
leader to a…
908c448 is described below
commit 908c4480ee6036bc4024309b7b16318671ae416c
Author: Jie Huang <[email protected]>
AuthorDate: Fri Oct 2 19:12:55 2020 -0700
ZOOKEEPER-3774: Close quorum socket asynchronously on the leader to avoid
ping being blocked by long socket closing time
Author: Jie Huang <[email protected]>
Reviewers: ztzg, hanm, eolivelli, symat
Closes #1301 from jhuan31/ZOOKEEPER-3774
---
.../src/main/resources/markdown/zookeeperAdmin.md | 9 ++-
.../zookeeper/server/quorum/LearnerHandler.java | 85 +++++++++++++---------
2 files changed, 60 insertions(+), 34 deletions(-)
diff --git a/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
b/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
index c579a71..76305a5 100644
--- a/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
+++ b/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
@@ -1146,8 +1146,14 @@ property, when available, is noted below.
* *learner.closeSocketAsync*
(Java system property only: **learner.closeSocketAsync**)
+ **New in 3.6.2:**
When enabled, a learner will close the quorum socket asynchronously. This
is useful for TLS connections where closing a socket might take a long time,
block the shutdown process, potentially delay a new leader election, and leave
the quorum unavailable. Closing the socket asynchronously avoids blocking the
shutdown process despite the long socket closing time and a new leader election
can be started while the socket is being closed. The default is false.
+* *leader.closeSocketAsync*
+ (Java system property only: **leader.closeSocketAsync**)
+ **New in 3.6.2:**
+ When enabled, the leader will close a quorum socket asynchronously. This
is useful for TLS connections where closing a socket might take a long time. If
disconnecting a follower is initiated in ping() because of a failed
SyncLimitCheck then the long socket closing time will block the sending of
pings to other followers. Without receiving pings, the other followers will not
send session information to the leader, which causes sessions to expire.
Setting this flag to true ensures that [...]
+
* *forward_learner_requests_to_commit_processor_disabled*
(Java system property:
**zookeeper.forward_learner_requests_to_commit_processor_disabled**)
When this property is set, the requests from learners won't be enqueued to
@@ -1512,7 +1518,8 @@ and [SASL authentication for
ZooKeeper](https://cwiki.apache.org/confluence/disp
* *sslQuorum* :
(Java system property: **zookeeper.sslQuorum**)
**New in 3.5.5:**
- Enables encrypted quorum communication. Default is `false`.
+ Enables encrypted quorum communication. Default is `false`. When enabling
this feature, please also consider enabling *leader.closeSocketAsync*
+ and *learner.closeSocketAsync* to avoid issues associated with the
potentially long socket closing time when shutting down an SSL connection.
* *ssl.keyStore.location and ssl.keyStore.password* and
*ssl.quorum.keyStore.location* and *ssl.quorum.keyStore.password* :
(Java system properties: **zookeeper.ssl.keyStore.location** and
**zookeeper.ssl.keyStore.password** and
**zookeeper.ssl.quorum.keyStore.location** and
**zookeeper.ssl.quorum.keyStore.password**)
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java
index 4a7def8..38cbaa6 100644
---
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java
+++
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java
@@ -32,6 +32,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -40,6 +41,7 @@ import javax.security.sasl.SaslException;
import org.apache.jute.BinaryInputArchive;
import org.apache.jute.BinaryOutputArchive;
import org.apache.zookeeper.ZooDefs.OpCode;
+import org.apache.zookeeper.common.Time;
import org.apache.zookeeper.server.Request;
import org.apache.zookeeper.server.ServerMetrics;
import org.apache.zookeeper.server.TxnLogProposalIterator;
@@ -63,12 +65,21 @@ public class LearnerHandler extends ZooKeeperThread {
private static final Logger LOG =
LoggerFactory.getLogger(LearnerHandler.class);
+ public static final String LEADER_CLOSE_SOCKET_ASYNC =
"leader.closeSocketAsync";
+ public static final boolean closeSocketAsync =
Boolean.parseBoolean(System.getProperty(LEADER_CLOSE_SOCKET_ASYNC, "false"));
+
+ static {
+ LOG.info("{} = {}", LEADER_CLOSE_SOCKET_ASYNC, closeSocketAsync);
+ }
+
protected final Socket sock;
public Socket getSocket() {
return sock;
}
+ AtomicBoolean sockBeingClosed = new AtomicBoolean(false);
+
final LearnerMaster learnerMaster;
/** Deadline for receiving the next ack. If we are bootstrapping then
@@ -277,11 +288,8 @@ public class LearnerHandler extends ZooKeeperThread {
}
} catch (IOException e) {
LOG.error("Server failed to authenticate quorum learner, addr: {},
closing connection", sock.getRemoteSocketAddress(), e);
- try {
- sock.close();
- } catch (IOException ie) {
- LOG.error("Exception while closing socket", ie);
- }
+ closeSocket();
+
throw new SaslException("Authentication failure: " +
e.getMessage());
}
@@ -357,17 +365,11 @@ public class LearnerHandler extends ZooKeeperThread {
packetsSent.incrementAndGet();
messageTracker.trackSent(p.getType());
} catch (IOException e) {
- if (!sock.isClosed()) {
- LOG.warn("Unexpected exception at {}", this, e);
- try {
- // this will cause everything to shutdown on
- // this learner handler and will help notify
- // the learner/observer instantaneously
- sock.close();
- } catch (IOException ie) {
- LOG.warn("Error closing socket for handler {}", this,
ie);
- }
- }
+ LOG.error("Exception while sending packets in LearnerHandler",
e);
+ // this will cause everything to shutdown on
+ // this learner handler and will help notify
+ // the learner/observer instantaneously
+ closeSocket();
break;
}
}
@@ -710,16 +712,8 @@ public class LearnerHandler extends ZooKeeperThread {
}
}
} catch (IOException e) {
- if (sock != null && !sock.isClosed()) {
- LOG.error("Unexpected exception causing shutdown while sock
still open", e);
- //close the socket to make sure the
- //other side can see it being close
- try {
- sock.close();
- } catch (IOException ie) {
- // do nothing
- }
- }
+ LOG.error("Unexpected exception in LearnerHandler: ", e);
+ closeSocket();
} catch (InterruptedException e) {
LOG.error("Unexpected exception in LearnerHandler.", e);
} catch (SyncThrottleException e) {
@@ -1050,13 +1044,9 @@ public class LearnerHandler extends ZooKeeperThread {
} catch (InterruptedException e) {
LOG.warn("Ignoring unexpected exception", e);
}
- try {
- if (sock != null && !sock.isClosed()) {
- sock.close();
- }
- } catch (IOException e) {
- LOG.warn("Ignoring unexpected exception during socket close", e);
- }
+
+ closeSocket();
+
this.interrupt();
learnerMaster.removeLearnerHandler(this);
learnerMaster.unregisterLearnerHandlerBean(this);
@@ -1157,4 +1147,33 @@ public class LearnerHandler extends ZooKeeperThread {
needOpPacket = value;
}
+ void closeSocket() {
+ if (sock != null && !sock.isClosed() &&
sockBeingClosed.compareAndSet(false, true)) {
+ if (closeSocketAsync) {
+ LOG.info("Asynchronously closing socket to learner {}.",
getSid());
+ closeSockAsync();
+ } else {
+ LOG.info("Synchronously closing socket to learner {}.",
getSid());
+ closeSockSync();
+ }
+ }
+ }
+
+ void closeSockAsync() {
+ final Thread closingThread = new Thread(() -> closeSockSync(),
"CloseSocketThread(sid:" + this.sid);
+ closingThread.setDaemon(true);
+ closingThread.start();
+ }
+
+ void closeSockSync() {
+ try {
+ if (sock != null) {
+ long startTime = Time.currentElapsedTime();
+ sock.close();
+
ServerMetrics.getMetrics().SOCKET_CLOSING_TIME.add(Time.currentElapsedTime() -
startTime);
+ }
+ } catch (IOException e) {
+ LOG.warn("Ignoring error closing connection to learner {}",
getSid(), e);
+ }
+ }
}