This is an automated email from the ASF dual-hosted git repository.

ivank pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new a5b4b93  UpdateLedgerOp uses MetadataUpdateLoop
a5b4b93 is described below

commit a5b4b93d76d69caed5800a4d4a56bd3e8add92ec
Author: Ivan Kelly <[email protected]>
AuthorDate: Thu Aug 23 09:51:45 2018 +0200

    UpdateLedgerOp uses MetadataUpdateLoop
    
    Also refactored a lot of things around the operation that didn't make
    sense, like the executor.
    
    Master issue: #281
    
    Author: Ivan Kelly <[email protected]>
    
    Reviewers: Sijie Guo <[email protected]>
    
    This closes #1587 from ivankelly/update-ledger-op
---
 .../bookkeeper/client/LedgerMetadataBuilder.java   |  18 +-
 .../apache/bookkeeper/client/UpdateLedgerOp.java   | 261 +++++++--------------
 .../bookkeeper/client/MetadataUpdateLoopTest.java  |  40 ++--
 3 files changed, 125 insertions(+), 194 deletions(-)

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadataBuilder.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadataBuilder.java
index 76a7088..8c37bd5 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadataBuilder.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadataBuilder.java
@@ -23,10 +23,10 @@ import static 
com.google.common.base.Preconditions.checkState;
 import com.google.common.collect.ImmutableMap;
 
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.TreeMap;
 
 import org.apache.bookkeeper.client.api.DigestType;
 import org.apache.bookkeeper.net.BookieSocketAddress;
@@ -41,7 +41,7 @@ class LedgerMetadataBuilder {
     private LedgerMetadataFormat.State state = LedgerMetadataFormat.State.OPEN;
     private Optional<Long> lastEntryId = Optional.empty();
 
-    private Map<Long, List<BookieSocketAddress>> ensembles = new HashMap<>();
+    private TreeMap<Long, List<BookieSocketAddress>> ensembles = new 
TreeMap<>();
 
     private DigestType digestType = DigestType.CRC32C;
     private Optional<byte[]> password = Optional.empty();
@@ -91,14 +91,26 @@ class LedgerMetadataBuilder {
         return this;
     }
 
-    LedgerMetadataBuilder withEnsembleEntry(long firstEntry, 
List<BookieSocketAddress> ensemble) {
+    LedgerMetadataBuilder newEnsembleEntry(long firstEntry, 
List<BookieSocketAddress> ensemble) {
         checkArgument(ensemble.size() == ensembleSize,
                       "Size of passed in ensemble must match the ensembleSize 
of the builder");
+        checkArgument(ensembles.isEmpty() || firstEntry > ensembles.lastKey(),
+                      "New entry must have a first entry greater than any 
existing ensemble key");
+        ensembles.put(firstEntry, ensemble);
+        return this;
+    }
 
+    LedgerMetadataBuilder replaceEnsembleEntry(long firstEntry, 
List<BookieSocketAddress> ensemble) {
+        checkArgument(ensemble.size() == ensembleSize,
+                      "Size of passed in ensemble must match the ensembleSize 
of the builder");
+        checkArgument(ensembles.containsKey(firstEntry),
+                      "Ensemble must replace an existing ensemble in the 
ensemble map");
         ensembles.put(firstEntry, ensemble);
         return this;
     }
 
+
+
     LedgerMetadataBuilder closingAtEntry(long lastEntryId) {
         this.lastEntryId = Optional.of(lastEntryId);
         this.state = LedgerMetadataFormat.State.CLOSED;
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/UpdateLedgerOp.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/UpdateLedgerOp.java
index b55e2d0..befd4f3 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/UpdateLedgerOp.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/UpdateLedgerOp.java
@@ -18,33 +18,26 @@
 
 package org.apache.bookkeeper.client;
 
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.RateLimiter;
-import com.google.common.util.concurrent.SettableFuture;
-
-import io.netty.util.concurrent.DefaultThreadFactory;
 
 import java.io.IOException;
-import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
 
 import org.apache.bookkeeper.bookie.BookieShell.UpdateLedgerNotifier;
+import org.apache.bookkeeper.meta.LedgerManager;
 import org.apache.bookkeeper.net.BookieSocketAddress;
-import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+import 
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallbackFuture;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -54,11 +47,11 @@ import org.slf4j.LoggerFactory;
 public class UpdateLedgerOp {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(UpdateLedgerOp.class);
-    private final BookKeeper bkc;
+    private final LedgerManager lm;
     private final BookKeeperAdmin admin;
 
     public UpdateLedgerOp(final BookKeeper bkc, final BookKeeperAdmin admin) {
-        this.bkc = bkc;
+        this.lm = bkc.getLedgerManager();
         this.admin = admin;
     }
 
@@ -81,174 +74,100 @@ public class UpdateLedgerOp {
      *             metadata
      */
     public void updateBookieIdInLedgers(final BookieSocketAddress oldBookieId, 
final BookieSocketAddress newBookieId,
-            final int rate, final int limit, final UpdateLedgerNotifier 
progressable) throws IOException {
+                                        final int rate, final int limit, final 
UpdateLedgerNotifier progressable)
+            throws IOException, InterruptedException {
 
-        final ExecutorService executor = Executors
-                .newSingleThreadExecutor(new 
DefaultThreadFactory("UpdateLedgerThread", true));
         final AtomicInteger issuedLedgerCnt = new AtomicInteger();
         final AtomicInteger updatedLedgerCnt = new AtomicInteger();
-        final Future<?> updateBookieCb = executor.submit(new Runnable() {
-
-            @Override
-            public void run() {
-                updateLedgers(oldBookieId, newBookieId, rate, limit, 
progressable);
-            }
-
-            private void updateLedgers(final BookieSocketAddress oldBookieId, 
final BookieSocketAddress newBookieId,
-                    final int rate, final int limit, final 
UpdateLedgerNotifier progressable) {
-                try {
-                    final AtomicBoolean stop = new AtomicBoolean(false);
-                    final Set<Long> outstandings = 
Collections.newSetFromMap(new ConcurrentHashMap<Long, Boolean>());
-                    final RateLimiter throttler = RateLimiter.create(rate);
-                    final Iterator<Long> ledgerItr = 
admin.listLedgers().iterator();
-                    final CountDownLatch syncObj = new CountDownLatch(1);
-
-                    // iterate through all the ledgers
-                    while (ledgerItr.hasNext() && !stop.get()) {
-                        // throttler to control updates per second
-                        throttler.acquire();
-
-                        final Long lId = ledgerItr.next();
-                        final ReadLedgerMetadataCb readCb = new 
ReadLedgerMetadataCb(bkc, lId, oldBookieId,
-                                newBookieId);
-                        outstandings.add(lId);
-
-                        FutureCallback<Void> updateLedgerCb = new 
UpdateLedgerCb(lId, stop, issuedLedgerCnt,
-                                updatedLedgerCnt, outstandings, syncObj, 
progressable);
-                        Futures.addCallback(readCb.getFutureListener(), 
updateLedgerCb);
-
-                        issuedLedgerCnt.incrementAndGet();
-                        if (limit != Integer.MIN_VALUE && 
issuedLedgerCnt.get() >= limit || !ledgerItr.hasNext()) {
-                            stop.set(true);
+        final CompletableFuture<Void> finalPromise = new CompletableFuture<>();
+        final Set<CompletableFuture<?>> outstanding =
+            Collections.newSetFromMap(new 
ConcurrentHashMap<CompletableFuture<?>, Boolean>());
+        final RateLimiter throttler = RateLimiter.create(rate);
+        final Iterator<Long> ledgerItr = admin.listLedgers().iterator();
+
+        // iterate through all the ledgers
+        while (ledgerItr.hasNext() && !finalPromise.isDone()
+               && (limit == Integer.MIN_VALUE || issuedLedgerCnt.get() < 
limit)) {
+            // throttler to control updates per second
+            throttler.acquire();
+
+            final long ledgerId = ledgerItr.next();
+            issuedLedgerCnt.incrementAndGet();
+
+            GenericCallbackFuture<LedgerMetadata> readPromise = new 
GenericCallbackFuture<>();
+            lm.readLedgerMetadata(ledgerId, readPromise);
+            CompletableFuture<LedgerMetadata> writePromise = 
readPromise.thenCompose((readMetadata) -> {
+                    AtomicReference<LedgerMetadata> ref = new 
AtomicReference<>(readMetadata);
+                    return new MetadataUpdateLoop(
+                            lm, ledgerId,
+                            ref::get,
+                            (metadata) -> {
+                                return 
metadata.getEnsembles().values().stream()
+                                    .flatMap(Collection::stream)
+                                    .filter(b -> b.equals(oldBookieId))
+                                    .count() > 0;
+                            },
+                            (metadata) -> {
+                                return replaceBookieInEnsembles(metadata, 
oldBookieId, newBookieId);
+                            },
+                            ref::compareAndSet).run();
+                });
+
+            outstanding.add(writePromise);
+            writePromise.whenComplete((metadata, ex) -> {
+                        if (ex != null
+                            && !(ex instanceof 
BKException.BKNoSuchLedgerExistsException)) {
+                            String error = String.format("Failed to update 
ledger metadata %s, replacing %s with %s",
+                                                         ledgerId, 
oldBookieId, newBookieId);
+                            LOG.error(error, ex);
+                            finalPromise.completeExceptionally(new 
IOException(error, ex));
+                        } else {
+                            LOG.info("Updated ledger {} metadata, replacing {} 
with {}",
+                                     ledgerId, oldBookieId, newBookieId);
+
+                            updatedLedgerCnt.incrementAndGet();
+                            progressable.progress(updatedLedgerCnt.get(), 
issuedLedgerCnt.get());
                         }
-                        bkc.getLedgerManager().readLedgerMetadata(lId, readCb);
-                    }
-                    // waiting till all the issued ledgers are finished
-                    syncObj.await();
-                } catch (IOException ioe) {
-                    LOG.error("Exception while updating ledger", ioe);
-                    throw new RuntimeException("Exception while updating 
ledger", ioe.getCause());
-                } catch (InterruptedException ie) {
-                    LOG.error("Exception while updating ledger metadata", ie);
-                    Thread.currentThread().interrupt();
-                    throw new RuntimeException("Exception while updating 
ledger", ie.getCause());
-                }
-            }
-        });
-        try {
-            // Wait to finish the issued ledgers.
-            updateBookieCb.get();
-        } catch (ExecutionException ee) {
-            throw new IOException("Exception while updating ledger", ee);
-        } catch (InterruptedException ie) {
-            Thread.currentThread().interrupt();
-            throw new IOException("Exception while updating ledger", ie);
-        } finally {
-            executor.shutdown();
-        }
-    }
-
-    private static final class UpdateLedgerCb implements FutureCallback<Void> {
-        final long ledgerId;
-        final AtomicBoolean stop;
-        final AtomicInteger issuedLedgerCnt;
-        final AtomicInteger updatedLedgerCnt;
-        final Set<Long> outstandings;
-        final CountDownLatch syncObj;
-        final UpdateLedgerNotifier progressable;
-
-        public UpdateLedgerCb(long ledgerId, AtomicBoolean stop, AtomicInteger 
issuedLedgerCnt,
-                AtomicInteger updatedLedgerCnt, Set<Long> outstandings, 
CountDownLatch syncObj,
-                UpdateLedgerNotifier progressable) {
-            this.ledgerId = ledgerId;
-            this.stop = stop;
-            this.issuedLedgerCnt = issuedLedgerCnt;
-            this.updatedLedgerCnt = updatedLedgerCnt;
-            this.outstandings = outstandings;
-            this.syncObj = syncObj;
-            this.progressable = progressable;
-        }
-
-        @Override
-        public void onFailure(Throwable th) {
-            LOG.error("Error updating ledger {}", ledgerId, th);
-            stop.set(true);
-            finishUpdateLedger();
+                        outstanding.remove(writePromise);
+                    });
         }
 
-        @Override
-        public void onSuccess(Void obj) {
-            updatedLedgerCnt.incrementAndGet();
-            // may print progress
-            progressable.progress(updatedLedgerCnt.get(), 
issuedLedgerCnt.get());
-            finishUpdateLedger();
-        }
+        
CompletableFuture.allOf(outstanding.stream().toArray(CompletableFuture[]::new))
+            .whenComplete((res, ex) -> {
+                    if (ex != null) {
+                        finalPromise.completeExceptionally(ex);
+                    } else {
+                        finalPromise.complete(null);
+                    }
+                });
 
-        private void finishUpdateLedger() {
-            outstandings.remove(ledgerId);
-            if (outstandings.isEmpty() && stop.get()) {
-                LOG.info("Total number of ledgers issued={} updated={}", 
issuedLedgerCnt.get(), updatedLedgerCnt.get());
-                syncObj.countDown();
+        try {
+            finalPromise.get();
+            LOG.info("Total number of ledgers issued={} updated={}",
+                     issuedLedgerCnt.get(), updatedLedgerCnt.get());
+        } catch (ExecutionException e) {
+            String error = String.format("Error waiting for ledger metadata 
updates to complete (replacing %s with %s)",
+                                         oldBookieId, newBookieId);
+            LOG.info(error, e);
+            if (e.getCause() instanceof IOException) {
+                throw (IOException) e.getCause();
+            } else {
+                throw new IOException(error, e);
             }
         }
     }
 
-    private static final class ReadLedgerMetadataCb implements 
GenericCallback<LedgerMetadata> {
-        final BookKeeper bkc;
-        final Long ledgerId;
-        final BookieSocketAddress curBookieAddr;
-        final BookieSocketAddress toBookieAddr;
-        SettableFuture<Void> future = SettableFuture.create();
-        public ReadLedgerMetadataCb(BookKeeper bkc, Long ledgerId, 
BookieSocketAddress curBookieAddr,
-                BookieSocketAddress toBookieAddr) {
-            this.bkc = bkc;
-            this.ledgerId = ledgerId;
-            this.curBookieAddr = curBookieAddr;
-            this.toBookieAddr = toBookieAddr;
-        }
-
-        ListenableFuture<Void> getFutureListener() {
-            return future;
+    private static LedgerMetadata replaceBookieInEnsembles(LedgerMetadata 
metadata,
+                                                           BookieSocketAddress 
oldBookieId,
+                                                           BookieSocketAddress 
newBookieId) {
+        LedgerMetadataBuilder builder = LedgerMetadataBuilder.from(metadata);
+        for (Map.Entry<Long, ? extends List<BookieSocketAddress>> e : 
metadata.getEnsembles().entrySet()) {
+            List<BookieSocketAddress> newEnsemble = e.getValue().stream()
+                .map(b -> b.equals(oldBookieId) ? newBookieId : b)
+                .collect(Collectors.toList());
+            builder.replaceEnsembleEntry(e.getKey(), newEnsemble);
         }
 
-        @Override
-        public void operationComplete(int rc, LedgerMetadata metadata) {
-            if (BKException.Code.NoSuchLedgerExistsException == rc) {
-                future.set(null);
-                return; // this is OK
-            } else if (BKException.Code.OK != rc) {
-                // open ledger failed.
-                LOG.error("Get ledger metadata {} failed: {}", ledgerId, 
BKException.codeLogger(rc));
-                future.setException(BKException.create(rc));
-                return;
-            }
-            boolean updateEnsemble = false;
-            for (Map.Entry<Long, ? extends List<BookieSocketAddress>> e : 
metadata.getEnsembles().entrySet()) {
-                List<BookieSocketAddress> newEnsemble = new 
ArrayList<>(e.getValue());
-                int index = newEnsemble.indexOf(curBookieAddr);
-                if (-1 != index) {
-                    newEnsemble.set(index, toBookieAddr);
-                    metadata.updateEnsemble(e.getKey(), newEnsemble);
-                    updateEnsemble = true;
-                }
-            }
-            if (!updateEnsemble) {
-                future.set(null);
-                return; // ledger doesn't contains the given curBookieId
-            }
-            final GenericCallback<LedgerMetadata> writeCb = new 
GenericCallback<LedgerMetadata>() {
-                @Override
-                public void operationComplete(int rc, LedgerMetadata result) {
-                    if (rc != BKException.Code.OK) {
-                        // metadata update failed
-                        LOG.error("Ledger {} metadata update failed. Error 
code {}", ledgerId, rc);
-                        future.setException(BKException.create(rc));
-                        return;
-                    }
-                    future.set(null);
-                }
-            };
-            bkc.getLedgerManager().writeLedgerMetadata(ledgerId, metadata, 
writeCb);
-        }
+        return builder.build();
     }
 }
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MetadataUpdateLoopTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MetadataUpdateLoopTest.java
index cb1dbd2..5ed75ce 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MetadataUpdateLoopTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MetadataUpdateLoopTest.java
@@ -66,12 +66,12 @@ public class MetadataUpdateLoopTest {
     public void testBasicUpdate() throws Exception {
         try (LedgerManager lm = new MockLedgerManager()) {
             LedgerMetadata initMeta = 
LedgerMetadataBuilder.create().withEnsembleSize(5)
-                .withEnsembleEntry(0L, Lists.newArrayList(
-                                           new 
BookieSocketAddress("0.0.0.0:3181"),
-                                           new 
BookieSocketAddress("0.0.0.1:3181"),
-                                           new 
BookieSocketAddress("0.0.0.2:3181"),
-                                           new 
BookieSocketAddress("0.0.0.3:3181"),
-                                           new 
BookieSocketAddress("0.0.0.4:3181"))).build();
+                .newEnsembleEntry(0L, Lists.newArrayList(
+                                          new 
BookieSocketAddress("0.0.0.0:3181"),
+                                          new 
BookieSocketAddress("0.0.0.1:3181"),
+                                          new 
BookieSocketAddress("0.0.0.2:3181"),
+                                          new 
BookieSocketAddress("0.0.0.3:3181"),
+                                          new 
BookieSocketAddress("0.0.0.4:3181"))).build();
             GenericCallbackFuture<LedgerMetadata> promise = new 
GenericCallbackFuture<>();
             long ledgerId = 1234L;
             lm.createLedgerMetadata(ledgerId, initMeta, promise);
@@ -88,7 +88,7 @@ public class MetadataUpdateLoopTest {
                     (currentMetadata) -> {
                         List<BookieSocketAddress> ensemble = 
Lists.newArrayList(currentMetadata.getEnsemble(0L));
                         ensemble.set(0, newAddress);
-                        return 
LedgerMetadataBuilder.from(currentMetadata).withEnsembleEntry(0L, 
ensemble).build();
+                        return 
LedgerMetadataBuilder.from(currentMetadata).replaceEnsembleEntry(0L, 
ensemble).build();
                     },
                     reference::compareAndSet);
             loop.run().get();
@@ -114,7 +114,7 @@ public class MetadataUpdateLoopTest {
             BookieSocketAddress b3 = new BookieSocketAddress("0.0.0.3:3181");
 
             LedgerMetadata initMeta = 
LedgerMetadataBuilder.create().withEnsembleSize(2)
-                .withEnsembleEntry(0L, Lists.newArrayList(b0, b1)).build();
+                .newEnsembleEntry(0L, Lists.newArrayList(b0, b1)).build();
             GenericCallbackFuture<LedgerMetadata> promise = new 
GenericCallbackFuture<>();
             lm.createLedgerMetadata(ledgerId, initMeta, promise);
             LedgerMetadata writtenMetadata = promise.get();
@@ -128,7 +128,7 @@ public class MetadataUpdateLoopTest {
                     (currentMetadata) -> {
                         List<BookieSocketAddress> ensemble = 
Lists.newArrayList(currentMetadata.getEnsemble(0L));
                         ensemble.set(0, b2);
-                        return 
LedgerMetadataBuilder.from(currentMetadata).withEnsembleEntry(0L, 
ensemble).build();
+                        return 
LedgerMetadataBuilder.from(currentMetadata).replaceEnsembleEntry(0L, 
ensemble).build();
                     },
                     reference1::compareAndSet).run();
 
@@ -141,7 +141,7 @@ public class MetadataUpdateLoopTest {
                     (currentMetadata) -> {
                         List<BookieSocketAddress> ensemble = 
Lists.newArrayList(currentMetadata.getEnsemble(0L));
                         ensemble.set(1, b3);
-                        return 
LedgerMetadataBuilder.from(currentMetadata).withEnsembleEntry(0L, 
ensemble).build();
+                        return 
LedgerMetadataBuilder.from(currentMetadata).replaceEnsembleEntry(0L, 
ensemble).build();
                     },
                     reference2::compareAndSet).run();
 
@@ -181,7 +181,7 @@ public class MetadataUpdateLoopTest {
             BookieSocketAddress b2 = new BookieSocketAddress("0.0.0.2:3181");
 
             LedgerMetadata initMeta = 
LedgerMetadataBuilder.create().withEnsembleSize(2)
-                .withEnsembleEntry(0L, Lists.newArrayList(b0, b1)).build();
+                .newEnsembleEntry(0L, Lists.newArrayList(b0, b1)).build();
             GenericCallbackFuture<LedgerMetadata> promise = new 
GenericCallbackFuture<>();
             lm.createLedgerMetadata(ledgerId, initMeta, promise);
             LedgerMetadata writtenMetadata = promise.get();
@@ -196,7 +196,7 @@ public class MetadataUpdateLoopTest {
                     (currentMetadata) -> {
                         List<BookieSocketAddress> ensemble = 
Lists.newArrayList(currentMetadata.getEnsemble(0L));
                         ensemble.set(0, b2);
-                        return 
LedgerMetadataBuilder.from(currentMetadata).withEnsembleEntry(0L, 
ensemble).build();
+                        return 
LedgerMetadataBuilder.from(currentMetadata).replaceEnsembleEntry(0L, 
ensemble).build();
                     },
                     reference::compareAndSet).run();
             CompletableFuture<LedgerMetadata> loop2 = new MetadataUpdateLoop(
@@ -207,7 +207,7 @@ public class MetadataUpdateLoopTest {
                     (currentMetadata) -> {
                         List<BookieSocketAddress> ensemble = 
Lists.newArrayList(currentMetadata.getEnsemble(0L));
                         ensemble.set(0, b2);
-                        return 
LedgerMetadataBuilder.from(currentMetadata).withEnsembleEntry(0L, 
ensemble).build();
+                        return 
LedgerMetadataBuilder.from(currentMetadata).replaceEnsembleEntry(0L, 
ensemble).build();
                     },
                     reference::compareAndSet).run();
 
@@ -237,7 +237,7 @@ public class MetadataUpdateLoopTest {
             BookieSocketAddress b3 = new BookieSocketAddress("0.0.0.3:3181");
 
             LedgerMetadata initMeta = 
LedgerMetadataBuilder.create().withEnsembleSize(2)
-                .withEnsembleEntry(0L, Lists.newArrayList(b0, b1)).build();
+                .newEnsembleEntry(0L, Lists.newArrayList(b0, b1)).build();
             GenericCallbackFuture<LedgerMetadata> promise = new 
GenericCallbackFuture<>();
             lm.createLedgerMetadata(ledgerId, initMeta, promise);
             LedgerMetadata writtenMetadata = promise.get();
@@ -252,7 +252,7 @@ public class MetadataUpdateLoopTest {
                     (currentMetadata) -> {
                         List<BookieSocketAddress> ensemble = 
Lists.newArrayList(currentMetadata.getEnsemble(0L));
                         ensemble.set(0, b2);
-                        return 
LedgerMetadataBuilder.from(currentMetadata).withEnsembleEntry(0L, 
ensemble).build();
+                        return 
LedgerMetadataBuilder.from(currentMetadata).replaceEnsembleEntry(0L, 
ensemble).build();
                     },
                     reference::compareAndSet).run();
 
@@ -265,7 +265,7 @@ public class MetadataUpdateLoopTest {
                     (currentMetadata) -> {
                         List<BookieSocketAddress> ensemble = 
Lists.newArrayList(currentMetadata.getEnsemble(0L));
                         ensemble.set(1, b3);
-                        return 
LedgerMetadataBuilder.from(currentMetadata).withEnsembleEntry(0L, 
ensemble).build();
+                        return 
LedgerMetadataBuilder.from(currentMetadata).replaceEnsembleEntry(0L, 
ensemble).build();
                     },
                     reference::compareAndSet).run();
             Assert.assertEquals(loop2.get(), reference.get());
@@ -305,7 +305,7 @@ public class MetadataUpdateLoopTest {
                 .collect(Collectors.toList());
 
             LedgerMetadata initMeta = 
LedgerMetadataBuilder.create().withEnsembleSize(ensembleSize)
-                .withEnsembleEntry(0L, initialEnsemble).build();
+                .newEnsembleEntry(0L, initialEnsemble).build();
             GenericCallbackFuture<LedgerMetadata> promise = new 
GenericCallbackFuture<>();
             lm.createLedgerMetadata(ledgerId, initMeta, promise);
             LedgerMetadata writtenMetadata = promise.get();
@@ -325,7 +325,7 @@ public class MetadataUpdateLoopTest {
                     (currentMetadata) -> {
                         List<BookieSocketAddress> ensemble = 
Lists.newArrayList(currentMetadata.getEnsemble(0L));
                         ensemble.set(i, replacementBookies.get(i));
-                        return 
LedgerMetadataBuilder.from(currentMetadata).withEnsembleEntry(0L, 
ensemble).build();
+                        return 
LedgerMetadataBuilder.from(currentMetadata).replaceEnsembleEntry(0L, 
ensemble).build();
                     },
                     reference::compareAndSet).run())
                 .collect(Collectors.toList());
@@ -350,7 +350,7 @@ public class MetadataUpdateLoopTest {
             BookieSocketAddress b1 = new BookieSocketAddress("0.0.0.1:3181");
 
             LedgerMetadata initMeta = 
LedgerMetadataBuilder.create().withEnsembleSize(1)
-                .withEnsembleEntry(0L, Lists.newArrayList(b0)).build();
+                .newEnsembleEntry(0L, Lists.newArrayList(b0)).build();
             GenericCallbackFuture<LedgerMetadata> promise = new 
GenericCallbackFuture<>();
             lm.createLedgerMetadata(ledgerId, initMeta, promise);
             LedgerMetadata writtenMetadata = promise.get();
@@ -377,7 +377,7 @@ public class MetadataUpdateLoopTest {
                     (currentMetadata) -> {
                         List<BookieSocketAddress> ensemble = 
Lists.newArrayList(currentMetadata.getEnsemble(0L));
                         ensemble.set(0, b1);
-                        return 
LedgerMetadataBuilder.from(currentMetadata).withEnsembleEntry(0L, 
ensemble).build();
+                        return 
LedgerMetadataBuilder.from(currentMetadata).replaceEnsembleEntry(0L, 
ensemble).build();
                     },
                     reference::compareAndSet).run();
             lm.releaseWrites();

Reply via email to