This is an automated email from the ASF dual-hosted git repository.

ivank pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 2b14458  Make each ensemble in ensemble list immutable
2b14458 is described below

commit 2b14458d966f80d991cec35ed7460639db52997b
Author: Ivan Kelly <[email protected]>
AuthorDate: Wed Aug 1 11:29:41 2018 +0200

    Make each ensemble in ensemble list immutable
    
    Previously, the ensemble list was a Map<Long, 
ArrayList<BookieSocketAddress>>.
    ArrayList is by definition mutable, so ensemble passed to metadata users are
    always mutable.
    
    This patch changes in ensembles in the list to be immutable. We were also 
leaking
    the implementation of ledger metadata to the placement policy, so this has 
been
    modified to use List<BookieSocketAddress> also.
    
    Master issue: #281
    
    Author: Ivan Kelly <[email protected]>
    
    Reviewers: Enrico Olivelli <[email protected]>, Sijie Guo 
<[email protected]>
    
    This closes #1575 from ivankelly/ledger-fragment-immutable-metadata and 
squashes the following commits:
    
    14977a242 [Ivan Kelly] fix dlog
    bdaef31e8 [Ivan Kelly] checkstyle
    6e3431118 [Ivan Kelly] Make each ensemble in ensemble list immutable
---
 .../org/apache/bookkeeper/bookie/BookieShell.java  |  4 +-
 .../bookie/LocalBookieEnsemblePlacementPolicy.java |  8 ++--
 .../bookie/ScanAndCompareGarbageCollector.java     |  6 +--
 .../apache/bookkeeper/client/BookKeeperAdmin.java  | 25 +++++------
 .../apache/bookkeeper/client/BookieWatcher.java    |  5 +--
 .../client/DefaultEnsemblePlacementPolicy.java     |  4 +-
 .../bookkeeper/client/EnsemblePlacementPolicy.java | 14 +++---
 .../apache/bookkeeper/client/ForceLedgerOp.java    |  4 +-
 .../ITopologyAwareEnsemblePlacementPolicy.java     |  6 +--
 .../apache/bookkeeper/client/LedgerChecker.java    |  6 +--
 .../apache/bookkeeper/client/LedgerCreateOp.java   |  3 +-
 .../apache/bookkeeper/client/LedgerFragment.java   |  3 +-
 .../client/LedgerFragmentReplicator.java           |  9 ++--
 .../org/apache/bookkeeper/client/LedgerHandle.java |  4 +-
 .../apache/bookkeeper/client/LedgerMetadata.java   | 51 ++++++++++++----------
 .../apache/bookkeeper/client/PendingReadOp.java    | 10 ++---
 .../client/RackawareEnsemblePlacementPolicy.java   | 20 ++++-----
 .../RackawareEnsemblePlacementPolicyImpl.java      | 32 +++++++-------
 .../client/ReadLastConfirmedAndEntryOp.java        | 10 ++---
 .../client/RegionAwareEnsemblePlacementPolicy.java |  8 ++--
 .../TopologyAwareEnsemblePlacementPolicy.java      | 13 +++---
 .../apache/bookkeeper/client/UpdateLedgerOp.java   | 10 +++--
 .../replication/BookieLedgerIndexer.java           |  2 +-
 .../bookkeeper/replication/ReplicationWorker.java  |  6 +--
 .../bookie/TestGcOverreplicatedLedger.java         |  9 ++--
 .../bookkeeper/client/BookieRecoveryTest.java      | 14 +++---
 .../bookkeeper/client/BookieWriteLedgerTest.java   | 18 ++++----
 .../client/GenericEnsemblePlacementPolicyTest.java |  2 +-
 .../bookkeeper/client/TestDelayEnsembleChange.java |  8 ++--
 .../bookkeeper/client/TestLedgerChecker.java       | 20 ++++-----
 .../client/TestLedgerFragmentReplication.java      | 12 +++--
 .../apache/bookkeeper/client/TestParallelRead.java | 11 ++---
 .../TestRackawareEnsemblePlacementPolicy.java      | 36 +++++++--------
 ...ackawareEnsemblePlacementPolicyUsingScript.java | 16 +++----
 .../TestRackawarePolicyNotificationUpdates.java    |  2 +-
 .../bookkeeper/client/TestReadEntryListener.java   |  6 +--
 .../TestRegionAwareEnsemblePlacementPolicy.java    | 46 +++++++++----------
 .../apache/bookkeeper/client/TestSequenceRead.java |  9 ++--
 .../bookkeeper/client/TestSpeculativeRead.java     |  4 +-
 .../bookkeeper/client/TestWatchEnsembleChange.java |  4 +-
 .../bookkeeper/client/UpdateLedgerCmdTest.java     |  2 +-
 .../bookkeeper/client/UpdateLedgerOpTest.java      | 10 ++---
 .../AuditorPeriodicBookieCheckTest.java            |  4 +-
 .../replication/AuditorPeriodicCheckTest.java      |  6 +--
 .../replication/BookieAutoRecoveryTest.java        | 10 ++---
 .../TestAutoRecoveryAlongWithBookieServers.java    | 10 ++---
 .../replication/TestReplicationWorker.java         | 10 ++---
 .../bookkeeper/server/http/TestHttpService.java    |  5 ++-
 .../java/org/apache/bookkeeper/tls/TestTLS.java    |  6 +--
 .../org/apache/bookkeeper/client/LedgerReader.java |  6 +--
 .../distributedlog/tools/DistributedLogTool.java   |  2 +-
 51 files changed, 278 insertions(+), 273 deletions(-)

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
index c4ead36..b119062 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
@@ -583,8 +583,8 @@ public class BookieShell implements Tool {
 
         private Map<Long, Integer> inspectLedger(LedgerMetadata metadata, 
Set<BookieSocketAddress> bookiesToInspect) {
             Map<Long, Integer> numBookiesToReplacePerEnsemble = new 
TreeMap<Long, Integer>();
-            for (Map.Entry<Long, ArrayList<BookieSocketAddress>> ensemble : 
metadata.getEnsembles().entrySet()) {
-                ArrayList<BookieSocketAddress> bookieList = 
ensemble.getValue();
+            for (Map.Entry<Long, ? extends List<BookieSocketAddress>> ensemble 
: metadata.getEnsembles().entrySet()) {
+                List<BookieSocketAddress> bookieList = ensemble.getValue();
                 System.out.print(ensemble.getKey() + ":\t");
                 int numBookiesToReplace = 0;
                 for (BookieSocketAddress bookie : bookieList) {
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LocalBookieEnsemblePlacementPolicy.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LocalBookieEnsemblePlacementPolicy.java
index 0414553..46978ea 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LocalBookieEnsemblePlacementPolicy.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LocalBookieEnsemblePlacementPolicy.java
@@ -20,8 +20,8 @@ package org.apache.bookkeeper.bookie;
 import com.google.common.collect.Lists;
 import io.netty.util.HashedWheelTimer;
 import java.net.UnknownHostException;
-import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
@@ -94,7 +94,7 @@ public class LocalBookieEnsemblePlacementPolicy implements 
EnsemblePlacementPoli
 
     @Override
     public DistributionSchedule.WriteSet reorderReadSequence(
-            ArrayList<BookieSocketAddress> ensemble,
+            List<BookieSocketAddress> ensemble,
             BookiesHealthInfo bookiesHealthInfo,
             DistributionSchedule.WriteSet writeSet) {
         return null;
@@ -102,14 +102,14 @@ public class LocalBookieEnsemblePlacementPolicy 
implements EnsemblePlacementPoli
 
     @Override
     public DistributionSchedule.WriteSet reorderReadLACSequence(
-            ArrayList<BookieSocketAddress> ensemble,
+            List<BookieSocketAddress> ensemble,
             BookiesHealthInfo bookiesHealthInfo,
             DistributionSchedule.WriteSet writeSet) {
         return null;
     }
 
     @Override
-    public ArrayList<BookieSocketAddress> newEnsemble(int ensembleSize, int 
writeQuorumSize, int ackQuorumSize,
+    public List<BookieSocketAddress> newEnsemble(int ensembleSize, int 
writeQuorumSize, int ackQuorumSize,
         java.util.Map<String, byte[]> customMetadata, Set<BookieSocketAddress> 
excludeBookies)
             throws BKNotEnoughBookiesException {
         if (ensembleSize > 1) {
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ScanAndCompareGarbageCollector.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ScanAndCompareGarbageCollector.java
index f5ea91c..137c07c 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ScanAndCompareGarbageCollector.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ScanAndCompareGarbageCollector.java
@@ -26,7 +26,6 @@ import static 
org.apache.bookkeeper.bookie.BookKeeperServerStats.DELETED_LEDGER_
 
 import com.google.common.collect.Sets;
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.NavigableSet;
 import java.util.Set;
@@ -248,8 +247,9 @@ public class ScanAndCompareGarbageCollector implements 
GarbageCollector{
                                 release();
                                 return;
                             }
-                            SortedMap<Long, ArrayList<BookieSocketAddress>> 
ensembles = ledgerMetadata.getEnsembles();
-                            for (ArrayList<BookieSocketAddress> ensemble : 
ensembles.values()) {
+                            SortedMap<Long, ? extends 
List<BookieSocketAddress>> ensembles =
+                                ledgerMetadata.getEnsembles();
+                            for (List<BookieSocketAddress> ensemble : 
ensembles.values()) {
                                 // check if this bookie is supposed to have 
this ledger
                                 if (ensemble.contains(selfBookieAddress)) {
                                     release();
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
index a55ace8..cadbe49 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
@@ -817,7 +817,7 @@ public class BookKeeperAdmin implements AutoCloseable {
                  */
                 Map<Long, Long> ledgerFragmentsRange = new HashMap<Long, 
Long>();
                 Long curEntryId = null;
-                for (Map.Entry<Long, ArrayList<BookieSocketAddress>> entry : 
lh.getLedgerMetadata().getEnsembles()
+                for (Map.Entry<Long, ? extends List<BookieSocketAddress>> 
entry : lh.getLedgerMetadata().getEnsembles()
                          .entrySet()) {
                     if (curEntryId != null) {
                         ledgerFragmentsRange.put(curEntryId, entry.getKey() - 
1);
@@ -863,7 +863,7 @@ public class BookKeeperAdmin implements AutoCloseable {
                  */
                 for (final Long startEntryId : ledgerFragmentsToRecover) {
                     Long endEntryId = ledgerFragmentsRange.get(startEntryId);
-                    ArrayList<BookieSocketAddress> ensemble = 
lh.getLedgerMetadata().getEnsembles().get(startEntryId);
+                    List<BookieSocketAddress> ensemble = 
lh.getLedgerMetadata().getEnsembles().get(startEntryId);
                     // Get bookies to replace
                     Map<Integer, BookieSocketAddress> targetBookieAddresses;
                     try {
@@ -910,7 +910,7 @@ public class BookKeeperAdmin implements AutoCloseable {
             }, null);
     }
 
-    static String formatEnsemble(ArrayList<BookieSocketAddress> ensemble, 
Set<BookieSocketAddress> bookiesSrc,
+    static String formatEnsemble(List<BookieSocketAddress> ensemble, 
Set<BookieSocketAddress> bookiesSrc,
             char marker) {
         StringBuilder sb = new StringBuilder();
         sb.append("[");
@@ -1060,7 +1060,7 @@ public class BookKeeperAdmin implements AutoCloseable {
     }
 
     private static Map<BookieSocketAddress, BookieSocketAddress> 
getReplacementBookiesMap(
-            ArrayList<BookieSocketAddress> ensemble,
+            List<BookieSocketAddress> ensemble,
             Map<Integer, BookieSocketAddress> targetBookieAddresses) {
         Map<BookieSocketAddress, BookieSocketAddress> bookiesMap =
                 new HashMap<BookieSocketAddress, BookieSocketAddress>();
@@ -1091,11 +1091,11 @@ public class BookKeeperAdmin implements AutoCloseable {
             return false;
         }
         Long lastKey = lm.getEnsembles().lastKey();
-        ArrayList<BookieSocketAddress> lastEnsemble = 
lm.getEnsembles().get(lastKey);
+        List<BookieSocketAddress> lastEnsemble = 
lm.getEnsembles().get(lastKey);
         return containBookies(lastEnsemble, bookies);
     }
 
-    private static boolean containBookies(ArrayList<BookieSocketAddress> 
ensemble,
+    private static boolean containBookies(List<BookieSocketAddress> ensemble,
                                           Set<BookieSocketAddress> bookies) {
         for (BookieSocketAddress bookie : ensemble) {
             if (bookies.contains(bookie)) {
@@ -1544,9 +1544,9 @@ public class BookKeeperAdmin implements AutoCloseable {
 
     public static boolean areEntriesOfLedgerStoredInTheBookie(long ledgerId, 
BookieSocketAddress bookieAddress,
             LedgerMetadata ledgerMetadata) {
-        Collection<ArrayList<BookieSocketAddress>> ensemblesOfSegments = 
ledgerMetadata.getEnsembles().values();
-        Iterator<ArrayList<BookieSocketAddress>> ensemblesOfSegmentsIterator = 
ensemblesOfSegments.iterator();
-        ArrayList<BookieSocketAddress> ensemble;
+        Collection<? extends List<BookieSocketAddress>> ensemblesOfSegments = 
ledgerMetadata.getEnsembles().values();
+        Iterator<? extends List<BookieSocketAddress>> 
ensemblesOfSegmentsIterator = ensemblesOfSegments.iterator();
+        List<BookieSocketAddress> ensemble;
         int segmentNo = 0;
         while (ensemblesOfSegmentsIterator.hasNext()) {
             ensemble = ensemblesOfSegmentsIterator.next();
@@ -1565,9 +1565,8 @@ public class BookKeeperAdmin implements AutoCloseable {
         int ensembleSize = ledgerMetadata.getEnsembleSize();
         int writeQuorumSize = ledgerMetadata.getWriteQuorumSize();
 
-        List<Entry<Long, ArrayList<BookieSocketAddress>>> segments =
-            new LinkedList<Entry<Long, ArrayList<BookieSocketAddress>>>(
-                ledgerMetadata.getEnsembles().entrySet());
+        List<Entry<Long, ? extends List<BookieSocketAddress>>> segments =
+            new LinkedList<>(ledgerMetadata.getEnsembles().entrySet());
 
         boolean lastSegment = (segmentNo == (segments.size() - 1));
 
@@ -1617,7 +1616,7 @@ public class BookKeeperAdmin implements AutoCloseable {
         DistributionSchedule distributionSchedule = new 
RoundRobinDistributionSchedule(
                 ledgerMetadata.getWriteQuorumSize(), 
ledgerMetadata.getAckQuorumSize(),
                 ledgerMetadata.getEnsembleSize());
-        ArrayList<BookieSocketAddress> currentSegmentEnsemble = 
segments.get(segmentNo).getValue();
+        List<BookieSocketAddress> currentSegmentEnsemble = 
segments.get(segmentNo).getValue();
         int thisBookieIndexInCurrentEnsemble = 
currentSegmentEnsemble.indexOf(bookieAddress);
         long firstEntryId = segments.get(segmentNo).getKey();
         long lastEntryId = lastSegment ? ledgerMetadata.getLastEntryId() : 
segments.get(segmentNo + 1).getKey() - 1;
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
index 43d0c26..d97121d 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
@@ -24,7 +24,6 @@ import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.RemovalListener;
 import com.google.common.cache.RemovalNotification;
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
@@ -200,11 +199,11 @@ class BookieWatcher {
      * @return list of bookies for new ensemble.
      * @throws BKNotEnoughBookiesException
      */
-    public ArrayList<BookieSocketAddress> newEnsemble(int ensembleSize, int 
writeQuorumSize,
+    public List<BookieSocketAddress> newEnsemble(int ensembleSize, int 
writeQuorumSize,
         int ackQuorumSize, Map<String, byte[]> customMetadata)
             throws BKNotEnoughBookiesException {
         long startTime = MathUtils.nowInNano();
-        ArrayList<BookieSocketAddress> socketAddresses;
+        List<BookieSocketAddress> socketAddresses;
         try {
             socketAddresses = placementPolicy.newEnsemble(ensembleSize,
                     writeQuorumSize, ackQuorumSize, customMetadata, new 
HashSet<BookieSocketAddress>(
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java
index 7602499..28efe66 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java
@@ -159,7 +159,7 @@ public class DefaultEnsemblePlacementPolicy implements 
EnsemblePlacementPolicy {
 
     @Override
     public DistributionSchedule.WriteSet reorderReadSequence(
-            ArrayList<BookieSocketAddress> ensemble,
+            List<BookieSocketAddress> ensemble,
             BookiesHealthInfo bookiesHealthInfo,
             DistributionSchedule.WriteSet writeSet) {
         return writeSet;
@@ -167,7 +167,7 @@ public class DefaultEnsemblePlacementPolicy implements 
EnsemblePlacementPolicy {
 
     @Override
     public DistributionSchedule.WriteSet reorderReadLACSequence(
-            ArrayList<BookieSocketAddress> ensemble,
+            List<BookieSocketAddress> ensemble,
             BookiesHealthInfo bookiesHealthInfo,
             DistributionSchedule.WriteSet writeSet) {
         writeSet.addMissingIndices(ensemble.size());
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java
index 500ca16..e185964 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java
@@ -18,7 +18,7 @@
 package org.apache.bookkeeper.client;
 
 import io.netty.util.HashedWheelTimer;
-import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
@@ -152,8 +152,8 @@ import org.apache.bookkeeper.stats.StatsLogger;
  *
  * <h3>How to choose bookies to do speculative reads?</h3>
  *
- * <p>{@link #reorderReadSequence(ArrayList, BookiesHealthInfo, WriteSet)} and
- * {@link #reorderReadLACSequence(ArrayList, BookiesHealthInfo, WriteSet)} are
+ * <p>{@link #reorderReadSequence(List, BookiesHealthInfo, WriteSet)} and
+ * {@link #reorderReadLACSequence(List, BookiesHealthInfo, WriteSet)} are
  * two methods exposed by the placement policy, to help client determine a 
better read sequence according to the
  * network topology and the bookie failure history.
  *
@@ -258,9 +258,9 @@ public interface EnsemblePlacementPolicy {
      *                       provides in {@link BookKeeper#createLedger(int, 
int, int, BookKeeper.DigestType, byte[])}
      * @param excludeBookies Bookies that should not be considered as targets.
      * @throws BKNotEnoughBookiesException if not enough bookies available.
-     * @return the 
ArrayList&lt;org.apache.bookkeeper.net.BookieSocketAddress&gt;
+     * @return the List&lt;org.apache.bookkeeper.net.BookieSocketAddress&gt;
      */
-    ArrayList<BookieSocketAddress> newEnsemble(int ensembleSize,
+    List<BookieSocketAddress> newEnsemble(int ensembleSize,
                                                int writeQuorumSize,
                                                int ackQuorumSize,
                                                Map<String, byte[]> 
customMetadata,
@@ -318,7 +318,7 @@ public interface EnsemblePlacementPolicy {
      * @since 4.5
      */
     DistributionSchedule.WriteSet reorderReadSequence(
-            ArrayList<BookieSocketAddress> ensemble,
+            List<BookieSocketAddress> ensemble,
             BookiesHealthInfo bookiesHealthInfo,
             DistributionSchedule.WriteSet writeSet);
 
@@ -338,7 +338,7 @@ public interface EnsemblePlacementPolicy {
      * @since 4.5
      */
     DistributionSchedule.WriteSet reorderReadLACSequence(
-            ArrayList<BookieSocketAddress> ensemble,
+            List<BookieSocketAddress> ensemble,
             BookiesHealthInfo bookiesHealthInfo,
             DistributionSchedule.WriteSet writeSet);
 
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ForceLedgerOp.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ForceLedgerOp.java
index ae0e051..a48a58b 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ForceLedgerOp.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ForceLedgerOp.java
@@ -18,7 +18,7 @@
 package org.apache.bookkeeper.client;
 
 import static com.google.common.base.Preconditions.checkState;
-import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
 import org.apache.bookkeeper.net.BookieSocketAddress;
@@ -39,7 +39,7 @@ class ForceLedgerOp extends SafeRunnable implements 
ForceLedgerCallback {
     boolean completed = false;
     boolean errored = false;
     int lastSeenError = BKException.Code.WriteException;
-    ArrayList<BookieSocketAddress> currentEnsemble;
+    List<BookieSocketAddress> currentEnsemble;
 
     long currentNonDurableLastAddConfirmed = LedgerHandle.INVALID_ENTRY_ID;
 
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ITopologyAwareEnsemblePlacementPolicy.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ITopologyAwareEnsemblePlacementPolicy.java
index 87e6f18..7c9e07c 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ITopologyAwareEnsemblePlacementPolicy.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ITopologyAwareEnsemblePlacementPolicy.java
@@ -17,7 +17,7 @@
  */
 package org.apache.bookkeeper.client;
 
-import java.util.ArrayList;
+import java.util.List;
 import java.util.Set;
 
 import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
@@ -66,7 +66,7 @@ public interface ITopologyAwareEnsemblePlacementPolicy<T 
extends Node> extends E
         /**
          * @return list of addresses representing the ensemble
          */
-        ArrayList<BookieSocketAddress> toList();
+        List<BookieSocketAddress> toList();
 
         /**
          * Validates if an ensemble is valid.
@@ -93,7 +93,7 @@ public interface ITopologyAwareEnsemblePlacementPolicy<T 
extends Node> extends E
      * @return list of bookies forming the ensemble
      * @throws BKException.BKNotEnoughBookiesException
      */
-    ArrayList<BookieSocketAddress> newEnsemble(
+    List<BookieSocketAddress> newEnsemble(
             int ensembleSize,
             int writeQuorumSize,
             int ackQuorumSize,
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerChecker.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerChecker.java
index 6f232cf..cbd9976 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerChecker.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerChecker.java
@@ -20,9 +20,9 @@
 package org.apache.bookkeeper.client;
 
 import io.netty.buffer.ByteBuf;
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
@@ -321,8 +321,8 @@ public class LedgerChecker {
         final Set<LedgerFragment> fragments = new HashSet<LedgerFragment>();
 
         Long curEntryId = null;
-        ArrayList<BookieSocketAddress> curEnsemble = null;
-        for (Map.Entry<Long, ArrayList<BookieSocketAddress>> e : lh
+        List<BookieSocketAddress> curEnsemble = null;
+        for (Map.Entry<Long, ? extends List<BookieSocketAddress>> e : lh
                 .getLedgerMetadata().getEnsembles().entrySet()) {
             if (curEntryId != null) {
                 Set<Integer> bookieIndexes = new HashSet<Integer>();
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
index 213f941..4f8beed 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
@@ -23,7 +23,6 @@ package org.apache.bookkeeper.client;
 
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.security.GeneralSecurityException;
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.List;
@@ -131,7 +130,7 @@ class LedgerCreateOp implements 
GenericCallback<LedgerMetadata> {
          * Adding bookies to ledger handle
          */
 
-        ArrayList<BookieSocketAddress> ensemble;
+        List<BookieSocketAddress> ensemble;
         try {
             ensemble = bk.getBookieWatcher()
                     .newEnsemble(metadata.getEnsembleSize(),
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragment.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragment.java
index 1a97af8..49b8de2 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragment.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragment.java
@@ -19,7 +19,6 @@
  */
 package org.apache.bookkeeper.client;
 
-import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -51,7 +50,7 @@ public class LedgerFragment {
         this.bookieIndexes = bookieIndexes;
         this.ensemble = lh.getLedgerMetadata().getEnsemble(firstEntryId);
         this.schedule = lh.getDistributionSchedule();
-        SortedMap<Long, ArrayList<BookieSocketAddress>> ensembles = lh
+        SortedMap<Long, ? extends List<BookieSocketAddress>> ensembles = lh
                 .getLedgerMetadata().getEnsembles();
         this.isLedgerClosed = lh.getLedgerMetadata().isClosed()
                 || !ensemble.equals(ensembles.get(ensembles.lastKey()));
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
index 538cfb7..64394de 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
@@ -374,18 +374,19 @@ public class LedgerFragmentReplicator {
          * Update the ledger metadata's ensemble info to point to the new
          * bookie.
          */
-        ArrayList<BookieSocketAddress> ensemble = lh.getLedgerMetadata()
-                .getEnsembles().get(fragmentStartId);
+        List<BookieSocketAddress> ensemble = 
lh.getLedgerMetadata().getEnsembles().get(fragmentStartId);
+        List<BookieSocketAddress> newEnsemble = new ArrayList<>(ensemble);
         for (Map.Entry<BookieSocketAddress, BookieSocketAddress> entry : 
oldBookie2NewBookie.entrySet()) {
-            int deadBookieIndex = ensemble.indexOf(entry.getKey());
+            int deadBookieIndex = newEnsemble.indexOf(entry.getKey());
             // update ensemble info might happen after re-read ledger 
metadata, so the ensemble might already
             // change. if ensemble is already changed, skip replacing the 
bookie doesn't exist.
             if (deadBookieIndex >= 0) {
-                ensemble.set(deadBookieIndex, entry.getValue());
+                newEnsemble.set(deadBookieIndex, entry.getValue());
             } else {
                 LOG.info("Bookie {} doesn't exist in ensemble {} anymore.", 
entry.getKey(), ensemble);
             }
         }
+        lh.getLedgerMetadata().updateEnsemble(fragmentStartId, newEnsemble);
         lh.writeLedgerConfig(new UpdateEnsembleCb(ensembleUpdatedCb,
                 fragmentStartId, lh, oldBookie2NewBookie));
     }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index 90b9884..611b182 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -372,9 +372,9 @@ public class LedgerHandle implements WriteHandle {
      * @return count of unique bookies
      */
     public synchronized long getNumBookies() {
-        Map<Long, ArrayList<BookieSocketAddress>> m = 
getLedgerMetadata().getEnsembles();
+        Map<Long, ? extends List<BookieSocketAddress>> m = 
getLedgerMetadata().getEnsembles();
         Set<BookieSocketAddress> s = Sets.newHashSet();
-        for (ArrayList<BookieSocketAddress> aList : m.values()) {
+        for (List<BookieSocketAddress> aList : m.values()) {
             s.addAll(aList);
         }
         return s.size();
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java
index d95074d..b63c41d 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java
@@ -18,9 +18,11 @@
 package org.apache.bookkeeper.client;
 
 import static com.google.common.base.Charsets.UTF_8;
+import static com.google.common.base.Preconditions.checkArgument;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Maps;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.TextFormat;
@@ -81,9 +83,8 @@ public class LedgerMetadata implements 
org.apache.bookkeeper.client.api.LedgerMe
     private boolean storeSystemtimeAsLedgerCreationTime;
 
     private LedgerMetadataFormat.State state;
-    private TreeMap<Long, ArrayList<BookieSocketAddress>> ensembles =
-        new TreeMap<Long, ArrayList<BookieSocketAddress>>();
-    ArrayList<BookieSocketAddress> currentEnsemble;
+    private TreeMap<Long, ImmutableList<BookieSocketAddress>> ensembles =  new 
TreeMap<>();
+    List<BookieSocketAddress> currentEnsemble;
     volatile Version version = Version.NEW;
 
     private boolean hasPassword = false;
@@ -157,10 +158,8 @@ public class LedgerMetadata implements 
org.apache.bookkeeper.client.api.LedgerMe
         this.password = new byte[other.password.length];
         System.arraycopy(other.password, 0, this.password, 0, 
other.password.length);
         // copy the ensembles
-        for (Entry<Long, ArrayList<BookieSocketAddress>> entry : 
other.ensembles.entrySet()) {
-            long startEntryId = entry.getKey();
-            ArrayList<BookieSocketAddress> newEnsemble = new 
ArrayList<BookieSocketAddress>(entry.getValue());
-            this.addEnsemble(startEntryId, newEnsemble);
+        for (Entry<Long, ? extends List<BookieSocketAddress>> entry : 
other.ensembles.entrySet()) {
+            this.addEnsemble(entry.getKey(), entry.getValue());
         }
         this.customMetadata = other.customMetadata;
     }
@@ -177,7 +176,7 @@ public class LedgerMetadata implements 
org.apache.bookkeeper.client.api.LedgerMe
      * @return SortedMap of Ledger Fragments and the corresponding
      * bookie ensembles that store the entries.
      */
-    public TreeMap<Long, ArrayList<BookieSocketAddress>> getEnsembles() {
+    public TreeMap<Long, ? extends List<BookieSocketAddress>> getEnsembles() {
         return ensembles;
     }
 
@@ -186,8 +185,11 @@ public class LedgerMetadata implements 
org.apache.bookkeeper.client.api.LedgerMe
         return ensembles;
     }
 
-    void setEnsembles(TreeMap<Long, ArrayList<BookieSocketAddress>> ensembles) 
{
-        this.ensembles = ensembles;
+    void setEnsembles(TreeMap<Long, ? extends List<BookieSocketAddress>> 
newEnsembles) {
+        this.ensembles = newEnsembles.entrySet().stream()
+            .collect(TreeMap::new,
+                     (m, e) -> m.put(e.getKey(), 
ImmutableList.copyOf(e.getValue())),
+                     TreeMap::putAll);
     }
 
     @Override
@@ -286,14 +288,19 @@ public class LedgerMetadata implements 
org.apache.bookkeeper.client.api.LedgerMe
         state = LedgerMetadataFormat.State.CLOSED;
     }
 
-    public void addEnsemble(long startEntryId, ArrayList<BookieSocketAddress> 
ensemble) {
-        assert ensembles.isEmpty() || startEntryId >= ensembles.lastKey();
+    public void addEnsemble(long startEntryId, List<BookieSocketAddress> 
ensemble) {
+        checkArgument(ensembles.isEmpty() || startEntryId >= 
ensembles.lastKey());
 
-        ensembles.put(startEntryId, ensemble);
+        ensembles.put(startEntryId, ImmutableList.copyOf(ensemble));
         currentEnsemble = ensemble;
     }
 
-    ArrayList<BookieSocketAddress> getEnsemble(long entryId) {
+    public void updateEnsemble(long startEntryId, List<BookieSocketAddress> 
ensemble) {
+        checkArgument(ensembles.containsKey(startEntryId));
+        ensembles.put(startEntryId, ImmutableList.copyOf(ensemble));
+    }
+
+    List<BookieSocketAddress> getEnsemble(long entryId) {
         // the head map cannot be empty, since we insert an ensemble for
         // entry-id 0, right when we start
         return ensembles.get(ensembles.headMap(entryId + 1).lastKey());
@@ -312,7 +319,7 @@ public class LedgerMetadata implements 
org.apache.bookkeeper.client.api.LedgerMe
      * @return the entry id of the next ensemble change (-1 if no further 
ensemble changes)
      */
     long getNextEnsembleChange(long entryId) {
-        SortedMap<Long, ArrayList<BookieSocketAddress>> tailMap = 
ensembles.tailMap(entryId + 1);
+        SortedMap<Long, ? extends List<BookieSocketAddress>> tailMap = 
ensembles.tailMap(entryId + 1);
 
         if (tailMap.isEmpty()) {
             return -1;
@@ -360,7 +367,7 @@ public class LedgerMetadata implements 
org.apache.bookkeeper.client.api.LedgerMe
             }
         }
 
-        for (Map.Entry<Long, ArrayList<BookieSocketAddress>> entry : 
ensembles.entrySet()) {
+        for (Map.Entry<Long, ? extends List<BookieSocketAddress>> entry : 
ensembles.entrySet()) {
             LedgerMetadataFormat.Segment.Builder segmentBuilder = 
LedgerMetadataFormat.Segment.newBuilder();
             segmentBuilder.setFirstEntryId(entry.getKey());
             for (BookieSocketAddress addr : entry.getValue()) {
@@ -399,7 +406,7 @@ public class LedgerMetadata implements 
org.apache.bookkeeper.client.api.LedgerMe
         
s.append(VERSION_KEY).append(tSplitter).append(metadataFormatVersion).append(lSplitter);
         
s.append(writeQuorumSize).append(lSplitter).append(ensembleSize).append(lSplitter).append(length);
 
-        for (Map.Entry<Long, ArrayList<BookieSocketAddress>> entry : 
ensembles.entrySet()) {
+        for (Map.Entry<Long, ? extends List<BookieSocketAddress>> entry : 
ensembles.entrySet()) {
             s.append(lSplitter).append(entry.getKey());
             for (BookieSocketAddress addr : entry.getValue()) {
                 s.append(tSplitter);
@@ -707,7 +714,7 @@ public class LedgerMetadata implements 
org.apache.bookkeeper.client.api.LedgerMe
         return sb.toString();
     }
 
-    void mergeEnsembles(SortedMap<Long, ArrayList<BookieSocketAddress>> 
newEnsembles) {
+    void mergeEnsembles(SortedMap<Long, ? extends List<BookieSocketAddress>> 
newEnsembles) {
         // allow new metadata to be one ensemble less than current metadata
         // since ensemble change might kick in when recovery changed metadata
         int diff = ensembles.size() - newEnsembles.size();
@@ -715,21 +722,21 @@ public class LedgerMetadata implements 
org.apache.bookkeeper.client.api.LedgerMe
             return;
         }
         int i = 0;
-        for (Entry<Long, ArrayList<BookieSocketAddress>> entry : 
newEnsembles.entrySet()) {
+        for (Entry<Long, ? extends List<BookieSocketAddress>> entry : 
newEnsembles.entrySet()) {
             ++i;
             if (ensembles.size() != i) {
                 // we should use last ensemble from current metadata
                 // not the new metadata read from zookeeper
                 long key = entry.getKey();
-                ArrayList<BookieSocketAddress> ensemble = entry.getValue();
-                ensembles.put(key, ensemble);
+                List<BookieSocketAddress> ensemble = entry.getValue();
+                ensembles.put(key, ImmutableList.copyOf(ensemble));
             }
         }
     }
 
     Set<BookieSocketAddress> getBookiesInThisLedger() {
         Set<BookieSocketAddress> bookies = new HashSet<BookieSocketAddress>();
-        for (ArrayList<BookieSocketAddress> ensemble : ensembles.values()) {
+        for (List<BookieSocketAddress> ensemble : ensembles.values()) {
             bookies.addAll(ensemble);
         }
         return bookies;
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
index 76332df..0aac38f 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
@@ -91,12 +91,12 @@ class PendingReadOp implements ReadEntryCallback, 
SafeRunnable {
         int firstError = BKException.Code.OK;
         int numBookiesMissingEntry = 0;
 
-        final ArrayList<BookieSocketAddress> ensemble;
+        final List<BookieSocketAddress> ensemble;
         final DistributionSchedule.WriteSet writeSet;
         final LedgerEntryImpl entryImpl;
         final long eId;
 
-        LedgerEntryRequest(ArrayList<BookieSocketAddress> ensemble, long lId, 
long eId) {
+        LedgerEntryRequest(List<BookieSocketAddress> ensemble, long lId, long 
eId) {
             this.entryImpl = LedgerEntryImpl.create(lId, eId);
             this.ensemble = ensemble;
             this.eId = eId;
@@ -278,7 +278,7 @@ class PendingReadOp implements ReadEntryCallback, 
SafeRunnable {
 
         int numPendings;
 
-        ParallelReadRequest(ArrayList<BookieSocketAddress> ensemble, long lId, 
long eId) {
+        ParallelReadRequest(List<BookieSocketAddress> ensemble, long lId, long 
eId) {
             super(ensemble, lId, eId);
             numPendings = writeSet.size();
         }
@@ -328,7 +328,7 @@ class PendingReadOp implements ReadEntryCallback, 
SafeRunnable {
         final BitSet sentReplicas;
         final BitSet erroredReplicas;
 
-        SequenceReadRequest(ArrayList<BookieSocketAddress> ensemble, long lId, 
long eId) {
+        SequenceReadRequest(List<BookieSocketAddress> ensemble, long lId, long 
eId) {
             super(ensemble, lId, eId);
 
             this.sentReplicas = new 
BitSet(lh.getLedgerMetadata().getWriteQuorumSize());
@@ -519,7 +519,7 @@ class PendingReadOp implements ReadEntryCallback, 
SafeRunnable {
     void initiate() {
         long nextEnsembleChange = startEntryId, i = startEntryId;
         this.requestTimeNanos = MathUtils.nowInNano();
-        ArrayList<BookieSocketAddress> ensemble = null;
+        List<BookieSocketAddress> ensemble = null;
         do {
             if (i == nextEnsembleChange) {
                 ensemble = getLedgerMetadata().getEnsemble(i);
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
index d41ca9f..1fd7580 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
@@ -19,7 +19,7 @@ package org.apache.bookkeeper.client;
 
 import io.netty.util.HashedWheelTimer;
 
-import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -95,7 +95,7 @@ public class RackawareEnsemblePlacementPolicy extends 
RackawareEnsemblePlacement
     }
 
     @Override
-    public ArrayList<BookieSocketAddress> newEnsemble(int ensembleSize, int 
writeQuorumSize, int ackQuorumSize,
+    public List<BookieSocketAddress> newEnsemble(int ensembleSize, int 
writeQuorumSize, int ackQuorumSize,
             Map<String, byte[]> customMetadata, Set<BookieSocketAddress> 
excludeBookies)
             throws BKException.BKNotEnoughBookiesException {
         try {
@@ -129,7 +129,7 @@ public class RackawareEnsemblePlacementPolicy extends 
RackawareEnsemblePlacement
 
     @Override
     public DistributionSchedule.WriteSet reorderReadSequence(
-            ArrayList<BookieSocketAddress> ensemble,
+            List<BookieSocketAddress> ensemble,
             BookiesHealthInfo bookiesHealthInfo,
             DistributionSchedule.WriteSet writeSet) {
         return super.reorderReadSequence(ensemble, bookiesHealthInfo,
@@ -138,7 +138,7 @@ public class RackawareEnsemblePlacementPolicy extends 
RackawareEnsemblePlacement
 
     @Override
     public DistributionSchedule.WriteSet reorderReadLACSequence(
-            ArrayList<BookieSocketAddress> ensemble,
+            List<BookieSocketAddress> ensemble,
             BookiesHealthInfo bookiesHealthInfo,
             DistributionSchedule.WriteSet writeSet) {
         return super.reorderReadLACSequence(ensemble, bookiesHealthInfo,
@@ -146,12 +146,12 @@ public class RackawareEnsemblePlacementPolicy extends 
RackawareEnsemblePlacement
     }
 
     @Override
-    public ArrayList<BookieSocketAddress> newEnsemble(int ensembleSize,
-                                                    int writeQuorumSize,
-                                                    int ackQuorumSize,
-                                                    Set<BookieSocketAddress> 
excludeBookies,
-                                                    Ensemble<BookieNode> 
parentEnsemble,
-                                                    Predicate<BookieNode> 
parentPredicate)
+    public List<BookieSocketAddress> newEnsemble(int ensembleSize,
+                                                 int writeQuorumSize,
+                                                 int ackQuorumSize,
+                                                 Set<BookieSocketAddress> 
excludeBookies,
+                                                 Ensemble<BookieNode> 
parentEnsemble,
+                                                 Predicate<BookieNode> 
parentPredicate)
             throws BKException.BKNotEnoughBookiesException {
         try {
             return super.newEnsemble(
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
index ae640bb..13efa5c 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
@@ -479,17 +479,17 @@ public class RackawareEnsemblePlacementPolicyImpl extends 
TopologyAwareEnsembleP
     }
 
     @Override
-    public ArrayList<BookieSocketAddress> newEnsemble(int ensembleSize, int 
writeQuorumSize, int ackQuorumSize,
+    public List<BookieSocketAddress> newEnsemble(int ensembleSize, int 
writeQuorumSize, int ackQuorumSize,
             Map<String, byte[]> customMetadata, Set<BookieSocketAddress> 
excludeBookies)
             throws BKNotEnoughBookiesException {
         return newEnsembleInternal(ensembleSize, writeQuorumSize, 
excludeBookies, null, null);
     }
 
-    protected ArrayList<BookieSocketAddress> newEnsembleInternal(int 
ensembleSize,
-                                                               int 
writeQuorumSize,
-                                                               
Set<BookieSocketAddress> excludeBookies,
-                                                               
Ensemble<BookieNode> parentEnsemble,
-                                                               
Predicate<BookieNode> parentPredicate)
+    protected List<BookieSocketAddress> newEnsembleInternal(int ensembleSize,
+                                                            int 
writeQuorumSize,
+                                                            
Set<BookieSocketAddress> excludeBookies,
+                                                            
Ensemble<BookieNode> parentEnsemble,
+                                                            
Predicate<BookieNode> parentPredicate)
             throws BKNotEnoughBookiesException {
         return newEnsembleInternal(
                 ensembleSize,
@@ -501,12 +501,12 @@ public class RackawareEnsemblePlacementPolicyImpl extends 
TopologyAwareEnsembleP
     }
 
     @Override
-    public ArrayList<BookieSocketAddress> newEnsemble(int ensembleSize,
-                                                    int writeQuorumSize,
-                                                    int ackQuorumSize,
-                                                    Set<BookieSocketAddress> 
excludeBookies,
-                                                    Ensemble<BookieNode> 
parentEnsemble,
-                                                    Predicate<BookieNode> 
parentPredicate)
+    public List<BookieSocketAddress> newEnsemble(int ensembleSize,
+                                                 int writeQuorumSize,
+                                                 int ackQuorumSize,
+                                                 Set<BookieSocketAddress> 
excludeBookies,
+                                                 Ensemble<BookieNode> 
parentEnsemble,
+                                                 Predicate<BookieNode> 
parentPredicate)
             throws BKNotEnoughBookiesException {
         return newEnsembleInternal(
                 ensembleSize,
@@ -517,7 +517,7 @@ public class RackawareEnsemblePlacementPolicyImpl extends 
TopologyAwareEnsembleP
                 parentPredicate);
     }
 
-    protected ArrayList<BookieSocketAddress> newEnsembleInternal(
+    protected List<BookieSocketAddress> newEnsembleInternal(
             int ensembleSize,
             int writeQuorumSize,
             int ackQuorumSize,
@@ -624,7 +624,7 @@ public class RackawareEnsemblePlacementPolicyImpl extends 
TopologyAwareEnsembleP
                         !enforceMinNumRacksPerWriteQuorum || 
firstBookieInTheEnsemble);
                 racks[i] = prevNode.getNetworkLocation();
             }
-            ArrayList<BookieSocketAddress> bookieList = ensemble.toList();
+            List<BookieSocketAddress> bookieList = ensemble.toList();
             if (ensembleSize != bookieList.size()) {
                 LOG.error("Not enough {} bookies are available to form an 
ensemble : {}.",
                           ensembleSize, bookieList);
@@ -996,7 +996,7 @@ public class RackawareEnsemblePlacementPolicyImpl extends 
TopologyAwareEnsembleP
 
     @Override
     public DistributionSchedule.WriteSet reorderReadSequence(
-            ArrayList<BookieSocketAddress> ensemble,
+            List<BookieSocketAddress> ensemble,
             BookiesHealthInfo bookiesHealthInfo,
             DistributionSchedule.WriteSet writeSet) {
         Map<Integer, String> writeSetWithRegion = new HashMap<>();
@@ -1037,7 +1037,7 @@ public class RackawareEnsemblePlacementPolicyImpl extends 
TopologyAwareEnsembleP
      * @return ordering of bookies to send read to
      */
     DistributionSchedule.WriteSet reorderReadSequenceWithRegion(
-        ArrayList<BookieSocketAddress> ensemble,
+        List<BookieSocketAddress> ensemble,
         DistributionSchedule.WriteSet writeSet,
         Map<Integer, String> writeSetWithRegion,
         BookiesHealthInfo bookiesHealthInfo,
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java
index 1436990..b342b6f 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java
@@ -22,8 +22,8 @@ package org.apache.bookkeeper.client;
 
 import com.google.common.util.concurrent.ListenableFuture;
 import io.netty.buffer.ByteBuf;
-import java.util.ArrayList;
 import java.util.BitSet;
+import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -73,12 +73,12 @@ class ReadLastConfirmedAndEntryOp implements 
BookkeeperInternalCallbacks.ReadEnt
         int firstError = BKException.Code.OK;
         int numMissedEntryReads = 0;
 
-        final ArrayList<BookieSocketAddress> ensemble;
+        final List<BookieSocketAddress> ensemble;
         final DistributionSchedule.WriteSet writeSet;
         final DistributionSchedule.WriteSet orderedEnsemble;
         final LedgerEntryImpl entryImpl;
 
-        ReadLACAndEntryRequest(ArrayList<BookieSocketAddress> ensemble, long 
lId, long eId) {
+        ReadLACAndEntryRequest(List<BookieSocketAddress> ensemble, long lId, 
long eId) {
             this.entryImpl = LedgerEntryImpl.create(lId, eId);
             this.ensemble = ensemble;
             this.writeSet = lh.getDistributionSchedule().getEnsembleSet(eId);
@@ -243,7 +243,7 @@ class ReadLastConfirmedAndEntryOp implements 
BookkeeperInternalCallbacks.ReadEnt
 
         int numPendings;
 
-        ParallelReadRequest(ArrayList<BookieSocketAddress> ensemble, long lId, 
long eId) {
+        ParallelReadRequest(List<BookieSocketAddress> ensemble, long lId, long 
eId) {
             super(ensemble, lId, eId);
             numPendings = orderedEnsemble.size();
         }
@@ -293,7 +293,7 @@ class ReadLastConfirmedAndEntryOp implements 
BookkeeperInternalCallbacks.ReadEnt
         final BitSet erroredReplicas;
         final BitSet emptyResponseReplicas;
 
-        SequenceReadRequest(ArrayList<BookieSocketAddress> ensemble, long lId, 
long eId) {
+        SequenceReadRequest(List<BookieSocketAddress> ensemble, long lId, long 
eId) {
             super(ensemble, lId, eId);
 
             this.sentReplicas = new BitSet(orderedEnsemble.size());
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
index dcd4b17..c52e0fe 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
@@ -225,7 +225,7 @@ public class RegionAwareEnsemblePlacementPolicy extends 
RackawareEnsemblePlaceme
 
 
     @Override
-    public ArrayList<BookieSocketAddress> newEnsemble(int ensembleSize, int 
writeQuorumSize, int ackQuorumSize,
+    public List<BookieSocketAddress> newEnsemble(int ensembleSize, int 
writeQuorumSize, int ackQuorumSize,
             Map<String, byte[]> customMetadata, Set<BookieSocketAddress> 
excludeBookies)
             throws BKException.BKNotEnoughBookiesException {
 
@@ -394,7 +394,7 @@ public class RegionAwareEnsemblePlacementPolicy extends 
RackawareEnsemblePlaceme
                 }
             } while ((remainingEnsemble > 0) && (remainingEnsemble < 
remainingEnsembleBeforeIteration));
 
-            ArrayList<BookieSocketAddress> bookieList = ensemble.toList();
+            List<BookieSocketAddress> bookieList = ensemble.toList();
             if (ensembleSize != bookieList.size()) {
                 LOG.error("Not enough {} bookies are available to form an 
ensemble : {}.",
                           ensembleSize, bookieList);
@@ -522,7 +522,7 @@ public class RegionAwareEnsemblePlacementPolicy extends 
RackawareEnsemblePlaceme
 
     @Override
     public final DistributionSchedule.WriteSet reorderReadSequence(
-            ArrayList<BookieSocketAddress> ensemble,
+            List<BookieSocketAddress> ensemble,
             BookiesHealthInfo bookiesHealthInfo,
             DistributionSchedule.WriteSet writeSet) {
         if (UNKNOWN_REGION.equals(myRegion)) {
@@ -540,7 +540,7 @@ public class RegionAwareEnsemblePlacementPolicy extends 
RackawareEnsemblePlaceme
 
     @Override
     public final DistributionSchedule.WriteSet reorderReadLACSequence(
-            ArrayList<BookieSocketAddress> ensemble,
+            List<BookieSocketAddress> ensemble,
             BookiesHealthInfo bookiesHealthInfo,
             DistributionSchedule.WriteSet writeSet) {
         if (UNKNOWN_REGION.equals(myRegion)) {
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TopologyAwareEnsemblePlacementPolicy.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TopologyAwareEnsemblePlacementPolicy.java
index 85917b1..19cb505 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TopologyAwareEnsemblePlacementPolicy.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TopologyAwareEnsemblePlacementPolicy.java
@@ -24,6 +24,7 @@ import com.google.common.collect.Sets;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
 
 import org.apache.bookkeeper.net.BookieSocketAddress;
@@ -51,7 +52,7 @@ abstract class TopologyAwareEnsemblePlacementPolicy implements
 
         public static final EnsembleForReplacementWithNoConstraints INSTANCE =
             new EnsembleForReplacementWithNoConstraints();
-        static final ArrayList<BookieSocketAddress> EMPTY_LIST = new 
ArrayList<BookieSocketAddress>(0);
+        static final List<BookieSocketAddress> EMPTY_LIST = new 
ArrayList<BookieSocketAddress>(0);
 
         @Override
         public boolean addNode(BookieNode node) {
@@ -60,7 +61,7 @@ abstract class TopologyAwareEnsemblePlacementPolicy implements
         }
 
         @Override
-        public ArrayList<BookieSocketAddress> toList() {
+        public List<BookieSocketAddress> toList() {
             return EMPTY_LIST;
         }
 
@@ -301,7 +302,7 @@ abstract class TopologyAwareEnsemblePlacementPolicy 
implements
         final int ackQuorumSize;
         final int minRacksOrRegionsForDurability;
         final int minNumRacksPerWriteQuorum;
-        final ArrayList<BookieNode> chosenNodes;
+        final List<BookieNode> chosenNodes;
         final Set<String> racksOrRegions;
         private final CoverageSet[] quorums;
         final Predicate<BookieNode> parentPredicate;
@@ -453,7 +454,7 @@ abstract class TopologyAwareEnsemblePlacementPolicy 
implements
         }
 
         @Override
-        public ArrayList<BookieSocketAddress> toList() {
+        public List<BookieSocketAddress> toList() {
             ArrayList<BookieSocketAddress> addresses = new 
ArrayList<BookieSocketAddress>(ensembleSize);
             for (BookieNode bn : chosenNodes) {
                 addresses.add(bn.getAddr());
@@ -490,7 +491,7 @@ abstract class TopologyAwareEnsemblePlacementPolicy 
implements
 
     @Override
     public DistributionSchedule.WriteSet reorderReadSequence(
-            ArrayList<BookieSocketAddress> ensemble,
+            List<BookieSocketAddress> ensemble,
             BookiesHealthInfo bookiesHealthInfo,
             DistributionSchedule.WriteSet writeSet) {
         return writeSet;
@@ -498,7 +499,7 @@ abstract class TopologyAwareEnsemblePlacementPolicy 
implements
 
     @Override
     public DistributionSchedule.WriteSet reorderReadLACSequence(
-            ArrayList<BookieSocketAddress> ensemble,
+            List<BookieSocketAddress> ensemble,
             BookiesHealthInfo bookiesHealthInfo,
             DistributionSchedule.WriteSet writeSet) {
         DistributionSchedule.WriteSet retList = reorderReadSequence(
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/UpdateLedgerOp.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/UpdateLedgerOp.java
index d0281c7..b55e2d0 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/UpdateLedgerOp.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/UpdateLedgerOp.java
@@ -30,6 +30,8 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
@@ -221,10 +223,12 @@ public class UpdateLedgerOp {
                 return;
             }
             boolean updateEnsemble = false;
-            for (ArrayList<BookieSocketAddress> ensembles : 
metadata.getEnsembles().values()) {
-                int index = ensembles.indexOf(curBookieAddr);
+            for (Map.Entry<Long, ? extends List<BookieSocketAddress>> e : 
metadata.getEnsembles().entrySet()) {
+                List<BookieSocketAddress> newEnsemble = new 
ArrayList<>(e.getValue());
+                int index = newEnsemble.indexOf(curBookieAddr);
                 if (-1 != index) {
-                    ensembles.set(index, toBookieAddr);
+                    newEnsemble.set(index, toBookieAddr);
+                    metadata.updateEnsemble(e.getKey(), newEnsemble);
                     updateEnsemble = true;
                 }
             }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/BookieLedgerIndexer.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/BookieLedgerIndexer.java
index b7b4285..027081a 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/BookieLedgerIndexer.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/BookieLedgerIndexer.java
@@ -73,7 +73,7 @@ public class BookieLedgerIndexer {
                     public void operationComplete(int rc,
                             LedgerMetadata ledgerMetadata) {
                         if (rc == BKException.Code.OK) {
-                            for (Map.Entry<Long, 
ArrayList<BookieSocketAddress>> ensemble : ledgerMetadata
+                            for (Map.Entry<Long, ? extends 
List<BookieSocketAddress>> ensemble : ledgerMetadata
                                     .getEnsembles().entrySet()) {
                                 for (BookieSocketAddress bookie : ensemble
                                         .getValue()) {
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java
index 7d51a7a..d5c85f1 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java
@@ -31,9 +31,9 @@ import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.SortedMap;
@@ -369,8 +369,8 @@ public class ReplicationWorker implements Runnable {
             return false;
         }
 
-        SortedMap<Long, ArrayList<BookieSocketAddress>> ensembles = 
admin.getLedgerMetadata(lh).getEnsembles();
-        ArrayList<BookieSocketAddress> finalEnsemble = 
ensembles.get(ensembles.lastKey());
+        SortedMap<Long, ? extends List<BookieSocketAddress>> ensembles = 
admin.getLedgerMetadata(lh).getEnsembles();
+        List<BookieSocketAddress> finalEnsemble = 
ensembles.get(ensembles.lastKey());
         Collection<BookieSocketAddress> available = 
admin.getAvailableBookies();
         for (BookieSocketAddress b : finalEnsemble) {
             if (!available.contains(b)) {
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestGcOverreplicatedLedger.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestGcOverreplicatedLedger.java
index 07cdd6b..9feb9df 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestGcOverreplicatedLedger.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestGcOverreplicatedLedger.java
@@ -25,7 +25,6 @@ import com.google.common.collect.Lists;
 
 import java.io.IOException;
 import java.net.UnknownHostException;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
@@ -149,8 +148,8 @@ public class TestGcOverreplicatedLedger extends 
LedgerManagerTestCase {
             Assert.fail("No ledger metadata found");
         }
         BookieSocketAddress address = null;
-        SortedMap<Long, ArrayList<BookieSocketAddress>> ensembleMap = 
newLedgerMetadata.get().getEnsembles();
-        for (ArrayList<BookieSocketAddress> ensemble : ensembleMap.values()) {
+        SortedMap<Long, ? extends List<BookieSocketAddress>> ensembleMap = 
newLedgerMetadata.get().getEnsembles();
+        for (List<BookieSocketAddress> ensemble : ensembleMap.values()) {
             address = ensemble.get(0);
         }
         ServerConfiguration bkConf = getBkConf(address);
@@ -236,8 +235,8 @@ public class TestGcOverreplicatedLedger extends 
LedgerManagerTestCase {
         for (BookieServer bk : bs) {
             allAddresses.add(bk.getLocalAddress());
         }
-        SortedMap<Long, ArrayList<BookieSocketAddress>> ensembles = 
ledgerMetadata.getEnsembles();
-        for (ArrayList<BookieSocketAddress> fragmentEnsembles : 
ensembles.values()) {
+        SortedMap<Long, ? extends List<BookieSocketAddress>> ensembles = 
ledgerMetadata.getEnsembles();
+        for (List<BookieSocketAddress> fragmentEnsembles : ensembles.values()) 
{
             allAddresses.removeAll(fragmentEnsembles);
         }
         Assert.assertEquals(allAddresses.size(), 1);
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryTest.java
index 42ff0f9..b1ad88a 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryTest.java
@@ -506,7 +506,7 @@ public class BookieRecoveryTest extends 
BookKeeperClusterTestCase {
     private boolean verifyFullyReplicated(LedgerHandle lh, long untilEntry) 
throws Exception {
         LedgerMetadata md = getLedgerMetadata(lh);
 
-        Map<Long, ArrayList<BookieSocketAddress>> ensembles = 
md.getEnsembles();
+        Map<Long, ? extends List<BookieSocketAddress>> ensembles = 
md.getEnsembles();
 
         HashMap<Long, Long> ranges = new HashMap<Long, Long>();
         ArrayList<Long> keyList = Collections.list(
@@ -517,7 +517,7 @@ public class BookieRecoveryTest extends 
BookKeeperClusterTestCase {
         }
         ranges.put(keyList.get(keyList.size() - 1), untilEntry);
 
-        for (Map.Entry<Long, ArrayList<BookieSocketAddress>> e : 
ensembles.entrySet()) {
+        for (Map.Entry<Long, ? extends List<BookieSocketAddress>> e : 
ensembles.entrySet()) {
             int quorum = md.getAckQuorumSize();
             long startEntryId = e.getKey();
             long endEntryId = ranges.get(startEntryId);
@@ -586,7 +586,7 @@ public class BookieRecoveryTest extends 
BookKeeperClusterTestCase {
         long numDupes = 0;
         for (LedgerHandle lh : lhs) {
             LedgerMetadata md = getLedgerMetadata(lh);
-            for (Map.Entry<Long, ArrayList<BookieSocketAddress>> e : 
md.getEnsembles().entrySet()) {
+            for (Map.Entry<Long, ? extends List<BookieSocketAddress>> e : 
md.getEnsembles().entrySet()) {
                 HashSet<BookieSocketAddress> set = new 
HashSet<BookieSocketAddress>();
                 long fragment = e.getKey();
 
@@ -619,7 +619,7 @@ public class BookieRecoveryTest extends 
BookKeeperClusterTestCase {
         closeLedgers(lhs);
 
         // Shutdown last bookie server in last ensemble
-        ArrayList<BookieSocketAddress> lastEnsemble = 
lhs.get(0).getLedgerMetadata().getEnsembles()
+        List<BookieSocketAddress> lastEnsemble = 
lhs.get(0).getLedgerMetadata().getEnsembles()
           .entrySet().iterator().next().getValue();
         BookieSocketAddress bookieToKill = 
lastEnsemble.get(lastEnsemble.size() - 1);
         killBookie(bookieToKill);
@@ -648,7 +648,7 @@ public class BookieRecoveryTest extends 
BookKeeperClusterTestCase {
         writeEntriestoLedgers(numMsgs, 0, lhs);
 
         // Shutdown the first bookie server
-        ArrayList<BookieSocketAddress> lastEnsemble = 
lhs.get(0).getLedgerMetadata().getEnsembles()
+        List<BookieSocketAddress> lastEnsemble = 
lhs.get(0).getLedgerMetadata().getEnsembles()
           .entrySet().iterator().next().getValue();
         BookieSocketAddress bookieToKill = 
lastEnsemble.get(lastEnsemble.size() - 1);
         killBookie(bookieToKill);
@@ -684,7 +684,7 @@ public class BookieRecoveryTest extends 
BookKeeperClusterTestCase {
         writeEntriestoLedgers(numMsgs, 0, lhs);
 
         // Shutdown the first bookie server
-        ArrayList<BookieSocketAddress> lastEnsemble = 
lhs.get(0).getLedgerMetadata().getEnsembles()
+        List<BookieSocketAddress> lastEnsemble = 
lhs.get(0).getLedgerMetadata().getEnsembles()
           .entrySet().iterator().next().getValue();
         // removed bookie
         BookieSocketAddress bookieToKill = lastEnsemble.get(0);
@@ -727,7 +727,7 @@ public class BookieRecoveryTest extends 
BookKeeperClusterTestCase {
         List<LedgerHandle> newLhs = openLedgers(lhs);
         for (LedgerHandle newLh : newLhs) {
             // first ensemble should contains bookieToKill2 and not contain 
bookieToKill
-            Map.Entry<Long, ArrayList<BookieSocketAddress>> entry =
+            Map.Entry<Long, ? extends List<BookieSocketAddress>> entry =
               
newLh.getLedgerMetadata().getEnsembles().entrySet().iterator().next();
             assertFalse(entry.getValue().contains(bookieToKill));
             assertTrue(entry.getValue().contains(bookieToKill2));
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java
index 75bb4a4..93683c8 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java
@@ -144,7 +144,7 @@ public class BookieWriteLedgerTest extends
         startNewBookie();
 
         // Shutdown three bookies in the last ensemble and continue writing
-        ArrayList<BookieSocketAddress> ensemble = lh.getLedgerMetadata()
+        List<BookieSocketAddress> ensemble = lh.getLedgerMetadata()
                 .getEnsembles().entrySet().iterator().next().getValue();
         killBookie(ensemble.get(0));
         killBookie(ensemble.get(1));
@@ -189,7 +189,7 @@ public class BookieWriteLedgerTest extends
 
         CountDownLatch sleepLatch1 = new CountDownLatch(1);
         CountDownLatch sleepLatch2 = new CountDownLatch(1);
-        ArrayList<BookieSocketAddress> ensemble = lh.getLedgerMetadata()
+        List<BookieSocketAddress> ensemble = lh.getLedgerMetadata()
                 .getEnsembles().entrySet().iterator().next().getValue();
 
         sleepBookie(ensemble.get(0), sleepLatch1);
@@ -383,7 +383,7 @@ public class BookieWriteLedgerTest extends
         startNewBookie();
 
         // Shutdown one bookie in the last ensemble and continue writing
-        ArrayList<BookieSocketAddress> ensemble = 
lh.getLedgerMetadata().getEnsembles().entrySet().iterator().next()
+        List<BookieSocketAddress> ensemble = 
lh.getLedgerMetadata().getEnsembles().entrySet().iterator().next()
                 .getValue();
         killBookie(ensemble.get(0));
 
@@ -522,7 +522,7 @@ public class BookieWriteLedgerTest extends
         startNewBookie();
 
         // Shutdown one bookie in the last ensemble and continue writing
-        ArrayList<BookieSocketAddress> ensemble = 
lh.getLedgerMetadata().getEnsembles().entrySet().iterator().next()
+        List<BookieSocketAddress> ensemble = 
lh.getLedgerMetadata().getEnsembles().entrySet().iterator().next()
                 .getValue();
         killBookie(ensemble.get(0));
 
@@ -840,7 +840,7 @@ public class BookieWriteLedgerTest extends
         startNewBookie();
 
         // Shutdown three bookies in the last ensemble and continue writing
-        ArrayList<BookieSocketAddress> ensemble = lh.getLedgerMetadata()
+        List<BookieSocketAddress> ensemble = lh.getLedgerMetadata()
                 .getEnsembles().entrySet().iterator().next().getValue();
         killBookie(ensemble.get(0));
         killBookie(ensemble.get(1));
@@ -913,7 +913,7 @@ public class BookieWriteLedgerTest extends
         }
         // Start One more bookie and shutdown one from last ensemble before 
reading
         startNewBookie();
-        ArrayList<BookieSocketAddress> ensemble = 
lh.getLedgerMetadata().getEnsembles().entrySet().iterator().next()
+        List<BookieSocketAddress> ensemble = 
lh.getLedgerMetadata().getEnsembles().entrySet().iterator().next()
                 .getValue();
         killBookie(ensemble.get(0));
 
@@ -981,7 +981,7 @@ public class BookieWriteLedgerTest extends
         }
 
         CountDownLatch sleepLatch1 = new CountDownLatch(1);
-        ArrayList<BookieSocketAddress> ensemble;
+        List<BookieSocketAddress> ensemble;
 
         ensemble = 
lh.getLedgerMetadata().getEnsembles().entrySet().iterator().next().getValue();
 
@@ -1067,7 +1067,7 @@ public class BookieWriteLedgerTest extends
                 if (j == numEntriesToWrite / 2) {
                     // Start One more bookie and shutdown one from last 
ensemble at half-way
                     startNewBookie();
-                    ArrayList<BookieSocketAddress> ensemble = 
lh.getLedgerMetadata().getEnsembles().entrySet()
+                    List<BookieSocketAddress> ensemble = 
lh.getLedgerMetadata().getEnsembles().entrySet()
                             .iterator().next().getValue();
                     killBookie(ensemble.get(0));
                 }
@@ -1136,7 +1136,7 @@ public class BookieWriteLedgerTest extends
         }
         // Start One more bookie and shutdown one from last ensemble before 
reading
         startNewBookie();
-        ArrayList<BookieSocketAddress> ensemble = 
lh.getLedgerMetadata().getEnsembles().entrySet().iterator().next()
+        List<BookieSocketAddress> ensemble = 
lh.getLedgerMetadata().getEnsembles().entrySet().iterator().next()
                 .getValue();
         killBookie(ensemble.get(0));
 
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/GenericEnsemblePlacementPolicyTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/GenericEnsemblePlacementPolicyTest.java
index 36beaa2..bf369cf 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/GenericEnsemblePlacementPolicyTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/GenericEnsemblePlacementPolicyTest.java
@@ -132,7 +132,7 @@ public class GenericEnsemblePlacementPolicyTest extends 
BookKeeperClusterTestCas
                 try (LedgerHandle lh = bk.createLedger(2, 2, 2, digestType, 
PASSWORD.getBytes(), customMetadata)) {
                     lh.addEntry(value);
                     long lId = lh.getId();
-                    ArrayList<BookieSocketAddress> ensembleAtFirstEntry = 
lh.getLedgerMetadata().getEnsemble(lId);
+                    List<BookieSocketAddress> ensembleAtFirstEntry = 
lh.getLedgerMetadata().getEnsemble(lId);
                     assertEquals(2, ensembleAtFirstEntry.size());
                     killBookie(ensembleAtFirstEntry.get(0));
                     lh.addEntry(value);
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestDelayEnsembleChange.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestDelayEnsembleChange.java
index cf693ed..62be7cb 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestDelayEnsembleChange.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestDelayEnsembleChange.java
@@ -103,7 +103,7 @@ public class TestDelayEnsembleChange extends 
BookKeeperClusterTestCase {
         LedgerMetadata md = lh.getLedgerMetadata();
 
         for (long eid = startEntry; eid < untilEntry; eid++) {
-            ArrayList<BookieSocketAddress> addresses = md.getEnsemble(eid);
+            List<BookieSocketAddress> addresses = md.getEnsemble(eid);
             VerificationCallback callback = new 
VerificationCallback(addresses.size());
             for (BookieSocketAddress addr : addresses) {
                 bkc.getBookieClient().readEntry(addr, lh.getId(), eid,
@@ -121,7 +121,7 @@ public class TestDelayEnsembleChange extends 
BookKeeperClusterTestCase {
         LedgerMetadata md = lh.getLedgerMetadata();
 
         for (long eid = startEntry; eid < untilEntry; eid++) {
-            ArrayList<BookieSocketAddress> addresses = md.getEnsemble(eid);
+            List<BookieSocketAddress> addresses = md.getEnsemble(eid);
             VerificationCallback callback = new 
VerificationCallback(addresses.size());
             for (BookieSocketAddress addr : addresses) {
                 bkc.getBookieClient().readEntry(addr, lh.getId(), eid,
@@ -257,8 +257,8 @@ public class TestDelayEnsembleChange extends 
BookKeeperClusterTestCase {
                         CLIENT_SCOPE + "." + WATCHER_SCOPE + "." + 
REPLACE_BOOKIE_TIME)
                         .getSuccessCount() > 0);
 
-        ArrayList<BookieSocketAddress> firstFragment = 
lh.getLedgerMetadata().getEnsemble(0);
-        ArrayList<BookieSocketAddress> secondFragment = 
lh.getLedgerMetadata().getEnsemble(3 * numEntries);
+        List<BookieSocketAddress> firstFragment = 
lh.getLedgerMetadata().getEnsemble(0);
+        List<BookieSocketAddress> secondFragment = 
lh.getLedgerMetadata().getEnsemble(3 * numEntries);
         assertFalse(firstFragment.get(0).equals(secondFragment.get(0)));
         assertFalse(firstFragment.get(1).equals(secondFragment.get(1)));
         assertFalse(firstFragment.get(2).equals(secondFragment.get(2)));
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestLedgerChecker.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestLedgerChecker.java
index 20308e6..16a0958 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestLedgerChecker.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestLedgerChecker.java
@@ -24,7 +24,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
-import java.util.ArrayList;
+import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -137,7 +137,7 @@ public class TestLedgerChecker extends 
BookKeeperClusterTestCase {
         // Entry should have added in first 2 Bookies.
 
         // Kill the 3rd BK from ensemble.
-        ArrayList<BookieSocketAddress> firstEnsemble = lh.getLedgerMetadata()
+        List<BookieSocketAddress> firstEnsemble = lh.getLedgerMetadata()
                 .getEnsembles().get(0L);
         BookieSocketAddress lastBookieFromEnsemble = firstEnsemble.get(2);
         LOG.info("Killing " + lastBookieFromEnsemble + " from ensemble="
@@ -180,7 +180,7 @@ public class TestLedgerChecker extends 
BookKeeperClusterTestCase {
         startNewBookie();
         lh.addEntry(TEST_LEDGER_ENTRY_DATA);
 
-        ArrayList<BookieSocketAddress> firstEnsemble = lh.getLedgerMetadata()
+        List<BookieSocketAddress> firstEnsemble = lh.getLedgerMetadata()
                 .getEnsembles().get(0L);
 
         BookieSocketAddress firstBookieFromEnsemble = firstEnsemble.get(0);
@@ -213,7 +213,7 @@ public class TestLedgerChecker extends 
BookKeeperClusterTestCase {
         LedgerHandle lh = bkc.createLedger(3, 2, BookKeeper.DigestType.CRC32,
                 TEST_LEDGER_PASSWORD);
 
-        ArrayList<BookieSocketAddress> firstEnsemble = lh.getLedgerMetadata()
+        List<BookieSocketAddress> firstEnsemble = lh.getLedgerMetadata()
                 .getEnsembles().get(0L);
         BookieSocketAddress firstBookieFromEnsemble = firstEnsemble.get(0);
         killBookie(firstBookieFromEnsemble);
@@ -258,7 +258,7 @@ public class TestLedgerChecker extends 
BookKeeperClusterTestCase {
         }
 
         // Kill all three bookies
-        ArrayList<BookieSocketAddress> firstEnsemble = lh.getLedgerMetadata()
+        List<BookieSocketAddress> firstEnsemble = lh.getLedgerMetadata()
                 .getEnsembles().get(0L);
         for (BookieSocketAddress bkAddr : firstEnsemble) {
             killBookie(firstEnsemble, bkAddr);
@@ -321,7 +321,7 @@ public class TestLedgerChecker extends 
BookKeeperClusterTestCase {
         LedgerHandle lh = bkc.createLedger(3, 3, BookKeeper.DigestType.CRC32,
                 TEST_LEDGER_PASSWORD);
         lh.addEntry(TEST_LEDGER_ENTRY_DATA);
-        ArrayList<BookieSocketAddress> firstEnsemble = lh.getLedgerMetadata()
+        List<BookieSocketAddress> firstEnsemble = lh.getLedgerMetadata()
                 .getEnsembles().get(0L);
         BookieSocketAddress lastBookieFromEnsemble = firstEnsemble.get(0);
         LOG.info("Killing " + lastBookieFromEnsemble + " from ensemble="
@@ -355,7 +355,7 @@ public class TestLedgerChecker extends 
BookKeeperClusterTestCase {
         for (int i = 0; i < 10; i++) {
             lh.addEntry(TEST_LEDGER_ENTRY_DATA);
         }
-        ArrayList<BookieSocketAddress> firstEnsemble = lh.getLedgerMetadata()
+        List<BookieSocketAddress> firstEnsemble = lh.getLedgerMetadata()
                 .getEnsembles().get(0L);
         DistributionSchedule.WriteSet writeSet = 
lh.getDistributionSchedule().getWriteSet(lh.getLastAddPushed());
         BookieSocketAddress lastBookieFromEnsemble = 
firstEnsemble.get(writeSet.get(0));
@@ -400,7 +400,7 @@ public class TestLedgerChecker extends 
BookKeeperClusterTestCase {
     public void testClosedEmptyLedger() throws Exception {
         LedgerHandle lh = bkc.createLedger(3, 3, BookKeeper.DigestType.CRC32,
                 TEST_LEDGER_PASSWORD);
-        ArrayList<BookieSocketAddress> firstEnsemble = lh.getLedgerMetadata()
+        List<BookieSocketAddress> firstEnsemble = lh.getLedgerMetadata()
                 .getEnsembles().get(0L);
         lh.close();
 
@@ -427,7 +427,7 @@ public class TestLedgerChecker extends 
BookKeeperClusterTestCase {
     public void testClosedSingleEntryLedger() throws Exception {
         LedgerHandle lh = bkc.createLedger(3, 2, BookKeeper.DigestType.CRC32,
                 TEST_LEDGER_PASSWORD);
-        ArrayList<BookieSocketAddress> firstEnsemble = lh.getLedgerMetadata()
+        List<BookieSocketAddress> firstEnsemble = lh.getLedgerMetadata()
             .getEnsembles().get(0L);
         lh.addEntry(TEST_LEDGER_ENTRY_DATA);
         lh.close();
@@ -496,7 +496,7 @@ public class TestLedgerChecker extends 
BookKeeperClusterTestCase {
         return result;
     }
 
-    private void killBookie(ArrayList<BookieSocketAddress> firstEnsemble, 
BookieSocketAddress ensemble)
+    private void killBookie(List<BookieSocketAddress> firstEnsemble, 
BookieSocketAddress ensemble)
             throws Exception {
         LOG.info("Killing " + ensemble + " from ensemble=" + firstEnsemble);
         killBookie(ensemble);
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestLedgerFragmentReplication.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestLedgerFragmentReplication.java
index 883dc9c..b26798b 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestLedgerFragmentReplication.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestLedgerFragmentReplication.java
@@ -25,8 +25,8 @@ import static org.junit.Assert.fail;
 
 import com.google.common.collect.Sets;
 
-import java.util.ArrayList;
 import java.util.Enumeration;
+import java.util.List;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.SortedMap;
@@ -111,12 +111,10 @@ public class TestLedgerFragmentReplication extends 
BookKeeperClusterTestCase {
         }
 
         // Killing all bookies except newly replicated bookie
-        SortedMap<Long, ArrayList<BookieSocketAddress>> 
allBookiesBeforeReplication = lh
+        SortedMap<Long, ? extends List<BookieSocketAddress>> 
allBookiesBeforeReplication = lh
                 .getLedgerMetadata().getEnsembles();
-        Set<Entry<Long, ArrayList<BookieSocketAddress>>> entrySet = 
allBookiesBeforeReplication
-                .entrySet();
-        for (Entry<Long, ArrayList<BookieSocketAddress>> entry : entrySet) {
-            ArrayList<BookieSocketAddress> bookies = entry.getValue();
+        for (Entry<Long, ? extends List<BookieSocketAddress>> entry : 
allBookiesBeforeReplication.entrySet()) {
+            List<BookieSocketAddress> bookies = entry.getValue();
             for (BookieSocketAddress bookie : bookies) {
                 if (newBkAddr.equals(bookie)) {
                     continue;
@@ -237,7 +235,7 @@ public class TestLedgerFragmentReplication extends 
BookKeeperClusterTestCase {
         LedgerMetadata metadata = new LedgerMetadata(3, 3, 3, TEST_DIGEST_TYPE,
                 TEST_PSSWD) {
             @Override
-            ArrayList<BookieSocketAddress> getEnsemble(long entryId) {
+            List<BookieSocketAddress> getEnsemble(long entryId) {
                 return null;
             }
 
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestParallelRead.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestParallelRead.java
index ce851a5..ba500fc 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestParallelRead.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestParallelRead.java
@@ -27,8 +27,8 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import org.apache.bookkeeper.client.BKException.Code;
@@ -151,8 +151,7 @@ public class TestParallelRead extends 
BookKeeperClusterTestCase {
 
         LedgerHandle lh = bkc.openLedger(id, digestType, passwd);
 
-        ArrayList<BookieSocketAddress> ensemble =
-                lh.getLedgerMetadata().getEnsemble(10);
+        List<BookieSocketAddress> ensemble = 
lh.getLedgerMetadata().getEnsemble(10);
         CountDownLatch latch1 = new CountDownLatch(1);
         CountDownLatch latch2 = new CountDownLatch(1);
         // sleep two bookie
@@ -184,8 +183,7 @@ public class TestParallelRead extends 
BookKeeperClusterTestCase {
 
         LedgerHandle lh = bkc.openLedger(id, digestType, passwd);
 
-        ArrayList<BookieSocketAddress> ensemble =
-                lh.getLedgerMetadata().getEnsemble(5);
+        List<BookieSocketAddress> ensemble = 
lh.getLedgerMetadata().getEnsemble(5);
         // kill two bookies
         killBookie(ensemble.get(0));
         killBookie(ensemble.get(1));
@@ -222,8 +220,7 @@ public class TestParallelRead extends 
BookKeeperClusterTestCase {
 
         LedgerHandle lh = bkc.openLedger(id, digestType, passwd);
 
-        ArrayList<BookieSocketAddress> ensemble =
-                lh.getLedgerMetadata().getEnsemble(5);
+        List<BookieSocketAddress> ensemble = 
lh.getLedgerMetadata().getEnsemble(5);
         // kill two bookies
         killBookie(ensemble.get(0));
         killBookie(ensemble.get(1));
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
index e085006..dd95b7a 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
@@ -63,7 +63,7 @@ public class TestRackawareEnsemblePlacementPolicy extends 
TestCase {
     static final Logger LOG = 
LoggerFactory.getLogger(TestRackawareEnsemblePlacementPolicy.class);
 
     RackawareEnsemblePlacementPolicy repp;
-    final ArrayList<BookieSocketAddress> ensemble = new 
ArrayList<BookieSocketAddress>();
+    final List<BookieSocketAddress> ensemble = new 
ArrayList<BookieSocketAddress>();
     DistributionSchedule.WriteSet writeSet = 
DistributionSchedule.NULL_WRITE_SET;
     ClientConfiguration conf = new ClientConfiguration();
     BookieSocketAddress addr1, addr2, addr3, addr4;
@@ -453,7 +453,7 @@ public class TestRackawareEnsemblePlacementPolicy extends 
TestCase {
         bookiePendingMap.put(addr5, 9L); // not in write set
         bookiePendingMap.put(addr6, 2L); // best bookie -> this one first
         bookiePendingMap.put(addr7, 10L);
-        ArrayList<BookieSocketAddress> ensemble = new 
ArrayList<BookieSocketAddress>();
+        List<BookieSocketAddress> ensemble = new 
ArrayList<BookieSocketAddress>();
         ensemble.add(addr1);
         ensemble.add(addr2);
         ensemble.add(addr3);
@@ -669,9 +669,9 @@ public class TestRackawareEnsemblePlacementPolicy extends 
TestCase {
         addrs.add(addr4);
         repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
         try {
-            ArrayList<BookieSocketAddress> ensemble = repp.newEnsemble(3, 2, 
2, null, new HashSet<>());
+            List<BookieSocketAddress> ensemble = repp.newEnsemble(3, 2, 2, 
null, new HashSet<>());
             assertEquals(0, getNumCoveredWriteQuorums(ensemble, 2, 
conf.getMinNumRacksPerWriteQuorum()));
-            ArrayList<BookieSocketAddress> ensemble2 = repp.newEnsemble(4, 2, 
2, null, new HashSet<>());
+            List<BookieSocketAddress> ensemble2 = repp.newEnsemble(4, 2, 2, 
null, new HashSet<>());
             assertEquals(0, getNumCoveredWriteQuorums(ensemble2, 2, 
conf.getMinNumRacksPerWriteQuorum()));
         } catch (BKNotEnoughBookiesException bnebe) {
             fail("Should not get not enough bookies exception even there is 
only one rack.");
@@ -703,7 +703,7 @@ public class TestRackawareEnsemblePlacementPolicy extends 
TestCase {
         addrs.add(addr4);
         repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
 
-        ArrayList<BookieSocketAddress> ensemble;
+        List<BookieSocketAddress> ensemble;
         try {
             ensemble = repp.newEnsemble(3, 2, 2, null, new HashSet<>());
             fail("Should get not enough bookies exception since there is only 
one rack.");
@@ -765,7 +765,7 @@ public class TestRackawareEnsemblePlacementPolicy extends 
TestCase {
          * and there are enough bookies in 3 racks, this newEnsemble calls 
should
          * succeed.
          */
-        ArrayList<BookieSocketAddress> ensemble;
+        List<BookieSocketAddress> ensemble;
         int ensembleSize = numOfRacks * numOfBookiesPerRack;
         int writeQuorumSize = numOfRacks;
         int ackQuorumSize = numOfRacks;
@@ -821,7 +821,7 @@ public class TestRackawareEnsemblePlacementPolicy extends 
TestCase {
          * ensembleSizes (as long as there are enough number of bookies in each
          * rack).
          */
-        ArrayList<BookieSocketAddress> ensemble;
+        List<BookieSocketAddress> ensemble;
         for (int ensembleSize = effectiveMinNumRacksPerWriteQuorum; 
ensembleSize < 40; ensembleSize++) {
             ensemble = repp.newEnsemble(ensembleSize, writeQuorumSize, 
ackQuorumSize, null, new HashSet<>());
             assertEquals("Number of writeQuorum sets covered", ensembleSize,
@@ -872,7 +872,7 @@ public class TestRackawareEnsemblePlacementPolicy extends 
TestCase {
          * and there are enough bookies in 3 racks, this newEnsemble call 
should
          * succeed.
          */
-        ArrayList<BookieSocketAddress> ensemble;
+        List<BookieSocketAddress> ensemble;
         int ensembleSize = numOfRacks * numOfBookiesPerRack;
         int writeQuorumSize = numOfRacks;
         int ackQuorumSize = numOfRacks;
@@ -1198,12 +1198,12 @@ public class TestRackawareEnsemblePlacementPolicy 
extends TestCase {
             int ensembleSize = 3;
             int writeQuorumSize = 2;
             int acqQuorumSize = 2;
-            ArrayList<BookieSocketAddress> ensemble = 
repp.newEnsemble(ensembleSize, writeQuorumSize, acqQuorumSize,
+            List<BookieSocketAddress> ensemble = 
repp.newEnsemble(ensembleSize, writeQuorumSize, acqQuorumSize,
                     null, new HashSet<>());
             int numCovered = getNumCoveredWriteQuorums(ensemble, 2, 
conf.getMinNumRacksPerWriteQuorum());
             assertTrue(numCovered >= 1 && numCovered < 3);
             ensembleSize = 4;
-            ArrayList<BookieSocketAddress> ensemble2 = 
repp.newEnsemble(ensembleSize, writeQuorumSize, acqQuorumSize,
+            List<BookieSocketAddress> ensemble2 = 
repp.newEnsemble(ensembleSize, writeQuorumSize, acqQuorumSize,
                     null, new HashSet<>());
             numCovered = getNumCoveredWriteQuorums(ensemble2, 2, 
conf.getMinNumRacksPerWriteQuorum());
             assertTrue(numCovered >= 1 && numCovered < 3);
@@ -1276,7 +1276,7 @@ public class TestRackawareEnsemblePlacementPolicy extends 
TestCase {
         repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
         repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
 
-        ArrayList<BookieSocketAddress> ensemble = 
repp.newEnsemble(ensembleSize, writeQuorumSize, writeQuorumSize, null,
+        List<BookieSocketAddress> ensemble = repp.newEnsemble(ensembleSize, 
writeQuorumSize, writeQuorumSize, null,
                 new HashSet<>());
         int numCovered = getNumCoveredWriteQuorums(ensemble, writeQuorumSize, 
minNumRacksPerWriteQuorum);
         assertEquals("minimum number of racks covered for writequorum 
ensemble: " + ensemble, ensembleSize, numCovered);
@@ -1317,12 +1317,12 @@ public class TestRackawareEnsemblePlacementPolicy 
extends TestCase {
             int ensembleSize = 3;
             int writeQuorumSize = 3;
             int ackQuorumSize = 2;
-            ArrayList<BookieSocketAddress> ensemble1 = 
repp.newEnsemble(ensembleSize, writeQuorumSize, ackQuorumSize,
+            List<BookieSocketAddress> ensemble1 = 
repp.newEnsemble(ensembleSize, writeQuorumSize, ackQuorumSize,
                     null, new HashSet<>());
             assertEquals(ensembleSize, getNumCoveredWriteQuorums(ensemble1, 2, 
conf.getMinNumRacksPerWriteQuorum()));
             ensembleSize = 4;
             writeQuorumSize = 4;
-            ArrayList<BookieSocketAddress> ensemble2 = 
repp.newEnsemble(ensembleSize, writeQuorumSize, 2, null,
+            List<BookieSocketAddress> ensemble2 = 
repp.newEnsemble(ensembleSize, writeQuorumSize, 2, null,
                     new HashSet<>());
             assertEquals(ensembleSize, getNumCoveredWriteQuorums(ensemble2, 2, 
conf.getMinNumRacksPerWriteQuorum()));
         } catch (BKNotEnoughBookiesException bnebe) {
@@ -1554,7 +1554,7 @@ public class TestRackawareEnsemblePlacementPolicy extends 
TestCase {
         int numTries = 10000;
 
         Set<BookieSocketAddress> excludeList = new 
HashSet<BookieSocketAddress>();
-        ArrayList<BookieSocketAddress> ensemble;
+        List<BookieSocketAddress> ensemble;
         int ensembleSize = 3;
         int writeQuorumSize = 2;
         int acqQuorumSize = 2;
@@ -1625,7 +1625,7 @@ public class TestRackawareEnsemblePlacementPolicy extends 
TestCase {
 
         repp.updateBookieInfo(bookieInfoMap);
 
-        ArrayList<BookieSocketAddress> ensemble = new 
ArrayList<BookieSocketAddress>();
+        List<BookieSocketAddress> ensemble = new 
ArrayList<BookieSocketAddress>();
         Set<BookieSocketAddress> excludeList = new 
HashSet<BookieSocketAddress>();
         try {
             excludeList.add(addr1);
@@ -1644,7 +1644,7 @@ public class TestRackawareEnsemblePlacementPolicy extends 
TestCase {
         }
     }
 
-    static int getNumCoveredWriteQuorums(ArrayList<BookieSocketAddress> 
ensemble, int writeQuorumSize,
+    static int getNumCoveredWriteQuorums(List<BookieSocketAddress> ensemble, 
int writeQuorumSize,
             int minNumRacksPerWriteQuorumConfValue) throws Exception {
         int ensembleSize = ensemble.size();
         int numCoveredWriteQuorums = 0;
@@ -1722,13 +1722,13 @@ public class TestRackawareEnsemblePlacementPolicy 
extends TestCase {
 
         // we will never use addr4 even it is in the stabilized network 
topology
         for (int i = 0; i < 5; i++) {
-            ArrayList<BookieSocketAddress> ensemble =
+            List<BookieSocketAddress> ensemble =
                     repp.newEnsemble(3, 3, 3, null, new 
HashSet<BookieSocketAddress>());
             assertFalse(ensemble.contains(addr4));
         }
 
         // we could still use addr4 for urgent allocation if it is just bookie 
flapping
-        ArrayList<BookieSocketAddress> ensemble = repp.newEnsemble(4, 4, 4, 
null, new HashSet<BookieSocketAddress>());
+        List<BookieSocketAddress> ensemble = repp.newEnsemble(4, 4, 4, null, 
new HashSet<BookieSocketAddress>());
         assertTrue(ensemble.contains(addr4));
     }
 
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicyUsingScript.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicyUsingScript.java
index 524d1d5..d9f2535 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicyUsingScript.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicyUsingScript.java
@@ -30,8 +30,8 @@ import static org.junit.Assert.fail;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 import io.netty.util.HashedWheelTimer;
-import java.util.ArrayList;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
@@ -257,9 +257,9 @@ public class 
TestRackawareEnsemblePlacementPolicyUsingScript {
         addrs.add(addr4);
         repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
         try {
-            ArrayList<BookieSocketAddress> ensemble = repp.newEnsemble(3, 2, 
2, null, new HashSet<>());
+            List<BookieSocketAddress> ensemble = repp.newEnsemble(3, 2, 2, 
null, new HashSet<>());
             assertEquals(0, getNumCoveredWriteQuorums(ensemble, 2));
-            ArrayList<BookieSocketAddress> ensemble2 = repp.newEnsemble(4, 2, 
2, null, new HashSet<>());
+            List<BookieSocketAddress> ensemble2 = repp.newEnsemble(4, 2, 2, 
null, new HashSet<>());
             assertEquals(0, getNumCoveredWriteQuorums(ensemble2, 2));
         } catch (BKNotEnoughBookiesException bnebe) {
             fail("Should not get not enough bookies exception even there is 
only one rack.");
@@ -281,10 +281,10 @@ public class 
TestRackawareEnsemblePlacementPolicyUsingScript {
         addrs.add(addr4);
         repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
         try {
-            ArrayList<BookieSocketAddress> ensemble = repp.newEnsemble(3, 2, 
2, null, new HashSet<>());
+            List<BookieSocketAddress> ensemble = repp.newEnsemble(3, 2, 2, 
null, new HashSet<>());
             int numCovered = getNumCoveredWriteQuorums(ensemble, 2);
             assertTrue(numCovered == 2);
-            ArrayList<BookieSocketAddress> ensemble2 = repp.newEnsemble(4, 2, 
2, null, new HashSet<>());
+            List<BookieSocketAddress> ensemble2 = repp.newEnsemble(4, 2, 2, 
null, new HashSet<>());
             numCovered = getNumCoveredWriteQuorums(ensemble2, 2);
             assertTrue(numCovered == 2);
         } catch (BKNotEnoughBookiesException bnebe) {
@@ -315,9 +315,9 @@ public class 
TestRackawareEnsemblePlacementPolicyUsingScript {
         addrs.add(addr8);
         repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
         try {
-            ArrayList<BookieSocketAddress> ensemble1 = repp.newEnsemble(3, 2, 
2, null, new HashSet<>());
+            List<BookieSocketAddress> ensemble1 = repp.newEnsemble(3, 2, 2, 
null, new HashSet<>());
             assertEquals(3, getNumCoveredWriteQuorums(ensemble1, 2));
-            ArrayList<BookieSocketAddress> ensemble2 = repp.newEnsemble(4, 2, 
2, null, new HashSet<>());
+            List<BookieSocketAddress> ensemble2 = repp.newEnsemble(4, 2, 2, 
null, new HashSet<>());
             assertEquals(4, getNumCoveredWriteQuorums(ensemble2, 2));
         } catch (BKNotEnoughBookiesException bnebe) {
             fail("Should not get not enough bookies exception.");
@@ -346,7 +346,7 @@ public class 
TestRackawareEnsemblePlacementPolicyUsingScript {
         repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
     }
 
-    private int getNumCoveredWriteQuorums(ArrayList<BookieSocketAddress> 
ensemble, int writeQuorumSize)
+    private int getNumCoveredWriteQuorums(List<BookieSocketAddress> ensemble, 
int writeQuorumSize)
             throws Exception {
         int ensembleSize = ensemble.size();
         int numCoveredWriteQuorums = 0;
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawarePolicyNotificationUpdates.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawarePolicyNotificationUpdates.java
index b1ecdba..7dc1d39 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawarePolicyNotificationUpdates.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawarePolicyNotificationUpdates.java
@@ -104,7 +104,7 @@ public class TestRackawarePolicyNotificationUpdates extends 
TestCase {
         int ensembleSize = 3;
         int writeQuorumSize = 2;
         int acqQuorumSize = 2;
-        ArrayList<BookieSocketAddress> ensemble = 
repp.newEnsemble(ensembleSize, writeQuorumSize, acqQuorumSize,
+        List<BookieSocketAddress> ensemble = repp.newEnsemble(ensembleSize, 
writeQuorumSize, acqQuorumSize,
                 Collections.emptyMap(), Collections.emptySet());
         int numCovered = 
TestRackawareEnsemblePlacementPolicy.getNumCoveredWriteQuorums(ensemble, 
writeQuorumSize,
                 conf.getMinNumRacksPerWriteQuorum());
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestReadEntryListener.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestReadEntryListener.java
index 39942b3..66956b2 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestReadEntryListener.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestReadEntryListener.java
@@ -24,8 +24,8 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
-import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
@@ -229,7 +229,7 @@ public class TestReadEntryListener extends 
BookKeeperClusterTestCase {
 
         LedgerHandle lh = bkc.openLedger(id, digestType, passwd);
 
-        ArrayList<BookieSocketAddress> ensemble =
+        List<BookieSocketAddress> ensemble =
                 lh.getLedgerMetadata().getEnsemble(5);
         // kill two bookies
         killBookie(ensemble.get(0));
@@ -269,7 +269,7 @@ public class TestReadEntryListener extends 
BookKeeperClusterTestCase {
 
         LedgerHandle lh = bkc.openLedger(id, digestType, passwd);
 
-        ArrayList<BookieSocketAddress> ensemble =
+        List<BookieSocketAddress> ensemble =
             lh.getLedgerMetadata().getEnsemble(5);
         // kill bookies
         killBookie(ensemble.get(0));
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java
index ed8cf29..e541230 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java
@@ -475,7 +475,7 @@ public class TestRegionAwareEnsemblePlacementPolicy extends 
TestCase {
         addrs.add(addr4);
         repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
         try {
-            ArrayList<BookieSocketAddress> list = repp.newEnsemble(5, 5, 3, 
null, new HashSet<BookieSocketAddress>());
+            List<BookieSocketAddress> list = repp.newEnsemble(5, 5, 3, null, 
new HashSet<BookieSocketAddress>());
             LOG.info("Ensemble : {}", list);
             fail("Should throw BKNotEnoughBookiesException when there is not 
enough bookies");
         } catch (BKNotEnoughBookiesException bnebe) {
@@ -536,10 +536,10 @@ public class TestRegionAwareEnsemblePlacementPolicy 
extends TestCase {
         addrs.add(addr4);
         repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
         try {
-            ArrayList<BookieSocketAddress> ensemble = repp.newEnsemble(3, 2, 
2, null,
+            List<BookieSocketAddress> ensemble = repp.newEnsemble(3, 2, 2, 
null,
                     new HashSet<BookieSocketAddress>());
             assertEquals(0, getNumCoveredRegionsInWriteQuorum(ensemble, 2));
-            ArrayList<BookieSocketAddress> ensemble2 = repp.newEnsemble(4, 2, 
2, null,
+            List<BookieSocketAddress> ensemble2 = repp.newEnsemble(4, 2, 2, 
null,
                     new HashSet<BookieSocketAddress>());
             assertEquals(0, getNumCoveredRegionsInWriteQuorum(ensemble2, 2));
         } catch (BKNotEnoughBookiesException bnebe) {
@@ -569,7 +569,7 @@ public class TestRegionAwareEnsemblePlacementPolicy extends 
TestCase {
         addrs.add(addr4);
         repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
         try {
-            ArrayList<BookieSocketAddress> ensemble = repp.newEnsemble(3, 2, 
2, null,
+            List<BookieSocketAddress> ensemble = repp.newEnsemble(3, 2, 2, 
null,
                     new HashSet<BookieSocketAddress>());
             int numCovered = getNumCoveredRegionsInWriteQuorum(ensemble, 2);
             assertTrue(numCovered >= 1);
@@ -578,7 +578,7 @@ public class TestRegionAwareEnsemblePlacementPolicy extends 
TestCase {
             fail("Should not get not enough bookies exception even there is 
only one rack.");
         }
         try {
-            ArrayList<BookieSocketAddress> ensemble2 = repp.newEnsemble(4, 2, 
2, null,
+            List<BookieSocketAddress> ensemble2 = repp.newEnsemble(4, 2, 2, 
null,
                     new HashSet<BookieSocketAddress>());
             int numCovered = getNumCoveredRegionsInWriteQuorum(ensemble2, 2);
             assertTrue(numCovered >= 1 && numCovered < 3);
@@ -618,10 +618,10 @@ public class TestRegionAwareEnsemblePlacementPolicy 
extends TestCase {
         addrs.add(addr8);
         repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
         try {
-            ArrayList<BookieSocketAddress> ensemble1 = repp.newEnsemble(3, 2, 
2, null,
+            List<BookieSocketAddress> ensemble1 = repp.newEnsemble(3, 2, 2, 
null,
                     new HashSet<BookieSocketAddress>());
             assertEquals(3, getNumCoveredRegionsInWriteQuorum(ensemble1, 2));
-            ArrayList<BookieSocketAddress> ensemble2 = repp.newEnsemble(4, 2, 
2, null,
+            List<BookieSocketAddress> ensemble2 = repp.newEnsemble(4, 2, 2, 
null,
                     new HashSet<BookieSocketAddress>());
             assertEquals(4, getNumCoveredRegionsInWriteQuorum(ensemble2, 2));
         } catch (BKNotEnoughBookiesException bnebe) {
@@ -669,7 +669,7 @@ public class TestRegionAwareEnsemblePlacementPolicy extends 
TestCase {
         addrs.add(addr10);
         repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
         try {
-            ArrayList<BookieSocketAddress> ensemble = repp.newEnsemble(6, 6, 
4, null,
+            List<BookieSocketAddress> ensemble = repp.newEnsemble(6, 6, 4, 
null,
                     new HashSet<BookieSocketAddress>());
             assert(ensemble.contains(addr4));
             assert(ensemble.contains(addr8));
@@ -738,7 +738,7 @@ public class TestRegionAwareEnsemblePlacementPolicy extends 
TestCase {
         repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
         try {
             ((SettableFeature) 
featureProvider.scope("region1").getFeature("disallowBookies")).set(true);
-            ArrayList<BookieSocketAddress> ensemble = repp.newEnsemble(6, 6, 
4, null,
+            List<BookieSocketAddress> ensemble = repp.newEnsemble(6, 6, 4, 
null,
                     new HashSet<BookieSocketAddress>());
             assertEquals(2, getNumRegionsInEnsemble(ensemble));
             assert(ensemble.contains(addr1));
@@ -753,7 +753,7 @@ public class TestRegionAwareEnsemblePlacementPolicy extends 
TestCase {
         }
         try {
             ((SettableFeature) 
featureProvider.scope("region2").getFeature("disallowBookies")).set(true);
-            ArrayList<BookieSocketAddress> ensemble = repp.newEnsemble(6, 6, 
4, null,
+            List<BookieSocketAddress> ensemble = repp.newEnsemble(6, 6, 4, 
null,
                     new HashSet<BookieSocketAddress>());
             fail("Should get not enough bookies exception even there is only 
one region with insufficient bookies.");
         } catch (BKNotEnoughBookiesException bnebe) {
@@ -761,7 +761,7 @@ public class TestRegionAwareEnsemblePlacementPolicy extends 
TestCase {
         }
         try {
             ((SettableFeature) 
featureProvider.scope("region2").getFeature("disallowBookies")).set(false);
-            ArrayList<BookieSocketAddress> ensemble = repp.newEnsemble(6, 6, 
4, null,
+            List<BookieSocketAddress> ensemble = repp.newEnsemble(6, 6, 4, 
null,
                     new HashSet<BookieSocketAddress>());
             assert(ensemble.contains(addr1));
             assert(ensemble.contains(addr3));
@@ -835,7 +835,7 @@ public class TestRegionAwareEnsemblePlacementPolicy extends 
TestCase {
         repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
 
         try {
-            ArrayList<BookieSocketAddress> ensemble = repp.newEnsemble(10, 10, 
10, null,
+            List<BookieSocketAddress> ensemble = repp.newEnsemble(10, 10, 10, 
null,
                     new HashSet<BookieSocketAddress>());
             assert(ensemble.size() == 10);
             assertEquals(5, getNumRegionsInEnsemble(ensemble));
@@ -847,7 +847,7 @@ public class TestRegionAwareEnsemblePlacementPolicy extends 
TestCase {
         try {
             Set<BookieSocketAddress> excludedAddrs = new 
HashSet<BookieSocketAddress>();
             excludedAddrs.add(addr10);
-            ArrayList<BookieSocketAddress> ensemble = repp.newEnsemble(10, 10, 
10, null, excludedAddrs);
+            List<BookieSocketAddress> ensemble = repp.newEnsemble(10, 10, 10, 
null, excludedAddrs);
             assert(ensemble.contains(addr11) && ensemble.contains(addr12));
             assert(ensemble.size() == 10);
             assertEquals(5, getNumRegionsInEnsemble(ensemble));
@@ -937,7 +937,7 @@ public class TestRegionAwareEnsemblePlacementPolicy extends 
TestCase {
             ackQuorum = 5;
         }
 
-        ArrayList<BookieSocketAddress> ensemble;
+        List<BookieSocketAddress> ensemble;
         try {
             ensemble = repp.newEnsemble(6, 6, ackQuorum, null, new 
HashSet<BookieSocketAddress>());
             assert(ensemble.size() == 6);
@@ -1073,7 +1073,7 @@ public class TestRegionAwareEnsemblePlacementPolicy 
extends TestCase {
                     .set(true);
         }
 
-        ArrayList<BookieSocketAddress> ensemble;
+        List<BookieSocketAddress> ensemble;
         try {
             ensemble = repp.newEnsemble(6, 6, 4, null, new 
HashSet<BookieSocketAddress>());
             assert(ensemble.size() == 6);
@@ -1141,7 +1141,7 @@ public class TestRegionAwareEnsemblePlacementPolicy 
extends TestCase {
         excludedAddrs.add(addr10);
         excludedAddrs.add(addr9);
         try {
-            ArrayList<BookieSocketAddress> list = repp.newEnsemble(5, 5, 5, 
null, excludedAddrs);
+            List<BookieSocketAddress> list = repp.newEnsemble(5, 5, 5, null, 
excludedAddrs);
             LOG.info("Ensemble : {}", list);
             fail("Should throw BKNotEnoughBookiesException when there is not 
enough bookies");
         } catch (BKNotEnoughBookiesException bnebe) {
@@ -1202,7 +1202,7 @@ public class TestRegionAwareEnsemblePlacementPolicy 
extends TestCase {
     private void basicReorderReadSequenceWithLocalRegionTest(String myRegion, 
boolean isReadLAC) throws Exception {
         prepareNetworkTopologyForReorderTests(myRegion);
 
-        ArrayList<BookieSocketAddress> ensemble = repp.newEnsemble(9, 9, 5, 
null, new HashSet<BookieSocketAddress>());
+        List<BookieSocketAddress> ensemble = repp.newEnsemble(9, 9, 5, null, 
new HashSet<BookieSocketAddress>());
         assertEquals(9, getNumCoveredRegionsInWriteQuorum(ensemble, 9));
 
         DistributionSchedule ds = new RoundRobinDistributionSchedule(9, 9, 9);
@@ -1258,7 +1258,7 @@ public class TestRegionAwareEnsemblePlacementPolicy 
extends TestCase {
     private void basicReorderReadSequenceWithRemoteRegionTest(String myRegion, 
boolean isReadLAC) throws Exception {
         prepareNetworkTopologyForReorderTests(myRegion);
 
-        ArrayList<BookieSocketAddress> ensemble = repp.newEnsemble(9, 9, 5, 
null, new HashSet<BookieSocketAddress>());
+        List<BookieSocketAddress> ensemble = repp.newEnsemble(9, 9, 5, null, 
new HashSet<BookieSocketAddress>());
         assertEquals(9, getNumCoveredRegionsInWriteQuorum(ensemble, 9));
 
         DistributionSchedule ds = new RoundRobinDistributionSchedule(9, 9, 9);
@@ -1296,7 +1296,7 @@ public class TestRegionAwareEnsemblePlacementPolicy 
extends TestCase {
         reorderReadSequenceWithUnavailableOrReadOnlyBookiesTest(true);
     }
 
-    static Set<BookieSocketAddress> 
getBookiesForRegion(ArrayList<BookieSocketAddress> ensemble, String region) {
+    static Set<BookieSocketAddress> 
getBookiesForRegion(List<BookieSocketAddress> ensemble, String region) {
         Set<BookieSocketAddress> regionBookies = new 
HashSet<BookieSocketAddress>();
         for (BookieSocketAddress address : ensemble) {
             String r = StaticDNSResolver.getRegion(address.getHostName());
@@ -1307,7 +1307,7 @@ public class TestRegionAwareEnsemblePlacementPolicy 
extends TestCase {
         return regionBookies;
     }
 
-    static void appendBookieIndexByRegion(ArrayList<BookieSocketAddress> 
ensemble,
+    static void appendBookieIndexByRegion(List<BookieSocketAddress> ensemble,
                                           DistributionSchedule.WriteSet 
writeSet,
                                           String region,
                                           List<Integer> finalSet) {
@@ -1328,7 +1328,7 @@ public class TestRegionAwareEnsemblePlacementPolicy 
extends TestCase {
 
         prepareNetworkTopologyForReorderTests(myRegion);
 
-        ArrayList<BookieSocketAddress> ensemble = repp.newEnsemble(9, 9, 5, 
null, new HashSet<BookieSocketAddress>());
+        List<BookieSocketAddress> ensemble = repp.newEnsemble(9, 9, 5, null, 
new HashSet<BookieSocketAddress>());
         assertEquals(9, getNumCoveredRegionsInWriteQuorum(ensemble, 9));
 
         DistributionSchedule ds = new RoundRobinDistributionSchedule(9, 9, 9);
@@ -1372,7 +1372,7 @@ public class TestRegionAwareEnsemblePlacementPolicy 
extends TestCase {
         }
     }
 
-    private int getNumRegionsInEnsemble(ArrayList<BookieSocketAddress> 
ensemble) {
+    private int getNumRegionsInEnsemble(List<BookieSocketAddress> ensemble) {
         Set<String> regions = new HashSet<String>();
         for (BookieSocketAddress addr: ensemble) {
             regions.add(StaticDNSResolver.getRegion(addr.getHostName()));
@@ -1380,7 +1380,7 @@ public class TestRegionAwareEnsemblePlacementPolicy 
extends TestCase {
         return regions.size();
     }
 
-    private int 
getNumCoveredRegionsInWriteQuorum(ArrayList<BookieSocketAddress> ensemble, int 
writeQuorumSize)
+    private int getNumCoveredRegionsInWriteQuorum(List<BookieSocketAddress> 
ensemble, int writeQuorumSize)
             throws Exception {
         int ensembleSize = ensemble.size();
         int numCoveredWriteQuorums = 0;
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSequenceRead.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSequenceRead.java
index 50c24bf..283cf75 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSequenceRead.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSequenceRead.java
@@ -23,6 +23,7 @@ package org.apache.bookkeeper.client;
 import static org.junit.Assert.assertEquals;
 
 import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 import java.util.SortedMap;
 import java.util.TreeMap;
@@ -54,10 +55,10 @@ public class TestSequenceRead extends 
BookKeeperClusterTestCase {
     private LedgerHandle createLedgerWithDuplicatedBookies() throws Exception {
         final LedgerHandle lh = bkc.createLedger(3, 3, 3, digestType, passwd);
         // introduce duplicated bookies in an ensemble.
-        SortedMap<Long, ArrayList<BookieSocketAddress>> ensembles = 
lh.getLedgerMetadata().getEnsembles();
-        TreeMap<Long, ArrayList<BookieSocketAddress>> newEnsembles = new 
TreeMap<>();
-        for (Map.Entry<Long, ArrayList<BookieSocketAddress>> entry : 
ensembles.entrySet()) {
-            ArrayList<BookieSocketAddress> newList = new 
ArrayList<BookieSocketAddress>(entry.getValue().size());
+        SortedMap<Long, ? extends List<BookieSocketAddress>> ensembles = 
lh.getLedgerMetadata().getEnsembles();
+        TreeMap<Long, List<BookieSocketAddress>> newEnsembles = new 
TreeMap<>();
+        for (Map.Entry<Long, ? extends List<BookieSocketAddress>> entry : 
ensembles.entrySet()) {
+            List<BookieSocketAddress> newList = new 
ArrayList<BookieSocketAddress>(entry.getValue().size());
             BookieSocketAddress firstBookie = entry.getValue().get(0);
             for (BookieSocketAddress ignored : entry.getValue()) {
                 newList.add(firstBookie);
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSpeculativeRead.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSpeculativeRead.java
index 4a49451..aa21059 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSpeculativeRead.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSpeculativeRead.java
@@ -27,10 +27,10 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
-import java.util.ArrayList;
 import java.util.BitSet;
 import java.util.Enumeration;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -308,7 +308,7 @@ public class TestSpeculativeRead extends 
BookKeeperClusterTestCase {
 
         LedgerHandle l = bkspec.openLedger(id, digestType, passwd);
 
-        ArrayList<BookieSocketAddress> ensemble = 
l.getLedgerMetadata().getEnsembles().get(0L);
+        List<BookieSocketAddress> ensemble = 
l.getLedgerMetadata().getEnsembles().get(0L);
         BitSet allHosts = new BitSet(ensemble.size());
         for (int i = 0; i < ensemble.size(); i++) {
             allHosts.set(i, true);
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestWatchEnsembleChange.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestWatchEnsembleChange.java
index a729139..e6daa66 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestWatchEnsembleChange.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestWatchEnsembleChange.java
@@ -26,9 +26,9 @@ import static org.junit.Assert.assertTrue;
 
 import com.google.common.util.concurrent.UncheckedExecutionException;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
@@ -92,7 +92,7 @@ public class TestWatchEnsembleChange extends 
BookKeeperClusterTestCase {
         LedgerHandle readLh = bkc.openLedgerNoRecovery(lh.getId(), digestType, 
"".getBytes());
         long lastLAC = readLh.getLastAddConfirmed();
         assertEquals(numEntries - 2, lastLAC);
-        ArrayList<BookieSocketAddress> ensemble =
+        List<BookieSocketAddress> ensemble =
                 lh.getLedgerMetadata().currentEnsemble;
         for (BookieSocketAddress addr : ensemble) {
             killBookie(addr);
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/UpdateLedgerCmdTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/UpdateLedgerCmdTest.java
index d773bda..6eca0de 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/UpdateLedgerCmdTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/UpdateLedgerCmdTest.java
@@ -94,7 +94,7 @@ public class UpdateLedgerCmdTest extends 
BookKeeperClusterTestCase {
 
     private int getUpdatedLedgersCount(BookKeeper bk, List<LedgerHandle> 
ledgers, BookieSocketAddress toBookieAddr)
             throws InterruptedException, BKException {
-        ArrayList<BookieSocketAddress> ensemble;
+        List<BookieSocketAddress> ensemble;
         int updatedLedgersCount = 0;
         for (LedgerHandle lh : ledgers) {
             // ledger#close() would hit BadVersion exception as rename
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/UpdateLedgerOpTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/UpdateLedgerOpTest.java
index aa01e65..7f67242 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/UpdateLedgerOpTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/UpdateLedgerOpTest.java
@@ -98,7 +98,7 @@ public class UpdateLedgerOpTest extends 
BookKeeperClusterTestCase {
             ledgers.add(createLedgerWithEntries(bk, 0));
         }
 
-        ArrayList<BookieSocketAddress> ensemble = 
lh1.getLedgerMetadata().getEnsemble(0);
+        List<BookieSocketAddress> ensemble = 
lh1.getLedgerMetadata().getEnsemble(0);
 
         BookieSocketAddress curBookieAddr = ensemble.get(0);
         baseConf.setUseHostNameAsBookieID(true);
@@ -139,7 +139,7 @@ public class UpdateLedgerOpTest extends 
BookKeeperClusterTestCase {
             ledgers.add(createLedgerWithEntries(bk, 0));
         }
 
-        ArrayList<BookieSocketAddress> ensemble = 
lh1.getLedgerMetadata().getEnsemble(0);
+        List<BookieSocketAddress> ensemble = 
lh1.getLedgerMetadata().getEnsemble(0);
 
         BookieSocketAddress curBookieAddr = ensemble.get(0);
         baseConf.setUseHostNameAsBookieID(true);
@@ -194,7 +194,7 @@ public class UpdateLedgerOpTest extends 
BookKeeperClusterTestCase {
         LedgerHandle lh = createLedgerWithEntries(bk, 100);
 
         BookieServer bookieServer = bs.get(0);
-        ArrayList<BookieSocketAddress> ensemble = 
lh.getLedgerMetadata().getEnsemble(0);
+        List<BookieSocketAddress> ensemble = 
lh.getLedgerMetadata().getEnsemble(0);
         BookieSocketAddress curBookieAddr = null;
         for (BookieSocketAddress bookieSocketAddress : ensemble) {
             if (bookieServer.getLocalAddress().equals(bookieSocketAddress)) {
@@ -276,7 +276,7 @@ public class UpdateLedgerOpTest extends 
BookKeeperClusterTestCase {
             }
         };
         th.start();
-        ArrayList<BookieSocketAddress> ensemble = 
lh.getLedgerMetadata().getEnsemble(0);
+        List<BookieSocketAddress> ensemble = 
lh.getLedgerMetadata().getEnsemble(0);
         BookieSocketAddress curBookieAddr = ensemble.get(0);
         BookieSocketAddress toBookieAddr = new 
BookieSocketAddress("localhost:" + curBookieAddr.getPort());
         UpdateLedgerOp updateLedgerOp = new UpdateLedgerOp(bk, bkadmin);
@@ -297,7 +297,7 @@ public class UpdateLedgerOpTest extends 
BookKeeperClusterTestCase {
 
     private int getUpdatedLedgersCount(BookKeeper bk, List<LedgerHandle> 
ledgers, BookieSocketAddress toBookieAddr)
             throws InterruptedException, BKException {
-        ArrayList<BookieSocketAddress> ensemble;
+        List<BookieSocketAddress> ensemble;
         int updatedLedgersCount = 0;
         for (LedgerHandle lh : ledgers) {
             // ledger#close() would hit BadVersion exception as rename
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicBookieCheckTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicBookieCheckTest.java
index efe6356..50ad2c9 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicBookieCheckTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicBookieCheckTest.java
@@ -24,6 +24,7 @@ import static 
org.apache.bookkeeper.meta.MetadataDrivers.runFunctionWithLedgerMa
 import static org.junit.Assert.assertEquals;
 
 import com.google.common.util.concurrent.UncheckedExecutionException;
+import java.util.ArrayList;
 import java.util.List;
 import lombok.Cleanup;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
@@ -106,8 +107,9 @@ public class AuditorPeriodicBookieCheckTest extends 
BookKeeperClusterTestCase {
 
                 LedgerHandle lh = bkc.createLedger(3, 3, DigestType.CRC32, 
"passwd".getBytes());
                 LedgerMetadata md = LedgerHandleAdapter.getLedgerMetadata(lh);
-                List<BookieSocketAddress> ensemble = md.getEnsembles().get(0L);
+                List<BookieSocketAddress> ensemble = new 
ArrayList<>(md.getEnsembles().get(0L));
                 ensemble.set(0, new BookieSocketAddress("1.1.1.1", 1000));
+                md.updateEnsemble(0L, ensemble);
 
                 TestCallbacks.GenericCallbackFuture<LedgerMetadata> cb =
                     new TestCallbacks.GenericCallbackFuture<LedgerMetadata>();
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java
index e706d1e..cc4287d 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java
@@ -366,7 +366,7 @@ public class AuditorPeriodicCheckTest extends 
BookKeeperClusterTestCase {
     private BookieSocketAddress 
replaceBookieWithWriteFailingBookie(LedgerHandle lh) throws Exception {
         int bookieIdx = -1;
         Long entryId = 
LedgerHandleAdapter.getLedgerMetadata(lh).getEnsembles().firstKey();
-        ArrayList<BookieSocketAddress> curEnsemble = LedgerHandleAdapter
+        List<BookieSocketAddress> curEnsemble = LedgerHandleAdapter
                 .getLedgerMetadata(lh).getEnsembles().get(entryId);
 
         // Identify a bookie in the current ledger ensemble to be replaced
@@ -463,9 +463,9 @@ public class AuditorPeriodicCheckTest extends 
BookKeeperClusterTestCase {
         // check that ensemble has changed and the bookie that rejected writes 
has
         // been replaced in the ensemble
         LedgerHandle newLh = bkc.openLedger(lh.getId(), DigestType.CRC32, 
"passwd".getBytes());
-        for (Map.Entry<Long, ArrayList<BookieSocketAddress>> e : 
LedgerHandleAdapter.getLedgerMetadata(newLh).
+        for (Map.Entry<Long, ? extends List<BookieSocketAddress>> e : 
LedgerHandleAdapter.getLedgerMetadata(newLh).
                 getEnsembles().entrySet()) {
-            ArrayList<BookieSocketAddress> ensemble = e.getValue();
+            List<BookieSocketAddress> ensemble = e.getValue();
             assertFalse("Ensemble hasn't been updated", 
ensemble.contains(replacedBookie));
         }
         newLh.close();
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieAutoRecoveryTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieAutoRecoveryTest.java
index abd8d56..4929b0e 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieAutoRecoveryTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieAutoRecoveryTest.java
@@ -443,9 +443,9 @@ public class BookieAutoRecoveryTest extends 
BookKeeperClusterTestCase {
         List<LedgerHandle> listOfLedgerHandle = createLedgersAndAddEntries(1, 
5);
         LedgerHandle lh = listOfLedgerHandle.get(0);
         int ledgerReplicaIndex = 0;
-        final SortedMap<Long, ArrayList<BookieSocketAddress>> ensembles = 
LedgerHandleAdapter
+        final SortedMap<Long, ? extends List<BookieSocketAddress>> ensembles = 
LedgerHandleAdapter
                 .getLedgerMetadata(lh).getEnsembles();
-        final ArrayList<BookieSocketAddress> bkAddresses = ensembles.get(0L);
+        final List<BookieSocketAddress> bkAddresses = ensembles.get(0L);
         BookieSocketAddress replicaToKillAddr = bkAddresses.get(0);
         for (BookieSocketAddress bookieSocketAddress : bkAddresses) {
             if (!isCreatedFromIp(bookieSocketAddress)){
@@ -522,9 +522,9 @@ public class BookieAutoRecoveryTest extends 
BookKeeperClusterTestCase {
         List<LedgerHandle> listOfLedgerHandle = createLedgersAndAddEntries(1, 
5);
         LedgerHandle lh = listOfLedgerHandle.get(0);
         int ledgerReplicaIndex = 0;
-        final SortedMap<Long, ArrayList<BookieSocketAddress>> ensembles = 
LedgerHandleAdapter
+        final SortedMap<Long, ? extends List<BookieSocketAddress>> ensembles = 
LedgerHandleAdapter
                 .getLedgerMetadata(lh).getEnsembles();
-        final ArrayList<BookieSocketAddress> bkAddresses = ensembles.get(0L);
+        final List<BookieSocketAddress> bkAddresses = ensembles.get(0L);
         BookieSocketAddress replicaToKillAddr = bkAddresses.get(0);
         for (BookieSocketAddress bookieSocketAddress : bkAddresses) {
             if (isCreatedFromIp(bookieSocketAddress)) {
@@ -577,7 +577,7 @@ public class BookieAutoRecoveryTest extends 
BookKeeperClusterTestCase {
     }
 
     private int getReplicaIndexInLedger(LedgerHandle lh, BookieSocketAddress 
replicaToKill) {
-        SortedMap<Long, ArrayList<BookieSocketAddress>> ensembles = 
LedgerHandleAdapter
+        SortedMap<Long, ? extends List<BookieSocketAddress>> ensembles = 
LedgerHandleAdapter
                 .getLedgerMetadata(lh).getEnsembles();
         int ledgerReplicaIndex = -1;
         for (BookieSocketAddress addr : ensembles.get(0L)) {
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestAutoRecoveryAlongWithBookieServers.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestAutoRecoveryAlongWithBookieServers.java
index 06a8b85..ca76784 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestAutoRecoveryAlongWithBookieServers.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestAutoRecoveryAlongWithBookieServers.java
@@ -23,10 +23,9 @@ package org.apache.bookkeeper.replication;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
-import java.util.ArrayList;
 import java.util.Enumeration;
+import java.util.List;
 import java.util.Map.Entry;
-import java.util.Set;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.LedgerEntry;
 import org.apache.bookkeeper.client.LedgerHandle;
@@ -88,10 +87,9 @@ public class TestAutoRecoveryAlongWithBookieServers extends
         }
 
         // Killing all bookies except newly replicated bookie
-        Set<Entry<Long, ArrayList<BookieSocketAddress>>> entrySet = 
LedgerHandleAdapter
-                .getLedgerMetadata(lh).getEnsembles().entrySet();
-        for (Entry<Long, ArrayList<BookieSocketAddress>> entry : entrySet) {
-            ArrayList<BookieSocketAddress> bookies = entry.getValue();
+        for (Entry<Long, ? extends List<BookieSocketAddress>> entry :
+                 lh.getLedgerMetadata().getEnsembles().entrySet()) {
+            List<BookieSocketAddress> bookies = entry.getValue();
             for (BookieSocketAddress bookie : bookies) {
                 if (bookie.equals(newBkAddr)) {
                     continue;
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
index 0b15f1d..f64456b 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
@@ -25,11 +25,10 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.net.URI;
-import java.util.ArrayList;
 import java.util.Enumeration;
+import java.util.List;
 import java.util.Map.Entry;
 import java.util.Optional;
-import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import lombok.Cleanup;
@@ -689,10 +688,9 @@ public class TestReplicationWorker extends 
BookKeeperClusterTestCase {
     private void killAllBookies(LedgerHandle lh, BookieSocketAddress excludeBK)
             throws Exception {
         // Killing all bookies except newly replicated bookie
-        Set<Entry<Long, ArrayList<BookieSocketAddress>>> entrySet = 
LedgerHandleAdapter
-                .getLedgerMetadata(lh).getEnsembles().entrySet();
-        for (Entry<Long, ArrayList<BookieSocketAddress>> entry : entrySet) {
-            ArrayList<BookieSocketAddress> bookies = entry.getValue();
+        for (Entry<Long, ? extends List<BookieSocketAddress>> entry :
+                 lh.getLedgerMetadata().getEnsembles().entrySet()) {
+            List<BookieSocketAddress> bookies = entry.getValue();
             for (BookieSocketAddress bookie : bookies) {
                 if (bookie.equals(excludeBK)) {
                     continue;
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/server/http/TestHttpService.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/server/http/TestHttpService.java
index 63eb641..d360b1f 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/server/http/TestHttpService.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/server/http/TestHttpService.java
@@ -26,6 +26,7 @@ import static org.junit.Assert.assertTrue;
 import com.google.common.collect.Maps;
 import com.google.common.util.concurrent.UncheckedExecutionException;
 import java.io.File;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -646,6 +647,7 @@ public class TestHttpService extends 
BookKeeperClusterTestCase {
             try {
                 testListUnderReplicatedLedgerService(mFactory);
             } catch (Exception e) {
+                LOG.info("Exception in test", e);
                 throw new UncheckedExecutionException(e.getMessage(), 
e.getCause());
             }
             return null;
@@ -671,8 +673,9 @@ public class TestHttpService extends 
BookKeeperClusterTestCase {
 
         LedgerHandle lh = bkc.createLedger(3, 3, BookKeeper.DigestType.CRC32, 
"passwd".getBytes());
         LedgerMetadata md = LedgerHandleAdapter.getLedgerMetadata(lh);
-        List<BookieSocketAddress> ensemble = md.getEnsembles().get(0L);
+        List<BookieSocketAddress> ensemble = new 
ArrayList<>(md.getEnsembles().get(0L));
         ensemble.set(0, new BookieSocketAddress("1.1.1.1", 1000));
+        md.updateEnsemble(0L, ensemble);
 
         TestCallbacks.GenericCallbackFuture<LedgerMetadata> cb =
             new TestCallbacks.GenericCallbackFuture<LedgerMetadata>();
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/tls/TestTLS.java 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/tls/TestTLS.java
index d4a480f..8dafe8b 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/tls/TestTLS.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/tls/TestTLS.java
@@ -28,11 +28,11 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.security.cert.Certificate;
 import java.security.cert.X509Certificate;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Enumeration;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 
@@ -436,8 +436,8 @@ public class TestTLS extends BookKeeperClusterTestCase {
         ClientConfiguration clientConf = new 
ClientConfiguration(baseClientConf);
         LedgerMetadata metadata = testClient(clientConf, 2);
         assertTrue(metadata.getEnsembles().size() > 0);
-        Collection<ArrayList<BookieSocketAddress>> ensembles = 
metadata.getEnsembles().values();
-        for (ArrayList<BookieSocketAddress> bookies : ensembles) {
+        Collection<? extends List<BookieSocketAddress>> ensembles = 
metadata.getEnsembles().values();
+        for (List<BookieSocketAddress> bookies : ensembles) {
             for (BookieSocketAddress bookieAddress : bookies) {
                 int port = bookieAddress.getPort();
                 assertTrue(tlsBookiePorts.contains(port));
diff --git 
a/stream/distributedlog/core/src/main/java/org/apache/bookkeeper/client/LedgerReader.java
 
b/stream/distributedlog/core/src/main/java/org/apache/bookkeeper/client/LedgerReader.java
index 9d39d38..88a1344 100644
--- 
a/stream/distributedlog/core/src/main/java/org/apache/bookkeeper/client/LedgerReader.java
+++ 
b/stream/distributedlog/core/src/main/java/org/apache/bookkeeper/client/LedgerReader.java
@@ -83,7 +83,7 @@ public class LedgerReader {
         bookieClient = bkc.getBookieClient();
     }
 
-    public static SortedMap<Long, ArrayList<BookieSocketAddress>> 
bookiesForLedger(final LedgerHandle lh) {
+    public static SortedMap<Long, ? extends List<BookieSocketAddress>> 
bookiesForLedger(final LedgerHandle lh) {
         return lh.getLedgerMetadata().getEnsembles();
     }
 
@@ -120,7 +120,7 @@ public class LedgerReader {
             }
         };
 
-        ArrayList<BookieSocketAddress> ensemble = 
lh.getLedgerMetadata().getEnsemble(eid);
+        List<BookieSocketAddress> ensemble = 
lh.getLedgerMetadata().getEnsemble(eid);
         for (int i = 0; i < writeSet.size(); i++) {
             int idx = writeSet.get(i);
             bookieClient.readEntry(ensemble.get(idx), lh.getId(), eid, 
readEntryCallback,
@@ -225,7 +225,7 @@ public class LedgerReader {
             }
         };
 
-        ArrayList<BookieSocketAddress> ensemble = 
lh.getLedgerMetadata().getEnsemble(eid);
+        List<BookieSocketAddress> ensemble = 
lh.getLedgerMetadata().getEnsemble(eid);
         for (int i = 0; i < writeSet.size(); i++) {
             int idx = writeSet.get(i);
             bookieClient.readEntry(ensemble.get(idx), lh.getId(), eid, 
readEntryCallback,
diff --git 
a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/tools/DistributedLogTool.java
 
b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/tools/DistributedLogTool.java
index 02edeba..d19e768 100644
--- 
a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/tools/DistributedLogTool.java
+++ 
b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/tools/DistributedLogTool.java
@@ -1037,7 +1037,7 @@ import org.slf4j.LoggerFactory;
             LedgerHandle lh = 
bkc.client().get().openLedgerNoRecovery(segment.getLogSegmentId(),
                     BookKeeper.DigestType.CRC32, 
getConf().getBKDigestPW().getBytes(UTF_8));
             long eidFirst = 0;
-            for (SortedMap.Entry<Long, ArrayList<BookieSocketAddress>>
+            for (SortedMap.Entry<Long, ? extends List<BookieSocketAddress>>
                     entry : LedgerReader.bookiesForLedger(lh).entrySet()) {
                 long eidLast = entry.getKey().longValue();
                 long count = eidLast - eidFirst + 1;

Reply via email to