This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zookeeper.git
The following commit(s) were added to refs/heads/master by this push:
new b4f6e82 ZOOKEEPER-3574: Close quorum socket asynchronously to avoid server shutdown stalled by long socket closing time
b4f6e82 is described below
commit b4f6e82de10e6f87836f8221970110dad2b3825c
Author: Jie Huang <[email protected]>
AuthorDate: Sun May 3 19:43:27 2020 +0200
ZOOKEEPER-3574: Close quorum socket asynchronously to avoid server shutdown stalled by long socket closing time
…utdown stalled by long socket closing time
Author: Jie Huang <[email protected]>
Reviewers: Enrico Olivelli <[email protected]>, Fangmin Lyu <[email protected]>
Closes #1115 from jhuan31/ZOOKEEPER-3574
---
.../src/main/resources/markdown/zookeeperAdmin.md | 173 +++++++++++----------
.../org/apache/zookeeper/server/ServerMetrics.java | 5 +
.../apache/zookeeper/server/quorum/Learner.java | 26 ++++
3 files changed, 120 insertions(+), 84 deletions(-)
diff --git a/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md b/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
index ca1a3f7..6c616d1 100644
--- a/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
+++ b/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
@@ -202,11 +202,11 @@ ensemble:
though about a few here:
Every machine that is part of the ZooKeeper ensemble should know
about every other machine in the ensemble. You accomplish this with
- the series of lines of the form **server.id=host:port:port**.
- (The parameters **host** and **port** are straightforward, for each server
+ the series of lines of the form **server.id=host:port:port**.
+ (The parameters **host** and **port** are straightforward, for each server
you need to specify first a Quorum port then a dedicated port for ZooKeeper leader
- election). Since ZooKeeper 3.6.0 you can also [specify multiple addresses](#id_multi_address)
- for each ZooKeeper server instance (this can increase availability when multiple physical
+ election). Since ZooKeeper 3.6.0 you can also [specify multiple addresses](#id_multi_address)
+ for each ZooKeeper server instance (this can increase availability when multiple physical
network interfaces can be used parallel in the cluster).
You attribute the
server id to each machine by creating a file named
@@ -233,7 +233,7 @@ ensemble:
ensemble.
7. If your configuration file is set up, you can start a
- ZooKeeper server:
+ ZooKeeper server:
$ java -cp zookeeper.jar:lib/*:conf org.apache.zookeeper.server.quorum.QuorumPeerMain zoo.conf
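For orientation (this example is not part of the diff), a minimal three-server configuration using the server.id=host:port:port form described above could look like the sketch below; hostnames, paths and port numbers are placeholders.

    # Sketch of a three-node ensemble; values are placeholders.
    tickTime=2000
    dataDir=/var/lib/zookeeper
    clientPort=2181
    initLimit=5
    syncLimit=2
    # server.<id>=<host>:<quorum port>:<leader election port>
    server.1=zoo1:2888:3888
    server.2=zoo2:2888:3888
    server.3=zoo3:2888:3888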
@@ -1004,7 +1004,7 @@ property, when available, is noted below.
* *zookeeper.request_throttler.shutdownTimeout* :
(Java system property only)
**New in 3.6.0:**
- The time (in milliseconds) the RequestThrottler waits for the request queue to drain during shutdown before it shuts down forcefully. The default is 10000.
+ The time (in milliseconds) the RequestThrottler waits for the request queue to drain during shutdown before it shuts down forcefully. The default is 10000.
* *advancedFlowControlEnabled* :
(Java system property: **zookeeper.netty.advancedFlowControl.enabled**)
@@ -1032,59 +1032,59 @@ property, when available, is noted below.
**New in 3.6.0:**
The digest feature is added to detect the data inconsistency inside
ZooKeeper when loading database from disk, catching up and following
- leader, its doing incrementally hash check for the DataTree based on
+ leader, its doing incrementally hash check for the DataTree based on
the adHash paper mentioned in
https://cseweb.ucsd.edu/~daniele/papers/IncHash.pdf
- The idea is simple, the hash value of DataTree will be updated incrementally
- based on the changes to the set of data. When the leader is preparing the txn,
- it will pre-calculate the hash of the tree based on the changes happened with
+ The idea is simple, the hash value of DataTree will be updated incrementally
+ based on the changes to the set of data. When the leader is preparing the txn,
+ it will pre-calculate the hash of the tree based on the changes happened with
formula:
current_hash = current_hash + hash(new node data) - hash(old node data)
- If it’s creating a new node, the hash(old node data) will be 0, and if it’s a
+ If it’s creating a new node, the hash(old node data) will be 0, and if it’s a
delete node op, the hash(new node data) will be 0.
- This hash will be associated with each txn to represent the expected hash value
- after applying the txn to the data tree, it will be sent to followers with
- original proposals. Learner will compare the actual hash value with the one in
- the txn after applying the txn to the data tree, and report mismatch if it’s not
+ This hash will be associated with each txn to represent the expected hash value
+ after applying the txn to the data tree, it will be sent to followers with
+ original proposals. Learner will compare the actual hash value with the one in
+ the txn after applying the txn to the data tree, and report mismatch if it’s not
the same.
- These digest value will also be persisted with each txn and snapshot on the disk,
- so when servers restarted and load data from disk, it will compare and see if
+ These digest value will also be persisted with each txn and snapshot on the disk,
+ so when servers restarted and load data from disk, it will compare and see if
there is hash mismatch, which will help detect data loss issue on disk.
- For the actual hash function, we’re using CRC internally, it’s not a collisionless
- hash function, but it’s more efficient compared to collisionless hash, and the
+ For the actual hash function, we’re using CRC internally, it’s not a collisionless
+ hash function, but it’s more efficient compared to collisionless hash, and the
collision possibility is really really rare and can already meet our needs here.
- This feature is backward and forward compatible, so it can safely rolling upgrade,
- downgrade, enabled and later disabled without any compatible issue. Here are the
+ This feature is backward and forward compatible, so it can safely rolling upgrade,
+ downgrade, enabled and later disabled without any compatible issue. Here are the
scenarios have been covered and tested:
- 1. When leader runs with new code while follower runs with old one, the digest will
- be append to the end of each txn, follower will only read header and txn data,
- digest value in the txn will be ignored. It won't affect the follower reads and
+ 1. When leader runs with new code while follower runs with old one, the digest will
+ be append to the end of each txn, follower will only read header and txn data,
+ digest value in the txn will be ignored. It won't affect the follower reads and
processes the next txn.
2. When leader runs with old code while follower runs with new one, the digest won't
- be sent with txn, when follower tries to read the digest, it will throw EOF which
+ be sent with txn, when follower tries to read the digest, it will throw EOF which
is caught and handled gracefully with digest value set to null.
3. When loading old snapshot with new code, it will throw IOException when trying to
read the non-exist digest value, and the exception will be caught and digest will
- be set to null, which means we won't compare digest when loading this snapshot,
+ be set to null, which means we won't compare digest when loading this snapshot,
which is expected to happen during rolling upgrade
- 4. When loading new snapshot with old code, it will finish successfully after deserialzing
+ 4. When loading new snapshot with old code, it will finish successfully after deserialzing
the data tree, the digest value at the end of snapshot file will be ignored
- 5. The scenarios of rolling restart with flags change are similar to the 1st and 2nd
+ 5. The scenarios of rolling restart with flags change are similar to the 1st and 2nd
scenarios discussed above, if the leader enabled but follower not, digest value will
be ignored, and follower won't compare the digest during runtime; if leader disabled
but follower enabled, follower will get EOF exception which is handled gracefully.
- Note: the current digest calculation excluded nodes under /zookeeper
- due to the potential inconsistency in the /zookeeper/quota stat node,
+ Note: the current digest calculation excluded nodes under /zookeeper
+ due to the potential inconsistency in the /zookeeper/quota stat node,
we can include that after that issue is fixed.
By default, this feautre is disabled, set "true" to enable it.
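To make the incremental formula above concrete, here is a small, self-contained Java sketch of an adHash-style tree hash. It only illustrates the update rule current_hash = current_hash + hash(new node data) - hash(old node data); it is not the actual ZooKeeper implementation, and the CRC32 choice simply mirrors the CRC-based hash mentioned in the text.

    import java.nio.charset.StandardCharsets;
    import java.util.zip.CRC32;

    // Toy illustration of the incremental DataTree hash described above.
    public class IncrementalTreeHash {

        private long currentHash = 0;

        // A missing node (null data) contributes 0, so creates and deletes
        // fall out of the same update rule.
        private static long hash(String path, byte[] data) {
            if (data == null) {
                return 0;
            }
            CRC32 crc = new CRC32();
            crc.update(path.getBytes(StandardCharsets.UTF_8));
            crc.update(data);
            return crc.getValue();
        }

        // current_hash = current_hash + hash(new node data) - hash(old node data)
        public void recordChange(String path, byte[] oldData, byte[] newData) {
            currentHash = currentHash + hash(path, newData) - hash(path, oldData);
        }

        public long getHash() {
            return currentHash;
        }
    }

Because the per-node hashes are combined by plain addition and subtraction, a follower that applies the same transactions arrives at the same value, which is what makes the leader/learner comparison described above possible.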
@@ -1107,7 +1107,7 @@ property, when available, is noted below.
**New in 3.6.0:**
By default audit logs are disabled. Set to "true" to enable it. Default value is "false".
See the [ZooKeeper audit logs](zookeeperAuditLogs.html) for more information.
-
+
* *audit.impl.class* :
(Java system property: **zookeeper.audit.impl.class**)
**New in 3.6.0:**
@@ -1125,12 +1125,12 @@ property, when available, is noted below.
**New in 3.6.0:**
The size threshold after which a request is considered a large request. If it is -1, then all requests are considered small, effectively turning off large request throttling. The default is -1.
-* *outstandingHandshake.limit*
+* *outstandingHandshake.limit*
(Jave system property only: **zookeeper.netty.server.outstandingHandshake.limit**)
- The maximum in-flight TLS handshake connections could have in ZooKeeper,
- the connections exceed this limit will be rejected before starting handshake.
- This setting doesn't limit the max TLS concurrency, but helps avoid herd
- effect due to TLS handshake timeout when there are too many in-flight TLS
+ The maximum in-flight TLS handshake connections could have in ZooKeeper,
+ the connections exceed this limit will be rejected before starting handshake.
+ This setting doesn't limit the max TLS concurrency, but helps avoid herd
+ effect due to TLS handshake timeout when there are too many in-flight TLS
handshakes. Set it to something like 250 is good enough to avoid herd effect.
* *throttledOpWaitTime*
@@ -1142,6 +1142,11 @@ property, when available, is noted below.
The intent is for the clients not to retry them immediately.
When set to 0, no requests will be throttled. The default is 0.
+* *learner.closeSocketAsync*
+ (Java system property only: **learner.closeSocketAsync**)
+ When enabled, a learner will close the quorum socket asynchronously. This is useful for TLS connections where closing a socket might take a long time, block the shutdown process, potentially delay a new leader election, and leave the quorum unavailable. Closing the socket asynchronously avoids blocking the shutdown process despite the long socket closing time, and a new leader election can be started while the socket is being closed. The default is false.
+
+
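As a usage sketch (not part of the diff): since this is a Java system property only, it has to be passed to the server JVM; the SERVER_JVMFLAGS hook used by zkServer.sh is an assumed deployment detail here.

    # Sketch: enable asynchronous quorum-socket close on a learner
    export SERVER_JVMFLAGS="-Dlearner.closeSocketAsync=true"
    bin/zkServer.sh restart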
<a name="sc_clusterOptions"></a>
#### Cluster Options
@@ -1155,13 +1160,13 @@ of servers -- that is, when deploying clusters of servers.
non-authenticated UDP-based version of fast leader election, "2"
corresponds to the authenticated UDP-based version of fast
leader election, and "3" corresponds to TCP-based version of
- fast leader election. Algorithm 3 was made default in 3.2.0 and
+ fast leader election. Algorithm 3 was made default in 3.2.0 and
prior versions (3.0.0 and 3.1.0) were using algorithm 1 and 2 as well.
###### Note
- >The implementations of leader election 1, and 2 were
- **deprecated** in 3.4.0. Since 3.6.0 only FastLeaderElection is available,
- in case of upgrade you have to shutdown all of your servers and
- restart them with electionAlg=3 (or by removing the line from the configuration file).
[...]
+ >The implementations of leader election 1, and 2 were
+ **deprecated** in 3.4.0. Since 3.6.0 only FastLeaderElection is available,
+ in case of upgrade you have to shutdown all of your servers and
+ restart them with electionAlg=3 (or by removing the line from the configuration file).
[...]
* *maxTimeToWaitForEpoch* :
(Java system property: **zookeeper.leader.maxTimeToWaitForEpoch**)
@@ -1215,15 +1220,15 @@ of servers -- that is, when deploying clusters of servers.
The first followers use to connect to the leader, and the second is for
leader election. If you want to test multiple servers on a single machine, then
different ports can be used for each server.
-
+
<a name="id_multi_address"></a>
Since ZooKeeper 3.6.0 it is possible to specify **multiple addresses** for each
ZooKeeper server (see [ZOOKEEPER-3188](https://issues.apache.org/jira/projects/ZOOKEEPER/issues/ZOOKEEPER-3188)).
To enable this feature, you must set the *multiAddress.enabled* configuration property
- to *true*. This helps to increase availability and adds network level
- resiliency to ZooKeeper. When multiple physical network interfaces are used
- for the servers, ZooKeeper is able to bind on all interfaces and runtime switching
+ to *true*. This helps to increase availability and adds network level
+ resiliency to ZooKeeper. When multiple physical network interfaces are used
+ for the servers, ZooKeeper is able to bind on all interfaces and runtime switching
to a working interface in case a network error. The different addresses can be specified
in the config using a pipe ('|') character. A valid configuration using multiple addresses looks like:
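The guide's concrete example falls outside this hunk; a sketch of such a pipe-separated entry (placeholder hostnames, and it requires multiAddress.enabled=true as noted above) would be:

    multiAddress.enabled=true
    # two network interfaces for server 1, separated by '|'
    server.1=zoo1-net1:2888:3888|zoo1-net2:2888:3888
    server.2=zoo2:2888:3888
    server.3=zoo3:2888:3888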
@@ -1375,16 +1380,16 @@ As an example, this will enable all four letter word commands:
* *electionPortBindRetry* :
(Java system property only: **zookeeper.electionPortBindRetry**)
- Property set max retry count when Zookeeper server fails to bind
- leader election port. Such errors can be temporary and recoverable,
+ Property set max retry count when Zookeeper server fails to bind
+ leader election port. Such errors can be temporary and recoverable,
such as DNS issue described in
[ZOOKEEPER-3320](https://issues.apache.org/jira/projects/ZOOKEEPER/issues/ZOOKEEPER-3320),
- or non-retryable, such as port already in use.
- In case of transient errors, this property can improve availability
- of Zookeeper server and help it to self recover.
- Default value 3. In container environment, especially in Kubernetes,
- this value should be increased or set to 0(infinite retry) to overcome issues
+ or non-retryable, such as port already in use.
+ In case of transient errors, this property can improve availability
+ of Zookeeper server and help it to self recover.
+ Default value 3. In container environment, especially in Kubernetes,
+ this value should be increased or set to 0(infinite retry) to overcome issues
related to DNS name resolving.
-
+
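A usage sketch for the Kubernetes scenario mentioned above under electionPortBindRetry (the SERVER_JVMFLAGS hook is an assumed deployment detail):

    # Sketch: retry binding the leader election port indefinitely, e.g. on Kubernetes
    export SERVER_JVMFLAGS="-Dzookeeper.electionPortBindRetry=0"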
* *observer.reconnectDelayMs* :
(Java system property: **zookeeper.observer.reconnectDelayMs**)
@@ -1414,9 +1419,9 @@ As an example, this will enable all four letter word commands:
The options in this section allow control over
encryption/authentication/authorization performed by the service.
-Beside this page, you can also find useful information about client side configuration in the
-[Programmers Guide](zookeeperProgrammers.html#sc_java_client_configuration).
-The ZooKeeper Wiki also has useful pages about [ZooKeeper SSL support](https://cwiki.apache.org/confluence/display/ZOOKEEPER/ZooKeeper+SSL+User+Guide),
+Beside this page, you can also find useful information about client side configuration in the
+[Programmers Guide](zookeeperProgrammers.html#sc_java_client_configuration).
+The ZooKeeper Wiki also has useful pages about [ZooKeeper SSL support](https://cwiki.apache.org/confluence/display/ZOOKEEPER/ZooKeeper+SSL+User+Guide),
and [SASL authentication for ZooKeeper](https://cwiki.apache.org/confluence/display/ZOOKEEPER/ZooKeeper+and+SASL).
* *DigestAuthenticationProvider.superDigest* :
@@ -1509,8 +1514,8 @@ and [SASL authentication for ZooKeeper](https://cwiki.apache.org/confluence/disp
* *ssl.keyStore.type* and *ssl.quorum.keyStore.type* :
(Java system properties: **zookeeper.ssl.keyStore.type** and **zookeeper.ssl.quorum.keyStore.type**)
**New in 3.5.5:**
- Specifies the file format of client and quorum keystores. Values: JKS, PEM, PKCS12 or null (detect by filename).
- Default: null
+ Specifies the file format of client and quorum keystores. Values: JKS, PEM, PKCS12 or null (detect by filename).
+ Default: null
* *ssl.trustStore.location* and *ssl.trustStore.password* and *ssl.quorum.trustStore.location* and *ssl.quorum.trustStore.password* :
(Java system properties: **zookeeper.ssl.trustStore.location** and **zookeeper.ssl.trustStore.password** and **zookeeper.ssl.quorum.trustStore.location** and **zookeeper.ssl.quorum.trustStore.password**)
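Because the long socket-close times addressed by this commit mainly show up on TLS quorum connections, a sketch of a quorum keystore/truststore configuration using the properties described here may help; the sslQuorum switch, file paths, passwords and PKCS12 type are illustrative assumptions, not part of the diff.

    sslQuorum=true
    ssl.quorum.keyStore.location=/path/to/quorum-keystore.p12
    ssl.quorum.keyStore.password=changeit
    ssl.quorum.keyStore.type=PKCS12
    ssl.quorum.trustStore.location=/path/to/quorum-truststore.p12
    ssl.quorum.trustStore.password=changeit
    ssl.quorum.trustStore.type=PKCS12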
@@ -1522,8 +1527,8 @@ and [SASL authentication for ZooKeeper](https://cwiki.apache.org/confluence/disp
* *ssl.trustStore.type* and *ssl.quorum.trustStore.type* :
(Java system properties: **zookeeper.ssl.trustStore.type** and **zookeeper.ssl.quorum.trustStore.type**)
**New in 3.5.5:**
- Specifies the file format of client and quorum trustStores. Values: JKS, PEM, PKCS12 or null (detect by filename).
- Default: null
+ Specifies the file format of client and quorum trustStores. Values: JKS, PEM, PKCS12 or null (detect by filename).
+ Default: null
* *ssl.protocol* and *ssl.quorum.protocol* :
(Java system properties: **zookeeper.ssl.protocol** and **zookeeper.ssl.quorum.protocol**)
@@ -1541,7 +1546,7 @@ and [SASL authentication for ZooKeeper](https://cwiki.apache.org/confluence/disp
(Java system properties: **zookeeper.ssl.ciphersuites** and **zookeeper.ssl.quorum.ciphersuites**)
**New in 3.5.5:**
Specifies the enabled cipher suites to be used in client and quorum TLS negotiation.
- Default: Enabled cipher suites depend on the Java runtime version being used.
+ Default: Enabled cipher suites depend on the Java runtime version being used.
* *ssl.context.supplier.class* and *ssl.quorum.context.supplier.class* :
(Java system properties: **zookeeper.ssl.context.supplier.class** and **zookeeper.ssl.quorum.context.supplier.class**)
@@ -1586,19 +1591,19 @@ and [SASL authentication for ZooKeeper](https://cwiki.apache.org/confluence/disp
Specifies that the client port should accept SSL connections
(using the same configuration as the secure client port).
Default: false
-
+
* *authProvider*:
(Java system property: **zookeeper.authProvider**)
You can specify multiple authentication provider classes for ZooKeeper.
Usually you use this parameter to specify the SASL authentication provider like:
`authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider`
-
+
* *kerberos.removeHostFromPrincipal*
(Java system property: **zookeeper.kerberos.removeHostFromPrincipal**)
You can instruct ZooKeeper to remove the host from the client principal name during authentication.
(e.g. the zk/[email protected] client principal will be authenticated in ZooKeeper as [email protected])
Default: false
-
+
* *kerberos.removeRealmFromPrincipal*
(Java system property: **zookeeper.kerberos.removeRealmFromPrincipal**)
You can instruct ZooKeeper to remove the realm from the client principal name during authentication.
@@ -1608,9 +1613,9 @@ and [SASL authentication for ZooKeeper](https://cwiki.apache.org/confluence/disp
* *multiAddress.enabled* :
(Java system property: **zookeeper.multiAddress.enabled**)
**New in 3.6.0:**
- Since ZooKeeper 3.6.0 you can also [specify multiple addresses](#id_multi_address)
- for each ZooKeeper server instance (this can increase availability when multiple physical
- network interfaces can be used parallel in the cluster). Setting this parameter to
+ Since ZooKeeper 3.6.0 you can also [specify multiple addresses](#id_multi_address)
+ for each ZooKeeper server instance (this can increase availability when multiple physical
+ network interfaces can be used parallel in the cluster). Setting this parameter to
**true** will enable this feature. Please note, that you can not enable this feature
during a rolling upgrade if the version of the old ZooKeeper cluster is prior to 3.6.0.
The default value is **false**.
@@ -1618,17 +1623,17 @@ and [SASL authentication for ZooKeeper](https://cwiki.apache.org/confluence/disp
* *multiAddress.reachabilityCheckTimeoutMs* :
(Java system property: **zookeeper.multiAddress.reachabilityCheckTimeoutMs**)
**New in 3.6.0:**
- Since ZooKeeper 3.6.0 you can also [specify multiple addresses](#id_multi_address)
- for each ZooKeeper server instance (this can increase availability when multiple physical
+ Since ZooKeeper 3.6.0 you can also [specify multiple addresses](#id_multi_address)
+ for each ZooKeeper server instance (this can increase availability when multiple physical
network interfaces can be used parallel in the cluster). ZooKeeper will perform ICMP ECHO requests
- or try to establish a TCP connection on port 7 (Echo) of the destination host in order to find
+ or try to establish a TCP connection on port 7 (Echo) of the destination host in order to find
the reachable addresses. This happens only if you provide multiple addresses in the configuration.
- In this property you can set the timeout in millisecs for the reachability check. The check happens
+ In this property you can set the timeout in millisecs for the reachability check. The check happens
in parallel for the different addresses, so the timeout you set here is the maximum time will be taken
by checking the reachability of all addresses.
The default value is **1000**.
- This parameter has no effect, unless you enable the MultiAddress feature by setting *multiAddress.enabled=true*.
+ This parameter has no effect, unless you enable the MultiAddress feature by setting *multiAddress.enabled=true*.
<a name="Experimental+Options%2FFeatures"></a>
@@ -1684,11 +1689,11 @@ the variable does.
* *jute.maxbuffer.extrasize*:
(Java system property: **zookeeper.jute.maxbuffer.extrasize**)
**New in 3.5.7:**
- While processing client requests ZooKeeper server adds some additional information into
- the requests before persisting it as a transaction. Earlier this additional information size
+ While processing client requests ZooKeeper server adds some additional information into
+ the requests before persisting it as a transaction. Earlier this additional information size
was fixed to 1024 bytes. For many scenarios, specially scenarios where jute.maxbuffer value
is more than 1 MB and request type is multi, this fixed size was insufficient.
- To handle all the scenarios additional information size is increased from 1024 byte
+ To handle all the scenarios additional information size is increased from 1024 byte
to same as jute.maxbuffer size and also it is made configurable through jute.maxbuffer.extrasize.
Generally this property is not required to be configured as default value is the most optimal value.
@@ -1708,17 +1713,17 @@ the variable does.
* *multiAddress.reachabilityCheckEnabled* :
(Java system property: **zookeeper.multiAddress.reachabilityCheckEnabled**)
**New in 3.6.0:**
- Since ZooKeeper 3.6.0 you can also [specify multiple addresses](#id_multi_address)
- for each ZooKeeper server instance (this can increase availability when multiple physical
+ Since ZooKeeper 3.6.0 you can also [specify multiple addresses](#id_multi_address)
+ for each ZooKeeper server instance (this can increase availability when multiple physical
network interfaces can be used parallel in the cluster). ZooKeeper will perform ICMP ECHO requests
- or try to establish a TCP connection on port 7 (Echo) of the destination host in order to find
+ or try to establish a TCP connection on port 7 (Echo) of the destination host in order to find
the reachable addresses. This happens only if you provide multiple addresses in the configuration.
- The reachable check can fail if you hit some ICMP rate-limitation, (e.g. on MacOS) when you try to
- start a large (e.g. 11+) ensemble members cluster on a single machine for testing.
-
- Default value is **true**. By setting this parameter to 'false' you can disable the reachability checks.
- Please note, disabling the reachability check will cause the cluster not to be able to reconfigure
- itself properly during network problems, so the disabling is advised only during testing.
+ The reachable check can fail if you hit some ICMP rate-limitation, (e.g. on MacOS) when you try to
+ start a large (e.g. 11+) ensemble members cluster on a single machine for testing.
+
+ Default value is **true**. By setting this parameter to 'false' you can disable the reachability checks.
+ Please note, disabling the reachability check will cause the cluster not to be able to reconfigure
+ itself properly during network problems, so the disabling is advised only during testing.
This parameter has no effect, unless you enable the MultiAddress feature by setting *multiAddress.enabled=true*.
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java
index 9521420..7ea7010 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java
@@ -232,9 +232,12 @@ public final class ServerMetrics {
NETTY_QUEUED_BUFFER = metricsContext.getSummary("netty_queued_buffer_capacity", DetailLevel.BASIC);
DIGEST_MISMATCHES_COUNT = metricsContext.getCounter("digest_mismatches_count");
+
TLS_HANDSHAKE_EXCEEDED = metricsContext.getCounter("tls_handshake_exceeded");
CNXN_CLOSED_WITHOUT_ZK_SERVER_RUNNING = metricsContext.getCounter("cnxn_closed_without_zk_server_running");
+
+ SOCKET_CLOSING_TIME = metricsContext.getSummary("socket_closing_time", DetailLevel.BASIC);
}
/**
@@ -455,6 +458,8 @@ public final class ServerMetrics {
public final Counter CNXN_CLOSED_WITHOUT_ZK_SERVER_RUNNING;
+ public final Summary SOCKET_CLOSING_TIME;
+
private final MetricsProvider metricsProvider;
public void resetAll() {
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
index da5f113..0b807ef 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
@@ -38,6 +38,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import javax.net.ssl.SSLSocket;
import org.apache.jute.BinaryInputArchive;
@@ -46,10 +47,12 @@ import org.apache.jute.InputArchive;
import org.apache.jute.OutputArchive;
import org.apache.jute.Record;
import org.apache.zookeeper.ZooDefs.OpCode;
+import org.apache.zookeeper.common.Time;
import org.apache.zookeeper.common.X509Exception;
import org.apache.zookeeper.server.ExitCode;
import org.apache.zookeeper.server.Request;
import org.apache.zookeeper.server.ServerCnxn;
+import org.apache.zookeeper.server.ServerMetrics;
import org.apache.zookeeper.server.TxnLogEntry;
import org.apache.zookeeper.server.ZooTrace;
import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
@@ -86,6 +89,7 @@ public class Learner {
protected Socket sock;
protected MultipleAddresses leaderAddr;
+ protected AtomicBoolean sockBeingClosed = new AtomicBoolean(false);
/**
* Socket getter
@@ -116,10 +120,14 @@ public class Learner {
public static final String LEARNER_ASYNC_SENDING = "learner.asyncSending";
private static boolean asyncSending = Boolean.getBoolean(LEARNER_ASYNC_SENDING);
+ public static final String LEARNER_CLOSE_SOCKET_ASYNC = "learner.closeSocketAsync";
+ public static final boolean closeSocketAsync = Boolean.getBoolean(LEARNER_CLOSE_SOCKET_ASYNC);
+
static {
LOG.info("leaderConnectDelayDuringRetryMs: {}", leaderConnectDelayDuringRetryMs);
LOG.info("TCP NoDelay set to: {}", nodelay);
LOG.info("{} = {}", LEARNER_ASYNC_SENDING, asyncSending);
+ LOG.info("{} = {}", LEARNER_CLOSE_SOCKET_ASYNC, closeSocketAsync);
}
final ConcurrentHashMap<Long, ServerCnxn> pendingRevalidations = new ConcurrentHashMap<Long, ServerCnxn>();
@@ -337,6 +345,7 @@ public class Learner {
throw new IOException("Failed connect to " + multiAddr);
} else {
sock = socket.get();
+ sockBeingClosed.set(false);
}
self.authLearner.authenticate(sock, hostname);
@@ -847,10 +856,27 @@ public class Learner {
}
void closeSocket() {
+ if (sock != null) {
+ if (sockBeingClosed.compareAndSet(false, true)) {
+ if (closeSocketAsync) {
+ final Thread closingThread = new Thread(() -> closeSockSync(), "CloseSocketThread(sid:" + zk.getServerId());
+ closingThread.setDaemon(true);
+ closingThread.start();
+ } else {
+ closeSockSync();
+ }
+ }
+ }
+ }
+
+ void closeSockSync() {
try {
+ long startTime = Time.currentElapsedTime();
if (sock != null) {
sock.close();
+ sock = null;
}
+ ServerMetrics.getMetrics().SOCKET_CLOSING_TIME.add(Time.currentElapsedTime() - startTime);
} catch (IOException e) {
LOG.warn("Ignoring error closing connection to leader", e);
}
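The new code follows a close-once, optionally asynchronous pattern: sockBeingClosed guarantees the socket is closed by exactly one caller, and when learner.closeSocketAsync is set, the potentially slow close() runs on a daemon thread so shutdown and the next leader election are not held up. A standalone Java sketch of that pattern (hypothetical names, not ZooKeeper API) looks like this:

    import java.io.IOException;
    import java.net.Socket;
    import java.util.concurrent.atomic.AtomicBoolean;

    // Illustration only: close a socket at most once, optionally on a daemon thread.
    class AsyncSocketCloser {

        private final AtomicBoolean beingClosed = new AtomicBoolean(false);

        void close(Socket sock, boolean async) {
            if (sock == null || !beingClosed.compareAndSet(false, true)) {
                return; // null socket, or another caller is already closing it
            }
            if (async) {
                // Daemon thread: a slow TLS close_notify exchange no longer
                // blocks the caller's shutdown path or JVM exit.
                Thread closer = new Thread(() -> closeQuietly(sock), "CloseSocketThread");
                closer.setDaemon(true);
                closer.start();
            } else {
                closeQuietly(sock);
            }
        }

        private void closeQuietly(Socket sock) {
            long start = System.currentTimeMillis();
            try {
                sock.close();
            } catch (IOException e) {
                // mirror the production code: log and ignore
            }
            System.out.println("socket closed in " + (System.currentTimeMillis() - start) + " ms");
        }
    }

In the actual change the elapsed close time feeds the new socket_closing_time summary shown in ServerMetrics.java instead of being printed.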