This is an automated email from the ASF dual-hosted git repository.

fchen pushed a commit to branch branch-0.4
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/branch-0.4 by this push:
     new 6d44737e7 [CELEBORN-1275] Fix bug that callback function may hang when unchecked exception missed
6d44737e7 is described below

commit 6d44737e70b21a472f35d85a7e67671093230d84
Author: Fei Wang <[email protected]>
AuthorDate: Fri Feb 23 10:27:54 2024 +0800

    [CELEBORN-1275] Fix bug that callback function may hang when unchecked exception missed
    
    ### What changes were proposed in this pull request?
    Refer to [SPARK-28160](https://issues.apache.org/jira/browse/SPARK-28160) / https://github.com/apache/spark/pull/24964
    `ByteBuffer.allocate` may throw `OutOfMemoryError` when the response is large and not enough memory is available. When this happens, `TransportClient.sendRpcSync` hangs forever if the timeout is set to unlimited, because the callback neither sets a result nor an exception on the future the caller is waiting on.
    
    ### Why are the changes needed?
    To catch any exception thrown by `ByteBuffer.allocate` in this corner case and pass it to the waiting caller instead of letting the call hang.
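    
    The effect can be reproduced outside of Celeborn with a minimal sketch. It is not the project's code: `CompletableFuture` stands in for the future used in `TransportClient`, and the class name, the method names, and the forced `size` parameter are all hypothetical. A negative size makes `ByteBuffer.allocate` throw `IllegalArgumentException`, which plays the role of the `OutOfMemoryError` here.
    
    ```java
    import java.nio.ByteBuffer;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    // Hypothetical sketch; none of these names exist in Celeborn.
    public class CallbackHangSketch {

      // Copy step of the callback. `size` is a parameter only so that a
      // failure can be forced: a negative value makes allocate() throw.
      static ByteBuffer copyOf(ByteBuffer response, int size) {
        ByteBuffer copy = ByteBuffer.allocate(size);
        copy.put(response);
        copy.flip(); // make "copy" readable
        return copy;
      }

      // Before the fix: an exception escapes and `result` is never completed.
      static void onSuccessUnguarded(
          CompletableFuture<ByteBuffer> result, ByteBuffer response, int size) {
        result.complete(copyOf(response, size));
      }

      // After the fix: any Throwable is handed to the waiting caller.
      static void onSuccessGuarded(
          CompletableFuture<ByteBuffer> result, ByteBuffer response, int size) {
        try {
          result.complete(copyOf(response, size));
        } catch (Throwable t) {
          result.completeExceptionally(t);
        }
      }

      // Describes how a caller blocked on the future sees the outcome.
      // A short timeout is used so the sketch terminates.
      static String outcome(CompletableFuture<ByteBuffer> result)
          throws InterruptedException {
        try {
          result.get(200, TimeUnit.MILLISECONDS);
          return "completed";
        } catch (TimeoutException e) {
          return "timed out";
        } catch (ExecutionException e) {
          return "failed: " + e.getCause().getClass().getSimpleName();
        }
      }

      public static void main(String[] args) throws Exception {
        ByteBuffer response = ByteBuffer.wrap(new byte[] {1, 2, 3});

        CompletableFuture<ByteBuffer> unguarded = new CompletableFuture<>();
        try {
          onSuccessUnguarded(unguarded, response.duplicate(), -1);
        } catch (IllegalArgumentException e) {
          // The exception surfaces on the thread that ran the callback,
          // not on the caller that is blocked on the future.
        }
        System.out.println("unguarded: " + outcome(unguarded));

        CompletableFuture<ByteBuffer> guarded = new CompletableFuture<>();
        onSuccessGuarded(guarded, response.duplicate(), -1);
        System.out.println("guarded: " + outcome(guarded));
      }
    }
    ```
    
    Running `main` prints `unguarded: timed out` and `guarded: failed: IllegalArgumentException`. With an unlimited timeout the unguarded case would never return, which is the hang this change addresses.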
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Quoting the local test from https://github.com/apache/spark/pull/24964:
    ```
    I tested in my IDE by setting the value of size to -1 to verify the result. Without this patch, the call does not finish until the timeout (and may hang forever if the timeout is set to MAX_INT); with this patch, the expected IllegalArgumentException is caught.
    
          @Override
          public void onSuccess(ByteBuffer response) {
            try {
              int size = response.remaining();
              ByteBuffer copy = ByteBuffer.allocate(size); // set size to -1 at runtime when debugging
              copy.put(response);
              // flip "copy" to make it readable
              copy.flip();
              result.set(copy);
            } catch (Throwable t) {
              result.setException(t);
            }
          }
    ```
    
    Closes #2316 from turboFei/fix_transport_client_onsucess.
    
    Authored-by: Fei Wang <[email protected]>
    Signed-off-by: chenfu <[email protected]>
    (cherry picked from commit 387bffc0a37f0dceac8ef8b613c48b03d8247480)
    Signed-off-by: chenfu <[email protected]>
---
 .../celeborn/common/network/client/TransportClient.java   | 15 ++++++++++-----
 1 file changed, 10 insertions(+), 5 deletions(-)

diff --git a/common/src/main/java/org/apache/celeborn/common/network/client/TransportClient.java b/common/src/main/java/org/apache/celeborn/common/network/client/TransportClient.java
index b5796fe98..7054c2f93 100644
--- a/common/src/main/java/org/apache/celeborn/common/network/client/TransportClient.java
+++ b/common/src/main/java/org/apache/celeborn/common/network/client/TransportClient.java
@@ -261,11 +261,16 @@ public class TransportClient implements Closeable {
         new RpcResponseCallback() {
           @Override
           public void onSuccess(ByteBuffer response) {
-            ByteBuffer copy = ByteBuffer.allocate(response.remaining());
-            copy.put(response);
-            // flip "copy" to make it readable
-            copy.flip();
-            result.set(copy);
+            try {
+              ByteBuffer copy = ByteBuffer.allocate(response.remaining());
+              copy.put(response);
+              // flip "copy" to make it readable
+              copy.flip();
+              result.set(copy);
+            } catch (Throwable t) {
+              logger.warn("Error in responding RPC callback", t);
+              result.setException(t);
+            }
           }
 
           @Override
