This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch branch-4.11
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/branch-4.11 by this push:
     new 588c0d4  fix: BKAsyncLogWriter swallows rootcause of the WriteException
588c0d4 is described below

commit 588c0d46bec78ad1e01f9b182073a76dc88fb85c
Author: Andrey Yegorov <[email protected]>
AuthorDate: Tue Feb 2 23:30:11 2021 -0800

    fix: BKAsyncLogWriter swallows rootcause of the WriteException
    
    Descriptions of the changes in this PR:
    
    Added the first encountered exception as the root cause of the WriteException
    
    ### Motivation
    
    Simplify troubleshooting: the WriteException now carries the original failure as its cause
    
    ### Changes
    
    Added the first encountered exception as the root cause of the WriteException
    
    Master Issue: #2574
    
    Reviewers: Enrico Olivelli <[email protected]>
    
    This closes #2575 from dlg99/master-alw-exception
    
    (cherry picked from commit 2a6f718140eb01ae426856361dbbdcf80d4755b3)
    Signed-off-by: Enrico Olivelli <[email protected]>
---
 .../main/java/org/apache/distributedlog/BKAsyncLogWriter.java  | 10 +++++++---
 .../org/apache/distributedlog/exceptions/WriteException.java   |  5 +++++
 2 files changed, 12 insertions(+), 3 deletions(-)

diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKAsyncLogWriter.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKAsyncLogWriter.java
index 18e6757..780107e 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKAsyncLogWriter.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKAsyncLogWriter.java
@@ -25,6 +25,7 @@ import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
 import org.apache.bookkeeper.common.concurrent.FutureEventListener;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
@@ -91,6 +92,7 @@ class BKAsyncLogWriter extends BKAbstractLogWriter implements AsyncLogWriter {
         @Override
         public void onFailure(Throwable cause) {
             promise.completeExceptionally(cause);
+            firstEncounteredError.compareAndSet(null, cause);
             encounteredError = true;
         }
     }
@@ -126,6 +128,7 @@ class BKAsyncLogWriter extends BKAbstractLogWriter implements AsyncLogWriter {
     private final boolean disableRollOnSegmentError;
     private LinkedList<PendingLogRecord> pendingRequests = null;
     private volatile boolean encounteredError = false;
+    private final AtomicReference<Throwable> firstEncounteredError = new AtomicReference<>(null);
     private CompletableFuture<BKLogSegmentWriter> rollingFuture = null;
     private long lastTxId = DistributedLogConstants.INVALID_TXID;
 
@@ -187,7 +190,7 @@ class BKAsyncLogWriter extends BKAbstractLogWriter implements AsyncLogWriter {
     private BKLogSegmentWriter getCachedLogSegmentWriter() throws WriteException {
         if (encounteredError) {
             throw new WriteException(bkDistributedLogManager.getStreamName(),
-                    "writer has been closed due to error.");
+                    "writer has been closed due to error.", firstEncounteredError.get());
         }
         BKLogSegmentWriter segmentWriter = getCachedLogWriter();
         if (null != segmentWriter
@@ -216,7 +219,7 @@ class BKAsyncLogWriter extends BKAbstractLogWriter implements AsyncLogWriter {
                                                              final boolean allowMaxTxID) {
         if (encounteredError) {
             return FutureUtils.exception(new WriteException(bkDistributedLogManager.getStreamName(),
-                    "writer has been closed due to error."));
+                    "writer has been closed due to error.", firstEncounteredError.get()));
         }
         CompletableFuture<BKLogSegmentWriter> writerFuture = asyncGetLedgerWriter(!disableRollOnSegmentError);
         if (null == writerFuture) {
@@ -383,6 +386,7 @@ class BKAsyncLogWriter extends BKAbstractLogWriter implements AsyncLogWriter {
         final List<PendingLogRecord> pendingRequestsSnapshot;
         synchronized (this) {
             pendingRequestsSnapshot = pendingRequests;
+            firstEncounteredError.compareAndSet(null, cause);
             encounteredError = errorOutWriter;
             pendingRequests = null;
             if (null != rollingFuture) {
@@ -517,7 +521,7 @@ class BKAsyncLogWriter extends BKAbstractLogWriter implements AsyncLogWriter {
                 for (PendingLogRecord pendingLogRecord : pendingRequests) {
                     pendingLogRecord.promise
                             .completeExceptionally(new WriteException(bkDistributedLogManager.getStreamName(),
-                            "abort wring: writer has been closed due to error."));
+                            "abort writing: writer has been closed due to error."));
                 }
             }
         }
diff --git a/stream/distributedlog/protocol/src/main/java/org/apache/distributedlog/exceptions/WriteException.java b/stream/distributedlog/protocol/src/main/java/org/apache/distributedlog/exceptions/WriteException.java
index 1d9c2a9..549fd41 100644
--- a/stream/distributedlog/protocol/src/main/java/org/apache/distributedlog/exceptions/WriteException.java
+++ b/stream/distributedlog/protocol/src/main/java/org/apache/distributedlog/exceptions/WriteException.java
@@ -28,4 +28,9 @@ public class WriteException extends DLException {
         super(StatusCode.WRITE_EXCEPTION,
             "Write rejected because stream " + stream + " has encountered an error : " + transmitError);
     }
+
+    public WriteException(String stream, String transmitError, Throwable cause) {
+        super(StatusCode.WRITE_EXCEPTION,
+                "Write rejected because stream " + stream + " has encountered an error : " + transmitError, cause);
+    }
 }

Reply via email to