This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch branch-4.11
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/branch-4.11 by this push:
new 588c0d4 fix: BKAsyncLogWriter swallows rootcause of the WriteException
588c0d4 is described below
commit 588c0d46bec78ad1e01f9b182073a76dc88fb85c
Author: Andrey Yegorov <[email protected]>
AuthorDate: Tue Feb 2 23:30:11 2021 -0800
fix: BKAsyncLogWriter swallows rootcause of the WriteException
Descriptions of the changes in this PR:
Added the first encountered exception as the root cause of the WriteException.
### Motivation
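Simplify troubleshooting: the WriteException previously said only "writer has been closed due to error." and did not carry the underlying exception.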
### Changes
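BKAsyncLogWriter now records the first encountered error and passes it as the root cause of the WriteException, via a new WriteException constructor that accepts a cause.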
Master Issue: #2574
Reviewers: Enrico Olivelli <[email protected]>
This closes #2575 from dlg99/master-alw-exception
(cherry picked from commit 2a6f718140eb01ae426856361dbbdcf80d4755b3)
Signed-off-by: Enrico Olivelli <[email protected]>
---
.../main/java/org/apache/distributedlog/BKAsyncLogWriter.java | 10 +++++++---
.../org/apache/distributedlog/exceptions/WriteException.java | 5 +++++
2 files changed, 12 insertions(+), 3 deletions(-)
diff --git
a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKAsyncLogWriter.java
b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKAsyncLogWriter.java
index 18e6757..780107e 100644
---
a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKAsyncLogWriter.java
+++
b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKAsyncLogWriter.java
@@ -25,6 +25,7 @@ import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import org.apache.bookkeeper.common.concurrent.FutureEventListener;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
@@ -91,6 +92,7 @@ class BKAsyncLogWriter extends BKAbstractLogWriter implements
AsyncLogWriter {
@Override
public void onFailure(Throwable cause) {
promise.completeExceptionally(cause);
+ firstEncounteredError.compareAndSet(null, cause);
encounteredError = true;
}
}
@@ -126,6 +128,7 @@ class BKAsyncLogWriter extends BKAbstractLogWriter
implements AsyncLogWriter {
private final boolean disableRollOnSegmentError;
private LinkedList<PendingLogRecord> pendingRequests = null;
private volatile boolean encounteredError = false;
+ private final AtomicReference<Throwable> firstEncounteredError = new
AtomicReference<>(null);
private CompletableFuture<BKLogSegmentWriter> rollingFuture = null;
private long lastTxId = DistributedLogConstants.INVALID_TXID;
@@ -187,7 +190,7 @@ class BKAsyncLogWriter extends BKAbstractLogWriter
implements AsyncLogWriter {
private BKLogSegmentWriter getCachedLogSegmentWriter() throws
WriteException {
if (encounteredError) {
throw new WriteException(bkDistributedLogManager.getStreamName(),
- "writer has been closed due to error.");
+ "writer has been closed due to error.",
firstEncounteredError.get());
}
BKLogSegmentWriter segmentWriter = getCachedLogWriter();
if (null != segmentWriter
@@ -216,7 +219,7 @@ class BKAsyncLogWriter extends BKAbstractLogWriter
implements AsyncLogWriter {
final boolean
allowMaxTxID) {
if (encounteredError) {
return FutureUtils.exception(new
WriteException(bkDistributedLogManager.getStreamName(),
- "writer has been closed due to error."));
+ "writer has been closed due to error.",
firstEncounteredError.get()));
}
CompletableFuture<BKLogSegmentWriter> writerFuture =
asyncGetLedgerWriter(!disableRollOnSegmentError);
if (null == writerFuture) {
@@ -383,6 +386,7 @@ class BKAsyncLogWriter extends BKAbstractLogWriter
implements AsyncLogWriter {
final List<PendingLogRecord> pendingRequestsSnapshot;
synchronized (this) {
pendingRequestsSnapshot = pendingRequests;
+ firstEncounteredError.compareAndSet(null, cause);
encounteredError = errorOutWriter;
pendingRequests = null;
if (null != rollingFuture) {
@@ -517,7 +521,7 @@ class BKAsyncLogWriter extends BKAbstractLogWriter
implements AsyncLogWriter {
for (PendingLogRecord pendingLogRecord : pendingRequests) {
pendingLogRecord.promise
.completeExceptionally(new
WriteException(bkDistributedLogManager.getStreamName(),
- "abort wring: writer has been closed due to
error."));
+ "abort writing: writer has been closed due to
error."));
}
}
}
diff --git
a/stream/distributedlog/protocol/src/main/java/org/apache/distributedlog/exceptions/WriteException.java
b/stream/distributedlog/protocol/src/main/java/org/apache/distributedlog/exceptions/WriteException.java
index 1d9c2a9..549fd41 100644
---
a/stream/distributedlog/protocol/src/main/java/org/apache/distributedlog/exceptions/WriteException.java
+++
b/stream/distributedlog/protocol/src/main/java/org/apache/distributedlog/exceptions/WriteException.java
@@ -28,4 +28,9 @@ public class WriteException extends DLException {
super(StatusCode.WRITE_EXCEPTION,
"Write rejected because stream " + stream + " has encountered an
error : " + transmitError);
}
+
+ public WriteException(String stream, String transmitError, Throwable
cause) {
+ super(StatusCode.WRITE_EXCEPTION,
+ "Write rejected because stream " + stream + " has encountered
an error : " + transmitError, cause);
+ }
}