[ 
https://issues.apache.org/jira/browse/ZOOKEEPER-4424?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Haoze Wu updated ZOOKEEPER-4424:
--------------------------------
    Description: 
When `Leader$LearnerCnxAcceptor$LearnerCnxAcceptorHandler` is accepting a new 
socket connection, it may throw an IOException at line 510 or 514 or 515 or 517.

The scenario of an IOException at line 510 is discussed in ZOOKEEPER-4203 and 
[https://github.com/apache/zookeeper/pull/1596] . It triggers a concurrency 
bug. However, if the IOException occurs at line 514, 515, or 517, we can 
avoid this complicated process entirely. We can simply catch the IOException and 
proceed to accept the next socket connection, without exiting the 
LearnerCnxAcceptorHandler thread. An exception in one of those operations (e.g., 
`setSoTimeout`) only indicates that the accepted socket has a problem; 
the `ServerSocket` itself still works. Therefore, the 
`LearnerCnxAcceptorHandler` can keep accepting socket connections with 
this `ServerSocket`.
{code:java}
//zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java
            private void acceptConnections() throws IOException {
                Socket socket = null;
                boolean error = false;
                try {
                    socket = serverSocket.accept();  // line 510


                    // start with the initLimit, once the ack is processed
                    // in LearnerHandler switch to the syncLimit
                    socket.setSoTimeout(self.tickTime * self.initLimit); // 
line 514
                    socket.setTcpNoDelay(nodelay);  // line 515

                    BufferedInputStream is = new 
BufferedInputStream(socket.getInputStream());  // line 517
                    LearnerHandler fh = new LearnerHandler(socket, is, 
Leader.this);
                    fh.start();
                } catch (SocketException e) {
                    error = true;
                    if (stop.get()) {
                        LOG.warn("Exception while shutting down acceptor.", e);
                    } else {
                        throw e;
                    }
                } catch (SaslException e) {
                    LOG.error("Exception while connecting to quorum learner", 
e);
                    error = true;
                } catch (Exception e) {
                    error = true;
                    throw e;
                } finally {
                    // Don't leak sockets on errors
                    if (error && socket != null && !socket.isClosed()) {
                        try {
                            socket.close();
                        } catch (IOException e) {
                            LOG.warn("Error closing socket: " + socket, e);
                        }
                    }
                }
            }
{code}
We propose the following implementation instead. Its advantage is that an 
IOException raised while configuring an accepted socket no longer forces the 
`Leader$LearnerCnxAcceptor$LearnerCnxAcceptorHandler` to exit, which avoids 
the overhead of re-election and potential concurrency bugs such as 
ZOOKEEPER-4203.
{code:java}
            private void acceptConnections() throws IOException {
                Socket socket = null;
                boolean error = false;
                try {
                    socket = serverSocket.accept();
                    BufferedInputStream is;

                    try {
                        // start with the initLimit, once the ack is processed
                        // in LearnerHandler switch to the syncLimit
                        socket.setSoTimeout(self.tickTime * self.initLimit);
                        socket.setTcpNoDelay(nodelay);

                        is = new BufferedInputStream(socket.getInputStream());
                    } catch (IOException e) {
                        error = true;
                        return;  // close the socket at the finally block
                    }

                    LearnerHandler fh = new LearnerHandler(socket, is, 
Leader.this);
                    fh.start();
                } catch (SocketException e) {
                    error = true;
                    if (stop.get()) {
                        LOG.warn("Exception while shutting down acceptor.", e);
                    } else {
                        throw e;
                    }
                } catch (SaslException e) {
                    LOG.error("Exception while connecting to quorum learner", 
e);
                } catch (Exception e) {
                    error = true;
                    throw e;
                } finally {
                    // Don't leak sockets on errors
                    if (error && socket != null && !socket.isClosed()) {
                        try {
                            socket.close();
                        } catch (IOException e) {
                            LOG.warn("Error closing socket: " + socket, e);
                        }
                    }
                }
            }
{code}
This code pattern has been adopted by other communities. For example, in Kafka 
[https://github.com/apache/kafka/blob/2cd96f0e64f8a4f4b74e8049a6c527a990cb4777/core/src/main/scala/kafka/network/SocketServer.scala#L714-L740]
 :
{code:scala}
  /**
   * Accept a new connection.
   *
   * Returns `Some(socketChannel)` when the quota update and the channel
   * configuration both succeed. Returns `None` when one of the exceptions
   * matched below is caught, so a failure tied to a single accepted channel
   * does not propagate out of this method.
   *
   * Note that `serverSocketChannel.accept()` is called outside the `try`,
   * so an exception thrown by the accept itself is not handled here.
   */
  private def accept(key: SelectionKey): Option[SocketChannel] = {
    val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
    val socketChannel = serverSocketChannel.accept()
    try {
      connectionQuotas.inc(endPoint.listenerName, 
socketChannel.socket.getInetAddress, blockedPercentMeter)
      configureAcceptedSocketChannel(socketChannel)
      Some(socketChannel)
    } catch {
      case e: TooManyConnectionsException =>
        info(...)
        close(endPoint.listenerName, socketChannel)
        None
      case e: ConnectionThrottledException =>
        // ... (handling elided in this excerpt; no close() call is shown here)
        None
      case e: IOException =>
        // Log, close only the affected channel, and report "nothing accepted".
        error(...)
        close(endPoint.listenerName, socketChannel)
        None
    }
  }

  /**
   * Close `channel` and decrement the connection count.
   *
   * A null `channel` is ignored. The connection-count bookkeeping is elided
   * in this excerpt (`// ...`); only the final `closeSocket(channel)` call is
   * shown.
   *
   * @param listenerName not used in the visible part of the body; presumably
   *                     it keys the connection count (confirm against the
   *                     full Kafka source)
   * @param channel the channel to close; may be null
   */
  def close(listenerName: ListenerName, channel: SocketChannel): Unit = {
    if (channel != null) {
      // ...
      closeSocket(channel)
    }
  }

  /**
   * Closes the channel's underlying socket and then the channel itself.
   *
   * Each close is wrapped in its own `CoreUtils.swallow(..., Level.ERROR)`
   * call. Presumably `swallow` logs and suppresses any exception, so the
   * second close still runs if the first fails (confirm against CoreUtils).
   */
  protected def closeSocket(channel: SocketChannel): Unit = {
    CoreUtils.swallow(channel.socket().close(), this, Level.ERROR)
    CoreUtils.swallow(channel.close(), this, Level.ERROR)
  } {code}

  was:
When `Leader$LearnerCnxAcceptor$LearnerCnxAcceptorHandler` is accepting a new 
socket connection, it may throw an IOException at line 510 or 514 or 515 or 517.

The scenario of IOException at line 510 is discussed in ZOOKEEPER-4203 and 
[https://github.com/apache/zookeeper/pull/1596] . It triggers a concurrency 
bug. However, If the IOException occurs at line 514 or 515 or 517, actually we 
can avoid this complicated process. We can simply catch the IOException and 
proceed to accept the next socket connection, without exiting the 
LearnerCnxAcceptorHandler thread. The exceptions in those operations (e.g., 
`setSoTimeout`) only indicates that the socket connection has some issues, but 
the `ServerSocket` can still work well. Therefore, the 
`LearnerCnxAcceptorHandler` can proceed to accept more socket connections with 
this `ServerSocket`.
{code:java}
//zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java
            private void acceptConnections() throws IOException {
                Socket socket = null;
                boolean error = false;
                try {
                    socket = serverSocket.accept();  // line 510


                    // start with the initLimit, once the ack is processed
                    // in LearnerHandler switch to the syncLimit
                    socket.setSoTimeout(self.tickTime * self.initLimit); // 
line 514
                    socket.setTcpNoDelay(nodelay);  // line 515

                    BufferedInputStream is = new 
BufferedInputStream(socket.getInputStream());  // line 517
                    LearnerHandler fh = new LearnerHandler(socket, is, 
Leader.this);
                    fh.start();
                } catch (SocketException e) {
                    error = true;
                    if (stop.get()) {
                        LOG.warn("Exception while shutting down acceptor.", e);
                    } else {
                        throw e;
                    }
                } catch (SaslException e) {
                    LOG.error("Exception while connecting to quorum learner", 
e);
                    error = true;
                } catch (Exception e) {
                    error = true;
                    throw e;
                } finally {
                    // Don't leak sockets on errors
                    if (error && socket != null && !socket.isClosed()) {
                        try {
                            socket.close();
                        } catch (IOException e) {
                            LOG.warn("Error closing socket: " + socket, e);
                        }
                    }
                }
            }
{code}
We propose that the following implementation is better. The advantage is that 
those IOException in the socket will not force the 
`Leader$LearnerCnxAcceptor$LearnerCnxAcceptorHandler` to exit, and thus avoid 
the overhead of re-election and potential concurrency bugs such as 
ZOOKEEPER-4203.
{code:java}
            private void acceptConnections() throws IOException {
                Socket socket = null;
                boolean error = false;
                try {
                    socket = serverSocket.accept();
                    try {
                        // start with the initLimit, once the ack is processed
                        // in LearnerHandler switch to the syncLimit
                        socket.setSoTimeout(self.tickTime * self.initLimit);
                        socket.setTcpNoDelay(nodelay);

                        BufferedInputStream is = new 
BufferedInputStream(socket.getInputStream());
                    } catch (IOException e) {
                        error = true;
                        return;  // close the socket at the finally block
                    }

                    LearnerHandler fh = new LearnerHandler(socket, is, 
Leader.this);
                    fh.start();
                } catch (SocketException e) {
                    error = true;
                    if (stop.get()) {
                        LOG.warn("Exception while shutting down acceptor.", e);
                    } else {
                        throw e;
                    }
                } catch (SaslException e) {
                    LOG.error("Exception while connecting to quorum learner", 
e);
                } catch (Exception e) {
                    error = true;
                    throw e;
                } finally {
                    // Don't leak sockets on errors
                    if (error && socket != null && !socket.isClosed()) {
                        try {
                            socket.close();
                        } catch (IOException e) {
                            LOG.warn("Error closing socket: " + socket, e);
                        }
                    }
                }
            }
{code}
This code pattern has been adopted by other communities. For example, in Kafka 
https://github.com/apache/kafka/blob/2cd96f0e64f8a4f4b74e8049a6c527a990cb4777/core/src/main/scala/kafka/network/SocketServer.scala#L714-L740
 :
{code:java}
  /**
   * Accept a new connection
   */
  private def accept(key: SelectionKey): Option[SocketChannel] = {
    val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
    val socketChannel = serverSocketChannel.accept()
    try {
      connectionQuotas.inc(endPoint.listenerName, 
socketChannel.socket.getInetAddress, blockedPercentMeter)
      configureAcceptedSocketChannel(socketChannel)
      Some(socketChannel)
    } catch {
      case e: TooManyConnectionsException =>
        info(...)
        close(endPoint.listenerName, socketChannel)
        None
      case e: ConnectionThrottledException =>
        // ...
        None
      case e: IOException =>
        error(...)
        close(endPoint.listenerName, socketChannel)
        None
    }
  }

  /**
   * Close `channel` and decrement the connection count.
   */
  def close(listenerName: ListenerName, channel: SocketChannel): Unit = {
    if (channel != null) {
      // ...
      closeSocket(channel)
    }
  }

  protected def closeSocket(channel: SocketChannel): Unit = {
    CoreUtils.swallow(channel.socket().close(), this, Level.ERROR)
    CoreUtils.swallow(channel.close(), this, Level.ERROR)
  } {code}


> Re-throwing IOException in 
> Leader$LearnerCnxAcceptor$LearnerCnxAcceptorHandler#acceptConnections is not 
> always needed
> ---------------------------------------------------------------------------------------------------------------------
>
>                 Key: ZOOKEEPER-4424
>                 URL: https://issues.apache.org/jira/browse/ZOOKEEPER-4424
>             Project: ZooKeeper
>          Issue Type: Bug
>          Components: server
>    Affects Versions: 3.6.2
>            Reporter: Haoze Wu
>            Priority: Major
>
> When `Leader$LearnerCnxAcceptor$LearnerCnxAcceptorHandler` is accepting a new 
> socket connection, it may throw an IOException at line 510 or 514 or 515 or 
> 517.
> The scenario of IOException at line 510 is discussed in ZOOKEEPER-4203 and 
> [https://github.com/apache/zookeeper/pull/1596] . It triggers a concurrency 
> bug. However, If the IOException occurs at line 514 or 515 or 517, actually 
> we can avoid this complicated process. We can simply catch the IOException 
> and proceed to accept the next socket connection, without exiting the 
> LearnerCnxAcceptorHandler thread. The exceptions in those operations (e.g., 
> `setSoTimeout`) only indicates that the socket connection has some issues, 
> but the `ServerSocket` can still work well. Therefore, the 
> `LearnerCnxAcceptorHandler` can proceed to accept more socket connections 
> with this `ServerSocket`.
> {code:java}
> //zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java
>             private void acceptConnections() throws IOException {
>                 Socket socket = null;
>                 boolean error = false;
>                 try {
>                     socket = serverSocket.accept();  // line 510
>                     // start with the initLimit, once the ack is processed
>                     // in LearnerHandler switch to the syncLimit
>                     socket.setSoTimeout(self.tickTime * self.initLimit); // 
> line 514
>                     socket.setTcpNoDelay(nodelay);  // line 515
>                     BufferedInputStream is = new 
> BufferedInputStream(socket.getInputStream());  // line 517
>                     LearnerHandler fh = new LearnerHandler(socket, is, 
> Leader.this);
>                     fh.start();
>                 } catch (SocketException e) {
>                     error = true;
>                     if (stop.get()) {
>                         LOG.warn("Exception while shutting down acceptor.", 
> e);
>                     } else {
>                         throw e;
>                     }
>                 } catch (SaslException e) {
>                     LOG.error("Exception while connecting to quorum learner", 
> e);
>                     error = true;
>                 } catch (Exception e) {
>                     error = true;
>                     throw e;
>                 } finally {
>                     // Don't leak sockets on errors
>                     if (error && socket != null && !socket.isClosed()) {
>                         try {
>                             socket.close();
>                         } catch (IOException e) {
>                             LOG.warn("Error closing socket: " + socket, e);
>                         }
>                     }
>                 }
>             }
> {code}
> We propose that the following implementation is better. The advantage is that 
> those IOException in the socket will not force the 
> `Leader$LearnerCnxAcceptor$LearnerCnxAcceptorHandler` to exit, and thus avoid 
> the overhead of re-election and potential concurrency bugs such as 
> ZOOKEEPER-4203.
> {code:java}
>             private void acceptConnections() throws IOException {
>                 Socket socket = null;
>                 boolean error = false;
>                 try {
>                     socket = serverSocket.accept();
>                     BufferedInputStream is;
>                     try {
>                         // start with the initLimit, once the ack is processed
>                         // in LearnerHandler switch to the syncLimit
>                         socket.setSoTimeout(self.tickTime * self.initLimit);
>                         socket.setTcpNoDelay(nodelay);
>                         is = new BufferedInputStream(socket.getInputStream());
>                     } catch (IOException e) {
>                         error = true;
>                         return;  // close the socket at the finally block
>                     }
>                     LearnerHandler fh = new LearnerHandler(socket, is, 
> Leader.this);
>                     fh.start();
>                 } catch (SocketException e) {
>                     error = true;
>                     if (stop.get()) {
>                         LOG.warn("Exception while shutting down acceptor.", 
> e);
>                     } else {
>                         throw e;
>                     }
>                 } catch (SaslException e) {
>                     LOG.error("Exception while connecting to quorum learner", 
> e);
>                 } catch (Exception e) {
>                     error = true;
>                     throw e;
>                 } finally {
>                     // Don't leak sockets on errors
>                     if (error && socket != null && !socket.isClosed()) {
>                         try {
>                             socket.close();
>                         } catch (IOException e) {
>                             LOG.warn("Error closing socket: " + socket, e);
>                         }
>                     }
>                 }
>             }
> {code}
> This code pattern has been adopted by other communities. For example, in 
> Kafka 
> [https://github.com/apache/kafka/blob/2cd96f0e64f8a4f4b74e8049a6c527a990cb4777/core/src/main/scala/kafka/network/SocketServer.scala#L714-L740]
>  :
> {code:java}
>   /**
>    * Accept a new connection
>    */
>   private def accept(key: SelectionKey): Option[SocketChannel] = {
>     val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
>     val socketChannel = serverSocketChannel.accept()
>     try {
>       connectionQuotas.inc(endPoint.listenerName, 
> socketChannel.socket.getInetAddress, blockedPercentMeter)
>       configureAcceptedSocketChannel(socketChannel)
>       Some(socketChannel)
>     } catch {
>       case e: TooManyConnectionsException =>
>         info(...)
>         close(endPoint.listenerName, socketChannel)
>         None
>       case e: ConnectionThrottledException =>
>         // ...
>         None
>       case e: IOException =>
>         error(...)
>         close(endPoint.listenerName, socketChannel)
>         None
>     }
>   }
>   /**
>    * Close `channel` and decrement the connection count.
>    */
>   def close(listenerName: ListenerName, channel: SocketChannel): Unit = {
>     if (channel != null) {
>       // ...
>       closeSocket(channel)
>     }
>   }
>   protected def closeSocket(channel: SocketChannel): Unit = {
>     CoreUtils.swallow(channel.socket().close(), this, Level.ERROR)
>     CoreUtils.swallow(channel.close(), this, Level.ERROR)
>   } {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to