This is an automated email from the ASF dual-hosted git repository.
ivank pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new a5b4b93 UpdateLedgerOp uses MetadataUpdateLoop
a5b4b93 is described below
commit a5b4b93d76d69caed5800a4d4a56bd3e8add92ec
Author: Ivan Kelly <[email protected]>
AuthorDate: Thu Aug 23 09:51:45 2018 +0200
UpdateLedgerOp uses MetadataUpdateLoop
Also refactored a lot of things around the operation that didn't make
sense, like the executor.
Master issue: #281
Author: Ivan Kelly <[email protected]>
Reviewers: Sijie Guo <[email protected]>
This closes #1587 from ivankelly/update-ledger-op
---
.../bookkeeper/client/LedgerMetadataBuilder.java | 18 +-
.../apache/bookkeeper/client/UpdateLedgerOp.java | 261 +++++++--------------
.../bookkeeper/client/MetadataUpdateLoopTest.java | 40 ++--
3 files changed, 125 insertions(+), 194 deletions(-)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadataBuilder.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadataBuilder.java
index 76a7088..8c37bd5 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadataBuilder.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadataBuilder.java
@@ -23,10 +23,10 @@ import static
com.google.common.base.Preconditions.checkState;
import com.google.common.collect.ImmutableMap;
import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.TreeMap;
import org.apache.bookkeeper.client.api.DigestType;
import org.apache.bookkeeper.net.BookieSocketAddress;
@@ -41,7 +41,7 @@ class LedgerMetadataBuilder {
private LedgerMetadataFormat.State state = LedgerMetadataFormat.State.OPEN;
private Optional<Long> lastEntryId = Optional.empty();
- private Map<Long, List<BookieSocketAddress>> ensembles = new HashMap<>();
+ private TreeMap<Long, List<BookieSocketAddress>> ensembles = new
TreeMap<>();
private DigestType digestType = DigestType.CRC32C;
private Optional<byte[]> password = Optional.empty();
@@ -91,14 +91,26 @@ class LedgerMetadataBuilder {
return this;
}
- LedgerMetadataBuilder withEnsembleEntry(long firstEntry,
List<BookieSocketAddress> ensemble) {
+ LedgerMetadataBuilder newEnsembleEntry(long firstEntry,
List<BookieSocketAddress> ensemble) {
checkArgument(ensemble.size() == ensembleSize,
"Size of passed in ensemble must match the ensembleSize
of the builder");
+ checkArgument(ensembles.isEmpty() || firstEntry > ensembles.lastKey(),
+ "New entry must have a first entry greater than any
existing ensemble key");
+ ensembles.put(firstEntry, ensemble);
+ return this;
+ }
+ LedgerMetadataBuilder replaceEnsembleEntry(long firstEntry,
List<BookieSocketAddress> ensemble) {
+ checkArgument(ensemble.size() == ensembleSize,
+ "Size of passed in ensemble must match the ensembleSize
of the builder");
+ checkArgument(ensembles.containsKey(firstEntry),
+ "Ensemble must replace an existing ensemble in the
ensemble map");
ensembles.put(firstEntry, ensemble);
return this;
}
+
+
LedgerMetadataBuilder closingAtEntry(long lastEntryId) {
this.lastEntryId = Optional.of(lastEntryId);
this.state = LedgerMetadataFormat.State.CLOSED;
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/UpdateLedgerOp.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/UpdateLedgerOp.java
index b55e2d0..befd4f3 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/UpdateLedgerOp.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/UpdateLedgerOp.java
@@ -18,33 +18,26 @@
package org.apache.bookkeeper.client;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.RateLimiter;
-import com.google.common.util.concurrent.SettableFuture;
-
-import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.IOException;
-import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
import org.apache.bookkeeper.bookie.BookieShell.UpdateLedgerNotifier;
+import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.net.BookieSocketAddress;
-import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+import
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallbackFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,11 +47,11 @@ import org.slf4j.LoggerFactory;
public class UpdateLedgerOp {
private static final Logger LOG =
LoggerFactory.getLogger(UpdateLedgerOp.class);
- private final BookKeeper bkc;
+ private final LedgerManager lm;
private final BookKeeperAdmin admin;
public UpdateLedgerOp(final BookKeeper bkc, final BookKeeperAdmin admin) {
- this.bkc = bkc;
+ this.lm = bkc.getLedgerManager();
this.admin = admin;
}
@@ -81,174 +74,100 @@ public class UpdateLedgerOp {
* metadata
*/
public void updateBookieIdInLedgers(final BookieSocketAddress oldBookieId,
final BookieSocketAddress newBookieId,
- final int rate, final int limit, final UpdateLedgerNotifier
progressable) throws IOException {
+ final int rate, final int limit, final
UpdateLedgerNotifier progressable)
+ throws IOException, InterruptedException {
- final ExecutorService executor = Executors
- .newSingleThreadExecutor(new
DefaultThreadFactory("UpdateLedgerThread", true));
final AtomicInteger issuedLedgerCnt = new AtomicInteger();
final AtomicInteger updatedLedgerCnt = new AtomicInteger();
- final Future<?> updateBookieCb = executor.submit(new Runnable() {
-
- @Override
- public void run() {
- updateLedgers(oldBookieId, newBookieId, rate, limit,
progressable);
- }
-
- private void updateLedgers(final BookieSocketAddress oldBookieId,
final BookieSocketAddress newBookieId,
- final int rate, final int limit, final
UpdateLedgerNotifier progressable) {
- try {
- final AtomicBoolean stop = new AtomicBoolean(false);
- final Set<Long> outstandings =
Collections.newSetFromMap(new ConcurrentHashMap<Long, Boolean>());
- final RateLimiter throttler = RateLimiter.create(rate);
- final Iterator<Long> ledgerItr =
admin.listLedgers().iterator();
- final CountDownLatch syncObj = new CountDownLatch(1);
-
- // iterate through all the ledgers
- while (ledgerItr.hasNext() && !stop.get()) {
- // throttler to control updates per second
- throttler.acquire();
-
- final Long lId = ledgerItr.next();
- final ReadLedgerMetadataCb readCb = new
ReadLedgerMetadataCb(bkc, lId, oldBookieId,
- newBookieId);
- outstandings.add(lId);
-
- FutureCallback<Void> updateLedgerCb = new
UpdateLedgerCb(lId, stop, issuedLedgerCnt,
- updatedLedgerCnt, outstandings, syncObj,
progressable);
- Futures.addCallback(readCb.getFutureListener(),
updateLedgerCb);
-
- issuedLedgerCnt.incrementAndGet();
- if (limit != Integer.MIN_VALUE &&
issuedLedgerCnt.get() >= limit || !ledgerItr.hasNext()) {
- stop.set(true);
+ final CompletableFuture<Void> finalPromise = new CompletableFuture<>();
+ final Set<CompletableFuture<?>> outstanding =
+ Collections.newSetFromMap(new
ConcurrentHashMap<CompletableFuture<?>, Boolean>());
+ final RateLimiter throttler = RateLimiter.create(rate);
+ final Iterator<Long> ledgerItr = admin.listLedgers().iterator();
+
+ // iterate through all the ledgers
+ while (ledgerItr.hasNext() && !finalPromise.isDone()
+ && (limit == Integer.MIN_VALUE || issuedLedgerCnt.get() <
limit)) {
+ // throttler to control updates per second
+ throttler.acquire();
+
+ final long ledgerId = ledgerItr.next();
+ issuedLedgerCnt.incrementAndGet();
+
+ GenericCallbackFuture<LedgerMetadata> readPromise = new
GenericCallbackFuture<>();
+ lm.readLedgerMetadata(ledgerId, readPromise);
+ CompletableFuture<LedgerMetadata> writePromise =
readPromise.thenCompose((readMetadata) -> {
+ AtomicReference<LedgerMetadata> ref = new
AtomicReference<>(readMetadata);
+ return new MetadataUpdateLoop(
+ lm, ledgerId,
+ ref::get,
+ (metadata) -> {
+ return
metadata.getEnsembles().values().stream()
+ .flatMap(Collection::stream)
+ .filter(b -> b.equals(oldBookieId))
+ .count() > 0;
+ },
+ (metadata) -> {
+ return replaceBookieInEnsembles(metadata,
oldBookieId, newBookieId);
+ },
+ ref::compareAndSet).run();
+ });
+
+ outstanding.add(writePromise);
+ writePromise.whenComplete((metadata, ex) -> {
+ if (ex != null
+ && !(ex instanceof
BKException.BKNoSuchLedgerExistsException)) {
+ String error = String.format("Failed to update
ledger metadata %s, replacing %s with %s",
+ ledgerId,
oldBookieId, newBookieId);
+ LOG.error(error, ex);
+ finalPromise.completeExceptionally(new
IOException(error, ex));
+ } else {
+ LOG.info("Updated ledger {} metadata, replacing {}
with {}",
+ ledgerId, oldBookieId, newBookieId);
+
+ updatedLedgerCnt.incrementAndGet();
+ progressable.progress(updatedLedgerCnt.get(),
issuedLedgerCnt.get());
}
- bkc.getLedgerManager().readLedgerMetadata(lId, readCb);
- }
- // waiting till all the issued ledgers are finished
- syncObj.await();
- } catch (IOException ioe) {
- LOG.error("Exception while updating ledger", ioe);
- throw new RuntimeException("Exception while updating
ledger", ioe.getCause());
- } catch (InterruptedException ie) {
- LOG.error("Exception while updating ledger metadata", ie);
- Thread.currentThread().interrupt();
- throw new RuntimeException("Exception while updating
ledger", ie.getCause());
- }
- }
- });
- try {
- // Wait to finish the issued ledgers.
- updateBookieCb.get();
- } catch (ExecutionException ee) {
- throw new IOException("Exception while updating ledger", ee);
- } catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
- throw new IOException("Exception while updating ledger", ie);
- } finally {
- executor.shutdown();
- }
- }
-
- private static final class UpdateLedgerCb implements FutureCallback<Void> {
- final long ledgerId;
- final AtomicBoolean stop;
- final AtomicInteger issuedLedgerCnt;
- final AtomicInteger updatedLedgerCnt;
- final Set<Long> outstandings;
- final CountDownLatch syncObj;
- final UpdateLedgerNotifier progressable;
-
- public UpdateLedgerCb(long ledgerId, AtomicBoolean stop, AtomicInteger
issuedLedgerCnt,
- AtomicInteger updatedLedgerCnt, Set<Long> outstandings,
CountDownLatch syncObj,
- UpdateLedgerNotifier progressable) {
- this.ledgerId = ledgerId;
- this.stop = stop;
- this.issuedLedgerCnt = issuedLedgerCnt;
- this.updatedLedgerCnt = updatedLedgerCnt;
- this.outstandings = outstandings;
- this.syncObj = syncObj;
- this.progressable = progressable;
- }
-
- @Override
- public void onFailure(Throwable th) {
- LOG.error("Error updating ledger {}", ledgerId, th);
- stop.set(true);
- finishUpdateLedger();
+ outstanding.remove(writePromise);
+ });
}
- @Override
- public void onSuccess(Void obj) {
- updatedLedgerCnt.incrementAndGet();
- // may print progress
- progressable.progress(updatedLedgerCnt.get(),
issuedLedgerCnt.get());
- finishUpdateLedger();
- }
+
CompletableFuture.allOf(outstanding.stream().toArray(CompletableFuture[]::new))
+ .whenComplete((res, ex) -> {
+ if (ex != null) {
+ finalPromise.completeExceptionally(ex);
+ } else {
+ finalPromise.complete(null);
+ }
+ });
- private void finishUpdateLedger() {
- outstandings.remove(ledgerId);
- if (outstandings.isEmpty() && stop.get()) {
- LOG.info("Total number of ledgers issued={} updated={}",
issuedLedgerCnt.get(), updatedLedgerCnt.get());
- syncObj.countDown();
+ try {
+ finalPromise.get();
+ LOG.info("Total number of ledgers issued={} updated={}",
+ issuedLedgerCnt.get(), updatedLedgerCnt.get());
+ } catch (ExecutionException e) {
+ String error = String.format("Error waiting for ledger metadata
updates to complete (replacing %s with %s)",
+ oldBookieId, newBookieId);
+ LOG.info(error, e);
+ if (e.getCause() instanceof IOException) {
+ throw (IOException) e.getCause();
+ } else {
+ throw new IOException(error, e);
}
}
}
- private static final class ReadLedgerMetadataCb implements
GenericCallback<LedgerMetadata> {
- final BookKeeper bkc;
- final Long ledgerId;
- final BookieSocketAddress curBookieAddr;
- final BookieSocketAddress toBookieAddr;
- SettableFuture<Void> future = SettableFuture.create();
- public ReadLedgerMetadataCb(BookKeeper bkc, Long ledgerId,
BookieSocketAddress curBookieAddr,
- BookieSocketAddress toBookieAddr) {
- this.bkc = bkc;
- this.ledgerId = ledgerId;
- this.curBookieAddr = curBookieAddr;
- this.toBookieAddr = toBookieAddr;
- }
-
- ListenableFuture<Void> getFutureListener() {
- return future;
+ private static LedgerMetadata replaceBookieInEnsembles(LedgerMetadata
metadata,
+ BookieSocketAddress
oldBookieId,
+ BookieSocketAddress
newBookieId) {
+ LedgerMetadataBuilder builder = LedgerMetadataBuilder.from(metadata);
+ for (Map.Entry<Long, ? extends List<BookieSocketAddress>> e :
metadata.getEnsembles().entrySet()) {
+ List<BookieSocketAddress> newEnsemble = e.getValue().stream()
+ .map(b -> b.equals(oldBookieId) ? newBookieId : b)
+ .collect(Collectors.toList());
+ builder.replaceEnsembleEntry(e.getKey(), newEnsemble);
}
- @Override
- public void operationComplete(int rc, LedgerMetadata metadata) {
- if (BKException.Code.NoSuchLedgerExistsException == rc) {
- future.set(null);
- return; // this is OK
- } else if (BKException.Code.OK != rc) {
- // open ledger failed.
- LOG.error("Get ledger metadata {} failed: {}", ledgerId,
BKException.codeLogger(rc));
- future.setException(BKException.create(rc));
- return;
- }
- boolean updateEnsemble = false;
- for (Map.Entry<Long, ? extends List<BookieSocketAddress>> e :
metadata.getEnsembles().entrySet()) {
- List<BookieSocketAddress> newEnsemble = new
ArrayList<>(e.getValue());
- int index = newEnsemble.indexOf(curBookieAddr);
- if (-1 != index) {
- newEnsemble.set(index, toBookieAddr);
- metadata.updateEnsemble(e.getKey(), newEnsemble);
- updateEnsemble = true;
- }
- }
- if (!updateEnsemble) {
- future.set(null);
- return; // ledger doesn't contains the given curBookieId
- }
- final GenericCallback<LedgerMetadata> writeCb = new
GenericCallback<LedgerMetadata>() {
- @Override
- public void operationComplete(int rc, LedgerMetadata result) {
- if (rc != BKException.Code.OK) {
- // metadata update failed
- LOG.error("Ledger {} metadata update failed. Error
code {}", ledgerId, rc);
- future.setException(BKException.create(rc));
- return;
- }
- future.set(null);
- }
- };
- bkc.getLedgerManager().writeLedgerMetadata(ledgerId, metadata,
writeCb);
- }
+ return builder.build();
}
}
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MetadataUpdateLoopTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MetadataUpdateLoopTest.java
index cb1dbd2..5ed75ce 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MetadataUpdateLoopTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MetadataUpdateLoopTest.java
@@ -66,12 +66,12 @@ public class MetadataUpdateLoopTest {
public void testBasicUpdate() throws Exception {
try (LedgerManager lm = new MockLedgerManager()) {
LedgerMetadata initMeta =
LedgerMetadataBuilder.create().withEnsembleSize(5)
- .withEnsembleEntry(0L, Lists.newArrayList(
- new
BookieSocketAddress("0.0.0.0:3181"),
- new
BookieSocketAddress("0.0.0.1:3181"),
- new
BookieSocketAddress("0.0.0.2:3181"),
- new
BookieSocketAddress("0.0.0.3:3181"),
- new
BookieSocketAddress("0.0.0.4:3181"))).build();
+ .newEnsembleEntry(0L, Lists.newArrayList(
+ new
BookieSocketAddress("0.0.0.0:3181"),
+ new
BookieSocketAddress("0.0.0.1:3181"),
+ new
BookieSocketAddress("0.0.0.2:3181"),
+ new
BookieSocketAddress("0.0.0.3:3181"),
+ new
BookieSocketAddress("0.0.0.4:3181"))).build();
GenericCallbackFuture<LedgerMetadata> promise = new
GenericCallbackFuture<>();
long ledgerId = 1234L;
lm.createLedgerMetadata(ledgerId, initMeta, promise);
@@ -88,7 +88,7 @@ public class MetadataUpdateLoopTest {
(currentMetadata) -> {
List<BookieSocketAddress> ensemble =
Lists.newArrayList(currentMetadata.getEnsemble(0L));
ensemble.set(0, newAddress);
- return
LedgerMetadataBuilder.from(currentMetadata).withEnsembleEntry(0L,
ensemble).build();
+ return
LedgerMetadataBuilder.from(currentMetadata).replaceEnsembleEntry(0L,
ensemble).build();
},
reference::compareAndSet);
loop.run().get();
@@ -114,7 +114,7 @@ public class MetadataUpdateLoopTest {
BookieSocketAddress b3 = new BookieSocketAddress("0.0.0.3:3181");
LedgerMetadata initMeta =
LedgerMetadataBuilder.create().withEnsembleSize(2)
- .withEnsembleEntry(0L, Lists.newArrayList(b0, b1)).build();
+ .newEnsembleEntry(0L, Lists.newArrayList(b0, b1)).build();
GenericCallbackFuture<LedgerMetadata> promise = new
GenericCallbackFuture<>();
lm.createLedgerMetadata(ledgerId, initMeta, promise);
LedgerMetadata writtenMetadata = promise.get();
@@ -128,7 +128,7 @@ public class MetadataUpdateLoopTest {
(currentMetadata) -> {
List<BookieSocketAddress> ensemble =
Lists.newArrayList(currentMetadata.getEnsemble(0L));
ensemble.set(0, b2);
- return
LedgerMetadataBuilder.from(currentMetadata).withEnsembleEntry(0L,
ensemble).build();
+ return
LedgerMetadataBuilder.from(currentMetadata).replaceEnsembleEntry(0L,
ensemble).build();
},
reference1::compareAndSet).run();
@@ -141,7 +141,7 @@ public class MetadataUpdateLoopTest {
(currentMetadata) -> {
List<BookieSocketAddress> ensemble =
Lists.newArrayList(currentMetadata.getEnsemble(0L));
ensemble.set(1, b3);
- return
LedgerMetadataBuilder.from(currentMetadata).withEnsembleEntry(0L,
ensemble).build();
+ return
LedgerMetadataBuilder.from(currentMetadata).replaceEnsembleEntry(0L,
ensemble).build();
},
reference2::compareAndSet).run();
@@ -181,7 +181,7 @@ public class MetadataUpdateLoopTest {
BookieSocketAddress b2 = new BookieSocketAddress("0.0.0.2:3181");
LedgerMetadata initMeta =
LedgerMetadataBuilder.create().withEnsembleSize(2)
- .withEnsembleEntry(0L, Lists.newArrayList(b0, b1)).build();
+ .newEnsembleEntry(0L, Lists.newArrayList(b0, b1)).build();
GenericCallbackFuture<LedgerMetadata> promise = new
GenericCallbackFuture<>();
lm.createLedgerMetadata(ledgerId, initMeta, promise);
LedgerMetadata writtenMetadata = promise.get();
@@ -196,7 +196,7 @@ public class MetadataUpdateLoopTest {
(currentMetadata) -> {
List<BookieSocketAddress> ensemble =
Lists.newArrayList(currentMetadata.getEnsemble(0L));
ensemble.set(0, b2);
- return
LedgerMetadataBuilder.from(currentMetadata).withEnsembleEntry(0L,
ensemble).build();
+ return
LedgerMetadataBuilder.from(currentMetadata).replaceEnsembleEntry(0L,
ensemble).build();
},
reference::compareAndSet).run();
CompletableFuture<LedgerMetadata> loop2 = new MetadataUpdateLoop(
@@ -207,7 +207,7 @@ public class MetadataUpdateLoopTest {
(currentMetadata) -> {
List<BookieSocketAddress> ensemble =
Lists.newArrayList(currentMetadata.getEnsemble(0L));
ensemble.set(0, b2);
- return
LedgerMetadataBuilder.from(currentMetadata).withEnsembleEntry(0L,
ensemble).build();
+ return
LedgerMetadataBuilder.from(currentMetadata).replaceEnsembleEntry(0L,
ensemble).build();
},
reference::compareAndSet).run();
@@ -237,7 +237,7 @@ public class MetadataUpdateLoopTest {
BookieSocketAddress b3 = new BookieSocketAddress("0.0.0.3:3181");
LedgerMetadata initMeta =
LedgerMetadataBuilder.create().withEnsembleSize(2)
- .withEnsembleEntry(0L, Lists.newArrayList(b0, b1)).build();
+ .newEnsembleEntry(0L, Lists.newArrayList(b0, b1)).build();
GenericCallbackFuture<LedgerMetadata> promise = new
GenericCallbackFuture<>();
lm.createLedgerMetadata(ledgerId, initMeta, promise);
LedgerMetadata writtenMetadata = promise.get();
@@ -252,7 +252,7 @@ public class MetadataUpdateLoopTest {
(currentMetadata) -> {
List<BookieSocketAddress> ensemble =
Lists.newArrayList(currentMetadata.getEnsemble(0L));
ensemble.set(0, b2);
- return
LedgerMetadataBuilder.from(currentMetadata).withEnsembleEntry(0L,
ensemble).build();
+ return
LedgerMetadataBuilder.from(currentMetadata).replaceEnsembleEntry(0L,
ensemble).build();
},
reference::compareAndSet).run();
@@ -265,7 +265,7 @@ public class MetadataUpdateLoopTest {
(currentMetadata) -> {
List<BookieSocketAddress> ensemble =
Lists.newArrayList(currentMetadata.getEnsemble(0L));
ensemble.set(1, b3);
- return
LedgerMetadataBuilder.from(currentMetadata).withEnsembleEntry(0L,
ensemble).build();
+ return
LedgerMetadataBuilder.from(currentMetadata).replaceEnsembleEntry(0L,
ensemble).build();
},
reference::compareAndSet).run();
Assert.assertEquals(loop2.get(), reference.get());
@@ -305,7 +305,7 @@ public class MetadataUpdateLoopTest {
.collect(Collectors.toList());
LedgerMetadata initMeta =
LedgerMetadataBuilder.create().withEnsembleSize(ensembleSize)
- .withEnsembleEntry(0L, initialEnsemble).build();
+ .newEnsembleEntry(0L, initialEnsemble).build();
GenericCallbackFuture<LedgerMetadata> promise = new
GenericCallbackFuture<>();
lm.createLedgerMetadata(ledgerId, initMeta, promise);
LedgerMetadata writtenMetadata = promise.get();
@@ -325,7 +325,7 @@ public class MetadataUpdateLoopTest {
(currentMetadata) -> {
List<BookieSocketAddress> ensemble =
Lists.newArrayList(currentMetadata.getEnsemble(0L));
ensemble.set(i, replacementBookies.get(i));
- return
LedgerMetadataBuilder.from(currentMetadata).withEnsembleEntry(0L,
ensemble).build();
+ return
LedgerMetadataBuilder.from(currentMetadata).replaceEnsembleEntry(0L,
ensemble).build();
},
reference::compareAndSet).run())
.collect(Collectors.toList());
@@ -350,7 +350,7 @@ public class MetadataUpdateLoopTest {
BookieSocketAddress b1 = new BookieSocketAddress("0.0.0.1:3181");
LedgerMetadata initMeta =
LedgerMetadataBuilder.create().withEnsembleSize(1)
- .withEnsembleEntry(0L, Lists.newArrayList(b0)).build();
+ .newEnsembleEntry(0L, Lists.newArrayList(b0)).build();
GenericCallbackFuture<LedgerMetadata> promise = new
GenericCallbackFuture<>();
lm.createLedgerMetadata(ledgerId, initMeta, promise);
LedgerMetadata writtenMetadata = promise.get();
@@ -377,7 +377,7 @@ public class MetadataUpdateLoopTest {
(currentMetadata) -> {
List<BookieSocketAddress> ensemble =
Lists.newArrayList(currentMetadata.getEnsemble(0L));
ensemble.set(0, b1);
- return
LedgerMetadataBuilder.from(currentMetadata).withEnsembleEntry(0L,
ensemble).build();
+ return
LedgerMetadataBuilder.from(currentMetadata).replaceEnsembleEntry(0L,
ensemble).build();
},
reference::compareAndSet).run();
lm.releaseWrites();