There were some recent discussions about the proposed mechanism for cancelling an RPC mid-transmission. I am writing this email to summarize the problem that cancellation is trying to solve and some of the proposed solutions.
As part of IMPALA-2567 <https://issues.apache.org/jira/browse/IMPALA-2567>, Impala is porting its RPC facility to the Kudu RPC library. One major feature of KRPC is asynchronous RPC, which removes the need for Impala to create a separate thread for each communication channel between fragment instances running on different Impala backends. While an asynchronous RPC is in progress, the KRPC code needs to hold on to the RPC payload, which is provided and owned by a fragment instance. There is a singleton messenger instance per Impalad node, and the connection between two hosts is shared by different fragment instances.

At any point while an RPC is in progress, the coordinator may cancel all fragment instances of a query for various reasons. When that happens, the fragment execution threads need to cancel their in-flight RPCs and free the RPC payloads (which are passed, for example, as sidecars of the RPC request) within a reasonable amount of time (e.g. 500ms). Currently in the KRPC code, when cancellation is requested while an outbound call is in the SENDING state (i.e. it has potentially sent some or all of the header to the remote server), KRPC waits for the entire payload to be sent before invoking the completion callback specified in the async RPC request. Only at that point can the KRPC client assume that the RPC payload is no longer referenced by the KRPC code and free it.

When the network connections between Impalad backends are functioning well, it shouldn't take long to finish sending the payload, since Impalad internally tries to bound the payload size to 8MB (though this is not guaranteed in some degenerate cases, e.g. a row with a 1GB string). However, when the network gets congested or very lossy, there is in theory no bound on how long it takes to finish sending the payload. In other words, cancelling an RPC can take an unbounded amount of time to complete, and thus cancelling a fragment instance can also take arbitrarily long. This is a chicken-and-egg problem: usually the user cancels the query precisely because poor network performance has caused it to run for too long, yet with the unbounded wait the user cannot even cancel the query within a bounded period of time.

A similar problem arises when an RPC times out. As the KRPC code stands today, there is no guarantee that the payload is no longer referenced by the KRPC code when the completion callback is invoked due to a timeout, and the client gets no notification of when the payload can be freed. In theory, this could be solved by deferring the callback until the payload has been completely sent, but that is not always desirable because of the unbounded wait mentioned above. For example, if a client requests a timeout of 60s, it's not desirable to invoke the callback, say, 300s after the timeout.

If you agree that the above are indeed problems, we need an RPC cancellation mechanism which meets the following requirements:

- the RPC payload is no longer referenced by the KRPC code when the completion callback is invoked.
- the time between the cancellation request and the invocation of the completion callback is reasonably bounded.

The following are two different proposals that have been floated:

1. Upgrade the KRPC protocol to insert a footer at the very end of each outbound call transfer (i.e. after the header, the serialized protobuf request, the sidecars, etc). The footer contains a flag which is parsed by the server to determine whether the inbound call should be discarded; normally, this flag is cleared. When an outbound call is cancelled while it's still in the SENDING state, the footer is modified in place to set the flag, and the sidecars of the outbound call's transfers are redirected to point to a dummy buffer. The slices in an outbound transfer then no longer reference the original payload of the RPC, and the footer guarantees that whatever is received by the server will be discarded, so it is safe to replace the original payload with dummy bytes. This allows the completion callback to be invoked without waiting for the entire payload to be sent (see the sketch after this proposal).

Pros:
- relatively low overhead in network consumption (1 byte per outbound call transfer).
- cancellation completes once the reactor thread handles the cancellation event and is not reliant on the network condition.

Cons:
- the remaining bytes of the payload still need to be sent across a congested network after cancellation.
- added complexity to the protocol.

The details of the proposal are in KUDU-2065 <https://issues.apache.org/jira/browse/KUDU-2065>. The patch which implemented this idea is at https://gerrit.cloudera.org/#/c/7599/ ; it was later reverted due to KUDU-2110 <https://issues.apache.org/jira/browse/KUDU-2110>.
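To make proposal 1 concrete, here is a minimal sketch of the in-place rewrite. The types below (Slice, OutboundTransfer, kDummy) are made up for illustration; the actual classes in the reverted patch differ in their details:

    // Hypothetical sketch of proposal 1's cancellation path; not the
    // actual KRPC implementation.
    #include <cstddef>
    #include <cstdint>
    #include <vector>

    // A borrowed view into caller-owned memory, e.g. a sidecar.
    struct Slice {
      const uint8_t* data;
      size_t size;
    };

    // Shared zero-filled buffer substituted for all cancelled payloads.
    static const uint8_t kDummy[64 * 1024] = {};

    struct OutboundTransfer {
      // header, serialized protobuf request, sidecars..., and, last, a
      // 1-byte footer slice that points at 'footer_flag' below.
      std::vector<Slice> slices;
      uint8_t footer_flag = 0;  // 0 = deliver; 1 = server discards the call
      size_t cur_slice = 0;     // first slice not yet fully written

      // Runs on the reactor thread, so it cannot race with socket writes.
      void Cancel() {
        // Redirect every unsent slice except the trailing footer to dummy
        // bytes of the same length, keeping the wire framing intact. (For
        // brevity this assumes a slice fits in kDummy; a real version
        // would chunk larger slices.)
        for (size_t i = cur_slice; i + 1 < slices.size(); ++i) {
          slices[i] = Slice{kDummy, slices[i].size};
        }
        footer_flag = 1;  // parsed by the server, which drops the call
        // The original payload is now unreferenced, so the completion
        // callback can be invoked immediately.
      }
    };

The point to notice is that cancellation becomes a constant-time pointer swap on the reactor thread, so its latency does not depend on the network; the dummy bytes still have to drain through the socket, though, which is the first con above.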
2. Keep the KRPC protocol unchanged. When an outbound call is cancelled while it's in the SENDING state, register a timer with a timeout (e.g. 500ms). When the timer expires, invoke a callback which calls shutdown(fd, SHUT_WR) on the socket. This shuts down one half of the connection: it prevents more bytes from being sent while still allowing replies to be read on the client side, and recv() will eventually return 0 on the server side. After all other in-flight RPCs on that connection have been replied to, the server should close the connection. The client, after reading all the replies, should also destroy the Connection object. All RPCs behind the cancelled RPC in the outbound queue on the client side are aborted and need to be retried (see the sketch after this proposal).

Pros:
- stops piling bytes onto a congested network after cancelling an RPC.
- cancellation is still bounded in time.

Cons:
- For Impalad, each host may be connected to all other hosts in the cluster. In the worst case, this means closing all n connections of a host, so in total up to n^2 connections could be re-established concurrently. Based on our testing on the bolt1k cluster, this can easily trigger negotiation timeouts in a secure cluster, leading to unpredictable failures.
- The RPCs behind the cancelled RPC in the outbound queue cannot be retried until all previous in-flight RPCs have been replied to. This can take unboundedly long if the remote server is overloaded.
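For illustration, here is a minimal sketch of the half-close in proposal 2. It assumes the cancellation path has the connection's file descriptor in hand, and it substitutes a plain sleeping thread for what would really be a timer on the reactor's event loop:

    #include <sys/socket.h>

    #include <chrono>
    #include <thread>

    // Invoked when an outbound call being cancelled is in the SENDING
    // state. In the real reactor this would be a timer callback, not a
    // blocking sleep.
    void HalfCloseAfterGracePeriod(int fd) {
      // Give the in-flight transfer e.g. 500ms to drain on its own first.
      std::this_thread::sleep_for(std::chrono::milliseconds(500));

      // Half-close the connection: no more bytes can be written, so the
      // server's recv() eventually returns 0, but replies to other
      // in-flight RPCs on this connection can still be read by the client.
      shutdown(fd, SHUT_WR);

      // Once all outstanding replies have been read, the client destroys
      // the Connection object; RPCs queued behind the cancelled one are
      // aborted and must be retried on a new connection.
    }

The bounded cancellation time comes from the grace-period timer; the cost is tearing down and re-negotiating the connection, which is what hurts at n^2 scale.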
Please feel free to comment on the proposals above or suggest new ideas.

--
Thanks,
Michael