[ https://issues.apache.org/jira/browse/ZOOKEEPER-4424?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated ZOOKEEPER-4424:
--------------------------------------
    Labels: pull-request-available  (was: )

> Re-throwing IOException in 
> Leader$LearnerCnxAcceptor$LearnerCnxAcceptorHandler#acceptConnections is not 
> always needed
> ---------------------------------------------------------------------------------------------------------------------
>
>                 Key: ZOOKEEPER-4424
>                 URL: https://issues.apache.org/jira/browse/ZOOKEEPER-4424
>             Project: ZooKeeper
>          Issue Type: Bug
>          Components: server
>    Affects Versions: 3.6.2
>            Reporter: Haoze Wu
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> When `Leader$LearnerCnxAcceptor$LearnerCnxAcceptorHandler` accepts a new
> socket connection, it may throw an IOException at line 510, 514, 515, or 517.
> The IOException at line 510 is discussed in ZOOKEEPER-4203 and
> [https://github.com/apache/zookeeper/pull/1596]; it triggers a concurrency
> bug. However, if the IOException occurs at line 514, 515, or 517, we can
> avoid this complicated path altogether: we can simply catch the IOException,
> close the socket, and proceed to accept the next socket connection without
> exiting the LearnerCnxAcceptorHandler thread. Exceptions from those
> operations (e.g., `setSoTimeout`) only indicate that this particular socket
> connection has a problem; the `ServerSocket` itself still works. Therefore,
> the `LearnerCnxAcceptorHandler` can keep accepting socket connections with
> this `ServerSocket`.
> {code:java}
> //zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java
>             private void acceptConnections() throws IOException {
>                 Socket socket = null;
>                 boolean error = false;
>                 try {
>                     socket = serverSocket.accept();  // line 510
>                     // start with the initLimit, once the ack is processed
>                     // in LearnerHandler switch to the syncLimit
>                     socket.setSoTimeout(self.tickTime * self.initLimit); // line 514
>                     socket.setTcpNoDelay(nodelay);  // line 515
>                     BufferedInputStream is = new BufferedInputStream(socket.getInputStream());  // line 517
>                     LearnerHandler fh = new LearnerHandler(socket, is, Leader.this);
>                     fh.start();
>                 } catch (SocketException e) {
>                     error = true;
>                     if (stop.get()) {
>                         LOG.warn("Exception while shutting down acceptor.", e);
>                     } else {
>                         throw e;
>                     }
>                 } catch (SaslException e) {
>                     LOG.error("Exception while connecting to quorum learner", e);
>                     error = true;
>                 } catch (Exception e) {
>                     error = true;
>                     throw e;
>                 } finally {
>                     // Don't leak sockets on errors
>                     if (error && socket != null && !socket.isClosed()) {
>                         try {
>                             socket.close();
>                         } catch (IOException e) {
>                             LOG.warn("Error closing socket: " + socket, e);
>                         }
>                     }
>                 }
>             }
> {code}
> We propose the following implementation instead. Its advantage is that an
> IOException on an individual accepted socket no longer forces
> `Leader$LearnerCnxAcceptor$LearnerCnxAcceptorHandler` to exit, which avoids
> the overhead of a re-election and potential concurrency bugs such as
> ZOOKEEPER-4203.
> {code:java}
>             private void acceptConnections() throws IOException {
>                 Socket socket = null;
>                 boolean error = false;
>                 try {
>                     socket = serverSocket.accept();
>                     BufferedInputStream is;
>                     try {
>                         // start with the initLimit, once the ack is processed
>                         // in LearnerHandler switch to the syncLimit
>                         socket.setSoTimeout(self.tickTime * self.initLimit);
>                         socket.setTcpNoDelay(nodelay);
>                         is = new BufferedInputStream(socket.getInputStream());
>                     } catch (IOException e) {
>                         // only this socket is broken; the ServerSocket is still usable
>                         error = true;
>                         return;  // the finally block closes the socket
>                     }
>                     LearnerHandler fh = new LearnerHandler(socket, is, Leader.this);
>                     fh.start();
>                 } catch (SocketException e) {
>                     error = true;
>                     if (stop.get()) {
>                         LOG.warn("Exception while shutting down acceptor.", e);
>                     } else {
>                         throw e;
>                     }
>                 } catch (SaslException e) {
>                     LOG.error("Exception while connecting to quorum learner", e);
>                     error = true;  // keep closing the socket on SASL failures, as before
>                 } catch (Exception e) {
>                     error = true;
>                     throw e;
>                 } finally {
>                     // Don't leak sockets on errors
>                     if (error && socket != null && !socket.isClosed()) {
>                         try {
>                             socket.close();
>                         } catch (IOException e) {
>                             LOG.warn("Error closing socket: " + socket, e);
>                         }
>                     }
>                 }
>             }
> {code}
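> To make the intended behavior concrete, here is a minimal, self-contained
> sketch in plain Java (not ZooKeeper code). The class `AcceptorSketch`, the
> `SocketSetup` interface, and the `acceptOne` and `run` methods are
> hypothetical names chosen for illustration, and the 5-second timeout is an
> assumed stand-in for `tickTime * initLimit`. A failure inside the
> per-connection setup step (standing in for `setSoTimeout`, `setTcpNoDelay`,
> and `getInputStream`) closes only that socket and returns `false`, while an
> IOException from `accept()` itself still propagates. The demo injects a
> failure on the first accepted connection and shows that the second one is
> still accepted on the same `ServerSocket`.

```java
import java.io.BufferedInputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;

// Hypothetical sketch of the proposed acceptor behavior; not ZooKeeper code.
public class AcceptorSketch {

    // Per-connection setup; stands in for setSoTimeout/setTcpNoDelay/getInputStream.
    interface SocketSetup {
        BufferedInputStream configure(Socket socket) throws IOException;
    }

    static final SocketSetup DEFAULT_SETUP = socket -> {
        socket.setSoTimeout(5_000);   // assumed value in place of tickTime * initLimit
        socket.setTcpNoDelay(true);
        return new BufferedInputStream(socket.getInputStream());
    };

    // Accepts one connection. Returns true if it was handed to the handler, false if
    // setup failed (that socket is closed). IOExceptions from accept() propagate.
    static boolean acceptOne(ServerSocket serverSocket, SocketSetup setup,
                             BiConsumer<Socket, BufferedInputStream> handler) throws IOException {
        Socket socket = serverSocket.accept();
        BufferedInputStream is;
        try {
            is = setup.configure(socket);
        } catch (IOException e) {
            closeQuietly(socket); // drop only this connection; the ServerSocket stays usable
            return false;
        }
        handler.accept(socket, is);
        return true;
    }

    static void closeQuietly(Socket socket) {
        try {
            socket.close();
        } catch (IOException ignored) {
            // the socket is being discarded anyway
        }
    }

    // Opens a loopback ServerSocket, connects two clients, and fails setup for the first one.
    static List<Boolean> run() throws IOException {
        AtomicInteger attempts = new AtomicInteger();
        SocketSetup flaky = socket -> {
            if (attempts.getAndIncrement() == 0) {
                throw new SocketException("simulated setSoTimeout failure");
            }
            return DEFAULT_SETUP.configure(socket);
        };
        List<Boolean> results = new ArrayList<>();
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (ServerSocket server = new ServerSocket(0, 50, loopback);
             Socket client1 = new Socket(loopback, server.getLocalPort());
             Socket client2 = new Socket(loopback, server.getLocalPort())) {
            for (int i = 0; i < 2; i++) {
                // This handler just closes the socket; a real one would start a LearnerHandler.
                results.add(acceptOne(server, flaky, (s, is) -> closeQuietly(s)));
            }
        }
        return results;
    }

    public static void main(String[] args) throws IOException {
        System.out.println(run()); // prints [false, true]
    }
}
```

> Both client connections complete the TCP handshake into the listen backlog
> before `accept()` is called, so the demo runs on a single thread. Returning a
> status instead of throwing keeps the caller's loop simple: it just calls
> `acceptOne` again, and only failures of the `ServerSocket` itself end it.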
> Other projects use the same pattern. For example, Kafka's `SocketServer`
> ([https://github.com/apache/kafka/blob/2cd96f0e64f8a4f4b74e8049a6c527a990cb4777/core/src/main/scala/kafka/network/SocketServer.scala#L714-L740])
> does this:
> {code:java}
>   /**
>    * Accept a new connection
>    */
>   private def accept(key: SelectionKey): Option[SocketChannel] = {
>     val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
>     val socketChannel = serverSocketChannel.accept()
>     try {
>       connectionQuotas.inc(endPoint.listenerName, socketChannel.socket.getInetAddress, blockedPercentMeter)
>       configureAcceptedSocketChannel(socketChannel)
>       Some(socketChannel)
>     } catch {
>       case e: TooManyConnectionsException =>
>         info(...)
>         close(endPoint.listenerName, socketChannel)
>         None
>       case e: ConnectionThrottledException =>
>         // ...
>         None
>       case e: IOException =>
>         error(...)
>         close(endPoint.listenerName, socketChannel)
>         None
>     }
>   }
>   /**
>    * Close `channel` and decrement the connection count.
>    */
>   def close(listenerName: ListenerName, channel: SocketChannel): Unit = {
>     if (channel != null) {
>       // ...
>       closeSocket(channel)
>     }
>   }
>   protected def closeSocket(channel: SocketChannel): Unit = {
>     CoreUtils.swallow(channel.socket().close(), this, Level.ERROR)
>     CoreUtils.swallow(channel.close(), this, Level.ERROR)
>   }
> {code}
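> For comparison, the shape of the Kafka snippet (return `Some(socketChannel)`
> on success, close and return `None` on an IOException, and log rather than
> propagate errors while closing) can be written in Java with `Optional`. This
> is a hedged sketch: `OptionalAcceptSketch`, `configureAccepted`, and
> `closeSocket` are hypothetical names, `java.util.logging` stands in for
> whatever logger the server actually uses, and Kafka's connection-quota
> bookkeeping is omitted.

```java
import java.io.IOException;
import java.net.Socket;
import java.util.Optional;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical Java rendering of the Kafka accept()/closeSocket() shape; not Kafka code.
public class OptionalAcceptSketch {
    private static final Logger LOG = Logger.getLogger(OptionalAcceptSketch.class.getName());

    // Configures an accepted socket; on failure, closes it and returns Optional.empty().
    static Optional<Socket> configureAccepted(Socket socket, int soTimeoutMs, boolean noDelay) {
        try {
            socket.setSoTimeout(soTimeoutMs);
            socket.setTcpNoDelay(noDelay);
            return Optional.of(socket);
        } catch (IOException e) {
            LOG.log(Level.WARNING, "Failed to configure accepted socket " + socket, e);
            closeSocket(socket);
            return Optional.empty();
        }
    }

    // Closes the socket, logging instead of propagating any error (similar in spirit to CoreUtils.swallow).
    static void closeSocket(Socket socket) {
        try {
            socket.close();
        } catch (IOException e) {
            LOG.log(Level.SEVERE, "Error closing socket " + socket, e);
        }
    }

    public static void main(String[] args) throws IOException {
        Socket closed = new Socket(); // unconnected socket, closed right away
        closed.close();
        // setSoTimeout on a closed socket throws SocketException, so the result is empty.
        System.out.println(configureAccepted(closed, 1000, true).isPresent()); // prints false
    }
}
```

> The difference from the ZooKeeper proposal is mostly stylistic: the
> `Optional` return makes "this connection was dropped" explicit in the method
> signature, whereas the proposal uses an `error` flag and a `finally` block.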



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
