This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new b2baef5e6 [#1572] fix(spark): Exceptions might be discarded when 
spilling buffers (#1573)
b2baef5e6 is described below

commit b2baef5e6b43fb8c5ccde5d509b2195b94224a16
Author: RickyMa <[email protected]>
AuthorDate: Thu Mar 14 09:33:42 2024 +0800

    [#1572] fix(spark): Exceptions might be discarded when spilling buffers 
(#1573)
    
    ### What changes were proposed in this pull request?
    
    Handle all the exceptions when spilling buffers.
    Let users know when a TimeoutException occurs.
    
    ### Why are the changes needed?
    
    Fix https://github.com/apache/incubator-uniffle/issues/1572.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Existing UTs.
---
 .../main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java  | 3 +++
 1 file changed, 3 insertions(+)

diff --git 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
index 6c4c41af4..d8261047f 100644
--- 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
+++ 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
@@ -482,6 +482,9 @@ public class WriteBufferManager extends MemoryConsumer {
     } catch (TimeoutException timeoutException) {
       // A best effort strategy to wait.
       // If timeout exception occurs, the underlying tasks won't be cancelled.
+      LOG.warn("[taskId: {}] Spill tasks timeout after {} seconds", taskId, 
memorySpillTimeoutSec);
+    } catch (Exception e) {
+      LOG.warn("[taskId: {}] Failed to spill buffers due to ", taskId, e);
     } finally {
       long releasedSize =
           futures.stream()

Reply via email to