[
https://issues.apache.org/jira/browse/ZOOKEEPER-4424?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
ASF GitHub Bot updated ZOOKEEPER-4424:
--------------------------------------
Labels: pull-request-available (was: )
> Re-throwing IOException in
> Leader$LearnerCnxAcceptor$LearnerCnxAcceptorHandler#acceptConnections is not
> always needed
> ---------------------------------------------------------------------------------------------------------------------
>
> Key: ZOOKEEPER-4424
> URL: https://issues.apache.org/jira/browse/ZOOKEEPER-4424
> Project: ZooKeeper
> Issue Type: Bug
> Components: server
> Affects Versions: 3.6.2
> Reporter: Haoze Wu
> Priority: Major
> Labels: pull-request-available
> Time Spent: 10m
> Remaining Estimate: 0h
>
> When `Leader$LearnerCnxAcceptor$LearnerCnxAcceptorHandler` is accepting a new
> socket connection, it may throw an IOException at line 510, 514, 515, or
> 517.
> The IOException at line 510 is discussed in ZOOKEEPER-4203 and
> [https://github.com/apache/zookeeper/pull/1596]. It triggers a concurrency
> bug. However, if the IOException occurs at line 514, 515, or 517, we can
> avoid this complicated process. We can simply catch the IOException
> and proceed to accept the next socket connection, without exiting the
> LearnerCnxAcceptorHandler thread. An exception in one of those operations
> (e.g., `setSoTimeout`) only indicates that the accepted socket connection has
> some issue; the `ServerSocket` itself can still work well. Therefore, the
> `LearnerCnxAcceptorHandler` can proceed to accept more socket connections
> with this `ServerSocket`.
> {code:java}
> //zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java
> private void acceptConnections() throws IOException {
>     Socket socket = null;
>     boolean error = false;
>     try {
>         socket = serverSocket.accept(); // line 510
>         // start with the initLimit, once the ack is processed
>         // in LearnerHandler switch to the syncLimit
>         socket.setSoTimeout(self.tickTime * self.initLimit); // line 514
>         socket.setTcpNoDelay(nodelay); // line 515
>         BufferedInputStream is = new BufferedInputStream(socket.getInputStream()); // line 517
>         LearnerHandler fh = new LearnerHandler(socket, is, Leader.this);
>         fh.start();
>     } catch (SocketException e) {
>         error = true;
>         if (stop.get()) {
>             LOG.warn("Exception while shutting down acceptor.", e);
>         } else {
>             throw e;
>         }
>     } catch (SaslException e) {
>         LOG.error("Exception while connecting to quorum learner", e);
>         error = true;
>     } catch (Exception e) {
>         error = true;
>         throw e;
>     } finally {
>         // Don't leak sockets on errors
>         if (error && socket != null && !socket.isClosed()) {
>             try {
>                 socket.close();
>             } catch (IOException e) {
>                 LOG.warn("Error closing socket: " + socket, e);
>             }
>         }
>     }
> }
> {code}
> We propose that the following implementation is better. The advantage is that
> an IOException on an accepted socket will not force the
> `Leader$LearnerCnxAcceptor$LearnerCnxAcceptorHandler` to exit, which avoids
> the overhead of re-election and potential concurrency bugs such as
> ZOOKEEPER-4203.
> {code:java}
> private void acceptConnections() throws IOException {
>     Socket socket = null;
>     boolean error = false;
>     try {
>         socket = serverSocket.accept();
>         BufferedInputStream is;
>         try {
>             // start with the initLimit, once the ack is processed
>             // in LearnerHandler switch to the syncLimit
>             socket.setSoTimeout(self.tickTime * self.initLimit);
>             socket.setTcpNoDelay(nodelay);
>             is = new BufferedInputStream(socket.getInputStream());
>         } catch (IOException e) {
>             error = true;
>             return; // the socket is closed in the finally block
>         }
>         LearnerHandler fh = new LearnerHandler(socket, is, Leader.this);
>         fh.start();
>     } catch (SocketException e) {
>         error = true;
>         if (stop.get()) {
>             LOG.warn("Exception while shutting down acceptor.", e);
>         } else {
>             throw e;
>         }
>     } catch (SaslException e) {
>         LOG.error("Exception while connecting to quorum learner", e);
>         error = true; // as in the current code, so the socket is not leaked
>     } catch (Exception e) {
>         error = true;
>         throw e;
>     } finally {
>         // Don't leak sockets on errors
>         if (error && socket != null && !socket.isClosed()) {
>             try {
>                 socket.close();
>             } catch (IOException e) {
>                 LOG.warn("Error closing socket: " + socket, e);
>             }
>         }
>     }
> }
> {code}
> This code pattern has been adopted by other communities. For example, in
> Kafka
> [https://github.com/apache/kafka/blob/2cd96f0e64f8a4f4b74e8049a6c527a990cb4777/core/src/main/scala/kafka/network/SocketServer.scala#L714-L740]:
> {code:scala}
> /**
>  * Accept a new connection
>  */
> private def accept(key: SelectionKey): Option[SocketChannel] = {
>   val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
>   val socketChannel = serverSocketChannel.accept()
>   try {
>     connectionQuotas.inc(endPoint.listenerName, socketChannel.socket.getInetAddress, blockedPercentMeter)
>     configureAcceptedSocketChannel(socketChannel)
>     Some(socketChannel)
>   } catch {
>     case e: TooManyConnectionsException =>
>       info(...)
>       close(endPoint.listenerName, socketChannel)
>       None
>     case e: ConnectionThrottledException =>
>       // ...
>       None
>     case e: IOException =>
>       error(...)
>       close(endPoint.listenerName, socketChannel)
>       None
>   }
> }
>
> /**
>  * Close `channel` and decrement the connection count.
>  */
> def close(listenerName: ListenerName, channel: SocketChannel): Unit = {
>   if (channel != null) {
>     // ...
>     closeSocket(channel)
>   }
> }
>
> protected def closeSocket(channel: SocketChannel): Unit = {
>   CoreUtils.swallow(channel.socket().close(), this, Level.ERROR)
>   CoreUtils.swallow(channel.close(), this, Level.ERROR)
> }
> {code}
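
The idea in the issue above (a failure while configuring one accepted socket need not stop the acceptor) can be tried outside ZooKeeper with a small standalone sketch. The names `ResilientAcceptor`, `ConnectionSetup`, and `acceptOne` are hypothetical and exist only for this illustration; they are not part of ZooKeeper or of the proposed patch. The sketch assumes that an IOException from `accept()` should still propagate, while an IOException from per-connection setup only drops that one connection.

```java
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;

public class ResilientAcceptor {

    /** Hypothetical hook standing in for the per-connection setup (timeouts, streams). */
    public interface ConnectionSetup {
        InputStream configure(Socket socket) throws IOException;
    }

    /**
     * Accepts one connection. Returns true if setup succeeded, false if setup
     * failed and the connection was dropped. An IOException thrown by accept()
     * itself is NOT caught here, because it concerns the listening socket.
     */
    public static boolean acceptOne(ServerSocket serverSocket, ConnectionSetup setup)
            throws IOException {
        Socket socket = serverSocket.accept();
        try {
            InputStream in = setup.configure(socket);
            // A real server would hand (socket, in) to a handler thread here.
            // This sketch has no handler, so it simply closes the connection.
            socket.close();
            return true;
        } catch (IOException e) {
            // Only this connection is affected; the ServerSocket stays usable.
            System.err.println("Dropping connection " + socket.getRemoteSocketAddress() + ": " + e);
            try {
                socket.close();
            } catch (IOException closeError) {
                System.err.println("Error closing socket: " + closeError);
            }
            return false;
        }
    }

    public static void main(String[] args) throws IOException {
        try (ServerSocket server = new ServerSocket(0, 50, InetAddress.getLoopbackAddress())) {
            // First client: per-connection setup fails (simulated).
            try (Socket client = new Socket(server.getInetAddress(), server.getLocalPort())) {
                boolean ok = acceptOne(server, s -> {
                    throw new IOException("simulated setup failure");
                });
                System.out.println("first connection set up: " + ok);
            }
            // Second client: the same ServerSocket keeps accepting.
            try (Socket client = new Socket(server.getInetAddress(), server.getLocalPort())) {
                boolean ok = acceptOne(server, s -> {
                    s.setSoTimeout(1000);
                    s.setTcpNoDelay(true);
                    return new BufferedInputStream(s.getInputStream());
                });
                System.out.println("second connection set up: " + ok);
            }
        }
    }
}
```

Keeping `accept()` outside the inner `try` mirrors the distinction drawn in the issue: failures at the listener level are still reported to the caller, and only per-connection failures are absorbed.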