This is an automated email from the ASF dual-hosted git repository.

ivank pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new a031e0e  Make LedgerFragmentReplicator use MetadataUpdateLoop
a031e0e is described below

commit a031e0ede1767960d191d70a6cda21990d1f541e
Author: Ivan Kelly <[email protected]>
AuthorDate: Thu Sep 6 09:34:23 2018 +0200

    Make LedgerFragmentReplicator use MetadataUpdateLoop
    
    Master Issue: #281
    
    Author: Ivan Kelly <[email protected]>
    
    Reviewers: Enrico Olivelli <[email protected]>, Sijie Guo 
<[email protected]>
    
    This closes #1645 from ivankelly/ledger-fragment-immutable-metadata2
---
 .../apache/bookkeeper/client/BookKeeperAdmin.java  |   4 +-
 .../client/LedgerFragmentReplicator.java           | 145 ++++++---------------
 2 files changed, 42 insertions(+), 107 deletions(-)

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
index 59c203b..e5b5914 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
@@ -893,7 +893,7 @@ public class BookKeeperAdmin implements AutoCloseable {
                         try {
                             LedgerFragmentReplicator.SingleFragmentCallback cb 
=
                                 new 
LedgerFragmentReplicator.SingleFragmentCallback(ledgerFragmentsMcb, lh,
-                                                                               
     bkc.getMainWorkerPool(),
+                                                                               
     bkc.getLedgerManager(),
                                         startEntryId, 
getReplacementBookiesMap(ensemble, targetBookieAddresses));
                             LedgerFragment ledgerFragment = new 
LedgerFragment(lh,
                                 startEntryId, endEntryId, 
targetBookieAddresses.keySet());
@@ -1047,7 +1047,7 @@ public class BookKeeperAdmin implements AutoCloseable {
         SingleFragmentCallback cb = new SingleFragmentCallback(
             resultCallBack,
             lh,
-            bkc.getMainWorkerPool(),
+            bkc.getLedgerManager(),
             ledgerFragment.getFirstEntryId(),
             getReplacementBookiesMap(ledgerFragment, targetBookieAddresses));
 
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
index eb11b30..dc7302c 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
@@ -23,7 +23,6 @@ import static 
org.apache.bookkeeper.client.LedgerHandle.INVALID_ENTRY_ID;
 
 import io.netty.buffer.Unpooled;
 
-import java.util.ArrayList;
 import java.util.Enumeration;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -33,13 +32,13 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 
 import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
 import org.apache.bookkeeper.client.api.WriteFlag;
-import org.apache.bookkeeper.common.util.OrderedExecutor;
+import org.apache.bookkeeper.meta.LedgerManager;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookieProtocol;
-import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.MultiCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
 import org.apache.bookkeeper.replication.ReplicationStats;
@@ -48,7 +47,6 @@ import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.util.ByteBufList;
-import org.apache.bookkeeper.util.OrderedGenericCallback;
 import org.apache.zookeeper.AsyncCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -341,16 +339,16 @@ public class LedgerFragmentReplicator {
     static class SingleFragmentCallback implements AsyncCallback.VoidCallback {
         final AsyncCallback.VoidCallback ledgerFragmentsMcb;
         final LedgerHandle lh;
-        final OrderedExecutor mainWorkerPool;
+        final LedgerManager ledgerManager;
         final long fragmentStartId;
         final Map<BookieSocketAddress, BookieSocketAddress> 
oldBookie2NewBookie;
 
         SingleFragmentCallback(AsyncCallback.VoidCallback ledgerFragmentsMcb,
-                               LedgerHandle lh, OrderedExecutor 
mainWorkerPool, long fragmentStartId,
-                Map<BookieSocketAddress, BookieSocketAddress> 
oldBookie2NewBookie) {
+                               LedgerHandle lh, LedgerManager ledgerManager, 
long fragmentStartId,
+                               Map<BookieSocketAddress, BookieSocketAddress> 
oldBookie2NewBookie) {
             this.ledgerFragmentsMcb = ledgerFragmentsMcb;
             this.lh = lh;
-            this.mainWorkerPool = mainWorkerPool;
+            this.ledgerManager = ledgerManager;
             this.fragmentStartId = fragmentStartId;
             this.oldBookie2NewBookie = oldBookie2NewBookie;
         }
@@ -363,7 +361,7 @@ public class LedgerFragmentReplicator {
                 ledgerFragmentsMcb.processResult(rc, null, null);
                 return;
             }
-            updateEnsembleInfo(ledgerFragmentsMcb, fragmentStartId, lh, 
mainWorkerPool, oldBookie2NewBookie);
+            updateEnsembleInfo(ledgerManager, ledgerFragmentsMcb, 
fragmentStartId, lh, oldBookie2NewBookie);
         }
     }
 
@@ -371,104 +369,41 @@ public class LedgerFragmentReplicator {
      * Updates the ensemble with newBookie and notify the ensembleUpdatedCb.
      */
     private static void updateEnsembleInfo(
-            AsyncCallback.VoidCallback ensembleUpdatedCb, long fragmentStartId,
-            LedgerHandle lh, OrderedExecutor mainWorkerPool,
-            Map<BookieSocketAddress, BookieSocketAddress> oldBookie2NewBookie) 
{
-        /*
-         * Update the ledger metadata's ensemble info to point to the new
-         * bookie.
-         */
-        List<BookieSocketAddress> ensemble = 
lh.getLedgerMetadata().getEnsembles().get(fragmentStartId);
-        List<BookieSocketAddress> newEnsemble = new ArrayList<>(ensemble);
-        for (Map.Entry<BookieSocketAddress, BookieSocketAddress> entry : 
oldBookie2NewBookie.entrySet()) {
-            int deadBookieIndex = newEnsemble.indexOf(entry.getKey());
-            // update ensemble info might happen after re-read ledger 
metadata, so the ensemble might already
-            // change. if ensemble is already changed, skip replacing the 
bookie doesn't exist.
-            if (deadBookieIndex >= 0) {
-                newEnsemble.set(deadBookieIndex, entry.getValue());
-            } else {
-                LOG.info("Bookie {} doesn't exist in ensemble {} anymore.", 
entry.getKey(), ensemble);
-            }
-        }
-        lh.getLedgerMetadata().updateEnsemble(fragmentStartId, newEnsemble);
-        lh.writeLedgerConfig(new UpdateEnsembleCb(ensembleUpdatedCb,
-                                                  fragmentStartId, lh, 
mainWorkerPool, oldBookie2NewBookie));
-    }
+            LedgerManager ledgerManager, AsyncCallback.VoidCallback 
ensembleUpdatedCb, long fragmentStartId,
+            LedgerHandle lh, Map<BookieSocketAddress, BookieSocketAddress> 
oldBookie2NewBookie) {
 
-    /**
-     * Update the ensemble data with newBookie. re-reads the metadata on
-     * MetadataVersionException and update ensemble again. On successfull
-     * updation, it will also notify to super call back
-     */
-    private static class UpdateEnsembleCb implements 
GenericCallback<LedgerMetadata> {
-        final AsyncCallback.VoidCallback ensembleUpdatedCb;
-        final LedgerHandle lh;
-        final OrderedExecutor mainWorkerPool;
-        final long fragmentStartId;
-        final Map<BookieSocketAddress, BookieSocketAddress> 
oldBookie2NewBookie;
+        MetadataUpdateLoop updateLoop = new MetadataUpdateLoop(
+                ledgerManager,
+                lh.getId(),
+                lh::getLedgerMetadata,
+                (metadata) -> {
+                    // returns true if any of old bookies exist in ensemble
+                    List<BookieSocketAddress> ensemble = 
metadata.getEnsembles().get(fragmentStartId);
+                    return 
oldBookie2NewBookie.keySet().stream().anyMatch(ensemble::contains);
+                },
+                (currentMetadata) -> {
+                    // replace all old bookies with new bookies in ensemble
+                    List<BookieSocketAddress> newEnsemble = 
currentMetadata.getEnsembles().get(fragmentStartId)
+                        .stream().map((bookie) -> 
oldBookie2NewBookie.getOrDefault(bookie, bookie))
+                        .collect(Collectors.toList());
+                    return LedgerMetadataBuilder.from(currentMetadata)
+                        .replaceEnsembleEntry(fragmentStartId, 
newEnsemble).build();
+                },
+                lh::setLedgerMetadata);
 
-        public UpdateEnsembleCb(AsyncCallback.VoidCallback ledgerFragmentsMcb,
-                long fragmentStartId, LedgerHandle lh,
-                OrderedExecutor mainWorkerPool,
-                Map<BookieSocketAddress, BookieSocketAddress> 
oldBookie2NewBookie) {
-            this.ensembleUpdatedCb = ledgerFragmentsMcb;
-            this.lh = lh;
-            this.mainWorkerPool = mainWorkerPool;
-            this.fragmentStartId = fragmentStartId;
-            this.oldBookie2NewBookie = oldBookie2NewBookie;
-        }
+        updateLoop.run().whenComplete((result, ex) -> {
+                if (ex == null) {
+                    LOG.info("Updated ZK for ledgerId: ({}:{}) to point ledger 
fragments from old bookies"
+                             + " to new bookies: {}", lh.getId(), 
fragmentStartId, oldBookie2NewBookie);
 
-        @Override
-        public void operationComplete(int rc, LedgerMetadata writtenMetadata) {
-            if (rc == BKException.Code.MetadataVersionException) {
-                LOG.warn("Two fragments attempted update at once; ledger id: "
-                        + lh.getId() + " startid: " + fragmentStartId);
-                // try again, the previous success (with which this has
-                // conflicted) will have updated the stat other operations
-                // such as (addEnsemble) would update it too.
-                lh.rereadMetadata(new 
OrderedGenericCallback<LedgerMetadata>(mainWorkerPool, lh.getId()) {
-                            @Override
-                            public void safeOperationComplete(int rc,
-                                    LedgerMetadata newMeta) {
-                                if (rc != BKException.Code.OK) {
-                                    LOG
-                                            .error("Error reading updated 
ledger metadata for ledger "
-                                                    + lh.getId());
-                                    ensembleUpdatedCb.processResult(rc, null,
-                                            null);
-                                } else {
-                                    while (true) {
-                                        // temporary change, metadata really 
shouldn't be updated
-                                        // until the new metadata has been 
written successfully
-                                        LedgerMetadata currentMetadata = 
lh.getLedgerMetadata();
-                                        if 
(lh.setLedgerMetadata(currentMetadata, newMeta)) {
-                                            break;
-                                        }
-                                    }
-                                    updateEnsembleInfo(ensembleUpdatedCb,
-                                                       fragmentStartId, lh, 
mainWorkerPool, oldBookie2NewBookie);
-                                }
-                            }
-                            @Override
-                            public String toString() {
-                                return 
String.format("ReReadMetadataForUpdateEnsemble(%d)", lh.getId());
-                            }
-                        });
-                return;
-            } else if (rc != BKException.Code.OK) {
-                LOG.error("Error updating ledger config metadata for ledgerId 
{} : {}",
-                        lh.getId(), BKException.codeLogger(rc));
-            } else {
-                LOG.info("Updated ZK for ledgerId: (" + lh.getId() + " : "
-                        + fragmentStartId
-                        + ") to point ledger fragments from old bookies to new 
bookies: "
-                        + oldBookie2NewBookie);
-            }
-            /*
-             * Pass the return code result up the chain with the parent
-             * callback.
-             */
-            ensembleUpdatedCb.processResult(rc, null, null);
-        }
+                    ensembleUpdatedCb.processResult(BKException.Code.OK, null, 
null);
+                } else {
+                    LOG.error("Error updating ledger config metadata for 
ledgerId {}", lh.getId(), ex);
+
+                    ensembleUpdatedCb.processResult(
+                            BKException.getExceptionCode(ex, 
BKException.Code.UnexpectedConditionException),
+                            null, null);
+                }
+            });
     }
 }

Reply via email to