This is an automated email from the ASF dual-hosted git repository.
fchen pushed a commit to branch branch-0.4
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/branch-0.4 by this push:
new 6d44737e7 [CELEBORN-1275] Fix bug that callback function may hang when
unchecked exception missed
6d44737e7 is described below
commit 6d44737e70b21a472f35d85a7e67671093230d84
Author: Fei Wang <[email protected]>
AuthorDate: Fri Feb 23 10:27:54 2024 +0800
[CELEBORN-1275] Fix bug that callback function may hang when unchecked
exception missed
### What changes were proposed in this pull request?
Refer: [SPARK-28160](https://issues.apache.org/jira/browse/SPARK-28160) /
https://github.com/apache/spark/pull/24964
`ByteBuffer.allocate` may throw `OutOfMemoryError` when the response is
large and not enough memory is available. When this happens, the exception
escapes the callback, the result future is never completed, and
`TransportClient.sendRpcSync` hangs forever if the timeout is set to
unlimited.
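
The hang described above can be reproduced in miniature. The following is a
minimal sketch, not Celeborn's code: `CompletableFuture` stands in for the
future that `sendRpcSync` waits on, and `ResponseCallback`, `unguarded`, and
`waiterTimesOut` are hypothetical names. A negative size forces
`ByteBuffer.allocate` to throw, since provoking a real `OutOfMemoryError` is
impractical.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in for the RPC plumbing; not Celeborn's actual classes.
class UnguardedCallbackDemo {

  // Simplified callback interface, assumed for illustration only.
  interface ResponseCallback {
    void onSuccess(ByteBuffer response);
  }

  // Mimics the pre-patch callback: a Throwable escapes before the future is completed.
  static ResponseCallback unguarded(CompletableFuture<ByteBuffer> result, int forcedSize) {
    return response -> {
      ByteBuffer copy = ByteBuffer.allocate(forcedSize); // throws if forcedSize < 0
      copy.put(response);
      copy.flip();
      result.complete(copy);
    };
  }

  // Returns true if the waiter timed out, i.e. the future was never completed.
  static boolean waiterTimesOut(int forcedSize, long timeoutMs) {
    CompletableFuture<ByteBuffer> result = new CompletableFuture<>();
    ResponseCallback callback = unguarded(result, forcedSize);
    // Simulate an I/O thread that invokes the callback; its failure never reaches the waiter.
    Thread io = new Thread(() -> {
      try {
        callback.onSuccess(ByteBuffer.wrap(new byte[] {1, 2, 3}));
      } catch (Throwable ignored) {
        // The exception is lost here; nothing completes the future.
      }
    });
    io.start();
    try {
      io.join();
      result.get(timeoutMs, TimeUnit.MILLISECONDS);
      return false;
    } catch (TimeoutException e) {
      return true;
    } catch (InterruptedException | ExecutionException e) {
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println("size=-1 timed out: " + waiterTimesOut(-1, 200)); // true
    System.out.println("size=3 timed out: " + waiterTimesOut(3, 200));   // false
  }
}
```

With a finite timeout the caller eventually gets a `TimeoutException`; with
an unlimited timeout the same situation would block indefinitely.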
### Why are the changes needed?
To catch any exception thrown by `ByteBuffer.allocate` in this corner case
and pass it on to the waiting caller.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Quoting the local test from https://github.com/apache/spark/pull/24964:
```
I tested in my IDE by setting the value of size to -1 to verify the result.
Without this patch, the call does not finish until the timeout expires (and
may hang forever if the timeout is set to MAX_INT); with this patch, the
expected IllegalArgumentException is caught.
@Override
public void onSuccess(ByteBuffer response) {
  try {
    int size = response.remaining();
    // set size to -1 at runtime when debugging
    ByteBuffer copy = ByteBuffer.allocate(size);
    copy.put(response);
    // flip "copy" to make it readable
    copy.flip();
    result.set(copy);
  } catch (Throwable t) {
    result.setException(t);
  }
}
```
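
The manual IDE check quoted above could also be written as a small
repeatable program. This is a sketch under stated assumptions, not the
project's test: `CompletableFuture` replaces the future used by
`TransportClient`, the buffer size is made an explicit parameter so that -1
can be injected without a debugger, and `GuardedCallbackDemo` and
`failureSeenByWaiter` are hypothetical names.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

// Hypothetical harness; CompletableFuture stands in for the future used by TransportClient.
class GuardedCallbackDemo {

  // Same shape as the patched onSuccess, with the size injectable for testing.
  static void onSuccess(CompletableFuture<ByteBuffer> result, ByteBuffer response, int size) {
    try {
      ByteBuffer copy = ByteBuffer.allocate(size); // throws IllegalArgumentException if size < 0
      copy.put(response);
      // flip "copy" to make it readable
      copy.flip();
      result.complete(copy);
    } catch (Throwable t) {
      // Hand the failure to the waiter instead of leaving the future incomplete.
      result.completeExceptionally(t);
    }
  }

  // Returns the failure cause observed by the waiter, or null if the call succeeded.
  static Throwable failureSeenByWaiter(int size) {
    CompletableFuture<ByteBuffer> result = new CompletableFuture<>();
    onSuccess(result, ByteBuffer.wrap(new byte[] {1, 2, 3}), size);
    try {
      result.get(1, TimeUnit.SECONDS);
      return null;
    } catch (ExecutionException e) {
      return e.getCause();
    } catch (Exception e) {
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println(failureSeenByWaiter(-1)); // an IllegalArgumentException
    System.out.println(failureSeenByWaiter(3));  // null
  }
}
```

Because the catch clause takes `Throwable`, the same path would also cover
an `OutOfMemoryError` from a genuinely oversized response.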
Closes #2316 from turboFei/fix_transport_client_onsucess.
Authored-by: Fei Wang <[email protected]>
Signed-off-by: chenfu <[email protected]>
(cherry picked from commit 387bffc0a37f0dceac8ef8b613c48b03d8247480)
Signed-off-by: chenfu <[email protected]>
---
.../celeborn/common/network/client/TransportClient.java | 15 ++++++++++-----
1 file changed, 10 insertions(+), 5 deletions(-)
diff --git a/common/src/main/java/org/apache/celeborn/common/network/client/TransportClient.java b/common/src/main/java/org/apache/celeborn/common/network/client/TransportClient.java
index b5796fe98..7054c2f93 100644
--- a/common/src/main/java/org/apache/celeborn/common/network/client/TransportClient.java
+++ b/common/src/main/java/org/apache/celeborn/common/network/client/TransportClient.java
@@ -261,11 +261,16 @@ public class TransportClient implements Closeable {
new RpcResponseCallback() {
@Override
public void onSuccess(ByteBuffer response) {
- ByteBuffer copy = ByteBuffer.allocate(response.remaining());
- copy.put(response);
- // flip "copy" to make it readable
- copy.flip();
- result.set(copy);
+ try {
+ ByteBuffer copy = ByteBuffer.allocate(response.remaining());
+ copy.put(response);
+ // flip "copy" to make it readable
+ copy.flip();
+ result.set(copy);
+ } catch (Throwable t) {
+ logger.warn("Error in responding RPC callback", t);
+ result.setException(t);
+ }
}
@Override