This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 07de52d [pulsar-broker] minor fix for removing atomic-updater at managed-ledger (#4146)
07de52d is described below
commit 07de52d35f04e0bda71d6cdf79ded591cd9040aa
Author: Rajan Dhabalia <[email protected]>
AuthorDate: Fri Apr 26 14:05:43 2019 -0700
[pulsar-broker] minor fix for removing atomic-updater at managed-ledger (#4146)
---
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 18 ++++++++----------
1 file changed, 8 insertions(+), 10 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 4f72ba3..6086259 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -236,8 +236,6 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
.newUpdater(ManagedLedgerImpl.class, "readOpCount");
private volatile long readOpCount = 0;
// last read-operation's callback to check read-timeout on it.
- private static final AtomicReferenceFieldUpdater<ManagedLedgerImpl,
ReadEntryCallbackWrapper> LAST_READ_CALLBACK = AtomicReferenceFieldUpdater
- .newUpdater(ManagedLedgerImpl.class,
ReadEntryCallbackWrapper.class, "lastReadCallback");
private volatile ReadEntryCallbackWrapper lastReadCallback = null;
/**
@@ -1592,7 +1590,7 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
long createdTime = System.nanoTime();
ReadEntryCallbackWrapper readCallback =
ReadEntryCallbackWrapper.create(name, position.getLedgerId(),
position.getEntryId(), callback, readOpCount, createdTime,
ctx);
- LAST_READ_CALLBACK.set(this, readCallback);
+ lastReadCallback = readCallback;
entryCache.asyncReadEntry(ledger, position, readCallback,
readOpCount);
} else {
entryCache.asyncReadEntry(ledger, position, callback, ctx);
@@ -1607,7 +1605,7 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
long createdTime = System.nanoTime();
ReadEntryCallbackWrapper readCallback =
ReadEntryCallbackWrapper.create(name, ledger.getId(), firstEntry,
opReadEntry, readOpCount, createdTime, ctx);
- LAST_READ_CALLBACK.set(this, readCallback);
+ lastReadCallback = readCallback;
entryCache.asyncReadEntry(ledger, firstEntry, lastEntry,
isSlowestReader, readCallback, readOpCount);
} else {
entryCache.asyncReadEntry(ledger, firstEntry, lastEntry,
isSlowestReader, opReadEntry, ctx);
@@ -3128,12 +3126,12 @@ public class ManagedLedgerImpl implements
ManagedLedger, CreateCallback {
if (timeoutSec < 1) {
return;
}
- ReadEntryCallbackWrapper callback = LAST_READ_CALLBACK.get(this);
- if (callback != null && callback.isTimedOut(timeoutSec)) {
- log.warn("[{}]-{} read entry timeout for {} after {} sec",
this.name, callback.ledgerId, callback.entryId,
- timeoutSec);
-
callback.readFailed(createManagedLedgerException(BKException.Code.TimeoutException),
callback.readOpCount);
- LAST_READ_CALLBACK.set(this, null);
+ if (this.lastReadCallback != null &&
this.lastReadCallback.isTimedOut(timeoutSec)) {
+ log.warn("[{}]-{} read entry timeout for {} after {} sec",
this.name, this.lastReadCallback.ledgerId,
+ this.lastReadCallback.entryId, timeoutSec);
+
this.lastReadCallback.readFailed(createManagedLedgerException(BKException.Code.TimeoutException),
+ this.lastReadCallback.readOpCount);
+ lastReadCallback = null;
}
}