This is an automated email from the ASF dual-hosted git repository.

ivank pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 7ec1966  Disallow direct access to LedgerHandle#metadata
7ec1966 is described below

commit 7ec196624f0cd0cd1d57b47d24d35788adfcbcde
Author: Ivan Kelly <[email protected]>
AuthorDate: Tue Jul 31 19:25:59 2018 +0200

    Disallow direct access to LedgerHandle#metadata
    
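    The LedgerHandle#metadata field has been read and mutated directly
    all over the client, which makes it hard to change how the metadata
    object is managed. This patch makes the field private and replaces
    the direct accesses with calls to accessors (getLedgerMetadata() and
    a compare-and-set style setLedgerMetadata()), so the object can only
    be accessed through an accessor.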
    
    Master issue: #281
    
    Author: Ivan Kelly <[email protected]>
    
    Reviewers: Enrico Olivelli <[email protected]>
    
    This closes #1574 from ivankelly/onlyaccessmdthroughaccessor
---
 .../apache/bookkeeper/client/ForceLedgerOp.java    |  2 +-
 .../client/LedgerFragmentReplicator.java           |  9 +++-
 .../org/apache/bookkeeper/client/LedgerHandle.java | 62 +++++++++++++++-------
 .../apache/bookkeeper/client/LedgerHandleAdv.java  |  2 +-
 .../org/apache/bookkeeper/client/PendingAddOp.java |  7 +--
 .../apache/bookkeeper/client/PendingReadLacOp.java |  9 ++--
 .../apache/bookkeeper/client/PendingReadOp.java    |  2 +-
 .../bookkeeper/client/PendingWriteLacOp.java       |  4 +-
 .../bookkeeper/client/ReadLastConfirmedOp.java     | 14 ++---
 .../bookkeeper/client/ReadOnlyLedgerHandle.java    | 36 +++++++------
 .../bookkeeper/client/TryReadLastConfirmedOp.java  |  9 ++--
 .../org/apache/bookkeeper/client/ClientUtil.java   |  2 +-
 .../client/ParallelLedgerRecoveryTest.java         | 18 ++++---
 .../bookkeeper/client/TestPendingReadLacOp.java    |  8 +--
 14 files changed, 115 insertions(+), 69 deletions(-)

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ForceLedgerOp.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ForceLedgerOp.java
index cd60848..ae0e051 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ForceLedgerOp.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ForceLedgerOp.java
@@ -69,7 +69,7 @@ class ForceLedgerOp extends SafeRunnable implements 
ForceLedgerCallback {
             LOG.debug("force {} clientNonDurableLac {}", lh.ledgerId, 
currentNonDurableLastAddConfirmed);
         }
         // we need to send the request to every bookie in the ensamble
-        this.currentEnsemble = lh.metadata.currentEnsemble;
+        this.currentEnsemble = lh.getLedgerMetadata().currentEnsemble;
         this.ackSet = lh.distributionSchedule.getEnsembleAckSet();
 
         DistributionSchedule.WriteSet writeSet = lh.getDistributionSchedule()
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
index c1440b7..538cfb7 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
@@ -430,7 +430,14 @@ public class LedgerFragmentReplicator {
                                     ensembleUpdatedCb.processResult(rc, null,
                                             null);
                                 } else {
-                                    lh.metadata = newMeta;
+                                    while (true) {
+                                        // temporary change, metadata really 
shouldn't be updated
+                                        // until the new metadata has been 
written successfully
+                                        LedgerMetadata currentMetadata = 
lh.getLedgerMetadata();
+                                        if 
(lh.setLedgerMetadata(currentMetadata, newMeta)) {
+                                            break;
+                                        }
+                                    }
                                     updateEnsembleInfo(ensembleUpdatedCb,
                                             fragmentStartId, lh, 
oldBookie2NewBookie);
                                 }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index 3a610df..90b9884 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -102,7 +102,7 @@ public class LedgerHandle implements WriteHandle {
     static final long PENDINGREQ_NOTWRITABLE_MASK = 0x01L << 62;
 
     final byte[] ledgerKey;
-    LedgerMetadata metadata;
+    private LedgerMetadata metadata;
     final BookKeeper bk;
     final long ledgerId;
     long lastAddPushed;
@@ -274,7 +274,9 @@ public class LedgerHandle implements WriteHandle {
     }
 
     protected void initializeExplicitLacFlushPolicy() {
-        if (!metadata.isClosed() && !(this instanceof ReadOnlyLedgerHandle) && 
bk.getExplicitLacInterval() > 0) {
+        if (!getLedgerMetadata().isClosed()
+            && !(this instanceof ReadOnlyLedgerHandle)
+            && bk.getExplicitLacInterval() > 0) {
             explicitLacFlushPolicy = new 
ExplicitLacFlushPolicy.ExplicitLacFlushPolicyImpl(this);
         } else {
             explicitLacFlushPolicy = 
ExplicitLacFlushPolicy.VOID_EXPLICITLAC_FLUSH_POLICY;
@@ -333,13 +335,25 @@ public class LedgerHandle implements WriteHandle {
         return metadata;
     }
 
+    boolean setLedgerMetadata(LedgerMetadata expected, LedgerMetadata 
newMetadata) {
+        synchronized (this) {
+            // ensure that we only update the metadata if it is the object we 
expect it to be
+            if (metadata == expected) {
+                metadata = newMetadata;
+                return true;
+            } else {
+                return false;
+            }
+        }
+    }
+
     /**
      * Get this ledger's customMetadata map.
      *
      * @return map containing user provided customMetadata.
      */
     public Map<String, byte[]> getCustomMetadata() {
-        return metadata.getCustomMetadata();
+        return getLedgerMetadata().getCustomMetadata();
     }
 
     /**
@@ -348,7 +362,7 @@ public class LedgerHandle implements WriteHandle {
      * @return the count of fragments
      */
     public synchronized long getNumFragments() {
-        return metadata.getEnsembles().size();
+        return getLedgerMetadata().getEnsembles().size();
     }
 
     /**
@@ -358,7 +372,7 @@ public class LedgerHandle implements WriteHandle {
      * @return count of unique bookies
      */
     public synchronized long getNumBookies() {
-        Map<Long, ArrayList<BookieSocketAddress>> m = metadata.getEnsembles();
+        Map<Long, ArrayList<BookieSocketAddress>> m = 
getLedgerMetadata().getEnsembles();
         Set<BookieSocketAddress> s = Sets.newHashSet();
         for (ArrayList<BookieSocketAddress> aList : m.values()) {
             s.addAll(aList);
@@ -402,7 +416,7 @@ public class LedgerHandle implements WriteHandle {
      * @return the ledger creation time
      */
     public long getCtime() {
-        return this.metadata.getCtime();
+        return getLedgerMetadata().getCtime();
     }
 
     /**
@@ -425,10 +439,10 @@ public class LedgerHandle implements WriteHandle {
 
     void writeLedgerConfig(GenericCallback<LedgerMetadata> writeCb) {
         if (LOG.isDebugEnabled()) {
-            LOG.debug("Writing metadata to ledger manager: {}, {}", 
this.ledgerId, metadata.getVersion());
+            LOG.debug("Writing metadata to ledger manager: {}, {}", 
this.ledgerId, getLedgerMetadata().getVersion());
         }
 
-        bk.getLedgerManager().writeLedgerMetadata(ledgerId, metadata, writeCb);
+        bk.getLedgerManager().writeLedgerMetadata(ledgerId, 
getLedgerMetadata(), writeCb);
     }
 
     /**
@@ -476,7 +490,7 @@ public class LedgerHandle implements WriteHandle {
      */
     @Override
     public synchronized boolean isClosed() {
-        return metadata.isClosed();
+        return getLedgerMetadata().isClosed();
     }
 
     void asyncCloseInternal(final CloseCallback cb, final Object ctx, final 
int rc) {
@@ -536,6 +550,7 @@ public class LedgerHandle implements WriteHandle {
                 }
 
                 synchronized (LedgerHandle.this) {
+                    LedgerMetadata metadata = getLedgerMetadata();
                     prevState = metadata.getState();
                     prevLastEntryId = metadata.getLastEntryId();
                     prevLength = metadata.getLength();
@@ -556,6 +571,7 @@ public class LedgerHandle implements WriteHandle {
                 errorOutPendingAdds(rc, pendingAdds);
 
                 if (LOG.isDebugEnabled()) {
+                    LedgerMetadata metadata = getLedgerMetadata();
                     LOG.debug("Closing ledger: " + ledgerId + " at entryId: "
                               + metadata.getLastEntryId() + " with this many 
bytes: " + metadata.getLength());
                 }
@@ -577,6 +593,7 @@ public class LedgerHandle implements WriteHandle {
                                                 ledgerId, 
BKException.codeLogger(newrc));
                                         cb.closeComplete(rc, 
LedgerHandle.this, ctx);
                                     } else {
+                                        LedgerMetadata metadata = 
getLedgerMetadata();
                                         metadata.setState(prevState);
                                         if (prevState.equals(State.CLOSED)) {
                                             metadata.close(prevLastEntryId);
@@ -1120,7 +1137,7 @@ public class LedgerHandle implements WriteHandle {
             // synchronized on this to ensure that
             // the ledger isn't closed between checking and
             // updating lastAddPushed
-            if (metadata.isClosed()) {
+            if (getLedgerMetadata().isClosed()) {
                 wasClosed = true;
             }
         }
@@ -1198,7 +1215,7 @@ public class LedgerHandle implements WriteHandle {
 
         int nonWritableCount = 0;
         for (int i = 0; i < sz; i++) {
-            if 
(!bk.getBookieClient().isWritable(metadata.currentEnsemble.get(i), key)) {
+            if 
(!bk.getBookieClient().isWritable(getLedgerMetadata().currentEnsemble.get(i), 
key)) {
                 nonWritableCount++;
                 if (nonWritableCount >= allowedNonWritableCount) {
                     return false;
@@ -1277,7 +1294,7 @@ public class LedgerHandle implements WriteHandle {
             // synchronized on this to ensure that
             // the ledger isn't closed between checking and
             // updating lastAddPushed
-            if (metadata.isClosed()) {
+            if (getLedgerMetadata().isClosed()) {
                 wasClosed = true;
             } else {
                 long entryId = ++lastAddPushed;
@@ -1358,6 +1375,7 @@ public class LedgerHandle implements WriteHandle {
         boolean isClosed;
         long lastEntryId;
         synchronized (this) {
+            LedgerMetadata metadata = getLedgerMetadata();
             isClosed = metadata.isClosed();
             lastEntryId = metadata.getLastEntryId();
         }
@@ -1399,6 +1417,7 @@ public class LedgerHandle implements WriteHandle {
         boolean isClosed;
         long lastEntryId;
         synchronized (this) {
+            LedgerMetadata metadata = getLedgerMetadata();
             isClosed = metadata.isClosed();
             lastEntryId = metadata.getLastEntryId();
         }
@@ -1486,6 +1505,7 @@ public class LedgerHandle implements WriteHandle {
         boolean isClosed;
         long lac;
         synchronized (this) {
+            LedgerMetadata metadata = getLedgerMetadata();
             isClosed = metadata.isClosed();
             lac = metadata.getLastEntryId();
         }
@@ -1653,6 +1673,7 @@ public class LedgerHandle implements WriteHandle {
     public void asyncReadExplicitLastConfirmed(final ReadLastConfirmedCallback 
cb, final Object ctx) {
         boolean isClosed;
         synchronized (this) {
+            LedgerMetadata metadata = getLedgerMetadata();
             isClosed = metadata.isClosed();
             if (isClosed) {
                 lastAddConfirmed = metadata.getLastEntryId();
@@ -1714,7 +1735,7 @@ public class LedgerHandle implements WriteHandle {
 
     // close the ledger and send fails to all the adds in the pipeline
     void handleUnrecoverableErrorDuringAdd(int rc) {
-        if (metadata.isInRecovery()) {
+        if (getLedgerMetadata().isInRecovery()) {
             // we should not close ledger if ledger is recovery mode
             // otherwise we may lose entry.
             errorOutPendingAdds(rc);
@@ -1796,6 +1817,7 @@ public class LedgerHandle implements WriteHandle {
         final ArrayList<BookieSocketAddress> newEnsemble = new 
ArrayList<BookieSocketAddress>();
         final long newEnsembleStartEntry = getLastAddConfirmed() + 1;
         final HashSet<Integer> replacedBookies = new HashSet<Integer>();
+        final LedgerMetadata metadata = getLedgerMetadata();
         synchronized (metadata) {
             newEnsemble.addAll(metadata.currentEnsemble);
             for (Map.Entry<Integer, BookieSocketAddress> entry : 
failedBookies.entrySet()) {
@@ -1869,6 +1891,7 @@ public class LedgerHandle implements WriteHandle {
             }
             return;
         }
+        LedgerMetadata metadata = getLedgerMetadata();
         synchronized (metadata) {
             try {
                 EnsembleInfo ensembleInfo = 
replaceBookieInMetadata(delayedWriteFailedBookies, curNumEnsembleChanges);
@@ -1922,6 +1945,7 @@ public class LedgerHandle implements WriteHandle {
             handleUnrecoverableErrorDuringAdd(WriteException);
             return;
         }
+        LedgerMetadata metadata = getLedgerMetadata();
         synchronized (metadata) {
             try {
                 EnsembleInfo ensembleInfo = 
replaceBookieInMetadata(failedBookies, curNumEnsembleChanges);
@@ -2075,7 +2099,7 @@ public class LedgerHandle implements WriteHandle {
                     LOG.error("[EnsembleChange-L{}-{}] : could not resolve 
ledger metadata conflict"
                                     + " while changing ensemble to: {}, local 
meta data is \n {} \n,"
                                     + " zk meta data is \n {} \n, closing 
ledger",
-                            ledgerId, ensembleChangeIdx, 
ensembleInfo.newEnsemble, metadata, newMeta);
+                            ledgerId, ensembleChangeIdx, 
ensembleInfo.newEnsemble, getLedgerMetadata(), newMeta);
                     handleUnrecoverableErrorDuringAdd(rc);
                 }
             }
@@ -2096,6 +2120,7 @@ public class LedgerHandle implements WriteHandle {
          * </p>
          */
         private boolean resolveConflict(LedgerMetadata newMeta) {
+            LedgerMetadata metadata = getLedgerMetadata();
             if (LOG.isDebugEnabled()) {
                 LOG.debug("[EnsembleChange-L{}-{}] : resolving conflicts - 
local metadata = \n {} \n,"
                     + " zk metadata = \n {} \n", ledgerId, ensembleChangeIdx, 
metadata, newMeta);
@@ -2125,7 +2150,7 @@ public class LedgerHandle implements WriteHandle {
                 }
                 if (-1 == diff) {
                     // Case 1: metadata is changed by other ones (e.g. 
Recovery)
-                    return updateMetadataIfPossible(newMeta);
+                    return updateMetadataIfPossible(metadata, newMeta);
                 }
                 return false;
             }
@@ -2142,7 +2167,7 @@ public class LedgerHandle implements WriteHandle {
                 // didn't finish, so try to resolve conflicts with the 
metadata read from zookeeper and
                 // update ensemble changed metadata again.
                 if (areFailedBookiesReplaced(metadata, ensembleInfo)) {
-                    return updateMetadataIfPossible(newMeta);
+                    return updateMetadataIfPossible(metadata, newMeta);
                 }
             } else {
                 ensembleChangeCounter.inc();
@@ -2177,7 +2202,7 @@ public class LedgerHandle implements WriteHandle {
             return replaced;
         }
 
-        private boolean updateMetadataIfPossible(LedgerMetadata newMeta) {
+        private boolean updateMetadataIfPossible(LedgerMetadata metadata, 
LedgerMetadata newMeta) {
             // if the local metadata is newer than zookeeper metadata, it 
means that metadata is updated
             // again when it was trying re-reading the metatada, re-kick the 
reread again
             if (metadata.isNewerThan(newMeta)) {
@@ -2260,6 +2285,7 @@ public class LedgerHandle implements WriteHandle {
         boolean wasClosed = false;
         boolean wasInRecovery = false;
 
+        LedgerMetadata metadata = getLedgerMetadata();
         synchronized (this) {
             if (metadata.isClosed()) {
                 if (forceRecovery) {
@@ -2311,7 +2337,7 @@ public class LedgerHandle implements WriteHandle {
                             if (rc != BKException.Code.OK) {
                                 cb.operationComplete(rc, null);
                             } else {
-                                metadata = newMeta;
+                                LedgerHandle.this.metadata = newMeta;
                                 recover(cb, listener, forceRecovery);
                             }
                         }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
index 70e9430..153ceeb 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
@@ -206,7 +206,7 @@ public class LedgerHandleAdv extends LedgerHandle 
implements WriteAdvHandle {
             // synchronized on this to ensure that
             // the ledger isn't closed between checking and
             // updating lastAddPushed
-            if (metadata.isClosed()) {
+            if (getLedgerMetadata().isClosed()) {
                 wasClosed = true;
             } else {
                 long currentLength = addToLength(op.payload.readableBytes());
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
index 3c3a1dd..5f17226 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
@@ -138,8 +138,9 @@ class PendingAddOp extends SafeRunnable implements 
WriteCallback {
     void sendWriteRequest(int bookieIndex) {
         int flags = isRecoveryAdd ? FLAG_RECOVERY_ADD | FLAG_HIGH_PRIORITY : 
FLAG_NONE;
 
-        
lh.bk.getBookieClient().addEntry(lh.metadata.currentEnsemble.get(bookieIndex), 
lh.ledgerId, lh.ledgerKey,
-                entryId, toSend, this, bookieIndex, flags, allowFailFast, 
lh.writeFlags);
+        
lh.bk.getBookieClient().addEntry(lh.getLedgerMetadata().currentEnsemble.get(bookieIndex),
+                                         lh.ledgerId, lh.ledgerKey, entryId, 
toSend, this, bookieIndex,
+                                         flags, allowFailFast, lh.writeFlags);
         ++pendingWriteRequests;
     }
 
@@ -265,7 +266,7 @@ class PendingAddOp extends SafeRunnable implements 
WriteCallback {
         int bookieIndex = (Integer) ctx;
         --pendingWriteRequests;
 
-        if (!lh.metadata.currentEnsemble.get(bookieIndex).equals(addr)) {
+        if 
(!lh.getLedgerMetadata().currentEnsemble.get(bookieIndex).equals(addr)) {
             // ensemble has already changed, failure of this addr is immaterial
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Write did not succeed: " + ledgerId + ", " + 
entryId + ". But we have already fixed it.");
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadLacOp.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadLacOp.java
index db87d89..7b3aa9f 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadLacOp.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadLacOp.java
@@ -60,13 +60,14 @@ class PendingReadLacOp implements ReadLacCallback {
     PendingReadLacOp(LedgerHandle lh, LacCallback cb) {
         this.lh = lh;
         this.cb = cb;
-        this.numResponsesPending = lh.metadata.getEnsembleSize();
+        this.numResponsesPending = lh.getLedgerMetadata().getEnsembleSize();
         this.coverageSet = lh.distributionSchedule.getCoverageSet();
     }
 
     public void initiate() {
-        for (int i = 0; i < lh.metadata.currentEnsemble.size(); i++) {
-            lh.bk.getBookieClient().readLac(lh.metadata.currentEnsemble.get(i),
+        LedgerMetadata metadata = lh.getLedgerMetadata();
+        for (int i = 0; i < metadata.currentEnsemble.size(); i++) {
+            lh.bk.getBookieClient().readLac(metadata.currentEnsemble.get(i),
                     lh.ledgerId, this, i);
         }
     }
@@ -117,7 +118,7 @@ class PendingReadLacOp implements ReadLacCallback {
                 // Too bad, this bookie did not give us a valid answer, we
                 // still might be able to recover. So, continue
                 LOG.error("Mac mismatch while reading  ledger: " + ledgerId + 
" LAC from bookie: "
-                        + lh.metadata.currentEnsemble.get(bookieIndex));
+                        + 
lh.getLedgerMetadata().currentEnsemble.get(bookieIndex));
                 rc = BKException.Code.DigestMatchException;
             }
         }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
index a6ffd32..76332df 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
@@ -493,7 +493,7 @@ class PendingReadOp implements ReadEntryCallback, 
SafeRunnable {
     }
 
     protected LedgerMetadata getLedgerMetadata() {
-        return lh.metadata;
+        return lh.getLedgerMetadata();
     }
 
     protected void cancelSpeculativeTask(boolean mayInterruptIfRunning) {
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingWriteLacOp.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingWriteLacOp.java
index af45e29..d163130 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingWriteLacOp.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingWriteLacOp.java
@@ -70,8 +70,8 @@ class PendingWriteLacOp implements WriteLacCallback {
     }
 
     void sendWriteLacRequest(int bookieIndex) {
-        
lh.bk.getBookieClient().writeLac(lh.metadata.currentEnsemble.get(bookieIndex), 
lh.ledgerId, lh.ledgerKey,
-                lac, toSend, this, bookieIndex);
+        
lh.bk.getBookieClient().writeLac(lh.getLedgerMetadata().currentEnsemble.get(bookieIndex),
+                                         lh.ledgerId, lh.ledgerKey, lac, 
toSend, this, bookieIndex);
     }
 
     void initiate(ByteBufList toSend) {
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedOp.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedOp.java
index 2cb6152..112a145 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedOp.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedOp.java
@@ -52,13 +52,14 @@ class ReadLastConfirmedOp implements ReadEntryCallback {
         this.cb = cb;
         this.maxRecoveredData = new 
RecoveryData(LedgerHandle.INVALID_ENTRY_ID, 0);
         this.lh = lh;
-        this.numResponsesPending = lh.metadata.getEnsembleSize();
+        this.numResponsesPending = lh.getLedgerMetadata().getEnsembleSize();
         this.coverageSet = lh.distributionSchedule.getCoverageSet();
     }
 
     public void initiate() {
-        for (int i = 0; i < lh.metadata.currentEnsemble.size(); i++) {
-            
lh.bk.getBookieClient().readEntry(lh.metadata.currentEnsemble.get(i),
+        LedgerMetadata metadata = lh.getLedgerMetadata();
+        for (int i = 0; i < metadata.currentEnsemble.size(); i++) {
+            lh.bk.getBookieClient().readEntry(metadata.currentEnsemble.get(i),
                                          lh.ledgerId,
                                          BookieProtocol.LAST_ADD_CONFIRMED,
                                          this, i, BookieProtocol.FLAG_NONE);
@@ -66,8 +67,9 @@ class ReadLastConfirmedOp implements ReadEntryCallback {
     }
 
     public void initiateWithFencing() {
-        for (int i = 0; i < lh.metadata.currentEnsemble.size(); i++) {
-            
lh.bk.getBookieClient().readEntry(lh.metadata.currentEnsemble.get(i),
+        LedgerMetadata metadata = lh.getLedgerMetadata();
+        for (int i = 0; i < metadata.currentEnsemble.size(); i++) {
+            lh.bk.getBookieClient().readEntry(metadata.currentEnsemble.get(i),
                                               lh.ledgerId,
                                               
BookieProtocol.LAST_ADD_CONFIRMED,
                                               this, i, 
BookieProtocol.FLAG_DO_FENCING,
@@ -96,7 +98,7 @@ class ReadLastConfirmedOp implements ReadEntryCallback {
                 // still might be able to recover though so continue
                 LOG.error("Mac mismatch for ledger: " + ledgerId + ", entry: " 
+ entryId
                           + " while reading last entry from bookie: "
-                          + lh.metadata.currentEnsemble.get(bookieIndex));
+                          + 
lh.getLedgerMetadata().currentEnsemble.get(bookieIndex));
             }
         }
 
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java
index f2433ed..3cc59a0 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java
@@ -45,24 +45,30 @@ class ReadOnlyLedgerHandle extends LedgerHandle implements 
LedgerMetadataListene
 
     class MetadataUpdater extends SafeRunnable {
 
-        final LedgerMetadata m;
+        final LedgerMetadata newMetadata;
 
         MetadataUpdater(LedgerMetadata metadata) {
-            this.m = metadata;
+            this.newMetadata = metadata;
         }
 
         @Override
         public void safeRun() {
-            Version.Occurred occurred =
-                    
ReadOnlyLedgerHandle.this.metadata.getVersion().compare(this.m.getVersion());
-            if (Version.Occurred.BEFORE == occurred) {
-                LOG.info("Updated ledger metadata for ledger {} to {}.", 
ledgerId, this.m.toSafeString());
-                synchronized (ReadOnlyLedgerHandle.this) {
-                    if (this.m.isClosed()) {
-                            ReadOnlyLedgerHandle.this.lastAddConfirmed = 
this.m.getLastEntryId();
-                            ReadOnlyLedgerHandle.this.length = 
this.m.getLength();
+            while (true) {
+                LedgerMetadata currentMetadata = getLedgerMetadata();
+                Version.Occurred occurred = 
currentMetadata.getVersion().compare(newMetadata.getVersion());
+                if (Version.Occurred.BEFORE == occurred) {
+                    LOG.info("Updated ledger metadata for ledger {} to {}.", 
ledgerId, newMetadata.toSafeString());
+                    synchronized (ReadOnlyLedgerHandle.this) {
+                        if (newMetadata.isClosed()) {
+                            ReadOnlyLedgerHandle.this.lastAddConfirmed = 
newMetadata.getLastEntryId();
+                            ReadOnlyLedgerHandle.this.length = 
newMetadata.getLength();
+                        }
+                        if (setLedgerMetadata(currentMetadata, newMetadata)) {
+                            break;
+                        }
                     }
-                    ReadOnlyLedgerHandle.this.metadata = this.m;
+                } else {
+                    break;
                 }
             }
         }
@@ -123,7 +129,7 @@ class ReadOnlyLedgerHandle extends LedgerHandle implements 
LedgerMetadataListene
     @Override
     void handleBookieFailure(final Map<Integer, BookieSocketAddress> 
failedBookies) {
         blockAddCompletions.incrementAndGet();
-        synchronized (metadata) {
+        synchronized (getLedgerMetadata()) {
             try {
                 EnsembleInfo ensembleInfo = 
replaceBookieInMetadata(failedBookies,
                         numEnsembleChanges.incrementAndGet());
@@ -154,11 +160,11 @@ class ReadOnlyLedgerHandle extends LedgerHandle 
implements LedgerMetadataListene
         if (null == newMetadata) {
             return;
         }
-        Version.Occurred occurred =
-                this.metadata.getVersion().compare(newMetadata.getVersion());
+        LedgerMetadata currentMetadata = getLedgerMetadata();
+        Version.Occurred occurred = 
currentMetadata.getVersion().compare(newMetadata.getVersion());
         if (LOG.isDebugEnabled()) {
             LOG.debug("Try to update metadata from {} to {} : {}",
-                    this.metadata, newMetadata, occurred);
+                      currentMetadata, newMetadata, occurred);
         }
         if (Version.Occurred.BEFORE == occurred) { // the metadata is updated
             try {
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TryReadLastConfirmedOp.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TryReadLastConfirmedOp.java
index 441504c..b8a5f0d 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TryReadLastConfirmedOp.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TryReadLastConfirmedOp.java
@@ -46,12 +46,13 @@ class TryReadLastConfirmedOp implements ReadEntryCallback {
         this.lh = lh;
         this.cb = cb;
         this.maxRecoveredData = new RecoveryData(lac, 0);
-        this.numResponsesPending = lh.metadata.getEnsembleSize();
+        this.numResponsesPending = lh.getLedgerMetadata().getEnsembleSize();
     }
 
     public void initiate() {
-        for (int i = 0; i < lh.metadata.currentEnsemble.size(); i++) {
-            
lh.bk.getBookieClient().readEntry(lh.metadata.currentEnsemble.get(i),
+        LedgerMetadata metadata = lh.getLedgerMetadata();
+        for (int i = 0; i < metadata.currentEnsemble.size(); i++) {
+            lh.bk.getBookieClient().readEntry(metadata.currentEnsemble.get(i),
                                          lh.ledgerId,
                                          BookieProtocol.LAST_ADD_CONFIRMED,
                                          this, i, BookieProtocol.FLAG_NONE);
@@ -83,7 +84,7 @@ class TryReadLastConfirmedOp implements ReadEntryCallback {
             } catch (BKException.BKDigestMatchException e) {
                 LOG.error("Mac mismatch for ledger: " + ledgerId + ", entry: " + entryId
                           + " while reading last entry from bookie: "
-                          + lh.metadata.currentEnsemble.get(bookieIndex));
+                          + lh.getLedgerMetadata().currentEnsemble.get(bookieIndex));
             }
         } else if (BKException.Code.UnauthorizedAccessException == rc && !completed) {
             cb.readLastConfirmedDataComplete(rc, maxRecoveredData);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ClientUtil.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ClientUtil.java
index 8e2dd7d..bb3e553 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ClientUtil.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ClientUtil.java
@@ -46,7 +46,7 @@ public class ClientUtil {
      * Returns that whether ledger is in open state.
      */
     public static boolean isLedgerOpen(LedgerHandle handle) {
-        return !handle.metadata.isClosed();
+        return !handle.getLedgerMetadata().isClosed();
     }
 
 }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java
index f31feef..b87c666 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java
@@ -428,14 +428,16 @@ public class ParallelLedgerRecoveryTest extends BookKeeperClusterTestCase {
         final CountDownLatch addLatch = new CountDownLatch(1);
         final AtomicBoolean addSuccess = new AtomicBoolean(false);
         LOG.info("Add entry {} with lac = {}", entryId, lac);
-        lh.bk.getBookieClient().addEntry(lh.metadata.currentEnsemble.get(0), lh.getId(), lh.ledgerKey, entryId, toSend,
-            new WriteCallback() {
-                @Override
-                public void writeComplete(int rc, long ledgerId, long entryId, BookieSocketAddress addr, Object ctx) {
-                    addSuccess.set(BKException.Code.OK == rc);
-                    addLatch.countDown();
-                }
-            }, 0, BookieProtocol.FLAG_NONE, false, WriteFlag.NONE);
+        lh.bk.getBookieClient().addEntry(lh.getLedgerMetadata().currentEnsemble.get(0),
+                                         lh.getId(), lh.ledgerKey, entryId, toSend,
+                                         new WriteCallback() {
+                                             @Override
+                                             public void writeComplete(int rc, long ledgerId, long entryId,
+                                                                       BookieSocketAddress addr, Object ctx) {
+                                                 addSuccess.set(BKException.Code.OK == rc);
+                                                 addLatch.countDown();
+                                             }
+                                         }, 0, BookieProtocol.FLAG_NONE, false, WriteFlag.NONE);
         addLatch.await();
         assertTrue("add entry 14 should succeed", addSuccess.get());
 
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestPendingReadLacOp.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestPendingReadLacOp.java
index 6e63810..49a435e 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestPendingReadLacOp.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestPendingReadLacOp.java
@@ -54,7 +54,7 @@ public class TestPendingReadLacOp extends BookKeeperClusterTestCase {
         PendingReadLacOp pro = new PendingReadLacOp(lh, (rc, lac) -> result.complete(lac)) {
             @Override
             public void initiate() {
-                for (int i = 0; i < lh.metadata.currentEnsemble.size(); i++) {
+                for (int i = 0; i < lh.getLedgerMetadata().currentEnsemble.size(); i++) {
                     final int index = i;
                     ByteBufList buffer = lh.getDigestManager().computeDigestAndPackageForSending(
                             2,
@@ -70,7 +70,7 @@ public class TestPendingReadLacOp extends BookKeeperClusterTestCase {
                                 index);
 
                     }, 0, TimeUnit.SECONDS);
-                    lh.bk.getBookieClient().readLac(lh.metadata.currentEnsemble.get(i),
+                    lh.bk.getBookieClient().readLac(lh.getLedgerMetadata().currentEnsemble.get(i),
                             lh.ledgerId, this, i);
                 }
             }
@@ -90,7 +90,7 @@ public class TestPendingReadLacOp extends BookKeeperClusterTestCase {
         PendingReadLacOp pro = new PendingReadLacOp(lh, (rc, lac) -> result.complete(lac)) {
             @Override
             public void initiate() {
-                for (int i = 0; i < lh.metadata.currentEnsemble.size(); i++) {
+                for (int i = 0; i < lh.getLedgerMetadata().currentEnsemble.size(); i++) {
                     final int index = i;
                     ByteBufList buffer = lh.getDigestManager().computeDigestAndPackageForSendingLac(1);
                     bkc.scheduler.schedule(() -> {
@@ -101,7 +101,7 @@ public class TestPendingReadLacOp extends BookKeeperClusterTestCase {
                                 null,
                                 index);
                     }, 0, TimeUnit.SECONDS);
-                    lh.bk.getBookieClient().readLac(lh.metadata.currentEnsemble.get(i),
+                    lh.bk.getBookieClient().readLac(lh.getLedgerMetadata().currentEnsemble.get(i),
                             lh.ledgerId, this, i);
                 }
             }

Reply via email to