ivankelly closed pull request #1645: Make LedgerFragmentReplicator use
MetadataUpdateLoop
URL: https://github.com/apache/bookkeeper/pull/1645
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
index 59c203ba0f..e5b5914354 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
@@ -893,7 +893,7 @@ public void processResult(int rc, String path, Object ctx) {
try {
LedgerFragmentReplicator.SingleFragmentCallback cb
=
new
LedgerFragmentReplicator.SingleFragmentCallback(ledgerFragmentsMcb, lh,
-
bkc.getMainWorkerPool(),
+
bkc.getLedgerManager(),
startEntryId,
getReplacementBookiesMap(ensemble, targetBookieAddresses));
LedgerFragment ledgerFragment = new
LedgerFragment(lh,
startEntryId, endEntryId,
targetBookieAddresses.keySet());
@@ -1047,7 +1047,7 @@ private void replicateLedgerFragment(LedgerHandle lh,
SingleFragmentCallback cb = new SingleFragmentCallback(
resultCallBack,
lh,
- bkc.getMainWorkerPool(),
+ bkc.getLedgerManager(),
ledgerFragment.getFirstEntryId(),
getReplacementBookiesMap(ledgerFragment, targetBookieAddresses));
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
index eb11b30c5a..dc7302c46f 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
@@ -23,7 +23,6 @@
import io.netty.buffer.Unpooled;
-import java.util.ArrayList;
import java.util.Enumeration;
import java.util.HashSet;
import java.util.Iterator;
@@ -33,13 +32,13 @@
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
import org.apache.bookkeeper.client.api.WriteFlag;
-import org.apache.bookkeeper.common.util.OrderedExecutor;
+import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookieProtocol;
-import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.MultiCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
import org.apache.bookkeeper.replication.ReplicationStats;
@@ -48,7 +47,6 @@
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.util.ByteBufList;
-import org.apache.bookkeeper.util.OrderedGenericCallback;
import org.apache.zookeeper.AsyncCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -341,16 +339,16 @@ public void readComplete(int rc, LedgerHandle lh,
static class SingleFragmentCallback implements AsyncCallback.VoidCallback {
final AsyncCallback.VoidCallback ledgerFragmentsMcb;
final LedgerHandle lh;
- final OrderedExecutor mainWorkerPool;
+ final LedgerManager ledgerManager;
final long fragmentStartId;
final Map<BookieSocketAddress, BookieSocketAddress>
oldBookie2NewBookie;
SingleFragmentCallback(AsyncCallback.VoidCallback ledgerFragmentsMcb,
- LedgerHandle lh, OrderedExecutor
mainWorkerPool, long fragmentStartId,
- Map<BookieSocketAddress, BookieSocketAddress>
oldBookie2NewBookie) {
+ LedgerHandle lh, LedgerManager ledgerManager,
long fragmentStartId,
+ Map<BookieSocketAddress, BookieSocketAddress>
oldBookie2NewBookie) {
this.ledgerFragmentsMcb = ledgerFragmentsMcb;
this.lh = lh;
- this.mainWorkerPool = mainWorkerPool;
+ this.ledgerManager = ledgerManager;
this.fragmentStartId = fragmentStartId;
this.oldBookie2NewBookie = oldBookie2NewBookie;
}
@@ -363,7 +361,7 @@ public void processResult(int rc, String path, Object ctx) {
ledgerFragmentsMcb.processResult(rc, null, null);
return;
}
- updateEnsembleInfo(ledgerFragmentsMcb, fragmentStartId, lh,
mainWorkerPool, oldBookie2NewBookie);
+ updateEnsembleInfo(ledgerManager, ledgerFragmentsMcb,
fragmentStartId, lh, oldBookie2NewBookie);
}
}
@@ -371,104 +369,41 @@ public void processResult(int rc, String path, Object
ctx) {
* Updates the ensemble with newBookie and notify the ensembleUpdatedCb.
*/
private static void updateEnsembleInfo(
- AsyncCallback.VoidCallback ensembleUpdatedCb, long fragmentStartId,
- LedgerHandle lh, OrderedExecutor mainWorkerPool,
- Map<BookieSocketAddress, BookieSocketAddress> oldBookie2NewBookie)
{
- /*
- * Update the ledger metadata's ensemble info to point to the new
- * bookie.
- */
- List<BookieSocketAddress> ensemble =
lh.getLedgerMetadata().getEnsembles().get(fragmentStartId);
- List<BookieSocketAddress> newEnsemble = new ArrayList<>(ensemble);
- for (Map.Entry<BookieSocketAddress, BookieSocketAddress> entry :
oldBookie2NewBookie.entrySet()) {
- int deadBookieIndex = newEnsemble.indexOf(entry.getKey());
- // update ensemble info might happen after re-read ledger
metadata, so the ensemble might already
- // change. if ensemble is already changed, skip replacing the
bookie doesn't exist.
- if (deadBookieIndex >= 0) {
- newEnsemble.set(deadBookieIndex, entry.getValue());
- } else {
- LOG.info("Bookie {} doesn't exist in ensemble {} anymore.",
entry.getKey(), ensemble);
- }
- }
- lh.getLedgerMetadata().updateEnsemble(fragmentStartId, newEnsemble);
- lh.writeLedgerConfig(new UpdateEnsembleCb(ensembleUpdatedCb,
- fragmentStartId, lh,
mainWorkerPool, oldBookie2NewBookie));
- }
+ LedgerManager ledgerManager, AsyncCallback.VoidCallback
ensembleUpdatedCb, long fragmentStartId,
+ LedgerHandle lh, Map<BookieSocketAddress, BookieSocketAddress>
oldBookie2NewBookie) {
- /**
- * Update the ensemble data with newBookie. re-reads the metadata on
- * MetadataVersionException and update ensemble again. On successfull
- * updation, it will also notify to super call back
- */
- private static class UpdateEnsembleCb implements
GenericCallback<LedgerMetadata> {
- final AsyncCallback.VoidCallback ensembleUpdatedCb;
- final LedgerHandle lh;
- final OrderedExecutor mainWorkerPool;
- final long fragmentStartId;
- final Map<BookieSocketAddress, BookieSocketAddress>
oldBookie2NewBookie;
+ MetadataUpdateLoop updateLoop = new MetadataUpdateLoop(
+ ledgerManager,
+ lh.getId(),
+ lh::getLedgerMetadata,
+ (metadata) -> {
+ // returns true if any of old bookies exist in ensemble
+ List<BookieSocketAddress> ensemble =
metadata.getEnsembles().get(fragmentStartId);
+ return
oldBookie2NewBookie.keySet().stream().anyMatch(ensemble::contains);
+ },
+ (currentMetadata) -> {
+ // replace all old bookies with new bookies in ensemble
+ List<BookieSocketAddress> newEnsemble =
currentMetadata.getEnsembles().get(fragmentStartId)
+ .stream().map((bookie) ->
oldBookie2NewBookie.getOrDefault(bookie, bookie))
+ .collect(Collectors.toList());
+ return LedgerMetadataBuilder.from(currentMetadata)
+ .replaceEnsembleEntry(fragmentStartId,
newEnsemble).build();
+ },
+ lh::setLedgerMetadata);
- public UpdateEnsembleCb(AsyncCallback.VoidCallback ledgerFragmentsMcb,
- long fragmentStartId, LedgerHandle lh,
- OrderedExecutor mainWorkerPool,
- Map<BookieSocketAddress, BookieSocketAddress>
oldBookie2NewBookie) {
- this.ensembleUpdatedCb = ledgerFragmentsMcb;
- this.lh = lh;
- this.mainWorkerPool = mainWorkerPool;
- this.fragmentStartId = fragmentStartId;
- this.oldBookie2NewBookie = oldBookie2NewBookie;
- }
+ updateLoop.run().whenComplete((result, ex) -> {
+ if (ex == null) {
+ LOG.info("Updated ZK for ledgerId: ({}:{}) to point ledger
fragments"
+ + " from old bookies to new bookies: {}",
oldBookie2NewBookie);
- @Override
- public void operationComplete(int rc, LedgerMetadata writtenMetadata) {
- if (rc == BKException.Code.MetadataVersionException) {
- LOG.warn("Two fragments attempted update at once; ledger id: "
- + lh.getId() + " startid: " + fragmentStartId);
- // try again, the previous success (with which this has
- // conflicted) will have updated the stat other operations
- // such as (addEnsemble) would update it too.
- lh.rereadMetadata(new
OrderedGenericCallback<LedgerMetadata>(mainWorkerPool, lh.getId()) {
- @Override
- public void safeOperationComplete(int rc,
- LedgerMetadata newMeta) {
- if (rc != BKException.Code.OK) {
- LOG
- .error("Error reading updated
ledger metadata for ledger "
- + lh.getId());
- ensembleUpdatedCb.processResult(rc, null,
- null);
- } else {
- while (true) {
- // temporary change, metadata really
shouldn't be updated
- // until the new metadata has been
written successfully
- LedgerMetadata currentMetadata =
lh.getLedgerMetadata();
- if
(lh.setLedgerMetadata(currentMetadata, newMeta)) {
- break;
- }
- }
- updateEnsembleInfo(ensembleUpdatedCb,
- fragmentStartId, lh,
mainWorkerPool, oldBookie2NewBookie);
- }
- }
- @Override
- public String toString() {
- return
String.format("ReReadMetadataForUpdateEnsemble(%d)", lh.getId());
- }
- });
- return;
- } else if (rc != BKException.Code.OK) {
- LOG.error("Error updating ledger config metadata for ledgerId
{} : {}",
- lh.getId(), BKException.codeLogger(rc));
- } else {
- LOG.info("Updated ZK for ledgerId: (" + lh.getId() + " : "
- + fragmentStartId
- + ") to point ledger fragments from old bookies to new
bookies: "
- + oldBookie2NewBookie);
- }
- /*
- * Pass the return code result up the chain with the parent
- * callback.
- */
- ensembleUpdatedCb.processResult(rc, null, null);
- }
+ ensembleUpdatedCb.processResult(BKException.Code.OK, null,
null);
+ } else {
+ LOG.error("Error updating ledger config metadata for
ledgerId {}", lh.getId(), ex);
+
+ ensembleUpdatedCb.processResult(
+ BKException.getExceptionCode(ex,
BKException.Code.UnexpectedConditionException),
+ null, null);
+ }
+ });
}
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services