This is an automated email from the ASF dual-hosted git repository.
ivank pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new a031e0e Make LedgerFragmentReplicator use MetadataUpdateLoop
a031e0e is described below
commit a031e0ede1767960d191d70a6cda21990d1f541e
Author: Ivan Kelly <[email protected]>
AuthorDate: Thu Sep 6 09:34:23 2018 +0200
Make LedgerFragmentReplicator use MetadataUpdateLoop
Master Issue: #281
Author: Ivan Kelly <[email protected]>
Reviewers: Enrico Olivelli <[email protected]>, Sijie Guo <[email protected]>
This closes #1645 from ivankelly/ledger-fragment-immutable-metadata2
---
.../apache/bookkeeper/client/BookKeeperAdmin.java | 4 +-
.../client/LedgerFragmentReplicator.java | 145 ++++++---------------
2 files changed, 42 insertions(+), 107 deletions(-)
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
index 59c203b..e5b5914 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
@@ -893,7 +893,7 @@ public class BookKeeperAdmin implements AutoCloseable {
try {
LedgerFragmentReplicator.SingleFragmentCallback cb
=
new
LedgerFragmentReplicator.SingleFragmentCallback(ledgerFragmentsMcb, lh,
-
bkc.getMainWorkerPool(),
+
bkc.getLedgerManager(),
startEntryId,
getReplacementBookiesMap(ensemble, targetBookieAddresses));
LedgerFragment ledgerFragment = new
LedgerFragment(lh,
startEntryId, endEntryId,
targetBookieAddresses.keySet());
@@ -1047,7 +1047,7 @@ public class BookKeeperAdmin implements AutoCloseable {
SingleFragmentCallback cb = new SingleFragmentCallback(
resultCallBack,
lh,
- bkc.getMainWorkerPool(),
+ bkc.getLedgerManager(),
ledgerFragment.getFirstEntryId(),
getReplacementBookiesMap(ledgerFragment, targetBookieAddresses));
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
index eb11b30..dc7302c 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
@@ -23,7 +23,6 @@ import static
org.apache.bookkeeper.client.LedgerHandle.INVALID_ENTRY_ID;
import io.netty.buffer.Unpooled;
-import java.util.ArrayList;
import java.util.Enumeration;
import java.util.HashSet;
import java.util.Iterator;
@@ -33,13 +32,13 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
import org.apache.bookkeeper.client.api.WriteFlag;
-import org.apache.bookkeeper.common.util.OrderedExecutor;
+import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookieProtocol;
-import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.MultiCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
import org.apache.bookkeeper.replication.ReplicationStats;
@@ -48,7 +47,6 @@ import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.util.ByteBufList;
-import org.apache.bookkeeper.util.OrderedGenericCallback;
import org.apache.zookeeper.AsyncCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -341,16 +339,16 @@ public class LedgerFragmentReplicator {
static class SingleFragmentCallback implements AsyncCallback.VoidCallback {
final AsyncCallback.VoidCallback ledgerFragmentsMcb;
final LedgerHandle lh;
- final OrderedExecutor mainWorkerPool;
+ final LedgerManager ledgerManager;
final long fragmentStartId;
final Map<BookieSocketAddress, BookieSocketAddress>
oldBookie2NewBookie;
SingleFragmentCallback(AsyncCallback.VoidCallback ledgerFragmentsMcb,
- LedgerHandle lh, OrderedExecutor
mainWorkerPool, long fragmentStartId,
- Map<BookieSocketAddress, BookieSocketAddress>
oldBookie2NewBookie) {
+ LedgerHandle lh, LedgerManager ledgerManager,
long fragmentStartId,
+ Map<BookieSocketAddress, BookieSocketAddress>
oldBookie2NewBookie) {
this.ledgerFragmentsMcb = ledgerFragmentsMcb;
this.lh = lh;
- this.mainWorkerPool = mainWorkerPool;
+ this.ledgerManager = ledgerManager;
this.fragmentStartId = fragmentStartId;
this.oldBookie2NewBookie = oldBookie2NewBookie;
}
@@ -363,7 +361,7 @@ public class LedgerFragmentReplicator {
ledgerFragmentsMcb.processResult(rc, null, null);
return;
}
- updateEnsembleInfo(ledgerFragmentsMcb, fragmentStartId, lh,
mainWorkerPool, oldBookie2NewBookie);
+ updateEnsembleInfo(ledgerManager, ledgerFragmentsMcb,
fragmentStartId, lh, oldBookie2NewBookie);
}
}
@@ -371,104 +369,41 @@ public class LedgerFragmentReplicator {
* Updates the ensemble with newBookie and notify the ensembleUpdatedCb.
*/
private static void updateEnsembleInfo(
- AsyncCallback.VoidCallback ensembleUpdatedCb, long fragmentStartId,
- LedgerHandle lh, OrderedExecutor mainWorkerPool,
- Map<BookieSocketAddress, BookieSocketAddress> oldBookie2NewBookie)
{
- /*
- * Update the ledger metadata's ensemble info to point to the new
- * bookie.
- */
- List<BookieSocketAddress> ensemble =
lh.getLedgerMetadata().getEnsembles().get(fragmentStartId);
- List<BookieSocketAddress> newEnsemble = new ArrayList<>(ensemble);
- for (Map.Entry<BookieSocketAddress, BookieSocketAddress> entry :
oldBookie2NewBookie.entrySet()) {
- int deadBookieIndex = newEnsemble.indexOf(entry.getKey());
- // update ensemble info might happen after re-read ledger
metadata, so the ensemble might already
- // change. if ensemble is already changed, skip replacing the
bookie doesn't exist.
- if (deadBookieIndex >= 0) {
- newEnsemble.set(deadBookieIndex, entry.getValue());
- } else {
- LOG.info("Bookie {} doesn't exist in ensemble {} anymore.",
entry.getKey(), ensemble);
- }
- }
- lh.getLedgerMetadata().updateEnsemble(fragmentStartId, newEnsemble);
- lh.writeLedgerConfig(new UpdateEnsembleCb(ensembleUpdatedCb,
- fragmentStartId, lh,
mainWorkerPool, oldBookie2NewBookie));
- }
+ LedgerManager ledgerManager, AsyncCallback.VoidCallback
ensembleUpdatedCb, long fragmentStartId,
+ LedgerHandle lh, Map<BookieSocketAddress, BookieSocketAddress>
oldBookie2NewBookie) {
- /**
- * Update the ensemble data with newBookie. re-reads the metadata on
- * MetadataVersionException and update ensemble again. On successfull
- * updation, it will also notify to super call back
- */
- private static class UpdateEnsembleCb implements
GenericCallback<LedgerMetadata> {
- final AsyncCallback.VoidCallback ensembleUpdatedCb;
- final LedgerHandle lh;
- final OrderedExecutor mainWorkerPool;
- final long fragmentStartId;
- final Map<BookieSocketAddress, BookieSocketAddress>
oldBookie2NewBookie;
+ MetadataUpdateLoop updateLoop = new MetadataUpdateLoop(
+ ledgerManager,
+ lh.getId(),
+ lh::getLedgerMetadata,
+ (metadata) -> {
+ // returns true if any of old bookies exist in ensemble
+ List<BookieSocketAddress> ensemble =
metadata.getEnsembles().get(fragmentStartId);
+ return
oldBookie2NewBookie.keySet().stream().anyMatch(ensemble::contains);
+ },
+ (currentMetadata) -> {
+ // replace all old bookies with new bookies in ensemble
+ List<BookieSocketAddress> newEnsemble =
currentMetadata.getEnsembles().get(fragmentStartId)
+ .stream().map((bookie) ->
oldBookie2NewBookie.getOrDefault(bookie, bookie))
+ .collect(Collectors.toList());
+ return LedgerMetadataBuilder.from(currentMetadata)
+ .replaceEnsembleEntry(fragmentStartId,
newEnsemble).build();
+ },
+ lh::setLedgerMetadata);
- public UpdateEnsembleCb(AsyncCallback.VoidCallback ledgerFragmentsMcb,
- long fragmentStartId, LedgerHandle lh,
- OrderedExecutor mainWorkerPool,
- Map<BookieSocketAddress, BookieSocketAddress>
oldBookie2NewBookie) {
- this.ensembleUpdatedCb = ledgerFragmentsMcb;
- this.lh = lh;
- this.mainWorkerPool = mainWorkerPool;
- this.fragmentStartId = fragmentStartId;
- this.oldBookie2NewBookie = oldBookie2NewBookie;
- }
+ updateLoop.run().whenComplete((result, ex) -> {
+ if (ex == null) {
+ LOG.info("Updated ZK for ledgerId: ({}:{}) to point ledger
fragments"
+ + " from old bookies to new bookies: {}",
oldBookie2NewBookie);
- @Override
- public void operationComplete(int rc, LedgerMetadata writtenMetadata) {
- if (rc == BKException.Code.MetadataVersionException) {
- LOG.warn("Two fragments attempted update at once; ledger id: "
- + lh.getId() + " startid: " + fragmentStartId);
- // try again, the previous success (with which this has
- // conflicted) will have updated the stat other operations
- // such as (addEnsemble) would update it too.
- lh.rereadMetadata(new
OrderedGenericCallback<LedgerMetadata>(mainWorkerPool, lh.getId()) {
- @Override
- public void safeOperationComplete(int rc,
- LedgerMetadata newMeta) {
- if (rc != BKException.Code.OK) {
- LOG
- .error("Error reading updated
ledger metadata for ledger "
- + lh.getId());
- ensembleUpdatedCb.processResult(rc, null,
- null);
- } else {
- while (true) {
- // temporary change, metadata really
shouldn't be updated
- // until the new metadata has been
written successfully
- LedgerMetadata currentMetadata =
lh.getLedgerMetadata();
- if
(lh.setLedgerMetadata(currentMetadata, newMeta)) {
- break;
- }
- }
- updateEnsembleInfo(ensembleUpdatedCb,
- fragmentStartId, lh,
mainWorkerPool, oldBookie2NewBookie);
- }
- }
- @Override
- public String toString() {
- return
String.format("ReReadMetadataForUpdateEnsemble(%d)", lh.getId());
- }
- });
- return;
- } else if (rc != BKException.Code.OK) {
- LOG.error("Error updating ledger config metadata for ledgerId
{} : {}",
- lh.getId(), BKException.codeLogger(rc));
- } else {
- LOG.info("Updated ZK for ledgerId: (" + lh.getId() + " : "
- + fragmentStartId
- + ") to point ledger fragments from old bookies to new
bookies: "
- + oldBookie2NewBookie);
- }
- /*
- * Pass the return code result up the chain with the parent
- * callback.
- */
- ensembleUpdatedCb.processResult(rc, null, null);
- }
+ ensembleUpdatedCb.processResult(BKException.Code.OK, null,
null);
+ } else {
+ LOG.error("Error updating ledger config metadata for
ledgerId {}", lh.getId(), ex);
+
+ ensembleUpdatedCb.processResult(
+ BKException.getExceptionCode(ex,
BKException.Code.UnexpectedConditionException),
+ null, null);
+ }
+ });
}
}