This is an automated email from the ASF dual-hosted git repository.
ivank pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 7ec1966 Disallow direct access to LedgerHandle#metadata
7ec1966 is described below
commit 7ec196624f0cd0cd1d57b47d24d35788adfcbcde
Author: Ivan Kelly <[email protected]>
AuthorDate: Tue Jul 31 19:25:59 2018 +0200
Disallow direct access to LedgerHandle#metadata
This object has been accessed and mutated all over the client, which
makes it hard to do anything with the object. This patch removes the
direct accesses, so the object can only be accessed through an
accessor.
Master issue: #281
Author: Ivan Kelly <[email protected]>
Reviewers: Enrico Olivelli <[email protected]>
This closes #1574 from ivankelly/onlyaccessmdthroughaccessor
---
.../apache/bookkeeper/client/ForceLedgerOp.java | 2 +-
.../client/LedgerFragmentReplicator.java | 9 +++-
.../org/apache/bookkeeper/client/LedgerHandle.java | 62 +++++++++++++++-------
.../apache/bookkeeper/client/LedgerHandleAdv.java | 2 +-
.../org/apache/bookkeeper/client/PendingAddOp.java | 7 +--
.../apache/bookkeeper/client/PendingReadLacOp.java | 9 ++--
.../apache/bookkeeper/client/PendingReadOp.java | 2 +-
.../bookkeeper/client/PendingWriteLacOp.java | 4 +-
.../bookkeeper/client/ReadLastConfirmedOp.java | 14 ++---
.../bookkeeper/client/ReadOnlyLedgerHandle.java | 36 +++++++------
.../bookkeeper/client/TryReadLastConfirmedOp.java | 9 ++--
.../org/apache/bookkeeper/client/ClientUtil.java | 2 +-
.../client/ParallelLedgerRecoveryTest.java | 18 ++++---
.../bookkeeper/client/TestPendingReadLacOp.java | 8 +--
14 files changed, 115 insertions(+), 69 deletions(-)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ForceLedgerOp.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ForceLedgerOp.java
index cd60848..ae0e051 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ForceLedgerOp.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ForceLedgerOp.java
@@ -69,7 +69,7 @@ class ForceLedgerOp extends SafeRunnable implements
ForceLedgerCallback {
LOG.debug("force {} clientNonDurableLac {}", lh.ledgerId,
currentNonDurableLastAddConfirmed);
}
// we need to send the request to every bookie in the ensamble
- this.currentEnsemble = lh.metadata.currentEnsemble;
+ this.currentEnsemble = lh.getLedgerMetadata().currentEnsemble;
this.ackSet = lh.distributionSchedule.getEnsembleAckSet();
DistributionSchedule.WriteSet writeSet = lh.getDistributionSchedule()
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
index c1440b7..538cfb7 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
@@ -430,7 +430,14 @@ public class LedgerFragmentReplicator {
ensembleUpdatedCb.processResult(rc, null,
null);
} else {
- lh.metadata = newMeta;
+ while (true) {
+ // temporary change, metadata really
shouldn't be updated
+ // until the new metadata has been
written successfully
+ LedgerMetadata currentMetadata =
lh.getLedgerMetadata();
+ if
(lh.setLedgerMetadata(currentMetadata, newMeta)) {
+ break;
+ }
+ }
updateEnsembleInfo(ensembleUpdatedCb,
fragmentStartId, lh,
oldBookie2NewBookie);
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index 3a610df..90b9884 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -102,7 +102,7 @@ public class LedgerHandle implements WriteHandle {
static final long PENDINGREQ_NOTWRITABLE_MASK = 0x01L << 62;
final byte[] ledgerKey;
- LedgerMetadata metadata;
+ private LedgerMetadata metadata;
final BookKeeper bk;
final long ledgerId;
long lastAddPushed;
@@ -274,7 +274,9 @@ public class LedgerHandle implements WriteHandle {
}
protected void initializeExplicitLacFlushPolicy() {
- if (!metadata.isClosed() && !(this instanceof ReadOnlyLedgerHandle) &&
bk.getExplicitLacInterval() > 0) {
+ if (!getLedgerMetadata().isClosed()
+ && !(this instanceof ReadOnlyLedgerHandle)
+ && bk.getExplicitLacInterval() > 0) {
explicitLacFlushPolicy = new
ExplicitLacFlushPolicy.ExplicitLacFlushPolicyImpl(this);
} else {
explicitLacFlushPolicy =
ExplicitLacFlushPolicy.VOID_EXPLICITLAC_FLUSH_POLICY;
@@ -333,13 +335,25 @@ public class LedgerHandle implements WriteHandle {
return metadata;
}
+ boolean setLedgerMetadata(LedgerMetadata expected, LedgerMetadata
newMetadata) {
+ synchronized (this) {
+ // ensure that we only update the metadata if it is the object we
expect it to be
+ if (metadata == expected) {
+ metadata = newMetadata;
+ return true;
+ } else {
+ return false;
+ }
+ }
+ }
+
/**
* Get this ledger's customMetadata map.
*
* @return map containing user provided customMetadata.
*/
public Map<String, byte[]> getCustomMetadata() {
- return metadata.getCustomMetadata();
+ return getLedgerMetadata().getCustomMetadata();
}
/**
@@ -348,7 +362,7 @@ public class LedgerHandle implements WriteHandle {
* @return the count of fragments
*/
public synchronized long getNumFragments() {
- return metadata.getEnsembles().size();
+ return getLedgerMetadata().getEnsembles().size();
}
/**
@@ -358,7 +372,7 @@ public class LedgerHandle implements WriteHandle {
* @return count of unique bookies
*/
public synchronized long getNumBookies() {
- Map<Long, ArrayList<BookieSocketAddress>> m = metadata.getEnsembles();
+ Map<Long, ArrayList<BookieSocketAddress>> m =
getLedgerMetadata().getEnsembles();
Set<BookieSocketAddress> s = Sets.newHashSet();
for (ArrayList<BookieSocketAddress> aList : m.values()) {
s.addAll(aList);
@@ -402,7 +416,7 @@ public class LedgerHandle implements WriteHandle {
* @return the ledger creation time
*/
public long getCtime() {
- return this.metadata.getCtime();
+ return getLedgerMetadata().getCtime();
}
/**
@@ -425,10 +439,10 @@ public class LedgerHandle implements WriteHandle {
void writeLedgerConfig(GenericCallback<LedgerMetadata> writeCb) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Writing metadata to ledger manager: {}, {}",
this.ledgerId, metadata.getVersion());
+ LOG.debug("Writing metadata to ledger manager: {}, {}",
this.ledgerId, getLedgerMetadata().getVersion());
}
- bk.getLedgerManager().writeLedgerMetadata(ledgerId, metadata, writeCb);
+ bk.getLedgerManager().writeLedgerMetadata(ledgerId,
getLedgerMetadata(), writeCb);
}
/**
@@ -476,7 +490,7 @@ public class LedgerHandle implements WriteHandle {
*/
@Override
public synchronized boolean isClosed() {
- return metadata.isClosed();
+ return getLedgerMetadata().isClosed();
}
void asyncCloseInternal(final CloseCallback cb, final Object ctx, final
int rc) {
@@ -536,6 +550,7 @@ public class LedgerHandle implements WriteHandle {
}
synchronized (LedgerHandle.this) {
+ LedgerMetadata metadata = getLedgerMetadata();
prevState = metadata.getState();
prevLastEntryId = metadata.getLastEntryId();
prevLength = metadata.getLength();
@@ -556,6 +571,7 @@ public class LedgerHandle implements WriteHandle {
errorOutPendingAdds(rc, pendingAdds);
if (LOG.isDebugEnabled()) {
+ LedgerMetadata metadata = getLedgerMetadata();
LOG.debug("Closing ledger: " + ledgerId + " at entryId: "
+ metadata.getLastEntryId() + " with this many
bytes: " + metadata.getLength());
}
@@ -577,6 +593,7 @@ public class LedgerHandle implements WriteHandle {
ledgerId,
BKException.codeLogger(newrc));
cb.closeComplete(rc,
LedgerHandle.this, ctx);
} else {
+ LedgerMetadata metadata =
getLedgerMetadata();
metadata.setState(prevState);
if (prevState.equals(State.CLOSED)) {
metadata.close(prevLastEntryId);
@@ -1120,7 +1137,7 @@ public class LedgerHandle implements WriteHandle {
// synchronized on this to ensure that
// the ledger isn't closed between checking and
// updating lastAddPushed
- if (metadata.isClosed()) {
+ if (getLedgerMetadata().isClosed()) {
wasClosed = true;
}
}
@@ -1198,7 +1215,7 @@ public class LedgerHandle implements WriteHandle {
int nonWritableCount = 0;
for (int i = 0; i < sz; i++) {
- if
(!bk.getBookieClient().isWritable(metadata.currentEnsemble.get(i), key)) {
+ if
(!bk.getBookieClient().isWritable(getLedgerMetadata().currentEnsemble.get(i),
key)) {
nonWritableCount++;
if (nonWritableCount >= allowedNonWritableCount) {
return false;
@@ -1277,7 +1294,7 @@ public class LedgerHandle implements WriteHandle {
// synchronized on this to ensure that
// the ledger isn't closed between checking and
// updating lastAddPushed
- if (metadata.isClosed()) {
+ if (getLedgerMetadata().isClosed()) {
wasClosed = true;
} else {
long entryId = ++lastAddPushed;
@@ -1358,6 +1375,7 @@ public class LedgerHandle implements WriteHandle {
boolean isClosed;
long lastEntryId;
synchronized (this) {
+ LedgerMetadata metadata = getLedgerMetadata();
isClosed = metadata.isClosed();
lastEntryId = metadata.getLastEntryId();
}
@@ -1399,6 +1417,7 @@ public class LedgerHandle implements WriteHandle {
boolean isClosed;
long lastEntryId;
synchronized (this) {
+ LedgerMetadata metadata = getLedgerMetadata();
isClosed = metadata.isClosed();
lastEntryId = metadata.getLastEntryId();
}
@@ -1486,6 +1505,7 @@ public class LedgerHandle implements WriteHandle {
boolean isClosed;
long lac;
synchronized (this) {
+ LedgerMetadata metadata = getLedgerMetadata();
isClosed = metadata.isClosed();
lac = metadata.getLastEntryId();
}
@@ -1653,6 +1673,7 @@ public class LedgerHandle implements WriteHandle {
public void asyncReadExplicitLastConfirmed(final ReadLastConfirmedCallback
cb, final Object ctx) {
boolean isClosed;
synchronized (this) {
+ LedgerMetadata metadata = getLedgerMetadata();
isClosed = metadata.isClosed();
if (isClosed) {
lastAddConfirmed = metadata.getLastEntryId();
@@ -1714,7 +1735,7 @@ public class LedgerHandle implements WriteHandle {
// close the ledger and send fails to all the adds in the pipeline
void handleUnrecoverableErrorDuringAdd(int rc) {
- if (metadata.isInRecovery()) {
+ if (getLedgerMetadata().isInRecovery()) {
// we should not close ledger if ledger is recovery mode
// otherwise we may lose entry.
errorOutPendingAdds(rc);
@@ -1796,6 +1817,7 @@ public class LedgerHandle implements WriteHandle {
final ArrayList<BookieSocketAddress> newEnsemble = new
ArrayList<BookieSocketAddress>();
final long newEnsembleStartEntry = getLastAddConfirmed() + 1;
final HashSet<Integer> replacedBookies = new HashSet<Integer>();
+ final LedgerMetadata metadata = getLedgerMetadata();
synchronized (metadata) {
newEnsemble.addAll(metadata.currentEnsemble);
for (Map.Entry<Integer, BookieSocketAddress> entry :
failedBookies.entrySet()) {
@@ -1869,6 +1891,7 @@ public class LedgerHandle implements WriteHandle {
}
return;
}
+ LedgerMetadata metadata = getLedgerMetadata();
synchronized (metadata) {
try {
EnsembleInfo ensembleInfo =
replaceBookieInMetadata(delayedWriteFailedBookies, curNumEnsembleChanges);
@@ -1922,6 +1945,7 @@ public class LedgerHandle implements WriteHandle {
handleUnrecoverableErrorDuringAdd(WriteException);
return;
}
+ LedgerMetadata metadata = getLedgerMetadata();
synchronized (metadata) {
try {
EnsembleInfo ensembleInfo =
replaceBookieInMetadata(failedBookies, curNumEnsembleChanges);
@@ -2075,7 +2099,7 @@ public class LedgerHandle implements WriteHandle {
LOG.error("[EnsembleChange-L{}-{}] : could not resolve
ledger metadata conflict"
+ " while changing ensemble to: {}, local
meta data is \n {} \n,"
+ " zk meta data is \n {} \n, closing
ledger",
- ledgerId, ensembleChangeIdx,
ensembleInfo.newEnsemble, metadata, newMeta);
+ ledgerId, ensembleChangeIdx,
ensembleInfo.newEnsemble, getLedgerMetadata(), newMeta);
handleUnrecoverableErrorDuringAdd(rc);
}
}
@@ -2096,6 +2120,7 @@ public class LedgerHandle implements WriteHandle {
* </p>
*/
private boolean resolveConflict(LedgerMetadata newMeta) {
+ LedgerMetadata metadata = getLedgerMetadata();
if (LOG.isDebugEnabled()) {
LOG.debug("[EnsembleChange-L{}-{}] : resolving conflicts -
local metadata = \n {} \n,"
+ " zk metadata = \n {} \n", ledgerId, ensembleChangeIdx,
metadata, newMeta);
@@ -2125,7 +2150,7 @@ public class LedgerHandle implements WriteHandle {
}
if (-1 == diff) {
// Case 1: metadata is changed by other ones (e.g.
Recovery)
- return updateMetadataIfPossible(newMeta);
+ return updateMetadataIfPossible(metadata, newMeta);
}
return false;
}
@@ -2142,7 +2167,7 @@ public class LedgerHandle implements WriteHandle {
// didn't finish, so try to resolve conflicts with the
metadata read from zookeeper and
// update ensemble changed metadata again.
if (areFailedBookiesReplaced(metadata, ensembleInfo)) {
- return updateMetadataIfPossible(newMeta);
+ return updateMetadataIfPossible(metadata, newMeta);
}
} else {
ensembleChangeCounter.inc();
@@ -2177,7 +2202,7 @@ public class LedgerHandle implements WriteHandle {
return replaced;
}
- private boolean updateMetadataIfPossible(LedgerMetadata newMeta) {
+ private boolean updateMetadataIfPossible(LedgerMetadata metadata,
LedgerMetadata newMeta) {
// if the local metadata is newer than zookeeper metadata, it
means that metadata is updated
// again when it was trying re-reading the metatada, re-kick the
reread again
if (metadata.isNewerThan(newMeta)) {
@@ -2260,6 +2285,7 @@ public class LedgerHandle implements WriteHandle {
boolean wasClosed = false;
boolean wasInRecovery = false;
+ LedgerMetadata metadata = getLedgerMetadata();
synchronized (this) {
if (metadata.isClosed()) {
if (forceRecovery) {
@@ -2311,7 +2337,7 @@ public class LedgerHandle implements WriteHandle {
if (rc != BKException.Code.OK) {
cb.operationComplete(rc, null);
} else {
- metadata = newMeta;
+ LedgerHandle.this.metadata = newMeta;
recover(cb, listener, forceRecovery);
}
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
index 70e9430..153ceeb 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
@@ -206,7 +206,7 @@ public class LedgerHandleAdv extends LedgerHandle
implements WriteAdvHandle {
// synchronized on this to ensure that
// the ledger isn't closed between checking and
// updating lastAddPushed
- if (metadata.isClosed()) {
+ if (getLedgerMetadata().isClosed()) {
wasClosed = true;
} else {
long currentLength = addToLength(op.payload.readableBytes());
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
index 3c3a1dd..5f17226 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
@@ -138,8 +138,9 @@ class PendingAddOp extends SafeRunnable implements
WriteCallback {
void sendWriteRequest(int bookieIndex) {
int flags = isRecoveryAdd ? FLAG_RECOVERY_ADD | FLAG_HIGH_PRIORITY :
FLAG_NONE;
-
lh.bk.getBookieClient().addEntry(lh.metadata.currentEnsemble.get(bookieIndex),
lh.ledgerId, lh.ledgerKey,
- entryId, toSend, this, bookieIndex, flags, allowFailFast,
lh.writeFlags);
+
lh.bk.getBookieClient().addEntry(lh.getLedgerMetadata().currentEnsemble.get(bookieIndex),
+ lh.ledgerId, lh.ledgerKey, entryId,
toSend, this, bookieIndex,
+ flags, allowFailFast, lh.writeFlags);
++pendingWriteRequests;
}
@@ -265,7 +266,7 @@ class PendingAddOp extends SafeRunnable implements
WriteCallback {
int bookieIndex = (Integer) ctx;
--pendingWriteRequests;
- if (!lh.metadata.currentEnsemble.get(bookieIndex).equals(addr)) {
+ if
(!lh.getLedgerMetadata().currentEnsemble.get(bookieIndex).equals(addr)) {
// ensemble has already changed, failure of this addr is immaterial
if (LOG.isDebugEnabled()) {
LOG.debug("Write did not succeed: " + ledgerId + ", " +
entryId + ". But we have already fixed it.");
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadLacOp.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadLacOp.java
index db87d89..7b3aa9f 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadLacOp.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadLacOp.java
@@ -60,13 +60,14 @@ class PendingReadLacOp implements ReadLacCallback {
PendingReadLacOp(LedgerHandle lh, LacCallback cb) {
this.lh = lh;
this.cb = cb;
- this.numResponsesPending = lh.metadata.getEnsembleSize();
+ this.numResponsesPending = lh.getLedgerMetadata().getEnsembleSize();
this.coverageSet = lh.distributionSchedule.getCoverageSet();
}
public void initiate() {
- for (int i = 0; i < lh.metadata.currentEnsemble.size(); i++) {
- lh.bk.getBookieClient().readLac(lh.metadata.currentEnsemble.get(i),
+ LedgerMetadata metadata = lh.getLedgerMetadata();
+ for (int i = 0; i < metadata.currentEnsemble.size(); i++) {
+ lh.bk.getBookieClient().readLac(metadata.currentEnsemble.get(i),
lh.ledgerId, this, i);
}
}
@@ -117,7 +118,7 @@ class PendingReadLacOp implements ReadLacCallback {
// Too bad, this bookie did not give us a valid answer, we
// still might be able to recover. So, continue
LOG.error("Mac mismatch while reading ledger: " + ledgerId +
" LAC from bookie: "
- + lh.metadata.currentEnsemble.get(bookieIndex));
+ +
lh.getLedgerMetadata().currentEnsemble.get(bookieIndex));
rc = BKException.Code.DigestMatchException;
}
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
index a6ffd32..76332df 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
@@ -493,7 +493,7 @@ class PendingReadOp implements ReadEntryCallback,
SafeRunnable {
}
protected LedgerMetadata getLedgerMetadata() {
- return lh.metadata;
+ return lh.getLedgerMetadata();
}
protected void cancelSpeculativeTask(boolean mayInterruptIfRunning) {
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingWriteLacOp.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingWriteLacOp.java
index af45e29..d163130 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingWriteLacOp.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingWriteLacOp.java
@@ -70,8 +70,8 @@ class PendingWriteLacOp implements WriteLacCallback {
}
void sendWriteLacRequest(int bookieIndex) {
-
lh.bk.getBookieClient().writeLac(lh.metadata.currentEnsemble.get(bookieIndex),
lh.ledgerId, lh.ledgerKey,
- lac, toSend, this, bookieIndex);
+
lh.bk.getBookieClient().writeLac(lh.getLedgerMetadata().currentEnsemble.get(bookieIndex),
+ lh.ledgerId, lh.ledgerKey, lac,
toSend, this, bookieIndex);
}
void initiate(ByteBufList toSend) {
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedOp.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedOp.java
index 2cb6152..112a145 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedOp.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedOp.java
@@ -52,13 +52,14 @@ class ReadLastConfirmedOp implements ReadEntryCallback {
this.cb = cb;
this.maxRecoveredData = new
RecoveryData(LedgerHandle.INVALID_ENTRY_ID, 0);
this.lh = lh;
- this.numResponsesPending = lh.metadata.getEnsembleSize();
+ this.numResponsesPending = lh.getLedgerMetadata().getEnsembleSize();
this.coverageSet = lh.distributionSchedule.getCoverageSet();
}
public void initiate() {
- for (int i = 0; i < lh.metadata.currentEnsemble.size(); i++) {
-
lh.bk.getBookieClient().readEntry(lh.metadata.currentEnsemble.get(i),
+ LedgerMetadata metadata = lh.getLedgerMetadata();
+ for (int i = 0; i < metadata.currentEnsemble.size(); i++) {
+ lh.bk.getBookieClient().readEntry(metadata.currentEnsemble.get(i),
lh.ledgerId,
BookieProtocol.LAST_ADD_CONFIRMED,
this, i, BookieProtocol.FLAG_NONE);
@@ -66,8 +67,9 @@ class ReadLastConfirmedOp implements ReadEntryCallback {
}
public void initiateWithFencing() {
- for (int i = 0; i < lh.metadata.currentEnsemble.size(); i++) {
-
lh.bk.getBookieClient().readEntry(lh.metadata.currentEnsemble.get(i),
+ LedgerMetadata metadata = lh.getLedgerMetadata();
+ for (int i = 0; i < metadata.currentEnsemble.size(); i++) {
+ lh.bk.getBookieClient().readEntry(metadata.currentEnsemble.get(i),
lh.ledgerId,
BookieProtocol.LAST_ADD_CONFIRMED,
this, i,
BookieProtocol.FLAG_DO_FENCING,
@@ -96,7 +98,7 @@ class ReadLastConfirmedOp implements ReadEntryCallback {
// still might be able to recover though so continue
LOG.error("Mac mismatch for ledger: " + ledgerId + ", entry: "
+ entryId
+ " while reading last entry from bookie: "
- + lh.metadata.currentEnsemble.get(bookieIndex));
+ +
lh.getLedgerMetadata().currentEnsemble.get(bookieIndex));
}
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java
index f2433ed..3cc59a0 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java
@@ -45,24 +45,30 @@ class ReadOnlyLedgerHandle extends LedgerHandle implements
LedgerMetadataListene
class MetadataUpdater extends SafeRunnable {
- final LedgerMetadata m;
+ final LedgerMetadata newMetadata;
MetadataUpdater(LedgerMetadata metadata) {
- this.m = metadata;
+ this.newMetadata = metadata;
}
@Override
public void safeRun() {
- Version.Occurred occurred =
-
ReadOnlyLedgerHandle.this.metadata.getVersion().compare(this.m.getVersion());
- if (Version.Occurred.BEFORE == occurred) {
- LOG.info("Updated ledger metadata for ledger {} to {}.",
ledgerId, this.m.toSafeString());
- synchronized (ReadOnlyLedgerHandle.this) {
- if (this.m.isClosed()) {
- ReadOnlyLedgerHandle.this.lastAddConfirmed =
this.m.getLastEntryId();
- ReadOnlyLedgerHandle.this.length =
this.m.getLength();
+ while (true) {
+ LedgerMetadata currentMetadata = getLedgerMetadata();
+ Version.Occurred occurred =
currentMetadata.getVersion().compare(newMetadata.getVersion());
+ if (Version.Occurred.BEFORE == occurred) {
+ LOG.info("Updated ledger metadata for ledger {} to {}.",
ledgerId, newMetadata.toSafeString());
+ synchronized (ReadOnlyLedgerHandle.this) {
+ if (newMetadata.isClosed()) {
+ ReadOnlyLedgerHandle.this.lastAddConfirmed =
newMetadata.getLastEntryId();
+ ReadOnlyLedgerHandle.this.length =
newMetadata.getLength();
+ }
+ if (setLedgerMetadata(currentMetadata, newMetadata)) {
+ break;
+ }
}
- ReadOnlyLedgerHandle.this.metadata = this.m;
+ } else {
+ break;
}
}
}
@@ -123,7 +129,7 @@ class ReadOnlyLedgerHandle extends LedgerHandle implements
LedgerMetadataListene
@Override
void handleBookieFailure(final Map<Integer, BookieSocketAddress>
failedBookies) {
blockAddCompletions.incrementAndGet();
- synchronized (metadata) {
+ synchronized (getLedgerMetadata()) {
try {
EnsembleInfo ensembleInfo =
replaceBookieInMetadata(failedBookies,
numEnsembleChanges.incrementAndGet());
@@ -154,11 +160,11 @@ class ReadOnlyLedgerHandle extends LedgerHandle
implements LedgerMetadataListene
if (null == newMetadata) {
return;
}
- Version.Occurred occurred =
- this.metadata.getVersion().compare(newMetadata.getVersion());
+ LedgerMetadata currentMetadata = getLedgerMetadata();
+ Version.Occurred occurred =
currentMetadata.getVersion().compare(newMetadata.getVersion());
if (LOG.isDebugEnabled()) {
LOG.debug("Try to update metadata from {} to {} : {}",
- this.metadata, newMetadata, occurred);
+ currentMetadata, newMetadata, occurred);
}
if (Version.Occurred.BEFORE == occurred) { // the metadata is updated
try {
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TryReadLastConfirmedOp.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TryReadLastConfirmedOp.java
index 441504c..b8a5f0d 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TryReadLastConfirmedOp.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TryReadLastConfirmedOp.java
@@ -46,12 +46,13 @@ class TryReadLastConfirmedOp implements ReadEntryCallback {
this.lh = lh;
this.cb = cb;
this.maxRecoveredData = new RecoveryData(lac, 0);
- this.numResponsesPending = lh.metadata.getEnsembleSize();
+ this.numResponsesPending = lh.getLedgerMetadata().getEnsembleSize();
}
public void initiate() {
- for (int i = 0; i < lh.metadata.currentEnsemble.size(); i++) {
-
lh.bk.getBookieClient().readEntry(lh.metadata.currentEnsemble.get(i),
+ LedgerMetadata metadata = lh.getLedgerMetadata();
+ for (int i = 0; i < metadata.currentEnsemble.size(); i++) {
+ lh.bk.getBookieClient().readEntry(metadata.currentEnsemble.get(i),
lh.ledgerId,
BookieProtocol.LAST_ADD_CONFIRMED,
this, i, BookieProtocol.FLAG_NONE);
@@ -83,7 +84,7 @@ class TryReadLastConfirmedOp implements ReadEntryCallback {
} catch (BKException.BKDigestMatchException e) {
LOG.error("Mac mismatch for ledger: " + ledgerId + ", entry: "
+ entryId
+ " while reading last entry from bookie: "
- + lh.metadata.currentEnsemble.get(bookieIndex));
+ +
lh.getLedgerMetadata().currentEnsemble.get(bookieIndex));
}
} else if (BKException.Code.UnauthorizedAccessException == rc &&
!completed) {
cb.readLastConfirmedDataComplete(rc, maxRecoveredData);
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ClientUtil.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ClientUtil.java
index 8e2dd7d..bb3e553 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ClientUtil.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ClientUtil.java
@@ -46,7 +46,7 @@ public class ClientUtil {
* Returns that whether ledger is in open state.
*/
public static boolean isLedgerOpen(LedgerHandle handle) {
- return !handle.metadata.isClosed();
+ return !handle.getLedgerMetadata().isClosed();
}
}
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java
index f31feef..b87c666 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java
@@ -428,14 +428,16 @@ public class ParallelLedgerRecoveryTest extends
BookKeeperClusterTestCase {
final CountDownLatch addLatch = new CountDownLatch(1);
final AtomicBoolean addSuccess = new AtomicBoolean(false);
LOG.info("Add entry {} with lac = {}", entryId, lac);
- lh.bk.getBookieClient().addEntry(lh.metadata.currentEnsemble.get(0),
lh.getId(), lh.ledgerKey, entryId, toSend,
- new WriteCallback() {
- @Override
- public void writeComplete(int rc, long ledgerId, long entryId,
BookieSocketAddress addr, Object ctx) {
- addSuccess.set(BKException.Code.OK == rc);
- addLatch.countDown();
- }
- }, 0, BookieProtocol.FLAG_NONE, false, WriteFlag.NONE);
+
lh.bk.getBookieClient().addEntry(lh.getLedgerMetadata().currentEnsemble.get(0),
+ lh.getId(), lh.ledgerKey, entryId,
toSend,
+ new WriteCallback() {
+ @Override
+ public void writeComplete(int rc,
long ledgerId, long entryId,
+
BookieSocketAddress addr, Object ctx) {
+
addSuccess.set(BKException.Code.OK == rc);
+ addLatch.countDown();
+ }
+ }, 0, BookieProtocol.FLAG_NONE,
false, WriteFlag.NONE);
addLatch.await();
assertTrue("add entry 14 should succeed", addSuccess.get());
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestPendingReadLacOp.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestPendingReadLacOp.java
index 6e63810..49a435e 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestPendingReadLacOp.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestPendingReadLacOp.java
@@ -54,7 +54,7 @@ public class TestPendingReadLacOp extends
BookKeeperClusterTestCase {
PendingReadLacOp pro = new PendingReadLacOp(lh, (rc, lac) ->
result.complete(lac)) {
@Override
public void initiate() {
- for (int i = 0; i < lh.metadata.currentEnsemble.size(); i++) {
+ for (int i = 0; i <
lh.getLedgerMetadata().currentEnsemble.size(); i++) {
final int index = i;
ByteBufList buffer =
lh.getDigestManager().computeDigestAndPackageForSending(
2,
@@ -70,7 +70,7 @@ public class TestPendingReadLacOp extends
BookKeeperClusterTestCase {
index);
}, 0, TimeUnit.SECONDS);
-
lh.bk.getBookieClient().readLac(lh.metadata.currentEnsemble.get(i),
+
lh.bk.getBookieClient().readLac(lh.getLedgerMetadata().currentEnsemble.get(i),
lh.ledgerId, this, i);
}
}
@@ -90,7 +90,7 @@ public class TestPendingReadLacOp extends
BookKeeperClusterTestCase {
PendingReadLacOp pro = new PendingReadLacOp(lh, (rc, lac) ->
result.complete(lac)) {
@Override
public void initiate() {
- for (int i = 0; i < lh.metadata.currentEnsemble.size(); i++) {
+ for (int i = 0; i <
lh.getLedgerMetadata().currentEnsemble.size(); i++) {
final int index = i;
ByteBufList buffer =
lh.getDigestManager().computeDigestAndPackageForSendingLac(1);
bkc.scheduler.schedule(() -> {
@@ -101,7 +101,7 @@ public class TestPendingReadLacOp extends
BookKeeperClusterTestCase {
null,
index);
}, 0, TimeUnit.SECONDS);
-
lh.bk.getBookieClient().readLac(lh.metadata.currentEnsemble.get(i),
+
lh.bk.getBookieClient().readLac(lh.getLedgerMetadata().currentEnsemble.get(i),
lh.ledgerId, this, i);
}
}